From bf2f62c1e7231bb2f7f3f27c5807439eb63b6918 Mon Sep 17 00:00:00 2001 From: congqixia Date: Sat, 4 Nov 2023 12:10:17 +0800 Subject: [PATCH] Add `WriteBuffer` to provide abstraction for delta policy (#27874) Signed-off-by: Congqi Xia --- Makefile | 2 + internal/datanode/metacache/actions.go | 6 +- internal/datanode/metacache/actions_test.go | 2 +- internal/datanode/metacache/meta_cache.go | 11 +- .../datanode/metacache/meta_cache_test.go | 6 +- .../datanode/metacache/mock_meta_cache.go | 37 ++- internal/datanode/metacache/segment.go | 4 + internal/datanode/syncmgr/meta_writer.go | 6 +- .../datanode/syncmgr/mock_sync_manager.go | 145 +++++++++ internal/datanode/syncmgr/options.go | 5 + internal/datanode/syncmgr/task.go | 5 +- internal/datanode/syncmgr/task_test.go | 1 + .../datanode/writebuffer/bf_write_buffer.go | 73 +++++ .../writebuffer/bf_write_buffer_test.go | 182 +++++++++++ internal/datanode/writebuffer/delta_buffer.go | 80 +++++ .../datanode/writebuffer/delta_buffer_test.go | 66 ++++ .../datanode/writebuffer/insert_buffer.go | 171 ++++++++++ .../writebuffer/insert_buffer_test.go | 294 ++++++++++++++++++ .../datanode/writebuffer/l0_write_buffer.go | 83 +++++ .../writebuffer/l0_write_buffer_test.go | 158 ++++++++++ internal/datanode/writebuffer/manager.go | 105 +++++++ internal/datanode/writebuffer/manager_test.go | 138 ++++++++ .../datanode/writebuffer/mock_write_buffer.go | 201 ++++++++++++ internal/datanode/writebuffer/options.go | 70 +++++ .../datanode/writebuffer/segment_buffer.go | 56 ++++ internal/datanode/writebuffer/sync_policy.go | 38 +++ .../datanode/writebuffer/sync_policy_test.go | 80 +++++ internal/datanode/writebuffer/write_buffer.go | 226 ++++++++++++++ .../datanode/writebuffer/write_buffer_test.go | 91 ++++++ pkg/util/merr/utils.go | 8 + pkg/util/paramtable/component_param.go | 10 + 31 files changed, 2337 insertions(+), 23 deletions(-) create mode 100644 internal/datanode/syncmgr/mock_sync_manager.go create mode 100644 internal/datanode/writebuffer/bf_write_buffer.go create mode 100644 internal/datanode/writebuffer/bf_write_buffer_test.go create mode 100644 internal/datanode/writebuffer/delta_buffer.go create mode 100644 internal/datanode/writebuffer/delta_buffer_test.go create mode 100644 internal/datanode/writebuffer/insert_buffer.go create mode 100644 internal/datanode/writebuffer/insert_buffer_test.go create mode 100644 internal/datanode/writebuffer/l0_write_buffer.go create mode 100644 internal/datanode/writebuffer/l0_write_buffer_test.go create mode 100644 internal/datanode/writebuffer/manager.go create mode 100644 internal/datanode/writebuffer/manager_test.go create mode 100644 internal/datanode/writebuffer/mock_write_buffer.go create mode 100644 internal/datanode/writebuffer/options.go create mode 100644 internal/datanode/writebuffer/segment_buffer.go create mode 100644 internal/datanode/writebuffer/sync_policy.go create mode 100644 internal/datanode/writebuffer/sync_policy_test.go create mode 100644 internal/datanode/writebuffer/write_buffer.go create mode 100644 internal/datanode/writebuffer/write_buffer_test.go diff --git a/Makefile b/Makefile index d30a0a7336..27ff5f882e 100644 --- a/Makefile +++ b/Makefile @@ -430,6 +430,8 @@ generate-mockery-datanode: getdeps $(INSTALL_PATH)/mockery --name=Allocator --dir=$(PWD)/internal/datanode/allocator --output=$(PWD)/internal/datanode/allocator --filename=mock_allocator.go --with-expecter --structname=MockAllocator --outpkg=allocator --inpackage $(INSTALL_PATH)/mockery --name=Broker 
--dir=$(PWD)/internal/datanode/broker --output=$(PWD)/internal/datanode/broker/ --filename=mock_broker.go --with-expecter --structname=MockBroker --outpkg=broker --inpackage $(INSTALL_PATH)/mockery --name=MetaCache --dir=$(PWD)/internal/datanode/metacache --output=$(PWD)/internal/datanode/metacache --filename=mock_meta_cache.go --with-expecter --structname=MockMetaCache --outpkg=metacache --inpackage + $(INSTALL_PATH)/mockery --name=SyncManager --dir=$(PWD)/internal/datanode/syncmgr --output=$(PWD)/internal/datanode/syncmgr --filename=mock_sync_manager.go --with-expecter --structname=MockSyncManager --outpkg=syncmgr --inpackage + $(INSTALL_PATH)/mockery --name=WriteBuffer --dir=$(PWD)/internal/datanode/writebuffer --output=$(PWD)/internal/datanode/writebuffer --filename=mock_write_buffer.go --with-expecter --structname=MockWriteBuffer --outpkg=writebuffer --inpackage generate-mockery-metastore: getdeps $(INSTALL_PATH)/mockery --name=RootCoordCatalog --dir=$(PWD)/internal/metastore --output=$(PWD)/internal/metastore/mocks --filename=mock_rootcoord_catalog.go --with-expecter --structname=RootCoordCatalog --outpkg=mocks diff --git a/internal/datanode/metacache/actions.go b/internal/datanode/metacache/actions.go index d776a5831d..c9cecc4f76 100644 --- a/internal/datanode/metacache/actions.go +++ b/internal/datanode/metacache/actions.go @@ -19,6 +19,7 @@ package metacache import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" + "github.com/milvus-io/milvus/pkg/util/typeutil" ) type SegmentFilter func(info *SegmentInfo) bool @@ -29,9 +30,10 @@ func WithPartitionID(partitionID int64) SegmentFilter { } } -func WithSegmentID(segmentID int64) SegmentFilter { +func WithSegmentIDs(segmentIDs ...int64) SegmentFilter { + set := typeutil.NewSet[int64](segmentIDs...) 
return func(info *SegmentInfo) bool { - return info.segmentID == segmentID + return set.Contain(info.segmentID) } } diff --git a/internal/datanode/metacache/actions_test.go b/internal/datanode/metacache/actions_test.go index c234384dc2..29cafd29c3 100644 --- a/internal/datanode/metacache/actions_test.go +++ b/internal/datanode/metacache/actions_test.go @@ -40,7 +40,7 @@ func (s *SegmentFilterSuite) TestFilters() { s.True(filter(info)) segmentID := int64(10001) - filter = WithSegmentID(segmentID) + filter = WithSegmentIDs(segmentID) info.segmentID = segmentID + 1 s.False(filter(info)) info.segmentID = segmentID diff --git a/internal/datanode/metacache/meta_cache.go b/internal/datanode/metacache/meta_cache.go index d3f4c83138..b06c1ab724 100644 --- a/internal/datanode/metacache/meta_cache.go +++ b/internal/datanode/metacache/meta_cache.go @@ -23,12 +23,13 @@ import ( "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/log" ) type MetaCache interface { - NewSegment(segmentID, partitionID int64) + NewSegment(segmentID, partitionID int64, startPos *msgpb.MsgPosition, actions ...SegmentAction) UpdateSegments(action SegmentAction, filters ...SegmentFilter) CompactSegments(newSegmentID, partitionID int64, oldSegmentIDs ...int64) GetSegmentsBy(filters ...SegmentFilter) []*SegmentInfo @@ -67,17 +68,21 @@ func (c *metaCacheImpl) init(vchannel *datapb.VchannelInfo, factory PkStatsFacto } } -func (c *metaCacheImpl) NewSegment(segmentID, partitionID int64) { +func (c *metaCacheImpl) NewSegment(segmentID, partitionID int64, startPos *msgpb.MsgPosition, actions ...SegmentAction) { c.mu.Lock() defer c.mu.Unlock() if _, ok := c.segmentInfos[segmentID]; !ok { - c.segmentInfos[segmentID] = &SegmentInfo{ + info := &SegmentInfo{ segmentID: segmentID, partitionID: partitionID, state: commonpb.SegmentState_Growing, startPosRecorded: false, } + for _, action := range actions { + action(info) + } + c.segmentInfos[segmentID] = info } } diff --git a/internal/datanode/metacache/meta_cache_test.go b/internal/datanode/metacache/meta_cache_test.go index ea6e91d4ed..f1f71a7367 100644 --- a/internal/datanode/metacache/meta_cache_test.go +++ b/internal/datanode/metacache/meta_cache_test.go @@ -81,7 +81,7 @@ func (s *MetaCacheSuite) SetupTest() { func (s *MetaCacheSuite) TestNewSegment() { for i, seg := range s.newSegments { - s.cache.NewSegment(seg, s.partitionIDs[i]) + s.cache.NewSegment(seg, s.partitionIDs[i], nil, UpdateNumOfRows(100)) } for id, partitionID := range s.partitionIDs { @@ -110,8 +110,8 @@ func (s *MetaCacheSuite) TestCompactSegments() { } func (s *MetaCacheSuite) TestUpdateSegments() { - s.cache.UpdateSegments(UpdateState(commonpb.SegmentState_Flushed), WithSegmentID(5)) - segments := s.cache.GetSegmentsBy(WithSegmentID(5)) + s.cache.UpdateSegments(UpdateState(commonpb.SegmentState_Flushed), WithSegmentIDs(5)) + segments := s.cache.GetSegmentsBy(WithSegmentIDs(5)) s.Require().Equal(1, len(segments)) segment := segments[0] s.Equal(commonpb.SegmentState_Flushed, segment.State()) diff --git a/internal/datanode/metacache/mock_meta_cache.go b/internal/datanode/metacache/mock_meta_cache.go index 62f499d190..fd7f1f1a6f 100644 --- a/internal/datanode/metacache/mock_meta_cache.go +++ b/internal/datanode/metacache/mock_meta_cache.go @@ -2,7 +2,10 @@ package metacache -import mock "github.com/stretchr/testify/mock" +import ( + msgpb 
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb" + mock "github.com/stretchr/testify/mock" +) // MockMetaCache is an autogenerated mock type for the MetaCache type type MockMetaCache struct { @@ -180,9 +183,16 @@ func (_c *MockMetaCache_GetSegmentsBy_Call) RunAndReturn(run func(...SegmentFilt return _c } -// NewSegment provides a mock function with given fields: segmentID, partitionID -func (_m *MockMetaCache) NewSegment(segmentID int64, partitionID int64) { - _m.Called(segmentID, partitionID) +// NewSegment provides a mock function with given fields: segmentID, partitionID, startPos, actions +func (_m *MockMetaCache) NewSegment(segmentID int64, partitionID int64, startPos *msgpb.MsgPosition, actions ...SegmentAction) { + _va := make([]interface{}, len(actions)) + for _i := range actions { + _va[_i] = actions[_i] + } + var _ca []interface{} + _ca = append(_ca, segmentID, partitionID, startPos) + _ca = append(_ca, _va...) + _m.Called(_ca...) } // MockMetaCache_NewSegment_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'NewSegment' @@ -193,13 +203,22 @@ type MockMetaCache_NewSegment_Call struct { // NewSegment is a helper method to define mock.On call // - segmentID int64 // - partitionID int64 -func (_e *MockMetaCache_Expecter) NewSegment(segmentID interface{}, partitionID interface{}) *MockMetaCache_NewSegment_Call { - return &MockMetaCache_NewSegment_Call{Call: _e.mock.On("NewSegment", segmentID, partitionID)} +// - startPos *msgpb.MsgPosition +// - actions ...SegmentAction +func (_e *MockMetaCache_Expecter) NewSegment(segmentID interface{}, partitionID interface{}, startPos interface{}, actions ...interface{}) *MockMetaCache_NewSegment_Call { + return &MockMetaCache_NewSegment_Call{Call: _e.mock.On("NewSegment", + append([]interface{}{segmentID, partitionID, startPos}, actions...)...)} } -func (_c *MockMetaCache_NewSegment_Call) Run(run func(segmentID int64, partitionID int64)) *MockMetaCache_NewSegment_Call { +func (_c *MockMetaCache_NewSegment_Call) Run(run func(segmentID int64, partitionID int64, startPos *msgpb.MsgPosition, actions ...SegmentAction)) *MockMetaCache_NewSegment_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(int64), args[1].(int64)) + variadicArgs := make([]SegmentAction, len(args)-3) + for i, a := range args[3:] { + if a != nil { + variadicArgs[i] = a.(SegmentAction) + } + } + run(args[0].(int64), args[1].(int64), args[2].(*msgpb.MsgPosition), variadicArgs...) 
}) return _c } @@ -209,7 +228,7 @@ func (_c *MockMetaCache_NewSegment_Call) Return() *MockMetaCache_NewSegment_Call return _c } -func (_c *MockMetaCache_NewSegment_Call) RunAndReturn(run func(int64, int64)) *MockMetaCache_NewSegment_Call { +func (_c *MockMetaCache_NewSegment_Call) RunAndReturn(run func(int64, int64, *msgpb.MsgPosition, ...SegmentAction)) *MockMetaCache_NewSegment_Call { _c.Call.Return(run) return _c } diff --git a/internal/datanode/metacache/segment.go b/internal/datanode/metacache/segment.go index 803a1e2729..0777230409 100644 --- a/internal/datanode/metacache/segment.go +++ b/internal/datanode/metacache/segment.go @@ -67,6 +67,10 @@ func (s *SegmentInfo) CompactTo() int64 { return s.compactTo } +func (s *SegmentInfo) GetBloomFilterSet() *BloomFilterSet { + return s.bfs +} + func (s *SegmentInfo) Clone() *SegmentInfo { return &SegmentInfo{ segmentID: s.segmentID, diff --git a/internal/datanode/syncmgr/meta_writer.go b/internal/datanode/syncmgr/meta_writer.go index c38abb0757..47a8bfdde2 100644 --- a/internal/datanode/syncmgr/meta_writer.go +++ b/internal/datanode/syncmgr/meta_writer.go @@ -51,7 +51,7 @@ func (b *brokerMetaWriter) UpdateSync(pack *SyncTask) error { deltaInfos[0] = &datapb.FieldBinlog{Binlogs: []*datapb.Binlog{pack.deltaBinlog}} // only current segment checkpoint info, - segments := pack.metacache.GetSegmentsBy(metacache.WithSegmentID(pack.segmentID)) + segments := pack.metacache.GetSegmentsBy(metacache.WithSegmentIDs(pack.segmentID)) if len(segments) == 0 { return merr.WrapErrSegmentNotFound(pack.segmentID) } @@ -96,8 +96,8 @@ func (b *brokerMetaWriter) UpdateSync(pack *SyncTask) error { StartPositions: startPos, Flushed: pack.isFlush, - // Dropped: pack.option.isDrop, - Channel: pack.channelName, + Dropped: pack.isDrop, + Channel: pack.channelName, } err := retry.Do(context.Background(), func() error { err := b.broker.SaveBinlogPaths(context.Background(), req) diff --git a/internal/datanode/syncmgr/mock_sync_manager.go b/internal/datanode/syncmgr/mock_sync_manager.go new file mode 100644 index 0000000000..1187760096 --- /dev/null +++ b/internal/datanode/syncmgr/mock_sync_manager.go @@ -0,0 +1,145 @@ +// Code generated by mockery v2.32.4. DO NOT EDIT. 
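For context, the MockSyncManager generated in this file is driven through its EXPECT() helpers by the new writebuffer tests further down in this patch (bf_write_buffer_test.go stubs SyncData to return nil). A minimal sketch of that pattern follows; the test function name and the direct SyncData invocation are illustrative assumptions, while the constructor and expecter calls mirror identifiers that appear in this diff.

package syncmgr_test

import (
	"context"
	"testing"

	"github.com/stretchr/testify/mock"

	"github.com/milvus-io/milvus/internal/datanode/syncmgr"
)

// Sketch only: exercises the expecter-style API of the generated mock the same
// way the writebuffer tests in this patch do.
func TestMockSyncManagerSketch(t *testing.T) {
	syncMgr := syncmgr.NewMockSyncManager(t)

	// Stub SyncData to succeed; the cleanup registered by NewMockSyncManager
	// asserts afterwards that this expectation was actually met.
	syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).Return(nil).Once()

	if err := syncMgr.SyncData(context.Background(), &syncmgr.SyncTask{}); err != nil {
		t.Fatalf("unexpected SyncData error: %v", err)
	}
}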
+ +package syncmgr + +import ( + context "context" + + mock "github.com/stretchr/testify/mock" +) + +// MockSyncManager is an autogenerated mock type for the SyncManager type +type MockSyncManager struct { + mock.Mock +} + +type MockSyncManager_Expecter struct { + mock *mock.Mock +} + +func (_m *MockSyncManager) EXPECT() *MockSyncManager_Expecter { + return &MockSyncManager_Expecter{mock: &_m.Mock} +} + +// Block provides a mock function with given fields: segmentID +func (_m *MockSyncManager) Block(segmentID int64) { + _m.Called(segmentID) +} + +// MockSyncManager_Block_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Block' +type MockSyncManager_Block_Call struct { + *mock.Call +} + +// Block is a helper method to define mock.On call +// - segmentID int64 +func (_e *MockSyncManager_Expecter) Block(segmentID interface{}) *MockSyncManager_Block_Call { + return &MockSyncManager_Block_Call{Call: _e.mock.On("Block", segmentID)} +} + +func (_c *MockSyncManager_Block_Call) Run(run func(segmentID int64)) *MockSyncManager_Block_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(int64)) + }) + return _c +} + +func (_c *MockSyncManager_Block_Call) Return() *MockSyncManager_Block_Call { + _c.Call.Return() + return _c +} + +func (_c *MockSyncManager_Block_Call) RunAndReturn(run func(int64)) *MockSyncManager_Block_Call { + _c.Call.Return(run) + return _c +} + +// SyncData provides a mock function with given fields: ctx, task +func (_m *MockSyncManager) SyncData(ctx context.Context, task *SyncTask) error { + ret := _m.Called(ctx, task) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, *SyncTask) error); ok { + r0 = rf(ctx, task) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockSyncManager_SyncData_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SyncData' +type MockSyncManager_SyncData_Call struct { + *mock.Call +} + +// SyncData is a helper method to define mock.On call +// - ctx context.Context +// - task *SyncTask +func (_e *MockSyncManager_Expecter) SyncData(ctx interface{}, task interface{}) *MockSyncManager_SyncData_Call { + return &MockSyncManager_SyncData_Call{Call: _e.mock.On("SyncData", ctx, task)} +} + +func (_c *MockSyncManager_SyncData_Call) Run(run func(ctx context.Context, task *SyncTask)) *MockSyncManager_SyncData_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*SyncTask)) + }) + return _c +} + +func (_c *MockSyncManager_SyncData_Call) Return(_a0 error) *MockSyncManager_SyncData_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockSyncManager_SyncData_Call) RunAndReturn(run func(context.Context, *SyncTask) error) *MockSyncManager_SyncData_Call { + _c.Call.Return(run) + return _c +} + +// Unblock provides a mock function with given fields: segmentID +func (_m *MockSyncManager) Unblock(segmentID int64) { + _m.Called(segmentID) +} + +// MockSyncManager_Unblock_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Unblock' +type MockSyncManager_Unblock_Call struct { + *mock.Call +} + +// Unblock is a helper method to define mock.On call +// - segmentID int64 +func (_e *MockSyncManager_Expecter) Unblock(segmentID interface{}) *MockSyncManager_Unblock_Call { + return &MockSyncManager_Unblock_Call{Call: _e.mock.On("Unblock", segmentID)} +} + +func (_c *MockSyncManager_Unblock_Call) Run(run func(segmentID int64)) *MockSyncManager_Unblock_Call { + _c.Call.Run(func(args 
mock.Arguments) { + run(args[0].(int64)) + }) + return _c +} + +func (_c *MockSyncManager_Unblock_Call) Return() *MockSyncManager_Unblock_Call { + _c.Call.Return() + return _c +} + +func (_c *MockSyncManager_Unblock_Call) RunAndReturn(run func(int64)) *MockSyncManager_Unblock_Call { + _c.Call.Return(run) + return _c +} + +// NewMockSyncManager creates a new instance of MockSyncManager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockSyncManager(t interface { + mock.TestingT + Cleanup(func()) +}) *MockSyncManager { + mock := &MockSyncManager{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/datanode/syncmgr/options.go b/internal/datanode/syncmgr/options.go index 1dcd2a6334..d4dfc7dd45 100644 --- a/internal/datanode/syncmgr/options.go +++ b/internal/datanode/syncmgr/options.go @@ -80,6 +80,11 @@ func (t *SyncTask) WithFlush() *SyncTask { return t } +func (t *SyncTask) WithDrop() *SyncTask { + t.isDrop = true + return t +} + func (t *SyncTask) WithMetaCache(metacache metacache.MetaCache) *SyncTask { t.metacache = metacache return t diff --git a/internal/datanode/syncmgr/task.go b/internal/datanode/syncmgr/task.go index a000301890..393920146f 100644 --- a/internal/datanode/syncmgr/task.go +++ b/internal/datanode/syncmgr/task.go @@ -41,6 +41,7 @@ type SyncTask struct { tsTo typeutil.Timestamp isFlush bool + isDrop bool metacache metacache.MetaCache metaWriter MetaWriter @@ -75,7 +76,7 @@ func (t *SyncTask) Run() error { log := t.getLogger() var err error - infos := t.metacache.GetSegmentsBy(metacache.WithSegmentID(t.segmentID)) + infos := t.metacache.GetSegmentsBy(metacache.WithSegmentIDs(t.segmentID)) if len(infos) == 0 { log.Warn("failed to sync data, segment not found in metacache") t.handleError(err) @@ -245,7 +246,7 @@ func (t *SyncTask) serializeSinglePkStats(fieldID int64, stats *storage.PrimaryK } func (t *SyncTask) serializeMergedPkStats(fieldID int64, stats *storage.PrimaryKeyStats, rowNum int64) error { - segments := t.metacache.GetSegmentsBy(metacache.WithSegmentID(t.segmentID)) + segments := t.metacache.GetSegmentsBy(metacache.WithSegmentIDs(t.segmentID)) var statsList []*storage.PrimaryKeyStats var oldRowNum int64 for _, segment := range segments { diff --git a/internal/datanode/syncmgr/task_test.go b/internal/datanode/syncmgr/task_test.go index fb00fc5211..a07265805e 100644 --- a/internal/datanode/syncmgr/task_test.go +++ b/internal/datanode/syncmgr/task_test.go @@ -201,6 +201,7 @@ func (s *SyncTaskSuite) TestRunNormal() { task := s.getSuiteSyncTask() task.WithInsertData(s.getInsertBuffer()).WithDeleteData(s.getDeleteBuffer()) task.WithFlush() + task.WithDrop() task.WithMetaWriter(BrokerMetaWriter(s.broker)) task.WithCheckpoint(&msgpb.MsgPosition{ ChannelName: s.channelName, diff --git a/internal/datanode/writebuffer/bf_write_buffer.go b/internal/datanode/writebuffer/bf_write_buffer.go new file mode 100644 index 0000000000..92ad3e888a --- /dev/null +++ b/internal/datanode/writebuffer/bf_write_buffer.go @@ -0,0 +1,73 @@ +package writebuffer + +import ( + "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/datanode/metacache" + "github.com/milvus-io/milvus/internal/datanode/syncmgr" + "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/pkg/mq/msgstream" + 
"github.com/milvus-io/milvus/pkg/util/typeutil" +) + +type bfWriteBuffer struct { + *writeBufferBase + + syncMgr syncmgr.SyncManager + metacache metacache.MetaCache +} + +func NewBFWriteBuffer(sch *schemapb.CollectionSchema, metacache metacache.MetaCache, syncMgr syncmgr.SyncManager, option *writeBufferOption) (WriteBuffer, error) { + return &bfWriteBuffer{ + writeBufferBase: newWriteBufferBase(sch, metacache, syncMgr, option), + syncMgr: syncMgr, + }, nil +} + +func (wb *bfWriteBuffer) BufferData(insertMsgs []*msgstream.InsertMsg, deleteMsgs []*msgstream.DeleteMsg, startPos, endPos *msgpb.MsgPosition) error { + wb.mut.Lock() + defer wb.mut.Unlock() + + // process insert msgs + pkData, err := wb.bufferInsert(insertMsgs, startPos, endPos) + if err != nil { + return err + } + + // update pk oracle + for segmentID, dataList := range pkData { + segments := wb.metaCache.GetSegmentsBy(metacache.WithSegmentIDs(segmentID)) + for _, segment := range segments { + for _, fieldData := range dataList { + err := segment.GetBloomFilterSet().UpdatePKRange(fieldData) + if err != nil { + return err + } + } + } + } + + // distribute delete msg + for _, delMsg := range deleteMsgs { + pks := storage.ParseIDs2PrimaryKeys(delMsg.GetPrimaryKeys()) + segments := wb.metaCache.GetSegmentsBy(metacache.WithPartitionID(delMsg.PartitionID)) + for _, segment := range segments { + var deletePks []storage.PrimaryKey + var deleteTss []typeutil.Timestamp + for idx, pk := range pks { + if segment.GetBloomFilterSet().PkExists(pk) { + deletePks = append(deletePks, pk) + deleteTss = append(deleteTss, delMsg.GetTimestamps()[idx]) + } + } + if len(deletePks) > 0 { + wb.bufferDelete(segment.SegmentID(), deletePks, deleteTss, startPos, endPos) + } + } + } + + // update buffer last checkpoint + wb.checkpoint = endPos + + return wb.triggerAutoSync() +} diff --git a/internal/datanode/writebuffer/bf_write_buffer_test.go b/internal/datanode/writebuffer/bf_write_buffer_test.go new file mode 100644 index 0000000000..769e0c41d5 --- /dev/null +++ b/internal/datanode/writebuffer/bf_write_buffer_test.go @@ -0,0 +1,182 @@ +package writebuffer + +import ( + "math/rand" + "testing" + "time" + + "github.com/samber/lo" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/suite" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/datanode/broker" + "github.com/milvus-io/milvus/internal/datanode/metacache" + "github.com/milvus-io/milvus/internal/datanode/syncmgr" + "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/pkg/common" + "github.com/milvus-io/milvus/pkg/mq/msgstream" + "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/tsoutil" +) + +type BFWriteBufferSuite struct { + suite.Suite + collSchema *schemapb.CollectionSchema + syncMgr *syncmgr.MockSyncManager + metacache *metacache.MockMetaCache + broker *broker.MockBroker +} + +func (s *BFWriteBufferSuite) SetupSuite() { + paramtable.Get().Init(paramtable.NewBaseTable()) + s.collSchema = &schemapb.CollectionSchema{ + Name: "test_collection", + Fields: []*schemapb.FieldSchema{ + { + FieldID: common.RowIDField, Name: common.RowIDFieldName, DataType: schemapb.DataType_Int64, + }, + { + FieldID: common.TimeStampField, Name: common.TimeStampFieldName, DataType: schemapb.DataType_Int64, + }, + { + FieldID: 100, 
Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true, + }, + { + FieldID: 101, Name: "vector", DataType: schemapb.DataType_FloatVector, + TypeParams: []*commonpb.KeyValuePair{ + {Key: common.DimKey, Value: "128"}, + }, + }, + }, + } +} + +func (s *BFWriteBufferSuite) composeInsertMsg(segmentID int64, rowCount int, dim int) ([]int64, *msgstream.InsertMsg) { + tss := lo.RepeatBy(rowCount, func(idx int) int64 { return int64(tsoutil.ComposeTSByTime(time.Now(), int64(idx))) }) + vectors := lo.RepeatBy(rowCount, func(_ int) []float32 { + return lo.RepeatBy(dim, func(_ int) float32 { return rand.Float32() }) + }) + flatten := lo.Flatten(vectors) + return tss, &msgstream.InsertMsg{ + InsertRequest: msgpb.InsertRequest{ + SegmentID: segmentID, + Version: msgpb.InsertDataVersion_ColumnBased, + RowIDs: tss, + Timestamps: lo.Map(tss, func(id int64, _ int) uint64 { return uint64(id) }), + FieldsData: []*schemapb.FieldData{ + { + FieldId: common.RowIDField, FieldName: common.RowIDFieldName, Type: schemapb.DataType_Int64, + Field: &schemapb.FieldData_Scalars{ + Scalars: &schemapb.ScalarField{ + Data: &schemapb.ScalarField_LongData{ + LongData: &schemapb.LongArray{ + Data: tss, + }, + }, + }, + }, + }, + { + FieldId: common.TimeStampField, FieldName: common.TimeStampFieldName, Type: schemapb.DataType_Int64, + Field: &schemapb.FieldData_Scalars{ + Scalars: &schemapb.ScalarField{ + Data: &schemapb.ScalarField_LongData{ + LongData: &schemapb.LongArray{ + Data: tss, + }, + }, + }, + }, + }, + { + FieldId: common.StartOfUserFieldID, FieldName: "pk", Type: schemapb.DataType_Int64, + Field: &schemapb.FieldData_Scalars{ + Scalars: &schemapb.ScalarField{ + Data: &schemapb.ScalarField_LongData{ + LongData: &schemapb.LongArray{ + Data: tss, + }, + }, + }, + }, + }, + { + FieldId: common.StartOfUserFieldID + 1, FieldName: "vector", Type: schemapb.DataType_FloatVector, + Field: &schemapb.FieldData_Vectors{ + Vectors: &schemapb.VectorField{ + Dim: int64(dim), + Data: &schemapb.VectorField_FloatVector{ + FloatVector: &schemapb.FloatArray{ + Data: flatten, + }, + }, + }, + }, + }, + }, + }, + } +} + +func (s *BFWriteBufferSuite) composeDeleteMsg(pks []storage.PrimaryKey) *msgstream.DeleteMsg { + delMsg := &msgstream.DeleteMsg{ + DeleteRequest: msgpb.DeleteRequest{ + PrimaryKeys: storage.ParsePrimaryKeys2IDs(pks), + Timestamps: lo.RepeatBy(len(pks), func(idx int) uint64 { return tsoutil.ComposeTSByTime(time.Now(), int64(idx)) }), + }, + } + return delMsg +} + +func (s *BFWriteBufferSuite) SetupTest() { + s.syncMgr = syncmgr.NewMockSyncManager(s.T()) + s.metacache = metacache.NewMockMetaCache(s.T()) + s.broker = broker.NewMockBroker(s.T()) +} + +func (s *BFWriteBufferSuite) TestBufferData() { + wb, err := NewBFWriteBuffer(s.collSchema, s.metacache, s.syncMgr, &writeBufferOption{}) + s.NoError(err) + + seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1000}, metacache.NewBloomFilterSet()) + s.metacache.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{seg}) + + pks, msg := s.composeInsertMsg(1000, 10, 128) + delMsg := s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) })) + + err = wb.BufferData([]*msgstream.InsertMsg{msg}, []*msgstream.DeleteMsg{delMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200}) + s.NoError(err) +} + +func (s *BFWriteBufferSuite) TestAutoSync() { + paramtable.Get().Save(paramtable.Get().DataNodeCfg.FlushInsertBufferSize.Key, "1") + + wb, err := NewBFWriteBuffer(s.collSchema, 
s.metacache, s.syncMgr, &writeBufferOption{ + syncPolicies: []SyncPolicy{ + SyncFullBuffer, + GetSyncStaleBufferPolicy(paramtable.Get().DataNodeCfg.SyncPeriod.GetAsDuration(time.Second)), + GetFlushingSegmentsPolicy(s.metacache), + }, + }) + s.NoError(err) + + seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1000}, metacache.NewBloomFilterSet()) + s.metacache.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{seg}) + s.metacache.EXPECT().GetSegmentIDsBy(mock.Anything).Return([]int64{1002}) // mock flushing + s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything, mock.Anything).Return() + s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).Return(nil).Twice() + + pks, msg := s.composeInsertMsg(1000, 10, 128) + delMsg := s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) })) + + err = wb.BufferData([]*msgstream.InsertMsg{msg}, []*msgstream.DeleteMsg{delMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200}) + s.NoError(err) +} + +func TestBFWriteBuffer(t *testing.T) { + suite.Run(t, new(BFWriteBufferSuite)) +} diff --git a/internal/datanode/writebuffer/delta_buffer.go b/internal/datanode/writebuffer/delta_buffer.go new file mode 100644 index 0000000000..b6320eaa44 --- /dev/null +++ b/internal/datanode/writebuffer/delta_buffer.go @@ -0,0 +1,80 @@ +package writebuffer + +import ( + "math" + + "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/typeutil" +) + +type DeltaBuffer struct { + BufferBase + + buffer *storage.DeleteData +} + +func NewDeltaBuffer() *DeltaBuffer { + return &DeltaBuffer{ + BufferBase: BufferBase{ + rowLimit: noLimit, + sizeLimit: paramtable.Get().DataNodeCfg.FlushDeleteBufferBytes.GetAsInt64(), + TimestampFrom: math.MaxUint64, + TimestampTo: 0, + }, + buffer: &storage.DeleteData{}, + } +} + +func (ib *DeltaBuffer) getTimestampRange(tss []typeutil.Timestamp) TimeRange { + tr := TimeRange{ + timestampMin: math.MaxUint64, + timestampMax: 0, + } + + for _, data := range tss { + if data < tr.timestampMin { + tr.timestampMin = data + } + if data > tr.timestampMax { + tr.timestampMax = data + } + } + return tr +} + +func (db *DeltaBuffer) Renew() *storage.DeleteData { + if db.IsEmpty() { + return nil + } + result := db.buffer + db.BufferBase.rows = 0 + db.BufferBase.TimestampFrom = math.MaxUint64 + db.BufferBase.TimestampTo = 0 + + return result +} + +func (db *DeltaBuffer) Buffer(pks []storage.PrimaryKey, tss []typeutil.Timestamp, startPos, endPos *msgpb.MsgPosition) (bufSize int64) { + rowCount := len(pks) + + for i := 0; i < rowCount; i++ { + db.buffer.Append(pks[i], tss[i]) + + switch pks[i].Type() { + case schemapb.DataType_Int64: + bufSize += 8 + case schemapb.DataType_VarChar: + varCharPk := pks[i].(*storage.VarCharPrimaryKey) + bufSize += int64(len(varCharPk.Value)) + } + // accumulate buf size for timestamp, which is 8 bytes + bufSize += 8 + } + + db.UpdateStatistics(int64(rowCount), bufSize, db.getTimestampRange(tss), startPos, endPos) + + return bufSize +} diff --git a/internal/datanode/writebuffer/delta_buffer_test.go b/internal/datanode/writebuffer/delta_buffer_test.go new file mode 100644 index 0000000000..18e65d0eca --- /dev/null +++ b/internal/datanode/writebuffer/delta_buffer_test.go @@ -0,0 +1,66 @@ +package writebuffer + +import ( + "fmt" + 
"testing" + "time" + + "github.com/samber/lo" + "github.com/stretchr/testify/suite" + + "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" + "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/pkg/util/tsoutil" +) + +type DeltaBufferSuite struct { + suite.Suite +} + +func (s *DeltaBufferSuite) TestBuffer() { + s.Run("int64_pk", func() { + deltaBuffer := NewDeltaBuffer() + + tss := lo.RepeatBy(100, func(idx int) uint64 { return tsoutil.ComposeTSByTime(time.Now(), int64(idx)) }) + pks := lo.Map(tss, func(ts uint64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(int64(ts)) }) + + memSize := deltaBuffer.Buffer(pks, tss, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200}) + s.EqualValues(100*8*2, memSize) + }) + + s.Run("string_pk", func() { + deltaBuffer := NewDeltaBuffer() + + tss := lo.RepeatBy(100, func(idx int) uint64 { return tsoutil.ComposeTSByTime(time.Now(), int64(idx)) }) + pks := lo.Map(tss, func(ts uint64, idx int) storage.PrimaryKey { + return storage.NewVarCharPrimaryKey(fmt.Sprintf("%03d", idx)) + }) + + memSize := deltaBuffer.Buffer(pks, tss, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200}) + s.EqualValues(100*8+100*3, memSize) + }) +} + +func (s *DeltaBufferSuite) TestRenew() { + deltaBuffer := NewDeltaBuffer() + + result := deltaBuffer.Renew() + s.Nil(result) + s.True(deltaBuffer.IsEmpty()) + + tss := lo.RepeatBy(100, func(idx int) uint64 { return tsoutil.ComposeTSByTime(time.Now(), int64(idx)) }) + pks := lo.Map(tss, func(ts uint64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(int64(ts)) }) + + deltaBuffer.Buffer(pks, tss, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200}) + + result = deltaBuffer.Renew() + s.NotNil(result) + s.True(deltaBuffer.IsEmpty()) + + s.ElementsMatch(tss, result.Tss) + s.ElementsMatch(pks, result.Pks) +} + +func TestDeltaBuffer(t *testing.T) { + suite.Run(t, new(DeltaBufferSuite)) +} diff --git a/internal/datanode/writebuffer/insert_buffer.go b/internal/datanode/writebuffer/insert_buffer.go new file mode 100644 index 0000000000..509bcb06cc --- /dev/null +++ b/internal/datanode/writebuffer/insert_buffer.go @@ -0,0 +1,171 @@ +package writebuffer + +import ( + "math" + + "github.com/cockroachdb/errors" + "go.uber.org/zap" + + "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/mq/msgstream" + "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/typeutil" +) + +const ( + noLimit int64 = -1 +) + +type BufferBase struct { + rows int64 + rowLimit int64 + size int64 + sizeLimit int64 + + TimestampFrom typeutil.Timestamp + TimestampTo typeutil.Timestamp + + startPos *msgpb.MsgPosition + endPos *msgpb.MsgPosition +} + +func (b *BufferBase) UpdateStatistics(entryNum, size int64, tr TimeRange, startPos, endPos *msgpb.MsgPosition) { + b.rows += entryNum + b.size += size + + if tr.timestampMin < b.TimestampFrom { + b.TimestampFrom = tr.timestampMin + } + if tr.timestampMax > b.TimestampTo { + b.TimestampTo = tr.timestampMax + } + + if b.startPos == nil || startPos.Timestamp < b.startPos.Timestamp { + b.startPos = startPos + } + if b.endPos == nil || endPos.Timestamp > b.endPos.Timestamp { + b.endPos = endPos + } +} + +func (b *BufferBase) IsFull() bool { + return (b.rowLimit != noLimit 
&& b.rows >= b.rowLimit) || + (b.sizeLimit != noLimit && b.size >= b.sizeLimit) +} + +func (b *BufferBase) IsEmpty() bool { + return b.rows == 0 +} + +func (b *BufferBase) MinTimestamp() typeutil.Timestamp { + if b.startPos == nil { + return math.MaxUint64 + } + return b.startPos.GetTimestamp() +} + +type InsertBuffer struct { + BufferBase + collSchema *schemapb.CollectionSchema + + buffer *storage.InsertData +} + +func NewInsertBuffer(sch *schemapb.CollectionSchema) (*InsertBuffer, error) { + size, err := typeutil.EstimateSizePerRecord(sch) + if err != nil { + log.Warn("failed to estimate size per record", zap.Error(err)) + return nil, err + } + + if size == 0 { + return nil, errors.New("Invalid schema") + } + buffer, err := storage.NewInsertData(sch) + if err != nil { + return nil, err + } + limit := paramtable.Get().DataNodeCfg.FlushInsertBufferSize.GetAsInt64() / int64(size) + if paramtable.Get().DataNodeCfg.FlushInsertBufferSize.GetAsInt64()%int64(size) != 0 { + limit++ + } + + return &InsertBuffer{ + BufferBase: BufferBase{ + rowLimit: limit, + sizeLimit: noLimit, + TimestampFrom: math.MaxUint64, + TimestampTo: 0, + }, + collSchema: sch, + buffer: buffer, + }, nil +} + +func (ib *InsertBuffer) Renew() *storage.InsertData { + if ib.IsEmpty() { + return nil + } + result := ib.buffer + + // no error since validated in constructor + ib.buffer, _ = storage.NewInsertData(ib.collSchema) + ib.BufferBase.rows = 0 + ib.BufferBase.TimestampFrom = math.MaxUint64 + ib.BufferBase.TimestampTo = 0 + + return result +} + +func (ib *InsertBuffer) Buffer(msgs []*msgstream.InsertMsg, startPos, endPos *msgpb.MsgPosition) ([]storage.FieldData, error) { + pkData := make([]storage.FieldData, 0, len(msgs)) + for _, msg := range msgs { + tmpBuffer, err := storage.InsertMsgToInsertData(msg, ib.collSchema) + if err != nil { + log.Warn("failed to transfer insert msg to insert data", zap.Error(err)) + return nil, err + } + + pkFieldData, err := storage.GetPkFromInsertData(ib.collSchema, tmpBuffer) + if err != nil { + return nil, err + } + if pkFieldData.RowNum() != tmpBuffer.GetRowNum() { + return nil, merr.WrapErrServiceInternal("pk column row num not match") + } + pkData = append(pkData, pkFieldData) + + storage.MergeInsertData(ib.buffer, tmpBuffer) + + tsData, err := storage.GetTimestampFromInsertData(tmpBuffer) + if err != nil { + log.Warn("no timestamp field found in insert msg", zap.Error(err)) + return nil, err + } + + // update buffer size + ib.UpdateStatistics(int64(tmpBuffer.GetRowNum()), 0, ib.getTimestampRange(tsData), startPos, endPos) + } + return pkData, nil +} + +func (ib *InsertBuffer) getTimestampRange(tsData *storage.Int64FieldData) TimeRange { + tr := TimeRange{ + timestampMin: math.MaxUint64, + timestampMax: 0, + } + + for _, data := range tsData.Data { + if uint64(data) < tr.timestampMin { + tr.timestampMin = typeutil.Timestamp(data) + } + if uint64(data) > tr.timestampMax { + tr.timestampMax = typeutil.Timestamp(data) + } + } + return tr +} diff --git a/internal/datanode/writebuffer/insert_buffer_test.go b/internal/datanode/writebuffer/insert_buffer_test.go new file mode 100644 index 0000000000..abf53a38a3 --- /dev/null +++ b/internal/datanode/writebuffer/insert_buffer_test.go @@ -0,0 +1,294 @@ +package writebuffer + +import ( + "math/rand" + "testing" + "time" + + "github.com/samber/lo" + "github.com/stretchr/testify/suite" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" + 
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/pkg/common" + "github.com/milvus-io/milvus/pkg/mq/msgstream" + "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/tsoutil" +) + +type InsertBufferSuite struct { + suite.Suite + collSchema *schemapb.CollectionSchema +} + +func (s *InsertBufferSuite) SetupSuite() { + paramtable.Get().Init(paramtable.NewBaseTable()) + s.collSchema = &schemapb.CollectionSchema{ + Name: "test_collection", + Fields: []*schemapb.FieldSchema{ + { + FieldID: common.RowIDField, Name: common.RowIDFieldName, DataType: schemapb.DataType_Int64, + }, + { + FieldID: common.TimeStampField, Name: common.TimeStampFieldName, DataType: schemapb.DataType_Int64, + }, + { + FieldID: 100, Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true, + }, + { + FieldID: 101, Name: "vector", DataType: schemapb.DataType_FloatVector, + TypeParams: []*commonpb.KeyValuePair{ + {Key: common.DimKey, Value: "128"}, + }, + }, + }, + } +} + +func (s *InsertBufferSuite) composeInsertMsg(rowCount int, dim int) ([]int64, *msgstream.InsertMsg) { + tss := lo.RepeatBy(rowCount, func(idx int) int64 { return int64(tsoutil.ComposeTSByTime(time.Now(), int64(idx))) }) + vectors := lo.RepeatBy(rowCount, func(_ int) []float32 { + return lo.RepeatBy(dim, func(_ int) float32 { return rand.Float32() }) + }) + flatten := lo.Flatten(vectors) + return tss, &msgstream.InsertMsg{ + InsertRequest: msgpb.InsertRequest{ + Version: msgpb.InsertDataVersion_ColumnBased, + RowIDs: tss, + Timestamps: lo.Map(tss, func(id int64, _ int) uint64 { return uint64(id) }), + FieldsData: []*schemapb.FieldData{ + { + FieldId: common.RowIDField, FieldName: common.RowIDFieldName, Type: schemapb.DataType_Int64, + Field: &schemapb.FieldData_Scalars{ + Scalars: &schemapb.ScalarField{ + Data: &schemapb.ScalarField_LongData{ + LongData: &schemapb.LongArray{ + Data: tss, + }, + }, + }, + }, + }, + { + FieldId: common.TimeStampField, FieldName: common.TimeStampFieldName, Type: schemapb.DataType_Int64, + Field: &schemapb.FieldData_Scalars{ + Scalars: &schemapb.ScalarField{ + Data: &schemapb.ScalarField_LongData{ + LongData: &schemapb.LongArray{ + Data: tss, + }, + }, + }, + }, + }, + { + FieldId: common.StartOfUserFieldID, FieldName: "pk", Type: schemapb.DataType_Int64, + Field: &schemapb.FieldData_Scalars{ + Scalars: &schemapb.ScalarField{ + Data: &schemapb.ScalarField_LongData{ + LongData: &schemapb.LongArray{ + Data: tss, + }, + }, + }, + }, + }, + { + FieldId: common.StartOfUserFieldID + 1, FieldName: "vector", Type: schemapb.DataType_FloatVector, + Field: &schemapb.FieldData_Vectors{ + Vectors: &schemapb.VectorField{ + Dim: int64(dim), + Data: &schemapb.VectorField_FloatVector{ + FloatVector: &schemapb.FloatArray{ + Data: flatten, + }, + }, + }, + }, + }, + }, + }, + } +} + +func (s *InsertBufferSuite) TestBasic() { + s.Run("normal_insertbuffer", func() { + insertBuffer, err := NewInsertBuffer(s.collSchema) + s.Require().NoError(err) + + s.True(insertBuffer.IsEmpty()) + s.False(insertBuffer.IsFull()) + + insertBuffer.rows = insertBuffer.rowLimit + 1 + s.True(insertBuffer.IsFull()) + s.False(insertBuffer.IsEmpty()) + }) +} + +func (s *InsertBufferSuite) TestBuffer() { + s.Run("normal_buffer", func() { + pks, insertMsg := s.composeInsertMsg(10, 128) + + insertBuffer, err := NewInsertBuffer(s.collSchema) + s.Require().NoError(err) + + fieldData, err := insertBuffer.Buffer([]*msgstream.InsertMsg{insertMsg}, 
&msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200}) + s.NoError(err) + + pkData := lo.Map(fieldData, func(fd storage.FieldData, _ int) []int64 { + return lo.RepeatBy(fd.RowNum(), func(idx int) int64 { return fd.GetRow(idx).(int64) }) + }) + s.ElementsMatch(pks, lo.Flatten(pkData)) + s.EqualValues(100, insertBuffer.MinTimestamp()) + }) + + s.Run("pk_not_found", func() { + _, insertMsg := s.composeInsertMsg(10, 128) + + insertMsg.FieldsData = []*schemapb.FieldData{insertMsg.FieldsData[0], insertMsg.FieldsData[1], insertMsg.FieldsData[3]} + + insertBuffer, err := NewInsertBuffer(s.collSchema) + s.Require().NoError(err) + + _, err = insertBuffer.Buffer([]*msgstream.InsertMsg{insertMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200}) + s.Error(err) + }) + + s.Run("schema_without_pk", func() { + badSchema := &schemapb.CollectionSchema{ + Name: "test_collection", + Fields: []*schemapb.FieldSchema{ + { + FieldID: common.RowIDField, Name: common.RowIDFieldName, DataType: schemapb.DataType_Int64, + }, + { + FieldID: common.TimeStampField, Name: common.TimeStampFieldName, DataType: schemapb.DataType_Int64, + }, + { + FieldID: 101, Name: "vector", DataType: schemapb.DataType_FloatVector, + TypeParams: []*commonpb.KeyValuePair{ + {Key: common.DimKey, Value: "128"}, + }, + }, + }, + } + + _, insertMsg := s.composeInsertMsg(10, 128) + + insertBuffer, err := NewInsertBuffer(badSchema) + s.Require().NoError(err) + + _, err = insertBuffer.Buffer([]*msgstream.InsertMsg{insertMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200}) + s.Error(err) + }) +} + +func (s *InsertBufferSuite) TestRenew() { + insertBuffer, err := NewInsertBuffer(s.collSchema) + s.Require().NoError(err) + + result := insertBuffer.Renew() + s.Nil(result) + s.True(insertBuffer.IsEmpty()) + + pks, insertMsg := s.composeInsertMsg(10, 128) + _, err = insertBuffer.Buffer([]*msgstream.InsertMsg{insertMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200}) + s.Require().NoError(err) + + result = insertBuffer.Renew() + s.NotNil(result) + s.True(insertBuffer.IsEmpty()) + + pkField, ok := result.Data[common.StartOfUserFieldID] + s.Require().True(ok) + pkData := lo.RepeatBy(pkField.RowNum(), func(idx int) int64 { return pkField.GetRow(idx).(int64) }) + s.ElementsMatch(pks, pkData) +} + +type InsertBufferConstructSuite struct { + suite.Suite + schema *schemapb.CollectionSchema +} + +func (*InsertBufferConstructSuite) SetupSuite() { + paramtable.Get().Init(paramtable.NewBaseTable()) +} + +func (s *InsertBufferConstructSuite) TestCreateSuccess() { + schema := &schemapb.CollectionSchema{ + Name: "test_collection", + Fields: []*schemapb.FieldSchema{ + { + FieldID: 100, Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true, + }, + { + FieldID: 101, Name: "vector", DataType: schemapb.DataType_FloatVector, + TypeParams: []*commonpb.KeyValuePair{ + {Key: common.DimKey, Value: "128"}, + }, + }, + }, + } + insertBuffer, err := NewInsertBuffer(schema) + s.NoError(err) + s.NotNil(insertBuffer) +} + +func (s *InsertBufferConstructSuite) TestCreateFailure() { + type testCase struct { + schema *schemapb.CollectionSchema + tag string + } + + cases := []testCase{ + { + tag: "undefined_datatype", + schema: &schemapb.CollectionSchema{ + Fields: []*schemapb.FieldSchema{ + {FieldID: 101, Name: "vector", DataType: -1}, + }, + }, + }, + { + tag: "mssing_maxlength", + schema: &schemapb.CollectionSchema{ + Fields: []*schemapb.FieldSchema{ + {FieldID: 101, Name: "string", 
DataType: schemapb.DataType_VarChar}, + }, + }, + }, + { + tag: "empty_schema", + schema: &schemapb.CollectionSchema{ + Fields: []*schemapb.FieldSchema{}, + }, + }, + { + tag: "missing_type_param", + schema: &schemapb.CollectionSchema{ + Name: "test_collection", + Fields: []*schemapb.FieldSchema{ + { + FieldID: 100, Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true, + }, + { + FieldID: 101, Name: "vector", DataType: schemapb.DataType_FloatVector, + }, + }, + }, + }, + } + for _, tc := range cases { + s.Run(tc.tag, func() { + _, err := NewInsertBuffer(tc.schema) + s.Error(err) + }) + } +} + +func TestInsertBuffer(t *testing.T) { + suite.Run(t, new(InsertBufferSuite)) + suite.Run(t, new(InsertBufferConstructSuite)) +} diff --git a/internal/datanode/writebuffer/l0_write_buffer.go b/internal/datanode/writebuffer/l0_write_buffer.go new file mode 100644 index 0000000000..92683dd540 --- /dev/null +++ b/internal/datanode/writebuffer/l0_write_buffer.go @@ -0,0 +1,83 @@ +package writebuffer + +import ( + "context" + + "go.uber.org/zap" + + "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/allocator" + "github.com/milvus-io/milvus/internal/datanode/metacache" + "github.com/milvus-io/milvus/internal/datanode/syncmgr" + "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/mq/msgstream" + "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/retry" +) + +type l0WriteBuffer struct { + *writeBufferBase + + l0Segments map[int64]int64 // partitionID => l0 segment ID + + syncMgr syncmgr.SyncManager + idAllocator allocator.Interface +} + +func NewL0WriteBuffer(sch *schemapb.CollectionSchema, metacache metacache.MetaCache, syncMgr syncmgr.SyncManager, option *writeBufferOption) (WriteBuffer, error) { + if option.idAllocator == nil { + return nil, merr.WrapErrServiceInternal("id allocator is nil when creating l0 write buffer") + } + return &l0WriteBuffer{ + l0Segments: make(map[int64]int64), + writeBufferBase: newWriteBufferBase(sch, metacache, syncMgr, option), + syncMgr: syncMgr, + idAllocator: option.idAllocator, + }, nil +} + +func (wb *l0WriteBuffer) BufferData(insertMsgs []*msgstream.InsertMsg, deleteMsgs []*msgstream.DeleteMsg, startPos, endPos *msgpb.MsgPosition) error { + wb.mut.Lock() + defer wb.mut.Unlock() + + // process insert msgs + _, err := wb.bufferInsert(insertMsgs, startPos, endPos) + if err != nil { + log.Warn("failed to buffer insert data", zap.Error(err)) + return err + } + + for _, msg := range deleteMsgs { + l0SegmentID := wb.getL0SegmentID(msg.GetPartitionID()) + pks := storage.ParseIDs2PrimaryKeys(msg.GetPrimaryKeys()) + err := wb.bufferDelete(l0SegmentID, pks, msg.GetTimestamps(), startPos, endPos) + if err != nil { + log.Warn("failed to buffer delete data", zap.Error(err)) + return err + } + } + + // update buffer last checkpoint + wb.checkpoint = endPos + + return wb.triggerAutoSync() +} + +func (wb *l0WriteBuffer) getL0SegmentID(partitionID int64) int64 { + segmentID, ok := wb.l0Segments[partitionID] + if !ok { + err := retry.Do(context.Background(), func() error { + var err error + segmentID, err = wb.idAllocator.AllocOne() + return err + }) + if err != nil { + log.Error("failed to allocate l0 segment ID", zap.Error(err)) + panic(err) + } + wb.l0Segments[partitionID] = segmentID + } + return segmentID +} diff --git a/internal/datanode/writebuffer/l0_write_buffer_test.go 
b/internal/datanode/writebuffer/l0_write_buffer_test.go new file mode 100644 index 0000000000..ed1823781a --- /dev/null +++ b/internal/datanode/writebuffer/l0_write_buffer_test.go @@ -0,0 +1,158 @@ +package writebuffer + +import ( + "math/rand" + "testing" + "time" + + "github.com/samber/lo" + "github.com/stretchr/testify/suite" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/allocator" + "github.com/milvus-io/milvus/internal/datanode/metacache" + "github.com/milvus-io/milvus/internal/datanode/syncmgr" + "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/pkg/common" + "github.com/milvus-io/milvus/pkg/mq/msgstream" + "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/tsoutil" +) + +type L0WriteBufferSuite struct { + suite.Suite + collSchema *schemapb.CollectionSchema + syncMgr *syncmgr.MockSyncManager + metacache *metacache.MockMetaCache + allocator *allocator.MockGIDAllocator +} + +func (s *L0WriteBufferSuite) SetupSuite() { + paramtable.Get().Init(paramtable.NewBaseTable()) + s.collSchema = &schemapb.CollectionSchema{ + Name: "test_collection", + Fields: []*schemapb.FieldSchema{ + { + FieldID: common.RowIDField, Name: common.RowIDFieldName, DataType: schemapb.DataType_Int64, + }, + { + FieldID: common.TimeStampField, Name: common.TimeStampFieldName, DataType: schemapb.DataType_Int64, + }, + { + FieldID: 100, Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true, + }, + { + FieldID: 101, Name: "vector", DataType: schemapb.DataType_FloatVector, + TypeParams: []*commonpb.KeyValuePair{ + {Key: common.DimKey, Value: "128"}, + }, + }, + }, + } +} + +func (s *L0WriteBufferSuite) composeInsertMsg(segmentID int64, rowCount int, dim int) ([]int64, *msgstream.InsertMsg) { + tss := lo.RepeatBy(rowCount, func(idx int) int64 { return int64(tsoutil.ComposeTSByTime(time.Now(), int64(idx))) }) + vectors := lo.RepeatBy(rowCount, func(_ int) []float32 { + return lo.RepeatBy(dim, func(_ int) float32 { return rand.Float32() }) + }) + flatten := lo.Flatten(vectors) + return tss, &msgstream.InsertMsg{ + InsertRequest: msgpb.InsertRequest{ + SegmentID: segmentID, + Version: msgpb.InsertDataVersion_ColumnBased, + RowIDs: tss, + Timestamps: lo.Map(tss, func(id int64, _ int) uint64 { return uint64(id) }), + FieldsData: []*schemapb.FieldData{ + { + FieldId: common.RowIDField, FieldName: common.RowIDFieldName, Type: schemapb.DataType_Int64, + Field: &schemapb.FieldData_Scalars{ + Scalars: &schemapb.ScalarField{ + Data: &schemapb.ScalarField_LongData{ + LongData: &schemapb.LongArray{ + Data: tss, + }, + }, + }, + }, + }, + { + FieldId: common.TimeStampField, FieldName: common.TimeStampFieldName, Type: schemapb.DataType_Int64, + Field: &schemapb.FieldData_Scalars{ + Scalars: &schemapb.ScalarField{ + Data: &schemapb.ScalarField_LongData{ + LongData: &schemapb.LongArray{ + Data: tss, + }, + }, + }, + }, + }, + { + FieldId: common.StartOfUserFieldID, FieldName: "pk", Type: schemapb.DataType_Int64, + Field: &schemapb.FieldData_Scalars{ + Scalars: &schemapb.ScalarField{ + Data: &schemapb.ScalarField_LongData{ + LongData: &schemapb.LongArray{ + Data: tss, + }, + }, + }, + }, + }, + { + FieldId: common.StartOfUserFieldID + 1, FieldName: "vector", Type: schemapb.DataType_FloatVector, + Field: &schemapb.FieldData_Vectors{ + Vectors: &schemapb.VectorField{ + Dim: int64(dim), + Data: 
&schemapb.VectorField_FloatVector{ + FloatVector: &schemapb.FloatArray{ + Data: flatten, + }, + }, + }, + }, + }, + }, + }, + } +} + +func (s *L0WriteBufferSuite) composeDeleteMsg(pks []storage.PrimaryKey) *msgstream.DeleteMsg { + delMsg := &msgstream.DeleteMsg{ + DeleteRequest: msgpb.DeleteRequest{ + PrimaryKeys: storage.ParsePrimaryKeys2IDs(pks), + Timestamps: lo.RepeatBy(len(pks), func(idx int) uint64 { return tsoutil.ComposeTSByTime(time.Now(), int64(idx)) }), + }, + } + return delMsg +} + +func (s *L0WriteBufferSuite) SetupTest() { + s.syncMgr = syncmgr.NewMockSyncManager(s.T()) + s.metacache = metacache.NewMockMetaCache(s.T()) + s.allocator = allocator.NewMockGIDAllocator() + s.allocator.AllocOneF = func() (int64, error) { return int64(tsoutil.ComposeTSByTime(time.Now(), 0)), nil } +} + +func (s *L0WriteBufferSuite) TestBufferData() { + wb, err := NewL0WriteBuffer(s.collSchema, s.metacache, s.syncMgr, &writeBufferOption{ + idAllocator: s.allocator, + }) + s.NoError(err) + + // seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1000}, metacache.NewBloomFilterSet()) + // s.metacache.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{seg}) + + pks, msg := s.composeInsertMsg(1000, 10, 128) + delMsg := s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) })) + + err = wb.BufferData([]*msgstream.InsertMsg{msg}, []*msgstream.DeleteMsg{delMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200}) + s.NoError(err) +} + +func TestL0WriteBuffer(t *testing.T) { + suite.Run(t, new(L0WriteBufferSuite)) +} diff --git a/internal/datanode/writebuffer/manager.go b/internal/datanode/writebuffer/manager.go new file mode 100644 index 0000000000..4c794c22b7 --- /dev/null +++ b/internal/datanode/writebuffer/manager.go @@ -0,0 +1,105 @@ +package writebuffer + +import ( + "context" + "sync" + + "go.uber.org/zap" + + "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/datanode/metacache" + "github.com/milvus-io/milvus/internal/datanode/syncmgr" + "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/mq/msgstream" + "github.com/milvus-io/milvus/pkg/util/merr" +) + +// Manager is the interface for WriteBuffer management. +type Manager interface { + // Register adds a WriteBuffer with provided schema & options. + Register(channel string, schema *schemapb.CollectionSchema, metacache metacache.MetaCache, opts ...WriteBufferOption) error + // FlushSegments notifies writeBuffer corresponding to provided channel to flush segments. + FlushSegments(ctx context.Context, channel string, segmentIDs []int64) error + // RemoveChannel removes a write buffer from manager. + RemoveChannel(channel string) + // BufferData put data into channel write buffer. + BufferData(channel string, insertMsgs []*msgstream.InsertMsg, deleteMsgs []*msgstream.DeleteMsg, startPos, endPos *msgpb.MsgPosition) error +} + +// NewManager returns initialized manager as `Manager` +func NewManager(syncMgr syncmgr.SyncManager) Manager { + return &manager{ + syncMgr: syncMgr, + buffers: make(map[string]WriteBuffer), + } +} + +type manager struct { + syncMgr syncmgr.SyncManager + buffers map[string]WriteBuffer + mut sync.RWMutex +} + +// Register a new WriteBuffer for channel. 
+func (m *manager) Register(channel string, schema *schemapb.CollectionSchema, metacache metacache.MetaCache, opts ...WriteBufferOption) error {
+	m.mut.Lock()
+	defer m.mut.Unlock()
+
+	_, ok := m.buffers[channel]
+	if ok {
+		return merr.WrapErrChannelReduplicate(channel)
+	}
+	buf, err := NewWriteBuffer(schema, metacache, m.syncMgr, opts...)
+	if err != nil {
+		return err
+	}
+	m.buffers[channel] = buf
+	return nil
+}
+
+// FlushSegments calls sync on the provided segments and changes their state to Flushed.
+func (m *manager) FlushSegments(ctx context.Context, channel string, segmentIDs []int64) error {
+	m.mut.RLock()
+	buf, ok := m.buffers[channel]
+	m.mut.RUnlock()
+
+	if !ok {
+		log.Ctx(ctx).Warn("write buffer not found when flush segments",
+			zap.String("channel", channel),
+			zap.Int64s("segmentIDs", segmentIDs))
+		return merr.WrapErrChannelNotFound(channel)
+	}
+
+	return buf.FlushSegments(ctx, segmentIDs)
+}
+
+// BufferData puts data into the channel write buffer.
+func (m *manager) BufferData(channel string, insertMsgs []*msgstream.InsertMsg, deleteMsgs []*msgstream.DeleteMsg, startPos, endPos *msgpb.MsgPosition) error {
+	m.mut.RLock()
+	buf, ok := m.buffers[channel]
+	m.mut.RUnlock()
+
+	if !ok {
+		log.Ctx(context.Background()).Warn("write buffer not found when buffer data",
+			zap.String("channel", channel))
+		return merr.WrapErrChannelNotFound(channel)
+	}
+
+	return buf.BufferData(insertMsgs, deleteMsgs, startPos, endPos)
+}
+
+// RemoveChannel removes the channel WriteBuffer from the manager.
+func (m *manager) RemoveChannel(channel string) {
+	m.mut.Lock()
+	buf, ok := m.buffers[channel]
+	delete(m.buffers, channel)
+	m.mut.Unlock()
+
+	if !ok {
+		log.Warn("failed to remove channel, channel not maintained in manager", zap.String("channel", channel))
+		return
+	}
+
+	buf.Close()
+}
diff --git a/internal/datanode/writebuffer/manager_test.go b/internal/datanode/writebuffer/manager_test.go
new file mode 100644
index 0000000000..204d7a8f50
--- /dev/null
+++ b/internal/datanode/writebuffer/manager_test.go
@@ -0,0 +1,138 @@
+package writebuffer
+
+import (
+	"context"
+	"testing"
+
+	"github.com/stretchr/testify/mock"
+	"github.com/stretchr/testify/suite"
+
+	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
+	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
+	"github.com/milvus-io/milvus/internal/datanode/metacache"
+	"github.com/milvus-io/milvus/internal/datanode/syncmgr"
+	"github.com/milvus-io/milvus/pkg/common"
+	"github.com/milvus-io/milvus/pkg/util/merr"
+	"github.com/milvus-io/milvus/pkg/util/paramtable"
+)
+
+type ManagerSuite struct {
+	suite.Suite
+	channelName string
+	collSchema  *schemapb.CollectionSchema
+	syncMgr     *syncmgr.MockSyncManager
+	metacache   *metacache.MockMetaCache
+
+	manager *manager
+}
+
+func (s *ManagerSuite) SetupSuite() {
+	paramtable.Get().Init(paramtable.NewBaseTable())
+	s.collSchema = &schemapb.CollectionSchema{
+		Name: "test_collection",
+		Fields: []*schemapb.FieldSchema{
+			{
+				FieldID: 100, Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true,
+			},
+			{
+				FieldID: 101, Name: "vector", DataType: schemapb.DataType_FloatVector,
+				TypeParams: []*commonpb.KeyValuePair{
+					{Key: common.DimKey, Value: "128"},
+				},
+			},
+		},
+	}
+
+	s.channelName = "by-dev-rootcoord-dml_0_100_v0"
+}
+
+func (s *ManagerSuite) SetupTest() {
+	s.syncMgr = syncmgr.NewMockSyncManager(s.T())
+	s.metacache = metacache.NewMockMetaCache(s.T())
+
+	mgr := NewManager(s.syncMgr)
+	var ok bool
+	s.manager, ok = mgr.(*manager)
+	s.Require().True(ok)
+}
+
+func (s *ManagerSuite) TestRegister() {
+	manager := s.manager
+
+	err := manager.Register(s.channelName, s.collSchema, s.metacache)
+	s.NoError(err)
+
+	err = manager.Register(s.channelName, s.collSchema, s.metacache)
+	s.Error(err)
+	s.ErrorIs(err, merr.ErrChannelReduplicate)
+}
+
+func (s *ManagerSuite) TestFlushSegments() {
+	manager := s.manager
+	s.Run("channel_not_found", func() {
+		ctx, cancel := context.WithCancel(context.Background())
+		defer cancel()
+		err := manager.FlushSegments(ctx, s.channelName, []int64{1, 2, 3})
+		s.Error(err, "FlushSegments shall return error when channel not found")
+	})
+
+	s.Run("normal_flush", func() {
+		ctx, cancel := context.WithCancel(context.Background())
+		defer cancel()
+
+		wb := NewMockWriteBuffer(s.T())
+
+		s.manager.mut.Lock()
+		s.manager.buffers[s.channelName] = wb
+		s.manager.mut.Unlock()
+
+		wb.EXPECT().FlushSegments(mock.Anything, mock.Anything).Return(nil)
+
+		err := manager.FlushSegments(ctx, s.channelName, []int64{1})
+		s.NoError(err)
+	})
+}
+
+func (s *ManagerSuite) TestBufferData() {
+	manager := s.manager
+	s.Run("channel_not_found", func() {
+		err := manager.BufferData(s.channelName, nil, nil, nil, nil)
+		s.Error(err, "BufferData shall return error when channel not found")
+	})
+
+	s.Run("normal_buffer_data", func() {
+		wb := NewMockWriteBuffer(s.T())
+
+		s.manager.mut.Lock()
+		s.manager.buffers[s.channelName] = wb
+		s.manager.mut.Unlock()
+
+		wb.EXPECT().BufferData(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
+
+		err := manager.BufferData(s.channelName, nil, nil, nil, nil)
+		s.NoError(err)
+	})
+}
+
+func (s *ManagerSuite) TestRemoveChannel() {
+	manager := NewManager(s.syncMgr)
+
+	s.Run("remove_not_exist", func() {
+		s.NotPanics(func() {
+			manager.RemoveChannel(s.channelName)
+		})
+	})
+
+	s.Run("remove_channel", func() {
+		err := manager.Register(s.channelName, s.collSchema, s.metacache)
+		s.Require().NoError(err)
+
+		s.NotPanics(func() {
+			manager.RemoveChannel(s.channelName)
+		})
+	})
+}
+
+func TestManager(t *testing.T) {
+	suite.Run(t, new(ManagerSuite))
+}
diff --git a/internal/datanode/writebuffer/mock_write_buffer.go b/internal/datanode/writebuffer/mock_write_buffer.go
new file mode 100644
index 0000000000..21a81a6185
--- /dev/null
+++ b/internal/datanode/writebuffer/mock_write_buffer.go
@@ -0,0 +1,201 @@
+// Code generated by mockery v2.32.4. DO NOT EDIT.
+ +package writebuffer + +import ( + context "context" + + msgpb "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" + mock "github.com/stretchr/testify/mock" + + msgstream "github.com/milvus-io/milvus/pkg/mq/msgstream" +) + +// MockWriteBuffer is an autogenerated mock type for the WriteBuffer type +type MockWriteBuffer struct { + mock.Mock +} + +type MockWriteBuffer_Expecter struct { + mock *mock.Mock +} + +func (_m *MockWriteBuffer) EXPECT() *MockWriteBuffer_Expecter { + return &MockWriteBuffer_Expecter{mock: &_m.Mock} +} + +// BufferData provides a mock function with given fields: insertMsgs, deleteMsgs, startPos, endPos +func (_m *MockWriteBuffer) BufferData(insertMsgs []*msgstream.InsertMsg, deleteMsgs []*msgstream.DeleteMsg, startPos *msgpb.MsgPosition, endPos *msgpb.MsgPosition) error { + ret := _m.Called(insertMsgs, deleteMsgs, startPos, endPos) + + var r0 error + if rf, ok := ret.Get(0).(func([]*msgstream.InsertMsg, []*msgstream.DeleteMsg, *msgpb.MsgPosition, *msgpb.MsgPosition) error); ok { + r0 = rf(insertMsgs, deleteMsgs, startPos, endPos) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockWriteBuffer_BufferData_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'BufferData' +type MockWriteBuffer_BufferData_Call struct { + *mock.Call +} + +// BufferData is a helper method to define mock.On call +// - insertMsgs []*msgstream.InsertMsg +// - deleteMsgs []*msgstream.DeleteMsg +// - startPos *msgpb.MsgPosition +// - endPos *msgpb.MsgPosition +func (_e *MockWriteBuffer_Expecter) BufferData(insertMsgs interface{}, deleteMsgs interface{}, startPos interface{}, endPos interface{}) *MockWriteBuffer_BufferData_Call { + return &MockWriteBuffer_BufferData_Call{Call: _e.mock.On("BufferData", insertMsgs, deleteMsgs, startPos, endPos)} +} + +func (_c *MockWriteBuffer_BufferData_Call) Run(run func(insertMsgs []*msgstream.InsertMsg, deleteMsgs []*msgstream.DeleteMsg, startPos *msgpb.MsgPosition, endPos *msgpb.MsgPosition)) *MockWriteBuffer_BufferData_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].([]*msgstream.InsertMsg), args[1].([]*msgstream.DeleteMsg), args[2].(*msgpb.MsgPosition), args[3].(*msgpb.MsgPosition)) + }) + return _c +} + +func (_c *MockWriteBuffer_BufferData_Call) Return(_a0 error) *MockWriteBuffer_BufferData_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockWriteBuffer_BufferData_Call) RunAndReturn(run func([]*msgstream.InsertMsg, []*msgstream.DeleteMsg, *msgpb.MsgPosition, *msgpb.MsgPosition) error) *MockWriteBuffer_BufferData_Call { + _c.Call.Return(run) + return _c +} + +// Close provides a mock function with given fields: +func (_m *MockWriteBuffer) Close() { + _m.Called() +} + +// MockWriteBuffer_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close' +type MockWriteBuffer_Close_Call struct { + *mock.Call +} + +// Close is a helper method to define mock.On call +func (_e *MockWriteBuffer_Expecter) Close() *MockWriteBuffer_Close_Call { + return &MockWriteBuffer_Close_Call{Call: _e.mock.On("Close")} +} + +func (_c *MockWriteBuffer_Close_Call) Run(run func()) *MockWriteBuffer_Close_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockWriteBuffer_Close_Call) Return() *MockWriteBuffer_Close_Call { + _c.Call.Return() + return _c +} + +func (_c *MockWriteBuffer_Close_Call) RunAndReturn(run func()) *MockWriteBuffer_Close_Call { + _c.Call.Return(run) + return _c +} + +// FlushSegments provides a mock function with 
given fields: ctx, segmentIDs +func (_m *MockWriteBuffer) FlushSegments(ctx context.Context, segmentIDs []int64) error { + ret := _m.Called(ctx, segmentIDs) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, []int64) error); ok { + r0 = rf(ctx, segmentIDs) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockWriteBuffer_FlushSegments_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'FlushSegments' +type MockWriteBuffer_FlushSegments_Call struct { + *mock.Call +} + +// FlushSegments is a helper method to define mock.On call +// - ctx context.Context +// - segmentIDs []int64 +func (_e *MockWriteBuffer_Expecter) FlushSegments(ctx interface{}, segmentIDs interface{}) *MockWriteBuffer_FlushSegments_Call { + return &MockWriteBuffer_FlushSegments_Call{Call: _e.mock.On("FlushSegments", ctx, segmentIDs)} +} + +func (_c *MockWriteBuffer_FlushSegments_Call) Run(run func(ctx context.Context, segmentIDs []int64)) *MockWriteBuffer_FlushSegments_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].([]int64)) + }) + return _c +} + +func (_c *MockWriteBuffer_FlushSegments_Call) Return(_a0 error) *MockWriteBuffer_FlushSegments_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockWriteBuffer_FlushSegments_Call) RunAndReturn(run func(context.Context, []int64) error) *MockWriteBuffer_FlushSegments_Call { + _c.Call.Return(run) + return _c +} + +// HasSegment provides a mock function with given fields: segmentID +func (_m *MockWriteBuffer) HasSegment(segmentID int64) bool { + ret := _m.Called(segmentID) + + var r0 bool + if rf, ok := ret.Get(0).(func(int64) bool); ok { + r0 = rf(segmentID) + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// MockWriteBuffer_HasSegment_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'HasSegment' +type MockWriteBuffer_HasSegment_Call struct { + *mock.Call +} + +// HasSegment is a helper method to define mock.On call +// - segmentID int64 +func (_e *MockWriteBuffer_Expecter) HasSegment(segmentID interface{}) *MockWriteBuffer_HasSegment_Call { + return &MockWriteBuffer_HasSegment_Call{Call: _e.mock.On("HasSegment", segmentID)} +} + +func (_c *MockWriteBuffer_HasSegment_Call) Run(run func(segmentID int64)) *MockWriteBuffer_HasSegment_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(int64)) + }) + return _c +} + +func (_c *MockWriteBuffer_HasSegment_Call) Return(_a0 bool) *MockWriteBuffer_HasSegment_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockWriteBuffer_HasSegment_Call) RunAndReturn(run func(int64) bool) *MockWriteBuffer_HasSegment_Call { + _c.Call.Return(run) + return _c +} + +// NewMockWriteBuffer creates a new instance of MockWriteBuffer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. 
+func NewMockWriteBuffer(t interface {
+	mock.TestingT
+	Cleanup(func())
+}) *MockWriteBuffer {
+	mock := &MockWriteBuffer{}
+	mock.Mock.Test(t)
+
+	t.Cleanup(func() { mock.AssertExpectations(t) })
+
+	return mock
+}
diff --git a/internal/datanode/writebuffer/options.go b/internal/datanode/writebuffer/options.go
new file mode 100644
index 0000000000..3e25f8f3e8
--- /dev/null
+++ b/internal/datanode/writebuffer/options.go
@@ -0,0 +1,70 @@
+package writebuffer
+
+import (
+	"time"
+
+	"github.com/milvus-io/milvus/internal/allocator"
+	"github.com/milvus-io/milvus/internal/datanode/broker"
+	"github.com/milvus-io/milvus/internal/datanode/metacache"
+	"github.com/milvus-io/milvus/pkg/util/paramtable"
+)
+
+const (
+	// DeletePolicyBFPkOracle is the const config value for using the bf pk oracle as delete policy.
+	DeletePolicyBFPkOracle = `bloom_filter_pkoracle`
+
+	// DeletePolicyL0Delta is the const config value for using L0 delta as delete policy.
+	DeletePolicyL0Delta = `l0_delta`
+)
+
+type WriteBufferOption func(opt *writeBufferOption)
+
+type writeBufferOption struct {
+	deletePolicy string
+	idAllocator  allocator.Interface
+	syncPolicies []SyncPolicy
+
+	pkStatsFactory metacache.PkStatsFactory
+	broker         broker.Broker
+}
+
+func defaultWBOption() *writeBufferOption {
+	return &writeBufferOption{
+		// TODO use l0 delta as default after implementation.
+		deletePolicy: paramtable.Get().DataNodeCfg.DeltaPolicy.GetValue(),
+		syncPolicies: []SyncPolicy{
+			SyncFullBuffer,
+			GetSyncStaleBufferPolicy(paramtable.Get().DataNodeCfg.SyncPeriod.GetAsDuration(time.Second)),
+		},
+	}
+}
+
+func WithDeletePolicy(policy string) WriteBufferOption {
+	return func(opt *writeBufferOption) {
+		opt.deletePolicy = policy
+	}
+}
+
+func WithIDAllocator(allocator allocator.Interface) WriteBufferOption {
+	return func(opt *writeBufferOption) {
+		opt.idAllocator = allocator
+	}
+}
+
+func WithPKStatsFactory(factory metacache.PkStatsFactory) WriteBufferOption {
+	return func(opt *writeBufferOption) {
+		opt.pkStatsFactory = factory
+	}
+}
+
+func WithBroker(broker broker.Broker) WriteBufferOption {
+	return func(opt *writeBufferOption) {
+		opt.broker = broker
+	}
+}
+
+func WithSyncPolicy(policy SyncPolicy) WriteBufferOption {
+	return func(opt *writeBufferOption) {
+		opt.syncPolicies = append(opt.syncPolicies, policy)
+	}
+}
diff --git a/internal/datanode/writebuffer/segment_buffer.go b/internal/datanode/writebuffer/segment_buffer.go
new file mode 100644
index 0000000000..871ebc0ed6
--- /dev/null
+++ b/internal/datanode/writebuffer/segment_buffer.go
@@ -0,0 +1,56 @@
+package writebuffer
+
+import (
+	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
+	"github.com/milvus-io/milvus/internal/storage"
+	"github.com/milvus-io/milvus/pkg/util/typeutil"
+)
+
+type segmentBuffer struct {
+	segmentID int64
+
+	insertBuffer *InsertBuffer
+	deltaBuffer  *DeltaBuffer
+
+	flushing bool
+}
+
+func newSegmentBuffer(segmentID int64, collSchema *schemapb.CollectionSchema) (*segmentBuffer, error) {
+	insertBuffer, err := NewInsertBuffer(collSchema)
+	if err != nil {
+		return nil, err
+	}
+	return &segmentBuffer{
+		segmentID:    segmentID,
+		insertBuffer: insertBuffer,
+		deltaBuffer:  NewDeltaBuffer(),
+	}, nil
+}
+
+func (buf *segmentBuffer) IsFull() bool {
+	return buf.insertBuffer.IsFull() || buf.deltaBuffer.IsFull()
+}
+
+func (buf *segmentBuffer) Renew() (insert *storage.InsertData, delete *storage.DeleteData) {
+	return buf.insertBuffer.Renew(), buf.deltaBuffer.Renew()
+}
+
+func (buf *segmentBuffer) SetFlush() {
+	buf.flushing = true
+} + +func (buf *segmentBuffer) MinTimestamp() typeutil.Timestamp { + insertTs := buf.insertBuffer.MinTimestamp() + deltaTs := buf.deltaBuffer.MinTimestamp() + + if insertTs < deltaTs { + return insertTs + } + return deltaTs +} + +// TimeRange is a range of timestamp contains the min-timestamp and max-timestamp +type TimeRange struct { + timestampMin typeutil.Timestamp + timestampMax typeutil.Timestamp +} diff --git a/internal/datanode/writebuffer/sync_policy.go b/internal/datanode/writebuffer/sync_policy.go new file mode 100644 index 0000000000..b7ef915f40 --- /dev/null +++ b/internal/datanode/writebuffer/sync_policy.go @@ -0,0 +1,38 @@ +package writebuffer + +import ( + "time" + + "github.com/samber/lo" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus/internal/datanode/metacache" + "github.com/milvus-io/milvus/pkg/util/tsoutil" + "github.com/milvus-io/milvus/pkg/util/typeutil" +) + +type SyncPolicy func(buffers []*segmentBuffer, ts typeutil.Timestamp) []int64 + +func SyncFullBuffer(buffers []*segmentBuffer, _ typeutil.Timestamp) []int64 { + return lo.FilterMap(buffers, func(buf *segmentBuffer, _ int) (int64, bool) { + return buf.segmentID, buf.IsFull() + }) +} + +func GetSyncStaleBufferPolicy(staleDuration time.Duration) SyncPolicy { + return func(buffers []*segmentBuffer, ts typeutil.Timestamp) []int64 { + current := tsoutil.PhysicalTime(ts) + return lo.FilterMap(buffers, func(buf *segmentBuffer, _ int) (int64, bool) { + minTs := buf.MinTimestamp() + start := tsoutil.PhysicalTime(minTs) + + return buf.segmentID, current.Sub(start) > staleDuration + }) + } +} + +func GetFlushingSegmentsPolicy(meta metacache.MetaCache) SyncPolicy { + return func(_ []*segmentBuffer, _ typeutil.Timestamp) []int64 { + return meta.GetSegmentIDsBy(metacache.WithSegmentState(commonpb.SegmentState_Flushing)) + } +} diff --git a/internal/datanode/writebuffer/sync_policy_test.go b/internal/datanode/writebuffer/sync_policy_test.go new file mode 100644 index 0000000000..beb7e92958 --- /dev/null +++ b/internal/datanode/writebuffer/sync_policy_test.go @@ -0,0 +1,80 @@ +package writebuffer + +import ( + "testing" + "time" + + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/suite" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/datanode/metacache" + "github.com/milvus-io/milvus/pkg/common" + "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/tsoutil" +) + +type SyncPolicySuite struct { + suite.Suite + collSchema *schemapb.CollectionSchema +} + +func (s *SyncPolicySuite) SetupSuite() { + paramtable.Get().Init(paramtable.NewBaseTable()) + + s.collSchema = &schemapb.CollectionSchema{ + Name: "wb_base_collection", + Fields: []*schemapb.FieldSchema{ + {FieldID: 100, DataType: schemapb.DataType_Int64, IsPrimaryKey: true, Name: "pk"}, + {FieldID: 101, DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{ + {Key: common.DimKey, Value: "128"}, + }}, + }, + } +} + +func (s *SyncPolicySuite) TestSyncFullBuffer() { + buffer, err := newSegmentBuffer(100, s.collSchema) + s.Require().NoError(err) + + ids := SyncFullBuffer([]*segmentBuffer{buffer}, 0) + s.Equal(0, len(ids), "empty buffer shall not be synced") + + buffer.insertBuffer.rows = buffer.insertBuffer.rowLimit + 1 + + ids = SyncFullBuffer([]*segmentBuffer{buffer}, 0) + s.ElementsMatch([]int64{100}, 
ids) +} + +func (s *SyncPolicySuite) TestSyncStalePolicy() { + policy := GetSyncStaleBufferPolicy(time.Minute) + + buffer, err := newSegmentBuffer(100, s.collSchema) + s.Require().NoError(err) + + ids := policy([]*segmentBuffer{buffer}, tsoutil.ComposeTSByTime(time.Now(), 0)) + s.Equal(0, len(ids), "empty buffer shall not be synced") + + buffer.insertBuffer.startPos = &msgpb.MsgPosition{ + Timestamp: tsoutil.ComposeTSByTime(time.Now().Add(-time.Minute*2), 0), + } + + ids = policy([]*segmentBuffer{buffer}, tsoutil.ComposeTSByTime(time.Now(), 0)) + s.ElementsMatch([]int64{100}, ids) +} + +func (s *SyncPolicySuite) TestFlushingSegmentsPolicy() { + metacache := metacache.NewMockMetaCache(s.T()) + policy := GetFlushingSegmentsPolicy(metacache) + ids := []int64{1, 2, 3} + metacache.EXPECT().GetSegmentIDsBy(mock.Anything).Return(ids) + + result := policy([]*segmentBuffer{}, tsoutil.ComposeTSByTime(time.Now(), 0)) + s.ElementsMatch(ids, result) +} + +func TestSyncPolicy(t *testing.T) { + suite.Run(t, new(SyncPolicySuite)) +} diff --git a/internal/datanode/writebuffer/write_buffer.go b/internal/datanode/writebuffer/write_buffer.go new file mode 100644 index 0000000000..a0e989f7d7 --- /dev/null +++ b/internal/datanode/writebuffer/write_buffer.go @@ -0,0 +1,226 @@ +package writebuffer + +import ( + "context" + "sync" + + "github.com/samber/lo" + "go.uber.org/zap" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/datanode/broker" + "github.com/milvus-io/milvus/internal/datanode/metacache" + "github.com/milvus-io/milvus/internal/datanode/syncmgr" + "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/mq/msgstream" + "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/typeutil" +) + +// WriteBuffer is the interface for channel write buffer. +// It provides abstraction for channel write buffer and pk bloom filter & L0 delta logic. +type WriteBuffer interface { + // HasSegment checks whether certain segment exists in this buffer. + HasSegment(segmentID int64) bool + // BufferData is the method to buffer dml data msgs. + BufferData(insertMsgs []*msgstream.InsertMsg, deleteMsgs []*msgstream.DeleteMsg, startPos, endPos *msgpb.MsgPosition) error + // FlushSegments is the method to perform `Sync` operation with provided options. + FlushSegments(ctx context.Context, segmentIDs []int64) error + // Close is the method to close and sink current buffer data. 
+	Close()
+}
+
+func NewWriteBuffer(schema *schemapb.CollectionSchema, metacache metacache.MetaCache, syncMgr syncmgr.SyncManager, opts ...WriteBufferOption) (WriteBuffer, error) {
+	option := defaultWBOption()
+	option.syncPolicies = append(option.syncPolicies, GetFlushingSegmentsPolicy(metacache))
+	for _, opt := range opts {
+		opt(option)
+	}
+
+	switch option.deletePolicy {
+	case DeletePolicyBFPkOracle:
+		return NewBFWriteBuffer(schema, metacache, syncMgr, option)
+	case DeletePolicyL0Delta:
+		return NewL0WriteBuffer(schema, metacache, syncMgr, option)
+	default:
+		return nil, merr.WrapErrParameterInvalid("valid delete policy config", option.deletePolicy)
+	}
+}
+
+// writeBufferBase is the common component for buffering data
+type writeBufferBase struct {
+	mut sync.RWMutex
+
+	collectionID int64
+
+	collSchema *schemapb.CollectionSchema
+	metaCache  metacache.MetaCache
+	syncMgr    syncmgr.SyncManager
+	broker     broker.Broker
+	buffers    map[int64]*segmentBuffer // segmentID => segmentBuffer
+
+	syncPolicies []SyncPolicy
+	checkpoint   *msgpb.MsgPosition
+}
+
+func newWriteBufferBase(sch *schemapb.CollectionSchema, metacache metacache.MetaCache, syncMgr syncmgr.SyncManager, option *writeBufferOption) *writeBufferBase {
+	return &writeBufferBase{
+		collSchema:   sch,
+		syncMgr:      syncMgr,
+		broker:       option.broker,
+		buffers:      make(map[int64]*segmentBuffer),
+		metaCache:    metacache,
+		syncPolicies: option.syncPolicies,
+	}
+}
+
+func (wb *writeBufferBase) HasSegment(segmentID int64) bool {
+	wb.mut.RLock()
+	defer wb.mut.RUnlock()
+
+	_, ok := wb.buffers[segmentID]
+	return ok
+}
+
+func (wb *writeBufferBase) FlushSegments(ctx context.Context, segmentIDs []int64) error {
+	wb.mut.RLock()
+	defer wb.mut.RUnlock()
+
+	return wb.flushSegments(ctx, segmentIDs)
+}
+
+func (wb *writeBufferBase) triggerAutoSync() error {
+	segmentsToSync := wb.getSegmentsToSync(wb.checkpoint.GetTimestamp())
+	if len(segmentsToSync) > 0 {
+		log.Info("write buffer get segments to sync", zap.Int64s("segmentIDs", segmentsToSync))
+		err := wb.syncSegments(context.Background(), segmentsToSync)
+		if err != nil {
+			log.Warn("sync segments failed", zap.Int64s("segmentIDs", segmentsToSync), zap.Error(err))
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (wb *writeBufferBase) flushSegments(ctx context.Context, segmentIDs []int64) error {
+	// mark segment flushing if segment was growing
+	wb.metaCache.UpdateSegments(metacache.UpdateState(commonpb.SegmentState_Flushing),
+		metacache.WithSegmentIDs(segmentIDs...),
+		metacache.WithSegmentState(commonpb.SegmentState_Growing))
+	return nil
+}
+
+func (wb *writeBufferBase) syncSegments(ctx context.Context, segmentIDs []int64) error {
+	log := log.Ctx(ctx)
+	for _, segmentID := range segmentIDs {
+		infos := wb.metaCache.GetSegmentsBy(metacache.WithSegmentIDs(segmentID))
+		if len(infos) == 0 {
+			log.Warn("segment info not found in meta cache", zap.Int64("segmentID", segmentID))
+			continue
+		}
+		segmentInfo := infos[0]
+
+		buffer, exist := wb.getBuffer(segmentID)
+
+		var insert *storage.InsertData
+		var delta *storage.DeleteData
+		if exist {
+			insert, delta = buffer.Renew()
+		}
+
+		wb.metaCache.UpdateSegments(metacache.RollStats(), metacache.WithSegmentIDs(segmentID))
+
+		syncTask := syncmgr.NewSyncTask().
+			WithInsertData(insert).
+			WithDeleteData(delta).
+			WithCollectionID(wb.collectionID).
+			WithPartitionID(segmentInfo.PartitionID()).
+			WithSegmentID(segmentID).
+			WithCheckpoint(wb.checkpoint).
+			WithSchema(wb.collSchema).
+			WithMetaWriter(syncmgr.BrokerMetaWriter(wb.broker)).
+			WithFailureCallback(func(err error) {
+				// TODO could change to unsub channel in the future
+				panic(err)
+			})
+
+		// update flush & drop state
+		switch segmentInfo.State() {
+		case commonpb.SegmentState_Flushing:
+			syncTask.WithFlush()
+		case commonpb.SegmentState_Dropped:
+			syncTask.WithDrop()
+		}
+
+		err := wb.syncMgr.SyncData(ctx, syncTask)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// getSegmentsToSync applies all policies to get the list of segments to sync.
+// **NOTE** shall be invoked within mutex protection
+func (wb *writeBufferBase) getSegmentsToSync(ts typeutil.Timestamp) []int64 {
+	buffers := lo.Values(wb.buffers)
+	segments := typeutil.NewSet[int64]()
+	for _, policy := range wb.syncPolicies {
+		segments.Insert(policy(buffers, ts)...)
+	}
+
+	return segments.Collect()
+}
+
+func (wb *writeBufferBase) getOrCreateBuffer(segmentID int64) *segmentBuffer {
+	buffer, ok := wb.buffers[segmentID]
+	if !ok {
+		var err error
+		buffer, err = newSegmentBuffer(segmentID, wb.collSchema)
+		if err != nil {
+			// TODO avoid panic here
+			panic(err)
+		}
+		wb.buffers[segmentID] = buffer
+	}
+
+	return buffer
+}
+
+func (wb *writeBufferBase) getBuffer(segmentID int64) (*segmentBuffer, bool) {
+	buffer, ok := wb.buffers[segmentID]
+	return buffer, ok
+}
+
+// bufferInsert transforms InsertMsgs into buffered InsertData and returns primary key field data for future usage.
+func (wb *writeBufferBase) bufferInsert(insertMsgs []*msgstream.InsertMsg, startPos, endPos *msgpb.MsgPosition) (map[int64][]storage.FieldData, error) {
+	insertGroups := lo.GroupBy(insertMsgs, func(msg *msgstream.InsertMsg) int64 { return msg.GetSegmentID() })
+	segmentPKData := make(map[int64][]storage.FieldData)
+
+	for segmentID, msgs := range insertGroups {
+		segBuf := wb.getOrCreateBuffer(segmentID)
+
+		pkData, err := segBuf.insertBuffer.Buffer(msgs, startPos, endPos)
+		if err != nil {
+			log.Warn("failed to buffer insert data", zap.Int64("segmentID", segmentID), zap.Error(err))
+			return nil, err
+		}
+		segmentPKData[segmentID] = pkData
+	}
+
+	return segmentPKData, nil
+}
+
+// bufferDelete buffers DeleteMsg into DeleteData.
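+//
+// Together with bufferInsert above, a concrete WriteBuffer implementation is expected
+// to call these helpers from its BufferData, roughly as follows (sketch only; how
+// `segmentID`, `pks` and `tss` are derived from the delete messages depends on the
+// delete policy, e.g. bloom-filter pk oracle vs. L0 segment):
+//
+//	pkData, err := wb.bufferInsert(insertMsgs, startPos, endPos)
+//	if err != nil {
+//		return err
+//	}
+//	// feed pkData into the segment pk stats, resolve the delete targets, then:
+//	if err := wb.bufferDelete(segmentID, pks, tss, startPos, endPos); err != nil {
+//		return err
+//	}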
+func (wb *writeBufferBase) bufferDelete(segmentID int64, pks []storage.PrimaryKey, tss []typeutil.Timestamp, startPos, endPos *msgpb.MsgPosition) error { + segBuf := wb.getOrCreateBuffer(segmentID) + segBuf.deltaBuffer.Buffer(pks, tss, startPos, endPos) + return nil +} + +func (wb *writeBufferBase) Close() { +} diff --git a/internal/datanode/writebuffer/write_buffer_test.go b/internal/datanode/writebuffer/write_buffer_test.go new file mode 100644 index 0000000000..cb47e5d551 --- /dev/null +++ b/internal/datanode/writebuffer/write_buffer_test.go @@ -0,0 +1,91 @@ +package writebuffer + +import ( + "context" + "testing" + + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/suite" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/allocator" + "github.com/milvus-io/milvus/internal/datanode/metacache" + "github.com/milvus-io/milvus/internal/datanode/syncmgr" + "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/pkg/common" + "github.com/milvus-io/milvus/pkg/util/paramtable" +) + +type WriteBufferSuite struct { + suite.Suite + collSchema *schemapb.CollectionSchema + wb *writeBufferBase + syncMgr *syncmgr.MockSyncManager + metacache *metacache.MockMetaCache +} + +func (s *WriteBufferSuite) SetupSuite() { + paramtable.Get().Init(paramtable.NewBaseTable()) + s.collSchema = &schemapb.CollectionSchema{ + Name: "wb_base_collection", + Fields: []*schemapb.FieldSchema{ + {FieldID: 100, DataType: schemapb.DataType_Int64, IsPrimaryKey: true, Name: "pk"}, + {FieldID: 101, DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{ + {Key: common.DimKey, Value: "128"}, + }}, + }, + } +} + +func (s *WriteBufferSuite) SetupTest() { + s.syncMgr = syncmgr.NewMockSyncManager(s.T()) + s.metacache = metacache.NewMockMetaCache(s.T()) + s.wb = newWriteBufferBase(s.collSchema, s.metacache, s.syncMgr, &writeBufferOption{ + pkStatsFactory: func(vchannel *datapb.SegmentInfo) *metacache.BloomFilterSet { + return metacache.NewBloomFilterSet() + }, + }) +} + +func (s *WriteBufferSuite) TestWriteBufferType() { + wb, err := NewWriteBuffer(s.collSchema, s.metacache, s.syncMgr, WithDeletePolicy(DeletePolicyBFPkOracle)) + s.NoError(err) + + _, ok := wb.(*bfWriteBuffer) + s.True(ok) + + wb, err = NewWriteBuffer(s.collSchema, s.metacache, s.syncMgr, WithDeletePolicy(DeletePolicyL0Delta), WithIDAllocator(allocator.NewMockGIDAllocator())) + s.NoError(err) + _, ok = wb.(*l0WriteBuffer) + s.True(ok) + + _, err = NewWriteBuffer(s.collSchema, s.metacache, s.syncMgr, WithDeletePolicy("")) + s.Error(err) +} + +func (s *WriteBufferSuite) TestHasSegment() { + segmentID := int64(1001) + + s.False(s.wb.HasSegment(segmentID)) + + s.wb.getOrCreateBuffer(segmentID) + + s.True(s.wb.HasSegment(segmentID)) +} + +func (s *WriteBufferSuite) TestFlushSegments() { + segmentID := int64(1001) + + s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything, mock.Anything) + + wb, err := NewWriteBuffer(s.collSchema, s.metacache, s.syncMgr, WithDeletePolicy(DeletePolicyBFPkOracle)) + s.NoError(err) + + err = wb.FlushSegments(context.Background(), []int64{segmentID}) + s.NoError(err) +} + +func TestWriteBufferBase(t *testing.T) { + suite.Run(t, new(WriteBufferSuite)) +} diff --git a/pkg/util/merr/utils.go b/pkg/util/merr/utils.go index 2d4e8ccb0a..a01479f5a7 100644 --- a/pkg/util/merr/utils.go +++ b/pkg/util/merr/utils.go @@ -525,6 +525,14 @@ func WrapErrSegmentNotFound(id int64, msg 
...string) error {
 	return err
 }
 
+func WrapErrSegmentsNotFound(ids []int64, msg ...string) error {
+	err := wrapWithField(ErrSegmentNotFound, "segments", ids)
+	if len(msg) > 0 {
+		err = errors.Wrap(err, strings.Join(msg, "; "))
+	}
+	return err
+}
+
 func WrapErrSegmentNotLoaded(id int64, msg ...string) error {
 	err := wrapWithField(ErrSegmentNotLoaded, "segment", id)
 	if len(msg) > 0 {
diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go
index d798bd9f8e..9da0671eb7 100644
--- a/pkg/util/paramtable/component_param.go
+++ b/pkg/util/paramtable/component_param.go
@@ -2419,6 +2419,7 @@ type dataNodeConfig struct {
 	FlushDeleteBufferBytes ParamItem `refreshable:"true"`
 	BinLogMaxSize          ParamItem `refreshable:"true"`
 	SyncPeriod             ParamItem `refreshable:"true"`
+	DeltaPolicy            ParamItem `refreshable:"false"`
 
 	// watchEvent
 	WatchEventTicklerInterval ParamItem `refreshable:"false"`
@@ -2548,6 +2549,15 @@ func (p *dataNodeConfig) init(base *BaseTable) {
 	}
 	p.SyncPeriod.Init(base.mgr)
 
+	p.DeltaPolicy = ParamItem{
+		Key:          "dataNode.segment.deltaPolicy",
+		Version:      "2.3.4",
+		DefaultValue: "bloom_filter_pkoracle",
+		Doc:          "the delta policy the current datanode uses",
+		Export:       true,
+	}
+	p.DeltaPolicy.Init(base.mgr)
+
 	p.WatchEventTicklerInterval = ParamItem{
 		Key:     "datanode.segment.watchEventTicklerInterval",
 		Version: "2.2.3",
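For reference, the sketch below is not part of the patch; it only illustrates how the new `dataNode.segment.deltaPolicy` value is intended to flow into write buffer construction through the options added above. The function name, package placement and the `mgr`, `vchannel`, `schema`, `cache` and `idAlloc` parameters are placeholders for the datanode-side wiring.

package datanode // placement illustrative

import (
	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
	"github.com/milvus-io/milvus/internal/allocator"
	"github.com/milvus-io/milvus/internal/datanode/metacache"
	"github.com/milvus-io/milvus/internal/datanode/writebuffer"
	"github.com/milvus-io/milvus/pkg/util/paramtable"
)

// registerChannelWriteBuffer picks the delete policy from config and registers the
// channel write buffer; NewWriteBuffer falls back to the same config value when no
// WithDeletePolicy option is passed.
func registerChannelWriteBuffer(
	mgr writebuffer.Manager,
	vchannel string,
	schema *schemapb.CollectionSchema,
	cache metacache.MetaCache,
	idAlloc allocator.Interface,
) error {
	policy := paramtable.Get().DataNodeCfg.DeltaPolicy.GetValue() // "bloom_filter_pkoracle" or "l0_delta"

	opts := []writebuffer.WriteBufferOption{writebuffer.WithDeletePolicy(policy)}
	if policy == writebuffer.DeletePolicyL0Delta {
		// the L0 delta policy requires an ID allocator (see NewL0WriteBuffer and its tests)
		opts = append(opts, writebuffer.WithIDAllocator(idAlloc))
	}
	return mgr.Register(vchannel, schema, cache, opts...)
}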