mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-02 03:48:37 +08:00
fix: Clean dirty segment/channel on querynode (#36202)
issue: #36201 after querynode has been remove from replica, all dirty segment/channel on it should be released. Signed-off-by: Wei Liu <wei.liu@zilliz.com>
This commit is contained in:
parent
b4c1603a57
commit
fb2a41a94c
@ -91,11 +91,24 @@ func (c *ChannelChecker) Check(ctx context.Context) []task.Task {
|
||||
}
|
||||
}
|
||||
|
||||
// clean channel which has been released
|
||||
channels := c.dist.ChannelDistManager.GetByFilter()
|
||||
released := utils.FilterReleased(channels, collectionIDs)
|
||||
releaseTasks := c.createChannelReduceTasks(ctx, released, meta.NilReplica)
|
||||
task.SetReason("collection released", releaseTasks...)
|
||||
tasks = append(tasks, releaseTasks...)
|
||||
|
||||
// clean node which has been move out from replica
|
||||
for _, nodeInfo := range c.nodeMgr.GetAll() {
|
||||
nodeID := nodeInfo.ID()
|
||||
replicas := c.meta.ReplicaManager.GetByNode(nodeID)
|
||||
channels := c.dist.ChannelDistManager.GetByFilter(meta.WithNodeID2Channel(nodeID))
|
||||
if len(replicas) == 0 && len(channels) != 0 {
|
||||
reduceTasks := c.createChannelReduceTasks(ctx, channels, meta.NilReplica)
|
||||
task.SetReason("dirty channel exists", reduceTasks...)
|
||||
tasks = append(tasks, reduceTasks...)
|
||||
}
|
||||
}
|
||||
return tasks
|
||||
}
|
||||
|
||||
|
@ -227,6 +227,57 @@ func (suite *ChannelCheckerTestSuite) TestRepeatedChannels() {
|
||||
suite.EqualValues("test-insert-channel", action.ChannelName())
|
||||
}
|
||||
|
||||
func (suite *ChannelCheckerTestSuite) TestReleaseDirtyChannels() {
|
||||
checker := suite.checker
|
||||
err := checker.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1))
|
||||
suite.meta.CollectionManager.PutPartition(utils.CreateTestPartition(1, 1))
|
||||
suite.NoError(err)
|
||||
err = checker.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, []int64{1}))
|
||||
suite.NoError(err)
|
||||
|
||||
segments := []*datapb.SegmentInfo{
|
||||
{
|
||||
ID: 1,
|
||||
InsertChannel: "test-insert-channel",
|
||||
},
|
||||
}
|
||||
|
||||
channels := []*datapb.VchannelInfo{
|
||||
{
|
||||
CollectionID: 1,
|
||||
ChannelName: "test-insert-channel",
|
||||
},
|
||||
}
|
||||
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
|
||||
NodeID: 1,
|
||||
Address: "localhost",
|
||||
Hostname: "localhost",
|
||||
}))
|
||||
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
|
||||
NodeID: 2,
|
||||
Address: "localhost",
|
||||
Hostname: "localhost",
|
||||
}))
|
||||
|
||||
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
|
||||
channels, segments, nil)
|
||||
checker.targetMgr.UpdateCollectionNextTarget(int64(1))
|
||||
checker.dist.ChannelDistManager.Update(1, utils.CreateTestChannel(1, 1, 2, "test-insert-channel"))
|
||||
checker.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 2, "test-insert-channel"))
|
||||
checker.dist.LeaderViewManager.Update(1, &meta.LeaderView{ID: 1, Channel: "test-insert-channel"})
|
||||
checker.dist.LeaderViewManager.Update(2, &meta.LeaderView{ID: 2, Channel: "test-insert-channel"})
|
||||
|
||||
tasks := checker.Check(context.TODO())
|
||||
suite.Len(tasks, 1)
|
||||
suite.EqualValues(-1, tasks[0].ReplicaID())
|
||||
suite.Len(tasks[0].Actions(), 1)
|
||||
suite.IsType((*task.ChannelAction)(nil), tasks[0].Actions()[0])
|
||||
action := tasks[0].Actions()[0].(*task.ChannelAction)
|
||||
suite.Equal(task.ActionTypeReduce, action.Type())
|
||||
suite.EqualValues(int64(2), action.Node())
|
||||
suite.EqualValues("test-insert-channel", action.ChannelName())
|
||||
}
|
||||
|
||||
func TestChannelCheckerSuite(t *testing.T) {
|
||||
suite.Run(t, new(ChannelCheckerTestSuite))
|
||||
}
|
||||
|
@ -104,6 +104,19 @@ func (c *SegmentChecker) Check(ctx context.Context) []task.Task {
|
||||
task.SetPriority(task.TaskPriorityNormal, reduceTasks...)
|
||||
results = append(results, reduceTasks...)
|
||||
|
||||
// clean node which has been move out from replica
|
||||
for _, nodeInfo := range c.nodeMgr.GetAll() {
|
||||
nodeID := nodeInfo.ID()
|
||||
replicas := c.meta.ReplicaManager.GetByNode(nodeID)
|
||||
segments := c.dist.SegmentDistManager.GetByFilter(meta.WithNodeID(nodeID))
|
||||
if len(replicas) == 0 && len(segments) != 0 {
|
||||
reduceTasks := c.createSegmentReduceTasks(ctx, segments, meta.NilReplica, querypb.DataScope_Historical)
|
||||
task.SetReason("dirty segment exists", reduceTasks...)
|
||||
task.SetPriority(task.TaskPriorityNormal, reduceTasks...)
|
||||
results = append(results, reduceTasks...)
|
||||
}
|
||||
}
|
||||
|
||||
return results
|
||||
}
|
||||
|
||||
|
@ -456,6 +456,58 @@ func (suite *SegmentCheckerTestSuite) TestReleaseRepeatedSegments() {
|
||||
suite.Len(tasks, 0)
|
||||
}
|
||||
|
||||
func (suite *SegmentCheckerTestSuite) TestReleaseDirtySegments() {
|
||||
checker := suite.checker
|
||||
// set meta
|
||||
checker.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1))
|
||||
checker.meta.CollectionManager.PutPartition(utils.CreateTestPartition(1, 1))
|
||||
checker.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, []int64{1}))
|
||||
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
|
||||
NodeID: 1,
|
||||
Address: "localhost",
|
||||
Hostname: "localhost",
|
||||
}))
|
||||
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
|
||||
NodeID: 2,
|
||||
Address: "localhost",
|
||||
Hostname: "localhost",
|
||||
}))
|
||||
|
||||
// set target
|
||||
segments := []*datapb.SegmentInfo{
|
||||
{
|
||||
ID: 1,
|
||||
PartitionID: 1,
|
||||
InsertChannel: "test-insert-channel",
|
||||
},
|
||||
}
|
||||
channels := []*datapb.VchannelInfo{
|
||||
{
|
||||
CollectionID: 1,
|
||||
ChannelName: "test-insert-channel",
|
||||
},
|
||||
}
|
||||
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
|
||||
channels, segments, nil)
|
||||
checker.targetMgr.UpdateCollectionNextTarget(int64(1))
|
||||
|
||||
// set dist
|
||||
checker.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel"))
|
||||
checker.dist.LeaderViewManager.Update(2, utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{1: 2}, map[int64]*meta.Segment{}))
|
||||
checker.dist.SegmentDistManager.Update(2, utils.CreateTestSegment(1, 1, 1, 1, 1, "test-insert-channel"))
|
||||
|
||||
tasks := checker.Check(context.TODO())
|
||||
suite.Len(tasks, 1)
|
||||
suite.Len(tasks[0].Actions(), 1)
|
||||
action, ok := tasks[0].Actions()[0].(*task.SegmentAction)
|
||||
suite.True(ok)
|
||||
suite.EqualValues(-1, tasks[0].ReplicaID())
|
||||
suite.Equal(task.ActionTypeReduce, action.Type())
|
||||
suite.EqualValues(1, action.SegmentID())
|
||||
suite.EqualValues(2, action.Node())
|
||||
suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal)
|
||||
}
|
||||
|
||||
func (suite *SegmentCheckerTestSuite) TestSkipReleaseSealedSegments() {
|
||||
checker := suite.checker
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user