[test] Refine deploy test (#18212)

Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>
zhuwenxing 2022-07-11 17:08:25 +08:00 committed by GitHub
parent 1da970af18
commit dbbe6557ed
17 changed files with 893 additions and 23 deletions

View File

@@ -122,10 +122,10 @@ class TestcaseBase(Base):
**kwargs)
return partition_wrap
def init_collection_general(self, prefix, insert_data=False, nb=ct.default_nb,
def init_collection_general(self, prefix="test", insert_data=False, nb=ct.default_nb,
partition_num=0, is_binary=False, is_all_data_type=False,
auto_id=False, dim=ct.default_dim, is_index=False,
primary_field=ct.default_int64_field_name, is_flush=True):
primary_field=ct.default_int64_field_name, is_flush=True, name=None, **kwargs):
"""
target: create specified collections
method: 1. create collections (binary/non-binary, default/all data type, auto_id or not)
@@ -138,6 +138,8 @@ class TestcaseBase(Base):
log.info("Test case of search interface: initialize before test case")
self._connect()
collection_name = cf.gen_unique_str(prefix)
if name is not None:
collection_name = name
vectors = []
binary_raw_vectors = []
insert_ids = []
@@ -149,8 +151,7 @@
if is_all_data_type:
default_schema = cf.gen_collection_schema_all_datatype(auto_id=auto_id, dim=dim, primary_field=primary_field)
log.info("init_collection_general: collection creation")
collection_w = self.init_collection_wrap(name=collection_name,
schema=default_schema)
collection_w = self.init_collection_wrap(name=collection_name, schema=default_schema, **kwargs)
# 2 add extra partitions if specified (default is 1 partition named "_default")
if partition_num > 0:
cf.gen_partitions(collection_w, partition_num)
@@ -161,8 +162,6 @@
if is_flush:
assert collection_w.is_empty is False
assert collection_w.num_entities == nb
log.info("insert_data: inserted data into collection %s (num_entities: %s)"
% (collection_w.name, nb))
# This condition will be removed after the auto-index feature is supported
if not is_index:
collection_w.load()
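For reviewers, a minimal usage sketch (not part of the diff) of the new name and **kwargs parameters; the class and test names here are hypothetical, while active_trace=True is one of the kwargs the deploy tests below forward through to init_collection_wrap:

from base.client_base import TestcaseBase

class TestReuseCollection(TestcaseBase):
    def test_reuse_by_name(self):
        # first run: create the collection under a fixed, predictable name
        collection_w = self.init_collection_general(insert_data=True, nb=3000,
                                                    name="task_1_IVF_FLAT",
                                                    active_trace=True)[0]
        # a later run (e.g. after a reinstall) with the same name re-attaches to
        # the existing collection instead of generating a unique name from the prefix
        assert collection_w.name == "task_1_IVF_FLAT"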

View File

@@ -299,7 +299,6 @@ class ApiCollectionWrapper:
check_result = ResponseChecker(res, func_name, check_task, check_items, check, **kwargs).run()
return res, check_result
@trace()
def get_compaction_state(self, timeout=None, check_task=None, check_items=None, **kwargs):
timeout = TIMEOUT if timeout is None else timeout
func_name = sys._getframe().f_code.co_name
@@ -307,7 +306,6 @@ class ApiCollectionWrapper:
check_result = ResponseChecker(res, func_name, check_task, check_items, check, **kwargs).run()
return res, check_result
@trace()
def get_compaction_plans(self, timeout=None, check_task=None, check_items={}, **kwargs):
timeout = TIMEOUT if timeout is None else timeout
func_name = sys._getframe().f_code.co_name
@@ -315,7 +313,6 @@ class ApiCollectionWrapper:
check_result = ResponseChecker(res, func_name, check_task, check_items, check, **kwargs).run()
return res, check_result
@trace()
def wait_for_compaction_completed(self, timeout=None, **kwargs):
timeout = TIMEOUT * 3 if timeout is None else timeout
res = self.collection.wait_for_compaction_completed(timeout, **kwargs)

View File

@@ -62,6 +62,9 @@ class ResponseChecker:
elif self.check_task == CheckTasks.check_query_empty:
result = self.check_query_empty(self.response, self.func_name)
elif self.check_task == CheckTasks.check_query_not_empty:
result = self.check_query_not_empty(self.response, self.func_name)
elif self.check_task == CheckTasks.check_distance:
# Response check for the calc_distance interface
result = self.check_distance(self.response, self.func_name, self.check_items)
@@ -282,6 +285,23 @@ class ResponseChecker:
raise Exception("The query result to check isn't list type object")
assert len(query_res) == 0, "Query result is not empty"
@staticmethod
def check_query_not_empty(query_res, func_name):
"""
Verify that the query result is not empty
:param query_res: a list that contains all query results
:type query_res: list
:param func_name: Query API name
:type func_name: str
"""
if func_name != 'query':
log.warning("The function name is {} rather than {}".format(func_name, "query"))
if not isinstance(query_res, list):
raise Exception("The query result to check isn't list type object")
assert len(query_res) > 0, "Query result is empty"
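A hedged usage sketch, mirroring the calls added later in this commit: a test routes its query result through the new task via check_task, and the checker asserts that at least one row came back:

collection_w.query(default_term_expr,
                   output_fields=[ct.default_int64_field_name],
                   check_task=CheckTasks.check_query_not_empty)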
@staticmethod
def check_distance(distance_res, func_name, check_items):
if func_name != 'calc_distance':

View File

@@ -112,7 +112,15 @@ def gen_default_collection_schema(description=ct.default_desc, primary_field=ct.
primary_field=primary_field, auto_id=auto_id)
return schema
def gen_general_collection_schema(description=ct.default_desc, primary_field=ct.default_int64_field_name,
auto_id=False, is_binary=False, dim=ct.default_dim):
if is_binary:
fields = [gen_int64_field(), gen_float_field(), gen_string_field(), gen_binary_vec_field(dim=dim)]
else:
fields = [gen_int64_field(), gen_float_field(), gen_string_field(), gen_float_vec_field(dim=dim)]
schema, _ = ApiCollectionSchemaWrapper().init_collection_schema(fields=fields, description=description,
primary_field=primary_field, auto_id=auto_id)
return schema
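For illustration, the new helper covers both schema variants with a single call; dim=128 matching ct.default_dim is an assumption:

binary_schema = gen_general_collection_schema(is_binary=True, dim=128)   # int64/float/string + binary vector
float_schema = gen_general_collection_schema(is_binary=False, dim=128)   # int64/float/string + float vector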
def gen_string_pk_default_collection_schema(description=ct.default_desc, primary_field=ct.default_string_field_name,
auto_id=False, dim=ct.default_dim):
@@ -637,8 +645,7 @@ def insert_data(collection_w, nb=3000, is_binary=False, is_all_data_type=False,
binary_raw_vectors = []
insert_ids = []
start = insert_offset
log.info("insert_data: inserting data into collection %s (num_entities: %s)"
% (collection_w.name, nb))
log.info(f"inserted {nb} data into collection {collection_w.name}")
for i in range(num):
default_data = gen_default_dataframe_data(nb // num, dim=dim, start=start)
if is_binary:

View File

@@ -186,6 +186,7 @@ class CheckTasks:
check_search_results = "check_search_results"
check_query_results = "check_query_results"
check_query_empty = "check_query_empty" # verify that query result is empty
check_query_not_empty = "check_query_not_empty"
check_distance = "check_distance"
check_delete_compact = "check_delete_compact"
check_merge_compact = "check_merge_compact"

View File

View File

@@ -0,0 +1,10 @@
from base.client_base import TestcaseBase
from utils.util_log import test_log as log
class TestDeployBase(TestcaseBase):
def teardown_method(self, method):
log.info(("*" * 35) + " teardown " + ("*" * 35))
log.info("[teardown_method] Start teardown test case %s..." % method.__name__)
log.info("skip drop collection")

View File

@@ -0,0 +1,63 @@
import json
from utils.util_log import test_log as log
all_index_types = ["FLAT", "IVF_FLAT", "IVF_SQ8", "IVF_PQ", "HNSW", "ANNOY", "RHNSW_FLAT", "RHNSW_PQ", "RHNSW_SQ",
"BIN_FLAT", "BIN_IVF_FLAT"]
default_index_params = [{"nlist": 128}, {"nlist": 128}, {"nlist": 128}, {"nlist": 128, "m": 16, "nbits": 8},
{"M": 48, "efConstruction": 500}, {"n_trees": 50}, {"M": 48, "efConstruction": 500},
{"M": 48, "efConstruction": 500, "PQM": 8}, {"M": 48, "efConstruction": 500}, {"nlist": 128},
{"nlist": 128}]
index_params_map = dict(zip(all_index_types, default_index_params))
def gen_index_param(index_type):
metric_type = "L2"
if "BIN" in index_type:
metric_type = "HAMMING"
index_param = {
"index_type": index_type,
"params": index_params_map[index_type],
"metric_type": metric_type
}
return index_param
def gen_search_param(index_type, metric_type="L2"):
search_params = []
if index_type in ["FLAT", "IVF_FLAT", "IVF_SQ8", "IVF_SQ8H", "IVF_PQ"]:
for nprobe in [10]:
ivf_search_params = {"metric_type": metric_type, "params": {"nprobe": nprobe}}
search_params.append(ivf_search_params)
elif index_type in ["BIN_FLAT", "BIN_IVF_FLAT"]:
for nprobe in [10]:
bin_search_params = {"metric_type": "HAMMING", "params": {"nprobe": nprobe}}
search_params.append(bin_search_params)
elif index_type in ["HNSW", "RHNSW_FLAT", "RHNSW_PQ", "RHNSW_SQ"]:
for ef in [64]:
hnsw_search_param = {"metric_type": metric_type, "params": {"ef": ef}}
search_params.append(hnsw_search_param)
elif index_type in ["NSG", "RNSG"]:
for search_length in [100]:
nsg_search_param = {"metric_type": metric_type, "params": {"search_length": search_length}}
search_params.append(nsg_search_param)
elif index_type == "ANNOY":
for search_k in [1000]:
annoy_search_param = {"metric_type": metric_type, "params": {"search_k": search_k}}
search_params.append(annoy_search_param)
else:
log.error(f"Invalid index_type: {index_type}")
raise Exception(f"Invalid index_type: {index_type}")
return search_params
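For illustration (not part of the diff), the two helpers compose as follows; the values come straight from index_params_map above:

index_param = gen_index_param("BIN_IVF_FLAT")
# {"index_type": "BIN_IVF_FLAT", "params": {"nlist": 128}, "metric_type": "HAMMING"}
search_param = gen_search_param("BIN_IVF_FLAT")[0]
# {"metric_type": "HAMMING", "params": {"nprobe": 10}}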
def get_all_collections():
try:
with open("/tmp/ci_logs/all_collections.json", "r") as f:
data = json.load(f)
all_collections = data["all"]
except Exception as e:
log.error(f"get_all_collections error: {e}")
return []
return all_collections
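A hedged sketch of the file layout this helper expects; the file itself is written by the collection-listing test later in this commit:

# /tmp/ci_logs/all_collections.json is expected to look like:
# {"all": ["task_1_IVF_FLAT", "task_2_HNSW", "..."]}
names = get_all_collections()  # returns [] if the file is missing or malformed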

View File

@@ -0,0 +1,31 @@
import logging
import pytest
import functools
import socket
import common.common_type as ct
import common.common_func as cf
from utils.util_log import test_log as log
from common.common_func import param_info
from check.param_check import ip_check, number_check
from config.log_config import log_config
from utils.util_pymilvus import get_milvus, gen_unique_str, gen_default_fields, gen_binary_default_fields
from pymilvus.orm.types import CONSISTENCY_STRONG
timeout = 60
dimension = 128
delete_timeout = 60
def pytest_addoption(parser):
parser.addoption('--data_size', type=int, action='store', default=3000, help="data size for deploy test")
@pytest.fixture
def data_size(request):
return request.config.getoption("--data_size")
# add a fixture for all index?
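Hedged usage sketch of how the option and fixture tie together (the test name is hypothetical):

# run the deploy suite with a non-default data size:
#   pytest testcases/ -v --data_size 10000
def test_uses_data_size(data_size):  # hypothetical consumer of the fixture
    assert data_size > 0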

View File

@@ -0,0 +1,202 @@
import pytest
import random
from common import common_func as cf
from common import common_type as ct
from common.common_type import CaseLabel, CheckTasks
from common.milvus_sys import MilvusSys
from utils.util_pymilvus import *
from deploy.base import TestDeployBase
from deploy import common as dc
from deploy.common import gen_index_param, gen_search_param
default_nb = ct.default_nb
default_nq = ct.default_nq
default_dim = ct.default_dim
default_limit = ct.default_limit
default_search_field = ct.default_float_vec_field_name
default_search_params = ct.default_search_params
default_int64_field_name = ct.default_int64_field_name
default_float_field_name = ct.default_float_field_name
default_bool_field_name = ct.default_bool_field_name
default_string_field_name = ct.default_string_field_name
binary_field_name = default_binary_vec_field_name
default_search_exp = "int64 >= 0"
default_term_expr = f'{ct.default_int64_field_name} in [0, 1]'
class TestActionBeforeReinstall(TestDeployBase):
""" Test case of action before reinstall """
def teardown_method(self, method):
log.info(("*" * 35) + " teardown " + ("*" * 35))
log.info("[teardown_method] Start teardown test case %s..." %
method.__name__)
log.info("skip drop collection")
@pytest.mark.skip()
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("index_type", dc.all_index_types) # , "BIN_FLAT"
def test_task_1(self, index_type, data_size):
"""
before reinstall: create collection and insert data, load and search
after reinstall: get collection, search, create index, load, and search
"""
name = "task_1_" + index_type
insert_data = False
is_binary = "BIN" in index_type
is_flush = False
# init collection
collection_w = self.init_collection_general(insert_data=insert_data, is_binary=is_binary, nb=data_size,
is_flush=is_flush, name=name)[0]
if is_binary:
_, vectors_to_search = cf.gen_binary_vectors(
default_nb, default_dim)
default_search_field = ct.default_binary_vec_field_name
else:
vectors_to_search = cf.gen_vectors(default_nb, default_dim)
default_search_field = ct.default_float_vec_field_name
search_params = gen_search_param(index_type)[0]
# search
collection_w.search(vectors_to_search[:default_nq], default_search_field,
search_params, default_limit,
default_search_exp,
check_task=CheckTasks.check_search_results,
check_items={"nq": default_nq,
"limit": default_limit})
# query
output_fields = [ct.default_int64_field_name]
collection_w.query(default_term_expr, output_fields=output_fields,
check_task=CheckTasks.check_query_not_empty)
# create index
default_index = gen_index_param(index_type)
collection_w.create_index(default_search_field, default_index)
# release and load after creating index
collection_w.release()
collection_w.load()
# search
collection_w.search(vectors_to_search[:default_nq], default_search_field,
search_params, default_limit,
default_search_exp,
check_task=CheckTasks.check_search_results,
check_items={"nq": default_nq,
"limit": default_limit})
# query
output_fields = [ct.default_int64_field_name]
collection_w.query(default_term_expr, output_fields=output_fields,
check_task=CheckTasks.check_query_not_empty)
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("index_type", dc.all_index_types) # , "BIN_FLAT"
def test_task_2(self, index_type, data_size):
"""
before reinstall: create collection, insert data, create index, load and search
after reinstall: get collection, search, insert data, create index, load, and search
"""
name = "task_2_" + index_type
is_binary = "BIN" in index_type
# init collection
collection_w = self.init_collection_general(insert_data=False, is_binary=is_binary, nb=data_size,
is_flush=False, name=name, active_trace=True)[0]
vectors_to_search = cf.gen_vectors(default_nb, default_dim)
default_search_field = ct.default_float_vec_field_name
if is_binary:
_, vectors_to_search = cf.gen_binary_vectors(
default_nb, default_dim)
default_search_field = ct.default_binary_vec_field_name
search_params = gen_search_param(index_type)[0]
output_fields = [ct.default_int64_field_name]
# search
collection_w.search(vectors_to_search[:default_nq], default_search_field,
search_params, default_limit,
default_search_exp,
output_fields=output_fields,
check_task=CheckTasks.check_search_results,
check_items={"nq": default_nq,
"limit": default_limit})
# query
collection_w.query(default_term_expr, output_fields=output_fields,
check_task=CheckTasks.check_query_not_empty)
# insert data
self.init_collection_general(insert_data=True, is_binary=is_binary, nb=data_size,
is_flush=False, name=name, active_trace=True)
# create index
default_index = gen_index_param(index_type)
collection_w.create_index(default_search_field, default_index)
# release and load after creating index
collection_w.release()
collection_w.load()
# search
collection_w.search(vectors_to_search[:default_nq], default_search_field,
search_params, default_limit,
default_search_exp,
output_fields=output_fields,
check_task=CheckTasks.check_search_results,
check_items={"nq": default_nq,
"limit": default_limit})
# query
collection_w.query(default_term_expr, output_fields=output_fields,
check_task=CheckTasks.check_query_not_empty)
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("replica_number", [0,1,2])
@pytest.mark.parametrize("is_compacted", [True, False])
@pytest.mark.parametrize("is_deleted", [True, False])
@pytest.mark.parametrize("is_string_indexed", [True, False])
@pytest.mark.parametrize("is_vector_indexed", [True, False]) # , "BIN_FLAT"
@pytest.mark.parametrize("segment_status", ["only_growing", "sealed", "all"]) # , "BIN_FLAT"
# @pytest.mark.parametrize("is_empty", [True, False]) # , "BIN_FLAT" (keep one is enough)
@pytest.mark.parametrize("index_type", random.sample(dc.all_index_types, 3)) # , "BIN_FLAT"
def test_task_all(self, index_type, is_compacted,
segment_status, is_vector_indexed, is_string_indexed, replica_number, is_deleted, data_size):
"""
before reinstall: create collection and insert data, load and search
after reinstall: get collection, search, create index, load, and search
"""
name = f"index_type_{index_type}_segment_status_{segment_status}_is_vector_indexed_{is_vector_indexed}_is_string_indexed_{is_string_indexed}_is_compacted_{is_compacted}_is_deleted_{is_deleted}_replica_number_{replica_number}_data_size_{data_size}"
ms = MilvusSys()
is_binary = "BIN" in index_type
# insert a small amount of data without flushing to produce a growing segment
collection_w = self.init_collection_general(insert_data=True, is_binary=is_binary, nb=3000,
is_flush=False, name=name)[0]
# load for growing segment
if replica_number > 0:
collection_w.load(replica_number=replica_number)
delete_expr = f"{ct.default_int64_field_name} in [0,1,2,3,4,5,6,7,8,9]"
# delete data for growing segment
if is_deleted:
collection_w.delete(expr=delete_expr)
if segment_status == "only_growing":
pytest.skip("already get growing segment, skip testcase")
# insert and flush multiple times to generate multiple sealed segments
for i in range(5):
self.init_collection_general(insert_data=True, is_binary=is_binary, nb=data_size,
is_flush=False, name=name)[0]
if is_binary:
default_index_field = ct.default_binary_vec_field_name
else:
default_index_field = ct.default_float_vec_field_name
if is_vector_indexed:
# create index
default_index_param = gen_index_param(index_type)
collection_w.create_index(default_index_field, default_index_param)
if is_string_indexed:
# create index
default_string_index_params = {}
collection_w.create_index(default_string_field_name, default_string_index_params)
# delete data for sealed segment
delete_expr = f"{ct.default_int64_field_name} in [10,11,12,13,14,15,16,17,18,19]"
if is_deleted:
collection_w.delete(expr=delete_expr)
if is_compacted:
collection_w.compact()
# reload after flushing and creating index
if replica_number > 0:
collection_w.release()
collection_w.load(replica_number=replica_number)

View File

@@ -0,0 +1,125 @@
import pytest
from common import common_func as cf
from common import common_type as ct
from common.common_type import CaseLabel, CheckTasks
from utils.util_pymilvus import *
from deploy.base import TestDeployBase
from deploy import common as dc
from deploy.common import gen_index_param, gen_search_param
default_nb = ct.default_nb
default_nq = ct.default_nq
default_dim = ct.default_dim
default_limit = ct.default_limit
default_search_field = ct.default_float_vec_field_name
default_search_params = ct.default_search_params
default_int64_field_name = ct.default_int64_field_name
default_float_field_name = ct.default_float_field_name
default_bool_field_name = ct.default_bool_field_name
default_string_field_name = ct.default_string_field_name
binary_field_name = default_binary_vec_field_name
default_search_exp = "int64 >= 0"
default_term_expr = f'{ct.default_int64_field_name} in [0, 1]'
class TestActionBeforeReinstall(TestDeployBase):
""" Test case of action before reinstall """
def teardown_method(self, method):
log.info(("*" * 35) + " teardown " + ("*" * 35))
log.info("[teardown_method] Start teardown test case %s..." % method.__name__)
log.info("skip drop collection")
@pytest.mark.skip()
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("index_type", dc.all_index_types) #, "BIN_FLAT"
def test_task_1(self, index_type, data_size):
"""
before reinstall: create collection and insert data, load and search
after reinstall: get collection, load, search, create index, load, and search
"""
name = "task_1_" + index_type
insert_data = True
is_binary = "BIN" in index_type
is_flush = False
collection_w = self.init_collection_general(insert_data=insert_data, is_binary=is_binary, nb=data_size,
is_flush=is_flush, name=name)[0]
collection_w.load()
if is_binary:
_, vectors_to_search = cf.gen_binary_vectors(default_nb, default_dim)
default_search_field = ct.default_binary_vec_field_name
else:
vectors_to_search = cf.gen_vectors(default_nb, default_dim)
default_search_field = ct.default_float_vec_field_name
search_params = gen_search_param(index_type)[0]
collection_w.search(vectors_to_search[:default_nq], default_search_field,
search_params, default_limit,
default_search_exp,
check_task=CheckTasks.check_search_results,
check_items={"nq": default_nq,
"limit": default_limit})
output_fields = [ct.default_int64_field_name]
collection_w.query(default_term_expr, output_fields=output_fields,
check_task=CheckTasks.check_query_not_empty)
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("index_type", dc.all_index_types) # , "BIN_FLAT"
def test_task_2(self, index_type, data_size):
"""
before reinstall: create collection, insert data, create index, load and search
after reinstall: get collection, load, search, insert data, create index, load, and search
"""
name = "task_2_" + index_type
insert_data = True
is_binary = "BIN" in index_type
is_flush = False
# create collection and insert data
collection_w = self.init_collection_general(insert_data=insert_data, is_binary=is_binary, nb=data_size,
is_flush=is_flush, name=name, active_trace=True)[0]
vectors_to_search = cf.gen_vectors(default_nb, default_dim)
default_search_field = ct.default_float_vec_field_name
if is_binary:
_, vectors_to_search = cf.gen_binary_vectors(default_nb, default_dim)
default_search_field = ct.default_binary_vec_field_name
# create index
default_index = gen_index_param(index_type)
collection_w.create_index(default_search_field, default_index)
# load
collection_w.load()
# search
search_params = gen_search_param(index_type)[0]
collection_w.search(vectors_to_search[:default_nq], default_search_field,
search_params, default_limit,
default_search_exp,
check_task=CheckTasks.check_search_results,
check_items={"nq": default_nq,
"limit": default_limit})
# query
output_fields = [ct.default_int64_field_name]
collection_w.query(default_term_expr, output_fields=output_fields,
check_task=CheckTasks.check_query_not_empty)

View File

@@ -0,0 +1,176 @@
import pytest
from common import common_func as cf
from common import common_type as ct
from common.common_type import CaseLabel, CheckTasks
from common.milvus_sys import MilvusSys
from utils.util_pymilvus import *
from deploy.base import TestDeployBase
from deploy.common import gen_index_param, gen_search_param
from utils.util_log import test_log as log
default_nb = ct.default_nb
default_nq = ct.default_nq
default_dim = ct.default_dim
default_limit = ct.default_limit
default_search_field = ct.default_float_vec_field_name
default_search_params = ct.default_search_params
default_int64_field_name = ct.default_int64_field_name
default_float_field_name = ct.default_float_field_name
default_bool_field_name = ct.default_bool_field_name
default_string_field_name = ct.default_string_field_name
binary_field_name = default_binary_vec_field_name
default_search_exp = "int64 >= 0"
default_term_expr = f'{ct.default_int64_field_name} in [0, 1]'
prefix = "test_reinstall"
class TestActionBeforeReinstall(TestDeployBase):
""" Test case of action before reinstall """
def teardown_method(self, method):
log.info(("*" * 35) + " teardown " + ("*" * 35))
log.info("[teardown_method] Start teardown test case %s..." %
method.__name__)
log.info("skip drop collection")
@pytest.mark.tags(CaseLabel.L3)
def test_task_all_empty(self):
"""
before reinstall: create collection
"""
name = cf.gen_unique_str(prefix)
self.init_collection_general(insert_data=False, name=name)[0]
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("replica_number", [1])
@pytest.mark.parametrize("is_compacted", ["is_compacted"])
@pytest.mark.parametrize("is_deleted", ["is_deleted"])
@pytest.mark.parametrize("is_string_indexed", ["is_string_indexed", "not_string_indexed"])
@pytest.mark.parametrize("is_vector_indexed", ["is_vector_indexed", "not_vector_indexed"])
@pytest.mark.parametrize("segment_status", ["only_growing", "only_sealed", "all"])
@pytest.mark.parametrize("index_type", ["IVF_FLAT"]) #"IVF_FLAT", "HNSW", "BIN_IVF_FLAT"
def test_task_all(self, index_type, is_compacted,
segment_status, is_vector_indexed, is_string_indexed, replica_number, is_deleted, data_size):
"""
before reinstall: create collection and insert data, load and search
"""
name = ""
for k, v in locals().items():
if k in ["self", "name"]:
continue
name += f"_{k}_{v}"
name = prefix + name
self._connect()
ms = MilvusSys()
if len(ms.query_nodes) < replica_number:
# this step makes sure the test case can run in standalone mode
# or in a cluster that has only one query node
pytest.skip("skip test, not enough nodes")
log.info(f"collection name: {name}, replica_number: {replica_number}, is_compacted: {is_compacted},"
f"is_deleted: {is_deleted}, is_vector_indexed: {is_vector_indexed}, is_string_indexed: {is_string_indexed},"
f"segment_status: {segment_status}, index_type: {index_type}")
is_binary = "BIN" in index_type
# params for search and query
if is_binary:
_, vectors_to_search = cf.gen_binary_vectors(
default_nb, default_dim)
default_search_field = ct.default_binary_vec_field_name
else:
vectors_to_search = cf.gen_vectors(default_nb, default_dim)
default_search_field = ct.default_float_vec_field_name
search_params = gen_search_param(index_type)[0]
# init collection and insert a small amount of data without flushing to produce a growing segment
collection_w = self.init_collection_general(insert_data=True, is_binary=is_binary, nb=3000,
is_flush=False, is_index=True, name=name)[0]
# load for growing segment
if replica_number >= 1:
try:
collection_w.release()
except Exception as e:
log.error(f"release collection failed: {e}, maybe the collection is not loaded")
collection_w.load(replica_number=replica_number)
# delete data for growing segment
delete_expr = f"{ct.default_int64_field_name} in [0,1,2,3,4,5,6,7,8,9]"
if is_deleted == "is_deleted":
collection_w.delete(expr=delete_expr)
# search and query for growing segment
if replica_number >= 1:
collection_w.search(vectors_to_search[:default_nq], default_search_field,
search_params, default_limit,
default_search_exp,
check_task=CheckTasks.check_search_results,
check_items={"nq": default_nq,
"limit": default_limit})
output_fields = [ct.default_int64_field_name]
collection_w.query(default_term_expr, output_fields=output_fields,
check_task=CheckTasks.check_query_not_empty)
# skip subsequent operations when segment_status is set to only_growing
if segment_status == "only_growing":
pytest.skip("growing segment already generated, skip subsequent operations")
# insert and flush multiple times to generate multiple sealed segments
for i in range(2):
self.init_collection_general(insert_data=True, is_binary=is_binary, nb=data_size,
is_flush=False, is_index=True, name=name)
collection_w.flush()
# params for creating index
if is_binary:
default_index_field = ct.default_binary_vec_field_name
else:
default_index_field = ct.default_float_vec_field_name
# create index for vector
if is_vector_indexed == "is_vector_indexed":
default_index_param = gen_index_param(index_type)
collection_w.create_index(default_index_field, default_index_param)
# create index for string
if is_string_indexed == "is_string_indexed":
default_string_index_params = {}
default_string_index_name = "_default_string_idx"
collection_w.create_index(
default_string_field_name, default_string_index_params, index_name=default_string_index_name)
# delete data for sealed segment
delete_expr = f"{ct.default_int64_field_name} in [10,11,12,13,14,15,16,17,18,19]"
if is_deleted == "is_deleted":
collection_w.delete(expr=delete_expr)
if is_compacted == "is_compacted":
collection_w.compact()
if segment_status == "all":
self.init_collection_general(insert_data=True, is_binary=is_binary, nb=3000,
is_flush=False, is_index=True, name=name)
# reload after flushing and creating index
if replica_number > 0:
collection_w.release()
collection_w.load(replica_number=replica_number)
# insert data to get growing segment
if segment_status == "all":
self.init_collection_general(insert_data=True, is_binary=is_binary, nb=3000,
is_flush=False, is_index=True, name=name)
# search and query for sealed and growing segment
if replica_number > 0:
collection_w.search(vectors_to_search[:default_nq], default_search_field,
search_params, default_limit,
default_search_exp,
check_task=CheckTasks.check_search_results,
check_items={"nq": default_nq,
"limit": default_limit})
output_fields = [ct.default_int64_field_name]
collection_w.query(default_term_expr, output_fields=output_fields,
check_task=CheckTasks.check_query_not_empty)

View File

@@ -0,0 +1,198 @@
import pytest
from common import common_func as cf
from common import common_type as ct
from common.common_type import CaseLabel, CheckTasks
from common.milvus_sys import MilvusSys
from utils.util_pymilvus import *
from deploy.base import TestDeployBase
from deploy.common import gen_index_param, gen_search_param, get_all_collections
from utils.util_log import test_log as log
default_nb = ct.default_nb
default_nq = ct.default_nq
default_dim = ct.default_dim
default_limit = ct.default_limit
default_search_field = ct.default_float_vec_field_name
default_search_params = ct.default_search_params
default_int64_field_name = ct.default_int64_field_name
default_float_field_name = ct.default_float_field_name
default_bool_field_name = ct.default_bool_field_name
default_string_field_name = ct.default_string_field_name
binary_field_name = default_binary_vec_field_name
default_search_exp = "int64 >= 0"
default_term_expr = f'{ct.default_int64_field_name} in [0, 1]'
class TestActionBeforeReinstall(TestDeployBase):
""" Test case of action before reinstall """
@pytest.fixture(scope="function", params=get_all_collections())
def collection_name(self, request):
if request.param == [] or request.param == "":
pytest.skip("The collection name is invalid")
yield request.param
def teardown_method(self, method):
log.info(("*" * 35) + " teardown " + ("*" * 35))
log.info("[teardown_method] Start teardown test case %s..." %
method.__name__)
log.info("skip drop collection")
@pytest.mark.tags(CaseLabel.L3)
def test_check(self, collection_name, data_size):
"""
before reinstall: create collection
"""
self._connect()
ms = MilvusSys()
name = collection_name
collection_w = self.init_collection_general(
insert_data=False, name=name, active_trace=True)[0]
schema = collection_w.schema
data_type = [field.dtype.name for field in schema.fields]
field_name = [field.name for field in schema.fields]
type_field_map = dict(zip(data_type, field_name))
is_binary = "BINARY_VECTOR" in data_type
if is_binary:
default_index_field = ct.default_binary_vec_field_name
vector_index_type = "BIN_FLAT"
else:
default_index_field = ct.default_float_vec_field_name
vector_index_type = "IVF_FLAT"
is_vector_indexed = False
is_string_indexed = False
indexed_fields = [index.field_name for index in collection_w.indexes]
binary_vector_index_types = [index.params["index_type"] for index in collection_w.indexes if index.field_name == type_field_map.get("BINARY_VECTOR", "")]
float_vector_index_types = [index.params["index_type"] for index in collection_w.indexes if index.field_name == type_field_map.get("FLOAT_VECTOR", "")]
string_index_types = [index.params["index_type"] for index in collection_w.indexes if index.field_name == type_field_map.get("VARCHAR", "")]
index_names = [index.index_name for index in collection_w.indexes] # used to drop index
vector_index_types = binary_vector_index_types + float_vector_index_types
if len(vector_index_types) > 0:
is_vector_indexed = True
vector_index_type = vector_index_types[0]
if len(string_index_types) > 0:
is_string_indexed = True
try:
replicas, _ = collection_w.get_replicas(enable_traceback=False)
replicas_loaded = len(replicas.groups)
except Exception as e:
log.error(f"get replicas failed: {e}")
replicas_loaded = 0
# params for search and query
if is_binary:
_, vectors_to_search = cf.gen_binary_vectors(
default_nb, default_dim)
default_search_field = ct.default_binary_vec_field_name
else:
vectors_to_search = cf.gen_vectors(default_nb, default_dim)
default_search_field = ct.default_float_vec_field_name
search_params = gen_search_param(vector_index_type)[0]
# load if not loaded
if replicas_loaded == 0:
collection_w.load()
# search and query
collection_w.search(vectors_to_search[:default_nq], default_search_field,
search_params, default_limit,
default_search_exp,
output_fields=[ct.default_int64_field_name],
check_task=CheckTasks.check_search_results,
check_items={"nq": default_nq,
"limit": default_limit})
collection_w.query(default_term_expr, output_fields=[ct.default_int64_field_name],
check_task=CheckTasks.check_query_not_empty)
# flush by accessing num_entities
collection_w.num_entities
# search and query
collection_w.search(vectors_to_search[:default_nq], default_search_field,
search_params, default_limit,
default_search_exp,
output_fields=[ct.default_int64_field_name],
check_task=CheckTasks.check_search_results,
check_items={"nq": default_nq,
"limit": default_limit})
collection_w.query(default_term_expr, output_fields=[ct.default_int64_field_name],
check_task=CheckTasks.check_query_not_empty)
# insert data and flush
for i in range(2):
self.init_collection_general(insert_data=True, is_binary=is_binary, nb=data_size,
is_flush=False, is_index=True, name=name)
collection_w.num_entities
# delete data
delete_expr = f"{ct.default_int64_field_name} in [0,1,2,3,4,5,6,7,8,9]"
collection_w.delete(expr=delete_expr)
# search and query
collection_w.search(vectors_to_search[:default_nq], default_search_field,
search_params, default_limit,
default_search_exp,
output_fields=[ct.default_int64_field_name],
check_task=CheckTasks.check_search_results,
check_items={"nq": default_nq,
"limit": default_limit})
collection_w.query(default_term_expr, output_fields=[ct.default_int64_field_name],
check_task=CheckTasks.check_query_not_empty)
# drop indexes if any exist
if len(index_names) > 0:
for index_name in index_names:
collection_w.drop_index(index_name=index_name)
# search and query after dropping index
collection_w.search(vectors_to_search[:default_nq], default_search_field,
search_params, default_limit,
default_search_exp,
output_fields=[ct.default_int64_field_name],
check_task=CheckTasks.check_search_results,
check_items={"nq": default_nq,
"limit": default_limit})
collection_w.query(default_term_expr, output_fields=[ct.default_int64_field_name],
check_task=CheckTasks.check_query_not_empty)
# create index
default_index_param = gen_index_param(vector_index_type)
collection_w.create_index(default_index_field, default_index_param, index_name=cf.gen_unique_str())
collection_w.create_index(default_string_field_name, {}, index_name=cf.gen_unique_str())
# search and query
collection_w.search(vectors_to_search[:default_nq], default_search_field,
search_params, default_limit,
default_search_exp,
output_fields=[ct.default_int64_field_name],
check_task=CheckTasks.check_search_results,
check_items={"nq": default_nq,
"limit": default_limit})
collection_w.query(default_term_expr, output_fields=[ct.default_int64_field_name],
check_task=CheckTasks.check_query_not_empty)
# release and reload with changed replicas
collection_w.release()
replica_number = 1
if replicas_loaded in [0, 1] and len(ms.query_nodes) >= 2:
replica_number = 2
collection_w.load(replica_number=replica_number)
# search and query
collection_w.search(vectors_to_search[:default_nq], default_search_field,
search_params, default_limit,
default_search_exp,
output_fields=[ct.default_int64_field_name],
check_task=CheckTasks.check_search_results,
check_items={"nq": default_nq,
"limit": default_limit})
collection_w.query(default_term_expr, output_fields=[ct.default_int64_field_name],
check_task=CheckTasks.check_query_not_empty)

View File

@@ -0,0 +1,37 @@
import time
import json
from collections import defaultdict
import pytest
from base.client_base import TestcaseBase
from common import common_func as cf
from common import common_type as ct
from deploy.common import get_all_collections
from common.common_type import CaseLabel
from utils.util_log import test_log as log
class TestGetCollections(TestcaseBase):
""" Test case of end to end"""
def teardown_method(self, method):
log.info(("*" * 35) + " teardown " + ("*" * 35))
log.info("[teardown_method] Start teardown test case %s..." %
method.__name__)
@pytest.mark.tags(CaseLabel.L1)
def test_get_collections_by_prefix(self):
self._connect()
all_collections = self.utility_wrap.list_collections()[0]
all_collections = [c_name for c_name in all_collections if "test_reinstall" in c_name or "test_upgrade" in c_name]
log.info(f"find {len(all_collections)} collections:")
log.info(all_collections)
data = {
"all": all_collections
}
with open("/tmp/ci_logs/all_collections.json", "w") as f:
f.write(json.dumps(data))
log.info(f"write {len(all_collections)} collections to /tmp/ci_logs/all_collections.json")
collections_in_json = get_all_collections()
assert len(all_collections) == len(collections_in_json)

View File

@@ -46,8 +46,8 @@ entity = gen_entities(1, is_normal=True)
entities = gen_entities(default_nb, is_normal=True)
raw_vectors, binary_entities = gen_binary_entities(default_nb)
default_query, _ = gen_search_vectors_params(field_name, entities, default_top_k, nq)
index_name1=cf.gen_unique_str("float")
index_name2=cf.gen_unique_str("varhar")
index_name1 = cf.gen_unique_str("float")
index_name2 = cf.gen_unique_str("varhar")
class TestCollectionSearchInvalid(TestcaseBase):

View File

@@ -1,4 +1,5 @@
import traceback
import copy
import os
from utils.util_log import test_log as log
@@ -19,7 +20,10 @@ def api_request_catch():
def wrapper(func):
def inner_wrapper(*args, **kwargs):
try:
res = func(*args, **kwargs)
# copy kwargs so the wrapper-only "enable_traceback" flag can be
# stripped before the call is forwarded to the SDK
_kwargs = copy.deepcopy(kwargs)
if "enable_traceback" in _kwargs:
del _kwargs["enable_traceback"]
res = func(*args, **_kwargs)
if kwargs.get("enable_traceback", True):
res_str = str(res)
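Sketched effect of this change: enable_traceback is consumed by the wrapper and no longer forwarded to the underlying pymilvus call, so invocations like the one in the data-persistence test above work without a TypeError:

replicas, _ = collection_w.get_replicas(enable_traceback=False)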

View File

@@ -3,7 +3,7 @@ from datetime import datetime
import functools
from utils.util_log import test_log as log
DEFAULT_FMT = '[{start_time}][{end_time}][{elapsed:0.8f}s] {collection_name} {func_name} ({arg_str}) -> {result!r}'
DEFAULT_FMT = '[{start_time}] [{elapsed:0.8f}s] {collection_name} {func_name} -> {res!r}'
def trace(fmt=DEFAULT_FMT, prefix='test', flag=True):
@@ -15,23 +15,23 @@ def trace(fmt=DEFAULT_FMT, prefix='test', flag=True):
if flag:
start_time = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
t0 = time.perf_counter()
result = func(*args, **kwargs)
# wrapped API methods return a (response, check_result) pair
res, result = func(*args, **kwargs)
elapsed = time.perf_counter() - t0
end_time = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
func_name = func.__name__
collection_name = args[0].collection.name
arg_lst = [repr(arg) for arg in args[1:]][:100]
arg_lst.extend(f'{k}={v!r}' for k, v in kwargs.items())
arg_str = ', '.join(arg_lst)[:200]
# arg_lst = [repr(arg) for arg in args[1:]][:100]
# arg_lst.extend(f'{k}={v!r}' for k, v in kwargs.items())
# arg_str = ', '.join(arg_lst)[:200]
log_str = f"[{prefix}]" + fmt.format(**locals())
# TODO: add report function in this place, like uploading to influxdb
# it is better a async way to do this, in case of blocking the request processing
log.info(log_str)
return result
return res, result
else:
result = func(*args, **kwargs)
return result
res, result = func(*args, **kwargs)
return res, result
return inner_wrapper
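A note on the new contract, with a hedged example: every wrapper decorated with @trace() is now expected to return a (response, check_result) pair; the decorator unpacks it for logging and returns it unchanged:

res, check_result = collection_w.get_compaction_state()
# logged roughly as: [test][<start_time>] [<elapsed>s] <collection> get_compaction_state -> <res>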