enable delete-buffer auto-flush to avoid OOM when delete messages accumulate (#20213)

issue: #19713

Signed-off-by: MrPresent-Han <jamesharden11122@gmail.com>

Co-authored-by: MrPresent-Han <zilliz@zillizdeMacBook-Pro-2.local>
MrPresent-Han 2022-11-07 18:49:02 +08:00 committed by GitHub
parent b847c425e1
commit 21b54709a2
7 changed files with 403 additions and 99 deletions


@ -283,6 +283,9 @@ dataNode:
flush:
# Max buffer size to flush for a single segment.
insertBufSize: 16777216 # Bytes, 16 MB
# Max delete buffer size to flush for a single channel
deleteBufBytes: 67108864 # Bytes, 64 MB
# Configures the system log output.
log:
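For a rough sense of scale, a back-of-the-envelope sketch (illustrative only; the 16-byte figure comes from the delete-buffer accounting introduced below, which charges 8 bytes for an int64 primary key plus 8 bytes for its timestamp):

package main

import "fmt"

func main() {
	const deleteBufBytes = 64 * 1024 * 1024 // default deleteBufBytes (67108864)
	const bytesPerInt64Delete = 8 + 8       // int64 primary key + timestamp
	// roughly how many buffered int64 deletes a channel holds before auto flush kicks in
	fmt.Println(deleteBufBytes / bytesPerInt64Delete) // 4194304
}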


@ -17,13 +17,224 @@
package datanode
import (
"container/heap"
"errors"
"fmt"
"math"
"sync"
"github.com/milvus-io/milvus-proto/go-api/schemapb"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/paramtable"
"go.uber.org/zap"
)
// DelBufferManager is in charge of managing insertBuf and delBuf from an overall perspective:
// it not only controls the buffered data size of every segment, but also triggers
// insert/delete flushes when the memory usage of the whole manager reaches a certain level.
// At this first stage, the struct is only used for the delete buffer.
type DelBufferManager struct {
delBufMap sync.Map // map[segmentID]*DelDataBuf
delMemorySize int64
delBufHeap *PriorityQueue
}
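// Illustrative usage sketch (not part of this commit): the intended flow is to
// buffer incoming deletes per segment and, once the channel-level budget is
// exceeded, flush the largest buffers first and release their memory.
//
//	bm.StoreNewDeletes(segID, pks, tss, tr)       // buffer new delete rows
//	for _, id := range bm.ShouldFlushSegments() { // largest buffers picked first
//	    if buf, ok := bm.Load(id); ok {
//	        // ... hand buf to the flush manager ...
//	        _ = buf
//	        bm.Delete(id) // drop the buffered memory
//	    }
//	}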
func (bm *DelBufferManager) GetSegDelBufMemSize(segID UniqueID) int64 {
if delDataBuf, ok := bm.delBufMap.Load(segID); ok {
return delDataBuf.(*DelDataBuf).item.memorySize
}
return 0
}
func (bm *DelBufferManager) GetEntriesNum(segID UniqueID) int64 {
if delDataBuf, ok := bm.delBufMap.Load(segID); ok {
return delDataBuf.(*DelDataBuf).GetEntriesNum()
}
return 0
}
// Store is only used by unit tests
func (bm *DelBufferManager) Store(segID UniqueID, delDataBuf *DelDataBuf) {
bm.delBufMap.Store(segID, delDataBuf)
}
func (bm *DelBufferManager) StoreNewDeletes(segID UniqueID, pks []primaryKey,
tss []Timestamp, tr TimeRange) {
//1. load or create delDataBuf
var delDataBuf *DelDataBuf
value, loaded := bm.delBufMap.Load(segID)
if loaded {
delDataBuf = value.(*DelDataBuf)
} else {
delDataBuf = newDelDataBuf()
}
//2. fill in new delta
delData := delDataBuf.delData
rowCount := len(pks)
var bufSize int64
for i := 0; i < rowCount; i++ {
delData.Pks = append(delData.Pks, pks[i])
delData.Tss = append(delData.Tss, tss[i])
switch pks[i].Type() {
case schemapb.DataType_Int64:
bufSize += 8
case schemapb.DataType_VarChar:
varCharPk := pks[i].(*varCharPrimaryKey)
bufSize += int64(len(varCharPk.Value))
}
//accumulate buf size for timestamp, which is 8 bytes
bufSize += 8
}
//3. update statistics of del data
delDataBuf.accumulateEntriesNum(int64(rowCount))
delDataBuf.updateTimeRange(tr)
//4. update and sync memory size with priority queue
if !loaded {
delDataBuf.item.segmentID = segID
delDataBuf.item.memorySize = bufSize
heap.Push(bm.delBufHeap, delDataBuf.item)
} else {
bm.delBufHeap.update(delDataBuf.item, delDataBuf.item.memorySize+bufSize)
}
bm.delBufMap.Store(segID, delDataBuf)
bm.delMemorySize += bufSize
//5. update metrics
metrics.DataNodeConsumeMsgRowsCount.WithLabelValues(
fmt.Sprint(paramtable.GetNodeID()), metrics.DeleteLabel).Add(float64(rowCount))
}
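// Size accounting example (illustrative): buffering three deletes with int64
// primary keys adds 3*(8+8) = 48 bytes to the segment's buffer and to
// delMemorySize; a varchar primary key of length 10 would instead add 10+8 = 18
// bytes for its row.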
func (bm *DelBufferManager) Load(segID UniqueID) (delDataBuf *DelDataBuf, ok bool) {
value, ok := bm.delBufMap.Load(segID)
if ok {
return value.(*DelDataBuf), ok
}
return nil, ok
}
func (bm *DelBufferManager) Delete(segID UniqueID) {
if buf, ok := bm.delBufMap.Load(segID); ok {
item := buf.(*DelDataBuf).item
bm.delMemorySize -= item.memorySize
heap.Remove(bm.delBufHeap, item.index)
bm.delBufMap.Delete(segID)
}
}
func (bm *DelBufferManager) LoadAndDelete(segID UniqueID) (delDataBuf *DelDataBuf, ok bool) {
if buf, ok := bm.Load(segID); ok {
bm.Delete(segID)
return buf, ok
}
return nil, ok
}
func (bm *DelBufferManager) CompactSegBuf(compactedToSegID UniqueID, compactedFromSegIDs []UniqueID) {
var compactToDelBuff *DelDataBuf
compactToDelBuff, loaded := bm.Load(compactedToSegID)
if !loaded {
compactToDelBuff = newDelDataBuf()
}
for _, segID := range compactedFromSegIDs {
if delDataBuf, loaded := bm.LoadAndDelete(segID); loaded {
compactToDelBuff.mergeDelDataBuf(delDataBuf)
}
}
// only store delBuf if EntriesNum > 0
if compactToDelBuff.EntriesNum > 0 {
if loaded {
bm.delBufHeap.update(compactToDelBuff.item, compactToDelBuff.item.memorySize)
} else {
heap.Push(bm.delBufHeap, compactToDelBuff.item)
}
//note that when compacting segments in the delete buffer manager
//there is no need to modify the overall memory size, as no new
//deletes are added into memory
bm.delBufMap.Store(compactedToSegID, compactToDelBuff)
}
}
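// Compaction example (illustrative): if segment 1 holds a 32-byte delete buffer
// and is compacted into segment 2, those 32 bytes simply move onto segment 2's
// heap item via mergeDelDataBuf; delMemorySize is untouched because no new
// deletes entered memory.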
func (bm *DelBufferManager) ShouldFlushSegments() []UniqueID {
var shouldFlushSegments []UniqueID
if bm.delMemorySize < Params.DataNodeCfg.FlushDeleteBufferBytes {
return shouldFlushSegments
}
mmUsage := bm.delMemorySize
var poppedSegMem []*Item
for {
segMem := heap.Pop(bm.delBufHeap).(*Item)
poppedSegMem = append(poppedSegMem, segMem)
shouldFlushSegments = append(shouldFlushSegments, segMem.segmentID)
log.Debug("add segment for delete buf flush", zap.Int64("segmentID", segMem.segmentID))
mmUsage -= segMem.memorySize
if mmUsage < Params.DataNodeCfg.FlushDeleteBufferBytes {
break
}
}
//here we push all selected segments back into the heap
//to keep the heap semantically correct
for _, segMem := range poppedSegMem {
heap.Push(bm.delBufHeap, segMem)
}
return shouldFlushSegments
}
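// Worked example (consistent with the unit test added below): with five buffers
// of, say, 48, 48, 48, 32 and 32 bytes (208 bytes in total) and
// FlushDeleteBufferBytes = 200, the largest buffer (48 bytes) is popped, usage
// drops to 160 (< 200) and the loop stops, so exactly one segment is returned
// for flushing.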
// An Item is something we manage in a memorySize priority queue.
type Item struct {
segmentID UniqueID // The segmentID
memorySize int64 // The size of memory consumed by del buf
index int // The index of the item in the heap.
// The index is needed by update and is maintained by the heap.Interface methods.
}
// A PriorityQueue implements heap.Interface and holds Items.
// We use PriorityQueue to manage memory consumed by del buf
type PriorityQueue struct {
items []*Item
}
func (pq *PriorityQueue) Len() int { return len(pq.items) }
func (pq *PriorityQueue) Less(i, j int) bool {
// We want Pop to give us the highest, not lowest, memorySize so we use greater than here.
return pq.items[i].memorySize > pq.items[j].memorySize
}
func (pq *PriorityQueue) Swap(i, j int) {
pq.items[i], pq.items[j] = pq.items[j], pq.items[i]
pq.items[i].index = i
pq.items[j].index = j
}
func (pq *PriorityQueue) Push(x any) {
n := len(pq.items)
item := x.(*Item)
item.index = n
pq.items = append(pq.items, item)
}
func (pq *PriorityQueue) Pop() any {
old := pq.items
n := len(old)
item := old[n-1]
old[n-1] = nil // avoid memory leak
item.index = -1 // for safety
pq.items = old[0 : n-1]
return item
}
// update modifies the priority and value of an Item in the queue.
func (pq *PriorityQueue) update(item *Item, memorySize int64) {
item.memorySize = memorySize
heap.Fix(pq, item.index)
}
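// Minimal usage sketch of the queue (illustrative only):
//
//	pq := &PriorityQueue{}
//	heap.Init(pq)
//	item := &Item{segmentID: 1, memorySize: 32}
//	heap.Push(pq, item)
//	pq.update(item, 64)         // buffered size grew; heap.Fix restores order
//	top := heap.Pop(pq).(*Item) // pops the item with the largest memorySize
//	_ = top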
// BufferData buffers insert data, monitoring buffer size and limit
// size and limit both indicate numOfRows
type BufferData struct {
@ -57,10 +268,11 @@ func (bd *BufferData) updateTimeRange(tr TimeRange) {
type DelDataBuf struct {
datapb.Binlog
delData *DeleteData
item *Item
}
func (ddb *DelDataBuf) updateSize(size int64) {
ddb.EntriesNum += size
func (ddb *DelDataBuf) accumulateEntriesNum(entryNum int64) {
ddb.EntriesNum += entryNum
}
func (ddb *DelDataBuf) updateTimeRange(tr TimeRange) {
@ -72,14 +284,15 @@ func (ddb *DelDataBuf) updateTimeRange(tr TimeRange) {
}
}
func (ddb *DelDataBuf) updateFromBuf(buf *DelDataBuf) {
ddb.updateSize(buf.EntriesNum)
func (ddb *DelDataBuf) mergeDelDataBuf(buf *DelDataBuf) {
ddb.accumulateEntriesNum(buf.EntriesNum)
tr := TimeRange{timestampMax: buf.TimestampTo, timestampMin: buf.TimestampFrom}
ddb.updateTimeRange(tr)
ddb.delData.Pks = append(ddb.delData.Pks, buf.delData.Pks...)
ddb.delData.Tss = append(ddb.delData.Tss, buf.delData.Tss...)
ddb.item.memorySize += buf.item.memorySize
}
// newBufferData needs an input dimension to calculate the limit of this buffer
@ -87,14 +300,17 @@ func (ddb *DelDataBuf) updateFromBuf(buf *DelDataBuf) {
// `limit` is the segment numOfRows a buffer can buffer at most.
//
// For a float32 vector field:
// limit = 16 * 2^20 Byte [by default] / (dimension * 4 Byte)
//
// For a binary vector field:
// limit = 16 * 2^20 Byte [by default] / (dimension / 8 Byte)
//
// But since the buffer of binary vector fields is larger than the float32 one
// with the same dimension, newBufferData takes the smaller buffer limit
// to fit in both types of vector fields.
//
// * This needs to change for string field and multi-vector field support.
func newBufferData(dimension int64) (*BufferData, error) {
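// Worked example (illustrative): for a 128-dimensional float32 vector field,
// limit = 16*2^20 / (128*4) = 32768 rows; for a 128-dimensional binary vector
// field, limit = 16*2^20 / (128/8) = 1048576 rows. The smaller of the two,
// 32768, is the one newBufferData uses.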
@ -121,5 +337,8 @@ func newDelDataBuf() *DelDataBuf {
TimestampFrom: math.MaxUint64,
TimestampTo: 0,
},
item: &Item{
memorySize: 0,
},
}
}


@ -129,7 +129,8 @@ func (t *compactionTask) getChannelName() string {
return t.plan.GetChannel()
}
func (t *compactionTask) mergeDeltalogs(dBlobs map[UniqueID][]*Blob, timetravelTs Timestamp) (map[interface{}]Timestamp, *DelDataBuf, error) {
func (t *compactionTask) mergeDeltalogs(dBlobs map[UniqueID][]*Blob, timetravelTs Timestamp) (
map[interface{}]Timestamp, *DelDataBuf, error) {
log := log.With(zap.Int64("planID", t.getPlanID()))
mergeStart := time.Now()
dCodec := storage.NewDeleteCodec()
@ -175,7 +176,7 @@ func (t *compactionTask) mergeDeltalogs(dBlobs map[UniqueID][]*Blob, timetravelT
}
}
dbuff.updateSize(dbuff.delData.RowCount)
dbuff.accumulateEntriesNum(dbuff.delData.RowCount)
log.Info("mergeDeltalogs end",
zap.Int("number of pks to compact in insert logs", len(pk2ts)),
zap.Float64("elapse in ms", nano2Milli(time.Since(mergeStart))))


@ -836,8 +836,10 @@ func TestCompactorInterfaceMethods(t *testing.T) {
}
type mockFlushManager struct {
sleepSeconds int32
returnError bool
sleepSeconds int32
returnError bool
recordFlushedSeg bool
flushedSegIDs []UniqueID
}
var _ flushManager = (*mockFlushManager)(nil)
@ -853,6 +855,9 @@ func (mfm *mockFlushManager) flushDelData(data *DelDataBuf, segmentID UniqueID,
if mfm.returnError {
return fmt.Errorf("mock error")
}
if mfm.recordFlushedSeg {
mfm.flushedSegIDs = append(mfm.flushedSegIDs, segmentID)
}
return nil
}


@ -22,28 +22,25 @@ import (
"reflect"
"sync"
"github.com/opentracing/opentracing-go"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/mq/msgstream"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/milvus-io/milvus/internal/util/trace"
"github.com/opentracing/opentracing-go"
"go.uber.org/zap"
)
// deleteNode processes delete messages and flushes delete data into storage.
type deleteNode struct {
BaseNode
ctx context.Context
channelName string
delBuf sync.Map // map[segmentID]*DelDataBuf
channel Channel
idAllocator allocatorInterface
flushManager flushManager
ctx context.Context
channelName string
delBufferManager *DelBufferManager // manager of delete msg
channel Channel
idAllocator allocatorInterface
flushManager flushManager
clearSignal chan<- string
}
@ -58,12 +55,12 @@ func (dn *deleteNode) Close() {
func (dn *deleteNode) showDelBuf(segIDs []UniqueID, ts Timestamp) {
for _, segID := range segIDs {
if v, ok := dn.delBuf.Load(segID); ok {
delDataBuf, _ := v.(*DelDataBuf)
if _, ok := dn.delBufferManager.Load(segID); ok {
log.Debug("delta buffer status",
zap.Uint64("timestamp", ts),
zap.Int64("segment ID", segID),
zap.Int64("entries", delDataBuf.GetEntriesNum()),
zap.Int64("entries", dn.delBufferManager.GetEntriesNum(segID)),
zap.Int64("memory size", dn.delBufferManager.GetSegDelBufMemSize(segID)),
zap.String("vChannel", dn.channelName))
}
}
@ -103,8 +100,7 @@ func (dn *deleteNode) Operate(in []Msg) []Msg {
var segIDs []UniqueID
for i, msg := range fgMsg.deleteMessages {
traceID, _, _ := trace.InfoFromSpan(spans[i])
log.Info("Buffer delete request in DataNode", zap.String("traceID", traceID))
log.Debug("Buffer delete request in DataNode", zap.String("traceID", traceID))
tmpSegIDs, err := dn.bufferDeleteMsg(msg, fgMsg.timeRange)
if err != nil {
// error occurs only when deleteMsg is misaligned, should not happen
@ -120,19 +116,39 @@ func (dn *deleteNode) Operate(in []Msg) []Msg {
dn.showDelBuf(segIDs, fgMsg.timeRange.timestampMax)
}
//here we adopt a fairly aggressive strategy:
//whenever memory usage reaches a certain level, we make sure the N biggest delDataBufs get flushed,
//then we merge all segments in fgMsg.segmentsToSync into the flush list, removing duplicates.
//the aim of all these actions is to guarantee that the memory consumed by the delete buffer will not exceed its limit
segmentsToFlush := dn.delBufferManager.ShouldFlushSegments()
log.Info("should flush segments, ", zap.Int("seg_count", len(segmentsToFlush)))
for _, msgSegmentID := range fgMsg.segmentsToSync {
existed := false
for _, autoFlushSegment := range segmentsToFlush {
if msgSegmentID == autoFlushSegment {
existed = true
break
}
}
if !existed {
segmentsToFlush = append(segmentsToFlush, msgSegmentID)
}
}
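// An equivalent de-duplication using a set (illustrative sketch, not part of
// this commit):
//
//	seen := make(map[UniqueID]struct{}, len(segmentsToFlush))
//	for _, id := range segmentsToFlush {
//	    seen[id] = struct{}{}
//	}
//	for _, id := range fgMsg.segmentsToSync {
//	    if _, ok := seen[id]; !ok {
//	        segmentsToFlush = append(segmentsToFlush, id)
//	    }
//	}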
// process flush messages
if len(fgMsg.segmentsToSync) > 0 {
log.Info("DeleteNode receives flush message",
zap.Int64s("segIDs", fgMsg.segmentsToSync),
if len(segmentsToFlush) > 0 {
log.Debug("DeleteNode receives flush message",
zap.Int64s("segIDs", segmentsToFlush),
zap.String("vChannelName", dn.channelName))
for _, segmentToFlush := range fgMsg.segmentsToSync {
buf, ok := dn.delBuf.Load(segmentToFlush)
for _, segmentToFlush := range segmentsToFlush {
buf, ok := dn.delBufferManager.Load(segmentToFlush)
if !ok {
// no related delta data to flush, send empty buf to complete flush life-cycle
dn.flushManager.flushDelData(nil, segmentToFlush, fgMsg.endPositions[0])
} else {
err := retry.Do(dn.ctx, func() error {
return dn.flushManager.flushDelData(buf.(*DelDataBuf), segmentToFlush, fgMsg.endPositions[0])
return dn.flushManager.flushDelData(buf, segmentToFlush, fgMsg.endPositions[0])
}, getFlowGraphRetryOpt())
if err != nil {
err = fmt.Errorf("failed to flush delete data, err = %s", err)
@ -140,7 +156,7 @@ func (dn *deleteNode) Operate(in []Msg) []Msg {
panic(err)
}
// remove delete buf
dn.delBuf.Delete(segmentToFlush)
dn.delBufferManager.Delete(segmentToFlush)
}
}
}
@ -166,30 +182,13 @@ func (dn *deleteNode) updateCompactedSegments() {
// if the compactedTo segment has 0 numRows, remove all segments related
if !dn.channel.hasSegment(compactedTo, true) {
for _, segID := range compactedFrom {
dn.delBuf.Delete(segID)
dn.delBufferManager.Delete(segID)
}
dn.channel.removeSegments(compactedFrom...)
continue
}
var compactToDelBuff *DelDataBuf
delBuf, loaded := dn.delBuf.Load(compactedTo)
if !loaded {
compactToDelBuff = newDelDataBuf()
} else {
compactToDelBuff = delBuf.(*DelDataBuf)
}
for _, segID := range compactedFrom {
if value, loaded := dn.delBuf.LoadAndDelete(segID); loaded {
compactToDelBuff.updateFromBuf(value.(*DelDataBuf))
// only store delBuf if EntriesNum > 0
if compactToDelBuff.EntriesNum > 0 {
dn.delBuf.Store(compactedTo, compactToDelBuff)
}
}
}
dn.delBufferManager.CompactSegBuf(compactedTo, compactedFrom)
log.Info("update delBuf for compacted segments",
zap.Int64("compactedTo segmentID", compactedTo),
zap.Int64s("compactedFrom segmentIDs", compactedFrom),
@ -208,47 +207,21 @@ func (dn *deleteNode) bufferDeleteMsg(msg *msgstream.DeleteMsg, tr TimeRange) ([
for segID, pks := range segIDToPks {
segIDs = append(segIDs, segID)
rows := len(pks)
tss, ok := segIDToTss[segID]
if !ok || rows != len(tss) {
if !ok || len(pks) != len(tss) {
return nil, fmt.Errorf("primary keys and timestamp's element num mis-match, segmentID = %d", segID)
}
var delDataBuf *DelDataBuf
value, ok := dn.delBuf.Load(segID)
if ok {
delDataBuf = value.(*DelDataBuf)
} else {
delDataBuf = newDelDataBuf()
}
delData := delDataBuf.delData
for i := 0; i < rows; i++ {
delData.Pks = append(delData.Pks, pks[i])
delData.Tss = append(delData.Tss, tss[i])
// this log will influence the data node performance as it may be too many,
// only use it when we focus on delete operations
//log.Debug("delete",
// zap.Any("primary key", pks[i]),
// zap.Uint64("ts", tss[i]),
// zap.Int64("segmentID", segID),
// zap.String("vChannelName", dn.channelName))
}
// store
delDataBuf.updateSize(int64(rows))
metrics.DataNodeConsumeMsgRowsCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.DeleteLabel).Add(float64(rows))
delDataBuf.updateTimeRange(tr)
dn.delBuf.Store(segID, delDataBuf)
dn.delBufferManager.StoreNewDeletes(segID, pks, tss, tr)
}
return segIDs, nil
}
// filterSegmentByPK returns the bloom filter check result.
// If the key may exists in the segment, returns it in map.
// If the key not exists in the segment, the segment is filter out.
func (dn *deleteNode) filterSegmentByPK(partID UniqueID, pks []primaryKey, tss []Timestamp) (map[UniqueID][]primaryKey, map[UniqueID][]uint64) {
// If the key may exist in the segment, it is returned in the map.
// If the key does not exist in the segment, the segment is filtered out.
func (dn *deleteNode) filterSegmentByPK(partID UniqueID, pks []primaryKey, tss []Timestamp) (
map[UniqueID][]primaryKey, map[UniqueID][]uint64) {
segID2Pks := make(map[UniqueID][]primaryKey)
segID2Tss := make(map[UniqueID][]uint64)
segments := dn.channel.filterSegments(partID)
@ -273,8 +246,11 @@ func newDeleteNode(ctx context.Context, fm flushManager, sig chan<- string, conf
return &deleteNode{
ctx: ctx,
BaseNode: baseNode,
delBuf: sync.Map{},
delBufferManager: &DelBufferManager{
delBufMap: sync.Map{},
delMemorySize: 0,
delBufHeap: &PriorityQueue{},
},
channel: config.channel,
idAllocator: config.allocator,
channelName: config.vChannelName,


@ -17,6 +17,7 @@
package datanode
import (
"container/heap"
"context"
"fmt"
"testing"
@ -344,7 +345,9 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
msg := genFlowGraphDeleteMsg(int64Pks, chanName)
msg.segmentsToSync = []UniqueID{-1}
delNode.delBuf.Store(UniqueID(-1), &DelDataBuf{})
delDataBuf := newDelDataBuf()
delNode.delBufferManager.Store(UniqueID(-1), delDataBuf)
heap.Push(delNode.delBufferManager.delBufHeap, delDataBuf.item)
delNode.flushManager = &mockFlushManager{
returnError: true,
}
@ -390,8 +393,13 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
msg.deleteMessages = []*msgstream.DeleteMsg{}
msg.segmentsToSync = []UniqueID{compactedSegment}
delNode.delBuf.Store(compactedSegment, &DelDataBuf{delData: &DeleteData{}})
delNode.flushManager = NewRendezvousFlushManager(&allocator{}, cm, channel, func(*segmentFlushPack) {}, emptyFlushAndDropFunc)
bufItem := &Item{memorySize: 0}
delNode.delBufferManager.Store(compactedSegment,
&DelDataBuf{delData: &DeleteData{}, item: bufItem})
heap.Push(delNode.delBufferManager.delBufHeap, bufItem)
delNode.flushManager = NewRendezvousFlushManager(&allocator{}, cm, channel,
func(*segmentFlushPack) {}, emptyFlushAndDropFunc)
var fgMsg flowgraph.Msg = &msg
setFlowGraphRetryOpt(retry.Attempts(1))
@ -399,11 +407,93 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
delNode.Operate([]flowgraph.Msg{fgMsg})
})
_, ok := delNode.delBuf.Load(100)
_, ok := delNode.delBufferManager.Load(100)
assert.False(t, ok)
_, ok = delNode.delBuf.Load(compactedSegment)
_, ok = delNode.delBufferManager.Load(compactedSegment)
assert.False(t, ok)
})
t.Run("Test deleteNode auto flush function", func(t *testing.T) {
//regression test for issue #19713
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
chanName := "datanode-test-FlowGraphDeletenode-autoflush"
testPath := "/test/datanode/root/meta"
assert.NoError(t, clearEtcd(testPath))
Params.EtcdCfg.MetaRootPath = testPath
c := &nodeConfig{
channel: channel,
allocator: NewAllocatorFactory(),
vChannelName: chanName,
}
mockFlushManager := &mockFlushManager{
recordFlushedSeg: true,
}
delNode, err := newDeleteNode(ctx, mockFlushManager, make(chan string, 1), c)
assert.Nil(t, err)
//1. here we set the flushing segments inside fgMsg to empty
//in order to verify the validity of the auto flush function
msg := genFlowGraphDeleteMsg(int64Pks, chanName)
msg.segmentsToSync = []UniqueID{}
var fgMsg flowgraph.Msg = &msg
//2. here we set the delete buffer bytes to a relatively high level;
//the total memory consumption in this case is 208 bytes,
//so no segments will be flushed
Params.DataNodeCfg.FlushDeleteBufferBytes = 300
delNode.Operate([]flowgraph.Msg{fgMsg})
assert.Equal(t, 0, len(mockFlushManager.flushedSegIDs))
assert.Equal(t, int64(208), delNode.delBufferManager.delMemorySize)
assert.Equal(t, 5, delNode.delBufferManager.delBufHeap.Len())
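// illustrative arithmetic: each int64 delete costs 8 bytes for the primary key
// plus 8 bytes for the timestamp, so the 208 bytes above correspond to 13
// buffered rows spread over the 5 segments produced by genFlowGraphDeleteMsg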
//3. note that the whole memory size used by the 5 segments is 208,
//so when the delete buffer size is set to 200
//only one segment will be flushed, and the
//memory consumption will be reduced to 160 (under 200)
msg.deleteMessages = []*msgstream.DeleteMsg{}
msg.segmentsToSync = []UniqueID{}
Params.DataNodeCfg.FlushDeleteBufferBytes = 200
delNode.Operate([]flowgraph.Msg{fgMsg})
assert.Equal(t, 1, len(mockFlushManager.flushedSegIDs))
assert.Equal(t, int64(160), delNode.delBufferManager.delMemorySize)
assert.Equal(t, 4, delNode.delBufferManager.delBufHeap.Len())
//4. there is no new delete msg and the delete buffer size is still 200,
//so we expect no additional auto flush
delNode.Operate([]flowgraph.Msg{fgMsg})
assert.Equal(t, 1, len(mockFlushManager.flushedSegIDs))
assert.Equal(t, int64(160), delNode.delBufferManager.delMemorySize)
assert.Equal(t, 4, delNode.delBufferManager.delBufHeap.Len())
//5. we reset the buffer bytes to 150, so we expect one more
//segment (48 bytes in size) to be flushed, leaving the remaining
//delete memory size at 112
Params.DataNodeCfg.FlushDeleteBufferBytes = 150
delNode.Operate([]flowgraph.Msg{fgMsg})
assert.Equal(t, 2, len(mockFlushManager.flushedSegIDs))
assert.Equal(t, int64(112), delNode.delBufferManager.delMemorySize)
assert.Equal(t, 3, delNode.delBufferManager.delBufHeap.Len())
//6. we reset the buffer bytes to 60, so most of the segments will be flushed,
//except for the smallest entry, whose size is 32
Params.DataNodeCfg.FlushDeleteBufferBytes = 60
delNode.Operate([]flowgraph.Msg{fgMsg})
assert.Equal(t, 4, len(mockFlushManager.flushedSegIDs))
assert.Equal(t, int64(32), delNode.delBufferManager.delMemorySize)
assert.Equal(t, 1, delNode.delBufferManager.delBufHeap.Len())
//7. we reset the buffer bytes to 20; since every segment's memory consumption
//exceeds 20, all five segments will have been flushed and the remaining
//delete memory drops to zero
Params.DataNodeCfg.FlushDeleteBufferBytes = 20
delNode.Operate([]flowgraph.Msg{fgMsg})
assert.Equal(t, 5, len(mockFlushManager.flushedSegIDs))
assert.Equal(t, int64(0), delNode.delBufferManager.delMemorySize)
assert.Equal(t, 0, delNode.delBufferManager.delBufHeap.Len())
})
}
func TestFlowGraphDeleteNode_showDelBuf(t *testing.T) {
@ -439,8 +529,9 @@ func TestFlowGraphDeleteNode_showDelBuf(t *testing.T) {
for _, test := range tests {
delBuf := newDelDataBuf()
delBuf.updateSize(test.numRows)
delNode.delBuf.Store(test.seg, delBuf)
delBuf.accumulateEntriesNum(test.numRows)
delNode.delBufferManager.Store(test.seg, delBuf)
heap.Push(delNode.delBufferManager.delBufHeap, delBuf.item)
}
delNode.showDelBuf([]UniqueID{111, 112, 113}, 100)
@ -507,8 +598,9 @@ func TestFlowGraphDeleteNode_updateCompactedSegments(t *testing.T) {
t.Run(test.description, func(t *testing.T) {
for _, seg := range test.segIDsInBuffer {
delBuf := newDelDataBuf()
delBuf.updateSize(100)
delNode.delBuf.Store(seg, delBuf)
delBuf.accumulateEntriesNum(100)
heap.Push(delNode.delBufferManager.delBufHeap, delBuf.item)
delNode.delBufferManager.Store(seg, delBuf)
}
if test.compactToExist {
@ -536,7 +628,7 @@ func TestFlowGraphDeleteNode_updateCompactedSegments(t *testing.T) {
delNode.updateCompactedSegments()
for _, remain := range test.expectedSegsRemain {
_, ok := delNode.delBuf.Load(remain)
_, ok := delNode.delBufferManager.Load(remain)
assert.True(t, ok)
}
})


@ -1280,6 +1280,7 @@ type dataNodeConfig struct {
FlowGraphMaxQueueLength int32
FlowGraphMaxParallelism int32
FlushInsertBufferSize int64
FlushDeleteBufferBytes int64
Alias string // Different datanode in one machine
@ -1298,6 +1299,7 @@ func (p *dataNodeConfig) init(base *BaseTable) {
p.initFlowGraphMaxQueueLength()
p.initFlowGraphMaxParallelism()
p.initFlushInsertBufferSize()
p.initFlushDeleteBufferSize()
p.initIOConcurrency()
p.initChannelWatchPath()
@ -1325,6 +1327,12 @@ func (p *dataNodeConfig) initFlushInsertBufferSize() {
p.FlushInsertBufferSize = bs
}
func (p *dataNodeConfig) initFlushDeleteBufferSize() {
deleteBufBytes := p.Base.ParseInt64WithDefault("datanode.flush.deleteBufBytes",
64*1024*1024)
p.FlushDeleteBufferBytes = deleteBufBytes
}
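// Note: the 64*1024*1024 default equals 67108864 bytes, matching the
// deleteBufBytes entry added to the dataNode.flush section of milvus.yaml above.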
func (p *dataNodeConfig) initChannelWatchPath() {
p.ChannelWatchSubPath = "channelwatch"
}