rate limit for kube client

This commit is contained in:
GLYASAI 2021-05-18 14:53:21 +08:00
parent 24fb3c96d1
commit 61afc2387f
3 changed files with 27 additions and 12 deletions

View File

@ -41,6 +41,8 @@ type Config struct {
PrometheusMetricPath string PrometheusMetricPath string
EventLogServers []string EventLogServers []string
KubeConfig string KubeConfig string
KubeApiQPS int
KubeApiBurst int
MaxTasks int MaxTasks int
MQAPI string MQAPI string
NodeName string NodeName string
@ -81,6 +83,8 @@ func (a *Worker) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&a.MysqlConnectionInfo, "mysql", "root:admin@tcp(127.0.0.1:3306)/region", "mysql db connection info") fs.StringVar(&a.MysqlConnectionInfo, "mysql", "root:admin@tcp(127.0.0.1:3306)/region", "mysql db connection info")
fs.StringSliceVar(&a.EventLogServers, "event-servers", []string{"127.0.0.1:6366"}, "event log server address. simple lb") fs.StringSliceVar(&a.EventLogServers, "event-servers", []string{"127.0.0.1:6366"}, "event log server address. simple lb")
fs.StringVar(&a.KubeConfig, "kube-config", "", "kubernetes api server config file") fs.StringVar(&a.KubeConfig, "kube-config", "", "kubernetes api server config file")
fs.IntVar(&a.KubeApiQPS, "kube-api-qps", 50, "kube client qps")
fs.IntVar(&a.KubeApiBurst, "kube-api-burst", 10, "kube clint burst")
fs.IntVar(&a.MaxTasks, "max-tasks", 50, "the max tasks for per node") fs.IntVar(&a.MaxTasks, "max-tasks", 50, "the max tasks for per node")
fs.StringVar(&a.MQAPI, "mq-api", "127.0.0.1:6300", "acp_mq api") fs.StringVar(&a.MQAPI, "mq-api", "127.0.0.1:6300", "acp_mq api")
fs.StringVar(&a.RunMode, "run", "sync", "sync data when worker start") fs.StringVar(&a.RunMode, "run", "sync", "sync data when worker start")

View File

