Merge branch 'gitee-master' into gitlab-upstream

This commit is contained in:
zhengshuxin 2023-07-13 22:10:46 +08:00
commit 84a167b443
18 changed files with 187 additions and 148 deletions

View File

@ -175,6 +175,12 @@ FIBER_API int acl_fiber_closed(ACL_FIBER* fiber);
*/
FIBER_API int acl_fiber_canceled(ACL_FIBER* fiber);
/**
* Clear the fiber's flag and errnum to 0.
* @param fiber {ACL_FIBER*}
*/
FIBER_API void acl_fiber_clear(ACL_FIBER *fiber);
/**
* Wakeup the suspended fiber with the assosiated signal number
* @param fiber {const ACL_FIBER*} the specified fiber, NOT NULL

View File

@ -36,8 +36,9 @@ FIBER_API void acl_fiber_lock_free(ACL_FIBER_LOCK* l);
* Lock the specified fiber mutex, return immediately when locked, or will
* wait until the mutex can be used
* @param l {ACL_FIBER_LOCK*} created by acl_fiber_lock_create
* @return {int} successful when return 0, or error return -1
*/
FIBER_API void acl_fiber_lock_lock(ACL_FIBER_LOCK* l);
FIBER_API int acl_fiber_lock_lock(ACL_FIBER_LOCK* l);
/**
* Try lock the specified fiber mutex, return immediately no matter the mutex
@ -73,14 +74,15 @@ FIBER_API void acl_fiber_rwlock_free(ACL_FIBER_RWLOCK* l);
* function will return immediately; otherwise, the caller will wait for all
* write locking be released. Read lock on it will successful when returning
* @param l {ACL_FIBER_RWLOCK*} created by acl_fiber_rwlock_create
* @return {int} successful when return 0, or error if return -1
*/
FIBER_API void acl_fiber_rwlock_rlock(ACL_FIBER_RWLOCK* l);
FIBER_API int acl_fiber_rwlock_rlock(ACL_FIBER_RWLOCK* l);
/**
* Try to locking the Readonly lock, return immediately no matter locking
* is successful.
* @param l {ACL_FIBER_RWLOCK*} crated by acl_fiber_rwlock_create
* @retur {int} 1 returned when successfully locked, or 0 returned if locking
* @retur {int} 0 returned when successfully locked, or -1 returned if locking
* operation is failed.
*/
FIBER_API int acl_fiber_rwlock_tryrlock(ACL_FIBER_RWLOCK* l);
@ -88,14 +90,15 @@ FIBER_API int acl_fiber_rwlock_tryrlock(ACL_FIBER_RWLOCK* l);
/**
* Lock the rwlock in Write Lock mode, return until no any one locking it
* @param l {ACL_FIBER_RWLOCK*} created by acl_fiber_rwlock_create
* @return {int} return 0 if successful, -1 if error.
*/
FIBER_API void acl_fiber_rwlock_wlock(ACL_FIBER_RWLOCK* l);
FIBER_API int acl_fiber_rwlock_wlock(ACL_FIBER_RWLOCK* l);
/**
* Try to lock the rwlock in Write Lock mode. return immediately no matter
* locking is successful.
* @param l {ACL_FIBER_RWLOCK*} created by acl_fiber_rwlock_create
* @return {int} 1 returned when locking successfully, or 0 returned when
* @return {int} 0 returned when locking successfully, or -1 returned when
* locking failed
*/
FIBER_API int acl_fiber_rwlock_trywlock(ACL_FIBER_RWLOCK* l);

View File

