diff --git a/cmd/worker/option/option.go b/cmd/worker/option/option.go
index c4b90f12b..3e30e993e 100644
--- a/cmd/worker/option/option.go
+++ b/cmd/worker/option/option.go
@@ -41,6 +41,8 @@ type Config struct {
 	PrometheusMetricPath string
 	EventLogServers      []string
 	KubeConfig           string
+	KubeApiQPS           int
+	KubeApiBurst         int
 	MaxTasks             int
 	MQAPI                string
 	NodeName             string
@@ -81,6 +83,8 @@ func (a *Worker) AddFlags(fs *pflag.FlagSet) {
 	fs.StringVar(&a.MysqlConnectionInfo, "mysql", "root:admin@tcp(127.0.0.1:3306)/region", "mysql db connection info")
 	fs.StringSliceVar(&a.EventLogServers, "event-servers", []string{"127.0.0.1:6366"}, "event log server address. simple lb")
 	fs.StringVar(&a.KubeConfig, "kube-config", "", "kubernetes api server config file")
+	fs.IntVar(&a.KubeApiQPS, "kube-api-qps", 50, "kube client qps")
+	fs.IntVar(&a.KubeApiBurst, "kube-api-burst", 10, "kube client burst")
 	fs.IntVar(&a.MaxTasks, "max-tasks", 50, "the max tasks for per node")
 	fs.StringVar(&a.MQAPI, "mq-api", "127.0.0.1:6300", "acp_mq api")
 	fs.StringVar(&a.RunMode, "run", "sync", "sync data when worker start")
diff --git a/cmd/worker/server/server.go b/cmd/worker/server/server.go
index 336d2d731..334ed67b9 100644
--- a/cmd/worker/server/server.go
+++ b/cmd/worker/server/server.go
@@ -40,6 +40,7 @@ import (
 	"github.com/goodrain/rainbond/worker/server"
 	"github.com/sirupsen/logrus"
 	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/util/flowcontrol"
 )
 
 //Run start run
@@ -76,6 +77,7 @@ func Run(s *option.Worker) error {
 		logrus.Errorf("create kube rest config error: %s", err.Error())
 		return err
 	}
+	restConfig.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(float32(s.Config.KubeApiQPS), s.Config.KubeApiBurst)
 	clientset, err := kubernetes.NewForConfig(restConfig)
 	if err != nil {
 		logrus.Errorf("create kube client error: %s", err.Error())
@@ -103,7 +105,8 @@ func Run(s *option.Worker) error {
 	defer controllerManager.Stop()
 
 	//step 5 : start runtime master
-	masterCon, err := master.NewMasterController(s.Config, cachestore)
+
+	masterCon, err := master.NewMasterController(s.Config, restConfig, cachestore)
 	if err != nil {
 		return err
 	}
diff --git a/worker/master/master.go b/worker/master/master.go
index cb3e577ef..abad7351f 100644
--- a/worker/master/master.go
+++ b/worker/master/master.go
@@ -24,11 +24,6 @@ import (
 	"strings"
 	"time"
 
-	"github.com/prometheus/client_golang/prometheus"
-	"github.com/sirupsen/logrus"
-
-	corev1 "k8s.io/api/core/v1"
-
 	"github.com/goodrain/rainbond/cmd/worker/option"
 	"github.com/goodrain/rainbond/db"
 	"github.com/goodrain/rainbond/db/model"
@@ -39,6 +34,10 @@ import (
 	"github.com/goodrain/rainbond/worker/master/volumes/provider/lib/controller"
 	"github.com/goodrain/rainbond/worker/master/volumes/statistical"
 	"github.com/goodrain/rainbond/worker/master/volumes/sync"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/sirupsen/logrus"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/rest"
 )
 
 //Controller app runtime master controller
@@ -59,18 +58,26 @@ type Controller struct {
 	pc        *controller.ProvisionController
 	isLeader  bool
 
+	kubeClient kubernetes.Interface
+
 	stopCh          chan struct{}
-	podEventChs     []chan *corev1.Pod
 	podEvent        *podevent.PodEvent
 	volumeTypeEvent *sync.VolumeTypeEvent
 }
 
 //NewMasterController new master controller
-func NewMasterController(conf option.Config, store store.Storer) (*Controller, error) {
+func NewMasterController(conf option.Config, kubecfg *rest.Config, store store.Storer) (*Controller, error) {
+	kubecfg.RateLimiter = nil
+	kubeClient, err := kubernetes.NewForConfig(kubecfg)
+	if err != nil {
+		return nil, err
+	}
+
 	ctx, cancel := context.WithCancel(context.Background())
+
 	// The controller needs to know what the server version is because out-of-tree
 	// provisioners aren't officially supported until 1.5
-	serverVersion, err := conf.KubeClient.Discovery().ServerVersion()
+	serverVersion, err := kubeClient.Discovery().ServerVersion()
 	if err != nil {
 		logrus.Errorf("Error getting server version: %v", err)
 		cancel()
@@ -82,10 +89,10 @@ func NewMasterController(conf option.Config, store store.Storer) (*Controller, e
 	//statefulset share controller
 	rainbondssscProvisioner := provider.NewRainbondssscProvisioner()
 	//statefulset local controller
-	rainbondsslcProvisioner := provider.NewRainbondsslcProvisioner(conf.KubeClient, store)
+	rainbondsslcProvisioner := provider.NewRainbondsslcProvisioner(kubeClient, store)
 	// Start the provision controller which will dynamically provision hostPath
 	// PVs
-	pc := controller.NewProvisionController(conf.KubeClient, &conf, map[string]controller.Provisioner{
+	pc := controller.NewProvisionController(kubeClient, &conf, map[string]controller.Provisioner{
 		rainbondssscProvisioner.Name(): rainbondssscProvisioner,
 		rainbondsslcProvisioner.Name(): rainbondsslcProvisioner,
 	}, serverVersion.GitVersion)
@@ -137,6 +144,7 @@ func NewMasterController(conf option.Config, store store.Storer) (*Controller, e
 		diskCache:       statistical.CreatDiskCache(ctx),
 		podEvent:        podevent.New(conf.KubeClient, stopCh),
 		volumeTypeEvent: sync.New(stopCh),
+		kubeClient:      kubeClient,
 	}, nil
 }
 
@@ -181,7 +189,7 @@ func (m *Controller) Start() error {
 	}
 	// Name of config map with leader election lock
 	lockName := "rainbond-appruntime-worker-leader"
-	go leader.RunAsLeader(m.ctx, m.conf.KubeClient, m.conf.LeaderElectionNamespace, m.conf.LeaderElectionIdentity, lockName, start, func() {})
+	go leader.RunAsLeader(m.ctx, m.kubeClient, m.conf.LeaderElectionNamespace, m.conf.LeaderElectionIdentity, lockName, start, func() {})
 	return nil
 }
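
Note (not part of the patch): the sketch below shows the pattern the server.go hunk applies, i.e. building a token-bucket rate limiter from the new --kube-api-qps / --kube-api-burst values and attaching it to the rest.Config before kubernetes.NewForConfig is called. The kubeconfig path and the 80/120 values are illustrative assumptions, not values taken from the patch.

// Sketch only: rate limiting a client-go clientset via rest.Config.RateLimiter.
package main

import (
	"fmt"

	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/util/flowcontrol"
)

func main() {
	// Build a rest.Config from a kubeconfig path (stand-in for the worker's
	// --kube-config handling; the path here is a placeholder).
	restConfig, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
	if err != nil {
		panic(err)
	}

	// qps is the sustained request rate, burst is the token-bucket capacity
	// that absorbs short spikes (values here are illustrative, not defaults).
	qps, burst := 80, 120
	restConfig.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(float32(qps), burst)

	clientset, err := kubernetes.NewForConfig(restConfig)
	if err != nil {
		panic(err)
	}

	// Every request issued through this clientset is now throttled client side.
	sv, err := clientset.Discovery().ServerVersion()
	if err != nil {
		panic(err)
	}
	fmt.Println("server version:", sv.GitVersion)
}

With flowcontrol.NewTokenBucketRateLimiter the first argument is the sustained refill rate and the second is the bucket capacity for short spikes, so burst is usually set at or above qps; the patch's defaults (qps 50, burst 10) keep the sustained rate at 50 while capping back-to-back requests at 10. In master.go the patch resets kubecfg.RateLimiter to nil before building the master's own clientset, which should make that client fall back to client-go's default limiter instead of sharing the worker-wide one.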