From c052892e27f457987f06bb84540db2f8f3435521 Mon Sep 17 00:00:00 2001
From: zhuwenxing
Date: Sun, 15 May 2022 20:41:59 +0800
Subject: [PATCH] [skip e2e]Update multi replicas chaos test (#16981)

Signed-off-by: zhuwenxing
---
 .../template/pod-kill-by-pod-list.yaml        |  17 +++
 tests/python_client/chaos/checker.py          |  68 +++++++--
 .../chaos/test_chaos_multi_replicas.py        | 136 +++++++++---------
 3 files changed, 144 insertions(+), 77 deletions(-)
 create mode 100644 tests/python_client/chaos/chaos_objects/template/pod-kill-by-pod-list.yaml

diff --git a/tests/python_client/chaos/chaos_objects/template/pod-kill-by-pod-list.yaml b/tests/python_client/chaos/chaos_objects/template/pod-kill-by-pod-list.yaml
new file mode 100644
index 0000000000..95d964dafb
--- /dev/null
+++ b/tests/python_client/chaos/chaos_objects/template/pod-kill-by-pod-list.yaml
@@ -0,0 +1,17 @@
+apiVersion: chaos-mesh.org/v1alpha1
+kind: PodChaos
+metadata:
+  name: test-querynode-pod-kill
+  namespace: chaos-testing
+spec:
+  selector:
+    pods:
+      chaos-testing:
+        - milvus-multi-querynode-querynode-bcdc595d9-7vmcj
+        - milvus-multi-querynode-querynode-bcdc595d9-ccxls
+        - milvus-multi-querynode-querynode-bcdc595d9-dpwgp
+
+  mode: all
+  action: pod-kill
+  duration: 2m
+  gracePeriod: 0
\ No newline at end of file
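[Reviewer note] The pod names hard-coded in this template are placeholders from one particular deployment; the test below overwrites them at runtime. A minimal sketch of that flow, using only helpers that appear in this patch (the pod name shown is illustrative):

```python
from chaos import chaos_commons as cc

# Load the PodChaos template added above and swap the placeholder pod list
# for the pods chosen at runtime (a real run derives these from
# get_querynode_info(), defined later in this patch).
chaos_config = cc.gen_experiment_config(
    "chaos/chaos_objects/template/pod-kill-by-pod-list.yaml")
chaos_config['spec']['selector']['pods']['chaos-testing'] = [
    "milvus-multi-querynode-querynode-bcdc595d9-7vmcj",  # illustrative name
]
# The patched dict is then applied through the suite's CusResource wrapper,
# exactly as the test method does below.
```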
diff --git a/tests/python_client/chaos/checker.py b/tests/python_client/chaos/checker.py
index 85bd14d648..254b135d7f 100644
--- a/tests/python_client/chaos/checker.py
+++ b/tests/python_client/chaos/checker.py
@@ -4,6 +4,7 @@ import time
 from time import sleep
 from delayed_assert import expect
 from base.collection_wrapper import ApiCollectionWrapper
+from base.utility_wrapper import ApiUtilityWrapper
 from common import common_func as cf
 from common import common_type as ct
 from chaos import constants
@@ -34,14 +35,16 @@ class Checker:
        b. count operations and success rate
     """
 
-    def __init__(self):
+    def __init__(self, collection_name=None, shards_num=2):
         self._succ = 0
         self._fail = 0
         self.rsp_times = []
         self.average_time = 0
         self.c_wrap = ApiCollectionWrapper()
-        self.c_wrap.init_collection(name=cf.gen_unique_str('Checker_'),
+        c_name = collection_name if collection_name is not None else cf.gen_unique_str('Checker_')
+        self.c_wrap.init_collection(name=c_name,
                                     schema=cf.gen_default_collection_schema(),
+                                    shards_num=shards_num,
                                     timeout=timeout,
                                     enable_traceback=enable_traceback)
         self.c_wrap.insert(data=cf.gen_default_list_data(nb=constants.ENTITIES_FOR_SEARCH),
@@ -75,9 +78,9 @@ class Checker:
 class SearchChecker(Checker):
     """check search operations in a dependent thread"""
 
-    def __init__(self):
-        super().__init__()
-        self.c_wrap.load(enable_traceback=enable_traceback)  # do load before search
+    def __init__(self, collection_name=None, shards_num=2, replica_number=1):
+        super().__init__(collection_name=collection_name, shards_num=shards_num)
+        self.c_wrap.load(replica_number=replica_number)  # do load before search
 
     def keep_running(self):
         while True:
@@ -105,8 +108,8 @@ class SearchChecker(Checker):
 class InsertFlushChecker(Checker):
     """check Insert and flush operations in a dependent thread"""
 
-    def __init__(self, flush=False):
-        super().__init__()
+    def __init__(self, collection_name=None, flush=False, shards_num=2):
+        super().__init__(collection_name=collection_name, shards_num=shards_num)
         self._flush = flush
         self.initial_entities = self.c_wrap.num_entities
 
@@ -203,9 +206,9 @@ class IndexChecker(Checker):
 class QueryChecker(Checker):
     """check query operations in a dependent thread"""
 
-    def __init__(self):
-        super().__init__()
-        self.c_wrap.load(enable_traceback=enable_traceback)  # load before query
+    def __init__(self, collection_name=None, shards_num=2, replica_number=1):
+        super().__init__(collection_name=collection_name, shards_num=shards_num)
+        self.c_wrap.load(replica_number=replica_number)  # do load before query
 
     def keep_running(self):
         while True:
@@ -228,6 +231,49 @@ class QueryChecker(Checker):
             sleep(constants.WAIT_PER_OP / 10)
 
 
+class BulkLoadChecker(Checker):
+    """check bulk load operations in a dependent thread"""
+
+    def __init__(self):
+        super().__init__()
+        self.utility_wrap = ApiUtilityWrapper()
+        self.schema = cf.gen_default_collection_schema()
+        self.files = ["/tmp/test_data.json"]
+        self.failed_tasks = []
+
+    def update(self, files=None, schema=None):
+        if files:
+            self.files = files
+        if schema:
+            self.schema = schema
+
+    def keep_running(self):
+        while True:
+            c_name = cf.gen_unique_str("BulkLoadChecker_")
+            self.c_wrap.init_collection(name=c_name, schema=self.schema)
+            # import data into a fresh collection and wait for the tasks to finish
+            t0 = time.time()
+            task_ids, res_1 = self.utility_wrap.bulk_load(collection_name=c_name,
+                                                          partition_name='',
+                                                          row_based=True,
+                                                          files=self.files)
+            log.info(f"bulk load task ids: {task_ids}")
+            completed, res_2 = self.utility_wrap.wait_for_bulk_load_tasks_completed(task_ids=task_ids,
+                                                                                    timeout=30)
+            t1 = time.time() - t0  # t1 is already the elapsed time
+            if completed and res_1 and res_2:
+                self.rsp_times.append(t1)
+                self.average_time = (t1 + self.average_time * self._succ) / (self._succ + 1)
+                self._succ += 1
+                log.debug(f"bulk load success, time: {t1:.4f}, average_time: {self.average_time:.4f}")
+            else:
+                self._fail += 1
+                self.failed_tasks.append(c_name)
+            sleep(constants.WAIT_PER_OP / 10)
+
+
 def assert_statistic(checkers, expectations={}):
     for k in checkers.keys():
         # expect succ if no expectations
@@ -242,4 +288,4 @@ def assert_statistic(checkers, expectations={}):
         else:
             log.info(f"Expect Succ: {str(k)} {checker_result}")
             expect(succ_rate > 0.90 or total > 2,
-                   f"Expect Succ: {str(k)} {checker_result}")
\ No newline at end of file
+                   f"Expect Succ: {str(k)} {checker_result}")
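[Reviewer note] BulkLoadChecker expects a row-based import file at /tmp/test_data.json matching cf.gen_default_collection_schema(). A sketch of one way to generate such a file; the field names (int64/float/varchar/float_vector), the 128-dim vector, and the top-level "rows" key are assumptions about the default schema and the row-based bulk-load layout, so verify them against your pymilvus version:

```python
import json
import random

# Build a small row-based import file for BulkLoadChecker.
# Assumed field names and dimension -- check cf.gen_default_collection_schema().
dim = 128
rows = [
    {
        "int64": i,
        "float": float(i),
        "varchar": str(i),
        "float_vector": [random.random() for _ in range(dim)],
    }
    for i in range(1000)
]
with open("/tmp/test_data.json", "w") as f:
    json.dump({"rows": rows}, f)
```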
diff --git a/tests/python_client/chaos/test_chaos_multi_replicas.py b/tests/python_client/chaos/test_chaos_multi_replicas.py
index 8bee597744..0c0c2afc49 100644
--- a/tests/python_client/chaos/test_chaos_multi_replicas.py
+++ b/tests/python_client/chaos/test_chaos_multi_replicas.py
@@ -8,8 +8,7 @@ import json
 from time import sleep
 
 from pymilvus import connections
-from chaos.checker import (CreateChecker, InsertFlushChecker,
-                           SearchChecker, QueryChecker, IndexChecker, Op)
+from chaos.checker import (InsertFlushChecker, SearchChecker, QueryChecker, Op)
 from common.cus_resource_opts import CustomResourceOperations as CusResource
 from common.milvus_sys import MilvusSys
 from utils.util_log import test_log as log
@@ -17,6 +16,7 @@ from utils.util_k8s import wait_pods_ready, get_pod_list, get_pod_ip_name_pairs
 from utils.util_common import findkeys
 from chaos import chaos_commons as cc
 from common.common_type import CaseLabel
+from common import common_func as cf
 from chaos import constants
 from delayed_assert import expect, assert_expectations
 
@@ -70,6 +70,17 @@ def record_results(checkers):
     return res
 
 
+def get_querynode_info(release_name):
+    querynode_id_pod_pair = {}
+    querynode_ip_pod_pair = get_pod_ip_name_pairs(
+        "chaos-testing", f"app.kubernetes.io/instance={release_name}, component=querynode")
+    ms = MilvusSys()
+    for node in ms.query_nodes:
+        ip = node["infos"]['hardware_infos']["ip"].split(":")[0]
+        querynode_id_pod_pair[node["identifier"]] = querynode_ip_pod_pair[ip]
+    return querynode_id_pod_pair
+
+
 class TestChaosBase:
     expect_create = constants.SUCC
     expect_insert = constants.SUCC
     expect_flush = constants.SUCC
     expect_index = constants.SUCC
     expect_search = constants.SUCC
     expect_query = constants.SUCC
     host = '127.0.0.1'
     port = 19530
     _chaos_config = None
     health_checkers = {}
 
-    def parser_testcase_config(self, chaos_yaml, chaos_config):
-        cluster_nodes = check_cluster_nodes(chaos_config)
-        tests_yaml = constants.TESTS_CONFIG_LOCATION + 'testcases.yaml'
-        tests_config = cc.gen_experiment_config(tests_yaml)
-        test_collections = tests_config.get('Collections', None)
-        for t in test_collections:
-            test_chaos = t.get('testcase', {}).get('chaos', {})
-            if test_chaos in chaos_yaml:
-                expects = t.get('testcase', {}).get(
-                    'expectation', {}).get('cluster_1_node', {})
-                # for the cluster_n_node
-                if cluster_nodes > 1:
-                    expects = t.get('testcase', {}).get(
-                        'expectation', {}).get('cluster_n_node', {})
-                log.info(f"yaml.expects: {expects}")
-                self.expect_create = expects.get(
-                    Op.create.value, constants.SUCC)
-                self.expect_insert = expects.get(
-                    Op.insert.value, constants.SUCC)
-                self.expect_flush = expects.get(Op.flush.value, constants.SUCC)
-                self.expect_index = expects.get(Op.index.value, constants.SUCC)
-                self.expect_search = expects.get(
-                    Op.search.value, constants.SUCC)
-                self.expect_query = expects.get(Op.query.value, constants.SUCC)
-                log.info(f"self.expects: create:{self.expect_create}, insert:{self.expect_insert}, "
-                         f"flush:{self.expect_flush}, index:{self.expect_index}, "
-                         f"search:{self.expect_search}, query:{self.expect_query}")
-                return True
-
-        return False
-
 
 class TestChaos(TestChaosBase):
 
@@ -128,9 +108,13 @@ class TestChaos(TestChaosBase):
 
     @pytest.fixture(scope="function", autouse=True)
     def init_health_checkers(self):
+        c_name = cf.gen_unique_str('Checker_')
+        replicas_num = 2
+        shards_num = 2
         checkers = {
-            Op.search: SearchChecker(replica_number=2),
-            Op.query: QueryChecker(replica_number=2)
+            Op.insert: InsertFlushChecker(collection_name=c_name, shards_num=shards_num),
+            Op.search: SearchChecker(collection_name=c_name, shards_num=shards_num, replica_number=replicas_num),
+            Op.query: QueryChecker(collection_name=c_name, shards_num=shards_num, replica_number=replicas_num)
         }
         self.health_checkers = checkers
 
@@ -145,37 +129,49 @@ class TestChaos(TestChaosBase):
         log.info(f'Alive threads: {threading.enumerate()}')
 
     @pytest.mark.tags(CaseLabel.L3)
-    # @pytest.mark.parametrize('chaos_yaml', "chaos/chaos_objects/template/pod-failure-by-pod-list.yaml")
-    def test_chaos(self):
+    @pytest.mark.parametrize("is_streaming", [False])  # [False, True]
+    @pytest.mark.parametrize("failed_group_scope", ["one"])  # ["one", "all"]
+    @pytest.mark.parametrize("failed_node_type", ["shard_leader"])  # ["non_shard_leader", "shard_leader"]
+    @pytest.mark.parametrize("chaos_type", ["pod-failure"])  # ["pod-failure", "pod-kill"]
+    def test_multi_replicas_with_only_one_group_available(self, chaos_type, failed_node_type, failed_group_scope, is_streaming):
         # start the monitor threads to check the milvus ops
         log.info("*********************Chaos Test Start**********************")
-        # log.info(f"chaos_yaml: {chaos_yaml}")
         log.info(connections.get_connection_addr('default'))
+        if is_streaming is False:
+            del self.health_checkers[Op.insert]
         cc.start_monitor_threads(self.health_checkers)
-        # get replicas info
         release_name = "milvus-multi-querynode"
-        replicas_info, _ = self.health_checkers[Op.search].c_wrap.get_replicas()
-        querynode_ip_pod_pair = get_pod_ip_name_pairs(
-            "chaos-testing", "app.kubernetes.io/instance=milvus-multi-querynode, component=querynode")
-        querynode_id_pod_pair = {}
-        ms = MilvusSys()
-        for node in ms.query_nodes:
-            ip = node["infos"]['hardware_infos']["ip"].split(":")[0]
-            querynode_id_pod_pair[node["identifier"]] = querynode_ip_pod_pair[ip]
+        querynode_id_pod_pair = get_querynode_info(release_name)
+        log.info(querynode_id_pod_pair)
         group_list = []
+        shard_leader_list = []
+        replicas_info, _ = self.health_checkers[Op.search].c_wrap.get_replicas()
         for g in replicas_info.groups:
             group_list.append(list(g.group_nodes))
+            for shard in g.shards:
+                shard_leader_list.append(shard.shard_leader)
+        # inject chaos into the target groups: failed_group_scope "one" affects only
+        # the largest replica group, "all" affects every group; within each affected
+        # group, failed_node_type decides whether the shard leaders or the non-leader
+        # query nodes are failed
         target_pod_list = []
-        for g in group_list[1:]:
-            pod = querynode_id_pod_pair[g[0]]
-            target_pod_list.append(pod)
-        chaos_config = cc.gen_experiment_config("chaos/chaos_objects/template/pod-failure-by-pod-list.yaml")
+        target_group = []
+        group_list = sorted(group_list, key=lambda x: -len(x))
+        if failed_group_scope == "one":
+            target_group = group_list[:1]
+        if failed_group_scope == "all":
+            target_group = group_list[:]
+        for g in target_group:
+            target_nodes = []
+            if failed_node_type == "shard_leader":
+                target_nodes = list(set(g) & set(shard_leader_list))
+            if failed_node_type == "non_shard_leader":
+                target_nodes = list(set(g) - set(shard_leader_list))
+            for target_node in target_nodes:
+                pod = querynode_id_pod_pair[target_node]
+                target_pod_list.append(pod)
+        chaos_config = cc.gen_experiment_config(f"chaos/chaos_objects/template/{chaos_type}-by-pod-list.yaml")
         chaos_config['metadata']['name'] = f"test-multi-replicas-{int(time.time())}"
         kind = chaos_config['kind']
         meta_name = chaos_config.get('metadata', None).get('name', None)
         chaos_config['spec']['selector']['pods']['chaos-testing'] = target_pod_list
         self._chaos_config = chaos_config  # cache the chaos config for tear down
@@ -185,10 +181,7 @@ class TestChaos(TestChaosBase):
         sleep(constants.WAIT_PER_OP * 2)
         # replicas info
         replicas_info, _ = self.health_checkers[Op.search].c_wrap.get_replicas()
-        log.info(f"replicas_info for collection {self.health_checkers[Op.search].c_wrap.name}: {replicas_info}")
-
-        replicas_info, _ = self.health_checkers[Op.query].c_wrap.get_replicas()
-        log.info(f"replicas_info for collection {self.health_checkers[Op.query].c_wrap.name}: {replicas_info}")
+        log.info(f"replicas_info for search collection {self.health_checkers[Op.search].c_wrap.name}: {replicas_info}")
 
         # assert statistic:all ops 100% succ
         log.info("******1st assert before chaos: ")
@@ -207,19 +200,27 @@ class TestChaos(TestChaosBase):
         # wait 120s
         sleep(constants.CHAOS_DURATION)
         log.info(f'Alive threads: {threading.enumerate()}')
+        # node info
+        querynode_id_pod_pair = get_querynode_info(release_name)
+        log.info(querynode_id_pod_pair)
         # replicas info
         replicas_info, _ = self.health_checkers[Op.search].c_wrap.get_replicas()
-        log.info(f"replicas_info for collection {self.health_checkers[Op.search].c_wrap.name}: {replicas_info}")
+        log.info(f"replicas_info for search collection {self.health_checkers[Op.search].c_wrap.name}: {replicas_info}")
         replicas_info, _ = self.health_checkers[Op.query].c_wrap.get_replicas()
-        log.info(f"replicas_info for collection {self.health_checkers[Op.query].c_wrap.name}: {replicas_info}")
+        log.info(f"replicas_info for query collection {self.health_checkers[Op.query].c_wrap.name}: {replicas_info}")
 
         # assert statistic
         log.info("******2nd assert after chaos injected: ")
-        assert_statistic(self.health_checkers,
-                         expectations={
-                             Op.search: constants.SUCC,
-                             Op.query: constants.SUCC
-                         })
+        expectations = {
+            Op.search: constants.SUCC,
+            Op.query: constants.SUCC
+        }
+        if failed_group_scope == "all":
+            expectations = {
+                Op.search: constants.FAIL,
+                Op.query: constants.FAIL
+            }
+        assert_statistic(self.health_checkers, expectations=expectations)
         # delete chaos
         chaos_res.delete(meta_name)
         log.info("chaos deleted")
@@ -232,15 +233,18 @@ class TestChaos(TestChaosBase):
         log.info("all pods are ready")
         # reconnect if needed
         sleep(constants.WAIT_PER_OP * 2)
-        cc.reconnect(connections, alias='default')
+        # cc.reconnect(connections, alias='default')
         # reset counting again
         cc.reset_counting(self.health_checkers)
         # wait 50s (varies by feature)
         sleep(constants.WAIT_PER_OP * 5)
+        # node info
+        querynode_id_pod_pair = get_querynode_info(release_name)
+        log.info(querynode_id_pod_pair)
+        sleep(120)
         # replicas info
         replicas_info, _ = self.health_checkers[Op.search].c_wrap.get_replicas()
         log.info(f"replicas_info for collection {self.health_checkers[Op.search].c_wrap.name}: {replicas_info}")
-
         replicas_info, _ = self.health_checkers[Op.query].c_wrap.get_replicas()
         log.info(f"replicas_info for collection {self.health_checkers[Op.query].c_wrap.name}: {replicas_info}")
         # assert statistic: all ops success again
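[Reviewer note] For exercising the updated checkers outside pytest, a minimal driver sketch; it assumes the test-suite packages are importable and a multi-querynode Milvus is reachable on localhost:19530:

```python
from time import sleep

from pymilvus import connections
from chaos import chaos_commons as cc
from chaos import constants
from chaos.checker import SearchChecker, QueryChecker, Op, assert_statistic
from common import common_func as cf

# Connect first: each checker creates and loads a collection on init.
connections.connect(alias='default', host='127.0.0.1', port=19530)

# One shared collection, two shards, two in-memory replicas --
# mirroring the init_health_checkers fixture in this patch.
c_name = cf.gen_unique_str('Checker_')
checkers = {
    Op.search: SearchChecker(collection_name=c_name, shards_num=2, replica_number=2),
    Op.query: QueryChecker(collection_name=c_name, shards_num=2, replica_number=2),
}
cc.start_monitor_threads(checkers)

# Let the ops run for a while, then check the success-rate expectations
# (constants.SUCC is also the default when no expectation is given).
sleep(constants.WAIT_PER_OP * 5)
assert_statistic(checkers, expectations={Op.search: constants.SUCC,
                                         Op.query: constants.SUCC})
```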