mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-03 20:39:36 +08:00
commit
0201dd963d
10
tests/milvus_ann_acc/ci/function/file_transfer.groovy
Normal file
10
tests/milvus_ann_acc/ci/function/file_transfer.groovy
Normal file
@ -0,0 +1,10 @@
|
||||
def FileTransfer (sourceFiles, remoteDirectory, remoteIP, protocol = "ftp", makeEmptyDirs = true) {
|
||||
if (protocol == "ftp") {
|
||||
ftpPublisher masterNodeName: '', paramPublish: [parameterName: ''], alwaysPublishFromMaster: false, continueOnError: false, failOnError: true, publishers: [
|
||||
[configName: "${remoteIP}", transfers: [
|
||||
[asciiMode: false, cleanRemote: false, excludes: '', flatten: false, makeEmptyDirs: "${makeEmptyDirs}", noDefaultExcludes: false, patternSeparator: '[, ]+', remoteDirectory: "${remoteDirectory}", remoteDirectorySDF: false, removePrefix: '', sourceFiles: "${sourceFiles}"]], usePromotionTimestamp: true, useWorkspaceInPromotion: false, verbose: true
|
||||
]
|
||||
]
|
||||
}
|
||||
}
|
||||
return this
|
16
tests/milvus_ann_acc/ci/jenkinsfile/acc_test.groovy
Normal file
16
tests/milvus_ann_acc/ci/jenkinsfile/acc_test.groovy
Normal file
@ -0,0 +1,16 @@
|
||||
timeout(time: 7200, unit: 'MINUTES') {
|
||||
try {
|
||||
dir ("milvu_ann_acc") {
|
||||
print "Git clone url: ${TEST_URL}:${TEST_BRANCH}"
|
||||
checkout([$class: 'GitSCM', branches: [[name: "${TEST_BRANCH}"]], doGenerateSubmoduleConfigurations: false, extensions: [], submoduleCfg: [], userRemoteConfigs: [[credentialsId: "${params.GIT_USER}", url: "${TEST_URL}", name: 'origin', refspec: "+refs/heads/${TEST_BRANCH}:refs/remotes/origin/${TEST_BRANCH}"]]])
|
||||
print "Install requirements"
|
||||
sh 'python3 -m pip install -r requirements.txt -i http://pypi.douban.com/simple --trusted-host pypi.douban.com'
|
||||
// sleep(120000)
|
||||
sh "python3 main.py --suite=${params.SUITE} --host=acc-test-${env.JOB_NAME}-${env.BUILD_NUMBER}-engine.milvus.svc.cluster.local --port=19530"
|
||||
}
|
||||
} catch (exc) {
|
||||
echo 'Milvus Ann Accuracy Test Failed !'
|
||||
throw exc
|
||||
}
|
||||
}
|
||||
|
13
tests/milvus_ann_acc/ci/jenkinsfile/cleanup.groovy
Normal file
13
tests/milvus_ann_acc/ci/jenkinsfile/cleanup.groovy
Normal file
@ -0,0 +1,13 @@
|
||||
// Remove the Helm release created for this build, if it still exists.
//
// The deploy step installs the chart under the name
// "acc-test-<JOB_NAME>-<BUILD_NUMBER>", so the same prefixed name must be used
// here; querying the un-prefixed name never finds the release and leaves it
// running in the cluster.
def releaseName = "acc-test-${env.JOB_NAME}-${env.BUILD_NUMBER}"

