Fix multi delete data not taking effect (#11422)

Signed-off-by: godchen <qingxiang.chen@zilliz.com>
Authored by godchen on 2021-11-09 15:01:17 +08:00; committed by GitHub
parent c3f0c5a3ff
commit 863f1bb34e
8 changed files with 136 additions and 106 deletions
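Context for the fix: DeleteData previously stored deletes as a map[int64]int64 from primary key to timestamp, so deleting the same primary key more than once collapsed into a single entry and all but the last delete were silently dropped, which is the "multi delete data not taking effect" bug this commit addresses. The hunks below replace the map with parallel Pks/Tss slices plus an explicit RowCount. A minimal standalone sketch of the difference, using plain int64/uint64 in place of Milvus's UniqueID/Timestamp aliases (the demo is illustrative, not repo code):

package main

import "fmt"

// DeleteData mirrors the new shape introduced in this commit (see the
// storage hunks below); int64/uint64 stand in for UniqueID/Timestamp.
type DeleteData struct {
	Pks      []int64  // primary keys
	Tss      []uint64 // deletion timestamps
	RowCount int64
}

// Append adds one pk/ts pair and keeps RowCount in sync.
func (d *DeleteData) Append(pk int64, ts uint64) {
	d.Pks = append(d.Pks, pk)
	d.Tss = append(d.Tss, ts)
	d.RowCount++
}

func main() {
	// Old representation: a second delete of pk 1 overwrites the first.
	old := map[int64]uint64{}
	old[1] = 20000
	old[1] = 50000
	fmt.Println(len(old)) // 1: one delete record lost

	// New representation: both delete records survive.
	d := &DeleteData{}
	d.Append(1, 20000)
	d.Append(1, 50000)
	fmt.Println(d.RowCount) // 2
}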

View File

@@ -114,6 +114,7 @@ func (b *binlogIO) upload(
var p = &cpaths{
inPaths: make([]*datapb.FieldBinlog, 0),
statsPaths: make([]*datapb.FieldBinlog, 0),
+ deltaInfo: &datapb.DeltaLogInfo{},
}
kvs := make(map[string]string)
@@ -135,7 +136,7 @@ func (b *binlogIO) upload(
}
// If there are delta logs
- if dData != nil {
+ if dData.RowCount > 0 {
k, v, err := b.genDeltaBlobs(dData, meta.GetID(), partID, segID)
if err != nil {
log.Warn("generate delta blobs wrong", zap.Error(err))
@@ -143,10 +144,8 @@ func (b *binlogIO) upload(
}
kvs[k] = bytes.NewBuffer(v).String()
- p.deltaInfo = &datapb.DeltaLogInfo{
- 	RecordEntries: uint64(len(v)),
- 	DeltaLogPath:  k,
- }
+ p.deltaInfo.RecordEntries = uint64(len(v))
+ p.deltaInfo.DeltaLogPath = k
}
success := make(chan struct{})
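The guard in the hunk above changed from a nil check to dData.RowCount > 0, so a non-nil but empty DeleteData no longer produces an empty delta blob. A one-line sketch of the new predicate (hasDeltaRows is a hypothetical helper for illustration, not a function in the codebase):

// hasDeltaRows names the new upload guard: delta blobs are generated
// only when at least one delete row exists.
func hasDeltaRows(dData *DeleteData) bool {
	return dData != nil && dData.RowCount > 0
}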

View File

@@ -41,7 +41,8 @@ func TestBinlogIOInterfaceMethods(t *testing.T) {
iData := genInsertData()
dData := &DeleteData{
- Data: map[int64]int64{888: 666666},
+ Pks: []int64{888},
+ Tss: []uint64{666666},
}
p, err := b.upload(context.TODO(), 1, 10, []*InsertData{iData}, dData, meta)
@@ -127,7 +128,7 @@ func TestBinlogIOInnerMethods(t *testing.T) {
tests := []struct {
isvalid bool
deletepk int64
- ts int64
+ ts uint64
description string
}{
@@ -139,7 +140,8 @@ func TestBinlogIOInnerMethods(t *testing.T) {
if test.isvalid {
k, v, err := b.genDeltaBlobs(&DeleteData{
- Data: map[int64]int64{test.deletepk: test.ts},
+ Pks: []int64{test.deletepk},
+ Tss: []uint64{test.ts},
}, meta.GetID(), 10, 1)
assert.NoError(t, err)

View File

@@ -97,9 +97,11 @@ func (t *compactionTask) mergeDeltalogs(dBlobs map[UniqueID][]*Blob, timetravelT
var (
pk2ts = make(map[UniqueID]Timestamp)
dbuff = &DelDataBuf{
- delData: &DeleteData{Data: make(map[UniqueID]UniqueID)},
- tsFrom:  math.MaxUint64,
- tsTo:    0,
+ delData: &DeleteData{
+ 	Pks: make([]UniqueID, 0),
+ 	Tss: make([]Timestamp, 0)},
+ tsFrom: math.MaxUint64,
+ tsTo:   0,
}
)
@@ -110,13 +112,16 @@ func (t *compactionTask) mergeDeltalogs(dBlobs map[UniqueID][]*Blob, timetravelT
return nil, nil, err
}
- for pk, ts := range dData.Data {
- 	if timetravelTs != Timestamp(0) && Timestamp(ts) <= timetravelTs {
- 		pk2ts[pk] = Timestamp(ts)
+ for i := int64(0); i < dData.RowCount; i++ {
+ 	pk := dData.Pks[i]
+ 	ts := dData.Tss[i]
+ 	if timetravelTs != Timestamp(0) && Timestamp(dData.Tss[i]) <= timetravelTs {
+ 		pk2ts[pk] = ts
continue
}
- dbuff.delData.Data[pk] = ts
+ dbuff.delData.Append(pk, ts)
if Timestamp(ts) < dbuff.tsFrom {
dbuff.tsFrom = Timestamp(ts)
@@ -128,7 +133,7 @@ func (t *compactionTask) mergeDeltalogs(dBlobs map[UniqueID][]*Blob, timetravelT
}
}
- dbuff.updateSize(int64(len(dbuff.delData.Data)))
+ dbuff.updateSize(dbuff.delData.RowCount)
return pk2ts, dbuff, nil
}
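The rewritten loop walks Pks and Tss in lockstep instead of ranging over a map. A standalone sketch of that split, reusing the DeleteData type from the intro (timetravel semantics as in the hunk: entries at or before timetravelTs land in the pk2ts map, the rest stay buffered for the compacted segment):

// splitByTimetravel mirrors the mergeDeltalogs loop in isolation.
func splitByTimetravel(d *DeleteData, timetravelTs uint64) (map[int64]uint64, *DeleteData) {
	pk2ts := make(map[int64]uint64)
	rest := &DeleteData{}
	for i := int64(0); i < d.RowCount; i++ {
		pk, ts := d.Pks[i], d.Tss[i]
		if timetravelTs != 0 && ts <= timetravelTs {
			pk2ts[pk] = ts // already visible at the timetravel point
			continue
		}
		rest.Append(pk, ts) // duplicates of the same pk are preserved
	}
	return pk2ts, rest
}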
@@ -146,10 +151,9 @@ func (t *compactionTask) merge(mergeItr iterator, delta map[UniqueID]Timestamp,
fID2Content = make(map[UniqueID][]interface{})
)
- // get dim
for _, fs := range schema.GetFields() {
fID2Type[fs.GetFieldID()] = fs.GetDataType()
+ // get dim
if fs.GetDataType() == schemapb.DataType_FloatVector ||
fs.GetDataType() == schemapb.DataType_BinaryVector {
for _, t := range fs.GetTypeParams() {
@@ -165,7 +169,7 @@ func (t *compactionTask) merge(mergeItr iterator, delta map[UniqueID]Timestamp,
}
for mergeItr.HasNext() {
- // There will be no error if HasNext() returns true
+ // no error if HasNext() returns true
vInter, _ := mergeItr.Next()
v, ok := vInter.(*storage.Value)
@@ -349,13 +353,12 @@ func (t *compactionTask) compact() error {
mergeItr := storage.NewMergeIterator(iItr)
- deltaMap, deltaBuf, err := t.mergeDeltalogs(dblobs, t.plan.GetTimetravel())
+ deltaPk2Ts, deltaBuf, err := t.mergeDeltalogs(dblobs, t.plan.GetTimetravel())
if err != nil {
log.Error("compact wrong", zap.Int64("planID", t.plan.GetPlanID()), zap.Error(err))
return err
}
- iDatas, numRows, err := t.merge(mergeItr, deltaMap, meta.GetSchema())
+ iDatas, numRows, err := t.merge(mergeItr, deltaPk2Ts, meta.GetSchema())
if err != nil {
log.Error("compact wrong", zap.Int64("planID", t.plan.GetPlanID()), zap.Error(err))
return err

View File

@@ -112,12 +112,22 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
}
blobs, err := getDeltaBlobs(
- 100, map[int64]int64{
- 	1: 20000,
- 	2: 20001,
- 	3: 20002,
- 	4: 30000,
- 	5: 50000,
+ 100,
+ []UniqueID{
+ 	1,
+ 	2,
+ 	3,
+ 	4,
+ 	5,
+ 	1,
+ },
+ []Timestamp{
+ 	20000,
+ 	20001,
+ 	20002,
+ 	30000,
+ 	50000,
+ 	50000,
})
require.NoError(t, err)
@@ -144,7 +154,10 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
pk2ts, db, err := task.mergeDeltalogs(test.dBlobs, test.timetravel)
assert.NoError(t, err)
assert.Equal(t, 3, len(pk2ts))
- assert.Equal(t, int64(2), db.size)
+ assert.Equal(t, int64(3), db.size)
+ assert.Equal(t, int64(3), db.delData.RowCount)
+ assert.ElementsMatch(t, []UniqueID{1, 4, 5}, db.delData.Pks)
+ assert.ElementsMatch(t, []Timestamp{30000, 50000, 50000}, db.delData.Tss)
} else {
@@ -160,14 +173,17 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
t.Run("Multiple segments with timetravel", func(t *testing.T) {
tests := []struct {
- segIDA UniqueID
- dataA  map[int64]int64
+ segIDA  UniqueID
+ dataApk []UniqueID
+ dataAts []Timestamp
- segIDB UniqueID
- dataB  map[int64]int64
+ segIDB  UniqueID
+ dataBpk []UniqueID
+ dataBts []Timestamp
- segIDC UniqueID
- dataC  map[int64]int64
+ segIDC  UniqueID
+ dataCpk []UniqueID
+ dataCts []Timestamp
timetravel Timestamp
expectedpk2ts int
@@ -175,30 +191,15 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
description string
}{
{
- 0, nil,
- 100, map[int64]int64{
- 	1: 20000,
- 	2: 30000,
- 	3: 20005},
- 200, map[int64]int64{
- 	4: 50000,
- 	5: 50001,
- 	6: 50002},
+ 0, nil, nil,
+ 100, []UniqueID{1, 2, 3}, []Timestamp{20000, 30000, 20005},
+ 200, []UniqueID{4, 5, 6}, []Timestamp{50000, 50001, 50002},
40000, 3, 3, "2 segments with timetravel 40000",
},
{
- 300, map[int64]int64{
- 	10: 20001,
- 	20: 40001,
- },
- 100, map[int64]int64{
- 	1: 20000,
- 	2: 30000,
- 	3: 20005},
- 200, map[int64]int64{
- 	4: 50000,
- 	5: 50001,
- 	6: 50002},
+ 300, []UniqueID{10, 20}, []Timestamp{20001, 40001},
+ 100, []UniqueID{1, 2, 3}, []Timestamp{20000, 30000, 20005},
+ 200, []UniqueID{4, 5, 6}, []Timestamp{50000, 50001, 50002},
40000, 4, 4, "3 segments with timetravel 40000",
},
}
@@ -207,17 +208,17 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
t.Run(test.description, func(t *testing.T) {
dBlobs := make(map[UniqueID][]*Blob)
if test.segIDA != UniqueID(0) {
- d, err := getDeltaBlobs(test.segIDA, test.dataA)
+ d, err := getDeltaBlobs(test.segIDA, test.dataApk, test.dataAts)
require.NoError(t, err)
dBlobs[test.segIDA] = d
}
if test.segIDB != UniqueID(0) {
- d, err := getDeltaBlobs(test.segIDB, test.dataB)
+ d, err := getDeltaBlobs(test.segIDB, test.dataBpk, test.dataBts)
require.NoError(t, err)
dBlobs[test.segIDB] = d
}
if test.segIDC != UniqueID(0) {
- d, err := getDeltaBlobs(test.segIDC, test.dataC)
+ d, err := getDeltaBlobs(test.segIDC, test.dataCpk, test.dataCts)
require.NoError(t, err)
dBlobs[test.segIDC] = d
}
@@ -258,8 +259,12 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
})
}
- func getDeltaBlobs(segID UniqueID, pk2ts map[int64]int64) ([]*Blob, error) {
- 	deltaData := &DeleteData{Data: pk2ts}
+ func getDeltaBlobs(segID UniqueID, pks []UniqueID, tss []Timestamp) ([]*Blob, error) {
+ 	deltaData := &DeleteData{
+ 		Pks:      pks,
+ 		Tss:      tss,
+ 		RowCount: int64(len(pks)),
+ 	}
dCodec := storage.NewDeleteCodec()
blob, err := dCodec.Serialize(1, 10, segID, deltaData)
@@ -326,9 +331,11 @@ func TestCompactorInterfaceMethods(t *testing.T) {
iData := genInsertData()
meta := NewMetaFactory().GetCollectionMeta(collID, "test_compact_coll_name")
- dData := &DeleteData{Data: map[int64]int64{
- 	1: 20000,
- }}
+ dData := &DeleteData{
+ 	Pks:      []UniqueID{1},
+ 	Tss:      []Timestamp{20000},
+ 	RowCount: 1,
+ }
cpaths, err := mockbIO.upload(context.TODO(), segID, partID, []*InsertData{iData}, dData, meta)
require.NoError(t, err)
@@ -407,13 +414,17 @@ func TestCompactorInterfaceMethods(t *testing.T) {
meta := NewMetaFactory().GetCollectionMeta(collID, "test_compact_coll_name")
iData1 := genInsertDataWithRowIDs([2]int64{1, 2})
- dData1 := &DeleteData{Data: map[int64]int64{
- 	1: 20000,
- }}
+ dData1 := &DeleteData{
+ 	Pks:      []UniqueID{1},
+ 	Tss:      []Timestamp{20000},
+ 	RowCount: 1,
+ }
iData2 := genInsertDataWithRowIDs([2]int64{9, 10})
- dData2 := &DeleteData{Data: map[int64]int64{
- 	9: 30000,
- }}
+ dData2 := &DeleteData{
+ 	Pks:      []UniqueID{9},
+ 	Tss:      []Timestamp{30000},
+ 	RowCount: 1,
+ }
cpaths1, err := mockbIO.upload(context.TODO(), segID1, partID, []*InsertData{iData1}, dData1, meta)
require.NoError(t, err)

View File

@@ -72,12 +72,10 @@ func (ddb *DelDataBuf) updateTimeRange(tr TimeRange) {
func newDelDataBuf() *DelDataBuf {
return &DelDataBuf{
- delData: &DeleteData{
- 	Data: make(map[int64]int64),
- },
- size:   0,
- tsFrom: math.MaxUint64,
- tsTo:   0,
+ delData: &DeleteData{},
+ size:    0,
+ tsFrom:  math.MaxUint64,
+ tsTo:    0,
}
}
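newDelDataBuf can now start from &DeleteData{} because the zero value is immediately usable: appending to a nil slice allocates on first use, so no make() is needed. A small illustration, reusing the sketch type from the intro:

func zeroValueDemo() {
	d := &DeleteData{}                  // Pks and Tss start as nil slices
	d.Append(7, 100)                    // append on a nil slice allocates
	fmt.Println(d.RowCount, len(d.Pks)) // 1 1
}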
@@ -93,7 +91,7 @@ func (dn *deleteNode) bufferDeleteMsg(msg *msgstream.DeleteMsg, tr TimeRange) er
log.Debug("bufferDeleteMsg", zap.Any("primary keys", msg.PrimaryKeys))
segIDToPkMap := make(map[UniqueID][]int64)
- segIDToTsMap := make(map[UniqueID][]int64)
+ segIDToTsMap := make(map[UniqueID][]uint64)
m := dn.filterSegmentByPK(msg.PartitionID, msg.PrimaryKeys)
for i, pk := range msg.PrimaryKeys {
@@ -104,7 +102,7 @@ func (dn *deleteNode) bufferDeleteMsg(msg *msgstream.DeleteMsg, tr TimeRange) er
}
for _, segID := range segIDs {
segIDToPkMap[segID] = append(segIDToPkMap[segID], pk)
- segIDToTsMap[segID] = append(segIDToTsMap[segID], int64(msg.Timestamps[i]))
+ segIDToTsMap[segID] = append(segIDToTsMap[segID], msg.Timestamps[i])
}
}
@@ -125,8 +123,9 @@ func (dn *deleteNode) bufferDeleteMsg(msg *msgstream.DeleteMsg, tr TimeRange) er
delData := delDataBuf.delData
for i := 0; i < rows; i++ {
- delData.Data[pks[i]] = tss[i]
- log.Debug("delete", zap.Int64("primary key", pks[i]), zap.Int64("ts", tss[i]))
+ delData.Pks = append(delData.Pks, pks[i])
+ delData.Tss = append(delData.Tss, tss[i])
+ log.Debug("delete", zap.Int64("primary key", pks[i]), zap.Uint64("ts", tss[i]))
}
// store
@@ -145,8 +144,9 @@ func (dn *deleteNode) showDelBuf() {
if v, ok := dn.delBuf.Load(segID); ok {
delDataBuf, _ := v.(*DelDataBuf)
log.Debug("del data buffer status", zap.Int64("segID", segID), zap.Int64("size", delDataBuf.size))
- for pk, ts := range delDataBuf.delData.Data {
- 	log.Debug("del data", zap.Int64("pk", pk), zap.Int64("ts", ts))
+ length := len(delDataBuf.delData.Pks)
+ for i := 0; i < length; i++ {
+ 	log.Debug("del data", zap.Int64("pk", delDataBuf.delData.Pks[i]), zap.Uint64("ts", delDataBuf.delData.Tss[i]))
}
} else {
log.Error("segment not exist", zap.Int64("segID", segID))

View File

@@ -335,14 +335,7 @@ func (loader *segmentLoader) loadDeltaLogs(segment *Segment, deltaLogs []*datapb
return err
}
- rowCount := len(deltaData.Data)
- pks := make([]int64, 0)
- tss := make([]Timestamp, 0)
- for pk, ts := range deltaData.Data {
- 	pks = append(pks, pk)
- 	tss = append(tss, Timestamp(ts))
- }
- err = segment.segmentLoadDeletedRecord(pks, tss, int64(rowCount))
+ err = segment.segmentLoadDeletedRecord(deltaData.Pks, deltaData.Tss, deltaData.RowCount)
if err != nil {
return err
}
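Besides removing a copy loop, this change makes the load order deterministic: Go map iteration order is unspecified, so the old code handed deleted records to segmentLoadDeletedRecord in a random order, while the slices preserve append order. Illustrative snippet, again reusing the sketch type:

func orderDemo() {
	m := map[int64]uint64{1: 10, 2: 20, 3: 30}
	for pk := range m {
		fmt.Print(pk, " ") // order may differ between runs
	}
	d := &DeleteData{}
	for pk := int64(1); pk <= 3; pk++ {
		d.Append(pk, uint64(pk)*10)
	}
	fmt.Println(d.Pks) // always [1 2 3]
}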

View File

@@ -605,7 +605,16 @@ func (insertCodec *InsertCodec) Close() error {
// DeleteData saves each entity delete message represented as <primarykey,timestamp> pairs.
// timestamp represents the time when this instance was deleted
type DeleteData struct {
- Data map[int64]int64 // primary key to timestamp
+ Pks      []int64     // primary keys
+ Tss      []Timestamp // timestamps
+ RowCount int64
}
+ // Append appends one pk/ts pair to DeleteData
+ func (data *DeleteData) Append(pk UniqueID, ts Timestamp) {
+ 	data.Pks = append(data.Pks, pk)
+ 	data.Tss = append(data.Tss, ts)
+ 	data.RowCount++
+ }
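Append keeps RowCount in sync automatically; callers that build DeleteData literals by hand (as several tests in this diff do) must set RowCount themselves, or the upload guard and updateSize calls above will see zero rows. A short usage sketch with the simplified types from the intro:

func appendDemo() {
	d := &DeleteData{}
	d.Append(888, 666666)
	d.Append(888, 666667) // a repeated pk is preserved, not overwritten
	// Hand-built literals must keep RowCount consistent on their own:
	d2 := &DeleteData{Pks: []int64{1}, Tss: []uint64{20000}, RowCount: 1}
	fmt.Println(d.RowCount, d2.RowCount) // 2 1
}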
// DeleteCodec serializes and deserializes the delete data
@@ -626,24 +635,31 @@ func (deleteCodec *DeleteCodec) Serialize(collectionID UniqueID, partitionID Uni
if err != nil {
return nil, err
}
+ if len(data.Pks) != len(data.Tss) {
+ 	return nil, fmt.Errorf("the length of pks and tss is not equal")
+ }
+ length := len(data.Pks)
sizeTotal := 0
- startTs, endTs := math.MaxInt64, math.MinInt64
- for key, value := range data.Data {
- 	if value < int64(startTs) {
- 		startTs = int(value)
+ var startTs, endTs Timestamp
+ startTs, endTs = math.MaxUint64, 0
+ for i := 0; i < length; i++ {
+ 	pk := data.Pks[i]
+ 	ts := data.Tss[i]
+ 	if ts < startTs {
+ 		startTs = ts
}
- if value > int64(endTs) {
- 	endTs = int(value)
+ if ts > endTs {
+ 	endTs = ts
}
- err := eventWriter.AddOneStringToPayload(fmt.Sprintf("%d,%d", key, value))
+ err := eventWriter.AddOneStringToPayload(fmt.Sprintf("%d,%d", pk, ts))
if err != nil {
return nil, err
}
- sizeTotal += binary.Size(key)
- sizeTotal += binary.Size(value)
+ sizeTotal += binary.Size(pk)
+ sizeTotal += binary.Size(ts)
}
- eventWriter.SetEventTimestamp(uint64(startTs), uint64(endTs))
- binlogWriter.SetEventTimeStamp(uint64(startTs), uint64(endTs))
+ eventWriter.SetEventTimestamp(startTs, endTs)
+ binlogWriter.SetEventTimeStamp(startTs, endTs)
// https://github.com/milvus-io/milvus/issues/9620
// It's a little complicated to count the memory size of a map.
@@ -676,7 +692,7 @@ func (deleteCodec *DeleteCodec) Deserialize(blobs []*Blob) (partitionID UniqueID
}
var pid, sid UniqueID
- result := &DeleteData{Data: make(map[int64]int64)}
+ result := &DeleteData{}
for _, blob := range blobs {
binlogReader, err := NewBinlogReader(blob.Value)
if err != nil {
@@ -710,17 +726,19 @@ func (deleteCodec *DeleteCodec) Deserialize(blobs []*Blob) (partitionID UniqueID
return InvalidUniqueID, InvalidUniqueID, nil, err
}
- ts, err := strconv.ParseInt(splits[1], 10, 64)
+ ts, err := strconv.ParseUint(splits[1], 10, 64)
if err != nil {
return InvalidUniqueID, InvalidUniqueID, nil, err
}
- result.Data[pk] = ts
+ result.Pks = append(result.Pks, pk)
+ result.Tss = append(result.Tss, ts)
}
deleteCodec.readerCloseFunc = append(deleteCodec.readerCloseFunc, readerClose(binlogReader))
}
+ result.RowCount = int64(len(result.Pks))
return pid, sid, result, nil
}
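As the two hunks above show, each delete entry travels as the string "pk,ts" (one payload row per entry) and is parsed back with a split plus ParseInt/ParseUint; RowCount is recomputed after deserialization. A standalone sketch of just that row format (the binlog/event writer plumbing the real codec wraps around it is omitted; assumes the fmt, strconv, and strings packages):

// encodeRow/decodeRow mirror only the "%d,%d" row format used by DeleteCodec.
func encodeRow(pk int64, ts uint64) string {
	return fmt.Sprintf("%d,%d", pk, ts)
}

func decodeRow(row string) (pk int64, ts uint64, err error) {
	splits := strings.Split(row, ",")
	if len(splits) != 2 {
		return 0, 0, fmt.Errorf("malformed delete row: %q", row)
	}
	pk, err = strconv.ParseInt(splits[0], 10, 64)
	if err != nil {
		return 0, 0, err
	}
	ts, err = strconv.ParseUint(splits[1], 10, 64)
	if err != nil {
		return 0, 0, err
	}
	return pk, ts, nil
}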

View File

@@ -313,8 +313,12 @@ func TestInsertCodec(t *testing.T) {
func TestDeleteCodec(t *testing.T) {
deleteCodec := NewDeleteCodec()
deleteData := &DeleteData{
- Data: map[int64]int64{1: 43757345, 2: 23578294723},
+ Pks:      []int64{1},
+ Tss:      []uint64{43757345},
+ RowCount: int64(1),
}
+ deleteData.Append(2, 23578294723)
blob, err := deleteCodec.Serialize(CollectionID, 1, 1, deleteData)
assert.Nil(t, err)