@ -434,6 +434,14 @@ int acl_fiber_canceled(ACL_FIBER *fiber)
return fiber && (fiber->flag & FIBER_F_CANCELED);
}
void acl_fiber_clear(ACL_FIBER *fiber)
{
if (fiber) {
fiber->errnum = 0;
fiber->flag &= ~FIBER_F_CANCELED;
}
}
void acl_fiber_kill(ACL_FIBER *fiber)
{
fiber->errnum = ECANCELED;
@ -468,36 +476,12 @@ void acl_fiber_signal(ACL_FIBER *fiber, int signum)
fiber->signum = signum;
if (fiber == curr) { // Just return if kill myself
return;
// Just only wakeup the suspended fiber.
if (fiber->status == FIBER_STATUS_SUSPEND) {
ring_detach(&fiber->me); // This is safety!
acl_fiber_ready(fiber);
}
ring_detach(&curr->me);
ring_detach(&fiber->me);
/* Add the current fiber and signaled fiber in the head of the ready */
#if 0
fiber_ready(fiber);
fiber_yield();
#elif 1
/* First add current fiber, then the signaled fiber, and the signaled
* fiber will run first, then the current fiber.
*/
curr->status = FIBER_STATUS_READY;
ring_append(&__thread_fiber->ready, &curr->me);
fiber->status = FIBER_STATUS_READY;
ring_append(&__thread_fiber->ready, &fiber->me);
#else
fiber->status = FIBER_STATUS_READY;
ring_prepend(&__thread_fiber->ready, &fiber->me);
curr->status = FIBER_STATUS_READY;
ring_prepend(&__thread_fiber->ready, &curr->me);
#endif
acl_fiber_switch();
fiber->flag &= ~FIBER_F_SIGNALED;
}
int acl_fiber_signum(ACL_FIBER *fiber)
@ -538,6 +522,9 @@ int acl_fiber_yield(void)
}
__thread_fiber->switched_old = __thread_fiber->switched;
// Reset the current fiber's status in order to be added to
// ready queue again.
__thread_fiber->running->status = FIBER_STATUS_NONE;
acl_fiber_ready(__thread_fiber->running);
acl_fiber_switch();
@ -674,7 +661,7 @@ void acl_fiber_schedule_init(int on)
void acl_fiber_attr_init(ACL_FIBER_ATTR *attr)
{
attr->oflag = 0;
attr->stack_size = 128000;
attr->stack_size = 128000;
}
void acl_fiber_attr_setstacksize(ACL_FIBER_ATTR *attr, size_t size)

View File

