diff --git a/internal/datanode/flow_graph_dd_node.go b/internal/datanode/flow_graph_dd_node.go index 98a02b5b90..b3e8b69e92 100644 --- a/internal/datanode/flow_graph_dd_node.go +++ b/internal/datanode/flow_graph_dd_node.go @@ -58,6 +58,7 @@ type ddNode struct { segID2SegInfo sync.Map // segment ID to *SegmentInfo flushedSegments []*datapb.SegmentInfo + droppedSegments []UniqueID vchannelName string deltaMsgStream msgstream.MsgStream @@ -170,7 +171,7 @@ func (ddn *ddNode) Operate(in []Msg) []Msg { } func (ddn *ddNode) filterFlushedSegmentInsertMessages(msg *msgstream.InsertMsg) bool { - if ddn.isFlushed(msg.GetSegmentID()) { + if ddn.isFlushed(msg.GetSegmentID()) || ddn.isDropped(msg.GetSegmentID()) { return true } @@ -193,6 +194,15 @@ func (ddn *ddNode) isFlushed(segmentID UniqueID) bool { return false } +func (ddn *ddNode) isDropped(segID UniqueID) bool { + for _, sID := range ddn.droppedSegments { + if sID == segID { + return true + } + } + return false +} + func (ddn *ddNode) forwardDeleteMsg(msgs []msgstream.TsMsg, minTs Timestamp, maxTs Timestamp) error { if len(msgs) != 0 { var msgPack = msgstream.MsgPack{ @@ -277,6 +287,7 @@ func newDDNode(ctx context.Context, collID UniqueID, vchanInfo *datapb.VchannelI BaseNode: baseNode, collectionID: collID, flushedSegments: fs, + droppedSegments: vchanInfo.GetDroppedSegments(), vchannelName: vchanInfo.ChannelName, deltaMsgStream: deltaMsgStream, } diff --git a/internal/datanode/flow_graph_dd_node_test.go b/internal/datanode/flow_graph_dd_node_test.go index e8049253a2..02bd1ce215 100644 --- a/internal/datanode/flow_graph_dd_node_test.go +++ b/internal/datanode/flow_graph_dd_node_test.go @@ -380,3 +380,35 @@ func TestFlowGraph_DDNode_isFlushed(te *testing.T) { }) } } + +func TestFlowGraph_DDNode_isDropped(te *testing.T) { + tests := []struct { + indroppedSegment []UniqueID + inSeg UniqueID + + expectedOut bool + + description string + }{ + {[]UniqueID{1, 2, 3}, 1, true, + "Input seg 1 in droppedSegs{1, 2, 3}"}, + {[]UniqueID{1, 2, 3}, 2, true, + "Input seg 2 in droppedSegs{1, 2, 3}"}, + {[]UniqueID{1, 2, 3}, 3, true, + "Input seg 3 in droppedSegs{1, 2, 3}"}, + {[]UniqueID{1, 2, 3}, 4, false, + "Input seg 4 not in droppedSegs{1, 2, 3}"}, + {[]UniqueID{}, 5, false, + "Input seg 5, no droppedSegs {}"}, + } + + for _, test := range tests { + te.Run(test.description, func(t *testing.T) { + factory := mockMsgStreamFactory{true, true} + deltaStream, err := factory.NewMsgStream(context.Background()) + assert.Nil(t, err) + ddn := &ddNode{droppedSegments: test.indroppedSegment, deltaMsgStream: deltaStream} + assert.Equal(t, test.expectedOut, ddn.isDropped(test.inSeg)) + }) + } +}