mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-11-30 02:48:45 +08:00
Fix component pointer to component
interface never be nil caused panic (#27072)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent
17375541a7
commit
f7b2ad6650
@ -37,6 +37,7 @@ import (
|
||||
"github.com/milvus-io/milvus/pkg/metrics"
|
||||
"github.com/milvus-io/milvus/pkg/tracer"
|
||||
"github.com/milvus-io/milvus/pkg/util/etcd"
|
||||
"github.com/milvus-io/milvus/pkg/util/generic"
|
||||
"github.com/milvus-io/milvus/pkg/util/logutil"
|
||||
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
|
||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||
@ -91,7 +92,7 @@ func runComponent[T component](ctx context.Context,
|
||||
runWg *sync.WaitGroup,
|
||||
creator func(context.Context, dependency.Factory) (T, error),
|
||||
metricRegister func(*prometheus.Registry),
|
||||
) T {
|
||||
) component {
|
||||
var role T
|
||||
|
||||
sign := make(chan struct{})
|
||||
@ -118,6 +119,9 @@ func runComponent[T component](ctx context.Context,
|
||||
|
||||
healthz.Register(role)
|
||||
metricRegister(Registry.GoRegistry)
|
||||
if generic.IsZero(role) {
|
||||
return nil
|
||||
}
|
||||
return role
|
||||
}
|
||||
|
||||
@ -161,22 +165,22 @@ func (mr *MilvusRoles) printLDPreLoad() {
|
||||
}
|
||||
}
|
||||
|
||||
func (mr *MilvusRoles) runRootCoord(ctx context.Context, localMsg bool, wg *sync.WaitGroup) *components.RootCoord {
|
||||
func (mr *MilvusRoles) runRootCoord(ctx context.Context, localMsg bool, wg *sync.WaitGroup) component {
|
||||
wg.Add(1)
|
||||
return runComponent(ctx, localMsg, wg, components.NewRootCoord, metrics.RegisterRootCoord)
|
||||
}
|
||||
|
||||
func (mr *MilvusRoles) runProxy(ctx context.Context, localMsg bool, wg *sync.WaitGroup) *components.Proxy {
|
||||
func (mr *MilvusRoles) runProxy(ctx context.Context, localMsg bool, wg *sync.WaitGroup) component {
|
||||
wg.Add(1)
|
||||
return runComponent(ctx, localMsg, wg, components.NewProxy, metrics.RegisterProxy)
|
||||
}
|
||||
|
||||
func (mr *MilvusRoles) runQueryCoord(ctx context.Context, localMsg bool, wg *sync.WaitGroup) *components.QueryCoord {
|
||||
func (mr *MilvusRoles) runQueryCoord(ctx context.Context, localMsg bool, wg *sync.WaitGroup) component {
|
||||
wg.Add(1)
|
||||
return runComponent(ctx, localMsg, wg, components.NewQueryCoord, metrics.RegisterQueryCoord)
|
||||
}
|
||||
|
||||
func (mr *MilvusRoles) runQueryNode(ctx context.Context, localMsg bool, wg *sync.WaitGroup) *components.QueryNode {
|
||||
func (mr *MilvusRoles) runQueryNode(ctx context.Context, localMsg bool, wg *sync.WaitGroup) component {
|
||||
wg.Add(1)
|
||||
rootPath := paramtable.Get().LocalStorageCfg.Path.GetValue()
|
||||
queryDataLocalPath := filepath.Join(rootPath, typeutil.QueryNodeRole)
|
||||
@ -185,22 +189,22 @@ func (mr *MilvusRoles) runQueryNode(ctx context.Context, localMsg bool, wg *sync
|
||||
return runComponent(ctx, localMsg, wg, components.NewQueryNode, metrics.RegisterQueryNode)
|
||||
}
|
||||
|
||||
func (mr *MilvusRoles) runDataCoord(ctx context.Context, localMsg bool, wg *sync.WaitGroup) *components.DataCoord {
|
||||
func (mr *MilvusRoles) runDataCoord(ctx context.Context, localMsg bool, wg *sync.WaitGroup) component {
|
||||
wg.Add(1)
|
||||
return runComponent(ctx, localMsg, wg, components.NewDataCoord, metrics.RegisterDataCoord)
|
||||
}
|
||||
|
||||
func (mr *MilvusRoles) runDataNode(ctx context.Context, localMsg bool, wg *sync.WaitGroup) *components.DataNode {
|
||||
func (mr *MilvusRoles) runDataNode(ctx context.Context, localMsg bool, wg *sync.WaitGroup) component {
|
||||
wg.Add(1)
|
||||
return runComponent(ctx, localMsg, wg, components.NewDataNode, metrics.RegisterDataNode)
|
||||
}
|
||||
|
||||
func (mr *MilvusRoles) runIndexCoord(ctx context.Context, localMsg bool, wg *sync.WaitGroup) *components.IndexCoord {
|
||||
func (mr *MilvusRoles) runIndexCoord(ctx context.Context, localMsg bool, wg *sync.WaitGroup) component {
|
||||
wg.Add(1)
|
||||
return runComponent(ctx, localMsg, wg, components.NewIndexCoord, func(registry *prometheus.Registry) {})
|
||||
}
|
||||
|
||||
func (mr *MilvusRoles) runIndexNode(ctx context.Context, localMsg bool, wg *sync.WaitGroup) *components.IndexNode {
|
||||
func (mr *MilvusRoles) runIndexNode(ctx context.Context, localMsg bool, wg *sync.WaitGroup) component {
|
||||
wg.Add(1)
|
||||
rootPath := paramtable.Get().LocalStorageCfg.Path.GetValue()
|
||||
indexDataLocalPath := filepath.Join(rootPath, typeutil.IndexNodeRole)
|
||||
@ -326,46 +330,41 @@ func (mr *MilvusRoles) Run(alias string) {
|
||||
|
||||
http.ServeHTTP()
|
||||
|
||||
var rc *components.RootCoord
|
||||
var wg sync.WaitGroup
|
||||
local := mr.Local
|
||||
|
||||
var rootCoord, queryCoord, indexCoord, dataCoord component
|
||||
var proxy, dataNode, indexNode, queryNode component
|
||||
if mr.EnableRootCoord {
|
||||
rc = mr.runRootCoord(ctx, local, &wg)
|
||||
rootCoord = mr.runRootCoord(ctx, local, &wg)
|
||||
}
|
||||
|
||||
var pn *components.Proxy
|
||||
if mr.EnableProxy {
|
||||
pn = mr.runProxy(ctx, local, &wg)
|
||||
proxy = mr.runProxy(ctx, local, &wg)
|
||||
}
|
||||
|
||||
var qs *components.QueryCoord
|
||||
if mr.EnableQueryCoord {
|
||||
qs = mr.runQueryCoord(ctx, local, &wg)
|
||||
queryCoord = mr.runQueryCoord(ctx, local, &wg)
|
||||
}
|
||||
|
||||
var qn *components.QueryNode
|
||||
if mr.EnableQueryNode {
|
||||
qn = mr.runQueryNode(ctx, local, &wg)
|
||||
queryNode = mr.runQueryNode(ctx, local, &wg)
|
||||
}
|
||||
|
||||
var ds *components.DataCoord
|
||||
if mr.EnableDataCoord {
|
||||
ds = mr.runDataCoord(ctx, local, &wg)
|
||||
dataCoord = mr.runDataCoord(ctx, local, &wg)
|
||||
}
|
||||
|
||||
var dn *components.DataNode
|
||||
if mr.EnableDataNode {
|
||||
dn = mr.runDataNode(ctx, local, &wg)
|
||||
dataNode = mr.runDataNode(ctx, local, &wg)
|
||||
}
|
||||
|
||||
var is *components.IndexCoord
|
||||
if mr.EnableIndexCoord {
|
||||
is = mr.runIndexCoord(ctx, local, &wg)
|
||||
indexCoord = mr.runIndexCoord(ctx, local, &wg)
|
||||
}
|
||||
|
||||
var in *components.IndexNode
|
||||
if mr.EnableIndexNode {
|
||||
in = mr.runIndexNode(ctx, local, &wg)
|
||||
indexNode = mr.runIndexNode(ctx, local, &wg)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
@ -381,9 +380,11 @@ func (mr *MilvusRoles) Run(alias string) {
|
||||
|
||||
// stop coordinators first
|
||||
// var component
|
||||
coordinators := []component{rc, qs, ds, is}
|
||||
for _, coord := range coordinators {
|
||||
coordinators := []component{rootCoord, queryCoord, dataCoord, indexCoord}
|
||||
for idx, coord := range coordinators {
|
||||
log.Warn("stop processing")
|
||||
if coord != nil {
|
||||
log.Warn("stop coord", zap.Int("idx", idx), zap.Any("coord", coord))
|
||||
wg.Add(1)
|
||||
go func(coord component) {
|
||||
defer wg.Done()
|
||||
@ -395,7 +396,7 @@ func (mr *MilvusRoles) Run(alias string) {
|
||||
log.Info("All coordinators have stopped")
|
||||
|
||||
// stop nodes
|
||||
nodes := []component{qn, in, dn}
|
||||
nodes := []component{queryNode, indexNode, dataNode}
|
||||
for _, node := range nodes {
|
||||
if node != nil {
|
||||
wg.Add(1)
|
||||
@ -409,8 +410,8 @@ func (mr *MilvusRoles) Run(alias string) {
|
||||
log.Info("All nodes have stopped")
|
||||
|
||||
// stop proxy
|
||||
if pn != nil {
|
||||
pn.Stop()
|
||||
if proxy != nil {
|
||||
proxy.Stop()
|
||||
log.Info("proxy stopped")
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user