optimize redis_pipeline

This commit is contained in:
zhengshuxin 2021-01-02 20:48:36 +08:00
parent 91a9d26c69
commit 6554585b29
3 changed files with 22 additions and 4 deletions

View File

@ -107,6 +107,7 @@ public:
const redis_result* run(redis_pipeline_message& msg); const redis_result* run(redis_pipeline_message& msg);
public: public:
redis_client_pipeline& set_password(const char* passwd);
redis_client_pipeline& set_timeout(int conn_timeout, int rw_timeout); redis_client_pipeline& set_timeout(int conn_timeout, int rw_timeout);
redis_client_pipeline& set_retry(bool on); redis_client_pipeline& set_retry(bool on);
redis_client_pipeline& set_channels(size_t n); redis_client_pipeline& set_channels(size_t n);

View File

@ -33,13 +33,13 @@ redis_pipeline_channel & redis_pipeline_channel::set_passwd(const char *passwd)
bool redis_pipeline_channel::start_thread(void) bool redis_pipeline_channel::start_thread(void)
{ {
if (!passwd_.empty()) {
conn_->set_password(passwd_);
}
if (!((connect_client*) conn_)->open()) { if (!((connect_client*) conn_)->open()) {
logger_error("open %s error %s", addr_.c_str(), last_serror()); logger_error("open %s error %s", addr_.c_str(), last_serror());
return false; return false;
} }
if (!passwd_.empty()) {
conn_->set_password(passwd_);
}
this->start(); this->start();
return true; return true;
} }
@ -173,6 +173,14 @@ redis_client_pipeline& redis_client_pipeline::set_channels(size_t n)
return *this; return *this;
} }
redis_client_pipeline& redis_client_pipeline::set_password(const char* passwd)
{
if (passwd && *passwd) {
passwd_ = passwd;
}
return *this;
}
redis_client_pipeline & redis_client_pipeline::set_max_slot(size_t max_slot) { redis_client_pipeline & redis_client_pipeline::set_max_slot(size_t max_slot) {
max_slot_ = max_slot; max_slot_ = max_slot;
return *this; return *this;
@ -324,6 +332,9 @@ void redis_client_pipeline::start_channels(void) {
cit != addrs_.end(); ++cit) { cit != addrs_.end(); ++cit) {
redis_pipeline_channel* channel = NEW redis_pipeline_channel( redis_pipeline_channel* channel = NEW redis_pipeline_channel(
*cit, conn_timeout_, rw_timeout_, retry_); *cit, conn_timeout_, rw_timeout_, retry_);
if (!passwd_.empty()) {
channel->set_passwd(passwd_);
}
if (channel->start_thread()) { if (channel->start_thread()) {
channels_->insert(*cit, channel); channels_->insert(*cit, channel);
} else { } else {
@ -337,6 +348,9 @@ void redis_client_pipeline::start_channels(void) {
redis_pipeline_channel* channel = NEW redis_pipeline_channel( redis_pipeline_channel* channel = NEW redis_pipeline_channel(
addr_, conn_timeout_, rw_timeout_, retry_); addr_, conn_timeout_, rw_timeout_, retry_);
if (!passwd_.empty()) {
channel->set_passwd(passwd_);
}
if (channel->start_thread()) { if (channel->start_thread()) {
channels_->insert(addr_, channel); channels_->insert(addr_, channel);
} else { } else {

View File

@ -191,7 +191,7 @@ protected:
break; break;
} }
if (i > 0 && i % 1000 == 0) { if (i > 0 && i % 5000 == 0) {
char tmp[128]; char tmp[128];
acl::safe_snprintf(tmp, sizeof(tmp), "%d", i); acl::safe_snprintf(tmp, sizeof(tmp), "%d", i);
acl::meter_time(__FILE__, __LINE__, tmp); acl::meter_time(__FILE__, __LINE__, tmp);
@ -319,6 +319,9 @@ int main(int argc, char* argv[])
acl::redis_client_pipeline pipeline(addrs); acl::redis_client_pipeline pipeline(addrs);
pipeline.set_timeout(conn_timeout, rw_timeout) pipeline.set_timeout(conn_timeout, rw_timeout)
.set_channels((size_t) pipe_channels); .set_channels((size_t) pipe_channels);
if (!passwd.empty()) {
pipeline.set_password(passwd);
}
pipeline.start(); pipeline.start();
struct timeval begin; struct timeval begin;