2021-02-25 17:35:36 +08:00
|
|
|
|
import sys
|
|
|
|
|
import pdb
|
|
|
|
|
import random
|
|
|
|
|
import logging
|
|
|
|
|
import json
|
|
|
|
|
import time, datetime
|
|
|
|
|
import traceback
|
|
|
|
|
from multiprocessing import Process
|
2021-07-02 11:40:16 +08:00
|
|
|
|
from pymilvus import Milvus, DataType
|
2021-02-25 17:35:36 +08:00
|
|
|
|
import numpy as np
|
2021-10-14 17:10:35 +08:00
|
|
|
|
import utils as util
|
2021-07-02 11:40:16 +08:00
|
|
|
|
import config
|
|
|
|
|
from milvus_benchmark.runners import utils
|
2021-02-25 17:35:36 +08:00
|
|
|
|
|
|
|
|
|
logger = logging.getLogger("milvus_benchmark.client")

# Translation table between the index names written in the YAML suite files
# (keys) and the index type identifiers used in code (values).
INDEX_MAP = dict(
    flat="FLAT",
    ivf_flat="IVF_FLAT",
    ivf_sq8="IVF_SQ8",
    nsg="NSG",
    ivf_sq8h="IVF_SQ8_HYBRID",
    ivf_pq="IVF_PQ",
    hnsw="HNSW",
    annoy="ANNOY",
    bin_flat="BIN_FLAT",
    bin_ivf_flat="BIN_IVF_FLAT",
    rhnsw_pq="RHNSW_PQ",
    rhnsw_sq="RHNSW_SQ",
)

# Threshold used by MilvusClient.check_result_ids: a top hit whose distance
# is greater than or equal to this value is reported as an error.
epsilon = 0.1

# Parameters of the throw-away searches issued by MilvusClient.warm_query:
# the topk requested and the number of random query vectors generated.
DEFAULT_WARM_QUERY_TOPK = 1
DEFAULT_WARM_QUERY_NQ = 1
|
2021-02-25 17:35:36 +08:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def time_wrapper(func):
    """
    Decorator that measures the wall-clock run time of the decorated function.

    The wrapper it returns understands two extra keyword arguments, both of
    which are removed from ``kwargs`` before ``func`` is called:

    log: when truthy (default True) the elapsed time is written to the
        module logger at debug level.
    rps: when it is anything other than ``False`` the wrapper returns the
        tuple ``(result, elapsed_seconds)`` instead of ``result`` alone.
        The elapsed time is rounded to two decimals.
    """
    # Local import so this block does not rely on the file's import section.
    import functools

    @functools.wraps(func)  # keep func's __name__ / __doc__ on the wrapper
    def wrapper(*args, **kwargs):
        # Strip the wrapper-only switches so func never sees them.
        log = kwargs.pop("log", True)
        rps = kwargs.pop("rps", False)

        start = time.time()
        result = func(*args, **kwargs)
        # Computed unconditionally: the timing must be available for the
        # `rps` return value even when logging is switched off.
        elapsed = round(time.time() - start, 2)

        if log:
            logger.debug("Milvus {} run in {}s".format(func.__name__, elapsed))
        if rps is not False:
            return result, elapsed
        return result

    return wrapper
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class MilvusClient(object):
|
2021-07-02 11:40:16 +08:00
|
|
|
|
def __init__(self, collection_name=None, host=None, port=None, timeout=300):
    """
    Create a client bound to `collection_name` and build the Milvus handle.

    collection_name: collection used by methods that are not given one
    host: server host; a falsy value selects config.SERVER_HOST_DEFAULT
    port: server port; a falsy value selects config.SERVER_PORT_DEFAULT
    timeout: number of seconds during which creating the handle is retried

    Raises Exception("Server connect timeout") when no handle could be
    created before the deadline.
    """
    self._collection_name = collection_name
    self._collection_info = None
    host = host or config.SERVER_HOST_DEFAULT
    port = port or config.SERVER_PORT_DEFAULT

    deadline = time.time() + timeout
    failures = 0
    # retry connect remote server
    while time.time() < deadline:
        try:
            self._milvus = Milvus(
                host=host,
                port=port,
                try_connect=False,
                pre_ping=False)
            break
        except Exception as e:
            # Count first so the very first failure is reported as 1, not 0.
            failures += 1
            logger.error(str(e))
            logger.error("Milvus connect failed: %d times" % failures)
            time.sleep(30)
    else:
        # Reached only when the loop ended without `break`, i.e. the deadline
        # passed and no handle was created. A success that happens to finish
        # after the deadline is therefore no longer reported as a timeout.
        raise Exception("Server connect timeout")
    # self._metric_type = None
