diff --git a/lib_acl_cpp/include/acl_cpp/connpool/connect_pool.hpp b/lib_acl_cpp/include/acl_cpp/connpool/connect_pool.hpp index 83f2b8147..be1d17739 100644 --- a/lib_acl_cpp/include/acl_cpp/connpool/connect_pool.hpp +++ b/lib_acl_cpp/include/acl_cpp/connpool/connect_pool.hpp @@ -209,7 +209,10 @@ public: return key_; } + // 增加本对象引用计数 void refer(); + + // 减少本对象引用计数 void unrefer(); protected: @@ -228,6 +231,7 @@ protected: protected: bool alive_; // 是否属正常 + ssize_t refers_; // 当前连接池对象的引用计数 bool delay_destroy_; // 是否设置了延迟自销毁 // 有问题的服务器的可以重试的时间间隔,不可用连接池对象再次被启用的时间间隔 int retry_inter_; @@ -252,14 +256,17 @@ protected: time_t last_; // 上次记录的时间截 std::list pool_; // 连接池集合 - size_t check_conns(size_t count); - size_t check_conns(size_t count, thread_pool& threads); + size_t check_dead(size_t count); + size_t check_dead(size_t count, thread_pool& threads); void keep_conns(size_t min); void keep_conns(size_t min, thread_pool& threads); size_t kick_idle_conns(time_t ttl); // 关闭过期的连接 connect_client* peek_back(); // 从尾部 Peek 连接 void put_front(connect_client* conn); // 向头部 Put 连接 + + void count_inc(bool exclusive); // 增加连接数及对象引用计数 + void count_dec(bool exclusive); // 减少连接数及对象引用计数 }; class ACL_CPP_API connect_guard : public noncopyable { diff --git a/lib_acl_cpp/samples/connect_manager/connect_pool.cpp b/lib_acl_cpp/samples/connect_manager/connect_pool.cpp index 96aeaaf21..725f9312b 100644 --- a/lib_acl_cpp/samples/connect_manager/connect_pool.cpp +++ b/lib_acl_cpp/samples/connect_manager/connect_pool.cpp @@ -8,6 +8,7 @@ connect_pool::connect_pool(const char* addr, size_t count, size_t idx) , conn_timeout_(30) , rw_timeout_(30) { + this->set_conns_min(count); } connect_pool::~connect_pool() diff --git a/lib_acl_cpp/src/connpool/connect_manager.cpp b/lib_acl_cpp/src/connpool/connect_manager.cpp index 4eb8486b9..20dc4aead 100644 --- a/lib_acl_cpp/src/connpool/connect_manager.cpp +++ b/lib_acl_cpp/src/connpool/connect_manager.cpp @@ -448,10 +448,8 @@ size_t connect_manager::check_conns(size_t step, bool check_idle, bool kick_dead, bool keep_conns, thread_pool* threads, size_t* left) { std::vector pools; - size_t nfreed = 0; pools_dump(step, pools); - if (pools.empty()) { if (left) { *left = 0; @@ -459,6 +457,8 @@ size_t connect_manager::check_conns(size_t step, bool check_idle, return 0; } + size_t nfreed = 0; + if (check_idle) { nfreed += check_idle_conns(pools); } diff --git a/lib_acl_cpp/src/connpool/connect_pool.cpp b/lib_acl_cpp/src/connpool/connect_pool.cpp index 0dc24f7b5..a2d70c35f 100644 --- a/lib_acl_cpp/src/connpool/connect_pool.cpp +++ b/lib_acl_cpp/src/connpool/connect_pool.cpp @@ -48,6 +48,7 @@ private: connect_pool::connect_pool(const char* addr, size_t max, size_t idx /* = 0 */) : alive_(true) +, refers_(0) , delay_destroy_(false) , last_dead_(0) , conn_timeout_(30) @@ -77,6 +78,46 @@ connect_pool::~connect_pool() } } +void connect_pool::count_inc(bool exclusive) { + if (exclusive) { + lock_.lock(); + } + ++count_; + ++refers_; + if (exclusive) { + lock_.unlock(); + } +} + +void connect_pool::count_dec(bool exclusive) { + if (exclusive) { + lock_.lock(); + } + --count_; + --refers_; + if (exclusive) { + lock_.unlock(); + } +} + +void connect_pool::refer() +{ + lock_.lock(); + ++refers_; + lock_.unlock(); +} + +void connect_pool::unrefer() +{ + lock_.lock(); + if (--refers_ <= 0 && delay_destroy_) { + lock_.unlock(); + delete this; + } else { + lock_.unlock(); + } +} + void connect_pool::set_key(const char* key) { ACL_SAFE_STRNCPY(key_, key, sizeof(key_)); @@ -206,8 +247,8 @@ connect_client* connect_pool::peek(bool on, double* tc, bool* old) return conn; } else if (max_ > 0 && count_ >= max_) { - logger_error("too many connections, max: %d, curr: %d," - " server: %s", (int) max_, (int) count_, addr_); + logger_error("too many connections, max: %zd, curr: %zd," + " server: %s", max_, count_, addr_); lock_.unlock(); SET_TIME_COST; @@ -222,7 +263,7 @@ connect_client* connect_pool::peek(bool on, double* tc, bool* old) } // 将以下三个值预 +1 - count_++; + count_inc(false); total_used_++; current_used_++; @@ -232,7 +273,7 @@ connect_client* connect_pool::peek(bool on, double* tc, bool* old) conn = create_connect(); if (conn == NULL) { lock_.lock(); - count_--; + count_dec(false); total_used_--; current_used_--; #ifdef AUTO_SET_ALIVE @@ -252,7 +293,7 @@ connect_client* connect_pool::peek(bool on, double* tc, bool* old) if (!conn->open()) { lock_.lock(); // 因为打开连接失败,所以还需将上面预 +1 的三个成员再 -1 - count_--; + count_dec(false); total_used_--; current_used_--; #ifdef AUTO_SET_ALIVE @@ -274,12 +315,10 @@ connect_client* connect_pool::peek(bool on, double* tc, bool* old) void connect_pool::bind_one(connect_client* conn) { - lock_.lock(); if (conn->get_pool() != this) { conn->set_pool(this); - count_++; + count_inc(true); } - lock_.unlock(); } void connect_pool::put(connect_client* conn, bool keep /* = true */, @@ -292,11 +331,11 @@ void connect_pool::put(connect_client* conn, bool keep /* = true */, // 检查是否设置了自销毁标志位 if (delay_destroy_) { if (conn->get_pool() == this) { - count_--; + count_dec(false); } delete conn; - if (count_ <= 0) { + if (refers_ <= 0) { // 如果引用计数为 0 则自销毁 lock_.unlock(); delete this; @@ -315,7 +354,7 @@ void connect_pool::put(connect_client* conn, bool keep /* = true */, } else { acl_assert(count_ > 0); if (conn->get_pool() == this) { - count_--; + count_dec(false); } delete conn; } @@ -336,24 +375,6 @@ void connect_pool::put(connect_client* conn, bool keep /* = true */, } } -void connect_pool::refer() -{ - lock_.lock(); - ++count_; - lock_.unlock(); -} - -void connect_pool::unrefer() -{ - lock_.lock(); - if (--count_ <= 0 && delay_destroy_) { - lock_.unlock(); - delete this; - } else { - lock_.unlock(); - } -} - void connect_pool::set_delay_destroy() { lock_.lock(); @@ -397,11 +418,11 @@ size_t connect_pool::check_idle(time_t ttl, bool exclusive /* true */) std::list::iterator it = pool_.begin(); for (; it != pool_.end(); ++it) { delete *it; + count_dec(false); n++; } pool_.clear(); - count_ = 0; if (exclusive) { lock_.unlock(); @@ -448,7 +469,7 @@ size_t connect_pool::kick_idle_conns(time_t ttl) // Decrease connections count only if the connection is mine. if ((*it)->get_pool() == this) { - count_--; + count_dec(false); } delete *it; @@ -471,12 +492,12 @@ size_t connect_pool::check_dead(thread_pool* threads /* NULL */) } if (threads == NULL) { - return check_conns(count); + return check_dead(count); } - return check_conns(count, *threads); + return check_dead(count, *threads); } -size_t connect_pool::check_conns(size_t count) +size_t connect_pool::check_dead(size_t count) { size_t n = 0; for (size_t i = 0; i < count; i++) { @@ -491,19 +512,19 @@ size_t connect_pool::check_conns(size_t count) } if (conn->get_pool() == this) { - lock_.lock(); - --count_; - lock_.unlock(); + count_dec(true); } delete conn; n++; } + return n; } -size_t connect_pool::check_conns(size_t count, thread_pool& threads) +size_t connect_pool::check_dead(size_t count, thread_pool& threads) { // Check all connections in threads pool. + size_t n = 0; tbox box; std::vector jobs; @@ -518,6 +539,9 @@ size_t connect_pool::check_conns(size_t count, thread_pool& threads) threads.execute(job); } + struct timeval begin; + gettimeofday(&begin, NULL); + bool found; for (size_t i = 0; i < jobs.size(); i++) { (void) box.pop(-1, &found); @@ -526,6 +550,12 @@ size_t connect_pool::check_conns(size_t count, thread_pool& threads) } } + struct timeval end; + gettimeofday(&end, NULL); + double tc = stamp_sub(end, begin); + logger("Threads: limit=%zd, count=%d; jobs count=%zd, %zd, time cost=%.2f ms", + threads.get_limit(), threads.threads_count(), jobs.size(), count, tc); + for (std::vector::iterator it = jobs.begin(); it != jobs.end(); ++it) { connect_client* conn = &(*it)->get_conn(); @@ -539,19 +569,19 @@ size_t connect_pool::check_conns(size_t count, thread_pool& threads) delete *it; if (conn->get_pool() == this) { - lock_.lock(); - --count_; - lock_.unlock(); + count_dec(true); } delete conn; n++; } + return n; } connect_client* connect_pool::peek_back() { lock_.lock(); +#if 1 std::list::reverse_iterator rit = pool_.rbegin(); if (rit == pool_.rend()) { lock_.unlock(); @@ -561,6 +591,15 @@ connect_client* connect_pool::peek_back() std::list::iterator it = --rit.base(); connect_client* conn = *it; pool_.erase(it); +#else + std::list::reverse_iterator rit = pool_.rbegin(); + if (rit == pool_.rend()) { + lock_.unlock(); + return NULL; + } + connect_client* conn = *rit; + pool_.erase(--rit.base()); +#endif lock_.unlock(); return conn; } @@ -575,11 +614,11 @@ void connect_pool::put_front(connect_client* conn) // 检查是否设置了自销毁标志位 if (delay_destroy_) { if (conn->get_pool() == this) { - count_--; + count_dec(false); } delete conn; - if (count_ <= 0) { + if (refers_ <= 0) { // 如果引用计数为 0 则自销毁 lock_.unlock(); delete this; @@ -625,7 +664,7 @@ void connect_pool::keep_conns(thread_pool* threads /* NULL */) void connect_pool::keep_conns(size_t min) { for (size_t i = 0; i < min; i++) { - connect_client* conn = create_connect(); + connect_client* conn = this->create_connect(); if (conn == NULL) { logger_error("Create connection error"); break; @@ -641,7 +680,8 @@ void connect_pool::keep_conns(size_t min) put(conn, true); lock_.lock(); - count_++; + alive_ = true; + count_inc(false); if (max_ > 0 && count_ >= max_) { lock_.unlock(); break; @@ -655,14 +695,17 @@ void connect_pool::keep_conns(size_t min, thread_pool& threads) tbox box; std::vector jobs; for (size_t i = 0; i < min; i++) { - connect_client* conn = create_connect(); + connect_client* conn = this->create_connect(); check_job* job = NEW check_job(box, *conn, false); jobs.push_back(job); threads.execute(job); } - bool found; + struct timeval begin; + gettimeofday(&begin, NULL); + // Waiting all jobs finished. + bool found; for (size_t i = 0; i < jobs.size(); i++) { (void) box.pop(-1, &found); if (!found) { @@ -670,6 +713,12 @@ void connect_pool::keep_conns(size_t min, thread_pool& threads) } } + struct timeval end; + gettimeofday(&end, NULL); + double tc = stamp_sub(end, begin); + logger("Threads: limit=%zd, count=%d; jobs count=%zd, time cost=%.2f ms", + threads.get_limit(), threads.threads_count(), jobs.size(), tc); + for (std::vector::iterator it = jobs.begin(); it != jobs.end(); ++it) { connect_client* conn = &(*it)->get_conn(); @@ -683,7 +732,10 @@ void connect_pool::keep_conns(size_t min, thread_pool& threads) conn->set_pool(this); put(conn, true); - count_++; + + lock_.lock(); + alive_ = true; + count_inc(false); lock_.unlock(); } } diff --git a/lib_acl_cpp/src/stdlib/util.cpp b/lib_acl_cpp/src/stdlib/util.cpp index ed9e80be0..e0fc9f530 100644 --- a/lib_acl_cpp/src/stdlib/util.cpp +++ b/lib_acl_cpp/src/stdlib/util.cpp @@ -60,18 +60,8 @@ long long get_curr_stamp(void) double stamp_sub(const struct timeval& from, const struct timeval& sub) { - struct timeval res; - - memcpy(&res, &from, sizeof(struct timeval)); - - res.tv_usec -= sub.tv_usec; - if (res.tv_usec < 0) { - --res.tv_sec; - res.tv_usec += 1000000; - } - - res.tv_sec -= sub.tv_sec; - return res.tv_sec * 1000.0 + res.tv_usec / 1000.0; + return (from.tv_sec - sub.tv_sec) * 1000 + + (from.tv_usec - sub.tv_usec) / 1000; } } // namespace acl