diff --git a/lib_fiber/c/src/event/event_io_uring.c b/lib_fiber/c/src/event/event_io_uring.c
index 5ce54fa3a..4834b851d 100644
--- a/lib_fiber/c/src/event/event_io_uring.c
+++ b/lib_fiber/c/src/event/event_io_uring.c
@@ -5,7 +5,6 @@
 #include
 #include
 
-#include "../common/queue.h"
 #include "event.h"
 #include "event_io_uring.h"
 
@@ -14,40 +13,16 @@ typedef struct EVENT_URING {
 	struct io_uring ring;
 	size_t sqe_size;
 	size_t appending;
-
-	QUEUE *queue;
-	pthread_t waiter;
 } EVENT_URING;
 
-static void *submit_waiter(void *ctx)
-{
-	EVENT_URING *ep = (EVENT_URING*) ctx;
-	while (1) {
-		if (queue_pop(ep->queue) == NULL) {
-			printf("Got NULL and over now!\r\n");
-			break;
-		}
-		io_uring_submit(&ep->ring);
-	}
-
-	queue_push(ep->queue, NULL);
-	return NULL;
-}
-
 static void event_uring_free(EVENT *ev)
 {
 	EVENT_URING *ep = (EVENT_URING*) ev;
 
 	io_uring_queue_exit(&ep->ring);
-	queue_push(ep->queue, NULL);
-	queue_pop(ep->queue);
-	queue_free(ep->queue, NULL);
-
 	mem_free(ep);
 }
 
-#if 1
-
 #define TRY_SUBMMIT(e) do { \
 	if (++(e)->appending >= (e)->sqe_size) { \
 		(e)->appending = 0; \
@@ -60,22 +35,6 @@ static void event_uring_free(EVENT *ev)
 	io_uring_submit(&(e)->ring); \
 } while (0)
 
