fix: Leader checker can't update segment's load version (#31040)

issue: #30890

When the leader checker finds that the leader view has an older load version of a
segment, it tries to correct the leader view. However, the sync action doesn't
specify the latest load version, so the update operation fails.

This PR fixes the leader checker being unable to update a segment's load version
and repeatedly generating the same task for the scheduler.

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
This commit is contained in:
wei liu 2024-03-08 11:57:01 +08:00 committed by GitHub
parent cc51ab9cd9
commit 22df5061c1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 24 additions and 75 deletions

View File

@ -121,24 +121,17 @@ func (c *LeaderChecker) findNeedLoadedSegments(ctx context.Context, replica int6
ret := make([]task.Task, 0)
dist = utils.FindMaxVersionSegments(dist)
for _, s := range dist {
version, ok := leaderView.Segments[s.GetID()]
currentTarget := c.target.GetSealedSegment(s.CollectionID, s.GetID(), meta.CurrentTarget)
existInCurrentTarget := currentTarget != nil
existInNextTarget := c.target.GetSealedSegment(s.CollectionID, s.GetID(), meta.NextTarget) != nil
if !existInCurrentTarget && !existInNextTarget {
existInTarget := c.target.GetSealedSegment(leaderView.CollectionID, s.GetID(), meta.CurrentTargetFirst) != nil
if !existInTarget {
continue
}
leaderWithOldVersion := version.GetVersion() < s.Version
// leader has newer version, but the query node which loaded the newer version has been shutdown
leaderWithDirtyVersion := version.GetVersion() > s.Version && c.nodeMgr.Get(version.GetNodeID()) == nil
if !ok || leaderWithOldVersion || leaderWithDirtyVersion {
version, ok := leaderView.Segments[s.GetID()]
if !ok || version.GetVersion() < s.Version {
log.RatedDebug(10, "leader checker append a segment to set",
zap.Int64("segmentID", s.GetID()),
zap.Int64("nodeID", s.Node))
action := task.NewLeaderAction(leaderView.ID, s.Node, task.ActionTypeGrow, s.GetInsertChannel(), s.GetID())
action := task.NewLeaderAction(leaderView.ID, s.Node, task.ActionTypeGrow, s.GetInsertChannel(), s.GetID(), s.Version)
t := task.NewLeaderTask(
ctx,
params.Params.QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond),
@ -173,15 +166,14 @@ func (c *LeaderChecker) findNeedRemovedSegments(ctx context.Context, replica int
for sid, s := range leaderView.Segments {
_, ok := distMap[sid]
existInCurrentTarget := c.target.GetSealedSegment(leaderView.CollectionID, sid, meta.CurrentTarget) != nil
existInNextTarget := c.target.GetSealedSegment(leaderView.CollectionID, sid, meta.NextTarget) != nil
if ok || existInCurrentTarget || existInNextTarget {
existInTarget := c.target.GetSealedSegment(leaderView.CollectionID, sid, meta.CurrentTargetFirst) != nil
if ok || existInTarget {
continue
}
log.Debug("leader checker append a segment to remove",
zap.Int64("segmentID", sid),
zap.Int64("nodeID", s.NodeID))
action := task.NewLeaderAction(leaderView.ID, s.NodeID, task.ActionTypeReduce, leaderView.Channel, sid)
action := task.NewLeaderAction(leaderView.ID, s.NodeID, task.ActionTypeReduce, leaderView.Channel, sid, 0)
t := task.NewLeaderTask(
ctx,
paramtable.Get().QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond),

View File

@ -19,6 +19,7 @@ package checkers
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
@ -27,7 +28,6 @@ import (
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/metastore/kv/querycoord"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
@ -107,7 +107,8 @@ func (suite *LeaderCheckerTestSuite) TestSyncLoadedSegments() {
suite.Len(tasks, 0)
observer.target.UpdateCollectionNextTarget(int64(1))
observer.target.UpdateCollectionCurrentTarget(1)
observer.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 1, 2, 1, "test-insert-channel"))
loadVersion := time.Now().UnixMilli()
observer.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 1, 2, loadVersion, "test-insert-channel"))
observer.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel"))
view := utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{})
view.TargetVersion = observer.target.GetCollectionTargetVersion(1, meta.CurrentTarget)
@ -120,6 +121,7 @@ func (suite *LeaderCheckerTestSuite) TestSyncLoadedSegments() {
suite.Equal(tasks[0].Actions()[0].Type(), task.ActionTypeGrow)
suite.Equal(tasks[0].Actions()[0].Node(), int64(1))
suite.Equal(tasks[0].Actions()[0].(*task.LeaderAction).SegmentID(), int64(1))
suite.Equal(tasks[0].Actions()[0].(*task.LeaderAction).Version(), loadVersion)
suite.Equal(tasks[0].Priority(), task.TaskPriorityHigh)
}
@ -240,59 +242,6 @@ func (suite *LeaderCheckerTestSuite) TestIgnoreSyncLoadedSegments() {
suite.Equal(tasks[0].Priority(), task.TaskPriorityHigh)
}
// TestIgnoreBalancedSegment checks how the leader checker reacts when the
// leader view records a newer version of a segment than the segment
// distribution does: no task is expected while both query nodes are
// registered, and a single high-priority grow task is expected once the node
// recorded in the leader view (node 2) has been removed from the node manager.
func (suite *LeaderCheckerTestSuite) TestIgnoreBalancedSegment() {
observer := suite.checker
observer.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1))
observer.meta.CollectionManager.PutPartition(utils.CreateTestPartition(1, 1))
observer.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, []int64{1, 2}))
// Recovery info returned by the mocked broker: one segment and one channel.
segments := []*datapb.SegmentInfo{
{
ID: 1,
PartitionID: 1,
InsertChannel: "test-insert-channel",
},
}
channels := []*datapb.VchannelInfo{
{
CollectionID: 1,
ChannelName: "test-insert-channel",
},
}
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
channels, segments, nil)
// Build the next target and then promote it to the current target.
observer.target.UpdateCollectionNextTarget(int64(1))
observer.target.UpdateCollectionCurrentTarget(1)
observer.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 1, 1, 1, "test-insert-channel"))
observer.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel"))
// dist with older version and leader view with newer version
leaderView := utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{})
leaderView.Segments[1] = &querypb.SegmentDist{
NodeID: 2,
Version: 2,
}
leaderView.TargetVersion = observer.target.GetCollectionTargetVersion(1, meta.CurrentTarget)
observer.dist.LeaderViewManager.Update(2, leaderView)
// test querynode-1 and querynode-2 exist
suite.nodeMgr.Add(session.NewNodeInfo(1, "localhost"))
suite.nodeMgr.Add(session.NewNodeInfo(2, "localhost"))
tasks := suite.checker.Check(context.TODO())
suite.Len(tasks, 0)
// test querynode-2 crash
suite.nodeMgr.Remove(2)
tasks = suite.checker.Check(context.TODO())
// Exactly one grow action for segment 1 on node 1 is expected.
suite.Len(tasks, 1)
suite.Equal(tasks[0].Source(), utils.LeaderChecker)
suite.Len(tasks[0].Actions(), 1)
suite.Equal(tasks[0].Actions()[0].Type(), task.ActionTypeGrow)
suite.Equal(tasks[0].Actions()[0].Node(), int64(1))
suite.Equal(tasks[0].Actions()[0].(*task.LeaderAction).SegmentID(), int64(1))
suite.Equal(tasks[0].Priority(), task.TaskPriorityHigh)
}
func (suite *LeaderCheckerTestSuite) TestSyncLoadedSegmentsWithReplicas() {
observer := suite.checker
observer.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 2))
@ -369,6 +318,7 @@ func (suite *LeaderCheckerTestSuite) TestSyncRemovedSegments() {
suite.Equal(tasks[0].Actions()[0].Type(), task.ActionTypeReduce)
suite.Equal(tasks[0].Actions()[0].Node(), int64(1))
suite.Equal(tasks[0].Actions()[0].(*task.LeaderAction).SegmentID(), int64(3))
suite.Equal(tasks[0].Actions()[0].(*task.LeaderAction).Version(), int64(0))
suite.Equal(tasks[0].Priority(), task.TaskPriorityHigh)
}

