use different context for read/write operations in io_uring module

This commit is contained in:
zhengshuxin 2022-11-02 06:57:18 -04:00
parent aef12af92e
commit 49b617f2e3
9 changed files with 156 additions and 144 deletions

View File

@ -416,7 +416,7 @@ TAG_AGAIN:
return (n); return (n);
} }
static int __loop_writen(ACL_VSTREAM *stream, const void *vptr, size_t dlen) static int __loop_writen(ACL_VSTREAM *stream, const void *vptr, ssize_t dlen)
{ {
const unsigned char *ptr; const unsigned char *ptr;
int n; int n;
@ -433,6 +433,8 @@ static int __loop_writen(ACL_VSTREAM *stream, const void *vptr, size_t dlen)
return (ACL_VSTREAM_EOF); return (ACL_VSTREAM_EOF);
} }
assert(n <= dlen);
dlen -= n; dlen -= n;
ptr += n; ptr += n;
} }
@ -448,7 +450,7 @@ int private_vstream_writen(ACL_VSTREAM *stream, const void *vptr, size_t dlen)
if (private_vstream_fflush(stream) == ACL_VSTREAM_EOF) if (private_vstream_fflush(stream) == ACL_VSTREAM_EOF)
return (ACL_VSTREAM_EOF); return (ACL_VSTREAM_EOF);
} }
return (__loop_writen(stream, vptr, dlen)); return (__loop_writen(stream, vptr, (ssize_t) dlen));
} }
int private_vstream_write(ACL_VSTREAM *stream, const void *vptr, size_t dlen) int private_vstream_write(ACL_VSTREAM *stream, const void *vptr, size_t dlen)

View File

@ -166,6 +166,12 @@ int event_checkfd(EVENT *ev, FILE_EVENT *fe)
acl_fiber_set_error(0); acl_fiber_set_error(0);
return 0; return 0;
} }
#if defined(HAS_IO_URING)
} else if (EVENT_IS_IO_URING(ev)) {
fe->type = TYPE_FILE | TYPE_EVENTABLE;
acl_fiber_set_error(0);
return 1;
#endif
} else { } else {
fe->type = TYPE_FILE; fe->type = TYPE_FILE;
acl_fiber_set_error(0); acl_fiber_set_error(0);
@ -250,20 +256,24 @@ int event_add_read(EVENT *ev, FILE_EVENT *fe, event_proc *proc)
return 0; return 0;
} }
fe->r_proc = proc;
if (fe->oper & EVENT_DEL_READ) { if (fe->oper & EVENT_DEL_READ) {
fe->oper &= ~EVENT_DEL_READ; fe->oper &= ~EVENT_DEL_READ;
} }
if (!(fe->mask & EVENT_READ)) { if (!(fe->mask & EVENT_READ)) {
if (EVENT_IS_IO_URING(ev)) {
ev->add_read(ev, fe);
}
// we should check the fd's type for the first time. // we should check the fd's type for the first time.
if (fe->me.parent == &fe->me) { else if (fe->me.parent == &fe->me) {
ring_prepend(&ev->events, &fe->me); ring_prepend(&ev->events, &fe->me);
} }
fe->oper |= EVENT_ADD_READ; fe->oper |= EVENT_ADD_READ;
} }
fe->r_proc = proc;
return 1; return 1;
} }
@ -293,19 +303,22 @@ int event_add_write(EVENT *ev, FILE_EVENT *fe, event_proc *proc)
return 0; return 0;
} }
fe->w_proc = proc;
if (fe->oper & EVENT_DEL_WRITE) { if (fe->oper & EVENT_DEL_WRITE) {
fe->oper &= ~EVENT_DEL_WRITE; fe->oper &= ~EVENT_DEL_WRITE;
} }
if (!(fe->mask & EVENT_WRITE)) { if (!(fe->mask & EVENT_WRITE)) {
if (fe->me.parent == &fe->me) { if (EVENT_IS_IO_URING(ev)) {
ev->add_write(ev, fe);
} else if (fe->me.parent == &fe->me) {
ring_prepend(&ev->events, &fe->me); ring_prepend(&ev->events, &fe->me);
} }
fe->oper |= EVENT_ADD_WRITE; fe->oper |= EVENT_ADD_WRITE;
} }
fe->w_proc = proc;
return 1; return 1;
} }

View File

