mirror of
https://gitee.com/acl-dev/acl.git
synced 2024-12-03 04:17:52 +08:00
test and optimize ...
This commit is contained in:
parent
acac293ee1
commit
d8db1f5de9
@ -143,7 +143,6 @@ struct FILE_EVENT {
|
||||
#define EVENT_FILE_RENAMEAT2 (unsigned) (1 << 13)
|
||||
#define EVENT_DIR_MKDIRAT (unsigned) (1 << 14)
|
||||
#define EVENT_SPLICE (unsigned) (1 << 15)
|
||||
#define EVENT_SENDFILE (unsigned) (1 << 16)
|
||||
|
||||
#endif // HAS_IO_URING
|
||||
|
||||
@ -171,7 +170,6 @@ struct FILE_EVENT {
|
||||
} peer;
|
||||
struct statx *statxbuf;
|
||||
char *path;
|
||||
int pipefd[2];
|
||||
} var;
|
||||
struct __kernel_timespec rts;
|
||||
struct __kernel_timespec wts;
|
||||
|
@ -215,6 +215,9 @@ void event_uring_splice(EVENT *ev, FILE_EVENT *fe, int fd_in, loff_t off_in,
|
||||
TRY_SUBMMIT(ep);
|
||||
}
|
||||
|
||||
#if 0
|
||||
// Some problems can't be resolve current, so I use another way to do it.
|
||||
|
||||
void event_uring_sendfile(EVENT *ev, FILE_EVENT *fe, int out, int in,
|
||||
off64_t off, size_t cnt)
|
||||
{
|
||||
@ -222,8 +225,6 @@ void event_uring_sendfile(EVENT *ev, FILE_EVENT *fe, int out, int in,
|
||||
struct io_uring_sqe *sqe = io_uring_get_sqe(&ep->ring);
|
||||
unsigned flags = SPLICE_F_MOVE | SPLICE_F_MORE; // | SPLICE_F_NONBLOCK;
|
||||
|
||||
printf("hello>>>in=%d, off=%d, cnt=%zd, fe=%p, flags=%u\n",
|
||||
in, (int) off, cnt, fe, flags);
|
||||
io_uring_prep_splice(sqe, in, off, fe->var.pipefd[1], -1, cnt, flags);
|
||||
io_uring_sqe_set_data(sqe, fe);
|
||||
sqe->flags |= IOSQE_IO_LINK | SPLICE_F_FD_IN_FIXED | IOSQE_ASYNC;
|
||||
@ -239,6 +240,7 @@ void event_uring_sendfile(EVENT *ev, FILE_EVENT *fe, int out, int in,
|
||||
|
||||
TRY_SUBMMIT(ep);
|
||||
}
|
||||
#endif
|
||||
|
||||
static int event_uring_del_read(EVENT_URING *ep UNUSED, FILE_EVENT *fe)
|
||||
{
|
||||
@ -350,8 +352,7 @@ static void handle_one(EVENT *ev, FILE_EVENT *fe, int res)
|
||||
| EVENT_FILE_STATX \
|
||||
| EVENT_FILE_RENAMEAT2 \
|
||||
| EVENT_DIR_MKDIRAT \
|
||||
| EVENT_SPLICE \
|
||||
| EVENT_SENDFILE)
|
||||
| EVENT_SPLICE)
|
||||
|
||||
if (fe->mask & FLAGS) {
|
||||
fe->rlen = res;
|
||||
|
@ -21,8 +21,6 @@ void event_uring_mkdirat(EVENT *ev, FILE_EVENT *fe, int dirfd,
|
||||
void event_uring_splice(EVENT *ev, FILE_EVENT *fe, int fd_in, loff_t off_in,
|
||||
int fd_out, loff_t off_out, size_t len, unsigned int splice_flags,
|
||||
unsigned int sqe_flags, __u8 opcode);
|
||||
void event_uring_sendfile(EVENT *ev, FILE_EVENT *fe, int out, int in,
|
||||
off64_t off, size_t cnt);
|
||||
|
||||
#endif
|
||||
|
||||
|
@ -17,6 +17,7 @@ void file_event_init(FILE_EVENT *fe, socket_t fd)
|
||||
fe->mask = 0;
|
||||
fe->r_proc = NULL;
|
||||
fe->w_proc = NULL;
|
||||
|
||||
#ifdef HAS_POLL
|
||||
fe->pfd = NULL;
|
||||
#endif
|
||||
|
@ -1,10 +1,14 @@
|
||||
#include "stdafx.h"
|
||||
#include "common.h"
|
||||
|
||||
#ifdef HAS_IO_URING
|
||||
|
||||
#include "common.h"
|
||||
#include "event.h"
|
||||
#include "fiber.h"
|
||||
#include "hook.h"
|
||||
|
||||
#ifdef HAS_IO_URING
|
||||
#define _GNU_SOURCE
|
||||
#include <fcntl.h>
|
||||
#include <sys/types.h>
|
||||
#include <sys/stat.h>
|
||||
#include <fcntl.h>
|
||||
@ -21,14 +25,13 @@
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
#define FILE_ALLOC(fe, type) do { \
|
||||
(fe) = file_event_alloc(-1); \
|
||||
(fe)->fiber_r = acl_fiber_running(); \
|
||||
(fe)->fiber_w = acl_fiber_running(); \
|
||||
(fe)->fiber_r->status = FIBER_STATUS_NONE; \
|
||||
(fe)->fiber_w->status = FIBER_STATUS_NONE; \
|
||||
(fe)->r_proc = file_read_callback; \
|
||||
(fe)->mask = (type); \
|
||||
#define FILE_ALLOC(__fe, __type) do { \
|
||||
(__fe) = file_event_alloc(-1); \
|
||||
(__fe)->fiber_r->status = FIBER_STATUS_NONE; \
|
||||
(__fe)->fiber_w->status = FIBER_STATUS_NONE; \
|
||||
(__fe)->r_proc = file_read_callback; \
|
||||
(__fe)->mask = (__type); \
|
||||
(__fe)->type = TYPE_EVENTABLE | TYPE_FILE; \
|
||||
} while (0)
|
||||
|
||||
static void file_read_callback(EVENT *ev UNUSED, FILE_EVENT *fe)
|
||||
@ -50,10 +53,10 @@ int file_close(EVENT *ev, FILE_EVENT *fe)
|
||||
return (*sys_close)(fe->fd);
|
||||
}
|
||||
|
||||
fe->fiber_r = acl_fiber_running();
|
||||
fe->fiber_r = acl_fiber_running();
|
||||
fe->fiber_r->status = FIBER_STATUS_NONE;
|
||||
fe->r_proc = file_read_callback;
|
||||
fe->mask = EVENT_FILE_CLOSE;
|
||||
fe->r_proc = file_read_callback;
|
||||
fe->mask |= EVENT_FILE_CLOSE;
|
||||
|
||||
event_uring_file_close(ev, fe);
|
||||
|
||||
@ -109,7 +112,7 @@ int openat(int dirfd, const char *pathname, int flags, ...)
|
||||
if (fe->rlen >= 0) {
|
||||
fe->fd = fe->rlen;
|
||||
fe->type = TYPE_FILE | TYPE_EVENTABLE;
|
||||
fiber_file_set(fe);
|
||||
fiber_file_set(fe); // Save the fe for the future using.
|
||||
return fe->fd;
|
||||
}
|
||||
|
||||
@ -187,7 +190,7 @@ int renameat2(int olddirfd, const char *oldpath,
|
||||
}
|
||||
|
||||
FILE_ALLOC(fe, EVENT_FILE_RENAMEAT2);
|
||||
fe->rbuf = strdup(oldpath);
|
||||
fe->rbuf = strdup(oldpath);
|
||||
fe->var.path = strdup(newpath);
|
||||
|
||||
event_uring_file_renameat2(ev, fe, olddirfd, fe->rbuf,
|
||||
@ -331,8 +334,70 @@ int mkdirat(int dirfd, const char *pathname, mode_t mode)
|
||||
}
|
||||
}
|
||||
|
||||
#define _GNU_SOURCE
|
||||
#include <fcntl.h>
|
||||
ssize_t pread(int fd, void *buf, size_t count, off_t offset)
|
||||
{
|
||||
FILE_EVENT *fe;
|
||||
ssize_t ret;
|
||||
|
||||
if (fd == INVALID_SOCKET) {
|
||||
msg_error("%s: invalid fd: %d", __FUNCTION__, fd);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (sys_pread == NULL) {
|
||||
hook_once();
|
||||
}
|
||||
|
||||
if (!var_hook_sys_api) {
|
||||
return (*sys_pread)(fd, buf, count, offset);
|
||||
}
|
||||
|
||||
if (!EVENT_IS_IO_URING(fiber_io_event())) {
|
||||
return (*sys_pread)(fd, buf, count, offset);
|
||||
}
|
||||
|
||||
// We alloc one new FILE_EVENT for the fd so that multiple fibers
|
||||
// can pread or pwrite the same fd.
|
||||
|
||||
FILE_ALLOC(fe, EVENT_READ);
|
||||
fe->fd = fd;
|
||||
fe->off = offset;
|
||||
ret = fiber_iocp_read(fe, buf, (int) count);
|
||||
file_event_unrefer(fe);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
ssize_t pwrite(int fd, const void *buf, size_t count, off_t offset)
|
||||
{
|
||||
FILE_EVENT *fe;
|
||||
ssize_t ret;
|
||||
|
||||
if (fd == INVALID_SOCKET) {
|
||||
msg_error("%s: invalid fd: %d", __FUNCTION__, fd);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (sys_pwrite == NULL) {
|
||||
hook_once();
|
||||
}
|
||||
|
||||
if (!var_hook_sys_api) {
|
||||
return (*sys_pwrite)(fd, buf, count, offset);
|
||||
}
|
||||
|
||||
if (!EVENT_IS_IO_URING(fiber_io_event())) {
|
||||
return (*sys_pwrite)(fd, buf, count, offset);
|
||||
}
|
||||
|
||||
FILE_ALLOC(fe, EVENT_WRITE);
|
||||
fe->fd = fd;
|
||||
fe->off = offset;
|
||||
ret = fiber_iocp_write(fe, buf, (int) count);
|
||||
file_event_unrefer(fe);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
ssize_t splice(int fd_in, loff_t *poff_in, int fd_out,
|
||||
loff_t *poff_out, size_t len, unsigned int flags)
|
||||
@ -361,13 +426,14 @@ ssize_t splice(int fd_in, loff_t *poff_in, int fd_out,
|
||||
return (*sys_splice)(fd_in, poff_in, fd_out, poff_out, len, flags);
|
||||
}
|
||||
|
||||
off_in = poff_in ? *poff_in : -1;
|
||||
off_in = poff_in ? *poff_in : -1;
|
||||
off_out = poff_out ? *poff_out : -1;
|
||||
|
||||
// The same fd_in maybe be shared by multiple fibers, so we should
|
||||
// alloc one new FILE_EVENT for each operation.
|
||||
FILE_ALLOC(fe, EVENT_SPLICE);
|
||||
fe->fiber_r->status = FIBER_STATUS_WAIT_READ;
|
||||
|
||||
// flags => SPLICE_F_FD_IN_FIXED;
|
||||
// sqe_flags => IOSQE_FIXED_FILE;
|
||||
event_uring_splice(ev, fe, fd_in, off_in, fd_out, off_out, len, flags,
|
||||
sqe_flags, IORING_OP_SPLICE);
|
||||
|
||||
@ -398,48 +464,33 @@ ssize_t splice(int fd_in, loff_t *poff_in, int fd_out,
|
||||
|
||||
ssize_t file_sendfile(socket_t out_fd, int in_fd, off64_t *off, size_t cnt)
|
||||
{
|
||||
FILE_EVENT *fe;
|
||||
EVENT *ev = fiber_io_event();
|
||||
int ret;
|
||||
unsigned flags = SPLICE_F_MOVE | SPLICE_F_MORE | SPLICE_F_NONBLOCK;
|
||||
ssize_t ret;
|
||||
int pipefd[2];
|
||||
|
||||
fe = fiber_file_open_read(in_fd);
|
||||
fe->mask |= EVENT_SENDFILE;
|
||||
|
||||
if (pipe(fe->var.pipefd) == -1) {
|
||||
fe->mask &= ~EVENT_SENDFILE;
|
||||
if (pipe(pipefd) == -1) {
|
||||
msg_error("%s(%d): pipe error=%s",
|
||||
__FUNCTION__, __LINE__, last_serror());
|
||||
return -1;
|
||||
}
|
||||
|
||||
fe->fiber_r = acl_fiber_running();
|
||||
fe->fiber_r->status = FIBER_STATUS_WAIT_READ;
|
||||
|
||||
event_uring_sendfile(ev, fe, out_fd, in_fd, off ? *off : 0, cnt);
|
||||
|
||||
fiber_io_inc();
|
||||
acl_fiber_switch();
|
||||
fiber_io_dec();
|
||||
|
||||
fe->mask &= ~EVENT_SENDFILE;
|
||||
|
||||
ret = fe->rlen;
|
||||
close(fe->var.pipefd[0]);
|
||||
close(fe->var.pipefd[1]);
|
||||
fe->var.pipefd[0] = -1;
|
||||
fe->var.pipefd[1] = -1;
|
||||
|
||||
printf(">>>>>>>>%s: ret=%d\n", __FUNCTION__, ret);
|
||||
if (ret == 0) {
|
||||
return 0;
|
||||
} else if (ret < 0) {
|
||||
acl_fiber_set_error(-ret);
|
||||
return -1;
|
||||
ret = splice(in_fd, off, pipefd[1], NULL, cnt, flags);
|
||||
if (ret <= 0) {
|
||||
close(pipefd[0]);
|
||||
close(pipefd[1]);
|
||||
return ret;
|
||||
}
|
||||
|
||||
if (off) {
|
||||
*off += cnt;
|
||||
ret = splice(pipefd[0], NULL, out_fd, NULL, ret, flags);
|
||||
if (ret <= 0) {
|
||||
close(pipefd[0]);
|
||||
close(pipefd[1]);
|
||||
return ret;
|
||||
}
|
||||
|
||||
close(pipefd[0]);
|
||||
close(pipefd[1]);
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
@ -162,6 +162,11 @@ extern epoll_ctl_fn *sys_epoll_ctl;
|
||||
typedef struct EVENT EVENT;
|
||||
typedef struct FILE_EVENT FILE_EVENT;
|
||||
|
||||
// in io.c
|
||||
int fiber_iocp_read(FILE_EVENT *fe, char *buf, int len);
|
||||
int fiber_iocp_write(FILE_EVENT *fe, const char *buf, int len);
|
||||
|
||||
// in file.c
|
||||
extern int file_close(EVENT *ev, FILE_EVENT *fe);
|
||||
extern ssize_t file_sendfile(socket_t out_fd, int in_fd, off64_t *off, size_t cnt);
|
||||
|
||||
|
@ -113,7 +113,7 @@ int WINAPI acl_fiber_close(socket_t fd)
|
||||
/****************************************************************************/
|
||||
|
||||
#if defined(HAS_IOCP) || defined(HAS_IO_URING)
|
||||
static int fiber_iocp_read(FILE_EVENT *fe, char *buf, int len)
|
||||
int fiber_iocp_read(FILE_EVENT *fe, char *buf, int len)
|
||||
{
|
||||
#if defined(HAS_IOCP)
|
||||
/* If the socket type is UDP, We must check the fixed buffer first,
|
||||
@ -137,6 +137,8 @@ static int fiber_iocp_read(FILE_EVENT *fe, char *buf, int len)
|
||||
while (1) {
|
||||
int err;
|
||||
|
||||
// Must clear the EVENT_READ flags in order to set IO event
|
||||
// for each IO process.
|
||||
fe->mask &= ~EVENT_READ;
|
||||
|
||||
if (fiber_wait_read(fe) < 0) {
|
||||
@ -174,7 +176,7 @@ static int fiber_iocp_read(FILE_EVENT *fe, char *buf, int len)
|
||||
#endif // HAS_IOCP || HAS_IO_URING
|
||||
|
||||
#if defined(HAS_IO_URING)
|
||||
static int fiber_iocp_write(FILE_EVENT *fe, const char *buf, int len)
|
||||
int fiber_iocp_write(FILE_EVENT *fe, const char *buf, int len)
|
||||
{
|
||||
fe->wbuf = buf;
|
||||
fe->wsize = (size_t) len;
|
||||
@ -217,75 +219,10 @@ static int fiber_iocp_write(FILE_EVENT *fe, const char *buf, int len)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#endif
|
||||
#endif // HAS_IO_URING
|
||||
|
||||
#ifdef SYS_UNIX
|
||||
|
||||
ssize_t pread(int fd, void *buf, size_t count, off_t offset)
|
||||
{
|
||||
FILE_EVENT *fe;
|
||||
|
||||
if (fd == INVALID_SOCKET) {
|
||||
msg_error("%s: invalid fd: %d", __FUNCTION__, fd);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (sys_pread == NULL) {
|
||||
hook_once();
|
||||
}
|
||||
|
||||
if (!var_hook_sys_api) {
|
||||
return (*sys_pread)(fd, buf, count, offset);
|
||||
}
|
||||
|
||||
#ifdef HAS_IO_URING
|
||||
if (!EVENT_IS_IO_URING(fiber_io_event())) {
|
||||
return (*sys_pread)(fd, buf, count, offset);
|
||||
}
|
||||
|
||||
fe = fiber_file_open_read(fd);
|
||||
CLR_POLLING(fe);
|
||||
fe->off = offset;
|
||||
|
||||
return fiber_iocp_read(fe, buf, (int) count);
|
||||
#else
|
||||
return (*sys_pread)(fd, buf, count, offset);
|
||||
#endif
|
||||
}
|
||||
|
||||
ssize_t pwrite(int fd, const void *buf, size_t count, off_t offset)
|
||||
{
|
||||
FILE_EVENT *fe;
|
||||
|
||||
if (fd == INVALID_SOCKET) {
|
||||
msg_error("%s: invalid fd: %d", __FUNCTION__, fd);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (sys_pwrite == NULL) {
|
||||
hook_once();
|
||||
}
|
||||
|
||||
if (!var_hook_sys_api) {
|
||||
return (*sys_pwrite)(fd, buf, count, offset);
|
||||
}
|
||||
|
||||
#ifdef HAS_IO_URING
|
||||
if (!EVENT_IS_IO_URING(fiber_io_event())) {
|
||||
return (*sys_pwrite)(fd, buf, count, offset);
|
||||
}
|
||||
|
||||
fe = fiber_file_open_write(fd);
|
||||
CLR_POLLING(fe);
|
||||
fe->off = offset;
|
||||
|
||||
return fiber_iocp_write(fe, buf, (int) count);
|
||||
#else
|
||||
return (*sys_pwrite)(fd, buf, count, offset);
|
||||
#endif
|
||||
}
|
||||
|
||||
ssize_t acl_fiber_read(socket_t fd, void *buf, size_t count)
|
||||
{
|
||||
FILE_EVENT* fe;
|
||||
|
@ -17,8 +17,10 @@ static int __write_size = 1024;
|
||||
struct FIBER_CTX {
|
||||
const char *frompath;
|
||||
const char *topath;
|
||||
int fd;
|
||||
off_t off;
|
||||
int len;
|
||||
char addr[256];
|
||||
};
|
||||
|
||||
static void fiber_readfile(ACL_FIBER *fiber acl_unused, void *ctx)
|
||||
@ -56,7 +58,7 @@ static void fiber_writefile(ACL_FIBER *fiber acl_unused, void *ctx)
|
||||
char buf[10];
|
||||
if (fd < 0) {
|
||||
printf("open %s error %s\r\n", path, strerror(errno));
|
||||
return;
|
||||
exit(1);
|
||||
}
|
||||
|
||||
printf("open %s ok, fd=%d\r\n", path, fd);
|
||||
@ -78,46 +80,210 @@ static void fiber_writefile(ACL_FIBER *fiber acl_unused, void *ctx)
|
||||
|
||||
static void fiber_pread(ACL_FIBER *fiber acl_unused, void *ctx)
|
||||
{
|
||||
const struct FIBER_CTX *fc = (const struct FIBER_CTX*)ctx;
|
||||
int fd = open(fc->frompath, O_RDONLY, 0600), ret;
|
||||
struct FIBER_CTX *fc = (struct FIBER_CTX*) ctx;
|
||||
char buf[4096];
|
||||
int len = (int) sizeof(buf) - 1 > fc->len ? fc->len : (int) sizeof(buf) - 1;
|
||||
int len = (int) sizeof(buf) - 1 > fc->len
|
||||
? fc->len : (int) sizeof(buf) - 1;
|
||||
int fd, ret;
|
||||
|
||||
assert(fc->len > 0);
|
||||
|
||||
fd = open(fc->frompath, O_RDONLY, 0600);
|
||||
if (fd == -1) {
|
||||
printf("open %s error %s\r\n", fc->frompath, strerror(errno));
|
||||
printf("open %s for read error %s\r\n",
|
||||
fc->frompath, strerror(errno));
|
||||
return;
|
||||
}
|
||||
|
||||
printf(">>%s: begin to call pread from fd=%d\r\n", __FUNCTION__, fd);
|
||||
|
||||
ret = pread(fd, buf, len, fc->off);
|
||||
if (ret < 0) {
|
||||
printf("pread from %s %d error %s\r\n",
|
||||
fc->frompath, fd, strerror(errno));
|
||||
if (ret <= 0) {
|
||||
printf("pread from %s %d over %s, ret=%d\r\n",
|
||||
fc->frompath, fd, strerror(errno), ret);
|
||||
} else {
|
||||
buf[ret] = 0;
|
||||
printf("%s\r\n", buf);
|
||||
printf("pread from %s, ret=%d\r\n", fc->frompath, ret);
|
||||
}
|
||||
fc->off += ret;
|
||||
|
||||
printf(">>>%s: begin to close fd=%d\r\n", __FUNCTION__, fd);
|
||||
close(fd);
|
||||
}
|
||||
|
||||
static void fiber_pwrite(ACL_FIBER *fiber acl_unused, void *ctx)
|
||||
{
|
||||
struct FIBER_CTX *fc = (struct FIBER_CTX*) ctx;
|
||||
int fd, ret;
|
||||
char *buf;
|
||||
|
||||
assert(fc->len > 0);
|
||||
|
||||
fd = open(fc->frompath, O_WRONLY | O_CREAT | O_TRUNC, 0600);
|
||||
if (fd == -1) {
|
||||
printf("open %s for write error %s\r\n",
|
||||
fc->frompath, strerror(errno));
|
||||
exit(1);
|
||||
}
|
||||
|
||||
printf(">>%s: begin to call pwrite to fd=%d\r\rn", __FUNCTION__, fd);
|
||||
buf = malloc(fc->len);
|
||||
memset(buf, 'x', fc->len);
|
||||
ret = pwrite(fd, buf, fc->len, fc->off);
|
||||
printf(">>pwrite ret=%d, file=%s, fd=%d\r\n", ret, fc->frompath, fd);
|
||||
|
||||
close(fd);
|
||||
}
|
||||
|
||||
struct WRITER_CTX {
|
||||
int fd;
|
||||
off_t off;
|
||||
int len;
|
||||
ACL_FIBER_SEM *sem;
|
||||
};
|
||||
|
||||
#define MB 1000000
|
||||
|
||||
static void fiber_one_pwriter(ACL_FIBER *fb acl_unused, void *ctx)
|
||||
{
|
||||
struct WRITER_CTX *wc = (struct WRITER_CTX*) ctx;
|
||||
int count = wc->len / MB, left = wc->len % MB;
|
||||
char *buf = malloc(MB);
|
||||
int ret, i;
|
||||
|
||||
memset(buf, 'x', MB);
|
||||
|
||||
for (i = 0; i < count; i++) {
|
||||
printf(">>>%d: fiber-%d running ...\r\n", __LINE__, acl_fiber_self());
|
||||
ret = pwrite(wc->fd, buf, MB, wc->off);
|
||||
//printf(">>>fiber-%d, ret=%d\r\n", acl_fiber_self(), ret);
|
||||
|
||||
if (ret <= 0) {
|
||||
printf("pwrite ret=%d, wrror=%s\r\n", ret, strerror(errno));
|
||||
exit(1);
|
||||
}
|
||||
printf(">>>%d: fiber-%d running ...\r\n", __LINE__, acl_fiber_self());
|
||||
wc->off += ret;
|
||||
}
|
||||
|
||||
printf(">>>%d: fiber-%d running ...\r\n", __LINE__, acl_fiber_self());
|
||||
|
||||
if (left > 0) {
|
||||
ret = pwrite(wc->fd, buf, left, wc->off);
|
||||
if (ret <= 0) {
|
||||
printf("pwrite ret=%d, wrror=%s\r\n", ret, strerror(errno));
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
printf("fiber=%d: pwrite ok, ret=%d, off=%ld, len=%d\r\n",
|
||||
acl_fiber_self(), ret, wc->off, wc->len);
|
||||
|
||||
acl_fiber_sem_post(wc->sem);
|
||||
free(buf);
|
||||
free(wc);
|
||||
}
|
||||
|
||||
static void co_writers(struct FIBER_CTX *fc, int fd)
|
||||
{
|
||||
#define COUNT 10
|
||||
int i, step = fc->len / COUNT, cnt = 0;
|
||||
off_t off = 0;
|
||||
ACL_FIBER_SEM *sem = acl_fiber_sem_create(0);
|
||||
|
||||
assert(fc->len > 0);
|
||||
assert(step > 0);
|
||||
|
||||
for (i = 0; i < COUNT - 1; i++) {
|
||||
struct WRITER_CTX *wc = malloc(sizeof(struct WRITER_CTX));
|
||||
wc->fd = fd;
|
||||
wc->sem = sem;
|
||||
wc->len = step;
|
||||
wc->off = off;
|
||||
off += step;
|
||||
cnt++;
|
||||
|
||||
acl_fiber_create(fiber_one_pwriter, wc, 320000);
|
||||
}
|
||||
|
||||
if (off < fc->len) {
|
||||
struct WRITER_CTX *wc = malloc(sizeof(struct WRITER_CTX));
|
||||
wc->fd = fd;
|
||||
wc->sem = sem;
|
||||
wc->len = fc->len - off;
|
||||
wc->off = off;
|
||||
cnt++;
|
||||
|
||||
acl_fiber_create(fiber_one_pwriter, wc, 320000);
|
||||
}
|
||||
|
||||
for (i = 0; i < cnt; i++) {
|
||||
acl_fiber_sem_wait(sem);
|
||||
//sleep(1);
|
||||
}
|
||||
|
||||
printf("All fiber finished!\r\n");
|
||||
acl_fiber_sem_free(sem);
|
||||
}
|
||||
|
||||
static void fiber_co_pwrite(ACL_FIBER *fb acl_unused, void *ctx)
|
||||
{
|
||||
struct FIBER_CTX *fc = (struct FIBER_CTX*) ctx;
|
||||
int fd, i;
|
||||
|
||||
fd = open(fc->frompath, O_WRONLY | O_CREAT | O_TRUNC, 0600);
|
||||
if (fd == -1) {
|
||||
printf("open %s for write error %s\r\n",
|
||||
fc->frompath, strerror(errno));
|
||||
exit(1);
|
||||
}
|
||||
|
||||
for (i = 0; i < 10; i++) {
|
||||
co_writers(fc, fd);
|
||||
}
|
||||
close(fd);
|
||||
}
|
||||
|
||||
static void fiber_reader(ACL_FIBER *fb acl_unused, void *ctx)
|
||||
{
|
||||
struct FIBER_CTX *fc = (struct FIBER_CTX*) ctx;
|
||||
ACL_VSTREAM *conn = acl_vstream_connect(fc->addr, ACL_BLOCKING, 10, 10, 1024);
|
||||
char buf[1024];
|
||||
|
||||
while (1) {
|
||||
int ret = read(ACL_VSTREAM_SOCK(conn), buf, sizeof(buf) - 1);
|
||||
if (ret <= 0) {
|
||||
break;
|
||||
}
|
||||
buf[ret] = 0;
|
||||
printf("%s", buf);
|
||||
fflush(stdout);
|
||||
}
|
||||
|
||||
acl_vstream_close(conn);
|
||||
}
|
||||
|
||||
static void wait_and_sendfile(int in, struct FIBER_CTX *fc)
|
||||
{
|
||||
const char *addr = "127.0.0.1:8080";
|
||||
ACL_VSTREAM *ln = acl_vstream_listen(addr, 128), *conn;
|
||||
ACL_VSTREAM *ln = acl_vstream_listen(fc->addr, 128), *conn;
|
||||
ssize_t ret;
|
||||
off_t off_saved = fc->off;
|
||||
int cfd;
|
||||
if (ln == NULL) {
|
||||
printf("listen %s error %s\r\n", addr, strerror(errno));
|
||||
printf("listen %s error %s\r\n", fc->addr, strerror(errno));
|
||||
exit(1);
|
||||
}
|
||||
|
||||
acl_fiber_create(fiber_reader, fc, 320000);
|
||||
|
||||
while (1) {
|
||||
printf("Waiting for accept from %s ...\r\n", addr);
|
||||
printf("Waiting for accept from %s ...\r\n", fc->addr);
|
||||
|
||||
conn = acl_vstream_accept(ln, NULL, 0);
|
||||
if (conn == NULL) {
|
||||
printf("accept from %s error %s\r\n", addr, strerror(errno));
|
||||
printf("accept from %s error %s\r\n",
|
||||
fc->addr, strerror(errno));
|
||||
exit(1);
|
||||
}
|
||||
|
||||
@ -127,6 +293,7 @@ static void wait_and_sendfile(int in, struct FIBER_CTX *fc)
|
||||
printf(">>>begin call sendfile64 to fd=%d\r\n", cfd);
|
||||
|
||||
ret = sendfile64(cfd, in, &fc->off, fc->len);
|
||||
printf(">>>begin to close cfd=%d\r\n", cfd);
|
||||
close(cfd);
|
||||
|
||||
printf(">>>sendfile ret=%zd, off=%d\r\n", ret, (int) fc->off);
|
||||
@ -260,7 +427,7 @@ static void usage(const char *proc)
|
||||
printf("usage: %s -h [help]\r\n"
|
||||
" -f filepath\r\n"
|
||||
" -t tofilepath\r\n"
|
||||
" -a action[read|write|rename|unlink|stat|mkdir|splice|pread|pwrite|sendfile]\r\n"
|
||||
" -a action[read|write|rename|unlink|stat|mkdir|splice|pread|pwrite|sendfile|co_pwrite]\r\n"
|
||||
" -n size[default: 1024]\r\n"
|
||||
" -o open_flags[O_RDONLY, O_WRONLY, O_RDWR, O_APPEND, O_CREAT, O_EXCL, O_TRUNC]\r\n"
|
||||
" -p offset\r\n"
|
||||
@ -281,6 +448,7 @@ int main(int argc, char *argv[])
|
||||
ctx.topath = buf2;
|
||||
ctx.off = 0;
|
||||
ctx.len = 100;
|
||||
snprintf(ctx.addr, sizeof(ctx.addr), "127.0.0.1|8080");
|
||||
|
||||
#define EQ(x, y) !strcasecmp((x), (y))
|
||||
|
||||
@ -337,7 +505,7 @@ int main(int argc, char *argv[])
|
||||
return 1;
|
||||
}
|
||||
|
||||
//acl_fiber_msg_stdout_enable(1);
|
||||
acl_fiber_msg_stdout_enable(1);
|
||||
acl_msg_stdout_enable(1);
|
||||
|
||||
if (__open_flags == 0) {
|
||||
@ -360,6 +528,10 @@ int main(int argc, char *argv[])
|
||||
acl_fiber_create(fiber_splice, &ctx, 320000);
|
||||
} else if (EQ(action, "pread")) {
|
||||
acl_fiber_create(fiber_pread, &ctx, 320000);
|
||||
} else if (EQ(action, "pwrite")) {
|
||||
acl_fiber_create(fiber_pwrite, &ctx, 320000);
|
||||
} else if (EQ(action, "co_pwrite")) {
|
||||
acl_fiber_create(fiber_co_pwrite, &ctx, 320000);
|
||||
} else if (EQ(action, "sendfile")) {
|
||||
acl_fiber_create(fiber_sendfile, &ctx, 320000);
|
||||
} else {
|
||||
@ -369,7 +541,6 @@ int main(int argc, char *argv[])
|
||||
|
||||
acl_fiber_schedule_with(FIBER_EVENT_IO_URING);
|
||||
|
||||
printf("Enter any key to exit ...\r\n");
|
||||
getchar();
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -26,6 +26,10 @@ echo "enter any key to test unlink..."
|
||||
read n
|
||||
./file -f to.txt -a unlink
|
||||
|
||||
echo "enter any key to test pwrite..."
|
||||
read n
|
||||
./file -f from.txt -a pwrite -p 500 -n 256
|
||||
|
||||
echo "enter any key to test mkdir..."
|
||||
read n
|
||||
./file -f "a/b/c/d" -a mkdir
|
||||
|
Loading…
Reference in New Issue
Block a user