feat(db/env): add threading related schedule

Former-commit-id: d685ead572b229a60aad1342ec821d0b37d58c70
This commit is contained in:
Xu Peng 2019-04-14 18:24:21 +08:00 committed by xj.lin
parent 34d50b5a55
commit 0352db4023
3 changed files with 85 additions and 19 deletions

View File

@ -1,11 +1,49 @@
#inlcude "env.h"
#include "env.h"
namespace vecengine {
namespace zilliz {
namespace vecwise {
namespace engine {
DBConfig::DBConfig()
: _mem_sync_interval(10),
_file_merge_trigger_number(20),
_index_file_build_trigger_size(100000) {
Env::Env()
: _bg_work_cv(&_bg_work_mutex),
_bg_work_started(false) {
}
} // namespace vecengine
void Env::schedule(void (*function_)(void* arg_), void* arg_) {
std::lock_guard<std::mutex> lock;
if (!_bg_work_started) {
_bg_work_started = true;
std::thread bg_thread(Env::BackgroundThreadEntryPoint, this);
bg_thread.detach();
}
if (_bg_work_queue.empty()) {
_bg_work_cv.notify_one();
}
_bg_work_queue.emplace(function_, arg_);
}
void Env::backgroud_thread_main() {
while (true) {
std::lock_guard<std::mutex> lock;
while (_bg_work_queue.empty()) {
_bg_work_cv.wait();
}
assert(!_bg_work_queue.empty());
auto bg_function = _bg_work_queue.front()._function;
void* bg_arg = _bg_work_queue.front()._arg;
_bg_work_queue.pop();
lock.unlock();
bg_function(bg_arg);
}
}
Env::~Env() {}
} // namespace engine
} // namespace vecwise
} // namespace zilliz

View File

@ -1,23 +1,49 @@
#ifndef STORAGE_VECENGINE_ENV_H_
#define STORAGE_VECENGINE_ENV_H_
namespace vecengine {
#include <condition_variable>
#include <thread>
#include <mutex>
#include <queue>
/* struct Options { */
/* std::string _db_location; */
/* size_t _mem_sync_interval; */
/* size_t _file_merge_trigger_number; */
/* size_t _index_file_build_trigger_size; */
/* }; // Config */
namespace zilliz {
namespace vecwise {
namespace engine {
class Env {
public:
Env() = default;
Env();
Env(const Env&) = delete;
Env& operator=(const Env&) = delete;
void schedule(void (*function_)(void* arg_), void* arg_);
virtual ~Env();
protected:
void backgroud_thread_main();
static void BackgroundThreadEntryPoint(Env* env) {
env->backgroud_thread_main();
}
struct BGWork {
explicit BGWork(void (*function_)(void*), void* arg_)
: _function(function_), _arg(arg_) {}
void (* const _function)(void*);
void* const _arg;
};
std::mutex _bg_work_mutex;
std::condition_variable _bg_work_cv;
std::queue<BGWork> _bg_work_queue;
bool _bg_work_started;
private:
Options _option;
}; // Env
} //namespace vecengine
} // namespace engine
} // namespace vecwise
} // namespace zilliz
#endif // STORAGE_VECENGINE_ENV_H_

View File

@ -6,7 +6,9 @@
namespace vecengine {
struct Options {
uint16_t memory_sync_interval = 10;
uint16_t raw_file_merge_trigger_number = 100;
size_t raw_to_index_trigger_size = 100000;
}; // Options