fix: bulkinsert binlog didn't consider ts order when processing delta data (#29163)
#29162
Signed-off-by: wayblink <anyang.wang@zilliz.com>

parent 58dbb7872a
commit 2274aa3b50
@@ -305,14 +305,20 @@ func (p *BinlogAdapter) readDeltalogs(segmentHolder *SegmentFilesHolder) (map[in
 	if primaryKey.GetDataType() == schemapb.DataType_Int64 {
 		deletedIDDict := make(map[int64]uint64)
 		for _, deleteLog := range deleteLogs {
-			deletedIDDict[deleteLog.Pk.GetValue().(int64)] = deleteLog.Ts
+			_, exist := deletedIDDict[deleteLog.Pk.GetValue().(int64)]
+			if !exist || deleteLog.Ts > deletedIDDict[deleteLog.Pk.GetValue().(int64)] {
+				deletedIDDict[deleteLog.Pk.GetValue().(int64)] = deleteLog.Ts
+			}
 		}
 		log.Info("Binlog adapter: count of deleted entities", zap.Int("deletedCount", len(deletedIDDict)))
 		return deletedIDDict, nil, nil
 	} else if primaryKey.GetDataType() == schemapb.DataType_VarChar {
 		deletedIDDict := make(map[string]uint64)
 		for _, deleteLog := range deleteLogs {
-			deletedIDDict[deleteLog.Pk.GetValue().(string)] = deleteLog.Ts
+			_, exist := deletedIDDict[deleteLog.Pk.GetValue().(string)]
+			if !exist || deleteLog.Ts > deletedIDDict[deleteLog.Pk.GetValue().(string)] {
+				deletedIDDict[deleteLog.Pk.GetValue().(string)] = deleteLog.Ts
+			}
 		}
 		log.Info("Binlog adapter: count of deleted entities", zap.Int("deletedCount", len(deletedIDDict)))
 		return nil, deletedIDDict, nil
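In short, readDeltalogs now deduplicates delete logs by primary key while keeping the newest delete timestamp, instead of overwriting the entry with whichever log happens to come last. A minimal standalone sketch of that rule (the deleteLog type and names here are illustrative, not the adapter's real storage structs):

package main

import "fmt"

// deleteLog is an illustrative stand-in for a record read from a deltalog.
type deleteLog struct {
	pk int64  // primary key of the deleted entity
	ts uint64 // timestamp at which the delete happened
}

// latestDeleteTs keeps, for each deleted primary key, only the largest delete timestamp.
func latestDeleteTs(logs []deleteLog) map[int64]uint64 {
	deleted := make(map[int64]uint64)
	for _, l := range logs {
		old, exist := deleted[l.pk]
		if !exist || l.ts > old {
			deleted[l.pk] = l.ts
		}
	}
	return deleted
}

func main() {
	logs := []deleteLog{{pk: 1, ts: 300}, {pk: 1, ts: 100}, {pk: 2, ts: 200}}
	fmt.Println(latestDeleteTs(logs)) // map[1:300 2:200]
}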
@@ -530,9 +536,10 @@ func (p *BinlogAdapter) getShardingListByPrimaryInt64(primaryKeys []int64,
 			continue
 		}

-		_, deleted := intDeletedList[key]
+		deleteTs, deleted := intDeletedList[key]
 		// if the key exists in intDeletedList, that means this entity has been deleted
-		if deleted {
+		// only skip entity when delete happen after insert
+		if deleted && deleteTs > uint64(ts) {
 			shardList = append(shardList, -1) // this entity has been deleted, set shardID = -1 and skip this entity
 			actualDeleted++
 		} else {

@@ -584,9 +591,10 @@ func (p *BinlogAdapter) getShardingListByPrimaryVarchar(primaryKeys []string,
 			continue
 		}

-		_, deleted := strDeletedList[key]
+		deleteTs, deleted := strDeletedList[key]
 		// if exists in strDeletedList, that means this entity has been deleted
-		if deleted {
+		// only skip entity when delete happen after insert
+		if deleted && deleteTs > uint64(ts) {
 			shardList = append(shardList, -1) // this entity has been deleted, set shardID = -1 and skip this entity
 			actualDeleted++
 		} else {
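The sharding pass applies the same ordering rule: an entity is dropped only when its delete timestamp is newer than its insert timestamp; otherwise it is kept and routed to a shard as usual. A hedged sketch of that decision, with an illustrative hash function standing in for the adapter's shard hashing:

package main

import "fmt"

// shardOrSkip returns -1 when the entity should be skipped because a delete with a
// newer timestamp exists; otherwise it returns the shard chosen by the caller's hash.
func shardOrSkip(pk int64, insertTs int64, deletedList map[int64]uint64, hash func(int64) int32) int32 {
	if deleteTs, deleted := deletedList[pk]; deleted && deleteTs > uint64(insertTs) {
		return -1 // delete happened after insert: drop the entity
	}
	return hash(pk) // keep the entity and route it to a shard
}

func main() {
	deleted := map[int64]uint64{7: 500}
	hash := func(pk int64) int32 { return int32(pk % 2) }
	fmt.Println(shardOrSkip(7, 400, deleted, hash)) // -1: deleted at ts 500, after the insert at ts 400
	fmt.Println(shardOrSkip(7, 600, deleted, hash)) // 1: the delete at ts 500 predates the insert at ts 600, so the row is kept
}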
@@ -35,7 +35,7 @@ const (
 	baseTimestamp = 43757345
 )

-func createDeltalogBuf(t *testing.T, deleteList interface{}, varcharType bool) []byte {
+func createDeltalogBuf(t *testing.T, deleteList interface{}, varcharType bool, startTimestamp uint64) []byte {
 	deleteData := &storage.DeleteData{
 		Pks: make([]storage.PrimaryKey, 0),
 		Tss: make([]storage.Timestamp, 0),

@@ -47,7 +47,7 @@ func createDeltalogBuf(t *testing.T, deleteList interface{}, varcharType bool) [
 		assert.NotNil(t, deltaData)
 		for i, id := range deltaData {
 			deleteData.Pks = append(deleteData.Pks, storage.NewVarCharPrimaryKey(id))
-			deleteData.Tss = append(deleteData.Tss, baseTimestamp+uint64(i))
+			deleteData.Tss = append(deleteData.Tss, startTimestamp+uint64(i))
 			deleteData.RowCount++
 		}
 	} else {

@@ -55,7 +55,7 @@ func createDeltalogBuf(t *testing.T, deleteList interface{}, varcharType bool) [
 		assert.NotNil(t, deltaData)
 		for i, id := range deltaData {
 			deleteData.Pks = append(deleteData.Pks, storage.NewInt64PrimaryKey(id))
-			deleteData.Tss = append(deleteData.Tss, baseTimestamp+uint64(i))
+			deleteData.Tss = append(deleteData.Tss, startTimestamp+uint64(i))
 			deleteData.RowCount++
 		}
 	}
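With the extra startTimestamp parameter, a test can place the generated delete timestamps after (or before) the insert timestamps and so actually exercise the ordering logic. A hypothetical call inside the same test package (the test name is made up; createDeltalogBuf and baseTimestamp are the helper and constant shown above):

func Test_DeleteAfterInsert(t *testing.T) { // hypothetical test, not part of this patch
	deletedItems := []int64{41, 51, 100}
	// inserts are stamped baseTimestamp+rowIndex, so starting the deletes at
	// baseTimestamp+300 makes every delete happen after these rows were inserted
	buf := createDeltalogBuf(t, deletedItems, false, baseTimestamp+300)
	assert.NotEmpty(t, buf)
}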
@@ -171,7 +171,7 @@ func Test_BinlogAdapterReadDeltalog(t *testing.T) {
 	ctx := context.Background()

 	deleteItems := []int64{1001, 1002, 1003}
-	buf := createDeltalogBuf(t, deleteItems, false)
+	buf := createDeltalogBuf(t, deleteItems, false, baseTimestamp)
 	chunkManager := &MockChunkManager{
 		readBuf: map[string][]byte{
 			"dummy": buf,

@@ -212,7 +212,7 @@ func Test_BinlogAdapterDecodeDeleteLogs(t *testing.T) {
 	ctx := context.Background()

 	deleteItems := []int64{1001, 1002, 1003, 1004, 1005}
-	buf := createDeltalogBuf(t, deleteItems, false)
+	buf := createDeltalogBuf(t, deleteItems, false, baseTimestamp)
 	chunkManager := &MockChunkManager{
 		readBuf: map[string][]byte{
 			"dummy": buf,

@@ -244,7 +244,7 @@ func Test_BinlogAdapterDecodeDeleteLogs(t *testing.T) {

 	// wrong data type of delta log
 	chunkManager.readBuf = map[string][]byte{
-		"dummy": createDeltalogBuf(t, []string{"1001", "1002"}, true),
+		"dummy": createDeltalogBuf(t, []string{"1001", "1002"}, true, baseTimestamp),
 	}

 	adapter, err = NewBinlogAdapter(ctx, collectionInfo, 1024, 2048, chunkManager, flushFunc, 0, math.MaxUint64)

@@ -317,7 +317,7 @@ func Test_BinlogAdapterReadDeltalogs(t *testing.T) {
 	ctx := context.Background()

 	deleteItems := []int64{1001, 1002, 1003, 1004, 1005}
-	buf := createDeltalogBuf(t, deleteItems, false)
+	buf := createDeltalogBuf(t, deleteItems, false, baseTimestamp)
 	chunkManager := &MockChunkManager{
 		readBuf: map[string][]byte{
 			"dummy": buf,

@@ -374,7 +374,7 @@ func Test_BinlogAdapterReadDeltalogs(t *testing.T) {
 	collectionInfo.resetSchema(schema)

 	chunkManager.readBuf = map[string][]byte{
-		"dummy": createDeltalogBuf(t, []string{"1001", "1002"}, true),
+		"dummy": createDeltalogBuf(t, []string{"1001", "1002"}, true, baseTimestamp),
 	}

 	adapter, err = NewBinlogAdapter(ctx, collectionInfo, 1024, 2048, chunkManager, flushFunc, 0, math.MaxUint64)

@@ -462,7 +462,7 @@ func Test_BinlogAdapterReadTimestamp(t *testing.T) {

 	// succeed
 	rowCount := 10
-	fieldsData := createFieldsData(sampleSchema(), rowCount)
+	fieldsData := createFieldsData(sampleSchema(), rowCount, baseTimestamp)
 	chunkManager.readBuf["dummy"] = createBinlogBuf(t, schemapb.DataType_Int64, fieldsData[106].([]int64))
 	ts, err = adapter.readTimestamp("dummy")
 	assert.NoError(t, err)

@@ -502,7 +502,7 @@ func Test_BinlogAdapterReadPrimaryKeys(t *testing.T) {

 	// wrong primary key type
 	rowCount := 10
-	fieldsData := createFieldsData(sampleSchema(), rowCount)
+	fieldsData := createFieldsData(sampleSchema(), rowCount, baseTimestamp)
 	chunkManager.readBuf["dummy"] = createBinlogBuf(t, schemapb.DataType_Bool, fieldsData[102].([]bool))

 	adapter.collectionInfo.PrimaryKey.DataType = schemapb.DataType_Bool

@@ -545,7 +545,7 @@ func Test_BinlogAdapterShardListInt64(t *testing.T) {
 	assert.NotNil(t, adapter)
 	assert.NoError(t, err)

-	fieldsData := createFieldsData(sampleSchema(), 0)
+	fieldsData := createFieldsData(sampleSchema(), 0, baseTimestamp)
 	shardsData := createShardsData(sampleSchema(), fieldsData, shardNum, []int64{1})

 	// wrong input

@@ -587,7 +587,7 @@ func Test_BinlogAdapterShardListVarchar(t *testing.T) {
 	assert.NotNil(t, adapter)
 	assert.NoError(t, err)

-	fieldsData := createFieldsData(strKeySchema(), 0)
+	fieldsData := createFieldsData(strKeySchema(), 0, baseTimestamp)
 	shardsData := createShardsData(strKeySchema(), fieldsData, shardNum, []int64{1})
 	// wrong input
 	shardList, err := adapter.getShardingListByPrimaryVarchar([]string{"1"}, []int64{1, 2}, shardsData, map[string]uint64{})
@@ -615,6 +615,7 @@ func Test_BinlogAdapterShardListVarchar(t *testing.T) {

 func Test_BinlogAdapterReadInt64PK(t *testing.T) {
 	ctx := context.Background()
+	paramtable.Init()

 	chunkManager := &MockChunkManager{}


@@ -677,7 +678,7 @@ func Test_BinlogAdapterReadInt64PK(t *testing.T) {

 	// prepare binlog data
 	rowCount := 1000
-	fieldsData := createFieldsData(sampleSchema(), rowCount)
+	fieldsData := createFieldsData(sampleSchema(), rowCount, baseTimestamp)
 	deletedItems := []int64{41, 51, 100, 400, 600}

 	chunkManager.readBuf = map[string][]byte{

@@ -693,7 +694,7 @@ func Test_BinlogAdapterReadInt64PK(t *testing.T) {
 		"111_insertlog": createBinlogBuf(t, schemapb.DataType_FloatVector, fieldsData[111].([][]float32)),
 		"112_insertlog": createBinlogBuf(t, schemapb.DataType_JSON, fieldsData[112].([][]byte)),
 		"113_insertlog": createBinlogBuf(t, schemapb.DataType_Array, fieldsData[113].([]*schemapb.ScalarField)),
-		"deltalog": createDeltalogBuf(t, deletedItems, false),
+		"deltalog": createDeltalogBuf(t, deletedItems, false, baseTimestamp+300),
 	}

 	// failed to read primary keys

@@ -708,15 +709,18 @@ func Test_BinlogAdapterReadInt64PK(t *testing.T) {
 	// succeed flush
 	chunkManager.readBuf["1_insertlog"] = createBinlogBuf(t, schemapb.DataType_Int64, fieldsData[1].([]int64))

-	adapter.tsEndPoint = baseTimestamp + uint64(499) // 4 entities deleted, 500 entities excluded
+	// as we createDeltalogBuf with baseTimestamp+300. deletedata pk = {41, 51, 100, 400, 600} ts = {341, 351, 400, 700, 900}
+	// ts = {341, 351, 400} < 499 will be deleted
+	adapter.tsEndPoint = baseTimestamp + uint64(499) // 3 entities deleted, 500 entities excluded
 	err = adapter.Read(holder)
 	assert.NoError(t, err)
 	assert.Equal(t, shardNum, int32(flushCounter))
-	assert.Equal(t, rowCount-4-500, flushRowCount)
+	assert.Equal(t, rowCount-3-500, flushRowCount)
 }

 func Test_BinlogAdapterReadVarcharPK(t *testing.T) {
 	ctx := context.Background()
+	paramtable.Init()

 	chunkManager := &MockChunkManager{}

@@ -788,7 +792,7 @@ func Test_BinlogAdapterReadVarcharPK(t *testing.T) {
 		"104_insertlog": createBinlogBuf(t, schemapb.DataType_VarChar, varcharData),
 		"105_insertlog": createBinlogBuf(t, schemapb.DataType_Bool, boolData),
 		"106_insertlog": createBinlogBuf(t, schemapb.DataType_FloatVector, floatVecData),
-		"deltalog": createDeltalogBuf(t, deletedItems, true),
+		"deltalog": createDeltalogBuf(t, deletedItems, true, baseTimestamp+300),
 	}

 	// succeed

@@ -800,7 +804,7 @@ func Test_BinlogAdapterReadVarcharPK(t *testing.T) {
 	assert.NotNil(t, adapter)
 	assert.NoError(t, err)

-	adapter.tsEndPoint = baseTimestamp + uint64(499) // 3 entities deleted, 500 entities excluded, the "999" is excluded, so totally 502 entities skipped
+	adapter.tsEndPoint = baseTimestamp + uint64(499) // 2 entities deleted, 500 entities excluded, the "999" is excluded, so totally 502 entities skipped
 	err = adapter.Read(holder)
 	assert.NoError(t, err)
 	assert.Equal(t, shardNum, int32(flushCounter))

@@ -823,7 +827,7 @@ func Test_BinlogAdapterDispatch(t *testing.T) {

 	// prepare empty in-memory segments data
 	partitionID := int64(1)
-	fieldsData := createFieldsData(sampleSchema(), 0)
+	fieldsData := createFieldsData(sampleSchema(), 0, baseTimestamp)
 	shardsData := createShardsData(sampleSchema(), fieldsData, shardNum, []int64{partitionID})

 	shardList := []int32{0, -1, 1}

@@ -1146,7 +1150,7 @@ func Test_BinlogAdapterVerifyField(t *testing.T) {

 	shardNum := int32(2)
 	partitionID := int64(1)
-	fieldsData := createFieldsData(sampleSchema(), 0)
+	fieldsData := createFieldsData(sampleSchema(), 0, baseTimestamp)
 	shardsData := createShardsData(sampleSchema(), fieldsData, shardNum, []int64{partitionID})

 	flushFunc := func(fields BlockData, shardID int, partID int64) error {

@@ -1173,7 +1177,7 @@ func Test_BinlogAdapterReadInsertlog(t *testing.T) {

 	shardNum := int32(2)
 	partitionID := int64(1)
-	fieldsData := createFieldsData(sampleSchema(), 0)
+	fieldsData := createFieldsData(sampleSchema(), 0, baseTimestamp)
 	shardsData := createShardsData(sampleSchema(), fieldsData, shardNum, []int64{partitionID})

 	flushFunc := func(fields BlockData, shardID int, partID int64) error {

@@ -1205,7 +1209,7 @@ func Test_BinlogAdapterReadInsertlog(t *testing.T) {

 	// prepare binlog data
 	rowCount := 3
-	fieldsData = createFieldsData(sampleSchema(), rowCount)
+	fieldsData = createFieldsData(sampleSchema(), rowCount, baseTimestamp)

 	failedFunc := func(fieldID int64, fieldName string, fieldType schemapb.DataType, wrongField int64, wrongType schemapb.DataType) {
 		// row count mismatch
@@ -331,7 +331,7 @@ func Test_BinlogFileBool(t *testing.T) {
 	binlogFile.Close()

 	// wrong log type
-	chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false)
+	chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false, baseTimestamp)
 	err = binlogFile.Open("dummy")
 	assert.NoError(t, err)

@@ -388,7 +388,7 @@ func Test_BinlogFileInt8(t *testing.T) {
 	binlogFile.Close()

 	// wrong log type
-	chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false)
+	chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false, baseTimestamp)
 	err = binlogFile.Open("dummy")
 	assert.NoError(t, err)

@@ -446,7 +446,7 @@ func Test_BinlogFileInt16(t *testing.T) {
 	binlogFile.Close()

 	// wrong log type
-	chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false)
+	chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false, baseTimestamp)
 	err = binlogFile.Open("dummy")
 	assert.NoError(t, err)

@@ -503,7 +503,7 @@ func Test_BinlogFileInt32(t *testing.T) {
 	binlogFile.Close()

 	// wrong log type
-	chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false)
+	chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false, baseTimestamp)
 	err = binlogFile.Open("dummy")
 	assert.NoError(t, err)

@@ -560,7 +560,7 @@ func Test_BinlogFileInt64(t *testing.T) {
 	binlogFile.Close()

 	// wrong log type
-	chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false)
+	chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false, baseTimestamp)
 	err = binlogFile.Open("dummy")
 	assert.NoError(t, err)

@@ -617,7 +617,7 @@ func Test_BinlogFileFloat(t *testing.T) {
 	binlogFile.Close()

 	// wrong log type
-	chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false)
+	chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false, baseTimestamp)
 	err = binlogFile.Open("dummy")
 	assert.NoError(t, err)

@@ -674,7 +674,7 @@ func Test_BinlogFileDouble(t *testing.T) {
 	binlogFile.Close()

 	// wrong log type
-	chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false)
+	chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false, baseTimestamp)
 	err = binlogFile.Open("dummy")
 	assert.NoError(t, err)

@@ -778,7 +778,7 @@ func Test_BinlogFileJSON(t *testing.T) {
 	binlogFile.Close()

 	// wrong log type
-	chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false)
+	chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false, baseTimestamp)
 	err = binlogFile.Open("dummy")
 	assert.NoError(t, err)

@@ -858,7 +858,7 @@ func Test_BinlogFileArray(t *testing.T) {
 	binlogFile.Close()

 	// wrong log type
-	chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false)
+	chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false, baseTimestamp)
 	err = binlogFile.Open("dummy")
 	assert.NoError(t, err)

@@ -937,7 +937,7 @@ func Test_BinlogFileBinaryVector(t *testing.T) {
 	binlogFile.Close()

 	// wrong log type
-	chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false)
+	chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false, baseTimestamp)
 	err = binlogFile.Open("dummy")
 	assert.NoError(t, err)

@@ -1004,7 +1004,7 @@ func Test_BinlogFileFloatVector(t *testing.T) {
 	binlogFile.Close()

 	// wrong log type
-	chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false)
+	chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false, baseTimestamp)
 	err = binlogFile.Open("dummy")
 	assert.NoError(t, err)

@@ -1072,7 +1072,7 @@ func Test_BinlogFileFloat16Vector(t *testing.T) {
 	binlogFile.Close()

 	// wrong log type
-	chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false)
+	chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false, baseTimestamp)
 	err = binlogFile.Open("dummy")
 	assert.NoError(t, err)

@@ -321,7 +321,7 @@ func Test_BinlogParserParse(t *testing.T) {

 	// progress
 	rowCount := 100
-	fieldsData := createFieldsData(sampleSchema(), rowCount)
+	fieldsData := createFieldsData(sampleSchema(), rowCount, baseTimestamp)
 	chunkManager.listResult["deltaPath"] = []string{}
 	chunkManager.listResult["insertPath"] = []string{
 		"123/0/a",
@@ -240,7 +240,7 @@ func jsonNumber(value string) json.Number {
 	return json.Number(value)
 }

-func createFieldsData(collectionSchema *schemapb.CollectionSchema, rowCount int) map[storage.FieldID]interface{} {
+func createFieldsData(collectionSchema *schemapb.CollectionSchema, rowCount int, startTimestamp int64) map[storage.FieldID]interface{} {
 	fieldsData := make(map[storage.FieldID]interface{})

 	// internal fields

@@ -248,7 +248,7 @@ func createFieldsData(collectionSchema *schemapb.CollectionSchema, rowCount int)
 	timestampData := make([]int64, 0)
 	for i := 0; i < rowCount; i++ {
 		rowIDData = append(rowIDData, int64(i))
-		timestampData = append(timestampData, baseTimestamp+int64(i))
+		timestampData = append(timestampData, startTimestamp+int64(i))
 	}
 	fieldsData[0] = rowIDData
 	fieldsData[1] = timestampData
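For context, createFieldsData stamps row i with startTimestamp+i, and createDeltalogBuf stamps delete i with its own startTimestamp+i, so a test controls whether deletes land before or after the inserts purely through those two start values. A small standalone illustration of that relationship (plain slices instead of the storage types):

package main

import "fmt"

// timestamps mirrors the helpers' pattern: element i gets start+i.
func timestamps(count int, start uint64) []uint64 {
	tss := make([]uint64, 0, count)
	for i := 0; i < count; i++ {
		tss = append(tss, start+uint64(i))
	}
	return tss
}

func main() {
	const baseTimestamp = 43757345
	insertTss := timestamps(5, baseTimestamp)     // base+0 .. base+4
	deleteTss := timestamps(3, baseTimestamp+300) // base+300 .. base+302, all newer than the inserts
	fmt.Println(insertTss)
	fmt.Println(deleteTss)
}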
@@ -1083,7 +1083,7 @@ func Test_TryFlushBlocks(t *testing.T) {

 	// prepare flush data, 3 shards, each shard 10 rows
 	rowCount := 10
-	fieldsData := createFieldsData(schema, rowCount)
+	fieldsData := createFieldsData(schema, rowCount, baseTimestamp)
 	shardsData := createShardsData(schema, fieldsData, shardNum, []int64{partitionID})

 	t.Run("non-force flush", func(t *testing.T) {
@@ -991,7 +991,7 @@ func Test_ImportWrapperFlushFunc(t *testing.T) {
 		assert.NoError(t, err)
 	})

-	fieldsData := createFieldsData(schema, 5)
+	fieldsData := createFieldsData(schema, 5, baseTimestamp)
 	blockData := createBlockData(schema, fieldsData)
 	t.Run("fieldsData is not empty", func(t *testing.T) {
 		err = wrapper.flushFunc(blockData, shardID, partitionID)
@@ -207,7 +207,7 @@ func Test_JSONRowConsumerHandleIntPK(t *testing.T) {
 	consumer.rowIDAllocator = newIDAllocator(ctx, t, errors.New("error"))

 	waitFlushRowCount := 10
-	fieldsData := createFieldsData(schema, waitFlushRowCount)
+	fieldsData := createFieldsData(schema, waitFlushRowCount, baseTimestamp)
 	consumer.shardsData = createShardsData(schema, fieldsData, shardNum, []int64{partitionID})

 	// nil input will trigger force flush, flushErrFunc returns error
@@ -824,7 +824,7 @@ func Test_NumpyParserSplitFieldsData(t *testing.T) {
 	}

 	t.Run("shards number mismatch", func(t *testing.T) {
-		fieldsData := createFieldsData(sampleSchema(), 0)
+		fieldsData := createFieldsData(sampleSchema(), 0, baseTimestamp)
 		shards := createShardsData(sampleSchema(), fieldsData, 1, []int64{1})
 		segmentData := genFieldsDataFunc()
 		parser.autoIDRange, err = splitFieldsData(parser.collectionInfo, segmentData, shards, parser.rowIDAllocator)

@@ -861,7 +861,7 @@ func Test_NumpyParserSplitFieldsData(t *testing.T) {
 		}
 		parser.collectionInfo.resetSchema(schema)
 		parser.collectionInfo.ShardNum = 2
-		fieldsData := createFieldsData(schema, 0)
+		fieldsData := createFieldsData(schema, 0, baseTimestamp)
 		shards := createShardsData(schema, fieldsData, 2, []int64{1})
 		parser.autoIDRange, err = splitFieldsData(parser.collectionInfo, segmentData, shards, parser.rowIDAllocator)
 		assert.Error(t, err)

@@ -871,7 +871,7 @@ func Test_NumpyParserSplitFieldsData(t *testing.T) {
 		ctx := context.Background()
 		parser.rowIDAllocator = newIDAllocator(ctx, t, errors.New("dummy error"))
 		parser.collectionInfo.resetSchema(sampleSchema())
-		fieldsData := createFieldsData(sampleSchema(), 0)
+		fieldsData := createFieldsData(sampleSchema(), 0, baseTimestamp)
 		shards := createShardsData(sampleSchema(), fieldsData, 2, []int64{1})
 		segmentData := genFieldsDataFunc()
 		parser.autoIDRange, err = splitFieldsData(parser.collectionInfo, segmentData, shards, parser.rowIDAllocator)

@@ -885,7 +885,7 @@ func Test_NumpyParserSplitFieldsData(t *testing.T) {
 		schema.AutoID = true

 		partitionID := int64(1)
-		fieldsData := createFieldsData(sampleSchema(), 0)
+		fieldsData := createFieldsData(sampleSchema(), 0, baseTimestamp)
 		shards := createShardsData(sampleSchema(), fieldsData, 2, []int64{partitionID})
 		segmentData := genFieldsDataFunc()
 		parser.autoIDRange, err = splitFieldsData(parser.collectionInfo, segmentData, shards, parser.rowIDAllocator)

@@ -929,7 +929,7 @@ func Test_NumpyParserSplitFieldsData(t *testing.T) {
 			},
 		}
 		parser.collectionInfo.resetSchema(schema)
-		fieldsData := createFieldsData(schema, 0)
+		fieldsData := createFieldsData(schema, 0, baseTimestamp)
 		shards := createShardsData(schema, fieldsData, 2, []int64{1})
 		segmentData := make(BlockData)
 		segmentData[101] = &storage.Int64FieldData{

@@ -1198,7 +1198,7 @@ func Test_NumpyParserHashToPartition(t *testing.T) {
 	assert.NoError(t, err)
 	assert.NotNil(t, parser)

-	fieldsData := createFieldsData(schema, 5)
+	fieldsData := createFieldsData(schema, 5, baseTimestamp)
 	blockData := createBlockData(schema, fieldsData)

 	// no partition key, partition ID list greater than 1, return error