From 8a78d0edabba2d03543b4097560d069ae3961b3c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?shuxin=20=E3=80=80=E3=80=80zheng?= Date: Wed, 26 Jul 2023 18:22:43 +0800 Subject: [PATCH 1/6] format codes in pkv. --- app/wizard_demo/pkv/action/redis_handler.cpp | 2 +- app/wizard_demo/pkv/proto/redis_coder.h | 2 +- app/wizard_demo/pkv/proto/redis_object.h | 6 +----- 3 files changed, 3 insertions(+), 7 deletions(-) diff --git a/app/wizard_demo/pkv/action/redis_handler.cpp b/app/wizard_demo/pkv/action/redis_handler.cpp index 052db2183..9e99deec8 100644 --- a/app/wizard_demo/pkv/action/redis_handler.cpp +++ b/app/wizard_demo/pkv/action/redis_handler.cpp @@ -244,7 +244,7 @@ bool redis_handler::hset(const redis_object &obj) { return false; } - if (!db_->set(key, buff.c_str())) { + if (!db_->set(key, buff)) { logger_error("set key=%s, value=%s error", key, buff.c_str()); return false; } diff --git a/app/wizard_demo/pkv/proto/redis_coder.h b/app/wizard_demo/pkv/proto/redis_coder.h index ed3204c7b..c0453e7de 100644 --- a/app/wizard_demo/pkv/proto/redis_coder.h +++ b/app/wizard_demo/pkv/proto/redis_coder.h @@ -11,7 +11,7 @@ namespace pkv { class redis_coder { public: - redis_coder(size_t cache_max = 10000); + explicit redis_coder(size_t cache_max = 10000); ~redis_coder(); const char* update(const char* data, size_t& len); diff --git a/app/wizard_demo/pkv/proto/redis_object.h b/app/wizard_demo/pkv/proto/redis_object.h index 41b0ee8c3..7bef6787b 100644 --- a/app/wizard_demo/pkv/proto/redis_object.h +++ b/app/wizard_demo/pkv/proto/redis_object.h @@ -10,12 +10,8 @@ namespace pkv { -class redis_object; -using shared_redis = std::shared_ptr; - typedef enum { REDIS_OBJ_UNKOWN, - REDIS_OBJ_NIL, REDIS_OBJ_ERROR, REDIS_OBJ_STATUS, REDIS_OBJ_INTEGER, @@ -85,7 +81,7 @@ private: std::vector objs_; private: - const char* get_line(const char*, size_t&, std::string&, bool&); + static const char* get_line(const char*, size_t&, std::string&, bool&); const char* get_length(const char*, size_t&, int&, bool&); const char* get_data(const char*, size_t&, size_t); From 4d1505a2aebd0198dd68f054872e8fe2b10a7f6c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?shuxin=20=E3=80=80=E3=80=80zheng?= Date: Thu, 27 Jul 2023 14:34:53 +0800 Subject: [PATCH 2/6] Add wiredtiger db in pkv. --- app/wizard_demo/pkv/CMakeLists.txt | 17 ++-- app/wizard_demo/pkv/action/redis_handler.cpp | 22 ++--- app/wizard_demo/pkv/db/db.cpp | 16 +++ app/wizard_demo/pkv/db/db.h | 1 + app/wizard_demo/pkv/db/rocksdb/rdb.cpp | 13 ++- app/wizard_demo/pkv/db/rocksdb/rdb.h | 28 +++--- app/wizard_demo/pkv/db/wt/wdb.cpp | 99 +++++++++++++++++++ app/wizard_demo/pkv/db/wt/wdb.h | 53 ++++++++++ app/wizard_demo/pkv/db/wt/wt_sess.cpp | 85 ++++++++++++++++ app/wizard_demo/pkv/db/wt/wt_sess.h | 33 +++++++ app/wizard_demo/pkv/master_service.cpp | 36 +++++-- app/wizard_demo/pkv/pkv.cf | 23 +---- app/wizard_demo/pkv/stdafx.h | 4 + .../samples/benchmark/wiredtiger/Makefile.in | 5 +- .../samples/benchmark/wiredtiger/main.cpp | 2 +- .../samples/redis_pipeline/redis_pipeline.cpp | 18 ++-- 16 files changed, 374 insertions(+), 81 deletions(-) create mode 100644 app/wizard_demo/pkv/db/wt/wdb.cpp create mode 100644 app/wizard_demo/pkv/db/wt/wdb.h create mode 100644 app/wizard_demo/pkv/db/wt/wt_sess.cpp create mode 100644 app/wizard_demo/pkv/db/wt/wt_sess.h diff --git a/app/wizard_demo/pkv/CMakeLists.txt b/app/wizard_demo/pkv/CMakeLists.txt index 3c16ef279..f3fcdc4f6 100644 --- a/app/wizard_demo/pkv/CMakeLists.txt +++ b/app/wizard_demo/pkv/CMakeLists.txt @@ -82,6 +82,7 @@ add_definitions( "-fPIC" "-std=c++17" "-DHAS_ROCKSDB" + "-DHAS_WT" ) #find_library(acl_lib acl_all PATHS /usr/lib /usr/local/lib) @@ -91,25 +92,21 @@ add_definitions( set(acl_all ${home_path}/lib_fiber/lib/libfiber_cpp.a ${home_path}/libacl_all.a) set(fiber ${home_path}/lib_fiber/lib/libfiber.a) -find_library(rocksdb_lib rocksdb PATHS /usr/lib /usr/local/lib) -#set(rocksdb /usr/local/lib/librocksdb.a) -set(rocksdb -lrocksdb) - -find_library(jemalloc jemalloc PATHS /usr/lib /usr/local/lib) -set(jemalloc -ljemalloc) +find_library(jemalloc NAMES jemalloc PATHS /usr/lib /usr/local/lib) +find_library(rocksdb NAMES rocksdb PATHS /usr/lib /usr/local/lib) +find_library(wiredtiger NAMES wiredtiger PATHS /usr/lib /usr/local/lib) #set(wt -lwiredtiger) #set(uring -luring) #set(iconv -liconv) if(CMAKE_SYSTEM_NAME MATCHES "Darwin") -# set(lib_all ${lib_global} ${lib_rpc} ${fiber_cpp_lib} ${acl_lib} ${fiber_lib} set(lib_all ${acl_all} - ${rocksdb} ${fiber} -liconv -lz -lpthread -ldl) + ${rocksdb} ${wiredtiger} ${fiber} -liconv -lz -lpthread -ldl) elseif(CMAKE_SYSTEM_NAME MATCHES "Linux") -# set(lib_all ${lib_global} ${lib_rpc} ${fiber_cpp_lib} ${acl_lib} ${fiber_lib} set(lib_all ${acl_all} - ${rocksdb} ${wt} ${fiber} ${uring} ${iconv} ${jemalloc} -lz -lpthread -ldl) + ${rocksdb} ${wiredtiger} ${fiber} ${uring} ${iconv} ${jemalloc} + -lz -lpthread -ldl) endif() set(output_path ${CMAKE_CURRENT_SOURCE_DIR}) diff --git a/app/wizard_demo/pkv/action/redis_handler.cpp b/app/wizard_demo/pkv/action/redis_handler.cpp index 9e99deec8..6778dc1a8 100644 --- a/app/wizard_demo/pkv/action/redis_handler.cpp +++ b/app/wizard_demo/pkv/action/redis_handler.cpp @@ -115,19 +115,19 @@ bool redis_handler::set(const redis_object &obj) { return false; } -#if 0 - std::string buff; - coder_.create_object().set_string(value); - coder_.to_string(buff); - coder_.clear(); + if (!var_cfg_disable_serialize) { + std::string buff; + coder_.create_object().set_string(value); + coder_.to_string(buff); + coder_.clear(); -# if 1 - if (!db_->set(key, buff.c_str())) { - logger_error("db set error, key=%s", key); - return false; + if (!var_cfg_disable_save) { + if (!db_->set(key, buff.c_str())) { + logger_error("db set error, key=%s", key); + return false; + } + } } -# endif -#endif builder_.create_object().set_status("OK"); return true; diff --git a/app/wizard_demo/pkv/db/db.cpp b/app/wizard_demo/pkv/db/db.cpp index 726150882..e10242686 100644 --- a/app/wizard_demo/pkv/db/db.cpp +++ b/app/wizard_demo/pkv/db/db.cpp @@ -1,5 +1,13 @@ #include "stdafx.h" + +#ifdef HAS_ROCKSDB #include "rocksdb/rdb.h" +#endif + +#ifdef HAS_WT +#include "wt/wdb.h" +#endif + #include "db.h" namespace pkv { @@ -34,4 +42,12 @@ shared_db db::create_rdb() { #endif } +shared_db db::create_wdb() { +#ifdef HAS_WT + return std::make_shared(); +#else + return std::make_shared(); +#endif +} + } // namespace pkv diff --git a/app/wizard_demo/pkv/db/db.h b/app/wizard_demo/pkv/db/db.h index 694f798f1..8ffcbf180 100644 --- a/app/wizard_demo/pkv/db/db.h +++ b/app/wizard_demo/pkv/db/db.h @@ -16,6 +16,7 @@ public: virtual bool del(const std::string& key) = 0; static shared_db create_rdb(); + static shared_db create_wdb(); }; } // namespace pkv diff --git a/app/wizard_demo/pkv/db/rocksdb/rdb.cpp b/app/wizard_demo/pkv/db/rocksdb/rdb.cpp index 5ce878689..cbd30a2e1 100644 --- a/app/wizard_demo/pkv/db/rocksdb/rdb.cpp +++ b/app/wizard_demo/pkv/db/rocksdb/rdb.cpp @@ -19,17 +19,24 @@ rdb::~rdb() { } bool rdb::open(const char* path) { + path_ = path; + path_ += "/rdb"; + + if (acl_make_dirs(path_.c_str(), 0755) == -1) { + logger_error("create %s error=%s", path_.c_str(), acl::last_serror()); + return -1; + } + Options options; options.IncreaseParallelism(); // options.OptimizeLevelStyleCompaction(); options.create_if_missing = true; - Status s = DB::Open(options, path, &db_); + Status s = DB::Open(options, path_.c_str(), &db_); if (!s.ok()) { - logger_error("open %s db error: %s", path, s.getState()); + logger_error("open %s db error: %s", path_.c_str(), s.getState()); return false; } - path_ = path; return true; } diff --git a/app/wizard_demo/pkv/db/rocksdb/rdb.h b/app/wizard_demo/pkv/db/rocksdb/rdb.h index f69be4008..3a08c02d8 100644 --- a/app/wizard_demo/pkv/db/rocksdb/rdb.h +++ b/app/wizard_demo/pkv/db/rocksdb/rdb.h @@ -5,34 +5,34 @@ #ifdef HAS_ROCKSDB namespace rocksdb { - class DB; + class DB; } namespace pkv { class rdb : public db { public: - rdb(); - ~rdb() override; + rdb(); + ~rdb() override; protected: - // @override - bool open(const char* path) override; + // @override + bool open(const char* path) override; - // @override - bool set(const std::string& key, const std::string& value) override; + // @override + bool set(const std::string& key, const std::string& value) override; - // @override - bool get(const std::string& key, std::string& value) override; + // @override + bool get(const std::string& key, std::string& value) override; - // @override - bool del(const std::string& key) override; + // @override + bool del(const std::string& key) override; private: - std::string path_; - rocksdb::DB* db_; + std::string path_; + rocksdb::DB* db_; }; } // namespace pkv -#endif // HAS_ROCKSDB +#endif // HAS_ROCKSDB \ No newline at end of file diff --git a/app/wizard_demo/pkv/db/wt/wdb.cpp b/app/wizard_demo/pkv/db/wt/wdb.cpp new file mode 100644 index 000000000..892a63654 --- /dev/null +++ b/app/wizard_demo/pkv/db/wt/wdb.cpp @@ -0,0 +1,99 @@ +// +// Created by shuxin   zheng on 2023/7/27. +// + +#include "stdafx.h" + +#ifdef HAS_WT + +#include +#include "wt_sess.h" +#include "wdb.h" + +namespace pkv { + +wdb::wdb(size_t cache_max) : db_(nullptr), cache_max_(cache_max) {} + +wdb::~wdb() { + if (db_) { + db_->close(db_, nullptr); + } +} + +bool wdb::open(const char *path) { + path_ = path; + path_ += "/wdb"; + if (acl_make_dirs(path_.c_str(), 0755) == -1) { + logger_error("create dir=%s error=%s", path_.c_str(), acl::last_serror()); + return false; + } + + int ret = wiredtiger_open(path_.c_str(), nullptr, "create", &db_); + if (ret != 0) { + logger_error("open %s error=%d", path_.c_str(), ret); + return false; + } + + return true; +} + +bool wdb::set(const std::string &key, const std::string &value) { + auto sess = get_session(); + if (sess == nullptr) { + return false; + } + + bool ret = sess->add(key, value); + put_session(sess); + return ret; +} + +bool wdb::get(const std::string &key, std::string &value) { + auto sess = get_session(); + if (sess == nullptr) { + return false; + } + + bool ret = sess->get(key, value); + put_session(sess); + return ret; +} + +bool wdb::del(const std::string &key) { + auto sess = get_session(); + if (sess == nullptr) { + return false; + } + + bool ret = sess->del(key); + put_session(sess); + return ret; +} + +wt_sess *wdb::get_session() { + if (sessions_.empty()) { + auto sess = new wt_sess(*this); + if (sess->open()) { + return sess; + } + + delete sess; + return nullptr; + } + + auto sess = sessions_.back(); + sessions_.pop_back(); + return sess; +} + +void wdb::put_session(wt_sess* sess) { + if (sessions_.size() < cache_max_) { + sessions_.emplace_back(sess); + } else { + delete sess; + } +} + +} // namespace pkv + +#endif // HAS_WT \ No newline at end of file diff --git a/app/wizard_demo/pkv/db/wt/wdb.h b/app/wizard_demo/pkv/db/wt/wdb.h new file mode 100644 index 000000000..aa34dbeb1 --- /dev/null +++ b/app/wizard_demo/pkv/db/wt/wdb.h @@ -0,0 +1,53 @@ +// +// Created by shuxin   zheng on 2023/7/27. +// + +#pragma once + +#include "db/db.h" + +#ifdef HAS_WT + +typedef struct __wt_connection WT_CONNECTION; + +namespace pkv { + +class wt_sess; + +class wdb : public db { +public: + explicit wdb(size_t cache_max = 10000); + ~wdb() override; + +protected: + // @override + bool open(const char* path) override; + + // @override + bool set(const std::string& key, const std::string& value) override; + + // @override + bool get(const std::string& key, std::string& value) override; + + // @override + bool del(const std::string& key) override; + +public: + WT_CONNECTION* get_db() const { + return db_; + } + +private: + std::string path_; + WT_CONNECTION *db_; + + std::vector sessions_; + size_t cache_max_; + + wt_sess* get_session(); + void put_session(wt_sess* sess); +}; + +} // namespace pkv + +#endif // HAS_WT \ No newline at end of file diff --git a/app/wizard_demo/pkv/db/wt/wt_sess.cpp b/app/wizard_demo/pkv/db/wt/wt_sess.cpp new file mode 100644 index 000000000..6e7323837 --- /dev/null +++ b/app/wizard_demo/pkv/db/wt/wt_sess.cpp @@ -0,0 +1,85 @@ +// +// Created by shuxin   zheng on 2023/7/27. +// + +#include "stdafx.h" +#include "wdb.h" +#include "wt_sess.h" + +namespace pkv { + +wt_sess::wt_sess(wdb &db) : db_(db), sess_(nullptr), curs_(nullptr) {} + +wt_sess::~wt_sess() = default; + +bool wt_sess::open() { + int ret = db_.get_db()->open_session(db_.get_db(), nullptr, nullptr, &sess_); + if (ret != 0) { + logger_error("open session error %d", ret); + return false; + } + + ret = sess_->create(sess_, "table:access", "key_format=S, value_format=S"); + if (ret != 0) { + logger_error("create session error %d", ret); + return false; + } + + ret = sess_->open_cursor(sess_, "table:access", nullptr, nullptr, &curs_); + if (ret != 0) { + logger_error("open cursor error %d", ret); + return false; + } + + return true; +} + +bool wt_sess::add(const std::string &key, const std::string &value) { + assert(curs_); + curs_->set_key(curs_, key.c_str()); + curs_->set_value(curs_, value.c_str()); + int ret = curs_->insert(curs_); + if (ret != 0) { + logger_error("insert %s %s error=%d", key.c_str(), value.c_str(), ret); + curs_->reset(curs_); + return false; + } + curs_->reset(curs_); + return true; +} + +bool wt_sess::get(const std::string &key, std::string &value) { + assert(curs_); + curs_->set_key(curs_, key.c_str()); + int ret = curs_->search(curs_); + if (ret != 0) { + logger("search key=%s error=%d", key.c_str(), ret); + return false; + } + + const char* v; + ret = curs_->get_value(curs_, &v); + if (ret != 0) { + logger_error("get_value key=%s errort=%d", key.c_str(), ret); + return false; + } + + value = v; + curs_->reset(curs_); + return true; +} + +bool wt_sess::del(const std::string &key) { + assert(curs_); + curs_->set_key(curs_, key.c_str()); + int ret = curs_->remove(curs_); + if (ret != 0) { + logger_error("remove key=%s error=%d", key.c_str(), ret); + return false; + } + + curs_->reset(curs_); + return true; +} + +} // namespace pkv \ No newline at end of file diff --git a/app/wizard_demo/pkv/db/wt/wt_sess.h b/app/wizard_demo/pkv/db/wt/wt_sess.h new file mode 100644 index 000000000..193fbfd55 --- /dev/null +++ b/app/wizard_demo/pkv/db/wt/wt_sess.h @@ -0,0 +1,33 @@ +// +// Created by shuxin   zheng on 2023/7/27. +// + +#pragma once + +#include + +namespace pkv { + +class wdb; + +class wt_sess { +public: + explicit wt_sess(wdb& db); + ~wt_sess(); + + bool open(); + + bool add(const std::string& key, const std::string& value); + + bool get(const std::string& key, std::string& value); + + bool del(const std::string& key); + +private: + wdb& db_; + + WT_SESSION *sess_; + WT_CURSOR *curs_; +}; + +} // namespace pkv diff --git a/app/wizard_demo/pkv/master_service.cpp b/app/wizard_demo/pkv/master_service.cpp index f50f93a7c..cb2ce074f 100644 --- a/app/wizard_demo/pkv/master_service.cpp +++ b/app/wizard_demo/pkv/master_service.cpp @@ -4,27 +4,31 @@ #include "master_service.h" static char *var_cfg_dbpath; +static char *var_cfg_dbtype; acl::master_str_tbl var_conf_str_tab[] = { - { "dbpath", "./dbpath", &var_cfg_dbpath }, + { "dbpath", "./dbpath", &var_cfg_dbpath }, + { "dbtype", "rdb", &var_cfg_dbtype }, - { 0, 0, 0 } + { 0, 0, 0 } }; -static int var_cfg_debug_enable; +int var_cfg_disable_serialize; +int var_cfg_disable_save; acl::master_bool_tbl var_conf_bool_tab[] = { - { "debug_enable", 1, &var_cfg_debug_enable }, + { "disable_serialize", 0, &var_cfg_disable_serialize }, + { "disable_save", 0, &var_cfg_disable_save }, - { 0, 0, 0 } + { 0, 0, 0 } }; static int var_cfg_io_timeout; static int var_cfg_buf_size; acl::master_int_tbl var_conf_int_tab[] = { - { "io_timeout", 120, &var_cfg_io_timeout, 0, 0 }, - { "buf_size", 8192, &var_cfg_buf_size, 0, 0 }, + { "io_timeout", 120, &var_cfg_io_timeout, 0, 0 }, + { "buf_size", 8192, &var_cfg_buf_size, 0, 0 }, { 0, 0 , 0 , 0, 0 } }; @@ -88,9 +92,21 @@ void master_service::proc_on_listen(acl::server_socket& ss) { void master_service::proc_on_init() { logger(">>>proc_on_init<<<"); - db_ = db::create_rdb(); - if (!db_->open(var_cfg_dbpath)) { - logger_error("open db(%s) error %s", var_cfg_dbpath, acl::last_serror()); + if (strcasecmp(var_cfg_dbtype, "rdb") == 0) { + db_ = db::create_rdb(); + if (!db_->open(var_cfg_dbpath)) { + logger_error("open db(%s) error %s", var_cfg_dbpath, + acl::last_serror()); + exit(1); + } + } else if (strcasecmp(var_cfg_dbtype, "wdb") == 0) { + db_ = db::create_wdb(); + if (!db_->open(var_cfg_dbpath)) { + logger_error("open db(%s) error", var_cfg_dbpath); + exit(1); + } + } else { + logger_error("unknown dbtype=%s", var_cfg_dbtype); exit(1); } } diff --git a/app/wizard_demo/pkv/pkv.cf b/app/wizard_demo/pkv/pkv.cf index 2eb0d4a1a..bb7bfb02b 100644 --- a/app/wizard_demo/pkv/pkv.cf +++ b/app/wizard_demo/pkv/pkv.cf @@ -123,26 +123,7 @@ service pkv ############################################################################ # 綺綏援臀蕁 -# mysql ≦医 -# mysql_dbaddr = /tmp/mysql.sock -# mysql_dbaddr = 10.0.250.199:3306 -# 菴・ mysql 井綺菴・羆紊у -# mysql_dbmax = 200 -# ping mysql 菴・顔狗, 篁ョ筝阪篏 -# mysql_dbping = 10 -# mysql 菴・腥咲牙狗顔顔, 篁ョ筝阪篏 -# mysql_dbtimeout = 30 - -# 井綺腱 -# mysql_dbname = fiber_db -# 井綺莅翠 -# mysql_dbuser = fiber_user -# 井綺決翠絲 -# mysql_dbpass = 111111 - -# 莨阪絖倶篆≧ -# debug_mem = 1 -# 筝筝膾睡筝菴・莚 -# loop_read = 1 buf_size = 8192 + dbpath = ./db + dbtype = wdb } diff --git a/app/wizard_demo/pkv/stdafx.h b/app/wizard_demo/pkv/stdafx.h index fc909499c..4ae84592c 100644 --- a/app/wizard_demo/pkv/stdafx.h +++ b/app/wizard_demo/pkv/stdafx.h @@ -60,6 +60,10 @@ #endif // !_WIN32 && !_WIN64 extern acl::master_str_tbl var_conf_str_tab[]; + +extern int var_cfg_disable_serialize; +extern int var_cfg_disable_save; extern acl::master_bool_tbl var_conf_bool_tab[]; + extern acl::master_int_tbl var_conf_int_tab[]; extern acl::master_int64_tbl var_conf_int64_tab[]; diff --git a/lib_acl_cpp/samples/benchmark/wiredtiger/Makefile.in b/lib_acl_cpp/samples/benchmark/wiredtiger/Makefile.in index f6d0243d0..bc713be3b 100644 --- a/lib_acl_cpp/samples/benchmark/wiredtiger/Makefile.in +++ b/lib_acl_cpp/samples/benchmark/wiredtiger/Makefile.in @@ -9,7 +9,7 @@ CFLAGS = -c -g -W -Wall -Wcast-qual -Wcast-align \ ########################################################### #Check system: # Linux, SunOS, Solaris, BSD variants, AIX, HP-UX -SYSLIB = -lpthread -lcrypt +SYSLIB = -lpthread CHECKSYSRES = @echo "Unknow system type!";exit 1 UNIXNAME = $(shell uname -sm) @@ -67,7 +67,8 @@ CFLAGS += -I. -I../.. -I../../../include -I../../../../lib_acl/include -I../../. EXTLIBS = CFLAGS += -I/home/zsx/download/db/wiredtiger/github #EXTLIBS = -L/home/zsx/download/db/wiredtiger/github/.libs -lwiredtiger -Wl,-rpath=/home/zsx/download/db/wiredtiger/github/.libs -EXTLIBS = /usr/local/lib/libwiredtiger.a +#EXTLIBS = /usr/local/lib/libwiredtiger.a +EXTLIBS = -lwiredtiger SYSLIB += -ldl LDFLAGS = -L../../../lib -l_acl_cpp -L../../../../lib_protocol/lib -l_protocol -L../../../../lib_acl/lib -l_acl \ $(EXTLIBS) $(SYSLIB) diff --git a/lib_acl_cpp/samples/benchmark/wiredtiger/main.cpp b/lib_acl_cpp/samples/benchmark/wiredtiger/main.cpp index 63baccb0f..b2968f3c4 100644 --- a/lib_acl_cpp/samples/benchmark/wiredtiger/main.cpp +++ b/lib_acl_cpp/samples/benchmark/wiredtiger/main.cpp @@ -50,7 +50,7 @@ public: ~wdb_sess(void) {} bool open(void) { - bool ret = db_.get_conn()->open_session(db_.get_conn(), + int ret = db_.get_conn()->open_session(db_.get_conn(), NULL, NULL, &session_); if (ret != 0) { printf("open session failed, ret=%d\r\n", ret); diff --git a/lib_fiber/samples/redis_pipeline/redis_pipeline.cpp b/lib_fiber/samples/redis_pipeline/redis_pipeline.cpp index 568a9b482..10bba3a4e 100644 --- a/lib_fiber/samples/redis_pipeline/redis_pipeline.cpp +++ b/lib_fiber/samples/redis_pipeline/redis_pipeline.cpp @@ -12,7 +12,7 @@ static bool test_del(acl::redis_key& cmd, size_t tid, size_t fid, int i) printf("del key: %s error: %s\r\n", key.c_str(), cmd.result_error()); return false; - } else if (i < 10) { + } else if (i < 1) { printf("del ok, key: %s\r\n", key.c_str()); } return true; @@ -27,7 +27,7 @@ static bool test_expire(acl::redis_key& cmd, size_t tid, size_t fid, int i) printf("expire key: %s error: %s\r\n", key.c_str(), cmd.result_error()); return false; - } else if (i < 10) { + } else if (i < 1) { printf("expire ok, key: %s\r\n", key.c_str()); } @@ -44,7 +44,7 @@ static bool test_ttl(acl::redis_key& cmd, size_t tid, size_t fid, int i) printf("get ttl key: %s error: %s\r\n", key.c_str(), cmd.result_error()); return false; - } else if (i < 10) { + } else if (i < 1) { printf("ttl ok, key: %s, ttl: %d\r\n", key.c_str(), ttl); } return true; @@ -56,11 +56,11 @@ static bool test_exists(acl::redis_key& cmd, size_t tid, size_t fid, int i) key.format("%s_%zd_%zd_%d", __keypre.c_str(), tid, fid, i); if (!cmd.exists(key.c_str())) { - if (i < 10) { + if (i < 1) { printf("no exists key: %s\r\n", key.c_str()); } } else { - if (i < 10) { + if (i < 1) { printf("exists key: %s\r\n", key.c_str()); } } @@ -76,7 +76,7 @@ static bool test_type(acl::redis_key& cmd, size_t tid, size_t fid, int i) if (ret == acl::REDIS_KEY_NONE) { printf("unknown type key: %s\r\n", key.c_str()); return false; - } else if (i < 10) { + } else if (i < 1) { printf("type ok, key: %s, ret: %d\r\n", key.c_str(), ret); } return true; @@ -92,7 +92,7 @@ static bool test_set(acl::redis_string& cmd, size_t tid, size_t fid, int i) bool ret = cmd.set(key.c_str(), value.c_str()); return ret; - if (i < 10) { + if (i < 1) { printf("set key: %s, value: %s %s\r\n", key.c_str(), value.c_str(), ret ? "ok" : "error"); } @@ -107,7 +107,7 @@ static bool test_get(acl::redis_string& cmd, size_t tid, size_t fid, int i) acl::string value; bool ret = cmd.get(key.c_str(), value); - if (i < 10) { + if (i < 1) { printf("get key: %s, value: %s %s, len: %d\r\n", key.c_str(), value.c_str(), ret ? "ok" : "error", (int) value.length()); @@ -152,7 +152,7 @@ static bool test_hmget(acl::redis_hash& cmd, size_t tid, size_t fid, int i) return false; } - if (i < 10) { + if (i < 1) { printf("key=%s:", key.c_str()); assert(names.size() == values.size()); size_t n = names.size(); From ae77f1166709fff12ee71635e56b7fde9dbee17e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?shuxin=20=E3=80=80=E3=80=80zheng?= Date: Thu, 27 Jul 2023 15:56:49 +0800 Subject: [PATCH 3/6] Test wiredtiger vs rocksdb been used by pkv. --- app/wizard_demo/pkv/db/wt/wt_sess.cpp | 5 +- app/wizard_demo/pkv/pkv.cf | 2 +- .../redis/redis_pipeline2/redis_pipeline.cpp | 62 ++++++++++++------- 3 files changed, 43 insertions(+), 26 deletions(-) diff --git a/app/wizard_demo/pkv/db/wt/wt_sess.cpp b/app/wizard_demo/pkv/db/wt/wt_sess.cpp index 6e7323837..c09ad443f 100644 --- a/app/wizard_demo/pkv/db/wt/wt_sess.cpp +++ b/app/wizard_demo/pkv/db/wt/wt_sess.cpp @@ -54,6 +54,7 @@ bool wt_sess::get(const std::string &key, std::string &value) { int ret = curs_->search(curs_); if (ret != 0) { logger("search key=%s error=%d", key.c_str(), ret); + curs_->reset(curs_); return false; } @@ -61,6 +62,7 @@ bool wt_sess::get(const std::string &key, std::string &value) { ret = curs_->get_value(curs_, &v); if (ret != 0) { logger_error("get_value key=%s errort=%d", key.c_str(), ret); + curs_->reset(curs_); return false; } @@ -73,8 +75,9 @@ bool wt_sess::del(const std::string &key) { assert(curs_); curs_->set_key(curs_, key.c_str()); int ret = curs_->remove(curs_); - if (ret != 0) { + if (ret != 0 && ret != WT_NOTFOUND) { logger_error("remove key=%s error=%d", key.c_str(), ret); + curs_->reset(curs_); return false; } diff --git a/app/wizard_demo/pkv/pkv.cf b/app/wizard_demo/pkv/pkv.cf index bb7bfb02b..91f3deade 100644 --- a/app/wizard_demo/pkv/pkv.cf +++ b/app/wizard_demo/pkv/pkv.cf @@ -125,5 +125,5 @@ service pkv buf_size = 8192 dbpath = ./db - dbtype = wdb + dbtype = rdb } diff --git a/lib_acl_cpp/samples/redis/redis_pipeline2/redis_pipeline.cpp b/lib_acl_cpp/samples/redis/redis_pipeline2/redis_pipeline.cpp index 3edafb123..7650521c4 100644 --- a/lib_acl_cpp/samples/redis/redis_pipeline2/redis_pipeline.cpp +++ b/lib_acl_cpp/samples/redis/redis_pipeline2/redis_pipeline.cpp @@ -2,20 +2,34 @@ #include "util.h" static int __threads_exit = 0; +static acl::string __cmd("del"); +static acl::atomic_long __count; class redis_command { public: - redis_command(acl::redis_client_pipeline& pipeline, const char* key) - : key_(key) + redis_command(acl::redis_client_pipeline& pipeline) + : cmd_(&pipeline) // Must set pipeline before calling cmd_.get_pipeline_message() , msg_(cmd_.get_pipeline_message()) { cmd_.set_pipeline(&pipeline); - argc_ = 2; - argv_[0] = "del"; - argv_[1] = key_; + msg_.set_option(cmd_.get_dbuf(), 1, NULL); + + long long id = __count.fetch_add(1); + acl::string key; + key.format("key-%lld", id); + argv_[0] = __cmd.c_str(); + argv_[1] = key; lens_[0] = strlen(argv_[0]); lens_[1] = strlen(argv_[1]); + if (__cmd == "set") { + argc_ = 3; + argv_[2] = "value"; + lens_[2] = strlen(argv_[2]); + } else { + argc_ = 2; + } + // computer the hash slot for redis cluster node cmd_.hash_slot(argv_[1]); @@ -36,18 +50,18 @@ public: } private: - acl::string key_; acl::redis cmd_; acl::redis_pipeline_message& msg_; size_t argc_; - const char* argv_[2]; - size_t lens_[2]; + const char* argv_[4]; + size_t lens_[4]; }; class test_thread : public acl::thread { public: - test_thread(acl::locker& locker, acl::redis_client_pipeline& pipeline, + test_thread(acl::locker& locker, + acl::redis_client_pipeline& pipeline, int once_count, int count) : locker_(locker) , pipeline_(pipeline) @@ -61,30 +75,31 @@ public: protected: // @override void* run(void) { - acl::string key; + for (size_t i = 0; i < (size_t) count_; i++) { + run_once(); + } + + locker_.lock(); + __threads_exit++; + locker_.unlock(); + return NULL; + } + + void run_once(void) { // parepare for a lot of redis commands in one request std::vector commands; for (size_t i = 0; i < (size_t) once_count_; i++) { - key.format("test-key-%d", (int) i); - redis_command* command = new redis_command(pipeline_, key); + redis_command* command = new redis_command(pipeline_); commands.push_back(command); } - for (size_t i = 0; i < (size_t) count_; i++) { - request(commands); - } + request(commands); // free all requests commands for (std::vector::iterator it = commands.begin(); it != commands.end(); ++it) { delete *it; } - - locker_.lock(); - __threads_exit++; - locker_.unlock(); - - return NULL; } private: @@ -136,7 +151,6 @@ int main(int argc, char* argv[]) { int ch, count = 10, once_count = 10; int max_threads = 10; acl::string addr("127.0.0.1:6379"), passwd; - acl::string cmd("del"); while ((ch = getopt(argc, argv, "ha:s:N:n:t:p:")) > 0) { switch (ch) { @@ -144,7 +158,7 @@ int main(int argc, char* argv[]) { usage(argv[0]); return 0; case 'a': - cmd = optarg; + __cmd = optarg; break; case 's': addr = optarg; @@ -214,7 +228,7 @@ int main(int argc, char* argv[]) { long long int total = max_threads * once_count * count; double inter = util::stamp_sub(&end, &begin); - printf("total %s: %lld, spent: %0.2f ms, speed: %0.2f\r\n", cmd.c_str(), + printf("total %s: %lld, spent: %0.2f ms, speed: %0.2f\r\n", __cmd.c_str(), total, inter, (total * 1000) /(inter > 0 ? inter : 1)); pipeline.stop_thread(); From 7a7d43f476b62f41240b4e939a7e89fc73069d60 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?shuxin=20=E3=80=80=E3=80=80zheng?= Date: Thu, 27 Jul 2023 15:58:57 +0800 Subject: [PATCH 4/6] test pkv --- app/wizard_demo/pkv/pkv.cf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/wizard_demo/pkv/pkv.cf b/app/wizard_demo/pkv/pkv.cf index 91f3deade..9dfbf6e37 100644 --- a/app/wizard_demo/pkv/pkv.cf +++ b/app/wizard_demo/pkv/pkv.cf @@ -124,6 +124,6 @@ service pkv # 綺綏援臀蕁 buf_size = 8192 - dbpath = ./db + dbpath = ./data dbtype = rdb } From 6de660c536154490edf55864544f37148667ce9d Mon Sep 17 00:00:00 2001 From: zhengshuxin Date: Thu, 27 Jul 2023 16:58:07 +0800 Subject: [PATCH 5/6] Optimize and test pkv. --- app/wizard_demo/pkv/CMakeLists.txt | 2 +- app/wizard_demo/pkv/pkv.cf | 4 ++-- app/wizard_demo/pkv/proto/c++_patch.h | 11 +++++++++++ app/wizard_demo/pkv/proto/redis_coder.h | 7 ++++--- app/wizard_demo/pkv/proto/redis_object.h | 15 ++++++++------- 5 files changed, 26 insertions(+), 13 deletions(-) create mode 100644 app/wizard_demo/pkv/proto/c++_patch.h diff --git a/app/wizard_demo/pkv/CMakeLists.txt b/app/wizard_demo/pkv/CMakeLists.txt index f3fcdc4f6..55ce939ed 100644 --- a/app/wizard_demo/pkv/CMakeLists.txt +++ b/app/wizard_demo/pkv/CMakeLists.txt @@ -80,7 +80,7 @@ add_definitions( "-Wno-error=deprecated-declarations" "-Wno-deprecated-declarations" "-fPIC" - "-std=c++17" + "-std=c++11" "-DHAS_ROCKSDB" "-DHAS_WT" ) diff --git a/app/wizard_demo/pkv/pkv.cf b/app/wizard_demo/pkv/pkv.cf index 9dfbf6e37..39c0317fe 100644 --- a/app/wizard_demo/pkv/pkv.cf +++ b/app/wizard_demo/pkv/pkv.cf @@ -9,7 +9,7 @@ service pkv # for master_type = unix # master_service = echo.sock # for master_type = sock - master_service = 127.0.0.1|5001 + master_service = 127.0.0.1|19001 # ∞筝阪絅・ # master_service = aio_echo.sock @@ -125,5 +125,5 @@ service pkv buf_size = 8192 dbpath = ./data - dbtype = rdb + dbtype = wdb } diff --git a/app/wizard_demo/pkv/proto/c++_patch.h b/app/wizard_demo/pkv/proto/c++_patch.h new file mode 100644 index 000000000..dde852ef2 --- /dev/null +++ b/app/wizard_demo/pkv/proto/c++_patch.h @@ -0,0 +1,11 @@ +// +// Created by shuxin zheng on 2023/7/27 +// + +#pragma once + +#if __cplusplus >= 201703L +# define NODISCARD [[nodiscard]] +#else +# define NODISCARD +#endif diff --git a/app/wizard_demo/pkv/proto/redis_coder.h b/app/wizard_demo/pkv/proto/redis_coder.h index c0453e7de..a81fb42bd 100644 --- a/app/wizard_demo/pkv/proto/redis_coder.h +++ b/app/wizard_demo/pkv/proto/redis_coder.h @@ -5,6 +5,7 @@ #pragma once #include +#include "c++_patch.h" #include "redis_object.h" namespace pkv { @@ -16,18 +17,18 @@ public: const char* update(const char* data, size_t& len); - [[nodiscard]] const std::vector& get_objects() const { + NODISCARD const std::vector& get_objects() const { return objs_; } - [[nodiscard]] redis_object* get_curr() const { + NODISCARD redis_object* get_curr() const { return curr_; } void clear(); public: - [[nodiscard]] redis_object& create_object(); + NODISCARD redis_object& create_object(); bool to_string(std::string& out) const; diff --git a/app/wizard_demo/pkv/proto/redis_object.h b/app/wizard_demo/pkv/proto/redis_object.h index 7bef6787b..e2442f4b9 100644 --- a/app/wizard_demo/pkv/proto/redis_object.h +++ b/app/wizard_demo/pkv/proto/redis_object.h @@ -6,6 +6,7 @@ #include #include +#include "c++_patch.h" #include "redis_type.h" namespace pkv { @@ -33,27 +34,27 @@ public: public: const char* update(const char* data, size_t& len); - [[nodiscard]] bool finish() const { + NODISCARD bool finish() const { return status_ == redis_s_finish; } - [[nodiscard]] bool failed() const { + NODISCARD bool failed() const { return status_ == redis_s_null; } - [[nodiscard]] int get_status() const { + NODISCARD int get_status() const { return status_; } - [[nodiscard]] redis_obj_t get_type() const { + NODISCARD redis_obj_t get_type() const { return type_; } - [[nodiscard]] const char* get_cmd() const; + NODISCARD const char* get_cmd() const; - [[nodiscard]] const char* get_str() const; + NODISCARD const char* get_str() const; - [[nodiscard]] const std::vector& get_objects() const { + NODISCARD const std::vector& get_objects() const { return objs_; } From 162b2d4c1d303e0c5f893acc9b67e0666606088e Mon Sep 17 00:00:00 2001 From: zhengshuxin Date: Thu, 27 Jul 2023 17:31:32 +0800 Subject: [PATCH 6/6] Add build condition with c++11 or with c++17 for pkv. --- app/wizard_demo/pkv/CMakeLists.txt | 7 ++++++- app/wizard_demo/pkv/Makefile | 11 ++++++++++- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/app/wizard_demo/pkv/CMakeLists.txt b/app/wizard_demo/pkv/CMakeLists.txt index 55ce939ed..b544fca1b 100644 --- a/app/wizard_demo/pkv/CMakeLists.txt +++ b/app/wizard_demo/pkv/CMakeLists.txt @@ -80,11 +80,16 @@ add_definitions( "-Wno-error=deprecated-declarations" "-Wno-deprecated-declarations" "-fPIC" - "-std=c++11" "-DHAS_ROCKSDB" "-DHAS_WT" ) +if (${BUILD_WITH_C11} MATCHES "YES") + add_definitions("-std=c++11") +else() + add_definitions("-std=c++17") +endif() + #find_library(acl_lib acl_all PATHS /usr/lib /usr/local/lib) #find_library(fiber_cpp_lib fiber_cpp PATHS /usr/lib /usr/local/lib) #find_library(fiber_lib fiber PATHS /usr/lib /usr/local/lib) diff --git a/app/wizard_demo/pkv/Makefile b/app/wizard_demo/pkv/Makefile index 22975ed7d..60bb52736 100644 --- a/app/wizard_demo/pkv/Makefile +++ b/app/wizard_demo/pkv/Makefile @@ -1,5 +1,14 @@ + +# make BUILD_WITH_C11=YES; or make +# +ifeq ($(BUILD_WITH_C11), YES) + BUILD_ARGS = -DBUILD_WITH_C11=YES +else + BUILD_ARGS = -DBUILD_WITH_C11=NO +endif + all: - @(mkdir -p build; cd build; cmake ..; make -j 4) + @(mkdir -p build; cd build; cmake ${BUILD_ARGS} ..; make -j 4) clean cl: @(rm -rf build pkv)