Optimize and test io_uring event mode in fiber module.

This commit is contained in:
root 2024-11-06 15:49:49 +08:00
parent d404b51e7c
commit fde9c6316f
6 changed files with 76 additions and 58 deletions

View File

@ -6,6 +6,7 @@
#include "io.h"
#if defined(HAS_IO_URING)
static int uring_wait_read(FILE_EVENT *fe)
{
while (1) {
@ -30,13 +31,16 @@ static int uring_wait_read(FILE_EVENT *fe)
return fe->reader_ctx.res;
}
err = acl_fiber_last_error();
// Use the negative of res as the error.
err = -fe->reader_ctx.res;
acl_fiber_set_error(err);
fiber_save_errno(err);
if (!error_again(err)) {
if (!(fe->type & TYPE_EVENTABLE)) {
fiber_file_free(fe);
}
// Don't free fe here, which will be freed in close.
//if (!(fe->type & TYPE_EVENTABLE)) {
// fiber_file_free(fe);
//}
return -1;
}
}
@ -60,9 +64,9 @@ int fiber_iocp_read(FILE_EVENT *fe, char *buf, int len)
return iocp_wait_read(fe);
}
#endif // HAS_IO_URING
#if defined(HAS_IOCP)
#elif defined(HAS_IOCP)
static int iocp_wait_read(FILE_EVENT *fe)
{
while (1) {
@ -122,7 +126,8 @@ int fiber_iocp_read(FILE_EVENT *fe, char *buf, int len)
fe->res = 0;
return iocp_wait_read(fe);
}
#endif // HAS_IOCP
#endif // HAS_IOCP || HAS_IO_URING
// After calling fiber_wait_read():
// The fiber_wait_read will return three status:
@ -148,7 +153,8 @@ int fiber_iocp_read(FILE_EVENT *fe, char *buf, int len)
// not monitor the file fd in fiber_wait_read.
#if defined(_WIN32) || defined(_WIN64)
#define FIBER_READ(_fn, _fe, ...) do { \
# define FIBER_READ(_fn, _fe, ...) do { \
ssize_t ret; \
int err; \
if (IS_READABLE((_fe))) { \
@ -172,8 +178,10 @@ int fiber_iocp_read(FILE_EVENT *fe, char *buf, int len)
return -1; \
} \
} while (1)
#else
#define FIBER_READ(_fn, _fe, _args...) do { \
# define FIBER_READ(_fn, _fe, _args...) do { \
ssize_t ret; \
int err; \
if (IS_READABLE((_fe))) { \
@ -197,6 +205,7 @@ int fiber_iocp_read(FILE_EVENT *fe, char *buf, int len)
return -1; \
} \
} while (1)
#endif
#define FILE_ALLOC(f, t, fd) do { \
@ -212,18 +221,18 @@ ssize_t fiber_read(FILE_EVENT *fe, void *buf, size_t count)
CLR_POLLING(fe);
#ifdef HAS_IO_URING
// One FILE_EVENT can be used by multiple fibers with the same
// EVENT_BUSY_READ or EVENT_BUSY_WRITE in the same time. But can be
// used by two fibers that one is a reader and the other is a writer,
// because there're two different objects for reader and writer.
if (EVENT_IS_IO_URING(fiber_io_event())) {
#define SET_READ(f) do { \
# define SET_READ(f) do { \
(f)->in.read_ctx.buf = buf; \
(f)->in.read_ctx.len = (int) count; \
(f)->mask |= EVENT_READ; \
} while (0)
// One FILE_EVENT can be used by multiple fibers with the same
// EVENT_BUSY_READ or EVENT_BUSY_WRITE in the same time. But can be
// used by two fibers that one is a reader and the other is a writer,
// because there're two different objects for reader and writer.
if (EVENT_IS_IO_URING(fiber_io_event())) {
int ret;
if (!(fe->busy & EVENT_BUSY_READ)) {
@ -251,15 +260,15 @@ ssize_t fiber_readv(FILE_EVENT *fe, const struct iovec *iov, int iovcnt)
CLR_POLLING(fe);
#ifdef HAS_IO_URING
if (EVENT_IS_IO_URING(fiber_io_event())) {
#define SET_READV(f) do { \
# define SET_READV(f) do { \
(f)->in.readv_ctx.iov = iov; \
(f)->in.readv_ctx.cnt = iovcnt; \
(f)->in.readv_ctx.off = 0; \
(f)->mask |= EVENT_READV; \
} while (0)
if (EVENT_IS_IO_URING(fiber_io_event())) {
int ret;
if (!(fe->busy & EVENT_BUSY_READ)) {
@ -287,14 +296,14 @@ ssize_t fiber_recvmsg(FILE_EVENT *fe, struct msghdr *msg, int flags)
CLR_POLLING(fe);
#ifdef HAS_IO_URING
if (EVENT_IS_IO_URING(fiber_io_event())) {
#define SET_RECVMSG(f) do { \
# define SET_RECVMSG(f) do { \
(f)->in.recvmsg_ctx.msg = msg; \
(f)->in.recvmsg_ctx.flags = flags; \
(f)->mask |= EVENT_RECVMSG; \
} while (0)
if (EVENT_IS_IO_URING(fiber_io_event())) {
int ret;
if (!(fe->busy & EVENT_BUSY_READ)) {
@ -318,6 +327,7 @@ ssize_t fiber_recvmsg(FILE_EVENT *fe, struct msghdr *msg, int flags)
}
# ifdef HAS_MMSG
ssize_t fiber_recvmmsg(FILE_EVENT *fe, struct mmsghdr *msgvec,
unsigned int vlen, int flags, const struct timespec *timeout)
{
@ -336,6 +346,7 @@ ssize_t fiber_recvmmsg(FILE_EVENT *fe, struct mmsghdr *msgvec,
FIBER_READ(sys_recvmmsg, fe, msgvec, vlen, flags, NULL);
}
# endif // HAS_MMSG
#endif // SYS_UNIX
@ -353,15 +364,15 @@ ssize_t fiber_recv(FILE_EVENT *fe, void *buf, size_t len, int flags)
# endif
}
#elif defined(HAS_IO_URING)
if (EVENT_IS_IO_URING(fiber_io_event())) {
#define SET_RECV(f) do { \
# define SET_RECV(f) do { \
(f)->in.recv_ctx.buf = buf; \
(f)->in.recv_ctx.len = (unsigned) len; \
(f)->in.recv_ctx.flags = flags; \
(f)->mask |= EVENT_RECV; \
} while (0)
if (EVENT_IS_IO_URING(fiber_io_event())) {
int ret;
if (!(fe->busy & EVENT_BUSY_READ)) {
@ -398,9 +409,8 @@ ssize_t fiber_recvfrom(FILE_EVENT *fe, void *buf, size_t len,
return fiber_iocp_read(fe, buf, (int) len);
}
#elif defined(HAS_IO_URING) && defined(IO_URING_HAS_RECVFROM)
if (EVENT_IS_IO_URING(fiber_io_event())) {
#define SET_RECVFROM(f) do { \
# define SET_RECVFROM(f) do { \
(f)->in.recvfrom_ctx.buf = buf; \
(f)->in.recvfrom_ctx.len = (unsigned) len; \
(f)->in.recvfrom_ctx.flags = flags; \
@ -409,6 +419,7 @@ ssize_t fiber_recvfrom(FILE_EVENT *fe, void *buf, size_t len,
(f)->mask |= EVENT_RECVFROM; \
} while (0)
if (EVENT_IS_IO_URING(fiber_io_event())) {
int ret;
if (!(fe->busy & EVENT_BUSY_READ)) {

View File

@ -10,6 +10,7 @@
// from connecting process.
#if defined(HAS_IO_URING)
# define CHECK_SET_NBLOCK(_fd) do { \
if (var_hook_sys_api && !EVENT_IS_IO_URING(fiber_io_event())) { \
FILE_EVENT *_fe = fiber_file_get(_fd); \
@ -19,7 +20,9 @@
} \
} \
} while (0)
#else
# define CHECK_SET_NBLOCK(_fd) do { \
if (var_hook_sys_api) { \
FILE_EVENT *_fe = fiber_file_get(_fd); \
@ -29,7 +32,8 @@
} \
} \
} while (0)
#endif
#endif // HAS_IO_URING
static int wait_write(FILE_EVENT *fe)
{
@ -52,7 +56,8 @@ static int wait_write(FILE_EVENT *fe)
}
#if defined(HAS_IO_URING)
static int iocp_wait_write(FILE_EVENT *fe)
static int uring_wait_write(FILE_EVENT *fe)
{
while (1) {
int err;
@ -73,22 +78,22 @@ static int iocp_wait_write(FILE_EVENT *fe)
fiber_save_errno(err);
if (!error_again(err)) {
if (!(fe->type & TYPE_EVENTABLE)) {
fiber_file_free(fe);
}
// Don't free fe here, which will be freed in close.
//if (!(fe->type & TYPE_EVENTABLE)) {
// fiber_file_free(fe);
//}
return -1;
}
}
}
#endif
#if defined(HAS_IO_URING)
int fiber_iocp_write(FILE_EVENT *fe, const char *buf, int len)
int fiber_uring_write(FILE_EVENT *fe, const char *buf, int len)
{
fe->out.write_ctx.buf = buf;
fe->out.write_ctx.len = len;
return iocp_wait_write(fe);
return uring_wait_write(fe);
}
#endif // HAS_IO_URING
#define CHECK_WRITE_RESULT(_fe, _n) do { \
@ -112,34 +117,34 @@ int fiber_iocp_write(FILE_EVENT *fe, const char *buf, int len)
(__fe)->type = TYPE_EVENTABLE; \
} while (0)
#ifdef SYS_UNIX
ssize_t fiber_write(FILE_EVENT *fe, const void *buf, size_t count)
{
CLR_POLLING(fe);
#if defined(HAS_IO_URING)
if (EVENT_IS_IO_URING(fiber_io_event()) && !(fe->mask & EVENT_SYSIO)) {
#define SET_WRITE(f) do { \
# define SET_WRITE(f) do { \
(f)->out.write_ctx.buf = buf; \
(f)->out.write_ctx.len = (unsigned) count; \
(f)->mask |= EVENT_WRITE; \
} while (0)
if (EVENT_IS_IO_URING(fiber_io_event()) && !(fe->mask & EVENT_SYSIO)) {
int ret;
if (!(fe->busy & EVENT_BUSY_WRITE)) {
SET_WRITE(fe);
fe->busy |= EVENT_BUSY_WRITE;
ret = iocp_wait_write(fe);
ret = uring_wait_write(fe);
fe->busy &= ~EVENT_BUSY_WRITE;
} else {
FILE_ALLOC(fe, 0, fe->fd);
SET_WRITE(fe);
ret = iocp_wait_write(fe);
ret = uring_wait_write(fe);
file_event_unrefer(fe);
}
return ret;
@ -164,28 +169,28 @@ ssize_t fiber_writev(FILE_EVENT *fe, const struct iovec *iov, int iovcnt)
CLR_POLLING(fe);
#if defined(HAS_IO_URING)
if (EVENT_IS_IO_URING(fiber_io_event())) {
#define SET_WRITEV(f) do { \
# define SET_WRITEV(f) do { \
(f)->out.writev_ctx.iov = iov; \
(f)->out.writev_ctx.cnt = iovcnt; \
(f)->out.writev_ctx.off = 0; \
(f)->mask |= EVENT_WRITEV; \
} while (0)
if (EVENT_IS_IO_URING(fiber_io_event())) {
int ret;
if (!(fe->busy & EVENT_BUSY_WRITE)) {
SET_WRITEV(fe);
fe->busy |= EVENT_BUSY_WRITE;
ret = iocp_wait_write(fe);
ret = uring_wait_write(fe);
fe->busy &= ~EVENT_BUSY_WRITE;
} else {
FILE_ALLOC(fe, 0, fe->fd);
SET_WRITEV(fe);
ret = iocp_wait_write(fe);
ret = uring_wait_write(fe);
file_event_unrefer(fe);
}
return ret;
@ -211,28 +216,28 @@ ssize_t fiber_send(FILE_EVENT *fe, const void *buf, size_t len, int flags)
CLR_POLLING(fe);
#if defined(HAS_IO_URING)
if (EVENT_IS_IO_URING(fiber_io_event())) {
#define SET_SEND(f) do { \
# define SET_SEND(f) do { \
(f)->out.send_ctx.buf = buf; \
(f)->out.send_ctx.len = (unsigned) len; \
(f)->out.send_ctx.flags = flags; \
(f)->mask |= EVENT_SEND; \
} while (0)
if (EVENT_IS_IO_URING(fiber_io_event())) {
int ret;
if (!(fe->busy & EVENT_BUSY_WRITE)) {
SET_SEND(fe);
fe->busy |= EVENT_BUSY_WRITE;
ret = iocp_wait_write(fe);
ret = uring_wait_write(fe);
fe->busy &= ~EVENT_BUSY_WRITE;
} else {
FILE_ALLOC(fe, 0, fe->fd);
SET_SEND(fe);
ret = iocp_wait_write(fe);
ret = uring_wait_write(fe);
file_event_unrefer(fe);
}
return ret;
@ -262,9 +267,8 @@ ssize_t fiber_sendto(FILE_EVENT *fe, const void *buf, size_t len,
CLR_POLLING(fe);
#if defined(HAS_IO_URING) && defined(IO_URING_HAS_SENDTO)
if (EVENT_IS_IO_URING(fiber_io_event())) {
#define SET_SENDTO(f) do { \
# define SET_SENDTO(f) do { \
(f)->out.sendto_ctx.buf = buf; \
(f)->out.sendto_ctx.len = (unsigned) len; \
(f)->out.sendto_ctx.flags = flags; \
@ -273,19 +277,20 @@ ssize_t fiber_sendto(FILE_EVENT *fe, const void *buf, size_t len,
(f)->mask |= EVENT_SENDTO; \
} while (0)
if (EVENT_IS_IO_URING(fiber_io_event())) {
int ret;
if (!(fe->busy & EVENT_BUSY_WRITE)) {
SET_SENDTO(fe);
fe->busy |= EVENT_BUSY_WRITE;
ret = iocp_wait_write(fe);
ret = uring_wait_write(fe);
fe->busy &= ~EVENT_BUSY_WRITE;
} else {
FILE_ALLOC(fe, 0, fe->fd);
SET_SENDTO(fe);
ret = iocp_wait_write(fe);
ret = uring_wait_write(fe);
file_event_unrefer(fe);
}
return ret;
@ -312,32 +317,33 @@ ssize_t fiber_sendto(FILE_EVENT *fe, const void *buf, size_t len,
}
#ifdef SYS_UNIX
ssize_t fiber_sendmsg(FILE_EVENT *fe, const struct msghdr *msg, int flags)
{
CLR_POLLING(fe);
#if defined(HAS_IO_URING)
if (EVENT_IS_IO_URING(fiber_io_event())) {
#define SET_SENDMSG(f) do { \
# define SET_SENDMSG(f) do { \
(f)->out.sendmsg_ctx.msg = msg; \
(f)->out.sendmsg_ctx.flags = flags; \
(f)->mask |= EVENT_SENDMSG; \
} while (0)
if (EVENT_IS_IO_URING(fiber_io_event())) {
int ret;
if (!(fe->busy & EVENT_BUSY_WRITE)) {
SET_SENDMSG(fe);
fe->busy |= EVENT_BUSY_WRITE;
ret = iocp_wait_write(fe);
ret = uring_wait_write(fe);
fe->busy &= ~EVENT_BUSY_WRITE;
} else {
FILE_ALLOC(fe, 0, fe->fd);
SET_SENDMSG(fe);
ret = iocp_wait_write(fe);
ret = uring_wait_write(fe);
file_event_unrefer(fe);
}
return ret;
@ -358,6 +364,7 @@ ssize_t fiber_sendmsg(FILE_EVENT *fe, const struct msghdr *msg, int flags)
}
# ifdef HAS_MMSG
int fiber_sendmmsg(FILE_EVENT *fe, struct mmsghdr *msgvec, unsigned int vlen,
int flags)
{

View File

@ -504,7 +504,7 @@ ssize_t pwrite(int fd, const void *buf, size_t count, off_t offset)
fe->fd = fd;
fe->out.write_ctx.off = offset;
ret = fiber_iocp_write(fe, buf, (int) count);
ret = fiber_uring_write(fe, buf, (int) count);
file_event_unrefer(fe);
return ret;

View File

@ -34,7 +34,7 @@ ssize_t fiber_recvmmsg(FILE_EVENT *fe, struct mmsghdr *msgvec,
# endif
// in fiber_write.c
int fiber_iocp_write(FILE_EVENT *fe, const char *buf, int len);
int fiber_uring_write(FILE_EVENT *fe, const char *buf, int len);
ssize_t fiber_write(FILE_EVENT *fe, const void *buf, size_t count);
ssize_t fiber_writev(FILE_EVENT *fe, const struct iovec *iov, int iovcnt);

View File

@ -83,10 +83,10 @@ ifeq ($(findstring Linux, $(UNIXNAME)), Linux)
# CFLAGS += -DLINUX2
SYSLIB += -lcrypt -rdynamic -ldl -lrt
ifeq ($(HAS_IO_URING), yes)
SYSLIB += -luring-ffi
SYSLIB += -luring-ffi -Wl,-rpath,/usr/lib
endif
ifeq ($(has_io_uring), yes)
SYSLIB += -luring-ffi
SYSLIB += -luring-ffi -Wl,-rpath,/usr/lib
endif
ifeq ($(DEBUG_STACK), yes)
SYSLIB += -lunwind -lunwind-generic

View File

@ -160,7 +160,7 @@ static void echo_client(SOCKET fd)
ret = read(fd, buf, BUF_SIZE - 1);
#endif
if (ret <= 0) {
printf("read error: %s\r\n", acl_last_serror());
printf("read error: %s, fd=%d\r\n", acl_last_serror(), fd);
break;
}