diff --git a/cpp/src/db/db_impl.cpp b/cpp/src/db/db_impl.cpp index b5c85b1c20..7f7e19a748 100644 --- a/cpp/src/db/db_impl.cpp +++ b/cpp/src/db/db_impl.cpp @@ -1,4 +1,5 @@ #include +#include #include "db_impl.h" namespace vecengine { @@ -9,8 +10,10 @@ DBImpl::DBImpl(const Options& options_, const std::string& name_) _options(options_), _bg_work_finish_signal(_mutex), _bg_compaction_scheduled(false), + _shutting_down(false), _pMeta(new DBMetaImpl(*(_options.pMetaOptions))), _pMemMgr(new MemManager(_pMeta)) { + start_timer_task(Options.memory_sync_interval); } Status DBImpl::add_group(const GroupOptions& options_, @@ -39,7 +42,27 @@ Status DBImpl::get_group_files(const std::string& group_id_, Status DBImpl::add_vectors(const std::string& group_id_, size_t n, const float* vectors, IDNumbers& vector_ids_) { - return _pMemMgr->add_vectors(group_id_, n, vectors, vector_ids_); + Status status = _pMemMgr->add_vectors(group_id_, n, vectors, vector_ids_); + if (!status.ok()) { + return status; + } +} + +void DBImpl::start_timer_task(int interval_) { + std::thread bg_task(&DBImpl::background_timer_task, this, interval_); + bg_task.detach(); +} + +void DBImpl::background_timer_task(int interval_) { + Status status; + while (true) { + if (!_bg_error.ok()) break; + if (_shutting_down.load(std::memory_order_acquire)) break; + + std::this_thread::sleep_for(std::chrono::seconds(interval_)); + + try_schedule_compaction(); + } } void DBImpl::try_schedule_compaction() { @@ -61,14 +84,21 @@ void DBImpl::background_call() { if (!_bg_error.ok()) return; background_compaction(); + + _bg_compaction_scheduled = false; + _bg_work_finish_signal.notify_all(); } void DBImpl::background_compaction() { - + _pMemMgr->serialize(); } -void DBImpl::compact_memory() { - +DBImpl::~DBImpl() { + std::lock_guard _mutex; + _shutting_down.store(true, std::memory_order_release); + while (_bg_compaction_scheduled) { + _bg_work_finish_signal.wait(); + } } /* diff --git a/cpp/src/db/db_impl.h b/cpp/src/db/db_impl.h index 03fc3d3c0b..70ab2c5a2e 100644 --- a/cpp/src/db/db_impl.h +++ b/cpp/src/db/db_impl.h @@ -49,6 +49,7 @@ private: std::condition_variable _bg_work_finish_signal; bool _bg_compaction_scheduled; Status _bg_error; + std::atomic _shutting_down; std::shared_ptr _pMeta; std::shared_ptr _pMemMgr; diff --git a/cpp/src/db/memvectors.cpp b/cpp/src/db/memvectors.cpp index f546fcf02b..f76589a628 100644 --- a/cpp/src/db/memvectors.cpp +++ b/cpp/src/db/memvectors.cpp @@ -52,7 +52,7 @@ MemVectors::~MemVectors() { * MemManager */ -MemVectors* MemManager::get_mem_by_group(const std::string& group_id_) { +VectorsPtr MemManager::get_mem_by_group(const std::string& group_id_) { auto memIt = _memMap.find(group_id_); if memIt != _memMap.end() { return &(memIt->second); @@ -63,15 +63,16 @@ MemVectors* MemManager::get_mem_by_group(const std::string& group_id_) { if (!status.ok()) { return nullptr; } - _memMap[group_id] = MemVectors(group_info.dimension, group_info.next_file_location); - return &(_memMap[group_id]); + _memMap[group_id] = std::shared_ptr(new MemVectors(group_info.dimension, + group_info.next_file_location)); + return _memMap[group_id]; } Status MemManager::add_vectors(const std::string& group_id_, size_t n_, const float* vectors_, IDNumbers& vector_ids_) { - // PXU TODO + std::lock_guard lock(_mutex); return add_vectors_no_lock(group_id_, n_, vectors_, vector_ids_); } @@ -86,5 +87,35 @@ Status MemManager::add_vectors_no_lock(const std::string& group_id_, return mem->add(n, vectors, vector_ids_); } +Status MemManager::mark_memory_as_immutable() { + std::lock_guard lock(_mutex); + for (auto& kv: _memMap) { + _immMems.push_back(kv.second); + } + _memMap.clear(); +} + +/* bool MemManager::need_serialize(double interval) { */ +/* if (_immMems.size() > 0) { */ +/* return false; */ +/* } */ + +/* auto diff = std::difftime(std::time(nullptr), _last_compact_time); */ +/* if (diff >= interval) { */ +/* return true; */ +/* } */ + +/* return false; */ +/* } */ + +Status MemManager::serialize() { + mark_memory_as_immutable(); + for (auto& mem : _immMems) { + mem->serialize() + } + _immMems.clear(); + /* _last_compact_time = std::time(nullptr); */ +} + } // namespace vecengine diff --git a/cpp/src/db/memvectors.h b/cpp/src/db/memvectors.h index a6741971a1..aff162a15e 100644 --- a/cpp/src/db/memvectors.h +++ b/cpp/src/db/memvectors.h @@ -3,6 +3,7 @@ #include #include +#include #include "id_generators.h" #include "status.h" @@ -42,20 +43,28 @@ class Meta; class MemManager { public: - MemManager(const std::shared_ptr& meta_) : _pMeta(meta_) {} + typedef std::shared_ptr VectorsPtr; + MemManager(const std::shared_ptr& meta_) + : _pMeta(meta_), _last_compact_time(std::time(nullptr)) {} - MemVectors* get_mem_by_group(const std::string& group_id_); + VectorsPtr get_mem_by_group(const std::string& group_id_); Status add_vectors(const std::string& group_id_, size_t n_, const float* vectors_, IDNumbers& vector_ids_); + Status serialize(); + private: Status add_vectors_no_lock(const std::string& group_id_, size_t n_, const float* vectors_, IDNumbers& vector_ids_); - typedef std::map MemMap; + typedef std::map MemMap; + typedef std::vector ImmMemPool; MemMap _memMap; + ImmMemPool _immMems; std::shared_ptr _pMeta; + std::time_t _last_compact_time; + std::mutex _mutex; }; // MemManager