optimize redis pipeline mode

This commit is contained in:
zhengshuxin 2021-01-31 15:43:29 +08:00
parent b63287774d
commit f5bc676fbd
4 changed files with 70 additions and 26 deletions

View File

@ -34,21 +34,32 @@ typedef enum {
*/
class redis_pipeline_message {
public:
redis_pipeline_message(redis_command* cmd, redis_pipeline_type_t type)
redis_pipeline_message(redis_command* cmd, redis_pipeline_type_t type,
bool use_mbox = true)
: cmd_(cmd)
, type_(type)
, nchild_(0)
, timeout_(NULL)
, box_(false, false)
, result_(NULL)
, addr_(NULL)
, redirect_count_(0)
, argc_(0)
, argv_(NULL)
, lens_(NULL)
{}
{
if (use_mbox) {
mbox_ = new mbox<redis_pipeline_message>(false, false);
tbox_ = NULL;
} else {
tbox_ = new tbox<redis_pipeline_message>(false);
mbox_ = NULL;
}
}
~redis_pipeline_message(void) {}
~redis_pipeline_message(void) {
delete mbox_;
delete tbox_;
}
redis_pipeline_message& set_type(redis_pipeline_type_t type) {
type_ = type;
@ -94,11 +105,19 @@ public:
void push(const redis_result* result) {
result_ = result;
box_.push(this);
if (mbox_) {
mbox_->push(this, false);
} else {
tbox_->push(this, false);
}
}
const redis_result* wait(void) {
box_.pop();
if (mbox_) {
mbox_->pop();
} else {
tbox_->pop();
}
return result_;
}
@ -115,7 +134,8 @@ private:
redis_pipeline_type_t type_;
size_t nchild_;
int* timeout_;
mbox<redis_pipeline_message> box_;
mbox<redis_pipeline_message>* mbox_;
tbox<redis_pipeline_message>* tbox_;
const redis_result* result_;
const char* addr_;

View File

@ -145,7 +145,20 @@ public:
return cluster_;
}
void set_pipeline(redis_client_pipeline* pipeline);
/**
* pipeline通信对象使pipeline模式
* set the redis communication in pipeline mode
* @param pipline {redis_client_pipeline*} pipline communication object
* @param use_mbox {bool} if using acl::mbox to get the result,
* acl::tbox will be used if use_mbox is set to false; acl::mbox
* will be used as default internal.
*/
void set_pipeline(redis_client_pipeline* pipeline, bool use_mbox = true);
/**
* get the redis pipline communication object been set before
* @return {redis_client_pipeline*} return NULL if not set
*/
redis_client_pipeline* get_pipeline(void) const
{
return pipeline_;
@ -467,6 +480,7 @@ protected:
redis_client* conn_;
redis_client_cluster* cluster_;
redis_client_pipeline* pipeline_;
bool pipe_use_mbox_;
int slot_;
int redirect_max_;
int redirect_sleep_;

View File

@ -3,7 +3,7 @@
static acl::string __keypre("test_key_cluster");
static bool test_del(acl::redis_key& cmd, int i)
static bool test_del(acl::redis& cmd, int i)
{
cmd.clear();
@ -21,7 +21,7 @@ static bool test_del(acl::redis_key& cmd, int i)
return true;
}
static bool test_expire(acl::redis_key& cmd, int i)
static bool test_expire(acl::redis& cmd, int i)
{
cmd.clear();
@ -39,7 +39,7 @@ static bool test_expire(acl::redis_key& cmd, int i)
return true;
}
static bool test_ttl(acl::redis_key& cmd, int i)
static bool test_ttl(acl::redis& cmd, int i)
{
cmd.clear();
@ -57,7 +57,7 @@ static bool test_ttl(acl::redis_key& cmd, int i)
return true;
}
static bool test_exists(acl::redis_key& cmd, int i)
static bool test_exists(acl::redis& cmd, int i)
{
cmd.clear();
@ -76,7 +76,7 @@ static bool test_exists(acl::redis_key& cmd, int i)
return true;
}
static bool test_type(acl::redis_key& cmd, int i)
static bool test_type(acl::redis& cmd, int i)
{
cmd.clear();
@ -93,7 +93,7 @@ static bool test_type(acl::redis_key& cmd, int i)
return true;
}
static bool test_set(acl::redis_string& cmd, int i)
static bool test_set(acl::redis& cmd, int i)
{
cmd.clear();
@ -112,7 +112,7 @@ static bool test_set(acl::redis_string& cmd, int i)
return ret;
}
static bool test_get(acl::redis_string& cmd, int i)
static bool test_get(acl::redis& cmd, int i)
{
cmd.clear();
@ -136,9 +136,10 @@ class test_thread : public acl::thread
{
public:
test_thread(acl::locker& locker, acl::redis_client_pipeline& conns,
const char* cmd, int n)
bool pipeline_use_mbox, const char* cmd, int n)
: locker_(locker)
, conns_(conns)
, pipeline_use_mbox_(pipeline_use_mbox)
, cmd_(cmd)
, n_(n)
{}
@ -150,11 +151,11 @@ protected:
void* run(void)
{
bool ret;
acl::redis_key cmd_key;
cmd_key.set_pipeline(&conns_);
acl::redis cmd_key;
cmd_key.set_pipeline(&conns_, pipeline_use_mbox_);
acl::redis_string cmd_string;
cmd_string.set_pipeline(&conns_);
acl::redis cmd_string;
cmd_string.set_pipeline(&conns_, pipeline_use_mbox_);
for (int i = 0; i < n_; i++) {
if (cmd_ == "set") {
@ -216,8 +217,9 @@ protected:
private:
acl::locker& locker_;
acl::redis_client_pipeline& conns_;
bool pipeline_use_mbox_;
acl::string cmd_;
int n_;
int n_;
};
static void usage(const char* procname)
@ -228,6 +230,7 @@ static void usage(const char* procname)
"-t max_threads[default: 10]\r\n"
"-r retry_for_cluster_resnum[default: 10]\r\n"
"-p password [set the password of redis cluster]\r\n"
"-m [if use mbox in pipeline mode, default: false]\r\n"
"-a cmd[set|get|expire|ttl|exists|type|del]\r\n",
procname);
}
@ -236,9 +239,10 @@ int main(int argc, char* argv[])
{
int ch, n = 1;
int max_threads = 10;
bool pipeline_use_mbox = false;
acl::string addr("127.0.0.1:6379"), cmd("del"), passwd;
while ((ch = getopt(argc, argv, "hs:n:t:a:p:")) > 0) {
while ((ch = getopt(argc, argv, "hs:n:t:a:p:m")) > 0) {
switch (ch) {
case 'h':
usage(argv[0]);
@ -257,7 +261,10 @@ int main(int argc, char* argv[])
break;
case 'p':
passwd = optarg;
break;;
break;
case 'm':
pipeline_use_mbox = true;
break;
default:
break;
}
@ -280,7 +287,7 @@ int main(int argc, char* argv[])
std::vector<test_thread*> threads;
for (int i = 0; i < max_threads; i++) {
test_thread* thread = new test_thread(locker, pipeline,
cmd.c_str(), n);
pipeline_use_mbox, cmd.c_str(), n);
threads.push_back(thread);
thread->set_detachable(true);
thread->start();

View File

@ -38,6 +38,7 @@ void redis_command::init(void)
slice_res_ = false;
result_ = NULL;
pipe_msg_ = NULL;
pipe_use_mbox_ = true;
addr_[0] = 0;
dbuf_ = new dbuf_pool();
}
@ -184,9 +185,11 @@ void redis_command::set_cluster(redis_client_cluster* cluster)
redirect_sleep_ = cluster->get_redirect_sleep();
}
void redis_command::set_pipeline(redis_client_pipeline* pipeline)
void redis_command::set_pipeline(redis_client_pipeline* pipeline,
bool use_mbox /* true */)
{
pipeline_ = pipeline;
pipe_use_mbox_ = use_mbox;
}
// 分析重定向信息,获得重定向的服务器地址
@ -219,7 +222,7 @@ redis_pipeline_message& redis_command::get_pipeline_message(void)
{
if (pipe_msg_ == NULL) {
pipe_msg_ = NEW redis_pipeline_message(
this, redis_pipeline_t_cmd);
this, redis_pipeline_t_cmd, pipe_use_mbox_);
}
return *pipe_msg_;
}