From 95124bafc3c9a1a707dcb6418112f1068003936c Mon Sep 17 00:00:00 2001 From: zhengshuxin Date: Wed, 12 Jul 2023 18:42:23 +0800 Subject: [PATCH] Optimize the closing process in fiber module. --- lib_fiber/c/src/fiber.c | 39 ++------ lib_fiber/c/src/fiber.h | 2 +- lib_fiber/c/src/fiber_io.c | 99 ++++++++++++------- lib_fiber/c/src/file_event.c | 7 +- lib_fiber/c/src/hook/fiber_read.c | 19 ---- lib_fiber/c/src/hook/fiber_write.c | 10 -- lib_fiber/c/src/hook/file.c | 5 - lib_fiber/c/src/hook/io.c | 23 ++--- .../samples/close_another/login/main.cpp | 4 +- .../samples/close_another/server/main.cpp | 10 +- 10 files changed, 98 insertions(+), 120 deletions(-) diff --git a/lib_fiber/c/src/fiber.c b/lib_fiber/c/src/fiber.c index 32ff6eb66..55b184318 100644 --- a/lib_fiber/c/src/fiber.c +++ b/lib_fiber/c/src/fiber.c @@ -468,36 +468,12 @@ void acl_fiber_signal(ACL_FIBER *fiber, int signum) fiber->signum = signum; - if (fiber == curr) { // Just return if kill myself - return; + // Just only wakeup the suspended fiber. + if (fiber->status == FIBER_STATUS_SUSPEND) { + ring_detach(&fiber->me); // This is safety! + + acl_fiber_ready(fiber); } - - ring_detach(&curr->me); - ring_detach(&fiber->me); - - /* Add the current fiber and signaled fiber in the head of the ready */ -#if 0 - fiber_ready(fiber); - fiber_yield(); -#elif 1 - /* First add current fiber, then the signaled fiber, and the signaled - * fiber will run first, then the current fiber. - */ - - curr->status = FIBER_STATUS_READY; - ring_append(&__thread_fiber->ready, &curr->me); - - fiber->status = FIBER_STATUS_READY; - ring_append(&__thread_fiber->ready, &fiber->me); -#else - fiber->status = FIBER_STATUS_READY; - ring_prepend(&__thread_fiber->ready, &fiber->me); - - curr->status = FIBER_STATUS_READY; - ring_prepend(&__thread_fiber->ready, &curr->me); -#endif - acl_fiber_switch(); - fiber->flag &= ~FIBER_F_SIGNALED; } int acl_fiber_signum(ACL_FIBER *fiber) @@ -538,6 +514,9 @@ int acl_fiber_yield(void) } __thread_fiber->switched_old = __thread_fiber->switched; + + // Reset the current fiber's status in order to be added to + // ready queue again. __thread_fiber->running->status = FIBER_STATUS_NONE; acl_fiber_ready(__thread_fiber->running); acl_fiber_switch(); @@ -674,7 +653,7 @@ void acl_fiber_schedule_init(int on) void acl_fiber_attr_init(ACL_FIBER_ATTR *attr) { attr->oflag = 0; - attr->stack_size = 128000; + attr->stack_size = 128000; } void acl_fiber_attr_setstacksize(ACL_FIBER_ATTR *attr, size_t size) diff --git a/lib_fiber/c/src/fiber.h b/lib_fiber/c/src/fiber.h index 18ec6bfa6..4f8784603 100644 --- a/lib_fiber/c/src/fiber.h +++ b/lib_fiber/c/src/fiber.h @@ -136,7 +136,7 @@ FILE_EVENT *fiber_file_open(socket_t fd); void fiber_file_set(FILE_EVENT *fe); FILE_EVENT *fiber_file_get(socket_t fd); void fiber_file_free(FILE_EVENT *fe); -void fiber_file_close(FILE_EVENT *fe); +int fiber_file_close(FILE_EVENT *fe); FILE_EVENT *fiber_file_cache_get(socket_t fd); void fiber_file_cache_put(FILE_EVENT *fe); diff --git a/lib_fiber/c/src/fiber_io.c b/lib_fiber/c/src/fiber_io.c index b26c87afd..a799c1d15 100644 --- a/lib_fiber/c/src/fiber_io.c +++ b/lib_fiber/c/src/fiber_io.c @@ -446,18 +446,18 @@ static void read_callback(EVENT *ev, FILE_EVENT *fe) */ int fiber_wait_read(FILE_EVENT *fe) { + ACL_FIBER *curr; int ret; fiber_io_check(); - fe->fiber_r = acl_fiber_running(); - // When return 0 just let it go continue ret = event_add_read(__thread_fiber->event, fe, read_callback); if (ret <= 0) { return ret; } + fe->fiber_r = curr = acl_fiber_running(); fe->fiber_r->wstatus |= FIBER_WAIT_READ; SET_READWAIT(fe); @@ -474,6 +474,11 @@ int fiber_wait_read(FILE_EVENT *fe) WAITER_DEC(__thread_fiber->event); } + if (acl_fiber_canceled(curr)) { + acl_fiber_set_error(curr->errnum); + return -1; + } + return ret; } @@ -493,17 +498,17 @@ static void write_callback(EVENT *ev, FILE_EVENT *fe) int fiber_wait_write(FILE_EVENT *fe) { + ACL_FIBER *curr; int ret; fiber_io_check(); - fe->fiber_w = acl_fiber_running(); - ret = event_add_write(__thread_fiber->event, fe, write_callback); if (ret <= 0) { return ret; } + fe->fiber_w = curr = acl_fiber_running(); fe->fiber_w->wstatus |= FIBER_WAIT_WRITE; SET_WRITEWAIT(fe); @@ -520,6 +525,11 @@ int fiber_wait_write(FILE_EVENT *fe) WAITER_DEC(__thread_fiber->event); } + if (acl_fiber_canceled(curr)) { + acl_fiber_set_error(curr->errnum); + return -1; + } + return ret; } @@ -670,9 +680,10 @@ void fiber_file_free(FILE_EVENT *fe) } } -void fiber_file_close(FILE_EVENT *fe) +int fiber_file_close(FILE_EVENT *fe) { ACL_FIBER *curr; + int n = 0; assert(fe); fiber_io_check(); @@ -687,42 +698,64 @@ void fiber_file_close(FILE_EVENT *fe) curr = acl_fiber_running(); - if (IS_READWAIT(fe) && fe->fiber_r && fe->fiber_r != curr - && fe->fiber_r->status != FIBER_STATUS_EXITING) { + if (fe->fiber_r && fe->fiber_r != curr) { + n++; - // The current fiber is closing the other fiber's fd, and the - // other fiber hoding the fd is blocked by waiting for the - // fd to be ready, so we just notify the blocked fiber to - // wakeup from read waiting status. + if (IS_READWAIT(fe) + && fe->fiber_r->status == FIBER_STATUS_SUSPEND) { + + // The current fiber is closing the other fiber's fd, + // and the other fiber hoding the fd is blocked by + // waiting for the fd to be ready, so we just notify + // the blocked fiber to wakeup from read waiting status. + + SET_CLOSING(fe); + CLR_READWAIT(fe); - SET_CLOSING(fe); - CLR_READWAIT(fe); #ifdef HAS_IO_URING - if (EVENT_IS_IO_URING(__thread_fiber->event)) { - file_cancel(__thread_fiber->event, fe, CANCEL_IO_READ); - } else { + if (EVENT_IS_IO_URING(__thread_fiber->event)) { + file_cancel(__thread_fiber->event, fe, + CANCEL_IO_READ); + } else { + acl_fiber_kill(fe->fiber_r); + } +#else acl_fiber_kill(fe->fiber_r); - } -#else - acl_fiber_kill(fe->fiber_r); #endif - } - - if (IS_WRITEWAIT(fe) && fe->fiber_w && fe->fiber_w != curr - && fe->fiber_w->status != FIBER_STATUS_EXITING) { - - CLR_WRITEWAIT(fe); - SET_CLOSING(fe); -#ifdef HAS_IO_URING - if (EVENT_IS_IO_URING(__thread_fiber->event)) { - file_cancel(__thread_fiber->event, fe, CANCEL_IO_WRITE); } else { - acl_fiber_kill(fe->fiber_w); + // If the reader fiber isn't blocked by the read wait, + // maybe it has been in the ready queue, just set flag. + fe->fiber_r->flag |= FIBER_F_KILLED; + fe->fiber_r->errnum = ECANCELED; } -#else - acl_fiber_kill(fe->fiber_w); -#endif } + + if (fe->fiber_w && fe->fiber_w != curr) { + n++; + + if (IS_WRITEWAIT(fe) + && fe->fiber_w->status == FIBER_STATUS_SUSPEND) { + + CLR_WRITEWAIT(fe); + SET_CLOSING(fe); + +#ifdef HAS_IO_URING + if (EVENT_IS_IO_URING(__thread_fiber->event)) { + file_cancel(__thread_fiber->event, fe, + CANCEL_IO_WRITE); + } else { + acl_fiber_kill(fe->fiber_w); + } +#else + acl_fiber_kill(fe->fiber_w); +#endif + } else { + fe->fiber_w->flag |= FIBER_F_KILLED; + fe->fiber_w->errnum = ECANCELED; + } + } + + return n; } /****************************************************************************/ diff --git a/lib_fiber/c/src/file_event.c b/lib_fiber/c/src/file_event.c index d32e8366a..f9359469e 100644 --- a/lib_fiber/c/src/file_event.c +++ b/lib_fiber/c/src/file_event.c @@ -7,8 +7,11 @@ void file_event_init(FILE_EVENT *fe, socket_t fd) { ring_init(&fe->me); - fe->fiber_r = acl_fiber_running(); - fe->fiber_w = acl_fiber_running(); + + // XXX: don't set fiber_r/fiber_w here! + // fe->fiber_r = acl_fiber_running(); + // fe->fiber_w = acl_fiber_running(); + fe->fd = fd; fe->id = -1; fe->status = STATUS_NONE; diff --git a/lib_fiber/c/src/hook/fiber_read.c b/lib_fiber/c/src/hook/fiber_read.c index 69f72d0a1..c0690c31d 100644 --- a/lib_fiber/c/src/hook/fiber_read.c +++ b/lib_fiber/c/src/hook/fiber_read.c @@ -26,11 +26,6 @@ static int uring_wait_read(FILE_EVENT *fe) return -1; } - if (acl_fiber_canceled(fe->fiber_r)) { - acl_fiber_set_error(fe->fiber_r->errnum); - return -1; - } - if (fe->reader_ctx.res >= 0) { return fe->reader_ctx.res; } @@ -88,11 +83,6 @@ static int iocp_wait_read(FILE_EVENT *fe) return -1; } - if (acl_fiber_canceled(fe->fiber_r)) { - acl_fiber_set_error(fe->fiber_r->errnum); - return -1; - } - if (fe->res >= 0) { return fe->res; } @@ -143,7 +133,6 @@ int fiber_iocp_read(FILE_EVENT *fe, char *buf, int len) // -1: The fd isn't a valid descriptor, just return error, and // the fe should be freed. -// After calling acl_fiber_canceled(): // If the suspending fiber wakeup for the reason that it was // killed by the other fiber which called acl_fiber_kill and // want to close the fd owned by the current fiber, we just @@ -167,10 +156,6 @@ int fiber_iocp_read(FILE_EVENT *fe, char *buf, int len) } else if (fiber_wait_read((_fe)) < 0) { \ return -1; \ } \ - if (acl_fiber_canceled((_fe)->fiber_r)) { \ - acl_fiber_set_error((_fe)->fiber_r->errnum); \ - return -1; \ - } \ if ((_fn) == NULL) { \ hook_once(); \ } \ @@ -196,10 +181,6 @@ int fiber_iocp_read(FILE_EVENT *fe, char *buf, int len) } else if (fiber_wait_read((_fe)) < 0) { \ return -1; \ } \ - if (acl_fiber_canceled((_fe)->fiber_r)) { \ - acl_fiber_set_error((_fe)->fiber_r->errnum); \ - return -1; \ - } \ if ((_fn) == NULL) { \ hook_once(); \ } \ diff --git a/lib_fiber/c/src/hook/fiber_write.c b/lib_fiber/c/src/hook/fiber_write.c index 946a61363..56cc27b98 100644 --- a/lib_fiber/c/src/hook/fiber_write.c +++ b/lib_fiber/c/src/hook/fiber_write.c @@ -46,11 +46,6 @@ static int wait_write(FILE_EVENT *fe) return -1; } - if (acl_fiber_canceled(fe->fiber_w)) { - acl_fiber_set_error(fe->fiber_w->errnum); - return -1; - } - return 0; } @@ -431,11 +426,6 @@ ssize_t fiber_sendfile64(socket_t out_fd, int in_fd, off64_t *offset, size_t cou __FUNCTION__, __LINE__, out_fd); return -1; } - - if (acl_fiber_canceled(fe->fiber_w)) { - acl_fiber_set_error(fe->fiber_w->errnum); - return -1; - } } } diff --git a/lib_fiber/c/src/hook/file.c b/lib_fiber/c/src/hook/file.c index 2a4f83799..5de2d85df 100644 --- a/lib_fiber/c/src/hook/file.c +++ b/lib_fiber/c/src/hook/file.c @@ -631,11 +631,6 @@ ssize_t file_sendfile(socket_t out_fd, int in_fd, off64_t *off, size_t cnt) __LINE__, out_fd); return -1; } - - if (acl_fiber_canceled(fe->fiber_w)) { - acl_fiber_set_error(fe->fiber_w->errnum); - return -1; - } } } #endif diff --git a/lib_fiber/c/src/hook/io.c b/lib_fiber/c/src/hook/io.c index eabb3ccc5..5aec26ecf 100644 --- a/lib_fiber/c/src/hook/io.c +++ b/lib_fiber/c/src/hook/io.c @@ -85,23 +85,14 @@ int WINAPI acl_fiber_close(socket_t fd) return ret; } - // If the fd is in the status waiting for IO ready, the current fiber - // is trying to close the other fiber's fd, so, we should wakeup the - // suspending fiber and wait for its returning back, the process is: - // ->killer kill the fiber which is suspending and holding the fd; - // ->suspending fiber wakeup and return; - // ->killer closing the fd and free the fe. + ret = fiber_file_close(fe); - // If the fd isn't in waiting status, we don't know which fiber is - // holding the fd, but we can close it and free the fe with it. - // If the current fiber is holding the fd, we just close it ok, else - // if the other fiber is holding it, the IO API such as acl_fiber_read - // should hanlding the fd carefully, the process is: - // ->killer closing the fd and free fe owned by itself or other fiber; - // ->if fd was owned by the other fiber, calling API like acl_fiber_read - // will return -1 after fiber_wait_read returns. - - fiber_file_close(fe); + // If the return value more than 0, the fe has just been bound with + // the other fiber, we just return here and the fe will really be + // closed and freed by the last one binding the fe. + if (ret > 0) { + return 0; + } ev = fiber_io_event(); if (ev && ev->close_sock) { diff --git a/lib_fiber/samples/close_another/login/main.cpp b/lib_fiber/samples/close_another/login/main.cpp index 05d3e827d..f7f8b147b 100644 --- a/lib_fiber/samples/close_another/login/main.cpp +++ b/lib_fiber/samples/close_another/login/main.cpp @@ -56,8 +56,8 @@ private: static void usage(const char* procname) { printf("usage: %s -h [help] -s server_addr\r\n" - " -c max_fibers\r\n" - " -n connect_count\r\n" + " -c max_fibers [default: 10]\r\n" + " -n connect_count [default: 10]\r\n" , procname); } diff --git a/lib_fiber/samples/close_another/server/main.cpp b/lib_fiber/samples/close_another/server/main.cpp index d9a652cd5..23d93d95a 100644 --- a/lib_fiber/samples/close_another/server/main.cpp +++ b/lib_fiber/samples/close_another/server/main.cpp @@ -32,6 +32,7 @@ protected: int fd = conn_->sock_handle(); char buf[8192]; while (true) { + printf(">>>fiber begin to read from fd=%d\r\n", conn_->sock_handle()); int ret = conn_->read(buf, sizeof(buf), false); if (ret == -1) { printf("read error=%s, fd=%d, %d\r\n", @@ -102,8 +103,8 @@ private: } else { // Must unbind the socket with the conn object // to avoid closing the socket twice. - conn.unbind_sock(); - printf("Kick the old fd=%d, my fd=%d\r\n", + //conn.unbind_sock(); + printf("Close the old fd=%d, my fd=%d\r\n", fd, conn_->sock_handle()); close(fd); } @@ -111,15 +112,18 @@ private: printf("Kick old %s, old fd=%d, my fd=%d\r\n", name_.c_str(), fd, conn_->sock_handle()); +#if 0 if (conn_->format("Welcome %s!\r\n", name_.c_str()) <= 0) { printf("write to %s error %s\r\n", name_.c_str(), acl::last_serror()); return false; } +#endif it = __users.find(name_); if (it == __users.end()) { __users[name_] = this; + printf("Add new one ok!\r\n"); return true; } else { printf("The other one has already logined!\r\n"); @@ -128,7 +132,9 @@ private: } ~fiber_client(void) { + printf(">>>Begin delete conn=%p, fd=%d\r\n", conn_, conn_->sock_handle()); delete conn_; + printf(">>>Delete ok, conn=%p\r\n\r\n", conn_); } };