optimize and test redis_pipeline

This commit is contained in:
shuxin   zheng 2021-01-06 14:35:12 +08:00
parent d811990ade
commit 66942cf5ce
4 changed files with 30 additions and 61 deletions

View File

@ -137,9 +137,9 @@ void *acl_mbox_read(ACL_MBOX *mbox, int timeout, int *success)
mbox->nread++;
#ifdef ACL_UNIX
if (timeout >= 0 && acl_read_poll_wait(mbox->in, timeout) < 0)
if (timeout > 0 && acl_read_poll_wait(mbox->in, timeout) < 0)
#else
if (timeout >= 0 && acl_read_select_wait(mbox->in, timeout) < 0)
if (timeout > 0 && acl_read_select_wait(mbox->in, timeout) < 0)
#endif
{
if (acl_last_error() == ACL_ETIMEDOUT) {

View File

@ -23,7 +23,6 @@ class redis_client;
typedef enum {
redis_pipeline_t_cmd,
redis_pipeline_t_flush,
redis_pipeline_t_stop,
} redis_pipeline_type_t;
@ -125,8 +124,6 @@ public:
const char* addr, int conn_timeout, int rw_timeout, bool retry);
~redis_pipeline_channel(void);
void push(redis_pipeline_message* msg);
bool start_thread(void);
public:
@ -141,15 +138,14 @@ protected:
private:
redis_client_pipeline& pipeline_;
redis_pipeline_message message_flush_;
string addr_;
string buf_;
redis_client* client_;
BOX<redis_pipeline_message> box_;
std::vector<redis_pipeline_message*> msgs_;
public:
void push(redis_pipeline_message* msg);
void flush(void);
bool has_messages_;
private:
bool flush_all(void);
@ -162,10 +158,11 @@ public:
redis_client_pipeline(const char* addr);
~redis_client_pipeline(void);
void push(redis_pipeline_message* msg);
const redis_result* run(redis_pipeline_message& msg);
public:
void push(redis_pipeline_message* msg);
public:
redis_client_pipeline& set_password(const char* passwd);
redis_client_pipeline& set_timeout(int conn_timeout, int rw_timeout);

View File

@ -72,10 +72,12 @@ public:
/**
*
* @param t {T*}
* @param dummy {bool} tbox
* @return {bool}
*/
bool push(T* t)
bool push(T* t, bool dummy = false)
{
(void) dummy;
return mbox_send(mbox_, t);
}

View File

@ -14,10 +14,8 @@ namespace acl {
redis_pipeline_channel::redis_pipeline_channel(redis_client_pipeline& pipeline,
const char* addr, int conn_timeout, int rw_timeout, bool retry)
: pipeline_(pipeline)
, message_flush_(NULL, redis_pipeline_t_flush)
, addr_(addr)
, buf_(81920)
, has_messages_(false)
{
client_ = NEW redis_client(addr, conn_timeout, rw_timeout, retry);
}
@ -49,36 +47,17 @@ bool redis_pipeline_channel::start_thread(void)
void redis_pipeline_channel::push(redis_pipeline_message* msg)
{
#ifdef FLUSH_ALONE
if (!has_messages_) {
has_messages_ = true;
}
#else
#ifndef FLUSH_ALONE
msgs_.push_back(msg);
#endif
#ifdef USE_MBOX
box_.push(msg);
#else
box_.push(msg, false);
#endif
}
void redis_pipeline_channel::flush(void)
{
#ifdef FLUSH_ALONE
if (has_messages_) {
# ifdef USE_MBOX
box_.push(&message_flush_);
# else
box_.push(&message_flush_, false);
# endif
has_messages_ = false;
}
#else
flush_all();
msgs_.clear();
#endif
}
bool redis_pipeline_channel::flush_all(void)
@ -113,21 +92,6 @@ bool redis_pipeline_channel::flush_all(void)
return true;
}
void redis_pipeline_channel::wait_all(void)
{
socket_stream* conn = client_->get_stream();
if (conn == NULL) {
printf("get_stream null\r\n");
return;
}
for (std::vector<redis_pipeline_message*>::iterator it = msgs_.begin();
it != msgs_.end(); ++it) {
wait_one(*conn, **it);
}
msgs_.clear();
}
void redis_pipeline_channel::wait_one(socket_stream& conn,
redis_pipeline_message& msg)
{
@ -172,6 +136,21 @@ void redis_pipeline_channel::wait_one(socket_stream& conn,
}
}
void redis_pipeline_channel::wait_all(void)
{
socket_stream* conn = client_->get_stream();
if (conn == NULL) {
printf("get_stream null\r\n");
return;
}
for (std::vector<redis_pipeline_message*>::iterator it = msgs_.begin();
it != msgs_.end(); ++it) {
wait_one(*conn, **it);
}
msgs_.clear();
}
void* redis_pipeline_channel::run(void)
{
socket_stream* conn = client_->get_stream();
@ -190,8 +169,10 @@ void* redis_pipeline_channel::run(void)
break;
}
timeout = -1;
#ifdef FLUSH_ALONE
flush_all();
wait_all();
#endif
continue;
}
@ -202,10 +183,6 @@ void* redis_pipeline_channel::run(void)
case redis_pipeline_t_cmd:
msgs_.push_back(msg);
break;
case redis_pipeline_t_flush:
flush_all();
wait_all();
break;
default:
break;
}
@ -277,22 +254,13 @@ redis_client_pipeline & redis_client_pipeline::set_max_slot(size_t max_slot)
const redis_result* redis_client_pipeline::run(redis_pipeline_message& msg)
{
#ifdef USE_MBOX
box_.push(&msg);
#else
box_.push(&msg, false);
#endif
return msg.wait();
}
void redis_client_pipeline::push(redis_pipeline_message *msg)
{
#ifdef USE_MBOX
box_.push(msg);
#else
box_.push(msg, false);
#endif
}
void* redis_client_pipeline::run(void)
@ -340,7 +308,9 @@ void* redis_client_pipeline::run(void)
break;
} else {
timeout = -1;
//flush_all();
#ifndef FLUSH_ALONE
flush_all();
#endif
}
}