mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-11-30 02:48:45 +08:00
Fix calculation of memory usage prediction for mmap mode (#26264)
Signed-off-by: yah01 <yah2er0ne@outlook.com>
This commit is contained in:
parent
9ca8f28005
commit
a173486d2e
@ -30,11 +30,7 @@
|
||||
|
||||
namespace milvus {
|
||||
|
||||
#ifdef MAP_POPULATE
|
||||
static int mmap_flags = MAP_SHARED | MAP_POPULATE;
|
||||
#else
|
||||
static int mmap_flags = MAP_SHARED;
|
||||
#endif
|
||||
|
||||
class ColumnBase {
|
||||
public:
|
||||
|
@ -299,6 +299,13 @@ func (loader *segmentLoader) notifyLoadFinish(segments ...*querypb.SegmentLoadIn
|
||||
// requestResource requests memory & storage to load segments,
|
||||
// returns the memory usage, disk usage and concurrency with the gained memory.
|
||||
func (loader *segmentLoader) requestResource(ctx context.Context, infos ...*querypb.SegmentLoadInfo) (LoadResource, int, error) {
|
||||
segmentIDs := lo.Map(infos, func(info *querypb.SegmentLoadInfo, _ int) int64 {
|
||||
return info.GetSegmentID()
|
||||
})
|
||||
log := log.Ctx(ctx).With(
|
||||
zap.Int64s("segmentIDs", segmentIDs),
|
||||
)
|
||||
|
||||
resource := LoadResource{}
|
||||
|
||||
loader.mut.Lock()
|
||||
@ -350,6 +357,14 @@ func (loader *segmentLoader) requestResource(ctx context.Context, infos ...*quer
|
||||
resource.DiskSize += du
|
||||
|
||||
loader.committedResource.Add(resource)
|
||||
log.Info("request resource for loading segments",
|
||||
zap.Int("workerNum", resource.WorkNum),
|
||||
zap.Int("committedWorkerNum", loader.committedResource.WorkNum),
|
||||
zap.Uint64("memory", resource.MemorySize),
|
||||
zap.Uint64("committedMemory", loader.committedResource.MemorySize),
|
||||
zap.Uint64("disk", resource.DiskSize),
|
||||
zap.Uint64("committedDisk", loader.committedResource.DiskSize),
|
||||
)
|
||||
|
||||
return resource, concurrencyLevel, nil
|
||||
}
|
||||
@ -830,6 +845,7 @@ func (loader *segmentLoader) checkSegmentSize(ctx context.Context, segmentLoadIn
|
||||
metrics.QueryNodeDiskUsedSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(float64(toMB(uint64(localDiskUsage))))
|
||||
diskUsage := uint64(localDiskUsage) + loader.committedResource.DiskSize
|
||||
|
||||
mmapEnabled := len(paramtable.Get().QueryNodeCfg.MmapDirPath.GetValue()) > 0
|
||||
maxSegmentSize := uint64(0)
|
||||
predictMemUsage := memUsage
|
||||
predictDiskUsage := diskUsage
|
||||
@ -856,10 +872,18 @@ func (loader *segmentLoader) checkSegmentSize(ctx context.Context, segmentLoadIn
|
||||
)
|
||||
return 0, 0, err
|
||||
}
|
||||
predictMemUsage += neededMemSize
|
||||
predictDiskUsage += neededDiskSize
|
||||
if mmapEnabled {
|
||||
predictDiskUsage += neededMemSize + neededDiskSize
|
||||
} else {
|
||||
predictMemUsage += neededMemSize
|
||||
predictDiskUsage += neededDiskSize
|
||||
}
|
||||
} else {
|
||||
predictMemUsage += uint64(getBinlogDataSize(fieldBinlog))
|
||||
if mmapEnabled {
|
||||
predictDiskUsage += uint64(getBinlogDataSize(fieldBinlog))
|
||||
} else {
|
||||
predictMemUsage += uint64(getBinlogDataSize(fieldBinlog))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -878,7 +902,6 @@ func (loader *segmentLoader) checkSegmentSize(ctx context.Context, segmentLoadIn
|
||||
}
|
||||
}
|
||||
|
||||
mmapEnabled := len(paramtable.Get().QueryNodeCfg.MmapDirPath.GetValue()) > 0
|
||||
log.Info("predict memory and disk usage while loading (in MiB)",
|
||||
zap.Uint64("maxSegmentSize", toMB(maxSegmentSize)),
|
||||
zap.Int("concurrency", concurrency),
|
||||
|
Loading…
Reference in New Issue
Block a user