[ADD] add some default rainbond endpoints

This commit is contained in:
GLYASAI 2018-12-01 10:26:35 +08:00
parent c18adef936
commit f39f6fee27
8 changed files with 500 additions and 126 deletions

View File

@ -38,7 +38,12 @@ func NewGWServer() *GWServer {
//Config contains all configuration
type Config struct {
K8SConfPath string
K8SConfPath string
EnableRbdEndpoints bool
RbdEndpointsKey string // key of Rainbond endpoints in ETCD
EtcdEndPoints []string
EtcdTimeout int
ListenPorts ListenPorts
//This number should be, at maximum, the number of CPU cores on your system.
WorkerProcesses int
@ -66,6 +71,8 @@ type ListenPorts struct {
func (g *GWServer) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&g.LogLevel, "log-level", "debug", "the gateway log level")
fs.StringVar(&g.K8SConfPath, "kube-conf", "/opt/rainbond/etc/kubernetes/kubecfg/admin.kubeconfig", "absolute path to the kubeconfig file")
fs.BoolVar(&g.EnableRbdEndpoints, "enable-rbd-endpoints", true, "switch of Rainbond endpoints")
fs.StringVar(&g.RbdEndpointsKey, "rbd-endpoints", "/rainbond/endpoint/", "key of Rainbond endpoints in ETCD")
fs.IntVar(&g.ListenPorts.AuxiliaryPort, "auxiliary-port", 10253, "port of auxiliary server")
fs.IntVar(&g.WorkerProcesses, "worker-processes", 0, "Default get current compute cpu core number.This number should be, at maximum, the number of CPU cores on your system.")
fs.IntVar(&g.WorkerConnections, "worker-connections", 4000, "Determines how many clients will be served by each worker process.")
@ -77,6 +84,9 @@ func (g *GWServer) AddFlags(fs *pflag.FlagSet) {
fs.IntVar(&g.KeepaliveRequests, "keepalive-requests", 100000, "Number of requests a client can make over the keep-alive connection. This is set high for testing.")
fs.IntVar(&g.KeepaliveTimeout, "keepalive-timeout", 30, "Timeout for keep-alive connections. Server will close connections after this time.")
fs.StringVar(&g.IP, "ip", "0.0.0.0", "Node ip.") // TODO: more detail
// etcd
fs.StringSliceVar(&g.EtcdEndPoints, "etcd-endpoints", []string{"http://127.0.0.1:2379"}, "etcd cluster endpoints.")
fs.IntVar(&g.EtcdTimeout, "etcd-timeout", 5, "etcd http timeout seconds")
}
// SetLog sets log

View File

@ -19,6 +19,7 @@
package server
import (
"context"
"fmt"
"github.com/Sirupsen/logrus"
"github.com/goodrain/rainbond/cmd/gateway/option"
@ -32,12 +33,13 @@ import (
func Run(s *option.GWServer) error {
errCh := make(chan error)
gwc := controller.NewGWController(
&s.Config,
errCh)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
gwc := controller.NewGWController(ctx, &s.Config, errCh)
if gwc == nil {
return fmt.Errorf("fail to new GWController")
}
defer gwc.EtcdCli.Close()
if err := gwc.Start(); err != nil {
return err

View File

@ -1,13 +1,17 @@
package controller
import (
"context"
"encoding/json"
"fmt"
"strconv"
"strings"
"sync"
"time"
"github.com/Sirupsen/logrus"
client "github.com/coreos/etcd/clientv3"
"github.com/eapache/channels"
"github.com/golang/glog"
"github.com/goodrain/rainbond/cmd/gateway/option"
"github.com/goodrain/rainbond/gateway/controller/openresty"
"github.com/goodrain/rainbond/gateway/store"
@ -16,10 +20,15 @@ import (
"k8s.io/ingress-nginx/task"
)
const (
// TryTimes -
TryTimes = 2
)
// rainbond endpoints map
var rbdemap = make(map[string]struct{})
func init() {
rbdemap["APISERVER_ENDPOINTS"] = struct{}{}
rbdemap["APP_UI_ENDPOINTS"] = struct{}{}
rbdemap["HUB_ENDPOINTS"] = struct{}{}
rbdemap["REPO_ENDPOINTS"] = struct{}{}
}
// GWController -
type GWController struct {
@ -27,7 +36,7 @@ type GWController struct {
store store.Storer
syncQueue *task.Queue
syncRateLimiter flowcontrol.RateLimiter
syncRateLimiter flowcontrol.RateLimiter // TODO: use it
isShuttingDown bool
// stopLock is used to enforce that only a single call to Stop send at
@ -35,58 +44,35 @@ type GWController struct {
// allowing concurrent stoppers leads to stack traces.
stopLock *sync.Mutex
optionConfig option.Config
RunningConfig *v1.Config
RunningHttpPools []*v1.Pool
ocfg *option.Config
rcfg *v1.Config // running configuration
rhp []*v1.Pool // running http pools
stopCh chan struct{}
updateCh *channels.RingChannel
errCh chan error
}
func (gwc *GWController) syncGateway(key interface{}) error {
if gwc.syncQueue.IsShuttingDown() {
return nil
}
l7sv, l4sv := gwc.store.ListVirtualService()
httpPools, tcpPools := gwc.store.ListPool()
currentConfig := &v1.Config{
HTTPPools: httpPools,
TCPPools: tcpPools,
L7VS: l7sv,
L4VS: l4sv,
}
if gwc.RunningConfig.Equals(currentConfig) {
logrus.Info("No need to update running configuration.")
// refresh http pools dynamically
gwc.refreshPools(httpPools)
return nil
}
gwc.RunningConfig = currentConfig
err := gwc.GWS.PersistConfig(gwc.RunningConfig)
if err != nil {
logrus.Errorf("Fail to persist Nginx config: %v\n", err)
} else {
// refresh http pools dynamically
gwc.refreshPools(httpPools)
gwc.RunningHttpPools = httpPools
}
return nil
EtcdCli *client.Client
ctx context.Context
}
// Start starts Gateway
func (gwc *GWController) Start() error {
// start informer
gwc.store.Run(gwc.stopCh)
errCh := make(chan error)
gwc.GWS.Starts(errCh)
if gwc.ocfg.EnableRbdEndpoints {
go gwc.initRbdEndpoints()
go gwc.watchRbdEndpoints()
}
// start plugin(eg: nginx, zeus and etc)
errCh := make(chan error)
gwc.GWS.Start(errCh)
// start task queue
go gwc.syncQueue.Run(1*time.Second, gwc.stopCh)
// force initial sync
gwc.syncQueue.EnqueueTask(task.GetDummyObject("initial-sync"))
@ -95,30 +81,6 @@ func (gwc *GWController) Start() error {
return nil
}
func (gwc *GWController) handleEvent(errCh chan error) {
for {
select {
case err := <-errCh:
if err != nil {
logrus.Debugf("Unexpected error: %v", err)
}
// TODO: 20181122 huangrh
case event := <-gwc.updateCh.Out():
if gwc.isShuttingDown {
break
}
if evt, ok := event.(store.Event); ok {
logrus.Debugf("Event %v received - object %v", evt.Type, evt.Obj)
gwc.syncQueue.EnqueueSkippableTask(evt.Obj)
} else {
glog.Warningf("Unexpected event type received %T", event)
}
case <-gwc.stopCh:
break
}
}
}
// Stop stops Gateway
func (gwc *GWController) Stop() error {
gwc.isShuttingDown = true
@ -137,19 +99,80 @@ func (gwc *GWController) Stop() error {
return gwc.GWS.Stop()
}
func (gwc *GWController) handleEvent(errCh chan error) {
for {
select {
case err := <-errCh:
if err != nil {
logrus.Debugf("Unexpected error: %v", err)
}
// TODO: 20181122 huangrh
case event := <-gwc.updateCh.Out(): // received k8s events
if gwc.isShuttingDown {
break
}
if evt, ok := event.(store.Event); ok {
logrus.Debugf("Event %v received - object %v", evt.Type, evt.Obj)
gwc.syncQueue.EnqueueSkippableTask(evt.Obj)
} else {
logrus.Warningf("Unexpected event type received %T", event)
}
case <-gwc.stopCh:
break
}
}
}
func (gwc *GWController) syncGateway(key interface{}) error {
if gwc.syncQueue.IsShuttingDown() {
return nil
}
l7sv, l4sv := gwc.store.ListVirtualService()
httpPools, tcpPools := gwc.store.ListPool()
currentConfig := &v1.Config{
HTTPPools: httpPools,
TCPPools: tcpPools,
L7VS: l7sv,
L4VS: l4sv,
}
if gwc.rcfg.Equals(currentConfig) {
logrus.Info("No need to update running configuration.")
// refresh http pools dynamically
gwc.refreshPools(httpPools)
return nil
}
gwc.rcfg = currentConfig
err := gwc.GWS.PersistConfig(gwc.rcfg)
if err != nil {
// TODO: if nginx is not ready, then stop gateway
logrus.Errorf("Fail to persist Nginx config: %v\n", err)
} else {
// refresh http pools dynamically
gwc.refreshPools(httpPools)
gwc.rhp = httpPools
}
return nil
}
// refreshPools refresh pools dynamically.
func (gwc *GWController) refreshPools(pools []*v1.Pool) {
gwc.GWS.WaitPluginReady()
delPools, updPools := gwc.getDelUpdPools(pools)
// delete delPools first, then update updPools
for i := 0; i < TryTimes; i++ {
tryTimes := 3
for i := 0; i < tryTimes; i++ {
err := gwc.GWS.DeletePools(delPools)
if err == nil {
break
}
}
for i := 0; i < TryTimes; i++ {
for i := 0; i < tryTimes; i++ {
err := gwc.GWS.UpdatePools(updPools)
if err == nil {
break
@ -161,7 +184,7 @@ func (gwc *GWController) refreshPools(pools []*v1.Pool) {
func (gwc *GWController) getDelUpdPools(updPools []*v1.Pool) ([]*v1.Pool, []*v1.Pool) {
// updPools need to delete
var delPools []*v1.Pool
for _, rPool := range gwc.RunningHttpPools {
for _, rPool := range gwc.rhp {
flag := false
for i, pool := range updPools {
if rPool.Equals(pool) {
@ -180,29 +203,180 @@ func (gwc *GWController) getDelUpdPools(updPools []*v1.Pool) ([]*v1.Pool, []*v1.
}
//NewGWController new Gateway controller
func NewGWController(config *option.Config, errCh chan error) *GWController {
logrus.Debug("NewGWController...")
func NewGWController(ctx context.Context, cfg *option.Config, errCh chan error, ) *GWController {
gwc := &GWController{
updateCh: channels.NewRingChannel(1024),
errCh: errCh,
stopLock: &sync.Mutex{},
stopCh: make(chan struct{}),
ocfg: cfg,
ctx: ctx,
}
gws := openresty.CreateOpenrestyService(config, &gwc.isShuttingDown)
gwc.GWS = gws
if cfg.EnableRbdEndpoints {
// create etcd client
cli, err := client.New(client.Config{
Endpoints: cfg.EtcdEndPoints,
DialTimeout: time.Duration(cfg.EtcdTimeout) * time.Second,
})
if err != nil {
logrus.Error(err)
errCh <- err
}
gwc.EtcdCli = cli
}
clientSet, err := NewClientSet(config.K8SConfPath)
gwc.GWS = openresty.CreateOpenrestyService(cfg, &gwc.isShuttingDown)
clientSet, err := NewClientSet(cfg.K8SConfPath)
if err != nil {
logrus.Error("can't create kubernetes's client.")
errCh <- err
}
gwc.store = store.New(
clientSet,
gwc.updateCh,
config)
cfg)
gwc.syncQueue = task.NewTaskQueue(gwc.syncGateway)
return gwc
}
// initRbdEndpoints inits rainbond endpoints
func (gwc *GWController) initRbdEndpoints() {
gwc.GWS.WaitPluginReady()
// get endpoints for etcd
resp, err := gwc.EtcdCli.Get(gwc.ctx, gwc.ocfg.RbdEndpointsKey, client.WithPrefix())
if err != nil {
// error occurred -> stop gateway
gwc.errCh <- err
}
for _, kv := range resp.Kvs {
logrus.Debugf("key: %s; value: %s\n", string(kv.Key), string(kv.Value))
key := strings.Replace(string(kv.Key), gwc.ocfg.RbdEndpointsKey, "", -1)
// skip unexpected key
if _, ok := rbdemap[key]; !ok {
continue
}
var data []string
val := strings.Replace(string(kv.Value), "http://", "", -1)
if err := json.Unmarshal([]byte(val), &data); err != nil {
// error occurred -> stop gateway
gwc.errCh <- err
}
switch key {
case "REPO_ENDPOINTS":
pools := getPool(data, "lang", "maven")
if pools[0].Nodes == nil || len(pools[0].Nodes) == 0 {
gwc.errCh <- fmt.Errorf("there is no endpoints for REPO_ENDPOINTS")
}
err := gwc.GWS.UpdatePools(pools)
if err != nil {
logrus.Warningf("Unexpected error whiling updating pools: %v", err)
gwc.errCh <- fmt.Errorf("Unexpected error whiling updating pools: %v", err)
}
case "HUB_ENDPOINTS":
pools := getPool(data, "registry")
if pools[0].Nodes == nil || len(pools[0].Nodes) == 0 {
gwc.errCh <- fmt.Errorf("there is no endpoints for REPO_ENDPOINTS")
}
err := gwc.GWS.UpdatePools(pools)
if err != nil {
logrus.Warningf("Unexpected error whiling updating pools: %v", err)
gwc.errCh <- fmt.Errorf("Unexpected error whiling updating pools: %v", err)
}
}
}
}
// watchRbdEndpoints watches the change of Rainbond endpoints
func (gwc *GWController) watchRbdEndpoints() {
gwc.GWS.WaitPluginReady()
logrus.Infof("Start watching Rainbond servers. Watch key: %s", gwc.ocfg.RbdEndpointsKey)
rch := gwc.EtcdCli.Watch(gwc.ctx, gwc.ocfg.RbdEndpointsKey, client.WithPrefix())
for wresp := range rch {
for _, ev := range wresp.Events {
// APISERVER_ENDPOINTS(external), APP_UI_ENDPOINTS, HUB_ENDPOINTS, REPO_ENDPOINTS
logrus.Debugf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
gwc.updDelPools(ev)
}
}
}
// updDelPools updates or deletes pools of rainbond endpoints
func (gwc *GWController) updDelPools(ev *client.Event) {
key := strings.Replace(string(ev.Kv.Key), gwc.ocfg.RbdEndpointsKey, "", -1)
if ev.IsCreate() || ev.IsModify() {
var data []string
val := strings.Replace(string(ev.Kv.Value), "http://", "", -1)
if err := json.Unmarshal([]byte(val), &data); err != nil {
logrus.Warningf("Unexpected error while unmarshaling string(%s) to slice: %v", val, err)
return
}
switch key {
case "REPO_ENDPOINTS":
pools := getPool(data, "lang", "maven")
err := gwc.GWS.UpdatePools(pools)
if err != nil {
logrus.Warningf("Unexpected error whiling updating pools: %v", err)
}
case "HUB_ENDPOINTS":
pools := getPool(data, "registry")
err := gwc.GWS.UpdatePools(pools)
if err != nil {
logrus.Warningf("Unexpected error whiling updating pools: %v", err)
}
}
} else {
switch key {
case "REPO_ENDPOINTS":
pools := getPool(nil, "lang", "maven")
err := gwc.GWS.DeletePools(pools)
if err != nil {
logrus.Warningf("Unexpected error whiling deleting pools: %v", err)
}
case "HUB_ENDPOINTS":
pools := getPool(nil, "registry")
err := gwc.GWS.DeletePools(pools)
if err != nil {
logrus.Warningf("Unexpected error whiling deleting pools: %v", err)
}
}
}
}
func getPool(data []string, names ...string) []*v1.Pool {
var nodes []*v1.Node
if data != nil || len(data) > 0 {
for _, d := range data {
s := strings.Split(d, ":")
p, err := strconv.Atoi(s[1])
if err != nil {
logrus.Warningf("Can't convert string(%s) to int", s[1])
continue
}
n := &v1.Node{
Host: s[0],
Port: int32(p),
}
nodes = append(nodes, n)
}
}
var pools []*v1.Pool
for _, name := range names {
pool := &v1.Pool{
Meta: v1.Meta{
Name: name,
},
}
pool.Nodes = nodes
pools = append(pools, pool)
}
return pools
}

View File

@ -19,8 +19,10 @@
package controller
import (
"github.com/coreos/etcd/clientv3"
"github.com/goodrain/rainbond/gateway/v1"
"testing"
"time"
)
func TestController_GetDelUpdPools(t *testing.T) {
@ -47,7 +49,7 @@ func TestController_GetDelUpdPools(t *testing.T) {
currentHttpPools = append(currentHttpPools, fooPools...)
gwc := &GWController{
RunningHttpPools: runningHttpPools,
rhp: runningHttpPools,
}
del, upd := gwc.getDelUpdPools(currentHttpPools)
if !poolsIsEqual(delPools, del) {
@ -57,7 +59,7 @@ func TestController_GetDelUpdPools(t *testing.T) {
t.Errorf("upd should equal udpPools.")
}
gwc.RunningHttpPools = fooPools
gwc.rhp = fooPools
currentHttpPools = fooPools
del, upd = gwc.getDelUpdPools(currentHttpPools)
if len(del) != 0 {
@ -68,6 +70,22 @@ func TestController_GetDelUpdPools(t *testing.T) {
}
}
func TestGWController_WatchRbdEndpoints(t *testing.T) {
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"localhost:2379"},
DialTimeout: 3 * time.Second,
})
if err != nil {
t.Fatal(err)
}
defer cli.Close()
gwc := GWController{
EtcdCli: cli,
}
go gwc.watchRbdEndpoints()
}
func poolsIsEqual(old []*v1.Pool, new []*v1.Pool) bool {
if len(old) != len(new) {
return false

View File

@ -4,12 +4,14 @@ import "github.com/goodrain/rainbond/gateway/v1"
// Server sets configuration for a virtual server...
type Server struct {
Listen string // DefaultType: listen *:80 | *:8000; Sets the address and port for IP, or the path for a UNIX-domain socket on which the server will accept requests
ServerName string // Sets names of a virtual server
KeepaliveTimeout Time // DefaultType 60s. Sets a timeout during which an idle keepalive connection to an upstream server will stay open.
DefaultType string // Defines the default MIME type of a response.
Charset string // Adds the specified charset to the “Content-Type” response header field.
ServerTokens bool // Enables or disables emitting nginx version on error pages and in the “Server” response header field.
Listen string // DefaultType: listen *:80 | *:8000; Sets the address and port for IP, or the path for a UNIX-domain socket on which the server will accept requests
ServerName string // Sets names of a virtual server
KeepaliveTimeout Time // DefaultType 60s. Sets a timeout during which an idle keepalive connection to an upstream server will stay open.
DefaultType string // Defines the default MIME type of a response.
Charset string // Adds the specified charset to the “Content-Type” response header field.
ServerTokens bool // Enables or disables emitting nginx version on error pages and in the “Server” response header field.
ClientMaxBodySize Size // Sets the maximum allowed size of the client request body
ChunkedTransferEncoding bool // Allows disabling chunked transfer encoding in HTTP/1.1
ProxyConnectTimeout Time
ProxyTimeout Time
@ -52,6 +54,19 @@ type Location struct {
Rewrites []Rewrite
Return Return
// Sets the protocol and address of a proxied server and an optional URI to which a location should be mapped
ProxyPass string
NameCondition map[string]*v1.Condition
ProxyPass string
ProxySetHeaders []*ProxySetHeader
ProxyRedirect string // Sets the text that should be changed in the “Location” and “Refresh” header fields of a proxied server response
ProxyConnectTimeout Time // Defines a timeout for establishing a connection with a proxied server
ProxyReadTimeout Time // Defines a timeout for reading a response from the proxied server.
ProxySendTimeout Time // Sets a timeout for transmitting a request to the proxied server.
DisableProxyPass bool
NameCondition map[string]*v1.Condition
}
// ProxySetHeader allows redefining or appending fields to the request header passed to the proxied server.
type ProxySetHeader struct {
Field string
Value string
}

View File

@ -0,0 +1,124 @@
package openresty
import (
"fmt"
"github.com/goodrain/rainbond/gateway/controller/openresty/model"
"github.com/goodrain/rainbond/gateway/v1"
)
func langGoodrainMe() (*model.Server, *model.Upstream) {
svr := &model.Server{
Listen: fmt.Sprintf("%s:%d", "0.0.0.0", 80), // TODO: change ip address
ServerName: "lang.goodrain.me",
Rewrites: []model.Rewrite{
{
Regex: "^/(.*)$",
Replacement: "/artifactory/pkg_lang/$1",
Flag: "break",
},
},
Locations: []*model.Location{
{
Path: "/",
ProxyRedirect: "off",
ProxyConnectTimeout: model.Time{Num: 60, Unit: "s"},
ProxyReadTimeout: model.Time{Num: 600, Unit: "s"},
ProxySendTimeout: model.Time{Num: 600, Unit: "s"},
ProxySetHeaders: []*model.ProxySetHeader{
{Field: "Host", Value: "$http_host"},
{Field: "X-Real-IP", Value: "$remote_addr"},
{Field: "X-Forwarded-For", Value: "$proxy_add_x_forwarded_for"},
},
NameCondition: map[string]*v1.Condition{
"lang": {
Type: v1.DefaultType,
Value: map[string]string{"1": "1"},
},
},
},
},
}
us := &model.Upstream{
Name: "lang",
}
return svr, us
}
func mavenGoodrainMe() (*model.Server, *model.Upstream) {
svr := &model.Server{
Listen: fmt.Sprintf("%s:%d", "0.0.0.0", 80),
ServerName: "maven.goodrain.me",
Locations: []*model.Location{
{
Path: "/",
Rewrites: []model.Rewrite{
{
Regex: "^/(.*)$",
Replacement: "/artifactory/libs-release/$1",
Flag: "break",
},
},
ProxyRedirect: "off",
ProxyConnectTimeout: model.Time{Num: 60, Unit: "s"},
ProxyReadTimeout: model.Time{Num: 600, Unit: "s"},
ProxySendTimeout: model.Time{Num: 600, Unit: "s"},
ProxySetHeaders: []*model.ProxySetHeader{
{Field: "Host", Value: "$http_host"},
{Field: "X-Real-IP", Value: "$remote_addr"},
{Field: "X-Forwarded-For", Value: "$proxy_add_x_forwarded_for"},
},
NameCondition: map[string]*v1.Condition{
"maven": {
Type: v1.DefaultType,
Value: map[string]string{"1": "1"},
},
},
},
{
Path: "/monitor",
Return: model.Return{Code: 204},
DisableProxyPass: true,
},
},
}
us := &model.Upstream{
Name: "maven",
}
return svr, us
}
func goodrainMe() (*model.Server, *model.Upstream) {
svr := &model.Server{
Listen: fmt.Sprintf("%s:%d %s", "0.0.0.0", 443, "ssl"),
ServerName: "goodrain.me",
SSLCertificate: "server.crt",
SSLCertificateKey: "server.key",
ClientMaxBodySize: model.Size{Num:0, Unit:"k"},
ChunkedTransferEncoding: true,
Locations: []*model.Location{
{
Path: "/v2/",
ProxySetHeaders: []*model.ProxySetHeader{
{Field: "Host", Value: "$http_host"},
{Field: "X-Real-IP", Value: "$remote_addr"},
{Field: "X-Forwarded-For", Value: "$proxy_add_x_forwarded_for"},
{Field: "X-Forwarded-Proto", Value: "$scheme"},
},
ProxyReadTimeout: model.Time{
Num: 900,
Unit: "s",
},
NameCondition: map[string]*v1.Condition{
"registry": {
Type: v1.DefaultType,
Value: map[string]string{"1": "1"},
},
},
},
},
}
us := &model.Upstream{
Name: "registry",
}
return svr, us
}

View File

@ -20,8 +20,8 @@ import (
"k8s.io/ingress-nginx/ingress/controller/process"
)
// OpenrestyService handles the business logic of OpenrestyService
type OpenrestyService struct {
// OrService handles the business logic of OpenrestyService
type OrService struct {
AuxiliaryPort int
IsShuttingDown *bool
@ -33,8 +33,8 @@ type OpenrestyService struct {
}
//CreateOpenrestyService create openresty service
func CreateOpenrestyService(config *option.Config, isShuttingDown *bool) *OpenrestyService {
gws := &OpenrestyService{
func CreateOpenrestyService(config *option.Config, isShuttingDown *bool) *OrService {
gws := &OrService{
AuxiliaryPort: config.ListenPorts.AuxiliaryPort,
IsShuttingDown: isShuttingDown,
config: config,
@ -50,29 +50,24 @@ type Upstream struct {
// Server belongs to Upstream
type Server struct {
Host string
Port int32
Host string
Port int32
Weight int
}
//Start start
func (osvc *OpenrestyService) Start() error {
nginx := model.NewNginx(*osvc.config, template.CustomConfigPath)
if err := template.NewNginxTemplate(nginx, defaultNginxConf); err != nil {
return err
}
o, err := nginxExecCommand().CombinedOutput()
if err != nil {
return fmt.Errorf("%v\n%v", err, string(o))
}
return nil
}
// Starts starts nginx
func (osvc *OpenrestyService) Starts(errCh chan error) {
// Start starts nginx
func (osvc *OrService) Start(errCh chan error) {
// generate default nginx.conf
nginx := model.NewNginx(*osvc.config, template.CustomConfigPath)
nginx.HTTP = model.NewHTTP(osvc.config)
if err := template.NewNginxTemplate(nginx, defaultNginxConf); err != nil {
logrus.Fatalf("Can't not new nginx config: %v", err)
logrus.Fatalf("Can't not new nginx config: %v", err) // TODO: send err to errCh???
}
if osvc.config.EnableRbdEndpoints {
if err := osvc.newRbdServers(); err != nil {
errCh <- err // TODO: consider if it is right
}
}
cmd := nginxExecCommand()
@ -90,7 +85,7 @@ func (osvc *OpenrestyService) Starts(errCh chan error) {
}
// Stop gracefully stops the NGINX master process.
func (osvc *OpenrestyService) Stop() error {
func (osvc *OrService) Stop() error {
// send stop signal to NGINX
logrus.Info("Stopping NGINX process")
cmd := nginxExecCommand("-s", "quit")
@ -115,7 +110,7 @@ func (osvc *OpenrestyService) Stop() error {
}
// PersistConfig persists config
func (osvc *OpenrestyService) PersistConfig(conf *v1.Config) error {
func (osvc *OrService) PersistConfig(conf *v1.Config) error {
// delete the old configuration
if err := os.RemoveAll(template.CustomConfigPath); err != nil {
logrus.Errorf("Cant not remove directory(%s): %v", template.CustomConfigPath, err)
@ -164,7 +159,7 @@ func (osvc *OpenrestyService) PersistConfig(conf *v1.Config) error {
}
// persistUpstreams persists upstreams
func (osvc *OpenrestyService) persistUpstreams(pools []*v1.Pool, tmpl string, filename string) error {
func (osvc *OrService) persistUpstreams(pools []*v1.Pool, tmpl string, filename string) error {
var upstreams []model.Upstream
for _, pool := range pools {
upstream := model.Upstream{}
@ -187,6 +182,11 @@ func (osvc *OpenrestyService) persistUpstreams(pools []*v1.Pool, tmpl string, fi
logrus.Errorf("Fail to new nginx Upstream config file: %v", err)
return err
}
if err := template.NewUpdateUpsTemplate(upstreams,
"update-ups.tmpl", "/run/nginx/", "update-ups.conf"); err != nil {
logrus.Errorf("Fail to new update-ups.conf: %v", err)
return err
}
}
return nil
}
@ -224,7 +224,7 @@ func getNgxServer(conf *v1.Config) (l7srv []*model.Server, l4srv []*model.Server
}
// UpdatePools updates http upstreams dynamically.
func (osvc *OpenrestyService) UpdatePools(pools []*v1.Pool) error {
func (osvc *OrService) UpdatePools(pools []*v1.Pool) error {
if len(pools) == 0 {
return nil
}
@ -234,8 +234,9 @@ func (osvc *OpenrestyService) UpdatePools(pools []*v1.Pool) error {
upstream.Name = pool.Name
for _, node := range pool.Nodes {
server := &Server{
Host: node.Host,
Port: node.Port,
Host: node.Host,
Port: node.Port,
Weight: node.Weight,
}
upstream.Servers = append(upstream.Servers, server)
}
@ -245,7 +246,7 @@ func (osvc *OpenrestyService) UpdatePools(pools []*v1.Pool) error {
}
// updateUpstreams updates the upstreams in ngx.shared.dict by post
func (osvc *OpenrestyService) updateUpstreams(upstream []*Upstream) error {
func (osvc *OrService) updateUpstreams(upstream []*Upstream) error {
url := fmt.Sprintf("http://127.0.0.1:%v/update-upstreams", osvc.AuxiliaryPort)
data, _ := json.Marshal(upstream)
logrus.Debugf("request contest of update-upstreams is %v", string(data))
@ -269,7 +270,7 @@ func (osvc *OpenrestyService) updateUpstreams(upstream []*Upstream) error {
}
// DeletePools deletes pools
func (osvc *OpenrestyService) DeletePools(pools []*v1.Pool) error {
func (osvc *OrService) DeletePools(pools []*v1.Pool) error {
if len(pools) == 0 {
return nil
}
@ -280,7 +281,7 @@ func (osvc *OpenrestyService) DeletePools(pools []*v1.Pool) error {
return osvc.deletePools(data)
}
func (osvc *OpenrestyService) deletePools(names []string) error {
func (osvc *OrService) deletePools(names []string) error {
url := fmt.Sprintf("http://127.0.0.1:%v/delete-upstreams", osvc.AuxiliaryPort)
data, _ := json.Marshal(names)
logrus.Debugf("request content of delete-upstreams is %v", string(data))
@ -297,7 +298,7 @@ func (osvc *OpenrestyService) deletePools(names []string) error {
}
// WaitPluginReady waits for nginx to be ready.
func (osvc *OpenrestyService) WaitPluginReady() {
func (osvc *OrService) WaitPluginReady() {
url := fmt.Sprintf("http://127.0.0.1:%v/healthz", osvc.AuxiliaryPort)
for {
resp, err := http.Get(url)
@ -309,3 +310,34 @@ func (osvc *OpenrestyService) WaitPluginReady() {
time.Sleep(1 * time.Second)
}
}
// newRbdServers creates new configuration file for Rainbond servers
func (osvc *OrService) newRbdServers() error {
cfgPath := "/run/nginx/rainbond"
// delete the old configuration
if err := os.RemoveAll(cfgPath); err != nil {
logrus.Errorf("Cant not remove directory(%s): %v", cfgPath, err)
return err
}
lesrv, leus := langGoodrainMe()
mesrv, meus := mavenGoodrainMe()
gesrv, geus := goodrainMe()
if err := template.NewServerTemplateWithCfgPath([]*model.Server{
lesrv,
mesrv,
gesrv,
}, cfgPath, "servers.default.http.conf"); err != nil {
return err
}
// upstreams
if err := template.NewUpstreamTemplateWithCfgPath([]*model.Upstream{
leus,
meus,
geus,
}, "upstreams-http.tmpl", cfgPath, "upstreams.default.http.conf"); err != nil {
return err
}
return nil
}

View File

@ -5,8 +5,7 @@ import (
)
type GWServicer interface {
Start() error
Starts(errCh chan error)
Start(errCh chan error)
PersistConfig(conf *v1.Config) error
UpdatePools(pools []*v1.Pool) error
DeletePools(pools []*v1.Pool) error