From 558feed5ed1367c0682d4e92f0b2e561694a2b5f Mon Sep 17 00:00:00 2001 From: "yihao.dai" Date: Tue, 16 Apr 2024 14:51:20 +0800 Subject: [PATCH] fix: Use pk from binlog during import (#32118) During binlog import, even if the primary key's autoID is set to true, the primary key from the binlog should be used instead of being reassigned. issue: https://github.com/milvus-io/milvus/discussions/31943, https://github.com/milvus-io/milvus/issues/28521 --------- Signed-off-by: bigsheeper --- internal/datanode/importv2/task.go | 11 +++++++++++ internal/datanode/importv2/util.go | 9 +++++++++ internal/datanode/importv2/util_test.go | 24 ++++++++++++++++++++++++ tests/integration/import/binlog_test.go | 17 ++++++++++++++--- 4 files changed, 58 insertions(+), 3 deletions(-) diff --git a/internal/datanode/importv2/task.go b/internal/datanode/importv2/task.go index f0b01ac15a..a13f421f55 100644 --- a/internal/datanode/importv2/task.go +++ b/internal/datanode/importv2/task.go @@ -27,6 +27,7 @@ import ( "github.com/milvus-io/milvus/internal/datanode/metacache" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/internalpb" + "github.com/milvus-io/milvus/internal/util/importutilv2" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -161,6 +162,11 @@ func NewPreImportTask(req *datapb.PreImportRequest) Task { } }) ctx, cancel := context.WithCancel(context.Background()) + // During binlog import, even if the primary key's autoID is set to true, + // the primary key from the binlog should be used instead of being reassigned. + if importutilv2.IsBackup(req.GetOptions()) { + UnsetAutoID(req.GetSchema()) + } return &PreImportTask{ PreImportTask: &datapb.PreImportTask{ JobID: req.GetJobID(), @@ -230,6 +236,11 @@ type ImportTask struct { func NewImportTask(req *datapb.ImportRequest) Task { ctx, cancel := context.WithCancel(context.Background()) + // During binlog import, even if the primary key's autoID is set to true, + // the primary key from the binlog should be used instead of being reassigned. + if importutilv2.IsBackup(req.GetOptions()) { + UnsetAutoID(req.GetSchema()) + } task := &ImportTask{ ImportTaskV2: &datapb.ImportTaskV2{ JobID: req.GetJobID(), diff --git a/internal/datanode/importv2/util.go b/internal/datanode/importv2/util.go index b9b99b7777..aace88a104 100644 --- a/internal/datanode/importv2/util.go +++ b/internal/datanode/importv2/util.go @@ -217,3 +217,12 @@ func LogStats(manager TaskManager) { tasks = manager.GetBy(WithType(ImportTaskType)) logFunc(tasks, ImportTaskType) } + +func UnsetAutoID(schema *schemapb.CollectionSchema) { + for _, field := range schema.GetFields() { + if field.GetIsPrimaryKey() && field.GetAutoID() { + field.AutoID = false + return + } + } +} diff --git a/internal/datanode/importv2/util_test.go b/internal/datanode/importv2/util_test.go index ab5cd9b2ca..d96926c0a6 100644 --- a/internal/datanode/importv2/util_test.go +++ b/internal/datanode/importv2/util_test.go @@ -89,3 +89,27 @@ func Test_AppendSystemFieldsData(t *testing.T) { assert.Equal(t, count, insertData.Data[common.RowIDField].RowNum()) assert.Equal(t, count, insertData.Data[common.TimeStampField].RowNum()) } + +func Test_UnsetAutoID(t *testing.T) { + pkField := &schemapb.FieldSchema{ + FieldID: 100, + Name: "pk", + DataType: schemapb.DataType_Int64, + IsPrimaryKey: true, + AutoID: true, + } + vecField := &schemapb.FieldSchema{ + FieldID: 101, + Name: "vec", + DataType: schemapb.DataType_FloatVector, + } + + schema := &schemapb.CollectionSchema{} + schema.Fields = []*schemapb.FieldSchema{pkField, vecField} + UnsetAutoID(schema) + for _, field := range schema.GetFields() { + if field.GetIsPrimaryKey() { + assert.False(t, schema.GetFields()[0].GetAutoID()) + } + } +} diff --git a/tests/integration/import/binlog_test.go b/tests/integration/import/binlog_test.go index ae8cfae10d..799c7c70e2 100644 --- a/tests/integration/import/binlog_test.go +++ b/tests/integration/import/binlog_test.go @@ -22,6 +22,7 @@ import ( "time" "github.com/golang/protobuf/proto" + "github.com/samber/lo" "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" @@ -37,7 +38,7 @@ import ( "github.com/milvus-io/milvus/tests/integration" ) -func (s *BulkInsertSuite) PrepareCollectionA() (int64, int64) { +func (s *BulkInsertSuite) PrepareCollectionA() (int64, int64, *schemapb.IDs) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() c := s.Cluster @@ -86,6 +87,7 @@ func (s *BulkInsertSuite) PrepareCollectionA() (int64, int64) { }) s.NoError(err) s.Equal(int32(0), insertResult.GetStatus().GetCode()) + insertedIDs := insertResult.GetIDs() // flush flushResp, err := c.Proxy.Flush(ctx, &milvuspb.FlushRequest{ @@ -148,7 +150,7 @@ func (s *BulkInsertSuite) PrepareCollectionA() (int64, int64) { // get collectionID and partitionID collectionID := showCollectionsResp.GetCollectionIds()[0] partitionID := showPartitionsResp.GetPartitionIDs()[0] - return collectionID, partitionID + return collectionID, partitionID, insertedIDs } func (s *BulkInsertSuite) TestBinlogImport() { @@ -157,7 +159,7 @@ func (s *BulkInsertSuite) TestBinlogImport() { endTs = "548373346338803234" ) - collectionID, partitionID := s.PrepareCollectionA() + collectionID, partitionID, insertedIDs := s.PrepareCollectionA() c := s.Cluster ctx, cancel := context.WithTimeout(c.GetContext(), 60*time.Second) @@ -252,4 +254,13 @@ func (s *BulkInsertSuite) TestBinlogImport() { err = merr.CheckRPCCall(searchResult, err) s.NoError(err) s.Equal(nq*topk, len(searchResult.GetResults().GetScores())) + // check ids from collectionA, because during binlog import, even if the primary key's autoID is set to true, + // the primary key from the binlog should be used instead of being reassigned. + insertedIDsMap := lo.SliceToMap(insertedIDs.GetIntId().GetData(), func(id int64) (int64, struct{}) { + return id, struct{}{} + }) + for _, id := range searchResult.GetResults().GetIds().GetIntId().GetData() { + _, ok := insertedIDsMap[id] + s.True(ok) + } }