From 1e51255c1511b9a792f8984a21bf5460ce977ed5 Mon Sep 17 00:00:00 2001 From: congqixia Date: Fri, 3 Nov 2023 04:48:15 +0800 Subject: [PATCH] Implement `Injection` for SyncManager with block and meta transition (#28093) Signed-off-by: Congqi Xia --- internal/datanode/metacache/actions.go | 6 ++ internal/datanode/metacache/actions_test.go | 5 ++ internal/datanode/metacache/segment.go | 6 ++ internal/datanode/syncmgr/sync_manager.go | 10 +++ .../datanode/syncmgr/sync_manager_test.go | 70 +++++++++++++++++++ internal/datanode/syncmgr/task.go | 15 ++++ internal/datanode/syncmgr/task_test.go | 17 +++++ 7 files changed, 129 insertions(+) diff --git a/internal/datanode/metacache/actions.go b/internal/datanode/metacache/actions.go index 9522669a35..d776a5831d 100644 --- a/internal/datanode/metacache/actions.go +++ b/internal/datanode/metacache/actions.go @@ -73,6 +73,12 @@ func RollStats() SegmentAction { } } +func CompactTo(compactTo int64) SegmentAction { + return func(info *SegmentInfo) { + info.compactTo = compactTo + } +} + // MergeSegmentAction is the util function to merge multiple SegmentActions into one. func MergeSegmentAction(actions ...SegmentAction) SegmentAction { return func(info *SegmentInfo) { diff --git a/internal/datanode/metacache/actions_test.go b/internal/datanode/metacache/actions_test.go index bca462fdf0..c234384dc2 100644 --- a/internal/datanode/metacache/actions_test.go +++ b/internal/datanode/metacache/actions_test.go @@ -89,6 +89,11 @@ func (s *SegmentActionSuite) TestActions() { action = UpdateNumOfRows(numOfRows) action(info) s.Equal(numOfRows, info.NumOfRows()) + + compactTo := int64(1002) + action = CompactTo(compactTo) + action(info) + s.Equal(compactTo, info.CompactTo()) } func (s *SegmentActionSuite) TestMergeActions() { diff --git a/internal/datanode/metacache/segment.go b/internal/datanode/metacache/segment.go index afb542a926..803a1e2729 100644 --- a/internal/datanode/metacache/segment.go +++ b/internal/datanode/metacache/segment.go @@ -32,6 +32,7 @@ type SegmentInfo struct { startPosRecorded bool numOfRows int64 bfs *BloomFilterSet + compactTo int64 } func (s *SegmentInfo) SegmentID() int64 { @@ -62,6 +63,10 @@ func (s *SegmentInfo) GetHistory() []*storage.PkStatistics { return s.bfs.GetHistory() } +func (s *SegmentInfo) CompactTo() int64 { + return s.compactTo +} + func (s *SegmentInfo) Clone() *SegmentInfo { return &SegmentInfo{ segmentID: s.segmentID, @@ -72,6 +77,7 @@ func (s *SegmentInfo) Clone() *SegmentInfo { startPosRecorded: s.startPosRecorded, numOfRows: s.numOfRows, bfs: s.bfs, + compactTo: s.compactTo, } } diff --git a/internal/datanode/syncmgr/sync_manager.go b/internal/datanode/syncmgr/sync_manager.go index 51b5cc288c..f1d22c0ebb 100644 --- a/internal/datanode/syncmgr/sync_manager.go +++ b/internal/datanode/syncmgr/sync_manager.go @@ -34,6 +34,8 @@ type SyncMeta struct { type SyncManager interface { SyncData(ctx context.Context, task *SyncTask) error + Block(segmentID int64) + Unblock(segmentID int64) } type syncManager struct { @@ -62,3 +64,11 @@ func (mgr syncManager) SyncData(ctx context.Context, task *SyncTask) error { return nil } + +func (mgr syncManager) Block(segmentID int64) { + mgr.keyLock.Lock(segmentID) +} + +func (mgr syncManager) Unblock(segmentID int64) { + mgr.keyLock.Unlock(segmentID) +} diff --git a/internal/datanode/syncmgr/sync_manager_test.go b/internal/datanode/syncmgr/sync_manager_test.go index 9086afde1d..8865a0bd01 100644 --- a/internal/datanode/syncmgr/sync_manager_test.go +++ b/internal/datanode/syncmgr/sync_manager_test.go @@ -3,6 +3,7 @@ package syncmgr import ( "context" "math/rand" + "sync/atomic" "testing" "time" @@ -172,6 +173,75 @@ func (s *SyncManagerSuite) TestSubmit() { <-sig } +func (s *SyncManagerSuite) TestCompacted() { + sig := make(chan struct{}) + var segmentID atomic.Int64 + s.broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Run(func(_ context.Context, req *datapb.SaveBinlogPathsRequest) { + close(sig) + segmentID.Store(req.GetSegmentID()) + }).Return(nil) + bfs := metacache.NewBloomFilterSet() + seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, bfs) + metacache.UpdateNumOfRows(1000)(seg) + metacache.CompactTo(1001)(seg) + s.metacache.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{seg}) + + manager, err := NewSyncManager(10, s.chunkManager, s.allocator) + s.NoError(err) + task := s.getSuiteSyncTask() + task.WithMetaWriter(BrokerMetaWriter(s.broker)) + task.WithTimeRange(50, 100) + task.WithCheckpoint(&msgpb.MsgPosition{ + ChannelName: s.channelName, + MsgID: []byte{1, 2, 3, 4}, + Timestamp: 100, + }) + + err = manager.SyncData(context.Background(), task) + s.NoError(err) + + <-sig + s.EqualValues(1001, segmentID.Load()) +} + +func (s *SyncManagerSuite) TestBlock() { + sig := make(chan struct{}) + s.broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Run(func(_ context.Context, _ *datapb.SaveBinlogPathsRequest) { + close(sig) + }).Return(nil) + bfs := metacache.NewBloomFilterSet() + seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, bfs) + metacache.UpdateNumOfRows(1000)(seg) + s.metacache.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{seg}) + + manager, err := NewSyncManager(10, s.chunkManager, s.allocator) + s.NoError(err) + + // block + manager.Block(s.segmentID) + + go func() { + task := s.getSuiteSyncTask() + task.WithMetaWriter(BrokerMetaWriter(s.broker)) + task.WithTimeRange(50, 100) + task.WithCheckpoint(&msgpb.MsgPosition{ + ChannelName: s.channelName, + MsgID: []byte{1, 2, 3, 4}, + Timestamp: 100, + }) + manager.SyncData(context.Background(), task) + }() + + select { + case <-sig: + s.FailNow("sync task done during block") + default: + } + + manager.Unblock(s.segmentID) + <-sig +} + func TestSyncManager(t *testing.T) { suite.Run(t, new(SyncManagerSuite)) } diff --git a/internal/datanode/syncmgr/task.go b/internal/datanode/syncmgr/task.go index aff357782c..a000301890 100644 --- a/internal/datanode/syncmgr/task.go +++ b/internal/datanode/syncmgr/task.go @@ -75,6 +75,21 @@ func (t *SyncTask) Run() error { log := t.getLogger() var err error + infos := t.metacache.GetSegmentsBy(metacache.WithSegmentID(t.segmentID)) + if len(infos) == 0 { + log.Warn("failed to sync data, segment not found in metacache") + t.handleError(err) + return merr.WrapErrSegmentNotFound(t.segmentID) + } + + segment := infos[0] + if segment.CompactTo() > 0 { + log.Info("syncing segment compacted, update segment id", zap.Int64("compactTo", segment.CompactTo())) + // update sync task segment id + // it's ok to use compactTo segmentID here, since there shall be no insert for compacted segment + t.segmentID = segment.CompactTo() + } + err = t.serializeInsertData() if err != nil { log.Warn("failed to serialize insert data", zap.Error(err)) diff --git a/internal/datanode/syncmgr/task_test.go b/internal/datanode/syncmgr/task_test.go index 6b4d3700fc..fb00fc5211 100644 --- a/internal/datanode/syncmgr/task_test.go +++ b/internal/datanode/syncmgr/task_test.go @@ -214,6 +214,23 @@ func (s *SyncTaskSuite) TestRunNormal() { } func (s *SyncTaskSuite) TestRunError() { + s.Run("segment_not_found", func() { + s.metacache.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{}) + flag := false + handler := func(_ error) { flag = true } + task := s.getSuiteSyncTask().WithFailureCallback(handler) + task.WithInsertData(s.getEmptyInsertBuffer()) + + err := task.Run() + + s.Error(err) + s.True(flag) + }) + + s.metacache.ExpectedCalls = nil + seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, metacache.NewBloomFilterSet()) + metacache.UpdateNumOfRows(1000)(seg) + s.metacache.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{seg}) s.Run("serialize_insert_fail", func() { flag := false handler := func(_ error) { flag = true }