mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-11-30 19:08:30 +08:00
[test]Update test cases for deploy and chaos test (#20180)
Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com> Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>
This commit is contained in:
parent
d528aa3d90
commit
e2b1cb6a44
@ -5,14 +5,15 @@ from utils import *
|
||||
def task_1(data_size, host):
|
||||
"""
|
||||
task_1:
|
||||
before reinstall: create collection and insert data, load and search
|
||||
after reinstall: get collection, load, search, create index, load, and search
|
||||
before reinstall: create collection, insert data, create index and insert data, load and search
|
||||
after reinstall: get collection, load, search, release, insert data, create index, load, and search
|
||||
"""
|
||||
prefix = "task_1_"
|
||||
connections.connect(host=host, port=19530, timeout=60)
|
||||
get_collections(prefix)
|
||||
load_and_search(prefix)
|
||||
create_index(prefix)
|
||||
release_collection(prefix)
|
||||
create_collections_and_insert_data(prefix,data_size)
|
||||
load_and_search(prefix)
|
||||
|
||||
|
||||
@ -20,13 +21,14 @@ def task_2(data_zise, host):
|
||||
"""
|
||||
task_2:
|
||||
before reinstall: create collection, insert data and create index, load and search
|
||||
after reinstall: get collection, load, search, insert data, create index, load, and search
|
||||
after reinstall: get collection, load, search, insert data, release, create index, load, and search
|
||||
"""
|
||||
prefix = "task_2_"
|
||||
connections.connect(host=host, port=19530, timeout=60)
|
||||
get_collections(prefix)
|
||||
load_and_search(prefix)
|
||||
create_collections_and_insert_data(prefix, data_zise)
|
||||
release_collection(prefix)
|
||||
create_index(prefix)
|
||||
load_and_search(prefix)
|
||||
|
||||
|
@ -10,7 +10,7 @@ def task_1(data_size, host):
|
||||
"""
|
||||
task_1:
|
||||
before upgrade: create collection and insert data with flush, load and search
|
||||
after upgrade: get collection, load, search, insert data with flush, create index, load, and search
|
||||
after upgrade: get collection, load, search, insert data with flush, release, create index, load, and search
|
||||
"""
|
||||
prefix = "task_1_"
|
||||
connections.connect(host=host, port=19530, timeout=60)
|
||||
@ -19,6 +19,7 @@ def task_1(data_size, host):
|
||||
create_index(prefix)
|
||||
load_and_search(prefix)
|
||||
create_collections_and_insert_data(prefix, data_size)
|
||||
release_collection(prefix)
|
||||
create_index(prefix)
|
||||
load_and_search(prefix)
|
||||
|
||||
@ -27,7 +28,7 @@ def task_2(data_size, host):
|
||||
"""
|
||||
task_2:
|
||||
before upgrade: create collection, insert data and create index, load and search
|
||||
after upgrade: get collection, load, search, insert data, create index, load, and search
|
||||
after upgrade: get collection, load, search, insert data, release, create index, load, and search
|
||||
"""
|
||||
prefix = "task_2_"
|
||||
connections.connect(host=host, port=19530, timeout=60)
|
||||
@ -35,6 +36,7 @@ def task_2(data_size, host):
|
||||
assert len(col_list) == len(all_index_types)
|
||||
load_and_search(prefix)
|
||||
create_collections_and_insert_data(prefix, data_size)
|
||||
release_collection(prefix)
|
||||
create_index(prefix)
|
||||
load_and_search(prefix)
|
||||
|
||||
@ -43,7 +45,7 @@ def task_3(data_size, host):
|
||||
"""
|
||||
task_3:
|
||||
before upgrade: create collection, insert data, flush, create index, load with one replicas and search
|
||||
after upgrade: get collection, load, search, insert data, create index, release, load with multi replicas, and search
|
||||
after upgrade: get collection, load, search, insert data, release, create index, load with multi replicas, and search
|
||||
"""
|
||||
prefix = "task_3_"
|
||||
connections.connect(host=host, port=19530, timeout=60)
|
||||
@ -51,6 +53,7 @@ def task_3(data_size, host):
|
||||
assert len(col_list) == len(all_index_types)
|
||||
load_and_search(prefix)
|
||||
create_collections_and_insert_data(prefix, count=data_size)
|
||||
release_collection(prefix)
|
||||
create_index(prefix)
|
||||
load_and_search(prefix, replicas=NUM_REPLICAS)
|
||||
|
||||
@ -97,6 +100,10 @@ if __name__ == '__main__':
|
||||
print(f"data size: {data_size}")
|
||||
connections.connect(host=host, port=19530, timeout=60)
|
||||
ms = MilvusSys()
|
||||
# create index for flat
|
||||
print("create index for flat start")
|
||||
create_index_flat()
|
||||
print("create index for flat done")
|
||||
task_1(data_size, host)
|
||||
task_2(data_size, host)
|
||||
if len(ms.query_nodes) >= NUM_REPLICAS:
|
||||
|
@ -5,30 +5,30 @@ from utils import *
|
||||
def task_1(data_size, host):
|
||||
"""
|
||||
task_1:
|
||||
before reinstall: create collection create index and insert data, load and search
|
||||
after reinstall: get collection, load, search, create index, load, and search
|
||||
before reinstall: create collection, insert data, create index and insert data, load and search
|
||||
after reinstall: get collection, load, search, release, insert data, create index, load, and search
|
||||
"""
|
||||
prefix = "task_1_"
|
||||
connections.connect(host=host, port=19530, timeout=60)
|
||||
get_collections(prefix)
|
||||
create_collections_and_insert_data(prefix,data_size)
|
||||
create_index(prefix)
|
||||
load_and_search(prefix)
|
||||
create_collections_and_insert_data(prefix,data_size)
|
||||
create_index(prefix)
|
||||
load_and_search(prefix)
|
||||
|
||||
|
||||
def task_2(data_size, host):
|
||||
"""
|
||||
task_2:
|
||||
before reinstall: create collection, create index, insert data and create index,load and search
|
||||
before reinstall: create collection, insert data, create index, insert data, create index,load and search
|
||||
after reinstall: get collection, load, search, insert data, create index, load, and search
|
||||
"""
|
||||
prefix = "task_2_"
|
||||
connections.connect(host=host, port=19530, timeout=60)
|
||||
get_collections(prefix)
|
||||
create_collections_and_insert_data(prefix, data_size)
|
||||
create_index(prefix)
|
||||
load_and_search(prefix)
|
||||
create_collections_and_insert_data(prefix, data_size)
|
||||
create_index(prefix)
|
||||
load_and_search(prefix)
|
||||
|
@ -10,7 +10,7 @@ def task_1(data_size, host):
|
||||
"""
|
||||
task_1:
|
||||
before upgrade: create collection and insert data with flush, load and search
|
||||
after upgrade: get collection, load, search, insert data with flush, create index, load, and search
|
||||
after upgrade: get collection, load, search, insert data with flush, release, create index, load, and search
|
||||
"""
|
||||
prefix = "task_1_"
|
||||
connections.connect(host=host, port=19530, timeout=60)
|
||||
@ -23,7 +23,7 @@ def task_2(data_size, host):
|
||||
"""
|
||||
task_2:
|
||||
before upgrade: create collection, insert data and create index, load , search, and insert data without flush
|
||||
after upgrade: get collection, load, search, insert data, create index, load, and search
|
||||
after upgrade: get collection, load, search, insert data, release, create index, load, and search
|
||||
"""
|
||||
prefix = "task_2_"
|
||||
connections.connect(host=host, port=19530, timeout=60)
|
||||
|
@ -10,7 +10,7 @@ pymilvus_version = pymilvus.__version__
|
||||
|
||||
all_index_types = ["FLAT", "IVF_FLAT", "IVF_SQ8", "IVF_PQ", "HNSW", "ANNOY"]
|
||||
|
||||
default_index_params = [{"nlist": 128}, {"nlist": 128}, {"nlist": 128}, {"nlist": 128, "m": 16, "nbits": 8},
|
||||
default_index_params = [{}, {"nlist": 128}, {"nlist": 128}, {"nlist": 128, "m": 16, "nbits": 8},
|
||||
{"M": 48, "efConstruction": 500}, {"n_trees": 50}]
|
||||
|
||||
index_params_map = dict(zip(all_index_types, default_index_params))
|
||||
@ -116,6 +116,23 @@ def create_collections_and_insert_data(prefix, flush=True, count=3000, collectio
|
||||
print(get_collections(prefix))
|
||||
|
||||
|
||||
def create_index_flat():
|
||||
# create index
|
||||
default_flat_index = {"index_type": "FLAT", "params": {}, "metric_type": "L2"}
|
||||
all_col_list = list_collections()
|
||||
col_list = []
|
||||
for col_name in all_col_list:
|
||||
if "FLAT" in col_name and "task" in col_name and "IVF" not in col_name:
|
||||
col_list.append(col_name)
|
||||
print("\nCreate index for FLAT...")
|
||||
for col_name in col_list:
|
||||
c = Collection(name=col_name)
|
||||
print(c)
|
||||
t0 = time.time()
|
||||
c.create_index(field_name="float_vector", index_params=default_flat_index)
|
||||
print(f"create index time: {time.time() - t0:.4f}")
|
||||
|
||||
|
||||
def create_index(prefix):
|
||||
# create index
|
||||
default_index = {"index_type": "IVF_FLAT", "params": {"nlist": 128}, "metric_type": "L2"}
|
||||
@ -136,6 +153,14 @@ def create_index(prefix):
|
||||
print(f"create index time: {time.time() - t0:.4f}")
|
||||
|
||||
|
||||
def release_collection(prefix):
|
||||
col_list = get_collections(prefix)
|
||||
print("release collection")
|
||||
for col_name in col_list:
|
||||
c = Collection(name=col_name)
|
||||
c.release()
|
||||
|
||||
|
||||
def load_and_search(prefix, replicas=1):
|
||||
print("search data starts")
|
||||
col_list = get_collections(prefix)
|
||||
|
@ -27,6 +27,7 @@ default_term_expr = f'{ct.default_int64_field_name} in [0, 1]'
|
||||
|
||||
prefix = "deploy_test"
|
||||
|
||||
TIMEOUT = 60
|
||||
|
||||
class TestActionFirstDeployment(TestDeployBase):
|
||||
""" Test case of action before reinstall """
|
||||
@ -54,7 +55,10 @@ class TestActionFirstDeployment(TestDeployBase):
|
||||
is_binary = True
|
||||
collection_w = self.init_collection_general(insert_data=False, is_binary=is_binary, name=name)[0]
|
||||
if collection_w.has_index():
|
||||
collection_w.drop_index()
|
||||
index_names = [index.index_name for index in collection_w.indexes]
|
||||
for index_name in index_names:
|
||||
collection_w.drop_index(index_name=index_name)
|
||||
|
||||
|
||||
|
||||
|
||||
@ -112,6 +116,12 @@ class TestActionFirstDeployment(TestDeployBase):
|
||||
# create index for vector
|
||||
default_index_param = gen_index_param(index_type)
|
||||
collection_w.create_index(default_index_field, default_index_param)
|
||||
# create index for string
|
||||
if is_string_indexed == "is_string_indexed":
|
||||
default_string_index_params = {}
|
||||
default_string_index_name = "_default_string_idx"
|
||||
collection_w.create_index(
|
||||
default_string_field_name, default_string_index_params, index_name=default_string_index_name)
|
||||
|
||||
# load for growing segment
|
||||
if replica_number >= 1:
|
||||
@ -120,7 +130,7 @@ class TestActionFirstDeployment(TestDeployBase):
|
||||
except Exception as e:
|
||||
log.error(
|
||||
f"release collection failed: {e} maybe the collection is not loaded")
|
||||
collection_w.load(replica_number=replica_number)
|
||||
collection_w.load(replica_number=replica_number, timeout=TIMEOUT)
|
||||
|
||||
# delete data for growing segment
|
||||
delete_expr = f"{ct.default_int64_field_name} in {[i for i in range(0,10)]}"
|
||||
@ -156,12 +166,6 @@ class TestActionFirstDeployment(TestDeployBase):
|
||||
delete_expr = f"{ct.default_int64_field_name} in {[i for i in range(10,20)]}"
|
||||
if is_deleted == "is_deleted":
|
||||
collection_w.delete(expr=delete_expr)
|
||||
# create index for string
|
||||
if is_string_indexed == "is_string_indexed":
|
||||
default_string_index_params = {}
|
||||
default_string_index_name = "_default_string_idx"
|
||||
collection_w.create_index(
|
||||
default_string_field_name, default_string_index_params, index_name=default_string_index_name)
|
||||
|
||||
# delete data for sealed segment and after index
|
||||
delete_expr = f"{ct.default_int64_field_name} in {[i for i in range(20,30)]}"
|
||||
@ -176,7 +180,7 @@ class TestActionFirstDeployment(TestDeployBase):
|
||||
# reload after flush and creating index
|
||||
if replica_number > 0:
|
||||
collection_w.release()
|
||||
collection_w.load(replica_number=replica_number)
|
||||
collection_w.load(replica_number=replica_number, timeout=TIMEOUT)
|
||||
|
||||
# insert data to get growing segment after reload
|
||||
if segment_status == "all":
|
||||
|
@ -39,9 +39,27 @@ class TestActionSecondDeployment(TestDeployBase):
|
||||
log.info(("*" * 35) + " teardown " + ("*" * 35))
|
||||
log.info("[teardown_method] Start teardown test case %s..." %
|
||||
method.__name__)
|
||||
log.info("show collection info")
|
||||
log.info(f"collection {self.collection_w.name} has entities: {self.collection_w.num_entities}")
|
||||
try:
|
||||
replicas = self.collection_w.get_replicas(enable_traceback=False)
|
||||
replicas_loaded = len(replicas.groups)
|
||||
except Exception as e:
|
||||
log.info("get replicas failed with error {str(e)}")
|
||||
replicas_loaded = 0
|
||||
log.info(f"collection {self.collection_w.name} has {replicas_loaded} replicas")
|
||||
index_infos = [index.to_dict() for index in self.collection_w.indexes]
|
||||
log.info(f"collection {self.collection_w.name} index infos {index_infos}")
|
||||
log.info("skip drop collection")
|
||||
|
||||
def create_index(self, collection_w, default_index_field, default_index_param):
|
||||
try:
|
||||
replicas = collection_w.get_replicas(enable_traceback=False)
|
||||
replicas_loaded = len(replicas.groups)
|
||||
except Exception as e:
|
||||
log.info("get replicas failed")
|
||||
replicas_loaded = 0
|
||||
log.info(f"before create index, collection {collection_w.name} has {replicas_loaded} replicas")
|
||||
index_field_map = dict([(index.field_name, index.index_name) for index in collection_w.indexes])
|
||||
index_infos = [index.to_dict() for index in collection_w.indexes]
|
||||
log.info(index_infos)
|
||||
@ -68,6 +86,7 @@ class TestActionSecondDeployment(TestDeployBase):
|
||||
if "BIN" in name:
|
||||
is_binary = True
|
||||
collection_w, _ = self.collection_wrap.init_collection(name=name)
|
||||
self.collection_w = collection_w
|
||||
schema = collection_w.schema
|
||||
data_type = [field.dtype for field in schema.fields]
|
||||
field_name = [field.name for field in schema.fields]
|
||||
@ -88,13 +107,14 @@ class TestActionSecondDeployment(TestDeployBase):
|
||||
vector_index_types = binary_vector_index_types + float_vector_index_types
|
||||
if len(vector_index_types) > 0:
|
||||
vector_index_type = vector_index_types[0]
|
||||
|
||||
# get replicas loaded
|
||||
try:
|
||||
replicas, _ = collection_w.get_replicas(enable_traceback=False)
|
||||
replicas = collection_w.get_replicas(enable_traceback=False)
|
||||
replicas_loaded = len(replicas.groups)
|
||||
except Exception as e:
|
||||
log.info("get replicas failed")
|
||||
log.info(f"get replicas failed with error {str(e)}")
|
||||
replicas_loaded = 0
|
||||
log.info(f"collection {name} has {replicas_loaded} replicas")
|
||||
# params for search and query
|
||||
if is_binary:
|
||||
_, vectors_to_search = cf.gen_binary_vectors(
|
||||
@ -107,8 +127,16 @@ class TestActionSecondDeployment(TestDeployBase):
|
||||
|
||||
# load if not loaded
|
||||
if replicas_loaded == 0:
|
||||
default_index_param = gen_index_param(vector_index_type)
|
||||
self.create_index(collection_w, default_index_field, default_index_param)
|
||||
# create index for vector if not exist before load
|
||||
is_vector_indexed = False
|
||||
index_infos = [index.to_dict() for index in collection_w.indexes]
|
||||
for index_info in index_infos:
|
||||
if "metric_type" in index_info.keys():
|
||||
is_vector_indexed = True
|
||||
break
|
||||
if is_vector_indexed is False:
|
||||
default_index_param = gen_index_param(vector_index_type)
|
||||
self.create_index(collection_w, default_index_field, default_index_param)
|
||||
collection_w.load()
|
||||
|
||||
# search and query
|
||||
@ -200,10 +228,6 @@ class TestActionSecondDeployment(TestDeployBase):
|
||||
collection_w.query(default_term_expr, output_fields=[ct.default_int64_field_name],
|
||||
check_task=CheckTasks.check_query_not_empty)
|
||||
|
||||
# create index
|
||||
default_index_param = gen_index_param(vector_index_type)
|
||||
self.create_index(collection_w, default_index_field, default_index_param)
|
||||
|
||||
# search and query
|
||||
collection_w.search(vectors_to_search[:default_nq], default_search_field,
|
||||
search_params, default_limit,
|
||||
|
Loading…
Reference in New Issue
Block a user