Merge branch 'master' into transport-socket

This commit is contained in:
黄润豪 2021-05-20 13:42:53 +08:00 committed by GitHub
commit e71f38db4e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 160 additions and 30 deletions

View File

@ -49,6 +49,7 @@ func CreateDBManager(conf option.Config) error {
dbCfg := config.Config{ dbCfg := config.Config{
MysqlConnectionInfo: conf.DBConnectionInfo, MysqlConnectionInfo: conf.DBConnectionInfo,
DBType: conf.DBType, DBType: conf.DBType,
ShowSQL: conf.ShowSQL,
} }
if err := db.CreateManager(dbCfg); err != nil { if err := db.CreateManager(dbCfg); err != nil {
logrus.Errorf("get db manager failed,%s", err.Error()) logrus.Errorf("get db manager failed,%s", err.Error())
@ -63,7 +64,6 @@ func CreateDBManager(conf option.Config) error {
//CreateEventManager create event manager //CreateEventManager create event manager
func CreateEventManager(conf option.Config) error { func CreateEventManager(conf option.Config) error {
var tryTime time.Duration var tryTime time.Duration
tryTime = 0
var err error var err error
etcdClientArgs := &etcdutil.ClientArgs{ etcdClientArgs := &etcdutil.ClientArgs{
Endpoints: conf.EtcdEndpoint, Endpoints: conf.EtcdEndpoint,

View File

@ -20,6 +20,7 @@ package handler
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"os" "os"
"sort" "sort"
@ -29,6 +30,7 @@ import (
"github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/clientv3"
apimodel "github.com/goodrain/rainbond/api/model" apimodel "github.com/goodrain/rainbond/api/model"
"github.com/goodrain/rainbond/api/util/bcode"
"github.com/goodrain/rainbond/db" "github.com/goodrain/rainbond/db"
"github.com/goodrain/rainbond/db/model" "github.com/goodrain/rainbond/db/model"
"github.com/goodrain/rainbond/mq/client" "github.com/goodrain/rainbond/mq/client"
@ -801,3 +803,50 @@ func (g *GatewayAction) GetGatewayIPs() []IPAndAvailablePort {
} }
return defaultIps return defaultIps
} }
// DeleteIngressRulesByComponentPort deletes ingress rules, including http rules and tcp rules, based on the given componentID and port.
func (g *GatewayAction) DeleteIngressRulesByComponentPort(tx *gorm.DB, componentID string, port int) error {
httpRuleIDs, err := g.listHTTPRuleIDs(componentID, port)
if err != nil {
return err
}
// delete rule configs
if err := db.GetManager().GwRuleConfigDaoTransactions(tx).DeleteByRuleIDs(httpRuleIDs); err != nil {
return err
}
// delete rule extentions
if err := db.GetManager().RuleExtensionDaoTransactions(tx).DeleteByRuleIDs(httpRuleIDs); err != nil {
return err
}
// delete http rules
if err := db.GetManager().HTTPRuleDaoTransactions(tx).DeleteByComponentPort(componentID, port); err != nil {
if !errors.Is(err, bcode.ErrIngressHTTPRuleNotFound) {
return err
}
}
// delete tcp rules
if err := db.GetManager().TCPRuleDaoTransactions(tx).DeleteByComponentPort(componentID, port); err != nil {
if !errors.Is(err, bcode.ErrIngressTCPRuleNotFound) {
return err
}
}
return nil
}
func (g *GatewayAction) listHTTPRuleIDs(componentID string, port int) ([]string, error) {
httpRules, err := db.GetManager().HTTPRuleDao().ListByComponentPort(componentID, port)
if err != nil {
return nil, err
}
var ruleIDs []string
for _, rule := range httpRules {
ruleIDs = append(ruleIDs, rule.UUID)
}
return ruleIDs, nil
}

View File

@ -46,4 +46,5 @@ type GatewayHandler interface {
UpdCertificate(req *apimodel.UpdCertificateReq) error UpdCertificate(req *apimodel.UpdCertificateReq) error
GetGatewayIPs() []IPAndAvailablePort GetGatewayIPs() []IPAndAvailablePort
ListHTTPRulesByCertID(certID string) ([]*dbmodel.HTTPRule, error) ListHTTPRulesByCertID(certID string) ([]*dbmodel.HTTPRule, error)
DeleteIngressRulesByComponentPort(tx *gorm.DB, componentID string, port int) error
} }

View File

@ -1082,6 +1082,23 @@ func (s *ServiceAction) CreatePorts(tenantID, serviceID string, vps *api_model.S
return nil return nil
} }
func (s *ServiceAction) deletePorts(componentID string, ports *api_model.ServicePorts) error {
return db.GetManager().DB().Transaction(func(tx *gorm.DB) error {
for _, port := range ports.Port {
if err := db.GetManager().TenantServicesPortDaoTransactions(tx).DeleteModel(componentID, port.ContainerPort); err != nil {
return err
}
// delete related ingress rules
if err := GetGatewayHandler().DeleteIngressRulesByComponentPort(tx, componentID, port.ContainerPort); err != nil {
return err
}
}
return nil
})
}
//PortVar port var //PortVar port var
func (s *ServiceAction) PortVar(action, tenantID, serviceID string, vps *api_model.ServicePorts, oldPort int) error { func (s *ServiceAction) PortVar(action, tenantID, serviceID string, vps *api_model.ServicePorts, oldPort int) error {
crt, err := db.GetManager().TenantServicePluginRelationDao().CheckSomeModelPluginByServiceID( crt, err := db.GetManager().TenantServicePluginRelationDao().CheckSomeModelPluginByServiceID(
@ -1093,25 +1110,7 @@ func (s *ServiceAction) PortVar(action, tenantID, serviceID string, vps *api_mod
} }
switch action { switch action {
case "delete": case "delete":
tx := db.GetManager().Begin() return s.deletePorts(serviceID, vps)
defer func() {
if r := recover(); r != nil {
logrus.Errorf("Unexpected panic occurred, rollback transaction: %v", r)
tx.Rollback()
}
}()
for _, vp := range vps.Port {
if err := db.GetManager().TenantServicesPortDaoTransactions(tx).DeleteModel(serviceID, vp.ContainerPort); err != nil {
logrus.Errorf("delete port var error, %v", err)
tx.Rollback()
return err
}
}
if err := tx.Commit().Error; err != nil {
tx.Rollback()
logrus.Debugf("commit delete port error, %v", err)
return err
}
case "update": case "update":
tx := db.GetManager().Begin() tx := db.GetManager().Begin()
defer func() { defer func() {

View File

@ -0,0 +1,7 @@
package bcode
// service: 11200~11299
var (
ErrIngressHTTPRuleNotFound = newByMessage(404, 11200, "http rule not found")
ErrIngressTCPRuleNotFound = newByMessage(404, 11201, "tcp rule not found")
)

View File

@ -10,4 +10,4 @@ var (
ErrServiceMonitorNameExist = newByMessage(400, 10102, "service monitor name is exist") ErrServiceMonitorNameExist = newByMessage(400, 10102, "service monitor name is exist")
// ErrSyncOperation - // ErrSyncOperation -
ErrSyncOperation = newByMessage(409, 10103, "The asynchronous operation is executing") ErrSyncOperation = newByMessage(409, 10103, "The asynchronous operation is executing")
) )

View File

@ -61,6 +61,7 @@ type Config struct {
KubeConfigPath string KubeConfigPath string
PrometheusEndpoint string PrometheusEndpoint string
RbdNamespace string RbdNamespace string
ShowSQL bool
} }
//APIServer apiserver server //APIServer apiserver server
@ -113,6 +114,7 @@ func (a *APIServer) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&a.KuberentesDashboardAPI, "k8s-dashboard-api", "kubernetes-dashboard.rbd-system:443", "The service DNS name of Kubernetes dashboard. Default to kubernetes-dashboard.kubernetes-dashboard") fs.StringVar(&a.KuberentesDashboardAPI, "k8s-dashboard-api", "kubernetes-dashboard.rbd-system:443", "The service DNS name of Kubernetes dashboard. Default to kubernetes-dashboard.kubernetes-dashboard")
fs.StringVar(&a.PrometheusEndpoint, "prom-api", "rbd-monitor:9999", "The service DNS name of Prometheus api. Default to rbd-monitor:9999") fs.StringVar(&a.PrometheusEndpoint, "prom-api", "rbd-monitor:9999", "The service DNS name of Prometheus api. Default to rbd-monitor:9999")
fs.StringVar(&a.RbdNamespace, "rbd-namespace", "rbd-system", "rbd component namespace") fs.StringVar(&a.RbdNamespace, "rbd-namespace", "rbd-system", "rbd component namespace")
fs.BoolVar(&a.ShowSQL, "show-sql", false, "The trigger for showing sql.")
} }
//SetLog 设置log //SetLog 设置log

View File

@ -27,4 +27,5 @@ type Config struct {
EtcdCertFile string EtcdCertFile string
EtcdKeyFile string EtcdKeyFile string
EtcdTimeout int EtcdTimeout int
ShowSQL bool
} }

View File

@ -460,6 +460,7 @@ type RuleExtensionDao interface {
Dao Dao
GetRuleExtensionByRuleID(ruleID string) ([]*model.RuleExtension, error) GetRuleExtensionByRuleID(ruleID string) ([]*model.RuleExtension, error)
DeleteRuleExtensionByRuleID(ruleID string) error DeleteRuleExtensionByRuleID(ruleID string) error
DeleteByRuleIDs(ruleIDs []string) error
} }
// HTTPRuleDao - // HTTPRuleDao -
@ -471,7 +472,9 @@ type HTTPRuleDao interface {
DeleteHTTPRuleByID(id string) error DeleteHTTPRuleByID(id string) error
DeleteHTTPRuleByServiceID(serviceID string) error DeleteHTTPRuleByServiceID(serviceID string) error
ListByServiceID(serviceID string) ([]*model.HTTPRule, error) ListByServiceID(serviceID string) ([]*model.HTTPRule, error)
ListByComponentPort(componentID string, port int) ([]*model.HTTPRule, error)
ListByCertID(certID string) ([]*model.HTTPRule, error) ListByCertID(certID string) ([]*model.HTTPRule, error)
DeleteByComponentPort(componentID string, port int) error
} }
// TCPRuleDao - // TCPRuleDao -
@ -484,6 +487,7 @@ type TCPRuleDao interface {
DeleteTCPRuleByServiceID(serviceID string) error DeleteTCPRuleByServiceID(serviceID string) error
ListByServiceID(serviceID string) ([]*model.TCPRule, error) ListByServiceID(serviceID string) ([]*model.TCPRule, error)
GetUsedPortsByIP(ip string) ([]*model.TCPRule, error) GetUsedPortsByIP(ip string) ([]*model.TCPRule, error)
DeleteByComponentPort(componentID string, port int) error
} }
// EndpointsDao is an interface for defining method // EndpointsDao is an interface for defining method
@ -511,6 +515,7 @@ type GwRuleConfigDao interface {
Dao Dao
DeleteByRuleID(rid string) error DeleteByRuleID(rid string) error
ListByRuleID(rid string) ([]*model.GwRuleConfig, error) ListByRuleID(rid string) ([]*model.GwRuleConfig, error)
DeleteByRuleIDs(ruleIDs []string) error
} }
// TenantServceAutoscalerRulesDao - // TenantServceAutoscalerRulesDao -

View File

@ -34,6 +34,7 @@ import (
type Manager interface { type Manager interface {
CloseManager() error CloseManager() error
Begin() *gorm.DB Begin() *gorm.DB
DB() *gorm.DB
EnsureEndTransactionFunc() func(tx *gorm.DB) EnsureEndTransactionFunc() func(tx *gorm.DB)
VolumeTypeDao() dao.VolumeTypeDao VolumeTypeDao() dao.VolumeTypeDao
LicenseDao() dao.LicenseDao LicenseDao() dao.LicenseDao

View File

@ -22,8 +22,10 @@ import (
"fmt" "fmt"
"reflect" "reflect"
"github.com/goodrain/rainbond/api/util/bcode"
"github.com/goodrain/rainbond/db/model" "github.com/goodrain/rainbond/db/model"
"github.com/jinzhu/gorm" "github.com/jinzhu/gorm"
"github.com/pkg/errors"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
@ -36,7 +38,7 @@ type CertificateDaoImpl struct {
func (c *CertificateDaoImpl) AddModel(mo model.Interface) error { func (c *CertificateDaoImpl) AddModel(mo model.Interface) error {
certificate, ok := mo.(*model.Certificate) certificate, ok := mo.(*model.Certificate)
if !ok { if !ok {
return fmt.Errorf("Can't convert %s to %s", reflect.TypeOf(mo).String(), "*model.Certificate") return fmt.Errorf("can't convert %s to %s", reflect.TypeOf(mo).String(), "*model.Certificate")
} }
var old model.Certificate var old model.Certificate
if ok := c.DB.Where("uuid = ?", certificate.UUID).Find(&old).RecordNotFound(); ok { if ok := c.DB.Where("uuid = ?", certificate.UUID).Find(&old).RecordNotFound(); ok {
@ -44,8 +46,7 @@ func (c *CertificateDaoImpl) AddModel(mo model.Interface) error {
return err return err
} }
} else { } else {
return fmt.Errorf("Certificate already exists based on certificateID(%s)", return fmt.Errorf("certificate already exists based on certificateID(%s)", certificate.UUID)
certificate.UUID)
} }
return nil return nil
} }
@ -54,7 +55,7 @@ func (c *CertificateDaoImpl) AddModel(mo model.Interface) error {
func (c *CertificateDaoImpl) UpdateModel(mo model.Interface) error { func (c *CertificateDaoImpl) UpdateModel(mo model.Interface) error {
cert, ok := mo.(*model.Certificate) cert, ok := mo.(*model.Certificate)
if !ok { if !ok {
return fmt.Errorf("Failed to convert %s to *model.Certificate", reflect.TypeOf(mo).String()) return fmt.Errorf("failed to convert %s to *model.Certificate", reflect.TypeOf(mo).String())
} }
return c.DB.Table(cert.TableName()). return c.DB.Table(cert.TableName()).
Where("uuid = ?", cert.UUID). Where("uuid = ?", cert.UUID).
@ -65,7 +66,7 @@ func (c *CertificateDaoImpl) UpdateModel(mo model.Interface) error {
func (c *CertificateDaoImpl) AddOrUpdate(mo model.Interface) error { func (c *CertificateDaoImpl) AddOrUpdate(mo model.Interface) error {
cert, ok := mo.(*model.Certificate) cert, ok := mo.(*model.Certificate)
if !ok { if !ok {
return fmt.Errorf("Failed to convert %s to *model.Certificate", reflect.TypeOf(mo).String()) return fmt.Errorf("failed to convert %s to *model.Certificate", reflect.TypeOf(mo).String())
} }
var old model.Certificate var old model.Certificate
@ -104,7 +105,7 @@ type RuleExtensionDaoImpl struct {
func (c *RuleExtensionDaoImpl) AddModel(mo model.Interface) error { func (c *RuleExtensionDaoImpl) AddModel(mo model.Interface) error {
re, ok := mo.(*model.RuleExtension) re, ok := mo.(*model.RuleExtension)
if !ok { if !ok {
return fmt.Errorf("Can't convert %s to %s", reflect.TypeOf(mo).String(), "*model.RuleExtension") return fmt.Errorf("can't convert %s to %s", reflect.TypeOf(mo).String(), "*model.RuleExtension")
} }
var old model.RuleExtension var old model.RuleExtension
if ok := c.DB.Where("rule_id = ? and value = ?", re.RuleID, re.Value).Find(&old).RecordNotFound(); ok { if ok := c.DB.Where("rule_id = ? and value = ?", re.RuleID, re.Value).Find(&old).RecordNotFound(); ok {
@ -139,6 +140,14 @@ func (c *RuleExtensionDaoImpl) DeleteRuleExtensionByRuleID(ruleID string) error
return c.DB.Where("rule_id=?", ruleID).Delete(re).Error return c.DB.Where("rule_id=?", ruleID).Delete(re).Error
} }
// DeleteByRuleIDs deletes rule extentions based on the given ruleIDs.
func (c *RuleExtensionDaoImpl) DeleteByRuleIDs(ruleIDs []string) error {
if err := c.DB.Where("rule_id in (?)", ruleIDs).Delete(&model.RuleExtension{}).Error; err != nil {
return errors.Wrap(err, "delete rule extentions")
}
return nil
}
//HTTPRuleDaoImpl http rule //HTTPRuleDaoImpl http rule
type HTTPRuleDaoImpl struct { type HTTPRuleDaoImpl struct {
DB *gorm.DB DB *gorm.DB
@ -148,7 +157,7 @@ type HTTPRuleDaoImpl struct {
func (h *HTTPRuleDaoImpl) AddModel(mo model.Interface) error { func (h *HTTPRuleDaoImpl) AddModel(mo model.Interface) error {
httpRule, ok := mo.(*model.HTTPRule) httpRule, ok := mo.(*model.HTTPRule)
if !ok { if !ok {
return fmt.Errorf("Can't not convert %s to *model.HTTPRule", reflect.TypeOf(mo).String()) return fmt.Errorf("can't not convert %s to *model.HTTPRule", reflect.TypeOf(mo).String())
} }
var oldHTTPRule model.HTTPRule var oldHTTPRule model.HTTPRule
if ok := h.DB.Where("uuid=?", httpRule.UUID).Find(&oldHTTPRule).RecordNotFound(); ok { if ok := h.DB.Where("uuid=?", httpRule.UUID).Find(&oldHTTPRule).RecordNotFound(); ok {
@ -165,7 +174,7 @@ func (h *HTTPRuleDaoImpl) AddModel(mo model.Interface) error {
func (h *HTTPRuleDaoImpl) UpdateModel(mo model.Interface) error { func (h *HTTPRuleDaoImpl) UpdateModel(mo model.Interface) error {
hr, ok := mo.(*model.HTTPRule) hr, ok := mo.(*model.HTTPRule)
if !ok { if !ok {
return fmt.Errorf("Failed to convert %s to *model.HTTPRule", reflect.TypeOf(mo).String()) return fmt.Errorf("failed to convert %s to *model.HTTPRule", reflect.TypeOf(mo).String())
} }
return h.DB.Save(hr).Error return h.DB.Save(hr).Error
} }
@ -218,6 +227,17 @@ func (h *HTTPRuleDaoImpl) DeleteHTTPRuleByID(id string) error {
return nil return nil
} }
// DeleteByComponentPort deletes http rules based on componentID and port.
func (h *HTTPRuleDaoImpl) DeleteByComponentPort(componentID string, port int) error {
if err := h.DB.Where("service_id=? and container_port=?", componentID, port).Delete(&model.HTTPRule{}).Error; err != nil {
if err == gorm.ErrRecordNotFound {
return errors.Wrap(bcode.ErrIngressHTTPRuleNotFound, "delete http rules")
}
return err
}
return nil
}
//DeleteHTTPRuleByServiceID delete http rule by service id //DeleteHTTPRuleByServiceID delete http rule by service id
func (h *HTTPRuleDaoImpl) DeleteHTTPRuleByServiceID(serviceID string) error { func (h *HTTPRuleDaoImpl) DeleteHTTPRuleByServiceID(serviceID string) error {
httpRule := &model.HTTPRule{} httpRule := &model.HTTPRule{}
@ -236,6 +256,15 @@ func (h *HTTPRuleDaoImpl) ListByServiceID(serviceID string) ([]*model.HTTPRule,
return rules, nil return rules, nil
} }
// ListByComponentPort lists http rules based on the given componentID and port.
func (h *HTTPRuleDaoImpl) ListByComponentPort(componentID string, port int) ([]*model.HTTPRule, error) {
var rules []*model.HTTPRule
if err := h.DB.Where("service_id=? and container_port=?", componentID, port).Find(&rules).Error; err != nil {
return nil, errors.Wrap(err, "list http rules")
}
return rules, nil
}
// ListByCertID lists all HTTPRules matching certificate id // ListByCertID lists all HTTPRules matching certificate id
func (h *HTTPRuleDaoImpl) ListByCertID(certID string) ([]*model.HTTPRule, error) { func (h *HTTPRuleDaoImpl) ListByCertID(certID string) ([]*model.HTTPRule, error) {
var rules []*model.HTTPRule var rules []*model.HTTPRule
@ -268,7 +297,7 @@ func (t *TCPRuleDaoTmpl) AddModel(mo model.Interface) error {
func (t *TCPRuleDaoTmpl) UpdateModel(mo model.Interface) error { func (t *TCPRuleDaoTmpl) UpdateModel(mo model.Interface) error {
tr, ok := mo.(*model.TCPRule) tr, ok := mo.(*model.TCPRule)
if !ok { if !ok {
return fmt.Errorf("Failed to convert %s to *model.TCPRule", reflect.TypeOf(mo).String()) return fmt.Errorf("failed to convert %s to *model.TCPRule", reflect.TypeOf(mo).String())
} }
return t.DB.Save(tr).Error return t.DB.Save(tr).Error
@ -323,6 +352,17 @@ func (t *TCPRuleDaoTmpl) DeleteTCPRuleByServiceID(serviceID string) error {
return t.DB.Where("service_id = ?", serviceID).Delete(tcpRule).Error return t.DB.Where("service_id = ?", serviceID).Delete(tcpRule).Error
} }
// DeleteByComponentPort deletes tcp rules based on the given component id and port.
func (t *TCPRuleDaoTmpl) DeleteByComponentPort(componentID string, port int) error {
if err := t.DB.Where("service_id=? and container_port=?", componentID, port).Delete(&model.TCPRule{}).Error; err != nil {
if err == gorm.ErrRecordNotFound {
return errors.Wrap(bcode.ErrIngressTCPRuleNotFound, "delete tcp rules")
}
return errors.Wrap(err, "delete tcp rules")
}
return nil
}
//GetUsedPortsByIP get used port by ip //GetUsedPortsByIP get used port by ip
//sort by port //sort by port
func (t *TCPRuleDaoTmpl) GetUsedPortsByIP(ip string) ([]*model.TCPRule, error) { func (t *TCPRuleDaoTmpl) GetUsedPortsByIP(ip string) ([]*model.TCPRule, error) {
@ -387,3 +427,11 @@ func (t *GwRuleConfigDaoImpl) ListByRuleID(rid string) ([]*model.GwRuleConfig, e
} }
return res, nil return res, nil
} }
// DeleteByRuleIDs deletes rule configs based on the given ruleIDs.
func (t *GwRuleConfigDaoImpl) DeleteByRuleIDs(ruleIDs []string) error {
if err := t.DB.Where("rule_id in (?)", ruleIDs).Delete(&model.GwRuleConfig{}).Error; err != nil {
return errors.Wrap(err, "delete rule configs")
}
return nil
}

View File

@ -370,6 +370,9 @@ func (t *TenantServicesDaoImpl) GetPagedTenantService(offset, length int, servic
} }
} }
tenants, err := t.DB.Raw("SELECT uuid,name,eid from tenants where uuid in (?)", tenantIDs).Rows() tenants, err := t.DB.Raw("SELECT uuid,name,eid from tenants where uuid in (?)", tenantIDs).Rows()
if err != nil {
return nil, 0, pkgerr.Wrap(err, "list tenants")
}
defer tenants.Close() defer tenants.Close()
for tenants.Next() { for tenants.Next() {
var tenantID string var tenantID string
@ -647,6 +650,9 @@ func (t *TenantServicesPortDaoImpl) DeleteModel(serviceID string, args ...interf
//Protocol: protocol, //Protocol: protocol,
} }
if err := t.DB.Where("service_id=? and container_port=?", serviceID, containerPort).Delete(tsp).Error; err != nil { if err := t.DB.Where("service_id=? and container_port=?", serviceID, containerPort).Delete(tsp).Error; err != nil {
if err == gorm.ErrRecordNotFound {
return pkgerr.Wrap(bcode.ErrPortNotFound, "delete component port")
}
return err return err
} }
return nil return nil

View File

@ -57,6 +57,9 @@ func CreateManager(config config.Config) (*Manager, error) {
return nil, err return nil, err
} }
} }
if config.ShowSQL {
db = db.Debug()
}
manager := &Manager{ manager := &Manager{
db: db, db: db,
config: config, config: config,
@ -79,6 +82,11 @@ func (m *Manager) Begin() *gorm.DB {
return m.db.Begin() return m.db.Begin()
} }
// DB returns the db.
func (m *Manager) DB() *gorm.DB {
return m.db
}
// EnsureEndTransactionFunc - // EnsureEndTransactionFunc -
func (m *Manager) EnsureEndTransactionFunc() func(tx *gorm.DB) { func (m *Manager) EnsureEndTransactionFunc() func(tx *gorm.DB) {
return func(tx *gorm.DB) { return func(tx *gorm.DB) {

View File

@ -62,6 +62,9 @@ func NewClient(ctx context.Context, conf AppRuntimeSyncClientConf) (*AppRuntimeS
KeyFile: conf.EtcdKeyFile, KeyFile: conf.EtcdKeyFile,
} }
c, err := etcdutil.NewClient(ctx, etcdClientArgs) c, err := etcdutil.NewClient(ctx, etcdClientArgs)
if err != nil {
return nil, err
}
r := &grpcutil.GRPCResolver{Client: c} r := &grpcutil.GRPCResolver{Client: c}
b := grpc.RoundRobin(r) b := grpc.RoundRobin(r)
arsc.cc, err = grpc.DialContext(ctx, "/rainbond/discover/app_sync_runtime_server", grpc.WithBalancer(b), grpc.WithInsecure(), grpc.WithBlock()) arsc.cc, err = grpc.DialContext(ctx, "/rainbond/discover/app_sync_runtime_server", grpc.WithBalancer(b), grpc.WithInsecure(), grpc.WithBlock())