Implement Injection for SyncManager with block and meta transition (#28093)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent fc2df9514f
commit 1e51255c15
@@ -73,6 +73,12 @@ func RollStats() SegmentAction {
	}
}

func CompactTo(compactTo int64) SegmentAction {
	return func(info *SegmentInfo) {
		info.compactTo = compactTo
	}
}

// MergeSegmentAction is the util function to merge multiple SegmentActions into one.
func MergeSegmentAction(actions ...SegmentAction) SegmentAction {
	return func(info *SegmentInfo) {
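For context, the SegmentAction pieces in the hunk above follow a functional-option style: each action is a closure that mutates a *SegmentInfo, and MergeSegmentAction folds several of them into one. Below is a minimal, self-contained sketch of that pattern; the types here are simplified stand-ins for illustration, not the actual metacache package.

package main

import "fmt"

// Simplified stand-in; the real SegmentInfo carries many more fields
// (bloom filters, positions, etc.).
type SegmentInfo struct {
	numOfRows int64
	compactTo int64
}

type SegmentAction func(info *SegmentInfo)

func UpdateNumOfRows(numOfRows int64) SegmentAction {
	return func(info *SegmentInfo) { info.numOfRows = numOfRows }
}

func CompactTo(compactTo int64) SegmentAction {
	return func(info *SegmentInfo) { info.compactTo = compactTo }
}

// MergeSegmentAction applies all actions in order against the same info.
func MergeSegmentAction(actions ...SegmentAction) SegmentAction {
	return func(info *SegmentInfo) {
		for _, action := range actions {
			action(info)
		}
	}
}

func main() {
	info := &SegmentInfo{}
	MergeSegmentAction(UpdateNumOfRows(1000), CompactTo(1001))(info)
	fmt.Println(info.numOfRows, info.compactTo) // 1000 1001
}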
@@ -89,6 +89,11 @@ func (s *SegmentActionSuite) TestActions() {
	action = UpdateNumOfRows(numOfRows)
	action(info)
	s.Equal(numOfRows, info.NumOfRows())

	compactTo := int64(1002)
	action = CompactTo(compactTo)
	action(info)
	s.Equal(compactTo, info.CompactTo())
}

func (s *SegmentActionSuite) TestMergeActions() {
@@ -32,6 +32,7 @@ type SegmentInfo struct {
	startPosRecorded bool
	numOfRows        int64
	bfs              *BloomFilterSet
	compactTo        int64
}

func (s *SegmentInfo) SegmentID() int64 {
@@ -62,6 +63,10 @@ func (s *SegmentInfo) GetHistory() []*storage.PkStatistics {
	return s.bfs.GetHistory()
}

func (s *SegmentInfo) CompactTo() int64 {
	return s.compactTo
}

func (s *SegmentInfo) Clone() *SegmentInfo {
	return &SegmentInfo{
		segmentID: s.segmentID,
@@ -72,6 +77,7 @@ func (s *SegmentInfo) Clone() *SegmentInfo {
		startPosRecorded: s.startPosRecorded,
		numOfRows:        s.numOfRows,
		bfs:              s.bfs,
		compactTo:        s.compactTo,
	}
}
@@ -34,6 +34,8 @@ type SyncMeta struct {

type SyncManager interface {
	SyncData(ctx context.Context, task *SyncTask) error
	Block(segmentID int64)
	Unblock(segmentID int64)
}

type syncManager struct {
@@ -62,3 +64,11 @@ func (mgr syncManager) SyncData(ctx context.Context, task *SyncTask) error {

	return nil
}

func (mgr syncManager) Block(segmentID int64) {
	mgr.keyLock.Lock(segmentID)
}

func (mgr syncManager) Unblock(segmentID int64) {
	mgr.keyLock.Unlock(segmentID)
}
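Block and Unblock above delegate to a per-segment key lock; presumably SyncData acquires the same lock for its segment before running, so a sync task on a blocked segment waits until Unblock is called (for example while compaction transitions the segment's metadata). The following rough, self-contained sketch shows that blocking behavior with a simple mutex-per-key map standing in for the real key-lock utility; it is an illustration of the idea, not the actual syncmgr code.

package main

import (
	"fmt"
	"sync"
	"time"
)

// keyLock is a simplified per-key lock; the real utility also manages
// the lifetime of unused entries.
type keyLock struct {
	mu    sync.Mutex
	locks map[int64]*sync.Mutex
}

func newKeyLock() *keyLock {
	return &keyLock{locks: make(map[int64]*sync.Mutex)}
}

func (kl *keyLock) get(key int64) *sync.Mutex {
	kl.mu.Lock()
	defer kl.mu.Unlock()
	if _, ok := kl.locks[key]; !ok {
		kl.locks[key] = &sync.Mutex{}
	}
	return kl.locks[key]
}

func (kl *keyLock) Lock(key int64)   { kl.get(key).Lock() }
func (kl *keyLock) Unlock(key int64) { kl.get(key).Unlock() }

func main() {
	kl := newKeyLock()
	segmentID := int64(100)

	kl.Lock(segmentID) // Block(segmentID)

	done := make(chan struct{})
	go func() {
		// Stands in for SyncData: it must hold the segment lock to proceed.
		kl.Lock(segmentID)
		defer kl.Unlock(segmentID)
		close(done)
	}()

	select {
	case <-done:
		fmt.Println("unexpected: sync ran while blocked")
	case <-time.After(100 * time.Millisecond):
		fmt.Println("sync is blocked, as expected")
	}

	kl.Unlock(segmentID) // Unblock(segmentID)
	<-done
	fmt.Println("sync proceeded after unblock")
}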
@@ -3,6 +3,7 @@ package syncmgr
import (
	"context"
	"math/rand"
	"sync/atomic"
	"testing"
	"time"
@@ -172,6 +173,75 @@ func (s *SyncManagerSuite) TestSubmit() {
	<-sig
}

func (s *SyncManagerSuite) TestCompacted() {
	sig := make(chan struct{})
	var segmentID atomic.Int64
	s.broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Run(func(_ context.Context, req *datapb.SaveBinlogPathsRequest) {
		close(sig)
		segmentID.Store(req.GetSegmentID())
	}).Return(nil)
	bfs := metacache.NewBloomFilterSet()
	seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, bfs)
	metacache.UpdateNumOfRows(1000)(seg)
	metacache.CompactTo(1001)(seg)
	s.metacache.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{seg})

	manager, err := NewSyncManager(10, s.chunkManager, s.allocator)
	s.NoError(err)
	task := s.getSuiteSyncTask()
	task.WithMetaWriter(BrokerMetaWriter(s.broker))
	task.WithTimeRange(50, 100)
	task.WithCheckpoint(&msgpb.MsgPosition{
		ChannelName: s.channelName,
		MsgID:       []byte{1, 2, 3, 4},
		Timestamp:   100,
	})

	err = manager.SyncData(context.Background(), task)
	s.NoError(err)

	<-sig
	s.EqualValues(1001, segmentID.Load())
}

func (s *SyncManagerSuite) TestBlock() {
	sig := make(chan struct{})
	s.broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Run(func(_ context.Context, _ *datapb.SaveBinlogPathsRequest) {
		close(sig)
	}).Return(nil)
	bfs := metacache.NewBloomFilterSet()
	seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, bfs)
	metacache.UpdateNumOfRows(1000)(seg)
	s.metacache.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{seg})

	manager, err := NewSyncManager(10, s.chunkManager, s.allocator)
	s.NoError(err)

	// block
	manager.Block(s.segmentID)

	go func() {
		task := s.getSuiteSyncTask()
		task.WithMetaWriter(BrokerMetaWriter(s.broker))
		task.WithTimeRange(50, 100)
		task.WithCheckpoint(&msgpb.MsgPosition{
			ChannelName: s.channelName,
			MsgID:       []byte{1, 2, 3, 4},
			Timestamp:   100,
		})
		manager.SyncData(context.Background(), task)
	}()

	select {
	case <-sig:
		s.FailNow("sync task done during block")
	default:
	}

	manager.Unblock(s.segmentID)
	<-sig
}

func TestSyncManager(t *testing.T) {
	suite.Run(t, new(SyncManagerSuite))
}
@@ -75,6 +75,21 @@ func (t *SyncTask) Run() error {
	log := t.getLogger()
	var err error

	infos := t.metacache.GetSegmentsBy(metacache.WithSegmentID(t.segmentID))
	if len(infos) == 0 {
		log.Warn("failed to sync data, segment not found in metacache")
		t.handleError(err)
		return merr.WrapErrSegmentNotFound(t.segmentID)
	}

	segment := infos[0]
	if segment.CompactTo() > 0 {
		log.Info("syncing segment compacted, update segment id", zap.Int64("compactTo", segment.CompactTo()))
		// update sync task segment id
		// it's ok to use compactTo segmentID here, since there shall be no insert for compacted segment
		t.segmentID = segment.CompactTo()
	}

	err = t.serializeInsertData()
	if err != nil {
		log.Warn("failed to serialize insert data", zap.Error(err))
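The new check in Run above redirects a sync task to the compact-to segment when the cached segment has already been compacted; since a compacted segment receives no further inserts, flushing under the new segment ID is safe. A tiny sketch of just that redirect decision follows, using a hypothetical helper name that is not part of the package.

// resolveSyncSegmentID mirrors the redirect logic above: if the segment
// was compacted (compactTo > 0), the task should sync under the new ID.
func resolveSyncSegmentID(segmentID, compactTo int64) int64 {
	if compactTo > 0 {
		return compactTo
	}
	return segmentID
}

// Example: a task targeting segment 1000 that was compacted into 1001
// writes its binlogs and checkpoint under segment 1001.
//   resolveSyncSegmentID(1000, 1001) == 1001
//   resolveSyncSegmentID(1000, 0)    == 1000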
@@ -214,6 +214,23 @@ func (s *SyncTaskSuite) TestRunNormal() {
}

func (s *SyncTaskSuite) TestRunError() {
	s.Run("segment_not_found", func() {
		s.metacache.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{})
		flag := false
		handler := func(_ error) { flag = true }
		task := s.getSuiteSyncTask().WithFailureCallback(handler)
		task.WithInsertData(s.getEmptyInsertBuffer())

		err := task.Run()

		s.Error(err)
		s.True(flag)
	})

	s.metacache.ExpectedCalls = nil
	seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, metacache.NewBloomFilterSet())
	metacache.UpdateNumOfRows(1000)(seg)
	s.metacache.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{seg})
	s.Run("serialize_insert_fail", func() {
		flag := false
		handler := func(_ error) { flag = true }