Merge branch 'gitee-master' into gitlab-upstream

This commit is contained in:
zhengshuxin 2023-07-27 17:34:16 +08:00
commit 27565ee8df
21 changed files with 458 additions and 125 deletions

View File

@ -80,10 +80,16 @@ add_definitions(
"-Wno-error=deprecated-declarations" "-Wno-error=deprecated-declarations"
"-Wno-deprecated-declarations" "-Wno-deprecated-declarations"
"-fPIC" "-fPIC"
"-std=c++17"
"-DHAS_ROCKSDB" "-DHAS_ROCKSDB"
"-DHAS_WT"
) )
if (${BUILD_WITH_C11} MATCHES "YES")
add_definitions("-std=c++11")
else()
add_definitions("-std=c++17")
endif()
#find_library(acl_lib acl_all PATHS /usr/lib /usr/local/lib) #find_library(acl_lib acl_all PATHS /usr/lib /usr/local/lib)
#find_library(fiber_cpp_lib fiber_cpp PATHS /usr/lib /usr/local/lib) #find_library(fiber_cpp_lib fiber_cpp PATHS /usr/lib /usr/local/lib)
#find_library(fiber_lib fiber PATHS /usr/lib /usr/local/lib) #find_library(fiber_lib fiber PATHS /usr/lib /usr/local/lib)
@ -91,25 +97,21 @@ add_definitions(
set(acl_all ${home_path}/lib_fiber/lib/libfiber_cpp.a ${home_path}/libacl_all.a) set(acl_all ${home_path}/lib_fiber/lib/libfiber_cpp.a ${home_path}/libacl_all.a)
set(fiber ${home_path}/lib_fiber/lib/libfiber.a) set(fiber ${home_path}/lib_fiber/lib/libfiber.a)
find_library(rocksdb_lib rocksdb PATHS /usr/lib /usr/local/lib) find_library(jemalloc NAMES jemalloc PATHS /usr/lib /usr/local/lib)
#set(rocksdb /usr/local/lib/librocksdb.a) find_library(rocksdb NAMES rocksdb PATHS /usr/lib /usr/local/lib)
set(rocksdb -lrocksdb) find_library(wiredtiger NAMES wiredtiger PATHS /usr/lib /usr/local/lib)
find_library(jemalloc jemalloc PATHS /usr/lib /usr/local/lib)
set(jemalloc -ljemalloc)
#set(wt -lwiredtiger) #set(wt -lwiredtiger)
#set(uring -luring) #set(uring -luring)
#set(iconv -liconv) #set(iconv -liconv)
if(CMAKE_SYSTEM_NAME MATCHES "Darwin") if(CMAKE_SYSTEM_NAME MATCHES "Darwin")
# set(lib_all ${lib_global} ${lib_rpc} ${fiber_cpp_lib} ${acl_lib} ${fiber_lib}
set(lib_all ${acl_all} set(lib_all ${acl_all}
${rocksdb} ${fiber} -liconv -lz -lpthread -ldl) ${rocksdb} ${wiredtiger} ${fiber} -liconv -lz -lpthread -ldl)
elseif(CMAKE_SYSTEM_NAME MATCHES "Linux") elseif(CMAKE_SYSTEM_NAME MATCHES "Linux")
# set(lib_all ${lib_global} ${lib_rpc} ${fiber_cpp_lib} ${acl_lib} ${fiber_lib}
set(lib_all ${acl_all} set(lib_all ${acl_all}
${rocksdb} ${wt} ${fiber} ${uring} ${iconv} ${jemalloc} -lz -lpthread -ldl) ${rocksdb} ${wiredtiger} ${fiber} ${uring} ${iconv} ${jemalloc}
-lz -lpthread -ldl)
endif() endif()
set(output_path ${CMAKE_CURRENT_SOURCE_DIR}) set(output_path ${CMAKE_CURRENT_SOURCE_DIR})

View File

@ -1,5 +1,14 @@
# make BUILD_WITH_C11=YES; or make
#
ifeq ($(BUILD_WITH_C11), YES)
BUILD_ARGS = -DBUILD_WITH_C11=YES
else
BUILD_ARGS = -DBUILD_WITH_C11=NO
endif
all: all:
@(mkdir -p build; cd build; cmake ..; make -j 4) @(mkdir -p build; cd build; cmake ${BUILD_ARGS} ..; make -j 4)
clean cl: clean cl:
@(rm -rf build pkv) @(rm -rf build pkv)

View File

@ -115,19 +115,19 @@ bool redis_handler::set(const redis_object &obj) {
return false; return false;
} }
#if 0 if (!var_cfg_disable_serialize) {
std::string buff; std::string buff;
coder_.create_object().set_string(value); coder_.create_object().set_string(value);
coder_.to_string(buff); coder_.to_string(buff);
coder_.clear(); coder_.clear();
# if 1 if (!var_cfg_disable_save) {
if (!db_->set(key, buff.c_str())) { if (!db_->set(key, buff.c_str())) {
logger_error("db set error, key=%s", key); logger_error("db set error, key=%s", key);
return false; return false;
}
}
} }
# endif
#endif
builder_.create_object().set_status("OK"); builder_.create_object().set_status("OK");
return true; return true;
@ -244,7 +244,7 @@ bool redis_handler::hset(const redis_object &obj) {
return false; return false;
} }
if (!db_->set(key, buff.c_str())) { if (!db_->set(key, buff)) {
logger_error("set key=%s, value=%s error", key, buff.c_str()); logger_error("set key=%s, value=%s error", key, buff.c_str());
return false; return false;
} }