View File

@ -166,16 +166,18 @@ type LeaderAction struct {
leaderID typeutil.UniqueID
segmentID typeutil.UniqueID
version typeutil.UniqueID // segment load ts, 0 means not set
rpcReturned atomic.Bool
}
func NewLeaderAction(leaderID, workerID typeutil.UniqueID, typ ActionType, shard string, segmentID typeutil.UniqueID) *LeaderAction {
func NewLeaderAction(leaderID, workerID typeutil.UniqueID, typ ActionType, shard string, segmentID typeutil.UniqueID, version typeutil.UniqueID) *LeaderAction {
action := &LeaderAction{
BaseAction: NewBaseAction(workerID, typ, shard),
leaderID: leaderID,
segmentID: segmentID,
version: version,
}
action.rpcReturned.Store(false)
return action
@ -185,6 +187,10 @@ func (action *LeaderAction) SegmentID() typeutil.UniqueID {
return action.segmentID
}
// Version returns the segment load version (load ts) carried by this action.
// A value of 0 means the version was not set.
func (action *LeaderAction) Version() typeutil.UniqueID {
return action.version
}
func (action *LeaderAction) IsFinished(distMgr *meta.DistributionManager) bool {
views := distMgr.LeaderViewManager.GetLeaderView(action.leaderID)
view := views[action.Shard()]

View File

@ -471,6 +471,7 @@ func (ex *Executor) setDistribution(task *LeaderTask, step int) error {
SegmentID: action.SegmentID(),
NodeID: action.Node(),
Info: loadInfo,
Version: action.Version(),
},
},
IndexInfoList: indexInfo,

View File

@ -1290,7 +1290,7 @@ func (suite *TaskSuite) TestLeaderTaskSet() {
suite.collection,
suite.replica,
targetNode,
NewLeaderAction(targetNode, targetNode, ActionTypeGrow, channel.GetChannelName(), segment),
NewLeaderAction(targetNode, targetNode, ActionTypeGrow, channel.GetChannelName(), segment, 0),
)
tasks = append(tasks, task)
err := suite.scheduler.Add(task)
@ -1370,7 +1370,7 @@ func (suite *TaskSuite) TestCreateTaskBehavior() {
suite.ErrorIs(err, merr.ErrParameterInvalid)
suite.Nil(segmentTask)
leaderAction := NewLeaderAction(1, 2, ActionTypeGrow, "fake-channel1", 100)
leaderAction := NewLeaderAction(1, 2, ActionTypeGrow, "fake-channel1", 100, 0)
leaderTask := NewLeaderTask(context.TODO(), 5*time.Second, WrapIDSource(0), 0, 0, 1, leaderAction)
suite.NotNil(leaderTask)
}
@ -1555,7 +1555,7 @@ func (suite *TaskSuite) TestLeaderTaskRemove() {
suite.collection,
suite.replica,
targetNode,
NewLeaderAction(targetNode, targetNode, ActionTypeReduce, channel.GetChannelName(), segment),
NewLeaderAction(targetNode, targetNode, ActionTypeReduce, channel.GetChannelName(), segment, 0),
)
tasks = append(tasks, task)
err := suite.scheduler.Add(task)