@ -64,8 +64,8 @@ struct ACL_FIBER {
int signum;
unsigned short status;
unsigned short wstatus;
unsigned int oflag;
unsigned int flag;
unsigned int oflag; // The flags for creating fiber.
unsigned int flag; // The flags for the fiber's running status.
#define FIBER_F_STARTED (unsigned) (1 << 0)
#define FIBER_F_SAVE_ERRNO (unsigned) (1 << 1)
@ -136,7 +136,7 @@ FILE_EVENT *fiber_file_open(socket_t fd);
void fiber_file_set(FILE_EVENT *fe);
FILE_EVENT *fiber_file_get(socket_t fd);
void fiber_file_free(FILE_EVENT *fe);
void fiber_file_close(FILE_EVENT *fe);
int fiber_file_close(FILE_EVENT *fe);
FILE_EVENT *fiber_file_cache_get(socket_t fd);
void fiber_file_cache_put(FILE_EVENT *fe);

View File

@ -446,11 +446,16 @@ static void read_callback(EVENT *ev, FILE_EVENT *fe)
*/
int fiber_wait_read(FILE_EVENT *fe)
{
ACL_FIBER *curr;
int ret;
fiber_io_check();
fe->fiber_r = acl_fiber_running();
curr = acl_fiber_running();
if (acl_fiber_canceled(curr)) {
acl_fiber_set_error(curr->errnum);
return -1;
}
// When return 0 just let it go continue
ret = event_add_read(__thread_fiber->event, fe, read_callback);
@ -458,6 +463,7 @@ int fiber_wait_read(FILE_EVENT *fe)
return ret;
}
fe->fiber_r = curr;
fe->fiber_r->wstatus |= FIBER_WAIT_READ;
SET_READWAIT(fe);
@ -474,6 +480,11 @@ int fiber_wait_read(FILE_EVENT *fe)
WAITER_DEC(__thread_fiber->event);
}
if (acl_fiber_canceled(curr)) {
acl_fiber_set_error(curr->errnum);
return -1;
}
return ret;
}
@ -493,17 +504,23 @@ static void write_callback(EVENT *ev, FILE_EVENT *fe)
int fiber_wait_write(FILE_EVENT *fe)
{
ACL_FIBER *curr;
int ret;
fiber_io_check();
fe->fiber_w = acl_fiber_running();
curr = acl_fiber_running();
if (acl_fiber_canceled(curr)) {
acl_fiber_set_error(curr->errnum);
return -1;
}
ret = event_add_write(__thread_fiber->event, fe, write_callback);
if (ret <= 0) {
return ret;
}
fe->fiber_w = curr;
fe->fiber_w->wstatus |= FIBER_WAIT_WRITE;
SET_WRITEWAIT(fe);
@ -520,6 +537,11 @@ int fiber_wait_write(FILE_EVENT *fe)
WAITER_DEC(__thread_fiber->event);
}
if (acl_fiber_canceled(curr)) {
acl_fiber_set_error(curr->errnum);
return -1;
}
return ret;
}
@ -670,9 +692,10 @@ void fiber_file_free(FILE_EVENT *fe)
}
}
void fiber_file_close(FILE_EVENT *fe)
int fiber_file_close(FILE_EVENT *fe)
{
ACL_FIBER *curr;
int n = 0;
assert(fe);
fiber_io_check();
@ -687,42 +710,64 @@ void fiber_file_close(FILE_EVENT *fe)
curr = acl_fiber_running();
if (IS_READWAIT(fe) && fe->fiber_r && fe->fiber_r != curr
&& fe->fiber_r->status != FIBER_STATUS_EXITING) {
if (fe->fiber_r && fe->fiber_r != curr) {
n++;
// The current fiber is closing the other fiber's fd, and the
// other fiber hoding the fd is blocked by waiting for the
// fd to be ready, so we just notify the blocked fiber to
// wakeup from read waiting status.
if (IS_READWAIT(fe)
&& fe->fiber_r->status == FIBER_STATUS_SUSPEND) {
// The current fiber is closing the other fiber's fd,
// and the other fiber hoding the fd is blocked by
// waiting for the fd to be ready, so we just notify
// the blocked fiber to wakeup from read waiting status.
SET_CLOSING(fe);
CLR_READWAIT(fe);
SET_CLOSING(fe);
CLR_READWAIT(fe);
#ifdef HAS_IO_URING
if (EVENT_IS_IO_URING(__thread_fiber->event)) {
file_cancel(__thread_fiber->event, fe, CANCEL_IO_READ);
} else {
if (EVENT_IS_IO_URING(__thread_fiber->event)) {
file_cancel(__thread_fiber->event, fe,
CANCEL_IO_READ);
} else {
acl_fiber_kill(fe->fiber_r);
}
#else
acl_fiber_kill(fe->fiber_r);
}
#else
acl_fiber_kill(fe->fiber_r);
#endif
}
if (IS_WRITEWAIT(fe) && fe->fiber_w && fe->fiber_w != curr
&& fe->fiber_w->status != FIBER_STATUS_EXITING) {
CLR_WRITEWAIT(fe);
SET_CLOSING(fe);
#ifdef HAS_IO_URING
if (EVENT_IS_IO_URING(__thread_fiber->event)) {
file_cancel(__thread_fiber->event, fe, CANCEL_IO_WRITE);
} else {
acl_fiber_kill(fe->fiber_w);
// If the reader fiber isn't blocked by the read wait,
// maybe it has been in the ready queue, just set flag.
fe->fiber_r->flag |= FIBER_F_KILLED;
fe->fiber_r->errnum = ECANCELED;
}
#else
acl_fiber_kill(fe->fiber_w);
#endif
}
if (fe->fiber_w && fe->fiber_w != curr) {
n++;
if (IS_WRITEWAIT(fe)
&& fe->fiber_w->status == FIBER_STATUS_SUSPEND) {
CLR_WRITEWAIT(fe);
SET_CLOSING(fe);
#ifdef HAS_IO_URING
if (EVENT_IS_IO_URING(__thread_fiber->event)) {
file_cancel(__thread_fiber->event, fe,
CANCEL_IO_WRITE);
} else {
acl_fiber_kill(fe->fiber_w);
}
#else
acl_fiber_kill(fe->fiber_w);
#endif
} else {
fe->fiber_w->flag |= FIBER_F_KILLED;
fe->fiber_w->errnum = ECANCELED;
}
}
return n;
}
/****************************************************************************/

View File

@ -7,8 +7,11 @@
void file_event_init(FILE_EVENT *fe, socket_t fd)
{
ring_init(&fe->me);
fe->fiber_r = acl_fiber_running();
fe->fiber_w = acl_fiber_running();
// XXX: don't set fiber_r/fiber_w here!
// fe->fiber_r = acl_fiber_running();
// fe->fiber_w = acl_fiber_running();
fe->fd = fd;
fe->id = -1;
fe->status = STATUS_NONE;

View File

@ -26,11 +26,6 @@ static int uring_wait_read(FILE_EVENT *fe)
return -1;
}
if (acl_fiber_canceled(fe->fiber_r)) {
acl_fiber_set_error(fe->fiber_r->errnum);
return -1;
}
if (fe->reader_ctx.res >= 0) {
return fe->reader_ctx.res;
}
@ -88,11 +83,6 @@ static int iocp_wait_read(FILE_EVENT *fe)
return -1;
}
if (acl_fiber_canceled(fe->fiber_r)) {
acl_fiber_set_error(fe->fiber_r->errnum);
return -1;
}
if (fe->res >= 0) {
return fe->res;
}
@ -143,7 +133,6 @@ int fiber_iocp_read(FILE_EVENT *fe, char *buf, int len)
// -1: The fd isn't a valid descriptor, just return error, and
// the fe should be freed.
// After calling acl_fiber_canceled():
// If the suspending fiber wakeup for the reason that it was
// killed by the other fiber which called acl_fiber_kill and
// want to close the fd owned by the current fiber, we just
@ -167,10 +156,6 @@ int fiber_iocp_read(FILE_EVENT *fe, char *buf, int len)
} else if (fiber_wait_read((_fe)) < 0) { \
return -1; \
} \
if (acl_fiber_canceled((_fe)->fiber_r)) { \
acl_fiber_set_error((_fe)->fiber_r->errnum); \
return -1; \
} \
if ((_fn) == NULL) { \
hook_once(); \
} \
@ -196,10 +181,6 @@ int fiber_iocp_read(FILE_EVENT *fe, char *buf, int len)
} else if (fiber_wait_read((_fe)) < 0) { \
return -1; \
} \
if (acl_fiber_canceled((_fe)->fiber_r)) { \
acl_fiber_set_error((_fe)->fiber_r->errnum); \
return -1; \
} \
if ((_fn) == NULL) { \
hook_once(); \
} \