|
|
|
|
|
|
|
|
|
|
def __str__(self):
    """Return a short label that names the collection this client is bound to."""
    bound_name = self._collection_name
    return 'Milvus collection %s' % bound_name
|
|
|
|
|
|
2021-07-02 11:40:16 +08:00
|
|
|
|
def set_collection(self, collection_name):
    """Bind this client to `collection_name` for all subsequent calls."""
    self._collection_name = collection_name
|
|
|
|
|
|
|
|
|
|
# TODO: not supported by the server
|
|
|
|
|
# def check_status(self, status):
|
|
|
|
|
# if not status.OK():
|
|
|
|
|
# logger.error(status.message)
|
|
|
|
|
# logger.error(self._milvus.server_status())
|
|
|
|
|
# logger.error(self.count())
|
|
|
|
|
# raise Exception("Status not ok")
|
2021-02-25 17:35:36 +08:00
|
|
|
|
|
|
|
|
|
def check_result_ids(self, result):
    """
    Check that the first hit of every query is closer than `epsilon`.

    result: iterable of per-query hit sequences; only element 0 of each one
        is inspected, through its `distance` attribute.

    Raises Exception("Distance wrong") for the first query whose top hit has
    a distance greater than or equal to `epsilon`; the query position and
    the offending distance are logged as errors beforehand.
    """
    for position, hits in enumerate(result):
        top_hit = hits[0]
        if top_hit.distance >= epsilon:
            logger.error(position)
            logger.error(top_hit.distance)
            raise Exception("Distance wrong")
|
|
|
|
|
|
2021-07-02 11:40:16 +08:00
|
|
|
|
@property
def collection_name(self):
    """Name of the collection this client is currently bound to (read-only)."""
    return self._collection_name
|
|
|
|
|
|
2021-02-25 17:35:36 +08:00
|
|
|
|
# only support the given field name
def create_collection(self, dimension, data_type=DataType.FLOAT_VECTOR, auto_id=False,
                      collection_name=None, other_fields=None):
    """
    Create a collection holding one vector field and an INT64 primary key.

    dimension: `dim` of the vector field; also remembered on the client as
        `self._dimension`
    data_type: type of the vector field; its name comes from
        utils.get_default_field_name(data_type)
    auto_id: forwarded as the "auto_id" entry of the create parameters
    collection_name: target collection; a falsy value selects the bound one
    other_fields: optional comma-separated names of extra scalar fields.
        The type is chosen from the name prefix: "int" -> INT64,
        "float" -> FLOAT, "double" -> DOUBLE. Whitespace around each name
        is ignored.

    Raises Exception("Field name not supported") for a name with an unknown
    prefix; any error raised while creating the collection is logged and
    re-raised.
    """
    self._dimension = dimension
    if not collection_name:
        collection_name = self._collection_name
    vec_field_name = utils.get_default_field_name(data_type)
    fields = [
        {"name": vec_field_name, "type": data_type, "params": {"dim": dimension}},
        {"name": "id", "type": DataType.INT64, "is_primary": True},
    ]
    if other_fields:
        prefix_types = (
            ("int", DataType.INT64),
            ("float", DataType.FLOAT),
            ("double", DataType.DOUBLE),
        )
        for raw_name in other_fields.split(","):
            # "int1, float1" must behave like "int1,float1"
            other_field_name = raw_name.strip()
            for prefix, field_type in prefix_types:
                if other_field_name.startswith(prefix):
                    break
            else:
                raise Exception("Field name not supported")
            fields.append({"name": other_field_name, "type": field_type})
    create_param = {
        "fields": fields,
        "auto_id": auto_id}
    try:
        self._milvus.create_collection(collection_name, create_param)
        logger.info("Create collection: <%s> successfully" % collection_name)
    except Exception as e:
        logger.error(str(e))
        raise
