From f2c28776093957e04f87851fe14c8d96718af626 Mon Sep 17 00:00:00 2001 From: zhuwenxing Date: Thu, 21 Mar 2024 15:31:09 +0800 Subject: [PATCH] test: update restful v2 testcases (#31404) * update restful v2 test cases * add case to ci --------- Signed-off-by: zhuwenxing --- tests/restful_client_v2/api/milvus.py | 140 +- tests/restful_client_v2/base/testbase.py | 16 +- tests/restful_client_v2/conftest.py | 18 + tests/restful_client_v2/pytest.ini | 1 + .../testcases/test_collection_operations.py | 115 +- .../testcases/test_index_operation.py | 2 +- .../testcases/test_jobs_operation.py | 1565 ++++++++++++++++- .../test_restful_sdk_mix_use_scenario.py | 24 +- .../testcases/test_role_operation.py | 7 +- .../testcases/test_user_operation.py | 28 +- .../testcases/test_vector_operations.py | 1073 ++++++++++- tests/restful_client_v2/utils/utils.py | 3 + tests/scripts/ci_e2e_4am.sh | 14 +- 13 files changed, 2922 insertions(+), 84 deletions(-) diff --git a/tests/restful_client_v2/api/milvus.py b/tests/restful_client_v2/api/milvus.py index ac553149c3..fdeccbe884 100644 --- a/tests/restful_client_v2/api/milvus.py +++ b/tests/restful_client_v2/api/milvus.py @@ -5,7 +5,7 @@ import uuid from utils.util_log import test_log as logger from minio import Minio from minio.error import S3Error - +from minio.commonconfig import REPLACE, CopySource def logger_request_response(response, url, tt, headers, data, str_data, str_response, method): if len(data) > 2000: @@ -136,7 +136,59 @@ class VectorClient(Requests): return response.json() - def vector_query(self, payload, db_name="default", timeout=10): + def vector_advanced_search(self, payload, db_name="default", timeout=10): + time.sleep(1) + url = f'{self.endpoint}/v2/vectordb/entities/advanced_search' + if self.db_name is not None: + payload["dbName"] = self.db_name + if db_name != "default": + payload["dbName"] = db_name + response = self.post(url, headers=self.update_headers(), data=payload) + rsp = response.json() + if "data" in rsp and len(rsp["data"]) == 0: + t0 = time.time() + while time.time() - t0 < timeout: + response = self.post(url, headers=self.update_headers(), data=payload) + rsp = response.json() + if len(rsp["data"]) > 0: + break + time.sleep(1) + else: + response = self.post(url, headers=self.update_headers(), data=payload) + rsp = response.json() + if "data" in rsp and len(rsp["data"]) == 0: + logger.info(f"after {timeout}s, still no data") + + return response.json() + + def vector_hybrid_search(self, payload, db_name="default", timeout=10): + time.sleep(1) + url = f'{self.endpoint}/v2/vectordb/entities/hybrid_search' + if self.db_name is not None: + payload["dbName"] = self.db_name + if db_name != "default": + payload["dbName"] = db_name + response = self.post(url, headers=self.update_headers(), data=payload) + rsp = response.json() + if "data" in rsp and len(rsp["data"]) == 0: + t0 = time.time() + while time.time() - t0 < timeout: + response = self.post(url, headers=self.update_headers(), data=payload) + rsp = response.json() + if len(rsp["data"]) > 0: + break + time.sleep(1) + else: + response = self.post(url, headers=self.update_headers(), data=payload) + rsp = response.json() + if "data" in rsp and len(rsp["data"]) == 0: + logger.info(f"after {timeout}s, still no data") + + return response.json() + + + + def vector_query(self, payload, db_name="default", timeout=5): time.sleep(1) url = f'{self.endpoint}/v2/vectordb/entities/query' if self.db_name is not None: @@ -527,6 +579,7 @@ class RoleClient(Requests): self.api_key = token self.db_name = 
None self.headers = self.update_headers() + self.role_names = [] def update_headers(self): headers = { @@ -546,6 +599,8 @@ class RoleClient(Requests): url = f'{self.endpoint}/v2/vectordb/roles/create' response = self.post(url, headers=self.update_headers(), data=payload) res = response.json() + if res["code"] == 200: + self.role_names.append(payload["roleName"]) return res def role_describe(self, role_name): @@ -706,36 +761,63 @@ class ImportJobClient(Requests): return headers def list_import_jobs(self, payload, db_name="default"): + if self.db_name is not None: + db_name = self.db_name payload["dbName"] = db_name - data = payload + if db_name is None: + payload.pop("dbName") url = f'{self.endpoint}/v2/vectordb/jobs/import/list' - response = self.post(url, headers=self.update_headers(), data=data) - res = response.json() - return res - - def create_import_jobs(self, payload): - url = f'{self.endpoint}/v2/vectordb/jobs/import/create' response = self.post(url, headers=self.update_headers(), data=payload) res = response.json() return res - def get_import_job_progress(self, task_id): + def create_import_jobs(self, payload, db_name="default"): + if self.db_name is not None: + db_name = self.db_name + url = f'{self.endpoint}/v2/vectordb/jobs/import/create' + payload["dbName"] = db_name + response = self.post(url, headers=self.update_headers(), data=payload) + res = response.json() + return res + + def get_import_job_progress(self, job_id, db_name="default"): + if self.db_name is not None: + db_name = self.db_name payload = { - "taskID": task_id + "dbName": db_name, + "jobID": job_id } + if db_name is None: + payload.pop("dbName") + if job_id is None: + payload.pop("jobID") url = f'{self.endpoint}/v2/vectordb/jobs/import/get_progress' response = self.post(url, headers=self.update_headers(), data=payload) res = response.json() return res + def wait_import_job_completed(self, job_id): + finished = False + t0 = time.time() + rsp = self.get_import_job_progress(job_id) + while not finished: + rsp = self.get_import_job_progress(job_id) + if rsp['data']['state'] == "Completed": + finished = True + time.sleep(5) + if time.time() - t0 > 120: + break + return rsp, finished + class StorageClient(): - def __init__(self, endpoint, access_key, secret_key, bucket_name): + def __init__(self, endpoint, access_key, secret_key, bucket_name, root_path="file"): self.endpoint = endpoint self.access_key = access_key self.secret_key = secret_key self.bucket_name = bucket_name + self.root_path = root_path self.client = Minio( self.endpoint, access_key=access_key, @@ -748,3 +830,37 @@ class StorageClient(): self.client.fput_object(self.bucket_name, object_name, file_path) except S3Error as exc: logger.error("fail to copy files to minio", exc) + + def copy_file(self, src_bucket, src_object, dst_bucket, dst_object): + try: + # if dst bucket not exist, create it + if not self.client.bucket_exists(dst_bucket): + self.client.make_bucket(dst_bucket) + self.client.copy_object(dst_bucket, dst_object, CopySource(src_bucket, src_object)) + except S3Error as exc: + logger.error("fail to copy files to minio", exc) + + def get_collection_binlog(self, collection_id): + dir_list = [ + "delta_log", + "insert_log" + ] + binlog_list = [] + # list objects dir/collection_id in bucket + for dir in dir_list: + prefix = f"{self.root_path}/{dir}/{collection_id}/" + objects = self.client.list_objects(self.bucket_name, prefix=prefix) + for obj in objects: + binlog_list.append(f"{self.bucket_name}/{obj.object_name}") + print(binlog_list) + return 
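Taken together, the new StorageClient and ImportJobClient helpers give the suite a compact upload, import, wait flow. A minimal sketch of that flow, assuming the api.milvus import path used by the test base and the default endpoint, token, and MinIO credentials these tests pass in; the collection name and row contents are placeholders:

import json

from api.milvus import ImportJobClient, StorageClient  # assumed import path

endpoint = "http://127.0.0.1:19530"   # conftest.py defaults
token = "root:Milvus"

storage = StorageClient("127.0.0.1:9000", "minioadmin", "minioadmin",
                        bucket_name="milvus-bucket", root_path="file")
import_client = ImportJobClient(endpoint, token)

# write a row-based JSON file and push it to the bucket
rows = [{"book_id": i, "word_count": i, "book_describe": f"book_{i}",
         "book_intro": [0.0] * 128} for i in range(10)]
file_name = "bulk_insert_demo.json"
with open(f"/tmp/{file_name}", "w") as f:
    json.dump(rows, f)
storage.upload_file(f"/tmp/{file_name}", file_name)

# create the import job for an existing collection ("demo_collection" is
# assumed to exist with a matching schema), then poll progress until the
# job reports Completed or wait_import_job_completed hits its 120 s limit
rsp = import_client.create_import_jobs({"collectionName": "demo_collection",
                                        "files": [[file_name]]})
job_id = rsp["data"]["jobId"]
progress, finished = import_client.wait_import_job_completed(job_id)
assert finished, progress

wait_import_job_completed returns the last progress response together with a completion flag, so callers can assert on either.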
binlog_list + + +if __name__ == "__main__": + sc = StorageClient( + endpoint="10.104.19.57:9000", + access_key="minioadmin", + secret_key="minioadmin", + bucket_name="milvus-bucket" + ) + sc.get_collection_binlog("448305293023730313") diff --git a/tests/restful_client_v2/base/testbase.py b/tests/restful_client_v2/base/testbase.py index bb84c95ab2..7452058d61 100644 --- a/tests/restful_client_v2/base/testbase.py +++ b/tests/restful_client_v2/base/testbase.py @@ -27,6 +27,7 @@ class Base: collection_client = None partition_client = None index_client = None + alias_client = None user_client = None role_client = None import_job_client = None @@ -49,7 +50,7 @@ class TestBase(Base): logger.error(e) @pytest.fixture(scope="function", autouse=True) - def init_client(self, endpoint, token, minio_host): + def init_client(self, endpoint, token, minio_host, bucket_name, root_path): self.endpoint = f"{endpoint}" self.api_key = f"{token}" self.invalid_api_key = "invalid_token" @@ -61,14 +62,14 @@ class TestBase(Base): self.user_client = UserClient(self.endpoint, self.api_key) self.role_client = RoleClient(self.endpoint, self.api_key) self.import_job_client = ImportJobClient(self.endpoint, self.api_key) - self.storage_client = StorageClient(f"{minio_host}:9000", "minioadmin", "minioadmin", "milvus-bucket") + self.storage_client = StorageClient(f"{minio_host}:9000", "minioadmin", "minioadmin", bucket_name, root_path) if token is None: self.vector_client.api_key = None self.collection_client.api_key = None self.partition_client.api_key = None connections.connect(uri=endpoint, token=token) - def init_collection(self, collection_name, pk_field="id", metric_type="L2", dim=128, nb=100, batch_size=1000): + def init_collection(self, collection_name, pk_field="id", metric_type="L2", dim=128, nb=100, batch_size=1000, return_insert_id=False): # create collection schema_payload = { "collectionName": collection_name, @@ -85,6 +86,7 @@ class TestBase(Base): batch = nb // batch_size remainder = nb % batch_size data = [] + insert_ids = [] for i in range(batch): nb = batch_size data = get_data_by_payload(schema_payload, nb) @@ -96,6 +98,8 @@ class TestBase(Base): logger.debug(f"body size: {body_size / 1024 / 1024} MB") rsp = self.vector_client.vector_insert(payload) assert rsp['code'] == 200 + if return_insert_id: + insert_ids.extend(rsp['data']['insertIds']) # insert remainder data if remainder: nb = remainder @@ -106,6 +110,10 @@ class TestBase(Base): } rsp = self.vector_client.vector_insert(payload) assert rsp['code'] == 200 + if return_insert_id: + insert_ids.extend(rsp['data']['insertIds']) + if return_insert_id: + return schema_payload, data, insert_ids return schema_payload, data @@ -131,5 +139,7 @@ class TestBase(Base): def update_database(self, db_name="default"): self.create_database(db_name=db_name) + db.using_database(db_name=db_name) self.collection_client.db_name = db_name self.vector_client.db_name = db_name + self.import_job_client.db_name = db_name diff --git a/tests/restful_client_v2/conftest.py b/tests/restful_client_v2/conftest.py index d4229c53a8..8f1680c50f 100644 --- a/tests/restful_client_v2/conftest.py +++ b/tests/restful_client_v2/conftest.py @@ -6,6 +6,9 @@ def pytest_addoption(parser): parser.addoption("--endpoint", action="store", default="http://127.0.0.1:19530", help="endpoint") parser.addoption("--token", action="store", default="root:Milvus", help="token") parser.addoption("--minio_host", action="store", default="127.0.0.1", help="minio host") + parser.addoption("--bucket_name", 
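The return_insert_id switch added to init_collection also hands back the primary keys reported by each vector_insert batch, which callers can use to address specific rows later. A hypothetical test showing the new signature (the collection name and sizes are illustrative):

from base.testbase import TestBase
from utils.utils import gen_collection_name


class TestInsertIdsSketch(TestBase):
    def test_collect_insert_ids(self):
        name = gen_collection_name()
        # with return_insert_id=True the helper also returns the IDs it
        # accumulated from rsp['data']['insertIds'] of every insert batch
        schema_payload, data, insert_ids = self.init_collection(
            name, dim=128, nb=300, batch_size=100, return_insert_id=True)
        assert len(insert_ids) == 300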
action="store", default="milvus-bucket", help="minio bucket name") + parser.addoption("--root_path", action="store", default="file", help="minio bucket root path") + parser.addoption("--release_name", action="store", default="my-release", help="release name") @pytest.fixture @@ -21,3 +24,18 @@ def token(request): @pytest.fixture def minio_host(request): return request.config.getoption("--minio_host") + + +@pytest.fixture +def bucket_name(request): + return request.config.getoption("--bucket_name") + + +@pytest.fixture +def root_path(request): + return request.config.getoption("--root_path") + + +@pytest.fixture +def release_name(request): + return request.config.getoption("--release_name") diff --git a/tests/restful_client_v2/pytest.ini b/tests/restful_client_v2/pytest.ini index bba8d5e14e..4e5cb5bfa9 100644 --- a/tests/restful_client_v2/pytest.ini +++ b/tests/restful_client_v2/pytest.ini @@ -11,4 +11,5 @@ filterwarnings = markers = L0 : 'L0 case, high priority' L1 : 'L1 case, second priority' + L2 : 'L2 case, system level case' diff --git a/tests/restful_client_v2/testcases/test_collection_operations.py b/tests/restful_client_v2/testcases/test_collection_operations.py index aae36f081a..8e60881832 100644 --- a/tests/restful_client_v2/testcases/test_collection_operations.py +++ b/tests/restful_client_v2/testcases/test_collection_operations.py @@ -17,9 +17,8 @@ from pymilvus import ( @pytest.mark.L0 class TestCreateCollection(TestBase): - @pytest.mark.parametrize("metric_type", ["L2", "IP", "COSINE"]) @pytest.mark.parametrize("dim", [128]) - def test_create_collections_fast(self, dim, metric_type): + def test_create_collections_quick_setup(self, dim): """ target: test create collection method: create a collection with a simple schema @@ -31,7 +30,6 @@ class TestCreateCollection(TestBase): payload = { "collectionName": name, "dimension": dim, - "metricType": metric_type } logging.info(f"create collection {name} with payload: {payload}") rsp = client.collection_create(payload) @@ -44,6 +42,112 @@ class TestCreateCollection(TestBase): rsp = client.collection_describe(name) assert rsp['code'] == 200 assert rsp['data']['collectionName'] == name + assert rsp['data']['autoId'] is False + assert rsp['data']['enableDynamicField'] is True + assert "COSINE" in str(rsp['data']["indexes"]) + + @pytest.mark.parametrize("dim", [128]) + @pytest.mark.parametrize("metric_type", ["L2", "COSINE", "IP"]) + @pytest.mark.parametrize("id_type", ["Int64", "VarChar"]) + @pytest.mark.parametrize("primary_field", ["id", "url"]) + @pytest.mark.parametrize("vector_field", ["vector", "embedding"]) + def test_create_collection_quick_setup_with_custom(self, vector_field, primary_field, dim, id_type, metric_type): + """ + Insert a vector with a simple payload + """ + # create a collection + name = gen_collection_name() + collection_payload = { + "collectionName": name, + "dimension": dim, + "metricType": metric_type, + "primaryFieldName": primary_field, + "vectorFieldName": vector_field, + "idType": id_type, + } + if id_type == "VarChar": + collection_payload["params"] = {"max_length": "256"} + rsp = self.collection_client.collection_create(collection_payload) + assert rsp['code'] == 200 + rsp = self.collection_client.collection_describe(name) + logger.info(f"rsp: {rsp}") + assert rsp['code'] == 200 + assert rsp['data']['collectionName'] == name + fields = [f["name"] for f in rsp['data']['fields']] + assert primary_field in fields + assert vector_field in fields + for f in rsp['data']['fields']: + if f['name'] == 
primary_field: + assert f['type'] == id_type + assert f['primaryKey'] is True + for index in rsp['data']['indexes']: + assert index['metricType'] == metric_type + + def test_create_collections_with_all_params(self): + """ + target: test create collection + method: create a collection with a simple schema + expected: create collection success + """ + name = gen_collection_name() + dim = 128 + metric_type = "COSINE" + client = self.collection_client + num_shards = 2 + num_partitions = 36 + consistency_level = "Strong" + ttl_seconds = 360 + payload = { + "collectionName": name, + "enableDynamicField": True, + "params":{ + "shardsNum": f"{num_shards}", + "partitionsNum": f"{num_partitions}", + "consistencyLevel": f"{consistency_level}", + "ttlSeconds": f"{ttl_seconds}", + }, + "schema": { + "fields": [ + {"fieldName": "book_id", "dataType": "Int64", "isPrimary": True, "elementTypeParams": {}}, + {"fieldName": "word_count", "dataType": "Int64", "isPartitionKey": True, "elementTypeParams": {}}, + {"fieldName": "book_describe", "dataType": "VarChar", "elementTypeParams": {"max_length": "256"}}, + {"fieldName": "json", "dataType": "JSON", "elementTypeParams": {}}, + {"fieldName": "int_array", "dataType": "Array", "elementDataType": "Int64", + "elementTypeParams": {"max_capacity": "1024"}}, + {"fieldName": "book_intro", "dataType": "FloatVector", "elementTypeParams": {"dim": f"{dim}"}} + ] + }, + "indexParams": [ + {"fieldName": "book_intro", "indexName": "book_intro_vector", "metricType": f"{metric_type}"}] + } + + logging.info(f"create collection {name} with payload: {payload}") + rsp = client.collection_create(payload) + assert rsp['code'] == 200 + rsp = client.collection_list() + + all_collections = rsp['data'] + assert name in all_collections + # describe collection by pymilvus + c = Collection(name) + res = c.describe() + logger.info(f"describe collection: {res}") + # describe collection + time.sleep(10) + rsp = client.collection_describe(name) + logger.info(f"describe collection: {rsp}") + + ttl_seconds_actual = None + for d in rsp["data"]["properties"]: + if d["key"] == "collection.ttl.seconds": + ttl_seconds_actual = int(d["value"]) + assert rsp['code'] == 200 + assert rsp['data']['collectionName'] == name + assert rsp['data']['shardsNum'] == num_shards + assert rsp['data']['partitionsNum'] == num_partitions + assert rsp['data']['consistencyLevel'] == consistency_level + assert ttl_seconds_actual == ttl_seconds + @pytest.mark.parametrize("auto_id", [True, False]) @pytest.mark.parametrize("enable_dynamic_field", [True, False]) @@ -369,7 +473,6 @@ class TestCreateCollection(TestBase): rsp = client.collection_describe(name) assert rsp['code'] == 200 assert rsp['data']['collectionName'] == name - assert f"FloatVector({dim})" in str(rsp['data']['fields']) def test_create_collections_concurrent_with_different_param(self): """ @@ -737,7 +840,7 @@ class TestDescribeCollection(TestBase): rsp = client.collection_describe(name) assert rsp['code'] == 200 assert rsp['data']['collectionName'] == name - assert rsp['data']['autoId'] is True + assert rsp['data']['autoId'] is False assert rsp['data']['enableDynamicField'] is True assert len(rsp['data']['indexes']) == 1 @@ -781,7 +884,7 @@ class TestDescribeCollection(TestBase): for field in rsp['data']['fields']: if field['name'] == "store_address": - assert field['PartitionKey'] is True + assert field['partitionKey'] is True if field['name'] == "reviewer_id": assert field['primaryKey'] is True assert rsp['data']['autoId'] is False diff --git 
a/tests/restful_client_v2/testcases/test_index_operation.py b/tests/restful_client_v2/testcases/test_index_operation.py index 3ac5ba1360..3da34e3d7c 100644 --- a/tests/restful_client_v2/testcases/test_index_operation.py +++ b/tests/restful_client_v2/testcases/test_index_operation.py @@ -242,7 +242,7 @@ class TestCreateIndex(TestBase): assert expected_index[i]['indexConfig']['index_type'] == actual_index[i]['indexType'] -@pytest.mark.L0 +@pytest.mark.L1 class TestCreateIndexNegative(TestBase): @pytest.mark.parametrize("index_type", ["BIN_FLAT", "BIN_IVF_FLAT"]) diff --git a/tests/restful_client_v2/testcases/test_jobs_operation.py b/tests/restful_client_v2/testcases/test_jobs_operation.py index 231ba856a2..29f92d54de 100644 --- a/tests/restful_client_v2/testcases/test_jobs_operation.py +++ b/tests/restful_client_v2/testcases/test_jobs_operation.py @@ -1,15 +1,1188 @@ import random import json +import subprocess import time +from sklearn import preprocessing +from pathlib import Path +import pandas as pd +import numpy as np +from pymilvus import Collection from utils.utils import gen_collection_name +from utils.util_log import test_log as logger import pytest from base.testbase import TestBase +from uuid import uuid4 -@pytest.mark.L0 -class TestJobE2E(TestBase): +@pytest.mark.L1 +class TestCreateImportJob(TestBase): - def test_job_e2e(self): + @pytest.mark.parametrize("insert_num", [5000]) + @pytest.mark.parametrize("import_task_num", [2]) + @pytest.mark.parametrize("auto_id", [True, False]) + @pytest.mark.parametrize("is_partition_key", [True, False]) + @pytest.mark.parametrize("enable_dynamic_field", [True, False]) + def test_job_e2e(self, insert_num, import_task_num, auto_id, is_partition_key, enable_dynamic_field): + # create collection + name = gen_collection_name() + dim = 128 + payload = { + "collectionName": name, + "schema": { + "autoId": auto_id, + "enableDynamicField": enable_dynamic_field, + "fields": [ + {"fieldName": "book_id", "dataType": "Int64", "isPrimary": True, "elementTypeParams": {}}, + {"fieldName": "word_count", "dataType": "Int64", "isPartitionKey": is_partition_key, "elementTypeParams": {}}, + {"fieldName": "book_describe", "dataType": "VarChar", "elementTypeParams": {"max_length": "256"}}, + {"fieldName": "book_intro", "dataType": "FloatVector", "elementTypeParams": {"dim": f"{dim}"}} + ] + }, + "indexParams": [{"fieldName": "book_intro", "indexName": "book_intro_vector", "metricType": "L2"}] + } + rsp = self.collection_client.collection_create(payload) + + # upload file to storage + data = [] + for i in range(insert_num): + tmp = { + "word_count": i, + "book_describe": f"book_{i}", + "book_intro": [random.random() for _ in range(dim)] + } + if not auto_id: + tmp["book_id"] = i + if enable_dynamic_field: + tmp.update({f"dynamic_field_{i}": i}) + data.append(tmp) + # dump data to file + file_name = f"bulk_insert_data_{uuid4()}.json" + file_path = f"/tmp/{file_name}" + with open(file_path, "w") as f: + json.dump(data, f) + # upload file to minio storage + self.storage_client.upload_file(file_path, file_name) + + # create import job + payload = { + "collectionName": name, + "files": [[file_name]], + } + for i in range(import_task_num): + rsp = self.import_job_client.create_import_jobs(payload) + # list import job + payload = { + "collectionName": name, + } + rsp = self.import_job_client.list_import_jobs(payload) + + # get import job progress + for task in rsp['data']["records"]: + task_id = task['jobId'] + finished = False + t0 = time.time() + + while not finished: + 
rsp = self.import_job_client.get_import_job_progress(task_id) + if rsp['data']['state'] == "Completed": + finished = True + time.sleep(5) + if time.time() - t0 > 360: + assert False, "import job timeout" + time.sleep(10) + c = Collection(name) + res = c.query( + expr="", + output_fields=["count(*)"], + ) + assert res[0]["count(*)"] == insert_num * import_task_num + # query data + payload = { + "collectionName": name, + "filter": "book_id > 0", + "outputFields": ["*"], + } + rsp = self.vector_client.vector_query(payload) + assert rsp["code"] == 200 + + @pytest.mark.parametrize("insert_num", [5000]) + @pytest.mark.parametrize("import_task_num", [1]) + @pytest.mark.parametrize("auto_id", [True]) + @pytest.mark.parametrize("is_partition_key", [True]) + @pytest.mark.parametrize("enable_dynamic_field", [True]) + def test_import_job_with_db(self, insert_num, import_task_num, auto_id, is_partition_key, enable_dynamic_field): + self.create_database(db_name="test_job") + self.update_database(db_name="test_job") + # create collection + name = gen_collection_name() + dim = 128 + payload = { + "collectionName": name, + "schema": { + "autoId": auto_id, + "enableDynamicField": enable_dynamic_field, + "fields": [ + {"fieldName": "book_id", "dataType": "Int64", "isPrimary": True, "elementTypeParams": {}}, + {"fieldName": "word_count", "dataType": "Int64", "isPartitionKey": is_partition_key, "elementTypeParams": {}}, + {"fieldName": "book_describe", "dataType": "VarChar", "elementTypeParams": {"max_length": "256"}}, + {"fieldName": "book_intro", "dataType": "FloatVector", "elementTypeParams": {"dim": f"{dim}"}} + ] + }, + "indexParams": [{"fieldName": "book_intro", "indexName": "book_intro_vector", "metricType": "L2"}] + } + rsp = self.collection_client.collection_create(payload) + + # upload file to storage + data = [] + for i in range(insert_num): + tmp = { + "word_count": i, + "book_describe": f"book_{i}", + "book_intro": [random.random() for _ in range(dim)] + } + if not auto_id: + tmp["book_id"] = i + if enable_dynamic_field: + tmp.update({f"dynamic_field_{i}": i}) + data.append(tmp) + # dump data to file + file_name = f"bulk_insert_data_{int(time.time())}.json" + file_path = f"/tmp/{file_name}" + with open(file_path, "w") as f: + json.dump(data, f) + # upload file to minio storage + self.storage_client.upload_file(file_path, file_name) + + # create import job + payload = { + "collectionName": name, + "files": [[file_name]], + } + for i in range(import_task_num): + rsp = self.import_job_client.create_import_jobs(payload) + # list import job + payload = { + "collectionName": name, + } + rsp = self.import_job_client.list_import_jobs(payload) + + # get import job progress + for task in rsp['data']["records"]: + task_id = task['jobId'] + finished = False + t0 = time.time() + + while not finished: + rsp = self.import_job_client.get_import_job_progress(task_id) + if rsp['data']['state'] == "Completed": + finished = True + time.sleep(5) + if time.time() - t0 > 120: + assert False, "import job timeout" + time.sleep(10) + c = Collection(name) + res = c.query( + expr="", + output_fields=["count(*)"], + ) + assert res[0]["count(*)"] == insert_num * import_task_num + # query data + payload = { + "collectionName": name, + "filter": "book_id > 0", + "outputFields": ["*"], + } + rsp = self.vector_client.vector_query(payload) + assert rsp["code"] == 200 + + @pytest.mark.parametrize("insert_num", [5000]) + @pytest.mark.parametrize("import_task_num", [1]) + @pytest.mark.parametrize("auto_id", [True]) + 
@pytest.mark.parametrize("is_partition_key", [False]) + @pytest.mark.parametrize("enable_dynamic_field", [True]) + def test_import_job_with_partition(self, insert_num, import_task_num, auto_id, is_partition_key, enable_dynamic_field): + # create collection + name = gen_collection_name() + dim = 128 + payload = { + "collectionName": name, + "schema": { + "autoId": auto_id, + "enableDynamicField": enable_dynamic_field, + "fields": [ + {"fieldName": "book_id", "dataType": "Int64", "isPrimary": True, "elementTypeParams": {}}, + {"fieldName": "word_count", "dataType": "Int64", "isPartitionKey": is_partition_key, + "elementTypeParams": {}}, + {"fieldName": "book_describe", "dataType": "VarChar", "elementTypeParams": {"max_length": "256"}}, + {"fieldName": "book_intro", "dataType": "FloatVector", "elementTypeParams": {"dim": f"{dim}"}} + ] + }, + "indexParams": [{"fieldName": "book_intro", "indexName": "book_intro_vector", "metricType": "L2"}] + } + rsp = self.collection_client.collection_create(payload) + + # upload file to storage + data = [] + for i in range(insert_num): + tmp = { + "word_count": i, + "book_describe": f"book_{i}", + "book_intro": [random.random() for _ in range(dim)] + } + if not auto_id: + tmp["book_id"] = i + if enable_dynamic_field: + tmp.update({f"dynamic_field_{i}": i}) + data.append(tmp) + # dump data to file + file_name = f"bulk_insert_data_{int(time.time())}.json" + file_path = f"/tmp/{file_name}" + with open(file_path, "w") as f: + json.dump(data, f) + # upload file to minio storage + self.storage_client.upload_file(file_path, file_name) + # create partition + partition_name = "test_partition" + rsp = self.partition_client.partition_create(collection_name=name, partition_name=partition_name) + # create import job + payload = { + "collectionName": name, + "partitionName": partition_name, + "files": [[file_name]], + } + for i in range(import_task_num): + rsp = self.import_job_client.create_import_jobs(payload) + # list import job + payload = { + "collectionName": name, + } + rsp = self.import_job_client.list_import_jobs(payload) + + # get import job progress + for task in rsp['data']["records"]: + task_id = task['jobId'] + finished = False + t0 = time.time() + + while not finished: + rsp = self.import_job_client.get_import_job_progress(task_id) + if rsp['data']['state'] == "Completed": + finished = True + time.sleep(5) + if time.time() - t0 > 120: + assert False, "import job timeout" + time.sleep(10) + c = Collection(name) + res = c.query( + expr="", + output_fields=["count(*)"], + ) + logger.info(f"count in collection: {res}") + assert res[0]["count(*)"] == insert_num * import_task_num + res = c.query( + expr="", + partition_names=[partition_name], + output_fields=["count(*)"], + ) + logger.info(f"count in partition {[partition_name]}: {res}") + assert res[0]["count(*)"] == insert_num * import_task_num + # query data + payload = { + "collectionName": name, + "filter": "book_id > 0", + "outputFields": ["*"], + } + rsp = self.vector_client.vector_query(payload) + assert rsp["code"] == 200 + + + + def test_job_import_multi_json_file(self): + # create collection + name = gen_collection_name() + dim = 128 + payload = { + "collectionName": name, + "schema": { + "fields": [ + {"fieldName": "book_id", "dataType": "Int64", "isPrimary": True, "elementTypeParams": {}}, + {"fieldName": "word_count", "dataType": "Int64", "elementTypeParams": {}}, + {"fieldName": "book_describe", "dataType": "VarChar", "elementTypeParams": {"max_length": "256"}}, + {"fieldName": "book_intro", 
"dataType": "FloatVector", "elementTypeParams": {"dim": f"{dim}"}} + ] + }, + "indexParams": [{"fieldName": "book_intro", "indexName": "book_intro_vector", "metricType": "L2"}] + } + rsp = self.collection_client.collection_create(payload) + + # upload file to storage + file_nums = 2 + file_names = [] + for file_num in range(file_nums): + data = [{ + "book_id": i, + "word_count": i, + "book_describe": f"book_{i}", + "book_intro": [random.random() for _ in range(dim)]} + for i in range(1000*file_num, 1000*(file_num+1))] + + # dump data to file + file_name = f"bulk_insert_data_{file_num}.json" + file_path = f"/tmp/{file_name}" + # create dir for file path + Path(file_path).parent.mkdir(parents=True, exist_ok=True) + with open(file_path, "w") as f: + json.dump(data, f) + # upload file to minio storage + self.storage_client.upload_file(file_path, file_name) + file_names.append([file_name]) + + # create import job + payload = { + "collectionName": name, + "files": file_names, + } + rsp = self.import_job_client.create_import_jobs(payload) + # list import job + payload = { + "collectionName": name, + } + rsp = self.import_job_client.list_import_jobs(payload) + + # get import job progress + for job in rsp['data']["records"]: + job_id = job['jobId'] + finished = False + t0 = time.time() + + while not finished: + rsp = self.import_job_client.get_import_job_progress(job_id) + if rsp['data']['state'] == "Completed": + finished = True + time.sleep(5) + if time.time() - t0 > 120: + assert False, "import job timeout" + time.sleep(10) + # assert data count + c = Collection(name) + assert c.num_entities == 2000 + # assert import data can be queried + payload = { + "collectionName": name, + "filter": f"book_id in {[i for i in range(1000)]}", + "limit": 100, + "offset": 0, + "outputFields": ["*"] + } + rsp = self.vector_client.vector_query(payload) + assert len(rsp['data']) == 100 + + def test_job_import_multi_parquet_file(self): + # create collection + name = gen_collection_name() + dim = 128 + payload = { + "collectionName": name, + "schema": { + "fields": [ + {"fieldName": "book_id", "dataType": "Int64", "isPrimary": True, "elementTypeParams": {}}, + {"fieldName": "word_count", "dataType": "Int64", "elementTypeParams": {}}, + {"fieldName": "book_describe", "dataType": "VarChar", "elementTypeParams": {"max_length": "256"}}, + {"fieldName": "book_intro", "dataType": "FloatVector", "elementTypeParams": {"dim": f"{dim}"}} + ] + }, + "indexParams": [{"fieldName": "book_intro", "indexName": "book_intro_vector", "metricType": "L2"}] + } + rsp = self.collection_client.collection_create(payload) + + # upload file to storage + file_nums = 2 + file_names = [] + for file_num in range(file_nums): + data = [{ + "book_id": i, + "word_count": i, + "book_describe": f"book_{i}", + "book_intro": [random.random() for _ in range(dim)]} + for i in range(1000*file_num, 1000*(file_num+1))] + + # dump data to file + file_name = f"bulk_insert_data_{file_num}.parquet" + file_path = f"/tmp/{file_name}" + # create dir for file path + Path(file_path).parent.mkdir(parents=True, exist_ok=True) + df = pd.DataFrame(data) + df.to_parquet(file_path, index=False) + # upload file to minio storage + self.storage_client.upload_file(file_path, file_name) + file_names.append([file_name]) + + # create import job + payload = { + "collectionName": name, + "files": file_names, + } + rsp = self.import_job_client.create_import_jobs(payload) + # list import job + payload = { + "collectionName": name, + } + rsp = 
self.import_job_client.list_import_jobs(payload) + + # get import job progress + for job in rsp['data']["records"]: + job_id = job['jobId'] + finished = False + t0 = time.time() + + while not finished: + rsp = self.import_job_client.get_import_job_progress(job_id) + if rsp['data']['state'] == "Completed": + finished = True + time.sleep(5) + if time.time() - t0 > 120: + assert False, "import job timeout" + time.sleep(10) + # assert data count + c = Collection(name) + assert c.num_entities == 2000 + # assert import data can be queried + payload = { + "collectionName": name, + "filter": f"book_id in {[i for i in range(1000)]}", + "limit": 100, + "offset": 0, + "outputFields": ["*"] + } + rsp = self.vector_client.vector_query(payload) + assert len(rsp['data']) == 100 + + def test_job_import_multi_numpy_file(self): + # create collection + name = gen_collection_name() + dim = 128 + payload = { + "collectionName": name, + "schema": { + "fields": [ + {"fieldName": "book_id", "dataType": "Int64", "isPrimary": True, "elementTypeParams": {}}, + {"fieldName": "word_count", "dataType": "Int64", "elementTypeParams": {}}, + {"fieldName": "book_describe", "dataType": "VarChar", "elementTypeParams": {"max_length": "256"}}, + {"fieldName": "book_intro", "dataType": "FloatVector", "elementTypeParams": {"dim": f"{dim}"}} + ] + }, + "indexParams": [{"fieldName": "book_intro", "indexName": "book_intro_vector", "metricType": "L2"}] + } + rsp = self.collection_client.collection_create(payload) + + # upload file to storage + file_nums = 2 + file_names = [] + for file_num in range(file_nums): + data = [{ + "book_id": i, + "word_count": i, + "book_describe": f"book_{i}", + "book_intro": [random.random() for _ in range(dim)]} + for i in range(1000*file_num, 1000*(file_num+1))] + + file_list = [] + # dump data to file + file_dir = f"bulk_insert_data_{file_num}" + base_file_path = f"/tmp/{file_dir}" + df = pd.DataFrame(data) + # each column is a list and convert to a npy file + for column in df.columns: + file_path = f"{base_file_path}/{column}.npy" + # create dir for file path + Path(file_path).parent.mkdir(parents=True, exist_ok=True) + file_name = f"{file_dir}/{column}.npy" + np.save(file_path, np.array(df[column].values.tolist())) + # upload file to minio storage + self.storage_client.upload_file(file_path, file_name) + file_list.append(file_name) + file_names.append(file_list) + # create import job + payload = { + "collectionName": name, + "files": file_names, + } + rsp = self.import_job_client.create_import_jobs(payload) + # list import job + payload = { + "collectionName": name, + } + rsp = self.import_job_client.list_import_jobs(payload) + + # get import job progress + for job in rsp['data']["records"]: + job_id = job['jobId'] + finished = False + t0 = time.time() + + while not finished: + rsp = self.import_job_client.get_import_job_progress(job_id) + if rsp['data']['state'] == "Completed": + finished = True + time.sleep(5) + if time.time() - t0 > 120: + assert False, "import job timeout" + time.sleep(10) + # assert data count + c = Collection(name) + assert c.num_entities == 2000 + # assert import data can be queried + payload = { + "collectionName": name, + "filter": f"book_id in {[i for i in range(1000)]}", + "limit": 100, + "offset": 0, + "outputFields": ["*"] + } + rsp = self.vector_client.vector_query(payload) + assert len(rsp['data']) == 100 + + def test_job_import_multi_file_type(self): + # create collection + name = gen_collection_name() + dim = 128 + payload = { + "collectionName": name, + "schema": { 
+ "fields": [ + {"fieldName": "book_id", "dataType": "Int64", "isPrimary": True, "elementTypeParams": {}}, + {"fieldName": "word_count", "dataType": "Int64", "elementTypeParams": {}}, + {"fieldName": "book_describe", "dataType": "VarChar", "elementTypeParams": {"max_length": "256"}}, + {"fieldName": "book_intro", "dataType": "FloatVector", "elementTypeParams": {"dim": f"{dim}"}} + ] + }, + "indexParams": [{"fieldName": "book_intro", "indexName": "book_intro_vector", "metricType": "L2"}] + } + rsp = self.collection_client.collection_create(payload) + + # upload file to storage + file_nums = 2 + file_names = [] + + # numpy file + for file_num in range(file_nums): + data = [{ + "book_id": i, + "word_count": i, + "book_describe": f"book_{i}", + "book_intro": [random.random() for _ in range(dim)]} + for i in range(1000*file_num, 1000*(file_num+1))] + + file_list = [] + # dump data to file + file_dir = f"bulk_insert_data_{file_num}" + base_file_path = f"/tmp/{file_dir}" + df = pd.DataFrame(data) + # each column is a list and convert to a npy file + for column in df.columns: + file_path = f"{base_file_path}/{column}.npy" + # create dir for file path + Path(file_path).parent.mkdir(parents=True, exist_ok=True) + file_name = f"{file_dir}/{column}.npy" + np.save(file_path, np.array(df[column].values.tolist())) + # upload file to minio storage + self.storage_client.upload_file(file_path, file_name) + file_list.append(file_name) + file_names.append(file_list) + # parquet file + for file_num in range(2,file_nums+2): + data = [{ + "book_id": i, + "word_count": i, + "book_describe": f"book_{i}", + "book_intro": [random.random() for _ in range(dim)]} + for i in range(1000*file_num, 1000*(file_num+1))] + + # dump data to file + file_name = f"bulk_insert_data_{file_num}.parquet" + file_path = f"/tmp/{file_name}" + # create dir for file path + Path(file_path).parent.mkdir(parents=True, exist_ok=True) + df = pd.DataFrame(data) + df.to_parquet(file_path, index=False) + # upload file to minio storage + self.storage_client.upload_file(file_path, file_name) + file_names.append([file_name]) + # json file + for file_num in range(4, file_nums+4): + data = [{ + "book_id": i, + "word_count": i, + "book_describe": f"book_{i}", + "book_intro": [random.random() for _ in range(dim)]} + for i in range(1000*file_num, 1000*(file_num+1))] + + # dump data to file + file_name = f"bulk_insert_data_{file_num}.json" + file_path = f"/tmp/{file_name}" + with open(file_path, "w") as f: + json.dump(data, f) + # upload file to minio storage + self.storage_client.upload_file(file_path, file_name) + file_names.append([file_name]) + + # create import job + payload = { + "collectionName": name, + "files": file_names, + } + rsp = self.import_job_client.create_import_jobs(payload) + # list import job + payload = { + "collectionName": name, + } + rsp = self.import_job_client.list_import_jobs(payload) + + # get import job progress + for job in rsp['data']["records"]: + job_id = job['jobId'] + finished = False + t0 = time.time() + + while not finished: + rsp = self.import_job_client.get_import_job_progress(job_id) + if rsp['data']['state'] == "Completed": + finished = True + time.sleep(5) + if time.time() - t0 > 120: + assert False, "import job timeout" + time.sleep(10) + # assert data count + c = Collection(name) + assert c.num_entities == 6000 + # assert import data can be queried + payload = { + "collectionName": name, + "filter": f"book_id in {[i for i in range(1000)]}", + "limit": 100, + "offset": 0, + "outputFields": ["*"] + } + rsp = 
self.vector_client.vector_query(payload) + assert len(rsp['data']) == 100 + + @pytest.mark.parametrize("insert_round", [2]) + @pytest.mark.parametrize("auto_id", [True]) + @pytest.mark.parametrize("is_partition_key", [False]) + @pytest.mark.parametrize("enable_dynamic_schema", [True]) + @pytest.mark.parametrize("nb", [3000]) + @pytest.mark.parametrize("dim", [128]) + def test_job_import_binlog_file_type(self, nb, dim, insert_round, auto_id, + is_partition_key, enable_dynamic_schema, bucket_name, root_path): + # todo: copy binlog file to backup bucket + """ + Insert a vector with a simple payload + """ + # create a collection + name = gen_collection_name() + payload = { + "collectionName": name, + "schema": { + "autoId": auto_id, + "enableDynamicField": enable_dynamic_schema, + "fields": [ + {"fieldName": "book_id", "dataType": "Int64", "isPrimary": True, "elementTypeParams": {}}, + {"fieldName": "user_id", "dataType": "Int64", "isPartitionKey": is_partition_key, + "elementTypeParams": {}}, + {"fieldName": "word_count", "dataType": "Int64", "elementTypeParams": {}}, + {"fieldName": "book_describe", "dataType": "VarChar", "elementTypeParams": {"max_length": "256"}}, + {"fieldName": "bool", "dataType": "Bool", "elementTypeParams": {}}, + {"fieldName": "json", "dataType": "JSON", "elementTypeParams": {}}, + {"fieldName": "int_array", "dataType": "Array", "elementDataType": "Int64", + "elementTypeParams": {"max_capacity": "1024"}}, + {"fieldName": "varchar_array", "dataType": "Array", "elementDataType": "VarChar", + "elementTypeParams": {"max_capacity": "1024", "max_length": "256"}}, + {"fieldName": "bool_array", "dataType": "Array", "elementDataType": "Bool", + "elementTypeParams": {"max_capacity": "1024"}}, + {"fieldName": "text_emb", "dataType": "FloatVector", "elementTypeParams": {"dim": f"{dim}"}}, + {"fieldName": "image_emb", "dataType": "FloatVector", "elementTypeParams": {"dim": f"{dim}"}}, + ] + }, + "indexParams": [ + {"fieldName": "text_emb", "indexName": "text_emb", "metricType": "L2"}, + {"fieldName": "image_emb", "indexName": "image_emb", "metricType": "L2"} + ] + } + rsp = self.collection_client.collection_create(payload) + assert rsp['code'] == 200 + # create restore collection + restore_collection_name = f"{name}_restore" + payload["collectionName"] = restore_collection_name + rsp = self.collection_client.collection_create(payload) + assert rsp['code'] == 200 + rsp = self.collection_client.collection_describe(name) + logger.info(f"rsp: {rsp}") + assert rsp['code'] == 200 + # insert data + for i in range(insert_round): + data = [] + for i in range(nb): + if auto_id: + tmp = { + "user_id": i, + "word_count": i, + "book_describe": f"book_{i}", + "bool": random.choice([True, False]), + "json": {"key": i}, + "int_array": [i], + "varchar_array": [f"varchar_{i}"], + "bool_array": [random.choice([True, False])], + "text_emb": preprocessing.normalize([np.array([random.random() for _ in range(dim)])])[ + 0].tolist(), + "image_emb": preprocessing.normalize([np.array([random.random() for _ in range(dim)])])[ + 0].tolist(), + } + else: + tmp = { + "book_id": i, + "user_id": i, + "word_count": i, + "book_describe": f"book_{i}", + "bool": random.choice([True, False]), + "json": {"key": i}, + "int_array": [i], + "varchar_array": [f"varchar_{i}"], + "bool_array": [random.choice([True, False])], + "text_emb": preprocessing.normalize([np.array([random.random() for _ in range(dim)])])[ + 0].tolist(), + "image_emb": preprocessing.normalize([np.array([random.random() for _ in range(dim)])])[ + 
0].tolist(), + } + if enable_dynamic_schema: + tmp.update({f"dynamic_field_{i}": i}) + data.append(tmp) + payload = { + "collectionName": name, + "data": data, + } + rsp = self.vector_client.vector_insert(payload) + assert rsp['code'] == 200 + assert rsp['data']['insertCount'] == nb + # flush data to generate binlog file + c = Collection(name) + c.flush() + time.sleep(2) + + # query data to make sure the data is inserted + rsp = self.vector_client.vector_query({"collectionName": name, "filter": "user_id > 0", "limit": 50}) + assert rsp['code'] == 200 + assert len(rsp['data']) == 50 + # get collection id + c = Collection(name) + res = c.describe() + collection_id = res["collection_id"] + + # create import job + payload = { + "collectionName": restore_collection_name, + "files": [[f"/{root_path}/insert_log/{collection_id}/", + # f"{bucket_name}/{root_path}/delta_log/{collection_id}/" + ]], + "options": { + "backup": "true" + } + + } + rsp = self.import_job_client.create_import_jobs(payload) + assert rsp['code'] == 200 + # list import job + payload = { + "collectionName": restore_collection_name, + } + rsp = self.import_job_client.list_import_jobs(payload) + + # get import job progress + for job in rsp['data']["records"]: + job_id = job['jobId'] + finished = False + t0 = time.time() + + while not finished: + rsp = self.import_job_client.get_import_job_progress(job_id) + if rsp['data']['state'] == "Completed": + finished = True + time.sleep(5) + if time.time() - t0 > 120: + assert False, "import job timeout" + time.sleep(10) + c_restore = Collection(restore_collection_name) + assert c.num_entities == c_restore.num_entities + + +@pytest.mark.L2 +class TestImportJobAdvance(TestBase): + def test_job_import_recovery_after_chaos(self, release_name): + # create collection + name = gen_collection_name() + dim = 128 + payload = { + "collectionName": name, + "schema": { + "fields": [ + {"fieldName": "book_id", "dataType": "Int64", "isPrimary": True, "elementTypeParams": {}}, + {"fieldName": "word_count", "dataType": "Int64", "elementTypeParams": {}}, + {"fieldName": "book_describe", "dataType": "VarChar", "elementTypeParams": {"max_length": "256"}}, + {"fieldName": "book_intro", "dataType": "FloatVector", "elementTypeParams": {"dim": f"{dim}"}} + ] + }, + "indexParams": [{"fieldName": "book_intro", "indexName": "book_intro_vector", "metricType": "L2"}] + } + rsp = self.collection_client.collection_create(payload) + + # upload file to storage + file_nums = 10 + batch_size = 1000 + file_names = [] + for file_num in range(file_nums): + data = [{ + "book_id": i, + "word_count": i, + "book_describe": f"book_{i}", + "book_intro": [random.random() for _ in range(dim)]} + for i in range(batch_size*file_num, batch_size*(file_num+1))] + + # dump data to file + file_name = f"bulk_insert_data_{file_num}.json" + file_path = f"/tmp/{file_name}" + # create dir for file path + Path(file_path).parent.mkdir(parents=True, exist_ok=True) + with open(file_path, "w") as f: + json.dump(data, f) + # upload file to minio storage + self.storage_client.upload_file(file_path, file_name) + file_names.append([file_name]) + + # create import job + payload = { + "collectionName": name, + "files": file_names, + } + rsp = self.import_job_client.create_import_jobs(payload) + job_id = rsp['data']['jobId'] + # list import job + payload = { + "collectionName": name, + } + rsp = self.import_job_client.list_import_jobs(payload) + assert job_id in [job["jobId"] for job in rsp['data']["records"]] + rsp = 
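The binlog restore path above points the job at the source collection's insert_log prefix under the MinIO root path and sets the backup option; StorageClient.get_collection_binlog can be used to see what actually sits under that prefix. A hedged sketch of assembling that payload, with the endpoint, collection names, bucket, and root path as placeholders and the collection id read the same way the test reads it from describe():

from api.milvus import ImportJobClient, StorageClient  # assumed import path
from pymilvus import Collection, connections

endpoint = "http://127.0.0.1:19530"
connections.connect(uri=endpoint, token="root:Milvus")

# collection id of the source collection, as the test reads it
collection_id = Collection("source_collection").describe()["collection_id"]

storage = StorageClient("127.0.0.1:9000", "minioadmin", "minioadmin",
                        bucket_name="milvus-bucket", root_path="file")
print(storage.get_collection_binlog(collection_id))  # inspect existing binlogs

root_path = "file"
payload = {
    "collectionName": "restore_collection",
    # one file group pointing at the whole insert_log prefix of the source
    "files": [[f"/{root_path}/insert_log/{collection_id}/"]],
    "options": {"backup": "true"},
}
import_client = ImportJobClient(endpoint, "root:Milvus")
rsp = import_client.create_import_jobs(payload)
assert rsp["code"] == 200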
self.import_job_client.list_import_jobs(payload) + # kill milvus by deleting pod + cmd = f"kubectl delete pod -l 'app.kubernetes.io/instance={release_name}, app.kubernetes.io/name=milvus' " + result = subprocess.run(cmd, shell=True, capture_output=True, text=True) + + output = result.stdout + return_code = result.returncode + logger.info(f"output: {output}, return_code, {return_code}") + + # get import job progress + for job in rsp['data']["records"]: + job_id = job['jobId'] + finished = False + t0 = time.time() + + while not finished: + try: + rsp = self.import_job_client.get_import_job_progress(job_id) + if rsp['data']['state'] == "Completed": + finished = True + time.sleep(5) + if time.time() - t0 > 360: + assert False, "import job timeout" + except Exception as e: + logger.error(f"get import job progress failed: {e}") + time.sleep(5) + time.sleep(10) + rsp = self.import_job_client.list_import_jobs(payload) + # assert data count + c = Collection(name) + assert c.num_entities == file_nums * batch_size + # assert import data can be queried + payload = { + "collectionName": name, + "filter": f"book_id in {[i for i in range(1000)]}", + "limit": 100, + "offset": 0, + "outputFields": ["*"] + } + rsp = self.vector_client.vector_query(payload) + assert len(rsp['data']) == 100 + + +@pytest.mark.L2 +class TestCreateImportJobAdvance(TestBase): + def test_job_import_with_multi_task_and_datanode(self, release_name): + # create collection + name = gen_collection_name() + dim = 128 + payload = { + "collectionName": name, + "schema": { + "fields": [ + {"fieldName": "book_id", "dataType": "Int64", "isPrimary": True, "elementTypeParams": {}}, + {"fieldName": "word_count", "dataType": "Int64", "elementTypeParams": {}}, + {"fieldName": "book_describe", "dataType": "VarChar", "elementTypeParams": {"max_length": "256"}}, + {"fieldName": "book_intro", "dataType": "FloatVector", "elementTypeParams": {"dim": f"{dim}"}} + ] + }, + "indexParams": [{"fieldName": "book_intro", "indexName": "book_intro_vector", "metricType": "L2"}] + } + rsp = self.collection_client.collection_create(payload) + + # upload file to storage + task_num = 48 + file_nums = 1 + batch_size = 100000 + file_names = [] + for file_num in range(file_nums): + data = [{ + "book_id": i, + "word_count": i, + "book_describe": f"book_{i}", + "book_intro": [random.random() for _ in range(dim)]} + for i in range(batch_size*file_num, batch_size*(file_num+1))] + + # dump data to file + file_name = f"bulk_insert_data_{file_num}.json" + file_path = f"/tmp/{file_name}" + # create dir for file path + Path(file_path).parent.mkdir(parents=True, exist_ok=True) + with open(file_path, "w") as f: + json.dump(data, f) + # upload file to minio storage + self.storage_client.upload_file(file_path, file_name) + file_names.append([file_name]) + for i in range(task_num): + # create import job + payload = { + "collectionName": name, + "files": file_names, + } + rsp = self.import_job_client.create_import_jobs(payload) + job_id = rsp['data']['jobId'] + # list import job + payload = { + "collectionName": name, + } + rsp = self.import_job_client.list_import_jobs(payload) + assert job_id in [job["jobId"] for job in rsp['data']["records"]] + rsp = self.import_job_client.list_import_jobs(payload) + # get import job progress + for job in rsp['data']["records"]: + job_id = job['jobId'] + finished = False + t0 = time.time() + + while not finished: + try: + rsp = self.import_job_client.get_import_job_progress(job_id) + if rsp['data']['state'] == "Completed": + finished = True + 
time.sleep(5) + if time.time() - t0 > 360: + assert False, "import job timeout" + except Exception as e: + logger.error(f"get import job progress failed: {e}") + time.sleep(5) + time.sleep(10) + rsp = self.import_job_client.list_import_jobs(payload) + # assert data count + c = Collection(name) + assert c.num_entities == file_nums * batch_size * task_num + # assert import data can be queried + payload = { + "collectionName": name, + "filter": f"book_id in {[i for i in range(1000)]}", + "limit": 100, + "offset": 0, + "outputFields": ["*"] + } + rsp = self.vector_client.vector_query(payload) + assert len(rsp['data']) == 100 + + def test_job_import_with_extremely_large_task_num(self, release_name): + # create collection + name = gen_collection_name() + dim = 128 + payload = { + "collectionName": name, + "schema": { + "fields": [ + {"fieldName": "book_id", "dataType": "Int64", "isPrimary": True, "elementTypeParams": {}}, + {"fieldName": "word_count", "dataType": "Int64", "elementTypeParams": {}}, + {"fieldName": "book_describe", "dataType": "VarChar", "elementTypeParams": {"max_length": "256"}}, + {"fieldName": "book_intro", "dataType": "FloatVector", "elementTypeParams": {"dim": f"{dim}"}} + ] + }, + "indexParams": [{"fieldName": "book_intro", "indexName": "book_intro_vector", "metricType": "L2"}] + } + rsp = self.collection_client.collection_create(payload) + + # upload file to storage + task_num = 1000 + file_nums = 2 + batch_size = 10 + file_names = [] + for file_num in range(file_nums): + data = [{ + "book_id": i, + "word_count": i, + "book_describe": f"book_{i}", + "book_intro": [random.random() for _ in range(dim)]} + for i in range(batch_size*file_num, batch_size*(file_num+1))] + + # dump data to file + file_name = f"bulk_insert_data_{file_num}.json" + file_path = f"/tmp/{file_name}" + # create dir for file path + Path(file_path).parent.mkdir(parents=True, exist_ok=True) + with open(file_path, "w") as f: + json.dump(data, f) + # upload file to minio storage + self.storage_client.upload_file(file_path, file_name) + file_names.append([file_name]) + for i in range(task_num): + # create import job + payload = { + "collectionName": name, + "files": file_names, + } + rsp = self.import_job_client.create_import_jobs(payload) + job_id = rsp['data']['jobId'] + # list import job + payload = { + "collectionName": name, + } + rsp = self.import_job_client.list_import_jobs(payload) + assert job_id in [job["jobId"] for job in rsp['data']["records"]] + rsp = self.import_job_client.list_import_jobs(payload) + # get import job progress + for job in rsp['data']["records"]: + job_id = job['jobId'] + finished = False + t0 = time.time() + + while not finished: + try: + rsp = self.import_job_client.get_import_job_progress(job_id) + if rsp['data']['state'] == "Completed": + finished = True + time.sleep(5) + if time.time() - t0 > 360: + assert False, "import job timeout" + except Exception as e: + logger.error(f"get import job progress failed: {e}") + time.sleep(5) + time.sleep(10) + rsp = self.import_job_client.list_import_jobs(payload) + # assert data count + c = Collection(name) + assert c.num_entities == file_nums * batch_size * task_num + # assert import data can be queried + payload = { + "collectionName": name, + "filter": f"book_id in {[i for i in range(1000)]}", + "limit": 100, + "offset": 0, + "outputFields": ["*"] + } + rsp = self.vector_client.vector_query(payload) + assert len(rsp['data']) == 100 + + +@pytest.mark.L1 +class TestCreateImportJobNegative(TestBase): + + def 
test_import_job_with_empty_files(self): + # create collection + name = gen_collection_name() + dim = 128 + payload = { + "collectionName": name, + "schema": { + "fields": [ + {"fieldName": "book_id", "dataType": "Int64", "isPrimary": True, "elementTypeParams": {}}, + {"fieldName": "word_count", "dataType": "Int64", "elementTypeParams": {}}, + {"fieldName": "book_describe", "dataType": "VarChar", "elementTypeParams": {"max_length": "256"}}, + {"fieldName": "book_intro", "dataType": "FloatVector", "elementTypeParams": {"dim": f"{dim}"}} + ] + }, + "indexParams": [{"fieldName": "book_intro", "indexName": "book_intro_vector", "metricType": "L2"}] + } + rsp = self.collection_client.collection_create(payload) + + # create import job + payload = { + "collectionName": name, + "files": [[]], + } + rsp = self.import_job_client.create_import_jobs(payload) + assert rsp['code'] == 1100 and "empty" in rsp['message'] + + def test_import_job_with_non_exist_files(self): + # create collection + name = gen_collection_name() + dim = 128 + payload = { + "collectionName": name, + "schema": { + "fields": [ + {"fieldName": "book_id", "dataType": "Int64", "isPrimary": True, "elementTypeParams": {}}, + {"fieldName": "word_count", "dataType": "Int64", "elementTypeParams": {}}, + {"fieldName": "book_describe", "dataType": "VarChar", "elementTypeParams": {"max_length": "256"}}, + {"fieldName": "book_intro", "dataType": "FloatVector", "elementTypeParams": {"dim": f"{dim}"}} + ] + }, + "indexParams": [{"fieldName": "book_intro", "indexName": "book_intro_vector", "metricType": "L2"}] + } + rsp = self.collection_client.collection_create(payload) + + # create import job + payload = { + "collectionName": name, + "files": [["invalid_file.json"]], + } + rsp = self.import_job_client.create_import_jobs(payload) + time.sleep(5) + rsp = self.import_job_client.get_import_job_progress(rsp['data']['jobId']) + assert rsp["data"]["state"] == "Failed" + + def test_import_job_with_non_exist_binlog_files(self): + # create collection + name = gen_collection_name() + dim = 128 + payload = { + "collectionName": name, + "schema": { + "fields": [ + {"fieldName": "book_id", "dataType": "Int64", "isPrimary": True, "elementTypeParams": {}}, + {"fieldName": "word_count", "dataType": "Int64", "elementTypeParams": {}}, + {"fieldName": "book_describe", "dataType": "VarChar", "elementTypeParams": {"max_length": "256"}}, + {"fieldName": "book_intro", "dataType": "FloatVector", "elementTypeParams": {"dim": f"{dim}"}} + ] + }, + "indexParams": [{"fieldName": "book_intro", "indexName": "book_intro_vector", "metricType": "L2"}] + } + rsp = self.collection_client.collection_create(payload) + + # create import job + payload = { + "collectionName": name, + "files": [[ + f"invalid_bucket/invalid_root_path/insert_log/invalid_id/", + ]], + "options": { + "backup": "true" + } + } + rsp = self.import_job_client.create_import_jobs(payload) + assert rsp['code'] == 1100 and "invalid" in rsp['message'] + + def test_import_job_with_wrong_file_type(self): # create collection name = gen_collection_name() dim = 128 @@ -36,7 +1209,53 @@ class TestJobE2E(TestBase): for i in range(10000)] # dump data to file - file_name = "bulk_insert_data.json" + file_name = "bulk_insert_data.txt" + file_path = f"/tmp/{file_name}" + + json_data = json.dumps(data) + + # 将JSON数据保存到txt文件 + with open(file_path, 'w') as file: + file.write(json_data) + # upload file to minio storage + self.storage_client.upload_file(file_path, file_name) + + # create import job + payload = { + "collectionName": 
name, + "files": [[file_name]], + } + rsp = self.import_job_client.create_import_jobs(payload) + assert rsp['code'] == 2100 and "unexpected file type" in rsp['message'] + + def test_import_job_with_empty_rows(self): + # create collection + name = gen_collection_name() + dim = 128 + payload = { + "collectionName": name, + "schema": { + "fields": [ + {"fieldName": "book_id", "dataType": "Int64", "isPrimary": True, "elementTypeParams": {}}, + {"fieldName": "word_count", "dataType": "Int64", "elementTypeParams": {}}, + {"fieldName": "book_describe", "dataType": "VarChar", "elementTypeParams": {"max_length": "256"}}, + {"fieldName": "book_intro", "dataType": "FloatVector", "elementTypeParams": {"dim": f"{dim}"}} + ] + }, + "indexParams": [{"fieldName": "book_intro", "indexName": "book_intro_vector", "metricType": "L2"}] + } + rsp = self.collection_client.collection_create(payload) + + # upload file to storage + data = [{ + "book_id": i, + "word_count": i, + "book_describe": f"book_{i}", + "book_intro": [random.random() for _ in range(dim)]} + for i in range(0)] + + # dump data to file + file_name = "bulk_insert_empty_data.json" file_path = f"/tmp/{file_name}" with open(file_path, "w") as f: json.dump(data, f) @@ -46,9 +1265,143 @@ class TestJobE2E(TestBase): # create import job payload = { "collectionName": name, - "files": [file_name], + "files": [[file_name]], } rsp = self.import_job_client.create_import_jobs(payload) + job_id = rsp['data']['jobId'] + # list import job + payload = { + "collectionName": name, + } + rsp = self.import_job_client.list_import_jobs(payload) + + # wait import job to be completed + res, result = self.import_job_client.wait_import_job_completed(job_id) + assert result + c = Collection(name) + assert c.num_entities == 0 + + def test_create_import_job_with_new_user(self): + # create collection + name = gen_collection_name() + dim = 128 + payload = { + "collectionName": name, + "schema": { + "fields": [ + {"fieldName": "book_id", "dataType": "Int64", "isPrimary": True, "elementTypeParams": {}}, + {"fieldName": "word_count", "dataType": "Int64", "elementTypeParams": {}}, + {"fieldName": "book_describe", "dataType": "VarChar", "elementTypeParams": {"max_length": "256"}}, + {"fieldName": "book_intro", "dataType": "FloatVector", "elementTypeParams": {"dim": f"{dim}"}} + ] + }, + "indexParams": [{"fieldName": "book_intro", "indexName": "book_intro_vector", "metricType": "L2"}] + } + rsp = self.collection_client.collection_create(payload) + # create new user + username = "test_user" + password = "12345678" + payload = { + "userName": username, + "password": password + } + self.user_client.user_create(payload) + # try to describe collection with new user + self.collection_client.api_key = f"{username}:{password}" + try: + rsp = self.collection_client.collection_describe(collection_name=name) + logger.info(f"describe collection: {rsp}") + except Exception as e: + logger.error(f"describe collection failed: {e}") + + # upload file to storage + data = [] + auto_id = True + enable_dynamic_field = True + for i in range(1): + tmp = { + "word_count": i, + "book_describe": f"book_{i}", + "book_intro": [random.random() for _ in range(dim)] + } + if not auto_id: + tmp["book_id"] = i + if enable_dynamic_field: + tmp.update({f"dynamic_field_{i}": i}) + data.append(tmp) + + # dump data to file + file_name = f"bulk_insert_data_{int(time.time())}.json" + file_path = f"/tmp/{file_name}" + with open(file_path, "w") as f: + json.dump(data, f) + # upload file to minio storage + 
self.storage_client.upload_file(file_path, file_name) + + # create import job + payload = { + "collectionName": name, + "files": [[file_name]], + } + self.import_job_client.api_key = f"{username}:{password}" + rsp = self.import_job_client.create_import_jobs(payload) + assert rsp['code'] == 1100 and "empty" in rsp['message'] + + + + @pytest.mark.parametrize("insert_num", [5000]) + @pytest.mark.parametrize("import_task_num", [2]) + @pytest.mark.parametrize("auto_id", [True, False]) + @pytest.mark.parametrize("is_partition_key", [True, False]) + @pytest.mark.parametrize("enable_dynamic_field", [True, False]) + def test_get_job_progress_with_mismatch_db_name(self, insert_num, import_task_num, auto_id, is_partition_key, enable_dynamic_field): + # create collection + name = gen_collection_name() + dim = 128 + payload = { + "collectionName": name, + "schema": { + "autoId": auto_id, + "enableDynamicField": enable_dynamic_field, + "fields": [ + {"fieldName": "book_id", "dataType": "Int64", "isPrimary": True, "elementTypeParams": {}}, + {"fieldName": "word_count", "dataType": "Int64", "isPartitionKey": is_partition_key, "elementTypeParams": {}}, + {"fieldName": "book_describe", "dataType": "VarChar", "elementTypeParams": {"max_length": "256"}}, + {"fieldName": "book_intro", "dataType": "FloatVector", "elementTypeParams": {"dim": f"{dim}"}} + ] + }, + "indexParams": [{"fieldName": "book_intro", "indexName": "book_intro_vector", "metricType": "L2"}] + } + rsp = self.collection_client.collection_create(payload) + + # upload file to storage + data = [] + for i in range(insert_num): + tmp = { + "word_count": i, + "book_describe": f"book_{i}", + "book_intro": [random.random() for _ in range(dim)] + } + if not auto_id: + tmp["book_id"] = i + if enable_dynamic_field: + tmp.update({f"dynamic_field_{i}": i}) + data.append(tmp) + # dump data to file + file_name = f"bulk_insert_data_{int(time.time())}.json" + file_path = f"/tmp/{file_name}" + with open(file_path, "w") as f: + json.dump(data, f) + # upload file to minio storage + self.storage_client.upload_file(file_path, file_name) + + # create import job + payload = { + "collectionName": name, + "files": [[file_name]], + } + for i in range(import_task_num): + rsp = self.import_job_client.create_import_jobs(payload) # list import job payload = { "collectionName": name, @@ -56,26 +1409,212 @@ class TestJobE2E(TestBase): rsp = self.import_job_client.list_import_jobs(payload) # get import job progress - for task in rsp['data']: - task_id = task['taskID'] + for task in rsp['data']["records"]: + task_id = task['jobId'] finished = False t0 = time.time() while not finished: rsp = self.import_job_client.get_import_job_progress(task_id) - if rsp['data']['state'] == "ImportCompleted": + if rsp['data']['state'] == "Completed": finished = True time.sleep(5) if time.time() - t0 > 120: assert False, "import job timeout" time.sleep(10) + c = Collection(name) + res = c.query( + expr="", + output_fields=["count(*)"], + ) + assert res[0]["count(*)"] == insert_num * import_task_num # query data payload = { "collectionName": name, - "filter": f"book_id in {[i for i in range(1000)]}", - "limit": 100, - "offset": 0, - "outputFields": ["*"] + "filter": "book_id > 0", + "outputFields": ["*"], } rsp = self.vector_client.vector_query(payload) - assert len(rsp['data']) == 100 + assert rsp["code"] == 200 + + +@pytest.mark.L1 +class TestListImportJob(TestBase): + + def test_list_job_e2e(self): + # create two db + self.create_database(db_name="db1") + self.create_database(db_name="db2") + + 
# create collection + insert_num = 5000 + import_task_num = 2 + auto_id = True + is_partition_key = True + enable_dynamic_field = True + name = gen_collection_name() + dim = 128 + payload = { + "collectionName": name, + "schema": { + "autoId": auto_id, + "enableDynamicField": enable_dynamic_field, + "fields": [ + {"fieldName": "book_id", "dataType": "Int64", "isPrimary": True, "elementTypeParams": {}}, + {"fieldName": "word_count", "dataType": "Int64", "isPartitionKey": is_partition_key, "elementTypeParams": {}}, + {"fieldName": "book_describe", "dataType": "VarChar", "elementTypeParams": {"max_length": "256"}}, + {"fieldName": "book_intro", "dataType": "FloatVector", "elementTypeParams": {"dim": f"{dim}"}} + ] + }, + "indexParams": [{"fieldName": "book_intro", "indexName": "book_intro_vector", "metricType": "L2"}] + } + for db_name in ["db1", "db2"]: + rsp = self.collection_client.collection_create(payload, db_name=db_name) + + # upload file to storage + data = [] + for i in range(insert_num): + tmp = { + "word_count": i, + "book_describe": f"book_{i}", + "book_intro": [random.random() for _ in range(dim)] + } + if not auto_id: + tmp["book_id"] = i + if enable_dynamic_field: + tmp.update({f"dynamic_field_{i}": i}) + data.append(tmp) + # dump data to file + file_name = f"bulk_insert_data_{int(time.time())}.json" + file_path = f"/tmp/{file_name}" + with open(file_path, "w") as f: + json.dump(data, f) + # upload file to minio storage + self.storage_client.upload_file(file_path, file_name) + + # create import job + for db in ["db1", "db2"]: + payload = { + "collectionName": name, + "files": [[file_name]], + } + for i in range(import_task_num): + rsp = self.import_job_client.create_import_jobs(payload, db_name=db) + # list import job + payload = { + } + for db_name in [None, "db1", "db2", "default"]: + try: + rsp = self.import_job_client.list_import_jobs(payload, db_name=db_name) + logger.info(f"job num: {len(rsp['data']['records'])}") + except Exception as e: + logger.error(f"list import job failed: {e}") + + +@pytest.mark.L1 +class TestGetImportJobProgress(TestBase): + + def test_list_job_e2e(self): + # create two db + self.create_database(db_name="db1") + self.create_database(db_name="db2") + + # create collection + insert_num = 5000 + import_task_num = 2 + auto_id = True + is_partition_key = True + enable_dynamic_field = True + name = gen_collection_name() + dim = 128 + payload = { + "collectionName": name, + "schema": { + "autoId": auto_id, + "enableDynamicField": enable_dynamic_field, + "fields": [ + {"fieldName": "book_id", "dataType": "Int64", "isPrimary": True, "elementTypeParams": {}}, + {"fieldName": "word_count", "dataType": "Int64", "isPartitionKey": is_partition_key, "elementTypeParams": {}}, + {"fieldName": "book_describe", "dataType": "VarChar", "elementTypeParams": {"max_length": "256"}}, + {"fieldName": "book_intro", "dataType": "FloatVector", "elementTypeParams": {"dim": f"{dim}"}} + ] + }, + "indexParams": [{"fieldName": "book_intro", "indexName": "book_intro_vector", "metricType": "L2"}] + } + for db_name in ["db1", "db2"]: + rsp = self.collection_client.collection_create(payload, db_name=db_name) + + # upload file to storage + data = [] + for i in range(insert_num): + tmp = { + "word_count": i, + "book_describe": f"book_{i}", + "book_intro": [random.random() for _ in range(dim)] + } + if not auto_id: + tmp["book_id"] = i + if enable_dynamic_field: + tmp.update({f"dynamic_field_{i}": i}) + data.append(tmp) + # dump data to file + file_name = 
f"bulk_insert_data_{int(time.time())}.json" + file_path = f"/tmp/{file_name}" + with open(file_path, "w") as f: + json.dump(data, f) + # upload file to minio storage + self.storage_client.upload_file(file_path, file_name) + job_id_list = [] + # create import job + for db in ["db1", "db2"]: + payload = { + "collectionName": name, + "files": [[file_name]], + } + for i in range(import_task_num): + rsp = self.import_job_client.create_import_jobs(payload, db_name=db) + job_id_list.append(rsp['data']['jobId']) + time.sleep(5) + # get import job progress + for job_id in job_id_list: + try: + rsp = self.import_job_client.get_import_job_progress(job_id) + logger.info(f"job progress: {rsp}") + except Exception as e: + logger.error(f"get import job progress failed: {e}") + + +@pytest.mark.L1 +class TestGetImportJobProgressNegative(TestBase): + + def test_list_job_with_invalid_job_id(self): + + # get import job progress with invalid job id + job_id_list = ["invalid_job_id", None] + for job_id in job_id_list: + try: + rsp = self.import_job_client.get_import_job_progress(job_id) + logger.info(f"job progress: {rsp}") + except Exception as e: + logger.error(f"get import job progress failed: {e}") + + def test_list_job_with_job_id(self): + + # get import job progress with invalid job id + job_id_list = ["invalid_job_id", None] + for job_id in job_id_list: + try: + rsp = self.import_job_client.get_import_job_progress(job_id) + logger.info(f"job progress: {rsp}") + except Exception as e: + logger.error(f"get import job progress failed: {e}") + + def test_list_job_with_new_user(self): + # create new user + user_name = "test_user" + password = "12345678" + self.user_client.user_create({ + "userName": user_name, + "password": password, + }) + diff --git a/tests/restful_client_v2/testcases/test_restful_sdk_mix_use_scenario.py b/tests/restful_client_v2/testcases/test_restful_sdk_mix_use_scenario.py index b2f09bcbfb..97a862248a 100644 --- a/tests/restful_client_v2/testcases/test_restful_sdk_mix_use_scenario.py +++ b/tests/restful_client_v2/testcases/test_restful_sdk_mix_use_scenario.py @@ -15,8 +15,8 @@ class TestRestfulSdkCompatibility(TestBase): @pytest.mark.parametrize("dim", [128, 256]) @pytest.mark.parametrize("enable_dynamic", [True, False]) - @pytest.mark.parametrize("shard_num", [1, 2]) - def test_collection_created_by_sdk_describe_by_restful(self, dim, enable_dynamic, shard_num): + @pytest.mark.parametrize("num_shards", [1, 2]) + def test_collection_created_by_sdk_describe_by_restful(self, dim, enable_dynamic, num_shards): """ """ # 1. create collection by sdk @@ -29,7 +29,7 @@ class TestRestfulSdkCompatibility(TestBase): ] default_schema = CollectionSchema(fields=default_fields, description="test collection", enable_dynamic_field=enable_dynamic) - collection = Collection(name=name, schema=default_schema, shards_num=shard_num) + collection = Collection(name=name, schema=default_schema, num_shards=num_shards) logger.info(collection.schema) # 2. 
use restful to get collection info client = self.collection_client @@ -41,7 +41,7 @@ class TestRestfulSdkCompatibility(TestBase): assert rsp['data']['collectionName'] == name assert rsp['data']['enableDynamicField'] == enable_dynamic assert rsp['data']['load'] == "LoadStateNotLoad" - assert rsp['data']['shardsNum'] == shard_num + assert rsp['data']['shardsNum'] == num_shards @pytest.mark.parametrize("metric_type", ["L2", "IP", "COSINE"]) @pytest.mark.parametrize("dim", [128]) @@ -131,6 +131,9 @@ class TestRestfulSdkCompatibility(TestBase): FieldSchema(name="int64", dtype=DataType.INT64, is_primary=True), FieldSchema(name="float", dtype=DataType.FLOAT), FieldSchema(name="varchar", dtype=DataType.VARCHAR, max_length=65535), + FieldSchema(name="json", dtype=DataType.JSON), + FieldSchema(name="int_array", dtype=DataType.ARRAY, element_type=DataType.INT64, max_capacity=1024), + FieldSchema(name="varchar_array", dtype=DataType.ARRAY, element_type=DataType.VARCHAR, max_capacity=1024, max_length=65535), FieldSchema(name="float_vector", dtype=DataType.FLOAT_VECTOR, dim=128) ] default_schema = CollectionSchema(fields=default_fields, description="test collection", @@ -142,7 +145,13 @@ class TestRestfulSdkCompatibility(TestBase): collection.load() # insert data by restful data = [ - {"int64": i, "float": i, "varchar": str(i), "float_vector": [random.random() for _ in range(dim)], "age": i} + {"int64": i, + "float": i, + "varchar": str(i), + "json": {f"key_{i}": f"value_{i}"}, + "int_array": [random.randint(0, 100) for _ in range(10)], + "varchar_array": [str(i) for _ in range(10)], + "float_vector": [random.random() for _ in range(dim)], "age": i} for i in range(nb) ] client = self.vector_client @@ -153,6 +162,7 @@ class TestRestfulSdkCompatibility(TestBase): rsp = client.vector_insert(payload) assert rsp['code'] == 200 assert rsp['data']['insertCount'] == nb + assert len(rsp['data']["insertIds"]) == nb def test_collection_create_by_sdk_search_vector_by_restful(self): """ @@ -181,7 +191,7 @@ class TestRestfulSdkCompatibility(TestBase): client = self.vector_client payload = { "collectionName": name, - "vector": [random.random() for _ in range(dim)], + "data": [[random.random() for _ in range(dim)]], "limit": 10 } # search data by restful @@ -306,7 +316,7 @@ class TestRestfulSdkCompatibility(TestBase): pk_id_list.append(item["int64"]) payload = { "collectionName": name, - "id": pk_id_list + "filter": f"int64 in {pk_id_list}" } # delete data by restful rsp = self.vector_client.vector_delete(payload) diff --git a/tests/restful_client_v2/testcases/test_role_operation.py b/tests/restful_client_v2/testcases/test_role_operation.py index 08a7e55a99..3ea6121c59 100644 --- a/tests/restful_client_v2/testcases/test_role_operation.py +++ b/tests/restful_client_v2/testcases/test_role_operation.py @@ -1,6 +1,6 @@ from utils.utils import gen_unique_str from base.testbase import TestBase - +import pytest class TestRoleE2E(TestBase): @@ -10,7 +10,7 @@ class TestRoleE2E(TestBase): all_roles = rsp['data'] # delete all roles except default roles for role in all_roles: - if role.startswith("role"): + if role.startswith("role") and role in self.role_client.role_names: payload = { "roleName": role } @@ -26,6 +26,7 @@ class TestRoleE2E(TestBase): self.role_client.role_revoke(payload) self.role_client.role_drop(payload) + @pytest.mark.L1 def test_role_e2e(self): # list role before create @@ -41,6 +42,7 @@ class TestRoleE2E(TestBase): assert role_name in rsp['data'] # describe role rsp = self.role_client.role_describe(role_name) 
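+        # note: only the response code is asserted here; the granted privileges
+        # are inspected via a second role_describe after the grant call below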
+ assert rsp['code'] == 200 # grant privilege to role payload = { "roleName": role_name, @@ -49,6 +51,7 @@ class TestRoleE2E(TestBase): "privilege": "CreateCollection" } rsp = self.role_client.role_grant(payload) + assert rsp['code'] == 200 # describe role after grant rsp = self.role_client.role_describe(role_name) privileges = [] diff --git a/tests/restful_client_v2/testcases/test_user_operation.py b/tests/restful_client_v2/testcases/test_user_operation.py index e6d421984e..570985295d 100644 --- a/tests/restful_client_v2/testcases/test_user_operation.py +++ b/tests/restful_client_v2/testcases/test_user_operation.py @@ -4,9 +4,32 @@ from base.testbase import TestBase from pymilvus import (connections) -@pytest.mark.L0 + class TestUserE2E(TestBase): + def teardown_method(self): + # because role num is limited, so we need to delete all roles after test + rsp = self.role_client.role_list() + all_roles = rsp['data'] + # delete all roles except default roles + for role in all_roles: + if role.startswith("role") and role in self.role_client.role_names: + payload = { + "roleName": role + } + # revoke privilege from role + rsp = self.role_client.role_describe(role) + for d in rsp['data']: + payload = { + "roleName": role, + "objectType": d['objectType'], + "objectName": d['objectName'], + "privilege": d['privilege'] + } + self.role_client.role_revoke(payload) + self.role_client.role_drop(payload) + + @pytest.mark.L0 def test_user_e2e(self): # list user before create @@ -43,6 +66,7 @@ class TestUserE2E(TestBase): rsp = self.user_client.user_list() assert user_name not in rsp['data'] + @pytest.mark.L1 def test_user_binding_role(self): # create user user_name = gen_unique_str("user") @@ -100,7 +124,7 @@ class TestUserE2E(TestBase): assert rsp['code'] == 200 -@pytest.mark.L0 +@pytest.mark.L1 class TestUserNegative(TestBase): def test_create_user_with_short_password(self): diff --git a/tests/restful_client_v2/testcases/test_vector_operations.py b/tests/restful_client_v2/testcases/test_vector_operations.py index ec2e6fcf06..69908d36fc 100644 --- a/tests/restful_client_v2/testcases/test_vector_operations.py +++ b/tests/restful_client_v2/testcases/test_vector_operations.py @@ -9,7 +9,7 @@ from utils.utils import gen_collection_name from utils.util_log import test_log as logger import pytest from base.testbase import TestBase -from utils.utils import (get_data_by_payload, get_common_fields_by_data, gen_vector) +from utils.utils import (gen_unique_str, get_data_by_payload, get_common_fields_by_data, gen_vector) from pymilvus import ( Collection, utility ) @@ -57,7 +57,7 @@ class TestInsertVector(TestBase): @pytest.mark.parametrize("nb", [3000]) @pytest.mark.parametrize("dim", [128]) def test_insert_entities_with_all_scalar_datatype(self, nb, dim, insert_round, auto_id, - is_partition_key, enable_dynamic_schema): + is_partition_key, enable_dynamic_schema): """ Insert a vector with a simple payload """ @@ -153,7 +153,7 @@ class TestInsertVector(TestBase): @pytest.mark.parametrize("nb", [3000]) @pytest.mark.parametrize("dim", [128]) def test_insert_entities_with_all_vector_datatype(self, nb, dim, insert_round, auto_id, - is_partition_key, enable_dynamic_schema): + is_partition_key, enable_dynamic_schema): """ Insert a vector with a simple payload """ @@ -368,14 +368,15 @@ class TestUpsertVector(TestBase): @pytest.mark.parametrize("insert_round", [2]) @pytest.mark.parametrize("nb", [3000]) @pytest.mark.parametrize("dim", [128]) - def test_upsert_vector(self, nb, dim, insert_round): + 
@pytest.mark.parametrize("id_type", ["Int64", "VarChar"]) + def test_upsert_vector_default(self, nb, dim, insert_round, id_type): # create a collection name = gen_collection_name() payload = { "collectionName": name, "schema": { "fields": [ - {"fieldName": "book_id", "dataType": "Int64", "isPrimary": True, "elementTypeParams": {}}, + {"fieldName": "book_id", "dataType": f"{id_type}", "isPrimary": True, "elementTypeParams": {"max_length": "256"}}, {"fieldName": "user_id", "dataType": "Int64", "isPartitionKey": True, "elementTypeParams": {}}, {"fieldName": "word_count", "dataType": "Int64", "elementTypeParams": {}}, {"fieldName": "book_describe", "dataType": "VarChar", "elementTypeParams": {"max_length": "256"}}, @@ -394,7 +395,7 @@ class TestUpsertVector(TestBase): data = [] for j in range(nb): tmp = { - "book_id": i * nb + j, + "book_id": i * nb + j if id_type == "Int64" else f"{i * nb + j}", "user_id": i * nb + j, "word_count": i * nb + j, "book_describe": f"book_{i * nb + j}", @@ -418,7 +419,7 @@ class TestUpsertVector(TestBase): data = [] for j in range(nb): tmp = { - "book_id": i * nb + j, + "book_id": i * nb + j if id_type == "Int64" else f"{i * nb + j}", "user_id": i * nb + j + 1, "word_count": i * nb + j + 2, "book_describe": f"book_{i * nb + j + 3}", @@ -433,11 +434,98 @@ class TestUpsertVector(TestBase): logger.info(f"body size: {body_size / 1024 / 1024} MB") rsp = self.vector_client.vector_upsert(payload) # query data to make sure the data is updated - rsp = self.vector_client.vector_query({"collectionName": name, "filter": "book_id > 0"}) + if id_type == "Int64": + rsp = self.vector_client.vector_query({"collectionName": name, "filter": "book_id > 0"}) + if id_type == "VarChar": + rsp = self.vector_client.vector_query({"collectionName": name, "filter": "book_id > '0'"}) for data in rsp['data']: - assert data['user_id'] == data['book_id'] + 1 - assert data['word_count'] == data['book_id'] + 2 - assert data['book_describe'] == f"book_{data['book_id'] + 3}" + assert data['user_id'] == int(data['book_id']) + 1 + assert data['word_count'] == int(data['book_id']) + 2 + assert data['book_describe'] == f"book_{int(data['book_id']) + 3}" + res = utility.get_query_segment_info(name) + logger.info(f"res: {res}") + + @pytest.mark.parametrize("insert_round", [2]) + @pytest.mark.parametrize("nb", [3000]) + @pytest.mark.parametrize("dim", [128]) + @pytest.mark.parametrize("id_type", ["Int64", "VarChar"]) + @pytest.mark.xfail(reason="currently not support auto_id for upsert") + def test_upsert_vector_pk_auto_id(self, nb, dim, insert_round, id_type): + # create a collection + name = gen_collection_name() + payload = { + "collectionName": name, + "schema": { + "autoId": True, + "fields": [ + {"fieldName": "book_id", "dataType": f"{id_type}", "isPrimary": True, "elementTypeParams": {"max_length": "256"}}, + {"fieldName": "user_id", "dataType": "Int64", "isPartitionKey": True, "elementTypeParams": {}}, + {"fieldName": "word_count", "dataType": "Int64", "elementTypeParams": {}}, + {"fieldName": "book_describe", "dataType": "VarChar", "elementTypeParams": {"max_length": "256"}}, + {"fieldName": "text_emb", "dataType": "FloatVector", "elementTypeParams": {"dim": f"{dim}"}} + ] + }, + "indexParams": [{"fieldName": "text_emb", "indexName": "text_emb_index", "metricType": "L2"}] + } + rsp = self.collection_client.collection_create(payload) + assert rsp['code'] == 200 + rsp = self.collection_client.collection_describe(name) + logger.info(f"rsp: {rsp}") + assert rsp['code'] == 200 + ids = [] + # insert 
data + for i in range(insert_round): + data = [] + for j in range(nb): + tmp = { + "book_id": i * nb + j if id_type == "Int64" else f"{i * nb + j}", + "user_id": i * nb + j, + "word_count": i * nb + j, + "book_describe": f"book_{i * nb + j}", + "text_emb": preprocessing.normalize([np.array([random.random() for i in range(dim)])])[0].tolist() + } + data.append(tmp) + payload = { + "collectionName": name, + "data": data, + } + body_size = sys.getsizeof(json.dumps(payload)) + logger.info(f"body size: {body_size / 1024 / 1024} MB") + rsp = self.vector_client.vector_insert(payload) + assert rsp['code'] == 200 + assert rsp['data']['insertCount'] == nb + ids.extend(rsp['data']['insertIds']) + c = Collection(name) + c.flush() + + # upsert data + for i in range(insert_round): + data = [] + for j in range(nb): + tmp = { + "book_id": ids[i * nb + j], + "user_id": i * nb + j + 1, + "word_count": i * nb + j + 2, + "book_describe": f"book_{i * nb + j + 3}", + "text_emb": preprocessing.normalize([np.array([random.random() for i in range(dim)])])[0].tolist() + } + data.append(tmp) + payload = { + "collectionName": name, + "data": data, + } + body_size = sys.getsizeof(json.dumps(payload)) + logger.info(f"body size: {body_size / 1024 / 1024} MB") + rsp = self.vector_client.vector_upsert(payload) + # query data to make sure the data is updated + if id_type == "Int64": + rsp = self.vector_client.vector_query({"collectionName": name, "filter": "book_id > 0"}) + if id_type == "VarChar": + rsp = self.vector_client.vector_query({"collectionName": name, "filter": "book_id > '0'"}) + for data in rsp['data']: + assert data['user_id'] == int(data['book_id']) + 1 + assert data['word_count'] == int(data['book_id']) + 2 + assert data['book_describe'] == f"book_{int(data['book_id']) + 3}" res = utility.get_query_segment_info(name) logger.info(f"res: {res}") @@ -445,7 +533,287 @@ class TestUpsertVector(TestBase): @pytest.mark.L0 class TestSearchVector(TestBase): - @pytest.mark.parametrize("metric_type", ["IP", "L2"]) + + @pytest.mark.parametrize("insert_round", [1]) + @pytest.mark.parametrize("auto_id", [True]) + @pytest.mark.parametrize("is_partition_key", [True]) + @pytest.mark.parametrize("enable_dynamic_schema", [True]) + @pytest.mark.parametrize("nb", [3000]) + @pytest.mark.parametrize("dim", [16]) + def test_search_vector_with_all_vector_datatype(self, nb, dim, insert_round, auto_id, + is_partition_key, enable_dynamic_schema): + """ + Insert a vector with a simple payload + """ + # create a collection + name = gen_collection_name() + payload = { + "collectionName": name, + "schema": { + "autoId": auto_id, + "enableDynamicField": enable_dynamic_schema, + "fields": [ + {"fieldName": "book_id", "dataType": "Int64", "isPrimary": True, "elementTypeParams": {}}, + {"fieldName": "user_id", "dataType": "Int64", "isPartitionKey": is_partition_key, + "elementTypeParams": {}}, + {"fieldName": "word_count", "dataType": "Int64", "elementTypeParams": {}}, + {"fieldName": "book_describe", "dataType": "VarChar", "elementTypeParams": {"max_length": "256"}}, + {"fieldName": "float_vector", "dataType": "FloatVector", "elementTypeParams": {"dim": f"{dim}"}}, + {"fieldName": "float16_vector", "dataType": "Float16Vector", + "elementTypeParams": {"dim": f"{dim}"}}, + {"fieldName": "bfloat16_vector", "dataType": "BFloat16Vector", + "elementTypeParams": {"dim": f"{dim}"}}, + {"fieldName": "binary_vector", "dataType": "BinaryVector", "elementTypeParams": {"dim": f"{dim}"}}, + ] + }, + "indexParams": [ + {"fieldName": "float_vector", 
"indexName": "float_vector", "metricType": "COSINE"}, + {"fieldName": "float16_vector", "indexName": "float16_vector", "metricType": "COSINE"}, + {"fieldName": "bfloat16_vector", "indexName": "bfloat16_vector", "metricType": "COSINE"}, + {"fieldName": "binary_vector", "indexName": "binary_vector", "metricType": "HAMMING", + "indexConfig": {"index_type": "BIN_IVF_FLAT", "nlist": "512"}} + ] + } + rsp = self.collection_client.collection_create(payload) + assert rsp['code'] == 200 + rsp = self.collection_client.collection_describe(name) + logger.info(f"rsp: {rsp}") + assert rsp['code'] == 200 + # insert data + for i in range(insert_round): + data = [] + for i in range(nb): + if auto_id: + tmp = { + "user_id": i%100, + "word_count": i, + "book_describe": f"book_{i}", + "float_vector": gen_vector(datatype="FloatVector", dim=dim), + "float16_vector": gen_vector(datatype="Float16Vector", dim=dim), + "bfloat16_vector": gen_vector(datatype="BFloat16Vector", dim=dim), + "binary_vector": gen_vector(datatype="BinaryVector", dim=dim) + } + else: + tmp = { + "book_id": i, + "user_id": i%100, + "word_count": i, + "book_describe": f"book_{i}", + "float_vector": gen_vector(datatype="FloatVector", dim=dim), + "float16_vector": gen_vector(datatype="Float16Vector", dim=dim), + "bfloat16_vector": gen_vector(datatype="BFloat16Vector", dim=dim), + "binary_vector": gen_vector(datatype="BinaryVector", dim=dim) + } + if enable_dynamic_schema: + tmp.update({f"dynamic_field_{i}": i}) + data.append(tmp) + payload = { + "collectionName": name, + "data": data, + } + rsp = self.vector_client.vector_insert(payload) + assert rsp['code'] == 200 + assert rsp['data']['insertCount'] == nb + # search data + payload = { + "collectionName": name, + "data": [gen_vector(datatype="FloatVector", dim=dim)], + "annsField": "float_vector", + "filter": "word_count > 100", + "groupingField": "user_id", + "outputFields": ["*"], + "searchParams": { + "metricType": "COSINE", + "params": { + "radius": "0.1", + "range_filter": "0.8" + } + }, + "limit": 100, + } + rsp = self.vector_client.vector_search(payload) + assert rsp['code'] == 200 + + @pytest.mark.parametrize("insert_round", [1]) + @pytest.mark.parametrize("auto_id", [True]) + @pytest.mark.parametrize("is_partition_key", [True]) + @pytest.mark.parametrize("enable_dynamic_schema", [True]) + @pytest.mark.parametrize("nb", [3000]) + @pytest.mark.parametrize("dim", [128]) + def test_search_vector_with_float_vector_datatype(self, nb, dim, insert_round, auto_id, + is_partition_key, enable_dynamic_schema): + """ + Insert a vector with a simple payload + """ + # create a collection + name = gen_collection_name() + payload = { + "collectionName": name, + "schema": { + "autoId": auto_id, + "enableDynamicField": enable_dynamic_schema, + "fields": [ + {"fieldName": "book_id", "dataType": "Int64", "isPrimary": True, "elementTypeParams": {}}, + {"fieldName": "user_id", "dataType": "Int64", "isPartitionKey": is_partition_key, + "elementTypeParams": {}}, + {"fieldName": "word_count", "dataType": "Int64", "elementTypeParams": {}}, + {"fieldName": "book_describe", "dataType": "VarChar", "elementTypeParams": {"max_length": "256"}}, + {"fieldName": "float_vector", "dataType": "FloatVector", "elementTypeParams": {"dim": f"{dim}"}}, + ] + }, + "indexParams": [ + {"fieldName": "float_vector", "indexName": "float_vector", "metricType": "COSINE"}, + ] + } + rsp = self.collection_client.collection_create(payload) + assert rsp['code'] == 200 + rsp = self.collection_client.collection_describe(name) + 
logger.info(f"rsp: {rsp}") + assert rsp['code'] == 200 + # insert data + for i in range(insert_round): + data = [] + for i in range(nb): + if auto_id: + tmp = { + "user_id": i%100, + "word_count": i, + "book_describe": f"book_{i}", + "float_vector": gen_vector(datatype="FloatVector", dim=dim), + } + else: + tmp = { + "book_id": i, + "user_id": i%100, + "word_count": i, + "book_describe": f"book_{i}", + "float_vector": gen_vector(datatype="FloatVector", dim=dim), + } + if enable_dynamic_schema: + tmp.update({f"dynamic_field_{i}": i}) + data.append(tmp) + payload = { + "collectionName": name, + "data": data, + } + rsp = self.vector_client.vector_insert(payload) + assert rsp['code'] == 200 + assert rsp['data']['insertCount'] == nb + # search data + payload = { + "collectionName": name, + "data": [gen_vector(datatype="FloatVector", dim=dim)], + "filter": "word_count > 100", + "groupingField": "user_id", + "outputFields": ["*"], + "searchParams": { + "metricType": "COSINE", + "params": { + "radius": "0.1", + "range_filter": "0.8" + } + }, + "limit": 100, + } + rsp = self.vector_client.vector_search(payload) + assert rsp['code'] == 200 + assert len(rsp['data']) == 100 + + + + @pytest.mark.parametrize("insert_round", [2]) + @pytest.mark.parametrize("auto_id", [True]) + @pytest.mark.parametrize("is_partition_key", [True]) + @pytest.mark.parametrize("enable_dynamic_schema", [True]) + @pytest.mark.parametrize("nb", [3000]) + @pytest.mark.parametrize("dim", [128]) + def test_search_vector_with_binary_vector_datatype(self, nb, dim, insert_round, auto_id, + is_partition_key, enable_dynamic_schema): + """ + Insert a vector with a simple payload + """ + # create a collection + name = gen_collection_name() + payload = { + "collectionName": name, + "schema": { + "autoId": auto_id, + "enableDynamicField": enable_dynamic_schema, + "fields": [ + {"fieldName": "book_id", "dataType": "Int64", "isPrimary": True, "elementTypeParams": {}}, + {"fieldName": "user_id", "dataType": "Int64", "isPartitionKey": is_partition_key, + "elementTypeParams": {}}, + {"fieldName": "word_count", "dataType": "Int64", "elementTypeParams": {}}, + {"fieldName": "book_describe", "dataType": "VarChar", "elementTypeParams": {"max_length": "256"}}, + {"fieldName": "binary_vector", "dataType": "BinaryVector", "elementTypeParams": {"dim": f"{dim}"}} + ] + }, + "indexParams": [ + {"fieldName": "binary_vector", "indexName": "binary_vector", "metricType": "HAMMING", + "indexConfig": {"index_type": "BIN_IVF_FLAT", "nlist": "512"}} + ] + } + rsp = self.collection_client.collection_create(payload) + assert rsp['code'] == 200 + rsp = self.collection_client.collection_describe(name) + logger.info(f"rsp: {rsp}") + assert rsp['code'] == 200 + # insert data + for i in range(insert_round): + data = [] + for i in range(nb): + if auto_id: + tmp = { + "user_id": i%100, + "word_count": i, + "book_describe": f"book_{i}", + "binary_vector": gen_vector(datatype="BinaryVector", dim=dim), + } + else: + tmp = { + "book_id": i, + "user_id": i%100, + "word_count": i, + "book_describe": f"book_{i}", + "binary_vector": gen_vector(datatype="BinaryVector", dim=dim), + } + if enable_dynamic_schema: + tmp.update({f"dynamic_field_{i}": i}) + data.append(tmp) + payload = { + "collectionName": name, + "data": data, + } + rsp = self.vector_client.vector_insert(payload) + assert rsp['code'] == 200 + assert rsp['data']['insertCount'] == nb + # flush data + c = Collection(name) + c.flush() + time.sleep(5) + # wait for index + rsp = 
self.index_client.index_describe(collection_name=name, index_name="binary_vector") + + # search data + payload = { + "collectionName": name, + "data": [gen_vector(datatype="BinaryVector", dim=dim)], + "filter": "word_count > 100", + "outputFields": ["*"], + "searchParams": { + "metricType": "HAMMING", + "params": { + "radius": "0.1", + "range_filter": "0.8" + } + }, + "limit": 100, + } + rsp = self.vector_client.vector_search(payload) + assert rsp['code'] == 200 + assert len(rsp['data']) == 100 + + @pytest.mark.parametrize("metric_type", ["IP", "L2", "COSINE"]) def test_search_vector_with_simple_payload(self, metric_type): """ Search a vector with a simple payload @@ -459,7 +827,7 @@ class TestSearchVector(TestBase): vector_to_search = preprocessing.normalize([np.array([random.random() for i in range(dim)])])[0].tolist() payload = { "collectionName": name, - "vector": vector_to_search, + "data": [vector_to_search], } rsp = self.vector_client.vector_search(payload) assert rsp['code'] == 200 @@ -472,7 +840,7 @@ class TestSearchVector(TestBase): distance = [item['distance'] for item in res] if metric_type == "L2": assert distance == sorted(distance) - if metric_type == "IP": + if metric_type == "IP" or metric_type == "COSINE": assert distance == sorted(distance, reverse=True) @pytest.mark.parametrize("sum_limit_offset", [16384, 16385]) @@ -515,17 +883,16 @@ class TestSearchVector(TestBase): if metric_type == "IP": assert distance == sorted(distance, reverse=True) - @pytest.mark.parametrize("level", [0, 1, 2]) - @pytest.mark.parametrize("offset", [0, 10, 100]) - @pytest.mark.parametrize("limit", [1, 100]) - @pytest.mark.parametrize("metric_type", ["L2", "IP"]) - def test_search_vector_with_complex_payload(self, limit, offset, level, metric_type): + @pytest.mark.parametrize("offset", [0, 100]) + @pytest.mark.parametrize("limit", [100]) + @pytest.mark.parametrize("metric_type", ["L2", "IP", "COSINE"]) + def test_search_vector_with_complex_payload(self, limit, offset, metric_type): """ Search a vector with a simple payload """ name = gen_collection_name() self.name = name - nb = limit + offset + 100 + nb = limit + offset + 3000 dim = 128 schema_payload, data = self.init_collection(name, dim=dim, nb=nb, metric_type=metric_type) vector_field = schema_payload.get("vectorField") @@ -534,7 +901,7 @@ class TestSearchVector(TestBase): output_fields = get_common_fields_by_data(data, exclude_fields=[vector_field]) payload = { "collectionName": name, - "vector": vector_to_search, + "data": [vector_to_search], "outputFields": output_fields, "filter": "uid >= 0", "limit": limit, @@ -570,7 +937,7 @@ class TestSearchVector(TestBase): output_fields = get_common_fields_by_data(data, exclude_fields=[vector_field]) payload = { "collectionName": name, - "vector": vector_to_search, + "data": [vector_to_search], "outputFields": output_fields, "filter": filter_expr, "limit": limit, @@ -611,7 +978,7 @@ class TestSearchVector(TestBase): logger.info(f"filter_expr: {filter_expr}") payload = { "collectionName": name, - "vector": vector_to_search, + "data": [vector_to_search], "outputFields": output_fields, "filter": filter_expr, "limit": limit, @@ -658,7 +1025,7 @@ class TestSearchVector(TestBase): logger.info(f"filter_expr: {filter_expr}") payload = { "collectionName": name, - "vector": vector_to_search, + "data": [vector_to_search], "outputFields": output_fields, "filter": filter_expr, "limit": limit, @@ -684,6 +1051,26 @@ class TestSearchVector(TestBase): @pytest.mark.L1 class TestSearchVectorNegative(TestBase): + + 
@pytest.mark.parametrize("metric_type", ["L2"]) + def test_search_vector_without_required_vector_param(self, metric_type): + """ + Search a vector with a simple payload + """ + name = gen_collection_name() + self.name = name + self.init_collection(name, metric_type=metric_type) + + # search data + dim = 128 + vector_to_search = preprocessing.normalize([np.array([random.random() for i in range(dim)])])[0].tolist() + payload = { + "collectionName": name, + } + rsp = self.vector_client.vector_search(payload) + assert rsp['code'] == 200 + + @pytest.mark.parametrize("limit", [0, 16385]) def test_search_vector_with_invalid_limit(self, limit): """ @@ -734,9 +1121,452 @@ class TestSearchVectorNegative(TestBase): assert rsp['code'] == 1 +@pytest.mark.L0 +class TestAdvancedSearchVector(TestBase): + + @pytest.mark.parametrize("insert_round", [1]) + @pytest.mark.parametrize("auto_id", [True]) + @pytest.mark.parametrize("is_partition_key", [True]) + @pytest.mark.parametrize("enable_dynamic_schema", [True]) + @pytest.mark.parametrize("nb", [3000]) + @pytest.mark.parametrize("dim", [2]) + def test_advanced_search_vector_with_multi_float32_vector_datatype(self, nb, dim, insert_round, auto_id, + is_partition_key, enable_dynamic_schema): + """ + Insert a vector with a simple payload + """ + # create a collection + name = gen_collection_name() + payload = { + "collectionName": name, + "schema": { + "autoId": auto_id, + "enableDynamicField": enable_dynamic_schema, + "fields": [ + {"fieldName": "book_id", "dataType": "Int64", "isPrimary": True, "elementTypeParams": {}}, + {"fieldName": "user_id", "dataType": "Int64", "isPartitionKey": is_partition_key, + "elementTypeParams": {}}, + {"fieldName": "word_count", "dataType": "Int64", "elementTypeParams": {}}, + {"fieldName": "book_describe", "dataType": "VarChar", "elementTypeParams": {"max_length": "256"}}, + {"fieldName": "float_vector_1", "dataType": "FloatVector", "elementTypeParams": {"dim": f"{dim}"}}, + {"fieldName": "float_vector_2", "dataType": "FloatVector", "elementTypeParams": {"dim": f"{dim}"}}, + ] + }, + "indexParams": [ + {"fieldName": "float_vector_1", "indexName": "float_vector_1", "metricType": "COSINE"}, + {"fieldName": "float_vector_2", "indexName": "float_vector_2", "metricType": "COSINE"}, + + ] + } + rsp = self.collection_client.collection_create(payload) + assert rsp['code'] == 200 + rsp = self.collection_client.collection_describe(name) + logger.info(f"rsp: {rsp}") + assert rsp['code'] == 200 + # insert data + for i in range(insert_round): + data = [] + for i in range(nb): + if auto_id: + tmp = { + "user_id": i%100, + "word_count": i, + "book_describe": f"book_{i}", + "float_vector_1": gen_vector(datatype="FloatVector", dim=dim), + "float_vector_2": gen_vector(datatype="FloatVector", dim=dim), + } + else: + tmp = { + "book_id": i, + "user_id": i%100, + "word_count": i, + "book_describe": f"book_{i}", + "float_vector_1": gen_vector(datatype="FloatVector", dim=dim), + "float_vector_2": gen_vector(datatype="FloatVector", dim=dim), + + } + if enable_dynamic_schema: + tmp.update({f"dynamic_field_{i}": i}) + data.append(tmp) + payload = { + "collectionName": name, + "data": data, + } + rsp = self.vector_client.vector_insert(payload) + assert rsp['code'] == 200 + assert rsp['data']['insertCount'] == nb + # advanced search data + + payload = { + "collectionName": name, + "search": [{ + "data": [gen_vector(datatype="FloatVector", dim=dim)], + "annsField": "float_vector_1", + "limit": 10, + "outputFields": ["*"] + }, + { + "data": 
[gen_vector(datatype="FloatVector", dim=dim)], + "annsField": "float_vector_2", + "limit": 10, + "outputFields": ["*"] + } + + ], + "rerank": { + "strategy": "rrf", + "params": { + "k": 10, + } + }, + "limit": 10, + "outputFields": ["user_id", "word_count", "book_describe"] + } + + rsp = self.vector_client.vector_advanced_search(payload) + assert rsp['code'] == 200 + assert len(rsp['data']) == 10 + + + +@pytest.mark.L0 +class TestHybridSearchVector(TestBase): + + @pytest.mark.parametrize("insert_round", [1]) + @pytest.mark.parametrize("auto_id", [True]) + @pytest.mark.parametrize("is_partition_key", [True]) + @pytest.mark.parametrize("enable_dynamic_schema", [True]) + @pytest.mark.parametrize("nb", [3000]) + @pytest.mark.parametrize("dim", [2]) + def test_hybrid_search_vector_with_multi_float32_vector_datatype(self, nb, dim, insert_round, auto_id, + is_partition_key, enable_dynamic_schema): + """ + Insert a vector with a simple payload + """ + # create a collection + name = gen_collection_name() + payload = { + "collectionName": name, + "schema": { + "autoId": auto_id, + "enableDynamicField": enable_dynamic_schema, + "fields": [ + {"fieldName": "book_id", "dataType": "Int64", "isPrimary": True, "elementTypeParams": {}}, + {"fieldName": "user_id", "dataType": "Int64", "isPartitionKey": is_partition_key, + "elementTypeParams": {}}, + {"fieldName": "word_count", "dataType": "Int64", "elementTypeParams": {}}, + {"fieldName": "book_describe", "dataType": "VarChar", "elementTypeParams": {"max_length": "256"}}, + {"fieldName": "float_vector_1", "dataType": "FloatVector", "elementTypeParams": {"dim": f"{dim}"}}, + {"fieldName": "float_vector_2", "dataType": "FloatVector", "elementTypeParams": {"dim": f"{dim}"}}, + ] + }, + "indexParams": [ + {"fieldName": "float_vector_1", "indexName": "float_vector_1", "metricType": "COSINE"}, + {"fieldName": "float_vector_2", "indexName": "float_vector_2", "metricType": "COSINE"}, + + ] + } + rsp = self.collection_client.collection_create(payload) + assert rsp['code'] == 200 + rsp = self.collection_client.collection_describe(name) + logger.info(f"rsp: {rsp}") + assert rsp['code'] == 200 + # insert data + for i in range(insert_round): + data = [] + for i in range(nb): + if auto_id: + tmp = { + "user_id": i%100, + "word_count": i, + "book_describe": f"book_{i}", + "float_vector_1": gen_vector(datatype="FloatVector", dim=dim), + "float_vector_2": gen_vector(datatype="FloatVector", dim=dim), + } + else: + tmp = { + "book_id": i, + "user_id": i%100, + "word_count": i, + "book_describe": f"book_{i}", + "float_vector_1": gen_vector(datatype="FloatVector", dim=dim), + "float_vector_2": gen_vector(datatype="FloatVector", dim=dim), + + } + if enable_dynamic_schema: + tmp.update({f"dynamic_field_{i}": i}) + data.append(tmp) + payload = { + "collectionName": name, + "data": data, + } + rsp = self.vector_client.vector_insert(payload) + assert rsp['code'] == 200 + assert rsp['data']['insertCount'] == nb + # advanced search data + + payload = { + "collectionName": name, + "search": [{ + "data": [gen_vector(datatype="FloatVector", dim=dim)], + "annsField": "float_vector_1", + "limit": 10, + "outputFields": ["*"] + }, + { + "data": [gen_vector(datatype="FloatVector", dim=dim)], + "annsField": "float_vector_2", + "limit": 10, + "outputFields": ["*"] + } + + ], + "rerank": { + "strategy": "rrf", + "params": { + "k": 10, + } + }, + "limit": 10, + "outputFields": ["user_id", "word_count", "book_describe"] + } + + rsp = self.vector_client.vector_hybrid_search(payload) + assert 
rsp['code'] == 200 + assert len(rsp['data']) == 10 + + + + @pytest.mark.L0 class TestQueryVector(TestBase): + @pytest.mark.parametrize("insert_round", [1]) + @pytest.mark.parametrize("auto_id", [True]) + @pytest.mark.parametrize("is_partition_key", [True]) + @pytest.mark.parametrize("enable_dynamic_schema", [True]) + @pytest.mark.parametrize("nb", [3000]) + @pytest.mark.parametrize("dim", [128]) + def test_query_entities_with_all_scalar_datatype(self, nb, dim, insert_round, auto_id, + is_partition_key, enable_dynamic_schema): + """ + Insert a vector with a simple payload + """ + # create a collection + name = gen_collection_name() + payload = { + "collectionName": name, + "schema": { + "autoId": auto_id, + "enableDynamicField": enable_dynamic_schema, + "fields": [ + {"fieldName": "book_id", "dataType": "Int64", "isPrimary": True, "elementTypeParams": {}}, + {"fieldName": "user_id", "dataType": "Int64", "isPartitionKey": is_partition_key, + "elementTypeParams": {}}, + {"fieldName": "word_count", "dataType": "Int64", "elementTypeParams": {}}, + {"fieldName": "book_describe", "dataType": "VarChar", "elementTypeParams": {"max_length": "25536"}}, + {"fieldName": "bool", "dataType": "Bool", "elementTypeParams": {}}, + {"fieldName": "json", "dataType": "JSON", "elementTypeParams": {}}, + {"fieldName": "int_array", "dataType": "Array", "elementDataType": "Int64", + "elementTypeParams": {"max_capacity": "1024"}}, + {"fieldName": "varchar_array", "dataType": "Array", "elementDataType": "VarChar", + "elementTypeParams": {"max_capacity": "1024", "max_length": "256"}}, + {"fieldName": "bool_array", "dataType": "Array", "elementDataType": "Bool", + "elementTypeParams": {"max_capacity": "1024"}}, + {"fieldName": "text_emb", "dataType": "FloatVector", "elementTypeParams": {"dim": f"{dim}"}}, + {"fieldName": "image_emb", "dataType": "FloatVector", "elementTypeParams": {"dim": f"{dim}"}}, + ] + }, + "indexParams": [ + {"fieldName": "text_emb", "indexName": "text_emb", "metricType": "L2"}, + {"fieldName": "image_emb", "indexName": "image_emb", "metricType": "L2"} + ] + } + rsp = self.collection_client.collection_create(payload) + assert rsp['code'] == 200 + rsp = self.collection_client.collection_describe(name) + logger.info(f"rsp: {rsp}") + assert rsp['code'] == 200 + # insert data + for i in range(insert_round): + data = [] + for i in range(nb): + if auto_id: + tmp = { + "user_id": i, + "word_count": i, + "book_describe": f"book_{gen_unique_str(length=1000)}", + "bool": random.choice([True, False]), + "json": {"key": [i]}, + "int_array": [i], + "varchar_array": [f"varchar_{i}"], + "bool_array": [random.choice([True, False])], + "text_emb": preprocessing.normalize([np.array([random.random() for _ in range(dim)])])[ + 0].tolist(), + "image_emb": preprocessing.normalize([np.array([random.random() for _ in range(dim)])])[ + 0].tolist(), + } + else: + tmp = { + "book_id": i, + "user_id": i, + "word_count": i, + "book_describe": gen_unique_str(length=1000), + "bool": random.choice([True, False]), + "json": {"key": i}, + "int_array": [i], + "varchar_array": [f"varchar_{i}"], + "bool_array": [random.choice([True, False])], + "text_emb": preprocessing.normalize([np.array([random.random() for _ in range(dim)])])[ + 0].tolist(), + "image_emb": preprocessing.normalize([np.array([random.random() for _ in range(dim)])])[ + 0].tolist(), + } + if enable_dynamic_schema: + tmp.update({f"dynamic_field_{i}": i}) + data.append(tmp) + payload = { + "collectionName": name, + "data": data, + } + rsp = 
self.vector_client.vector_insert(payload) + assert rsp['code'] == 200 + assert rsp['data']['insertCount'] == nb + # query data to make sure the data is inserted + # 1. query for int64 + payload = { + "collectionName": name, + "filter": "user_id > 0", + "limit": 50, + "outputFields": ["*"] + } + rsp = self.vector_client.vector_query(payload) + assert rsp['code'] == 200 + assert len(rsp['data']) == 50 + + # 2. query for varchar + payload = { + "collectionName": name, + "filter": "book_describe like \"book%\"", + "limit": 50, + "outputFields": ["*"] + } + rsp = self.vector_client.vector_query(payload) + assert rsp['code'] == 200 + assert len(rsp['data']) == 50 + + # 3. query for json + payload = { + "collectionName": name, + "filter": "json_contains(json['key'] , 1)", + "limit": 50, + "outputFields": ["*"] + } + rsp = self.vector_client.vector_query(payload) + assert len(rsp['data']) == 1 + + # 4. query for array + payload = { + "collectionName": name, + "filter": "array_contains(int_array, 1)", + "limit": 50, + "outputFields": ["*"] + } + rsp = self.vector_client.vector_query(payload) + assert len(rsp['data']) == 1 + + @pytest.mark.parametrize("insert_round", [1]) + @pytest.mark.parametrize("auto_id", [True]) + @pytest.mark.parametrize("is_partition_key", [True]) + @pytest.mark.parametrize("enable_dynamic_schema", [True]) + @pytest.mark.parametrize("nb", [3000]) + @pytest.mark.parametrize("dim", [128]) + def test_query_entities_with_all_vector_datatype(self, nb, dim, insert_round, auto_id, + is_partition_key, enable_dynamic_schema): + """ + Insert a vector with a simple payload + """ + # create a collection + name = gen_collection_name() + payload = { + "collectionName": name, + "schema": { + "autoId": auto_id, + "enableDynamicField": enable_dynamic_schema, + "fields": [ + {"fieldName": "book_id", "dataType": "Int64", "isPrimary": True, "elementTypeParams": {}}, + {"fieldName": "user_id", "dataType": "Int64", "isPartitionKey": is_partition_key, + "elementTypeParams": {}}, + {"fieldName": "word_count", "dataType": "Int64", "elementTypeParams": {}}, + {"fieldName": "book_describe", "dataType": "VarChar", "elementTypeParams": {"max_length": "256"}}, + {"fieldName": "float_vector", "dataType": "FloatVector", "elementTypeParams": {"dim": f"{dim}"}}, + {"fieldName": "float16_vector", "dataType": "Float16Vector", + "elementTypeParams": {"dim": f"{dim}"}}, + {"fieldName": "bfloat16_vector", "dataType": "BFloat16Vector", + "elementTypeParams": {"dim": f"{dim}"}}, + {"fieldName": "binary_vector", "dataType": "BinaryVector", "elementTypeParams": {"dim": f"{dim}"}}, + ] + }, + "indexParams": [ + {"fieldName": "float_vector", "indexName": "float_vector", "metricType": "L2"}, + {"fieldName": "float16_vector", "indexName": "float16_vector", "metricType": "L2"}, + {"fieldName": "bfloat16_vector", "indexName": "bfloat16_vector", "metricType": "L2"}, + {"fieldName": "binary_vector", "indexName": "binary_vector", "metricType": "HAMMING", + "indexConfig": {"index_type": "BIN_IVF_FLAT", "nlist": "512"}} + ] + } + rsp = self.collection_client.collection_create(payload) + assert rsp['code'] == 200 + rsp = self.collection_client.collection_describe(name) + logger.info(f"rsp: {rsp}") + assert rsp['code'] == 200 + # insert data + for i in range(insert_round): + data = [] + for i in range(nb): + if auto_id: + tmp = { + "user_id": i, + "word_count": i, + "book_describe": f"book_{i}", + "float_vector": gen_vector(datatype="FloatVector", dim=dim), + "float16_vector": gen_vector(datatype="Float16Vector", dim=dim), + 
"bfloat16_vector": gen_vector(datatype="BFloat16Vector", dim=dim), + "binary_vector": gen_vector(datatype="BinaryVector", dim=dim) + } + else: + tmp = { + "book_id": i, + "user_id": i, + "word_count": i, + "book_describe": f"book_{i}", + "float_vector": gen_vector(datatype="FloatVector", dim=dim), + "float16_vector": gen_vector(datatype="Float16Vector", dim=dim), + "bfloat16_vector": gen_vector(datatype="BFloat16Vector", dim=dim), + "binary_vector": gen_vector(datatype="BinaryVector", dim=dim) + } + if enable_dynamic_schema: + tmp.update({f"dynamic_field_{i}": i}) + data.append(tmp) + payload = { + "collectionName": name, + "data": data, + } + rsp = self.vector_client.vector_insert(payload) + assert rsp['code'] == 200 + assert rsp['data']['insertCount'] == nb + c = Collection(name) + res = c.query( + expr="user_id > 0", + limit=50, + output_fields=["*"], + ) + logger.info(f"res: {res}") + # query data to make sure the data is inserted + rsp = self.vector_client.vector_query({"collectionName": name, "filter": "user_id > 0", "limit": 50}) + assert rsp['code'] == 200 + assert len(rsp['data']) == 50 + @pytest.mark.parametrize("expr", ["10+20 <= uid < 20+30", "uid in [1,2,3,4]", "uid > 0", "uid >= 0", "uid > 0", "uid > -100 and uid < 100"]) @@ -780,6 +1610,39 @@ class TestQueryVector(TestBase): for field in output_fields: assert field in r + def test_query_vector_with_count(self): + """ + Query a vector with a simple payload + """ + name = gen_collection_name() + self.name = name + self.init_collection(name, nb=3000) + # query for "count(*)" + payload = { + "collectionName": name, + "filter": " ", + "limit": 0, + "outputFields": ["count(*)"] + } + rsp = self.vector_client.vector_query(payload) + assert rsp['code'] == 200 + assert rsp['data'][0]['count(*)'] == 3000 + + @pytest.mark.xfail(reason="query by id is not supported") + def test_query_vector_by_id(self): + """ + Query a vector with a simple payload + """ + name = gen_collection_name() + self.name = name + _, _, insert_ids = self.init_collection(name, nb=3000, return_insert_id=True) + payload = { + "collectionName": name, + "id": insert_ids, + } + rsp = self.vector_client.vector_query(payload) + assert rsp['code'] == 200 + @pytest.mark.parametrize("filter_expr", ["name > \"placeholder\"", "name like \"placeholder%\""]) @pytest.mark.parametrize("include_output_fields", [True, False]) def test_query_vector_with_varchar_filter(self, filter_expr, include_output_fields): @@ -874,6 +1737,29 @@ class TestQueryVector(TestBase): assert name.startswith(prefix) +@pytest.mark.L1 +class TestQueryVectorNegative(TestBase): + + def test_query_with_id_and_filter(self): + name = gen_collection_name() + self.name = name + nb = 200 + dim = 128 + schema_payload, data, insert_ids = self.init_collection(name, dim=dim, nb=nb, return_insert_id=True) + output_fields = get_common_fields_by_data(data) + uids = [] + for item in data: + uids.append(item.get("uid")) + payload = { + "collectionName": name, + "outputFields": output_fields, + "filter": f"uid in {uids}", + "id": insert_ids, + } + rsp = self.vector_client.vector_query(payload) + assert rsp['code'] != 200 + + @pytest.mark.L0 class TestGetVector(TestBase): @@ -890,7 +1776,7 @@ class TestGetVector(TestBase): vector_to_search = preprocessing.normalize([np.array([random.random() for i in range(dim)])])[0].tolist() payload = { "collectionName": name, - "vector": vector_to_search, + "data": [vector_to_search], } rsp = self.vector_client.vector_search(payload) assert rsp['code'] == 200 @@ -982,9 +1868,56 @@ class 
TestGetVector(TestBase): @pytest.mark.L0 class TestDeleteVector(TestBase): - @pytest.mark.parametrize("include_invalid_id", [True, False]) + @pytest.mark.xfail(reason="delete by id is not supported") + def test_delete_vector_by_id(self): + """ + Query a vector with a simple payload + """ + name = gen_collection_name() + self.name = name + _, _, insert_ids = self.init_collection(name, nb=3000, return_insert_id=True) + payload = { + "collectionName": name, + "id": insert_ids, + } + rsp = self.vector_client.vector_query(payload) + assert rsp['code'] == 200 + @pytest.mark.parametrize("id_field_type", ["list", "one"]) - def test_delete_vector_default(self, id_field_type, include_invalid_id): + def test_delete_vector_by_pk_field_ids(self, id_field_type): + name = gen_collection_name() + self.name = name + nb = 200 + dim = 128 + schema_payload, data, insert_ids = self.init_collection(name, dim=dim, nb=nb, return_insert_id=True) + time.sleep(1) + id_to_delete = None + if id_field_type == "list": + id_to_delete = insert_ids + if id_field_type == "one": + id_to_delete = insert_ids[0] + if isinstance(id_to_delete, list): + payload = { + "collectionName": name, + "filter": f"id in {id_to_delete}" + } + else: + payload = { + "collectionName": name, + "filter": f"id == {id_to_delete}" + } + rsp = self.vector_client.vector_delete(payload) + assert rsp['code'] == 200 + # verify data deleted by get + payload = { + "collectionName": name, + "id": id_to_delete + } + rsp = self.vector_client.vector_get(payload) + assert len(rsp['data']) == 0 + + @pytest.mark.parametrize("id_field_type", ["list", "one"]) + def test_delete_vector_by_filter_pk_field(self, id_field_type): name = gen_collection_name() self.name = name nb = 200 @@ -1013,19 +1946,21 @@ class TestDeleteVector(TestBase): id_to_get = ids if id_field_type == "one": id_to_get = ids[0] - if include_invalid_id: - if isinstance(id_to_get, list): - id_to_get.append(0) - else: - id_to_get = 0 if isinstance(id_to_get, list): if len(id_to_get) >= 100: id_to_get = id_to_get[-100:] # delete by id list - payload = { - "collectionName": name, - "id": id_to_get - } + if isinstance(id_to_get, list): + payload = { + "collectionName": name, + "filter": f"id in {id_to_get}", + } + else: + payload = { + "collectionName": name, + "filter": f"id == {id_to_get}", + } + rsp = self.vector_client.vector_delete(payload) assert rsp['code'] == 200 logger.info(f"delete res: {rsp}") @@ -1042,7 +1977,71 @@ class TestDeleteVector(TestBase): assert rsp['code'] == 200 assert len(rsp['data']) == 0 - def test_delete_vector_by_filter(self): + def test_delete_vector_by_custom_pk_field(self): + dim = 128 + nb = 3000 + insert_round = 1 + + name = gen_collection_name() + payload = { + "collectionName": name, + "schema": { + "fields": [ + {"fieldName": "book_id", "dataType": "Int64", "isPrimary": True, "elementTypeParams": {}}, + {"fieldName": "word_count", "dataType": "Int64", "elementTypeParams": {}}, + {"fieldName": "book_describe", "dataType": "VarChar", "elementTypeParams": {"max_length": "256"}}, + {"fieldName": "text_emb", "dataType": "FloatVector", "elementTypeParams": {"dim": f"{dim}"}} + ] + }, + "indexParams": [{"fieldName": "text_emb", "indexName": "text_emb_index", "metricType": "L2"}] + } + rsp = self.collection_client.collection_create(payload) + assert rsp['code'] == 200 + rsp = self.collection_client.collection_describe(name) + logger.info(f"rsp: {rsp}") + assert rsp['code'] == 200 + pk_values = [] + # insert data + for i in range(insert_round): + data = [] + for j in 
range(nb): + tmp = { + "book_id": i * nb + j, + "word_count": i * nb + j, + "book_describe": f"book_{i * nb + j}", + "text_emb": preprocessing.normalize([np.array([random.random() for i in range(dim)])])[0].tolist() + } + data.append(tmp) + payload = { + "collectionName": name, + "data": data, + } + tmp = [d["book_id"] for d in data] + pk_values.extend(tmp) + body_size = sys.getsizeof(json.dumps(payload)) + logger.info(f"body size: {body_size / 1024 / 1024} MB") + rsp = self.vector_client.vector_insert(payload) + assert rsp['code'] == 200 + assert rsp['data']['insertCount'] == nb + # query data before delete + c = Collection(name) + res = c.query(expr="", output_fields=["count(*)"]) + logger.info(f"res: {res}") + + # delete data + payload = { + "collectionName": name, + "filter": f"book_id in {pk_values}", + } + rsp = self.vector_client.vector_delete(payload) + + # query data after delete + time.sleep(1) + res = c.query(expr="", output_fields=["count(*)"]) + logger.info(f"res: {res}") + assert res[0]["count(*)"] == 0 + + def test_delete_vector_by_filter_custom_field(self): dim = 128 nb = 3000 insert_round = 1 diff --git a/tests/restful_client_v2/utils/utils.py b/tests/restful_client_v2/utils/utils.py index 013f72e08d..efa8fe9e0a 100644 --- a/tests/restful_client_v2/utils/utils.py +++ b/tests/restful_client_v2/utils/utils.py @@ -115,9 +115,11 @@ def get_random_json_data(uid=None): def get_data_by_payload(payload, nb=100): dim = payload.get("dimension", 128) vector_field = payload.get("vectorField", "vector") + pk_field = payload.get("primaryField", "id") data = [] if nb == 1: data = [{ + pk_field: int(time.time()*10000), vector_field: preprocessing.normalize([np.array([random.random() for i in range(dim)])])[0].tolist(), **get_random_json_data() @@ -125,6 +127,7 @@ def get_data_by_payload(payload, nb=100): else: for i in range(nb): data.append({ + pk_field: int(time.time()*10000), vector_field: preprocessing.normalize([np.array([random.random() for i in range(dim)])])[0].tolist(), **get_random_json_data(uid=i) }) diff --git a/tests/scripts/ci_e2e_4am.sh b/tests/scripts/ci_e2e_4am.sh index c5288ededd..7af1f2c240 100755 --- a/tests/scripts/ci_e2e_4am.sh +++ b/tests/scripts/ci_e2e_4am.sh @@ -66,7 +66,7 @@ echo "prepare e2e test" install_pytest_requirements -# Run restful test +# Run restful test v1 cd ${ROOT}/tests/restful_client @@ -79,6 +79,18 @@ else --html=${CI_LOG_PATH}/report_restful.html --self-contained-html fi +# Run restful test v2 +cd ${ROOT}/tests/restful_client_v2 + +if [[ -n "${TEST_TIMEOUT:-}" ]]; then + + timeout "${TEST_TIMEOUT}" pytest testcases --endpoint http://${MILVUS_SERVICE_NAME}:${MILVUS_SERVICE_PORT} -v -x -m L0 -n 6 --timeout 180\ + --html=${CI_LOG_PATH}/report_restful.html --self-contained-html +else + pytest testcases --endpoint http://${MILVUS_SERVICE_NAME}:${MILVUS_SERVICE_PORT} -v -x -m L0 -n 6 --timeout 180\ + --html=${CI_LOG_PATH}/report_restful.html --self-contained-html +fi + cd ${ROOT}/tests/python_client
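
For readers following the request shapes exercised above outside the pytest harness, below is a minimal standalone sketch. It assumes a locally reachable Milvus proxy with the RESTful v2 API enabled; the host, port, token, collection name, field names, and exact route paths are illustrative assumptions that mirror the naming used by the test clients, not a definitive reference.

import requests

# Illustrative values only -- adjust for your deployment (assumptions, not part of the patch).
ENDPOINT = "http://127.0.0.1:19530"            # assumed Milvus proxy address
HEADERS = {
    "Content-Type": "application/json",
    "Authorization": "Bearer root:Milvus",     # assumed "user:password" style token
}
COLLECTION = "example_collection"              # placeholder; must already exist

# v2 search body: query vectors are passed as a list under "data"
# (the tests above replace the older single "vector" key with this shape).
search_body = {
    "collectionName": COLLECTION,
    "data": [[0.1] * 128],                     # one 128-dim query vector (placeholder values)
    "annsField": "float_vector",               # assumed vector field name
    "filter": "word_count > 100",
    "limit": 10,
    "outputFields": ["*"],
}
# Route path assumed to follow the /v2/vectordb/... convention used by this suite.
rsp = requests.post(f"{ENDPOINT}/v2/vectordb/entities/search",
                    headers=HEADERS, json=search_body).json()
print(rsp.get("code"), len(rsp.get("data", [])))

# Bulk-import body: "files" is a list of file groups (a list of lists), which is why
# the tests above were updated from "files": [file_name] to "files": [[file_name]].
import_body = {
    "collectionName": COLLECTION,
    "files": [["bulk_insert_data.json"]],      # object key already uploaded to the MinIO bucket
}
# Import route path is likewise an assumption based on the v2 jobs API naming.
rsp = requests.post(f"{ENDPOINT}/v2/vectordb/jobs/import/create",
                    headers=HEADERS, json=import_body).json()
print(rsp)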