enhance: fix LevelZero segment sync logic (#28482)

See also #27675
- Fix LevelZero segments not being flushable
- Add a `level` option to `SyncTask`
- Invoke `AddSegment` when a new LevelZero segment is allocated (a sketch of the resulting call follows the change summary below)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
congqixia 2023-11-17 21:46:20 +08:00 committed by GitHub
parent c948a437a9
commit 18dc6b61ce
22 changed files with 175 additions and 176 deletions
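For orientation before the per-file diffs: the heart of the fix is that a freshly allocated LevelZero segment is now registered in the metacache, so the flush policy and the sync path can see it. The sketch below mirrors the `AddSegment` call added in `getL0SegmentID` further down; the wrapper function and its argument values are illustrative only, not part of this commit.

package example

import (
	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
	"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
	"github.com/milvus-io/milvus/internal/datanode/metacache"
	"github.com/milvus-io/milvus/internal/proto/datapb"
)

// registerL0Segment registers a newly allocated LevelZero segment with the
// metacache so the flush-ts policy and sync tasks can pick it up. The start
// position is marked as not yet recorded; it is flipped to recorded once
// SaveBinlogPaths succeeds.
func registerL0Segment(cache metacache.MetaCache, segmentID, partitionID, collectionID int64,
	channel string, startPos *msgpb.MsgPosition) {
	cache.AddSegment(&datapb.SegmentInfo{
		ID:            segmentID,
		PartitionID:   partitionID,
		CollectionID:  collectionID,
		InsertChannel: channel,
		StartPosition: startPos,
		State:         commonpb.SegmentState_Growing,
		Level:         datapb.SegmentLevel_L0,
	}, func(_ *datapb.SegmentInfo) *metacache.BloomFilterSet {
		return metacache.NewBloomFilterSet()
	}, metacache.SetStartPosRecorded(false))
}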

View File

@ -295,7 +295,7 @@ func getServiceWithChannel(initCtx context.Context, node *DataNode, info *datapb
resendTTCh = make(chan resendTTMsg, 100)
)
node.writeBufferManager.Register(channelName, metacache.Schema(), metacache, writebuffer.WithMetaWriter(syncmgr.BrokerMetaWriter(node.broker)), writebuffer.WithIDAllocator(node.allocator))
node.writeBufferManager.Register(channelName, metacache, writebuffer.WithMetaWriter(syncmgr.BrokerMetaWriter(node.broker)), writebuffer.WithIDAllocator(node.allocator))
ctx, cancel := context.WithCancel(node.ctx)
ds := &dataSyncService{
ctx: ctx,

View File

@ -19,6 +19,7 @@ package metacache
import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -57,6 +58,12 @@ func WithImporting() SegmentFilter {
}
}
func WithLevel(level datapb.SegmentLevel) SegmentFilter {
return func(info *SegmentInfo) bool {
return info.level == level
}
}
type SegmentAction func(info *SegmentInfo)
func UpdateState(state commonpb.SegmentState) SegmentAction {
@ -115,6 +122,12 @@ func FinishSyncing(batchSize int64) SegmentAction {
}
}
func SetStartPosRecorded(flag bool) SegmentAction {
return func(info *SegmentInfo) {
info.startPosRecorded = flag
}
}
// MergeSegmentAction is the util function to merge multiple SegmentActions into one.
func MergeSegmentAction(actions ...SegmentAction) SegmentAction {
return func(info *SegmentInfo) {

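A brief, hypothetical use of the two helpers added above, from the perspective of a caller outside the package holding a populated `MetaCache` named `cache`:

// Select only LevelZero segments.
l0Segments := cache.GetSegmentIDsBy(metacache.WithLevel(datapb.SegmentLevel_L0))

// Once their start positions have been persisted, mark them as recorded.
cache.UpdateSegments(metacache.SetStartPosRecorded(true), metacache.WithSegmentIDs(l0Segments...))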
View File

@ -23,7 +23,6 @@ import (
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/storage"
@ -35,8 +34,6 @@ type MetaCache interface {
Collection() int64
// Schema returns collection schema.
Schema() *schemapb.CollectionSchema
// NewSegment creates a new segment from WAL stream data.
NewSegment(segmentID, partitionID int64, startPos *msgpb.MsgPosition, actions ...SegmentAction)
// AddSegment adds a segment from segment info.
AddSegment(segInfo *datapb.SegmentInfo, factory PkStatsFactory, actions ...SegmentAction)
// UpdateSegments applies action to segment(s) satisfy the provided filters.
@ -97,25 +94,6 @@ func (c *metaCacheImpl) Schema() *schemapb.CollectionSchema {
return c.schema
}
// NewSegment creates a new segment from WAL stream data.
func (c *metaCacheImpl) NewSegment(segmentID, partitionID int64, startPos *msgpb.MsgPosition, actions ...SegmentAction) {
c.mu.Lock()
defer c.mu.Unlock()
if _, ok := c.segmentInfos[segmentID]; !ok {
info := &SegmentInfo{
segmentID: segmentID,
partitionID: partitionID,
state: commonpb.SegmentState_Growing,
startPosRecorded: false,
}
for _, action := range actions {
action(info)
}
c.segmentInfos[segmentID] = info
}
}
// AddSegment adds a segment from segment info.
func (c *metaCacheImpl) AddSegment(segInfo *datapb.SegmentInfo, factory PkStatsFactory, actions ...SegmentAction) {
segment := NewSegmentInfo(segInfo, factory(segInfo))

View File

@ -100,21 +100,6 @@ func (s *MetaCacheSuite) TestMetaInfo() {
s.Equal(s.collSchema, s.cache.Schema())
}
func (s *MetaCacheSuite) TestNewSegment() {
for i, seg := range s.newSegments {
s.cache.NewSegment(seg, s.partitionIDs[i], nil, UpdateNumOfRows(100))
}
for id, partitionID := range s.partitionIDs {
segs := s.cache.GetSegmentIDsBy(WithPartitionID(partitionID))
targets := []int64{s.flushedSegments[id], s.growingSegments[id], s.newSegments[id]}
s.Equal(len(targets), len(segs))
for _, seg := range segs {
s.True(lo.Contains(targets, seg))
}
}
}
func (s *MetaCacheSuite) TestCompactSegments() {
for i, seg := range s.newSegments {
// compaction from flushed[i], unflushed[i] and invalidSeg to new[i]

View File

@ -6,8 +6,6 @@ import (
datapb "github.com/milvus-io/milvus/internal/proto/datapb"
mock "github.com/stretchr/testify/mock"
msgpb "github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
schemapb "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
storage "github.com/milvus-io/milvus/internal/storage"
@ -350,56 +348,6 @@ func (_c *MockMetaCache_GetSegmentsBy_Call) RunAndReturn(run func(...SegmentFilt
return _c
}
// NewSegment provides a mock function with given fields: segmentID, partitionID, startPos, actions
func (_m *MockMetaCache) NewSegment(segmentID int64, partitionID int64, startPos *msgpb.MsgPosition, actions ...SegmentAction) {
_va := make([]interface{}, len(actions))
for _i := range actions {
_va[_i] = actions[_i]
}
var _ca []interface{}
_ca = append(_ca, segmentID, partitionID, startPos)
_ca = append(_ca, _va...)
_m.Called(_ca...)
}
// MockMetaCache_NewSegment_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'NewSegment'
type MockMetaCache_NewSegment_Call struct {
*mock.Call
}
// NewSegment is a helper method to define mock.On call
// - segmentID int64
// - partitionID int64
// - startPos *msgpb.MsgPosition
// - actions ...SegmentAction
func (_e *MockMetaCache_Expecter) NewSegment(segmentID interface{}, partitionID interface{}, startPos interface{}, actions ...interface{}) *MockMetaCache_NewSegment_Call {
return &MockMetaCache_NewSegment_Call{Call: _e.mock.On("NewSegment",
append([]interface{}{segmentID, partitionID, startPos}, actions...)...)}
}
func (_c *MockMetaCache_NewSegment_Call) Run(run func(segmentID int64, partitionID int64, startPos *msgpb.MsgPosition, actions ...SegmentAction)) *MockMetaCache_NewSegment_Call {
_c.Call.Run(func(args mock.Arguments) {
variadicArgs := make([]SegmentAction, len(args)-3)
for i, a := range args[3:] {
if a != nil {
variadicArgs[i] = a.(SegmentAction)
}
}
run(args[0].(int64), args[1].(int64), args[2].(*msgpb.MsgPosition), variadicArgs...)
})
return _c
}
func (_c *MockMetaCache_NewSegment_Call) Return() *MockMetaCache_NewSegment_Call {
_c.Call.Return()
return _c
}
func (_c *MockMetaCache_NewSegment_Call) RunAndReturn(run func(int64, int64, *msgpb.MsgPosition, ...SegmentAction)) *MockMetaCache_NewSegment_Call {
_c.Call.Return(run)
return _c
}
// PredictSegments provides a mock function with given fields: pk, filters
func (_m *MockMetaCache) PredictSegments(pk storage.PrimaryKey, filters ...SegmentFilter) ([]int64, bool) {
_va := make([]interface{}, len(filters))

View File

@ -36,6 +36,7 @@ type SegmentInfo struct {
bfs *BloomFilterSet
compactTo int64
importing bool
level datapb.SegmentLevel
}
func (s *SegmentInfo) SegmentID() int64 {
@ -81,6 +82,10 @@ func (s *SegmentInfo) GetBloomFilterSet() *BloomFilterSet {
return s.bfs
}
func (s *SegmentInfo) Level() datapb.SegmentLevel {
return s.level
}
func (s *SegmentInfo) Clone() *SegmentInfo {
return &SegmentInfo{
segmentID: s.segmentID,
@ -94,10 +99,16 @@ func (s *SegmentInfo) Clone() *SegmentInfo {
syncingRows: s.syncingRows,
bfs: s.bfs,
compactTo: s.compactTo,
level: s.level,
importing: s.importing,
}
}
func NewSegmentInfo(info *datapb.SegmentInfo, bfs *BloomFilterSet) *SegmentInfo {
level := info.GetLevel()
if level == datapb.SegmentLevel_Legacy {
level = datapb.SegmentLevel_L1
}
return &SegmentInfo{
segmentID: info.GetID(),
partitionID: info.GetPartitionID(),
@ -106,6 +117,7 @@ func NewSegmentInfo(info *datapb.SegmentInfo, bfs *BloomFilterSet) *SegmentInfo
startPosition: info.GetStartPosition(),
checkpoint: info.GetDmlPosition(),
startPosRecorded: true,
level: level,
bfs: bfs,
}
}
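A small, hypothetical check of the fallback introduced above, written in the testify-suite style used elsewhere in this change: a proto still reporting `SegmentLevel_Legacy` ends up as L1 in the cache, while an explicit L0 stays L0.

// Legacy level falls back to L1; explicit levels are kept as-is.
s.Equal(datapb.SegmentLevel_L1, NewSegmentInfo(&datapb.SegmentInfo{ID: 1001, Level: datapb.SegmentLevel_Legacy}, NewBloomFilterSet()).Level())
s.Equal(datapb.SegmentLevel_L0, NewSegmentInfo(&datapb.SegmentInfo{ID: 1002, Level: datapb.SegmentLevel_L0}, NewBloomFilterSet()).Level())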

View File

@ -240,9 +240,17 @@ func (s *DataNodeServicesSuite) TestFlushSegments() {
fgservice, ok := s.node.flowgraphManager.getFlowgraphService(dmChannelName)
s.Require().True(ok)
s.node.writeBufferManager.Register(dmChannelName, schema, metacache.NewMockMetaCache(s.T()))
metaCache := metacache.NewMockMetaCache(s.T())
metaCache.EXPECT().Collection().Return(1).Maybe()
metaCache.EXPECT().Schema().Return(schema).Maybe()
s.node.writeBufferManager.Register(dmChannelName, metaCache)
fgservice.metacache.NewSegment(segmentID, 1, &msgpb.MsgPosition{})
fgservice.metacache.AddSegment(&datapb.SegmentInfo{
ID: segmentID,
CollectionID: 1,
PartitionID: 2,
StartPosition: &msgpb.MsgPosition{},
}, func(_ *datapb.SegmentInfo) *metacache.BloomFilterSet { return metacache.NewBloomFilterSet() })
s.Run("service_not_ready", func() {
ctx, cancel := context.WithCancel(context.Background())

View File

@ -69,6 +69,7 @@ func (b *brokerMetaWriter) UpdateSync(pack *SyncTask) error {
log.Info("SaveBinlogPath",
zap.Int64("SegmentID", pack.segmentID),
zap.Int64("CollectionID", pack.collectionID),
zap.Int64("ParitionID", pack.partitionID),
zap.Any("startPos", startPos),
zap.Any("checkPoints", checkPoints),
zap.Int("binlogNum", lo.SumBy(insertFieldBinlogs, getBinlogNum)),
@ -85,6 +86,7 @@ func (b *brokerMetaWriter) UpdateSync(pack *SyncTask) error {
),
SegmentID: pack.segmentID,
CollectionID: pack.collectionID,
PartitionID: pack.partitionID,
Field2BinlogPaths: insertFieldBinlogs,
Field2StatslogPaths: statsFieldBinlogs,
Deltalogs: deltaFieldBinlogs,
@ -95,6 +97,7 @@ func (b *brokerMetaWriter) UpdateSync(pack *SyncTask) error {
Flushed: pack.isFlush,
Dropped: pack.isDrop,
Channel: pack.channelName,
SegLevel: pack.level,
}
err := retry.Do(context.Background(), func() error {
err := b.broker.SaveBinlogPaths(context.Background(), req)
@ -124,8 +127,12 @@ func (b *brokerMetaWriter) UpdateSync(pack *SyncTask) error {
log.Warn("failed to SaveBinlogPaths",
zap.Int64("segmentID", pack.segmentID),
zap.Error(err))
return err
}
return err
pack.metacache.UpdateSegments(metacache.SetStartPosRecorded(true), metacache.WithSegmentIDs(lo.Map(startPos, func(pos *datapb.SegmentStartPosition, _ int) int64 { return pos.GetSegmentID() })...))
return nil
}
func (b *brokerMetaWriter) DropChannel(channelName string) error {

View File

@ -40,6 +40,7 @@ func (s *MetaWriterSuite) TestNormalSave() {
seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, bfs)
metacache.UpdateNumOfRows(1000)(seg)
s.metacache.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{seg})
s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
task := NewSyncTask()
task.WithMetaCache(s.metacache)
err := s.writer.UpdateSync(task)

View File

@ -115,3 +115,8 @@ func (t *SyncTask) WithBatchSize(batchSize int64) *SyncTask {
t.batchSize = batchSize
return t
}
func (t *SyncTask) WithLevel(level datapb.SegmentLevel) *SyncTask {
t.level = level
return t
}
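With the new option, the write buffer can tag a task with the segment's level when it builds one. A hypothetical construction (channel name, IDs, and batch size are made up):

// Build a sync task for an L0 segment; the level is later forwarded to
// SaveBinlogPathsRequest.SegLevel by the broker meta writer.
task := NewSyncTask().
	WithChannelName("by-dev-rootcoord-dml_0v0").
	WithSegmentID(1002).
	WithStartPosition(&msgpb.MsgPosition{Timestamp: 100}).
	WithLevel(datapb.SegmentLevel_L0).
	WithBatchSize(128)
_ = task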

View File

@ -210,6 +210,7 @@ func (s *SyncManagerSuite) TestCompacted() {
func (s *SyncManagerSuite) TestBlock() {
sig := make(chan struct{})
counter := atomic.NewInt32(0)
s.broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Return(nil)
bfs := metacache.NewBloomFilterSet()
seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, bfs)
@ -219,7 +220,9 @@ func (s *SyncManagerSuite) TestBlock() {
return []*metacache.SegmentInfo{seg}
})
s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Run(func(_ metacache.SegmentAction, filters ...metacache.SegmentFilter) {
close(sig)
if counter.Inc() == 2 {
close(sig)
}
})
manager, err := NewSyncManager(10, s.chunkManager, s.allocator)

View File

@ -41,6 +41,7 @@ type SyncTask struct {
// batchSize is the row number of this sync task,
// not the total num of rows of segemnt
batchSize int64
level datapb.SegmentLevel
tsFrom typeutil.Timestamp
tsTo typeutil.Timestamp

View File

@ -3,7 +3,6 @@ package writebuffer
import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/datanode/metacache"
"github.com/milvus-io/milvus/internal/datanode/syncmgr"
"github.com/milvus-io/milvus/internal/storage"
@ -18,9 +17,9 @@ type bfWriteBuffer struct {
metacache metacache.MetaCache
}
func NewBFWriteBuffer(channel string, sch *schemapb.CollectionSchema, metacache metacache.MetaCache, syncMgr syncmgr.SyncManager, option *writeBufferOption) (WriteBuffer, error) {
func NewBFWriteBuffer(channel string, metacache metacache.MetaCache, syncMgr syncmgr.SyncManager, option *writeBufferOption) (WriteBuffer, error) {
return &bfWriteBuffer{
writeBufferBase: newWriteBufferBase(channel, sch, metacache, syncMgr, option),
writeBufferBase: newWriteBufferBase(channel, metacache, syncMgr, option),
syncMgr: syncMgr,
}, nil
}
@ -71,5 +70,6 @@ func (wb *bfWriteBuffer) BufferData(insertMsgs []*msgstream.InsertMsg, deleteMsg
// update buffer last checkpoint
wb.checkpoint = endPos
return wb.triggerSync()
_ = wb.triggerSync()
return nil
}

View File

@ -25,6 +25,7 @@ import (
type BFWriteBufferSuite struct {
suite.Suite
collID int64
channelName string
collSchema *schemapb.CollectionSchema
syncMgr *syncmgr.MockSyncManager
@ -34,6 +35,7 @@ type BFWriteBufferSuite struct {
func (s *BFWriteBufferSuite) SetupSuite() {
paramtable.Get().Init(paramtable.NewBaseTable())
s.collID = 100
s.collSchema = &schemapb.CollectionSchema{
Name: "test_collection",
Fields: []*schemapb.FieldSchema{
@ -136,17 +138,19 @@ func (s *BFWriteBufferSuite) composeDeleteMsg(pks []storage.PrimaryKey) *msgstre
func (s *BFWriteBufferSuite) SetupTest() {
s.syncMgr = syncmgr.NewMockSyncManager(s.T())
s.metacache = metacache.NewMockMetaCache(s.T())
s.metacache.EXPECT().Schema().Return(s.collSchema).Maybe()
s.metacache.EXPECT().Collection().Return(s.collID).Maybe()
s.broker = broker.NewMockBroker(s.T())
}
func (s *BFWriteBufferSuite) TestBufferData() {
wb, err := NewBFWriteBuffer(s.channelName, s.collSchema, s.metacache, s.syncMgr, &writeBufferOption{})
wb, err := NewBFWriteBuffer(s.channelName, s.metacache, s.syncMgr, &writeBufferOption{})
s.NoError(err)
seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1000}, metacache.NewBloomFilterSet())
s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
s.metacache.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false)
s.metacache.EXPECT().AddSegment(mock.Anything, mock.Anything).Return()
s.metacache.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return()
s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
pks, msg := s.composeInsertMsg(1000, 10, 128)
@ -159,30 +163,32 @@ func (s *BFWriteBufferSuite) TestBufferData() {
func (s *BFWriteBufferSuite) TestAutoSync() {
paramtable.Get().Save(paramtable.Get().DataNodeCfg.FlushInsertBufferSize.Key, "1")
wb, err := NewBFWriteBuffer(s.channelName, s.collSchema, s.metacache, s.syncMgr, &writeBufferOption{
syncPolicies: []SyncPolicy{
SyncFullBuffer,
GetSyncStaleBufferPolicy(paramtable.Get().DataNodeCfg.SyncPeriod.GetAsDuration(time.Second)),
GetFlushingSegmentsPolicy(s.metacache),
},
s.Run("normal_auto_sync", func() {
wb, err := NewBFWriteBuffer(s.channelName, s.metacache, s.syncMgr, &writeBufferOption{
syncPolicies: []SyncPolicy{
SyncFullBuffer,
GetSyncStaleBufferPolicy(paramtable.Get().DataNodeCfg.SyncPeriod.GetAsDuration(time.Second)),
GetFlushingSegmentsPolicy(s.metacache),
},
})
s.NoError(err)
seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1000}, metacache.NewBloomFilterSet())
s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
s.metacache.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false)
s.metacache.EXPECT().GetSegmentByID(int64(1002)).Return(seg, true)
s.metacache.EXPECT().GetSegmentIDsBy(mock.Anything).Return([]int64{1002})
s.metacache.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return()
s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything, mock.Anything).Return()
s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).Return(nil)
pks, msg := s.composeInsertMsg(1000, 10, 128)
delMsg := s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) }))
err = wb.BufferData([]*msgstream.InsertMsg{msg}, []*msgstream.DeleteMsg{delMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})
s.NoError(err)
})
s.NoError(err)
seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1000}, metacache.NewBloomFilterSet())
s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
s.metacache.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false)
s.metacache.EXPECT().GetSegmentByID(int64(1002)).Return(seg, true)
s.metacache.EXPECT().GetSegmentIDsBy(mock.Anything).Return([]int64{1002})
s.metacache.EXPECT().AddSegment(mock.Anything, mock.Anything).Return()
s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything, mock.Anything).Return()
s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).Return(nil)
pks, msg := s.composeInsertMsg(1000, 10, 128)
delMsg := s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) }))
err = wb.BufferData([]*msgstream.InsertMsg{msg}, []*msgstream.DeleteMsg{delMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})
s.NoError(err)
}
func TestBFWriteBuffer(t *testing.T) {

View File

@ -5,11 +5,12 @@ import (
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/datanode/metacache"
"github.com/milvus-io/milvus/internal/datanode/syncmgr"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
@ -20,19 +21,21 @@ import (
type l0WriteBuffer struct {
*writeBufferBase
l0Segments map[int64]int64 // partitionID => l0 segment ID
l0Segments map[int64]int64 // partitionID => l0 segment ID
l0partition map[int64]int64 // l0 segment id => partition id
syncMgr syncmgr.SyncManager
idAllocator allocator.Interface
}
func NewL0WriteBuffer(channel string, sch *schemapb.CollectionSchema, metacache metacache.MetaCache, syncMgr syncmgr.SyncManager, option *writeBufferOption) (WriteBuffer, error) {
func NewL0WriteBuffer(channel string, metacache metacache.MetaCache, syncMgr syncmgr.SyncManager, option *writeBufferOption) (WriteBuffer, error) {
if option.idAllocator == nil {
return nil, merr.WrapErrServiceInternal("id allocator is nil when creating l0 write buffer")
}
return &l0WriteBuffer{
l0Segments: make(map[int64]int64),
writeBufferBase: newWriteBufferBase(channel, sch, metacache, syncMgr, option),
l0partition: make(map[int64]int64),
writeBufferBase: newWriteBufferBase(channel, metacache, syncMgr, option),
syncMgr: syncMgr,
idAllocator: option.idAllocator,
}, nil
@ -50,7 +53,7 @@ func (wb *l0WriteBuffer) BufferData(insertMsgs []*msgstream.InsertMsg, deleteMsg
}
for _, msg := range deleteMsgs {
l0SegmentID := wb.getL0SegmentID(msg.GetPartitionID())
l0SegmentID := wb.getL0SegmentID(msg.GetPartitionID(), startPos)
pks := storage.ParseIDs2PrimaryKeys(msg.GetPrimaryKeys())
err := wb.bufferDelete(l0SegmentID, pks, msg.GetTimestamps(), startPos, endPos)
if err != nil {
@ -62,10 +65,18 @@ func (wb *l0WriteBuffer) BufferData(insertMsgs []*msgstream.InsertMsg, deleteMsg
// update buffer last checkpoint
wb.checkpoint = endPos
return wb.triggerSync()
segmentsSync := wb.triggerSync()
for _, segment := range segmentsSync {
partition, ok := wb.l0partition[segment]
if ok {
delete(wb.l0partition, segment)
delete(wb.l0Segments, partition)
}
}
return nil
}
func (wb *l0WriteBuffer) getL0SegmentID(partitionID int64) int64 {
func (wb *l0WriteBuffer) getL0SegmentID(partitionID int64, startPos *msgpb.MsgPosition) int64 {
segmentID, ok := wb.l0Segments[partitionID]
if !ok {
err := retry.Do(context.Background(), func() error {
@ -78,6 +89,16 @@ func (wb *l0WriteBuffer) getL0SegmentID(partitionID int64) int64 {
panic(err)
}
wb.l0Segments[partitionID] = segmentID
wb.l0partition[segmentID] = partitionID
wb.metaCache.AddSegment(&datapb.SegmentInfo{
ID: segmentID,
PartitionID: partitionID,
CollectionID: wb.collectionID,
InsertChannel: wb.channelName,
StartPosition: startPos,
State: commonpb.SegmentState_Growing,
Level: datapb.SegmentLevel_L0,
}, func(_ *datapb.SegmentInfo) *metacache.BloomFilterSet { return metacache.NewBloomFilterSet() }, metacache.SetStartPosRecorded(false))
}
return segmentID
}

View File

@ -25,6 +25,7 @@ import (
type L0WriteBufferSuite struct {
suite.Suite
channelName string
collID int64
collSchema *schemapb.CollectionSchema
syncMgr *syncmgr.MockSyncManager
metacache *metacache.MockMetaCache
@ -33,6 +34,7 @@ type L0WriteBufferSuite struct {
func (s *L0WriteBufferSuite) SetupSuite() {
paramtable.Get().Init(paramtable.NewBaseTable())
s.collID = 100
s.collSchema = &schemapb.CollectionSchema{
Name: "test_collection",
Fields: []*schemapb.FieldSchema{
@ -136,12 +138,14 @@ func (s *L0WriteBufferSuite) composeDeleteMsg(pks []storage.PrimaryKey) *msgstre
func (s *L0WriteBufferSuite) SetupTest() {
s.syncMgr = syncmgr.NewMockSyncManager(s.T())
s.metacache = metacache.NewMockMetaCache(s.T())
s.metacache.EXPECT().Schema().Return(s.collSchema).Maybe()
s.metacache.EXPECT().Collection().Return(s.collID).Maybe()
s.allocator = allocator.NewMockGIDAllocator()
s.allocator.AllocOneF = func() (int64, error) { return int64(tsoutil.ComposeTSByTime(time.Now(), 0)), nil }
}
func (s *L0WriteBufferSuite) TestBufferData() {
wb, err := NewL0WriteBuffer(s.channelName, s.collSchema, s.metacache, s.syncMgr, &writeBufferOption{
wb, err := NewL0WriteBuffer(s.channelName, s.metacache, s.syncMgr, &writeBufferOption{
idAllocator: s.allocator,
})
s.NoError(err)
@ -150,7 +154,7 @@ func (s *L0WriteBufferSuite) TestBufferData() {
delMsg := s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) }))
s.metacache.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false)
s.metacache.EXPECT().AddSegment(mock.Anything, mock.Anything).Return()
s.metacache.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return()
s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
err = wb.BufferData([]*msgstream.InsertMsg{msg}, []*msgstream.DeleteMsg{delMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})

View File

@ -7,7 +7,6 @@ import (
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/datanode/metacache"
"github.com/milvus-io/milvus/internal/datanode/syncmgr"
"github.com/milvus-io/milvus/pkg/log"
@ -18,7 +17,7 @@ import (
// BufferManager is the interface for WriteBuffer management.
type BufferManager interface {
// Register adds a WriteBuffer with provided schema & options.
Register(channel string, schema *schemapb.CollectionSchema, metacache metacache.MetaCache, opts ...WriteBufferOption) error
Register(channel string, metacache metacache.MetaCache, opts ...WriteBufferOption) error
// FlushSegments notifies writeBuffer corresponding to provided channel to flush segments.
FlushSegments(ctx context.Context, channel string, segmentIDs []int64) error
// FlushChannel
@ -50,7 +49,7 @@ type bufferManager struct {
}
// Register a new WriteBuffer for channel.
func (m *bufferManager) Register(channel string, schema *schemapb.CollectionSchema, metacache metacache.MetaCache, opts ...WriteBufferOption) error {
func (m *bufferManager) Register(channel string, metacache metacache.MetaCache, opts ...WriteBufferOption) error {
m.mut.Lock()
defer m.mut.Unlock()
@ -58,7 +57,7 @@ func (m *bufferManager) Register(channel string, schema *schemapb.CollectionSche
if ok {
return merr.WrapErrChannelReduplicate(channel)
}
buf, err := NewWriteBuffer(channel, schema, metacache, m.syncMgr, opts...)
buf, err := NewWriteBuffer(channel, metacache, m.syncMgr, opts...)
if err != nil {
return err
}

View File

@ -11,6 +11,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/datanode/allocator"
"github.com/milvus-io/milvus/internal/datanode/metacache"
"github.com/milvus-io/milvus/internal/datanode/syncmgr"
"github.com/milvus-io/milvus/pkg/common"
@ -21,16 +22,19 @@ import (
type ManagerSuite struct {
suite.Suite
collID int64
channelName string
collSchema *schemapb.CollectionSchema
syncMgr *syncmgr.MockSyncManager
metacache *metacache.MockMetaCache
allocator *allocator.MockAllocator
manager *bufferManager
}
func (s *ManagerSuite) SetupSuite() {
paramtable.Get().Init(paramtable.NewBaseTable())
s.collID = 100
s.collSchema = &schemapb.CollectionSchema{
Name: "test_collection",
Fields: []*schemapb.FieldSchema{
@ -52,6 +56,9 @@ func (s *ManagerSuite) SetupSuite() {
func (s *ManagerSuite) SetupTest() {
s.syncMgr = syncmgr.NewMockSyncManager(s.T())
s.metacache = metacache.NewMockMetaCache(s.T())
s.metacache.EXPECT().Collection().Return(s.collID).Maybe()
s.metacache.EXPECT().Schema().Return(s.collSchema).Maybe()
s.allocator = allocator.NewMockAllocator(s.T())
mgr := NewManager(s.syncMgr)
var ok bool
@ -62,10 +69,10 @@ func (s *ManagerSuite) SetupTest() {
func (s *ManagerSuite) TestRegister() {
manager := s.manager
err := manager.Register(s.channelName, s.collSchema, s.metacache)
err := manager.Register(s.channelName, s.metacache, WithIDAllocator(s.allocator))
s.NoError(err)
err = manager.Register(s.channelName, s.collSchema, s.metacache)
err = manager.Register(s.channelName, s.metacache, WithIDAllocator(s.allocator))
s.Error(err)
s.ErrorIs(err, merr.ErrChannelReduplicate)
}
@ -169,7 +176,7 @@ func (s *ManagerSuite) TestRemoveChannel() {
})
s.Run("remove_channel", func() {
err := manager.Register(s.channelName, s.collSchema, s.metacache)
err := manager.Register(s.channelName, s.metacache, WithIDAllocator(s.allocator))
s.Require().NoError(err)
s.NotPanics(func() {

View File

@ -11,8 +11,6 @@ import (
msgpb "github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
msgstream "github.com/milvus-io/milvus/pkg/mq/msgstream"
schemapb "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
)
// MockBufferManager is an autogenerated mock type for the BufferManager type
@ -290,20 +288,20 @@ func (_c *MockBufferManager_NotifyCheckpointUpdated_Call) RunAndReturn(run func(
return _c
}
// Register provides a mock function with given fields: channel, schema, _a2, opts
func (_m *MockBufferManager) Register(channel string, schema *schemapb.CollectionSchema, _a2 metacache.MetaCache, opts ...WriteBufferOption) error {
// Register provides a mock function with given fields: channel, _a1, opts
func (_m *MockBufferManager) Register(channel string, _a1 metacache.MetaCache, opts ...WriteBufferOption) error {
_va := make([]interface{}, len(opts))
for _i := range opts {
_va[_i] = opts[_i]
}
var _ca []interface{}
_ca = append(_ca, channel, schema, _a2)
_ca = append(_ca, channel, _a1)
_ca = append(_ca, _va...)
ret := _m.Called(_ca...)
var r0 error
if rf, ok := ret.Get(0).(func(string, *schemapb.CollectionSchema, metacache.MetaCache, ...WriteBufferOption) error); ok {
r0 = rf(channel, schema, _a2, opts...)
if rf, ok := ret.Get(0).(func(string, metacache.MetaCache, ...WriteBufferOption) error); ok {
r0 = rf(channel, _a1, opts...)
} else {
r0 = ret.Error(0)
}
@ -318,23 +316,22 @@ type MockBufferManager_Register_Call struct {
// Register is a helper method to define mock.On call
// - channel string
// - schema *schemapb.CollectionSchema
// - _a2 metacache.MetaCache
// - _a1 metacache.MetaCache
// - opts ...WriteBufferOption
func (_e *MockBufferManager_Expecter) Register(channel interface{}, schema interface{}, _a2 interface{}, opts ...interface{}) *MockBufferManager_Register_Call {
func (_e *MockBufferManager_Expecter) Register(channel interface{}, _a1 interface{}, opts ...interface{}) *MockBufferManager_Register_Call {
return &MockBufferManager_Register_Call{Call: _e.mock.On("Register",
append([]interface{}{channel, schema, _a2}, opts...)...)}
append([]interface{}{channel, _a1}, opts...)...)}
}
func (_c *MockBufferManager_Register_Call) Run(run func(channel string, schema *schemapb.CollectionSchema, _a2 metacache.MetaCache, opts ...WriteBufferOption)) *MockBufferManager_Register_Call {
func (_c *MockBufferManager_Register_Call) Run(run func(channel string, _a1 metacache.MetaCache, opts ...WriteBufferOption)) *MockBufferManager_Register_Call {
_c.Call.Run(func(args mock.Arguments) {
variadicArgs := make([]WriteBufferOption, len(args)-3)
for i, a := range args[3:] {
variadicArgs := make([]WriteBufferOption, len(args)-2)
for i, a := range args[2:] {
if a != nil {
variadicArgs[i] = a.(WriteBufferOption)
}
}
run(args[0].(string), args[1].(*schemapb.CollectionSchema), args[2].(metacache.MetaCache), variadicArgs...)
run(args[0].(string), args[1].(metacache.MetaCache), variadicArgs...)
})
return _c
}
@ -344,7 +341,7 @@ func (_c *MockBufferManager_Register_Call) Return(_a0 error) *MockBufferManager_
return _c
}
func (_c *MockBufferManager_Register_Call) RunAndReturn(run func(string, *schemapb.CollectionSchema, metacache.MetaCache, ...WriteBufferOption) error) *MockBufferManager_Register_Call {
func (_c *MockBufferManager_Register_Call) RunAndReturn(run func(string, metacache.MetaCache, ...WriteBufferOption) error) *MockBufferManager_Register_Call {
_c.Call.Return(run)
return _c
}

View File

@ -8,6 +8,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/datanode/metacache"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -48,7 +49,9 @@ func GetFlushTsPolicy(flushTimestamp *atomic.Uint64, meta metacache.MetaCache) S
if !ok {
return buf.segmentID, false
}
return buf.segmentID, seg.State() == commonpb.SegmentState_Flushed && buf.MinTimestamp() < flushTs
inRange := seg.State() == commonpb.SegmentState_Flushed ||
seg.Level() == datapb.SegmentLevel_L0
return buf.segmentID, inRange && buf.MinTimestamp() < flushTs
})
// set segment flushing
meta.UpdateSegments(metacache.UpdateState(commonpb.SegmentState_Flushing),

View File

@ -48,7 +48,7 @@ type WriteBuffer interface {
Close(drop bool)
}
func NewWriteBuffer(channel string, schema *schemapb.CollectionSchema, metacache metacache.MetaCache, syncMgr syncmgr.SyncManager, opts ...WriteBufferOption) (WriteBuffer, error) {
func NewWriteBuffer(channel string, metacache metacache.MetaCache, syncMgr syncmgr.SyncManager, opts ...WriteBufferOption) (WriteBuffer, error) {
option := defaultWBOption()
option.syncPolicies = append(option.syncPolicies, GetFlushingSegmentsPolicy(metacache))
for _, opt := range opts {
@ -57,9 +57,9 @@ func NewWriteBuffer(channel string, schema *schemapb.CollectionSchema, metacache
switch option.deletePolicy {
case DeletePolicyBFPkOracle:
return NewBFWriteBuffer(channel, schema, metacache, syncMgr, option)
return NewBFWriteBuffer(channel, metacache, syncMgr, option)
case DeletePolicyL0Delta:
return NewL0WriteBuffer(channel, schema, metacache, syncMgr, option)
return NewL0WriteBuffer(channel, metacache, syncMgr, option)
default:
return nil, merr.WrapErrParameterInvalid("valid delete policy config", option.deletePolicy)
}
@ -84,14 +84,15 @@ type writeBufferBase struct {
flushTimestamp *atomic.Uint64
}
func newWriteBufferBase(channel string, sch *schemapb.CollectionSchema, metacache metacache.MetaCache, syncMgr syncmgr.SyncManager, option *writeBufferOption) *writeBufferBase {
func newWriteBufferBase(channel string, metacache metacache.MetaCache, syncMgr syncmgr.SyncManager, option *writeBufferOption) *writeBufferBase {
flushTs := atomic.NewUint64(nonFlushTS)
flushTsPolicy := GetFlushTsPolicy(flushTs, metacache)
option.syncPolicies = append(option.syncPolicies, flushTsPolicy)
return &writeBufferBase{
channelName: channel,
collSchema: sch,
collectionID: metacache.Collection(),
collSchema: metacache.Schema(),
syncMgr: syncMgr,
metaWriter: option.metaWriter,
buffers: make(map[int64]*segmentBuffer),
@ -143,18 +144,14 @@ func (wb *writeBufferBase) GetCheckpoint() *msgpb.MsgPosition {
return checkpoint
}
func (wb *writeBufferBase) triggerSync() error {
func (wb *writeBufferBase) triggerSync() (segmentIDs []int64) {
segmentsToSync := wb.getSegmentsToSync(wb.checkpoint.GetTimestamp())
if len(segmentsToSync) > 0 {
log.Info("write buffer get segments to sync", zap.Int64s("segmentIDs", segmentsToSync))
err := wb.syncSegments(context.Background(), segmentsToSync)
if err != nil {
log.Warn("segment segments failed", zap.Int64s("segmentIDs", segmentsToSync), zap.Error(err))
return err
}
wb.syncSegments(context.Background(), segmentsToSync)
}
return nil
return segmentsToSync
}
func (wb *writeBufferBase) flushSegments(ctx context.Context, segmentIDs []int64) error {
@ -169,7 +166,7 @@ func (wb *writeBufferBase) flushSegments(ctx context.Context, segmentIDs []int64
return nil
}
func (wb *writeBufferBase) syncSegments(ctx context.Context, segmentIDs []int64) error {
func (wb *writeBufferBase) syncSegments(ctx context.Context, segmentIDs []int64) {
for _, segmentID := range segmentIDs {
syncTask := wb.getSyncTask(ctx, segmentID)
if syncTask == nil {
@ -181,7 +178,6 @@ func (wb *writeBufferBase) syncSegments(ctx context.Context, segmentIDs []int64)
// discard Future here, handle error in callback
_ = wb.syncMgr.SyncData(ctx, syncTask)
}
return nil
}
// getSegmentsToSync applies all policies to get segments list to sync.
@ -242,7 +238,7 @@ func (wb *writeBufferBase) bufferInsert(insertMsgs []*msgstream.InsertMsg, start
InsertChannel: wb.channelName,
StartPosition: startPos,
State: commonpb.SegmentState_Growing,
}, func(_ *datapb.SegmentInfo) *metacache.BloomFilterSet { return metacache.NewBloomFilterSet() })
}, func(_ *datapb.SegmentInfo) *metacache.BloomFilterSet { return metacache.NewBloomFilterSet() }, metacache.SetStartPosRecorded(false))
}
segBuf := wb.getOrCreateBuffer(segmentID)
@ -292,6 +288,7 @@ func (wb *writeBufferBase) getSyncTask(ctx context.Context, segmentID int64) *sy
WithChannelName(wb.channelName).
WithSegmentID(segmentID).
WithStartPosition(startPos).
WithLevel(segmentInfo.Level()).
WithCheckpoint(wb.checkpoint).
WithSchema(wb.collSchema).
WithBatchSize(batchSize).

View File

@ -19,6 +19,7 @@ import (
type WriteBufferSuite struct {
suite.Suite
collID int64
channelName string
collSchema *schemapb.CollectionSchema
wb *writeBufferBase
@ -28,6 +29,7 @@ type WriteBufferSuite struct {
func (s *WriteBufferSuite) SetupSuite() {
paramtable.Get().Init(paramtable.NewBaseTable())
s.collID = 100
s.collSchema = &schemapb.CollectionSchema{
Name: "wb_base_collection",
Fields: []*schemapb.FieldSchema{
@ -43,7 +45,9 @@ func (s *WriteBufferSuite) SetupSuite() {
func (s *WriteBufferSuite) SetupTest() {
s.syncMgr = syncmgr.NewMockSyncManager(s.T())
s.metacache = metacache.NewMockMetaCache(s.T())
s.wb = newWriteBufferBase(s.channelName, s.collSchema, s.metacache, s.syncMgr, &writeBufferOption{
s.metacache.EXPECT().Schema().Return(s.collSchema).Maybe()
s.metacache.EXPECT().Collection().Return(s.collID).Maybe()
s.wb = newWriteBufferBase(s.channelName, s.metacache, s.syncMgr, &writeBufferOption{
pkStatsFactory: func(vchannel *datapb.SegmentInfo) *metacache.BloomFilterSet {
return metacache.NewBloomFilterSet()
},
@ -51,18 +55,18 @@ func (s *WriteBufferSuite) SetupTest() {
}
func (s *WriteBufferSuite) TestWriteBufferType() {
wb, err := NewWriteBuffer(s.channelName, s.collSchema, s.metacache, s.syncMgr, WithDeletePolicy(DeletePolicyBFPkOracle))
wb, err := NewWriteBuffer(s.channelName, s.metacache, s.syncMgr, WithDeletePolicy(DeletePolicyBFPkOracle))
s.NoError(err)
_, ok := wb.(*bfWriteBuffer)
s.True(ok)
wb, err = NewWriteBuffer(s.channelName, s.collSchema, s.metacache, s.syncMgr, WithDeletePolicy(DeletePolicyL0Delta), WithIDAllocator(allocator.NewMockGIDAllocator()))
wb, err = NewWriteBuffer(s.channelName, s.metacache, s.syncMgr, WithDeletePolicy(DeletePolicyL0Delta), WithIDAllocator(allocator.NewMockGIDAllocator()))
s.NoError(err)
_, ok = wb.(*l0WriteBuffer)
s.True(ok)
_, err = NewWriteBuffer(s.channelName, s.collSchema, s.metacache, s.syncMgr, WithDeletePolicy(""))
_, err = NewWriteBuffer(s.channelName, s.metacache, s.syncMgr, WithDeletePolicy(""))
s.Error(err)
}
@ -81,7 +85,7 @@ func (s *WriteBufferSuite) TestFlushSegments() {
s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything, mock.Anything)
wb, err := NewWriteBuffer(s.channelName, s.collSchema, s.metacache, s.syncMgr, WithDeletePolicy(DeletePolicyBFPkOracle))
wb, err := NewWriteBuffer(s.channelName, s.metacache, s.syncMgr, WithDeletePolicy(DeletePolicyBFPkOracle))
s.NoError(err)
err = wb.FlushSegments(context.Background(), []int64{segmentID})