-#else
-
-#define TRY_SUBMMIT(e) do { \
-	if (++(e)->appending >= (e)->sqe_size) { \
-		(e)->appending = 0; \
-		queue_push((e)->queue, (e)); \
-	} \
-} while (0)
-
-#define SUBMMIT(e) do { \
-	(e)->appending = 0; \
-	queue_push((e)->queue, (e)); \
-} while (0)
-
-#endif
-
 static void add_read_wait(EVENT_URING *ep, FILE_EVENT *fe, int tmo_ms)
 {
 	struct io_uring_sqe *sqe;
@@ -104,24 +63,22 @@ static int event_uring_add_read(EVENT_URING *ep, FILE_EVENT *fe)
 
 	fe->mask |= EVENT_READ;
 
-	if (fe->mask & EVENT_POLLIN) {
+	if (LIKELY(!(fe->mask & (EVENT_POLLIN | EVENT_ACCEPT)))) {
+		struct io_uring_sqe *sqe = io_uring_get_sqe(&ep->ring);
+		io_uring_prep_read(sqe, fe->fd, fe->rbuf, fe->rsize, fe->off);
+		io_uring_sqe_set_data(sqe, fe);
+
+		TRY_SUBMMIT(ep);
+	} else if (fe->mask & EVENT_POLLIN) {
 		add_read_wait(ep, fe, fe->r_timeout);
 	} else if (fe->mask & EVENT_ACCEPT) {
 		struct io_uring_sqe *sqe = io_uring_get_sqe(&ep->ring);
-		assert(sqe);
-
 		fe->var.peer.len = (socklen_t) sizeof(fe->var.peer.addr);
 		io_uring_prep_accept(sqe, fe->fd,
 			(struct sockaddr*) &fe->var.peer.addr,
 			(socklen_t*) &fe->var.peer.len, 0);
 		io_uring_sqe_set_data(sqe, fe);
 
-		TRY_SUBMMIT(ep);
-	} else {
-		struct io_uring_sqe *sqe = io_uring_get_sqe(&ep->ring);
-		io_uring_prep_read(sqe, fe->fd, fe->rbuf, fe->rsize, fe->off);
-		io_uring_sqe_set_data(sqe, fe);
-
 		TRY_SUBMMIT(ep);
 	}
 
@@ -156,7 +113,13 @@ static int event_uring_add_write(EVENT_URING *ep, FILE_EVENT *fe)
 
 	fe->mask |= EVENT_WRITE;
 
-	if (fe->mask & EVENT_POLLOUT) {
+	if (LIKELY(!(fe->mask & (EVENT_POLLOUT | EVENT_CONNECT)))) {
+		struct io_uring_sqe *sqe = io_uring_get_sqe(&ep->ring);
+		io_uring_prep_write(sqe, fe->fd, fe->wbuf, fe->wsize, fe->off);
+		io_uring_sqe_set_data(sqe, fe);
+
+		TRY_SUBMMIT(ep);
+	} else if (fe->mask & EVENT_POLLOUT) {
 		add_write_wait(ep, fe, fe->r_timeout);
 	} else if (fe->mask & EVENT_CONNECT) {
 		non_blocking(fe->fd, 1);
@@ -166,12 +129,6 @@
 			(socklen_t) fe->var.peer.len);
 		io_uring_sqe_set_data(sqe, fe);
 
-		TRY_SUBMMIT(ep);
-	} else {
-		struct io_uring_sqe *sqe = io_uring_get_sqe(&ep->ring);
-		io_uring_prep_write(sqe, fe->fd, fe->wbuf, fe->wsize, fe->off);
-		io_uring_sqe_set_data(sqe, fe);
-
 		TRY_SUBMMIT(ep);
 	}
 
@@ -305,7 +262,7 @@ static int event_uring_del_write(EVENT_URING *ep UNUSED, FILE_EVENT *fe)
 
 static void handle_read(EVENT *ev, FILE_EVENT *fe, int res)
 {
-	if (!(fe->mask & (EVENT_ACCEPT | EVENT_POLLIN))) {
+	if (LIKELY(!(fe->mask & (EVENT_ACCEPT | EVENT_POLLIN)))) {
 		fe->rlen = res;
 		if ((fe->type & TYPE_FILE) && res > 0) {
 			fe->off += res;
@@ -338,7 +295,7 @@ static void handle_read(EVENT *ev, FILE_EVENT *fe, int res)
 
 static void handle_write(EVENT *ev, FILE_EVENT *fe, int res)
 {
-	if (!(fe->mask & (EVENT_CONNECT | EVENT_POLLOUT))) {
+	if (LIKELY(!(fe->mask & (EVENT_CONNECT | EVENT_POLLOUT)))) {
 		fe->wlen = res;
 		if ((fe->type & TYPE_FILE) && res > 0) {
 			fe->off += res;
@@ -403,7 +360,105 @@ static void handle_one(EVENT *ev, FILE_EVENT *fe, int res)
 	}
 }
 
-static int wait_one(EVENT_URING *ep, int timeout)
+static int peek_more(EVENT_URING *ep)
+{
+#define PEEK_FOREACH
+//#define PEEK_BATCH
+
+#if defined(PEEK_FOREACH)
+
+	struct io_uring_cqe *cqe;
+	unsigned head, count = 0;
+	FILE_EVENT *fe;
+	int ret;
+
+	io_uring_for_each_cqe(&ep->ring, head, cqe) {
+		count++;
+		fe = (FILE_EVENT*) io_uring_cqe_get_data(cqe);
+		ret = cqe->res;
+
+		if (ret == -ENOBUFS) {
+			msg_error("%s(%d): ENOBUFS error", __FUNCTION__, __LINE__);
+			return -1;
+		}
+
+		if (ret == -ETIME || ret == -ECANCELED || fe == NULL) {
+			continue;
+		}
+
+		handle_one((EVENT*) ep, fe, ret);
+	}
+
+	if (count > 0) {
+		io_uring_cq_advance(&ep->ring, count);
+	}
+
+	return count;
+
+#elif defined(PEEK_BATCH)
+
+#define PEEK_MAX 100
+
+	struct io_uring_cqe *cqes[PEEK_MAX + 1];
+	FILE_EVENT *fe;
+	unsigned n, i;
+	int ret, cnt = 0;
+
+	while ((n = io_uring_peek_batch_cqe(&ep->ring, cqes, PEEK_MAX)) > 0) {
+		for (i = 0; i < n; i++) {
+			ret = cqes[i]->res;
+			fe = (FILE_EVENT*) io_uring_cqe_get_data(cqes[i]);
+
+			if (ret == -ENOBUFS) {
+				msg_error("%s(%d): ENOBUFS error",
+					__FUNCTION__, __LINE__);
+				return -1;
+			}
+
+			if (ret == -ETIME || ret == -ECANCELED || fe == NULL) {
+				continue;
+			}
+
+			handle_one((EVENT*) ep, fe, ret);
+		}
+
+		io_uring_cq_advance(&ep->ring, n);
+		cnt += n;
+	}
+
+	return cnt;
+
+#else
+
+	int cnt = 0, ret;
+	struct io_uring_cqe *cqe;
+	FILE_EVENT *fe;
+
+	while (1) {
+		ret = io_uring_peek_cqe(&ep->ring, &cqe);
+		if (ret == -EAGAIN) {
+			break;
+		}
+
+		ret = cqe->res;
+		fe = (FILE_EVENT*) io_uring_cqe_get_data(cqe);
+		io_uring_cqe_seen(&ep->ring, cqe);
+
+		if (ret == -ETIME || ret == -ECANCELED || fe == NULL) {
+			continue;
+		} else if (ret < 0) {
+			return -1;
+		}
+
+		handle_one((EVENT*) ep, fe, ret);
+		cnt++;
+	}
+	return cnt;
+
+#endif
+}
+
+static int submit_and_wait(EVENT_URING *ep, int timeout)
 {
 	struct __kernel_timespec ts, *tp;
 	struct io_uring_cqe *cqe;
@@ -420,7 +475,13 @@ static int wait_one(EVENT_URING *ep, int timeout)
 		tp = NULL;
 	}
 
-	ret = io_uring_wait_cqes(&ep->ring, &cqe, 1, tp, NULL);
+	if (ep->appending > 0) {
+		ep->appending = 0;
+		ret = io_uring_submit_and_wait_timeout(&ep->ring, &cqe, 1, tp, NULL);
+	} else {
+		ret = io_uring_wait_cqes(&ep->ring, &cqe, 1, tp, NULL);
+	}
+
 	if (ret < 0) {
 		if (ret == -ETIME) {
 			return 0;
@@ -451,105 +512,28 @@ static int wait_one(EVENT_URING *ep, int timeout)
 	return 1;
 }
 
-static int peek_more(EVENT_URING *ep)
-{
-#define PEEK_BATCH
-
-#ifdef PEEK_BATCH
-#define PEEK_MAX 100
-	struct io_uring_cqe *cqes[PEEK_MAX + 1];
-	FILE_EVENT *fe;
-	unsigned n, i;
-	int ret, cnt = 0;
-
-	while ((n = io_uring_peek_batch_cqe(&ep->ring, cqes, PEEK_MAX)) > 0) {
-		for (i = 0; i < n; i++) {
-			ret = cqes[i]->res;
-			fe = (FILE_EVENT*) io_uring_cqe_get_data(cqes[i]);
-
-			if (ret == -ENOBUFS) {
-				msg_error("%s(%d): ENOBUFS error",
-					__FUNCTION__, __LINE__);
-				return -1;
-			}
-
-			if (ret == -ETIME || ret == -ECANCELED || fe == NULL) {
-				continue;
-			}
-
-			handle_one((EVENT*) ep, fe, ret);
-		}
-
-		io_uring_cq_advance(&ep->ring, n);
-		cnt += n;
-	}
-
-	return cnt;
-#else
-	int cnt = 0, ret;
-	struct io_uring_cqe *cqe;
-	FILE_EVENT *fe;
-
-	while (1) {
-		ret = io_uring_peek_cqe(&ep->ring, &cqe);
-		if (ret == -EAGAIN) {
-			break;
-		}
-
-		ret = cqe->res;
-		fe = (FILE_EVENT*) io_uring_cqe_get_data(cqe);
-		io_uring_cqe_seen(&ep->ring, cqe);
-
-		if (ret == -ETIME || ret == -ECANCELED || fe == NULL) {
-			continue;
-		} else if (ret < 0) {
-			return -1;
-		}
-
-		handle_one((EVENT*) ep, fe, ret);
-		cnt++;
-	}
-	return cnt;
-#endif
-}
-
 static int event_uring_wait(EVENT *ev, int timeout)
 {
 	EVENT_URING *ep = (EVENT_URING*) ev;
 	int ret, count = 0;
 
-	if (ep->appending > 0) {
-		SUBMMIT(ep);
+	ret = submit_and_wait(ep, timeout);
+	if (ret == 0) {
+		return 0;
+	} else if (ret < 0) {
+		return -1;
+	}
+	count += ret;
+
+	ret = peek_more(ep);
+	if (ret == 0) {
+		return count;
+	} else if (ret < 0) {
+		return -1;
 	}
 
-	while (1) {
-#if 1
-		if (count > 0) {
-			ret = peek_more(ep);
-			if (ret == 0) {
-				break;
-			} else if (ret < 0) {
-				return -1;
-			}
-			count += ret;
-			break;
-		} else {
-			ret = wait_one(ep, timeout);
-			if (ret == 0) {
-				break;
-			} else if (ret < 0) {
-				return -1;
-			}
-			count += ret;
-		}
-#else
-		count = peek_more(ep);
-		if (count <= 0) {
-			break;
-		}
-#endif
-		//usleep(10000);
-	}
+	count += ret;
+	//usleep(10000);
 
 	return count;
 }
@@ -585,6 +569,8 @@ EVENT *event_io_uring_create(int size)
 	//eu->sqe_size = 256;
 
 	memset(&params, 0, sizeof(params));
+	//params.flags = IORING_SETUP_SQPOLL;
+
 	ret = io_uring_queue_init_params(eu->sqe_size, &eu->ring, &params);
 	if (ret < 0) {
 		msg_fatal("%s(%d): init io_uring error=%s, size=%zd",
@@ -594,9 +580,13 @@
 			__FUNCTION__, __LINE__, eu->sqe_size);
 	}
 
+	if (!(params.features & IORING_FEAT_FAST_POLL)) {
+		msg_info("IORING_FEAT_FAST_POLL not available in the kernel");
+	} else {
+		msg_info("IORING_FEAT_FAST_POLL is available in the kernel");
+	}
+
 	eu->appending = 0;
-	eu->queue = queue_new();
-	pthread_create(&eu->waiter, NULL, submit_waiter, eu);
 
 	eu->event.name = event_uring_name;
 	eu->event.handle = (acl_handle_t (*)(EVENT *)) event_uring_handle;
diff --git a/lib_fiber/c/src/hook/io.c b/lib_fiber/c/src/hook/io.c
index ca06a1621..1fc0a666c 100644
--- a/lib_fiber/c/src/hook/io.c
+++ b/lib_fiber/c/src/hook/io.c
@@ -637,7 +637,7 @@ ssize_t acl_fiber_recvmsg(socket_t sockfd, struct msghdr *msg, int flags)
 // flag was set and the fd was in non-block status in order to return imaginary
 // from connecting process.
 
-#if defined(HAS_IO_URING)
+#if defined(HAS_IO_URING_xxx)
 # define CHECK_SET_NBLOCK(_fd) do { \
 	if (var_hook_sys_api && !EVENT_IS_IO_URING(fiber_io_event())) { \
 		FILE_EVENT *fe = fiber_file_get(_fd); \