milvus/tests/python_client/testcases/test_partition_20.py
binbin 2fdd534bd4
[skip e2e] Fix typos in test comments (#15059)
Signed-off-by: Binbin Lv <binbin.lv@zilliz.com>
2022-01-10 13:09:13 +08:00

814 lines
32 KiB
Python

from os import name
import threading
import pytest
from base.partition_wrapper import ApiPartitionWrapper
from base.client_base import TestcaseBase
from common import common_func as cf
from common import common_type as ct
from common.common_type import CaseLabel, CheckTasks
from common.code_mapping import PartitionErrorMessage
prefix = "partition_"
class TestPartitionParams(TestcaseBase):
""" Test case of partition interface in parameters"""
@pytest.mark.tags(CaseLabel.L0)
def test_partition_default(self):
"""
target: verify create a partition
method: create a partition
expected: create successfully
"""
# create collection
collection_w = self.init_collection_wrap()
# create partition
partition_name = cf.gen_unique_str(prefix)
description = cf.gen_unique_str("desc_")
self.init_partition_wrap(collection_w, partition_name,
description=description,
check_task=CheckTasks.check_partition_property,
check_items={"name": partition_name, "description": description,
"is_empty": True, "num_entities": 0}
)
# check that the partition has been created
assert collection_w.has_partition(partition_name)[0]
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.parametrize("partition_name", [""])
def test_partition_empty_name(self, partition_name):
"""
target: verify create a partition with empty name
method: create a partition with empty name
expected: raise exception
"""
# create a collection
collection_w = self.init_collection_wrap()
# create partition
self.partition_wrap.init_partition(collection_w.collection, partition_name,
check_task=CheckTasks.err_res,
check_items={ct.err_code: 1,
ct.err_msg: "Partition name should not be empty"})
@pytest.mark.tags(CaseLabel.L2)
def test_partition_empty_description(self):
"""
target: verify create a partition with empty description
method: create a partition with empty description
expected: create successfully
"""
# create collection
collection_w = self.init_collection_wrap()
# init partition
partition_name = cf.gen_unique_str(prefix)
description = ""
self.init_partition_wrap(collection_w, partition_name,
description=description,
check_task=CheckTasks.check_partition_property,
check_items={"name": partition_name, "description": description,
"is_empty": True, "num_entities": 0}
)
# check that the partition has been created
assert collection_w.has_partition(partition_name)[0]
@pytest.mark.tags(CaseLabel.L2)
def test_partition_max_description_length(self):
"""
target: verify create a partition with 255 length name and 1024 length description
method: create a partition with 255 length name and 1024 length description
expected: create successfully
"""
# create collection
collection_w = self.init_collection_wrap()
# init partition
partition_name = cf.gen_str_by_length(255)
description = cf.gen_str_by_length(2048)
self.init_partition_wrap(collection_w, partition_name,
description=description,
check_task=CheckTasks.check_partition_property,
check_items={"name": partition_name, "description": description,
"is_empty": True}
)
@pytest.mark.tags(CaseLabel.L1)
def test_partition_dup_name(self):
"""
target: verify create partitions with duplicate names
method: create partitions with duplicate names
expected: 1. create successfully
2. the same partition returned with diff object ids
"""
# create a collection
collection_w = self.init_collection_wrap()
# create two partitions
partition_name = cf.gen_unique_str(prefix)
description = cf.gen_unique_str()
partition_w1 = self.init_partition_wrap(collection_w, partition_name, description)
partition_w2 = self.init_partition_wrap(collection_w, partition_name, description)
# public check func to be extracted
assert id(partition_w1.partition) != id(partition_w2.partition)
assert partition_w1.name == partition_w2.name
assert partition_w1.description == partition_w2.description
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.parametrize("description", ct.get_invalid_strs)
def test_partition_special_chars_description(self, description):
"""
target: verify create a partition with special characters in description
method: create a partition with special characters in description
expected: create successfully
"""
# create collection
collection_w = self.init_collection_wrap()
# create partition
partition_name = cf.gen_unique_str(prefix)
self.init_partition_wrap(collection_w, partition_name,
description=description,
check_task=CheckTasks.check_partition_property,
check_items={"name": partition_name, "description": description,
"is_empty": True, "num_entities": 0}
)
assert collection_w.has_partition(partition_name)[0]
@pytest.mark.tags(CaseLabel.L0)
def test_partition_default_name(self):
"""
target: verify create a partition with default name
method: 1. get the _default partition
2. create a partition with _default name
expected: the same partition returned
"""
# create collection
collection_w = self.init_collection_wrap()
# check that the default partition exists
assert collection_w.has_partition(ct.default_partition_name)[0]
# check that can get the _default partition
collection, _ = collection_w.partition(ct.default_partition_name)
# check that init the _default partition object
partition_w = self.init_partition_wrap(collection_w, ct.default_partition_name)
assert collection.name == partition_w.name
@pytest.mark.tags(CaseLabel.L2)
def test_partition_max_length_name(self):
"""
target: verify create a partition with max length(256) name
method: create a partition with max length name
expected: raise exception
"""
# create collection
collection_w = self.init_collection_wrap()
# create partition
partition_name = cf.gen_str_by_length(256)
self.partition_wrap.init_partition(collection_w.collection, partition_name,
check_task=CheckTasks.err_res,
check_items={ct.err_code: 1, 'err_msg': "is illegal"}
)
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.parametrize("partition_name", ct.get_invalid_strs)
def test_partition_invalid_name(self, partition_name):
"""
target: verify create a partition with invalid name
method: create a partition with invalid names
expected: raise exception
"""
# create collection
collection_w = self.init_collection_wrap()
# create partition
self.partition_wrap.init_partition(collection_w.collection, partition_name,
check_task=CheckTasks.err_res,
check_items={ct.err_code: 1, 'err_msg': "is illegal"}
)
# TODO: need an error code issue #5144 and assert independently
@pytest.mark.tags(CaseLabel.L2)
def test_partition_none_collection(self):
"""
target: verify create a partition with none collection
method: create a partition with none collection
expected: raise exception
"""
# create partition with collection is None
partition_name = cf.gen_unique_str(prefix)
self.partition_wrap.init_partition(collection=None, name=partition_name,
check_task=CheckTasks.err_res,
check_items={ct.err_code: 1,
ct.err_msg: "must be pymilvus.Collection"})
@pytest.mark.tags(CaseLabel.L1)
def test_partition_drop(self):
"""
target: verify drop a partition in one collection
method: 1. create a partition in one collection
2. drop the partition
expected: drop successfully
"""
# create collection
collection_w = self.init_collection_wrap()
# create partition
partition_name = cf.gen_unique_str(prefix)
partition_w = self.init_partition_wrap(collection_w, partition_name)
# check that the partition exists
assert collection_w.has_partition(partition_name)[0]
# drop partition
partition_w.drop()
# check that the partition not exists
assert not collection_w.has_partition(partition_name)[0]
@pytest.mark.tags(CaseLabel.L2)
def test_load_partiton_respectively(self):
"""
target: test release the partition after load partition
method: load partition1 and load another partition
expected: raise exception
"""
self._connect()
collection_w = self.init_collection_wrap()
partition_w1 = self.init_partition_wrap(collection_w)
partition_w2 = self.init_partition_wrap(collection_w)
partition_w1.insert(cf.gen_default_list_data())
partition_w2.insert(cf.gen_default_list_data())
partition_w1.load()
error = {ct.err_code: 1, ct.err_msg: f'load the partition after load collection is not supported'}
partition_w2.load(check_task=CheckTasks.err_res,
check_items=error)
@pytest.mark.tags(CaseLabel.L2)
def test_load_partitions_after_release(self):
"""
target: test release the partition after load partition
method: load partitions and release partitions
expected: no exception
"""
self._connect()
collection_w = self.init_collection_wrap()
partition_w1 = self.init_partition_wrap(collection_w, name="partition_w1")
partition_w2 = self.init_partition_wrap(collection_w, name="partition_w2")
partition_w1.insert(cf.gen_default_list_data())
partition_w2.insert(cf.gen_default_list_data())
partition_names = ["partition_w1", "partition_w2"]
collection_w.load(partition_names)
collection_w.release(partition_names)
@pytest.mark.tags(CaseLabel.L2)
def test_load_partition_after_load_partition(self):
"""
target: test release the partition after load partition
method: load partition1 and release the partition1
load partition2
expected: no exception
"""
self._connect()
collection_w = self.init_collection_wrap()
partition_w1 = self.init_partition_wrap(collection_w)
partition_w2 = self.init_partition_wrap(collection_w)
partition_w1.insert(cf.gen_default_list_data())
partition_w2.insert(cf.gen_default_list_data())
partition_w1.load()
partition_w1.release()
partition_w2.load()
@pytest.mark.tags(CaseLabel.L1)
def test_partition_release(self):
"""
target: verify release partition
method: 1. create a collection and two partitions
2. insert data into each partition
3. flush and load the partition1
4. release partition1
5. release partition2
expected: 1. the 1st partition is released
2. the 2nd partition is released
"""
# create collection
collection_w = self.init_collection_wrap()
# create two partitions
partition_w1 = self.init_partition_wrap(collection_w)
partition_w2 = self.init_partition_wrap(collection_w)
# insert data to two partition
partition_w1.insert(cf.gen_default_list_data())
partition_w2.insert(cf.gen_default_list_data())
# load two partitions
partition_w1.load()
# search partition1
search_vectors = cf.gen_vectors(1, ct.default_dim)
res1, _ = partition_w1.search(data=search_vectors,
anns_field=ct.default_float_vec_field_name,
params={"nprobe": 32}, limit=1)
assert len(res1) == 1
# release the first partition
partition_w1.release()
partition_w2.release()
# check result
res1, _ = partition_w1.search(data=search_vectors,
anns_field=ct.default_float_vec_field_name,
params={"nprobe": 32}, limit=1,
check_task=ct.CheckTasks.err_res,
check_items={ct.err_code: 1, ct.err_msg: "partitions have been released"})
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.parametrize("data", [cf.gen_default_dataframe_data(10),
cf.gen_default_list_data(10),
cf.gen_default_tuple_data(10)])
def test_partition_insert(self, data):
"""
target: verify insert entities multiple times
method: 1. create a collection and a partition
2. partition.insert(data)
3. insert data again
expected: insert data successfully
"""
nums = 10
# create collection
collection_w = self.init_collection_wrap()
# create partition
partition_name = cf.gen_unique_str(prefix)
partition_w = self.init_partition_wrap(collection_w, partition_name,
check_task=CheckTasks.check_partition_property,
check_items={"name": partition_name,
"is_empty": True, "num_entities": 0}
)
# insert data
partition_w.insert(data)
# self._connect().flush([collection_w.name]) # don't need flush for issue #5737
assert not partition_w.is_empty
assert partition_w.num_entities == nums
# insert data
partition_w.insert(data)
# self._connect().flush([collection_w.name])
assert not partition_w.is_empty
assert partition_w.num_entities == (nums + nums)
class TestPartitionOperations(TestcaseBase):
""" Test case of partition interface in operations """
@pytest.mark.tags(CaseLabel.L1)
def test_partition_dropped_collection(self):
"""
target: verify create partition against a dropped collection
method: 1. create a collection
2. drop collection
3. create partition in collection
expected: raise exception
"""
# create collection
collection_w = self.init_collection_wrap()
# drop collection
collection_w.drop()
# create partition failed
self.partition_wrap.init_partition(collection_w.collection, cf.gen_unique_str(prefix),
check_task=CheckTasks.err_res,
check_items={ct.err_code: 1, ct.err_msg: "can't find collection"})
@pytest.mark.tags(CaseLabel.L2)
def test_partition_same_name_in_diff_collections(self):
"""
target: verify create partitions with same name in diff collections
method: 1. create a partition in collection1
2. create a partition in collection2
expected: create successfully
"""
# create two collections
collection_w1 = self.init_collection_wrap()
collection_w2 = self.init_collection_wrap()
# create 2 partitions in 2 diff collections
partition_name = cf.gen_unique_str(prefix)
self.init_partition_wrap(collection_wrap=collection_w1, name=partition_name)
self.init_partition_wrap(collection_wrap=collection_w2, name=partition_name)
# check result
assert collection_w1.has_partition(partition_name)[0]
assert collection_w2.has_partition(partition_name)[0]
@pytest.mark.tags(CaseLabel.L2)
def test_partition_multi_partitions_in_collection(self):
"""
target: verify create multiple partitions in one collection
method: create multiple partitions in one collection
expected: create successfully
"""
# create collection
collection_w = self.init_collection_wrap()
for _ in range(10):
partition_name = cf.gen_unique_str(prefix)
# create partition with different names and check the partition exists
self.init_partition_wrap(collection_w, partition_name)
assert collection_w.has_partition(partition_name)[0]
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.skip(reason="skip temporarily for debug")
def test_partition_maximum_partitions(self):
"""
target: verify create maximum partitions
method: 1. create maximum partitions
2. create one more partition
expected: raise exception
"""
threads_num = 8
threads = []
def create_partition(collection, threads_n):
for _ in range(ct.max_partition_num // threads_n):
name = cf.gen_unique_str(prefix)
par_wrap = ApiPartitionWrapper()
par_wrap.init_partition(collection, name, check_task=CheckTasks.check_nothing)
collection_w = self.init_collection_wrap()
for _ in range(threads_num):
t = threading.Thread(target=create_partition, args=(collection_w.collection, threads_num))
threads.append(t)
t.start()
for t in threads:
t.join()
p_name = cf.gen_unique_str()
self.partition_wrap.init_partition(
collection_w.collection, p_name,
check_task=CheckTasks.err_res,
check_items={ct.err_code: 1,
ct.err_msg: "maximum partition's number should be limit to 4096"})
# TODO: Try to verify load collection with a large number of partitions. #11651
@pytest.mark.tags(CaseLabel.L0)
def test_partition_drop_default_partition(self):
"""
target: verify drop the _default partition
method: drop the _default partition
expected: raise exception
"""
# create collection
collection_w = self.init_collection_wrap()
# get the default partition
default_partition, _ = collection_w.partition(ct.default_partition_name)
partition_w = self.init_partition_wrap(collection_w, ct.default_partition_name)
assert default_partition.name == partition_w.name
# verify that drop partition with error
partition_w.drop(check_task=CheckTasks.err_res,
check_items={ct.err_code: 1, ct.err_msg: "default partition cannot be deleted"})
@pytest.mark.tags(CaseLabel.L1)
def test_partition_drop_partition_twice(self):
"""
target: verify drop the same partition twice
method: 1.create a partition with default schema
2. drop the partition
3. drop the same partition again
expected: raise exception for 2nd time
"""
# create collection
collection_w = self.init_collection_wrap()
# create partition
partition_name = cf.gen_unique_str(prefix)
partition_w = self.init_partition_wrap(collection_w, partition_name)
collection_w.has_partition(partition_name)
# drop partition
partition_w.drop()
assert not collection_w.has_partition(partition_name)[0]
# verify that drop the partition again with exception
partition_w.drop(check_task=CheckTasks.err_res,
check_items={ct.err_code: 1, ct.err_msg: PartitionErrorMessage.PartitionNotExist})
@pytest.mark.tags(CaseLabel.L2)
def test_partition_create_and_drop_multi_times(self):
"""
target: verify create and drop for times
method: 1. create a partition with default schema
2. drop the partition
3. loop #1 and #2 for times
expected: create and drop successfully
"""
# create collection
collection_w = self.init_collection_wrap()
# range for 5 times
partition_name = cf.gen_unique_str(prefix)
for i in range(5):
# create partition and check that the partition exists
partition_w = self.init_partition_wrap(collection_w, partition_name)
assert collection_w.has_partition(partition_name)[0]
# drop partition and check that the partition not exists
partition_w.drop()
assert not collection_w.has_partition(partition_name)[0]
@pytest.mark.tags(CaseLabel.L1)
def test_partition_drop_non_empty_partition(self):
"""
target: verify drop a partition which has data inserted
method: 1. create a partition with default schema
2. insert some data
3. drop the partition
expected: drop successfully
"""
# create collection
collection_w = self.init_collection_wrap()
# create partition
partition_name = cf.gen_unique_str(prefix)
partition_w = self.init_partition_wrap(collection_w, partition_name)
assert collection_w.has_partition(partition_name)[0]
# insert data to partition
partition_w.insert(cf.gen_default_dataframe_data())
# drop partition
partition_w.drop()
assert not collection_w.has_partition(partition_name)[0]
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.parametrize("data", [cf.gen_default_list_data(nb=3000)])
@pytest.mark.parametrize("index_param", cf.gen_simple_index())
def test_partition_drop_indexed_partition(self, data, index_param):
"""
target: verify drop an indexed partition
method: 1. create a partition
2. insert same data
3. create an index
4. drop the partition
expected: drop successfully
"""
# create collection
collection_w = self.init_collection_wrap()
# create partition
partition_name = cf.gen_unique_str(prefix)
partition_w = self.init_partition_wrap(collection_w, partition_name)
assert collection_w.has_partition(partition_name)[0]
# insert data to partition
ins_res, _ = partition_w.insert(data)
assert len(ins_res.primary_keys) == len(data[0])
# create index of collection
collection_w.create_index(ct.default_float_vec_field_name, index_param)
# drop partition
partition_w.drop()
assert not collection_w.has_partition(partition_name)[0]
@pytest.mark.tags(CaseLabel.L2)
def test_partition_release_empty_partition(self):
"""
target: verify release an empty partition
method: 1. create a partition
2. release the partition
expected: release successfully
"""
# create partition
partition_w = self.init_partition_wrap()
assert partition_w.is_empty
# release partition
partition_w.release()
# TODO: assert no more memory consumed
@pytest.mark.tags(CaseLabel.L2)
def test_partition_release_dropped_partition(self):
"""
target: verify release a dropped partition
method: 1. create a partition
2. drop the partition
3. release the partition
expected: raise exception
"""
# create partition
partition_w = self.init_partition_wrap()
# drop partition
partition_w.drop()
# release the dropped partition and check err response
partition_w.release(check_task=CheckTasks.err_res,
check_items={ct.err_code: 1, ct.err_msg: PartitionErrorMessage.PartitionNotExist})
@pytest.mark.tags(CaseLabel.L2)
def test_partition_release_dropped_collection(self):
"""
target: verify release a dropped collection
method: 1. create a collection and partition
2. drop the collection
3. release the partition
expected: raise exception
"""
# create collection
collection_w = self.init_collection_wrap()
# create partition
partition_name = cf.gen_unique_str(prefix)
partition_w = self.init_partition_wrap(collection_w, partition_name)
assert collection_w.has_partition(partition_name)[0]
# drop collection
collection_w.drop()
# release the partition and check err response
partition_w.release(check_task=CheckTasks.err_res,
check_items={ct.err_code: 1, ct.err_msg: "can't find collection"})
@pytest.mark.tags(CaseLabel.L1)
def test_partition_release_after_collection_released(self):
"""
target: verify release a partition after the collection released
method: 1. create a collection and partition
2. insert some data
3. release the collection
4. release the partition
expected: partition released successfully
"""
# create collection
collection_w = self.init_collection_wrap()
# create partition
partition_name = cf.gen_unique_str(prefix)
partition_w = self.init_partition_wrap(collection_w, partition_name)
assert collection_w.has_partition(partition_name)[0]
# insert data to partition
data = cf.gen_default_list_data()
partition_w.insert(data)
assert partition_w.num_entities == len(data[0])
assert collection_w.num_entities == len(data[0])
# load partition
partition_w.load()
# search of partition
search_vectors = cf.gen_vectors(1, ct.default_dim)
res_1, _ = partition_w.search(data=search_vectors,
anns_field=ct.default_float_vec_field_name,
params={"nprobe": 32}, limit=1)
assert len(res_1) == 1
# release collection
collection_w.release()
# search of partition
res_2, _ = partition_w.search(data=search_vectors,
anns_field=ct.default_float_vec_field_name,
params={"nprobe": 32}, limit=1,
check_task=ct.CheckTasks.err_res,
check_items={ct.err_code: 0,
ct.err_msg: "not loaded into memory"})
# release partition
partition_w.release()
@pytest.mark.tags(CaseLabel.L1)
def test_partition_insert_default_partition(self):
"""
target: verify insert data into _default partition
method: 1. create a collection
2. insert some data into _default partition
expected: insert successfully
"""
# create collection
collection_w = self.init_collection_wrap()
# get the default partition
partition_name = ct.default_partition_name
assert collection_w.has_partition(partition_name)[0]
partition_w = self.init_partition_wrap(collection_w, partition_name)
# insert data to partition
data = cf.gen_default_dataframe_data()
partition_w.insert(data)
# self._connect().flush([collection_w.name])
assert partition_w.num_entities == len(data)
@pytest.mark.tags(CaseLabel.L1)
def test_partition_insert_dropped_partition(self):
"""
target: verify insert data into a dropped partition
method: 1. create a collection
2. insert some data into a dropped partition
expected: raise exception
"""
# create partition
partition_w = self.init_partition_wrap()
# drop partition
partition_w.drop()
# insert data to partition
partition_w.insert(cf.gen_default_dataframe_data(),
check_task=CheckTasks.err_res,
check_items={ct.err_code: 1, ct.err_msg: "Partition not exist"})
# TODO: update the assert error
@pytest.mark.tags(CaseLabel.L1)
def test_partition_insert_dropped_collection(self):
"""
target: verify insert data into a dropped collection
method: 1. create a collection
2. insert some data into a dropped collection
expected: raise exception
"""
# create collection
collection_w = self.init_collection_wrap()
# create partition
partition_name = cf.gen_unique_str(prefix)
partition_w = self.init_partition_wrap(collection_w, partition_name)
assert collection_w.has_partition(partition_name)[0]
# drop collection
collection_w.drop()
# insert data to partition
partition_w.insert(cf.gen_default_dataframe_data(),
check_task=CheckTasks.err_res,
check_items={ct.err_code: 1, ct.err_msg: "None Type"})
@pytest.mark.tags(CaseLabel.L2)
def test_partition_insert_maximum_size_data(self):
"""
target: verify insert maximum size data(256M?) a time
method: 1. create a partition
2. insert maximum size data
expected: insert successfully
"""
# create collection
collection_w = self.init_collection_wrap()
# create partition
partition_w = self.init_partition_wrap(collection_w)
# insert data to partition
max_size = 100000 # TODO: clarify the max size of data
ins_res, _ = partition_w.insert(cf.gen_default_dataframe_data(max_size), timeout=40)
assert len(ins_res.primary_keys) == max_size
# self._connect().flush([collection_w.name])
assert partition_w.num_entities == max_size
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.parametrize("dim", [ct.default_dim - 1, ct.default_dim + 1])
def test_partition_insert_mismatched_dimensions(self, dim):
"""
target: verify insert maximum size data(256M?) a time
method: 1. create a collection with default dim
2. insert dismatch dim data
expected: raise exception
"""
# create partition
partition_w = self.init_partition_wrap()
data = cf.gen_default_list_data(nb=10, dim=dim)
# insert data to partition
partition_w.insert(data, check_task=CheckTasks.err_res,
check_items={ct.err_code: 1, ct.err_msg: "but entities field dim"})
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.parametrize("sync", [True, False])
def test_partition_insert_sync(self, sync):
"""
target: verify insert sync
method: 1. create a partition
2. insert data in sync
expected: insert successfully
"""
pass