diff --git a/lib_acl/src/private/private_vstream.c b/lib_acl/src/private/private_vstream.c index 9d201dd82..f56f0a57f 100644 --- a/lib_acl/src/private/private_vstream.c +++ b/lib_acl/src/private/private_vstream.c @@ -416,7 +416,7 @@ TAG_AGAIN: return (n); } -static int __loop_writen(ACL_VSTREAM *stream, const void *vptr, size_t dlen) +static int __loop_writen(ACL_VSTREAM *stream, const void *vptr, ssize_t dlen) { const unsigned char *ptr; int n; @@ -433,6 +433,8 @@ static int __loop_writen(ACL_VSTREAM *stream, const void *vptr, size_t dlen) return (ACL_VSTREAM_EOF); } + assert(n <= dlen); + dlen -= n; ptr += n; } @@ -448,7 +450,7 @@ int private_vstream_writen(ACL_VSTREAM *stream, const void *vptr, size_t dlen) if (private_vstream_fflush(stream) == ACL_VSTREAM_EOF) return (ACL_VSTREAM_EOF); } - return (__loop_writen(stream, vptr, dlen)); + return (__loop_writen(stream, vptr, (ssize_t) dlen)); } int private_vstream_write(ACL_VSTREAM *stream, const void *vptr, size_t dlen) diff --git a/lib_fiber/c/src/event.c b/lib_fiber/c/src/event.c index 842c77a7f..eccc149a7 100644 --- a/lib_fiber/c/src/event.c +++ b/lib_fiber/c/src/event.c @@ -166,6 +166,12 @@ int event_checkfd(EVENT *ev, FILE_EVENT *fe) acl_fiber_set_error(0); return 0; } +#if defined(HAS_IO_URING) + } else if (EVENT_IS_IO_URING(ev)) { + fe->type = TYPE_FILE | TYPE_EVENTABLE; + acl_fiber_set_error(0); + return 1; +#endif } else { fe->type = TYPE_FILE; acl_fiber_set_error(0); @@ -250,20 +256,24 @@ int event_add_read(EVENT *ev, FILE_EVENT *fe, event_proc *proc) return 0; } + fe->r_proc = proc; + if (fe->oper & EVENT_DEL_READ) { fe->oper &= ~EVENT_DEL_READ; } if (!(fe->mask & EVENT_READ)) { + if (EVENT_IS_IO_URING(ev)) { + ev->add_read(ev, fe); + } // we should check the fd's type for the first time. - if (fe->me.parent == &fe->me) { + else if (fe->me.parent == &fe->me) { ring_prepend(&ev->events, &fe->me); } fe->oper |= EVENT_ADD_READ; } - fe->r_proc = proc; return 1; } @@ -293,19 +303,22 @@ int event_add_write(EVENT *ev, FILE_EVENT *fe, event_proc *proc) return 0; } + fe->w_proc = proc; + if (fe->oper & EVENT_DEL_WRITE) { fe->oper &= ~EVENT_DEL_WRITE; } if (!(fe->mask & EVENT_WRITE)) { - if (fe->me.parent == &fe->me) { + if (EVENT_IS_IO_URING(ev)) { + ev->add_write(ev, fe); + } else if (fe->me.parent == &fe->me) { ring_prepend(&ev->events, &fe->me); } fe->oper |= EVENT_ADD_WRITE; } - fe->w_proc = proc; return 1; } diff --git a/lib_fiber/c/src/event.h b/lib_fiber/c/src/event.h index fc54b80f7..e9106e33e 100644 --- a/lib_fiber/c/src/event.h +++ b/lib_fiber/c/src/event.h @@ -60,6 +60,15 @@ typedef void epoll_proc(EVENT *ev, EPOLL_EVENT *ee); typedef struct IOCP_EVENT IOCP_EVENT; #endif +#ifdef HAS_IO_URING +typedef struct IO_URING_CTX { + FILE_EVENT *fe; + int res; + int cnt; + unsigned mask; +} IO_URING_CTX; +#endif + /** * for each connection fd */ @@ -238,8 +247,6 @@ struct FILE_EVENT { } sendmsg_ctx; } out; - int res; - union { struct { struct sockaddr_in addr; @@ -250,6 +257,8 @@ struct FILE_EVENT { char *path; } var; + struct IO_URING_CTX reader_ctx; + struct IO_URING_CTX writer_ctx; struct __kernel_timespec rts; struct __kernel_timespec wts; int r_timeout; diff --git a/lib_fiber/c/src/event/event_io_uring.c b/lib_fiber/c/src/event/event_io_uring.c index 24e3e4a2a..8018e450b 100644 --- a/lib_fiber/c/src/event/event_io_uring.c +++ b/lib_fiber/c/src/event/event_io_uring.c @@ -13,6 +13,7 @@ typedef struct EVENT_URING { struct io_uring ring; size_t sqe_size; size_t appending; + size_t loop_count; } EVENT_URING; static void event_uring_free(EVENT *ev) @@ -41,7 +42,8 @@ static void add_read_wait(EVENT_URING *ep, FILE_EVENT *fe, int tmo_ms) sqe = io_uring_get_sqe(&ep->ring); io_uring_prep_poll_add(sqe, fe->fd, POLLIN | POLLHUP | POLLERR); - io_uring_sqe_set_data(sqe, fe); + fe->reader_ctx.fe = fe; + io_uring_sqe_set_data(sqe, &fe->reader_ctx); sqe->flags = IOSQE_IO_LINK; TRY_SUBMMIT(ep); @@ -62,6 +64,7 @@ static int event_uring_add_read(EVENT_URING *ep, FILE_EVENT *fe) } fe->mask |= EVENT_READ; + fe->reader_ctx.mask = EVENT_READ; if (LIKELY(!(fe->mask & (EVENT_POLLIN | EVENT_ACCEPT)))) { struct io_uring_sqe *sqe = io_uring_get_sqe(&ep->ring); @@ -95,7 +98,9 @@ static int event_uring_add_read(EVENT_URING *ep, FILE_EVENT *fe) fe->in.read_ctx.len, fe->in.read_ctx.off); } - io_uring_sqe_set_data(sqe, fe); + + fe->reader_ctx.fe = fe; + io_uring_sqe_set_data(sqe, &fe->reader_ctx); TRY_SUBMMIT(ep); } else if (fe->mask & EVENT_POLLIN) { @@ -106,7 +111,9 @@ static int event_uring_add_read(EVENT_URING *ep, FILE_EVENT *fe) io_uring_prep_accept(sqe, fe->fd, (struct sockaddr*) &fe->var.peer.addr, (socklen_t*) &fe->var.peer.len, 0); - io_uring_sqe_set_data(sqe, fe); + + fe->reader_ctx.fe = fe; + io_uring_sqe_set_data(sqe, &fe->reader_ctx); TRY_SUBMMIT(ep); } @@ -120,7 +127,8 @@ static void add_write_wait(EVENT_URING *ep, FILE_EVENT *fe, int tmo_ms) sqe = io_uring_get_sqe(&ep->ring); io_uring_prep_poll_add(sqe, fe->fd, POLLOUT | POLLHUP | POLLERR); - io_uring_sqe_set_data(sqe, fe); + fe->writer_ctx.fe = fe; + io_uring_sqe_set_data(sqe, &fe->writer_ctx); sqe->flags = IOSQE_IO_LINK; TRY_SUBMMIT(ep); @@ -141,6 +149,8 @@ static int event_uring_add_write(EVENT_URING *ep, FILE_EVENT *fe) } fe->mask |= EVENT_WRITE; + fe->writer_ctx.mask = EVENT_WRITE; + fe->writer_ctx.cnt++; if (LIKELY(!(fe->mask & (EVENT_POLLOUT | EVENT_CONNECT)))) { struct io_uring_sqe *sqe = io_uring_get_sqe(&ep->ring); @@ -174,7 +184,9 @@ static int event_uring_add_write(EVENT_URING *ep, FILE_EVENT *fe) fe->out.write_ctx.len, fe->out.write_ctx.off); } - io_uring_sqe_set_data(sqe, fe); + + fe->writer_ctx.fe = fe; + io_uring_sqe_set_data(sqe, &fe->writer_ctx); TRY_SUBMMIT(ep); } else if (fe->mask & EVENT_POLLOUT) { @@ -185,7 +197,9 @@ static int event_uring_add_write(EVENT_URING *ep, FILE_EVENT *fe) io_uring_prep_connect(sqe, fe->fd, (struct sockaddr*) &fe->var.peer.addr, (socklen_t) fe->var.peer.len); - io_uring_sqe_set_data(sqe, fe); + + fe->writer_ctx.fe = fe; + io_uring_sqe_set_data(sqe, &fe->writer_ctx); TRY_SUBMMIT(ep); } @@ -199,7 +213,8 @@ void event_uring_file_close(EVENT *ev, FILE_EVENT *fe) struct io_uring_sqe *sqe = io_uring_get_sqe(&ep->ring); io_uring_prep_close(sqe, fe->fd); - io_uring_sqe_set_data(sqe, fe); + fe->reader_ctx.fe = fe; + io_uring_sqe_set_data(sqe, &fe->reader_ctx); TRY_SUBMMIT(ep); } @@ -210,7 +225,8 @@ void event_uring_file_openat(EVENT *ev, FILE_EVENT *fe, int dirfd, struct io_uring_sqe *sqe = io_uring_get_sqe(&ep->ring); io_uring_prep_openat(sqe, dirfd, pathname, flags, mode); - io_uring_sqe_set_data(sqe, fe); + fe->reader_ctx.fe = fe; + io_uring_sqe_set_data(sqe, &fe->reader_ctx); TRY_SUBMMIT(ep); } @@ -220,7 +236,8 @@ void event_uring_file_unlink(EVENT *ev, FILE_EVENT *fe, const char *pathname) struct io_uring_sqe *sqe = io_uring_get_sqe(&ep->ring); io_uring_prep_unlink(sqe, pathname, 0); - io_uring_sqe_set_data(sqe, fe); + fe->reader_ctx.fe = fe; + io_uring_sqe_set_data(sqe, &fe->reader_ctx); TRY_SUBMMIT(ep); } @@ -232,7 +249,8 @@ void event_uring_file_statx(EVENT *ev, FILE_EVENT *fe, int dirfd, struct io_uring_sqe *sqe = io_uring_get_sqe(&ep->ring); io_uring_prep_statx(sqe, dirfd, pathname, flags, mask, statxbuf); - io_uring_sqe_set_data(sqe, fe); + fe->reader_ctx.fe = fe; + io_uring_sqe_set_data(sqe, &fe->reader_ctx); TRY_SUBMMIT(ep); } @@ -243,7 +261,8 @@ void event_uring_file_renameat2(EVENT *ev, FILE_EVENT *fe, int olddirfd, struct io_uring_sqe *sqe = io_uring_get_sqe(&ep->ring); io_uring_prep_renameat(sqe, olddirfd, oldpath, newdirfd, newpath, flags); - io_uring_sqe_set_data(sqe, fe); + fe->reader_ctx.fe = fe; + io_uring_sqe_set_data(sqe, &fe->reader_ctx); TRY_SUBMMIT(ep); } @@ -254,7 +273,8 @@ void event_uring_mkdirat(EVENT *ev, FILE_EVENT *fe, int dirfd, struct io_uring_sqe *sqe = io_uring_get_sqe(&ep->ring); io_uring_prep_mkdirat(sqe, dirfd, pathname, mode); - io_uring_sqe_set_data(sqe, fe); + fe->reader_ctx.fe = fe; + io_uring_sqe_set_data(sqe, &fe->reader_ctx); TRY_SUBMMIT(ep); } @@ -267,7 +287,8 @@ void event_uring_splice(EVENT *ev, FILE_EVENT *fe, int fd_in, loff_t off_in, io_uring_prep_splice(sqe, fd_in, off_in, fd_out, off_out, len, splice_flags); - io_uring_sqe_set_data(sqe, fe); + fe->reader_ctx.fe = fe; + io_uring_sqe_set_data(sqe, &fe->reader_ctx); sqe->flags |= sqe_flags; sqe->opcode = opcode; TRY_SUBMMIT(ep); @@ -284,7 +305,8 @@ void event_uring_sendfile(EVENT *ev, FILE_EVENT *fe, int out, int in, unsigned flags = SPLICE_F_MOVE | SPLICE_F_MORE; // | SPLICE_F_NONBLOCK; io_uring_prep_splice(sqe, in, off, fe->var.pipefd[1], -1, cnt, flags); - io_uring_sqe_set_data(sqe, fe); + fe->reader_ctx.fe = fe; + io_uring_sqe_set_data(sqe, &fe->reader_ctx); sqe->flags |= IOSQE_IO_LINK | SPLICE_F_FD_IN_FIXED | IOSQE_ASYNC; sqe->opcode = IORING_OP_SPLICE; @@ -293,7 +315,8 @@ void event_uring_sendfile(EVENT *ev, FILE_EVENT *fe, int out, int in, flags = 0; sqe = io_uring_get_sqe(&ep->ring); io_uring_prep_splice(sqe, fe->var.pipefd[0], -1, out, -1, cnt, flags); - io_uring_sqe_set_data(sqe, fe); + fe->reader_ctx.fe = fe; + io_uring_sqe_set_data(sqe, &fe->reader_ctx); sqe->opcode = IORING_OP_SPLICE; TRY_SUBMMIT(ep); @@ -324,7 +347,7 @@ static int event_uring_del_write(EVENT_URING *ep UNUSED, FILE_EVENT *fe) static void handle_read(EVENT *ev, FILE_EVENT *fe, int res) { - fe->res = res; + fe->reader_ctx.res = res; if (LIKELY(!(fe->mask & (EVENT_ACCEPT | EVENT_POLLIN)))) { if ((fe->type & TYPE_FILE) && res > 0) { @@ -335,7 +358,7 @@ static void handle_read(EVENT *ev, FILE_EVENT *fe, int res) } } } else if (fe->mask & EVENT_ACCEPT) { - // fe->res = res; + // Do nothing! } else if (fe->mask & EVENT_POLLIN) { if (res & (POLLIN | ERR)) { if (res & POLLERR) { @@ -363,7 +386,7 @@ static void handle_read(EVENT *ev, FILE_EVENT *fe, int res) static void handle_write(EVENT *ev, FILE_EVENT *fe, int res) { - fe->res = res; + fe->writer_ctx.res = res; if (LIKELY(!(fe->mask & (EVENT_CONNECT | EVENT_POLLOUT)))) { if ((fe->type & TYPE_FILE) && res > 0) { @@ -374,7 +397,7 @@ static void handle_write(EVENT *ev, FILE_EVENT *fe, int res) } } } else if (fe->mask & EVENT_CONNECT) { - //fe->res = res; + // Do nothing! } else if (fe->mask & EVENT_POLLOUT) { if (res & (POLLOUT | ERR)) { if (res & POLLERR) { @@ -402,12 +425,21 @@ static void handle_write(EVENT *ev, FILE_EVENT *fe, int res) } } -static void handle_one(EVENT *ev, FILE_EVENT *fe, int res) +static void handle_one(EVENT *ev, IO_URING_CTX *ctx, int res) { - if ((fe->mask & EVENT_READ) && fe->r_proc) { + FILE_EVENT *fe = ctx->fe; + + if (ctx == &fe->reader_ctx && ctx->mask == EVENT_READ && fe->r_proc) { + fe->mask &= ~EVENT_READ; handle_read(ev, fe, res); + ctx->mask = 0; return; - } else if ((fe->mask & EVENT_WRITE) && fe->w_proc) { + } + + if (ctx == &fe->writer_ctx && ctx->mask == EVENT_WRITE && fe->w_proc) { + fe->writer_ctx.cnt--; + fe->mask &= ~EVENT_WRITE; + ctx->mask = 0; handle_write(ev, fe, res); return; } @@ -424,7 +456,7 @@ static void handle_one(EVENT *ev, FILE_EVENT *fe, int res) | EVENT_DIR_MKDIRAT \ | EVENT_SPLICE) - fe->res = res; + fe->reader_ctx.res = res; if (fe->mask & FLAGS) { fe->r_proc(ev, fe); @@ -436,31 +468,27 @@ static void handle_one(EVENT *ev, FILE_EVENT *fe, int res) static int peek_more(EVENT_URING *ep) { -#define PEEK_FOREACH -//#define PEEK_BATCH - -#if defined(PEEK_FOREACH) - struct io_uring_cqe *cqe; unsigned head, count = 0; - FILE_EVENT *fe; + IO_URING_CTX *ctx; int ret; io_uring_for_each_cqe(&ep->ring, head, cqe) { count++; - fe = (FILE_EVENT*) io_uring_cqe_get_data(cqe); + ctx = (IO_URING_CTX*) io_uring_cqe_get_data(cqe); ret = cqe->res; + //io_uring_cqe_seen(&ep->ring, cqe); if (ret == -ENOBUFS) { msg_error("%s(%d): ENOBUFS error", __FUNCTION__, __LINE__); return -1; } - if (ret == -ETIME || ret == -ECANCELED || fe == NULL) { + if (ret == -ETIME || ret == -ECANCELED || ctx == NULL) { continue; } - handle_one((EVENT*) ep, fe, ret); + handle_one((EVENT*) ep, ctx, ret); } if (count > 0) { @@ -468,75 +496,13 @@ static int peek_more(EVENT_URING *ep) } return count; - -#elif defined(PEEK_BATCH) - -#define PEEK_MAX 100 - - struct io_uring_cqe *cqes[PEEK_MAX + 1]; - FILE_EVENT *fe; - unsigned n, i; - int ret, cnt = 0; - - while ((n = io_uring_peek_batch_cqe(&ep->ring, cqes, PEEK_MAX)) > 0) { - for (i = 0; i < n; i++) { - ret = cqes[i]->res; - fe = (FILE_EVENT*) io_uring_cqe_get_data(cqes[i]); - - if (ret == -ENOBUFS) { - msg_error("%s(%d): ENOBUFS error", - __FUNCTION__, __LINE__); - return -1; - } - - if (ret == -ETIME || ret == -ECANCELED || fe == NULL) { - continue; - } - - handle_one((EVENT*) ep, fe, ret); - } - - io_uring_cq_advance(&ep->ring, n); - cnt += n; - } - - return cnt; - -#else - - int cnt = 0, ret; - struct io_uring_cqe *cqe; - FILE_EVENT *fe; - - while (1) { - ret = io_uring_peek_cqe(&ep->ring, &cqe); - if (ret == -EAGAIN) { - break; - } - - ret = cqe->res; - fe = (FILE_EVENT*) io_uring_cqe_get_data(cqe); - io_uring_cqe_seen(&ep->ring, cqe); - - if (ret == -ETIME || ret == -ECANCELED || fe == NULL) { - continue; - } else if (ret < 0) { - return -1; - } - - handle_one((EVENT*) ep, fe, ret); - cnt++; - } - return cnt; - -#endif } static int submit_and_wait(EVENT_URING *ep, int timeout) { struct __kernel_timespec ts, *tp; struct io_uring_cqe *cqe; - FILE_EVENT *fe; + IO_URING_CTX *ctx; int ret; if (timeout >= 0) { @@ -551,7 +517,8 @@ static int submit_and_wait(EVENT_URING *ep, int timeout) if (ep->appending > 0) { ep->appending = 0; \ - ret = io_uring_submit_and_wait_timeout(&ep->ring, &cqe, 1, tp, NULL); + ret = io_uring_submit_and_wait_timeout(&ep->ring, &cqe, + 1, tp, NULL); } else { ret = io_uring_wait_cqes(&ep->ring, &cqe, 1, tp, NULL); } @@ -569,7 +536,7 @@ static int submit_and_wait(EVENT_URING *ep, int timeout) } ret = cqe->res; - fe = (FILE_EVENT*) io_uring_cqe_get_data(cqe); + ctx = (IO_URING_CTX*) io_uring_cqe_get_data(cqe); io_uring_cqe_seen(&ep->ring, cqe); @@ -578,11 +545,11 @@ static int submit_and_wait(EVENT_URING *ep, int timeout) return -1; } - if (ret == -ETIME || ret == -ECANCELED || fe == NULL) { + if (ret == -ETIME || ret == -ECANCELED || ctx == NULL) { return 1; } - handle_one((EVENT*) ep, fe, ret); + handle_one((EVENT*) ep, ctx, ret); return 1; } @@ -591,6 +558,7 @@ static int event_uring_wait(EVENT *ev, int timeout) EVENT_URING *ep = (EVENT_URING*) ev; int ret, count = 0; + ep->loop_count++; ret = submit_and_wait(ep, timeout); if (ret == 0) { return 0; diff --git a/lib_fiber/c/src/fbase_event.c b/lib_fiber/c/src/fbase_event.c index dcfa71523..2ffa1658e 100644 --- a/lib_fiber/c/src/fbase_event.c +++ b/lib_fiber/c/src/fbase_event.c @@ -21,6 +21,8 @@ #include "common/iostuff.h" #include "fiber.h" +//#undef HAS_EVENTFD + void fbase_event_open(FIBER_BASE *fbase) { #if defined(HAS_EVENTFD) @@ -55,8 +57,10 @@ void fbase_event_open(FIBER_BASE *fbase) __FILE__, __LINE__, __FUNCTION__, (int) fbase->event_in); } - non_blocking(fbase->event_in, 1); - non_blocking(fbase->event_out, 1); + //if (!EVENT_IS_IO_URING(fiber_io_event())) { + non_blocking(fbase->event_in, 1); + non_blocking(fbase->event_out, 1); + //} } void fbase_event_close(FIBER_BASE *fbase) @@ -76,20 +80,27 @@ int fbase_event_wait(FIBER_BASE *fbase) { long long n; int ret, interrupt = 0, err; + FILE_EVENT *fe; if (fbase->event_in == INVALID_SOCKET) { msg_fatal("%s(%d), %s: invalid event_in=%d", __FILE__, __LINE__, __FUNCTION__, (int) fbase->event_in); } + fe = fiber_file_open_read(fbase->event_in); + fe->type = TYPE_SPIPE | TYPE_EVENTABLE; + while (1) { //if (acl_fiber_scheduled() && read_wait(fbase->event_in, -1) == -1) { +#if 1 if (read_wait(fbase->event_in, -1) == -1) { msg_error("%s(%d), %s: read_wait error=%s, fd=%d", __FILE__, __LINE__, __FUNCTION__, last_serror(), fbase->event_in); return -1; } +#endif + #ifdef SYS_WIN ret = (int) acl_fiber_recv(fbase->event_in, (char*) &n, sizeof(n), 0); #else @@ -141,6 +152,7 @@ int fbase_event_wakeup(FIBER_BASE *fbase) { long long n = 1; int ret, interrupt = 0; + FILE_EVENT *fe; /** * if (LIKELY(atomic_int64_cas(fbase->atomic, 0, 1) != 0)) { @@ -154,6 +166,9 @@ int fbase_event_wakeup(FIBER_BASE *fbase) fbase, (int) fbase->event_out); } + fe = fiber_file_open_write(fbase->event_out); + fe->type = TYPE_SPIPE | TYPE_EVENTABLE; + while (1) { #ifdef SYS_WIN ret = (int) acl_fiber_send(fbase->event_out, (char*) &n, sizeof(n), 0); @@ -166,9 +181,11 @@ int fbase_event_wakeup(FIBER_BASE *fbase) } if (ret >= 0) { - msg_fatal("%s(%d), %s: write ret=%d invalid length, " - "interrupt=%d", __FILE__, __LINE__, - __FUNCTION__, ret, interrupt); + msg_fatal("%s(%d), %s: fiber=%d, write ret=%d invalid" + " length, interrupt=%d, fd=%d", + __FILE__, __LINE__, __FUNCTION__, + acl_fiber_self(), ret, interrupt, + (int) fbase->event_out); } if (acl_fiber_last_error() == EINTR) { diff --git a/lib_fiber/c/src/file_event.c b/lib_fiber/c/src/file_event.c index dde66767f..79088d410 100644 --- a/lib_fiber/c/src/file_event.c +++ b/lib_fiber/c/src/file_event.c @@ -26,6 +26,8 @@ void file_event_init(FILE_EVENT *fe, socket_t fd) memset(&fe->in, 0, sizeof(fe->in)); memset(&fe->out, 0, sizeof(fe->out)); memset(&fe->var, 0, sizeof(fe->var)); + memset(&fe->reader_ctx, 0, sizeof(fe->reader_ctx)); + memset(&fe->writer_ctx, 0, sizeof(fe->writer_ctx)); fe->r_timeout = -1; fe->w_timeout = -1; #endif @@ -56,6 +58,7 @@ FILE_EVENT *file_event_alloc(socket_t fd) static void file_event_free(FILE_EVENT *fe) { + memset(fe, 0, sizeof(*fe)); mem_free(fe); } diff --git a/lib_fiber/c/src/hook/file.c b/lib_fiber/c/src/hook/file.c index a474b1ed4..4ba8887c0 100644 --- a/lib_fiber/c/src/hook/file.c +++ b/lib_fiber/c/src/hook/file.c @@ -66,10 +66,10 @@ int file_close(EVENT *ev, FILE_EVENT *fe) fe->mask &= ~EVENT_FILE_CLOSE; - if (fe->res == 0) { + if (fe->reader_ctx.res == 0) { return 0; } else { - acl_fiber_set_error(-fe->res); + acl_fiber_set_error(-fe->reader_ctx.res); return -1; } } @@ -109,14 +109,14 @@ int openat(int dirfd, const char *pathname, int flags, ...) free(fe->var.path); fe->var.path = NULL; - if (fe->res >= 0) { - fe->fd = fe->res; + if (fe->reader_ctx.res >= 0) { + fe->fd = fe->reader_ctx.res; fe->type = TYPE_FILE | TYPE_EVENTABLE; fiber_file_set(fe); // Save the fe for the future using. return fe->fd; } - acl_fiber_set_error(-fe->res); + acl_fiber_set_error(-fe->reader_ctx.res); file_event_unrefer(fe); return -1; } @@ -162,11 +162,11 @@ int unlink(const char *pathname) free(fe->var.path); fe->var.path = NULL; - if (fe->res == 0) { + if (fe->reader_ctx.res == 0) { file_event_unrefer(fe); return 0; } else { - acl_fiber_set_error(-fe->res); + acl_fiber_set_error(-fe->reader_ctx.res); file_event_unrefer(fe); return -1; } @@ -204,11 +204,11 @@ int renameat2(int olddirfd, const char *oldpath, free(fe->in.read_ctx.buf); free(fe->var.path); - if (fe->res == 0) { + if (fe->reader_ctx.res == 0) { file_event_unrefer(fe); return 0; } else { - acl_fiber_set_error(-fe->res); + acl_fiber_set_error(-fe->reader_ctx.res); file_event_unrefer(fe); return -1; } @@ -257,13 +257,13 @@ int statx(int dirfd, const char *pathname, int flags, unsigned int mask, free(fe->in.read_ctx.buf); fe->in.read_ctx.buf = NULL; - if (fe->res == 0) { + if (fe->reader_ctx.res == 0) { memcpy(statxbuf, fe->var.statxbuf, sizeof(struct statx)); free(fe->var.statxbuf); file_event_unrefer(fe); return 0; } else { - acl_fiber_set_error(-fe->res); + acl_fiber_set_error(-fe->reader_ctx.res); free(fe->var.statxbuf); file_event_unrefer(fe); return -1; @@ -324,11 +324,11 @@ int mkdirat(int dirfd, const char *pathname, mode_t mode) fe->mask &= ~EVENT_DIR_MKDIRAT; free(fe->var.path); - if (fe->res == 0) { + if (fe->reader_ctx.res == 0) { file_event_unrefer(fe); return 0; } else { - acl_fiber_set_error(-fe->res); + acl_fiber_set_error(-fe->reader_ctx.res); file_event_unrefer(fe); return -1; } @@ -443,21 +443,21 @@ ssize_t splice(int fd_in, loff_t *poff_in, int fd_out, fe->mask &= ~EVENT_SPLICE; - if (fe->res < 0) { - acl_fiber_set_error(-fe->res); + if (fe->reader_ctx.res < 0) { + acl_fiber_set_error(-fe->reader_ctx.res); file_event_unrefer(fe); return -1; } if (off_in != -1 && poff_in) { - *poff_in += fe->res; + *poff_in += fe->reader_ctx.res; } if (off_out != -1 && poff_out) { - *poff_out += fe->res; + *poff_out += fe->reader_ctx.res; } - ret = fe->res; + ret = fe->reader_ctx.res; file_event_unrefer(fe); return ret; } diff --git a/lib_fiber/c/src/hook/io.c b/lib_fiber/c/src/hook/io.c index 0c21e3028..dfc13b684 100644 --- a/lib_fiber/c/src/hook/io.c +++ b/lib_fiber/c/src/hook/io.c @@ -127,7 +127,7 @@ static int iocp_wait_read(FILE_EVENT *fe) // Must clear the EVENT_READ flags in order to set IO event // for each IO process. fe->mask &= ~EVENT_READ; - fe->res = 0; + fe->reader_ctx.res = 0; if (fiber_wait_read(fe) < 0) { return -1; @@ -144,8 +144,8 @@ static int iocp_wait_read(FILE_EVENT *fe) return -1; } - if (fe->res >= 0) { - return fe->res; + if (fe->reader_ctx.res >= 0) { + return fe->reader_ctx.res; } err = acl_fiber_last_error(); @@ -168,14 +168,14 @@ int fiber_iocp_read(FILE_EVENT *fe, char *buf, int len) * which maybe used in iocp_add_read() and set for polling read status. */ if (fe->sock_type == SOCK_DGRAM && fe->rbuf == fe->packet - && fe->res > 0) { + && fe->reader_ctx.res > 0) { - if (fe->res < len) { - len = fe->res; + if (fe->reader_ctx.res < len) { + len = fe->reader_ctx.res; } memcpy(buf, fe->packet, len); fe->rbuf = NULL; - fe->res = 0; + fe->reader_ctx.res = 0; return len; } @@ -530,14 +530,14 @@ static int iocp_wait_write(FILE_EVENT *fe) int err; fe->mask &= ~EVENT_WRITE; - fe->res = 0; + fe->writer_ctx.res = -1; if (wait_write(fe) == -1) { return -1; } - if (fe->res >= 0) { - return fe->res; + if (fe->writer_ctx.res >= 0) { + return fe->writer_ctx.res; } err = acl_fiber_last_error(); diff --git a/lib_fiber/c/src/hook/socket.c b/lib_fiber/c/src/hook/socket.c index 5bc7c0bf7..6d0467bc6 100644 --- a/lib_fiber/c/src/hook/socket.c +++ b/lib_fiber/c/src/hook/socket.c @@ -87,14 +87,14 @@ static socket_t fiber_iocp_accept(FILE_EVENT *fe) { fe->mask &= ~EVENT_READ; fe->mask |= EVENT_ACCEPT; - fe->res = INVALID_SOCKET; + fe->reader_ctx.res = INVALID_SOCKET; if (fiber_wait_read(fe) < 0) { msg_error("%s(%d): fiber_wait_read error=%s, fd=%d", __FUNCTION__, __LINE__, last_serror(), (int) fe->fd); return INVALID_SOCKET; } - return fe->res; + return fe->reader_ctx.res; } #endif @@ -292,8 +292,8 @@ static socket_t fiber_iocp_connect(FILE_EVENT *fe) } fe->mask &= ~EVENT_CONNECT; - if (fe->res < 0) { - acl_fiber_set_error(-fe->res); + if (fe->writer_ctx.res < 0) { + acl_fiber_set_error(-fe->writer_ctx.res); return -1; } return 0;