mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-02 11:59:00 +08:00
IndexNode dose not need to wait for IndexCoord to start to comlete (#7074)
Signed-off-by: cai.zhang <cai.zhang@zilliz.com>
This commit is contained in:
parent
d909af15ad
commit
382fa6f274
2
go.mod
2
go.mod
@ -20,7 +20,7 @@ require (
|
||||
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
|
||||
github.com/jarcoal/httpmock v1.0.8
|
||||
github.com/klauspost/compress v1.10.11 // indirect
|
||||
github.com/lingdor/stackerror v0.0.0-20191119040541-976d8885ed76 // indirect
|
||||
github.com/lingdor/stackerror v0.0.0-20191119040541-976d8885ed76
|
||||
github.com/minio/minio-go/v7 v7.0.10
|
||||
github.com/mitchellh/mapstructure v1.1.2
|
||||
github.com/opentracing/opentracing-go v1.2.0
|
||||
|
@ -71,6 +71,7 @@ func (s *Server) init() error {
|
||||
s.closer = closer
|
||||
|
||||
if err := s.indexcoord.Register(); err != nil {
|
||||
log.Error("IndexCoord", zap.Any("register session error", err))
|
||||
return err
|
||||
}
|
||||
|
||||
@ -78,11 +79,13 @@ func (s *Server) init() error {
|
||||
go s.startGrpcLoop(Params.ServicePort)
|
||||
// wait for grpc IndexCoord loop start
|
||||
if err := <-s.grpcErrChan; err != nil {
|
||||
log.Error("IndexCoord", zap.Any("init error", err))
|
||||
return err
|
||||
}
|
||||
s.indexcoord.UpdateStateCode(internalpb.StateCode_Initializing)
|
||||
|
||||
if err := s.indexcoord.Init(); err != nil {
|
||||
log.Error("IndexCoord", zap.Any("init error", err))
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
@ -147,7 +150,7 @@ func (s *Server) startGrpcLoop(grpcPort int) {
|
||||
|
||||
defer s.loopWg.Done()
|
||||
|
||||
log.Debug("IndexCoord", zap.Int("network port", grpcPort))
|
||||
log.Debug("IndexCoord", zap.String("network address", Params.ServiceAddress), zap.Int("network port", grpcPort))
|
||||
lis, err := net.Listen("tcp", ":"+strconv.Itoa(grpcPort))
|
||||
if err != nil {
|
||||
log.Warn("IndexCoord", zap.String("GrpcServer:failed to listen", err.Error()))
|
||||
|
@ -22,14 +22,12 @@ import (
|
||||
"go.uber.org/zap"
|
||||
|
||||
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
|
||||
grpcindexcoordclient "github.com/milvus-io/milvus/internal/distributed/indexcoord/client"
|
||||
"github.com/milvus-io/milvus/internal/indexnode"
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/proto/commonpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/indexpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/milvuspb"
|
||||
"github.com/milvus-io/milvus/internal/types"
|
||||
"github.com/milvus-io/milvus/internal/util/funcutil"
|
||||
"github.com/milvus-io/milvus/internal/util/trace"
|
||||
"google.golang.org/grpc"
|
||||
@ -41,10 +39,9 @@ type Server struct {
|
||||
grpcServer *grpc.Server
|
||||
grpcErrChan chan error
|
||||
|
||||
indexCoordClient types.IndexCoord
|
||||
loopCtx context.Context
|
||||
loopCancel func()
|
||||
loopWg sync.WaitGroup
|
||||
loopCtx context.Context
|
||||
loopCancel func()
|
||||
loopWg sync.WaitGroup
|
||||
|
||||
closer io.Closer
|
||||
}
|
||||
@ -65,7 +62,7 @@ func (s *Server) startGrpcLoop(grpcPort int) {
|
||||
|
||||
defer s.loopWg.Done()
|
||||
|
||||
log.Debug("IndexNode", zap.Int("network port: ", grpcPort))
|
||||
log.Debug("IndexNode", zap.String("network address", Params.Address), zap.Int("network port: ", grpcPort))
|
||||
lis, err := net.Listen("tcp", ":"+strconv.Itoa(grpcPort))
|
||||
if err != nil {
|
||||
log.Warn("IndexNode", zap.String("GrpcServer:failed to listen", err.Error()))
|
||||
@ -114,14 +111,14 @@ func (s *Server) init() error {
|
||||
if err != nil {
|
||||
err = s.Stop()
|
||||
if err != nil {
|
||||
log.Debug("IndexNode Init failed, and Stop failed")
|
||||
log.Error("IndexNode Init failed, and Stop failed")
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
err = s.indexnode.Register()
|
||||
if err != nil {
|
||||
log.Debug("IndexNode Register etcd failed", zap.Error(err))
|
||||
log.Error("IndexNode Register etcd failed", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
log.Debug("IndexNode Register etcd success")
|
||||
@ -131,26 +128,15 @@ func (s *Server) init() error {
|
||||
// wait for grpc server loop start
|
||||
err = <-s.grpcErrChan
|
||||
if err != nil {
|
||||
log.Error("IndexNode", zap.Any("grpc error", err))
|
||||
return err
|
||||
}
|
||||
|
||||
s.indexCoordClient, err = grpcindexcoordclient.NewClient(s.loopCtx, indexnode.Params.MetaRootPath, indexnode.Params.EtcdEndpoints)
|
||||
if err != nil {
|
||||
log.Debug("New indexCoordeClient failed", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
err = s.indexCoordClient.Init()
|
||||
if err != nil {
|
||||
log.Debug("IndexNode indexCoordeClient init failed", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
s.indexnode.SetIndexCoordClient(s.indexCoordClient)
|
||||
|
||||
s.indexnode.UpdateStateCode(internalpb.StateCode_Initializing)
|
||||
log.Debug("IndexNode", zap.Any("State", internalpb.StateCode_Initializing))
|
||||
err = s.indexnode.Init()
|
||||
if err != nil {
|
||||
log.Debug("IndexNode Init failed", zap.Error(err))
|
||||
log.Error("IndexNode Init failed", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
|
@ -516,11 +516,13 @@ func (i *IndexCoord) watchNodeLoop() {
|
||||
serverID := event.Session.ServerID
|
||||
log.Debug("IndexCoord watchNodeLoop SessionAddEvent", zap.Any("serverID", serverID),
|
||||
zap.Any("address", event.Session.Address))
|
||||
err := i.nodeManager.AddNode(serverID, event.Session.Address)
|
||||
if err != nil {
|
||||
log.Debug("IndexCoord", zap.Any("Add IndexNode err", err))
|
||||
}
|
||||
log.Debug("IndexCoord", zap.Any("IndexNode number", len(i.nodeManager.nodeClients)))
|
||||
go func() {
|
||||
err := i.nodeManager.AddNode(serverID, event.Session.Address)
|
||||
if err != nil {
|
||||
log.Error("IndexCoord", zap.Any("Add IndexNode err", err))
|
||||
}
|
||||
log.Debug("IndexCoord", zap.Any("IndexNode number", len(i.nodeManager.nodeClients)))
|
||||
}()
|
||||
case sessionutil.SessionDelEvent:
|
||||
serverID := event.Session.ServerID
|
||||
log.Debug("IndexCoord watchNodeLoop SessionDelEvent", zap.Any("serverID", serverID))
|
||||
|
@ -29,7 +29,6 @@ import (
|
||||
"github.com/milvus-io/milvus/internal/proto/indexpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/milvuspb"
|
||||
"github.com/milvus-io/milvus/internal/types"
|
||||
"github.com/milvus-io/milvus/internal/util/retry"
|
||||
"github.com/milvus-io/milvus/internal/util/sessionutil"
|
||||
"github.com/milvus-io/milvus/internal/util/trace"
|
||||
@ -50,8 +49,6 @@ type IndexNode struct {
|
||||
kv kv.BaseKV
|
||||
session *sessionutil.Session
|
||||
|
||||
serviceClient types.IndexCoord // method factory
|
||||
|
||||
// Add callback functions at different stages
|
||||
startCallbacks []func()
|
||||
closeCallbacks []func()
|
||||
@ -148,10 +145,6 @@ func (i *IndexNode) UpdateStateCode(code internalpb.StateCode) {
|
||||
i.stateCode.Store(code)
|
||||
}
|
||||
|
||||
func (i *IndexNode) SetIndexCoordClient(serviceClient types.IndexCoord) {
|
||||
i.serviceClient = serviceClient
|
||||
}
|
||||
|
||||
func (i *IndexNode) CreateIndex(ctx context.Context, request *indexpb.CreateIndexRequest) (*commonpb.Status, error) {
|
||||
if i.stateCode.Load().(internalpb.StateCode) != internalpb.StateCode_Healthy {
|
||||
return &commonpb.Status{
|
||||
|
Loading…
Reference in New Issue
Block a user