delete metrics-server installation

This commit is contained in:
glyasai 2020-02-12 05:47:57 +08:00
parent 72f4dfd16c
commit 73bd7e6fb6
4 changed files with 6 additions and 413 deletions

View File

@ -25,9 +25,6 @@ import (
"github.com/Sirupsen/logrus"
"github.com/eapache/channels"
"k8s.io/client-go/kubernetes"
kubeaggregatorclientset "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset"
"github.com/goodrain/rainbond/cmd/worker/option"
"github.com/goodrain/rainbond/db"
"github.com/goodrain/rainbond/db/config"
@ -42,6 +39,7 @@ import (
"github.com/goodrain/rainbond/worker/master"
"github.com/goodrain/rainbond/worker/monitor"
"github.com/goodrain/rainbond/worker/server"
"k8s.io/client-go/kubernetes"
)
//Run start run
@ -85,12 +83,6 @@ func Run(s *option.Worker) error {
}
s.Config.KubeClient = clientset
kubeaggregatorclientset, err := kubeaggregatorclientset.NewForConfig(restConfig)
if err != nil {
logrus.Error("kube aggregator; read kube config file error.", err)
return err
}
//step 3: create resource store
startCh := channels.NewRingChannel(1024)
updateCh := channels.NewRingChannel(1024)
@ -111,7 +103,7 @@ func Run(s *option.Worker) error {
defer controllerManager.Stop()
//step 5 : start runtime master
masterCon, err := master.NewMasterController(s.Config, cachestore, kubeaggregatorclientset)
masterCon, err := master.NewMasterController(s.Config, cachestore)
if err != nil {
return err
}

View File

@ -24,18 +24,15 @@ import (
"strings"
"time"
"github.com/Sirupsen/logrus"
"github.com/prometheus/client_golang/prometheus"
corev1 "k8s.io/api/core/v1"
kubeaggregatorclientset "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset"
"github.com/Sirupsen/logrus"
"github.com/goodrain/rainbond/cmd/worker/option"
"github.com/goodrain/rainbond/db"
"github.com/goodrain/rainbond/db/model"
etcdutil "github.com/goodrain/rainbond/util/etcd"
"github.com/goodrain/rainbond/util/leader"
"github.com/goodrain/rainbond/worker/appm/store"
"github.com/goodrain/rainbond/worker/master/metricsserv"
"github.com/goodrain/rainbond/worker/master/podevent"
"github.com/goodrain/rainbond/worker/master/volumes/provider"
"github.com/goodrain/rainbond/worker/master/volumes/provider/lib/controller"
@ -58,12 +55,10 @@ type Controller struct {
stopCh chan struct{}
podEventChs []chan *corev1.Pod
podEvent *podevent.PodEvent
metricsServerManager *metricsserv.MetricsServiceManager
}
//NewMasterController new master controller
func NewMasterController(conf option.Config, store store.Storer, kubeaggregatorclientset kubeaggregatorclientset.Interface) (*Controller, error) {
func NewMasterController(conf option.Config, store store.Storer) (*Controller, error) {
ctx, cancel := context.WithCancel(context.Background())
// The controller needs to know what the server version is because out-of-tree
// provisioners aren't officially supported until 1.5
@ -87,19 +82,6 @@ func NewMasterController(conf option.Config, store store.Storer, kubeaggregatorc
rainbondsslcProvisioner.Name(): rainbondsslcProvisioner,
}, serverVersion.GitVersion)
stopCh := make(chan struct{})
etcdClientArgs := &etcdutil.ClientArgs{
Endpoints: conf.EtcdEndPoints,
CaFile: conf.EtcdCaFile,
CertFile: conf.EtcdCertFile,
KeyFile: conf.EtcdKeyFile,
AutoSyncInterval: time.Second * 30,
DialTimeout: time.Second * 10,
}
clientv3, err := etcdutil.NewClient(ctx, etcdClientArgs)
if err != nil {
cancel()
return nil, err
}
return &Controller{
conf: conf,
@ -119,9 +101,8 @@ func NewMasterController(conf option.Config, store store.Storer, kubeaggregatorc
Name: "appfs",
Help: "tenant service fs used.",
}, []string{"tenant_id", "service_id", "volume_type"}),
diskCache: statistical.CreatDiskCache(ctx),
podEvent: podevent.New(conf.KubeClient, stopCh),
metricsServerManager: metricsserv.New(conf.KubeClient, kubeaggregatorclientset, clientv3),
diskCache: statistical.CreatDiskCache(ctx),
podEvent: podevent.New(conf.KubeClient, stopCh),
}, nil
}
@ -159,9 +140,6 @@ func (m *Controller) Start() error {
if m.conf.LeaderElectionIdentity == "" {
return fmt.Errorf("-leader-election-identity must not be empty")
}
if err := m.metricsServerManager.Start(); err != nil {
return fmt.Errorf("start metrics-server manager: %v", err)
}
// Name of config map with leader election lock
lockName := "rainbond-appruntime-worker-leader"
go leader.RunAsLeader(m.ctx, m.conf.KubeClient, m.conf.LeaderElectionNamespace, m.conf.LeaderElectionIdentity, lockName, start, func() {})

View File

@ -1,97 +0,0 @@
// RAINBOND, Application Management Platform
// Copyright (C) 2014-2017 Goodrain Co., Ltd.
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version. For any non-GPL usage of Rainbond,
// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd.
// must be obtained first.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package metricsserv
import (
"testing"
"time"
"github.com/coreos/etcd/clientv3"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
kubeaggregatorclientset "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset"
)
func TestNewMetricsServerAPIServer(t *testing.T) {
c, err := clientcmd.BuildConfigFromFlags("", "/opt/rainbond/etc/kubernetes/kubecfg/admin.kubeconfig")
if err != nil {
t.Fatal(err)
}
kubeaggregatorclientset, err := kubeaggregatorclientset.NewForConfig(c)
if err != nil {
t.Fatal(err)
}
metricsServiceManager := New(nil, kubeaggregatorclientset, nil)
if err := metricsServiceManager.newMetricsServerAPIService(); err != nil {
t.Fatal(err)
}
}
func TestStart(t *testing.T) {
clientv3, err := clientv3.New(clientv3.Config{
Endpoints: []string{"http://127.0.0.1:2379"},
AutoSyncInterval: time.Second * 30,
DialTimeout: time.Second * 10,
})
if err != nil {
t.Fatal(err)
}
c, err := clientcmd.BuildConfigFromFlags("", "../../../test/admin.kubeconfig")
if err != nil {
t.Fatal(err)
}
clientset, err := kubernetes.NewForConfig(c)
if err != nil {
t.Fatal(err)
}
kubeaggregatorclientset, err := kubeaggregatorclientset.NewForConfig(c)
if err != nil {
t.Fatal(err)
}
metricsServiceManager := New(clientset, kubeaggregatorclientset, clientv3)
if err := metricsServiceManager.Start(); err != nil {
t.Fatal(err)
}
select {}
}
func TestListMetricsServiceEndpoints(t *testing.T) {
clientv3, err := clientv3.New(clientv3.Config{
Endpoints: []string{"http://127.0.0.1:2379"},
AutoSyncInterval: time.Second * 30,
DialTimeout: time.Second * 10,
})
if err != nil {
t.Fatal(err)
}
metricsServiceManager := New(nil, nil, clientv3)
e, err := metricsServiceManager.listMetricsServiceEndpoints()
if err != nil {
t.Fatal(err)
}
t.Log(e)
}

View File

@ -1,280 +0,0 @@
// RAINBOND, Application Management Platform
// Copyright (C) 2014-2017 Goodrain Co., Ltd.
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version. For any non-GPL usage of Rainbond,
// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd.
// must be obtained first.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package metricsserv
import (
"context"
"fmt"
"strconv"
"strings"
"github.com/coreos/etcd/mvcc/mvccpb"
"github.com/pquerna/ffjson/ffjson"
"k8s.io/apimachinery/pkg/util/intstr"
"github.com/Sirupsen/logrus"
"github.com/coreos/etcd/clientv3"
corev1 "k8s.io/api/core/v1"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/kube-aggregator/pkg/apis/apiregistration/v1beta1"
kubeaggregatorclientset "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset"
)
// MetricsServiceManager -
type MetricsServiceManager struct {
clientset kubernetes.Interface
apiregistrationClientset kubeaggregatorclientset.Interface
clientv3 *clientv3.Client
stopCh chan struct{}
}
type metricsServerEndpoint struct {
Address string
Port int
}
//New new
func New(clientset kubernetes.Interface, apiregistrationClientset kubeaggregatorclientset.Interface, clientv3 *clientv3.Client) *MetricsServiceManager {
msm := &MetricsServiceManager{
clientset: clientset,
apiregistrationClientset: apiregistrationClientset,
clientv3: clientv3,
}
return msm
}
//Start start
func (m *MetricsServiceManager) Start() error {
if err := m.newMetricsServerAPIService(); err != nil {
return err
}
if err := m.newMetricsServerService(); err != nil {
return err
}
if err := m.newMetricsServiceEndpoints(); err != nil {
return err
}
return nil
}
func (m *MetricsServiceManager) newMetricsServerAPIService() error {
apiService := &v1beta1.APIService{
ObjectMeta: metav1.ObjectMeta{
Name: "v1beta1.metrics.k8s.io",
},
Spec: v1beta1.APIServiceSpec{
Service: &v1beta1.ServiceReference{
Name: "metrics-server",
Namespace: "kube-system",
},
Group: "metrics.k8s.io",
Version: "v1beta1",
InsecureSkipTLSVerify: true,
GroupPriorityMinimum: 100,
VersionPriority: 30,
},
}
old, err := m.apiregistrationClientset.ApiregistrationV1beta1().APIServices().Get(apiService.GetName(), metav1.GetOptions{})
if err != nil {
if k8sErrors.IsNotFound(err) {
logrus.Infof("api service(%s) not found, create one.", apiService.GetName())
_, err = m.apiregistrationClientset.ApiregistrationV1beta1().APIServices().Create(apiService)
if err != nil {
return fmt.Errorf("create new api service: %v", err)
}
return nil
}
return fmt.Errorf("retrieve api service: %v", err)
}
logrus.Infof("an old api service(%s) has been found, update it.", apiService.GetName())
apiService.ResourceVersion = old.ResourceVersion
if _, err := m.apiregistrationClientset.ApiregistrationV1beta1().APIServices().Update(apiService); err != nil {
return fmt.Errorf("update api service: %v", err)
}
return nil
}
func (m *MetricsServiceManager) newMetricsServerService() error {
new := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "metrics-server",
Namespace: "kube-system",
Labels: map[string]string{
"kubernetes.io/name": "Metrics-server",
"kubernetes.io/cluster-service": "true",
},
},
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{
Port: 443,
Protocol: corev1.ProtocolTCP,
TargetPort: intstr.FromString("main-port"),
},
},
},
Status: corev1.ServiceStatus{},
}
old, err := m.clientset.CoreV1().Services(new.Namespace).Get(new.Name, metav1.GetOptions{})
if err != nil {
if k8sErrors.IsNotFound(err) {
_, err = m.clientset.CoreV1().Services(new.Namespace).Create(new)
if err != nil {
return fmt.Errorf("create new service for : %v", err)
}
return nil
}
return fmt.Errorf("retrieve service for metrics-server: %v", err)
}
new.ResourceVersion = old.ResourceVersion
new.Spec.ClusterIP = old.Spec.ClusterIP
_, err = m.clientset.CoreV1().Services(new.Namespace).Update(new)
if err != nil {
return fmt.Errorf("update service for metrics-server: %v", err)
}
return nil
}
func (m *MetricsServiceManager) newMetricsServiceEndpoints() error {
endpoints, err := m.listMetricsServiceEndpoints()
if err != nil {
return err
}
ep := m.metricsServerEndpoint2CoreV1Endpoints(endpoints)
m.ensureEndpoints(ep)
go m.watchMetricsServiceEndpoints()
return nil
}
func (m *MetricsServiceManager) listMetricsServiceEndpoints() ([]metricsServerEndpoint, error) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
resp, err := m.clientv3.Get(ctx, "/rainbond/endpoint/METRICS_SERVER_ENDPOINTS", clientv3.WithPrefix())
if err != nil {
return nil, fmt.Errorf("list metrics-server endpoints: %v", err)
}
var endpoints []metricsServerEndpoint
for _, kv := range resp.Kvs {
eps := m.str2MetricsServerEndpoint(kv)
endpoints = append(endpoints, eps...)
}
return endpoints, nil
}
func (m *MetricsServiceManager) watchMetricsServiceEndpoints() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
watchCh := m.clientv3.Watch(ctx, "/rainbond/endpoint/METRICS_SERVER_ENDPOINTS", clientv3.WithPrefix())
for {
select {
case resp := <-watchCh:
for _, event := range resp.Events {
eps := m.str2MetricsServerEndpoint(event.Kv)
ep := m.metricsServerEndpoint2CoreV1Endpoints(eps)
m.ensureEndpoints(ep)
}
}
}
}
func (m *MetricsServiceManager) str2MetricsServerEndpoint(kv *mvccpb.KeyValue) []metricsServerEndpoint {
var endpoints []metricsServerEndpoint
var eps []string
if err := ffjson.Unmarshal(kv.Value, &eps); err != nil {
logrus.Warningf("key: %s; value: %s; wrong metrics-server endpoints: %v", kv.Key, kv.Value, err)
return nil
}
for _, ep := range eps {
ep = strings.Replace(ep, "http://", "", -1)
ep = strings.Replace(ep, "https://", "", -1)
epsli := strings.Split(ep, ":")
if len(epsli) != 2 {
logrus.Warningf("key: %s; value: %s; wrong metrics-server endpoints.", kv.Key, kv.Value)
continue
}
port, err := strconv.Atoi(epsli[1])
if err != nil {
logrus.Warningf("key: %s; value: %s; wrong metrics-server endpoints: %v", kv.Key, kv.Value, err)
continue
}
endpoints = append(endpoints, metricsServerEndpoint{
Address: epsli[0],
Port: port,
})
}
return endpoints
}
func (m *MetricsServiceManager) metricsServerEndpoint2CoreV1Endpoints(endpoints []metricsServerEndpoint) *corev1.Endpoints {
var subsets []corev1.EndpointSubset
for _, ep := range endpoints {
subset := corev1.EndpointSubset{
Addresses: []corev1.EndpointAddress{{IP: ep.Address}},
Ports: []corev1.EndpointPort{{Port: int32(ep.Port)}},
}
subsets = append(subsets, subset)
}
return &corev1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "metrics-server",
Namespace: "kube-system",
Labels: map[string]string{
"kubernetes.io/name": "Metrics-server",
"kubernetes.io/cluster-service": "true",
},
},
Subsets: subsets,
}
}
func (m *MetricsServiceManager) ensureEndpoints(ep *corev1.Endpoints) {
old, err := m.clientset.CoreV1().Endpoints(ep.Namespace).Get(ep.Name, metav1.GetOptions{})
if err != nil {
if k8sErrors.IsNotFound(err) {
_, err = m.clientset.CoreV1().Endpoints(ep.Namespace).Create(ep)
if err != nil {
logrus.Warningf("create endpoints for metrics-server: %v", err)
}
return
}
logrus.Errorf("retrieve endpoints: %v", err)
return
}
ep.ResourceVersion = old.ResourceVersion
_, err = m.clientset.CoreV1().Endpoints(ep.Namespace).Update(ep)
if err != nil {
logrus.Warningf("update endpoints for metrics-server: %v", err)
}
}