// Licensed to the LF AI & Data foundation under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package datanode import ( "container/heap" "errors" "fmt" "math" "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/schemapb" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/metrics" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/util/paramtable" "github.com/milvus-io/milvus/internal/util/typeutil" ) // DelBufferManager is in charge of managing insertBuf and delBuf from an overall prospect // not only controlling buffered data size based on every segment size, but also triggering // insert/delete flush when the memory usage of the whole manager reach a certain level. // but at the first stage, this struct is only used for delete buff type DelBufferManager struct { channel Channel delMemorySize int64 delBufHeap *PriorityQueue } func (bm *DelBufferManager) GetSegDelBufMemSize(segID UniqueID) int64 { if delDataBuf, ok := bm.channel.getCurDeleteBuffer(segID); ok { return delDataBuf.item.memorySize } return 0 } func (bm *DelBufferManager) GetEntriesNum(segID UniqueID) int64 { if delDataBuf, ok := bm.channel.getCurDeleteBuffer(segID); ok { return delDataBuf.GetEntriesNum() } return 0 } // Store :the method only for unit test func (bm *DelBufferManager) Store(segID UniqueID, delDataBuf *DelDataBuf) { bm.channel.setCurDeleteBuffer(segID, delDataBuf) } func (bm *DelBufferManager) StoreNewDeletes(segID UniqueID, pks []primaryKey, tss []Timestamp, tr TimeRange, startPos, endPos *internalpb.MsgPosition) { //1. load or create delDataBuf var delDataBuf *DelDataBuf buffer, loaded := bm.channel.getCurDeleteBuffer(segID) if loaded { delDataBuf = buffer } else { delDataBuf = newDelDataBuf() } //2. fill in new delta delData := delDataBuf.delData rowCount := len(pks) var bufSize int64 for i := 0; i < rowCount; i++ { delData.Pks = append(delData.Pks, pks[i]) delData.Tss = append(delData.Tss, tss[i]) switch pks[i].Type() { case schemapb.DataType_Int64: bufSize += 8 case schemapb.DataType_VarChar: varCharPk := pks[i].(*varCharPrimaryKey) bufSize += int64(len(varCharPk.Value)) } //accumulate buf size for timestamp, which is 8 bytes bufSize += 8 } //3. update statistics of del data delDataBuf.accumulateEntriesNum(int64(rowCount)) delDataBuf.updateTimeRange(tr) delDataBuf.updateStartAndEndPosition(startPos, endPos) //4. update and sync memory size with priority queue if !loaded { delDataBuf.item.segmentID = segID delDataBuf.item.memorySize = bufSize heap.Push(bm.delBufHeap, delDataBuf.item) } else { bm.delBufHeap.update(delDataBuf.item, delDataBuf.item.memorySize+bufSize) } bm.channel.setCurDeleteBuffer(segID, delDataBuf) bm.delMemorySize += bufSize //4. sync metrics metrics.DataNodeConsumeMsgRowsCount.WithLabelValues( fmt.Sprint(paramtable.GetNodeID()), metrics.DeleteLabel).Add(float64(rowCount)) } func (bm *DelBufferManager) Load(segID UniqueID) (delDataBuf *DelDataBuf, ok bool) { buffer, ok := bm.channel.getCurDeleteBuffer(segID) if ok { return buffer, ok } return nil, false } func (bm *DelBufferManager) Delete(segID UniqueID) { if buf, ok := bm.channel.getCurDeleteBuffer(segID); ok { item := buf.item bm.delMemorySize -= item.memorySize heap.Remove(bm.delBufHeap, item.index) bm.channel.rollDeleteBuffer(segID) } } func (bm *DelBufferManager) CompactSegBuf(compactedToSegID UniqueID, compactedFromSegIDs []UniqueID) { var compactToDelBuff *DelDataBuf compactToDelBuff, loaded := bm.Load(compactedToSegID) if !loaded { compactToDelBuff = newDelDataBuf() } for _, segID := range compactedFromSegIDs { if delDataBuf, loaded := bm.Load(segID); loaded { compactToDelBuff.mergeDelDataBuf(delDataBuf) bm.Delete(segID) } } // only store delBuf if EntriesNum > 0 if compactToDelBuff.EntriesNum > 0 { if loaded { bm.delBufHeap.update(compactToDelBuff.item, compactToDelBuff.item.memorySize) } else { heap.Push(bm.delBufHeap, compactToDelBuff.item) } //note that when compacting segment in del buffer manager //there is no need to modify the general memory size as there is no new //added del into the memory bm.channel.setCurDeleteBuffer(compactedToSegID, compactToDelBuff) } } func (bm *DelBufferManager) ShouldFlushSegments() []UniqueID { var shouldFlushSegments []UniqueID if bm.delMemorySize < Params.DataNodeCfg.FlushDeleteBufferBytes.GetAsInt64() { return shouldFlushSegments } mmUsage := bm.delMemorySize var poppedSegMem []*Item for { segMem := heap.Pop(bm.delBufHeap).(*Item) poppedSegMem = append(poppedSegMem, segMem) shouldFlushSegments = append(shouldFlushSegments, segMem.segmentID) log.Debug("add segment for delete buf flush", zap.Int64("segmentID", segMem.segmentID)) mmUsage -= segMem.memorySize if mmUsage < Params.DataNodeCfg.FlushDeleteBufferBytes.GetAsInt64() { break } } //here we push all selected segment back into the heap //in order to keep the heap semantically correct for _, segMem := range poppedSegMem { heap.Push(bm.delBufHeap, segMem) } return shouldFlushSegments } // An Item is something we manage in a memorySize priority queue. type Item struct { segmentID UniqueID // The segmentID memorySize int64 // The size of memory consumed by del buf index int // The index of the item in the heap. // The index is needed by update and is maintained by the heap.Interface methods. } // A PriorityQueue implements heap.Interface and holds Items. // We use PriorityQueue to manage memory consumed by del buf type PriorityQueue struct { items []*Item } func (pq *PriorityQueue) Len() int { return len(pq.items) } func (pq *PriorityQueue) Less(i, j int) bool { // We want Pop to give us the highest, not lowest, memorySize so we use greater than here. return pq.items[i].memorySize > pq.items[j].memorySize } func (pq *PriorityQueue) Swap(i, j int) { pq.items[i], pq.items[j] = pq.items[j], pq.items[i] pq.items[i].index = i pq.items[j].index = j } func (pq *PriorityQueue) Push(x any) { n := len(pq.items) item := x.(*Item) item.index = n pq.items = append(pq.items, item) } func (pq *PriorityQueue) Pop() any { old := pq.items n := len(old) item := old[n-1] old[n-1] = nil // avoid memory leak item.index = -1 // for safety pq.items = old[0 : n-1] return item } // update modifies the priority and value of an Item in the queue. func (pq *PriorityQueue) update(item *Item, memorySize int64) { item.memorySize = memorySize heap.Fix(pq, item.index) } // BufferData buffers insert data, monitoring buffer size and limit // size and limit both indicate numOfRows type BufferData struct { buffer *InsertData size int64 limit int64 tsFrom Timestamp tsTo Timestamp startPos *internalpb.MsgPosition endPos *internalpb.MsgPosition } func (bd *BufferData) effectiveCap() int64 { return bd.limit - bd.size } func (bd *BufferData) updateSize(no int64) { bd.size += no } // updateTimeRange update BufferData tsFrom, tsTo range according to input time range func (bd *BufferData) updateTimeRange(tr TimeRange) { if tr.timestampMin < bd.tsFrom { bd.tsFrom = tr.timestampMin } if tr.timestampMax > bd.tsTo { bd.tsTo = tr.timestampMax } } func (bd *BufferData) updateStartAndEndPosition(startPos *internalpb.MsgPosition, endPos *internalpb.MsgPosition) { if bd.startPos == nil || startPos.Timestamp < bd.startPos.Timestamp { bd.startPos = startPos } if bd.endPos == nil || endPos.Timestamp > bd.endPos.Timestamp { bd.endPos = endPos } } // DelDataBuf buffers delete data, monitoring buffer size and limit // size and limit both indicate numOfRows type DelDataBuf struct { datapb.Binlog delData *DeleteData item *Item startPos *internalpb.MsgPosition endPos *internalpb.MsgPosition } func (ddb *DelDataBuf) accumulateEntriesNum(entryNum int64) { ddb.EntriesNum += entryNum } func (ddb *DelDataBuf) updateTimeRange(tr TimeRange) { if tr.timestampMin < ddb.TimestampFrom { ddb.TimestampFrom = tr.timestampMin } if tr.timestampMax > ddb.TimestampTo { ddb.TimestampTo = tr.timestampMax } } func (ddb *DelDataBuf) mergeDelDataBuf(buf *DelDataBuf) { ddb.accumulateEntriesNum(buf.EntriesNum) tr := TimeRange{timestampMax: buf.TimestampTo, timestampMin: buf.TimestampFrom} ddb.updateTimeRange(tr) ddb.updateStartAndEndPosition(buf.startPos, buf.endPos) ddb.delData.Pks = append(ddb.delData.Pks, buf.delData.Pks...) ddb.delData.Tss = append(ddb.delData.Tss, buf.delData.Tss...) ddb.item.memorySize += buf.item.memorySize } func (ddb *DelDataBuf) updateStartAndEndPosition(startPos *internalpb.MsgPosition, endPos *internalpb.MsgPosition) { if ddb.startPos == nil || startPos.Timestamp < ddb.startPos.Timestamp { ddb.startPos = startPos } if ddb.endPos == nil || endPos.Timestamp > ddb.endPos.Timestamp { ddb.endPos = endPos } } // newBufferData needs an input dimension to calculate the limit of this buffer // // `limit` is the segment numOfRows a buffer can buffer at most. // // For a float32 vector field: // // limit = 16 * 2^20 Byte [By default] / (dimension * 4 Byte) // // For a binary vector field: // // limit = 16 * 2^20 Byte [By default]/ (dimension / 8 Byte) // // But since the buffer of binary vector fields is larger than the float32 one // // with the same dimension, newBufferData takes the smaller buffer limit // to fit in both types of vector fields // // * This need to change for string field support and multi-vector fields support. func newBufferData(collSchema *schemapb.CollectionSchema) (*BufferData, error) { // Get Dimension size, err := typeutil.EstimateSizePerRecord(collSchema) if err != nil { log.Warn("failed to estimate size per record", zap.Error(err)) return nil, err } if size == 0 { return nil, errors.New("Invalid schema") } limit := Params.DataNodeCfg.FlushInsertBufferSize.GetAsInt64() / int64(size) if Params.DataNodeCfg.FlushInsertBufferSize.GetAsInt64()%int64(size) != 0 { limit++ } //TODO::xige-16 eval vec and string field return &BufferData{ buffer: &InsertData{Data: make(map[UniqueID]storage.FieldData)}, size: 0, limit: limit, tsFrom: math.MaxUint64, tsTo: 0}, nil } func newDelDataBuf() *DelDataBuf { return &DelDataBuf{ delData: &DeleteData{}, Binlog: datapb.Binlog{ EntriesNum: 0, TimestampFrom: math.MaxUint64, TimestampTo: 0, }, item: &Item{ memorySize: 0, }, } }