Optimize sync_timer.c by using the global timer.

This commit is contained in:
zhengshuxin 2022-12-28 00:01:44 +08:00
parent d2ac79798a
commit eae6be59ed
8 changed files with 103 additions and 29 deletions

View File

@ -273,8 +273,9 @@ FIBER_API ACL_FIBER* acl_fiber_create_timer(unsigned int milliseconds,
* Reset the timer milliseconds time before the timer fiber wakeup
* @param timer {ACL_FIBER*} the fiber created by acl_fiber_create_timer
* @param milliseconds {unsigned int} the new timer wakeup milliseconds
* @return {int} return 0 if rest timer success, else return -1 if failed
*/
FIBER_API void acl_fiber_reset_timer(ACL_FIBER* timer, unsigned int milliseconds);
FIBER_API int acl_fiber_reset_timer(ACL_FIBER* timer, unsigned int milliseconds);
/**
* Set the DNS service addr

View File

@ -80,15 +80,18 @@ void timer_cache_add(TIMER_CACHE *cache, long long expire, RING *entry)
ring_append(&node->ring, entry);
}
void timer_cache_remove(TIMER_CACHE *cache, long long expire, RING *entry)
int timer_cache_remove(TIMER_CACHE *cache, long long expire, RING *entry)
{
TIMER_CACHE_NODE n, *node;
n.expire = expire;
node = avl_find(&cache->tree, &n, NULL);
if (node == NULL) {
msg_error("not found expire=%lld", expire);
return;
return 0;
}
if (entry->parent != &node->ring) {
return 0;
}
ring_detach(entry);
@ -96,6 +99,7 @@ void timer_cache_remove(TIMER_CACHE *cache, long long expire, RING *entry)
if (ring_size(&node->ring) == 0) {
timer_cache_free_node(cache, node);
}
return 1;
}
void timer_cache_free_node(TIMER_CACHE *cache, TIMER_CACHE_NODE *node)
@ -113,7 +117,7 @@ void timer_cache_free_node(TIMER_CACHE *cache, TIMER_CACHE_NODE *node)
int timer_cache_remove_exist(TIMER_CACHE *cache, long long expire, RING *entry)
{
RING_ITER iter;
//RING_ITER iter;
TIMER_CACHE_NODE n, *node;
n.expire = expire;
@ -122,6 +126,7 @@ int timer_cache_remove_exist(TIMER_CACHE *cache, long long expire, RING *entry)
return 0;
}
#if 0
ring_foreach(iter, &node->ring) {
if (iter.ptr == entry) {
ring_detach(entry);
@ -131,6 +136,16 @@ int timer_cache_remove_exist(TIMER_CACHE *cache, long long expire, RING *entry)
return 1;
}
}
#else
if (entry->parent != &node->ring) {
return 0;
}
return 0;
ring_detach(entry);
if (ring_size(&node->ring) == 0) {
timer_cache_free_node(cache, node);
}
#endif
return 1;
}

View File

@ -29,7 +29,7 @@ TIMER_CACHE *timer_cache_create(void);
unsigned timer_cache_size(TIMER_CACHE *cache);
void timer_cache_free(TIMER_CACHE *cache);
void timer_cache_add(TIMER_CACHE *cache, long long expire, RING *entry);
void timer_cache_remove(TIMER_CACHE *cache, long long expire, RING *entry);
int timer_cache_remove(TIMER_CACHE *cache, long long expire, RING *entry);
void timer_cache_free_node(TIMER_CACHE *cache, TIMER_CACHE_NODE *node);
int timer_cache_remove_exist(TIMER_CACHE *cache, long long expire, RING *entry);

View File

@ -62,6 +62,7 @@ struct ACL_FIBER {
#define FIBER_F_CLOSED (unsigned) (1 << 2)
#define FIBER_F_SIGNALED (unsigned) (1 << 3)
#define FIBER_F_CANCELED (FIBER_F_KILLED | FIBER_F_CLOSED | FIBER_F_SIGNALED)
#define FIBER_F_TIMER (unsigned) (1 << 4)
RING holding;
ACL_FIBER_LOCK *waiting;
@ -123,6 +124,8 @@ int fiber_wait_read(FILE_EVENT *fe);
int fiber_wait_write(FILE_EVENT *fe);
EVENT *fiber_io_event(void);
void fiber_timer_add(ACL_FIBER *fiber, unsigned milliseconds);
int fiber_timer_del(ACL_FIBER *fiber);
FILE_EVENT *fiber_file_open_read(socket_t fd);
FILE_EVENT *fiber_file_open_write(socket_t fd);

View File

@ -188,6 +188,9 @@ static void wakeup_timers(TIMER_CACHE *timers, long long now)
}
while ((fb = (ACL_FIBER*) array_pop_back(timers->objs))) {
// Set the flag that the fiber wakeuped for the
// timer's arriving.
fb->flag |= FIBER_F_TIMER;
acl_fiber_ready(fb);
}
@ -327,6 +330,9 @@ unsigned int acl_fiber_delay(unsigned int milliseconds)
WAITER_DEC(__thread_fiber->event);
// Clear the flag been set in wakeup_timers.
fiber->flag &= ~FIBER_F_TIMER;
if (timer_cache_size(__thread_fiber->ev_timer) == 0) {
ev->timeout = -1;
}
@ -380,16 +386,45 @@ ACL_FIBER *acl_fiber_create_timer(unsigned int milliseconds, size_t size,
return fiber;
}
void acl_fiber_reset_timer(ACL_FIBER *fiber, unsigned int milliseconds)
int acl_fiber_reset_timer(ACL_FIBER *fiber, unsigned int milliseconds)
{
long long when;
// The previous timer with the fiber must be removed first.
int ret = timer_cache_remove(__thread_fiber->ev_timer,
fiber->when, &fiber->me);
if (ret == 0) {
msg_error("%s(%d): not found fiber=%p, fid=%d",
__FUNCTION__, __LINE__, fiber, acl_fiber_id(fiber));
return -1;
}
fiber_timer_add(fiber, milliseconds);
return 0;
}
void fiber_timer_add(ACL_FIBER *fiber, unsigned milliseconds)
{
EVENT *ev = fiber_io_event();
long long now = event_get_stamp(ev);
TIMER_CACHE_NODE *timer;
fiber->when = now + milliseconds;
timer_cache_add(__thread_fiber->ev_timer, fiber->when, &fiber->me);
timer = TIMER_FIRST(__thread_fiber->ev_timer);
if (timer->expire <= now) {
ev->timeout = 0;
} else {
ev->timeout = (int) (fiber->when - now);
}
}
int fiber_timer_del(ACL_FIBER *fiber)
{
fiber_io_check();
when = fiber_io_stamp();
when += milliseconds;
fiber->when = when;
fiber->status = FIBER_STATUS_READY;
return timer_cache_remove(__thread_fiber->ev_timer,
fiber->when, &fiber->me);
}
unsigned int acl_fiber_sleep(unsigned int seconds)

View File

@ -108,8 +108,9 @@ static int fiber_cond_timedwait(ACL_FIBER_COND *cond, ACL_FIBER_MUTEX *mutex,
array_delete_obj(cond->waiters, obj, NULL);
UNLOCK_COND(cond);
if (obj->status & SYNC_STATUS_TIMEOUT) {
if (fiber->flag & FIBER_F_TIMER) {
// The obj has been deleted in sync_timer.c when timeout.
fiber->flag &= ~FIBER_F_TIMER;
sync_obj_unrefer(obj);
return FIBER_ETIME;
}

View File

@ -83,7 +83,8 @@ static int check_expire(EVENT *ev, SYNC_TIMER *timer)
// be returned if the obj has been in the cond, or else
// -1 will return, so we just set the DELAYED flag.
if (fiber_cond_delete_waiter(obj->cond, obj) == 0) {
obj->status = SYNC_STATUS_TIMEOUT;
//obj->status = SYNC_STATUS_TIMEOUT;
obj->fb->flag |= FIBER_F_TIMER;
acl_fiber_ready(obj->fb);
} else {
obj->status = SYNC_STATUS_DELAYED;
@ -109,7 +110,25 @@ static int check_expire(EVENT *ev, SYNC_TIMER *timer)
return 100;
}
static void handle_wakeup(SYNC_TIMER *timer, SYNC_OBJ *obj)
#define USE_GLOBAL_TIMER
#if defined(USE_GLOBAL_TIMER)
static void wakeup_waiter(SYNC_TIMER *timer UNUSED, SYNC_OBJ *obj)
{
// The fiber must has been wakeuped by the other fiber or thread.
// No timer has been set if delay < 0,
if (obj->delay < 0) {
acl_fiber_ready(obj->fb);
}
// The wakeup is earlier than the timer if deleting timer successfully.
else if (fiber_timer_del(obj->fb) == 1) {
acl_fiber_ready(obj->fb);
}
// else: The fiber has been wakeuped by the timer.
}
#else
static void wakeup_waiter(SYNC_TIMER *timer, SYNC_OBJ *obj)
{
if (obj->delay < 0) {
acl_fiber_ready(obj->fb);
@ -118,10 +137,10 @@ static void handle_wakeup(SYNC_TIMER *timer, SYNC_OBJ *obj)
acl_fiber_ready(obj->fb);
} else if (obj->status & SYNC_STATUS_DELAYED) {
acl_fiber_ready(obj->fb);
} else {
msg_error("%s(%d): no obj=%p", __FUNCTION__, __LINE__, obj);
}
// else: the fiber has been wakeuped by the timer.
}
#endif
static void fiber_waiting(ACL_FIBER *fiber fiber_unused, void *ctx)
{
@ -143,6 +162,7 @@ static void fiber_waiting(ACL_FIBER *fiber fiber_unused, void *ctx)
//assert(obj->fb->status == FIBER_STATUS_SUSPEND);
switch (msg->action) {
#if !defined(USE_GLOBAL_TIMER)
case SYNC_ACTION_AWAIT:
assert (obj->delay >= 0);
obj->expire = event_get_stamp(ev) + obj->delay;
@ -151,8 +171,9 @@ static void fiber_waiting(ACL_FIBER *fiber fiber_unused, void *ctx)
delay = obj->delay;
}
break;
#endif
case SYNC_ACTION_WAKEUP:
handle_wakeup(timer, obj);
wakeup_waiter(timer, obj);
break;
default:
msg_fatal("%s(%d): unkown action=%d",
@ -190,7 +211,11 @@ SYNC_TIMER *sync_timer_get(void)
void sync_timer_await(SYNC_TIMER *timer, SYNC_OBJ *obj)
{
#if 1
#if defined(USE_GLOBAL_TIMER)
(void) timer;
assert (obj->delay >= 0);
fiber_timer_add(obj->fb, obj->delay);
#else
SYNC_MSG *msg;
assert (obj->delay >= 0);
@ -198,12 +223,6 @@ void sync_timer_await(SYNC_TIMER *timer, SYNC_OBJ *obj)
msg->obj = obj;
msg->action = SYNC_ACTION_AWAIT;
mbox_send(timer->box, msg);
#else
EVENT *ev = fiber_io_event();
assert (obj->delay >= 0);
obj->expire = event_get_stamp(ev) + obj->delay;
timer_cache_add(timer->waiters, obj->expire, &obj->me);
#endif
}
@ -226,6 +245,6 @@ void sync_timer_wakeup(SYNC_TIMER *timer, SYNC_OBJ *obj)
mbox_send(timer->box, msg);
fiber_file_cache_put(fe);
} else {
handle_wakeup(timer, obj);
wakeup_waiter(timer, obj);
}
}

View File

@ -95,9 +95,9 @@ static void do_consume(void)
}
if (ret != 0) {
printf("thread-%lu, fiber-%d: timedwait error=%s\r\n",
printf("thread-%lu, fiber-%d: timedwait error=%s, timeout=%d\r\n",
(unsigned long) pthread_self(),
acl_fiber_self(), strerror(ret));
acl_fiber_self(), strerror(ret), __wait_timeout);
continue;
}