@ -60,6 +60,15 @@ typedef void epoll_proc(EVENT *ev, EPOLL_EVENT *ee);
typedef struct IOCP_EVENT IOCP_EVENT; typedef struct IOCP_EVENT IOCP_EVENT;
#endif #endif
#ifdef HAS_IO_URING
typedef struct IO_URING_CTX {
FILE_EVENT *fe;
int res;
int cnt;
unsigned mask;
} IO_URING_CTX;
#endif
/** /**
* for each connection fd * for each connection fd
*/ */
@ -238,8 +247,6 @@ struct FILE_EVENT {
} sendmsg_ctx; } sendmsg_ctx;
} out; } out;
int res;
union { union {
struct { struct {
struct sockaddr_in addr; struct sockaddr_in addr;
@ -250,6 +257,8 @@ struct FILE_EVENT {
char *path; char *path;
} var; } var;
struct IO_URING_CTX reader_ctx;
struct IO_URING_CTX writer_ctx;
struct __kernel_timespec rts; struct __kernel_timespec rts;
struct __kernel_timespec wts; struct __kernel_timespec wts;
int r_timeout; int r_timeout;

View File

@ -13,6 +13,7 @@ typedef struct EVENT_URING {
struct io_uring ring; struct io_uring ring;
size_t sqe_size; size_t sqe_size;
size_t appending; size_t appending;
size_t loop_count;
} EVENT_URING; } EVENT_URING;
static void event_uring_free(EVENT *ev) static void event_uring_free(EVENT *ev)
@ -41,7 +42,8 @@ static void add_read_wait(EVENT_URING *ep, FILE_EVENT *fe, int tmo_ms)
sqe = io_uring_get_sqe(&ep->ring); sqe = io_uring_get_sqe(&ep->ring);
io_uring_prep_poll_add(sqe, fe->fd, POLLIN | POLLHUP | POLLERR); io_uring_prep_poll_add(sqe, fe->fd, POLLIN | POLLHUP | POLLERR);
io_uring_sqe_set_data(sqe, fe); fe->reader_ctx.fe = fe;
io_uring_sqe_set_data(sqe, &fe->reader_ctx);
sqe->flags = IOSQE_IO_LINK; sqe->flags = IOSQE_IO_LINK;
TRY_SUBMMIT(ep); TRY_SUBMMIT(ep);
@ -62,6 +64,7 @@ static int event_uring_add_read(EVENT_URING *ep, FILE_EVENT *fe)
} }
fe->mask |= EVENT_READ; fe->mask |= EVENT_READ;
fe->reader_ctx.mask = EVENT_READ;
if (LIKELY(!(fe->mask & (EVENT_POLLIN | EVENT_ACCEPT)))) { if (LIKELY(!(fe->mask & (EVENT_POLLIN | EVENT_ACCEPT)))) {
struct io_uring_sqe *sqe = io_uring_get_sqe(&ep->ring); struct io_uring_sqe *sqe = io_uring_get_sqe(&ep->ring);
@ -95,7 +98,9 @@ static int event_uring_add_read(EVENT_URING *ep, FILE_EVENT *fe)
fe->in.read_ctx.len, fe->in.read_ctx.len,
fe->in.read_ctx.off); fe->in.read_ctx.off);
} }
io_uring_sqe_set_data(sqe, fe);
fe->reader_ctx.fe = fe;
io_uring_sqe_set_data(sqe, &fe->reader_ctx);
TRY_SUBMMIT(ep); TRY_SUBMMIT(ep);
} else if (fe->mask & EVENT_POLLIN) { } else if (fe->mask & EVENT_POLLIN) {
@ -106,7 +111,9 @@ static int event_uring_add_read(EVENT_URING *ep, FILE_EVENT *fe)
io_uring_prep_accept(sqe, fe->fd, io_uring_prep_accept(sqe, fe->fd,
(struct sockaddr*) &fe->var.peer.addr, (struct sockaddr*) &fe->var.peer.addr,
(socklen_t*) &fe->var.peer.len, 0); (socklen_t*) &fe->var.peer.len, 0);
io_uring_sqe_set_data(sqe, fe);
fe->reader_ctx.fe = fe;
io_uring_sqe_set_data(sqe, &fe->reader_ctx);
TRY_SUBMMIT(ep); TRY_SUBMMIT(ep);
} }
@ -120,7 +127,8 @@ static void add_write_wait(EVENT_URING *ep, FILE_EVENT *fe, int tmo_ms)
sqe = io_uring_get_sqe(&ep->ring); sqe = io_uring_get_sqe(&ep->ring);
io_uring_prep_poll_add(sqe, fe->fd, POLLOUT | POLLHUP | POLLERR); io_uring_prep_poll_add(sqe, fe->fd, POLLOUT | POLLHUP | POLLERR);
io_uring_sqe_set_data(sqe, fe); fe->writer_ctx.fe = fe;
io_uring_sqe_set_data(sqe, &fe->writer_ctx);
sqe->flags = IOSQE_IO_LINK; sqe->flags = IOSQE_IO_LINK;
TRY_SUBMMIT(ep); TRY_SUBMMIT(ep);
@ -141,6 +149,8 @@ static int event_uring_add_write(EVENT_URING *ep, FILE_EVENT *fe)
} }
fe->mask |= EVENT_WRITE; fe->mask |= EVENT_WRITE;
fe->writer_ctx.mask = EVENT_WRITE;
fe->writer_ctx.cnt++;
if (LIKELY(!(fe->mask & (EVENT_POLLOUT | EVENT_CONNECT)))) { if (LIKELY(!(fe->mask & (EVENT_POLLOUT | EVENT_CONNECT)))) {
struct io_uring_sqe *sqe = io_uring_get_sqe(&ep->ring); struct io_uring_sqe *sqe = io_uring_get_sqe(&ep->ring);
@ -174,7 +184,9 @@ static int event_uring_add_write(EVENT_URING *ep, FILE_EVENT *fe)
fe->out.write_ctx.len, fe->out.write_ctx.len,
fe->out.write_ctx.off); fe->out.write_ctx.off);
} }
io_uring_sqe_set_data(sqe, fe);
fe->writer_ctx.fe = fe;
io_uring_sqe_set_data(sqe, &fe->writer_ctx);
TRY_SUBMMIT(ep); TRY_SUBMMIT(ep);
} else if (fe->mask & EVENT_POLLOUT) { } else if (fe->mask & EVENT_POLLOUT) {
@ -185,7 +197,9 @@ static int event_uring_add_write(EVENT_URING *ep, FILE_EVENT *fe)
io_uring_prep_connect(sqe, fe->fd, io_uring_prep_connect(sqe, fe->fd,
(struct sockaddr*) &fe->var.peer.addr, (struct sockaddr*) &fe->var.peer.addr,
(socklen_t) fe->var.peer.len); (socklen_t) fe->var.peer.len);
io_uring_sqe_set_data(sqe, fe);
fe->writer_ctx.fe = fe;
io_uring_sqe_set_data(sqe, &fe->writer_ctx);
TRY_SUBMMIT(ep); TRY_SUBMMIT(ep);
} }
@ -199,7 +213,8 @@ void event_uring_file_close(EVENT *ev, FILE_EVENT *fe)
struct io_uring_sqe *sqe = io_uring_get_sqe(&ep->ring); struct io_uring_sqe *sqe = io_uring_get_sqe(&ep->ring);
io_uring_prep_close(sqe, fe->fd); io_uring_prep_close(sqe, fe->fd);
io_uring_sqe_set_data(sqe, fe); fe->reader_ctx.fe = fe;
io_uring_sqe_set_data(sqe, &fe->reader_ctx);
TRY_SUBMMIT(ep); TRY_SUBMMIT(ep);
} }
@ -210,7 +225,8 @@ void event_uring_file_openat(EVENT *ev, FILE_EVENT *fe, int dirfd,
struct io_uring_sqe *sqe = io_uring_get_sqe(&ep->ring); struct io_uring_sqe *sqe = io_uring_get_sqe(&ep->ring);
io_uring_prep_openat(sqe, dirfd, pathname, flags, mode); io_uring_prep_openat(sqe, dirfd, pathname, flags, mode);
io_uring_sqe_set_data(sqe, fe); fe->reader_ctx.fe = fe;
io_uring_sqe_set_data(sqe, &fe->reader_ctx);
TRY_SUBMMIT(ep); TRY_SUBMMIT(ep);
} }
@ -220,7 +236,8 @@ void event_uring_file_unlink(EVENT *ev, FILE_EVENT *fe, const char *pathname)
struct io_uring_sqe *sqe = io_uring_get_sqe(&ep->ring); struct io_uring_sqe *sqe = io_uring_get_sqe(&ep->ring);
io_uring_prep_unlink(sqe, pathname, 0); io_uring_prep_unlink(sqe, pathname, 0);
io_uring_sqe_set_data(sqe, fe); fe->reader_ctx.fe = fe;
io_uring_sqe_set_data(sqe, &fe->reader_ctx);
TRY_SUBMMIT(ep); TRY_SUBMMIT(ep);
} }
@ -232,7 +249,8 @@ void event_uring_file_statx(EVENT *ev, FILE_EVENT *fe, int dirfd,
struct io_uring_sqe *sqe = io_uring_get_sqe(&ep->ring); struct io_uring_sqe *sqe = io_uring_get_sqe(&ep->ring);
io_uring_prep_statx(sqe, dirfd, pathname, flags, mask, statxbuf); io_uring_prep_statx(sqe, dirfd, pathname, flags, mask, statxbuf);
io_uring_sqe_set_data(sqe, fe); fe->reader_ctx.fe = fe;
io_uring_sqe_set_data(sqe, &fe->reader_ctx);
TRY_SUBMMIT(ep); TRY_SUBMMIT(ep);
} }
@ -243,7 +261,8 @@ void event_uring_file_renameat2(EVENT *ev, FILE_EVENT *fe, int olddirfd,
struct io_uring_sqe *sqe = io_uring_get_sqe(&ep->ring); struct io_uring_sqe *sqe = io_uring_get_sqe(&ep->ring);
io_uring_prep_renameat(sqe, olddirfd, oldpath, newdirfd, newpath, flags); io_uring_prep_renameat(sqe, olddirfd, oldpath, newdirfd, newpath, flags);
io_uring_sqe_set_data(sqe, fe); fe->reader_ctx.fe = fe;
io_uring_sqe_set_data(sqe, &fe->reader_ctx);
TRY_SUBMMIT(ep); TRY_SUBMMIT(ep);
} }
@ -254,7 +273,8 @@ void event_uring_mkdirat(EVENT *ev, FILE_EVENT *fe, int dirfd,
struct io_uring_sqe *sqe = io_uring_get_sqe(&ep->ring); struct io_uring_sqe *sqe = io_uring_get_sqe(&ep->ring);
io_uring_prep_mkdirat(sqe, dirfd, pathname, mode); io_uring_prep_mkdirat(sqe, dirfd, pathname, mode);
io_uring_sqe_set_data(sqe, fe); fe->reader_ctx.fe = fe;
io_uring_sqe_set_data(sqe, &fe->reader_ctx);
TRY_SUBMMIT(ep); TRY_SUBMMIT(ep);
} }
@ -267,7 +287,8 @@ void event_uring_splice(EVENT *ev, FILE_EVENT *fe, int fd_in, loff_t off_in,
io_uring_prep_splice(sqe, fd_in, off_in, fd_out, off_out, io_uring_prep_splice(sqe, fd_in, off_in, fd_out, off_out,
len, splice_flags); len, splice_flags);
io_uring_sqe_set_data(sqe, fe); fe->reader_ctx.fe = fe;
io_uring_sqe_set_data(sqe, &fe->reader_ctx);
sqe->flags |= sqe_flags; sqe->flags |= sqe_flags;
sqe->opcode = opcode; sqe->opcode = opcode;
TRY_SUBMMIT(ep); TRY_SUBMMIT(ep);
@ -284,7 +305,8 @@ void event_uring_sendfile(EVENT *ev, FILE_EVENT *fe, int out, int in,
unsigned flags = SPLICE_F_MOVE | SPLICE_F_MORE; // | SPLICE_F_NONBLOCK; unsigned flags = SPLICE_F_MOVE | SPLICE_F_MORE; // | SPLICE_F_NONBLOCK;
io_uring_prep_splice(sqe, in, off, fe->var.pipefd[1], -1, cnt, flags); io_uring_prep_splice(sqe, in, off, fe->var.pipefd[1], -1, cnt, flags);
io_uring_sqe_set_data(sqe, fe); fe->reader_ctx.fe = fe;
io_uring_sqe_set_data(sqe, &fe->reader_ctx);
sqe->flags |= IOSQE_IO_LINK | SPLICE_F_FD_IN_FIXED | IOSQE_ASYNC; sqe->flags |= IOSQE_IO_LINK | SPLICE_F_FD_IN_FIXED | IOSQE_ASYNC;
sqe->opcode = IORING_OP_SPLICE; sqe->opcode = IORING_OP_SPLICE;
@ -293,7 +315,8 @@ void event_uring_sendfile(EVENT *ev, FILE_EVENT *fe, int out, int in,
flags = 0; flags = 0;
sqe = io_uring_get_sqe(&ep->ring); sqe = io_uring_get_sqe(&ep->ring);
io_uring_prep_splice(sqe, fe->var.pipefd[0], -1, out, -1, cnt, flags); io_uring_prep_splice(sqe, fe->var.pipefd[0], -1, out, -1, cnt, flags);
io_uring_sqe_set_data(sqe, fe); fe->reader_ctx.fe = fe;
io_uring_sqe_set_data(sqe, &fe->reader_ctx);
sqe->opcode = IORING_OP_SPLICE; sqe->opcode = IORING_OP_SPLICE;
TRY_SUBMMIT(ep); TRY_SUBMMIT(ep);
@ -324,7 +347,7 @@ static int event_uring_del_write(EVENT_URING *ep UNUSED, FILE_EVENT *fe)
static void handle_read(EVENT *ev, FILE_EVENT *fe, int res) static void handle_read(EVENT *ev, FILE_EVENT *fe, int res)
{ {
fe->res = res; fe->reader_ctx.res = res;
if (LIKELY(!(fe->mask & (EVENT_ACCEPT | EVENT_POLLIN)))) { if (LIKELY(!(fe->mask & (EVENT_ACCEPT | EVENT_POLLIN)))) {
if ((fe->type & TYPE_FILE) && res > 0) { if ((fe->type & TYPE_FILE) && res > 0) {
@ -335,7 +358,7 @@ static void handle_read(EVENT *ev, FILE_EVENT *fe, int res)
} }
} }
} else if (fe->mask & EVENT_ACCEPT) { } else if (fe->mask & EVENT_ACCEPT) {
// fe->res = res; // Do nothing!
} else if (fe->mask & EVENT_POLLIN) { } else if (fe->mask & EVENT_POLLIN) {
if (res & (POLLIN | ERR)) { if (res & (POLLIN | ERR)) {
if (res & POLLERR) { if (res & POLLERR) {
@ -363,7 +386,7 @@ static void handle_read(EVENT *ev, FILE_EVENT *fe, int res)
static void handle_write(EVENT *ev, FILE_EVENT *fe, int res) static void handle_write(EVENT *ev, FILE_EVENT *fe, int res)
{ {
fe->res = res; fe->writer_ctx.res = res;
if (LIKELY(!(fe->mask & (EVENT_CONNECT | EVENT_POLLOUT)))) { if (LIKELY(!(fe->mask & (EVENT_CONNECT | EVENT_POLLOUT)))) {
if ((fe->type & TYPE_FILE) && res > 0) { if ((fe->type & TYPE_FILE) && res > 0) {
@ -374,7 +397,7 @@ static void handle_write(EVENT *ev, FILE_EVENT *fe, int res)
} }
} }
} else if (fe->mask & EVENT_CONNECT) { } else if (fe->mask & EVENT_CONNECT) {
//fe->res = res; // Do nothing!
} else if (fe->mask & EVENT_POLLOUT) { } else if (fe->mask & EVENT_POLLOUT) {
if (res & (POLLOUT | ERR)) { if (res & (POLLOUT | ERR)) {
if (res & POLLERR) { if (res & POLLERR) {
@ -402,12 +425,21 @@ static void handle_write(EVENT *ev, FILE_EVENT *fe, int res)
} }
} }
static void handle_one(EVENT *ev, FILE_EVENT *fe, int res) static void handle_one(EVENT *ev, IO_URING_CTX *ctx, int res)
{ {
if ((fe->mask & EVENT_READ) && fe->r_proc) { FILE_EVENT *fe = ctx->fe;
if (ctx == &fe->reader_ctx && ctx->mask == EVENT_READ && fe->r_proc) {
fe->mask &= ~EVENT_READ;
handle_read(ev, fe, res); handle_read(ev, fe, res);
ctx->mask = 0;
return; return;
} else if ((fe->mask & EVENT_WRITE) && fe->w_proc) { }
if (ctx == &fe->writer_ctx && ctx->mask == EVENT_WRITE && fe->w_proc) {
fe->writer_ctx.cnt--;
fe->mask &= ~EVENT_WRITE;
ctx->mask = 0;
handle_write(ev, fe, res); handle_write(ev, fe, res);
return; return;
} }
@ -424,7 +456,7 @@ static void handle_one(EVENT *ev, FILE_EVENT *fe, int res)
| EVENT_DIR_MKDIRAT \ | EVENT_DIR_MKDIRAT \
| EVENT_SPLICE) | EVENT_SPLICE)
fe->res = res; fe->reader_ctx.res = res;
if (fe->mask & FLAGS) { if (fe->mask & FLAGS) {
fe->r_proc(ev, fe); fe->r_proc(ev, fe);
@ -436,31 +468,27 @@ static void handle_one(EVENT *ev, FILE_EVENT *fe, int res)
static int peek_more(EVENT_URING *ep) static int peek_more(EVENT_URING *ep)
{ {
#define PEEK_FOREACH
//#define PEEK_BATCH
#if defined(PEEK_FOREACH)
struct io_uring_cqe *cqe; struct io_uring_cqe *cqe;
unsigned head, count = 0; unsigned head, count = 0;
FILE_EVENT *fe; IO_URING_CTX *ctx;
int ret; int ret;
io_uring_for_each_cqe(&ep->ring, head, cqe) { io_uring_for_each_cqe(&ep->ring, head, cqe) {
count++; count++;
fe = (FILE_EVENT*) io_uring_cqe_get_data(cqe); ctx = (IO_URING_CTX*) io_uring_cqe_get_data(cqe);
ret = cqe->res; ret = cqe->res;
//io_uring_cqe_seen(&ep->ring, cqe);
if (ret == -ENOBUFS) { if (ret == -ENOBUFS) {
msg_error("%s(%d): ENOBUFS error", __FUNCTION__, __LINE__); msg_error("%s(%d): ENOBUFS error", __FUNCTION__, __LINE__);
return -1; return -1;
} }
if (ret == -ETIME || ret == -ECANCELED || fe == NULL) { if (ret == -ETIME || ret == -ECANCELED || ctx == NULL) {
continue; continue;
} }
handle_one((EVENT*) ep, fe, ret); handle_one((EVENT*) ep, ctx, ret);
} }
if (count > 0) { if (count > 0) {
@ -468,75 +496,13 @@ static int peek_more(EVENT_URING *ep)
} }
return count; return count;
#elif defined(PEEK_BATCH)
#define PEEK_MAX 100
struct io_uring_cqe *cqes[PEEK_MAX + 1];
FILE_EVENT *fe;
unsigned n, i;
int ret, cnt = 0;
while ((n = io_uring_peek_batch_cqe(&ep->ring, cqes, PEEK_MAX)) > 0) {
for (i = 0; i < n; i++) {
ret = cqes[i]->res;
fe = (FILE_EVENT*) io_uring_cqe_get_data(cqes[i]);
if (ret == -ENOBUFS) {
msg_error("%s(%d): ENOBUFS error",
__FUNCTION__, __LINE__);
return -1;
}
if (ret == -ETIME || ret == -ECANCELED || fe == NULL) {
continue;
}
handle_one((EVENT*) ep, fe, ret);
}
io_uring_cq_advance(&ep->ring, n);
cnt += n;
}
return cnt;
#else
int cnt = 0, ret;
struct io_uring_cqe *cqe;
FILE_EVENT *fe;
while (1) {
ret = io_uring_peek_cqe(&ep->ring, &cqe);
if (ret == -EAGAIN) {
break;
}
ret = cqe->res;
fe = (FILE_EVENT*) io_uring_cqe_get_data(cqe);
io_uring_cqe_seen(&ep->ring, cqe);
if (ret == -ETIME || ret == -ECANCELED || fe == NULL) {
continue;
} else if (ret < 0) {
return -1;
}
handle_one((EVENT*) ep, fe, ret);
cnt++;
}
return cnt;
#endif
} }
static int submit_and_wait(EVENT_URING *ep, int timeout) static int submit_and_wait(EVENT_URING *ep, int timeout)
{ {
struct __kernel_timespec ts, *tp; struct __kernel_timespec ts, *tp;
struct io_uring_cqe *cqe; struct io_uring_cqe *cqe;
FILE_EVENT *fe; IO_URING_CTX *ctx;
int ret; int ret;
if (timeout >= 0) { if (timeout >= 0) {
@ -551,7 +517,8 @@ static int submit_and_wait(EVENT_URING *ep, int timeout)
if (ep->appending > 0) { if (ep->appending > 0) {
ep->appending = 0; \ ep->appending = 0; \
ret = io_uring_submit_and_wait_timeout(&ep->ring, &cqe, 1, tp, NULL); ret = io_uring_submit_and_wait_timeout(&ep->ring, &cqe,
1, tp, NULL);
} else { } else {
ret = io_uring_wait_cqes(&ep->ring, &cqe, 1, tp, NULL); ret = io_uring_wait_cqes(&ep->ring, &cqe, 1, tp, NULL);
} }
@ -569,7 +536,7 @@ static int submit_and_wait(EVENT_URING *ep, int timeout)
} }
ret = cqe->res; ret = cqe->res;
fe = (FILE_EVENT*) io_uring_cqe_get_data(cqe); ctx = (IO_URING_CTX*) io_uring_cqe_get_data(cqe);
io_uring_cqe_seen(&ep->ring, cqe); io_uring_cqe_seen(&ep->ring, cqe);
@ -578,11 +545,11 @@ static int submit_and_wait(EVENT_URING *ep, int timeout)
return -1; return -1;
} }
if (ret == -ETIME || ret == -ECANCELED || fe == NULL) { if (ret == -ETIME || ret == -ECANCELED || ctx == NULL) {
return 1; return 1;
} }
handle_one((EVENT*) ep, fe, ret); handle_one((EVENT*) ep, ctx, ret);
return 1; return 1;
} }
@ -591,6 +558,7 @@ static int event_uring_wait(EVENT *ev, int timeout)
EVENT_URING *ep = (EVENT_URING*) ev; EVENT_URING *ep = (EVENT_URING*) ev;
int ret, count = 0; int ret, count = 0;
ep->loop_count++;
ret = submit_and_wait(ep, timeout); ret = submit_and_wait(ep, timeout);
if (ret == 0) { if (ret == 0) {
return 0; return 0;

View File

@ -21,6 +21,8 @@
#include "common/iostuff.h" #include "common/iostuff.h"
#include "fiber.h" #include "fiber.h"
//#undef HAS_EVENTFD
void fbase_event_open(FIBER_BASE *fbase) void fbase_event_open(FIBER_BASE *fbase)
{ {
#if defined(HAS_EVENTFD) #if defined(HAS_EVENTFD)
@ -55,8 +57,10 @@ void fbase_event_open(FIBER_BASE *fbase)
__FILE__, __LINE__, __FUNCTION__, (int) fbase->event_in); __FILE__, __LINE__, __FUNCTION__, (int) fbase->event_in);
} }
non_blocking(fbase->event_in, 1); //if (!EVENT_IS_IO_URING(fiber_io_event())) {
non_blocking(fbase->event_out, 1); non_blocking(fbase->event_in, 1);
non_blocking(fbase->event_out, 1);
//}
} }
void fbase_event_close(FIBER_BASE *fbase) void fbase_event_close(FIBER_BASE *fbase)
@ -76,20 +80,27 @@ int fbase_event_wait(FIBER_BASE *fbase)
{ {
long long n; long long n;
int ret, interrupt = 0, err; int ret, interrupt = 0, err;
FILE_EVENT *fe;
if (fbase->event_in == INVALID_SOCKET) { if (fbase->event_in == INVALID_SOCKET) {
msg_fatal("%s(%d), %s: invalid event_in=%d", msg_fatal("%s(%d), %s: invalid event_in=%d",
__FILE__, __LINE__, __FUNCTION__, (int) fbase->event_in); __FILE__, __LINE__, __FUNCTION__, (int) fbase->event_in);
} }
fe = fiber_file_open_read(fbase->event_in);
fe->type = TYPE_SPIPE | TYPE_EVENTABLE;
while (1) { while (1) {
//if (acl_fiber_scheduled() && read_wait(fbase->event_in, -1) == -1) { //if (acl_fiber_scheduled() && read_wait(fbase->event_in, -1) == -1) {
#if 1
if (read_wait(fbase->event_in, -1) == -1) { if (read_wait(fbase->event_in, -1) == -1) {
msg_error("%s(%d), %s: read_wait error=%s, fd=%d", msg_error("%s(%d), %s: read_wait error=%s, fd=%d",
__FILE__, __LINE__, __FUNCTION__, __FILE__, __LINE__, __FUNCTION__,
last_serror(), fbase->event_in); last_serror(), fbase->event_in);
return -1; return -1;
} }
#endif
#ifdef SYS_WIN #ifdef SYS_WIN
ret = (int) acl_fiber_recv(fbase->event_in, (char*) &n, sizeof(n), 0); ret = (int) acl_fiber_recv(fbase->event_in, (char*) &n, sizeof(n), 0);
#else #else
@ -141,6 +152,7 @@ int fbase_event_wakeup(FIBER_BASE *fbase)
{ {
long long n = 1; long long n = 1;
int ret, interrupt = 0; int ret, interrupt = 0;
FILE_EVENT *fe;
/** /**
* if (LIKELY(atomic_int64_cas(fbase->atomic, 0, 1) != 0)) { * if (LIKELY(atomic_int64_cas(fbase->atomic, 0, 1) != 0)) {
@ -154,6 +166,9 @@ int fbase_event_wakeup(FIBER_BASE *fbase)
fbase, (int) fbase->event_out); fbase, (int) fbase->event_out);
} }
fe = fiber_file_open_write(fbase->event_out);
fe->type = TYPE_SPIPE | TYPE_EVENTABLE;
while (1) { while (1) {
#ifdef SYS_WIN #ifdef SYS_WIN
ret = (int) acl_fiber_send(fbase->event_out, (char*) &n, sizeof(n), 0); ret = (int) acl_fiber_send(fbase->event_out, (char*) &n, sizeof(n), 0);
@ -166,9 +181,11 @@ int fbase_event_wakeup(FIBER_BASE *fbase)
} }
if (ret >= 0) { if (ret >= 0) {
msg_fatal("%s(%d), %s: write ret=%d invalid length, " msg_fatal("%s(%d), %s: fiber=%d, write ret=%d invalid"
"interrupt=%d", __FILE__, __LINE__, " length, interrupt=%d, fd=%d",
__FUNCTION__, ret, interrupt); __FILE__, __LINE__, __FUNCTION__,
acl_fiber_self(), ret, interrupt,
(int) fbase->event_out);
} }
if (acl_fiber_last_error() == EINTR) { if (acl_fiber_last_error() == EINTR) {

View File

@ -26,6 +26,8 @@ void file_event_init(FILE_EVENT *fe, socket_t fd)
memset(&fe->in, 0, sizeof(fe->in)); memset(&fe->in, 0, sizeof(fe->in));
memset(&fe->out, 0, sizeof(fe->out)); memset(&fe->out, 0, sizeof(fe->out));
memset(&fe->var, 0, sizeof(fe->var)); memset(&fe->var, 0, sizeof(fe->var));
memset(&fe->reader_ctx, 0, sizeof(fe->reader_ctx));
memset(&fe->writer_ctx, 0, sizeof(fe->writer_ctx));
fe->r_timeout = -1; fe->r_timeout = -1;
fe->w_timeout = -1; fe->w_timeout = -1;
#endif #endif
@ -56,6 +58,7 @@ FILE_EVENT *file_event_alloc(socket_t fd)
static void file_event_free(FILE_EVENT *fe) static void file_event_free(FILE_EVENT *fe)
{ {
memset(fe, 0, sizeof(*fe));
mem_free(fe); mem_free(fe);
} }

View File

@ -66,10 +66,10 @@ int file_close(EVENT *ev, FILE_EVENT *fe)
fe->mask &= ~EVENT_FILE_CLOSE; fe->mask &= ~EVENT_FILE_CLOSE;
if (fe->res == 0) { if (fe->reader_ctx.res == 0) {
return 0; return 0;
} else { } else {
acl_fiber_set_error(-fe->res); acl_fiber_set_error(-fe->reader_ctx.res);
return -1; return -1;
} }
} }
@ -109,14 +109,14 @@ int openat(int dirfd, const char *pathname, int flags, ...)
free(fe->var.path); free(fe->var.path);
fe->var.path = NULL; fe->var.path = NULL;
if (fe->res >= 0) { if (fe->reader_ctx.res >= 0) {
fe->fd = fe->res; fe->fd = fe->reader_ctx.res;
fe->type = TYPE_FILE | TYPE_EVENTABLE; fe->type = TYPE_FILE | TYPE_EVENTABLE;
fiber_file_set(fe); // Save the fe for the future using. fiber_file_set(fe); // Save the fe for the future using.
return fe->fd; return fe->fd;
} }
acl_fiber_set_error(-fe->res); acl_fiber_set_error(-fe->reader_ctx.res);
file_event_unrefer(fe); file_event_unrefer(fe);
return -1; return -1;
} }
@ -162,11 +162,11 @@ int unlink(const char *pathname)
free(fe->var.path); free(fe->var.path);
fe->var.path = NULL; fe->var.path = NULL;
if (fe->res == 0) { if (fe->reader_ctx.res == 0) {
file_event_unrefer(fe); file_event_unrefer(fe);
return 0; return 0;
} else { } else {
acl_fiber_set_error(-fe->res); acl_fiber_set_error(-fe->reader_ctx.res);
file_event_unrefer(fe); file_event_unrefer(fe);
return -1; return -1;
} }
@ -204,11 +204,11 @@ int renameat2(int olddirfd, const char *oldpath,
free(fe->in.read_ctx.buf); free(fe->in.read_ctx.buf);
free(fe->var.path); free(fe->var.path);
if (fe->res == 0) { if (fe->reader_ctx.res == 0) {
file_event_unrefer(fe); file_event_unrefer(fe);
return 0; return 0;
} else { } else {
acl_fiber_set_error(-fe->res); acl_fiber_set_error(-fe->reader_ctx.res);
file_event_unrefer(fe); file_event_unrefer(fe);
return -1; return -1;
} }
@ -257,13 +257,13 @@ int statx(int dirfd, const char *pathname, int flags, unsigned int mask,
free(fe->in.read_ctx.buf); free(fe->in.read_ctx.buf);
fe->in.read_ctx.buf = NULL; fe->in.read_ctx.buf = NULL;
if (fe->res == 0) { if (fe->reader_ctx.res == 0) {
memcpy(statxbuf, fe->var.statxbuf, sizeof(struct statx)); memcpy(statxbuf, fe->var.statxbuf, sizeof(struct statx));
free(fe->var.statxbuf); free(fe->var.statxbuf);
file_event_unrefer(fe); file_event_unrefer(fe);
return 0; return 0;
} else { } else {
acl_fiber_set_error(-fe->res); acl_fiber_set_error(-fe->reader_ctx.res);
free(fe->var.statxbuf); free(fe->var.statxbuf);
file_event_unrefer(fe); file_event_unrefer(fe);
return -1; return -1;
@ -324,11 +324,11 @@ int mkdirat(int dirfd, const char *pathname, mode_t mode)
fe->mask &= ~EVENT_DIR_MKDIRAT; fe->mask &= ~EVENT_DIR_MKDIRAT;
free(fe->var.path); free(fe->var.path);
if (fe->res == 0) { if (fe->reader_ctx.res == 0) {
file_event_unrefer(fe); file_event_unrefer(fe);
return 0; return 0;
} else { } else {
acl_fiber_set_error(-fe->res); acl_fiber_set_error(-fe->reader_ctx.res);
file_event_unrefer(fe); file_event_unrefer(fe);
return -1; return -1;
} }
@ -443,21 +443,21 @@ ssize_t splice(int fd_in, loff_t *poff_in, int fd_out,
fe->mask &= ~EVENT_SPLICE; fe->mask &= ~EVENT_SPLICE;
if (fe->res < 0) { if (fe->reader_ctx.res < 0) {
acl_fiber_set_error(-fe->res); acl_fiber_set_error(-fe->reader_ctx.res);
file_event_unrefer(fe); file_event_unrefer(fe);
return -1; return -1;
} }
if (off_in != -1 && poff_in) { if (off_in != -1 && poff_in) {
*poff_in += fe->res; *poff_in += fe->reader_ctx.res;
} }
if (off_out != -1 && poff_out) { if (off_out != -1 && poff_out) {
*poff_out += fe->res; *poff_out += fe->reader_ctx.res;
} }
ret = fe->res; ret = fe->reader_ctx.res;
file_event_unrefer(fe); file_event_unrefer(fe);
return ret; return ret;
} }

View File

@ -127,7 +127,7 @@ static int iocp_wait_read(FILE_EVENT *fe)
// Must clear the EVENT_READ flags in order to set IO event // Must clear the EVENT_READ flags in order to set IO event
// for each IO process. // for each IO process.
fe->mask &= ~EVENT_READ; fe->mask &= ~EVENT_READ;
fe->res = 0; fe->reader_ctx.res = 0;
if (fiber_wait_read(fe) < 0) { if (fiber_wait_read(fe) < 0) {
return -1; return -1;
@ -144,8 +144,8 @@ static int iocp_wait_read(FILE_EVENT *fe)
return -1; return -1;
} }
if (fe->res >= 0) { if (fe->reader_ctx.res >= 0) {
return fe->res; return fe->reader_ctx.res;
} }
err = acl_fiber_last_error(); err = acl_fiber_last_error();
@ -168,14 +168,14 @@ int fiber_iocp_read(FILE_EVENT *fe, char *buf, int len)
* which maybe used in iocp_add_read() and set for polling read status. * which maybe used in iocp_add_read() and set for polling read status.
*/ */
if (fe->sock_type == SOCK_DGRAM && fe->rbuf == fe->packet if (fe->sock_type == SOCK_DGRAM && fe->rbuf == fe->packet
&& fe->res > 0) { && fe->reader_ctx.res > 0) {
if (fe->res < len) { if (fe->reader_ctx.res < len) {
len = fe->res; len = fe->reader_ctx.res;
} }
memcpy(buf, fe->packet, len); memcpy(buf, fe->packet, len);
fe->rbuf = NULL; fe->rbuf = NULL;
fe->res = 0; fe->reader_ctx.res = 0;
return len; return len;
} }
@ -530,14 +530,14 @@ static int iocp_wait_write(FILE_EVENT *fe)
int err; int err;
fe->mask &= ~EVENT_WRITE; fe->mask &= ~EVENT_WRITE;
fe->res = 0; fe->writer_ctx.res = -1;
if (wait_write(fe) == -1) { if (wait_write(fe) == -1) {
return -1; return -1;
} }
if (fe->res >= 0) { if (fe->writer_ctx.res >= 0) {
return fe->res; return fe->writer_ctx.res;
} }
err = acl_fiber_last_error(); err = acl_fiber_last_error();

View File

@ -87,14 +87,14 @@ static socket_t fiber_iocp_accept(FILE_EVENT *fe)
{ {
fe->mask &= ~EVENT_READ; fe->mask &= ~EVENT_READ;
fe->mask |= EVENT_ACCEPT; fe->mask |= EVENT_ACCEPT;
fe->res = INVALID_SOCKET; fe->reader_ctx.res = INVALID_SOCKET;
if (fiber_wait_read(fe) < 0) { if (fiber_wait_read(fe) < 0) {
msg_error("%s(%d): fiber_wait_read error=%s, fd=%d", msg_error("%s(%d): fiber_wait_read error=%s, fd=%d",
__FUNCTION__, __LINE__, last_serror(), (int) fe->fd); __FUNCTION__, __LINE__, last_serror(), (int) fe->fd);
return INVALID_SOCKET; return INVALID_SOCKET;
} }
return fe->res; return fe->reader_ctx.res;
} }
#endif #endif
@ -292,8 +292,8 @@ static socket_t fiber_iocp_connect(FILE_EVENT *fe)
} }
fe->mask &= ~EVENT_CONNECT; fe->mask &= ~EVENT_CONNECT;
if (fe->res < 0) { if (fe->writer_ctx.res < 0) {
acl_fiber_set_error(-fe->res); acl_fiber_set_error(-fe->writer_ctx.res);
return -1; return -1;
} }
return 0; return 0;