From 863f1bb34eba6be9703ecd56088046a594ef946f Mon Sep 17 00:00:00 2001
From: godchen
Date: Tue, 9 Nov 2021 15:01:17 +0800
Subject: [PATCH] Fix multi delete data not taking effect (#11422)

Signed-off-by: godchen
---
 internal/datanode/binlog_io.go              |   9 +-
 internal/datanode/binlog_io_test.go         |   8 +-
 internal/datanode/compactor.go              |  31 +++++++++++---
 internal/datanode/compactor_test.go         | 107 +++++++++++---------
 internal/datanode/flow_graph_delete_node.go |  24 ++---
 internal/querynode/segment_loader.go        |   9 +-
 internal/storage/data_codec.go              |  48 ++++++---
 internal/storage/data_codec_test.go         |   6 +-
 8 files changed, 136 insertions(+), 106 deletions(-)
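Why the map representation had to go: DeleteData previously stored tombstones
in a map keyed by primary key, so a second delete of the same pk silently
overwrote the first, and only one tombstone per pk ever reached the delta
logs, compaction, or the query node. This patch switches DeleteData to paired
Pks/Tss slices with an explicit RowCount, and threads the new shape through
upload, compaction, delete buffering, segment loading, and the delete codec.
A minimal standalone sketch of the difference (the type and Append mirror
this patch; the package main harness is illustrative only):

	package main

	import "fmt"

	type DeleteData struct {
		Pks      []int64  // primary keys
		Tss      []uint64 // delete timestamps
		RowCount int64
	}

	// Append keeps every tombstone, including repeated deletes of one pk.
	func (d *DeleteData) Append(pk int64, ts uint64) {
		d.Pks = append(d.Pks, pk)
		d.Tss = append(d.Tss, ts)
		d.RowCount++
	}

	func main() {
		// Old shape: the second delete of pk 888 overwrites the first.
		old := map[int64]int64{}
		old[888] = 20000
		old[888] = 50000
		fmt.Println(len(old)) // 1 -- the ts=20000 tombstone is lost

		// New shape: both tombstones survive.
		d := &DeleteData{}
		d.Append(888, 20000)
		d.Append(888, 50000)
		fmt.Println(d.RowCount) // 2
	}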
diff --git a/internal/datanode/binlog_io.go b/internal/datanode/binlog_io.go
index aedabf17f6..797fed8464 100644
--- a/internal/datanode/binlog_io.go
+++ b/internal/datanode/binlog_io.go
@@ -114,6 +114,7 @@ func (b *binlogIO) upload(
 	var p = &cpaths{
 		inPaths:    make([]*datapb.FieldBinlog, 0),
 		statsPaths: make([]*datapb.FieldBinlog, 0),
+		deltaInfo:  &datapb.DeltaLogInfo{},
 	}
 
 	kvs := make(map[string]string)
@@ -135,7 +136,7 @@ func (b *binlogIO) upload(
 	}
 
 	// If there are delta logs
-	if dData != nil {
+	if dData.RowCount > 0 {
 		k, v, err := b.genDeltaBlobs(dData, meta.GetID(), partID, segID)
 		if err != nil {
 			log.Warn("generate delta blobs wrong", zap.Error(err))
@@ -143,10 +144,8 @@ func (b *binlogIO) upload(
 		}
 
 		kvs[k] = bytes.NewBuffer(v).String()
-		p.deltaInfo = &datapb.DeltaLogInfo{
-			RecordEntries: uint64(len(v)),
-			DeltaLogPath:  k,
-		}
+		p.deltaInfo.RecordEntries = uint64(len(v))
+		p.deltaInfo.DeltaLogPath = k
 	}
 
 	success := make(chan struct{})
diff --git a/internal/datanode/binlog_io_test.go b/internal/datanode/binlog_io_test.go
index d8c5c098ee..24850120ce 100644
--- a/internal/datanode/binlog_io_test.go
+++ b/internal/datanode/binlog_io_test.go
@@ -41,7 +41,8 @@ func TestBinlogIOInterfaceMethods(t *testing.T) {
 		iData := genInsertData()
 		dData := &DeleteData{
-			Data: map[int64]int64{888: 666666},
+			Pks: []int64{888},
+			Tss: []uint64{666666},
 		}
 
 		p, err := b.upload(context.TODO(), 1, 10, []*InsertData{iData}, dData, meta)
@@ -127,7 +128,7 @@ func TestBinlogIOInnerMethods(t *testing.T) {
 		tests := []struct {
 			isvalid  bool
 			deletepk int64
-			ts       int64
+			ts       uint64
 
 			description string
 		}{
@@ -139,7 +140,8 @@ func TestBinlogIOInnerMethods(t *testing.T) {
 			if test.isvalid {
 				k, v, err := b.genDeltaBlobs(&DeleteData{
-					Data: map[int64]int64{test.deletepk: test.ts},
+					Pks: []int64{test.deletepk},
+					Tss: []uint64{test.ts},
 				}, meta.GetID(), 10, 1)
 
 				assert.NoError(t, err)
diff --git a/internal/datanode/compactor.go b/internal/datanode/compactor.go
index 55e9dc9d0d..a5e0732358 100644
--- a/internal/datanode/compactor.go
+++ b/internal/datanode/compactor.go
@@ -97,9 +97,11 @@ func (t *compactionTask) mergeDeltalogs(dBlobs map[UniqueID][]*Blob, timetravelT
 	var (
 		pk2ts = make(map[UniqueID]Timestamp)
 		dbuff = &DelDataBuf{
-			delData: &DeleteData{Data: make(map[UniqueID]UniqueID)},
-			tsFrom:  math.MaxUint64,
-			tsTo:    0,
+			delData: &DeleteData{
+				Pks: make([]UniqueID, 0),
+				Tss: make([]Timestamp, 0)},
+			tsFrom: math.MaxUint64,
+			tsTo:   0,
 		}
 	)
 
@@ -110,13 +112,16 @@ func (t *compactionTask) mergeDeltalogs(dBlobs map[UniqueID][]*Blob, timetravelT
 			return nil, nil, err
 		}
 
-		for pk, ts := range dData.Data {
-			if timetravelTs != Timestamp(0) && Timestamp(ts) <= timetravelTs {
-				pk2ts[pk] = Timestamp(ts)
+		for i := int64(0); i < dData.RowCount; i++ {
+			pk := dData.Pks[i]
+			ts := dData.Tss[i]
+
+			if timetravelTs != Timestamp(0) && Timestamp(dData.Tss[i]) <= timetravelTs {
+				pk2ts[pk] = ts
 				continue
 			}
 
-			dbuff.delData.Data[pk] = ts
+			dbuff.delData.Append(pk, ts)
 
 			if Timestamp(ts) < dbuff.tsFrom {
 				dbuff.tsFrom = Timestamp(ts)
@@ -128,7 +133,7 @@ func (t *compactionTask) mergeDeltalogs(dBlobs map[UniqueID][]*Blob, timetravelT
 		}
 	}
 
-	dbuff.updateSize(int64(len(dbuff.delData.Data)))
+	dbuff.updateSize(dbuff.delData.RowCount)
 
 	return pk2ts, dbuff, nil
 }
@@ -146,10 +151,9 @@ func (t *compactionTask) merge(mergeItr iterator, delta map[UniqueID]Timestamp,
 		fID2Content = make(map[UniqueID][]interface{})
 	)
 
+	// get dim
 	for _, fs := range schema.GetFields() {
 		fID2Type[fs.GetFieldID()] = fs.GetDataType()
-
-		// get dim
 		if fs.GetDataType() == schemapb.DataType_FloatVector ||
 			fs.GetDataType() == schemapb.DataType_BinaryVector {
 			for _, t := range fs.GetTypeParams() {
@@ -165,7 +169,7 @@ func (t *compactionTask) merge(mergeItr iterator, delta map[UniqueID]Timestamp,
 	}
 
 	for mergeItr.HasNext() {
-		// There will be no error if HasNext() returns true
+		// no error if HasNext() returns true
 		vInter, _ := mergeItr.Next()
 
 		v, ok := vInter.(*storage.Value)
@@ -349,13 +353,12 @@ func (t *compactionTask) compact() error {
 
 	mergeItr := storage.NewMergeIterator(iItr)
 
-	deltaMap, deltaBuf, err := t.mergeDeltalogs(dblobs, t.plan.GetTimetravel())
+	deltaPk2Ts, deltaBuf, err := t.mergeDeltalogs(dblobs, t.plan.GetTimetravel())
 	if err != nil {
-		log.Error("compact wrong", zap.Int64("planID", t.plan.GetPlanID()), zap.Error(err))
 		return err
 	}
 
-	iDatas, numRows, err := t.merge(mergeItr, deltaMap, meta.GetSchema())
+	iDatas, numRows, err := t.merge(mergeItr, deltaPk2Ts, meta.GetSchema())
 	if err != nil {
 		log.Error("compact wrong", zap.Int64("planID", t.plan.GetPlanID()), zap.Error(err))
 		return err
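Note how mergeDeltalogs above keeps a pk-to-timestamp map only on the
timetravel side of the split, where last-write-wins is the intended
semantics, while every newer tombstone goes through Append so repeated
deletes of the same pk all survive into the compacted delta log. A rough
standalone rendering of that loop (splitDeltas is a hypothetical helper
reusing the DeleteData sketch above, without the tsFrom/tsTo bookkeeping):

	// splitDeltas mirrors the mergeDeltalogs loop: tombstones at or before
	// timetravelTs fold into pk2ts, newer ones are appended unchanged.
	func splitDeltas(d *DeleteData, timetravelTs uint64) (map[int64]uint64, *DeleteData) {
		pk2ts := make(map[int64]uint64)
		buf := &DeleteData{}
		for i := int64(0); i < d.RowCount; i++ {
			pk, ts := d.Pks[i], d.Tss[i]
			if timetravelTs != 0 && ts <= timetravelTs {
				pk2ts[pk] = ts // last write wins is fine here
				continue
			}
			buf.Append(pk, ts) // duplicates preserved
		}
		return pk2ts, buf
	}

With the six deltas used in the single-segment test below, {1:20000,
2:20001, 3:20002, 4:30000, 5:50000, 1:50000}, and a timetravel of at least
20002 but below 30000, pk2ts ends with three entries (pks 1, 2, 3) and the
buffer keeps three rows (pks 1, 4, 5), including the second delete of pk 1.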
diff --git a/internal/datanode/compactor_test.go b/internal/datanode/compactor_test.go
index 4ceffb8be4..7974bd50b0 100644
--- a/internal/datanode/compactor_test.go
+++ b/internal/datanode/compactor_test.go
@@ -112,12 +112,22 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
 		}
 
 		blobs, err := getDeltaBlobs(
-			100, map[int64]int64{
-				1: 20000,
-				2: 20001,
-				3: 20002,
-				4: 30000,
-				5: 50000,
+			100,
+			[]UniqueID{
+				1,
+				2,
+				3,
+				4,
+				5,
+				1,
+			},
+			[]Timestamp{
+				20000,
+				20001,
+				20002,
+				30000,
+				50000,
+				50000,
 			})
 
 		require.NoError(t, err)
@@ -144,7 +154,10 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
 				pk2ts, db, err := task.mergeDeltalogs(test.dBlobs, test.timetravel)
 				assert.NoError(t, err)
 				assert.Equal(t, 3, len(pk2ts))
-				assert.Equal(t, int64(2), db.size)
+				assert.Equal(t, int64(3), db.size)
+				assert.Equal(t, int64(3), db.delData.RowCount)
+				assert.ElementsMatch(t, []UniqueID{1, 4, 5}, db.delData.Pks)
+				assert.ElementsMatch(t, []Timestamp{30000, 50000, 50000}, db.delData.Tss)
 
 			} else {
@@ -160,14 +173,17 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
 	t.Run("Multiple segments with timetravel", func(t *testing.T) {
 		tests := []struct {
-			segIDA UniqueID
-			dataA  map[int64]int64
+			segIDA  UniqueID
+			dataApk []UniqueID
+			dataAts []Timestamp
 
-			segIDB UniqueID
-			dataB  map[int64]int64
+			segIDB  UniqueID
+			dataBpk []UniqueID
+			dataBts []Timestamp
 
-			segIDC UniqueID
-			dataC  map[int64]int64
+			segIDC  UniqueID
+			dataCpk []UniqueID
+			dataCts []Timestamp
 
 			timetravel    Timestamp
 			expectedpk2ts int
 			expecteddb    int
 			description   string
 		}{
 			{
-				0, nil,
-				100, map[int64]int64{
-					1: 20000,
-					2: 30000,
-					3: 20005},
-				200, map[int64]int64{
-					4: 50000,
-					5: 50001,
-					6: 50002},
+				0, nil, nil,
+				100, []UniqueID{1, 2, 3}, []Timestamp{20000, 30000, 20005},
+				200, []UniqueID{4, 5, 6}, []Timestamp{50000, 50001, 50002},
 				40000, 3, 3, "2 segments with timetravel 40000",
 			},
 			{
-				300, map[int64]int64{
-					10: 20001,
-					20: 40001,
-				},
-				100, map[int64]int64{
-					1: 20000,
-					2: 30000,
-					3: 20005},
-				200, map[int64]int64{
-					4: 50000,
-					5: 50001,
-					6: 50002},
+				300, []UniqueID{10, 20}, []Timestamp{20001, 40001},
+				100, []UniqueID{1, 2, 3}, []Timestamp{20000, 30000, 20005},
+				200, []UniqueID{4, 5, 6}, []Timestamp{50000, 50001, 50002},
 				40000, 4, 4, "3 segments with timetravel 40000",
 			},
 		}
 
		for _, test := range tests {
 			t.Run(test.description, func(t *testing.T) {
 				dBlobs := make(map[UniqueID][]*Blob)
 				if test.segIDA != UniqueID(0) {
-					d, err := getDeltaBlobs(test.segIDA, test.dataA)
+					d, err := getDeltaBlobs(test.segIDA, test.dataApk, test.dataAts)
 					require.NoError(t, err)
 					dBlobs[test.segIDA] = d
 				}
 				if test.segIDB != UniqueID(0) {
-					d, err := getDeltaBlobs(test.segIDB, test.dataB)
+					d, err := getDeltaBlobs(test.segIDB, test.dataBpk, test.dataBts)
 					require.NoError(t, err)
 					dBlobs[test.segIDB] = d
 				}
 				if test.segIDC != UniqueID(0) {
-					d, err := getDeltaBlobs(test.segIDC, test.dataC)
+					d, err := getDeltaBlobs(test.segIDC, test.dataCpk, test.dataCts)
 					require.NoError(t, err)
 					dBlobs[test.segIDC] = d
 				}
@@ -258,8 +259,12 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
 	})
 }
 
-func getDeltaBlobs(segID UniqueID, pk2ts map[int64]int64) ([]*Blob, error) {
-	deltaData := &DeleteData{Data: pk2ts}
+func getDeltaBlobs(segID UniqueID, pks []UniqueID, tss []Timestamp) ([]*Blob, error) {
+	deltaData := &DeleteData{
+		Pks:      pks,
+		Tss:      tss,
+		RowCount: int64(len(pks)),
+	}
 
 	dCodec := storage.NewDeleteCodec()
 	blob, err := dCodec.Serialize(1, 10, segID, deltaData)
@@ -326,9 +331,11 @@ func TestCompactorInterfaceMethods(t *testing.T) {
 		iData := genInsertData()
 		meta := NewMetaFactory().GetCollectionMeta(collID, "test_compact_coll_name")
-		dData := &DeleteData{Data: map[int64]int64{
-			1: 20000,
-		}}
+		dData := &DeleteData{
+			Pks:      []UniqueID{1},
+			Tss:      []Timestamp{20000},
+			RowCount: 1,
+		}
 
 		cpaths, err := mockbIO.upload(context.TODO(), segID, partID, []*InsertData{iData}, dData, meta)
 		require.NoError(t, err)
@@ -407,13 +414,17 @@ func TestCompactorInterfaceMethods(t *testing.T) {
 		meta := NewMetaFactory().GetCollectionMeta(collID, "test_compact_coll_name")
 
 		iData1 := genInsertDataWithRowIDs([2]int64{1, 2})
-		dData1 := &DeleteData{Data: map[int64]int64{
-			1: 20000,
-		}}
+		dData1 := &DeleteData{
+			Pks:      []UniqueID{1},
+			Tss:      []Timestamp{20000},
+			RowCount: 1,
+		}
 		iData2 := genInsertDataWithRowIDs([2]int64{9, 10})
-		dData2 := &DeleteData{Data: map[int64]int64{
-			9: 30000,
-		}}
+		dData2 := &DeleteData{
+			Pks:      []UniqueID{9},
+			Tss:      []Timestamp{30000},
+			RowCount: 1,
+		}
 
 		cpaths1, err := mockbIO.upload(context.TODO(), segID1, partID, []*InsertData{iData1}, dData1, meta)
 		require.NoError(t, err)
diff --git a/internal/datanode/flow_graph_delete_node.go b/internal/datanode/flow_graph_delete_node.go
index a297eb79a6..49e852476a 100644
--- a/internal/datanode/flow_graph_delete_node.go
+++ b/internal/datanode/flow_graph_delete_node.go
@@ -72,12 +72,10 @@ func (ddb *DelDataBuf) updateTimeRange(tr TimeRange) {
 
 func newDelDataBuf() *DelDataBuf {
 	return &DelDataBuf{
-		delData: &DeleteData{
-			Data: make(map[int64]int64),
-		},
-		size:   0,
-		tsFrom: math.MaxUint64,
-		tsTo:   0,
+		delData: &DeleteData{},
+		size:    0,
+		tsFrom:  math.MaxUint64,
+		tsTo:    0,
 	}
 }
 
@@ -93,7 +91,7 @@ func (dn *deleteNode) bufferDeleteMsg(msg *msgstream.DeleteMsg, tr TimeRange) er
 	log.Debug("bufferDeleteMsg", zap.Any("primary keys", msg.PrimaryKeys))
 
 	segIDToPkMap := make(map[UniqueID][]int64)
-	segIDToTsMap := make(map[UniqueID][]int64)
+	segIDToTsMap := make(map[UniqueID][]uint64)
 
 	m := dn.filterSegmentByPK(msg.PartitionID, msg.PrimaryKeys)
 	for i, pk := range msg.PrimaryKeys {
@@ -104,7 +102,7 @@ func (dn *deleteNode) bufferDeleteMsg(msg *msgstream.DeleteMsg, tr TimeRange) er
 		}
 		for _, segID := range segIDs {
 			segIDToPkMap[segID] = append(segIDToPkMap[segID], pk)
-			segIDToTsMap[segID] = append(segIDToTsMap[segID], int64(msg.Timestamps[i]))
+			segIDToTsMap[segID] = append(segIDToTsMap[segID], msg.Timestamps[i])
 		}
 	}
 
@@ -125,8 +123,9 @@ func (dn *deleteNode) bufferDeleteMsg(msg *msgstream.DeleteMsg, tr TimeRange) er
 		delData := delDataBuf.delData
 
 		for i := 0; i < rows; i++ {
-			delData.Data[pks[i]] = tss[i]
-			log.Debug("delete", zap.Int64("primary key", pks[i]), zap.Int64("ts", tss[i]))
+			delData.Pks = append(delData.Pks, pks[i])
+			delData.Tss = append(delData.Tss, tss[i])
+			log.Debug("delete", zap.Int64("primary key", pks[i]), zap.Uint64("ts", tss[i]))
 		}
 
 		// store
@@ -145,8 +144,9 @@ func (dn *deleteNode) showDelBuf() {
 		if v, ok := dn.delBuf.Load(segID); ok {
 			delDataBuf, _ := v.(*DelDataBuf)
 			log.Debug("del data buffer status", zap.Int64("segID", segID), zap.Int64("size", delDataBuf.size))
-			for pk, ts := range delDataBuf.delData.Data {
-				log.Debug("del data", zap.Int64("pk", pk), zap.Int64("ts", ts))
+			length := len(delDataBuf.delData.Pks)
+			for i := 0; i < length; i++ {
+				log.Debug("del data", zap.Int64("pk", delDataBuf.delData.Pks[i]), zap.Uint64("ts", delDataBuf.delData.Tss[i]))
 			}
 		} else {
 			log.Error("segment not exist", zap.Int64("segID", segID))
diff --git a/internal/querynode/segment_loader.go b/internal/querynode/segment_loader.go
index 0ace3b4cc6..20721a3fe0 100644
--- a/internal/querynode/segment_loader.go
+++ b/internal/querynode/segment_loader.go
@@ -335,14 +335,7 @@ func (loader *segmentLoader) loadDeltaLogs(segment *Segment, deltaLogs []*datapb
 		return err
 	}
 
-	rowCount := len(deltaData.Data)
-	pks := make([]int64, 0)
-	tss := make([]Timestamp, 0)
-	for pk, ts := range deltaData.Data {
-		pks = append(pks, pk)
-		tss = append(tss, Timestamp(ts))
-	}
-	err = segment.segmentLoadDeletedRecord(pks, tss, int64(rowCount))
+	err = segment.segmentLoadDeletedRecord(deltaData.Pks, deltaData.Tss, deltaData.RowCount)
 	if err != nil {
 		return err
 	}
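Two type fixes ride along with the slice change here: the delete node
buffers pks and timestamps per segment as parallel slices, and timestamps
stay uint64 end to end instead of round-tripping through int64, while
loadDeltaLogs above can now hand DeleteData's slices straight to
segmentLoadDeletedRecord with no copy loop. A hedged sketch of the
per-segment bucketing (groupBySegment and filterSegments are illustrative
stand-ins for bufferDeleteMsg and filterSegmentByPK):

	// groupBySegment buckets one delete message's pks and timestamps by the
	// segments each pk may belong to, keeping the two slices aligned.
	func groupBySegment(
		pks []int64,
		tss []uint64,
		filterSegments func(pk int64) []int64,
	) (map[int64][]int64, map[int64][]uint64) {
		segIDToPk := make(map[int64][]int64)
		segIDToTs := make(map[int64][]uint64)
		for i, pk := range pks {
			for _, segID := range filterSegments(pk) {
				segIDToPk[segID] = append(segIDToPk[segID], pk)
				segIDToTs[segID] = append(segIDToTs[segID], tss[i]) // no int64 cast
			}
		}
		return segIDToPk, segIDToTs
	}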
diff --git a/internal/storage/data_codec.go b/internal/storage/data_codec.go
index 3fdcdfa5b5..3aa47dca79 100644
--- a/internal/storage/data_codec.go
+++ b/internal/storage/data_codec.go
@@ -605,7 +605,16 @@ func (insertCodec *InsertCodec) Close() error {
-// DeleteData saves each entity delete message represented as map.
+// DeleteData saves each entity delete message as aligned Pks/Tss slices.
 // timestamp represents the time when this instance was deleted
 type DeleteData struct {
-	Data map[int64]int64 // primary key to timestamp
+	Pks      []int64     // primary keys
+	Tss      []Timestamp // timestamps
+	RowCount int64
+}
+
+// Append adds one pk/ts pair to DeleteData
+func (data *DeleteData) Append(pk UniqueID, ts Timestamp) {
+	data.Pks = append(data.Pks, pk)
+	data.Tss = append(data.Tss, ts)
+	data.RowCount++
 }
 
 // DeleteCodec serializes and deserializes the delete data
@@ -626,24 +635,31 @@ func (deleteCodec *DeleteCodec) Serialize(collectionID UniqueID, partitionID Uni
 	if err != nil {
 		return nil, err
 	}
+	if len(data.Pks) != len(data.Tss) {
+		return nil, fmt.Errorf("the length of pks and tss is not equal")
+	}
+	length := len(data.Pks)
 	sizeTotal := 0
-	startTs, endTs := math.MaxInt64, math.MinInt64
-	for key, value := range data.Data {
-		if value < int64(startTs) {
-			startTs = int(value)
+	var startTs, endTs Timestamp
+	startTs, endTs = math.MaxUint64, 0
+	for i := 0; i < length; i++ {
+		pk := data.Pks[i]
+		ts := data.Tss[i]
+		if ts < startTs {
+			startTs = ts
 		}
-		if value > int64(endTs) {
-			endTs = int(value)
+		if ts > endTs {
+			endTs = ts
 		}
-		err := eventWriter.AddOneStringToPayload(fmt.Sprintf("%d,%d", key, value))
+		err := eventWriter.AddOneStringToPayload(fmt.Sprintf("%d,%d", pk, ts))
 		if err != nil {
 			return nil, err
 		}
-		sizeTotal += binary.Size(key)
-		sizeTotal += binary.Size(value)
+		sizeTotal += binary.Size(pk)
+		sizeTotal += binary.Size(ts)
 	}
-	eventWriter.SetEventTimestamp(uint64(startTs), uint64(endTs))
-	binlogWriter.SetEventTimeStamp(uint64(startTs), uint64(endTs))
+	eventWriter.SetEventTimestamp(startTs, endTs)
+	binlogWriter.SetEventTimeStamp(startTs, endTs)
 
 	// https://github.com/milvus-io/milvus/issues/9620
 	// It's a little complicated to count the memory size of a map.
@@ -676,7 +692,7 @@ func (deleteCodec *DeleteCodec) Deserialize(blobs []*Blob) (partitionID UniqueID
 	}
 
 	var pid, sid UniqueID
-	result := &DeleteData{Data: make(map[int64]int64)}
+	result := &DeleteData{}
 	for _, blob := range blobs {
 		binlogReader, err := NewBinlogReader(blob.Value)
 		if err != nil {
@@ -710,17 +726,19 @@ func (deleteCodec *DeleteCodec) Deserialize(blobs []*Blob) (partitionID UniqueID
 				return InvalidUniqueID, InvalidUniqueID, nil, err
 			}
 
-			ts, err := strconv.ParseInt(splits[1], 10, 64)
+			ts, err := strconv.ParseUint(splits[1], 10, 64)
 			if err != nil {
 				return InvalidUniqueID, InvalidUniqueID, nil, err
 			}
 
-			result.Data[pk] = ts
+			result.Pks = append(result.Pks, pk)
+			result.Tss = append(result.Tss, ts)
 		}
 
 		deleteCodec.readerCloseFunc = append(deleteCodec.readerCloseFunc, readerClose(binlogReader))
 	}
+	result.RowCount = int64(len(result.Pks))
 	return pid, sid, result, nil
 }
diff --git a/internal/storage/data_codec_test.go b/internal/storage/data_codec_test.go
index 829eca7b09..957a201fe9 100644
--- a/internal/storage/data_codec_test.go
+++ b/internal/storage/data_codec_test.go
@@ -313,8 +313,12 @@ func TestInsertCodec(t *testing.T) {
 func TestDeleteCodec(t *testing.T) {
 	deleteCodec := NewDeleteCodec()
 	deleteData := &DeleteData{
-		Data: map[int64]int64{1: 43757345, 2: 23578294723},
+		Pks:      []int64{1},
+		Tss:      []uint64{43757345},
+		RowCount: int64(1),
 	}
+
+	deleteData.Append(2, 23578294723)
 	blob, err := deleteCodec.Serialize(CollectionID, 1, 1, deleteData)
 	assert.Nil(t, err)
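With the codec changes above, delete data round-trips duplicate tombstones.
A usage sketch against the API shapes visible in this patch (collID, partID,
and segID are placeholder IDs, error handling elided; the deserialized order
follows the payload write order):

	dd := &storage.DeleteData{}
	dd.Append(2, 20000)
	dd.Append(2, 50000) // same pk deleted again later, no longer lost

	codec := storage.NewDeleteCodec()
	blob, _ := codec.Serialize(collID, partID, segID, dd)

	_, _, got, _ := codec.Deserialize([]*storage.Blob{blob})
	// got.RowCount == 2, got.Pks == [2 2], got.Tss == [20000 50000]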