Add WriteBuffer to provide abstraction for delta policy (#27874)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
congqixia 2023-11-04 12:10:17 +08:00 committed by GitHub
parent 8011054a2a
commit bf2f62c1e7
31 changed files with 2337 additions and 23 deletions

View File

@@ -430,6 +430,8 @@ generate-mockery-datanode: getdeps
$(INSTALL_PATH)/mockery --name=Allocator --dir=$(PWD)/internal/datanode/allocator --output=$(PWD)/internal/datanode/allocator --filename=mock_allocator.go --with-expecter --structname=MockAllocator --outpkg=allocator --inpackage
$(INSTALL_PATH)/mockery --name=Broker --dir=$(PWD)/internal/datanode/broker --output=$(PWD)/internal/datanode/broker/ --filename=mock_broker.go --with-expecter --structname=MockBroker --outpkg=broker --inpackage
$(INSTALL_PATH)/mockery --name=MetaCache --dir=$(PWD)/internal/datanode/metacache --output=$(PWD)/internal/datanode/metacache --filename=mock_meta_cache.go --with-expecter --structname=MockMetaCache --outpkg=metacache --inpackage
$(INSTALL_PATH)/mockery --name=SyncManager --dir=$(PWD)/internal/datanode/syncmgr --output=$(PWD)/internal/datanode/syncmgr --filename=mock_sync_manager.go --with-expecter --structname=MockSyncManager --outpkg=syncmgr --inpackage
$(INSTALL_PATH)/mockery --name=WriteBuffer --dir=$(PWD)/internal/datanode/writebuffer --output=$(PWD)/internal/datanode/writebuffer --filename=mock_write_buffer.go --with-expecter --structname=MockWriteBuffer --outpkg=writebuffer --inpackage
generate-mockery-metastore: getdeps
$(INSTALL_PATH)/mockery --name=RootCoordCatalog --dir=$(PWD)/internal/metastore --output=$(PWD)/internal/metastore/mocks --filename=mock_rootcoord_catalog.go --with-expecter --structname=RootCoordCatalog --outpkg=mocks

View File

@@ -19,6 +19,7 @@ package metacache
import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
type SegmentFilter func(info *SegmentInfo) bool
@@ -29,9 +30,10 @@ func WithPartitionID(partitionID int64) SegmentFilter {
}
}
func WithSegmentID(segmentID int64) SegmentFilter {
func WithSegmentIDs(segmentIDs ...int64) SegmentFilter {
set := typeutil.NewSet[int64](segmentIDs...)
return func(info *SegmentInfo) bool {
return info.segmentID == segmentID
return set.Contain(info.segmentID)
}
}
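A quick usage sketch of the new set-based filter (the cache value and segment IDs here are illustrative, not part of this change):

// one WithSegmentIDs filter matches any of the listed segments,
// replacing repeated WithSegmentID calls
func exampleSetFilter(cache MetaCache) []*SegmentInfo {
return cache.GetSegmentsBy(WithSegmentIDs(100, 101, 102))
}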

View File

@@ -40,7 +40,7 @@ func (s *SegmentFilterSuite) TestFilters() {
s.True(filter(info))
segmentID := int64(10001)
filter = WithSegmentID(segmentID)
filter = WithSegmentIDs(segmentID)
info.segmentID = segmentID + 1
s.False(filter(info))
info.segmentID = segmentID

View File

@@ -23,12 +23,13 @@ import (
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/log"
)
type MetaCache interface {
NewSegment(segmentID, partitionID int64)
NewSegment(segmentID, partitionID int64, startPos *msgpb.MsgPosition, actions ...SegmentAction)
UpdateSegments(action SegmentAction, filters ...SegmentFilter)
CompactSegments(newSegmentID, partitionID int64, oldSegmentIDs ...int64)
GetSegmentsBy(filters ...SegmentFilter) []*SegmentInfo
@@ -67,17 +68,21 @@ func (c *metaCacheImpl) init(vchannel *datapb.VchannelInfo, factory PkStatsFactory
}
}
func (c *metaCacheImpl) NewSegment(segmentID, partitionID int64) {
func (c *metaCacheImpl) NewSegment(segmentID, partitionID int64, startPos *msgpb.MsgPosition, actions ...SegmentAction) {
c.mu.Lock()
defer c.mu.Unlock()
if _, ok := c.segmentInfos[segmentID]; !ok {
c.segmentInfos[segmentID] = &SegmentInfo{
info := &SegmentInfo{
segmentID: segmentID,
partitionID: partitionID,
state: commonpb.SegmentState_Growing,
startPosRecorded: false,
}
for _, action := range actions {
action(info)
}
c.segmentInfos[segmentID] = info
}
}
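Callers can now seed a segment's state at registration time; a minimal sketch (segment ID 1001, partition 10, and the row count are illustrative; UpdateNumOfRows is the action exercised in the test below):

func exampleNewSegment(cache MetaCache, startPos *msgpb.MsgPosition) {
// register a growing segment and apply initial actions in one call
cache.NewSegment(1001, 10, startPos, UpdateNumOfRows(100))
}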

View File

@@ -81,7 +81,7 @@ func (s *MetaCacheSuite) SetupTest() {
func (s *MetaCacheSuite) TestNewSegment() {
for i, seg := range s.newSegments {
s.cache.NewSegment(seg, s.partitionIDs[i])
s.cache.NewSegment(seg, s.partitionIDs[i], nil, UpdateNumOfRows(100))
}
for id, partitionID := range s.partitionIDs {
@@ -110,8 +110,8 @@ func (s *MetaCacheSuite) TestCompactSegments() {
}
func (s *MetaCacheSuite) TestUpdateSegments() {
s.cache.UpdateSegments(UpdateState(commonpb.SegmentState_Flushed), WithSegmentID(5))
segments := s.cache.GetSegmentsBy(WithSegmentID(5))
s.cache.UpdateSegments(UpdateState(commonpb.SegmentState_Flushed), WithSegmentIDs(5))
segments := s.cache.GetSegmentsBy(WithSegmentIDs(5))
s.Require().Equal(1, len(segments))
segment := segments[0]
s.Equal(commonpb.SegmentState_Flushed, segment.State())

View File

@@ -2,7 +2,10 @@
package metacache
import mock "github.com/stretchr/testify/mock"
import (
msgpb "github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
mock "github.com/stretchr/testify/mock"
)
// MockMetaCache is an autogenerated mock type for the MetaCache type
type MockMetaCache struct {
@@ -180,9 +183,16 @@ func (_c *MockMetaCache_GetSegmentsBy_Call) RunAndReturn(run func(...SegmentFilter
return _c
}
// NewSegment provides a mock function with given fields: segmentID, partitionID
func (_m *MockMetaCache) NewSegment(segmentID int64, partitionID int64) {
_m.Called(segmentID, partitionID)
// NewSegment provides a mock function with given fields: segmentID, partitionID, startPos, actions
func (_m *MockMetaCache) NewSegment(segmentID int64, partitionID int64, startPos *msgpb.MsgPosition, actions ...SegmentAction) {
_va := make([]interface{}, len(actions))
for _i := range actions {
_va[_i] = actions[_i]
}
var _ca []interface{}
_ca = append(_ca, segmentID, partitionID, startPos)
_ca = append(_ca, _va...)
_m.Called(_ca...)
}
// MockMetaCache_NewSegment_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'NewSegment'
@@ -193,13 +203,22 @@ type MockMetaCache_NewSegment_Call struct {
// NewSegment is a helper method to define mock.On call
// - segmentID int64
// - partitionID int64
func (_e *MockMetaCache_Expecter) NewSegment(segmentID interface{}, partitionID interface{}) *MockMetaCache_NewSegment_Call {
return &MockMetaCache_NewSegment_Call{Call: _e.mock.On("NewSegment", segmentID, partitionID)}
// - startPos *msgpb.MsgPosition
// - actions ...SegmentAction
func (_e *MockMetaCache_Expecter) NewSegment(segmentID interface{}, partitionID interface{}, startPos interface{}, actions ...interface{}) *MockMetaCache_NewSegment_Call {
return &MockMetaCache_NewSegment_Call{Call: _e.mock.On("NewSegment",
append([]interface{}{segmentID, partitionID, startPos}, actions...)...)}
}
func (_c *MockMetaCache_NewSegment_Call) Run(run func(segmentID int64, partitionID int64)) *MockMetaCache_NewSegment_Call {
func (_c *MockMetaCache_NewSegment_Call) Run(run func(segmentID int64, partitionID int64, startPos *msgpb.MsgPosition, actions ...SegmentAction)) *MockMetaCache_NewSegment_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64), args[1].(int64))
variadicArgs := make([]SegmentAction, len(args)-3)
for i, a := range args[3:] {
if a != nil {
variadicArgs[i] = a.(SegmentAction)
}
}
run(args[0].(int64), args[1].(int64), args[2].(*msgpb.MsgPosition), variadicArgs...)
})
return _c
}
@@ -209,7 +228,7 @@ func (_c *MockMetaCache_NewSegment_Call) Return() *MockMetaCache_NewSegment_Call
return _c
}
func (_c *MockMetaCache_NewSegment_Call) RunAndReturn(run func(int64, int64)) *MockMetaCache_NewSegment_Call {
func (_c *MockMetaCache_NewSegment_Call) RunAndReturn(run func(int64, int64, *msgpb.MsgPosition, ...SegmentAction)) *MockMetaCache_NewSegment_Call {
_c.Call.Return(run)
return _c
}

View File

@@ -67,6 +67,10 @@ func (s *SegmentInfo) CompactTo() int64 {
return s.compactTo
}
func (s *SegmentInfo) GetBloomFilterSet() *BloomFilterSet {
return s.bfs
}
func (s *SegmentInfo) Clone() *SegmentInfo {
return &SegmentInfo{
segmentID: s.segmentID,

View File

@@ -51,7 +51,7 @@ func (b *brokerMetaWriter) UpdateSync(pack *SyncTask) error {
deltaInfos[0] = &datapb.FieldBinlog{Binlogs: []*datapb.Binlog{pack.deltaBinlog}}
// report only the current segment's checkpoint info
segments := pack.metacache.GetSegmentsBy(metacache.WithSegmentID(pack.segmentID))
segments := pack.metacache.GetSegmentsBy(metacache.WithSegmentIDs(pack.segmentID))
if len(segments) == 0 {
return merr.WrapErrSegmentNotFound(pack.segmentID)
}
@@ -96,8 +96,8 @@ func (b *brokerMetaWriter) UpdateSync(pack *SyncTask) error {
StartPositions: startPos,
Flushed: pack.isFlush,
// Dropped: pack.option.isDrop,
Channel: pack.channelName,
Dropped: pack.isDrop,
Channel: pack.channelName,
}
err := retry.Do(context.Background(), func() error {
err := b.broker.SaveBinlogPaths(context.Background(), req)

View File

@@ -0,0 +1,145 @@
// Code generated by mockery v2.32.4. DO NOT EDIT.
package syncmgr
import (
context "context"
mock "github.com/stretchr/testify/mock"
)
// MockSyncManager is an autogenerated mock type for the SyncManager type
type MockSyncManager struct {
mock.Mock
}
type MockSyncManager_Expecter struct {
mock *mock.Mock
}
func (_m *MockSyncManager) EXPECT() *MockSyncManager_Expecter {
return &MockSyncManager_Expecter{mock: &_m.Mock}
}
// Block provides a mock function with given fields: segmentID
func (_m *MockSyncManager) Block(segmentID int64) {
_m.Called(segmentID)
}
// MockSyncManager_Block_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Block'
type MockSyncManager_Block_Call struct {
*mock.Call
}
// Block is a helper method to define mock.On call
// - segmentID int64
func (_e *MockSyncManager_Expecter) Block(segmentID interface{}) *MockSyncManager_Block_Call {
return &MockSyncManager_Block_Call{Call: _e.mock.On("Block", segmentID)}
}
func (_c *MockSyncManager_Block_Call) Run(run func(segmentID int64)) *MockSyncManager_Block_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64))
})
return _c
}
func (_c *MockSyncManager_Block_Call) Return() *MockSyncManager_Block_Call {
_c.Call.Return()
return _c
}
func (_c *MockSyncManager_Block_Call) RunAndReturn(run func(int64)) *MockSyncManager_Block_Call {
_c.Call.Return(run)
return _c
}
// SyncData provides a mock function with given fields: ctx, task
func (_m *MockSyncManager) SyncData(ctx context.Context, task *SyncTask) error {
ret := _m.Called(ctx, task)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, *SyncTask) error); ok {
r0 = rf(ctx, task)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockSyncManager_SyncData_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SyncData'
type MockSyncManager_SyncData_Call struct {
*mock.Call
}
// SyncData is a helper method to define mock.On call
// - ctx context.Context
// - task *SyncTask
func (_e *MockSyncManager_Expecter) SyncData(ctx interface{}, task interface{}) *MockSyncManager_SyncData_Call {
return &MockSyncManager_SyncData_Call{Call: _e.mock.On("SyncData", ctx, task)}
}
func (_c *MockSyncManager_SyncData_Call) Run(run func(ctx context.Context, task *SyncTask)) *MockSyncManager_SyncData_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*SyncTask))
})
return _c
}
func (_c *MockSyncManager_SyncData_Call) Return(_a0 error) *MockSyncManager_SyncData_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockSyncManager_SyncData_Call) RunAndReturn(run func(context.Context, *SyncTask) error) *MockSyncManager_SyncData_Call {
_c.Call.Return(run)
return _c
}
// Unblock provides a mock function with given fields: segmentID
func (_m *MockSyncManager) Unblock(segmentID int64) {
_m.Called(segmentID)
}
// MockSyncManager_Unblock_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Unblock'
type MockSyncManager_Unblock_Call struct {
*mock.Call
}
// Unblock is a helper method to define mock.On call
// - segmentID int64
func (_e *MockSyncManager_Expecter) Unblock(segmentID interface{}) *MockSyncManager_Unblock_Call {
return &MockSyncManager_Unblock_Call{Call: _e.mock.On("Unblock", segmentID)}
}
func (_c *MockSyncManager_Unblock_Call) Run(run func(segmentID int64)) *MockSyncManager_Unblock_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64))
})
return _c
}
func (_c *MockSyncManager_Unblock_Call) Return() *MockSyncManager_Unblock_Call {
_c.Call.Return()
return _c
}
func (_c *MockSyncManager_Unblock_Call) RunAndReturn(run func(int64)) *MockSyncManager_Unblock_Call {
_c.Call.Return(run)
return _c
}
// NewMockSyncManager creates a new instance of MockSyncManager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewMockSyncManager(t interface {
mock.TestingT
Cleanup(func())
}) *MockSyncManager {
mock := &MockSyncManager{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@@ -80,6 +80,11 @@ func (t *SyncTask) WithFlush() *SyncTask {
return t
}
func (t *SyncTask) WithDrop() *SyncTask {
t.isDrop = true
return t
}
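Together with the existing builders, a flush-and-drop task can be assembled fluently; a sketch assuming the package's NewSyncTask constructor and a broker b, neither of which appears in this hunk:

task := NewSyncTask().
WithMetaWriter(BrokerMetaWriter(b)).
WithFlush().
WithDrop()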
func (t *SyncTask) WithMetaCache(metacache metacache.MetaCache) *SyncTask {
t.metacache = metacache
return t

View File

@@ -41,6 +41,7 @@ type SyncTask struct {
tsTo typeutil.Timestamp
isFlush bool
isDrop bool
metacache metacache.MetaCache
metaWriter MetaWriter
@@ -75,7 +76,7 @@ func (t *SyncTask) Run() error {
log := t.getLogger()
var err error
infos := t.metacache.GetSegmentsBy(metacache.WithSegmentID(t.segmentID))
infos := t.metacache.GetSegmentsBy(metacache.WithSegmentIDs(t.segmentID))
if len(infos) == 0 {
log.Warn("failed to sync data, segment not found in metacache")
t.handleError(err)
@@ -245,7 +246,7 @@ func (t *SyncTask) serializeSinglePkStats(fieldID int64, stats *storage.PrimaryKeyStats
}
func (t *SyncTask) serializeMergedPkStats(fieldID int64, stats *storage.PrimaryKeyStats, rowNum int64) error {
segments := t.metacache.GetSegmentsBy(metacache.WithSegmentID(t.segmentID))
segments := t.metacache.GetSegmentsBy(metacache.WithSegmentIDs(t.segmentID))
var statsList []*storage.PrimaryKeyStats
var oldRowNum int64
for _, segment := range segments {

View File

@@ -201,6 +201,7 @@ func (s *SyncTaskSuite) TestRunNormal() {
task := s.getSuiteSyncTask()
task.WithInsertData(s.getInsertBuffer()).WithDeleteData(s.getDeleteBuffer())
task.WithFlush()
task.WithDrop()
task.WithMetaWriter(BrokerMetaWriter(s.broker))
task.WithCheckpoint(&msgpb.MsgPosition{
ChannelName: s.channelName,

View File

@@ -0,0 +1,73 @@
package writebuffer
import (
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/datanode/metacache"
"github.com/milvus-io/milvus/internal/datanode/syncmgr"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
type bfWriteBuffer struct {
*writeBufferBase
syncMgr syncmgr.SyncManager
metacache metacache.MetaCache
}
func NewBFWriteBuffer(sch *schemapb.CollectionSchema, metacache metacache.MetaCache, syncMgr syncmgr.SyncManager, option *writeBufferOption) (WriteBuffer, error) {
return &bfWriteBuffer{
writeBufferBase: newWriteBufferBase(sch, metacache, syncMgr, option),
syncMgr: syncMgr,
}, nil
}
func (wb *bfWriteBuffer) BufferData(insertMsgs []*msgstream.InsertMsg, deleteMsgs []*msgstream.DeleteMsg, startPos, endPos *msgpb.MsgPosition) error {
wb.mut.Lock()
defer wb.mut.Unlock()
// process insert msgs
pkData, err := wb.bufferInsert(insertMsgs, startPos, endPos)
if err != nil {
return err
}
// update pk oracle
for segmentID, dataList := range pkData {
segments := wb.metaCache.GetSegmentsBy(metacache.WithSegmentIDs(segmentID))
for _, segment := range segments {
for _, fieldData := range dataList {
err := segment.GetBloomFilterSet().UpdatePKRange(fieldData)
if err != nil {
return err
}
}
}
}
// distribute delete msg
for _, delMsg := range deleteMsgs {
pks := storage.ParseIDs2PrimaryKeys(delMsg.GetPrimaryKeys())
segments := wb.metaCache.GetSegmentsBy(metacache.WithPartitionID(delMsg.PartitionID))
for _, segment := range segments {
var deletePks []storage.PrimaryKey
var deleteTss []typeutil.Timestamp
for idx, pk := range pks {
if segment.GetBloomFilterSet().PkExists(pk) {
deletePks = append(deletePks, pk)
deleteTss = append(deleteTss, delMsg.GetTimestamps()[idx])
}
}
if len(deletePks) > 0 {
wb.bufferDelete(segment.SegmentID(), deletePks, deleteTss, startPos, endPos)
}
}
}
// update buffer last checkpoint
wb.checkpoint = endPos
return wb.triggerAutoSync()
}
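Under this policy, construction needs no extra options and deletes reach only the segments whose bloom filter reports a possible match; a minimal wiring sketch (the exampleBFPolicy helper and its arguments are illustrative):

func exampleBFPolicy(schema *schemapb.CollectionSchema, mc metacache.MetaCache, syncMgr syncmgr.SyncManager, insertMsgs []*msgstream.InsertMsg, deleteMsgs []*msgstream.DeleteMsg, startPos, endPos *msgpb.MsgPosition) error {
wb, err := NewBFWriteBuffer(schema, mc, syncMgr, &writeBufferOption{})
if err != nil {
return err
}
// deletes fan out only to segments whose bloom filter may contain the pk
return wb.BufferData(insertMsgs, deleteMsgs, startPos, endPos)
}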

View File

@@ -0,0 +1,182 @@
package writebuffer
import (
"math/rand"
"testing"
"time"
"github.com/samber/lo"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/datanode/broker"
"github.com/milvus-io/milvus/internal/datanode/metacache"
"github.com/milvus-io/milvus/internal/datanode/syncmgr"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
)
type BFWriteBufferSuite struct {
suite.Suite
collSchema *schemapb.CollectionSchema
syncMgr *syncmgr.MockSyncManager
metacache *metacache.MockMetaCache
broker *broker.MockBroker
}
func (s *BFWriteBufferSuite) SetupSuite() {
paramtable.Get().Init(paramtable.NewBaseTable())
s.collSchema = &schemapb.CollectionSchema{
Name: "test_collection",
Fields: []*schemapb.FieldSchema{
{
FieldID: common.RowIDField, Name: common.RowIDFieldName, DataType: schemapb.DataType_Int64,
},
{
FieldID: common.TimeStampField, Name: common.TimeStampFieldName, DataType: schemapb.DataType_Int64,
},
{
FieldID: 100, Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true,
},
{
FieldID: 101, Name: "vector", DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "128"},
},
},
},
}
}
func (s *BFWriteBufferSuite) composeInsertMsg(segmentID int64, rowCount int, dim int) ([]int64, *msgstream.InsertMsg) {
tss := lo.RepeatBy(rowCount, func(idx int) int64 { return int64(tsoutil.ComposeTSByTime(time.Now(), int64(idx))) })
vectors := lo.RepeatBy(rowCount, func(_ int) []float32 {
return lo.RepeatBy(dim, func(_ int) float32 { return rand.Float32() })
})
flatten := lo.Flatten(vectors)
return tss, &msgstream.InsertMsg{
InsertRequest: msgpb.InsertRequest{
SegmentID: segmentID,
Version: msgpb.InsertDataVersion_ColumnBased,
RowIDs: tss,
Timestamps: lo.Map(tss, func(id int64, _ int) uint64 { return uint64(id) }),
FieldsData: []*schemapb.FieldData{
{
FieldId: common.RowIDField, FieldName: common.RowIDFieldName, Type: schemapb.DataType_Int64,
Field: &schemapb.FieldData_Scalars{
Scalars: &schemapb.ScalarField{
Data: &schemapb.ScalarField_LongData{
LongData: &schemapb.LongArray{
Data: tss,
},
},
},
},
},
{
FieldId: common.TimeStampField, FieldName: common.TimeStampFieldName, Type: schemapb.DataType_Int64,
Field: &schemapb.FieldData_Scalars{
Scalars: &schemapb.ScalarField{
Data: &schemapb.ScalarField_LongData{
LongData: &schemapb.LongArray{
Data: tss,
},
},
},
},
},
{
FieldId: common.StartOfUserFieldID, FieldName: "pk", Type: schemapb.DataType_Int64,
Field: &schemapb.FieldData_Scalars{
Scalars: &schemapb.ScalarField{
Data: &schemapb.ScalarField_LongData{
LongData: &schemapb.LongArray{
Data: tss,
},
},
},
},
},
{
FieldId: common.StartOfUserFieldID + 1, FieldName: "vector", Type: schemapb.DataType_FloatVector,
Field: &schemapb.FieldData_Vectors{
Vectors: &schemapb.VectorField{
Dim: int64(dim),
Data: &schemapb.VectorField_FloatVector{
FloatVector: &schemapb.FloatArray{
Data: flatten,
},
},
},
},
},
},
},
}
}
func (s *BFWriteBufferSuite) composeDeleteMsg(pks []storage.PrimaryKey) *msgstream.DeleteMsg {
delMsg := &msgstream.DeleteMsg{
DeleteRequest: msgpb.DeleteRequest{
PrimaryKeys: storage.ParsePrimaryKeys2IDs(pks),
Timestamps: lo.RepeatBy(len(pks), func(idx int) uint64 { return tsoutil.ComposeTSByTime(time.Now(), int64(idx)) }),
},
}
return delMsg
}
func (s *BFWriteBufferSuite) SetupTest() {
s.syncMgr = syncmgr.NewMockSyncManager(s.T())
s.metacache = metacache.NewMockMetaCache(s.T())
s.broker = broker.NewMockBroker(s.T())
}
func (s *BFWriteBufferSuite) TestBufferData() {
wb, err := NewBFWriteBuffer(s.collSchema, s.metacache, s.syncMgr, &writeBufferOption{})
s.NoError(err)
seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1000}, metacache.NewBloomFilterSet())
s.metacache.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{seg})
pks, msg := s.composeInsertMsg(1000, 10, 128)
delMsg := s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) }))
err = wb.BufferData([]*msgstream.InsertMsg{msg}, []*msgstream.DeleteMsg{delMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})
s.NoError(err)
}
func (s *BFWriteBufferSuite) TestAutoSync() {
paramtable.Get().Save(paramtable.Get().DataNodeCfg.FlushInsertBufferSize.Key, "1")
wb, err := NewBFWriteBuffer(s.collSchema, s.metacache, s.syncMgr, &writeBufferOption{
syncPolicies: []SyncPolicy{
SyncFullBuffer,
GetSyncStaleBufferPolicy(paramtable.Get().DataNodeCfg.SyncPeriod.GetAsDuration(time.Second)),
GetFlushingSegmentsPolicy(s.metacache),
},
})
s.NoError(err)
seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1000}, metacache.NewBloomFilterSet())
s.metacache.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{seg})
s.metacache.EXPECT().GetSegmentIDsBy(mock.Anything).Return([]int64{1002}) // mock flushing
s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything, mock.Anything).Return()
s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).Return(nil).Twice()
pks, msg := s.composeInsertMsg(1000, 10, 128)
delMsg := s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) }))
err = wb.BufferData([]*msgstream.InsertMsg{msg}, []*msgstream.DeleteMsg{delMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})
s.NoError(err)
}
func TestBFWriteBuffer(t *testing.T) {
suite.Run(t, new(BFWriteBufferSuite))
}

View File

@@ -0,0 +1,80 @@
package writebuffer
import (
"math"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
type DeltaBuffer struct {
BufferBase
buffer *storage.DeleteData
}
func NewDeltaBuffer() *DeltaBuffer {
return &DeltaBuffer{
BufferBase: BufferBase{
rowLimit: noLimit,
sizeLimit: paramtable.Get().DataNodeCfg.FlushDeleteBufferBytes.GetAsInt64(),
TimestampFrom: math.MaxUint64,
TimestampTo: 0,
},
buffer: &storage.DeleteData{},
}
}
func (db *DeltaBuffer) getTimestampRange(tss []typeutil.Timestamp) TimeRange {
tr := TimeRange{
timestampMin: math.MaxUint64,
timestampMax: 0,
}
for _, data := range tss {
if data < tr.timestampMin {
tr.timestampMin = data
}
if data > tr.timestampMax {
tr.timestampMax = data
}
}
return tr
}
func (db *DeltaBuffer) Renew() *storage.DeleteData {
if db.IsEmpty() {
return nil
}
result := db.buffer
// start a fresh DeleteData so later appends cannot mutate the returned batch
db.buffer = &storage.DeleteData{}
db.BufferBase.rows = 0
db.BufferBase.size = 0
db.BufferBase.TimestampFrom = math.MaxUint64
db.BufferBase.TimestampTo = 0
return result
}
func (db *DeltaBuffer) Buffer(pks []storage.PrimaryKey, tss []typeutil.Timestamp, startPos, endPos *msgpb.MsgPosition) (bufSize int64) {
rowCount := len(pks)
for i := 0; i < rowCount; i++ {
db.buffer.Append(pks[i], tss[i])
switch pks[i].Type() {
case schemapb.DataType_Int64:
bufSize += 8
case schemapb.DataType_VarChar:
varCharPk := pks[i].(*storage.VarCharPrimaryKey)
bufSize += int64(len(varCharPk.Value))
}
// accumulate buf size for timestamp, which is 8 bytes
bufSize += 8
}
db.UpdateStatistics(int64(rowCount), bufSize, db.getTimestampRange(tss), startPos, endPos)
return bufSize
}
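This size accounting is exactly what the delta buffer tests below assert: an Int64 primary key costs 8 bytes plus 8 bytes for its timestamp, so 100 rows buffer 100*8*2 = 1600 bytes, while 100 three-character VarChar keys buffer 100*3 + 100*8 = 1100 bytes.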

View File

@@ -0,0 +1,66 @@
package writebuffer
import (
"fmt"
"testing"
"time"
"github.com/samber/lo"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
)
type DeltaBufferSuite struct {
suite.Suite
}
func (s *DeltaBufferSuite) TestBuffer() {
s.Run("int64_pk", func() {
deltaBuffer := NewDeltaBuffer()
tss := lo.RepeatBy(100, func(idx int) uint64 { return tsoutil.ComposeTSByTime(time.Now(), int64(idx)) })
pks := lo.Map(tss, func(ts uint64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(int64(ts)) })
memSize := deltaBuffer.Buffer(pks, tss, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})
s.EqualValues(100*8*2, memSize)
})
s.Run("string_pk", func() {
deltaBuffer := NewDeltaBuffer()
tss := lo.RepeatBy(100, func(idx int) uint64 { return tsoutil.ComposeTSByTime(time.Now(), int64(idx)) })
pks := lo.Map(tss, func(ts uint64, idx int) storage.PrimaryKey {
return storage.NewVarCharPrimaryKey(fmt.Sprintf("%03d", idx))
})
memSize := deltaBuffer.Buffer(pks, tss, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})
s.EqualValues(100*8+100*3, memSize)
})
}
func (s *DeltaBufferSuite) TestRenew() {
deltaBuffer := NewDeltaBuffer()
result := deltaBuffer.Renew()
s.Nil(result)
s.True(deltaBuffer.IsEmpty())
tss := lo.RepeatBy(100, func(idx int) uint64 { return tsoutil.ComposeTSByTime(time.Now(), int64(idx)) })
pks := lo.Map(tss, func(ts uint64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(int64(ts)) })
deltaBuffer.Buffer(pks, tss, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})
result = deltaBuffer.Renew()
s.NotNil(result)
s.True(deltaBuffer.IsEmpty())
s.ElementsMatch(tss, result.Tss)
s.ElementsMatch(pks, result.Pks)
}
func TestDeltaBuffer(t *testing.T) {
suite.Run(t, new(DeltaBufferSuite))
}

View File

@@ -0,0 +1,171 @@
package writebuffer
import (
"math"
"github.com/cockroachdb/errors"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
const (
noLimit int64 = -1
)
type BufferBase struct {
rows int64
rowLimit int64
size int64
sizeLimit int64
TimestampFrom typeutil.Timestamp
TimestampTo typeutil.Timestamp
startPos *msgpb.MsgPosition
endPos *msgpb.MsgPosition
}
func (b *BufferBase) UpdateStatistics(entryNum, size int64, tr TimeRange, startPos, endPos *msgpb.MsgPosition) {
b.rows += entryNum
b.size += size
if tr.timestampMin < b.TimestampFrom {
b.TimestampFrom = tr.timestampMin
}
if tr.timestampMax > b.TimestampTo {
b.TimestampTo = tr.timestampMax
}
if b.startPos == nil || startPos.Timestamp < b.startPos.Timestamp {
b.startPos = startPos
}
if b.endPos == nil || endPos.Timestamp > b.endPos.Timestamp {
b.endPos = endPos
}
}
func (b *BufferBase) IsFull() bool {
return (b.rowLimit != noLimit && b.rows >= b.rowLimit) ||
(b.sizeLimit != noLimit && b.size >= b.sizeLimit)
}
func (b *BufferBase) IsEmpty() bool {
return b.rows == 0
}
func (b *BufferBase) MinTimestamp() typeutil.Timestamp {
if b.startPos == nil {
return math.MaxUint64
}
return b.startPos.GetTimestamp()
}
type InsertBuffer struct {
BufferBase
collSchema *schemapb.CollectionSchema
buffer *storage.InsertData
}
func NewInsertBuffer(sch *schemapb.CollectionSchema) (*InsertBuffer, error) {
size, err := typeutil.EstimateSizePerRecord(sch)
if err != nil {
log.Warn("failed to estimate size per record", zap.Error(err))
return nil, err
}
if size == 0 {
return nil, errors.New("invalid schema: estimated size per record is 0")
}
buffer, err := storage.NewInsertData(sch)
if err != nil {
return nil, err
}
limit := paramtable.Get().DataNodeCfg.FlushInsertBufferSize.GetAsInt64() / int64(size)
if paramtable.Get().DataNodeCfg.FlushInsertBufferSize.GetAsInt64()%int64(size) != 0 {
limit++
}
return &InsertBuffer{
BufferBase: BufferBase{
rowLimit: limit,
sizeLimit: noLimit,
TimestampFrom: math.MaxUint64,
TimestampTo: 0,
},
collSchema: sch,
buffer: buffer,
}, nil
}
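The row limit is derived from the configured byte budget: with a hypothetical FlushInsertBufferSize of 16 MB and an estimated 1,000 bytes per record, 16777216/1000 = 16777 with a remainder, so the limit rounds up to 16778 rows; rounding up errs toward one extra row rather than flushing a not-quite-full buffer early.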
func (ib *InsertBuffer) Renew() *storage.InsertData {
if ib.IsEmpty() {
return nil
}
result := ib.buffer
// no error since validated in constructor
ib.buffer, _ = storage.NewInsertData(ib.collSchema)
ib.BufferBase.rows = 0
ib.BufferBase.TimestampFrom = math.MaxUint64
ib.BufferBase.TimestampTo = 0
return result
}
func (ib *InsertBuffer) Buffer(msgs []*msgstream.InsertMsg, startPos, endPos *msgpb.MsgPosition) ([]storage.FieldData, error) {
pkData := make([]storage.FieldData, 0, len(msgs))
for _, msg := range msgs {
tmpBuffer, err := storage.InsertMsgToInsertData(msg, ib.collSchema)
if err != nil {
log.Warn("failed to transfer insert msg to insert data", zap.Error(err))
return nil, err
}
pkFieldData, err := storage.GetPkFromInsertData(ib.collSchema, tmpBuffer)
if err != nil {
return nil, err
}
if pkFieldData.RowNum() != tmpBuffer.GetRowNum() {
return nil, merr.WrapErrServiceInternal("pk column row num not match")
}
pkData = append(pkData, pkFieldData)
storage.MergeInsertData(ib.buffer, tmpBuffer)
tsData, err := storage.GetTimestampFromInsertData(tmpBuffer)
if err != nil {
log.Warn("no timestamp field found in insert msg", zap.Error(err))
return nil, err
}
// update buffer size
ib.UpdateStatistics(int64(tmpBuffer.GetRowNum()), 0, ib.getTimestampRange(tsData), startPos, endPos)
}
return pkData, nil
}
func (ib *InsertBuffer) getTimestampRange(tsData *storage.Int64FieldData) TimeRange {
tr := TimeRange{
timestampMin: math.MaxUint64,
timestampMax: 0,
}
for _, data := range tsData.Data {
if uint64(data) < tr.timestampMin {
tr.timestampMin = typeutil.Timestamp(data)
}
if uint64(data) > tr.timestampMax {
tr.timestampMax = typeutil.Timestamp(data)
}
}
return tr
}

View File

@@ -0,0 +1,294 @@
package writebuffer
import (
"math/rand"
"testing"
"time"
"github.com/samber/lo"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
)
type InsertBufferSuite struct {
suite.Suite
collSchema *schemapb.CollectionSchema
}
func (s *InsertBufferSuite) SetupSuite() {
paramtable.Get().Init(paramtable.NewBaseTable())
s.collSchema = &schemapb.CollectionSchema{
Name: "test_collection",
Fields: []*schemapb.FieldSchema{
{
FieldID: common.RowIDField, Name: common.RowIDFieldName, DataType: schemapb.DataType_Int64,
},
{
FieldID: common.TimeStampField, Name: common.TimeStampFieldName, DataType: schemapb.DataType_Int64,
},
{
FieldID: 100, Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true,
},
{
FieldID: 101, Name: "vector", DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "128"},
},
},
},
}
}
func (s *InsertBufferSuite) composeInsertMsg(rowCount int, dim int) ([]int64, *msgstream.InsertMsg) {
tss := lo.RepeatBy(rowCount, func(idx int) int64 { return int64(tsoutil.ComposeTSByTime(time.Now(), int64(idx))) })
vectors := lo.RepeatBy(rowCount, func(_ int) []float32 {
return lo.RepeatBy(dim, func(_ int) float32 { return rand.Float32() })
})
flatten := lo.Flatten(vectors)
return tss, &msgstream.InsertMsg{
InsertRequest: msgpb.InsertRequest{
Version: msgpb.InsertDataVersion_ColumnBased,
RowIDs: tss,
Timestamps: lo.Map(tss, func(id int64, _ int) uint64 { return uint64(id) }),
FieldsData: []*schemapb.FieldData{
{
FieldId: common.RowIDField, FieldName: common.RowIDFieldName, Type: schemapb.DataType_Int64,
Field: &schemapb.FieldData_Scalars{
Scalars: &schemapb.ScalarField{
Data: &schemapb.ScalarField_LongData{
LongData: &schemapb.LongArray{
Data: tss,
},
},
},
},
},
{
FieldId: common.TimeStampField, FieldName: common.TimeStampFieldName, Type: schemapb.DataType_Int64,
Field: &schemapb.FieldData_Scalars{
Scalars: &schemapb.ScalarField{
Data: &schemapb.ScalarField_LongData{
LongData: &schemapb.LongArray{
Data: tss,
},
},
},
},
},
{
FieldId: common.StartOfUserFieldID, FieldName: "pk", Type: schemapb.DataType_Int64,
Field: &schemapb.FieldData_Scalars{
Scalars: &schemapb.ScalarField{
Data: &schemapb.ScalarField_LongData{
LongData: &schemapb.LongArray{
Data: tss,
},
},
},
},
},
{
FieldId: common.StartOfUserFieldID + 1, FieldName: "vector", Type: schemapb.DataType_FloatVector,
Field: &schemapb.FieldData_Vectors{
Vectors: &schemapb.VectorField{
Dim: int64(dim),
Data: &schemapb.VectorField_FloatVector{
FloatVector: &schemapb.FloatArray{
Data: flatten,
},
},
},
},
},
},
},
}
}
func (s *InsertBufferSuite) TestBasic() {
s.Run("normal_insertbuffer", func() {
insertBuffer, err := NewInsertBuffer(s.collSchema)
s.Require().NoError(err)
s.True(insertBuffer.IsEmpty())
s.False(insertBuffer.IsFull())
insertBuffer.rows = insertBuffer.rowLimit + 1
s.True(insertBuffer.IsFull())
s.False(insertBuffer.IsEmpty())
})
}
func (s *InsertBufferSuite) TestBuffer() {
s.Run("normal_buffer", func() {
pks, insertMsg := s.composeInsertMsg(10, 128)
insertBuffer, err := NewInsertBuffer(s.collSchema)
s.Require().NoError(err)
fieldData, err := insertBuffer.Buffer([]*msgstream.InsertMsg{insertMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})
s.NoError(err)
pkData := lo.Map(fieldData, func(fd storage.FieldData, _ int) []int64 {
return lo.RepeatBy(fd.RowNum(), func(idx int) int64 { return fd.GetRow(idx).(int64) })
})
s.ElementsMatch(pks, lo.Flatten(pkData))
s.EqualValues(100, insertBuffer.MinTimestamp())
})
s.Run("pk_not_found", func() {
_, insertMsg := s.composeInsertMsg(10, 128)
insertMsg.FieldsData = []*schemapb.FieldData{insertMsg.FieldsData[0], insertMsg.FieldsData[1], insertMsg.FieldsData[3]}
insertBuffer, err := NewInsertBuffer(s.collSchema)
s.Require().NoError(err)
_, err = insertBuffer.Buffer([]*msgstream.InsertMsg{insertMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})
s.Error(err)
})
s.Run("schema_without_pk", func() {
badSchema := &schemapb.CollectionSchema{
Name: "test_collection",
Fields: []*schemapb.FieldSchema{
{
FieldID: common.RowIDField, Name: common.RowIDFieldName, DataType: schemapb.DataType_Int64,
},
{
FieldID: common.TimeStampField, Name: common.TimeStampFieldName, DataType: schemapb.DataType_Int64,
},
{
FieldID: 101, Name: "vector", DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "128"},
},
},
},
}
_, insertMsg := s.composeInsertMsg(10, 128)
insertBuffer, err := NewInsertBuffer(badSchema)
s.Require().NoError(err)
_, err = insertBuffer.Buffer([]*msgstream.InsertMsg{insertMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})
s.Error(err)
})
}
func (s *InsertBufferSuite) TestRenew() {
insertBuffer, err := NewInsertBuffer(s.collSchema)
s.Require().NoError(err)
result := insertBuffer.Renew()
s.Nil(result)
s.True(insertBuffer.IsEmpty())
pks, insertMsg := s.composeInsertMsg(10, 128)
_, err = insertBuffer.Buffer([]*msgstream.InsertMsg{insertMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})
s.Require().NoError(err)
result = insertBuffer.Renew()
s.NotNil(result)
s.True(insertBuffer.IsEmpty())
pkField, ok := result.Data[common.StartOfUserFieldID]
s.Require().True(ok)
pkData := lo.RepeatBy(pkField.RowNum(), func(idx int) int64 { return pkField.GetRow(idx).(int64) })
s.ElementsMatch(pks, pkData)
}
type InsertBufferConstructSuite struct {
suite.Suite
schema *schemapb.CollectionSchema
}
func (*InsertBufferConstructSuite) SetupSuite() {
paramtable.Get().Init(paramtable.NewBaseTable())
}
func (s *InsertBufferConstructSuite) TestCreateSuccess() {
schema := &schemapb.CollectionSchema{
Name: "test_collection",
Fields: []*schemapb.FieldSchema{
{
FieldID: 100, Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true,
},
{
FieldID: 101, Name: "vector", DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "128"},
},
},
},
}
insertBuffer, err := NewInsertBuffer(schema)
s.NoError(err)
s.NotNil(insertBuffer)
}
func (s *InsertBufferConstructSuite) TestCreateFailure() {
type testCase struct {
schema *schemapb.CollectionSchema
tag string
}
cases := []testCase{
{
tag: "undefined_datatype",
schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{FieldID: 101, Name: "vector", DataType: -1},
},
},
},
{
tag: "mssing_maxlength",
schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{FieldID: 101, Name: "string", DataType: schemapb.DataType_VarChar},
},
},
},
{
tag: "empty_schema",
schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{},
},
},
{
tag: "missing_type_param",
schema: &schemapb.CollectionSchema{
Name: "test_collection",
Fields: []*schemapb.FieldSchema{
{
FieldID: 100, Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true,
},
{
FieldID: 101, Name: "vector", DataType: schemapb.DataType_FloatVector,
},
},
},
},
}
for _, tc := range cases {
s.Run(tc.tag, func() {
_, err := NewInsertBuffer(tc.schema)
s.Error(err)
})
}
}
func TestInsertBuffer(t *testing.T) {
suite.Run(t, new(InsertBufferSuite))
suite.Run(t, new(InsertBufferConstructSuite))
}

View File

@@ -0,0 +1,83 @@
package writebuffer
import (
"context"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/datanode/metacache"
"github.com/milvus-io/milvus/internal/datanode/syncmgr"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/retry"
)
type l0WriteBuffer struct {
*writeBufferBase
l0Segments map[int64]int64 // partitionID => l0 segment ID
syncMgr syncmgr.SyncManager
idAllocator allocator.Interface
}
func NewL0WriteBuffer(sch *schemapb.CollectionSchema, metacache metacache.MetaCache, syncMgr syncmgr.SyncManager, option *writeBufferOption) (WriteBuffer, error) {
if option.idAllocator == nil {
return nil, merr.WrapErrServiceInternal("id allocator is nil when creating l0 write buffer")
}
return &l0WriteBuffer{
l0Segments: make(map[int64]int64),
writeBufferBase: newWriteBufferBase(sch, metacache, syncMgr, option),
syncMgr: syncMgr,
idAllocator: option.idAllocator,
}, nil
}
func (wb *l0WriteBuffer) BufferData(insertMsgs []*msgstream.InsertMsg, deleteMsgs []*msgstream.DeleteMsg, startPos, endPos *msgpb.MsgPosition) error {
wb.mut.Lock()
defer wb.mut.Unlock()
// process insert msgs
_, err := wb.bufferInsert(insertMsgs, startPos, endPos)
if err != nil {
log.Warn("failed to buffer insert data", zap.Error(err))
return err
}
for _, msg := range deleteMsgs {
l0SegmentID := wb.getL0SegmentID(msg.GetPartitionID())
pks := storage.ParseIDs2PrimaryKeys(msg.GetPrimaryKeys())
err := wb.bufferDelete(l0SegmentID, pks, msg.GetTimestamps(), startPos, endPos)
if err != nil {
log.Warn("failed to buffer delete data", zap.Error(err))
return err
}
}
// update buffer last checkpoint
wb.checkpoint = endPos
return wb.triggerAutoSync()
}
func (wb *l0WriteBuffer) getL0SegmentID(partitionID int64) int64 {
segmentID, ok := wb.l0Segments[partitionID]
if !ok {
err := retry.Do(context.Background(), func() error {
var err error
segmentID, err = wb.idAllocator.AllocOne()
return err
})
if err != nil {
log.Error("failed to allocate l0 segment ID", zap.Error(err))
panic(err)
}
wb.l0Segments[partitionID] = segmentID
}
return segmentID
}
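In contrast to the bloom-filter policy, the L0 policy performs no per-segment filtering: all deletes for a partition are buffered under a single L0 segment whose ID is allocated lazily (with retry; allocation failure is fatal by design), deferring primary-key resolution to later processing of the L0 delta.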

View File

@@ -0,0 +1,158 @@
package writebuffer
import (
"math/rand"
"testing"
"time"
"github.com/samber/lo"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/datanode/metacache"
"github.com/milvus-io/milvus/internal/datanode/syncmgr"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
)
type L0WriteBufferSuite struct {
suite.Suite
collSchema *schemapb.CollectionSchema
syncMgr *syncmgr.MockSyncManager
metacache *metacache.MockMetaCache
allocator *allocator.MockGIDAllocator
}
func (s *L0WriteBufferSuite) SetupSuite() {
paramtable.Get().Init(paramtable.NewBaseTable())
s.collSchema = &schemapb.CollectionSchema{
Name: "test_collection",
Fields: []*schemapb.FieldSchema{
{
FieldID: common.RowIDField, Name: common.RowIDFieldName, DataType: schemapb.DataType_Int64,
},
{
FieldID: common.TimeStampField, Name: common.TimeStampFieldName, DataType: schemapb.DataType_Int64,
},
{
FieldID: 100, Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true,
},
{
FieldID: 101, Name: "vector", DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "128"},
},
},
},
}
}
func (s *L0WriteBufferSuite) composeInsertMsg(segmentID int64, rowCount int, dim int) ([]int64, *msgstream.InsertMsg) {
tss := lo.RepeatBy(rowCount, func(idx int) int64 { return int64(tsoutil.ComposeTSByTime(time.Now(), int64(idx))) })
vectors := lo.RepeatBy(rowCount, func(_ int) []float32 {
return lo.RepeatBy(dim, func(_ int) float32 { return rand.Float32() })
})
flatten := lo.Flatten(vectors)
return tss, &msgstream.InsertMsg{
InsertRequest: msgpb.InsertRequest{
SegmentID: segmentID,
Version: msgpb.InsertDataVersion_ColumnBased,
RowIDs: tss,
Timestamps: lo.Map(tss, func(id int64, _ int) uint64 { return uint64(id) }),
FieldsData: []*schemapb.FieldData{
{
FieldId: common.RowIDField, FieldName: common.RowIDFieldName, Type: schemapb.DataType_Int64,
Field: &schemapb.FieldData_Scalars{
Scalars: &schemapb.ScalarField{
Data: &schemapb.ScalarField_LongData{
LongData: &schemapb.LongArray{
Data: tss,
},
},
},
},
},
{
FieldId: common.TimeStampField, FieldName: common.TimeStampFieldName, Type: schemapb.DataType_Int64,
Field: &schemapb.FieldData_Scalars{
Scalars: &schemapb.ScalarField{
Data: &schemapb.ScalarField_LongData{
LongData: &schemapb.LongArray{
Data: tss,
},
},
},
},
},
{
FieldId: common.StartOfUserFieldID, FieldName: "pk", Type: schemapb.DataType_Int64,
Field: &schemapb.FieldData_Scalars{
Scalars: &schemapb.ScalarField{
Data: &schemapb.ScalarField_LongData{
LongData: &schemapb.LongArray{
Data: tss,
},
},
},
},
},
{
FieldId: common.StartOfUserFieldID + 1, FieldName: "vector", Type: schemapb.DataType_FloatVector,
Field: &schemapb.FieldData_Vectors{
Vectors: &schemapb.VectorField{
Dim: int64(dim),
Data: &schemapb.VectorField_FloatVector{
FloatVector: &schemapb.FloatArray{
Data: flatten,
},
},
},
},
},
},
},
}
}
func (s *L0WriteBufferSuite) composeDeleteMsg(pks []storage.PrimaryKey) *msgstream.DeleteMsg {
delMsg := &msgstream.DeleteMsg{
DeleteRequest: msgpb.DeleteRequest{
PrimaryKeys: storage.ParsePrimaryKeys2IDs(pks),
Timestamps: lo.RepeatBy(len(pks), func(idx int) uint64 { return tsoutil.ComposeTSByTime(time.Now(), int64(idx)) }),
},
}
return delMsg
}
func (s *L0WriteBufferSuite) SetupTest() {
s.syncMgr = syncmgr.NewMockSyncManager(s.T())
s.metacache = metacache.NewMockMetaCache(s.T())
s.allocator = allocator.NewMockGIDAllocator()
s.allocator.AllocOneF = func() (int64, error) { return int64(tsoutil.ComposeTSByTime(time.Now(), 0)), nil }
}
func (s *L0WriteBufferSuite) TestBufferData() {
wb, err := NewL0WriteBuffer(s.collSchema, s.metacache, s.syncMgr, &writeBufferOption{
idAllocator: s.allocator,
})
s.NoError(err)
// seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1000}, metacache.NewBloomFilterSet())
// s.metacache.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{seg})
pks, msg := s.composeInsertMsg(1000, 10, 128)
delMsg := s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) }))
err = wb.BufferData([]*msgstream.InsertMsg{msg}, []*msgstream.DeleteMsg{delMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})
s.NoError(err)
}
func TestL0WriteBuffer(t *testing.T) {
suite.Run(t, new(L0WriteBufferSuite))
}

View File

@@ -0,0 +1,105 @@
package writebuffer
import (
"context"
"sync"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/datanode/metacache"
"github.com/milvus-io/milvus/internal/datanode/syncmgr"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/merr"
)
// Manager is the interface for WriteBuffer management.
type Manager interface {
// Register adds a WriteBuffer with provided schema & options.
Register(channel string, schema *schemapb.CollectionSchema, metacache metacache.MetaCache, opts ...WriteBufferOption) error
// FlushSegments notifies the WriteBuffer of the provided channel to flush the given segments.
FlushSegments(ctx context.Context, channel string, segmentIDs []int64) error
// RemoveChannel removes a channel's write buffer from the manager.
RemoveChannel(channel string)
// BufferData puts data into the channel's write buffer.
BufferData(channel string, insertMsgs []*msgstream.InsertMsg, deleteMsgs []*msgstream.DeleteMsg, startPos, endPos *msgpb.MsgPosition) error
}
// NewManager returns an initialized manager as `Manager`.
func NewManager(syncMgr syncmgr.SyncManager) Manager {
return &manager{
syncMgr: syncMgr,
buffers: make(map[string]WriteBuffer),
}
}
type manager struct {
syncMgr syncmgr.SyncManager
buffers map[string]WriteBuffer
mut sync.RWMutex
}
// Register creates a new WriteBuffer for the channel.
func (m *manager) Register(channel string, schema *schemapb.CollectionSchema, metacache metacache.MetaCache, opts ...WriteBufferOption) error {
m.mut.Lock()
defer m.mut.Unlock()
_, ok := m.buffers[channel]
if ok {
return merr.WrapErrChannelReduplicate(channel)
}
buf, err := NewWriteBuffer(schema, metacache, m.syncMgr, opts...)
if err != nil {
return err
}
m.buffers[channel] = buf
return nil
}
// FlushSegments syncs the given segments and changes their state to Flushed.
func (m *manager) FlushSegments(ctx context.Context, channel string, segmentIDs []int64) error {
m.mut.RLock()
buf, ok := m.buffers[channel]
m.mut.RUnlock()
if !ok {
log.Ctx(ctx).Warn("write buffer not found when flush segments",
zap.String("channel", channel),
zap.Int64s("segmentIDs", segmentIDs))
return merr.WrapErrChannelNotFound(channel)
}
return buf.FlushSegments(ctx, segmentIDs)
}
// BufferData puts data into the channel's write buffer.
func (m *manager) BufferData(channel string, insertMsgs []*msgstream.InsertMsg, deleteMsgs []*msgstream.DeleteMsg, startPos, endPos *msgpb.MsgPosition) error {
m.mut.RLock()
buf, ok := m.buffers[channel]
m.mut.RUnlock()
if !ok {
log.Ctx(context.Background()).Warn("write buffer not found when buffer data",
zap.String("channel", channel))
return merr.WrapErrChannelNotFound(channel)
}
return buf.BufferData(insertMsgs, deleteMsgs, startPos, endPos)
}
// RemoveChannel removes the channel's WriteBuffer from the manager.
func (m *manager) RemoveChannel(channel string) {
m.mut.Lock()
buf, ok := m.buffers[channel]
delete(m.buffers, channel)
m.mut.Unlock()
if !ok {
log.Warn("failed to remove channel, channel not maintained in manager", zap.String("channel", channel))
return
}
buf.Close()
}
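A minimal lifecycle sketch for the manager API (the channel name, segment ID, and exampleManagerLifecycle helper are placeholders):

func exampleManagerLifecycle(ctx context.Context, syncMgr syncmgr.SyncManager, schema *schemapb.CollectionSchema, mc metacache.MetaCache) error {
mgr := NewManager(syncMgr)
if err := mgr.Register("by-dev-dml_0_v0", schema, mc); err != nil {
return err
}
// ingest batches via BufferData, then flush and tear down
if err := mgr.FlushSegments(ctx, "by-dev-dml_0_v0", []int64{1001}); err != nil {
return err
}
mgr.RemoveChannel("by-dev-dml_0_v0")
return nil
}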

View File

@@ -0,0 +1,138 @@
package writebuffer
import (
"context"
"testing"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/datanode/metacache"
"github.com/milvus-io/milvus/internal/datanode/syncmgr"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
type ManagerSuite struct {
suite.Suite
channelName string
collSchema *schemapb.CollectionSchema
syncMgr *syncmgr.MockSyncManager
metacache *metacache.MockMetaCache
manager *manager
}
func (s *ManagerSuite) SetupSuite() {
paramtable.Get().Init(paramtable.NewBaseTable())
s.collSchema = &schemapb.CollectionSchema{
Name: "test_collection",
Fields: []*schemapb.FieldSchema{
{
FieldID: 100, Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true,
},
{
FieldID: 101, Name: "vector", DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "128"},
},
},
},
}
s.channelName = "by-dev-rootcoord-dml_0_100_v0"
}
func (s *ManagerSuite) SetupTest() {
s.syncMgr = syncmgr.NewMockSyncManager(s.T())
s.metacache = metacache.NewMockMetaCache(s.T())
mgr := NewManager(s.syncMgr)
var ok bool
s.manager, ok = mgr.(*manager)
s.Require().True(ok)
}
func (s *ManagerSuite) TestRegister() {
manager := s.manager
err := manager.Register(s.channelName, s.collSchema, s.metacache)
s.NoError(err)
err = manager.Register(s.channelName, s.collSchema, s.metacache)
s.Error(err)
s.ErrorIs(err, merr.ErrChannelReduplicate)
}
func (s *ManagerSuite) TestFlushSegments() {
manager := s.manager
s.Run("channel_not_found", func() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err := manager.FlushSegments(ctx, s.channelName, []int64{1, 2, 3})
s.Error(err, "FlushSegments shall return error when channel not found")
})
s.Run("normal_flush", func() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
wb := NewMockWriteBuffer(s.T())
s.manager.mut.Lock()
s.manager.buffers[s.channelName] = wb
s.manager.mut.Unlock()
wb.EXPECT().FlushSegments(mock.Anything, mock.Anything).Return(nil)
err := manager.FlushSegments(ctx, s.channelName, []int64{1})
s.NoError(err)
})
}
func (s *ManagerSuite) TestBufferData() {
manager := s.manager
s.Run("channel_not_found", func() {
err := manager.BufferData(s.channelName, nil, nil, nil, nil)
s.Error(err, "FlushSegments shall return error when channel not found")
})
s.Run("normal_buffer_data", func() {
wb := NewMockWriteBuffer(s.T())
s.manager.mut.Lock()
s.manager.buffers[s.channelName] = wb
s.manager.mut.Unlock()
wb.EXPECT().BufferData(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
err := manager.BufferData(s.channelName, nil, nil, nil, nil)
s.NoError(err)
})
}
func (s *ManagerSuite) TestRemoveChannel() {
manager := NewManager(s.syncMgr)
s.Run("remove_not_exist", func() {
s.NotPanics(func() {
manager.RemoveChannel(s.channelName)
})
})
s.Run("remove_channel", func() {
err := manager.Register(s.channelName, s.collSchema, s.metacache)
s.Require().NoError(err)
s.NotPanics(func() {
manager.RemoveChannel(s.channelName)
})
})
}
func TestManager(t *testing.T) {
suite.Run(t, new(ManagerSuite))
}

View File

@@ -0,0 +1,201 @@
// Code generated by mockery v2.32.4. DO NOT EDIT.
package writebuffer
import (
context "context"
msgpb "github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
mock "github.com/stretchr/testify/mock"
msgstream "github.com/milvus-io/milvus/pkg/mq/msgstream"
)
// MockWriteBuffer is an autogenerated mock type for the WriteBuffer type
type MockWriteBuffer struct {
mock.Mock
}
type MockWriteBuffer_Expecter struct {
mock *mock.Mock
}
func (_m *MockWriteBuffer) EXPECT() *MockWriteBuffer_Expecter {
return &MockWriteBuffer_Expecter{mock: &_m.Mock}
}
// BufferData provides a mock function with given fields: insertMsgs, deleteMsgs, startPos, endPos
func (_m *MockWriteBuffer) BufferData(insertMsgs []*msgstream.InsertMsg, deleteMsgs []*msgstream.DeleteMsg, startPos *msgpb.MsgPosition, endPos *msgpb.MsgPosition) error {
ret := _m.Called(insertMsgs, deleteMsgs, startPos, endPos)
var r0 error
if rf, ok := ret.Get(0).(func([]*msgstream.InsertMsg, []*msgstream.DeleteMsg, *msgpb.MsgPosition, *msgpb.MsgPosition) error); ok {
r0 = rf(insertMsgs, deleteMsgs, startPos, endPos)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockWriteBuffer_BufferData_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'BufferData'
type MockWriteBuffer_BufferData_Call struct {
*mock.Call
}
// BufferData is a helper method to define mock.On call
// - insertMsgs []*msgstream.InsertMsg
// - deleteMsgs []*msgstream.DeleteMsg
// - startPos *msgpb.MsgPosition
// - endPos *msgpb.MsgPosition
func (_e *MockWriteBuffer_Expecter) BufferData(insertMsgs interface{}, deleteMsgs interface{}, startPos interface{}, endPos interface{}) *MockWriteBuffer_BufferData_Call {
return &MockWriteBuffer_BufferData_Call{Call: _e.mock.On("BufferData", insertMsgs, deleteMsgs, startPos, endPos)}
}
func (_c *MockWriteBuffer_BufferData_Call) Run(run func(insertMsgs []*msgstream.InsertMsg, deleteMsgs []*msgstream.DeleteMsg, startPos *msgpb.MsgPosition, endPos *msgpb.MsgPosition)) *MockWriteBuffer_BufferData_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].([]*msgstream.InsertMsg), args[1].([]*msgstream.DeleteMsg), args[2].(*msgpb.MsgPosition), args[3].(*msgpb.MsgPosition))
})
return _c
}
func (_c *MockWriteBuffer_BufferData_Call) Return(_a0 error) *MockWriteBuffer_BufferData_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockWriteBuffer_BufferData_Call) RunAndReturn(run func([]*msgstream.InsertMsg, []*msgstream.DeleteMsg, *msgpb.MsgPosition, *msgpb.MsgPosition) error) *MockWriteBuffer_BufferData_Call {
_c.Call.Return(run)
return _c
}
// Close provides a mock function with given fields:
func (_m *MockWriteBuffer) Close() {
_m.Called()
}
// MockWriteBuffer_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close'
type MockWriteBuffer_Close_Call struct {
*mock.Call
}
// Close is a helper method to define mock.On call
func (_e *MockWriteBuffer_Expecter) Close() *MockWriteBuffer_Close_Call {
return &MockWriteBuffer_Close_Call{Call: _e.mock.On("Close")}
}
func (_c *MockWriteBuffer_Close_Call) Run(run func()) *MockWriteBuffer_Close_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockWriteBuffer_Close_Call) Return() *MockWriteBuffer_Close_Call {
_c.Call.Return()
return _c
}
func (_c *MockWriteBuffer_Close_Call) RunAndReturn(run func()) *MockWriteBuffer_Close_Call {
_c.Call.Return(run)
return _c
}
// FlushSegments provides a mock function with given fields: ctx, segmentIDs
func (_m *MockWriteBuffer) FlushSegments(ctx context.Context, segmentIDs []int64) error {
ret := _m.Called(ctx, segmentIDs)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, []int64) error); ok {
r0 = rf(ctx, segmentIDs)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockWriteBuffer_FlushSegments_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'FlushSegments'
type MockWriteBuffer_FlushSegments_Call struct {
*mock.Call
}
// FlushSegments is a helper method to define mock.On call
// - ctx context.Context
// - segmentIDs []int64
func (_e *MockWriteBuffer_Expecter) FlushSegments(ctx interface{}, segmentIDs interface{}) *MockWriteBuffer_FlushSegments_Call {
return &MockWriteBuffer_FlushSegments_Call{Call: _e.mock.On("FlushSegments", ctx, segmentIDs)}
}
func (_c *MockWriteBuffer_FlushSegments_Call) Run(run func(ctx context.Context, segmentIDs []int64)) *MockWriteBuffer_FlushSegments_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].([]int64))
})
return _c
}
func (_c *MockWriteBuffer_FlushSegments_Call) Return(_a0 error) *MockWriteBuffer_FlushSegments_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockWriteBuffer_FlushSegments_Call) RunAndReturn(run func(context.Context, []int64) error) *MockWriteBuffer_FlushSegments_Call {
_c.Call.Return(run)
return _c
}
// HasSegment provides a mock function with given fields: segmentID
func (_m *MockWriteBuffer) HasSegment(segmentID int64) bool {
ret := _m.Called(segmentID)
var r0 bool
if rf, ok := ret.Get(0).(func(int64) bool); ok {
r0 = rf(segmentID)
} else {
r0 = ret.Get(0).(bool)
}
return r0
}
// MockWriteBuffer_HasSegment_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'HasSegment'
type MockWriteBuffer_HasSegment_Call struct {
*mock.Call
}
// HasSegment is a helper method to define mock.On call
// - segmentID int64
func (_e *MockWriteBuffer_Expecter) HasSegment(segmentID interface{}) *MockWriteBuffer_HasSegment_Call {
return &MockWriteBuffer_HasSegment_Call{Call: _e.mock.On("HasSegment", segmentID)}
}
func (_c *MockWriteBuffer_HasSegment_Call) Run(run func(segmentID int64)) *MockWriteBuffer_HasSegment_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64))
})
return _c
}
func (_c *MockWriteBuffer_HasSegment_Call) Return(_a0 bool) *MockWriteBuffer_HasSegment_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockWriteBuffer_HasSegment_Call) RunAndReturn(run func(int64) bool) *MockWriteBuffer_HasSegment_Call {
_c.Call.Return(run)
return _c
}
// NewMockWriteBuffer creates a new instance of MockWriteBuffer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewMockWriteBuffer(t interface {
mock.TestingT
Cleanup(func())
}) *MockWriteBuffer {
mock := &MockWriteBuffer{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}
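
For reference, a minimal sketch of driving this generated mock from a test, assuming the standard EXPECT() accessor that mockery emits with --with-expecter alongside the expecter types above; the test body itself is illustrative:

package writebuffer

import (
	"testing"

	"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
	"github.com/stretchr/testify/mock"
)

func TestMockWriteBufferSketch(t *testing.T) {
	wb := NewMockWriteBuffer(t)
	// accept any BufferData arguments and report success
	wb.EXPECT().BufferData(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
	wb.EXPECT().HasSegment(int64(100)).Return(true)

	if err := wb.BufferData(nil, nil, &msgpb.MsgPosition{}, &msgpb.MsgPosition{}); err != nil {
		t.Fatal(err)
	}
	if !wb.HasSegment(100) {
		t.Fatal("expected segment 100 to be present")
	}
	// NewMockWriteBuffer already registered a cleanup that asserts all expectations were met
}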

View File

@ -0,0 +1,70 @@
package writebuffer
import (
"time"
"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/datanode/broker"
"github.com/milvus-io/milvus/internal/datanode/metacache"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
const (
// DeletePolicyBFPkOracle is the const config value for using bf pk oracle as delete policy.
DeletePolicyBFPkOracle = `bloom_filter_pkoracle`
// DeletePolicyL0Delta is the const config value for using L0 delta as delete policy.
DeletePolicyL0Delta = `l0_delta`
)
type WriteBufferOption func(opt *writeBufferOption)
type writeBufferOption struct {
deletePolicy string
idAllocator allocator.Interface
syncPolicies []SyncPolicy
pkStatsFactory metacache.PkStatsFactory
broker broker.Broker
}
func defaultWBOption() *writeBufferOption {
return &writeBufferOption{
// TODO use l0 delta as default after implementation.
deletePolicy: paramtable.Get().DataNodeCfg.DeltaPolicy.GetValue(),
syncPolicies: []SyncPolicy{
SyncFullBuffer,
GetSyncStaleBufferPolicy(paramtable.Get().DataNodeCfg.SyncPeriod.GetAsDuration(time.Second)),
},
}
}
func WithDeletePolicy(policy string) WriteBufferOption {
return func(opt *writeBufferOption) {
opt.deletePolicy = policy
}
}
func WithIDAllocator(allocator allocator.Interface) WriteBufferOption {
return func(opt *writeBufferOption) {
opt.idAllocator = allocator
}
}
func WithPKStatsFactory(factory metacache.PkStatsFactory) WriteBufferOption {
return func(opt *writeBufferOption) {
opt.pkStatsFactory = factory
}
}
func WithBroker(broker broker.Broker) WriteBufferOption {
return func(opt *writeBufferOption) {
opt.broker = broker
}
}
func WithSyncPolicy(policy SyncPolicy) WriteBufferOption {
return func(opt *writeBufferOption) {
opt.syncPolicies = append(opt.syncPolicies, policy)
}
}
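
Composed together, the options look like this at a call site; a sketch, where alloc and bkr stand in for a real allocator.Interface and broker.Broker:

// alloc and bkr are placeholders for a real allocator and broker
option := defaultWBOption()
for _, opt := range []WriteBufferOption{
	WithDeletePolicy(DeletePolicyBFPkOracle),
	WithIDAllocator(alloc),
	WithBroker(bkr),
	// WithSyncPolicy appends, so the default full-buffer and stale-buffer policies stay active
	WithSyncPolicy(GetSyncStaleBufferPolicy(10 * time.Minute)),
} {
	opt(option)
}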

View File

@ -0,0 +1,56 @@
package writebuffer
import (
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
type segmentBuffer struct {
segmentID int64
insertBuffer *InsertBuffer
deltaBuffer *DeltaBuffer
flushing bool
}
func newSegmentBuffer(segmentID int64, collSchema *schemapb.CollectionSchema) (*segmentBuffer, error) {
insertBuffer, err := NewInsertBuffer(collSchema)
if err != nil {
return nil, err
}
return &segmentBuffer{
segmentID: segmentID,
insertBuffer: insertBuffer,
deltaBuffer: NewDeltaBuffer(),
}, nil
}
func (buf *segmentBuffer) IsFull() bool {
return buf.insertBuffer.IsFull() || buf.deltaBuffer.IsFull()
}
func (buf *segmentBuffer) Renew() (insert *storage.InsertData, delete *storage.DeleteData) {
return buf.insertBuffer.Renew(), buf.deltaBuffer.Renew()
}
func (buf *segmentBuffer) SetFlush() {
buf.flushing = true
}
func (buf *segmentBuffer) MinTimestamp() typeutil.Timestamp {
insertTs := buf.insertBuffer.MinTimestamp()
deltaTs := buf.deltaBuffer.MinTimestamp()
if insertTs < deltaTs {
return insertTs
}
return deltaTs
}
// TimeRange is a range of timestamps containing the min-timestamp and max-timestamp
type TimeRange struct {
timestampMin typeutil.Timestamp
timestampMax typeutil.Timestamp
}
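
A short sketch of the intended lifecycle, assuming a collSchema is in scope: a sync policy consults IsFull/MinTimestamp, and a sync cycle drains the buffers with Renew:

buf, err := newSegmentBuffer(1001, collSchema) // collSchema is a placeholder schema
if err != nil {
	panic(err)
}
// MinTimestamp reports the older of the insert and delta start positions,
// so stale-buffer policies see the earliest unsynced data
_ = buf.MinTimestamp()
if buf.IsFull() {
	// Renew hands back the accumulated data and resets both buffers
	insert, delta := buf.Renew()
	_, _ = insert, delta
}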

View File

@ -0,0 +1,38 @@
package writebuffer
import (
"time"
"github.com/samber/lo"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/datanode/metacache"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
type SyncPolicy func(buffers []*segmentBuffer, ts typeutil.Timestamp) []int64
func SyncFullBuffer(buffers []*segmentBuffer, _ typeutil.Timestamp) []int64 {
return lo.FilterMap(buffers, func(buf *segmentBuffer, _ int) (int64, bool) {
return buf.segmentID, buf.IsFull()
})
}
func GetSyncStaleBufferPolicy(staleDuration time.Duration) SyncPolicy {
return func(buffers []*segmentBuffer, ts typeutil.Timestamp) []int64 {
current := tsoutil.PhysicalTime(ts)
return lo.FilterMap(buffers, func(buf *segmentBuffer, _ int) (int64, bool) {
minTs := buf.MinTimestamp()
start := tsoutil.PhysicalTime(minTs)
return buf.segmentID, current.Sub(start) > staleDuration
})
}
}
func GetFlushingSegmentsPolicy(meta metacache.MetaCache) SyncPolicy {
return func(_ []*segmentBuffer, _ typeutil.Timestamp) []int64 {
return meta.GetSegmentIDsBy(metacache.WithSegmentState(commonpb.SegmentState_Flushing))
}
}
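
Because SyncPolicy is a plain function type, additional criteria can be plugged in through WithSyncPolicy. A hedged sketch of a hypothetical policy that syncs every buffered segment once the number of live buffers crosses a threshold:

// hypothetical policy: sync all buffered segments when too many buffers accumulate
func GetMaxBufferCountPolicy(threshold int) SyncPolicy {
	return func(buffers []*segmentBuffer, _ typeutil.Timestamp) []int64 {
		if len(buffers) <= threshold {
			return nil
		}
		return lo.Map(buffers, func(buf *segmentBuffer, _ int) int64 {
			return buf.segmentID
		})
	}
}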

View File

@ -0,0 +1,80 @@
package writebuffer
import (
"testing"
"time"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/datanode/metacache"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
)
type SyncPolicySuite struct {
suite.Suite
collSchema *schemapb.CollectionSchema
}
func (s *SyncPolicySuite) SetupSuite() {
paramtable.Get().Init(paramtable.NewBaseTable())
s.collSchema = &schemapb.CollectionSchema{
Name: "wb_base_collection",
Fields: []*schemapb.FieldSchema{
{FieldID: 100, DataType: schemapb.DataType_Int64, IsPrimaryKey: true, Name: "pk"},
{FieldID: 101, DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "128"},
}},
},
}
}
func (s *SyncPolicySuite) TestSyncFullBuffer() {
buffer, err := newSegmentBuffer(100, s.collSchema)
s.Require().NoError(err)
ids := SyncFullBuffer([]*segmentBuffer{buffer}, 0)
s.Equal(0, len(ids), "empty buffer shall not be synced")
buffer.insertBuffer.rows = buffer.insertBuffer.rowLimit + 1
ids = SyncFullBuffer([]*segmentBuffer{buffer}, 0)
s.ElementsMatch([]int64{100}, ids)
}
func (s *SyncPolicySuite) TestSyncStalePolicy() {
policy := GetSyncStaleBufferPolicy(time.Minute)
buffer, err := newSegmentBuffer(100, s.collSchema)
s.Require().NoError(err)
ids := policy([]*segmentBuffer{buffer}, tsoutil.ComposeTSByTime(time.Now(), 0))
s.Equal(0, len(ids), "empty buffer shall not be synced")
buffer.insertBuffer.startPos = &msgpb.MsgPosition{
Timestamp: tsoutil.ComposeTSByTime(time.Now().Add(-time.Minute*2), 0),
}
ids = policy([]*segmentBuffer{buffer}, tsoutil.ComposeTSByTime(time.Now(), 0))
s.ElementsMatch([]int64{100}, ids)
}
func (s *SyncPolicySuite) TestFlushingSegmentsPolicy() {
metacache := metacache.NewMockMetaCache(s.T())
policy := GetFlushingSegmentsPolicy(metacache)
ids := []int64{1, 2, 3}
metacache.EXPECT().GetSegmentIDsBy(mock.Anything).Return(ids)
result := policy([]*segmentBuffer{}, tsoutil.ComposeTSByTime(time.Now(), 0))
s.ElementsMatch(ids, result)
}
func TestSyncPolicy(t *testing.T) {
suite.Run(t, new(SyncPolicySuite))
}

View File

@ -0,0 +1,226 @@
package writebuffer
import (
"context"
"sync"
"github.com/samber/lo"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/datanode/broker"
"github.com/milvus-io/milvus/internal/datanode/metacache"
"github.com/milvus-io/milvus/internal/datanode/syncmgr"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
// WriteBuffer is the interface for the channel write buffer.
// It provides an abstraction over channel write buffering and encapsulates the pk bloom filter & L0 delta delete-policy logic.
type WriteBuffer interface {
// HasSegment checks whether a certain segment exists in this buffer.
HasSegment(segmentID int64) bool
// BufferData buffers the insert and delete messages of a DML batch along with their stream positions.
BufferData(insertMsgs []*msgstream.InsertMsg, deleteMsgs []*msgstream.DeleteMsg, startPos, endPos *msgpb.MsgPosition) error
// FlushSegments performs the `Sync` operation for the provided segment IDs.
FlushSegments(ctx context.Context, segmentIDs []int64) error
// Close closes the buffer and sinks the remaining buffered data.
Close()
}
func NewWriteBuffer(schema *schemapb.CollectionSchema, metacache metacache.MetaCache, syncMgr syncmgr.SyncManager, opts ...WriteBufferOption) (WriteBuffer, error) {
option := defaultWBOption()
option.syncPolicies = append(option.syncPolicies, GetFlushingSegmentsPolicy(metacache))
for _, opt := range opts {
opt(option)
}
switch option.deletePolicy {
case DeletePolicyBFPkOracle:
return NewBFWriteBuffer(schema, metacache, syncMgr, option)
case DeletePolicyL0Delta:
return NewL0WriteBuffer(schema, metacache, syncMgr, option)
default:
return nil, merr.WrapErrParameterInvalid("valid delete policy config", option.deletePolicy)
}
}
// writeBufferBase is the common component for buffering data
type writeBufferBase struct {
mut sync.RWMutex
collectionID int64
collSchema *schemapb.CollectionSchema
metaCache metacache.MetaCache
syncMgr syncmgr.SyncManager
broker broker.Broker
buffers map[int64]*segmentBuffer // segmentID => segmentBuffer
syncPolicies []SyncPolicy
checkpoint *msgpb.MsgPosition
}
func newWriteBufferBase(sch *schemapb.CollectionSchema, metacache metacache.MetaCache, syncMgr syncmgr.SyncManager, option *writeBufferOption) *writeBufferBase {
return &writeBufferBase{
collSchema: sch,
syncMgr: syncMgr,
broker: option.broker,
buffers: make(map[int64]*segmentBuffer),
metaCache: metacache,
syncPolicies: option.syncPolicies,
}
}
func (wb *writeBufferBase) HasSegment(segmentID int64) bool {
wb.mut.RLock()
defer wb.mut.RUnlock()
_, ok := wb.buffers[segmentID]
return ok
}
func (wb *writeBufferBase) FlushSegments(ctx context.Context, segmentIDs []int64) error {
wb.mut.RLock()
defer wb.mut.RUnlock()
return wb.flushSegments(ctx, segmentIDs)
}
func (wb *writeBufferBase) triggerAutoSync() error {
segmentsToSync := wb.getSegmentsToSync(wb.checkpoint.GetTimestamp())
if len(segmentsToSync) > 0 {
log.Info("write buffer get segments to sync", zap.Int64s("segmentIDs", segmentsToSync))
err := wb.syncSegments(context.Background(), segmentsToSync)
if err != nil {
log.Warn("segment segments failed", zap.Int64s("segmentIDs", segmentsToSync), zap.Error(err))
return err
}
}
return nil
}
func (wb *writeBufferBase) flushSegments(ctx context.Context, segmentIDs []int64) error {
// mark segments as flushing if they were growing
wb.metaCache.UpdateSegments(metacache.UpdateState(commonpb.SegmentState_Flushing),
metacache.WithSegmentIDs(segmentIDs...),
metacache.WithSegmentState(commonpb.SegmentState_Growing))
return nil
}
func (wb *writeBufferBase) syncSegments(ctx context.Context, segmentIDs []int64) error {
log := log.Ctx(ctx)
for _, segmentID := range segmentIDs {
infos := wb.metaCache.GetSegmentsBy(metacache.WithSegmentIDs(segmentID))
if len(infos) == 0 {
log.Warn("segment info not found in meta cache", zap.Int64("segmentID", segmentID))
continue
}
segmentInfo := infos[0]
buffer, exist := wb.getBuffer(segmentID)
var insert *storage.InsertData
var delta *storage.DeleteData
if exist {
insert, delta = buffer.Renew()
}
wb.metaCache.UpdateSegments(metacache.RollStats(), metacache.WithSegmentIDs(segmentID))
syncTask := syncmgr.NewSyncTask().
WithInsertData(insert).
WithDeleteData(delta).
WithCollectionID(wb.collectionID).
WithPartitionID(segmentInfo.PartitionID()).
WithSegmentID(segmentID).
WithCheckpoint(wb.checkpoint).
WithSchema(wb.collSchema).
WithMetaWriter(syncmgr.BrokerMetaWriter(wb.broker)).
WithFailureCallback(func(err error) {
// TODO could change to unsub channel in the future
panic(err)
})
// update flush& drop state
switch segmentInfo.State() {
case commonpb.SegmentState_Flushing:
syncTask.WithFlush()
case commonpb.SegmentState_Dropped:
syncTask.WithDrop()
}
err := wb.syncMgr.SyncData(ctx, syncTask)
if err != nil {
return err
}
}
return nil
}
// getSegmentsToSync applies all sync policies to get the list of segments to sync.
// **NOTE** shall be invoked under mutex protection
func (wb *writeBufferBase) getSegmentsToSync(ts typeutil.Timestamp) []int64 {
buffers := lo.Values(wb.buffers)
segments := typeutil.NewSet[int64]()
for _, policy := range wb.syncPolicies {
segments.Insert(policy(buffers, ts)...)
}
return segments.Collect()
}
func (wb *writeBufferBase) getOrCreateBuffer(segmentID int64) *segmentBuffer {
buffer, ok := wb.buffers[segmentID]
if !ok {
var err error
buffer, err = newSegmentBuffer(segmentID, wb.collSchema)
if err != nil {
// TODO avoid panic here
panic(err)
}
wb.buffers[segmentID] = buffer
}
return buffer
}
func (wb *writeBufferBase) getBuffer(segmentID int64) (*segmentBuffer, bool) {
buffer, ok := wb.buffers[segmentID]
return buffer, ok
}
// bufferInsert transforms InsertMsgs into buffered InsertData and returns the primary key field data for later use.
func (wb *writeBufferBase) bufferInsert(insertMsgs []*msgstream.InsertMsg, startPos, endPos *msgpb.MsgPosition) (map[int64][]storage.FieldData, error) {
insertGroups := lo.GroupBy(insertMsgs, func(msg *msgstream.InsertMsg) int64 { return msg.GetSegmentID() })
segmentPKData := make(map[int64][]storage.FieldData)
for segmentID, msgs := range insertGroups {
segBuf := wb.getOrCreateBuffer(segmentID)
pkData, err := segBuf.insertBuffer.Buffer(msgs, startPos, endPos)
if err != nil {
log.Warn("failed to buffer insert data", zap.Int64("segmentID", segmentID), zap.Error(err))
return nil, err
}
segmentPKData[segmentID] = pkData
}
return segmentPKData, nil
}
// bufferDelete buffers DeleteMsg into DeleteData.
func (wb *writeBufferBase) bufferDelete(segmentID int64, pks []storage.PrimaryKey, tss []typeutil.Timestamp, startPos, endPos *msgpb.MsgPosition) error {
segBuf := wb.getOrCreateBuffer(segmentID)
segBuf.deltaBuffer.Buffer(pks, tss, startPos, endPos)
return nil
}
func (wb *writeBufferBase) Close() {
}
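
Putting the pieces together, a hedged sketch of how a flowgraph node might construct and drive a WriteBuffer; cache, syncMgr, bkr, the message slices, and the positions are placeholders built elsewhere:

wb, err := NewWriteBuffer(collSchema, cache, syncMgr,
	WithDeletePolicy(DeletePolicyBFPkOracle),
	WithBroker(bkr),
)
if err != nil {
	return err
}
defer wb.Close()

// buffer one batch of DML messages with its stream positions
if err := wb.BufferData(insertMsgs, deleteMsgs, startPos, endPos); err != nil {
	return err
}
// force a sync for segments the coordinator asked to flush
if err := wb.FlushSegments(ctx, []int64{segmentID}); err != nil {
	return err
}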

View File

@ -0,0 +1,91 @@
package writebuffer
import (
"context"
"testing"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/datanode/metacache"
"github.com/milvus-io/milvus/internal/datanode/syncmgr"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
type WriteBufferSuite struct {
suite.Suite
collSchema *schemapb.CollectionSchema
wb *writeBufferBase
syncMgr *syncmgr.MockSyncManager
metacache *metacache.MockMetaCache
}
func (s *WriteBufferSuite) SetupSuite() {
paramtable.Get().Init(paramtable.NewBaseTable())
s.collSchema = &schemapb.CollectionSchema{
Name: "wb_base_collection",
Fields: []*schemapb.FieldSchema{
{FieldID: 100, DataType: schemapb.DataType_Int64, IsPrimaryKey: true, Name: "pk"},
{FieldID: 101, DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "128"},
}},
},
}
}
func (s *WriteBufferSuite) SetupTest() {
s.syncMgr = syncmgr.NewMockSyncManager(s.T())
s.metacache = metacache.NewMockMetaCache(s.T())
s.wb = newWriteBufferBase(s.collSchema, s.metacache, s.syncMgr, &writeBufferOption{
pkStatsFactory: func(vchannel *datapb.SegmentInfo) *metacache.BloomFilterSet {
return metacache.NewBloomFilterSet()
},
})
}
func (s *WriteBufferSuite) TestWriteBufferType() {
wb, err := NewWriteBuffer(s.collSchema, s.metacache, s.syncMgr, WithDeletePolicy(DeletePolicyBFPkOracle))
s.NoError(err)
_, ok := wb.(*bfWriteBuffer)
s.True(ok)
wb, err = NewWriteBuffer(s.collSchema, s.metacache, s.syncMgr, WithDeletePolicy(DeletePolicyL0Delta), WithIDAllocator(allocator.NewMockGIDAllocator()))
s.NoError(err)
_, ok = wb.(*l0WriteBuffer)
s.True(ok)
_, err = NewWriteBuffer(s.collSchema, s.metacache, s.syncMgr, WithDeletePolicy(""))
s.Error(err)
}
func (s *WriteBufferSuite) TestHasSegment() {
segmentID := int64(1001)
s.False(s.wb.HasSegment(segmentID))
s.wb.getOrCreateBuffer(segmentID)
s.True(s.wb.HasSegment(segmentID))
}
func (s *WriteBufferSuite) TestFlushSegments() {
segmentID := int64(1001)
s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything, mock.Anything)
wb, err := NewWriteBuffer(s.collSchema, s.metacache, s.syncMgr, WithDeletePolicy(DeletePolicyBFPkOracle))
s.NoError(err)
err = wb.FlushSegments(context.Background(), []int64{segmentID})
s.NoError(err)
}
func TestWriteBufferBase(t *testing.T) {
suite.Run(t, new(WriteBufferSuite))
}

View File

@ -525,6 +525,14 @@ func WrapErrSegmentNotFound(id int64, msg ...string) error {
return err
}
func WrapErrSegmentsNotFound(ids []int64, msg ...string) error {
err := wrapWithField(ErrSegmentNotFound, "segments", ids)
if len(msg) > 0 {
err = errors.Wrap(err, strings.Join(msg, "; "))
}
return err
}
func WrapErrSegmentNotLoaded(id int64, msg ...string) error {
err := wrapWithField(ErrSegmentNotLoaded, "segment", id)
if len(msg) > 0 {

View File

@ -2419,6 +2419,7 @@ type dataNodeConfig struct {
FlushDeleteBufferBytes ParamItem `refreshable:"true"`
BinLogMaxSize ParamItem `refreshable:"true"`
SyncPeriod ParamItem `refreshable:"true"`
DeltaPolicy ParamItem `refreshable:"false"`
// watchEvent
WatchEventTicklerInterval ParamItem `refreshable:"false"`
@ -2548,6 +2549,15 @@ func (p *dataNodeConfig) init(base *BaseTable) {
}
p.SyncPeriod.Init(base.mgr)
p.DeltaPolicy = ParamItem{
Key: "dataNode.segment.deltaPolicy",
Version: "2.3.4",
DefaultValue: "bloom_filter_pkoracle",
Doc: "the delta policy current datanode using",
Export: true,
}
p.DeltaPolicy.Init(base.mgr)
p.WatchEventTicklerInterval = ParamItem{
Key: "datanode.segment.watchEventTicklerInterval",
Version: "2.2.3",