From 3636855e139d1aafe266217a294d71064a1f8241 Mon Sep 17 00:00:00 2001 From: huangrh Date: Sun, 31 Mar 2019 02:15:41 +0800 Subject: [PATCH 1/3] [FIX] free endpoints were removed by controller-manager --- util/prober/manager.go | 66 ++- util/prober/probes/probe.go | 3 - util/prober/probes/tcp.go | 2 +- util/prober/types/v1/types.go | 14 +- worker/appm/f/function.go | 65 ++- worker/appm/prober/prober.go | 199 ++++---- worker/appm/store/store.go | 90 +++- worker/appm/thirdparty/discovery/discovery.go | 6 +- worker/appm/thirdparty/thirdparty.go | 431 ++++++++---------- .../provider/lib/controller/controller.go | 3 +- worker/server/server.go | 75 +-- 11 files changed, 528 insertions(+), 426 deletions(-) diff --git a/util/prober/manager.go b/util/prober/manager.go index 85f3fe651..43bffcedd 100644 --- a/util/prober/manager.go +++ b/util/prober/manager.go @@ -20,7 +20,6 @@ package prober import ( "context" - "encoding/json" "errors" "sync" "time" @@ -39,12 +38,13 @@ type Prober interface { CloseWatch(serviceName string, id string) error Start() AddServices(in []*v1.Service) - CheckAndAddService(in *v1.Service) bool + CheckIfExist(in *v1.Service) bool SetServices([]*v1.Service) GetServices() []*v1.Service GetServiceHealth() map[string]*v1.HealthStatus SetAndUpdateServices([]*v1.Service) error AddAndUpdateServices([]*v1.Service) error + UpdateServiceProbe(service *v1.Service) UpdateServicesProbe(services []*v1.Service) Stop() error DisableWatcher(serviceName, watcherID string) @@ -276,18 +276,13 @@ func (p *probeManager) AddServices(in []*v1.Service) { } // CheckAndAddService checks if the input service exists, if it does not exist, add it. 
-func (p *probeManager) CheckAndAddService(in *v1.Service) bool { +func (p *probeManager) CheckIfExist(in *v1.Service) bool { exist := false for _, svc := range p.services { if svc.Name == in.Name { exist = true } } - if !exist { - b, _ := json.Marshal(in) - logrus.Debugf("add service: %s", string(b)) - p.services = append(p.services, in) - } return exist } @@ -300,9 +295,9 @@ func (p *probeManager) isNeedUpdate(new *v1.Service) bool { } if old == nil { // not found old one - return false + return true } - return new.Equal(old) + return !new.Equal(old) } func (p *probeManager) GetServiceHealth() map[string]*v1.HealthStatus { @@ -374,16 +369,35 @@ func (p *probeManager) updateAllServicesProbe() { } } +// UpdateServiceProbe updates and runs one service probe. +func (p *probeManager) UpdateServiceProbe(service *v1.Service) { + if service.ServiceHealth == nil || service.Disable || !p.isNeedUpdate(service) { + return + } + logrus.Debugf("Probe: %s; update probe.", service.Name) + // stop old probe + old := p.serviceProbe[service.Name] + if old != nil { + old.Stop() + } + // create new probe + serviceProbe := probe.CreateProbe(p.ctx, p.statusChan, service) + if serviceProbe != nil { + p.serviceProbe[service.Name] = serviceProbe + serviceProbe.Check() + } +} + // UpdateServicesProbe updates and runs services probe. 
func (p *probeManager) UpdateServicesProbe(services []*v1.Service) { + if services == nil || len(services) == 0 { + logrus.Debug("Empty services") + return + } + oldSvcs := p.ListServicesBySid(services[0].Sid) for _, v := range services { - if v.ServiceHealth == nil { - continue - } - if v.Disable { - continue - } - if !p.isNeedUpdate(v) { + delete(oldSvcs, v.Name) + if v.ServiceHealth == nil || v.Disable || !p.isNeedUpdate(v) { continue } logrus.Debugf("Probe: %s; update probe.", v.Name) @@ -392,6 +406,9 @@ func (p *probeManager) UpdateServicesProbe(services []*v1.Service) { if old != nil { old.Stop() } + if !p.CheckIfExist(v) { + p.services = append(p.services, v) + } // create new probe serviceProbe := probe.CreateProbe(p.ctx, p.statusChan, v) if serviceProbe != nil { @@ -399,6 +416,20 @@ func (p *probeManager) UpdateServicesProbe(services []*v1.Service) { serviceProbe.Check() } } + for _, svc := range oldSvcs { + p.StopProbes([]string{svc.Name}) + } +} + +// ListServicesBySid lists services corresponding to sid. +func (p *probeManager) ListServicesBySid(sid string) map[string]*v1.Service { + res := make(map[string]*v1.Service) + for _, svc := range p.services { + if svc.Sid == sid { + res[svc.Name] = svc + } + } + return res } // GetProbe returns a probe associated with name. 
@@ -413,6 +444,7 @@ func (p *probeManager) StopProbes(names []string) { logrus.Debugf("Name: %s; Probe not found.", name) continue } + logrus.Debugf("Name: %s; Probe stopped", name) probe.Stop() delete(p.serviceProbe, name) for idx, svc := range p.services { diff --git a/util/prober/probes/probe.go b/util/prober/probes/probe.go index 9b15666d2..a14d48b0d 100644 --- a/util/prober/probes/probe.go +++ b/util/prober/probes/probe.go @@ -20,7 +20,6 @@ package probe import ( "context" - "github.com/Sirupsen/logrus" "github.com/goodrain/rainbond/util/prober/types/v1" ) @@ -34,7 +33,6 @@ type Probe interface { func CreateProbe(ctx context.Context, statusChan chan *v1.HealthStatus, v *v1.Service) Probe { ctx, cancel := context.WithCancel(ctx) if v.ServiceHealth.Model == "tcp" { - logrus.Debug("creat tcp probe...") t := &TCPProbe{ Name: v.ServiceHealth.Name, Address: v.ServiceHealth.Address, @@ -47,7 +45,6 @@ func CreateProbe(ctx context.Context, statusChan chan *v1.HealthStatus, v *v1.Se return t } if v.ServiceHealth.Model == "http" { - logrus.Debug("creat http probe...") t := &HTTPProbe{ Name: v.ServiceHealth.Name, Address: v.ServiceHealth.Address, diff --git a/util/prober/probes/tcp.go b/util/prober/probes/tcp.go index 60149e7ae..4e330f659 100644 --- a/util/prober/probes/tcp.go +++ b/util/prober/probes/tcp.go @@ -32,7 +32,7 @@ func (h *TCPProbe) Stop() { // TCPCheck - func (h *TCPProbe) TCPCheck() { - logrus.Debug("tcp check...") + logrus.Debugf("TCP check; Name: %s; Address: %s", h.Name, h.Address) timer := time.NewTimer(time.Second * time.Duration(h.TimeInterval)) defer timer.Stop() for { diff --git a/util/prober/types/v1/types.go b/util/prober/types/v1/types.go index bc2f4a477..0e70cc398 100644 --- a/util/prober/types/v1/types.go +++ b/util/prober/types/v1/types.go @@ -25,12 +25,13 @@ const ( StatHealthy string = "healthy" // StatUnhealthy - StatUnhealthy string = "unhealthy" - // StaTDeath - + // StatDeath - StatDeath string = "death" ) -//Service Service +// Service 
Service type Service struct { + Sid string `json:"service_id"` Name string `json:"name"` ServiceHealth *Health `json:"health"` Disable bool `json:"disable"` @@ -41,13 +42,16 @@ func (l *Service) Equal(r *Service) bool { if l == r { return true } + if l.Sid != r.Sid { + return false + } if l.Name != r.Name { return false } if l.Disable != r.Disable { return false } - if l.ServiceHealth != r.ServiceHealth { + if !l.ServiceHealth.Equal(r.ServiceHealth) { return false } return true @@ -57,6 +61,7 @@ func (l *Service) Equal(r *Service) bool { type Health struct { Name string `json:"name"` Model string `json:"model"` + IP string `json:"ip"` Port int `json:"port"` Address string `json:"address"` TimeInterval int `json:"time_interval"` @@ -74,6 +79,9 @@ func (l *Health) Equal(r *Health) bool { if l.Model != r.Model { return false } + if l.IP != r.IP { + return false + } if l.Port != r.Port { return false } diff --git a/worker/appm/f/function.go b/worker/appm/f/function.go index 4d374a238..f84eb7787 100644 --- a/worker/appm/f/function.go +++ b/worker/appm/f/function.go @@ -53,7 +53,7 @@ func ApplyOne(clientset *kubernetes.Clientset, app *v1.AppService) error { } // update endpoints for _, ep := range app.GetEndpoints() { - ensureEndpoints(ep, clientset) + EnsureEndpoints(ep, clientset) } // update ingress for _, ing := range app.GetIngress() { @@ -140,9 +140,9 @@ func ensureSecret(secret *corev1.Secret, clientSet kubernetes.Interface) { } } -func ensureEndpoints(ep *corev1.Endpoints, clientSet kubernetes.Interface) { +// EnsureEndpoints creates or updates endpoints. +func EnsureEndpoints(ep *corev1.Endpoints, clientSet kubernetes.Interface) { _, err := clientSet.CoreV1().Endpoints(ep.Namespace).Update(ep) - if err != nil { if k8sErrors.IsNotFound(err) { _, err := clientSet.CoreV1().Endpoints(ep.Namespace).Create(ep) @@ -259,3 +259,62 @@ func UpgradeSecrets(clientset *kubernetes.Clientset, } return nil } + +// UpgradeEndpoints is used to update *corev1.Endpoints. 
+func UpgradeEndpoints(clientset *kubernetes.Clientset, + as *v1.AppService, old, new []*corev1.Endpoints, + handleErr func(msg string, err error) error) error { + var oldMap = make(map[string]*corev1.Endpoints, len(old)) + for i, item := range old { + oldMap[item.Name] = old[i] + } + for _, n := range new { + if o, ok := oldMap[n.Name]; ok { + ep, err := clientset.CoreV1().Endpoints(n.Namespace).Update(n) + if err != nil { + if e := handleErr(fmt.Sprintf("error updating endpoints: %+v: err: %v", + ep, err), err); e != nil { + return e + } + continue + } + as.AddEndpoints(ep) + delete(oldMap, o.Name) + logrus.Debugf("ServiceID: %s; successfully update endpoints: %s", as.ServiceID, ep.Name) + } else { + _, err := clientset.CoreV1().Endpoints(n.Namespace).Create(n) + if err != nil { + if err := handleErr(fmt.Sprintf("error creating endpoints: %+v: err: %v", + n, err), err); err != nil { + return err + } + continue + } + as.AddEndpoints(n) + logrus.Debugf("ServiceID: %s; successfully create endpoints: %s", as.ServiceID, n.Name) + } + } + for _, sec := range oldMap { + if sec != nil { + if err := clientset.CoreV1().Endpoints(sec.Namespace).Delete(sec.Name, &metav1.DeleteOptions{}); err != nil { + if err := handleErr(fmt.Sprintf("error deleting endpoints: %+v: err: %v", + sec, err), err); err != nil { + return err + } + continue + } + logrus.Debugf("ServiceID: %s; successfully delete endpoints: %s", as.ServiceID, sec.Name) + } + } + return nil +} + +// UpdateEndpoints uses clientset to update the given Endpoints. 
+func UpdateEndpoints(ep *corev1.Endpoints, clientSet *kubernetes.Clientset) { + _, err := clientSet.CoreV1().Endpoints(ep.Namespace).Update(ep) + if err != nil { + logrus.Warningf("error updating endpoints: %+v; err: %v", ep, err) + return + } + logrus.Debugf("Key: %s/%s; Successfully update endpoints", ep.GetNamespace(), ep.GetName()) +} diff --git a/worker/appm/prober/prober.go b/worker/appm/prober/prober.go index 9f116ceca..991365e9c 100644 --- a/worker/appm/prober/prober.go +++ b/worker/appm/prober/prober.go @@ -21,8 +21,6 @@ package prober import ( "context" "fmt" - "strconv" - "strings" "github.com/Sirupsen/logrus" "github.com/eapache/channels" @@ -41,9 +39,8 @@ import ( type Prober interface { Start() Stop() - AddProbe(ep *corev1.Endpoints) - UpdateProbe(ep *corev1.Endpoints) - StopProbe(ep *corev1.Endpoints) + UpdateProbes(info []*store.ProbeInfo) + StopProbe(uuids []string) } // NewProber creates a new third-party service prober. @@ -101,14 +98,14 @@ func (t *tpProbe) Start() { evt := event.(store.Event) switch evt.Type { case store.CreateEvent: - ep := evt.Obj.(*corev1.Endpoints) - t.AddProbe(ep) + infos := evt.Obj.([]*store.ProbeInfo) + t.UpdateProbes(infos) case store.UpdateEvent: - new := evt.Obj.(*corev1.Endpoints) - t.UpdateProbe(new) + infos := evt.Obj.([]*store.ProbeInfo) + t.UpdateProbes(infos) case store.DeleteEvent: - ep := evt.Obj.(*corev1.Endpoints) - t.StopProbe(ep) + uuids := evt.Obj.([]string) + t.StopProbe(uuids) } case <-t.ctx.Done(): return @@ -122,93 +119,83 @@ func (t *tpProbe) Stop() { t.cancel() } -func (t *tpProbe) AddProbe(ep *corev1.Endpoints) { - service, probeInfo, sid := t.createServices(ep) - if service == nil { - logrus.Debugf("Empty service, stop creating probe") - return - } - ip := ep.GetLabels()["ip"] - port, _ := strconv.Atoi(ep.GetLabels()["port"]) - // watch - if t.utilprober.CheckAndAddService(service) { - logrus.Debugf("Service: %+v; Exists", service) - return - } - go func(service *v1.Service) { - watcher := 
t.utilprober.WatchServiceHealthy(service.Name) - t.utilprober.EnableWatcher(watcher.GetServiceName(), watcher.GetID()) - defer watcher.Close() - defer t.utilprober.DisableWatcher(watcher.GetServiceName(), watcher.GetID()) - for { - select { - case event := <-watcher.Watch(): - if event == nil { - return - } - switch event.Status { - case v1.StatHealthy: - obj := &appmv1.RbdEndpoint{ - UUID: service.Name, - IP: ip, - Port: port, - Sid: sid, +func (t *tpProbe) UpdateProbes(infos []*store.ProbeInfo) { + var services []*v1.Service + for _, info := range infos { + service, probeInfo := t.createServices(info) + if service == nil { + logrus.Debugf("Empty service, stop creating probe") + continue + } + services = append(services, service) + // watch + if t.utilprober.CheckIfExist(service) { + continue + } + go func(service *v1.Service, info *store.ProbeInfo) { + watcher := t.utilprober.WatchServiceHealthy(service.Name) + t.utilprober.EnableWatcher(watcher.GetServiceName(), watcher.GetID()) + defer watcher.Close() + defer t.utilprober.DisableWatcher(watcher.GetServiceName(), watcher.GetID()) + for { + select { + case event := <-watcher.Watch(): + if event == nil { + return } - t.updateCh.In() <- discovery.Event{ - Type: discovery.HealthEvent, - Obj: obj, - } - case v1.StatDeath, v1.StatUnhealthy: - if event.ErrorNumber > service.ServiceHealth.MaxErrorsNum { - if probeInfo.Mode == model.OfflineFailureAction.String() { - obj := &appmv1.RbdEndpoint{ - UUID: service.Name, - IP: ip, - Port: port, - Sid: sid, - } - t.updateCh.In() <- discovery.Event{ - Type: discovery.DeleteEvent, - Obj: obj, - } - } else { - obj := &appmv1.RbdEndpoint{ - UUID: service.Name, - IP: ip, - Port: port, - Sid: sid, - } - t.updateCh.In() <- discovery.Event{ - Type: discovery.UnhealthyEvent, - Obj: obj, + switch event.Status { + case v1.StatHealthy: + obj := &appmv1.RbdEndpoint{ + UUID: info.UUID, + IP: info.IP, + Port: int(info.Port), + Sid: info.Sid, + } + t.updateCh.In() <- discovery.Event{ + Type: 
discovery.HealthEvent, + Obj: obj, + } + case v1.StatDeath, v1.StatUnhealthy: + if event.ErrorNumber > service.ServiceHealth.MaxErrorsNum { + if probeInfo.Mode == model.OfflineFailureAction.String() { + obj := &appmv1.RbdEndpoint{ + UUID: info.UUID, + IP: info.IP, + Port: int(info.Port), + Sid: info.Sid, + } + t.updateCh.In() <- discovery.Event{ + Type: discovery.DeleteEvent, + Obj: obj, + } + } else { + obj := &appmv1.RbdEndpoint{ + UUID: info.UUID, + IP: info.IP, + Port: int(info.Port), + Sid: info.Sid, + } + t.updateCh.In() <- discovery.Event{ + Type: discovery.UnhealthyEvent, + Obj: obj, + } } } } + case <-t.ctx.Done(): + // TODO: should stop for one service, not all services. + return } - case <-t.ctx.Done(): - // TODO: should stop for one service, not all services. - return } - } - }(service) - // start - t.utilprober.UpdateServicesProbe([]*v1.Service{service}) -} - -func (t *tpProbe) UpdateProbe(ep *corev1.Endpoints) { - service, _, _ := t.createServices(ep) - if service == nil { - logrus.Debugf("Empty service, stop updating probe") - return + }(service, info) } - - t.utilprober.UpdateServicesProbe([]*v1.Service{service}) + t.utilprober.UpdateServicesProbe(services) } -func (t *tpProbe) StopProbe(ep *corev1.Endpoints) { - name := t.createServiceNames(ep) - logrus.Debugf("Name: %s; Stop probe.", name) - t.utilprober.StopProbes([]string{name}) +func (t *tpProbe) StopProbe(uuids []string) { + for _, name := range uuids { + t.utilprober.StopProbes([]string{name}) + } } // GetProbeInfo returns probe info associated with sid. 
@@ -231,31 +218,23 @@ func (t *tpProbe) GetProbeInfo(sid string) (*model.TenantServiceProbe, error) { return probes[0], nil } -func (t *tpProbe) createServices(ep *corev1.Endpoints) (*v1.Service, *model.TenantServiceProbe, string) { - sid := ep.GetLabels()["service_id"] - ip := ep.GetLabels()["ip"] - port := ep.GetLabels()["port"] - if strings.TrimSpace(sid) == "" { - logrus.Warningf("Endpoints key: %s; ServiceID not found, stop creating probe", - fmt.Sprintf("%s/%s", ep.Namespace, ep.Name)) - return nil, nil, "" - } - probeInfo, err := t.GetProbeInfo(sid) +func (t *tpProbe) createServices(probeInfo *store.ProbeInfo) (*v1.Service, *model.TenantServiceProbe) { + tsp, err := t.GetProbeInfo(probeInfo.Sid) if err != nil { logrus.Warningf("ServiceID: %s; Unexpected error occurred, ignore the creation of "+ - "probes: %s", sid, err.Error()) - return nil, nil, "" + "probes: %s", probeInfo.Sid, err.Error()) + return nil, nil } - if probeInfo.Mode == "liveness" { - probeInfo.Mode = model.IgnoreFailureAction.String() + if tsp.Mode == "liveness" { + tsp.Mode = model.IgnoreFailureAction.String() } - service := createService(probeInfo) - service.Name = ep.GetLabels()["uuid"] - portint, _ := strconv.Atoi(port) - service.ServiceHealth.Port = portint - service.ServiceHealth.Name = service.Name // TODO: no need? 
- service.ServiceHealth.Address = fmt.Sprintf("%s:%s", ip, port) - return service, probeInfo, sid + service := createService(tsp) + service.Sid = probeInfo.Sid + service.Name = probeInfo.UUID + service.ServiceHealth.Port = int(probeInfo.Port) + service.ServiceHealth.Name = service.Name + service.ServiceHealth.Address = fmt.Sprintf("%s:%d", probeInfo.IP, probeInfo.Port) + return service, tsp } func (t *tpProbe) createServiceNames(ep *corev1.Endpoints) string { diff --git a/worker/appm/store/store.go b/worker/appm/store/store.go index 0186a4953..f3819dc2b 100644 --- a/worker/appm/store/store.go +++ b/worker/appm/store/store.go @@ -20,6 +20,7 @@ package store import ( "context" + "encoding/json" "fmt" "sync" "time" @@ -78,7 +79,14 @@ const ( type Event struct { Type EventType Obj interface{} - Old interface{} +} + +// ProbeInfo holds the context of a probe. +type ProbeInfo struct { + Sid string `json:"sid"` + UUID string `json:"uuid"` + IP string `json:"ip"` + Port int32 `json:"port"` } //appRuntimeStore app runtime store @@ -154,7 +162,6 @@ func NewStore(clientset *kubernetes.Clientset, epEventHandler := cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { ep := obj.(*corev1.Endpoints) - logrus.Debugf("received add endpoints: %+v", ep) serviceID := ep.Labels["service_id"] version := ep.Labels["version"] createrID := ep.Labels["creater_id"] @@ -165,10 +172,12 @@ func NewStore(clientset *kubernetes.Clientset, } if appservice != nil { appservice.AddEndpoints(ep) - if isThirdParty(ep) { + if isThirdParty(ep) && ep.Subsets != nil && len(ep.Subsets) > 0 { + logrus.Debugf("received add endpoints: %+v", ep) + probeInfos := listProbeInfos(ep, serviceID) probeCh.In() <- Event{ Type: CreateEvent, - Obj: obj, + Obj: probeInfos, } } return @@ -177,7 +186,6 @@ func NewStore(clientset *kubernetes.Clientset, }, DeleteFunc: func(obj interface{}) { ep := obj.(*corev1.Endpoints) - logrus.Debugf("received delete endpoints: %+v", ep) serviceID := ep.Labels["service_id"] 
version := ep.Labels["version"] createrID := ep.Labels["creater_id"] @@ -190,9 +198,14 @@ func NewStore(clientset *kubernetes.Clientset, store.DeleteAppService(appservice) } if isThirdParty(ep) { + logrus.Debugf("received delete endpoints: %+v", ep) + var uuids []string + for _, item := range ep.Subsets { + uuids = append(uuids, item.Ports[0].Name) + } probeCh.In() <- Event{ Type: DeleteEvent, - Obj: obj, + Obj: uuids, } } } @@ -212,10 +225,10 @@ func NewStore(clientset *kubernetes.Clientset, if appservice != nil { appservice.AddEndpoints(cep) if isThirdParty(cep) { + curInfos := listProbeInfos(cep, serviceID) probeCh.In() <- Event{ Type: UpdateEvent, - Obj: cur, - Old: old, + Obj: curInfos, } } } @@ -235,6 +248,67 @@ func NewStore(clientset *kubernetes.Clientset, return store } +func listProbeInfos(ep *corev1.Endpoints, sid string) []*ProbeInfo { + var probeInfos []*ProbeInfo + for _, subset := range ep.Subsets { + uuid := subset.Ports[0].Name + port := subset.Ports[0].Port + for _, address := range subset.NotReadyAddresses { + info := &ProbeInfo{ + Sid: sid, + UUID: uuid, + IP: address.IP, + Port: port, + } + probeInfos = append(probeInfos, info) + } + for _, address := range subset.Addresses { + info := &ProbeInfo{ + Sid: sid, + UUID: uuid, + IP: address.IP, + Port: port, + } + probeInfos = append(probeInfos, info) + } + } + return probeInfos +} + +func upgradeProbe(ch chan<- interface{}, old, cur []*ProbeInfo) { + ob, _ := json.Marshal(old) + cb, _ := json.Marshal(cur) + logrus.Debugf("Old probe infos: %s", string(ob)) + logrus.Debugf("Current probe infos: %s", string(cb)) + oldMap := make(map[string]*ProbeInfo, len(old)) + for i := 0; i < len(old); i++ { + oldMap[old[i].UUID] = old[i] + } + for _, c := range cur { + if info := oldMap[c.UUID]; info != nil { + delete(oldMap, c.UUID) + logrus.Debugf("UUID: %s; update probe", c.UUID) + ch <- Event{ + Type: UpdateEvent, + Obj: c, + } + } else { + logrus.Debugf("UUID: %s; create probe", c.UUID) + ch <- Event{ + 
Type: CreateEvent, + Obj: []*ProbeInfo{c}, + } + } + } + for _, info := range oldMap { + logrus.Debugf("UUID: %s; delete probe", info.UUID) + ch <- Event{ + Type: DeleteEvent, + Obj: info, + } + } +} + func (a *appRuntimeStore) init() error { //init leader namespace leaderNamespace := a.conf.LeaderElectionNamespace diff --git a/worker/appm/thirdparty/discovery/discovery.go b/worker/appm/thirdparty/discovery/discovery.go index 2c9a93604..f9988450b 100644 --- a/worker/appm/thirdparty/discovery/discovery.go +++ b/worker/appm/thirdparty/discovery/discovery.go @@ -35,9 +35,11 @@ const ( // UpdateEvent event associated with an object update in a service discovery center UpdateEvent EventType = "UPDATE" // DeleteEvent event associated when an object is removed from a service discovery center - DeleteEvent EventType = "DELETE" + DeleteEvent EventType = "DELETE" + // UnhealthyEvent - UnhealthyEvent EventType = "UNHEALTHY" - HealthEvent EventType = "HEALTH" + // HealthEvent - + HealthEvent EventType = "HEALTH" ) // Event holds the context of an event. 
diff --git a/worker/appm/thirdparty/thirdparty.go b/worker/appm/thirdparty/thirdparty.go index 0fda3784e..5c632d543 100644 --- a/worker/appm/thirdparty/thirdparty.go +++ b/worker/appm/thirdparty/thirdparty.go @@ -21,19 +21,18 @@ package thirdparty import ( "encoding/json" "fmt" - "github.com/goodrain/rainbond/worker/util" - "strconv" + "sync" "github.com/Sirupsen/logrus" "github.com/eapache/channels" "github.com/goodrain/rainbond/db" "github.com/goodrain/rainbond/db/model" + "github.com/goodrain/rainbond/worker/appm/f" "github.com/goodrain/rainbond/worker/appm/store" "github.com/goodrain/rainbond/worker/appm/thirdparty/discovery" "github.com/goodrain/rainbond/worker/appm/types/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" - k8sErrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" ) @@ -63,7 +62,7 @@ func NewThirdPartier(clientset *kubernetes.Clientset, } type thirdparty struct { - clientset kubernetes.Interface + clientset *kubernetes.Clientset store store.Storer // a collection of stop channel for every service. @@ -87,13 +86,14 @@ func (t *thirdparty) Start() { } logrus.Debugf("Received event: %+v", evt) if evt.Type == v1.StartEvent { // no need to distinguish between event types + needWatch := false stopCh := t.svcStopCh[evt.Sid] - if stopCh != nil { + if stopCh == nil { logrus.Debugf("ServiceID: %s; already started.", evt.Sid) - continue + needWatch = true + t.svcStopCh[evt.Sid] = make(chan struct{}) } - t.svcStopCh[evt.Sid] = make(chan struct{}) - go t.runStart(evt.Sid) + go t.runStart(evt.Sid, needWatch) } if evt.Type == v1.StopEvent { stopCh := t.svcStopCh[evt.Sid] @@ -122,37 +122,33 @@ func (t *thirdparty) Start() { }() } -func (t *thirdparty) runStart(sid string) { - logrus.Debugf("ServiceID: %s; run start...", sid) +func (t *thirdparty) runStart(sid string, needWatch bool) { as := t.store.GetAppService(sid) - // TODO: when an error occurs, consider retrying. 
- rbdeps, d := t.ListRbdEndpoints(sid) - b, _ := json.Marshal(rbdeps) - logrus.Debugf("ServiceID: %s; rbd endpoints: %+v", sid, string(b)) - if rbdeps == nil || len(rbdeps) == 0 { - logrus.Warningf("ServiceID: %s;Empty rbd endpoints, stop starting third-party service.", sid) + var err error + for i := 3; i > 0; i-- { // retry 3 times + rbdeps, ir := t.ListRbdEndpoints(sid) + if rbdeps == nil || len(rbdeps) == 0 { + logrus.Warningf("ServiceID: %s;Empty rbd endpoints, stop starting third-party service.", sid) + continue + } + + var eps []*corev1.Endpoints + eps, err = t.k8sEndpoints(as, rbdeps) + if err != nil { + logrus.Warningf("ServiceID: %s; error creating k8s endpoints: %s", sid, err.Error()) + continue + } + for _, ep := range eps { + f.EnsureEndpoints(ep, t.clientset) + } + + if needWatch && ir != nil { + ir.Watch() + } + logrus.Infof("ServiceID: %s; successfully running start task", sid) return } - - eps, err := t.createK8sEndpoints(as, rbdeps) - if err != nil { - logrus.Errorf("ServiceID: %s; error creating k8s endpoints: %s", sid, err.Error()) - return - } - // find out old endpoints, and delete it. - old := as.GetEndpoints() - // TODO: can do better - del := findDeletedEndpoints(old, eps) - for _, ep := range del { - deleteEndpoints(ep, t.clientset) - } - for _, ep := range eps { - ensureEndpoints(ep, t.clientset) - } - - if d != nil { - d.Watch() - } + logrus.Errorf("ServiceID: %s; error running start task: %v", sid, err) } // ListRbdEndpoints lists all rbd endpoints, include static and dynamic. 
@@ -180,28 +176,20 @@ func (t *thirdparty) ListRbdEndpoints(sid string) ([]*v1.RbdEndpoint, Interacter return res, d } -func findDeletedEndpoints(old, new []*corev1.Endpoints) []*corev1.Endpoints { - if old == nil { - logrus.Debugf("empty old endpoints.") - return nil - } - var res []*corev1.Endpoints - for _, o := range old { - del := true - for _, n := range new { - if o.Name == n.Name { - del = false - break +func deleteSubset(as *v1.AppService, rbdep *v1.RbdEndpoint) { + eps := as.GetEndpoints() + for _, ep := range eps { + for idx, item := range ep.Subsets { + if item.Ports[0].Name == rbdep.UUID { + logrus.Debugf("UUID: %s; subset deleted", rbdep.UUID) + ep.Subsets[idx] = ep.Subsets[len(ep.Subsets)-1] + ep.Subsets = ep.Subsets[:len(ep.Subsets)-1] } } - if del { - res = append(res, o) - } } - return res } -func (t *thirdparty) createK8sEndpoints(as *v1.AppService, epinfo []*v1.RbdEndpoint) ([]*corev1.Endpoints, error) { +func (t *thirdparty) k8sEndpoints(as *v1.AppService, epinfo []*v1.RbdEndpoint) ([]*corev1.Endpoints, error) { ports, err := db.GetManager().TenantServicesPortDao().GetPortsByServiceID(as.ServiceID) if err != nil { return nil, err @@ -212,258 +200,201 @@ func (t *thirdparty) createK8sEndpoints(as *v1.AppService, epinfo []*v1.RbdEndpo } p := ports[0] - logrus.Debugf("create outer third-party service") - f := func() []*corev1.Endpoints { - var eps []*corev1.Endpoints - for _, epi := range epinfo { - port, realport := func(targetPort int, realPort int) (int32, bool) { // final port - if realPort == 0 { - return int32(targetPort), false - } - return int32(realPort), true - }(p.ContainerPort, epi.Port) - ep := corev1.Endpoints{} - ep.Namespace = as.TenantID - if p.IsInnerService { - ep.Name = util.CreateEndpointsName(as.TenantName, as.ServiceAlias, epi.UUID) - ep.Labels = as.GetCommonLabels(map[string]string{ - "name": as.ServiceAlias + "Service", - "service-kind": model.ServiceKindThirdParty.String(), - }) - } - if p.IsOuterService { - ep.Name = 
util.CreateEndpointsName(as.TenantName, as.ServiceAlias, epi.UUID) + "out" - ep.Labels = as.GetCommonLabels(map[string]string{ - "name": as.ServiceAlias + "ServiceOUT", - "service-kind": model.ServiceKindThirdParty.String(), - }) - } - ep.Labels["uuid"] = epi.UUID - ep.Labels["ip"] = epi.IP - ep.Labels["port"] = strconv.Itoa(int(port)) - ep.Labels["real-port"] = strconv.FormatBool(realport) - subset := corev1.EndpointSubset{ - Ports: []corev1.EndpointPort{ - { - Port: port, - }, - }, - Addresses: []corev1.EndpointAddress{ - { - IP: epi.IP, - }, - }, - } - ep.Subsets = append(ep.Subsets, subset) - eps = append(eps, &ep) - } - return eps - } - var res []*corev1.Endpoints if p.IsInnerService { - res = append(res, f()...) + ep := &corev1.Endpoints{} + ep.Namespace = as.TenantID + // inner or outer + if p.IsInnerService { + ep.Name = fmt.Sprintf("service-%d-%d", p.ID, p.ContainerPort) + ep.Labels = as.GetCommonLabels(map[string]string{ + "name": as.ServiceAlias + "Service", + "service-kind": model.ServiceKindThirdParty.String(), + }) + } + res = append(res, ep) } if p.IsOuterService { - res = append(res, f()...) 
+ ep := &corev1.Endpoints{} + ep.Namespace = as.TenantID + // inner or outer + if p.IsOuterService { + ep.Name = fmt.Sprintf("service-%d-%dout", p.ID, p.ContainerPort) + ep.Labels = as.GetCommonLabels(map[string]string{ + "name": as.ServiceAlias + "ServiceOUT", + "service-kind": model.ServiceKindThirdParty.String(), + }) + } + res = append(res, ep) + } + + var subsets []corev1.EndpointSubset + for _, epi := range epinfo { + subset := corev1.EndpointSubset{ + Ports: []corev1.EndpointPort{ + { + Name: epi.UUID, + Port: func(targetPort int, realPort int) int32 { + if realPort == 0 { + return int32(targetPort) + } + return int32(realPort) + }(p.ContainerPort, epi.Port), + Protocol: corev1.ProtocolTCP, + }, + }, + Addresses: []corev1.EndpointAddress{ + { + IP: epi.IP, + }, + }, + } + subsets = append(subsets, subset) + } + for _, item := range res { + item.Subsets = subsets } return res, nil } -func deleteEndpoints(ep *corev1.Endpoints, clientset kubernetes.Interface) { - err := clientset.CoreV1().Endpoints(ep.Namespace).Delete(ep.Name, &metav1.DeleteOptions{}) +func updateSubset(as *v1.AppService, rbdep *v1.RbdEndpoint) error { + ports, err := db.GetManager().TenantServicesPortDao().GetPortsByServiceID(as.ServiceID) if err != nil { - logrus.Debugf("Ignore; error deleting endpoints%+v: %v", ep, err) + return err } - logrus.Debugf("Delete endpoints: %+v", ep) -} - -func ensureEndpoints(ep *corev1.Endpoints, clientSet kubernetes.Interface) { - logrus.Debugf("ensure endpoints: %+v", ep) - _, err := clientSet.CoreV1().Endpoints(ep.Namespace).Update(ep) - - if err != nil { - if k8sErrors.IsNotFound(err) { - _, err := clientSet.CoreV1().Endpoints(ep.Namespace).Create(ep) - if err != nil { - logrus.Warningf("error creating endpoints %+v: %v", ep, err) + // third-party service can only have one port + if ports == nil || len(ports) == 0 { + return fmt.Errorf("Port not found") + } + p := ports[0] + subset := corev1.EndpointSubset{ + Ports: []corev1.EndpointPort{ + { + Name: 
rbdep.UUID, + Port: func(targetPort int, realPort int) int32 { + if realPort == 0 { + return int32(targetPort) + } + return int32(realPort) + }(p.ContainerPort, rbdep.Port), + Protocol: corev1.ProtocolTCP, + }, + }, + Addresses: []corev1.EndpointAddress{ + { + IP: rbdep.IP, + }, + }, + } + for _, ep := range as.GetEndpoints() { + exist := false + for idx, item := range ep.Subsets { + if item.Ports[0].Name == subset.Ports[0].Name { + ep.Subsets[idx] = item + exist = true + break } - return } - logrus.Warningf("error updating endpoints %+v: %v", ep, err) - } -} - -func ensureConfigMap(cm *corev1.ConfigMap, clientSet kubernetes.Interface) { - _, err := clientSet.CoreV1().ConfigMaps(cm.Namespace).Update(cm) - - if err != nil { - if k8sErrors.IsNotFound(err) { - _, err := clientSet.CoreV1().ConfigMaps(cm.Namespace).Create(cm) - if err != nil { - logrus.Warningf("error creating ConfigMaps %+v: %v", cm, err) - } - return + if !exist { + ep.Subsets = append(ep.Subsets, subset) } - logrus.Warningf("error updating ConfigMaps %+v: %v", cm, err) } + return nil } func (t *thirdparty) runUpdate(event discovery.Event) { - ep := event.Obj.(*v1.RbdEndpoint) - as := t.store.GetAppService(ep.Sid) - b, _ := json.Marshal(ep) - switch event.Type { - case discovery.CreateEvent: - logrus.Debugf("Run update; Event received: Type: %v; Body: %s", event.Type, string(b)) - endpoints, err := t.createK8sEndpoints(as, []*v1.RbdEndpoint{ep}) - if err != nil { - logrus.Warningf("ServiceID: %s; error creating k8s endpoints struct: %s", - ep.Sid, err.Error()) - return - } - for _, ep := range endpoints { - ensureEndpoints(ep, t.clientset) - } - case discovery.UpdateEvent: - logrus.Debugf("Run update; Event received: Type: %v; Body: %s", event.Type, string(b)) - // TODO: Compare old and new endpoints - // TODO: delete old endpoints - if !ep.IsOnline { - eps := ListOldEndpoints(as, ep) - for _, item := range eps { - deleteEndpoints(item, t.clientset) + fc := func(as *v1.AppService, rbdep 
*v1.RbdEndpoint, ready bool, msg string, condition func(subset corev1.EndpointSubset) bool) {
+ var wait sync.WaitGroup
+ wait.Add(1)
+ go func() {
+ defer wait.Done()
+ for _, ep := range as.GetEndpoints() {
+ for idx, subset := range ep.Subsets {
+ if subset.Ports[0].Name == rbdep.UUID && condition(subset) {
+ logrus.Debugf("Executed; health: %v; msg: %s", ready, msg)
+ ep.Subsets[idx] = createSubset(rbdep, ready)
+ f.UpdateEndpoints(ep, t.clientset)
+ }
+ }
+ }
 }
- return
- }
- endpoints, err := t.createK8sEndpoints(as, []*v1.RbdEndpoint{ep})
+ }()
+ wait.Wait()
+ }
+
+ rbdep := event.Obj.(*v1.RbdEndpoint)
+ as := t.store.GetAppService(rbdep.Sid)
+ b, _ := json.Marshal(rbdep)
+ msg := fmt.Sprintf("Run update; Event received: Type: %v; Body: %s", event.Type, string(b))
+ switch event.Type {
+ case discovery.CreateEvent, discovery.UpdateEvent:
+ logrus.Debug(msg)
+ err := updateSubset(as, rbdep)
 if err != nil {
- logrus.Warningf("ServiceID: %s; error creating k8s endpoints struct: %s",
- ep.Sid, err.Error())
+ logrus.Warningf("ServiceID: %s; error adding subset: %s",
+ rbdep.Sid, err.Error())
 return
 }
- for _, ep := range endpoints {
- ensureEndpoints(ep, t.clientset)
- }
+ _ = f.UpgradeEndpoints(t.clientset, as, as.GetEndpoints(), as.GetEndpoints(),
+ func(msg string, err error) error {
+ logrus.Warning(msg)
+ return nil
+ })
 case discovery.DeleteEvent:
- logrus.Debugf("Run update; Event received: Type: %v; Body: %s", event.Type, string(b))
- eps := ListOldEndpoints(as, ep)
- for _, item := range eps {
- deleteEndpoints(item, t.clientset)
- }
+ logrus.Debug(msg)
+ deleteSubset(as, rbdep)
+ eps := as.GetEndpoints()
+ _ = f.UpgradeEndpoints(t.clientset, as, as.GetEndpoints(), eps,
+ func(msg string, err error) error {
+ logrus.Warning(msg)
+ return nil
+ })
 case discovery.HealthEvent:
- subset := createSubset(ep, false)
- eps := ListOldEndpoints(as, ep)
- for _, ep := range eps {
- if !isHealth(ep) {
- ep.Subsets = []corev1.EndpointSubset{
- subset,
- }
- 
logrus.Debugf("Run update; Event received: Type: %v; Body: %s", event.Type, string(b)) - ensureEndpoints(ep, t.clientset) - } + isUnhealthy := func(subset corev1.EndpointSubset) bool { + return !isHealthy(subset) } + fc(as, rbdep, true, msg, isUnhealthy) case discovery.UnhealthyEvent: - subset := createSubset(ep, true) - eps := ListOldEndpoints(as, ep) - for _, ep := range eps { - if isHealth(ep) { - ep.Subsets = []corev1.EndpointSubset{ - subset, - } - logrus.Debugf("Run update; Event received: Type: %v; Body: %s", event.Type, string(b)) - ensureEndpoints(ep, t.clientset) - } - } + fc(as, rbdep, false, msg, isHealthy) } } func (t *thirdparty) runDelete(sid string) { as := t.store.GetAppService(sid) // TODO: need to delete? - - if services := as.GetServices(); services != nil { - for _, service := range services { - err := t.clientset.CoreV1().Services(as.TenantID).Delete(service.Name, &metav1.DeleteOptions{}) - if err != nil && !errors.IsNotFound(err) { - logrus.Warningf("error deleting service: %v", err) - } - } - } - if secrets := as.GetSecrets(); secrets != nil { - for _, secret := range secrets { - if secret != nil { - err := t.clientset.CoreV1().Secrets(as.TenantID).Delete(secret.Name, &metav1.DeleteOptions{}) - if err != nil && !errors.IsNotFound(err) { - logrus.Warningf("error deleting secrets: %v", err) - } - t.store.OnDelete(secret) - } - } - } - if ingresses := as.GetIngress(); ingresses != nil { - for _, ingress := range ingresses { - err := t.clientset.ExtensionsV1beta1().Ingresses(as.TenantID).Delete(ingress.Name, &metav1.DeleteOptions{}) - if err != nil && !errors.IsNotFound(err) { - logrus.Warningf("error deleting ingress: %v", err) - } - t.store.OnDelete(ingress) - } - } - if configs := as.GetConfigMaps(); configs != nil { - for _, config := range configs { - err := t.clientset.CoreV1().ConfigMaps(as.TenantID).Delete(config.Name, &metav1.DeleteOptions{}) - if err != nil && !errors.IsNotFound(err) { - logrus.Warningf("error deleting config map: %v", 
err) - } - t.store.OnDelete(config) - } - } if eps := as.GetEndpoints(); eps != nil { for _, ep := range eps { logrus.Debugf("Endpoints delete: %+v", ep) err := t.clientset.CoreV1().Endpoints(as.TenantID).Delete(ep.Name, &metav1.DeleteOptions{}) if err != nil && !errors.IsNotFound(err) { - logrus.Warningf("error deleting endpoint empty old app servicets: %v", err) + logrus.Warningf("error deleting endpoint empty old app endpoints: %v", err) } t.store.OnDelete(ep) } } } -func ListOldEndpoints(as *v1.AppService, ep *v1.RbdEndpoint) []*corev1.Endpoints { - var res []*corev1.Endpoints - for _, item := range as.GetEndpoints() { - if item.GetLabels()["uuid"] == ep.UUID { - res = append(res, item) - } - } - return res -} - -func createSubset(ep *v1.RbdEndpoint, notReady bool) corev1.EndpointSubset { +func createSubset(ep *v1.RbdEndpoint, ready bool) corev1.EndpointSubset { address := corev1.EndpointAddress{ IP: ep.IP, } port := corev1.EndpointPort{ - Port: int32(ep.Port), + Name: ep.UUID, + Port: int32(ep.Port), + Protocol: corev1.ProtocolTCP, } subset := corev1.EndpointSubset{} - if notReady { - subset.NotReadyAddresses = append(subset.Addresses, address) - } else { + if ready { subset.Addresses = append(subset.Addresses, address) + } else { + subset.NotReadyAddresses = append(subset.NotReadyAddresses, address) } subset.Ports = append(subset.Ports, port) return subset } -func isHealth(ep *corev1.Endpoints) bool { - if ep.Subsets == nil || len(ep.Subsets) == 0 { - return false - } - if ep.Subsets[0].Addresses != nil && len(ep.Subsets[0].Addresses) > 0 { +func isHealthy(subset corev1.EndpointSubset) bool { + if subset.Addresses != nil && len(subset.Addresses) > 0 { return true } return false diff --git a/worker/master/volumes/provider/lib/controller/controller.go b/worker/master/volumes/provider/lib/controller/controller.go index 9584ede05..50dadea5b 100644 --- a/worker/master/volumes/provider/lib/controller/controller.go +++ 
b/worker/master/volumes/provider/lib/controller/controller.go @@ -615,7 +615,8 @@ func (ctrl *ProvisionController) processNextClaimWorkItem() bool { if err := ctrl.syncClaimHandler(key); err != nil { if ctrl.claimQueue.NumRequeues(obj) < ctrl.failedProvisionThreshold { - logrus.Warningf("Retrying syncing claim %q because failures %v < threshold %v", key, ctrl.claimQueue.NumRequeues(obj), ctrl.failedProvisionThreshold) + logrus.Warningf("Retrying syncing claim %q because failures %v < threshold %v", + key, ctrl.claimQueue.NumRequeues(obj), ctrl.failedProvisionThreshold) ctrl.claimQueue.AddRateLimited(obj) } else { logrus.Errorf("Giving up syncing claim %q because failures %v >= threshold %v", key, ctrl.claimQueue.NumRequeues(obj), ctrl.failedProvisionThreshold) diff --git a/worker/server/server.go b/worker/server/server.go index 30b118f15..a823d00b6 100644 --- a/worker/server/server.go +++ b/worker/server/server.go @@ -21,9 +21,7 @@ package server import ( "context" "fmt" - corev1 "k8s.io/api/core/v1" "net" - "strconv" "strings" "time" @@ -38,6 +36,7 @@ import ( "github.com/goodrain/rainbond/worker/server/pb" "google.golang.org/grpc" "google.golang.org/grpc/reflection" + corev1 "k8s.io/api/core/v1" ) //RuntimeServer app runtime grpc server @@ -242,36 +241,49 @@ func (r *RuntimeServer) ListThirdPartyEndpoints(ctx context.Context, re *pb.Serv return new(pb.ThirdPartyEndpoints), nil } var pbeps []*pb.ThirdPartyEndpoint + // The same IP may correspond to two endpoints, which are internal and external endpoints. + // So it is need to filter the same IP. 
exists := make(map[string]bool) for _, ep := range as.GetEndpoints() { - if exists[ep.GetLabels()["uuid"]] { + if ep.Subsets == nil || len(ep.Subsets) == 0 { + logrus.Debugf("Key: %s; empty subsets", fmt.Sprintf("%s/%s", ep.Namespace, ep.Name)) continue } - exists[ep.GetLabels()["uuid"]] = true - pbep := &pb.ThirdPartyEndpoint{ - Uuid: ep.GetLabels()["uuid"], - Sid: ep.GetLabels()["service_id"], - Ip: ep.GetLabels()["ip"], - Port: func(item *corev1.Endpoints) int32 { - realport, _ := strconv.ParseBool(item.GetLabels()["realport"]) - if realport { - portstr := item.GetLabels()["port"] - port, _ := strconv.Atoi(portstr) - return int32(port) + for idx, subset := range ep.Subsets { + if exists[subset.Ports[0].Name] { + continue + } + ip := func(subset corev1.EndpointSubset) string { + if subset.Addresses != nil && len(subset.Addresses) > 0 { + return subset.Addresses[0].IP } - return 0 - }(ep), - Status: func(item *corev1.Endpoints) string { - if item.Subsets == nil || len(item.Subsets) == 0 { + if subset.NotReadyAddresses != nil && len(subset.NotReadyAddresses) > 0 { + return subset.NotReadyAddresses[0].IP + } + return "" + }(subset) + if strings.TrimSpace(ip) == "" { + logrus.Debugf("Key: %s; Index: %d; IP not found", fmt.Sprintf("%s/%s", ep.Namespace, ep.Name), idx) + continue + } + exists[subset.Ports[0].Name] = true + pbep := &pb.ThirdPartyEndpoint{ + Uuid: subset.Ports[0].Name, + Sid: ep.GetLabels()["service_id"], + Ip: ip, + Port: subset.Ports[0].Port, + Status: func(item *corev1.Endpoints) string { + if subset.Addresses != nil && len(subset.Addresses) > 0 { + return "healthy" + } + if subset.NotReadyAddresses != nil && len(subset.NotReadyAddresses) > 0 { + return "unhealthy" + } return "unknown" - } - if item.Subsets[0].Addresses == nil || len(item.Subsets[0].Addresses) == 0 { - return "unhealthy" - } - return "healthy" - }(ep), + }(ep), + } + pbeps = append(pbeps, pbep) } - pbeps = append(pbeps, pbep) } return &pb.ThirdPartyEndpoints{ Obj: pbeps, @@ -310,9 
+322,16 @@ func (r *RuntimeServer) UpdThirdPartyEndpoint(ctx context.Context, re *pb.UpdThi Port: int(re.Port), IsOnline: re.IsOnline, } - r.updateCh.In() <- discovery.Event{ - Type: discovery.UpdateEvent, - Obj: rbdep, + if re.IsOnline == false { + r.updateCh.In() <- discovery.Event{ + Type: discovery.DeleteEvent, + Obj: rbdep, + } + } else { + r.updateCh.In() <- discovery.Event{ + Type: discovery.UpdateEvent, + Obj: rbdep, + } } return new(pb.Empty), nil } From 2320e1d96213d112d24e27a80045ea4ce8f860cd Mon Sep 17 00:00:00 2001 From: huangrh Date: Sun, 31 Mar 2019 02:17:07 +0800 Subject: [PATCH 2/3] [REV] find the associated endpoints by the name of the service --- gateway/store/store.go | 227 ++++++++++++++++------------------------- 1 file changed, 89 insertions(+), 138 deletions(-) diff --git a/gateway/store/store.go b/gateway/store/store.go index 713ed3529..74c30ae62 100644 --- a/gateway/store/store.go +++ b/gateway/store/store.go @@ -29,9 +29,6 @@ import ( "strings" "sync" - "github.com/goodrain/rainbond/db/model" - "k8s.io/apimachinery/pkg/labels" - "github.com/Sirupsen/logrus" "github.com/eapache/channels" "github.com/goodrain/rainbond/cmd/gateway/option" @@ -40,7 +37,7 @@ import ( "github.com/goodrain/rainbond/gateway/controller/config" "github.com/goodrain/rainbond/gateway/defaults" "github.com/goodrain/rainbond/gateway/util" - v1 "github.com/goodrain/rainbond/gateway/v1" + "github.com/goodrain/rainbond/gateway/v1" corev1 "k8s.io/api/core/v1" extensions "k8s.io/api/extensions/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -377,97 +374,74 @@ func (s *k8sStore) extractAnnotations(ing *extensions.Ingress) { func (s *k8sStore) ListPool() ([]*v1.Pool, []*v1.Pool) { var httpPools []*v1.Pool var tcpPools []*v1.Pool + l7Pools := make(map[string]*v1.Pool) + l4Pools := make(map[string]*v1.Pool) + for _, item := range s.listers.Endpoint.List() { + ep := item.(*corev1.Endpoints) - f := func(backendMap map[string][]backend, poolMap map[string]struct{}) 
map[string]*v1.Pool { - pools := make(map[string]*v1.Pool) - for k, v := range backendMap { - service, err := s.listers.Service.ByKey(k) - if err != nil { - logrus.Warningf("Key: %s;error getting service: %v", k, err) - continue - } - originPort := service.GetLabels()["origin_port"] - var pluginPort int32 - if originPort != "" { - port, err := strconv.Atoi(originPort) - if err != nil { - logrus.Warningf("Origin port: %s; error converting to type int: %v", err) - pluginPort = 0 - } else { - pluginPort = int32(port) + if ep.Subsets != nil || len(ep.Subsets) != 0 { + epn := ep.ObjectMeta.Name + // l7 + backends := l7PoolBackendMap[ep.ObjectMeta.Name] + for _, backend := range backends { + pool := l7Pools[backend.name] + if pool == nil { + pool = &v1.Pool{ + Nodes: []*v1.Node{}, + } + pool.Name = backend.name + pool.UpstreamHashBy = backend.hashBy + l7Pools[backend.name] = pool } - } - - // list related endpoints - labelname, ok := service.GetLabels()["name"] - if !ok { - logrus.Warningf("Key: %s;label 'name' not found", k) - continue - } - selector, err := labels.Parse(fmt.Sprintf("name=%s", labelname)) - if err != nil { - logrus.Warningf("Label: %s; error parsing labels: %s", - fmt.Sprintf("name=%s", labelname), err.Error()) - continue - } - endpoints, err := s.sharedInformer.Core().V1().Endpoints().Lister().Endpoints(service.GetNamespace()).List(selector) - if err != nil { - logrus.Warningf("Label: %s; error listing endpoints: %v", - fmt.Sprintf("name=%s", labelname), err) - continue - } - - for _, ep := range endpoints { - if ep.Subsets == nil || len(ep.Subsets) == 0 { - continue - } - if ep.GetLabels()["service_kind"] != model.ServiceKindThirdParty.String() { - if ep.Subsets[0].Ports[0].Port != service.Spec.Ports[0].TargetPort.IntVal { - continue + for _, ss := range ep.Subsets { + var addresses []corev1.EndpointAddress + if ss.Addresses != nil && len(ss.Addresses) > 0 { + addresses = append(addresses, ss.Addresses...) 
+ } else { + addresses = append(addresses, ss.NotReadyAddresses...) + } + for _, address := range addresses { + if _, ok := l7PoolMap[epn]; ok { // l7 + pool.Nodes = append(pool.Nodes, &v1.Node{ + Host: address.IP, + Port: ss.Ports[0].Port, + Weight: backend.weight, + }) + } } } - - for _, backend := range v { - pool := pools[backend.name] - if pool == nil { - pool = &v1.Pool{ - Nodes: []*v1.Node{}, - } - pool.Name = backend.name - pool.UpstreamHashBy = backend.hashBy - pools[backend.name] = pool + } + // l4 + backends = l4PoolBackendMap[ep.ObjectMeta.Name] + for _, backend := range backends { + pool := l4Pools[backend.name] + if pool == nil { + pool = &v1.Pool{ + Nodes: []*v1.Node{}, } - for _, ss := range ep.Subsets { - var addresses []corev1.EndpointAddress - if ss.Addresses != nil && len(ss.Addresses) > 0 { - addresses = append(addresses, ss.Addresses...) - } else { - addresses = append(addresses, ss.NotReadyAddresses...) - } - for _, address := range addresses { - if _, ok := poolMap[k]; ok { // l7 - pool.Nodes = append(pool.Nodes, &v1.Node{ - Host: address.IP, - Port: func(pluginPort, addressPort int32) int32 { - if pluginPort != 0 { - return pluginPort - } - return addressPort - }(pluginPort, ss.Ports[0].Port), - Weight: backend.weight, - }) - } + pool.Name = backend.name + l4Pools[backend.name] = pool + } + for _, ss := range ep.Subsets { + var addresses []corev1.EndpointAddress + if ss.Addresses != nil && len(ss.Addresses) > 0 { + addresses = append(addresses, ss.Addresses...) + } else { + addresses = append(addresses, ss.NotReadyAddresses...) 
+ } + for _, address := range addresses { + if _, ok := l4PoolMap[epn]; ok { // l7 + pool.Nodes = append(pool.Nodes, &v1.Node{ + Host: address.IP, + Port: ss.Ports[0].Port, + Weight: backend.weight, + }) } } } } } - return pools } - - l7Pools := f(l7PoolBackendMap, l7PoolMap) - l4Pools := f(l4PoolBackendMap, l4PoolMap) - // change map to slice TODO: use map directly for _, pool := range l7Pools { httpPools = append(httpPools, pool) @@ -491,20 +465,20 @@ func (s *k8sStore) ListVirtualService() (l7vs []*v1.VirtualService, l4vs []*v1.V if !s.ingressIsValid(ing) { continue } + ingKey := k8s.MetaNamespaceKey(ing) anns, err := s.GetIngressAnnotations(ingKey) if err != nil { logrus.Errorf("Error getting Ingress annotations %q: %v", ingKey, err) } if anns.L4.L4Enable && anns.L4.L4Port != 0 { - svcKey := fmt.Sprintf("%v/%v", ing.Namespace, ing.Spec.Backend.ServiceName) - // region l4 host := strings.Replace(anns.L4.L4Host, " ", "", -1) if host == "" { host = s.conf.IP } host = s.conf.IP + svcKey := fmt.Sprintf("%v/%v", ing.Namespace, ing.Spec.Backend.ServiceName) protocol := s.GetServiceProtocol(svcKey, ing.Spec.Backend.ServicePort.IntVal) listening := fmt.Sprintf("%s:%v", host, anns.L4.L4Port) if string(protocol) == string(v1.ProtocolUDP) { @@ -521,11 +495,12 @@ func (s *k8sStore) ListVirtualService() (l7vs []*v1.VirtualService, l4vs []*v1.V vs.Namespace = anns.Namespace vs.ServiceID = anns.Labels["service_id"] } - l4PoolMap[svcKey] = struct{}{} + + l4PoolMap[ing.Spec.Backend.ServiceName] = struct{}{} l4vsMap[listening] = vs l4vs = append(l4vs, vs) backend := backend{name: backendName, weight: anns.Weight.Weight} - l4PoolBackendMap[svcKey] = append(l4PoolBackendMap[svcKey], backend) + l4PoolBackendMap[ing.Spec.Backend.ServiceName] = append(l4PoolBackendMap[ing.Spec.Backend.ServiceName], backend) // endregion } else { // region l7 @@ -577,14 +552,15 @@ func (s *k8sStore) ListVirtualService() (l7vs []*v1.VirtualService, l4vs []*v1.V vs.SSLCert = hostSSLMap[DefVirSrvName] } } 
+ l7vsMap[virSrvName] = vs l7vs = append(l7vs, vs) } + for _, path := range rule.IngressRuleValue.HTTP.Paths { - svckey := fmt.Sprintf("%s/%s", ing.Namespace, path.Backend.ServiceName) locKey := fmt.Sprintf("%s_%s", virSrvName, path.Path) location := srvLocMap[locKey] - l7PoolMap[svckey] = struct{}{} + l7PoolMap[path.Backend.ServiceName] = struct{}{} // if location do not exists, then creates a new one if location == nil { location = &v1.Location{ @@ -593,10 +569,9 @@ func (s *k8sStore) ListVirtualService() (l7vs []*v1.VirtualService, l4vs []*v1.V } srvLocMap[locKey] = location vs.Locations = append(vs.Locations, location) + // the first ingress proxy takes effect + location.Proxy = anns.Proxy } - - location.Proxy = anns.Proxy - // If their ServiceName is the same, then the new one will overwrite the old one. nameCondition := &v1.Condition{} var backendName string @@ -619,7 +594,7 @@ func (s *k8sStore) ListVirtualService() (l7vs []*v1.VirtualService, l4vs []*v1.V if anns.UpstreamHashBy != "" { backend.hashBy = anns.UpstreamHashBy } - l7PoolBackendMap[svckey] = append(l7PoolBackendMap[svckey], backend) + l7PoolBackendMap[path.Backend.ServiceName] = append(l7PoolBackendMap[path.Backend.ServiceName], backend) } } // endregion @@ -630,57 +605,47 @@ func (s *k8sStore) ListVirtualService() (l7vs []*v1.VirtualService, l4vs []*v1.V // ingressIsValid checks if the specified ingress is valid func (s *k8sStore) ingressIsValid(ing *extensions.Ingress) bool { - var svcKey string + + var endpointKey string if ing.Spec.Backend != nil { // stream - svcKey = fmt.Sprintf("%s/%s", ing.Namespace, ing.Spec.Backend.ServiceName) + endpointKey = fmt.Sprintf("%s/%s", ing.Namespace, ing.Spec.Backend.ServiceName) } else { // http Loop: for _, rule := range ing.Spec.Rules { for _, path := range rule.IngressRuleValue.HTTP.Paths { - svcKey = fmt.Sprintf("%s/%s", ing.Namespace, path.Backend.ServiceName) - if svcKey != "" { + endpointKey = fmt.Sprintf("%s/%s", ing.Namespace, 
path.Backend.ServiceName) + if endpointKey != "" { break Loop } } } } - service, err := s.listers.Service.ByKey(svcKey) + item, exists, err := s.listers.Endpoint.GetByKey(endpointKey) if err != nil { - logrus.Warningf("Key: %s; error getting key: %v", err) + logrus.Errorf("Can not get endpoint by key(%s): %v", endpointKey, err) return false } - labelname := fmt.Sprintf("name=%s", service.GetLabels()["name"]) - selector, err := labels.Parse(labelname) - if err != nil { - logrus.Warningf("Label: %s;error parsing labels: %v", labelname, err) + if !exists { + logrus.Warningf("Endpoint \"%s\" does not exist.", endpointKey) return false } - eps, err := s.sharedInformer.Core().V1().Endpoints().Lister().Endpoints(service.GetNamespace()).List(selector) - if err != nil { - logrus.Warningf("selector: %s; error listing endpoints: %v", - fmt.Sprintf("name=%s", labelname), err) + endpoint, ok := item.(*corev1.Endpoints) + if !ok { + logrus.Errorf("Cant not convert %v to %v", reflect.TypeOf(item), reflect.TypeOf(endpoint)) return false } - if eps == nil || len(eps) == 0 { - logrus.Warningf("Selector: %s; empty endpoints", fmt.Sprintf("name=%s", labelname)) + if endpoint.Subsets == nil || len(endpoint.Subsets) == 0 { + logrus.Warningf("Endpoints(%s) is empty, ignore it", endpointKey) + return false } - - result := false - for _, ep := range eps { - if ep.Subsets != nil && len(ep.Subsets) > 0 { - e := ep.Subsets[0] - if ep.GetLabels()["service_kind"] != model.ServiceKindThirdParty.String() && - e.Ports[0].Port != service.Spec.Ports[0].Port { - continue - } - if !((e.Addresses == nil || len(e.Addresses) == 0) && (e.NotReadyAddresses == nil || len(e.NotReadyAddresses) == 0)) { - result = true - break - } + for _, ep := range endpoint.Subsets { + if (ep.Addresses == nil || len(ep.Addresses) == 0) && (ep.NotReadyAddresses == nil || len(ep.NotReadyAddresses) == 0) { + logrus.Warningf("Endpoints(%s) is empty, ignore it", endpointKey) + return false } } - return result + return true } // 
GetIngress returns the Ingress matching key. @@ -701,20 +666,6 @@ func (s *k8sStore) ListIngresses() []*extensions.Ingress { return ingresses } -// GetServiceNameLabelByKey returns name in the labels of corev1.Service -// matching key(name/namespace). -func (s *k8sStore) GetServiceNameLabelByKey(key string) (string, error) { - svc, err := s.listers.Service.ByKey(key) - if err != nil { - return "", err - } - name, ok := svc.Labels["name"] - if !ok { - return "", fmt.Errorf("label \"name\" not found") - } - return name, nil -} - // GetServiceProtocol returns the Service matching key and port. func (s *k8sStore) GetServiceProtocol(key string, port int32) corev1.Protocol { svcs, err := s.listers.Service.ByKey(key) From 18bdc929a80cd53843f5315277c986f6e2208ddd Mon Sep 17 00:00:00 2001 From: huangrh Date: Sun, 31 Mar 2019 02:43:04 +0800 Subject: [PATCH 3/3] [REV] remove flag real-port --- api/handler/third_party_service_handler.go | 37 +++------------------- 1 file changed, 5 insertions(+), 32 deletions(-) diff --git a/api/handler/third_party_service_handler.go b/api/handler/third_party_service_handler.go index 21148280b..1c00f9fa0 100644 --- a/api/handler/third_party_service_handler.go +++ b/api/handler/third_party_service_handler.go @@ -19,7 +19,6 @@ package handler import ( - "encoding/json" "fmt" "github.com/Sirupsen/logrus" "github.com/goodrain/rainbond/api/model" @@ -27,8 +26,6 @@ import ( dbmodel "github.com/goodrain/rainbond/db/model" "github.com/goodrain/rainbond/util" "github.com/goodrain/rainbond/worker/client" - "strconv" - "strings" ) // ThirdPartyServiceHanlder handles business logic for all third-party services @@ -96,8 +93,6 @@ func (t *ThirdPartyServiceHanlder) DelEndpoints(epid, sid string) error { // ListEndpoints lists third-party service endpoints. 
func (t *ThirdPartyServiceHanlder) ListEndpoints(sid string) ([]*model.EndpointResp, error) { endpoints, err := t.dbmanager.EndpointsDao().List(sid) - b, _ := json.Marshal(endpoints) - logrus.Debugf("Endpoints from db: %s", string(b)) if err != nil { logrus.Warningf("ServiceID: %s; error listing endpoints from db; %v", sid, err) } @@ -105,9 +100,9 @@ func (t *ThirdPartyServiceHanlder) ListEndpoints(sid string) ([]*model.EndpointR for _, item := range endpoints { m[item.UUID] = &model.EndpointResp{ EpID: item.UUID, - IP: func(ip string, port int) string { - if port != 0 { - return fmt.Sprintf("%s:%d", ip, port) + IP: func(ip string, p int) string { + if p != 0 { + return fmt.Sprintf("%s:%d", ip, p) } return ip }(item.IP, item.Port), @@ -123,8 +118,6 @@ func (t *ThirdPartyServiceHanlder) ListEndpoints(sid string) ([]*model.EndpointR return nil, err } if thirdPartyEndpoints != nil && thirdPartyEndpoints.Obj != nil { - b, _ = json.Marshal(thirdPartyEndpoints) - logrus.Debugf("Endpoints from rpc: %s", string(b)) for _, item := range thirdPartyEndpoints.Obj { ep := m[item.Uuid] if ep != nil { @@ -133,13 +126,8 @@ func (t *ThirdPartyServiceHanlder) ListEndpoints(sid string) ([]*model.EndpointR continue } m[item.Uuid] = &model.EndpointResp{ - EpID: item.Uuid, - IP: func(ip string, port int32) string { - if port != 0 { - return fmt.Sprintf("%s:%d", ip, port) - } - return ip - }(item.Ip, item.Port), + EpID: item.Uuid, + IP: item.Ip, Status: item.Status, IsOnline: true, IsStatic: false, @@ -154,18 +142,3 @@ func (t *ThirdPartyServiceHanlder) ListEndpoints(sid string) ([]*model.EndpointR return res, nil } - -func splitIP(input string) (string, int) { - sli := strings.Split(input, ":") - if len(sli) == 2 { - return sli[0], func(port string) int { - p, err := strconv.Atoi(port) - if err != nil { - logrus.Warningf("String: %s; error converting string to int", port) - return 0 - } - return p - }(sli[1]) - } - return input, 0 -}