|
|
|
|
|
|
|
|
|
|
def create_partition(self, tag, collection_name=None):
    """
    Create the partition `tag`.

    collection_name: target collection; a falsy value selects the collection
        this client is bound to.
    """
    target = collection_name or self._collection_name
    self._milvus.create_partition(target, tag)
|
|
|
|
|
|
|
|
|
|
@time_wrapper
def insert(self, entities, collection_name=None, timeout=None):
    """
    Insert `entities` and return the primary keys reported by the server.

    collection_name: target collection; None selects the bound collection
    timeout: forwarded unchanged to the underlying insert call

    Any exception raised by the insert is logged as an error and swallowed;
    the method then returns None.
    """
    if collection_name is None:
        target = self._collection_name
    else:
        target = collection_name
    try:
        return self._milvus.insert(target, entities, timeout=timeout).primary_keys
    except Exception as e:
        logger.error(str(e))
    return None
|
|
|
|
|
|
2021-07-23 15:36:12 +08:00
|
|
|
|
@time_wrapper
def insert_flush(self, entities, _async=False, collection_name=None):
    """
    Insert `entities` and then flush the collection.

    entities: data handed to the underlying insert call
    _async: forwarded to the flush call
    collection_name: target collection; None selects the bound collection

    Returns the primary keys of the inserted entities, or None when the
    insert raised (the error is logged and swallowed, as before). The flush
    is issued in both cases; previously the early `return` inside the `try`
    meant it only ran after a failed insert.
    """
    tmp_collection_name = self._collection_name if collection_name is None else collection_name
    primary_keys = None
    try:
        primary_keys = self._milvus.insert(tmp_collection_name, entities).primary_keys
    except Exception as e:
        logger.error(str(e))
    self._milvus.flush([tmp_collection_name], _async=_async)
    return primary_keys
|
|
|
|
|
|
2021-02-25 17:35:36 +08:00
|
|
|
|
def get_dimension(self):
    """
    Return the `dim` parameter of the first vector field of the collection.

    The fields come from get_info(); a field counts as a vector field when
    its type is FLOAT_VECTOR or BINARY_VECTOR. Returns None when no such
    field is present.
    """
    schema = self.get_info()
    for field in schema["fields"]:
        is_vector = field["type"] in [DataType.FLOAT_VECTOR, DataType.BINARY_VECTOR]
        if is_vector:
            return field["params"]["dim"]
    return None
|
|
|
|
|
|
|
|
|
|
def get_rand_ids(self, length):
    """
    Return up to `length` ids taken from one randomly chosen segment.

    A segment of the first partition listed in the collection stats is
    picked at random and its ids are listed. When the segment holds more
    than `length` ids a random sample of `length` is returned; otherwise
    all of its ids are returned. The pick is repeated for as long as no
    ids are available; listing errors are logged and do not stop the loop.
    """
    picked_ids = []
    while True:
        segments = self.get_stats()["partitions"][0]["segments"]
        # random choice one segment
        chosen = random.choice(segments)
        try:
            picked_ids = self._milvus.list_id_in_segment(self._collection_name, chosen["id"])
        except Exception as e:
            logger.error(str(e))
        available = len(picked_ids)
        if available == 0:
            continue
        if available > length:
            return random.sample(picked_ids, length)
        logger.debug("Reset length: %d" % available)
        return picked_ids
