test fiber.

This commit is contained in:
zhengshuxin 2024-07-25 13:37:05 +08:00
parent 09463329d0
commit 79fda265b1
12 changed files with 84 additions and 21 deletions

View File

@ -477,7 +477,7 @@ static void fiber_signal(ACL_FIBER *fiber, int signum, int sync)
ring_detach(&fiber->me); // This is safety!
#endif
acl_fiber_ready(fiber);
FIBER_READY(fiber);
// Yield myself if in synchronous mode.
if (sync) {
@ -554,7 +554,7 @@ int acl_fiber_yield(void)
// Reset the current fiber's status in order to be added to
// ready queue again.
__thread_fiber->running->status = FIBER_STATUS_NONE;
acl_fiber_ready(__thread_fiber->running);
FIBER_READY(__thread_fiber->running);
acl_fiber_switch();
return 1;
@ -672,6 +672,18 @@ static ACL_FIBER *fiber_alloc(void (*fn)(ACL_FIBER *, void *),
fiber->status = FIBER_STATUS_NONE;
fiber->wstatus = FIBER_WAIT_NONE;
#ifdef DEBUG_READY
fiber->ctag[0] = 0;
fiber->cline = 0;
fiber->ltag[0] = 0;
fiber->lline = 0;
fiber->curr = 0;
fiber->last = 0;
fiber->lstatus = 0;
fiber->lwstatus = 0;
fiber->lflag = 0;
#endif
#ifdef DEBUG_LOCK
fiber->waiting = NULL;
ring_init(&fiber->holding);
@ -731,7 +743,7 @@ ACL_FIBER *acl_fiber_create2(const ACL_FIBER_ATTR *attr,
fiber->slot = __thread_fiber->slot;
__thread_fiber->fibers[__thread_fiber->slot++] = fiber;
acl_fiber_ready(fiber);
FIBER_READY(fiber);
if (__schedule_auto && !acl_fiber_scheduled()) {
acl_fiber_schedule();
}

View File

@ -84,8 +84,41 @@ struct ACL_FIBER {
FIBER_LOCAL **locals;
int nlocal;
#define DEBUG_READY
#ifdef DEBUG_READY
int cline;
int lline;
char ctag[32];
char ltag[32];
long long curr;
long long last;
unsigned short lstatus;
unsigned short lwstatus;
unsigned int lflag;
#endif
};
#ifdef DEBUG_READY
# define FIBER_READY(f) { \
if ((f)->ctag[0] != 0) { \
SAFE_STRNCPY((f)->ltag, (f)->ctag, sizeof((f)->ltag)); \
(f)->lline = (f)->cline; \
(f)->last = (f)->curr; \
(f)->lstatus = (f)->status; \
(f)->lwstatus = (f)->wstatus; \
(f)->lflag = (f)->flag; \
} \
SAFE_STRNCPY((f)->ctag, __FUNCTION__, sizeof((f)->ctag)); \
(f)->cline = __LINE__; \
SET_TIME((f)->curr); \
acl_fiber_ready((f)); \
}
#else
# define FIBER_READY(f) acl_fiber_ready((f))
#endif
/* in fiber.c */
extern __thread int var_hook_sys_api;

View File

@ -228,7 +228,7 @@ static void wakeup_timers(TIMER_CACHE *timers, long long now)
// we detatch fb->me from timer node and append it to
// the ready ring in acl_fiber_ready.
ring_detach(&fb->me);
acl_fiber_ready(fb);
FIBER_READY(fb);
}
next = TIMER_NEXT(timers, node);
@ -453,7 +453,7 @@ static void read_callback(EVENT *ev, FILE_EVENT *fe)
* the other fiber acl_fiber_kill() the fiber_r before.
*/
if (fe->fiber_r && fe->fiber_r->status != FIBER_STATUS_READY) {
acl_fiber_ready(fe->fiber_r);
FIBER_READY(fe->fiber_r);
}
}
@ -490,12 +490,24 @@ int fiber_wait_read(FILE_EVENT *fe)
WAITER_INC(__thread_fiber->event);
}
if (fe->mask & EVENT_SO_RCVTIMEO && fe->r_timeout > 0) {
if ((fe->mask & EVENT_SO_RCVTIMEO) && fe->r_timeout > 0) {
fiber_timer_add(curr, fe->r_timeout);
}
acl_fiber_switch();
if (fe->fiber_r == NULL) {
#ifdef DEBUG_READY
msg_error("%s(%d): fiber_r NULL, ltag=%s, lline=%d, ctag=%s, "
"cline=%d, curr=%lld, last=%lld",
__FUNCTION__, __LINE__, curr->ltag,
curr->lline, curr->ctag, curr->cline,
curr->curr, curr->last);
#else
msg_error("%s(%d): fiber_r NULL", __FUNCTION__, __LINE__);
#endif
}
fe->fiber_r->wstatus &= ~FIBER_WAIT_READ;
fe->fiber_r = NULL;
@ -509,6 +521,7 @@ int fiber_wait_read(FILE_EVENT *fe)
// fiber's wakeup process wasn't from read_callback normally.
event_del_read(__thread_fiber->event, fe, 1);
acl_fiber_set_error(curr->errnum);
fiber_timer_del(curr);
return -1;
} else if (curr->flag & FIBER_F_TIMER) {
// If the IO reading timeout set in setsockopt.
@ -521,6 +534,8 @@ int fiber_wait_read(FILE_EVENT *fe)
acl_fiber_set_errno(curr, FIBER_EAGAIN);
acl_fiber_set_error(FIBER_EAGAIN);
return -1;
} else if ((fe->mask & EVENT_SO_RCVTIMEO) && fe->r_timeout > 0) {
fiber_timer_del(curr);
}
// else: the IO read event should has been removed in read_callback.
@ -537,7 +552,7 @@ static void write_callback(EVENT *ev, FILE_EVENT *fe)
* not be set in ready queue again.
*/
if (fe->fiber_w && fe->fiber_w->status != FIBER_STATUS_READY) {
acl_fiber_ready(fe->fiber_w);
FIBER_READY(fe->fiber_w);
}
}
@ -568,7 +583,7 @@ int fiber_wait_write(FILE_EVENT *fe)
}
if (fe->mask & EVENT_SO_SNDTIMEO && fe->w_timeout > 0) {
if ((fe->mask & EVENT_SO_SNDTIMEO) && fe->w_timeout > 0) {
fiber_timer_add(curr, fe->w_timeout);
}
@ -584,6 +599,7 @@ int fiber_wait_write(FILE_EVENT *fe)
if (acl_fiber_canceled(curr)) {
event_del_write(__thread_fiber->event, fe, 1);
acl_fiber_set_error(curr->errnum);
fiber_timer_del(curr);
return -1;
} else if (curr->flag & FIBER_F_TIMER) {
curr->flag &= ~FIBER_F_TIMER;
@ -592,6 +608,8 @@ int fiber_wait_write(FILE_EVENT *fe)
acl_fiber_set_errno(curr, FIBER_EAGAIN);
acl_fiber_set_error(FIBER_EAGAIN);
return -1;
} else if ((fe->mask & EVENT_SO_SNDTIMEO) && fe->w_timeout > 0) {
fiber_timer_del(curr);
}
return ret;

View File

@ -619,7 +619,7 @@ int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
static void epoll_callback(EVENT *ev fiber_unused, EPOLL_EVENT *ee)
{
if (ee->fiber->status != FIBER_STATUS_READY) {
acl_fiber_ready(ee->fiber);
FIBER_READY(ee->fiber);
}
}

View File

@ -37,7 +37,7 @@
static void file_read_callback(EVENT *ev UNUSED, FILE_EVENT *fe)
{
if (fe->fiber_r->status != FIBER_STATUS_READY) {
acl_fiber_ready(fe->fiber_r);
FIBER_READY(fe->fiber_r);
}
}

View File

@ -310,7 +310,7 @@ static void pollfds_copy(struct pollfd *fds, const pollfds *pfds)
static void poll_callback(EVENT *ev fiber_unused, POLL_EVENT *pe)
{
if (pe->fiber->status != FIBER_STATUS_READY) {
acl_fiber_ready(pe->fiber);
FIBER_READY(pe->fiber);
}
}

View File

@ -252,7 +252,7 @@ static void alt_exec(FIBER_ALT *a)
alt_all_dequeue(other->xalt);
other->xalt[0].xalt = other;
acl_fiber_ready(other->fiber);
FIBER_READY(other->fiber);
} else
alt_copy(a, NULL);
}

