From bc131a911c364572ed4d2d4ce6a4c789ae855e84 Mon Sep 17 00:00:00 2001 From: ThreadDao Date: Fri, 22 Nov 2024 10:54:32 +0800 Subject: [PATCH] test: add cases for gosdk v2 load release (#37831) issue: #33419 Signed-off-by: ThreadDao --- tests/go_client/base/milvus_client.go | 12 + .../go_client/testcases/load_release_test.go | 341 ++++++++++++++++++ 2 files changed, 353 insertions(+) create mode 100644 tests/go_client/testcases/load_release_test.go diff --git a/tests/go_client/base/milvus_client.go b/tests/go_client/base/milvus_client.go index fe1d2898a9..49f5bf2456 100644 --- a/tests/go_client/base/milvus_client.go +++ b/tests/go_client/base/milvus_client.go @@ -163,6 +163,12 @@ func (mc *MilvusClient) LoadPartitions(ctx context.Context, option client.LoadPa return loadTask, err } +// ReleasePartitions Release Partitions from memory +func (mc *MilvusClient) ReleasePartitions(ctx context.Context, option client.ReleasePartitionsOption, callOptions ...grpc.CallOption) error { + err := mc.mClient.ReleasePartitions(ctx, option, callOptions...) + return err +} + // -- index -- // CreateIndex Create Index @@ -226,6 +232,12 @@ func (mc *MilvusClient) LoadCollection(ctx context.Context, option client.LoadCo return loadTask, err } +// ReleaseCollection Release Collection +func (mc *MilvusClient) ReleaseCollection(ctx context.Context, option client.ReleaseCollectionOption, callOptions ...grpc.CallOption) error { + err := mc.mClient.ReleaseCollection(ctx, option, callOptions...) + return err +} + // Search search from collection func (mc *MilvusClient) Search(ctx context.Context, option client.SearchOption, callOptions ...grpc.CallOption) ([]client.ResultSet, error) { resultSets, err := mc.mClient.Search(ctx, option, callOptions...) diff --git a/tests/go_client/testcases/load_release_test.go b/tests/go_client/testcases/load_release_test.go new file mode 100644 index 0000000000..cdba9cf32e --- /dev/null +++ b/tests/go_client/testcases/load_release_test.go @@ -0,0 +1,341 @@ +package testcases + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + "go.uber.org/zap" + + "github.com/milvus-io/milvus/client/v2/entity" + "github.com/milvus-io/milvus/client/v2/index" + clientv2 "github.com/milvus-io/milvus/client/v2/milvusclient" + "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/tests/go_client/common" + hp "github.com/milvus-io/milvus/tests/go_client/testcases/helper" +) + +// test load collection +func TestLoadCollection(t *testing.T) { + ctx := hp.CreateContext(t, time.Second*common.DefaultTimeout) + mc := createDefaultMilvusClient(ctx, t) + + // create -> insert -> flush -> index -> load + prepare, schema := hp.CollPrepare.CreateCollection(ctx, t, mc, hp.NewCreateCollectionParams(hp.Int64Vec), hp.TNewFieldsOption(), hp.TNewSchemaOption()) + prepare.InsertData(ctx, t, mc, hp.NewInsertParams(schema), hp.TNewDataOption()) + prepare.CreateIndex(ctx, t, mc, hp.TNewIndexParams(schema)) + + // load + loadTask, err := mc.LoadCollection(ctx, clientv2.NewLoadCollectionOption(schema.CollectionName)) + common.CheckErr(t, err, true) + err = loadTask.Await(ctx) + common.CheckErr(t, err, true) + + // describe collection + coll, err := mc.DescribeCollection(ctx, clientv2.NewDescribeCollectionOption(schema.CollectionName)) + common.CheckErr(t, err, true) + + t.Log("https://github.com/milvus-io/milvus/issues/34149") + log.Debug("collection", zap.Bool("loaded", coll.Loaded)) + + res, err := mc.Query(ctx, clientv2.NewQueryOption(schema.CollectionName).WithConsistencyLevel(entity.ClStrong).WithOutputFields(common.QueryCountFieldName)) + common.CheckErr(t, err, true) + count, _ := res.GetColumn(common.QueryCountFieldName).GetAsInt64(0) + require.EqualValues(t, common.DefaultNb, count) +} + +// test load not existed collection +func TestLoadCollectionNotExist(t *testing.T) { + ctx := hp.CreateContext(t, time.Second*common.DefaultTimeout) + // connect + mc := createDefaultMilvusClient(ctx, t) + + // Load collection + _, errLoad := mc.LoadCollection(ctx, clientv2.NewLoadCollectionOption("collName")) + common.CheckErr(t, errLoad, false, "collection not found[database=default][collection=collName]") +} + +// test load collection async +func TestLoadCollectionAsync(t *testing.T) { + ctx := hp.CreateContext(t, time.Second*common.DefaultTimeout) + mc := createDefaultMilvusClient(ctx, t) + + // create -> insert -> flush -> index -> load + prepare, schema := hp.CollPrepare.CreateCollection(ctx, t, mc, hp.NewCreateCollectionParams(hp.Int64Vec), hp.TNewFieldsOption(), hp.TNewSchemaOption()) + prepare.InsertData(ctx, t, mc, hp.NewInsertParams(schema), hp.TNewDataOption()) + prepare.FlushData(ctx, t, mc, schema.CollectionName) + prepare.CreateIndex(ctx, t, mc, hp.TNewIndexParams(schema)) + + // load + _, err := mc.LoadCollection(ctx, clientv2.NewLoadCollectionOption(schema.CollectionName)) + common.CheckErr(t, err, true) + + // query + res, err := mc.Query(ctx, clientv2.NewQueryOption(schema.CollectionName).WithOutputFields(common.QueryCountFieldName)) + common.CheckErr(t, err, true) + count, _ := res.GetColumn(common.QueryCountFieldName).GetAsInt64(0) + require.LessOrEqual(t, count, int64(common.DefaultNb*2)) +} + +// load collection without index +func TestLoadCollectionWithoutIndex(t *testing.T) { + ctx := hp.CreateContext(t, time.Second*common.DefaultTimeout) + mc := createDefaultMilvusClient(ctx, t) + + // create -> insert -> flush -> index -> load + prepare, schema := hp.CollPrepare.CreateCollection(ctx, t, mc, hp.NewCreateCollectionParams(hp.Int64Vec), hp.TNewFieldsOption(), hp.TNewSchemaOption()) + prepare.InsertData(ctx, t, mc, hp.NewInsertParams(schema), hp.TNewDataOption()) + + // load + _, errLoad := mc.LoadCollection(ctx, clientv2.NewLoadCollectionOption(schema.CollectionName)) + common.CheckErr(t, errLoad, false, "index not found") + + // load partitions without index + _, errLoadPartition := mc.LoadPartitions(ctx, clientv2.NewLoadPartitionsOption(schema.CollectionName, common.DefaultPartition)) + common.CheckErr(t, errLoadPartition, false, "index not found") +} + +// load collection with multi partitions +func TestLoadCollectionMultiPartitions(t *testing.T) { + ctx := hp.CreateContext(t, time.Second*common.DefaultTimeout) + mc := createDefaultMilvusClient(ctx, t) + + parName := common.GenRandomString("p", 4) + + // create collection and partition + prepare, schema := hp.CollPrepare.CreateCollection(ctx, t, mc, hp.NewCreateCollectionParams(hp.Int64Vec), hp.TNewFieldsOption(), hp.TNewSchemaOption()) + err := mc.CreatePartition(ctx, clientv2.NewCreatePartitionOption(schema.CollectionName, parName)) + common.CheckErr(t, err, true) + + // insert [0, nb) into default partition, [nb, nb*2) into new partition + _defVec := hp.GenColumnData(common.DefaultNb, entity.FieldTypeFloatVector, *hp.TNewDataOption()) + _defPk := hp.GenColumnData(common.DefaultNb, entity.FieldTypeInt64, *hp.TNewDataOption()) + insertRes1, err1 := mc.Insert(ctx, clientv2.NewColumnBasedInsertOption(schema.CollectionName).WithColumns(_defPk, _defVec)) + common.CheckErr(t, err1, true) + require.EqualValues(t, common.DefaultNb, insertRes1.InsertCount) + + _parPk := hp.GenColumnData(common.DefaultNb, entity.FieldTypeInt64, *hp.TNewDataOption().TWithStart(common.DefaultNb)) + _parVec := hp.GenColumnData(common.DefaultNb, entity.FieldTypeFloatVector, *hp.TNewDataOption().TWithStart(common.DefaultNb)) + insertRes2, err2 := mc.Insert(ctx, clientv2.NewColumnBasedInsertOption(schema.CollectionName).WithColumns(_parPk, _parVec).WithPartition(parName)) + common.CheckErr(t, err2, true) + require.EqualValues(t, common.DefaultNb, insertRes2.InsertCount) + + // create index + prepare.FlushData(ctx, t, mc, schema.CollectionName) + prepare.CreateIndex(ctx, t, mc, hp.TNewIndexParams(schema)) + + // load default partition + taskDef, errDef := mc.LoadPartitions(ctx, clientv2.NewLoadPartitionsOption(schema.CollectionName, common.DefaultPartition)) + common.CheckErr(t, errDef, true) + err = taskDef.Await(ctx) + common.CheckErr(t, err, true) + + // query from parName -> error + _, err = mc.Query(ctx, clientv2.NewQueryOption(schema.CollectionName).WithOutputFields(common.QueryCountFieldName).WithPartitions(parName)) + log.Debug("error", zap.Error(err)) + common.CheckErr(t, err, false, "partition not loaded") + + // query count(*) from default partition + res, err := mc.Query(ctx, clientv2.NewQueryOption(schema.CollectionName).WithOutputFields(common.QueryCountFieldName).WithPartitions(common.DefaultPartition).WithConsistencyLevel(entity.ClStrong)) + common.CheckErr(t, err, true) + count, _ := res.GetColumn(common.QueryCountFieldName).GetAsInt64(0) + require.Equal(t, int64(common.DefaultNb), count) + + // load parName partitions + taskPar, errPar := mc.LoadPartitions(ctx, clientv2.NewLoadPartitionsOption(schema.CollectionName, parName)) + common.CheckErr(t, errPar, true) + err = taskPar.Await(ctx) + common.CheckErr(t, err, true) + + // query count(*) from all partitions + res, err = mc.Query(ctx, clientv2.NewQueryOption(schema.CollectionName).WithOutputFields(common.QueryCountFieldName).WithPartitions(parName, common.DefaultPartition).WithConsistencyLevel(entity.ClStrong)) + common.CheckErr(t, err, true) + count, _ = res.GetColumn(common.QueryCountFieldName).GetAsInt64(0) + require.Equal(t, int64(common.DefaultNb*2), count) + + // release collection -> load all partitions -> query count(*) all partitions 6000 + mc.ReleaseCollection(ctx, clientv2.NewReleaseCollectionOption(schema.CollectionName)) + mc.LoadPartitions(ctx, clientv2.NewLoadPartitionsOption(schema.CollectionName, parName, common.DefaultPartition)) + res, err = mc.Query(ctx, clientv2.NewQueryOption(schema.CollectionName).WithOutputFields(common.QueryCountFieldName).WithConsistencyLevel(entity.ClStrong)) + common.CheckErr(t, err, true) + count, _ = res.GetColumn(common.QueryCountFieldName).GetAsInt64(0) + require.Equal(t, int64(common.DefaultNb*2), count) +} + +// test load repeated partition names +func TestLoadPartitionsRepeatedly(t *testing.T) { + ctx := hp.CreateContext(t, time.Second*common.DefaultTimeout) + mc := createDefaultMilvusClient(ctx, t) + + parName := common.GenRandomString("p", 4) + + // create collection and partition + prepare, schema := hp.CollPrepare.CreateCollection(ctx, t, mc, hp.NewCreateCollectionParams(hp.Int64Vec), hp.TNewFieldsOption(), hp.TNewSchemaOption()) + err := mc.CreatePartition(ctx, clientv2.NewCreatePartitionOption(schema.CollectionName, parName)) + common.CheckErr(t, err, true) + + _parPk := hp.GenColumnData(common.DefaultNb, entity.FieldTypeInt64, *hp.TNewDataOption().TWithStart(common.DefaultNb)) + _parVec := hp.GenColumnData(common.DefaultNb, entity.FieldTypeFloatVector, *hp.TNewDataOption().TWithStart(common.DefaultNb)) + insertRes2, err2 := mc.Insert(ctx, clientv2.NewColumnBasedInsertOption(schema.CollectionName).WithColumns(_parPk, _parVec).WithPartition(parName)) + common.CheckErr(t, err2, true) + require.EqualValues(t, common.DefaultNb, insertRes2.InsertCount) + + // create index + prepare.FlushData(ctx, t, mc, schema.CollectionName) + prepare.CreateIndex(ctx, t, mc, hp.TNewIndexParams(schema)) + + // load partition with repeated names + taskDef, errDef := mc.LoadPartitions(ctx, clientv2.NewLoadPartitionsOption(schema.CollectionName, parName)) + common.CheckErr(t, errDef, true) + err = taskDef.Await(ctx) + common.CheckErr(t, err, true) + + // query count(*) from default partition + res, err := mc.Query(ctx, clientv2.NewQueryOption(schema.CollectionName).WithOutputFields(common.QueryCountFieldName).WithConsistencyLevel(entity.ClStrong)) + common.CheckErr(t, err, true) + count, _ := res.GetColumn(common.QueryCountFieldName).GetAsInt64(0) + require.EqualValues(t, common.DefaultNb, count) +} + +func TestLoadMultiVectorsIndex(t *testing.T) { + ctx := hp.CreateContext(t, time.Second*common.DefaultTimeout) + mc := createDefaultMilvusClient(ctx, t) + + // create collection -> insert -> flush -> index + _, schema := hp.CollPrepare.CreateCollection(ctx, t, mc, hp.NewCreateCollectionParams(hp.Int64MultiVec), hp.TNewFieldsOption(), hp.TNewSchemaOption().TWithEnableDynamicField(true)) + hnswIdx := index.NewHNSWIndex(entity.IP, 8, 200) + + for _, fieldName := range []string{common.DefaultFloatVecFieldName, common.DefaultFloat16VecFieldName, common.DefaultBFloat16VecFieldName} { + idxTask, err := mc.CreateIndex(ctx, clientv2.NewCreateIndexOption(schema.CollectionName, fieldName, hnswIdx)) + common.CheckErr(t, err, true) + err = idxTask.Await(ctx) + common.CheckErr(t, err, true) + } + + _, err := mc.LoadCollection(ctx, clientv2.NewLoadCollectionOption(schema.CollectionName)) + common.CheckErr(t, err, false, "there is no vector index on field") +} + +func TestLoadCollectionAllFields(t *testing.T) { + ctx := hp.CreateContext(t, time.Second*common.DefaultTimeout) + mc := createDefaultMilvusClient(ctx, t) + + // create collection -> insert -> flush -> index + prepare, schema := hp.CollPrepare.CreateCollection(ctx, t, mc, hp.NewCreateCollectionParams(hp.AllFields), hp.TNewFieldsOption(), hp.TNewSchemaOption().TWithEnableDynamicField(true)) + prepare.InsertData(ctx, t, mc, hp.NewInsertParams(schema), hp.TNewDataOption()) + prepare.FlushData(ctx, t, mc, schema.CollectionName) + prepare.CreateIndex(ctx, t, mc, hp.TNewIndexParams(schema)) + + // load collection + loadTask, err := mc.LoadCollection(ctx, clientv2.NewLoadCollectionOption(schema.CollectionName)) + common.CheckErr(t, err, true) + err = loadTask.Await(ctx) + common.CheckErr(t, err, true) + + // query count(*) + res, err := mc.Query(ctx, clientv2.NewQueryOption(schema.CollectionName).WithOutputFields(common.QueryCountFieldName).WithConsistencyLevel(entity.ClStrong)) + common.CheckErr(t, err, true) + count, _ := res.GetColumn(common.QueryCountFieldName).GetAsInt64(0) + require.EqualValues(t, common.DefaultNb, count) +} + +func TestLoadCollectionSparse(t *testing.T) { + ctx := hp.CreateContext(t, time.Second*common.DefaultTimeout) + mc := createDefaultMilvusClient(ctx, t) + + // create collection -> insert -> flush -> index + prepare, schema := hp.CollPrepare.CreateCollection(ctx, t, mc, hp.NewCreateCollectionParams(hp.Int64VarcharSparseVec), hp.TNewFieldsOption(), hp.TNewSchemaOption().TWithEnableDynamicField(true)) + prepare.InsertData(ctx, t, mc, hp.NewInsertParams(schema), hp.TNewDataOption().TWithSparseMaxLen(200)) + prepare.FlushData(ctx, t, mc, schema.CollectionName) + prepare.CreateIndex(ctx, t, mc, hp.TNewIndexParams(schema)) + + // load collection + loadTask, err := mc.LoadCollection(ctx, clientv2.NewLoadCollectionOption(schema.CollectionName)) + common.CheckErr(t, err, true) + err = loadTask.Await(ctx) + common.CheckErr(t, err, true) + + // query count(*) + res, err := mc.Query(ctx, clientv2.NewQueryOption(schema.CollectionName).WithOutputFields(common.QueryCountFieldName).WithConsistencyLevel(entity.ClStrong)) + common.CheckErr(t, err, true) + count, _ := res.GetColumn(common.QueryCountFieldName).GetAsInt64(0) + require.EqualValues(t, common.DefaultNb, count) +} + +func TestReleaseCollection(t *testing.T) { + ctx := hp.CreateContext(t, time.Second*common.DefaultTimeout) + mc := createDefaultMilvusClient(ctx, t) + + // create -> insert -> flush -> index -> load + prepare, schema := hp.CollPrepare.CreateCollection(ctx, t, mc, hp.NewCreateCollectionParams(hp.Int64Vec), hp.TNewFieldsOption(), hp.TNewSchemaOption()) + prepare.InsertData(ctx, t, mc, hp.NewInsertParams(schema), hp.TNewDataOption().TWithNb(100)) + prepare.FlushData(ctx, t, mc, schema.CollectionName) + prepare.CreateIndex(ctx, t, mc, hp.TNewIndexParams(schema)) + prepare.Load(ctx, t, mc, hp.NewLoadParams(schema.CollectionName)) + + // release collection + err := mc.ReleaseCollection(ctx, clientv2.NewReleaseCollectionOption(schema.CollectionName)) + common.CheckErr(t, err, true) + + // query count(*) -> error + _, err = mc.Query(ctx, clientv2.NewQueryOption(schema.CollectionName).WithOutputFields(common.QueryCountFieldName).WithConsistencyLevel(entity.ClStrong)) + common.CheckErr(t, err, false, "collection not loaded") +} + +func TestReleaseCollectionNotExist(t *testing.T) { + ctx := hp.CreateContext(t, time.Second*common.DefaultTimeout) + // connect + mc := createDefaultMilvusClient(ctx, t) + + // Load collection + errLoad := mc.ReleaseCollection(ctx, clientv2.NewReleaseCollectionOption("collName")) + common.CheckErr(t, errLoad, false, "collection not found[database=default][collection=collName]") +} + +func TestReleasePartitions(t *testing.T) { + ctx := hp.CreateContext(t, time.Second*common.DefaultTimeout) + mc := createDefaultMilvusClient(ctx, t) + + parName := common.GenRandomString("p", 4) + nb := 100 + + // create collection and partition + prepare, schema := hp.CollPrepare.CreateCollection(ctx, t, mc, hp.NewCreateCollectionParams(hp.Int64Vec), hp.TNewFieldsOption(), hp.TNewSchemaOption()) + err := mc.CreatePartition(ctx, clientv2.NewCreatePartitionOption(schema.CollectionName, parName)) + common.CheckErr(t, err, true) + + // insert [0, nb) into default partition and new partition + _defVec := hp.GenColumnData(nb, entity.FieldTypeFloatVector, *hp.TNewDataOption()) + _defPk := hp.GenColumnData(nb, entity.FieldTypeInt64, *hp.TNewDataOption()) + insertRes1, err1 := mc.Insert(ctx, clientv2.NewColumnBasedInsertOption(schema.CollectionName).WithColumns(_defPk, _defVec)) + common.CheckErr(t, err1, true) + require.EqualValues(t, nb, insertRes1.InsertCount) + + insertRes2, err2 := mc.Insert(ctx, clientv2.NewColumnBasedInsertOption(schema.CollectionName).WithColumns(_defVec, _defPk).WithPartition(parName)) + common.CheckErr(t, err2, true) + require.EqualValues(t, nb, insertRes2.InsertCount) + + // create index + prepare.FlushData(ctx, t, mc, schema.CollectionName) + prepare.CreateIndex(ctx, t, mc, hp.TNewIndexParams(schema)) + prepare.Load(ctx, t, mc, hp.NewLoadParams(schema.CollectionName)) + + // release partition + errRelease := mc.ReleasePartitions(ctx, clientv2.NewReleasePartitionsOptions(schema.CollectionName, parName, common.DefaultPartition)) + common.CheckErr(t, errRelease, true) + + // check release success + _, errQuery := mc.Query(ctx, clientv2.NewQueryOption(schema.CollectionName).WithOutputFields(common.QueryCountFieldName).WithPartitions(parName)) + common.CheckErr(t, errQuery, false, "collection not loaded") +} + +func TestReleasePartitionsNotExist(t *testing.T) { + ctx := hp.CreateContext(t, time.Second*common.DefaultTimeout) + // connect + mc := createDefaultMilvusClient(ctx, t) + + // Load collection + _, schema := hp.CollPrepare.CreateCollection(ctx, t, mc, hp.NewCreateCollectionParams(hp.Int64Vec), hp.TNewFieldsOption(), hp.TNewSchemaOption()) + errLoad := mc.ReleasePartitions(ctx, clientv2.NewReleasePartitionsOptions(schema.CollectionName, "parName")) + common.CheckErr(t, errLoad, false, "partition not found") +}