mirror of https://gitee.com/milvus-io/milvus.git (synced 2024-11-30 19:08:30 +08:00)
[test] [skip-e2e] Add L3 case to test sq load balance cross replicas (#17279)
Signed-off-by: ThreadDao <yufen.zong@zilliz.com>
This commit is contained in:
parent c88514bc49
commit eb737d4d59
@@ -1072,7 +1072,6 @@ class TestCollectionOperation(TestcaseBase):
        self.collection_wrap.init_collection(c_name, schema=schema, check_task=CheckTasks.check_collection_property,
                                             check_items={exp_name: c_name, exp_schema: schema})

    @pytest.mark.tags(CaseLabel.L2)
    def test_load_collection_after_load_partition(self):
        """
@@ -2420,7 +2419,38 @@ class TestLoadCollection(TestcaseBase):
                           check_task=CheckTasks.check_query_results,
                           check_items={'exp_res': [{'int64': 0}, {'int64': 3000}]})

    # https://github.com/milvus-io/milvus/issues/16726
    @pytest.mark.tags(CaseLabel.L3)
    def test_load_replica_sq_count_balance(self):
        """
        target: test loading with multiple replicas and verify that sq requests are load balanced across replicas
        method: 1. Deploy milvus with multiple querynodes
                2. Insert entities and load the collection with multiple replicas
                3. Issue query requests many times
                4. Check the querynode sq_req_count metrics
        expected: the sq_req_count totals are roughly even across replica groups
        """
        from utils.util_k8s import get_metrics_querynode_sq_req_count
        collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix))
        df = cf.gen_default_dataframe_data(nb=5000)
        mutation_res, _ = collection_w.insert(df)
        assert collection_w.num_entities == 5000
        total_sq_count = 20

        collection_w.load(replica_number=3)
        for i in range(total_sq_count):
            ids = [random.randint(0, 100) for _ in range(5)]
            collection_w.query(f"{ct.default_int64_field_name} in {ids}")

        replicas, _ = collection_w.get_replicas()
        log.debug(replicas)
        sq_req_count = get_metrics_querynode_sq_req_count()
        for group in replicas.groups:
            group_nodes = group.group_nodes
            group_sq_req_count = 0
            for node in group_nodes:
                group_sq_req_count += sq_req_count[node]
            log.debug(f"Group nodes {group_nodes} with total sq_req_count {group_sq_req_count}")
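The new case only logs the per-group totals and leaves the balance judgement to the reader. A minimal sketch of how that step could be asserted instead (not part of this commit; the 30% tolerance is an assumption, and it reuses the replicas and sq_req_count values computed in the body above):

# Hypothetical follow-up check, not in this commit: the per-group totals
# should be roughly even, e.g. each within 30% of the mean (tolerance assumed).
group_totals = [sum(sq_req_count[node] for node in group.group_nodes)
                for group in replicas.groups]
mean_count = sum(group_totals) / len(group_totals)
for total in group_totals:
    assert abs(total - mean_count) <= 0.3 * mean_count, \
        f"unbalanced sq_req_count across replica groups: {group_totals}"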

    @pytest.mark.tags(CaseLabel.L2)
    def test_get_collection_replicas_not_loaded(self):
        """
@@ -2435,7 +2465,8 @@ class TestLoadCollection(TestcaseBase):
        assert collection_w.num_entities == ct.default_nb

        collection_w.get_replicas(check_task=CheckTasks.err_res,
                                  check_items={"err_code": 15, "err_msg": "getCollectionInfoByID: can't find collectionID"})
                                  check_items={"err_code": 15,
                                               "err_msg": "collection not found, maybe not loaded"})


class TestReleaseAdvanced(TestcaseBase):
@@ -2738,9 +2769,10 @@ class TestCollectionString(TestcaseBase):
        self._connect()
        c_name = cf.gen_unique_str(prefix)
        schema = cf.gen_string_pk_default_collection_schema()
        self.collection_wrap.init_collection(name=c_name, schema=schema, check_task=CheckTasks.check_collection_property,
        self.collection_wrap.init_collection(name=c_name, schema=schema,
                                             check_task=CheckTasks.check_collection_property,
                                             check_items={exp_name: c_name, exp_schema: schema})

    @pytest.mark.tags(CaseLabel.L1)
    def test_collection_with_muti_string_fields(self):
        """
@@ -2756,7 +2788,8 @@ class TestCollectionString(TestcaseBase):
        string_field_1 = cf.gen_string_field(is_primary=True)
        string_field_2 = cf.gen_string_field(name=c_name)
        schema = cf.gen_collection_schema(fields=[int_field, string_field_1, string_field_2, vec_field])
        self.collection_wrap.init_collection(name=c_name, schema=schema, check_task=CheckTasks.check_collection_property,
        self.collection_wrap.init_collection(name=c_name, schema=schema,
                                             check_task=CheckTasks.check_collection_property,
                                             check_items={exp_name: c_name, exp_schema: schema})

    @pytest.mark.tags(CaseLabel.L1)
@@ -2788,8 +2821,8 @@ class TestCollectionString(TestcaseBase):
        max_length = 100000
        string_field = cf.gen_string_field(max_length_per_row=max_length)
        schema = cf.gen_collection_schema([int_field, string_field, vec_field])
        error = {ct.err_code: 0, ct.err_msg: "invalid max_length_per_row: %s" %max_length}
        self.collection_wrap.init_collection(name=c_name, schema=schema,
        error = {ct.err_code: 0, ct.err_msg: "invalid max_length_per_row: %s" % max_length}
        self.collection_wrap.init_collection(name=c_name, schema=schema,
                                             check_task=CheckTasks.err_res, check_items=error)

    @pytest.mark.tags(CaseLabel.L1)
@@ -2819,13 +2852,8 @@ class TestCollectionString(TestcaseBase):
        int_field = cf.gen_int64_field()
        vec_field = cf.gen_float_vec_field()
        string_field = cf.gen_string_field(is_primary=True, auto_id=True)
        fields=[int_field, string_field, vec_field]
        schema, _=self.collection_schema_wrap.init_collection_schema(fields=fields)
        fields = [int_field, string_field, vec_field]
        schema, _ = self.collection_schema_wrap.init_collection_schema(fields=fields)
        error = {ct.err_code: 0, ct.err_msg: "autoID is not supported when the VarChar field is the primary key"}
        self.collection_wrap.init_collection(name=cf.gen_unique_str(prefix), schema=schema,
                                             check_task=CheckTasks.err_res, check_items=error)

utils/util_k8s.py
@@ -1,5 +1,8 @@
import json
import os.path
import time

import requests
from pymilvus import connections
from kubernetes import client, config
from kubernetes.client.rest import ApiException
@@ -238,6 +241,32 @@ def read_pod_log(namespace, label_selector, release_name):
        raise Exception(str(e))


def get_metrics_querynode_sq_req_count():
    """ get metric milvus_querynode_sq_req_count from prometheus"""

    PROMETHEUS = 'http://10.96.7.6:9090'
    query_str = 'milvus_querynode_sq_req_count{app_kubernetes_io_instance="mic-replica",' \
                'app_kubernetes_io_name="milvus",namespace="chaos-testing"}'

    response = requests.get(PROMETHEUS + '/api/v1/query', params={'query': query_str})
    if response.status_code == 200:
        results = response.json()["data"]['result']
        # print(results)
        # print(type(results))
        log.debug(json.dumps(results, indent=4))
        milvus_querynode_sq_req_count = {}
        for res in results:
            if res["metric"]["status"] == "total":
                querynode_id = res["metric"]["node_id"]
                # pod = res["metric"]["pod"]
                value = res["value"][-1]
                milvus_querynode_sq_req_count[int(querynode_id)] = int(value)
        # log.debug(milvus_querynode_sq_req_count)
        return milvus_querynode_sq_req_count
    else:
        raise Exception(-1, f"Failed to get metrics with status code {response.status_code}")
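For context (not part of this commit), the loop above relies on the standard Prometheus instant-query response layout. A self-contained sketch of that layout, with made-up label values, and the same extraction the helper performs:

# Illustrative Prometheus /api/v1/query payload (label values are invented):
sample_response = {
    "status": "success",
    "data": {
        "resultType": "vector",
        "result": [
            {"metric": {"node_id": "3", "status": "total", "pod": "mic-replica-milvus-querynode-0"},
             "value": [1653038400.0, "7"]},   # [timestamp, sample value as a string]
            {"metric": {"node_id": "5", "status": "total", "pod": "mic-replica-milvus-querynode-1"},
             "value": [1653038400.0, "6"]},
        ],
    },
}
# Same mapping as get_metrics_querynode_sq_req_count(): node_id -> count for the "total" series.
counts = {int(r["metric"]["node_id"]): int(r["value"][-1])
          for r in sample_response["data"]["result"]
          if r["metric"]["status"] == "total"}
print(counts)  # {3: 7, 5: 6}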


if __name__ == '__main__':
    label = "app.kubernetes.io/name=milvus, component=querynode"
    instance_name = get_milvus_instance_name("chaos-testing", "10.96.250.111")
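The helper hard-codes the Prometheus address, release name and namespace for the chaos-testing cluster. A possible generalization (a sketch, not part of this commit; the function and parameter names are invented) would take them as arguments so the same check can run against any deployment:

import requests

def get_querynode_sq_req_count(prometheus_url, release_name, namespace):
    """Sketch: same PromQL query as above, but endpoint and labels are passed in."""
    query_str = (f'milvus_querynode_sq_req_count{{app_kubernetes_io_instance="{release_name}",'
                 f'app_kubernetes_io_name="milvus",namespace="{namespace}"}}')
    resp = requests.get(f"{prometheus_url}/api/v1/query", params={"query": query_str})
    resp.raise_for_status()
    counts = {}
    for res in resp.json()["data"]["result"]:
        # keep only the "total" series; res["value"] is [timestamp, value-as-string]
        if res["metric"]["status"] == "total":
            counts[int(res["metric"]["node_id"])] = int(res["value"][-1])
    return counts

Keeping these values as parameters would let the balance check run against other clusters without editing util_k8s.py.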