Merge branch 'gitee-master' into gitlab-upstream

This commit is contained in:
zhengshuxin 2024-08-20 11:37:41 +08:00
commit 38ef84e054
12 changed files with 197 additions and 100 deletions

View File

@ -178,10 +178,9 @@ public:
*
* @param step {size_t}
* @param left {size_t*}
* @param kick_dead {book}
* @return {size_t}
*/
size_t check_idle(size_t step, size_t* left = NULL, bool kick_dead = false);
size_t check_idle_conns(size_t step, size_t* left = NULL);
/**
*
@ -189,7 +188,26 @@ public:
* @param left {size_t*}
* @return {size_t}
*/
size_t check_dead(size_t step, size_t* left = NULL);
size_t check_dead_conns(size_t step, size_t* left = NULL);
/**
*
* @param step {size_t}
* @return {size_t}
*/
size_t keep_min_conns(size_t step);
/**
*
* @param step {size_t}
* @param check_idle {bool}
* @param kick_dead {bool}
* @param keep_conns {bool}
* @param left {size_t*}
* @return {size_t}
*/
size_t check_conns(size_t step, bool check_idle, bool kick_dead,
bool keep_conns, size_t* left = NULL);
/**
*

View File

@ -60,21 +60,22 @@ public:
/**
*
* @param on {bool}
* @param check_idle {bool}
* @param kick_dead {bool}
* true connect_client alive()
* @param keep_conns {bool}
* @param step {bool}
* @return {connect_monitor&}
*/
connect_monitor& set_check_idle(bool on, bool kick_dead = false,
size_t step = 1);
connect_monitor& set_check_conns(bool check_idle, bool kick_dead,
bool keep_conns, size_t step = 0);
/**
*
* @return {bool}
*/
bool check_idle_on() const {
return check_idle_on_;
return check_idle_;
}
/**
@ -85,12 +86,20 @@ public:
return kick_dead_;
}
/**
*
* @return {bool}
*/
bool keep_conns_on() const {
return keep_conns_;
}
/**
* check_idle_on() true
* @return {size_t}
*/
size_t get_check_idle_step() const {
return check_idle_step_;
size_t get_check_step() const {
return check_step_;
}
/**
@ -186,11 +195,10 @@ private:
bool check_server_; // 是否检查服务端可用性
int check_inter_; // 检测连接池状态的时间间隔(秒)
int conn_timeout_; // 连接服务器的超时时间
bool check_idle_on_; // 是否检测并关闭过期空闲连接
bool check_idle_; // 是否检测并关闭过期空闲连接
bool kick_dead_; // 是否删除异常连接
size_t check_idle_step_; // 每次检测连接数限制
bool check_dead_on_; // 是否检测并关闭断开的连接
size_t check_dead_step_; // 每次检测异常连接的数量限制
bool keep_conns_; // 是否保持每个连接池最小连接数
size_t check_step_; // 每次检测连接池个数限制
rpc_service* rpc_service_; // 异步 RPC 通信服务句柄
};

View File

@ -9,6 +9,13 @@ namespace acl {
class connect_manager;
class connect_client;
typedef enum {
cpool_put_oper_none = 0,
cpool_put_check_idle = 1,
cpool_put_check_dead = (1 << 1),
cpool_put_keep_conns = (1 << 2),
} cpool_put_oper_t;
/**
*
* create_connect
@ -97,8 +104,11 @@ public:
*
* @param conn {redis_client*}
* @param keep {bool}
* @param oper {cpool_put_oper_t}
* cpool_put_oper_t
*/
void put(connect_client* conn, bool keep = true);
void put(connect_client* conn, bool keep = true,
cpool_put_oper_t oper = cpool_put_check_idle);
/**
*
@ -107,23 +117,7 @@ public:
* @return {size_t}
*/
size_t check_idle(time_t ttl, bool exclusive = true);
/**
* 使 set_idle_ttl()
* @param exclusive {bool}
* @param kick_dead {bool}
* @return {size_t}
*/
size_t check_idle(bool kick_dead, bool exclusive = true);
/**
*
* @param ttl {time_t} >= 0
* @param exclusive {bool}
* @param kick_dead {bool}
* @return {size_t}
*/
size_t check_idle(time_t ttl, bool kick_dead, bool exclusive = true);
size_t check_idle(bool exclusive = true);
/**
*

View File

@ -5,8 +5,6 @@
connect_pool::connect_pool(const char* addr, size_t count, size_t idx)
: acl::connect_pool(addr, count, idx)
, addr_(addr)
, count_(count)
, idx_(idx)
, conn_timeout_(30)
, rw_timeout_(30)
{

View File

@ -14,8 +14,6 @@ protected:
private:
acl::string addr_;
size_t count_;
size_t idx_;
int conn_timeout_;
int rw_timeout_;
};

View File

@ -47,7 +47,7 @@ static void init(const char* addrs, int count, int check_type, const char* proto
check_type != 0);
monitor->set_check_inter(check_inter);
monitor->set_conn_timeout(conn_timeout);
monitor->set_check_idle(true, true);
monitor->set_check_conns(true, true, true);
if (check_type == 2) {
monitor->open_rpc_service(10, NULL);

View File

@ -58,18 +58,15 @@ void mymonitor::nio_check(acl::check_client& checker,
void mymonitor::on_connected(const acl::check_client& checker, double cost)
{
printf("addr=%s, connected, cost=%.2f\r\n",
checker.get_addr(), cost);
printf("addr=%s, connected, cost=%.2f\r\n", checker.get_addr(), cost);
}
void mymonitor::on_refuse(const acl::check_client& checker, double cost)
void mymonitor::on_refuse(const char* addr, double cost)
{
printf("addr=%s, connection refused, cost=%.2f\r\n",
checker.get_addr(), cost);
printf("addr=%s, connection refused, cost=%.2f\r\n", addr, cost);
}
void mymonitor::on_timeout(const acl::check_client& checker, double cost)
void mymonitor::on_timeout(const char* addr, double cost)
{
printf("addr=%s, connection timeout, cost=%.2f\r\n",
checker.get_addr(), cost);
printf("addr=%s, connection timeout, cost=%.2f\r\n", addr, cost);
}

View File

@ -38,10 +38,10 @@ protected:
void on_connected(const acl::check_client& checker, double cost);
// @override
void on_refuse(const acl::check_client& checker, double cost);
void on_refuse(const char* addr, double cost);
// @override
void on_timeout(const acl::check_client& checker, double cost);
void on_timeout(const char* addr, double cost);
private:
acl::string proto_;

View File

@ -33,11 +33,10 @@ void check_timer::timer_callback(unsigned int id)
connect_manager& manager = monitor_.get_manager();
// 自动检测并关闭过期空闲长连接
if (monitor_.check_idle_on()) {
manager.check_idle(monitor_.get_check_idle_step(), NULL,
monitor_.kick_dead_on());
}
// 自动检测并关闭过期空闲长连接,自动关闭异常连接,自动保持每个连接池的最小连接数
manager.check_conns(monitor_.get_check_step(),
monitor_.check_idle_on(), monitor_.kick_dead_on(),
monitor_.keep_conns_on(), NULL);
if (!monitor_.check_server_on()) {
return;

View File

@ -386,8 +386,7 @@ connect_pool* connect_manager::get(const char* addr,
//////////////////////////////////////////////////////////////////////////
size_t connect_manager::check_idle(size_t step, size_t* left /* NULL */,
bool kick_dead /* false */)
size_t connect_manager::check_idle_conns(size_t step, size_t* left /* NULL */)
{
std::vector<connect_pool*> pools_tmp;
size_t nleft = 0, nfreed = 0, pools_size, check_max, check_pos;
@ -419,10 +418,8 @@ size_t connect_manager::check_idle(size_t step, size_t* left /* NULL */,
for (std::vector<connect_pool*>::iterator it = pools_tmp.begin();
it != pools_tmp.end(); ++it) {
size_t ret = (*it)->check_idle(kick_dead, true);
if (ret > 0) {
nfreed += ret;
}
size_t ret = (*it)->check_idle(true);
nfreed += ret;
nleft += (*it)->get_count();
(*it)->unrefer(); // Decrease reference added before.
@ -434,7 +431,7 @@ size_t connect_manager::check_idle(size_t step, size_t* left /* NULL */,
return nfreed;
}
size_t connect_manager::check_dead(size_t step, size_t* left /* NULL */)
size_t connect_manager::check_dead_conns(size_t step, size_t* left /* NULL */)
{
std::vector<connect_pool*> pools_tmp;
size_t nleft = 0, nfreed = 0, pools_size, check_max, check_pos;
@ -467,9 +464,7 @@ size_t connect_manager::check_dead(size_t step, size_t* left /* NULL */)
for (std::vector<connect_pool*>::iterator it = pools_tmp.begin();
it != pools_tmp.end(); ++it) {
size_t ret = (*it)->check_dead();
if (ret > 0) {
nfreed += ret;
}
nfreed += ret;
nleft += (*it)->get_count();
(*it)->unrefer();
}
@ -480,6 +475,99 @@ size_t connect_manager::check_dead(size_t step, size_t* left /* NULL */)
return nfreed;
}
size_t connect_manager::keep_min_conns(size_t step)
{
std::vector<connect_pool*> pools_tmp;
size_t nleft = 0, pools_size, check_max, check_pos;
unsigned long id = get_id();
lock_.lock();
conns_pools& pools = get_pools_by_id(id);
pools_size = pools.pools.size();
if (pools_size == 0) {
lock_.unlock();
return 0;
}
if (step == 0 || step > pools_size) {
step = pools_size;
}
check_pos = pools.check_next++ % pools_size;
check_max = check_pos + step;
while (check_pos < pools_size && check_pos < check_max) {
connect_pool* pool = pools.pools[check_pos++ % pools_size];
pool->refer();
pools_tmp.push_back(pool);
}
lock_.unlock();
for (std::vector<connect_pool*>::iterator it = pools_tmp.begin();
it != pools_tmp.end(); ++it) {
(*it)->keep_conns();
nleft += (*it)->get_count();
(*it)->unrefer();
}
return nleft;
}
size_t connect_manager::check_conns(size_t step, bool check_idle,
bool kick_dead, bool keep_conns, size_t* left /* NULL */)
{
std::vector<connect_pool*> pools_tmp;
size_t nleft = 0, nfreed = 0, pools_size, check_max, check_pos;
unsigned long id = get_id();
lock_.lock();
conns_pools& pools = get_pools_by_id(id);
pools_size = pools.pools.size();
if (pools_size == 0) {
lock_.unlock();
return 0;
}
if (step == 0 || step > pools_size) {
step = pools_size;
}
check_pos = pools.check_next++ % pools_size;
check_max = check_pos + step;
while (check_pos < pools_size && check_pos < check_max) {
connect_pool* pool = pools.pools[check_pos++ % pools_size];
pool->refer();
pools_tmp.push_back(pool);
}
lock_.unlock();
for (std::vector<connect_pool*>::iterator it = pools_tmp.begin();
it != pools_tmp.end(); ++it) {
if (check_idle) {
size_t ret = (*it)->check_idle();
nfreed += ret;
}
if (kick_dead) {
size_t ret = (*it)->check_dead();
nfreed += ret;
}
if (keep_conns) {
(*it)->keep_conns();
}
nleft += (*it)->get_count();
(*it)->unrefer();
}
if (left) {
*left = nleft;
}
return nfreed;
}
void connect_manager::create_pools_for(pools_t& pools)
{
std::map<string, conn_config>::const_iterator cit = addrs_.begin();

View File

@ -21,11 +21,10 @@ connect_monitor::connect_monitor(connect_manager& manager, bool check_server /*
, check_server_(check_server)
, check_inter_(1)
, conn_timeout_(10)
, check_idle_on_(false)
, check_idle_(false)
, kick_dead_(false)
, check_idle_step_(128)
, check_dead_on_(false)
, check_dead_step_(128)
, keep_conns_(false)
, check_step_(128)
, rpc_service_(NULL)
{
}
@ -61,12 +60,13 @@ connect_monitor& connect_monitor::set_conn_timeout(int n)
return *this;
}
connect_monitor& connect_monitor::set_check_idle(bool on,
bool kick_dead, size_t step)
connect_monitor& connect_monitor::set_check_conns(bool check_idle,
bool kick_dead, bool keep_conns, size_t step)
{
check_idle_on_ = on;
kick_dead_ = kick_dead;
check_idle_step_ = step;
check_idle_ = check_idle;
kick_dead_ = kick_dead;
keep_conns_ = keep_conns;
check_step_ = step;
return *this;
}

View File

@ -245,7 +245,8 @@ void connect_pool::bind_one(connect_client* conn)
lock_.unlock();
}
void connect_pool::put(connect_client* conn, bool keep /* = true */)
void connect_pool::put(connect_client* conn, bool keep /* = true */,
cpool_put_oper_t oper /* cpool_put_check_idle */)
{
time_t now = time(NULL);
@ -284,8 +285,15 @@ void connect_pool::put(connect_client* conn, bool keep /* = true */)
if (check_inter_ >= 0 && now - last_check_ >= check_inter_) {
lock_.unlock();
(void) check_idle(false, true);
if (oper & cpool_put_check_idle) {
(void) check_idle(idle_ttl_, true);
}
if (oper & cpool_put_check_dead) {
(void) check_dead();
}
if (oper & cpool_put_keep_conns) {
keep_conns();
}
} else {
lock_.unlock();
}
@ -326,17 +334,12 @@ void connect_pool::set_alive(bool yes /* true | false */)
lock_.unlock();
}
size_t connect_pool::check_idle(bool exclusive /* true */)
{
return check_idle(idle_ttl_, exclusive);
}
size_t connect_pool::check_idle(time_t ttl, bool exclusive /* true */)
{
return check_idle(ttl, false, exclusive);
}
size_t connect_pool::check_idle(bool kick_dead, bool exclusive /* true */)
{
return check_idle(idle_ttl_, kick_dead, exclusive);
}
size_t connect_pool::check_idle(time_t ttl, bool kick_dead, bool exclusive)
{
if (exclusive) {
lock_.lock();
@ -373,20 +376,10 @@ size_t connect_pool::check_idle(time_t ttl, bool kick_dead, bool exclusive)
n += kick_idle_conns(ttl);
}
size_t count = count_;
if (exclusive) {
lock_.unlock();
}
if (kick_dead) {
n += check_dead(count);
}
if (min_ > 0) {
keep_conns();
}
return n;
}
@ -412,9 +405,9 @@ size_t connect_pool::kick_idle_conns(time_t ttl)
}
// If min > 0, try to keep the minimal count of connections.
if (min_ > 0 && count_ <= min_) {
break;
}
//if (min_ > 0 && count_ <= min_) {
// break;
//}
// Decrease connections count only if the connection is mine.
if ((*it)->get_pool() == this) {
@ -537,12 +530,16 @@ void connect_pool::keep_conns()
break;
}
lock_.lock();
count_++;
lock_.unlock();
conn->set_pool(this);
put(conn, true);
lock_.lock();
count_++;
if (max_ > 0 && count_ >= max_) {
lock_.unlock();
break;
}
lock_.unlock();
}
}