fix: Add atomic method to get collection target (#29577)

Related to #29575

Add `getCollectionTarget` method which is atomic when scope is
`CurrentTargetFirst` or `NextTargetFirst`
Also return error when executor finds no channel in target manager

---------

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
congqixia 2023-12-29 09:04:46 +08:00 committed by GitHub
parent a8b7629315
commit a3cb8e2625
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 182 additions and 50 deletions

View File

@ -37,6 +37,8 @@ type TargetScope = int32
const (
CurrentTarget TargetScope = iota + 1
NextTarget
CurrentTargetFirst
NextTargetFirst
)
type TargetManager struct {
@ -317,12 +319,24 @@ func (mgr *TargetManager) removePartitionFromCollectionTarget(oldTarget *Collect
return NewCollectionTarget(segments, channels)
}
func (mgr *TargetManager) getTarget(scope TargetScope) *target {
if scope == CurrentTarget {
return mgr.current
func (mgr *TargetManager) getCollectionTarget(scope TargetScope, collectionID int64) *CollectionTarget {
switch scope {
case CurrentTarget:
return mgr.current.collectionTargetMap[collectionID]
case NextTarget:
return mgr.next.collectionTargetMap[collectionID]
case CurrentTargetFirst:
if current := mgr.current.collectionTargetMap[collectionID]; current != nil {
return current
}
return mgr.next.collectionTargetMap[collectionID]
case NextTargetFirst:
if next := mgr.next.collectionTargetMap[collectionID]; next != nil {
return next
}
return mgr.current.collectionTargetMap[collectionID]
}
return mgr.next
return nil
}
func (mgr *TargetManager) GetGrowingSegmentsByCollection(collectionID int64,
@ -331,8 +345,7 @@ func (mgr *TargetManager) GetGrowingSegmentsByCollection(collectionID int64,
mgr.rwMutex.RLock()
defer mgr.rwMutex.RUnlock()
targetMap := mgr.getTarget(scope)
collectionTarget := targetMap.getCollectionTarget(collectionID)
collectionTarget := mgr.getCollectionTarget(scope, collectionID)
if collectionTarget == nil {
return nil
@ -353,8 +366,7 @@ func (mgr *TargetManager) GetGrowingSegmentsByChannel(collectionID int64,
mgr.rwMutex.RLock()
defer mgr.rwMutex.RUnlock()
targetMap := mgr.getTarget(scope)
collectionTarget := targetMap.getCollectionTarget(collectionID)
collectionTarget := mgr.getCollectionTarget(scope, collectionID)
if collectionTarget == nil {
return nil
@ -376,8 +388,7 @@ func (mgr *TargetManager) GetSealedSegmentsByCollection(collectionID int64,
mgr.rwMutex.RLock()
defer mgr.rwMutex.RUnlock()
targetMap := mgr.getTarget(scope)
collectionTarget := targetMap.getCollectionTarget(collectionID)
collectionTarget := mgr.getCollectionTarget(scope, collectionID)
if collectionTarget == nil {
return nil
@ -392,9 +403,7 @@ func (mgr *TargetManager) GetSealedSegmentsByChannel(collectionID int64,
mgr.rwMutex.RLock()
defer mgr.rwMutex.RUnlock()
targetMap := mgr.getTarget(scope)
collectionTarget := targetMap.getCollectionTarget(collectionID)
collectionTarget := mgr.getCollectionTarget(scope, collectionID)
if collectionTarget == nil {
return nil
}
@ -416,8 +425,7 @@ func (mgr *TargetManager) GetDroppedSegmentsByChannel(collectionID int64,
mgr.rwMutex.RLock()
defer mgr.rwMutex.RUnlock()
targetMap := mgr.getTarget(scope)
collectionTarget := targetMap.getCollectionTarget(collectionID)
collectionTarget := mgr.getCollectionTarget(scope, collectionID)
if collectionTarget == nil {
return nil
@ -437,8 +445,7 @@ func (mgr *TargetManager) GetSealedSegmentsByPartition(collectionID int64,
mgr.rwMutex.RLock()
defer mgr.rwMutex.RUnlock()
targetMap := mgr.getTarget(scope)
collectionTarget := targetMap.getCollectionTarget(collectionID)
collectionTarget := mgr.getCollectionTarget(scope, collectionID)
if collectionTarget == nil {
return nil
@ -458,8 +465,7 @@ func (mgr *TargetManager) GetDmChannelsByCollection(collectionID int64, scope Ta
mgr.rwMutex.RLock()
defer mgr.rwMutex.RUnlock()
targetMap := mgr.getTarget(scope)
collectionTarget := targetMap.getCollectionTarget(collectionID)
collectionTarget := mgr.getCollectionTarget(scope, collectionID)
if collectionTarget == nil {
return nil
@ -471,8 +477,7 @@ func (mgr *TargetManager) GetDmChannel(collectionID int64, channel string, scope
mgr.rwMutex.RLock()
defer mgr.rwMutex.RUnlock()
targetMap := mgr.getTarget(scope)
collectionTarget := targetMap.getCollectionTarget(collectionID)
collectionTarget := mgr.getCollectionTarget(scope, collectionID)
if collectionTarget == nil {
return nil
@ -483,8 +488,7 @@ func (mgr *TargetManager) GetDmChannel(collectionID int64, channel string, scope
func (mgr *TargetManager) GetSealedSegment(collectionID int64, id int64, scope TargetScope) *datapb.SegmentInfo {
mgr.rwMutex.RLock()
defer mgr.rwMutex.RUnlock()
targetMap := mgr.getTarget(scope)
collectionTarget := targetMap.getCollectionTarget(collectionID)
collectionTarget := mgr.getCollectionTarget(scope, collectionID)
if collectionTarget == nil {
return nil
@ -495,8 +499,7 @@ func (mgr *TargetManager) GetSealedSegment(collectionID int64, id int64, scope T
func (mgr *TargetManager) GetCollectionTargetVersion(collectionID int64, scope TargetScope) int64 {
mgr.rwMutex.RLock()
defer mgr.rwMutex.RUnlock()
targetMap := mgr.getTarget(scope)
collectionTarget := targetMap.getCollectionTarget(collectionID)
collectionTarget := mgr.getCollectionTarget(scope, collectionID)
if collectionTarget == nil {
return 0

View File

@ -415,6 +415,135 @@ func (suite *TargetManagerSuite) TestGetSegmentByChannel() {
suite.Len(suite.mgr.GetDroppedSegmentsByChannel(collectionID, "channel-1", NextTarget), 3)
}
func (suite *TargetManagerSuite) TestGetTarget() {
type testCase struct {
tag string
mgr *TargetManager
scope TargetScope
expectTarget *CollectionTarget
}
current := &CollectionTarget{}
next := &CollectionTarget{}
bothMgr := &TargetManager{
current: &target{
collectionTargetMap: map[int64]*CollectionTarget{
1000: current,
},
},
next: &target{
collectionTargetMap: map[int64]*CollectionTarget{
1000: current,
},
},
}
currentMgr := &TargetManager{
current: &target{
collectionTargetMap: map[int64]*CollectionTarget{
1000: current,
},
},
next: &target{},
}
nextMgr := &TargetManager{
next: &target{
collectionTargetMap: map[int64]*CollectionTarget{
1000: current,
},
},
current: &target{},
}
cases := []testCase{
{
tag: "both_scope_unknown",
mgr: bothMgr,
scope: -1,
expectTarget: nil,
},
{
tag: "both_scope_current",
mgr: bothMgr,
scope: CurrentTarget,
expectTarget: current,
},
{
tag: "both_scope_next",
mgr: bothMgr,
scope: NextTarget,
expectTarget: next,
},
{
tag: "both_scope_current_first",
mgr: bothMgr,
scope: CurrentTargetFirst,
expectTarget: current,
},
{
tag: "both_scope_next_first",
mgr: bothMgr,
scope: NextTargetFirst,
expectTarget: next,
},
{
tag: "next_scope_current",
mgr: nextMgr,
scope: CurrentTarget,
expectTarget: nil,
},
{
tag: "next_scope_next",
mgr: nextMgr,
scope: NextTarget,
expectTarget: next,
},
{
tag: "next_scope_current_first",
mgr: nextMgr,
scope: CurrentTargetFirst,
expectTarget: next,
},
{
tag: "next_scope_next_first",
mgr: nextMgr,
scope: NextTargetFirst,
expectTarget: next,
},
{
tag: "current_scope_current",
mgr: currentMgr,
scope: CurrentTarget,
expectTarget: current,
},
{
tag: "current_scope_next",
mgr: currentMgr,
scope: NextTarget,
expectTarget: nil,
},
{
tag: "current_scope_current_first",
mgr: currentMgr,
scope: CurrentTargetFirst,
expectTarget: current,
},
{
tag: "current_scope_next_first",
mgr: currentMgr,
scope: NextTargetFirst,
expectTarget: current,
},
}
for _, tc := range cases {
suite.Run(tc.tag, func() {
target := tc.mgr.getCollectionTarget(tc.scope, 1000)
suite.Equal(tc.expectTarget, target)
})
}
}
func TestTargetManager(t *testing.T) {
suite.Run(t, new(TargetManagerSuite))
}

View File

@ -152,26 +152,24 @@ func (o *LeaderObserver) findNeedLoadedSegments(leaderView *meta.LeaderView, dis
continue
}
channel := o.target.GetDmChannel(s.GetCollectionID(), s.GetInsertChannel(), meta.CurrentTarget)
if channel == nil {
channel = o.target.GetDmChannel(s.GetCollectionID(), s.GetInsertChannel(), meta.NextTarget)
}
loadInfo := utils.PackSegmentLoadInfo(resp.GetInfos()[0], channel.GetSeekPosition(), nil)
if channel := o.target.GetDmChannel(s.GetCollectionID(), s.GetInsertChannel(), meta.CurrentTargetFirst); channel != nil {
loadInfo := utils.PackSegmentLoadInfo(resp.GetInfos()[0], channel.GetSeekPosition(), nil)
log.Debug("leader observer append a segment to set",
zap.Int64("collectionID", leaderView.CollectionID),
zap.String("channel", leaderView.Channel),
zap.Int64("leaderViewID", leaderView.ID),
zap.Int64("segmentID", s.GetID()),
zap.Int64("nodeID", s.Node))
ret = append(ret, &querypb.SyncAction{
Type: querypb.SyncType_Set,
PartitionID: s.GetPartitionID(),
SegmentID: s.GetID(),
NodeID: s.Node,
Version: s.Version,
Info: loadInfo,
})
log.Debug("leader observer append a segment to set",
zap.Int64("collectionID", leaderView.CollectionID),
zap.String("channel", leaderView.Channel),
zap.Int64("leaderViewID", leaderView.ID),
zap.Int64("segmentID", s.GetID()),
zap.Int64("nodeID", s.Node))
ret = append(ret, &querypb.SyncAction{
Type: querypb.SyncType_Set,
PartitionID: s.GetPartitionID(),
SegmentID: s.GetID(),
NodeID: s.Node,
Version: s.Version,
Info: loadInfo,
})
}
}
}
return ret

View File

@ -179,12 +179,19 @@ func (ex *Executor) loadSegment(task *SegmentTask, step int) error {
task.CollectionID(),
partitions...,
)
// get channel first, in case of target updated after segment info fetched
channel := ex.targetMgr.GetDmChannel(task.CollectionID(), task.shard, meta.NextTargetFirst)
if channel == nil {
return merr.WrapErrChannelNotAvailable(task.shard)
}
resp, err := ex.broker.GetSegmentInfo(ctx, task.SegmentID())
if err != nil || len(resp.GetInfos()) == 0 {
log.Warn("failed to get segment info from DataCoord", zap.Error(err))
return err
}
segment := resp.GetInfos()[0]
indexes, err := ex.broker.GetIndexInfo(ctx, task.CollectionID(), segment.GetID())
if err != nil {
if !errors.Is(err, merr.ErrIndexNotFound) {
@ -194,11 +201,6 @@ func (ex *Executor) loadSegment(task *SegmentTask, step int) error {
indexes = nil
}
channel := ex.targetMgr.GetDmChannel(task.CollectionID(), segment.GetInsertChannel(), meta.NextTarget)
if channel == nil {
channel = ex.targetMgr.GetDmChannel(task.CollectionID(), segment.GetInsertChannel(), meta.CurrentTarget)
}
// Get collection index info
indexInfos, err := ex.broker.DescribeIndex(ctx, task.CollectionID())
if err != nil {