From cc4f3d5c49e6dc416e8bea23efe956d83202f420 Mon Sep 17 00:00:00 2001 From: GLYASAI Date: Thu, 5 Aug 2021 13:35:22 +0800 Subject: [PATCH] delete deprecated thirdcomponent --- cmd/worker/server/server.go | 12 +- worker/appm/appm.go | 73 ---- worker/appm/prober/prober.go | 294 ------------- worker/appm/prober/prober_test.go | 8 - worker/appm/store/store.go | 145 +------ worker/appm/thirdparty/thirdparty.go | 606 --------------------------- worker/discover/manager.go | 6 +- worker/handle/manager.go | 40 +- 8 files changed, 9 insertions(+), 1175 deletions(-) delete mode 100644 worker/appm/appm.go delete mode 100644 worker/appm/prober/prober.go delete mode 100644 worker/appm/prober/prober_test.go delete mode 100644 worker/appm/thirdparty/thirdparty.go diff --git a/cmd/worker/server/server.go b/cmd/worker/server/server.go index b67481150..d77db8988 100644 --- a/cmd/worker/server/server.go +++ b/cmd/worker/server/server.go @@ -32,7 +32,6 @@ import ( "github.com/goodrain/rainbond/pkg/generated/clientset/versioned" etcdutil "github.com/goodrain/rainbond/util/etcd" k8sutil "github.com/goodrain/rainbond/util/k8s" - "github.com/goodrain/rainbond/worker/appm" "github.com/goodrain/rainbond/worker/appm/componentdefinition" "github.com/goodrain/rainbond/worker/appm/controller" "github.com/goodrain/rainbond/worker/appm/store" @@ -98,15 +97,8 @@ func Run(s *option.Worker) error { componentdefinition.NewComponentDefinitionBuilder(s.Config.RBDNamespace) //step 4: create component resource store - startCh := channels.NewRingChannel(1024) updateCh := channels.NewRingChannel(1024) - probeCh := channels.NewRingChannel(1024) - cachestore := store.NewStore(restConfig, clientset, rainbondClient, db.GetManager(), s.Config, startCh, probeCh) - appmController := appm.NewAPPMController(clientset, cachestore, startCh, updateCh, probeCh) - if err := appmController.Start(); err != nil { - logrus.Errorf("error starting appm controller: %v", err) - } - defer appmController.Stop() + cachestore := store.NewStore(restConfig, clientset, rainbondClient, db.GetManager(), s.Config) if err := cachestore.Start(); err != nil { logrus.Error("start kube cache store error", err) return err @@ -128,7 +120,7 @@ func Run(s *option.Worker) error { //step 7 : create discover module garbageCollector := gc.NewGarbageCollector(clientset) - taskManager := discover.NewTaskManager(s.Config, cachestore, controllerManager, garbageCollector, startCh) + taskManager := discover.NewTaskManager(s.Config, cachestore, controllerManager, garbageCollector) if err := taskManager.Start(); err != nil { return err } diff --git a/worker/appm/appm.go b/worker/appm/appm.go deleted file mode 100644 index 3f9525009..000000000 --- a/worker/appm/appm.go +++ /dev/null @@ -1,73 +0,0 @@ -// RAINBOND, Application Management Platform -// Copyright (C) 2014-2017 Goodrain Co., Ltd. - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. For any non-GPL usage of Rainbond, -// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd. -// must be obtained first. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with this program. If not, see . - -package appm - -import ( - "github.com/eapache/channels" - "github.com/goodrain/rainbond/worker/appm/prober" - "github.com/goodrain/rainbond/worker/appm/store" - "github.com/goodrain/rainbond/worker/appm/thirdparty" - "github.com/sirupsen/logrus" - "k8s.io/client-go/kubernetes" -) - -// NewAPPMController creates a new appm controller. -func NewAPPMController(clientset kubernetes.Interface, - store store.Storer, - startCh *channels.RingChannel, - updateCh *channels.RingChannel, - probeCh *channels.RingChannel) *Controller { - c := &Controller{ - store: store, - updateCh: updateCh, - startCh: startCh, - probeCh: probeCh, - stopCh: make(chan struct{}), - } - // create prober first, then thirdparty - c.prober = prober.NewProber(c.store, c.probeCh, c.updateCh) - c.thirdparty = thirdparty.NewThirdPartier(clientset, c.store, c.startCh, c.updateCh, c.stopCh, c.prober) - return c -} - -// Controller describes a new appm controller. -type Controller struct { - store store.Storer - thirdparty thirdparty.ThirdPartier - prober prober.Prober - - startCh *channels.RingChannel - updateCh *channels.RingChannel - probeCh *channels.RingChannel - stopCh chan struct{} -} - -// Start starts appm controller -func (c *Controller) Start() error { - c.thirdparty.Start() - c.prober.Start() - logrus.Debugf("start thirdparty appm manager success") - return nil -} - -// Stop stops appm controller. -func (c *Controller) Stop() { - close(c.stopCh) - c.prober.Stop() -} diff --git a/worker/appm/prober/prober.go b/worker/appm/prober/prober.go deleted file mode 100644 index 7d38eee2d..000000000 --- a/worker/appm/prober/prober.go +++ /dev/null @@ -1,294 +0,0 @@ -// RAINBOND, Application Management Platform -// Copyright (C) 2014-2017 Goodrain Co., Ltd. - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. For any non-GPL usage of Rainbond, -// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd. -// must be obtained first. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with this program. If not, see . - -package prober - -import ( - "context" - "fmt" - "net" - "strings" - "sync" - "time" - - "github.com/eapache/channels" - "github.com/goodrain/rainbond/db" - "github.com/goodrain/rainbond/db/model" - uitlprober "github.com/goodrain/rainbond/util/prober" - v1 "github.com/goodrain/rainbond/util/prober/types/v1" - "github.com/goodrain/rainbond/worker/appm/store" - "github.com/goodrain/rainbond/worker/appm/thirdparty/discovery" - appmv1 "github.com/goodrain/rainbond/worker/appm/types/v1" - "github.com/sirupsen/logrus" - corev1 "k8s.io/api/core/v1" -) - -// Prober is the interface that wraps the required methods to maintain status -// about upstream servers(Endpoints) associated with a third-party service. -type Prober interface { - Start() - Stop() - UpdateProbes(info []*store.ProbeInfo) - StopProbe(uuids []string) - IsUsedProbe(sid string) bool -} - -// NewProber creates a new third-party service prober. -func NewProber(store store.Storer, - probeCh *channels.RingChannel, - updateCh *channels.RingChannel) Prober { - ctx, cancel := context.WithCancel(context.Background()) - return &tpProbe{ - utilprober: uitlprober.NewProber(ctx, cancel), - dbm: db.GetManager(), - store: store, - - updateCh: updateCh, - probeCh: probeCh, - watcher: make(map[string]map[string]uitlprober.Watcher), - ctx: ctx, - cancel: cancel, - } -} - -// third-party service probe -type tpProbe struct { - utilprober uitlprober.Prober - dbm db.Manager - store store.Storer - probeCh *channels.RingChannel - updateCh *channels.RingChannel - ctx context.Context - cancel context.CancelFunc - watcher map[string]map[string]uitlprober.Watcher - lock sync.Mutex -} - -func createService(probe *model.TenantServiceProbe) *v1.Service { - return &v1.Service{ - Disable: probe.IsUsed == nil || *probe.IsUsed != 1, - ServiceHealth: &v1.Health{ - Model: probe.Scheme, - TimeInterval: probe.PeriodSecond, - MaxErrorsNum: probe.FailureThreshold, - MaxTimeoutSecond: probe.TimeoutSecond, - }, - } -} - -func (t *tpProbe) Start() { - t.utilprober.Start() - - go func() { - for { - select { - case event := <-t.probeCh.Out(): - if event == nil { - return - } - evt := event.(store.Event) - switch evt.Type { - case store.CreateEvent: - infos := evt.Obj.([]*store.ProbeInfo) - t.UpdateProbes(infos) - case store.UpdateEvent: - infos := evt.Obj.([]*store.ProbeInfo) - t.UpdateProbes(infos) - case store.DeleteEvent: - uuids := evt.Obj.([]string) - t.StopProbe(uuids) - } - case <-t.ctx.Done(): - return - } - } - }() -} - -// Stop stops prober. -func (t *tpProbe) Stop() { - t.cancel() -} - -func (t *tpProbe) UpdateProbes(infos []*store.ProbeInfo) { - t.lock.Lock() - defer t.lock.Unlock() - var services []*v1.Service - for _, info := range infos { - service, probeInfo := t.createServices(info) - if service == nil { - t.utilprober.StopProbes([]string{info.UUID}) - continue - } - services = append(services, service) - // watch - if swatchers, exist := t.watcher[service.Sid]; exist && swatchers != nil { - if watcher, exist := swatchers[service.Name]; exist && watcher != nil { - continue - } - } else { - t.watcher[service.Sid] = make(map[string]uitlprober.Watcher) - } - logrus.Infof("create probe[sid: %s, address: %s, port: %d]", service.Sid, service.ServiceHealth.Address, service.ServiceHealth.Port) - watcher := t.utilprober.WatchServiceHealthy(service.Name) - t.utilprober.EnableWatcher(watcher.GetServiceName(), watcher.GetID()) - t.watcher[service.Sid][service.Name] = watcher - go func(watcher uitlprober.Watcher, info *store.ProbeInfo) { - defer watcher.Close() - defer t.utilprober.DisableWatcher(watcher.GetServiceName(), watcher.GetID()) - defer delete(t.watcher[service.Sid], service.Name) - for { - select { - case event, ok := <-watcher.Watch(): - if !ok { - return - } - if event == nil { - logrus.Errorf("get nil event from prober status chan, will retry") - time.Sleep(time.Second * 3) - } - switch event.Status { - case v1.StatHealthy: - obj := &appmv1.RbdEndpoint{ - UUID: info.UUID, - IP: info.IP, - Port: int(info.Port), - Sid: info.Sid, - } - t.updateCh.In() <- discovery.Event{ - Type: discovery.HealthEvent, - Obj: obj, - } - case v1.StatDeath, v1.StatUnhealthy: - if event.ErrorNumber > service.ServiceHealth.MaxErrorsNum { - if probeInfo.Mode == model.OfflineFailureAction.String() { - obj := &appmv1.RbdEndpoint{ - UUID: info.UUID, - IP: info.IP, - Port: int(info.Port), - Sid: info.Sid, - } - t.updateCh.In() <- discovery.Event{ - Type: discovery.UnhealthyEvent, - Obj: obj, - } - } - } - } - case <-t.ctx.Done(): - // TODO: should stop for one service, not all services. - logrus.Infof("third app %s probe watcher exist", service.Name) - return - } - } - }(watcher, info) - } - //Method internally to determine if the configuration has changed - //remove old address probe - t.utilprober.UpdateServicesProbe(services) -} - -func (t *tpProbe) StopProbe(uuids []string) { - for _, name := range uuids { - t.utilprober.StopProbes([]string{name}) - } -} - -// GetProbeInfo returns probe info associated with sid. -// If there is a probe in the database, return directly -// If there is no probe in the database, return a default probe -func (t *tpProbe) GetProbeInfo(sid string) (*model.TenantServiceProbe, error) { - probes, err := t.dbm.ServiceProbeDao().GetServiceProbes(sid) - if err != nil || probes == nil || len(probes) == 0 || *(probes[0].IsUsed) == 0 { - if err != nil { - logrus.Warningf("ServiceID: %s; error getting probes: %v", sid, err) - } - return nil, nil - } - return probes[0], nil -} - -func (t *tpProbe) IsUsedProbe(sid string) bool { - if p, _ := t.GetProbeInfo(sid); p != nil { - return true - } - return false -} - -func (t *tpProbe) createServices(probeInfo *store.ProbeInfo) (*v1.Service, *model.TenantServiceProbe) { - if probeInfo.IP == "1.1.1.1" { - app := t.store.GetAppService(probeInfo.Sid) - if len(app.GetServices(true)) >= 1 { - appService := app.GetServices(true)[0] - if appService.Annotations != nil && appService.Annotations["domain"] != "" { - probeInfo.IP = appService.Annotations["domain"] - logrus.Debugf("domain address is : %s", probeInfo.IP) - } - } - if probeInfo.IP == "1.1.1.1" { - logrus.Warningf("serviceID: %s, is a domain thirdpart endpoint, but do not found domain info", probeInfo.Sid) - return nil, nil - } - } - tsp, err := t.GetProbeInfo(probeInfo.Sid) - if err != nil { - logrus.Warningf("ServiceID: %s; Unexpected error occurred, ignore the creation of "+ - "probes: %s", probeInfo.Sid, err.Error()) - return nil, nil - } - if tsp == nil { - return nil, nil - } - if tsp.Mode == "liveness" { - tsp.Mode = model.IgnoreFailureAction.String() - } - service := createService(tsp) - service.Sid = probeInfo.Sid - service.Name = probeInfo.UUID - service.ServiceHealth.Port = int(probeInfo.Port) - service.ServiceHealth.Name = service.Name - address := fmt.Sprintf("%s:%d", probeInfo.IP, probeInfo.Port) - if service.ServiceHealth.Model == "tcp" { - address = parseTCPHostAddress(probeInfo.IP, probeInfo.Port) - } - service.ServiceHealth.Address = address - return service, tsp -} - -func (t *tpProbe) createServiceNames(ep *corev1.Endpoints) string { - return ep.GetLabels()["uuid"] -} - -func parseTCPHostAddress(address string, port int32) string { - if strings.HasPrefix(address, "https://") { - address = strings.Split(address, "https://")[1] - } - if strings.HasPrefix(address, "http://") { - address = strings.Split(address, "http://")[1] - } - if strings.Contains(address, ":") { - address = strings.Split(address, ":")[0] - } - ns, err := net.LookupHost(address) - if err != nil || len(ns) == 0 { - return address - } - address = ns[0] - address = fmt.Sprintf("%s:%d", address, port) - return address -} diff --git a/worker/appm/prober/prober_test.go b/worker/appm/prober/prober_test.go deleted file mode 100644 index 26212a14b..000000000 --- a/worker/appm/prober/prober_test.go +++ /dev/null @@ -1,8 +0,0 @@ -package prober - -import "testing" - -func TestParseTCPHostAddress(t *testing.T) { - re := parseTCPHostAddress("rm-2ze0xlsi14xz6q6sz.mysql.rds.aliyuncs.com", 3306) - t.Log(re) -} diff --git a/worker/appm/store/store.go b/worker/appm/store/store.go index 5975f4c90..7f1e71371 100644 --- a/worker/appm/store/store.go +++ b/worker/appm/store/store.go @@ -25,7 +25,6 @@ import ( "sync" "time" - "github.com/eapache/channels" "github.com/goodrain/rainbond/api/util/bcode" "github.com/goodrain/rainbond/cmd/worker/option" "github.com/goodrain/rainbond/db" @@ -37,7 +36,6 @@ import ( k8sutil "github.com/goodrain/rainbond/util/k8s" "github.com/goodrain/rainbond/worker/appm/componentdefinition" "github.com/goodrain/rainbond/worker/appm/conversion" - "github.com/goodrain/rainbond/worker/appm/f" v1 "github.com/goodrain/rainbond/worker/appm/types/v1" "github.com/goodrain/rainbond/worker/server/pb" workerutil "github.com/goodrain/rainbond/worker/util" @@ -92,7 +90,6 @@ type Storer interface { UnRegistPodUpdateListener(string) RegisterVolumeTypeListener(string, chan<- *model.TenantServiceVolumeType) UnRegisterVolumeTypeListener(string) - InitOneThirdPartService(service *model.TenantServices) error GetCrds() ([]*apiextensions.CustomResourceDefinition, error) GetCrd(name string) (*apiextensions.CustomResourceDefinition, error) GetServiceMonitorClient() (*versioned.Clientset, error) @@ -125,14 +122,6 @@ type Event struct { Obj interface{} } -// ProbeInfo holds the context of a probe. -type ProbeInfo struct { - Sid string `json:"sid"` - UUID string `json:"uuid"` - IP string `json:"ip"` - Port int32 `json:"port"` -} - //appRuntimeStore app runtime store //cache all kubernetes object and appservice type appRuntimeStore struct { @@ -150,7 +139,6 @@ type appRuntimeStore struct { appCount int32 dbmanager db.Manager conf option.Config - startCh *channels.RingChannel stopch chan struct{} podUpdateListeners map[string]chan<- *corev1.Pod podUpdateListenerLock sync.Mutex @@ -165,9 +153,7 @@ func NewStore( clientset kubernetes.Interface, rainbondClient rainbondversioned.Interface, dbmanager db.Manager, - conf option.Config, - startCh *channels.RingChannel, - probeCh *channels.RingChannel) Storer { + conf option.Config) Storer { ctx, cancel := context.WithCancel(context.Background()) store := &appRuntimeStore{ kubeconfig: kubeconfig, @@ -181,7 +167,6 @@ func NewStore( conf: conf, dbmanager: dbmanager, crClients: make(map[string]interface{}), - startCh: startCh, resourceCache: NewResourceCache(), podUpdateListeners: make(map[string]chan<- *corev1.Pod, 1), volumeTypeListeners: make(map[string]chan<- *model.TenantServiceVolumeType, 1), @@ -255,9 +240,6 @@ func NewStore( store.informers.ComponentDefinition = rainbondInformer.Rainbond().V1alpha1().ComponentDefinitions().Informer() store.informers.ComponentDefinition.AddEventHandlerWithResyncPeriod(componentdefinition.GetComponentDefinitionBuilder(), time.Second*300) - isThirdParty := func(ep *corev1.Endpoints) bool { - return ep.Labels["service-kind"] == model.ServiceKindThirdParty.String() - } // Endpoint Event Handler epEventHandler := cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { @@ -273,15 +255,6 @@ func NewStore( } if appservice != nil { appservice.AddEndpoints(ep) - if isThirdParty(ep) && ep.Subsets != nil && len(ep.Subsets) > 0 { - logrus.Debugf("received add endpoints: %+v", ep) - probeInfos := listProbeInfos(ep, serviceID) - probeCh.In() <- Event{ - Type: CreateEvent, - Obj: probeInfos, - } - } - return } } }, @@ -298,17 +271,6 @@ func NewStore( logrus.Debugf("ServiceID: %s; Action: DeleteFunc;service is closed", serviceID) store.DeleteAppService(appservice) } - if isThirdParty(ep) { - logrus.Debugf("received delete endpoints: %+v", ep) - var uuids []string - for _, item := range ep.Subsets { - uuids = append(uuids, item.Ports[0].Name) - } - probeCh.In() <- Event{ - Type: DeleteEvent, - Obj: uuids, - } - } } } }, @@ -325,13 +287,6 @@ func NewStore( } if appservice != nil { appservice.AddEndpoints(cep) - if isThirdParty(cep) { - curInfos := listProbeInfos(cep, serviceID) - probeCh.In() <- Event{ - Type: UpdateEvent, - Obj: curInfos, - } - } } } }, @@ -365,51 +320,6 @@ func (a *appRuntimeStore) Lister() *Lister { return a.listers } -func listProbeInfos(ep *corev1.Endpoints, sid string) []*ProbeInfo { - var probeInfos []*ProbeInfo - addProbe := func(pi *ProbeInfo) { - for _, c := range probeInfos { - if c.IP == pi.IP && c.Port == pi.Port { - return - } - } - probeInfos = append(probeInfos, pi) - } - for _, subset := range ep.Subsets { - for _, port := range subset.Ports { - if ep.Annotations != nil { - if domain, ok := ep.Annotations["domain"]; ok && domain != "" { - logrus.Debugf("thirdpart service[sid: %s] add domain endpoint[domain: %s] probe", sid, domain) - probeInfos = []*ProbeInfo{{ - Sid: sid, - UUID: fmt.Sprintf("%s_%d", domain, port.Port), - IP: domain, - Port: port.Port, - }} - return probeInfos - } - } - for _, address := range subset.NotReadyAddresses { - addProbe(&ProbeInfo{ - Sid: sid, - UUID: fmt.Sprintf("%s_%d", address.IP, port.Port), - IP: address.IP, - Port: port.Port, - }) - } - for _, address := range subset.Addresses { - addProbe(&ProbeInfo{ - Sid: sid, - UUID: fmt.Sprintf("%s_%d", address.IP, port.Port), - IP: address.IP, - Port: port.Port, - }) - } - } - } - return probeInfos -} - func (a *appRuntimeStore) init() error { //init leader namespace leaderNamespace := a.conf.LeaderElectionNamespace @@ -442,64 +352,11 @@ func (a *appRuntimeStore) Start() error { // init core componentdefinition componentdefinition.GetComponentDefinitionBuilder().InitCoreComponentDefinition(a.rainbondClient) go func() { - a.initThirdPartyService() a.initCustomResourceInformer(stopch) }() return nil } -func (a *appRuntimeStore) initThirdPartyService() error { - logrus.Debugf("begin initializing third-party services.") - // TODO: list third party services that have open ports directly. - svcs, err := a.dbmanager.TenantServiceDao().ListThirdPartyServices() - if err != nil { - logrus.Errorf("error listing third-party services: %v", err) - return err - } - for _, svc := range svcs { - disCfg, _ := a.dbmanager.ThirdPartySvcDiscoveryCfgDao().GetByServiceID(svc.ServiceID) - if disCfg != nil && disCfg.Type == "kubernetes" { - continue - } - if err = a.InitOneThirdPartService(svc); err != nil { - logrus.Errorf("init thridpart service error: %v", err) - return err - } - - a.startCh.In() <- &v1.Event{ - Type: v1.StartEvent, // TODO: no need to distinguish between event types. - Sid: svc.ServiceID, - } - } - logrus.Infof("initializing third-party services success") - return nil -} - -// InitOneThirdPartService init one thridpart service -func (a *appRuntimeStore) InitOneThirdPartService(service *model.TenantServices) error { - // ignore service without open port. - if !a.dbmanager.TenantServicesPortDao().HasOpenPort(service.ServiceID) { - return nil - } - - appService, err := conversion.InitCacheAppService(a.dbmanager, service.ServiceID, "Rainbond") - if err != nil { - logrus.Errorf("error initializing cache app service: %v", err) - return err - } - if appService.IsCustomComponent() { - return nil - } - a.RegistAppService(appService) - err = f.ApplyOne(context.Background(), nil, a.clientset, appService) - if err != nil { - logrus.Errorf("error applying rule: %v", err) - return err - } - logrus.Infof("init third app %s kubernetes resource", appService.ServiceAlias) - return nil -} - //Ready if all kube informers is syncd, store is ready func (a *appRuntimeStore) Ready() bool { return a.informers.Ready() diff --git a/worker/appm/thirdparty/thirdparty.go b/worker/appm/thirdparty/thirdparty.go deleted file mode 100644 index 61c540508..000000000 --- a/worker/appm/thirdparty/thirdparty.go +++ /dev/null @@ -1,606 +0,0 @@ -// RAINBOND, Application Management Platform -// Copyright (C) 2014-2017 Goodrain Co., Ltd. - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. For any non-GPL usage of Rainbond, -// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd. -// must be obtained first. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with this program. If not, see . - -package thirdparty - -import ( - "context" - "fmt" - - "github.com/eapache/channels" - "github.com/goodrain/rainbond/db" - "github.com/goodrain/rainbond/db/model" - validation "github.com/goodrain/rainbond/util/endpoint" - "github.com/goodrain/rainbond/worker/appm/f" - "github.com/goodrain/rainbond/worker/appm/prober" - "github.com/goodrain/rainbond/worker/appm/store" - "github.com/goodrain/rainbond/worker/appm/thirdparty/discovery" - v1 "github.com/goodrain/rainbond/worker/appm/types/v1" - "github.com/sirupsen/logrus" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes" -) - -// ThirdPartier is the interface that wraps the required methods to update status -// about upstream servers(Endpoints) associated with a third-party service. -type ThirdPartier interface { - Start() -} - -// NewThirdPartier creates a new ThirdPartier. -func NewThirdPartier(clientset kubernetes.Interface, - store store.Storer, - startCh *channels.RingChannel, - updateCh *channels.RingChannel, - stopCh chan struct{}, - prober prober.Prober) ThirdPartier { - t := &thirdparty{ - clientset: clientset, - store: store, - svcStopCh: make(map[string]chan struct{}), - startCh: startCh, - updateCh: updateCh, - stopCh: stopCh, - prober: prober, - } - return t -} - -type thirdparty struct { - clientset kubernetes.Interface - store store.Storer - prober prober.Prober - // a collection of stop channel for every service. - svcStopCh map[string]chan struct{} - - startCh *channels.RingChannel - updateCh *channels.RingChannel - stopCh chan struct{} -} - -// Start starts receiving event that update k8s endpoints status from start channel(startCh). -func (t *thirdparty) Start() { - go func() { - for { - select { - case event := <-t.updateCh.Out(): - devent, ok := event.(discovery.Event) - if !ok { - logrus.Warningf("Unexpected event received %+v", event) - continue - } - t.runUpdate(devent) - case <-t.stopCh: - for _, stopCh := range t.svcStopCh { - close(stopCh) - } - return - } - } - }() - go func() { - for { - select { - case event := <-t.startCh.Out(): - evt, ok := event.(*v1.Event) - if !ok { - logrus.Warningf("Unexpected event received %+v", event) - continue - } - logrus.Debugf("Received event: %+v", evt) - if evt.Type == v1.StartEvent { // no need to distinguish between event types - needWatch := false - stopCh := t.svcStopCh[evt.Sid] - if stopCh == nil { - logrus.Debugf("ServiceID: %s; already started.", evt.Sid) - needWatch = true - t.svcStopCh[evt.Sid] = make(chan struct{}) - } - go t.runStart(evt.Sid, needWatch) - } - if evt.Type == v1.StopEvent { - stopCh := t.svcStopCh[evt.Sid] - if stopCh == nil { - logrus.Warningf("ServiceID: %s; The third-party service has not started yet, cant't be stoped", evt.Sid) - continue - } - t.runDelete(evt.Sid) - close(stopCh) - delete(t.svcStopCh, evt.Sid) - } - case <-t.stopCh: - for _, stopCh := range t.svcStopCh { - close(stopCh) - } - return - } - } - }() -} - -func (t *thirdparty) runStart(sid string, needWatch bool) { - as := t.store.GetAppService(sid) - if as == nil { - logrus.Warnf("get app service from store failure, sid=%s", sid) - return - } - var err error - for i := 3; i > 0; i-- { - rbdeps, ir := t.ListRbdEndpoints(sid) - if rbdeps == nil || len(rbdeps) == 0 { - logrus.Warningf("ServiceID: %s;Empty rbd endpoints, stop starting third-party service.", sid) - continue - } - var eps []*corev1.Endpoints - eps, err = t.k8sEndpoints(as, rbdeps) - if err != nil { - logrus.Warningf("ServiceID: %s; error creating k8s endpoints: %s", sid, err.Error()) - continue - } - for _, ep := range eps { - if err := f.EnsureEndpoints(ep, t.clientset); err != nil { - logrus.Errorf("create or update endpoint %s failure %s", ep.Name, err.Error()) - } - } - - for _, service := range as.GetServices(true) { - if err := f.EnsureService(service, t.clientset); err != nil { - logrus.Errorf("create or update service %s failure %s", service.Name, err.Error()) - } - } - - if needWatch && ir != nil { - ir.Watch() - } - logrus.Infof("ServiceID: %s; successfully running start task", sid) - return - } - logrus.Errorf("ServiceID: %s; error running start task: %v", sid, err) -} - -// ListRbdEndpoints lists all rbd endpoints, include static and dynamic. -func (t *thirdparty) ListRbdEndpoints(sid string) ([]*v1.RbdEndpoint, Interacter) { - var res []*v1.RbdEndpoint - // static - s := NewStaticInteracter(sid) - slist, err := s.List() - if err != nil { - logrus.Warningf("ServiceID: %s;error listing static rbd endpoints: %v", sid, err) - } - if slist != nil && len(slist) > 0 { - res = append(res, slist...) - } - d := NewDynamicInteracter(sid, t.updateCh, t.stopCh) - if d != nil { - dlist, err := d.List() - if err != nil { - logrus.Warningf("ServiceID: %s;error listing dynamic rbd endpoints: %v", sid, err) - } - if dlist != nil && len(dlist) > 0 { - res = append(res, dlist...) - } - } - return res, d -} - -func deleteSubset(as *v1.AppService, rbdep *v1.RbdEndpoint) { - eps := as.GetEndpoints(true) - for _, ep := range eps { - for idx, item := range ep.Subsets { - if item.Ports[0].Name == rbdep.UUID { - logrus.Debugf("UUID: %s; subset deleted", rbdep.UUID) - ep.Subsets[idx] = ep.Subsets[len(ep.Subsets)-1] - ep.Subsets = ep.Subsets[:len(ep.Subsets)-1] - } - isDomain := false - for _, addr := range item.Addresses { - if addr.IP == "1.1.1.1" { - isDomain = true - } - } - for _, addr := range item.NotReadyAddresses { - if addr.IP == "1.1.1.1" { - isDomain = true - } - } - if isDomain { - for _, service := range as.GetServices(true) { - if service.Annotations != nil { - if rbdep.IP == service.Annotations["domain"] { - delete(service.Annotations, "domain") - } - } - } - } - } - } -} - -func (t *thirdparty) k8sEndpoints(as *v1.AppService, epinfo []*v1.RbdEndpoint) ([]*corev1.Endpoints, error) { - ports, err := db.GetManager().TenantServicesPortDao().GetPortsByServiceID(as.ServiceID) - if err != nil { - return nil, err - } - // third-party service can only have one port - if len(ports) == 0 { - return nil, fmt.Errorf("port not found") - } - p := ports[0] - - var res []*corev1.Endpoints - if *p.IsInnerService { - ep := &corev1.Endpoints{} - ep.Namespace = as.TenantID - // inner or outer - if *p.IsInnerService { - ep.Name = fmt.Sprintf("service-%d-%d", p.ID, p.ContainerPort) - if p.K8sServiceName != "" { - ep.Name = p.K8sServiceName - } - ep.Labels = as.GetCommonLabels(map[string]string{ - "name": as.ServiceAlias + "Service", - "service-kind": model.ServiceKindThirdParty.String(), - }) - } - res = append(res, ep) - } - if *p.IsOuterService { - ep := &corev1.Endpoints{} - ep.Namespace = as.TenantID - // inner or outer - if *p.IsOuterService { - ep.Name = fmt.Sprintf("service-%d-%dout", p.ID, p.ContainerPort) - ep.Labels = as.GetCommonLabels(map[string]string{ - "name": as.ServiceAlias + "ServiceOUT", - "service-kind": model.ServiceKindThirdParty.String(), - }) - } - res = append(res, ep) - } - - var subsets []corev1.EndpointSubset - for _, epi := range epinfo { - logrus.Debugf("make endpoints[address: %s] subset", epi.IP) - subset := corev1.EndpointSubset{ - Ports: []corev1.EndpointPort{ - { - Name: epi.UUID, - Port: func(targetPort int, realPort int) int32 { - if realPort == 0 { - return int32(targetPort) - } - return int32(realPort) - }(p.ContainerPort, epi.Port), - Protocol: corev1.ProtocolTCP, - }, - }, - } - eaddressIP := epi.IP - address := validation.SplitEndpointAddress(epi.IP) - if validation.IsDomainNotIP(address) { - if len(as.GetServices(false)) > 0 { - annotations := as.GetServices(false)[0].Annotations - if annotations == nil { - annotations = make(map[string]string) - } - annotations["domain"] = epi.IP - as.GetServices(false)[0].Annotations = annotations - } - eaddressIP = "1.1.1.1" - } - eaddress := []corev1.EndpointAddress{ - { - IP: eaddressIP, - }, - } - useProbe := t.prober.IsUsedProbe(as.ServiceID) - if useProbe { - subset.NotReadyAddresses = eaddress - } else { - subset.Addresses = eaddress - } - subsets = append(subsets, subset) - } - //all endpoint for one third app is same - for _, item := range res { - item.Subsets = subsets - } - return res, nil -} - -func (t *thirdparty) createSubsetForAllEndpoint(as *v1.AppService, rbdep *v1.RbdEndpoint) error { - port, err := db.GetManager().TenantServicesPortDao().GetPortsByServiceID(as.ServiceID) - if err != nil { - return err - } - // third-party service can only have one port - if port == nil || len(port) == 0 { - return fmt.Errorf("Port not found") - } - ipAddress := rbdep.IP - address := validation.SplitEndpointAddress(rbdep.IP) - if validation.IsDomainNotIP(address) { - //domain endpoint set ip is 1.1.1.1 - ipAddress = "1.1.1.1" - if len(as.GetServices(false)) > 0 { - annotations := as.GetServices(false)[0].Annotations - if annotations == nil { - annotations = make(map[string]string) - } - annotations["domain"] = rbdep.IP - as.GetServices(false)[0].Annotations = annotations - } - } - - subset := corev1.EndpointSubset{ - Ports: []corev1.EndpointPort{ - { - Name: rbdep.UUID, - Port: func() int32 { - //if endpoint have port, will ues this port - //or use service port - if rbdep.Port != 0 { - return int32(rbdep.Port) - } - return int32(port[0].ContainerPort) - }(), - Protocol: corev1.ProtocolTCP, - }, - }, - } - eaddress := []corev1.EndpointAddress{ - { - IP: ipAddress, - }, - } - useProbe := t.prober.IsUsedProbe(as.ServiceID) - if useProbe { - subset.NotReadyAddresses = eaddress - } else { - subset.Addresses = eaddress - } - - for _, ep := range as.GetEndpoints(true) { - existPort := false - existAddress := false - for i, item := range ep.Subsets { - for _, port := range item.Ports { - if port.Port == int32(subset.Ports[0].Port) && len(item.Ports) < 2 { - for _, a := range item.Addresses { - if a.IP == ipAddress { - existAddress = true - break - } - } - for _, a := range item.NotReadyAddresses { - if a.IP == ipAddress { - existAddress = true - break - } - } - if !existAddress { - if useProbe { - ep.Subsets[i].NotReadyAddresses = append(ep.Subsets[i].NotReadyAddresses, subset.NotReadyAddresses...) - } else { - ep.Subsets[i].Addresses = append(ep.Subsets[i].NotReadyAddresses, subset.Addresses...) - } - } - existPort = true - } - } - } - if !existPort { - ep.Subsets = append(ep.Subsets, subset) - } - if err := f.EnsureEndpoints(ep, t.clientset); err != nil { - logrus.Errorf("update endpoint %s failure %s", ep.Name, err.Error()) - } - } - return nil -} - -func (t *thirdparty) runUpdate(event discovery.Event) { - - updateAddress := func(as *v1.AppService, rbdep *v1.RbdEndpoint, ready bool) { - ad := validation.SplitEndpointAddress(rbdep.IP) - for _, ep := range as.GetEndpoints(true) { - var needUpdate bool - for idx, subset := range ep.Subsets { - for _, port := range subset.Ports { - address := subset.Addresses - if ready { - address = subset.NotReadyAddresses - } - for i, addr := range address { - ipequal := fmt.Sprintf("%s_%d", addr.IP, port.Port) == fmt.Sprintf("%s_%d", rbdep.IP, rbdep.Port) - if (addr.IP == "1.1.1.1" && validation.IsDomainNotIP(ad)) || ipequal { - if validation.IsDomainNotIP(ad) { - rbdep.IP = "1.1.1.1" - } - ep.Subsets[idx] = updateSubsetAddress(ready, subset, address[i]) - needUpdate = true - break - } - } - logrus.Debugf("not found need update address by %s", fmt.Sprintf("%s_%d", rbdep.IP, rbdep.Port)) - } - } - if needUpdate { - if err := f.EnsureEndpoints(ep, t.clientset); err != nil { - logrus.Errorf("update endpoint %s failure %s", ep.Name, err.Error()) - } - } - } - } - // do not have multiple ports, multiple addresses - removeAddress := func(as *v1.AppService, rbdep *v1.RbdEndpoint) { - - ad := validation.SplitEndpointAddress(rbdep.IP) - for _, ep := range as.GetEndpoints(true) { - var needUpdate bool - var newSubsets []corev1.EndpointSubset - for idx, subset := range ep.Subsets { - var handleSubset bool - for i, port := range subset.Ports { - address := append(subset.Addresses, subset.NotReadyAddresses...) - for j, addr := range address { - ipequal := fmt.Sprintf("%s_%d", addr.IP, port.Port) == fmt.Sprintf("%s_%d", rbdep.IP, rbdep.Port) - if (addr.IP == "1.1.1.1" && validation.IsDomainNotIP(ad)) || ipequal { - //multiple port remove port, Instead remove the address - if len(subset.Ports) > 1 { - subset.Ports = append(subset.Ports[:i], subset.Ports[:i]...) - newSubsets = append(newSubsets, subset) - } else { - if validation.IsDomainNotIP(ad) { - rbdep.IP = "1.1.1.1" - } - newsub := removeSubsetAddress(ep.Subsets[idx], address[j]) - if len(newsub.Addresses) != 0 || len(newsub.NotReadyAddresses) != 0 { - newSubsets = append(newSubsets, newsub) - } - } - needUpdate = true - handleSubset = true - break - } - } - } - if !handleSubset { - newSubsets = append(newSubsets, subset) - } - } - ep.Subsets = newSubsets - if needUpdate { - if err := f.EnsureEndpoints(ep, t.clientset); err != nil { - logrus.Errorf("update endpoint %s failure %s", ep.Name, err.Error()) - } - } - } - } - - rbdep := event.Obj.(*v1.RbdEndpoint) - if rbdep == nil { - logrus.Warning("update event obj transfer to *v1.RbdEndpoint failure") - return - } - as := t.store.GetAppService(rbdep.Sid) - if as == nil { - logrus.Warnf("get app service from store failure, sid=%s", rbdep.Sid) - return - } - //rbdep.IP may be set "1.1.1.1" if it is domain - //so cache doamin address for show after handle complete - showEndpointIP := rbdep.IP - switch event.Type { - case discovery.UpdateEvent, discovery.CreateEvent: - err := t.createSubsetForAllEndpoint(as, rbdep) - if err != nil { - logrus.Warningf("ServiceID: %s; error adding subset: %s", - rbdep.Sid, err.Error()) - return - } - for _, service := range as.GetServices(true) { - if err := f.EnsureService(service, t.clientset); err != nil { - logrus.Errorf("create or update service %s failure %s", service.Name, err.Error()) - } - } - logrus.Debugf("upgrade endpoints and service for third app %s", as.ServiceAlias) - case discovery.DeleteEvent: - removeAddress(as, rbdep) - logrus.Debugf("third endpoint %s ip %s is deleted", rbdep.UUID, showEndpointIP) - case discovery.HealthEvent: - updateAddress(as, rbdep, true) - logrus.Debugf("third endpoint %s ip %s is onlined", rbdep.UUID, showEndpointIP) - case discovery.UnhealthyEvent: - logrus.Debugf("third endpoint %s ip %s is offlined", rbdep.UUID, showEndpointIP) - updateAddress(as, rbdep, false) - } -} - -func (t *thirdparty) runDelete(sid string) { - as := t.store.GetAppService(sid) // TODO: need to delete? - if eps := as.GetEndpoints(true); eps != nil { - for _, ep := range eps { - logrus.Debugf("Endpoints delete: %+v", ep) - err := t.clientset.CoreV1().Endpoints(as.TenantID).Delete(context.Background(), ep.Name, metav1.DeleteOptions{}) - if err != nil && !errors.IsNotFound(err) { - logrus.Warningf("error deleting endpoint empty old app endpoints: %v", err) - } - } - } -} - -func updateSubsetAddress(ready bool, subset corev1.EndpointSubset, address corev1.EndpointAddress) corev1.EndpointSubset { - if ready { - for i, a := range subset.NotReadyAddresses { - if a.IP == address.IP { - subset.NotReadyAddresses = append(subset.NotReadyAddresses[:i], subset.NotReadyAddresses[i+1:]...) - } - } - var exist bool - for _, a := range subset.Addresses { - if a.IP == address.IP { - exist = true - break - } - } - if !exist { - subset.Addresses = append(subset.Addresses, address) - } - } else { - for i, a := range subset.Addresses { - if a.IP == address.IP { - subset.Addresses = append(subset.Addresses[:i], subset.Addresses[i+1:]...) - } - } - var exist bool - for _, a := range subset.NotReadyAddresses { - if a.IP == address.IP { - exist = true - break - } - } - if !exist { - subset.NotReadyAddresses = append(subset.NotReadyAddresses, address) - } - } - return subset -} - -func removeSubsetAddress(subset corev1.EndpointSubset, address corev1.EndpointAddress) corev1.EndpointSubset { - for i, a := range subset.Addresses { - if a.IP == address.IP { - subset.Addresses = append(subset.Addresses[:i], subset.Addresses[i+1:]...) - } - } - for i, a := range subset.NotReadyAddresses { - if a.IP == address.IP { - subset.NotReadyAddresses = append(subset.NotReadyAddresses[:i], subset.NotReadyAddresses[i+1:]...) - } - } - return subset -} - -func isHealthy(subset corev1.EndpointSubset) bool { - if subset.Addresses != nil && len(subset.Addresses) > 0 { - return true - } - return false -} diff --git a/worker/discover/manager.go b/worker/discover/manager.go index 35640f0ff..a3f93e136 100644 --- a/worker/discover/manager.go +++ b/worker/discover/manager.go @@ -24,7 +24,6 @@ import ( "os" "time" - "github.com/eapache/channels" "github.com/goodrain/rainbond/cmd/worker/option" "github.com/goodrain/rainbond/mq/api/grpc/pb" "github.com/goodrain/rainbond/mq/client" @@ -59,11 +58,10 @@ type TaskManager struct { func NewTaskManager(cfg option.Config, store store.Storer, controllermanager *controller.Manager, - garbageCollector *gc.GarbageCollector, - startCh *channels.RingChannel) *TaskManager { + garbageCollector *gc.GarbageCollector) *TaskManager { ctx, cancel := context.WithCancel(context.Background()) - handleManager := handle.NewManager(ctx, cfg, store, controllermanager, garbageCollector, startCh) + handleManager := handle.NewManager(ctx, cfg, store, controllermanager, garbageCollector) healthStatus["status"] = "health" healthStatus["info"] = "worker service health" return &TaskManager{ diff --git a/worker/handle/manager.go b/worker/handle/manager.go index 1df8db36d..615e372dd 100644 --- a/worker/handle/manager.go +++ b/worker/handle/manager.go @@ -25,11 +25,6 @@ import ( "strings" "time" - "github.com/eapache/channels" - "github.com/sirupsen/logrus" - k8sErrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "github.com/goodrain/rainbond/cmd/worker/option" "github.com/goodrain/rainbond/db" dbmodel "github.com/goodrain/rainbond/db/model" @@ -41,6 +36,9 @@ import ( v1 "github.com/goodrain/rainbond/worker/appm/types/v1" "github.com/goodrain/rainbond/worker/discover/model" "github.com/goodrain/rainbond/worker/gc" + "github.com/sirupsen/logrus" + k8sErrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) //Manager manager @@ -51,8 +49,6 @@ type Manager struct { dbmanager db.Manager controllerManager *controller.Manager garbageCollector *gc.GarbageCollector - - startCh *channels.RingChannel } //NewManager now handle @@ -60,8 +56,7 @@ func NewManager(ctx context.Context, config option.Config, store store.Storer, controllerManager *controller.Manager, - garbageCollector *gc.GarbageCollector, - startCh *channels.RingChannel) *Manager { + garbageCollector *gc.GarbageCollector) *Manager { return &Manager{ ctx: ctx, @@ -70,7 +65,6 @@ func NewManager(ctx context.Context, store: store, controllerManager: controllerManager, garbageCollector: garbageCollector, - startCh: startCh, } } @@ -426,32 +420,6 @@ func (m *Manager) applyRuleExec(task *model.Task) error { return fmt.Errorf("component apply rule controller failure:%s", err.Error()) } - if svc.Kind == dbmodel.ServiceKindThirdParty.String() && strings.HasPrefix(body.Action, "port") { - if oldAppService == nil { - m.store.RegistAppService(newAppService) - } - if err = m.store.InitOneThirdPartService(svc); err != nil { - logrus.Errorf("application apply service resource failure: %s", err.Error()) - return fmt.Errorf("application apply service resource failure: %s", err.Error()) - } - if body.Action == "port-open" { - m.startCh.In() <- &v1.Event{ - Type: v1.StartEvent, - Sid: body.ServiceID, - Port: body.Port, - IsInner: body.IsInner, - } - } - if body.Action == "port-close" { - if !db.GetManager().TenantServicesPortDao().HasOpenPort(body.ServiceID) { - m.startCh.In() <- &v1.Event{ - Type: v1.StopEvent, - Sid: body.ServiceID, - } - } - } - } - return nil }