test redis_pipeline

This commit is contained in:
shuxin   zheng 2021-01-06 18:17:20 +08:00
parent 66942cf5ce
commit 333d814497
2 changed files with 39 additions and 53 deletions

View File

@ -145,12 +145,12 @@ private:
std::vector<redis_pipeline_message*> msgs_;
public:
void push(redis_pipeline_message* msg);
void flush(void);
private:
bool flush_all(void);
void wait_all(void);
void wait_one(socket_stream& conn, redis_pipeline_message& msg);
bool wait_all(void);
bool wait_one(socket_stream& conn, redis_pipeline_message& msg);
void all_failed(void);
};
class ACL_CPP_API redis_client_pipeline : public thread {
@ -192,7 +192,6 @@ private:
std::vector<char*> addrs_;
const char** slot_addrs_;
void flush_all(void);
void set_slot(size_t slot, const char* addr);
void set_all_slot(void);
void start_channels(void);

View File

@ -43,23 +43,11 @@ bool redis_pipeline_channel::start_thread(void)
return true;
}
#define FLUSH_ALONE
void redis_pipeline_channel::push(redis_pipeline_message* msg)
{
#ifndef FLUSH_ALONE
msgs_.push_back(msg);
#endif
box_.push(msg, false);
}
void redis_pipeline_channel::flush(void)
{
flush_all();
msgs_.clear();
}
bool redis_pipeline_channel::flush_all(void)
{
if (msgs_.empty()) {
@ -92,7 +80,7 @@ bool redis_pipeline_channel::flush_all(void)
return true;
}
void redis_pipeline_channel::wait_one(socket_stream& conn,
bool redis_pipeline_channel::wait_one(socket_stream& conn,
redis_pipeline_message& msg)
{
dbuf_pool* dbuf = msg.get_cmd()->get_dbuf();
@ -108,10 +96,14 @@ void redis_pipeline_channel::wait_one(socket_stream& conn,
} else {
result = client_->get_object(conn, dbuf);
}
if (result == NULL) {
return false;
}
int type = result->get_type();
if (type == REDIS_RESULT_UNKOWN || type != REDIS_RESULT_ERROR) {
msg.push(result);
return;
return true;
}
#define EQ(x, y) !strncasecmp((x), (y), sizeof(y) -1)
@ -134,31 +126,44 @@ void redis_pipeline_channel::wait_one(socket_stream& conn,
} else {
msg.push(result);
}
return true;
}
void redis_pipeline_channel::wait_all(void)
void redis_pipeline_channel::all_failed()
{
socket_stream* conn = client_->get_stream();
if (conn == NULL) {
printf("get_stream null\r\n");
return;
}
for (std::vector<redis_pipeline_message*>::iterator it = msgs_.begin();
it != msgs_.end(); ++it) {
wait_one(*conn, **it);
std::vector<redis_pipeline_message*>::iterator it;
for (it = msgs_.begin(); it != msgs_.end(); ++it) {
(*it)->push(NULL);
}
msgs_.clear();
}
void* redis_pipeline_channel::run(void)
bool redis_pipeline_channel::wait_all(void)
{
socket_stream* conn = client_->get_stream();
if (conn == NULL) {
printf("conn null\n");
return NULL;
logger_error("get_stream null");
all_failed();
return false;
}
std::vector<redis_pipeline_message*>::iterator it;
for (it = msgs_.begin(); it != msgs_.end(); ++it) {
if (!wait_one(*conn, **it)) {
break;
}
}
for (; it != msgs_.end(); ++it) {
(*it)->push(NULL);
}
msgs_.clear();
return true;
}
void* redis_pipeline_channel::run(void)
{
bool success;
int timeout = -1;
@ -169,16 +174,15 @@ void* redis_pipeline_channel::run(void)
break;
}
timeout = -1;
#ifdef FLUSH_ALONE
flush_all();
wait_all();
#endif
if (!flush_all() || !wait_all()) {
logger_error("failed ...");
break;
}
continue;
}
timeout = 0;
#ifdef FLUSH_ALONE
switch (msg->get_type()) {
case redis_pipeline_t_cmd:
msgs_.push_back(msg);
@ -186,9 +190,6 @@ void* redis_pipeline_channel::run(void)
default:
break;
}
#else
wait_one(*conn, *msg);
#endif
}
return NULL;
@ -308,9 +309,6 @@ void* redis_client_pipeline::run(void)
break;
} else {
timeout = -1;
#ifndef FLUSH_ALONE
flush_all();
#endif
}
}
@ -318,17 +316,6 @@ void* redis_client_pipeline::run(void)
return NULL;
}
void redis_client_pipeline::flush_all(void)
{
const token_node* iter = channels_->first_node();
while (iter) {
redis_pipeline_channel* channel = (redis_pipeline_channel*)
iter->get_ctx();
channel->flush();
iter = channels_->next_node();
}
}
void redis_client_pipeline::set_slot(size_t slot, const char* addr)
{
if (slot >= max_slot_ || addr == NULL || *addr == 0) {