test: add channel exclusive balance test and resource group test (#33093)

Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>
This commit is contained in:
zhuwenxing 2024-05-31 13:55:52 +08:00 committed by GitHub
parent 4159a4d5d7
commit 3336b91ce6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 1682 additions and 16 deletions

View File

@ -1,4 +1,3 @@
from numpy.core.fromnumeric import _partition_dispatcher
import pytest import pytest
import sys import sys
from pymilvus import DefaultConfig from pymilvus import DefaultConfig
@ -33,7 +32,7 @@ class Base:
collection_object_list = [] collection_object_list = []
resource_group_list = [] resource_group_list = []
high_level_api_wrap = None high_level_api_wrap = None
skip_connection = False
def setup_class(self): def setup_class(self):
log.info("[setup_class] Start setup class...") log.info("[setup_class] Start setup class...")
@ -128,6 +127,9 @@ class TestcaseBase(Base):
def _connect(self, enable_milvus_client_api=False): def _connect(self, enable_milvus_client_api=False):
""" Add a connection and create the connect """ """ Add a connection and create the connect """
if self.skip_connection:
return None
if enable_milvus_client_api: if enable_milvus_client_api:
if cf.param_info.param_uri: if cf.param_info.param_uri:
uri = cf.param_info.param_uri uri = cf.param_info.param_uri
@ -252,8 +254,8 @@ class TestcaseBase(Base):
insert_ids = [] insert_ids = []
time_stamp = 0 time_stamp = 0
# 1 create collection # 1 create collection
default_schema = cf.gen_default_collection_schema(auto_id=auto_id, dim=dim, primary_field=primary_field, default_schema = cf.gen_default_collection_schema(auto_id=auto_id, dim=dim, primary_field=primary_field,
enable_dynamic_field=enable_dynamic_field, enable_dynamic_field=enable_dynamic_field,
with_json=with_json, multiple_dim_array=multiple_dim_array, with_json=with_json, multiple_dim_array=multiple_dim_array,
is_partition_key=is_partition_key, is_partition_key=is_partition_key,
vector_data_type=vector_data_type) vector_data_type=vector_data_type)

View File

@ -1331,10 +1331,10 @@ class QueryChecker(Checker):
class DeleteChecker(Checker): class DeleteChecker(Checker):
"""check delete operations in a dependent thread""" """check delete operations in a dependent thread"""
def __init__(self, collection_name=None, schema=None): def __init__(self, collection_name=None, schema=None, shards_num=2):
if collection_name is None: if collection_name is None:
collection_name = cf.gen_unique_str("DeleteChecker_") collection_name = cf.gen_unique_str("DeleteChecker_")
super().__init__(collection_name=collection_name, schema=schema) super().__init__(collection_name=collection_name, schema=schema, shards_num=shards_num)
res, result = self.c_wrap.create_index(self.float_vector_field_name, res, result = self.c_wrap.create_index(self.float_vector_field_name,
constants.DEFAULT_INDEX_PARAM, constants.DEFAULT_INDEX_PARAM,
timeout=timeout, timeout=timeout,

View File

@ -3,6 +3,7 @@ import json
from pymilvus.grpc_gen import milvus_pb2 as milvus_types from pymilvus.grpc_gen import milvus_pb2 as milvus_types
from pymilvus import connections from pymilvus import connections
from utils.util_log import test_log as log from utils.util_log import test_log as log
from utils.util_log import test_log as log
sys_info_req = ujson.dumps({"metric_type": "system_info"}) sys_info_req = ujson.dumps({"metric_type": "system_info"})
sys_statistics_req = ujson.dumps({"metric_type": "system_statistics"}) sys_statistics_req = ujson.dumps({"metric_type": "system_statistics"})
sys_logs_req = ujson.dumps({"metric_type": "system_logs"}) sys_logs_req = ujson.dumps({"metric_type": "system_logs"})
@ -17,9 +18,24 @@ class MilvusSys:
# TODO: for now it only supports non_orm style API for getMetricsRequest # TODO: for now it only supports non_orm style API for getMetricsRequest
req = milvus_types.GetMetricsRequest(request=sys_info_req) req = milvus_types.GetMetricsRequest(request=sys_info_req)
self.sys_info = self.handler._stub.GetMetrics(req, wait_for_ready=True, timeout=None)
# req = milvus_types.GetMetricsRequest(request=sys_statistics_req)
# self.sys_statistics = self.handler._stub.GetMetrics(req, wait_for_ready=True, timeout=None)
# req = milvus_types.GetMetricsRequest(request=sys_logs_req)
# self.sys_logs = self.handler._stub.GetMetrics(req, wait_for_ready=True, timeout=None)
self.sys_info = self.handler._stub.GetMetrics(req, wait_for_ready=True, timeout=60) self.sys_info = self.handler._stub.GetMetrics(req, wait_for_ready=True, timeout=60)
log.debug(f"sys_info: {self.sys_info}") log.debug(f"sys_info: {self.sys_info}")
def refresh(self):
req = milvus_types.GetMetricsRequest(request=sys_info_req)
self.sys_info = self.handler._stub.GetMetrics(req, wait_for_ready=True, timeout=None)
# req = milvus_types.GetMetricsRequest(request=sys_statistics_req)
# self.sys_statistics = self.handler._stub.GetMetrics(req, wait_for_ready=True, timeout=None)
# req = milvus_types.GetMetricsRequest(request=sys_logs_req)
# self.sys_logs = self.handler._stub.GetMetrics(req, wait_for_ready=True, timeout=None)
log.debug(f"sys info response: {self.sys_info.response}")
@property @property
def build_version(self): def build_version(self):
"""get the first node's build version as milvus build version""" """get the first node's build version as milvus build version"""
@ -84,6 +100,7 @@ class MilvusSys:
@property @property
def nodes(self): def nodes(self):
"""get all the nodes in Milvus deployment""" """get all the nodes in Milvus deployment"""
self.refresh()
all_nodes = json.loads(self.sys_info.response).get('nodes_info') all_nodes = json.loads(self.sys_info.response).get('nodes_info')
online_nodes = [node for node in all_nodes if node["infos"]["has_error"] is False] online_nodes = [node for node in all_nodes if node["infos"]["has_error"] is False]
return online_nodes return online_nodes

View File

@ -3,6 +3,7 @@ import os
import time import time
from benedict import benedict from benedict import benedict
from utils.util_log import test_log as log from utils.util_log import test_log as log
from utils.util_k8s import get_pod_ip_name_pairs
from common.cus_resource_opts import CustomResourceOperations as CusResource from common.cus_resource_opts import CustomResourceOperations as CusResource
template_yaml = os.path.join(os.path.dirname(__file__), 'template/default.yaml') template_yaml = os.path.join(os.path.dirname(__file__), 'template/default.yaml')
@ -81,11 +82,13 @@ class MilvusOperator(object):
if delete_depends: if delete_depends:
del_configs = {'spec.dependencies.etcd.inCluster.deletionPolicy': 'Delete', del_configs = {'spec.dependencies.etcd.inCluster.deletionPolicy': 'Delete',
'spec.dependencies.pulsar.inCluster.deletionPolicy': 'Delete', 'spec.dependencies.pulsar.inCluster.deletionPolicy': 'Delete',
'spec.dependencies.kafka.inCluster.deletionPolicy': 'Delete',
'spec.dependencies.storage.inCluster.deletionPolicy': 'Delete' 'spec.dependencies.storage.inCluster.deletionPolicy': 'Delete'
} }
if delete_pvc: if delete_pvc:
del_configs.update({'spec.dependencies.etcd.inCluster.pvcDeletion': True, del_configs.update({'spec.dependencies.etcd.inCluster.pvcDeletion': True,
'spec.dependencies.pulsar.inCluster.pvcDeletion': True, 'spec.dependencies.pulsar.inCluster.pvcDeletion': True,
'spec.dependencies.kafka.inCluster.pvcDeletion': True,
'spec.dependencies.storage.inCluster.pvcDeletion': True 'spec.dependencies.storage.inCluster.pvcDeletion': True
}) })
if delete_depends or delete_pvc: if delete_depends or delete_pvc:
@ -113,6 +116,40 @@ class MilvusOperator(object):
version=self.version, namespace=namespace) version=self.version, namespace=namespace)
log.debug(f"upgrade milvus with configs: {d_configs}") log.debug(f"upgrade milvus with configs: {d_configs}")
cus_res.patch(release_name, d_configs) cus_res.patch(release_name, d_configs)
self.wait_for_healthy(release_name, namespace=namespace)
def rolling_update(self, release_name, new_image_name, namespace='default'):
"""
Method: patch custom resource object to rolling update milvus
Params:
release_name: release name of milvus
namespace: namespace that the milvus is running in
"""
cus_res = CusResource(kind=self.plural, group=self.group,
version=self.version, namespace=namespace)
rolling_configs = {'spec.components.enableRollingUpdate': True,
'spec.components.imageUpdateMode': "rollingUpgrade",
'spec.components.image': new_image_name}
log.debug(f"rolling update milvus with configs: {rolling_configs}")
cus_res.patch(release_name, rolling_configs)
self.wait_for_healthy(release_name, namespace=namespace)
def scale(self, release_name, component, replicas, namespace='default'):
"""
Method: scale milvus components by replicas
Params:
release_name: release name of milvus
replicas: the number of replicas to scale
component: the component to scale, e.g: dataNode, queryNode, indexNode, proxy
namespace: namespace that the milvus is running in
"""
cus_res = CusResource(kind=self.plural, group=self.group,
version=self.version, namespace=namespace)
component = component.replace('node', 'Node')
scale_configs = {f'spec.components.{component}.replicas': replicas}
log.info(f"scale milvus with configs: {scale_configs}")
self.upgrade(release_name, scale_configs, namespace=namespace)
self.wait_for_healthy(release_name, namespace=namespace)
def wait_for_healthy(self, release_name, namespace='default', timeout=600): def wait_for_healthy(self, release_name, namespace='default', timeout=600):
""" """
@ -152,3 +189,24 @@ class MilvusOperator(object):
endpoint = res_object['status']['endpoint'] endpoint = res_object['status']['endpoint']
return endpoint return endpoint
def etcd_endpoints(self, release_name, namespace='default'):
"""
Method: get etcd endpoints by name and namespace
Return: a string type etcd endpoints. e.g: host:port
"""
etcd_endpoints = None
cus_res = CusResource(kind=self.plural, group=self.group,
version=self.version, namespace=namespace)
res_object = cus_res.get(release_name)
try:
etcd_endpoints = res_object['spec']['dependencies']['etcd']['endpoints']
except KeyError:
log.info("etcd endpoints not found")
# get pod ip by pod name
label_selector = f"app.kubernetes.io/instance={release_name}-etcd, app.kubernetes.io/name=etcd"
res = get_pod_ip_name_pairs(namespace, label_selector)
if res:
etcd_endpoints = [f"{pod_ip}:2379" for pod_ip in res.keys()]
return etcd_endpoints[0]

View File

@ -13,6 +13,7 @@ spec:
simdType: avx simdType: avx
components: {} components: {}
dependencies: dependencies:
msgStreamType: kafka
etcd: etcd:
inCluster: inCluster:
deletionPolicy: Delete deletionPolicy: Delete
@ -21,6 +22,113 @@ spec:
metrics: metrics:
podMonitor: podMonitor:
enabled: true enabled: true
kafka:
inCluster:
deletionPolicy: Retain
pvcDeletion: false
values:
replicaCount: 3
defaultReplicationFactor: 2
metrics:
kafka:
enabled: true
serviceMonitor:
enabled: true
jmx:
enabled: true
pulsar:
inCluster:
deletionPolicy: Retain
pvcDeletion: false
values:
components:
autorecovery: false
functions: false
toolset: false
pulsar_manager: false
monitoring:
prometheus: false
grafana: false
node_exporter: false
alert_manager: false
proxy:
replicaCount: 1
resources:
requests:
cpu: 0.01
memory: 256Mi
configData:
PULSAR_MEM: >
-Xms256m -Xmx256m
PULSAR_GC: >
-XX:MaxDirectMemorySize=256m
bookkeeper:
replicaCount: 2
resources:
requests:
cpu: 0.01
memory: 256Mi
configData:
PULSAR_MEM: >
-Xms256m
-Xmx256m
-XX:MaxDirectMemorySize=256m
PULSAR_GC: >
-Dio.netty.leakDetectionLevel=disabled
-Dio.netty.recycler.linkCapacity=1024
-XX:+UseG1GC -XX:MaxGCPauseMillis=10
-XX:+ParallelRefProcEnabled
-XX:+UnlockExperimentalVMOptions
-XX:+DoEscapeAnalysis
-XX:ParallelGCThreads=32
-XX:ConcGCThreads=32
-XX:G1NewSizePercent=50
-XX:+DisableExplicitGC
-XX:-ResizePLAB
-XX:+ExitOnOutOfMemoryError
-XX:+PerfDisableSharedMem
-XX:+PrintGCDetails
zookeeper:
replicaCount: 1
resources:
requests:
cpu: 0.01
memory: 256Mi
configData:
PULSAR_MEM: >
-Xms256m
-Xmx256m
PULSAR_GC: >
-Dcom.sun.management.jmxremote
-Djute.maxbuffer=10485760
-XX:+ParallelRefProcEnabled
-XX:+UnlockExperimentalVMOptions
-XX:+DoEscapeAnalysis -XX:+DisableExplicitGC
-XX:+PerfDisableSharedMem
-Dzookeeper.forceSync=no
broker:
replicaCount: 1
resources:
requests:
cpu: 0.01
memory: 256Mi
configData:
PULSAR_MEM: >
-Xms256m
-Xmx256m
PULSAR_GC: >
-XX:MaxDirectMemorySize=256m
-Dio.netty.leakDetectionLevel=disabled
-Dio.netty.recycler.linkCapacity=1024
-XX:+ParallelRefProcEnabled
-XX:+UnlockExperimentalVMOptions
-XX:+DoEscapeAnalysis
-XX:ParallelGCThreads=32
-XX:ConcGCThreads=32
-XX:G1NewSizePercent=50
-XX:+DisableExplicitGC
-XX:-ResizePLAB
-XX:+ExitOnOutOfMemoryError
storage: storage:
inCluster: inCluster:
deletionPolicy: Delete deletionPolicy: Delete
@ -29,4 +137,3 @@ spec:
metrics: metrics:
podMonitor: podMonitor:
enabled: true enabled: true

View File

@ -7,11 +7,11 @@ metadata:
labels: labels:
app: milvus app: milvus
spec: spec:
mode: standalone mode: cluster
config: config:
dataNode: dataNode:
memory: memory:
forceSyncEnable: false forceSyncEnable: false
rootCoord: rootCoord:
enableActiveStandby: true enableActiveStandby: true
dataCoord: dataCoord:
@ -29,7 +29,7 @@ spec:
components: components:
enableRollingUpdate: true enableRollingUpdate: true
imageUpdateMode: rollingUpgrade imageUpdateMode: rollingUpgrade
image: milvusdb/milvus:2.2.0-20230208-2e4d64ec image: harbor.milvus.io/milvus/milvus:master-20240426-4fb8044a-amd64
disableMetric: false disableMetric: false
dataNode: dataNode:
replicas: 3 replicas: 3
@ -45,7 +45,7 @@ spec:
pvcDeletion: false pvcDeletion: false
values: values:
replicaCount: 3 replicaCount: 3
kafka: kafka:
inCluster: inCluster:
deletionPolicy: Retain deletionPolicy: Retain
pvcDeletion: false pvcDeletion: false
@ -58,13 +58,13 @@ spec:
serviceMonitor: serviceMonitor:
enabled: true enabled: true
jmx: jmx:
enabled: true enabled: true
pulsar: pulsar:
inCluster: inCluster:
deletionPolicy: Retain deletionPolicy: Retain
pvcDeletion: false pvcDeletion: false
values: values:
components: components:
autorecovery: false autorecovery: false
functions: false functions: false
toolset: false toolset: false
@ -158,4 +158,3 @@ spec:
pvcDeletion: false pvcDeletion: false
values: values:
mode: distributed mode: distributed

View File

@ -1,7 +1,7 @@
[pytest] [pytest]
addopts = --host localhost --html=/tmp/ci_logs/report.html --self-contained-html -v addopts = --host 10.104.21.154 --minio_host 10.104.21.153 --html=/tmp/ci_logs/report.html --self-contained-html -v --log-cli-level=INFO --capture=no
# python3 -W ignore -m pytest # python3 -W ignore -m pytest
log_format = [%(asctime)s - %(levelname)s - %(name)s]: %(message)s (%(filename)s:%(lineno)s) log_format = [%(asctime)s - %(levelname)s - %(name)s]: %(message)s (%(filename)s:%(lineno)s)
@ -9,4 +9,4 @@ log_date_format = %Y-%m-%d %H:%M:%S
filterwarnings = filterwarnings =
ignore::DeprecationWarning ignore::DeprecationWarning

View File

@ -46,6 +46,7 @@ loguru==0.7.0
psutil==5.9.4 psutil==5.9.4
pandas==1.5.3 pandas==1.5.3
tenacity==8.1.0 tenacity==8.1.0
rich==13.7.0
# for standby test # for standby test
etcd-sdk-python==0.0.4 etcd-sdk-python==0.0.4
deepdiff==6.7.1 deepdiff==6.7.1

View File

@ -0,0 +1,11 @@
import pytest
def pytest_addoption(parser):
parser.addoption("--image_tag", action="store", default="master-20240514-89a7c34c", help="image_tag")
@pytest.fixture
def image_tag(request):
return request.config.getoption("--image_tag")

View File

@ -0,0 +1,446 @@
import pytest
import time
from pymilvus import connections, utility, Collection
from utils.util_log import test_log as log
from base.client_base import TestcaseBase
from chaos.checker import (InsertChecker,
FlushChecker,
UpsertChecker,
DeleteChecker,
Op,
ResultAnalyzer
)
from chaos import chaos_commons as cc
from common import common_func as cf
from utils.util_k8s import get_querynode_id_pod_pairs
from utils.util_birdwatcher import BirdWatcher
from customize.milvus_operator import MilvusOperator
from common.milvus_sys import MilvusSys
from common.common_type import CaseLabel
from chaos.chaos_commons import assert_statistic
namespace = 'chaos-testing'
prefix = "test_rg"
from rich.table import Table
from rich.console import Console
def display_segment_distribution_info(collection_name, release_name, segment_info=None):
table = Table(title=f"{collection_name} Segment Distribution Info")
table.width = 200
table.add_column("Segment ID", style="cyan")
table.add_column("Collection ID", style="cyan")
table.add_column("Partition ID", style="cyan")
table.add_column("Num Rows", style="cyan")
table.add_column("State", style="cyan")
table.add_column("Channel", style="cyan")
table.add_column("Node ID", style="cyan")
table.add_column("Node Name", style="cyan")
res = utility.get_query_segment_info(collection_name)
log.info(f"segment info: {res}")
label = f"app.kubernetes.io/instance={release_name}, app.kubernetes.io/component=querynode"
querynode_id_pod_pair = get_querynode_id_pod_pairs("chaos-testing", label)
for r in res:
channel = "unknown"
if segment_info and str(r.segmentID) in segment_info:
channel = segment_info[str(r.segmentID)]["Insert Channel"]
table.add_row(
str(r.segmentID),
str(r.collectionID),
str(r.partitionID),
str(r.num_rows),
str(r.state),
str(channel),
str(r.nodeIds),
str([querynode_id_pod_pair.get(node_id) for node_id in r.nodeIds])
)
console = Console()
console.width = 300
console.print(table)
def display_channel_on_qn_distribution_info(collection_name, release_name, segment_info=None):
"""
node id, node name, channel, segment id
1, rg-test-613938-querynode-0, [rg-test-613938-rootcoord-dml_3_449617770820133536v0], [449617770820133655]
2, rg-test-613938-querynode-1, [rg-test-613938-rootcoord-dml_3_449617770820133537v0], [449617770820133656]
"""
m = {}
res = utility.get_query_segment_info(collection_name)
for r in res:
if r.nodeIds:
for node_id in r.nodeIds:
if node_id not in m:
m[node_id] = {
"node_name": "",
"channel": [],
"segment_id": []
}
m[node_id]["segment_id"].append(r.segmentID)
# get channel info
for node_id in m.keys():
for seg in m[node_id]["segment_id"]:
if segment_info and str(seg) in segment_info:
m[node_id]["channel"].append(segment_info[str(seg)]["Insert Channel"])
# get node name
label = f"app.kubernetes.io/instance={release_name}, app.kubernetes.io/component=querynode"
querynode_id_pod_pair = get_querynode_id_pod_pairs("chaos-testing", label)
for node_id in m.keys():
m[node_id]["node_name"] = querynode_id_pod_pair.get(node_id)
table = Table(title=f"{collection_name} Channel Distribution Info")
table.width = 200
table.add_column("Node ID", style="cyan")
table.add_column("Node Name", style="cyan")
table.add_column("Channel", style="cyan")
table.add_column("Segment ID", style="cyan")
for node_id, v in m.items():
table.add_row(
str(node_id),
str(v["node_name"]),
"\n".join([str(x) for x in set(v["channel"])]),
"\n".join([str(x) for x in v["segment_id"]])
)
console = Console()
console.width = 300
console.print(table)
return m
def _install_milvus(image_tag="master-latest"):
release_name = f"rg-test-{cf.gen_digits_by_length(6)}"
cus_configs = {'spec.mode': 'cluster',
'spec.dependencies.msgStreamType': 'kafka',
'spec.components.image': f'harbor.milvus.io/milvus/milvus:{image_tag}',
'metadata.namespace': namespace,
'metadata.name': release_name,
'spec.components.proxy.serviceType': 'LoadBalancer',
'spec.config.queryCoord.balancer': 'ChannelLevelScoreBalancer',
'spec.config.queryCoord.channelExclusiveNodeFactor': 2
}
milvus_op = MilvusOperator()
log.info(f"install milvus with configs: {cus_configs}")
milvus_op.install(cus_configs)
healthy = milvus_op.wait_for_healthy(release_name, namespace, timeout=1200)
log.info(f"milvus healthy: {healthy}")
if healthy:
endpoint = milvus_op.endpoint(release_name, namespace).split(':')
log.info(f"milvus endpoint: {endpoint}")
host = endpoint[0]
port = endpoint[1]
return release_name, host, port
else:
return release_name, None, None
class TestChannelExclusiveBalance(TestcaseBase):
def teardown_method(self, method):
log.info(("*" * 35) + " teardown " + ("*" * 35))
log.info("[teardown_method] Start teardown test case %s..." % method.__name__)
milvus_op = MilvusOperator()
milvus_op.uninstall(self.release_name, namespace)
connections.disconnect("default")
connections.remove_connection("default")
def init_health_checkers(self, collection_name=None, shards_num=2):
c_name = collection_name
checkers = {
Op.insert: InsertChecker(collection_name=c_name, shards_num=shards_num),
Op.flush: FlushChecker(collection_name=c_name, shards_num=shards_num),
Op.upsert: UpsertChecker(collection_name=c_name, shards_num=shards_num),
Op.delete: DeleteChecker(collection_name=c_name, shards_num=shards_num),
}
self.health_checkers = checkers
@pytest.mark.tags(CaseLabel.L3)
def test_channel_exclusive_balance_during_qn_scale_up(self, image_tag):
"""
steps
"""
milvus_op = MilvusOperator()
release_name, host, port = _install_milvus(image_tag=image_tag)
qn_num = 1
milvus_op.scale(release_name, 'queryNode', qn_num, namespace)
self.release_name = release_name
assert host is not None
connections.connect("default", host=host, port=port)
etcd_endpoint = milvus_op.etcd_endpoints(release_name, namespace)
bw = BirdWatcher(etcd_endpoints=etcd_endpoint, root_path=release_name)
mil = MilvusSys(alias="default")
log.info(f"milvus build version: {mil.build_version}")
c_name = cf.gen_unique_str("Checker_")
self.init_health_checkers(collection_name=c_name)
c = Collection(name=c_name)
res = c.describe()
collection_id = res["collection_id"]
cc.start_monitor_threads(self.health_checkers)
seg_res = bw.show_segment_info(collection_id)
display_segment_distribution_info(c_name, release_name, segment_info=seg_res)
display_channel_on_qn_distribution_info(c_name, release_name, segment_info=seg_res)
log.info("*********************Load Start**********************")
request_duration = 360
for i in range(10):
time.sleep(request_duration // 10)
for k, v in self.health_checkers.items():
v.check_result()
qn_num += min(qn_num + 1, 8)
seg_res = bw.show_segment_info(collection_id)
display_segment_distribution_info(c_name, release_name, segment_info=seg_res)
display_channel_on_qn_distribution_info(c_name, release_name, segment_info=seg_res)
milvus_op.scale(release_name, 'queryNode', 8, namespace)
seg_res = bw.show_segment_info(collection_id)
display_segment_distribution_info(c_name, release_name, segment_info=seg_res)
res = display_channel_on_qn_distribution_info(c_name, release_name, segment_info=seg_res)
time.sleep(60)
ra = ResultAnalyzer()
ra.get_stage_success_rate()
assert_statistic(self.health_checkers)
for k, v in self.health_checkers.items():
v.terminate()
time.sleep(60)
# in final state, channel exclusive balance is on, so all qn should have only one channel
for k, v in res.items():
assert len(set(v["channel"])) == 1
@pytest.mark.tags(CaseLabel.L3)
def test_channel_exclusive_balance_during_qn_scale_down(self, image_tag):
"""
steps
"""
milvus_op = MilvusOperator()
release_name, host, port = _install_milvus(image_tag=image_tag)
qn_num = 8
milvus_op.scale(release_name, 'queryNode', qn_num, namespace)
self.release_name = release_name
assert host is not None
connections.connect("default", host=host, port=port)
etcd_endpoint = milvus_op.etcd_endpoints(release_name, namespace)
bw = BirdWatcher(etcd_endpoints=etcd_endpoint, root_path=release_name)
mil = MilvusSys(alias="default")
log.info(f"milvus build version: {mil.build_version}")
c_name = cf.gen_unique_str("Checker_")
self.init_health_checkers(collection_name=c_name)
c = Collection(name=c_name)
res = c.describe()
collection_id = res["collection_id"]
cc.start_monitor_threads(self.health_checkers)
seg_res = bw.show_segment_info(collection_id)
display_segment_distribution_info(c_name, release_name, segment_info=seg_res)
display_channel_on_qn_distribution_info(c_name, release_name, segment_info=seg_res)
log.info("*********************Load Start**********************")
request_duration = 360
for i in range(10):
time.sleep(request_duration // 10)
for k, v in self.health_checkers.items():
v.check_result()
qn_num = max(qn_num - 1, 3)
milvus_op.scale(release_name, 'queryNode', qn_num, namespace)
seg_res = bw.show_segment_info(collection_id)
display_segment_distribution_info(c_name, release_name, segment_info=seg_res)
display_channel_on_qn_distribution_info(c_name, release_name, segment_info=seg_res)
milvus_op.scale(release_name, 'queryNode', 1, namespace)
seg_res = bw.show_segment_info(collection_id)
display_segment_distribution_info(c_name, release_name, segment_info=seg_res)
res = display_channel_on_qn_distribution_info(c_name, release_name, segment_info=seg_res)
time.sleep(60)
ra = ResultAnalyzer()
ra.get_stage_success_rate()
assert_statistic(self.health_checkers)
for k, v in self.health_checkers.items():
v.terminate()
time.sleep(60)
# shard num = 2, k = 2, qn_num = 3
# in final state, channel exclusive balance is off, so all qn should have more than one channel
for k, v in res.items():
assert len(set(v["channel"])) > 1
@pytest.mark.tags(CaseLabel.L3)
def test_channel_exclusive_balance_with_channel_num_is_1(self, image_tag):
"""
steps
"""
milvus_op = MilvusOperator()
release_name, host, port = _install_milvus(image_tag=image_tag)
qn_num = 1
milvus_op.scale(release_name, 'queryNode', qn_num, namespace)
self.release_name = release_name
assert host is not None
connections.connect("default", host=host, port=port)
etcd_endpoint = milvus_op.etcd_endpoints(release_name, namespace)
bw = BirdWatcher(etcd_endpoints=etcd_endpoint, root_path=release_name)
mil = MilvusSys(alias="default")
log.info(f"milvus build version: {mil.build_version}")
c_name = cf.gen_unique_str("Checker_")
self.init_health_checkers(collection_name=c_name, shards_num=1)
c = Collection(name=c_name)
res = c.describe()
collection_id = res["collection_id"]
cc.start_monitor_threads(self.health_checkers)
seg_res = bw.show_segment_info(collection_id)
display_segment_distribution_info(c_name, release_name, segment_info=seg_res)
display_channel_on_qn_distribution_info(c_name, release_name, segment_info=seg_res)
log.info("*********************Load Start**********************")
request_duration = 360
for i in range(10):
time.sleep(request_duration // 10)
for k, v in self.health_checkers.items():
v.check_result()
qn_num = qn_num + 1
qn_num = min(qn_num, 8)
milvus_op.scale(release_name, 'queryNode', qn_num, namespace)
seg_res = bw.show_segment_info(collection_id)
display_segment_distribution_info(c_name, release_name, segment_info=seg_res)
res = display_channel_on_qn_distribution_info(c_name, release_name, segment_info=seg_res)
for r in res:
assert len(set(r["channel"])) == 1
milvus_op.scale(release_name, 'queryNode', 8, namespace)
seg_res = bw.show_segment_info(collection_id)
display_segment_distribution_info(c_name, release_name, segment_info=seg_res)
res = display_channel_on_qn_distribution_info(c_name, release_name, segment_info=seg_res)
time.sleep(60)
ra = ResultAnalyzer()
ra.get_stage_success_rate()
assert_statistic(self.health_checkers)
for k, v in self.health_checkers.items():
v.terminate()
time.sleep(60)
# since shard num is 1, so all qn should have only one channel, no matter what k is
for k, v in res.items():
assert len(set(v["channel"])) == 1
@pytest.mark.tags(CaseLabel.L3)
def test_channel_exclusive_balance_after_k_increase(self, image_tag):
"""
steps
"""
milvus_op = MilvusOperator()
release_name, host, port = _install_milvus(image_tag=image_tag)
qn_num = 1
milvus_op.scale(release_name, 'queryNode', qn_num, namespace)
self.release_name = release_name
assert host is not None
connections.connect("default", host=host, port=port)
etcd_endpoint = milvus_op.etcd_endpoints(release_name, namespace)
bw = BirdWatcher(etcd_endpoints=etcd_endpoint, root_path=release_name)
mil = MilvusSys(alias="default")
log.info(f"milvus build version: {mil.build_version}")
c_name = cf.gen_unique_str("Checker_")
self.init_health_checkers(collection_name=c_name)
c = Collection(name=c_name)
res = c.describe()
collection_id = res["collection_id"]
cc.start_monitor_threads(self.health_checkers)
seg_res = bw.show_segment_info(collection_id)
display_segment_distribution_info(c_name, release_name, segment_info=seg_res)
display_channel_on_qn_distribution_info(c_name, release_name, segment_info=seg_res)
log.info("*********************Load Start**********************")
request_duration = 360
for i in range(10):
time.sleep(request_duration // 10)
for k, v in self.health_checkers.items():
v.check_result()
qn_num = qn_num + 1
qn_num = min(qn_num, 8)
if qn_num == 5:
config = {
"spec.config.queryCoord.channelExclusiveNodeFactor": 3
}
milvus_op.upgrade(release_name, config, namespace)
milvus_op.scale(release_name, 'queryNode', qn_num, namespace)
seg_res = bw.show_segment_info(collection_id)
display_segment_distribution_info(c_name, release_name, segment_info=seg_res)
res = display_channel_on_qn_distribution_info(c_name, release_name, segment_info=seg_res)
if qn_num == 4:
# channel exclusive balance is on, so all qn should have only one channel
for r in res.values():
assert len(set(r["channel"])) == 1
if qn_num == 5:
# k is changed to 3 when qn_num is 5,
# channel exclusive balance is off, so all qn should have more than one channel
# wait for a while to make sure all qn have more than one channel
ready = False
t0 = time.time()
while not ready and time.time() - t0 < 180:
ready = True
for r in res.values():
if len(set(r["channel"])) == 1:
ready = False
time.sleep(10)
res = display_channel_on_qn_distribution_info(c_name, release_name, segment_info=seg_res)
if qn_num == 6:
# channel exclusive balance is on, so all qn should have only one channel
ready = False
t0 = time.time()
while not ready and time.time() - t0 < 180:
ready = True
for r in res.values():
if len(set(r["channel"])) != 1:
ready = False
time.sleep(10)
res = display_channel_on_qn_distribution_info(c_name, release_name, segment_info=seg_res)
milvus_op.scale(release_name, 'queryNode', 8, namespace)
seg_res = bw.show_segment_info(collection_id)
display_segment_distribution_info(c_name, release_name, segment_info=seg_res)
display_channel_on_qn_distribution_info(c_name, release_name, segment_info=seg_res)
time.sleep(60)
ra = ResultAnalyzer()
ra.get_stage_success_rate()
assert_statistic(self.health_checkers)
for k, v in self.health_checkers.items():
v.terminate()
time.sleep(60)
@pytest.mark.tags(CaseLabel.L3)
def test_channel_exclusive_balance_for_search_performance(self, image_tag):
"""
steps
"""
milvus_op = MilvusOperator()
release_name, host, port = _install_milvus(image_tag=image_tag)
qn_num = 1
milvus_op.scale(release_name, 'queryNode', qn_num, namespace)
self.release_name = release_name
assert host is not None
connections.connect("default", host=host, port=port)
etcd_endpoint = milvus_op.etcd_endpoints(release_name, namespace)
bw = BirdWatcher(etcd_endpoints=etcd_endpoint, root_path=release_name)
mil = MilvusSys(alias="default")
log.info(f"milvus build version: {mil.build_version}")
c_name = cf.gen_unique_str("Checker_")
self.init_health_checkers(collection_name=c_name)
c = Collection(name=c_name)
res = c.describe()
collection_id = res["collection_id"]
cc.start_monitor_threads(self.health_checkers)
seg_res = bw.show_segment_info(collection_id)
display_segment_distribution_info(c_name, release_name, segment_info=seg_res)
display_channel_on_qn_distribution_info(c_name, release_name, segment_info=seg_res)
log.info("*********************Load Start**********************")
request_duration = 360
for i in range(10):
time.sleep(request_duration // 10)
for k, v in self.health_checkers.items():
v.check_result()
qn_num = qn_num + 1
qn_num = min(qn_num, 8)
milvus_op.scale(release_name, 'queryNode', qn_num, namespace)
seg_res = bw.show_segment_info(collection_id)
display_segment_distribution_info(c_name, release_name, segment_info=seg_res)
display_channel_on_qn_distribution_info(c_name, release_name, segment_info=seg_res)
milvus_op.scale(release_name, 'queryNode', 8, namespace)
seg_res = bw.show_segment_info(collection_id)
display_segment_distribution_info(c_name, release_name, segment_info=seg_res)
display_channel_on_qn_distribution_info(c_name, release_name, segment_info=seg_res)
time.sleep(60)
ra = ResultAnalyzer()
ra.get_stage_success_rate()
assert_statistic(self.health_checkers)
for k, v in self.health_checkers.items():
v.terminate()
time.sleep(60)

View File

@ -0,0 +1,944 @@
import pytest
import time
from typing import Union, List
from pymilvus import connections, utility, Collection
from pymilvus.client.constants import DEFAULT_RESOURCE_GROUP
from pymilvus.client.types import ResourceGroupConfig, ResourceGroupInfo
from utils.util_log import test_log as log
from base.client_base import TestcaseBase
from chaos.checker import (InsertChecker,
UpsertChecker,
SearchChecker,
HybridSearchChecker,
QueryChecker,
DeleteChecker,
Op,
ResultAnalyzer
)
from chaos import chaos_commons as cc
from common import common_func as cf
from utils.util_k8s import get_querynode_id_pod_pairs
from common import common_type as ct
from customize.milvus_operator import MilvusOperator
from common.milvus_sys import MilvusSys
from common.common_type import CaseLabel
from chaos.chaos_commons import assert_statistic
from delayed_assert import assert_expectations
namespace = 'chaos-testing'
prefix = "test_rg"
from rich.table import Table
from rich.console import Console
def display_resource_group_info(info: Union[ResourceGroupInfo, List[ResourceGroupInfo]]):
table = Table(title="Resource Group Info")
table.width = 200
table.add_column("Name", style="cyan")
table.add_column("Capacity", style="cyan")
table.add_column("Available Node", style="cyan")
table.add_column("Loaded Replica", style="cyan")
table.add_column("Outgoing Node", style="cyan")
table.add_column("Incoming Node", style="cyan")
table.add_column("Request", style="cyan")
table.add_column("Limit", style="cyan")
table.add_column("Nodes", style="cyan")
if isinstance(info, list):
for i in info:
table.add_row(
i.name,
str(i.capacity),
str(i.num_available_node),
str(i.num_loaded_replica),
str(i.num_outgoing_node),
str(i.num_incoming_node),
str(i.config.requests.node_num),
str(i.config.limits.node_num),
"\n".join([str(node.hostname) for node in i.nodes])
)
else:
table.add_row(
info.name,
str(info.capacity),
str(info.num_available_node),
str(info.num_loaded_replica),
str(info.num_outgoing_node),
str(info.num_incoming_node),
str(info.config.requests.node_num),
str(info.config.limits.node_num),
"\n".join([str(node.hostname) for node in info.nodes])
)
console = Console()
console.width = 300
console.print(table)
def display_segment_distribution_info(collection_name, release_name):
table = Table(title=f"{collection_name} Segment Distribution Info")
table.width = 200
table.add_column("Segment ID", style="cyan")
table.add_column("Collection ID", style="cyan")
table.add_column("Partition ID", style="cyan")
table.add_column("Num Rows", style="cyan")
table.add_column("State", style="cyan")
table.add_column("Node ID", style="cyan")
table.add_column("Node Name", style="cyan")
res = utility.get_query_segment_info(collection_name)
label = f"app.kubernetes.io/instance={release_name}, app.kubernetes.io/component=querynode"
querynode_id_pod_pair = get_querynode_id_pod_pairs("chaos-testing", label)
for r in res:
table.add_row(
str(r.segmentID),
str(r.collectionID),
str(r.partitionID),
str(r.num_rows),
str(r.state),
str(r.nodeIds),
str([querynode_id_pod_pair.get(node_id) for node_id in r.nodeIds])
)
console = Console()
console.width = 300
console.print(table)
def list_all_resource_groups():
rg_names = utility.list_resource_groups()
resource_groups = []
for rg_name in rg_names:
resource_group = utility.describe_resource_group(rg_name)
resource_groups.append(resource_group)
display_resource_group_info(resource_groups)
def _install_milvus(image_tag="master-latest"):
release_name = f"rg-test-{cf.gen_digits_by_length(6)}"
cus_configs = {'spec.mode': 'cluster',
'spec.dependencies.msgStreamType': 'kafka',
'spec.components.image': f'harbor.milvus.io/milvus/milvus:{image_tag}',
'metadata.namespace': namespace,
'metadata.name': release_name,
'spec.components.proxy.serviceType': 'LoadBalancer',
}
milvus_op = MilvusOperator()
log.info(f"install milvus with configs: {cus_configs}")
milvus_op.install(cus_configs)
healthy = milvus_op.wait_for_healthy(release_name, namespace, timeout=1200)
log.info(f"milvus healthy: {healthy}")
if healthy:
endpoint = milvus_op.endpoint(release_name, namespace).split(':')
log.info(f"milvus endpoint: {endpoint}")
host = endpoint[0]
port = endpoint[1]
return release_name, host, port
else:
return release_name, None, None
class TestResourceGroup(TestcaseBase):
def teardown_method(self, method):
log.info(("*" * 35) + " teardown " + ("*" * 35))
log.info("[teardown_method] Start teardown test case %s..." % method.__name__)
milvus_op = MilvusOperator()
milvus_op.uninstall(self.release_name, namespace)
connections.disconnect("default")
connections.remove_connection("default")
@pytest.mark.tags(CaseLabel.L3)
def test_resource_group_scale_up(self, image_tag):
"""
steps
"""
milvus_op = MilvusOperator()
release_name, host, port = _install_milvus(image_tag=image_tag)
self.release_name = release_name
assert host is not None
connections.connect("default", host=host, port=port)
mil = MilvusSys(alias="default")
log.info(f"milvus build version: {mil.build_version}")
# create rg1 with request node_num=4, limit node_num=6
name = cf.gen_unique_str("rg")
self.utility = utility
self.utility.create_resource_group(name, config=ResourceGroupConfig(
requests={"node_num": 4},
limits={"node_num": 6},
))
# scale up rg1 to 8 nodes one by one
for replicas in range(1, 8):
milvus_op.scale(release_name, 'queryNode', replicas, namespace)
time.sleep(10)
# get querynode info
qn = mil.query_nodes
log.info(f"query node info: {len(qn)}")
resource_group = self.utility.describe_resource_group(name)
log.info(f"Resource group {name} info:\n {display_resource_group_info(resource_group)}")
list_all_resource_groups()
# assert the node in rg >= 4
resource_group = self.utility.describe_resource_group(name)
assert resource_group.num_available_node >= 4
@pytest.mark.tags(CaseLabel.L3)
def test_resource_group_scale_down(self, image_tag):
"""
steps
"""
milvus_op = MilvusOperator()
release_name, host, port = _install_milvus(image_tag=image_tag)
milvus_op.scale(release_name, 'queryNode', 8, namespace)
self.release_name = release_name
assert host is not None
connections.connect("default", host=host, port=port)
mil = MilvusSys(alias="default")
log.info(f"milvus build version: {mil.build_version}")
# create rg1 with request node_num=4, limit node_num=6
name = cf.gen_unique_str("rg")
self.utility = utility
self.utility.create_resource_group(name, config=ResourceGroupConfig(
requests={"node_num": 4},
limits={"node_num": 6},
))
# scale down rg1 from 8 to 1 node one by one
for replicas in range(8, 1, -1):
milvus_op.scale(release_name, 'queryNode', replicas, namespace)
time.sleep(10)
resource_group = self.utility.describe_resource_group(name)
log.info(f"Resource group {name} info:\n {display_resource_group_info(resource_group)}")
list_all_resource_groups()
# assert the node in rg <= 1
resource_group = self.utility.describe_resource_group(name)
assert resource_group.num_available_node <= 1
@pytest.mark.tags(CaseLabel.L3)
def test_resource_group_all_querynode_add_into_two_different_config_rg(self, image_tag):
"""
steps
"""
milvus_op = MilvusOperator()
release_name, host, port = _install_milvus(image_tag=image_tag)
milvus_op.scale(release_name, 'queryNode', 8, namespace)
self.release_name = release_name
assert host is not None
connections.connect("default", host=host, port=port)
mil = MilvusSys(alias="default")
log.info(f"milvus build version: {mil.build_version}")
rg_list = []
# create rg1 with request node_num=4, limit node_num=6
name = cf.gen_unique_str("rg")
self.utility = utility
self.utility.create_resource_group(name, config=ResourceGroupConfig(
requests={"node_num": 4},
limits={"node_num": 6},
))
rg_list.append(name)
name = cf.gen_unique_str("rg")
self.utility = utility
self.utility.create_resource_group(name, config=ResourceGroupConfig(
requests={"node_num": 3},
limits={"node_num": 6},
))
rg_list.append(name)
# assert two rg satisfy the request node_num
list_all_resource_groups()
for rg in rg_list:
resource_group = self.utility.describe_resource_group(rg)
assert resource_group.num_available_node >= resource_group.config.requests.node_num
# scale down rg1 from 8 to 1 node one by one
for replicas in range(8, 1, -1):
milvus_op.scale(release_name, 'queryNode', replicas, namespace)
time.sleep(10)
for name in rg_list:
resource_group = self.utility.describe_resource_group(name)
log.info(f"Resource group {name} info:\n {display_resource_group_info(resource_group)}")
list_all_resource_groups()
@pytest.mark.tags(CaseLabel.L3)
def test_resource_group_querynode_add_into_two_different_config_rg_one_by_one(self, image_tag):
"""
steps
"""
milvus_op = MilvusOperator()
release_name, host, port = _install_milvus(image_tag=image_tag)
self.release_name = release_name
assert host is not None
connections.connect("default", host=host, port=port)
mil = MilvusSys(alias="default")
log.info(f"milvus build version: {mil.build_version}")
rg_list = []
# create rg1 with request node_num=4, limit node_num=6
name = cf.gen_unique_str("rg")
self.utility = utility
self.utility.create_resource_group(name, config=ResourceGroupConfig(
requests={"node_num": 4},
limits={"node_num": 6},
))
rg_list.append(name)
name = cf.gen_unique_str("rg")
self.utility = utility
self.utility.create_resource_group(name, config=ResourceGroupConfig(
requests={"node_num": 3},
limits={"node_num": 6},
))
rg_list.append(name)
for replicas in range(1, 8):
milvus_op.scale(release_name, 'queryNode', replicas, namespace)
time.sleep(10)
list_all_resource_groups()
for rg in rg_list:
resource_group = self.utility.describe_resource_group(rg)
assert resource_group.num_available_node >= resource_group.config.requests.node_num
# scale down rg1 from 8 to 1 node one by one
for replicas in range(8, 1, -1):
milvus_op.scale(release_name, 'queryNode', replicas, namespace)
time.sleep(10)
list_all_resource_groups()
for rg in rg_list:
resource_group = self.utility.describe_resource_group(rg)
assert resource_group.num_available_node >= 1
@pytest.mark.tags(CaseLabel.L3)
def test_resource_group_querynode_add_into_new_rg(self, image_tag):
"""
steps
"""
milvus_op = MilvusOperator()
release_name, host, port = _install_milvus(image_tag=image_tag)
self.release_name = release_name
milvus_op.scale(release_name, 'queryNode', 10, namespace)
assert host is not None
connections.connect("default", host=host, port=port)
mil = MilvusSys(alias="default")
log.info(f"milvus build version: {mil.build_version}")
rg_list = []
# create rg1 with request node_num=4, limit node_num=6
name = cf.gen_unique_str("rg")
self.utility = utility
self.utility.create_resource_group(name, config=ResourceGroupConfig(
requests={"node_num": 4},
limits={"node_num": 6},
))
rg_list.append(name)
for rg in rg_list:
resource_group = self.utility.describe_resource_group(rg)
assert resource_group.num_available_node >= resource_group.config.requests.node_num
# create a new rg with request node_num=3, limit node_num=6
# the querynode will be added into the new rg from default rg
name = cf.gen_unique_str("rg")
self.utility = utility
self.utility.create_resource_group(name, config=ResourceGroupConfig(
requests={"node_num": 3},
limits={"node_num": 6},
))
rg_list.append(name)
list_all_resource_groups()
for rg in rg_list:
resource_group = self.utility.describe_resource_group(rg)
assert resource_group.num_available_node >= resource_group.config.requests.node_num
@pytest.mark.tags(CaseLabel.L3)
def test_resource_group_with_two_rg_link_to_each_other_when_all_not_reached_to_request(self, image_tag):
"""
steps
"""
milvus_op = MilvusOperator()
release_name, host, port = _install_milvus(image_tag=image_tag)
self.release_name = release_name
assert host is not None
connections.connect("default", host=host, port=port)
mil = MilvusSys(alias="default")
log.info(f"milvus build version: {mil.build_version}")
milvus_op.scale(release_name, 'queryNode', 8, namespace)
utility.update_resource_groups(
{DEFAULT_RESOURCE_GROUP: ResourceGroupConfig(requests={"node_num": 0}, limits={"node_num": 1})})
# create rg1 with request node_num=4, limit node_num=6
name = cf.gen_unique_str("rg")
rg1_name = name
self.utility = utility
self.utility.create_resource_group(name, config=ResourceGroupConfig(
requests={"node_num": 4},
limits={"node_num": 6},
))
name = cf.gen_unique_str("rg")
rg2_name = name
self.utility = utility
self.utility.create_resource_group(name, config=ResourceGroupConfig(
requests={"node_num": 4},
limits={"node_num": 6},
))
list_all_resource_groups()
log.info("update resource group")
utility.update_resource_groups(
{rg1_name: ResourceGroupConfig(requests={"node_num": 6},
limits={"node_num": 8},
transfer_from=[{"resource_group": rg2_name}],
transfer_to=[{"resource_group": rg2_name}], )})
time.sleep(10)
list_all_resource_groups()
utility.update_resource_groups(
{rg2_name: ResourceGroupConfig(requests={"node_num": 6},
limits={"node_num": 8},
transfer_from=[{"resource_group": rg1_name}],
transfer_to=[{"resource_group": rg1_name}], )})
time.sleep(10)
list_all_resource_groups()
# no querynode was transferred between rg1 and rg2
resource_group = self.utility.describe_resource_group(rg1_name)
assert resource_group.num_available_node == 4
resource_group = self.utility.describe_resource_group(rg2_name)
assert resource_group.num_available_node == 4
@pytest.mark.tags(CaseLabel.L3)
def test_resource_group_with_rg_transfer_from_non_default_rg(self, image_tag):
"""
steps
"""
milvus_op = MilvusOperator()
release_name, host, port = _install_milvus(image_tag=image_tag)
self.release_name = release_name
assert host is not None
connections.connect("default", host=host, port=port)
mil = MilvusSys(alias="default")
log.info(f"milvus build version: {mil.build_version}")
milvus_op.scale(release_name, 'queryNode', 15, namespace)
utility.update_resource_groups(
{DEFAULT_RESOURCE_GROUP: ResourceGroupConfig(requests={"node_num": 0}, limits={"node_num": 3})})
# create rg1 with request node_num=4, limit node_num=6
name = cf.gen_unique_str("rg")
rg1_name = name
self.utility = utility
self.utility.create_resource_group(name, config=ResourceGroupConfig(
requests={"node_num": 2},
limits={"node_num": 2},
))
name = cf.gen_unique_str("rg")
rg2_name = name
self.utility = utility
self.utility.create_resource_group(name, config=ResourceGroupConfig(
requests={"node_num": 6},
limits={"node_num": 10},
))
list_all_resource_groups()
rg2_available_node_before = self.utility.describe_resource_group(rg2_name).num_available_node
log.info("update resource group")
utility.update_resource_groups(
{rg1_name: ResourceGroupConfig(requests={"node_num": 4},
limits={"node_num": 6},
transfer_from=[{"resource_group": rg2_name}],
transfer_to=[{"resource_group": rg2_name}], )})
time.sleep(10)
list_all_resource_groups()
# expect qn in rg 1 transfer from rg2 not the default rg
rg2_available_node_after = self.utility.describe_resource_group(rg2_name).num_available_node
assert rg2_available_node_before > rg2_available_node_after
@pytest.mark.tags(CaseLabel.L3)
def test_resource_group_with_rg_transfer_to_non_default_rg(self, image_tag):
"""
steps
"""
milvus_op = MilvusOperator()
release_name, host, port = _install_milvus(image_tag=image_tag)
self.release_name = release_name
assert host is not None
connections.connect("default", host=host, port=port)
mil = MilvusSys(alias="default")
log.info(f"milvus build version: {mil.build_version}")
milvus_op.scale(release_name, 'queryNode', 10, namespace)
utility.update_resource_groups(
{DEFAULT_RESOURCE_GROUP: ResourceGroupConfig(requests={"node_num": 0}, limits={"node_num": 10})})
# create rg1 with request node_num=4, limit node_num=6
name = cf.gen_unique_str("rg")
rg1_name = name
self.utility = utility
self.utility.create_resource_group(name, config=ResourceGroupConfig(
requests={"node_num": 2},
limits={"node_num": 10},
))
name = cf.gen_unique_str("rg")
rg2_name = name
self.utility = utility
self.utility.create_resource_group(name, config=ResourceGroupConfig(
requests={"node_num": 4},
limits={"node_num": 4},
))
list_all_resource_groups()
rg1_node_available_before = self.utility.describe_resource_group(rg1_name).num_available_node
log.info("update resource group")
utility.update_resource_groups(
{rg2_name: ResourceGroupConfig(requests={"node_num": 2},
limits={"node_num": 2},
transfer_from=[{"resource_group": rg1_name}],
transfer_to=[{"resource_group": rg1_name}], )})
time.sleep(10)
list_all_resource_groups()
# expect qn in rg 2 transfer to rg1 not the default rg
rg1_node_available_after = self.utility.describe_resource_group(rg1_name).num_available_node
assert rg1_node_available_after > rg1_node_available_before
@pytest.mark.tags(CaseLabel.L3)
def test_resource_group_with_rg_transfer_with_rg_list(self, image_tag):
"""
steps
"""
milvus_op = MilvusOperator()
release_name, host, port = _install_milvus(image_tag=image_tag)
self.release_name = release_name
assert host is not None
connections.connect("default", host=host, port=port)
mil = MilvusSys(alias="default")
log.info(f"milvus build version: {mil.build_version}")
milvus_op.scale(release_name, 'queryNode', 12, namespace)
utility.update_resource_groups(
{DEFAULT_RESOURCE_GROUP: ResourceGroupConfig(requests={"node_num": 0}, limits={"node_num": 1})})
# create rg1 with request node_num=4, limit node_num=6
name = cf.gen_unique_str("rg")
source_rg = name
self.utility = utility
self.utility.create_resource_group(name, config=ResourceGroupConfig(
requests={"node_num": 1},
limits={"node_num": 1},
))
name = cf.gen_unique_str("rg")
small_rg = name
self.utility = utility
self.utility.create_resource_group(name, config=ResourceGroupConfig(
requests={"node_num": 2},
limits={"node_num": 4},
))
name = cf.gen_unique_str("rg")
big_rg = name
self.utility = utility
self.utility.create_resource_group(name, config=ResourceGroupConfig(
requests={"node_num": 3},
limits={"node_num": 6},
))
list_all_resource_groups()
small_rg_node_available_before = self.utility.describe_resource_group(small_rg).num_available_node
big_rg_node_available_before = self.utility.describe_resource_group(big_rg).num_available_node
log.info("update resource group")
utility.update_resource_groups(
{source_rg: ResourceGroupConfig(requests={"node_num": 6},
limits={"node_num": 6},
transfer_from=[{"resource_group": small_rg}, {"resource_group": big_rg}],
)})
time.sleep(10)
list_all_resource_groups()
# expect source rg transfer from small rg and big rg
small_rg_node_available_after = self.utility.describe_resource_group(small_rg).num_available_node
big_rg_node_available_after = self.utility.describe_resource_group(big_rg).num_available_node
assert (small_rg_node_available_before + big_rg_node_available_before > small_rg_node_available_after +
big_rg_node_available_after)
class TestReplicasManagement(TestcaseBase):
def teardown_method(self, method):
log.info(("*" * 35) + " teardown " + ("*" * 35))
log.info("[teardown_method] Start teardown test case %s..." % method.__name__)
milvus_op = MilvusOperator()
milvus_op.uninstall(self.release_name, namespace)
connections.disconnect("default")
connections.remove_connection("default")
@pytest.mark.tags(CaseLabel.L3)
def test_load_replicas_one_collection_multi_replicas_to_multi_rg(self, image_tag):
"""
steps
"""
milvus_op = MilvusOperator()
release_name, host, port = _install_milvus(image_tag=image_tag)
milvus_op.scale(release_name, 'queryNode', 12, namespace)
self.release_name = release_name
assert host is not None
connections.connect("default", host=host, port=port)
mil = MilvusSys(alias="default")
log.info(f"milvus build version: {mil.build_version}")
resource_groups = []
for i in range(4):
name = cf.gen_unique_str("rg")
self.utility = utility
self.utility.create_resource_group(name, config=ResourceGroupConfig(
requests={"node_num": 2},
limits={"node_num": 6},
))
resource_groups.append(name)
list_all_resource_groups()
# create collection and load with 2 replicase
self.skip_connection = True
collection_w, vectors = self.init_collection_general(prefix, insert_data=True,
enable_dynamic_field=True)[0:2]
collection_w.release()
log.info(f"resource groups: {resource_groups}")
collection_w.load(replica_number=len(resource_groups), _resource_groups=resource_groups)
list_all_resource_groups()
# list replicas
replicas = collection_w.get_replicas()
log.info(f"replicas: {replicas}")
rg_to_scale_down = resource_groups[0]
# scale down a rg to 1 node
self.utility.update_resource_groups(
{rg_to_scale_down: ResourceGroupConfig(requests={"node_num": 1},
limits={"node_num": 1}, )}
)
list_all_resource_groups()
replicas = collection_w.get_replicas()
log.info(f"replicas: {replicas}")
# scale down a rg t0 0 node
self.utility.update_resource_groups(
{rg_to_scale_down: ResourceGroupConfig(requests={"node_num": 0},
limits={"node_num": 0}, )}
)
list_all_resource_groups()
replicas = collection_w.get_replicas()
log.info(f"replicas: {replicas}")
@pytest.mark.tags(CaseLabel.L3)
def test_load_multi_collection_multi_replicas_to_multi_rg(self, image_tag):
"""
steps
"""
milvus_op = MilvusOperator()
release_name, host, port = _install_milvus(image_tag=image_tag)
milvus_op.scale(release_name, 'queryNode', 12, namespace)
self.release_name = release_name
assert host is not None
connections.connect("default", host=host, port=port)
mil = MilvusSys(alias="default")
log.info(f"milvus build version: {mil.build_version}")
# create two rg with request node_num=4, limit node_num=6
resource_groups = []
for i in range(3):
name = cf.gen_unique_str("rg")
self.utility = utility
self.utility.create_resource_group(name, config=ResourceGroupConfig(
requests={"node_num": 3},
limits={"node_num": 6},
))
resource_groups.append(name)
log.info(f"resource groups: {resource_groups}")
list_all_resource_groups()
col_list = []
# create collection and load with multi replicase
self.skip_connection = True
for i in range(3):
prefix = cf.gen_unique_str("test_rg")
collection_w, vectors = self.init_collection_general(prefix, insert_data=True,
enable_dynamic_field=True)[0:2]
collection_w.release()
col_list.append(collection_w)
collection_w.load(replica_number=len(resource_groups), _resource_groups=resource_groups)
list_all_resource_groups()
# list replicas
for col in col_list:
replicas = col.get_replicas()
log.info(f"replicas: {replicas}")
@pytest.mark.tags(CaseLabel.L3)
def test_load_multi_collection_one_replicas_to_multi_rg(self, image_tag):
"""
steps
"""
milvus_op = MilvusOperator()
release_name, host, port = _install_milvus(image_tag=image_tag)
milvus_op.scale(release_name, 'queryNode', 12, namespace)
self.release_name = release_name
assert host is not None
connections.connect("default", host=host, port=port)
mil = MilvusSys(alias="default")
log.info(f"milvus build version: {mil.build_version}")
# create two rg with request node_num=4, limit node_num=6
resource_groups = []
for i in range(3):
name = cf.gen_unique_str("rg")
self.utility = utility
self.utility.create_resource_group(name, config=ResourceGroupConfig(
requests={"node_num": 3},
limits={"node_num": 6},
))
resource_groups.append(name)
log.info(f"resource groups: {resource_groups}")
list_all_resource_groups()
col_list = []
# create collection and load with multi replicase
self.skip_connection = True
for i in range(3):
prefix = cf.gen_unique_str("test_rg")
collection_w, vectors = self.init_collection_general(prefix, insert_data=True,
enable_dynamic_field=True)[0:2]
collection_w.release()
col_list.append(collection_w)
collection_w.load(replica_number=1, _resource_groups=resource_groups)
list_all_resource_groups()
# list replicas
for col in col_list:
replicas = col.get_replicas()
log.info(f"replicas: {replicas}")
@pytest.mark.tags(CaseLabel.L3)
def test_transfer_replicas_to_other_rg(self, image_tag):
"""
steps
"""
milvus_op = MilvusOperator()
release_name, host, port = _install_milvus(image_tag=image_tag)
milvus_op.scale(release_name, 'queryNode', 12, namespace)
self.release_name = release_name
assert host is not None
connections.connect("default", host=host, port=port)
mil = MilvusSys(alias="default")
log.info(f"milvus build version: {mil.build_version}")
# create two rg with request node_num=4, limit node_num=6
resource_groups = []
for i in range(3):
name = cf.gen_unique_str("rg")
self.utility = utility
self.utility.create_resource_group(name, config=ResourceGroupConfig(
requests={"node_num": 3},
limits={"node_num": 6},
))
resource_groups.append(name)
log.info(f"resource groups: {resource_groups}")
list_all_resource_groups()
col_list = []
# create collection and load with multi replicase
self.skip_connection = True
for i in range(3):
prefix = cf.gen_unique_str("test_rg")
collection_w, vectors = self.init_collection_general(prefix, insert_data=True,
enable_dynamic_field=True)[0:2]
collection_w.release()
col_list.append(collection_w)
collection_w.load(replica_number=1, _resource_groups=[resource_groups[i]])
list_all_resource_groups()
# list replicas
for col in col_list:
replicas = col.get_replicas()
log.info(f"replicas: {replicas}")
# transfer replicas to default rg
self.utility.transfer_replica(source_group=resource_groups[0], target_group=DEFAULT_RESOURCE_GROUP,
collection_name=col_list[0].name, num_replicas=1)
list_all_resource_groups()
# list replicas
for col in col_list:
replicas = col.get_replicas()
log.info(f"replicas: {replicas}")
class TestServiceAvailableDuringScale(TestcaseBase):
def init_health_checkers(self, collection_name=None):
c_name = collection_name
shards_num = 5
checkers = {
Op.insert: InsertChecker(collection_name=c_name, shards_num=shards_num),
Op.upsert: UpsertChecker(collection_name=c_name, shards_num=shards_num),
Op.search: SearchChecker(collection_name=c_name, shards_num=shards_num),
Op.hybrid_search: HybridSearchChecker(collection_name=c_name, shards_num=shards_num),
Op.query: QueryChecker(collection_name=c_name, shards_num=shards_num),
Op.delete: DeleteChecker(collection_name=c_name, shards_num=shards_num),
}
self.health_checkers = checkers
def teardown_method(self, method):
log.info(("*" * 35) + " teardown " + ("*" * 35))
log.info("[teardown_method] Start teardown test case %s..." % method.__name__)
milvus_op = MilvusOperator()
milvus_op.uninstall(self.release_name, namespace)
connections.disconnect("default")
connections.remove_connection("default")
def test_service_available_during_scale_up(self, image_tag):
"""
steps
"""
milvus_op = MilvusOperator()
release_name, host, port = _install_milvus(image_tag=image_tag)
milvus_op.scale(release_name, 'queryNode', 3, namespace)
self.release_name = release_name
assert host is not None
connections.connect("default", host=host, port=port)
mil = MilvusSys(alias="default")
log.info(f"milvus build version: {mil.build_version}")
utility.update_resource_groups(
{DEFAULT_RESOURCE_GROUP: ResourceGroupConfig(requests={"node_num": 0}, limits={"node_num": 10})})
# create rg
resource_groups = []
name = cf.gen_unique_str("rg")
self.utility = utility
self.utility.create_resource_group(name, config=ResourceGroupConfig(
requests={"node_num": 1},
limits={"node_num": 1},
))
resource_groups.append(name)
list_all_resource_groups()
c_name = cf.gen_unique_str("Checker_")
self.init_health_checkers(collection_name=c_name)
# load collection to non default rg
self.health_checkers[Op.search].c_wrap.release()
self.health_checkers[Op.search].c_wrap.load(_resource_groups=resource_groups)
cc.start_monitor_threads(self.health_checkers)
log.info("*********************Load Start**********************")
request_duration = 360
for i in range(10):
time.sleep(request_duration//10)
for k, v in self.health_checkers.items():
v.check_result()
# scale up querynode when progress is 3/10
if i == 3:
utility.update_resource_groups(
{name: ResourceGroupConfig(requests={"node_num": 2}, limits={"node_num": 2})})
log.info(f"scale up querynode in rg {name} from 1 to 2")
list_all_resource_groups()
display_segment_distribution_info(c_name, release_name)
time.sleep(60)
ra = ResultAnalyzer()
ra.get_stage_success_rate()
assert_statistic(self.health_checkers)
for k, v in self.health_checkers.items():
v.terminate()
def test_service_available_during_scale_down(self, image_tag):
"""
steps
"""
milvus_op = MilvusOperator()
release_name, host, port = _install_milvus(image_tag=image_tag)
milvus_op.scale(release_name, 'queryNode', 3, namespace)
self.release_name = release_name
assert host is not None
connections.connect("default", host=host, port=port)
mil = MilvusSys(alias="default")
log.info(f"milvus build version: {mil.build_version}")
utility.update_resource_groups(
{DEFAULT_RESOURCE_GROUP: ResourceGroupConfig(requests={"node_num": 0}, limits={"node_num": 5})})
# create rg
resource_groups = []
name = cf.gen_unique_str("rg")
self.utility = utility
self.utility.create_resource_group(name, config=ResourceGroupConfig(
requests={"node_num": 2},
limits={"node_num": 2},
))
resource_groups.append(name)
list_all_resource_groups()
c_name = cf.gen_unique_str("Checker_")
self.init_health_checkers(collection_name=c_name)
# load collection to non default rg
self.health_checkers[Op.search].c_wrap.release()
self.health_checkers[Op.search].c_wrap.load(_resource_groups=resource_groups)
cc.start_monitor_threads(self.health_checkers)
list_all_resource_groups()
log.info("*********************Load Start**********************")
request_duration = 360
for i in range(10):
time.sleep(request_duration//10)
for k, v in self.health_checkers.items():
v.check_result()
# scale down querynode in rg when progress is 3/10
if i == 3:
list_all_resource_groups()
utility.update_resource_groups(
{name: ResourceGroupConfig(requests={"node_num": 1}, limits={"node_num": 1})})
log.info(f"scale down querynode in rg {name} from 2 to 1")
list_all_resource_groups()
time.sleep(60)
ra = ResultAnalyzer()
ra.get_stage_success_rate()
assert_statistic(self.health_checkers)
for k, v in self.health_checkers.items():
v.terminate()
class TestServiceAvailableDuringTransferReplicas(TestcaseBase):
def init_health_checkers(self, collection_name=None):
c_name = collection_name
shards_num = 5
checkers = {
Op.insert: InsertChecker(collection_name=c_name, shards_num=shards_num),
Op.upsert: UpsertChecker(collection_name=c_name, shards_num=shards_num),
Op.search: SearchChecker(collection_name=c_name, shards_num=shards_num),
Op.hybrid_search: HybridSearchChecker(collection_name=c_name, shards_num=shards_num),
Op.query: QueryChecker(collection_name=c_name, shards_num=shards_num),
Op.delete: DeleteChecker(collection_name=c_name, shards_num=shards_num),
}
self.health_checkers = checkers
def teardown_method(self, method):
log.info(("*" * 35) + " teardown " + ("*" * 35))
log.info("[teardown_method] Start teardown test case %s..." % method.__name__)
milvus_op = MilvusOperator()
milvus_op.uninstall(self.release_name, namespace)
connections.disconnect("default")
connections.remove_connection("default")
def test_service_available_during_transfer_replicas(self, image_tag):
"""
steps
"""
milvus_op = MilvusOperator()
release_name, host, port = _install_milvus(image_tag=image_tag)
milvus_op.scale(release_name, 'queryNode', 5, namespace)
self.release_name = release_name
assert host is not None
connections.connect("default", host=host, port=port)
mil = MilvusSys(alias="default")
log.info(f"milvus build version: {mil.build_version}")
utility.update_resource_groups(
{DEFAULT_RESOURCE_GROUP: ResourceGroupConfig(requests={"node_num": 0}, limits={"node_num": 10})})
# create rg
resource_groups = []
for i in range(2):
name = cf.gen_unique_str("rg")
self.utility = utility
self.utility.create_resource_group(name, config=ResourceGroupConfig(
requests={"node_num": 1},
limits={"node_num": 1},
))
resource_groups.append(name)
list_all_resource_groups()
c_name = cf.gen_unique_str("Checker_")
self.init_health_checkers(collection_name=c_name)
self.health_checkers[Op.search].c_wrap.release()
self.health_checkers[Op.search].c_wrap.load(_resource_groups=resource_groups[0:1])
cc.start_monitor_threads(self.health_checkers)
list_all_resource_groups()
display_segment_distribution_info(c_name, release_name)
log.info("*********************Load Start**********************")
request_duration = 360
for i in range(10):
time.sleep(request_duration//10)
for k, v in self.health_checkers.items():
v.check_result()
# transfer replicas from default to another
if i == 3:
# transfer replicas from default rg to another rg
list_all_resource_groups()
display_segment_distribution_info(c_name, release_name)
self.utility.transfer_replica(source_group=resource_groups[0], target_group=resource_groups[1],
collection_name=c_name, num_replicas=1)
list_all_resource_groups()
display_segment_distribution_info(c_name, release_name)
time.sleep(60)
ra = ResultAnalyzer()
ra.get_stage_success_rate()
assert_statistic(self.health_checkers)
for k, v in self.health_checkers.items():
v.terminate()

View File

@ -0,0 +1,79 @@
import os
import re
from utils.util_log import test_log as log
def extraction_all_data(text):
# Patterns to handle the specifics of each key-value line
patterns = {
'Segment ID': r"Segment ID:\s*(\d+)",
'Segment State': r"Segment State:\s*(\w+)",
'Collection ID': r"Collection ID:\s*(\d+)",
'PartitionID': r"PartitionID:\s*(\d+)",
'Insert Channel': r"Insert Channel:(.+)",
'Num of Rows': r"Num of Rows:\s*(\d+)",
'Max Row Num': r"Max Row Num:\s*(\d+)",
'Last Expire Time': r"Last Expire Time:\s*(.+)",
'Compact from': r"Compact from:\s*(\[\])",
'Start Position ID': r"Start Position ID:\s*(\[[\d\s]+\])",
'Start Position Time': r"Start Position ID:.*time:\s*(.+),",
'Start Channel Name': r"channel name:\s*([^,\n]+)",
'Dml Position ID': r"Dml Position ID:\s*(\[[\d\s]+\])",
'Dml Position Time': r"Dml Position ID:.*time:\s*(.+),",
'Dml Channel Name': r"channel name:\s*(.+)",
'Binlog Nums': r"Binlog Nums:\s*(\d+)",
'StatsLog Nums': r"StatsLog Nums:\s*(\d+)",
'DeltaLog Nums': r"DeltaLog Nums:\s*(\d+)"
}
refined_data = {}
for key, pattern in patterns.items():
match = re.search(pattern, text)
if match:
refined_data[key] = match.group(1).strip()
return refined_data
class BirdWatcher:
"""
birdwatcher is a cli tool to get information about milvus
the command:
show segment info
"""
def __init__(self, etcd_endpoints, root_path):
self.prefix = f"birdwatcher --olc=\"#connect --etcd {etcd_endpoints} --rootPath={root_path},"
def parse_segment_info(self, output):
splitter = output.strip().split('\n')[0]
segments = output.strip().split(splitter)
segments = [segment for segment in segments if segment.strip()]
# Parse all segments
parsed_segments = [extraction_all_data(segment) for segment in segments]
parsed_segments = [segment for segment in parsed_segments if segment]
return parsed_segments
def show_segment_info(self, collection_id=None):
cmd = f"{self.prefix} show segment info --format table\""
if collection_id:
cmd = f"{self.prefix} show segment info --collection {collection_id} --format table\""
log.info(f"cmd: {cmd}")
output = os.popen(cmd).read()
# log.info(f"{cmd} output: {output}")
output = self.parse_segment_info(output)
for segment in output:
log.info(segment)
seg_res = {}
for segment in output:
seg_res[segment['Segment ID']] = segment
return seg_res
if __name__ == "__main__":
birdwatcher = BirdWatcher("10.104.18.24:2379", "rg-test-613938")
res = birdwatcher.show_segment_info()
print(res)

View File

@ -452,6 +452,8 @@ def record_time_when_standby_activated(namespace, release_name, coord_type, time
log.info(f"Standby {coord_type} pod does not switch standby mode") log.info(f"Standby {coord_type} pod does not switch standby mode")
if __name__ == '__main__': if __name__ == '__main__':
label = "app.kubernetes.io/name=milvus, component=querynode" label = "app.kubernetes.io/name=milvus, component=querynode"
instance_name = get_milvus_instance_name("chaos-testing", "10.96.250.111") instance_name = get_milvus_instance_name("chaos-testing", "10.96.250.111")