Optimize the closing process in fiber module.

This commit is contained in:
zhengshuxin 2023-07-12 18:42:23 +08:00
parent e9b14372ea
commit 95124bafc3
10 changed files with 98 additions and 120 deletions

View File

@ -468,36 +468,12 @@ void acl_fiber_signal(ACL_FIBER *fiber, int signum)
fiber->signum = signum;
if (fiber == curr) { // Just return if kill myself
return;
// Just only wakeup the suspended fiber.
if (fiber->status == FIBER_STATUS_SUSPEND) {
ring_detach(&fiber->me); // This is safety!
acl_fiber_ready(fiber);
}
ring_detach(&curr->me);
ring_detach(&fiber->me);
/* Add the current fiber and signaled fiber in the head of the ready */
#if 0
fiber_ready(fiber);
fiber_yield();
#elif 1
/* First add current fiber, then the signaled fiber, and the signaled
* fiber will run first, then the current fiber.
*/
curr->status = FIBER_STATUS_READY;
ring_append(&__thread_fiber->ready, &curr->me);
fiber->status = FIBER_STATUS_READY;
ring_append(&__thread_fiber->ready, &fiber->me);
#else
fiber->status = FIBER_STATUS_READY;
ring_prepend(&__thread_fiber->ready, &fiber->me);
curr->status = FIBER_STATUS_READY;
ring_prepend(&__thread_fiber->ready, &curr->me);
#endif
acl_fiber_switch();
fiber->flag &= ~FIBER_F_SIGNALED;
}
int acl_fiber_signum(ACL_FIBER *fiber)
@ -538,6 +514,9 @@ int acl_fiber_yield(void)
}
__thread_fiber->switched_old = __thread_fiber->switched;
// Reset the current fiber's status in order to be added to
// ready queue again.
__thread_fiber->running->status = FIBER_STATUS_NONE;
acl_fiber_ready(__thread_fiber->running);
acl_fiber_switch();
@ -674,7 +653,7 @@ void acl_fiber_schedule_init(int on)
void acl_fiber_attr_init(ACL_FIBER_ATTR *attr)
{
attr->oflag = 0;
attr->stack_size = 128000;
attr->stack_size = 128000;
}
void acl_fiber_attr_setstacksize(ACL_FIBER_ATTR *attr, size_t size)

View File

@ -136,7 +136,7 @@ FILE_EVENT *fiber_file_open(socket_t fd);
void fiber_file_set(FILE_EVENT *fe);
FILE_EVENT *fiber_file_get(socket_t fd);
void fiber_file_free(FILE_EVENT *fe);
void fiber_file_close(FILE_EVENT *fe);
int fiber_file_close(FILE_EVENT *fe);
FILE_EVENT *fiber_file_cache_get(socket_t fd);
void fiber_file_cache_put(FILE_EVENT *fe);

View File

