From 1f08870a42573485971a5c67c87e9bc1caf4db3d Mon Sep 17 00:00:00 2001 From: barnettZQG Date: Mon, 15 Oct 2018 14:54:19 +0800 Subject: [PATCH] [REV] Optimize load balancing control (#126 #111) --- entrance/core/manager.go | 21 ++++++++++++++ entrance/plugin/openresty/openresty_plugin.go | 29 +++---------------- entrance/plugin/zeus/zeus_plugin.go | 1 - entrance/source/config/config.go | 7 ----- entrance/source/handle.go | 9 +++++- 5 files changed, 33 insertions(+), 34 deletions(-) diff --git a/entrance/core/manager.go b/entrance/core/manager.go index 8eea5f40a..bf7cff347 100644 --- a/entrance/core/manager.go +++ b/entrance/core/manager.go @@ -185,6 +185,27 @@ func (m *manager) add(source object.Object) { } func (m *manager) delete(source object.Object) { + switch source.(type) { + case *object.PoolObject: + pool := source.(*object.PoolObject) + plugin, err := m.getPlugin(pool.PluginName, pool.PluginOpts) + if err != nil { + logrus.Errorf("get default plugin error.%s pool don't add to lb", err.Error()) + return + } + if plugin.GetName() == "zeus" { + //zeus do not handle pool delete + return + } + nodes, err := m.storeManager.GetNodeByPool(pool.GetName()) + if err != nil { + logrus.Errorf("get node list by pool name(%s) error.%s,do not delete pool", pool.GetName(), err.Error()) + return + } + if len(nodes) != 0 { + return + } + } ok, err := m.storeManager.DeleteSource(source) if err != nil { logrus.Errorf("Update %s to store error.%s", source.GetName(), err.Error()) diff --git a/entrance/plugin/openresty/openresty_plugin.go b/entrance/plugin/openresty/openresty_plugin.go index d4b85c34b..bcfb56988 100644 --- a/entrance/plugin/openresty/openresty_plugin.go +++ b/entrance/plugin/openresty/openresty_plugin.go @@ -62,7 +62,7 @@ type openresty struct { var defaultNodeList = []NginxNode{ { "Active", - "127.0.0.1:404", + "127.0.0.1:10004", 1, }} @@ -204,14 +204,12 @@ func (o *openresty) AddPool(originalPools ...*object.PoolObject) error { // nginx默认会试图将所有请求根据请求头中的host字段转发到名字与该host字段值相同的upstream func (o *openresty) UpdatePool(pools ...*object.PoolObject) error { var errs []error - for _, originalPool := range pools { upstreamName, err := getUpstreamNameByPool(originalPool.Name) if err != nil { logrus.Error(fmt.Sprintf("Failed to update pool %s: %s", originalPool.Name, err)) continue } - // get nodes from store, for example etcd. originalNodes, err := o.ctx.Store.GetNodeByPool(originalPool.Name) if err != nil { @@ -219,19 +217,11 @@ func (o *openresty) UpdatePool(pools ...*object.PoolObject) error { errs = append(errs, err) continue } - - if len(originalNodes) < 1 { - logrus.Info("Delete the pool, because no servers are inside the pool ", originalPool.Name) - o.deleteUpstream(originalPool.Name) - continue - } - protocol := "tcp" _, err = o.ctx.Store.GetVSByPoolName(originalPool.Name) if err != nil { protocol = "http" } - // build pool for openresty by original nodes pool := NginxUpstream{upstreamName, []NginxNode{}, protocol} for _, originalNode := range originalNodes { @@ -239,13 +229,11 @@ func (o *openresty) UpdatePool(pools ...*object.PoolObject) error { if state == "" { state = "Active" } - addr := fmt.Sprintf("%s:%d", originalNode.Host, originalNode.Port) if len(originalNode.Host) < 7 || originalNode.Port < 1 { logrus.Info(fmt.Sprintf("Ignore node in pool %s, illegal address [%s]", pool.Name, addr)) continue } - weight := originalNode.Weight if weight < 1 { weight = 1 @@ -259,13 +247,11 @@ func (o *openresty) UpdatePool(pools ...*object.PoolObject) error { } if len(pool.Servers) < 1 { - logrus.Info("Ignore update the pool, because no servers are inside the pool ", pool.Name) - continue + logrus.Warningf("nginx upstream %s not have server,will use default server", pool.Name) + pool.Servers = defaultNodeList } - // push data to all openresty instance by rest api err = o.doEach(UPDATE, o.urlPool(pool.Name), pool) - if err != nil { errs = append(errs, err) continue @@ -278,16 +264,14 @@ func (o *openresty) UpdatePool(pools ...*object.PoolObject) error { func (o *openresty) DeletePool(pools ...*object.PoolObject) error { var errs []error - for _, pool := range pools { err := o.deleteUpstream(pool.Name) if err != nil { errs = append(errs, err) - logrus.Error(err) + logrus.Errorf("delete nginx upstream %s error %s", pool.Name, err.Error()) continue } } - return reduceErr(errs) } @@ -302,17 +286,14 @@ func (o *openresty) AddNode(nodes ...*object.NodeObject) error { // 将node根据所属pool分类,根据每个pool名字取出该pool下所有node,然后全量更新 func (o *openresty) UpdateNode(nodes ...*object.NodeObject) error { poolNames := make(map[string]string, 0) - for _, node := range nodes { poolNames[node.PoolName] = node.NodeName } - pools, err := o.ctx.Store.GetPools(poolNames) if err != nil { logrus.Error(err) return err } - return o.UpdatePool(pools...) } @@ -490,9 +471,7 @@ func (o *openresty) DeleteRule(rules ...*object.RuleObject) error { if rule.HTTPS { protocol = "https" } - err := o.doEach(DELETE, o.urlServer(rule.Name), Options{protocol}) - if err != nil { errs = append(errs, err) logrus.Error(err) diff --git a/entrance/plugin/zeus/zeus_plugin.go b/entrance/plugin/zeus/zeus_plugin.go index 72f7427fa..94b00d41b 100644 --- a/entrance/plugin/zeus/zeus_plugin.go +++ b/entrance/plugin/zeus/zeus_plugin.go @@ -390,7 +390,6 @@ func (z *zeus) DeletePool(pools ...*object.PoolObject) error { if err != nil { errs = append(errs, err) } - } return handleErr(errs) } diff --git a/entrance/source/config/config.go b/entrance/source/config/config.go index 7f89a5c96..e698dc50a 100644 --- a/entrance/source/config/config.go +++ b/entrance/source/config/config.go @@ -25,8 +25,6 @@ import ( "github.com/goodrain/rainbond/entrance/core" - "github.com/Sirupsen/logrus" - "k8s.io/client-go/pkg/api/v1" ) @@ -84,7 +82,6 @@ const ( ) func (s *SourceBranch) RePoolName() string { - logrus.Debugf("tenant name is %s, in RePoolName", s.Tenant) return fmt.Sprintf("%s@%s_%d.Pool", s.Tenant, s.Service, @@ -93,7 +90,6 @@ func (s *SourceBranch) RePoolName() string { } func (s *SourceBranch) ReVSName() string { - logrus.Debugf("tenant name is %s, in ReVSName", s.Tenant) return fmt.Sprintf("%s_%s_%d.VS", s.Tenant, s.Service, @@ -102,7 +98,6 @@ func (s *SourceBranch) ReVSName() string { } func (s *SourceBranch) ReNodeName() string { - logrus.Debugf("tenant name is %s, in ReNodeName", s.Tenant) return fmt.Sprintf("%s_%d.Node", s.PodName, s.ContainerPort, @@ -117,7 +112,6 @@ func sha8(s string) string { } func (s *SourceBranch) ReRuleName(domain string) string { - logrus.Debugf("tenant name is %s, in ReRuleName", s.Tenant) return fmt.Sprintf("%s_%s_%d_%s.Rule", s.Tenant, s.Service, @@ -127,7 +121,6 @@ func (s *SourceBranch) ReRuleName(domain string) string { } func (s *SourceBranch) ReServiceId() string { - logrus.Debugf("tenant name is %s, in ReServiceId", s.Tenant) return fmt.Sprintf("%s.%s_%d", s.Tenant, s.Service, diff --git a/entrance/source/handle.go b/entrance/source/handle.go index 8a662ee6e..d01accfca 100644 --- a/entrance/source/handle.go +++ b/entrance/source/handle.go @@ -69,7 +69,7 @@ func (m *Manager) addPodSource(s *config.SourceBranch) { } else { s.NodePort = s.ContainerPort } - // event pool + // event pool first m.RcPool(s) // event node m.RcNode(s) @@ -101,10 +101,17 @@ func (m *Manager) deletePodSource(s *config.SourceBranch) { if !s.IsMidonet { s.NodePort = s.ContainerPort } + // event node first m.RcNode(s) + // event pool + m.RcPool(s) } func (m *Manager) podSource(pods *v1.Pod, method core.EventMethod) { + //if pod do not have ip and method is update,ignore it + if pods.Status.PodIP == "" && method == core.UPDATEEventMethod { + return + } index, _ := strconv.ParseInt(pods.ResourceVersion, 10, 64) var flagHost bool for _, envs := range pods.Spec.Containers[0].Env {