fixed bugs in fiber module.

This commit is contained in:
zhengshuxin 2017-01-12 20:09:06 +08:00
parent 627ec25f93
commit cfd3cd98ee
8 changed files with 327 additions and 151 deletions

View File

@ -299,10 +299,10 @@ typedef struct ACL_FIBER_SEM ACL_FIBER_SEM;
/**
*
* @param max {int} > 0
* @param num {int} >= 0
* @return {ACL_FIBER_SEM *}
*/
ACL_FIBER_SEM *acl_fiber_sem_create(int max);
ACL_FIBER_SEM *acl_fiber_sem_create(int num);
/**
*

View File

@ -10,18 +10,28 @@
#include "event_epoll.h"
#include "event.h"
//#define DEBUG
#ifdef DEBUG
# define ASSERT assert
#else
# define ASSERT (void)
#endif
EVENT *event_create(int size)
{
int i;
EVENT *ev = event_epoll_create(size);
ev->events = (FILE_EVENT *) acl_mycalloc(size, sizeof(FILE_EVENT));
ev->defers = (DEFER_DELETE *) acl_mycalloc(size, sizeof(FILE_EVENT));
ev->fired = (FIRED_EVENT *) acl_mycalloc(size, sizeof(FIRED_EVENT));
ev->setsize = size;
ev->maxfd = -1;
ev->ndefer = 0;
ev->timeout = -1;
ev->events = (FILE_EVENT *) acl_mycalloc(size, sizeof(FILE_EVENT));
ev->r_defers = (DEFER_DELETE *) acl_mycalloc(size, sizeof(FILE_EVENT));
ev->w_defers = (DEFER_DELETE *) acl_mycalloc(size, sizeof(FILE_EVENT));
ev->fired = (FIRED_EVENT *) acl_mycalloc(size, sizeof(FIRED_EVENT));
ev->timeout = -1;
ev->setsize = size;
ev->maxfd = -1;
ev->r_ndefer = 0;
ev->w_ndefer = 0;
acl_ring_init(&ev->poll_list);
acl_ring_init(&ev->epoll_list);
@ -29,9 +39,10 @@ EVENT *event_create(int size)
* vector with it.
*/
for (i = 0; i < size; i++) {
ev->events[i].mask = EVENT_NONE;
ev->events[i].mask_fired = EVENT_NONE;
ev->events[i].defer = NULL;
ev->events[i].mask = EVENT_NONE;
ev->events[i].mask_fired = EVENT_NONE;
ev->events[i].r_defer = NULL;
ev->events[i].w_defer = NULL;
}
return ev;
@ -55,14 +66,16 @@ int event_size(EVENT *ev)
void event_free(EVENT *ev)
{
FILE_EVENT *events = ev->events;
DEFER_DELETE *defers = ev->defers;
FIRED_EVENT *fired = ev->fired;
FILE_EVENT *events = ev->events;
FIRED_EVENT *fired = ev->fired;
DEFER_DELETE *r_defers = ev->r_defers;
DEFER_DELETE *w_defers = ev->w_defers;
ev->free(ev);
acl_myfree(events);
acl_myfree(defers);
acl_myfree(r_defers);
acl_myfree(w_defers);
acl_myfree(fired);
}
@ -101,9 +114,86 @@ static int check_fdtype(int fd)
return -1;
}
#define DEL_DELAY
#ifdef DEL_DELAY
static int event_defer_r_merge(EVENT *ev, int fd, int mask)
{
FILE_EVENT *fe = &ev->events[fd];
int fd2, pos = fe->r_defer->pos;
int to_mask = mask | (fe->mask & ~(ev->r_defers[pos].mask));
ASSERT(to_mask != 0);
ev->r_ndefer--;
ASSERT(ev->r_ndefer >= 0);
fd2 = ev->r_defers[ev->r_ndefer].fd;
if (ev->r_ndefer > 0) {
ev->r_defers[pos].mask = ev->r_defers[ev->r_ndefer].mask;
ev->r_defers[pos].pos = pos;
ev->r_defers[pos].fd = fd2;
ev->events[fd2].r_defer = &ev->r_defers[pos];
} else {
if (fd2 >= 0)
ev->events[fd2].r_defer = NULL;
ev->r_defers[0].mask = EVENT_NONE;
ev->r_defers[0].pos = 0;
}
if (ev->add(ev, fd, to_mask) == -1) {
acl_msg_error("mod fd(%d) error: %s", fd, acl_last_serror());
return -1;
}
ev->r_defers[ev->r_ndefer].fd = -1;
fe->r_defer = NULL;
fe->mask = to_mask;
return 0;
}
static int event_defer_w_merge(EVENT *ev, int fd, int mask)
{
FILE_EVENT *fe = &ev->events[fd];
int fd2, pos = fe->w_defer->pos;
int to_mask = mask | (fe->mask & ~(ev->w_defers[pos].mask));
ASSERT(to_mask != 0);
ev->w_ndefer--;
fd2 = ev->w_defers[ev->w_ndefer].fd;
if (ev->w_ndefer > 0) {
ev->w_defers[pos].mask = ev->w_defers[ev->w_ndefer].mask;
ev->w_defers[pos].pos = pos;
ev->w_defers[pos].fd = fd2;
ev->events[fd2].w_defer = &ev->w_defers[pos];
} else {
if (fd2 >= 0)
ev->events[fd2].w_defer = NULL;
ev->w_defers[0].mask = EVENT_NONE;
ev->w_defers[0].pos = 0;
}
if (ev->add(ev, fd, to_mask) == -1) {
acl_msg_error("mod fd(%d) error: %s", fd, acl_last_serror());
return -1;
}
ev->w_defers[ev->w_ndefer].fd = -1;
fe->w_defer = NULL;
fe->mask = to_mask;
return 0;
}
#endif /* !DEL_DELAY */
int event_add(EVENT *ev, int fd, int mask, event_proc *proc, void *ctx)
{
FILE_EVENT *fe;
int nmerged = 0;
if (fd >= ev->setsize) {
acl_msg_error("fd: %d >= setsize: %d", fd, ev->setsize);
@ -113,48 +203,34 @@ int event_add(EVENT *ev, int fd, int mask, event_proc *proc, void *ctx)
fe = &ev->events[fd];
if (fe->defer != NULL) {
int fd2, pos = fe->defer->pos;
int to_mask = mask | (fe->mask & ~(ev->defers[pos].mask));
assert(to_mask != 0);
ev->ndefer--;
fd2 = ev->defers[ev->ndefer].fd;
if (ev->ndefer > 0) {
ev->defers[pos].mask = ev->defers[ev->ndefer].mask;
ev->defers[pos].pos = pos;
ev->defers[pos].fd = fd2;
ev->events[fd2].defer = &ev->defers[pos];
} else {
if (fd2 >= 0)
ev->events[fd2].defer = NULL;
ev->defers[0].mask = EVENT_NONE;
ev->defers[0].pos = 0;
}
if (ev->add(ev, fd, to_mask) == -1) {
acl_msg_error("mod fd(%d) error: %s",
fd, acl_last_serror());
return -1;
}
ev->defers[ev->ndefer].fd = -1;
fe->defer = NULL;
fe->mask = to_mask;
} else {
if (fe->type == TYPE_NONE) {
if (check_fdtype(fd) < 0) {
fe->type = TYPE_NOSOCK;
return 0;
}
if (fe->type == TYPE_NOSOCK)
return 0;
else if (fe->type == TYPE_NONE) {
if (check_fdtype(fd) == 0)
fe->type = TYPE_SOCK;
} else if (fe->type == TYPE_NOSOCK)
else {
fe->type = TYPE_NOSOCK;
return 0;
}
}
#ifdef DEL_DELAY
if ((mask & EVENT_READABLE) && fe->r_defer != NULL) {
if (event_defer_r_merge(ev, fd, mask) < 0)
return -1;
else
nmerged++;
}
if ((mask & EVENT_WRITABLE) && fe->w_defer != NULL) {
if (event_defer_w_merge(ev, fd, mask) < 0)
return -1;
else
nmerged++;
}
#endif
if (nmerged == 0) {
if (ev->add(ev, fd, mask) == -1) {
acl_msg_error("add fd(%d) error: %s",
fd, acl_last_serror());
@ -164,12 +240,15 @@ int event_add(EVENT *ev, int fd, int mask, event_proc *proc, void *ctx)
fe->mask |= mask;
}
if (mask & EVENT_READABLE)
if (mask & EVENT_READABLE) {
fe->r_proc = proc;
if (mask & EVENT_WRITABLE)
fe->w_proc = proc;
fe->r_ctx = ctx;
}
fe->ctx = ctx;
if (mask & EVENT_WRITABLE) {
fe->w_proc = proc;
fe->w_ctx = ctx;
}
if (fd > ev->maxfd)
ev->maxfd = fd;
@ -190,18 +269,17 @@ static void __event_del(EVENT *ev, int fd, int mask)
fe = &ev->events[fd];
if (fe->mask == EVENT_NONE) {
/* acl_msg_info("----mask NONE, fd: %d----", fd); */
fe->mask_fired = EVENT_NONE;
fe->defer = NULL;
fe->r_defer = NULL;
fe->w_defer = NULL;
fe->pe = NULL;
} else if (ev->del(ev, fd, mask) == 1) {
fe->mask_fired = EVENT_NONE;
fe->type = TYPE_NONE;
fe->defer = NULL;
fe->pe = NULL;
fe->mask = fe->mask & (~mask);
fe->mask = fe->mask & (~mask);
} else
fe->mask = fe->mask & (~mask);
fe->mask = fe->mask & (~mask);
if (fd == ev->maxfd && fe->mask == EVENT_NONE) {
/* Update the max fd */
@ -214,68 +292,130 @@ static void __event_del(EVENT *ev, int fd, int mask)
}
}
#define DEL_DELAY
#ifdef DEL_DELAY
static void event_defer_r_del(EVENT *ev, FILE_EVENT *fe)
{
int fd;
ev->r_ndefer--;
ASSERT(ev->r_ndefer >= 0);
fd = ev->r_defers[ev->r_ndefer].fd;
if (ev->r_ndefer > 0) {
int pos = fe->r_defer->pos;
ev->r_defers[pos].mask = ev->r_defers[ev->r_ndefer].mask;
ev->r_defers[pos].pos = fe->r_defer->pos;
ev->r_defers[pos].fd = fd;
/* move the last item here */
ev->events[fd].r_defer = &ev->r_defers[pos];
} else {
if (fd >= 0)
ev->events[fd].r_defer = NULL;
ev->r_defers[0].mask = EVENT_NONE;
ev->r_defers[0].pos = 0;
}
ev->r_defers[ev->r_ndefer].fd = -1;
fe->r_defer = NULL;
}
static void event_defer_w_del(EVENT *ev, FILE_EVENT *fe)
{
int fd;
ev->w_ndefer--;
fd = ev->w_defers[ev->w_ndefer].fd;
if (ev->w_ndefer > 0) {
int pos = fe->w_defer->pos;
ev->w_defers[pos].mask = ev->w_defers[ev->w_ndefer].mask;
ev->w_defers[pos].pos = fe->w_defer->pos;
ev->w_defers[pos].fd = fd;
/* move the last item here */
ev->events[fd].w_defer = &ev->w_defers[pos];
} else {
if (fd >= 0)
ev->events[fd].w_defer = NULL;
ev->w_defers[0].mask = EVENT_NONE;
ev->w_defers[0].pos = 0;
}
ev->w_defers[ev->w_ndefer].fd = -1;
fe->w_defer = NULL;
}
static void event_error_del(EVENT *ev, int fd)
{
FILE_EVENT *fe = &ev->events[fd];
if (fe->r_defer != NULL)
event_defer_r_del(ev, fe);
if (fe->w_defer != NULL)
event_defer_w_del(ev, fe);
__event_del(ev, fd, fe->mask);
}
static void event_defer_r_add(EVENT *ev, int fd)
{
ev->r_defers[ev->r_ndefer].fd = fd;
ev->r_defers[ev->r_ndefer].mask = EVENT_READABLE;
ev->r_defers[ev->r_ndefer].pos = ev->r_ndefer;
ev->events[fd].r_defer = &ev->r_defers[ev->r_ndefer];
ev->r_ndefer++;
}
static void event_defer_w_add(EVENT *ev, int fd)
{
ev->w_defers[ev->w_ndefer].fd = fd;
ev->w_defers[ev->w_ndefer].mask = EVENT_WRITABLE;
ev->w_defers[ev->w_ndefer].pos = ev->w_ndefer;
ev->events[fd].w_defer = &ev->w_defers[ev->w_ndefer];
ev->w_ndefer++;
}
void event_del(EVENT *ev, int fd, int mask)
{
FILE_EVENT *fe;
fe = &ev->events[fd];
if (fe->type == TYPE_NOSOCK) {
fe->type = TYPE_NONE;
return;
if (ev->events[fd].type == TYPE_NOSOCK)
ev->events[fd].type = TYPE_NONE;
else if ((mask & EVENT_ERROR) != 0)
event_error_del(ev, fd);
else {
if (mask & EVENT_READABLE)
event_defer_r_add(ev, fd);
if (mask & EVENT_WRITABLE)
event_defer_w_add(ev, fd);
}
#ifdef DEL_DELAY
if ((mask & EVENT_ERROR) == 0 && (mask & EVENT_WRITABLE) == 0) {
ev->defers[ev->ndefer].fd = fd;
ev->defers[ev->ndefer].mask = mask;
ev->defers[ev->ndefer].pos = ev->ndefer;
ev->events[fd].defer = &ev->defers[ev->ndefer];
ev->ndefer++;
return;
}
#endif
if (fe->defer != NULL) {
int fd2;
ev->ndefer--;
fd2 = ev->defers[ev->ndefer].fd;
if (ev->ndefer > 0) {
int pos = fe->defer->pos;
ev->defers[pos].mask = ev->defers[ev->ndefer].mask;
ev->defers[pos].pos = fe->defer->pos;
ev->defers[pos].fd = fd2;
ev->events[fd2].defer = &ev->defers[pos];
} else {
if (fd2 >= 0)
ev->events[fd2].defer = NULL;
ev->defers[0].mask = EVENT_NONE;
ev->defers[0].pos = 0;
}
ev->defers[ev->ndefer].fd = -1;
fe->defer = NULL;
}
#ifdef DEL_DELAY
__event_del(ev, fd, fe->mask);
#else
__event_del(ev, fd, mask);
#endif
}
#else
void event_del(EVENT *ev, int fd, int mask)
{
if (ev->events[fd].type == TYPE_NOSOCK)
ev->events[fd].type = TYPE_NONE;
else
__event_del(ev, fd, mask);
}
#endif /* !DEL_DELAY */
int event_process(EVENT *ev, int timeout)
{
int processed = 0, numevents, j;
int mask, fd, rfired, ndefer;
int mask, fd, rfired;
FILE_EVENT *fe;
#ifdef DEL_DELAY
int ndefer;
#endif
if (ev->timeout < 0) {
if (timeout < 0)
@ -291,14 +431,26 @@ int event_process(EVENT *ev, int timeout)
if (timeout > 1000)
timeout = 1000;
ndefer = ev->ndefer;
#ifdef DEL_DELAY
ndefer = ev->r_ndefer;
for (j = 0; j < ndefer; j++) {
__event_del(ev, ev->defers[j].fd, ev->defers[j].mask);
ev->events[ev->defers[j].fd].defer = NULL;
ev->defers[j].fd = -1;
ev->ndefer--;
__event_del(ev, ev->r_defers[j].fd, ev->r_defers[j].mask);
ev->events[ev->r_defers[j].fd].r_defer = NULL;
ev->r_defers[j].fd = -1;
ev->r_ndefer--;
}
ASSERT(ev->r_ndefer == 0);
ndefer = ev->w_ndefer;
for (j = 0; j < ndefer; j++) {
__event_del(ev, ev->w_defers[j].fd, ev->w_defers[j].mask);
ev->events[ev->w_defers[j].fd].w_defer = NULL;
ev->w_defers[j].fd = -1;
ev->w_ndefer--;
}
ASSERT(ev->w_ndefer == 0);
#endif
numevents = ev->loop(ev, timeout);
@ -315,13 +467,13 @@ int event_process(EVENT *ev, int timeout)
*/
if (fe->mask & mask & EVENT_READABLE) {
rfired = 1;
fe->r_proc(ev, fd, fe->ctx, mask);
fe->r_proc(ev, fd, fe->r_ctx, EVENT_READABLE);
} else
rfired = 0;
if (fe->mask & mask & EVENT_WRITABLE) {
if (!rfired || fe->w_proc != fe->r_proc)
fe->w_proc(ev, fd, fe->ctx, mask);
fe->w_proc(ev, fd, fe->w_ctx, EVENT_WRITABLE);
}
processed++;

View File

@ -32,9 +32,11 @@ struct FILE_EVENT {
int mask_fired;
event_proc *r_proc;
event_proc *w_proc;
void *r_ctx;
void *w_ctx;
POLL_EVENT *pe;
void *ctx;
DEFER_DELETE *defer;
DEFER_DELETE *r_defer;
DEFER_DELETE *w_defer;
};
struct POLL_EVENT {
@ -80,20 +82,22 @@ struct DEFER_DELETE {
};
struct EVENT {
int timeout;
int setsize;
int maxfd;
int ndefer;
int timeout;
int r_ndefer;
int w_ndefer;
FILE_EVENT *events;
FIRED_EVENT *fired;
DEFER_DELETE *defers;
DEFER_DELETE *r_defers;
DEFER_DELETE *w_defers;
ACL_RING poll_list;
ACL_RING epoll_list;
ACL_RING_ITER iter;
const char *(*name)(void);
int (*handle)(EVENT *);
int (*loop)(EVENT *, int timeout);
int (*loop)(EVENT *, int);
int (*add)(EVENT *, int, int);
int (*del)(EVENT *, int, int);
void (*free)(EVENT *);

View File

@ -67,7 +67,7 @@ static int epoll_event_add(EVENT *ev, int fd, int mask)
ee.data.ptr = NULL;
ee.data.fd = fd;
#if 0
#if 1
mask |= ev->events[fd].mask; /* Merge old events */
#endif
@ -119,6 +119,9 @@ static int epoll_event_del(EVENT *ev, int fd, int delmask)
if (mask != EVENT_NONE) {
if (__sys_epoll_ctl(ep->epfd, EPOLL_CTL_MOD, fd, &ee) < 0) {
fiber_save_errno();
if (errno == EEXIST)
return 0;
acl_msg_error("%s(%d), epoll_ctl error: %s, fd: %d",
__FUNCTION__, __LINE__, acl_last_serror(), fd);
return -1;

View File

@ -5,7 +5,6 @@
typedef struct {
EVENT *event;
ACL_FIBER **io_fibers;
size_t io_count;
ACL_FIBER *ev_fiber;
ACL_RING ev_timer;
@ -50,8 +49,6 @@ static void thread_free(void *ctx)
if (tf->event)
event_free(tf->event);
if (tf->io_fibers)
acl_myfree(tf->io_fibers);
acl_myfree(tf);
if (__main_fiber == __thread_fiber)
@ -89,8 +86,6 @@ void fiber_io_check(void)
__thread_fiber = (FIBER_TLS *) acl_mymalloc(sizeof(FIBER_TLS));
__thread_fiber->event = event_create(__maxfd);
__thread_fiber->io_fibers = (ACL_FIBER **)
acl_mycalloc(__maxfd, sizeof(ACL_FIBER *));
__thread_fiber->ev_fiber = acl_fiber_create(fiber_io_loop,
__thread_fiber->event, STACK_SIZE);
__thread_fiber->io_count = 0;
@ -305,53 +300,61 @@ unsigned int acl_fiber_sleep(unsigned int seconds)
return acl_fiber_delay(seconds * 1000) / 1000;
}
static void read_callback(EVENT *ev, int fd, void *ctx acl_unused, int mask)
static void read_callback(EVENT *ev, int fd, void *ctx, int mask)
{
ACL_FIBER *me = (ACL_FIBER *) ctx;
event_del(ev, fd, mask);
acl_fiber_ready(__thread_fiber->io_fibers[fd]);
acl_fiber_ready(me);
__thread_fiber->io_count--;
__thread_fiber->io_fibers[fd] = NULL;
}
void fiber_wait_read(int fd)
{
ACL_FIBER *me;
fiber_io_check();
me = acl_fiber_running();
if (event_add(__thread_fiber->event,
fd, EVENT_READABLE, read_callback, NULL) <= 0)
fd, EVENT_READABLE, read_callback, me) <= 0)
{
//acl_msg_info(">>>%s(%d): fd: %d, not sock<<<",
// __FUNCTION__, __LINE__, fd);
return;
}
__thread_fiber->io_fibers[fd] = acl_fiber_running();
__thread_fiber->io_count++;
acl_fiber_switch();
}
static void write_callback(EVENT *ev, int fd, void *ctx acl_unused, int mask)
static void write_callback(EVENT *ev, int fd, void *ctx, int mask)
{
ACL_FIBER *me = (ACL_FIBER *) ctx;
event_del(ev, fd, mask);
acl_fiber_ready(__thread_fiber->io_fibers[fd]);
acl_fiber_ready(me);
__thread_fiber->io_count--;
__thread_fiber->io_fibers[fd] = NULL;
}
void fiber_wait_write(int fd)
{
ACL_FIBER *me;
fiber_io_check();
me = acl_fiber_running();
if (event_add(__thread_fiber->event, fd,
EVENT_WRITABLE, write_callback, NULL) <= 0)
EVENT_WRITABLE, write_callback, me) <= 0)
{
return;
}
__thread_fiber->io_fibers[fd] = acl_fiber_running();
__thread_fiber->io_count++;
acl_fiber_switch();

View File

@ -704,10 +704,12 @@ inline ssize_t fiber_write(int fd, const void *buf, size_t count)
fiber_wait_write(fd);
me = acl_fiber_running();
if (acl_fiber_killed(me))
if (acl_fiber_killed(me)) {
acl_msg_info("%s(%d), %s: fiber-%d is existing",
__FILE__, __LINE__, __FUNCTION__,
acl_fiber_id(me));
return -1;
}
}
}
@ -739,10 +741,12 @@ inline ssize_t fiber_writev(int fd, const struct iovec *iov, int iovcnt)
fiber_wait_write(fd);
me = acl_fiber_running();
if (acl_fiber_killed(me))
if (acl_fiber_killed(me)) {
acl_msg_info("%s(%d), %s: fiber-%d is existing",
__FILE__, __LINE__, __FUNCTION__,
acl_fiber_id(me));
return -1;
}
}
}
@ -774,10 +778,12 @@ inline ssize_t fiber_send(int sockfd, const void *buf, size_t len, int flags)
fiber_wait_write(sockfd);
me = acl_fiber_running();
if (acl_fiber_killed(me))
if (acl_fiber_killed(me)) {
acl_msg_info("%s(%d), %s: fiber-%d is existing",
__FILE__, __LINE__, __FUNCTION__,
acl_fiber_id(me));
return -1;
}
}
}
@ -811,10 +817,12 @@ inline ssize_t fiber_sendto(int sockfd, const void *buf, size_t len, int flags,
fiber_wait_write(sockfd);
me = acl_fiber_running();
if (acl_fiber_killed(me))
if (acl_fiber_killed(me)) {
acl_msg_info("%s(%d), %s: fiber-%d is existing",
__FILE__, __LINE__, __FUNCTION__,
acl_fiber_id(me));
return -1;
}
}
}
@ -846,10 +854,12 @@ inline ssize_t fiber_sendmsg(int sockfd, const struct msghdr *msg, int flags)
fiber_wait_write(sockfd);
me = acl_fiber_running();
if (acl_fiber_killed(me))
if (acl_fiber_killed(me)) {
acl_msg_info("%s(%d), %s: fiber-%d is existing",
__FILE__, __LINE__, __FUNCTION__,
acl_fiber_id(me));
return -1;
}
}
}

View File

@ -1,3 +1,7 @@
39) 2017.1.12
39.1) bugfix: 协程在双通模式下,如果写时堵塞会导致读也堵塞,是因为延迟关闭
及事件设置不合理
38) 2017.1.5
38.1) bugfix: fiber.c 中函数 acl_fiber_killed 中的判断有误

View File

@ -50,7 +50,7 @@ static void echo_client(ACL_FIBER *fiber acl_unused, void *ctx)
while (1) {
if (__rw_timeout > 0) {
ret = check_read(*cfd, 10000);
ret = check_read(*cfd, 1000000);
if (ret < 0)
break;
if (ret == 0)