Fix leader observer confused with replicas (#19392)

With 2 replicas, the same segment could be loaded on 2 nodes belonging to
different replicas, and the leader observer would sync the latest copy (by
version) into the other replica's leader view. Fix this by scoping the
observer's segment-distribution lookup to the leader's own replica.

Signed-off-by: yah01 <yang.cen@zilliz.com>

Authored by yah01 on 2022-09-23 15:16:51 +08:00; committed by GitHub.
Commit 2cfacbba8d (parent 928a213e31).
6 changed files with 90 additions and 28 deletions


@@ -127,6 +127,25 @@ func (m *SegmentDistManager) GetByShard(shard string) []*Segment {
 	return ret
 }
 
+// GetByShardWithReplica returns all segments of the given shard on the nodes of the given replica.
+func (m *SegmentDistManager) GetByShardWithReplica(shard string, replica *Replica) []*Segment {
+	m.rwmutex.RLock()
+	defer m.rwmutex.RUnlock()
+
+	ret := make([]*Segment, 0)
+	for nodeID, segments := range m.segments {
+		if !replica.Nodes.Contain(nodeID) {
+			continue
+		}
+		for _, segment := range segments {
+			if segment.GetInsertChannel() == shard {
+				ret = append(ret, segment)
+			}
+		}
+	}
+	return ret
+}
+
 // GetByCollectionAndNode returns all segments of the given collection and node.
 func (m *SegmentDistManager) GetByCollectionAndNode(collectionID, nodeID UniqueID) []*Segment {
 	m.rwmutex.RLock()
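
To make the new lookup's intent concrete, here is a minimal, self-contained Go sketch (simplified stand-in types, not the actual Milvus code): without replica-scoped filtering, a newer copy of the same segment in another replica looks like a sync candidate for this replica's leader.

package main

import "fmt"

// Simplified stand-ins for the dist-manager types; illustration only.
type segment struct {
	ID      int64
	Node    int64
	Version int64
	Channel string
}

// Replica 1 owns nodes {1, 2}; replica 2 owns nodes {3, 4},
// mirroring the new test below.
var replicaNodes = map[int64]map[int64]bool{
	1: {1: true, 2: true},
	2: {3: true, 4: true},
}

// getByShardWithReplica keeps only the copies on nodes of the given replica,
// which is the filter GetByShardWithReplica adds above.
func getByShardWithReplica(dist []segment, shard string, replica int64) []segment {
	ret := make([]segment, 0)
	for _, seg := range dist {
		if replicaNodes[replica][seg.Node] && seg.Channel == shard {
			ret = append(ret, seg)
		}
	}
	return ret
}

func main() {
	// The same segment is loaded twice: version 1 in replica 1 (node 1),
	// version 2 in replica 2 (node 4).
	dist := []segment{
		{ID: 1, Node: 1, Version: 1, Channel: "test-insert-channel"},
		{ID: 1, Node: 4, Version: 2, Channel: "test-insert-channel"},
	}

	// Old behavior (GetByShard): both copies are visible, so the newer
	// version-2 copy from replica 2 wins and is synced into replica 1.
	// New behavior: replica 1's leader only ever sees its own copy.
	for _, seg := range getByShardWithReplica(dist, "test-insert-channel", 1) {
		fmt.Printf("replica 1 leader syncs segment %d from node %d (v%d)\n",
			seg.ID, seg.Node, seg.Version)
	}
}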


@@ -71,7 +71,7 @@ func (o *LeaderObserver) observeCollection(ctx context.Context, collection int64
 		if leaderView == nil {
 			continue
 		}
-		dists := o.dist.SegmentDistManager.GetByShard(ch)
+		dists := o.dist.SegmentDistManager.GetByShardWithReplica(ch, replica)
 		needLoaded, needRemoved := o.findNeedLoadedSegments(leaderView, dists),
 			o.findNeedRemovedSegments(leaderView, dists)
 		o.sync(ctx, leaderView, append(needLoaded, needRemoved...))


@@ -92,6 +92,48 @@ func (suite *LeaderObserverTestSuite) TestSyncLoadedSegments() {
 	)
 }
 
+func (suite *LeaderObserverTestSuite) TestSyncLoadedSegmentsWithReplicas() {
+	observer := suite.observer
+	observer.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 2))
+	observer.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, []int64{1, 2}))
+	observer.meta.ReplicaManager.Put(utils.CreateTestReplica(2, 1, []int64{3, 4}))
+	observer.target.AddSegment(utils.CreateTestSegmentInfo(1, 1, 1, "test-insert-channel"))
+	observer.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 1, 1, 1, "test-insert-channel"))
+	observer.dist.SegmentDistManager.Update(4, utils.CreateTestSegment(1, 1, 1, 4, 2, "test-insert-channel"))
+	observer.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel"))
+	observer.dist.LeaderViewManager.Update(2, utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, []int64{}))
+	observer.dist.LeaderViewManager.Update(4, utils.CreateTestLeaderView(4, 1, "test-insert-channel", map[int64]int64{1: 4}, []int64{}))
+
+	expectReq := &querypb.SyncDistributionRequest{
+		Base: &commonpb.MsgBase{
+			MsgType: commonpb.MsgType_SyncDistribution,
+		},
+		CollectionID: 1,
+		Channel:      "test-insert-channel",
+		Actions: []*querypb.SyncAction{
+			{
+				Type:        querypb.SyncType_Set,
+				PartitionID: 1,
+				SegmentID:   1,
+				NodeID:      1,
+			},
+		},
+	}
+
+	called := atomic.NewBool(false)
+	suite.mockCluster.EXPECT().SyncDistribution(context.TODO(), int64(2), expectReq).Once().
+		Run(func(args mock.Arguments) { called.Store(true) }).
+		Return(&commonpb.Status{}, nil)
+	observer.Start(context.TODO())
+	suite.Eventually(
+		func() bool {
+			return called.Load()
+		},
+		10*time.Second,
+		500*time.Millisecond,
+	)
+}
+
 func (suite *LeaderObserverTestSuite) TestSyncRemovedSegments() {
 	observer := suite.observer
 	observer.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1))
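
The positional arguments to these test helpers are dense, so here is a hedged reading of the scenario above, inferred from the expected request rather than from the helpers' signatures, which this diff does not show:

// Replica 1 = nodes {1, 2}; replica 2 = nodes {3, 4}.
// Segment 1 is loaded on node 1 (replica 1, version 1)
// and on node 4 (replica 2, version 2).
// The shard leader on node 2 (replica 1) starts with an empty view,
// while the leader on node 4 (replica 2) already serves segment 1.
//
// The observer must therefore send node 2 exactly one action:
// Set segment 1 from node 1. Node 4's newer copy must be ignored,
// because node 4 belongs to the other replica.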


@@ -130,11 +130,17 @@ func (ex *Executor) processMergeTask(mergeTask *LoadSegmentsTask) {
 		}
 	}()
 
 	taskIDs := make([]int64, 0, len(mergeTask.tasks))
+	segments := make([]int64, 0, len(mergeTask.tasks))
 	for _, task := range mergeTask.tasks {
 		taskIDs = append(taskIDs, task.ID())
+		segments = append(segments, task.SegmentID())
 	}
 	log := log.With(
 		zap.Int64("taskID", task.ID()),
+		zap.Int64s("taskIDs", taskIDs),
 		zap.Int64("collectionID", task.CollectionID()),
-		zap.Int64("segmentID", task.segmentID),
-		zap.Int64("node", action.Node()),
+		zap.Int64s("segmentIDs", segments),
+		zap.Int64("nodeID", action.Node()),
 		zap.Int64("source", task.SourceID()),
 	)
@@ -148,6 +154,7 @@ func (ex *Executor) processMergeTask(mergeTask *LoadSegmentsTask) {
 		return
 	}
 
+	log.Info("load segments...")
 	ctx, cancel := context.WithTimeout(task.Context(), actionTimeout)
 	status, err := ex.cluster.LoadSegments(ctx, leader, mergeTask.req)
 	cancel()
@@ -159,6 +166,7 @@ func (ex *Executor) processMergeTask(mergeTask *LoadSegmentsTask) {
 		log.Warn("failed to load segment", zap.String("reason", status.GetReason()))
 		return
 	}
+	log.Info("load segments done")
 }
 
 func (ex *Executor) removeTask(task Task, step int) {
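
The executor changes above collect the merged tasks' IDs and segment IDs once and bind them to a child logger with camelCase keys. A minimal sketch of the same pattern, using go.uber.org/zap directly (Milvus wraps zap in its own log package; the values here are made up):

package main

import "go.uber.org/zap"

func main() {
	logger, _ := zap.NewDevelopment()
	defer logger.Sync()

	taskIDs := []int64{101, 102}
	segmentIDs := []int64{1, 2}

	// Bind the fields once; every message below carries them, with
	// camelCase keys matching the convention adopted in this commit.
	log := logger.With(
		zap.Int64s("taskIDs", taskIDs),
		zap.Int64s("segmentIDs", segmentIDs),
		zap.Int64("nodeID", 4),
	)
	log.Info("load segments...")
	log.Info("load segments done")
}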


@@ -261,7 +261,7 @@ func (scheduler *taskScheduler) preAdd(task Task) error {
 func (scheduler *taskScheduler) promote(task Task) error {
 	log := log.With(
 		zap.Int64("collection", task.CollectionID()),
-		zap.Int64("task", task.ID()),
+		zap.Int64("taskID", task.ID()),
 		zap.Int64("source", task.SourceID()),
 	)
 	err := scheduler.prePromote(task)
@@ -405,26 +405,18 @@ func (scheduler *taskScheduler) schedule(node int64) {
 	scheduler.tryPromoteAll()
 
 	log.Debug("process tasks related to node",
-		zap.Int("processing-task-num", scheduler.processQueue.Len()),
-		zap.Int("waiting-task-num", scheduler.waitQueue.Len()),
-		zap.Int("segment-task-num", len(scheduler.segmentTasks)),
-		zap.Int("channel-task-num", len(scheduler.channelTasks)),
+		zap.Int("processingTaskNum", scheduler.processQueue.Len()),
+		zap.Int("waitingTaskNum", scheduler.waitQueue.Len()),
+		zap.Int("segmentTaskNum", len(scheduler.segmentTasks)),
+		zap.Int("channelTaskNum", len(scheduler.channelTasks)),
 	)
 
 	// Process tasks
 	toRemove := make([]Task, 0)
 	scheduler.processQueue.Range(func(task Task) bool {
-		log.Debug("check task related",
-			zap.Int64("task", task.ID()))
 		if scheduler.isRelated(task, node) {
 			scheduler.process(task)
-		} else {
-			log.Debug("task not related, skip it",
-				zap.Int64("task", task.ID()),
-				zap.Int64("taskActionNode", task.Actions()[0].Node()),
-			)
 		}
 		if task.Status() != TaskStatusStarted {
 			toRemove = append(toRemove, task)
 		}
@@ -440,10 +432,10 @@ func (scheduler *taskScheduler) schedule(node int64) {
 		zap.Int("toRemoveNum", len(toRemove)))
 
 	log.Debug("process tasks related to node done",
-		zap.Int("processing-task-num", scheduler.processQueue.Len()),
-		zap.Int("waiting-task-num", scheduler.waitQueue.Len()),
-		zap.Int("segment-task-num", len(scheduler.segmentTasks)),
-		zap.Int("channel-task-num", len(scheduler.channelTasks)),
+		zap.Int("processingTaskNum", scheduler.processQueue.Len()),
+		zap.Int("waitingTaskNum", scheduler.waitQueue.Len()),
+		zap.Int("segmentTaskNum", len(scheduler.segmentTasks)),
+		zap.Int("channelTaskNum", len(scheduler.channelTasks)),
 	)
 }
@@ -477,7 +469,7 @@ func (scheduler *taskScheduler) isRelated(task Task, node int64) bool {
 // return true if the task is started and succeeds to commit the current action
 func (scheduler *taskScheduler) process(task Task) bool {
 	log := log.With(
-		zap.Int64("task", task.ID()),
+		zap.Int64("taskID", task.ID()),
 		zap.Int32("type", GetTaskType(task)),
 		zap.Int64("source", task.SourceID()),
 	)
@@ -531,7 +523,8 @@ func (scheduler *taskScheduler) RemoveByNode(node int64) {
 func (scheduler *taskScheduler) remove(task Task) {
 	log := log.With(
-		zap.Int64("task", task.ID()),
+		zap.Int64("taskID", task.ID()),
+		zap.Int32("taskStatus", task.Status()),
 	)
 	task.Cancel()
 	scheduler.tasks.Remove(task.ID())
@@ -555,7 +548,7 @@ func (scheduler *taskScheduler) remove(task Task) {
 func (scheduler *taskScheduler) checkCanceled(task Task) bool {
 	log := log.With(
-		zap.Int64("task", task.ID()),
+		zap.Int64("taskID", task.ID()),
 		zap.Int64("source", task.SourceID()),
 	)
@@ -571,7 +564,7 @@ func (scheduler *taskScheduler) checkCanceled(task Task) bool {
 func (scheduler *taskScheduler) checkStale(task Task) bool {
 	log := log.With(
-		zap.Int64("task", task.ID()),
+		zap.Int64("taskID", task.ID()),
 		zap.Int64("source", task.SourceID()),
 	)
@@ -606,7 +599,7 @@ func (scheduler *taskScheduler) checkStale(task Task) bool {
 func (scheduler *taskScheduler) checkSegmentTaskStale(task *SegmentTask) bool {
 	log := log.With(
-		zap.Int64("task", task.ID()),
+		zap.Int64("taskID", task.ID()),
 		zap.Int64("source", task.SourceID()),
 	)
@@ -652,7 +645,7 @@ func (scheduler *taskScheduler) checkSegmentTaskStale(task *SegmentTask) bool {
 func (scheduler *taskScheduler) checkChannelTaskStale(task *ChannelTask) bool {
 	log := log.With(
-		zap.Int64("task", task.ID()),
+		zap.Int64("taskID", task.ID()),
 		zap.Int64("source", task.SourceID()),
 	)


@@ -43,7 +43,7 @@ func CreateTestCollection(collection int64, replica int32) *meta.Collection {
 	return &meta.Collection{
 		CollectionLoadInfo: &querypb.CollectionLoadInfo{
 			CollectionID:  collection,
-			ReplicaNumber: 3,
+			ReplicaNumber: replica,
 		},
 	}
}
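
The last hunk fixes a latent helper bug: ReplicaNumber was hardcoded to 3, so the replica argument was silently ignored, and the new two-replica test above relies on the corrected behavior. A minimal sketch of the effect, with simplified stand-in types rather than the real meta and querypb ones:

package main

import "fmt"

// Simplified stand-in for querypb.CollectionLoadInfo.
type collectionLoadInfo struct {
	CollectionID  int64
	ReplicaNumber int32
}

func createTestCollection(collection int64, replica int32) collectionLoadInfo {
	// Before the fix this always returned ReplicaNumber: 3,
	// whatever the caller asked for.
	return collectionLoadInfo{CollectionID: collection, ReplicaNumber: replica}
}

func main() {
	c := createTestCollection(1, 2)
	fmt.Println(c.ReplicaNumber) // now 2, matching the two replicas in the test
}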