@ -40,6 +40,7 @@ import (
"github.com/goodrain/rainbond/worker/server" "github.com/goodrain/rainbond/worker/server"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
"k8s.io/client-go/util/flowcontrol"
) )
//Run start run //Run start run
@ -76,6 +77,7 @@ func Run(s *option.Worker) error {
logrus.Errorf("create kube rest config error: %s", err.Error()) logrus.Errorf("create kube rest config error: %s", err.Error())
return err return err
} }
restConfig.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(float32(s.Config.KubeApiQPS), s.Config.KubeApiBurst)
clientset, err := kubernetes.NewForConfig(restConfig) clientset, err := kubernetes.NewForConfig(restConfig)
if err != nil { if err != nil {
logrus.Errorf("create kube client error: %s", err.Error()) logrus.Errorf("create kube client error: %s", err.Error())
@ -103,7 +105,8 @@ func Run(s *option.Worker) error {
defer controllerManager.Stop() defer controllerManager.Stop()
//step 5 : start runtime master //step 5 : start runtime master
masterCon, err := master.NewMasterController(s.Config, cachestore)
masterCon, err := master.NewMasterController(s.Config, restConfig, cachestore)
if err != nil { if err != nil {
return err return err
} }

View File

@ -24,11 +24,6 @@ import (
"strings" "strings"
"time" "time"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
"github.com/goodrain/rainbond/cmd/worker/option" "github.com/goodrain/rainbond/cmd/worker/option"
"github.com/goodrain/rainbond/db" "github.com/goodrain/rainbond/db"
"github.com/goodrain/rainbond/db/model" "github.com/goodrain/rainbond/db/model"
@ -39,6 +34,10 @@ import (
"github.com/goodrain/rainbond/worker/master/volumes/provider/lib/controller" "github.com/goodrain/rainbond/worker/master/volumes/provider/lib/controller"
"github.com/goodrain/rainbond/worker/master/volumes/statistical" "github.com/goodrain/rainbond/worker/master/volumes/statistical"
"github.com/goodrain/rainbond/worker/master/volumes/sync" "github.com/goodrain/rainbond/worker/master/volumes/sync"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
) )
//Controller app runtime master controller //Controller app runtime master controller
@ -59,18 +58,26 @@ type Controller struct {
pc *controller.ProvisionController pc *controller.ProvisionController
isLeader bool isLeader bool
kubeClient kubernetes.Interface
stopCh chan struct{} stopCh chan struct{}
podEventChs []chan *corev1.Pod
podEvent *podevent.PodEvent podEvent *podevent.PodEvent
volumeTypeEvent *sync.VolumeTypeEvent volumeTypeEvent *sync.VolumeTypeEvent
} }
//NewMasterController new master controller //NewMasterController new master controller
func NewMasterController(conf option.Config, store store.Storer) (*Controller, error) { func NewMasterController(conf option.Config, kubecfg *rest.Config, store store.Storer) (*Controller, error) {
kubecfg.RateLimiter = nil
kubeClient, err := kubernetes.NewForConfig(kubecfg)
if err != nil {
return nil, err
}
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
// The controller needs to know what the server version is because out-of-tree // The controller needs to know what the server version is because out-of-tree
// provisioners aren't officially supported until 1.5 // provisioners aren't officially supported until 1.5
serverVersion, err := conf.KubeClient.Discovery().ServerVersion() serverVersion, err := kubeClient.Discovery().ServerVersion()
if err != nil { if err != nil {
logrus.Errorf("Error getting server version: %v", err) logrus.Errorf("Error getting server version: %v", err)
cancel() cancel()
@ -82,10 +89,10 @@ func NewMasterController(conf option.Config, store store.Storer) (*Controller, e
//statefulset share controller //statefulset share controller
rainbondssscProvisioner := provider.NewRainbondssscProvisioner() rainbondssscProvisioner := provider.NewRainbondssscProvisioner()
//statefulset local controller //statefulset local controller
rainbondsslcProvisioner := provider.NewRainbondsslcProvisioner(conf.KubeClient, store) rainbondsslcProvisioner := provider.NewRainbondsslcProvisioner(kubeClient, store)
// Start the provision controller which will dynamically provision hostPath // Start the provision controller which will dynamically provision hostPath
// PVs // PVs
pc := controller.NewProvisionController(conf.KubeClient, &conf, map[string]controller.Provisioner{ pc := controller.NewProvisionController(kubeClient, &conf, map[string]controller.Provisioner{
rainbondssscProvisioner.Name(): rainbondssscProvisioner, rainbondssscProvisioner.Name(): rainbondssscProvisioner,
rainbondsslcProvisioner.Name(): rainbondsslcProvisioner, rainbondsslcProvisioner.Name(): rainbondsslcProvisioner,
}, serverVersion.GitVersion) }, serverVersion.GitVersion)
@ -137,6 +144,7 @@ func NewMasterController(conf option.Config, store store.Storer) (*Controller, e
diskCache: statistical.CreatDiskCache(ctx), diskCache: statistical.CreatDiskCache(ctx),
podEvent: podevent.New(conf.KubeClient, stopCh), podEvent: podevent.New(conf.KubeClient, stopCh),
volumeTypeEvent: sync.New(stopCh), volumeTypeEvent: sync.New(stopCh),
kubeClient: kubeClient,
}, nil }, nil
} }
@ -181,7 +189,7 @@ func (m *Controller) Start() error {
} }
// Name of config map with leader election lock // Name of config map with leader election lock
lockName := "rainbond-appruntime-worker-leader" lockName := "rainbond-appruntime-worker-leader"
go leader.RunAsLeader(m.ctx, m.conf.KubeClient, m.conf.LeaderElectionNamespace, m.conf.LeaderElectionIdentity, lockName, start, func() {}) go leader.RunAsLeader(m.ctx, m.kubeClient, m.conf.LeaderElectionNamespace, m.conf.LeaderElectionIdentity, lockName, start, func() {})
return nil return nil
} }