diff --git a/lib_acl/include/stdlib/acl_define_unix.h b/lib_acl/include/stdlib/acl_define_unix.h index 7a4700098..c12137c2a 100644 --- a/lib_acl/include/stdlib/acl_define_unix.h +++ b/lib_acl/include/stdlib/acl_define_unix.h @@ -54,6 +54,7 @@ # define ACL_API # define ACL_ETIMEDOUT ETIMEDOUT +# define ACL_ETIME ETIME # define ACL_ENOMEM ENOMEM # define ACL_EINVAL EINVAL diff --git a/lib_acl/include/stdlib/acl_define_win32.h b/lib_acl/include/stdlib/acl_define_win32.h index 0cebe3b8e..2f0c36652 100644 --- a/lib_acl/include/stdlib/acl_define_win32.h +++ b/lib_acl/include/stdlib/acl_define_win32.h @@ -124,6 +124,7 @@ /* errno define */ #if defined(_WIN32) || defined(_WIN64) # define ACL_ETIMEDOUT WSAETIMEDOUT +# define ACL_ETIME WSAETIMEDOUT # define ACL_ENOMEM WSAENOBUFS # define ACL_EINVAL WSAEINVAL diff --git a/lib_acl/src/stdlib/iostuff/acl_read_wait.c b/lib_acl/src/stdlib/iostuff/acl_read_wait.c index 0d720d84c..9b0e66a5d 100644 --- a/lib_acl/src/stdlib/iostuff/acl_read_wait.c +++ b/lib_acl/src/stdlib/iostuff/acl_read_wait.c @@ -230,7 +230,7 @@ int acl_read_epoll_wait(ACL_SOCKET fd, int delay) if (delay == 0) { acl_set_error(ACL_EAGAIN); } else { - acl_set_error(ACL_ETIMEDOUT); + acl_set_error(ACL_ETIME); } ret = -1; break; diff --git a/lib_fiber/c/include/fiber/fiber_define.h b/lib_fiber/c/include/fiber/fiber_define.h index a7d6ee454..14dba4f44 100644 --- a/lib_fiber/c/include/fiber/fiber_define.h +++ b/lib_fiber/c/include/fiber/fiber_define.h @@ -27,6 +27,7 @@ typedef SOCKET socket_t; typedef int socklen_t; # define FIBER_ETIMEDOUT WSAETIMEDOUT +# define FIBER_ETIME WSAETIMEDOUT # define FIBER_ENOMEM WSAENOBUFS # define FIBER_EINVAL WSAEINVAL # define FIBER_ECONNREFUSED WSAECONNREFUSED @@ -59,6 +60,7 @@ typedef int socklen_t; typedef int socket_t; # define FIBER_ETIMEDOUT ETIMEDOUT +# define FIBER_ETIME ETIME # define FIBER_ENOMEM ENOMEM # define FIBER_EINVAL EINVAL # define FIBER_ECONNREFUSED ECONNREFUSED diff --git a/lib_fiber/c/src/common/read_wait.c b/lib_fiber/c/src/common/read_wait.c index ebc4b3b45..93ba46e63 100644 --- a/lib_fiber/c/src/common/read_wait.c +++ b/lib_fiber/c/src/common/read_wait.c @@ -23,7 +23,7 @@ int read_wait(socket_t fd, int delay) } return -1; case 0: - acl_fiber_set_error(FIBER_ETIMEDOUT); + acl_fiber_set_error(FIBER_ETIME); return -1; default: if ((fds.revents & POLLIN)) { diff --git a/lib_fiber/c/src/event/event_io_uring.c b/lib_fiber/c/src/event/event_io_uring.c index 6b1a8fb45..663a54ec4 100644 --- a/lib_fiber/c/src/event/event_io_uring.c +++ b/lib_fiber/c/src/event/event_io_uring.c @@ -23,6 +23,13 @@ static void event_uring_free(EVENT *ev) mem_free(ep); } +#define TRY_SUBMMIT(e) do { \ + if (++(e)->appending >= (e)->sqe_size) { \ + (e)->appending = 0; \ + io_uring_submit(&(e)->ring); \ + } \ +} while (0) + static void add_read_wait(EVENT_URING *ep, FILE_EVENT *fe, int tmo_ms) { struct io_uring_sqe *sqe; @@ -31,12 +38,9 @@ static void add_read_wait(EVENT_URING *ep, FILE_EVENT *fe, int tmo_ms) io_uring_prep_poll_add(sqe, fe->fd, POLLIN | POLLHUP | POLLERR); io_uring_sqe_set_data(sqe, fe); sqe->flags = IOSQE_IO_LINK; - if (++ep->appending >= ep->sqe_size) { - ep->appending = 0; - io_uring_submit(&ep->ring); - } file_event_refer(fe); + TRY_SUBMMIT(ep); fe->rts.tv_sec = tmo_ms / 1000; fe->rts.tv_nsec = (((long long) tmo_ms) % 1000) * 1000000; @@ -46,11 +50,7 @@ static void add_read_wait(EVENT_URING *ep, FILE_EVENT *fe, int tmo_ms) io_uring_sqe_set_data(sqe, fe); file_event_refer(fe); - - if (++ep->appending >= ep->sqe_size) { - ep->appending = 0; - io_uring_submit(&ep->ring); - } + TRY_SUBMMIT(ep); } static int event_uring_add_read(EVENT_URING *ep, FILE_EVENT *fe) @@ -72,18 +72,14 @@ static int event_uring_add_read(EVENT_URING *ep, FILE_EVENT *fe) (struct sockaddr*) &fe->peer_addr, (socklen_t*) &fe->addr_len, 0); io_uring_sqe_set_data(sqe, fe); - if (++ep->appending >= ep->sqe_size) { - ep->appending = 0; - io_uring_submit(&ep->ring); - } + + TRY_SUBMMIT(ep); } else { struct io_uring_sqe *sqe = io_uring_get_sqe(&ep->ring); io_uring_prep_read(sqe, fe->fd, fe->rbuf, fe->rsize, 0); io_uring_sqe_set_data(sqe, fe); - if (++ep->appending >= ep->sqe_size) { - ep->appending = 0; - io_uring_submit(&ep->ring); - } + + TRY_SUBMMIT(ep); } return 0; @@ -137,6 +133,60 @@ static int event_uring_del_write(EVENT_URING *ep UNUSED, FILE_EVENT *fe) return 0; } +static void handle_read(EVENT *ev, FILE_EVENT *fe, int res) +{ + if (fe->mask & EVENT_ACCEPT) { + fe->iocp_sock = res; + } else if (fe->mask & EVENT_POLLIN) { + if (res == -ETIME) { + printf("fd=%d timeout, fe=%p\r\n", fe->fd, fe); + file_event_unrefer(fe); + return; + } else if (res == -ECANCELED) { + printf("fd=%d canceled, fe=%p\n", fe->fd, fe); + file_event_unrefer(fe); + return; + } else if (res & POLLIN) { + fe->mask &= ~EVENT_POLLIN; + CLR_READWAIT(fe); + } else { + printf("unknown res=%d, fd=%d\n", res, fe->fd); + } + } else { + fe->rlen = res; + } + + fe->mask &= ~EVENT_READ; + fe->r_proc(ev, fe); +} + +static void handle_write(EVENT *ev, FILE_EVENT *fe, int res) +{ + if (fe->mask & EVENT_CONNECT) { + fe->iocp_sock = res; + } else if (fe->mask & EVENT_POLLOUT) { + if (res == -ETIME) { + printf("fd=%d timeout, fe=%p\r\n", fe->fd, fe); + file_event_unrefer(fe); + return; + } else if (res == -ECANCELED) { + printf("fd=%d canceled, fe=%p\n", fe->fd, fe); + file_event_unrefer(fe); + return; + } else if (res & POLLIN) { + fe->mask &= ~EVENT_POLLOUT; + CLR_WRITEWAIT(fe); + } else { + printf("unknown res=%d, fd=%d\n", res, fe->fd); + } + } else { + fe->wlen = res; + } + + fe->mask &= ~EVENT_WRITE; + fe->w_proc(ev, fe); +} + static int event_uring_wait(EVENT *ev, int timeout) { EVENT_URING *ep = (EVENT_URING*) ev; @@ -157,8 +207,6 @@ static int event_uring_wait(EVENT *ev, int timeout) } if (ep->appending > 0) { - printf("\r\n>>>>fid=%d, submit append=%d\r\n", - acl_fiber_self(), (int) ep->appending); ep->appending = 0; io_uring_submit(&ep->ring); } @@ -197,43 +245,17 @@ static int event_uring_wait(EVENT *ev, int timeout) return -1; } - usleep(1000000); + //usleep(100000); if ((fe->mask & EVENT_READ) && fe->r_proc) { - fe->mask &= ~EVENT_READ; - if (fe->mask & EVENT_ACCEPT) { - fe->iocp_sock = res; - } else if (fe->mask & EVENT_POLLIN) { - if (res == -ETIME) { - printf("fd read timeout=%d\r\n", fe->fd); - fe->rlen = -1; - file_event_unrefer(fe); - } else if (res == -ECANCELED) { - printf("fd=%d canceled\n", fe->fd); - file_event_unrefer(fe); - continue; - } else if (res & POLLIN) { - } - } else { - fe->rlen = res; - } - - fe->r_proc(ev, fe); + handle_read(ev, fe, res); } if ((fe->mask & EVENT_WRITE) && fe->w_proc) { - fe->mask &= ~EVENT_WRITE; - if (fe->mask & EVENT_CONNECT) { - fe->iocp_sock = res; - } else { - fe->wlen = res; - } - - fe->w_proc(ev, fe); + handle_write(ev, fe, res); } } - printf(">>>fid=%d, handle count=%d<<<<\n\n", acl_fiber_self(), count); return count; } diff --git a/lib_fiber/c/src/fiber_cond.c b/lib_fiber/c/src/fiber_cond.c index 3b5c3e25a..af764605b 100644 --- a/lib_fiber/c/src/fiber_cond.c +++ b/lib_fiber/c/src/fiber_cond.c @@ -144,7 +144,7 @@ int acl_fiber_cond_timedwait(ACL_FIBER_COND *cond, ACL_FIBER_EVENT *event, msg_fatal("%s(%d), %s: wait event error", __FILE__, __LINE__, __FUNCTION__); } - return ETIMEDOUT; + return FIBER_ETIME; } __ll_lock(cond); diff --git a/lib_fiber/c/src/hook/epoll.c b/lib_fiber/c/src/hook/epoll.c index 1d68a3716..cfc23fe5d 100644 --- a/lib_fiber/c/src/hook/epoll.c +++ b/lib_fiber/c/src/hook/epoll.c @@ -652,7 +652,7 @@ int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout) now = event_get_stamp(ev); if (ee->expire > 0 && now >= ee->expire) { - acl_fiber_set_error(FIBER_ETIMEDOUT); + acl_fiber_set_error(FIBER_ETIME); break; } } diff --git a/lib_fiber/c/src/hook/poll.c b/lib_fiber/c/src/hook/poll.c index c64dec09f..dc526cb49 100644 --- a/lib_fiber/c/src/hook/poll.c +++ b/lib_fiber/c/src/hook/poll.c @@ -385,7 +385,7 @@ int WINAPI acl_fiber_poll(struct pollfd *fds, nfds_t nfds, int timeout) now = event_get_stamp(ev); if (pe->expire > 0 && now >= pe->expire) { - acl_fiber_set_error(FIBER_ETIMEDOUT); + acl_fiber_set_error(FIBER_ETIME); break; } }