verify data_queryable state after bulk load (#17601)
Signed-off-by: yanliang567 <yanliang.qiao@zilliz.com>
commit bea6fcd5fd
parent 53a49a9255
@@ -50,12 +50,23 @@ class ApiUtilityWrapper:
                     continue
                 else:
                     state, _ = self.get_bulk_load_state(task_id, task_timeout, using, **kwargs)
-                    if state.state_name == target_state:
-                        successes[task_id] = state
-                    elif state.state_name == BulkLoadStates.BulkLoadFailed:
-                        fails[task_id] = state
+                    if target_state == BulkLoadStates.BulkLoadDataQueryable:
+                        if state.data_queryable is True:
+                            successes[task_id] = True
+                        else:
+                            in_progress[task_id] = False
+                    elif target_state == BulkLoadStates.BulkLoadDataIndexed:
+                        if state.data_indexed is True:
+                            successes[task_id] = True
+                        else:
+                            in_progress[task_id] = False
                     else:
-                        in_progress[task_id] = state
+                        if state.state_name == target_state:
+                            successes[task_id] = state
+                        elif state.state_name == BulkLoadStates.BulkLoadFailed:
+                            fails[task_id] = state
+                        else:
+                            in_progress[task_id] = state
             end = time.time()
             if timeout is not None:
                 if end - start > timeout:
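
The hunk above makes `wait_for_bulk_load_tasks_completed` dispatch on the requested `target_state` instead of only comparing `state.state_name`. Below is a minimal, self-contained sketch of that dispatch, runnable outside the test framework; the `TaskState` stub, the string constant values, and the `check_task` helper are hypothetical, and only the `BulkLoadStates` names and the branching mirror the diff:

    from dataclasses import dataclass

    class BulkLoadStates:
        # Names mirror the constants used in the diff; the string values
        # here are illustrative stand-ins.
        BulkLoadPersisted = "BulkLoadPersisted"
        BulkLoadFailed = "BulkLoadFailed"
        BulkLoadDataQueryable = "BulkLoadDataQueryable"
        BulkLoadDataIndexed = "BulkLoadDataIndexed"

    @dataclass
    class TaskState:
        state_name: str
        data_queryable: bool = False
        data_indexed: bool = False

    def check_task(state, target_state, task_id, successes, fails, in_progress):
        # Queryable/indexed targets are judged by boolean flags on the state,
        # not by state_name: a task can be persisted long before its data is
        # queryable or indexed.
        if target_state == BulkLoadStates.BulkLoadDataQueryable:
            if state.data_queryable:
                successes[task_id] = True
            else:
                in_progress[task_id] = False
        elif target_state == BulkLoadStates.BulkLoadDataIndexed:
            if state.data_indexed:
                successes[task_id] = True
            else:
                in_progress[task_id] = False
        else:
            # Original behaviour: match on the reported state name.
            if state.state_name == target_state:
                successes[task_id] = state
            elif state.state_name == BulkLoadStates.BulkLoadFailed:
                fails[task_id] = state
            else:
                in_progress[task_id] = state

    # A persisted-but-not-yet-queryable task stays pending for a queryable target:
    successes, fails, in_progress = {}, {}, {}
    check_task(TaskState("BulkLoadPersisted"), BulkLoadStates.BulkLoadDataQueryable,
               1, successes, fails, in_progress)
    assert in_progress == {1: False} and not successes and not fails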
@@ -215,3 +226,4 @@ class ApiUtilityWrapper:
         res, _ = api_request([self.ut.mkts_from_hybridts, hybridts, milliseconds, delta])
         return res
 
+
@@ -1,6 +1,5 @@
 import logging
 import time
-from time import sleep
 import pytest
 import random
 from base.client_base import TestcaseBase
@@ -196,7 +195,7 @@ class TestBulkLoad(TestcaseBase):
         logging.info(f"bulk load task ids:{task_ids}")
         success, state = self.utility_wrap.\
             wait_for_bulk_load_tasks_completed(task_ids=task_ids,
-                                               target_state=BulkLoadStates.BulkLoadPersisted,
+                                               target_state=BulkLoadStates.BulkLoadDataQueryable,
                                                timeout=30)
         tt = time.time() - t0
         log.info(f"bulk load state:{success} in {tt}")
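
The `timeout=30` passed here pairs with the wrapper's `end - start > timeout` check in the first hunk. For illustration, a generic poll-until-ready helper in the same spirit (the `poll_until` name, signature, and the `get_state` reference are hypothetical, not part of the repository):

    import time

    def poll_until(predicate, timeout=30.0, interval=1.0):
        """Poll predicate() until it returns True or `timeout` seconds pass."""
        start = time.time()
        while True:
            if predicate():
                return True
            if time.time() - start > timeout:
                return False  # give up, mirroring the wrapper's timeout branch
            time.sleep(interval)

    # e.g.: poll_until(lambda: get_state().data_queryable, timeout=30)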
@@ -205,8 +204,6 @@ class TestBulkLoad(TestcaseBase):
         assert m_partition.num_entities == entities
         assert self.collection_wrap.num_entities == entities
 
-        # TODO: remove sleep when indexed state ready
-        sleep(5)
         res, _ = self.utility_wrap.index_building_progress(c_name)
         exp_res = {'total_rows': entities, 'indexed_rows': entities}
         assert res == exp_res
@@ -230,7 +227,7 @@ class TestBulkLoad(TestcaseBase):
     @pytest.mark.parametrize("row_based", [True, False])
     @pytest.mark.parametrize("auto_id", [True, False])
     @pytest.mark.parametrize("dim", [16])
-    @pytest.mark.parametrize("entities", [200])
+    @pytest.mark.parametrize("entities", [2000])
     @pytest.mark.xfail(reason="issue #16890")
     def test_binary_vector_only(self, row_based, auto_id, dim, entities):
         """
@@ -269,20 +266,18 @@ class TestBulkLoad(TestcaseBase):
         # TODO: Update to BulkLoadDataIndexed when issue #16889 fixed
         success, _ = self.utility_wrap.wait_for_bulk_load_tasks_completed(
             task_ids=task_ids,
-            target_state=BulkLoadStates.BulkLoadPersisted,
+            target_state=BulkLoadStates.BulkLoadDataIndexed,
             timeout=30)
         tt = time.time() - t0
         log.info(f"bulk load state:{success} in {tt}")
         assert success
 
-        # # verify build index status
-        # sleep(10)
-        # # TODO: verify build index after issue #16890 fixed
+        # TODO: verify build index after #16890 fixed
         # res, _ = self.utility_wrap.index_building_progress(c_name)
         # exp_res = {'total_rows': entities, 'indexed_rows': entities}
         # assert res == exp_res
 
-        # TODO: verify num entities
+        # verify num entities
         assert self.collection_wrap.num_entities == entities
 
         # load collection
@@ -539,8 +534,9 @@ class TestBulkLoad(TestcaseBase):
     @pytest.mark.parametrize("auto_id", [True, False])  # True, False
     @pytest.mark.parametrize("dim", [16])  # 16
     @pytest.mark.parametrize("entities", [100])  # 3000
-    @pytest.mark.parametrize("file_nums", [32])  # 10  # TODO: more after issue #17152 fixed
+    @pytest.mark.parametrize("file_nums", [32])  # 10
     @pytest.mark.parametrize("multi_folder", [True, False])  # True, False
+    @pytest.mark.xfail(reason="issue #17600")
     # TODO: reason="BulkloadIndexed cannot be reached for issue #16889")
     def test_float_vector_from_multi_files(self, row_based, auto_id, dim, entities, file_nums, multi_folder):
         """
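
The `@pytest.mark.xfail(reason="issue #17600")` marker added here (and in the two hunks below) tells pytest the test is expected to fail until the linked issue is fixed: it reports as XFAIL rather than breaking the run, and as XPASS if it unexpectedly passes. A minimal standalone example of the marker's semantics:

    import pytest

    @pytest.mark.xfail(reason="tracking a known bug")
    def test_known_breakage():
        # Reported as XFAIL, not as a failure, while the bug is open.
        assert 1 == 2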
@@ -827,6 +823,7 @@ class TestBulkLoad(TestcaseBase):
     @pytest.mark.parametrize("dim", [128])  # 128
     @pytest.mark.parametrize("entities", [1000])  # 1000
     @pytest.mark.parametrize("with_int_field", [True, False])
+    @pytest.mark.xfail(reason="issue #17600")
     def test_with_uid_n_int_numpy(self, auto_id, dim, entities, with_int_field):
         """
         collection schema 1: [pk, float_vector]
@@ -881,8 +878,9 @@ class TestBulkLoad(TestcaseBase):
     @pytest.mark.tags(CaseLabel.L3)
     @pytest.mark.parametrize("auto_id", [True, False])
     @pytest.mark.parametrize("dim", [6])
-    @pytest.mark.parametrize("entities", [1000])
+    @pytest.mark.parametrize("entities", [2000])
     @pytest.mark.parametrize("file_nums", [10])
+    @pytest.mark.xfail(reason="issue #17597")
     def test_multi_numpy_files_from_diff_folders(self, auto_id, dim, entities, file_nums):
         """
         collection schema 1: [pk, float_vector]
@@ -909,16 +907,18 @@ class TestBulkLoad(TestcaseBase):
         data_fields = [df.vec_field]
         if not auto_id:
             data_fields.append(df.pk_field)
+        task_ids = []
         for i in range(file_nums):
             files = prepare_bulk_load_numpy_files(rows=entities, dim=dim,
                                                   data_fields=data_fields,
                                                   file_nums=1, force=True)
-            task_ids, _ = self.utility_wrap.bulk_load(collection_name=c_name,
+            task_id, _ = self.utility_wrap.bulk_load(collection_name=c_name,
                                                       row_based=row_based,
                                                       files=files)
+            task_ids.append(task_id[0])
         success, states = self.utility_wrap.\
             wait_for_bulk_load_tasks_completed(task_ids=task_ids,
-                                               target_state=BulkLoadStates.BulkLoadPersisted,
+                                               target_state=BulkLoadStates.BulkLoadDataQueryable,
                                                timeout=30)
         log.info(f"bulk load state:{success}")
 
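
Besides switching the wait target to `BulkLoadDataQueryable`, this hunk fixes task-id bookkeeping: `bulk_load` returns a list of task ids per call, and the old code rebound `task_ids` on every loop iteration, so only the last file's task was awaited. A minimal illustration of the corrected accumulation (the `bulk_load` stub below is hypothetical):

    def bulk_load(files):
        # Hypothetical stand-in: one import task id per call, returned as a list.
        return [len(files) * 1000 + hash(files[0]) % 1000]

    file_batches = [["a.npy"], ["b.npy"], ["c.npy"]]
    task_ids = []                       # accumulate across iterations
    for files in file_batches:
        task_id = bulk_load(files)      # a one-element list per call
        task_ids.append(task_id[0])     # keep every task, not just the last
    assert len(task_ids) == len(file_batches)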
@@ -927,7 +927,6 @@ class TestBulkLoad(TestcaseBase):
         assert self.collection_wrap.num_entities == entities * file_nums
 
         # verify search and query
-        # sleep(10)
         search_data = cf.gen_vectors(1, dim)
         search_params = ct.default_search_params
         res, _ = self.collection_wrap.search(search_data, df.vec_field,
@@ -1903,4 +1902,4 @@ class TestBulkLoadAdvanced(TestcaseBase):
                                              "limit": 1})
         # self.collection_wrap.query(expr=f"id in {ids}")
 
-    """Validate data consistency and availability during import"""
\ No newline at end of file
+    """Validate data consistency and availability during import"""