add ACL_ATOMIC_CLOCK object for statics of running status

This commit is contained in:
zhengshuxin 2017-07-03 14:40:06 +08:00
parent 23080968f0
commit d6385832d6
6 changed files with 154 additions and 86 deletions

View File

@ -19,6 +19,17 @@ ACL_API void acl_atomic_int64_set(ACL_ATOMIC *self, long long n);
ACL_API long long acl_atomic_int64_fetch_add(ACL_ATOMIC *self, long long n);
ACL_API long long acl_atomic_int64_add_fetch(ACL_ATOMIC *self, long long n);
typedef struct ACL_ATOMIC_CLOCK ACL_ATOMIC_CLOCK;
ACL_API ACL_ATOMIC_CLOCK *acl_atomic_clock_alloc(void);
ACL_API void acl_atomic_clock_free(ACL_ATOMIC_CLOCK *clk);
ACL_API long long acl_atomic_clock_count_add(ACL_ATOMIC_CLOCK *clk, int n);
ACL_API long long acl_atomic_clock_users_add(ACL_ATOMIC_CLOCK *clk, int n);
ACL_API void acl_atomic_clock_users_count_inc(ACL_ATOMIC_CLOCK *clk);
ACL_API long long acl_atomic_clock_count(ACL_ATOMIC_CLOCK *clk);
ACL_API long long acl_atomic_clock_atime(ACL_ATOMIC_CLOCK *clk);
ACL_API long long acl_atomic_clock_users(ACL_ATOMIC_CLOCK *clk);
#ifdef __cplusplus
}
#endif

View File

