optimize and test io_uring...

zhengshuxin 2022-10-19 06:45:10 -04:00
parent 7f635e1cd6
commit 9ceca47c9f
2 changed files with 146 additions and 156 deletions


@@ -5,7 +5,6 @@
#include <dlfcn.h>
#include <liburing.h>
#include "../common/queue.h"
#include "event.h"
#include "event_io_uring.h"
@@ -14,40 +13,16 @@ typedef struct EVENT_URING {
struct io_uring ring;
size_t sqe_size;
size_t appending;
QUEUE *queue;
pthread_t waiter;
} EVENT_URING;
static void *submit_waiter(void *ctx)
{
EVENT_URING *ep = (EVENT_URING*) ctx;
while (1) {
if (queue_pop(ep->queue) == NULL) {
printf("Got NULL and over now!\r\n");
break;
}
io_uring_submit(&ep->ring);
}
queue_push(ep->queue, NULL);
return NULL;
}
static void event_uring_free(EVENT *ev)
{
EVENT_URING *ep = (EVENT_URING*) ev;
io_uring_queue_exit(&ep->ring);
queue_push(ep->queue, NULL);
queue_pop(ep->queue);
queue_free(ep->queue, NULL);
mem_free(ep);
}
#if 1
#define TRY_SUBMMIT(e) do { \
if (++(e)->appending >= (e)->sqe_size) { \
(e)->appending = 0; \
@@ -60,22 +35,6 @@ static void event_uring_free(EVENT *ev)
io_uring_submit(&(e)->ring); \
} while (0)
#else
#define TRY_SUBMMIT(e) do { \
if (++(e)->appending >= (e)->sqe_size) { \
(e)->appending = 0; \
queue_push((e)->queue, (e)); \
} \
} while (0)
#define SUBMMIT(e) do { \
(e)->appending = 0; \
queue_push((e)->queue, (e)); \
} while (0)
#endif
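The two macro variants above carry the commit's batching strategy: every queued request increments appending, and io_uring_submit() only runs once the counter reaches sqe_size (or when the event loop is about to block), so one syscall flushes many SQEs. The disabled #else variant handed the flush to the background submit_waiter thread that this commit deletes. A minimal standalone sketch of the enabled pattern; the names ring, pending and BATCH are illustrative, not from this codebase:

    #include <liburing.h>

    #define BATCH 256                 /* flush threshold, playing the role of sqe_size */

    static unsigned pending = 0;      /* queued-but-unsubmitted SQEs, like ep->appending */

    /* Queue one read and only call io_uring_submit() once a full batch
     * is ready, so a single syscall covers many requests. */
    static void queue_read(struct io_uring *ring, int fd, void *buf, unsigned len)
    {
        struct io_uring_sqe *sqe = io_uring_get_sqe(ring);

        if (sqe == NULL) {            /* SQ ring full: flush now and retry */
            io_uring_submit(ring);
            pending = 0;
            sqe = io_uring_get_sqe(ring);
        }

        io_uring_prep_read(sqe, fd, buf, len, 0);

        if (++pending >= BATCH) {
            pending = 0;
            io_uring_submit(ring);
        }
    }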
static void add_read_wait(EVENT_URING *ep, FILE_EVENT *fe, int tmo_ms)
{
struct io_uring_sqe *sqe;
@@ -104,24 +63,22 @@ static int event_uring_add_read(EVENT_URING *ep, FILE_EVENT *fe)
fe->mask |= EVENT_READ;
if (fe->mask & EVENT_POLLIN) {
if (LIKELY(!(fe->mask & (EVENT_POLLIN | EVENT_ACCEPT)))) {
struct io_uring_sqe *sqe = io_uring_get_sqe(&ep->ring);
io_uring_prep_read(sqe, fe->fd, fe->rbuf, fe->rsize, fe->off);
io_uring_sqe_set_data(sqe, fe);
TRY_SUBMMIT(ep);
} else if (fe->mask & EVENT_POLLIN) {
add_read_wait(ep, fe, fe->r_timeout);
} else if (fe->mask & EVENT_ACCEPT) {
struct io_uring_sqe *sqe = io_uring_get_sqe(&ep->ring);
assert(sqe);
fe->var.peer.len = (socklen_t) sizeof(fe->var.peer.addr);
io_uring_prep_accept(sqe, fe->fd,
(struct sockaddr*) &fe->var.peer.addr,
(socklen_t*) &fe->var.peer.len, 0);
io_uring_sqe_set_data(sqe, fe);
TRY_SUBMMIT(ep);
} else {
struct io_uring_sqe *sqe = io_uring_get_sqe(&ep->ring);
io_uring_prep_read(sqe, fe->fd, fe->rbuf, fe->rsize, fe->off);
io_uring_sqe_set_data(sqe, fe);
TRY_SUBMMIT(ep);
}
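In the EVENT_ACCEPT branch above, the peer-address buffer and its length live inside the FILE_EVENT because io_uring reads them asynchronously: both must stay valid until the completion arrives. A minimal sketch of the same accept pattern outside this codebase (the conn struct is illustrative):

    #include <sys/socket.h>
    #include <liburing.h>

    struct conn {
        struct sockaddr_storage addr;     /* must outlive the request */
        socklen_t               addrlen;
    };

    static void queue_accept(struct io_uring *ring, int listen_fd, struct conn *c)
    {
        struct io_uring_sqe *sqe = io_uring_get_sqe(ring);

        c->addrlen = sizeof(c->addr);
        io_uring_prep_accept(sqe, listen_fd,
            (struct sockaddr *) &c->addr, &c->addrlen, 0);
        io_uring_sqe_set_data(sqe, c);    /* recovered via io_uring_cqe_get_data() */
        io_uring_submit(ring);
    }

On completion, cqe->res holds the accepted descriptor, or a negative errno on failure.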
@@ -156,7 +113,13 @@ static int event_uring_add_write(EVENT_URING *ep, FILE_EVENT *fe)
fe->mask |= EVENT_WRITE;
if (fe->mask & EVENT_POLLOUT) {
if (LIKELY(!(fe->mask & (EVENT_POLLOUT | EVENT_CONNECT)))) {
struct io_uring_sqe *sqe = io_uring_get_sqe(&ep->ring);
io_uring_prep_write(sqe, fe->fd, fe->wbuf, fe->wsize, fe->off);
io_uring_sqe_set_data(sqe, fe);
TRY_SUBMMIT(ep);
} else if (fe->mask & EVENT_POLLOUT) {
add_write_wait(ep, fe, fe->r_timeout);
} else if (fe->mask & EVENT_CONNECT) {
non_blocking(fe->fd, 1);
@@ -166,12 +129,6 @@ static int event_uring_add_write(EVENT_URING *ep, FILE_EVENT *fe)
(socklen_t) fe->var.peer.len);
io_uring_sqe_set_data(sqe, fe);
TRY_SUBMMIT(ep);
} else {
struct io_uring_sqe *sqe = io_uring_get_sqe(&ep->ring);
io_uring_prep_write(sqe, fe->fd, fe->wbuf, fe->wsize, fe->off);
io_uring_sqe_set_data(sqe, fe);
TRY_SUBMMIT(ep);
}
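The connect branch keeps the target address in fe->var.peer for the same lifetime reason, and switches the fd to non-blocking first. A standalone sketch (set_nonblock is an illustrative helper; the address must stay valid until the CQE arrives):

    #include <fcntl.h>
    #include <sys/socket.h>
    #include <liburing.h>

    static void set_nonblock(int fd)
    {
        fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);
    }

    static void queue_connect(struct io_uring *ring, int fd,
        const struct sockaddr *sa, socklen_t len, void *ctx)
    {
        struct io_uring_sqe *sqe = io_uring_get_sqe(ring);

        set_nonblock(fd);
        io_uring_prep_connect(sqe, fd, sa, len);  /* sa must outlive the request */
        io_uring_sqe_set_data(sqe, ctx);
        io_uring_submit(ring);    /* cqe->res: 0 on success, -errno otherwise */
    }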
@@ -305,7 +262,7 @@ static int event_uring_del_write(EVENT_URING *ep UNUSED, FILE_EVENT *fe)
static void handle_read(EVENT *ev, FILE_EVENT *fe, int res)
{
if (!(fe->mask & (EVENT_ACCEPT | EVENT_POLLIN))) {
if (LIKELY(!(fe->mask & (EVENT_ACCEPT | EVENT_POLLIN)))) {
fe->rlen = res;
if ((fe->type & TYPE_FILE) && res > 0) {
fe->off += res;
@@ -338,7 +295,7 @@ static void handle_read(EVENT *ev, FILE_EVENT *fe, int res)
static void handle_write(EVENT *ev, FILE_EVENT *fe, int res)
{
if (!(fe->mask & (EVENT_CONNECT | EVENT_POLLOUT))) {
if (LIKELY(!(fe->mask & (EVENT_CONNECT | EVENT_POLLOUT)))) {
fe->wlen = res;
if ((fe->type & TYPE_FILE) && res > 0) {
fe->off += res;
@@ -403,7 +360,105 @@ static void handle_one(EVENT *ev, FILE_EVENT *fe, int res)
}
}
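The handle_read/handle_write hunks wrap the hot-path mask test in LIKELY(), hinting that a plain read or write completion is far more common than an accept, poll or connect one. If it follows the usual convention, LIKELY expands to __builtin_expect (a sketch; the actual definition in this codebase may differ):

    #if defined(__GNUC__) || defined(__clang__)
    # define LIKELY(x)   __builtin_expect(!!(x), 1)
    # define UNLIKELY(x) __builtin_expect(!!(x), 0)
    #else
    # define LIKELY(x)   (x)
    # define UNLIKELY(x) (x)
    #endif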
static int wait_one(EVENT_URING *ep, int timeout)
static int peek_more(EVENT_URING *ep)
{
#define PEEK_FOREACH
//#define PEEK_BATCH
#if defined(PEEK_FOREACH)
struct io_uring_cqe *cqe;
unsigned head, count = 0;
FILE_EVENT *fe;
int ret;
io_uring_for_each_cqe(&ep->ring, head, cqe) {
count++;
fe = (FILE_EVENT*) io_uring_cqe_get_data(cqe);
ret = cqe->res;
if (ret == -ENOBUFS) {
msg_error("%s(%d): ENOBUFS error", __FUNCTION__, __LINE__);
return -1;
}
if (ret == -ETIME || ret == -ECANCELED || fe == NULL) {
continue;
}
handle_one((EVENT*) ep, fe, ret);
}
if (count > 0) {
io_uring_cq_advance(&ep->ring, count);
}
return count;
#elif defined(PEEK_BATCH)
#define PEEK_MAX 100
struct io_uring_cqe *cqes[PEEK_MAX + 1];
FILE_EVENT *fe;
unsigned n, i;
int ret, cnt = 0;
while ((n = io_uring_peek_batch_cqe(&ep->ring, cqes, PEEK_MAX)) > 0) {
for (i = 0; i < n; i++) {
ret = cqes[i]->res;
fe = (FILE_EVENT*) io_uring_cqe_get_data(cqes[i]);
if (ret == -ENOBUFS) {
msg_error("%s(%d): ENOBUFS error",
__FUNCTION__, __LINE__);
return -1;
}
if (ret == -ETIME || ret == -ECANCELED || fe == NULL) {
continue;
}
handle_one((EVENT*) ep, fe, ret);
}
io_uring_cq_advance(&ep->ring, n);
cnt += n;
}
return cnt;
#else
int cnt = 0, ret;
struct io_uring_cqe *cqe;
FILE_EVENT *fe;
while (1) {
ret = io_uring_peek_cqe(&ep->ring, &cqe);
if (ret == -EAGAIN) {
break;
}
ret = cqe->res;
fe = (FILE_EVENT*) io_uring_cqe_get_data(cqe);
io_uring_cqe_seen(&ep->ring, cqe);
if (ret == -ETIME || ret == -ECANCELED || fe == NULL) {
continue;
} else if (ret < 0) {
return -1;
}
handle_one((EVENT*) ep, fe, ret);
cnt++;
}
return cnt;
#endif
}
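All three peek_more variants drain completions already visible in the shared CQ ring, so none of them enters the kernel. The enabled PEEK_FOREACH path is the cheapest: io_uring_for_each_cqe() just walks the ring, and a single io_uring_cq_advance() releases the whole batch, instead of one io_uring_cqe_seen() per CQE as in the fallback loop. A minimal standalone drain in the same style (handle_cqe is an illustrative callback):

    static int drain_cq(struct io_uring *ring,
        void (*handle_cqe)(struct io_uring_cqe *))
    {
        struct io_uring_cqe *cqe;
        unsigned head, count = 0;

        io_uring_for_each_cqe(ring, head, cqe) {  /* no syscall: reads shared memory */
            count++;
            handle_cqe(cqe);
        }
        if (count > 0)
            io_uring_cq_advance(ring, count);     /* mark the whole batch as seen */
        return (int) count;
    }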
static int submit_and_wait(EVENT_URING *ep, int timeout)
{
struct __kernel_timespec ts, *tp;
struct io_uring_cqe *cqe;
@@ -420,7 +475,13 @@ static int wait_one(EVENT_URING *ep, int timeout)
tp = NULL;
}
ret = io_uring_wait_cqes(&ep->ring, &cqe, 1, tp, NULL);
if (ep->appending > 0) {
ep->appending = 0;
ret = io_uring_submit_and_wait_timeout(&ep->ring, &cqe, 1, tp, NULL);
} else {
ret = io_uring_wait_cqes(&ep->ring, &cqe, 1, tp, NULL);
}
if (ret < 0) {
if (ret == -ETIME) {
return 0;
@@ -451,105 +512,28 @@ static int wait_one(EVENT_URING *ep, int timeout)
return 1;
}
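The key change above: when SQEs are still pending, io_uring_submit_and_wait_timeout() flushes them and waits for a completion in a single io_uring_enter() call, replacing the old separate submit-then-wait sequence. A sketch of the calling convention (requires liburing 2.x; the names are illustrative):

    #include <errno.h>
    #include <liburing.h>

    /* Returns 1 with *cqep set when a completion is ready,
     * 0 on timeout, and a negative errno on error. */
    static int flush_and_wait(struct io_uring *ring, unsigned pending,
        int timeout_ms, struct io_uring_cqe **cqep)
    {
        struct __kernel_timespec ts, *tp = NULL;
        int ret;

        if (timeout_ms >= 0) {
            ts.tv_sec  = timeout_ms / 1000;
            ts.tv_nsec = (long long) (timeout_ms % 1000) * 1000000;
            tp = &ts;
        }

        if (pending > 0)    /* flush + wait in one io_uring_enter() */
            ret = io_uring_submit_and_wait_timeout(ring, cqep, 1, tp, NULL);
        else                /* nothing queued, just wait */
            ret = io_uring_wait_cqes(ring, cqep, 1, tp, NULL);

        if (ret == -ETIME)
            return 0;
        return ret < 0 ? ret : 1;
    }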
static int peek_more(EVENT_URING *ep)
{
#define PEEK_BATCH
#ifdef PEEK_BATCH
#define PEEK_MAX 100
struct io_uring_cqe *cqes[PEEK_MAX + 1];
FILE_EVENT *fe;
unsigned n, i;
int ret, cnt = 0;
while ((n = io_uring_peek_batch_cqe(&ep->ring, cqes, PEEK_MAX)) > 0) {
for (i = 0; i < n; i++) {
ret = cqes[i]->res;
fe = (FILE_EVENT*) io_uring_cqe_get_data(cqes[i]);
if (ret == -ENOBUFS) {
msg_error("%s(%d): ENOBUFS error",
__FUNCTION__, __LINE__);
return -1;
}
if (ret == -ETIME || ret == -ECANCELED || fe == NULL) {
continue;
}
handle_one((EVENT*) ep, fe, ret);
}
io_uring_cq_advance(&ep->ring, n);
cnt += n;
}
return cnt;
#else
int cnt = 0, ret;
struct io_uring_cqe *cqe;
FILE_EVENT *fe;
while (1) {
ret = io_uring_peek_cqe(&ep->ring, &cqe);
if (ret == -EAGAIN) {
break;
}
ret = cqe->res;
fe = (FILE_EVENT*) io_uring_cqe_get_data(cqe);
io_uring_cqe_seen(&ep->ring, cqe);
if (ret == -ETIME || ret == -ECANCELED || fe == NULL) {
continue;
} else if (ret < 0) {
return -1;
}
handle_one((EVENT*) ep, fe, ret);
cnt++;
}
return cnt;
#endif
}
static int event_uring_wait(EVENT *ev, int timeout)
{
EVENT_URING *ep = (EVENT_URING*) ev;
int ret, count = 0;
if (ep->appending > 0) {
SUBMMIT(ep);
ret = submit_and_wait(ep, timeout);
if (ret == 0) {
return 0;
} else if (ret < 0) {
return -1;
}
count += ret;
ret = peek_more(ep);
if (ret == 0) {
return count;
} else if (ret < 0) {
return -1;
}
while (1) {
#if 1
if (count > 0) {
ret = peek_more(ep);
if (ret == 0) {
break;
} else if (ret < 0) {
return -1;
}
count += ret;
break;
} else {
ret = wait_one(ep, timeout);
if (ret == 0) {
break;
} else if (ret < 0) {
return -1;
}
count += ret;
}
#else
count = peek_more(ep);
if (count <= 0) {
break;
}
#endif
//usleep(10000);
}
count += ret;
//usleep(10000);
return count;
}
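Rewritten this way, event_uring_wait follows a block-once, drain-everything shape: one potentially blocking call, then non-blocking peeks until the CQ ring is empty, so a burst of completions costs a single syscall. A compressed sketch of that control flow, reusing the illustrative flush_and_wait, drain_cq and handle_cqe helpers from the sketches above:

    static int wait_events(struct io_uring *ring, unsigned pending, int timeout_ms)
    {
        struct io_uring_cqe *cqe;
        int n, count;

        count = flush_and_wait(ring, pending, timeout_ms, &cqe);  /* may block */
        if (count <= 0)
            return count;                 /* 0: timeout, <0: error */

        handle_cqe(cqe);
        io_uring_cqe_seen(ring, cqe);

        n = drain_cq(ring, handle_cqe);   /* collect everything else already done */
        return n < 0 ? -1 : count + n;
    }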
@@ -585,6 +569,8 @@ EVENT *event_io_uring_create(int size)
//eu->sqe_size = 256;
memset(&params, 0, sizeof(params));
//params.flags = IORING_SETUP_SQPOLL;
ret = io_uring_queue_init_params(eu->sqe_size, &eu->ring, &params);
if (ret < 0) {
msg_fatal("%s(%d): init io_uring error=%s, size=%zd",
@@ -594,9 +580,13 @@ EVENT *event_io_uring_create(int size)
__FUNCTION__, __LINE__, eu->sqe_size);
}
if (!(params.features & IORING_FEAT_FAST_POLL)) {
msg_info("IORING_FEAT_FAST_POLL not available in the kernel");
} else {
msg_info("IORING_FEAT_FAST_POLL is available in the kernel");
}
eu->appending = 0;
eu->queue = queue_new();
pthread_create(&eu->waiter, NULL, submit_waiter, eu);
eu->event.name = event_uring_name;
eu->event.handle = (acl_handle_t (*)(EVENT *)) event_uring_handle;
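io_uring_queue_init_params() fills params on return, which is how the code above detects IORING_FEAT_FAST_POLL (kernel 5.7+): with it, io_uring polls not-yet-ready fds internally instead of punting the request to a worker thread. A minimal standalone probe:

    #include <stdio.h>
    #include <string.h>
    #include <liburing.h>

    int main(void)
    {
        struct io_uring ring;
        struct io_uring_params params;
        int ret;

        memset(&params, 0, sizeof(params));
        ret = io_uring_queue_init_params(1024, &ring, &params);
        if (ret < 0) {
            fprintf(stderr, "io_uring init: %s\n", strerror(-ret));
            return 1;
        }

        printf("IORING_FEAT_FAST_POLL %s\n",
            (params.features & IORING_FEAT_FAST_POLL) ? "available" : "missing");

        io_uring_queue_exit(&ring);
        return 0;
    }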


@@ -637,7 +637,7 @@ ssize_t acl_fiber_recvmsg(socket_t sockfd, struct msghdr *msg, int flags)
// flag was set and the fd was in non-blocking status, in order to return immediately
// from the connecting process.
#if defined(HAS_IO_URING)
#if defined(HAS_IO_URING_xxx)
# define CHECK_SET_NBLOCK(_fd) do { \
if (var_hook_sys_api && !EVENT_IS_IO_URING(fiber_io_event())) { \
FILE_EVENT *fe = fiber_file_get(_fd); \