adding io_uring ...

This commit is contained in:
zhengshuxin 2022-10-08 10:25:40 -04:00
parent 64dab7b0f9
commit 14e9a8a159
6 changed files with 201 additions and 207 deletions

View File

@ -24,6 +24,7 @@ CFLAGS = -c -g -W \
-Wmissing-prototypes \
-Wcast-qual \
-DUSE_FAST_RING \
-DHAS_IO_URING \
#-O3 \
#-DUSE_VALGRIND \
#-DUSE_INLINE_MEMCPY\

View File

@ -136,15 +136,17 @@ struct FILE_EVENT {
#ifdef HAS_IO_URING
char *rbuf;
size_t rsize;
char *wbuf;
int rlen;
const char *wbuf;
size_t wsize;
int wlen;
#endif
#ifdef HAS_IOCP
char packet[1500]; // just for UDP packet
char *buff;
int size;
int len;
char *rbuf;
int rsize;
int rlen;
HANDLE h_iocp;
IOCP_EVENT *reader;
IOCP_EVENT *writer;
@ -196,8 +198,10 @@ struct EVENT {
long long stamp; // the stamp of the current fiber scheduler
unsigned flag;
#define EVENT_F_IOCP (1 << 0)
#define EVENT_IS_IOCP(x) ((x)->flag & EVENT_F_IOCP)
#define EVENT_F_IOCP (1 << 0)
#define EVENT_F_IO_URING (1 << 1)
#define EVENT_IS_IOCP(x) ((x)->flag & EVENT_F_IOCP)
#define EVENT_IS_IO_URING(x) ((x)->flag & EVENT_F_IO_URING)
#ifdef HAS_POLL
TIMER_CACHE *poll_list;

View File

@ -8,73 +8,6 @@
#include "event.h"
#include "event_io_uring.h"
typedef int (*que_init_fn)(unsigned, struct io_uring*, struct io_uring_params*);
typedef void (*que_exit_fn)(struct io_uring*);
typedef struct io_uring_sqe (*get_sqe_fn)(struct io_uring*);
typedef void (*sqe_set_flags_fn)(struct io_uring_sqe*, unsigned);
typedef void (*sqe_set_data_fn)(struct io_uring_sqe*, void*);
typedef void (*cqe_get_data_fn)(struct io_uring_cqe*);
typedef void (*prep_accept_fn)(struct io_uring_sqe*, int, struct sockaddr*, socklen_t*, int);
typedef void (*prep_read_fn)(struct io_uring_sqe*, int, void*, unsigned, __u64);
typedef void (*prep_write_fn)(struct io_uring_sqe*, int, const void*, unsigned, __u64);
typedef int (*submit_and_wait_fn)(struct io_uring*, struct io_uring_cqe**,
unsigned, struct __kernel_timespec, sigset_t*);
static que_init_fn __sys_que_init = NULL;
static que_exit_fn __sys_que_exit = NULL;
static get_sqe_fn __sys_get_sqe = NULL;
static sqe_set_flags_fn __sys_sqe_set_flags = NULL;
static sqe_set_data __sys_sqe_set_data = NULL;
static *cqe_get_data_fn __sys_cqe_get_data = NULL;
static prep_accept_fn __sys_prep_accept = NULL;
static prep_read_fn __sys_prep_read = NULL;
static prep_write_fn __sys_prep_write = NULL;
static submit_and_wait_fn __sys_submit_and_wait = NULL;
static void hook_api(void)
{
__sys_que_init = (que_init_fn) dlsym(RTLD_NEXT, "io_uring_queue_init_params");
assert(__sys_que_init);
__sys_que_exit = (que_exit_fn) dlsym(RTLD_NEXT, "io_uring_queue_exit");
assert(__sys_que_exit);
__sys_get_sqe = (get_sqe_fn) dlsym(RTLD_NEXT, "io_uring_get_sqe");
assert(__sys_get_sqe);
__sys_sqe_set_flags = (sqe_set_flags_fn) dlsym(RTLD_NEXT, "io_uring_sqe_set_flags");
assert(__sys_sqe_set_flags);
__sys_sqe_set_data = (sqe_set_data_fn) dlsym(RTLD_NEXT, "io_uring_sqe_set_data");
assert(__sys_sqe_set_data);
__sys_cqe_get_data = (cqe_get_data_fn) dlsym(RTLD_NEXT, "io_uring_cqe_get_data");
assert(__sys_cqe_get_data);
__sys_prep_accept = (prep_accept_fn) dlsym(RTLD_NEXT, "io_uring_prep_accept");
assert(__sys_prep_accept);
__sys_prep_read = (prep_read_fn) dlsym(RTLD_NEXT, "io_uring_prep_read");
assert(__sys_prep_read);
__sys_prep_write = (prep_write_fn) dlsym(RTLD_NEXT, "io_uring_prep_write");
assert(__sys_prep_write);
__sys_submit_and_wait = (submit_and_wait_fn) dlsym(RTLD_NEXT, "io_uring_submit_and_wait_timeout");
assert(__sys_submit_and_wait);
}
static pthread_once_t __once_control = PTHREAD_ONCE_INIT;
static void hook_init(void)
{
if (pthread_once(&__once_control, hook_api) != 0) {
abort();
}
}
/****************************************************************************/
typedef struct EVENT_URING {
EVENT event;
struct io_uring ring;
@ -82,33 +15,47 @@ typedef struct EVENT_URING {
static void event_uring_free(EVENT *ev)
{
EVENT_URING *eu = (EVENT_URING*) ev;
EVENT_URING *ep = (EVENT_URING*) ev;
__sys_que_exit(&eu->ring);
mem_free(eu);
io_uring_queue_exit(&ep->ring);
mem_free(ep);
}
static int event_uring_add_read(EVENT_URING *eu, FILE_EVENT *fe)
static int event_uring_add_read(EVENT_URING *ep, FILE_EVENT *fe)
{
struct io_uring_sqe *sqe = __sys_get_sqe(&eu->ring);
struct io_uring_sqe *sqe;
if (fe->mask & EVENT_READ) {
return 0;
}
fe->mask |= EVENT_READ;
sqe = io_uring_get_sqe(&ep->ring);
assert(sqe);
__sys_prep_read(sqe, fe->fd, fe->rbuf, fe->rsize, 0);
__sys_sqe_set_data(sqe, fe);
io_uring_sqe_set_data(sqe, fe);
io_uring_prep_read(sqe, fe->fd, fe->rbuf, fe->rsize, 0);
return 0;
}
static int event_uring_add_write(EVENT_URING *ep, FILE_EVENT *fe)
{
struct io_uring_sqe *sqe = __sys_get_sqe(&eu->ring);
struct io_uring_sqe *sqe;
if (fe->mask & EVENT_WRITE) {
return 0;
}
fe->mask |= EVENT_WRITE;
sqe = io_uring_get_sqe(&ep->ring);
assert(sqe);
__sys_prep_write(sqe, fe->fd, fe->wbuf, fe->wsize, 0);
__sys_sqe_set_data(sqe, fe);
io_uring_sqe_set_data(sqe, fe);
io_uring_prep_write(sqe, fe->fd, fe->wbuf, fe->wlen, 0);
return 0;
}
static int event_uring_del_read(EVENT_URING *ep, FILE_EVENT *fe)
static int event_uring_del_read(EVENT_URING *ep UNUSED, FILE_EVENT *fe)
{
if (!(fe->mask & EVENT_READ)) {
return 0;
@ -118,7 +65,7 @@ static int event_uring_del_read(EVENT_URING *ep, FILE_EVENT *fe)
return 0;
}
static int event_uring_del_write(EVENT_URING *ep, FILE_EVENT *fe)
static int event_uring_del_write(EVENT_URING *ep UNUSED, FILE_EVENT *fe)
{
if (!(fe->mask & EVENT_WRITE)) {
return 0;
@ -130,34 +77,56 @@ static int event_uring_del_write(EVENT_URING *ep, FILE_EVENT *fe)
static int event_uring_wait(EVENT *ev, int timeout)
{
EVENT_URING *eu = (EVENT_URING*) ev;
EVENT_URING *ep = (EVENT_URING*) ev;
struct __kernel_timespec ts, *tp;
struct io_uring_cqe *cqes;
unsigned count = 0;
struct io_uring_cqe *cqe;
unsigned count = 0, head;
FILE_EVENT *fe;
int n, i;
int n;
if (timeout >= 0) {
ts.tv_sec = timeout / 1000;
ts.tv_nsec = (((long long) timeout) % 1000) * 1000000;
tp = &ts;
} else {
tp = NULL;
}
n = __sys_submit_and_wait(&eu->ring, &cqe, 1, &tp, NULL);
n = io_uring_submit_and_wait_timeout(&ep->ring, &cqe, 1, tp, NULL);
if (n == 0) {
return 0;
} else if (n < 0) {
if (n == -ETIME) {
return 0;
}
printf("%s(%d): wait error=%d\r\n", __FUNCTION__, __LINE__, n);
return -1;
}
for (i = 0; i < n; i++) {
io_uring_for_each_cqe(&ep->ring, head, cqe) {
if (cqe->res == -ENOBUFS) {
return -1;
}
count++;
fe = (FILE_EVENT*) io_uring_cqe_get_data();
fe = (FILE_EVENT*) io_uring_cqe_get_data(cqe);
assert(fe);
io_uring_cqe_seen(&ep->ring, cqe);
if (fe && (fe->mask & EVENT_READ) && fe->r_proc) {
fe->mask &= ~EVENT_READ;
fe->rlen = cqe->res;
fe->r_proc(ev, fe);
}
if (fe && (fe->mask & EVENT_WRITE) && fe->w_proc) {
fe->mask &= ~EVENT_WRITE;
fe->w_proc(ev, fe);
}
}
return count;
}
static int event_uring_checkfd(EVENT *ev UNUSED, FILE_EVENT *fe UNUSED)
@ -167,7 +136,8 @@ static int event_uring_checkfd(EVENT *ev UNUSED, FILE_EVENT *fe UNUSED)
static long event_uring_handle(EVENT *ev)
{
return (long) &ev->ring;
EVENT_URING *ep = (EVENT_URING *) ev;
return (long) &ep->ring;
}
static const char *event_uring_name(void)
@ -175,27 +145,24 @@ static const char *event_uring_name(void)
return "io_uring";
}
EVENT *event_uring_create(int size)
EVENT *event_io_uring_create(int size)
{
EVENT_URING *eu = (EVENT_URING *) mem_calloc(1, sizeof(EVENT_URING));
struct io_uring_params params;
if (__sys_init_params == NULL) {
hook_init();
}
if (size <= 0 || size > 100) {
size = 100;
}
memset(&params, 0, sizeof(params));
if (__sys_init_params(size, &eu->ring, &params) < 0) {
if (io_uring_queue_init_params(size, &eu->ring, &params) < 0) {
abort();
}
eu->event.name = event_uring_name;
eu->event.handle = (acl_handle_t (*)(EVENT *)) event_uring_handle;
eu->event.free = event_uring_free;
eu->event.flag = EVENT_F_IO_URING;
eu->event.event_wait = event_uring_wait;
eu->event.checkfd = (event_oper *) event_uring_checkfd;

View File

@ -0,0 +1,12 @@
#ifndef EVENT_IO_URING_INCLUDE_H
#define EVENT_IO_URING_INCLUDE_H
#include "event.h"
#ifdef HAS_EPOLL
EVENT *event_io_uring_create(int size);
#endif
#endif

View File

@ -242,29 +242,29 @@ static int iocp_add_read(EVENT_IOCP *ev, FILE_EVENT *fe)
return iocp_add_listen(ev, fe);
}
/* If fe->buff has been set in io.c, we use it as overlapped buffer,
/* If fe->rbuf has been set in io.c, we use it as overlapped buffer,
* or we must check if the socket is for UDP and being in poll reading
* status, if so, we must use the fixed buffer as UDP's reading buffer,
* because IOCP will discard UDP packet when no buffer provided.
*/
if (fe->buff != NULL && fe->size > 0) {
wsaData.buf = fe->buff;
wsaData.len = fe->size;
if (fe->rbuf != NULL && fe->rsize > 0) {
wsaData.buf = fe->rbuf;
wsaData.len = fe->rsize;
} else if (IS_POLLING(fe) && fe->sock_type == SOCK_DGRAM) {
fe->buff = fe->packet;
fe->size = sizeof(fe->packet);
fe->len = 0;
fe->rbuf = fe->packet;
fe->rsize = sizeof(fe->packet);
fe->rlen = 0;
wsaData.buf = fe->packet;
wsaData.len = fe->size;
wsaData.len = fe->rsize;
} else {
wsaData.buf = fe->buff;
wsaData.len = fe->size;
wsaData.buf = fe->rbuf;
wsaData.len = fe->rsize;
}
ret = WSARecv(fe->fd, &wsaData, 1, &len, &flags,
(OVERLAPPED*) &event->overlapped, NULL);
fe->len = (int) len;
fe->rlen = (int) len;
if (ret != SOCKET_ERROR) {
fe->mask |= EVENT_READ;
@ -278,7 +278,7 @@ static int iocp_add_read(EVENT_IOCP *ev, FILE_EVENT *fe)
fe->mask |= EVENT_ERR;
#if 0
fe->mask &= ~EVENT_READ;
fe->len = -1;
fe->rlen = -1;
array_append(ev->events, event);
#else
iocp_event_save(ev, event, fe, -1);
@ -418,7 +418,7 @@ static int iocp_add_write(EVENT_IOCP *ev, FILE_EVENT *fe)
fe->mask |= EVENT_ERR;
#if 0
fe->mask &= ~EVENT_WRITE;
fe->len = -1;
fe->rlen = -1;
array_append(ev->events, event);
#else
iocp_event_save(ev, event, fe, -1);
@ -500,7 +500,7 @@ static void iocp_event_save(EVENT_IOCP *ei, IOCP_EVENT *event,
CLR_WRITEWAIT(fe);
}
fe->len = (int) trans;
fe->rlen = (int) trans;
array_append(ei->events, event);
}

View File

@ -108,60 +108,116 @@ int WINAPI acl_fiber_close(socket_t fd)
/****************************************************************************/
#ifdef SYS_UNIX
//# define READ_FIRST
# ifdef READ_FIRST
ssize_t acl_fiber_read(socket_t fd, void *buf, size_t count)
#if defined(HAS_IOCP) || defined(HAS_IO_URING)
static int fiber_iocp_read(FILE_EVENT *fe, char *buf, int len)
{
FILE_EVENT* fe;
if (fd == INVALID_SOCKET) {
msg_error("%s: invalid fd: %d", __FUNCTION__, fd);
return -1;
#if defined(HAS_IOCP)
/* If the socket type is UDP, We must check the fixed buffer first,
* which maybe used in iocp_add_read() and set for polling read status.
*/
if (fe->sock_type == SOCK_DGRAM && fe->rbuf == fe->packet && fe->rlen > 0) {
if (fe->rlen < len) {
len = fe->rlen;
}
memcpy(buf, fe->packet, len);
fe->rbuf = NULL;
fe->rlen = 0;
return len;
}
#endif
if (sys_read == NULL) {
hook_once();
}
if (!var_hook_sys_api) {
return (*sys_read)(fd, buf, count);
}
fe = fiber_file_open(fd);
CLR_POLLING(fe);
fe->rbuf = buf;
fe->rsize = (size_t) len;
fe->rlen = 0;
while (1) {
ssize_t n;
int err;
if (acl_fiber_canceled(fe->fiber)) {
acl_fiber_set_error(fe->fiber->errnum);
fe->mask &= ~EVENT_READ;
if (fiber_wait_read(fe) < 0) {
msg_error("%s(%d): fiber_wait_read error=%s, fd=%d",
__FUNCTION__, __LINE__, last_serror(), (int) fe->fd);
return -1;
}
n = (*sys_read)(fd, buf, count);
if (n >= 0) {
return n;
if (fe->mask & EVENT_ERR) {
err = acl_fiber_last_error();
fiber_save_errno(err);
return -1;
}
if (acl_fiber_canceled(fe->fiber_r)) {
acl_fiber_set_error(fe->fiber_r->errnum);
return -1;
}
if (fe->rlen >= 0) {
return fe->rlen;
}
err = acl_fiber_last_error();
fiber_save_errno(err);
if (!error_again(err)) {
return -1;
}
if (fiber_wait_read(fe) < 0) {
msg_error("%s(%d): fiber_wait_read error=%s, fd=%d",
__FUNCTION__, __LINE__, last_serror(), (int) fd);
if (fe->type != TYPE_SPIPE) {
fiber_file_free(fe);
}
return -1;
}
}
}
# else
#endif // HAS_IOCP || HAS_IO_URING
#if defined(HAS_IO_URING)
static int fiber_iocp_write(FILE_EVENT *fe, const char *buf, int len)
{
fe->wbuf = buf;
fe->wsize = (size_t) len;
fe->wlen = 0;
while (1) {
int err;
fe->mask &= ~EVENT_WRITE;
if (fiber_wait_write(fe) < 0) {
msg_error("%s(%d): fiber_wait_write error=%s, fd=%d",
__FUNCTION__, __LINE__, last_serror(), (int) fe->fd);
return -1;
}
if (fe->mask & EVENT_ERR) {
err = acl_fiber_last_error();
fiber_save_errno(err);
return -1;
}
if (acl_fiber_canceled(fe->fiber_w)) {
acl_fiber_set_error(fe->fiber_w->errnum);
return -1;
}
if (fe->wlen == (int) fe->wsize) {
return fe->wlen;
}
err = acl_fiber_last_error();
fiber_save_errno(err);
if (!error_again(err)) {
if (fe->type != TYPE_SPIPE) {
fiber_file_free(fe);
}
return -1;
}
}
}
#endif
#ifdef SYS_UNIX
ssize_t acl_fiber_read(socket_t fd, void *buf, size_t count)
{
FILE_EVENT* fe;
@ -182,6 +238,12 @@ ssize_t acl_fiber_read(socket_t fd, void *buf, size_t count)
fe = fiber_file_open_read(fd);
CLR_POLLING(fe);
#ifdef HAS_IO_URING
if (EVENT_IS_IO_URING(fiber_io_event())) {
return fiber_iocp_read(fe, buf, (int) count);
}
#endif
while (1) {
ssize_t ret;
int err;
@ -237,7 +299,6 @@ ssize_t acl_fiber_read(socket_t fd, void *buf, size_t count)
}
}
}
# endif // READ_FIRST
ssize_t acl_fiber_readv(socket_t fd, const struct iovec *iov, int iovcnt)
{
@ -294,65 +355,6 @@ ssize_t acl_fiber_readv(socket_t fd, const struct iovec *iov, int iovcnt)
}
#endif // SYS_UNIX
#ifdef HAS_IOCP
static int fiber_iocp_read(FILE_EVENT *fe, char *buf, int len)
{
/* If the socket type is UDP, We must check the fixed buffer first,
* which maybe used in iocp_add_read() and set for polling read status.
*/
if (fe->sock_type == SOCK_DGRAM && fe->buff == fe->packet && fe->len > 0) {
if (fe->len < len) {
len = fe->len;
}
memcpy(buf, fe->packet, len);
fe->buff = NULL;
fe->len = 0;
return len;
}
fe->buff = buf;
fe->size = len;
fe->len = 0;
while (1) {
int err;
fe->mask &= ~EVENT_READ;
if (fiber_wait_read(fe) < 0) {
msg_error("%s(%d): fiber_wait_read error=%s, fd=%d",
__FUNCTION__, __LINE__, last_serror(), (int) fe->fd);
return -1;
}
if (fe->mask & EVENT_ERR) {
err = acl_fiber_last_error();
fiber_save_errno(err);
return -1;
}
if (acl_fiber_canceled(fe->fiber_r)) {
acl_fiber_set_error(fe->fiber_r->errnum);
return -1;
}
if (fe->len >= 0) {
return fe->len;
}
err = acl_fiber_last_error();
fiber_save_errno(err);
if (!error_again(err)) {
if (fe->type != TYPE_SPIPE) {
fiber_file_free(fe);
}
return -1;
}
}
}
#endif
#ifdef SYS_WIN
int WINAPI acl_fiber_WSARecv(socket_t sockfd,
LPWSABUF lpBuffers,
@ -590,6 +592,14 @@ ssize_t acl_fiber_write(socket_t fd, const void *buf, size_t count)
CHECK_SET_NBLOCK(fd);
#if defined(HAS_IO_URING)
if (EVENT_IS_IO_URING(fiber_io_event())) {
FILE_EVENT *fe = fiber_file_open_write(fd);
CLR_POLLING(fe);
return fiber_iocp_write(fe, buf, (int) count);
}
#endif
while (1) {
ssize_t n = (*sys_write)(fd, buf, count);
FILE_EVENT *fe;