diff --git a/lib_fiber/c/src/sync/fiber_cond.c b/lib_fiber/c/src/sync/fiber_cond.c index 00d431cdf..6e7705287 100644 --- a/lib_fiber/c/src/sync/fiber_cond.c +++ b/lib_fiber/c/src/sync/fiber_cond.c @@ -5,12 +5,9 @@ #include "fiber/fiber_cond.h" #include "common/pthread_patch.h" #include "fiber.h" -#include "sync_timer.h" -struct ACL_FIBER_COND { - ARRAY *waiters; - pthread_mutex_t mutex; -}; +#include "sync_type.h" +#include "sync_timer.h" ACL_FIBER_COND *acl_fiber_cond_create(unsigned flag fiber_unused) { @@ -31,6 +28,8 @@ ACL_FIBER_COND *acl_fiber_cond_create(unsigned flag fiber_unused) pthread_mutex_init(&cond->mutex, NULL); #endif + pthread_cond_init(&cond->thread_cond, NULL); + return cond; } @@ -38,6 +37,7 @@ void acl_fiber_cond_free(ACL_FIBER_COND *cond) { array_free(cond->waiters, NULL); pthread_mutex_destroy(&cond->mutex); + pthread_cond_destroy(&cond->thread_cond); mem_free(cond); } @@ -71,13 +71,14 @@ void acl_fiber_cond_free(ACL_FIBER_COND *cond) } \ } while (0) -int acl_fiber_cond_timedwait(ACL_FIBER_COND *cond, ACL_FIBER_MUTEX *mutex, +static int fiber_cond_timedwait(ACL_FIBER_COND *cond, ACL_FIBER_MUTEX *mutex, int delay_ms) { EVENT *ev = fiber_io_event(); ACL_FIBER *fiber = acl_fiber_running(); - SYNC_OBJ *obj = (SYNC_OBJ*) mem_calloc(1, sizeof(SYNC_OBJ)); + SYNC_OBJ *obj = (SYNC_OBJ*) mem_malloc(sizeof(SYNC_OBJ)); + obj->type = SYNC_OBJ_T_FIBER; obj->fb = fiber; obj->delay = delay_ms; obj->status = 0; @@ -110,6 +111,49 @@ int acl_fiber_cond_timedwait(ACL_FIBER_COND *cond, ACL_FIBER_MUTEX *mutex, return 0; } +static int thread_cond_timedwait(ACL_FIBER_COND *cond, ACL_FIBER_MUTEX *mutex, + int delay_ms) +{ + SYNC_OBJ *obj = (SYNC_OBJ*) mem_calloc(1, sizeof(SYNC_OBJ)); + struct timespec timeout; + struct timeval tv; + int ret; + + obj->type = SYNC_OBJ_T_THREAD; + + LOCK_COND(cond); + array_append(cond->waiters, obj); + UNLOCK_COND(cond); + + gettimeofday(&tv, NULL); + timeout.tv_sec = tv.tv_sec + delay_ms / 1000; + timeout.tv_nsec = tv.tv_usec * 1000 + (delay_ms % 1000) * 1000 * 1000; + + ret = pthread_cond_timedwait(&cond->thread_cond, + &mutex->thread_lock, &timeout); + LOCK_COND(cond); + // If the object hasn't been poped by producer, we should remove it + // from the waiters' array and free it here. + if (array_delete_obj(cond->waiters, obj, NULL) == 0) { + mem_free(obj); + } + // else: the object must be poped by acl_fiber_cond_signal, and will + // be freed there. + UNLOCK_COND(cond); + + return ret; +} + +int acl_fiber_cond_timedwait(ACL_FIBER_COND *cond, ACL_FIBER_MUTEX *mutex, + int delay_ms) +{ + if (var_hook_sys_api) { + return fiber_cond_timedwait(cond, mutex, delay_ms); + } else { + return thread_cond_timedwait(cond, mutex, delay_ms); + } +} + int acl_fiber_cond_wait(ACL_FIBER_COND *cond, ACL_FIBER_MUTEX *mutex) { return acl_fiber_cond_timedwait(cond, mutex, -1); @@ -118,14 +162,26 @@ int acl_fiber_cond_wait(ACL_FIBER_COND *cond, ACL_FIBER_MUTEX *mutex) int acl_fiber_cond_signal(ACL_FIBER_COND *cond) { SYNC_OBJ *obj; + int ret = 0; LOCK_COND(cond); obj = array_pop_front(cond->waiters); - if (obj) { - sync_timer_wakeup(obj->timer, obj); + if (obj == NULL) { + UNLOCK_COND(cond); + return 0; } + + if (obj->type == SYNC_OBJ_T_FIBER) { + sync_timer_wakeup(obj->timer, obj); + } else if (obj->type == SYNC_OBJ_T_THREAD) { + ret = pthread_cond_signal(&obj->cond->thread_cond); + mem_free(obj); + } else { + msg_fatal("%s: unknown type=%d", __FUNCTION__, obj->type); + } + UNLOCK_COND(cond); - return 0; + return ret; } int fiber_cond_delete_waiter(ACL_FIBER_COND *cond, SYNC_OBJ *obj) diff --git a/lib_fiber/c/src/sync/fiber_mutex.c b/lib_fiber/c/src/sync/fiber_mutex.c index ac84a2fbf..a1b258204 100644 --- a/lib_fiber/c/src/sync/fiber_mutex.c +++ b/lib_fiber/c/src/sync/fiber_mutex.c @@ -4,16 +4,9 @@ #include "fiber/libfiber.h" #include "fiber/fiber_mutex.h" #include "fiber.h" -#include "sync_waiter.h" -struct ACL_FIBER_MUTEX { - unsigned flags; - ATOMIC *atomic; - long long value; - ARRAY *waiters; - pthread_mutex_t lock; - pthread_mutex_t thread_lock; -}; +#include "sync_type.h" +#include "sync_waiter.h" ACL_FIBER_MUTEX *acl_fiber_mutex_create(unsigned flags) { diff --git a/lib_fiber/c/src/sync/sync_timer.c b/lib_fiber/c/src/sync/sync_timer.c index 14e0407d7..8c314fd36 100644 --- a/lib_fiber/c/src/sync/sync_timer.c +++ b/lib_fiber/c/src/sync/sync_timer.c @@ -113,6 +113,7 @@ static void fiber_waiting(ACL_FIBER *fiber fiber_unused, void *ctx) while (!timer->stop) { SYNC_MSG *msg = mbox_read(timer->box, delay, &res); + if (msg == NULL) { delay = check_expire(ev, timer); continue; @@ -139,6 +140,7 @@ static void fiber_waiting(ACL_FIBER *fiber fiber_unused, void *ctx) __FUNCTION__, __LINE__, msg->action); break; } + mem_free(msg); delay = check_expire(ev, timer); diff --git a/lib_fiber/c/src/sync/sync_timer.h b/lib_fiber/c/src/sync/sync_timer.h index 0df4827d1..1bb5fd261 100644 --- a/lib_fiber/c/src/sync/sync_timer.h +++ b/lib_fiber/c/src/sync/sync_timer.h @@ -11,8 +11,13 @@ typedef struct SYNC_OBJ { ACL_FIBER *fb; ACL_FIBER_COND *cond; + int type; +#define SYNC_OBJ_T_FIBER 1 +#define SYNC_OBJ_T_THREAD 2 + long long expire; int delay; + int status; #define SYNC_STATUS_TIMEOUT (1 << 0) #define SYNC_STATUS_DELAYED (1 << 1) @@ -20,14 +25,18 @@ typedef struct SYNC_OBJ { typedef struct SYNC_MSG { SYNC_OBJ *obj; + int action; #define SYNC_ACTION_AWAIT 1 #define SYNC_ACTION_WAKEUP 2 } SYNC_MSG; +// In sync_timer.c SYNC_TIMER *sync_timer_get(void); void sync_timer_await(SYNC_TIMER *waiter, SYNC_OBJ *obj); void sync_timer_wakeup(SYNC_TIMER *waiter, SYNC_OBJ *obj); + +// In fiber_cond.c int fiber_cond_delete_waiter(ACL_FIBER_COND *cond, SYNC_OBJ *obj); #endif diff --git a/lib_fiber/c/src/sync/sync_type.h b/lib_fiber/c/src/sync/sync_type.h new file mode 100644 index 000000000..b57536f97 --- /dev/null +++ b/lib_fiber/c/src/sync/sync_type.h @@ -0,0 +1,19 @@ +#ifndef __SYNC_TYPE_INCLUDE_H__ +#define __SYNC_TYPE_INCLUDE_H__ + +struct ACL_FIBER_MUTEX { + unsigned flags; + ATOMIC *atomic; + long long value; + ARRAY *waiters; + pthread_mutex_t lock; + pthread_mutex_t thread_lock; +}; + +struct ACL_FIBER_COND { + ARRAY *waiters; + pthread_mutex_t mutex; + pthread_cond_t thread_cond; +}; + +#endif diff --git a/lib_fiber/c/src/sync/sync_waiter.h b/lib_fiber/c/src/sync/sync_waiter.h index 1e1a58c62..c87623d38 100644 --- a/lib_fiber/c/src/sync/sync_waiter.h +++ b/lib_fiber/c/src/sync/sync_waiter.h @@ -4,6 +4,7 @@ typedef struct SYNC_WAITER SYNC_WAITER; typedef struct ACL_FIBER ACL_FIBER; +// In sync_waiter.c SYNC_WAITER *sync_waiter_get(void); void sync_waiter_wakeup(SYNC_WAITER *waiter, ACL_FIBER *fb);