mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-11-29 18:38:44 +08:00
fix: Check resource when loading deltalogs (#37195)
Related to #36887 `LoadDeltaLogs` API did not check memory usage. When system is under high delete load pressure, this could result into OOM quit. This PR add resource check for `LoadDeltaLogs` actions and separate internal deltalog loading function with public one. --------- Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent
224d797f94
commit
5a0135727d
@ -72,6 +72,8 @@ type Loader interface {
|
||||
// NOTE: make sure the ref count of the corresponding collection will never go down to 0 during this
|
||||
Load(ctx context.Context, collectionID int64, segmentType SegmentType, version int64, segments ...*querypb.SegmentLoadInfo) ([]Segment, error)
|
||||
|
||||
// LoadDeltaLogs load deltalog and write delta data into provided segment.
|
||||
// it also executes resource protection logic in case of OOM.
|
||||
LoadDeltaLogs(ctx context.Context, segment Segment, deltaLogs []*datapb.FieldBinlog) error
|
||||
|
||||
// LoadBloomFilterSet loads needed statslog for RemoteSegment.
|
||||
@ -334,7 +336,7 @@ func (loader *segmentLoader) Load(ctx context.Context,
|
||||
}
|
||||
}
|
||||
}
|
||||
if err = loader.LoadDeltaLogs(ctx, segment, loadInfo.GetDeltalogs()); err != nil {
|
||||
if err = loader.loadDeltalogs(ctx, segment, loadInfo.GetDeltalogs()); err != nil {
|
||||
return errors.Wrap(err, "At LoadDeltaLogs")
|
||||
}
|
||||
|
||||
@ -1158,7 +1160,9 @@ func (loader *segmentLoader) loadBloomFilter(ctx context.Context, segmentID int6
|
||||
return nil
|
||||
}
|
||||
|
||||
func (loader *segmentLoader) LoadDeltaLogs(ctx context.Context, segment Segment, deltaLogs []*datapb.FieldBinlog) error {
|
||||
// loadDeltalogs performs the internal actions of `LoadDeltaLogs`
|
||||
// this function does not perform resource check and is meant be used among other load APIs.
|
||||
func (loader *segmentLoader) loadDeltalogs(ctx context.Context, segment Segment, deltaLogs []*datapb.FieldBinlog) error {
|
||||
ctx, sp := otel.Tracer(typeutil.QueryNodeRole).Start(ctx, fmt.Sprintf("LoadDeltalogs-%d", segment.ID()))
|
||||
defer sp.End()
|
||||
log := log.Ctx(ctx).With(
|
||||
@ -1254,6 +1258,24 @@ func (loader *segmentLoader) LoadDeltaLogs(ctx context.Context, segment Segment,
|
||||
return nil
|
||||
}
|
||||
|
||||
// LoadDeltaLogs load deltalog and write delta data into provided segment.
|
||||
// it also executes resource protection logic in case of OOM.
|
||||
func (loader *segmentLoader) LoadDeltaLogs(ctx context.Context, segment Segment, deltaLogs []*datapb.FieldBinlog) error {
|
||||
loadInfo := &querypb.SegmentLoadInfo{
|
||||
SegmentID: segment.ID(),
|
||||
CollectionID: segment.Collection(),
|
||||
Deltalogs: deltaLogs,
|
||||
}
|
||||
// Check memory & storage limit
|
||||
requestResourceResult, err := loader.requestResource(ctx, loadInfo)
|
||||
if err != nil {
|
||||
log.Warn("request resource failed", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
defer loader.freeRequest(requestResourceResult.Resource)
|
||||
return loader.loadDeltalogs(ctx, segment, deltaLogs)
|
||||
}
|
||||
|
||||
func (loader *segmentLoader) patchEntryNumber(ctx context.Context, segment *LocalSegment, loadInfo *querypb.SegmentLoadInfo) error {
|
||||
var needReset bool
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user