Merge branch 'gitee-master' into gitlab-upstream

This commit is contained in:
zhengshuxin 2023-07-11 16:35:28 +08:00
commit 59b67ce231
18 changed files with 163 additions and 96 deletions

View File

@ -102,8 +102,8 @@ endif()
# for lower version ndk such add ndk12b, the read/write functions have confusions in the ndk's header,
# so we should disable hook io in io.c
if (ACL_DISABLE_HOOK_IO MATCHES "YES")
add_definitions("-DDISABLE_HOOK_IO")
if (ACL_DISABLE_HOOK MATCHES "YES")
add_definitions("-DDISABLE_HOOK")
endif()
##############################################################################

View File

@ -59,11 +59,11 @@ void fbase_event_open(FIBER_BASE *fbase)
__FILE__, __LINE__, __FUNCTION__, (int) fbase->event_in);
}
fbase->in = fiber_file_open_read(fbase->event_in);
fbase->in = fiber_file_open(fbase->event_in);
if (fbase->event_in == fbase->event_out) {
fbase->out = fbase->in;
} else {
fbase->out = fiber_file_open_write(fbase->event_out);
fbase->out = fiber_file_open(fbase->event_out);
}
#if defined(HAS_IO_URING)

View File

@ -132,8 +132,7 @@ EVENT *fiber_io_event(void);
void fiber_timer_add(ACL_FIBER *fiber, unsigned milliseconds);
int fiber_timer_del(ACL_FIBER *fiber);
FILE_EVENT *fiber_file_open_read(socket_t fd);
FILE_EVENT *fiber_file_open_write(socket_t fd);
FILE_EVENT *fiber_file_open(socket_t fd);
void fiber_file_set(FILE_EVENT *fe);
FILE_EVENT *fiber_file_get(socket_t fd);
void fiber_file_free(FILE_EVENT *fe);

View File

