diff --git a/internal/querycoordv2/checkers/channel_checker.go b/internal/querycoordv2/checkers/channel_checker.go index f3e767f76b..cef0e4cd61 100644 --- a/internal/querycoordv2/checkers/channel_checker.go +++ b/internal/querycoordv2/checkers/channel_checker.go @@ -91,11 +91,24 @@ func (c *ChannelChecker) Check(ctx context.Context) []task.Task { } } + // clean channel which has been released channels := c.dist.ChannelDistManager.GetByFilter() released := utils.FilterReleased(channels, collectionIDs) releaseTasks := c.createChannelReduceTasks(ctx, released, meta.NilReplica) task.SetReason("collection released", releaseTasks...) tasks = append(tasks, releaseTasks...) + + // clean channels on nodes which have been moved out of all replicas + for _, nodeInfo := range c.nodeMgr.GetAll() { + nodeID := nodeInfo.ID() + replicas := c.meta.ReplicaManager.GetByNode(nodeID) + channels := c.dist.ChannelDistManager.GetByFilter(meta.WithNodeID2Channel(nodeID)) + if len(replicas) == 0 && len(channels) != 0 { + reduceTasks := c.createChannelReduceTasks(ctx, channels, meta.NilReplica) + task.SetReason("dirty channel exists", reduceTasks...) + tasks = append(tasks, reduceTasks...) 
+ } + } return tasks } diff --git a/internal/querycoordv2/checkers/channel_checker_test.go b/internal/querycoordv2/checkers/channel_checker_test.go index 58b5a7a75f..3fe1d50e31 100644 --- a/internal/querycoordv2/checkers/channel_checker_test.go +++ b/internal/querycoordv2/checkers/channel_checker_test.go @@ -227,6 +227,57 @@ func (suite *ChannelCheckerTestSuite) TestRepeatedChannels() { suite.EqualValues("test-insert-channel", action.ChannelName()) } +func (suite *ChannelCheckerTestSuite) TestReleaseDirtyChannels() { + checker := suite.checker + err := checker.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1)) + suite.meta.CollectionManager.PutPartition(utils.CreateTestPartition(1, 1)) + suite.NoError(err) + err = checker.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, []int64{1})) + suite.NoError(err) + + segments := []*datapb.SegmentInfo{ + { + ID: 1, + InsertChannel: "test-insert-channel", + }, + } + + channels := []*datapb.VchannelInfo{ + { + CollectionID: 1, + ChannelName: "test-insert-channel", + }, + } + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 1, + Address: "localhost", + Hostname: "localhost", + })) + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 2, + Address: "localhost", + Hostname: "localhost", + })) + + suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return( + channels, segments, nil) + checker.targetMgr.UpdateCollectionNextTarget(int64(1)) + checker.dist.ChannelDistManager.Update(1, utils.CreateTestChannel(1, 1, 2, "test-insert-channel")) + checker.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 2, "test-insert-channel")) + checker.dist.LeaderViewManager.Update(1, &meta.LeaderView{ID: 1, Channel: "test-insert-channel"}) + checker.dist.LeaderViewManager.Update(2, &meta.LeaderView{ID: 2, Channel: "test-insert-channel"}) + + tasks := checker.Check(context.TODO()) + suite.Len(tasks, 1) + suite.EqualValues(-1, tasks[0].ReplicaID()) + 
suite.Len(tasks[0].Actions(), 1) + suite.IsType((*task.ChannelAction)(nil), tasks[0].Actions()[0]) + action := tasks[0].Actions()[0].(*task.ChannelAction) + suite.Equal(task.ActionTypeReduce, action.Type()) + suite.EqualValues(int64(2), action.Node()) + suite.EqualValues("test-insert-channel", action.ChannelName()) +} + func TestChannelCheckerSuite(t *testing.T) { suite.Run(t, new(ChannelCheckerTestSuite)) } diff --git a/internal/querycoordv2/checkers/segment_checker.go b/internal/querycoordv2/checkers/segment_checker.go index 1fa04b3eaa..8c0389950a 100644 --- a/internal/querycoordv2/checkers/segment_checker.go +++ b/internal/querycoordv2/checkers/segment_checker.go @@ -104,6 +104,19 @@ func (c *SegmentChecker) Check(ctx context.Context) []task.Task { task.SetPriority(task.TaskPriorityNormal, reduceTasks...) results = append(results, reduceTasks...) + // clean segments on nodes which have been moved out of all replicas + for _, nodeInfo := range c.nodeMgr.GetAll() { + nodeID := nodeInfo.ID() + replicas := c.meta.ReplicaManager.GetByNode(nodeID) + segments := c.dist.SegmentDistManager.GetByFilter(meta.WithNodeID(nodeID)) + if len(replicas) == 0 && len(segments) != 0 { + reduceTasks := c.createSegmentReduceTasks(ctx, segments, meta.NilReplica, querypb.DataScope_Historical) + task.SetReason("dirty segment exists", reduceTasks...) + task.SetPriority(task.TaskPriorityNormal, reduceTasks...) + results = append(results, reduceTasks...) 
+ } + } + return results } diff --git a/internal/querycoordv2/checkers/segment_checker_test.go b/internal/querycoordv2/checkers/segment_checker_test.go index 7418889307..c87b7acd42 100644 --- a/internal/querycoordv2/checkers/segment_checker_test.go +++ b/internal/querycoordv2/checkers/segment_checker_test.go @@ -456,6 +456,58 @@ func (suite *SegmentCheckerTestSuite) TestReleaseRepeatedSegments() { suite.Len(tasks, 0) } +func (suite *SegmentCheckerTestSuite) TestReleaseDirtySegments() { + checker := suite.checker + // set meta + checker.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1)) + checker.meta.CollectionManager.PutPartition(utils.CreateTestPartition(1, 1)) + checker.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, []int64{1})) + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 1, + Address: "localhost", + Hostname: "localhost", + })) + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 2, + Address: "localhost", + Hostname: "localhost", + })) + + // set target + segments := []*datapb.SegmentInfo{ + { + ID: 1, + PartitionID: 1, + InsertChannel: "test-insert-channel", + }, + } + channels := []*datapb.VchannelInfo{ + { + CollectionID: 1, + ChannelName: "test-insert-channel", + }, + } + suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return( + channels, segments, nil) + checker.targetMgr.UpdateCollectionNextTarget(int64(1)) + + // set dist + checker.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel")) + checker.dist.LeaderViewManager.Update(2, utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{1: 2}, map[int64]*meta.Segment{})) + checker.dist.SegmentDistManager.Update(2, utils.CreateTestSegment(1, 1, 1, 1, 1, "test-insert-channel")) + + tasks := checker.Check(context.TODO()) + suite.Len(tasks, 1) + suite.Len(tasks[0].Actions(), 1) + action, ok := tasks[0].Actions()[0].(*task.SegmentAction) + suite.True(ok) + 
suite.EqualValues(-1, tasks[0].ReplicaID()) + suite.Equal(task.ActionTypeReduce, action.Type()) + suite.EqualValues(1, action.SegmentID()) + suite.EqualValues(2, action.Node()) + suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal) +} + func (suite *SegmentCheckerTestSuite) TestSkipReleaseSealedSegments() { checker := suite.checker