From cf340f518f8c2fd73890452ad784dbff4c160886 Mon Sep 17 00:00:00 2001
From: smellthemoon <64083300+smellthemoon@users.noreply.github.com>
Date: Mon, 17 Apr 2023 13:14:29 +0800
Subject: [PATCH] Add upsert integration test (#23381)

Signed-off-by: lixinguo
Co-authored-by: lixinguo
---
 tests/integration/hello_milvus_test.go |  28 +++
 tests/integration/minicluster.go       |   6 +-
 tests/integration/upsert_test.go       | 246 +++++++++++++++++++++++++
 3 files changed, 278 insertions(+), 2 deletions(-)
 create mode 100644 tests/integration/upsert_test.go

diff --git a/tests/integration/hello_milvus_test.go b/tests/integration/hello_milvus_test.go
index a5f6a6d0ad..21c498a549 100644
--- a/tests/integration/hello_milvus_test.go
+++ b/tests/integration/hello_milvus_test.go
@@ -354,6 +354,24 @@ func newFloatVectorFieldData(fieldName string, numRows, dim int) *schemapb.Field
 	}
 }
 
+// newInt64PrimaryKey builds an Int64 scalar column named fieldName that holds
+// numRows values produced by generateInt64Array.
+func newInt64PrimaryKey(fieldName string, numRows int) *schemapb.FieldData {
+	return &schemapb.FieldData{
+		Type:      schemapb.DataType_Int64,
+		FieldName: fieldName,
+		Field: &schemapb.FieldData_Scalars{
+			Scalars: &schemapb.ScalarField{
+				Data: &schemapb.ScalarField_LongData{
+					LongData: &schemapb.LongArray{
+						Data: generateInt64Array(numRows),
+					},
+				},
+			},
+		},
+	}
+}
+
 func generateFloatVectors(numRows, dim int) []float32 {
 	total := numRows * dim
 	ret := make([]float32, 0, total)
@@ -363,6 +381,16 @@ func generateFloatVectors(numRows, dim int) []float32 {
 	return ret
 }
 
+// generateInt64Array returns numRows values obtained from rand.Int; the
+// values are not checked for uniqueness.
+func generateInt64Array(numRows int) []int64 {
+	ret := make([]int64, 0, numRows)
+	for i := 0; i < numRows; i++ {
+		ret = append(ret, int64(rand.Int()))
+	}
+	return ret
+}
+
 func generateHashKeys(numRows int) []uint32 {
 	ret := make([]uint32, 0,
numRows) for i := 0; i < numRows; i++ { diff --git a/tests/integration/minicluster.go b/tests/integration/minicluster.go index 93254f989e..8c7f9c4e64 100644 --- a/tests/integration/minicluster.go +++ b/tests/integration/minicluster.go @@ -437,14 +437,16 @@ func (cluster *MiniCluster) Stop() error { dataNode.Stop() } log.Info("mini cluster datanodes stopped") + for _, queryNode := range cluster.queryNodes { queryNode.Stop() } - log.Info("mini cluster indexnodes stopped") + log.Info("mini cluster querynodes stopped") + for _, indexNode := range cluster.indexNodes { indexNode.Stop() } - log.Info("mini cluster querynodes stopped") + log.Info("mini cluster indexnodes stopped") cluster.etcdCli.KV.Delete(cluster.ctx, Params.EtcdCfg.RootPath.GetValue(), clientv3.WithPrefix()) defer cluster.etcdCli.Close() diff --git a/tests/integration/upsert_test.go b/tests/integration/upsert_test.go new file mode 100644 index 0000000000..9519cd74fc --- /dev/null +++ b/tests/integration/upsert_test.go @@ -0,0 +1,246 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package integration
+
+import (
+	"context"
+	"fmt"
+	"strconv"
+	"testing"
+	"time"
+
+	"github.com/cockroachdb/errors"
+	"github.com/golang/protobuf/proto"
+	"github.com/milvus-io/milvus-proto/go-api/commonpb"
+	"github.com/milvus-io/milvus-proto/go-api/milvuspb"
+	"github.com/milvus-io/milvus-proto/go-api/schemapb"
+	"github.com/milvus-io/milvus/pkg/common"
+	"github.com/milvus-io/milvus/pkg/log"
+	"github.com/milvus-io/milvus/pkg/util/distance"
+	"github.com/milvus-io/milvus/pkg/util/funcutil"
+	"github.com/milvus-io/milvus/pkg/util/merr"
+	"github.com/stretchr/testify/assert"
+	"go.uber.org/zap"
+)
+
+// TestUpsert runs an end-to-end upsert scenario against a mini cluster:
+// create a collection, upsert rowNum rows, flush, build an IVF_FLAT index,
+// load the collection and run a vector search. RPC errors and returned
+// statuses are asserted, so a failing step fails the test instead of only
+// being logged.
+func TestUpsert(t *testing.T) {
+	ctx := context.Background()
+	c, err := StartMiniCluster(ctx)
+	assert.NoError(t, err)
+	err = c.Start()
+	assert.NoError(t, err)
+	defer c.Stop()
+
+	// The two polling loops below share this deadline; ctx itself stays
+	// unbounded and is what the cluster and the RPCs use.
+	waitCtx, cancel := context.WithTimeout(ctx, 3*time.Minute)
+	defer cancel()
+
+	prefix := "TestUpsert"
+	dbName := ""
+	collectionName := prefix + funcutil.GenRandomStr()
+	int64Field := "int64"
+	floatVecField := "fvec"
+	dim := 128
+	rowNum := 3000
+
+	constructCollectionSchema := func() *schemapb.CollectionSchema {
+		pk := &schemapb.FieldSchema{
+			Name:         int64Field,
+			IsPrimaryKey: true,
+			DataType:     schemapb.DataType_Int64,
+		}
+		fVec := &schemapb.FieldSchema{
+			Name:     floatVecField,
+			DataType: schemapb.DataType_FloatVector,
+			TypeParams: []*commonpb.KeyValuePair{
+				{Key: "dim", Value: strconv.Itoa(dim)},
+			},
+		}
+		return &schemapb.CollectionSchema{
+			Name:   collectionName,
+			Fields: []*schemapb.FieldSchema{pk, fVec},
+		}
+	}
+	schema := constructCollectionSchema()
+	marshaledSchema, err := proto.Marshal(schema)
+	assert.NoError(t, err)
+
+	createCollectionStatus, err := c.proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{
+		DbName:         dbName,
+		CollectionName: collectionName,
+		Schema:         marshaledSchema,
+		ShardsNum:      2,
+	})
+	assert.NoError(t, err)
+
+	err = merr.Error(createCollectionStatus)
+	if err != nil {
+		log.Warn("createCollectionStatus fail reason", zap.Error(err))
+	}
+	assert.NoError(t, err)
+
+	log.Info("CreateCollection result", zap.Any("createCollectionStatus", createCollectionStatus))
+	showCollectionsResp, err := c.proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{})
+	assert.NoError(t, err)
+	assert.True(t, merr.Ok(showCollectionsResp.GetStatus()))
+	log.Info("ShowCollections result", zap.Any("showCollectionsResp", showCollectionsResp))
+
+	pkFieldData := newInt64PrimaryKey(int64Field, rowNum)
+	fVecColumn := newFloatVectorFieldData(floatVecField, rowNum, dim)
+	hashKeys := generateHashKeys(rowNum)
+	upsertResult, err := c.proxy.Upsert(ctx, &milvuspb.UpsertRequest{
+		DbName:         dbName,
+		CollectionName: collectionName,
+		FieldsData:     []*schemapb.FieldData{pkFieldData, fVecColumn},
+		HashKeys:       hashKeys,
+		NumRows:        uint32(rowNum),
+	})
+	assert.NoError(t, err)
+	assert.True(t, merr.Ok(upsertResult.GetStatus()))
+
+	// flush
+	flushResp, err := c.proxy.Flush(ctx, &milvuspb.FlushRequest{
+		DbName:          dbName,
+		CollectionNames: []string{collectionName},
+	})
+	assert.NoError(t, err)
+	assert.True(t, merr.Ok(flushResp.GetStatus()))
+	segmentIDs, has := flushResp.GetCollSegIDs()[collectionName]
+	ids := segmentIDs.GetData()
+	assert.True(t, has)
+	assert.NotEmpty(t, ids)
+
+	segments, err := c.metaWatcher.ShowSegments()
+	assert.NoError(t, err)
+	assert.NotEmpty(t, segments)
+	for _, segment := range segments {
+		log.Info("ShowSegments result", zap.String("segment", segment.String()))
+	}
+
+	if has && len(ids) > 0 {
+		flushed := func() bool {
+			resp, err := c.proxy.GetFlushState(ctx, &milvuspb.GetFlushStateRequest{
+				SegmentIDs: ids,
+			})
+			if err != nil {
+				// a failed probe is retried until waitCtx expires
+				return false
+			}
+			return resp.GetFlushed()
+		}
+		for !flushed() {
+			select {
+			case <-waitCtx.Done():
+				t.Fatal(errors.New("wait for flush: deadline exceeded"))
+			case <-time.After(500 * time.Millisecond):
+			}
+		}
+	}
+
+	// create index
+	createIndexStatus, err := c.proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{
+		CollectionName: collectionName,
+		FieldName:      floatVecField,
+		IndexName:      "_default",
+		ExtraParams: []*commonpb.KeyValuePair{
+			{
+				Key:   "dim",
+				Value: strconv.Itoa(dim),
+			},
+			{
+				Key:   common.MetricTypeKey,
+				Value: distance.L2,
+			},
+			{
+				Key:   "index_type",
+				Value: "IVF_FLAT",
+			},
+			{
+				Key:   "nlist",
+				Value: strconv.Itoa(10),
+			},
+		},
+	})
+	assert.NoError(t, err)
+	err = merr.Error(createIndexStatus)
+	if err != nil {
+		log.Warn("createIndexStatus fail reason", zap.Error(err))
+	}
+	assert.NoError(t, err)
+
+	// load
+	loadStatus, err := c.proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
+		DbName:         dbName,
+		CollectionName: collectionName,
+	})
+	assert.NoError(t, err)
+	err = merr.Error(loadStatus)
+	if err != nil {
+		log.Warn("LoadCollection fail reason", zap.Error(err))
+	}
+	assert.NoError(t, err)
+	for {
+		loadProgress, err := c.proxy.GetLoadingProgress(ctx, &milvuspb.GetLoadingProgressRequest{
+			CollectionName: collectionName,
+		})
+		if err != nil {
+			t.Fatalf("GetLoadingProgress fail: %v", err)
+		}
+		if loadProgress.GetProgress() == 100 {
+			break
+		}
+		select {
+		case <-waitCtx.Done():
+			t.Fatal(errors.New("wait for load: deadline exceeded"))
+		case <-time.After(500 * time.Millisecond):
+		}
+	}
+
+	// search
+	expr := fmt.Sprintf("%s > 0", int64Field)
+	nq := 10
+	topk := 10
+	roundDecimal := -1
+	nprobe := 10
+
+	params := make(map[string]int)
+	params["nprobe"] = nprobe
+
+	// search with the metric type the index above was built with
+	searchReq := constructSearchRequest("", collectionName, expr,
+		floatVecField, distance.L2, params, nq, dim, topk, roundDecimal)
+
+	searchResult, err := c.proxy.Search(ctx, searchReq)
+	assert.NoError(t, err)
+	err = merr.Error(searchResult.GetStatus())
+	if err != nil {
+		log.Warn("searchResult fail reason", zap.Error(err))
+	}
+	assert.NoError(t, err)
+
+	log.Info("==================")
+	log.Info("==================")
+	log.Info("TestUpsert succeed")
+	log.Info("==================")
+	log.Info("==================")
+}