diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index 5168b9549b..03ffe14731 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -19,6 +19,7 @@ Please mark all change in change log and use the ticket from JIRA. ## New Feature - MS-137 - Integrate knowhere +- MS-180 - Add new mem manager ## Task diff --git a/cpp/conf/server_config.template b/cpp/conf/server_config.template index 0383e00b53..f0cd6d5e52 100644 --- a/cpp/conf/server_config.template +++ b/cpp/conf/server_config.template @@ -2,7 +2,7 @@ server_config: address: 0.0.0.0 port: 19530 # the port milvus listen to, default: 19530, range: 1025 ~ 65534 gpu_index: 0 # the gpu milvus use, default: 0, range: 0 ~ gpu number - 1 - mode: single # milvus deployment type: single, cluster + mode: single # milvus deployment type: single, cluster, read_only db_config: db_path: @MILVUS_DB_PATH@ # milvus data storage path @@ -15,6 +15,8 @@ db_config: index_building_threshold: 1024 # index building trigger threshold, default: 1024, unit: MB archive_disk_threshold: 512 # triger archive action if storage size exceed this value, unit: GB archive_days_threshold: 30 # files older than x days will be archived, unit: day + maximum_memory: 4 # maximum memory allowed, default: 4, unit: GB, should be at least 1 GB. + # the sum of maximum_memory and cpu_cache_capacity should be less than total memory metric_config: is_startup: off # if monitoring start: on, off diff --git a/cpp/src/db/Constants.h b/cpp/src/db/Constants.h index 2bb2e0a064..1ba02b1d55 100644 --- a/cpp/src/db/Constants.h +++ b/cpp/src/db/Constants.h @@ -11,6 +11,9 @@ namespace engine { const size_t K = 1024UL; const size_t M = K*K; +const size_t G = K*M; +const size_t T = K*G; + const size_t MAX_TABLE_FILE_MEM = 128 * M; const int VECTOR_TYPE_SIZE = sizeof(float); diff --git a/cpp/src/db/MemManager.cpp b/cpp/src/db/MemManager.cpp index e36b0c45ba..ba8517cdbd 100644 --- a/cpp/src/db/MemManager.cpp +++ b/cpp/src/db/MemManager.cpp @@ -8,6 +8,7 @@ #include "MetaConsts.h" #include "EngineFactory.h" #include "metrics/Metrics.h" +#include "Log.h" #include #include @@ -128,6 +129,10 @@ Status MemManager::InsertVectorsNoLock(const std::string& table_id, size_t n, const float* vectors, IDNumbers& vector_ids) { + + LOG(DEBUG) << "MemManager::InsertVectorsNoLock: mutable mem = " << GetCurrentMutableMem() << + ", immutable mem = " << GetCurrentImmutableMem() << ", total mem = " << GetCurrentMem(); + MemVectorsPtr mem = GetMemByTable(table_id); if (mem == nullptr) { return Status::NotFound("Group " + table_id + " not found!"); @@ -192,6 +197,26 @@ Status MemManager::EraseMemVector(const std::string& table_id) { return Status::OK(); } +size_t MemManager::GetCurrentMutableMem() { + size_t totalMem = 0; + for (auto& kv : mem_id_map_) { + auto memVector = kv.second; + totalMem += memVector->Size(); + } + return totalMem; +} + +size_t MemManager::GetCurrentImmutableMem() { + size_t totalMem = 0; + for (auto& memVector : immu_mem_list_) { + totalMem += memVector->Size(); + } + return totalMem; +} + +size_t MemManager::GetCurrentMem() { + return GetCurrentMutableMem() + GetCurrentImmutableMem(); +} } // namespace engine } // namespace milvus diff --git a/cpp/src/db/MemManager.h b/cpp/src/db/MemManager.h index 95303889db..e8460c7a6d 100644 --- a/cpp/src/db/MemManager.h +++ b/cpp/src/db/MemManager.h @@ -78,6 +78,12 @@ public: Status EraseMemVector(const std::string& table_id) override; + size_t GetCurrentMutableMem() override; + + size_t GetCurrentImmutableMem() override; + + size_t GetCurrentMem() override; + private: MemVectorsPtr GetMemByTable(const std::string& table_id); diff --git a/cpp/src/db/MemManagerAbstract.h b/cpp/src/db/MemManagerAbstract.h index 74222df1e8..58c73ba6f8 100644 --- a/cpp/src/db/MemManagerAbstract.h +++ b/cpp/src/db/MemManagerAbstract.h @@ -16,6 +16,12 @@ public: virtual Status EraseMemVector(const std::string& table_id) = 0; + virtual size_t GetCurrentMutableMem() = 0; + + virtual size_t GetCurrentImmutableMem() = 0; + + virtual size_t GetCurrentMem() = 0; + }; // MemManagerAbstract using MemManagerAbstractPtr = std::shared_ptr; diff --git a/cpp/src/db/MemTable.cpp b/cpp/src/db/MemTable.cpp index b282ad375a..ba3875fbb5 100644 --- a/cpp/src/db/MemTable.cpp +++ b/cpp/src/db/MemTable.cpp @@ -49,13 +49,15 @@ size_t MemTable::GetTableFileCount() { } Status MemTable::Serialize() { - for (auto& memTableFile : mem_table_file_list_) { - auto status = memTableFile->Serialize(); + for (auto memTableFile = mem_table_file_list_.begin(); memTableFile != mem_table_file_list_.end(); ) { + auto status = (*memTableFile)->Serialize(); if (!status.ok()) { std::string errMsg = "MemTable::Serialize failed: " + status.ToString(); ENGINE_LOG_ERROR << errMsg; return Status::Error(errMsg); } + std::lock_guard lock(mutex_); + memTableFile = mem_table_file_list_.erase(memTableFile); } return Status::OK(); } @@ -64,10 +66,19 @@ bool MemTable::Empty() { return mem_table_file_list_.empty(); } -std::string MemTable::GetTableId() { +const std::string& MemTable::GetTableId() const { return table_id_; } +size_t MemTable::GetCurrentMem() { + std::lock_guard lock(mutex_); + size_t totalMem = 0; + for (auto& memTableFile : mem_table_file_list_) { + totalMem += memTableFile->GetCurrentMem(); + } + return totalMem; +} + } // namespace engine } // namespace milvus } // namespace zilliz \ No newline at end of file diff --git a/cpp/src/db/MemTable.h b/cpp/src/db/MemTable.h index e09d6ddac1..9bae932e62 100644 --- a/cpp/src/db/MemTable.h +++ b/cpp/src/db/MemTable.h @@ -4,7 +4,7 @@ #include "MemTableFile.h" #include "VectorSource.h" -#include +#include namespace zilliz { namespace milvus { @@ -30,7 +30,9 @@ public: bool Empty(); - std::string GetTableId(); + const std::string& GetTableId() const; + + size_t GetCurrentMem(); private: const std::string table_id_; @@ -41,6 +43,8 @@ private: Options options_; + std::mutex mutex_; + }; //MemTable } // namespace engine diff --git a/cpp/src/db/NewMemManager.cpp b/cpp/src/db/NewMemManager.cpp index 19aba68eb7..3c78f37101 100644 --- a/cpp/src/db/NewMemManager.cpp +++ b/cpp/src/db/NewMemManager.cpp @@ -1,5 +1,9 @@ #include "NewMemManager.h" #include "VectorSource.h" +#include "Log.h" +#include "Constants.h" + +#include namespace zilliz { namespace milvus { @@ -20,6 +24,9 @@ Status NewMemManager::InsertVectors(const std::string& table_id_, const float* vectors_, IDNumbers& vector_ids_) { + while (GetCurrentMem() > options_.maximum_memory) { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } std::unique_lock lock(mutex_); @@ -30,6 +37,10 @@ Status NewMemManager::InsertVectorsNoLock(const std::string& table_id, size_t n, const float* vectors, IDNumbers& vector_ids) { + + LOG(DEBUG) << "NewMemManager::InsertVectorsNoLock: mutable mem = " << GetCurrentMutableMem() << + ", immutable mem = " << GetCurrentImmutableMem() << ", total mem = " << GetCurrentMem(); + MemTablePtr mem = GetMemByTable(table_id); VectorSource::Ptr source = std::make_shared(n, vectors); @@ -64,6 +75,12 @@ Status NewMemManager::Serialize(std::set& table_ids) { table_ids.insert(mem->GetTableId()); } immu_mem_list_.clear(); +// for (auto mem = immu_mem_list_.begin(); mem != immu_mem_list_.end(); ) { +// (*mem)->Serialize(); +// table_ids.insert((*mem)->GetTableId()); +// mem = immu_mem_list_.erase(mem); +// LOG(DEBUG) << "immu_mem_list_ size = " << immu_mem_list_.size(); +// } return Status::OK(); } @@ -87,6 +104,27 @@ Status NewMemManager::EraseMemVector(const std::string& table_id) { return Status::OK(); } +size_t NewMemManager::GetCurrentMutableMem() { + size_t totalMem = 0; + for (auto& kv : mem_id_map_) { + auto memTable = kv.second; + totalMem += memTable->GetCurrentMem(); + } + return totalMem; +} + +size_t NewMemManager::GetCurrentImmutableMem() { + size_t totalMem = 0; + for (auto& memTable : immu_mem_list_) { + totalMem += memTable->GetCurrentMem(); + } + return totalMem; +} + +size_t NewMemManager::GetCurrentMem() { + return GetCurrentMutableMem() + GetCurrentImmutableMem(); +} + } // namespace engine } // namespace milvus } // namespace zilliz \ No newline at end of file diff --git a/cpp/src/db/NewMemManager.h b/cpp/src/db/NewMemManager.h index a5f5a9ca13..9883480404 100644 --- a/cpp/src/db/NewMemManager.h +++ b/cpp/src/db/NewMemManager.h @@ -31,6 +31,12 @@ public: Status EraseMemVector(const std::string& table_id) override; + size_t GetCurrentMutableMem() override; + + size_t GetCurrentImmutableMem() override; + + size_t GetCurrentMem() override; + private: MemTablePtr GetMemByTable(const std::string& table_id); diff --git a/cpp/src/db/Options.h b/cpp/src/db/Options.h index 39d0a15019..47bbb45bbc 100644 --- a/cpp/src/db/Options.h +++ b/cpp/src/db/Options.h @@ -61,6 +61,7 @@ struct Options { size_t index_trigger_size = ONE_GB; //unit: byte DBMetaOptions meta; int mode = MODE::SINGLE; + float maximum_memory = 4 * ONE_GB; }; // Options diff --git a/cpp/src/server/DBWrapper.cpp b/cpp/src/server/DBWrapper.cpp index fca15cb65a..bed4440d5e 100644 --- a/cpp/src/server/DBWrapper.cpp +++ b/cpp/src/server/DBWrapper.cpp @@ -23,6 +23,14 @@ DBWrapper::DBWrapper() { if(index_size > 0) {//ensure larger than zero, unit is MB opt.index_trigger_size = (size_t)index_size * engine::ONE_MB; } + float maximum_memory = config.GetFloatValue(CONFIG_MAXMIMUM_MEMORY); + if (maximum_memory > 1.0) { + opt.maximum_memory = maximum_memory * engine::ONE_GB; + } + else { + std::cout << "ERROR: maximum_memory should be at least 1 GB" << std::endl; + kill(0, SIGUSR1); + } ConfigNode& serverConfig = ServerConfig::GetInstance().GetConfig(CONFIG_SERVER); std::string mode = serverConfig.GetValue(CONFIG_CLUSTER_MODE, "single"); diff --git a/cpp/src/server/ServerConfig.h b/cpp/src/server/ServerConfig.h index 0ec04eed8c..b3b95eb8b6 100644 --- a/cpp/src/server/ServerConfig.h +++ b/cpp/src/server/ServerConfig.h @@ -26,6 +26,7 @@ static const std::string CONFIG_DB_PATH = "db_path"; static const std::string CONFIG_DB_INDEX_TRIGGER_SIZE = "index_building_threshold"; static const std::string CONFIG_DB_ARCHIVE_DISK = "archive_disk_threshold"; static const std::string CONFIG_DB_ARCHIVE_DAYS = "archive_days_threshold"; +static const std::string CONFIG_MAXMIMUM_MEMORY = "maximum_memory"; static const std::string CONFIG_LOG = "log_config"; diff --git a/cpp/unittest/db/mem_test.cpp b/cpp/unittest/db/mem_test.cpp index 915610adcc..818c3a6388 100644 --- a/cpp/unittest/db/mem_test.cpp +++ b/cpp/unittest/db/mem_test.cpp @@ -8,6 +8,8 @@ #include "db/Constants.h" #include "db/EngineFactory.h" #include "metrics/Metrics.h" +#include "db/MetaConsts.h" +#include "boost/filesystem.hpp" #include #include @@ -34,9 +36,6 @@ namespace { vectors.clear(); vectors.resize(n*TABLE_DIM); float* data = vectors.data(); -// std::random_device rd; -// std::mt19937 gen(rd()); -// std::uniform_real_distribution<> dis(0.0, 1.0); for(int i = 0; i < n; i++) { for(int j = 0; j < TABLE_DIM; j++) data[TABLE_DIM * i + j] = drand48(); data[TABLE_DIM * i] += i / 2000.; @@ -44,7 +43,7 @@ namespace { } } -TEST(MEM_TEST, VECTOR_SOURCE_TEST) { +TEST_F(NewMemManagerTest, VECTOR_SOURCE_TEST) { std::shared_ptr impl_ = engine::DBMetaImplFactory::Build(); @@ -91,7 +90,7 @@ TEST(MEM_TEST, VECTOR_SOURCE_TEST) { ASSERT_TRUE(status.ok()); } -TEST(MEM_TEST, MEM_TABLE_FILE_TEST) { +TEST_F(NewMemManagerTest, MEM_TABLE_FILE_TEST) { std::shared_ptr impl_ = engine::DBMetaImplFactory::Build(); auto options = engine::OptionsFactory::Build(); @@ -135,7 +134,7 @@ TEST(MEM_TEST, MEM_TABLE_FILE_TEST) { ASSERT_TRUE(status.ok()); } -TEST(MEM_TEST, MEM_TABLE_TEST) { +TEST_F(NewMemManagerTest, MEM_TABLE_TEST) { std::shared_ptr impl_ = engine::DBMetaImplFactory::Build(); auto options = engine::OptionsFactory::Build(); @@ -201,7 +200,7 @@ TEST(MEM_TEST, MEM_TABLE_TEST) { ASSERT_TRUE(status.ok()); } -TEST(MEM_TEST, MEM_MANAGER_TEST) { +TEST_F(NewMemManagerTest, SERIAL_INSERT_SEARCH_TEST) { auto options = engine::OptionsFactory::Build(); options.meta.path = "/tmp/milvus_test"; @@ -218,7 +217,6 @@ TEST(MEM_TEST, MEM_MANAGER_TEST) { ASSERT_EQ(table_info_get.dimension_, TABLE_DIM); std::map> search_vectors; -// std::map> vectors_ids_map; { engine::IDNumbers vector_ids; int64_t nb = 1024000; @@ -227,24 +225,13 @@ TEST(MEM_TEST, MEM_MANAGER_TEST) { engine::Status status = db_->InsertVectors(TABLE_NAME, nb, xb.data(), vector_ids); ASSERT_TRUE(status.ok()); -// std::ofstream myfile("mem_test.txt"); -// for (int64_t i = 0; i < nb; ++i) { -// int64_t vector_id = vector_ids[i]; -// std::vector vectors; -// for (int64_t j = 0; j < TABLE_DIM; j++) { -// vectors.emplace_back(xb[i*TABLE_DIM + j]); -//// std::cout << xb[i*TABLE_DIM + j] << std::endl; -// } -// vectors_ids_map[vector_id] = vectors; -// } - std::this_thread::sleep_for(std::chrono::seconds(3)); std::random_device rd; std::mt19937 gen(rd()); std::uniform_int_distribution dis(0, nb - 1); - int64_t numQuery = 1000; + int64_t numQuery = 20; for (int64_t i = 0; i < numQuery; ++i) { int64_t index = dis(gen); std::vector search; @@ -252,17 +239,7 @@ TEST(MEM_TEST, MEM_MANAGER_TEST) { search.push_back(xb[index * TABLE_DIM + j]); } search_vectors.insert(std::make_pair(vector_ids[index], search)); -// std::cout << "index: " << index << " vector_ids[index]: " << vector_ids[index] << std::endl; } - -// for (int64_t i = 0; i < nb; i += 100000) { -// std::vector search; -// for (int64_t j = 0; j < TABLE_DIM; j++) { -// search.push_back(xb[i * TABLE_DIM + j]); -// } -// search_vectors.insert(std::make_pair(vector_ids[i], search)); -// } - } int k = 10; @@ -270,26 +247,16 @@ TEST(MEM_TEST, MEM_MANAGER_TEST) { auto& search = pair.second; engine::QueryResults results; stat = db_->Query(TABLE_NAME, k, 1, search.data(), results); - for(int t = 0; t < k; t++) { -// std::cout << "ID=" << results[0][t].first << " DISTANCE=" << results[0][t].second << std::endl; - -// std::cout << vectors_ids_map[results[0][t].first].size() << std::endl; -// for (auto& data : vectors_ids_map[results[0][t].first]) { -// std::cout << data << " "; -// } -// std::cout << std::endl; - } - // std::cout << "results[0][0].first: " << results[0][0].first << " pair.first: " << pair.first << " results[0][0].second: " << results[0][0].second << std::endl; ASSERT_EQ(results[0][0].first, pair.first); ASSERT_LT(results[0][0].second, 0.00001); } - stat = db_->DropAll(); - ASSERT_TRUE(stat.ok()); + delete db_; + boost::filesystem::remove_all(options.meta.path); } -TEST(MEM_TEST, INSERT_TEST) { +TEST_F(NewMemManagerTest, INSERT_TEST) { auto options = engine::OptionsFactory::Build(); options.meta.path = "/tmp/milvus_test"; @@ -307,9 +274,9 @@ TEST(MEM_TEST, INSERT_TEST) { auto start_time = METRICS_NOW_TIME; - int insert_loop = 1000; + int insert_loop = 20; for (int i = 0; i < insert_loop; ++i) { - int64_t nb = 204800; + int64_t nb = 409600; std::vector xb; BuildVectors(nb, xb); engine::IDNumbers vector_ids; @@ -318,10 +285,91 @@ TEST(MEM_TEST, INSERT_TEST) { } auto end_time = METRICS_NOW_TIME; auto total_time = METRICS_MICROSECONDS(start_time, end_time); - std::cout << "total_time(ms) : " << total_time << std::endl; + LOG(DEBUG) << "total_time spent in INSERT_TEST (ms) : " << total_time; - stat = db_->DropAll(); - ASSERT_TRUE(stat.ok()); + delete db_; + boost::filesystem::remove_all(options.meta.path); } +TEST_F(NewMemManagerTest, CONCURRENT_INSERT_SEARCH_TEST) { + + auto options = engine::OptionsFactory::Build(); + options.meta.path = "/tmp/milvus_test"; + options.meta.backend_uri = "sqlite://:@:/"; + auto db_ = engine::DBFactory::Build(options); + + engine::meta::TableSchema table_info = BuildTableSchema(); + engine::Status stat = db_->CreateTable(table_info); + + engine::meta::TableSchema table_info_get; + table_info_get.table_id_ = TABLE_NAME; + stat = db_->DescribeTable(table_info_get); + ASSERT_STATS(stat); + ASSERT_EQ(table_info_get.dimension_, TABLE_DIM); + + engine::IDNumbers vector_ids; + engine::IDNumbers target_ids; + + int64_t nb = 409600; + std::vector xb; + BuildVectors(nb, xb); + + int64_t qb = 5; + std::vector qxb; + BuildVectors(qb, qxb); + + std::thread search([&]() { + engine::QueryResults results; + int k = 10; + std::this_thread::sleep_for(std::chrono::seconds(2)); + + INIT_TIMER; + std::stringstream ss; + uint64_t count = 0; + uint64_t prev_count = 0; + + for (auto j=0; j<10; ++j) { + ss.str(""); + db_->Size(count); + prev_count = count; + + START_TIMER; + stat = db_->Query(TABLE_NAME, k, qb, qxb.data(), results); + ss << "Search " << j << " With Size " << count/engine::meta::M << " M"; + STOP_TIMER(ss.str()); + + ASSERT_STATS(stat); + for (auto k=0; k= prev_count); + std::this_thread::sleep_for(std::chrono::seconds(1)); + } + }); + + int loop = 20; + + for (auto i=0; iInsertVectors(TABLE_NAME, qb, qxb.data(), target_ids); + ASSERT_EQ(target_ids.size(), qb); + } else { + db_->InsertVectors(TABLE_NAME, nb, xb.data(), vector_ids); + } + std::this_thread::sleep_for(std::chrono::microseconds(1)); + } + + search.join(); + + delete db_; + boost::filesystem::remove_all(options.meta.path); + +}; + diff --git a/cpp/unittest/db/utils.cpp b/cpp/unittest/db/utils.cpp index 70c0712549..ae05c59d3b 100644 --- a/cpp/unittest/db/utils.cpp +++ b/cpp/unittest/db/utils.cpp @@ -106,6 +106,18 @@ zilliz::milvus::engine::Options MySQLDBTest::GetOptions() { return options; } +void NewMemManagerTest::InitLog() { + el::Configurations defaultConf; + defaultConf.setToDefault(); + defaultConf.set(el::Level::Debug, + el::ConfigurationType::Format, "[%thread-%datetime-%level]: %msg (%fbase:%line)"); + el::Loggers::reconfigureLogger("default", defaultConf); +} + +void NewMemManagerTest::SetUp() { + InitLog(); +} + int main(int argc, char **argv) { ::testing::InitGoogleTest(&argc, argv); if (argc > 1) { diff --git a/cpp/unittest/db/utils.h b/cpp/unittest/db/utils.h index 361c24b4be..d06500de5c 100644 --- a/cpp/unittest/db/utils.h +++ b/cpp/unittest/db/utils.h @@ -87,3 +87,8 @@ class MySQLDBTest : public ::testing::Test { protected: zilliz::milvus::engine::Options GetOptions(); }; + +class NewMemManagerTest : public ::testing::Test { + void InitLog(); + virtual void SetUp() override; +};