@ -446,18 +446,18 @@ static void read_callback(EVENT *ev, FILE_EVENT *fe)
*/
int fiber_wait_read(FILE_EVENT *fe)
{
ACL_FIBER *curr;
int ret;
fiber_io_check();
fe->fiber_r = acl_fiber_running();
// When return 0 just let it go continue
ret = event_add_read(__thread_fiber->event, fe, read_callback);
if (ret <= 0) {
return ret;
}
fe->fiber_r = curr = acl_fiber_running();
fe->fiber_r->wstatus |= FIBER_WAIT_READ;
SET_READWAIT(fe);
@ -474,6 +474,11 @@ int fiber_wait_read(FILE_EVENT *fe)
WAITER_DEC(__thread_fiber->event);
}
if (acl_fiber_canceled(curr)) {
acl_fiber_set_error(curr->errnum);
return -1;
}
return ret;
}
@ -493,17 +498,17 @@ static void write_callback(EVENT *ev, FILE_EVENT *fe)
int fiber_wait_write(FILE_EVENT *fe)
{
ACL_FIBER *curr;
int ret;
fiber_io_check();
fe->fiber_w = acl_fiber_running();
ret = event_add_write(__thread_fiber->event, fe, write_callback);
if (ret <= 0) {
return ret;
}
fe->fiber_w = curr = acl_fiber_running();
fe->fiber_w->wstatus |= FIBER_WAIT_WRITE;
SET_WRITEWAIT(fe);
@ -520,6 +525,11 @@ int fiber_wait_write(FILE_EVENT *fe)
WAITER_DEC(__thread_fiber->event);
}
if (acl_fiber_canceled(curr)) {
acl_fiber_set_error(curr->errnum);
return -1;
}
return ret;
}
@ -670,9 +680,10 @@ void fiber_file_free(FILE_EVENT *fe)
}
}
void fiber_file_close(FILE_EVENT *fe)
int fiber_file_close(FILE_EVENT *fe)
{
ACL_FIBER *curr;
int n = 0;
assert(fe);
fiber_io_check();
@ -687,42 +698,64 @@ void fiber_file_close(FILE_EVENT *fe)
curr = acl_fiber_running();
if (IS_READWAIT(fe) && fe->fiber_r && fe->fiber_r != curr
&& fe->fiber_r->status != FIBER_STATUS_EXITING) {
if (fe->fiber_r && fe->fiber_r != curr) {
n++;
// The current fiber is closing the other fiber's fd, and the
// other fiber hoding the fd is blocked by waiting for the
// fd to be ready, so we just notify the blocked fiber to
// wakeup from read waiting status.
if (IS_READWAIT(fe)
&& fe->fiber_r->status == FIBER_STATUS_SUSPEND) {
// The current fiber is closing the other fiber's fd,
// and the other fiber hoding the fd is blocked by
// waiting for the fd to be ready, so we just notify
// the blocked fiber to wakeup from read waiting status.
SET_CLOSING(fe);
CLR_READWAIT(fe);
SET_CLOSING(fe);
CLR_READWAIT(fe);
#ifdef HAS_IO_URING
if (EVENT_IS_IO_URING(__thread_fiber->event)) {
file_cancel(__thread_fiber->event, fe, CANCEL_IO_READ);
} else {
if (EVENT_IS_IO_URING(__thread_fiber->event)) {
file_cancel(__thread_fiber->event, fe,
CANCEL_IO_READ);
} else {
acl_fiber_kill(fe->fiber_r);
}
#else
acl_fiber_kill(fe->fiber_r);
}
#else
acl_fiber_kill(fe->fiber_r);
#endif
}
if (IS_WRITEWAIT(fe) && fe->fiber_w && fe->fiber_w != curr
&& fe->fiber_w->status != FIBER_STATUS_EXITING) {
CLR_WRITEWAIT(fe);
SET_CLOSING(fe);
#ifdef HAS_IO_URING
if (EVENT_IS_IO_URING(__thread_fiber->event)) {
file_cancel(__thread_fiber->event, fe, CANCEL_IO_WRITE);
} else {
acl_fiber_kill(fe->fiber_w);
// If the reader fiber isn't blocked by the read wait,
// maybe it has been in the ready queue, just set flag.
fe->fiber_r->flag |= FIBER_F_KILLED;
fe->fiber_r->errnum = ECANCELED;
}
#else
acl_fiber_kill(fe->fiber_w);
#endif
}
if (fe->fiber_w && fe->fiber_w != curr) {
n++;
if (IS_WRITEWAIT(fe)
&& fe->fiber_w->status == FIBER_STATUS_SUSPEND) {
CLR_WRITEWAIT(fe);
SET_CLOSING(fe);
#ifdef HAS_IO_URING
if (EVENT_IS_IO_URING(__thread_fiber->event)) {
file_cancel(__thread_fiber->event, fe,
CANCEL_IO_WRITE);
} else {
acl_fiber_kill(fe->fiber_w);
}
#else
acl_fiber_kill(fe->fiber_w);
#endif
} else {
fe->fiber_w->flag |= FIBER_F_KILLED;
fe->fiber_w->errnum = ECANCELED;
}
}
return n;
}
/****************************************************************************/

View File

@ -7,8 +7,11 @@
void file_event_init(FILE_EVENT *fe, socket_t fd)
{
ring_init(&fe->me);
fe->fiber_r = acl_fiber_running();
fe->fiber_w = acl_fiber_running();
// XXX: don't set fiber_r/fiber_w here!
// fe->fiber_r = acl_fiber_running();
// fe->fiber_w = acl_fiber_running();
fe->fd = fd;
fe->id = -1;
fe->status = STATUS_NONE;

View File

@ -26,11 +26,6 @@ static int uring_wait_read(FILE_EVENT *fe)
return -1;
}
if (acl_fiber_canceled(fe->fiber_r)) {
acl_fiber_set_error(fe->fiber_r->errnum);
return -1;
}
if (fe->reader_ctx.res >= 0) {
return fe->reader_ctx.res;
}
@ -88,11 +83,6 @@ static int iocp_wait_read(FILE_EVENT *fe)
return -1;
}
if (acl_fiber_canceled(fe->fiber_r)) {
acl_fiber_set_error(fe->fiber_r->errnum);
return -1;
}
if (fe->res >= 0) {
return fe->res;
}
@ -143,7 +133,6 @@ int fiber_iocp_read(FILE_EVENT *fe, char *buf, int len)
// -1: The fd isn't a valid descriptor, just return error, and
// the fe should be freed.
// After calling acl_fiber_canceled():
// If the suspending fiber wakeup for the reason that it was
// killed by the other fiber which called acl_fiber_kill and
// want to close the fd owned by the current fiber, we just
@ -167,10 +156,6 @@ int fiber_iocp_read(FILE_EVENT *fe, char *buf, int len)
} else if (fiber_wait_read((_fe)) < 0) { \
return -1; \
} \
if (acl_fiber_canceled((_fe)->fiber_r)) { \
acl_fiber_set_error((_fe)->fiber_r->errnum); \
return -1; \
} \
if ((_fn) == NULL) { \
hook_once(); \
} \
@ -196,10 +181,6 @@ int fiber_iocp_read(FILE_EVENT *fe, char *buf, int len)
} else if (fiber_wait_read((_fe)) < 0) { \
return -1; \
} \
if (acl_fiber_canceled((_fe)->fiber_r)) { \
acl_fiber_set_error((_fe)->fiber_r->errnum); \
return -1; \
} \
if ((_fn) == NULL) { \
hook_once(); \
} \

