mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-11-30 02:48:45 +08:00
Add memory usage too large sync policy (#22244)
Signed-off-by: wayblink <anyang.wang@zilliz.com>
This commit is contained in:
parent
8d05874ff9
commit
416866e42f
@ -307,6 +307,10 @@ dataNode:
|
||||
serverMaxRecvSize: 536870912
|
||||
clientMaxSendSize: 268435456
|
||||
clientMaxRecvSize: 268435456
|
||||
memory:
|
||||
forceSyncEnable: true # `true` to force sync if memory usage is too high
|
||||
forceSyncThreshold: 0.6 # forceSync only take effects when memory usage ratio > forceSyncThreshold. note that should set it a small value like 0.2 for standalone deployment
|
||||
forceSyncSegmentRatio: 0.3 # ratio of segments to sync, top largest forceSyncSegmentRatio segments will be synced
|
||||
|
||||
# Configures the system log output.
|
||||
log:
|
||||
|
@ -284,6 +284,14 @@ func (bd *BufferData) updateStartAndEndPosition(startPos *internalpb.MsgPosition
|
||||
}
|
||||
}
|
||||
|
||||
func (bd *BufferData) memorySize() int64 {
|
||||
var size int64
|
||||
for _, field := range bd.buffer.Data {
|
||||
size += int64(field.GetMemorySize())
|
||||
}
|
||||
return size
|
||||
}
|
||||
|
||||
// DelDataBuf buffers delete data, monitoring buffer size and limit
|
||||
// size and limit both indicate numOfRows
|
||||
type DelDataBuf struct {
|
||||
|
@ -69,7 +69,8 @@ type Channel interface {
|
||||
listSegmentIDsToSync(ts Timestamp) []UniqueID
|
||||
setSegmentLastSyncTs(segID UniqueID, ts Timestamp)
|
||||
|
||||
updateStatistics(segID UniqueID, numRows int64)
|
||||
updateSegmentRowNumber(segID UniqueID, numRows int64)
|
||||
updateSegmentMemorySize(segID UniqueID, memorySize int64)
|
||||
InitPKstats(ctx context.Context, s *Segment, statsBinlogs []*datapb.FieldBinlog, ts Timestamp) error
|
||||
RollPKstats(segID UniqueID, stats []*storage.PrimaryKeyStats)
|
||||
getSegmentStatisticsUpdates(segID UniqueID) (*datapb.SegmentStats, error)
|
||||
@ -128,6 +129,7 @@ func newChannel(channelName string, collID UniqueID, schema *schemapb.Collection
|
||||
|
||||
syncPolicies: []segmentSyncPolicy{
|
||||
syncPeriodically(),
|
||||
syncMemoryTooHigh(),
|
||||
},
|
||||
|
||||
metaService: metaService,
|
||||
@ -251,15 +253,24 @@ func (c *ChannelMeta) listSegmentIDsToSync(ts Timestamp) []UniqueID {
|
||||
c.segMu.RLock()
|
||||
defer c.segMu.RUnlock()
|
||||
|
||||
segIDsToSync := make([]UniqueID, 0)
|
||||
for segID, seg := range c.segments {
|
||||
validSegs := make([]*Segment, 0)
|
||||
for _, seg := range c.segments {
|
||||
if !seg.isValid() {
|
||||
continue
|
||||
}
|
||||
for _, policy := range c.syncPolicies {
|
||||
if policy(seg, ts) {
|
||||
validSegs = append(validSegs, seg)
|
||||
}
|
||||
|
||||
segIDsToSync := make([]UniqueID, 0)
|
||||
toSyncSegIDDict := make(map[UniqueID]bool, 0)
|
||||
for _, policy := range c.syncPolicies {
|
||||
toSyncSegments := policy(validSegs, ts)
|
||||
for _, segID := range toSyncSegments {
|
||||
if _, ok := toSyncSegIDDict[segID]; ok {
|
||||
continue
|
||||
} else {
|
||||
toSyncSegIDDict[segID] = true
|
||||
segIDsToSync = append(segIDsToSync, segID)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -465,11 +476,11 @@ func (c *ChannelMeta) hasSegment(segID UniqueID, countFlushed bool) bool {
|
||||
}
|
||||
|
||||
// updateStatistics updates the number of rows of a segment in channel.
|
||||
func (c *ChannelMeta) updateStatistics(segID UniqueID, numRows int64) {
|
||||
func (c *ChannelMeta) updateSegmentRowNumber(segID UniqueID, numRows int64) {
|
||||
c.segMu.Lock()
|
||||
defer c.segMu.Unlock()
|
||||
|
||||
log.Info("updating segment", zap.Int64("Segment ID", segID), zap.Int64("numRows", numRows))
|
||||
log.Info("updating segment num row", zap.Int64("Segment ID", segID), zap.Int64("numRows", numRows))
|
||||
seg, ok := c.segments[segID]
|
||||
if ok && seg.notFlushed() {
|
||||
seg.numRows += numRows
|
||||
@ -479,6 +490,21 @@ func (c *ChannelMeta) updateStatistics(segID UniqueID, numRows int64) {
|
||||
log.Warn("update segment num row not exist", zap.Int64("segID", segID))
|
||||
}
|
||||
|
||||
// updateStatistics updates the number of rows of a segment in channel.
|
||||
func (c *ChannelMeta) updateSegmentMemorySize(segID UniqueID, memorySize int64) {
|
||||
c.segMu.Lock()
|
||||
defer c.segMu.Unlock()
|
||||
|
||||
log.Info("updating segment memorySize", zap.Int64("Segment ID", segID), zap.Int64("memorySize", memorySize))
|
||||
seg, ok := c.segments[segID]
|
||||
if ok && seg.notFlushed() {
|
||||
seg.memorySize = memorySize
|
||||
return
|
||||
}
|
||||
|
||||
log.Warn("update segment memorySize not exist", zap.Int64("segID", segID))
|
||||
}
|
||||
|
||||
// getSegmentStatisticsUpdates gives current segment's statistics updates.
|
||||
func (c *ChannelMeta) getSegmentStatisticsUpdates(segID UniqueID) (*datapb.SegmentStats, error) {
|
||||
c.segMu.RLock()
|
||||
|
@ -146,8 +146,10 @@ func TestChannelMeta_InnerFunction(t *testing.T) {
|
||||
assert.Equal(t, int64(0), seg.numRows)
|
||||
assert.Equal(t, datapb.SegmentType_New, seg.getType())
|
||||
|
||||
channel.updateStatistics(0, 10)
|
||||
channel.updateSegmentRowNumber(0, 10)
|
||||
assert.Equal(t, int64(10), seg.numRows)
|
||||
channel.updateSegmentMemorySize(0, 10)
|
||||
assert.Equal(t, int64(10), seg.memorySize)
|
||||
|
||||
segPos := channel.listNewSegmentsStartPositions()
|
||||
assert.Equal(t, 1, len(segPos))
|
||||
|
@ -179,8 +179,8 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
|
||||
|
||||
ibNode.lastTimestamp = endPositions[0].Timestamp
|
||||
|
||||
// Updating segment statistics in channel
|
||||
seg2Upload, err := ibNode.updateSegmentStates(fgMsg.insertMessages, startPositions[0], endPositions[0])
|
||||
// Add segment in channel if need and updating segment row number
|
||||
seg2Upload, err := ibNode.addSegmentAndUpdateRowNum(fgMsg.insertMessages, startPositions[0], endPositions[0])
|
||||
if err != nil {
|
||||
// Occurs only if the collectionID is mismatch, should not happen
|
||||
err = fmt.Errorf("update segment states in channel meta wrong, err = %s", err)
|
||||
@ -199,6 +199,7 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
|
||||
}
|
||||
}
|
||||
|
||||
ibNode.updateSegmentsMemorySize(seg2Upload)
|
||||
ibNode.DisplayStatistics(seg2Upload)
|
||||
|
||||
segmentsToSync := ibNode.Sync(fgMsg, seg2Upload, endPositions[0])
|
||||
@ -285,7 +286,17 @@ func (ibNode *insertBufferNode) DisplayStatistics(seg2Upload []UniqueID) {
|
||||
zap.Int64("segmentID", segID),
|
||||
zap.String("channel", ibNode.channelName),
|
||||
zap.Int64("size", bd.size),
|
||||
zap.Int64("limit", bd.limit))
|
||||
zap.Int64("limit", bd.limit),
|
||||
zap.Int64("memorySize", bd.memorySize()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// updateSegmentsMemorySize updates segments' memory size in channel meta
|
||||
func (ibNode *insertBufferNode) updateSegmentsMemorySize(seg2Upload []UniqueID) {
|
||||
for _, segID := range seg2Upload {
|
||||
if bd, ok := ibNode.channel.getCurInsertBuffer(segID); ok {
|
||||
ibNode.channel.updateSegmentMemorySize(segID, bd.memorySize())
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -495,11 +506,11 @@ func (ibNode *insertBufferNode) Sync(fgMsg *flowGraphMsg, seg2Upload []UniqueID,
|
||||
return segmentsToSync
|
||||
}
|
||||
|
||||
// updateSegmentStates updates statistics in channel meta for the segments in insertMsgs.
|
||||
// addSegmentAndUpdateRowNum updates row number in channel meta for the segments in insertMsgs.
|
||||
//
|
||||
// If the segment doesn't exist, a new segment will be created.
|
||||
// The segment number of rows will be updated in mem, waiting to be uploaded to DataCoord.
|
||||
func (ibNode *insertBufferNode) updateSegmentStates(insertMsgs []*msgstream.InsertMsg, startPos, endPos *internalpb.MsgPosition) (seg2Upload []UniqueID, err error) {
|
||||
// If the segment doesn't exist, a new segment will be created.
|
||||
// The segment number of rows will be updated in mem, waiting to be uploaded to DataCoord.
|
||||
func (ibNode *insertBufferNode) addSegmentAndUpdateRowNum(insertMsgs []*msgstream.InsertMsg, startPos, endPos *internalpb.MsgPosition) (seg2Upload []UniqueID, err error) {
|
||||
uniqueSeg := make(map[UniqueID]int64)
|
||||
for _, msg := range insertMsgs {
|
||||
|
||||
@ -535,7 +546,7 @@ func (ibNode *insertBufferNode) updateSegmentStates(insertMsgs []*msgstream.Inse
|
||||
seg2Upload = make([]UniqueID, 0, len(uniqueSeg))
|
||||
for id, num := range uniqueSeg {
|
||||
seg2Upload = append(seg2Upload, id)
|
||||
ibNode.channel.updateStatistics(id, num)
|
||||
ibNode.channel.updateSegmentRowNumber(id, num)
|
||||
}
|
||||
|
||||
return
|
||||
|
@ -256,7 +256,7 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) {
|
||||
assert.Panics(t, func() { iBNode.Operate([]flowgraph.Msg{&inMsg}) })
|
||||
iBNode.lastTimestamp = timestampBak
|
||||
|
||||
// test updateSegmentStates failed
|
||||
// test addSegmentAndUpdateRowNum failed
|
||||
inMsg = genFlowGraphInsertMsg(insertChannelName)
|
||||
inMsg.insertMessages[0].CollectionID = UniqueID(-1)
|
||||
inMsg.insertMessages[0].SegmentID = UniqueID(-1)
|
||||
@ -1054,7 +1054,7 @@ func TestInsertBufferNode_updateSegmentStates(te *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
seg, err := ibNode.updateSegmentStates(im, &internalpb.MsgPosition{}, &internalpb.MsgPosition{})
|
||||
seg, err := ibNode.addSegmentAndUpdateRowNum(im, &internalpb.MsgPosition{}, &internalpb.MsgPosition{})
|
||||
|
||||
assert.Error(te, err)
|
||||
assert.Empty(te, seg)
|
||||
|
@ -17,25 +17,60 @@
|
||||
package datanode
|
||||
|
||||
import (
|
||||
"math"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/util/hardware"
|
||||
"github.com/milvus-io/milvus/internal/util/tsoutil"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// segmentSyncPolicy sync policy applies to segment
|
||||
type segmentSyncPolicy func(segment *Segment, ts Timestamp) bool
|
||||
// segmentsSyncPolicy sync policy applies to segments
|
||||
type segmentSyncPolicy func(segments []*Segment, ts Timestamp) []UniqueID
|
||||
|
||||
// syncPeriodically get segmentSyncPolicy with segment sync periodically.
|
||||
// syncPeriodically get segmentSyncPolicy with segments sync periodically.
|
||||
func syncPeriodically() segmentSyncPolicy {
|
||||
return func(segment *Segment, ts Timestamp) bool {
|
||||
endTime := tsoutil.PhysicalTime(ts)
|
||||
lastSyncTime := tsoutil.PhysicalTime(segment.lastSyncTs)
|
||||
shouldSync := endTime.Sub(lastSyncTime) >= Params.DataNodeCfg.SyncPeriod.GetAsDuration(time.Second) && !segment.isBufferEmpty()
|
||||
if shouldSync {
|
||||
log.Info("sync segment periodically ", zap.Time("now", endTime), zap.Time("last sync", lastSyncTime))
|
||||
return func(segments []*Segment, ts Timestamp) []UniqueID {
|
||||
segsToSync := make([]UniqueID, 0)
|
||||
for _, seg := range segments {
|
||||
endTime := tsoutil.PhysicalTime(ts)
|
||||
lastSyncTime := tsoutil.PhysicalTime(seg.lastSyncTs)
|
||||
shouldSync := endTime.Sub(lastSyncTime) >= Params.DataNodeCfg.SyncPeriod.GetAsDuration(time.Second) && !seg.isBufferEmpty()
|
||||
if shouldSync {
|
||||
segsToSync = append(segsToSync, seg.segmentID)
|
||||
}
|
||||
}
|
||||
return shouldSync
|
||||
if len(segsToSync) > 0 {
|
||||
log.Debug("sync segment periodically",
|
||||
zap.Int64s("segmentID", segsToSync))
|
||||
}
|
||||
return segsToSync
|
||||
}
|
||||
}
|
||||
|
||||
// syncMemoryTooHigh force sync the largest segment.
|
||||
func syncMemoryTooHigh() segmentSyncPolicy {
|
||||
return func(segments []*Segment, ts Timestamp) []UniqueID {
|
||||
if Params.DataNodeCfg.MemoryForceSyncEnable.GetAsBool() &&
|
||||
hardware.GetMemoryUseRatio() >= Params.DataNodeCfg.MemoryForceSyncThreshold.GetAsFloat() &&
|
||||
len(segments) >= 1 {
|
||||
toSyncSegmentNum := int(math.Max(float64(len(segments))*Params.DataNodeCfg.MemoryForceSyncSegmentRatio.GetAsFloat(), 1.0))
|
||||
toSyncSegmentIDs := make([]UniqueID, 0)
|
||||
sort.Slice(segments, func(i, j int) bool {
|
||||
return segments[i].memorySize > segments[j].memorySize
|
||||
})
|
||||
for i := 0; i < toSyncSegmentNum; i++ {
|
||||
toSyncSegmentIDs = append(toSyncSegmentIDs, segments[i].segmentID)
|
||||
}
|
||||
log.Debug("sync segment due to memory usage is too high",
|
||||
zap.Int64s("toSyncSegmentIDs", toSyncSegmentIDs),
|
||||
zap.Int("inputSegmentNum", len(segments)),
|
||||
zap.Int("toSyncSegmentNum", len(toSyncSegmentIDs)),
|
||||
zap.Float64("memoryUsageRatio", hardware.GetMemoryUseRatio()))
|
||||
return toSyncSegmentIDs
|
||||
}
|
||||
return []UniqueID{}
|
||||
}
|
||||
}
|
||||
|
@ -33,12 +33,12 @@ func TestSyncPeriodically(t *testing.T) {
|
||||
lastTs time.Time
|
||||
ts time.Time
|
||||
isBufferEmpty bool
|
||||
shouldSync bool
|
||||
shouldSyncNum int
|
||||
}{
|
||||
{"test buffer empty and stale", t0, t0.Add(Params.DataNodeCfg.SyncPeriod.GetAsDuration(time.Second)), true, false},
|
||||
{"test buffer empty and not stale", t0, t0.Add(Params.DataNodeCfg.SyncPeriod.GetAsDuration(time.Second) / 2), true, false},
|
||||
{"test buffer not empty and stale", t0, t0.Add(Params.DataNodeCfg.SyncPeriod.GetAsDuration(time.Second)), false, true},
|
||||
{"test buffer not empty and not stale", t0, t0.Add(Params.DataNodeCfg.SyncPeriod.GetAsDuration(time.Second) / 2), false, false},
|
||||
{"test buffer empty and stale", t0, t0.Add(Params.DataNodeCfg.SyncPeriod.GetAsDuration(time.Second)), true, 0},
|
||||
{"test buffer empty and not stale", t0, t0.Add(Params.DataNodeCfg.SyncPeriod.GetAsDuration(time.Second) / 2), true, 0},
|
||||
{"test buffer not empty and stale", t0, t0.Add(Params.DataNodeCfg.SyncPeriod.GetAsDuration(time.Second)), false, 1},
|
||||
{"test buffer not empty and not stale", t0, t0.Add(Params.DataNodeCfg.SyncPeriod.GetAsDuration(time.Second) / 2), false, 0},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
@ -49,8 +49,27 @@ func TestSyncPeriodically(t *testing.T) {
|
||||
if !test.isBufferEmpty {
|
||||
segment.curInsertBuf = &BufferData{}
|
||||
}
|
||||
res := policy(segment, tsoutil.ComposeTSByTime(test.ts, 0))
|
||||
assert.Equal(t, test.shouldSync, res)
|
||||
res := policy([]*Segment{segment}, tsoutil.ComposeTSByTime(test.ts, 0))
|
||||
assert.Equal(t, test.shouldSyncNum, len(res))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestSyncMemoryTooHigh(t *testing.T) {
|
||||
s1 := &Segment{segmentID: 1, memorySize: 1}
|
||||
s2 := &Segment{segmentID: 2, memorySize: 2}
|
||||
s3 := &Segment{segmentID: 3, memorySize: 3}
|
||||
s4 := &Segment{segmentID: 4, memorySize: 4}
|
||||
s5 := &Segment{segmentID: 5, memorySize: 5}
|
||||
|
||||
var baseParams = Params.BaseTable
|
||||
baseParams.Save(Params.DataNodeCfg.MemoryForceSyncEnable.Key, "true")
|
||||
baseParams.Save(Params.DataNodeCfg.MemoryForceSyncThreshold.Key, "0.0")
|
||||
baseParams.Save(Params.DataNodeCfg.MemoryForceSyncSegmentRatio.Key, "0.6")
|
||||
policy := syncMemoryTooHigh()
|
||||
segs := policy([]*Segment{s3, s4, s2, s1, s5}, 0)
|
||||
assert.Equal(t, 3, len(segs))
|
||||
assert.Equal(t, int64(5), segs[0])
|
||||
assert.Equal(t, int64(4), segs[1])
|
||||
assert.Equal(t, int64(3), segs[2])
|
||||
}
|
||||
|
@ -149,3 +149,12 @@ func GetDiskCount() uint64 {
|
||||
func GetDiskUsage() uint64 {
|
||||
return 2 * 1024 * 1024
|
||||
}
|
||||
|
||||
func GetMemoryUseRatio() float64 {
|
||||
usedMemory := GetUsedMemoryCount()
|
||||
totalMemory := GetMemoryCount()
|
||||
if usedMemory > 0 && totalMemory > 0 {
|
||||
return float64(usedMemory) / float64(totalMemory)
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
@ -51,3 +51,9 @@ func Test_GetDiskUsage(t *testing.T) {
|
||||
log.Info("TestGetDiskUsage",
|
||||
zap.Uint64("DiskUsage", GetDiskUsage()))
|
||||
}
|
||||
|
||||
func Test_GetMemoryUsageRatio(t *testing.T) {
|
||||
log.Info("TestGetMemoryUsageRatio",
|
||||
zap.Float64("Memory usage ratio", GetMemoryUseRatio()))
|
||||
assert.True(t, GetMemoryUseRatio() > 0)
|
||||
}
|
||||
|
@ -1951,6 +1951,11 @@ type dataNodeConfig struct {
|
||||
|
||||
// io concurrency to fetch stats logs
|
||||
IOConcurrency ParamItem `refreshable:"false"`
|
||||
|
||||
// memory management
|
||||
MemoryForceSyncEnable ParamItem `refreshable:"true"`
|
||||
MemoryForceSyncThreshold ParamItem `refreshable:"true"`
|
||||
MemoryForceSyncSegmentRatio ParamItem `refreshable:"true"`
|
||||
}
|
||||
|
||||
func (p *dataNodeConfig) init(base *BaseTable) {
|
||||
@ -1983,6 +1988,27 @@ func (p *dataNodeConfig) init(base *BaseTable) {
|
||||
}
|
||||
p.FlushInsertBufferSize.Init(base.mgr)
|
||||
|
||||
p.MemoryForceSyncEnable = ParamItem{
|
||||
Key: "datanode.memory.forceSyncEnable",
|
||||
Version: "2.2.4",
|
||||
DefaultValue: "true",
|
||||
}
|
||||
p.MemoryForceSyncEnable.Init(base.mgr)
|
||||
|
||||
p.MemoryForceSyncThreshold = ParamItem{
|
||||
Key: "datanode.memory.forceSyncThreshold",
|
||||
Version: "2.2.4",
|
||||
DefaultValue: "0.6",
|
||||
}
|
||||
p.MemoryForceSyncThreshold.Init(base.mgr)
|
||||
|
||||
p.MemoryForceSyncSegmentRatio = ParamItem{
|
||||
Key: "datanode.memory.forceSyncSegmentRatio",
|
||||
Version: "2.2.4",
|
||||
DefaultValue: "0.3",
|
||||
}
|
||||
p.MemoryForceSyncSegmentRatio.Init(base.mgr)
|
||||
|
||||
p.FlushDeleteBufferBytes = ParamItem{
|
||||
Key: "dataNode.segment.deleteBufBytes",
|
||||
Version: "2.0.0",
|
||||
|
Loading…
Reference in New Issue
Block a user