mirror of
https://gitee.com/rainbond/Rainbond.git
synced 2024-11-29 18:27:58 +08:00
514d885377
Signed-off-by: 张启航 <101104760+ZhangSetSail@users.noreply.github.com>
491 lines
15 KiB
Go
491 lines
15 KiB
Go
package handler
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"github.com/goodrain/rainbond/api/client/prometheus"
|
|
k8sutil "github.com/goodrain/rainbond/util/k8s"
|
|
"github.com/pquerna/ffjson/ffjson"
|
|
"github.com/shirou/gopsutil/disk"
|
|
"github.com/sirupsen/logrus"
|
|
v1 "k8s.io/api/core/v1"
|
|
policyv1 "k8s.io/api/policy/v1"
|
|
policyv1beta1 "k8s.io/api/policy/v1beta1"
|
|
"k8s.io/apimachinery/pkg/api/meta"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/fields"
|
|
"k8s.io/apimachinery/pkg/types"
|
|
utilversion "k8s.io/apimachinery/pkg/util/version"
|
|
"k8s.io/client-go/kubernetes"
|
|
"k8s.io/client-go/rest"
|
|
"runtime"
|
|
"strings"
|
|
|
|
"github.com/goodrain/rainbond/api/model"
|
|
|
|
"sigs.k8s.io/controller-runtime/pkg/client"
|
|
|
|
"time"
|
|
)
|
|
|
|
const (
|
|
// NodeRolesLabelPrefix -
|
|
NodeRolesLabelPrefix = "node-role.kubernetes.io"
|
|
// NodeRolesLabel -
|
|
NodeRolesLabel = "kubernetes.io/role"
|
|
// NodeInternalIP -
|
|
NodeInternalIP = "InternalIP"
|
|
// NodeExternalIP -
|
|
NodeExternalIP = "ExternalIP"
|
|
// UnSchedulAble -
|
|
UnSchedulAble = "unschedulable"
|
|
// ReSchedulAble -
|
|
ReSchedulAble = "reschedulable"
|
|
// NodeUp -
|
|
NodeUp = "up"
|
|
// NodeDown -
|
|
NodeDown = "down"
|
|
// Evict -
|
|
Evict = "evict"
|
|
//EvictionKind -
|
|
EvictionKind = "Eviction"
|
|
//EvictionSubresource -
|
|
EvictionSubresource = "pods/eviction"
|
|
)
|
|
|
|
// NodesHandler -
|
|
type NodesHandler interface {
|
|
ListNodes(ctx context.Context) ([]model.NodeInfo, error)
|
|
ListChaosNodeArch(ctx context.Context) ([]string, error)
|
|
GetNodeInfo(ctx context.Context, nodeName string) (model.NodeInfo, error)
|
|
NodeAction(ctx context.Context, nodeName, action string) error
|
|
ListLabels(ctx context.Context, nodeName string) (map[string]string, error)
|
|
UpdateLabels(ctx context.Context, nodeName string, labels map[string]string) (map[string]string, error)
|
|
ListTaints(ctx context.Context, nodeName string) ([]v1.Taint, error)
|
|
UpdateTaints(ctx context.Context, nodeName string, taints []v1.Taint) ([]v1.Taint, error)
|
|
}
|
|
|
|
// NewNodesHandler -
|
|
func NewNodesHandler(clientset *kubernetes.Clientset, RbdNamespace string, config *rest.Config, mapper meta.RESTMapper, prometheusCli prometheus.Interface) NodesHandler {
|
|
return &nodesHandle{
|
|
namespace: RbdNamespace,
|
|
clientset: clientset,
|
|
config: config,
|
|
mapper: mapper,
|
|
prometheusCli: prometheusCli,
|
|
}
|
|
}
|
|
|
|
type nodesHandle struct {
|
|
namespace string
|
|
clientset *kubernetes.Clientset
|
|
clusterInfoCache *model.ClusterResource
|
|
cacheTime time.Time
|
|
config *rest.Config
|
|
mapper meta.RESTMapper
|
|
client client.Client
|
|
prometheusCli prometheus.Interface
|
|
}
|
|
|
|
// ListNodes -
|
|
func (n *nodesHandle) ListNodes(ctx context.Context) (res []model.NodeInfo, err error) {
|
|
nodeList, err := n.clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
|
|
if err != nil {
|
|
logrus.Errorf("get node list error: %v", err)
|
|
return nil, err
|
|
}
|
|
for _, node := range nodeList.Items {
|
|
nodeInfo, err := n.HandleNodeInfo(node)
|
|
if err != nil {
|
|
logrus.Error("get node info handle error:", err)
|
|
return res, err
|
|
}
|
|
res = append(res, nodeInfo)
|
|
}
|
|
return res, nil
|
|
}
|
|
|
|
// ListChaosNodeArch -
|
|
func (n *nodesHandle) ListChaosNodeArch(ctx context.Context) ([]string, error) {
|
|
chaosPods, err := n.clientset.CoreV1().Pods("").List(ctx, metav1.ListOptions{
|
|
LabelSelector: "name=rbd-chaos",
|
|
})
|
|
if err != nil {
|
|
logrus.Errorf("get chaos list error: %v", err)
|
|
return nil, err
|
|
}
|
|
chaosNodeIP := make(map[string]int)
|
|
for _, chaosPod := range chaosPods.Items {
|
|
chaosNodeIP[chaosPod.Status.HostIP] = 1
|
|
}
|
|
nodes, err := n.clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
|
|
if err != nil {
|
|
logrus.Error("get node info handle error:", err)
|
|
return nil, err
|
|
}
|
|
var nodeArchs []string
|
|
for _, node := range nodes.Items {
|
|
address := node.Status.Addresses
|
|
for _, addr := range address {
|
|
if addr.Type == NodeInternalIP && chaosNodeIP[addr.Address] == 1 {
|
|
nodeArchs = append(nodeArchs, node.Status.NodeInfo.Architecture)
|
|
}
|
|
}
|
|
}
|
|
return nodeArchs, nil
|
|
}
|
|
|
|
// GetNodeInfo -
|
|
func (n *nodesHandle) GetNodeInfo(ctx context.Context, nodeName string) (res model.NodeInfo, err error) {
|
|
node, err := n.clientset.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
|
|
if err != nil {
|
|
logrus.Error("get node info error:", err)
|
|
return res, err
|
|
}
|
|
res, err = n.HandleNodeInfo(*node)
|
|
if err != nil {
|
|
logrus.Error("get node info handle error:", err)
|
|
return res, err
|
|
}
|
|
address := node.Status.Addresses
|
|
var ip string
|
|
for _, addr := range address {
|
|
if addr.Type == NodeInternalIP {
|
|
ip = addr.Address
|
|
}
|
|
}
|
|
|
|
query := fmt.Sprintf(`node_filesystem_size_bytes{mountpoint="/", instance=~"%v:.*"}`, ip)
|
|
nodeGenCap := n.prometheusCli.GetMetric(query, time.Now())
|
|
|
|
query = fmt.Sprintf(`node_filesystem_avail_bytes{mountpoint="/", instance=~"%v:.*"}`, ip)
|
|
nodeGenAvail := n.prometheusCli.GetMetric(query, time.Now())
|
|
|
|
var diskCap uint64
|
|
var diskAvail uint64
|
|
for _, dcap := range nodeGenCap.MetricData.MetricValues {
|
|
diskCap = uint64(dcap.Sample.Value())
|
|
if container := dcap.Metadata["container"]; container == "prometheus-node-exporter" {
|
|
break
|
|
}
|
|
}
|
|
for _, avail := range nodeGenAvail.MetricData.MetricValues {
|
|
diskAvail = uint64(avail.Sample.Value())
|
|
if container := avail.Metadata["container"]; container == "prometheus-node-exporter" {
|
|
break
|
|
}
|
|
}
|
|
|
|
query = fmt.Sprintf(`node_filesystem_size_bytes{mountpoint="/var/lib/container", instance=~"%v:.*"}`, ip)
|
|
nodeContainerCap := n.prometheusCli.GetMetric(query, time.Now())
|
|
|
|
query = fmt.Sprintf(`node_filesystem_avail_bytes{mountpoint="/var/lib/container", instance=~"%v:.*"}`, ip)
|
|
nodeContainerAvail := n.prometheusCli.GetMetric(query, time.Now())
|
|
|
|
var containerDiskCap uint64
|
|
var containerDiskAvail uint64
|
|
for _, cap := range nodeContainerCap.MetricData.MetricValues {
|
|
containerDiskCap = uint64(cap.Sample.Value())
|
|
if container := cap.Metadata["container"]; container == "prometheus-node-exporter" {
|
|
break
|
|
}
|
|
}
|
|
for _, avail := range nodeContainerAvail.MetricData.MetricValues {
|
|
containerDiskAvail = uint64(avail.Sample.Value())
|
|
if container := avail.Metadata["container"]; container == "prometheus-node-exporter" {
|
|
break
|
|
}
|
|
}
|
|
if containerDiskCap == 0 {
|
|
containerDiskCap = diskCap
|
|
}
|
|
if containerDiskAvail == 0 {
|
|
containerDiskAvail = diskAvail
|
|
}
|
|
|
|
res.Resource.CapDisk = diskCap
|
|
res.Resource.ReqDisk = diskCap - diskAvail
|
|
res.Resource.CapContainerDisk = containerDiskCap
|
|
res.Resource.ReqContainerDisk = containerDiskCap - containerDiskAvail
|
|
|
|
if res.Resource.CapDisk == 0 {
|
|
var diskStatus *disk.UsageStat
|
|
if runtime.GOOS != "windows" {
|
|
diskStatus, _ = disk.Usage("/")
|
|
} else {
|
|
diskStatus, _ = disk.Usage(`z:\\`)
|
|
}
|
|
var diskCap, reqDisk uint64
|
|
if diskStatus != nil {
|
|
diskCap = diskStatus.Total
|
|
reqDisk = diskStatus.Used
|
|
}
|
|
res.Resource.CapDisk = diskCap
|
|
res.Resource.ReqDisk = reqDisk
|
|
res.Resource.CapContainerDisk = diskCap
|
|
res.Resource.ReqContainerDisk = reqDisk
|
|
}
|
|
return res, nil
|
|
}
|
|
|
|
// HandleNode -
|
|
func (n *nodesHandle) HandleNodeInfo(node v1.Node) (nodeinfo model.NodeInfo, err error) {
|
|
var condition model.NodeCondition
|
|
for _, addr := range node.Status.Addresses {
|
|
switch addr.Type {
|
|
case NodeInternalIP:
|
|
nodeinfo.InternalIP = addr.Address
|
|
case NodeExternalIP:
|
|
nodeinfo.ExternalIP = addr.Address
|
|
default:
|
|
continue
|
|
}
|
|
}
|
|
for _, cond := range node.Status.Conditions {
|
|
condition.Type = string(cond.Type)
|
|
condition.Status = string(cond.Status)
|
|
condition.Message = cond.Message
|
|
condition.Reason = cond.Reason
|
|
condition.LastHeartbeatTime = cond.LastHeartbeatTime
|
|
condition.LastTransitionTime = cond.LastTransitionTime
|
|
nodeinfo.Conditions = append(nodeinfo.Conditions, condition)
|
|
}
|
|
// get node roles
|
|
var roles []string
|
|
for k, v := range node.Labels {
|
|
if strings.HasPrefix(k, NodeRolesLabelPrefix) {
|
|
// string handle : node-role.kubernetes.io/worker: "true"
|
|
role := strings.Split(k, "/")[1]
|
|
roles = append(roles, role)
|
|
}
|
|
if strings.HasPrefix(k, NodeRolesLabel) {
|
|
// string handle : kubernetes.io/role: master
|
|
roles = append(roles, v)
|
|
}
|
|
continue
|
|
}
|
|
// req resource from Prometheus
|
|
var query string
|
|
query = fmt.Sprintf(`sum(rbd_api_exporter_cluster_pod_memory{node_name="%v"}) by (instance)`, node.Name)
|
|
podMemoryMetric := n.prometheusCli.GetMetric(query, time.Now())
|
|
|
|
query = fmt.Sprintf(`sum(rbd_api_exporter_cluster_pod_cpu{node_name="%v"}) by (instance)`, node.Name)
|
|
podCPUMetric := n.prometheusCli.GetMetric(query, time.Now())
|
|
|
|
query = fmt.Sprintf(`sum(rbd_api_exporter_cluster_pod_ephemeral_storage{node_name="%v"}) by (instance)`, node.Name)
|
|
podEphemeralStorageMetric := n.prometheusCli.GetMetric(query, time.Now())
|
|
|
|
for _, memory := range podMemoryMetric.MetricData.MetricValues {
|
|
nodeinfo.Resource.ReqMemory = int(memory.Sample.Value()) / 1024 / 1024
|
|
}
|
|
for _, cpu := range podCPUMetric.MetricData.MetricValues {
|
|
nodeinfo.Resource.ReqCPU = float32(cpu.Sample.Value()) / 1000
|
|
}
|
|
for _, storage := range podEphemeralStorageMetric.MetricData.MetricValues {
|
|
nodeinfo.Resource.ReqStorageEq = float32(storage.Sample.Value()) / 1024 / 1024
|
|
}
|
|
// cap resource
|
|
nodeinfo.Resource.CapMemory = int(node.Status.Capacity.Memory().Value()) / 1024 / 1024
|
|
nodeinfo.Resource.CapCPU = int(node.Status.Capacity.Cpu().Value())
|
|
nodeinfo.Resource.CapStorageEq = int(node.Status.Capacity.StorageEphemeral().Value()) / 1024 / 1024
|
|
|
|
nodeinfo.Name = node.Name
|
|
nodeinfo.Unschedulable = node.Spec.Unschedulable
|
|
nodeinfo.KernelVersion = node.Status.NodeInfo.KernelVersion
|
|
nodeinfo.ContainerRunTime = node.Status.NodeInfo.ContainerRuntimeVersion
|
|
nodeinfo.Architecture = node.Status.NodeInfo.Architecture
|
|
nodeinfo.OperatingSystem = node.Status.NodeInfo.OperatingSystem
|
|
nodeinfo.CreateTime = node.CreationTimestamp.Time
|
|
nodeinfo.OSVersion = node.Status.NodeInfo.OSImage
|
|
nodeinfo.Roles = roles
|
|
|
|
return nodeinfo, nil
|
|
}
|
|
|
|
// NodeAction -
|
|
func (n *nodesHandle) NodeAction(ctx context.Context, nodeName, action string) error {
|
|
var data string
|
|
switch action {
|
|
case UnSchedulAble:
|
|
// true can't scheduler ,false can scheduler
|
|
data = fmt.Sprintf(`{"spec":{"unschedulable":%t}}`, true)
|
|
case ReSchedulAble:
|
|
// true can't scheduler ,false can scheduler
|
|
data = fmt.Sprintf(`{"spec":{"unschedulable":%t}}`, false)
|
|
case Evict:
|
|
// unschedulable
|
|
data = fmt.Sprintf(`{"spec":{"unschedulable":%t}}`, true)
|
|
default:
|
|
logrus.Info("not support this action")
|
|
return nil
|
|
}
|
|
_, err := n.clientset.CoreV1().Nodes().Patch(ctx, nodeName, types.StrategicMergePatchType, []byte(data), metav1.PatchOptions{})
|
|
if err != nil {
|
|
logrus.Error("action patch error:", err)
|
|
return err
|
|
}
|
|
if action == Evict {
|
|
err = n.DeleteOrEvictPodsSimple(nodeName)
|
|
if err != nil {
|
|
logrus.Error("delete or evict pods error:", err)
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// DeleteOrEvictPodsSimple Evict the Pod from a node
|
|
func (n *nodesHandle) DeleteOrEvictPodsSimple(nodeName string) error {
|
|
nodePods, err := n.GetNodePods(nodeName)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
policyGroupVersion, err := n.SupportEviction()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if policyGroupVersion == "" {
|
|
return fmt.Errorf("the server can not support eviction subresource")
|
|
}
|
|
for _, v := range nodePods {
|
|
n.evictPod(v, policyGroupVersion)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// SupportEviction uses Discovery API to find out if the server support eviction subresource
|
|
// If support, it will return its groupVersion; Otherwise, it will return ""
|
|
func (n *nodesHandle) SupportEviction() (string, error) {
|
|
discoveryClient := n.clientset.Discovery()
|
|
groupList, err := discoveryClient.ServerGroups()
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
foundPolicyGroup := false
|
|
var policyGroupVersion string
|
|
for _, group := range groupList.Groups {
|
|
if group.Name == "policy" {
|
|
foundPolicyGroup = true
|
|
policyGroupVersion = group.PreferredVersion.GroupVersion
|
|
break
|
|
}
|
|
}
|
|
if !foundPolicyGroup {
|
|
return "", nil
|
|
}
|
|
resourceList, err := discoveryClient.ServerResourcesForGroupVersion("v1")
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
for _, resource := range resourceList.APIResources {
|
|
if resource.Name == EvictionSubresource && resource.Kind == EvictionKind {
|
|
return policyGroupVersion, nil
|
|
}
|
|
}
|
|
return "", nil
|
|
}
|
|
|
|
// GetNodePods -
|
|
func (n *nodesHandle) GetNodePods(nodeName string) (pods []v1.Pod, err error) {
|
|
podList, err := n.clientset.CoreV1().Pods(metav1.NamespaceAll).List(context.Background(), metav1.ListOptions{
|
|
FieldSelector: fields.SelectorFromSet(fields.Set{"spec.nodeName": nodeName}).String()})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return podList.Items, err
|
|
}
|
|
|
|
// GetNodeScheduler Scheduler status
|
|
func (n *nodesHandle) GetNodeScheduler(ctx context.Context, nodeName string) (status bool, err error) {
|
|
node, err := n.clientset.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
|
|
if err != nil {
|
|
logrus.Error("get node scheduler status error:", err)
|
|
return false, err
|
|
}
|
|
return node.Spec.Unschedulable, err
|
|
}
|
|
|
|
// evictPod -
|
|
func (n *nodesHandle) evictPod(pod v1.Pod, policyGroupVersion string) error {
|
|
deleteOptions := &metav1.DeleteOptions{}
|
|
if k8sutil.GetKubeVersion().AtLeast(utilversion.MustParseSemantic("v1.21.0")) {
|
|
eviction := &policyv1.Eviction{
|
|
TypeMeta: metav1.TypeMeta{
|
|
APIVersion: policyGroupVersion,
|
|
Kind: EvictionKind,
|
|
},
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: pod.Name,
|
|
Namespace: pod.Namespace,
|
|
},
|
|
DeleteOptions: deleteOptions,
|
|
}
|
|
// Remember to change change the URL manipulation func when Evction's version change
|
|
return n.clientset.PolicyV1().Evictions(eviction.Namespace).Evict(context.Background(), eviction)
|
|
}
|
|
eviction := &policyv1beta1.Eviction{
|
|
TypeMeta: metav1.TypeMeta{
|
|
APIVersion: policyGroupVersion,
|
|
Kind: EvictionKind,
|
|
},
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: pod.Name,
|
|
Namespace: pod.Namespace,
|
|
},
|
|
DeleteOptions: deleteOptions,
|
|
}
|
|
// Remember to change change the URL manipulation func when Evction's version change
|
|
return n.clientset.PolicyV1beta1().Evictions(eviction.Namespace).Evict(context.Background(), eviction)
|
|
}
|
|
|
|
// GetLabels -
|
|
func (n *nodesHandle) ListLabels(ctx context.Context, nodeName string) (map[string]string, error) {
|
|
node, err := n.clientset.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
|
|
if err != nil {
|
|
logrus.Error("[GetLabels] get node error:", err)
|
|
return nil, err
|
|
}
|
|
return node.Labels, err
|
|
}
|
|
|
|
// UpdateLabels -
|
|
func (n *nodesHandle) UpdateLabels(ctx context.Context, nodeName string, labels map[string]string) (map[string]string, error) {
|
|
node, err := n.clientset.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
|
|
if err != nil {
|
|
logrus.Error("[UpdateLabels] update node labels error:", err)
|
|
return nil, err
|
|
}
|
|
node.Labels = labels
|
|
res, err := n.clientset.CoreV1().Nodes().Update(ctx, node, metav1.UpdateOptions{})
|
|
if err != nil {
|
|
logrus.Error("[UpdateLabels] update node labels error:", err)
|
|
return nil, err
|
|
}
|
|
return res.Labels, nil
|
|
}
|
|
|
|
// GetTaint -
|
|
func (n *nodesHandle) ListTaints(ctx context.Context, nodeName string) ([]v1.Taint, error) {
|
|
node, err := n.clientset.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
|
|
if err != nil {
|
|
logrus.Error("[GetTaint] get node error:", err)
|
|
return nil, err
|
|
}
|
|
return node.Spec.Taints, err
|
|
}
|
|
|
|
// UpdateTaint -
|
|
func (n *nodesHandle) UpdateTaints(ctx context.Context, nodeName string, taints []v1.Taint) ([]v1.Taint, error) {
|
|
taintsByte, err := ffjson.Marshal(taints)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
data := fmt.Sprintf(`{"spec":{"taints":%s}}`, string(taintsByte))
|
|
node, err := n.clientset.CoreV1().Nodes().Patch(ctx, nodeName, types.StrategicMergePatchType, []byte(data), metav1.PatchOptions{})
|
|
if err != nil {
|
|
logrus.Error("[UpdateTaints] update node taints error:", err)
|
|
return nil, err
|
|
}
|
|
return node.Spec.Taints, err
|
|
}
|