change some code

barnettZQG 2020-06-17 12:57:41 +08:00
parent b1dd746e24
commit b3955d0ee7
8 changed files with 83 additions and 65 deletions

View File

@@ -28,10 +28,16 @@ import (
 	"github.com/goodrain/rainbond/db"
 	etcdutil "github.com/goodrain/rainbond/util/etcd"
 	"github.com/goodrain/rainbond/worker/client"
+	"k8s.io/client-go/kubernetes"
 )

 //InitHandle initializes all handlers
-func InitHandle(conf option.Config, etcdClientArgs *etcdutil.ClientArgs, statusCli *client.AppRuntimeSyncClient, etcdcli *clientv3.Client) error {
+func InitHandle(conf option.Config,
+	etcdClientArgs *etcdutil.ClientArgs,
+	statusCli *client.AppRuntimeSyncClient,
+	etcdcli *clientv3.Client,
+	kubeClient *kubernetes.Clientset,
+) error {
 	mq := api_db.MQManager{
 		EtcdClientArgs: etcdClientArgs,
 		DefaultServer:  conf.MQAPI,
@@ -45,7 +51,7 @@ func InitHandle(conf option.Config, etcdClientArgs *etcdutil.ClientArgs, statusC
 	defaultServieHandler = CreateManager(conf, mqClient, etcdcli, statusCli)
 	defaultPluginHandler = CreatePluginManager(mqClient)
 	defaultAppHandler = CreateAppManager(mqClient)
-	defaultTenantHandler = CreateTenManager(mqClient, statusCli, &conf)
+	defaultTenantHandler = CreateTenManager(mqClient, statusCli, &conf, kubeClient)
 	defaultNetRulesHandler = CreateNetRulesManager(etcdcli)
 	defaultCloudHandler = CreateCloudManager(conf)
 	defaultAPPBackupHandler = group.CreateBackupHandle(mqClient, statusCli, etcdcli)
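
Callers of the new signature must now supply a *kubernetes.Clientset. A minimal sketch of how one can be built when the API server runs inside the cluster (newInClusterKubeClient is an illustrative helper, not part of this commit; the commit's actual wiring via k8sutil.NewRestConfig appears in the cmd changes below):

	import (
		"k8s.io/client-go/kubernetes"
		"k8s.io/client-go/rest"
	)

	// newInClusterKubeClient builds a typed clientset from the pod's
	// service-account credentials (illustrative helper, not in this commit).
	func newInClusterKubeClient() (*kubernetes.Clientset, error) {
		config, err := rest.InClusterConfig()
		if err != nil {
			return nil, err
		}
		return kubernetes.NewForConfig(config)
	}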

View File

@@ -25,6 +25,7 @@ import (
 	"sort"
 	"strconv"
 	"strings"
+	"time"

 	"github.com/Sirupsen/logrus"
 	api_model "github.com/goodrain/rainbond/api/model"
@@ -33,24 +34,30 @@ import (
 	"github.com/goodrain/rainbond/db"
 	dbmodel "github.com/goodrain/rainbond/db/model"
 	mqclient "github.com/goodrain/rainbond/mq/client"
-	cli "github.com/goodrain/rainbond/node/nodem/client"
 	"github.com/goodrain/rainbond/worker/client"
+	v1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/kubernetes"
 )

 //TenantAction tenant act
 type TenantAction struct {
-	MQClient  mqclient.MQClient
-	statusCli *client.AppRuntimeSyncClient
-	OptCfg    *option.Config
+	MQClient                  mqclient.MQClient
+	statusCli                 *client.AppRuntimeSyncClient
+	OptCfg                    *option.Config
+	kubeClient                *kubernetes.Clientset
+	cacheClusterResourceStats *ClusterResourceStats
+	cacheTime                 time.Time
 }

 //CreateTenManager create Manager
 func CreateTenManager(mqc mqclient.MQClient, statusCli *client.AppRuntimeSyncClient,
-	optCfg *option.Config) *TenantAction {
+	optCfg *option.Config, kubeClient *kubernetes.Clientset) *TenantAction {
 	return &TenantAction{
-		MQClient:  mqc,
-		statusCli: statusCli,
-		OptCfg:    optCfg,
+		MQClient:   mqc,
+		statusCli:  statusCli,
+		OptCfg:     optCfg,
+		kubeClient: kubeClient,
 	}
 }
@@ -355,53 +362,38 @@ type ClusterResourceStats struct {
 // GetAllocatableResources returns allocatable cpu and memory (MB)
 func (t *TenantAction) GetAllocatableResources() (*ClusterResourceStats, error) {
-	var crs ClusterResourceStats
-	nproxy := GetNodeProxy()
-	req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/v2/nodes/rule/compute",
-		t.OptCfg.NodeAPI), nil)
-	if err != nil {
-		return &crs, fmt.Errorf("error creating http request: %v", err)
-	}
-	resp, err := nproxy.Do(req)
-	if err != nil {
-		return &crs, fmt.Errorf("error getting cluster resources: %v", err)
-	}
-	if resp.Body != nil {
-		defer resp.Body.Close()
-		if resp.StatusCode != 200 {
-			return &crs, fmt.Errorf("error getting cluster resources: status code: %d; "+
-				"response: %v", resp.StatusCode, resp)
-		}
-		type foo struct {
-			List []*cli.HostNode `json:"list"`
-		}
-		var f foo
-		err = json.NewDecoder(resp.Body).Decode(&f)
-		if err != nil {
-			return &crs, fmt.Errorf("error decoding response body: %v", err)
-		}
-		for _, n := range f.List {
-			if k := n.NodeStatus.KubeNode; k != nil && !k.Spec.Unschedulable {
-				s := strings.Replace(k.Status.Allocatable.Cpu().String(), "m", "", -1)
-				i, err := strconv.ParseInt(s, 10, 64)
-				if err != nil {
-					return &crs, fmt.Errorf("error converting string to int64: %v", err)
-				}
-				crs.AllCPU += i
-				crs.AllMemory += k.Status.Allocatable.Memory().Value() / (1024 * 1024)
-			}
-		}
-	}
+	if t.cacheClusterResourceStats == nil || t.cacheTime.Add(time.Minute*3).Before(time.Now()) {
+		var crs ClusterResourceStats
+		nodes, err := t.kubeClient.CoreV1().Nodes().List(metav1.ListOptions{})
+		if err != nil {
+			logrus.Errorf("get cluster nodes failure %s", err.Error())
+			return &crs, nil
+		}
+		for _, node := range nodes.Items {
+			if node.Spec.Unschedulable {
+				continue
+			}
+			for _, c := range node.Status.Conditions {
+				if c.Type == v1.NodeReady && c.Status != v1.ConditionTrue {
+					continue
+				}
+			}
+			crs.AllMemory += node.Status.Allocatable.Memory().Value() / (1024 * 1024)
+			crs.AllCPU += node.Status.Allocatable.Cpu().Value()
+		}
+		t.cacheClusterResourceStats = &crs
+		t.cacheTime = time.Now()
+	}
 	ts, err := t.statusCli.GetTenantResource("")
 	if err != nil {
 		logrus.Errorf("get tenant resource failure %s", err.Error())
 	}
+	crs := t.cacheClusterResourceStats
 	if ts != nil {
 		crs.RequestCPU = ts.CpuRequest
 		crs.RequestMemory = ts.MemoryRequest
 	}
-	return &crs, nil
+	return crs, nil
 }

 //GetServicesResources Gets the resource usage of the specified service.
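
Note that in the new loop the continue inside the conditions loop only advances to the next condition, so a node whose Ready condition is not True still has its resources counted. A minimal sketch of a ready check where the skip applies to the node itself (isNodeReady is an illustrative helper, not part of this commit):

	// isNodeReady reports whether the node's Ready condition is True.
	func isNodeReady(node v1.Node) bool {
		for _, c := range node.Status.Conditions {
			if c.Type == v1.NodeReady {
				return c.Status == v1.ConditionTrue
			}
		}
		return false
	}

	// in the aggregation loop:
	if node.Spec.Unschedulable || !isNodeReady(node) {
		continue // skips the node, not just one condition
	}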

View File

@@ -58,6 +58,7 @@ type Config struct {
 	LicSoPath              string
 	LogPath                string
 	KuberentesDashboardAPI string
+	KubeConfigPath         string
 }

 //APIServer apiserver server
@@ -106,6 +107,7 @@ func (a *APIServer) AddFlags(fs *pflag.FlagSet) {
 	fs.StringVar(&a.LicensePath, "license-path", "/opt/rainbond/etc/license/license.yb", "the license path of the enterprise version.")
 	fs.StringVar(&a.LicSoPath, "license-so-path", "/opt/rainbond/etc/license/license.so", "Dynamic library file path for parsing the license.")
 	fs.StringVar(&a.LogPath, "log-path", "/grdata/logs", "Where Docker log files and event log files are stored.")
+	fs.StringVar(&a.KubeConfigPath, "kube-config", "", "kube config file path; no setup is required when running in a cluster.")
 	fs.StringVar(&a.KuberentesDashboardAPI, "k8s-dashboard-api", "kubernetes-dashboard.rbd-system:443", "The service DNS name of Kubernetes dashboard. Default to kubernetes-dashboard.kubernetes-dashboard")
 }

View File

@@ -32,7 +32,9 @@ import (
 	"github.com/goodrain/rainbond/cmd/api/option"
 	"github.com/goodrain/rainbond/event"
 	etcdutil "github.com/goodrain/rainbond/util/etcd"
+	k8sutil "github.com/goodrain/rainbond/util/k8s"
 	"github.com/goodrain/rainbond/worker/client"
+	"k8s.io/client-go/kubernetes"

 	"github.com/Sirupsen/logrus"
 )
@@ -62,7 +64,14 @@ func Run(s *option.APIServer) error {
 	if err := db.CreateEventManager(s.Config); err != nil {
 		logrus.Debugf("create event manager error, %v", err)
 	}
+	config, err := k8sutil.NewRestConfig(s.KubeConfigPath)
+	if err != nil {
+		return err
+	}
+	clientset, err := kubernetes.NewForConfig(config)
+	if err != nil {
+		return err
+	}
 	if err := event.NewManager(event.EventConfig{
 		EventLogServers: s.Config.EventLogServers,
 		DiscoverArgs:    etcdClientArgs,
@@ -91,7 +100,7 @@ func Run(s *option.APIServer) error {
 	//initialize middleware
 	handler.InitProxy(s.Config)
 	//create handlers
-	if err := handler.InitHandle(s.Config, etcdClientArgs, cli, etcdcli); err != nil {
+	if err := handler.InitHandle(s.Config, etcdClientArgs, cli, etcdcli, clientset); err != nil {
 		logrus.Errorf("init all handle error, %v", err)
 		return err
 	}
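
k8sutil.NewRestConfig itself is not shown in this diff. A helper with that shape usually falls back between the kubeconfig file named by --kube-config and the in-cluster service account; a hedged sketch of the common pattern (the real implementation may differ):

	import (
		"k8s.io/client-go/rest"
		"k8s.io/client-go/tools/clientcmd"
	)

	// newRestConfig is an illustrative stand-in for k8sutil.NewRestConfig.
	func newRestConfig(kubeconfig string) (*rest.Config, error) {
		if kubeconfig == "" {
			// no --kube-config given: assume we run inside the cluster
			return rest.InClusterConfig()
		}
		return clientcmd.BuildConfigFromFlags("", kubeconfig)
	}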

View File

@@ -72,15 +72,15 @@ func (e EventStatus) String() string {
 // EventStatusSuccess -
 var EventStatusSuccess EventStatus = "success"

-// EventStatusFailure
+// EventStatusFailure -
 var EventStatusFailure EventStatus = "failure"

 //ServiceEvent event struct
 type ServiceEvent struct {
 	Model
 	EventID     string `gorm:"column:event_id;size:40"`
-	TenantID    string `gorm:"column:tenant_id;size:40"`
-	ServiceID   string `gorm:"column:service_id;size:40"`
+	TenantID    string `gorm:"column:tenant_id;size:40;index:tenant_id"`
+	ServiceID   string `gorm:"column:service_id;size:40;index:service_id"`
 	Target      string `gorm:"column:target;size:40"`
 	TargetID    string `gorm:"column:target_id;size:255"`
 	RequestBody string `gorm:"column:request_body;size:1024"`
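
With the gorm v1 tag syntax used here (github.com/jinzhu/gorm), index:tenant_id and index:service_id name secondary indexes that AutoMigrate creates if they are missing, which serves the per-tenant and per-service event queries changed below. A sketch of the effect (variable and table names are illustrative; the model's actual TableName may differ):

	// db is a *gorm.DB; typically run once at startup
	db.AutoMigrate(&model.ServiceEvent{})
	// roughly equivalent DDL:
	//   CREATE INDEX tenant_id  ON service_event (tenant_id);
	//   CREATE INDEX service_id ON service_event (service_id);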

View File

@@ -131,7 +131,7 @@ func (c *EventDaoImpl) GetEventsByTarget(target, targetID string, offset, limit
 	if err := db.Find(&result).Count(&total).Error; err != nil {
 		return nil, 0, err
 	}
-	if err := db.Offset(offset).Limit(limit).Order("create_time DESC, id DESC").Find(&result).Error; err != nil {
+	if err := db.Offset(offset).Limit(limit).Order("create_time, id DESC").Find(&result).Error; err != nil {
 		if err == gorm.ErrRecordNotFound {
 			return result, 0, nil
 		}
@@ -149,7 +149,7 @@ func (c *EventDaoImpl) GetEventsByTenantID(tenantID string, offset, limit int) (
 	if err := db.Find(&result).Count(&total).Error; err != nil {
 		return nil, 0, err
 	}
-	if err := db.Offset(offset).Limit(limit).Order("start_time DESC, id DESC").Find(&result).Error; err != nil {
+	if err := db.Offset(offset).Limit(limit).Order("start_time, id DESC").Find(&result).Error; err != nil {
 		if err == gorm.ErrRecordNotFound {
 			return result, 0, nil
 		}
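
Dropping DESC from the first sort key flips the primary order to ascending: both queries now return the requested page oldest-first by create_time / start_time, with id DESC only breaking ties between rows that share a timestamp.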

View File

@@ -66,7 +66,8 @@ func CreateCluster(kubecli kubecache.KubeClient, node *client.HostNode, datacent
 //Start starts the cluster manager
 func (n *Cluster) Start(errchan chan error) error {
 	go n.loadAndWatchNodes(errchan)
-	go n.installWorker(errchan)
+	// disable after 5.2.0
+	// go n.installWorker(errchan)
 	go n.loopHandleNodeStatus(errchan)
 	return nil
 }
@@ -138,7 +139,7 @@ func (n *Cluster) handleNodeStatus(v *client.HostNode) {
 		v.NodeStatus.Status = client.Unknown
 		v.GetAndUpdateCondition(client.NodeUp, client.ConditionFalse, "", "Node lost connection, state unknown")
 		//node lost connection, advise offline action
-		v.NodeStatus.AdviceAction = append(v.NodeStatus.AdviceAction, "offline")
+		//v.NodeStatus.AdviceAction = append(v.NodeStatus.AdviceAction, "offline")
 	} else {
 		v.GetAndUpdateCondition(client.NodeUp, client.ConditionTrue, "", "")
 		v.NodeStatus.CurrentScheduleStatus = !v.Unschedulable
@@ -199,19 +200,21 @@ func (n *Cluster) handleNodeStatus(v *client.HostNode) {
 		}
 		if action == "scheduler" && !v.Unschedulable {
 			//if node status is not scheduler
-			if v.NodeStatus.KubeNode != nil && v.NodeStatus.KubeNode.Spec.Unschedulable {
-				logrus.Infof("node %s is advice set scheduler,will do this action", v.ID)
-				_, err := n.kubecli.CordonOrUnCordon(v.ID, false)
-				if err != nil {
-					logrus.Errorf("auto set node is scheduler failure.")
-				}
-			}
+			// disable from 5.2.0
+			// if v.NodeStatus.KubeNode != nil && v.NodeStatus.KubeNode.Spec.Unschedulable {
+			// 	logrus.Infof("node %s is advice set scheduler,will do this action", v.ID)
+			// 	_, err := n.kubecli.CordonOrUnCordon(v.ID, false)
+			// 	if err != nil {
+			// 		logrus.Errorf("auto set node is scheduler failure.")
+			// 	}
+			// }
 		}
 		if action == "offline" {
 			logrus.Warningf("node %s is advice set offline", v.ID)
 			// k8s will offline the node itself.
 			// remove the endpoints associated with the node from etcd
-			v.DelEndpoints()
+			// disable from 5.2.0
+			// v.DelEndpoints()
 		}
 	}
 }

View File

@@ -217,10 +217,16 @@ func (n *NodeManager) heartbeat() {
 			if n.cfg.AutoUnschedulerUnHealthDuration == 0 {
 				continue
 			}
-			if v.ErrorDuration > n.cfg.AutoUnschedulerUnHealthDuration && n.cfg.AutoScheduler && n.currentNode.Role.HasRule(client.ComputeNode) {
-				n.currentNode.NodeStatus.AdviceAction = []string{"unscheduler"}
-				logrus.Warningf("node unhealth more than %s(service %s unhealth), will send unscheduler advice action to master", n.cfg.AutoUnschedulerUnHealthDuration.String(), ser.Name)
-			}
+			// disable from 5.2.0 2020 06 17
+			// if v.ErrorDuration > n.cfg.AutoUnschedulerUnHealthDuration && n.cfg.AutoScheduler && n.currentNode.Role.HasRule(client.ComputeNode) {
+			// 	n.currentNode.NodeStatus.AdviceAction = []string{"unscheduler"}
+			// 	logrus.Warningf("node unhealth more than %s(service %s unhealth), will send unscheduler advice action to master", n.cfg.AutoUnschedulerUnHealthDuration.String(), ser.Name)
+			// } else {
+			// 	n.currentNode.NodeStatus.AdviceAction = []string{}
+			// }
+			n.currentNode.NodeStatus.AdviceAction = []string{}
 		} else {
 			logrus.Errorf("can not find service %s", k)
 		}