fix: DataNode might OOM by estimating based on MemorySize (#34203)

See also: #34136
pr: #34201

---------

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
This commit is contained in:
XuanYang-cn 2024-07-04 15:24:10 +08:00 committed by GitHub
parent dd4dfbcd8d
commit 0f1915ef24
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 78 additions and 31 deletions

View File

@ -544,8 +544,6 @@ dataNode:
# if this parameter is <= 0, it will be set to the maximum number of CPUs available for execution
# it is suggested to set it higher when there are many collections, to avoid blocking
workPoolSize: -1
# specify the size of global work pool for channel checkpoint updating
# if this parameter <= 0, will set it as 10
updateChannelCheckpointMaxParallel: 10
updateChannelCheckpointInterval: 60 # the interval duration(in seconds) for datanode to update channel checkpoint of each channel
updateChannelCheckpointRPCTimeout: 20 # timeout in seconds for UpdateChannelCheckpoint RPC call
@ -557,6 +555,7 @@ dataNode:
readBufferSizeInMB: 16 # The data block size (in MB) read from chunk manager by the datanode during import.
compaction:
levelZeroBatchMemoryRatio: 0.05 # The minimal memory ratio of free memory for level zero compaction executing in batch mode
levelZeroMaxBatchSize: -1 # Max batch size refers to the max number of L1/L2 segments in a batch when executing L0 compaction. Default to -1, any value that is less than 1 means no limit. Valid range: >= 1.
gracefulStopTimeout: 1800 # seconds. force stop node without graceful stop
ip: # if not specified, use the first unicastable address
port: 21124

View File

@ -131,24 +131,19 @@ func (t *LevelZeroCompactionTask) Compact() (*datapb.CompactionPlanResult, error
}
var (
totalSize int64
totalDeltalogs = make(map[int64][]string)
memorySize int64
totalDeltalogs = []string{}
)
for _, s := range l0Segments {
paths := []string{}
for _, d := range s.GetDeltalogs() {
for _, l := range d.GetBinlogs() {
paths = append(paths, l.GetLogPath())
totalSize += l.GetMemorySize()
totalDeltalogs = append(totalDeltalogs, l.GetLogPath())
memorySize += l.GetMemorySize()
}
}
if len(paths) > 0 {
totalDeltalogs[s.GetSegmentID()] = paths
}
}
batchSize := getMaxBatchSize(totalSize)
resultSegments, err := t.process(ctx, batchSize, targetSegments, lo.Values(totalDeltalogs)...)
resultSegments, err := t.process(ctx, memorySize, targetSegments, totalDeltalogs)
if err != nil {
return nil, err
}
@ -168,15 +163,22 @@ func (t *LevelZeroCompactionTask) Compact() (*datapb.CompactionPlanResult, error
return result, nil
}
// batch size means segment count
func getMaxBatchSize(totalSize int64) int {
max := 1
memLimit := float64(hardware.GetFreeMemoryCount()) * paramtable.Get().DataNodeCfg.L0BatchMemoryRatio.GetAsFloat()
if memLimit > float64(totalSize) {
max = int(memLimit / float64(totalSize))
// BatchSize refers to the L1/L2 segments count that in one batch, batchSize controls the expansion ratio
// of deltadata in memory.
func getMaxBatchSize(baseMemSize, memLimit float64) int {
batchSize := 1
if memLimit > baseMemSize {
batchSize = int(memLimit / baseMemSize)
}
return max
maxSizeLimit := paramtable.Get().DataNodeCfg.L0CompactionMaxBatchSize.GetAsInt()
// Set batch size to maxSizeLimit if it is larger than maxSizeLimit.
// When maxSizeLimit <= 0, it means no limit.
if maxSizeLimit > 0 && batchSize > maxSizeLimit {
return maxSizeLimit
}
return batchSize
}
func (t *LevelZeroCompactionTask) serializeUpload(ctx context.Context, segmentWriters map[int64]*SegmentDeltaWriter) ([]*datapb.CompactionSegment, error) {
@ -314,18 +316,15 @@ func (t *LevelZeroCompactionTask) applyBFInParallel(ctx context.Context, deltaDa
return retMap
}
func (t *LevelZeroCompactionTask) process(ctx context.Context, batchSize int, targetSegments []*datapb.CompactionSegmentBinlogs, deltaLogs ...[]string) ([]*datapb.CompactionSegment, error) {
func (t *LevelZeroCompactionTask) process(ctx context.Context, l0MemSize int64, targetSegments []*datapb.CompactionSegmentBinlogs, deltaLogs ...[]string) ([]*datapb.CompactionSegment, error) {
ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "L0Compact process")
defer span.End()
results := make([]*datapb.CompactionSegment, 0)
batch := int(math.Ceil(float64(len(targetSegments)) / float64(batchSize)))
log := log.Ctx(ctx).With(
zap.Int64("planID", t.plan.GetPlanID()),
zap.Int("max conc segment counts", batchSize),
zap.Int("total segment counts", len(targetSegments)),
zap.Int("total batch", batch),
)
ratio := paramtable.Get().DataNodeCfg.L0BatchMemoryRatio.GetAsFloat()
memLimit := float64(hardware.GetFreeMemoryCount()) * ratio
if float64(l0MemSize) > memLimit {
return nil, errors.Newf("L0 compaction failed, not enough memory, request memory size: %v, memory limit: %v", l0MemSize, memLimit)
}
log.Info("L0 compaction process start")
allDelta, err := t.loadDelta(ctx, lo.Flatten(deltaLogs))
@ -334,6 +333,16 @@ func (t *LevelZeroCompactionTask) process(ctx context.Context, batchSize int, ta
return nil, err
}
batchSize := getMaxBatchSize(float64(allDelta.Size()), memLimit)
batch := int(math.Ceil(float64(len(targetSegments)) / float64(batchSize)))
log := log.Ctx(ctx).With(
zap.Int64("planID", t.plan.GetPlanID()),
zap.Int("max conc segment counts", batchSize),
zap.Int("total segment counts", len(targetSegments)),
zap.Int("total batch", batch),
)
results := make([]*datapb.CompactionSegment, 0)
for i := 0; i < batch; i++ {
left, right := i*batchSize, (i+1)*batchSize
if right > len(targetSegments) {

View File

@ -79,6 +79,36 @@ func (s *LevelZeroCompactionTaskSuite) SetupTest() {
s.dBlob = blob.GetValue()
}
// TestGetMaxBatchSize verifies the interaction between the memory-derived
// batch size and the configurable levelZeroMaxBatchSize cap.
func (s *LevelZeroCompactionTaskSuite) TestGetMaxBatchSize() {
	cases := []struct {
		description string
		limitParam  string
		baseMem     float64
		memLimit    float64
		want        int
	}{
		{"no limitation on maxBatchSize", "-1", 10, 100, 10},
		{"no limitation on maxBatchSize v2", "0", 10, 100, 10},
		{"maxBatchSize == 11", "11", 10, 100, 10},
		{"maxBatchSize == 1", "1", 10, 100, 1},
		{"no limitation on maxBatchSize", "-1", 10, 12, 1},
		{"maxBatchSize == 100", "100", 10, 12, 1},
	}

	paramKey := paramtable.Get().DataNodeCfg.L0CompactionMaxBatchSize.Key
	defer paramtable.Get().Reset(paramKey)

	for _, tc := range cases {
		s.Run(tc.description, func() {
			// Override the batch-size cap for this sub-test only.
			paramtable.Get().Save(paramKey, tc.limitParam)
			defer paramtable.Get().Reset(paramKey)

			s.Equal(tc.want, getMaxBatchSize(tc.baseMem, tc.memLimit))
		})
	}
}
func (s *LevelZeroCompactionTaskSuite) TestProcessLoadDeltaFail() {
plan := &datapb.CompactionPlan{
PlanID: 19530,
@ -256,8 +286,7 @@ func (s *LevelZeroCompactionTaskSuite) TestCompactLinear() {
s.task.cm = cm
s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).Return([][]byte{s.dBlob}, nil).Times(1)
s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil).Twice()
s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil).Once()
s.mockAlloc.EXPECT().AllocOne().Return(19530, nil).Times(2)
s.Require().Equal(plan.GetPlanID(), s.task.GetPlanID())

View File

@ -3726,7 +3726,8 @@ type dataNodeConfig struct {
ReadBufferSizeInMB ParamItem `refreshable:"true"`
// Compaction
L0BatchMemoryRatio ParamItem `refreshable:"true"`
L0BatchMemoryRatio ParamItem `refreshable:"true"`
L0CompactionMaxBatchSize ParamItem `refreshable:"true"`
GracefulStopTimeout ParamItem `refreshable:"true"`
BloomFilterApplyParallelFactor ParamItem `refreshable:"true"`
@ -4034,6 +4035,15 @@ if this parameter <= 0, will set it as 10`,
}
p.L0BatchMemoryRatio.Init(base.mgr)
p.L0CompactionMaxBatchSize = ParamItem{
Key: "dataNode.compaction.levelZeroMaxBatchSize",
Version: "2.4.5",
Doc: "Max batch size refers to the max number of L1/L2 segments in a batch when executing L0 compaction. Default to -1, any value that is less than 1 means no limit. Valid range: >= 1.",
DefaultValue: "-1",
Export: true,
}
p.L0CompactionMaxBatchSize.Init(base.mgr)
p.GracefulStopTimeout = ParamItem{
Key: "dataNode.gracefulStopTimeout",
Version: "2.3.7",