View File

@ -118,7 +118,7 @@ void acl_fiber_lock_unlock(ACL_FIBER_LOCK *lk)
if ((lk->owner = ready) != NULL) {
ring_detach(&ready->me);
acl_fiber_ready(ready);
FIBER_READY(ready);
acl_fiber_yield();
}
}
@ -242,7 +242,7 @@ void acl_fiber_rwlock_runlock(ACL_FIBER_RWLOCK *lk)
if (--lk->readers == 0 && (fiber = FIRST_FIBER(&lk->wwaiting))) {
ring_detach(&lk->wwaiting);
lk->writer = fiber;
acl_fiber_ready(fiber);
FIBER_READY(fiber);
acl_fiber_yield();
}
}
@ -267,14 +267,14 @@ void acl_fiber_rwlock_wunlock(ACL_FIBER_RWLOCK *lk)
while ((fiber = FIRST_FIBER(&lk->rwaiting)) != NULL) {
ring_detach(&lk->rwaiting);
lk->readers++;
acl_fiber_ready(fiber);
FIBER_READY(fiber);
n++;
}
if (lk->readers == 0 && (fiber = FIRST_FIBER(&lk->wwaiting)) != NULL) {
ring_detach(&lk->wwaiting);
lk->writer = fiber;
acl_fiber_ready(fiber);
FIBER_READY(fiber);
n++;
}

