fix: non-init seg_offset for growing raw-data when doing groupby (#34748)

related:  #34713

Signed-off-by: MrPresent-Han <chun.han@gmail.com>
Co-authored-by: MrPresent-Han <chun.han@gmail.com>
This commit is contained in:
Chun Han 2024-07-19 17:01:40 +08:00 committed by GitHub
parent e4e18cb8c3
commit ed057e6fce
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 89 additions and 86 deletions

View File

@ -106,8 +106,11 @@ struct VectorIterator {
int idx = 0;
for (auto& iter : iterators_) {
if (iter->HasNext()) {
auto origin_pair = iter->Next();
origin_pair.first = convert_to_segment_offset(
origin_pair.first, idx);
auto off_dis_pair =
std::make_shared<OffsetDisPair>(iter->Next(), idx++);
std::make_shared<OffsetDisPair>(origin_pair, idx++);
heap_.push(off_dis_pair);
}
}

View File

@ -650,91 +650,91 @@ TEST(GroupBY, Reduce) {
DeleteSegment(c_segment_2);
}
//TEST(GroupBY, GrowingRawData) {
// //0. set up growing segment
// int dim = 128;
// uint64_t seed = 512;
// auto schema = std::make_shared<Schema>();
// auto metric_type = knowhere::metric::L2;
// auto int64_field_id = schema->AddDebugField("int64", DataType::INT64);
// auto int32_field_id = schema->AddDebugField("int32", DataType::INT32);
// auto vec_field_id = schema->AddDebugField(
// "embeddings", DataType::VECTOR_FLOAT, 128, metric_type);
// schema->set_primary_field_id(int64_field_id);
//
// auto config = SegcoreConfig::default_config();
// config.set_chunk_rows(128);
// config.set_enable_interim_segment_index(
// false); //no growing index, test brute force
// auto segment_growing = CreateGrowingSegment(schema, nullptr, 1, config);
// auto segment_growing_impl =
// dynamic_cast<SegmentGrowingImpl*>(segment_growing.get());
//
// //1. prepare raw data in growing segment
// int64_t rows_per_batch = 512;
// int n_batch = 3;
// for (int i = 0; i < n_batch; i++) {
// auto data_set =
// DataGen(schema, rows_per_batch, 42, 0, 8, 10, false, false);
// auto offset = segment_growing_impl->PreInsert(rows_per_batch);
// segment_growing_impl->Insert(offset,
// rows_per_batch,
// data_set.row_ids_.data(),
// data_set.timestamps_.data(),
// data_set.raw_);
// }
//
// //2. Search group by
// auto num_queries = 10;
// auto topK = 100;
// int group_size = 1;
// const char* raw_plan = R"(vector_anns: <
// field_id: 102
// query_info: <
// topk: 100
// metric_type: "L2"
// search_params: "{\"ef\": 10}"
// group_by_field_id: 101
// group_size: 1
// >
// placeholder_tag: "$0"
//
// >)";
// auto plan_str = translate_text_plan_to_binary_plan(raw_plan);
// auto plan =
// CreateSearchPlanByExpr(*schema, plan_str.data(), plan_str.size());
// auto ph_group_raw = CreatePlaceholderGroup(num_queries, dim, seed);
// auto ph_group =
// ParsePlaceholderGroup(plan.get(), ph_group_raw.SerializeAsString());
// auto search_result =
// segment_growing_impl->Search(plan.get(), ph_group.get(), 1L << 63);
// CheckGroupBySearchResult(*search_result, topK, num_queries, true);
//
// auto& group_by_values = search_result->group_by_values_.value();
// int size = group_by_values.size();
// ASSERT_EQ(size, 640);
// //as the number of data is 512 and repeated count is 8, the group number is 64 for every query
// //and the total group number should be 640
// int expected_group_count = 64;
// int idx = 0;
// for (int i = 0; i < num_queries; i++) {
// std::unordered_set<int32_t> i32_set;
// float lastDistance = 0.0;
// for (int j = 0; j < expected_group_count; j++) {
// if (std::holds_alternative<int32_t>(group_by_values[idx])) {
// int32_t g_val = std::get<int32_t>(group_by_values[idx]);
// ASSERT_FALSE(
// i32_set.count(g_val) >
// 0); //as the group_size is 1, there should not be any duplication for group_by value
// i32_set.insert(g_val);
// auto distance = search_result->distances_.at(idx);
// ASSERT_TRUE(lastDistance <= distance);
// lastDistance = distance;
// }
// idx++;
// }
// }
//}
TEST(GroupBY, GrowingRawData) {
//0. set up growing segment
int dim = 128;
uint64_t seed = 512;
auto schema = std::make_shared<Schema>();
auto metric_type = knowhere::metric::L2;
auto int64_field_id = schema->AddDebugField("int64", DataType::INT64);
auto int32_field_id = schema->AddDebugField("int32", DataType::INT32);
auto vec_field_id = schema->AddDebugField(
"embeddings", DataType::VECTOR_FLOAT, 128, metric_type);
schema->set_primary_field_id(int64_field_id);
auto config = SegcoreConfig::default_config();
config.set_chunk_rows(128);
config.set_enable_interim_segment_index(
false); //no growing index, test brute force
auto segment_growing = CreateGrowingSegment(schema, nullptr, 1, config);
auto segment_growing_impl =
dynamic_cast<SegmentGrowingImpl*>(segment_growing.get());
//1. prepare raw data in growing segment
int64_t rows_per_batch = 512;
int n_batch = 3;
for (int i = 0; i < n_batch; i++) {
auto data_set =
DataGen(schema, rows_per_batch, 42, 0, 8, 10, false, false);
auto offset = segment_growing_impl->PreInsert(rows_per_batch);
segment_growing_impl->Insert(offset,
rows_per_batch,
data_set.row_ids_.data(),
data_set.timestamps_.data(),
data_set.raw_);
}
//2. Search group by
auto num_queries = 10;
auto topK = 100;
int group_size = 1;
const char* raw_plan = R"(vector_anns: <
field_id: 102
query_info: <
topk: 100
metric_type: "L2"
search_params: "{\"ef\": 10}"
group_by_field_id: 101
group_size: 1
>
placeholder_tag: "$0"
>)";
auto plan_str = translate_text_plan_to_binary_plan(raw_plan);
auto plan =
CreateSearchPlanByExpr(*schema, plan_str.data(), plan_str.size());
auto ph_group_raw = CreatePlaceholderGroup(num_queries, dim, seed);
auto ph_group =
ParsePlaceholderGroup(plan.get(), ph_group_raw.SerializeAsString());
auto search_result =
segment_growing_impl->Search(plan.get(), ph_group.get(), 1L << 63);
CheckGroupBySearchResult(*search_result, topK, num_queries, true);
auto& group_by_values = search_result->group_by_values_.value();
int size = group_by_values.size();
ASSERT_EQ(size, 640);
//as the number of data is 512 and repeated count is 8, the group number is 64 for every query
//and the total group number should be 640
int expected_group_count = 64;
int idx = 0;
for (int i = 0; i < num_queries; i++) {
std::unordered_set<int32_t> i32_set;
float lastDistance = 0.0;
for (int j = 0; j < expected_group_count; j++) {
if (std::holds_alternative<int32_t>(group_by_values[idx])) {
int32_t g_val = std::get<int32_t>(group_by_values[idx]);
ASSERT_FALSE(
i32_set.count(g_val) >
0); //as the group_size is 1, there should not be any duplication for group_by value
i32_set.insert(g_val);
auto distance = search_result->distances_.at(idx);
ASSERT_TRUE(lastDistance <= distance);
lastDistance = distance;
}
idx++;
}
}
}
TEST(GroupBY, GrowingIndex) {
//0. set up growing segment