From 1fb8b46db09abf447f961e80c556c070632e1c62 Mon Sep 17 00:00:00 2001
From: yanliang567 <82361606+yanliang567@users.noreply.github.com>
Date: Mon, 23 Sep 2024 14:09:11 +0800
Subject: [PATCH] test: Share one collection for group search tests (#36427)

related issue: #36407
1. add partial load tests
2. use new test class to share one collection for all grouping search tests

Signed-off-by: yanliang567
---
 .../testcases/test_field_partial_load.py     | 471 ++++++++++++++++++
 .../testcases/test_mix_scenes.py             | 350 +++++++++++++
 tests/python_client/testcases/test_search.py | 424 ----------------
 3 files changed, 821 insertions(+), 424 deletions(-)
 create mode 100644 tests/python_client/testcases/test_field_partial_load.py

diff --git a/tests/python_client/testcases/test_field_partial_load.py b/tests/python_client/testcases/test_field_partial_load.py
new file mode 100644
index 0000000000..49a1356607
--- /dev/null
+++ b/tests/python_client/testcases/test_field_partial_load.py
@@ -0,0 +1,471 @@
+import pytest
+from base.client_base import TestcaseBase
+from common import common_func as cf
+from common import common_type as ct
+from common.common_type import CaseLabel, CheckTasks
+from utils.util_pymilvus import *
+
+
+class TestFieldPartialLoad(TestcaseBase):
+    @pytest.mark.tags(CaseLabel.L0)
+    def test_field_partial_load_default(self):
+        """
+        target: test field partial load
+        method:
+        1. create a collection with fields
+        2. index/not index fields to be loaded; index/not index fields to be skipped
+        3. load a part of the fields
+        expected:
+        1. verify the collection loaded successfully
+        2. verify the loaded fields can be searched in expr and output_fields
+        3. verify the skipped fields not loaded, and cannot search with them in expr or output_fields
+        """
+        self._connect()
+        name = cf.gen_unique_str()
+        dim = 128
+        nb = 2000
+        pk_field = cf.gen_int64_field(name='pk', is_primary=True)
+        load_int64_field = cf.gen_int64_field(name="int64_load")
+        not_load_int64_field = cf.gen_int64_field(name="int64_not_load")
+        load_string_field = cf.gen_string_field(name="string_load")
+        not_load_string_field = cf.gen_string_field(name="string_not_load")
+        vector_field = cf.gen_float_vec_field(dim=dim)
+        schema = cf.gen_collection_schema(fields=[pk_field, load_int64_field, not_load_int64_field,
+                                                  load_string_field, not_load_string_field, vector_field],
+                                          auto_id=True)
+        collection_w = self.init_collection_wrap(name=name, schema=schema)
+        int_values = [i for i in range(nb)]
+        string_values = [str(i) for i in range(nb)]
+        float_vec_values = gen_vectors(nb, dim)
+        collection_w.insert([int_values, int_values, string_values, string_values, float_vec_values])
+
+        # build index
+        collection_w.create_index(field_name=vector_field.name, index_params=ct.default_index)
+        collection_w.load(load_fields=[pk_field.name, vector_field.name, load_string_field.name, load_int64_field.name],
+                          replica_number=1)
+        # search
+        search_params = ct.default_search_params
+        nq = 2
+        search_vectors = float_vec_values[0:nq]
+        res, _ = collection_w.search(data=search_vectors, anns_field=vector_field.name, param=search_params,
+                                     limit=100, output_fields=["*"])
+        assert pk_field.name in res[0][0].fields.keys() \
+               and vector_field.name in res[0][0].fields.keys() \
+               and load_string_field.name in res[0][0].fields.keys() \
+               and load_int64_field.name in res[0][0].fields.keys() \
+               and not_load_string_field.name not in res[0][0].fields.keys() \
+               and not_load_int64_field.name not in res[0][0].fields.keys()
+
+        # release and reload with some other fields
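+        # note: per the invalid cases in TestFieldPartialLoadInvalid below, the load
+        # field list of an already-loaded collection cannot be changed in place, so
+        # the test releases first and then reloads with a different subset of fields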
+ collection_w.release() + collection_w.load(load_fields=[pk_field.name, vector_field.name, + not_load_string_field.name, not_load_int64_field.name]) + res, _ = collection_w.search(data=search_vectors, anns_field=vector_field.name, param=search_params, + limit=100, output_fields=["*"]) + assert pk_field.name in res[0][0].fields.keys() \ + and vector_field.name in res[0][0].fields.keys() \ + and load_string_field.name not in res[0][0].fields.keys() \ + and load_int64_field.name not in res[0][0].fields.keys() \ + and not_load_string_field.name in res[0][0].fields.keys() \ + and not_load_int64_field.name in res[0][0].fields.keys() + + @pytest.mark.tags(CaseLabel.L1) + @pytest.mark.xfail(reason="issue #36353") + def test_skip_load_dynamic_field(self): + """ + target: test skip load dynamic field + method: + 1. create a collection with dynamic field + 2. load + 3. search on dynamic field in expr and/or output_fields + expected: search successfully + 4. release and reload with skip load dynamic field=true + 5. search on dynamic field in expr and/or output_fields + expected: raise exception + """ + self._connect() + name = cf.gen_unique_str() + dim = 128 + nb = 2000 + pk_field = cf.gen_int64_field(name='pk', is_primary=True) + load_int64_field = cf.gen_int64_field(name="int64_load") + load_string_field = cf.gen_string_field(name="string_load") + vector_field = cf.gen_float_vec_field(dim=dim) + schema = cf.gen_collection_schema(fields=[pk_field, load_int64_field, load_string_field, vector_field], + auto_id=True, enable_dynamic_field=True) + collection_w = self.init_collection_wrap(name=name, schema=schema) + data = [] + for i in range(nb): + data.append({ + f"{load_int64_field.name}": i, + f"{load_string_field.name}": str(i), + f"{vector_field.name}": [random.uniform(-1, 1) for _ in range(dim)], + "color": i, + "tag": i, + }) + collection_w.insert(data) + + # build index + collection_w.create_index(field_name=vector_field.name, index_params=ct.default_index) + collection_w.load() + # search + search_params = ct.default_search_params + nq = 2 + search_vectors = cf.gen_vectors(nq, dim) + res, _ = collection_w.search(data=search_vectors, anns_field=vector_field.name, param=search_params, + expr="color > 0", + limit=100, output_fields=["*"], + check_task=CheckTasks.check_search_results, + check_items={"nq": nq, "limit": 100}) + + collection_w.release() + collection_w.load(load_fields=[pk_field.name, vector_field.name, load_string_field.name], + skip_load_dynamic_field=True) + error = {ct.err_code: 999, ct.err_msg: f"field color cannot be returned since dynamic field not loaded"} + collection_w.search(data=search_vectors, anns_field=vector_field.name, param=search_params, + limit=100, output_fields=["color"], + check_task=CheckTasks.err_res, check_items=error) + error = {ct.err_code: 999, ct.err_msg: f"field color is dynamic but dynamic field is not loaded"} + collection_w.search(data=search_vectors, anns_field=vector_field.name, param=search_params, + expr="color > 0", limit=100, output_fields=["*"], + check_task=CheckTasks.err_res, check_items=error) + + @pytest.mark.tags(CaseLabel.L1) + def test_skip_load_some_vector_fields(self): + """ + target: test skip load some vector fields + method: + 1. create a collection with multiple vector fields + 2. not create index for skip load vector fields + 2. load some vector fields + 3. 
search on vector fields in expr and/or output_fields + expected: search successfully + """ + self._connect() + name = cf.gen_unique_str() + dim = 128 + nb = 2000 + pk_field = cf.gen_int64_field(name='pk', is_primary=True) + load_string_field = cf.gen_string_field(name="string_load") + vector_field = cf.gen_float_vec_field(name="vec_float32", dim=dim) + sparse_vector_field = cf.gen_float_vec_field(name="sparse", vector_data_type="SPARSE_FLOAT_VECTOR") + schema = cf.gen_collection_schema(fields=[pk_field, load_string_field, vector_field, sparse_vector_field], + auto_id=True) + collection_w = self.init_collection_wrap(name=name, schema=schema) + string_values = [str(i) for i in range(nb)] + float_vec_values = cf.gen_vectors(nb, dim) + sparse_vec_values = cf.gen_vectors(nb, dim, vector_data_type="SPARSE_FLOAT_VECTOR") + collection_w.insert([string_values, float_vec_values, sparse_vec_values]) + + # build index on one of vector fields + collection_w.create_index(field_name=vector_field.name, index_params=ct.default_index) + # not load sparse vector field + collection_w.load(load_fields=[pk_field.name, vector_field.name, load_string_field.name]) + # search + search_params = ct.default_search_params + nq = 2 + search_vectors = float_vec_values[0:nq] + res, _ = collection_w.search(data=search_vectors, anns_field=vector_field.name, param=search_params, + limit=100, output_fields=["*"], + check_task=CheckTasks.check_search_results, + check_items={"nq": nq, "limit": 100}) + + @pytest.mark.tags(CaseLabel.L1) + @pytest.mark.xfail(reason="fail to load 2 partition with same load_fields #36037 ") + def test_partial_load_with_partition(self): + """ + target: test partial load with partitions + method: + 1. create a collection with fields + 2. create 2 partitions: p1, p2 + 3. partial load p1 + 4. search on p1 + 5. load p2 with different fields + expected: p2 load fail + 6. load p2 with the same partial fields + 7. search on p2 + expected: search successfully + 8. 
load the collection with all fields + expected: load fail + """ + self._connect() + name = cf.gen_unique_str() + dim = 128 + nb = 2000 + pk_field = cf.gen_int64_field(name='pk', is_primary=True) + load_int64_field = cf.gen_int64_field(name="int64_load") + not_load_int64_field = cf.gen_int64_field(name="int64_not_load") + load_string_field = cf.gen_string_field(name="string_load") + not_load_string_field = cf.gen_string_field(name="string_not_load") + vector_field = cf.gen_float_vec_field(dim=dim) + schema = cf.gen_collection_schema(fields=[pk_field, load_int64_field, not_load_int64_field, + load_string_field, not_load_string_field, vector_field], + auto_id=True) + collection_w = self.init_collection_wrap(name=name, schema=schema) + p1 = self.init_partition_wrap(collection_w, name='p1') + p2 = self.init_partition_wrap(collection_w, name='p2') + int_values = [i for i in range(nb)] + string_values = [str(i) for i in range(nb)] + float_vec_values = gen_vectors(nb, dim) + p1.insert([int_values, int_values, string_values, string_values, float_vec_values]) + p2.insert([int_values, int_values, string_values, string_values, float_vec_values]) + + # build index + collection_w.create_index(field_name=vector_field.name, index_params=ct.default_index) + # p1 load with partial fields + p1.load(load_fields=[pk_field.name, vector_field.name, load_string_field.name, load_int64_field.name]) + # search + search_params = ct.default_search_params + nq = 2 + search_vectors = float_vec_values[0:nq] + res, _ = p1.search(data=search_vectors, anns_field=vector_field.name, params=search_params, + limit=100, output_fields=["*"]) + assert pk_field.name in res[0][0].fields.keys() \ + and vector_field.name in res[0][0].fields.keys() + # load p2 with different fields + error = {ct.err_code: 999, ct.err_msg: f"can't change the load field list for loaded collection"} + p2.load(load_fields=[pk_field.name, vector_field.name, not_load_string_field.name, not_load_int64_field.name], + check_task=CheckTasks.err_res, check_items=error) + # load p2 with the same partial fields + p2.load(load_fields=[pk_field.name, vector_field.name, load_string_field.name, load_int64_field.name]) + res, _ = p2.search(data=search_vectors, anns_field=vector_field.name, params=search_params, + limit=100, output_fields=["*"]) + assert pk_field.name in res[0][0].fields.keys() \ + and vector_field.name in res[0][0].fields.keys() + + # load the collection with all fields + collection_w.load(check_task=CheckTasks.err_res, check_items=error) + collection_w.search(data=search_vectors, anns_field=vector_field.name, params=search_params, + limit=100, output_fields=["*"], + check_task=CheckTasks.err_res, check_items=error) + + @pytest.mark.tags(CaseLabel.L2) + def test_skip_load_on_all_scalar_field_types(self): + """ + target: test skip load on all scalar field types + method: + 1. create a collection with fields define skip load on all scalar field types + expected: + 1. 
load and search successfully + """ + prefix = "partial_load" + collection_w = self.init_collection_general(prefix, insert_data=True, is_index=True, + is_all_data_type=True, with_json=True)[0] + collection_w.release() + # load with only pk field and vector field + collection_w.load(load_fields=[ct.default_int64_field_name, ct.default_float_vec_field_name]) + search_vectors = cf.gen_vectors(1, ct.default_dim) + search_params = {"params": {}} + res = collection_w.search(data=search_vectors, anns_field=ct.default_float_vec_field_name, + param=search_params, limit=10, output_fields=["*"], + check_tasks=CheckTasks.check_search_results, check_items={"nq": 1, "limit": 10})[0] + assert len(res[0][0].fields.keys()) == 2 + + +class TestFieldPartialLoadInvalid(TestcaseBase): + @pytest.mark.tags(CaseLabel.L1) + def test_skip_load_on_pk_field_or_vector_field(self): + """ + target: test skip load on pk field + method: + 1. create a collection with fields define skip load on pk field + expected: + 1. raise exception + """ + self._connect() + name = cf.gen_unique_str() + dim = 32 + pk_field = cf.gen_int64_field(name='pk', is_primary=True) + load_int64_field = cf.gen_int64_field(name="int64_load") + vector_field = cf.gen_float_vec_field(dim=dim) + schema = cf.gen_collection_schema(fields=[pk_field, load_int64_field, vector_field], auto_id=True) + collection_w = self.init_collection_wrap(name=name, schema=schema) + collection_w.create_index(field_name=vector_field.name, index_params=ct.default_index) + # load without pk field + error = {ct.err_code: 999, ct.err_msg: f"does not contain primary key field {pk_field.name}"} + collection_w.load(load_fields=[vector_field.name, load_int64_field.name], + check_task=CheckTasks.err_res, check_items=error) + error = {ct.err_code: 999, ct.err_msg: f"does not contain vector field"} + collection_w.load(load_fields=[pk_field.name, load_int64_field.name], + check_task=CheckTasks.err_res, check_items=error) + + @pytest.mark.tags(CaseLabel.L1) + def test_skip_load_on_partition_key_field(self): + """ + target: test skip load on partition key field + method: + 1. create a collection with fields define skip load on partition key field + expected: + 1. raise exception + """ + self._connect() + name = cf.gen_unique_str() + dim = 32 + pk_field = cf.gen_int64_field(name='pk', is_primary=True) + partition_key_field = cf.gen_int64_field(name="int64_load", is_partition_key=True) + vector_field = cf.gen_float_vec_field(dim=dim) + schema = cf.gen_collection_schema(fields=[pk_field, partition_key_field, vector_field], auto_id=True) + collection_w = self.init_collection_wrap(name=name, schema=schema) + collection_w.create_index(field_name=vector_field.name, index_params=ct.default_index) + # load without pk field + error = {ct.err_code: 999, ct.err_msg: f"does not contain partition key field {partition_key_field.name}"} + collection_w.load(load_fields=[vector_field.name, pk_field.name], + check_task=CheckTasks.err_res, check_items=error) + + @pytest.mark.tags(CaseLabel.L1) + def test_skip_load_on_clustering_key_field(self): + """ + target: test skip load on clustering key field + method: + 1. create a collection with fields define skip load on clustering key field + expected: + 1. 
raise exception + """ + self._connect() + name = cf.gen_unique_str() + dim = 32 + pk_field = cf.gen_int64_field(name='pk', is_primary=True) + clustering_key_field = cf.gen_int64_field(name="int64_load", is_clustering_key=True) + vector_field = cf.gen_float_vec_field(dim=dim) + schema = cf.gen_collection_schema(fields=[pk_field, clustering_key_field, vector_field], auto_id=True) + collection_w = self.init_collection_wrap(name=name, schema=schema) + collection_w.create_index(field_name=vector_field.name, index_params=ct.default_index) + # load without pk field + error = {ct.err_code: 999, ct.err_msg: f"does not contain clustering key field {clustering_key_field.name}"} + collection_w.load(load_fields=[vector_field.name, pk_field.name], + check_task=CheckTasks.err_res, check_items=error) + + @pytest.mark.tags(CaseLabel.L1) + def test_update_load_fields_list_when_reloading_collection(self): + """ + target: test update load fields list when reloading collection + method: + 1. create a collection with fields + 2. load a part of the fields + 3. update load fields list when reloading collection + expected: + 1. raise exception + """ + self._connect() + name = cf.gen_unique_str() + dim = 32 + nb = 2000 + pk_field = cf.gen_int64_field(name='pk', is_primary=True) + not_load_int64_field = cf.gen_int64_field(name="not_int64_load") + load_string_field = cf.gen_string_field(name="string_load") + vector_field = cf.gen_float_vec_field(dim=dim) + schema = cf.gen_collection_schema(fields=[pk_field, not_load_int64_field, load_string_field, vector_field], + auto_id=True, enable_dynamic_field=True) + collection_w = self.init_collection_wrap(name=name, schema=schema) + int_values = [i for i in range(nb)] + string_values = [str(i) for i in range(nb)] + float_vec_values = cf.gen_vectors(nb, dim) + collection_w.insert([int_values, string_values, float_vec_values]) + + # build index + collection_w.create_index(field_name=vector_field.name, index_params=ct.default_index) + collection_w.load(load_fields=[pk_field.name, vector_field.name, load_string_field.name]) + # search + search_params = ct.default_search_params + nq = 1 + search_vectors = float_vec_values[0:nq] + collection_w.search(data=search_vectors, anns_field=vector_field.name, param=search_params, + limit=10, output_fields=[load_string_field.name], + check_task=CheckTasks.check_search_results, check_items={"nq": nq, "limit": 10}) + # try to add more fields in load fields list when reloading + error = {ct.err_code: 999, ct.err_msg: f"can't change the load field list for loaded collection"} + collection_w.load(load_fields=[pk_field.name, vector_field.name, + load_string_field.name, not_load_int64_field.name], + check_task=CheckTasks.err_res, check_items=error) + # try to remove fields in load fields list when reloading + collection_w.load(load_fields=[pk_field.name, vector_field.name], + check_task=CheckTasks.err_res, check_items=error) + + @pytest.mark.tags(CaseLabel.L1) + def test_one_of_dynamic_fields_in_load_fields_list(self): + """ + target: test one of dynamic fields in load fields list + method: + 1. create a collection with fields + 3. add one of dynamic fields in load fields list when loading + expected: raise exception + 4. 
add non_existing field in load fields list when loading + expected: raise exception + """ + self._connect() + name = cf.gen_unique_str() + dim = 32 + nb = 2000 + pk_field = cf.gen_int64_field(name='pk', is_primary=True) + load_int64_field = cf.gen_int64_field(name="int64_load") + load_string_field = cf.gen_string_field(name="string_load") + vector_field = cf.gen_float_vec_field(dim=dim) + schema = cf.gen_collection_schema(fields=[pk_field, load_int64_field, load_string_field, vector_field], + auto_id=True, enable_dynamic_field=True) + collection_w = self.init_collection_wrap(name=name, schema=schema) + data = [] + for i in range(nb): + data.append({ + f"{load_int64_field.name}": i, + f"{load_string_field.name}": str(i), + f"{vector_field.name}": [random.uniform(-1, 1) for _ in range(dim)], + "color": i, + "tag": i, + }) + collection_w.insert(data) + # build index + collection_w.create_index(field_name=vector_field.name, index_params=ct.default_index) + # add one of dynamic fields in load fields list + error = {ct.err_code: 999, + ct.err_msg: f"failed to get field schema by name: fieldName(color) not found"} + collection_w.load(load_fields=[pk_field.name, vector_field.name, "color"], + check_task=CheckTasks.err_res, check_items=error) + # add non_existing field in load fields list + error = {ct.err_code: 999, + ct.err_msg: f"failed to get field schema by name: fieldName(not_existing) not found"} + collection_w.load(load_fields=[pk_field.name, vector_field.name, "not_existing"], + check_task=CheckTasks.err_res, check_items=error) + + @pytest.mark.tags(CaseLabel.L1) + def test_search_on_not_loaded_fields(self): + """ + target: test search on skipped fields + method: + 1. create a collection with fields + 2. load a part of the fields + 3. search on skipped fields in expr and/or output_fields + expected: + 1. 
raise exception + """ + self._connect() + name = cf.gen_unique_str() + dim = 32 + nb = 2000 + pk_field = cf.gen_int64_field(name='pk', is_primary=True) + not_load_int64_field = cf.gen_int64_field(name="not_int64_load") + load_string_field = cf.gen_string_field(name="string_load") + vector_field = cf.gen_float_vec_field(dim=dim) + schema = cf.gen_collection_schema(fields=[pk_field, not_load_int64_field, load_string_field, vector_field], + auto_id=True, enable_dynamic_field=True) + collection_w = self.init_collection_wrap(name=name, schema=schema) + int_values = [i for i in range(nb)] + string_values = [str(i) for i in range(nb)] + float_vec_values = cf.gen_vectors(nb, dim) + collection_w.insert([int_values, string_values, float_vec_values]) + + # build index + collection_w.create_index(field_name=vector_field.name, index_params=ct.default_index) + collection_w.load(load_fields=[pk_field.name, vector_field.name, load_string_field.name]) + # search + search_params = ct.default_search_params + nq = 1 + search_vectors = float_vec_values[0:nq] + error = {ct.err_code: 999, ct.err_msg: f"field {not_load_int64_field.name} is not loaded"} + collection_w.search(data=search_vectors, anns_field=vector_field.name, param=search_params, + limit=10, output_fields=[not_load_int64_field.name, load_string_field.name], + check_task=CheckTasks.err_res, check_items=error) + error = {ct.err_code: 999, ct.err_msg: f"cannot parse expression"} + collection_w.search(data=search_vectors, anns_field=vector_field.name, param=search_params, + expr=f"{not_load_int64_field.name} > 0", + limit=10, output_fields=[load_string_field.name], + check_task=CheckTasks.err_res, check_items=error) diff --git a/tests/python_client/testcases/test_mix_scenes.py b/tests/python_client/testcases/test_mix_scenes.py index daaf57596d..39aaf04a57 100644 --- a/tests/python_client/testcases/test_mix_scenes.py +++ b/tests/python_client/testcases/test_mix_scenes.py @@ -2,10 +2,14 @@ import re import math # do not remove `math` import pytest from pymilvus import DataType, AnnSearchRequest, RRFRanker +import numpy as np +import random +from pymilvus import AnnSearchRequest, RRFRanker, WeightedRanker from common.common_type import CaseLabel, CheckTasks from common import common_type as ct from common import common_func as cf +from utils.util_log import test_log as log from common.code_mapping import QueryErrorMessage as qem from common.common_params import ( FieldParams, MetricType, DefaultVectorIndexParams, DefaultScalarIndexParams, Expr, AlterIndexParams @@ -1156,3 +1160,349 @@ class TestMixScenes(TestcaseBase): # re-query self.collection_wrap.query(expr=expr, output_fields=scalar_fields, check_task=CheckTasks.check_query_results, check_items={"exp_res": []}) + + +@pytest.mark.xdist_group("TestMultiVectorsGroupSearch") +class TestGroupSearch(TestCaseClassBase): + """ + Testing group search scenarios + 1. collection schema: int64_pk(auto_id), float16_vector, float_vector, bfloat16_vector, varchar + 2. varchar field is inserted with dup values for group by + 3. 
index for each vector field with different index types, dims and metric types + Author: Yanliang567 + """ + def setup_class(self): + super().setup_class(self) + + # connect to server before testing + self._connect(self) + + # init params + self.primary_field = "int64_pk" + self.indexed_string_field = "varchar_with_index" + + # create a collection with fields + self.collection_wrap.init_collection( + name=cf.gen_unique_str("test_group_search"), + schema=cf.set_collection_schema( + fields=[DataType.VARCHAR.name, self.primary_field, DataType.FLOAT16_VECTOR.name, + DataType.FLOAT_VECTOR.name, DataType.BFLOAT16_VECTOR.name, + DataType.INT8.name, DataType.INT64.name, DataType.BOOL.name, + self.indexed_string_field], + field_params={ + self.primary_field: FieldParams(is_primary=True).to_dict, + DataType.FLOAT16_VECTOR.name: FieldParams(dim=31).to_dict, + DataType.FLOAT_VECTOR.name: FieldParams(dim=64).to_dict, + DataType.BFLOAT16_VECTOR.name: FieldParams(dim=24).to_dict, + }, + auto_id=True + ) + ) + + self.vector_fields = [DataType.FLOAT16_VECTOR.name, DataType.FLOAT_VECTOR.name, DataType.BFLOAT16_VECTOR.name] + self.dims = [31, 64, 24] + self.index_types = ["IVF_SQ8", "HNSW", "IVF_FLAT"] + + @pytest.fixture(scope="class", autouse=True) + def prepare_data(self): + # prepare data (> 1024 triggering index building) + import pandas as pd + nb = 100 + for _ in range(100): + string_values = pd.Series(data=[str(i) for i in range(nb)], dtype="string") + data = [string_values] + for i in range(len(self.vector_fields)): + data.append(cf.gen_vectors(dim=self.dims[i], nb=nb, vector_data_type=self.vector_fields[i])) + data.append(pd.Series(data=[np.int8(i) for i in range(nb)], dtype="int8")) + data.append(pd.Series(data=[np.int64(i) for i in range(nb)], dtype="int64")) + data.append(pd.Series(data=[np.bool_(i) for i in range(nb)], dtype="bool")) + data.append(pd.Series(data=[str(i) for i in range(nb)], dtype="string")) + self.collection_wrap.insert(data) + + # flush collection, segment sealed + self.collection_wrap.flush() + + # build index for each vector field + index_params = { + **DefaultVectorIndexParams.IVF_SQ8(DataType.FLOAT16_VECTOR.name, metric_type=MetricType.L2), + **DefaultVectorIndexParams.HNSW(DataType.FLOAT_VECTOR.name, metric_type=MetricType.IP), + **DefaultVectorIndexParams.IVF_FLAT(DataType.BFLOAT16_VECTOR.name, metric_type=MetricType.COSINE), + # index params for varchar field + **DefaultScalarIndexParams.INVERTED(self.indexed_string_field) + } + + self.build_multi_index(index_params=index_params) + assert sorted([n.field_name for n in self.collection_wrap.indexes]) == sorted(index_params.keys()) + + # load collection + self.collection_wrap.load() + + @pytest.mark.tags(CaseLabel.L0) + @pytest.mark.xfail(reason="issue #36407") + @pytest.mark.parametrize("group_by_field", [DataType.VARCHAR.name, "varchar_with_index"]) + def test_search_group_size(self, group_by_field): + """ + target: + 1. 
search on 3 different float vector fields with group by varchar field with group size + verify results entity = limit * group_size and group size is full if group_strict_size is True + verify results group counts = limit if group_strict_size is False + """ + nq = 2 + limit = 50 + group_size = 5 + for j in range(len(self.vector_fields)): + search_vectors = cf.gen_vectors(nq, dim=self.dims[j], vector_data_type=self.vector_fields[j]) + search_params = {"params": cf.get_search_params_params(self.index_types[j])} + # when group_strict_size=true, it shall return results with entities = limit * group_size + res1 = self.collection_wrap.search(data=search_vectors, anns_field=self.vector_fields[j], + param=search_params, limit=limit, + group_by_field=group_by_field, + group_size=group_size, group_strict_size=True, + output_fields=[group_by_field])[0] + for i in range(nq): + assert len(res1[i]) == limit * group_size + for l in range(limit): + group_values = [] + for k in range(group_size): + group_values.append(res1[i][l*group_size+k].fields.get(group_by_field)) + assert len(set(group_values)) == 1 + + # when group_strict_size=false, it shall return results with group counts = limit + res1 = self.collection_wrap.search(data=search_vectors, anns_field=self.vector_fields[j], + param=search_params, limit=limit, + group_by_field=group_by_field, + group_size=group_size, group_strict_size=False, + output_fields=[group_by_field])[0] + for i in range(nq): + group_values = [] + for l in range(len(res1[i])): + group_values.append(res1[i][l].fields.get(group_by_field)) + assert len(set(group_values)) == limit + + @pytest.mark.tags(CaseLabel.L0) + @pytest.mark.xfail(reason="issue #36407") + def test_hybrid_search_group_size(self): + """ + hybrid search group by on 3 different float vector fields with group by varchar field with group size + verify results returns with de-dup group values and group distances are in order as rank_group_scorer + """ + nq = 2 + limit = 50 + group_size = 5 + req_list = [] + for j in range(len(self.vector_fields)): + search_params = { + "data": cf.gen_vectors(nq, dim=self.dims[j], vector_data_type=self.vector_fields[j]), + "anns_field": self.vector_fields[j], + "param": {"params": cf.get_search_params_params(self.index_types[j])}, + "limit": limit, + "expr": f"{self.primary_field} > 0"} + req = AnnSearchRequest(**search_params) + req_list.append(req) + # 4. 
hybrid search group by + rank_scorers = ["max", "avg", "sum"] + for scorer in rank_scorers: + res = self.collection_wrap.hybrid_search(req_list, WeightedRanker(0.3, 0.3, 0.3), limit=limit, + group_by_field=DataType.VARCHAR.name, + group_size=group_size, rank_group_scorer=scorer, + output_fields=[DataType.VARCHAR.name])[0] + for i in range(nq): + group_values = [] + for l in range(len(res[i])): + group_values.append(res[i][l].fields.get(DataType.VARCHAR.name)) + assert len(set(group_values)) == limit + + # group_distances = [] + tmp_distances = [100 for _ in range(group_size)] # init with a large value + group_distances = [res[i][0].distance] # init with the first value + for l in range(len(res[i]) - 1): + curr_group_value = res[i][l].fields.get(DataType.VARCHAR.name) + next_group_value = res[i][l + 1].fields.get(DataType.VARCHAR.name) + if curr_group_value == next_group_value: + group_distances.append(res[i][l + 1].distance) + else: + if scorer == 'sum': + assert np.sum(group_distances) < np.sum(tmp_distances) + elif scorer == 'avg': + assert np.mean(group_distances) < np.mean(tmp_distances) + else: # default max + assert np.max(group_distances) < np.max(tmp_distances) + + tmp_distances = group_distances + group_distances = [res[i][l + 1].distance] + + @pytest.mark.tags(CaseLabel.L1) + @pytest.mark.xfail(reason="issue #36407") + def test_hybrid_search_group_by(self): + """ + verify hybrid search group by works with different Rankers + """ + # 3. prepare search params + req_list = [] + for i in range(len(self.vector_fields)): + search_param = { + "data": cf.gen_vectors(ct.default_nq, dim=self.dims[i], vector_data_type=self.vector_fields[i]), + "anns_field": self.vector_fields[i], + "param": {}, + "limit": ct.default_limit, + "expr": f"{self.primary_field} > 0"} + req = AnnSearchRequest(**search_param) + req_list.append(req) + # 4. hybrid search group by + res = self.collection_wrap.hybrid_search(req_list, WeightedRanker(0.1, 0.9, 0.2), ct.default_limit, + group_by_field=DataType.VARCHAR.name, + output_fields=[DataType.VARCHAR.name], + check_task=CheckTasks.check_search_results, + check_items={"nq": ct.default_nq, "limit": ct.default_limit})[0] + print(res) + for i in range(ct.default_nq): + group_values = [] + for l in range(ct.default_limit): + group_values.append(res[i][l].fields.get(DataType.VARCHAR.name)) + assert len(group_values) == len(set(group_values)) + + # 5. 
hybrid search with RRFRanker on one vector field with group by + req_list = [] + for i in range(1, len(self.vector_fields)): + search_param = { + "data": cf.gen_vectors(ct.default_nq, dim=self.dims[i], vector_data_type=self.vector_fields[i]), + "anns_field": self.vector_fields[i], + "param": {}, + "limit": ct.default_limit, + "expr": f"{self.primary_field} > 0"} + req = AnnSearchRequest(**search_param) + req_list.append(req) + self.collection_wrap.hybrid_search(req_list, RRFRanker(), ct.default_limit, + group_by_field=self.indexed_string_field, + check_task=CheckTasks.check_search_results, + check_items={"nq": ct.default_nq, "limit": ct.default_limit}) + + @pytest.mark.tags(CaseLabel.L2) + @pytest.mark.parametrize("support_field", [DataType.INT8.name, DataType.INT64.name, + DataType.BOOL.name, DataType.VARCHAR.name]) + def test_search_group_by_supported_scalars(self, support_field): + """ + verify search group by works with supported scalar fields + """ + nq = 2 + limit = 15 + for j in range(len(self.vector_fields)): + search_vectors = cf.gen_vectors(nq, dim=self.dims[j], vector_data_type=self.vector_fields[j]) + search_params = {"params": cf.get_search_params_params(self.index_types[j])} + res1 = self.collection_wrap.search(data=search_vectors, anns_field=self.vector_fields[j], + param=search_params, limit=limit, + group_by_field=support_field, + output_fields=[support_field])[0] + for i in range(nq): + grpby_values = [] + dismatch = 0 + results_num = 2 if support_field == DataType.BOOL.name else limit + for l in range(results_num): + top1 = res1[i][l] + top1_grpby_pk = top1.id + top1_grpby_value = top1.fields.get(support_field) + expr = f"{support_field}=={top1_grpby_value}" + if support_field == DataType.VARCHAR.name: + expr = f"{support_field}=='{top1_grpby_value}'" + grpby_values.append(top1_grpby_value) + res_tmp = self.collection_wrap.search(data=[search_vectors[i]], anns_field=self.vector_fields[j], + param=search_params, limit=1, expr=expr, + output_fields=[support_field])[0] + top1_expr_pk = res_tmp[0][0].id + if top1_grpby_pk != top1_expr_pk: + dismatch += 1 + log.info(f"{support_field} on {self.vector_fields[j]} dismatch_item, top1_grpby_dis: {top1.distance}, top1_expr_dis: {res_tmp[0][0].distance}") + log.info(f"{support_field} on {self.vector_fields[j]} top1_dismatch_num: {dismatch}, results_num: {results_num}, dismatch_rate: {dismatch / results_num}") + baseline = 1 if support_field == DataType.BOOL.name else 0.2 # skip baseline check for boolean + assert dismatch / results_num <= baseline + # verify no dup values of the group_by_field in results + assert len(grpby_values) == len(set(grpby_values)) + + @pytest.mark.tags(CaseLabel.L2) + def test_search_pagination_group_by(self): + """ + verify search group by works with pagination + """ + limit = 10 + page_rounds = 3 + search_param = {} + default_search_exp = f"{self.primary_field} >= 0" + grpby_field = self.indexed_string_field + default_search_field = self.vector_fields[1] + search_vectors = cf.gen_vectors(1, dim=self.dims[1], vector_data_type=self.vector_fields[1]) + all_pages_ids = [] + all_pages_grpby_field_values = [] + for r in range(page_rounds): + page_res = self.collection_wrap.search(search_vectors, anns_field=default_search_field, + param=search_param, limit=limit, offset=limit * r, + expr=default_search_exp, group_by_field=grpby_field, + output_fields=[grpby_field], + check_task=CheckTasks.check_search_results, + check_items={"nq": 1, "limit": limit}, + )[0] + for j in range(limit): + 
all_pages_grpby_field_values.append(page_res[0][j].get(grpby_field)) + all_pages_ids += page_res[0].ids + hit_rate = round(len(set(all_pages_grpby_field_values)) / len(all_pages_grpby_field_values), 3) + assert hit_rate >= 0.8 + + total_res = self.collection_wrap.search(search_vectors, anns_field=default_search_field, + param=search_param, limit=limit * page_rounds, + expr=default_search_exp, group_by_field=grpby_field, + output_fields=[grpby_field], + check_task=CheckTasks.check_search_results, + check_items={"nq": 1, "limit": limit * page_rounds} + )[0] + hit_num = len(set(total_res[0].ids).intersection(set(all_pages_ids))) + hit_rate = round(hit_num / (limit * page_rounds), 3) + assert hit_rate >= 0.8 + log.info(f"search pagination with groupby hit_rate: {hit_rate}") + grpby_field_values = [] + for i in range(limit * page_rounds): + grpby_field_values.append(total_res[0][i].fields.get(grpby_field)) + assert len(grpby_field_values) == len(set(grpby_field_values)) + + @pytest.mark.tags(CaseLabel.L0) + @pytest.mark.skip(reason="issue #36401") + def test_search_pagination_group_size(self): + limit = 10 + group_size = 5 + page_rounds = 3 + search_param = {} + default_search_exp = f"{self.primary_field} >= 0" + grpby_field = self.indexed_string_field + default_search_field = self.vector_fields[1] + search_vectors = cf.gen_vectors(1, dim=self.dims[1], vector_data_type=self.vector_fields[1]) + all_pages_ids = [] + all_pages_grpby_field_values = [] + for r in range(page_rounds): + page_res = self.collection_wrap.search(search_vectors, anns_field=default_search_field, + param=search_param, limit=limit, offset=limit * r, + expr=default_search_exp, + group_by_field=grpby_field, group_size=group_size, + output_fields=[grpby_field], + check_task=CheckTasks.check_search_results, + check_items={"nq": 1, "limit": limit}, + )[0] + for j in range(limit): + all_pages_grpby_field_values.append(page_res[0][j].get(grpby_field)) + all_pages_ids += page_res[0].ids + hit_rate = round(len(set(all_pages_grpby_field_values)) / len(all_pages_grpby_field_values), 3) + assert hit_rate >= 0.8 + + total_res = self.collection_wrap.search(search_vectors, anns_field=default_search_field, + param=search_param, limit=limit * page_rounds, + expr=default_search_exp, + group_by_field=grpby_field, group_size=group_size, + output_fields=[grpby_field], + check_task=CheckTasks.check_search_results, + check_items={"nq": 1, "limit": limit * page_rounds} + )[0] + hit_num = len(set(total_res[0].ids).intersection(set(all_pages_ids))) + hit_rate = round(hit_num / (limit * page_rounds), 3) + assert hit_rate >= 0.8 + log.info(f"search pagination with groupby hit_rate: {hit_rate}") + grpby_field_values = [] + for i in range(limit * page_rounds): + grpby_field_values.append(total_res[0][i].fields.get(grpby_field)) + assert len(grpby_field_values) == len(set(grpby_field_values)) \ No newline at end of file diff --git a/tests/python_client/testcases/test_search.py b/tests/python_client/testcases/test_search.py index d48c97bd6c..5f2470accf 100644 --- a/tests/python_client/testcases/test_search.py +++ b/tests/python_client/testcases/test_search.py @@ -10240,202 +10240,6 @@ class TestSearchIterator(TestcaseBase): class TestSearchGroupBy(TestcaseBase): """ Test case of search group by """ - @pytest.mark.tags(CaseLabel.L2) - @pytest.mark.parametrize("index_type, metric", zip(["FLAT", "IVF_FLAT", "HNSW"], ct.float_metrics)) - @pytest.mark.parametrize("vector_data_type", ["FLOAT16_VECTOR", "FLOAT_VECTOR", "BFLOAT16_VECTOR"]) - def 
test_search_group_by_default(self, index_type, metric, vector_data_type): - """ - target: test search group by - method: 1. create a collection with data - 2. create index with different metric types - 3. search with group by - verify no duplicate values for group_by_field - 4. search with filtering every value of group_by_field - verify: verify that every record in groupby results is the top1 for that value of the group_by_field - """ - collection_w = self.init_collection_general(prefix, auto_id=True, insert_data=False, is_index=False, - vector_data_type=vector_data_type, - is_all_data_type=True, with_json=False)[0] - _index_params = {"index_type": index_type, "metric_type": metric, "params": {"M": 16, "efConstruction": 128}} - if index_type in ["IVF_FLAT", "FLAT"]: - _index_params = {"index_type": index_type, "metric_type": metric, "params": {"nlist": 128}} - collection_w.create_index(ct.default_float_vec_field_name, index_params=_index_params) - # insert with the same values for scalar fields - for _ in range(50): - data = cf.gen_dataframe_all_data_type(nb=100, auto_id=True, with_json=False) - collection_w.insert(data) - - collection_w.flush() - collection_w.create_index(ct.default_float_vec_field_name, index_params=_index_params) - collection_w.load() - - search_params = {"metric_type": metric, "params": {"ef": 128}} - nq = 2 - limit = 15 - search_vectors = cf.gen_vectors(nq, dim=ct.default_dim) - # # verify the results are same if gourp by pk - # res1 = collection_w.search(data=search_vectors, anns_field=ct.default_float_vec_field_name, - # param=search_params, limit=limit, consistency_level=CONSISTENCY_STRONG, - # group_by_field=ct.default_int64_field_name)[0] - # res2 = collection_w.search(data=search_vectors, anns_field=ct.default_float_vec_field_name, - # param=search_params, limit=limit, consistency_level=CONSISTENCY_STRONG)[0] - # hits_num = 0 - # for i in range(nq): - # assert res1[i].ids == res2[i].ids - # hits_num += len(set(res1[i].ids).intersection(set(res2[i].ids))) - # hit_rate = hits_num / (nq * limit) - # log.info(f"groupy primary key hits_num: {hits_num}, nq: {nq}, limit: {limit}, hit_rate: {hit_rate}") - # assert hit_rate >= 0.60 - - # verify that every record in groupby results is the top1 for that value of the group_by_field - supported_grpby_fields = [ct.default_int8_field_name, ct.default_int16_field_name, - ct.default_int32_field_name, ct.default_bool_field_name, - ct.default_string_field_name] - for grpby_field in supported_grpby_fields: - res1 = collection_w.search(data=search_vectors, anns_field=ct.default_float_vec_field_name, - param=search_params, limit=limit, - group_by_field=grpby_field, - output_fields=[grpby_field])[0] - for i in range(nq): - grpby_values = [] - dismatch = 0 - results_num = 2 if grpby_field == ct.default_bool_field_name else limit - for l in range(results_num): - top1 = res1[i][l] - top1_grpby_pk = top1.id - top1_grpby_value = top1.fields.get(grpby_field) - expr = f"{grpby_field}=={top1_grpby_value}" - if grpby_field == ct.default_string_field_name: - expr = f"{grpby_field}=='{top1_grpby_value}'" - grpby_values.append(top1_grpby_value) - res_tmp = collection_w.search(data=[search_vectors[i]], anns_field=ct.default_float_vec_field_name, - param=search_params, limit=1, - expr=expr, - output_fields=[grpby_field])[0] - top1_expr_pk = res_tmp[0][0].id - if top1_grpby_pk != top1_expr_pk: - dismatch += 1 - log.info(f"{grpby_field} on {metric} dismatch_item, top1_grpby_dis: {top1.distance}, top1_expr_dis: {res_tmp[0][0].distance}") - 
log.info(f"{grpby_field} on {metric} top1_dismatch_num: {dismatch}, results_num: {results_num}, dismatch_rate: {dismatch / results_num}") - baseline = 1 if grpby_field == ct.default_bool_field_name else 0.2 # skip baseline check for boolean - assert dismatch / results_num <= baseline - # verify no dup values of the group_by_field in results - assert len(grpby_values) == len(set(grpby_values)) - - @pytest.mark.tags(CaseLabel.L0) - def test_search_group_size_default(self): - """ - target: test search group by - method: 1. create a collection with 3 different float vectors - 2. build index with 3 different index types and metrics - 2. search on 3 different float vector fields with group by varchar field with group size - verify results entity = limit * group_size and group size is full if group_strict_size is True - verify results group counts = limit if group_strict_size is False - """ - self._connect() - dense_types = ["FLOAT16_VECTOR", "FLOAT_VECTOR", "BFLOAT16_VECTOR"] - dims = [16, 128, 64] - index_types = ["FLAT", "IVF_SQ8", "HNSW"] - metrics = ct.float_metrics - fields = [cf.gen_int64_field(is_primary=True), cf.gen_string_field()] - for i in range(len(dense_types)): - fields.append(cf.gen_float_vec_field(name=dense_types[i], - vector_data_type=dense_types[i], dim=dims[i])) - schema = cf.gen_collection_schema(fields, auto_id=True) - collection_w = self.init_collection_wrap(name=prefix, schema=schema) - - # insert with the same values for scalar fields - nb = 100 - for _ in range(100): - string_values = pd.Series(data=[str(i) for i in range(nb)], dtype="string") - data = [string_values] - for i in range(len(dense_types)): - data.append(cf.gen_vectors(dim=dims[i], nb=nb, vector_data_type=dense_types[i])) - collection_w.insert(data) - - collection_w.flush() - for i in range(len(dense_types)): - _index_params = {"index_type": index_types[i], "metric_type": metrics[i], - "params": cf.get_index_params_params(index_types[i])} - collection_w.create_index(dense_types[i], _index_params) - collection_w.load() - - nq = 2 - limit = 50 - group_size = 5 - for j in range(len(dense_types)): - search_vectors = cf.gen_vectors(nq, dim=dims[j], vector_data_type=dense_types[j]) - search_params = {"params": cf.get_search_params_params(index_types[j])} - # when group_strict_size=true, it shall return results with entities = limit * group_size - res1 = collection_w.search(data=search_vectors, anns_field=dense_types[j], - param=search_params, limit=limit, # consistency_level=CONSISTENCY_STRONG, - group_by_field=ct.default_string_field_name, - group_size=group_size, group_strict_size=True, - output_fields=[ct.default_string_field_name])[0] - for i in range(nq): - for l in range(limit): - group_values = [] - for k in range(10): - group_values.append(res1[i][l].fields.get(ct.default_string_field_name)) - assert len(set(group_values)) == 1 - assert len(res1[i]) == limit * group_size - - # when group_strict_size=false, it shall return results with group counts = limit - res1 = collection_w.search(data=search_vectors, anns_field=dense_types[j], - param=search_params, limit=limit, # consistency_level=CONSISTENCY_STRONG, - group_by_field=ct.default_string_field_name, - group_size=group_size, group_strict_size=False, - output_fields=[ct.default_string_field_name])[0] - for i in range(nq): - group_values = [] - for l in range(len(res1[i])): - group_values.append(res1[i][l].fields.get(ct.default_string_field_name)) - assert len(set(group_values)) == limit - - # hybrid search group by - req_list = [] - for j in 
range(len(dense_types)): - search_params = { - "data": cf.gen_vectors(nq, dim=dims[j], vector_data_type=dense_types[j]), - "anns_field": dense_types[j], - "param": {"params": cf.get_search_params_params(index_types[j])}, - "limit": limit, - "expr": "int64 > 0"} - req = AnnSearchRequest(**search_params) - req_list.append(req) - # 4. hybrid search group by - import numpy as np - rank_scorers = ["max", "avg", "sum"] - for scorer in rank_scorers: - res = collection_w.hybrid_search(req_list, WeightedRanker(0.3, 0.3, 0.3), limit=limit, - group_by_field=ct.default_string_field_name, - group_size=group_size, rank_group_scorer=scorer, - output_fields=[ct.default_string_field_name])[0] - for i in range(nq): - group_values = [] - for l in range(len(res[i])): - group_values.append(res[i][l].fields.get(ct.default_string_field_name)) - assert len(set(group_values)) == limit - - # group_distances = [] - tmp_distances = [100 for _ in range(group_size)] # init with a large value - group_distances = [res[i][0].distance] # init with the first value - for l in range(len(res[i])-1): - curr_group_value = res[i][l].fields.get(ct.default_string_field_name) - next_group_value = res[i][l+1].fields.get(ct.default_string_field_name) - if curr_group_value == next_group_value: - group_distances.append(res[i][l+1].distance) - else: - if scorer == 'sum': - assert np.sum(group_distances) < np.sum(tmp_distances) - elif scorer == 'avg': - assert np.mean(group_distances) < np.mean(tmp_distances) - else: # default max - assert np.max(group_distances) < np.max(tmp_distances) - - tmp_distances = group_distances - group_distances = [res[i][l+1].distance] - @pytest.mark.tags(CaseLabel.L2) def test_search_max_group_size_and_max_limit(self): """ @@ -10501,70 +10305,6 @@ class TestSearchGroupBy(TestcaseBase): check_task=CheckTasks.err_res, check_items={"err_code": err_code, "err_msg": err_msg}) - @pytest.mark.tags(CaseLabel.L0) - @pytest.mark.parametrize("grpby_field", [ct.default_string_field_name, ct.default_int8_field_name]) - def test_search_group_by_with_field_indexed(self, grpby_field): - """ - target: test search group by with the field indexed - method: 1. create a collection with data - 2. create index for the vector field and the groupby field - 3. search with group by - 4. 
search with filtering every value of group_by_field - verify: verify that every record in groupby results is the top1 for that value of the group_by_field - """ - metric = "COSINE" - collection_w = self.init_collection_general(prefix, auto_id=True, insert_data=False, is_index=False, - is_all_data_type=True, with_json=False)[0] - _index = {"index_type": "HNSW", "metric_type": metric, "params": {"M": 16, "efConstruction": 128}} - collection_w.create_index(ct.default_float_vec_field_name, index_params=_index) - # insert with the same values(by insert rounds) for scalar fields - for _ in range(50): - data = cf.gen_dataframe_all_data_type(nb=100, auto_id=True, with_json=False) - collection_w.insert(data) - - collection_w.flush() - collection_w.create_index(ct.default_float_vec_field_name, index_params=_index) - collection_w.create_index(grpby_field) - collection_w.load() - - search_params = {"metric_type": metric, "params": {"ef": 128}} - nq = 2 - limit = 20 - search_vectors = cf.gen_vectors(nq, dim=ct.default_dim) - - # verify that every record in groupby results is the top1 for that value of the group_by_field - res1 = collection_w.search(data=search_vectors, anns_field=ct.default_float_vec_field_name, - param=search_params, limit=limit, - group_by_field=grpby_field, - output_fields=[grpby_field])[0] - for i in range(nq): - grpby_values = [] - dismatch = 0 - results_num = 2 if grpby_field == ct.default_bool_field_name else limit - for l in range(results_num): - top1 = res1[i][l] - top1_grpby_pk = top1.id - top1_grpby_value = top1.fields.get(grpby_field) - expr = f"{grpby_field}=={top1_grpby_value}" - if grpby_field == ct.default_string_field_name: - expr = f"{grpby_field}=='{top1_grpby_value}'" - grpby_values.append(top1_grpby_value) - res_tmp = collection_w.search(data=[search_vectors[i]], anns_field=ct.default_float_vec_field_name, - param=search_params, limit=1, - expr=expr, - output_fields=[grpby_field])[0] - top1_expr_pk = res_tmp[0][0].id - log.info(f"nq={i}, limit={l}") - # assert top1_grpby_pk == top1_expr_pk - if top1_grpby_pk != top1_expr_pk: - dismatch += 1 - log.info(f"{grpby_field} on {metric} dismatch_item, top1_grpby_dis: {top1.distance}, top1_expr_dis: {res_tmp[0][0].distance}") - log.info(f"{grpby_field} on {metric} top1_dismatch_num: {dismatch}, results_num: {results_num}, dismatch_rate: {dismatch / results_num}") - baseline = 1 if grpby_field == ct.default_bool_field_name else 0.2 # skip baseline check for boolean - assert dismatch / results_num <= baseline - # verify no dup values of the group_by_field in results - assert len(grpby_values) == len(set(grpby_values)) - @pytest.mark.tags(CaseLabel.L1) @pytest.mark.parametrize("grpby_unsupported_field", [ct.default_float_field_name, ct.default_json_field_name, ct.default_double_field_name, ct.default_float_vec_field_name]) @@ -10698,68 +10438,6 @@ class TestSearchGroupBy(TestcaseBase): check_task=CheckTasks.err_res, check_items={"err_code": err_code, "err_msg": err_msg}) - @pytest.mark.tags(CaseLabel.L1) - # @pytest.mark.xfail(reason="issue #30828") - def test_search_pagination_group_by(self): - """ - target: test search pagination with group by - method: 1. create a collection with data - 2. create index HNSW - 3. search with groupby and pagination - 4. search with groupby and limits=pages*page_rounds - verify: search with groupby and pagination returns correct results - """ - # 1. 
create a collection - metric = "COSINE" - collection_w = self.init_collection_general(prefix, auto_id=True, insert_data=False, is_index=False, - is_all_data_type=True, with_json=False)[0] - # insert with the same values for scalar fields - for _ in range(50): - data = cf.gen_dataframe_all_data_type(nb=100, auto_id=True, with_json=False) - collection_w.insert(data) - - collection_w.flush() - _index = {"index_type": "HNSW", "metric_type": metric, "params": {"M": 16, "efConstruction": 128}} - collection_w.create_index(ct.default_float_vec_field_name, index_params=_index) - collection_w.load() - # 2. search pagination with offset - limit = 10 - page_rounds = 3 - search_param = {"metric_type": metric} - grpby_field = ct.default_string_field_name - search_vectors = cf.gen_vectors(1, dim=ct.default_dim) - all_pages_ids = [] - all_pages_grpby_field_values = [] - for r in range(page_rounds): - page_res = collection_w.search(search_vectors, anns_field=default_search_field, - param=search_param, limit=limit, offset=limit * r, - expr=default_search_exp, group_by_field=grpby_field, - output_fields=["*"], - check_task=CheckTasks.check_search_results, - check_items={"nq": 1, "limit": limit}, - )[0] - for j in range(limit): - all_pages_grpby_field_values.append(page_res[0][j].get(grpby_field)) - all_pages_ids += page_res[0].ids - hit_rate = round(len(set(all_pages_grpby_field_values)) / len(all_pages_grpby_field_values), 3) - assert hit_rate >= 0.8 - - total_res = collection_w.search(search_vectors, anns_field=default_search_field, - param=search_param, limit=limit * page_rounds, - expr=default_search_exp, group_by_field=grpby_field, - output_fields=[grpby_field], - check_task=CheckTasks.check_search_results, - check_items={"nq": 1, "limit": limit * page_rounds} - )[0] - hit_num = len(set(total_res[0].ids).intersection(set(all_pages_ids))) - hit_rate = round(hit_num / (limit * page_rounds), 3) - assert hit_rate >= 0.8 - log.info(f"search pagination with groupby hit_rate: {hit_rate}") - grpby_field_values = [] - for i in range(limit * page_rounds): - grpby_field_values.append(total_res[0][i].fields.get(grpby_field)) - assert len(grpby_field_values) == len(set(grpby_field_values)) - @pytest.mark.tags(CaseLabel.L1) def test_search_iterator_not_support_group_by(self): """ @@ -10834,108 +10512,6 @@ class TestSearchGroupBy(TestcaseBase): check_task=CheckTasks.err_res, check_items={"err_code": err_code, "err_msg": err_msg}) - @pytest.mark.tags(CaseLabel.L1) - def test_hybrid_search_support_group_by(self): - """ - target: verify that hybrid search does not support groupby - method: 1. create a collection with multiple vector fields - 2. create index hnsw and load - 3. hybrid_search with group by - verify: the error code and msg - """ - # 1. initialize collection with data - dim = 128 - supported_index = ["HNSW", "FLAT", "IVF_FLAT", "IVF_SQ8"] - metric = ct.default_L0_metric - collection_w, _, _, insert_ids, time_stamp = \ - self.init_collection_general(prefix, True, dim=dim, is_index=False, - enable_dynamic_field=False, - multiple_dim_array=[dim, dim, dim])[0:5] - # 2. extract vector field name - vector_name_list = cf.extract_vector_field_name_list(collection_w) - vector_name_list.append(ct.default_float_vec_field_name) - for i in range(len(vector_name_list)): - index = supported_index[i] - _index_params = {"index_type": index, "metric_type": metric, - "params": cf.get_index_params_params(index)} - collection_w.create_index(vector_name_list[i], _index_params) - collection_w.load() - # 3. 
prepare search params - req_list = [] - for vector_name in vector_name_list: - search_param = { - "data": [[random.random() for _ in range(dim)] for _ in range(ct.default_nq)], - "anns_field": vector_name, - "param": {"metric_type": metric, "offset": 0}, - "limit": default_limit, - "expr": "int64 > 0"} - req = AnnSearchRequest(**search_param) - req_list.append(req) - # 4. hybrid search group by - res = collection_w.hybrid_search(req_list, WeightedRanker(0.1, 0.9, 1, 0.2), default_limit, - group_by_field=ct.default_string_field_name, - output_fields=[ct.default_string_field_name], - check_task=CheckTasks.check_search_results, - check_items={"nq": ct.default_nq, "limit": default_limit})[0] - print(res) - for i in range(ct.default_nq): - group_values = [] - for l in range(ct.default_limit): - group_values.append(res[i][l].fields.get(ct.default_string_field_name)) - assert len(group_values) == len(set(group_values)) - - # 5. hybrid search with RRFRanker on one vector field with group by - req_list = [] - for vector_name in vector_name_list[:1]: - search_param = { - "data": [[random.random() for _ in range(dim)] for _ in range(1)], - "anns_field": vector_name, - "param": {"metric_type": metric, "offset": 0}, - "limit": default_limit, - "expr": "int64 > 0"} - req = AnnSearchRequest(**search_param) - req_list.append(req) - collection_w.hybrid_search(req_list, RRFRanker(), default_limit, - group_by_field=ct.default_string_field_name, - check_task=CheckTasks.check_search_results, - check_items={"nq": 1, "limit": default_limit}) - - @pytest.mark.tags(CaseLabel.L1) - def test_multi_vectors_search_one_vector_group_by(self): - """ - target: test search group by works on a collection with multi vectors - method: 1. create a collection with multiple vector fields - 2. create index hnsw and load - 3. search on the vector with hnsw index with group by - verify: search successfully - """ - dim = 33 - index_type = "HNSW" - metric_type = "COSINE" - _index_params = {"index_type": index_type, "metric_type": metric_type, - "params": {"M": 16, "efConstruction": 128}} - collection_w, _, _, insert_ids, time_stamp = \ - self.init_collection_general(prefix, True, dim=dim, is_index=False, - enable_dynamic_field=False, multiple_dim_array=[dim, dim])[0:5] - # 2. extract vector field name - vector_name_list = cf.extract_vector_field_name_list(collection_w) - vector_name_list.append(ct.default_float_vec_field_name) - for vector_name in vector_name_list: - collection_w.create_index(vector_name, _index_params) - collection_w.load() - - nq = 2 - limit = 10 - search_params = {"metric_type": metric_type, "params": {"ef": 32}} - for vector_name in vector_name_list: - search_vectors = cf.gen_vectors(nq, dim=dim) - # verify the results are same if gourp by pk - collection_w.search(data=search_vectors, anns_field=vector_name, - param=search_params, limit=limit, - group_by_field=ct.default_int64_field_name, - check_task=CheckTasks.check_search_results, - check_items={"nq": nq, "limit": limit}) - @pytest.mark.tags(CaseLabel.L2) @pytest.mark.parametrize("index", ct.all_index_types[9:11]) def test_sparse_vectors_group_by(self, index):