optimize and test fiber_cond

This commit is contained in:
zhengshuxin 2022-11-14 04:39:28 -05:00
parent 0ad17e5048
commit 84ab52a4a5
6 changed files with 99 additions and 19 deletions

View File

@ -5,12 +5,9 @@
#include "fiber/fiber_cond.h"
#include "common/pthread_patch.h"
#include "fiber.h"
#include "sync_timer.h"
struct ACL_FIBER_COND {
ARRAY *waiters;
pthread_mutex_t mutex;
};
#include "sync_type.h"
#include "sync_timer.h"
ACL_FIBER_COND *acl_fiber_cond_create(unsigned flag fiber_unused)
{
@ -31,6 +28,8 @@ ACL_FIBER_COND *acl_fiber_cond_create(unsigned flag fiber_unused)
pthread_mutex_init(&cond->mutex, NULL);
#endif
pthread_cond_init(&cond->thread_cond, NULL);
return cond;
}
@ -38,6 +37,7 @@ void acl_fiber_cond_free(ACL_FIBER_COND *cond)
{
array_free(cond->waiters, NULL);
pthread_mutex_destroy(&cond->mutex);
pthread_cond_destroy(&cond->thread_cond);
mem_free(cond);
}
@ -71,13 +71,14 @@ void acl_fiber_cond_free(ACL_FIBER_COND *cond)
} \
} while (0)
int acl_fiber_cond_timedwait(ACL_FIBER_COND *cond, ACL_FIBER_MUTEX *mutex,
static int fiber_cond_timedwait(ACL_FIBER_COND *cond, ACL_FIBER_MUTEX *mutex,
int delay_ms)
{
EVENT *ev = fiber_io_event();
ACL_FIBER *fiber = acl_fiber_running();
SYNC_OBJ *obj = (SYNC_OBJ*) mem_calloc(1, sizeof(SYNC_OBJ));
SYNC_OBJ *obj = (SYNC_OBJ*) mem_malloc(sizeof(SYNC_OBJ));
obj->type = SYNC_OBJ_T_FIBER;
obj->fb = fiber;
obj->delay = delay_ms;
obj->status = 0;
@ -110,6 +111,49 @@ int acl_fiber_cond_timedwait(ACL_FIBER_COND *cond, ACL_FIBER_MUTEX *mutex,
return 0;
}
static int thread_cond_timedwait(ACL_FIBER_COND *cond, ACL_FIBER_MUTEX *mutex,
int delay_ms)
{
SYNC_OBJ *obj = (SYNC_OBJ*) mem_calloc(1, sizeof(SYNC_OBJ));
struct timespec timeout;
struct timeval tv;
int ret;
obj->type = SYNC_OBJ_T_THREAD;
LOCK_COND(cond);
array_append(cond->waiters, obj);
UNLOCK_COND(cond);
gettimeofday(&tv, NULL);
timeout.tv_sec = tv.tv_sec + delay_ms / 1000;
timeout.tv_nsec = tv.tv_usec * 1000 + (delay_ms % 1000) * 1000 * 1000;
ret = pthread_cond_timedwait(&cond->thread_cond,
&mutex->thread_lock, &timeout);
LOCK_COND(cond);
// If the object hasn't been poped by producer, we should remove it
// from the waiters' array and free it here.
if (array_delete_obj(cond->waiters, obj, NULL) == 0) {
mem_free(obj);
}
// else: the object must be poped by acl_fiber_cond_signal, and will
// be freed there.
UNLOCK_COND(cond);
return ret;
}
int acl_fiber_cond_timedwait(ACL_FIBER_COND *cond, ACL_FIBER_MUTEX *mutex,
int delay_ms)
{
if (var_hook_sys_api) {
return fiber_cond_timedwait(cond, mutex, delay_ms);
} else {
return thread_cond_timedwait(cond, mutex, delay_ms);
}
}
int acl_fiber_cond_wait(ACL_FIBER_COND *cond, ACL_FIBER_MUTEX *mutex)
{
return acl_fiber_cond_timedwait(cond, mutex, -1);
@ -118,14 +162,26 @@ int acl_fiber_cond_wait(ACL_FIBER_COND *cond, ACL_FIBER_MUTEX *mutex)
int acl_fiber_cond_signal(ACL_FIBER_COND *cond)
{
SYNC_OBJ *obj;
int ret = 0;
LOCK_COND(cond);
obj = array_pop_front(cond->waiters);
if (obj) {
sync_timer_wakeup(obj->timer, obj);
if (obj == NULL) {
UNLOCK_COND(cond);
return 0;
}
if (obj->type == SYNC_OBJ_T_FIBER) {
sync_timer_wakeup(obj->timer, obj);
} else if (obj->type == SYNC_OBJ_T_THREAD) {
ret = pthread_cond_signal(&obj->cond->thread_cond);
mem_free(obj);
} else {
msg_fatal("%s: unknown type=%d", __FUNCTION__, obj->type);
}
UNLOCK_COND(cond);
return 0;
return ret;
}
int fiber_cond_delete_waiter(ACL_FIBER_COND *cond, SYNC_OBJ *obj)

View File

@ -4,16 +4,9 @@
#include "fiber/libfiber.h"
#include "fiber/fiber_mutex.h"
#include "fiber.h"
#include "sync_waiter.h"
struct ACL_FIBER_MUTEX {
unsigned flags;
ATOMIC *atomic;
long long value;
ARRAY *waiters;
pthread_mutex_t lock;
pthread_mutex_t thread_lock;
};
#include "sync_type.h"
#include "sync_waiter.h"
ACL_FIBER_MUTEX *acl_fiber_mutex_create(unsigned flags)
{

View File

@ -113,6 +113,7 @@ static void fiber_waiting(ACL_FIBER *fiber fiber_unused, void *ctx)
while (!timer->stop) {
SYNC_MSG *msg = mbox_read(timer->box, delay, &res);
if (msg == NULL) {
delay = check_expire(ev, timer);
continue;
@ -139,6 +140,7 @@ static void fiber_waiting(ACL_FIBER *fiber fiber_unused, void *ctx)
__FUNCTION__, __LINE__, msg->action);
break;
}
mem_free(msg);
delay = check_expire(ev, timer);

View File

@ -11,8 +11,13 @@ typedef struct SYNC_OBJ {
ACL_FIBER *fb;
ACL_FIBER_COND *cond;
int type;
#define SYNC_OBJ_T_FIBER 1
#define SYNC_OBJ_T_THREAD 2
long long expire;
int delay;
int status;
#define SYNC_STATUS_TIMEOUT (1 << 0)
#define SYNC_STATUS_DELAYED (1 << 1)
@ -20,14 +25,18 @@ typedef struct SYNC_OBJ {
typedef struct SYNC_MSG {
SYNC_OBJ *obj;
int action;
#define SYNC_ACTION_AWAIT 1
#define SYNC_ACTION_WAKEUP 2
} SYNC_MSG;
// In sync_timer.c
SYNC_TIMER *sync_timer_get(void);
void sync_timer_await(SYNC_TIMER *waiter, SYNC_OBJ *obj);
void sync_timer_wakeup(SYNC_TIMER *waiter, SYNC_OBJ *obj);
// In fiber_cond.c
int fiber_cond_delete_waiter(ACL_FIBER_COND *cond, SYNC_OBJ *obj);
#endif

View File

@ -0,0 +1,19 @@
#ifndef __SYNC_TYPE_INCLUDE_H__
#define __SYNC_TYPE_INCLUDE_H__
struct ACL_FIBER_MUTEX {
unsigned flags;
ATOMIC *atomic;
long long value;
ARRAY *waiters;
pthread_mutex_t lock;
pthread_mutex_t thread_lock;
};
struct ACL_FIBER_COND {
ARRAY *waiters;
pthread_mutex_t mutex;
pthread_cond_t thread_cond;
};
#endif

View File

@ -4,6 +4,7 @@
typedef struct SYNC_WAITER SYNC_WAITER;
typedef struct ACL_FIBER ACL_FIBER;
// In sync_waiter.c
SYNC_WAITER *sync_waiter_get(void);
void sync_waiter_wakeup(SYNC_WAITER *waiter, ACL_FIBER *fb);