From 6e319f8b8bf917959e6ce56f1e2d4ac1d83fb241 Mon Sep 17 00:00:00 2001 From: GLYASAI Date: Thu, 20 May 2021 00:01:37 +0800 Subject: [PATCH 1/2] delete component port --- api/db/db.go | 2 +- api/handler/gateway_action.go | 49 +++++++++++++++++++++++++ api/handler/gateway_handler.go | 1 + api/handler/service.go | 37 ++++++++++--------- api/util/bcode/ingress.go | 7 ++++ api/util/bcode/service.go | 2 +- cmd/api/option/option.go | 2 ++ db/config/config.go | 1 + db/dao/dao.go | 5 +++ db/db.go | 1 + db/mysql/dao/gateway.go | 66 +++++++++++++++++++++++++++++----- db/mysql/dao/tenants.go | 7 ++++ db/mysql/mysql.go | 8 +++++ worker/client/client.go | 2 +- 14 files changed, 159 insertions(+), 31 deletions(-) create mode 100644 api/util/bcode/ingress.go diff --git a/api/db/db.go b/api/db/db.go index 02a43710e..91f38a558 100644 --- a/api/db/db.go +++ b/api/db/db.go @@ -49,6 +49,7 @@ func CreateDBManager(conf option.Config) error { dbCfg := config.Config{ MysqlConnectionInfo: conf.DBConnectionInfo, DBType: conf.DBType, + ShowSQL: conf.ShowSQL, } if err := db.CreateManager(dbCfg); err != nil { logrus.Errorf("get db manager failed,%s", err.Error()) @@ -63,7 +64,6 @@ func CreateDBManager(conf option.Config) error { //CreateEventManager create event manager func CreateEventManager(conf option.Config) error { var tryTime time.Duration - tryTime = 0 var err error etcdClientArgs := &etcdutil.ClientArgs{ Endpoints: conf.EtcdEndpoint, diff --git a/api/handler/gateway_action.go b/api/handler/gateway_action.go index 086185fd9..a636e7f12 100644 --- a/api/handler/gateway_action.go +++ b/api/handler/gateway_action.go @@ -20,6 +20,7 @@ package handler import ( "context" + "errors" "fmt" "os" "sort" @@ -29,6 +30,7 @@ import ( "github.com/coreos/etcd/clientv3" apimodel "github.com/goodrain/rainbond/api/model" + "github.com/goodrain/rainbond/api/util/bcode" "github.com/goodrain/rainbond/db" "github.com/goodrain/rainbond/db/model" "github.com/goodrain/rainbond/mq/client" @@ -801,3 +803,50 @@ func (g *GatewayAction) GetGatewayIPs() []IPAndAvailablePort { } return defaultIps } + +// DeleteIngressRulesByComponentPort deletes ingress rules, including http rules and tcp rules, based on the given componentID and port. +func (g *GatewayAction) DeleteIngressRulesByComponentPort(tx *gorm.DB, componentID string, port int) error { + httpRuleIDs, err := g.listHTTPRuleIDs(componentID, port) + if err != nil { + return err + } + + // delete rule configs + if err := db.GetManager().GwRuleConfigDaoTransactions(tx).DeleteByRuleIDs(httpRuleIDs); err != nil { + return err + } + + // delete rule extentions + if err := db.GetManager().RuleExtensionDaoTransactions(tx).DeleteByRuleIDs(httpRuleIDs); err != nil { + return err + } + + // delete http rules + if err := db.GetManager().HTTPRuleDaoTransactions(tx).DeleteByComponentPort(componentID, port); err != nil { + if !errors.Is(err, bcode.ErrIngressHTTPRuleNotFound) { + return err + } + } + + // delete tcp rules + if err := db.GetManager().TCPRuleDaoTransactions(tx).DeleteByComponentPort(componentID, port); err != nil { + if !errors.Is(err, bcode.ErrIngressTCPRuleNotFound) { + return err + } + } + + return nil +} + +func (g *GatewayAction) listHTTPRuleIDs(componentID string, port int) ([]string, error) { + httpRules, err := db.GetManager().HTTPRuleDao().ListByComponentPort(componentID, port) + if err != nil { + return nil, err + } + + var ruleIDs []string + for _, rule := range httpRules { + ruleIDs = append(ruleIDs, rule.UUID) + } + return ruleIDs, nil +} diff --git a/api/handler/gateway_handler.go b/api/handler/gateway_handler.go index b785c5221..eaaad940a 100644 --- a/api/handler/gateway_handler.go +++ b/api/handler/gateway_handler.go @@ -46,4 +46,5 @@ type GatewayHandler interface { UpdCertificate(req *apimodel.UpdCertificateReq) error GetGatewayIPs() []IPAndAvailablePort ListHTTPRulesByCertID(certID string) ([]*dbmodel.HTTPRule, error) + DeleteIngressRulesByComponentPort(tx *gorm.DB, componentID string, port int) error } diff --git a/api/handler/service.go b/api/handler/service.go index 0e29ecaa9..a53a6c17b 100644 --- a/api/handler/service.go +++ b/api/handler/service.go @@ -1082,6 +1082,23 @@ func (s *ServiceAction) CreatePorts(tenantID, serviceID string, vps *api_model.S return nil } +func (s *ServiceAction) deletePorts(componentID string, ports *api_model.ServicePorts) error { + return db.GetManager().DB().Transaction(func(tx *gorm.DB) error { + for _, port := range ports.Port { + if err := db.GetManager().TenantServicesPortDaoTransactions(tx).DeleteModel(componentID, port.ContainerPort); err != nil { + return err + } + + // delete related ingress rules + if err := GetGatewayHandler().DeleteIngressRulesByComponentPort(tx, componentID, port.ContainerPort); err != nil { + return err + } + } + + return nil + }) +} + //PortVar port var func (s *ServiceAction) PortVar(action, tenantID, serviceID string, vps *api_model.ServicePorts, oldPort int) error { crt, err := db.GetManager().TenantServicePluginRelationDao().CheckSomeModelPluginByServiceID( @@ -1093,25 +1110,7 @@ func (s *ServiceAction) PortVar(action, tenantID, serviceID string, vps *api_mod } switch action { case "delete": - tx := db.GetManager().Begin() - defer func() { - if r := recover(); r != nil { - logrus.Errorf("Unexpected panic occurred, rollback transaction: %v", r) - tx.Rollback() - } - }() - for _, vp := range vps.Port { - if err := db.GetManager().TenantServicesPortDaoTransactions(tx).DeleteModel(serviceID, vp.ContainerPort); err != nil { - logrus.Errorf("delete port var error, %v", err) - tx.Rollback() - return err - } - } - if err := tx.Commit().Error; err != nil { - tx.Rollback() - logrus.Debugf("commit delete port error, %v", err) - return err - } + return s.deletePorts(serviceID, vps) case "update": tx := db.GetManager().Begin() defer func() { diff --git a/api/util/bcode/ingress.go b/api/util/bcode/ingress.go new file mode 100644 index 000000000..e73b9594b --- /dev/null +++ b/api/util/bcode/ingress.go @@ -0,0 +1,7 @@ +package bcode + +// service: 11200~11299 +var ( + ErrIngressHTTPRuleNotFound = newByMessage(404, 11200, "http rule not found") + ErrIngressTCPRuleNotFound = newByMessage(404, 11201, "tcp rule not found") +) diff --git a/api/util/bcode/service.go b/api/util/bcode/service.go index 366609976..8c1d57677 100644 --- a/api/util/bcode/service.go +++ b/api/util/bcode/service.go @@ -10,4 +10,4 @@ var ( ErrServiceMonitorNameExist = newByMessage(400, 10102, "service monitor name is exist") // ErrSyncOperation - ErrSyncOperation = newByMessage(409, 10103, "The asynchronous operation is executing") -) +) \ No newline at end of file diff --git a/cmd/api/option/option.go b/cmd/api/option/option.go index 2112168b8..65477e297 100644 --- a/cmd/api/option/option.go +++ b/cmd/api/option/option.go @@ -61,6 +61,7 @@ type Config struct { KubeConfigPath string PrometheusEndpoint string RbdNamespace string + ShowSQL bool } //APIServer apiserver server @@ -113,6 +114,7 @@ func (a *APIServer) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&a.KuberentesDashboardAPI, "k8s-dashboard-api", "kubernetes-dashboard.rbd-system:443", "The service DNS name of Kubernetes dashboard. Default to kubernetes-dashboard.kubernetes-dashboard") fs.StringVar(&a.PrometheusEndpoint, "prom-api", "rbd-monitor:9999", "The service DNS name of Prometheus api. Default to rbd-monitor:9999") fs.StringVar(&a.RbdNamespace, "rbd-namespace", "rbd-system", "rbd component namespace") + fs.BoolVar(&a.ShowSQL, "show-sql", false, "The trigger for showing sql.") } //SetLog 设置log diff --git a/db/config/config.go b/db/config/config.go index 59b71c901..cf48adcb3 100644 --- a/db/config/config.go +++ b/db/config/config.go @@ -27,4 +27,5 @@ type Config struct { EtcdCertFile string EtcdKeyFile string EtcdTimeout int + ShowSQL bool } diff --git a/db/dao/dao.go b/db/dao/dao.go index 5a188d8ab..9da085f0f 100644 --- a/db/dao/dao.go +++ b/db/dao/dao.go @@ -458,6 +458,7 @@ type RuleExtensionDao interface { Dao GetRuleExtensionByRuleID(ruleID string) ([]*model.RuleExtension, error) DeleteRuleExtensionByRuleID(ruleID string) error + DeleteByRuleIDs(ruleIDs []string) error } // HTTPRuleDao - @@ -469,7 +470,9 @@ type HTTPRuleDao interface { DeleteHTTPRuleByID(id string) error DeleteHTTPRuleByServiceID(serviceID string) error ListByServiceID(serviceID string) ([]*model.HTTPRule, error) + ListByComponentPort(componentID string, port int) ([]*model.HTTPRule, error) ListByCertID(certID string) ([]*model.HTTPRule, error) + DeleteByComponentPort(componentID string, port int) error } // TCPRuleDao - @@ -482,6 +485,7 @@ type TCPRuleDao interface { DeleteTCPRuleByServiceID(serviceID string) error ListByServiceID(serviceID string) ([]*model.TCPRule, error) GetUsedPortsByIP(ip string) ([]*model.TCPRule, error) + DeleteByComponentPort(componentID string, port int) error } // EndpointsDao is an interface for defining method @@ -509,6 +513,7 @@ type GwRuleConfigDao interface { Dao DeleteByRuleID(rid string) error ListByRuleID(rid string) ([]*model.GwRuleConfig, error) + DeleteByRuleIDs(ruleIDs []string) error } // TenantServceAutoscalerRulesDao - diff --git a/db/db.go b/db/db.go index ec5ea4aac..7d2af901e 100644 --- a/db/db.go +++ b/db/db.go @@ -34,6 +34,7 @@ import ( type Manager interface { CloseManager() error Begin() *gorm.DB + DB() *gorm.DB EnsureEndTransactionFunc() func(tx *gorm.DB) VolumeTypeDao() dao.VolumeTypeDao LicenseDao() dao.LicenseDao diff --git a/db/mysql/dao/gateway.go b/db/mysql/dao/gateway.go index 4fed08e4a..614a509c1 100644 --- a/db/mysql/dao/gateway.go +++ b/db/mysql/dao/gateway.go @@ -22,8 +22,10 @@ import ( "fmt" "reflect" + "github.com/goodrain/rainbond/api/util/bcode" "github.com/goodrain/rainbond/db/model" "github.com/jinzhu/gorm" + "github.com/pkg/errors" "github.com/sirupsen/logrus" ) @@ -36,7 +38,7 @@ type CertificateDaoImpl struct { func (c *CertificateDaoImpl) AddModel(mo model.Interface) error { certificate, ok := mo.(*model.Certificate) if !ok { - return fmt.Errorf("Can't convert %s to %s", reflect.TypeOf(mo).String(), "*model.Certificate") + return fmt.Errorf("can't convert %s to %s", reflect.TypeOf(mo).String(), "*model.Certificate") } var old model.Certificate if ok := c.DB.Where("uuid = ?", certificate.UUID).Find(&old).RecordNotFound(); ok { @@ -44,8 +46,7 @@ func (c *CertificateDaoImpl) AddModel(mo model.Interface) error { return err } } else { - return fmt.Errorf("Certificate already exists based on certificateID(%s)", - certificate.UUID) + return fmt.Errorf("certificate already exists based on certificateID(%s)", certificate.UUID) } return nil } @@ -54,7 +55,7 @@ func (c *CertificateDaoImpl) AddModel(mo model.Interface) error { func (c *CertificateDaoImpl) UpdateModel(mo model.Interface) error { cert, ok := mo.(*model.Certificate) if !ok { - return fmt.Errorf("Failed to convert %s to *model.Certificate", reflect.TypeOf(mo).String()) + return fmt.Errorf("failed to convert %s to *model.Certificate", reflect.TypeOf(mo).String()) } return c.DB.Table(cert.TableName()). Where("uuid = ?", cert.UUID). @@ -65,7 +66,7 @@ func (c *CertificateDaoImpl) UpdateModel(mo model.Interface) error { func (c *CertificateDaoImpl) AddOrUpdate(mo model.Interface) error { cert, ok := mo.(*model.Certificate) if !ok { - return fmt.Errorf("Failed to convert %s to *model.Certificate", reflect.TypeOf(mo).String()) + return fmt.Errorf("failed to convert %s to *model.Certificate", reflect.TypeOf(mo).String()) } var old model.Certificate @@ -104,7 +105,7 @@ type RuleExtensionDaoImpl struct { func (c *RuleExtensionDaoImpl) AddModel(mo model.Interface) error { re, ok := mo.(*model.RuleExtension) if !ok { - return fmt.Errorf("Can't convert %s to %s", reflect.TypeOf(mo).String(), "*model.RuleExtension") + return fmt.Errorf("can't convert %s to %s", reflect.TypeOf(mo).String(), "*model.RuleExtension") } var old model.RuleExtension if ok := c.DB.Where("rule_id = ? and value = ?", re.RuleID, re.Value).Find(&old).RecordNotFound(); ok { @@ -139,6 +140,14 @@ func (c *RuleExtensionDaoImpl) DeleteRuleExtensionByRuleID(ruleID string) error return c.DB.Where("rule_id=?", ruleID).Delete(re).Error } +// DeleteByRuleIDs deletes rule extentions based on the given ruleIDs. +func (c *RuleExtensionDaoImpl) DeleteByRuleIDs(ruleIDs []string) error { + if err := c.DB.Where("rule_id in (?)", ruleIDs).Delete(&model.RuleExtension{}).Error; err != nil { + return errors.Wrap(err, "delete rule extentions") + } + return nil +} + //HTTPRuleDaoImpl http rule type HTTPRuleDaoImpl struct { DB *gorm.DB @@ -148,7 +157,7 @@ type HTTPRuleDaoImpl struct { func (h *HTTPRuleDaoImpl) AddModel(mo model.Interface) error { httpRule, ok := mo.(*model.HTTPRule) if !ok { - return fmt.Errorf("Can't not convert %s to *model.HTTPRule", reflect.TypeOf(mo).String()) + return fmt.Errorf("can't not convert %s to *model.HTTPRule", reflect.TypeOf(mo).String()) } var oldHTTPRule model.HTTPRule if ok := h.DB.Where("uuid=?", httpRule.UUID).Find(&oldHTTPRule).RecordNotFound(); ok { @@ -165,7 +174,7 @@ func (h *HTTPRuleDaoImpl) AddModel(mo model.Interface) error { func (h *HTTPRuleDaoImpl) UpdateModel(mo model.Interface) error { hr, ok := mo.(*model.HTTPRule) if !ok { - return fmt.Errorf("Failed to convert %s to *model.HTTPRule", reflect.TypeOf(mo).String()) + return fmt.Errorf("failed to convert %s to *model.HTTPRule", reflect.TypeOf(mo).String()) } return h.DB.Save(hr).Error } @@ -218,6 +227,17 @@ func (h *HTTPRuleDaoImpl) DeleteHTTPRuleByID(id string) error { return nil } +// DeleteByComponentPort deletes http rules based on componentID and port. +func (h *HTTPRuleDaoImpl) DeleteByComponentPort(componentID string, port int) error { + if err := h.DB.Where("service_id=? and container_port=?", componentID, port).Delete(&model.HTTPRule{}).Error; err != nil { + if err == gorm.ErrRecordNotFound { + return errors.Wrap(bcode.ErrIngressHTTPRuleNotFound, "delete http rules") + } + return err + } + return nil +} + //DeleteHTTPRuleByServiceID delete http rule by service id func (h *HTTPRuleDaoImpl) DeleteHTTPRuleByServiceID(serviceID string) error { httpRule := &model.HTTPRule{} @@ -236,6 +256,15 @@ func (h *HTTPRuleDaoImpl) ListByServiceID(serviceID string) ([]*model.HTTPRule, return rules, nil } +// ListByComponentPort lists http rules based on the given componentID and port. +func (h *HTTPRuleDaoImpl) ListByComponentPort(componentID string, port int) ([]*model.HTTPRule, error) { + var rules []*model.HTTPRule + if err := h.DB.Where("service_id=? and container_port=?", componentID, port).Find(&rules).Error; err != nil { + return nil, errors.Wrap(err, "list http rules") + } + return rules, nil +} + // ListByCertID lists all HTTPRules matching certificate id func (h *HTTPRuleDaoImpl) ListByCertID(certID string) ([]*model.HTTPRule, error) { var rules []*model.HTTPRule @@ -268,7 +297,7 @@ func (t *TCPRuleDaoTmpl) AddModel(mo model.Interface) error { func (t *TCPRuleDaoTmpl) UpdateModel(mo model.Interface) error { tr, ok := mo.(*model.TCPRule) if !ok { - return fmt.Errorf("Failed to convert %s to *model.TCPRule", reflect.TypeOf(mo).String()) + return fmt.Errorf("failed to convert %s to *model.TCPRule", reflect.TypeOf(mo).String()) } return t.DB.Save(tr).Error @@ -323,6 +352,17 @@ func (t *TCPRuleDaoTmpl) DeleteTCPRuleByServiceID(serviceID string) error { return t.DB.Where("service_id = ?", serviceID).Delete(tcpRule).Error } +// DeleteByComponentPort deletes tcp rules based on the given component id and port. +func (t *TCPRuleDaoTmpl) DeleteByComponentPort(componentID string, port int) error { + if err := t.DB.Where("service_id=? and container_port=?", componentID, port).Delete(&model.TCPRule{}).Error; err != nil { + if err == gorm.ErrRecordNotFound { + return errors.Wrap(bcode.ErrIngressTCPRuleNotFound, "delete tcp rules") + } + return errors.Wrap(err, "delete tcp rules") + } + return nil +} + //GetUsedPortsByIP get used port by ip //sort by port func (t *TCPRuleDaoTmpl) GetUsedPortsByIP(ip string) ([]*model.TCPRule, error) { @@ -387,3 +427,11 @@ func (t *GwRuleConfigDaoImpl) ListByRuleID(rid string) ([]*model.GwRuleConfig, e } return res, nil } + +// DeleteByRuleIDs deletes rule configs based on the given ruleIDs. +func (t *GwRuleConfigDaoImpl) DeleteByRuleIDs(ruleIDs []string) error { + if err := t.DB.Where("rule_id in (?)", ruleIDs).Delete(&model.GwRuleConfig{}).Error; err != nil { + return errors.Wrap(err, "delete rule configs") + } + return nil +} diff --git a/db/mysql/dao/tenants.go b/db/mysql/dao/tenants.go index b702fd08c..828ffaa95 100644 --- a/db/mysql/dao/tenants.go +++ b/db/mysql/dao/tenants.go @@ -30,6 +30,7 @@ import ( "github.com/goodrain/rainbond/db/errors" "github.com/goodrain/rainbond/db/model" "github.com/jinzhu/gorm" + perr "github.com/pkg/errors" "github.com/sirupsen/logrus" ) @@ -369,6 +370,9 @@ func (t *TenantServicesDaoImpl) GetPagedTenantService(offset, length int, servic } } tenants, err := t.DB.Raw("SELECT uuid,name,eid from tenants where uuid in (?)", tenantIDs).Rows() + if err != nil { + return nil, 0, perr.Wrap(err, "list tenants") + } defer tenants.Close() for tenants.Next() { var tenantID string @@ -637,6 +641,9 @@ func (t *TenantServicesPortDaoImpl) DeleteModel(serviceID string, args ...interf //Protocol: protocol, } if err := t.DB.Where("service_id=? and container_port=?", serviceID, containerPort).Delete(tsp).Error; err != nil { + if err == gorm.ErrRecordNotFound { + return perr.Wrap(bcode.ErrPortNotFound, "delete component port") + } return err } return nil diff --git a/db/mysql/mysql.go b/db/mysql/mysql.go index 1662257db..4e88c590d 100644 --- a/db/mysql/mysql.go +++ b/db/mysql/mysql.go @@ -57,6 +57,9 @@ func CreateManager(config config.Config) (*Manager, error) { return nil, err } } + if config.ShowSQL { + db = db.Debug() + } manager := &Manager{ db: db, config: config, @@ -79,6 +82,11 @@ func (m *Manager) Begin() *gorm.DB { return m.db.Begin() } +// DB returns the db. +func (m *Manager) DB() *gorm.DB { + return m.db +} + // EnsureEndTransactionFunc - func (m *Manager) EnsureEndTransactionFunc() func(tx *gorm.DB) { return func(tx *gorm.DB) { diff --git a/worker/client/client.go b/worker/client/client.go index 3b0eaf679..dff71320b 100644 --- a/worker/client/client.go +++ b/worker/client/client.go @@ -64,7 +64,7 @@ func NewClient(ctx context.Context, conf AppRuntimeSyncClientConf) (*AppRuntimeS c, err := etcdutil.NewClient(ctx, etcdClientArgs) r := &grpcutil.GRPCResolver{Client: c} b := grpc.RoundRobin(r) - arsc.cc, err = grpc.DialContext(ctx, "/rainbond/discover/app_sync_runtime_server", grpc.WithBalancer(b), grpc.WithInsecure(), grpc.WithBlock()) + arsc.cc, err = grpc.DialContext(ctx, "/rainbond/discover/app_sync_runtime_server", grpc.WithBalancer(b), grpc.WithInsecure()) if err != nil { return nil, err } From 02798558107ee1da50c917f5a4bd758a201b76e9 Mon Sep 17 00:00:00 2001 From: GLYASAI Date: Thu, 20 May 2021 08:29:46 +0800 Subject: [PATCH 2/2] with block --- worker/client/client.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/worker/client/client.go b/worker/client/client.go index dff71320b..f862a4589 100644 --- a/worker/client/client.go +++ b/worker/client/client.go @@ -62,9 +62,12 @@ func NewClient(ctx context.Context, conf AppRuntimeSyncClientConf) (*AppRuntimeS KeyFile: conf.EtcdKeyFile, } c, err := etcdutil.NewClient(ctx, etcdClientArgs) + if err != nil { + return nil, err + } r := &grpcutil.GRPCResolver{Client: c} b := grpc.RoundRobin(r) - arsc.cc, err = grpc.DialContext(ctx, "/rainbond/discover/app_sync_runtime_server", grpc.WithBalancer(b), grpc.WithInsecure()) + arsc.cc, err = grpc.DialContext(ctx, "/rainbond/discover/app_sync_runtime_server", grpc.WithBalancer(b), grpc.WithInsecure(), grpc.WithBlock()) if err != nil { return nil, err }