Optimize codes of fiber module.

This commit is contained in:
zhengshuxin 2023-07-07 16:50:22 +08:00
parent 49d73b733e
commit d2e0e4963d
12 changed files with 89 additions and 49 deletions

View File

@ -649,12 +649,13 @@ static ACL_FIBER *fiber_alloc(void (*fn)(ACL_FIBER *, void *),
id = (unsigned long) atomic_int64_add_fetch(__idgen_atomic, 1);
}
fiber->fid = id;
fiber->errnum = 0;
fiber->signum = 0;
fiber->oflag = attr ? attr->oflag : 0;
fiber->flag = 0;
fiber->status = FIBER_STATUS_NONE;
fiber->fid = id;
fiber->errnum = 0;
fiber->signum = 0;
fiber->oflag = attr ? attr->oflag : 0;
fiber->flag = 0;
fiber->status = FIBER_STATUS_NONE;
fiber->wstatus = FIBER_WAIT_NONE;
#ifdef DEBUG_LOCK
fiber->waiting = NULL;

View File

@ -13,22 +13,26 @@ extern void makecontext(ucontext_t *ucp, void (*func)(), int argc, ...);
#endif
*/
typedef enum {
FIBER_STATUS_NONE,
FIBER_STATUS_READY,
FIBER_STATUS_SUSPEND,
FIBER_STATUS_RUNNING,
FIBER_STATUS_WAIT_READ,
FIBER_STATUS_WAIT_WRITE,
FIBER_STATUS_POLL_WAIT,
FIBER_STATUS_EPOLL_WAIT,
FIBER_STATUS_WAIT_MUTEX,
FIBER_STATUS_WAIT_COND,
FIBER_STATUS_WAIT_LOCK,
FIBER_STATUS_WAIT_SEM,
FIBER_STATUS_DELAY,
FIBER_STATUS_EXITING,
} fiber_status_t;
enum {
FIBER_STATUS_NONE = (0),
FIBER_STATUS_READY = (1 << 0),
FIBER_STATUS_RUNNING = (1 << 1),
FIBER_STATUS_SUSPEND = (1 << 2),
FIBER_STATUS_EXITING = (1 << 3),
};
enum {
FIBER_WAIT_NONE = (0),
FIBER_WAIT_READ = (1 << 1),
FIBER_WAIT_WRITE = (1 << 2),
FIBER_WAIT_POLL = (1 << 3),
FIBER_WAIT_EPOLL = (1 << 4),
FIBER_WAIT_MUTEX = (1 << 5),
FIBER_WAIT_COND = (1 << 6),
FIBER_WAIT_LOCK = (1 << 7),
FIBER_WAIT_SEM = (1 << 8),
FIBER_WAIT_DELAY = (1 << 9),
};
typedef struct {
void *ctx;
@ -51,14 +55,15 @@ struct SYNC_WAITER;
struct ACL_FIBER {
FIBER_BASE *base;
long tid;
fiber_status_t status;
RING me;
long tid;
unsigned int fid;
unsigned slot;
long long when;
int errnum;
int signum;
unsigned short status;
unsigned short wstatus;
unsigned int oflag;
unsigned int flag;

View File

@ -332,12 +332,13 @@ unsigned int acl_fiber_delay(unsigned int milliseconds)
fiber = acl_fiber_running();
fiber_timer_add(fiber, milliseconds);
fiber->status = FIBER_STATUS_DELAY;
fiber->wstatus |= FIBER_WAIT_DELAY;
WAITER_INC(__thread_fiber->event);
acl_fiber_switch();
WAITER_DEC(__thread_fiber->event);
fiber->wstatus &= ~FIBER_WAIT_DELAY;
// Clear the flag been set in wakeup_timers.
fiber->flag &= ~FIBER_F_TIMER;
@ -457,7 +458,7 @@ int fiber_wait_read(FILE_EVENT *fe)
return ret;
}
fe->fiber_r->status = FIBER_STATUS_WAIT_READ;
fe->fiber_r->wstatus |= FIBER_WAIT_READ;
SET_READWAIT(fe);
if (!(fe->type & TYPE_INTERNAL)) {
@ -466,6 +467,7 @@ int fiber_wait_read(FILE_EVENT *fe)
acl_fiber_switch();
fe->fiber_r->wstatus &= ~FIBER_WAIT_READ;
fe->fiber_r = NULL;
if (!(fe->type & TYPE_INTERNAL)) {
@ -502,7 +504,7 @@ int fiber_wait_write(FILE_EVENT *fe)
return ret;
}
fe->fiber_w->status = FIBER_STATUS_WAIT_WRITE;
fe->fiber_w->wstatus |= FIBER_WAIT_WRITE;
SET_WRITEWAIT(fe);
if (!(fe->type & TYPE_INTERNAL)) {
@ -511,6 +513,7 @@ int fiber_wait_write(FILE_EVENT *fe)
acl_fiber_switch();
fe->fiber_w->wstatus &= ~FIBER_WAIT_WRITE;
fe->fiber_w = NULL;
if (!(fe->type & TYPE_INTERNAL)) {
@ -703,8 +706,6 @@ void fiber_file_close(FILE_EVENT *fe)
if (IS_READWAIT(fe) && fe->fiber_r && fe->fiber_r != curr
&& fe->fiber_r->status != FIBER_STATUS_EXITING) {
//&& fe->fiber_r->status >= FIBER_STATUS_WAIT_READ
//&& fe->fiber_r->status <= FIBER_STATUS_EPOLL_WAIT) {
// The current fiber is closing the other fiber's fd, and the
// other fiber hoding the fd is blocked by waiting for the
@ -726,8 +727,6 @@ void fiber_file_close(FILE_EVENT *fe)
if (IS_WRITEWAIT(fe) && fe->fiber_w && fe->fiber_w != curr
&& fe->fiber_w->status != FIBER_STATUS_EXITING) {
//&& fe->fiber_w->status >= FIBER_STATUS_WAIT_READ
//&& fe->fiber_w->status <= FIBER_STATUS_EPOLL_WAIT) {
CLR_WRITEWAIT(fe);
SET_CLOSING(fe);

View File

@ -687,12 +687,14 @@ int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout)
while (1) {
timer_cache_add(ev->epoll_list, ee->expire, &ee->me);
ee->fiber->status = FIBER_STATUS_EPOLL_WAIT;
ee->fiber->wstatus |= FIBER_WAIT_EPOLL;
WAITER_INC(ev);
acl_fiber_switch();
WAITER_DEC(ev);
ee->fiber->wstatus &= ~FIBER_WAIT_EPOLL;
if (ee->nready == 0) {
timer_cache_remove(ev->epoll_list, ee->expire, &ee->me);
}

View File

@ -529,7 +529,7 @@ ssize_t splice(int fd_in, loff_t *poff_in, int fd_out,
// The same fd_in maybe be shared by multiple fibers, so we should
// alloc one new FILE_EVENT for each operation.
FILE_ALLOC(fe, EVENT_SPLICE);
fe->fiber_r->status = FIBER_STATUS_WAIT_READ;
fe->fiber_r->wstatus |= FIBER_WAIT_READ;
event_uring_splice(ev, fe, fd_in, off_in, fd_out, off_out, len, flags,
sqe_flags, IORING_OP_SPLICE);
@ -538,6 +538,7 @@ ssize_t splice(int fd_in, loff_t *poff_in, int fd_out,
acl_fiber_switch();
WAITER_DEC(ev);
fe->fiber_r->wstatus &= ~FIBER_WAIT_READ;
fe->mask &= ~EVENT_SPLICE;
if (fe->reader_ctx.res < 0) {

View File

@ -11,7 +11,8 @@
#define IS_INVALID(fd) (fd == INVALID_SOCKET)
#endif
#ifdef SYS_UNIX
#if defined(SYS_UNIX) && !defined(DISABLE_HOOK_IO)
unsigned int sleep(unsigned int seconds)
{
if (!var_hook_sys_api) {
@ -29,6 +30,7 @@ int close(socket_t fd)
{
return acl_fiber_close(fd);
}
#endif
int WINAPI acl_fiber_close(socket_t fd)
@ -66,7 +68,7 @@ int WINAPI acl_fiber_close(socket_t fd)
#ifdef HAS_EPOLL
/* when the fd was closed by epoll_close normally, the fd
* must be a epoll fd which was created by epoll_create function
* hooked in hook_net.c
* hooked in epoll.c
*/
if (epoll_close(fd) == 0) {
return 0;
@ -237,6 +239,7 @@ ssize_t acl_fiber_recvfrom(socket_t sockfd, void *buf, size_t len,
}
#ifdef SYS_UNIX
ssize_t acl_fiber_recvmsg(socket_t sockfd, struct msghdr *msg, int flags)
{
FILE_EVENT *fe;
@ -284,11 +287,13 @@ int acl_fiber_recvmmsg(int sockfd, struct mmsghdr *msgvec, unsigned int vlen,
return fiber_recvmmsg(fe, msgvec, vlen, flags, timeout);
}
# endif
#endif
/****************************************************************************/
#ifdef SYS_UNIX
ssize_t acl_fiber_write(socket_t fd, const void *buf, size_t count)
{
FILE_EVENT *fe;
@ -328,6 +333,7 @@ ssize_t acl_fiber_writev(socket_t fd, const struct iovec *iov, int iovcnt)
fe = fiber_file_open_write(fd);
return fiber_writev(fe, iov, iovcnt);
}
#endif
#ifdef SYS_WIN
@ -383,6 +389,7 @@ ssize_t acl_fiber_sendto(socket_t sockfd, const void *buf, size_t len,
}
#ifdef SYS_UNIX
ssize_t acl_fiber_sendmsg(socket_t sockfd, const struct msghdr *msg, int flags)
{
FILE_EVENT *fe;
@ -430,11 +437,13 @@ int acl_fiber_sendmmsg(int sockfd, struct mmsghdr *msgvec, unsigned int vlen,
return fiber_sendmmsg(fe, msgvec, vlen, flags);
}
# endif
#endif
/****************************************************************************/
#if defined(SYS_UNIX) && !defined(DISABLE_HOOK_IO)
ssize_t read(socket_t fd, void *buf, size_t count)
{
return acl_fiber_read(fd, buf, count);
@ -451,7 +460,7 @@ ssize_t recv(socket_t sockfd, void *buf, size_t len, int flags)
}
ssize_t recvfrom(socket_t sockfd, void *buf, size_t len, int flags,
struct sockaddr *src_addr, socklen_t *addrlen)
struct sockaddr *src_addr, socklen_t *addrlen)
{
return acl_fiber_recvfrom(sockfd, buf, len, flags, src_addr, addrlen);
}
@ -460,9 +469,11 @@ ssize_t recvmsg(socket_t sockfd, struct msghdr *msg, int flags)
{
return acl_fiber_recvmsg(sockfd, msg, flags);
}
#endif // SYS_UNIX
#if defined(SYS_UNIX) && !defined(DISABLE_HOOK_IO)
ssize_t write(socket_t fd, const void *buf, size_t count)
{
return acl_fiber_write(fd, buf, count);
@ -479,7 +490,7 @@ ssize_t send(socket_t sockfd, const void *buf, size_t len, int flags)
}
ssize_t sendto(socket_t sockfd, const void *buf, size_t len, int flags,
const struct sockaddr *dest_addr, socklen_t addrlen)
const struct sockaddr *dest_addr, socklen_t addrlen)
{
return acl_fiber_sendto(sockfd, buf, len, flags, dest_addr, addrlen);
}

View File

@ -394,12 +394,14 @@ int WINAPI acl_fiber_poll(struct pollfd *fds, nfds_t nfds, int timeout)
}
pe->nready = 0;
pe->fiber->status = FIBER_STATUS_POLL_WAIT;
pe->fiber->wstatus |= FIBER_WAIT_POLL;
WAITER_INC(ev);
acl_fiber_switch();
WAITER_DEC(ev);
pe->fiber->wstatus &= ~FIBER_WAIT_POLL;
if (pe->nready == 0 && pe->expire >= 0) {
timer_cache_remove(ev->poll_list, pe->expire, &pe->me);
}

View File

@ -505,8 +505,7 @@ static void fiber_timeout(ACL_FIBER *fiber UNUSED, void *ctx)
// we can kill the fiber only if the fiber is waiting
// for readable ore writable of IO process.
if (fe->fiber_r->status == FIBER_STATUS_WAIT_READ
|| fe->fiber_w->status == FIBER_STATUS_WAIT_WRITE) {
if (fe->fiber_r->wstatus & (FIBER_WAIT_READ | FIBER_WAIT_WRITE)) {
tc->fiber->errnum = FIBER_EAGAIN;
acl_fiber_signal(tc->fiber, SIGINT);

View File

@ -140,7 +140,8 @@ static int fiber_cond_timedwait(ACL_FIBER_COND *cond, ACL_FIBER_MUTEX *mutex,
FIBER_UNLOCK(mutex);
fiber->status = FIBER_STATUS_WAIT_COND;
fiber->wstatus |= FIBER_WAIT_COND;
WAITER_INC(ev);
// Hang the current fiber and will wakeup if the timer arrives or
@ -149,6 +150,8 @@ static int fiber_cond_timedwait(ACL_FIBER_COND *cond, ACL_FIBER_MUTEX *mutex,
WAITER_DEC(ev);
fiber->wstatus &= ~FIBER_WAIT_COND;
FIBER_LOCK(mutex);
LOCK_COND(cond);

View File

@ -19,8 +19,7 @@ struct ACL_FIBER_RWLOCK {
ACL_FIBER_LOCK *acl_fiber_lock_create(void)
{
ACL_FIBER_LOCK *lk = (ACL_FIBER_LOCK *)
mem_malloc(sizeof(ACL_FIBER_LOCK));
ACL_FIBER_LOCK *lk = (ACL_FIBER_LOCK *) mem_malloc(sizeof(ACL_FIBER_LOCK));
lk->owner = NULL;
ring_init(&lk->me);
@ -59,12 +58,15 @@ static int __lock(ACL_FIBER_LOCK *lk, int block)
curr->waiting = lk;
#endif
curr->status = FIBER_STATUS_WAIT_LOCK;
curr->wstatus |= FIBER_WAIT_LOCK;
ev = fiber_io_event();
WAITER_INC(ev); // Just for avoiding fiber_io_loop to exit
acl_fiber_switch();
WAITER_DEC(ev);
curr->wstatus &= ~FIBER_WAIT_LOCK;
/* if switch to me because other killed me, I should detach myself;
* else if because other unlock, I'll be detached twice which is
* hamless because RING can deal with it.
@ -162,12 +164,15 @@ static int __rlock(ACL_FIBER_RWLOCK *lk, int block)
curr = acl_fiber_running();
ring_prepend(&lk->rwaiting, &curr->me);
curr->status = FIBER_STATUS_WAIT_LOCK;
curr->wstatus |= FIBER_WAIT_LOCK;
ev = fiber_io_event();
WAITER_INC(ev); // Just for avoiding fiber_io_loop to exit
acl_fiber_switch();
WAITER_DEC(ev);
curr->wstatus &= ~FIBER_WAIT_LOCK;
/* if switch to me because other killed me, I should detach myself */
ring_detach(&curr->me);
@ -201,12 +206,15 @@ static int __wlock(ACL_FIBER_RWLOCK *lk, int block)
curr = acl_fiber_running();
ring_prepend(&lk->wwaiting, &curr->me);
curr->status = FIBER_STATUS_WAIT_LOCK;
curr->wstatus |= FIBER_WAIT_LOCK;
ev = fiber_io_event();
WAITER_INC(ev); // Just for avoiding fiber_io_loop to exit
acl_fiber_switch();
WAITER_DEC(ev);
curr->wstatus &= ~FIBER_WAIT_LOCK;
/* if switch to me because other killed me, I should detach myself */
ring_detach(&curr->me);

View File

@ -487,12 +487,15 @@ static int fiber_mutex_lock_once(ACL_FIBER_MUTEX *mutex)
pthread_mutex_unlock(&mutex->lock);
fiber->status = FIBER_STATUS_WAIT_MUTEX;
fiber->wstatus |= FIBER_WAIT_MUTEX;
ev = fiber_io_event();
WAITER_INC(ev);
acl_fiber_switch();
WAITER_DEC(ev);
fiber->wstatus &= ~FIBER_WAIT_MUTEX;
if (++wakeup > 5) {
wakeup = 0;
acl_fiber_delay(100);
@ -530,12 +533,15 @@ static int fiber_mutex_lock_try(ACL_FIBER_MUTEX *mutex)
pthread_mutex_unlock(&mutex->lock);
fiber->status = FIBER_STATUS_WAIT_MUTEX;
fiber->wstatus |= FIBER_WAIT_MUTEX;
ev = fiber_io_event();
WAITER_INC(ev);
acl_fiber_switch();
WAITER_DEC(ev);
fiber->wstatus &= ~FIBER_WAIT_MUTEX;
if (++wakeup > 5) {
wakeup = 0;
acl_fiber_delay(100);

View File

@ -86,12 +86,15 @@ int acl_fiber_sem_wait(ACL_FIBER_SEM *sem)
ring_prepend(&sem->waiting, &curr->me);
curr->status = FIBER_STATUS_WAIT_SEM;
curr->wstatus |= FIBER_WAIT_SEM;
ev = fiber_io_event();
WAITER_INC(ev); // Just for avoiding fiber_io_loop to exit
acl_fiber_switch();
WAITER_DEC(ev);
curr->wstatus &= ~FIBER_WAIT_SEM;
/* If switch to me because other killed me, I should detach myself;
* else if because other unlock, I'll be detached twice which is
* hamless because RIGN can deal with it.