mirror of
https://gitee.com/rainbond/Rainbond.git
synced 2024-12-01 03:07:51 +08:00
Merge pull request #290 from GLYASAI/RAINBOND-879
[FIX] duplicate gateway backends
This commit is contained in:
commit
bcb7938c36
@ -262,9 +262,21 @@ func (gwc *GWController) updateRbdPools(edps map[string][]string) {
|
||||
|
||||
if h != nil {
|
||||
//merge app pool
|
||||
h = append(h, gwc.rhp...)
|
||||
for _, rbd := range h {
|
||||
exist := false
|
||||
for idx := range gwc.rhp {
|
||||
item := gwc.rhp[idx]
|
||||
if rbd.Name == item.Name {
|
||||
gwc.rhp[idx] = rbd
|
||||
exist = true
|
||||
}
|
||||
}
|
||||
if !exist {
|
||||
gwc.rhp = append(gwc.rhp, rbd)
|
||||
}
|
||||
}
|
||||
}
|
||||
if err := gwc.GWS.UpdatePools(h, t); err != nil {
|
||||
if err := gwc.GWS.UpdatePools(gwc.rhp, t); err != nil {
|
||||
logrus.Errorf("update rainbond pools failure %s", err.Error())
|
||||
}
|
||||
}
|
||||
@ -374,7 +386,6 @@ func (gwc *GWController) listRbdEndpoints() (map[string][]string, int64) {
|
||||
|
||||
var d []string
|
||||
for _, dat := range data {
|
||||
logrus.Debugf("dat: %s", dat)
|
||||
s := strings.Split(dat, ":")
|
||||
if len(s) != 2 || strings.Replace(s[0], " ", "", -1) == "" {
|
||||
logrus.Warningf("wrong endpoint: %s", dat)
|
||||
|
178
gateway/controller/openresty/template/template_darwin.go
Normal file
178
gateway/controller/openresty/template/template_darwin.go
Normal file
@ -0,0 +1,178 @@
|
||||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package template
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path"
|
||||
text_template "text/template"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
"github.com/goodrain/rainbond/cmd/gateway/option"
|
||||
"github.com/goodrain/rainbond/gateway/controller/openresty/model"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
var (
|
||||
defBufferSize = 65535
|
||||
//CustomConfigPath custom config file path
|
||||
CustomConfigPath = "/run/nginx/conf"
|
||||
//tmplPath Tmpl config file path
|
||||
tmplPath = "/run/nginxtmp/tmpl"
|
||||
)
|
||||
|
||||
func init() {
|
||||
if os.Getenv("NGINX_CONFIG_TMPL") != "" {
|
||||
tmplPath = os.Getenv("NGINX_CONFIG_TMPL")
|
||||
}
|
||||
}
|
||||
|
||||
// Template ...
|
||||
type Template struct {
|
||||
tmpl *text_template.Template
|
||||
//fw watch.FileWatcher
|
||||
bp *BufferPool
|
||||
}
|
||||
|
||||
//NewTemplate returns a new Template instance or an
|
||||
//error if the specified template file contains errors
|
||||
func NewTemplate(fileName string) (*Template, error) {
|
||||
tmplFile, err := ioutil.ReadFile(fileName)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "unexpected error reading template %v", tmplFile)
|
||||
}
|
||||
|
||||
tmpl, err := text_template.New("gateway").Funcs(funcMap).Parse(string(tmplFile))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &Template{
|
||||
tmpl: tmpl,
|
||||
bp: NewBufferPool(defBufferSize),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// NewNginxTemplate creates a nginx configuration file(nginx.conf)
|
||||
func NewNginxTemplate(data *model.Nginx, defaultNginxConf string) error {
|
||||
if e := Persist(tmplPath+"/nginx.tmpl", data, path.Dir(defaultNginxConf), path.Base(defaultNginxConf)); e != nil {
|
||||
return e
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
//ServerContext ServerContext
|
||||
type ServerContext struct {
|
||||
Servers []*model.Server
|
||||
Set option.Config
|
||||
}
|
||||
|
||||
// NewServerTemplate creates a configuration file for the nginx server module
|
||||
func NewServerTemplate(data *ServerContext, filename string) error {
|
||||
if e := Persist(tmplPath+"/servers.tmpl", data, CustomConfigPath, filename); e != nil {
|
||||
return e
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// NewServerTemplateWithCfgPath creates a configuration file for the nginx server module
|
||||
func NewServerTemplateWithCfgPath(data *ServerContext, cfgPath string, filename string) error {
|
||||
if e := Persist(tmplPath+"/servers.tmpl", data, cfgPath, filename); e != nil {
|
||||
return e
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// NewUpstreamTemplate creates a configuration file for the nginx upstream module
|
||||
func NewUpstreamTemplate(data []model.Upstream, tmpl, filename string) error {
|
||||
if e := Persist(tmplPath+"/"+tmpl, data, CustomConfigPath, filename); e != nil {
|
||||
return e
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// NewUpstreamTemplateWithCfgPath creates a configuration file for the nginx upstream module
|
||||
func NewUpstreamTemplateWithCfgPath(data []*model.Upstream, tmpl, cfgPath string, filename string) error {
|
||||
if e := Persist(tmplPath+"/"+tmpl, data, cfgPath, filename); e != nil {
|
||||
return e
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// NewUpdateUpsTemplate creates a configuration file for the nginx upstream module
|
||||
func NewUpdateUpsTemplate(data []model.Upstream, tmpl, path string, filename string) error {
|
||||
if e := Persist(tmplPath+"/"+tmpl, data, path, filename); e != nil {
|
||||
return e
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Persist persists the nginx configuration file to disk
|
||||
func Persist(tmplFilename string, data interface{}, p string, f string) error {
|
||||
tpl, err := NewTemplate(tmplFilename)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
rt, err := tpl.Write(data)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
f = fmt.Sprintf("%s/%s", p, f)
|
||||
p = path.Dir(f)
|
||||
f = path.Base(f)
|
||||
if !isExists(p) {
|
||||
logrus.Debugf("mkdir %s", p)
|
||||
if e := os.MkdirAll(p, 0777); e != nil {
|
||||
return e
|
||||
}
|
||||
}
|
||||
|
||||
if e := ioutil.WriteFile(p+"/"+f, rt, 0666); e != nil {
|
||||
return e
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *Template) Write(conf interface{}) ([]byte, error) {
|
||||
tmplBuf := t.bp.Get()
|
||||
defer t.bp.Put(tmplBuf)
|
||||
|
||||
outCmdBuf := t.bp.Get()
|
||||
defer t.bp.Put(outCmdBuf)
|
||||
|
||||
if err := t.tmpl.Execute(tmplBuf, conf); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return tmplBuf.Bytes(), nil
|
||||
}
|
||||
|
||||
func isExists(f string) bool {
|
||||
_, err := os.Stat(f)
|
||||
if err != nil {
|
||||
if os.IsExist(err) {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
@ -46,6 +46,7 @@ type ClusterClient interface {
|
||||
GetOptions() *option.Conf
|
||||
GetEndpoints(key string) []string
|
||||
SetEndpoints(key string, value []string)
|
||||
DelEndpoints(key string)
|
||||
}
|
||||
|
||||
//NewClusterClient new cluster client
|
||||
@ -124,7 +125,6 @@ func (e *etcdClusterClient) GetEndpoints(key string) (result []string) {
|
||||
|
||||
func (e *etcdClusterClient) SetEndpoints(key string, value []string) {
|
||||
key = "/rainbond/endpoint/" + key
|
||||
logrus.Infof("Put endpoints %s => %v", key, value)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
@ -141,6 +141,19 @@ func (e *etcdClusterClient) SetEndpoints(key string, value []string) {
|
||||
}
|
||||
}
|
||||
|
||||
func (e *etcdClusterClient) DelEndpoints(key string) {
|
||||
key = "/rainbond/endpoint/" + key
|
||||
logrus.Infof("Delete endpoints: %s", key)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
_, err := e.conf.EtcdCli.Delete(ctx, key)
|
||||
if err != nil {
|
||||
logrus.Errorf("Failed to put endpoint for %s: %v", key, Error)
|
||||
}
|
||||
}
|
||||
|
||||
//ErrorNotFound node not found.
|
||||
var ErrorNotFound = fmt.Errorf("node not found")
|
||||
|
||||
|
@ -35,6 +35,7 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
// ArgsReg -
|
||||
ArgsReg = regexp.MustCompile(`\$\{(\w+)\|{0,1}(.{0,1})\}`)
|
||||
)
|
||||
|
||||
@ -103,6 +104,7 @@ func (m *ManagerService) Stop() error {
|
||||
func (m *ManagerService) Online() error {
|
||||
logrus.Info("Doing node online by node controller manager")
|
||||
if ok := m.ctr.CheckBeforeStart(); !ok {
|
||||
logrus.Debug("check before starting: false")
|
||||
return nil
|
||||
}
|
||||
go m.StartServices()
|
||||
@ -158,16 +160,22 @@ func (m *ManagerService) Offline() error {
|
||||
|
||||
//DownOneServiceEndpoint down service endpoint
|
||||
func (m *ManagerService) DownOneServiceEndpoint(s *service.Service) {
|
||||
HostIP := m.cluster.GetOptions().HostIP
|
||||
hostIP := m.cluster.GetOptions().HostIP
|
||||
for _, end := range s.Endpoints {
|
||||
logrus.Debug("Anti-registry endpoint: ", end.Name)
|
||||
endpoint := toEndpoint(end, HostIP)
|
||||
oldEndpoints := m.cluster.GetEndpoints(end.Name)
|
||||
key := end.Name + "/" + hostIP
|
||||
endpoint := toEndpoint(end, hostIP)
|
||||
oldEndpoints := m.cluster.GetEndpoints(key)
|
||||
if exist := isExistEndpoint(oldEndpoints, endpoint); exist {
|
||||
m.cluster.SetEndpoints(end.Name, rmEndpointFrom(oldEndpoints, endpoint))
|
||||
endpoints := rmEndpointFrom(oldEndpoints, endpoint)
|
||||
if len(endpoints) > 0 {
|
||||
m.cluster.SetEndpoints(key, endpoints)
|
||||
continue
|
||||
}
|
||||
m.cluster.DelEndpoints(key)
|
||||
}
|
||||
}
|
||||
logrus.Infof("node %s down service %s endpoints", HostIP, s.Name)
|
||||
logrus.Infof("node %s down service %s endpoints", hostIP, s.Name)
|
||||
}
|
||||
|
||||
//UpOneServiceEndpoint up service endpoint
|
||||
@ -186,11 +194,11 @@ func (m *ManagerService) UpOneServiceEndpoint(s *service.Service) {
|
||||
endpoint := toEndpoint(end, hostIP)
|
||||
m.cluster.SetEndpoints(key, []string{endpoint})
|
||||
}
|
||||
logrus.Infof("node %s up service %s endpoints", hostIP, s.Name)
|
||||
}
|
||||
|
||||
//SyncServiceStatusController synchronize all service status to as we expect
|
||||
func (m *ManagerService) SyncServiceStatusController() {
|
||||
logrus.Debug("run SyncServiceStatusController")
|
||||
m.lock.Lock()
|
||||
defer m.lock.Unlock()
|
||||
if m.autoStatusController != nil && len(m.autoStatusController) > 0 {
|
||||
@ -263,6 +271,7 @@ type statusController struct {
|
||||
}
|
||||
|
||||
func (s *statusController) Run() {
|
||||
logrus.Info("run status controller")
|
||||
s.healthyManager.EnableWatcher(s.service.Name, s.watcher.GetID())
|
||||
defer s.watcher.Close()
|
||||
defer s.healthyManager.DisableWatcher(s.service.Name, s.watcher.GetID())
|
||||
@ -295,6 +304,7 @@ func (s *statusController) Stop() {
|
||||
s.cancel()
|
||||
}
|
||||
|
||||
// StopSyncService -
|
||||
func (m *ManagerService) StopSyncService() {
|
||||
if m.syncCtx != nil {
|
||||
m.syncCancel()
|
||||
@ -323,6 +333,7 @@ func (m *ManagerService) WaitStart(name string, duration time.Duration) bool {
|
||||
}
|
||||
}
|
||||
|
||||
// ReLoadServices -
|
||||
/*
|
||||
1. reload services info from local file system
|
||||
2. regenerate systemd config file and restart with config changes
|
||||
|
@ -236,7 +236,9 @@ func (n *NodeManager) heartbeat() {
|
||||
if err := n.cluster.UpdateStatus(n.currentNode, n.getInitLable(n.currentNode)); err != nil {
|
||||
logrus.Errorf("update node status error %s", err.Error())
|
||||
}
|
||||
logrus.Infof("Send node %s heartbeat to master:%s ", n.currentNode.ID, n.currentNode.NodeStatus.Status)
|
||||
if n.currentNode.NodeStatus.Status != "running" {
|
||||
logrus.Infof("Send node %s heartbeat to master:%s ", n.currentNode.ID, n.currentNode.NodeStatus.Status)
|
||||
}
|
||||
return nil
|
||||
}, time.Second*time.Duration(n.cfg.TTL))
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user