mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-11-29 18:38:44 +08:00
[test]Refine chaos apply (#26823)
Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>
This commit is contained in:
parent
4b58c71908
commit
68a2940b66
@ -5,11 +5,11 @@ repos:
|
||||
- id: golangci-lint
|
||||
args: [--config=.golangci.yml, --timeout=3m]
|
||||
- repo: https://github.com/crate-ci/typos
|
||||
rev: v1.13.10
|
||||
rev: v1.16.10
|
||||
hooks:
|
||||
- id: typos
|
||||
- repo: https://github.com/zhuwenxing/trufflehog
|
||||
rev: v3.28.5
|
||||
- repo: https://github.com/trufflesecurity/trufflehog
|
||||
rev: v3.54.3
|
||||
hooks:
|
||||
- id: trufflehog
|
||||
name: TruffleHog
|
||||
|
@ -23,7 +23,8 @@ from common.common_type import CheckTasks
|
||||
from utils.util_log import test_log as log
|
||||
from utils.api_request import Error
|
||||
|
||||
lock = threading.Lock()
|
||||
event_lock = threading.Lock()
|
||||
request_lock = threading.Lock()
|
||||
|
||||
|
||||
def get_chaos_info():
|
||||
@ -52,6 +53,7 @@ class EventRecords(metaclass=Singleton):
|
||||
self.created_file = False
|
||||
|
||||
def insert(self, event_name, event_status, ts=None):
|
||||
log.info(f"insert event: {event_name}, {event_status}")
|
||||
insert_ts = datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S.%f') if ts is None else ts
|
||||
data = {
|
||||
"event_name": [event_name],
|
||||
@ -60,11 +62,11 @@ class EventRecords(metaclass=Singleton):
|
||||
}
|
||||
df = pd.DataFrame(data)
|
||||
if not self.created_file:
|
||||
with lock:
|
||||
with event_lock:
|
||||
df.to_parquet(self.file_name, engine='fastparquet')
|
||||
self.created_file = True
|
||||
else:
|
||||
with lock:
|
||||
with event_lock:
|
||||
df.to_parquet(self.file_name, engine='fastparquet', append=True)
|
||||
|
||||
def get_records_df(self):
|
||||
@ -91,22 +93,22 @@ class RequestRecords(metaclass=Singleton):
|
||||
if len(self.buffer) > 100:
|
||||
df = pd.DataFrame(self.buffer)
|
||||
if not self.created_file:
|
||||
with lock:
|
||||
with request_lock:
|
||||
df.to_parquet(self.file_name, engine='fastparquet')
|
||||
self.created_file = True
|
||||
else:
|
||||
with lock:
|
||||
with request_lock:
|
||||
df.to_parquet(self.file_name, engine='fastparquet', append=True)
|
||||
self.buffer = []
|
||||
|
||||
def sink(self):
|
||||
df = pd.DataFrame(self.buffer)
|
||||
if not self.created_file:
|
||||
with lock:
|
||||
with request_lock:
|
||||
df.to_parquet(self.file_name, engine='fastparquet')
|
||||
self.created_file = True
|
||||
else:
|
||||
with lock:
|
||||
with request_lock:
|
||||
df.to_parquet(self.file_name, engine='fastparquet', append=True)
|
||||
|
||||
def get_records_df(self):
|
||||
@ -232,7 +234,7 @@ def trace(fmt=DEFAULT_FMT, prefix='test', flag=True):
|
||||
log.debug(f"insert request record cost {tt}s")
|
||||
except Exception as e:
|
||||
log.error(e)
|
||||
log.info(log_str)
|
||||
log.debug(log_str)
|
||||
if result:
|
||||
self.rsp_times.append(elapsed)
|
||||
self.average_time = (
|
||||
|
@ -96,13 +96,17 @@ def wait_signal_to_apply_chaos():
|
||||
t0 = time.time()
|
||||
for f in all_db_file:
|
||||
while True and (time.time() - t0 < timeout):
|
||||
df = pd.read_parquet(f)
|
||||
result = df[(df['event_name'] == 'init_chaos') & (df['event_status'] == 'ready')]
|
||||
if len(result) > 0:
|
||||
log.info(f"{f}: {result}")
|
||||
ready_apply_chaos = True
|
||||
break
|
||||
else:
|
||||
try:
|
||||
df = pd.read_parquet(f)
|
||||
result = df[(df['event_name'] == 'init_chaos') & (df['event_status'] == 'ready')]
|
||||
if len(result) > 0:
|
||||
log.info(f"{f}: {result}")
|
||||
ready_apply_chaos = True
|
||||
break
|
||||
else:
|
||||
ready_apply_chaos = False
|
||||
except Exception as e:
|
||||
log.error(f"read_parquet error: {e}")
|
||||
ready_apply_chaos = False
|
||||
return ready_apply_chaos
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user