Fix delete node not aware of merged segments error (#14190)

This PR makes compaction gracefully remove merged segments by moving
them into `compactedSegments`.

Every time delete_node operates, if `compactedSegments` has contents,
delete_node merges the delete buffers of the compacted-from segments in
delBuf into a single buffer keyed by the compacted-to segment, and then
truly removes the compacted segments from the replica.
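
In outline, the new handling in `bufferDeleteMsg` (see the diff below)
does the following; this is a condensed sketch with logging elided:

```go
// For each compacted-to segment, fold the delete buffers of the
// segments it was merged from into one buffer, re-key that buffer
// under the compacted-to segment ID, then drop the stale segments.
for compactedTo, compactedFrom := range dn.replica.listCompactedSegmentIDs() {
	merged := newDelDataBuf()
	for _, segID := range compactedFrom {
		if value, loaded := dn.delBuf.LoadAndDelete(segID); loaded {
			merged.updateFromBuf(value.(*DelDataBuf))
		}
	}
	dn.delBuf.Store(compactedTo, merged)
	dn.replica.removeSegments(compactedFrom...)
}
```

Re-keying matters because deletes buffered against the compacted-from
segments would otherwise target segments that no longer exist and would
never be applied to the compacted-to segment.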

Fixes: #14085

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
XuanYang-cn 2021-12-27 11:02:18 +08:00 committed by GitHub
parent 14a9678a5a
commit 3cfa88d6f2
6 changed files with 165 additions and 64 deletions


@@ -25,7 +25,6 @@ import (
"sync"
"time"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
@@ -33,6 +32,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/types"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
@@ -442,22 +442,11 @@ func (t *compactionTask) compact() error {
// Compaction I: update pk range.
// Compaction II: remove the segments and add a new flushed segment with pk range.
fd := make([]UniqueID, 0, numRows)
if numRows > 0 {
for _, iData := range iDatas {
fd = append(fd, iData.Data[common.TimeStampField].(*storage.Int64FieldData).Data...)
}
}
if t.hasSegment(targetSegID, true) {
t.refreshFlushedSegStatistics(targetSegID, numRows)
t.refreshFlushedSegmentPKRange(targetSegID, fd)
// no need to shorten the PK range of a segment; deleting duplicate PKs is valid
} else {
t.addFlushedSegmentWithPKs(targetSegID, collID, partID, t.plan.GetChannel(), numRows, fd)
for _, seg := range segIDs {
t.removeSegment(seg)
}
t.mergeFlushedSegments(targetSegID, collID, partID, segIDs, t.plan.GetChannel(), numRows)
}
ti.injectDone(true)


@@ -496,7 +496,7 @@ func TestCompactorInterfaceMethods(t *testing.T) {
plan.Timetravel = Timestamp(25000)
replica.addFlushedSegmentWithPKs(segID1, collID, partID, "channelname", 2, []UniqueID{1})
replica.addFlushedSegmentWithPKs(segID2, collID, partID, "channelname", 2, []UniqueID{9})
replica.removeSegment(19530)
replica.removeSegments(19530)
require.True(t, replica.hasSegment(segID1, true))
require.True(t, replica.hasSegment(segID2, true))
require.False(t, replica.hasSegment(19530, true))
@@ -520,7 +520,7 @@ func TestCompactorInterfaceMethods(t *testing.T) {
plan.Timetravel = Timestamp(10000)
replica.addFlushedSegmentWithPKs(segID1, collID, partID, "channelname", 2, []UniqueID{1})
replica.addFlushedSegmentWithPKs(segID2, collID, partID, "channelname", 2, []UniqueID{9})
replica.removeSegment(19530)
replica.removeSegments(19530)
require.True(t, replica.hasSegment(segID1, true))
require.True(t, replica.hasSegment(segID2, true))
require.False(t, replica.hasSegment(19530, true))


@@ -69,6 +69,16 @@ func (ddb *DelDataBuf) updateTimeRange(tr TimeRange) {
}
}
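// updateFromBuf folds another DelDataBuf into this one: it adds the entry
// count, widens the time range, and appends the buffered primary keys and
// timestamps.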
func (ddb *DelDataBuf) updateFromBuf(buf *DelDataBuf) {
ddb.updateSize(buf.EntriesNum)
tr := TimeRange{timestampMax: buf.TimestampTo, timestampMin: buf.TimestampFrom}
ddb.updateTimeRange(tr)
ddb.delData.Pks = append(ddb.delData.Pks, buf.delData.Pks...)
ddb.delData.Tss = append(ddb.delData.Tss, buf.delData.Tss...)
}
func newDelDataBuf() *DelDataBuf {
return &DelDataBuf{
delData: &DeleteData{},
@@ -91,6 +101,24 @@ func (dn *deleteNode) Close() {
func (dn *deleteNode) bufferDeleteMsg(msg *msgstream.DeleteMsg, tr TimeRange) error {
log.Debug("bufferDeleteMsg", zap.Any("primary keys", msg.PrimaryKeys), zap.String("vChannelName", dn.channelName))
// Update delBuf for merged segments
compactedTo2From := dn.replica.listCompactedSegmentIDs()
for compactedTo, compactedFrom := range compactedTo2From {
compactToDelBuff := newDelDataBuf()
for _, segID := range compactedFrom {
value, loaded := dn.delBuf.LoadAndDelete(segID)
if loaded {
compactToDelBuff.updateFromBuf(value.(*DelDataBuf))
}
}
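// Re-key the accumulated deletes under the compacted-to segment so they
// are flushed with it, and drop the replica's record of the merged segments.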
dn.delBuf.Store(compactedTo, compactToDelBuff)
dn.replica.removeSegments(compactedFrom...)
log.Debug("update delBuf for merged segments",
zap.Int64("compactedTo segmentID", compactedTo),
zap.Int64s("compactedFrom segmentIDs", compactedFrom),
)
}
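// Group the incoming primary keys and timestamps by the segments that
// may contain them.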
segIDToPkMap := make(map[UniqueID][]int64)
segIDToTsMap := make(map[UniqueID][]uint64)


@@ -34,19 +34,29 @@ import (
type mockReplica struct {
Replica
newSegments map[UniqueID]*Segment
normalSegments map[UniqueID]*Segment
flushedSegments map[UniqueID]*Segment
newSegments map[UniqueID]*Segment
normalSegments map[UniqueID]*Segment
flushedSegments map[UniqueID]*Segment
compactedSegments map[UniqueID]*Segment
}
var _ Replica = (*mockReplica)(nil)
func newMockReplica() *mockReplica {
return &mockReplica{
newSegments: make(map[int64]*Segment),
normalSegments: make(map[int64]*Segment),
flushedSegments: make(map[int64]*Segment),
newSegments: make(map[int64]*Segment),
normalSegments: make(map[int64]*Segment),
flushedSegments: make(map[int64]*Segment),
compactedSegments: make(map[int64]*Segment),
}
}
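// The mock replica reports no compacted segments by default.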
func (replica *mockReplica) listCompactedSegmentIDs() map[UniqueID][]UniqueID {
return make(map[UniqueID][]UniqueID)
}
func (replica *mockReplica) removeSegments(segIDs ...UniqueID) {}
func (replica *mockReplica) filterSegments(channelName string, partitionID UniqueID) []*Segment {
results := make([]*Segment, 0)
for _, value := range replica.newSegments {


@@ -60,10 +60,10 @@ type Replica interface {
updateSegmentEndPosition(segID UniqueID, endPos *internalpb.MsgPosition)
updateSegmentCheckPoint(segID UniqueID)
updateSegmentPKRange(segID UniqueID, rowIDs []int64)
refreshFlushedSegmentPKRange(segID UniqueID, rowIDs []int64)
addFlushedSegmentWithPKs(segID, collID, partID UniqueID, channelName string, numOfRow int64, rowIDs []int64)
mergeFlushedSegments(segID, collID, partID UniqueID, compactedFrom []UniqueID, channelName string, numOfRows int64)
hasSegment(segID UniqueID, countFlushed bool) bool
removeSegment(segID UniqueID)
removeSegments(segID ...UniqueID)
listCompactedSegmentIDs() map[UniqueID][]UniqueID
updateStatistics(segID UniqueID, numRows int64)
refreshFlushedSegStatistics(segID UniqueID, numRows int64)
@@ -81,6 +81,7 @@ type Segment struct {
isNew atomic.Value // bool
isFlushed atomic.Value // bool
channelName string
compactedTo UniqueID
checkPoint segmentCheckPoint
startPos *internalpb.MsgPosition // TODO readonly
@@ -98,10 +99,11 @@ type SegmentReplica struct {
collectionID UniqueID
collSchema *schemapb.CollectionSchema
segMu sync.RWMutex
newSegments map[UniqueID]*Segment
normalSegments map[UniqueID]*Segment
flushedSegments map[UniqueID]*Segment
segMu sync.RWMutex
newSegments map[UniqueID]*Segment
normalSegments map[UniqueID]*Segment
flushedSegments map[UniqueID]*Segment
compactedSegments map[UniqueID]*Segment
metaService *metaService
minIOKV kv.BaseKV
@@ -149,9 +151,10 @@ func newReplica(ctx context.Context, rc types.RootCoord, collID UniqueID) (*Segm
replica := &SegmentReplica{
collectionID: collID,
newSegments: make(map[UniqueID]*Segment),
normalSegments: make(map[UniqueID]*Segment),
flushedSegments: make(map[UniqueID]*Segment),
newSegments: make(map[UniqueID]*Segment),
normalSegments: make(map[UniqueID]*Segment),
flushedSegments: make(map[UniqueID]*Segment),
compactedSegments: make(map[UniqueID]*Segment),
metaService: metaService,
minIOKV: minIOKV,
@@ -267,7 +270,28 @@ func (replica *SegmentReplica) addNewSegment(segID, collID, partitionID UniqueID
return nil
}
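// listCompactedSegmentIDs returns a mapping from each compacted-to segment
// ID to the IDs of the segments that were compacted into it.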
func (replica *SegmentReplica) listCompactedSegmentIDs() map[UniqueID][]UniqueID {
replica.segMu.Lock()
defer replica.segMu.Unlock()
compactedTo2From := make(map[UniqueID][]UniqueID)
for segID, seg := range replica.compactedSegments {
var from []UniqueID
from, ok := compactedTo2From[seg.compactedTo]
if !ok {
from = []UniqueID{}
}
from = append(from, segID)
compactedTo2From[seg.compactedTo] = from
}
return compactedTo2From
}
// filterSegments returns segments with the same channelName and partition ID
// get all segments
func (replica *SegmentReplica) filterSegments(channelName string, partitionID UniqueID) []*Segment {
replica.segMu.Lock()
defer replica.segMu.Unlock()
@@ -528,13 +552,18 @@ func (replica *SegmentReplica) updateSegmentPKRange(segID UniqueID, pks []int64)
log.Warn("No match segment to update PK range", zap.Int64("ID", segID))
}
func (replica *SegmentReplica) removeSegment(segID UniqueID) {
func (replica *SegmentReplica) removeSegments(segIDs ...UniqueID) {
replica.segMu.Lock()
defer replica.segMu.Unlock()
delete(replica.newSegments, segID)
delete(replica.normalSegments, segID)
delete(replica.flushedSegments, segID)
log.Debug("remove segments if exist", zap.Int64s("segmentIDs", segIDs))
for _, segID := range segIDs {
delete(replica.newSegments, segID)
delete(replica.normalSegments, segID)
delete(replica.flushedSegments, segID)
delete(replica.compactedSegments, segID)
}
}
// hasSegment checks whether this replica has a segment according to segment ID.
@@ -660,22 +689,60 @@ func (replica *SegmentReplica) updateSegmentCheckPoint(segID UniqueID) {
log.Warn("There's no segment", zap.Int64("ID", segID))
}
// please call hasSegment first
func (replica *SegmentReplica) refreshFlushedSegmentPKRange(segID UniqueID, rowIDs []int64) {
replica.segMu.Lock()
defer replica.segMu.Unlock()
seg, ok := replica.flushedSegments[segID]
if ok {
seg.pkFilter.ClearAll()
seg.updatePKRange(rowIDs)
func (replica *SegmentReplica) mergeFlushedSegments(segID, collID, partID UniqueID, compactedFrom []UniqueID, channelName string, numOfRows int64) {
if collID != replica.collectionID {
log.Warn("Mismatch collection",
zap.Int64("input ID", collID),
zap.Int64("expected ID", replica.collectionID))
return
}
log.Warn("No match segment to update PK range", zap.Int64("ID", segID))
log.Debug("merge flushed segments",
zap.Int64("compacted To segmentID", segID),
zap.Int64s("compacted From segmentIDs", compactedFrom),
zap.Int64("partition ID", partID),
zap.String("channel name", channelName),
)
seg := &Segment{
collectionID: collID,
partitionID: partID,
segmentID: segID,
channelName: channelName,
numRows: numOfRows,
pkFilter: bloom.NewWithEstimates(bloomFilterSize, maxBloomFalsePositive),
minPK: math.MaxInt64, // use max value to represent no value
maxPK: math.MinInt64, // use min value to represent no value
}
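// Move every compacted-from segment out of flushedSegments into
// compactedSegments, recording its compacted-to target and folding its
// bloom filter into the new segment's so PK lookups keep working.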
replica.segMu.Lock()
for _, ID := range compactedFrom {
s, ok := replica.flushedSegments[ID]
if !ok {
log.Warn("no match flushed segment to merge from", zap.Int64("segmentID", ID))
continue
}
s.compactedTo = segID
replica.compactedSegments[ID] = s
delete(replica.flushedSegments, ID)
seg.pkFilter.Merge(s.pkFilter)
}
replica.segMu.Unlock()
seg.isNew.Store(false)
seg.isFlushed.Store(true)
replica.segMu.Lock()
replica.flushedSegments[segID] = seg
replica.segMu.Unlock()
}
func (replica *SegmentReplica) addFlushedSegmentWithPKs(segID, collID, partID UniqueID, channelName string, numOfRows int64, rowIDs []int64) {
// for tests only
func (replica *SegmentReplica) addFlushedSegmentWithPKs(segID, collID, partID UniqueID, channelName string, numOfRows int64, pks []UniqueID) {
if collID != replica.collectionID {
log.Warn("Mismatch collection",
zap.Int64("input ID", collID),
@@ -702,7 +769,7 @@ func (replica *SegmentReplica) addFlushedSegmentWithPKs(segID, collID, partID Un
maxPK: math.MinInt64, // use min value to represent no value
}
seg.updatePKRange(rowIDs)
seg.updatePKRange(pks)
seg.isNew.Store(false)
seg.isFlushed.Store(true)


@@ -249,19 +249,6 @@ func TestSegmentReplica(t *testing.T) {
func TestSegmentReplica_InterfaceMethod(t *testing.T) {
rc := &RootCoordFactory{}
t.Run("Test refreshFlushedSegmentPKRange", func(t *testing.T) {
replica, err := newReplica(context.TODO(), rc, 1)
require.NoError(t, err)
require.False(t, replica.hasSegment(100, true))
replica.refreshFlushedSegmentPKRange(100, []int64{10})
replica.addFlushedSegmentWithPKs(100, 1, 10, "a", 1, []int64{9})
require.True(t, replica.hasSegment(100, true))
replica.refreshFlushedSegmentPKRange(100, []int64{10})
})
t.Run("Test addFlushedSegmentWithPKs", func(t *testing.T) {
tests := []struct {
isvalid bool
@@ -476,7 +463,7 @@ func TestSegmentReplica_InterfaceMethod(t *testing.T) {
sr.flushedSegments[test.flushedSegID] = &Segment{}
}
sr.updateSegmentEndPosition(test.inSegID, new(internalpb.MsgPosition))
sr.removeSegment(0)
sr.removeSegments(0)
})
}
@@ -580,7 +567,6 @@ func TestSegmentReplica_InterfaceMethod(t *testing.T) {
}
})
}
})
t.Run("Test listAllSegmentIDs", func(t *testing.T) {
@@ -592,7 +578,6 @@ func TestSegmentReplica_InterfaceMethod(t *testing.T) {
ids := sr.listAllSegmentIDs()
assert.ElementsMatch(t, []UniqueID{1, 2, 3}, ids)
})
t.Run("Test_addSegmentMinIOLoadError", func(t *testing.T) {
@@ -634,6 +619,28 @@ func TestSegmentReplica_InterfaceMethod(t *testing.T) {
assert.NotNil(t, err)
})
t.Run("Test_mergeFlushedSegments", func(t *testing.T) {
sr, err := newReplica(context.Background(), rc, 1)
assert.Nil(t, err)
sr.addFlushedSegmentWithPKs(1, 1, 0, "channel", 10, []UniqueID{1})
sr.addFlushedSegmentWithPKs(2, 1, 0, "channel", 10, []UniqueID{1})
require.True(t, sr.hasSegment(1, true))
require.True(t, sr.hasSegment(2, true))
sr.mergeFlushedSegments(3, 1, 0, []UniqueID{1, 2}, "channel", 15)
assert.True(t, sr.hasSegment(3, true))
assert.False(t, sr.hasSegment(1, true))
assert.False(t, sr.hasSegment(2, true))
to2from := sr.listCompactedSegmentIDs()
assert.NotEmpty(t, to2from)
from, ok := to2from[3]
assert.True(t, ok)
assert.ElementsMatch(t, []UniqueID{1, 2}, from)
})
}
func TestInnerFunctionSegment(t *testing.T) {
rc := &RootCoordFactory{}