From 91cb529ba6c3b9654789c1f6cade11424a379366 Mon Sep 17 00:00:00 2001 From: Bingyi Sun Date: Tue, 2 Apr 2024 14:43:13 +0800 Subject: [PATCH] fix: get latest collection info when checking index (#31744) issue: https://github.com/milvus-io/milvus/issues/31727 --------- Signed-off-by: sunby --- .../querycoordv2/checkers/index_checker.go | 18 ++-- .../checkers/index_checker_test.go | 90 +++++++++++++++++++ 2 files changed, 103 insertions(+), 5 deletions(-) diff --git a/internal/querycoordv2/checkers/index_checker.go b/internal/querycoordv2/checkers/index_checker.go index 5969f2983d..4ffe71b4ab 100644 --- a/internal/querycoordv2/checkers/index_checker.go +++ b/internal/querycoordv2/checkers/index_checker.go @@ -23,6 +23,7 @@ import ( "github.com/samber/lo" "go.uber.org/zap" + "github.com/milvus-io/milvus/internal/proto/indexpb" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/querycoordv2/meta" "github.com/milvus-io/milvus/internal/querycoordv2/params" @@ -75,6 +76,12 @@ func (c *IndexChecker) Check(ctx context.Context) []task.Task { var tasks []task.Task for _, collectionID := range collectionIDs { + indexInfos, err := c.broker.ListIndexes(ctx, collectionID) + if err != nil { + log.Warn("failed to list indexes", zap.Int64("collection", collectionID), zap.Error(err)) + continue + } + collection := c.meta.CollectionManager.GetCollection(collectionID) if collection == nil { log.Warn("collection released during check index", zap.Int64("collection", collectionID)) @@ -82,14 +89,14 @@ func (c *IndexChecker) Check(ctx context.Context) []task.Task { } replicas := c.meta.ReplicaManager.GetByCollection(collectionID) for _, replica := range replicas { - tasks = append(tasks, c.checkReplica(ctx, collection, replica)...) + tasks = append(tasks, c.checkReplica(ctx, collection, replica, indexInfos)...) } } return tasks } -func (c *IndexChecker) checkReplica(ctx context.Context, collection *meta.Collection, replica *meta.Replica) []task.Task { +func (c *IndexChecker) checkReplica(ctx context.Context, collection *meta.Collection, replica *meta.Replica, indexInfos []*indexpb.IndexInfo) []task.Task { log := log.Ctx(ctx).With( zap.Int64("collectionID", collection.GetCollectionID()), ) @@ -104,7 +111,7 @@ func (c *IndexChecker) checkReplica(ctx context.Context, collection *meta.Collec if ok, _ := c.nodeMgr.IsStoppingNode(segment.Node); ok { continue } - missing := c.checkSegment(ctx, segment, collection) + missing := c.checkSegment(ctx, segment, indexInfos) if len(missing) > 0 { targets[segment.GetID()] = missing idSegments[segment.GetID()] = segment @@ -135,9 +142,10 @@ func (c *IndexChecker) checkReplica(ctx context.Context, collection *meta.Collec return tasks } -func (c *IndexChecker) checkSegment(ctx context.Context, segment *meta.Segment, collection *meta.Collection) (fieldIDs []int64) { +func (c *IndexChecker) checkSegment(ctx context.Context, segment *meta.Segment, indexInfos []*indexpb.IndexInfo) (fieldIDs []int64) { var result []int64 - for fieldID, indexID := range collection.GetFieldIndexID() { + for _, indexInfo := range indexInfos { + fieldID, indexID := indexInfo.FieldID, indexInfo.IndexID info, ok := segment.IndexInfo[fieldID] if !ok { result = append(result, fieldID) diff --git a/internal/querycoordv2/checkers/index_checker_test.go b/internal/querycoordv2/checkers/index_checker_test.go index 8ad379c219..6d930996bb 100644 --- a/internal/querycoordv2/checkers/index_checker_test.go +++ b/internal/querycoordv2/checkers/index_checker_test.go @@ -27,6 +27,7 @@ import ( "github.com/milvus-io/milvus/internal/kv" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "github.com/milvus-io/milvus/internal/metastore/kv/querycoord" + "github.com/milvus-io/milvus/internal/proto/indexpb" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/querycoordv2/meta" "github.com/milvus-io/milvus/internal/querycoordv2/params" @@ -114,6 +115,13 @@ func (suite *IndexCheckerSuite) TestLoadIndex() { }, }, nil) + suite.broker.EXPECT().ListIndexes(mock.Anything, int64(1)).Return([]*indexpb.IndexInfo{ + { + FieldID: 101, + IndexID: 1000, + }, + }, nil) + tasks := checker.Check(context.Background()) suite.Require().Len(tasks, 1) @@ -182,6 +190,13 @@ func (suite *IndexCheckerSuite) TestIndexInfoNotMatch() { return nil }, nil) + suite.broker.EXPECT().ListIndexes(mock.Anything, int64(1)).Return([]*indexpb.IndexInfo{ + { + FieldID: 101, + IndexID: 1000, + }, + }, nil) + tasks := checker.Check(context.Background()) suite.Require().Len(tasks, 0) } @@ -214,11 +229,86 @@ func (suite *IndexCheckerSuite) TestGetIndexInfoFailed() { // broker suite.broker.EXPECT().GetIndexInfo(mock.Anything, int64(1), mock.AnythingOfType("int64")). Return(nil, errors.New("mocked error")) + suite.broker.EXPECT().ListIndexes(mock.Anything, int64(1)).Return([]*indexpb.IndexInfo{ + { + FieldID: 101, + IndexID: 1000, + }, + }, nil) tasks := checker.Check(context.Background()) suite.Require().Len(tasks, 0) } +func (suite *IndexCheckerSuite) TestCreateNewIndex() { + checker := suite.checker + + // meta + coll := utils.CreateTestCollection(1, 1) + coll.FieldIndexID = map[int64]int64{101: 1000} + checker.meta.CollectionManager.PutCollection(coll) + checker.meta.ReplicaManager.Put(utils.CreateTestReplica(200, 1, []int64{1, 2})) + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 1, + Address: "localhost", + Hostname: "localhost", + })) + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 2, + Address: "localhost", + Hostname: "localhost", + })) + checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 1) + checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 2) + + // dist + segment := utils.CreateTestSegment(1, 1, 2, 1, 1, "test-insert-channel") + segment.IndexInfo = map[int64]*querypb.FieldIndexInfo{101: { + FieldID: 101, + IndexID: 1000, + EnableIndex: true, + }} + checker.dist.SegmentDistManager.Update(1, segment) + + // broker + suite.broker.EXPECT().ListIndexes(mock.Anything, mock.Anything).Call.Return( + func(ctx context.Context, collectionID int64) ([]*indexpb.IndexInfo, error) { + return []*indexpb.IndexInfo{ + { + FieldID: 101, + IndexID: 1000, + }, + { + FieldID: 102, + IndexID: 1001, + }, + }, nil + }, + ) + suite.broker.EXPECT().GetIndexInfo(mock.Anything, mock.Anything, mock.AnythingOfType("int64")).Call. + Return(func(ctx context.Context, collectionID, segmentID int64) []*querypb.FieldIndexInfo { + return []*querypb.FieldIndexInfo{ + { + FieldID: 101, + IndexID: 1000, + EnableIndex: true, + IndexFilePaths: []string{"index"}, + }, + { + FieldID: 102, + IndexID: 1001, + EnableIndex: true, + IndexFilePaths: []string{"index"}, + }, + } + }, nil) + + tasks := checker.Check(context.Background()) + suite.Len(tasks, 1) + suite.Len(tasks[0].Actions(), 1) + suite.Equal(tasks[0].Actions()[0].(*task.SegmentAction).Type(), task.ActionTypeUpdate) +} + func TestIndexChecker(t *testing.T) { suite.Run(t, new(IndexCheckerSuite)) }