mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-02 03:48:37 +08:00
Make segment loaded successful put in manager even ctx done (#26992)
Leave segment loaded in manager even wait other segment failed See also #26908 Fix error case in distributed scenario Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent
3203ce1654
commit
ac45af585b
@ -192,13 +192,15 @@ func (loader *segmentLoader) Load(ctx context.Context,
|
||||
}
|
||||
defer loader.freeRequest(resource)
|
||||
|
||||
newSegments := make(map[int64]*LocalSegment, len(infos))
|
||||
clearAll := func() {
|
||||
for _, s := range newSegments {
|
||||
newSegments := typeutil.NewConcurrentMap[int64, *LocalSegment]()
|
||||
loaded := typeutil.NewConcurrentMap[int64, *LocalSegment]()
|
||||
defer func() {
|
||||
newSegments.Range(func(_ int64, s *LocalSegment) bool {
|
||||
s.Release()
|
||||
}
|
||||
return true
|
||||
})
|
||||
debug.FreeOSMemory()
|
||||
}
|
||||
}()
|
||||
|
||||
for _, info := range infos {
|
||||
segmentID := info.SegmentID
|
||||
@ -210,7 +212,6 @@ func (loader *segmentLoader) Load(ctx context.Context,
|
||||
if collection == nil {
|
||||
err := merr.WrapErrCollectionNotFound(collectionID)
|
||||
log.Warn("failed to get collection", zap.Error(err))
|
||||
clearAll()
|
||||
return nil, err
|
||||
}
|
||||
segment, err := NewSegment(collection, segmentID, partitionID, collectionID, shard, segmentType, version, info.GetStartPosition(), info.GetDeltaPosition())
|
||||
@ -220,18 +221,17 @@ func (loader *segmentLoader) Load(ctx context.Context,
|
||||
zap.Int64("segmentID", segmentID),
|
||||
zap.Error(err),
|
||||
)
|
||||
clearAll()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
newSegments[segmentID] = segment
|
||||
newSegments.Insert(segmentID, segment)
|
||||
}
|
||||
|
||||
loadSegmentFunc := func(idx int) error {
|
||||
loadInfo := infos[idx]
|
||||
partitionID := loadInfo.PartitionID
|
||||
segmentID := loadInfo.SegmentID
|
||||
segment := newSegments[segmentID]
|
||||
segment, _ := newSegments.Get(segmentID)
|
||||
|
||||
tr := timerecord.NewTimeRecorder("loadDurationPerSegment")
|
||||
err := loader.loadSegment(ctx, segment, loadInfo)
|
||||
@ -243,6 +243,9 @@ func (loader *segmentLoader) Load(ctx context.Context,
|
||||
)
|
||||
return err
|
||||
}
|
||||
loader.manager.Segment.Put(segmentType, segment)
|
||||
newSegments.GetAndRemove(segmentID)
|
||||
loaded.Insert(segmentID, segment)
|
||||
log.Info("load segment done", zap.Int64("segmentID", segmentID))
|
||||
loader.notifyLoadFinish(loadInfo)
|
||||
|
||||
@ -258,25 +261,23 @@ func (loader *segmentLoader) Load(ctx context.Context,
|
||||
err = funcutil.ProcessFuncParallel(len(infos),
|
||||
concurrencyLevel, loadSegmentFunc, "loadSegmentFunc")
|
||||
if err != nil {
|
||||
clearAll()
|
||||
log.Warn("failed to load some segments", zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Wait for all segments loaded
|
||||
if err := loader.waitSegmentLoadDone(ctx, segmentType, lo.Map(segments, func(info *querypb.SegmentLoadInfo, _ int) int64 { return info.GetSegmentID() })...); err != nil {
|
||||
clearAll()
|
||||
log.Warn("failed to wait the filtered out segments load done", zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
loaded := make([]Segment, 0, len(newSegments))
|
||||
for _, segment := range newSegments {
|
||||
loaded = append(loaded, segment)
|
||||
}
|
||||
loader.manager.Segment.Put(segmentType, loaded...)
|
||||
log.Info("all segment load done")
|
||||
return loaded, nil
|
||||
var result []Segment
|
||||
loaded.Range(func(_ int64, s *LocalSegment) bool {
|
||||
result = append(result, s)
|
||||
return true
|
||||
})
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (loader *segmentLoader) prepare(segmentType SegmentType, version int64, segments ...*querypb.SegmentLoadInfo) []*querypb.SegmentLoadInfo {
|
||||
|
Loading…
Reference in New Issue
Block a user