View File

@ -46,11 +46,6 @@ static int wait_write(FILE_EVENT *fe)
return -1;
}
if (acl_fiber_canceled(fe->fiber_w)) {
acl_fiber_set_error(fe->fiber_w->errnum);
return -1;
}
return 0;
}
@ -431,11 +426,6 @@ ssize_t fiber_sendfile64(socket_t out_fd, int in_fd, off64_t *offset, size_t cou
__FUNCTION__, __LINE__, out_fd);
return -1;
}
if (acl_fiber_canceled(fe->fiber_w)) {
acl_fiber_set_error(fe->fiber_w->errnum);
return -1;
}
}
}

View File

@ -631,11 +631,6 @@ ssize_t file_sendfile(socket_t out_fd, int in_fd, off64_t *off, size_t cnt)
__LINE__, out_fd);
return -1;
}
if (acl_fiber_canceled(fe->fiber_w)) {
acl_fiber_set_error(fe->fiber_w->errnum);
return -1;
}
}
}
#endif

View File

@ -85,23 +85,14 @@ int WINAPI acl_fiber_close(socket_t fd)
return ret;
}
// If the fd is in the status waiting for IO ready, the current fiber
// is trying to close the other fiber's fd, so, we should wakeup the
// suspending fiber and wait for its returning back, the process is:
// ->killer kill the fiber which is suspending and holding the fd;
// ->suspending fiber wakeup and return;
// ->killer closing the fd and free the fe.
ret = fiber_file_close(fe);
// If the fd isn't in waiting status, we don't know which fiber is
// holding the fd, but we can close it and free the fe with it.
// If the current fiber is holding the fd, we just close it ok, else
// if the other fiber is holding it, the IO API such as acl_fiber_read
// should hanlding the fd carefully, the process is:
// ->killer closing the fd and free fe owned by itself or other fiber;
// ->if fd was owned by the other fiber, calling API like acl_fiber_read
// will return -1 after fiber_wait_read returns.
fiber_file_close(fe);
// If the return value more than 0, the fe has just been bound with
// the other fiber, we just return here and the fe will really be
// closed and freed by the last one binding the fe.
if (ret > 0) {
return 0;
}
ev = fiber_io_event();
if (ev && ev->close_sock) {

View File

@ -40,6 +40,8 @@ static void handle_poll_read(EVENT *ev, FILE_EVENT *fe, POLLFD *pfd)
pfd->pfd->revents |= POLLNVAL;
}
pfd->fe->fiber_r = NULL;
if (!(pfd->pfd->events & POLLOUT)) {
ring_detach(&pfd->me);
pfd->fe = NULL;
@ -104,6 +106,8 @@ static void handle_poll_write(EVENT *ev, FILE_EVENT *fe, POLLFD *pfd)
pfd->pfd->revents |= POLLNVAL;
}
pfd->fe->fiber_w = NULL;
if (!(pfd->pfd->events & POLLIN)) {
ring_detach(&pfd->me);
pfd->fe = NULL;

View File

@ -77,20 +77,16 @@ static int __lock(ACL_FIBER_LOCK *lk, int block)
return 0;
}
if (acl_fiber_killed(curr)) {
msg_info("%s(%d), %s: lock fiber-%u was killed",
__FILE__, __LINE__, __FUNCTION__, acl_fiber_id(curr));
} else {
msg_error("%s(%d), %s: qlock: owner=%p self=%p oops",
__FILE__, __LINE__, __FUNCTION__, lk->owner, curr);
if (acl_fiber_canceled(curr)) {
acl_fiber_set_error(curr->errnum);
}
return -1;
}
void acl_fiber_lock_lock(ACL_FIBER_LOCK *lk)
int acl_fiber_lock_lock(ACL_FIBER_LOCK *lk)
{
__lock(lk, 1);
return __lock(lk, 1);
}
int acl_fiber_lock_trylock(ACL_FIBER_LOCK *lk)
@ -154,11 +150,11 @@ static int __rlock(ACL_FIBER_RWLOCK *lk, int block)
if (lk->writer == NULL && FIRST_FIBER(&lk->wwaiting) == NULL) {
lk->readers++;
return 1;
return 0;
}
if (!block) {
return 0;
return -1;
}
curr = acl_fiber_running();
@ -176,12 +172,16 @@ static int __rlock(ACL_FIBER_RWLOCK *lk, int block)
/* if switch to me because other killed me, I should detach myself */
ring_detach(&curr->me);
return 1;
if (acl_fiber_canceled(curr)) {
acl_fiber_set_error(curr->errnum);
return -1;
}
return 0;
}
void acl_fiber_rwlock_rlock(ACL_FIBER_RWLOCK *lk)
int acl_fiber_rwlock_rlock(ACL_FIBER_RWLOCK *lk)
{
(void) __rlock(lk, 1);
return __rlock(lk, 1);
}
int acl_fiber_rwlock_tryrlock(ACL_FIBER_RWLOCK *lk)
@ -196,11 +196,11 @@ static int __wlock(ACL_FIBER_RWLOCK *lk, int block)
if (lk->writer == NULL && lk->readers == 0) {
lk->writer = acl_fiber_running();
return 1;
return 0;
}
if (!block) {
return 0;
return -1;
}
curr = acl_fiber_running();
@ -218,12 +218,16 @@ static int __wlock(ACL_FIBER_RWLOCK *lk, int block)
/* if switch to me because other killed me, I should detach myself */
ring_detach(&curr->me);
return 1;
if (acl_fiber_canceled(curr)) {
acl_fiber_set_error(curr->errnum);
return -1;
}
return 0;
}
void acl_fiber_rwlock_wlock(ACL_FIBER_RWLOCK *lk)
int acl_fiber_rwlock_wlock(ACL_FIBER_RWLOCK *lk)
{
__wlock(lk, 1);
return __wlock(lk, 1);
}
int acl_fiber_rwlock_trywlock(ACL_FIBER_RWLOCK *lk)

