[skip e2e]Add tasks to check compatibility (#16710)

Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>
This commit is contained in:
zhuwenxing 2022-04-29 09:17:48 +08:00 committed by GitHub
parent 222112d98a
commit 61f50122ca
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 125 additions and 18 deletions

View File

@@ -1,12 +1,16 @@
from pymilvus import connections
import sys
sys.path.append("..")
sys.path.append("../..")
from common.milvus_sys import MilvusSys
from utils import *
def task_1(data_size, host):
"""
task_1:
before upgrade: create collection and insert data, load and search
after upgrade: get collection, load, search, create index, load, and search
before upgrade: create collection and insert data with flush, load and search
after upgrade: get collection, load, search, insert data with flush, create index, load, and search
"""
prefix = "task_1_"
connections.connect(host=host, port=19530, timeout=60)
@@ -32,6 +36,49 @@ def task_2(data_size, host):
load_and_search(prefix)
def task_3(data_size, host):
    """Post-upgrade half of the multi-replica compatibility check.

    The pre-upgrade run created the ``task_3_`` collections, inserted and
    flushed data, built an index, and verified a single-replica load/search.
    This half re-uses those collections: single-replica load + search first,
    then insert *data_size* more rows, rebuild the index, and finally re-load
    with NUM_REPLICAS replicas and search again.
    """
    col_prefix = "task_3_"
    connections.connect(host=host, port=19530, timeout=60)
    # Locate the collections created by the pre-upgrade run of this task.
    get_collections(col_prefix)
    # Confirm the original single-replica load/search still works.
    load_and_search(col_prefix)
    create_collections_and_insert_data(col_prefix, count=data_size)
    create_index(col_prefix)
    # Re-load with multiple replicas and search once more.
    load_and_search(col_prefix, replicas=NUM_REPLICAS)
def task_4(data_size, host):
    """
    task_4:
    before upgrade: create collection, insert data with flush, and create index
    after upgrade: get collection, load with multi replicas, search, insert data without flush, load with multi replicas and search
    """
    # NOTE: the original docstring claimed "insert data with flush" after the
    # upgrade and "insert data without flush" before it, contradicting both the
    # flush=False call below and the companion before-script's task_4
    # (flush=True + create_index).  The docstring now matches the code.
    prefix = "task_4_"
    connections.connect(host=host, port=19530, timeout=60)
    get_collections(prefix)
    # Multi-replica load must work on collections created before the upgrade.
    load_and_search(prefix, replicas=NUM_REPLICAS)
    # flush=False: insert more rows without flushing, then re-check
    # multi-replica load/search on the same collections.
    create_collections_and_insert_data(prefix, flush=False, count=data_size)
    load_and_search(prefix, replicas=NUM_REPLICAS)
def task_5(data_size, host):
    """Post-upgrade half of the unflushed-data compatibility check.

    Before the upgrade this task only created the ``task_5_`` collections and
    inserted data without flushing.  After the upgrade it loads those
    collections with NUM_REPLICAS replicas and searches, inserts *data_size*
    fresh rows with a flush, then loads with multiple replicas and searches
    once more.
    """
    col_prefix = "task_5_"
    connections.connect(host=host, port=19530, timeout=60)
    get_collections(col_prefix)
    # Verify data inserted (unflushed) before the upgrade is still searchable.
    load_and_search(col_prefix, replicas=NUM_REPLICAS)
    # This time flush the new data, then repeat the multi-replica load/search.
    create_collections_and_insert_data(col_prefix, flush=True, count=data_size)
    load_and_search(col_prefix, replicas=NUM_REPLICAS)
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser(description='config for deploy test')
@@ -41,5 +88,11 @@ if __name__ == '__main__':
data_size = args.data_size
host = args.host
print(f"data size: {data_size}")
connections.connect(host=host, port=19530, timeout=60)
ms = MilvusSys()
task_1(data_size, host)
task_2(data_size, host)
task_2(data_size, host)
if len(ms.query_nodes) >= NUM_REPLICAS:
task_3(data_size, host)
task_4(data_size, host)
task_5(data_size, host)

View File

@@ -1,17 +1,20 @@
from pymilvus import connections
import sys
sys.path.append("..")
sys.path.append("../..")
from common.milvus_sys import MilvusSys
from utils import *
def task_1(data_size, host):
"""
task_1:
before upgrade: create collection and insert data, load and search
after upgrade: get collection, load, search, create index, load, and search
before upgrade: create collection and insert data with flush, load and search
after upgrade: get collection, load, search, insert data with flush, create index, load, and search
"""
prefix = "task_1_"
connections.connect(host=host, port=19530, timeout=60)
get_collections(prefix)
load_and_search(prefix)
create_collections_and_insert_data(prefix, data_size)
load_and_search(prefix)
@@ -19,16 +22,52 @@ def task_1(data_size, host):
def task_2(data_size, host):
"""
task_2:
before upgrade: create collection, insert data and create index, load and search
before upgrade: create collection, insert data and create index, load , search, and insert data without flush
after upgrade: get collection, load, search, insert data, create index, load, and search
"""
prefix = "task_2_"
connections.connect(host=host, port=19530, timeout=60)
get_collections(prefix)
load_and_search(prefix)
create_collections_and_insert_data(prefix, data_size)
create_index(prefix)
load_and_search(prefix)
create_collections_and_insert_data(prefix, flush=False, count=data_size)
def task_3(data_size, host):
    """
    task_3:
    before upgrade: create collection, insert data, flush, create index, load with one replicas and search
    after upgrade: get collection, load, search, insert data, create index, release, load with multi replicas, and search
    """
    prefix = "task_3_"
    connections.connect(host=host, port=19530, timeout=60)
    get_collections(prefix)
    # BUG FIX: data_size was passed positionally, binding it to the new
    # `flush` parameter of create_collections_and_insert_data(prefix,
    # flush=True, count=3000, ...) — the requested row count was silently
    # ignored and the default 3000 used.  Pass it as count=, as the
    # post-upgrade script's task_3 already does.
    create_collections_and_insert_data(prefix, count=data_size)
    create_index(prefix)
    # Single-replica load/search; the post-upgrade half re-loads these
    # collections with multiple replicas.
    load_and_search(prefix)
def task_4(data_size, host):
    """Pre-upgrade setup for task_4.

    Creates the ``task_4_`` collections, inserts *data_size* rows with flush
    enabled, and builds an index.  After the upgrade the companion script
    loads these collections with multiple replicas, searches, inserts more
    data without flushing, and repeats the multi-replica load/search.
    """
    col_prefix = "task_4_"
    connections.connect(host=host, port=19530, timeout=60)
    # List what already exists under this prefix before creating anything.
    get_collections(col_prefix)
    # flush=True so the inserted entities are confirmed before indexing.
    create_collections_and_insert_data(col_prefix, flush=True, count=data_size)
    create_index(col_prefix)
def task_5(data_size, host):
    """Pre-upgrade setup for task_5.

    Creates the ``task_5_`` collections and inserts *data_size* rows WITHOUT
    flushing.  The post-upgrade companion script then checks that this
    unflushed data survived the upgrade: multi-replica load, search, a
    flushed insert, and a final multi-replica load/search.
    """
    col_prefix = "task_5_"
    connections.connect(host=host, port=19530, timeout=60)
    get_collections(col_prefix)
    # Deliberately skip the flush — this scenario exists to exercise data
    # that has not been flushed at upgrade time.
    create_collections_and_insert_data(col_prefix, flush=False, count=data_size)
if __name__ == '__main__':
@@ -40,5 +79,11 @@ if __name__ == '__main__':
data_size = args.data_size
host = args.host
print(f"data size: {data_size}")
connections.connect(host=host, port=19530, timeout=60)
ms = MilvusSys()
task_1(data_size, host)
task_2(data_size, host)
if len(ms.query_nodes) >= NUM_REPLICAS:
task_3(data_size, host)
task_4(data_size, host)
task_5(data_size, host)

View File

@@ -15,6 +15,7 @@ default_index_params = [{"nlist": 128}, {"nlist": 128}, {"nlist": 128}, {"nlist"
index_params_map = dict(zip(all_index_types, default_index_params))
NUM_REPLICAS = 2
def filter_collections_by_prefix(prefix):
col_list = list_collections()
@@ -64,7 +65,7 @@ def get_collections(prefix):
return col_list
def create_collections_and_insert_data(prefix, count=3000):
def create_collections_and_insert_data(prefix, flush=True, count=3000, collection_cnt=11):
import random
dim = 128
nb = count // 10
@@ -74,7 +75,7 @@ def create_collections_and_insert_data(prefix, count=3000):
FieldSchema(name="float_vector", dtype=DataType.FLOAT_VECTOR, dim=dim)
]
default_schema = CollectionSchema(fields=default_fields, description="test collection")
for index_name in all_index_types:
for index_name in all_index_types[:collection_cnt]:
print(f"\nCreate collection...")
col_name = prefix + index_name
collection = Collection(name=col_name, schema=default_schema)
@@ -97,11 +98,12 @@ def create_collections_and_insert_data(prefix, count=3000):
total_time += end_time - start_time
print(f"end insert, time: {total_time:.4f}")
print("Get collection entities")
start_time = time.time()
print(f"collection entities: {collection.num_entities}")
end_time = time.time()
print("Get collection entities time = %.4fs" % (end_time - start_time))
if flush:
print("Get collection entities")
start_time = time.time()
print(f"collection entities: {collection.num_entities}")
end_time = time.time()
print("Get collection entities time = %.4fs" % (end_time - start_time))
print(f"\nList collections...")
print(get_collections(prefix))
@@ -126,17 +128,24 @@ def create_index(prefix):
print(f"create index time: {time.time() - t0:.4f}")
def load_and_search(prefix):
def load_and_search(prefix, replicas=1):
print("search data starts")
col_list = get_collections(prefix)
for col_name in col_list:
c = Collection(name=col_name)
print(f"collection name: {col_name}")
print("release collection")
c.release()
print("load collection")
t0 = time.time()
c.load()
if replicas == 1:
c.load()
if replicas > 1:
c.load(replica_number=replicas)
print(c.get_replicas())
print(f"load time: {time.time() - t0:.4f}")
topK = 5
vectors = [[0.0 for _ in range(128)] for _ in range(3000)]
vectors = [[1.0 for _ in range(128)] for _ in range(3000)]
index_name = col_name.replace(prefix, "")
search_params = gen_search_param(index_name)[0]
print(search_params)