optimize and test io_uring module in fiber

This commit is contained in:
zhengshuxin 2022-10-10 02:25:10 -04:00
parent b3c2ddaac8
commit b9f1d3f677
9 changed files with 109 additions and 84 deletions

View File

@ -158,20 +158,21 @@ struct FILE_EVENT {
#ifdef HAS_IO_URING
char *rbuf;
size_t rsize;
unsigned rsize;
int rlen;
__u64 off;
const char *wbuf;
size_t wsize;
unsigned wsize;
int wlen;
socket_t iocp_sock;
union {
struct sockaddr_in peer_addr;
struct statx statxbuf;
struct {
struct sockaddr_in addr;
socklen_t len;
} peer;
struct statx *statxbuf;
char *path;
int pipefd[2];
} var;
socklen_t addr_len;
struct __kernel_timespec rts;
struct __kernel_timespec wts;
int r_timeout;

View File

@ -64,10 +64,10 @@ static int event_uring_add_read(EVENT_URING *ep, FILE_EVENT *fe)
struct io_uring_sqe *sqe = io_uring_get_sqe(&ep->ring);
assert(sqe);
fe->addr_len = (socklen_t) sizeof(fe->var.peer_addr);
fe->var.peer.len = (socklen_t) sizeof(fe->var.peer.addr);
io_uring_prep_accept(sqe, fe->fd,
(struct sockaddr*) &fe->var.peer_addr,
(socklen_t*) &fe->addr_len, 0);
(struct sockaddr*) &fe->var.peer.addr,
(socklen_t*) &fe->var.peer.len, 0);
io_uring_sqe_set_data(sqe, fe);
TRY_SUBMMIT(ep);
@ -116,8 +116,8 @@ static int event_uring_add_write(EVENT_URING *ep, FILE_EVENT *fe)
non_blocking(fe->fd, 1);
struct io_uring_sqe *sqe = io_uring_get_sqe(&ep->ring);
io_uring_prep_connect(sqe, fe->fd,
(struct sockaddr*) &fe->var.peer_addr,
(socklen_t) fe->addr_len);
(struct sockaddr*) &fe->var.peer.addr,
(socklen_t) fe->var.peer.len);
io_uring_sqe_set_data(sqe, fe);
TRY_SUBMMIT(ep);
@ -217,11 +217,12 @@ void event_uring_sendfile(EVENT *ev, FILE_EVENT *fe, int out, int in,
{
EVENT_URING *ep = (EVENT_URING*) ev;
struct io_uring_sqe *sqe = io_uring_get_sqe(&ep->ring);
unsigned flags = SPLICE_F_MOVE |SPLICE_F_MORE;
unsigned flags = SPLICE_F_MOVE | SPLICE_F_MORE; // | SPLICE_F_NONBLOCK;
printf("hello>>>in=%d, off=%d\n", in, (int) off);
printf("hello>>>in=%d, off=%dfe=%p\n", in, (int) off, fe);
io_uring_prep_splice(sqe, in, off, fe->var.pipefd[1], -1, cnt, flags);
io_uring_sqe_set_data(sqe, fe);
sqe->flags = IOSQE_IO_LINK;
sqe->opcode = IORING_OP_SPLICE;
TRY_SUBMMIT(ep);
@ -258,8 +259,11 @@ static int event_uring_del_write(EVENT_URING *ep UNUSED, FILE_EVENT *fe)
static void handle_read(EVENT *ev, FILE_EVENT *fe, int res)
{
printf("%s: file size=%zd, s=%zd statusx=%zd\n", __FUNCTION__,
sizeof(FILE_EVENT), sizeof(struct sockaddr_in),
sizeof(struct statx));
if (fe->mask & EVENT_ACCEPT) {
fe->iocp_sock = res;
fe->rlen = res;
} else if (fe->mask & EVENT_POLLIN) {
if (res & (POLLIN | ERR)) {
if (res & POLLERR) {
@ -292,7 +296,7 @@ static void handle_read(EVENT *ev, FILE_EVENT *fe, int res)
static void handle_write(EVENT *ev, FILE_EVENT *fe, int res)
{
if (fe->mask & EVENT_CONNECT) {
fe->iocp_sock = res;
fe->rlen = res;
} else if (fe->mask & EVENT_POLLOUT) {
if (res & (POLLOUT | ERR)) {
if (res & POLLERR) {

View File

@ -224,7 +224,7 @@ static void fiber_io_loop(ACL_FIBER *self fiber_unused, void *ctx)
if (timer) {
continue;
}
msg_info("%s(%d), tid=%lu: fdcount=0, waiter=%u, events=%d",
__FUNCTION__, __LINE__, __pthread_self(),
ev->waiter, ring_size(&ev->events));
@ -550,48 +550,52 @@ FILE_EVENT *fiber_file_open_write(socket_t fd)
return fe;
}
static int fiber_file_del(FILE_EVENT *fe)
static int fiber_file_del(FILE_EVENT *fe, socket_t fd)
{
#ifdef SYS_WIN
char key[64];
if (fe->fd == INVALID_SOCKET || fe->fd >= (socket_t) var_maxfd) {
msg_error("%s(%d): invalid fd=%d",
__FUNCTION__, __LINE__, fe->fd);
if (fd == INVALID_SOCKET || fd >= (socket_t) var_maxfd) {
msg_error("%s(%d): invalid fd=%d", __FUNCTION__, __LINE__, fd);
return -1;
}
//_snprintf(key, sizeof(key), "%u", fe->fd);
_i64toa(fe->fd, key, 10);
_i64toa(fd, key, 10);
htable_delete(__thread_fiber->events, key, NULL);
return 0;
#else
if (fe->fd == INVALID_SOCKET || fe->fd >= var_maxfd) {
msg_error("%s(%d): invalid fd=%d",
__FUNCTION__, __LINE__, fe->fd);
if (fd == INVALID_SOCKET || fd >= var_maxfd) {
msg_error("%s(%d): invalid fd=%d", __FUNCTION__, __LINE__, fd);
return -1;
}
if (__thread_fiber->events[fe->fd] != fe) {
if (__thread_fiber->events[fd] != fe) {
msg_error("%s(%d): invalid fe=%p, fd=%d, origin=%p",
__FUNCTION__, __LINE__, fe, fe->fd,
__thread_fiber->events[fe->fd]);
__FUNCTION__, __LINE__, fe, fd, __thread_fiber->events[fd]);
return -1;
}
__thread_fiber->events[fe->fd] = NULL;
__thread_fiber->events[fd] = NULL;
return 0;
#endif
}
void fiber_file_free(FILE_EVENT *fe)
{
if (fiber_file_del(fe) == 0) {
socket_t fd = fe->fd;
// We must set fd INVALID_SOCKET to stop any using the old fd,
// fe will be freed only when the reference of it is 0.
fe->fd = INVALID_SOCKET;
if (fiber_file_del(fe, fd) == 0) {
file_event_unrefer(fe);
} else {
// xxx: What happened?
msg_error("Some error happened for fe=%p, fd=%d", fe, fe->fd);
msg_error("%s(%d): some error happened for fe=%p, fd=%d",
__FUNCTION__, __LINE__, fe, fe->fd);
}
}

View File

@ -28,10 +28,9 @@ void file_event_init(FILE_EVENT *fe, socket_t fd)
fe->off = 0;
fe->wbuf = 0;
fe->wsize = 0;
fe->iocp_sock = INVALID_SOCKET;
fe->addr_len = 0;
fe->r_timeout = -1;
fe->w_timeout = -1;
memset(&fe->var, 0, sizeof(fe->var));
#endif
#ifdef HAS_IOCP

View File

@ -63,11 +63,9 @@ int file_close(EVENT *ev, FILE_EVENT *fe)
fe->mask &= ~EVENT_FILE_CLOSE;
if (fe->rlen == 0) {
file_event_unrefer(fe);
return 0;
} else {
acl_fiber_set_error(fe->rlen);
file_event_unrefer(fe);
return -1;
}
}
@ -241,10 +239,11 @@ int statx(int dirfd, const char *pathname, int flags, unsigned int mask,
FILE_ALLOC(fe, EVENT_FILE_STATX);
fe->rbuf = strdup(pathname);
memcpy(&fe->var.statxbuf, statxbuf, sizeof(struct statx));
fe->var.statxbuf = (struct statx*) malloc(sizeof(struct statx));
memcpy(fe->var.statxbuf, statxbuf, sizeof(struct statx));
event_uring_file_statx(ev, fe, dirfd, fe->rbuf, flags, mask,
&fe->var.statxbuf);
fe->var.statxbuf);
fiber_io_inc();
acl_fiber_switch();
@ -255,11 +254,13 @@ int statx(int dirfd, const char *pathname, int flags, unsigned int mask,
fe->rbuf = NULL;
if (fe->rlen == 0) {
memcpy(statxbuf, fe->var.statxbuf, sizeof(struct statx));
free(fe->var.statxbuf);
file_event_unrefer(fe);
memcpy(statxbuf, &fe->var.statxbuf, sizeof(struct statx));
return 0;
} else {
acl_fiber_set_error(fe->rlen);
free(fe->var.statxbuf);
file_event_unrefer(fe);
return -1;
}
@ -420,7 +421,10 @@ ssize_t file_sendfile(socket_t out_fd, int in_fd, off64_t *off, size_t cnt)
close(fe->var.pipefd[0]);
close(fe->var.pipefd[1]);
if (ret < 0) {
if (ret == 0) {
file_event_unrefer(fe);
return 0;
} else if (ret < 0) {
acl_fiber_set_error(-ret);
file_event_unrefer(fe);
return -1;

View File

@ -102,9 +102,6 @@ int WINAPI acl_fiber_close(socket_t fd)
ret = (*sys_close)(fd);
}
// We must set fd INVALID_SOCKET to stop any using the old fd,
// fe will be freed only when the reference of it is 0.
fe->fd = INVALID_SOCKET;
fiber_file_free(fe);
if (ret != 0) {

View File

@ -87,14 +87,14 @@ static socket_t fiber_iocp_accept(FILE_EVENT *fe)
{
fe->mask &= ~EVENT_READ;
fe->mask |= EVENT_ACCEPT;
fe->iocp_sock = INVALID_SOCKET;
fe->rlen = INVALID_SOCKET;
if (fiber_wait_read(fe) < 0) {
msg_error("%s(%d): fiber_wait_read error=%s, fd=%d",
__FUNCTION__, __LINE__, last_serror(), (int) fe->fd);
return INVALID_SOCKET;
}
return fe->iocp_sock;
return fe->rlen;
}
#endif
@ -291,8 +291,8 @@ static socket_t fiber_iocp_connect(FILE_EVENT *fe)
}
fe->mask &= ~EVENT_CONNECT;
if (fe->iocp_sock < 0) {
acl_fiber_set_error(-fe->iocp_sock);
if (fe->rlen < 0) {
acl_fiber_set_error(-fe->rlen);
return -1;
}
return 0;
@ -326,8 +326,8 @@ int WINAPI acl_fiber_connect(socket_t sockfd, const struct sockaddr *addr,
SET_CONNECTING(fe);
#if defined(HAS_IOCP) || defined(HAS_IO_URING)
memcpy(&fe->var.peer_addr, addr, addrlen);
fe->addr_len = addrlen;
memcpy(&fe->var.peer.addr, addr, addrlen);
fe->var.peer.len = addrlen;
#endif
// The socket must be set to in no blocking status to avoid to be

View File

@ -71,6 +71,7 @@ static void fiber_writefile(ACL_FIBER *fiber acl_unused, void *ctx)
}
(void) write(fd, "\r\n", 2);
printf("write over!\r\n");
n = close(fd);
printf("close %s %s, fd=%d\r\n", path, n == 0 ? "ok" : "error", fd);
}
@ -135,7 +136,9 @@ static void fiber_sendfile(ACL_FIBER *fiber acl_unused, void *ctx)
}
cfd = accept_one();
ret = sendfile64(cfd, in, &fc->off, 100);
printf(">>>begin call sendfile64\r\n");
ret = sendfile64(cfd, in, &fc->off, fc->len);
printf(">>>sendfile ret=%zd, off=%d\r\n", ret, (int) fc->off);
close(in);
@ -144,13 +147,12 @@ static void fiber_sendfile(ACL_FIBER *fiber acl_unused, void *ctx)
static void fiber_splice(ACL_FIBER *fiber acl_unused, void *ctx)
{
const char *path = (const char*) ctx;
int in = open(path, O_RDONLY, 0600), out;
loff_t off_in = 0;
struct FIBER_CTX *fc = (struct FIBER_CTX*) ctx;
int in = open(fc->frompath, O_RDONLY, 0600), out;
int pipefd[2];
if (in == -1) {
printf("open %s error %s\r\n", path, strerror(errno));
printf("open %s error %s\r\n", fc->frompath, strerror(errno));
return;
}
@ -162,22 +164,22 @@ static void fiber_splice(ACL_FIBER *fiber acl_unused, void *ctx)
while (1) {
char buf[1024];
int flags = SPLICE_F_MOVE |SPLICE_F_MORE;
int ret = splice(in, &off_in, out, NULL, 100, flags);
int flags = SPLICE_F_MOVE | SPLICE_F_MORE;
int ret = splice(in, &fc->off, out, NULL, fc->len, flags);
if (ret <= 0) {
printf("splice over: %s\r\n", strerror(errno));
printf("splice over, ret=%d: %s\r\n", ret, strerror(errno));
break;
}
ret = read(pipefd[0], buf, sizeof(buf) - 1);
if (ret <= 0) {
printf("read from pipe over: %s\r\n", strerror(errno));
printf("pipe over, ret=%d: %s\r\n", ret, strerror(errno));
break;
}
buf[ret] = 0;
printf("%s", buf);
printf("off: %ld, %s", fc->off, buf);
fflush(stdout);
}
@ -244,42 +246,30 @@ static void usage(const char *proc)
" -f filepath\r\n"
" -t tofilepath\r\n"
" -a action[read|write|rename|unlink|stat|mkdir|splice|pread|pwrite|sendfile]\r\n"
" -n write_size[default: 1024]\r\n"
" -n size[default: 1024]\r\n"
" -o open_flags[O_RDONLY, O_WRONLY, O_RDWR, O_APPEND, O_CREAT, O_EXCL, O_TRUNC]\r\n"
" -p offset\r\n"
, proc);
}
static int check_ctx(const char *action, const struct FIBER_CTX *ctx)
{
if (ctx->frompath == NULL) {
printf("%s: frompath NULL\r\n", action);
return -1;
}
if (ctx->topath == NULL) {
printf("%s: rename topath: NULL\r\n", action);
return -1;
}
return 0;
}
int main(int argc, char *argv[])
{
int ch;
char buf[256], buf2[256], action[128];
struct FIBER_CTX ctx;
buf[0] = 0;
snprintf(buf, sizeof(buf), "from.txt");
snprintf(buf2, sizeof(buf2), "to.txt");
action[0] = 0;
ctx.frompath = NULL;
ctx.topath = NULL;
ctx.frompath = buf;
ctx.topath = buf2;
ctx.off = 0;
ctx.len = 100;
#define EQ(x, y) !strcasecmp((x), (y))
while ((ch = getopt(argc, argv, "hf:t:a:o:n:p:l:")) > 0) {
while ((ch = getopt(argc, argv, "hf:t:a:o:n:p:")) > 0) {
switch (ch) {
case 'h':
usage(argv[0]);
@ -295,14 +285,12 @@ int main(int argc, char *argv[])
case 'p':
ctx.off = atoi(optarg);
break;
case 'l':
ctx.len = atoi(optarg);
break;
case 'a':
snprintf(action, sizeof(action), "%s", optarg);
break;
case 'n':
__write_size = atoi(optarg);
ctx.len = atoi(optarg);
break;
case 'o':
if (EQ(optarg, "O_RDONLY")) {
@ -334,7 +322,7 @@ int main(int argc, char *argv[])
return 1;
}
acl_fiber_msg_stdout_enable(1);
//acl_fiber_msg_stdout_enable(1);
acl_msg_stdout_enable(1);
if (__open_flags == 0) {
@ -350,14 +338,11 @@ int main(int argc, char *argv[])
} else if (EQ(action, "stat")) {
acl_fiber_create(fiber_filestat, buf, 320000);
} else if (EQ(action, "rename")) {
if (check_ctx("rename", &ctx) == -1) {
return 1;
}
acl_fiber_create(fiber_rename, &ctx, 320000);
} else if (EQ(action, "mkdir")) {
acl_fiber_create(fiber_mkdir, buf, 320000);
} else if (EQ(action, "splice")) {
acl_fiber_create(fiber_splice, buf, 320000);
acl_fiber_create(fiber_splice, &ctx, 320000);
} else if (EQ(action, "pread")) {
acl_fiber_create(fiber_pread, &ctx, 320000);
} else if (EQ(action, "sendfile")) {

31
lib_fiber/samples/file/tt.sh Executable file
View File

@ -0,0 +1,31 @@
#!/bin/sh
./file -f from.txt -o O_WRONLY -o O_CREAT -n 8291
echo "enter any key to test read..."
read n
./file -f from.txt -o O_RDONLY
echo "enter any key to test stat..."
read n
./file -f from.txt -a stat
echo "enter any key to test splice..."
read n
./file -f from.txt -a splice -p 0
echo "enter any key to test pread..."
read n
./file -f from.txt -a pread -p 512 -n 256
echo "enter any key to test rename..."
read n
./file -f from.txt -t to.txt -a rename
echo "enter any key to test unlink..."
read n
./file -f to.txt -a unlink
echo "enter any key to test mkdir..."
read n
./file -f "a/b/c/d" -a mkdir