Merge pull request #792 from GLYASAI/clusterinfo

get cluster info in rbd-api
Commit 283a5ebee3 by barnettZQG, 2020-07-15 04:08:06 -05:00 (committed via GitHub)
11 changed files with 299 additions and 9 deletions

View File

@@ -22,6 +22,11 @@ import (
"net/http"
)
// ClusterInterface -
type ClusterInterface interface {
GetClusterInfo(w http.ResponseWriter, r *http.Request)
}
//TenantInterface interface
type TenantInterface interface {
TenantInterfaceWithV1

View File

@@ -39,6 +39,7 @@ func (v2 *V2) Routes() chi.Router {
r.Get("/show", controller.GetManager().Show)
r.Post("/show", controller.GetManager().Show)
r.Mount("/tenants", v2.tenantRouter())
r.Mount("/cluster", v2.clusterRouter())
r.Mount("/notificationEvent", v2.notificationEventRouter())
r.Mount("/resources", v2.resourcesRouter())
r.Mount("/prometheus", v2.prometheusRouter())
@@ -78,6 +79,12 @@ func (v2 *V2) eventsRouter() chi.Router {
return r
}
func (v2 *V2) clusterRouter() chi.Router {
r := chi.NewRouter()
r.Get("/", controller.GetManager().GetClusterInfo)
return r
}
func (v2 *V2) tenantRouter() chi.Router {
r := chi.NewRouter()
r.Post("/", controller.GetManager().Tenants)
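
With the cluster router mounted above, the new endpoint is exposed as GET /v2/cluster. A minimal client sketch for exercising it; the host, port, and absence of auth headers are assumptions for illustration, not part of this commit:

package main

import (
	"fmt"
	"io/ioutil"
	"net/http"
)

func main() {
	// Query the cluster info endpoint added by this PR (address is assumed).
	resp, err := http.Get("http://127.0.0.1:8888/v2/cluster")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, _ := ioutil.ReadAll(resp.Body)
	fmt.Println(resp.StatusCode, string(body))
}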

View File

@@ -16,16 +16,30 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package option

import (
	"testing"
)

func TestConfig_GetEtcdClientArgs(t *testing.T) {
	c := Config{EtcdEndpoint: []string{"192.168.2.203:2379"}, EtcdCaFile: "string", EtcdCertFile: "", EtcdKeyFile: ""}
	a := NewAPIServer()
	a.Config = c
	a.SetEtcdClientArgs()
	t.Logf("%+v", a.EtcdClientArgs)
}

package controller

import (
	"net/http"

	"github.com/Sirupsen/logrus"
	"github.com/goodrain/rainbond/api/handler"
	httputil "github.com/goodrain/rainbond/util/http"
)

// ClusterController -
type ClusterController struct {
}

// GetClusterInfo -
func (t *ClusterController) GetClusterInfo(w http.ResponseWriter, r *http.Request) {
	logrus.Debugf("start getting cluster info")
	nodes, err := handler.GetClusterHandler().GetClusterInfo()
	if err != nil {
		logrus.Errorf("get cluster info: %v", err)
		httputil.ReturnError(r, w, 500, err.Error())
		return
	}
	httputil.ReturnSuccess(r, w, nodes)
}

View File

@@ -36,6 +36,7 @@ type V2Manager interface {
Health(w http.ResponseWriter, r *http.Request)
AlertManagerWebHook(w http.ResponseWriter, r *http.Request)
Version(w http.ResponseWriter, r *http.Request)
api.ClusterInterface
api.TenantInterface
api.ServiceInterface
api.LogInterface

View File

@@ -48,6 +48,7 @@ import (
//V2Routes v2Routes
type V2Routes struct {
ClusterController
TenantStruct
EventLogStruct
AppStruct

api/handler/cluster.go (new file, 186 lines)
View File

@@ -0,0 +1,186 @@
package handler

import (
	"fmt"
	"runtime"

	"github.com/Sirupsen/logrus"
	"github.com/goodrain/rainbond/api/model"
	"github.com/shirou/gopsutil/disk"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/client-go/kubernetes"
)

// ClusterHandler reads cluster-level resource information from Kubernetes.
type ClusterHandler interface {
	GetClusterInfo() (*model.ClusterResource, error)
}

// NewClusterHandler creates a ClusterHandler backed by the given clientset.
func NewClusterHandler(clientset *kubernetes.Clientset) ClusterHandler {
	return &clusterAction{
		clientset: clientset,
	}
}

type clusterAction struct {
	clientset *kubernetes.Clientset
}

func (c *clusterAction) GetClusterInfo() (*model.ClusterResource, error) {
	nodes, err := c.listNodes()
	if err != nil {
		return nil, fmt.Errorf("[GetClusterInfo] list nodes: %v", err)
	}

	// Sum allocatable capacity, split between ready and not-ready nodes,
	// and collect the ready, schedulable nodes for the per-node pass below.
	var healthCapCPU, healthCapMem, unhealthCapCPU, unhealthCapMem int64
	usedNodeList := make([]*corev1.Node, 0, len(nodes))
	for i := range nodes {
		node := nodes[i]
		if !isNodeReady(node) {
			logrus.Debugf("[GetClusterInfo] node(%s) not ready", node.GetName())
			unhealthCapCPU += node.Status.Allocatable.Cpu().Value()
			unhealthCapMem += node.Status.Allocatable.Memory().Value()
			continue
		}

		healthCapCPU += node.Status.Allocatable.Cpu().Value()
		healthCapMem += node.Status.Allocatable.Memory().Value()
		if !node.Spec.Unschedulable {
			usedNodeList = append(usedNodeList, node)
		}
	}

	// For every schedulable node, subtract the requests of its pods from the
	// node's allocatable resources and sum the requests cluster-wide.
	var healthcpuR, healthmemR, unhealthCPUR, unhealthMemR int64
	nodeAllocatableResourceList := make(map[string]*model.NodeResource, len(usedNodeList))
	var maxAllocatableMemory *model.NodeResource
	for i := range usedNodeList {
		node := usedNodeList[i]
		pods, err := c.listPods(node.Name)
		if err != nil {
			return nil, fmt.Errorf("list pods: %v", err)
		}

		nodeAllocatableResource := model.NewResource(node.Status.Allocatable)
		for _, pod := range pods {
			nodeAllocatableResource.AllowedPodNumber--
			for _, container := range pod.Spec.Containers {
				nodeAllocatableResource.Memory -= container.Resources.Requests.Memory().Value()
				nodeAllocatableResource.MilliCPU -= container.Resources.Requests.Cpu().MilliValue()
				nodeAllocatableResource.EphemeralStorage -= container.Resources.Requests.StorageEphemeral().Value()
				if isNodeReady(node) {
					healthcpuR += container.Resources.Requests.Cpu().MilliValue()
					healthmemR += container.Resources.Requests.Memory().Value()
				} else {
					unhealthCPUR += container.Resources.Requests.Cpu().MilliValue()
					unhealthMemR += container.Resources.Requests.Memory().Value()
				}
			}
		}
		nodeAllocatableResourceList[node.Name] = nodeAllocatableResource

		// Keep the node resource with the maximum remaining schedulable memory.
		if maxAllocatableMemory == nil || nodeAllocatableResource.Memory > maxAllocatableMemory.Memory {
			maxAllocatableMemory = nodeAllocatableResource
		}
	}

	var diskStatus *disk.UsageStat
	if runtime.GOOS != "windows" {
		diskStatus, _ = disk.Usage("/grdata")
	} else {
		diskStatus, _ = disk.Usage(`z:\\`)
	}
	var diskCap, reqDisk uint64
	if diskStatus != nil {
		diskCap = diskStatus.Total
		reqDisk = diskStatus.Used
	}

	result := &model.ClusterResource{
		CapCPU:                           int(healthCapCPU + unhealthCapCPU),
		CapMem:                           int(healthCapMem+unhealthCapMem) / 1024 / 1024,
		HealthCapCPU:                     int(healthCapCPU),
		HealthCapMem:                     int(healthCapMem) / 1024 / 1024,
		UnhealthCapCPU:                   int(unhealthCapCPU),
		UnhealthCapMem:                   int(unhealthCapMem) / 1024 / 1024,
		ReqCPU:                           float32(healthcpuR+unhealthCPUR) / 1000,
		ReqMem:                           int(healthmemR+unhealthMemR) / 1024 / 1024,
		HealthReqCPU:                     float32(healthcpuR) / 1000,
		HealthReqMem:                     int(healthmemR) / 1024 / 1024,
		UnhealthReqCPU:                   float32(unhealthCPUR) / 1000,
		UnhealthReqMem:                   int(unhealthMemR) / 1024 / 1024,
		ComputeNode:                      len(nodes),
		CapDisk:                          diskCap,
		ReqDisk:                          reqDisk,
		MaxAllocatableMemoryNodeResource: maxAllocatableMemory,
	}

	result.AllNode = len(nodes)
	for _, node := range nodes {
		if !isNodeReady(node) {
			result.NotReadyNode++
		}
	}
	return result, nil
}

func (c *clusterAction) listNodes() ([]*corev1.Node, error) {
	opts := metav1.ListOptions{}
	nodeList, err := c.clientset.CoreV1().Nodes().List(opts)
	if err != nil {
		return nil, err
	}

	var nodes []*corev1.Node
	for idx := range nodeList.Items {
		node := &nodeList.Items[idx]
		// skip nodes that carry a NoSchedule taint
		if containsTaints(node) {
			logrus.Debugf("[GetClusterInfo] node(%s) contains NoSchedule taints", node.GetName())
			continue
		}
		nodes = append(nodes, node)
	}
	return nodes, nil
}

func isNodeReady(node *corev1.Node) bool {
	for _, cond := range node.Status.Conditions {
		if cond.Type == corev1.NodeReady && cond.Status == corev1.ConditionTrue {
			return true
		}
	}
	return false
}

func containsTaints(node *corev1.Node) bool {
	for _, taint := range node.Spec.Taints {
		if taint.Effect == corev1.TaintEffectNoSchedule {
			return true
		}
	}
	return false
}

func (c *clusterAction) listPods(nodeName string) (pods []corev1.Pod, err error) {
	podList, err := c.clientset.CoreV1().Pods(metav1.NamespaceAll).List(metav1.ListOptions{
		FieldSelector: fields.SelectorFromSet(fields.Set{"spec.nodeName": nodeName}).String()})
	if err != nil {
		return pods, err
	}
	return podList.Items, nil
}
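
The conversions above define the units the endpoint reports: memory values are divided by 1024 twice (bytes to MiB) and requested CPU is divided by 1000 (millicores to cores). A self-contained sketch of that arithmetic, with made-up input values:

package main

import "fmt"

func main() {
	allocatableMemBytes := int64(16 * 1024 * 1024 * 1024) // 16 GiB reported by the kubelet, in bytes
	requestedCPUMilli := int64(2500)                      // 2.5 cores requested across pods, in millicores

	capMemMiB := int(allocatableMemBytes) / 1024 / 1024 // 16384 MiB, the form stored in CapMem/HealthCapMem
	reqCPUCores := float32(requestedCPUMilli) / 1000    // 2.5 cores, the form stored in ReqCPU/HealthReqCPU

	fmt.Println(capMemMiB, reqCPUCores)
}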

View File

@@ -68,6 +68,7 @@ func InitHandle(conf option.Config,
batchOperationHandler = CreateBatchOperationHandler(mqClient, operationHandler)
defaultAppRestoreHandler = NewAppRestoreHandler()
defPodHandler = NewPodHandler(statusCli)
defClusterHandler = NewClusterHandler(kubeClient)
defaultVolumeTypeHandler = CreateVolumeTypeManger(statusCli)
defaultEtcdHandler = NewEtcdHandler(etcdcli)
@@ -191,3 +192,10 @@ var defaultEtcdHandler *EtcdHandler
func GetEtcdHandler() *EtcdHandler {
return defaultEtcdHandler
}
var defClusterHandler ClusterHandler
// GetClusterHandler returns the default cluster handler.
func GetClusterHandler() ClusterHandler {
return defClusterHandler
}

View File

@@ -152,7 +152,7 @@ func Proxy(next http.Handler) http.Handler {
handler.GetNodeProxy().Proxy(w, r)
return
}
if strings.HasPrefix(r.RequestURI, "/v2/cluster") {
if strings.HasPrefix(r.RequestURI, "/v2/cluster/service-health") {
handler.GetNodeProxy().Proxy(w, r)
return
}

api/model/cluster.go (new file, 66 lines)
View File

@@ -0,0 +1,66 @@
package model

import (
	corev1 "k8s.io/api/core/v1"
)

// ClusterResource -
type ClusterResource struct {
	AllNode                          int           `json:"all_node"`
	NotReadyNode                     int           `json:"notready_node"`
	ComputeNode                      int           `json:"compute_node"`
	Tenant                           int           `json:"tenant"`
	CapCPU                           int           `json:"cap_cpu"`
	CapMem                           int           `json:"cap_mem"`
	HealthCapCPU                     int           `json:"health_cap_cpu"`
	HealthCapMem                     int           `json:"health_cap_mem"`
	UnhealthCapCPU                   int           `json:"unhealth_cap_cpu"`
	UnhealthCapMem                   int           `json:"unhealth_cap_mem"`
	ReqCPU                           float32       `json:"req_cpu"`
	ReqMem                           int           `json:"req_mem"`
	HealthReqCPU                     float32       `json:"health_req_cpu"`
	HealthReqMem                     int           `json:"health_req_mem"`
	UnhealthReqCPU                   float32       `json:"unhealth_req_cpu"`
	UnhealthReqMem                   int           `json:"unhealth_req_mem"`
	CapDisk                          uint64        `json:"cap_disk"`
	ReqDisk                          uint64        `json:"req_disk"`
	MaxAllocatableMemoryNodeResource *NodeResource `json:"max_allocatable_memory_node_resource"`
}

// NodeResource is a collection of compute resource.
type NodeResource struct {
	MilliCPU         int64 `json:"milli_cpu"`
	Memory           int64 `json:"memory"`
	NvidiaGPU        int64 `json:"nvidia_gpu"`
	EphemeralStorage int64 `json:"ephemeral_storage"`
	// We store allowedPodNumber (which is Node.Status.Allocatable.Pods().Value())
	// explicitly as int, to avoid conversions and improve performance.
	AllowedPodNumber int `json:"allowed_pod_number"`
}

// NewResource creates a Resource from ResourceList
func NewResource(rl corev1.ResourceList) *NodeResource {
	r := &NodeResource{}
	r.Add(rl)
	return r
}

// Add adds ResourceList into Resource.
func (r *NodeResource) Add(rl corev1.ResourceList) {
	if r == nil {
		return
	}
	for rName, rQuant := range rl {
		switch rName {
		case corev1.ResourceCPU:
			r.MilliCPU += rQuant.MilliValue()
		case corev1.ResourceMemory:
			r.Memory += rQuant.Value()
		case corev1.ResourcePods:
			r.AllowedPodNumber += int(rQuant.Value())
		case corev1.ResourceEphemeralStorage:
			r.EphemeralStorage += rQuant.Value()
		}
	}
}
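
A minimal usage sketch for NewResource and Add, assuming only the packages imported above plus k8s.io/apimachinery/pkg/api/resource; the quantities are invented for illustration:

package main

import (
	"fmt"

	"github.com/goodrain/rainbond/api/model"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	// Build a ResourceList shaped like node.Status.Allocatable.
	rl := corev1.ResourceList{
		corev1.ResourceCPU:    resource.MustParse("4"),
		corev1.ResourceMemory: resource.MustParse("8Gi"),
		corev1.ResourcePods:   resource.MustParse("110"),
	}

	nr := model.NewResource(rl)
	// Prints MilliCPU:4000 Memory:8589934592 (bytes) AllowedPodNumber:110.
	fmt.Printf("%+v\n", nr)
}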

mock.sh (new executable file, 1 line)
View File

@@ -0,0 +1 @@
mockgen -source=worker/appm/store/store.go -destination worker/appm/store/mock_store.go -package store

View File

@@ -339,6 +339,7 @@ func ClusterInfo(w http.ResponseWriter, r *http.Request) {
usedNodeList = append(usedNodeList, nodes[i])
}
}
var healthcpuR int64
var healthmemR int64
var unhealthCPUR int64