// Patch overview. These comment lines sit before the first "diff --git" header; every line below them is the original patch text.
//
// Scope: unified git diff over five Go files in internal/querycoordv2 (checkers/leader_checker.go,
// checkers/leader_checker_test.go, task/action.go, task/executor.go, task/task_test.go).
//
// checkers/leader_checker.go
//   - findNeedLoadedSegments: the two GetSealedSegment lookups (meta.CurrentTarget and meta.NextTarget,
//     keyed by s.CollectionID) become one lookup with meta.CurrentTargetFirst keyed by
//     leaderView.CollectionID; segments not found are skipped.
//     NOTE(review): the lookup order implied by CurrentTargetFirst is not visible in this patch - confirm in package meta.
//   - findNeedLoadedSegments: a grow action is now emitted only when the leader view has no entry for the
//     segment or its recorded version is lower than s.Version. The removed "leaderWithDirtyVersion" branch
//     (leader version higher than s.Version while c.nodeMgr.Get(version.GetNodeID()) is nil) no longer
//     produces a task here, and the test covering it (TestIgnoreBalancedSegment) is deleted.
//   - findNeedLoadedSegments passes s.Version to task.NewLeaderAction; findNeedRemovedSegments switches to
//     the same single CurrentTargetFirst lookup and passes 0 as the version of its reduce action.
//
// task/action.go
//   - LeaderAction gains a `version` field (its inline comment: "segment load ts, 0 means not set"),
//     NewLeaderAction gains a trailing `version` parameter, and a Version() accessor is added.
//
// task/executor.go
//   - setDistribution copies action.Version() into the Version field of the entry it builds.
//
// Tests
//   - leader_checker_test.go imports "time", drops the querypb import, uses time.Now().UnixMilli() as the
//     segment version in TestSyncLoadedSegments and asserts the grow action carries it; TestSyncRemovedSegments
//     asserts the reduce action's Version() is int64(0).
//   - task_test.go updates three NewLeaderAction call sites with a trailing 0 argument.
diff --git a/internal/querycoordv2/checkers/leader_checker.go b/internal/querycoordv2/checkers/leader_checker.go index 002cd2df49..f57866791e 100644 --- a/internal/querycoordv2/checkers/leader_checker.go +++ b/internal/querycoordv2/checkers/leader_checker.go @@ -121,24 +121,17 @@ func (c *LeaderChecker) findNeedLoadedSegments(ctx context.Context, replica int6 ret := make([]task.Task, 0) dist = utils.FindMaxVersionSegments(dist) for _, s := range dist { - version, ok := leaderView.Segments[s.GetID()] - currentTarget := c.target.GetSealedSegment(s.CollectionID, s.GetID(), meta.CurrentTarget) - existInCurrentTarget := currentTarget != nil - existInNextTarget := c.target.GetSealedSegment(s.CollectionID, s.GetID(), meta.NextTarget) != nil - - if !existInCurrentTarget && !existInNextTarget { + existInTarget := c.target.GetSealedSegment(leaderView.CollectionID, s.GetID(), meta.CurrentTargetFirst) != nil + if !existInTarget { continue } - leaderWithOldVersion := version.GetVersion() < s.Version - // leader has newer version, but the query node which loaded the newer version has been shutdown - leaderWithDirtyVersion := version.GetVersion() > s.Version && c.nodeMgr.Get(version.GetNodeID()) == nil - - if !ok || leaderWithOldVersion || leaderWithDirtyVersion { + version, ok := leaderView.Segments[s.GetID()] + if !ok || version.GetVersion() < s.Version { log.RatedDebug(10, "leader checker append a segment to set", zap.Int64("segmentID", s.GetID()), zap.Int64("nodeID", s.Node)) - action := task.NewLeaderAction(leaderView.ID, s.Node, task.ActionTypeGrow, s.GetInsertChannel(), s.GetID()) + action := task.NewLeaderAction(leaderView.ID, s.Node, task.ActionTypeGrow, s.GetInsertChannel(), s.GetID(), s.Version) t := task.NewLeaderTask( ctx, params.Params.QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond), @@ -173,15 +166,14 @@ func (c *LeaderChecker) findNeedRemovedSegments(ctx context.Context, replica int for sid, s := range leaderView.Segments { _, ok := distMap[sid] - 
existInCurrentTarget := c.target.GetSealedSegment(leaderView.CollectionID, sid, meta.CurrentTarget) != nil - existInNextTarget := c.target.GetSealedSegment(leaderView.CollectionID, sid, meta.NextTarget) != nil - if ok || existInCurrentTarget || existInNextTarget { + existInTarget := c.target.GetSealedSegment(leaderView.CollectionID, sid, meta.CurrentTargetFirst) != nil + if ok || existInTarget { continue } log.Debug("leader checker append a segment to remove", zap.Int64("segmentID", sid), zap.Int64("nodeID", s.NodeID)) - action := task.NewLeaderAction(leaderView.ID, s.NodeID, task.ActionTypeReduce, leaderView.Channel, sid) + action := task.NewLeaderAction(leaderView.ID, s.NodeID, task.ActionTypeReduce, leaderView.Channel, sid, 0) t := task.NewLeaderTask( ctx, paramtable.Get().QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond), diff --git a/internal/querycoordv2/checkers/leader_checker_test.go b/internal/querycoordv2/checkers/leader_checker_test.go index fca0bf03ec..d8c5794492 100644 --- a/internal/querycoordv2/checkers/leader_checker_test.go +++ b/internal/querycoordv2/checkers/leader_checker_test.go @@ -19,6 +19,7 @@ package checkers import ( "context" "testing" + "time" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" @@ -27,7 +28,6 @@ import ( etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "github.com/milvus-io/milvus/internal/metastore/kv/querycoord" "github.com/milvus-io/milvus/internal/proto/datapb" - "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/querycoordv2/meta" . 
"github.com/milvus-io/milvus/internal/querycoordv2/params" "github.com/milvus-io/milvus/internal/querycoordv2/session" @@ -107,7 +107,8 @@ func (suite *LeaderCheckerTestSuite) TestSyncLoadedSegments() { suite.Len(tasks, 0) observer.target.UpdateCollectionNextTarget(int64(1)) observer.target.UpdateCollectionCurrentTarget(1) - observer.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 1, 2, 1, "test-insert-channel")) + loadVersion := time.Now().UnixMilli() + observer.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 1, 2, loadVersion, "test-insert-channel")) observer.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel")) view := utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{}) view.TargetVersion = observer.target.GetCollectionTargetVersion(1, meta.CurrentTarget) @@ -120,6 +121,7 @@ func (suite *LeaderCheckerTestSuite) TestSyncLoadedSegments() { suite.Equal(tasks[0].Actions()[0].Type(), task.ActionTypeGrow) suite.Equal(tasks[0].Actions()[0].Node(), int64(1)) suite.Equal(tasks[0].Actions()[0].(*task.LeaderAction).SegmentID(), int64(1)) + suite.Equal(tasks[0].Actions()[0].(*task.LeaderAction).Version(), loadVersion) suite.Equal(tasks[0].Priority(), task.TaskPriorityHigh) } @@ -240,59 +242,6 @@ func (suite *LeaderCheckerTestSuite) TestIgnoreSyncLoadedSegments() { suite.Equal(tasks[0].Priority(), task.TaskPriorityHigh) } -func (suite *LeaderCheckerTestSuite) TestIgnoreBalancedSegment() { - observer := suite.checker - observer.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1)) - observer.meta.CollectionManager.PutPartition(utils.CreateTestPartition(1, 1)) - observer.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, []int64{1, 2})) - segments := []*datapb.SegmentInfo{ - { - ID: 1, - PartitionID: 1, - InsertChannel: "test-insert-channel", - }, - } - channels := []*datapb.VchannelInfo{ - { - CollectionID: 1, - ChannelName: 
"test-insert-channel", - }, - } - - suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return( - channels, segments, nil) - observer.target.UpdateCollectionNextTarget(int64(1)) - observer.target.UpdateCollectionCurrentTarget(1) - observer.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 1, 1, 1, "test-insert-channel")) - observer.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel")) - - // dist with older version and leader view with newer version - leaderView := utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{}) - leaderView.Segments[1] = &querypb.SegmentDist{ - NodeID: 2, - Version: 2, - } - leaderView.TargetVersion = observer.target.GetCollectionTargetVersion(1, meta.CurrentTarget) - observer.dist.LeaderViewManager.Update(2, leaderView) - - // test querynode-1 and querynode-2 exist - suite.nodeMgr.Add(session.NewNodeInfo(1, "localhost")) - suite.nodeMgr.Add(session.NewNodeInfo(2, "localhost")) - tasks := suite.checker.Check(context.TODO()) - suite.Len(tasks, 0) - - // test querynode-2 crash - suite.nodeMgr.Remove(2) - tasks = suite.checker.Check(context.TODO()) - suite.Len(tasks, 1) - suite.Equal(tasks[0].Source(), utils.LeaderChecker) - suite.Len(tasks[0].Actions(), 1) - suite.Equal(tasks[0].Actions()[0].Type(), task.ActionTypeGrow) - suite.Equal(tasks[0].Actions()[0].Node(), int64(1)) - suite.Equal(tasks[0].Actions()[0].(*task.LeaderAction).SegmentID(), int64(1)) - suite.Equal(tasks[0].Priority(), task.TaskPriorityHigh) -} - func (suite *LeaderCheckerTestSuite) TestSyncLoadedSegmentsWithReplicas() { observer := suite.checker observer.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 2)) @@ -369,6 +318,7 @@ func (suite *LeaderCheckerTestSuite) TestSyncRemovedSegments() { suite.Equal(tasks[0].Actions()[0].Type(), task.ActionTypeReduce) suite.Equal(tasks[0].Actions()[0].Node(), int64(1)) 
suite.Equal(tasks[0].Actions()[0].(*task.LeaderAction).SegmentID(), int64(3)) + suite.Equal(tasks[0].Actions()[0].(*task.LeaderAction).Version(), int64(0)) suite.Equal(tasks[0].Priority(), task.TaskPriorityHigh) } diff --git a/internal/querycoordv2/task/action.go b/internal/querycoordv2/task/action.go index 2e72fb3a8a..ee3062cfff 100644 --- a/internal/querycoordv2/task/action.go +++ b/internal/querycoordv2/task/action.go @@ -166,16 +166,18 @@ type LeaderAction struct { leaderID typeutil.UniqueID segmentID typeutil.UniqueID + version typeutil.UniqueID // segment load ts, 0 means not set rpcReturned atomic.Bool } -func NewLeaderAction(leaderID, workerID typeutil.UniqueID, typ ActionType, shard string, segmentID typeutil.UniqueID) *LeaderAction { +func NewLeaderAction(leaderID, workerID typeutil.UniqueID, typ ActionType, shard string, segmentID typeutil.UniqueID, version typeutil.UniqueID) *LeaderAction { action := &LeaderAction{ BaseAction: NewBaseAction(workerID, typ, shard), leaderID: leaderID, segmentID: segmentID, + version: version, } action.rpcReturned.Store(false) return action @@ -185,6 +187,10 @@ func (action *LeaderAction) SegmentID() typeutil.UniqueID { return action.segmentID } +func (action *LeaderAction) Version() typeutil.UniqueID { + return action.version +} + func (action *LeaderAction) IsFinished(distMgr *meta.DistributionManager) bool { views := distMgr.LeaderViewManager.GetLeaderView(action.leaderID) view := views[action.Shard()] diff --git a/internal/querycoordv2/task/executor.go b/internal/querycoordv2/task/executor.go index 9c83b326ef..8dddbfe1e3 100644 --- a/internal/querycoordv2/task/executor.go +++ b/internal/querycoordv2/task/executor.go @@ -471,6 +471,7 @@ func (ex *Executor) setDistribution(task *LeaderTask, step int) error { SegmentID: action.SegmentID(), NodeID: action.Node(), Info: loadInfo, + Version: action.Version(), }, }, IndexInfoList: indexInfo, diff --git a/internal/querycoordv2/task/task_test.go 
b/internal/querycoordv2/task/task_test.go index 103b402fb0..dc72855e2f 100644 --- a/internal/querycoordv2/task/task_test.go +++ b/internal/querycoordv2/task/task_test.go @@ -1290,7 +1290,7 @@ func (suite *TaskSuite) TestLeaderTaskSet() { suite.collection, suite.replica, targetNode, - NewLeaderAction(targetNode, targetNode, ActionTypeGrow, channel.GetChannelName(), segment), + NewLeaderAction(targetNode, targetNode, ActionTypeGrow, channel.GetChannelName(), segment, 0), ) tasks = append(tasks, task) err := suite.scheduler.Add(task) @@ -1370,7 +1370,7 @@ func (suite *TaskSuite) TestCreateTaskBehavior() { suite.ErrorIs(err, merr.ErrParameterInvalid) suite.Nil(segmentTask) - leaderAction := NewLeaderAction(1, 2, ActionTypeGrow, "fake-channel1", 100) + leaderAction := NewLeaderAction(1, 2, ActionTypeGrow, "fake-channel1", 100, 0) leaderTask := NewLeaderTask(context.TODO(), 5*time.Second, WrapIDSource(0), 0, 0, 1, leaderAction) suite.NotNil(leaderTask) } @@ -1555,7 +1555,7 @@ func (suite *TaskSuite) TestLeaderTaskRemove() { suite.collection, suite.replica, targetNode, - NewLeaderAction(targetNode, targetNode, ActionTypeReduce, channel.GetChannelName(), segment), + NewLeaderAction(targetNode, targetNode, ActionTypeReduce, channel.GetChannelName(), segment, 0), ) tasks = append(tasks, task) err := suite.scheduler.Add(task)