From 66942cf5ce55d7e96dbe01f1feadffa5ef127cd7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?shuxin=20=E3=80=80=E3=80=80zheng?= Date: Wed, 6 Jan 2021 14:35:12 +0800 Subject: [PATCH] optimize and test redis_pipeline --- lib_acl/src/stdlib/acl_mbox.c | 4 +- .../acl_cpp/redis/redis_client_pipeline.hpp | 11 ++- lib_acl_cpp/include/acl_cpp/stdlib/mbox.hpp | 4 +- .../src/redis/redis_client_pipeline.cpp | 72 ++++++------------- 4 files changed, 30 insertions(+), 61 deletions(-) diff --git a/lib_acl/src/stdlib/acl_mbox.c b/lib_acl/src/stdlib/acl_mbox.c index 57bc440fe..f22657a63 100644 --- a/lib_acl/src/stdlib/acl_mbox.c +++ b/lib_acl/src/stdlib/acl_mbox.c @@ -137,9 +137,9 @@ void *acl_mbox_read(ACL_MBOX *mbox, int timeout, int *success) mbox->nread++; #ifdef ACL_UNIX - if (timeout >= 0 && acl_read_poll_wait(mbox->in, timeout) < 0) + if (timeout > 0 && acl_read_poll_wait(mbox->in, timeout) < 0) #else - if (timeout >= 0 && acl_read_select_wait(mbox->in, timeout) < 0) + if (timeout > 0 && acl_read_select_wait(mbox->in, timeout) < 0) #endif { if (acl_last_error() == ACL_ETIMEDOUT) { diff --git a/lib_acl_cpp/include/acl_cpp/redis/redis_client_pipeline.hpp b/lib_acl_cpp/include/acl_cpp/redis/redis_client_pipeline.hpp index 1e05ab6e4..8fe84160c 100644 --- a/lib_acl_cpp/include/acl_cpp/redis/redis_client_pipeline.hpp +++ b/lib_acl_cpp/include/acl_cpp/redis/redis_client_pipeline.hpp @@ -23,7 +23,6 @@ class redis_client; typedef enum { redis_pipeline_t_cmd, - redis_pipeline_t_flush, redis_pipeline_t_stop, } redis_pipeline_type_t; @@ -125,8 +124,6 @@ public: const char* addr, int conn_timeout, int rw_timeout, bool retry); ~redis_pipeline_channel(void); - void push(redis_pipeline_message* msg); - bool start_thread(void); public: @@ -141,15 +138,14 @@ protected: private: redis_client_pipeline& pipeline_; - redis_pipeline_message message_flush_; string addr_; string buf_; redis_client* client_; BOX box_; std::vector msgs_; public: + void push(redis_pipeline_message* msg); void flush(void); - bool has_messages_; private: bool flush_all(void); @@ -162,10 +158,11 @@ public: redis_client_pipeline(const char* addr); ~redis_client_pipeline(void); - void push(redis_pipeline_message* msg); - const redis_result* run(redis_pipeline_message& msg); +public: + void push(redis_pipeline_message* msg); + public: redis_client_pipeline& set_password(const char* passwd); redis_client_pipeline& set_timeout(int conn_timeout, int rw_timeout); diff --git a/lib_acl_cpp/include/acl_cpp/stdlib/mbox.hpp b/lib_acl_cpp/include/acl_cpp/stdlib/mbox.hpp index cf45c8d54..390a2d31a 100644 --- a/lib_acl_cpp/include/acl_cpp/stdlib/mbox.hpp +++ b/lib_acl_cpp/include/acl_cpp/stdlib/mbox.hpp @@ -72,10 +72,12 @@ public: /** * 发送消息对象 * @param t {T*} 非空消息对象 + * @param dummy {bool} 目前无任何用处,仅是为了与 tbox 接口一致 * @return {bool} 发送是否成功 */ - bool push(T* t) + bool push(T* t, bool dummy = false) { + (void) dummy; return mbox_send(mbox_, t); } diff --git a/lib_acl_cpp/src/redis/redis_client_pipeline.cpp b/lib_acl_cpp/src/redis/redis_client_pipeline.cpp index 65f92643d..4178ea855 100644 --- a/lib_acl_cpp/src/redis/redis_client_pipeline.cpp +++ b/lib_acl_cpp/src/redis/redis_client_pipeline.cpp @@ -14,10 +14,8 @@ namespace acl { redis_pipeline_channel::redis_pipeline_channel(redis_client_pipeline& pipeline, const char* addr, int conn_timeout, int rw_timeout, bool retry) : pipeline_(pipeline) -, message_flush_(NULL, redis_pipeline_t_flush) , addr_(addr) , buf_(81920) -, has_messages_(false) { client_ = NEW redis_client(addr, conn_timeout, rw_timeout, retry); } @@ -49,36 +47,17 @@ bool redis_pipeline_channel::start_thread(void) void redis_pipeline_channel::push(redis_pipeline_message* msg) { -#ifdef FLUSH_ALONE - if (!has_messages_) { - has_messages_ = true; - } -#else +#ifndef FLUSH_ALONE msgs_.push_back(msg); #endif -#ifdef USE_MBOX - box_.push(msg); -#else box_.push(msg, false); -#endif } void redis_pipeline_channel::flush(void) { -#ifdef FLUSH_ALONE - if (has_messages_) { -# ifdef USE_MBOX - box_.push(&message_flush_); -# else - box_.push(&message_flush_, false); -# endif - has_messages_ = false; - } -#else flush_all(); msgs_.clear(); -#endif } bool redis_pipeline_channel::flush_all(void) @@ -113,21 +92,6 @@ bool redis_pipeline_channel::flush_all(void) return true; } -void redis_pipeline_channel::wait_all(void) -{ - socket_stream* conn = client_->get_stream(); - if (conn == NULL) { - printf("get_stream null\r\n"); - return; - } - - for (std::vector::iterator it = msgs_.begin(); - it != msgs_.end(); ++it) { - wait_one(*conn, **it); - } - msgs_.clear(); -} - void redis_pipeline_channel::wait_one(socket_stream& conn, redis_pipeline_message& msg) { @@ -172,6 +136,21 @@ void redis_pipeline_channel::wait_one(socket_stream& conn, } } +void redis_pipeline_channel::wait_all(void) +{ + socket_stream* conn = client_->get_stream(); + if (conn == NULL) { + printf("get_stream null\r\n"); + return; + } + + for (std::vector::iterator it = msgs_.begin(); + it != msgs_.end(); ++it) { + wait_one(*conn, **it); + } + msgs_.clear(); +} + void* redis_pipeline_channel::run(void) { socket_stream* conn = client_->get_stream(); @@ -190,8 +169,10 @@ void* redis_pipeline_channel::run(void) break; } timeout = -1; +#ifdef FLUSH_ALONE flush_all(); wait_all(); +#endif continue; } @@ -202,10 +183,6 @@ void* redis_pipeline_channel::run(void) case redis_pipeline_t_cmd: msgs_.push_back(msg); break; - case redis_pipeline_t_flush: - flush_all(); - wait_all(); - break; default: break; } @@ -277,22 +254,13 @@ redis_client_pipeline & redis_client_pipeline::set_max_slot(size_t max_slot) const redis_result* redis_client_pipeline::run(redis_pipeline_message& msg) { -#ifdef USE_MBOX - box_.push(&msg); -#else box_.push(&msg, false); -#endif - return msg.wait(); } void redis_client_pipeline::push(redis_pipeline_message *msg) { -#ifdef USE_MBOX - box_.push(msg); -#else box_.push(msg, false); -#endif } void* redis_client_pipeline::run(void) @@ -340,7 +308,9 @@ void* redis_client_pipeline::run(void) break; } else { timeout = -1; - //flush_all(); +#ifndef FLUSH_ALONE + flush_all(); +#endif } }