fix: segment version doesn't update as expected (#30951)

issue: #30950

due to segment version doesn't update as expected.
This PR will update segment version until segment become loaded

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
This commit is contained in:
wei liu 2024-03-01 14:21:10 +08:00 committed by GitHub
parent 6387403639
commit cc46d6bafc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 20 additions and 17 deletions

View File

@ -133,7 +133,7 @@ func (loader *segmentLoaderV2) Load(ctx context.Context,
return nil, nil
}
// Filter out loaded & loading segments
infos := loader.prepare(segmentType, version, segments...)
infos := loader.prepare(segmentType, segments...)
defer loader.unregister(infos...)
log.With(
@ -233,7 +233,8 @@ func (loader *segmentLoaderV2) Load(ctx context.Context,
}
// Wait for all segments loaded
if err := loader.waitSegmentLoadDone(ctx, segmentType, lo.Map(segments, func(info *querypb.SegmentLoadInfo, _ int) int64 { return info.GetSegmentID() })...); err != nil {
segmentIDs := lo.Map(segments, func(info *querypb.SegmentLoadInfo, _ int) int64 { return info.GetSegmentID() })
if err := loader.waitSegmentLoadDone(ctx, segmentType, segmentIDs, version); err != nil {
log.Warn("failed to wait the filtered out segments load done", zap.Error(err))
return nil, err
}
@ -554,7 +555,7 @@ func (loader *segmentLoader) Load(ctx context.Context,
return nil, nil
}
// Filter out loaded & loading segments
infos := loader.prepare(segmentType, version, segments...)
infos := loader.prepare(segmentType, segments...)
defer loader.unregister(infos...)
log.With(
@ -670,7 +671,8 @@ func (loader *segmentLoader) Load(ctx context.Context,
}
// Wait for all segments loaded
if err := loader.waitSegmentLoadDone(ctx, segmentType, lo.Map(segments, func(info *querypb.SegmentLoadInfo, _ int) int64 { return info.GetSegmentID() })...); err != nil {
segmentIDs := lo.Map(segments, func(info *querypb.SegmentLoadInfo, _ int) int64 { return info.GetSegmentID() })
if err := loader.waitSegmentLoadDone(ctx, segmentType, segmentIDs, version); err != nil {
log.Warn("failed to wait the filtered out segments load done", zap.Error(err))
return nil, err
}
@ -684,7 +686,7 @@ func (loader *segmentLoader) Load(ctx context.Context,
return result, nil
}
func (loader *segmentLoader) prepare(segmentType SegmentType, version int64, segments ...*querypb.SegmentLoadInfo) []*querypb.SegmentLoadInfo {
func (loader *segmentLoader) prepare(segmentType SegmentType, segments ...*querypb.SegmentLoadInfo) []*querypb.SegmentLoadInfo {
loader.mut.Lock()
defer loader.mut.Unlock()
@ -697,9 +699,6 @@ func (loader *segmentLoader) prepare(segmentType SegmentType, version int64, seg
infos = append(infos, segment)
loader.loadingSegments.Insert(segment.GetSegmentID(), newLoadResult())
} else {
// try to update segment version before skip load operation
loader.manager.Segment.UpdateBy(IncreaseVersion(version),
WithType(segmentType), WithID(segment.SegmentID))
log.Info("skip loaded/loading segment", zap.Int64("segmentID", segment.GetSegmentID()),
zap.Bool("isLoaded", len(loader.manager.Segment.GetBy(WithType(segmentType), WithID(segment.GetSegmentID()))) > 0),
zap.Bool("isLoading", loader.loadingSegments.Contain(segment.GetSegmentID())),
@ -798,7 +797,7 @@ func (loader *segmentLoader) freeRequest(resource LoadResource) {
loader.committedResource.Sub(resource)
}
func (loader *segmentLoader) waitSegmentLoadDone(ctx context.Context, segmentType SegmentType, segmentIDs ...int64) error {
func (loader *segmentLoader) waitSegmentLoadDone(ctx context.Context, segmentType SegmentType, segmentIDs []int64, version int64) error {
log := log.Ctx(ctx).With(
zap.String("segmentType", segmentType.String()),
zap.Int64s("segmentIDs", segmentIDs),
@ -841,6 +840,9 @@ func (loader *segmentLoader) waitSegmentLoadDone(ctx context.Context, segmentTyp
return merr.WrapErrSegmentLack(segmentID, "failed to wait segment loaded")
}
// try to update segment version after wait segment loaded
loader.manager.Segment.UpdateBy(IncreaseVersion(version), WithType(segmentType), WithID(segmentID))
log.Info("segment loaded...", zap.Int64("segmentID", segmentID))
}
return nil
@ -1473,7 +1475,7 @@ func (loader *segmentLoader) LoadIndex(ctx context.Context, segment *LocalSegmen
// Filter out LOADING segments only
// use None to avoid loaded check
infos := loader.prepare(commonpb.SegmentState_SegmentStateNone, version, loadInfo)
infos := loader.prepare(commonpb.SegmentState_SegmentStateNone, loadInfo)
defer loader.unregister(infos...)
indexInfo := lo.Map(infos, func(info *querypb.SegmentLoadInfo, _ int) *querypb.SegmentLoadInfo {
@ -1515,7 +1517,7 @@ func (loader *segmentLoader) LoadIndex(ctx context.Context, segment *LocalSegmen
loader.notifyLoadFinish(loadInfo)
}
return loader.waitSegmentLoadDone(ctx, commonpb.SegmentState_SegmentStateNone, loadInfo.GetSegmentID())
return loader.waitSegmentLoadDone(ctx, commonpb.SegmentState_SegmentStateNone, []int64{loadInfo.GetSegmentID()}, version)
}
func getBinlogDataSize(fieldBinlog *datapb.FieldBinlog) int64 {

View File

@ -713,14 +713,15 @@ func (suite *SegmentLoaderDetailSuite) TestWaitSegmentLoadDone() {
}
return nil
})
infos = suite.loader.prepare(SegmentTypeSealed, 0, &querypb.SegmentLoadInfo{
suite.segmentManager.EXPECT().UpdateBy(mock.Anything, mock.Anything, mock.Anything).Return(0)
infos = suite.loader.prepare(SegmentTypeSealed, &querypb.SegmentLoadInfo{
SegmentID: suite.segmentID,
PartitionID: suite.partitionID,
CollectionID: suite.collectionID,
NumOfRows: 100,
})
err := suite.loader.waitSegmentLoadDone(context.Background(), SegmentTypeSealed, suite.segmentID)
err := suite.loader.waitSegmentLoadDone(context.Background(), SegmentTypeSealed, []int64{suite.segmentID}, 0)
suite.NoError(err)
})
@ -741,14 +742,14 @@ func (suite *SegmentLoaderDetailSuite) TestWaitSegmentLoadDone() {
return nil
})
infos = suite.loader.prepare(SegmentTypeSealed, 0, &querypb.SegmentLoadInfo{
infos = suite.loader.prepare(SegmentTypeSealed, &querypb.SegmentLoadInfo{
SegmentID: suite.segmentID,
PartitionID: suite.partitionID,
CollectionID: suite.collectionID,
NumOfRows: 100,
})
err := suite.loader.waitSegmentLoadDone(context.Background(), SegmentTypeSealed, suite.segmentID)
err := suite.loader.waitSegmentLoadDone(context.Background(), SegmentTypeSealed, []int64{suite.segmentID}, 0)
suite.Error(err)
})
@ -759,7 +760,7 @@ func (suite *SegmentLoaderDetailSuite) TestWaitSegmentLoadDone() {
suite.segmentManager.EXPECT().GetWithType(suite.segmentID, SegmentTypeSealed).RunAndReturn(func(segmentID int64, segmentType commonpb.SegmentState) Segment {
return nil
})
suite.loader.prepare(SegmentTypeSealed, 0, &querypb.SegmentLoadInfo{
suite.loader.prepare(SegmentTypeSealed, &querypb.SegmentLoadInfo{
SegmentID: suite.segmentID,
PartitionID: suite.partitionID,
CollectionID: suite.collectionID,
@ -769,7 +770,7 @@ func (suite *SegmentLoaderDetailSuite) TestWaitSegmentLoadDone() {
ctx, cancel := context.WithCancel(context.Background())
cancel()
err := suite.loader.waitSegmentLoadDone(ctx, SegmentTypeSealed, suite.segmentID)
err := suite.loader.waitSegmentLoadDone(ctx, SegmentTypeSealed, []int64{suite.segmentID}, 0)
suite.Error(err)
suite.True(merr.IsCanceledOrTimeout(err))
})