This commit is contained in:
zhengshuxin 2022-10-26 07:03:34 -04:00
parent efe06cd020
commit 9286a008c5
18 changed files with 185 additions and 77 deletions

View File

@ -156,6 +156,10 @@ static void prepare_child_fds(ACL_MASTER_SERV *serv)
acl_msg_fatal("%s: dup2 listen_fd %d: %s",
myname, serv->listen_fds[n], strerror(errno));
}
acl_msg_info("%s: dup listen fd from %d to %d in %s mode",
myname, serv->listen_fds[n], ACL_MASTER_LISTEN_FD + n,
acl_is_blocking(ACL_MASTER_LISTEN_FD + n) == 1 ?
"non-blocking" : "blocking");
(void) close(serv->listen_fds[n]);
acl_vstream_free(serv->listen_streams[n]);
}

View File

@ -72,7 +72,7 @@ service $<PROGRAM>
# 程序标准输出重定向至指定文件中
# master_stdout = {install_path}/var/log/stdout.log
# 程序错误输出重定向至指定文件中
# master_stderr = {install_path}/var/log/tderr.log
# master_stderr = {install_path}/var/log/stderr.log
# 事件引擎: kernel, poll, select, io_uring
fiber_schedule_event = kernel

View File

@ -6,3 +6,6 @@ $ lldb --core "/cores/core.xxxxx"
(lldb) bt all
(lldb) thread select 1
(lldb) frame select 1
# open core dump on ubuntu
systemctl disable apport.service

View File

@ -77,6 +77,9 @@ ACL_API int acl_write_to_log2(const char *info, const char *fmt, va_list ap);
*/
ACL_API void acl_close_log(void);
ACL_API ACL_ARRAY *acl_log_get_streams(void);
ACL_API void acl_log_free_streams(ACL_ARRAY *a);
#ifdef __cplusplus
}
#endif

View File

@ -103,6 +103,30 @@ static ACL_FIFO *__loggers = NULL;
static int __log_close_onexec = 1;
ACL_ARRAY *acl_log_get_streams(void)
{
ACL_ARRAY *a;
ACL_ITER iter;
if (__loggers == NULL) {
return NULL;
}
a = acl_array_create(1);
acl_foreach(iter, __loggers) {
ACL_LOG *log = (ACL_LOG*) iter.data;
acl_array_append(a, log);
}
return a;
}
void acl_log_free_streams(ACL_ARRAY *a)
{
if (a) {
acl_array_free(a, NULL);
}
}
void acl_log_close_onexec(int yes)
{
__log_close_onexec = yes;

View File

@ -90,6 +90,8 @@ FIBER_API void acl_fiber_freeaddrinfo(struct addrinfo *res);
#endif
FIBER_API void acl_fiber_set_sysio(socket_t fd);
#ifdef __cplusplus
}
#endif

View File

@ -39,7 +39,8 @@ EVENT *event_create(int size)
#ifdef HAS_POLL
ev = event_poll_create(size);
#else
msg_fatal("%s(%d): not support!", __FUNCTION__, __LINE__);
printf("%s(%d): not support!\r\n", __FUNCTION__, __LINE__);
assert(0);
#endif
break;
case FIBER_EVENT_SELECT:
@ -49,14 +50,16 @@ EVENT *event_create(int size)
#ifdef HAS_WMSG
ev = event_wmsg_create(size);
#else
msg_fatal("%s(%d): WMSG not support!", __FUNCTION__, __LINE__);
printf("%s(%d): WMSG not support!\r\n", __FUNCTION__, __LINE__);
assert(0);
#endif
break;
case FIBER_EVENT_IO_URING:
#ifdef HAS_IO_URING
ev = event_io_uring_create(size);
#else
msg_fatal("%s(%d): IO_URING not support!", __FUNCTION__, __LINE__);
printf("%s(%d): IO_URING not support!\r\n", __FUNCTION__, __LINE__);
assert(0);
#endif
break;
default:
@ -67,7 +70,7 @@ EVENT *event_create(int size)
#elif defined(HAS_IOCP)
ev = event_iocp_create(size);
#else
msg_fatal("%s(%d): not support!", __FUNCTION__, __LINE__);
printf("%s(%d): not support!\r\n", __FUNCTION__, __LINE__);
assert(0);
#endif
break;

