diff --git a/gateway/store/store.go b/gateway/store/store.go index 1f71450ee..78e371d1a 100644 --- a/gateway/store/store.go +++ b/gateway/store/store.go @@ -21,7 +21,9 @@ package store import ( "bytes" "fmt" + "github.com/goodrain/rainbond/db/model" "io/ioutil" + "k8s.io/apimachinery/pkg/labels" "net" "os" "reflect" @@ -37,7 +39,7 @@ import ( "github.com/goodrain/rainbond/gateway/controller/config" "github.com/goodrain/rainbond/gateway/defaults" "github.com/goodrain/rainbond/gateway/util" - v1 "github.com/goodrain/rainbond/gateway/v1" + "github.com/goodrain/rainbond/gateway/v1" corev1 "k8s.io/api/core/v1" extensions "k8s.io/api/extensions/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -118,7 +120,8 @@ type k8sStore struct { conf *option.Config client kubernetes.Interface // informer contains the cache Informers - informers *Informer + informers *Informer + sharedInformer informers.SharedInformerFactory // Lister contains object listers (stores). listers *Lister secretIngressMap *secretIngressMap @@ -158,21 +161,21 @@ func New(client kubernetes.Interface, store.listers.IngressAnnotation.Store = cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc) // create informers factory, enable and assign required informers - infFactory := informers.NewFilteredSharedInformerFactory(client, conf.ResyncPeriod, corev1.NamespaceAll, + store.sharedInformer = informers.NewFilteredSharedInformerFactory(client, conf.ResyncPeriod, corev1.NamespaceAll, func(options *metav1.ListOptions) { options.LabelSelector = "creater=Rainbond" }) - store.informers.Ingress = infFactory.Extensions().V1beta1().Ingresses().Informer() + store.informers.Ingress = store.sharedInformer.Extensions().V1beta1().Ingresses().Informer() store.listers.Ingress.Store = store.informers.Ingress.GetStore() - store.informers.Service = infFactory.Core().V1().Services().Informer() + store.informers.Service = store.sharedInformer.Core().V1().Services().Informer() store.listers.Service.Store = 
store.informers.Service.GetStore() - store.informers.Endpoint = infFactory.Core().V1().Endpoints().Informer() + store.informers.Endpoint = store.sharedInformer.Core().V1().Endpoints().Informer() store.listers.Endpoint.Store = store.informers.Endpoint.GetStore() - store.informers.Secret = infFactory.Core().V1().Secrets().Informer() + store.informers.Secret = store.sharedInformer.Core().V1().Secrets().Informer() store.listers.Secret.Store = store.informers.Secret.GetStore() ingEventHandler := cache.ResourceEventHandlerFuncs{ @@ -373,86 +376,101 @@ func (s *k8sStore) extractAnnotations(ing *extensions.Ingress) { func (s *k8sStore) ListPool() ([]*v1.Pool, []*v1.Pool) { var httpPools []*v1.Pool var tcpPools []*v1.Pool - l7Pools := make(map[string]*v1.Pool) - l4Pools := make(map[string]*v1.Pool) - for _, item := range s.listers.Endpoint.List() { - ep := item.(*corev1.Endpoints) - labels := ep.GetLabels() - name, ok := labels["name"] - if !ok { - logrus.Warningf("there is no name in the labels of corev1.Endpoints(%s/%s)", - ep.Namespace, ep.Name) - continue - } - - if ep.Subsets != nil || len(ep.Subsets) != 0 { - // l7 - backends := l7PoolBackendMap[name] - for _, backend := range backends { - pool := l7Pools[backend.name] - if pool == nil { - pool = &v1.Pool{ - Nodes: []*v1.Node{}, - } - pool.Name = backend.name - pool.UpstreamHashBy = backend.hashBy - l7Pools[backend.name] = pool - } - for _, ss := range ep.Subsets { - var addresses []corev1.EndpointAddress - if ss.Addresses != nil && len(ss.Addresses) > 0 { - addresses = append(addresses, ss.Addresses...) - } else { - addresses = append(addresses, ss.NotReadyAddresses...) 
- } - for _, address := range addresses { - if _, ok := l7PoolMap[name]; ok { // l7 - pool.Nodes = append(pool.Nodes, &v1.Node{ - Host: address.IP, - Port: ss.Ports[0].Port, - Weight: backend.weight, - }) - } - } + f := func(backendMap map[string][]backend, poolMap map[string]struct{}) map[string]*v1.Pool { + pools := make(map[string]*v1.Pool) + for k, v := range backendMap { + service, err := s.listers.Service.ByKey(k) + if err != nil { + logrus.Warningf("Key: %s;error getting service: %v", k, err) + continue + } + originPort := service.GetLabels()["origin_port"] + var pluginPort int32 + if originPort != "" { + port, err := strconv.Atoi(originPort) + if err != nil { + logrus.Warningf("Origin port: %s; error converting to type int: %v", originPort, err) + pluginPort = 0 + } else { + pluginPort = int32(port) } } - // l4 - backends = l4PoolBackendMap[name] - for _, backend := range backends { - pool := l4Pools[backend.name] - if pool == nil { - pool = &v1.Pool{ - Nodes: []*v1.Node{}, - } - pool.Name = backend.name - l4Pools[backend.name] = pool + + // list related endpoints + labelname, ok := service.GetLabels()["name"] + if !ok { + logrus.Warningf("Key: %s;label 'name' not found", k) + continue + } + selector, err := labels.Parse(fmt.Sprintf("name=%s", labelname)) + if err != nil { + logrus.Warningf("Label: %s; error parsing labels: %s", + fmt.Sprintf("name=%s", labelname), err.Error()) + continue + } + endpoints, err := s.sharedInformer.Core().V1().Endpoints().Lister().Endpoints(service.GetNamespace()).List(selector) + if err != nil { + logrus.Warningf("Label: %s; error listing endpoints: %v", + fmt.Sprintf("name=%s", labelname), err) + continue + } + + for _, ep := range endpoints { + if ep.Subsets == nil || len(ep.Subsets) == 0 { + continue } - for _, ss := range ep.Subsets { - var addresses []corev1.EndpointAddress - if ss.Addresses != nil && len(ss.Addresses) > 0 { - addresses = append(addresses, ss.Addresses...) 
- } else { - addresses = append(addresses, ss.NotReadyAddresses...) + if ep.GetLabels()["service_kind"] != model.ServiceKindThirdParty.String() { + if ep.Subsets[0].Ports[0].Port != service.Spec.Ports[0].Port { + continue } - for _, address := range addresses { - if _, ok := l4PoolMap[name]; ok { // l7 - pool.Nodes = append(pool.Nodes, &v1.Node{ - Host: address.IP, - Port: ss.Ports[0].Port, - Weight: backend.weight, - }) + } + + for _, backend := range v { + pool := pools[backend.name] + if pool == nil { + pool = &v1.Pool{ + Nodes: []*v1.Node{}, + } + pool.Name = backend.name + pool.UpstreamHashBy = backend.hashBy + pools[backend.name] = pool + } + for _, ss := range ep.Subsets { + var addresses []corev1.EndpointAddress + if ss.Addresses != nil && len(ss.Addresses) > 0 { + addresses = append(addresses, ss.Addresses...) + } else { + addresses = append(addresses, ss.NotReadyAddresses...) + } + for _, address := range addresses { + if _, ok := poolMap[k]; ok { // l7 + pool.Nodes = append(pool.Nodes, &v1.Node{ + Host: address.IP, + Port: func(pluginPort, addressPort int32) int32 { + if pluginPort != 0 { + return pluginPort + } + return ss.Ports[0].Port + }(pluginPort, ss.Ports[0].Port), + Weight: backend.weight, + }) + } } } } } } + return pools } + + l7Pools := f(l7PoolBackendMap, l7PoolMap) + l4Pools := f(l4PoolBackendMap, l4PoolMap) + // change map to slice TODO: use map directly for _, pool := range l7Pools { httpPools = append(httpPools, pool) } - for _, pool := range l4Pools { tcpPools = append(tcpPools, pool) } @@ -479,11 +497,6 @@ func (s *k8sStore) ListVirtualService() (l7vs []*v1.VirtualService, l4vs []*v1.V } if anns.L4.L4Enable && anns.L4.L4Port != 0 { svcKey := fmt.Sprintf("%v/%v", ing.Namespace, ing.Spec.Backend.ServiceName) - name, err := s.GetServiceNameLabelByKey(svcKey) - if err != nil { - logrus.Warningf("key: %s; error getting service name label: %v", svcKey, err) - continue - } // region l4 host := strings.Replace(anns.L4.L4Host, " ", "", -1) @@ 
-507,11 +520,11 @@ func (s *k8sStore) ListVirtualService() (l7vs []*v1.VirtualService, l4vs []*v1.V vs.Namespace = anns.Namespace vs.ServiceID = anns.Labels["service_id"] } - l4PoolMap[name] = struct{}{} + l4PoolMap[svcKey] = struct{}{} l4vsMap[listening] = vs l4vs = append(l4vs, vs) backend := backend{name: backendName, weight: anns.Weight.Weight} - l4PoolBackendMap[name] = append(l4PoolBackendMap[name], backend) + l4PoolBackendMap[svcKey] = append(l4PoolBackendMap[svcKey], backend) // endregion } else { // region l7 @@ -568,14 +581,9 @@ func (s *k8sStore) ListVirtualService() (l7vs []*v1.VirtualService, l4vs []*v1.V } for _, path := range rule.IngressRuleValue.HTTP.Paths { svckey := fmt.Sprintf("%s/%s", ing.Namespace, path.Backend.ServiceName) - name, err := s.GetServiceNameLabelByKey(svckey) - if err != nil { - logrus.Warningf("key: %s; error getting service name label: %v", svckey, err) - continue - } locKey := fmt.Sprintf("%s_%s", virSrvName, path.Path) location := srvLocMap[locKey] - l7PoolMap[name] = struct{}{} + l7PoolMap[svckey] = struct{}{} // if location do not exists, then creates a new one if location == nil { location = &v1.Location{ @@ -610,7 +618,7 @@ func (s *k8sStore) ListVirtualService() (l7vs []*v1.VirtualService, l4vs []*v1.V if anns.UpstreamHashBy != "" { backend.hashBy = anns.UpstreamHashBy } - l7PoolBackendMap[name] = append(l7PoolBackendMap[name], backend) + l7PoolBackendMap[svckey] = append(l7PoolBackendMap[svckey], backend) } } // endregion @@ -635,38 +643,38 @@ func (s *k8sStore) ingressIsValid(ing *extensions.Ingress) bool { } } } - labelname, err := s.GetServiceNameLabelByKey(svcKey) + service, err := s.listers.Service.ByKey(svcKey) if err != nil { - logrus.Warningf("label: %s; error parsing label: %v", labelname, err) + logrus.Warningf("Key: %s; error getting key: %v", svcKey, err) return false } - endpointsList, err := s.client.CoreV1().Endpoints(ing.Namespace).List(metav1.ListOptions{ - LabelSelector: fmt.Sprintf("name=%s", labelname), - }) 
+ labelname := service.GetLabels()["name"] + selector, err := labels.Parse(fmt.Sprintf("name=%s", labelname)) if err != nil { - logrus.Warningf("selector: %s; error list endpoints: %v", + logrus.Warningf("Label: %s;error parsing labels: %v", labelname, err) + return false + } + eps, err := s.sharedInformer.Core().V1().Endpoints().Lister().Endpoints(service.GetNamespace()).List(selector) + if err != nil { + logrus.Warningf("selector: %s; error listing endpoints: %v", fmt.Sprintf("name=%s", labelname), err) return false } - if endpointsList == nil || endpointsList.Items == nil || len(endpointsList.Items) == 0 { - logrus.Warningf("selector: %s; can't find any endpoints", - fmt.Sprintf("name=%s", labelname)) - return false + if eps == nil || len(eps) == 0 { + logrus.Warningf("Selector: %s; empty endpoints", fmt.Sprintf("name=%s", labelname)) } result := false -RESULT: - for _, ep := range endpointsList.Items { + for _, ep := range eps { if ep.Subsets != nil && len(ep.Subsets) > 0 { - //logrus.Warningf("selector: %s; empty endpoints subsets; endpoints: %V", - //fmt.Sprintf("name=%s", labelname), ep) - for _, e := range ep.Subsets { - if !((e.Addresses == nil || len(e.Addresses) == 0) && (e.NotReadyAddresses == nil || len(e.NotReadyAddresses) == 0)) { - //logrus.Warningf("selector: %s; empty endpoints addresses; endpoints: %V", - // fmt.Sprintf("name=%s", labelname), ep) - result = true - break RESULT - } + e := ep.Subsets[0] + if ep.GetLabels()["service_kind"] != model.ServiceKindThirdParty.String() && + e.Ports[0].Port != service.Spec.Ports[0].Port { + continue + } + if !((e.Addresses == nil || len(e.Addresses) == 0) && (e.NotReadyAddresses == nil || len(e.NotReadyAddresses) == 0)) { + result = true + break } } } diff --git a/hack/contrib/docker/gateway/nginxtmp/servers.tmpl b/hack/contrib/docker/gateway/nginxtmp/servers.tmpl index 2d15fad18..21ca7d0f3 100644 --- a/hack/contrib/docker/gateway/nginxtmp/servers.tmpl +++ b/hack/contrib/docker/gateway/nginxtmp/servers.tmpl 
@@ -81,9 +81,7 @@ server { proxy_cookie_domain {{ $loc.Proxy.CookieDomain }}; proxy_cookie_path {{ $loc.Proxy.CookiePath }}; - {{ if isValidByteSize $loc.Proxy.BodySize true }} client_max_body_size {{ $loc.Proxy.BodySize }}m; - {{ end }} {{ if $loc.DisableAccessLog }} access_log off; diff --git a/worker/appm/store/store.go b/worker/appm/store/store.go index 46a4de9e8..5972e72b6 100644 --- a/worker/appm/store/store.go +++ b/worker/appm/store/store.go @@ -198,11 +198,7 @@ func NewStore(clientset *kubernetes.Clientset, } }, UpdateFunc: func(old, cur interface{}) { - //oep := old.(*corev1.Endpoints) cep := cur.(*corev1.Endpoints) - logrus.Debugf("received update endpoints; name: %s", cep.GetName()) - //if cep.ResourceVersion != oep.ResourceVersion && - // !reflect.DeepEqual(cep.Subsets, oep.Subsets) { serviceID := cep.Labels["service_id"] version := cep.Labels["version"] diff --git a/worker/handle/manager.go b/worker/handle/manager.go index 0f650f1fd..fdcb6b0a6 100644 --- a/worker/handle/manager.go +++ b/worker/handle/manager.go @@ -274,7 +274,7 @@ func (m *Manager) verticalScalingExec(task *model.Task) error { } newAppService.Logger = logger appService.SetUpgradePatch(newAppService) - err = m.controllerManager.StartController(controller.TypeUpgradeController, *appService) + err = m.controllerManager.StartController(controller.TypeUpgradeController, *newAppService) if err != nil { logrus.Errorf("Application run vertical scaling(upgrade) controller failure:%s", err.Error()) logger.Info("Application run vertical scaling(upgrade) controller failure", controller.GetCallbackLoggerOption())