Update bulk_insert to do_bulk_insert (#20277)

Signed-off-by: yanliang567 <yanliang.qiao@zilliz.com>

yanliang567 authored 2022-11-02 20:35:35 +08:00, committed by GitHub
parent c2744bdf49
commit 3ac87fe025
3 changed files with 160 additions and 155 deletions
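
In brief: the ApiUtilityWrapper test method bulk_insert is renamed to do_bulk_insert to match the pymilvus utility API, and the tests now treat its return value as a single task id, wrapping it in a one-element list when waiting for completion (the multi-file cases below are either skipped or now expect an error). A minimal sketch of the updated call pattern, illustrative only and reusing the fixtures that appear in the test changes below (self.utility_wrap, c_name, files):

    # hypothetical snippet showing the renamed call; not part of this diff
    task_id, _ = self.utility_wrap.do_bulk_insert(
        collection_name=c_name,   # target collection created by the test
        partition_name=None,      # optional partition
        files=files,              # import files prepared by the test helpers
    )
    # do_bulk_insert now yields a single task id, so it is wrapped in a list
    success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
        task_ids=[task_id], timeout=90
    )
    assert success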

View File

@ -18,13 +18,13 @@ class ApiUtilityWrapper:
ut = utility
role = None
def bulk_insert(self, collection_name, files="", partition_name=None, timeout=None,
using="default", check_task=None, check_items=None, **kwargs):
def do_bulk_insert(self, collection_name, files="", partition_name=None, timeout=None,
using="default", check_task=None, check_items=None, **kwargs):
working_tasks = self.get_bulk_insert_working_list()
log.info(f"before bulk load, there are {len(working_tasks)} working tasks")
log.info(f"files to load: {files}")
func_name = sys._getframe().f_code.co_name
res, is_succ = api_request([self.ut.bulk_insert, collection_name,
res, is_succ = api_request([self.ut.do_bulk_insert, collection_name,
files, partition_name, timeout, using], **kwargs)
check_result = ResponseChecker(res, func_name, check_task, check_items, is_succ,
collection_name=collection_name, using=using).run()
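
For expected failures, the renamed wrapper keeps the same ResponseChecker hook, so a test can assert on the error response instead of waiting for the task. A sketch of that usage, with the error text copied from the multi-file test change further down (hypothetical snippet, not part of this diff):

    # negative path: the checker validates the returned error code and message
    err_msg = "row-based import, only allow one JSON file each time"
    task_id, _ = self.utility_wrap.do_bulk_insert(
        collection_name=c_name, files=files,
        check_task=CheckTasks.err_res,
        check_items={"err_code": 1, "err_msg": err_msg},
    )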

View File

@ -116,14 +116,14 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
self.collection_wrap.init_collection(c_name, schema=schema)
# import data
t0 = time.time()
task_ids, _ = self.utility_wrap.bulk_insert(
task_id, _ = self.utility_wrap.do_bulk_insert(
collection_name=c_name,
partition_name=None,
files=files,
)
logging.info(f"bulk insert task ids:{task_ids}")
logging.info(f"bulk insert task id:{task_id}")
success, _ = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
task_ids=task_ids, timeout=90
task_ids=[task_id], timeout=90
)
tt = time.time() - t0
log.info(f"bulk insert state:{success} in {tt}")
@ -198,12 +198,12 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
self.collection_wrap.init_collection(c_name, schema=schema)
# import data
t0 = time.time()
task_ids, _ = self.utility_wrap.bulk_insert(
task_id, _ = self.utility_wrap.do_bulk_insert(
collection_name=c_name, files=files
)
logging.info(f"bulk insert task ids:{task_ids}")
logging.info(f"bulk insert task ids:{task_id}")
completed, _ = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
task_ids=task_ids, timeout=90
task_ids=[task_id], timeout=90
)
tt = time.time() - t0
log.info(f"bulk insert state:{completed} in {tt}")
@ -294,14 +294,14 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
# import data into the partition
t0 = time.time()
task_ids, _ = self.utility_wrap.bulk_insert(
task_id, _ = self.utility_wrap.do_bulk_insert(
collection_name=c_name,
partition_name=p_name,
files=files,
)
logging.info(f"bulk insert task ids:{task_ids}")
logging.info(f"bulk insert task ids:{task_id}")
success, state = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
task_ids=task_ids, timeout=90
task_ids=[task_id], timeout=90
)
tt = time.time() - t0
log.info(f"bulk insert state:{success} in {tt}")
@ -386,11 +386,11 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
self.collection_wrap.load()
# import data
t0 = time.time()
task_ids, _ = self.utility_wrap.bulk_insert(collection_name=c_name,
files=files)
logging.info(f"bulk insert task ids:{task_ids}")
task_id, _ = self.utility_wrap.do_bulk_insert(collection_name=c_name,
files=files)
logging.info(f"bulk insert task ids:{task_id}")
success, _ = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
task_ids=task_ids, timeout=90
task_ids=[task_id], timeout=90
)
tt = time.time() - t0
log.info(f"bulk insert state:{success} in {tt}")
@ -478,12 +478,12 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
self.collection_wrap.load()
# import data
t0 = time.time()
task_ids, _ = self.utility_wrap.bulk_insert(
task_id, _ = self.utility_wrap.do_bulk_insert(
collection_name=c_name, files=files
)
logging.info(f"bulk insert task ids:{task_ids}")
logging.info(f"bulk insert task ids:{task_id}")
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
task_ids=task_ids, timeout=90
task_ids=[task_id], timeout=90
)
tt = time.time() - t0
log.info(f"bulk insert state:{success} in {tt}")
@ -590,12 +590,12 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
self.collection_wrap.num_entities
# import data
t0 = time.time()
task_ids, _ = self.utility_wrap.bulk_insert(
task_id, _ = self.utility_wrap.do_bulk_insert(
collection_name=c_name, files=files
)
logging.info(f"bulk insert task ids:{task_ids}")
logging.info(f"bulk insert task ids:{task_id}")
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
task_ids=task_ids, timeout=90
task_ids=[task_id], timeout=90
)
tt = time.time() - t0
log.info(f"bulk insert state:{success} in {tt}")
@ -679,12 +679,12 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
self.collection_wrap.load()
# import data
t0 = time.time()
task_ids, _ = self.utility_wrap.bulk_insert(
task_id, _ = self.utility_wrap.do_bulk_insert(
collection_name=c_name, files=files
)
logging.info(f"bulk insert task ids:{task_ids}")
logging.info(f"bulk insert task ids:{task_id}")
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
task_ids=task_ids, timeout=90
task_ids=[task_id], timeout=90
)
tt = time.time() - t0
log.info(f"bulk insert state:{success} in {tt}")
@ -781,12 +781,12 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
self.collection_wrap.load()
# import data
t0 = time.time()
task_ids, _ = self.utility_wrap.bulk_insert(
task_id, _ = self.utility_wrap.do_bulk_insert(
collection_name=c_name, files=files
)
logging.info(f"bulk insert task ids:{task_ids}")
logging.info(f"bulk insert task ids:{task_id}")
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
task_ids=task_ids, timeout=90
task_ids=[task_id], timeout=90
)
tt = time.time() - t0
log.info(f"bulk insert state:{success} in {tt}")
@ -885,51 +885,54 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
self.collection_wrap.load()
# import data
t0 = time.time()
task_ids, _ = self.utility_wrap.bulk_insert(
collection_name=c_name, files=files
err_msg = "row-based import, only allow one JSON file each time"
task_id, _ = self.utility_wrap.do_bulk_insert(
collection_name=c_name, files=files,
check_task=CheckTasks.err_res, check_items={"err_code": 1, "err_msg": err_msg},
)
logging.info(f"bulk insert task ids:{task_ids}")
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
task_ids=task_ids, timeout=90
)
tt = time.time() - t0
log.info(f"bulk insert state:{success} in {tt}")
if not is_row_based:
assert not success
failed_reason = "is duplicated" # "the field xxx is duplicated"
for state in states.values():
assert state.state_name in ["Failed", "Failed and cleaned"]
assert failed_reason in state.infos.get("failed_reason", "")
else:
assert success
num_entities = self.collection_wrap.num_entities
log.info(f" collection entities: {num_entities}")
assert num_entities == entities * file_nums
# verify index built
res, _ = self.utility_wrap.index_building_progress(c_name)
exp_res = {'total_rows': entities * file_nums, 'indexed_rows': entities * file_nums}
assert res == exp_res
# verify search and query
log.info(f"wait for load finished and be ready for search")
time.sleep(5)
nq = 5
topk = 1
search_data = cf.gen_vectors(nq, dim)
search_params = ct.default_search_params
res, _ = self.collection_wrap.search(
search_data,
df.vec_field,
param=search_params,
limit=topk,
check_task=CheckTasks.check_search_results,
check_items={"nq": nq, "limit": topk},
)
for hits in res:
ids = hits.ids
results, _ = self.collection_wrap.query(expr=f"{df.pk_field} in {ids}")
assert len(results) == len(ids)
# logging.info(f"bulk insert task ids:{task_id}")
# success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
# task_ids=[task_id], timeout=90
# )
# tt = time.time() - t0
# log.info(f"bulk insert state:{success} in {tt}")
# if not is_row_based:
# assert not success
# failed_reason = "is duplicated" # "the field xxx is duplicated"
# for state in states.values():
# assert state.state_name in ["Failed", "Failed and cleaned"]
# assert failed_reason in state.infos.get("failed_reason", "")
# else:
# assert success
# num_entities = self.collection_wrap.num_entities
# log.info(f" collection entities: {num_entities}")
# assert num_entities == entities * file_nums
#
# # verify index built
# res, _ = self.utility_wrap.index_building_progress(c_name)
# exp_res = {'total_rows': entities * file_nums, 'indexed_rows': entities * file_nums}
# assert res == exp_res
#
# # verify search and query
# log.info(f"wait for load finished and be ready for search")
# time.sleep(5)
# nq = 5
# topk = 1
# search_data = cf.gen_vectors(nq, dim)
# search_params = ct.default_search_params
# res, _ = self.collection_wrap.search(
# search_data,
# df.vec_field,
# param=search_params,
# limit=topk,
# check_task=CheckTasks.check_search_results,
# check_items={"nq": nq, "limit": topk},
# )
# for hits in res:
# ids = hits.ids
# results, _ = self.collection_wrap.query(expr=f"{df.pk_field} in {ids}")
# assert len(results) == len(ids)
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("is_row_based", [True])
@ -1016,12 +1019,12 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
# import data
t0 = time.time()
task_ids, _ = self.utility_wrap.bulk_insert(
task_id, _ = self.utility_wrap.do_bulk_insert(
collection_name=c_name, files=files
)
logging.info(f"bulk insert task ids:{task_ids}")
logging.info(f"bulk insert task ids:{task_id}")
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
task_ids=task_ids, timeout=90
task_ids=[task_id], timeout=90
)
tt = time.time() - t0
log.info(f"bulk insert state:{success} in {tt}")
@ -1110,12 +1113,12 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
schema = cf.gen_collection_schema(fields=fields, auto_id=False)
self.collection_wrap.init_collection(c_name, schema=schema)
# import data
task_ids, _ = self.utility_wrap.bulk_insert(
task_id, _ = self.utility_wrap.do_bulk_insert(
collection_name=c_name, files=files
)
logging.info(f"bulk insert task ids:{task_ids}")
logging.info(f"bulk insert task ids:{task_id}")
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
task_ids=task_ids, timeout=90
task_ids=[task_id], timeout=90
)
log.info(f"bulk insert state:{success}")
assert success
@ -1174,12 +1177,12 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id)
self.collection_wrap.init_collection(c_name, schema=schema)
# import data
task_ids, _ = self.utility_wrap.bulk_insert(
task_id, _ = self.utility_wrap.do_bulk_insert(
collection_name=c_name, files=files
)
logging.info(f"bulk insert task ids:{task_ids}")
logging.info(f"bulk insert task ids:{task_id}")
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
task_ids=task_ids, timeout=90
task_ids=[task_id], timeout=90
)
log.info(f"bulk insert state:{success}")
assert success
@ -1245,12 +1248,12 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
# import data
t0 = time.time()
task_ids, _ = self.utility_wrap.bulk_insert(
task_id, _ = self.utility_wrap.do_bulk_insert(
collection_name=c_name, files=files
)
logging.info(f"bulk insert task ids:{task_ids}")
logging.info(f"bulk insert task ids:{task_id}")
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
task_ids=task_ids, timeout=90
task_ids=[task_id], timeout=90
)
tt = time.time() - t0
log.info(f"bulk insert state:{success} in {tt}")
@ -1324,12 +1327,12 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
file_nums=1,
force=True,
)
task_id, _ = self.utility_wrap.bulk_insert(
task_id, _ = self.utility_wrap.do_bulk_insert(
collection_name=c_name, files=files
)
task_ids.append(task_id[0])
task_ids.append(task_id)
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
task_ids=task_ids, timeout=90
task_ids=[task_id], timeout=90
)
log.info(f"bulk insert state:{success}")
@ -1380,14 +1383,14 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
# import data
t0 = time.time()
task_ids, _ = self.utility_wrap.bulk_insert(
task_id, _ = self.utility_wrap.do_bulk_insert(
collection_name=c_name,
partition_name=None,
files=files,
)
logging.info(f"bulk insert task ids:{task_ids}")
logging.info(f"bulk insert task ids:{task_id}")
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
task_ids=task_ids, timeout=90
task_ids=[task_id], timeout=90
)
assert not success
failed_reason = f"failed to get file size of '{files[0]}'"
@ -1429,14 +1432,14 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
self.collection_wrap.init_collection(c_name, schema=schema)
# import data
task_ids, _ = self.utility_wrap.bulk_insert(
task_id, _ = self.utility_wrap.do_bulk_insert(
collection_name=c_name,
partition_name=None,
files=files,
)
logging.info(f"bulk insert task ids:{task_ids}")
logging.info(f"bulk insert task ids:{task_id}")
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
task_ids=task_ids, timeout=90
task_ids=[task_id], timeout=90
)
assert not success
failed_reason = "JSON parser: row count is 0"
@ -1449,7 +1452,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
@pytest.mark.parametrize("auto_id", [True, False])
@pytest.mark.parametrize("dim", [8]) # 8
@pytest.mark.parametrize("entities", [100]) # 100
@pytest.mark.xfail(reason="issue https://github.com/milvus-io/milvus/issues/19658")
# @pytest.mark.xfail(reason="issue https://github.com/milvus-io/milvus/issues/19658")
def test_wrong_file_type(self, is_row_based, auto_id, dim, entities):
"""
collection schema: [pk, float_vector]
@ -1491,22 +1494,24 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
log.info(schema)
# import data
t0 = time.time()
task_ids, _ = self.utility_wrap.bulk_insert(
task_id, _ = self.utility_wrap.do_bulk_insert(
collection_name=c_name,
partition_name=None,
files=files,
)
logging.info(f"bulk insert task ids:{task_ids}")
logging.info(f"bulk insert task ids:{task_id}")
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
task_ids=task_ids, timeout=90
task_ids=[task_id], timeout=90
)
tt = time.time() - t0
log.info(f"bulk insert state:{success} in {tt}")
assert not success
failed_reason = "unsupported file type"
failed_reason = f"the file '{files[0]}' has no corresponding field in collection"
failed_reason2 = "unsupportted file type"
for state in states.values():
assert state.state_name in ["Failed", "Failed and cleaned"]
assert failed_reason in state.infos.get("failed_reason", "")
assert failed_reason in state.infos.get("failed_reason", "") or \
failed_reason2 in state.infos.get("failed_reason", "")
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("is_row_based", [True])
@ -1543,14 +1548,14 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
self.collection_wrap.init_collection(c_name, schema=schema)
# import data
task_ids, _ = self.utility_wrap.bulk_insert(
task_id, _ = self.utility_wrap.do_bulk_insert(
collection_name=c_name,
partition_name=None,
files=files,
)
logging.info(f"bulk insert task ids:{task_ids}")
logging.info(f"bulk insert task ids:{task_id}")
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
task_ids=task_ids, timeout=90
task_ids=[task_id], timeout=90
)
assert not success
if is_row_based:
@ -1597,14 +1602,14 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
self.collection_wrap.init_collection(c_name, schema=schema)
# import data
t0 = time.time()
task_ids, _ = self.utility_wrap.bulk_insert(
task_id, _ = self.utility_wrap.do_bulk_insert(
collection_name=c_name,
partition_name=None,
files=files,
)
logging.info(f"bulk insert task ids:{task_ids}")
logging.info(f"bulk insert task ids:{task_id}")
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
task_ids=task_ids, timeout=90
task_ids=[task_id], timeout=90
)
tt = time.time() - t0
log.info(f"bulk insert state:{success} in {tt}")
@ -1654,14 +1659,14 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
self.collection_wrap.init_collection(c_name, schema=schema)
# import data
t0 = time.time()
task_ids, _ = self.utility_wrap.bulk_insert(
task_id, _ = self.utility_wrap.do_bulk_insert(
collection_name=c_name,
partition_name=None,
files=files,
)
logging.info(f"bulk insert task ids:{task_ids}")
logging.info(f"bulk insert task ids:{task_id}")
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
task_ids=task_ids, timeout=90
task_ids=[task_id], timeout=90
)
tt = time.time() - t0
log.info(f"bulk insert state:{success} in {tt}")
@ -1713,14 +1718,14 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
# import data
t0 = time.time()
task_ids, _ = self.utility_wrap.bulk_insert(
task_id, _ = self.utility_wrap.do_bulk_insert(
collection_name=c_name,
partition_name="",
files=files,
)
logging.info(f"bulk insert task ids:{task_ids}")
logging.info(f"bulk insert task ids:{task_id}")
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
task_ids=task_ids, timeout=90
task_ids=[task_id], timeout=90
)
tt = time.time() - t0
log.info(f"bulk insert state:{success} in {tt}")
@ -1765,12 +1770,12 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id)
self.collection_wrap.init_collection(c_name, schema=schema)
# import data
task_ids, _ = self.utility_wrap.bulk_insert(
task_id, _ = self.utility_wrap.do_bulk_insert(
collection_name=c_name, files=files
)
logging.info(f"bulk insert task ids:{task_ids}")
logging.info(f"bulk insert task ids:{task_id}")
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
task_ids=task_ids, timeout=90
task_ids=[task_id], timeout=90
)
log.info(f"bulk insert state:{success}")
assert not success
@ -1802,7 +1807,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
c_name = cf.gen_unique_str("bulk_insert")
# import data into a non existing collection
err_msg = f"can't find collection: {c_name}"
task_ids, _ = self.utility_wrap.bulk_insert(
task_id, _ = self.utility_wrap.do_bulk_insert(
collection_name=c_name,
files=files,
check_task=CheckTasks.err_res,
@ -1841,7 +1846,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
# import data into a non existing partition
p_name = "non_existing"
err_msg = f" partition {p_name} does not exist"
task_ids, _ = self.utility_wrap.bulk_insert(
task_id, _ = self.utility_wrap.do_bulk_insert(
collection_name=c_name,
partition_name=p_name,
files=files,
@ -1886,12 +1891,12 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id)
self.collection_wrap.init_collection(c_name, schema=schema)
# import data
task_ids, _ = self.utility_wrap.bulk_insert(
task_id, _ = self.utility_wrap.do_bulk_insert(
collection_name=c_name, files=files
)
logging.info(f"bulk insert task ids:{task_ids}")
logging.info(f"bulk insert task ids:{task_id}")
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
task_ids=task_ids, timeout=90
task_ids=[task_id], timeout=90
)
log.info(f"bulk insert state:{success}")
assert not success
@ -1909,6 +1914,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
@pytest.mark.parametrize("dim", [16])
@pytest.mark.parametrize("entities", [300])
@pytest.mark.parametrize("file_nums", [10]) # max task nums 32? need improve
@pytest.mark.skip(reason="not support multiple files now")
def test_float_vector_one_of_files_fail(
self, is_row_based, auto_id, dim, entities, file_nums
):
@ -1960,12 +1966,12 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
# import data
t0 = time.time()
task_ids, _ = self.utility_wrap.bulk_insert(
task_id, _ = self.utility_wrap.do_bulk_insert(
collection_name=c_name, files=files
)
logging.info(f"bulk insert task ids:{task_ids}")
logging.info(f"bulk insert task ids:{task_id}")
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
task_ids=task_ids, timeout=90
task_ids=[task_id], timeout=90
)
tt = time.time() - t0
log.info(f"bulk insert state:{success} in {tt}")
@ -2012,12 +2018,12 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
# import data
t0 = time.time()
task_ids, _ = self.utility_wrap.bulk_insert(
task_id, _ = self.utility_wrap.do_bulk_insert(
collection_name=c_name, files=files
)
logging.info(f"bulk insert task ids:{task_ids}")
logging.info(f"bulk insert task ids:{task_id}")
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
task_ids=task_ids, timeout=90
task_ids=[task_id], timeout=90
)
tt = time.time() - t0
log.info(f"bulk insert state:{success} in {tt}")
@ -2068,12 +2074,12 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
log.info(schema)
# import data
t0 = time.time()
task_ids, _ = self.utility_wrap.bulk_insert(
task_id, _ = self.utility_wrap.do_bulk_insert(
collection_name=c_name, files=files
)
logging.info(f"bulk insert task ids:{task_ids}")
logging.info(f"bulk insert task ids:{task_id}")
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
task_ids=task_ids, timeout=90
task_ids=[task_id], timeout=90
)
tt = time.time() - t0
log.info(f"bulk insert state:{success} in {tt}")
@ -2119,12 +2125,12 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
self.collection_wrap.init_collection(c_name, schema=schema)
# import data
t0 = time.time()
task_ids, _ = self.utility_wrap.bulk_insert(
task_id, _ = self.utility_wrap.do_bulk_insert(
collection_name=c_name, files=files
)
logging.info(f"bulk insert task ids:{task_ids}")
logging.info(f"bulk insert task ids:{task_id}")
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
task_ids=task_ids, timeout=90
task_ids=[task_id], timeout=90
)
tt = time.time() - t0
log.info(f"bulk insert state:{success} in {tt}")
@ -2174,12 +2180,12 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
schema = cf.gen_collection_schema(fields=fields, auto_id=False)
self.collection_wrap.init_collection(c_name, schema=schema)
# import data
task_ids, _ = self.utility_wrap.bulk_insert(
task_id, _ = self.utility_wrap.do_bulk_insert(
collection_name=c_name, files=files
)
logging.info(f"bulk insert task ids:{task_ids}")
logging.info(f"bulk insert task ids:{task_id}")
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
task_ids=task_ids, timeout=90
task_ids=[task_id], timeout=90
)
log.info(f"bulk insert state:{success}")
assert not success
@ -2229,12 +2235,12 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id)
self.collection_wrap.init_collection(c_name, schema=schema)
# import data
task_ids, _ = self.utility_wrap.bulk_insert(
task_id, _ = self.utility_wrap.do_bulk_insert(
collection_name=c_name, files=files
)
logging.info(f"bulk insert task ids:{task_ids}")
logging.info(f"bulk insert task ids:{task_id}")
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
task_ids=task_ids, timeout=90
task_ids=[task_id], timeout=90
)
log.info(f"bulk insert state:{success}")
assert not success
@ -2295,11 +2301,11 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
self.collection_wrap.init_collection(c_name, schema=schema)
t0 = time.time()
task_ids, _ = self.utility_wrap.bulk_insert(
task_id, _ = self.utility_wrap.do_bulk_insert(
collection_name=c_name, files=files
)
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
task_ids=task_ids, timeout=90
task_ids=[task_id], timeout=90
)
tt = time.time() - t0
log.info(f"bulk insert state:{success} in {tt}")
@ -2350,12 +2356,12 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id)
self.collection_wrap.init_collection(c_name, schema=schema)
# import data
task_ids, _ = self.utility_wrap.bulk_insert(
task_id, _ = self.utility_wrap.do_bulk_insert(
collection_name=c_name, files=files
)
logging.info(f"bulk insert task ids:{task_ids}")
logging.info(f"bulk insert task ids:{task_id}")
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
task_ids=task_ids, timeout=90
task_ids=[task_id], timeout=90
)
log.info(f"bulk insert state:{success}")
assert not success
@ -2412,12 +2418,12 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id)
self.collection_wrap.init_collection(c_name, schema=schema)
# import data
task_ids, _ = self.utility_wrap.bulk_insert(
task_id, _ = self.utility_wrap.do_bulk_insert(
collection_name=c_name, files=files
)
logging.info(f"bulk insert task ids:{task_ids}")
logging.info(f"bulk insert task ids:{task_id}")
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
task_ids=task_ids, timeout=90
task_ids=[task_id], timeout=90
)
log.info(f"bulk insert state:{success}")
assert not success
@ -2488,12 +2494,12 @@ class TestBulkInsertAdvanced(TestcaseBaseBulkInsert):
check_flag = False
break
task_ids, _ = self.utility_wrap.bulk_insert(
task_id, _ = self.utility_wrap.do_bulk_insert(
collection_name=c_name, files=files
)
logging.info(f"bulk insert task ids:{task_ids}")
logging.info(f"bulk insert task ids:{task_id}")
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
task_ids=task_ids, timeout=180
task_ids=[task_id], timeout=180
)
tt = time.time() - t0
log.info(

View File

@ -118,15 +118,14 @@ class TestBulkInsertTaskClean(TestcaseBaseBulkInsert):
self.collection_wrap.init_collection(c_name, schema=schema)
# import data
t0 = time.time()
task_ids, _ = self.utility_wrap.bulk_insert(
task_id, _ = self.utility_wrap.do_bulk_insert(
collection_name=c_name,
partition_name=None,
# is_row_based=is_row_based,
files=files,
)
logging.info(f"bulk insert task ids:{task_ids}")
logging.info(f"bulk insert task ids:{task_id}")
success, _ = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
task_ids=task_ids, timeout=90
task_ids=[task_id], timeout=90
)
tt = time.time() - t0
log.info(f"bulk insert state:{success} in {tt}")
@ -220,15 +219,15 @@ class TestBulkInsertTaskClean(TestcaseBaseBulkInsert):
self.collection_wrap.init_collection(c_name, schema=schema)
# import data
t0 = time.time()
task_ids, _ = self.utility_wrap.bulk_insert(
task_id, _ = self.utility_wrap.do_bulk_insert(
collection_name=c_name,
partition_name=None,
is_row_based=is_row_based,
files=files,
)
logging.info(f"bulk insert task ids:{task_ids}")
logging.info(f"bulk insert task ids:{task_id}")
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
task_ids=task_ids, timeout=90
task_ids=[task_id], timeout=90
)
tt = time.time() - t0
log.info(f"bulk insert state:{success} in {tt}")
@ -245,7 +244,7 @@ class TestBulkInsertTaskClean(TestcaseBaseBulkInsert):
log.info(f" collection entities: {num_entities}")
assert num_entities == 0
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
task_ids=task_ids, timeout=90
task_ids=[task_id], timeout=90
)
assert not success
for state in states.values():