View File

@ -612,7 +612,7 @@ int acl_fiber_mutex_unlock(ACL_FIBER_MUTEX *mutex)
if (fiber) {
if (thread_self() == fiber->tid) {
acl_fiber_ready(fiber);
FIBER_READY(fiber);
} else {
sync_waiter_wakeup(fiber->sync, fiber);
}

View File

@ -172,7 +172,7 @@ int acl_fiber_sem_post(ACL_FIBER_SEM *sem)
}
ring_detach(&ready->me);
acl_fiber_ready(ready);
FIBER_READY(ready);
/* Help the fiber to be wakeup to decrease the sem number. */
num = sem->num--;

View File

@ -74,12 +74,12 @@ static void wakeup_waiter(SYNC_TIMER *timer UNUSED, SYNC_OBJ *obj)
if (obj->delay < 0) {
// No timer has been set if delay < 0,
ring_detach(&obj->fb->me); // Safety detatch me from others.
acl_fiber_ready(obj->fb);
FIBER_READY(obj->fb);
} else if (fiber_timer_del(obj->fb) == 1) {
// Wakeup the waiting fiber before the timer arrives,
// just remove it from the timer.
ring_detach(&obj->fb->me); // Safety detatch me from others.
acl_fiber_ready(obj->fb);
FIBER_READY(obj->fb);
}
// else: The fiber has been awakened by the timer.
}

View File

@ -72,7 +72,7 @@ static void fiber_waiting(ACL_FIBER *fiber fiber_unused, void *ctx)
ACL_FIBER *fb = mbox_read(waiter->box, delay, &res);
if (fb) {
assert(fb->status == FIBER_STATUS_SUSPEND);
acl_fiber_ready(fb);
FIBER_READY(fb);
}
}
}