optimize and test io_uring module in fiber

This commit is contained in:
zhengshuxin 2022-10-12 05:55:50 -04:00
parent 61c6053c29
commit 4a2f036f7a
8 changed files with 315 additions and 71 deletions

View File

@ -39,7 +39,6 @@ static void add_read_wait(EVENT_URING *ep, FILE_EVENT *fe, int tmo_ms)
io_uring_sqe_set_data(sqe, fe);
sqe->flags = IOSQE_IO_LINK;
file_event_refer(fe);
TRY_SUBMMIT(ep);
fe->rts.tv_sec = tmo_ms / 1000;
@ -47,9 +46,7 @@ static void add_read_wait(EVENT_URING *ep, FILE_EVENT *fe, int tmo_ms)
sqe = io_uring_get_sqe(&ep->ring);
io_uring_prep_link_timeout(sqe, &fe->rts, 0);
io_uring_sqe_set_data(sqe, fe);
file_event_refer(fe);
TRY_SUBMMIT(ep);
}
@ -85,31 +82,53 @@ static int event_uring_add_read(EVENT_URING *ep, FILE_EVENT *fe)
return 0;
}
static int event_uring_add_write(EVENT_URING *ep, FILE_EVENT *fe)
static void add_write_wait(EVENT_URING *ep, FILE_EVENT *fe, int tmo_ms)
{
struct io_uring_sqe *sqe;
sqe = io_uring_get_sqe(&ep->ring);
io_uring_prep_poll_add(sqe, fe->fd, POLLOUT | POLLHUP | POLLERR);
io_uring_sqe_set_data(sqe, fe);
sqe->flags = IOSQE_IO_LINK;
TRY_SUBMMIT(ep);
fe->wts.tv_sec = tmo_ms / 1000;
fe->wts.tv_nsec = (((long long) tmo_ms) % 1000) * 1000000;
sqe = io_uring_get_sqe(&ep->ring);
io_uring_prep_link_timeout(sqe, &fe->wts, 0);
TRY_SUBMMIT(ep);
}
static int event_uring_add_write(EVENT_URING *ep, FILE_EVENT *fe)
{
if (fe->mask & EVENT_WRITE) {
return 0;
}
fe->mask |= EVENT_WRITE;
sqe = io_uring_get_sqe(&ep->ring);
assert(sqe);
io_uring_sqe_set_data(sqe, fe);
if (fe->mask & EVENT_CONNECT) {
if (fe->mask & EVENT_POLLOUT) {
add_write_wait(ep, fe, fe->r_timeout);
} else if (fe->mask & EVENT_CONNECT) {
non_blocking(fe->fd, 1);
struct io_uring_sqe *sqe = io_uring_get_sqe(&ep->ring);
io_uring_prep_connect(sqe, fe->fd,
(struct sockaddr*) &fe->peer_addr,
(socklen_t) fe->addr_len);
io_uring_sqe_set_data(sqe, fe);
TRY_SUBMMIT(ep);
} else {
struct io_uring_sqe *sqe = io_uring_get_sqe(&ep->ring);
io_uring_prep_write(sqe, fe->fd, fe->wbuf, fe->wsize, 0);
io_uring_sqe_set_data(sqe, fe);
TRY_SUBMMIT(ep);
}
if (++ep->appending >= ep->sqe_size) {
ep->appending = 0;
io_uring_submit(&ep->ring);
}
return 0;
}
@ -133,24 +152,29 @@ static int event_uring_del_write(EVENT_URING *ep UNUSED, FILE_EVENT *fe)
return 0;
}
#define ERR (POLLERR | POLLHUP | POLLNVAL)
static void handle_read(EVENT *ev, FILE_EVENT *fe, int res)
{
if (fe->mask & EVENT_ACCEPT) {
fe->iocp_sock = res;
} else if (fe->mask & EVENT_POLLIN) {
if (res == -ETIME) {
printf("fd=%d timeout, fe=%p\r\n", fe->fd, fe);
file_event_unrefer(fe);
return;
} else if (res == -ECANCELED) {
printf("fd=%d canceled, fe=%p\n", fe->fd, fe);
file_event_unrefer(fe);
return;
} else if (res & POLLIN) {
if (res & (POLLIN | ERR)) {
if (res & POLLERR) {
fe->mask |= EVENT_ERR;
}
if (res & POLLHUP) {
fe->mask |= EVENT_HUP;
}
if (res & POLLNVAL) {
fe->mask |= EVENT_NVAL;;
}
fe->mask &= ~EVENT_POLLIN;
CLR_READWAIT(fe);
} else {
printf("unknown res=%d, fd=%d\n", res, fe->fd);
msg_error("%s(%d): unknown res=%d, fd=%d",
__FUNCTION__, __LINE__, res, fe->fd);
}
} else {
fe->rlen = res;
@ -165,26 +189,31 @@ static void handle_write(EVENT *ev, FILE_EVENT *fe, int res)
if (fe->mask & EVENT_CONNECT) {
fe->iocp_sock = res;
} else if (fe->mask & EVENT_POLLOUT) {
if (res == -ETIME) {
printf("fd=%d timeout, fe=%p\r\n", fe->fd, fe);
file_event_unrefer(fe);
return;
} else if (res == -ECANCELED) {
printf("fd=%d canceled, fe=%p\n", fe->fd, fe);
file_event_unrefer(fe);
return;
} else if (res & POLLIN) {
if (res & (POLLOUT | ERR)) {
if (res & POLLERR) {
fe->mask |= EVENT_ERR;
}
if (res & POLLHUP) {
fe->mask |= EVENT_HUP;
}
if (res & POLLNVAL) {
fe->mask |= EVENT_NVAL;;
}
fe->mask &= ~EVENT_POLLOUT;
CLR_WRITEWAIT(fe);
} else {
printf("unknown res=%d, fd=%d\n", res, fe->fd);
msg_error("%s(%d): unknown res=%d, fd=%d",
__FUNCTION__, __LINE__, res, fe->fd);
}
} else {
fe->wlen = res;
}
fe->mask &= ~EVENT_WRITE;
fe->w_proc(ev, fe);
if (fe->w_proc) {
fe->w_proc(ev, fe);
}
}
static int event_uring_wait(EVENT *ev, int timeout)
@ -212,8 +241,6 @@ static int event_uring_wait(EVENT *ev, int timeout)
}
while (1) {
cqe = NULL;
if (count > 0) {
ret = io_uring_peek_cqe(&ep->ring, &cqe);
} else {
@ -245,6 +272,13 @@ static int event_uring_wait(EVENT *ev, int timeout)
return -1;
}
if (res == -ETIME || res == -ECANCELED || fe == NULL) {
continue;
} else if (res < 0) {
msg_error("%s(%d): some other error=%d, %s",
__FUNCTION__, __LINE__, -res, strerror(-res));
}
//usleep(100000);
if ((fe->mask & EVENT_READ) && fe->r_proc) {

View File

@ -200,6 +200,7 @@ static void fiber_io_loop(ACL_FIBER *self fiber_unused, void *ctx)
event_process(ev, left > 0 ? (int) left + 1 : (int) left);
if (__thread_fiber->io_stop) {
msg_info("%s(%d): io_stop set!", __FUNCTION__, __LINE__);
break;
}
@ -214,6 +215,8 @@ static void fiber_io_loop(ACL_FIBER *self fiber_unused, void *ctx)
continue;
} else if (ring_size(&ev->events) > 0) {
continue;
} else if (__thread_fiber->io_count > 0) {
continue;
}
// only sleep fiber alive ?
@ -247,7 +250,7 @@ static void fiber_io_loop(ACL_FIBER *self fiber_unused, void *ctx)
}
if (__thread_fiber->io_count > 0) {
msg_info("%s(%d), %s: waiting io: %d", __FILE__, __LINE__,
msg_warn("%s(%d), %s: waiting io: %d", __FILE__, __LINE__,
__FUNCTION__, (int) __thread_fiber->io_count);
}

View File

@ -283,7 +283,7 @@ static socket_t fiber_iocp_connect(FILE_EVENT *fe)
fe->mask &= ~EVENT_WRITE;
fe->mask |= EVENT_CONNECT;
if (fiber_wait_read(fe) < 0) {
if (fiber_wait_write(fe) < 0) {
fe->mask &= ~EVENT_CONNECT;
msg_error("%s(%d): fiber_wait_write rrror=%s, fd=%d",
__FUNCTION__, __LINE__, last_serror(), (int) fe->fd);
@ -293,9 +293,9 @@ static socket_t fiber_iocp_connect(FILE_EVENT *fe)
fe->mask &= ~EVENT_CONNECT;
if (fe->iocp_sock < 0) {
acl_fiber_set_error(-fe->iocp_sock);
return INVALID_SOCKET;
return -1;
}
return fe->fd;
return 0;
}
#endif
@ -330,12 +330,6 @@ int WINAPI acl_fiber_connect(socket_t sockfd, const struct sockaddr *addr,
fe->addr_len = addrlen;
#endif
#if defined(HAS_IO_URING)
if (EVENT_IS_IO_URING(fiber_io_event())) {
return fiber_iocp_connect(fe);
}
#endif
// The socket must be set to in no blocking status to avoid to be
// blocked by the sys_connect API. If sys_connect returns an error
// which is FIBER_EINPROGRESS or FIBER_EAGAIN and the original status
@ -347,6 +341,16 @@ int WINAPI acl_fiber_connect(socket_t sockfd, const struct sockaddr *addr,
non_blocking(sockfd, NON_BLOCKING);
}
#if defined(HAS_IO_URING)
// For IO_URING event, if the socket hasn't been set non-block,
// we should use io_uring to connect the server in block mode,
// else we should use connect system API in non-block, so the
// user can poll waiting for writable to check the connection is ok.
if (EVENT_IS_IO_URING(fiber_io_event()) && !nblock) {
return fiber_iocp_connect(fe);
}
#endif
#ifdef HAS_IOCP
if (EVENT_IS_IOCP(fiber_io_event())) {
EVENT *ev = fiber_io_event();

View File

@ -145,7 +145,7 @@ OUT_PATH = .
OBJ_PATH = $(OUT_PATH)
#Project's objs
SRC = $(wildcard *.c)
SRC = $(wildcard *.c) $(wildcard ../*.c)
OBJ = $(patsubst %.c, $(OBJ_PATH)/%.o, $(notdir $(SRC)))
###########################################################
@ -162,6 +162,8 @@ all: RM $(OBJ)
@echo ""
$(OBJ_PATH)/%.o: %.c
$(COMPILE) $< -o $@
$(OBJ_PATH)/%.o: ../%.c
$(COMPILE) $< -o $@
RM:
rm -f $(PROG)
clean cl:

View File

@ -10,12 +10,17 @@
#include "lib_acl.h"
#include "fiber/libfiber.h"
#include "stamp.h"
#include "../patch.h"
#if defined(_WIN32) || defined(_WIN64)
# define snprintf _snprintf
# define CONNECT acl_fiber_connect
# define CLOSE acl_fiber_close
#else
# define SOCKET int
# define INVALID_SOCKET -1
# define CONNECT connect
# define CLOSE close
#endif
static char __server_ip[64];
@ -25,8 +30,11 @@ static long long int __total_count = 0;
static int __total_clients = 0;
static int __total_error_clients = 0;
static int __show_max = 10;
static int __show_count = 0;
static int __fiber_delay = 0;
static int __conn_timeout = 0;
static int __conn_timeout = -1;
static int __io_timeout = -1;
static int __max_loop = 10000;
static int __max_fibers = 100;
static int __left_fibers = 100;
@ -34,6 +42,39 @@ static int __read_data = 1;
static int __stack_size = 32000;
static struct timeval __begin;
static int check_write(SOCKET fd, int timeout)
{
struct pollfd pfd;
int n;
memset(&pfd, 0, sizeof(struct pollfd));
pfd.fd = fd;
pfd.events = POLLOUT;
n = poll(&pfd, 1, timeout);
if (n < 0) {
printf("poll error: %s\r\n", acl_last_serror());
return -1;
}
if (n == 0) {
return 0;
}
if (pfd.revents & POLLERR) {
printf(">>>POLLERR, fd=%d\r\n", fd);
return -1;
} else if (pfd.revents & POLLHUP) {
printf(">>>POLLHUP, fd=%d\r\n", fd);
return -1;
} else if (pfd.revents & POLLOUT) {
return 1;
} else {
printf(">>>poll return n=%d write no ready,fd=%d, pfd=%p\n", n, fd, &pfd);
return 0;
}
}
static void echo_client(SOCKET fd)
{
#define BUF_SIZE 8192
@ -72,7 +113,8 @@ static void echo_client(SOCKET fd)
printf("read error: %s\r\n", acl_last_serror());
break;
}
if (i < 10) {
if (++__show_count < __show_max) {
buf[ret] = 0;
printf("%s", buf);
fflush(stdout);
@ -89,7 +131,7 @@ static void echo_client(SOCKET fd)
#endif
}
static void fiber_connect(ACL_FIBER *fiber acl_unused, void *ctx acl_unused)
static SOCKET start_connect(void)
{
SOCKET fd = socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in sa;
@ -106,19 +148,42 @@ static void fiber_connect(ACL_FIBER *fiber acl_unused, void *ctx acl_unused)
acl_fiber_delay(__fiber_delay);
}
#if defined(_WIN32) || defined(_WIN64)
if (acl_fiber_connect(fd, (const struct sockaddr *) &sa, len) < 0) {
acl_fiber_close(fd);
#else
if (connect(fd, (const struct sockaddr *) &sa, len) < 0) {
close(fd);
#endif
if (__conn_timeout > 0) {
set_non_blocking(fd, 1);
}
int ret = CONNECT(fd, (const struct sockaddr *) &sa, len);
if (ret == 0) {
return fd;
}
printf("%s: ret=%d, errno=%d, %s\n", __FUNCTION__, ret, errno, strerror(errno));
if (acl_fiber_last_error() != FIBER_EINPROGRESS) {
CLOSE(fd);
return INVALID_SOCKET;
}
printf("%s: WAITING FOR CONNECTING READY, fd=%d\r\n", __FUNCTION__, fd);
if (check_write(fd, __conn_timeout) <= 0) {
CLOSE(fd);
return INVALID_SOCKET;
} else {
return fd;
}
}
static void fiber_connect(ACL_FIBER *fiber acl_unused, void *ctx acl_unused)
{
SOCKET fd = start_connect();
if (fd == INVALID_SOCKET) {
__total_error_clients++;
printf("fiber-%d: connect %s:%d error %s\r\n",
acl_fiber_self(), __server_ip, __server_port,
acl_last_serror());
__total_error_clients++;
exit (1);
} else {
__total_clients++;
printf("fiber-%d: connect %s:%d ok, clients: %d, fd: %d\r\n",
@ -152,6 +217,8 @@ static void fiber_main(ACL_FIBER *fiber acl_unused, void *ctx acl_unused)
{
int i;
sleep(1); // just waiting for the IO event fiber to run first
for (i = 0; i < __max_fibers; i++) {
acl_fiber_create(fiber_connect, NULL, __stack_size);
}
@ -164,11 +231,14 @@ static void usage(const char *procname)
" -s server_ip\r\n"
" -p server_port\r\n"
" -t connt_timeout\r\n"
" -r io_timeout\r\n"
" -c max_fibers\r\n"
" -S [if using single IO, dafault: no]\r\n"
" -d fiber_delay_ms\r\n"
" -z stack_size\r\n"
" -n max_loop\r\n", procname);
" -n max_loop\r\n"
" -m show_max\r\n"
, procname);
}
static void test_time(void)
@ -197,7 +267,7 @@ int main(int argc, char *argv[])
snprintf(__server_ip, sizeof(__server_ip), "%s", "127.0.0.1");
while ((ch = getopt(argc, argv, "hc:n:s:p:t:Sd:z:e:")) > 0) {
while ((ch = getopt(argc, argv, "hc:n:s:p:t:r:Sd:z:e:m:")) > 0) {
switch (ch) {
case 'h':
usage(argv[0]);
@ -209,6 +279,9 @@ int main(int argc, char *argv[])
case 't':
__conn_timeout = atoi(optarg);
break;
case 'r':
__io_timeout = atoi(optarg);
break;
case 'n':
__max_loop = atoi(optarg);
break;
@ -227,6 +300,9 @@ int main(int argc, char *argv[])
case 'z':
__stack_size = atoi(optarg);
break;
case 'm':
__show_max = atoi(optarg);
break;
case 'e':
if (strcasecmp(optarg, "select") == 0) {
event_mode = FIBER_EVENT_SELECT;

View File

@ -7,6 +7,7 @@
#if defined(_WIN32) || defined(_WIN64)
#include <winsock2.h>
#else
#include <fcntl.h>
#include <unistd.h>
#include <signal.h>
#include <netinet/in.h>
@ -121,3 +122,81 @@ SOCKET socket_connect(const char *ip, int port)
}
return fd;
}
#if defined(_WIN32) || defined(_WIN64)
int set_non_blocking(SOCKET fd, int on)
{
unsigned long n = on;
int flags = 0;
if (ioctlsocket(fd, FIONBIO, &n) < 0) {
msg_error("ioctlsocket(fd,FIONBIO) failed");
return -1;
}
return flags;
}
int socket_is_non_blocking(SOCKET fd)
{
printf("%s: Not support, fd=%d\r\n", __FUNCTION__, fd);
return -1;
}
#else
# ifndef O_NONBLOCK
# define PATTERN FNDELAY
# else
# define PATTERN O_NONBLOCK
# endif
int set_non_blocking(SOCKET fd, int on)
{
int flags;
int nonb = PATTERN;
/*
** NOTE: consult ALL your relevant manual pages *BEFORE* changing
** these ioctl's. There are quite a few variations on them,
** as can be seen by the PCS one. They are *NOT* all the same.
** Heed this well. - Avalon.
*/
#ifdef NBLOCK_POSIX
nonb |= O_NONBLOCK;
#endif
#ifdef NBLOCK_BSD
nonb |= O_NDELAY;
#endif
if ((flags = fcntl(fd, F_GETFL)) == -1) {
printf("%s(%d), %s: fcntl(%d, F_GETFL) error: %s\r\n",
__FILE__, __LINE__, __FUNCTION__,
fd, acl_fiber_last_serror());
return -1;
}
if (fcntl(fd, F_SETFL, on ? flags | nonb : flags & ~nonb) < 0) {
printf("%s(%d), %s: fcntl(%d, F_SETL, nonb) error: %s\r\n",
__FILE__, __LINE__, __FUNCTION__,
fd, acl_fiber_last_serror());
return -1;
}
return (flags & PATTERN) ? 1 : 0;
}
int socket_is_non_blocking(SOCKET fd)
{
int flags;
if ((flags = fcntl(fd, F_GETFL)) == -1) {
printf("%s(%d), %s: fcntl(%d, F_GETFL) error: %s\r\n",
__FILE__, __LINE__, __FUNCTION__,
fd, acl_fiber_last_serror());
return 0;
}
return (flags & PATTERN) ? 1 : 0;
}
#endif // !_WIN32 && !_WIN64

View File

@ -23,6 +23,8 @@ void socket_close(SOCKET fd);
SOCKET socket_listen(const char *ip, int port);
SOCKET socket_accept(SOCKET fd);
SOCKET socket_connect(const char *ip, int port);
int set_non_blocking(SOCKET fd, int on);
int socket_is_non_blocking(SOCKET fd);
#ifdef __cplusplus
}

View File

@ -32,7 +32,8 @@ static int __socket_count = 0;
static char __listen_ip[64];
static int __listen_port = 9001;
static int __listen_qlen = 64;
static int __rw_timeout = 0;
static int __read_timeout = -1;
static int __write_timeout = -1;
static int __echo_data = 1;
static int check_read(int fd, int timeout)
@ -61,6 +62,32 @@ static int check_read(int fd, int timeout)
}
}
static int check_write(int fd, int timeout)
{
struct pollfd pfd;
int n;
memset(&pfd, 0, sizeof(struct pollfd));
pfd.fd = fd;
pfd.events = POLLOUT;
n = POLL(&pfd, 1, timeout);
if (n < 0) {
printf("poll error: %s\r\n", acl_last_serror());
return -1;
}
if (n == 0) {
return 0;
}
if (pfd.revents & POLLOUT) {
return 1;
} else {
printf(">>>poll return n=%d write no ready,fd=%d, pfd=%p\n", n, fd, &pfd);
return 0;
}
}
static void echo_client(ACL_FIBER *fiber acl_unused, void *ctx)
{
SOCKET *pfd = (SOCKET *) ctx;
@ -72,10 +99,11 @@ static void echo_client(ACL_FIBER *fiber acl_unused, void *ctx)
//printf("client fiber-%d: fd: %d\r\n", acl_fiber_self(), fd);
while (1) {
if (__rw_timeout > 0) {
ret = check_read(fd, __rw_timeout * 1000);
if (ret < 0)
if (__read_timeout > 0) {
ret = check_read(fd, __read_timeout * 1000);
if (ret < 0) {
break;
}
if (ret == 0) {
printf("read timeout fd=%u\r\n", fd);
break;
@ -104,24 +132,36 @@ static void echo_client(ACL_FIBER *fiber acl_unused, void *ctx)
//buf[ret] = 0; printf("buf=%s\r\n", buf);
__count++;
if (!__echo_data)
if (!__echo_data) {
continue;
}
if (__write_timeout > 0) {
int n = check_write(fd, __write_timeout * 1000);
if (n < 0) {
break;
}
if (n == 0) {
printf("write wait timeout, fd=%d\r\n", fd);
break;
}
}
#if defined(_WIN32) || defined(_WIN64)
if (acl_fiber_send(fd, buf, ret, 0) < 0) {
#else
if (write(fd, buf, ret) < 0) {
#endif
if (errno == EINTR)
if (errno == EINTR) {
continue;
}
printf("write error, fd: %d\r\n", fd);
break;
}
}
__socket_count--;
printf("%s: close %d, socket_count=%d\r\n",
__FUNCTION__, fd, __socket_count);
printf("%s: close %d, socket_count=%d\r\n", __FUNCTION__, fd, __socket_count);
CLOSE(fd);
free(pfd);
@ -217,7 +257,8 @@ static void usage(const char *procname)
" -e event_mode [kernel|select|poll|io_uring]\r\n"
" -s listen_ip\r\n"
" -p listen_port\r\n"
" -r rw_timeout\r\n"
" -r read_timeout\r\n"
" -w write_timeout\r\n"
" -q listen_queue\r\n"
" -z stack_size\r\n"
" -Z [if use shared stack]\r\n"
@ -233,7 +274,7 @@ int main(int argc, char *argv[])
acl_fiber_attr_init(&fiber_attr);
snprintf(__listen_ip, sizeof(__listen_ip), "%s", "127.0.0.1");
while ((ch = getopt(argc, argv, "hs:p:r:q:Sz:Ze:")) > 0) {
while ((ch = getopt(argc, argv, "hs:p:r:w:q:Sz:Ze:")) > 0) {
switch (ch) {
case 'h':
usage(argv[0]);
@ -245,7 +286,10 @@ int main(int argc, char *argv[])
__listen_port = atoi(optarg);
break;
case 'r':
__rw_timeout = atoi(optarg);
__read_timeout = atoi(optarg);
break;
case 'w':
__write_timeout = atoi(optarg);
break;
case 'q':
__listen_qlen = atoi(optarg);