[skip ci] Add case to test proxy scale (#13298)

Signed-off-by: ThreadDao <yufen.zong@zilliz.com>
This commit is contained in:
ThreadDao 2021-12-13 21:29:24 +08:00 committed by GitHub
parent 57d2f25ef2
commit 600a8aa575
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 57 additions and 65 deletions

View File

@ -1,6 +1,6 @@
# scale object
IMAGE_REPOSITORY = "harbor.zilliz.cc/milvus/milvus" # repository of milvus image
IMAGE_TAG = "master-20211207-4cd314d" # tag of milvus image
IMAGE_TAG = "master-20211213-d14fff0" # tag of milvus image
NAMESPACE = "chaos-testing" # namespace
IF_NOT_PRESENT = "IfNotPresent" # image pullPolicy IfNotPresent
ALWAYS = "Always" # image pullPolicy Always

View File

@ -3,11 +3,19 @@ import os
from scale import constants
from utils.util_log import test_log as log
from common import common_func as cf
from scale import scale_common as sc
def get_milvus_chart_env_var(var=constants.MILVUS_CHART_ENV):
""" get log path for testing """
try:
milvus_helm_chart = os.environ[var]
return str(milvus_helm_chart)
except Exception as e:
log.error(f'Failed to get environment variables {var}, with exception {str(e)}')
class HelmEnv:
milvus_chart_path = sc.get_milvus_chart_env_var()
milvus_chart_path = get_milvus_chart_env_var()
def __init__(self, release_name=None, **kwargs):
self.release_name = release_name if release_name else cf.gen_unique_str(constants.DEFAULT_RELEASE_PREFIX)
@ -123,4 +131,4 @@ if __name__ == '__main__':
# host = env.get_service_ip()
# env.helm_upgrade_cluster_milvus(queryNode=2)
# env.helm_uninstall_cluster_milvus()
env.export_all_logs()
env.export_all_logs()

View File

@ -1,70 +1,22 @@
import os
from pymilvus import connections, Index
from customize.milvus_operator import MilvusOperator
from scale import constants
from utils.util_log import test_log as log
from base.collection_wrapper import ApiCollectionWrapper
from common import common_func as cf
from common import common_type as ct
def get_milvus_chart_env_var(var=constants.MILVUS_CHART_ENV):
""" get log path for testing """
try:
milvus_helm_chart = os.environ[var]
return str(milvus_helm_chart)
except Exception as e:
log.error(f'Failed to get environment variables {var}, with exception {str(e)}')
# milvus_helm_chart = constants.MILVUS_CHART_PATH
# log.error(f'Failed to get environment variables {var}, please set.')
# log.warning(
# f"Failed to get environment variables: {var}, use default: {constants.MILVUS_CHART_PATH}, {str(e)}")
# if not os.path.exists(milvus_helm_chart):
# raise Exception(f'milvus_helm_chart: {milvus_helm_chart} not exist')
# return milvus_helm_chart
def deploy_default_milvus(release_name, image_tag=None):
if image_tag is None:
image = f'{constants.IMAGE_REPO}:{constants.IMAGE_TAG}'
else:
image = f'{constants.IMAGE_REPO}:{image_tag}'
default_config = {
'metadata.namespace': constants.NAMESPACE,
'metadata.name': release_name,
'spec.components.image': image,
'spec.components.proxy.serviceType': 'LoadBalancer'
}
milvusOp = MilvusOperator()
milvusOp.install(default_config)
if milvusOp.wait_for_healthy(release_name, namespace=constants.NAMESPACE):
endpoint = milvusOp.endpoint(release_name, constants.NAMESPACE)
endpoint = endpoint.split(':')
host = endpoint[0]
port = int(endpoint[-1])
return milvusOp, host, port
else:
raise Exception(f"Failed to install {release_name}")
def e2e_milvus(host, c_name, collection_exist=False):
def e2e_milvus(host, c_name):
# connect
connections.add_connection(default={"host": host, "port": 19530})
connections.connect(alias='default')
# create
collection_w = ApiCollectionWrapper()
if collection_exist:
collection_w.init_collection(name=c_name)
else:
collection_w.init_collection(name=c_name, schema=cf.gen_default_collection_schema())
collection_w.init_collection(name=c_name, schema=cf.gen_default_collection_schema())
# insert
data = cf.gen_default_list_data(ct.default_nb)
data = cf.gen_default_list_data()
mutation_res, _ = collection_w.insert(data)
assert mutation_res.insert_count == ct.default_nb
log.debug(collection_w.num_entities)
@ -86,4 +38,4 @@ def e2e_milvus(host, c_name, collection_exist=False):
ids = search_res[0].ids[0]
term_expr = f'{ct.default_int64_field_name} in [{ids}]'
query_res, _ = collection_w.query(term_expr, output_fields=["*", "%"])
assert query_res[0][ct.default_int64_field_name] == ids
assert query_res[0][ct.default_int64_field_name] == ids

View File

@ -1,9 +1,11 @@
import multiprocessing
import pytest
from customize.milvus_operator import MilvusOperator
from scale.helm_env import HelmEnv
from common import common_func as cf
from common.common_type import CaseLabel
from scale import scale_common as sc
from scale import scale_common as sc, constants
from utils.util_log import test_log as log
prefix = "proxy_scale"
@ -22,17 +24,47 @@ class TestProxyScale:
"""
# deploy all nodes one pod cluster milvus with helm
release_name = "scale-proxy"
env = HelmEnv(release_name=release_name)
host = env.helm_install_cluster_milvus()
image = f'{constants.IMAGE_REPOSITORY}:{constants.IMAGE_TAG}'
data_config = {
'metadata.namespace': constants.NAMESPACE,
'metadata.name': release_name,
'spec.components.image': image,
'spec.components.proxy.serviceType': 'LoadBalancer',
'spec.components.proxy.replicas': 1,
'spec.components.dataNode.replicas': 2,
'spec.config.dataCoord.enableCompaction': True,
'spec.config.dataCoord.enableGarbageCollection': True
}
mic = MilvusOperator()
mic.install(data_config)
healthy = mic.wait_for_healthy(release_name, constants.NAMESPACE, timeout=1200)
log.info(f"milvus healthy: {healthy}")
host = mic.endpoint(release_name, constants.NAMESPACE).split(':')[0]
# host = "10.98.0.7"
c_name = cf.gen_unique_str(prefix)
sc.e2e_milvus(host, c_name)
process_list = []
for i in range(5):
p = multiprocessing.Process(target=sc.e2e_milvus, args=(host, c_name))
p.start()
process_list.append(p)
# scale proxy
env.helm_upgrade_cluster_milvus(proxy=2)
for p in process_list:
p.join()
# c_name_2 = cf.gen_unique_str(prefix)
sc.e2e_milvus(host, c_name, collection_exist=True)
log.info('Milvus test before scale up')
mic.upgrade(release_name, {'spec.components.proxy.replicas': 5}, constants.NAMESPACE)
for i in range(5):
p = multiprocessing.Process(target=sc.e2e_milvus, args=(host, c_name))
p.start()
process_list.append(p)
for p in process_list:
p.join()
log.info('Milvus test after scale up')
def test_shrink_proxy(self):
"""