[ADD] watch /rainbond/endpoint/APISERVER_ENDPOINTS

This commit is contained in:
GLYASAI 2018-12-25 11:26:22 +08:00
parent b2f31e127a
commit 4011971b46
5 changed files with 120 additions and 63 deletions

View File

@ -44,6 +44,7 @@ func Run(s *option.GWServer) error {
reg := prometheus.NewRegistry()
reg.MustRegister(prometheus.NewGoCollector())
// TODO
//reg.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{
// PidFn: func() (int, error) { return os.Getpid(), nil },
// ReportErrors: true,

View File

@ -44,7 +44,6 @@ var rbdemap = make(map[string]struct{})
func init() {
rbdemap["APISERVER_ENDPOINTS"] = struct{}{}
rbdemap["APP_UI_ENDPOINTS"] = struct{}{}
rbdemap["HUB_ENDPOINTS"] = struct{}{}
rbdemap["REPO_ENDPOINTS"] = struct{}{}
}
@ -178,7 +177,7 @@ func (gwc *GWController) syncGateway(key interface{}) error {
// refreshPools refresh pools dynamically.
func (gwc *GWController) refreshPools(pools []*v1.Pool) {
if err := gwc.GWS.UpdatePools(pools); err != nil {
if err := gwc.GWS.UpdatePools(pools, nil); err != nil {
logrus.Warningf("error updating pools: %v", err)
}
}
@ -208,11 +207,11 @@ func (gwc *GWController) getDelUpdPools(updPools []*v1.Pool) ([]*v1.Pool, []*v1.
//NewGWController new Gateway controller
func NewGWController(ctx context.Context, cfg *option.Config, mc metric.Collector) (*GWController, error) {
gwc := &GWController{
updateCh: channels.NewRingChannel(1024),
stopLock: &sync.Mutex{},
stopCh: make(chan struct{}),
ocfg: cfg,
ctx: ctx,
updateCh: channels.NewRingChannel(1024),
stopLock: &sync.Mutex{},
stopCh: make(chan struct{}),
ocfg: cfg,
ctx: ctx,
metricCollector: mc,
}
@ -248,6 +247,7 @@ func (gwc *GWController) initRbdEndpoints(errCh chan<- error) {
// get endpoints for etcd
gwc.watchRbdEndpoints(gwc.listEndpoints())
}
func (gwc *GWController) listEndpoints() int64 {
// get endpoints for etcd
resp, err := gwc.EtcdCli.Get(gwc.ctx, gwc.ocfg.RbdEndpointsKey, client.WithPrefix())
@ -255,9 +255,9 @@ func (gwc *GWController) listEndpoints() int64 {
logrus.Errorf("get rainbond service endpoint from etcd error %s", err.Error())
return 0
}
var pools []*v1.Pool
var hpools []*v1.Pool // http pools
var tpools []*v1.Pool // tcp pools
for _, kv := range resp.Kvs {
//logrus.Debugf("key: %s; value: %s\n", string(kv.Key), string(kv.Value))
key := strings.Replace(string(kv.Key), gwc.ocfg.RbdEndpointsKey, "", -1)
// skip unexpected key
if _, ok := rbdemap[key]; !ok {
@ -274,24 +274,28 @@ func (gwc *GWController) listEndpoints() int64 {
lpools := getPool(data, "lang", "maven")
if lpools[0].Nodes == nil || len(lpools[0].Nodes) == 0 {
logrus.Debug("there is no endpoints for REPO_ENDPOINTS")
continue
}
pools = append(pools, lpools...)
hpools = append(hpools, lpools...)
case "HUB_ENDPOINTS":
lpools := getPool(data, "registry")
if lpools[0].Nodes == nil || len(lpools[0].Nodes) == 0 {
logrus.Debug("there is no endpoints for REPO_ENDPOINTS")
continue
}
pools = append(pools, lpools...)
hpools = append(hpools, lpools...)
case "APISERVER_ENDPOINTS":
apools := getPool(data, "kube_apiserver")
if apools[0].Nodes == nil || len(apools[0].Nodes) == 0 {
logrus.Debug("there is no endpoints for REPO_ENDPOINTS")
}
tpools = append(tpools, apools...)
}
}
gwc.rrbdp = pools
gwc.rrbdp = hpools
//merge app pool
pools = append(pools, gwc.rhp...)
if err := gwc.GWS.UpdatePools(pools); err != nil {
logrus.Errorf("update pools failure %s", err.Error())
hpools = append(hpools, gwc.rhp...)
if err := gwc.GWS.UpdatePools(hpools, tpools); err != nil {
logrus.Errorf("update hpools failure %s", err.Error())
}
if resp.Header != nil {
return resp.Header.Revision
@ -305,8 +309,11 @@ func (gwc *GWController) watchRbdEndpoints(version int64) {
rch := gwc.EtcdCli.Watch(gwc.ctx, gwc.ocfg.RbdEndpointsKey, client.WithPrefix(), client.WithRev(version+1))
for wresp := range rch {
for _, ev := range wresp.Events {
// APISERVER_ENDPOINTS(external), APP_UI_ENDPOINTS, HUB_ENDPOINTS, REPO_ENDPOINTS
key := strings.Replace(string(ev.Kv.Key), gwc.ocfg.RbdEndpointsKey, "", -1)
if key == "APISERVER_ENDPOINTS" {
logrus.Debugf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
}
if key == "REPO_ENDPOINTS" || key == "HUB_ENDPOINTS" {
logrus.Debugf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
//only need update one

View File

@ -6,9 +6,9 @@ import (
"github.com/goodrain/rainbond/gateway/v1"
)
func langGoodrainMe(ip string) (*model.Server, *model.Upstream) {
func langGoodrainMe(ip string) *model.Server {
svr := &model.Server{
Listen: fmt.Sprintf("%s:%d", ip, 80), // TODO: change ip address
Listen: fmt.Sprintf("%s:%d", ip, 80),
ServerName: "lang.goodrain.me",
Rewrites: []model.Rewrite{
{
@ -38,13 +38,10 @@ func langGoodrainMe(ip string) (*model.Server, *model.Upstream) {
},
},
}
us := &model.Upstream{
Name: "lang",
}
return svr, us
return svr
}
func mavenGoodrainMe(ip string) (*model.Server, *model.Upstream) {
func mavenGoodrainMe(ip string) *model.Server {
svr := &model.Server{
Listen: fmt.Sprintf("%s:%d", ip, 80),
ServerName: "maven.goodrain.me",
@ -81,13 +78,10 @@ func mavenGoodrainMe(ip string) (*model.Server, *model.Upstream) {
},
},
}
us := &model.Upstream{
Name: "maven",
}
return svr, us
return svr
}
func goodrainMe(cfgPath string, ip string) (*model.Server, *model.Upstream) {
func goodrainMe(cfgPath string, ip string) *model.Server {
svr := &model.Server{
Listen: fmt.Sprintf("%s:%d %s", ip, 443, "ssl"),
ServerName: "goodrain.me",
@ -122,8 +116,23 @@ func goodrainMe(cfgPath string, ip string) (*model.Server, *model.Upstream) {
},
},
}
us := &model.Upstream{
Name: "registry",
}
return svr, us
return svr
}
func kubeApiserver() (*model.Server) {
svr := &model.Server{
Listen: fmt.Sprintf("%s:%d", "127.0.0.1", 6443),
ProxyPass: "kube_apiserver",
ProxyTimeout: model.Time{
Num: 10,
Unit: "m",
},
ProxyConnectTimeout: model.Time{
Num: 10,
Unit: "m",
},
}
return svr
}

View File

@ -78,7 +78,7 @@ type Server struct {
}
// Start starts nginx
func (osvc *OrService) Start(errCh chan error) {
func (o *OrService) Start(errCh chan error) {
defaultNginxConf = path.Join(template.CustomConfigPath, "nginx.conf")
// delete the old configuration
if !util.DirIsEmpty(template.CustomConfigPath) {
@ -92,14 +92,14 @@ func (osvc *OrService) Start(errCh chan error) {
os.RemoveAll(defaultNginxConf)
}
// generate default nginx.conf
nginx := model.NewNginx(*osvc.ocfg)
nginx.HTTP = model.NewHTTP(osvc.ocfg)
nginx := model.NewNginx(*o.ocfg)
nginx.HTTP = model.NewHTTP(o.ocfg)
if err := template.NewNginxTemplate(nginx, defaultNginxConf); err != nil {
errCh <- fmt.Errorf("Can't not new nginx ocfg: %s", err.Error())
return
}
if osvc.ocfg.EnableRbdEndpoints {
if err := osvc.newRbdServers(); err != nil {
if o.ocfg.EnableRbdEndpoints {
if err := o.newRbdServers(); err != nil {
errCh <- err // TODO: consider if it is right
}
}
@ -111,7 +111,7 @@ func (osvc *OrService) Start(errCh chan error) {
errCh <- err
return
}
osvc.nginxProgress = cmd.Process
o.nginxProgress = cmd.Process
go func() {
if err := cmd.Wait(); err != nil {
errCh <- err
@ -121,11 +121,11 @@ func (osvc *OrService) Start(errCh chan error) {
}
// Stop gracefully stops the NGINX master process.
func (osvc *OrService) Stop() error {
func (o *OrService) Stop() error {
// send stop signal to NGINX
logrus.Info("Stopping NGINX process")
if osvc.nginxProgress != nil {
if err := osvc.nginxProgress.Signal(syscall.SIGTERM); err != nil {
if o.nginxProgress != nil {
if err := o.nginxProgress.Signal(syscall.SIGTERM); err != nil {
return err
}
}
@ -133,8 +133,8 @@ func (osvc *OrService) Stop() error {
}
// PersistConfig persists ocfg
func (osvc *OrService) PersistConfig(conf *v1.Config) error {
if err := osvc.persistUpstreams(conf.TCPPools, "upstreams-tcp.tmpl", template.CustomConfigPath, "stream/upstreams.conf"); err != nil {
func (o *OrService) PersistConfig(conf *v1.Config) error {
if err := o.persistUpstreams(conf.TCPPools, "upstreams-tcp.tmpl", template.CustomConfigPath, "stream/upstreams.conf"); err != nil {
logrus.Errorf("fail to persist tcp upstreams.conf")
}
@ -173,7 +173,7 @@ func (osvc *OrService) PersistConfig(conf *v1.Config) error {
}
// persistUpstreams persists upstreams
func (osvc *OrService) persistUpstreams(pools []*v1.Pool, tmpl string, path string, filename string) error {
func (o *OrService) persistUpstreams(pools []*v1.Pool, tmpl string, path string, filename string) error {
var upstreams []*model.Upstream
for _, pool := range pools {
upstream := &model.Upstream{}
@ -238,15 +238,23 @@ func getNgxServer(conf *v1.Config) (l7srv []*model.Server, l4srv []*model.Server
}
// UpdatePools updates http upstreams dynamically.
func (osvc *OrService) UpdatePools(pools []*v1.Pool) error {
if len(pools) == 0 {
func (o *OrService) UpdatePools(hpools []*v1.Pool, tpools []*v1.Pool) error {
if len(tpools) > 0 {
err := o.persistUpstreams(tpools, "upstreams-tcp.tmpl", "/run/nginx/rainbond/stream",
"upstream.default.tcp.conf")
if err != nil {
logrus.Warningf("error updating upstream.default.tcp.conf")
}
}
if hpools == nil || len(hpools) == 0 {
return nil
}
var backends []*model.Backend
for _, pool := range pools {
for _, pool := range hpools {
backends = append(backends, model.CreateBackendByPool(pool))
}
return osvc.updateBackends(backends)
return o.updateBackends(backends)
}
// updateUpstreams updates the upstreams in ngx.shared.dict by post
@ -298,12 +306,17 @@ func (o *OrService) WaitPluginReady() {
}
// newRbdServers creates new configuration file for Rainbond servers
func (osvc *OrService) newRbdServers() error {
func (o *OrService) newRbdServers() error {
cfgPath := "/run/nginx/rainbond"
httpCfgPath := fmt.Sprintf("%s/%s", cfgPath, "/http")
httpCfgPath := fmt.Sprintf("%s/%s", cfgPath, "http")
tcpCfgPath := fmt.Sprintf("%s/%s", cfgPath, "stream")
// delete the old configuration
if err := os.RemoveAll(cfgPath + "httpCfgPath"); err != nil {
logrus.Errorf("Cant not remove directory(%s): %v", cfgPath, err)
if err := os.RemoveAll(httpCfgPath); err != nil {
logrus.Errorf("Cant not remove directory(%s): %v", httpCfgPath, err)
return err
}
if err := os.RemoveAll(tcpCfgPath); err != nil {
logrus.Errorf("Cant not remove directory(%s): %v", tcpCfgPath, err)
return err
}
@ -313,14 +326,41 @@ func (osvc *OrService) newRbdServers() error {
return err
}
lesrv, _ := langGoodrainMe(osvc.ocfg.RBDServerInIP)
mesrv, _ := mavenGoodrainMe(osvc.ocfg.RBDServerInIP)
gesrv, _ := goodrainMe(cfgPath, osvc.ocfg.RBDServerInIP)
if err := template.NewServerTemplateWithCfgPath([]*model.Server{
lesrv,
mesrv,
gesrv,
}, httpCfgPath, "servers.default.http.conf"); err != nil {
lesrv := langGoodrainMe(o.ocfg.RBDServerInIP)
mesrv := mavenGoodrainMe(o.ocfg.RBDServerInIP)
gesrv := goodrainMe(cfgPath, o.ocfg.RBDServerInIP)
if err := template.NewServerTemplateWithCfgPath(
[]*model.Server{
lesrv,
mesrv,
gesrv,
}, httpCfgPath, "servers.default.http.conf"); err != nil {
return err
}
ksrv := kubeApiserver()
if err := template.NewServerTemplateWithCfgPath(
[]*model.Server{
ksrv,
}, tcpCfgPath, "server.default.tcp.conf"); err != nil {
return err
}
dummyUpstream := &model.Upstream{
Name: "kube_apiserver",
Servers: []model.UServer{
{
Address: "0.0.0.1:65535", // placeholder
Params: model.Params{
Weight: 1,
},
},
},
}
if err := template.NewUpstreamTemplateWithCfgPath(
[]*model.Upstream{dummyUpstream},
"upstreams-tcp.tmpl",
tcpCfgPath,
"upstream.default.tcp.conf"); err != nil {
return err
}

View File

@ -27,6 +27,6 @@ type GWServicer interface {
Stop() error
Check() error
PersistConfig(conf *v1.Config) error
UpdatePools(pools []*v1.Pool) error
UpdatePools(hpools []*v1.Pool, tpools []*v1.Pool) error
WaitPluginReady()
}