[REV] Optimize load balancing control (#126 #111)

This commit is contained in:
barnettZQG 2018-10-15 14:54:19 +08:00
parent f4b26b9d50
commit 1f08870a42
5 changed files with 33 additions and 34 deletions

View File

@ -185,6 +185,27 @@ func (m *manager) add(source object.Object) {
}
func (m *manager) delete(source object.Object) {
switch source.(type) {
case *object.PoolObject:
pool := source.(*object.PoolObject)
plugin, err := m.getPlugin(pool.PluginName, pool.PluginOpts)
if err != nil {
logrus.Errorf("get default plugin error.%s pool don't add to lb", err.Error())
return
}
if plugin.GetName() == "zeus" {
//zeus do not handle pool delete
return
}
nodes, err := m.storeManager.GetNodeByPool(pool.GetName())
if err != nil {
logrus.Errorf("get node list by pool name(%s) error.%s,do not delete pool", pool.GetName(), err.Error())
return
}
if len(nodes) != 0 {
return
}
}
ok, err := m.storeManager.DeleteSource(source)
if err != nil {
logrus.Errorf("Update %s to store error.%s", source.GetName(), err.Error())

View File

@ -62,7 +62,7 @@ type openresty struct {
var defaultNodeList = []NginxNode{
{
"Active",
"127.0.0.1:404",
"127.0.0.1:10004",
1,
}}
@ -204,14 +204,12 @@ func (o *openresty) AddPool(originalPools ...*object.PoolObject) error {
// nginx默认会试图将所有请求根据请求头中的host字段转发到名字与该host字段值相同的upstream
func (o *openresty) UpdatePool(pools ...*object.PoolObject) error {
var errs []error
for _, originalPool := range pools {
upstreamName, err := getUpstreamNameByPool(originalPool.Name)
if err != nil {
logrus.Error(fmt.Sprintf("Failed to update pool %s: %s", originalPool.Name, err))
continue
}
// get nodes from store, for example etcd.
originalNodes, err := o.ctx.Store.GetNodeByPool(originalPool.Name)
if err != nil {
@ -219,19 +217,11 @@ func (o *openresty) UpdatePool(pools ...*object.PoolObject) error {
errs = append(errs, err)
continue
}
if len(originalNodes) < 1 {
logrus.Info("Delete the pool, because no servers are inside the pool ", originalPool.Name)
o.deleteUpstream(originalPool.Name)
continue
}
protocol := "tcp"
_, err = o.ctx.Store.GetVSByPoolName(originalPool.Name)
if err != nil {
protocol = "http"
}
// build pool for openresty by original nodes
pool := NginxUpstream{upstreamName, []NginxNode{}, protocol}
for _, originalNode := range originalNodes {
@ -239,13 +229,11 @@ func (o *openresty) UpdatePool(pools ...*object.PoolObject) error {
if state == "" {
state = "Active"
}
addr := fmt.Sprintf("%s:%d", originalNode.Host, originalNode.Port)
if len(originalNode.Host) < 7 || originalNode.Port < 1 {
logrus.Info(fmt.Sprintf("Ignore node in pool %s, illegal address [%s]", pool.Name, addr))
continue
}
weight := originalNode.Weight
if weight < 1 {
weight = 1
@ -259,13 +247,11 @@ func (o *openresty) UpdatePool(pools ...*object.PoolObject) error {
}
if len(pool.Servers) < 1 {
logrus.Info("Ignore update the pool, because no servers are inside the pool ", pool.Name)
continue
logrus.Warningf("nginx upstream %s not have server,will use default server", pool.Name)
pool.Servers = defaultNodeList
}
// push data to all openresty instance by rest api
err = o.doEach(UPDATE, o.urlPool(pool.Name), pool)
if err != nil {
errs = append(errs, err)
continue
@ -278,16 +264,14 @@ func (o *openresty) UpdatePool(pools ...*object.PoolObject) error {
func (o *openresty) DeletePool(pools ...*object.PoolObject) error {
var errs []error
for _, pool := range pools {
err := o.deleteUpstream(pool.Name)
if err != nil {
errs = append(errs, err)
logrus.Error(err)
logrus.Errorf("delete nginx upstream %s error %s", pool.Name, err.Error())
continue
}
}
return reduceErr(errs)
}
@ -302,17 +286,14 @@ func (o *openresty) AddNode(nodes ...*object.NodeObject) error {
// 将node根据所属pool分类根据每个pool名字取出该pool下所有node然后全量更新
func (o *openresty) UpdateNode(nodes ...*object.NodeObject) error {
poolNames := make(map[string]string, 0)
for _, node := range nodes {
poolNames[node.PoolName] = node.NodeName
}
pools, err := o.ctx.Store.GetPools(poolNames)
if err != nil {
logrus.Error(err)
return err
}
return o.UpdatePool(pools...)
}
@ -490,9 +471,7 @@ func (o *openresty) DeleteRule(rules ...*object.RuleObject) error {
if rule.HTTPS {
protocol = "https"
}
err := o.doEach(DELETE, o.urlServer(rule.Name), Options{protocol})
if err != nil {
errs = append(errs, err)
logrus.Error(err)

View File

@ -390,7 +390,6 @@ func (z *zeus) DeletePool(pools ...*object.PoolObject) error {
if err != nil {
errs = append(errs, err)
}
}
return handleErr(errs)
}

View File

@ -25,8 +25,6 @@ import (
"github.com/goodrain/rainbond/entrance/core"
"github.com/Sirupsen/logrus"
"k8s.io/client-go/pkg/api/v1"
)
@ -84,7 +82,6 @@ const (
)
func (s *SourceBranch) RePoolName() string {
logrus.Debugf("tenant name is %s, in RePoolName", s.Tenant)
return fmt.Sprintf("%s@%s_%d.Pool",
s.Tenant,
s.Service,
@ -93,7 +90,6 @@ func (s *SourceBranch) RePoolName() string {
}
func (s *SourceBranch) ReVSName() string {
logrus.Debugf("tenant name is %s, in ReVSName", s.Tenant)
return fmt.Sprintf("%s_%s_%d.VS",
s.Tenant,
s.Service,
@ -102,7 +98,6 @@ func (s *SourceBranch) ReVSName() string {
}
func (s *SourceBranch) ReNodeName() string {
logrus.Debugf("tenant name is %s, in ReNodeName", s.Tenant)
return fmt.Sprintf("%s_%d.Node",
s.PodName,
s.ContainerPort,
@ -117,7 +112,6 @@ func sha8(s string) string {
}
func (s *SourceBranch) ReRuleName(domain string) string {
logrus.Debugf("tenant name is %s, in ReRuleName", s.Tenant)
return fmt.Sprintf("%s_%s_%d_%s.Rule",
s.Tenant,
s.Service,
@ -127,7 +121,6 @@ func (s *SourceBranch) ReRuleName(domain string) string {
}
func (s *SourceBranch) ReServiceId() string {
logrus.Debugf("tenant name is %s, in ReServiceId", s.Tenant)
return fmt.Sprintf("%s.%s_%d",
s.Tenant,
s.Service,

View File

@ -69,7 +69,7 @@ func (m *Manager) addPodSource(s *config.SourceBranch) {
} else {
s.NodePort = s.ContainerPort
}
// event pool
// event pool first
m.RcPool(s)
// event node
m.RcNode(s)
@ -101,10 +101,17 @@ func (m *Manager) deletePodSource(s *config.SourceBranch) {
if !s.IsMidonet {
s.NodePort = s.ContainerPort
}
// event node first
m.RcNode(s)
// event pool
m.RcPool(s)
}
func (m *Manager) podSource(pods *v1.Pod, method core.EventMethod) {
//if pod do not have ip and method is update,ignore it
if pods.Status.PodIP == "" && method == core.UPDATEEventMethod {
return
}
index, _ := strconv.ParseInt(pods.ResourceVersion, 10, 64)
var flagHost bool
for _, envs := range pods.Spec.Containers[0].Env {