[skip ci] Update thread in chaos to daemon (#11392)

Signed-off-by: ThreadDao <yufen.zong@zilliz.com>
This commit is contained in:
ThreadDao 2021-11-08 13:01:34 +08:00 committed by GitHub
parent e00a4242df
commit 701ec90a0c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 12 additions and 24 deletions

View File

@ -34,12 +34,9 @@ def gen_experiment_config(yaml):
def start_monitor_threads(checkers={}): def start_monitor_threads(checkers={}):
"""start the threads by checkers""" """start the threads by checkers"""
threads = {}
for k, ch in checkers.items(): for k, ch in checkers.items():
t = threading.Thread(target=ch.keep_running, args=()) t = threading.Thread(target=ch.keep_running, args=(), name=k, daemon=True)
t.start() t.start()
threads[k] = t
return threads
def get_env_variable_by_name(name): def get_env_variable_by_name(name):

View File

@ -34,7 +34,6 @@ class Checker:
def __init__(self): def __init__(self):
self._succ = 0 self._succ = 0
self._fail = 0 self._fail = 0
self._running = True
self.c_wrap = ApiCollectionWrapper() self.c_wrap = ApiCollectionWrapper()
self.c_wrap.init_collection(name=cf.gen_unique_str('Checker_'), self.c_wrap.init_collection(name=cf.gen_unique_str('Checker_'),
schema=cf.gen_default_collection_schema(), schema=cf.gen_default_collection_schema(),
@ -49,9 +48,6 @@ class Checker:
def succ_rate(self): def succ_rate(self):
return self._succ / self.total() if self.total() != 0 else 0 return self._succ / self.total() if self.total() != 0 else 0
def terminate(self):
self._running = False
def reset(self): def reset(self):
self._succ = 0 self._succ = 0
self._fail = 0 self._fail = 0
@ -64,7 +60,7 @@ class SearchChecker(Checker):
self.c_wrap.load() # do load before search self.c_wrap.load() # do load before search
def keep_running(self): def keep_running(self):
while self._running is True: while True:
search_vec = cf.gen_vectors(5, ct.default_dim) search_vec = cf.gen_vectors(5, ct.default_dim)
_, result = self.c_wrap.search( _, result = self.c_wrap.search(
data=search_vec, data=search_vec,
@ -87,7 +83,7 @@ class InsertFlushChecker(Checker):
self.initial_entities = self.c_wrap.num_entities self.initial_entities = self.c_wrap.num_entities
def keep_running(self): def keep_running(self):
while self._running: while True:
_, insert_result = \ _, insert_result = \
self.c_wrap.insert(data=cf.gen_default_list_data(nb=constants.DELTA_PER_INS), self.c_wrap.insert(data=cf.gen_default_list_data(nb=constants.DELTA_PER_INS),
timeout=timeout, check_task=CheckTasks.check_nothing) timeout=timeout, check_task=CheckTasks.check_nothing)
@ -116,7 +112,7 @@ class CreateChecker(Checker):
super().__init__() super().__init__()
def keep_running(self): def keep_running(self):
while self._running is True: while True:
_, result = self.c_wrap.init_collection( _, result = self.c_wrap.init_collection(
name=cf.gen_unique_str("CreateChecker_"), name=cf.gen_unique_str("CreateChecker_"),
schema=cf.gen_default_collection_schema(), schema=cf.gen_default_collection_schema(),
@ -139,7 +135,7 @@ class IndexChecker(Checker):
log.debug(f"Index ready entities: {self.c_wrap.num_entities }") # do as a flush before indexing log.debug(f"Index ready entities: {self.c_wrap.num_entities }") # do as a flush before indexing
def keep_running(self): def keep_running(self):
while self._running: while True:
_, result = self.c_wrap.create_index(ct.default_float_vec_field_name, _, result = self.c_wrap.create_index(ct.default_float_vec_field_name,
constants.DEFAULT_INDEX_PARAM, constants.DEFAULT_INDEX_PARAM,
name=cf.gen_unique_str('index_'), name=cf.gen_unique_str('index_'),
@ -158,7 +154,7 @@ class QueryChecker(Checker):
self.c_wrap.load() # load before query self.c_wrap.load() # load before query
def keep_running(self): def keep_running(self):
while self._running: while True:
int_values = [] int_values = []
for _ in range(5): for _ in range(5):
int_values.append(randint(0, constants.ENTITIES_FOR_SEARCH)) int_values.append(randint(0, constants.ENTITIES_FOR_SEARCH))

View File

@ -1,3 +1,5 @@
import threading
import pytest import pytest
import os import os
import time import time
@ -51,7 +53,6 @@ class TestChaosBase:
port = 19530 port = 19530
_chaos_config = None _chaos_config = None
health_checkers = {} health_checkers = {}
checker_threads = {}
def parser_testcase_config(self, chaos_yaml): def parser_testcase_config(self, chaos_yaml):
tests_yaml = constants.TESTS_CONFIG_LOCATION + 'testcases.yaml' tests_yaml = constants.TESTS_CONFIG_LOCATION + 'testcases.yaml'
@ -118,12 +119,8 @@ class TestChaos(TestChaosBase):
namespace=constants.CHAOS_NAMESPACE) namespace=constants.CHAOS_NAMESPACE)
meta_name = self._chaos_config.get('metadata', None).get('name', None) meta_name = self._chaos_config.get('metadata', None).get('name', None)
chaos_res.delete(meta_name, raise_ex=False) chaos_res.delete(meta_name, raise_ex=False)
for k, ch in self.health_checkers.items():
ch.terminate()
log.info(f"tear down: checker {k} terminated")
sleep(2) sleep(2)
for k, t in self.checker_threads.items(): log.info(f'Alive threads: {threading.enumerate()}')
log.info(f"Thread {k} is_alive(): {t.is_alive()}")
@pytest.mark.tags(CaseLabel.L3) @pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize('chaos_yaml', cc.get_chaos_yamls()) @pytest.mark.parametrize('chaos_yaml', cc.get_chaos_yamls())
@ -131,7 +128,7 @@ class TestChaos(TestChaosBase):
# start the monitor threads to check the milvus ops # start the monitor threads to check the milvus ops
log.info("*********************Chaos Test Start**********************") log.info("*********************Chaos Test Start**********************")
log.info(connections.get_connection_addr('default')) log.info(connections.get_connection_addr('default'))
self.checker_threads = cc.start_monitor_threads(self.health_checkers) cc.start_monitor_threads(self.health_checkers)
# parse chaos object # parse chaos object
chaos_config = cc.gen_experiment_config(chaos_yaml) chaos_config = cc.gen_experiment_config(chaos_yaml)
@ -174,8 +171,7 @@ class TestChaos(TestChaosBase):
# wait 40s # wait 40s
sleep(constants.CHAOS_DURATION) sleep(constants.CHAOS_DURATION)
for k, t in self.checker_threads.items(): log.info(f'Alive threads: {threading.enumerate()}')
log.info(f"10s later: Thread {k} is_alive(): {t.is_alive()}")
# assert statistic # assert statistic
log.info("******2nd assert after chaos injected: ") log.info("******2nd assert after chaos injected: ")
@ -193,8 +189,7 @@ class TestChaos(TestChaosBase):
# delete chaos # delete chaos
chaos_res.delete(meta_name) chaos_res.delete(meta_name)
log.info("chaos deleted") log.info("chaos deleted")
for k, t in self.checker_threads.items(): log.info(f'Alive threads: {threading.enumerate()}')
log.info(f"Thread {k} is_alive(): {t.is_alive()}")
sleep(2) sleep(2)
# reconnect if needed # reconnect if needed