|
|
|
|
|
|
|
|
|
|
# def get_rand_ids_each_segment(self, length):
|
|
|
|
|
# res = []
|
|
|
|
|
# status, stats = self._milvus.get_collection_stats(self._collection_name)
|
|
|
|
|
# self.check_status(status)
|
|
|
|
|
# segments = stats["partitions"][0]["segments"]
|
|
|
|
|
# segments_num = len(segments)
|
|
|
|
|
# # random choice from each segment
|
|
|
|
|
# for segment in segments:
|
|
|
|
|
# status, segment_ids = self._milvus.list_id_in_segment(self._collection_name, segment["name"])
|
|
|
|
|
# self.check_status(status)
|
|
|
|
|
# res.extend(segment_ids[:length])
|
|
|
|
|
# return segments_num, res
|
|
|
|
|
|
|
|
|
|
# def get_rand_entities(self, length):
|
|
|
|
|
# ids = self.get_rand_ids(length)
|
|
|
|
|
# status, get_res = self._milvus.get_entity_by_id(self._collection_name, ids)
|
|
|
|
|
# self.check_status(status)
|
|
|
|
|
# return ids, get_res
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@time_wrapper
def get_entities(self, get_ids):
    """Fetch the entities whose ids are `get_ids` from the bound collection."""
    return self._milvus.get_entity_by_id(self._collection_name, get_ids)
|
|
|
|
|
|
|
|
|
|
@time_wrapper
def delete(self, ids, collection_name=None):
    """
    Delete the entities identified by `ids`.

    collection_name: target collection; None selects the bound collection
    """
    if collection_name is None:
        target = self._collection_name
    else:
        target = collection_name
    self._milvus.delete_entity_by_id(target, ids)
|
|
|
|
|
|
|
|
|
|
def delete_rand(self):
    """
    Delete a random batch of entities and assert that they are gone.

    Between 1 and 100 ids are requested from get_rand_ids(), deleted and
    flushed; each of them is then fetched again and must come back falsy,
    otherwise an AssertionError is raised.
    """
    wanted = random.randint(1, 100)
    # Row count before the deletion; only consumed by the disabled check below.
    count_before = self.count()
    logger.debug("%s: length to delete: %d" % (self._collection_name, wanted))
    victims = self.get_rand_ids(wanted)
    self.delete(victims)
    self.flush()
    logger.info("%s: count after delete: %d" % (self._collection_name, self.count()))
    leftovers = self._milvus.get_entity_by_id(self._collection_name, victims)
    for entity in leftovers:
        assert not entity
    # if count_before - len(delete_ids) < self.count():
    #     logger.error(delete_ids)
    #     raise Exception("Error occured")
|
|
|
|
|
|
|
|
|
|
@time_wrapper
def flush(self, _async=False, collection_name=None, timeout=None):
    """
    Flush one collection.

    _async, timeout: forwarded unchanged to the underlying flush call
    collection_name: target collection; None selects the bound collection
    """
    if collection_name is None:
        target = self._collection_name
    else:
        target = collection_name
    self._milvus.flush([target], _async=_async, timeout=timeout)
|
2021-02-25 17:35:36 +08:00
|
|
|
|
|
|
|
|
|
@time_wrapper
def compact(self, collection_name=None):
    """
    Compact one collection and return whatever the client's compact() returns.

    collection_name: target collection; None selects the bound collection
    """
    tmp_collection_name = self._collection_name if collection_name is None else collection_name
    # The former `self.check_status(status)` call is gone: check_status only
    # exists as commented-out code in this file, so the call raised
    # AttributeError. The status is handed back to the caller instead.
    return self._milvus.compact(tmp_collection_name)
|
|
|
|
|
|
2021-07-02 11:40:16 +08:00
|
|
|
|
# only support "in" in expr
@time_wrapper
def get(self, ids, collection_name=None, timeout=None):
    """
    Query entities by id using an ``id in <ids>`` expression.

    ids: value rendered with str() into the expression
    collection_name: target collection; None selects the bound collection
    timeout: forwarded unchanged to the underlying query call

    Returns the result of the underlying query call.
    """
    if collection_name is None:
        target = self._collection_name
    else:
        target = collection_name
    # res = self._milvus.get(tmp_collection_name, ids, output_fields=None, partition_names=None)
    ids_expr = "id in %s" % (str(ids))
    return self._milvus.query(target, ids_expr, output_fields=None, partition_names=None, timeout=timeout)
