make connections pool module supporting different network envirenment

This commit is contained in:
ubuntu14 2015-11-13 08:32:27 -08:00
parent e8548986f6
commit 9aad31fade
24 changed files with 74 additions and 105 deletions

View File

@ -17,8 +17,8 @@ CFLAGS = -c -g -W \
-D_POSIX_PTHREAD_SEMANTICS \ -D_POSIX_PTHREAD_SEMANTICS \
-Wno-long-long \ -Wno-long-long \
-Wformat \ -Wformat \
-DHAS_MYSQL_DLL -DHAS_MYSQL_DLL \
#-DHAS_POLARSSL -DHAS_POLARSSL
#-DUSE_DYNAMIC #-DUSE_DYNAMIC
# -Wcast-align # -Wcast-align
#-Wcast-qual #-Wcast-qual

View File

@ -2,7 +2,9 @@
------------------------------------------------------------------------ ------------------------------------------------------------------------
375) 2015.11.13 375) 2015.11.13
375.1) feature: 连接池模块增加了 set_timeout 方法,更加方便添加连接池的超时时间 375.1) feature: 连接池模块简化接口设计,允许连接集群管理器在添加新的连接池时设置超时时间,
从而使连接池集群管理器针对不同的服务器地址采用不同的超时时间,这样可以使连接池
模块适应于异构网络环境中
374) 2015.11.12 374) 2015.11.12
374.1) feature: http_request_manager 的构造函数增加了连接 http 服务器的超时时间 374.1) feature: http_request_manager 的构造函数增加了连接 http 服务器的超时时间

View File

