mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-02 11:59:00 +08:00
e1f6cc8a5c
Signed-off-by: wangting0128 <ting.wang@zilliz.com>
246 lines
9.5 KiB
Python
246 lines
9.5 KiB
Python
import sys
|
|
import re
|
|
import logging
|
|
import traceback
|
|
import argparse
|
|
from yaml import full_load, dump
|
|
import config
|
|
import utils
|
|
|
|
|
|
def parse_server_tag(server_tag):
|
|
""" paser server tag from server config"""
|
|
# tag format: "8c"/"8c16m"/"8c16m1g"
|
|
if server_tag[-1] == "c":
|
|
p = r"(\d+)c"
|
|
elif server_tag[-1] == "m":
|
|
p = r"(\d+)c(\d+)m"
|
|
elif server_tag[-1] == "g":
|
|
p = r"(\d+)c(\d+)m(\d+)g"
|
|
else:
|
|
raise Exception("Unable to parse server tag")
|
|
m = re.match(p, server_tag)
|
|
cpus = int(m.groups()[0])
|
|
mems = None
|
|
gpus = None
|
|
if len(m.groups()) > 1:
|
|
mems = int(m.groups()[1])
|
|
if len(m.groups()) > 2:
|
|
gpus = int(m.groups()[2])
|
|
return {"cpus": cpus, "mems": mems, "gpus": gpus}
|
|
|
|
|
|
def update_values(src_values_file, deploy_params_file):
|
|
"""
|
|
description: update values.yaml
|
|
return: no return
|
|
"""
|
|
|
|
perf_tolerations = [{
|
|
"key": "node-role.kubernetes.io/benchmark",
|
|
"operator": "Exists",
|
|
"effect": "NoSchedule"
|
|
}]
|
|
|
|
# deploy_mode, hostname, server_tag, milvus_config, server_config=None
|
|
try:
|
|
with open(src_values_file) as f:
|
|
values_dict = full_load(f)
|
|
f.close()
|
|
with open(deploy_params_file) as f:
|
|
deploy_params = full_load(f)
|
|
f.close()
|
|
except Exception as e:
|
|
logging.error(str(e))
|
|
raise Exception("File not found")
|
|
|
|
deploy_mode = utils.get_deploy_mode(deploy_params)
|
|
print("[benchmark update] deploy_mode: %s" % str(deploy_mode))
|
|
|
|
cluster = False
|
|
if deploy_mode in [config.CLUSTER_DEPLOY_MODE, config.CLUSTER_3RD_DEPLOY_MODE]:
|
|
cluster = True
|
|
elif deploy_mode == config.SINGLE_DEPLOY_MODE:
|
|
values_dict["cluster"]["enabled"] = False
|
|
values_dict["etcd"]["replicaCount"] = 1
|
|
values_dict["minio"]["mode"] = "standalone"
|
|
values_dict["pulsar"]["enabled"] = False
|
|
server_tag = utils.get_server_tag(deploy_params)
|
|
print(server_tag)
|
|
# TODO: update milvus config
|
|
# update values.yaml with the given host
|
|
# if server_name:
|
|
# node_config = {'kubernetes.io/hostname': server_name}
|
|
# elif server_tag:
|
|
# node_config = {'instance-type': server_tag}
|
|
cpus = None
|
|
mems = None
|
|
gpus = None
|
|
if server_tag:
|
|
res = parse_server_tag(server_tag)
|
|
cpus = res["cpus"]
|
|
mems = res["mems"]
|
|
gpus = res["gpus"]
|
|
if cpus:
|
|
resources = {
|
|
"limits": {
|
|
"cpu": str(int(cpus)) + ".0"
|
|
},
|
|
"requests": {
|
|
"cpu": str(int(cpus) // 2 + 1) + ".0"
|
|
# "cpu": "4.0"
|
|
# "cpu": str(int(cpus) - 1) + ".0"
|
|
}
|
|
}
|
|
if cpus and mems:
|
|
resources_cluster = {
|
|
"limits": {
|
|
"cpu": str(int(cpus)) + ".0",
|
|
"memory": str(int(mems)) + "Gi"
|
|
},
|
|
"requests": {
|
|
"cpu": str(int(cpus) // 2 + 1) + ".0",
|
|
"memory": str(int(mems) // 2 + 1) + "Gi"
|
|
# "cpu": "4.0"
|
|
# "cpu": str(int(cpus) - 1) + ".0"
|
|
}
|
|
}
|
|
# use external minio/s3
|
|
|
|
# TODO: disable temp
|
|
values_dict['minio']['enabled'] = True
|
|
# values_dict["externalS3"]["enabled"] = True
|
|
values_dict["externalS3"]["enabled"] = False
|
|
values_dict["externalS3"]["host"] = config.MINIO_HOST
|
|
values_dict["externalS3"]["port"] = config.MINIO_PORT
|
|
values_dict["externalS3"]["accessKey"] = config.MINIO_ACCESS_KEY
|
|
values_dict["externalS3"]["secretKey"] = config.MINIO_SECRET_KEY
|
|
values_dict["externalS3"]["bucketName"] = config.MINIO_BUCKET_NAME
|
|
logging.debug(values_dict["externalS3"])
|
|
|
|
if cluster is False:
|
|
# TODO: support pod affinity for standalone mode
|
|
if cpus:
|
|
# values_dict['standalone']['nodeSelector'] = node_config
|
|
# values_dict['minio']['nodeSelector'] = node_config
|
|
# values_dict['etcd']['nodeSelector'] = node_config
|
|
# # set limit/request cpus in resources
|
|
values_dict['standalone']['resources'] = resources
|
|
if mems:
|
|
values_dict['standalone']['resources']["limits"].update({"memory": str(int(mems)) + "Gi"})
|
|
values_dict['standalone']['resources']["requests"].update({"memory": str(int(mems) // 2 + 1) + "Gi"})
|
|
if gpus:
|
|
logging.info("TODO: Need to schedule pod on GPU server")
|
|
logging.debug("Add tolerations into standalone server")
|
|
values_dict['standalone']['tolerations'] = perf_tolerations
|
|
values_dict['minio']['tolerations'] = perf_tolerations
|
|
values_dict['etcd']['tolerations'] = perf_tolerations
|
|
else:
|
|
# TODO: mem limits on distributed mode
|
|
# values_dict['pulsar']["broker"]["configData"].update({"maxMessageSize": "52428800", "PULSAR_MEM": BOOKKEEPER_PULSAR_MEM})
|
|
# values_dict['pulsar']["bookkeeper"]["configData"].update({"nettyMaxFrameSizeBytes": "52428800", "PULSAR_MEM": BROKER_PULSAR_MEM})
|
|
if cpus:
|
|
# values_dict['standalone']['nodeSelector'] = node_config
|
|
# values_dict['minio']['nodeSelector'] = node_config
|
|
# values_dict['etcd']['nodeSelector'] = node_config
|
|
# # set limit/request cpus in resources
|
|
# values_dict['proxy']['resources'] = resources
|
|
values_dict['queryNode']['resources'] = resources_cluster
|
|
values_dict['indexNode']['resources'] = resources_cluster
|
|
values_dict['dataNode']['resources'] = resources_cluster
|
|
# values_dict['minio']['resources'] = resources
|
|
# values_dict['pulsarStandalone']['resources'] = resources
|
|
if mems:
|
|
logging.debug("TODO: Update mem resources")
|
|
# # pulsar distributed mode
|
|
# values_dict['pulsar']["enabled"] = True
|
|
# values_dict['pulsar']['autoRecovery']['nodeSelector'] = node_config
|
|
# values_dict['pulsar']['proxy']['nodeSelector'] = node_config
|
|
# values_dict['pulsar']['broker']['nodeSelector'] = node_config
|
|
# values_dict['pulsar']['bookkeeper']['nodeSelector'] = node_config
|
|
# values_dict['pulsar']['zookeeper']['nodeSelector'] = node_config
|
|
|
|
logging.debug("Add tolerations into cluster server")
|
|
values_dict['proxy']['tolerations'] = perf_tolerations
|
|
values_dict['queryNode']['tolerations'] = perf_tolerations
|
|
values_dict['indexNode']['tolerations'] = perf_tolerations
|
|
values_dict['dataNode']['tolerations'] = perf_tolerations
|
|
values_dict['etcd']['tolerations'] = perf_tolerations
|
|
values_dict['minio']['tolerations'] = perf_tolerations
|
|
if deploy_mode == config.SINGLE_DEPLOY_MODE:
|
|
values_dict['pulsarStandalone']['tolerations'] = perf_tolerations
|
|
# TODO: for distributed deployment
|
|
# values_dict['pulsar']['autoRecovery']['tolerations'] = perf_tolerations
|
|
# values_dict['pulsar']['proxy']['tolerations'] = perf_tolerations
|
|
# values_dict['pulsar']['broker']['tolerations'] = perf_tolerations
|
|
# values_dict['pulsar']['bookkeeper']['tolerations'] = perf_tolerations
|
|
# values_dict['pulsar']['zookeeper']['tolerations'] = perf_tolerations
|
|
milvus_params = deploy_params["milvus"]
|
|
if "datanode" in milvus_params:
|
|
if "replicas" in milvus_params["datanode"]:
|
|
values_dict['dataNode']["replicas"] = milvus_params["datanode"]["replicas"]
|
|
if "querynode"in milvus_params:
|
|
if "replicas" in milvus_params["querynode"]:
|
|
values_dict['queryNode']["replicas"] = milvus_params["querynode"]["replicas"]
|
|
if "indexnode"in milvus_params:
|
|
if "replicas" in milvus_params["indexnode"]:
|
|
values_dict['indexNode']["replicas"] = milvus_params["indexnode"]["replicas"]
|
|
if "proxy"in milvus_params:
|
|
if "replicas" in milvus_params["proxy"]:
|
|
values_dict['proxy']["replicas"] = milvus_params["proxy"]["replicas"]
|
|
# add extra volumes
|
|
values_dict['extraVolumes'] = [{
|
|
'name': 'test',
|
|
'flexVolume': {
|
|
'driver': "fstab/cifs",
|
|
'fsType': "cifs",
|
|
'secretRef': {
|
|
'name': "cifs-test-secret"
|
|
},
|
|
'options': {
|
|
'networkPath': config.IDC_NAS_URL,
|
|
'mountOptions': "vers=1.0"
|
|
}
|
|
}
|
|
}]
|
|
values_dict['extraVolumeMounts'] = [{
|
|
'name': 'test',
|
|
'mountPath': '/test'
|
|
}]
|
|
|
|
server_resource = utils.get_server_resource(deploy_params)
|
|
print("[benchmark update] server_resource: %s" % str(server_resource))
|
|
values_dict = utils.update_dict_value(server_resource, values_dict)
|
|
|
|
print(values_dict)
|
|
# Update content of src_values_file
|
|
with open(src_values_file, 'w') as f:
|
|
dump(values_dict, f, default_flow_style=False)
|
|
f.close()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
arg_parser = argparse.ArgumentParser(
|
|
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
|
|
|
|
arg_parser.add_argument(
|
|
'--src-values',
|
|
help='src values.yaml')
|
|
arg_parser.add_argument(
|
|
'--deploy-params',
|
|
help='deploy params')
|
|
|
|
args = arg_parser.parse_args()
|
|
src_values_file = args.src_values
|
|
deploy_params_file = args.deploy_params
|
|
if not src_values_file or not deploy_params_file:
|
|
logging.error("No valid file input")
|
|
sys.exit(-1)
|
|
try:
|
|
update_values(src_values_file, deploy_params_file)
|
|
logging.info("Values.yaml updated")
|
|
except Exception as e:
|
|
logging.error(str(e))
|
|
logging.error(traceback.format_exc())
|
|
sys.exit(-1)
|