From b3488271022bb82cc933fd486d788f9bcee1b02a Mon Sep 17 00:00:00 2001 From: zhuwenxing Date: Wed, 13 Dec 2023 19:56:38 +0800 Subject: [PATCH] test: add array data type and parquet file type for bulk insert case (#29030) add array data type and parquet file type for the bulk insert case --------- Signed-off-by: zhuwenxing --- .../python_client/common/bulk_insert_data.py | 327 ++++++++++++++++- tests/python_client/common/common_func.py | 7 +- .../testcases/test_bulk_insert.py | 340 +++++++++++++++++- 3 files changed, 656 insertions(+), 18 deletions(-) diff --git a/tests/python_client/common/bulk_insert_data.py b/tests/python_client/common/bulk_insert_data.py index ec613de3cb..80e9a284a3 100644 --- a/tests/python_client/common/bulk_insert_data.py +++ b/tests/python_client/common/bulk_insert_data.py @@ -1,14 +1,19 @@ import copy +import json import os +import time + import numpy as np +import pandas as pd import random +from faker import Faker from sklearn import preprocessing from common.common_func import gen_unique_str from common.minio_comm import copy_files_to_minio from utils.util_log import test_log as log data_source = "/tmp/bulk_insert_data" - +fake = Faker() BINARY = "binary" FLOAT = "float" @@ -21,6 +26,11 @@ class DataField: bool_field = "bool_scalar" float_field = "float_scalar" double_field = "double_scalar" + json_field = "json" + array_bool_field = "array_bool" + array_int_field = "array_int" + array_float_field = "array_float" + array_string_field = "array_string" class DataErrorType: @@ -31,6 +41,8 @@ class DataErrorType: typo_on_bool = "typo_on_bool" str_on_float_scalar = "str_on_float_scalar" str_on_vector_field = "str_on_vector_field" + empty_array_field = "empty_array_field" + mismatch_type_array_field = "mismatch_type_array_field" def gen_file_prefix(is_row_based=True, auto_id=True, prefix=""): @@ -75,7 +87,7 @@ def gen_binary_vectors(nb, dim): def gen_row_based_json_file(row_file, str_pk, data_fields, float_vect, - rows, dim, start_uid=0, err_type="", **kwargs): + rows, dim, start_uid=0, err_type="", enable_dynamic_field=False, **kwargs): if err_type == DataErrorType.str_on_int_pk: str_pk = True @@ -99,7 +111,9 @@ def gen_row_based_json_file(row_file, str_pk, data_fields, float_vect, data_field = data_fields[j] if data_field == DataField.pk_field: if str_pk: - f.write('"uid":"' + str(gen_unique_str()) + '"') + line = '"uid":"' + str(gen_unique_str()) + '"' + f.write(line) + # f.write('"uid":"' + str(gen_unique_str()) + '"') else: if err_type == DataErrorType.float_on_int_pk: f.write('"uid":' + str(i + start_uid + random.random()) + '') @@ -110,14 +124,24 @@ def gen_row_based_json_file(row_file, str_pk, data_fields, float_vect, # if not auto_id, use the same value as pk to check the query results later f.write('"int_scalar":' + str(i + start_uid) + '') else: - f.write('"int_scalar":' + str(random.randint(-999999, 9999999)) + '') + line = '"int_scalar":' + str(random.randint(-999999, 9999999)) + '' + f.write(line) if data_field == DataField.float_field: if err_type == DataErrorType.int_on_float_scalar: f.write('"float_scalar":' + str(random.randint(-999999, 9999999)) + '') elif err_type == DataErrorType.str_on_float_scalar: f.write('"float_scalar":"' + str(gen_unique_str()) + '"') else: - f.write('"float_scalar":' + str(random.random()) + '') + line = '"float_scalar":' + str(random.random()) + '' + f.write(line) + if data_field == DataField.double_field: + if err_type == DataErrorType.int_on_float_scalar: + f.write('"double_scalar":' + str(random.randint(-999999, 
9999999)) + '') + elif err_type == DataErrorType.str_on_float_scalar: + f.write('"double_scalar":"' + str(gen_unique_str()) + '"') + else: + line = '"double_scalar":' + str(random.random()) + '' + f.write(line) if data_field == DataField.string_field: f.write('"string_scalar":"' + str(gen_unique_str()) + '"') if data_field == DataField.bool_field: @@ -125,6 +149,41 @@ def gen_row_based_json_file(row_file, str_pk, data_fields, float_vect, f.write('"bool_scalar":' + str(random.choice(["True", "False", "TRUE", "FALSE", "0", "1"])) + '') else: f.write('"bool_scalar":' + str(random.choice(["true", "false"])) + '') + if data_field == DataField.json_field: + data = { + gen_unique_str(): random.randint(-999999, 9999999), + } + f.write('"json":' + json.dumps(data) + '') + if data_field == DataField.array_bool_field: + if err_type == DataErrorType.empty_array_field: + f.write('"array_bool":[]') + elif err_type == DataErrorType.mismatch_type_array_field: + f.write('"array_bool": "mistype"') + else: + + f.write('"array_bool":[' + str(random.choice(["true", "false"])) + ',' + str(random.choice(["true", "false"])) + ']') + if data_field == DataField.array_int_field: + if err_type == DataErrorType.empty_array_field: + f.write('"array_int":[]') + elif err_type == DataErrorType.mismatch_type_array_field: + f.write('"array_int": "mistype"') + else: + f.write('"array_int":[' + str(random.randint(-999999, 9999999)) + ',' + str(random.randint(-999999, 9999999)) + ']') + if data_field == DataField.array_float_field: + if err_type == DataErrorType.empty_array_field: + f.write('"array_float":[]') + elif err_type == DataErrorType.mismatch_type_array_field: + f.write('"array_float": "mistype"') + else: + f.write('"array_float":[' + str(random.random()) + ',' + str(random.random()) + ']') + if data_field == DataField.array_string_field: + if err_type == DataErrorType.empty_array_field: + f.write('"array_string":[]') + elif err_type == DataErrorType.mismatch_type_array_field: + f.write('"array_string": "mistype"') + else: + f.write('"array_string":["' + str(gen_unique_str()) + '","' + str(gen_unique_str()) + '"]') + if data_field == DataField.vec_field: # vector field if err_type == DataErrorType.one_entity_wrong_dim and i == wrong_row: @@ -133,10 +192,16 @@ def gen_row_based_json_file(row_file, str_pk, data_fields, float_vect, vectors = gen_str_invalid_vectors(1, dim) if float_vect else gen_str_invalid_vectors(1, dim//8) else: vectors = gen_float_vectors(1, dim) if float_vect else gen_binary_vectors(1, (dim//8)) - f.write('"vectors":' + ",".join(str(x).replace("'", '"') for x in vectors) + '') + line = '"vectors":' + ",".join(str(x).replace("'", '"') for x in vectors) + '' + f.write(line) # not write common for the last field if j != len(data_fields) - 1: f.write(',') + if enable_dynamic_field: + d = {str(i+start_uid): i+start_uid, "name": fake.name(), "address": fake.address()} + d_str = json.dumps(d) + d_str = d_str[1:-1] # remove {} + f.write("," + d_str) f.write('}') f.write("\n") f.write("]") @@ -276,6 +341,20 @@ def gen_string_in_numpy_file(dir, data_field, rows, start=0, force=False): return file_name +def gen_dynamic_field_in_numpy_file(dir, rows, start=0, force=False): + file_name = f"$meta.npy" + file = f"{dir}/{file_name}" + if not os.path.exists(file) or force: + # non vector columns + data = [] + if rows > 0: + data = [json.dumps({str(i): i, "name": fake.name(), "address": fake.address()}) for i in range(start, rows+start)] + arr = np.array(data) + log.info(f"file_name: {file_name} data type: 
{arr.dtype} data shape: {arr.shape}") + np.save(file, arr) + return file_name + + def gen_bool_in_numpy_file(dir, data_field, rows, start=0, force=False): file_name = f"{data_field}.npy" file = f"{dir}/{file_name}" @@ -291,6 +370,19 @@ def gen_bool_in_numpy_file(dir, data_field, rows, start=0, force=False): return file_name +def gen_json_in_numpy_file(dir, data_field, rows, start=0, force=False): + file_name = f"{data_field}.npy" + file = f"{dir}/{file_name}" + if not os.path.exists(file) or force: + data = [] + if rows > 0: + data = [json.dumps({"name": fake.name(), "address": fake.address()}) for i in range(start, rows+start)] + arr = np.array(data) + log.info(f"file_name: {file_name} data type: {arr.dtype} data shape: {arr.shape}") + np.save(file, arr) + return file_name + + def gen_int_or_float_in_numpy_file(dir, data_field, rows, start=0, force=False): file_name = f"{data_field}.npy" file = f"{dir}/{file_name}" @@ -307,13 +399,65 @@ def gen_int_or_float_in_numpy_file(dir, data_field, rows, start=0, force=False): data = [i for i in range(start, start + rows)] elif data_field == DataField.int_field: data = [random.randint(-999999, 9999999) for _ in range(rows)] - # print(f"file_name: {file_name} data type: {arr.dtype}") arr = np.array(data) log.info(f"file_name: {file_name} data type: {arr.dtype} data shape: {arr.shape}") np.save(file, arr) return file_name +def gen_vectors(float_vector, rows, dim): + vectors = [] + if rows > 0: + if float_vector: + vectors = gen_float_vectors(rows, dim) + else: + vectors = gen_binary_vectors(rows, (dim // 8)) + return vectors + + +def gen_data_by_data_field(data_field, rows, start=0, float_vector=True, dim=128, array_length=None): + if array_length is None: + array_length = random.randint(0, 10) + + data = [] + if rows > 0: + if data_field == DataField.vec_field: + data = gen_vectors(float_vector=float_vector, rows=rows, dim=dim) + elif data_field == DataField.float_field: + data = [np.float32(random.random()) for _ in range(rows)] + elif data_field == DataField.double_field: + data = [np.float64(random.random()) for _ in range(rows)] + elif data_field == DataField.pk_field: + data = [np.int64(i) for i in range(start, start + rows)] + elif data_field == DataField.int_field: + data = [np.int64(random.randint(-999999, 9999999)) for _ in range(rows)] + elif data_field == DataField.string_field: + data = [gen_unique_str(str(i)) for i in range(start, rows + start)] + elif data_field == DataField.bool_field: + data = [random.choice([True, False]) for i in range(start, rows + start)] + elif data_field == DataField.json_field: + data = pd.Series([json.dumps({ + gen_unique_str(): random.randint(-999999, 9999999) + }) for i in range(start, rows + start)], dtype=np.dtype("str")) + elif data_field == DataField.array_bool_field: + data = pd.Series( + [np.array([random.choice([True, False]) for _ in range(array_length)], dtype=np.dtype("bool")) + for i in range(start, rows + start)]) + elif data_field == DataField.array_int_field: + data = pd.Series( + [np.array([random.randint(-999999, 9999999) for _ in range(array_length)], dtype=np.dtype("int64")) + for i in range(start, rows + start)]) + elif data_field == DataField.array_float_field: + data = pd.Series( + [np.array([random.random() for _ in range(array_length)], dtype=np.dtype("float32")) + for i in range(start, rows + start)]) + elif data_field == DataField.array_string_field: + data = pd.Series( + [np.array([gen_unique_str(str(i)) for _ in range(array_length)], dtype=np.dtype("str")) + for i in range(start, 
rows + start)]) + return data + + def gen_file_name(is_row_based, rows, dim, auto_id, str_pk, float_vector, data_fields, file_num, file_type, err_type): row_suffix = entity_suffix(rows) @@ -334,7 +478,7 @@ def gen_file_name(is_row_based, rows, dim, auto_id, str_pk, pk = "str_pk_" prefix = gen_file_prefix(is_row_based=is_row_based, auto_id=auto_id, prefix=err_type) - file_name = f"{prefix}_{pk}{vt}{field_suffix}{dim}d_{row_suffix}_{file_num}{file_type}" + file_name = f"{prefix}_{pk}{vt}{field_suffix}{dim}d_{row_suffix}_{file_num}_{int(time.time())}{file_type}" return file_name @@ -381,7 +525,65 @@ def gen_json_files(is_row_based, rows, dim, auto_id, str_pk, return files -def gen_npy_files(float_vector, rows, dim, data_fields, file_nums=1, err_type="", force=False): +def gen_dict_data_by_data_field(data_fields, rows, start=0, float_vector=True, dim=128, array_length=None, enable_dynamic_field=False): + data = [] + for r in range(rows): + d = {} + for data_field in data_fields: + if data_field == DataField.vec_field: + # vector columns + d[data_field] = gen_vectors(float_vector=float_vector, rows=1, dim=dim)[0] + elif data_field == DataField.float_field: + d[data_field] = random.random() + elif data_field == DataField.double_field: + d[data_field] = random.random() + elif data_field == DataField.pk_field: + d[data_field] = r+start + elif data_field == DataField.int_field: + d[data_field] =random.randint(-999999, 9999999) + elif data_field == DataField.string_field: + d[data_field] = gen_unique_str(str(r + start)) + elif data_field == DataField.bool_field: + d[data_field] = random.choice([True, False]) + elif data_field == DataField.json_field: + d[data_field] = {str(r+start): r+start} + elif data_field == DataField.array_bool_field: + array_length = random.randint(0, 10) if array_length is None else array_length + d[data_field] = [random.choice([True, False]) for _ in range(array_length)] + elif data_field == DataField.array_int_field: + array_length = random.randint(0, 10) if array_length is None else array_length + d[data_field] = [random.randint(-999999, 9999999) for _ in range(array_length)] + elif data_field == DataField.array_float_field: + array_length = random.randint(0, 10) if array_length is None else array_length + d[data_field] = [random.random() for _ in range(array_length)] + elif data_field == DataField.array_string_field: + array_length = random.randint(0, 10) if array_length is None else array_length + d[data_field] = [gen_unique_str(str(i)) for i in range(array_length)] + if enable_dynamic_field: + d[str(r+start)] = r+start + d["name"] = fake.name() + d["address"] = fake.address() + data.append(d) + + return data + + +def gen_new_json_files(float_vector, rows, dim, data_fields, file_nums=1, array_length=None, err_type="", enable_dynamic_field=False): + files = [] + start_uid = 0 + for i in range(file_nums): + file_name = f"data-fields-{len(data_fields)}-rows-{rows}-dim-{dim}-file-num-{i}-{int(time.time())}.json" + file = f"{data_source}/{file_name}" + data = gen_dict_data_by_data_field(data_fields=data_fields, rows=rows, start=start_uid, float_vector=float_vector, dim=dim, array_length=array_length, enable_dynamic_field=enable_dynamic_field) + log.info(f"data: {data}") + with open(file, "w") as f: + json.dump(data, f) + files.append(file_name) + start_uid += rows + return files + + +def gen_npy_files(float_vector, rows, dim, data_fields, file_nums=1, err_type="", force=False, enable_dynamic_field=False): # gen numpy files files = [] start_uid = 0 @@ -395,10 +597,15 @@ def 
gen_npy_files(float_vector, rows, dim, data_fields, file_nums=1, err_type="" file_name = gen_string_in_numpy_file(dir=data_source, data_field=data_field, rows=rows, force=force) elif data_field == DataField.bool_field: file_name = gen_bool_in_numpy_file(dir=data_source, data_field=data_field, rows=rows, force=force) + elif data_field == DataField.json_field: + file_name = gen_json_in_numpy_file(dir=data_source, data_field=data_field, rows=rows, force=force) else: file_name = gen_int_or_float_in_numpy_file(dir=data_source, data_field=data_field, rows=rows, force=force) files.append(file_name) + if enable_dynamic_field: + file_name = gen_dynamic_field_in_numpy_file(dir=data_source, rows=rows, force=force) + files.append(file_name) else: for i in range(file_nums): subfolder = gen_subfolder(root=data_source, dim=dim, rows=rows, file_num=i) @@ -409,6 +616,52 @@ def gen_npy_files(float_vector, rows, dim, data_fields, file_nums=1, err_type="" else: file_name = gen_int_or_float_in_numpy_file(dir=dir, data_field=data_field, rows=rows, start=start_uid, force=force) files.append(f"{subfolder}/{file_name}") + if enable_dynamic_field: + file_name = gen_dynamic_field_in_numpy_file(dir=dir, rows=rows, start=start_uid, force=force) + files.append(f"{subfolder}/{file_name}") + start_uid += rows + return files + + +def gen_dynamic_field_data_in_parquet_file(rows, start=0): + data = [] + if rows > 0: + data = pd.Series([json.dumps({str(i): i, "name": fake.name(), "address": fake.address()}) for i in range(start, rows+start)], dtype=np.dtype("str")) + return data + + +def gen_parquet_files(float_vector, rows, dim, data_fields, file_nums=1, array_length=None, err_type="", enable_dynamic_field=False): + # gen numpy files + if err_type == "": + err_type = "none" + files = [] + start_uid = 0 + if file_nums == 1: + all_field_data = {} + for data_field in data_fields: + data = gen_data_by_data_field(data_field=data_field, rows=rows, start=0, + float_vector=float_vector, dim=dim, array_length=array_length) + all_field_data[data_field] = data + if enable_dynamic_field: + all_field_data["$meta"] = gen_dynamic_field_data_in_parquet_file(rows=rows, start=0) + df = pd.DataFrame(all_field_data) + log.info(f"df: \n{df}") + file_name = f"data-fields-{len(data_fields)}-rows-{rows}-dim-{dim}-file-num-{file_nums}-error-{err_type}-{int(time.time())}.parquet" + df.to_parquet(f"{data_source}/{file_name}", engine='pyarrow') + files.append(file_name) + else: + for i in range(file_nums): + all_field_data = {} + for data_field in data_fields: + data = gen_data_by_data_field(data_field=data_field, rows=rows, start=0, + float_vector=float_vector, dim=dim, array_length=array_length) + all_field_data[data_field] = data + if enable_dynamic_field: + all_field_data["$meta"] = gen_dynamic_field_data_in_parquet_file(rows=rows, start=0) + df = pd.DataFrame(all_field_data) + file_name = f"data-fields-{len(data_fields)}-rows-{rows}-dim-{dim}-file-num-{i}-error-{err_type}-{int(time.time())}.parquet" + df.to_parquet(f"{data_source}/{file_name}", engine='pyarrow') + files.append(file_name) start_uid += rows return files @@ -476,6 +729,7 @@ def prepare_bulk_insert_json_files(minio_endpoint="", bucket_name="milvus-bucket data_fields_c = copy.deepcopy(data_fields) log.info(f"data_fields: {data_fields}") log.info(f"data_fields_c: {data_fields_c}") + files = gen_json_files(is_row_based=is_row_based, rows=rows, dim=dim, auto_id=auto_id, str_pk=str_pk, float_vector=float_vector, data_fields=data_fields_c, file_nums=file_nums, multi_folder=multi_folder, @@ 
-485,7 +739,19 @@ def prepare_bulk_insert_json_files(minio_endpoint="", bucket_name="milvus-bucket return files -def prepare_bulk_insert_numpy_files(minio_endpoint="", bucket_name="milvus-bucket", rows=100, dim=128, +def prepare_bulk_insert_new_json_files(minio_endpoint="", bucket_name="milvus-bucket", + rows=100, dim=128, float_vector=True, + data_fields=[], file_nums=1, enable_dynamic_field=False, + err_type="", force=False, **kwargs): + + log.info(f"data_fields: {data_fields}") + files = gen_new_json_files(float_vector=float_vector, rows=rows, dim=dim, data_fields=data_fields, file_nums=file_nums, err_type=err_type, enable_dynamic_field=enable_dynamic_field, **kwargs) + + copy_files_to_minio(host=minio_endpoint, r_source=data_source, files=files, bucket_name=bucket_name, force=force) + return files + + +def prepare_bulk_insert_numpy_files(minio_endpoint="", bucket_name="milvus-bucket", rows=100, dim=128, enable_dynamic_field=False, data_fields=[DataField.vec_field], float_vector=True, file_nums=1, force=False): """ Generate column based files based on params in numpy format and copy them to the minio @@ -517,12 +783,51 @@ def prepare_bulk_insert_numpy_files(minio_endpoint="", bucket_name="milvus-bucke File name list or file name with sub-folder list """ files = gen_npy_files(rows=rows, dim=dim, float_vector=float_vector, - data_fields=data_fields, + data_fields=data_fields, enable_dynamic_field=enable_dynamic_field, file_nums=file_nums, force=force) copy_files_to_minio(host=minio_endpoint, r_source=data_source, files=files, bucket_name=bucket_name, force=force) return files + +def prepare_bulk_insert_parquet_files(minio_endpoint="", bucket_name="milvus-bucket", rows=100, dim=128, array_length=None, + enable_dynamic_field=False, data_fields=[DataField.vec_field], float_vector=True, file_nums=1, force=False): + """ + Generate column based files based on params in parquet format and copy them to the minio + Note: each field in data_fields would be generated one parquet file. 
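+    Illustrative usage (a sketch only; the minio endpoint value below is an
+    assumption, not a value taken from the tests):
+        files = prepare_bulk_insert_parquet_files(
+            minio_endpoint="127.0.0.1:9000", bucket_name="milvus-bucket",
+            rows=100, dim=128, float_vector=True, enable_dynamic_field=True,
+            data_fields=[DataField.pk_field, DataField.vec_field, DataField.array_int_field],
+            file_nums=1, force=True)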
+ + :param rows: the number entities to be generated in the file(s) + :type rows: int + + :param dim: dim of vector data + :type dim: int + + :param: float_vector: generate float vectors or binary vectors + :type float_vector: boolean + + :param: data_fields: data fields to be generated in the file(s): + it supports one or all of [int_pk, vectors, int, float] + Note: it does not automatically add pk field + :type data_fields: list + + :param file_nums: file numbers to be generated + The file(s) would be generated in data_source folder if file_nums = 1 + The file(s) would be generated in different sub-folders if file_nums > 1 + :type file_nums: int + + :param force: re-generate the file(s) regardless existing or not + :type force: boolean + + Return: List + File name list or file name with sub-folder list + """ + files = gen_parquet_files(rows=rows, dim=dim, float_vector=float_vector, enable_dynamic_field=enable_dynamic_field, + data_fields=data_fields, array_length=array_length, + file_nums=file_nums) + copy_files_to_minio(host=minio_endpoint, r_source=data_source, files=files, bucket_name=bucket_name, force=force) + return files + + def gen_csv_file(file, float_vector, data_fields, rows, dim, start_uid): with open(file, "w") as f: # field name diff --git a/tests/python_client/common/common_func.py b/tests/python_client/common/common_func.py index eef0815f8f..e6bb455d03 100644 --- a/tests/python_client/common/common_func.py +++ b/tests/python_client/common/common_func.py @@ -98,6 +98,9 @@ def gen_json_field(name=ct.default_json_field_name, description=ct.default_desc, def gen_array_field(name=ct.default_array_field_name, element_type=DataType.INT64, max_capacity=ct.default_max_capacity, description=ct.default_desc, is_primary=False, **kwargs): + if element_type == DataType.VARCHAR: + kwargs['max_length'] = ct.default_length + array_field, _ = ApiFieldSchemaWrapper().init_field_schema(name=name, dtype=DataType.ARRAY, element_type=element_type, max_capacity=max_capacity, description=description, is_primary=is_primary, **kwargs) @@ -477,7 +480,7 @@ def gen_dataframe_all_data_type(nb=ct.default_nb, dim=ct.default_dim, start=0, w if not random_primary_key: int64_values = pd.Series(data=[i for i in range(start, start + nb)]) else: - int64_values = pd.Series(data=random.sample(range(start, start + nb), nb)) + int64_values = pd.Series(data=random.sample(range(start, start + nb), nb)) int32_values = pd.Series(data=[np.int32(i) for i in range(start, start + nb)], dtype="int32") int16_values = pd.Series(data=[np.int16(i) for i in range(start, start + nb)], dtype="int16") int8_values = pd.Series(data=[np.int8(i) for i in range(start, start + nb)], dtype="int8") @@ -985,7 +988,7 @@ def gen_search_param(index_type, metric_type="L2"): log.error("Invalid index_type.") raise Exception("Invalid index_type.") log.debug(search_params) - + return search_params diff --git a/tests/python_client/testcases/test_bulk_insert.py b/tests/python_client/testcases/test_bulk_insert.py index 7fffb714f3..00dff27fa2 100644 --- a/tests/python_client/testcases/test_bulk_insert.py +++ b/tests/python_client/testcases/test_bulk_insert.py @@ -1,6 +1,7 @@ import logging import time import pytest +from pymilvus import DataType import numpy as np from pathlib import Path from base.client_base import TestcaseBase @@ -11,7 +12,9 @@ from common.common_type import CaseLabel, CheckTasks from utils.util_log import test_log as log from common.bulk_insert_data import ( prepare_bulk_insert_json_files, + prepare_bulk_insert_new_json_files, 
prepare_bulk_insert_numpy_files, + prepare_bulk_insert_parquet_files, prepare_bulk_insert_csv_files, DataField as df, ) @@ -24,8 +27,9 @@ default_multi_fields = [ df.string_field, df.bool_field, df.float_field, + df.array_int_field ] -default_vec_n_int_fields = [df.vec_field, df.int_field] +default_vec_n_int_fields = [df.vec_field, df.int_field, df.array_int_field] # milvus_ns = "chaos-testing" @@ -267,6 +271,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert): cf.gen_int64_field(name=df.pk_field, is_primary=True), cf.gen_float_vec_field(name=df.vec_field, dim=dim), cf.gen_int32_field(name=df.int_field), + cf.gen_array_field(name=df.array_int_field, element_type=DataType.INT32), ] schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id) self.collection_wrap.init_collection(c_name, schema=schema) @@ -604,7 +609,8 @@ class TestBulkInsert(TestcaseBaseBulkInsert): @pytest.mark.parametrize("auto_id", [True, False]) @pytest.mark.parametrize("dim", [128]) # 128 @pytest.mark.parametrize("entities", [1000]) # 1000 - def test_with_all_field_numpy(self, auto_id, dim, entities): + @pytest.mark.parametrize("enable_dynamic_field", [True, False]) + def test_with_all_field_json(self, auto_id, dim, entities, enable_dynamic_field): """ collection schema 1: [pk, int64, float64, string float_vector] data file: vectors.npy and uid.npy, @@ -613,26 +619,31 @@ class TestBulkInsert(TestcaseBaseBulkInsert): 2. import data 3. verify """ - data_fields = [df.pk_field, df.int_field, df.float_field, df.double_field, df.vec_field] fields = [ cf.gen_int64_field(name=df.pk_field, is_primary=True, auto_id=auto_id), cf.gen_int64_field(name=df.int_field), cf.gen_float_field(name=df.float_field), cf.gen_double_field(name=df.double_field), + cf.gen_json_field(name=df.json_field), + cf.gen_array_field(name=df.array_int_field, element_type=DataType.INT64), + cf.gen_array_field(name=df.array_float_field, element_type=DataType.FLOAT), + cf.gen_array_field(name=df.array_string_field, element_type=DataType.VARCHAR), + cf.gen_array_field(name=df.array_bool_field, element_type=DataType.BOOL), cf.gen_float_vec_field(name=df.vec_field, dim=dim), ] data_fields = [f.name for f in fields if not f.to_dict().get("auto_id", False)] - files = prepare_bulk_insert_numpy_files( + files = prepare_bulk_insert_json_files( minio_endpoint=self.minio_endpoint, bucket_name=self.bucket_name, rows=entities, dim=dim, data_fields=data_fields, + enable_dynamic_field=enable_dynamic_field, force=True, ) self._connect() c_name = cf.gen_unique_str("bulk_insert") - schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id) + schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id, enable_dynamic_field=enable_dynamic_field) self.collection_wrap.init_collection(c_name, schema=schema) # import data @@ -666,9 +677,327 @@ class TestBulkInsert(TestcaseBaseBulkInsert): df.vec_field, param=search_params, limit=1, + output_fields=["*"], check_task=CheckTasks.check_search_results, check_items={"nq": 1, "limit": 1}, ) + for hit in res: + for r in hit: + fields_from_search = r.fields.keys() + for f in fields: + assert f.name in fields_from_search + if enable_dynamic_field: + assert "name" in fields_from_search + assert "address" in fields_from_search + + @pytest.mark.tags(CaseLabel.L3) + @pytest.mark.parametrize("auto_id", [True]) + @pytest.mark.parametrize("dim", [2]) # 128 + @pytest.mark.parametrize("entities", [2]) # 1000 + @pytest.mark.parametrize("enable_dynamic_field", [True]) + def test_bulk_insert_all_field_with_new_json_format(self, auto_id, 
dim, entities, enable_dynamic_field): + """ + collection schema 1: [pk, int64, float64, string float_vector] + data file: vectors.npy and uid.npy, + Steps: + 1. create collection + 2. import data + 3. verify + """ + fields = [ + cf.gen_int64_field(name=df.pk_field, is_primary=True, auto_id=auto_id), + cf.gen_int64_field(name=df.int_field), + cf.gen_float_field(name=df.float_field), + cf.gen_double_field(name=df.double_field), + cf.gen_json_field(name=df.json_field), + cf.gen_array_field(name=df.array_int_field, element_type=DataType.INT64), + cf.gen_array_field(name=df.array_float_field, element_type=DataType.FLOAT), + cf.gen_array_field(name=df.array_string_field, element_type=DataType.VARCHAR), + cf.gen_array_field(name=df.array_bool_field, element_type=DataType.BOOL), + cf.gen_float_vec_field(name=df.vec_field, dim=dim), + ] + data_fields = [f.name for f in fields if not f.to_dict().get("auto_id", False)] + files = prepare_bulk_insert_new_json_files( + minio_endpoint=self.minio_endpoint, + bucket_name=self.bucket_name, + rows=entities, + dim=dim, + data_fields=data_fields, + enable_dynamic_field=enable_dynamic_field, + force=True, + ) + self._connect() + c_name = cf.gen_unique_str("bulk_insert") + schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id, enable_dynamic_field=enable_dynamic_field) + self.collection_wrap.init_collection(c_name, schema=schema) + + # import data + t0 = time.time() + task_id, _ = self.utility_wrap.do_bulk_insert( + collection_name=c_name, files=files + ) + logging.info(f"bulk insert task ids:{task_id}") + success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed( + task_ids=[task_id], timeout=300 + ) + tt = time.time() - t0 + log.info(f"bulk insert state:{success} in {tt} with states:{states}") + assert success + num_entities = self.collection_wrap.num_entities + log.info(f" collection entities: {num_entities}") + assert num_entities == entities + # verify imported data is available for search + index_params = ct.default_index + self.collection_wrap.create_index( + field_name=df.vec_field, index_params=index_params + ) + self.collection_wrap.load() + log.info(f"wait for load finished and be ready for search") + time.sleep(2) + # log.info(f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}") + search_data = cf.gen_vectors(1, dim) + search_params = ct.default_search_params + res, _ = self.collection_wrap.search( + search_data, + df.vec_field, + param=search_params, + limit=1, + output_fields=["*"], + check_task=CheckTasks.check_search_results, + check_items={"nq": 1, "limit": 1}, + ) + for hit in res: + for r in hit: + fields_from_search = r.fields.keys() + for f in fields: + assert f.name in fields_from_search + if enable_dynamic_field: + assert "name" in fields_from_search + assert "address" in fields_from_search + + @pytest.mark.tags(CaseLabel.L3) + @pytest.mark.parametrize("auto_id", [True, False]) + @pytest.mark.parametrize("dim", [128]) # 128 + @pytest.mark.parametrize("entities", [1000]) # 1000 + @pytest.mark.parametrize("enable_dynamic_field", [True, False]) + def test_bulk_insert_all_field_with_numpy(self, auto_id, dim, entities, enable_dynamic_field): + """ + collection schema 1: [pk, int64, float64, string float_vector] + data file: vectors.npy and uid.npy, + note: numpy file is not supported for array field + Steps: + 1. create collection + 2. import data + 3. 
verify + """ + fields = [ + cf.gen_int64_field(name=df.pk_field, is_primary=True, auto_id=auto_id), + cf.gen_int64_field(name=df.int_field), + cf.gen_float_field(name=df.float_field), + cf.gen_double_field(name=df.double_field), + cf.gen_json_field(name=df.json_field), + cf.gen_float_vec_field(name=df.vec_field, dim=dim), + ] + data_fields = [f.name for f in fields if not f.to_dict().get("auto_id", False)] + files = prepare_bulk_insert_numpy_files( + minio_endpoint=self.minio_endpoint, + bucket_name=self.bucket_name, + rows=entities, + dim=dim, + data_fields=data_fields, + force=True, + enable_dynamic_field=enable_dynamic_field, + ) + self._connect() + c_name = cf.gen_unique_str("bulk_insert") + schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id, enable_dynamic_field=enable_dynamic_field) + self.collection_wrap.init_collection(c_name, schema=schema) + + # import data + t0 = time.time() + task_id, _ = self.utility_wrap.do_bulk_insert( + collection_name=c_name, files=files + ) + logging.info(f"bulk insert task ids:{task_id}") + success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed( + task_ids=[task_id], timeout=300 + ) + tt = time.time() - t0 + log.info(f"bulk insert state:{success} in {tt} with states:{states}") + assert success + num_entities = self.collection_wrap.num_entities + log.info(f" collection entities: {num_entities}") + assert num_entities == entities + # verify imported data is available for search + index_params = ct.default_index + self.collection_wrap.create_index( + field_name=df.vec_field, index_params=index_params + ) + self.collection_wrap.load() + log.info(f"wait for load finished and be ready for search") + time.sleep(2) + # log.info(f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}") + search_data = cf.gen_vectors(1, dim) + search_params = ct.default_search_params + res, _ = self.collection_wrap.search( + search_data, + df.vec_field, + param=search_params, + limit=1, + output_fields=["*"], + check_task=CheckTasks.check_search_results, + check_items={"nq": 1, "limit": 1}, + ) + for hit in res: + for r in hit: + fields_from_search = r.fields.keys() + for f in fields: + assert f.name in fields_from_search + if enable_dynamic_field: + assert "name" in fields_from_search + assert "address" in fields_from_search + + @pytest.mark.tags(CaseLabel.L3) + @pytest.mark.parametrize("auto_id", [True, False]) + @pytest.mark.parametrize("dim", [128]) # 128 + @pytest.mark.parametrize("entities", [1000]) # 1000 + @pytest.mark.parametrize("file_nums", [1]) + @pytest.mark.parametrize("array_len", [None, 0, 100]) + @pytest.mark.parametrize("enable_dynamic_field", [True, False]) + def test_bulk_insert_all_field_with_parquet(self, auto_id, dim, entities, file_nums, array_len, enable_dynamic_field): + """ + collection schema 1: [pk, int64, float64, string float_vector] + data file: vectors.parquet and uid.parquet, + Steps: + 1. create collection + 2. import data + 3. 
verify + """ + fields = [ + cf.gen_int64_field(name=df.pk_field, is_primary=True, auto_id=auto_id), + cf.gen_int64_field(name=df.int_field), + cf.gen_float_field(name=df.float_field), + cf.gen_double_field(name=df.double_field), + cf.gen_json_field(name=df.json_field), + cf.gen_array_field(name=df.array_int_field, element_type=DataType.INT64), + cf.gen_array_field(name=df.array_float_field, element_type=DataType.FLOAT), + cf.gen_array_field(name=df.array_string_field, element_type=DataType.VARCHAR), + cf.gen_array_field(name=df.array_bool_field, element_type=DataType.BOOL), + cf.gen_float_vec_field(name=df.vec_field, dim=dim), + ] + data_fields = [f.name for f in fields if not f.to_dict().get("auto_id", False)] + files = prepare_bulk_insert_parquet_files( + minio_endpoint=self.minio_endpoint, + bucket_name=self.bucket_name, + rows=entities, + dim=dim, + data_fields=data_fields, + file_nums=file_nums, + array_length=array_len, + enable_dynamic_field=enable_dynamic_field, + force=True, + ) + self._connect() + c_name = cf.gen_unique_str("bulk_insert") + schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id, enable_dynamic_field=enable_dynamic_field) + self.collection_wrap.init_collection(c_name, schema=schema) + + # import data + t0 = time.time() + task_id, _ = self.utility_wrap.do_bulk_insert( + collection_name=c_name, files=files + ) + logging.info(f"bulk insert task ids:{task_id}") + success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed( + task_ids=[task_id], timeout=300 + ) + tt = time.time() - t0 + log.info(f"bulk insert state:{success} in {tt} with states:{states}") + assert success + num_entities = self.collection_wrap.num_entities + log.info(f" collection entities: {num_entities}") + assert num_entities == entities + # verify imported data is available for search + index_params = ct.default_index + self.collection_wrap.create_index( + field_name=df.vec_field, index_params=index_params + ) + self.collection_wrap.load() + log.info(f"wait for load finished and be ready for search") + time.sleep(2) + # log.info(f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}") + search_data = cf.gen_vectors(1, dim) + search_params = ct.default_search_params + res, _ = self.collection_wrap.search( + search_data, + df.vec_field, + param=search_params, + limit=1, + output_fields=["*"], + check_task=CheckTasks.check_search_results, + check_items={"nq": 1, "limit": 1}, + ) + for hit in res: + for r in hit: + fields_from_search = r.fields.keys() + for f in fields: + assert f.name in fields_from_search + if enable_dynamic_field: + assert "name" in fields_from_search + assert "address" in fields_from_search + + @pytest.mark.tags(CaseLabel.L3) + @pytest.mark.parametrize("auto_id", [True]) + @pytest.mark.parametrize("dim", [128]) # 128 + @pytest.mark.parametrize("entities", [1000]) # 1000 + @pytest.mark.parametrize("file_nums", [0, 10]) + @pytest.mark.parametrize("array_len", [1]) + def test_with_wrong_parquet_file_num(self, auto_id, dim, entities, file_nums, array_len): + """ + collection schema 1: [pk, int64, float64, string float_vector] + data file: vectors.parquet and uid.parquet, + Steps: + 1. create collection + 2. import data + 3. 
verify failure, because only one file is allowed + """ + fields = [ + cf.gen_int64_field(name=df.pk_field, is_primary=True, auto_id=auto_id), + cf.gen_int64_field(name=df.int_field), + cf.gen_float_field(name=df.float_field), + cf.gen_double_field(name=df.double_field), + cf.gen_array_field(name=df.array_int_field, element_type=DataType.INT64), + cf.gen_array_field(name=df.array_float_field, element_type=DataType.FLOAT), + cf.gen_array_field(name=df.array_string_field, element_type=DataType.VARCHAR), + cf.gen_array_field(name=df.array_bool_field, element_type=DataType.BOOL), + cf.gen_float_vec_field(name=df.vec_field, dim=dim), + ] + data_fields = [f.name for f in fields if not f.to_dict().get("auto_id", False)] + files = prepare_bulk_insert_parquet_files( + minio_endpoint=self.minio_endpoint, + bucket_name=self.bucket_name, + rows=entities, + dim=dim, + data_fields=data_fields, + file_nums=file_nums, + array_length=array_len, + force=True, + ) + self._connect() + c_name = cf.gen_unique_str("bulk_insert") + schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id) + self.collection_wrap.init_collection(c_name, schema=schema) + + # import data + error = {} + if file_nums == 0: + error = {ct.err_code: 1100, ct.err_msg: "import request is empty: invalid parameter"} + if file_nums > 1: + error = {ct.err_code: 65535, ct.err_msg: "for JSON or parquet file, each task only accepts one file"} + self.utility_wrap.do_bulk_insert( + collection_name=c_name, files=files, + check_task=CheckTasks.err_res, check_items=error + ) @pytest.mark.tags(CaseLabel.L3) @pytest.mark.parametrize("auto_id", [True, False]) @@ -781,6 +1110,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert): cf.gen_string_field(name=df.string_field, is_partition_key=(par_key_field == df.string_field)), cf.gen_bool_field(name=df.bool_field), cf.gen_float_field(name=df.float_field), + cf.gen_array_field(name=df.array_int_field, element_type=DataType.INT64) ] schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id) self.collection_wrap.init_collection(c_name, schema=schema, num_partitions=10)
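For reference, a minimal standalone sketch of the parquet layout that gen_parquet_files and gen_data_by_data_field above produce for a bulk-insert file: each schema field becomes one DataFrame column, array fields are numpy arrays nested inside a pandas Series (written by pyarrow as list columns), and the dynamic field is a JSON-encoded "$meta" string column. The row count, dimension and field subset below are illustrative assumptions rather than values used by the tests.

    import json
    import os
    import random

    import numpy as np
    import pandas as pd
    from faker import Faker

    fake = Faker()
    rows, dim, array_length = 4, 8, 3
    out_dir = "/tmp/bulk_insert_data"   # same data_source directory as the helpers above
    os.makedirs(out_dir, exist_ok=True)

    columns = {
        # scalar and vector columns, named after the DataField constants
        "uid": pd.Series([np.int64(i) for i in range(rows)]),
        "float_scalar": pd.Series([np.float32(random.random()) for _ in range(rows)]),
        "vectors": pd.Series([[random.random() for _ in range(dim)] for _ in range(rows)]),
        # array field: one numpy array per row, stored as a pyarrow list column
        "array_int": pd.Series(
            [np.array([random.randint(-999999, 9999999) for _ in range(array_length)],
                      dtype=np.dtype("int64")) for _ in range(rows)]),
        # JSON field and dynamic "$meta" field are JSON-encoded strings
        "json": pd.Series([json.dumps({"name": fake.name()}) for _ in range(rows)]),
        "$meta": pd.Series([json.dumps({str(i): i, "address": fake.address()})
                            for i in range(rows)]),
    }
    df = pd.DataFrame(columns)
    df.to_parquet(f"{out_dir}/example.parquet", engine="pyarrow")
    print(pd.read_parquet(f"{out_dir}/example.parquet").dtypes)

As in gen_parquet_files, every generated parquet file carries all fields of one batch, which is why test_with_wrong_parquet_file_num expects the import request to be rejected when file_nums is 0 or greater than 1.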