mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-02 11:59:00 +08:00
fix: get latest collection info when checking index (#31744)
issue: https://github.com/milvus-io/milvus/issues/31727 --------- Signed-off-by: sunby <sunbingyi1992@gmail.com>
This commit is contained in:
parent
bd853be8c7
commit
91cb529ba6
@ -23,6 +23,7 @@ import (
|
||||
"github.com/samber/lo"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/proto/indexpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/params"
|
||||
@ -75,6 +76,12 @@ func (c *IndexChecker) Check(ctx context.Context) []task.Task {
|
||||
var tasks []task.Task
|
||||
|
||||
for _, collectionID := range collectionIDs {
|
||||
indexInfos, err := c.broker.ListIndexes(ctx, collectionID)
|
||||
if err != nil {
|
||||
log.Warn("failed to list indexes", zap.Int64("collection", collectionID), zap.Error(err))
|
||||
continue
|
||||
}
|
||||
|
||||
collection := c.meta.CollectionManager.GetCollection(collectionID)
|
||||
if collection == nil {
|
||||
log.Warn("collection released during check index", zap.Int64("collection", collectionID))
|
||||
@ -82,14 +89,14 @@ func (c *IndexChecker) Check(ctx context.Context) []task.Task {
|
||||
}
|
||||
replicas := c.meta.ReplicaManager.GetByCollection(collectionID)
|
||||
for _, replica := range replicas {
|
||||
tasks = append(tasks, c.checkReplica(ctx, collection, replica)...)
|
||||
tasks = append(tasks, c.checkReplica(ctx, collection, replica, indexInfos)...)
|
||||
}
|
||||
}
|
||||
|
||||
return tasks
|
||||
}
|
||||
|
||||
func (c *IndexChecker) checkReplica(ctx context.Context, collection *meta.Collection, replica *meta.Replica) []task.Task {
|
||||
func (c *IndexChecker) checkReplica(ctx context.Context, collection *meta.Collection, replica *meta.Replica, indexInfos []*indexpb.IndexInfo) []task.Task {
|
||||
log := log.Ctx(ctx).With(
|
||||
zap.Int64("collectionID", collection.GetCollectionID()),
|
||||
)
|
||||
@ -104,7 +111,7 @@ func (c *IndexChecker) checkReplica(ctx context.Context, collection *meta.Collec
|
||||
if ok, _ := c.nodeMgr.IsStoppingNode(segment.Node); ok {
|
||||
continue
|
||||
}
|
||||
missing := c.checkSegment(ctx, segment, collection)
|
||||
missing := c.checkSegment(ctx, segment, indexInfos)
|
||||
if len(missing) > 0 {
|
||||
targets[segment.GetID()] = missing
|
||||
idSegments[segment.GetID()] = segment
|
||||
@ -135,9 +142,10 @@ func (c *IndexChecker) checkReplica(ctx context.Context, collection *meta.Collec
|
||||
return tasks
|
||||
}
|
||||
|
||||
func (c *IndexChecker) checkSegment(ctx context.Context, segment *meta.Segment, collection *meta.Collection) (fieldIDs []int64) {
|
||||
func (c *IndexChecker) checkSegment(ctx context.Context, segment *meta.Segment, indexInfos []*indexpb.IndexInfo) (fieldIDs []int64) {
|
||||
var result []int64
|
||||
for fieldID, indexID := range collection.GetFieldIndexID() {
|
||||
for _, indexInfo := range indexInfos {
|
||||
fieldID, indexID := indexInfo.FieldID, indexInfo.IndexID
|
||||
info, ok := segment.IndexInfo[fieldID]
|
||||
if !ok {
|
||||
result = append(result, fieldID)
|
||||
|
@ -27,6 +27,7 @@ import (
|
||||
"github.com/milvus-io/milvus/internal/kv"
|
||||
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
|
||||
"github.com/milvus-io/milvus/internal/metastore/kv/querycoord"
|
||||
"github.com/milvus-io/milvus/internal/proto/indexpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/params"
|
||||
@ -114,6 +115,13 @@ func (suite *IndexCheckerSuite) TestLoadIndex() {
|
||||
},
|
||||
}, nil)
|
||||
|
||||
suite.broker.EXPECT().ListIndexes(mock.Anything, int64(1)).Return([]*indexpb.IndexInfo{
|
||||
{
|
||||
FieldID: 101,
|
||||
IndexID: 1000,
|
||||
},
|
||||
}, nil)
|
||||
|
||||
tasks := checker.Check(context.Background())
|
||||
suite.Require().Len(tasks, 1)
|
||||
|
||||
@ -182,6 +190,13 @@ func (suite *IndexCheckerSuite) TestIndexInfoNotMatch() {
|
||||
return nil
|
||||
}, nil)
|
||||
|
||||
suite.broker.EXPECT().ListIndexes(mock.Anything, int64(1)).Return([]*indexpb.IndexInfo{
|
||||
{
|
||||
FieldID: 101,
|
||||
IndexID: 1000,
|
||||
},
|
||||
}, nil)
|
||||
|
||||
tasks := checker.Check(context.Background())
|
||||
suite.Require().Len(tasks, 0)
|
||||
}
|
||||
@ -214,11 +229,86 @@ func (suite *IndexCheckerSuite) TestGetIndexInfoFailed() {
|
||||
// broker
|
||||
suite.broker.EXPECT().GetIndexInfo(mock.Anything, int64(1), mock.AnythingOfType("int64")).
|
||||
Return(nil, errors.New("mocked error"))
|
||||
suite.broker.EXPECT().ListIndexes(mock.Anything, int64(1)).Return([]*indexpb.IndexInfo{
|
||||
{
|
||||
FieldID: 101,
|
||||
IndexID: 1000,
|
||||
},
|
||||
}, nil)
|
||||
|
||||
tasks := checker.Check(context.Background())
|
||||
suite.Require().Len(tasks, 0)
|
||||
}
|
||||
|
||||
func (suite *IndexCheckerSuite) TestCreateNewIndex() {
|
||||
checker := suite.checker
|
||||
|
||||
// meta
|
||||
coll := utils.CreateTestCollection(1, 1)
|
||||
coll.FieldIndexID = map[int64]int64{101: 1000}
|
||||
checker.meta.CollectionManager.PutCollection(coll)
|
||||
checker.meta.ReplicaManager.Put(utils.CreateTestReplica(200, 1, []int64{1, 2}))
|
||||
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
|
||||
NodeID: 1,
|
||||
Address: "localhost",
|
||||
Hostname: "localhost",
|
||||
}))
|
||||
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
|
||||
NodeID: 2,
|
||||
Address: "localhost",
|
||||
Hostname: "localhost",
|
||||
}))
|
||||
checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 1)
|
||||
checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 2)
|
||||
|
||||
// dist
|
||||
segment := utils.CreateTestSegment(1, 1, 2, 1, 1, "test-insert-channel")
|
||||
segment.IndexInfo = map[int64]*querypb.FieldIndexInfo{101: {
|
||||
FieldID: 101,
|
||||
IndexID: 1000,
|
||||
EnableIndex: true,
|
||||
}}
|
||||
checker.dist.SegmentDistManager.Update(1, segment)
|
||||
|
||||
// broker
|
||||
suite.broker.EXPECT().ListIndexes(mock.Anything, mock.Anything).Call.Return(
|
||||
func(ctx context.Context, collectionID int64) ([]*indexpb.IndexInfo, error) {
|
||||
return []*indexpb.IndexInfo{
|
||||
{
|
||||
FieldID: 101,
|
||||
IndexID: 1000,
|
||||
},
|
||||
{
|
||||
FieldID: 102,
|
||||
IndexID: 1001,
|
||||
},
|
||||
}, nil
|
||||
},
|
||||
)
|
||||
suite.broker.EXPECT().GetIndexInfo(mock.Anything, mock.Anything, mock.AnythingOfType("int64")).Call.
|
||||
Return(func(ctx context.Context, collectionID, segmentID int64) []*querypb.FieldIndexInfo {
|
||||
return []*querypb.FieldIndexInfo{
|
||||
{
|
||||
FieldID: 101,
|
||||
IndexID: 1000,
|
||||
EnableIndex: true,
|
||||
IndexFilePaths: []string{"index"},
|
||||
},
|
||||
{
|
||||
FieldID: 102,
|
||||
IndexID: 1001,
|
||||
EnableIndex: true,
|
||||
IndexFilePaths: []string{"index"},
|
||||
},
|
||||
}
|
||||
}, nil)
|
||||
|
||||
tasks := checker.Check(context.Background())
|
||||
suite.Len(tasks, 1)
|
||||
suite.Len(tasks[0].Actions(), 1)
|
||||
suite.Equal(tasks[0].Actions()[0].(*task.SegmentAction).Type(), task.ActionTypeUpdate)
|
||||
}
|
||||
|
||||
func TestIndexChecker(t *testing.T) {
|
||||
suite.Run(t, new(IndexCheckerSuite))
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user