Merge branch 'V5.2' into feature/k8s/mavendep

This commit is contained in:
黄润豪 2020-01-08 22:30:58 +08:00 committed by GitHub
commit bf2e03a2f7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 178 additions and 68 deletions

View File

@ -23,8 +23,6 @@ import (
"os/signal"
"syscall"
"github.com/goodrain/rainbond/mq/client"
"github.com/goodrain/rainbond/builder/discover"
"github.com/goodrain/rainbond/builder/exector"
"github.com/goodrain/rainbond/builder/monitor"
@ -32,6 +30,7 @@ import (
"github.com/goodrain/rainbond/db"
"github.com/goodrain/rainbond/db/config"
"github.com/goodrain/rainbond/event"
"github.com/goodrain/rainbond/mq/client"
"net/http"

View File

@ -67,7 +67,7 @@ type Conf struct {
K8SConfPath string //absolute path to the kubeconfig file
LogLevel string
LogFile string
HostIDFile string
HostID string
HostIP string
RunMode string //ACP_NODE 运行模式:master,node
NodeRule string //节点属性 compute manage storage
@ -138,7 +138,7 @@ func (a *Conf) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&a.LogFile, "log-file", "", "the log file path that log output")
fs.StringVar(&a.PrometheusAPI, "prometheus", "http://localhost:9999", "the prometheus server address")
fs.StringVar(&a.NodePath, "nodePath", "/rainbond/nodes", "the path of node in etcd")
fs.StringVar(&a.HostIDFile, "nodeid-file", "/opt/rainbond/etc/node/node_host_uuid.conf", "the unique ID for this node. Just specify, don't modify")
fs.StringVar(&a.HostID, "nodeid", "", "the unique ID for this node. Just specify, don't modify")
fs.StringVar(&a.HostIP, "hostIP", "", "the host ip you can define. default get ip from eth0")
fs.StringSliceVar(&a.EventLogServer, "event-log-server", []string{"127.0.0.1:6366"}, "host:port slice of event log server")
fs.StringVar(&a.ConfigStoragePath, "config-path", "/rainbond/acp_configs", "the path of config to store(new)")
@ -239,5 +239,8 @@ func (a *Conf) parse() error {
if a.APIAddr == "" {
a.APIAddr = ":6100"
}
if a.HostID == "" {
return fmt.Errorf("kubernetes node id can't empty")
}
return nil
}

View File

@ -46,7 +46,7 @@ type Config struct {
Listen string
HostIP string
ServerPort int
KubeClient *kubernetes.Clientset
KubeClient kubernetes.Interface
LeaderElectionNamespace string
LeaderElectionIdentity string
}

View File

@ -25,14 +25,14 @@ import (
"github.com/Sirupsen/logrus"
"github.com/eapache/channels"
kubeaggregatorclientset "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
kubeaggregatorclientset "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset"
"github.com/goodrain/rainbond/cmd/worker/option"
"github.com/goodrain/rainbond/db"
"github.com/goodrain/rainbond/db/config"
"github.com/goodrain/rainbond/event"
k8sutil "github.com/goodrain/rainbond/util/k8s"
"github.com/goodrain/rainbond/worker/appm"
"github.com/goodrain/rainbond/worker/appm/controller"
"github.com/goodrain/rainbond/worker/appm/store"
@ -67,19 +67,15 @@ func Run(s *option.Worker) error {
defer event.CloseManager()
//step 2 : create kube client and etcd client
c, err := clientcmd.BuildConfigFromFlags("", s.Config.KubeConfig)
restConfig, err := k8sutil.NewRestConfig(s.Config.KubeConfig)
if err != nil {
logrus.Error("read kube config file error.", err)
return err
}
clientset, err := kubernetes.NewForConfig(c)
if err != nil {
logrus.Error("create kube api client error", err)
logrus.Errorf("create kube rest config error: %s", err.Error())
return err
}
clientset, _ := kubernetes.NewForConfig(restConfig)
s.Config.KubeClient = clientset
kubeaggregatorclientset, err := kubeaggregatorclientset.NewForConfig(c)
kubeaggregatorclientset, err := kubeaggregatorclientset.NewForConfig(restConfig)
if err != nil {
logrus.Error("kube aggregator; read kube config file error.", err)
return err

View File

@ -23,13 +23,12 @@ import (
"github.com/goodrain/rainbond/builder/sources"
"github.com/Sirupsen/logrus"
k8sutil "github.com/goodrain/rainbond/util/k8s"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
)
//K8SClient K8SClient
var K8SClient *kubernetes.Clientset
var K8SClient kubernetes.Interface
//InitClient init k8s client
func InitClient(kubeconfig string) error {
@ -38,16 +37,13 @@ func InitClient(kubeconfig string) error {
kubeconfig = path.Join(homePath, ".kube/config")
}
// use the current context in kubeconfig
config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
config, err := k8sutil.NewRestConfig(kubeconfig)
if err != nil {
return err
}
config.QPS = 50
config.Burst = 100
K8SClient, err = kubernetes.NewForConfig(config)
if err != nil {
logrus.Error("Create kubernetes client error.", err.Error())
return err
}
K8SClient, _ = kubernetes.NewForConfig(config)
return nil
}

View File

@ -30,6 +30,7 @@ import (
"github.com/goodrain/rainbond/node/nodem/client"
"github.com/Sirupsen/logrus"
k8sutil "github.com/goodrain/rainbond/util/k8s"
v1 "k8s.io/api/core/v1"
"k8s.io/api/policy/v1beta1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
@ -41,7 +42,6 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
)
const (
@ -73,16 +73,15 @@ type KubeClient interface {
//NewKubeClient NewKubeClient
func NewKubeClient(cfg *conf.Conf) (KubeClient, error) {
config, err := clientcmd.BuildConfigFromFlags("", cfg.K8SConfPath)
config, err := k8sutil.NewRestConfig(cfg.K8SConfPath)
if err != nil {
return nil, err
}
config.QPS = 50
config.Burst = 100
cli, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, err
}
cli, _ := kubernetes.NewForConfig(config)
stop := make(chan struct{})
sharedInformers := informers.NewFilteredSharedInformerFactory(cli, cfg.MinResyncPeriod, v1.NamespaceAll,
func(options *metav1.ListOptions) {
@ -102,7 +101,7 @@ func NewKubeClient(cfg *conf.Conf) (KubeClient, error) {
}
type kubeClient struct {
kubeclient *kubernetes.Clientset
kubeclient kubernetes.Interface
sharedInformers informers.SharedInformerFactory
stop chan struct{}
}

View File

@ -0,0 +1,46 @@
// Copyright (C) 2014-2018 Goodrain Co., Ltd.
// RAINBOND, Application Management Platform
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version. For any non-GPL usage of Rainbond,
// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd.
// must be obtained first.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package node
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"testing"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
)
func TestCluster_handleNodeStatus(t *testing.T) {
config, err := clientcmd.BuildConfigFromFlags("", "/Users/fanyangyang/Documents/company/goodrain/remote/192.168.2.200/admin.kubeconfig")
if err != nil {
return
}
cli, err := kubernetes.NewForConfig(config)
if err != nil {
t.Fatal(err)
}
node, err := cli.CoreV1().Nodes().Get("192.168.2.200", metav1.GetOptions{})
if err != nil {
t.Fatal(err)
}
t.Logf("node is :%+v", node)
t.Logf("cpu:%v", node.Status.Allocatable.Cpu().Value())
t.Logf("mem: %v", node.Status.Allocatable.Memory().Value())
}

View File

@ -4,7 +4,6 @@ import (
"testing"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
)

View File

@ -69,7 +69,6 @@ func NewNodeManager(conf *option.Conf) (*NodeManager, error) {
}
clm := logger.CreatContainerLogManage(conf)
controller := controller.NewManagerService(conf, healthyManager, cluster)
uid, err := util.ReadHostID(conf.HostIDFile)
if err != nil {
return nil, fmt.Errorf("Get host id error:%s", err.Error())
}
@ -95,7 +94,7 @@ func NewNodeManager(conf *option.Conf) (*NodeManager, error) {
healthy: healthyManager,
controller: controller,
clm: clm,
currentNode: &client.HostNode{ID: uid},
currentNode: &client.HostNode{ID: conf.HostID},
imageGCManager: imageGCManager,
}
return nodem, nil

View File

@ -1,6 +1,8 @@
package k8s
import (
"net"
"os"
"time"
"github.com/Sirupsen/logrus"
@ -10,6 +12,7 @@ import (
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/reference"
)
@ -29,6 +32,45 @@ func NewClientset(kubecfg string) (kubernetes.Interface, error) {
return clientset, nil
}
// NewClientsetOrDie new clientset or die
// used for who just wants a kubernetes clientset
func NewClientsetOrDie(kubecfg string) kubernetes.Interface {
restConfig, err := NewRestConfig(kubecfg)
if err != nil {
panic(err)
}
return kubernetes.NewForConfigOrDie(restConfig)
}
// NewRestConfig new rest config
func NewRestConfig(kubecfg string) (restConfig *rest.Config, err error) {
if kubecfg == "" {
return InClusterConfig()
}
return clientcmd.BuildConfigFromFlags("", kubecfg)
}
// InClusterConfig in cluster config
func InClusterConfig() (*rest.Config, error) {
// Work around https://github.com/kubernetes/kubernetes/issues/40973
// See https://github.com/coreos/etcd-operator/issues/731#issuecomment-283804819
if len(os.Getenv("KUBERNETES_SERVICE_HOST")) == 0 {
addrs, err := net.LookupHost("kubernetes.default.svc")
if err != nil {
panic(err)
}
os.Setenv("KUBERNETES_SERVICE_HOST", addrs[0])
}
if len(os.Getenv("KUBERNETES_SERVICE_PORT")) == 0 {
os.Setenv("KUBERNETES_SERVICE_PORT", "443")
}
cfg, err := rest.InClusterConfig()
if err != nil {
return nil, err
}
return cfg, nil
}
// NewRainbondFilteredSharedInformerFactory -
func NewRainbondFilteredSharedInformerFactory(clientset kubernetes.Interface) informers.SharedInformerFactory {
return informers.NewFilteredSharedInformerFactory(

24
util/k8s/k8s_test.go Normal file
View File

@ -0,0 +1,24 @@
package k8s
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"os"
"testing"
)
func TestNewRestConfig(t *testing.T) {
os.Setenv("KUBERNETES_SERVICE_HOST", "192.168.2.200")
restConfig, err := NewRestConfig("")
if err != nil {
t.Fatal("create restconfig error: ", err.Error())
}
clientset, err := NewClientsetWithRestConfig(restConfig)
if err != nil {
t.Fatal("create clientset error: ", err.Error())
}
pod, err := clientset.CoreV1().Pods("5d7bd886e6dc4425bb6c2ac5fc9fa593").Get("gr2e4b9f-0", metav1.GetOptions{})
if err != nil {
t.Fatal("get pod info error: ", err.Error())
}
t.Logf("pod info : %+v", pod)
}

View File

@ -42,7 +42,7 @@ const (
)
// RunAsLeader starts this particular external attacher after becoming a leader.
func RunAsLeader(ctx context.Context, clientset *kubernetes.Clientset, namespace string, identity string, lockName string, startFunc func(ctx context.Context), stopFunc func()) {
func RunAsLeader(ctx context.Context, clientset kubernetes.Interface, namespace string, identity string, lockName string, startFunc func(ctx context.Context), stopFunc func()) {
broadcaster := record.NewBroadcaster()
broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: clientset.CoreV1().Events(namespace)})
eventRecorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: fmt.Sprintf("%s %s", lockName, string(identity))})

View File

@ -29,7 +29,7 @@ import (
)
// NewAPPMController creates a new appm controller.
func NewAPPMController(clientset *kubernetes.Clientset,
func NewAPPMController(clientset kubernetes.Interface,
store store.Storer,
startCh *channels.RingChannel,
updateCh *channels.RingChannel,

View File

@ -25,7 +25,7 @@ import (
"github.com/goodrain/rainbond/util"
"github.com/goodrain/rainbond/worker/appm/store"
"github.com/goodrain/rainbond/worker/appm/types/v1"
v1 "github.com/goodrain/rainbond/worker/appm/types/v1"
"k8s.io/client-go/kubernetes"
)
@ -66,14 +66,14 @@ var TypeControllerRefreshHPA TypeController = "refreshhpa"
type Manager struct {
ctx context.Context
cancel context.CancelFunc
client *kubernetes.Clientset
client kubernetes.Interface
controllers map[string]Controller
store store.Storer
lock sync.Mutex
}
//NewManager new manager
func NewManager(store store.Storer, client *kubernetes.Clientset) *Manager {
func NewManager(store store.Storer, client kubernetes.Interface) *Manager {
ctx, cancel := context.WithCancel(context.Background())
return &Manager{
ctx: ctx,

View File

@ -25,7 +25,7 @@ import (
)
//CreateKubeService create kube service
func CreateKubeService(client *kubernetes.Clientset, namespace string, services ...*corev1.Service) error {
func CreateKubeService(client kubernetes.Interface, namespace string, services ...*corev1.Service) error {
var retryService []*corev1.Service
for i := range services {
createService := services[i]

View File

@ -26,7 +26,7 @@ import (
"k8s.io/client-go/kubernetes"
"github.com/goodrain/rainbond/worker/appm/f"
"github.com/goodrain/rainbond/worker/appm/types/v1"
v1 "github.com/goodrain/rainbond/worker/appm/types/v1"
)
type refreshXPAController struct {
@ -52,7 +52,7 @@ func (a *refreshXPAController) Begin() {
a.manager.callback(a.controllerID, nil)
}
func (a *refreshXPAController) applyOne(clientset *kubernetes.Clientset, app *v1.AppService) error {
func (a *refreshXPAController) applyOne(clientset kubernetes.Interface, app *v1.AppService) error {
for _, hpa := range app.GetHPAs() {
f.EnsureHPA(hpa, clientset)
}

View File

@ -33,7 +33,7 @@ import (
)
// ApplyOne applies one rule.
func ApplyOne(clientset *kubernetes.Clientset, app *v1.AppService) error {
func ApplyOne(clientset kubernetes.Interface, app *v1.AppService) error {
_, err := clientset.CoreV1().Namespaces().Get(app.TenantID, metav1.GetOptions{})
if err != nil {
if k8sErrors.IsNotFound(err) {
@ -191,7 +191,7 @@ func EnsureHPA(new *v2beta1.HorizontalPodAutoscaler, clientSet kubernetes.Interf
}
// UpgradeIngress is used to update *extensions.Ingress.
func UpgradeIngress(clientset *kubernetes.Clientset,
func UpgradeIngress(clientset kubernetes.Interface,
as *v1.AppService,
old, new []*extensions.Ingress,
handleErr func(msg string, err error) error) error {
@ -245,7 +245,7 @@ func UpgradeIngress(clientset *kubernetes.Clientset,
}
// UpgradeSecrets is used to update *corev1.Secret.
func UpgradeSecrets(clientset *kubernetes.Clientset,
func UpgradeSecrets(clientset kubernetes.Interface,
as *v1.AppService, old, new []*corev1.Secret,
handleErr func(msg string, err error) error) error {
var oldMap = make(map[string]*corev1.Secret, len(old))
@ -363,7 +363,7 @@ func UpgradeClaims(clientset *kubernetes.Clientset, as *v1.AppService, old, new
}
// UpgradeEndpoints is used to update *corev1.Endpoints.
func UpgradeEndpoints(clientset *kubernetes.Clientset,
func UpgradeEndpoints(clientset kubernetes.Interface,
as *v1.AppService, old, new []*corev1.Endpoints,
handleErr func(msg string, err error) error) error {
var oldMap = make(map[string]*corev1.Endpoints, len(old))

View File

@ -106,7 +106,7 @@ type ProbeInfo struct {
//appRuntimeStore app runtime store
//cache all kubernetes object and appservice
type appRuntimeStore struct {
clientset *kubernetes.Clientset
clientset kubernetes.Interface
ctx context.Context
cancel context.CancelFunc
informers *Informer
@ -122,7 +122,7 @@ type appRuntimeStore struct {
}
//NewStore new app runtime store
func NewStore(clientset *kubernetes.Clientset,
func NewStore(clientset kubernetes.Interface,
dbmanager db.Manager,
conf option.Config,
startCh *channels.RingChannel,

View File

@ -79,8 +79,14 @@ func TestAppRuntimeStore_GetTenantResource(t *testing.T) {
t.Logf("%+v", resource)
}
func TestGetStorageClass(t *testing.T) {
getStoreForTest(t)
func TestStorer(t *testing.T) {
storer := getStoreForTest(t, nil)
lister := storer.GetPodLister()
pod, err := lister.Pods("5d7bd886e6dc4425bb6c2ac5fc9fa593").Get("122f02921da549731888a31e052e4b9f-deployment-6974f46fc6-xz29n")
if err != nil {
t.Fatal(err)
}
t.Logf("pod is %+v", pod)
}
@ -159,16 +165,17 @@ func TestListHPAEvents(t *testing.T) {
}
}
func getStoreForTest(t *testing.T) Storer {
ocfg := option.Config{
DBType: "mysql",
MysqlConnectionInfo: "oc6Poh:noot6Mea@tcp(192.168.2.203:3306)/region",
EtcdEndPoints: []string{"http://192.168.2.203:2379"},
EtcdTimeout: 5,
KubeConfig: "/Users/fanyangyang/Documents/company/goodrain/admin.kubeconfig",
LeaderElectionNamespace: "rainbond",
func getStoreForTest(t *testing.T, ocfg *option.Config) Storer {
if ocfg == nil {
ocfg = &option.Config{
DBType: "mysql",
MysqlConnectionInfo: "EeM2oc:lee7OhQu@tcp(192.168.2.203:3306)/region",
EtcdEndPoints: []string{"http://192.168.2.203:2379"},
EtcdTimeout: 5,
KubeConfig: "/Users/fanyangyang/Documents/company/goodrain/admin.kubeconfig",
LeaderElectionNamespace: "rainbond",
}
}
dbconfig := config.Config{
DBType: ocfg.DBType,
MysqlConnectionInfo: ocfg.MysqlConnectionInfo,
@ -199,15 +206,15 @@ func getStoreForTest(t *testing.T) Storer {
}
func TestPatch(t *testing.T) {
ocfg := option.Config{
ocfg := &option.Config{
DBType: "mysql",
MysqlConnectionInfo: "oc6Poh:noot6Mea@tcp(192.168.2.203:3306)/region",
MysqlConnectionInfo: "EeM2oc:lee7OhQu@tcp(192.168.2.203:3306)/region",
EtcdEndPoints: []string{"http://192.168.2.203:2379"},
EtcdTimeout: 5,
KubeConfig: "/Users/fanyangyang/Documents/company/goodrain/admin.kubeconfig",
LeaderElectionNamespace: "rainbond",
}
storer := getStoreForTest(t)
storer := getStoreForTest(t, ocfg)
c, err := clientcmd.BuildConfigFromFlags("", ocfg.KubeConfig)
if err != nil {
t.Fatalf("read kube config file error: %v", err)
@ -216,9 +223,9 @@ func TestPatch(t *testing.T) {
if err != nil {
t.Fatalf("create kube api client error: %v", err)
}
app := storer.GetAppService("f55f7e28ec441ce5765998259756cc09")
app := storer.GetAppService("122f02921da549731888a31e052e4b9f")
if app == nil {
t.Fatal("app is niil")
t.Fatal("app is nil")
}
statefulset := app.GetStatefulSet()
if statefulset == nil {

View File

@ -45,7 +45,7 @@ type ThirdPartier interface {
}
// NewThirdPartier creates a new ThirdPartier.
func NewThirdPartier(clientset *kubernetes.Clientset,
func NewThirdPartier(clientset kubernetes.Interface,
store store.Storer,
startCh *channels.RingChannel,
updateCh *channels.RingChannel,
@ -63,7 +63,7 @@ func NewThirdPartier(clientset *kubernetes.Clientset,
}
type thirdparty struct {
clientset *kubernetes.Clientset
clientset kubernetes.Interface
store store.Storer
// a collection of stop channel for every service.

View File

@ -44,12 +44,12 @@ import (
type rainbondsslcProvisioner struct {
// The directory to create PV-backing directories in
name string
kubecli *kubernetes.Clientset
kubecli kubernetes.Interface
store store.Storer
}
// NewRainbondsslcProvisioner creates a new Rainbond statefulset share volume provisioner
func NewRainbondsslcProvisioner(kubecli *kubernetes.Clientset, store store.Storer) controller.Provisioner {
func NewRainbondsslcProvisioner(kubecli kubernetes.Interface, store store.Storer) controller.Provisioner {
return &rainbondsslcProvisioner{
name: "rainbond.io/provisioner-sslc",
kubecli: kubecli,