ignore circular dependencies

This commit is contained in:
GLYASAI 2019-12-06 13:30:17 +08:00
parent 14e6ad04c6
commit f9c3b1eb14
10 changed files with 621 additions and 15 deletions

View File

@ -19,6 +19,10 @@
package handler
import (
"fmt"
"strings"
"container/list"
"github.com/Sirupsen/logrus"
"github.com/goodrain/rainbond/api/model"
"github.com/goodrain/rainbond/db"
@ -44,11 +48,11 @@ func CreateBatchOperationHandler(mqCli gclient.MQClient, operationHandler *Opera
}
}
func setStartupSequenceConfig(configs map[string]string) map[string]string {
func setStartupSequenceConfig(configs map[string]string, depsids []string) map[string]string {
if configs == nil {
configs = make(map[string]string, 1)
}
configs["startup_sequence"] = "true"
configs["boot_seq_dep_service_ids"] = strings.Join(depsids, ",")
return configs
}
@ -69,8 +73,18 @@ func checkResourceEnough(serviceID string) error {
//Build build
func (b *BatchOperationHandler) Build(buildInfos []model.BuildInfoRequestStruct) (re BatchOperationResult) {
var retrys []model.BuildInfoRequestStruct
var serviceIDs []string
for _, info := range buildInfos {
serviceIDs = append(serviceIDs, info.ServiceID)
}
sd, err := NewServiceDependency(serviceIDs)
if err != nil {
logrus.Warningf("create a new ServiceDependency: %v", err)
}
startupSeqConfigs := sd.serviceStartupSequence()
logrus.Debugf("build services; startup sequence configurations: %#v", startupSeqConfigs)
var retrys []model.BuildInfoRequestStruct
for _, buildInfo := range buildInfos {
if err := checkResourceEnough(buildInfo.ServiceID); err != nil {
re.BatchResult = append(re.BatchResult, OperationResult{
@ -83,7 +97,7 @@ func (b *BatchOperationHandler) Build(buildInfos []model.BuildInfoRequestStruct)
})
continue
}
buildInfo.Configs = setStartupSequenceConfig(buildInfo.Configs)
buildInfo.Configs = setStartupSequenceConfig(buildInfo.Configs, startupSeqConfigs[buildInfo.ServiceID])
buildre := b.operationHandler.Build(buildInfo)
if buildre.Status != "success" {
retrys = append(retrys, buildInfo)
@ -99,6 +113,17 @@ func (b *BatchOperationHandler) Build(buildInfos []model.BuildInfoRequestStruct)
//Start batch start
func (b *BatchOperationHandler) Start(startInfos []model.StartOrStopInfoRequestStruct) (re BatchOperationResult) {
var serviceIDs []string
for _, info := range startInfos {
serviceIDs = append(serviceIDs, info.ServiceID)
}
sd, err := NewServiceDependency(serviceIDs)
if err != nil {
logrus.Warningf("create a new ServiceDependency: %v", err)
}
startupSeqConfigs := sd.serviceStartupSequence()
logrus.Debugf("startup sequence configurations: %#v", startupSeqConfigs)
var retrys []model.StartOrStopInfoRequestStruct
for _, startInfo := range startInfos {
if err := checkResourceEnough(startInfo.ServiceID); err != nil {
@ -112,7 +137,9 @@ func (b *BatchOperationHandler) Start(startInfos []model.StartOrStopInfoRequestS
})
continue
}
startInfo.Configs = setStartupSequenceConfig(startInfo.Configs)
// startup sequence
startInfo.Configs = setStartupSequenceConfig(startInfo.Configs, startupSeqConfigs[startInfo.ServiceID])
startre := b.operationHandler.Start(startInfo)
if startre.Status != "success" {
retrys = append(retrys, startInfo)
@ -130,7 +157,6 @@ func (b *BatchOperationHandler) Start(startInfos []model.StartOrStopInfoRequestS
func (b *BatchOperationHandler) Stop(stopInfos []model.StartOrStopInfoRequestStruct) (re BatchOperationResult) {
var retrys []model.StartOrStopInfoRequestStruct
for _, stopInfo := range stopInfos {
stopInfo.Configs = setStartupSequenceConfig(stopInfo.Configs)
stopre := b.operationHandler.Stop(stopInfo)
if stopre.Status != "success" {
retrys = append(retrys, stopInfo)
@ -146,6 +172,17 @@ func (b *BatchOperationHandler) Stop(stopInfos []model.StartOrStopInfoRequestStr
//Upgrade batch upgrade
func (b *BatchOperationHandler) Upgrade(upgradeInfos []model.UpgradeInfoRequestStruct) (re BatchOperationResult) {
var serviceIDs []string
for _, info := range upgradeInfos {
serviceIDs = append(serviceIDs, info.ServiceID)
}
sd, err := NewServiceDependency(serviceIDs)
if err != nil {
logrus.Warningf("create a new ServiceDependency: %v", err)
}
startupSeqConfigs := sd.serviceStartupSequence()
logrus.Debugf("build services; startup sequence configurations: %#v", startupSeqConfigs)
var retrys []model.UpgradeInfoRequestStruct
for _, upgradeInfo := range upgradeInfos {
if err := checkResourceEnough(upgradeInfo.ServiceID); err != nil {
@ -159,7 +196,7 @@ func (b *BatchOperationHandler) Upgrade(upgradeInfos []model.UpgradeInfoRequestS
})
continue
}
upgradeInfo.Configs = setStartupSequenceConfig(upgradeInfo.Configs)
upgradeInfo.Configs = setStartupSequenceConfig(upgradeInfo.Configs, startupSeqConfigs[upgradeInfo.ServiceID])
stopre := b.operationHandler.Upgrade(upgradeInfo)
if stopre.Status != "success" {
retrys = append(retrys, upgradeInfo)
@ -172,3 +209,152 @@ func (b *BatchOperationHandler) Upgrade(upgradeInfos []model.UpgradeInfoRequestS
}
return
}
// ServiceDependency documents a set of services and their dependencies.
// provides the ability to build linked lists of dependencies and find circular dependencies.
type ServiceDependency struct {
serviceIDs []string
sid2depsids map[string][]string
depsid2sids map[string][]string
}
// NewServiceDependency creates a new ServiceDependency.
func NewServiceDependency(serviceIDs []string) (*ServiceDependency, error) {
relations, err := db.GetManager().TenantServiceRelationDao().ListByServiceIDs(serviceIDs)
if err != nil {
return nil, fmt.Errorf("list retions: %v", err)
}
sid2depsids := make(map[string][]string)
depsid2sids := make(map[string][]string)
for _, relation := range relations {
sid2depsids[relation.ServiceID] = append(sid2depsids[relation.ServiceID], relation.DependServiceID)
depsid2sids[relation.DependServiceID] = append(depsid2sids[relation.DependServiceID], relation.ServiceID)
}
logrus.Debugf("create a new ServiceDependency; sid2depsids: %#v; depsid2sids: %#v", sid2depsids, depsid2sids)
return &ServiceDependency{
serviceIDs: serviceIDs,
sid2depsids: sid2depsids,
depsid2sids: depsid2sids,
}, nil
}
// The order in which services are started is determined by their dependencies. If interdependencies occur, one of them is ignored.
func (s *ServiceDependency) serviceStartupSequence() map[string][]string {
headNodes := s.headNodes()
var lists []*list.List
for _, h := range headNodes {
l := list.New()
l.PushBack(h)
lists = append(lists, s.buildLinkListByHead(l)...)
}
result := make(map[string][]string)
for _, l := range lists {
cur := l.Front()
for cur != nil && cur.Next() != nil {
existingVals := result[cur.Value.(string)]
exists := false
for _, val := range existingVals {
if val == cur.Next().Value.(string) {
exists = true
break
}
}
if !exists {
result[cur.Value.(string)] = append(result[cur.Value.(string)], cur.Next().Value.(string))
}
cur = cur.Next()
}
}
return result
}
// headNodes finds out the service ID of all head nodes. The head nodes are services that are not dependent on other services.
func (s *ServiceDependency) headNodes() []string {
var headNodes []string
for _, sid := range s.serviceIDs {
if _, ok := s.depsid2sids[sid]; ok {
continue
}
headNodes = append(headNodes, sid)
}
// if there is no head node(i.e. a->b->c->d->a), then a node is randomly selected.
// however, this node cannot be a tail node
for _, sid := range s.serviceIDs {
// does not depend on other services, it is the tail node
if _, ok := s.sid2depsids[sid]; !ok {
continue
}
headNodes = append(headNodes, sid)
logrus.Debugf("randomly select '%s' as the head node", sid)
break
}
return headNodes
}
// buildLinkListByHead recursively creates linked lists of service dependencies.
//
// recursive end condition:
// 1. nil or empty input
// 2. no more children
// 3. child node is already in the linked list
func (s *ServiceDependency) buildLinkListByHead(l *list.List) []*list.List {
// nil or empty input
if l == nil || l.Len() == 0 {
return nil
}
// the last node is the head node of the new linked list
sid, _ := l.Back().Value.(string)
depsids, ok := s.sid2depsids[sid]
// no more children
if !ok {
copy := list.New()
copy.PushBackList(l)
return []*list.List{l}
}
var result []*list.List
for _, depsid := range depsids {
// child node is already in the linked list
if alreadyInLinkedList(l, depsid) {
copy := list.New()
copy.PushBackList(l)
result = append(result, copy)
continue
}
newl := list.New()
newl.PushBackList(l)
newl.PushBack(depsid)
sublists := s.buildLinkListByHead(newl)
if len(sublists) == 0 {
result = append(result, newl)
} else {
for _, sublist := range sublists {
result = append(result, sublist)
}
}
}
return result
}
func alreadyInLinkedList(l *list.List, depsid string) bool {
pre := l.Back()
for pre != nil {
val := pre.Value.(string)
if val == depsid {
return true
}
pre = pre.Prev()
}
return false
}

View File

@ -0,0 +1,350 @@
package handler
import (
"container/list"
"strings"
"testing"
)
func TestBuildLinkListByHead(t *testing.T) {
tests := []struct {
name string
l *list.List
sid2depsids map[string][]string
want []*list.List
}{
{
name: "nil linked list",
l: nil,
want: nil,
},
{
name: "empty linked list",
l: func() *list.List {
return list.New()
}(),
want: nil,
},
{
name: "no more children",
l: func() *list.List {
l := list.New()
l.PushBack("apple")
return l
}(),
want: func() []*list.List {
l := list.New()
l.PushBack("apple")
return []*list.List{l}
}(),
},
{
name: "child node is already in the linked list",
l: func() *list.List {
l := list.New()
l.PushBack("apple")
l.PushBack("banana")
l.PushBack("cat")
l.PushBack("dog")
return l
}(),
sid2depsids: map[string][]string{
"dog": []string{"banana"},
},
want: func() []*list.List {
l := list.New()
l.PushBack("apple")
l.PushBack("banana")
l.PushBack("cat")
l.PushBack("dog")
return []*list.List{l}
}(),
},
{
name: "one child node is not already in the linked list, the other one is not",
l: func() *list.List {
l := list.New()
l.PushBack("apple")
l.PushBack("banana")
l.PushBack("cat")
l.PushBack("dog")
return l
}(),
sid2depsids: map[string][]string{
"dog": []string{"banana", "elephant"},
},
want: func() []*list.List {
l := list.New()
l.PushBack("apple")
l.PushBack("banana")
l.PushBack("cat")
l.PushBack("dog")
l2 := list.New()
l2.PushBack("apple")
l2.PushBack("banana")
l2.PushBack("cat")
l2.PushBack("dog")
l2.PushBack("elephant")
return []*list.List{l, l2}
}(),
},
{
name: "three sub lists",
l: func() *list.List {
l := list.New()
l.PushBack("apple")
return l
}(),
sid2depsids: map[string][]string{
"apple": []string{"banana"},
"banana": []string{"cat", "cake", "candy"},
"cat": []string{"dog"},
"cake": []string{"dance"},
"candy": []string{"daughter"},
},
want: func() []*list.List {
l1 := list.New()
l1.PushBack("apple")
l1.PushBack("banana")
l1.PushBack("cat")
l1.PushBack("dog")
l2 := list.New()
l2.PushBack("apple")
l2.PushBack("banana")
l2.PushBack("cake")
l2.PushBack("dance")
l3 := list.New()
l3.PushBack("apple")
l3.PushBack("banana")
l3.PushBack("candy")
l3.PushBack("daughter")
return []*list.List{l1, l2, l3}
}(),
},
{
name: "single linked list",
l: func() *list.List {
l := list.New()
l.PushBack("apple")
return l
}(),
sid2depsids: map[string][]string{
"apple": []string{"banana"},
"banana": []string{"cat"},
"cat": []string{"dog"},
},
want: func() []*list.List {
l := list.New()
l.PushBack("apple")
l.PushBack("banana")
l.PushBack("cat")
l.PushBack("dog")
return []*list.List{l}
}(),
},
{
name: "ring linked list",
l: func() *list.List {
l := list.New()
l.PushBack("apple")
return l
}(),
sid2depsids: map[string][]string{
"apple": []string{"banana"},
"banana": []string{"cat"},
"cat": []string{"dog"},
"dog": []string{"apple"},
},
want: func() []*list.List {
l := list.New()
l.PushBack("apple")
l.PushBack("banana")
l.PushBack("cat")
l.PushBack("dog")
return []*list.List{l}
}(),
},
}
for idx := range tests {
tc := tests[idx]
t.Run(tc.name, func(t *testing.T) {
sd := &ServiceDependency{
sid2depsids: tc.sid2depsids,
}
got := sd.buildLinkListByHead(tc.l)
if !listsEqual(got, tc.want) {
t.Errorf("expected %#v, but got %#v", linkedLists2String(tc.want), linkedLists2String(got))
}
})
}
}
func listsEqual(got, want []*list.List) bool {
if len(got) != len(want) {
return false
}
listEqual := func(g, w *list.List) bool {
gele := g.Front()
wele := w.Front()
for gele != nil {
if gele.Value != wele.Value {
return false
}
gele = gele.Next()
wele = wele.Next()
}
return true
}
for _, g := range got {
flag := false
for _, w := range want {
if g.Len() != w.Len() {
continue
}
// check if linked list g is equals to w
if listEqual(g, w) {
flag = true
break
}
}
if !flag {
return false
}
}
return true
}
func linkedLists2String(lists []*list.List) string {
var strs []string
for _, l := range lists {
var lstrs []string
cur := l.Front()
for cur != nil {
lstrs = append(lstrs, cur.Value.(string))
cur = cur.Next()
}
strs = append(strs, strings.Join(lstrs, "->"))
}
return strings.Join(strs, "; ")
}
func TestServiceStartupSequence(t *testing.T) {
tests := []struct {
name string
serviceIDS []string
sid2depsids map[string][]string
depsid2sids map[string][]string
want map[string][]string
}{
{
name: "one to two",
serviceIDS: []string{"apple", "banana", "cat"},
sid2depsids: map[string][]string{
"apple": []string{
"banana",
"cat",
},
},
depsid2sids: map[string][]string{
"banana": []string{
"apple",
},
"cat": []string{
"apple",
},
},
want: map[string][]string{
"apple": []string{
"banana",
"cat",
},
},
},
{
name: "a circle",
serviceIDS: []string{"apple", "banana", "cat"},
sid2depsids: map[string][]string{
"apple": []string{
"banana",
},
"banana": []string{
"cat",
},
"cat": []string{
"apple",
},
},
depsid2sids: map[string][]string{
"banana": []string{
"apple",
},
"cat": []string{
"banana",
},
"apple": []string{
"cat",
},
},
want: map[string][]string{
"apple": []string{
"banana",
},
"banana": []string{
"cat",
},
},
},
}
equal := func(want, got map[string][]string) bool {
if len(want) != len(got) {
return false
}
for wk, wv := range want {
gv := got[wk]
if len(wv) != len(gv) {
return false
}
flag := false
for _, wsid := range wv {
for _, gsid := range gv {
if wsid == gsid {
flag = true
break
}
}
if !flag {
return false
}
}
}
return true
}
for idx := range tests {
tc := tests[idx]
t.Run(tc.name, func(t *testing.T) {
sd := &ServiceDependency{
serviceIDs: tc.serviceIDS,
sid2depsids: tc.sid2depsids,
depsid2sids: tc.depsid2sids,
}
got := sd.serviceStartupSequence()
if !equal(tc.want, got) {
t.Errorf("expected %v, but got %v", tc.want, got)
}
})
}
}

View File

@ -175,11 +175,12 @@ func (o *OperationHandler) Start(startInfo model.StartOrStopInfoRequestStruct) (
}
re.EventID = startInfo.EventID
TaskBody := dmodel.StartTaskBody{
TenantID: service.TenantID,
ServiceID: service.ServiceID,
DeployVersion: service.DeployVersion,
EventID: startInfo.EventID,
Configs: startInfo.Configs,
TenantID: service.TenantID,
ServiceID: service.ServiceID,
DeployVersion: service.DeployVersion,
EventID: startInfo.EventID,
Configs: startInfo.Configs,
DepServiceIDInBootSeq: startInfo.DepServiceIDInBootSeq,
}
err = o.mqCli.SendBuilderTopic(gclient.TaskStruct{
TaskType: "start",

View File

@ -0,0 +1,47 @@
// Copyright (C) 2014-2018 Goodrain Co., Ltd.
// RAINBOND, Application Management Platform
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version. For any non-GPL usage of Rainbond,
// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd.
// must be obtained first.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package svclink
// ServiceList linked list of services, used to find closed loops in linked lists.
type ServiceList struct {
head *service
}
// service linked list node about service
type service struct {
sid string
next *service
}
// New creates a new ServiceList.
func New(sid string) *ServiceList {
return &ServiceList{
head: &service{
sid: sid,
},
}
}
func (s *ServiceList) Add(sid string) {
svc := &service{sid: sid}
if s.head == nil {
s.head = svc
return
}
}

View File

@ -1531,6 +1531,8 @@ type StartOrStopInfoRequestStruct struct {
EventID string `json:"event_id"`
ServiceID string `json:"service_id"`
Configs map[string]string `json:"configs"`
// When determining the startup sequence of services, you need to know the services they depend on
DepServiceIDInBootSeq []string `json:"dep_service_ids_in_boot_seq"`
}
//BuildMQBodyFrom -

View File

@ -184,6 +184,7 @@ type TenantServiceRelationDao interface {
Dao
DelDao
GetTenantServiceRelations(serviceID string) ([]*model.TenantServiceRelation, error)
ListByServiceIDs(serviceIDs []string) ([]*model.TenantServiceRelation, error)
GetTenantServiceRelationsByDependServiceID(dependServiceID string) ([]*model.TenantServiceRelation, error)
HaveRelations(serviceID string) bool
DELRelationsByServiceID(serviceID string) error

View File

@ -711,6 +711,16 @@ func (t *TenantServiceRelationDaoImpl) GetTenantServiceRelations(serviceID strin
return oldRelation, nil
}
// ListByServiceIDs -
func (t *TenantServiceRelationDaoImpl) ListByServiceIDs(serviceIDs []string) ([]*model.TenantServiceRelation, error) {
var relations []*model.TenantServiceRelation
if err := t.DB.Where("service_id in (?)", serviceIDs).Find(&relations).Error; err != nil {
return nil, err
}
return relations, nil
}
//HaveRelations 是否有依赖
func (t *TenantServiceRelationDaoImpl) HaveRelations(serviceID string) bool {
var oldRelation []*model.TenantServiceRelation

View File

@ -157,7 +157,8 @@ func conversionServicePlugin(as *typesv1.AppService, dbmanager db.Manager) ([]v1
meshPluginID = pluginID
}
}
if as.NeedProxy && !udpDep && strings.ToLower(as.ExtensionSet["startup_sequence"]) == "true" {
if bootSeqDepServiceIds := as.ExtensionSet["boot_seq_dep_service_ids"]; as.NeedProxy && !udpDep && bootSeqDepServiceIds != "" {
initContainers = append(initContainers, createProbeMeshInitContainer(as.ServiceID, meshPluginID, as.ServiceAlias, mainContainer.Env))
}
return initContainers, containers, nil

View File

@ -171,6 +171,10 @@ func createEnv(as *v1.AppService, dbmanager db.Manager) (*[]corev1.EnvVar, error
if err != nil {
return nil, err
}
bootSeqDepServiceIDs := as.ExtensionSet["boot_seq_dep_service_ids"]
logrus.Infof("boot sequence dep service ids: %s", bootSeqDepServiceIDs)
if relations != nil && len(relations) > 0 {
var relationIDs []string
for _, r := range relations {
@ -213,8 +217,10 @@ func createEnv(as *v1.AppService, dbmanager db.Manager) (*[]corev1.EnvVar, error
continue
}
clusterName := fmt.Sprintf("%s_%s_%s_%d", as.TenantID, as.ServiceAlias, depServiceAlias, port.ContainerPort)
clusterNames = append(clusterNames, clusterName)
if bootSeqDepServiceIDs != "" && strings.Contains(bootSeqDepServiceIDs, port.ServiceID) {
clusterName := fmt.Sprintf("%s_%s_%s_%d", as.TenantID, as.ServiceAlias, depServiceAlias, port.ContainerPort)
clusterNames = append(clusterNames, clusterName)
}
}
envs = append(envs, corev1.EnvVar{Name: "DEPEND_SERVICE_CLUSTER_NAMES", Value: strings.Join(clusterNames, ",")})

View File

@ -187,6 +187,8 @@ type StartTaskBody struct {
DeployVersion string `json:"deploy_version"`
EventID string `json:"event_id"`
Configs map[string]string `json:"configs"`
// When determining the startup sequence of services, you need to know the services they depend on
DepServiceIDInBootSeq []string `json:"dep_service_ids_in_boot_seq"`
}
//StopTaskBody 停止操作任务主体