From 600a8aa575f30c39fa8a6321cfdf6c770e9b92cf Mon Sep 17 00:00:00 2001 From: ThreadDao Date: Mon, 13 Dec 2021 21:29:24 +0800 Subject: [PATCH] [skip ci] Add case to test proxy scale (#13298) Signed-off-by: ThreadDao --- tests/python_client/scale/constants.py | 2 +- tests/python_client/scale/helm_env.py | 14 ++++- tests/python_client/scale/scale_common.py | 56 ++----------------- tests/python_client/scale/test_proxy_scale.py | 50 ++++++++++++++--- 4 files changed, 57 insertions(+), 65 deletions(-) diff --git a/tests/python_client/scale/constants.py b/tests/python_client/scale/constants.py index 76c58068ae..bdb648d097 100644 --- a/tests/python_client/scale/constants.py +++ b/tests/python_client/scale/constants.py @@ -1,6 +1,6 @@ # scale object IMAGE_REPOSITORY = "harbor.zilliz.cc/milvus/milvus" # repository of milvus image -IMAGE_TAG = "master-20211207-4cd314d" # tag of milvus image +IMAGE_TAG = "master-20211213-d14fff0" # tag of milvus image NAMESPACE = "chaos-testing" # namespace IF_NOT_PRESENT = "IfNotPresent" # image pullPolicy IfNotPresent ALWAYS = "Always" # image pullPolicy Always diff --git a/tests/python_client/scale/helm_env.py b/tests/python_client/scale/helm_env.py index 2d85368710..28aedf27a9 100644 --- a/tests/python_client/scale/helm_env.py +++ b/tests/python_client/scale/helm_env.py @@ -3,11 +3,19 @@ import os from scale import constants from utils.util_log import test_log as log from common import common_func as cf -from scale import scale_common as sc + + +def get_milvus_chart_env_var(var=constants.MILVUS_CHART_ENV): + """ get log path for testing """ + try: + milvus_helm_chart = os.environ[var] + return str(milvus_helm_chart) + except Exception as e: + log.error(f'Failed to get environment variables {var}, with exception {str(e)}') class HelmEnv: - milvus_chart_path = sc.get_milvus_chart_env_var() + milvus_chart_path = get_milvus_chart_env_var() def __init__(self, release_name=None, **kwargs): self.release_name = release_name if release_name else cf.gen_unique_str(constants.DEFAULT_RELEASE_PREFIX) @@ -123,4 +131,4 @@ if __name__ == '__main__': # host = env.get_service_ip() # env.helm_upgrade_cluster_milvus(queryNode=2) # env.helm_uninstall_cluster_milvus() - env.export_all_logs() \ No newline at end of file + env.export_all_logs() diff --git a/tests/python_client/scale/scale_common.py b/tests/python_client/scale/scale_common.py index d102a600dd..89abf6a758 100644 --- a/tests/python_client/scale/scale_common.py +++ b/tests/python_client/scale/scale_common.py @@ -1,70 +1,22 @@ -import os - from pymilvus import connections, Index -from customize.milvus_operator import MilvusOperator -from scale import constants from utils.util_log import test_log as log from base.collection_wrapper import ApiCollectionWrapper from common import common_func as cf from common import common_type as ct -def get_milvus_chart_env_var(var=constants.MILVUS_CHART_ENV): - """ get log path for testing """ - try: - milvus_helm_chart = os.environ[var] - return str(milvus_helm_chart) - except Exception as e: - log.error(f'Failed to get environment variables {var}, with exception {str(e)}') - # milvus_helm_chart = constants.MILVUS_CHART_PATH - # log.error(f'Failed to get environment variables {var}, please set.') - # log.warning( - # f"Failed to get environment variables: {var}, use default: {constants.MILVUS_CHART_PATH}, {str(e)}") - # if not os.path.exists(milvus_helm_chart): - # raise Exception(f'milvus_helm_chart: {milvus_helm_chart} not exist') - # return milvus_helm_chart - - -def deploy_default_milvus(release_name, image_tag=None): - if image_tag is None: - image = f'{constants.IMAGE_REPO}:{constants.IMAGE_TAG}' - else: - image = f'{constants.IMAGE_REPO}:{image_tag}' - default_config = { - 'metadata.namespace': constants.NAMESPACE, - 'metadata.name': release_name, - 'spec.components.image': image, - 'spec.components.proxy.serviceType': 'LoadBalancer' - } - - milvusOp = MilvusOperator() - milvusOp.install(default_config) - - if milvusOp.wait_for_healthy(release_name, namespace=constants.NAMESPACE): - endpoint = milvusOp.endpoint(release_name, constants.NAMESPACE) - endpoint = endpoint.split(':') - host = endpoint[0] - port = int(endpoint[-1]) - return milvusOp, host, port - else: - raise Exception(f"Failed to install {release_name}") - - -def e2e_milvus(host, c_name, collection_exist=False): +def e2e_milvus(host, c_name): # connect connections.add_connection(default={"host": host, "port": 19530}) connections.connect(alias='default') # create collection_w = ApiCollectionWrapper() - if collection_exist: - collection_w.init_collection(name=c_name) - else: - collection_w.init_collection(name=c_name, schema=cf.gen_default_collection_schema()) + collection_w.init_collection(name=c_name, schema=cf.gen_default_collection_schema()) # insert - data = cf.gen_default_list_data(ct.default_nb) + data = cf.gen_default_list_data() mutation_res, _ = collection_w.insert(data) assert mutation_res.insert_count == ct.default_nb log.debug(collection_w.num_entities) @@ -86,4 +38,4 @@ def e2e_milvus(host, c_name, collection_exist=False): ids = search_res[0].ids[0] term_expr = f'{ct.default_int64_field_name} in [{ids}]' query_res, _ = collection_w.query(term_expr, output_fields=["*", "%"]) - assert query_res[0][ct.default_int64_field_name] == ids \ No newline at end of file + assert query_res[0][ct.default_int64_field_name] == ids diff --git a/tests/python_client/scale/test_proxy_scale.py b/tests/python_client/scale/test_proxy_scale.py index 73028a5872..ad778d64b6 100644 --- a/tests/python_client/scale/test_proxy_scale.py +++ b/tests/python_client/scale/test_proxy_scale.py @@ -1,9 +1,11 @@ +import multiprocessing import pytest - +from customize.milvus_operator import MilvusOperator from scale.helm_env import HelmEnv from common import common_func as cf from common.common_type import CaseLabel -from scale import scale_common as sc +from scale import scale_common as sc, constants +from utils.util_log import test_log as log prefix = "proxy_scale" @@ -22,17 +24,47 @@ class TestProxyScale: """ # deploy all nodes one pod cluster milvus with helm release_name = "scale-proxy" - env = HelmEnv(release_name=release_name) - host = env.helm_install_cluster_milvus() + image = f'{constants.IMAGE_REPOSITORY}:{constants.IMAGE_TAG}' + data_config = { + 'metadata.namespace': constants.NAMESPACE, + 'metadata.name': release_name, + 'spec.components.image': image, + 'spec.components.proxy.serviceType': 'LoadBalancer', + 'spec.components.proxy.replicas': 1, + 'spec.components.dataNode.replicas': 2, + 'spec.config.dataCoord.enableCompaction': True, + 'spec.config.dataCoord.enableGarbageCollection': True + } + mic = MilvusOperator() + mic.install(data_config) + healthy = mic.wait_for_healthy(release_name, constants.NAMESPACE, timeout=1200) + log.info(f"milvus healthy: {healthy}") + host = mic.endpoint(release_name, constants.NAMESPACE).split(':')[0] + # host = "10.98.0.7" c_name = cf.gen_unique_str(prefix) - sc.e2e_milvus(host, c_name) + process_list = [] + for i in range(5): + p = multiprocessing.Process(target=sc.e2e_milvus, args=(host, c_name)) + p.start() + process_list.append(p) - # scale proxy - env.helm_upgrade_cluster_milvus(proxy=2) + for p in process_list: + p.join() - # c_name_2 = cf.gen_unique_str(prefix) - sc.e2e_milvus(host, c_name, collection_exist=True) + log.info('Milvus test before scale up') + + mic.upgrade(release_name, {'spec.components.proxy.replicas': 5}, constants.NAMESPACE) + + for i in range(5): + p = multiprocessing.Process(target=sc.e2e_milvus, args=(host, c_name)) + p.start() + process_list.append(p) + + for p in process_list: + p.join() + + log.info('Milvus test after scale up') def test_shrink_proxy(self): """