View File

@ -124,25 +124,26 @@ struct FILE_EVENT {
unsigned mask;
#define EVENT_NONE 0
#define EVENT_READ (unsigned) (1 << 0)
#define EVENT_WRITE (unsigned) (1 << 1)
#define EVENT_ERR (unsigned) (1 << 2)
#define EVENT_HUP (unsigned) (1 << 3)
#define EVENT_NVAL (unsigned) (1 << 4)
#define EVENT_SYSIO (unsigned) (1 << 0)
#define EVENT_READ (unsigned) (1 << 1)
#define EVENT_WRITE (unsigned) (1 << 2)
#define EVENT_ERR (unsigned) (1 << 3)
#define EVENT_HUP (unsigned) (1 << 4)
#define EVENT_NVAL (unsigned) (1 << 5)
#ifdef HAS_IO_URING
#define EVENT_ACCEPT (unsigned) (1 << 5)
#define EVENT_CONNECT (unsigned) (1 << 6)
#define EVENT_POLLIN (unsigned) (1 << 7)
#define EVENT_POLLOUT (unsigned) (1 << 8)
#define EVENT_FILE_OPENAT (unsigned) (1 << 9)
#define EVENT_FILE_CLOSE (unsigned) (1 << 10)
#define EVENT_FILE_UNLINK (unsigned) (1 << 11)
#define EVENT_FILE_STATX (unsigned) (1 << 12)
#define EVENT_FILE_RENAMEAT2 (unsigned) (1 << 13)
#define EVENT_DIR_MKDIRAT (unsigned) (1 << 14)
#define EVENT_SPLICE (unsigned) (1 << 15)
#define EVENT_ACCEPT (unsigned) (1 << 6)
#define EVENT_CONNECT (unsigned) (1 << 7)
#define EVENT_POLLIN (unsigned) (1 << 8)
#define EVENT_POLLOUT (unsigned) (1 << 9)
#define EVENT_FILE_OPENAT (unsigned) (1 << 10)
#define EVENT_FILE_CLOSE (unsigned) (1 << 11)
#define EVENT_FILE_UNLINK (unsigned) (1 << 12)
#define EVENT_FILE_STATX (unsigned) (1 << 13)
#define EVENT_FILE_RENAMEAT2 (unsigned) (1 << 14)
#define EVENT_DIR_MKDIRAT (unsigned) (1 << 15)
#define EVENT_SPLICE (unsigned) (1 << 16)
#endif // HAS_IO_URING

View File

@ -220,8 +220,7 @@ static int epoll_event_wait(EVENT *ev, int timeout)
if (acl_fiber_last_error() == FIBER_EINTR) {
return 0;
}
msg_fatal("%s: epoll_wait error %s",
__FUNCTION__, last_serror());
msg_fatal("%s: epoll_wait error %s", __FUNCTION__, last_serror());
} else if (n == 0) {
return 0;
}

View File

