mirror of
https://gitee.com/rainbond/Rainbond.git
synced 2024-12-02 03:37:46 +08:00
clientset with service acount
This commit is contained in:
parent
66672a409a
commit
fa562842e5
@ -93,7 +93,7 @@ type ListenPorts struct {
|
||||
// AddFlags adds flags
|
||||
func (g *GWServer) AddFlags(fs *pflag.FlagSet) {
|
||||
fs.StringVar(&g.LogLevel, "log-level", "debug", "the gateway log level")
|
||||
fs.StringVar(&g.K8SConfPath, "kube-conf", "/opt/rainbond/etc/kubernetes/kubecfg/admin.kubeconfig", "absolute path to the kubeconfig file")
|
||||
fs.StringVar(&g.K8SConfPath, "kube-conf", "", "absolute path to the kubeconfig file")
|
||||
fs.IntVar(&g.ListenPorts.Status, "status-port", 18080, `Port to use for exposing NGINX status pages.`)
|
||||
fs.IntVar(&g.WorkerProcesses, "worker-processes", 0, "Default get current compute cpu core number.This number should be, at maximum, the number of CPU cores on your system.")
|
||||
fs.IntVar(&g.WorkerConnections, "worker-connections", 4000, "Determines how many clients will be served by each worker process.")
|
||||
@ -146,12 +146,6 @@ func (g *GWServer) SetLog() {
|
||||
|
||||
//CheckConfig check config
|
||||
func (g *GWServer) CheckConfig() error {
|
||||
if g.K8SConfPath == "" {
|
||||
return fmt.Errorf("kube config file path can not be empty")
|
||||
}
|
||||
if exist, _ := util.FileExists(g.K8SConfPath); !exist {
|
||||
return fmt.Errorf("kube config file %s not exist", g.K8SConfPath)
|
||||
}
|
||||
if g.NodeName == "" {
|
||||
g.NodeName, _ = os.Hostname()
|
||||
}
|
||||
|
@ -27,22 +27,20 @@ import (
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/goodrain/rainbond/gateway/cluster"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
"github.com/go-chi/chi"
|
||||
|
||||
"github.com/goodrain/rainbond/util"
|
||||
|
||||
"github.com/goodrain/rainbond/discover"
|
||||
|
||||
"github.com/goodrain/rainbond/gateway/metric"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
"github.com/goodrain/rainbond/cmd/gateway/option"
|
||||
"github.com/goodrain/rainbond/gateway/controller"
|
||||
"k8s.io/apiserver/pkg/server/healthz"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
|
||||
"github.com/goodrain/rainbond/cmd/gateway/option"
|
||||
"github.com/goodrain/rainbond/discover"
|
||||
"github.com/goodrain/rainbond/gateway/cluster"
|
||||
"github.com/goodrain/rainbond/gateway/controller"
|
||||
"github.com/goodrain/rainbond/gateway/metric"
|
||||
"github.com/goodrain/rainbond/util"
|
||||
k8sutil "github.com/goodrain/rainbond/util/k8s"
|
||||
)
|
||||
|
||||
//Run start run
|
||||
@ -51,6 +49,14 @@ func Run(s *option.GWServer) error {
|
||||
errCh := make(chan error, 1)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
var clientset kubernetes.Interface
|
||||
if s.Config.K8SConfPath == "" {
|
||||
clientset = k8sutil.MustNewKubeClient()
|
||||
} else {
|
||||
k8sutil.NewClientset(s.K8SConfPath)
|
||||
}
|
||||
|
||||
//create cluster node manage
|
||||
node, err := cluster.CreateNodeManager(s.Config)
|
||||
if err != nil {
|
||||
@ -67,7 +73,8 @@ func Run(s *option.GWServer) error {
|
||||
}
|
||||
}
|
||||
mc.Start()
|
||||
gwc, err := controller.NewGWController(ctx, &s.Config, mc, node)
|
||||
|
||||
gwc, err := controller.NewGWController(ctx, clientset, &s.Config, mc, node)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -29,6 +29,7 @@ import (
|
||||
|
||||
"github.com/goodrain/rainbond/gateway/cluster"
|
||||
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"github.com/Sirupsen/logrus"
|
||||
client "github.com/coreos/etcd/clientv3"
|
||||
"github.com/eapache/channels"
|
||||
@ -217,7 +218,7 @@ func (gwc *GWController) getDelUpdPools(updPools []*v1.Pool) ([]*v1.Pool, []*v1.
|
||||
}
|
||||
|
||||
//NewGWController new Gateway controller
|
||||
func NewGWController(ctx context.Context, cfg *option.Config, mc metric.Collector, node *cluster.NodeManager) (*GWController, error) {
|
||||
func NewGWController(ctx context.Context, clientset kubernetes.Interface, cfg *option.Config, mc metric.Collector, node *cluster.NodeManager) (*GWController, error) {
|
||||
gwc := &GWController{
|
||||
updateCh: channels.NewRingChannel(1024),
|
||||
stopLock: &sync.Mutex{},
|
||||
@ -239,13 +240,9 @@ func NewGWController(ctx context.Context, cfg *option.Config, mc metric.Collecto
|
||||
gwc.EtcdCli = cli
|
||||
}
|
||||
gwc.GWS = openresty.CreateOpenrestyService(cfg, &gwc.isShuttingDown)
|
||||
clientSet, err := NewClientSet(cfg.K8SConfPath)
|
||||
if err != nil {
|
||||
logrus.Error("can't create kubernetes's client.")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
gwc.store = store.New(
|
||||
clientSet,
|
||||
clientset,
|
||||
gwc.updateCh,
|
||||
cfg, node)
|
||||
gwc.syncQueue = task.NewTaskQueue(gwc.syncGateway)
|
||||
|
@ -171,7 +171,7 @@ func New(client kubernetes.Interface,
|
||||
// create informers factory, enable and assign required informers
|
||||
store.sharedInformer = informers.NewFilteredSharedInformerFactory(client, conf.ResyncPeriod, corev1.NamespaceAll,
|
||||
func(options *metav1.ListOptions) {
|
||||
options.LabelSelector = "creater=Rainbond"
|
||||
// options.LabelSelector = "creater=Rainbond"
|
||||
})
|
||||
|
||||
store.informers.Ingress = store.sharedInformer.Extensions().V1beta1().Ingresses().Informer()
|
||||
|
@ -1,6 +1,8 @@
|
||||
package k8s
|
||||
|
||||
import (
|
||||
"net"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
@ -10,6 +12,7 @@ import (
|
||||
"k8s.io/client-go/informers"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/kubernetes/scheme"
|
||||
"k8s.io/client-go/rest"
|
||||
"k8s.io/client-go/tools/clientcmd"
|
||||
"k8s.io/client-go/tools/reference"
|
||||
)
|
||||
@ -29,6 +32,34 @@ func NewClientset(kubecfg string) (kubernetes.Interface, error) {
|
||||
return clientset, nil
|
||||
}
|
||||
|
||||
func MustNewKubeClient() kubernetes.Interface {
|
||||
cfg, err := InClusterConfig()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return kubernetes.NewForConfigOrDie(cfg)
|
||||
}
|
||||
|
||||
func InClusterConfig() (*rest.Config, error) {
|
||||
// Work around https://github.com/kubernetes/kubernetes/issues/40973
|
||||
// See https://github.com/coreos/etcd-operator/issues/731#issuecomment-283804819
|
||||
if len(os.Getenv("KUBERNETES_SERVICE_HOST")) == 0 {
|
||||
addrs, err := net.LookupHost("kubernetes.default.svc")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
os.Setenv("KUBERNETES_SERVICE_HOST", addrs[0])
|
||||
}
|
||||
if len(os.Getenv("KUBERNETES_SERVICE_PORT")) == 0 {
|
||||
os.Setenv("KUBERNETES_SERVICE_PORT", "443")
|
||||
}
|
||||
cfg, err := rest.InClusterConfig()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return cfg, nil
|
||||
}
|
||||
|
||||
// NewRainbondFilteredSharedInformerFactory -
|
||||
func NewRainbondFilteredSharedInformerFactory(clientset kubernetes.Interface) informers.SharedInformerFactory {
|
||||
return informers.NewFilteredSharedInformerFactory(
|
||||
|
@ -190,6 +190,7 @@ func NewStore(clientset *kubernetes.Clientset,
|
||||
epEventHandler := cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
ep := obj.(*corev1.Endpoints)
|
||||
|
||||
serviceID := ep.Labels["service_id"]
|
||||
version := ep.Labels["version"]
|
||||
createrID := ep.Labels["creater_id"]
|
||||
|
Loading…
Reference in New Issue
Block a user