From 3cfa88d6f2d5fa2a77f2c88a2ba13fb798250edc Mon Sep 17 00:00:00 2001
From: XuanYang-cn
Date: Mon, 27 Dec 2021 11:02:18 +0800
Subject: [PATCH] Fix delete node not being aware of merged segments (#14190)

This PR makes compaction remove merged segments gracefully by moving them
into `compactedSegments`. Whenever the delete node runs and
`compactedSegments` is non-empty, it folds the delete buffers of the
compacted-from segments into a single `delBuf` entry keyed by the
compacted-to segment, and then truly removes the compacted segments from
the replica.

Fixes: #14085

Signed-off-by: yangxuan
---
 internal/datanode/compactor.go                |  17 +--
 internal/datanode/compactor_test.go           |   4 +-
 internal/datanode/flow_graph_delete_node.go   |  28 +++++
 .../datanode/flow_graph_delete_node_test.go   |  22 +++-
 internal/datanode/segment_replica.go          | 119 ++++++++++++++----
 internal/datanode/segment_replica_test.go     |  39 +++---
 6 files changed, 165 insertions(+), 64 deletions(-)

diff --git a/internal/datanode/compactor.go b/internal/datanode/compactor.go
index 18cb3f3de7..c565c025be 100644
--- a/internal/datanode/compactor.go
+++ b/internal/datanode/compactor.go
@@ -25,7 +25,6 @@ import (
 	"sync"
 	"time"
 
-	"github.com/milvus-io/milvus/internal/common"
 	"github.com/milvus-io/milvus/internal/log"
 	"github.com/milvus-io/milvus/internal/proto/commonpb"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
@@ -33,6 +32,7 @@ import (
 	"github.com/milvus-io/milvus/internal/proto/schemapb"
 	"github.com/milvus-io/milvus/internal/storage"
 	"github.com/milvus-io/milvus/internal/types"
+
 	"go.uber.org/zap"
 	"golang.org/x/sync/errgroup"
 )
@@ -442,22 +442,11 @@ func (t *compactionTask) compact() error {
 
 	// Compaction I: update pk range.
 	// Compaction II: remove the segments and add a new flushed segment with pk range.
-	fd := make([]UniqueID, 0, numRows)
-	if numRows > 0 {
-		for _, iData := range iDatas {
-			fd = append(fd, iData.Data[common.TimeStampField].(*storage.Int64FieldData).Data...)
- } - - } if t.hasSegment(targetSegID, true) { t.refreshFlushedSegStatistics(targetSegID, numRows) - t.refreshFlushedSegmentPKRange(targetSegID, fd) + // no need to shorten the PK range of a segment, deleting dup PKs is valid } else { - t.addFlushedSegmentWithPKs(targetSegID, collID, partID, t.plan.GetChannel(), numRows, fd) - - for _, seg := range segIDs { - t.removeSegment(seg) - } + t.mergeFlushedSegments(targetSegID, collID, partID, segIDs, t.plan.GetChannel(), numRows) } ti.injectDone(true) diff --git a/internal/datanode/compactor_test.go b/internal/datanode/compactor_test.go index ed8b55c71f..13091b6ba9 100644 --- a/internal/datanode/compactor_test.go +++ b/internal/datanode/compactor_test.go @@ -496,7 +496,7 @@ func TestCompactorInterfaceMethods(t *testing.T) { plan.Timetravel = Timestamp(25000) replica.addFlushedSegmentWithPKs(segID1, collID, partID, "channelname", 2, []UniqueID{1}) replica.addFlushedSegmentWithPKs(segID2, collID, partID, "channelname", 2, []UniqueID{9}) - replica.removeSegment(19530) + replica.removeSegments(19530) require.True(t, replica.hasSegment(segID1, true)) require.True(t, replica.hasSegment(segID2, true)) require.False(t, replica.hasSegment(19530, true)) @@ -520,7 +520,7 @@ func TestCompactorInterfaceMethods(t *testing.T) { plan.Timetravel = Timestamp(10000) replica.addFlushedSegmentWithPKs(segID1, collID, partID, "channelname", 2, []UniqueID{1}) replica.addFlushedSegmentWithPKs(segID2, collID, partID, "channelname", 2, []UniqueID{9}) - replica.removeSegment(19530) + replica.removeSegments(19530) require.True(t, replica.hasSegment(segID1, true)) require.True(t, replica.hasSegment(segID2, true)) require.False(t, replica.hasSegment(19530, true)) diff --git a/internal/datanode/flow_graph_delete_node.go b/internal/datanode/flow_graph_delete_node.go index 218e71aa4b..afe7a885af 100644 --- a/internal/datanode/flow_graph_delete_node.go +++ b/internal/datanode/flow_graph_delete_node.go @@ -69,6 +69,16 @@ func (ddb *DelDataBuf) updateTimeRange(tr TimeRange) { } } +func (ddb *DelDataBuf) updateFromBuf(buf *DelDataBuf) { + ddb.updateSize(buf.EntriesNum) + + tr := TimeRange{timestampMax: buf.TimestampTo, timestampMin: buf.TimestampFrom} + ddb.updateTimeRange(tr) + + ddb.delData.Pks = append(ddb.delData.Pks, buf.delData.Pks...) + ddb.delData.Tss = append(ddb.delData.Tss, buf.delData.Tss...) +} + func newDelDataBuf() *DelDataBuf { return &DelDataBuf{ delData: &DeleteData{}, @@ -91,6 +101,24 @@ func (dn *deleteNode) Close() { func (dn *deleteNode) bufferDeleteMsg(msg *msgstream.DeleteMsg, tr TimeRange) error { log.Debug("bufferDeleteMsg", zap.Any("primary keys", msg.PrimaryKeys), zap.String("vChannelName", dn.channelName)) + // Update delBuf for merged segments + compactedTo2From := dn.replica.listCompactedSegmentIDs() + for compactedTo, compactedFrom := range compactedTo2From { + compactToDelBuff := newDelDataBuf() + for _, segID := range compactedFrom { + value, loaded := dn.delBuf.LoadAndDelete(segID) + if loaded { + compactToDelBuff.updateFromBuf(value.(*DelDataBuf)) + } + } + dn.delBuf.Store(compactedTo, compactToDelBuff) + dn.replica.removeSegments(compactedFrom...) 
+ log.Debug("update delBuf for merged segments", + zap.Int64("compactedTo segmentID", compactedTo), + zap.Int64s("compactedFrom segmentIDs", compactedFrom), + ) + } + segIDToPkMap := make(map[UniqueID][]int64) segIDToTsMap := make(map[UniqueID][]uint64) diff --git a/internal/datanode/flow_graph_delete_node_test.go b/internal/datanode/flow_graph_delete_node_test.go index e8c3faff87..e021a472d1 100644 --- a/internal/datanode/flow_graph_delete_node_test.go +++ b/internal/datanode/flow_graph_delete_node_test.go @@ -34,19 +34,29 @@ import ( type mockReplica struct { Replica - newSegments map[UniqueID]*Segment - normalSegments map[UniqueID]*Segment - flushedSegments map[UniqueID]*Segment + newSegments map[UniqueID]*Segment + normalSegments map[UniqueID]*Segment + flushedSegments map[UniqueID]*Segment + compactedSegments map[UniqueID]*Segment } +var _ Replica = (*mockReplica)(nil) + func newMockReplica() *mockReplica { return &mockReplica{ - newSegments: make(map[int64]*Segment), - normalSegments: make(map[int64]*Segment), - flushedSegments: make(map[int64]*Segment), + newSegments: make(map[int64]*Segment), + normalSegments: make(map[int64]*Segment), + flushedSegments: make(map[int64]*Segment), + compactedSegments: make(map[int64]*Segment), } } +func (replica *mockReplica) listCompactedSegmentIDs() map[UniqueID][]UniqueID { + return make(map[UniqueID][]UniqueID) +} + +func (replica *mockReplica) removeSegments(segIDs ...UniqueID) {} + func (replica *mockReplica) filterSegments(channelName string, partitionID UniqueID) []*Segment { results := make([]*Segment, 0) for _, value := range replica.newSegments { diff --git a/internal/datanode/segment_replica.go b/internal/datanode/segment_replica.go index 7003b188ab..75e2127a35 100644 --- a/internal/datanode/segment_replica.go +++ b/internal/datanode/segment_replica.go @@ -60,10 +60,10 @@ type Replica interface { updateSegmentEndPosition(segID UniqueID, endPos *internalpb.MsgPosition) updateSegmentCheckPoint(segID UniqueID) updateSegmentPKRange(segID UniqueID, rowIDs []int64) - refreshFlushedSegmentPKRange(segID UniqueID, rowIDs []int64) - addFlushedSegmentWithPKs(segID, collID, partID UniqueID, channelName string, numOfRow int64, rowIDs []int64) + mergeFlushedSegments(segID, collID, partID UniqueID, compactedFrom []UniqueID, channelName string, numOfRows int64) hasSegment(segID UniqueID, countFlushed bool) bool - removeSegment(segID UniqueID) + removeSegments(segID ...UniqueID) + listCompactedSegmentIDs() map[UniqueID][]UniqueID updateStatistics(segID UniqueID, numRows int64) refreshFlushedSegStatistics(segID UniqueID, numRows int64) @@ -81,6 +81,7 @@ type Segment struct { isNew atomic.Value // bool isFlushed atomic.Value // bool channelName string + compactedTo UniqueID checkPoint segmentCheckPoint startPos *internalpb.MsgPosition // TODO readonly @@ -98,10 +99,11 @@ type SegmentReplica struct { collectionID UniqueID collSchema *schemapb.CollectionSchema - segMu sync.RWMutex - newSegments map[UniqueID]*Segment - normalSegments map[UniqueID]*Segment - flushedSegments map[UniqueID]*Segment + segMu sync.RWMutex + newSegments map[UniqueID]*Segment + normalSegments map[UniqueID]*Segment + flushedSegments map[UniqueID]*Segment + compactedSegments map[UniqueID]*Segment metaService *metaService minIOKV kv.BaseKV @@ -149,9 +151,10 @@ func newReplica(ctx context.Context, rc types.RootCoord, collID UniqueID) (*Segm replica := &SegmentReplica{ collectionID: collID, - newSegments: make(map[UniqueID]*Segment), - normalSegments: make(map[UniqueID]*Segment), - 
flushedSegments: make(map[UniqueID]*Segment), + newSegments: make(map[UniqueID]*Segment), + normalSegments: make(map[UniqueID]*Segment), + flushedSegments: make(map[UniqueID]*Segment), + compactedSegments: make(map[UniqueID]*Segment), metaService: metaService, minIOKV: minIOKV, @@ -267,7 +270,28 @@ func (replica *SegmentReplica) addNewSegment(segID, collID, partitionID UniqueID return nil } +func (replica *SegmentReplica) listCompactedSegmentIDs() map[UniqueID][]UniqueID { + replica.segMu.Lock() + defer replica.segMu.Unlock() + + compactedTo2From := make(map[UniqueID][]UniqueID) + + for segID, seg := range replica.compactedSegments { + var from []UniqueID + from, ok := compactedTo2From[seg.compactedTo] + if !ok { + from = []UniqueID{} + } + + from = append(from, segID) + compactedTo2From[seg.compactedTo] = from + } + + return compactedTo2From +} + // filterSegments return segments with same channelName and partition ID +// get all segments func (replica *SegmentReplica) filterSegments(channelName string, partitionID UniqueID) []*Segment { replica.segMu.Lock() defer replica.segMu.Unlock() @@ -528,13 +552,18 @@ func (replica *SegmentReplica) updateSegmentPKRange(segID UniqueID, pks []int64) log.Warn("No match segment to update PK range", zap.Int64("ID", segID)) } -func (replica *SegmentReplica) removeSegment(segID UniqueID) { +func (replica *SegmentReplica) removeSegments(segIDs ...UniqueID) { replica.segMu.Lock() defer replica.segMu.Unlock() - delete(replica.newSegments, segID) - delete(replica.normalSegments, segID) - delete(replica.flushedSegments, segID) + log.Debug("remove segments if exist", zap.Int64s("segmentIDs", segIDs)) + + for _, segID := range segIDs { + delete(replica.newSegments, segID) + delete(replica.normalSegments, segID) + delete(replica.flushedSegments, segID) + delete(replica.compactedSegments, segID) + } } // hasSegment checks whether this replica has a segment according to segment ID. 
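
A minimal sketch of the delete-buffer handover that listCompactedSegmentIDs
and the new bufferDeleteMsg code above cooperate on. It uses simplified
stand-in types (a delBuf struct and plain maps instead of the real
DelDataBuf and sync.Map), so it only illustrates the shape of the logic,
not the actual DataNode types: a compactedTo -> compactedFrom map lets the
buffered deletes of merged segments be folded into one entry keyed by the
new segment, after which the stale entries are dropped.

package main

import "fmt"

type UniqueID = int64

// delBuf is a toy stand-in for DelDataBuf: the buffered delete primary
// keys and their timestamps for one segment.
type delBuf struct {
	pks []int64
	tss []uint64
}

func main() {
	// Buffered deletes per segment; segments 1 and 2 were just
	// compacted into segment 3.
	buffers := map[UniqueID]*delBuf{
		1: {pks: []int64{10, 11}, tss: []uint64{100, 101}},
		2: {pks: []int64{20}, tss: []uint64{102}},
	}
	compactedTo2From := map[UniqueID][]UniqueID{3: {1, 2}}

	// Fold every compacted-from buffer into one buffer for the
	// compacted-to segment, then delete the stale entries, mirroring
	// the LoadAndDelete/Store sequence in bufferDeleteMsg.
	for to, from := range compactedTo2From {
		merged := &delBuf{}
		for _, segID := range from {
			if b, ok := buffers[segID]; ok {
				merged.pks = append(merged.pks, b.pks...)
				merged.tss = append(merged.tss, b.tss...)
				delete(buffers, segID)
			}
		}
		buffers[to] = merged
	}

	fmt.Printf("segment 3 now buffers %d deletes; segment 1 present: %v\n",
		len(buffers[3].pks), buffers[1] != nil)
}

Running this prints that segment 3 now buffers all three deletes while
segment 1 no longer appears, which is the invariant the delete node needs
before removeSegments can truly drop the compacted segments.
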
@@ -660,22 +689,60 @@ func (replica *SegmentReplica) updateSegmentCheckPoint(segID UniqueID) { log.Warn("There's no segment", zap.Int64("ID", segID)) } -// please call hasSegment first -func (replica *SegmentReplica) refreshFlushedSegmentPKRange(segID UniqueID, rowIDs []int64) { - replica.segMu.Lock() - defer replica.segMu.Unlock() - - seg, ok := replica.flushedSegments[segID] - if ok { - seg.pkFilter.ClearAll() - seg.updatePKRange(rowIDs) +func (replica *SegmentReplica) mergeFlushedSegments(segID, collID, partID UniqueID, compactedFrom []UniqueID, channelName string, numOfRows int64) { + if collID != replica.collectionID { + log.Warn("Mismatch collection", + zap.Int64("input ID", collID), + zap.Int64("expected ID", replica.collectionID)) return } - log.Warn("No match segment to update PK range", zap.Int64("ID", segID)) + log.Debug("merge flushed segments", + zap.Int64("compacted To segmentID", segID), + zap.Int64s("compacted From segmentIDs", compactedFrom), + zap.Int64("partition ID", partID), + zap.String("channel name", channelName), + ) + + seg := &Segment{ + collectionID: collID, + partitionID: partID, + segmentID: segID, + channelName: channelName, + numRows: numOfRows, + + pkFilter: bloom.NewWithEstimates(bloomFilterSize, maxBloomFalsePositive), + minPK: math.MaxInt64, // use max value, represents no value + maxPK: math.MinInt64, // use min value represents no value + } + + replica.segMu.Lock() + for _, ID := range compactedFrom { + s, ok := replica.flushedSegments[ID] + + if !ok { + log.Warn("no match flushed segment to merge from", zap.Int64("segmentID", ID)) + continue + } + + s.compactedTo = segID + replica.compactedSegments[ID] = s + delete(replica.flushedSegments, ID) + + seg.pkFilter.Merge(s.pkFilter) + } + replica.segMu.Unlock() + + seg.isNew.Store(false) + seg.isFlushed.Store(true) + + replica.segMu.Lock() + replica.flushedSegments[segID] = seg + replica.segMu.Unlock() } -func (replica *SegmentReplica) addFlushedSegmentWithPKs(segID, collID, partID UniqueID, channelName string, numOfRows int64, rowIDs []int64) { +// for tests only +func (replica *SegmentReplica) addFlushedSegmentWithPKs(segID, collID, partID UniqueID, channelName string, numOfRows int64, pks []UniqueID) { if collID != replica.collectionID { log.Warn("Mismatch collection", zap.Int64("input ID", collID), @@ -702,7 +769,7 @@ func (replica *SegmentReplica) addFlushedSegmentWithPKs(segID, collID, partID Un maxPK: math.MinInt64, // use min value represents no value } - seg.updatePKRange(rowIDs) + seg.updatePKRange(pks) seg.isNew.Store(false) seg.isFlushed.Store(true) diff --git a/internal/datanode/segment_replica_test.go b/internal/datanode/segment_replica_test.go index dfcecb4eb2..17b433523b 100644 --- a/internal/datanode/segment_replica_test.go +++ b/internal/datanode/segment_replica_test.go @@ -249,19 +249,6 @@ func TestSegmentReplica(t *testing.T) { func TestSegmentReplica_InterfaceMethod(t *testing.T) { rc := &RootCoordFactory{} - t.Run("Test refreshFlushedSegmentPKRange", func(t *testing.T) { - replica, err := newReplica(context.TODO(), rc, 1) - require.NoError(t, err) - - require.False(t, replica.hasSegment(100, true)) - replica.refreshFlushedSegmentPKRange(100, []int64{10}) - - replica.addFlushedSegmentWithPKs(100, 1, 10, "a", 1, []int64{9}) - require.True(t, replica.hasSegment(100, true)) - replica.refreshFlushedSegmentPKRange(100, []int64{10}) - - }) - t.Run("Test addFlushedSegmentWithPKs", func(t *testing.T) { tests := []struct { isvalid bool @@ -476,7 +463,7 @@ func 
TestSegmentReplica_InterfaceMethod(t *testing.T) { sr.flushedSegments[test.flushedSegID] = &Segment{} } sr.updateSegmentEndPosition(test.inSegID, new(internalpb.MsgPosition)) - sr.removeSegment(0) + sr.removeSegments(0) }) } @@ -580,7 +567,6 @@ func TestSegmentReplica_InterfaceMethod(t *testing.T) { } }) } - }) t.Run("Test listAllSegmentIDs", func(t *testing.T) { @@ -592,7 +578,6 @@ func TestSegmentReplica_InterfaceMethod(t *testing.T) { ids := sr.listAllSegmentIDs() assert.ElementsMatch(t, []UniqueID{1, 2, 3}, ids) - }) t.Run("Test_addSegmentMinIOLoadError", func(t *testing.T) { @@ -634,6 +619,28 @@ func TestSegmentReplica_InterfaceMethod(t *testing.T) { assert.NotNil(t, err) }) + t.Run("Test_mergeFlushedSegments", func(t *testing.T) { + sr, err := newReplica(context.Background(), rc, 1) + assert.Nil(t, err) + + sr.addFlushedSegmentWithPKs(1, 1, 0, "channel", 10, []UniqueID{1}) + sr.addFlushedSegmentWithPKs(2, 1, 0, "channel", 10, []UniqueID{1}) + require.True(t, sr.hasSegment(1, true)) + require.True(t, sr.hasSegment(2, true)) + + sr.mergeFlushedSegments(3, 1, 0, []UniqueID{1, 2}, "channel", 15) + assert.True(t, sr.hasSegment(3, true)) + assert.False(t, sr.hasSegment(1, true)) + assert.False(t, sr.hasSegment(2, true)) + + to2from := sr.listCompactedSegmentIDs() + assert.NotEmpty(t, to2from) + + from, ok := to2from[3] + assert.True(t, ok) + assert.ElementsMatch(t, []UniqueID{1, 2}, from) + }) + } func TestInnerFunctionSegment(t *testing.T) { rc := &RootCoordFactory{}
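
For reference, a runnable sketch of why mergeFlushedSegments can union the
compacted-from segments' pkFilter bloom filters instead of rebuilding them
from raw PKs: a bloom filter union preserves possible membership, so a PK
probe that previously hit segment 1 or segment 2 still hits the merged
segment 3. This uses github.com/bits-and-blooms/bloom/v3 as a stand-in for
the filter type, and the capacity and false-positive rate below are
illustrative values, not necessarily Milvus's configured parameters.

package main

import (
	"encoding/binary"
	"fmt"

	"github.com/bits-and-blooms/bloom/v3"
)

// addPK and hasPK encode an int64 primary key identically on both sides
// so Add and Test agree on the byte representation.
func addPK(f *bloom.BloomFilter, pk int64) {
	var b [8]byte
	binary.BigEndian.PutUint64(b[:], uint64(pk))
	f.Add(b[:])
}

func hasPK(f *bloom.BloomFilter, pk int64) bool {
	var b [8]byte
	binary.BigEndian.PutUint64(b[:], uint64(pk))
	return f.Test(b[:])
}

func main() {
	// Two flushed segments, each with its own PK filter.
	segA := bloom.NewWithEstimates(100000, 0.005)
	segB := bloom.NewWithEstimates(100000, 0.005)
	addPK(segA, 1)
	addPK(segB, 9)

	// The merged segment's filter is the union of both. Merge only
	// errors on filters of different shapes, which cannot happen here
	// because all three were built with the same estimates.
	merged := bloom.NewWithEstimates(100000, 0.005)
	if err := merged.Merge(segA); err != nil {
		panic(err)
	}
	if err := merged.Merge(segB); err != nil {
		panic(err)
	}

	fmt.Println(hasPK(merged, 1), hasPK(merged, 9)) // true true
}

The design trade-off matches the comment in compact() above: a bloom
filter union can only widen the set of possibly-present PKs, and routing a
delete to a segment that no longer holds the PK is harmless, so the filter
never needs shrinking after compaction.
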