View File

@ -84,6 +84,14 @@ int acl_fiber_sem_wait(ACL_FIBER_SEM *sem)
return -1;
}
// Sanity check befor suspending.
if (acl_fiber_canceled(curr)) {
acl_fiber_set_error(curr->errnum);
//msg_info("%s(%d): fiber-%d be killed",
// __FUNCTION__, __LINE__, acl_fiber_id(curr));
return -1;
}
ring_prepend(&sem->waiting, &curr->me);
curr->wstatus |= FIBER_WAIT_SEM;
@ -101,9 +109,10 @@ int acl_fiber_sem_wait(ACL_FIBER_SEM *sem)
*/
ring_detach(&curr->me);
if (acl_fiber_killed(curr)) {
msg_info("%s(%d): fiber-%d be killed",
__FUNCTION__, __LINE__, acl_fiber_id(curr));
if (acl_fiber_canceled(curr)) {
acl_fiber_set_error(curr->errnum);
//msg_info("%s(%d): fiber-%d be killed",
// __FUNCTION__, __LINE__, acl_fiber_id(curr));
return -1;
}

View File

@ -105,6 +105,11 @@ public:
*/
void set_errno(int errnum);
/**
*
*/
static void clear(void);
public:
/**
*

View File

@ -59,6 +59,15 @@ void fiber::set_errno(int errnum)
}
}
void fiber::clear(void)
{
ACL_FIBER *curr = acl_fiber_running();
if (curr) {
acl_fiber_clear(curr);
}
}
const char* fiber::last_serror(void)
{
return acl_fiber_last_serror();

View File

@ -49,7 +49,7 @@ void fiber_rwlock::rlock(void)
bool fiber_rwlock::tryrlock(void)
{
return acl_fiber_rwlock_tryrlock(rwlk_) == 0 ? false : true;
return acl_fiber_rwlock_tryrlock(rwlk_) == 0 ? true : false;
}
void fiber_rwlock::runlock(void)
@ -64,7 +64,7 @@ void fiber_rwlock::wlock(void)
bool fiber_rwlock::trywlock(void)
{
return acl_fiber_rwlock_trywlock(rwlk_) == 0 ? false : true;
return acl_fiber_rwlock_trywlock(rwlk_) == 0 ? true : false;
}
void fiber_rwlock::wunlock(void)

View File

@ -56,8 +56,8 @@ private:
static void usage(const char* procname) {
printf("usage: %s -h [help] -s server_addr\r\n"
" -c max_fibers\r\n"
" -n connect_count\r\n"
" -c max_fibers [default: 10]\r\n"
" -n connect_count [default: 10]\r\n"
, procname);
}

View File

@ -32,6 +32,7 @@ protected:
int fd = conn_->sock_handle();
char buf[8192];
while (true) {
printf(">>>fiber begin to read from fd=%d\r\n", conn_->sock_handle());
int ret = conn_->read(buf, sizeof(buf), false);
if (ret == -1) {
printf("read error=%s, fd=%d, %d\r\n",
@ -98,12 +99,13 @@ private:
int fd = conn.sock_handle();
if (use_kill_) {
printf("Kill fiber-%d\r\n", it->second->get_id());
it->second->kill();
} else {
// Must unbind the socket with the conn object
// to avoid closing the socket twice.
conn.unbind_sock();
printf("Kick the old fd=%d, my fd=%d\r\n",
//conn.unbind_sock();
printf("Close the old fd=%d, my fd=%d\r\n",
fd, conn_->sock_handle());
close(fd);
}
@ -111,15 +113,18 @@ private:
printf("Kick old %s, old fd=%d, my fd=%d\r\n", name_.c_str(),
fd, conn_->sock_handle());
#if 0
if (conn_->format("Welcome %s!\r\n", name_.c_str()) <= 0) {
printf("write to %s error %s\r\n",
name_.c_str(), acl::last_serror());
return false;
}
#endif
it = __users.find(name_);
if (it == __users.end()) {
__users[name_] = this;
printf("Add new one ok!\r\n");
return true;
} else {
printf("The other one has already logined!\r\n");
@ -128,7 +133,9 @@ private:
}
~fiber_client(void) {
printf(">>>Begin delete conn=%p, fd=%d\r\n", conn_, conn_->sock_handle());
delete conn_;
printf(">>>Delete ok, conn=%p\r\n\r\n", conn_);
}
};