View File

@ -1,5 +1,13 @@
#include "stdafx.h" #include "stdafx.h"
#ifdef HAS_ROCKSDB
#include "rocksdb/rdb.h" #include "rocksdb/rdb.h"
#endif
#ifdef HAS_WT
#include "wt/wdb.h"
#endif
#include "db.h" #include "db.h"
namespace pkv { namespace pkv {
@ -34,4 +42,12 @@ shared_db db::create_rdb() {
#endif #endif
} }
shared_db db::create_wdb() {
#ifdef HAS_WT
return std::make_shared<wdb>();
#else
return std::make_shared<dummy_db>();
#endif
}
} // namespace pkv } // namespace pkv

View File

@ -16,6 +16,7 @@ public:
virtual bool del(const std::string& key) = 0; virtual bool del(const std::string& key) = 0;
static shared_db create_rdb(); static shared_db create_rdb();
static shared_db create_wdb();
}; };
} // namespace pkv } // namespace pkv

View File

@ -19,17 +19,24 @@ rdb::~rdb() {
} }
bool rdb::open(const char* path) { bool rdb::open(const char* path) {
path_ = path;
path_ += "/rdb";
if (acl_make_dirs(path_.c_str(), 0755) == -1) {
logger_error("create %s error=%s", path_.c_str(), acl::last_serror());
return -1;
}
Options options; Options options;
options.IncreaseParallelism(); options.IncreaseParallelism();
// options.OptimizeLevelStyleCompaction(); // options.OptimizeLevelStyleCompaction();
options.create_if_missing = true; options.create_if_missing = true;
Status s = DB::Open(options, path, &db_); Status s = DB::Open(options, path_.c_str(), &db_);
if (!s.ok()) { if (!s.ok()) {
logger_error("open %s db error: %s", path, s.getState()); logger_error("open %s db error: %s", path_.c_str(), s.getState());
return false; return false;
} }
path_ = path;
return true; return true;
} }

View File

