test: add testcases contain growing segments (#37262)
Some checks failed
Code Checker / Code Checker AMD64 Ubuntu 22.04 (push) Waiting to run
Code Checker / Code Checker Amazonlinux 2023 (push) Waiting to run
Code Checker / Code Checker rockylinux8 (push) Waiting to run
Mac Code Checker / Code Checker MacOS 13 (push) Waiting to run
Build and test / Build and test AMD64 Ubuntu 22.04 (push) Waiting to run
Build and test / UT for Cpp (push) Blocked by required conditions
Build and test / UT for Go (push) Blocked by required conditions
Build and test / Integration Test (push) Blocked by required conditions
Build and test / Upload Code Coverage (push) Blocked by required conditions
all-contributors / contributor (push) Has been cancelled
Update Knowhere Commit / update-knowhere-commit (push) Has been cancelled

/kind improvement

---------

Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>
This commit is contained in:
zhuwenxing 2024-11-25 10:40:34 +08:00 committed by GitHub
parent fbb68ca370
commit 0b9edb62a9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 326 additions and 0 deletions

View File

@ -2506,6 +2506,199 @@ class TestSearchWithFullTextSearch(TestcaseBase):
assert len( assert len(
overlap) > 0, f"query text: {search_text}, \ntext: {result_text} \n overlap: {overlap} \n word freq a: {word_freq_a} \n word freq b: {word_freq_b}\n result: {r}" overlap) > 0, f"query text: {search_text}, \ntext: {result_text} \n overlap: {overlap} \n word freq a: {word_freq_a} \n word freq b: {word_freq_b}\n result: {r}"
@pytest.mark.tags(CaseLabel.L0)
@pytest.mark.parametrize("nq", [2])
@pytest.mark.parametrize("empty_percent", [0.5])
@pytest.mark.parametrize("enable_partition_key", [True])
@pytest.mark.parametrize("enable_inverted_index", [True])
@pytest.mark.parametrize("index_type", ["SPARSE_INVERTED_INDEX"])
@pytest.mark.parametrize("expr", ["id_range"])
@pytest.mark.parametrize("tokenizer", ["standard"])
@pytest.mark.parametrize("offset", [0])
def test_full_text_search_for_growing_segment(
self, offset, tokenizer, expr, enable_inverted_index, enable_partition_key, empty_percent, index_type, nq
):
"""
target: test full text search
method: 1. enable full text search and insert data with varchar
2. search with text
3. verify the result
expected: full text search successfully and result is correct
"""
analyzer_params = {
"tokenizer": tokenizer,
}
dim = 128
fields = [
FieldSchema(name="id", dtype=DataType.INT64, is_primary=True),
FieldSchema(
name="word",
dtype=DataType.VARCHAR,
max_length=65535,
enable_analyzer=True,
analyzer_params=analyzer_params,
is_partition_key=enable_partition_key,
),
FieldSchema(
name="sentence",
dtype=DataType.VARCHAR,
max_length=65535,
enable_analyzer=True,
analyzer_params=analyzer_params,
),
FieldSchema(
name="paragraph",
dtype=DataType.VARCHAR,
max_length=65535,
enable_analyzer=True,
analyzer_params=analyzer_params,
),
FieldSchema(
name="text",
dtype=DataType.VARCHAR,
max_length=65535,
enable_analyzer=True,
enable_match=True,
analyzer_params=analyzer_params,
),
FieldSchema(name="emb", dtype=DataType.FLOAT_VECTOR, dim=dim),
FieldSchema(name="text_sparse_emb", dtype=DataType.SPARSE_FLOAT_VECTOR),
]
schema = CollectionSchema(fields=fields, description="test collection")
bm25_function = Function(
name="text_bm25_emb",
function_type=FunctionType.BM25,
input_field_names=["text"],
output_field_names=["text_sparse_emb"],
params={},
)
schema.add_function(bm25_function)
data_size = 5000
collection_w = self.init_collection_wrap(
name=cf.gen_unique_str(prefix), schema=schema
)
fake = fake_en
if tokenizer == "jieba":
language = "zh"
fake = fake_zh
else:
language = "en"
data = [
{
"id": i,
"word": fake.word().lower() if random.random() >= empty_percent else "",
"sentence": fake.sentence().lower() if random.random() >= empty_percent else "",
"paragraph": fake.paragraph().lower() if random.random() >= empty_percent else "",
"text": fake.text().lower() if random.random() >= empty_percent else "",
"emb": [random.random() for _ in range(dim)],
}
for i in range(data_size)
]
df = pd.DataFrame(data)
log.info(f"dataframe\n{df}")
texts = df["text"].to_list()
word_freq = cf.analyze_documents(texts, language=language)
most_freq_word = word_freq.most_common(10)
tokens = [item[0] for item in most_freq_word]
if len(tokens) == 0:
log.info(f"empty tokens, add a dummy token")
tokens = ["dummy"]
collection_w.create_index(
"emb",
{"index_type": "HNSW", "metric_type": "L2", "params": {"M": 16, "efConstruction": 500}},
)
collection_w.create_index(
"text_sparse_emb",
{
"index_type": index_type,
"metric_type": "BM25",
"params": {
"bm25_k1": 1.5,
"bm25_b": 0.75,
}
}
)
if enable_inverted_index:
collection_w.create_index("text", {"index_type": "INVERTED"})
collection_w.load()
batch_size = 5000
for i in range(0, len(df), batch_size):
collection_w.insert(
data[i: i + batch_size]
if i + batch_size < len(df)
else data[i: len(df)]
)
limit = 100
search_data = [fake.text().lower() + " " + random.choice(tokens) for _ in range(nq)]
if expr == "text_match":
filter = f"TextMatch(text, '{tokens[0]}')"
res, _ = collection_w.query(
expr=filter,
)
elif expr == "id_range":
filter = f"id < {data_size // 2}"
else:
filter = ""
res, _ = collection_w.query(
expr=filter,
limit=limit,
)
candidates_num = len(res)
log.info(f"search data: {search_data}")
# use offset = 0 to get all the results
full_res_list, _ = collection_w.search(
data=search_data,
anns_field="text_sparse_emb",
expr=filter,
param={},
limit=limit + offset,
offset=0,
output_fields=["id", "text", "text_sparse_emb"])
full_res_id_list = []
for i in range(nq):
res = full_res_list[i]
tmp = []
for r in res:
tmp.append(r.id)
full_res_id_list.append(tmp)
res_list, _ = collection_w.search(
data=search_data,
anns_field="text_sparse_emb",
expr=filter,
param={},
limit=limit,
offset=offset,
output_fields=["id", "text", "text_sparse_emb"])
# verify correctness
for i in range(nq):
assert 0 < len(res_list[i]) <= min(limit, candidates_num)
search_text = search_data[i]
log.info(f"res: {res_list[i]}")
res = res_list[i]
for j in range(len(res)):
r = res[j]
_id = r.id
# get the first id of the result in which position is larger than offset
if j == 0:
first_id = _id
p = full_res_id_list[i].index(first_id)
assert 1.2 * offset >= p >= offset * 0.8
result_text = r.text
# verify search result satisfies the filter
if expr == "text_match":
assert tokens[0] in result_text
if expr == "id_range":
assert _id < data_size // 2
# verify search result has overlap with search text
overlap, word_freq_a, word_freq_b = cf.check_token_overlap(search_text, result_text, language=language)
log.info(f"overlap {overlap}")
assert len(
overlap) > 0, f"query text: {search_text}, \ntext: {result_text} \n overlap: {overlap} \n word freq a: {word_freq_a} \n word freq b: {word_freq_b}\n result: {r}"
@pytest.mark.tags(CaseLabel.L1) @pytest.mark.tags(CaseLabel.L1)
@pytest.mark.parametrize("nq", [2]) @pytest.mark.parametrize("nq", [2])
@pytest.mark.parametrize("empty_percent", [0]) @pytest.mark.parametrize("empty_percent", [0])

View File

@ -4766,6 +4766,139 @@ class TestQueryTextMatch(TestcaseBase):
@pytest.mark.tags(CaseLabel.L0)
@pytest.mark.parametrize("enable_partition_key", [True])
@pytest.mark.parametrize("enable_inverted_index", [True])
@pytest.mark.parametrize("tokenizer", ["jieba", "standard"])
@pytest.mark.xfail(reason="unstable case, issue: https://github.com/milvus-io/milvus/issues/36962")
def test_query_text_match_with_growing_segment(
self, tokenizer, enable_inverted_index, enable_partition_key
):
"""
target: test text match normal
method: 1. enable text match and insert data with varchar
2. get the most common words and query with text match
3. verify the result
expected: text match successfully and result is correct
"""
tokenizer_params = {
"tokenizer": tokenizer,
}
dim = 128
fields = [
FieldSchema(name="id", dtype=DataType.INT64, is_primary=True),
FieldSchema(
name="word",
dtype=DataType.VARCHAR,
max_length=65535,
enable_tokenizer=True,
enable_match=True,
is_partition_key=enable_partition_key,
tokenizer_params=tokenizer_params,
),
FieldSchema(
name="sentence",
dtype=DataType.VARCHAR,
max_length=65535,
enable_tokenizer=True,
enable_match=True,
tokenizer_params=tokenizer_params,
),
FieldSchema(
name="paragraph",
dtype=DataType.VARCHAR,
max_length=65535,
enable_tokenizer=True,
enable_match=True,
tokenizer_params=tokenizer_params,
),
FieldSchema(
name="text",
dtype=DataType.VARCHAR,
max_length=65535,
enable_tokenizer=True,
enable_match=True,
tokenizer_params=tokenizer_params,
),
FieldSchema(name="emb", dtype=DataType.FLOAT_VECTOR, dim=dim),
]
schema = CollectionSchema(fields=fields, description="test collection")
data_size = 3000
collection_w = self.init_collection_wrap(
name=cf.gen_unique_str(prefix), schema=schema
)
fake = fake_en
if tokenizer == "jieba":
language = "zh"
fake = fake_zh
else:
language = "en"
collection_w.create_index(
"emb",
{"index_type": "IVF_SQ8", "metric_type": "L2", "params": {"nlist": 64}},
)
if enable_inverted_index:
collection_w.create_index("word", {"index_type": "INVERTED"})
collection_w.load()
# generate growing segment
data = [
{
"id": i,
"word": fake.word().lower(),
"sentence": fake.sentence().lower(),
"paragraph": fake.paragraph().lower(),
"text": fake.text().lower(),
"emb": [random.random() for _ in range(dim)],
}
for i in range(data_size)
]
df = pd.DataFrame(data)
log.info(f"dataframe\n{df}")
batch_size = 5000
for i in range(0, len(df), batch_size):
collection_w.insert(
data[i: i + batch_size]
if i + batch_size < len(df)
else data[i: len(df)]
)
# analyze the croup
text_fields = ["word", "sentence", "paragraph", "text"]
wf_map = {}
for field in text_fields:
wf_map[field] = cf.analyze_documents(df[field].tolist(), language=language)
# query single field for one token
for field in text_fields:
token = wf_map[field].most_common()[0][0]
expr = f"TextMatch({field}, '{token}')"
log.info(f"expr: {expr}")
res, _ = collection_w.query(expr=expr, output_fields=["id", field])
assert len(res) > 0
log.info(f"res len {len(res)}")
for r in res:
assert token in r[field]
# verify inverted index
if enable_inverted_index:
if field == "word":
expr = f"{field} == '{token}'"
log.info(f"expr: {expr}")
res, _ = collection_w.query(expr=expr, output_fields=["id", field])
log.info(f"res len {len(res)}")
for r in res:
assert r[field] == token
# query single field for multi-word
for field in text_fields:
# match top 10 most common words
top_10_tokens = []
for word, count in wf_map[field].most_common(10):
top_10_tokens.append(word)
string_of_top_10_words = " ".join(top_10_tokens)
expr = f"TextMatch({field}, '{string_of_top_10_words}')"
log.info(f"expr {expr}")
res, _ = collection_w.query(expr=expr, output_fields=["id", field])
log.info(f"res len {len(res)}")
for r in res:
assert any([token in r[field] for token in top_10_tokens])
@pytest.mark.skip("unimplemented") @pytest.mark.skip("unimplemented")
@pytest.mark.tags(CaseLabel.L0) @pytest.mark.tags(CaseLabel.L0)
def test_query_text_match_custom_analyzer(self): def test_query_text_match_custom_analyzer(self):