test fiber_mutex

This commit is contained in:
zhengshuxin 2022-11-11 09:53:23 -05:00
parent d4981e0a9a
commit 821d06d573
4 changed files with 86 additions and 36 deletions

View File

@ -530,7 +530,7 @@ AGAIN:
if (ret < 0) { if (ret < 0) {
if (ret == -ETIME) { if (ret == -ETIME) {
printf("thread-%lu: Got etime\n", pthread_self()); printf("thread-%lu: Got etime, waiter=%d\n", pthread_self(), (int) ep->event.waiter);
return 0; return 0;
} else if (ret == -EAGAIN) { } else if (ret == -EAGAIN) {
printf("Got eagain\n"); printf("Got eagain\n");

View File

@ -425,6 +425,8 @@ static void fiber_swap(ACL_FIBER *from, ACL_FIBER *to)
__thread_fiber->fibers[slot]->slot = (unsigned) slot; __thread_fiber->fibers[slot]->slot = (unsigned) slot;
ring_prepend(&__thread_fiber->dead, &from->me); ring_prepend(&__thread_fiber->dead, &from->me);
} else {
from->status = FIBER_STATUS_SUSPEND;
} }
if (to->status != FIBER_STATUS_EXITING) { if (to->status != FIBER_STATUS_EXITING) {

View File

@ -41,6 +41,8 @@ void acl_fiber_mutex_free(ACL_FIBER_MUTEX *mutex)
mem_free(mutex); mem_free(mutex);
} }
static __thread int __cnt = 0;
int acl_fiber_mutex_lock(ACL_FIBER_MUTEX *mutex) int acl_fiber_mutex_lock(ACL_FIBER_MUTEX *mutex)
{ {
int wakeup = 0; int wakeup = 0;
@ -49,7 +51,9 @@ int acl_fiber_mutex_lock(ACL_FIBER_MUTEX *mutex)
while (1) { while (1) {
if (atomic_int64_cas(mutex->atomic, 0, 1) == 0) { if (atomic_int64_cas(mutex->atomic, 0, 1) == 0) {
pthread_mutex_lock(&mutex->thread_lock); //pthread_mutex_lock(&mutex->thread_lock);
__cnt++;
//printf(">>>>thread-%lu, cnt=%d\n", pthread_self(), __cnt);
return 0; return 0;
} }
@ -67,19 +71,21 @@ int acl_fiber_mutex_lock(ACL_FIBER_MUTEX *mutex)
fiber = acl_fiber_running(); fiber = acl_fiber_running();
fiber->waiter = sync_waiter_get(); fiber->waiter = sync_waiter_get();
sync_waiter_append(fiber->waiter, fiber); //sync_waiter_append(fiber->waiter, fiber);
#if 1
{
ITER iter;
foreach(iter, mutex->waiters) {
ACL_FIBER *fb = (ACL_FIBER*) iter.data;
assert(fb != fiber);
}
}
#endif
pthread_mutex_lock(&mutex->lock); pthread_mutex_lock(&mutex->lock);
array_append(mutex->waiters, fiber); array_append(mutex->waiters, fiber);
if (atomic_int64_cas(mutex->atomic, 0, 1) == 0) {
int r = array_delete_obj(mutex->waiters, fiber, NULL);
pthread_mutex_unlock(&mutex->lock);
//pthread_mutex_lock(&mutex->thread_lock);
printf("thread-%lu, ok fb=%p, wakeup=%d, cnt=%d, r=%d\n",
pthread_self(), fiber, wakeup, __cnt, r);
assert(r == 0);
return 0;
}
pthread_mutex_unlock(&mutex->lock); pthread_mutex_unlock(&mutex->lock);
ev = fiber_io_event(); ev = fiber_io_event();
@ -87,6 +93,22 @@ int acl_fiber_mutex_lock(ACL_FIBER_MUTEX *mutex)
acl_fiber_switch(); acl_fiber_switch();
ev->waiter--; ev->waiter--;
#if 1
pthread_mutex_lock(&mutex->lock);
{
ITER iter;
foreach(iter, mutex->waiters) {
ACL_FIBER *fb = (ACL_FIBER*) iter.data;
if (fb == fiber) {
printf(">>>thread-%lu, dup, why? fb=%p\n",
pthread_self(), fb);
assert(fb != fiber);
}
}
}
pthread_mutex_unlock(&mutex->lock);
#endif
if (++wakeup > 5) { if (++wakeup > 5) {
wakeup = 0; wakeup = 0;
acl_fiber_delay(100); acl_fiber_delay(100);
@ -99,16 +121,22 @@ int acl_fiber_mutex_unlock(ACL_FIBER_MUTEX *mutex)
ACL_FIBER *fiber; ACL_FIBER *fiber;
pthread_mutex_lock(&mutex->lock); pthread_mutex_lock(&mutex->lock);
fiber = (ACL_FIBER*) array_head(mutex->waiters); fiber = (ACL_FIBER*) array_pop_front(mutex->waiters);
(void) array_pop_front(mutex->waiters);
pthread_mutex_unlock(&mutex->lock);
pthread_mutex_unlock(&mutex->thread_lock); //pthread_mutex_unlock(&mutex->thread_lock);
if (atomic_int64_cas(mutex->atomic, 1, 0) != 1) { if (atomic_int64_cas(mutex->atomic, 1, 0) != 1) {
assert(0);
return -1; return -1;
} }
pthread_mutex_unlock(&mutex->lock);
if (fiber) { if (fiber) {
if (fiber->status == FIBER_STATUS_SUSPEND)
printf(">>>thread-%lu, pop suspend fb=%p\n",
pthread_self(), fiber);
else
printf(">>>thread-%lu, pop not %d fb=%p\n",
pthread_self(), fiber->status, fiber);
sync_waiter_wakeup(fiber->waiter, fiber); sync_waiter_wakeup(fiber->waiter, fiber);
} }

View File

@ -10,14 +10,15 @@
struct SYNC_WAITER { struct SYNC_WAITER {
pthread_mutex_t lock; pthread_mutex_t lock;
RING waiters; ARRAY *waiters;
RING ready; ARRAY *ready;
ACL_FIBER *fb; ACL_FIBER *fb;
ATOMIC *atomic; ATOMIC *atomic;
long long value; long long value;
#ifdef USE_MBOX #ifdef USE_MBOX
MBOX *box; MBOX *box;
#endif #endif
int left;
int stop; int stop;
}; };
@ -26,8 +27,8 @@ static SYNC_WAITER *sync_waiter_new(void)
SYNC_WAITER *waiter = (SYNC_WAITER*) mem_calloc(1, sizeof(SYNC_WAITER)); SYNC_WAITER *waiter = (SYNC_WAITER*) mem_calloc(1, sizeof(SYNC_WAITER));
pthread_mutex_init(&waiter->lock, NULL); pthread_mutex_init(&waiter->lock, NULL);
ring_init(&waiter->waiters); waiter->waiters = array_create(100, ARRAY_F_UNORDER);
ring_init(&waiter->ready); waiter->ready = array_create(100, ARRAY_F_UNORDER);
waiter->atomic = atomic_new(); waiter->atomic = atomic_new();
atomic_set(waiter->atomic, &waiter->value); atomic_set(waiter->atomic, &waiter->value);
atomic_int64_set(waiter->atomic, 0); atomic_int64_set(waiter->atomic, 0);
@ -35,6 +36,7 @@ static SYNC_WAITER *sync_waiter_new(void)
#ifdef USE_MBOX #ifdef USE_MBOX
waiter->box = mbox_create(MBOX_T_MPSC); waiter->box = mbox_create(MBOX_T_MPSC);
#endif #endif
waiter->left = 0;
return waiter; return waiter;
} }
@ -43,6 +45,8 @@ static void sync_waiter_free(SYNC_WAITER *waiter)
{ {
pthread_mutex_destroy(&waiter->lock); pthread_mutex_destroy(&waiter->lock);
atomic_free(waiter->atomic); atomic_free(waiter->atomic);
array_free(waiter->waiters, NULL);
array_free(waiter->ready, NULL);
#ifdef USE_MBOX #ifdef USE_MBOX
mbox_free(waiter->box, NULL); mbox_free(waiter->box, NULL);
#endif #endif
@ -65,21 +69,22 @@ static void thread_init(void)
} }
} }
/*
static void check_timedout(SYNC_WAITER *waiter) static void check_timedout(SYNC_WAITER *waiter)
{ {
(void) waiter; (void) waiter;
} }
*/
#ifndef USE_MBOX #ifndef USE_MBOX
static void wakeup_waiters(SYNC_WAITER *waiter) static void wakeup_waiters(SYNC_WAITER *waiter)
{ {
RING *head; ACL_FIBER *fb;
int n = 0; int n = 0;
pthread_mutex_lock(&waiter->lock); pthread_mutex_lock(&waiter->lock);
while ((head = ring_pop_head(&waiter->ready)) != NULL) { while ((fb = (ACL_FIBER*) array_pop_front(waiter->ready)) != NULL) {
ACL_FIBER *fiber = RING_TO_APPL(head, ACL_FIBER, me); acl_fiber_ready(fb);
acl_fiber_ready(fiber);
n++; n++;
} }
pthread_mutex_unlock(&waiter->lock); pthread_mutex_unlock(&waiter->lock);
@ -92,7 +97,8 @@ static void wakeup_waiters(SYNC_WAITER *waiter)
} }
#endif #endif
static void fiber_waiting(ACL_FIBER *fb, void *ctx) static __thread int __cnt = 0;
static void fiber_waiting(ACL_FIBER *fiber, void *ctx)
{ {
SYNC_WAITER *waiter = (SYNC_WAITER*) ctx; SYNC_WAITER *waiter = (SYNC_WAITER*) ctx;
int delay = -1; int delay = -1;
@ -100,18 +106,28 @@ static void fiber_waiting(ACL_FIBER *fb, void *ctx)
while (!waiter->stop) { while (!waiter->stop) {
#ifdef USE_MBOX #ifdef USE_MBOX
int res; int res;
ACL_FIBER *fiber = mbox_read(waiter->box, delay, &res); ACL_FIBER *fb = mbox_read(waiter->box, delay, &res);
if (fiber) { if (fb) {
acl_fiber_ready(fiber); int left;
__cnt++;
pthread_mutex_lock(&waiter->lock);
left = --waiter->left;
pthread_mutex_unlock(&waiter->lock);
printf(">>>thread-%lu, read one fb=%p, cnt=%d, fb status=%d, left=%d\n",
pthread_self(), fb, __cnt, fb->status, left);
assert(fb->status == FIBER_STATUS_SUSPEND);
acl_fiber_ready(fb);
} else {
assert(0);
} }
#else #else
if (read_wait(fb->base.event_in, delay) == -1) { if (read_wait(fiber->base.event_in, delay) == -1) {
check_timedout(waiter); check_timedout(waiter);
//wakeup_waiters(waiter); //wakeup_waiters(waiter);
continue; continue;
} }
if (fbase_event_wait(&fb->base) == -1) { if (fbase_event_wait(&fiber->base) == -1) {
abort(); abort();
} }
wakeup_waiters(waiter); wakeup_waiters(waiter);
@ -124,9 +140,9 @@ static void fiber_waiting(ACL_FIBER *fb, void *ctx)
#endif #endif
} }
printf(">>>begin free base=%p\n", &fb->base); printf(">>>begin free base=%p\n", &fiber->base);
#ifndef USE_MBOX #ifndef USE_MBOX
fbase_event_close(&fb->base); fbase_event_close(&fiber->base);
#endif #endif
} }
@ -158,7 +174,7 @@ SYNC_WAITER *sync_waiter_get(void)
void sync_waiter_append(SYNC_WAITER *waiter, ACL_FIBER *fb) void sync_waiter_append(SYNC_WAITER *waiter, ACL_FIBER *fb)
{ {
pthread_mutex_lock(&waiter->lock); pthread_mutex_lock(&waiter->lock);
ring_prepend(&waiter->waiters, &fb->me); array_append(waiter->waiters, fb);
pthread_mutex_unlock(&waiter->lock); pthread_mutex_unlock(&waiter->lock);
} }
@ -166,13 +182,17 @@ void sync_waiter_wakeup(SYNC_WAITER *waiter, ACL_FIBER *fb)
{ {
#ifdef USE_MBOX #ifdef USE_MBOX
pthread_mutex_lock(&waiter->lock); pthread_mutex_lock(&waiter->lock);
ring_detach(&fb->me); array_delete_obj(waiter->waiters, fb, NULL);
waiter->left++;
printf("thread-%lu >>>wakeup left=%d, fb=%p, status=%d\n",
pthread_self(), waiter->left, fb, fb->status);
pthread_mutex_unlock(&waiter->lock); pthread_mutex_unlock(&waiter->lock);
mbox_send(waiter->box, fb); mbox_send(waiter->box, fb);
#else #else
pthread_mutex_lock(&waiter->lock); pthread_mutex_lock(&waiter->lock);
ring_detach(&fb->me); array_delete_obj(waiter->waiters, fb, NULL);
ring_prepend(&waiter->ready, &fb->me); array_append(waiter->waiters, fb);
waiter->left++;
pthread_mutex_unlock(&waiter->lock); pthread_mutex_unlock(&waiter->lock);
// Only one wakeup action can be executed in the same time. // Only one wakeup action can be executed in the same time.