connect_manager: support fiber running in threads

This commit is contained in:
zsx 2018-12-16 16:47:51 +08:00
parent d9bdd8aac6
commit bd6375fe2a
8 changed files with 91 additions and 34 deletions

View File

@ -24,7 +24,7 @@
#include "init.h"
static char *version = "3.4.1-36 20181206";
static char *version = "3.4.1-37 20181216";
const char *acl_version(void)
{

View File

@ -1,5 +1,9 @@
修改历史列表:
556) 2018.12.16
556.1) feature: connect_manager 连接池管理器可以更好地支持协程的多线程模式,
使用者仅需调用 connect_manager::bind_thread(true) 即可
555) 2018.12.12
555.1) bugfix: redis_stream::get_one_message 存在一处数据提取的错误

View File

@ -215,14 +215,18 @@ void connect_manager::thread_onexit(void* ctx)
}
delete mit->second;
manager->manager_.erase(mit);
//printf("thread id=%lu, %lu exit\r\n", id, pthread_self());
}
static acl_pthread_key_t once_key;
void connect_manager::thread_oninit(void)
{
if (acl_pthread_key_create(&once_key, thread_onexit) != 0) {
logger_fatal("pthread_key_create failed");
int ret = acl_pthread_key_create(&once_key, thread_onexit);
if (ret != 0) {
char buf[256];
logger_fatal("pthread_key_create error=%s",
acl_strerror(ret, buf, sizeof(buf)));
}
}
@ -231,18 +235,26 @@ static acl_pthread_once_t once_control = ACL_PTHREAD_ONCE_INIT;
conns_pools& connect_manager::get_pools_by_id(unsigned long id)
{
manager_it mit = manager_.find(id);
if (mit == manager_.end()) {
if (acl_pthread_once(&once_control, thread_oninit) != 0) {
logger_fatal("pthread_once failed");
}
acl_pthread_setspecific(once_key, this);
conns_pools* pools = new conns_pools;
manager_[id] = pools;
return *pools;
} else {
if (mit != manager_.end()) {
return *mit->second;
}
conns_pools* pools = new conns_pools;
manager_[id] = pools;
//printf("thread id=%lu create pools, %lu\r\n", id, pthread_self());
if (id == DEFAULT_ID) {
return *pools;
}
int ret = acl_pthread_once(&once_control, thread_oninit);
if (ret != 0) {
char buf[256];
logger_fatal("pthread_once error=%s",
acl_strerror(ret, buf, sizeof(buf)));
}
acl_pthread_setspecific(once_key, this);
return *pools;
}
void connect_manager::remove(pools_t& pools, const char* addr)

View File

@ -21,7 +21,7 @@ CFLAGS = -c -g -W \
-DUSE_JMP \
-Wmissing-prototypes \
-Wcast-qual \
#-DUSE_VALGRIND \
-DUSE_VALGRIND \
#-I/usr/local/include
#-DUSE_PRINTF_MACRO
#-Wno-clobbered

View File

@ -7,6 +7,7 @@ static void usage(const char *procname)
{
printf("usage: %s -h [help]\r\n"
" -s redis_addr\r\n"
" -g [if use global cluster, default: false]\r\n"
" -p passwd\r\n"
" -n operation_count\r\n"
" -c fibers count\r\n"
@ -20,8 +21,9 @@ int main(int argc, char *argv[])
int ch, nthreads = 1, fibers_max = 100, oper_count = 100;
acl::string addr("127.0.0.1:6379"), passwd;
int conn_timeout = 2, rw_timeout = 2;
bool use_global_cluster = false;
while ((ch = getopt(argc, argv, "hs:n:c:r:t:m:p:")) > 0)
while ((ch = getopt(argc, argv, "hs:n:c:r:t:m:p:g")) > 0)
{
switch (ch) {
case 'h':
@ -48,22 +50,37 @@ int main(int argc, char *argv[])
case 'p':
passwd = optarg;
break;
case 'g':
use_global_cluster = true;
break;
default:
break;
}
}
acl::acl_cpp_init();
acl::log::stdout_open(true);
acl_fiber_msg_stdout_enable(1);
std::vector<acl::thread*> threads;
int stack_size = STACK_SIZE;
acl::redis_client_cluster cluster;
cluster.bind_thread(true);
cluster.set(addr, 0, conn_timeout, rw_timeout);
cluster.set_password("default", passwd);
for (int i = 0; i < nthreads; i++)
{
redis_thread* thread = new redis_thread(addr, passwd,
conn_timeout, rw_timeout, fibers_max,
stack_size, oper_count);
redis_thread* thread;
if (use_global_cluster) {
thread = new redis_thread(cluster,
fibers_max, stack_size, oper_count);
} else {
thread = new redis_thread(addr, passwd,
conn_timeout, rw_timeout, fibers_max,
stack_size, oper_count);
}
thread->set_detachable(false);
thread->set_stacksize(stack_size * (fibers_max + 6400));
threads.push_back(thread);

View File

@ -109,23 +109,41 @@ void redis_thread::fiber_redis(ACL_FIBER *fiber, void *ctx)
redis_thread::redis_thread(const char* addr, const char* passwd,
int conn_timeout, int rw_timeout, int fibers_max, int stack_size,
int oper_count)
: addr_(addr)
, passwd_(passwd)
, conn_timeout_(conn_timeout)
, rw_timeout_(rw_timeout)
, fibers_max_(fibers_max)
, fibers_cnt_(fibers_max)
, stack_size_(stack_size)
, oper_count_(oper_count)
: addr_(addr)
, passwd_(passwd)
, conn_timeout_(conn_timeout)
, rw_timeout_(rw_timeout)
, fibers_max_(fibers_max)
, fibers_cnt_(fibers_max)
, stack_size_(stack_size)
, oper_count_(oper_count)
{
printf("addr: %s\r\n", addr_.c_str());
cluster_internal_ = new acl::redis_client_cluster;
cluster_ = cluster_internal_;
cluster_->set(addr_.c_str(), 0, conn_timeout_, rw_timeout_);
cluster_->set_password("default", passwd_);
}
redis_thread::redis_thread(acl::redis_client_cluster& cluster,
int fibers_max, int stack_size, int oper_count)
: fibers_max_(fibers_max)
, fibers_cnt_(fibers_max)
, stack_size_(stack_size)
, oper_count_(oper_count)
, cluster_(&cluster)
, cluster_internal_(NULL)
{
}
redis_thread::~redis_thread(void)
{
delete cluster_internal_;
}
void* redis_thread::run(void)
{
printf("addr: %s\r\n", addr_.c_str());
cluster_.set(addr_.c_str(), 0, conn_timeout_, rw_timeout_);
cluster_.set_password("default", passwd_);
gettimeofday(&begin_, NULL);
for (int i = 0; i < fibers_max_; i++)

View File

@ -3,13 +3,15 @@ class redis_thread : public acl::thread
public:
redis_thread(const char* addr, const char* passwd, int conn_timeout,
int rw_timeout, int fibers_max, int stack_size, int oper_count);
redis_thread(acl::redis_client_cluster& cluster,
int fibers_max, int stack_size, int oper_count);
~redis_thread(void) {}
~redis_thread(void);
private:
acl::redis_client_cluster& get_cluster(void)
{
return cluster_;
return *cluster_;
}
int get_fibers_max(void) const
@ -45,7 +47,8 @@ private:
int stack_size_;
int oper_count_;
struct timeval begin_;
acl::redis_client_cluster cluster_;
acl::redis_client_cluster* cluster_;
acl::redis_client_cluster* cluster_internal_;
static void fiber_redis(ACL_FIBER *fiber, void *ctx);
};

View File

@ -1,4 +1,4 @@
%define release_id 36
%define release_id 37
Summary: The powerful c/c++ library and server framework
Name: acl-libs
@ -133,6 +133,9 @@ fi
%changelog
* Sun Dec 16 2018 zhengshuxin@qiyi.com 3.4.1-37-20181216.16
- connect_manager: support fiber in thread mode
* Thu Dec 06 2018 zhengshuxin@qiyi.com 3.4.1-36-20181206.15
- acl_threads_server.c: fixed one crashed bug in client_wakeup
- fiber_tbox.hpp: make push more safety