|
|
|
|
|
|
2021-02-25 17:35:36 +08:00
|
|
|
|
@time_wrapper
def create_index(self, field_name, index_type, metric_type, _async=False, index_param=None,
                 collection_name=None):
    """
    Build an index on `field_name`.

    field_name: field the index is created on
    index_type: a key of INDEX_MAP (the name used in YAML files); an
        unknown key raises KeyError
    metric_type: translated with utils.metric_type_trans before use
    _async: forwarded to the underlying create_index call
    index_param: value stored under "params" in the index parameters
    collection_name: target collection; None selects the bound collection.
        Added because scene_test already passes this keyword, which the
        previous signature rejected with a TypeError.
    """
    tmp_collection_name = self._collection_name if collection_name is None else collection_name
    index_type = INDEX_MAP[index_type]
    metric_type = utils.metric_type_trans(metric_type)
    logger.info("Building index start, collection_name: %s, index_type: %s, metric_type: %s" % (
        tmp_collection_name, index_type, metric_type))
    if index_param:
        logger.info(index_param)
    index_params = {
        "index_type": index_type,
        "metric_type": metric_type,
        "params": index_param
    }
    self._milvus.create_index(tmp_collection_name, field_name, index_params, _async=_async)
|
|
|
|
|
|
|
|
|
|
# TODO: need to check
def describe_index(self, field_name, collection_name=None):
    """
    Describe the index on `field_name` using the YAML-style index names.

    collection_name: target collection; None selects the bound collection

    Returns a dict with the keys "index_type", "metric_type" and
    "index_param". When the server reports nothing, the result is
    {"index_type": "flat", "metric_type": None, "index_param": None}.
    Otherwise the reported index type is mapped back to its INDEX_MAP key
    when it matches one of the INDEX_MAP values.
    """
    # stats = self.get_stats()
    if collection_name is None:
        target = self._collection_name
    else:
        target = collection_name
    info = self._milvus.describe_index(target, field_name)
    logger.info(info)
    if not info:
        return {"index_type": "flat", "metric_type": None, "index_param": None}
    reported_type = info["index_type"]
    description = {
        "index_type": reported_type,
        "metric_type": info["metric_type"],
        "index_param": info["params"],
    }
    # transfer index type name
    description["index_type"] = next(
        (yaml_name for yaml_name, code_name in INDEX_MAP.items() if reported_type == code_name),
        reported_type)
    return description
|
|
|
|
|
|
|
|
|
|
def drop_index(self, field_name):
    """
    Drop the index on `field_name` of the bound collection.

    Returns the result of the underlying drop_index call.
    """
    bound_name = self._collection_name
    logger.info("Drop index: %s" % bound_name)
    return self._milvus.drop_index(bound_name, field_name)
|
|
|
|
|
|
|
|
|
|
@time_wrapper
def query(self, vector_query, filter_query=None, collection_name=None, timeout=300):
    """
    Run a search; this method corresponds to the search method of milvus.

    vector_query: first clause of the boolean "must" list
    filter_query: optional iterable of further clauses appended to it
    collection_name: target collection; None selects the bound collection
    timeout: forwarded unchanged to the underlying search call

    Returns the result of the underlying search call.
    """
    if collection_name is None:
        target = self._collection_name
    else:
        target = collection_name
    must_clauses = [vector_query, *(filter_query or ())]
    dsl = {"bool": {"must": must_clauses}}
    return self._milvus.search(target, dsl, timeout=timeout)
|
|
|
|
|
|
|
|
|
|
@time_wrapper
def warm_query(self, index_field_name, search_param, metric_type, times=2):
    """
    Issue `times` throw-away searches with random vectors on the bound collection.

    index_field_name: vector field that is searched
    search_param: stored under "params" in the vector query
    metric_type: stored as-is under "metric_type" in the vector query
    times: number of searches to run

    DEFAULT_WARM_QUERY_NQ random vectors are generated and
    DEFAULT_WARM_QUERY_TOPK is requested as topk. The search results are
    discarded.
    """
    # `_dimension` is only assigned by create_collection(); for a client
    # attached to an existing collection, read the dimension from the schema.
    dimension = getattr(self, "_dimension", None)
    if dimension is None:
        dimension = self.get_dimension()
    query_vectors = [[random.random() for _ in range(dimension)] for _ in range(DEFAULT_WARM_QUERY_NQ)]
    # index_info = self.describe_index(index_field_name)
    vector_query = {"vector": {index_field_name: {
        "topk": DEFAULT_WARM_QUERY_TOPK,
        "query": query_vectors,
        "metric_type": metric_type,
        "params": search_param}
    }}
    query = {
        "bool": {"must": [vector_query]}
    }
    logger.debug("Start warm up query")
    for _ in range(times):
        self._milvus.search(self._collection_name, query)
    logger.debug("End warm up query")