try {
    // `helm status` exits with 0 only when the release exists.
    def result = sh script: "helm status ${releaseName}", returnStatus: true
    if (!result) {
        sh "helm del --purge ${releaseName}"
    }
} catch (exc) {
    // The purge failed: try once more before propagating the original error.
    def result = sh script: "helm status ${releaseName}", returnStatus: true
    if (!result) {
        sh "helm del --purge ${releaseName}"
    }
    throw exc
}
|
||||
|
@ -0,0 +1,22 @@
|
||||
timeout(time: 30, unit: 'MINUTES') {
|
||||
try {
|
||||
dir ("milvus") {
|
||||
sh 'helm init --client-only --skip-refresh --stable-repo-url https://kubernetes.oss-cn-hangzhou.aliyuncs.com/charts'
|
||||
sh 'helm repo update'
|
||||
checkout([$class: 'GitSCM', branches: [[name: "${HELM_BRANCH}"]], userRemoteConfigs: [[url: "${HELM_URL}", name: 'origin', refspec: "+refs/heads/${HELM_BRANCH}:refs/remotes/origin/${HELM_BRANCH}"]]])
|
||||
dir ("milvus") {
|
||||
sh "helm install --wait --timeout 300 --set engine.image.tag=${IMAGE_TAG} --set expose.type=clusterIP --name acc-test-${env.JOB_NAME}-${env.BUILD_NUMBER} -f ci/db_backend/sqlite_${params.IMAGE_TYPE}_values.yaml -f ci/filebeat/values.yaml --namespace milvus --version ${HELM_BRANCH} ."
|
||||
}
|
||||
}
|
||||
// dir ("milvus") {
|
||||
// checkout([$class: 'GitSCM', branches: [[name: "${env.SERVER_BRANCH}"]], userRemoteConfigs: [[url: "${env.SERVER_URL}", name: 'origin', refspec: "+refs/heads/${env.SERVER_BRANCH}:refs/remotes/origin/${env.SERVER_BRANCH}"]]])
|
||||
// dir ("milvus") {
|
||||
// load "ci/jenkins/step/deploySingle2Dev.groovy"
|
||||
// }
|
||||
// }
|
||||
} catch (exc) {
|
||||
echo 'Deploy Milvus Server Failed !'
|
||||
throw exc
|
||||
}
|
||||
}
|
||||
|
15
tests/milvus_ann_acc/ci/jenkinsfile/notify.groovy
Normal file
15
tests/milvus_ann_acc/ci/jenkinsfile/notify.groovy
Normal file
@ -0,0 +1,15 @@
|
||||
def notify() {
|
||||
if (!currentBuild.resultIsBetterOrEqualTo('SUCCESS')) {
|
||||
// Send an email whenever the build result is worse than SUCCESS
|
||||
emailext subject: '$DEFAULT_SUBJECT',
|
||||
body: '$DEFAULT_CONTENT',
|
||||
recipientProviders: [
|
||||
[$class: 'DevelopersRecipientProvider'],
|
||||
[$class: 'RequesterRecipientProvider']
|
||||
],
|
||||
replyTo: '$DEFAULT_REPLYTO',
|
||||
to: '$DEFAULT_RECIPIENTS'
|
||||
}
|
||||
}
|
||||
return this
|
||||
|
130
tests/milvus_ann_acc/ci/main_jenkinsfile
Normal file
130
tests/milvus_ann_acc/ci/main_jenkinsfile
Normal file
@ -0,0 +1,130 @@
|
||||
pipeline {
|
||||
agent none
|
||||
|
||||
options {
|
||||
timestamps()
|
||||
}
|
||||
|
||||
parameters{
|
||||
choice choices: ['cpu', 'gpu'], description: 'cpu or gpu version', name: 'IMAGE_TYPE'
|
||||
string defaultValue: '0.6.0', description: 'server image version', name: 'IMAGE_VERSION', trim: true
|
||||
string defaultValue: 'suite.yaml', description: 'test suite config yaml', name: 'SUITE', trim: true
|
||||
string defaultValue: '09509e53-9125-4f5d-9ce8-42855987ad67', description: 'git credentials', name: 'GIT_USER', trim: true
|
||||
}
|
||||
|
||||
environment {
|
||||
IMAGE_TAG = "${params.IMAGE_VERSION}-${params.IMAGE_TYPE}-ubuntu18.04-release"
|
||||
HELM_URL = "https://github.com/milvus-io/milvus-helm.git"
|
||||
HELM_BRANCH = "0.6.0"
|
||||
TEST_URL = "git@192.168.1.105:Test/milvus_ann_acc.git"
|
||||
TEST_BRANCH = "0.6.0"
|
||||
}
|
||||
|
||||
stages {
|
||||
stage("Setup env") {
|
||||
agent {
|
||||
kubernetes {
|
||||
label 'dev-test'
|
||||
defaultContainer 'jnlp'
|
||||
yaml """
|
||||
apiVersion: v1
|
||||
kind: Pod
|
||||
metadata:
|
||||
labels:
|
||||
app: milvus
|
||||
componet: test
|
||||
spec:
|
||||
containers:
|
||||
- name: milvus-testframework
|
||||
image: registry.zilliz.com/milvus/milvus-test:v0.2
|
||||
command:
|
||||
- cat
|
||||
tty: true
|
||||
volumeMounts:
|
||||
- name: kubeconf
|
||||
mountPath: /root/.kube/
|
||||
readOnly: true
|
||||
- name: hdf5-path
|
||||
mountPath: /test
|
||||
readOnly: true
|
||||
volumes:
|
||||
- name: kubeconf
|
||||
secret:
|
||||
secretName: test-cluster-config
|
||||
- name: hdf5-path
|
||||
flexVolume:
|
||||
driver: "fstab/cifs"
|
||||
fsType: "cifs"
|
||||
secretRef:
|
||||
name: "cifs-test-secret"
|
||||
options:
|
||||
networkPath: "//192.168.1.126/test"
|
||||
mountOptions: "vers=1.0"
|
||||
"""
|
||||
}
|
||||
}
|
||||
|
||||
stages {
|
||||
stage("Deploy Default Server") {
|
||||
steps {
|
||||
gitlabCommitStatus(name: 'Accuracy Test') {
|
||||
container('milvus-testframework') {
|
||||
script {
|
||||
print "In Deploy Default Server Stage"
|
||||
load "${env.WORKSPACE}/ci/jenkinsfile/deploy_default_server.groovy"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
stage("Acc Test") {
|
||||
steps {
|
||||
gitlabCommitStatus(name: 'Accuracy Test') {
|
||||
container('milvus-testframework') {
|
||||
script {
|
||||
print "In Acc test stage"
|
||||
load "${env.WORKSPACE}/ci/jenkinsfile/acc_test.groovy"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
stage ("Cleanup Env") {
|
||||
steps {
|
||||
gitlabCommitStatus(name: 'Cleanup Env') {
|
||||
container('milvus-testframework') {
|
||||
script {
|
||||
load "${env.WORKSPACE}/ci/jenkinsfile/cleanup.groovy"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
post {
|
||||
always {
|
||||
container('milvus-testframework') {
|
||||
script {
|
||||
load "${env.WORKSPACE}/ci/jenkinsfile/cleanup.groovy"
|
||||
}
|
||||
}
|
||||
}
|
||||
success {
|
||||
script {
|
||||
echo "Milvus ann-accuracy test success !"
|
||||
}
|
||||
}
|
||||
aborted {
|
||||
script {
|
||||
echo "Milvus ann-accuracy test aborted !"
|
||||
}
|
||||
}
|
||||
failure {
|
||||
script {
|
||||
echo "Milvus ann-accuracy test failed !"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,13 @@
|
||||
apiVersion: v1
|
||||
kind: Pod
|
||||
metadata:
|
||||
labels:
|
||||
app: milvus
|
||||
componet: testframework
|
||||
spec:
|
||||
containers:
|
||||
- name: milvus-testframework
|
||||
image: registry.zilliz.com/milvus/milvus-test:v0.2
|
||||
command:
|
||||
- cat
|
||||
tty: true
|
@ -8,7 +8,7 @@ import numpy
|
||||
import sklearn.preprocessing
|
||||
from milvus import Milvus, IndexType, MetricType
|
||||
|
||||
logger = logging.getLogger("milvus_ann_acc.client")
|
||||
logger = logging.getLogger("milvus_acc.client")
|
||||
|
||||
SERVER_HOST_DEFAULT = "127.0.0.1"
|
||||
SERVER_PORT_DEFAULT = 19530
|
||||
@ -28,17 +28,17 @@ def time_wrapper(func):
|
||||
|
||||
|
||||
class MilvusClient(object):
|
||||
def __init__(self, table_name=None, ip=None, port=None):
|
||||
def __init__(self, table_name=None, host=None, port=None):
|
||||
self._milvus = Milvus()
|
||||
self._table_name = table_name
|
||||
try:
|
||||
if not ip:
|
||||
if not host:
|
||||
self._milvus.connect(
|
||||
host = SERVER_HOST_DEFAULT,
|
||||
port = SERVER_PORT_DEFAULT)
|
||||
else:
|
||||
self._milvus.connect(
|
||||
host = ip,
|
||||
host = host,
|
||||
port = port)
|
||||
except Exception as e:
|
||||
raise e
|
||||
@ -113,7 +113,6 @@ class MilvusClient(object):
|
||||
X = X.astype(numpy.float32)
|
||||
status, results = self._milvus.search_vectors(self._table_name, top_k, nprobe, X.tolist())
|
||||
self.check_status(status)
|
||||
# logger.info(results[0])
|
||||
ids = []
|
||||
for result in results:
|
||||
tmp_ids = []
|
||||
@ -125,24 +124,20 @@ class MilvusClient(object):
|
||||
def count(self):
|
||||
return self._milvus.get_table_row_count(self._table_name)[1]
|
||||
|
||||
def delete(self, timeout=60):
|
||||
logger.info("Start delete table: %s" % self._table_name)
|
||||
self._milvus.delete_table(self._table_name)
|
||||
i = 0
|
||||
while i < timeout:
|
||||
if self.count():
|
||||
time.sleep(1)
|
||||
i = i + 1
|
||||
else:
|
||||
break
|
||||
if i >= timeout:
|
||||
logger.error("Delete table timeout")
|
||||
def delete(self, table_name):
|
||||
logger.info("Start delete table: %s" % table_name)
|
||||
return self._milvus.delete_table(table_name)
|
||||
|
||||
def describe(self):
|
||||
return self._milvus.describe_table(self._table_name)
|
||||
|
||||
def exists_table(self):
|
||||
return self._milvus.has_table(self._table_name)
|
||||
def exists_table(self, table_name):
|
||||
return self._milvus.has_table(table_name)
|
||||
|
||||
def get_server_version(self):
|
||||
status, res = self._milvus.server_version()
|
||||
self.check_status(status)
|
||||
return res
|
||||
|
||||
@time_wrapper
|
||||
def preload_table(self):
|
||||
|
@ -1,26 +1,57 @@
|
||||
|
||||
import os
|
||||
import sys
|
||||
import argparse
|
||||
from yaml import load, dump
|
||||
import logging
|
||||
from logging import handlers
|
||||
from client import MilvusClient
|
||||
import runner
|
||||
|
||||
LOG_FOLDER = "logs"
|
||||
logger = logging.getLogger("milvus_acc")
|
||||
formatter = logging.Formatter('[%(asctime)s] [%(levelname)-4s] [%(pathname)s:%(lineno)d] %(message)s')
|
||||
if not os.path.exists(LOG_FOLDER):
|
||||
os.system('mkdir -p %s' % LOG_FOLDER)
|
||||
fileTimeHandler = handlers.TimedRotatingFileHandler(os.path.join(LOG_FOLDER, 'acc'), "D", 1, 10)
|
||||
fileTimeHandler.suffix = "%Y%m%d.log"
|
||||
fileTimeHandler.setFormatter(formatter)
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
fileTimeHandler.setFormatter(formatter)
|
||||
logger.addHandler(fileTimeHandler)
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(
|
||||
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
|
||||
parser.add_argument(
|
||||
'--dataset',
|
||||
metavar='NAME',
|
||||
help='the dataset to load training points from',
|
||||
default='glove-100-angular',
|
||||
choices=DATASETS.keys())
|
||||
"--host",
|
||||
default="127.0.0.1",
|
||||
help="server host")
|
||||
parser.add_argument(
|
||||
"-k", "--count",
|
||||
default=10,
|
||||
type=positive_int,
|
||||
help="the number of near neighbours to search for")
|
||||
"--port",
|
||||
default=19530,
|
||||
help="server port")
|
||||
parser.add_argument(
|
||||
'--definitions',
|
||||
'--suite',
|
||||
metavar='FILE',
|
||||
help='load algorithm definitions from FILE',
|
||||
default='algos.yaml')
|
||||
parser.add_argument(
|
||||
'--image-tag',
|
||||
default=None,
|
||||
help='pull image first')
|
||||
help='load config definitions from suite_czr'
|
||||
'.yaml',
|
||||
default='suite_czr.yaml')
|
||||
args = parser.parse_args()
|
||||
if args.suite:
|
||||
with open(args.suite, "r") as f:
|
||||
suite = load(f)
|
||||
hdf5_path = suite["hdf5_path"]
|
||||
dataset_configs = suite["datasets"]
|
||||
if not hdf5_path or not dataset_configs:
|
||||
logger.warning("No datasets given")
|
||||
sys.exit()
|
||||
f.close()
|
||||
for dataset_config in dataset_configs:
|
||||
logger.debug(dataset_config)
|
||||
milvus_instance = MilvusClient(host=args.host, port=args.port)
|
||||
runner.run(milvus_instance, dataset_config, hdf5_path)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
@ -2,3 +2,8 @@ numpy==1.16.3
|
||||
pymilvus>=0.2.0
|
||||
scikit-learn==0.19.1
|
||||
h5py==2.7.1
|
||||
influxdb==5.2.2
|
||||
pyyaml==3.12
|
||||
tableprint==0.8.0
|
||||
ansicolors==1.1.8
|
||||
scipy==1.3.1
|
162
tests/milvus_ann_acc/runner.py
Normal file
162
tests/milvus_ann_acc/runner.py
Normal file
@ -0,0 +1,162 @@
|
||||
import os
|
||||
import pdb
|
||||
import time
|
||||
import random
|
||||
import sys
|
||||
import logging
|
||||
import h5py
|
||||
import numpy
|
||||
from influxdb import InfluxDBClient
|
||||
|
||||
INSERT_INTERVAL = 100000
|
||||
# s
|
||||
DELETE_INTERVAL_TIME = 5
|
||||
INFLUXDB_HOST = "192.168.1.194"
|
||||
INFLUXDB_PORT = 8086
|
||||
INFLUXDB_USER = "admin"
|
||||
INFLUXDB_PASSWD = "admin"
|
||||
INFLUXDB_NAME = "test_result"
|
||||
influxdb_client = InfluxDBClient(host=INFLUXDB_HOST, port=INFLUXDB_PORT, username=INFLUXDB_USER, password=INFLUXDB_PASSWD, database=INFLUXDB_NAME)
|
||||
|
||||
logger = logging.getLogger("milvus_acc.runner")
|
||||
|
||||
|
||||
def parse_dataset_name(dataset_name):
    """Split a dataset name such as ``sift-128-euclidean`` into its parts.

    The name is expected to be ``<data_type>-<dimension>-...-<metric>`` where
    the first field is the data type, the second the vector dimension and the
    last the distance metric.

    Args:
        dataset_name: hyphen-separated dataset name.

    Returns:
        A tuple ``("ann" + data_type, dimension, metric_type)`` where
        ``metric_type`` is ``"l2"`` for ``euclidean`` and ``"ip"`` for
        ``angular``.

    Raises:
        ValueError: if the name has fewer than three fields, the dimension is
            not an integer, or the metric is neither ``euclidean`` nor
            ``angular``.
    """
    parts = dataset_name.split("-")
    if len(parts) < 3:
        raise ValueError(
            "Invalid dataset name %r, expected <type>-<dimension>-<metric>" % dataset_name)
    data_type = parts[0]
    dimension = int(parts[1])
    metric = parts[-1]
    # metric = dataset.attrs['distance']
    # dimension = len(dataset["train"][0])
    metric_types = {"euclidean": "l2", "angular": "ip"}
    if metric not in metric_types:
        # Previously this fell through and failed with an unbound local variable.
        raise ValueError(
            "Unsupported metric %r in dataset name %r" % (metric, dataset_name))
    return ("ann" + data_type, dimension, metric_types[metric])
|
||||
|
||||
|
||||
def get_dataset(hdf5_path, dataset_name):
    """Open the HDF5 file ``<hdf5_path>/<dataset_name>.hdf5`` for reading.

    Args:
        hdf5_path: directory that contains the HDF5 dataset files.
        dataset_name: dataset name, i.e. the file name without ``.hdf5``.

    Returns:
        The opened ``h5py.File`` object. The caller is responsible for
        closing it.

    Raises:
        FileNotFoundError: if the file does not exist.
    """
    file_path = os.path.join(hdf5_path, '%s.hdf5' % dataset_name)
    if not os.path.exists(file_path):
        raise FileNotFoundError("%s not existed" % file_path)
    # The datasets are only ever read, so open them read-only explicitly
    # instead of relying on h5py's implicit default mode.
    dataset = h5py.File(file_path, "r")
    return dataset
|
||||
|
||||
|
||||
def get_table_name(hdf5_path, dataset_name, index_file_size):
    """Build the table name used for a dataset / index-file-size combination.

    The name has the form
    ``<data_type>_<millions of train rows>m_<index_file_size>_<dimension>_<metric_type>``,
    where the row count is the length of the dataset's ``train`` entry
    floor-divided by 1,000,000.

    Args:
        hdf5_path: directory that contains the HDF5 dataset files.
        dataset_name: dataset name, parsed by ``parse_dataset_name``.
        index_file_size: index file size that becomes part of the name.

    Returns:
        The table name as a string.
    """
    data_type, dimension, metric_type = parse_dataset_name(dataset_name)
    dataset = get_dataset(hdf5_path, dataset_name)
    try:
        train_count = len(dataset["train"])
    finally:
        # Only the row count is needed; do not keep the file handle open.
        dataset.close()
    table_size = str(train_count // 1000000) + "m"
    table_name = data_type + '_' + table_size + '_' + str(index_file_size) + '_' + str(dimension) + '_' + metric_type
    return table_name
|
||||
|
||||
|
||||
def recall_calc(result_ids, true_ids, top_k, recall_k):
    """Compute the recall of query results against the ground truth.

    For every query, the first ``recall_k`` ground-truth ids are intersected
    with the returned ids. A query whose number of distinct returned ids
    differs from the number of distinct ids in the first ``top_k``
    ground-truth entries is logged and contributes nothing to the sum, but
    still counts in the denominator.

    Args:
        result_ids: one sequence of returned ids per query.
        true_ids: ground-truth ids, indexable as ``true_ids[query][rank]``.
        top_k: number of results requested per query.
        recall_k: number of leading ground-truth ids to compare against.

    Returns:
        ``matched / (len(result_ids) * recall_k)`` rounded to 4 decimals, or
        ``0.0`` when there are no results or ``recall_k`` is 0.
    """
    total = len(result_ids) * recall_k
    if not total:
        # Nothing to compare: avoid dividing by zero.
        return 0.0
    sum_intersect_num = 0
    for index, result_item in enumerate(result_ids):
        returned = set(result_item)
        if len(set(true_ids[index][:top_k])) != len(returned):
            logger.warning("Error happened: query result length is wrong")
            continue
        sum_intersect_num += len(set(true_ids[index][:recall_k]).intersection(returned))
    return round(sum_intersect_num / total, 4)
|
||||
|
||||
|
||||
def run(milvus, config, hdf5_path, force=True):
    """Run the accuracy suite described by ``config`` against a Milvus server.

    For every dataset in ``config`` and every configured index file size, the
    table is (re)created, the dataset's ``train`` vectors are inserted in
    batches of ``INSERT_INTERVAL`` rows, and the row count is verified. Then,
    for every index type / nlist / nprobe / nq / top_k combination, the
    ``test`` vectors are queried, recall is computed against ``neighbors`` and
    one ``accuracy`` point is written to InfluxDB.

    Args:
        milvus: client object used for all server operations.
        config: mapping of dataset name to a settings dict with the keys
            ``index_file_sizes``, ``index_types``, ``nlists`` and
            ``search_param`` (``top_ks``, ``nprobes``, ``nqs``).
        hdf5_path: directory that contains the HDF5 dataset files.
        force: when True an existing table is deleted and re-created,
            otherwise that table is skipped.

    Returns:
        None. Returns early if the row count after insertion does not match
        the number of inserted vectors.
    """
    server_version = milvus.get_server_version()
    logger.info(server_version)

    for dataset_name, config_value in config.items():
        dataset = get_dataset(hdf5_path, dataset_name)
        index_file_sizes = config_value["index_file_sizes"]
        index_types = config_value["index_types"]
        nlists = config_value["nlists"]
        search_param = config_value["search_param"]
        top_ks = search_param["top_ks"]
        nprobes = search_param["nprobes"]
        nqs = search_param["nqs"]

        for index_file_size in index_file_sizes:
            table_name = get_table_name(hdf5_path, dataset_name, index_file_size)
            if milvus.exists_table(table_name):
                if force is True:
                    logger.info("Re-create table: %s" % table_name)
                    milvus.delete(table_name)
                    # Give the server time to finish dropping the table.
                    time.sleep(DELETE_INTERVAL_TIME)
                else:
                    logger.warning("Table name: %s existed" % table_name)
                    continue
            data_type, dimension, metric_type = parse_dataset_name(dataset_name)
            milvus.create_table(table_name, dimension, index_file_size, metric_type)
            logger.info(milvus.describe())
            insert_vectors = numpy.array(dataset["train"])
            # milvus.insert(insert_vectors)

            # Insert in batches; row positions are used as the vector ids.
            loops = len(insert_vectors) // INSERT_INTERVAL + 1
            for i in range(loops):
                start = i * INSERT_INTERVAL
                end = min((i + 1) * INSERT_INTERVAL, len(insert_vectors))
                tmp_vectors = insert_vectors[start:end]
                if start < end:
                    milvus.insert(tmp_vectors, ids=list(range(start, end)))
            time.sleep(20)
            row_count = milvus.count()
            logger.info("Table: %s, row count: %s" % (table_name, row_count))
            if row_count != len(insert_vectors):
                logger.error("Table row count is not equal to insert vectors")
                return
            for index_type in index_types:
                for nlist in nlists:
                    milvus.create_index(index_type, nlist)
                    logger.info(milvus.describe_index())
                    logger.info("Start preload table: %s, index_type: %s, nlist: %s" % (table_name, index_type, nlist))
                    milvus.preload_table()
                    true_ids = numpy.array(dataset["neighbors"])
                    for nprobe in nprobes:
                        for nq in nqs:
                            query_vectors = numpy.array(dataset["test"][:nq])
                            for top_k in top_ks:
                                rec10 = 0.0
                                rec100 = 0.0
                                result_ids = milvus.query(query_vectors, top_k, nprobe)
                                logger.info("Query result: %s" % len(result_ids))
                                rec1 = recall_calc(result_ids, true_ids, top_k, 1)
                                if top_k == 10:
                                    rec10 = recall_calc(result_ids, true_ids, top_k, 10)
                                if top_k == 100:
                                    rec10 = recall_calc(result_ids, true_ids, top_k, 10)
                                    rec100 = recall_calc(result_ids, true_ids, top_k, 100)
                                avg_radio = recall_calc(result_ids, true_ids, top_k, top_k)
                                logger.debug("Recall_1: %s" % rec1)
                                logger.debug("Recall_10: %s" % rec10)
                                logger.debug("Recall_100: %s" % rec100)
                                logger.debug("Accuracy: %s" % avg_radio)
                                acc_record = [{
                                    "measurement": "accuracy",
                                    "tags": {
                                        "server_version": server_version,
                                        "dataset": dataset_name,
                                        "index_file_size": index_file_size,
                                        "index_type": index_type,
                                        "nlist": nlist,
                                        "search_nprobe": nprobe,
                                        "top_k": top_k,
                                        "nq": len(query_vectors)
                                    },
                                    # "time": time.ctime(),
                                    # The trailing "Z" marks the timestamp as UTC,
                                    # so it must be formatted from UTC time rather
                                    # than from the local time zone.
                                    "time": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
                                    "fields": {
                                        "recall1": rec1,
                                        "recall10": rec10,
                                        "recall100": rec100,
                                        "avg_radio": avg_radio
                                    }
                                }]
                                logger.info(acc_record)
                                try:
                                    influxdb_client.write_points(acc_record)
                                except Exception as e:
                                    # Reporting is best-effort: log and continue
                                    # with the remaining combinations.
                                    logger.error("Insert infuxdb failed: %s" % str(e))
|
29
tests/milvus_ann_acc/suite.yaml
Normal file
29
tests/milvus_ann_acc/suite.yaml
Normal file
@ -0,0 +1,29 @@
|
||||
datasets:
|
||||
- sift-128-euclidean:
|
||||
index_file_sizes: [50, 1024]
|
||||
index_types: ['ivf_flat', 'ivf_sq8', 'ivf_sq8h']
|
||||
# index_types: ['ivf_sq8']
|
||||
nlists: [16384]
|
||||
search_param:
|
||||
nprobes: [1, 32, 128, 256]
|
||||
top_ks: [10]
|
||||
nqs: [10000]
|
||||
- glove-25-angular:
|
||||
index_file_sizes: [50, 1024]
|
||||
index_types: ['ivf_flat', 'ivf_sq8', 'ivf_sq8h']
|
||||
# index_types: ['ivf_sq8']
|
||||
nlists: [16384]
|
||||
search_param:
|
||||
nprobes: [1, 32, 128, 256]
|
||||
top_ks: [10]
|
||||
nqs: [10000]
|
||||
- glove-200-angular:
|
||||
index_file_sizes: [50, 1024]
|
||||
index_types: ['ivf_flat', 'ivf_sq8', 'ivf_sq8h']
|
||||
# index_types: ['ivf_sq8']
|
||||
nlists: [16384]
|
||||
search_param:
|
||||
nprobes: [1, 32, 128, 256]
|
||||
top_ks: [10]
|
||||
nqs: [10000]
|
||||
hdf5_path: /test/milvus/ann_hdf5/
|
11
tests/milvus_ann_acc/suite.yaml.bak
Normal file
11
tests/milvus_ann_acc/suite.yaml.bak
Normal file
@ -0,0 +1,11 @@
|
||||
datasets:
|
||||
- glove-200-angular:
|
||||
index_file_sizes: [1024]
|
||||
index_types: ['ivf_sq8']
|
||||
# index_types: ['ivf_sq8']
|
||||
nlists: [16384]
|
||||
search_param:
|
||||
nprobes: [256, 400, 256]
|
||||
top_ks: [100]
|
||||
nqs: [10000]
|
||||
hdf5_path: /test/milvus/ann_hdf5/
|
20
tests/milvus_ann_acc/suite_czr.yaml
Normal file
20
tests/milvus_ann_acc/suite_czr.yaml
Normal file
@ -0,0 +1,20 @@
|
||||
datasets:
|
||||
- sift-128-euclidean:
|
||||
index_file_sizes: [1024]
|
||||
index_types: ['ivf_sq8', 'ivf_sq8h']
|
||||
# index_types: ['ivf_sq8']
|
||||
nlists: [16384]
|
||||
search_param:
|
||||
nprobes: [16, 128, 1024]
|
||||
top_ks: [1, 10, 100]
|
||||
nqs: [10, 100, 1000]
|
||||
- glove-200-angular:
|
||||
index_file_sizes: [1024]
|
||||
index_types: ['ivf_sq8', 'ivf_sq8h']
|
||||
# index_types: ['ivf_sq8']
|
||||
nlists: [16384]
|
||||
search_param:
|
||||
nprobes: [16, 128, 1024]
|
||||
top_ks: [1, 10, 100]
|
||||
nqs: [10, 100, 1000]
|
||||
hdf5_path: /test/milvus/ann_hdf5/
|
10
tests/milvus_ann_acc/suite_debug.yaml
Normal file
10
tests/milvus_ann_acc/suite_debug.yaml
Normal file
@ -0,0 +1,10 @@
|
||||
datasets:
|
||||
- sift-128-euclidean:
|
||||
index_file_sizes: [1024]
|
||||
index_types: ['ivf_flat']
|
||||
nlists: [16384]
|
||||
search_param:
|
||||
nprobes: [1, 256]
|
||||
top_ks: [10]
|
||||
nqs: [10000]
|
||||
hdf5_path: /test/milvus/ann_hdf5/
|
@ -1,132 +1,33 @@
|
||||
import os
|
||||
import pdb
|
||||
import time
|
||||
import random
|
||||
import sys
|
||||
import h5py
|
||||
import numpy
|
||||
import logging
|
||||
from logging import handlers
|
||||
from influxdb import InfluxDBClient
|
||||
|
||||
from client import MilvusClient
|
||||
INFLUXDB_HOST = "192.168.1.194"
|
||||
INFLUXDB_PORT = 8086
|
||||
INFLUXDB_USER = "admin"
|
||||
INFLUXDB_PASSWD = "admin"
|
||||
INFLUXDB_NAME = "test_result"
|
||||
|
||||
LOG_FOLDER = "logs"
|
||||
logger = logging.getLogger("milvus_ann_acc")
|
||||
client = InfluxDBClient(host=INFLUXDB_HOST, port=INFLUXDB_PORT, username=INFLUXDB_USER, password=INFLUXDB_PASSWD, database=INFLUXDB_NAME)
|
||||
|
||||
formatter = logging.Formatter('[%(asctime)s] [%(levelname)-4s] [%(pathname)s:%(lineno)d] %(message)s')
|
||||
if not os.path.exists(LOG_FOLDER):
|
||||
os.system('mkdir -p %s' % LOG_FOLDER)
|
||||
fileTimeHandler = handlers.TimedRotatingFileHandler(os.path.join(LOG_FOLDER, 'acc'), "D", 1, 10)
|
||||
fileTimeHandler.suffix = "%Y%m%d.log"
|
||||
fileTimeHandler.setFormatter(formatter)
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
fileTimeHandler.setFormatter(formatter)
|
||||
logger.addHandler(fileTimeHandler)
|
||||
|
||||
|
||||
def get_dataset_fn(dataset_name):
|
||||
file_path = "/test/milvus/ann_hdf5/"
|
||||
if not os.path.exists(file_path):
|
||||
raise Exception("%s not exists" % file_path)
|
||||
return os.path.join(file_path, '%s.hdf5' % dataset_name)
|
||||
|
||||
|
||||
def get_dataset(dataset_name):
|
||||
hdf5_fn = get_dataset_fn(dataset_name)
|
||||
hdf5_f = h5py.File(hdf5_fn)
|
||||
return hdf5_f
|
||||
|
||||
|
||||
def parse_dataset_name(dataset_name):
|
||||
data_type = dataset_name.split("-")[0]
|
||||
dimension = int(dataset_name.split("-")[1])
|
||||
metric = dataset_name.split("-")[-1]
|
||||
# metric = dataset.attrs['distance']
|
||||
# dimension = len(dataset["train"][0])
|
||||
if metric == "euclidean":
|
||||
metric_type = "l2"
|
||||
elif metric == "angular":
|
||||
metric_type = "ip"
|
||||
return ("ann"+data_type, dimension, metric_type)
|
||||
|
||||
|
||||
def get_table_name(dataset_name, index_file_size):
|
||||
data_type, dimension, metric_type = parse_dataset_name(dataset_name)
|
||||
dataset = get_dataset(dataset_name)
|
||||
table_size = len(dataset["train"])
|
||||
table_size = str(table_size // 1000000)+"m"
|
||||
table_name = data_type+'_'+table_size+'_'+str(index_file_size)+'_'+str(dimension)+'_'+metric_type
|
||||
return table_name
|
||||
|
||||
|
||||
def main(dataset_name, index_file_size, nlist=16384, force=False):
|
||||
top_k = 10
|
||||
nprobes = [32, 128]
|
||||
|
||||
dataset = get_dataset(dataset_name)
|
||||
table_name = get_table_name(dataset_name, index_file_size)
|
||||
m = MilvusClient(table_name)
|
||||
if m.exists_table():
|
||||
if force is True:
|
||||
logger.info("Re-create table: %s" % table_name)
|
||||
m.delete()
|
||||
time.sleep(10)
|
||||
else:
|
||||
logger.info("Table name: %s existed" % table_name)
|
||||
return
|
||||
data_type, dimension, metric_type = parse_dataset_name(dataset_name)
|
||||
m.create_table(table_name, dimension, index_file_size, metric_type)
|
||||
print(m.describe())
|
||||
vectors = numpy.array(dataset["train"])
|
||||
query_vectors = numpy.array(dataset["test"])
|
||||
# m.insert(vectors)
|
||||
|
||||
interval = 100000
|
||||
loops = len(vectors) // interval + 1
|
||||
|
||||
for i in range(loops):
|
||||
start = i*interval
|
||||
end = min((i+1)*interval, len(vectors))
|
||||
tmp_vectors = vectors[start:end]
|
||||
if start < end:
|
||||
m.insert(tmp_vectors, ids=[i for i in range(start, end)])
|
||||
|
||||
time.sleep(60)
|
||||
print(m.count())
|
||||
|
||||
for index_type in ["ivf_flat", "ivf_sq8", "ivf_sq8h"]:
|
||||
m.create_index(index_type, nlist)
|
||||
print(m.describe_index())
|
||||
if m.count() != len(vectors):
|
||||
return
|
||||
m.preload_table()
|
||||
true_ids = numpy.array(dataset["neighbors"])
|
||||
for nprobe in nprobes:
|
||||
print("nprobe: %s" % nprobe)
|
||||
sum_radio = 0.0; avg_radio = 0.0
|
||||
result_ids = m.query(query_vectors, top_k, nprobe)
|
||||
# print(result_ids[:10])
|
||||
for index, result_item in enumerate(result_ids):
|
||||
if len(set(true_ids[index][:top_k])) != len(set(result_item)):
|
||||
logger.info("Error happened")
|
||||
# logger.info(query_vectors[index])
|
||||
# logger.info(true_ids[index][:top_k], result_item)
|
||||
tmp = set(true_ids[index][:top_k]).intersection(set(result_item))
|
||||
sum_radio = sum_radio + (len(tmp) / top_k)
|
||||
avg_radio = round(sum_radio / len(result_ids), 4)
|
||||
logger.info(avg_radio)
|
||||
m.drop_index()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
print("glove-25-angular")
|
||||
# main("sift-128-euclidean", 1024, force=True)
|
||||
for index_file_size in [50, 1024]:
|
||||
print("Index file size: %d" % index_file_size)
|
||||
main("glove-25-angular", index_file_size, force=True)
|
||||
|
||||
print("sift-128-euclidean")
|
||||
for index_file_size in [50, 1024]:
|
||||
print("Index file size: %d" % index_file_size)
|
||||
main("sift-128-euclidean", index_file_size, force=True)
|
||||
# m = MilvusClient()
|
||||
print(client.get_list_database())
|
||||
acc_record = [{
|
||||
"measurement": "accuracy",
|
||||
"tags": {
|
||||
"server_version": "0.4.3",
|
||||
"dataset": "test",
|
||||
"index_type": "test",
|
||||
"nlist": 12,
|
||||
"search_nprobe": 12,
|
||||
"top_k": 1,
|
||||
"nq": 1
|
||||
},
|
||||
"time": time.ctime(),
|
||||
"fields": {
|
||||
"accuracy": 0.1
|
||||
}
|
||||
}]
|
||||
try:
|
||||
res = client.write_points(acc_record)
|
||||
print(res)
|
||||
except Exception as e:
|
||||
print(str(e))
|
@ -149,7 +149,8 @@ class TestConnect:
|
||||
milvus.connect(uri=uri_value, timeout=1)
|
||||
assert not milvus.connected()
|
||||
|
||||
def test_connect_with_multiprocess(self, args):
|
||||
# disable
|
||||
def _test_connect_with_multiprocess(self, args):
|
||||
'''
|
||||
target: test uri connect with multiprocess
|
||||
method: set correct uri, test with multiprocessing connecting
|
||||
|
@ -25,7 +25,8 @@ index_params = {'index_type': IndexType.IVFLAT, 'nlist': 16384}
|
||||
|
||||
class TestMixBase:
|
||||
|
||||
def test_search_during_createIndex(self, args):
|
||||
# disable
|
||||
def _test_search_during_createIndex(self, args):
|
||||
loops = 10000
|
||||
table = gen_unique_str()
|
||||
query_vecs = [vectors[0], vectors[1]]
|
||||
|
Loading…
Reference in New Issue
Block a user