mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-11-30 02:48:45 +08:00
Allow querycoord executor load sealed segment with no index (#25902)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent
3735a097e6
commit
a669440ee9
@ -21,6 +21,7 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cockroachdb/errors"
|
||||
"github.com/milvus-io/milvus/pkg/util/tsoutil"
|
||||
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
||||
"go.uber.org/atomic"
|
||||
@ -269,8 +270,11 @@ func (ex *Executor) loadSegment(task *SegmentTask, step int) error {
|
||||
segment := resp.GetInfos()[0]
|
||||
indexes, err := ex.broker.GetIndexInfo(ctx, task.CollectionID(), segment.GetID())
|
||||
if err != nil {
|
||||
log.Warn("failed to get index of segment", zap.Error(err))
|
||||
return err
|
||||
if !errors.Is(err, merr.ErrIndexNotFound) {
|
||||
log.Warn("failed to get index of segment", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
indexes = nil
|
||||
}
|
||||
|
||||
readableVersion := int64(0)
|
||||
|
@ -158,6 +158,7 @@ func (suite *TaskSuite) BeforeTest(suiteName, testName string) {
|
||||
switch testName {
|
||||
case "TestSubscribeChannelTask",
|
||||
"TestLoadSegmentTask",
|
||||
"TestLoadSegmentTaskNotIndex",
|
||||
"TestLoadSegmentTaskFailed",
|
||||
"TestSegmentTaskStale",
|
||||
"TestTaskCanceled",
|
||||
@ -453,6 +454,100 @@ func (suite *TaskSuite) TestLoadSegmentTask() {
|
||||
}
|
||||
}
|
||||
|
||||
func (suite *TaskSuite) TestLoadSegmentTaskNotIndex() {
|
||||
ctx := context.Background()
|
||||
timeout := 10 * time.Second
|
||||
targetNode := int64(3)
|
||||
partition := int64(100)
|
||||
channel := &datapb.VchannelInfo{
|
||||
CollectionID: suite.collection,
|
||||
ChannelName: Params.CommonCfg.RootCoordDml.GetValue() + "-test",
|
||||
}
|
||||
|
||||
// Expect
|
||||
suite.broker.EXPECT().GetCollectionSchema(mock.Anything, suite.collection).Return(&schemapb.CollectionSchema{
|
||||
Name: "TestLoadSegmentTask",
|
||||
Fields: []*schemapb.FieldSchema{
|
||||
{FieldID: 100, Name: "vec", DataType: schemapb.DataType_FloatVector},
|
||||
},
|
||||
}, nil)
|
||||
suite.broker.EXPECT().DescribeIndex(mock.Anything, suite.collection).Return([]*indexpb.IndexInfo{
|
||||
{
|
||||
CollectionID: suite.collection,
|
||||
},
|
||||
}, nil)
|
||||
for _, segment := range suite.loadSegments {
|
||||
suite.broker.EXPECT().GetSegmentInfo(mock.Anything, segment).Return(&datapb.GetSegmentInfoResponse{Infos: []*datapb.SegmentInfo{
|
||||
{
|
||||
ID: segment,
|
||||
CollectionID: suite.collection,
|
||||
PartitionID: partition,
|
||||
InsertChannel: channel.ChannelName,
|
||||
}},
|
||||
}, nil)
|
||||
suite.broker.EXPECT().GetIndexInfo(mock.Anything, suite.collection, segment).Return(nil, merr.WrapErrIndexNotFound())
|
||||
}
|
||||
suite.cluster.EXPECT().LoadSegments(mock.Anything, targetNode, mock.Anything).Return(merr.Status(nil), nil)
|
||||
|
||||
// Test load segment task
|
||||
suite.dist.ChannelDistManager.Update(targetNode, meta.DmChannelFromVChannel(&datapb.VchannelInfo{
|
||||
CollectionID: suite.collection,
|
||||
ChannelName: channel.ChannelName,
|
||||
}))
|
||||
tasks := []Task{}
|
||||
segments := make([]*datapb.SegmentInfo, 0)
|
||||
for _, segment := range suite.loadSegments {
|
||||
segments = append(segments, &datapb.SegmentInfo{
|
||||
ID: segment,
|
||||
InsertChannel: channel.ChannelName,
|
||||
PartitionID: 1,
|
||||
})
|
||||
task, err := NewSegmentTask(
|
||||
ctx,
|
||||
timeout,
|
||||
0,
|
||||
suite.collection,
|
||||
suite.replica,
|
||||
NewSegmentAction(targetNode, ActionTypeGrow, channel.GetChannelName(), segment),
|
||||
)
|
||||
suite.NoError(err)
|
||||
tasks = append(tasks, task)
|
||||
err = suite.scheduler.Add(task)
|
||||
suite.NoError(err)
|
||||
}
|
||||
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, suite.collection).Return(nil, segments, nil)
|
||||
suite.target.UpdateCollectionNextTargetWithPartitions(suite.collection, int64(1))
|
||||
segmentsNum := len(suite.loadSegments)
|
||||
suite.AssertTaskNum(0, segmentsNum, 0, segmentsNum)
|
||||
|
||||
// Process tasks
|
||||
suite.dispatchAndWait(targetNode)
|
||||
suite.AssertTaskNum(segmentsNum, 0, 0, segmentsNum)
|
||||
|
||||
// Process tasks done
|
||||
// Dist contains channels
|
||||
view := &meta.LeaderView{
|
||||
ID: targetNode,
|
||||
CollectionID: suite.collection,
|
||||
Segments: map[int64]*querypb.SegmentDist{},
|
||||
}
|
||||
for _, segment := range suite.loadSegments {
|
||||
view.Segments[segment] = &querypb.SegmentDist{NodeID: targetNode, Version: 0}
|
||||
}
|
||||
distSegments := lo.Map(segments, func(info *datapb.SegmentInfo, _ int) *meta.Segment {
|
||||
return meta.SegmentFromInfo(info)
|
||||
})
|
||||
suite.dist.LeaderViewManager.Update(targetNode, view)
|
||||
suite.dist.SegmentDistManager.Update(targetNode, distSegments...)
|
||||
suite.dispatchAndWait(targetNode)
|
||||
suite.AssertTaskNum(0, 0, 0, 0)
|
||||
|
||||
for _, task := range tasks {
|
||||
suite.Equal(TaskStatusSucceeded, task.Status())
|
||||
suite.NoError(task.Err())
|
||||
}
|
||||
}
|
||||
|
||||
func (suite *TaskSuite) TestLoadSegmentTaskFailed() {
|
||||
ctx := context.Background()
|
||||
timeout := 10 * time.Second
|
||||
|
Loading…
Reference in New Issue
Block a user