diff --git a/cmd/gateway/server/server.go b/cmd/gateway/server/server.go index 56b881a67..3141cf50d 100644 --- a/cmd/gateway/server/server.go +++ b/cmd/gateway/server/server.go @@ -44,6 +44,7 @@ func Run(s *option.GWServer) error { reg := prometheus.NewRegistry() reg.MustRegister(prometheus.NewGoCollector()) + // TODO //reg.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{ // PidFn: func() (int, error) { return os.Getpid(), nil }, // ReportErrors: true, diff --git a/gateway/controller/controller.go b/gateway/controller/controller.go index e14a01ae6..09c18326e 100644 --- a/gateway/controller/controller.go +++ b/gateway/controller/controller.go @@ -44,7 +44,6 @@ var rbdemap = make(map[string]struct{}) func init() { rbdemap["APISERVER_ENDPOINTS"] = struct{}{} - rbdemap["APP_UI_ENDPOINTS"] = struct{}{} rbdemap["HUB_ENDPOINTS"] = struct{}{} rbdemap["REPO_ENDPOINTS"] = struct{}{} } @@ -178,7 +177,7 @@ func (gwc *GWController) syncGateway(key interface{}) error { // refreshPools refresh pools dynamically. func (gwc *GWController) refreshPools(pools []*v1.Pool) { - if err := gwc.GWS.UpdatePools(pools); err != nil { + if err := gwc.GWS.UpdatePools(pools, nil); err != nil { logrus.Warningf("error updating pools: %v", err) } } @@ -208,11 +207,11 @@ func (gwc *GWController) getDelUpdPools(updPools []*v1.Pool) ([]*v1.Pool, []*v1. //NewGWController new Gateway controller func NewGWController(ctx context.Context, cfg *option.Config, mc metric.Collector) (*GWController, error) { gwc := &GWController{ - updateCh: channels.NewRingChannel(1024), - stopLock: &sync.Mutex{}, - stopCh: make(chan struct{}), - ocfg: cfg, - ctx: ctx, + updateCh: channels.NewRingChannel(1024), + stopLock: &sync.Mutex{}, + stopCh: make(chan struct{}), + ocfg: cfg, + ctx: ctx, metricCollector: mc, } @@ -248,6 +247,7 @@ func (gwc *GWController) initRbdEndpoints(errCh chan<- error) { // get endpoints for etcd gwc.watchRbdEndpoints(gwc.listEndpoints()) } + func (gwc *GWController) listEndpoints() int64 { // get endpoints for etcd resp, err := gwc.EtcdCli.Get(gwc.ctx, gwc.ocfg.RbdEndpointsKey, client.WithPrefix()) @@ -255,9 +255,9 @@ func (gwc *GWController) listEndpoints() int64 { logrus.Errorf("get rainbond service endpoint from etcd error %s", err.Error()) return 0 } - var pools []*v1.Pool + var hpools []*v1.Pool // http pools + var tpools []*v1.Pool // tcp pools for _, kv := range resp.Kvs { - //logrus.Debugf("key: %s; value: %s\n", string(kv.Key), string(kv.Value)) key := strings.Replace(string(kv.Key), gwc.ocfg.RbdEndpointsKey, "", -1) // skip unexpected key if _, ok := rbdemap[key]; !ok { @@ -274,24 +274,28 @@ func (gwc *GWController) listEndpoints() int64 { lpools := getPool(data, "lang", "maven") if lpools[0].Nodes == nil || len(lpools[0].Nodes) == 0 { logrus.Debug("there is no endpoints for REPO_ENDPOINTS") - continue } - pools = append(pools, lpools...) + hpools = append(hpools, lpools...) case "HUB_ENDPOINTS": lpools := getPool(data, "registry") if lpools[0].Nodes == nil || len(lpools[0].Nodes) == 0 { logrus.Debug("there is no endpoints for REPO_ENDPOINTS") - continue } - pools = append(pools, lpools...) + hpools = append(hpools, lpools...) + case "APISERVER_ENDPOINTS": + apools := getPool(data, "kube_apiserver") + if apools[0].Nodes == nil || len(apools[0].Nodes) == 0 { + logrus.Debug("there is no endpoints for REPO_ENDPOINTS") + } + tpools = append(tpools, apools...) } } - gwc.rrbdp = pools + gwc.rrbdp = hpools //merge app pool - pools = append(pools, gwc.rhp...) - if err := gwc.GWS.UpdatePools(pools); err != nil { - logrus.Errorf("update pools failure %s", err.Error()) + hpools = append(hpools, gwc.rhp...) + if err := gwc.GWS.UpdatePools(hpools, tpools); err != nil { + logrus.Errorf("update hpools failure %s", err.Error()) } if resp.Header != nil { return resp.Header.Revision @@ -305,8 +309,11 @@ func (gwc *GWController) watchRbdEndpoints(version int64) { rch := gwc.EtcdCli.Watch(gwc.ctx, gwc.ocfg.RbdEndpointsKey, client.WithPrefix(), client.WithRev(version+1)) for wresp := range rch { for _, ev := range wresp.Events { - // APISERVER_ENDPOINTS(external), APP_UI_ENDPOINTS, HUB_ENDPOINTS, REPO_ENDPOINTS key := strings.Replace(string(ev.Kv.Key), gwc.ocfg.RbdEndpointsKey, "", -1) + if key == "APISERVER_ENDPOINTS" { + logrus.Debugf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value) + + } if key == "REPO_ENDPOINTS" || key == "HUB_ENDPOINTS" { logrus.Debugf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value) //only need update one diff --git a/gateway/controller/openresty/rbd_endpoints.go b/gateway/controller/openresty/rbd_endpoints.go index b6d477d2e..7c8ec9c2a 100644 --- a/gateway/controller/openresty/rbd_endpoints.go +++ b/gateway/controller/openresty/rbd_endpoints.go @@ -6,9 +6,9 @@ import ( "github.com/goodrain/rainbond/gateway/v1" ) -func langGoodrainMe(ip string) (*model.Server, *model.Upstream) { +func langGoodrainMe(ip string) *model.Server { svr := &model.Server{ - Listen: fmt.Sprintf("%s:%d", ip, 80), // TODO: change ip address + Listen: fmt.Sprintf("%s:%d", ip, 80), ServerName: "lang.goodrain.me", Rewrites: []model.Rewrite{ { @@ -38,13 +38,10 @@ func langGoodrainMe(ip string) (*model.Server, *model.Upstream) { }, }, } - us := &model.Upstream{ - Name: "lang", - } - return svr, us + return svr } -func mavenGoodrainMe(ip string) (*model.Server, *model.Upstream) { +func mavenGoodrainMe(ip string) *model.Server { svr := &model.Server{ Listen: fmt.Sprintf("%s:%d", ip, 80), ServerName: "maven.goodrain.me", @@ -81,13 +78,10 @@ func mavenGoodrainMe(ip string) (*model.Server, *model.Upstream) { }, }, } - us := &model.Upstream{ - Name: "maven", - } - return svr, us + return svr } -func goodrainMe(cfgPath string, ip string) (*model.Server, *model.Upstream) { +func goodrainMe(cfgPath string, ip string) *model.Server { svr := &model.Server{ Listen: fmt.Sprintf("%s:%d %s", ip, 443, "ssl"), ServerName: "goodrain.me", @@ -122,8 +116,23 @@ func goodrainMe(cfgPath string, ip string) (*model.Server, *model.Upstream) { }, }, } - us := &model.Upstream{ - Name: "registry", - } - return svr, us + return svr } + + +func kubeApiserver() (*model.Server) { + svr := &model.Server{ + Listen: fmt.Sprintf("%s:%d", "127.0.0.1", 6443), + ProxyPass: "kube_apiserver", + ProxyTimeout: model.Time{ + Num: 10, + Unit: "m", + }, + ProxyConnectTimeout: model.Time{ + Num: 10, + Unit: "m", + }, + } + + return svr +} \ No newline at end of file diff --git a/gateway/controller/openresty/service.go b/gateway/controller/openresty/service.go index 173332664..309800f26 100644 --- a/gateway/controller/openresty/service.go +++ b/gateway/controller/openresty/service.go @@ -78,7 +78,7 @@ type Server struct { } // Start starts nginx -func (osvc *OrService) Start(errCh chan error) { +func (o *OrService) Start(errCh chan error) { defaultNginxConf = path.Join(template.CustomConfigPath, "nginx.conf") // delete the old configuration if !util.DirIsEmpty(template.CustomConfigPath) { @@ -92,14 +92,14 @@ func (osvc *OrService) Start(errCh chan error) { os.RemoveAll(defaultNginxConf) } // generate default nginx.conf - nginx := model.NewNginx(*osvc.ocfg) - nginx.HTTP = model.NewHTTP(osvc.ocfg) + nginx := model.NewNginx(*o.ocfg) + nginx.HTTP = model.NewHTTP(o.ocfg) if err := template.NewNginxTemplate(nginx, defaultNginxConf); err != nil { errCh <- fmt.Errorf("Can't not new nginx ocfg: %s", err.Error()) return } - if osvc.ocfg.EnableRbdEndpoints { - if err := osvc.newRbdServers(); err != nil { + if o.ocfg.EnableRbdEndpoints { + if err := o.newRbdServers(); err != nil { errCh <- err // TODO: consider if it is right } } @@ -111,7 +111,7 @@ func (osvc *OrService) Start(errCh chan error) { errCh <- err return } - osvc.nginxProgress = cmd.Process + o.nginxProgress = cmd.Process go func() { if err := cmd.Wait(); err != nil { errCh <- err @@ -121,11 +121,11 @@ func (osvc *OrService) Start(errCh chan error) { } // Stop gracefully stops the NGINX master process. -func (osvc *OrService) Stop() error { +func (o *OrService) Stop() error { // send stop signal to NGINX logrus.Info("Stopping NGINX process") - if osvc.nginxProgress != nil { - if err := osvc.nginxProgress.Signal(syscall.SIGTERM); err != nil { + if o.nginxProgress != nil { + if err := o.nginxProgress.Signal(syscall.SIGTERM); err != nil { return err } } @@ -133,8 +133,8 @@ func (osvc *OrService) Stop() error { } // PersistConfig persists ocfg -func (osvc *OrService) PersistConfig(conf *v1.Config) error { - if err := osvc.persistUpstreams(conf.TCPPools, "upstreams-tcp.tmpl", template.CustomConfigPath, "stream/upstreams.conf"); err != nil { +func (o *OrService) PersistConfig(conf *v1.Config) error { + if err := o.persistUpstreams(conf.TCPPools, "upstreams-tcp.tmpl", template.CustomConfigPath, "stream/upstreams.conf"); err != nil { logrus.Errorf("fail to persist tcp upstreams.conf") } @@ -173,7 +173,7 @@ func (osvc *OrService) PersistConfig(conf *v1.Config) error { } // persistUpstreams persists upstreams -func (osvc *OrService) persistUpstreams(pools []*v1.Pool, tmpl string, path string, filename string) error { +func (o *OrService) persistUpstreams(pools []*v1.Pool, tmpl string, path string, filename string) error { var upstreams []*model.Upstream for _, pool := range pools { upstream := &model.Upstream{} @@ -238,15 +238,23 @@ func getNgxServer(conf *v1.Config) (l7srv []*model.Server, l4srv []*model.Server } // UpdatePools updates http upstreams dynamically. -func (osvc *OrService) UpdatePools(pools []*v1.Pool) error { - if len(pools) == 0 { +func (o *OrService) UpdatePools(hpools []*v1.Pool, tpools []*v1.Pool) error { + if len(tpools) > 0 { + err := o.persistUpstreams(tpools, "upstreams-tcp.tmpl", "/run/nginx/rainbond/stream", + "upstream.default.tcp.conf") + if err != nil { + logrus.Warningf("error updating upstream.default.tcp.conf") + } + } + + if hpools == nil || len(hpools) == 0 { return nil } var backends []*model.Backend - for _, pool := range pools { + for _, pool := range hpools { backends = append(backends, model.CreateBackendByPool(pool)) } - return osvc.updateBackends(backends) + return o.updateBackends(backends) } // updateUpstreams updates the upstreams in ngx.shared.dict by post @@ -298,12 +306,17 @@ func (o *OrService) WaitPluginReady() { } // newRbdServers creates new configuration file for Rainbond servers -func (osvc *OrService) newRbdServers() error { +func (o *OrService) newRbdServers() error { cfgPath := "/run/nginx/rainbond" - httpCfgPath := fmt.Sprintf("%s/%s", cfgPath, "/http") + httpCfgPath := fmt.Sprintf("%s/%s", cfgPath, "http") + tcpCfgPath := fmt.Sprintf("%s/%s", cfgPath, "stream") // delete the old configuration - if err := os.RemoveAll(cfgPath + "httpCfgPath"); err != nil { - logrus.Errorf("Cant not remove directory(%s): %v", cfgPath, err) + if err := os.RemoveAll(httpCfgPath); err != nil { + logrus.Errorf("Cant not remove directory(%s): %v", httpCfgPath, err) + return err + } + if err := os.RemoveAll(tcpCfgPath); err != nil { + logrus.Errorf("Cant not remove directory(%s): %v", tcpCfgPath, err) return err } @@ -313,14 +326,41 @@ func (osvc *OrService) newRbdServers() error { return err } - lesrv, _ := langGoodrainMe(osvc.ocfg.RBDServerInIP) - mesrv, _ := mavenGoodrainMe(osvc.ocfg.RBDServerInIP) - gesrv, _ := goodrainMe(cfgPath, osvc.ocfg.RBDServerInIP) - if err := template.NewServerTemplateWithCfgPath([]*model.Server{ - lesrv, - mesrv, - gesrv, - }, httpCfgPath, "servers.default.http.conf"); err != nil { + lesrv := langGoodrainMe(o.ocfg.RBDServerInIP) + mesrv := mavenGoodrainMe(o.ocfg.RBDServerInIP) + gesrv := goodrainMe(cfgPath, o.ocfg.RBDServerInIP) + if err := template.NewServerTemplateWithCfgPath( + []*model.Server{ + lesrv, + mesrv, + gesrv, + }, httpCfgPath, "servers.default.http.conf"); err != nil { + return err + } + + ksrv := kubeApiserver() + if err := template.NewServerTemplateWithCfgPath( + []*model.Server{ + ksrv, + }, tcpCfgPath, "server.default.tcp.conf"); err != nil { + return err + } + dummyUpstream := &model.Upstream{ + Name: "kube_apiserver", + Servers: []model.UServer{ + { + Address: "0.0.0.1:65535", // placeholder + Params: model.Params{ + Weight: 1, + }, + }, + }, + } + if err := template.NewUpstreamTemplateWithCfgPath( + []*model.Upstream{dummyUpstream}, + "upstreams-tcp.tmpl", + tcpCfgPath, + "upstream.default.tcp.conf"); err != nil { return err } diff --git a/gateway/controller/service.go b/gateway/controller/service.go index 777471b0d..71dc22790 100644 --- a/gateway/controller/service.go +++ b/gateway/controller/service.go @@ -27,6 +27,6 @@ type GWServicer interface { Stop() error Check() error PersistConfig(conf *v1.Config) error - UpdatePools(pools []*v1.Pool) error + UpdatePools(hpools []*v1.Pool, tpools []*v1.Pool) error WaitPluginReady() }