Fix passing empty binlog content in compaction (#15909) (#15927)

Signed-off-by: Letian Jiang <letian.jiang@zilliz.com>
This commit is contained in:
Letian Jiang 2022-03-08 16:34:01 +08:00 committed by GitHub
parent 53db60bb4a
commit 74f66dce3b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 44 additions and 14 deletions

View File

@ -195,9 +195,9 @@ func (t *compactionTask) merge(mergeItr iterator, delta map[UniqueID]Timestamp,
mergeStart := time.Now()
var (
dim int // dimension of vector field
num int // numOfRows in each binlog
n int // binlog number
dim int // dimension of float/binary vector field
maxRowsPerBinlog int // maximum rows populating one binlog
numBinlogs int // binlog number
expired int64 // the number of expired entity
err error
@ -261,10 +261,13 @@ func (t *compactionTask) merge(mergeItr iterator, delta map[UniqueID]Timestamp,
// calculate numRows from rowID field, fieldID 0
numRows := int64(len(fID2Content[0]))
num = int(Params.DataNodeCfg.FlushInsertBufferSize / (int64(dim) * 4))
n = int(numRows)/num + 1
maxRowsPerBinlog = int(Params.DataNodeCfg.FlushInsertBufferSize / (int64(dim) * 4))
numBinlogs = int(numRows) / maxRowsPerBinlog
if int(numRows)%maxRowsPerBinlog != 0 {
numBinlogs++
}
for i := 0; i < n; i++ {
for i := 0; i < numBinlogs; i++ {
iDatas = append(iDatas, &InsertData{Data: make(map[storage.FieldID]storage.FieldData)})
}
@ -275,13 +278,13 @@ func (t *compactionTask) merge(mergeItr iterator, delta map[UniqueID]Timestamp,
return nil, 0, errors.New("Unexpected error")
}
for i := 0; i < n; i++ {
for i := 0; i < numBinlogs; i++ {
var c []interface{}
if i == n-1 {
c = content[i*num:]
if i == numBinlogs-1 {
c = content[i*maxRowsPerBinlog:]
} else {
c = content[i*num : i*num+num]
c = content[i*maxRowsPerBinlog : i*maxRowsPerBinlog+maxRowsPerBinlog]
}
fData, err := interface2FieldData(tp, c, int64(len(c)))

View File

@ -260,6 +260,34 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
assert.Equal(t, 1, len(idata))
assert.NotEmpty(t, idata[0].Data)
})
t.Run("Merge without expiration2", func(t *testing.T) {
Params.DataCoordCfg.CompactionEntityExpiration = math.MaxInt64
flushInsertBufferSize := Params.DataNodeCfg.FlushInsertBufferSize
defer func() {
Params.DataNodeCfg.FlushInsertBufferSize = flushInsertBufferSize
}()
Params.DataNodeCfg.FlushInsertBufferSize = 128
iData := genInsertDataWithExpiredTS()
meta := NewMetaFactory().GetCollectionMeta(1, "test")
iblobs, err := getInsertBlobs(100, iData, meta)
require.NoError(t, err)
iitr, err := storage.NewInsertBinlogIterator(iblobs, 106)
require.NoError(t, err)
mitr := storage.NewMergeIterator([]iterator{iitr})
dm := map[UniqueID]Timestamp{}
ct := &compactionTask{}
idata, numOfRow, err := ct.merge(mitr, dm, meta.GetSchema(), ct.GetCurrentTime())
assert.NoError(t, err)
assert.Equal(t, int64(2), numOfRow)
assert.Equal(t, 2, len(idata))
assert.NotEmpty(t, idata[0].Data)
})
t.Run("Merge with expiration", func(t *testing.T) {
Params.DataCoordCfg.CompactionEntityExpiration = 864000 // 10 days in seconds
iData := genInsertDataWithExpiredTS()
@ -281,8 +309,7 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
idata, numOfRow, err := ct.merge(mitr, dm, meta.GetSchema(), genTimestamp())
assert.NoError(t, err)
assert.Equal(t, int64(0), numOfRow)
assert.Equal(t, 1, len(idata))
assert.Empty(t, idata[0].Data)
assert.Equal(t, 0, len(idata))
})
})