test: Add group size tests (#36240)

related issue: #36146

Signed-off-by: yanliang567 <yanliang.qiao@zilliz.com>
yanliang567 2024-09-13 16:13:08 +08:00 committed by GitHub
parent c22a2cebb6
commit 2e434b2358
4 changed files with 167 additions and 60 deletions

View File

@@ -228,7 +228,7 @@ def equal_entities_list(exp, actual, primary_field, with_vec=False):
def output_field_value_check(search_res, original):
"""
check if the value of output fields is correct
check if the value of output fields is correct; it only works when auto_id=False
:param search_res: the search result of specific output fields
:param original: the data in the collection
:return: True or False
@@ -241,6 +241,10 @@ def output_field_value_check(search_res, original):
if isinstance(entity[field], list):
for order in range(0, len(entity[field]), 4):
assert abs(original[field][_id][order] - entity[field][order]) < ct.epsilon
elif isinstance(entity[field], dict) and field != ct.default_json_field_name:
# sparse vector check; the sparse field must be the last field, which is a bit hacky,
# but sparse currently only supports inserting list-type data
assert entity[field].keys() == original[-1][_id].keys()
else:
num = original[original[ct.default_int64_field_name] == _id].index.to_list()[0]
assert original[field][num] == entity[field]
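
As context for the new dict branch above: a minimal, illustrative sketch of the comparison it performs, assuming sparse vectors come back in search output fields as dicts keyed by dimension index and that the sparse column is the last entry of the original list-style data (the field layout and values here are made up):

# illustrative sketch, not part of the diff
original = [
    [0, 1],                                           # primary keys
    [{10: 0.3, 42: 0.7}, {5: 0.1, 99: 0.9}],          # sparse vector column, inserted last as a list of dicts
]
_id = 0
entity_field_value = {10: 0.31, 42: 0.69}             # sparse vector as returned in the search output fields
# only the set of non-zero dimensions is compared, not the weights themselves
assert entity_field_value.keys() == original[-1][_id].keys()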

View File

@@ -2,8 +2,7 @@ import ujson
import json
from pymilvus.grpc_gen import milvus_pb2 as milvus_types
from pymilvus import connections
from utils.util_log import test_log as log
from utils.util_log import test_log as log
# from utils.util_log import test_log as log
sys_info_req = ujson.dumps({"metric_type": "system_info"})
sys_statistics_req = ujson.dumps({"metric_type": "system_statistics"})
sys_logs_req = ujson.dumps({"metric_type": "system_logs"})
@@ -24,7 +23,7 @@ class MilvusSys:
# req = milvus_types.GetMetricsRequest(request=sys_logs_req)
# self.sys_logs = self.handler._stub.GetMetrics(req, wait_for_ready=True, timeout=None)
self.sys_info = self.handler._stub.GetMetrics(req, wait_for_ready=True, timeout=60)
log.debug(f"sys_info: {self.sys_info}")
# log.debug(f"sys_info: {self.sys_info}")
def refresh(self):
req = milvus_types.GetMetricsRequest(request=sys_info_req)
@@ -33,13 +32,18 @@ class MilvusSys:
# self.sys_statistics = self.handler._stub.GetMetrics(req, wait_for_ready=True, timeout=None)
# req = milvus_types.GetMetricsRequest(request=sys_logs_req)
# self.sys_logs = self.handler._stub.GetMetrics(req, wait_for_ready=True, timeout=None)
log.debug(f"sys info response: {self.sys_info.response}")
# log.debug(f"sys info response: {self.sys_info.response}")
@property
def system_version(self):
"""get the first node's build version as milvus build version"""
return self.nodes[0].get('infos').get('system_info').get('system_version')
@property
def build_version(self):
"""get the first node's build version as milvus build version"""
return self.nodes[0].get('infos').get('system_info').get('system_version')
return self.nodes[0].get('infos').get('system_info').get('build_version')
@property
def build_time(self):
@@ -116,5 +120,8 @@ class MilvusSys:
if __name__ == '__main__':
connections.connect(host="10.96.250.111", port="19530")
uri = ""
token = ""
connections.connect(uri=uri, token=token)
ms = MilvusSys()
print(ms.build_version)

View File

@@ -12,8 +12,8 @@ allure-pytest==2.7.0
pytest-print==0.2.1
pytest-level==0.1.1
pytest-xdist==2.5.0
pymilvus==2.5.0rc78
pymilvus[bulk_writer]==2.5.0rc78
pymilvus==2.5.0rc79
pymilvus[bulk_writer]==2.5.0rc79
pytest-rerunfailures==9.1.1
git+https://github.com/Projectplace/pytest-tags
ndg-httpsclient

View File

@@ -1494,7 +1494,6 @@ class TestCollectionSearch(TestcaseBase):
expected: Search without errors and data consistency
"""
# 1. initialize collection with random primary key
collection_w, _vectors, _, insert_ids, time_stamp = \
self.init_collection_general(
prefix, True, 10, random_primary_key=random_primary_key)[0:5]
@@ -3582,9 +3581,8 @@ class TestCollectionSearch(TestcaseBase):
# 1. initialize with data
auto_id = True
enable_dynamic_field = False
collection_w, _, _, insert_ids = self.init_collection_general(prefix, True,
auto_id=auto_id,
enable_dynamic_field=enable_dynamic_field)[0:4]
collection_w, _, _, insert_ids = \
self.init_collection_general(prefix, True, auto_id=auto_id, enable_dynamic_field=enable_dynamic_field)[0:4]
# 2. search
log.info("test_search_with_output_field: Searching collection %s" % collection_w.name)
collection_w.search(vectors[:default_nq], default_search_field,
@@ -3592,11 +3590,9 @@ class TestCollectionSearch(TestcaseBase):
default_search_exp, _async=_async,
output_fields=[field_name],
check_task=CheckTasks.check_search_results,
check_items={"nq": default_nq,
"ids": insert_ids,
"limit": default_limit,
"_async": _async,
"output_fields": [field_name]})
check_items={"nq": default_nq, "ids": insert_ids,
"limit": default_limit, "_async": _async,
"output_fields": [field_name]})[0]
@pytest.mark.tags(CaseLabel.L2)
def test_search_with_output_fields(self, _async):
@@ -10314,6 +10310,90 @@ class TestSearchGroupBy(TestcaseBase):
# verify no dup values of the group_by_field in results
assert len(grpby_values) == len(set(grpby_values))
@pytest.mark.tags(CaseLabel.L0)
@pytest.mark.parametrize("index_type, metric", zip(["FLAT", "IVF_FLAT", "HNSW"], ct.float_metrics))
@pytest.mark.parametrize("vector_data_type", ["FLOAT_VECTOR", "FLOAT16_VECTOR", "BFLOAT16_VECTOR"])
@pytest.mark.parametrize("group_strict_size", [True, False])
def test_search_group_size_default(self, index_type, metric, vector_data_type, group_strict_size):
"""
target: test search group by
method: 1. create a collection with data
2. search with group by int32 with group size
verify the number of result entities = limit * group_size and every group is full if group_strict_size is True
verify the number of result groups = limit if group_strict_size is False
"""
collection_w = self.init_collection_general(prefix, auto_id=True, insert_data=False, is_index=False,
vector_data_type=vector_data_type,
is_all_data_type=True, with_json=False)[0]
_index_params = {"index_type": index_type, "metric_type": metric, "params": {"M": 16, "efConstruction": 128}}
if index_type in ["IVF_FLAT", "FLAT"]:
_index_params = {"index_type": index_type, "metric_type": metric, "params": {"nlist": 128}}
collection_w.create_index(ct.default_float_vec_field_name, index_params=_index_params)
# insert with the same values for scalar fields
for _ in range(500):
data = cf.gen_dataframe_all_data_type(nb=100, auto_id=True, with_json=False)
collection_w.insert(data)
collection_w.flush()
collection_w.create_index(ct.default_float_vec_field_name, index_params=_index_params)
collection_w.load()
search_params = {"metric_type": metric, "params": {"ef": 128}}
nq = 2
limit = 100
group_size = 10
search_vectors = cf.gen_vectors(nq, dim=ct.default_dim)
# verify
res1 = collection_w.search(data=search_vectors, anns_field=ct.default_float_vec_field_name,
param=search_params, limit=limit, consistency_level=CONSISTENCY_STRONG,
group_by_field=ct.default_int32_field_name, group_size=group_size,
group_strict_size=group_strict_size,
output_fields=[ct.default_int32_field_name]
)[0]
# print(res1)
if group_strict_size is True: # when true, it shall return results with entities = limit * group_size
for i in range(nq):
for l in range(limit):
group_values = []
for k in range(group_size):
group_values.append(res1[i][l * group_size + k].fields.get(ct.default_int32_field_name))
assert len(set(group_values)) == 1
assert len(res1[i]) == limit * group_size
else: # when False, it shall return results with group counts = limit
for i in range(nq):
group_values = []
for l in range(len(res1[i])):
group_values.append(res1[i][l].fields.get(ct.default_int32_field_name))
assert len(set(group_values)) == limit
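
To make the assertions above easier to follow, here is a condensed, illustrative sketch of the grouping search being exercised. It assumes a running Milvus and a loaded collection named "group_size_demo" with fields "int32" and "float_vector"; the group_by_field/group_size/group_strict_size parameters follow the test, everything else is a placeholder:

# illustrative sketch, not part of the diff
from pymilvus import Collection, connections

connections.connect(uri="http://localhost:19530")
collection = Collection("group_size_demo")
query_vectors = [[0.0] * 128 for _ in range(2)]       # nq = 2 dummy query vectors
res = collection.search(
    data=query_vectors,
    anns_field="float_vector",
    param={"metric_type": "L2", "params": {"ef": 128}},
    limit=100,                        # number of groups requested
    group_by_field="int32",           # group the hits by this scalar field
    group_size=10,                    # return up to 10 hits per group
    group_strict_size=True,           # True: every returned group must be filled to group_size
    output_fields=["int32"],
)
# group_strict_size=True  -> len(res[i]) == limit * group_size, one int32 value per group
# group_strict_size=False -> number of distinct int32 values in res[i] == limit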
@pytest.mark.tags(CaseLabel.L2)
def test_search_max_group_size_and_max_limit(self):
"""
target: test search group by with max group size and max limit
method: 1. create a collection with data
2. search with group by int32 with max group size and max limit
"""
pass
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.parametrize("group_size", [0, -1])
@pytest.mark.xfail(reason="issue #36146")
def test_search_negative_group_size(self, group_size):
"""
target: test search group by with negative group size
"""
collection_w = self.init_collection_general(prefix, auto_id=True, insert_data=True, is_index=True)[0]
search_params = ct.default_search_params
search_vectors = cf.gen_vectors(1, dim=ct.default_dim)
# verify
error = {ct.err_code: 999, ct.err_msg: "group_size must be greater than 1"}
collection_w.search(data=search_vectors, anns_field=ct.default_float_vec_field_name,
param=search_params, limit=10,
group_by_field=ct.default_int64_field_name,
group_size=group_size,
check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.parametrize("metric", ["JACCARD", "HAMMING"])
def test_search_binary_vec_group_by(self, metric):
@@ -10342,7 +10422,7 @@ class TestSearchGroupBy(TestcaseBase):
limit = 10
search_vectors = cf.gen_binary_vectors(nq, dim=ct.default_dim)[1]
# verify the results are same if gourp by pk
# verify the results are same if group by pk
err_code = 999
err_msg = "not support search_group_by operation based on binary"
collection_w.search(data=search_vectors, anns_field=ct.default_binary_vec_field_name,
@@ -10793,7 +10873,7 @@ class TestSearchGroupBy(TestcaseBase):
self._connect()
c_name = cf.gen_unique_str(prefix)
schema = cf.gen_default_sparse_schema()
collection_w, _ = self.collection_wrap.init_collection(c_name, schema=schema)
collection_w = self.init_collection_wrap(c_name, schema=schema)
nb = 5000
data = cf.gen_default_list_sparse_data(nb=nb)
# update float fields
@@ -10809,15 +10889,14 @@ class TestSearchGroupBy(TestcaseBase):
nq = 2
limit = 20
search_params = ct.default_sparse_search_params
search_vectors = cf.gen_default_list_sparse_data(nb=nq)[-1][-2:]
# verify the results are same if gourp by pk
search_vectors = cf.gen_default_list_sparse_data(nb=nq)[-1][0:nq]
# verify the results of group by
res = collection_w.search(data=search_vectors, anns_field=ct.default_sparse_vec_field_name,
param=search_params, limit=limit,
group_by_field="varchar",
output_fields=["varchar"],
check_task=CheckTasks.check_search_results,
check_items={"nq": nq, "limit": limit})
param=search_params, limit=limit,
group_by_field=ct.default_string_field_name,
output_fields=[ct.default_string_field_name],
check_task=CheckTasks.check_search_results,
check_items={"nq": nq, "limit": limit})[0]
hit = res[0]
set_varchar = set()
@@ -12772,25 +12851,31 @@ class TestSparseSearch(TestcaseBase):
self._connect()
c_name = cf.gen_unique_str(prefix)
schema = cf.gen_default_sparse_schema(auto_id=False)
collection_w, _ = self.collection_wrap.init_collection(c_name, schema=schema)
data = cf.gen_default_list_sparse_data(nb=4000)
collection_w = self.init_collection_wrap(c_name, schema=schema)
data = cf.gen_default_list_sparse_data(nb=3000)
collection_w.insert(data)
params = cf.get_index_params_params(index)
index_params = {"index_type": index, "metric_type": "IP", "params": params}
collection_w.create_index(ct.default_sparse_vec_field_name, index_params, index_name=index)
collection_w.load()
collection_w.search(data[-1][-1:], ct.default_sparse_vec_field_name,
collection_w.search(data[-1][0:default_nq], ct.default_sparse_vec_field_name,
ct.default_sparse_search_params, default_limit,
output_fields=[ct.default_sparse_vec_field_name],
check_task=CheckTasks.check_search_results,
check_items={"nq": default_nq,
"limit": default_limit})
"limit": default_limit,
"original_entities": [data],
"output_fields": [ct.default_sparse_vec_field_name]})
expr = "int64 < 100 "
collection_w.search(data[-1][-1:], ct.default_sparse_vec_field_name,
collection_w.search(data[-1][0:default_nq], ct.default_sparse_vec_field_name,
ct.default_sparse_search_params, default_limit,
expr,
expr=expr, output_fields=[ct.default_sparse_vec_field_name],
check_task=CheckTasks.check_search_results,
check_items={"nq": default_nq})
check_items={"nq": default_nq,
"limit": default_limit,
"original_entities": [data],
"output_fields": [ct.default_sparse_vec_field_name]})
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.parametrize("index", ct.all_index_types[9:11])
@@ -12804,7 +12889,7 @@ class TestSparseSearch(TestcaseBase):
self._connect()
c_name = cf.gen_unique_str(prefix)
schema = cf.gen_default_sparse_schema(auto_id=False)
collection_w, _ = self.collection_wrap.init_collection(c_name, schema=schema)
collection_w = self.init_collection_wrap(c_name, schema=schema)
data = cf.gen_default_list_sparse_data(dim=dim)
collection_w.insert(data)
params = cf.get_index_params_params(index)
@@ -12812,14 +12897,13 @@ class TestSparseSearch(TestcaseBase):
collection_w.create_index(ct.default_sparse_vec_field_name, index_params, index_name=index)
collection_w.load()
collection_w.search(data[-1][-1:], ct.default_sparse_vec_field_name,
ct.default_sparse_search_params, default_limit,
collection_w.search(data[-1][0:default_nq], ct.default_sparse_vec_field_name,
ct.default_sparse_search_params, limit=1,
check_task=CheckTasks.check_search_results,
check_items={"nq": default_nq,
"limit": default_limit})
"limit": 1})
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.skip(reason="issue #31485")
@pytest.mark.parametrize("index", ct.all_index_types[9:11])
def test_sparse_index_enable_mmap_search(self, index):
"""
@@ -12830,9 +12914,10 @@ class TestSparseSearch(TestcaseBase):
self._connect()
c_name = cf.gen_unique_str(prefix)
schema = cf.gen_default_sparse_schema(auto_id=False)
collection_w, _ = self.collection_wrap.init_collection(c_name, schema=schema)
collection_w = self.init_collection_wrap(c_name, schema=schema)
data = cf.gen_default_list_sparse_data()
first_nb = 3000
data = cf.gen_default_list_sparse_data(nb=first_nb, start=0)
collection_w.insert(data)
params = cf.get_index_params_params(index)
@@ -12840,18 +12925,28 @@ class TestSparseSearch(TestcaseBase):
collection_w.create_index(ct.default_sparse_vec_field_name, index_params, index_name=index)
collection_w.set_properties({'mmap.enabled': True})
pro = collection_w.describe().get("properties")
pro = collection_w.describe()[0].get("properties")
assert pro["mmap.enabled"] == 'True'
collection_w.alter_index(index, {'mmap.enabled': True})
assert collection_w.index().params["mmap.enabled"] == 'True'
assert collection_w.index()[0].params["mmap.enabled"] == 'True'
data2 = cf.gen_default_list_sparse_data(nb=2000, start=first_nb) # id shall be continuous
all_data = []  # combine the two inserted batches for the check below
for i in range(len(data2)):
all_data.append(data[i] + data2[i])
collection_w.insert(data2)
collection_w.flush()
collection_w.load()
collection_w.search(data[-1][-1:], ct.default_sparse_vec_field_name,
collection_w.search(data[-1][0:default_nq], ct.default_sparse_vec_field_name,
ct.default_sparse_search_params, default_limit,
output_fields=[ct.default_sparse_vec_field_name],
check_task=CheckTasks.check_search_results,
check_items={"nq": default_nq,
"limit": default_limit})
term_expr = f'{ct.default_int64_field_name} in [0, 1, 10, 100]'
res = collection_w.query(term_expr)
"limit": default_limit,
"original_entities": [all_data],
"output_fields": [ct.default_sparse_vec_field_name]})
expr_id_list = [0, 1, 10, 100]
term_expr = f'{ct.default_int64_field_name} in {expr_id_list}'
res = collection_w.query(term_expr)[0]
assert len(res) == 4
@pytest.mark.tags(CaseLabel.L1)
@@ -12866,15 +12961,15 @@ class TestSparseSearch(TestcaseBase):
self._connect()
c_name = cf.gen_unique_str(prefix)
schema = cf.gen_default_sparse_schema(auto_id=False)
collection_w, _ = self.collection_wrap.init_collection(c_name, schema=schema)
collection_w = self.init_collection_wrap(c_name, schema=schema)
data = cf.gen_default_list_sparse_data(nb=4000)
collection_w.insert(data)
params = {"index_type": index, "metric_type": "IP", "params": {"drop_ratio_build": ratio}}
collection_w.create_index(ct.default_sparse_vec_field_name, params, index_name=index)
collection_w.load()
assert collection_w.has_index(index_name=index) == True
assert collection_w.has_index(index_name=index)[0] is True
search_params = {"metric_type": "IP", "params": {"drop_ratio_search": ratio}}
collection_w.search(data[-1][-1:], ct.default_sparse_vec_field_name,
collection_w.search(data[-1][0:default_nq], ct.default_sparse_vec_field_name,
search_params, default_limit,
check_task=CheckTasks.check_search_results,
check_items={"nq": default_nq,
@@ -12891,7 +12986,7 @@ class TestSparseSearch(TestcaseBase):
self._connect()
c_name = cf.gen_unique_str(prefix)
schema = cf.gen_default_sparse_schema()
collection_w, _ = self.collection_wrap.init_collection(c_name, schema=schema)
collection_w = self.init_collection_wrap(c_name, schema=schema)
data = cf.gen_default_list_sparse_data(nb=4000)
collection_w.insert(data)
params = cf.get_index_params_params(index)
@@ -12899,9 +12994,9 @@ class TestSparseSearch(TestcaseBase):
collection_w.create_index(ct.default_sparse_vec_field_name, index_params, index_name=index)
collection_w.load()
d = cf.gen_default_list_sparse_data(nb=1)
collection_w.search(d[-1][-1:], ct.default_sparse_vec_field_name,
ct.default_sparse_search_params, 5,
d = cf.gen_default_list_sparse_data(nb=10)
collection_w.search(d[-1][0:default_nq], ct.default_sparse_vec_field_name,
ct.default_sparse_search_params, default_limit,
output_fields=["float", "sparse_vector"],
check_task=CheckTasks.check_search_results,
check_items={"nq": default_nq,
@@ -12911,6 +13006,7 @@ class TestSparseSearch(TestcaseBase):
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.parametrize("index", ct.all_index_types[9:11])
@pytest.mark.xfail(reason="issue #36174")
def test_sparse_vector_search_iterator(self, index):
"""
target: create sparse vectors and search iterator
@@ -12920,7 +13016,7 @@ class TestSparseSearch(TestcaseBase):
self._connect()
c_name = cf.gen_unique_str(prefix)
schema = cf.gen_default_sparse_schema()
collection_w, _ = self.collection_wrap.init_collection(c_name, schema=schema)
collection_w = self.init_collection_wrap(c_name, schema=schema)
data = cf.gen_default_list_sparse_data(nb=4000)
collection_w.insert(data)
params = cf.get_index_params_params(index)
@@ -12928,9 +13024,9 @@ class TestSparseSearch(TestcaseBase):
collection_w.create_index(ct.default_sparse_vec_field_name, index_params, index_name=index)
collection_w.load()
batch_size = 10
collection_w.search_iterator(data[-1][-1:], ct.default_sparse_vec_field_name,
ct.default_sparse_search_params, batch_size,
batch_size = 100
collection_w.search_iterator(data[-1][0:1], ct.default_sparse_vec_field_name,
ct.default_sparse_search_params, limit=500, batch_size=batch_size,
check_task=CheckTasks.check_search_iterator,
check_items={"batch_size": batch_size})