diff --git a/build/ci/jenkins/Scale.groovy b/build/ci/jenkins/Scale.groovy index 47305bac19..071095d662 100644 --- a/build/ci/jenkins/Scale.groovy +++ b/build/ci/jenkins/Scale.groovy @@ -17,7 +17,7 @@ pipeline { agent { kubernetes { - label 'milvus-scale-test' +// label 'milvus-scale-test' // inheritFrom 'milvus-test' defaultContainer 'milvus-test' yamlFile "build/ci/jenkins/pod/scale-test.yaml" diff --git a/build/ci/jenkins/pod/scale-test.yaml b/build/ci/jenkins/pod/scale-test.yaml index a991b3e4fc..ec8f363c6e 100644 --- a/build/ci/jenkins/pod/scale-test.yaml +++ b/build/ci/jenkins/pod/scale-test.yaml @@ -6,7 +6,7 @@ metadata: spec: containers: - name: milvus-test - image: harbor.milvus.io/qa/krte:dev-4 + image: harbor.milvus.io/qa/fouram:1.1 # image: dockerhub-mirror-sh.zilliz.cc/milvusdb/pytest:20211209-cef343f command: - cat diff --git a/tests/python_client/scale/test_index_node_scale.py b/tests/python_client/scale/test_index_node_scale.py index 20331b9fc5..cdf59adfb7 100644 --- a/tests/python_client/scale/test_index_node_scale.py +++ b/tests/python_client/scale/test_index_node_scale.py @@ -74,7 +74,7 @@ class TestIndexNodeScale: # create index # Note that the num of segments and the num of indexNode are related to indexing time start = datetime.datetime.now() - collection_w.create_index(ct.default_float_vec_field_name, default_index_params) + collection_w.create_index(ct.default_float_vec_field_name, default_index_params, timeout=60) assert collection_w.has_index()[0] t0 = datetime.datetime.now() - start log.info(f'Create index on {init_replicas} indexNode cost t0: {t0}') @@ -90,14 +90,14 @@ class TestIndexNodeScale: # create index again start = datetime.datetime.now() - collection_w.create_index(ct.default_float_vec_field_name, default_index_params) + collection_w.create_index(ct.default_float_vec_field_name, default_index_params, timeout=60) assert collection_w.has_index()[0] t1 = datetime.datetime.now() - start log.info(f'Create index on {expand_replicas} indexNode cost t1: {t1}') collection_w.drop_index() start = datetime.datetime.now() - collection_w.create_index(ct.default_float_vec_field_name, default_index_params) + collection_w.create_index(ct.default_float_vec_field_name, default_index_params, timeout=60) assert collection_w.has_index()[0] t2 = datetime.datetime.now() - start log.info(f'Create index on {expand_replicas} indexNode cost t2: {t2}') @@ -164,7 +164,7 @@ class TestIndexNodeScale: # create index on collection one and two start = datetime.datetime.now() - collection_w.create_index(ct.default_float_vec_field_name, default_index_params) + collection_w.create_index(ct.default_float_vec_field_name, default_index_params, timeout=60) assert collection_w.has_index()[0] t0 = datetime.datetime.now() - start @@ -187,7 +187,7 @@ class TestIndexNodeScale: assert not collection_w.has_index()[0] start = datetime.datetime.now() - collection_w.create_index(ct.default_float_vec_field_name, default_index_params) + collection_w.create_index(ct.default_float_vec_field_name, default_index_params, timeout=60) assert collection_w.has_index()[0] t2 = datetime.datetime.now() - start log.info(f'Create index on 1 indexNode cost t2: {t2}') diff --git a/tests/python_client/scale/test_query_node_scale.py b/tests/python_client/scale/test_query_node_scale.py index 853ec89a1f..e6fc328788 100644 --- a/tests/python_client/scale/test_query_node_scale.py +++ b/tests/python_client/scale/test_query_node_scale.py @@ -7,6 +7,7 @@ import pytest from base.collection_wrapper import ApiCollectionWrapper from base.utility_wrapper import ApiUtilityWrapper from common.common_type import CaseLabel, CheckTasks +from common.milvus_sys import MilvusSys from customize.milvus_operator import MilvusOperator from common import common_func as cf from common import common_type as ct @@ -21,11 +22,49 @@ nb = 10000 default_index_params = {"index_type": "IVF_SQ8", "metric_type": "L2", "params": {"nlist": 64}} +def verify_load_balance(c_name, host, port=19530): + """ + verify load balance is available after scale + """ + connections.connect('default', host=host, port=port) + # verify load balance + utility_w = ApiUtilityWrapper() + collection_w = ApiCollectionWrapper() + collection_w.init_collection(c_name) + ms = MilvusSys() + res, _ = utility_w.get_query_segment_info(collection_w.name) + log.debug(res) + segment_distribution = cf.get_segment_distribution(res) + all_querynodes = [node["identifier"] for node in ms.query_nodes] + assert len(all_querynodes) > 1 + all_querynodes = sorted(all_querynodes, + key=lambda x: len(segment_distribution[x]["sealed"]) + if x in segment_distribution else 0, reverse=True) + log.debug(all_querynodes) + src_node_id = all_querynodes[0] + des_node_ids = all_querynodes[1:] + sealed_segment_ids = segment_distribution[src_node_id]["sealed"] + # load balance + utility_w.load_balance(collection_w.name, src_node_id, des_node_ids, sealed_segment_ids) + # get segments distribution after load balance + res, _ = utility_w.get_query_segment_info(collection_w.name) + log.debug(res) + segment_distribution = cf.get_segment_distribution(res) + sealed_segment_ids_after_load_banalce = segment_distribution[src_node_id]["sealed"] + # assert src node has no sealed segments + assert sealed_segment_ids_after_load_banalce == [] + des_sealed_segment_ids = [] + for des_node_id in des_node_ids: + des_sealed_segment_ids += segment_distribution[des_node_id]["sealed"] + # assert sealed_segment_ids is subset of des_sealed_segment_ids + assert set(sealed_segment_ids).issubset(des_sealed_segment_ids) + + @pytest.mark.tags(CaseLabel.L3) class TestQueryNodeScale: @pytest.mark.tags(CaseLabel.L3) - def test_scale_query_node(self): + def test_scale_query_node(self, host): """ target: test scale queryNode method: 1.deploy milvus cluster with 1 queryNode @@ -65,16 +104,17 @@ class TestQueryNodeScale: c_name = cf.gen_unique_str("scale_query") # c_name = 'scale_query_DymS7kI4' collection_w = ApiCollectionWrapper() - collection_w.init_collection(name=c_name, schema=cf.gen_default_collection_schema(), shards_num=2) + utility_w = ApiUtilityWrapper() + collection_w.init_collection(name=c_name, schema=cf.gen_default_collection_schema()) # insert two segments - for i in range(3): + for i in range(30): df = cf.gen_default_dataframe_data(nb) collection_w.insert(df) log.debug(collection_w.num_entities) # create index - collection_w.create_index(ct.default_float_vec_field_name, default_index_params) + collection_w.create_index(ct.default_float_vec_field_name, default_index_params, timeout=60) assert collection_w.has_index()[0] assert collection_w.index()[0] == Index(collection_w.collection, ct.default_float_vec_field_name, default_index_params) @@ -105,6 +145,9 @@ class TestQueryNodeScale: mic.wait_for_healthy(release_name, constants.NAMESPACE) wait_pods_ready(constants.NAMESPACE, f"app.kubernetes.io/instance={release_name}") + # verify load balance + verify_load_balance(c_name, host=host) + @counter def do_insert(): """ do insert """ @@ -308,4 +351,4 @@ class TestQueryNodeScale: label = f"app.kubernetes.io/instance={release_name}" log.info('Start to export milvus pod logs') read_pod_log(namespace=constants.NAMESPACE, label_selector=label, release_name=release_name) - mic.uninstall(release_name, namespace=constants.NAMESPACE) + mic.uninstall(release_name, namespace=constants.NAMESPACE) \ No newline at end of file