@ -18,21 +18,9 @@ class connect_monitor;
class ACL_CPP_API connect_manager class ACL_CPP_API connect_manager
{ {
public: public:
/** connect_manager();
*
* @param conn_timeout {int} ()
* @param rw_timeout {int} IO ()
*/
connect_manager(int conn_timeout = 30, int rw_timeout = 30);
virtual ~connect_manager(); virtual ~connect_manager();
/**
* IO
* @param conn_timeout {int} ()
* @param rw_timeout {int} IO ()
*/
void set_timeout(int conn_timeout, int rw_timeout);
/** /**
* set * set
* @param default_addr {const char*} * @param default_addr {const char*}
@ -41,21 +29,26 @@ public:
* : IP:PORT:COUNT;IP:PORT:COUNT;IP:PORT;IP:PORT ... * : IP:PORT:COUNT;IP:PORT:COUNT;IP:PORT;IP:PORT ...
* IP:PORT:COUNT,IP:PORT:COUNT,IP:PORT;IP:PORT ... * IP:PORT:COUNT,IP:PORT:COUNT,IP:PORT;IP:PORT ...
* 127.0.0.1:7777:50;192.168.1.1:7777:10;127.0.0.1:7778 * 127.0.0.1:7777:50;192.168.1.1:7777:10;127.0.0.1:7778
* @param default_count {size_t} addr_list * @param count {size_t} addr_list
* COUNT 便 0 * COUNT 便 0
* @param conn_timeout {int} ()
* @param rw_timeout {int} IO ()
* default_addr addr_list * default_addr addr_list
*/ */
void init(const char* default_addr, const char* addr_list, void init(const char* default_addr, const char* addr_list,
size_t default_count); size_t count, int conn_timeout = 30, int rw_timeout = 30);
/** /**
* *
* @param addr {const char*} (ip:port) * @param addr {const char*} (ip:port)
* @param count {size_t} , 0 * @param count {size_t} , 0
* *
* @param conn_timeout {int} ()
* @param rw_timeout {int} IO ()
* @return {connect_pool&} * @return {connect_pool&}
*/ */
connect_pool& set(const char* addr, size_t count); connect_pool& set(const char* addr, size_t count,
int conn_timeout = 30, int rw_timeout = 30);
/** /**
* *
@ -185,8 +178,6 @@ protected:
protected: protected:
string default_addr_; // 缺省的服务地址 string default_addr_; // 缺省的服务地址
int conn_timeout_; // 连接服务器超时时间(秒)
int rw_timeout_; // 与服务器的 IO 超时时间(秒)
connect_pool* default_pool_; // 缺省的服务连接池 connect_pool* default_pool_; // 缺省的服务连接池
std::vector<connect_pool*> pools_; // 所有的服务连接池 std::vector<connect_pool*> pools_; // 所有的服务连接池
size_t service_idx_; // 下一个要访问的的下标值 size_t service_idx_; // 下一个要访问的的下标值
@ -196,7 +187,8 @@ protected:
connect_monitor* monitor_; // 后台检测线程句柄 connect_monitor* monitor_; // 后台检测线程句柄
// 设置除缺省服务之外的服务器集群 // 设置除缺省服务之外的服务器集群
void set_service_list(const char* addr_list, int count); void set_service_list(const char* addr_list, int count,
int conn_timeout, int rw_timeout);
}; };
} // namespace acl } // namespace acl

View File

@ -11,12 +11,7 @@ namespace acl
class ACL_CPP_API http_request_manager : public acl::connect_manager class ACL_CPP_API http_request_manager : public acl::connect_manager
{ {
public: public:
/** http_request_manager();
*
* @param conn_timeout {int} ()
* @param rw_timeout {int} IO ()
*/
http_request_manager(int conn_timeout = 30, int rw_timeout = 30);
virtual ~http_request_manager(); virtual ~http_request_manager();
protected: protected:

View File

@ -23,14 +23,9 @@ public:
/** /**
* ; * ;
* constructor * constructor
* @param conn_timeout {int} ();
* timeout in seconds for connecting the redis-server
* @param rw_timeout {int}  IO ();
* read/write timeout in seconds from/to the redis-server
* @param max_slot {int} ; the max hash-slot value of keys * @param max_slot {int} ; the max hash-slot value of keys
*/ */
redis_client_cluster(int conn_timeout = 30, int rw_timeout = 30, redis_client_cluster(int max_slot = 16384);
int max_slot = 16384);
virtual ~redis_client_cluster(); virtual ~redis_client_cluster();
/** /**

View File

@ -21,18 +21,19 @@ static void sleep_while(int n)
// 初始化过程 // 初始化过程
static void init(const char* addrs, int count) static void init(const char* addrs, int count)
{ {
int cocurrent = 100, conn_timeout = 100, rw_timeout = 200;
// 创建 HTTP 请求连接池集群管理对象 // 创建 HTTP 请求连接池集群管理对象
__conn_manager = new http_request_manager(); __conn_manager = new http_request_manager();
__conn_manager->set_timeout(100, 120);
// 添加服务器集群地址 // 添加服务器集群地址
__conn_manager->init(addrs, addrs, 100); __conn_manager->init(addrs, addrs, cocurrent, conn_timeout, rw_timeout);
printf(">>>start monitor thread\r\n"); printf(">>>start monitor thread\r\n");
int check_inter = 1, conn_timeout = 5; int check_inter = 1, check_conn_timeout = 5;
acl::connect_monitor* monitor = new acl::connect_monitor(*__conn_manager); acl::connect_monitor* monitor = new acl::connect_monitor(*__conn_manager);
(*monitor).set_check_inter(check_inter).set_conn_timeout(conn_timeout); (*monitor).set_check_inter(check_inter).set_conn_timeout(check_conn_timeout);
// 启动后台检测线程 // 启动后台检测线程
__conn_manager->start_monitor(monitor); __conn_manager->start_monitor(monitor);

View File

@ -88,8 +88,8 @@ int main(void)
int conn_timeout = 10, rw_timeout = 10, max_conns = 100; int conn_timeout = 10, rw_timeout = 10, max_conns = 100;
// declare redis cluster ojbect // declare redis cluster ojbect
acl::redis_client_cluster cluster(conn_timeout, rw_timeout); acl::redis_client_cluster cluster;
cluster.set(redis_addr, max_conns); cluster.set(redis_addr, max_conns, conn_timeout, rw_timeout);
// redis operation command // redis operation command
acl::redis_string cmd_string; acl::redis_string cmd_string;
@ -156,8 +156,8 @@ int main(void)
int conn_timeout = 10, rw_timeout = 10, max_conns = 100; int conn_timeout = 10, rw_timeout = 10, max_conns = 100;
// declare redis cluster ojbect // declare redis cluster ojbect
acl::redis_client_cluster cluster(conn_timeout, rw_timeout); acl::redis_client_cluster cluster;
cluster.set(redis_addr, max_conns); cluster.set(redis_addr, max_conns, conn_timeout, rw_timeout);
// redis operation command // redis operation command
acl::redis cmd; acl::redis cmd;
@ -204,8 +204,8 @@ int main(void)
int conn_timeout = 10, rw_timeout = 10; int conn_timeout = 10, rw_timeout = 10;
// declare redis cluster ojbect // declare redis cluster ojbect
acl::redis_client_cluster cluster(conn_timeout, rw_timeout); acl::redis_client_cluster cluster;
cluster.set(redis_addr, __max_conns); cluster.set(redis_addr, __max_conns, conn_timeout, rw_timeout);
pthread_attr_t attr; pthread_attr_t attr;
pthread_attr_init(&attr); pthread_attr_init(&attr);

View File

@ -292,7 +292,7 @@ int main(int argc, char* argv[])
acl::acl_cpp_init(); acl::acl_cpp_init();
acl::log::stdout_open(true); acl::log::stdout_open(true);
acl::redis_client_cluster cluster(conn_timeout, rw_timeout); acl::redis_client_cluster cluster;
// 当某个连接池结点出问题,设置探测该连接结点是否恢复的时间间隔(秒),当该值 // 当某个连接池结点出问题,设置探测该连接结点是否恢复的时间间隔(秒),当该值
// 为 0 时,则不检测 // 为 0 时,则不检测
@ -304,7 +304,7 @@ int main(int argc, char* argv[])
// 当重定向次数 >= 2 时每次再重定向此函数设置休息的时间(毫秒) // 当重定向次数 >= 2 时每次再重定向此函数设置休息的时间(毫秒)
cluster.set_redirect_sleep(nsleep); cluster.set_redirect_sleep(nsleep);
cluster.init(NULL, addrs.c_str(), max_threads); cluster.init(NULL, addrs.c_str(), max_threads, conn_timeout, rw_timeout);
// 设置连接 redis 集群的密码,第一个参数为一个 redis 服务节点的服务地址, // 设置连接 redis 集群的密码,第一个参数为一个 redis 服务节点的服务地址,
// 当第一个参数值为 default 时,则设置了所有节点的统一连接密码 // 当第一个参数值为 default 时,则设置了所有节点的统一连接密码

View File

@ -360,7 +360,7 @@ int main(int argc, char* argv[])
acl::acl_cpp_init(); acl::acl_cpp_init();
acl::log::stdout_open(true); acl::log::stdout_open(true);
acl::redis_client_cluster cluster(conn_timeout, rw_timeout); acl::redis_client_cluster cluster;
// 当某个连接池结点出问题,设置探测该连接结点是否恢复的时间间隔(秒),当该值 // 当某个连接池结点出问题,设置探测该连接结点是否恢复的时间间隔(秒),当该值
// 为 0 时,则不检测 // 为 0 时,则不检测
@ -372,7 +372,7 @@ int main(int argc, char* argv[])
// 当重定向次数 >= 2 时每次再重定向此函数设置休息的时间(毫秒) // 当重定向次数 >= 2 时每次再重定向此函数设置休息的时间(毫秒)
cluster.set_redirect_sleep(nsleep); cluster.set_redirect_sleep(nsleep);
cluster.init(NULL, addrs.c_str(), max_threads); cluster.init(NULL, addrs.c_str(), max_threads, conn_timeout, rw_timeout);
// 是否需要将所有哈希槽的对应关系提前设置好,这样可以去掉运行时动态添加 // 是否需要将所有哈希槽的对应关系提前设置好,这样可以去掉运行时动态添加
// 哈希槽的过程,从而可以提高运行时的效率 // 哈希槽的过程,从而可以提高运行时的效率

View File

@ -100,7 +100,7 @@ static bool test_info(acl::redis_cluster& redis)
static bool preset_all(const char* addr) static bool preset_all(const char* addr)
{ {
int max_slot = 16384; int max_slot = 16384;
acl::redis_client_cluster cluster(10, 10, max_slot); acl::redis_client_cluster cluster(max_slot);
cluster.set_all_slot(addr, 100); cluster.set_all_slot(addr, 100);
for (int i = 0; i < max_slot; i++) for (int i = 0; i < max_slot; i++)

View File

@ -254,8 +254,8 @@ int main(int argc, char* argv[])
acl::acl_cpp_init(); acl::acl_cpp_init();
acl::redis_client_cluster cluster(conn_timeout, rw_timeout); acl::redis_client_cluster cluster;
cluster.set(addr.c_str(), 100); cluster.set(addr.c_str(), 100, conn_timeout, rw_timeout);
acl::redis redis; acl::redis redis;
redis.set_cluster(&cluster, 100); redis.set_cluster(&cluster, 100);

View File

@ -413,8 +413,8 @@ int main(int argc, char* argv[])
acl::acl_cpp_init(); acl::acl_cpp_init();
acl::redis_client_cluster cluster(conn_timeout, rw_timeout); acl::redis_client_cluster cluster;
cluster.set(addr.c_str(), 100); cluster.set(addr.c_str(), 100, conn_timeout, rw_timeout);
acl::redis_client client(addr.c_str(), conn_timeout, rw_timeout); acl::redis_client client(addr.c_str(), conn_timeout, rw_timeout);
client.set_slice_request(slice_req); client.set_slice_request(slice_req);

View File

@ -143,8 +143,8 @@ int main(int argc, char* argv[])
acl::acl_cpp_init(); acl::acl_cpp_init();
acl::redis_client_cluster cluster(conn_timeout, rw_timeout); acl::redis_client_cluster cluster;
cluster.set(addr.c_str(), 100); cluster.set(addr.c_str(), 100, conn_timeout, rw_timeout);
acl::redis_client client(addr.c_str(), conn_timeout, rw_timeout); acl::redis_client client(addr.c_str(), conn_timeout, rw_timeout);
client.set_slice_request(slice_req); client.set_slice_request(slice_req);

View File

@ -157,8 +157,8 @@ int main(int argc, char* argv[])
acl::acl_cpp_init(); acl::acl_cpp_init();
acl::redis_client_cluster cluster(conn_timeout, rw_timeout); acl::redis_client_cluster cluster;
cluster.set(addr.c_str(), 100); cluster.set(addr.c_str(), 100, conn_timeout, rw_timeout);
acl::redis_client client(addr.c_str(), conn_timeout, rw_timeout); acl::redis_client client(addr.c_str(), conn_timeout, rw_timeout);

View File

@ -746,8 +746,8 @@ int main(int argc, char* argv[])
acl::acl_cpp_init(); acl::acl_cpp_init();
acl::redis_client_cluster cluster(conn_timeout, rw_timeout); acl::redis_client_cluster cluster;
cluster.set(addr.c_str(), 100); cluster.set(addr.c_str(), 100, conn_timeout, rw_timeout);
acl::redis_client client(addr.c_str(), conn_timeout, rw_timeout); acl::redis_client client(addr.c_str(), conn_timeout, rw_timeout);

View File

@ -217,8 +217,8 @@ int main(int argc, char* argv[])
acl::acl_cpp_init(); acl::acl_cpp_init();
acl::redis_client_cluster manager(conn_timeout, rw_timeout); acl::redis_client_cluster manager;
manager.set(addr.c_str(), max_threads); manager.set(addr.c_str(), max_threads, conn_timeout, rw_timeout);
std::vector<test_thread*> threads; std::vector<test_thread*> threads;
for (int i = 0; i < max_threads; i++) for (int i = 0; i < max_threads; i++)

View File

@ -163,8 +163,8 @@ int main(int argc, char* argv[])
acl::acl_cpp_init(); acl::acl_cpp_init();
acl::log::stdout_open(true); acl::log::stdout_open(true);
acl::redis_client_cluster cluster(conn_timeout, rw_timeout); acl::redis_client_cluster cluster;
cluster.set(addr.c_str(), 100); cluster.set(addr.c_str(), 100, conn_timeout, rw_timeout);
acl::redis_client client(addr.c_str(), conn_timeout, rw_timeout); acl::redis_client client(addr.c_str(), conn_timeout, rw_timeout);

View File

@ -444,8 +444,8 @@ int main(int argc, char* argv[])
acl::acl_cpp_init(); acl::acl_cpp_init();
acl::redis_client_cluster cluster(conn_timeout, rw_timeout); acl::redis_client_cluster cluster;
cluster.set(addr.c_str(), 100); cluster.set(addr.c_str(), 100, conn_timeout, rw_timeout);
acl::redis_client client(addr.c_str(), conn_timeout, rw_timeout); acl::redis_client client(addr.c_str(), conn_timeout, rw_timeout);

View File

@ -699,8 +699,8 @@ int main(int argc, char* argv[])
acl::acl_cpp_init(); acl::acl_cpp_init();
acl::log::stdout_open(true); acl::log::stdout_open(true);
acl::redis_client_cluster cluster(conn_timeout, rw_timeout); acl::redis_client_cluster cluster;
cluster.set(addr.c_str(), 100); cluster.set(addr.c_str(), 100, conn_timeout, rw_timeout);
acl::redis_client client(addr.c_str(), conn_timeout, rw_timeout); acl::redis_client client(addr.c_str(), conn_timeout, rw_timeout);

View File

@ -586,8 +586,8 @@ int main(int argc, char* argv[])
acl::acl_cpp_init(); acl::acl_cpp_init();
acl::redis_client_cluster cluster(conn_timeout, rw_timeout); acl::redis_client_cluster cluster;
cluster.set(addr.c_str(), 100); cluster.set(addr.c_str(), 100, conn_timeout, rw_timeout);
acl::redis_client client(addr.c_str(), conn_timeout, rw_timeout); acl::redis_client client(addr.c_str(), conn_timeout, rw_timeout);

View File

@ -91,8 +91,8 @@ private:
bool test_redis_session(const char* addr, int n, int max_threads) bool test_redis_session(const char* addr, int n, int max_threads)
{ {
int conn_timeout = 10, rw_timeout = 10; int conn_timeout = 10, rw_timeout = 10;
acl::redis_client_cluster cluster(conn_timeout, rw_timeout); acl::redis_client_cluster cluster;
cluster.set(addr, max_threads); cluster.set(addr, max_threads, conn_timeout, rw_timeout);
std::vector<test_thread*> threads; std::vector<test_thread*> threads;
for (int i = 0; i < max_threads; i++) for (int i = 0; i < max_threads; i++)
@ -146,8 +146,8 @@ bool test_redis_session_attrs(const char* addr, int n)
attrs[name] = value; attrs[name] = value;
} }
acl::redis_client_cluster cluster(10, 10); acl::redis_client_cluster cluster;
cluster.set(addr, 1); cluster.set(addr, 1, 10, 10);
acl::redis_session sess(cluster, 1); acl::redis_session sess(cluster, 1);
if (sess.set_attrs(attrs) == false) if (sess.set_attrs(attrs) == false)

View File

@ -9,10 +9,8 @@
namespace acl namespace acl
{ {
connect_manager::connect_manager(int conn_timeout, int rw_timeout) connect_manager::connect_manager()
: conn_timeout_(conn_timeout) : default_pool_(NULL)
, rw_timeout_(rw_timeout)
, default_pool_(NULL)
, service_idx_(0) , service_idx_(0)
, stat_inter_(1) , stat_inter_(1)
, retry_inter_(1) , retry_inter_(1)
@ -33,18 +31,6 @@ connect_manager::~connect_manager()
lock_.unlock(); lock_.unlock();
} }
void connect_manager::set_timeout(int conn_timeout, int rw_timeout)
{
conn_timeout_ = conn_timeout;
rw_timeout_ = rw_timeout;
for (std::vector<connect_pool*>::iterator it = pools_.begin();
it != pools_.end(); ++it)
{
(*it)->set_timeout(conn_timeout, rw_timeout);
}
}
// 分析一个服务器地址格式IP:PORT[:MAX_CONN] // 分析一个服务器地址格式IP:PORT[:MAX_CONN]
// 返回值 < 0 表示非法的地址 // 返回值 < 0 表示非法的地址
static int check_addr(const char* addr, string& buf, size_t default_count) static int check_addr(const char* addr, string& buf, size_t default_count)
@ -93,11 +79,11 @@ void connect_manager::set_retry_inter(int n)
lock_.unlock(); lock_.unlock();
} }
void connect_manager::init(const char* default_addr, void connect_manager::init(const char* default_addr, const char* addr_list,
const char* addr_list, size_t count) size_t count, int conn_timeout /* = 30 */, int rw_timeout /* = 30 */)
{ {
if (addr_list != NULL && *addr_list != 0) if (addr_list != NULL && *addr_list != 0)
set_service_list(addr_list, count); set_service_list(addr_list, count, conn_timeout, rw_timeout);
// 创建缺省服务连接池对象,该对象一同放入总的连接池集群中 // 创建缺省服务连接池对象,该对象一同放入总的连接池集群中
if (default_addr != NULL && *default_addr != 0) if (default_addr != NULL && *default_addr != 0)
@ -107,8 +93,8 @@ void connect_manager::init(const char* default_addr,
if (max < 0) if (max < 0)
logger("no default connection set"); logger("no default connection set");
else else
default_pool_ = &set(default_addr_.c_str(), default_pool_ = &set(default_addr_.c_str(), max,
(size_t) max); conn_timeout, rw_timeout);
} }
else else
logger("no default connection set"); logger("no default connection set");
@ -118,7 +104,8 @@ void connect_manager::init(const char* default_addr,
logger_fatal("no connection available!"); logger_fatal("no connection available!");
} }
void connect_manager::set_service_list(const char* addr_list, int count) void connect_manager::set_service_list(const char* addr_list, int count,
int conn_timeout, int rw_timeout)
{ {
if (addr_list == NULL || *addr_list == 0) if (addr_list == NULL || *addr_list == 0)
{ {
@ -141,7 +128,7 @@ void connect_manager::set_service_list(const char* addr_list, int count)
logger_error("invalid server addr: %s", addr.c_str()); logger_error("invalid server addr: %s", addr.c_str());
continue; continue;
} }
(void) set(addr.c_str(), max); (void) set(addr.c_str(), max, conn_timeout, rw_timeout);
logger("add one service: %s, max connect: %d", logger("add one service: %s, max connect: %d",
addr.c_str(), max); addr.c_str(), max);
} }
@ -149,7 +136,8 @@ void connect_manager::set_service_list(const char* addr_list, int count)
acl_myfree(buf); acl_myfree(buf);
} }
connect_pool& connect_manager::set(const char* addr, size_t count) connect_pool& connect_manager::set(const char* addr, size_t count,
int conn_timeout /* = 30 */, int rw_timeout /* = 30 */)
{ {
char key[256]; char key[256];
ACL_SAFE_STRNCPY(key, addr, sizeof(key)); ACL_SAFE_STRNCPY(key, addr, sizeof(key));
@ -169,7 +157,7 @@ connect_pool& connect_manager::set(const char* addr, size_t count)
connect_pool* pool = create_pool(key, count, pools_.size() - 1); connect_pool* pool = create_pool(key, count, pools_.size() - 1);
pool->set_retry_inter(retry_inter_); pool->set_retry_inter(retry_inter_);
pool->set_timeout(conn_timeout_, rw_timeout_); pool->set_timeout(conn_timeout, rw_timeout);
pools_.push_back(pool); pools_.push_back(pool);
lock_.unlock(); lock_.unlock();

View File

@ -5,9 +5,7 @@
namespace acl namespace acl
{ {
http_request_manager::http_request_manager( http_request_manager::http_request_manager()
int conn_timeout /* = 30 */, int rw_timeout /* = 30 */)
: connect_manager(conn_timeout, rw_timeout)
{ {
} }

View File

@ -10,10 +10,8 @@
namespace acl namespace acl
{ {
redis_client_cluster::redis_client_cluster(int conn_timeout /* = 30 */, redis_client_cluster::redis_client_cluster(int max_slot /* = 16384 */)
int rw_timeout /* = 30 */, int max_slot /* = 16384 */) : max_slot_(max_slot)
: connect_manager(conn_timeout, rw_timeout)
, max_slot_(max_slot)
, redirect_max_(15) , redirect_max_(15)
, redirect_sleep_(100) , redirect_sleep_(100)
{ {