@ -5,34 +5,34 @@
#ifdef HAS_ROCKSDB #ifdef HAS_ROCKSDB
namespace rocksdb { namespace rocksdb {
class DB; class DB;
} }
namespace pkv { namespace pkv {
class rdb : public db { class rdb : public db {
public: public:
rdb(); rdb();
~rdb() override; ~rdb() override;
protected: protected:
// @override // @override
bool open(const char* path) override; bool open(const char* path) override;
// @override // @override
bool set(const std::string& key, const std::string& value) override; bool set(const std::string& key, const std::string& value) override;
// @override // @override
bool get(const std::string& key, std::string& value) override; bool get(const std::string& key, std::string& value) override;
// @override // @override
bool del(const std::string& key) override; bool del(const std::string& key) override;
private: private:
std::string path_; std::string path_;
rocksdb::DB* db_; rocksdb::DB* db_;
}; };
} // namespace pkv } // namespace pkv
#endif // HAS_ROCKSDB #endif // HAS_ROCKSDB

View File

@ -0,0 +1,99 @@
//
// Created by shuxin ¡¡¡¡zheng on 2023/7/27.
//
#include "stdafx.h"
#ifdef HAS_WT
#include <wiredtiger.h>
#include "wt_sess.h"
#include "wdb.h"
namespace pkv {
wdb::wdb(size_t cache_max) : db_(nullptr), cache_max_(cache_max) {}
wdb::~wdb() {
if (db_) {
db_->close(db_, nullptr);
}
}
bool wdb::open(const char *path) {
path_ = path;
path_ += "/wdb";
if (acl_make_dirs(path_.c_str(), 0755) == -1) {
logger_error("create dir=%s error=%s", path_.c_str(), acl::last_serror());
return false;
}
int ret = wiredtiger_open(path_.c_str(), nullptr, "create", &db_);
if (ret != 0) {
logger_error("open %s error=%d", path_.c_str(), ret);
return false;
}
return true;
}
bool wdb::set(const std::string &key, const std::string &value) {
auto sess = get_session();
if (sess == nullptr) {
return false;
}
bool ret = sess->add(key, value);
put_session(sess);
return ret;
}
bool wdb::get(const std::string &key, std::string &value) {
auto sess = get_session();
if (sess == nullptr) {
return false;
}
bool ret = sess->get(key, value);
put_session(sess);
return ret;
}
bool wdb::del(const std::string &key) {
auto sess = get_session();
if (sess == nullptr) {
return false;
}
bool ret = sess->del(key);
put_session(sess);
return ret;
}
wt_sess *wdb::get_session() {
if (sessions_.empty()) {
auto sess = new wt_sess(*this);
if (sess->open()) {
return sess;
}
delete sess;
return nullptr;
}
auto sess = sessions_.back();
sessions_.pop_back();
return sess;
}
void wdb::put_session(wt_sess* sess) {
if (sessions_.size() < cache_max_) {
sessions_.emplace_back(sess);
} else {
delete sess;
}
}
} // namespace pkv
#endif // HAS_WT

View File

@ -0,0 +1,53 @@
//
// Created by shuxin ¡¡¡¡zheng on 2023/7/27.
//
#pragma once
#include "db/db.h"
#ifdef HAS_WT
typedef struct __wt_connection WT_CONNECTION;
namespace pkv {
class wt_sess;
class wdb : public db {
public:
explicit wdb(size_t cache_max = 10000);
~wdb() override;
protected:
// @override
bool open(const char* path) override;
// @override
bool set(const std::string& key, const std::string& value) override;
// @override
bool get(const std::string& key, std::string& value) override;
// @override
bool del(const std::string& key) override;
public:
WT_CONNECTION* get_db() const {
return db_;
}
private:
std::string path_;
WT_CONNECTION *db_;
std::vector<wt_sess*> sessions_;
size_t cache_max_;
wt_sess* get_session();
void put_session(wt_sess* sess);
};
} // namespace pkv
#endif // HAS_WT

View File

@ -0,0 +1,88 @@
//
// Created by shuxin ¡¡¡¡zheng on 2023/7/27.
//
#include "stdafx.h"
#include "wdb.h"
#include "wt_sess.h"
namespace pkv {
wt_sess::wt_sess(wdb &db) : db_(db), sess_(nullptr), curs_(nullptr) {}
wt_sess::~wt_sess() = default;
bool wt_sess::open() {
int ret = db_.get_db()->open_session(db_.get_db(), nullptr, nullptr, &sess_);
if (ret != 0) {
logger_error("open session error %d", ret);
return false;
}
ret = sess_->create(sess_, "table:access", "key_format=S, value_format=S");
if (ret != 0) {
logger_error("create session error %d", ret);
return false;
}
ret = sess_->open_cursor(sess_, "table:access", nullptr, nullptr, &curs_);
if (ret != 0) {
logger_error("open cursor error %d", ret);
return false;
}
return true;
}
bool wt_sess::add(const std::string &key, const std::string &value) {
assert(curs_);
curs_->set_key(curs_, key.c_str());
curs_->set_value(curs_, value.c_str());
int ret = curs_->insert(curs_);
if (ret != 0) {
logger_error("insert %s %s error=%d", key.c_str(), value.c_str(), ret);
curs_->reset(curs_);
return false;
}
curs_->reset(curs_);
return true;
}
bool wt_sess::get(const std::string &key, std::string &value) {
assert(curs_);
curs_->set_key(curs_, key.c_str());
int ret = curs_->search(curs_);
if (ret != 0) {
logger("search key=%s error=%d", key.c_str(), ret);
curs_->reset(curs_);
return false;
}
const char* v;
ret = curs_->get_value(curs_, &v);
if (ret != 0) {
logger_error("get_value key=%s errort=%d", key.c_str(), ret);
curs_->reset(curs_);
return false;
}
value = v;
curs_->reset(curs_);
return true;
}
bool wt_sess::del(const std::string &key) {
assert(curs_);
curs_->set_key(curs_, key.c_str());
int ret = curs_->remove(curs_);
if (ret != 0 && ret != WT_NOTFOUND) {
logger_error("remove key=%s error=%d", key.c_str(), ret);
curs_->reset(curs_);
return false;
}
curs_->reset(curs_);
return true;
}
} // namespace pkv

View File

@ -0,0 +1,33 @@
//
// Created by shuxin ¡¡¡¡zheng on 2023/7/27.
//
#pragma once
#include <wiredtiger.h>
namespace pkv {
class wdb;
class wt_sess {
public:
explicit wt_sess(wdb& db);
~wt_sess();
bool open();
bool add(const std::string& key, const std::string& value);
bool get(const std::string& key, std::string& value);
bool del(const std::string& key);
private:
wdb& db_;
WT_SESSION *sess_;
WT_CURSOR *curs_;
};
} // namespace pkv

View File

@ -4,27 +4,31 @@
#include "master_service.h" #include "master_service.h"
static char *var_cfg_dbpath; static char *var_cfg_dbpath;
static char *var_cfg_dbtype;
acl::master_str_tbl var_conf_str_tab[] = { acl::master_str_tbl var_conf_str_tab[] = {
{ "dbpath", "./dbpath", &var_cfg_dbpath }, { "dbpath", "./dbpath", &var_cfg_dbpath },
{ "dbtype", "rdb", &var_cfg_dbtype },
{ 0, 0, 0 } { 0, 0, 0 }
}; };
static int var_cfg_debug_enable; int var_cfg_disable_serialize;
int var_cfg_disable_save;
acl::master_bool_tbl var_conf_bool_tab[] = { acl::master_bool_tbl var_conf_bool_tab[] = {
{ "debug_enable", 1, &var_cfg_debug_enable }, { "disable_serialize", 0, &var_cfg_disable_serialize },
{ "disable_save", 0, &var_cfg_disable_save },
{ 0, 0, 0 } { 0, 0, 0 }
}; };
static int var_cfg_io_timeout; static int var_cfg_io_timeout;
static int var_cfg_buf_size; static int var_cfg_buf_size;
acl::master_int_tbl var_conf_int_tab[] = { acl::master_int_tbl var_conf_int_tab[] = {
{ "io_timeout", 120, &var_cfg_io_timeout, 0, 0 }, { "io_timeout", 120, &var_cfg_io_timeout, 0, 0 },
{ "buf_size", 8192, &var_cfg_buf_size, 0, 0 }, { "buf_size", 8192, &var_cfg_buf_size, 0, 0 },
{ 0, 0 , 0 , 0, 0 } { 0, 0 , 0 , 0, 0 }
}; };
@ -88,9 +92,21 @@ void master_service::proc_on_listen(acl::server_socket& ss) {
void master_service::proc_on_init() { void master_service::proc_on_init() {
logger(">>>proc_on_init<<<"); logger(">>>proc_on_init<<<");
db_ = db::create_rdb(); if (strcasecmp(var_cfg_dbtype, "rdb") == 0) {
if (!db_->open(var_cfg_dbpath)) { db_ = db::create_rdb();
logger_error("open db(%s) error %s", var_cfg_dbpath, acl::last_serror()); if (!db_->open(var_cfg_dbpath)) {
logger_error("open db(%s) error %s", var_cfg_dbpath,
acl::last_serror());
exit(1);
}
} else if (strcasecmp(var_cfg_dbtype, "wdb") == 0) {
db_ = db::create_wdb();
if (!db_->open(var_cfg_dbpath)) {
logger_error("open db(%s) error", var_cfg_dbpath);
exit(1);
}
} else {
logger_error("unknown dbtype=%s", var_cfg_dbtype);
exit(1); exit(1);
} }
} }

View File

@ -9,7 +9,7 @@ service pkv
# for master_type = unix # for master_type = unix
# master_service = echo.sock # master_service = echo.sock
# for master_type = sock # for master_type = sock
master_service = 127.0.0.1|5001 master_service = 127.0.0.1|19001
# 服务监听为域套接口 # 服务监听为域套接口
# master_service = aio_echo.sock # master_service = aio_echo.sock
@ -123,26 +123,7 @@ service pkv
############################################################################ ############################################################################
# 应用自己的配置选项 # 应用自己的配置选项
# mysql 服务地址
# mysql_dbaddr = /tmp/mysql.sock
# mysql_dbaddr = 10.0.250.199:3306
# 连接 mysql 数据库的连接池的最大值
# mysql_dbmax = 200
# ping mysql 连接的间隔时间, 以秒为单位
# mysql_dbping = 10
# mysql 连接空闲的时间间隔, 以秒为单位
# mysql_dbtimeout = 30
# 数据库名称
# mysql_dbname = fiber_db
# 数据库访问用户
# mysql_dbuser = fiber_user
# 数据库用户访问密码
# mysql_dbpass = 111111
# 是否输出当前内存的状态信息
# debug_mem = 1
# 是否在一个线程中连接读
# loop_read = 1
buf_size = 8192 buf_size = 8192
dbpath = ./data
dbtype = wdb
} }