View File

@ -46,11 +46,6 @@ static int wait_write(FILE_EVENT *fe)
return -1;
}
if (acl_fiber_canceled(fe->fiber_w)) {
acl_fiber_set_error(fe->fiber_w->errnum);
return -1;
}
return 0;
}
@ -431,11 +426,6 @@ ssize_t fiber_sendfile64(socket_t out_fd, int in_fd, off64_t *offset, size_t cou
__FUNCTION__, __LINE__, out_fd);
return -1;
}
if (acl_fiber_canceled(fe->fiber_w)) {
acl_fiber_set_error(fe->fiber_w->errnum);
return -1;
}
}
}

View File

@ -631,11 +631,6 @@ ssize_t file_sendfile(socket_t out_fd, int in_fd, off64_t *off, size_t cnt)
__LINE__, out_fd);
return -1;
}
if (acl_fiber_canceled(fe->fiber_w)) {
acl_fiber_set_error(fe->fiber_w->errnum);
return -1;
}
}
}
#endif

View File

@ -85,23 +85,14 @@ int WINAPI acl_fiber_close(socket_t fd)
return ret;
}
// If the fd is in the status waiting for IO ready, the current fiber
// is trying to close the other fiber's fd, so, we should wakeup the
// suspending fiber and wait for its returning back, the process is:
// ->killer kill the fiber which is suspending and holding the fd;
// ->suspending fiber wakeup and return;
// ->killer closing the fd and free the fe.
ret = fiber_file_close(fe);
// If the fd isn't in waiting status, we don't know which fiber is
// holding the fd, but we can close it and free the fe with it.
// If the current fiber is holding the fd, we just close it ok, else
// if the other fiber is holding it, the IO API such as acl_fiber_read
// should hanlding the fd carefully, the process is:
// ->killer closing the fd and free fe owned by itself or other fiber;
// ->if fd was owned by the other fiber, calling API like acl_fiber_read
// will return -1 after fiber_wait_read returns.
fiber_file_close(fe);
// If the return value more than 0, the fe has just been bound with
// the other fiber, we just return here and the fe will really be
// closed and freed by the last one binding the fe.
if (ret > 0) {
return 0;
}
ev = fiber_io_event();
if (ev && ev->close_sock) {

View File

@ -56,8 +56,8 @@ private:
static void usage(const char* procname) {
printf("usage: %s -h [help] -s server_addr\r\n"
" -c max_fibers\r\n"
" -n connect_count\r\n"
" -c max_fibers [default: 10]\r\n"
" -n connect_count [default: 10]\r\n"
, procname);
}

View File

@ -32,6 +32,7 @@ protected:
int fd = conn_->sock_handle();
char buf[8192];
while (true) {
printf(">>>fiber begin to read from fd=%d\r\n", conn_->sock_handle());
int ret = conn_->read(buf, sizeof(buf), false);
if (ret == -1) {
printf("read error=%s, fd=%d, %d\r\n",
@ -102,8 +103,8 @@ private:
} else {
// Must unbind the socket with the conn object
// to avoid closing the socket twice.
conn.unbind_sock();
printf("Kick the old fd=%d, my fd=%d\r\n",
//conn.unbind_sock();
printf("Close the old fd=%d, my fd=%d\r\n",
fd, conn_->sock_handle());
close(fd);
}
@ -111,15 +112,18 @@ private:
printf("Kick old %s, old fd=%d, my fd=%d\r\n", name_.c_str(),
fd, conn_->sock_handle());
#if 0
if (conn_->format("Welcome %s!\r\n", name_.c_str()) <= 0) {
printf("write to %s error %s\r\n",
name_.c_str(), acl::last_serror());
return false;
}
#endif
it = __users.find(name_);
if (it == __users.end()) {
__users[name_] = this;
printf("Add new one ok!\r\n");
return true;
} else {
printf("The other one has already logined!\r\n");
@ -128,7 +132,9 @@ private:
}
~fiber_client(void) {
printf(">>>Begin delete conn=%p, fd=%d\r\n", conn_, conn_->sock_handle());
delete conn_;
printf(">>>Delete ok, conn=%p\r\n\r\n", conn_);
}
};