Use poll wait for handling IO readwrite timeout.

This commit is contained in:
zhengshuxin 2024-07-28 09:46:59 +08:00
parent 43cc154927
commit 22d4e2265c
8 changed files with 102 additions and 43 deletions

View File

@ -27,8 +27,8 @@ CFLAGS = -c -g -W \
-DUSE_FAST_RING \
-O3 \
-DUSE_CLOCK_GETTIME \
-DDEBUG_READY \
#-DUSE_VALGRIND \
#-DDEBUG_READY \
#-DUSE_INLINE_MEMCPY\
#-DUSE_FAST_TIME \
#-Waggregate-return \

View File

@ -86,12 +86,12 @@ EVENT *event_create(int size)
SET_TIME(ev->stamp); // init the event's stamp when create each event
#ifdef HAS_POLL
ev->poll_list = timer_cache_create();
ev->poll_timer = timer_cache_create();
ring_init(&ev->poll_ready);
#endif
#ifdef HAS_EPOLL
ev->epoll_list = timer_cache_create();
ev->epoll_timer = timer_cache_create();
ring_init(&ev->epoll_ready);
#endif
return ev;
@ -109,9 +109,9 @@ acl_handle_t event_handle(EVENT *ev)
void event_free(EVENT *ev)
{
timer_cache_free(ev->poll_list);
timer_cache_free(ev->poll_timer);
#ifdef HAS_EPOLL
timer_cache_free(ev->epoll_list);
timer_cache_free(ev->epoll_timer);
#endif
ev->free(ev);
@ -451,15 +451,15 @@ static void event_process_poll(EVENT *ev)
RING *head;
POLL_EVENT *pe;
long long now = event_get_stamp(ev);
TIMER_CACHE_NODE *node = TIMER_FIRST(ev->poll_list), *next;
TIMER_CACHE_NODE *node = TIMER_FIRST(ev->poll_timer), *next;
/* Check and call all the pe's callback which was timeout except the
* pe which has been ready and been removed from ev->poll_list. The
* pe which has been ready and been removed from ev->poll_timer. The
* removing operations are in read_callback or write_callback in the
* hook/poll.c.
*/
while (node && node->expire >= 0 && node->expire <= now) {
next = TIMER_NEXT(ev->poll_list, node);
next = TIMER_NEXT(ev->poll_timer, node);
// Call all the pe's callback with the same expire time.
ring_foreach(iter, &node->ring) {
@ -486,10 +486,10 @@ static void event_process_epoll(EVENT *ev)
RING *head;
EPOLL_EVENT *ee;
long long now = event_get_stamp(ev);
TIMER_CACHE_NODE *node = TIMER_FIRST(ev->epoll_list), *next;
TIMER_CACHE_NODE *node = TIMER_FIRST(ev->epoll_timer), *next;
while (node && node->expire >= 0 && node->expire <= now) {
next = TIMER_NEXT(ev->epoll_list, node);
next = TIMER_NEXT(ev->epoll_timer, node);
ring_foreach(iter, &node->ring) {
ee = TO_APPL(iter.ptr, EPOLL_EVENT, me);

View File

@ -395,12 +395,12 @@ struct EVENT {
#define EVENT_IS_IO_URING(x) ((x)->flag & EVENT_F_IO_URING)
#ifdef HAS_POLL
TIMER_CACHE *poll_list;
TIMER_CACHE *poll_timer;
RING poll_ready;
#endif
#ifdef HAS_EPOLL
TIMER_CACHE *epoll_list;
TIMER_CACHE *epoll_timer;
RING epoll_ready;
#endif

View File

@ -446,21 +446,25 @@ size_t acl_fiber_sleep(size_t seconds)
/****************************************************************************/
#ifndef USE_TIMER
static int timed_wait(socket_t fd, int delay, int oper)
{
struct pollfd fds;
fds.events = oper;
fds.fd = fd;
for (;;) {
switch (acl_fiber_poll(&fds, 1, delay)) {
#ifdef SYS_WIN
# ifdef SYS_WIN
case SOCKET_ERROR:
#else
# else
case -1:
#endif
# endif
if (acl_fiber_last_error() == FIBER_EINTR) {
continue;
}
break;
case 0:
return 0;
default:
@ -481,6 +485,7 @@ static int timed_wait(socket_t fd, int delay, int oper)
}
}
}
#endif // !USE_TIMER
static void read_callback(EVENT *ev, FILE_EVENT *fe)
{
@ -583,7 +588,9 @@ int fiber_wait_read(FILE_EVENT *fe)
event_del_read(__thread_fiber->event, fe, 1);
acl_fiber_set_error(curr->errnum);
return -1;
} else if (curr->flag & FIBER_F_TIMER) {
}
#ifdef USE_TIMER
else if (curr->flag & FIBER_F_TIMER) {
// If the IO reading timeout set in setsockopt.
// Clear FIBER_F_TIMER flag been set in wakeup_timers.
curr->flag &= ~FIBER_F_TIMER;
@ -595,6 +602,7 @@ int fiber_wait_read(FILE_EVENT *fe)
acl_fiber_set_error(FIBER_EAGAIN);
return -1;
}
#endif
// else: the IO read event should has been removed in read_callback.
return ret;
@ -673,7 +681,9 @@ int fiber_wait_write(FILE_EVENT *fe)
event_del_write(__thread_fiber->event, fe, 1);
acl_fiber_set_error(curr->errnum);
return -1;
} else if (curr->flag & FIBER_F_TIMER) {
}
#ifdef USE_TIMER
else if (curr->flag & FIBER_F_TIMER) {
curr->flag &= ~FIBER_F_TIMER;
event_del_write(__thread_fiber->event, fe, 1);
@ -681,6 +691,7 @@ int fiber_wait_write(FILE_EVENT *fe)
acl_fiber_set_error(FIBER_EAGAIN);
return -1;
}
#endif
return ret;
}

View File

@ -459,7 +459,7 @@ static void read_callback(EVENT *ev, FILE_EVENT *fe)
memcpy(&ee->events[ee->nready].data, &epx->data, sizeof(epx->data));
if (ee->nready == 0) {
timer_cache_remove(ev->epoll_list, ee->expire, &ee->me);
timer_cache_remove(ev->epoll_timer, ee->expire, &ee->me);
ring_prepend(&ev->epoll_ready, &ee->me);
}
@ -494,7 +494,7 @@ static void write_callback(EVENT *ev fiber_unused, FILE_EVENT *fe)
memcpy(&ee->events[ee->nready].data, &epx->data, sizeof(epx->data));
if (ee->nready == 0) {
timer_cache_remove(ev->epoll_list, ee->expire, &ee->me);
timer_cache_remove(ev->epoll_timer, ee->expire, &ee->me);
ring_prepend(&ev->epoll_ready, &ee->me);
}
@ -681,7 +681,7 @@ int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout)
event_epoll_set(ev, ee, timeout);
while (1) {
timer_cache_add(ev->epoll_list, ee->expire, &ee->me);
timer_cache_add(ev->epoll_timer, ee->expire, &ee->me);
ee->fiber->wstatus |= FIBER_WAIT_EPOLL;
@ -692,7 +692,7 @@ int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout)
ee->fiber->wstatus &= ~FIBER_WAIT_EPOLL;
if (ee->nready == 0) {
timer_cache_remove(ev->epoll_list, ee->expire, &ee->me);
timer_cache_remove(ev->epoll_timer, ee->expire, &ee->me);
}
ev->timeout = old_timeout;
@ -703,13 +703,12 @@ int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout)
ee->nready = -1;
}
msg_info("%s(%d), %s: fiber-%u was killed",
__FILE__, __LINE__, __FUNCTION__,
acl_fiber_id(ee->fiber));
//msg_info("%s(%d), %s: fiber-%u was killed",
// __FILE__, __LINE__, __FUNCTION__, acl_fiber_id(ee->fiber));
break;
}
if (timer_cache_size(ev->epoll_list) == 0) {
if (timer_cache_size(ev->epoll_timer) == 0) {
ev->timeout = -1;
}

View File

@ -53,16 +53,16 @@ static void handle_poll_read(EVENT *ev, FILE_EVENT *fe, POLLFD *pfd)
/*
* If any fe has been ready, the pe holding fe should be removed from
* ev->poll_list to avoid to be called in timeout process.
* ev->poll_timer to avoid to be called in timeout process.
* We should just remove pe only once by checking if the value of
* pe->nready is 0. After the pe has been removed from the
* ev->poll_list, the pe's callback will not be called again in the
* ev->poll_timer, the pe's callback will not be called again in the
* timeout process in event_process_poll() in event.c.
*/
if (pfd->pe->nready == 0) {
/* The cache timer has been be set when timeout >= 0. */
if (pfd->pe->expire >= 0) {
timer_cache_remove(ev->poll_list, pfd->pe->expire,
timer_cache_remove(ev->poll_timer, pfd->pe->expire,
&pfd->pe->me);
}
ring_prepend(&ev->poll_ready, &pfd->pe->me);
@ -119,7 +119,7 @@ static void handle_poll_write(EVENT *ev, FILE_EVENT *fe, POLLFD *pfd)
if (pfd->pe->nready == 0) {
if (pfd->pe->expire >= 0) {
timer_cache_remove(ev->poll_list, pfd->pe->expire,
timer_cache_remove(ev->poll_timer, pfd->pe->expire,
&pfd->pe->me);
}
ring_prepend(&ev->poll_ready, &pfd->pe->me);
@ -375,7 +375,7 @@ int WINAPI acl_fiber_poll(struct pollfd *fds, nfds_t nfds, int timeout)
while (1) {
/* The cache timer should be set when timeout >= 0. */
if (pe->expire >= 0) {
timer_cache_add(ev->poll_list, pe->expire, &pe->me);
timer_cache_add(ev->poll_timer, pe->expire, &pe->me);
}
pe->nready = 0;
@ -388,7 +388,7 @@ int WINAPI acl_fiber_poll(struct pollfd *fds, nfds_t nfds, int timeout)
pe->fiber->wstatus &= ~FIBER_WAIT_POLL;
if (pe->nready == 0 && pe->expire >= 0) {
timer_cache_remove(ev->poll_list, pe->expire, &pe->me);
timer_cache_remove(ev->poll_timer, pe->expire, &pe->me);
}
ev->timeout = old_timeout;
@ -401,7 +401,7 @@ int WINAPI acl_fiber_poll(struct pollfd *fds, nfds_t nfds, int timeout)
break;
}
if (timer_cache_size(ev->poll_list) == 0) {
if (timer_cache_size(ev->poll_timer) == 0) {
ev->timeout = -1;
}

View File

@ -15,6 +15,7 @@ static void echo_client(ACL_FIBER *fiber acl_unused, void *ctx)
ACL_VSTREAM *cstream = (ACL_VSTREAM *) ctx;
char buf[8192];
int ret, count = 0;
time_t stamp = time(NULL);
if (!__setsockopt_timeout) {
cstream->rw_timeout = __rw_timeout;
@ -25,13 +26,12 @@ static void echo_client(ACL_FIBER *fiber acl_unused, void *ctx)
#if defined(__APPLE__) || defined(_WIN32) || defined(_WIN64)
if (setsockopt(ACL_VSTREAM_SOCK(cstream), SOL_SOCKET,
SO_RCVTIMEO, &__rw_timeout, sizeof(__rw_timeout)) < 0) {
SO_RCVTIMEO, (char*) &__rw_timeout, sizeof(__rw_timeout)) < 0) {
#else
if (setsockopt(ACL_VSTREAM_SOCK(cstream), SOL_SOCKET,
SO_RCVTIMEO, &tm, sizeof(tm)) < 0) {
SO_RCVTIMEO, &tm, sizeof(tm)) < 0) {
#endif
printf("%s: setsockopt error: %s\r\n",
__FUNCTION__, acl_last_serror());
printf("%s: setsockopt error: %s\r\n", __FUNCTION__, acl_last_serror());
}
}
@ -40,8 +40,14 @@ static void echo_client(ACL_FIBER *fiber acl_unused, void *ctx)
while (1) {
ret = acl_vstream_gets(cstream, buf, sizeof(buf) - 1);
if (ret == ACL_VSTREAM_EOF) {
printf("gets error: %s, fd: %d, count: %d\r\n",
acl_last_serror(), SOCK(cstream), count);
printf("gets error: %s, fd: %d, count: %d, cost: %ld\r\n",
acl_last_serror(), SOCK(cstream), count, time(NULL) - stamp);
stamp = time(NULL);
if (errno == EAGAIN) {
continue;
}
break;
}
buf[ret] = 0;
@ -66,6 +72,7 @@ static void echo_client(ACL_FIBER *fiber acl_unused, void *ctx)
static void fiber_accept(ACL_FIBER *fiber acl_unused, void *ctx)
{
ACL_VSTREAM *sstream = (ACL_VSTREAM *) ctx;
const char *banner = "Welcom, enter any key to continue.\r\n";
for (;;) {
ACL_VSTREAM *cstream = acl_vstream_accept(sstream, NULL, 0);
@ -78,6 +85,15 @@ static void fiber_accept(ACL_FIBER *fiber acl_unused, void *ctx)
break;
}
printf("Accept connection from %s, enter any key to continue\r\n",
ACL_VSTREAM_PEER(cstream));
if (acl_vstream_writen(cstream, banner, strlen(banner)) == ACL_VSTREAM_EOF) {
printf("Write %s to client error %s\r\n", banner, acl_last_serror());
acl_vstream_close(cstream);
continue;
}
ret = acl_vstream_gets(cstream, buf, sizeof(buf) - 1);
if (ret == ACL_VSTREAM_EOF) {
printf("get first line error\r\n");
@ -89,9 +105,7 @@ static void fiber_accept(ACL_FIBER *fiber acl_unused, void *ctx)
continue;
}
//printf("accept one, fd: %d\r\n", ACL_VSTREAM_SOCK(cstream));
acl_fiber_create(echo_client, cstream, __stack_size);
//printf("continue to accept\r\n");
}
acl_vstream_close(sstream);

View File

@ -36,6 +36,7 @@ static int __listen_qlen = 64;
static int __read_timeout = -1;
static int __write_timeout = -1;
static int __echo_data = 1;
static int __use_sockopt = 0;
static int check_read(SOCKET fd, int timeout)
{
@ -99,8 +100,38 @@ static void echo_client(ACL_FIBER *fiber acl_unused, void *ctx)
__socket_count++;
//printf("client fiber-%d: fd: %d\r\n", acl_fiber_self(), fd);
if (__read_timeout > 0 && __use_sockopt) {
struct timeval tm;
tm.tv_sec = __read_timeout;
tm.tv_usec = 0;
#if defined(__APPLE__) || defined(_WIN32) || defined(_WIN64)
if (setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO,
(char*) &__read_timeout, sizeof(__read_timeout)) < 0) {
#else
if (setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tm, sizeof(tm)) < 0) {
#endif
printf("%s: setsockopt error: %s\r\n", __FUNCTION__, acl_last_serror());
}
}
if (__write_timeout > 0 && __use_sockopt) {
struct timeval tm;
tm.tv_sec = __write_timeout;
tm.tv_usec = 0;
#if defined(__APPLE__) || defined(_WIN32) || defined(_WIN64)
if (setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO,
(char*) &__write_timeout, sizeof(__write_timeout)) < 0) {
#else
if (setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tm, sizeof(tm)) < 0) {
#endif
printf("%s: setsockopt error: %s\r\n", __FUNCTION__, acl_last_serror());
}
}
while (1) {
if (__read_timeout > 0) {
if (__read_timeout > 0 && !__use_sockopt) {
ret = check_read(fd, __read_timeout * 1000);
if (ret < 0) {
break;
@ -137,7 +168,7 @@ static void echo_client(ACL_FIBER *fiber acl_unused, void *ctx)
continue;
}
if (__write_timeout > 0) {
if (__write_timeout > 0 && !__use_sockopt) {
int n = check_write(fd, __write_timeout * 1000);
if (n < 0) {
break;
@ -263,6 +294,7 @@ static void usage(const char *procname)
" -q listen_queue\r\n"
" -z stack_size\r\n"
" -Z [if use shared stack]\r\n"
" -T [if use setsockopt to set IO timeout]\r\n"
" -S [if using single IO, default: no]\r\n", procname);
}
@ -275,7 +307,7 @@ int main(int argc, char *argv[])
acl_fiber_attr_init(&fiber_attr);
snprintf(__listen_ip, sizeof(__listen_ip), "%s", "127.0.0.1");
while ((ch = getopt(argc, argv, "hs:p:r:w:q:Sz:Ze:")) > 0) {
while ((ch = getopt(argc, argv, "hs:p:r:w:q:Sz:Ze:T")) > 0) {
switch (ch) {
case 'h':
usage(argv[0]);
@ -305,6 +337,9 @@ int main(int argc, char *argv[])
case 'Z':
acl_fiber_attr_setsharestack(&fiber_attr, 1);
break;
case 'T':
__use_sockopt = 1;
break;
case 'e':
if (strcasecmp(optarg, "select") == 0) {
event_mode = FIBER_EVENT_SELECT;