enhance: [2.4] Add disk filemananger parallel load control to reduce the memory consumption (#35282)

issue: #35280 
pr: #35281

Signed-off-by: xianliang.li <xianliang.li@zilliz.com>
This commit is contained in:
foxspy 2024-08-06 15:46:16 +08:00 committed by GitHub
parent 2ac1bf7532
commit 6e8083d4a3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -312,18 +312,32 @@ DiskFileManagerImpl::CacheIndexToDisk(
// Get the remote files
std::vector<std::string> batch_remote_files;
batch_remote_files.reserve(slices.second.size());
uint64_t max_parallel_degree =
uint64_t(DEFAULT_FIELD_MAX_MEMORY_LIMIT / FILE_SLICE_SIZE);
auto appendIndexFiles = [&]() {
auto index_chunks = GetObjectData(rcm_.get(), batch_remote_files);
for (auto& chunk : index_chunks) {
auto index_data = chunk.get()->GetFieldData();
auto index_size = index_data->Size();
auto chunk_data = reinterpret_cast<uint8_t*>(
const_cast<void*>(index_data->Data()));
file.Write(chunk_data, index_size);
}
batch_remote_files.clear();
};
for (int& iter : slices.second) {
auto origin_file = prefix + "_" + std::to_string(iter);
batch_remote_files.push_back(origin_file);
}
auto index_chunks = GetObjectData(rcm_.get(), batch_remote_files);
for (auto& chunk : index_chunks) {
auto index_data = chunk.get()->GetFieldData();
auto index_size = index_data->Size();
auto chunk_data = reinterpret_cast<uint8_t*>(
const_cast<void*>(index_data->Data()));
file.Write(chunk_data, index_size);
if (batch_remote_files.size() == max_parallel_degree) {
appendIndexFiles();
}
}
if (batch_remote_files.size() > 0) {
appendIndexFiles();
}
local_paths_.emplace_back(local_index_file_name);
}