|
|
|
|
|
|
|
|
|
|
@time_wrapper
def load_and_query(self, vector_query, filter_query=None, collection_name=None, timeout=120):
    """
    Load the collection and then run a search on it.

    vector_query: first clause of the boolean "must" list
    filter_query: optional iterable of further clauses appended to it
    collection_name: target collection; None selects the bound collection
    timeout: forwarded to the search call only

    Returns the result of the underlying search call.
    """
    if collection_name is None:
        target = self._collection_name
    else:
        target = collection_name
    must_clauses = [vector_query, *(filter_query or ())]
    dsl = {"bool": {"must": must_clauses}}
    self.load_collection(target)
    return self._milvus.search(target, dsl, timeout=timeout)
|
|
|
|
|
|
|
|
|
|
def get_ids(self, result):
    """
    Collect the `ids` attribute of every entry of `result`.

    Returns a list with one element per entry, in the same order.
    """
    return [hits.ids for hits in result]
|
|
|
|
|
|
2021-08-16 11:06:10 +08:00
|
|
|
|
def query_rand(self, nq_max=100, timeout=None, dimension=128):
    """
    Run one search with randomly drawn parameters (for ivf search).

    nq_max: upper bound of the random number of query vectors (1..nq_max)
    timeout: forwarded to query()
    dimension: length of each random query vector. It defaults to 128, the
        value that used to be hard-coded, so existing callers are unaffected.

    topk and nprobe are each drawn from 1..100 and the metric is picked from
    "l2" / "ip". The search result is discarded.
    """
    # for ivf search
    top_k = random.randint(1, 100)
    nq = random.randint(1, nq_max)
    nprobe = random.randint(1, 100)
    search_param = {"nprobe": nprobe}
    query_vectors = [[random.random() for _ in range(dimension)] for _ in range(nq)]
    metric_type = random.choice(["l2", "ip"])
    logger.info("%s, Search nq: %d, top_k: %d, nprobe: %d" % (self._collection_name, nq, top_k, nprobe))
    vec_field_name = utils.get_default_field_name()
    vector_query = {"vector": {vec_field_name: {
        "topk": top_k,
        "query": query_vectors,
        "metric_type": utils.metric_type_trans(metric_type),
        "params": search_param}
    }}
    self.query(vector_query, timeout=timeout)
|
2021-02-25 17:35:36 +08:00
|
|
|
|
|
2021-08-16 11:06:10 +08:00
|
|
|
|
def load_query_rand(self, nq_max=100, timeout=None, dimension=128):
    """
    Load the collection and run one search with random parameters (for ivf search).

    nq_max: upper bound of the random number of query vectors (1..nq_max)
    timeout: forwarded to load_and_query()
    dimension: length of each random query vector. It defaults to 128, the
        value that used to be hard-coded, so existing callers are unaffected.

    topk and nprobe are each drawn from 1..100 and the metric is picked from
    "l2" / "ip". The search result is discarded.
    """
    # for ivf search
    top_k = random.randint(1, 100)
    nq = random.randint(1, nq_max)
    nprobe = random.randint(1, 100)
    search_param = {"nprobe": nprobe}
    query_vectors = [[random.random() for _ in range(dimension)] for _ in range(nq)]
    metric_type = random.choice(["l2", "ip"])
    logger.info("%s, Search nq: %d, top_k: %d, nprobe: %d" % (self._collection_name, nq, top_k, nprobe))
    vec_field_name = utils.get_default_field_name()
    vector_query = {"vector": {vec_field_name: {
        "topk": top_k,
        "query": query_vectors,
        "metric_type": utils.metric_type_trans(metric_type),
        "params": search_param}
    }}
    self.load_and_query(vector_query, timeout=timeout)