@ -40,6 +40,7 @@
#include "stdlib/acl_stringops.h"
#include "stdlib/acl_myflock.h"
#include "stdlib/acl_argv.h"
#include "stdlib/acl_atomic.h"
#include "net/acl_sane_socket.h"
#include "net/acl_vstream_net.h"
#include "net/acl_ifconf.h"
@ -154,55 +155,7 @@ static char **__service_argv;
static void *__service_ctx;
static int __daemon_mode = 1;
typedef struct {
long long last;
long long used;
ACL_ATOMIC *time_atomic;
ACL_ATOMIC *used_atomic;
} RUNNING_STATUS;
static RUNNING_STATUS *__status = NULL;
static void running_status_init(void)
{
acl_assert(__status == NULL);
__status = (RUNNING_STATUS *) acl_mymalloc(sizeof(RUNNING_STATUS));
__status->last = (long long) time(NULL);
__status->used = 0;
__status->time_atomic = acl_atomic_new();
__status->used_atomic = acl_atomic_new();
acl_atomic_set(__status->time_atomic, &__status->last);
acl_atomic_set(__status->used_atomic, &__status->used);
}
static void running_status_free(void)
{
acl_assert(__status);
acl_atomic_free(__status->time_atomic);
acl_atomic_free(__status->used_atomic);
acl_myfree(__status);
__status = NULL;
}
static long long status_last(void)
{
acl_assert(__status);
return __status->last;
}
static long long status_increase_used(void)
{
long long n;
acl_assert(__status);
n = acl_atomic_int64_add_fetch(__status->used_atomic, 1);
if (n < 0)
acl_atomic_int64_set(__status->used_atomic, 0);
acl_atomic_int64_set(__status->time_atomic, (long long) time(NULL));
return n;
}
static ACL_ATOMIC_CLOCK *__clock = NULL;
ACL_EVENT *acl_udp_server_event(void)
{
@ -283,10 +236,10 @@ static void udp_server_exit(void)
if (__main_event)
acl_event_free(__main_event);
running_status_free();
acl_atomic_clock_free(__clock);
__clock = NULL;
acl_msg_close();
exit(0);
}
@ -323,7 +276,7 @@ static void udp_server_read(int event_type, ACL_EVENT *event acl_unused,
/* 清除发生在 UDP 套接字上的临时性错误,以免事件引擎报错 */
stream->flag = 0;
status_increase_used();
acl_atomic_clock_count_add(__clock, 1);
}
static void udp_server_init(const char *procname)
@ -581,7 +534,7 @@ static void udp_server_timeout(int type acl_unused,
{
const char *myname = "udp_server_timeout";
time_t now = time(NULL);
long long last = status_last();
long long last = acl_atomic_clock_atime(__clock) / 1000000;
long long time_left = (long long) ((acl_event_cancel_timer(event,
udp_server_timeout, event) + 999999) / 1000000);
@ -631,7 +584,7 @@ static void servers_start(SERVER *servers, int nthreads)
acl_pthread_attr_t attr;
int i;
running_status_init();
__clock = acl_atomic_clock_alloc();
acl_pthread_attr_init(&attr);
acl_pthread_attr_setdetachstate(&attr, ACL_PTHREAD_CREATE_DETACHED);

View File

@ -182,3 +182,95 @@ long long acl_atomic_int64_add_fetch(ACL_ATOMIC *self, long long n)
# endif
#endif
}
/****************************************************************************/
struct ACL_ATOMIC_CLOCK {
long long atime;
long long count;
long long users;
ACL_ATOMIC *atime_atomic;
ACL_ATOMIC *count_atomic;
ACL_ATOMIC *users_atomic;
};
ACL_ATOMIC_CLOCK *acl_atomic_clock_alloc(void)
{
ACL_ATOMIC_CLOCK *clk = (ACL_ATOMIC_CLOCK *)
acl_mycalloc(1, sizeof(ACL_ATOMIC_CLOCK));
struct timeval tv;
gettimeofday(&tv, NULL);
clk->atime = (long long) tv.tv_sec * 1000000 + (long long) tv.tv_usec;
clk->count = 0;
clk->users = 0;
clk->atime_atomic = acl_atomic_new();
clk->count_atomic = acl_atomic_new();
clk->users_atomic = acl_atomic_new();
acl_atomic_set(clk->atime_atomic, &clk->atime);
acl_atomic_set(clk->count_atomic, &clk->count);
acl_atomic_set(clk->users_atomic, &clk->users);
return clk;
}
void acl_atomic_clock_free(ACL_ATOMIC_CLOCK *clk)
{
acl_atomic_free(clk->atime_atomic);
acl_atomic_free(clk->count_atomic);
acl_atomic_free(clk->users_atomic);
acl_myfree(clk);
}
long long acl_atomic_clock_count_add(ACL_ATOMIC_CLOCK *clk, int n)
{
long long now;
struct timeval tv;
gettimeofday(&tv, NULL);
now = (long long) tv.tv_sec * 1000000 + (long long) tv.tv_usec;
acl_atomic_int64_set(clk->atime_atomic, now);
return acl_atomic_int64_add_fetch(clk->count_atomic, n);
}
long long acl_atomic_clock_users_add(ACL_ATOMIC_CLOCK *clk, int n)
{
long long now;
struct timeval tv;
gettimeofday(&tv, NULL);
now = (long long) tv.tv_sec * 1000000 + (long long) tv.tv_usec;
acl_atomic_int64_set(clk->atime_atomic, now);
return acl_atomic_int64_add_fetch(clk->users_atomic, n);
}
void acl_atomic_clock_users_count_inc(ACL_ATOMIC_CLOCK *clk)
{
long long now;
struct timeval tv;
gettimeofday(&tv, NULL);
now = (long long) tv.tv_sec * 1000000 + (long long) tv.tv_usec;
acl_atomic_int64_set(clk->atime_atomic, now);
acl_atomic_int64_add_fetch(clk->count_atomic, 1);
acl_atomic_int64_add_fetch(clk->users_atomic, 1);
}
long long acl_atomic_clock_count(ACL_ATOMIC_CLOCK *clk)
{
return acl_atomic_int64_add_fetch(clk->count_atomic, 0);
}
long long acl_atomic_clock_atime(ACL_ATOMIC_CLOCK *clk)
{
return acl_atomic_int64_add_fetch(clk->atime_atomic, 0);
}
long long acl_atomic_clock_users(ACL_ATOMIC_CLOCK *clk)
{
return acl_atomic_int64_add_fetch(clk->users_atomic, 0);
}

View File

@ -185,6 +185,8 @@ static void fiber_io_loop(ACL_FIBER *self acl_unused, void *ctx)
}
}
#define CHECK_MIN
unsigned int acl_fiber_delay(unsigned int milliseconds)
{
acl_int64 when, now;
@ -218,10 +220,10 @@ unsigned int acl_fiber_delay(unsigned int milliseconds)
#ifdef CHECK_MIN
if ((min >= 0 && min < ev->timeout) || ev->timeout < 0)
ev->timeout = (int) min;
#else
ev->timeout = 10;
#endif
ev->timeout = 1;
fiber = acl_fiber_running();
fiber->when = when;
acl_ring_detach(&fiber->me);
@ -237,6 +239,8 @@ unsigned int acl_fiber_delay(unsigned int milliseconds)
if (acl_ring_size(&__thread_fiber->ev_timer) == 0)
ev->timeout = -1;
else
ev->timeout = min;
SET_TIME(now);
if (now < when)

View File

@ -32,7 +32,7 @@ static ACL_CONFIG_INT_TABLE __conf_int_tab[] = {
{ "fiber_use_limit", 0, &acl_var_fiber_use_limit, 0, 0 },
{ "fiber_idle_limit", 0, &acl_var_fiber_idle_limit, 0 , 0 },
{ "fiber_wait_limit", 0, &acl_var_fiber_wait_limit, 0, 0 },
{ "fiber_threads", 2, &acl_var_fiber_threads, 0, 0 },
{ "fiber_threads", 1, &acl_var_fiber_threads, 0, 0 },
{ 0, 0, 0, 0, 0 },
};
@ -74,10 +74,9 @@ static char *__deny_info = NULL;
static ACL_MASTER_SERVER_LISTEN_FN __server_on_listen = NULL;
static unsigned __server_generation;
static int __server_stopping = 0;
static int __nclients = 0;
static unsigned __nused = 0;
static ACL_ATOMIC_CLOCK *__clock = NULL;
typedef struct FIBER_SERVER {
acl_pthread_t tid;
@ -119,11 +118,12 @@ static void fiber_monitor_master(ACL_FIBER *fiber, void *ctx)
stat_stream->rw_timeout = 0;
ret = acl_vstream_read(stat_stream, buf, sizeof(buf));
acl_msg_info("%s(%d), %s: disconnect(%d) from acl_master, clients %d",
__FILE__, __LINE__, __FUNCTION__, ret, __nclients);
acl_msg_info("%s(%d), %s: disconnect(%d) from acl_master, clients %lld",
__FILE__, __LINE__, __FUNCTION__,
ret, acl_atomic_clock_users(__clock));
while (!acl_var_fiber_quick_abort) {
if (__nclients <= 0) {
if (acl_atomic_clock_users(__clock) <= 0) {
acl_msg_warn("%s(%d), %s: all clients closed!",
__FILE__, __LINE__, __FUNCTION__);
break;
@ -131,13 +131,17 @@ static void fiber_monitor_master(ACL_FIBER *fiber, void *ctx)
acl_fiber_sleep(1);
n++;
if (acl_var_fiber_wait_limit > 0 && n >= acl_var_fiber_wait_limit) {
acl_msg_warn("%s(%d), %s: too long, clients: %d",
__FILE__, __LINE__, __FUNCTION__, __nclients);
if (acl_var_fiber_wait_limit > 0 &&
n >= acl_var_fiber_wait_limit) {
acl_msg_warn("%s(%d), %s: too long, clients: %lld",
__FILE__, __LINE__, __FUNCTION__,
acl_atomic_clock_users(__clock));
break;
}
acl_msg_info("%s(%d), %s: waiting %d, clients %d",
__FILE__, __LINE__, __FUNCTION__, n, __nclients);
acl_msg_info("%s(%d), %s: waiting %d, clients %lld",
__FILE__, __LINE__, __FUNCTION__,
n, acl_atomic_clock_users(__clock));
}
server_exit(fiber, 0);
@ -153,12 +157,14 @@ static void fiber_monitor_used(ACL_FIBER *fiber, void *ctx acl_unused)
}
while (!__server_stopping) {
if (__nclients > 0) {
if (acl_atomic_clock_users(__clock) > 0) {
acl_fiber_sleep(1);
continue;
}
if (__nused >= (unsigned) acl_var_fiber_use_limit) {
if (acl_atomic_clock_count(__clock) >=
(unsigned) acl_var_fiber_use_limit) {
acl_msg_info("%s(%d), %s: use_limit reached %d",
__FILE__, __LINE__, __FUNCTION__,
acl_var_fiber_use_limit);
@ -175,16 +181,18 @@ static void fiber_monitor_idle(ACL_FIBER *fiber, void *ctx acl_unused)
time_t last = time(NULL);
while (!__server_stopping) {
if (__nclients > 0) {
if (acl_atomic_clock_users(__clock) > 0) {
acl_fiber_sleep(1);
time(&last);
continue;
}
if (time(NULL) - last >= acl_var_fiber_idle_limit) {
acl_msg_info("%s(%d), %s: idle_limit reached %d",
__FILE__, __LINE__, __FUNCTION__,
acl_var_fiber_idle_limit);
acl_msg_info("%s(%d), %s: idle_limit reached %d,"
"users %lld, used %lld", __FILE__, __LINE__,
__FUNCTION__, acl_var_fiber_idle_limit,
acl_atomic_clock_users(__clock),
acl_atomic_clock_count(__clock));
server_stop(fiber);
break;
}
@ -215,13 +223,9 @@ static void fiber_client(ACL_FIBER *fiber acl_unused, void *ctx)
return;
}
__nclients++;
__nused++;
acl_atomic_clock_users_count_inc(__clock);
__service(cstream, ctx);
__nclients--;
acl_atomic_clock_users_add(__clock, -1);
acl_vstream_close(cstream);
}
@ -258,10 +262,11 @@ static int dispatch_report(ACL_VSTREAM *conn)
{
char buf[256];
snprintf(buf, sizeof(buf), "count=%d&used=%u&pid=%u&type=%s"
snprintf(buf, sizeof(buf), "count=%lld&used=%llu&pid=%u&type=%s"
"&max_threads=%d&curr_threads=%d&busy_threads=%d&qlen=0\r\n",
__nclients, __nused, (unsigned) getpid(),
acl_var_fiber_dispatch_type, 1, 1, 1);
acl_atomic_clock_users(__clock),
(unsigned long long) acl_atomic_clock_count(__clock),
(unsigned) getpid(), acl_var_fiber_dispatch_type, 1, 1, 1);
if (acl_vstream_writen(conn, buf, strlen(buf)) == ACL_VSTREAM_EOF) {
acl_msg_warn("%s(%d), %s: write to master_dispatch(%s) failed",
@ -466,7 +471,7 @@ static void *thread_main(void *ctx)
static void fiber_sleep(ACL_FIBER *fiber acl_unused, void *ctx acl_unused)
{
while (1)
sleep(1);
acl_fiber_sleep(1);
}
static void main_thread_loop(void)
@ -502,6 +507,7 @@ static void servers_start(FIBER_SERVER *servers, int nthreads)
acl_pthread_attr_t attr;
int i;
__clock = acl_atomic_clock_alloc();
acl_pthread_attr_init(&attr);
acl_pthread_attr_setdetachstate(&attr, ACL_PTHREAD_CREATE_DETACHED);

View File

@ -53,9 +53,9 @@ service master_fiber {
# 是否允许产生 core 文件
# fiber_enable_core = 1
# 每个进程实例处理连接数的最大次数超过此值后进程实例主动退出
fiber_use_limit = 0
fiber_use_limit = 1000
# 每个进程实例的空闲超时时间超过此值后进程实例主动退出
fiber_idle_limit = 0
fiber_idle_limit = 10
# 进程运行时所在的路径
fiber_queue_dir = {install_path}/var
# 读写超时时间, 单位为秒
@ -64,6 +64,8 @@ service master_fiber {
fiber_buf_size = 8192
# 进程运行时的用户身份
fiber_owner = root
# 同时启动的线程数
fiber_threads = 2
# 当启用 master_dispatch 连接分开服务后该配置指定 master_dispatch 所监听的
# 域套接口的全路径这样本子进程就可以从 master_dispatch 获得客户端连接