mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-11-30 02:48:45 +08:00
fix: Leader checker can't remove segment from leader view (#30151)
issue: #30150 This PR fixes three problems: 1. The leader checker uses the wrong node ID when generating a release task, which causes the release task to finish immediately. 2. The release request generated by leader_checker doesn't set the `force` flag, so the operation to clean the leader view on the delegator will fail. Signed-off-by: Wei Liu <wei.liu@zilliz.com>
This commit is contained in:
parent
e3b8c3f60b
commit
f69f65ff68
@ -66,6 +66,13 @@ func (c *LeaderChecker) Description() string {
|
||||
return "LeaderChecker checks the difference of leader view between dist, and try to correct it"
|
||||
}
|
||||
|
||||
func (c *LeaderChecker) readyToCheck(collectionID int64) bool {
|
||||
metaExist := (c.meta.GetCollection(collectionID) != nil)
|
||||
targetExist := c.target.IsNextTargetExist(collectionID) || c.target.IsCurrentTargetExist(collectionID)
|
||||
|
||||
return metaExist && targetExist
|
||||
}
|
||||
|
||||
func (c *LeaderChecker) Check(ctx context.Context) []task.Task {
|
||||
if !c.IsActive() {
|
||||
return nil
|
||||
@ -75,6 +82,9 @@ func (c *LeaderChecker) Check(ctx context.Context) []task.Task {
|
||||
tasks := make([]task.Task, 0)
|
||||
|
||||
for _, collectionID := range collectionIDs {
|
||||
if !c.readyToCheck(collectionID) {
|
||||
continue
|
||||
}
|
||||
collection := c.meta.CollectionManager.GetCollection(collectionID)
|
||||
if collection == nil {
|
||||
log.Warn("collection released during check leader", zap.Int64("collection", collectionID))
|
||||
@ -180,7 +190,7 @@ func (c *LeaderChecker) findNeedRemovedSegments(ctx context.Context, replica int
|
||||
zap.Int64("segmentID", sid),
|
||||
zap.Int64("nodeID", s.NodeID))
|
||||
|
||||
action := task.NewSegmentActionWithScope(leaderView.ID, task.ActionTypeReduce, leaderView.Channel, sid, querypb.DataScope_Historical)
|
||||
action := task.NewSegmentActionWithScope(s.NodeID, task.ActionTypeReduce, leaderView.Channel, sid, querypb.DataScope_Historical)
|
||||
t, err := task.NewSegmentTask(
|
||||
ctx,
|
||||
paramtable.Get().QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond),
|
||||
|
@ -101,6 +101,10 @@ func (suite *LeaderCheckerTestSuite) TestSyncLoadedSegments() {
|
||||
}
|
||||
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
|
||||
channels, segments, nil)
|
||||
|
||||
// before target ready, should skip check collection
|
||||
tasks := suite.checker.Check(context.TODO())
|
||||
suite.Len(tasks, 0)
|
||||
observer.target.UpdateCollectionNextTarget(int64(1))
|
||||
observer.target.UpdateCollectionCurrentTarget(1)
|
||||
observer.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 1, 2, 1, "test-insert-channel"))
|
||||
@ -109,7 +113,7 @@ func (suite *LeaderCheckerTestSuite) TestSyncLoadedSegments() {
|
||||
view.TargetVersion = observer.target.GetCollectionTargetVersion(1, meta.CurrentTarget)
|
||||
observer.dist.LeaderViewManager.Update(2, view)
|
||||
|
||||
tasks := suite.checker.Check(context.TODO())
|
||||
tasks = suite.checker.Check(context.TODO())
|
||||
suite.Len(tasks, 1)
|
||||
suite.Equal(tasks[0].Source(), utils.LeaderChecker)
|
||||
suite.Len(tasks[0].Actions(), 1)
|
||||
@ -353,7 +357,7 @@ func (suite *LeaderCheckerTestSuite) TestSyncRemovedSegments() {
|
||||
observer.target.UpdateCollectionCurrentTarget(1)
|
||||
|
||||
observer.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel"))
|
||||
view := utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{3: 2}, map[int64]*meta.Segment{})
|
||||
view := utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{3: 1}, map[int64]*meta.Segment{})
|
||||
view.TargetVersion = observer.target.GetCollectionTargetVersion(1, meta.CurrentTarget)
|
||||
observer.dist.LeaderViewManager.Update(2, view)
|
||||
|
||||
@ -363,7 +367,7 @@ func (suite *LeaderCheckerTestSuite) TestSyncRemovedSegments() {
|
||||
suite.Equal(tasks[0].ReplicaID(), int64(1))
|
||||
suite.Len(tasks[0].Actions(), 1)
|
||||
suite.Equal(tasks[0].Actions()[0].Type(), task.ActionTypeReduce)
|
||||
suite.Equal(tasks[0].Actions()[0].Node(), int64(2))
|
||||
suite.Equal(tasks[0].Actions()[0].Node(), int64(1))
|
||||
suite.Equal(tasks[0].Actions()[0].(*task.SegmentAction).SegmentID(), int64(3))
|
||||
suite.Equal(tasks[0].Priority(), task.TaskPriorityHigh)
|
||||
}
|
||||
|
@ -286,19 +286,7 @@ func (ex *Executor) releaseSegment(task *SegmentTask, step int) {
|
||||
// to protect the version, which serves search/query
|
||||
req.NeedTransfer = true
|
||||
} else {
|
||||
var targetSegment *meta.Segment
|
||||
segments := ex.dist.SegmentDistManager.GetByNode(action.Node())
|
||||
for _, segment := range segments {
|
||||
if segment.GetID() == task.SegmentID() {
|
||||
targetSegment = segment
|
||||
break
|
||||
}
|
||||
}
|
||||
if targetSegment == nil {
|
||||
log.Info("segment to release not found in distribution")
|
||||
return
|
||||
}
|
||||
req.Shard = targetSegment.GetInsertChannel()
|
||||
req.Shard = task.shard
|
||||
|
||||
if ex.meta.CollectionManager.Exist(task.CollectionID()) {
|
||||
leader, ok := getShardLeader(ex.meta.ReplicaManager, ex.dist, task.CollectionID(), action.Node(), req.GetShard())
|
||||
|
Loading…
Reference in New Issue
Block a user