fix: pattern match uses incorrect raw data (#30764)

issue: https://github.com/milvus-io/milvus/issues/30687
We store all the varchar data in one contiguous buffer and use
string_view to locate each value quickly. Because the views are not
null-terminated, passing string_view.data() directly yields a pointer
that also covers all of the remaining varchar data, so the pattern
match runs against the wrong string.
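
For illustration, here is a minimal self-contained sketch (hypothetical data, not Milvus code) of why the `const char*` overload of `std::regex_match` misreads packed varchar data, and how the iterator-pair overload used by this fix bounds the match to the view:

```cpp
#include <cassert>
#include <regex>
#include <string>
#include <string_view>

int main() {
    // Two varchar values packed back-to-back with no separators:
    // "abc" followed by "abcdef".
    std::string buffer = "abcabcdef";
    std::string_view first(buffer.data(), 3);  // view over "abc" only

    std::regex reg("abc");

    // Buggy form: data() is treated as a null-terminated string, so the
    // match reads through the rest of the buffer ("abcabcdef") and fails.
    bool buggy = std::regex_match(first.data(), reg);

    // Fixed form (as in the patch): the iterator-pair overload restricts
    // the match to exactly the characters inside the view.
    bool fixed = std::regex_match(std::begin(first), std::end(first), reg);

    assert(!buggy && fixed);
    return 0;
}
```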

---------

Signed-off-by: longjiquan <jiquan.long@zilliz.com>
Jiquan Long 2024-02-22 19:56:52 +08:00 committed by GitHub
parent 375190e76e
commit e2330f02f8
3 changed files with 87 additions and 66 deletions


@@ -44,7 +44,8 @@ struct UnaryElementFuncForMatch {
// translate the pattern match in advance, which avoid computing it every loop.
std::regex reg(TranslatePatternMatchToRegex(val));
for (int i = 0; i < size; ++i) {
res[i] = std::regex_match(src[i].data(), reg);
res[i] =
std::regex_match(std::begin(src[i]), std::end(src[i]), reg);
}
} else if constexpr (std::is_same_v<T, std::string>) {
// translate the pattern match in advance, which avoid computing it every loop.


@@ -48,26 +48,34 @@ prepareSegmentSystemFieldData(const std::unique_ptr<SegmentSealed>& segment,
segment->LoadFieldData(TimestampFieldID, field_data_info);
}
int GetSearchResultBound(const SearchResult& search_result){
int
GetSearchResultBound(const SearchResult& search_result) {
int i = 0;
for(; i < search_result.seg_offsets_.size(); i++){
if(search_result.seg_offsets_[i]==INVALID_SEG_OFFSET) break;
for (; i < search_result.seg_offsets_.size(); i++) {
if (search_result.seg_offsets_[i] == INVALID_SEG_OFFSET)
break;
}
return i - 1;
}
void CheckGroupBySearchResult(const SearchResult& search_result, int topK, int nq, bool strict){
void
CheckGroupBySearchResult(const SearchResult& search_result,
int topK,
int nq,
bool strict) {
int total = topK * nq;
ASSERT_EQ(search_result.group_by_values_.size(), total);
ASSERT_EQ(search_result.seg_offsets_.size(), total);
ASSERT_EQ(search_result.distances_.size(), total);
ASSERT_TRUE(search_result.seg_offsets_[0]!=INVALID_SEG_OFFSET);
ASSERT_TRUE(search_result.seg_offsets_[0] != INVALID_SEG_OFFSET);
int res_bound = GetSearchResultBound(search_result);
ASSERT_TRUE(res_bound>0);
if(strict){
ASSERT_TRUE(res_bound==total-1);
ASSERT_TRUE(res_bound > 0);
if (strict) {
ASSERT_TRUE(res_bound == total - 1);
} else {
ASSERT_TRUE(res_bound==total-1||search_result.seg_offsets_[res_bound+1]==INVALID_SEG_OFFSET);
ASSERT_TRUE(res_bound == total - 1 ||
search_result.seg_offsets_[res_bound + 1] ==
INVALID_SEG_OFFSET);
}
}
@@ -414,7 +422,7 @@ TEST(GroupBY, SealedIndex) {
}
}
TEST(GroupBY, SealedData){
TEST(GroupBY, SealedData) {
using namespace milvus;
using namespace milvus::query;
using namespace milvus::segcore;
@@ -423,7 +431,7 @@ TEST(GroupBY, SealedData){
int dim = 64;
auto schema = std::make_shared<Schema>();
auto vec_fid = schema->AddDebugField(
"fakevec", DataType::VECTOR_FLOAT, dim, knowhere::metric::L2);
"fakevec", DataType::VECTOR_FLOAT, dim, knowhere::metric::L2);
auto int8_fid = schema->AddDebugField("int8", DataType::INT8);
auto int16_fid = schema->AddDebugField("int16", DataType::INT16);
auto int32_fid = schema->AddDebugField("int32", DataType::INT32);
@@ -443,7 +451,7 @@ TEST(GroupBY, SealedData){
auto info = FieldDataInfo(field_data.field_id(), N);
auto field_meta = fields.at(FieldId(field_id));
info.channel->push(
CreateFieldDataFromDataArray(N, &field_data, field_meta));
CreateFieldDataFromDataArray(N, &field_data, field_meta));
info.channel->close();
segment->LoadFieldData(FieldId(field_id), info);
@@ -466,14 +474,14 @@ TEST(GroupBY, SealedData){
>)";
auto plan_str = translate_text_plan_to_binary_plan(raw_plan);
auto plan =
CreateSearchPlanByExpr(*schema, plan_str.data(), plan_str.size());
CreateSearchPlanByExpr(*schema, plan_str.data(), plan_str.size());
auto num_queries = 1;
auto seed = 1024;
auto ph_group_raw = CreatePlaceholderGroup(num_queries, dim, seed);
auto ph_group =
ParsePlaceholderGroup(plan.get(), ph_group_raw.SerializeAsString());
ParsePlaceholderGroup(plan.get(), ph_group_raw.SerializeAsString());
auto search_result =
segment->Search(plan.get(), ph_group.get(), 1L << 63);
segment->Search(plan.get(), ph_group.get(), 1L << 63);
CheckGroupBySearchResult(*search_result, topK, num_queries, false);
auto& group_by_values = search_result->group_by_values_;
@@ -488,8 +496,8 @@ TEST(GroupBY, SealedData){
i8_set.insert(g_val);
auto distance = search_result->distances_.at(i);
ASSERT_TRUE(
lastDistance <=
distance); //distance should be decreased as metrics_type is L2
lastDistance <=
distance); //distance should be decreased as metrics_type is L2
lastDistance = distance;
} else {
//check padding
@@ -626,7 +634,7 @@ TEST(GroupBY, Reduce) {
DeleteSegment(c_segment_2);
}
TEST(GroupBY, GrowingRawData){
TEST(GroupBY, GrowingRawData) {
//0. set up growing segment
int dim = 128;
uint64_t seed = 512;
@@ -635,27 +643,30 @@ TEST(GroupBY, GrowingRawData){
auto int64_field_id = schema->AddDebugField("int64", DataType::INT64);
auto int32_field_id = schema->AddDebugField("int32", DataType::INT32);
auto vec_field_id = schema->AddDebugField(
"embeddings", DataType::VECTOR_FLOAT, 128, metric_type);
"embeddings", DataType::VECTOR_FLOAT, 128, metric_type);
schema->set_primary_field_id(int64_field_id);
auto config = SegcoreConfig::default_config();
config.set_chunk_rows(128);
config.set_enable_interim_segment_index(false);//no growing index, test brute force
config.set_enable_interim_segment_index(
false); //no growing index, test brute force
auto segment_growing = CreateGrowingSegment(schema, nullptr, 1, config);
auto segment_growing_impl = dynamic_cast<SegmentGrowingImpl*>(segment_growing.get());
auto segment_growing_impl =
dynamic_cast<SegmentGrowingImpl*>(segment_growing.get());
//1. prepare raw data in growing segment
int64_t rows_per_batch = 512;
int n_batch = 3;
for(int i = 0; i < n_batch; i++){
auto data_set = DataGen(schema, rows_per_batch);
auto offset = segment_growing_impl->PreInsert(rows_per_batch);
segment_growing_impl->Insert(offset, rows_per_batch, data_set.row_ids_.data(),
for (int i = 0; i < n_batch; i++) {
auto data_set = DataGen(schema, rows_per_batch);
auto offset = segment_growing_impl->PreInsert(rows_per_batch);
segment_growing_impl->Insert(offset,
rows_per_batch,
data_set.row_ids_.data(),
data_set.timestamps_.data(),
data_set.raw_);
}
//2. Search group by
const char* raw_plan = R"(vector_anns: <
field_id: 102
@@ -669,30 +680,32 @@ TEST(GroupBY, GrowingRawData){
>)";
auto plan_str = translate_text_plan_to_binary_plan(raw_plan);
auto plan = CreateSearchPlanByExpr(*schema, plan_str.data(), plan_str.size());
auto plan =
CreateSearchPlanByExpr(*schema, plan_str.data(), plan_str.size());
auto num_queries = 10;
auto topK = 100;
auto ph_group_raw = CreatePlaceholderGroup(num_queries, dim, seed);
auto ph_group =
ParsePlaceholderGroup(plan.get(), ph_group_raw.SerializeAsString());
ParsePlaceholderGroup(plan.get(), ph_group_raw.SerializeAsString());
auto search_result =
segment_growing_impl->Search(plan.get(), ph_group.get(), 1L << 63);
segment_growing_impl->Search(plan.get(), ph_group.get(), 1L << 63);
CheckGroupBySearchResult(*search_result, topK, num_queries, true);
auto& group_by_values = search_result->group_by_values_;
int idx = 0;
for (int i = 0; i < num_queries; i++){
for (int i = 0; i < num_queries; i++) {
std::unordered_set<int32_t> i32_set;
float lastDistance = 0.0;
for (int j = 0; j < topK; j++){
for (int j = 0; j < topK; j++) {
if (std::holds_alternative<int32_t>(group_by_values[idx])) {
int32_t g_val = std::get<int32_t>(group_by_values[idx]);
ASSERT_FALSE(i32_set.count(g_val) >0); //no repetition on groupBy field
ASSERT_FALSE(i32_set.count(g_val) >
0); //no repetition on groupBy field
i32_set.insert(g_val);
auto distance = search_result->distances_.at(idx);
ASSERT_TRUE(
lastDistance <=
distance); //distance should be decreased as metrics_type is L2
lastDistance <=
distance); //distance should be decreased as metrics_type is L2
lastDistance = distance;
} else {
//check padding
@@ -704,7 +717,7 @@ TEST(GroupBY, GrowingRawData){
}
}
TEST(GroupBY, GrowingIndex){
TEST(GroupBY, GrowingIndex) {
//0. set up growing segment
int dim = 128;
uint64_t seed = 512;
@@ -713,39 +726,43 @@ TEST(GroupBY, GrowingIndex){
auto int64_field_id = schema->AddDebugField("int64", DataType::INT64);
auto int32_field_id = schema->AddDebugField("int32", DataType::INT32);
auto vec_field_id = schema->AddDebugField(
"embeddings", DataType::VECTOR_FLOAT, 128, metric_type);
"embeddings", DataType::VECTOR_FLOAT, 128, metric_type);
schema->set_primary_field_id(int64_field_id);
std::map<std::string, std::string> index_params = {
{"index_type", "IVF_FLAT"},
{"metric_type", metric_type},
{"nlist", "128"}};
{"index_type", "IVF_FLAT"},
{"metric_type", metric_type},
{"nlist", "128"}};
std::map<std::string, std::string> type_params = {{"dim", "128"}};
FieldIndexMeta fieldIndexMeta(
vec_field_id, std::move(index_params), std::move(type_params));
std::map<FieldId, FieldIndexMeta> fieldMap = {{vec_field_id, fieldIndexMeta}};
vec_field_id, std::move(index_params), std::move(type_params));
std::map<FieldId, FieldIndexMeta> fieldMap = {
{vec_field_id, fieldIndexMeta}};
IndexMetaPtr metaPtr =
std::make_shared<CollectionIndexMeta>(10000, std::move(fieldMap));
std::make_shared<CollectionIndexMeta>(10000, std::move(fieldMap));
auto config = SegcoreConfig::default_config();
config.set_chunk_rows(128);
config.set_enable_interim_segment_index(true);//no growing index, test growing inter index
config.set_enable_interim_segment_index(
true); //no growing index, test growing inter index
config.set_nlist(128);
auto segment_growing = CreateGrowingSegment(schema, metaPtr, 1, config);
auto segment_growing_impl = dynamic_cast<SegmentGrowingImpl*>(segment_growing.get());
auto segment_growing_impl =
dynamic_cast<SegmentGrowingImpl*>(segment_growing.get());
//1. prepare raw data in growing segment
int64_t rows_per_batch = 1024;
int n_batch = 10;
for(int i = 0; i < n_batch; i++){
for (int i = 0; i < n_batch; i++) {
auto data_set = DataGen(schema, rows_per_batch);
auto offset = segment_growing_impl->PreInsert(rows_per_batch);
segment_growing_impl->Insert(offset, rows_per_batch, data_set.row_ids_.data(),
segment_growing_impl->Insert(offset,
rows_per_batch,
data_set.row_ids_.data(),
data_set.timestamps_.data(),
data_set.raw_);
}
//2. Search group by int32
const char* raw_plan = R"(vector_anns: <
field_id: 102
@@ -759,30 +776,32 @@ TEST(GroupBY, GrowingIndex){
>)";
auto plan_str = translate_text_plan_to_binary_plan(raw_plan);
auto plan = CreateSearchPlanByExpr(*schema, plan_str.data(), plan_str.size());
auto plan =
CreateSearchPlanByExpr(*schema, plan_str.data(), plan_str.size());
auto num_queries = 10;
auto topK = 100;
auto ph_group_raw = CreatePlaceholderGroup(num_queries, dim, seed);
auto ph_group =
ParsePlaceholderGroup(plan.get(), ph_group_raw.SerializeAsString());
ParsePlaceholderGroup(plan.get(), ph_group_raw.SerializeAsString());
auto search_result =
segment_growing_impl->Search(plan.get(), ph_group.get(), 1L << 63);
segment_growing_impl->Search(plan.get(), ph_group.get(), 1L << 63);
CheckGroupBySearchResult(*search_result, topK, num_queries, true);
auto& group_by_values = search_result->group_by_values_;
int idx = 0;
for (int i = 0; i < num_queries; i++){
for (int i = 0; i < num_queries; i++) {
std::unordered_set<int32_t> i32_set;
float lastDistance = 0.0;
for (int j = 0; j < topK; j++){
for (int j = 0; j < topK; j++) {
if (std::holds_alternative<int32_t>(group_by_values[idx])) {
int32_t g_val = std::get<int32_t>(group_by_values[idx]);
ASSERT_FALSE(i32_set.count(g_val) >0); //no repetition on groupBy field
ASSERT_FALSE(i32_set.count(g_val) >
0); //no repetition on groupBy field
i32_set.insert(g_val);
auto distance = search_result->distances_.at(idx);
ASSERT_TRUE(
lastDistance <=
distance); //distance should be decreased as metrics_type is L2
lastDistance <=
distance); //distance should be decreased as metrics_type is L2
lastDistance = distance;
} else {
//check padding


@@ -410,19 +410,19 @@ TEST(Indexing, Iterator) {
create_index_info.metric_type = knowhere::metric::L2;
create_index_info.index_type = knowhere::IndexEnum::INDEX_FAISS_IVFFLAT_CC;
create_index_info.index_engine_version =
knowhere::Version::GetCurrentVersion().VersionNumber();
knowhere::Version::GetCurrentVersion().VersionNumber();
auto index = milvus::index::IndexFactory::GetInstance().CreateIndex(
create_index_info, milvus::storage::FileManagerContext());
create_index_info, milvus::storage::FileManagerContext());
auto build_conf = knowhere::Json{
{knowhere::meta::METRIC_TYPE, knowhere::metric::L2},
{knowhere::meta::DIM, std::to_string(dim)},
{knowhere::indexparam::NLIST, "128"},
{knowhere::meta::METRIC_TYPE, knowhere::metric::L2},
{knowhere::meta::DIM, std::to_string(dim)},
{knowhere::indexparam::NLIST, "128"},
};
auto search_conf = knowhere::Json{
{knowhere::meta::METRIC_TYPE, knowhere::metric::L2},
{knowhere::indexparam::NPROBE, 4},
{knowhere::meta::METRIC_TYPE, knowhere::metric::L2},
{knowhere::indexparam::NPROBE, 4},
};
std::vector<knowhere::DataSetPtr> datasets;
@@ -452,13 +452,14 @@ TEST(Indexing, Iterator) {
searchInfo.search_params_ = search_conf;
auto vec_index = dynamic_cast<index::VectorIndex*>(index.get());
knowhere::expected<std::vector<std::shared_ptr<knowhere::IndexNode::iterator>>>
kw_iterators = vec_index->VectorIterators(query_ds, searchInfo, view);
knowhere::expected<
std::vector<std::shared_ptr<knowhere::IndexNode::iterator>>>
kw_iterators = vec_index->VectorIterators(query_ds, searchInfo, view);
ASSERT_TRUE(kw_iterators.has_value());
ASSERT_EQ(kw_iterators.value().size(), 1);
auto iterator = kw_iterators.value()[0];
ASSERT_TRUE(iterator->HasNext());
while(iterator->HasNext()){
while (iterator->HasNext()) {
auto [off, dis] = iterator->Next();
ASSERT_TRUE(off >= 0);
ASSERT_TRUE(dis >= 0);