// Copyright (C) 2014-2018 Goodrain Co., Ltd. // RAINBOND, Application Management Platform // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. For any non-GPL usage of Rainbond, // one or multiple Commercial Licenses authorized by Goodrain Co., Ltd. // must be obtained first. // This program is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with this program. If not, see . package handler import ( "context" "encoding/json" "fmt" "github.com/goodrain/rainbond/util/constants" "io" "net/http" "os" "strconv" "strings" "time" "github.com/coreos/etcd/clientv3" "github.com/goodrain/rainbond/api/client/prometheus" api_model "github.com/goodrain/rainbond/api/model" "github.com/goodrain/rainbond/api/util" "github.com/goodrain/rainbond/api/util/bcode" "github.com/goodrain/rainbond/api/util/license" "github.com/goodrain/rainbond/builder/parser" "github.com/goodrain/rainbond/cmd/api/option" "github.com/goodrain/rainbond/db" dberr "github.com/goodrain/rainbond/db/errors" dbmodel "github.com/goodrain/rainbond/db/model" "github.com/goodrain/rainbond/event" gclient "github.com/goodrain/rainbond/mq/client" "github.com/goodrain/rainbond/pkg/generated/clientset/versioned" core_util "github.com/goodrain/rainbond/util" "github.com/goodrain/rainbond/worker/client" "github.com/goodrain/rainbond/worker/discover/model" "github.com/goodrain/rainbond/worker/server" "github.com/goodrain/rainbond/worker/server/pb" "github.com/jinzhu/gorm" "github.com/pkg/errors" "github.com/pquerna/ffjson/ffjson" "github.com/sirupsen/logrus" "github.com/twinj/uuid" corev1 "k8s.io/api/core/v1" k8sErrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apiserver/pkg/util/flushwriter" "k8s.io/client-go/kubernetes" ) // ErrServiceNotClosed - var ErrServiceNotClosed = errors.New("Service has not been closed") //ServiceAction service act type ServiceAction struct { MQClient gclient.MQClient EtcdCli *clientv3.Client statusCli *client.AppRuntimeSyncClient prometheusCli prometheus.Interface conf option.Config rainbondClient versioned.Interface kubeClient kubernetes.Interface } type dCfg struct { Type string `json:"type"` Servers []string `json:"servers"` Key string `json:"key"` Username string `json:"username"` Password string `json:"password"` } //CreateManager create Manger func CreateManager(conf option.Config, mqClient gclient.MQClient, etcdCli *clientv3.Client, statusCli *client.AppRuntimeSyncClient, prometheusCli prometheus.Interface, rainbondClient versioned.Interface, kubeClient kubernetes.Interface) *ServiceAction { return &ServiceAction{ MQClient: mqClient, EtcdCli: etcdCli, statusCli: statusCli, conf: conf, prometheusCli: prometheusCli, rainbondClient: rainbondClient, kubeClient: kubeClient, } } //ServiceBuild service build func (s *ServiceAction) ServiceBuild(tenantID, serviceID string, r *api_model.BuildServiceStruct) error { eventID := r.Body.EventID logger := event.GetManager().GetLogger(eventID) defer event.CloseManager() service, err := db.GetManager().TenantServiceDao().GetServiceByID(serviceID) db.GetManager().TenantServiceDao().UpdateModel(service) if err != nil { return err } if r.Body.Kind == "" { r.Body.Kind = "source" } switch r.Body.Kind { case "build_from_image": if err := s.buildFromImage(r, service); err != nil { logger.Error("The image build application task failed to send: "+err.Error(), map[string]string{"step": "callback", "status": "failure"}) return err } logger.Info("The mirror build application task successed to send ", map[string]string{"step": "image-service", "status": "starting"}) return nil case "build_from_source_code": if err := s.buildFromSourceCode(r, service); err != nil { logger.Error("The source code build application task failed to send "+err.Error(), map[string]string{"step": "callback", "status": "failure"}) return err } logger.Info("The source code build application task successed to send ", map[string]string{"step": "source-service", "status": "starting"}) return nil case "build_from_market_image": if err := s.buildFromImage(r, service); err != nil { logger.Error("The cloud image build application task failed to send "+err.Error(), map[string]string{"step": "callback", "status": "failure"}) return err } logger.Info("The cloud image build application task successed to send ", map[string]string{"step": "image-service", "status": "starting"}) return nil case "build_from_market_slug": if err := s.buildFromMarketSlug(r, service); err != nil { logger.Error("The cloud slug build application task failed to send "+err.Error(), map[string]string{"step": "callback", "status": "failure"}) return err } logger.Info("The cloud slug build application task successed to send ", map[string]string{"step": "image-service", "status": "starting"}) return nil default: return fmt.Errorf("unexpect kind") } } func (s *ServiceAction) buildFromMarketSlug(r *api_model.BuildServiceStruct, service *dbmodel.TenantServices) error { body := make(map[string]interface{}) if r.Body.Operator == "" { body["operator"] = "define" } else { body["operator"] = r.Body.Operator } body["deploy_version"] = r.Body.DeployVersion body["event_id"] = r.Body.EventID body["action"] = r.Body.Action body["tenant_name"] = r.Body.TenantName body["tenant_id"] = service.TenantID body["service_id"] = service.ServiceID body["service_alias"] = r.Body.ServiceAlias body["slug_info"] = r.Body.SlugInfo topic := gclient.BuilderTopic if s.isWindowsService(service.ServiceID) { topic = gclient.WindowsBuilderTopic } return s.MQClient.SendBuilderTopic(gclient.TaskStruct{ Topic: topic, TaskType: "build_from_market_slug", TaskBody: body, }) } func (s *ServiceAction) buildFromImage(r *api_model.BuildServiceStruct, service *dbmodel.TenantServices) error { dependIds, err := db.GetManager().TenantServiceRelationDao().GetTenantServiceRelations(service.ServiceID) if err != nil { return err } body := make(map[string]interface{}) if r.Body.Operator == "" { body["operator"] = "define" } else { body["operator"] = r.Body.Operator } body["image"] = r.Body.ImageURL body["service_id"] = service.ServiceID body["deploy_version"] = r.Body.DeployVersion body["namespace"] = service.Namespace body["operator"] = r.Body.Operator body["event_id"] = r.Body.EventID body["tenant_name"] = r.Body.TenantName body["service_alias"] = r.Body.ServiceAlias body["action"] = r.Body.Action body["dep_sids"] = dependIds body["code_from"] = "image_manual" if r.Body.User != "" && r.Body.Password != "" { body["user"] = r.Body.User body["password"] = r.Body.Password } topic := gclient.BuilderTopic if s.isWindowsService(service.ServiceID) { topic = gclient.WindowsBuilderTopic } return s.MQClient.SendBuilderTopic(gclient.TaskStruct{ Topic: topic, TaskType: "build_from_image", TaskBody: body, }) } func (s *ServiceAction) buildFromSourceCode(r *api_model.BuildServiceStruct, service *dbmodel.TenantServices) error { logrus.Debugf("build_from_source_code") if r.Body.RepoURL == "" || r.Body.Branch == "" || r.Body.DeployVersion == "" || r.Body.EventID == "" { return fmt.Errorf("args error") } body := make(map[string]interface{}) if r.Body.Operator == "" { body["operator"] = "define" } else { body["operator"] = r.Body.Operator } body["tenant_id"] = service.TenantID body["service_id"] = service.ServiceID body["repo_url"] = r.Body.RepoURL body["action"] = r.Body.Action body["lang"] = r.Body.Lang body["runtime"] = r.Body.Runtime body["deploy_version"] = r.Body.DeployVersion body["event_id"] = r.Body.EventID body["envs"] = r.Body.ENVS body["tenant_name"] = r.Body.TenantName body["branch"] = r.Body.Branch body["server_type"] = r.Body.ServerType body["service_alias"] = r.Body.ServiceAlias if r.Body.User != "" && r.Body.Password != "" { body["user"] = r.Body.User body["password"] = r.Body.Password } body["expire"] = 180 topic := gclient.BuilderTopic if s.isWindowsService(service.ServiceID) { topic = gclient.WindowsBuilderTopic } return s.MQClient.SendBuilderTopic(gclient.TaskStruct{ Topic: topic, TaskType: "build_from_source_code", TaskBody: body, }) } func (s *ServiceAction) isWindowsService(serviceID string) bool { label, err := db.GetManager().TenantServiceLabelDao().GetLabelByNodeSelectorKey(serviceID, "windows") if label == nil || err != nil { return false } return true } //AddLabel add labels func (s *ServiceAction) AddLabel(l *api_model.LabelsStruct, serviceID string) error { tx := db.GetManager().Begin() defer func() { if r := recover(); r != nil { logrus.Errorf("Unexpected panic occurred, rollback transaction: %v", r) tx.Rollback() } }() //V5.2: do not support service type label for _, label := range l.Labels { labelModel := dbmodel.TenantServiceLable{ ServiceID: serviceID, LabelKey: label.LabelKey, LabelValue: label.LabelValue, } if err := db.GetManager().TenantServiceLabelDaoTransactions(tx).AddModel(&labelModel); err != nil { tx.Rollback() return err } } if err := tx.Commit().Error; err != nil { tx.Rollback() return err } return nil } //UpdateLabel updates labels func (s *ServiceAction) UpdateLabel(l *api_model.LabelsStruct, serviceID string) error { tx := db.GetManager().Begin() defer func() { if r := recover(); r != nil { logrus.Errorf("Unexpected panic occurred, rollback transaction: %v", r) tx.Rollback() } }() for _, label := range l.Labels { // delete old labels err := db.GetManager().TenantServiceLabelDaoTransactions(tx). DelTenantServiceLabelsByServiceIDKey(serviceID, label.LabelKey) if err != nil { logrus.Errorf("error deleting old labels: %v", err) tx.Rollback() return err } // V5.2 do not support service type label // add new labels labelModel := dbmodel.TenantServiceLable{ ServiceID: serviceID, LabelKey: label.LabelKey, LabelValue: label.LabelValue, } if err := db.GetManager().TenantServiceLabelDaoTransactions(tx).AddModel(&labelModel); err != nil { logrus.Errorf("error adding new labels: %v", err) tx.Rollback() return err } } if err := tx.Commit().Error; err != nil { tx.Rollback() return err } return nil } //DeleteLabel deletes label func (s *ServiceAction) DeleteLabel(l *api_model.LabelsStruct, serviceID string) error { tx := db.GetManager().Begin() defer func() { if r := recover(); r != nil { logrus.Errorf("Unexpected panic occurred, rollback transaction: %v", r) tx.Rollback() } }() for _, label := range l.Labels { err := db.GetManager().TenantServiceLabelDaoTransactions(tx). DelTenantServiceLabelsByServiceIDKeyValue(serviceID, label.LabelKey, label.LabelValue) if err != nil { logrus.Errorf("error deleting label: %v", err) tx.Rollback() return err } } if err := tx.Commit().Error; err != nil { tx.Rollback() return err } return nil } //StartStopService start service func (s *ServiceAction) StartStopService(sss *api_model.StartStopStruct) error { services, err := db.GetManager().TenantServiceDao().GetServiceByID(sss.ServiceID) if err != nil { logrus.Errorf("get service by id error, %v", err) return err } TaskBody := model.StopTaskBody{ TenantID: sss.TenantID, ServiceID: sss.ServiceID, DeployVersion: services.DeployVersion, EventID: sss.EventID, } err = s.MQClient.SendBuilderTopic(gclient.TaskStruct{ TaskType: sss.TaskType, TaskBody: TaskBody, Topic: gclient.WorkerTopic, }) if err != nil { logrus.Errorf("equque mq error, %v", err) return err } logrus.Debugf("equeue mq startstop task success") return nil } //ServiceVertical vertical service func (s *ServiceAction) ServiceVertical(ctx context.Context, vs *model.VerticalScalingTaskBody) error { service, err := db.GetManager().TenantServiceDao().GetServiceByID(vs.ServiceID) if err != nil { logrus.Errorf("get service by id %s error, %s", vs.ServiceID, err) db.GetManager().ServiceEventDao().SetEventStatus(ctx, dbmodel.EventStatusFailure) return err } oldMemory := service.ContainerMemory oldCPU := service.ContainerCPU oldGPU := service.ContainerGPU var rollback = func() { service.ContainerMemory = oldMemory service.ContainerCPU = oldCPU service.ContainerGPU = oldGPU _ = db.GetManager().TenantServiceDao().UpdateModel(service) } if vs.ContainerCPU != nil { service.ContainerCPU = *vs.ContainerCPU } if vs.ContainerMemory != nil { service.ContainerMemory = *vs.ContainerMemory } if vs.ContainerGPU != nil { service.ContainerGPU = *vs.ContainerGPU } licenseInfo := license.ReadLicense() if licenseInfo == nil || !licenseInfo.HaveFeature("GPU") { service.ContainerGPU = 0 } if service.ContainerMemory == oldMemory && service.ContainerCPU == oldCPU && service.ContainerGPU == oldGPU { db.GetManager().ServiceEventDao().SetEventStatus(ctx, dbmodel.EventStatusSuccess) return nil } err = db.GetManager().TenantServiceDao().UpdateModel(service) if err != nil { db.GetManager().ServiceEventDao().SetEventStatus(ctx, dbmodel.EventStatusFailure) logrus.Errorf("update service memory and cpu failure. %v", err) return fmt.Errorf("vertical service faliure:%s", err.Error()) } err = s.MQClient.SendBuilderTopic(gclient.TaskStruct{ TaskType: "vertical_scaling", TaskBody: vs, Topic: gclient.WorkerTopic, }) if err != nil { // roll back service rollback() logrus.Errorf("equque mq error, %v", err) db.GetManager().ServiceEventDao().SetEventStatus(ctx, dbmodel.EventStatusFailure) return err } logrus.Debugf("equeue mq vertical task success") return nil } //ServiceHorizontal Service Horizontal func (s *ServiceAction) ServiceHorizontal(hs *model.HorizontalScalingTaskBody) error { service, err := db.GetManager().TenantServiceDao().GetServiceByID(hs.ServiceID) if err != nil { logrus.Errorf("get service by id %s error, %s", hs.ServiceID, err) return err } // for rollback database oldReplicas := service.Replicas pods, err := s.statusCli.GetServicePods(service.ServiceID) if err != nil { logrus.Errorf("get service pods error: %v", err) return fmt.Errorf("horizontal service faliure:%s", err.Error()) } if int32(len(pods.NewPods)) == hs.Replicas { return bcode.ErrHorizontalDueToNoChange } service.Replicas = int(hs.Replicas) err = db.GetManager().TenantServiceDao().UpdateModel(service) if err != nil { logrus.Errorf("updtae service replicas failure. %v", err) return fmt.Errorf("horizontal service faliure:%s", err.Error()) } var rollback = func() { service.Replicas = oldReplicas _ = db.GetManager().TenantServiceDao().UpdateModel(service) } err = s.MQClient.SendBuilderTopic(gclient.TaskStruct{ TaskType: "horizontal_scaling", TaskBody: hs, Topic: gclient.WorkerTopic, }) if err != nil { // roll back service rollback() logrus.Errorf("equque mq error, %v", err) return err } // if send task success, return nil logrus.Debugf("enqueue mq horizontal task success") return nil } //ServiceUpgrade service upgrade func (s *ServiceAction) ServiceUpgrade(ru *model.RollingUpgradeTaskBody) error { services, err := db.GetManager().TenantServiceDao().GetServiceByID(ru.ServiceID) if err != nil { logrus.Errorf("get service by id %s error %s", ru.ServiceID, err.Error()) return err } version, err := db.GetManager().VersionInfoDao().GetVersionByDeployVersion(ru.NewDeployVersion, ru.ServiceID) if err != nil { logrus.Errorf("get service version by id %s version %s error, %s", ru.ServiceID, ru.NewDeployVersion, err.Error()) return err } oldDeployVersion := services.DeployVersion var rollback = func() { services.DeployVersion = oldDeployVersion _ = db.GetManager().TenantServiceDao().UpdateModel(services) } if version.FinalStatus != "success" { logrus.Warnf("deploy version %s is not build success,can not change deploy version in this upgrade event", ru.NewDeployVersion) } else { services.DeployVersion = ru.NewDeployVersion err = db.GetManager().TenantServiceDao().UpdateModel(services) if err != nil { logrus.Errorf("update service deploy version error. %v", err) return fmt.Errorf("horizontal service faliure:%s", err.Error()) } } err = s.MQClient.SendBuilderTopic(gclient.TaskStruct{ TaskBody: ru, TaskType: "rolling_upgrade", Topic: gclient.WorkerTopic, }) if err != nil { // roll back service deploy version rollback() logrus.Errorf("equque upgrade message error, %v", err) return err } return nil } //ServiceCreate create service func (s *ServiceAction) ServiceCreate(sc *api_model.ServiceStruct) error { jsonSC, err := ffjson.Marshal(sc) if err != nil { logrus.Errorf("trans service struct to json failed. %v", err) return err } var ts dbmodel.TenantServices if err := ffjson.Unmarshal(jsonSC, &ts); err != nil { logrus.Errorf("trans json to tenant service error, %v", err) return err } if ts.ServiceName == "" { ts.ServiceName = ts.ServiceAlias } if ts.ContainerCPU < 0 { ts.ContainerCPU = 0 } if ts.ContainerMemory < 0 { ts.ContainerMemory = 0 } if ts.ContainerGPU < 0 { ts.ContainerGPU = 0 } if ts.K8sComponentName != "" { if db.GetManager().TenantServiceDao().IsK8sComponentNameDuplicate(ts.AppID, ts.ServiceID, ts.K8sComponentName) { return bcode.ErrK8sComponentNameExists } } ts.UpdateTime = time.Now() var ( ports = sc.PortsInfo envs = sc.EnvsInfo volumns = sc.VolumesInfo dependVolumes = sc.DepVolumesInfo dependIds = sc.DependIDs probes = sc.ComponentProbes monitors = sc.ComponentMonitors httpRules = sc.HTTPRules tcpRules = sc.TCPRules ) ts.AppID = sc.AppID ts.DeployVersion = "" tx := db.GetManager().Begin() defer func() { if r := recover(); r != nil { logrus.Errorf("Unexpected panic occurred, rollback transaction: %v", r) tx.Rollback() } }() //create app if err := db.GetManager().TenantServiceDaoTransactions(tx).AddModel(&ts); err != nil { logrus.Errorf("add service error, %v", err) tx.Rollback() return err } //set app envs if len(envs) > 0 { var batchEnvs []*dbmodel.TenantServiceEnvVar for _, env := range envs { env := env env.ServiceID = ts.ServiceID env.TenantID = ts.TenantID batchEnvs = append(batchEnvs, &env) } if err := db.GetManager().TenantServiceEnvVarDaoTransactions(tx).CreateOrUpdateEnvsInBatch(batchEnvs); err != nil { logrus.Errorf("batch add env error, %v", err) tx.Rollback() return err } } //set app port if len(ports) > 0 { var batchPorts []*dbmodel.TenantServicesPort for _, port := range ports { port := port port.ServiceID = ts.ServiceID port.TenantID = ts.TenantID batchPorts = append(batchPorts, &port) } if err := db.GetManager().TenantServicesPortDaoTransactions(tx).CreateOrUpdatePortsInBatch(batchPorts); err != nil { logrus.Errorf("batch add port error, %v", err) tx.Rollback() return err } } //set app volumns if len(volumns) > 0 { localPath := os.Getenv("LOCAL_DATA_PATH") sharePath := os.Getenv("SHARE_DATA_PATH") if localPath == "" { localPath = "/grlocaldata" } if sharePath == "" { sharePath = "/grdata" } for _, volumn := range volumns { v := dbmodel.TenantServiceVolume{ ServiceID: ts.ServiceID, Category: volumn.Category, VolumeType: volumn.VolumeType, VolumeName: volumn.VolumeName, HostPath: volumn.HostPath, VolumePath: volumn.VolumePath, IsReadOnly: volumn.IsReadOnly, VolumeCapacity: volumn.VolumeCapacity, // AccessMode 读写模式(Important! A volume can only be mounted using one access mode at a time, even if it supports many. For example, a GCEPersistentDisk can be mounted as ReadWriteOnce by a single node or ReadOnlyMany by many nodes, but not at the same time. #https://kubernetes.io/docs/concepts/storage/persistent-volumes/#access-modes) AccessMode: volumn.AccessMode, // SharePolicy 共享模式 SharePolicy: volumn.SharePolicy, // BackupPolicy 备份策略 BackupPolicy: volumn.BackupPolicy, // ReclaimPolicy 回收策略 ReclaimPolicy: volumn.ReclaimPolicy, // AllowExpansion 是否支持扩展 AllowExpansion: volumn.AllowExpansion, // VolumeProviderName 使用的存储驱动别名 VolumeProviderName: volumn.VolumeProviderName, } v.ServiceID = ts.ServiceID if volumn.VolumeType == "" { v.VolumeType = dbmodel.ShareFileVolumeType.String() } if volumn.HostPath == "" { //step 1 设置主机目录 switch volumn.VolumeType { //共享文件存储 case dbmodel.ShareFileVolumeType.String(): v.HostPath = fmt.Sprintf("%s/tenant/%s/service/%s%s", sharePath, sc.TenantID, ts.ServiceID, volumn.VolumePath) //本地文件存储 case dbmodel.LocalVolumeType.String(): if !dbmodel.ServiceType(sc.ExtendMethod).IsState() { tx.Rollback() return util.CreateAPIHandleError(400, fmt.Errorf("local volume type only support state component")) } v.HostPath = fmt.Sprintf("%s/tenant/%s/service/%s%s", localPath, sc.TenantID, ts.ServiceID, volumn.VolumePath) case dbmodel.ConfigFileVolumeType.String(), dbmodel.MemoryFSVolumeType.String(): logrus.Debug("simple volume type : ", volumn.VolumeType) default: if !dbmodel.ServiceType(sc.ExtendMethod).IsState() { tx.Rollback() return util.CreateAPIHandleError(400, fmt.Errorf("custom volume type only support state component")) } } } if volumn.VolumeName == "" { v.VolumeName = uuid.NewV4().String() } if err := db.GetManager().TenantServiceVolumeDaoTransactions(tx).AddModel(&v); err != nil { logrus.Errorf("add volumn %v error, %v", volumn.HostPath, err) tx.Rollback() return err } if volumn.FileContent != "" { cf := &dbmodel.TenantServiceConfigFile{ ServiceID: sc.ServiceID, VolumeName: volumn.VolumeName, FileContent: volumn.FileContent, } if err := db.GetManager().TenantServiceConfigFileDaoTransactions(tx).AddModel(cf); err != nil { tx.Rollback() return util.CreateAPIHandleErrorFromDBError("error creating config file", err) } } } } //set app dependVolumes if len(dependVolumes) > 0 { for _, depVolume := range dependVolumes { depVolume.ServiceID = ts.ServiceID depVolume.TenantID = ts.TenantID volume, err := db.GetManager().TenantServiceVolumeDao().GetVolumeByServiceIDAndName(depVolume.DependServiceID, depVolume.VolumeName) if err != nil { tx.Rollback() return fmt.Errorf("find volume %s error %s", depVolume.VolumeName, err.Error()) } depVolume.VolumeType = volume.VolumeType depVolume.HostPath = volume.HostPath if err := db.GetManager().TenantServiceMountRelationDaoTransactions(tx).AddModel(&depVolume); err != nil { tx.Rollback() return fmt.Errorf("add dep volume %s error %s", depVolume.VolumeName, err.Error()) } } } //set app depends if len(dependIds) > 0 { for _, id := range dependIds { if err := db.GetManager().TenantServiceRelationDaoTransactions(tx).AddModel(&id); err != nil { logrus.Errorf("add depend_id %v error, %v", id.DependServiceID, err) tx.Rollback() return err } } } //set app label if sc.OSType == "windows" { if err := db.GetManager().TenantServiceLabelDaoTransactions(tx).AddModel(&dbmodel.TenantServiceLable{ ServiceID: ts.ServiceID, LabelKey: dbmodel.LabelKeyNodeSelector, LabelValue: sc.OSType, }); err != nil { logrus.Errorf("add label %s=%s %v error, %v", dbmodel.LabelKeyNodeSelector, sc.OSType, ts.ServiceID, err) tx.Rollback() return err } } // sc.Endpoints can't be nil // sc.Endpoints.Discovery or sc.Endpoints.Static can't be nil if sc.Kind == dbmodel.ServiceKindThirdParty.String() { // TODO: validate request data if sc.Endpoints == nil { tx.Rollback() return fmt.Errorf("endpoints can not be empty for third-party service") } if sc.Endpoints.Kubernetes != nil { c := &dbmodel.ThirdPartySvcDiscoveryCfg{ ServiceID: sc.ServiceID, Type: string(dbmodel.DiscorveryTypeKubernetes), Namespace: sc.Endpoints.Kubernetes.Namespace, ServiceName: sc.Endpoints.Kubernetes.ServiceName, } if err := db.GetManager().ThirdPartySvcDiscoveryCfgDaoTransactions(tx). AddModel(c); err != nil { logrus.Errorf("error saving discover center configuration: %v", err) tx.Rollback() return err } } if sc.Endpoints.Static != nil { for _, o := range sc.Endpoints.Static { ep := &dbmodel.Endpoint{ ServiceID: sc.ServiceID, UUID: core_util.NewUUID(), } address := o port := 0 prefix := "" if strings.HasPrefix(address, "https://") { address = strings.Split(address, "https://")[1] prefix = "https://" } if strings.HasPrefix(address, "http://") { address = strings.Split(address, "http://")[1] prefix = "http://" } if strings.Contains(address, ":") { addressL := strings.Split(address, ":") address = addressL[0] port, _ = strconv.Atoi(addressL[1]) } ep.IP = prefix + address ep.Port = port logrus.Debugf("add new endpoint: %v", ep) if err := db.GetManager().EndpointsDaoTransactions(tx).AddModel(ep); err != nil { tx.Rollback() logrus.Errorf("error saving o endpoint: %v", err) return err } } } } if len(probes) > 0 { for _, pb := range probes { probe := s.convertProbeModel(&pb, ts.ServiceID) if err := db.GetManager().ServiceProbeDaoTransactions(tx).AddModel(probe); err != nil { logrus.Errorf("add probe %v error, %v", probe.ProbeID, err) tx.Rollback() return err } } } if len(monitors) > 0 { for _, m := range monitors { monitor := dbmodel.TenantServiceMonitor{ Name: m.Name, TenantID: ts.TenantID, ServiceID: ts.ServiceID, ServiceShowName: m.ServiceShowName, Port: m.Port, Path: m.Path, Interval: m.Interval, } if err := db.GetManager().TenantServiceMonitorDaoTransactions(tx).AddModel(&monitor); err != nil { logrus.Errorf("add monitor %v error, %v", monitor.Name, err) tx.Rollback() return err } } } if len(httpRules) > 0 { for _, httpRule := range httpRules { if err := GetGatewayHandler().CreateHTTPRule(tx, &httpRule); err != nil { logrus.Errorf("add service http rule error %v", err) tx.Rollback() return err } } } if len(tcpRules) > 0 { for _, tcpRule := range tcpRules { if GetGatewayHandler().TCPIPPortExists(tcpRule.IP, tcpRule.Port) { logrus.Debugf("tcp rule %v:%v exists", tcpRule.IP, tcpRule.Port) continue } if err := GetGatewayHandler().CreateTCPRule(tx, &tcpRule); err != nil { logrus.Errorf("add service tcp rule error %v", err) tx.Rollback() return err } } } labelModel := dbmodel.TenantServiceLable{ ServiceID: ts.ServiceID, LabelKey: dbmodel.LabelKeyServiceType, LabelValue: core_util.StatelessServiceType, } if ts.IsState() { labelModel.LabelValue = core_util.StatefulServiceType } if err := db.GetManager().TenantServiceLabelDaoTransactions(tx).AddModel(&labelModel); err != nil { tx.Rollback() return err } // TODO: create default probe for third-party service. if err := tx.Commit().Error; err != nil { tx.Rollback() return err } logrus.Debugf("create a new app %s success", ts.ServiceAlias) return nil } func (s *ServiceAction) convertProbeModel(req *api_model.ServiceProbe, serviceID string) *dbmodel.TenantServiceProbe { return &dbmodel.TenantServiceProbe{ ServiceID: serviceID, Cmd: req.Cmd, FailureThreshold: req.FailureThreshold, HTTPHeader: req.HTTPHeader, InitialDelaySecond: req.InitialDelaySecond, IsUsed: &req.IsUsed, Mode: req.Mode, Path: req.Path, PeriodSecond: req.PeriodSecond, Port: req.Port, ProbeID: req.ProbeID, Scheme: req.Scheme, SuccessThreshold: req.SuccessThreshold, TimeoutSecond: req.TimeoutSecond, FailureAction: req.FailureAction, } } //ServiceUpdate update service func (s *ServiceAction) ServiceUpdate(sc map[string]interface{}) error { ts, err := db.GetManager().TenantServiceDao().GetServiceByID(sc["service_id"].(string)) if err != nil { return err } if memory, ok := sc["container_memory"].(int); ok && memory >= 0 { ts.ContainerMemory = memory } if cpu, ok := sc["container_cpu"].(int); ok && cpu >= 0 { ts.ContainerCPU = cpu } if gpu, ok := sc["container_gpu"].(int); ok { ts.ContainerCPU = gpu } if name, ok := sc["service_name"].(string); ok && name != "" { ts.ServiceName = name } if appID, ok := sc["app_id"].(string); ok && appID != "" { ts.AppID = appID } if k8sComponentName, ok := sc["k8s_component_name"].(string); ok && k8sComponentName != "" { if db.GetManager().TenantServiceDao().IsK8sComponentNameDuplicate(ts.AppID, ts.ServiceID, k8sComponentName) { return bcode.ErrK8sComponentNameExists } ts.K8sComponentName = k8sComponentName } if sc["extend_method"] != nil { extendMethod := sc["extend_method"].(string) ts.ExtendMethod = extendMethod // if component replicas is more than 1, so can't change service type to singleton if ts.Replicas > 1 && ts.IsSingleton() { err := fmt.Errorf("service[%s] replicas > 1, can't change service typ to stateless_singleton", ts.ServiceAlias) return err } volumes, err := db.GetManager().TenantServiceVolumeDao().GetTenantServiceVolumesByServiceID(ts.ServiceID) if err != nil { return err } for _, vo := range volumes { if vo.VolumeType == dbmodel.ShareFileVolumeType.String() || vo.VolumeType == dbmodel.MemoryFSVolumeType.String() { continue } if vo.VolumeType == dbmodel.LocalVolumeType.String() && !ts.IsState() { err := fmt.Errorf("service[%s] has local volume type, can't change type to stateless", ts.ServiceAlias) return err } // if component use volume, what it accessMode is rwo, can't change volume type to stateless if vo.AccessMode == "RWO" && !ts.IsState() { err := fmt.Errorf("service[%s] volume[%s] access_mode is RWO, can't change type to stateless", ts.ServiceAlias, vo.VolumeName) return err } } ts.ExtendMethod = extendMethod ts.ServiceType = extendMethod } if js, ok := sc["job_strategy"].(string); ok { ts.JobStrategy = js } //update component if err := db.GetManager().TenantServiceDao().UpdateModel(ts); err != nil { logrus.Errorf("update service error, %v", err) return err } return nil } //LanguageSet language set func (s *ServiceAction) LanguageSet(langS *api_model.LanguageSet) error { logrus.Debugf("service id is %s, language is %s", langS.ServiceID, langS.Language) services, err := db.GetManager().TenantServiceDao().GetServiceByID(langS.ServiceID) if err != nil { logrus.Errorf("get service by id error, %v, %v", services, err) return err } if langS.Language == "java" { services.ContainerMemory = 512 if err := db.GetManager().TenantServiceDao().UpdateModel(services); err != nil { logrus.Errorf("update tenant service error %v", err) return err } } return nil } //GetService get service(s) func (s *ServiceAction) GetService(tenantID string) ([]*dbmodel.TenantServices, error) { services, err := db.GetManager().TenantServiceDao().GetServicesAllInfoByTenantID(tenantID) if err != nil { logrus.Errorf("get service by id error, %v, %v", services, err) return nil, err } var serviceIDs []string for _, s := range services { serviceIDs = append(serviceIDs, s.ServiceID) } status := s.statusCli.GetStatuss(strings.Join(serviceIDs, ",")) for _, s := range services { if status, ok := status[s.ServiceID]; ok { s.CurStatus = status } } return services, nil } //GetServicesByAppID get service(s) by appID func (s *ServiceAction) GetServicesByAppID(appID string, page, pageSize int) (*api_model.ListServiceResponse, error) { var resp api_model.ListServiceResponse services, total, err := db.GetManager().TenantServiceDao().GetServicesInfoByAppID(appID, page, pageSize) if err != nil { logrus.Errorf("get service by application id error, %v, %v", services, err) return nil, err } var serviceIDs []string for _, s := range services { serviceIDs = append(serviceIDs, s.ServiceID) } status := s.statusCli.GetStatuss(strings.Join(serviceIDs, ",")) for _, s := range services { if status, ok := status[s.ServiceID]; ok { s.CurStatus = status } } if services != nil { resp.Services = services } else { resp.Services = make([]*dbmodel.TenantServices, 0) } resp.Page = page resp.Total = total resp.PageSize = pageSize return &resp, nil } //GetPagedTenantRes get pagedTenantServiceRes(s) func (s *ServiceAction) GetPagedTenantRes(offset, len int) ([]*api_model.TenantResource, int, error) { allstatus := s.statusCli.GetAllStatus() var serviceIDs []string for k, v := range allstatus { if !s.statusCli.IsClosedStatus(v) { serviceIDs = append(serviceIDs, k) } } services, count, err := db.GetManager().TenantServiceDao().GetPagedTenantService(offset, len, serviceIDs) if err != nil { logrus.Errorf("get service by id error, %v, %v", services, err) return nil, count, err } var result []*api_model.TenantResource for _, v := range services { var res api_model.TenantResource res.UUID, _ = v["tenant"].(string) res.Name, _ = v["tenant_name"].(string) res.EID, _ = v["eid"].(string) res.AllocatedCPU, _ = v["capcpu"].(int) res.AllocatedMEM, _ = v["capmem"].(int) res.UsedCPU, _ = v["usecpu"].(int) res.UsedMEM, _ = v["usemem"].(int) result = append(result, &res) } return result, count, nil } //GetTenantRes get pagedTenantServiceRes(s) func (s *ServiceAction) GetTenantRes(uuid string) (*api_model.TenantResource, error) { if logrus.IsLevelEnabled(logrus.DebugLevel) { defer core_util.Elapsed("[ServiceAction] get tenant resource")() } tenant, err := db.GetManager().TenantDao().GetTenantByUUID(uuid) if err != nil { logrus.Errorf("get tenant %s info failure %v", uuid, err.Error()) return nil, err } services, err := db.GetManager().TenantServiceDao().GetServicesByTenantID(uuid) if err != nil { logrus.Errorf("get service by id error, %v, %v", services, err.Error()) return nil, err } var serviceIDs string var AllocatedCPU, AllocatedMEM int for _, ser := range services { if serviceIDs == "" { serviceIDs += ser.ServiceID } else { serviceIDs += "," + ser.ServiceID } AllocatedCPU += ser.ContainerCPU * ser.Replicas AllocatedMEM += ser.ContainerMemory * ser.Replicas } tenantResUesd, err := s.statusCli.GetTenantResource(uuid) if err != nil { logrus.Errorf("get tenant %s resource failure %s", uuid, err.Error()) } disks := GetServicesDiskDeprecated(strings.Split(serviceIDs, ","), s.prometheusCli) var value float64 for _, v := range disks { value += v } var res api_model.TenantResource res.UUID = uuid res.Name = tenant.Name res.EID = tenant.EID res.AllocatedCPU = AllocatedCPU res.AllocatedMEM = AllocatedMEM if tenantResUesd != nil { res.UsedCPU = int(tenantResUesd.CpuRequest) res.UsedMEM = int(tenantResUesd.MemoryRequest) } res.UsedDisk = value return &res, nil } // // GetTenantMemoryCPU get pagedTenantServiceRes(s) // func (s *ServiceAction) GetAllocableResources(tenantID string) (*api_model.TenantResource, error) { // if logrus.IsLevelEnabled(logrus.DebugLevel) { // defer core_util.Elapsed("[ServiceAction] get allocable resources")() // } // tenant, err := db.GetManager().TenantDao().GetTenantByUUID(tenantID) // if err != nil { // return nil, err // } // services, err := db.GetManager().TenantServiceDao().GetServicesByTenantID(tenantID) // if err != nil { // logrus.Errorf("get service by id error, %v, %v", services, err.Error()) // return nil, err // } // var serviceIDs string // var allocatedCPU, allocatedMEM int // for _, svc := range services { // allocatedCPU += svc.ContainerCPU * svc.Replicas // allocatedMEM += svc.ContainerMemory * svc.Replicas // } // usedResource, err := s.statusCli.GetTenantResource(tenantID) // if err != nil { // return nil, err // } // return &res, nil // } //GetServicesDiskDeprecated get service disk // // Deprecated func GetServicesDiskDeprecated(ids []string, prometheusCli prometheus.Interface) map[string]float64 { if logrus.IsLevelEnabled(logrus.DebugLevel) { defer core_util.Elapsed("[GetServicesDiskDeprecated] get tenant resource")() } result := make(map[string]float64) //query disk used in prometheus query := fmt.Sprintf(`max(app_resource_appfs{service_id=~"%s"}) by(service_id)`, strings.Join(ids, "|")) metric := prometheusCli.GetMetric(query, time.Now()) for _, re := range metric.MetricData.MetricValues { var serviceID = re.Metadata["service_id"] if re.Sample != nil { result[serviceID] = re.Sample.Value() } } return result } //CodeCheck code check func (s *ServiceAction) CodeCheck(c *api_model.CheckCodeStruct) error { err := s.MQClient.SendBuilderTopic(gclient.TaskStruct{ TaskType: "code_check", TaskBody: c.Body, Topic: gclient.BuilderTopic, }) if err != nil { logrus.Errorf("equque mq error, %v", err) return err } return nil } //ServiceDepend service depend func (s *ServiceAction) ServiceDepend(action string, ds *api_model.DependService) error { switch action { case "add": tsr := &dbmodel.TenantServiceRelation{ TenantID: ds.TenantID, ServiceID: ds.ServiceID, DependServiceID: ds.DepServiceID, DependServiceType: ds.DepServiceType, DependOrder: 1, } if err := db.GetManager().TenantServiceRelationDao().AddModel(tsr); err != nil { logrus.Errorf("add depend error, %v", err) if err == dberr.ErrRecordAlreadyExist { return nil } return err } case "delete": logrus.Debugf("serviceid is %v, depid is %v", ds.ServiceID, ds.DepServiceID) if err := db.GetManager().TenantServiceRelationDao().DeleteRelationByDepID(ds.ServiceID, ds.DepServiceID); err != nil { logrus.Errorf("delete depend error, %v", err) return err } } return nil } //EnvAttr env attr func (s *ServiceAction) EnvAttr(action string, at *dbmodel.TenantServiceEnvVar) error { switch action { case "add": if err := db.GetManager().TenantServiceEnvVarDao().AddModel(at); err != nil { logrus.Errorf("add env %v error, %v", at.AttrName, err) return err } case "delete": if err := db.GetManager().TenantServiceEnvVarDao().DeleteModel(at.ServiceID, at.AttrName); err != nil { logrus.Errorf("delete env %v error, %v", at.AttrName, err) return err } case "update": if err := db.GetManager().TenantServiceEnvVarDao().UpdateModel(at); err != nil { logrus.Errorf("update env %v error,%v", at.AttrName, err) return err } } return nil } // CreatePorts - func (s *ServiceAction) CreatePorts(tenantID, serviceID string, vps *api_model.ServicePorts) error { tx := db.GetManager().Begin() defer func() { if r := recover(); r != nil { logrus.Errorf("Unexpected panic occurred, rollback transaction: %v", r) tx.Rollback() } }() for _, vp := range vps.Port { // make sure K8sServiceName is unique if vp.K8sServiceName != "" { port, err := db.GetManager().TenantServicesPortDao().GetByTenantAndName(tenantID, vp.K8sServiceName) if err != nil && err != gorm.ErrRecordNotFound { tx.Rollback() return err } if port != nil { tx.Rollback() return bcode.ErrK8sServiceNameExists } } var vpD dbmodel.TenantServicesPort vpD.ServiceID = serviceID vpD.TenantID = tenantID vpD.IsInnerService = &vp.IsInnerService vpD.IsOuterService = &vp.IsOuterService vpD.ContainerPort = vp.ContainerPort vpD.MappingPort = vp.MappingPort vpD.Protocol = vp.Protocol vpD.PortAlias = vp.PortAlias vpD.K8sServiceName = vp.K8sServiceName if err := db.GetManager().TenantServicesPortDaoTransactions(tx).AddModel(&vpD); err != nil { tx.Rollback() return err } } if err := tx.Commit().Error; err != nil { tx.Rollback() return err } return nil } func (s *ServiceAction) deletePorts(componentID string, ports *api_model.ServicePorts) error { return db.GetManager().DB().Transaction(func(tx *gorm.DB) error { for _, port := range ports.Port { if err := db.GetManager().TenantServicesPortDaoTransactions(tx).DeleteModel(componentID, port.ContainerPort); err != nil { return err } // delete related ingress rules if err := GetGatewayHandler().DeleteIngressRulesByComponentPort(tx, componentID, port.ContainerPort); err != nil { return err } } return nil }) } // SyncComponentPorts - func (s *ServiceAction) SyncComponentPorts(tx *gorm.DB, app *dbmodel.Application, components []*api_model.Component) error { var ( componentIDs []string ports []*dbmodel.TenantServicesPort ) for _, component := range components { if component.Ports == nil { continue } componentIDs = append(componentIDs, component.ComponentBase.ComponentID) for _, port := range component.Ports { ports = append(ports, port.DbModel(app.TenantID, component.ComponentBase.ComponentID)) } } if err := db.GetManager().TenantServicesPortDaoTransactions(tx).DeleteByComponentIDs(componentIDs); err != nil { return err } return db.GetManager().TenantServicesPortDaoTransactions(tx).CreateOrUpdatePortsInBatch(ports) } //PortVar port var func (s *ServiceAction) PortVar(action, tenantID, serviceID string, vps *api_model.ServicePorts, oldPort int) error { crt, err := db.GetManager().TenantServicePluginRelationDao().CheckSomeModelPluginByServiceID( serviceID, dbmodel.InBoundNetPlugin, ) if err != nil { return err } switch action { case "delete": return s.deletePorts(serviceID, vps) case "update": tx := db.GetManager().Begin() defer func() { if r := recover(); r != nil { logrus.Errorf("Unexpected panic occurred, rollback transaction: %v", r) tx.Rollback() } }() for _, vp := range vps.Port { //port更新单个请求 if oldPort == 0 { oldPort = vp.ContainerPort } vpD, err := db.GetManager().TenantServicesPortDao().GetPort(serviceID, oldPort) if err != nil { tx.Rollback() return err } // make sure K8sServiceName is unique if vp.K8sServiceName != "" { port, err := db.GetManager().TenantServicesPortDao().GetByTenantAndName(tenantID, vp.K8sServiceName) if err != nil && err != gorm.ErrRecordNotFound { tx.Rollback() return err } if port != nil && vpD.K8sServiceName != vp.K8sServiceName { tx.Rollback() return bcode.ErrK8sServiceNameExists } } vpD.ServiceID = serviceID vpD.TenantID = tenantID vpD.IsInnerService = &vp.IsInnerService vpD.IsOuterService = &vp.IsOuterService vpD.ContainerPort = vp.ContainerPort vpD.MappingPort = vp.MappingPort vpD.Protocol = vp.Protocol vpD.PortAlias = vp.PortAlias vpD.K8sServiceName = vp.K8sServiceName if err := db.GetManager().TenantServicesPortDaoTransactions(tx).UpdateModel(vpD); err != nil { logrus.Errorf("update port var error, %v", err) tx.Rollback() return err } if crt { pluginPort, err := db.GetManager().TenantServicesStreamPluginPortDao().GetPluginMappingPortByServiceIDAndContainerPort( serviceID, dbmodel.InBoundNetPlugin, oldPort, ) goon := true if err != nil { if strings.Contains(err.Error(), "record not found") { goon = false } else { logrus.Errorf("get plugin mapping port error:(%s)", err) tx.Rollback() return err } } if goon { pluginPort.ContainerPort = vp.ContainerPort if err := db.GetManager().TenantServicesStreamPluginPortDaoTransactions(tx).UpdateModel(pluginPort); err != nil { logrus.Errorf("update plugin mapping port error:(%s)", err) tx.Rollback() return err } } } } if err := tx.Commit().Error; err != nil { tx.Rollback() logrus.Debugf("commit update port error, %v", err) return err } } return nil } //PortOuter 端口对外服务操作 func (s *ServiceAction) PortOuter(tenantName, serviceID string, containerPort int, servicePort *api_model.ServicePortInnerOrOuter) (*dbmodel.TenantServiceLBMappingPort, string, error) { p, err := db.GetManager().TenantServicesPortDao().GetPort(serviceID, containerPort) if err != nil { return nil, "", fmt.Errorf("find service port error:%s", err.Error()) } _, err = db.GetManager().TenantServiceDao().GetServiceByID(serviceID) if err != nil { return nil, "", fmt.Errorf("find service error:%s", err.Error()) } hasUpStream, err := db.GetManager().TenantServicePluginRelationDao().CheckSomeModelPluginByServiceID( serviceID, dbmodel.InBoundNetPlugin, ) if err != nil { return nil, "", fmt.Errorf("get plugin relations error: %s", err.Error()) } //if stream 创建vs端口 vsPort := &dbmodel.TenantServiceLBMappingPort{} switch servicePort.Body.Operation { case "close": if *p.IsOuterService { //如果端口已经开了对外 falsev := false p.IsOuterService = &falsev tx := db.GetManager().Begin() defer func() { if r := recover(); r != nil { logrus.Errorf("Unexpected panic occurred, rollback transaction: %v", r) tx.Rollback() } }() if err = db.GetManager().TenantServicesPortDaoTransactions(tx).UpdateModel(p); err != nil { tx.Rollback() return nil, "", err } if hasUpStream { pluginPort, err := db.GetManager().TenantServicesStreamPluginPortDao().GetPluginMappingPortByServiceIDAndContainerPort( serviceID, dbmodel.InBoundNetPlugin, containerPort, ) if err != nil { if err.Error() == gorm.ErrRecordNotFound.Error() { logrus.Debugf("outer, plugin port (%d) is not exist, do not need delete", containerPort) goto OUTERCLOSEPASS } tx.Rollback() return nil, "", fmt.Errorf("outer, get plugin mapping port error:(%s)", err) } if *p.IsInnerService { //发现内网未关闭则不删除该映射 logrus.Debugf("outer, close outer, but plugin inner port (%d) is exist, do not need delete", containerPort) goto OUTERCLOSEPASS } if err := db.GetManager().TenantServicesStreamPluginPortDaoTransactions(tx).DeletePluginMappingPortByContainerPort( serviceID, dbmodel.InBoundNetPlugin, containerPort, ); err != nil { tx.Rollback() return nil, "", fmt.Errorf("outer, delete plugin mapping port %d error:(%s)", containerPort, err) } logrus.Debugf(fmt.Sprintf("outer, delete plugin port %d->%d", containerPort, pluginPort.PluginPort)) OUTERCLOSEPASS: } if err := tx.Commit().Error; err != nil { tx.Rollback() return nil, "", err } } else { return nil, "", nil } case "open": truev := true p.IsOuterService = &truev tx := db.GetManager().Begin() defer func() { if r := recover(); r != nil { logrus.Errorf("Unexpected panic occurred, rollback transaction: %v", r) tx.Rollback() } }() if err = db.GetManager().TenantServicesPortDaoTransactions(tx).UpdateModel(p); err != nil { tx.Rollback() return nil, "", err } if hasUpStream { pluginPort, err := db.GetManager().TenantServicesStreamPluginPortDao().GetPluginMappingPortByServiceIDAndContainerPort( serviceID, dbmodel.InBoundNetPlugin, containerPort, ) var pPort int if err != nil { if err.Error() == gorm.ErrRecordNotFound.Error() { ppPort, err := db.GetManager().TenantServicesStreamPluginPortDaoTransactions(tx).SetPluginMappingPort( p.TenantID, serviceID, dbmodel.InBoundNetPlugin, containerPort, ) if err != nil { tx.Rollback() logrus.Errorf("outer, set plugin mapping port error:(%s)", err) return nil, "", fmt.Errorf("outer, set plugin mapping port error:(%s)", err) } pPort = ppPort goto OUTEROPENPASS } tx.Rollback() return nil, "", fmt.Errorf("outer, in setting plugin mapping port, get plugin mapping port error:(%s)", err) } logrus.Debugf("outer, plugin mapping port is already exist, %d->%d", pluginPort.ContainerPort, pluginPort.PluginPort) OUTEROPENPASS: logrus.Debugf("outer, set plugin mapping port %d->%d", containerPort, pPort) } if err := tx.Commit().Error; err != nil { tx.Rollback() return nil, "", err } } return vsPort, p.Protocol, nil } //PortInner 端口对内服务操作 //TODO: send task to worker func (s *ServiceAction) PortInner(tenantName, serviceID, operation string, port int) error { p, err := db.GetManager().TenantServicesPortDao().GetPort(serviceID, port) if err != nil { return err } _, err = db.GetManager().TenantServiceDao().GetServiceByID(serviceID) if err != nil { return fmt.Errorf("get service error:%s", err.Error()) } hasUpStream, err := db.GetManager().TenantServicePluginRelationDao().CheckSomeModelPluginByServiceID( serviceID, dbmodel.InBoundNetPlugin, ) if err != nil { return fmt.Errorf("get plugin relations error: %s", err.Error()) } tx := db.GetManager().Begin() defer func() { if r := recover(); r != nil { logrus.Errorf("Unexpected panic occurred, rollback transaction: %v", r) tx.Rollback() } }() switch operation { case "close": if *p.IsInnerService { //如果端口已经开了对内 falsev := false p.IsInnerService = &falsev if err = db.GetManager().TenantServicesPortDaoTransactions(tx).UpdateModel(p); err != nil { tx.Rollback() return fmt.Errorf("update service port error: %s", err.Error()) } if hasUpStream { pluginPort, err := db.GetManager().TenantServicesStreamPluginPortDao().GetPluginMappingPortByServiceIDAndContainerPort( serviceID, dbmodel.InBoundNetPlugin, port, ) if err != nil { if err.Error() == gorm.ErrRecordNotFound.Error() { logrus.Debugf("inner, plugin port (%d) is not exist, do not need delete", port) goto INNERCLOSEPASS } tx.Rollback() return fmt.Errorf("inner, get plugin mapping port error:(%s)", err) } if *p.IsOuterService { logrus.Debugf("inner, close inner, but plugin outerport (%d) is exist, do not need delete", port) goto INNERCLOSEPASS } if err := db.GetManager().TenantServicesStreamPluginPortDaoTransactions(tx).DeletePluginMappingPortByContainerPort( serviceID, dbmodel.InBoundNetPlugin, port, ); err != nil { tx.Rollback() return fmt.Errorf("inner, delete plugin mapping port %d error:(%s)", port, err) } logrus.Debugf(fmt.Sprintf("inner, delete plugin port %d->%d", port, pluginPort.PluginPort)) INNERCLOSEPASS: } } else { tx.Rollback() return fmt.Errorf("already close") } case "open": if *p.IsInnerService { tx.Rollback() return fmt.Errorf("already open") } truv := true p.IsInnerService = &truv if err = db.GetManager().TenantServicesPortDaoTransactions(tx).UpdateModel(p); err != nil { tx.Rollback() return err } if hasUpStream { pluginPort, err := db.GetManager().TenantServicesStreamPluginPortDao().GetPluginMappingPortByServiceIDAndContainerPort( serviceID, dbmodel.InBoundNetPlugin, port, ) var pPort int if err != nil { if err.Error() == gorm.ErrRecordNotFound.Error() { ppPort, err := db.GetManager().TenantServicesStreamPluginPortDaoTransactions(tx).SetPluginMappingPort( p.TenantID, serviceID, dbmodel.InBoundNetPlugin, port, ) if err != nil { tx.Rollback() logrus.Errorf("inner, set plugin mapping port error:(%s)", err) return fmt.Errorf("inner, set plugin mapping port error:(%s)", err) } pPort = ppPort goto INNEROPENPASS } tx.Rollback() return fmt.Errorf("inner, in setting plugin mapping port, get plugin mapping port error:(%s)", err) } logrus.Debugf("inner, plugin mapping port is already exist, %d->%d", pluginPort.ContainerPort, pluginPort.PluginPort) INNEROPENPASS: logrus.Debugf("inner, set plugin mapping port %d->%d", port, pPort) } } if err := tx.Commit().Error; err != nil { tx.Rollback() return err } return nil } //VolumnVar var volumn func (s *ServiceAction) VolumnVar(tsv *dbmodel.TenantServiceVolume, tenantID, fileContent, action string) *util.APIHandleError { localPath := os.Getenv("LOCAL_DATA_PATH") sharePath := os.Getenv("SHARE_DATA_PATH") if localPath == "" { localPath = "/grlocaldata" } if sharePath == "" { sharePath = "/grdata" } switch action { case "add": if tsv.HostPath == "" { //step 1 设置主机目录 switch tsv.VolumeType { //共享文件存储 case dbmodel.ShareFileVolumeType.String(): tsv.HostPath = fmt.Sprintf("%s/tenant/%s/service/%s%s", sharePath, tenantID, tsv.ServiceID, tsv.VolumePath) //本地文件存储 case dbmodel.LocalVolumeType.String(): serviceInfo, err := db.GetManager().TenantServiceDao().GetServiceTypeByID(tsv.ServiceID) if err != nil { return util.CreateAPIHandleErrorFromDBError("service type", err) } // local volume just only support state component if serviceInfo == nil || !serviceInfo.IsState() { return util.CreateAPIHandleError(400, fmt.Errorf("应用类型为'无状态'.不支持本地存储")) } tsv.HostPath = fmt.Sprintf("%s/tenant/%s/service/%s%s", localPath, tenantID, tsv.ServiceID, tsv.VolumePath) } } util.SetVolumeDefaultValue(tsv) // begin transaction tx := db.GetManager().Begin() defer func() { if r := recover(); r != nil { logrus.Errorf("Unexpected panic occurred, rollback transaction: %v", r) tx.Rollback() } }() if err := db.GetManager().TenantServiceVolumeDaoTransactions(tx).AddModel(tsv); err != nil { tx.Rollback() return util.CreateAPIHandleErrorFromDBError("add volume", err) } if fileContent != "" { cf := &dbmodel.TenantServiceConfigFile{ ServiceID: tsv.ServiceID, VolumeName: tsv.VolumeName, FileContent: fileContent, } if err := db.GetManager().TenantServiceConfigFileDaoTransactions(tx).AddModel(cf); err != nil { tx.Rollback() return util.CreateAPIHandleErrorFromDBError("error creating config file", err) } } // end transaction if err := tx.Commit().Error; err != nil { tx.Rollback() return util.CreateAPIHandleErrorFromDBError("error ending transaction", err) } case "delete": // begin transaction tx := db.GetManager().Begin() defer func() { if r := recover(); r != nil { logrus.Errorf("Unexpected panic occurred, rollback transaction: %v", r) tx.Rollback() } }() if tsv.VolumeName != "" { volume, err := db.GetManager().TenantServiceVolumeDaoTransactions(tx).GetVolumeByServiceIDAndName(tsv.ServiceID, tsv.VolumeName) if err != nil { tx.Rollback() return util.CreateAPIHandleErrorFromDBError("find volume", err) } if err := db.GetManager().TenantServiceVolumeDaoTransactions(tx).DeleteModel(tsv.ServiceID, tsv.VolumeName); err != nil && err.Error() != gorm.ErrRecordNotFound.Error() { tx.Rollback() return util.CreateAPIHandleErrorFromDBError("delete volume", err) } err = s.MQClient.SendBuilderTopic(gclient.TaskStruct{ Topic: gclient.WorkerTopic, TaskType: "volume_gc", TaskBody: map[string]interface{}{ "tenant_id": tenantID, "service_id": volume.ServiceID, "volume_id": volume.ID, "volume_path": volume.VolumePath, }, }) if err != nil { logrus.Errorf("send 'volume_gc' task: %v", err) tx.Rollback() return util.CreateAPIHandleErrorFromDBError("send 'volume_gc' task", err) } } else { if err := db.GetManager().TenantServiceVolumeDaoTransactions(tx).DeleteByServiceIDAndVolumePath(tsv.ServiceID, tsv.VolumePath); err != nil && err.Error() != gorm.ErrRecordNotFound.Error() { tx.Rollback() return util.CreateAPIHandleErrorFromDBError("delete volume", err) } } if err := db.GetManager().TenantServiceConfigFileDaoTransactions(tx).DelByVolumeID(tsv.ServiceID, tsv.VolumeName); err != nil { tx.Rollback() return util.CreateAPIHandleErrorFromDBError("error deleting config files", err) } // end transaction if err := tx.Commit().Error; err != nil { tx.Rollback() return util.CreateAPIHandleErrorFromDBError("error ending transaction", err) } } return nil } // UpdVolume updates service volume. func (s *ServiceAction) UpdVolume(sid string, req *api_model.UpdVolumeReq) error { tx := db.GetManager().Begin() defer func() { if r := recover(); r != nil { logrus.Errorf("Unexpected panic occurred, rollback transaction: %v", r) tx.Rollback() } }() v, err := db.GetManager().TenantServiceVolumeDaoTransactions(tx).GetVolumeByServiceIDAndName(sid, req.VolumeName) if err != nil { tx.Rollback() return err } v.VolumePath = req.VolumePath v.Mode = req.Mode if err := db.GetManager().TenantServiceVolumeDaoTransactions(tx).UpdateModel(v); err != nil { tx.Rollback() return err } if req.VolumeType == "config-file" { configfile, err := db.GetManager().TenantServiceConfigFileDaoTransactions(tx).GetByVolumeName(sid, req.VolumeName) if err != nil { tx.Rollback() return err } configfile.FileContent = req.FileContent if err := db.GetManager().TenantServiceConfigFileDaoTransactions(tx).UpdateModel(configfile); err != nil { tx.Rollback() return err } } tx.Commit() return nil } //GetVolumes 获取应用全部存储 func (s *ServiceAction) GetVolumes(serviceID string) ([]*api_model.VolumeWithStatusStruct, *util.APIHandleError) { volumeWithStatusList := make([]*api_model.VolumeWithStatusStruct, 0) vs, err := db.GetManager().TenantServiceVolumeDao().GetTenantServiceVolumesByServiceID(serviceID) if err != nil && err.Error() != gorm.ErrRecordNotFound.Error() { return nil, util.CreateAPIHandleErrorFromDBError("get volumes", err) } volumeStatusList, err := s.statusCli.GetAppVolumeStatus(serviceID) if err != nil { logrus.Warnf("get volume status error: %s", err.Error()) } volumeStatus := make(map[string]pb.ServiceVolumeStatus) if volumeStatusList != nil && volumeStatusList.GetStatus() != nil { volumeStatus = volumeStatusList.GetStatus() } isMountedShareVolume := false mountStatus := pb.ServiceVolumeStatus_NOT_READY.String() for _, volume := range vs { vws := &api_model.VolumeWithStatusStruct{ ServiceID: volume.ServiceID, Category: volume.Category, VolumeType: volume.VolumeType, VolumeName: volume.VolumeName, HostPath: volume.HostPath, VolumePath: volume.VolumePath, IsReadOnly: volume.IsReadOnly, VolumeCapacity: volume.VolumeCapacity, AccessMode: volume.AccessMode, SharePolicy: volume.SharePolicy, BackupPolicy: volume.BackupPolicy, ReclaimPolicy: volume.ReclaimPolicy, AllowExpansion: volume.AllowExpansion, VolumeProviderName: volume.VolumeProviderName, } volumeID := strconv.FormatInt(int64(volume.ID), 10) if phrase, ok := volumeStatus[volumeID]; ok { vws.Status = phrase.String() if os.Getenv("ENABLE_SUBPATH") == "true" && vws.VolumeType == "share-file" && strings.HasPrefix(vws.HostPath, "/grdata") { isMountedShareVolume = true mountStatus = vws.Status } } else { vws.Status = pb.ServiceVolumeStatus_NOT_READY.String() if isMountedShareVolume && strings.HasPrefix(vws.HostPath, "/grdata") { vws.Status = mountStatus } } volumeWithStatusList = append(volumeWithStatusList, vws) } return volumeWithStatusList, nil } //VolumeDependency VolumeDependency func (s *ServiceAction) VolumeDependency(tsr *dbmodel.TenantServiceMountRelation, action string) *util.APIHandleError { switch action { case "add": if tsr.VolumeName != "" { vm, err := db.GetManager().TenantServiceVolumeDao().GetVolumeByServiceIDAndName(tsr.DependServiceID, tsr.VolumeName) if err != nil { return util.CreateAPIHandleErrorFromDBError("get volume", err) } tsr.HostPath = vm.HostPath if err := db.GetManager().TenantServiceMountRelationDao().AddModel(tsr); err != nil { return util.CreateAPIHandleErrorFromDBError("add volume mount relation", err) } } else { if tsr.HostPath == "" { return util.CreateAPIHandleError(400, fmt.Errorf("host path can not be empty when create volume dependency in api v2")) } if err := db.GetManager().TenantServiceMountRelationDao().AddModel(tsr); err != nil { return util.CreateAPIHandleErrorFromDBError("add volume mount relation", err) } } case "delete": if tsr.VolumeName != "" { if err := db.GetManager().TenantServiceMountRelationDao().DElTenantServiceMountRelationByServiceAndName(tsr.ServiceID, tsr.VolumeName); err != nil { return util.CreateAPIHandleErrorFromDBError("delete mount relation", err) } } else { if err := db.GetManager().TenantServiceMountRelationDao().DElTenantServiceMountRelationByDepService(tsr.ServiceID, tsr.DependServiceID); err != nil { return util.CreateAPIHandleErrorFromDBError("delete mount relation", err) } } } return nil } //GetDepVolumes 获取依赖存储 func (s *ServiceAction) GetDepVolumes(serviceID string) ([]*dbmodel.TenantServiceMountRelation, *util.APIHandleError) { dbManager := db.GetManager() mounts, err := dbManager.TenantServiceMountRelationDao().GetTenantServiceMountRelationsByService(serviceID) if err != nil { return nil, util.CreateAPIHandleErrorFromDBError("get dep volume", err) } return mounts, nil } //ServiceProbe ServiceProbe func (s *ServiceAction) ServiceProbe(tsp *dbmodel.TenantServiceProbe, action string) error { switch action { case "add": if err := db.GetManager().ServiceProbeDao().AddModel(tsp); err != nil { return err } case "update": if err := db.GetManager().ServiceProbeDao().UpdateModel(tsp); err != nil { return err } case "delete": if err := db.GetManager().ServiceProbeDao().DeleteModel(tsp.ServiceID, tsp.ProbeID); err != nil { return err } } return nil } //RollBack RollBack func (s *ServiceAction) RollBack(rs *api_model.RollbackStruct) error { service, err := db.GetManager().TenantServiceDao().GetServiceByID(rs.ServiceID) if err != nil { return err } oldDeployVersion := service.DeployVersion if service.DeployVersion == rs.DeployVersion { return fmt.Errorf("current version is %v, don't need rollback", rs.DeployVersion) } service.DeployVersion = rs.DeployVersion if err := db.GetManager().TenantServiceDao().UpdateModel(service); err != nil { return err } //发送重启消息到MQ startStopStruct := &api_model.StartStopStruct{ TenantID: rs.TenantID, ServiceID: rs.ServiceID, EventID: rs.EventID, TaskType: "rolling_upgrade", } if err := GetServiceManager().StartStopService(startStopStruct); err != nil { // rollback service.DeployVersion = oldDeployVersion if err := db.GetManager().TenantServiceDao().UpdateModel(service); err != nil { logrus.Warningf("error deploy version rollback: %v", err) } return err } return nil } //GetStatus GetStatus func (s *ServiceAction) GetStatus(serviceID string) (*api_model.StatusList, error) { services, errS := db.GetManager().TenantServiceDao().GetServiceByID(serviceID) if errS != nil { return nil, errS } sl := &api_model.StatusList{ TenantID: services.TenantID, ServiceID: serviceID, ServiceAlias: services.ServiceAlias, DeployVersion: services.DeployVersion, Replicas: services.Replicas, ContainerMem: services.ContainerMemory, ContainerCPU: services.ContainerCPU, CurStatus: services.CurStatus, StatusCN: TransStatus(services.CurStatus), } status := s.statusCli.GetStatus(serviceID) if status != "" { sl.CurStatus = status sl.StatusCN = TransStatus(status) } di, err := s.statusCli.GetServiceDeployInfo(serviceID) if err != nil { logrus.Warningf("service id: %s; failed to get deploy info: %v", serviceID, err) } else { sl.StartTime = di.GetStartTime() } return sl, nil } //GetServicesStatus 获取一组应用状态,若 serviceIDs为空,获取租户所有应用状态 func (s *ServiceAction) GetServicesStatus(tenantID string, serviceIDs []string) []map[string]interface{} { if len(serviceIDs) == 0 { services, _ := db.GetManager().TenantServiceDao().GetServicesByTenantID(tenantID) for _, s := range services { serviceIDs = append(serviceIDs, s.ServiceID) } } if len(serviceIDs) == 0 { return []map[string]interface{}{} } statusList := s.statusCli.GetStatuss(strings.Join(serviceIDs, ",")) var info = make([]map[string]interface{}, 0) for k, v := range statusList { serviceInfo := map[string]interface{}{"service_id": k, "status": v, "status_cn": TransStatus(v), "used_mem": 0} info = append(info, serviceInfo) } return info } // GetEnterpriseServicesStatus get services func (s *ServiceAction) GetEnterpriseServicesStatus(enterpriseID string) (map[string]string, *util.APIHandleError) { var tenantIDs []string tenants, err := db.GetManager().EnterpriseDao().GetEnterpriseTenants(enterpriseID) if err != nil { logrus.Errorf("list tenant failed: %s", err.Error()) return nil, util.CreateAPIHandleErrorFromDBError(fmt.Sprintf("enterprise[%s] get tenant failed", enterpriseID), err) } if len(tenants) == 0 { return nil, util.CreateAPIHandleErrorf(400, "enterprise[%s] has not tenants", enterpriseID) } for _, tenant := range tenants { tenantIDs = append(tenantIDs, tenant.UUID) } services, err := db.GetManager().TenantServiceDao().GetServicesByTenantIDs(tenantIDs) if err != nil { logrus.Errorf("list tenants service failed: %s", err.Error()) return nil, util.CreateAPIHandleErrorf(500, "get enterprise[%s] service failed: %s", enterpriseID, err.Error()) } var serviceIDs []string for _, svc := range services { serviceIDs = append(serviceIDs, svc.ServiceID) } statusList := s.statusCli.GetStatuss(strings.Join(serviceIDs, ",")) return statusList, nil } //CreateTenant create tenant func (s *ServiceAction) CreateTenant(t *dbmodel.Tenants) error { tenant, _ := db.GetManager().TenantDao().GetTenantIDByName(t.Name) if tenant != nil { return fmt.Errorf("tenant name %s is exist", t.Name) } labels := map[string]string{ constants.ResourceManagedByLabel: constants.Rainbond, } return db.GetManager().DB().Transaction(func(tx *gorm.DB) error { if err := db.GetManager().TenantDaoTransactions(tx).AddModel(t); err != nil { if !strings.HasSuffix(err.Error(), "is exist") { return err } } if _, err := s.kubeClient.CoreV1().Namespaces().Create(context.Background(), &corev1.Namespace{ ObjectMeta: metav1.ObjectMeta{ Name: t.Namespace, Labels: labels, }, }, metav1.CreateOptions{}); err != nil { if k8sErrors.IsAlreadyExists(err) { return bcode.ErrNamespaceExists } return err } return nil }) } //CreateTenandIDAndName create tenant_id and tenant_name func (s *ServiceAction) CreateTenandIDAndName(eid string) (string, string, error) { id := uuid.NewV4().String() uid := strings.Replace(id, "-", "", -1) name := strings.Split(id, "-")[0] logrus.Debugf("uuid is %v, name is %v", uid, name) return uid, name, nil } //K8sPodInfos - type K8sPodInfos struct { NewPods []*K8sPodInfo `json:"new_pods"` OldPods []*K8sPodInfo `json:"old_pods"` } //K8sPodInfo for api type K8sPodInfo struct { PodName string `json:"pod_name"` PodIP string `json:"pod_ip"` PodStatus string `json:"pod_status"` ServiceID string `json:"service_id"` Container map[string]map[string]string `json:"container"` } //GetPods get pods func (s *ServiceAction) GetPods(serviceID string) (*K8sPodInfos, error) { pods, err := s.statusCli.GetServicePods(serviceID) if err != nil && !strings.Contains(err.Error(), server.ErrAppServiceNotFound.Error()) && !strings.Contains(err.Error(), server.ErrPodNotFound.Error()) { logrus.Error("GetPodByService Error:", err) return nil, err } if pods == nil { return nil, nil } convpod := func(pods []*pb.ServiceAppPod) []*K8sPodInfo { var podsInfoList []*K8sPodInfo var podNames []string for _, v := range pods { var podInfo K8sPodInfo podInfo.PodName = v.PodName podInfo.PodIP = v.PodIp podInfo.PodStatus = v.PodStatus podInfo.ServiceID = serviceID containerInfos := make(map[string]map[string]string, 10) for _, container := range v.Containers { containerInfos[container.ContainerName] = map[string]string{ "memory_limit": fmt.Sprintf("%d", container.MemoryLimit), "memory_usage": "0", } } podInfo.Container = containerInfos podNames = append(podNames, v.PodName) podsInfoList = append(podsInfoList, &podInfo) } containerMemInfo, _ := s.GetPodContainerMemory(podNames) for _, c := range podsInfoList { for k := range c.Container { if info, exist := containerMemInfo[c.PodName][k]; exist { c.Container[k]["memory_usage"] = info } } } return podsInfoList } newpods := convpod(pods.NewPods) oldpods := convpod(pods.OldPods) return &K8sPodInfos{ NewPods: newpods, OldPods: oldpods, }, nil } //GetMultiServicePods get pods func (s *ServiceAction) GetMultiServicePods(serviceIDs []string) (*K8sPodInfos, error) { mpods, err := s.statusCli.GetMultiServicePods(serviceIDs) if err != nil && !strings.Contains(err.Error(), server.ErrAppServiceNotFound.Error()) && !strings.Contains(err.Error(), server.ErrPodNotFound.Error()) { logrus.Error("GetPodByService Error:", err) return nil, err } if mpods == nil { return nil, nil } convpod := func(serviceID string, pods []*pb.ServiceAppPod) []*K8sPodInfo { var podsInfoList []*K8sPodInfo for _, v := range pods { var podInfo K8sPodInfo podInfo.PodName = v.PodName podInfo.PodIP = v.PodIp podInfo.PodStatus = v.PodStatus podInfo.ServiceID = serviceID podsInfoList = append(podsInfoList, &podInfo) } return podsInfoList } var re K8sPodInfos for serviceID, pods := range mpods.ServicePods { if pods != nil { re.NewPods = append(re.NewPods, convpod(serviceID, pods.NewPods)...) re.OldPods = append(re.OldPods, convpod(serviceID, pods.OldPods)...) } } return &re, nil } // GetComponentPodNums get pods func (s *ServiceAction) GetComponentPodNums(ctx context.Context, componentIDs []string) (map[string]int32, error) { if logrus.IsLevelEnabled(logrus.DebugLevel) { defer core_util.Elapsed(fmt.Sprintf("[AppRuntimeSyncClient] [GetComponentPodNums] component nums: %d", len(componentIDs)))() } podNums, err := s.statusCli.GetComponentPodNums(ctx, componentIDs) if err != nil { return nil, errors.Wrap(err, "get component nums") } return podNums, nil } //GetPodContainerMemory Use Prometheus to query memory resources func (s *ServiceAction) GetPodContainerMemory(podNames []string) (map[string]map[string]string, error) { memoryUsageMap := make(map[string]map[string]string, 10) queryName := strings.Join(podNames, "|") query := fmt.Sprintf(`container_memory_rss{pod=~"%s"}`, queryName) metric := s.prometheusCli.GetMetric(query, time.Now()) for _, re := range metric.MetricData.MetricValues { var containerName = re.Metadata["container"] var podName = re.Metadata["pod"] var valuesBytes string if re.Sample != nil { valuesBytes = fmt.Sprintf("%d", int(re.Sample.Value())) } if _, ok := memoryUsageMap[podName]; ok { memoryUsageMap[podName][containerName] = valuesBytes } else { memoryUsageMap[podName] = map[string]string{ containerName: valuesBytes, } } } return memoryUsageMap, nil } //TransServieToDelete trans service info to delete table func (s *ServiceAction) TransServieToDelete(ctx context.Context, tenantID, serviceID string) error { _, err := db.GetManager().TenantServiceDao().GetServiceByID(serviceID) if err != nil && gorm.ErrRecordNotFound == err { logrus.Infof("service[%s] of tenant[%s] do not exist, ignore it", serviceID, tenantID) return nil } if err := s.isServiceClosed(serviceID); err != nil { return err } body, err := s.gcTaskBody(tenantID, serviceID) if err != nil { return fmt.Errorf("GC task body: %v", err) } if err := s.delServiceMetadata(ctx, serviceID); err != nil { return fmt.Errorf("delete service-related metadata: %v", err) } // let rbd-chaos remove related persistent data logrus.Info("let rbd-chaos remove related persistent data") topic := gclient.WorkerTopic if err := s.MQClient.SendBuilderTopic(gclient.TaskStruct{ Topic: topic, TaskType: "service_gc", TaskBody: body, }); err != nil { logrus.Warningf("send gc task: %v", err) } return nil } // isServiceClosed checks if the service has been closed according to the serviceID. func (s *ServiceAction) isServiceClosed(serviceID string) error { service, err := db.GetManager().TenantServiceDao().GetServiceByID(serviceID) if err != nil { return err } status := s.statusCli.GetStatus(serviceID) if service.Kind != dbmodel.ServiceKindThirdParty.String() { if !s.statusCli.IsClosedStatus(status) { return ErrServiceNotClosed } } return nil } func (s *ServiceAction) deleteComponent(tx *gorm.DB, service *dbmodel.TenantServices) error { delService := service.ChangeDelete() delService.ID = 0 if err := db.GetManager().TenantServiceDeleteDaoTransactions(tx).AddModel(delService); err != nil { return err } var deleteServicePropertyFunc = []func(serviceID string) error{ db.GetManager().CodeCheckResultDaoTransactions(tx).DeleteByServiceID, db.GetManager().TenantServiceEnvVarDaoTransactions(tx).DELServiceEnvsByServiceID, db.GetManager().TenantPluginVersionConfigDaoTransactions(tx).DeletePluginConfigByServiceID, db.GetManager().TenantServicePluginRelationDaoTransactions(tx).DeleteALLRelationByServiceID, db.GetManager().TenantServicesStreamPluginPortDaoTransactions(tx).DeleteAllPluginMappingPortByServiceID, db.GetManager().TenantServiceDaoTransactions(tx).DeleteServiceByServiceID, db.GetManager().TenantServicesPortDaoTransactions(tx).DELPortsByServiceID, db.GetManager().TenantServiceRelationDaoTransactions(tx).DELRelationsByServiceID, db.GetManager().TenantServiceMountRelationDaoTransactions(tx).DELTenantServiceMountRelationByServiceID, db.GetManager().TenantServiceVolumeDaoTransactions(tx).DeleteTenantServiceVolumesByServiceID, db.GetManager().TenantServiceConfigFileDaoTransactions(tx).DelByServiceID, db.GetManager().EndpointsDaoTransactions(tx).DeleteByServiceID, db.GetManager().ThirdPartySvcDiscoveryCfgDaoTransactions(tx).DeleteByServiceID, db.GetManager().TenantServiceLabelDaoTransactions(tx).DeleteLabelByServiceID, db.GetManager().VersionInfoDaoTransactions(tx).DeleteVersionByServiceID, db.GetManager().TenantPluginVersionENVDaoTransactions(tx).DeleteEnvByServiceID, db.GetManager().ServiceProbeDaoTransactions(tx).DELServiceProbesByServiceID, db.GetManager().ServiceEventDaoTransactions(tx).DelEventByServiceID, db.GetManager().TenantServiceMonitorDaoTransactions(tx).DeleteServiceMonitorByServiceID, db.GetManager().AppConfigGroupServiceDaoTransactions(tx).DeleteEffectiveServiceByServiceID, } if err := GetGatewayHandler().DeleteTCPRuleByServiceIDWithTransaction(service.ServiceID, tx); err != nil { return err } if err := GetGatewayHandler().DeleteHTTPRuleByServiceIDWithTransaction(service.ServiceID, tx); err != nil { return err } for _, del := range deleteServicePropertyFunc { if err := del(service.ServiceID); err != nil { if err != gorm.ErrRecordNotFound { return err } } } return nil } // delServiceMetadata deletes service-related metadata in the database. func (s *ServiceAction) delServiceMetadata(ctx context.Context, serviceID string) error { service, err := db.GetManager().TenantServiceDao().GetServiceByID(serviceID) if err != nil { return err } if db.GetManager().DB().Dialect().GetName() == "sqlite3" { if err := s.deleteThirdComponent(ctx, service); err != nil { return err } return s.deleteComponent(db.GetManager().DB(), service) } logrus.Infof("delete service %s %s", serviceID, service.ServiceAlias) return db.GetManager().DB().Transaction(func(tx *gorm.DB) error { if err := s.deleteThirdComponent(ctx, service); err != nil { return err } return s.deleteComponent(tx, service) }) } func (s *ServiceAction) deleteThirdComponent(ctx context.Context, component *dbmodel.TenantServices) error { if component.Kind != "third_party" { return nil } tenant, err := db.GetManager().TenantDao().GetTenantByUUID(component.TenantID) if err != nil { return err } thirdPartySvcDiscoveryCfg, err := db.GetManager().ThirdPartySvcDiscoveryCfgDao().GetByServiceID(component.ServiceID) if err != nil { return err } if thirdPartySvcDiscoveryCfg == nil { return nil } if thirdPartySvcDiscoveryCfg.Type != string(dbmodel.DiscorveryTypeKubernetes) { return nil } newCtx, cancel := context.WithTimeout(ctx, 3*time.Second) defer cancel() err = s.rainbondClient.RainbondV1alpha1().ThirdComponents(tenant.Namespace).Delete(newCtx, component.ServiceID, metav1.DeleteOptions{}) if err != nil && !k8sErrors.IsNotFound(err) { return err } return nil } func (s *ServiceAction) gcTaskBody(tenantID, serviceID string) (map[string]interface{}, error) { events, err := db.GetManager().ServiceEventDao().ListByTargetID(serviceID) if err != nil { logrus.Errorf("list events based on serviceID: %v", err) } var eventIDs []string for _, event := range events { eventIDs = append(eventIDs, event.EventID) } return map[string]interface{}{ "tenant_id": tenantID, "service_id": serviceID, "event_ids": eventIDs, }, nil } //GetServiceDeployInfo get service deploy info func (s *ServiceAction) GetServiceDeployInfo(tenantID, serviceID string) (*pb.DeployInfo, *util.APIHandleError) { info, err := s.statusCli.GetServiceDeployInfo(serviceID) if err != nil { return nil, util.CreateAPIHandleError(500, err) } return info, nil } // ListVersionInfo lists version info func (s *ServiceAction) ListVersionInfo(serviceID string) (*api_model.BuildListRespVO, error) { versionInfos, err := db.GetManager().VersionInfoDao().GetAllVersionByServiceID(serviceID) if err != nil && err != gorm.ErrRecordNotFound { logrus.Errorf("error getting all version by service id: %v", err) return nil, fmt.Errorf("error getting all version by service id: %v", err) } svc, err := db.GetManager().TenantServiceDao().GetServiceByID(serviceID) if err != nil { logrus.Errorf("error getting service by uuid: %v", err) return nil, fmt.Errorf("error getting service by uuid: %v", err) } b, err := json.Marshal(versionInfos) if err != nil { return nil, fmt.Errorf("error marshaling version infos: %v", err) } var bversions []*api_model.BuildVersion if err := json.Unmarshal(b, &bversions); err != nil { return nil, fmt.Errorf("error unmarshaling version infos: %v", err) } for idx := range bversions { bv := bversions[idx] if bv.Kind == "build_from_image" || bv.Kind == "build_from_market_image" { image := parser.ParseImageName(bv.RepoURL) bv.ImageDomain = image.GetDomain() bv.ImageRepo = image.GetRepostory() bv.ImageTag = image.GetTag() } } result := &api_model.BuildListRespVO{ DeployVersion: svc.DeployVersion, List: bversions, } return result, nil } // EventBuildVersion - func (s *ServiceAction) EventBuildVersion(serviceID, buildVersion string) (*api_model.BuildListRespVO, error) { versionInfo, err := db.GetManager().VersionInfoDao().GetVersionByDeployVersion(buildVersion, serviceID) if err != nil && err != gorm.ErrRecordNotFound { logrus.Errorf("error getting all version by service id: %v", err) return nil, fmt.Errorf("error getting all version by service id: %v", err) } b, err := json.Marshal(versionInfo) if err != nil { return nil, fmt.Errorf("error marshaling version infos: %v", err) } var bversion *api_model.BuildVersion if err := json.Unmarshal(b, &bversion); err != nil { return nil, fmt.Errorf("error unmarshaling version infos: %v", err) } result := &api_model.BuildListRespVO{ DeployVersion: buildVersion, List: bversion, } return result, nil } // AddAutoscalerRule - func (s *ServiceAction) AddAutoscalerRule(req *api_model.AutoscalerRuleReq) error { tx := db.GetManager().Begin() defer db.GetManager().EnsureEndTransactionFunc() r := &dbmodel.TenantServiceAutoscalerRules{ RuleID: req.RuleID, ServiceID: req.ServiceID, Enable: req.Enable, XPAType: req.XPAType, MinReplicas: req.MinReplicas, MaxReplicas: req.MaxReplicas, } if err := db.GetManager().TenantServceAutoscalerRulesDaoTransactions(tx).AddModel(r); err != nil { tx.Rollback() return err } for _, metric := range req.Metrics { m := &dbmodel.TenantServiceAutoscalerRuleMetrics{ RuleID: req.RuleID, MetricsType: metric.MetricsType, MetricsName: metric.MetricsName, MetricTargetType: metric.MetricTargetType, MetricTargetValue: metric.MetricTargetValue, } if err := db.GetManager().TenantServceAutoscalerRuleMetricsDaoTransactions(tx).AddModel(m); err != nil { tx.Rollback() return err } } taskbody := map[string]interface{}{ "service_id": r.ServiceID, "rule_id": r.RuleID, } if err := s.MQClient.SendBuilderTopic(gclient.TaskStruct{ TaskType: "refreshhpa", TaskBody: taskbody, Topic: gclient.WorkerTopic, }); err != nil { logrus.Errorf("send 'refreshhpa' task: %v", err) return err } logrus.Infof("rule id: %s; successfully send 'refreshhpa' task.", r.RuleID) return tx.Commit().Error } // UpdAutoscalerRule - func (s *ServiceAction) UpdAutoscalerRule(req *api_model.AutoscalerRuleReq) error { rule, err := db.GetManager().TenantServceAutoscalerRulesDao().GetByRuleID(req.RuleID) if err != nil { return err } rule.Enable = req.Enable rule.XPAType = req.XPAType rule.MinReplicas = req.MinReplicas rule.MaxReplicas = req.MaxReplicas tx := db.GetManager().Begin() defer db.GetManager().EnsureEndTransactionFunc() if err := db.GetManager().TenantServceAutoscalerRulesDaoTransactions(tx).UpdateModel(rule); err != nil { tx.Rollback() return err } // delete metrics if err := db.GetManager().TenantServceAutoscalerRuleMetricsDaoTransactions(tx).DeleteByRuleID(req.RuleID); err != nil { tx.Rollback() return err } for _, metric := range req.Metrics { m := &dbmodel.TenantServiceAutoscalerRuleMetrics{ RuleID: req.RuleID, MetricsType: metric.MetricsType, MetricsName: metric.MetricsName, MetricTargetType: metric.MetricTargetType, MetricTargetValue: metric.MetricTargetValue, } if err := db.GetManager().TenantServceAutoscalerRuleMetricsDaoTransactions(tx).AddModel(m); err != nil { tx.Rollback() return err } } taskbody := map[string]interface{}{ "service_id": rule.ServiceID, "rule_id": rule.RuleID, } if err := s.MQClient.SendBuilderTopic(gclient.TaskStruct{ TaskType: "refreshhpa", TaskBody: taskbody, Topic: gclient.WorkerTopic, }); err != nil { logrus.Errorf("send 'refreshhpa' task: %v", err) return err } logrus.Infof("rule id: %s; successfully send 'refreshhpa' task.", rule.RuleID) return tx.Commit().Error } // ListScalingRecords - func (s *ServiceAction) ListScalingRecords(serviceID string, page, pageSize int) ([]*dbmodel.TenantServiceScalingRecords, int, error) { records, err := db.GetManager().TenantServiceScalingRecordsDao().ListByServiceID(serviceID, (page-1)*pageSize, pageSize) if err != nil { return nil, 0, err } count, err := db.GetManager().TenantServiceScalingRecordsDao().CountByServiceID(serviceID) if err != nil { return nil, 0, err } return records, count, nil } // SyncComponentBase - func (s *ServiceAction) SyncComponentBase(tx *gorm.DB, app *dbmodel.Application, components []*api_model.Component) error { var ( componentIDs []string dbComponents []*dbmodel.TenantServices ) for _, component := range components { componentIDs = append(componentIDs, component.ComponentBase.ComponentID) } oldComponents, err := db.GetManager().TenantServiceDao().GetServiceByIDs(componentIDs) if err != nil { return err } existComponents := make(map[string]*dbmodel.TenantServices) for _, oc := range oldComponents { existComponents[oc.ServiceID] = oc } for _, component := range components { var deployVersion string if oldComponent, ok := existComponents[component.ComponentBase.ComponentID]; ok { deployVersion = oldComponent.DeployVersion } dbComponents = append(dbComponents, component.ComponentBase.DbModel(app.TenantID, app.AppID, deployVersion)) } if err := db.GetManager().TenantServiceDaoTransactions(tx).DeleteByComponentIDs(app.TenantID, app.AppID, componentIDs); err != nil { return err } return db.GetManager().TenantServiceDaoTransactions(tx).CreateOrUpdateComponentsInBatch(dbComponents) } // SyncComponentRelations - func (s *ServiceAction) SyncComponentRelations(tx *gorm.DB, app *dbmodel.Application, components []*api_model.Component) error { var ( componentIDs []string relations []*dbmodel.TenantServiceRelation ) for _, component := range components { if component.Relations == nil { continue } componentIDs = append(componentIDs, component.ComponentBase.ComponentID) for _, relation := range component.Relations { relations = append(relations, relation.DbModel(app.TenantID, component.ComponentBase.ComponentID)) } } if err := db.GetManager().TenantServiceRelationDaoTransactions(tx).DeleteByComponentIDs(componentIDs); err != nil { return err } return db.GetManager().TenantServiceRelationDaoTransactions(tx).CreateOrUpdateRelationsInBatch(relations) } // SyncComponentEnvs - func (s *ServiceAction) SyncComponentEnvs(tx *gorm.DB, app *dbmodel.Application, components []*api_model.Component) error { var ( componentIDs []string envs []*dbmodel.TenantServiceEnvVar ) for _, component := range components { if component.Envs == nil { continue } componentIDs = append(componentIDs, component.ComponentBase.ComponentID) for _, env := range component.Envs { envs = append(envs, env.DbModel(app.TenantID, component.ComponentBase.ComponentID)) } } if err := db.GetManager().TenantServiceEnvVarDaoTransactions(tx).DeleteByComponentIDs(componentIDs); err != nil { return err } return db.GetManager().TenantServiceEnvVarDaoTransactions(tx).CreateOrUpdateEnvsInBatch(envs) } // SyncComponentVolumeRels - func (s *ServiceAction) SyncComponentVolumeRels(tx *gorm.DB, app *dbmodel.Application, components []*api_model.Component) error { var ( componentIDs []string volRels []*dbmodel.TenantServiceMountRelation ) // Get the storage of all components under the application appComponents, err := db.GetManager().TenantServiceDao().ListByAppID(app.AppID) if err != nil { return err } var appComponentIDs []string for _, ac := range appComponents { appComponentIDs = append(appComponentIDs, ac.ServiceID) } existVolume, err := s.getExistVolumes(appComponentIDs) if err != nil { return err } // Get the storage that needs to be newly created for _, component := range components { componentID := component.ComponentBase.ComponentID if component.Volumes == nil { continue } for _, vol := range component.Volumes { if _, ok := existVolume[vol.Key(componentID)]; !ok { existVolume[vol.Key(componentID)] = vol.DbModel(componentID) } } } for _, component := range components { if component.VolumeRelations == nil { continue } componentIDs = append(componentIDs, component.ComponentBase.ComponentID) //The hostpath attribute should not be recorded in the mount relationship table, //and should be processed when the worker takes effect for _, volumeRelation := range component.VolumeRelations { if vol, ok := existVolume[volumeRelation.Key()]; ok { volRels = append(volRels, volumeRelation.DbModel(app.TenantID, component.ComponentBase.ComponentID, vol.HostPath, vol.VolumeType)) } } } if err := db.GetManager().TenantServiceMountRelationDaoTransactions(tx).DeleteByComponentIDs(componentIDs); err != nil { return err } return db.GetManager().TenantServiceMountRelationDaoTransactions(tx).CreateOrUpdateVolumeRelsInBatch(volRels) } // SyncComponentVolumes - func (s *ServiceAction) SyncComponentVolumes(tx *gorm.DB, components []*api_model.Component) error { var ( componentIDs []string volumes []*dbmodel.TenantServiceVolume ) for _, component := range components { if component.Volumes == nil { continue } componentIDs = append(componentIDs, component.ComponentBase.ComponentID) for _, volume := range component.Volumes { volumes = append(volumes, volume.DbModel(component.ComponentBase.ComponentID)) } } existVolumes, err := s.getExistVolumes(componentIDs) if err != nil { return err } deleteVolumeIDs := s.getDeleteVolumeIDs(existVolumes, volumes) createOrUpdates := s.getCreateOrUpdateVolumes(existVolumes, volumes) if err := db.GetManager().TenantServiceVolumeDaoTransactions(tx).DeleteByVolumeIDs(deleteVolumeIDs); err != nil { return err } return db.GetManager().TenantServiceVolumeDaoTransactions(tx).CreateOrUpdateVolumesInBatch(createOrUpdates) } func (s *ServiceAction) getExistVolumes(componentIDs []string) (existVolumes map[string]*dbmodel.TenantServiceVolume, err error) { existVolumes = make(map[string]*dbmodel.TenantServiceVolume) volumes, err := db.GetManager().TenantServiceVolumeDao().ListVolumesByComponentIDs(componentIDs) if err != nil { return nil, err } for _, volume := range volumes { existVolumes[volume.Key()] = volume } return existVolumes, nil } func (s *ServiceAction) getCreateOrUpdateVolumes(existVolumes map[string]*dbmodel.TenantServiceVolume, incomeVolumes []*dbmodel.TenantServiceVolume) (volumes []*dbmodel.TenantServiceVolume) { for _, incomeVolume := range incomeVolumes { if _, ok := existVolumes[incomeVolume.Key()]; ok { incomeVolume.ID = existVolumes[incomeVolume.Key()].ID } volumes = append(volumes, incomeVolume) } return volumes } func (s *ServiceAction) getDeleteVolumeIDs(existVolumes map[string]*dbmodel.TenantServiceVolume, incomeVolumes []*dbmodel.TenantServiceVolume) (deleteVolumeIDs []uint) { newVolumes := make(map[string]struct{}) for _, volume := range incomeVolumes { newVolumes[volume.Key()] = struct{}{} } for existKey, existVolume := range existVolumes { if _, ok := newVolumes[existKey]; !ok { deleteVolumeIDs = append(deleteVolumeIDs, existVolume.ID) } } return deleteVolumeIDs } // SyncComponentConfigFiles - func (s *ServiceAction) SyncComponentConfigFiles(tx *gorm.DB, components []*api_model.Component) error { var ( componentIDs []string configFiles []*dbmodel.TenantServiceConfigFile ) for _, component := range components { if component.ConfigFiles == nil { continue } componentIDs = append(componentIDs, component.ComponentBase.ComponentID) for _, configFile := range component.ConfigFiles { configFiles = append(configFiles, configFile.DbModel(component.ComponentBase.ComponentID)) } } if err := db.GetManager().TenantServiceConfigFileDaoTransactions(tx).DeleteByComponentIDs(componentIDs); err != nil { return err } return db.GetManager().TenantServiceConfigFileDaoTransactions(tx).CreateOrUpdateConfigFilesInBatch(configFiles) } // SyncComponentProbes - func (s *ServiceAction) SyncComponentProbes(tx *gorm.DB, components []*api_model.Component) error { var ( componentIDs []string probes []*dbmodel.TenantServiceProbe ) for _, component := range components { componentIDs = append(componentIDs, component.ComponentBase.ComponentID) modes := make(map[string]struct{}) for _, probe := range component.Probes { _, ok := modes[probe.Mode] if ok { continue } probes = append(probes, probe.DbModel(component.ComponentBase.ComponentID)) modes[probe.Mode] = struct{}{} } } if err := db.GetManager().ServiceProbeDaoTransactions(tx).DeleteByComponentIDs(componentIDs); err != nil { return err } return db.GetManager().ServiceProbeDaoTransactions(tx).CreateOrUpdateProbesInBatch(probes) } // SyncComponentLabels - func (s *ServiceAction) SyncComponentLabels(tx *gorm.DB, components []*api_model.Component) error { var ( componentIDs []string labels []*dbmodel.TenantServiceLable ) for _, component := range components { if component.Labels == nil { continue } componentIDs = append(componentIDs, component.ComponentBase.ComponentID) for _, label := range component.Labels { labels = append(labels, label.DbModel(component.ComponentBase.ComponentID)) } } if err := db.GetManager().TenantServiceLabelDaoTransactions(tx).DeleteByComponentIDs(componentIDs); err != nil { return err } return db.GetManager().TenantServiceLabelDaoTransactions(tx).CreateOrUpdateLabelsInBatch(labels) } // SyncComponentPlugins - func (s *ServiceAction) SyncComponentPlugins(tx *gorm.DB, app *dbmodel.Application, components []*api_model.Component) error { var ( componentIDs []string portConfigComponentIDs []string envComponentIDs []string pluginRelations []*dbmodel.TenantServicePluginRelation pluginVersionEnvs []*dbmodel.TenantPluginVersionEnv pluginVersionConfigs []*dbmodel.TenantPluginVersionDiscoverConfig pluginStreamPorts []*dbmodel.TenantServicesStreamPluginPort ) for _, component := range components { if component.Plugins == nil { continue } componentIDs = append(componentIDs, component.ComponentBase.ComponentID) for _, plugin := range component.Plugins { pluginRelations = append(pluginRelations, plugin.DbModel(component.ComponentBase.ComponentID)) if plugin.ConfigEnvs.NormalEnvs != nil { envComponentIDs = append(envComponentIDs, component.ComponentBase.ComponentID) for _, versionEnv := range plugin.ConfigEnvs.NormalEnvs { pluginVersionEnvs = append(pluginVersionEnvs, versionEnv.DbModel(component.ComponentBase.ComponentID, plugin.PluginID)) } } if configs := plugin.ConfigEnvs.ComplexEnvs; configs != nil { portConfigComponentIDs = append(portConfigComponentIDs, component.ComponentBase.ComponentID) if configs.BasePorts != nil && checkPluginHaveInbound(plugin.PluginModel) { psPorts := s.handlePluginMappingPort(app.TenantID, component.ComponentBase.ComponentID, plugin.PluginModel, configs.BasePorts) pluginStreamPorts = append(pluginStreamPorts, psPorts...) } config, err := ffjson.Marshal(configs) if err != nil { return err } pluginVersionConfigs = append(pluginVersionConfigs, &dbmodel.TenantPluginVersionDiscoverConfig{ PluginID: plugin.PluginID, ServiceID: component.ComponentBase.ComponentID, ConfigStr: string(config), }) } } } if err := db.GetManager().TenantServicesStreamPluginPortDaoTransactions(tx).DeleteByComponentIDs(portConfigComponentIDs); err != nil { return err } if err := db.GetManager().TenantPluginVersionConfigDaoTransactions(tx).DeleteByComponentIDs(portConfigComponentIDs); err != nil { return err } if err := db.GetManager().TenantServicePluginRelationDaoTransactions(tx).DeleteByComponentIDs(componentIDs); err != nil { return err } if err := db.GetManager().TenantPluginVersionENVDaoTransactions(tx).DeleteByComponentIDs(envComponentIDs); err != nil { return err } if err := db.GetManager().TenantServicePluginRelationDaoTransactions(tx).CreateOrUpdatePluginRelsInBatch(pluginRelations); err != nil { return err } if err := db.GetManager().TenantPluginVersionENVDaoTransactions(tx).CreateOrUpdatePluginVersionEnvsInBatch(pluginVersionEnvs); err != nil { return err } if err := db.GetManager().TenantServicesStreamPluginPortDaoTransactions(tx).CreateOrUpdateStreamPluginPortsInBatch(pluginStreamPorts); err != nil { return err } return db.GetManager().TenantPluginVersionConfigDaoTransactions(tx).CreateOrUpdatePluginVersionConfigsInBatch(pluginVersionConfigs) } //handlePluginMappingPort - func (s *ServiceAction) handlePluginMappingPort(tenantID, componentID, pluginModel string, ports []*api_model.BasePort) []*dbmodel.TenantServicesStreamPluginPort { existPorts := make(map[int]struct{}) for _, port := range ports { existPorts[port.Port] = struct{}{} } minPort := 65301 var newPorts []*dbmodel.TenantServicesStreamPluginPort for _, port := range ports { newPort := &dbmodel.TenantServicesStreamPluginPort{ TenantID: tenantID, ServiceID: componentID, PluginModel: pluginModel, ContainerPort: port.Port, } if _, ok := existPorts[minPort]; ok { minPort = minPort + 1 } newPluginPort := minPort if _, ok := existPorts[newPluginPort]; ok { minPort = minPort + 1 newPluginPort = minPort } existPorts[newPluginPort] = struct{}{} port.ListenPort = newPluginPort newPort.PluginPort = newPluginPort newPorts = append(newPorts, newPort) } return newPorts } // SyncComponentScaleRules - func (s *ServiceAction) SyncComponentScaleRules(tx *gorm.DB, components []*api_model.Component) error { var ( componentIDs []string autoScaleRuleIDs []string autoScaleRules []*dbmodel.TenantServiceAutoscalerRules autoScaleRuleMetrics []*dbmodel.TenantServiceAutoscalerRuleMetrics ) for _, component := range components { componentIDs = append(componentIDs, component.ComponentBase.ComponentID) autoScaleRuleIDs = append(autoScaleRuleIDs, component.AutoScaleRule.RuleID) autoScaleRules = append(autoScaleRules, component.AutoScaleRule.DbModel(component.ComponentBase.ComponentID)) for _, metric := range component.AutoScaleRule.RuleMetrics { autoScaleRuleMetrics = append(autoScaleRuleMetrics, metric.DbModel(component.AutoScaleRule.RuleID)) } } if err := db.GetManager().TenantServceAutoscalerRulesDaoTransactions(tx).DeleteByComponentIDs(componentIDs); err != nil { return err } if err := db.GetManager().TenantServceAutoscalerRuleMetricsDaoTransactions(tx).DeleteByRuleIDs(autoScaleRuleIDs); err != nil { return err } if err := db.GetManager().TenantServceAutoscalerRulesDaoTransactions(tx).CreateOrUpdateScaleRulesInBatch(autoScaleRules); err != nil { return err } return db.GetManager().TenantServceAutoscalerRuleMetricsDaoTransactions(tx).CreateOrUpdateScaleRuleMetricsInBatch(autoScaleRuleMetrics) } // SyncComponentEndpoints - func (s *ServiceAction) SyncComponentEndpoints(tx *gorm.DB, components []*api_model.Component) error { var ( componentIDs []string thirdPartySvcDiscoveryCfgs []*dbmodel.ThirdPartySvcDiscoveryCfg ) for _, component := range components { if component.Endpoint == nil { continue } componentIDs = append(componentIDs, component.ComponentBase.ComponentID) if component.Endpoint.Kubernetes != nil { thirdPartySvcDiscoveryCfgs = append(thirdPartySvcDiscoveryCfgs, component.Endpoint.DbModel(component.ComponentBase.ComponentID)) } } if err := db.GetManager().ThirdPartySvcDiscoveryCfgDaoTransactions(tx).DeleteByComponentIDs(componentIDs); err != nil { return err } return db.GetManager().ThirdPartySvcDiscoveryCfgDaoTransactions(tx).CreateOrUpdate3rdSvcDiscoveryCfgInBatch(thirdPartySvcDiscoveryCfgs) } // SyncComponentK8sAttributes - func (s *ServiceAction) SyncComponentK8sAttributes(tx *gorm.DB, app *dbmodel.Application, components []*api_model.Component) error { var ( componentIDs []string k8sAttributes []*dbmodel.ComponentK8sAttributes ) for _, component := range components { if component.ComponentK8sAttributes == nil || len(component.ComponentK8sAttributes) == 0 { continue } componentIDs = append(componentIDs, component.ComponentBase.ComponentID) for _, k8sAttribute := range component.ComponentK8sAttributes { k8sAttributes = append(k8sAttributes, k8sAttribute.DbModel(app.TenantID, component.ComponentBase.ComponentID)) } } if err := db.GetManager().ComponentK8sAttributeDaoTransactions(tx).DeleteByComponentIDs(componentIDs); err != nil { return err } return db.GetManager().ComponentK8sAttributeDaoTransactions(tx).CreateOrUpdateAttributesInBatch(k8sAttributes) } // Log returns the logs reader for a container in a pod, a pod or a component. func (s *ServiceAction) Log(w http.ResponseWriter, r *http.Request, component *dbmodel.TenantServices, podName, containerName string, follow bool) error { // If podName and containerName is missing, return the logs reader for the component // If containerName is missing, return the logs reader for the pod. if podName == "" || containerName == "" { // Only support return the logs reader for a container now. return errors.WithStack(bcode.NewBadRequest("the field 'podName' and 'containerName' is required")) } tenant, err := db.GetManager().TenantDao().GetTenantByUUID(component.TenantID) if err != nil { return fmt.Errorf("get tenant info failure %s", err.Error()) } request := s.kubeClient.CoreV1().Pods(tenant.Namespace).GetLogs(podName, &corev1.PodLogOptions{ Container: containerName, Follow: follow, }) out, err := request.Stream(context.TODO()) if err != nil { if k8sErrors.IsNotFound(err) { return errors.Wrap(bcode.ErrPodNotFound, "get pod log") } return errors.Wrap(err, "get stream from request") } defer out.Close() w.Header().Set("Transfer-Encoding", "chunked") w.WriteHeader(http.StatusOK) // Flush headers, if possible if flusher, ok := w.(http.Flusher); ok { flusher.Flush() } writer := flushwriter.Wrap(w) _, err = io.Copy(writer, out) if err != nil { if strings.HasSuffix(err.Error(), "write: broken pipe") { return nil } logrus.Warningf("write stream to response: %v", err) } return nil } //TransStatus trans service status func TransStatus(eStatus string) string { switch eStatus { case "starting": return "启动中" case "abnormal": return "运行异常" case "upgrade": return "升级中" case "closed": return "已关闭" case "stopping": return "关闭中" case "checking": return "检测中" case "unusual": return "运行异常" case "running": return "运行中" case "failure": return "未知" case "undeploy": return "未部署" case "deployed": return "已部署" case "succeeded": return "已完成" } return "" }