|
2021-02-25 17:35:36 +08:00
|
|
|
|
|
|
|
|
|
# TODO: need to check
def count(self, collection_name=None):
    """
    Return the "row_count" entry of the collection statistics.

    collection_name: target collection; None selects the bound collection
    """
    target = self._collection_name if collection_name is None else collection_name
    stats = self._milvus.get_collection_stats(target)
    row_count = stats["row_count"]
    logger.debug("Row count: %d in collection: <%s>" % (row_count, target))
    return row_count
|
|
|
|
|
|
|
|
|
|
def drop(self, timeout=120, collection_name=None):
    """
    Drop a collection and wait until the server no longer reports it.

    drop steps:
    1. drop the collection
    2. poll has_collection() once per second while it still exists
    3. give up after `timeout` polls and log an error

    timeout: maximum number of one-second polls; converted with int()
    collection_name: target collection; None selects the bound collection

    An exception raised while polling is logged as a warning and ends the
    wait.
    """
    timeout = int(timeout)
    target = self._collection_name if collection_name is None else collection_name
    logger.info("Start delete collection: %s" % target)
    self._milvus.drop_collection(target)
    waited = 0
    while waited < timeout:
        try:
            if not self._milvus.has_collection(target):
                break
            time.sleep(1)
            waited += 1
        except Exception as e:
            logger.warning("Collection count failed: {}".format(str(e)))
            break
    if waited >= timeout:
        logger.error("Delete collection timeout")
|
|
|
|
|
|
|
|
|
|
def get_stats(self):
    """Return the statistics the server reports for the bound collection."""
    bound_name = self._collection_name
    return self._milvus.get_collection_stats(bound_name)
|
|
|
|
|
|
|
|
|
|
def get_info(self, collection_name=None):
    """
    Return the description the server gives for a collection.

    collection_name: target collection; None selects the bound collection
    """
    target = self._collection_name if collection_name is None else collection_name
    return self._milvus.describe_collection(target)
|
2021-02-25 17:35:36 +08:00
|
|
|
|
|
|
|
|
|
def show_collections(self):
    """Return the result of list_collections() on the underlying client."""
    listing = self._milvus.list_collections()
    return listing
|
|
|
|
|
|
|
|
|
|
def exists_collection(self, collection_name=None):
    """
    Return the result of has_collection() for a collection.

    collection_name: collection to look up; None selects the bound collection
    """
    target = self._collection_name if collection_name is None else collection_name
    return self._milvus.has_collection(target)
|
|
|
|
|
|
|
|
|
|
def clean_db(self):
    """Drop every collection returned by show_collections(), one after another."""
    for existing in self.show_collections():
        self.drop(collection_name=existing)
|
|
|
|
|
|
|
|
|
|
@time_wrapper
def load_collection(self, collection_name=None, timeout=3000):
    """
    Load a collection through the underlying client.

    collection_name: target collection; None selects the bound collection
    timeout: forwarded unchanged to the underlying call

    Returns the result of the underlying load_collection call.
    """
    target = self._collection_name if collection_name is None else collection_name
    return self._milvus.load_collection(target, timeout=timeout)
|
2021-02-25 17:35:36 +08:00
|
|
|
|
|
|
|
|
|
@time_wrapper
def release_collection(self, collection_name=None, timeout=3000):
    """
    Release a collection through the underlying client.

    collection_name: target collection; None selects the bound collection
    timeout: forwarded unchanged to the underlying call

    Returns the result of the underlying release_collection call.
    """
    target = self._collection_name if collection_name is None else collection_name
    return self._milvus.release_collection(target, timeout=timeout)
|
2021-02-25 17:35:36 +08:00
|
|
|
|
|
|
|
|
|
@time_wrapper
def load_partitions(self, tag_names, collection_name=None, timeout=3000):
    """
    Load the partitions `tag_names` through the underlying client.

    collection_name: target collection; None selects the bound collection
    timeout: forwarded unchanged to the underlying call

    Returns the result of the underlying load_partitions call.
    """
    target = self._collection_name if collection_name is None else collection_name
    return self._milvus.load_partitions(target, tag_names, timeout=timeout)
