diff --git a/tests/python_client/chaos/chaos_commons.py b/tests/python_client/chaos/chaos_commons.py index d4484a754a..cac6c78663 100644 --- a/tests/python_client/chaos/chaos_commons.py +++ b/tests/python_client/chaos/chaos_commons.py @@ -34,12 +34,9 @@ def gen_experiment_config(yaml): def start_monitor_threads(checkers={}): """start the threads by checkers""" - threads = {} for k, ch in checkers.items(): - t = threading.Thread(target=ch.keep_running, args=()) + t = threading.Thread(target=ch.keep_running, args=(), name=k, daemon=True) t.start() - threads[k] = t - return threads def get_env_variable_by_name(name): diff --git a/tests/python_client/chaos/checker.py b/tests/python_client/chaos/checker.py index 3deedbe0e5..a814f60f28 100644 --- a/tests/python_client/chaos/checker.py +++ b/tests/python_client/chaos/checker.py @@ -34,7 +34,6 @@ class Checker: def __init__(self): self._succ = 0 self._fail = 0 - self._running = True self.c_wrap = ApiCollectionWrapper() self.c_wrap.init_collection(name=cf.gen_unique_str('Checker_'), schema=cf.gen_default_collection_schema(), @@ -49,9 +48,6 @@ class Checker: def succ_rate(self): return self._succ / self.total() if self.total() != 0 else 0 - def terminate(self): - self._running = False - def reset(self): self._succ = 0 self._fail = 0 @@ -64,7 +60,7 @@ class SearchChecker(Checker): self.c_wrap.load() # do load before search def keep_running(self): - while self._running is True: + while True: search_vec = cf.gen_vectors(5, ct.default_dim) _, result = self.c_wrap.search( data=search_vec, @@ -87,7 +83,7 @@ class InsertFlushChecker(Checker): self.initial_entities = self.c_wrap.num_entities def keep_running(self): - while self._running: + while True: _, insert_result = \ self.c_wrap.insert(data=cf.gen_default_list_data(nb=constants.DELTA_PER_INS), timeout=timeout, check_task=CheckTasks.check_nothing) @@ -116,7 +112,7 @@ class CreateChecker(Checker): super().__init__() def keep_running(self): - while self._running is True: + while True: _, result = self.c_wrap.init_collection( name=cf.gen_unique_str("CreateChecker_"), schema=cf.gen_default_collection_schema(), @@ -139,7 +135,7 @@ class IndexChecker(Checker): log.debug(f"Index ready entities: {self.c_wrap.num_entities }") # do as a flush before indexing def keep_running(self): - while self._running: + while True: _, result = self.c_wrap.create_index(ct.default_float_vec_field_name, constants.DEFAULT_INDEX_PARAM, name=cf.gen_unique_str('index_'), @@ -158,7 +154,7 @@ class QueryChecker(Checker): self.c_wrap.load() # load before query def keep_running(self): - while self._running: + while True: int_values = [] for _ in range(5): int_values.append(randint(0, constants.ENTITIES_FOR_SEARCH)) diff --git a/tests/python_client/chaos/test_chaos.py b/tests/python_client/chaos/test_chaos.py index fb26ac175b..74a77167af 100644 --- a/tests/python_client/chaos/test_chaos.py +++ b/tests/python_client/chaos/test_chaos.py @@ -1,3 +1,5 @@ +import threading + import pytest import os import time @@ -51,7 +53,6 @@ class TestChaosBase: port = 19530 _chaos_config = None health_checkers = {} - checker_threads = {} def parser_testcase_config(self, chaos_yaml): tests_yaml = constants.TESTS_CONFIG_LOCATION + 'testcases.yaml' @@ -118,12 +119,8 @@ class TestChaos(TestChaosBase): namespace=constants.CHAOS_NAMESPACE) meta_name = self._chaos_config.get('metadata', None).get('name', None) chaos_res.delete(meta_name, raise_ex=False) - for k, ch in self.health_checkers.items(): - ch.terminate() - log.info(f"tear down: checker {k} terminated") sleep(2) - for k, t in self.checker_threads.items(): - log.info(f"Thread {k} is_alive(): {t.is_alive()}") + log.info(f'Alive threads: {threading.enumerate()}') @pytest.mark.tags(CaseLabel.L3) @pytest.mark.parametrize('chaos_yaml', cc.get_chaos_yamls()) @@ -131,7 +128,7 @@ class TestChaos(TestChaosBase): # start the monitor threads to check the milvus ops log.info("*********************Chaos Test Start**********************") log.info(connections.get_connection_addr('default')) - self.checker_threads = cc.start_monitor_threads(self.health_checkers) + cc.start_monitor_threads(self.health_checkers) # parse chaos object chaos_config = cc.gen_experiment_config(chaos_yaml) @@ -174,8 +171,7 @@ class TestChaos(TestChaosBase): # wait 40s sleep(constants.CHAOS_DURATION) - for k, t in self.checker_threads.items(): - log.info(f"10s later: Thread {k} is_alive(): {t.is_alive()}") + log.info(f'Alive threads: {threading.enumerate()}') # assert statistic log.info("******2nd assert after chaos injected: ") @@ -193,8 +189,7 @@ class TestChaos(TestChaosBase): # delete chaos chaos_res.delete(meta_name) log.info("chaos deleted") - for k, t in self.checker_threads.items(): - log.info(f"Thread {k} is_alive(): {t.is_alive()}") + log.info(f'Alive threads: {threading.enumerate()}') sleep(2) # reconnect if needed