From 19524a53446d61d467c01a34a18b19c0fb52b5e4 Mon Sep 17 00:00:00 2001 From: Enwei Jiao Date: Thu, 17 Nov 2022 17:15:08 +0800 Subject: [PATCH] Fix nodeID mismatch at standalone mode (#20648) Signed-off-by: Enwei Jiao Signed-off-by: Enwei Jiao --- cmd/tools/migration/migration/runner.go | 2 +- internal/datacoord/server.go | 1 - internal/datanode/data_node.go | 1 - internal/indexcoord/index_coord.go | 42 +++++++++---------- internal/indexcoord/index_coord_test.go | 4 +- internal/indexcoord/task.go | 3 +- internal/indexnode/indexnode.go | 1 - internal/proxy/proxy.go | 1 - internal/querycoordv2/server.go | 2 - internal/querynode/query_node.go | 1 - internal/util/sessionutil/session_util.go | 38 ++++++++++++----- .../util/sessionutil/session_util_test.go | 24 +++++------ 12 files changed, 63 insertions(+), 57 deletions(-) diff --git a/cmd/tools/migration/migration/runner.go b/cmd/tools/migration/migration/runner.go index df106cab20..3156230f72 100644 --- a/cmd/tools/migration/migration/runner.go +++ b/cmd/tools/migration/migration/runner.go @@ -82,7 +82,7 @@ func (r *Runner) init() { r.initEtcdCli() r.session = sessionutil.NewSession(r.ctx, r.cfg.EtcdCfg.MetaRootPath, r.etcdCli, - sessionutil.WithCustomConfigEnable(), sessionutil.WithSessionTTL(60), sessionutil.WithSessionRetryTimes(30)) + sessionutil.WithCustomConfigEnable(), sessionutil.WithTTL(60), sessionutil.WithRetryTimes(30)) // address not important here. 
address := time.Now().String() r.address = address diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index 79b4a6c234..48b28b0891 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -244,7 +244,6 @@ func (s *Server) initSession() error { } s.session.Init(typeutil.DataCoordRole, s.address, true, true) s.session.SetEnableActiveStandBy(s.enableActiveStandBy) - paramtable.SetNodeID(s.session.ServerID) return nil } diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index d0dd722e64..808f3c0708 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -216,7 +216,6 @@ func (node *DataNode) initSession() error { return errors.New("failed to initialize session") } node.session.Init(typeutil.DataNodeRole, node.address, false, true) - paramtable.SetNodeID(node.session.ServerID) return nil } diff --git a/internal/indexcoord/index_coord.go b/internal/indexcoord/index_coord.go index c22eab3f25..3fac0521ef 100644 --- a/internal/indexcoord/index_coord.go +++ b/internal/indexcoord/index_coord.go @@ -79,9 +79,8 @@ type IndexCoord struct { loopCancel func() loopWg sync.WaitGroup - sched *TaskScheduler - session *sessionutil.Session - serverID UniqueID + sched *TaskScheduler + session *sessionutil.Session eventChan <-chan *sessionutil.SessionEvent @@ -164,7 +163,6 @@ func (i *IndexCoord) initSession() error { } i.session.Init(typeutil.IndexCoordRole, i.address, true, true) i.session.SetEnableActiveStandBy(i.enableActiveStandBy) - i.serverID = i.session.ServerID return nil } @@ -433,10 +431,10 @@ func (i *IndexCoord) GetStatisticsChannel(ctx context.Context) (*milvuspb.String // indexBuilder will find this task and assign it to IndexNode for execution. 
func (i *IndexCoord) CreateIndex(ctx context.Context, req *indexpb.CreateIndexRequest) (*commonpb.Status, error) { if !i.isHealthy() { - log.Warn(msgIndexCoordIsUnhealthy(i.serverID)) + log.Warn(msgIndexCoordIsUnhealthy(paramtable.GetNodeID())) return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: msgIndexCoordIsUnhealthy(i.serverID), + Reason: msgIndexCoordIsUnhealthy(paramtable.GetNodeID()), }, nil } log.Info("IndexCoord receive create index request", zap.Int64("CollectionID", req.CollectionID), @@ -495,11 +493,11 @@ func (i *IndexCoord) GetIndexState(ctx context.Context, req *indexpb.GetIndexSta zap.String("indexName", req.IndexName)) if !i.isHealthy() { - log.Warn(msgIndexCoordIsUnhealthy(i.serverID)) + log.Warn(msgIndexCoordIsUnhealthy(paramtable.GetNodeID())) return &indexpb.GetIndexStateResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: msgIndexCoordIsUnhealthy(i.serverID), + Reason: msgIndexCoordIsUnhealthy(paramtable.GetNodeID()), }, }, nil } @@ -546,11 +544,11 @@ func (i *IndexCoord) GetSegmentIndexState(ctx context.Context, req *indexpb.GetS zap.String("indexName", req.IndexName), zap.Int64s("segIDs", req.GetSegmentIDs())) if !i.isHealthy() { - log.Warn(msgIndexCoordIsUnhealthy(i.serverID)) + log.Warn(msgIndexCoordIsUnhealthy(paramtable.GetNodeID())) return &indexpb.GetSegmentIndexStateResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: msgIndexCoordIsUnhealthy(i.serverID), + Reason: msgIndexCoordIsUnhealthy(paramtable.GetNodeID()), }, }, nil } @@ -634,11 +632,11 @@ func (i *IndexCoord) GetIndexBuildProgress(ctx context.Context, req *indexpb.Get log.RatedInfo(5, "IndexCoord receive GetIndexBuildProgress request", zap.Int64("collID", req.CollectionID), zap.String("indexName", req.IndexName)) if !i.isHealthy() { - log.Warn(msgIndexCoordIsUnhealthy(i.serverID)) + log.Warn(msgIndexCoordIsUnhealthy(paramtable.GetNodeID())) return 
&indexpb.GetIndexBuildProgressResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: msgIndexCoordIsUnhealthy(i.serverID), + Reason: msgIndexCoordIsUnhealthy(paramtable.GetNodeID()), }, }, nil } @@ -710,10 +708,10 @@ func (i *IndexCoord) DropIndex(ctx context.Context, req *indexpb.DropIndexReques log.Info("IndexCoord DropIndex", zap.Int64("collectionID", req.CollectionID), zap.Int64s("partitionIDs", req.PartitionIDs), zap.String("indexName", req.IndexName)) if !i.isHealthy() { - log.Warn(msgIndexCoordIsUnhealthy(i.serverID)) + log.Warn(msgIndexCoordIsUnhealthy(paramtable.GetNodeID())) return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: msgIndexCoordIsUnhealthy(i.serverID), + Reason: msgIndexCoordIsUnhealthy(paramtable.GetNodeID()), }, nil } @@ -777,11 +775,11 @@ func (i *IndexCoord) GetIndexInfos(ctx context.Context, req *indexpb.GetIndexInf log.RatedInfo(5, "IndexCoord GetIndexInfos", zap.Int64("collectionID", req.CollectionID), zap.String("indexName", req.GetIndexName()), zap.Int64s("segIDs", req.GetSegmentIDs())) if !i.isHealthy() { - log.Warn(msgIndexCoordIsUnhealthy(i.serverID)) + log.Warn(msgIndexCoordIsUnhealthy(paramtable.GetNodeID())) return &indexpb.GetIndexInfoResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: msgIndexCoordIsUnhealthy(i.serverID), + Reason: msgIndexCoordIsUnhealthy(paramtable.GetNodeID()), }, SegmentInfo: nil, }, nil @@ -836,11 +834,11 @@ func (i *IndexCoord) GetIndexInfos(ctx context.Context, req *indexpb.GetIndexInf func (i *IndexCoord) DescribeIndex(ctx context.Context, req *indexpb.DescribeIndexRequest) (*indexpb.DescribeIndexResponse, error) { log.RatedInfo(5, "IndexCoord DescribeIndex", zap.Int64("collectionID", req.CollectionID), zap.String("indexName", req.GetIndexName())) if !i.isHealthy() { - log.Warn(msgIndexCoordIsUnhealthy(i.serverID)) + log.Warn(msgIndexCoordIsUnhealthy(paramtable.GetNodeID())) return 
&indexpb.DescribeIndexResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: msgIndexCoordIsUnhealthy(i.serverID), + Reason: msgIndexCoordIsUnhealthy(paramtable.GetNodeID()), }, IndexInfos: nil, }, nil @@ -937,7 +935,7 @@ func (i *IndexCoord) ShowConfigurations(ctx context.Context, req *internalpb.Sho log.Info("IndexCoord.ShowConfigurations", zap.String("pattern", req.Pattern)) if !i.isHealthy() { log.Warn("IndexCoord.ShowConfigurations failed", - zap.Int64("nodeId", i.serverID), + zap.Int64("nodeID", paramtable.GetNodeID()), zap.String("req", req.Pattern), zap.Error(errIndexCoordIsUnhealthy(paramtable.GetNodeID()))) @@ -955,15 +953,15 @@ func (i *IndexCoord) ShowConfigurations(ctx context.Context, req *internalpb.Sho // GetMetrics gets the metrics info of IndexCoord. func (i *IndexCoord) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) { - log.RatedInfo(5, "IndexCoord.GetMetrics", zap.Int64("node id", i.serverID), zap.String("req", req.Request)) + log.RatedInfo(5, "IndexCoord.GetMetrics", zap.Int64("nodeID", paramtable.GetNodeID()), zap.String("req", req.Request)) if !i.isHealthy() { - log.Warn(msgIndexCoordIsUnhealthy(i.serverID)) + log.Warn(msgIndexCoordIsUnhealthy(paramtable.GetNodeID())) return &milvuspb.GetMetricsResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: msgIndexCoordIsUnhealthy(i.serverID), + Reason: msgIndexCoordIsUnhealthy(paramtable.GetNodeID()), }, Response: "", }, nil diff --git a/internal/indexcoord/index_coord_test.go b/internal/indexcoord/index_coord_test.go index 5af92d1d35..5e17db1e9f 100644 --- a/internal/indexcoord/index_coord_test.go +++ b/internal/indexcoord/index_coord_test.go @@ -530,9 +530,7 @@ func TestIndexCoord_GetComponentStates(t *testing.T) { func TestIndexCoord_UnHealthy(t *testing.T) { ctx := context.Background() - ic := &IndexCoord{ - serverID: 1, - } + ic := &IndexCoord{} 
ic.stateCode.Store(commonpb.StateCode_Abnormal) // Test IndexCoord function diff --git a/internal/indexcoord/task.go b/internal/indexcoord/task.go index 380e46a1b1..8434843708 100644 --- a/internal/indexcoord/task.go +++ b/internal/indexcoord/task.go @@ -30,6 +30,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/rootcoordpb" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/commonpbutil" + "github.com/milvus-io/milvus/internal/util/paramtable" ) type task interface { @@ -160,7 +161,7 @@ func (cit *CreateIndexTask) Execute(ctx context.Context) error { commonpbutil.WithMsgType(0), commonpbutil.WithMsgID(cit.indexID), commonpbutil.WithTimeStamp(cit.req.Timestamp), - commonpbutil.WithSourceID(cit.indexCoordClient.serverID), + commonpbutil.WithSourceID(paramtable.GetNodeID()), ), CollectionID: cit.req.CollectionID, PartitionID: -1, diff --git a/internal/indexnode/indexnode.go b/internal/indexnode/indexnode.go index e61440bb6f..c6b35e510e 100644 --- a/internal/indexnode/indexnode.go +++ b/internal/indexnode/indexnode.go @@ -176,7 +176,6 @@ func (i *IndexNode) initSession() error { return errors.New("failed to initialize session") } i.session.Init(typeutil.IndexNodeRole, i.address, false, true) - paramtable.SetNodeID(i.session.ServerID) return nil } diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go index 6244ac77fa..7f3bf46af3 100644 --- a/internal/proxy/proxy.go +++ b/internal/proxy/proxy.go @@ -155,7 +155,6 @@ func (node *Proxy) initSession() error { return errors.New("new session failed, maybe etcd cannot be connected") } node.session.Init(typeutil.ProxyRole, node.address, false, true) - paramtable.SetNodeID(node.session.ServerID) return nil } diff --git a/internal/querycoordv2/server.go b/internal/querycoordv2/server.go index d63c9582da..70106bda23 100644 --- a/internal/querycoordv2/server.go +++ b/internal/querycoordv2/server.go @@ -49,7 +49,6 @@ import ( "github.com/milvus-io/milvus/internal/types" 
"github.com/milvus-io/milvus/internal/util/dependency" "github.com/milvus-io/milvus/internal/util/metricsinfo" - "github.com/milvus-io/milvus/internal/util/paramtable" "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/internal/util/tsoutil" "github.com/milvus-io/milvus/internal/util/typeutil" @@ -158,7 +157,6 @@ func (s *Server) Init() error { s.session.Init(typeutil.QueryCoordRole, s.address, true, true) s.enableActiveStandBy = Params.QueryCoordCfg.EnableActiveStandby s.session.SetEnableActiveStandBy(s.enableActiveStandBy) - paramtable.SetNodeID(s.session.ServerID) s.factory.Init(Params) // Init KV diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go index d49367ee76..d9e5271a3c 100644 --- a/internal/querynode/query_node.go +++ b/internal/querynode/query_node.go @@ -149,7 +149,6 @@ func (node *QueryNode) initSession() error { return fmt.Errorf("session is nil, the etcd client connection may have failed") } node.session.Init(typeutil.QueryNodeRole, node.address, false, true) - paramtable.SetNodeID(node.session.ServerID) return nil } diff --git a/internal/util/sessionutil/session_util.go b/internal/util/sessionutil/session_util.go index d521b740bc..02c13b406c 100644 --- a/internal/util/sessionutil/session_util.go +++ b/internal/util/sessionutil/session_util.go @@ -104,6 +104,7 @@ type Session struct { useCustomConfig bool sessionTTL int64 sessionRetryTimes int64 + reuseNodeID bool } type SessionOption func(session *Session) @@ -112,14 +113,18 @@ func WithCustomConfigEnable() SessionOption { return func(session *Session) { session.useCustomConfig = true } } -func WithSessionTTL(ttl int64) SessionOption { +func WithTTL(ttl int64) SessionOption { return func(session *Session) { session.sessionTTL = ttl } } -func WithSessionRetryTimes(n int64) SessionOption { +func WithRetryTimes(n int64) SessionOption { return func(session *Session) { session.sessionRetryTimes = n } } +func WithResueNodeID(b bool) 
SessionOption { + return func(session *Session) { session.reuseNodeID = b } +} + func (s *Session) apply(opts ...SessionOption) { for _, opt := range opts { opt(s) @@ -184,12 +189,15 @@ func (s *Session) MarshalJSON() ([]byte, error) { // etcdEndpoints is to init etcdCli when NewSession func NewSession(ctx context.Context, metaRoot string, client *clientv3.Client, opts ...SessionOption) *Session { session := &Session{ - ctx: ctx, - metaRoot: metaRoot, - Version: common.Version, + ctx: ctx, + metaRoot: metaRoot, + Version: common.Version, + + // options useCustomConfig: false, sessionTTL: 60, sessionRetryTimes: 30, + reuseNodeID: true, } session.apply(opts...) @@ -241,6 +249,7 @@ func (s *Session) String() string { func (s *Session) Register() { ch, err := s.registerService() if err != nil { + log.Error("Register failed", zap.Error(err)) panic(err) } s.liveCh = s.processKeepAliveResponse(ch) @@ -253,12 +262,21 @@ func (s *Session) getServerID() (int64, error) { serverIDMu.Lock() defer serverIDMu.Unlock() - // Notice, For standalone, all process share the same nodeID. - nodeID := paramtable.GetNodeID() - if nodeID != 0 { - return nodeID, nil + log.Debug("getServerID", zap.Bool("reuse", s.reuseNodeID)) + if s.reuseNodeID { + // Note: in standalone mode, all processes share the same nodeID. 
+ if nodeID := paramtable.GetNodeID(); nodeID != 0 { + return nodeID, nil + } } - return s.getServerIDWithKey(DefaultIDKey) + nodeID, err := s.getServerIDWithKey(DefaultIDKey) + if err != nil { + return nodeID, err + } + if s.reuseNodeID { + paramtable.SetNodeID(nodeID) + } + return nodeID, nil } func (s *Session) checkIDExist() { diff --git a/internal/util/sessionutil/session_util_test.go b/internal/util/sessionutil/session_util_test.go index faebac916d..b1336e3bfc 100644 --- a/internal/util/sessionutil/session_util_test.go +++ b/internal/util/sessionutil/session_util_test.go @@ -73,9 +73,7 @@ func TestGetServerIDConcurrently(t *testing.T) { go getIDFunc() } wg.Wait() - for i := 1; i <= 10; i++ { - assert.Contains(t, res, int64(i)) - } + assert.ElementsMatch(t, []int64{1, 1, 1, 1, 1, 1, 1, 1, 1, 1}, res) } func TestInit(t *testing.T) { @@ -125,7 +123,7 @@ func TestUpdateSessions(t *testing.T) { var wg sync.WaitGroup var muList = sync.Mutex{} - s := NewSession(ctx, metaRoot, etcdCli) + s := NewSession(ctx, metaRoot, etcdCli, WithResueNodeID(false)) sessions, rev, err := s.GetSessions("test") assert.Nil(t, err) @@ -137,7 +135,7 @@ func TestUpdateSessions(t *testing.T) { getIDFunc := func() { etcdCli, err := etcd.GetRemoteEtcdClient(etcdEndpoints) require.NoError(t, err) - singleS := NewSession(ctx, metaRoot, etcdCli) + singleS := NewSession(ctx, metaRoot, etcdCli, WithResueNodeID(false)) singleS.Init("test", "testAddr", false, false) singleS.Register() muList.Lock() @@ -498,21 +496,21 @@ func (suite *SessionWithVersionSuite) SetupTest() { suite.metaRoot = "sessionWithVersion" suite.serverName = "sessionComp" - s1 := NewSession(ctx, suite.metaRoot, client) + s1 := NewSession(ctx, suite.metaRoot, client, WithResueNodeID(false)) s1.Version.Major, s1.Version.Minor, s1.Version.Patch = 0, 0, 0 s1.Init(suite.serverName, "s1", false, false) s1.Register() suite.sessions = append(suite.sessions, s1) - s2 := NewSession(ctx, suite.metaRoot, client) + s2 := NewSession(ctx, 
suite.metaRoot, client, WithResueNodeID(false)) s2.Version.Major, s2.Version.Minor, s2.Version.Patch = 2, 1, 0 s2.Init(suite.serverName, "s2", false, false) s2.Register() suite.sessions = append(suite.sessions, s2) - s3 := NewSession(ctx, suite.metaRoot, client) + s3 := NewSession(ctx, suite.metaRoot, client, WithResueNodeID(false)) s3.Version.Major, s3.Version.Minor, s3.Version.Patch = 2, 2, 0 s3.Version.Build = []string{"dev"} s3.Init(suite.serverName, "s3", false, false) @@ -538,7 +536,7 @@ func (suite *SessionWithVersionSuite) TearDownTest() { } func (suite *SessionWithVersionSuite) TestGetSessionsWithRangeVersion() { - s := NewSession(context.Background(), suite.metaRoot, suite.client) + s := NewSession(context.Background(), suite.metaRoot, suite.client, WithResueNodeID(false)) suite.Run(">1.0.0", func() { r, err := semver.ParseRange(">1.0.0") @@ -581,7 +579,7 @@ func (suite *SessionWithVersionSuite) TestGetSessionsWithRangeVersion() { } func (suite *SessionWithVersionSuite) TestWatchServicesWithVersionRange() { - s := NewSession(context.Background(), suite.metaRoot, suite.client) + s := NewSession(context.Background(), suite.metaRoot, suite.client, WithResueNodeID(false)) suite.Run(">1.0.0 <=2.1.0", func() { r, err := semver.ParseRange(">1.0.0 <=2.1.0") @@ -641,7 +639,7 @@ func TestSessionProcessActiveStandBy(t *testing.T) { // register session 1, will be active ctx1 := context.Background() - s1 := NewSession(ctx1, metaRoot, etcdCli) + s1 := NewSession(ctx1, metaRoot, etcdCli, WithResueNodeID(false)) s1.Init("inittest", "testAddr", true, true) s1.SetEnableActiveStandBy(true) s1.Register() @@ -660,7 +658,7 @@ func TestSessionProcessActiveStandBy(t *testing.T) { // register session 2, will be standby ctx2 := context.Background() - s2 := NewSession(ctx2, metaRoot, etcdCli) + s2 := NewSession(ctx2, metaRoot, etcdCli, WithResueNodeID(false)) s2.Init("inittest", "testAddr", true, true) s2.SetEnableActiveStandBy(true) s2.Register() @@ -704,7 +702,7 @@ func 
TestSessionEventType_String(t *testing.T) { func TestSession_apply(t *testing.T) { session := &Session{} - opts := []SessionOption{WithCustomConfigEnable(), WithSessionTTL(100), WithSessionRetryTimes(200)} + opts := []SessionOption{WithCustomConfigEnable(), WithTTL(100), WithRetryTimes(200)} session.apply(opts...) assert.True(t, session.useCustomConfig) assert.Equal(t, int64(100), session.sessionTTL)