Parallel operations of disk-ann file manager (#20120) (#20122)

Signed-off-by: zhagnlu <lu.zhang@zilliz.com>

Co-authored-by: zhagnlu <lu.zhang@zilliz.com>
zhagnlu 2022-11-03 14:39:40 +08:00 committed by GitHub
parent fdefcff84a
commit cb2591d1fe
21 changed files with 550 additions and 44 deletions

View File

@ -366,6 +366,8 @@ common:
SearchCacheBudgetGBRatio: 0.125
LoadNumThreadRatio: 8.0
BeamWidthRatio: 4.0
# Ratio of the thread pool size to the number of CPU cores (threads = cores * coefficient)
threadCoreCoefficient : 50
# please adjust in embedded Milvus: local
storageType: minio
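
For context, the new threadCoreCoefficient value is consumed by the C++ thread pool introduced in this commit: the pool size is the number of hardware cores multiplied by the coefficient. A minimal sketch of that computation, with EstimatePoolSize as an illustrative name (the actual logic lives in the ThreadPool constructor further down):

#include <cstdint>
#include <thread>

// Illustrative only: with threadCoreCoefficient = 50 on an 8-core machine,
// the pool would be created with 8 * 50 = 400 worker threads.
inline int64_t
EstimatePoolSize(int64_t thread_core_coefficient) {
    return static_cast<int64_t>(std::thread::hardware_concurrency()) * thread_core_coefficient;
}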

View File

@ -17,6 +17,7 @@ set(COMMON_SRC
Slice.cpp
binary_set_c.cpp
init_c.cpp
Common.cpp
)
add_library(milvus_common SHARED ${COMMON_SRC})

View File

@ -0,0 +1,37 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "common/Common.h"
#include "log/Log.h"
namespace milvus {
int64_t index_file_slice_size = DEFAULT_INDEX_FILE_SLICE_SIZE;
int64_t thread_core_coefficient = DEFAULT_THREAD_CORE_COEFFICIENT;
void
SetIndexSliceSize(const int64_t size) {
index_file_slice_size = size;
LOG_SEGCORE_DEBUG_ << "set config index slice size: " << index_file_slice_size;
}
void
SetThreadCoreCoefficient(const int64_t coefficient) {
thread_core_coefficient = coefficient;
LOG_SEGCORE_DEBUG_ << "set thread pool core coefficient: " << thread_core_coefficient;
}
} // namespace milvus

View File

@ -0,0 +1,33 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <iostream>
#include "common/Consts.h"
namespace milvus {
extern int64_t index_file_slice_size;
extern int64_t thread_core_coefficient;
void
SetIndexSliceSize(const int64_t size);
void
SetThreadCoreCoefficient(const int64_t coefficient);
} // namespace milvus

View File

@ -36,3 +36,8 @@ const char INDEX_BUILD_ID_KEY[] = "indexBuildID";
const char INDEX_ROOT_PATH[] = "index_files";
const char RAWDATA_ROOT_PATH[] = "raw_datas";
const int DEFAULT_DISK_INDEX_MAX_MEMORY_LIMIT = 2; // gigabytes
const int64_t DEFAULT_THREAD_CORE_COEFFICIENT = 50;
const int64_t DEFAULT_INDEX_FILE_SLICE_SIZE = 4; // megabytes

View File

@ -15,26 +15,17 @@
// limitations under the License.
#include "common/Slice.h"
#include "common/Common.h"
#include "log/Log.h"
namespace milvus {
const int64_t DEFAULT_INDEX_FILE_SLICE_SIZE = 4; // megabytes
static const char* INDEX_FILE_SLICE_META = "SLICE_META";
static const char* META = "meta";
static const char* NAME = "name";
static const char* SLICE_NUM = "slice_num";
static const char* TOTAL_LEN = "total_len";
int64_t index_file_slice_size = DEFAULT_INDEX_FILE_SLICE_SIZE;
void
SetIndexSliceSize(const int64_t size) {
index_file_slice_size = size;
LOG_SEGCORE_DEBUG_ << "set config index slice size: " << index_file_slice_size;
}
void
Slice(
const std::string& prefix, const BinaryPtr& data_src, const int64_t slice_len, BinarySet& binarySet, Config& ret) {

View File

@ -20,11 +20,6 @@
namespace milvus {
extern int64_t index_file_slice_size;
void
SetIndexSliceSize(const int64_t size);
void
Assemble(BinarySet& binarySet);

View File

@ -21,8 +21,9 @@
#include <string>
#include "config/ConfigChunkManager.h"
#include "common/Slice.h"
#include "common/Common.h"
std::once_flag flag1, flag2;
std::once_flag flag1, flag2, flag3;
void
InitLocalRootPath(const char* root_path) {
@ -36,3 +37,9 @@ InitIndexSliceSize(const int64_t size) {
std::call_once(
flag2, [](int64_t size) { milvus::SetIndexSliceSize(size); }, size);
}
void
InitThreadCoreCoefficient(const int64_t value) {
std::call_once(
flag3, [](int64_t value) { milvus::SetThreadCoreCoefficient(value); }, value);
}

View File

@ -26,6 +26,9 @@ extern "C" {
void
InitIndexSliceSize(const int64_t);
void
InitThreadCoreCoefficient(const int64_t);
void
InitLocalRootPath(const char*);

View File

@ -33,6 +33,7 @@ set(STORAGE_FILES
IndexData.cpp
InsertData.cpp
Event.cpp
ThreadPool.cpp
storage_c.cpp)
if(BUILD_DISK_ANN STREQUAL "ON")

View File

@ -18,7 +18,7 @@
#include <boost/filesystem.hpp>
#include <mutex>
#include "common/Consts.h"
#include "common/Common.h"
#include "common/Slice.h"
#include "log/Log.h"
#include "config/ConfigKnowhere.h"
@ -28,6 +28,7 @@
#include "storage/Exception.h"
#include "storage/FieldData.h"
#include "storage/IndexData.h"
#include "storage/ThreadPool.h"
#include "storage/Util.h"
#define FILEMANAGER_TRY try {
@ -77,9 +78,28 @@ DiskFileManagerImpl::LoadFile(const std::string& file) noexcept {
return true;
}
std::pair<std::string, size_t>
EncodeAndUploadIndexSlice(RemoteChunkManager* remote_chunk_manager,
uint8_t* buf,
int64_t offset,
int64_t batch_size,
IndexMeta index_meta,
FieldDataMeta field_meta,
std::string object_key) {
auto fieldData = std::make_shared<FieldData>(buf + offset, batch_size);
auto indexData = std::make_shared<IndexData>(fieldData);
indexData->set_index_meta(index_meta);
indexData->SetFieldDataMeta(field_meta);
auto serialized_index_data = indexData->serialize_to_remote_file();
auto serialized_index_size = serialized_index_data.size();
remote_chunk_manager->Write(object_key, serialized_index_data.data(), serialized_index_size);
return std::pair<std::string, size_t>(object_key, serialized_index_size);
}
bool
DiskFileManagerImpl::AddFile(const std::string& file) noexcept {
auto& local_chunk_manager = LocalChunkManager::GetInstance();
auto& pool = ThreadPool::GetInstance();
FILEMANAGER_TRY
if (!local_chunk_manager.Exist(file)) {
LOG_SEGCORE_ERROR_C << "local file: " << file << " does not exist ";
@ -97,24 +117,21 @@ DiskFileManagerImpl::AddFile(const std::string& file) noexcept {
// Split local data to multi part with specified size
int slice_num = 0;
auto remotePrefix = GetRemoteIndexObjectPrefix();
std::vector<std::future<std::pair<std::string, size_t>>> futures;
for (int64_t offset = 0; offset < fileSize; slice_num++) {
auto batch_size = std::min(index_file_slice_size << 20, int64_t(fileSize) - offset);
auto fieldData = std::make_shared<FieldData>(buf.get() + offset, batch_size);
auto indexData = std::make_shared<IndexData>(fieldData);
indexData->set_index_meta(index_meta_);
indexData->SetFieldDataMeta(field_meta_);
auto serialized_index_data = indexData->serialize_to_remote_file();
auto serialized_index_size = serialized_index_data.size();
// Put file to remote
char objectKey[200];
snprintf(objectKey, sizeof(objectKey), "%s/%s_%d", remotePrefix.c_str(), fileName.c_str(), slice_num);
rcm_->Write(objectKey, serialized_index_data.data(), serialized_index_size);
// use multi-thread to put part file
futures.push_back(pool.Submit(EncodeAndUploadIndexSlice, rcm_.get(), buf.get(), offset, batch_size, index_meta_,
field_meta_, std::string(objectKey)));
offset += batch_size;
// record remote file to save etcd
remote_paths_to_size_[objectKey] = serialized_index_size;
}
for (auto& future : futures) {
auto res = future.get();
remote_paths_to_size_[res.first] = res.second;
}
FILEMANAGER_CATCH
FILEMANAGER_END
@ -136,29 +153,84 @@ DiskFileManagerImpl::CacheIndexToDisk(std::vector<std::string> remote_files) {
std::sort(slices.second.begin(), slices.second.end());
}
auto EstimateParalleDegree = [&](const std::string& file) -> uint64_t {
auto fileSize = rcm_->Size(file);
return uint64_t(DEFAULT_DISK_INDEX_MAX_MEMORY_LIMIT / fileSize);
};
for (auto& slices : index_slices) {
auto prefix = slices.first;
auto local_index_file_name = GetLocalIndexObjectPrefix() + prefix.substr(prefix.find_last_of("/") + 1);
local_chunk_manager.CreateFile(local_index_file_name);
int64_t offset = 0;
std::vector<std::string> batch_remote_files;
uint64_t max_parallel_degree = INT_MAX;
for (auto iter = slices.second.begin(); iter != slices.second.end(); iter++) {
if (batch_remote_files.size() == max_parallel_degree) {
auto next_offset = CacheBatchIndexFilesToDisk(batch_remote_files, local_index_file_name, offset);
offset = next_offset;
batch_remote_files.clear();
}
auto origin_file = prefix + "_" + std::to_string(*iter);
auto fileSize = rcm_->Size(origin_file);
auto buf = std::unique_ptr<uint8_t[]>(new uint8_t[fileSize]);
rcm_->Read(origin_file, buf.get(), fileSize);
auto decoded_index_data = DeserializeFileData(buf.get(), fileSize);
auto index_payload = decoded_index_data->GetPayload();
auto index_size = index_payload->rows * sizeof(uint8_t);
local_chunk_manager.Write(local_index_file_name, offset, const_cast<uint8_t*>(index_payload->raw_data),
index_size);
offset += index_size;
if (batch_remote_files.size() == 0) {
// Use first file size as average size to estimate
max_parallel_degree = EstimateParalleDegree(origin_file);
}
batch_remote_files.push_back(origin_file);
}
if (batch_remote_files.size() > 0) {
auto next_offset = CacheBatchIndexFilesToDisk(batch_remote_files, local_index_file_name, offset);
offset = next_offset;
batch_remote_files.clear();
}
local_paths_.emplace_back(local_index_file_name);
}
}
uint64_t
DownloadAndDecodeRemoteIndexfile(RemoteChunkManager* remote_chunk_manager,
std::string file,
milvus::storage::Payload** index_payload_ptr) {
auto fileSize = remote_chunk_manager->Size(file);
auto buf = std::shared_ptr<uint8_t[]>(new uint8_t[fileSize]);
remote_chunk_manager->Read(file, buf.get(), fileSize);
auto decoded_index_data = DeserializeFileData(buf.get(), fileSize);
auto index_payload = decoded_index_data->GetPayload();
auto index_size = index_payload->rows * sizeof(uint8_t);
*index_payload_ptr = index_payload.release();
return index_size;
}
uint64_t
DiskFileManagerImpl::CacheBatchIndexFilesToDisk(const std::vector<std::string>& remote_files,
const std::string& local_file_name,
uint64_t local_file_init_offfset) {
auto& local_chunk_manager = LocalChunkManager::GetInstance();
auto& pool = ThreadPool::GetInstance();
int batch_size = remote_files.size();
std::vector<milvus::storage::Payload*> cache_payloads(batch_size);
for (size_t i = 0; i < cache_payloads.size(); ++i) {
cache_payloads[i] = nullptr;
}
std::vector<uint64_t> cache_payload_sizes(batch_size);
std::vector<std::future<uint64_t>> futures;
for (int i = 0; i < batch_size; ++i) {
futures.push_back(
pool.Submit(DownloadAndDecodeRemoteIndexfile, rcm_.get(), remote_files[i], &cache_payloads[i]));
}
for (int i = 0; i < batch_size; ++i) {
cache_payload_sizes[i] = futures[i].get();
}
uint64_t offset = local_file_init_offfset;
for (int i = 0; i < batch_size; ++i) {
local_chunk_manager.Write(local_file_name, offset, const_cast<uint8_t*>((cache_payloads[i])->raw_data),
cache_payload_sizes[i]);
offset += cache_payload_sizes[i];
}
return offset;
}
std::string
DiskFileManagerImpl::GetFileName(const std::string& localfile) {
boost::filesystem::path localPath(localfile);

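The AddFile change above boils down to a fan-out/fan-in pattern: one upload task per index slice is submitted to the shared thread pool, and the returned futures are joined afterwards to record each remote path and size. A simplified sketch of that pattern under assumed names (UploadSlices and upload are illustrative, not the Milvus helpers):

#include <future>
#include <string>
#include <utility>
#include <vector>

template <typename Pool, typename UploadFn>
std::vector<std::pair<std::string, size_t>>
UploadSlices(Pool& pool, UploadFn upload, const std::vector<std::string>& object_keys) {
    std::vector<std::future<std::pair<std::string, size_t>>> futures;
    for (const auto& key : object_keys) {
        futures.push_back(pool.Submit(upload, key));  // fan out: one task per slice
    }
    std::vector<std::pair<std::string, size_t>> path_to_size;
    for (auto& f : futures) {
        path_to_size.push_back(f.get());  // fan in: blocks and rethrows any task exception
    }
    return path_to_size;
}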
View File

@ -24,8 +24,11 @@
#include "storage/IndexData.h"
#include "storage/FileManager.h"
#include "storage/LocalChunkManager.h"
#include "storage/MinioChunkManager.h"
#include "common/Consts.h"
namespace milvus::storage {
class DiskFileManagerImpl : public FileManagerImpl {
@ -76,6 +79,11 @@ class DiskFileManagerImpl : public FileManagerImpl {
void
CacheIndexToDisk(std::vector<std::string> remote_files);
uint64_t
CacheBatchIndexFilesToDisk(const std::vector<std::string>& remote_files,
const std::string& local_file_name,
uint64_t local_file_init_offfset);
FieldDataMeta
GetFileDataMeta() const {
return field_meta_;

View File

@ -122,9 +122,9 @@ MinioChunkManager::MinioChunkManager(const StorageConfig& storage_config)
// TODO ::BucketExist and CreateBucket func not work, should be fixed
// index node has already tried to create bucket when receive index task if bucket not exist
// query node has already tried to create bucket during init stage if bucket not exist
// if (!BucketExists(storage_config.bucket_name)) {
// CreateBucket(storage_config.bucket_name);
// }
// if (!BucketExists(storage_config.bucket_name)) {
// CreateBucket(storage_config.bucket_name);
// }
LOG_SEGCORE_INFO_C << "init MinioChunkManager with parameter[endpoint: '" << storage_config.address
<< "', access_key:'" << storage_config.access_key_id << "', access_value:'"

View File

@ -0,0 +1,71 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <queue>
#include <shared_mutex>
#include <utility>
namespace milvus {
template <typename T>
class SafeQueue {
public:
SafeQueue(void) {
}
~SafeQueue() {
}
bool
empty() {
std::shared_lock<std::shared_mutex> lock(mutex_);
return queue_.empty();
}
size_t
size() {
std::shared_lock<std::shared_mutex> lock(mutex_);
return queue_.size();
}
// Add an element to the queue
void
enqueue(T t) {
std::unique_lock<std::shared_mutex> lock(mutex_);
queue_.push(t);
}
// Get an available element from the queue
bool
dequeue(T& t) {
std::unique_lock<std::shared_mutex> lock(mutex_);
if (queue_.empty()) {
return false;
}
t = std::move(queue_.front());
queue_.pop();
return true;
}
private:
std::queue<T> queue_;
mutable std::shared_mutex mutex_;
};
} // namespace milvus
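A brief usage sketch of the SafeQueue above (illustrative, not part of the commit): enqueue from one thread, then drain with dequeue, which returns false once the queue is empty.

#include <iostream>
#include <thread>
#include "SafeQueue.h"  // the header above

void
SafeQueueExample() {
    milvus::SafeQueue<int> q;
    std::thread producer([&q] {
        for (int i = 0; i < 10; ++i) {
            q.enqueue(i);  // push under the unique (writer) lock
        }
    });
    producer.join();
    int value = 0;
    while (q.dequeue(value)) {  // pops until empty, then returns false
        std::cout << value << std::endl;
    }
}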

View File

@ -0,0 +1,38 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "ThreadPool.h"
namespace milvus {
void
ThreadPool::Init() {
for (int i = 0; i < threads_.size(); i++) {
threads_[i] = std::thread(Worker(this, i));
}
}
void
ThreadPool::ShutDown() {
shutdown_ = true;
condition_lock_.notify_all();
for (int i = 0; i < threads_.size(); i++) {
if (threads_[i].joinable()) {
threads_[i].join();
}
}
}
}; // namespace milvus

View File

@ -0,0 +1,117 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <functional>
#include <future>
#include <mutex>
#include <memory>
#include <queue>
#include <thread>
#include <vector>
#include <utility>
#include "SafeQueue.h"
#include "common/Common.h"
#include "log/Log.h"
namespace milvus {
class ThreadPool {
public:
explicit ThreadPool(const int thread_core_coefficient) : shutdown_(false) {
auto thread_num = std::thread::hardware_concurrency() * thread_core_coefficient;
LOG_SEGCORE_INFO_C << "Thread pool's worker num:" << thread_num;
threads_ = std::vector<std::thread>(thread_num);
Init();
}
~ThreadPool() {
ShutDown();
}
static ThreadPool&
GetInstance() {
static ThreadPool pool(thread_core_coefficient);
return pool;
}
ThreadPool(const ThreadPool&) = delete;
ThreadPool(ThreadPool&&) = delete;
ThreadPool&
operator=(const ThreadPool&) = delete;
ThreadPool&
operator=(ThreadPool&&) = delete;
void
Init();
void
ShutDown();
template <typename F, typename... Args>
auto
// Submit(F&& f, Args&&... args) -> std::future<decltype(f(args...))>;
Submit(F&& f, Args&&... args) -> std::future<decltype(f(args...))> {
std::function<decltype(f(args...))()> func = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
auto task_ptr = std::make_shared<std::packaged_task<decltype(f(args...))()>>(func);
std::function<void()> wrap_func = [task_ptr]() { (*task_ptr)(); };
work_queue_.enqueue(wrap_func);
condition_lock_.notify_one();
return task_ptr->get_future();
}
public:
bool shutdown_;
SafeQueue<std::function<void()>> work_queue_;
std::vector<std::thread> threads_;
std::mutex mutex_;
std::condition_variable condition_lock_;
};
class Worker {
private:
int id_;
ThreadPool* pool_;
public:
Worker(ThreadPool* pool, const int id) : pool_(pool), id_(id) {
}
void
operator()() {
std::function<void()> func;
bool dequeue;
while (!pool_->shutdown_) {
std::unique_lock<std::mutex> lock(pool_->mutex_);
if (pool_->work_queue_.empty()) {
pool_->condition_lock_.wait(lock);
}
dequeue = pool_->work_queue_.dequeue(func);
lock.unlock();
if (dequeue) {
func();
}
}
}
};
} // namespace milvus
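Usage of the pool mirrors the unit test added later in this commit: Submit returns a std::future, and get() blocks until a worker has run the task, rethrowing any exception the task threw. A minimal sketch, assuming the ThreadPool header above is included:

#include <future>
#include <vector>

int
Square(int x) {
    return x * x;
}

void
ThreadPoolExample() {
    auto& pool = milvus::ThreadPool::GetInstance();  // sized by thread_core_coefficient
    std::vector<std::future<int>> futures;
    for (int i = 0; i < 8; ++i) {
        futures.push_back(pool.Submit(Square, i));
    }
    for (auto& f : futures) {
        (void)f.get();  // waits for the worker; propagates exceptions from the task
    }
}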

View File

@ -82,6 +82,7 @@ if (LINUX)
gtest
gtest_main
milvus_segcore
milvus_storage
milvus_indexbuilder
)
install(TARGETS index_builder_test DESTINATION unittest)
@ -94,6 +95,7 @@ add_executable(all_tests
target_link_libraries(all_tests
gtest
milvus_segcore
milvus_storage
milvus_indexbuilder
pthread
)

View File

@ -9,14 +9,18 @@
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#include <chrono>
#include <gtest/gtest.h>
#include <string>
#include <vector>
#include <unistd.h>
#include "common/Slice.h"
#include "storage/Event.h"
#include "storage/LocalChunkManager.h"
#include "storage/MinioChunkManager.h"
#include "storage/DiskFileManagerImpl.h"
#include "storage/ThreadPool.h"
#include "config/ConfigChunkManager.h"
#include "config/ConfigKnowhere.h"
#include "test_utils/indexbuilder_test_utils.h"
@ -46,8 +50,12 @@ class DiskAnnFileManagerTest : public testing::Test {
TEST_F(DiskAnnFileManagerTest, AddFilePositive) {
auto& lcm = LocalChunkManager::GetInstance();
auto rcm = std::make_unique<MinioChunkManager>(storage_config_);
string testBucketName = "test-diskann";
storage_config_.bucket_name = testBucketName;
if (!rcm->BucketExists(testBucketName)) {
rcm->CreateBucket(testBucketName);
}
std::string indexFilePath = "/tmp/diskann/index_files/1000/index";
auto exist = lcm.Exist(indexFilePath);
@ -73,6 +81,7 @@ TEST_F(DiskAnnFileManagerTest, AddFilePositive) {
std::vector<std::string> remote_files;
for (auto& file2size : remote_files_to_size) {
std::cout << file2size.first << std::endl;
remote_files.emplace_back(file2size.first);
}
diskAnnFileManager->CacheIndexToDisk(remote_files);
@ -92,3 +101,104 @@ TEST_F(DiskAnnFileManagerTest, AddFilePositive) {
EXPECT_EQ(rawData[4], data[4]);
}
}
TEST_F(DiskAnnFileManagerTest, AddFilePositiveParallel) {
auto& lcm = LocalChunkManager::GetInstance();
auto rcm = std::make_unique<MinioChunkManager>(storage_config_);
string testBucketName = "test-diskann";
storage_config_.bucket_name = testBucketName;
if (!rcm->BucketExists(testBucketName)) {
rcm->CreateBucket(testBucketName);
}
std::string indexFilePath = "/tmp/diskann/index_files/1000/index";
auto exist = lcm.Exist(indexFilePath);
EXPECT_EQ(exist, false);
uint64_t index_size = 50 << 20;
lcm.CreateFile(indexFilePath);
std::vector<uint8_t> data(index_size);
lcm.Write(indexFilePath, data.data(), index_size);
// collection_id: 1, partition_id: 2, segment_id: 3
// field_id: 100, index_build_id: 1000, index_version: 1
FieldDataMeta filed_data_meta = {1, 2, 3, 100};
IndexMeta index_meta = {3, 100, 1000, 1, "index"};
int64_t slice_size = milvus::index_file_slice_size << 20;
auto diskAnnFileManager = std::make_shared<DiskFileManagerImpl>(filed_data_meta, index_meta, storage_config_);
auto ok = diskAnnFileManager->AddFile(indexFilePath);
EXPECT_EQ(ok, true);
auto remote_files_to_size = diskAnnFileManager->GetRemotePathsToFileSize();
auto num_slice = index_size / slice_size;
EXPECT_EQ(remote_files_to_size.size(), index_size % slice_size == 0 ? num_slice : num_slice + 1);
std::vector<std::string> remote_files;
for (auto& file2size : remote_files_to_size) {
std::cout << file2size.first << std::endl;
remote_files.emplace_back(file2size.first);
}
diskAnnFileManager->CacheIndexToDisk(remote_files);
auto local_files = diskAnnFileManager->GetLocalFilePaths();
for (auto& file : local_files) {
auto file_size = lcm.Size(file);
auto buf = std::unique_ptr<uint8_t[]>(new uint8_t[file_size]);
lcm.Read(file, buf.get(), file_size);
auto index = FieldData(buf.get(), file_size);
auto payload = index.get_payload();
auto rows = payload->rows;
auto rawData = payload->raw_data;
EXPECT_EQ(rows, index_size);
EXPECT_EQ(rawData[0], data[0]);
EXPECT_EQ(rawData[4], data[4]);
}
}
int
test_worker(string s) {
std::cout << s << std::endl;
sleep(4);
std::cout << s << std::endl;
return 1;
}
TEST_F(DiskAnnFileManagerTest, TestThreadPool) {
auto thread_pool = new milvus::ThreadPool(50);
std::vector<std::future<int>> futures;
auto start = chrono::system_clock::now();
for (int i = 0; i < 100; i++) {
futures.push_back(thread_pool->Submit(test_worker, "test_id" + std::to_string(i)));
}
for (auto& future : futures) {
EXPECT_EQ(future.get(), 1);
}
auto end = chrono::system_clock::now();
auto duration = chrono::duration_cast<chrono::microseconds>(end - start);
auto second = double(duration.count()) * chrono::microseconds::period::num / chrono::microseconds::period::den;
EXPECT_LT(second, 4 * 100);
}
int
test_exception(string s) {
if (s == "test_id60") {
throw std::runtime_error("run time error");
}
return 1;
}
TEST_F(DiskAnnFileManagerTest, TestThreadPoolException) {
try {
auto thread_pool = new milvus::ThreadPool(50);
std::vector<std::future<int>> futures;
for (int i = 0; i < 100; i++) {
futures.push_back(thread_pool->Submit(test_exception, "test_id" + std::to_string(i)));
}
for (auto& future : futures) {
future.get();
}
} catch (std::exception& e) {
EXPECT_EQ(std::string(e.what()), "run time error");
}
}

View File

@ -156,6 +156,9 @@ func (i *IndexNode) initKnowhere() {
cIndexSliceSize := C.int64_t(Params.CommonCfg.IndexSliceSize)
C.InitIndexSliceSize(cIndexSliceSize)
cThreadCoreCoefficient := C.int64_t(Params.CommonCfg.ThreadCoreCoefficient)
C.InitThreadCoreCoefficient(cThreadCoreCoefficient)
initcore.InitLocalStorageConfig(&Params)
}

View File

@ -218,6 +218,9 @@ func (node *QueryNode) InitSegcore() {
cIndexSliceSize := C.int64_t(Params.CommonCfg.IndexSliceSize)
C.InitIndexSliceSize(cIndexSliceSize)
cThreadCoreCoefficient := C.int64_t(Params.CommonCfg.ThreadCoreCoefficient)
C.InitThreadCoreCoefficient(cThreadCoreCoefficient)
initcore.InitLocalStorageConfig(&Params)
}

View File

@ -32,8 +32,9 @@ const (
DefaultRetentionDuration = 3600 * 24
// DefaultIndexSliceSize defines the default slice size of index file when serializing.
DefaultIndexSliceSize = 16
DefaultGracefulTime = 5000 //ms
DefaultIndexSliceSize = 16
DefaultGracefulTime = 5000 //ms
DefaultThreadCoreCoefficient = 50
DefaultSessionTTL = 60 //s
DefaultSessionRetryTimes = 30
@ -143,6 +144,7 @@ type commonConfig struct {
EntityExpirationTTL time.Duration
IndexSliceSize int64
ThreadCoreCoefficient int64
MaxDegree int64
SearchListSize int64
PGCodeBudgetGBRatio float64
@ -203,6 +205,7 @@ func (p *commonConfig) init(base *BaseTable) {
p.initBeamWidthRatio()
p.initGracefulTime()
p.initStorageType()
p.initThreadCoreCoefficient()
p.initEnableAuthorization()
@ -401,6 +404,10 @@ func (p *commonConfig) initIndexSliceSize() {
p.IndexSliceSize = p.Base.ParseInt64WithDefault("common.indexSliceSize", DefaultIndexSliceSize)
}
func (p *commonConfig) initThreadCoreCoefficient() {
p.ThreadCoreCoefficient = p.Base.ParseInt64WithDefault("common.threadCoreCoefficient", DefaultThreadCoreCoefficient)
}
func (p *commonConfig) initPGCodeBudgetGBRatio() {
p.PGCodeBudgetGBRatio = p.Base.ParseFloatWithDefault("common.DiskIndex.PGCodeBudgetGBRatio", DefaultPGCodeBudgetGBRatio)
}