diff --git a/lib_fiber/c/src/event.h b/lib_fiber/c/src/event.h index 8a86b9e26..af658529f 100644 --- a/lib_fiber/c/src/event.h +++ b/lib_fiber/c/src/event.h @@ -158,20 +158,21 @@ struct FILE_EVENT { #ifdef HAS_IO_URING char *rbuf; - size_t rsize; + unsigned rsize; int rlen; __u64 off; const char *wbuf; - size_t wsize; + unsigned wsize; int wlen; - socket_t iocp_sock; union { - struct sockaddr_in peer_addr; - struct statx statxbuf; + struct { + struct sockaddr_in addr; + socklen_t len; + } peer; + struct statx *statxbuf; char *path; int pipefd[2]; } var; - socklen_t addr_len; struct __kernel_timespec rts; struct __kernel_timespec wts; int r_timeout; diff --git a/lib_fiber/c/src/event/event_io_uring.c b/lib_fiber/c/src/event/event_io_uring.c index 872cb0fdf..841d7e52a 100644 --- a/lib_fiber/c/src/event/event_io_uring.c +++ b/lib_fiber/c/src/event/event_io_uring.c @@ -64,10 +64,10 @@ static int event_uring_add_read(EVENT_URING *ep, FILE_EVENT *fe) struct io_uring_sqe *sqe = io_uring_get_sqe(&ep->ring); assert(sqe); - fe->addr_len = (socklen_t) sizeof(fe->var.peer_addr); + fe->var.peer.len = (socklen_t) sizeof(fe->var.peer.addr); io_uring_prep_accept(sqe, fe->fd, - (struct sockaddr*) &fe->var.peer_addr, - (socklen_t*) &fe->addr_len, 0); + (struct sockaddr*) &fe->var.peer.addr, + (socklen_t*) &fe->var.peer.len, 0); io_uring_sqe_set_data(sqe, fe); TRY_SUBMMIT(ep); @@ -116,8 +116,8 @@ static int event_uring_add_write(EVENT_URING *ep, FILE_EVENT *fe) non_blocking(fe->fd, 1); struct io_uring_sqe *sqe = io_uring_get_sqe(&ep->ring); io_uring_prep_connect(sqe, fe->fd, - (struct sockaddr*) &fe->var.peer_addr, - (socklen_t) fe->addr_len); + (struct sockaddr*) &fe->var.peer.addr, + (socklen_t) fe->var.peer.len); io_uring_sqe_set_data(sqe, fe); TRY_SUBMMIT(ep); @@ -217,11 +217,12 @@ void event_uring_sendfile(EVENT *ev, FILE_EVENT *fe, int out, int in, { EVENT_URING *ep = (EVENT_URING*) ev; struct io_uring_sqe *sqe = io_uring_get_sqe(&ep->ring); - unsigned flags = SPLICE_F_MOVE |SPLICE_F_MORE; + unsigned flags = SPLICE_F_MOVE | SPLICE_F_MORE; // | SPLICE_F_NONBLOCK; - printf("hello>>>in=%d, off=%d\n", in, (int) off); + printf("hello>>>in=%d, off=%dfe=%p\n", in, (int) off, fe); io_uring_prep_splice(sqe, in, off, fe->var.pipefd[1], -1, cnt, flags); io_uring_sqe_set_data(sqe, fe); + sqe->flags = IOSQE_IO_LINK; sqe->opcode = IORING_OP_SPLICE; TRY_SUBMMIT(ep); @@ -258,8 +259,11 @@ static int event_uring_del_write(EVENT_URING *ep UNUSED, FILE_EVENT *fe) static void handle_read(EVENT *ev, FILE_EVENT *fe, int res) { + printf("%s: file size=%zd, s=%zd statusx=%zd\n", __FUNCTION__, + sizeof(FILE_EVENT), sizeof(struct sockaddr_in), + sizeof(struct statx)); if (fe->mask & EVENT_ACCEPT) { - fe->iocp_sock = res; + fe->rlen = res; } else if (fe->mask & EVENT_POLLIN) { if (res & (POLLIN | ERR)) { if (res & POLLERR) { @@ -292,7 +296,7 @@ static void handle_read(EVENT *ev, FILE_EVENT *fe, int res) static void handle_write(EVENT *ev, FILE_EVENT *fe, int res) { if (fe->mask & EVENT_CONNECT) { - fe->iocp_sock = res; + fe->rlen = res; } else if (fe->mask & EVENT_POLLOUT) { if (res & (POLLOUT | ERR)) { if (res & POLLERR) { diff --git a/lib_fiber/c/src/fiber_io.c b/lib_fiber/c/src/fiber_io.c index 1683e0a46..a2397b95b 100644 --- a/lib_fiber/c/src/fiber_io.c +++ b/lib_fiber/c/src/fiber_io.c @@ -224,7 +224,7 @@ static void fiber_io_loop(ACL_FIBER *self fiber_unused, void *ctx) if (timer) { continue; } - + msg_info("%s(%d), tid=%lu: fdcount=0, waiter=%u, events=%d", __FUNCTION__, __LINE__, __pthread_self(), ev->waiter, ring_size(&ev->events)); @@ -550,48 +550,52 @@ FILE_EVENT *fiber_file_open_write(socket_t fd) return fe; } -static int fiber_file_del(FILE_EVENT *fe) +static int fiber_file_del(FILE_EVENT *fe, socket_t fd) { #ifdef SYS_WIN char key[64]; - if (fe->fd == INVALID_SOCKET || fe->fd >= (socket_t) var_maxfd) { - msg_error("%s(%d): invalid fd=%d", - __FUNCTION__, __LINE__, fe->fd); + if (fd == INVALID_SOCKET || fd >= (socket_t) var_maxfd) { + msg_error("%s(%d): invalid fd=%d", __FUNCTION__, __LINE__, fd); return -1; } //_snprintf(key, sizeof(key), "%u", fe->fd); - _i64toa(fe->fd, key, 10); + _i64toa(fd, key, 10); htable_delete(__thread_fiber->events, key, NULL); return 0; #else - if (fe->fd == INVALID_SOCKET || fe->fd >= var_maxfd) { - msg_error("%s(%d): invalid fd=%d", - __FUNCTION__, __LINE__, fe->fd); + if (fd == INVALID_SOCKET || fd >= var_maxfd) { + msg_error("%s(%d): invalid fd=%d", __FUNCTION__, __LINE__, fd); return -1; } - if (__thread_fiber->events[fe->fd] != fe) { + if (__thread_fiber->events[fd] != fe) { msg_error("%s(%d): invalid fe=%p, fd=%d, origin=%p", - __FUNCTION__, __LINE__, fe, fe->fd, - __thread_fiber->events[fe->fd]); + __FUNCTION__, __LINE__, fe, fd, __thread_fiber->events[fd]); return -1; } - __thread_fiber->events[fe->fd] = NULL; + __thread_fiber->events[fd] = NULL; return 0; #endif } void fiber_file_free(FILE_EVENT *fe) { - if (fiber_file_del(fe) == 0) { + socket_t fd = fe->fd; + + // We must set fd INVALID_SOCKET to stop any using the old fd, + // fe will be freed only when the reference of it is 0. + fe->fd = INVALID_SOCKET; + + if (fiber_file_del(fe, fd) == 0) { file_event_unrefer(fe); } else { // xxx: What happened? - msg_error("Some error happened for fe=%p, fd=%d", fe, fe->fd); + msg_error("%s(%d): some error happened for fe=%p, fd=%d", + __FUNCTION__, __LINE__, fe, fe->fd); } } diff --git a/lib_fiber/c/src/file_event.c b/lib_fiber/c/src/file_event.c index 76a57f692..55a41fd2b 100644 --- a/lib_fiber/c/src/file_event.c +++ b/lib_fiber/c/src/file_event.c @@ -28,10 +28,9 @@ void file_event_init(FILE_EVENT *fe, socket_t fd) fe->off = 0; fe->wbuf = 0; fe->wsize = 0; - fe->iocp_sock = INVALID_SOCKET; - fe->addr_len = 0; fe->r_timeout = -1; fe->w_timeout = -1; + memset(&fe->var, 0, sizeof(fe->var)); #endif #ifdef HAS_IOCP diff --git a/lib_fiber/c/src/hook/file.c b/lib_fiber/c/src/hook/file.c index 8434e7b1e..28657e796 100644 --- a/lib_fiber/c/src/hook/file.c +++ b/lib_fiber/c/src/hook/file.c @@ -63,11 +63,9 @@ int file_close(EVENT *ev, FILE_EVENT *fe) fe->mask &= ~EVENT_FILE_CLOSE; if (fe->rlen == 0) { - file_event_unrefer(fe); return 0; } else { acl_fiber_set_error(fe->rlen); - file_event_unrefer(fe); return -1; } } @@ -241,10 +239,11 @@ int statx(int dirfd, const char *pathname, int flags, unsigned int mask, FILE_ALLOC(fe, EVENT_FILE_STATX); fe->rbuf = strdup(pathname); - memcpy(&fe->var.statxbuf, statxbuf, sizeof(struct statx)); + fe->var.statxbuf = (struct statx*) malloc(sizeof(struct statx)); + memcpy(fe->var.statxbuf, statxbuf, sizeof(struct statx)); event_uring_file_statx(ev, fe, dirfd, fe->rbuf, flags, mask, - &fe->var.statxbuf); + fe->var.statxbuf); fiber_io_inc(); acl_fiber_switch(); @@ -255,11 +254,13 @@ int statx(int dirfd, const char *pathname, int flags, unsigned int mask, fe->rbuf = NULL; if (fe->rlen == 0) { + memcpy(statxbuf, fe->var.statxbuf, sizeof(struct statx)); + free(fe->var.statxbuf); file_event_unrefer(fe); - memcpy(statxbuf, &fe->var.statxbuf, sizeof(struct statx)); return 0; } else { acl_fiber_set_error(fe->rlen); + free(fe->var.statxbuf); file_event_unrefer(fe); return -1; } @@ -420,7 +421,10 @@ ssize_t file_sendfile(socket_t out_fd, int in_fd, off64_t *off, size_t cnt) close(fe->var.pipefd[0]); close(fe->var.pipefd[1]); - if (ret < 0) { + if (ret == 0) { + file_event_unrefer(fe); + return 0; + } else if (ret < 0) { acl_fiber_set_error(-ret); file_event_unrefer(fe); return -1; diff --git a/lib_fiber/c/src/hook/io.c b/lib_fiber/c/src/hook/io.c index a7a67f5a3..29858f331 100644 --- a/lib_fiber/c/src/hook/io.c +++ b/lib_fiber/c/src/hook/io.c @@ -102,9 +102,6 @@ int WINAPI acl_fiber_close(socket_t fd) ret = (*sys_close)(fd); } - // We must set fd INVALID_SOCKET to stop any using the old fd, - // fe will be freed only when the reference of it is 0. - fe->fd = INVALID_SOCKET; fiber_file_free(fe); if (ret != 0) { diff --git a/lib_fiber/c/src/hook/socket.c b/lib_fiber/c/src/hook/socket.c index 279b3688e..f80115771 100644 --- a/lib_fiber/c/src/hook/socket.c +++ b/lib_fiber/c/src/hook/socket.c @@ -87,14 +87,14 @@ static socket_t fiber_iocp_accept(FILE_EVENT *fe) { fe->mask &= ~EVENT_READ; fe->mask |= EVENT_ACCEPT; - fe->iocp_sock = INVALID_SOCKET; + fe->rlen = INVALID_SOCKET; if (fiber_wait_read(fe) < 0) { msg_error("%s(%d): fiber_wait_read error=%s, fd=%d", __FUNCTION__, __LINE__, last_serror(), (int) fe->fd); return INVALID_SOCKET; } - return fe->iocp_sock; + return fe->rlen; } #endif @@ -291,8 +291,8 @@ static socket_t fiber_iocp_connect(FILE_EVENT *fe) } fe->mask &= ~EVENT_CONNECT; - if (fe->iocp_sock < 0) { - acl_fiber_set_error(-fe->iocp_sock); + if (fe->rlen < 0) { + acl_fiber_set_error(-fe->rlen); return -1; } return 0; @@ -326,8 +326,8 @@ int WINAPI acl_fiber_connect(socket_t sockfd, const struct sockaddr *addr, SET_CONNECTING(fe); #if defined(HAS_IOCP) || defined(HAS_IO_URING) - memcpy(&fe->var.peer_addr, addr, addrlen); - fe->addr_len = addrlen; + memcpy(&fe->var.peer.addr, addr, addrlen); + fe->var.peer.len = addrlen; #endif // The socket must be set to in no blocking status to avoid to be diff --git a/lib_fiber/samples/file/main.c b/lib_fiber/samples/file/main.c index 134c96b92..15fbfa194 100644 --- a/lib_fiber/samples/file/main.c +++ b/lib_fiber/samples/file/main.c @@ -71,6 +71,7 @@ static void fiber_writefile(ACL_FIBER *fiber acl_unused, void *ctx) } (void) write(fd, "\r\n", 2); + printf("write over!\r\n"); n = close(fd); printf("close %s %s, fd=%d\r\n", path, n == 0 ? "ok" : "error", fd); } @@ -135,7 +136,9 @@ static void fiber_sendfile(ACL_FIBER *fiber acl_unused, void *ctx) } cfd = accept_one(); - ret = sendfile64(cfd, in, &fc->off, 100); + + printf(">>>begin call sendfile64\r\n"); + ret = sendfile64(cfd, in, &fc->off, fc->len); printf(">>>sendfile ret=%zd, off=%d\r\n", ret, (int) fc->off); close(in); @@ -144,13 +147,12 @@ static void fiber_sendfile(ACL_FIBER *fiber acl_unused, void *ctx) static void fiber_splice(ACL_FIBER *fiber acl_unused, void *ctx) { - const char *path = (const char*) ctx; - int in = open(path, O_RDONLY, 0600), out; - loff_t off_in = 0; + struct FIBER_CTX *fc = (struct FIBER_CTX*) ctx; + int in = open(fc->frompath, O_RDONLY, 0600), out; int pipefd[2]; if (in == -1) { - printf("open %s error %s\r\n", path, strerror(errno)); + printf("open %s error %s\r\n", fc->frompath, strerror(errno)); return; } @@ -162,22 +164,22 @@ static void fiber_splice(ACL_FIBER *fiber acl_unused, void *ctx) while (1) { char buf[1024]; - int flags = SPLICE_F_MOVE |SPLICE_F_MORE; - int ret = splice(in, &off_in, out, NULL, 100, flags); + int flags = SPLICE_F_MOVE | SPLICE_F_MORE; + int ret = splice(in, &fc->off, out, NULL, fc->len, flags); if (ret <= 0) { - printf("splice over: %s\r\n", strerror(errno)); + printf("splice over, ret=%d: %s\r\n", ret, strerror(errno)); break; } ret = read(pipefd[0], buf, sizeof(buf) - 1); if (ret <= 0) { - printf("read from pipe over: %s\r\n", strerror(errno)); + printf("pipe over, ret=%d: %s\r\n", ret, strerror(errno)); break; } buf[ret] = 0; - printf("%s", buf); + printf("off: %ld, %s", fc->off, buf); fflush(stdout); } @@ -244,42 +246,30 @@ static void usage(const char *proc) " -f filepath\r\n" " -t tofilepath\r\n" " -a action[read|write|rename|unlink|stat|mkdir|splice|pread|pwrite|sendfile]\r\n" - " -n write_size[default: 1024]\r\n" + " -n size[default: 1024]\r\n" " -o open_flags[O_RDONLY, O_WRONLY, O_RDWR, O_APPEND, O_CREAT, O_EXCL, O_TRUNC]\r\n" " -p offset\r\n" , proc); } -static int check_ctx(const char *action, const struct FIBER_CTX *ctx) -{ - if (ctx->frompath == NULL) { - printf("%s: frompath NULL\r\n", action); - return -1; - } - if (ctx->topath == NULL) { - printf("%s: rename topath: NULL\r\n", action); - return -1; - } - - return 0; -} - int main(int argc, char *argv[]) { int ch; char buf[256], buf2[256], action[128]; struct FIBER_CTX ctx; - buf[0] = 0; + snprintf(buf, sizeof(buf), "from.txt"); + snprintf(buf2, sizeof(buf2), "to.txt"); + action[0] = 0; - ctx.frompath = NULL; - ctx.topath = NULL; + ctx.frompath = buf; + ctx.topath = buf2; ctx.off = 0; ctx.len = 100; #define EQ(x, y) !strcasecmp((x), (y)) - while ((ch = getopt(argc, argv, "hf:t:a:o:n:p:l:")) > 0) { + while ((ch = getopt(argc, argv, "hf:t:a:o:n:p:")) > 0) { switch (ch) { case 'h': usage(argv[0]); @@ -295,14 +285,12 @@ int main(int argc, char *argv[]) case 'p': ctx.off = atoi(optarg); break; - case 'l': - ctx.len = atoi(optarg); - break; case 'a': snprintf(action, sizeof(action), "%s", optarg); break; case 'n': __write_size = atoi(optarg); + ctx.len = atoi(optarg); break; case 'o': if (EQ(optarg, "O_RDONLY")) { @@ -334,7 +322,7 @@ int main(int argc, char *argv[]) return 1; } - acl_fiber_msg_stdout_enable(1); + //acl_fiber_msg_stdout_enable(1); acl_msg_stdout_enable(1); if (__open_flags == 0) { @@ -350,14 +338,11 @@ int main(int argc, char *argv[]) } else if (EQ(action, "stat")) { acl_fiber_create(fiber_filestat, buf, 320000); } else if (EQ(action, "rename")) { - if (check_ctx("rename", &ctx) == -1) { - return 1; - } acl_fiber_create(fiber_rename, &ctx, 320000); } else if (EQ(action, "mkdir")) { acl_fiber_create(fiber_mkdir, buf, 320000); } else if (EQ(action, "splice")) { - acl_fiber_create(fiber_splice, buf, 320000); + acl_fiber_create(fiber_splice, &ctx, 320000); } else if (EQ(action, "pread")) { acl_fiber_create(fiber_pread, &ctx, 320000); } else if (EQ(action, "sendfile")) { diff --git a/lib_fiber/samples/file/tt.sh b/lib_fiber/samples/file/tt.sh new file mode 100755 index 000000000..c046d24f4 --- /dev/null +++ b/lib_fiber/samples/file/tt.sh @@ -0,0 +1,31 @@ +#!/bin/sh + +./file -f from.txt -o O_WRONLY -o O_CREAT -n 8291 + +echo "enter any key to test read..." +read n +./file -f from.txt -o O_RDONLY + +echo "enter any key to test stat..." +read n +./file -f from.txt -a stat + +echo "enter any key to test splice..." +read n +./file -f from.txt -a splice -p 0 + +echo "enter any key to test pread..." +read n +./file -f from.txt -a pread -p 512 -n 256 + +echo "enter any key to test rename..." +read n +./file -f from.txt -t to.txt -a rename + +echo "enter any key to test unlink..." +read n +./file -f to.txt -a unlink + +echo "enter any key to test mkdir..." +read n +./file -f "a/b/c/d" -a mkdir