|
2021-02-25 17:35:36 +08:00
|
|
|
|
|
|
|
|
|
@time_wrapper
def release_partitions(self, tag_names, collection_name=None, timeout=3000):
    """
    Release the partitions `tag_names` through the underlying client.

    collection_name: target collection; None selects the bound collection
    timeout: forwarded unchanged to the underlying call

    Returns the result of the underlying release_partitions call.
    """
    target = self._collection_name if collection_name is None else collection_name
    return self._milvus.release_partitions(target, tag_names, timeout=timeout)
|
2021-02-25 17:35:36 +08:00
|
|
|
|
|
2021-09-17 18:49:53 +08:00
|
|
|
|
@time_wrapper
def scene_test(self, collection_name=None, vectors=None, ids=None):
    """
    Run one create / insert / flush / index / drop cycle on a collection.

    Scene test steps:
    1. create a 128-dimensional collection named `collection_name`
    2. insert entities generated from `vectors` and `ids`
    3. flush the data
    4. create an ivf_sq8 / l2 index (nlist 2048) on 'float_vector'
    5. drop the collection

    NOTE(review): step 4 relies on create_index() accepting a
    `collection_name` keyword - confirm against its signature.
    """
    logger.debug("[scene_test] Start scene test : %s" % collection_name)
    self.create_collection(dimension=128, collection_name=collection_name)
    time.sleep(1)

    schema = self.get_info(collection_name)
    generated = utils.generate_entities(schema, vectors, ids)

    logger.debug("[scene_test] Start insert : %s" % collection_name)
    self.insert(generated, collection_name=collection_name)

    logger.debug("[scene_test] Start flush : %s" % collection_name)
    self.flush(collection_name=collection_name)

    logger.debug("[scene_test] Start create index : %s" % collection_name)
    self.create_index(field_name='float_vector', index_type="ivf_sq8", metric_type='l2',
                      collection_name=collection_name, index_param={'nlist': 2048})
    # time.sleep(59)

    logger.debug("[scene_test] Start drop : %s" % collection_name)
    self.drop(collection_name=collection_name)
    logger.debug("[scene_test]Scene test close : %s" % collection_name)
    # time.sleep(1)
|
|
|
|
|
|
2021-02-25 17:35:36 +08:00
|
|
|
|
# TODO: remove
|
|
|
|
|
# def get_server_version(self):
|
|
|
|
|
# return self._milvus.server_version()
|
|
|
|
|
|
|
|
|
|
# def get_server_mode(self):
|
|
|
|
|
# return self.cmd("mode")
|
|
|
|
|
|
|
|
|
|
# def get_server_commit(self):
|
|
|
|
|
# return self.cmd("build_commit_id")
|
|
|
|
|
|
|
|
|
|
# def get_server_config(self):
|
|
|
|
|
# return json.loads(self.cmd("get_milvus_config"))
|
|
|
|
|
|
|
|
|
|
# def get_mem_info(self):
|
|
|
|
|
# result = json.loads(self.cmd("get_system_info"))
|
|
|
|
|
# result_human = {
|
|
|
|
|
# # unit: Gb
|
|
|
|
|
# "memory_used": round(int(result["memory_used"]) / (1024 * 1024 * 1024), 2)
|
|
|
|
|
# }
|
|
|
|
|
# return result_human
|
|
|
|
|
|
|
|
|
|
# def cmd(self, command):
|
|
|
|
|
# res = self._milvus._cmd(command)
|
|
|
|
|
# logger.info("Server command: %s, result: %s" % (command, res))
|
|
|
|
|
# return res
|
|
|
|
|
|
|
|
|
|
# @time_wrapper
|
|
|
|
|
# def set_config(self, parent_key, child_key, value):
|
|
|
|
|
# self._milvus.set_config(parent_key, child_key, value)
|
|
|
|
|
|
|
|
|
|
# def get_config(self, key):
|
|
|
|
|
# return self._milvus.get_config(key)
|