Merge branch 'gitee-master' into gitlab-upstream

This commit is contained in:
zhengshuxin 2024-08-21 11:48:46 +08:00
commit 09753e0564
5 changed files with 114 additions and 64 deletions

View File

@ -209,7 +209,10 @@ public:
return key_; return key_;
} }
// 增加本对象引用计数
void refer(); void refer();
// 减少本对象引用计数
void unrefer(); void unrefer();
protected: protected:
@ -228,6 +231,7 @@ protected:
protected: protected:
bool alive_; // 是否属正常 bool alive_; // 是否属正常
ssize_t refers_; // 当前连接池对象的引用计数
bool delay_destroy_; // 是否设置了延迟自销毁 bool delay_destroy_; // 是否设置了延迟自销毁
// 有问题的服务器的可以重试的时间间隔,不可用连接池对象再次被启用的时间间隔 // 有问题的服务器的可以重试的时间间隔,不可用连接池对象再次被启用的时间间隔
int retry_inter_; int retry_inter_;
@ -252,14 +256,17 @@ protected:
time_t last_; // 上次记录的时间截 time_t last_; // 上次记录的时间截
std::list<connect_client*> pool_; // 连接池集合 std::list<connect_client*> pool_; // 连接池集合
size_t check_conns(size_t count); size_t check_dead(size_t count);
size_t check_conns(size_t count, thread_pool& threads); size_t check_dead(size_t count, thread_pool& threads);
void keep_conns(size_t min); void keep_conns(size_t min);
void keep_conns(size_t min, thread_pool& threads); void keep_conns(size_t min, thread_pool& threads);
size_t kick_idle_conns(time_t ttl); // 关闭过期的连接 size_t kick_idle_conns(time_t ttl); // 关闭过期的连接
connect_client* peek_back(); // 从尾部 Peek 连接 connect_client* peek_back(); // 从尾部 Peek 连接
void put_front(connect_client* conn); // 向头部 Put 连接 void put_front(connect_client* conn); // 向头部 Put 连接
void count_inc(bool exclusive); // 增加连接数及对象引用计数
void count_dec(bool exclusive); // 减少连接数及对象引用计数
}; };
class ACL_CPP_API connect_guard : public noncopyable { class ACL_CPP_API connect_guard : public noncopyable {

View File

@ -8,6 +8,7 @@ connect_pool::connect_pool(const char* addr, size_t count, size_t idx)
, conn_timeout_(30) , conn_timeout_(30)
, rw_timeout_(30) , rw_timeout_(30)
{ {
this->set_conns_min(count);
} }
connect_pool::~connect_pool() connect_pool::~connect_pool()

View File

@ -448,10 +448,8 @@ size_t connect_manager::check_conns(size_t step, bool check_idle,
bool kick_dead, bool keep_conns, thread_pool* threads, size_t* left) bool kick_dead, bool keep_conns, thread_pool* threads, size_t* left)
{ {
std::vector<connect_pool*> pools; std::vector<connect_pool*> pools;
size_t nfreed = 0;
pools_dump(step, pools); pools_dump(step, pools);
if (pools.empty()) { if (pools.empty()) {
if (left) { if (left) {
*left = 0; *left = 0;
@ -459,6 +457,8 @@ size_t connect_manager::check_conns(size_t step, bool check_idle,
return 0; return 0;
} }
size_t nfreed = 0;
if (check_idle) { if (check_idle) {
nfreed += check_idle_conns(pools); nfreed += check_idle_conns(pools);
} }

View File

@ -48,6 +48,7 @@ private:
connect_pool::connect_pool(const char* addr, size_t max, size_t idx /* = 0 */) connect_pool::connect_pool(const char* addr, size_t max, size_t idx /* = 0 */)
: alive_(true) : alive_(true)
, refers_(0)
, delay_destroy_(false) , delay_destroy_(false)
, last_dead_(0) , last_dead_(0)
, conn_timeout_(30) , conn_timeout_(30)
@ -77,6 +78,46 @@ connect_pool::~connect_pool()
} }
} }
void connect_pool::count_inc(bool exclusive) {
if (exclusive) {
lock_.lock();
}
++count_;
++refers_;
if (exclusive) {
lock_.unlock();
}
}
void connect_pool::count_dec(bool exclusive) {
if (exclusive) {
lock_.lock();
}
--count_;
--refers_;
if (exclusive) {
lock_.unlock();
}
}
void connect_pool::refer()
{
lock_.lock();
++refers_;
lock_.unlock();
}
void connect_pool::unrefer()
{
lock_.lock();
if (--refers_ <= 0 && delay_destroy_) {
lock_.unlock();
delete this;
} else {
lock_.unlock();
}
}
void connect_pool::set_key(const char* key) void connect_pool::set_key(const char* key)
{ {
ACL_SAFE_STRNCPY(key_, key, sizeof(key_)); ACL_SAFE_STRNCPY(key_, key, sizeof(key_));
@ -206,8 +247,8 @@ connect_client* connect_pool::peek(bool on, double* tc, bool* old)
return conn; return conn;
} else if (max_ > 0 && count_ >= max_) { } else if (max_ > 0 && count_ >= max_) {
logger_error("too many connections, max: %d, curr: %d," logger_error("too many connections, max: %zd, curr: %zd,"
" server: %s", (int) max_, (int) count_, addr_); " server: %s", max_, count_, addr_);
lock_.unlock(); lock_.unlock();
SET_TIME_COST; SET_TIME_COST;
@ -222,7 +263,7 @@ connect_client* connect_pool::peek(bool on, double* tc, bool* old)
} }
// 将以下三个值预 +1 // 将以下三个值预 +1
count_++; count_inc(false);
total_used_++; total_used_++;
current_used_++; current_used_++;
@ -232,7 +273,7 @@ connect_client* connect_pool::peek(bool on, double* tc, bool* old)
conn = create_connect(); conn = create_connect();
if (conn == NULL) { if (conn == NULL) {
lock_.lock(); lock_.lock();
count_--; count_dec(false);
total_used_--; total_used_--;
current_used_--; current_used_--;
#ifdef AUTO_SET_ALIVE #ifdef AUTO_SET_ALIVE
@ -252,7 +293,7 @@ connect_client* connect_pool::peek(bool on, double* tc, bool* old)
if (!conn->open()) { if (!conn->open()) {
lock_.lock(); lock_.lock();
// 因为打开连接失败,所以还需将上面预 +1 的三个成员再 -1 // 因为打开连接失败,所以还需将上面预 +1 的三个成员再 -1
count_--; count_dec(false);
total_used_--; total_used_--;
current_used_--; current_used_--;
#ifdef AUTO_SET_ALIVE #ifdef AUTO_SET_ALIVE
@ -274,12 +315,10 @@ connect_client* connect_pool::peek(bool on, double* tc, bool* old)
void connect_pool::bind_one(connect_client* conn) void connect_pool::bind_one(connect_client* conn)
{ {
lock_.lock();
if (conn->get_pool() != this) { if (conn->get_pool() != this) {
conn->set_pool(this); conn->set_pool(this);
count_++; count_inc(true);
} }
lock_.unlock();
} }
void connect_pool::put(connect_client* conn, bool keep /* = true */, void connect_pool::put(connect_client* conn, bool keep /* = true */,
@ -292,11 +331,11 @@ void connect_pool::put(connect_client* conn, bool keep /* = true */,
// 检查是否设置了自销毁标志位 // 检查是否设置了自销毁标志位
if (delay_destroy_) { if (delay_destroy_) {
if (conn->get_pool() == this) { if (conn->get_pool() == this) {
count_--; count_dec(false);
} }
delete conn; delete conn;
if (count_ <= 0) { if (refers_ <= 0) {
// 如果引用计数为 0 则自销毁 // 如果引用计数为 0 则自销毁
lock_.unlock(); lock_.unlock();
delete this; delete this;
@ -315,7 +354,7 @@ void connect_pool::put(connect_client* conn, bool keep /* = true */,
} else { } else {
acl_assert(count_ > 0); acl_assert(count_ > 0);
if (conn->get_pool() == this) { if (conn->get_pool() == this) {
count_--; count_dec(false);
} }
delete conn; delete conn;
} }
@ -336,24 +375,6 @@ void connect_pool::put(connect_client* conn, bool keep /* = true */,
} }
} }
void connect_pool::refer()
{
lock_.lock();
++count_;
lock_.unlock();
}
void connect_pool::unrefer()
{
lock_.lock();
if (--count_ <= 0 && delay_destroy_) {
lock_.unlock();
delete this;
} else {
lock_.unlock();
}
}
void connect_pool::set_delay_destroy() void connect_pool::set_delay_destroy()
{ {
lock_.lock(); lock_.lock();
@ -397,11 +418,11 @@ size_t connect_pool::check_idle(time_t ttl, bool exclusive /* true */)
std::list<connect_client*>::iterator it = pool_.begin(); std::list<connect_client*>::iterator it = pool_.begin();
for (; it != pool_.end(); ++it) { for (; it != pool_.end(); ++it) {
delete *it; delete *it;
count_dec(false);
n++; n++;
} }
pool_.clear(); pool_.clear();
count_ = 0;
if (exclusive) { if (exclusive) {
lock_.unlock(); lock_.unlock();
@ -448,7 +469,7 @@ size_t connect_pool::kick_idle_conns(time_t ttl)
// Decrease connections count only if the connection is mine. // Decrease connections count only if the connection is mine.
if ((*it)->get_pool() == this) { if ((*it)->get_pool() == this) {
count_--; count_dec(false);
} }
delete *it; delete *it;
@ -471,12 +492,12 @@ size_t connect_pool::check_dead(thread_pool* threads /* NULL */)
} }
if (threads == NULL) { if (threads == NULL) {
return check_conns(count); return check_dead(count);
} }
return check_conns(count, *threads); return check_dead(count, *threads);
} }
size_t connect_pool::check_conns(size_t count) size_t connect_pool::check_dead(size_t count)
{ {
size_t n = 0; size_t n = 0;
for (size_t i = 0; i < count; i++) { for (size_t i = 0; i < count; i++) {
@ -491,19 +512,19 @@ size_t connect_pool::check_conns(size_t count)
} }
if (conn->get_pool() == this) { if (conn->get_pool() == this) {
lock_.lock(); count_dec(true);
--count_;
lock_.unlock();
} }
delete conn; delete conn;
n++; n++;
} }
return n; return n;
} }
size_t connect_pool::check_conns(size_t count, thread_pool& threads) size_t connect_pool::check_dead(size_t count, thread_pool& threads)
{ {
// Check all connections in threads pool. // Check all connections in threads pool.
size_t n = 0; size_t n = 0;
tbox<bool> box; tbox<bool> box;
std::vector<check_job*> jobs; std::vector<check_job*> jobs;
@ -518,6 +539,9 @@ size_t connect_pool::check_conns(size_t count, thread_pool& threads)
threads.execute(job); threads.execute(job);
} }
struct timeval begin;
gettimeofday(&begin, NULL);
bool found; bool found;
for (size_t i = 0; i < jobs.size(); i++) { for (size_t i = 0; i < jobs.size(); i++) {
(void) box.pop(-1, &found); (void) box.pop(-1, &found);
@ -526,6 +550,12 @@ size_t connect_pool::check_conns(size_t count, thread_pool& threads)
} }
} }
struct timeval end;
gettimeofday(&end, NULL);
double tc = stamp_sub(end, begin);
logger("Threads: limit=%zd, count=%d; jobs count=%zd, %zd, time cost=%.2f ms",
threads.get_limit(), threads.threads_count(), jobs.size(), count, tc);
for (std::vector<check_job*>::iterator it = jobs.begin(); for (std::vector<check_job*>::iterator it = jobs.begin();
it != jobs.end(); ++it) { it != jobs.end(); ++it) {
connect_client* conn = &(*it)->get_conn(); connect_client* conn = &(*it)->get_conn();
@ -539,19 +569,19 @@ size_t connect_pool::check_conns(size_t count, thread_pool& threads)
delete *it; delete *it;
if (conn->get_pool() == this) { if (conn->get_pool() == this) {
lock_.lock(); count_dec(true);
--count_;
lock_.unlock();
} }
delete conn; delete conn;
n++; n++;
} }
return n; return n;
} }
connect_client* connect_pool::peek_back() connect_client* connect_pool::peek_back()
{ {
lock_.lock(); lock_.lock();
#if 1
std::list<connect_client*>::reverse_iterator rit = pool_.rbegin(); std::list<connect_client*>::reverse_iterator rit = pool_.rbegin();
if (rit == pool_.rend()) { if (rit == pool_.rend()) {
lock_.unlock(); lock_.unlock();
@ -561,6 +591,15 @@ connect_client* connect_pool::peek_back()
std::list<connect_client*>::iterator it = --rit.base(); std::list<connect_client*>::iterator it = --rit.base();
connect_client* conn = *it; connect_client* conn = *it;
pool_.erase(it); pool_.erase(it);
#else
std::list<connect_client*>::reverse_iterator rit = pool_.rbegin();
if (rit == pool_.rend()) {
lock_.unlock();
return NULL;
}
connect_client* conn = *rit;
pool_.erase(--rit.base());
#endif
lock_.unlock(); lock_.unlock();
return conn; return conn;
} }
@ -575,11 +614,11 @@ void connect_pool::put_front(connect_client* conn)
// 检查是否设置了自销毁标志位 // 检查是否设置了自销毁标志位
if (delay_destroy_) { if (delay_destroy_) {
if (conn->get_pool() == this) { if (conn->get_pool() == this) {
count_--; count_dec(false);
} }
delete conn; delete conn;
if (count_ <= 0) { if (refers_ <= 0) {
// 如果引用计数为 0 则自销毁 // 如果引用计数为 0 则自销毁
lock_.unlock(); lock_.unlock();
delete this; delete this;
@ -625,7 +664,7 @@ void connect_pool::keep_conns(thread_pool* threads /* NULL */)
void connect_pool::keep_conns(size_t min) void connect_pool::keep_conns(size_t min)
{ {
for (size_t i = 0; i < min; i++) { for (size_t i = 0; i < min; i++) {
connect_client* conn = create_connect(); connect_client* conn = this->create_connect();
if (conn == NULL) { if (conn == NULL) {
logger_error("Create connection error"); logger_error("Create connection error");
break; break;
@ -641,7 +680,8 @@ void connect_pool::keep_conns(size_t min)
put(conn, true); put(conn, true);
lock_.lock(); lock_.lock();
count_++; alive_ = true;
count_inc(false);
if (max_ > 0 && count_ >= max_) { if (max_ > 0 && count_ >= max_) {
lock_.unlock(); lock_.unlock();
break; break;
@ -655,14 +695,17 @@ void connect_pool::keep_conns(size_t min, thread_pool& threads)
tbox<bool> box; tbox<bool> box;
std::vector<check_job*> jobs; std::vector<check_job*> jobs;
for (size_t i = 0; i < min; i++) { for (size_t i = 0; i < min; i++) {
connect_client* conn = create_connect(); connect_client* conn = this->create_connect();
check_job* job = NEW check_job(box, *conn, false); check_job* job = NEW check_job(box, *conn, false);
jobs.push_back(job); jobs.push_back(job);
threads.execute(job); threads.execute(job);
} }
bool found; struct timeval begin;
gettimeofday(&begin, NULL);
// Waiting all jobs finished. // Waiting all jobs finished.
bool found;
for (size_t i = 0; i < jobs.size(); i++) { for (size_t i = 0; i < jobs.size(); i++) {
(void) box.pop(-1, &found); (void) box.pop(-1, &found);
if (!found) { if (!found) {
@ -670,6 +713,12 @@ void connect_pool::keep_conns(size_t min, thread_pool& threads)
} }
} }
struct timeval end;
gettimeofday(&end, NULL);
double tc = stamp_sub(end, begin);
logger("Threads: limit=%zd, count=%d; jobs count=%zd, time cost=%.2f ms",
threads.get_limit(), threads.threads_count(), jobs.size(), tc);
for (std::vector<check_job*>::iterator it = jobs.begin(); for (std::vector<check_job*>::iterator it = jobs.begin();
it != jobs.end(); ++it) { it != jobs.end(); ++it) {
connect_client* conn = &(*it)->get_conn(); connect_client* conn = &(*it)->get_conn();
@ -683,7 +732,10 @@ void connect_pool::keep_conns(size_t min, thread_pool& threads)
conn->set_pool(this); conn->set_pool(this);
put(conn, true); put(conn, true);
count_++;
lock_.lock();
alive_ = true;
count_inc(false);
lock_.unlock(); lock_.unlock();
} }
} }

View File

@ -60,18 +60,8 @@ long long get_curr_stamp(void)
double stamp_sub(const struct timeval& from, const struct timeval& sub) double stamp_sub(const struct timeval& from, const struct timeval& sub)
{ {
struct timeval res; return (from.tv_sec - sub.tv_sec) * 1000
+ (from.tv_usec - sub.tv_usec) / 1000;
memcpy(&res, &from, sizeof(struct timeval));
res.tv_usec -= sub.tv_usec;
if (res.tv_usec < 0) {
--res.tv_sec;
res.tv_usec += 1000000;
}
res.tv_sec -= sub.tv_sec;
return res.tv_sec * 1000.0 + res.tv_usec / 1000.0;
} }
} // namespace acl } // namespace acl