# Source: milvus/tests/python_client/testcases/test_high_level_api.py
# (Python, 312 lines, 15 KiB)

import multiprocessing
import numbers
import random
import numpy
import threading
import pytest
import pandas as pd
import decimal
from decimal import Decimal, getcontext
from time import sleep
import heapq
from base.client_base import TestcaseBase
from utils.util_log import test_log as log
from common import common_func as cf
from common import common_type as ct
from common.common_type import CaseLabel, CheckTasks
from utils.util_pymilvus import *
from common.constants import *
from pymilvus.orm.types import CONSISTENCY_STRONG, CONSISTENCY_BOUNDED, CONSISTENCY_SESSION, CONSISTENCY_EVENTUALLY
from base.high_level_api_wrapper import HighLevelApiWrapper
# Module-level wrapper instance; every test below issues its client calls
# (create_collection, insert, search, query, delete, ...) through it.
client_w = HighLevelApiWrapper()
# Prefix handed to cf.gen_unique_str() when generating collection names.
prefix = "high_level_api"
# Short aliases for defaults defined in common.common_type (imported as ct).
epsilon = ct.epsilon
default_nb = ct.default_nb
default_nb_medium = ct.default_nb_medium
default_nq = ct.default_nq
default_dim = ct.default_dim
default_limit = ct.default_limit
# Filter expression used by the query steps; matches on the "id" primary key.
default_search_exp = "id >= 0"
# Key under which the expected rows are passed in check_items for query checks.
exp_res = "exp_res"
# Additional filter expressions; not referenced by the tests visible in this file.
default_search_string_exp = "varchar >= \"0\""
default_search_mix_exp = "int64 >= 0 && varchar >= \"0\""
# NOTE(review): the name misspells "invalid"; left unchanged in case it is referenced elsewhere.
default_invaild_string_exp = "varchar >= 0"
default_json_search_exp = "json_field[\"number\"] >= 0"
# NOTE(review): the name misspells "prefix"; left unchanged in case it is referenced elsewhere.
perfix_expr = 'varchar like "0%"'
default_search_field = ct.default_float_vec_field_name
default_search_params = ct.default_search_params
# Field names used as keys when building the rows inserted by the tests.
default_primary_key_field_name = "id"
default_vector_field_name = "vector"
default_float_field_name = ct.default_float_field_name
default_bool_field_name = ct.default_bool_field_name
default_string_field_name = ct.default_string_field_name
class TestHighLevelApi(TestcaseBase):
    """Test cases for the high level client API.

    Every test connects with ``enable_high_level_api=True`` and drives the
    client through the module-level ``client_w`` wrapper. The first group
    covers invalid parameters (an error result is expected), the second
    group covers the regular create / insert / search / query / delete flow.
    """

    @pytest.fixture(scope="function", params=[False, True])
    def auto_id(self, request):
        """Parametrize a test over auto_id disabled (False) and enabled (True)."""
        yield request.param

    @pytest.fixture(scope="function", params=["COSINE", "L2"])
    def metric_type(self, request):
        """Parametrize a test over the "COSINE" and "L2" metric types."""
        yield request.param

    """
    ******************************************************************
    #  The following are invalid base cases
    ******************************************************************
    """

    @pytest.mark.tags(CaseLabel.L2)
    @pytest.mark.xfail(reason="pymilvus issue 1554")
    def test_high_level_collection_invalid_primary_field(self):
        """
        target: test high level api: client.create_collection
        method: create collection with invalid primary field
        expected: Raise exception
        """
        client = self._connect(enable_high_level_api=True)
        collection_name = cf.gen_unique_str(prefix)
        # 1. create collection
        error = {ct.err_code: 1, ct.err_msg: "Param id_type must be int or string"}
        client_w.create_collection(client, collection_name, default_dim, id_type="invalid",
                                   check_task=CheckTasks.err_res, check_items=error)

    @pytest.mark.tags(CaseLabel.L2)
    def test_high_level_collection_string_auto_id(self):
        """
        target: test high level api: client.create_collection
        method: create collection with auto id on string primary key
        expected: Raise exception
        """
        client = self._connect(enable_high_level_api=True)
        collection_name = cf.gen_unique_str(prefix)
        # 1. create collection
        error = {ct.err_code: 1,
                 ct.err_msg: "The auto_id can only be specified on field with DataType.INT64"}
        client_w.create_collection(client, collection_name, default_dim, id_type="string", auto_id=True,
                                   check_task=CheckTasks.err_res, check_items=error)

    @pytest.mark.tags(CaseLabel.L1)
    def test_high_level_create_same_collection_different_params(self):
        """
        target: test high level api: client.create_collection
        method: create a collection, create it again with the same params,
                then create it once more with the same name but a different dim
        expected: 1. Successfully to create collection with same params
                  2. Report errors for creating collection with same name and different params
        """
        client = self._connect(enable_high_level_api=True)
        collection_name = cf.gen_unique_str(prefix)
        # 1. create collection
        client_w.create_collection(client, collection_name, default_dim)
        # 2. create collection with same params
        client_w.create_collection(client, collection_name, default_dim)
        # 3. create collection with same name and different params
        error = {ct.err_code: 1, ct.err_msg: "create duplicate collection with different parameters, "
                                             f"collection: {collection_name}"}
        client_w.create_collection(client, collection_name, default_dim + 1,
                                   check_task=CheckTasks.err_res, check_items=error)
        client_w.drop_collection(client, collection_name)

    @pytest.mark.tags(CaseLabel.L2)
    def test_high_level_collection_invalid_metric_type(self):
        """
        target: test high level api: client.create_collection
        method: create collection with an unsupported metric type
        expected: Raise exception
        """
        client = self._connect(enable_high_level_api=True)
        collection_name = cf.gen_unique_str(prefix)
        # 1. create collection
        error = {ct.err_code: 1,
                 ct.err_msg: "metric type not found or not supported, supported: [L2 IP COSINE]"}
        client_w.create_collection(client, collection_name, default_dim, metric_type="invalid",
                                   check_task=CheckTasks.err_res, check_items=error)

    @pytest.mark.tags(CaseLabel.L2)
    def test_high_level_search_not_consistent_metric_type(self, metric_type):
        """
        target: test search with inconsistent metric type (default is IP) with that of index
        method: create connection, collection, insert and search with not consistent metric type
        expected: Raise exception
        """
        client = self._connect(enable_high_level_api=True)
        collection_name = cf.gen_unique_str(prefix)
        # 1. create collection
        client_w.create_collection(client, collection_name, default_dim)
        # 2. search
        rng = np.random.default_rng(seed=19530)
        # Query vector uses the collection's dimension so that the metric type
        # is the only thing that does not match the collection.
        vectors_to_search = rng.random((1, default_dim))
        search_params = {"metric_type": metric_type}
        error = {ct.err_code: 1, ct.err_msg: f"metric type not match: expected=IP, actual={metric_type}"}
        client_w.search(client, collection_name, vectors_to_search, limit=default_limit,
                        search_params=search_params,
                        check_task=CheckTasks.err_res, check_items=error)
        client_w.drop_collection(client, collection_name)

    """
    ******************************************************************
    #  The following are valid base cases
    ******************************************************************
    """

    @pytest.mark.tags(CaseLabel.L1)
    def test_high_level_search_query_default(self):
        """
        target: test search (high level api) normal case
        method: create connection, collection, insert and search
        expected: search/query successfully
        """
        client = self._connect(enable_high_level_api=True)
        collection_name = cf.gen_unique_str(prefix)
        # 1. create collection
        client_w.create_collection(client, collection_name, default_dim)
        collections = client_w.list_collections(client)[0]
        assert collection_name in collections
        client_w.describe_collection(client, collection_name,
                                     check_task=CheckTasks.check_describe_collection_property,
                                     check_items={"collection_name": collection_name,
                                                  "dim": default_dim})
        # 2. insert
        rng = np.random.default_rng(seed=19530)
        rows = [{default_primary_key_field_name: i,
                 default_vector_field_name: list(rng.random((1, default_dim))[0]),
                 default_float_field_name: i * 1.0,
                 default_string_field_name: str(i)} for i in range(default_nb)]
        client_w.insert(client, collection_name, rows)
        client_w.flush(client, collection_name)
        assert client_w.num_entities(client, collection_name)[0] == default_nb
        # 3. search
        vectors_to_search = rng.random((1, default_dim))
        insert_ids = list(range(default_nb))
        client_w.search(client, collection_name, vectors_to_search,
                        check_task=CheckTasks.check_search_results,
                        check_items={"enable_high_level_api": True,
                                     "nq": len(vectors_to_search),
                                     "ids": insert_ids,
                                     "limit": default_limit})
        # 4. query
        client_w.query(client, collection_name, filter=default_search_exp,
                       check_task=CheckTasks.check_query_results,
                       check_items={exp_res: rows,
                                    "with_vec": True,
                                    "primary_field": default_primary_key_field_name})
        client_w.drop_collection(client, collection_name)

    @pytest.mark.tags(CaseLabel.L2)
    @pytest.mark.skip(reason="issue 25110")
    def test_high_level_search_query_string(self):
        """
        target: test search (high level api) for string primary key
        method: create connection, collection, insert and search
        expected: search/query successfully
        """
        client = self._connect(enable_high_level_api=True)
        collection_name = cf.gen_unique_str(prefix)
        # 1. create collection
        client_w.create_collection(client, collection_name, default_dim, id_type="string",
                                   max_length=ct.default_length)
        # This test requests no auto_id fixture, so auto_id is not part of the
        # property check (referencing it here would raise NameError).
        client_w.describe_collection(client, collection_name,
                                     check_task=CheckTasks.check_describe_collection_property,
                                     check_items={"collection_name": collection_name,
                                                  "dim": default_dim})
        # 2. insert
        rng = np.random.default_rng(seed=19530)
        rows = [{default_primary_key_field_name: str(i),
                 default_vector_field_name: list(rng.random((1, default_dim))[0]),
                 default_float_field_name: i * 1.0,
                 default_string_field_name: str(i)} for i in range(default_nb)]
        client_w.insert(client, collection_name, rows)
        client_w.flush(client, collection_name)
        assert client_w.num_entities(client, collection_name)[0] == default_nb
        # 3. search
        vectors_to_search = rng.random((1, default_dim))
        client_w.search(client, collection_name, vectors_to_search,
                        check_task=CheckTasks.check_search_results,
                        check_items={"enable_high_level_api": True,
                                     "nq": len(vectors_to_search),
                                     "limit": default_limit})
        # 4. query
        # NOTE(review): the filter compares "id" with an integer although the
        # primary key is a string here - confirm once the skip is lifted.
        client_w.query(client, collection_name, filter=default_search_exp,
                       check_task=CheckTasks.check_query_results,
                       check_items={exp_res: rows,
                                    "with_vec": True,
                                    "primary_field": default_primary_key_field_name})
        client_w.drop_collection(client, collection_name)

    @pytest.mark.tags(CaseLabel.L2)
    def test_high_level_search_different_metric_types(self, metric_type, auto_id):
        """
        target: test search (high level api) normal case
        method: create connection, collection, insert and search
        expected: search successfully with limit(topK)
        """
        client = self._connect(enable_high_level_api=True)
        collection_name = cf.gen_unique_str(prefix)
        # 1. create collection
        client_w.create_collection(client, collection_name, default_dim,
                                   metric_type=metric_type, auto_id=auto_id)
        # 2. insert
        rng = np.random.default_rng(seed=19530)
        rows = [{default_primary_key_field_name: i,
                 default_vector_field_name: list(rng.random((1, default_dim))[0]),
                 default_float_field_name: i * 1.0,
                 default_string_field_name: str(i)} for i in range(default_nb)]
        if auto_id:
            # With auto_id enabled the rows are inserted without a primary key.
            for row in rows:
                row.pop(default_primary_key_field_name)
        client_w.insert(client, collection_name, rows)
        client_w.flush(client, collection_name)
        assert client_w.num_entities(client, collection_name)[0] == default_nb
        # 3. search
        vectors_to_search = rng.random((1, default_dim))
        search_params = {"metric_type": metric_type}
        client_w.search(client, collection_name, vectors_to_search, limit=default_limit,
                        search_params=search_params,
                        output_fields=[default_primary_key_field_name],
                        check_task=CheckTasks.check_search_results,
                        check_items={"enable_high_level_api": True,
                                     "nq": len(vectors_to_search),
                                     "limit": default_limit})
        client_w.drop_collection(client, collection_name)

    @pytest.mark.tags(CaseLabel.L1)
    def test_high_level_delete(self):
        """
        target: test delete (high level api)
        method: create connection, collection, insert delete, and search
        expected: search/query successfully without deleted data
        """
        client = self._connect(enable_high_level_api=True)
        collection_name = cf.gen_unique_str(prefix)
        # 1. create collection
        client_w.create_collection(client, collection_name, default_dim, consistency_level="Strong")
        # 2. insert
        # Row count local to this test; it is also used as the search limit below.
        nb = 1000
        rng = np.random.default_rng(seed=19530)
        rows = [{default_primary_key_field_name: i,
                 default_vector_field_name: list(rng.random((1, default_dim))[0]),
                 default_float_field_name: i * 1.0,
                 default_string_field_name: str(i)} for i in range(nb)]
        pks = client_w.insert(client, collection_name, rows)[0]
        client_w.flush(client, collection_name)
        assert client_w.num_entities(client, collection_name)[0] == nb
        # 3. get first primary key
        client_w.get(client, collection_name, pks[0:1])
        # 4. delete
        delete_num = 3
        client_w.delete(client, collection_name, pks[0:delete_num])
        # 5. search
        vectors_to_search = rng.random((1, default_dim))
        # Expected ids are all inserted ids except the ones just deleted.
        deleted_pks = set(pks[0:delete_num])
        insert_ids = [i for i in range(nb) if i not in deleted_pks]
        limit = nb - delete_num
        client_w.search(client, collection_name, vectors_to_search, limit=nb,
                        check_task=CheckTasks.check_search_results,
                        check_items={"enable_high_level_api": True,
                                     "nq": len(vectors_to_search),
                                     "ids": insert_ids,
                                     "limit": limit})
        # 6. query
        client_w.query(client, collection_name, filter=default_search_exp,
                       check_task=CheckTasks.check_query_results,
                       check_items={exp_res: rows[delete_num:],
                                    "with_vec": True,
                                    "primary_field": default_primary_key_field_name})
        client_w.drop_collection(client, collection_name)