View File

@ -0,0 +1,11 @@
//
// Created by shuxin   zheng on 2023/7/27
//
#pragma once
#if __cplusplus >= 201703L
# define NODISCARD [[nodiscard]]
#else
# define NODISCARD
#endif

View File

@ -5,29 +5,30 @@
#pragma once #pragma once
#include <vector> #include <vector>
#include "c++_patch.h"
#include "redis_object.h" #include "redis_object.h"
namespace pkv { namespace pkv {
class redis_coder { class redis_coder {
public: public:
redis_coder(size_t cache_max = 10000); explicit redis_coder(size_t cache_max = 10000);
~redis_coder(); ~redis_coder();
const char* update(const char* data, size_t& len); const char* update(const char* data, size_t& len);
[[nodiscard]] const std::vector<redis_object*>& get_objects() const { NODISCARD const std::vector<redis_object*>& get_objects() const {
return objs_; return objs_;
} }
[[nodiscard]] redis_object* get_curr() const { NODISCARD redis_object* get_curr() const {
return curr_; return curr_;
} }
void clear(); void clear();
public: public:
[[nodiscard]] redis_object& create_object(); NODISCARD redis_object& create_object();
bool to_string(std::string& out) const; bool to_string(std::string& out) const;

View File

@ -6,16 +6,13 @@
#include <string> #include <string>
#include <vector> #include <vector>
#include "c++_patch.h"
#include "redis_type.h" #include "redis_type.h"
namespace pkv { namespace pkv {
class redis_object;
using shared_redis = std::shared_ptr<redis_object>;
typedef enum { typedef enum {
REDIS_OBJ_UNKOWN, REDIS_OBJ_UNKOWN,
REDIS_OBJ_NIL,
REDIS_OBJ_ERROR, REDIS_OBJ_ERROR,
REDIS_OBJ_STATUS, REDIS_OBJ_STATUS,
REDIS_OBJ_INTEGER, REDIS_OBJ_INTEGER,
@ -37,27 +34,27 @@ public:
public: public:
const char* update(const char* data, size_t& len); const char* update(const char* data, size_t& len);
[[nodiscard]] bool finish() const { NODISCARD bool finish() const {
return status_ == redis_s_finish; return status_ == redis_s_finish;
} }
[[nodiscard]] bool failed() const { NODISCARD bool failed() const {
return status_ == redis_s_null; return status_ == redis_s_null;
} }
[[nodiscard]] int get_status() const { NODISCARD int get_status() const {
return status_; return status_;
} }
[[nodiscard]] redis_obj_t get_type() const { NODISCARD redis_obj_t get_type() const {
return type_; return type_;
} }
[[nodiscard]] const char* get_cmd() const; NODISCARD const char* get_cmd() const;
[[nodiscard]] const char* get_str() const; NODISCARD const char* get_str() const;
[[nodiscard]] const std::vector<redis_object*>& get_objects() const { NODISCARD const std::vector<redis_object*>& get_objects() const {
return objs_; return objs_;
} }
@ -85,7 +82,7 @@ private:
std::vector<redis_object*> objs_; std::vector<redis_object*> objs_;
private: private:
const char* get_line(const char*, size_t&, std::string&, bool&); static const char* get_line(const char*, size_t&, std::string&, bool&);
const char* get_length(const char*, size_t&, int&, bool&); const char* get_length(const char*, size_t&, int&, bool&);
const char* get_data(const char*, size_t&, size_t); const char* get_data(const char*, size_t&, size_t);

View File

@ -60,6 +60,10 @@
#endif // !_WIN32 && !_WIN64 #endif // !_WIN32 && !_WIN64
extern acl::master_str_tbl var_conf_str_tab[]; extern acl::master_str_tbl var_conf_str_tab[];
extern int var_cfg_disable_serialize;
extern int var_cfg_disable_save;
extern acl::master_bool_tbl var_conf_bool_tab[]; extern acl::master_bool_tbl var_conf_bool_tab[];
extern acl::master_int_tbl var_conf_int_tab[]; extern acl::master_int_tbl var_conf_int_tab[];
extern acl::master_int64_tbl var_conf_int64_tab[]; extern acl::master_int64_tbl var_conf_int64_tab[];

View File

@ -9,7 +9,7 @@ CFLAGS = -c -g -W -Wall -Wcast-qual -Wcast-align \
########################################################### ###########################################################
#Check system: #Check system:
# Linux, SunOS, Solaris, BSD variants, AIX, HP-UX # Linux, SunOS, Solaris, BSD variants, AIX, HP-UX
SYSLIB = -lpthread -lcrypt SYSLIB = -lpthread
CHECKSYSRES = @echo "Unknow system type!";exit 1 CHECKSYSRES = @echo "Unknow system type!";exit 1
UNIXNAME = $(shell uname -sm) UNIXNAME = $(shell uname -sm)
@ -67,7 +67,8 @@ CFLAGS += -I. -I../.. -I../../../include -I../../../../lib_acl/include -I../../.
EXTLIBS = EXTLIBS =
CFLAGS += -I/home/zsx/download/db/wiredtiger/github CFLAGS += -I/home/zsx/download/db/wiredtiger/github
#EXTLIBS = -L/home/zsx/download/db/wiredtiger/github/.libs -lwiredtiger -Wl,-rpath=/home/zsx/download/db/wiredtiger/github/.libs #EXTLIBS = -L/home/zsx/download/db/wiredtiger/github/.libs -lwiredtiger -Wl,-rpath=/home/zsx/download/db/wiredtiger/github/.libs
EXTLIBS = /usr/local/lib/libwiredtiger.a #EXTLIBS = /usr/local/lib/libwiredtiger.a
EXTLIBS = -lwiredtiger
SYSLIB += -ldl SYSLIB += -ldl
LDFLAGS = -L../../../lib -l_acl_cpp -L../../../../lib_protocol/lib -l_protocol -L../../../../lib_acl/lib -l_acl \ LDFLAGS = -L../../../lib -l_acl_cpp -L../../../../lib_protocol/lib -l_protocol -L../../../../lib_acl/lib -l_acl \
$(EXTLIBS) $(SYSLIB) $(EXTLIBS) $(SYSLIB)

View File

@ -50,7 +50,7 @@ public:
~wdb_sess(void) {} ~wdb_sess(void) {}
bool open(void) { bool open(void) {
bool ret = db_.get_conn()->open_session(db_.get_conn(), int ret = db_.get_conn()->open_session(db_.get_conn(),
NULL, NULL, &session_); NULL, NULL, &session_);
if (ret != 0) { if (ret != 0) {
printf("open session failed, ret=%d\r\n", ret); printf("open session failed, ret=%d\r\n", ret);

View File

@ -2,20 +2,34 @@
#include "util.h" #include "util.h"
static int __threads_exit = 0; static int __threads_exit = 0;
static acl::string __cmd("del");
static acl::atomic_long __count;
class redis_command { class redis_command {
public: public:
redis_command(acl::redis_client_pipeline& pipeline, const char* key) redis_command(acl::redis_client_pipeline& pipeline)
: key_(key) : cmd_(&pipeline) // Must set pipeline before calling cmd_.get_pipeline_message()
, msg_(cmd_.get_pipeline_message()) , msg_(cmd_.get_pipeline_message())
{ {
cmd_.set_pipeline(&pipeline); cmd_.set_pipeline(&pipeline);
argc_ = 2; msg_.set_option(cmd_.get_dbuf(), 1, NULL);
argv_[0] = "del";
argv_[1] = key_; long long id = __count.fetch_add(1);
acl::string key;
key.format("key-%lld", id);
argv_[0] = __cmd.c_str();
argv_[1] = key;
lens_[0] = strlen(argv_[0]); lens_[0] = strlen(argv_[0]);
lens_[1] = strlen(argv_[1]); lens_[1] = strlen(argv_[1]);
if (__cmd == "set") {
argc_ = 3;
argv_[2] = "value";
lens_[2] = strlen(argv_[2]);
} else {
argc_ = 2;
}
// computer the hash slot for redis cluster node // computer the hash slot for redis cluster node
cmd_.hash_slot(argv_[1]); cmd_.hash_slot(argv_[1]);
@ -36,18 +50,18 @@ public:
} }
private: private:
acl::string key_;
acl::redis cmd_; acl::redis cmd_;
acl::redis_pipeline_message& msg_; acl::redis_pipeline_message& msg_;
size_t argc_; size_t argc_;
const char* argv_[2]; const char* argv_[4];
size_t lens_[2]; size_t lens_[4];
}; };
class test_thread : public acl::thread { class test_thread : public acl::thread {
public: public:
test_thread(acl::locker& locker, acl::redis_client_pipeline& pipeline, test_thread(acl::locker& locker,
acl::redis_client_pipeline& pipeline,
int once_count, int count) int once_count, int count)
: locker_(locker) : locker_(locker)
, pipeline_(pipeline) , pipeline_(pipeline)
@ -61,30 +75,31 @@ public:
protected: protected:
// @override // @override
void* run(void) { void* run(void) {
acl::string key; for (size_t i = 0; i < (size_t) count_; i++) {
run_once();
}
locker_.lock();
__threads_exit++;
locker_.unlock();
return NULL;
}
void run_once(void) {
// parepare for a lot of redis commands in one request // parepare for a lot of redis commands in one request
std::vector<redis_command*> commands; std::vector<redis_command*> commands;
for (size_t i = 0; i < (size_t) once_count_; i++) { for (size_t i = 0; i < (size_t) once_count_; i++) {
key.format("test-key-%d", (int) i); redis_command* command = new redis_command(pipeline_);
redis_command* command = new redis_command(pipeline_, key);
commands.push_back(command); commands.push_back(command);
} }
for (size_t i = 0; i < (size_t) count_; i++) { request(commands);
request(commands);
}
// free all requests commands // free all requests commands
for (std::vector<redis_command*>::iterator it = commands.begin(); for (std::vector<redis_command*>::iterator it = commands.begin();
it != commands.end(); ++it) { it != commands.end(); ++it) {
delete *it; delete *it;
} }
locker_.lock();
__threads_exit++;
locker_.unlock();
return NULL;
} }
private: private:
@ -136,7 +151,6 @@ int main(int argc, char* argv[]) {
int ch, count = 10, once_count = 10; int ch, count = 10, once_count = 10;
int max_threads = 10; int max_threads = 10;
acl::string addr("127.0.0.1:6379"), passwd; acl::string addr("127.0.0.1:6379"), passwd;
acl::string cmd("del");
while ((ch = getopt(argc, argv, "ha:s:N:n:t:p:")) > 0) { while ((ch = getopt(argc, argv, "ha:s:N:n:t:p:")) > 0) {
switch (ch) { switch (ch) {
@ -144,7 +158,7 @@ int main(int argc, char* argv[]) {
usage(argv[0]); usage(argv[0]);
return 0; return 0;
case 'a': case 'a':
cmd = optarg; __cmd = optarg;
break; break;
case 's': case 's':
addr = optarg; addr = optarg;
@ -214,7 +228,7 @@ int main(int argc, char* argv[]) {
long long int total = max_threads * once_count * count; long long int total = max_threads * once_count * count;
double inter = util::stamp_sub(&end, &begin); double inter = util::stamp_sub(&end, &begin);
printf("total %s: %lld, spent: %0.2f ms, speed: %0.2f\r\n", cmd.c_str(), printf("total %s: %lld, spent: %0.2f ms, speed: %0.2f\r\n", __cmd.c_str(),
total, inter, (total * 1000) /(inter > 0 ? inter : 1)); total, inter, (total * 1000) /(inter > 0 ? inter : 1));
pipeline.stop_thread(); pipeline.stop_thread();

View File

@ -12,7 +12,7 @@ static bool test_del(acl::redis_key& cmd, size_t tid, size_t fid, int i)
printf("del key: %s error: %s\r\n", printf("del key: %s error: %s\r\n",
key.c_str(), cmd.result_error()); key.c_str(), cmd.result_error());
return false; return false;
} else if (i < 10) { } else if (i < 1) {
printf("del ok, key: %s\r\n", key.c_str()); printf("del ok, key: %s\r\n", key.c_str());
} }
return true; return true;
@ -27,7 +27,7 @@ static bool test_expire(acl::redis_key& cmd, size_t tid, size_t fid, int i)
printf("expire key: %s error: %s\r\n", printf("expire key: %s error: %s\r\n",
key.c_str(), cmd.result_error()); key.c_str(), cmd.result_error());
return false; return false;
} else if (i < 10) { } else if (i < 1) {
printf("expire ok, key: %s\r\n", key.c_str()); printf("expire ok, key: %s\r\n", key.c_str());
} }
@ -44,7 +44,7 @@ static bool test_ttl(acl::redis_key& cmd, size_t tid, size_t fid, int i)
printf("get ttl key: %s error: %s\r\n", printf("get ttl key: %s error: %s\r\n",
key.c_str(), cmd.result_error()); key.c_str(), cmd.result_error());
return false; return false;
} else if (i < 10) { } else if (i < 1) {
printf("ttl ok, key: %s, ttl: %d\r\n", key.c_str(), ttl); printf("ttl ok, key: %s, ttl: %d\r\n", key.c_str(), ttl);
} }
return true; return true;
@ -56,11 +56,11 @@ static bool test_exists(acl::redis_key& cmd, size_t tid, size_t fid, int i)
key.format("%s_%zd_%zd_%d", __keypre.c_str(), tid, fid, i); key.format("%s_%zd_%zd_%d", __keypre.c_str(), tid, fid, i);
if (!cmd.exists(key.c_str())) { if (!cmd.exists(key.c_str())) {
if (i < 10) { if (i < 1) {
printf("no exists key: %s\r\n", key.c_str()); printf("no exists key: %s\r\n", key.c_str());
} }
} else { } else {
if (i < 10) { if (i < 1) {
printf("exists key: %s\r\n", key.c_str()); printf("exists key: %s\r\n", key.c_str());
} }
} }
@ -76,7 +76,7 @@ static bool test_type(acl::redis_key& cmd, size_t tid, size_t fid, int i)
if (ret == acl::REDIS_KEY_NONE) { if (ret == acl::REDIS_KEY_NONE) {
printf("unknown type key: %s\r\n", key.c_str()); printf("unknown type key: %s\r\n", key.c_str());
return false; return false;
} else if (i < 10) { } else if (i < 1) {
printf("type ok, key: %s, ret: %d\r\n", key.c_str(), ret); printf("type ok, key: %s, ret: %d\r\n", key.c_str(), ret);
} }
return true; return true;
@ -92,7 +92,7 @@ static bool test_set(acl::redis_string& cmd, size_t tid, size_t fid, int i)
bool ret = cmd.set(key.c_str(), value.c_str()); bool ret = cmd.set(key.c_str(), value.c_str());
return ret; return ret;
if (i < 10) { if (i < 1) {
printf("set key: %s, value: %s %s\r\n", key.c_str(), printf("set key: %s, value: %s %s\r\n", key.c_str(),
value.c_str(), ret ? "ok" : "error"); value.c_str(), ret ? "ok" : "error");
} }
@ -107,7 +107,7 @@ static bool test_get(acl::redis_string& cmd, size_t tid, size_t fid, int i)
acl::string value; acl::string value;
bool ret = cmd.get(key.c_str(), value); bool ret = cmd.get(key.c_str(), value);
if (i < 10) { if (i < 1) {
printf("get key: %s, value: %s %s, len: %d\r\n", printf("get key: %s, value: %s %s, len: %d\r\n",
key.c_str(), value.c_str(), ret ? "ok" : "error", key.c_str(), value.c_str(), ret ? "ok" : "error",
(int) value.length()); (int) value.length());
@ -152,7 +152,7 @@ static bool test_hmget(acl::redis_hash& cmd, size_t tid, size_t fid, int i)
return false; return false;
} }
if (i < 10) { if (i < 1) {
printf("key=%s:", key.c_str()); printf("key=%s:", key.c_str());
assert(names.size() == values.size()); assert(names.size() == values.size());
size_t n = names.size(); size_t n = names.size();