diff --git a/configs/milvus.yaml b/configs/milvus.yaml index e43df797d1..49ad04da8d 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -366,6 +366,8 @@ common: SearchCacheBudgetGBRatio: 0.125 LoadNumThreadRatio: 8.0 BeamWidthRatio: 4.0 + # This parameter specify how many times the number of threads is the number of cores + threadCoreCoefficient : 50 # please adjust in embedded Milvus: local storageType: minio diff --git a/internal/core/src/common/CMakeLists.txt b/internal/core/src/common/CMakeLists.txt index 888e121153..3b315c5cf4 100644 --- a/internal/core/src/common/CMakeLists.txt +++ b/internal/core/src/common/CMakeLists.txt @@ -17,6 +17,7 @@ set(COMMON_SRC Slice.cpp binary_set_c.cpp init_c.cpp + Common.cpp ) add_library(milvus_common SHARED ${COMMON_SRC}) diff --git a/internal/core/src/common/Common.cpp b/internal/core/src/common/Common.cpp new file mode 100644 index 0000000000..719b09bfa7 --- /dev/null +++ b/internal/core/src/common/Common.cpp @@ -0,0 +1,37 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "common/Common.h" +#include "log/Log.h" + +namespace milvus { + +int64_t index_file_slice_size = DEFAULT_INDEX_FILE_SLICE_SIZE; +int64_t thread_core_coefficient = DEFAULT_THREAD_CORE_COEFFICIENT; + +void +SetIndexSliceSize(const int64_t size) { + index_file_slice_size = size; + LOG_SEGCORE_DEBUG_ << "set config index slice size: " << index_file_slice_size; +} + +void +SetThreadCoreCoefficient(const int64_t coefficient) { + thread_core_coefficient = coefficient; + LOG_SEGCORE_DEBUG_ << "set thread pool core coefficient: " << thread_core_coefficient; +} + +} // namespace milvus diff --git a/internal/core/src/common/Common.h b/internal/core/src/common/Common.h new file mode 100644 index 0000000000..7aa9bd63e9 --- /dev/null +++ b/internal/core/src/common/Common.h @@ -0,0 +1,33 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include "common/Consts.h" + +namespace milvus { + +extern int64_t index_file_slice_size; +extern int64_t thread_core_coefficient; + +void +SetIndexSliceSize(const int64_t size); + +void +SetThreadCoreCoefficient(const int64_t coefficient); + +} // namespace milvus diff --git a/internal/core/src/common/Consts.h b/internal/core/src/common/Consts.h index dd6033cffe..01b70c1042 100644 --- a/internal/core/src/common/Consts.h +++ b/internal/core/src/common/Consts.h @@ -36,3 +36,8 @@ const char INDEX_BUILD_ID_KEY[] = "indexBuildID"; const char INDEX_ROOT_PATH[] = "index_files"; const char RAWDATA_ROOT_PATH[] = "raw_datas"; + +const int DEFAULT_DISK_INDEX_MAX_MEMORY_LIMIT = 2; // gigabytes +const int64_t DEFAULT_THREAD_CORE_COEFFICIENT = 50; + +const int64_t DEFAULT_INDEX_FILE_SLICE_SIZE = 4; // megabytes diff --git a/internal/core/src/common/Slice.cpp b/internal/core/src/common/Slice.cpp index e75d6a762b..1bf77bf263 100644 --- a/internal/core/src/common/Slice.cpp +++ b/internal/core/src/common/Slice.cpp @@ -15,26 +15,17 @@ // limitations under the License. #include "common/Slice.h" +#include "common/Common.h" #include "log/Log.h" namespace milvus { -const int64_t DEFAULT_INDEX_FILE_SLICE_SIZE = 4; // megabytes - static const char* INDEX_FILE_SLICE_META = "SLICE_META"; static const char* META = "meta"; static const char* NAME = "name"; static const char* SLICE_NUM = "slice_num"; static const char* TOTAL_LEN = "total_len"; -int64_t index_file_slice_size = DEFAULT_INDEX_FILE_SLICE_SIZE; - -void -SetIndexSliceSize(const int64_t size) { - index_file_slice_size = size; - LOG_SEGCORE_DEBUG_ << "set config index slice size: " << index_file_slice_size; -} - void Slice( const std::string& prefix, const BinaryPtr& data_src, const int64_t slice_len, BinarySet& binarySet, Config& ret) { diff --git a/internal/core/src/common/Slice.h b/internal/core/src/common/Slice.h index d6b53511be..ef51e3fb51 100644 --- a/internal/core/src/common/Slice.h +++ b/internal/core/src/common/Slice.h @@ -20,11 +20,6 @@ namespace milvus { -extern int64_t index_file_slice_size; - -void -SetIndexSliceSize(const int64_t size); - void Assemble(BinarySet& binarySet); diff --git a/internal/core/src/common/init_c.cpp b/internal/core/src/common/init_c.cpp index a7e3010103..56d7bc6452 100644 --- a/internal/core/src/common/init_c.cpp +++ b/internal/core/src/common/init_c.cpp @@ -21,8 +21,9 @@ #include #include "config/ConfigChunkManager.h" #include "common/Slice.h" +#include "common/Common.h" -std::once_flag flag1, flag2; +std::once_flag flag1, flag2, flag3; void InitLocalRootPath(const char* root_path) { @@ -36,3 +37,9 @@ InitIndexSliceSize(const int64_t size) { std::call_once( flag2, [](int64_t size) { milvus::SetIndexSliceSize(size); }, size); } + +void +InitThreadCoreCoefficient(const int64_t value) { + std::call_once( + flag3, [](int64_t value) { milvus::SetThreadCoreCoefficient(value); }, value); +} diff --git a/internal/core/src/common/init_c.h b/internal/core/src/common/init_c.h index a1bcfa1a50..489d74b50d 100644 --- a/internal/core/src/common/init_c.h +++ b/internal/core/src/common/init_c.h @@ -26,6 +26,9 @@ extern "C" { void InitIndexSliceSize(const int64_t); +void +InitThreadCoreCoefficient(const int64_t); + void InitLocalRootPath(const char*); diff --git a/internal/core/src/storage/CMakeLists.txt b/internal/core/src/storage/CMakeLists.txt index 00d587136e..502b642d97 100644 --- a/internal/core/src/storage/CMakeLists.txt +++ b/internal/core/src/storage/CMakeLists.txt @@ -33,6 +33,7 @@ set(STORAGE_FILES IndexData.cpp InsertData.cpp Event.cpp + ThreadPool.cpp storage_c.cpp) if(BUILD_DISK_ANN STREQUAL "ON") diff --git a/internal/core/src/storage/DiskFileManagerImpl.cpp b/internal/core/src/storage/DiskFileManagerImpl.cpp index e38a6007ee..3c168bb8ea 100644 --- a/internal/core/src/storage/DiskFileManagerImpl.cpp +++ b/internal/core/src/storage/DiskFileManagerImpl.cpp @@ -18,7 +18,7 @@ #include #include -#include "common/Consts.h" +#include "common/Common.h" #include "common/Slice.h" #include "log/Log.h" #include "config/ConfigKnowhere.h" @@ -28,6 +28,7 @@ #include "storage/Exception.h" #include "storage/FieldData.h" #include "storage/IndexData.h" +#include "storage/ThreadPool.h" #include "storage/Util.h" #define FILEMANAGER_TRY try { @@ -77,9 +78,28 @@ DiskFileManagerImpl::LoadFile(const std::string& file) noexcept { return true; } +std::pair +EncodeAndUploadIndexSlice(RemoteChunkManager* remote_chunk_manager, + uint8_t* buf, + int64_t offset, + int64_t batch_size, + IndexMeta index_meta, + FieldDataMeta field_meta, + std::string object_key) { + auto fieldData = std::make_shared(buf + offset, batch_size); + auto indexData = std::make_shared(fieldData); + indexData->set_index_meta(index_meta); + indexData->SetFieldDataMeta(field_meta); + auto serialized_index_data = indexData->serialize_to_remote_file(); + auto serialized_index_size = serialized_index_data.size(); + remote_chunk_manager->Write(object_key, serialized_index_data.data(), serialized_index_size); + return std::pair(object_key, serialized_index_size); +} + bool DiskFileManagerImpl::AddFile(const std::string& file) noexcept { auto& local_chunk_manager = LocalChunkManager::GetInstance(); + auto& pool = ThreadPool::GetInstance(); FILEMANAGER_TRY if (!local_chunk_manager.Exist(file)) { LOG_SEGCORE_ERROR_C << "local file: " << file << " does not exist "; @@ -97,24 +117,21 @@ DiskFileManagerImpl::AddFile(const std::string& file) noexcept { // Split local data to multi part with specified size int slice_num = 0; auto remotePrefix = GetRemoteIndexObjectPrefix(); + std::vector>> futures; for (int64_t offset = 0; offset < fileSize; slice_num++) { auto batch_size = std::min(index_file_slice_size << 20, int64_t(fileSize) - offset); - auto fieldData = std::make_shared(buf.get() + offset, batch_size); - auto indexData = std::make_shared(fieldData); - indexData->set_index_meta(index_meta_); - indexData->SetFieldDataMeta(field_meta_); - auto serialized_index_data = indexData->serialize_to_remote_file(); - auto serialized_index_size = serialized_index_data.size(); - // Put file to remote char objectKey[200]; snprintf(objectKey, sizeof(objectKey), "%s/%s_%d", remotePrefix.c_str(), fileName.c_str(), slice_num); - rcm_->Write(objectKey, serialized_index_data.data(), serialized_index_size); - + // use multi-thread to put part file + futures.push_back(pool.Submit(EncodeAndUploadIndexSlice, rcm_.get(), buf.get(), offset, batch_size, index_meta_, + field_meta_, std::string(objectKey))); offset += batch_size; - // record remote file to save etcd - remote_paths_to_size_[objectKey] = serialized_index_size; + } + for (auto& future : futures) { + auto res = future.get(); + remote_paths_to_size_[res.first] = res.second; } FILEMANAGER_CATCH FILEMANAGER_END @@ -136,29 +153,84 @@ DiskFileManagerImpl::CacheIndexToDisk(std::vector remote_files) { std::sort(slices.second.begin(), slices.second.end()); } + auto EstimateParalleDegree = [&](const std::string& file) -> uint64_t { + auto fileSize = rcm_->Size(file); + return uint64_t(DEFAULT_DISK_INDEX_MAX_MEMORY_LIMIT / fileSize); + }; + for (auto& slices : index_slices) { auto prefix = slices.first; auto local_index_file_name = GetLocalIndexObjectPrefix() + prefix.substr(prefix.find_last_of("/") + 1); local_chunk_manager.CreateFile(local_index_file_name); int64_t offset = 0; + std::vector batch_remote_files; + uint64_t max_parallel_degree = INT_MAX; for (auto iter = slices.second.begin(); iter != slices.second.end(); iter++) { + if (batch_remote_files.size() == max_parallel_degree) { + auto next_offset = CacheBatchIndexFilesToDisk(batch_remote_files, local_index_file_name, offset); + offset = next_offset; + batch_remote_files.clear(); + } auto origin_file = prefix + "_" + std::to_string(*iter); - auto fileSize = rcm_->Size(origin_file); - auto buf = std::unique_ptr(new uint8_t[fileSize]); - rcm_->Read(origin_file, buf.get(), fileSize); - - auto decoded_index_data = DeserializeFileData(buf.get(), fileSize); - auto index_payload = decoded_index_data->GetPayload(); - auto index_size = index_payload->rows * sizeof(uint8_t); - - local_chunk_manager.Write(local_index_file_name, offset, const_cast(index_payload->raw_data), - index_size); - offset += index_size; + if (batch_remote_files.size() == 0) { + // Use first file size as average size to estimate + max_parallel_degree = EstimateParalleDegree(origin_file); + } + batch_remote_files.push_back(origin_file); + } + if (batch_remote_files.size() > 0) { + auto next_offset = CacheBatchIndexFilesToDisk(batch_remote_files, local_index_file_name, offset); + offset = next_offset; + batch_remote_files.clear(); } local_paths_.emplace_back(local_index_file_name); } } +uint64_t +DownloadAndDecodeRemoteIndexfile(RemoteChunkManager* remote_chunk_manager, + std::string file, + milvus::storage::Payload** index_payload_ptr) { + auto fileSize = remote_chunk_manager->Size(file); + auto buf = std::shared_ptr(new uint8_t[fileSize]); + remote_chunk_manager->Read(file, buf.get(), fileSize); + + auto decoded_index_data = DeserializeFileData(buf.get(), fileSize); + auto index_payload = decoded_index_data->GetPayload(); + auto index_size = index_payload->rows * sizeof(uint8_t); + *index_payload_ptr = index_payload.release(); + return index_size; +} + +uint64_t +DiskFileManagerImpl::CacheBatchIndexFilesToDisk(const std::vector& remote_files, + const std::string& local_file_name, + uint64_t local_file_init_offfset) { + auto& local_chunk_manager = LocalChunkManager::GetInstance(); + auto& pool = ThreadPool::GetInstance(); + int batch_size = remote_files.size(); + std::vector cache_payloads(batch_size); + for (size_t i = 0; i < cache_payloads.size(); ++i) { + cache_payloads[i] = nullptr; + } + std::vector cache_payload_sizes(batch_size); + std::vector> futures; + for (int i = 0; i < batch_size; ++i) { + futures.push_back( + pool.Submit(DownloadAndDecodeRemoteIndexfile, rcm_.get(), remote_files[i], &cache_payloads[i])); + } + for (int i = 0; i < batch_size; ++i) { + cache_payload_sizes[i] = futures[i].get(); + } + uint64_t offset = local_file_init_offfset; + for (int i = 0; i < batch_size; ++i) { + local_chunk_manager.Write(local_file_name, offset, const_cast((cache_payloads[i])->raw_data), + cache_payload_sizes[i]); + offset += cache_payload_sizes[i]; + } + return offset; +} + std::string DiskFileManagerImpl::GetFileName(const std::string& localfile) { boost::filesystem::path localPath(localfile); diff --git a/internal/core/src/storage/DiskFileManagerImpl.h b/internal/core/src/storage/DiskFileManagerImpl.h index 07f1419e95..d967b4fe67 100644 --- a/internal/core/src/storage/DiskFileManagerImpl.h +++ b/internal/core/src/storage/DiskFileManagerImpl.h @@ -24,8 +24,11 @@ #include "storage/IndexData.h" #include "storage/FileManager.h" +#include "storage/LocalChunkManager.h" #include "storage/MinioChunkManager.h" +#include "common/Consts.h" + namespace milvus::storage { class DiskFileManagerImpl : public FileManagerImpl { @@ -76,6 +79,11 @@ class DiskFileManagerImpl : public FileManagerImpl { void CacheIndexToDisk(std::vector remote_files); + uint64_t + CacheBatchIndexFilesToDisk(const std::vector& remote_files, + const std::string& local_file_name, + uint64_t local_file_init_offfset); + FieldDataMeta GetFileDataMeta() const { return field_meta_; diff --git a/internal/core/src/storage/MinioChunkManager.cpp b/internal/core/src/storage/MinioChunkManager.cpp index 27176f2721..d32c01cdd4 100644 --- a/internal/core/src/storage/MinioChunkManager.cpp +++ b/internal/core/src/storage/MinioChunkManager.cpp @@ -122,9 +122,9 @@ MinioChunkManager::MinioChunkManager(const StorageConfig& storage_config) // TODO ::BucketExist and CreateBucket func not work, should be fixed // index node has already tried to create bucket when receive index task if bucket not exist // query node has already tried to create bucket during init stage if bucket not exist - // if (!BucketExists(storage_config.bucket_name)) { - // CreateBucket(storage_config.bucket_name); - // } + // if (!BucketExists(storage_config.bucket_name)) { + // CreateBucket(storage_config.bucket_name); + // } LOG_SEGCORE_INFO_C << "init MinioChunkManager with parameter[endpoint: '" << storage_config.address << "', access_key:'" << storage_config.access_key_id << "', access_value:'" diff --git a/internal/core/src/storage/SafeQueue.h b/internal/core/src/storage/SafeQueue.h new file mode 100644 index 0000000000..1f536837ff --- /dev/null +++ b/internal/core/src/storage/SafeQueue.h @@ -0,0 +1,71 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma + +#include +#include +#include + +namespace milvus { + +template +class SafeQueue { + public: + SafeQueue(void) { + } + ~SafeQueue() { + } + + bool + empty() { + std::shared_lock lock(mutex_); + return queue_.empty(); + } + + void + size() { + std::shared_lock lock(mutex_); + return queue_.size(); + } + + // Add a element to queue + void + enqueue(T t) { + std::unique_lock lock(mutex_); + queue_.push(t); + } + + // Get a available element from queue + bool + dequeue(T& t) { + std::unique_lock lock(mutex_); + + if (queue_.empty()) { + return false; + } + + t = std::move(queue_.front()); + queue_.pop(); + + return true; + } + + private: + std::queue queue_; + mutable std::shared_mutex mutex_; +}; +} // namespace milvus diff --git a/internal/core/src/storage/ThreadPool.cpp b/internal/core/src/storage/ThreadPool.cpp new file mode 100644 index 0000000000..a1392faad8 --- /dev/null +++ b/internal/core/src/storage/ThreadPool.cpp @@ -0,0 +1,38 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "ThreadPool.h" + +namespace milvus { + +void +ThreadPool::Init() { + for (int i = 0; i < threads_.size(); i++) { + threads_[i] = std::thread(Worker(this, i)); + } +} + +void +ThreadPool::ShutDown() { + shutdown_ = true; + condition_lock_.notify_all(); + for (int i = 0; i < threads_.size(); i++) { + if (threads_[i].joinable()) { + threads_[i].join(); + } + } +} +}; // namespace milvus diff --git a/internal/core/src/storage/ThreadPool.h b/internal/core/src/storage/ThreadPool.h new file mode 100644 index 0000000000..d422ee4fbe --- /dev/null +++ b/internal/core/src/storage/ThreadPool.h @@ -0,0 +1,117 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "SafeQueue.h" +#include "common/Common.h" +#include "log/Log.h" + +namespace milvus { + +class ThreadPool { + public: + explicit ThreadPool(const int thread_core_coefficient) : shutdown_(false) { + auto thread_num = std::thread::hardware_concurrency() * thread_core_coefficient; + LOG_SEGCORE_INFO_C << "Thread pool's worker num:" << thread_num; + threads_ = std::vector(thread_num); + Init(); + } + + ~ThreadPool() { + ShutDown(); + } + + static ThreadPool& + GetInstance() { + static ThreadPool pool(thread_core_coefficient); + return pool; + } + + ThreadPool(const ThreadPool&) = delete; + ThreadPool(ThreadPool&&) = delete; + ThreadPool& + operator=(const ThreadPool&) = delete; + ThreadPool& + operator=(ThreadPool&&) = delete; + + void + Init(); + + void + ShutDown(); + + template + auto + // Submit(F&& f, Args&&... args) -> std::future; + Submit(F&& f, Args&&... args) -> std::future { + std::function func = std::bind(std::forward(f), std::forward(args)...); + auto task_ptr = std::make_shared>(func); + + std::function wrap_func = [task_ptr]() { (*task_ptr)(); }; + + work_queue_.enqueue(wrap_func); + + condition_lock_.notify_one(); + + return task_ptr->get_future(); + } + + public: + bool shutdown_; + SafeQueue> work_queue_; + std::vector threads_; + std::mutex mutex_; + std::condition_variable condition_lock_; +}; + +class Worker { + private: + int id_; + ThreadPool* pool_; + + public: + Worker(ThreadPool* pool, const int id) : pool_(pool), id_(id) { + } + + void + operator()() { + std::function func; + bool dequeue; + while (!pool_->shutdown_) { + std::unique_lock lock(pool_->mutex_); + if (pool_->work_queue_.empty()) { + pool_->condition_lock_.wait(lock); + } + dequeue = pool_->work_queue_.dequeue(func); + lock.unlock(); + if (dequeue) { + func(); + } + } + } +}; + +} // namespace milvus diff --git a/internal/core/unittest/CMakeLists.txt b/internal/core/unittest/CMakeLists.txt index bfaa342ec5..be3aa531cf 100644 --- a/internal/core/unittest/CMakeLists.txt +++ b/internal/core/unittest/CMakeLists.txt @@ -82,6 +82,7 @@ if (LINUX) gtest gtest_main milvus_segcore + milvus_storage milvus_indexbuilder ) install(TARGETS index_builder_test DESTINATION unittest) @@ -94,6 +95,7 @@ add_executable(all_tests target_link_libraries(all_tests gtest milvus_segcore + milvus_storage milvus_indexbuilder pthread ) diff --git a/internal/core/unittest/test_disk_file_manager_test.cpp b/internal/core/unittest/test_disk_file_manager_test.cpp index fb26e3a05a..b8a61883af 100644 --- a/internal/core/unittest/test_disk_file_manager_test.cpp +++ b/internal/core/unittest/test_disk_file_manager_test.cpp @@ -9,14 +9,18 @@ // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express // or implied. See the License for the specific language governing permissions and limitations under the License +#include #include #include #include +#include #include "common/Slice.h" #include "storage/Event.h" #include "storage/LocalChunkManager.h" +#include "storage/MinioChunkManager.h" #include "storage/DiskFileManagerImpl.h" +#include "storage/ThreadPool.h" #include "config/ConfigChunkManager.h" #include "config/ConfigKnowhere.h" #include "test_utils/indexbuilder_test_utils.h" @@ -46,8 +50,12 @@ class DiskAnnFileManagerTest : public testing::Test { TEST_F(DiskAnnFileManagerTest, AddFilePositive) { auto& lcm = LocalChunkManager::GetInstance(); + auto rcm = std::make_unique(storage_config_); string testBucketName = "test-diskann"; storage_config_.bucket_name = testBucketName; + if (!rcm->BucketExists(testBucketName)) { + rcm->CreateBucket(testBucketName); + } std::string indexFilePath = "/tmp/diskann/index_files/1000/index"; auto exist = lcm.Exist(indexFilePath); @@ -73,6 +81,7 @@ TEST_F(DiskAnnFileManagerTest, AddFilePositive) { std::vector remote_files; for (auto& file2size : remote_files_to_size) { + std::cout << file2size.first << std::endl; remote_files.emplace_back(file2size.first); } diskAnnFileManager->CacheIndexToDisk(remote_files); @@ -92,3 +101,104 @@ TEST_F(DiskAnnFileManagerTest, AddFilePositive) { EXPECT_EQ(rawData[4], data[4]); } } + +TEST_F(DiskAnnFileManagerTest, AddFilePositiveParallel) { + auto& lcm = LocalChunkManager::GetInstance(); + auto rcm = std::make_unique(storage_config_); + string testBucketName = "test-diskann"; + storage_config_.bucket_name = testBucketName; + if (!rcm->BucketExists(testBucketName)) { + rcm->CreateBucket(testBucketName); + } + + std::string indexFilePath = "/tmp/diskann/index_files/1000/index"; + auto exist = lcm.Exist(indexFilePath); + EXPECT_EQ(exist, false); + uint64_t index_size = 50 << 20; + lcm.CreateFile(indexFilePath); + std::vector data(index_size); + lcm.Write(indexFilePath, data.data(), index_size); + + // collection_id: 1, partition_id: 2, segment_id: 3 + // field_id: 100, index_build_id: 1000, index_version: 1 + FieldDataMeta filed_data_meta = {1, 2, 3, 100}; + IndexMeta index_meta = {3, 100, 1000, 1, "index"}; + + int64_t slice_size = milvus::index_file_slice_size << 20; + auto diskAnnFileManager = std::make_shared(filed_data_meta, index_meta, storage_config_); + auto ok = diskAnnFileManager->AddFile(indexFilePath); + EXPECT_EQ(ok, true); + + auto remote_files_to_size = diskAnnFileManager->GetRemotePathsToFileSize(); + auto num_slice = index_size / slice_size; + EXPECT_EQ(remote_files_to_size.size(), index_size % slice_size == 0 ? num_slice : num_slice + 1); + + std::vector remote_files; + for (auto& file2size : remote_files_to_size) { + std::cout << file2size.first << std::endl; + remote_files.emplace_back(file2size.first); + } + diskAnnFileManager->CacheIndexToDisk(remote_files); + auto local_files = diskAnnFileManager->GetLocalFilePaths(); + for (auto& file : local_files) { + auto file_size = lcm.Size(file); + auto buf = std::unique_ptr(new uint8_t[file_size]); + lcm.Read(file, buf.get(), file_size); + + auto index = FieldData(buf.get(), file_size); + auto payload = index.get_payload(); + auto rows = payload->rows; + auto rawData = payload->raw_data; + + EXPECT_EQ(rows, index_size); + EXPECT_EQ(rawData[0], data[0]); + EXPECT_EQ(rawData[4], data[4]); + } +} + +int +test_worker(string s) { + std::cout << s << std::endl; + sleep(4); + std::cout << s << std::endl; + return 1; +} + +TEST_F(DiskAnnFileManagerTest, TestThreadPool) { + auto thread_pool = new milvus::ThreadPool(50); + std::vector> futures; + auto start = chrono::system_clock::now(); + for (int i = 0; i < 100; i++) { + futures.push_back(thread_pool->Submit(test_worker, "test_id" + std::to_string(i))); + } + for (auto& future : futures) { + EXPECT_EQ(future.get(), 1); + } + auto end = chrono::system_clock::now(); + auto duration = chrono::duration_cast(end - start); + auto second = double(duration.count()) * chrono::microseconds::period::num / chrono::microseconds::period::den; + EXPECT_LT(second, 4 * 100); +} + +int +test_exception(string s) { + if (s == "test_id60") { + throw std::runtime_error("run time error"); + } + return 1; +} + +TEST_F(DiskAnnFileManagerTest, TestThreadPoolException) { + try { + auto thread_pool = new milvus::ThreadPool(50); + std::vector> futures; + for (int i = 0; i < 100; i++) { + futures.push_back(thread_pool->Submit(test_exception, "test_id" + std::to_string(i))); + } + for (auto& future : futures) { + future.get(); + } + } catch (std::exception& e) { + EXPECT_EQ(std::string(e.what()), "run time error"); + } +} \ No newline at end of file diff --git a/internal/indexnode/indexnode.go b/internal/indexnode/indexnode.go index 3c903f4bd4..44f49271fa 100644 --- a/internal/indexnode/indexnode.go +++ b/internal/indexnode/indexnode.go @@ -156,6 +156,9 @@ func (i *IndexNode) initKnowhere() { cIndexSliceSize := C.int64_t(Params.CommonCfg.IndexSliceSize) C.InitIndexSliceSize(cIndexSliceSize) + cThreadCoreCoefficient := C.int64_t(Params.CommonCfg.ThreadCoreCoefficient) + C.InitThreadCoreCoefficient(cThreadCoreCoefficient) + initcore.InitLocalStorageConfig(&Params) } diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go index ab1eac67c9..6fe7dc96ce 100644 --- a/internal/querynode/query_node.go +++ b/internal/querynode/query_node.go @@ -218,6 +218,9 @@ func (node *QueryNode) InitSegcore() { cIndexSliceSize := C.int64_t(Params.CommonCfg.IndexSliceSize) C.InitIndexSliceSize(cIndexSliceSize) + cThreadCoreCoefficient := C.int64_t(Params.CommonCfg.ThreadCoreCoefficient) + C.InitThreadCoreCoefficient(cThreadCoreCoefficient) + initcore.InitLocalStorageConfig(&Params) } diff --git a/internal/util/paramtable/component_param.go b/internal/util/paramtable/component_param.go index 8f9db5c69a..814c935180 100644 --- a/internal/util/paramtable/component_param.go +++ b/internal/util/paramtable/component_param.go @@ -32,8 +32,9 @@ const ( DefaultRetentionDuration = 3600 * 24 // DefaultIndexSliceSize defines the default slice size of index file when serializing. - DefaultIndexSliceSize = 16 - DefaultGracefulTime = 5000 //ms + DefaultIndexSliceSize = 16 + DefaultGracefulTime = 5000 //ms + DefaultThreadCoreCoefficient = 50 DefaultSessionTTL = 60 //s DefaultSessionRetryTimes = 30 @@ -143,6 +144,7 @@ type commonConfig struct { EntityExpirationTTL time.Duration IndexSliceSize int64 + ThreadCoreCoefficient int64 MaxDegree int64 SearchListSize int64 PGCodeBudgetGBRatio float64 @@ -203,6 +205,7 @@ func (p *commonConfig) init(base *BaseTable) { p.initBeamWidthRatio() p.initGracefulTime() p.initStorageType() + p.initThreadCoreCoefficient() p.initEnableAuthorization() @@ -401,6 +404,10 @@ func (p *commonConfig) initIndexSliceSize() { p.IndexSliceSize = p.Base.ParseInt64WithDefault("common.indexSliceSize", DefaultIndexSliceSize) } +func (p *commonConfig) initThreadCoreCoefficient() { + p.ThreadCoreCoefficient = p.Base.ParseInt64WithDefault("common.threadCoreCoefficient", DefaultThreadCoreCoefficient) +} + func (p *commonConfig) initPGCodeBudgetGBRatio() { p.PGCodeBudgetGBRatio = p.Base.ParseFloatWithDefault("common.DiskIndex.PGCodeBudgetGBRatio", DefaultPGCodeBudgetGBRatio) }