Filter dropped segments in DN (#12242)

See also: #12131, #12230

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
This commit is contained in:
XuanYang-cn 2021-11-24 14:11:15 +08:00 committed by GitHub
parent 6606591238
commit e72c5b9351
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 44 additions and 1 deletions

View File

@ -58,6 +58,7 @@ type ddNode struct {
segID2SegInfo sync.Map // segment ID to *SegmentInfo
flushedSegments []*datapb.SegmentInfo
droppedSegments []UniqueID
vchannelName string
deltaMsgStream msgstream.MsgStream
@ -170,7 +171,7 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
}
func (ddn *ddNode) filterFlushedSegmentInsertMessages(msg *msgstream.InsertMsg) bool {
if ddn.isFlushed(msg.GetSegmentID()) {
if ddn.isFlushed(msg.GetSegmentID()) || ddn.isDropped(msg.GetSegmentID()) {
return true
}
@ -193,6 +194,15 @@ func (ddn *ddNode) isFlushed(segmentID UniqueID) bool {
return false
}
func (ddn *ddNode) isDropped(segID UniqueID) bool {
for _, sID := range ddn.droppedSegments {
if sID == segID {
return true
}
}
return false
}
func (ddn *ddNode) forwardDeleteMsg(msgs []msgstream.TsMsg, minTs Timestamp, maxTs Timestamp) error {
if len(msgs) != 0 {
var msgPack = msgstream.MsgPack{
@ -277,6 +287,7 @@ func newDDNode(ctx context.Context, collID UniqueID, vchanInfo *datapb.VchannelI
BaseNode: baseNode,
collectionID: collID,
flushedSegments: fs,
droppedSegments: vchanInfo.GetDroppedSegments(),
vchannelName: vchanInfo.ChannelName,
deltaMsgStream: deltaMsgStream,
}

View File

@ -380,3 +380,35 @@ func TestFlowGraph_DDNode_isFlushed(te *testing.T) {
})
}
}
func TestFlowGraph_DDNode_isDropped(te *testing.T) {
tests := []struct {
indroppedSegment []UniqueID
inSeg UniqueID
expectedOut bool
description string
}{
{[]UniqueID{1, 2, 3}, 1, true,
"Input seg 1 in droppedSegs{1, 2, 3}"},
{[]UniqueID{1, 2, 3}, 2, true,
"Input seg 2 in droppedSegs{1, 2, 3}"},
{[]UniqueID{1, 2, 3}, 3, true,
"Input seg 3 in droppedSegs{1, 2, 3}"},
{[]UniqueID{1, 2, 3}, 4, false,
"Input seg 4 not in droppedSegs{1, 2, 3}"},
{[]UniqueID{}, 5, false,
"Input seg 5, no droppedSegs {}"},
}
for _, test := range tests {
te.Run(test.description, func(t *testing.T) {
factory := mockMsgStreamFactory{true, true}
deltaStream, err := factory.NewMsgStream(context.Background())
assert.Nil(t, err)
ddn := &ddNode{droppedSegments: test.indroppedSegment, deltaMsgStream: deltaStream}
assert.Equal(t, test.expectedOut, ddn.isDropped(test.inSeg))
})
}
}