enhance: limit binlog index row count (#30173)

issue: https://github.com/milvus-io/milvus/issues/27678
also related issue: https://github.com/milvus-io/milvus/issues/30065

Signed-off-by: cqy123456 <qianya.cheng@zilliz.com>
This commit is contained in:
cqy123456 2024-01-29 19:49:02 +08:00 committed by GitHub
parent f92edc6cc5
commit 74cfba0249
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -1511,33 +1511,45 @@ SegmentSealedImpl::mask_with_timestamps(BitsetType& bitset_chunk,
bool bool
SegmentSealedImpl::generate_binlog_index(const FieldId field_id) { SegmentSealedImpl::generate_binlog_index(const FieldId field_id) {
if (col_index_meta_ == nullptr) if (col_index_meta_ == nullptr || !col_index_meta_->HasFiled(field_id)) {
return false;
auto& field_meta = schema_->operator[](field_id);
if (field_meta.is_vector() &&
field_meta.get_data_type() == DataType::VECTOR_FLOAT &&
segcore_config_.get_enable_interim_segment_index()) {
try {
auto& field_index_meta =
col_index_meta_->GetFieldIndexMeta(field_id);
auto& index_params = field_index_meta.GetIndexParams();
if (index_params.find(knowhere::meta::INDEX_TYPE) ==
index_params.end() ||
index_params.at(knowhere::meta::INDEX_TYPE) ==
knowhere::IndexEnum::INDEX_FAISS_IDMAP) {
return false; return false;
} }
auto& field_meta = schema_->operator[](field_id);
auto& field_index_meta = col_index_meta_->GetFieldIndexMeta(field_id);
auto& index_params = field_index_meta.GetIndexParams();
auto enable_binlog_index = [&]() {
// check config
if (!segcore_config_.get_enable_interim_segment_index()) {
return false;
}
// check data type
if (!field_meta.is_vector() ||
field_meta.get_data_type() != DataType::VECTOR_FLOAT) {
return false;
}
// check index type
if (index_params.find(knowhere::meta::INDEX_TYPE) ==
index_params.end() ||
field_index_meta.IsFlatIndex()) {
return false;
}
// check index exist
if (vector_indexings_.is_ready(field_id)) {
return false;
}
return true;
};
if (!enable_binlog_index()) {
return false;
}
try {
// get binlog data and meta // get binlog data and meta
auto row_count = num_rows_.value(); auto row_count = num_rows_.value();
auto dim = field_meta.get_dim(); auto dim = field_meta.get_dim();
std::shared_ptr<ColumnBase> vec_data{}; std::shared_ptr<ColumnBase> vec_data{};
{
// the field must exist;
// otherwise, fields_.at throws an out_of_range exception
std::shared_lock lck(mutex_);
vec_data = fields_.at(field_id); vec_data = fields_.at(field_id);
}
auto dataset = auto dataset =
knowhere::GenDataSet(row_count, dim, (void*)vec_data->Data()); knowhere::GenDataSet(row_count, dim, (void*)vec_data->Data());
dataset->SetIsOwner(false); dataset->SetIsOwner(false);
@ -1547,6 +1559,9 @@ SegmentSealedImpl::generate_binlog_index(const FieldId field_id) {
field_index_meta, field_index_meta,
segcore_config_, segcore_config_,
SegmentType::Sealed)); SegmentType::Sealed));
if (row_count < field_binlog_config->GetBuildThreshold()) {
return false;
}
auto build_config = field_binlog_config->GetBuildBaseParams(); auto build_config = field_binlog_config->GetBuildBaseParams();
build_config[knowhere::meta::DIM] = std::to_string(dim); build_config[knowhere::meta::DIM] = std::to_string(dim);
build_config[knowhere::meta::NUM_BUILD_THREAD] = std::to_string(1); build_config[knowhere::meta::NUM_BUILD_THREAD] = std::to_string(1);
@ -1558,17 +1573,21 @@ SegmentSealedImpl::generate_binlog_index(const FieldId field_id) {
index_metric, index_metric,
knowhere::Version::GetCurrentVersion().VersionNumber()); knowhere::Version::GetCurrentVersion().VersionNumber());
vec_index->BuildWithDataset(dataset, build_config); vec_index->BuildWithDataset(dataset, build_config);
if (enable_binlog_index()) {
std::unique_lock lck(mutex_);
vector_indexings_.append_field_indexing( vector_indexings_.append_field_indexing(
field_id, index_metric, std::move(vec_index)); field_id, index_metric, std::move(vec_index));
vec_binlog_config_[field_id] = std::move(field_binlog_config); vec_binlog_config_[field_id] = std::move(field_binlog_config);
set_bit(binlog_index_bitset_, field_id, true); set_bit(binlog_index_bitset_, field_id, true);
LOG_INFO(
"replace binlog with binlog index in segment {}, field {}.",
this->get_segment_id(),
field_id.get());
}
return true; return true;
} catch (std::exception& e) { } catch (std::exception& e) {
return false; LOG_WARN("fail to generate binlog index, because {}", e.what());
}
} else {
return false; return false;
} }
} }