diff --git a/internal/querycoordv2/meta/segment_dist_manager.go b/internal/querycoordv2/meta/segment_dist_manager.go index 9b717d1c01..233f568a37 100644 --- a/internal/querycoordv2/meta/segment_dist_manager.go +++ b/internal/querycoordv2/meta/segment_dist_manager.go @@ -127,6 +127,25 @@ func (m *SegmentDistManager) GetByShard(shard string) []*Segment { return ret } +// GetByShard returns all segments of the given collection. +func (m *SegmentDistManager) GetByShardWithReplica(shard string, replica *Replica) []*Segment { + m.rwmutex.RLock() + defer m.rwmutex.RUnlock() + + ret := make([]*Segment, 0) + for nodeID, segments := range m.segments { + if !replica.Nodes.Contain(nodeID) { + continue + } + for _, segment := range segments { + if segment.GetInsertChannel() == shard { + ret = append(ret, segment) + } + } + } + return ret +} + // GetByCollectionAndNode returns all segments of the given collection and node. func (m *SegmentDistManager) GetByCollectionAndNode(collectionID, nodeID UniqueID) []*Segment { m.rwmutex.RLock() diff --git a/internal/querycoordv2/observers/leader_observer.go b/internal/querycoordv2/observers/leader_observer.go index cabd032165..cd6cc62373 100644 --- a/internal/querycoordv2/observers/leader_observer.go +++ b/internal/querycoordv2/observers/leader_observer.go @@ -71,7 +71,7 @@ func (o *LeaderObserver) observeCollection(ctx context.Context, collection int64 if leaderView == nil { continue } - dists := o.dist.SegmentDistManager.GetByShard(ch) + dists := o.dist.SegmentDistManager.GetByShardWithReplica(ch, replica) needLoaded, needRemoved := o.findNeedLoadedSegments(leaderView, dists), o.findNeedRemovedSegments(leaderView, dists) o.sync(ctx, leaderView, append(needLoaded, needRemoved...)) diff --git a/internal/querycoordv2/observers/leader_observer_test.go b/internal/querycoordv2/observers/leader_observer_test.go index ce98aa65d7..a3bb5e138d 100644 --- a/internal/querycoordv2/observers/leader_observer_test.go +++ b/internal/querycoordv2/observers/leader_observer_test.go @@ -92,6 +92,48 @@ func (suite *LeaderObserverTestSuite) TestSyncLoadedSegments() { ) } +func (suite *LeaderObserverTestSuite) TestSyncLoadedSegmentsWithReplicas() { + observer := suite.observer + observer.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 2)) + observer.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, []int64{1, 2})) + observer.meta.ReplicaManager.Put(utils.CreateTestReplica(2, 1, []int64{3, 4})) + observer.target.AddSegment(utils.CreateTestSegmentInfo(1, 1, 1, "test-insert-channel")) + observer.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 1, 1, 1, "test-insert-channel")) + observer.dist.SegmentDistManager.Update(4, utils.CreateTestSegment(1, 1, 1, 4, 2, "test-insert-channel")) + observer.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel")) + observer.dist.LeaderViewManager.Update(2, utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, []int64{})) + observer.dist.LeaderViewManager.Update(4, utils.CreateTestLeaderView(4, 1, "test-insert-channel", map[int64]int64{1: 4}, []int64{})) + expectReq := &querypb.SyncDistributionRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_SyncDistribution, + }, + CollectionID: 1, + Channel: "test-insert-channel", + Actions: []*querypb.SyncAction{ + { + Type: querypb.SyncType_Set, + PartitionID: 1, + SegmentID: 1, + NodeID: 1, + }, + }, + } + called := atomic.NewBool(false) + suite.mockCluster.EXPECT().SyncDistribution(context.TODO(), int64(2), expectReq).Once(). + Run(func(args mock.Arguments) { called.Store(true) }). + Return(&commonpb.Status{}, nil) + + observer.Start(context.TODO()) + + suite.Eventually( + func() bool { + return called.Load() + }, + 10*time.Second, + 500*time.Millisecond, + ) +} + func (suite *LeaderObserverTestSuite) TestSyncRemovedSegments() { observer := suite.observer observer.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1)) diff --git a/internal/querycoordv2/task/executor.go b/internal/querycoordv2/task/executor.go index 47e41db31e..f39a6f91fd 100644 --- a/internal/querycoordv2/task/executor.go +++ b/internal/querycoordv2/task/executor.go @@ -130,11 +130,17 @@ func (ex *Executor) processMergeTask(mergeTask *LoadSegmentsTask) { } }() + taskIDs := make([]int64, 0, len(mergeTask.tasks)) + segments := make([]int64, 0, len(mergeTask.tasks)) + for _, task := range mergeTask.tasks { + taskIDs = append(taskIDs, task.ID()) + segments = append(segments, task.SegmentID()) + } log := log.With( - zap.Int64("taskID", task.ID()), + zap.Int64s("taskIDs", taskIDs), zap.Int64("collectionID", task.CollectionID()), - zap.Int64("segmentID", task.segmentID), - zap.Int64("node", action.Node()), + zap.Int64s("segmentIDs", segments), + zap.Int64("nodeID", action.Node()), zap.Int64("source", task.SourceID()), ) @@ -148,6 +154,7 @@ func (ex *Executor) processMergeTask(mergeTask *LoadSegmentsTask) { return } + log.Info("load segments...") ctx, cancel := context.WithTimeout(task.Context(), actionTimeout) status, err := ex.cluster.LoadSegments(ctx, leader, mergeTask.req) cancel() @@ -159,6 +166,7 @@ func (ex *Executor) processMergeTask(mergeTask *LoadSegmentsTask) { log.Warn("failed to load segment", zap.String("reason", status.GetReason())) return } + log.Info("load segments done") } func (ex *Executor) removeTask(task Task, step int) { diff --git a/internal/querycoordv2/task/scheduler.go b/internal/querycoordv2/task/scheduler.go index 3edc63d2fb..9e0f588c50 100644 --- a/internal/querycoordv2/task/scheduler.go +++ b/internal/querycoordv2/task/scheduler.go @@ -261,7 +261,7 @@ func (scheduler *taskScheduler) preAdd(task Task) error { func (scheduler *taskScheduler) promote(task Task) error { log := log.With( zap.Int64("collection", task.CollectionID()), - zap.Int64("task", task.ID()), + zap.Int64("taskID", task.ID()), zap.Int64("source", task.SourceID()), ) err := scheduler.prePromote(task) @@ -405,26 +405,18 @@ func (scheduler *taskScheduler) schedule(node int64) { scheduler.tryPromoteAll() log.Debug("process tasks related to node", - zap.Int("processing-task-num", scheduler.processQueue.Len()), - zap.Int("waiting-task-num", scheduler.waitQueue.Len()), - zap.Int("segment-task-num", len(scheduler.segmentTasks)), - zap.Int("channel-task-num", len(scheduler.channelTasks)), + zap.Int("processingTaskNum", scheduler.processQueue.Len()), + zap.Int("waitingTaskNum", scheduler.waitQueue.Len()), + zap.Int("segmentTaskNum", len(scheduler.segmentTasks)), + zap.Int("channelTaskNum", len(scheduler.channelTasks)), ) // Process tasks toRemove := make([]Task, 0) scheduler.processQueue.Range(func(task Task) bool { - log.Debug("check task related", - zap.Int64("task", task.ID())) if scheduler.isRelated(task, node) { scheduler.process(task) - } else { - log.Debug("task not related, skip it", - zap.Int64("task", task.ID()), - zap.Int64("taskActionNode", task.Actions()[0].Node()), - ) } - if task.Status() != TaskStatusStarted { toRemove = append(toRemove, task) } @@ -440,10 +432,10 @@ func (scheduler *taskScheduler) schedule(node int64) { zap.Int("toRemoveNum", len(toRemove))) log.Debug("process tasks related to node done", - zap.Int("processing-task-num", scheduler.processQueue.Len()), - zap.Int("waiting-task-num", scheduler.waitQueue.Len()), - zap.Int("segment-task-num", len(scheduler.segmentTasks)), - zap.Int("channel-task-num", len(scheduler.channelTasks)), + zap.Int("processingTaskNum", scheduler.processQueue.Len()), + zap.Int("waitingTaskNum", scheduler.waitQueue.Len()), + zap.Int("segmentTaskNum", len(scheduler.segmentTasks)), + zap.Int("channelTaskNum", len(scheduler.channelTasks)), ) } @@ -477,7 +469,7 @@ func (scheduler *taskScheduler) isRelated(task Task, node int64) bool { // return true if the task is started and succeeds to commit the current action func (scheduler *taskScheduler) process(task Task) bool { log := log.With( - zap.Int64("task", task.ID()), + zap.Int64("taskID", task.ID()), zap.Int32("type", GetTaskType(task)), zap.Int64("source", task.SourceID()), ) @@ -531,7 +523,8 @@ func (scheduler *taskScheduler) RemoveByNode(node int64) { func (scheduler *taskScheduler) remove(task Task) { log := log.With( - zap.Int64("task", task.ID()), + zap.Int64("taskID", task.ID()), + zap.Int32("taskStatus", task.Status()), ) task.Cancel() scheduler.tasks.Remove(task.ID()) @@ -555,7 +548,7 @@ func (scheduler *taskScheduler) remove(task Task) { func (scheduler *taskScheduler) checkCanceled(task Task) bool { log := log.With( - zap.Int64("task", task.ID()), + zap.Int64("taskID", task.ID()), zap.Int64("source", task.SourceID()), ) @@ -571,7 +564,7 @@ func (scheduler *taskScheduler) checkCanceled(task Task) bool { func (scheduler *taskScheduler) checkStale(task Task) bool { log := log.With( - zap.Int64("task", task.ID()), + zap.Int64("taskID", task.ID()), zap.Int64("source", task.SourceID()), ) @@ -606,7 +599,7 @@ func (scheduler *taskScheduler) checkStale(task Task) bool { func (scheduler *taskScheduler) checkSegmentTaskStale(task *SegmentTask) bool { log := log.With( - zap.Int64("task", task.ID()), + zap.Int64("taskID", task.ID()), zap.Int64("source", task.SourceID()), ) @@ -652,7 +645,7 @@ func (scheduler *taskScheduler) checkSegmentTaskStale(task *SegmentTask) bool { func (scheduler *taskScheduler) checkChannelTaskStale(task *ChannelTask) bool { log := log.With( - zap.Int64("task", task.ID()), + zap.Int64("taskID", task.ID()), zap.Int64("source", task.SourceID()), ) diff --git a/internal/querycoordv2/utils/test.go b/internal/querycoordv2/utils/test.go index f4b52aa0c2..2ffe9922e1 100644 --- a/internal/querycoordv2/utils/test.go +++ b/internal/querycoordv2/utils/test.go @@ -43,7 +43,7 @@ func CreateTestCollection(collection int64, replica int32) *meta.Collection { return &meta.Collection{ CollectionLoadInfo: &querypb.CollectionLoadInfo{ CollectionID: collection, - ReplicaNumber: 3, + ReplicaNumber: replica, }, } }