From 19fec5d0b042ff42a96e8bb7c05308c1c24c30f5 Mon Sep 17 00:00:00 2001 From: GLYASAI Date: Thu, 5 Aug 2021 11:48:31 +0800 Subject: [PATCH] static endpoints discover --- config/crd/rainbond.io_thirdcomponents.yaml | 24 +++------- db/model/third_party_service.go | 8 ++-- pkg/apis/rainbond/v1alpha1/third_component.go | 9 +++- .../v1alpha1/zz_generated.deepcopy.go | 10 +--- .../component_properties.go | 9 +--- .../componentdefinition.go | 22 ++++----- .../thirdcomponentdefinition.go | 6 +-- .../controller/thirdcomponent/controller.go | 20 ++++++-- .../thirdcomponent/{ => discover}/discover.go | 13 +++++- .../thirdcomponent/discover/staticendpoint.go | 46 +++++++++++++++++++ .../thirdcomponent/discover_pool.go | 9 ++-- 11 files changed, 110 insertions(+), 66 deletions(-) rename worker/master/controller/thirdcomponent/{ => discover}/discover.go (91%) create mode 100644 worker/master/controller/thirdcomponent/discover/staticendpoint.go diff --git a/config/crd/rainbond.io_thirdcomponents.yaml b/config/crd/rainbond.io_thirdcomponents.yaml index ad28f100f..bdf217389 100644 --- a/config/crd/rainbond.io_thirdcomponents.yaml +++ b/config/crd/rainbond.io_thirdcomponents.yaml @@ -19,7 +19,7 @@ spec: status: {} validation: openAPIV3Schema: - description: HelmApp - + description: ThirdComponent - properties: apiVersion: description: 'APIVersion defines the versioned schema of this representation @@ -34,29 +34,15 @@ spec: metadata: type: object spec: + description: ThirdComponentSpec - properties: endpointSource: description: endpoint source config properties: endpoints: - items: - properties: - address: - description: The address including the port number. - type: string - clientSecret: - description: Specify a private certificate when the protocol - is HTTPS - type: string - protocol: - description: 'Address protocols, including: HTTP, TCP, UDP, - HTTPS' - type: string - required: - - address - type: object - type: array + type: boolean kubernetesService: + description: KubernetesServiceSource - properties: name: type: string @@ -127,6 +113,7 @@ spec: - ports type: object status: + description: ThirdComponentStatus - properties: endpoints: items: @@ -187,6 +174,7 @@ spec: type: object type: array phase: + description: ComponentPhase - type: string reason: type: string diff --git a/db/model/third_party_service.go b/db/model/third_party_service.go index ed460233b..f379f1208 100644 --- a/db/model/third_party_service.go +++ b/db/model/third_party_service.go @@ -18,7 +18,9 @@ package model -import "fmt" +import ( + "fmt" +) // Endpoint is a persistent object for table 3rd_party_svc_endpoints. type Endpoint struct { @@ -36,8 +38,8 @@ func (Endpoint) TableName() string { return "tenant_service_3rd_party_endpoints" } -// Address - -func (e *Endpoint) Address() string { +// GetAddress - +func (e *Endpoint) GetAddress() string { if e.Port == 0 { return e.IP } diff --git a/pkg/apis/rainbond/v1alpha1/third_component.go b/pkg/apis/rainbond/v1alpha1/third_component.go index bcaa1b1dc..082bb9fb6 100644 --- a/pkg/apis/rainbond/v1alpha1/third_component.go +++ b/pkg/apis/rainbond/v1alpha1/third_component.go @@ -46,6 +46,11 @@ type ThirdComponent struct { Status ThirdComponentStatus `json:"status,omitempty"` } +// GetComponentID - +func (in *ThirdComponent) GetComponentID() string { + return in.Name +} + // +kubebuilder:object:root=true // ThirdComponentList contains a list of ThirdComponent @@ -68,8 +73,8 @@ type ThirdComponentSpec struct { // ThirdComponentEndpointSource - type ThirdComponentEndpointSource struct { - StaticEndpoints []*ThirdComponentEndpoint `json:"endpoints,omitempty"` - KubernetesService *KubernetesServiceSource `json:"kubernetesService,omitempty"` + StaticEndpoints *bool `json:"endpoints,omitempty"` + KubernetesService *KubernetesServiceSource `json:"kubernetesService,omitempty"` //other source // NacosSource // EurekaSource diff --git a/pkg/apis/rainbond/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/rainbond/v1alpha1/zz_generated.deepcopy.go index 3beafa6dc..06b7c0e96 100644 --- a/pkg/apis/rainbond/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/rainbond/v1alpha1/zz_generated.deepcopy.go @@ -460,14 +460,8 @@ func (in *ThirdComponentEndpointSource) DeepCopyInto(out *ThirdComponentEndpoint *out = *in if in.StaticEndpoints != nil { in, out := &in.StaticEndpoints, &out.StaticEndpoints - *out = make([]*ThirdComponentEndpoint, len(*in)) - for i := range *in { - if (*in)[i] != nil { - in, out := &(*in)[i], &(*out)[i] - *out = new(ThirdComponentEndpoint) - **out = **in - } - } + *out = new(bool) + **out = **in } if in.KubernetesService != nil { in, out := &in.KubernetesService, &out.KubernetesService diff --git a/worker/appm/componentdefinition/component_properties.go b/worker/appm/componentdefinition/component_properties.go index c8becaeb2..11b6c8b74 100644 --- a/worker/appm/componentdefinition/component_properties.go +++ b/worker/appm/componentdefinition/component_properties.go @@ -21,7 +21,7 @@ package componentdefinition //ThirdComponentProperties third component properties type ThirdComponentProperties struct { Kubernetes *ThirdComponentKubernetes `json:"kubernetes,omitempty"` - Endpoints []*ThirdComponentEndpoint `json:"endpoints,omitempty"` + Endpoints *bool `json:"endpoints,omitempty"` Port []*ThirdComponentPort `json:"port"` } @@ -38,10 +38,3 @@ type ThirdComponentKubernetes struct { Name string `json:"name"` Namespace string `json:"namespace"` } - -// ThirdComponentEndpoint - -type ThirdComponentEndpoint struct { - Address string `json:"address"` - Protocol string `json:"protocol,omitempty"` - ClientSecret string `json:"clientSecret,omitempty"` -} diff --git a/worker/appm/componentdefinition/componentdefinition.go b/worker/appm/componentdefinition/componentdefinition.go index 687c1ed04..d2f8d3ef6 100644 --- a/worker/appm/componentdefinition/componentdefinition.go +++ b/worker/appm/componentdefinition/componentdefinition.go @@ -28,13 +28,16 @@ import ( dbmodel "github.com/goodrain/rainbond/db/model" "github.com/goodrain/rainbond/pkg/apis/rainbond/v1alpha1" rainbondversioned "github.com/goodrain/rainbond/pkg/generated/clientset/versioned" + "github.com/goodrain/rainbond/util/commonutil" v1 "github.com/goodrain/rainbond/worker/appm/types/v1" "github.com/sirupsen/logrus" + k8sErrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) // ErrNotSupport - var ErrNotSupport = fmt.Errorf("not support component definition") + // ErrOnlyCUESupport - var ErrOnlyCUESupport = fmt.Errorf("component definition only support cue template") @@ -124,11 +127,11 @@ func (c *ComponentDefinitionBuilder) GetComponentProperties(as *v1.AppService, d } // static endpoints - endpoints, err := c.listStaticEndpoints(as.ServiceID) + containsEndpoints, err := c.containsEndpoints(as.ServiceID) if err != nil { c.logger.Errorf("component id: %s; list static endpoints: %v", as.ServiceID, err) } - properties.Endpoints = endpoints + properties.Endpoints = commonutil.Bool(containsEndpoints) ports, err := dbm.TenantServicesPortDao().GetPortsByServiceID(as.ServiceID) if err != nil { @@ -152,19 +155,12 @@ func (c *ComponentDefinitionBuilder) GetComponentProperties(as *v1.AppService, d } } -func (c *ComponentDefinitionBuilder) listStaticEndpoints(componentID string) ([]*ThirdComponentEndpoint, error) { +func (c *ComponentDefinitionBuilder) containsEndpoints(componentID string) (bool, error) { endpoints, err := db.GetManager().EndpointsDao().List(componentID) if err != nil { - return nil, err + return false, err } - - var res []*ThirdComponentEndpoint - for _, ep := range endpoints { - res = append(res, &ThirdComponentEndpoint{ - Address: ep.Address(), - }) - } - return res, nil + return len(endpoints) > 0, nil } // BuildWorkloadResource - @@ -196,7 +192,7 @@ func (c *ComponentDefinitionBuilder) InitCoreComponentDefinition(rainbondClient for _, ccd := range coreComponentDefinition { if c.GetComponentDefinition(ccd.Name) == nil { logrus.Infof("create core componentdefinition %s", ccd.Name) - if _, err := rainbondClient.RainbondV1alpha1().ComponentDefinitions(c.namespace).Create(context.Background(), ccd, metav1.CreateOptions{}); err != nil { + if _, err := rainbondClient.RainbondV1alpha1().ComponentDefinitions(c.namespace).Create(context.Background(), ccd, metav1.CreateOptions{}); err != nil && !k8sErrors.IsNotFound(err) { logrus.Errorf("create core componentdefinition %s failire %s", ccd.Name, err.Error()) } } diff --git a/worker/appm/componentdefinition/thirdcomponentdefinition.go b/worker/appm/componentdefinition/thirdcomponentdefinition.go index f99569daa..cf9ded35b 100644 --- a/worker/appm/componentdefinition/thirdcomponentdefinition.go +++ b/worker/appm/componentdefinition/thirdcomponentdefinition.go @@ -55,11 +55,7 @@ parameter: { namespace?: string name: string } - endpoints?: [...{ - address: string - protocol?: string - clientSecret?: string - }] + endpoints?: bool port?: [...{ name: string port: >0 & <=65533 diff --git a/worker/master/controller/thirdcomponent/controller.go b/worker/master/controller/thirdcomponent/controller.go index 84bbcb60d..78ecb22e5 100644 --- a/worker/master/controller/thirdcomponent/controller.go +++ b/worker/master/controller/thirdcomponent/controller.go @@ -24,7 +24,10 @@ import ( "time" "github.com/goodrain/rainbond/pkg/apis/rainbond/v1alpha1" + rainbondlistersv1alpha1 "github.com/goodrain/rainbond/pkg/generated/listers/rainbond/v1alpha1" + dis "github.com/goodrain/rainbond/worker/master/controller/thirdcomponent/discover" "github.com/oam-dev/kubevela/pkg/utils/apply" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" corev1 "k8s.io/api/core/v1" @@ -34,8 +37,10 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/retry" ctrl "sigs.k8s.io/controller-runtime" + runtimecache "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" @@ -52,6 +57,9 @@ type Reconciler struct { applyer apply.Applicator discoverPool *DiscoverPool discoverNum prometheus.Gauge + + informer runtimecache.Informer + lister rainbondlistersv1alpha1.ThirdComponentLister } // Reconcile is the main logic of appDeployment controller @@ -82,7 +90,7 @@ func (r *Reconciler) Reconcile(req ctrl.Request) (res reconcile.Result, retErr e return ctrl.Result{}, nil } logrus.Debugf("start to reconcile component %s/%s", component.Namespace, component.Name) - discover, err := NewDiscover(component, r.restConfig) + discover, err := dis.NewDiscover(component, r.restConfig, r.lister) if err != nil { component.Status.Phase = v1alpha1.ComponentFailed component.Status.Reason = err.Error() @@ -283,17 +291,23 @@ func (r *Reconciler) Collect(ch chan<- prometheus.Metric) { // Setup adds a controller that reconciles AppDeployment. func Setup(ctx context.Context, mgr ctrl.Manager) (*Reconciler, error) { - applyer := apply.NewAPIApplicator(mgr.GetClient()) + informer, err := mgr.GetCache().GetInformerForKind(ctx, v1alpha1.SchemeGroupVersion.WithKind("ThirdComponent")) + if err != nil { + return nil, errors.WithMessage(err, "get informer for thirdcomponent") + } + lister := rainbondlistersv1alpha1.NewThirdComponentLister(informer.(cache.SharedIndexInformer).GetIndexer()) + r := &Reconciler{ Client: mgr.GetClient(), restConfig: mgr.GetConfig(), Scheme: mgr.GetScheme(), - applyer: applyer, + applyer: apply.NewAPIApplicator(mgr.GetClient()), discoverNum: prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: "controller", Name: "third_component_discover_number", Help: "Number of running endpoint discover worker of third component.", }), + lister: lister, } dp := NewDiscoverPool(ctx, r) r.discoverPool = dp diff --git a/worker/master/controller/thirdcomponent/discover.go b/worker/master/controller/thirdcomponent/discover/discover.go similarity index 91% rename from worker/master/controller/thirdcomponent/discover.go rename to worker/master/controller/thirdcomponent/discover/discover.go index 80b36b1d1..37cbfc85f 100644 --- a/worker/master/controller/thirdcomponent/discover.go +++ b/worker/master/controller/thirdcomponent/discover/discover.go @@ -16,7 +16,7 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -package thirdcomponent +package discover import ( "context" @@ -24,6 +24,8 @@ import ( "time" "github.com/goodrain/rainbond/pkg/apis/rainbond/v1alpha1" + rainbondlistersv1alpha1 "github.com/goodrain/rainbond/pkg/generated/listers/rainbond/v1alpha1" + "github.com/goodrain/rainbond/util/commonutil" "github.com/sirupsen/logrus" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -38,7 +40,7 @@ type Discover interface { Discover(ctx context.Context, update chan *v1alpha1.ThirdComponent) ([]*v1alpha1.ThirdComponentEndpointStatus, error) } -func NewDiscover(component *v1alpha1.ThirdComponent, restConfig *rest.Config) (Discover, error) { +func NewDiscover(component *v1alpha1.ThirdComponent, restConfig *rest.Config, lister rainbondlistersv1alpha1.ThirdComponentLister) (Discover, error) { if component.Spec.EndpointSource.KubernetesService != nil { clientset, err := kubernetes.NewForConfig(restConfig) if err != nil { @@ -50,6 +52,12 @@ func NewDiscover(component *v1alpha1.ThirdComponent, restConfig *rest.Config) (D client: clientset, }, nil } + if commonutil.BoolValue(component.Spec.EndpointSource.StaticEndpoints) { + return &staticEndpoint{ + component: component, + lister: lister, + }, nil + } return nil, fmt.Errorf("not support source type") } @@ -86,6 +94,7 @@ func (k *kubernetesDiscover) Discover(ctx context.Context, update chan *v1alpha1 case <-ctx.Done(): return nil, nil case <-re.ResultChan(): + logrus.Infof("endpoint(%s/%s) changed", namespace, service.Name) func() { ctx, cancel := context.WithTimeout(ctx, time.Second*10) defer cancel() diff --git a/worker/master/controller/thirdcomponent/discover/staticendpoint.go b/worker/master/controller/thirdcomponent/discover/staticendpoint.go new file mode 100644 index 000000000..af12ab7be --- /dev/null +++ b/worker/master/controller/thirdcomponent/discover/staticendpoint.go @@ -0,0 +1,46 @@ +package discover + +import ( + "context" + + "github.com/goodrain/rainbond/db" + "github.com/goodrain/rainbond/pkg/apis/rainbond/v1alpha1" + rainbondlistersv1alpha1 "github.com/goodrain/rainbond/pkg/generated/listers/rainbond/v1alpha1" + "github.com/pkg/errors" +) + +type staticEndpoint struct { + lister rainbondlistersv1alpha1.ThirdComponentLister + component *v1alpha1.ThirdComponent +} + +func (s *staticEndpoint) GetComponent() *v1alpha1.ThirdComponent { + return s.component +} + +func (s *staticEndpoint) Discover(ctx context.Context, update chan *v1alpha1.ThirdComponent) ([]*v1alpha1.ThirdComponentEndpointStatus, error) { + return nil, nil +} + +func (s *staticEndpoint) DiscoverOne(ctx context.Context) ([]*v1alpha1.ThirdComponentEndpointStatus, error) { + // Optimization: reduce the press of database if necessary. + endpoints, err := db.GetManager().EndpointsDao().ListIsOnline(s.component.GetComponentID()) + if err != nil { + return nil, errors.WithMessage(err, "list online static endpoints") + } + + var res []*v1alpha1.ThirdComponentEndpointStatus + for _, ep := range endpoints { + ed := v1alpha1.NewEndpointAddress(ep.IP, ep.Port) + if ed == nil { + continue + } + res = append(res, &v1alpha1.ThirdComponentEndpointStatus{ + ServicePort: ep.Port, + Address: v1alpha1.EndpointAddress(ep.GetAddress()), + Status: v1alpha1.EndpointReady, + }) + } + + return res, nil +} diff --git a/worker/master/controller/thirdcomponent/discover_pool.go b/worker/master/controller/thirdcomponent/discover_pool.go index 33c2a0391..8ada251e4 100644 --- a/worker/master/controller/thirdcomponent/discover_pool.go +++ b/worker/master/controller/thirdcomponent/discover_pool.go @@ -25,6 +25,7 @@ import ( "time" "github.com/goodrain/rainbond/pkg/apis/rainbond/v1alpha1" + dis "github.com/goodrain/rainbond/worker/master/controller/thirdcomponent/discover" "github.com/sirupsen/logrus" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" @@ -86,7 +87,7 @@ func (d *DiscoverPool) Start() { } type Worker struct { - discover Discover + discover dis.Discover cancel context.CancelFunc ctx context.Context updateChan chan *v1alpha1.ThirdComponent @@ -110,7 +111,7 @@ func (w *Worker) Start() { } } -func (w *Worker) UpdateDiscover(discover Discover) { +func (w *Worker) UpdateDiscover(discover dis.Discover) { w.discover = discover } @@ -122,7 +123,7 @@ func (w *Worker) IsStop() bool { return w.stoped } -func (d *DiscoverPool) newWorker(dis Discover) *Worker { +func (d *DiscoverPool) newWorker(dis dis.Discover) *Worker { ctx, cancel := context.WithCancel(d.ctx) return &Worker{ ctx: ctx, @@ -132,7 +133,7 @@ func (d *DiscoverPool) newWorker(dis Discover) *Worker { } } -func (d *DiscoverPool) AddDiscover(dis Discover) { +func (d *DiscoverPool) AddDiscover(dis dis.Discover) { d.lock.Lock() defer d.lock.Unlock() component := dis.GetComponent()