@ -277,7 +277,7 @@ static void fiber_io_loop(ACL_FIBER *self fiber_unused, void *ctx)
} else if (ring_size(&ev->events) > 0) {
continue;
}
#if 0
// Only sleep fiber alive ?
timer = TIMER_FIRST(__thread_fiber->ev_timer);
@ -581,7 +581,7 @@ void fiber_file_set(FILE_EVENT *fe)
#endif
}
FILE_EVENT *fiber_file_open_read(socket_t fd)
FILE_EVENT *fiber_file_open(socket_t fd)
{
FILE_EVENT *fe = fiber_file_get(fd);
@ -600,23 +600,6 @@ FILE_EVENT *fiber_file_open_read(socket_t fd)
*/
// Don't set fiber_r here, which will be set in fiber_wait_read()
//fe->fiber_r = acl_fiber_running();
return fe;
}
FILE_EVENT *fiber_file_open_write(socket_t fd)
{
FILE_EVENT *fe = fiber_file_get(fd);
if (fe == NULL) {
fe = file_event_alloc(fd);
fiber_file_set(fe);
#ifdef HAS_IO_URING
if (var_hook_sys_api && EVENT_IS_IO_URING(fiber_io_event())) {
fe->mask |= EVENT_DIRECT;
}
#endif
}
// Don't set fiber_w here, which will be set in fiber_wait_write()
//fe->fiber_w = acl_fiber_running();
return fe;

View File

@ -530,7 +530,7 @@ static void epoll_ctl_add(EVENT *ev, EPOLL_EVENT *ee,
if (event->events & EPOLLIN) {
epx->mask |= EVENT_READ;
epx->fe = fiber_file_open_read(fd);
epx->fe = fiber_file_open(fd);
epx->fe->epx = epx;
event_add_read(ev, epx->fe, read_callback);
@ -539,7 +539,7 @@ static void epoll_ctl_add(EVENT *ev, EPOLL_EVENT *ee,
if (event->events & EPOLLOUT) {
epx->mask |= EVENT_WRITE;
epx->fe = fiber_file_open_write(fd);
epx->fe = fiber_file_open(fd);
epx->fe->epx = epx;
event_add_write(ev, epx->fe, write_callback);

View File

@ -378,7 +378,7 @@ int fiber_sendmmsg(FILE_EVENT *fe, struct mmsghdr *msgvec, unsigned int vlen,
# endif // HAS_MMSG
#endif // SYS_UNIX
#if defined(__USE_LARGEFILE64) && !defined(DISABLE_HOOK_IO)
#if defined(__USE_LARGEFILE64) && !defined(DISABLE_HOOK)
ssize_t fiber_sendfile64(socket_t out_fd, int in_fd, off64_t *offset, size_t count)
{
@ -410,7 +410,7 @@ ssize_t fiber_sendfile64(socket_t out_fd, int in_fd, off64_t *offset, size_t cou
return -1;
}
fe = fiber_file_open_write(out_fd);
fe = fiber_file_open(out_fd);
CLR_POLLING(fe);
if (fiber_wait_write(fe) < 0) {

View File

@ -616,7 +616,7 @@ ssize_t file_sendfile(socket_t out_fd, int in_fd, off64_t *off, size_t cnt)
return -1;
}
fe = fiber_file_open_write(out_fd);
fe = fiber_file_open(out_fd);
CLR_POLLING(fe);
fe->mask |= EVENT_POLLOUT;

View File

@ -180,7 +180,7 @@ void WINAPI acl_fiber_freeaddrinfo(struct addrinfo *res)
resolver_freeaddrinfo(res);
}
#ifdef SYS_UNIX
#if defined(SYS_UNIX) && !defined(DISABLE_HOOK)
int getaddrinfo(const char *node, const char *service,
const struct addrinfo* hints, struct addrinfo **res)

View File

@ -11,7 +11,7 @@
#define IS_INVALID(fd) (fd == INVALID_SOCKET)
#endif
#if defined(SYS_UNIX) && !defined(DISABLE_HOOK_IO)
#if defined(SYS_UNIX) && !defined(DISABLE_HOOK)
unsigned int sleep(unsigned int seconds)
{
@ -148,7 +148,7 @@ ssize_t acl_fiber_read(socket_t fd, void *buf, size_t count)
return (*sys_read)(fd, buf, count);
}
fe = fiber_file_open_read(fd);
fe = fiber_file_open(fd);
return fiber_read(fe, buf, count);
}
@ -168,7 +168,7 @@ ssize_t acl_fiber_readv(socket_t fd, const struct iovec *iov, int iovcnt)
return (*sys_readv)(fd, iov, iovcnt);
}
fe = fiber_file_open_read(fd);
fe = fiber_file_open(fd);
return fiber_readv(fe, iov, iovcnt);
}
@ -207,7 +207,7 @@ ssize_t acl_fiber_recv(socket_t sockfd, void *buf, size_t len, int flags)
return (*sys_recv)(sockfd, buf, len, flags);
}
fe = fiber_file_open_read(sockfd);
fe = fiber_file_open(sockfd);
return fiber_recv(fe, buf, len, flags);
}
@ -234,7 +234,7 @@ ssize_t acl_fiber_recvfrom(socket_t sockfd, void *buf, size_t len,
src_addr, addrlen);
}
fe = fiber_file_open_read(sockfd);
fe = fiber_file_open(sockfd);
return fiber_recvfrom(fe, buf, len, flags, src_addr, addrlen);
}
@ -256,7 +256,7 @@ ssize_t acl_fiber_recvmsg(socket_t sockfd, struct msghdr *msg, int flags)
return (*sys_recvmsg)(sockfd, msg, flags);
}
fe = fiber_file_open_read(sockfd);
fe = fiber_file_open(sockfd);
return fiber_recvmsg(fe, msg, flags);
}
@ -283,7 +283,7 @@ int acl_fiber_recvmmsg(int sockfd, struct mmsghdr *msgvec, unsigned int vlen,
return (*sys_recvmmsg)(sockfd, msgvec, vlen, flags, timeout);
}
fe = fiber_file_open_read(sockfd);
fe = fiber_file_open(sockfd);
return fiber_recvmmsg(fe, msgvec, vlen, flags, timeout);
}
# endif
@ -310,7 +310,7 @@ ssize_t acl_fiber_write(socket_t fd, const void *buf, size_t count)
return (*sys_write)(fd, buf, count);
}
fe = fiber_file_open_write(fd);
fe = fiber_file_open(fd);
return fiber_write(fe, buf, count);
}
@ -330,7 +330,7 @@ ssize_t acl_fiber_writev(socket_t fd, const struct iovec *iov, int iovcnt)
return (*sys_writev)(fd, iov, iovcnt);
}
fe = fiber_file_open_write(fd);
fe = fiber_file_open(fd);
return fiber_writev(fe, iov, iovcnt);
}
@ -356,7 +356,7 @@ ssize_t acl_fiber_send(socket_t sockfd, const void *buf, size_t len, int flags)
return (int) (*sys_send)(sockfd, buf, len, flags);
}
fe = fiber_file_open_write(sockfd);
fe = fiber_file_open(sockfd);
return fiber_send(fe, buf, len, flags);
}
@ -384,7 +384,7 @@ ssize_t acl_fiber_sendto(socket_t sockfd, const void *buf, size_t len,
dest_addr, addrlen);
}
fe = fiber_file_open_write(sockfd);
fe = fiber_file_open(sockfd);
return fiber_sendto(fe, buf, len, flags, dest_addr, addrlen);
}
@ -406,7 +406,7 @@ ssize_t acl_fiber_sendmsg(socket_t sockfd, const struct msghdr *msg, int flags)
return (*sys_sendmsg)(sockfd, msg, flags);
}
fe = fiber_file_open_write(sockfd);
fe = fiber_file_open(sockfd);
return fiber_sendmsg(fe, msg, flags);
}
@ -433,7 +433,7 @@ int acl_fiber_sendmmsg(int sockfd, struct mmsghdr *msgvec, unsigned int vlen,
return (*sys_sendmmsg)(sockfd, msgvec, vlen, flags);
}
fe = fiber_file_open_write(sockfd);
fe = fiber_file_open(sockfd);
return fiber_sendmmsg(fe, msgvec, vlen, flags);
}
# endif
@ -442,7 +442,7 @@ int acl_fiber_sendmmsg(int sockfd, struct mmsghdr *msgvec, unsigned int vlen,
/****************************************************************************/
#if defined(SYS_UNIX) && !defined(DISABLE_HOOK_IO)
#if defined(SYS_UNIX) && !defined(DISABLE_HOOK)
ssize_t read(socket_t fd, void *buf, size_t count)
{
@ -472,7 +472,7 @@ ssize_t recvmsg(socket_t sockfd, struct msghdr *msg, int flags)
#endif // SYS_UNIX
#if defined(SYS_UNIX) && !defined(DISABLE_HOOK_IO)
#if defined(SYS_UNIX) && !defined(DISABLE_HOOK)
ssize_t write(socket_t fd, const void *buf, size_t count)
{
@ -525,7 +525,7 @@ int sendmmsg(int sockfd, struct mmsghdr *msgvec, unsigned int vlen, int flags)
/****************************************************************************/
#if defined(__USE_LARGEFILE64) && !defined(DISABLE_HOOK_IO)
#if defined(__USE_LARGEFILE64) && !defined(DISABLE_HOOK)
ssize_t sendfile64(socket_t out_fd, int in_fd, off64_t *offset, size_t count)
{

View File

@ -44,7 +44,7 @@ int fiber_sendmmsg(FILE_EVENT *fe, struct mmsghdr *msgvec, unsigned int vlen,
int flags);
# endif
# if defined(__USE_LARGEFILE64) && !defined(DISABLE_HOOK_IO)
# if defined(__USE_LARGEFILE64) && !defined(DISABLE_HOOK)
ssize_t fiber_sendfile64(socket_t out_fd, int in_fd, off64_t *offset, size_t count);
# endif

View File

@ -68,13 +68,11 @@ static void handle_poll_read(EVENT *ev, FILE_EVENT *fe, POLLFD *pfd)
static void read_callback(EVENT *ev, FILE_EVENT *fe)
{
POLLFD *pfd;
//RING_ITER iter;
RING *iter = fe->pfds.succ, *next = iter;
event_del_read(ev, fe);
SET_READABLE(fe);
#if 1
// Walk througth the RING list, handle each poll event, and one RING
// node maybe be detached after it has been handled without any poll
// event bound with it again.
@ -85,15 +83,6 @@ static void read_callback(EVENT *ev, FILE_EVENT *fe)
handle_poll_read(ev, fe, pfd);
}
}
#else
ring_foreach(iter, &fe->pfds) {
pfd = ring_to_appl(iter.ptr, POLLFD, me);
if (pfd->pfd->events & POLLIN) {
handle_poll_read(ev, fe, pfd);
break;
}
}
#endif
}
/**
@ -134,13 +123,11 @@ static void handle_poll_write(EVENT *ev, FILE_EVENT *fe, POLLFD *pfd)
static void write_callback(EVENT *ev, FILE_EVENT *fe)
{
POLLFD *pfd;
//RING_ITER iter;
RING *iter = fe->pfds.succ, *next = iter;
event_del_write(ev, fe);
SET_WRITABLE(fe);
#if 1
for (; iter != &fe->pfds; iter = next) {
next = next->succ;
pfd = ring_to_appl(iter, POLLFD, me);
@ -148,15 +135,6 @@ static void write_callback(EVENT *ev, FILE_EVENT *fe)
handle_poll_write(ev, fe, pfd);
}
}
#else
ring_foreach(iter, &fe->pfds) {
pfd = ring_to_appl(iter.ptr, POLLFD, me);
if (pfd->pfd->events & POLLOUT) {
handle_poll_write(ev, fe, pfd);
break;
}
}
#endif
}
/**
@ -247,29 +225,13 @@ static void poll_event_clean(EVENT *ev, POLL_EVENT *pe)
}
}
/**
* This callback will be called from event_process_poll() in event.c and the
* fiber blocked after calling acl_fiber_switch() in acl_fiber_poll() will
* wakeup and continue to run.
*/
static void poll_callback(EVENT *ev fiber_unused, POLL_EVENT *pe)
{
if (pe->fiber->status != FIBER_STATUS_READY) {
acl_fiber_ready(pe->fiber);
}
}
static POLLFD *pollfd_alloc(POLL_EVENT *pe, struct pollfd *fds, nfds_t nfds)
{
POLLFD *pfds = (POLLFD *) mem_malloc(nfds * sizeof(POLLFD));
nfds_t i;
for (i = 0; i < nfds; i++) {
if (fds[i].events & POLLIN) {
pfds[i].fe = fiber_file_open_read(fds[i].fd);
} else {
pfds[i].fe = fiber_file_open_write(fds[i].fd);
}
pfds[i].fe = fiber_file_open(fds[i].fd);
#ifdef HAS_IOCP
pfds[i].fe->rbuf = NULL;
pfds[i].fe->rsize = 0;
@ -334,6 +296,18 @@ static void pollfds_copy(struct pollfd *fds, const pollfds *pfds)
#endif // SHARE_STACK
/**
* This callback will be called from event_process_poll() in event.c and the
* fiber blocked after calling acl_fiber_switch() in acl_fiber_poll() will
* wakeup and continue to run.
*/
static void poll_callback(EVENT *ev fiber_unused, POLL_EVENT *pe)
{
if (pe->fiber->status != FIBER_STATUS_READY) {
acl_fiber_ready(pe->fiber);
}
}
int WINAPI acl_fiber_poll(struct pollfd *fds, nfds_t nfds, int timeout)
{
long long now;
@ -367,6 +341,8 @@ int WINAPI acl_fiber_poll(struct pollfd *fds, nfds_t nfds, int timeout)
old_timeout = ev->timeout;
#ifdef SHARE_STACK
// In shared stack mode, the fds input must be save to the dynamic
// memory to avoid memory collision accessed by different fibers.
if (curr->oflag & ACL_FIBER_ATTR_SHARE_STACK) {
pfds = pollfds_save(fds, nfds);
pe = (POLL_EVENT *) mem_malloc(sizeof(POLL_EVENT));
@ -446,7 +422,7 @@ int WINAPI acl_fiber_poll(struct pollfd *fds, nfds_t nfds, int timeout)
return nready;
}
#ifdef SYS_UNIX
#if defined(SYS_UNIX) && !defined(DISABLE_HOOK)
int poll(struct pollfd *fds, nfds_t nfds, int timeout)
{
return acl_fiber_poll(fds, nfds, timeout);

View File

@ -142,7 +142,7 @@ socket_t WINAPI acl_fiber_accept(socket_t sockfd, struct sockaddr *addr,
#ifdef HAS_IO_URING
if (EVENT_IS_IO_URING(fiber_io_event())) {
fe = fiber_file_open_read(sockfd);
fe = fiber_file_open(sockfd);
return fiber_iocp_accept(fe);
}
#endif
@ -170,7 +170,7 @@ socket_t WINAPI acl_fiber_accept(socket_t sockfd, struct sockaddr *addr,
return INVALID_SOCKET;
}
fe = fiber_file_open_read(sockfd);
fe = fiber_file_open(sockfd);
while (1) {
if (fiber_wait_read(fe) < 0) {
@ -320,7 +320,7 @@ int WINAPI acl_fiber_connect(socket_t sockfd, const struct sockaddr *addr,
return sys_connect ? (*sys_connect)(sockfd, addr, addrlen) : -1;
}
fe = fiber_file_open_write(sockfd);
fe = fiber_file_open(sockfd);
SET_NDUBLOCK(fe);

View File

@ -28,14 +28,14 @@ static SYNC_TIMER *sync_timer_new(void)
out = mbox_out(timer->box);
assert(out != INVALID_SOCKET);
fe = fiber_file_open_write(out);
fe = fiber_file_open(out);
assert(fe);
fe->type |= TYPE_INTERNAL | TYPE_EVENTABLE;
in = mbox_in(timer->box);
assert(in != INVALID_SOCKET);
if (in != out) {
fe = fiber_file_open_read(in);
fe = fiber_file_open(in);
assert(fe);
fe->type |= TYPE_INTERNAL | TYPE_EVENTABLE;
}

View File

@ -24,14 +24,14 @@ static SYNC_WAITER *sync_waiter_new(void)
out = mbox_out(waiter->box);
assert(out != INVALID_SOCKET);
fe = fiber_file_open_write(out);
fe = fiber_file_open(out);
assert(fe);
fe->type |= TYPE_INTERNAL | TYPE_EVENTABLE;
in = mbox_in(waiter->box);
assert(in != INVALID_SOCKET);
if (in != out) {
fe = fiber_file_open_read(in);
fe = fiber_file_open(in);
assert(fe);
fe->type |= TYPE_INTERNAL | TYPE_EVENTABLE;
}
@ -129,7 +129,7 @@ void sync_waiter_wakeup(SYNC_WAITER *waiter, ACL_FIBER *fb)
fe->mask |= EVENT_SYSIO;
// The fe mabye be used again in mbox_send->acl_fiber_write
// ->fiber_file_open_write->fiber_file_get.
// ->fiber_file_open->fiber_file_get.
mbox_send(waiter->box, fb);
// If no other fiber is suspended by the sem, then release it.

View File

@ -0,0 +1,101 @@
#include "stdafx.h"
#include "test_io.h"
#ifdef __linux__
#include <sys/eventfd.h>
static int read_wait(int fd, int timeo)
{
struct pollfd pfd;
printf(">>>%s: fiber-%d, pfd=%p\r\n", __FUNCTION__, acl::fiber::self(), &pfd);
memset(&pfd, 0, sizeof(pfd));
pfd.fd = fd;
pfd.events = POLLIN;
int n = poll(&pfd, 1, timeo * 1000);
if (n < 0) {
printf("poll error: %s\r\n", acl::last_serror());
return -1;
}
if (n == 0) {
printf("poll read timeout: %s\r\n", acl::last_serror());
return 0;
}
printf("%s: fd=%d is readable!\r\n", __FUNCTION__, fd);
return 1;
}
static bool fiber_read(int fd, long long& out)
{
if (read_wait(fd, 2) <= 0) {
printf("read_wait error for fd=%d\r\n", fd);
return false;
}
long long n;
ssize_t ret = read(fd, &n, sizeof(n));
if (ret != sizeof(n)) {
printf("read from eventfd %d error %s\r\n",
fd, acl::last_serror());
return false;
} else {
out = n;
return true;
}
}
#endif // __linux__
int test_poll(AUT_LINE *test_line acl_unused, void *arg acl_unused)
{
#ifdef __linux__
int fd = eventfd(0, 0);
if (fd == -1) {
printf("create eventfd error %s\r\n", acl::last_serror());
return -1;
}
long long out = 0, in = 1000000;
int shared_stack = 0;
AUT_INT(test_line, "shared_stack", shared_stack, 0);
if (shared_stack) {
printf(">>>fiber's stack shared\r\n");
go_share(8000) [=, &out] {
(void) fiber_read(fd, out);
};
} else {
printf(">>>fiber's stack no-shared\r\n");
go[=, &out] {
(void) fiber_read(fd, out);
};
}
go[=] {
long long n = in;
ssize_t ret = write(fd, &n, sizeof(n));
if (ret != sizeof(n)) {
printf("write to eventfd %d error %s\r\n",
fd, acl::last_serror());
}
};
acl::fiber::schedule();
if (out == in) {
printf("Ok, the result read from eventfd: %lld\r\n", out);
return 0;
} else {
printf("Err, the result is %lld, but need %lld\r\n", out, in);
return -1;
}
#else
printf("eventfd only be supported on Linux\r\n");
return 0;
#endif
}

View File

@ -5,3 +5,6 @@ void io_register(void);
/* In eventfd.cpp */
int test_eventfd(AUT_LINE *test_line, void *arg);
/* In poll.cpp */
int test_poll(AUT_LINE *test_line, void *arg);

View File

@ -6,5 +6,8 @@ static AUT_FN_ITEM __test_fn_tab[] = {
/* In eventfd.cpp */
{ "test_eventfd", "test_eventfd", test_eventfd, NULL, 0 },
/* In poll.cpp */
{ "test_poll", "test_poll", test_poll, NULL, 0 },
{ NULL, NULL, NULL, NULL, 0 },
};

View File

@ -9,3 +9,5 @@ tbox_mixed_consume|0|0|threads_consumer=2,threads_producer=2,threads_consumer_al
file_load|0|0|filename=main.cpp
file_load|0|0|filename=Makefile.in,show=1
test_eventfd|0|0|
test_poll|0|0|shared_stack=0
test_poll|0|0|shared_stack=1