@ -569,6 +569,8 @@ EVENT *event_io_uring_create(int size)
eu->sqe_size = size;
}
// XXX: Don't write log here to avoid IO write recursive!
//eu->sqe_size = 256;
memset(&params, 0, sizeof(params));
@ -576,17 +578,19 @@ EVENT *event_io_uring_create(int size)
ret = io_uring_queue_init_params(eu->sqe_size, &eu->ring, &params);
if (ret < 0) {
msg_fatal("%s(%d): init io_uring error=%s, size=%zd",
printf("%s(%d): init io_uring error=%s, size=%zd\r\n",
__FUNCTION__, __LINE__, strerror(-ret), eu->sqe_size);
abort();
} else {
msg_info("%s(%d): init io_uring ok, size=%zd",
printf("%s(%d): init io_uring ok, size=%zd\r\n",
__FUNCTION__, __LINE__, eu->sqe_size);
}
if (!(params.features & IORING_FEAT_FAST_POLL)) {
msg_info("IORING_FEAT_FAST_POLL not available in the kernel");
printf("IORING_FEAT_FAST_POLL not available in the kernel\r\n");
} else {
msg_info("IORING_FEAT_FAST_POLL is available in the kernel");
printf("IORING_FEAT_FAST_POLL is available in the kernel\r\n");
msg_info("IORING_FEAT_FAST_POLL is available in the kernel\r\n");
}
eu->appending = 0;

View File

@ -641,8 +641,9 @@ EVENT *event_iocp_create(int size)
ei->h_iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
if (ei->h_iocp == NULL) {
msg_fatal("%s(%d): create iocp error(%s)",
printf("%s(%d): create iocp error(%s)\r\n",
__FUNCTION__, __LINE__, last_serror());
abort();
}
ei->events = array_create(100);

View File

@ -254,10 +254,7 @@ EVENT *event_poll_create(int size)
if (sys_poll == NULL) {
hook_once();
if (sys_poll == NULL) {
msg_error("%s: sys_poll NULL", __FUNCTION__);
return NULL;
}
assert(sys_poll != NULL);
}
// override size with system open limit setting

View File

@ -248,10 +248,7 @@ EVENT *event_select_create(int size)
if (sys_select == NULL) {
hook_once();
if (sys_select == NULL) {
msg_error("%s: sys_select NULL", __FUNCTION__);
return NULL;
}
assert(sys_select != NULL);
}
// override size with system open limit setting

View File

@ -161,8 +161,9 @@ static void fiber_check(void)
}
if (pthread_once(&__once_control, thread_init) != 0) {
msg_fatal("%s(%d), %s: pthread_once error %s",
printf("%s(%d), %s: pthread_once error %s\r\n",
__FILE__, __LINE__, __FUNCTION__, last_serror());
abort();
}
__thread_fiber = (THREAD *) mem_calloc(1, sizeof(THREAD));
@ -188,7 +189,8 @@ static void fiber_check(void)
__main_fiber = __thread_fiber;
atexit(fiber_schedule_main_free);
} else if (pthread_setspecific(__fiber_key, __thread_fiber) != 0) {
msg_fatal("pthread_setspecific error!");
printf("pthread_setspecific error!\r\n");
abort();
}
}

View File

@ -83,7 +83,7 @@ static void fiber_io_main_free(void)
}
}
static void thread_init(void)
static void thread_once(void)
{
if (pthread_key_create(&__fiber_key, thread_free) != 0) {
msg_fatal("%s(%d), %s: pthread_key_create error %s",
@ -91,29 +91,8 @@ static void thread_init(void)
}
}
static pthread_once_t __once_control = PTHREAD_ONCE_INIT;
// Notice: don't write log here to avoid recursive calling when user call
// acl_fiber_msg_register() to hook the log process.
void fiber_io_check(void)
static void thread_init(void)
{
if (__thread_fiber != NULL) {
if (__thread_fiber->ev_fiber == NULL) {
__thread_fiber->ev_fiber = acl_fiber_create(fiber_io_loop,
__thread_fiber->event, STACK_SIZE);
__thread_fiber->nsleeping = 0;
__thread_fiber->io_stop = 0;
ring_init(&__thread_fiber->ev_timer);
}
return;
}
if (pthread_once(&__once_control, thread_init) != 0) {
printf("%s(%d), %s: pthread_once error %s\r\n",
__FILE__, __LINE__, __FUNCTION__, last_serror());
abort();
}
var_maxfd = open_limit(0);
if (var_maxfd <= 0) {
var_maxfd = MAXFD;
@ -143,6 +122,33 @@ void fiber_io_check(void)
}
}
static pthread_once_t __once_control = PTHREAD_ONCE_INIT;
// Notice: don't write log here to avoid recursive calling when user call
// acl_fiber_msg_register() to hook the log process.
void fiber_io_check(void)
{
if (__thread_fiber != NULL) {
if (__thread_fiber->ev_fiber == NULL) {
__thread_fiber->ev_fiber = acl_fiber_create(fiber_io_loop,
__thread_fiber->event, STACK_SIZE);
__thread_fiber->nsleeping = 0;
__thread_fiber->io_stop = 0;
ring_init(&__thread_fiber->ev_timer);
}
return;
}
if (pthread_once(&__once_control, thread_once) != 0) {
printf("%s(%d), %s: pthread_once error %s\r\n",
__FILE__, __LINE__, __FUNCTION__, last_serror());
abort();
}
thread_init();
}
EVENT *fiber_io_event(void)
{
fiber_io_check();
@ -473,8 +479,18 @@ FILE_EVENT *fiber_file_get(socket_t fd)
return (FILE_EVENT *) htable_find(__thread_fiber->events, key);
#else
fiber_io_check();
if (fd == INVALID_SOCKET || fd >= var_maxfd) {
if (fd <= INVALID_SOCKET || fd >= var_maxfd) {
#ifdef HAS_IO_URING
if (EVENT_IS_IO_URING(fiber_io_event())) {
printf("%s(%d): invalid fd=%d\r\n",
__FUNCTION__, __LINE__, fd);
} else {
msg_error("%s(%d): invalid fd=%d",
__FUNCTION__, __LINE__, fd);
}
#else
msg_error("%s(%d): invalid fd=%d", __FUNCTION__, __LINE__, fd);
#endif
return NULL;
}
@ -492,12 +508,14 @@ void fiber_file_set(FILE_EVENT *fe)
htable_enter(__thread_fiber->events, key, fe);
#else
if (fe->fd == INVALID_SOCKET || fe->fd >= (socket_t) var_maxfd) {
msg_fatal("%s(%d): invalid fd=%d", __FUNCTION__, __LINE__, fe->fd);
if (fe->fd <= INVALID_SOCKET || fe->fd >= (socket_t) var_maxfd) {
printf("%s(%d): invalid fd=%d\r\n", __FUNCTION__, __LINE__, fe->fd);
abort();
}
if (__thread_fiber->events[fe->fd] != NULL) {
msg_fatal("%s(%d): exist fd=%d", __FUNCTION__, __LINE__, fe->fd);
printf("%s(%d): exist fd=%d\r\n", __FUNCTION__, __LINE__, fe->fd);
abort();
}
__thread_fiber->events[fe->fd] = fe;
@ -533,6 +551,22 @@ FILE_EVENT *fiber_file_open_write(socket_t fd)
return fe;
}
void acl_fiber_set_sysio(socket_t fd)
{
FILE_EVENT *fe;
if (fd <= INVALID_SOCKET) {
return;
}
fe = fiber_file_get(fd);
if (fe == NULL) {
fe = file_event_alloc(fd);
fiber_file_set(fe);
}
fe->mask |= EVENT_SYSIO;
}
static int fiber_file_del(FILE_EVENT *fe, socket_t fd)
{
#ifdef SYS_WIN

View File

@ -577,20 +577,20 @@ ssize_t acl_fiber_recvmsg(socket_t sockfd, struct msghdr *msg, int flags)
#if defined(HAS_IO_URING)
# define CHECK_SET_NBLOCK(_fd) do { \
if (var_hook_sys_api && !EVENT_IS_IO_URING(fiber_io_event())) { \
FILE_EVENT *fe = fiber_file_get(_fd); \
if (fe && IS_NDUBLOCK(fe)) { \
FILE_EVENT *_fe = fiber_file_get(_fd); \
if (_fe && IS_NDUBLOCK(_fe)) { \
non_blocking(_fd, NON_BLOCKING); \
CLR_NDUBLOCK(fe); \
CLR_NDUBLOCK(_fe); \
} \
} \
} while (0)
#else
# define CHECK_SET_NBLOCK(_fd) do { \
if (var_hook_sys_api) { \
FILE_EVENT *fe = fiber_file_get(_fd); \
if (fe && IS_NDUBLOCK(fe)) { \
FILE_EVENT *_fe = fiber_file_get(_fd); \
if (_fe && IS_NDUBLOCK(_fe)) { \
non_blocking(_fd, NON_BLOCKING); \
CLR_NDUBLOCK(fe); \
CLR_NDUBLOCK(_fe); \
} \
} \
} while (0)
@ -599,11 +599,15 @@ ssize_t acl_fiber_recvmsg(socket_t sockfd, struct msghdr *msg, int flags)
#ifdef SYS_UNIX
ssize_t acl_fiber_write(socket_t fd, const void *buf, size_t count)
{
if (fd == INVALID_SOCKET) {
msg_error("%s: invalid fd: %d", __FUNCTION__, fd);
FILE_EVENT *fe;
if (fd <= INVALID_SOCKET) {
//msg_error("%s: invalid fd: %d", __FUNCTION__, fd);
return -1;
}
fe = fiber_file_open_write(fd);
if (sys_write == NULL) {
hook_once();
}
@ -615,8 +619,7 @@ ssize_t acl_fiber_write(socket_t fd, const void *buf, size_t count)
#if defined(HAS_IO_URING)
# ifndef WRITE_FIRST
if (EVENT_IS_IO_URING(fiber_io_event())) {
FILE_EVENT *fe = fiber_file_open_write(fd);
if (EVENT_IS_IO_URING(fiber_io_event()) && !(fe->mask & EVENT_SYSIO)) {
CLR_POLLING(fe);
return fiber_iocp_write(fe, buf, (int) count);
}
@ -625,7 +628,6 @@ ssize_t acl_fiber_write(socket_t fd, const void *buf, size_t count)
while (1) {
ssize_t n = (*sys_write)(fd, buf, count);
FILE_EVENT *fe;
int err;
if (!var_hook_sys_api) {

View File

@ -149,6 +149,7 @@ socket_t WINAPI acl_fiber_accept(socket_t sockfd, struct sockaddr *addr,
#ifdef FAST_ACCEPT
// We can set the sockfd in non-blocking mode for not the io_uring.
non_blocking(sockfd, NON_BLOCKING);
# ifdef SYS_WSA_API

View File

@ -75,9 +75,11 @@ static ACL_CONFIG_STR_TABLE __conf_str_tab[] = {
static int acl_var_fiber_quick_abort;
static int acl_var_fiber_share_stack;
static int acl_var_fiber_hook_log;
static ACL_CONFIG_BOOL_TABLE __conf_bool_tab[] = {
{ "fiber_quick_abort", 1, &acl_var_fiber_quick_abort },
{ "fiber_share_stack", 0, &acl_var_fiber_share_stack },
{ "fiber_hook_log", 1, &acl_var_fiber_hook_log },
{ 0, 0, 0 },
};
@ -156,6 +158,11 @@ static void thread_fiber_accept(ACL_FIBER *fiber, void *ctx)
acl_fiber_attr_setstacksize(&attr, acl_var_fiber_stack_size);
acl_fiber_attr_setsharestack(&attr, acl_var_fiber_share_stack ? 1 : 0);
// We must set the listen fd in blocking mode for io_uring engine.
if (__fiber_schedule_event == FIBER_EVENT_IO_URING) {
acl_non_blocking(ACL_VSTREAM_SOCK(sstream), ACL_BLOCKING);
}
while (1) {
cstream = acl_vstream_accept(sstream, ip, sizeof(ip));
if (cstream != NULL) {
@ -1094,6 +1101,27 @@ static void fiber_log_writer(void *, const char *fmt, va_list ap)
acl_msg_info("%s", buf.c_str());
}
static void hook_fiber_log(void)
{
ACL_ARRAY *loggers = acl_log_get_streams();
ACL_ITER iter;
if (loggers) {
acl_foreach(iter, loggers) {
ACL_VSTREAM *fp = (ACL_VSTREAM*) iter.data;
printf(">>>fd=%d\n", ACL_VSTREAM_FILE(fp));
acl_fiber_set_sysio(ACL_VSTREAM_FILE(fp));
}
acl_log_free_streams(loggers);
}
// If hook flag been set, the log of fiber module will be
// written to the current log file.
acl_fiber_msg_register(fiber_log_writer, NULL);
}
void acl_fiber_server_main(int argc, char *argv[],
void (*service)(void*, ACL_VSTREAM*), void *ctx, int name, ...)
{
@ -1311,7 +1339,10 @@ void acl_fiber_server_main(int argc, char *argv[],
}
acl_msg_info("schedule event type - %s", acl_var_fiber_schedule_event);
acl_fiber_msg_register(fiber_log_writer, NULL);
if (acl_var_fiber_hook_log) {
hook_fiber_log();
}
#if !defined(_WIN32) && !defined(_WIN64)
/* notify master that child started ok */