diff --git a/cmd/roles/roles.go b/cmd/roles/roles.go index 550b575719..9956c5f42e 100644 --- a/cmd/roles/roles.go +++ b/cmd/roles/roles.go @@ -37,6 +37,7 @@ import ( "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/tracer" "github.com/milvus-io/milvus/pkg/util/etcd" + "github.com/milvus-io/milvus/pkg/util/generic" "github.com/milvus-io/milvus/pkg/util/logutil" "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/paramtable" @@ -91,7 +92,7 @@ func runComponent[T component](ctx context.Context, runWg *sync.WaitGroup, creator func(context.Context, dependency.Factory) (T, error), metricRegister func(*prometheus.Registry), -) T { +) component { var role T sign := make(chan struct{}) @@ -118,6 +119,9 @@ func runComponent[T component](ctx context.Context, healthz.Register(role) metricRegister(Registry.GoRegistry) + if generic.IsZero(role) { + return nil + } return role } @@ -161,22 +165,22 @@ func (mr *MilvusRoles) printLDPreLoad() { } } -func (mr *MilvusRoles) runRootCoord(ctx context.Context, localMsg bool, wg *sync.WaitGroup) *components.RootCoord { +func (mr *MilvusRoles) runRootCoord(ctx context.Context, localMsg bool, wg *sync.WaitGroup) component { wg.Add(1) return runComponent(ctx, localMsg, wg, components.NewRootCoord, metrics.RegisterRootCoord) } -func (mr *MilvusRoles) runProxy(ctx context.Context, localMsg bool, wg *sync.WaitGroup) *components.Proxy { +func (mr *MilvusRoles) runProxy(ctx context.Context, localMsg bool, wg *sync.WaitGroup) component { wg.Add(1) return runComponent(ctx, localMsg, wg, components.NewProxy, metrics.RegisterProxy) } -func (mr *MilvusRoles) runQueryCoord(ctx context.Context, localMsg bool, wg *sync.WaitGroup) *components.QueryCoord { +func (mr *MilvusRoles) runQueryCoord(ctx context.Context, localMsg bool, wg *sync.WaitGroup) component { wg.Add(1) return runComponent(ctx, localMsg, wg, components.NewQueryCoord, metrics.RegisterQueryCoord) } -func (mr *MilvusRoles) runQueryNode(ctx context.Context, localMsg bool, wg *sync.WaitGroup) *components.QueryNode { +func (mr *MilvusRoles) runQueryNode(ctx context.Context, localMsg bool, wg *sync.WaitGroup) component { wg.Add(1) rootPath := paramtable.Get().LocalStorageCfg.Path.GetValue() queryDataLocalPath := filepath.Join(rootPath, typeutil.QueryNodeRole) @@ -185,22 +189,22 @@ func (mr *MilvusRoles) runQueryNode(ctx context.Context, localMsg bool, wg *sync return runComponent(ctx, localMsg, wg, components.NewQueryNode, metrics.RegisterQueryNode) } -func (mr *MilvusRoles) runDataCoord(ctx context.Context, localMsg bool, wg *sync.WaitGroup) *components.DataCoord { +func (mr *MilvusRoles) runDataCoord(ctx context.Context, localMsg bool, wg *sync.WaitGroup) component { wg.Add(1) return runComponent(ctx, localMsg, wg, components.NewDataCoord, metrics.RegisterDataCoord) } -func (mr *MilvusRoles) runDataNode(ctx context.Context, localMsg bool, wg *sync.WaitGroup) *components.DataNode { +func (mr *MilvusRoles) runDataNode(ctx context.Context, localMsg bool, wg *sync.WaitGroup) component { wg.Add(1) return runComponent(ctx, localMsg, wg, components.NewDataNode, metrics.RegisterDataNode) } -func (mr *MilvusRoles) runIndexCoord(ctx context.Context, localMsg bool, wg *sync.WaitGroup) *components.IndexCoord { +func (mr *MilvusRoles) runIndexCoord(ctx context.Context, localMsg bool, wg *sync.WaitGroup) component { wg.Add(1) return runComponent(ctx, localMsg, wg, components.NewIndexCoord, func(registry *prometheus.Registry) {}) } -func (mr *MilvusRoles) runIndexNode(ctx context.Context, localMsg bool, wg *sync.WaitGroup) *components.IndexNode { +func (mr *MilvusRoles) runIndexNode(ctx context.Context, localMsg bool, wg *sync.WaitGroup) component { wg.Add(1) rootPath := paramtable.Get().LocalStorageCfg.Path.GetValue() indexDataLocalPath := filepath.Join(rootPath, typeutil.IndexNodeRole) @@ -326,46 +330,41 @@ func (mr *MilvusRoles) Run(alias string) { http.ServeHTTP() - var rc *components.RootCoord var wg sync.WaitGroup local := mr.Local + + var rootCoord, queryCoord, indexCoord, dataCoord component + var proxy, dataNode, indexNode, queryNode component if mr.EnableRootCoord { - rc = mr.runRootCoord(ctx, local, &wg) + rootCoord = mr.runRootCoord(ctx, local, &wg) } - var pn *components.Proxy if mr.EnableProxy { - pn = mr.runProxy(ctx, local, &wg) + proxy = mr.runProxy(ctx, local, &wg) } - var qs *components.QueryCoord if mr.EnableQueryCoord { - qs = mr.runQueryCoord(ctx, local, &wg) + queryCoord = mr.runQueryCoord(ctx, local, &wg) } - var qn *components.QueryNode if mr.EnableQueryNode { - qn = mr.runQueryNode(ctx, local, &wg) + queryNode = mr.runQueryNode(ctx, local, &wg) } - var ds *components.DataCoord if mr.EnableDataCoord { - ds = mr.runDataCoord(ctx, local, &wg) + dataCoord = mr.runDataCoord(ctx, local, &wg) } - var dn *components.DataNode if mr.EnableDataNode { - dn = mr.runDataNode(ctx, local, &wg) + dataNode = mr.runDataNode(ctx, local, &wg) } - var is *components.IndexCoord if mr.EnableIndexCoord { - is = mr.runIndexCoord(ctx, local, &wg) + indexCoord = mr.runIndexCoord(ctx, local, &wg) } - var in *components.IndexNode if mr.EnableIndexNode { - in = mr.runIndexNode(ctx, local, &wg) + indexNode = mr.runIndexNode(ctx, local, &wg) } wg.Wait() @@ -381,9 +380,11 @@ func (mr *MilvusRoles) Run(alias string) { // stop coordinators first // var component - coordinators := []component{rc, qs, ds, is} - for _, coord := range coordinators { + coordinators := []component{rootCoord, queryCoord, dataCoord, indexCoord} + for idx, coord := range coordinators { + log.Warn("stop processing") if coord != nil { + log.Warn("stop coord", zap.Int("idx", idx), zap.Any("coord", coord)) wg.Add(1) go func(coord component) { defer wg.Done() @@ -395,7 +396,7 @@ func (mr *MilvusRoles) Run(alias string) { log.Info("All coordinators have stopped") // stop nodes - nodes := []component{qn, in, dn} + nodes := []component{queryNode, indexNode, dataNode} for _, node := range nodes { if node != nil { wg.Add(1) @@ -409,8 +410,8 @@ func (mr *MilvusRoles) Run(alias string) { log.Info("All nodes have stopped") // stop proxy - if pn != nil { - pn.Stop() + if proxy != nil { + proxy.Stop() log.Info("proxy stopped") }