Merge branch 'buildjob' of https://github.com/fanyangyang/rainbond into buildjob

This commit is contained in:
凡羊羊 2020-01-19 10:46:31 +08:00
commit b8f6296891
18 changed files with 100 additions and 161 deletions

View File

@ -19,10 +19,8 @@
package handler
import (
"context"
"time"
"github.com/Sirupsen/logrus"
"github.com/coreos/etcd/clientv3"
api_db "github.com/goodrain/rainbond/api/db"
"github.com/goodrain/rainbond/api/handler/group"
"github.com/goodrain/rainbond/api/handler/share"
@ -33,14 +31,7 @@ import (
)
//InitHandle 初始化handle
func InitHandle(conf option.Config, statusCli *client.AppRuntimeSyncClient) error {
etcdClientArgs := &etcdutil.ClientArgs{
Endpoints: conf.EtcdEndpoint,
CaFile: conf.EtcdCaFile,
CertFile: conf.EtcdCertFile,
KeyFile: conf.EtcdKeyFile,
DialTimeout: 10 * time.Second,
}
func InitHandle(conf option.Config, etcdClientArgs *etcdutil.ClientArgs, statusCli *client.AppRuntimeSyncClient, etcdcli *clientv3.Client) error {
mq := api_db.MQManager{
EtcdClientArgs: etcdClientArgs,
DefaultServer: conf.MQAPI,
@ -50,36 +41,29 @@ func InitHandle(conf option.Config, statusCli *client.AppRuntimeSyncClient) erro
logrus.Errorf("new MQ manager failed, %v", errMQ)
return errMQ
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
etcdCli, err := etcdutil.NewClient(ctx, etcdClientArgs)
if err != nil {
logrus.Errorf("create etcd client v3 error, %v", err)
return err
}
dbmanager := db.GetManager()
defaultServieHandler = CreateManager(conf, mqClient, etcdCli, statusCli)
defaultServieHandler = CreateManager(conf, mqClient, etcdcli, statusCli)
defaultPluginHandler = CreatePluginManager(mqClient)
defaultAppHandler = CreateAppManager(mqClient)
defaultTenantHandler = CreateTenManager(mqClient, statusCli, &conf)
defaultNetRulesHandler = CreateNetRulesManager(etcdCli)
defaultNetRulesHandler = CreateNetRulesManager(etcdcli)
defaultCloudHandler = CreateCloudManager(conf)
defaultAPPBackupHandler = group.CreateBackupHandle(mqClient, statusCli, etcdCli)
defaultAPPBackupHandler = group.CreateBackupHandle(mqClient, statusCli, etcdcli)
//需要使用etcd v2 API TODO fanyangyang
defaultEventHandler = CreateLogManager(conf.EtcdEndpoint)
shareHandler = &share.ServiceShareHandle{MQClient: mqClient, EtcdCli: etcdCli}
pluginShareHandler = &share.PluginShareHandle{MQClient: mqClient, EtcdCli: etcdCli}
shareHandler = &share.ServiceShareHandle{MQClient: mqClient, EtcdCli: etcdcli}
pluginShareHandler = &share.PluginShareHandle{MQClient: mqClient, EtcdCli: etcdcli}
if err := CreateTokenIdenHandler(conf); err != nil {
logrus.Errorf("create token identification mannager error, %v", err)
return err
}
defaultGatewayHandler = CreateGatewayManager(dbmanager, mqClient, etcdCli)
defaultGatewayHandler = CreateGatewayManager(dbmanager, mqClient, etcdcli)
def3rdPartySvcHandler = Create3rdPartySvcHandler(dbmanager, statusCli)
operationHandler = CreateOperationHandler(mqClient)
batchOperationHandler = CreateBatchOperationHandler(mqClient, operationHandler)
defaultAppRestoreHandler = NewAppRestoreHandler()
defPodHandler = NewPodHandler(statusCli)
defaultVolumeTypeHandler = CreateVolumeTypeManger(statusCli)
return nil

View File

@ -24,14 +24,13 @@ import (
"strings"
"time"
"github.com/pquerna/ffjson/ffjson"
"github.com/Sirupsen/logrus"
api_model "github.com/goodrain/rainbond/api/model"
"github.com/goodrain/rainbond/api/util"
"github.com/goodrain/rainbond/builder/exector"
client "github.com/goodrain/rainbond/mq/client"
tutil "github.com/goodrain/rainbond/util"
"github.com/pquerna/ffjson/ffjson"
"github.com/twinj/uuid"
)
@ -82,8 +81,8 @@ func (s *ServiceAction) GetServiceCheckInfo(uuid string) (*exector.ServiceCheckR
k := fmt.Sprintf("/servicecheck/%s", uuid)
var si exector.ServiceCheckResult
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
resp, err := s.EtcdCli.Get(ctx, k)
cancel()
if err != nil {
logrus.Errorf("get etcd k %s error, %v", k, err)
return nil, util.CreateAPIHandleError(500, err)

View File

@ -48,7 +48,6 @@ import (
"github.com/Sirupsen/logrus"
"github.com/go-chi/chi"
"github.com/go-chi/chi/middleware"
etcdutil "github.com/goodrain/rainbond/util/etcd"
)
//Manager apiserver
@ -59,10 +58,11 @@ type Manager struct {
stopChan chan struct{}
r *chi.Mux
prometheusProxy proxy.Proxy
etcdcli *clientv3.Client
}
//NewManager newManager
func NewManager(c option.Config) *Manager {
func NewManager(c option.Config, etcdcli *clientv3.Client) *Manager {
ctx, cancel := context.WithCancel(context.Background())
//controller.CreateV2RouterManager(c)
r := chi.NewRouter()
@ -108,6 +108,7 @@ func NewManager(c option.Config) *Manager {
conf: c,
stopChan: make(chan struct{}),
r: r,
etcdcli: etcdcli,
}
}
@ -194,21 +195,10 @@ func (m *Manager) Run() {
//EventLogInstance 查询event server instance
func (m *Manager) EventLogInstance(w http.ResponseWriter, r *http.Request) {
etcdClientArgs := &etcdutil.ClientArgs{
Endpoints: m.conf.EtcdEndpoint,
CaFile: m.conf.EtcdCaFile,
CertFile: m.conf.EtcdCertFile,
KeyFile: m.conf.EtcdKeyFile,
DialTimeout: 10 * time.Second,
}
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(m.ctx)
defer cancel()
etcdclient, err := etcdutil.NewClient(ctx, etcdClientArgs)
if err != nil {
w.WriteHeader(500)
return
}
res, err := etcdclient.Get(ctx, "/event/instance", clientv3.WithPrefix())
res, err := m.etcdcli.Get(ctx, "/event/instance", clientv3.WithPrefix())
if err != nil {
w.WriteHeader(500)
return

View File

@ -95,7 +95,6 @@ func NewManager(conf option.Config, mqc mqclient.MQClient) (Manager, error) {
ctx, cancel := context.WithCancel(context.Background())
etcdCli, err := etcdutil.NewClient(ctx, etcdClientArgs)
if err != nil {
cancel()
return nil, err
}
var maxConcurrentTask int

View File

@ -39,6 +39,9 @@ import (
//Run start run
func Run(s *option.APIServer) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
errChan := make(chan error)
etcdClientArgs := &etcdutil.ClientArgs{
Endpoints: s.Config.EtcdEndpoint,
@ -68,8 +71,6 @@ func Run(s *option.APIServer) error {
}
defer event.CloseManager()
//create app status client
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cli, err := client.NewClient(ctx, client.AppRuntimeSyncClientConf{
EtcdEndpoints: s.Config.EtcdEndpoint,
EtcdCaFile: s.Config.EtcdCaFile,
@ -80,10 +81,17 @@ func Run(s *option.APIServer) error {
logrus.Errorf("create app status client error, %v", err)
return err
}
etcdcli, err := etcdutil.NewClient(ctx, etcdClientArgs)
if err != nil {
logrus.Errorf("create etcd client v3 error, %v", err)
return err
}
//初始化 middleware
handler.InitProxy(s.Config)
//创建handle
if err := handler.InitHandle(s.Config, cli); err != nil {
if err := handler.InitHandle(s.Config, etcdClientArgs, cli, etcdcli); err != nil {
logrus.Errorf("init all handle error, %v", err)
return err
}
@ -92,7 +100,7 @@ func Run(s *option.APIServer) error {
logrus.Errorf("create v2 route manager error, %v", err)
}
// 启动api
apiManager := server.NewManager(s.Config)
apiManager := server.NewManager(s.Config, etcdcli)
if err := apiManager.Start(); err != nil {
return err
}

View File

@ -204,22 +204,12 @@ func (a *Conf) SetLog() {
}
//ParseClient handle config and create some api
func (a *Conf) ParseClient() (err error) {
func (a *Conf) ParseClient(ctx context.Context, etcdClientArgs *etcdutil.ClientArgs) (err error) {
a.DockerCli, err = dockercli.NewEnvClient()
if err != nil {
return err
}
logrus.Infof("begin create etcd client: %s", a.EtcdEndpoints)
etcdClientArgs := &etcdutil.ClientArgs{
Endpoints: a.EtcdEndpoints,
CaFile: a.EtcdCaFile,
CertFile: a.EtcdCertFile,
KeyFile: a.EtcdKeyFile,
AutoSyncInterval: time.Second * 30,
DialTimeout: a.EtcdDialTimeout,
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
for {
a.EtcdCli, err = etcdutil.NewClient(ctx, etcdClientArgs)
if err != nil {

View File

@ -19,28 +19,25 @@
package server
import (
"context"
"fmt"
"os"
"os/signal"
"syscall"
"github.com/goodrain/rainbond/node/nodem/docker"
"github.com/goodrain/rainbond/node/nodem/envoy"
"github.com/goodrain/rainbond/cmd/node/option"
eventLog "github.com/goodrain/rainbond/event"
"github.com/goodrain/rainbond/node/api"
"github.com/goodrain/rainbond/node/api/controller"
"github.com/goodrain/rainbond/node/core/store"
"github.com/goodrain/rainbond/node/kubecache"
"github.com/goodrain/rainbond/node/masterserver"
"github.com/goodrain/rainbond/node/nodem"
"github.com/goodrain/rainbond/node/nodem/docker"
"github.com/goodrain/rainbond/node/nodem/envoy"
etcdutil "github.com/goodrain/rainbond/util/etcd"
"github.com/Sirupsen/logrus"
eventLog "github.com/goodrain/rainbond/event"
"os/signal"
etcdutil "github.com/goodrain/rainbond/util/etcd"
)
//Run start run
@ -51,7 +48,18 @@ func Run(c *option.Conf) error {
return nil
}
startfunc := func() error {
if err := c.ParseClient(); err != nil {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
etcdClientArgs := &etcdutil.ClientArgs{
Endpoints: c.EtcdEndpoints,
CaFile: c.EtcdCaFile,
CertFile: c.EtcdCertFile,
KeyFile: c.EtcdKeyFile,
DialTimeout: c.EtcdDialTimeout,
}
if err := c.ParseClient(ctx, etcdClientArgs); err != nil {
return fmt.Errorf("config parse error:%s", err.Error())
}
@ -62,14 +70,7 @@ func Run(c *option.Conf) error {
if err := nodemanager.InitStart(); err != nil {
return err
}
errChan := make(chan error, 3)
etcdClientArgs := &etcdutil.ClientArgs{
Endpoints: c.EtcdEndpoints,
CaFile: c.EtcdCaFile,
CertFile: c.EtcdCertFile,
KeyFile: c.EtcdKeyFile,
DialTimeout: c.EtcdDialTimeout,
}
err = eventLog.NewManager(eventLog.EventConfig{
EventLogServers: c.EventLogServer,
DiscoverArgs: etcdClientArgs,
@ -95,9 +96,10 @@ func Run(c *option.Conf) error {
}
// init etcd client
if err = store.NewClient(c); err != nil {
if err = store.NewClient(ctx, c, etcdClientArgs); err != nil {
return fmt.Errorf("Connect to ETCD %s failed: %s", c.EtcdEndpoints, err)
}
errChan := make(chan error, 3)
if err := nodemanager.Start(errChan); err != nil {
return fmt.Errorf("start node manager failed: %s", err)
}

View File

@ -47,9 +47,7 @@ func CreateManager(config config.Config) (*Manager, error) {
CertFile: config.EtcdCertFile,
KeyFile: config.EtcdKeyFile,
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cli, err := etcdutil.NewClient(ctx, etcdClientArgs)
cli, err := etcdutil.NewClient(context.Background(), etcdClientArgs)
if err != nil {
etcdutil.HandleEtcdError(err)
return nil, err

View File

@ -35,6 +35,7 @@ import (
//KeepAlive 服务注册
type KeepAlive struct {
cancel context.CancelFunc
EtcdClientArgs *etcdutil.ClientArgs
ServerName string
HostName string
@ -61,12 +62,20 @@ func CreateKeepAlive(etcdClientArgs *etcdutil.ClientArgs, ServerName string, Pro
HostIP = ip.String()
}
ctx, cancel := context.WithCancel(context.Background())
etcdclient, err := etcdutil.NewClient(ctx, etcdClientArgs)
if err != nil {
return nil, err
}
k := &KeepAlive{
EtcdClientArgs: etcdClientArgs,
ServerName: ServerName,
Endpoint: fmt.Sprintf("%s:%d", HostIP, Port),
TTL: 5,
Done: make(chan struct{}),
etcdClient: etcdclient,
cancel: cancel,
}
if Protocol == "" {
k.Endpoint = fmt.Sprintf("%s:%d", HostIP, Port)
@ -80,13 +89,7 @@ func CreateKeepAlive(etcdClientArgs *etcdutil.ClientArgs, ServerName string, Pro
func (k *KeepAlive) Start() error {
duration := time.Duration(k.TTL) * time.Second
timer := time.NewTimer(duration)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
etcdclient, err := etcdutil.NewClient(ctx, k.EtcdClientArgs)
if err != nil {
return err
}
k.etcdClient = etcdclient
go func() {
for {
select {
@ -143,6 +146,8 @@ func (k *KeepAlive) reg() error {
func (k *KeepAlive) Stop() {
k.once.Do(func() {
close(k.Done)
k.cancel()
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
if k.gRPCResolver != nil {

View File

@ -32,6 +32,7 @@ import (
//KeepAlive 服务注册
type KeepAlive struct {
cancel context.CancelFunc
EtcdClentArgs *etcdutil.ClientArgs
ServerName string
HostName string
@ -71,12 +72,12 @@ func (k *KeepAlive) Start() error {
duration := time.Duration(k.TTL) * time.Second
timer := time.NewTimer(duration)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
etcdclient, err := etcdutil.NewClient(ctx, k.EtcdClentArgs)
if err != nil {
return err
}
k.etcdClient = etcdclient
k.cancel = cancel
go func() {
for {
select {
@ -135,6 +136,8 @@ func (k *KeepAlive) reg() error {
//Stop 结束
func (k *KeepAlive) Stop() error {
close(k.Done)
defer k.cancel()
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
if _, err := k.etcdClient.Delete(ctx, k.etcdKey()); err != nil {

View File

@ -187,9 +187,7 @@ func (d *EtcdDiscoverManager) Run() error {
CertFile: d.conf.EtcdCertFile,
KeyFile: d.conf.EtcdKeyFile,
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
d.etcdclientv3, err = etcdutil.NewClient(ctx, etcdClientArgs)
d.etcdclientv3, err = etcdutil.NewClient(d.context, etcdClientArgs)
if err != nil {
d.log.Error("Create etcd v3 client error.", err.Error())
return err

View File

@ -42,9 +42,11 @@ type IPManager interface {
Start() error
//An IP pool change triggers a forced update of the gateway policy
NeedUpdateGatewayPolicy() <-chan util.IPEVENT
Stop()
}
type ipManager struct {
cancel context.CancelFunc
IPPool *util.IPPool
ipLease map[string]clientv3.LeaseID
lock sync.Mutex
@ -65,12 +67,12 @@ func CreateIPManager(config option.Config) (IPManager, error) {
DialTimeout: time.Duration(config.EtcdTimeout) * time.Second,
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
etcdCli, err := etcdutil.NewClient(ctx, etcdClientArgs)
if err != nil {
return nil, err
}
return &ipManager{
cancel: cancel,
IPPool: IPPool,
config: config,
etcdCli: etcdCli,
@ -159,3 +161,7 @@ func (i *ipManager) deleteIP(ips ...net.IP) {
delete(i.ipLease, ip.String())
}
}
func (i *ipManager) Stop() {
i.cancel()
}

View File

@ -46,6 +46,7 @@ func CreateNodeManager(config option.Config) (*NodeManager, error) {
if err := ipManager.Start(); err != nil {
return nil, err
}
defer ipManager.Stop()
nm.ipManager = ipManager
if ok := nm.checkGatewayPort(); !ok {
return nil, fmt.Errorf("Check gateway node port failure")

View File

@ -29,7 +29,6 @@ import (
"github.com/goodrain/rainbond/gateway/cluster"
"k8s.io/client-go/kubernetes"
"github.com/Sirupsen/logrus"
client "github.com/coreos/etcd/clientv3"
"github.com/eapache/channels"
@ -40,6 +39,7 @@ import (
v1 "github.com/goodrain/rainbond/gateway/v1"
etcdutil "github.com/goodrain/rainbond/util/etcd"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/ingress-nginx/task"
)
@ -238,8 +238,6 @@ func NewGWController(ctx context.Context, clientset kubernetes.Interface, cfg *o
KeyFile: cfg.EtcdKeyFile,
DialTimeout: time.Duration(cfg.EtcdTimeout) * time.Second,
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cli, err := etcdutil.NewClient(ctx, etcdClientArgs)
if err != nil {
return nil, err

View File

@ -78,9 +78,7 @@ func (e *etcdQueue) Start() error {
KeyFile: e.config.EtcdKeyFile,
DialTimeout: time.Duration(e.config.EtcdTimeout) * time.Second,
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cli, err := etcdutil.NewClient(ctx, etcdClientArgs)
cli, err := etcdutil.NewClient(context.Background(), etcdClientArgs)
if err != nil {
etcdutil.HandleEtcdError(err)
return err

View File

@ -46,16 +46,7 @@ type Client struct {
}
//NewClient 创建client
func NewClient(cfg *conf.Conf) (err error) {
etcdClientArgs := &etcdutil.ClientArgs{
Endpoints: cfg.EtcdEndpoints,
CaFile: cfg.EtcdCaFile,
CertFile: cfg.EtcdCertFile,
KeyFile: cfg.EtcdKeyFile,
DialTimeout: cfg.EtcdDialTimeout,
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
func NewClient(ctx context.Context, cfg *conf.Conf, etcdClientArgs *etcdutil.ClientArgs) (err error) {
cli, err := etcdutil.NewClient(ctx, etcdClientArgs)
if err != nil {
return

View File

@ -20,13 +20,12 @@ package etcd
import (
"errors"
"github.com/coreos/etcd/pkg/transport"
"time"
"github.com/Sirupsen/logrus"
"github.com/coreos/etcd/clientv3"
v3 "github.com/coreos/etcd/clientv3"
spb "github.com/coreos/etcd/mvcc/mvccpb"
"github.com/coreos/etcd/pkg/transport"
"golang.org/x/net/context"
)
@ -81,66 +80,38 @@ type ClientArgs struct {
var (
// for parsing ca from k8s object
defaultCAPath string = ""
defaultCertPath string = ""
defaultKeyPath string = ""
defaultDialTimeout time.Duration = 10 * time.Second
defaultAotuSyncInterval time.Duration = 30 * time.Second
defaultEndpoints []string = []string{"127.0.0.1:2379"}
defaultDialTimeout = 10 * time.Second
defaultAotuSyncInterval = 30 * time.Second
)
// NewClient new etcd client v3 for all rainbond module, attention: do not support v2
func NewClient(ctx context.Context, clientArgs *ClientArgs) (*v3.Client, error) {
if clientArgs == nil {
return nil, errors.New("etcd args is nil")
}
if clientArgs.Endpoints == nil || len(clientArgs.Endpoints) == 0 { // TODO if endpoint contain invalid value
logrus.Warning("create etcd client without endpoint, use default endpoint 127.0.0.1:2379")
clientArgs.Endpoints = defaultEndpoints
}
if clientArgs.CaFile != "" && clientArgs.CertFile != "" && clientArgs.KeyFile != "" {
defaultCAPath = clientArgs.CaFile
defaultCertPath = clientArgs.CertFile
defaultKeyPath = clientArgs.KeyFile
}
if clientArgs.DialTimeout <= 10 {
if clientArgs.DialTimeout <= 5 {
clientArgs.DialTimeout = defaultDialTimeout
}
if clientArgs.AutoSyncInterval <= 30 {
clientArgs.AutoSyncInterval = defaultAotuSyncInterval
}
if defaultCAPath == "" && defaultCertPath == "" && defaultKeyPath == "" {
// create etcd client without tls
config := clientv3.Config{
Endpoints: clientArgs.Endpoints,
Context: ctx,
DialTimeout: clientArgs.DialTimeout,
AutoSyncInterval: clientArgs.AutoSyncInterval,
config := clientv3.Config{
Context: ctx,
Endpoints: clientArgs.Endpoints,
DialTimeout: clientArgs.DialTimeout,
AutoSyncInterval: clientArgs.AutoSyncInterval,
}
if clientArgs.CaFile != "" && clientArgs.CertFile != "" && clientArgs.KeyFile != "" {
// create etcd client with tls
tlsInfo := transport.TLSInfo{
CertFile: clientArgs.CertFile,
KeyFile: clientArgs.KeyFile,
TrustedCAFile: clientArgs.CaFile,
}
client, err := clientv3.New(config)
tlsConfig, err := tlsInfo.ClientConfig()
if err != nil {
return nil, err
}
return client, nil
}
// create etcd client with tls
tlsInfo := transport.TLSInfo{
CertFile: clientArgs.CertFile,
KeyFile: clientArgs.KeyFile,
TrustedCAFile: clientArgs.CaFile,
}
tlsConfig, err := tlsInfo.ClientConfig()
if err != nil {
return nil, err
}
config := clientv3.Config{
Endpoints: clientArgs.Endpoints,
Context: ctx,
DialTimeout: clientArgs.DialTimeout,
AutoSyncInterval: clientArgs.AutoSyncInterval,
TLS: tlsConfig,
config.TLS = tlsConfig
}
return clientv3.New(config)
}

View File

@ -59,12 +59,10 @@ func NewClient(ctx context.Context, conf AppRuntimeSyncClientConf) (*AppRuntimeS
CertFile: conf.EtcdCertFile,
KeyFile: conf.EtcdKeyFile,
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c, err := etcdutil.NewClient(ctx, etcdClientArgs)
r := &etcdnaming.GRPCResolver{Client: c}
b := grpc.RoundRobin(r)
arsc.cc, err = grpc.DialContext(ctx, "/rainbond/discover/app_sync_runtime_server", grpc.WithBalancer(b), grpc.WithInsecure())
arsc.cc, err = grpc.DialContext(ctx, "/rainbond/discover/app_sync_runtime_server", grpc.WithBalancer(b), grpc.WithInsecure(), grpc.WithBlock())
if err != nil {
return nil, err
}