test redis_pipeline

This commit is contained in:
shuxin   zheng 2021-01-05 19:35:52 +08:00
parent 96cab01c5a
commit 73a4620896

View File

@ -45,6 +45,7 @@ bool redis_pipeline_channel::start_thread(void)
void redis_pipeline_channel::push(redis_pipeline_message* msg)
{
msgs_.push_back(msg);
#ifdef USE_MBOX
box_.push(msg);
#else
@ -54,10 +55,15 @@ void redis_pipeline_channel::push(redis_pipeline_message* msg)
void redis_pipeline_channel::flush(void)
{
#ifdef USE_MBOX
box_.push(&message_flush_);
#if 1
flush_all();
msgs_.clear();
#else
# ifdef USE_MBOX
box_.push(&message_flush_);
# else
box_.push(&message_flush_, false);
# endif
#endif
}
@ -155,12 +161,21 @@ void redis_pipeline_channel::wait_one(socket_stream& conn,
void* redis_pipeline_channel::run(void)
{
socket_stream* conn = client_->get_stream();
if (conn == NULL) {
printf("conn null\n");
return NULL;
}
while (!client_->eof()) {
redis_pipeline_message* msg = box_.pop();
if (msg == NULL) {
break;
}
#if 1
wait_one(*conn, *msg);
#else
switch (msg->get_type()) {
case redis_pipeline_t_cmd:
msgs_.push_back(msg);
@ -169,14 +184,12 @@ void* redis_pipeline_channel::run(void)
flush_all();
wait_all();
break;
case redis_pipeline_t_stop:
goto END;
default:
break;
}
#endif
}
END:
return NULL;
}