Merge pull request #1000 from yangkaa/master-install1

Fix Bug: Create components to support all attributes
This commit is contained in:
barnettZQG 2021-05-26 15:02:46 +08:00 committed by GitHub
commit 1428a5b59d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 450 additions and 86 deletions

View File

@ -57,6 +57,25 @@ func CreateGatewayManager(dbmanager db.Manager, mqclient client.MQClient, etcdCl
// AddHTTPRule adds http rule to db if it doesn't exists.
func (g *GatewayAction) AddHTTPRule(req *apimodel.AddHTTPRuleStruct) error {
if err := db.GetManager().DB().Transaction(func(tx *gorm.DB) error {
return g.CreateHTTPRule(tx, req)
}); err != nil {
return err
}
// Effective immediately
if err := g.SendTask(map[string]interface{}{
"service_id": req.ServiceID,
"action": "add-http-rule",
"limit": map[string]string{"domain": req.Domain},
}); err != nil {
logrus.Errorf("send runtime message about gateway failure %s", err.Error())
}
return nil
}
// CreateHTTPRule Create http rules through transactions
func (g *GatewayAction) CreateHTTPRule(tx *gorm.DB, req *apimodel.AddHTTPRuleStruct) error {
httpRule := &model.HTTPRule{
UUID: req.HTTPRuleID,
ServiceID: req.ServiceID,
@ -74,17 +93,7 @@ func (g *GatewayAction) AddHTTPRule(req *apimodel.AddHTTPRuleStruct) error {
IP: req.IP,
CertificateID: req.CertificateID,
}
// begin transaction
tx := db.GetManager().Begin()
defer func() {
if r := recover(); r != nil {
logrus.Errorf("Unexpected panic occurred, rollback transaction: %v", r)
tx.Rollback()
}
}()
if err := db.GetManager().HTTPRuleDaoTransactions(tx).AddModel(httpRule); err != nil {
tx.Rollback()
return fmt.Errorf("create http rule: %v", err)
}
@ -96,7 +105,6 @@ func (g *GatewayAction) AddHTTPRule(req *apimodel.AddHTTPRuleStruct) error {
PrivateKey: req.PrivateKey,
}
if err := db.GetManager().CertificateDaoTransactions(tx).AddOrUpdate(cert); err != nil {
tx.Rollback()
return fmt.Errorf("create or update http rule: %v", err)
}
}
@ -109,24 +117,9 @@ func (g *GatewayAction) AddHTTPRule(req *apimodel.AddHTTPRuleStruct) error {
Value: ruleExtension.Value,
}
if err := db.GetManager().RuleExtensionDaoTransactions(tx).AddModel(re); err != nil {
tx.Rollback()
return fmt.Errorf("create rule extensions: %v", err)
}
}
// end transaction
if err := tx.Commit().Error; err != nil {
tx.Rollback()
return fmt.Errorf("commit transaction: %v", err)
}
// Effective immediately
if err := g.SendTask(map[string]interface{}{
"service_id": req.ServiceID,
"action": "add-http-rule",
"limit": map[string]string{"domain": req.Domain},
}); err != nil {
logrus.Errorf("send runtime message about gateway failure %s", err.Error())
}
return nil
}
@ -330,14 +323,23 @@ func (g *GatewayAction) UpdateCertificate(req apimodel.AddHTTPRuleStruct, httpRu
// AddTCPRule adds tcp rule.
func (g *GatewayAction) AddTCPRule(req *apimodel.AddTCPRuleStruct) error {
// begin transaction
tx := db.GetManager().Begin()
defer func() {
if r := recover(); r != nil {
logrus.Errorf("Unexpected panic occurred, rollback transaction: %v", r)
tx.Rollback()
if err := g.dbmanager.DB().Transaction(func(tx *gorm.DB) error {
return g.CreateTCPRule(tx, req)
}); err != nil {
return err
}
}()
if err := g.SendTask(map[string]interface{}{
"service_id": req.ServiceID,
"action": "add-tcp-rule",
"limit": map[string]string{"tcp-address": fmt.Sprintf("%s:%d", req.IP, req.Port)},
}); err != nil {
logrus.Errorf("send runtime message about gateway failure %s", err.Error())
}
return nil
}
// CreateTCPRule Create tcp rules through transactions
func (g *GatewayAction) CreateTCPRule(tx *gorm.DB, req *apimodel.AddTCPRuleStruct) error {
// add tcp rule
tcpRule := &model.TCPRule{
UUID: req.TCPRuleID,
@ -347,7 +349,6 @@ func (g *GatewayAction) AddTCPRule(req *apimodel.AddTCPRuleStruct) error {
Port: req.Port,
}
if err := g.dbmanager.TCPRuleDaoTransactions(tx).AddModel(tcpRule); err != nil {
tx.Rollback()
return err
}
// add rule extensions
@ -358,22 +359,9 @@ func (g *GatewayAction) AddTCPRule(req *apimodel.AddTCPRuleStruct) error {
Value: ruleExtension.Value,
}
if err := g.dbmanager.RuleExtensionDaoTransactions(tx).AddModel(re); err != nil {
tx.Rollback()
return err
}
}
// end transaction
if err := tx.Commit().Error; err != nil {
return err
}
if err := g.SendTask(map[string]interface{}{
"service_id": tcpRule.ServiceID,
"action": "add-tcp-rule",
"limit": map[string]string{"tcp-address": fmt.Sprintf("%s:%d", tcpRule.IP, tcpRule.Port)},
}); err != nil {
logrus.Errorf("send runtime message about gateway failure %s", err.Error())
}
return nil
}

View File

@ -27,6 +27,7 @@ import (
//GatewayHandler gateway api handler
type GatewayHandler interface {
AddHTTPRule(req *apimodel.AddHTTPRuleStruct) error
CreateHTTPRule(tx *gorm.DB, req *apimodel.AddHTTPRuleStruct) error
UpdateHTTPRule(req *apimodel.UpdateHTTPRuleStruct) error
DeleteHTTPRule(req *apimodel.DeleteHTTPRuleStruct) error
DeleteHTTPRuleByServiceIDWithTransaction(sid string, tx *gorm.DB) error
@ -35,6 +36,7 @@ type GatewayHandler interface {
UpdateCertificate(req apimodel.AddHTTPRuleStruct, httpRule *dbmodel.HTTPRule, tx *gorm.DB) error
AddTCPRule(req *apimodel.AddTCPRuleStruct) error
CreateTCPRule(tx *gorm.DB, req *apimodel.AddTCPRuleStruct) error
UpdateTCPRule(req *apimodel.UpdateTCPRuleStruct, minPort int) error
DeleteTCPRule(req *apimodel.DeleteTCPRuleStruct) error
DeleteTCPRuleByServiceIDWithTransaction(sid string, tx *gorm.DB) error

View File

@ -45,7 +45,6 @@ import (
"github.com/twinj/uuid"
api_model "github.com/goodrain/rainbond/api/model"
dberrors "github.com/goodrain/rainbond/db/errors"
core_model "github.com/goodrain/rainbond/db/model"
dbmodel "github.com/goodrain/rainbond/db/model"
eventutil "github.com/goodrain/rainbond/eventlog/util"
@ -246,6 +245,7 @@ func (s *ServiceAction) isWindowsService(serviceID string) bool {
//AddLabel add labels
func (s *ServiceAction) AddLabel(l *api_model.LabelsStruct, serviceID string) error {
tx := db.GetManager().Begin()
defer func() {
if r := recover(); r != nil {
@ -366,7 +366,7 @@ func (s *ServiceAction) StartStopService(sss *api_model.StartStopStruct) error {
func (s *ServiceAction) ServiceVertical(vs *model.VerticalScalingTaskBody) error {
service, err := db.GetManager().TenantServiceDao().GetServiceByID(vs.ServiceID)
if err != nil {
logrus.Errorf("get service by id %s error, %s", service.ServiceID, err)
logrus.Errorf("get service by id %s error, %s", vs.ServiceID, err)
return err
}
oldMemory := service.ContainerMemory
@ -506,11 +506,17 @@ func (s *ServiceAction) ServiceCreate(sc *api_model.ServiceStruct) error {
ts.ServiceName = ts.ServiceAlias
}
ts.UpdateTime = time.Now()
ports := sc.PortsInfo
envs := sc.EnvsInfo
volumns := sc.VolumesInfo
dependVolumes := sc.DepVolumesInfo
dependIds := sc.DependIDs
var (
ports = sc.PortsInfo
envs = sc.EnvsInfo
volumns = sc.VolumesInfo
dependVolumes = sc.DepVolumesInfo
dependIds = sc.DependIDs
probes = sc.ComponentProbes
monitors = sc.ComponentMonitors
httpRules = sc.HTTPRules
tcpRules = sc.TCPRules
)
ts.AppID = sc.AppID
ts.DeployVersion = ""
tx := db.GetManager().Begin()
@ -528,36 +534,32 @@ func (s *ServiceAction) ServiceCreate(sc *api_model.ServiceStruct) error {
}
//set app envs
if len(envs) > 0 {
var batchEnvs []*dbmodel.TenantServiceEnvVar
for _, env := range envs {
env.ServiceID = ts.ServiceID
env.TenantID = ts.TenantID
if err := db.GetManager().TenantServiceEnvVarDaoTransactions(tx).AddModel(&env); err != nil {
logrus.Errorf("add env[name=%s] error, %v", env.AttrName, err)
if err != dberrors.ErrRecordAlreadyExist {
batchEnvs = append(batchEnvs, &env)
}
if err := db.GetManager().TenantServiceEnvVarDaoTransactions(tx).CreateOrUpdateEnvsInBatch(batchEnvs);err != nil {
logrus.Errorf("batch add env error, %v", err)
tx.Rollback()
return err
}
logrus.Warningf("recover env[name=%s]", env.AttrName)
// if env already exists, update it
if err = db.GetManager().TenantServiceEnvVarDaoTransactions(tx).UpdateModel(&env); err != nil {
tx.Rollback()
return err
}
}
}
}
//set app port
if len(ports) > 0 {
var batchPorts []*dbmodel.TenantServicesPort
for _, port := range ports {
port.ServiceID = ts.ServiceID
port.TenantID = ts.TenantID
if err := db.GetManager().TenantServicesPortDaoTransactions(tx).AddModel(&port); err != nil {
logrus.Errorf("add port %v error, %v", port.ContainerPort, err)
batchPorts = append(batchPorts, &port)
}
if err := db.GetManager().TenantServicesPortDaoTransactions(tx).CreateOrUpdatePortsInBatch(batchPorts); err != nil {
logrus.Errorf("batch add port error, %v", err)
tx.Rollback()
return err
}
}
}
//set app volumns
if len(volumns) > 0 {
localPath := os.Getenv("LOCAL_DATA_PATH")
@ -750,6 +752,65 @@ func (s *ServiceAction) ServiceCreate(sc *api_model.ServiceStruct) error {
}
}
}
if len(probes) > 0 {
for _, pb := range probes {
probe := s.convertProbeModel(&pb, ts.ServiceID)
if err := db.GetManager().ServiceProbeDaoTransactions(tx).AddModel(probe); err != nil {
logrus.Errorf("add probe %v error, %v", probe.ProbeID, err)
tx.Rollback()
return err
}
}
}
if len(monitors) > 0 {
for _, m := range monitors {
monitor := dbmodel.TenantServiceMonitor{
Name: m.Name,
TenantID: ts.TenantID,
ServiceID: ts.ServiceID,
ServiceShowName: m.ServiceShowName,
Port: m.Port,
Path: m.Path,
Interval: m.Interval,
}
if err := db.GetManager().TenantServiceMonitorDaoTransactions(tx).AddModel(&monitor); err != nil {
logrus.Errorf("add monitor %v error, %v", monitor.Name, err)
tx.Rollback()
return err
}
}
}
if len(httpRules) > 0 {
for _, httpRule := range httpRules {
if err := GetGatewayHandler().CreateHTTPRule(tx, &httpRule); err != nil {
logrus.Errorf("add service http rule error %v", err)
tx.Rollback()
return err
}
}
}
if len(tcpRules) > 0 {
for _, tcpRule := range tcpRules {
if err := GetGatewayHandler().CreateTCPRule(tx, &tcpRule); err != nil {
logrus.Errorf("add service tcp rule error %v", err)
tx.Rollback()
return err
}
}
}
labelModel := dbmodel.TenantServiceLable{
ServiceID: ts.ServiceID,
LabelKey: dbmodel.LabelKeyServiceType,
LabelValue: core_util.StatelessServiceType,
}
if ts.IsState(){
labelModel.LabelValue = core_util.StatefulServiceType
}
if err := db.GetManager().TenantServiceLabelDaoTransactions(tx).AddModel(&labelModel); err != nil {
tx.Rollback()
return err
}
// TODO: create default probe for third-party service.
if err := tx.Commit().Error; err != nil {
tx.Rollback()
@ -759,6 +820,26 @@ func (s *ServiceAction) ServiceCreate(sc *api_model.ServiceStruct) error {
return nil
}
func (s *ServiceAction) convertProbeModel(req *api_model.ServiceProbe, serviceID string) *dbmodel.TenantServiceProbe {
return &dbmodel.TenantServiceProbe{
ServiceID: serviceID,
Cmd: req.Cmd,
FailureThreshold: req.FailureThreshold,
HTTPHeader: req.HTTPHeader,
InitialDelaySecond: req.InitialDelaySecond,
IsUsed: &req.IsUsed,
Mode: req.Mode,
Path: req.Path,
PeriodSecond: req.PeriodSecond,
Port: req.Port,
ProbeID: req.ProbeID,
Scheme: req.Scheme,
SuccessThreshold: req.SuccessThreshold,
TimeoutSecond: req.TimeoutSecond,
FailureAction: req.FailureAction,
}
}
//ServiceUpdate update service
func (s *ServiceAction) ServiceUpdate(sc map[string]interface{}) error {
ts, err := db.GetManager().TenantServiceDao().GetServiceByID(sc["service_id"].(string))

View File

@ -341,6 +341,10 @@ type ServiceStruct struct {
PortsInfo []dbmodel.TenantServicesPort `json:"ports_info" validate:"ports_info"`
Endpoints *Endpoints `json:"endpoints" validate:"endpoints"`
AppID string `json:"app_id" validate:"required"`
ComponentProbes []ServiceProbe `json:"component_probes" validate:"component_probes"`
ComponentMonitors []AddServiceMonitorRequestStruct `json:"component_monitors" validate:"component_monitors"`
HTTPRules []AddHTTPRuleStruct `json:"http_rules" validate:"http_rules"`
TCPRules []AddTCPRuleStruct `json:"tcp_rules" validate:"tcp_rules"`
}
// Endpoints holds third-party service endpoints or configuraion to get endpoints.

View File

@ -172,6 +172,7 @@ type TenantServicesPortDao interface {
DelByServiceID(sid string) error
ListInnerPortsByServiceIDs(serviceIDs []string) ([]*model.TenantServicesPort, error)
ListByK8sServiceNames(serviceIDs []string) ([]*model.TenantServicesPort, error)
CreateOrUpdatePortsInBatch(ports []*model.TenantServicesPort) error
}
//TenantPluginDao TenantPluginDao
@ -287,6 +288,7 @@ type TenantServiceEnvVarDao interface {
GetEnv(serviceID, envName string) (*model.TenantServiceEnvVar, error)
DELServiceEnvsByServiceID(serviceID string) error
DelByServiceIDAndScope(sid, scope string) error
CreateOrUpdateEnvsInBatch(envs []*model.TenantServiceEnvVar) error
}
//TenantServiceMountRelationDao TenantServiceMountRelationDao

View File

@ -32,6 +32,7 @@ import (
"github.com/jinzhu/gorm"
pkgerr "github.com/pkg/errors"
"github.com/sirupsen/logrus"
gormbulkups "github.com/atcdot/gorm-bulk-upsert"
)
//TenantDaoImpl 租户信息管理
@ -638,6 +639,19 @@ func (t *TenantServicesPortDaoImpl) UpdateModel(mo model.Interface) error {
return nil
}
// CreateOrUpdatePortsInBatch Batch insert or update ports variables
func (t *TenantServicesPortDaoImpl) CreateOrUpdatePortsInBatch(ports []*model.TenantServicesPort) error{
var objects []interface{}
for _, port := range ports {
port := port
objects = append(objects, *port)
}
if err := gormbulkups.BulkUpsert(t.DB, objects, 2000); err != nil {
return pkgerr.Wrap(err, "create or update ports in batch")
}
return nil
}
//DeleteModel 删除端口
func (t *TenantServicesPortDaoImpl) DeleteModel(serviceID string, args ...interface{}) error {
if len(args) < 1 {
@ -923,6 +937,19 @@ func (t *TenantServiceEnvVarDaoImpl) UpdateModel(mo model.Interface) error {
}).Error
}
// CreateOrUpdateEnvsInBatch Batch insert or update environment variables
func (t *TenantServiceEnvVarDaoImpl) CreateOrUpdateEnvsInBatch(envs []*model.TenantServiceEnvVar) error{
var objects []interface{}
for _, env := range envs {
env := env
objects = append(objects, *env)
}
if err := gormbulkups.BulkUpsert(t.DB, objects, 2000); err != nil {
return pkgerr.Wrap(err, "create or update envs in batch")
}
return nil
}
//DeleteModel 删除env
func (t *TenantServiceEnvVarDaoImpl) DeleteModel(serviceID string, args ...interface{}) error {
envName := args[0].(string)

1
go.mod
View File

@ -9,6 +9,7 @@ require (
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect
github.com/alecthomas/units v0.0.0-20201120081800-1786d5ef83d4 // indirect
github.com/aliyun/aliyun-oss-go-sdk v2.1.5+incompatible
github.com/atcdot/gorm-bulk-upsert v1.0.0
github.com/aws/aws-sdk-go v1.36.15
github.com/barnettZQG/gotty v1.0.1-0.20200904091006-a0a1f7d747dc
github.com/beorn7/perks v1.0.1

259
go.sum

File diff suppressed because it is too large Load Diff