Rainbond/api/handler/nodes.go
张启航 514d885377
fix: list nil (#1969)
Signed-off-by: 张启航 <101104760+ZhangSetSail@users.noreply.github.com>
2024-09-12 12:13:32 +08:00

491 lines
15 KiB
Go

package handler
import (
"context"
"fmt"
"github.com/goodrain/rainbond/api/client/prometheus"
k8sutil "github.com/goodrain/rainbond/util/k8s"
"github.com/pquerna/ffjson/ffjson"
"github.com/shirou/gopsutil/disk"
"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1"
policyv1beta1 "k8s.io/api/policy/v1beta1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/types"
utilversion "k8s.io/apimachinery/pkg/util/version"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"runtime"
"strings"
"github.com/goodrain/rainbond/api/model"
"sigs.k8s.io/controller-runtime/pkg/client"
"time"
)
const (
// NodeRolesLabelPrefix -
NodeRolesLabelPrefix = "node-role.kubernetes.io"
// NodeRolesLabel -
NodeRolesLabel = "kubernetes.io/role"
// NodeInternalIP -
NodeInternalIP = "InternalIP"
// NodeExternalIP -
NodeExternalIP = "ExternalIP"
// UnSchedulAble -
UnSchedulAble = "unschedulable"
// ReSchedulAble -
ReSchedulAble = "reschedulable"
// NodeUp -
NodeUp = "up"
// NodeDown -
NodeDown = "down"
// Evict -
Evict = "evict"
//EvictionKind -
EvictionKind = "Eviction"
//EvictionSubresource -
EvictionSubresource = "pods/eviction"
)
// NodesHandler -
type NodesHandler interface {
ListNodes(ctx context.Context) ([]model.NodeInfo, error)
ListChaosNodeArch(ctx context.Context) ([]string, error)
GetNodeInfo(ctx context.Context, nodeName string) (model.NodeInfo, error)
NodeAction(ctx context.Context, nodeName, action string) error
ListLabels(ctx context.Context, nodeName string) (map[string]string, error)
UpdateLabels(ctx context.Context, nodeName string, labels map[string]string) (map[string]string, error)
ListTaints(ctx context.Context, nodeName string) ([]v1.Taint, error)
UpdateTaints(ctx context.Context, nodeName string, taints []v1.Taint) ([]v1.Taint, error)
}
// NewNodesHandler -
func NewNodesHandler(clientset *kubernetes.Clientset, RbdNamespace string, config *rest.Config, mapper meta.RESTMapper, prometheusCli prometheus.Interface) NodesHandler {
return &nodesHandle{
namespace: RbdNamespace,
clientset: clientset,
config: config,
mapper: mapper,
prometheusCli: prometheusCli,
}
}
type nodesHandle struct {
namespace string
clientset *kubernetes.Clientset
clusterInfoCache *model.ClusterResource
cacheTime time.Time
config *rest.Config
mapper meta.RESTMapper
client client.Client
prometheusCli prometheus.Interface
}
// ListNodes -
func (n *nodesHandle) ListNodes(ctx context.Context) (res []model.NodeInfo, err error) {
nodeList, err := n.clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
if err != nil {
logrus.Errorf("get node list error: %v", err)
return nil, err
}
for _, node := range nodeList.Items {
nodeInfo, err := n.HandleNodeInfo(node)
if err != nil {
logrus.Error("get node info handle error:", err)
return res, err
}
res = append(res, nodeInfo)
}
return res, nil
}
// ListChaosNodeArch -
func (n *nodesHandle) ListChaosNodeArch(ctx context.Context) ([]string, error) {
chaosPods, err := n.clientset.CoreV1().Pods("").List(ctx, metav1.ListOptions{
LabelSelector: "name=rbd-chaos",
})
if err != nil {
logrus.Errorf("get chaos list error: %v", err)
return nil, err
}
chaosNodeIP := make(map[string]int)
for _, chaosPod := range chaosPods.Items {
chaosNodeIP[chaosPod.Status.HostIP] = 1
}
nodes, err := n.clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
if err != nil {
logrus.Error("get node info handle error:", err)
return nil, err
}
var nodeArchs []string
for _, node := range nodes.Items {
address := node.Status.Addresses
for _, addr := range address {
if addr.Type == NodeInternalIP && chaosNodeIP[addr.Address] == 1 {
nodeArchs = append(nodeArchs, node.Status.NodeInfo.Architecture)
}
}
}
return nodeArchs, nil
}
// GetNodeInfo -
func (n *nodesHandle) GetNodeInfo(ctx context.Context, nodeName string) (res model.NodeInfo, err error) {
node, err := n.clientset.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
if err != nil {
logrus.Error("get node info error:", err)
return res, err
}
res, err = n.HandleNodeInfo(*node)
if err != nil {
logrus.Error("get node info handle error:", err)
return res, err
}
address := node.Status.Addresses
var ip string
for _, addr := range address {
if addr.Type == NodeInternalIP {
ip = addr.Address
}
}
query := fmt.Sprintf(`node_filesystem_size_bytes{mountpoint="/", instance=~"%v:.*"}`, ip)
nodeGenCap := n.prometheusCli.GetMetric(query, time.Now())
query = fmt.Sprintf(`node_filesystem_avail_bytes{mountpoint="/", instance=~"%v:.*"}`, ip)
nodeGenAvail := n.prometheusCli.GetMetric(query, time.Now())
var diskCap uint64
var diskAvail uint64
for _, dcap := range nodeGenCap.MetricData.MetricValues {
diskCap = uint64(dcap.Sample.Value())
if container := dcap.Metadata["container"]; container == "prometheus-node-exporter" {
break
}
}
for _, avail := range nodeGenAvail.MetricData.MetricValues {
diskAvail = uint64(avail.Sample.Value())
if container := avail.Metadata["container"]; container == "prometheus-node-exporter" {
break
}
}
query = fmt.Sprintf(`node_filesystem_size_bytes{mountpoint="/var/lib/container", instance=~"%v:.*"}`, ip)
nodeContainerCap := n.prometheusCli.GetMetric(query, time.Now())
query = fmt.Sprintf(`node_filesystem_avail_bytes{mountpoint="/var/lib/container", instance=~"%v:.*"}`, ip)
nodeContainerAvail := n.prometheusCli.GetMetric(query, time.Now())
var containerDiskCap uint64
var containerDiskAvail uint64
for _, cap := range nodeContainerCap.MetricData.MetricValues {
containerDiskCap = uint64(cap.Sample.Value())
if container := cap.Metadata["container"]; container == "prometheus-node-exporter" {
break
}
}
for _, avail := range nodeContainerAvail.MetricData.MetricValues {
containerDiskAvail = uint64(avail.Sample.Value())
if container := avail.Metadata["container"]; container == "prometheus-node-exporter" {
break
}
}
if containerDiskCap == 0 {
containerDiskCap = diskCap
}
if containerDiskAvail == 0 {
containerDiskAvail = diskAvail
}
res.Resource.CapDisk = diskCap
res.Resource.ReqDisk = diskCap - diskAvail
res.Resource.CapContainerDisk = containerDiskCap
res.Resource.ReqContainerDisk = containerDiskCap - containerDiskAvail
if res.Resource.CapDisk == 0 {
var diskStatus *disk.UsageStat
if runtime.GOOS != "windows" {
diskStatus, _ = disk.Usage("/")
} else {
diskStatus, _ = disk.Usage(`z:\\`)
}
var diskCap, reqDisk uint64
if diskStatus != nil {
diskCap = diskStatus.Total
reqDisk = diskStatus.Used
}
res.Resource.CapDisk = diskCap
res.Resource.ReqDisk = reqDisk
res.Resource.CapContainerDisk = diskCap
res.Resource.ReqContainerDisk = reqDisk
}
return res, nil
}
// HandleNode -
func (n *nodesHandle) HandleNodeInfo(node v1.Node) (nodeinfo model.NodeInfo, err error) {
var condition model.NodeCondition
for _, addr := range node.Status.Addresses {
switch addr.Type {
case NodeInternalIP:
nodeinfo.InternalIP = addr.Address
case NodeExternalIP:
nodeinfo.ExternalIP = addr.Address
default:
continue
}
}
for _, cond := range node.Status.Conditions {
condition.Type = string(cond.Type)
condition.Status = string(cond.Status)
condition.Message = cond.Message
condition.Reason = cond.Reason
condition.LastHeartbeatTime = cond.LastHeartbeatTime
condition.LastTransitionTime = cond.LastTransitionTime
nodeinfo.Conditions = append(nodeinfo.Conditions, condition)
}
// get node roles
var roles []string
for k, v := range node.Labels {
if strings.HasPrefix(k, NodeRolesLabelPrefix) {
// string handle : node-role.kubernetes.io/worker: "true"
role := strings.Split(k, "/")[1]
roles = append(roles, role)
}
if strings.HasPrefix(k, NodeRolesLabel) {
// string handle : kubernetes.io/role: master
roles = append(roles, v)
}
continue
}
// req resource from Prometheus
var query string
query = fmt.Sprintf(`sum(rbd_api_exporter_cluster_pod_memory{node_name="%v"}) by (instance)`, node.Name)
podMemoryMetric := n.prometheusCli.GetMetric(query, time.Now())
query = fmt.Sprintf(`sum(rbd_api_exporter_cluster_pod_cpu{node_name="%v"}) by (instance)`, node.Name)
podCPUMetric := n.prometheusCli.GetMetric(query, time.Now())
query = fmt.Sprintf(`sum(rbd_api_exporter_cluster_pod_ephemeral_storage{node_name="%v"}) by (instance)`, node.Name)
podEphemeralStorageMetric := n.prometheusCli.GetMetric(query, time.Now())
for _, memory := range podMemoryMetric.MetricData.MetricValues {
nodeinfo.Resource.ReqMemory = int(memory.Sample.Value()) / 1024 / 1024
}
for _, cpu := range podCPUMetric.MetricData.MetricValues {
nodeinfo.Resource.ReqCPU = float32(cpu.Sample.Value()) / 1000
}
for _, storage := range podEphemeralStorageMetric.MetricData.MetricValues {
nodeinfo.Resource.ReqStorageEq = float32(storage.Sample.Value()) / 1024 / 1024
}
// cap resource
nodeinfo.Resource.CapMemory = int(node.Status.Capacity.Memory().Value()) / 1024 / 1024
nodeinfo.Resource.CapCPU = int(node.Status.Capacity.Cpu().Value())
nodeinfo.Resource.CapStorageEq = int(node.Status.Capacity.StorageEphemeral().Value()) / 1024 / 1024
nodeinfo.Name = node.Name
nodeinfo.Unschedulable = node.Spec.Unschedulable
nodeinfo.KernelVersion = node.Status.NodeInfo.KernelVersion
nodeinfo.ContainerRunTime = node.Status.NodeInfo.ContainerRuntimeVersion
nodeinfo.Architecture = node.Status.NodeInfo.Architecture
nodeinfo.OperatingSystem = node.Status.NodeInfo.OperatingSystem
nodeinfo.CreateTime = node.CreationTimestamp.Time
nodeinfo.OSVersion = node.Status.NodeInfo.OSImage
nodeinfo.Roles = roles
return nodeinfo, nil
}
// NodeAction -
func (n *nodesHandle) NodeAction(ctx context.Context, nodeName, action string) error {
var data string
switch action {
case UnSchedulAble:
// true can't scheduler ,false can scheduler
data = fmt.Sprintf(`{"spec":{"unschedulable":%t}}`, true)
case ReSchedulAble:
// true can't scheduler ,false can scheduler
data = fmt.Sprintf(`{"spec":{"unschedulable":%t}}`, false)
case Evict:
// unschedulable
data = fmt.Sprintf(`{"spec":{"unschedulable":%t}}`, true)
default:
logrus.Info("not support this action")
return nil
}
_, err := n.clientset.CoreV1().Nodes().Patch(ctx, nodeName, types.StrategicMergePatchType, []byte(data), metav1.PatchOptions{})
if err != nil {
logrus.Error("action patch error:", err)
return err
}
if action == Evict {
err = n.DeleteOrEvictPodsSimple(nodeName)
if err != nil {
logrus.Error("delete or evict pods error:", err)
return err
}
}
return nil
}
// DeleteOrEvictPodsSimple Evict the Pod from a node
func (n *nodesHandle) DeleteOrEvictPodsSimple(nodeName string) error {
nodePods, err := n.GetNodePods(nodeName)
if err != nil {
return err
}
policyGroupVersion, err := n.SupportEviction()
if err != nil {
return err
}
if policyGroupVersion == "" {
return fmt.Errorf("the server can not support eviction subresource")
}
for _, v := range nodePods {
n.evictPod(v, policyGroupVersion)
}
return nil
}
// SupportEviction uses Discovery API to find out if the server support eviction subresource
// If support, it will return its groupVersion; Otherwise, it will return ""
func (n *nodesHandle) SupportEviction() (string, error) {
discoveryClient := n.clientset.Discovery()
groupList, err := discoveryClient.ServerGroups()
if err != nil {
return "", err
}
foundPolicyGroup := false
var policyGroupVersion string
for _, group := range groupList.Groups {
if group.Name == "policy" {
foundPolicyGroup = true
policyGroupVersion = group.PreferredVersion.GroupVersion
break
}
}
if !foundPolicyGroup {
return "", nil
}
resourceList, err := discoveryClient.ServerResourcesForGroupVersion("v1")
if err != nil {
return "", err
}
for _, resource := range resourceList.APIResources {
if resource.Name == EvictionSubresource && resource.Kind == EvictionKind {
return policyGroupVersion, nil
}
}
return "", nil
}
// GetNodePods -
func (n *nodesHandle) GetNodePods(nodeName string) (pods []v1.Pod, err error) {
podList, err := n.clientset.CoreV1().Pods(metav1.NamespaceAll).List(context.Background(), metav1.ListOptions{
FieldSelector: fields.SelectorFromSet(fields.Set{"spec.nodeName": nodeName}).String()})
if err != nil {
return nil, err
}
return podList.Items, err
}
// GetNodeScheduler Scheduler status
func (n *nodesHandle) GetNodeScheduler(ctx context.Context, nodeName string) (status bool, err error) {
node, err := n.clientset.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
if err != nil {
logrus.Error("get node scheduler status error:", err)
return false, err
}
return node.Spec.Unschedulable, err
}
// evictPod -
func (n *nodesHandle) evictPod(pod v1.Pod, policyGroupVersion string) error {
deleteOptions := &metav1.DeleteOptions{}
if k8sutil.GetKubeVersion().AtLeast(utilversion.MustParseSemantic("v1.21.0")) {
eviction := &policyv1.Eviction{
TypeMeta: metav1.TypeMeta{
APIVersion: policyGroupVersion,
Kind: EvictionKind,
},
ObjectMeta: metav1.ObjectMeta{
Name: pod.Name,
Namespace: pod.Namespace,
},
DeleteOptions: deleteOptions,
}
// Remember to change change the URL manipulation func when Evction's version change
return n.clientset.PolicyV1().Evictions(eviction.Namespace).Evict(context.Background(), eviction)
}
eviction := &policyv1beta1.Eviction{
TypeMeta: metav1.TypeMeta{
APIVersion: policyGroupVersion,
Kind: EvictionKind,
},
ObjectMeta: metav1.ObjectMeta{
Name: pod.Name,
Namespace: pod.Namespace,
},
DeleteOptions: deleteOptions,
}
// Remember to change change the URL manipulation func when Evction's version change
return n.clientset.PolicyV1beta1().Evictions(eviction.Namespace).Evict(context.Background(), eviction)
}
// GetLabels -
func (n *nodesHandle) ListLabels(ctx context.Context, nodeName string) (map[string]string, error) {
node, err := n.clientset.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
if err != nil {
logrus.Error("[GetLabels] get node error:", err)
return nil, err
}
return node.Labels, err
}
// UpdateLabels -
func (n *nodesHandle) UpdateLabels(ctx context.Context, nodeName string, labels map[string]string) (map[string]string, error) {
node, err := n.clientset.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
if err != nil {
logrus.Error("[UpdateLabels] update node labels error:", err)
return nil, err
}
node.Labels = labels
res, err := n.clientset.CoreV1().Nodes().Update(ctx, node, metav1.UpdateOptions{})
if err != nil {
logrus.Error("[UpdateLabels] update node labels error:", err)
return nil, err
}
return res.Labels, nil
}
// GetTaint -
func (n *nodesHandle) ListTaints(ctx context.Context, nodeName string) ([]v1.Taint, error) {
node, err := n.clientset.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
if err != nil {
logrus.Error("[GetTaint] get node error:", err)
return nil, err
}
return node.Spec.Taints, err
}
// UpdateTaint -
func (n *nodesHandle) UpdateTaints(ctx context.Context, nodeName string, taints []v1.Taint) ([]v1.Taint, error) {
taintsByte, err := ffjson.Marshal(taints)
if err != nil {
return nil, err
}
data := fmt.Sprintf(`{"spec":{"taints":%s}}`, string(taintsByte))
node, err := n.clientset.CoreV1().Nodes().Patch(ctx, nodeName, types.StrategicMergePatchType, []byte(data), metav1.PatchOptions{})
if err != nil {
logrus.Error("[UpdateTaints] update node taints error:", err)
return nil, err
}
return node.Spec.Taints, err
}