Merge branch 'gitee-master' into gitlab-upstream

This commit is contained in:
zhengshuxin 2023-06-27 21:54:00 +08:00
commit 9120cb2c98
36 changed files with 1144 additions and 381 deletions

View File

@ -1,7 +1,6 @@
#include "stdafx.h"
#include "file_tmpl.h"
file_tmpl::file_tmpl(void)
{
}
@ -159,6 +158,17 @@ bool file_tmpl::create_common()
return files_copy(name, tab);
}
bool file_tmpl::create_other()
{
if (!copy_and_replace("setup-other.sh", "setup.sh", true)) {
return false;
}
acl::string path(project_name_);
path += ".cf";
return copy_and_replace("master_other.cf", path);
}
bool file_tmpl::file_copy(const char* from, const char* to_in)
{
string to_buf;

View File

@ -23,6 +23,7 @@ public:
bool copy_and_replace(const char* from,
const char* to, bool exec = false);
bool create_common();
bool create_other();
bool file_copy(const char* from, const char* to);
bool files_copy(const char* name, const FILE_FROM_TO* tab);

View File

@ -102,34 +102,40 @@ static bool create_proc(file_tmpl& tmpl)
return tmpl.files_copy(name, tab);
}
static bool create_service(file_tmpl& tmpl)
static bool create_service(file_tmpl& tmpl, const char* type)
{
printf("choose master_service type:\r\n");
printf(" t: for master_threads\r\n"
" f: for master_fiber\r\n"
" p: for master_proc\r\n");
printf(">");
fflush(stdout);
char buf[256];
int n = acl_vstream_gets_nonl(ACL_VSTREAM_IN, buf, sizeof(buf));
if (n == ACL_VSTREAM_EOF) {
return false;
} else if (strcasecmp(buf, "t") == 0) {
if (type == NULL || *type == 0) {
printf("choose master_service type:\r\n");
printf(" t: for master_threads\r\n"
" f: for master_fiber\r\n"
" p: for master_proc\r\n");
printf(">");
fflush(stdout);
int n = acl_vstream_gets_nonl(ACL_VSTREAM_IN, buf, sizeof(buf));
if (n == ACL_VSTREAM_EOF) {
return false;
}
type = buf;
}
if (strcasecmp(type, "t") == 0) {
create_threads(tmpl);
} else if (strcasecmp(buf, "p") == 0) {
} else if (strcasecmp(type, "p") == 0) {
create_proc(tmpl);
} else if (strcasecmp(buf, "f") == 0) {
} else if (strcasecmp(type, "f") == 0) {
create_fiber(tmpl);
} else {
printf("invalid: %s\r\n", buf);
printf("invalid type: %s\r\n", type);
return false;
}
return true;
}
static bool create_http_servlet(file_tmpl& tmpl)
static bool create_http_servlet(file_tmpl& tmpl, const char* type)
{
tpl_t* tpl = tmpl.open_tpl("http_servlet.cpp");
if (tpl == NULL) {
@ -158,50 +164,52 @@ static bool create_http_servlet(file_tmpl& tmpl)
tpl_free(tpl);
// 设置服务器模板类型
return create_service(tmpl);
return create_service(tmpl, type);
}
void http_creator()
void http_creator(const char* name, const char* type)
{
bool loop;
file_tmpl tmpl;
if (name && *name && type && *type) {
loop = true;
} else {
loop = false;
}
// 设置源程序所在目录
tmpl.set_path_from("tmpl/http");
while (true) {
printf("please input your program name: ");
fflush(stdout);
char buf[256];
int n;
n = acl_vstream_gets_nonl(ACL_VSTREAM_IN, buf, sizeof(buf));
if (n == ACL_VSTREAM_EOF) {
break;
if (name == NULL || *name == 0) {
printf("please input your program name: ");
fflush(stdout);
n = acl_vstream_gets_nonl(ACL_VSTREAM_IN, buf, sizeof(buf));
if (n == ACL_VSTREAM_EOF) {
break;
}
if (n == 0) {
acl::safe_snprintf(buf, sizeof(buf), "http_demo");
}
name = buf;
}
if (n == 0) {
acl::safe_snprintf(buf, sizeof(buf), "http_demo");
}
tmpl.set_project_name(name);
tmpl.set_project_name(buf);
// 创建目录
tmpl.create_dirs();
printf("please choose one http application type:\r\n");
printf("s: http servlet\r\n");
printf(">");
fflush(stdout);
tmpl.create_common();
create_http_servlet(tmpl, type);
n = acl_vstream_gets_nonl(ACL_VSTREAM_IN, buf, sizeof(buf));
if (n == ACL_VSTREAM_EOF) {
if (!loop) {
break;
} else if (strcasecmp(buf, "s") == 0) {
tmpl.create_common();
create_http_servlet(tmpl);
} else {
printf("unknown flag: %s\r\n", buf);
}
break;
}
}

View File

@ -1,3 +1,3 @@
#pragma once
void http_creator();
void http_creator(const char* name, const char* type);

View File

@ -137,12 +137,19 @@ static bool create_master_udp(file_tmpl& tmpl)
{ NULL, NULL }
};
return tmpl. files_copy(name, tab);
return tmpl.files_copy(name, tab);
}
void master_creator()
void master_creator(const char* name, const char* type)
{
file_tmpl tmpl;
bool loop;
if (name && *name && type && *type) {
loop = true;
} else {
loop = false;
}
// 设置源程序所在目录
tmpl.set_path_from("tmpl/master");
@ -151,73 +158,91 @@ void master_creator()
char buf[256];
int n;
printf("please input your program name: ");
fflush(stdout);
if (name == NULL || *name == 0) {
printf("please input your program name: ");
fflush(stdout);
n = acl_vstream_gets_nonl(ACL_VSTREAM_IN, buf, sizeof(buf));
if (n == ACL_VSTREAM_EOF) {
break;
n = acl_vstream_gets_nonl(ACL_VSTREAM_IN, buf, sizeof(buf));
if (n == ACL_VSTREAM_EOF) {
break;
}
if (n == 0) {
acl::safe_snprintf(buf, sizeof(buf), "master_service");
}
name = buf;
}
if (n == 0) {
acl::safe_snprintf(buf, sizeof(buf), "master_service");
}
// 设置项目名称, 一般与服务程序名相同
tmpl.set_project_name(name);
tmpl.set_project_name(buf);
// 创建目录
tmpl.create_dirs();
printf("choose master_service type:\r\n");
printf(" t: for master_threads\r\n"
" p: for master_proc\r\n"
" a: for master_aio\t\n"
" g: for master_trigger\r\n"
" r: for master_rpc\r\n"
" u: for master_udp\r\n"
" f: for master_fiber\r\n"
" s: skip choose, try again\r\n");
printf(">");
fflush(stdout);
if (type == NULL || *type == 0) {
printf("choose master_service type:\r\n");
printf(" t: for master_threads\r\n"
" p: for master_proc\r\n"
" a: for master_aio\t\n"
" g: for master_trigger\r\n"
" r: for master_rpc\r\n"
" u: for master_udp\r\n"
" f: for master_fiber\r\n"
" o: for other service\r\n"
" s: skip choose, try again\r\n");
printf(">");
fflush(stdout);
n = acl_vstream_gets_nonl(ACL_VSTREAM_IN, buf, sizeof(buf));
if (n == ACL_VSTREAM_EOF) {
break;
} else if (strcasecmp(buf, "t") == 0) {
n = acl_vstream_gets_nonl(ACL_VSTREAM_IN, buf, sizeof(buf));
if (n == ACL_VSTREAM_EOF) {
break;
}
type = buf;
}
if (strcasecmp(type, "t") == 0) {
tmpl.create_common();
create_master_threads(tmpl);
break;
} else if (strcasecmp(buf, "p") == 0) {
} else if (strcasecmp(type, "p") == 0) {
tmpl.create_common();
create_master_proc(tmpl);
break;
} else if (strcasecmp(buf, "a") == 0) {
} else if (strcasecmp(type, "a") == 0) {
tmpl.create_common();
create_master_aio(tmpl);
break;
} else if (strcasecmp(buf, "r") == 0) {
} else if (strcasecmp(type, "r") == 0) {
tmpl.create_common();
create_master_rpc(tmpl);
break;
} else if (strcasecmp(buf, "g") == 0) {
} else if (strcasecmp(type, "g") == 0) {
tmpl.create_common();
create_master_trigger(tmpl);
break;
} else if (strcasecmp(buf, "u") == 0) {
} else if (strcasecmp(type, "u") == 0) {
tmpl.create_common();
create_master_udp(tmpl);
break;
} else if (strcasecmp(buf, "f") == 0) {
} else if (strcasecmp(type, "f") == 0) {
tmpl.create_common();
create_master_fiber(tmpl);
break;
} else if (strcasecmp(buf, "s") == 0) {
goto END;
} else if (strcasecmp(type, "o") == 0) {
tmpl.create_other();
break;
} else if (strcasecmp(type, "s") == 0) {
break;
} else {
printf("unknown ch: %s\r\n", buf);
printf("unknown ch: %s\r\n", type);
}
if (!loop) {
break;
}
}
END:
for (int i = 0; i < 78; i++) {
putchar('-');
}

View File

@ -1,3 +1,3 @@
#pragma once
void master_creator();
void master_creator(const char* name, const char* type);

View File

@ -0,0 +1,50 @@
service $<PROGRAM> {
# 进程是否禁止运行
master_disable = no
# 服务类型, 当为 none , 则表示由子进程自行监听服务地址
master_type = none
# 当子进程异常退出时如果该值非空则将子进程异常退出的消息通知该服务
# master_notify_addr = /opt/soft/acl-master/var/public/monitor.sock
# 程序异常时的邮件通知接收者
# master_notify_recipients = xxx@xxx.com
# 是否需要 chroot: n -- no, y -- yes
master_chroot = n
# 最大进程数
master_maxproc = 1
# 预启动进程数该值不得大于 master_maxproc
master_prefork = 1
# 进程程序名
master_command = {install_path}/sbin/redis-server
# 指定程序版本, 方便管理模块获取
master_version = 1.0.0-0
# 进程日志记录文件
master_log = {install_path}/var/log/$<PROGRAM>.log
# 子进程标准输出信息转存日志文件
master_stdout = {install_path}/var/log/$<PROGRAM>.stdout
# 子进程标准错误输出信息转存日志文件
master_stderr = {install_path}/var/log/$<PROGRAM>.stderr
# 进程启动参数, acl_master 会以此配置项做为参数来启动子进程
master_args =
# 如果该项非空, 则程序启动后将被切换至该目录
master_home = {install_path}
# 针对由 Golang 编写的服务, 可以帮助其切换用户运行身份
# master_owner = nobody
# 传递给服务子进程的环境变量, 可以通过 getenv("SERVICE_ENV") 获得此值
# master_env = logme:FALSE, priority:E_LOG_INFO, action:E_LOG_PER_DAY, flush:sync_flush, imit_size:512,\
# sync_action:E_LOG_SEM, sem_name:/tmp/ioctl_echo.sem
}

View File

@ -0,0 +1,190 @@
#!/bin/sh
###############################################################################
PATH=/bin:/usr/bin:/usr/sbin:/usr/etc:/sbin:/etc
tempdir="/tmp"
umask 022
function censored_ls() {
ls "$@" | egrep -v '^\.|/\.|CVS|RCS|SCCS|linux\.d|solaris\.d|hp_ux\.d|example'
}
function compare_or_replace() {
(cmp $2 $3 >/dev/null 2>&1 && echo Skipping $3...) || {
echo Updating $3...
rm -f $tempdir/junk || exit 1
cp $2 $tempdir/junk || exit 1
chmod $1 $tempdir/junk || exit 1
mv -f $tempdir/junk $3 || exit 1
chmod $1 $3 || exit 1
}
}
###############################################################################
RPATH=
function guess_os() {
os_name=`uname -s`
os_type=`uname -p`
case $os_name in
Linux)
case $os_type in
x86_64)
RPATH="linux64"
;;
i686)
RPATH="linux32"
;;
aarch64)
RPATH="aarch64"
;;
*)
echo "unknown OS - $os_name $os_type"
exit 1
;;
esac
;;
SunOS)
case $os_type in
i386)
RPATH="sunos_x86"
;;
*)
echo "unknown OS - $os_name $os_type"
exit 1
;;
esac
;;
FreeBSD)
RPATH="freebsd"
;;
Darwin)
RPATH="macos"
;;
*)
echo "unknown OS - $os_name $os_type"
exit 1
;;
esac
}
function create_path() {
test -d $1 || mkdir -p $1 || {
echo "can't mkdir $1"
exit 1
}
}
function copy_file() {
test -f $2 && {
compare_or_replace $1 $2 $3 || {
echo "copy file: $2 error"
exit 1
}
}
}
function install_file() {
rm -f $tempdir/junk2 || {
echo "can't remove file: $tempdir/junk2"
exit 1
}
test -f $2 && {
cat $2 | sed -e 's;{install_path};'$INSTALL_PATH';;' >$tempdir/junk2 || {
echo "can't create file: $tempdir/junk2"
exit 1
}
compare_or_replace $1 $tempdir/junk2 $3 || {
echo "can't move to file: $3"
exit 1
}
}
rm -f $tempdir/junk2 || {
echo "can't remove file: $tempdir/junk2"
exit 1
}
}
###############################################################################
INSTALL_PATH=
if [ $# -lt 1 ]
then
# echo "parameter not enougth($#)"
echo "usage:$0 install_path"
exit 1
fi
if [ $# -eq 2 ]
then
PREFIX_PATH=$1
INSTALL_PATH=$2
else
INSTALL_PATH=$1
PREFIX_PATH=
fi
case $INSTALL_PATH in
/*) ;;
no) ;;
*) echo Error: $INSTALL_PATH should be an absolute path name. 1>&2; exit 1;;
esac
echo Installing to $INSTALL_PATH...
CONF_PATH=$PREFIX_PATH$INSTALL_PATH/conf
VAR_PATH=$PREFIX_PATH$INSTALL_PATH/var
SERVICE_NAME=$<PROGRAM>
SERVICE_CONF=$CONF_PATH/$SERVICE_NAME.cf
###############################################################################
function create_all_path() {
create_path $INSTALL_PATH
create_path $CONF_PATH
create_path $VAR_PATH
create_path $VAR_PATH/log
create_path $VAR_PATH/pid
chmod 1777 $VAR_PATH/log
}
function copy_all_file() {
install_file a+x,go-wrx $SERVICE_NAME.cf $SERVICE_CONF
}
MASTER_PATH=/opt/soft/acl-master
MASTER_CONF=$MASTER_PATH/conf
MASTER_SERVICES=$MASTER_CONF/services.cf
MASTER_CTL=$MASTER_PATH/bin/master_ctl
function add_master_service() {
echo ""
if [ ! -d $MASTER_CONF ]; then
echo "$MASTER_CONF not exist!"
return
fi
if [ -f $MASTER_SERVICES ]; then
has=`cat $MASTER_SERVICES | grep $SERVICE_CONF | wc -l`
if [ $has != 0 ]; then
echo "Service for $SERVICE_CONF already in $MASTER_SERVICES!"
return
fi
fi
echo "$SERVICE_CONF" >> $MASTER_SERVICES
echo "Service added for $SERVICE_CONF"
if [ -f $MASTER_CTL ]; then
echo "Start your service by running:"
echo "$MASTER_CTL -f $SERVICE_CONF -a start"
fi
}
guess_os
create_all_path
copy_all_file
add_master_service
###############################################################################

View File

@ -3,6 +3,9 @@
#include "stdafx.h"
#include <stdio.h>
#if !defined(_WIN32) && !defined(_WIN64)
# include <getopt.h>
#endif
#include "master_creator.h"
#include "http_creator.h"
@ -10,27 +13,79 @@ static void create_db()
{
}
static int create_master_service(const char* name, const char* type)
{
master_creator(name, type);
return 0;
}
static int create_http_service(const char* name, const char* type)
{
http_creator(name, type);
return 0;
}
static void usage(const char* procname)
{
printf("usage: %s -h [help]\r\n"
" -n program_name\r\n"
" -t service_type [ t -> master_threads, p -> master_proc, f -> master_fiber, u -> master_udp, o -> master_other ]\r\n"
" -a application_type [ http | master, default: master ]\r\n"
, procname);
}
int main(int argc, char* argv[])
{
(void) argc, (void) argv;
int ch;
acl::string name, type, app("master");
while ((ch = getopt(argc, argv, "hn:t:a:")) > 0) {
switch (ch) {
case 'h':
usage(argv[0]);
return 0;
case 'n':
name = optarg;
break;
case 't':
type = optarg;
break;
case 'a':
app = optarg;
break;
default:
break;
}
}
acl::acl_cpp_init();
if (!name.empty() && !type.empty()) {
if (app == "master") {
return create_master_service(name, type);
} else if (app == "http") {
return create_http_service(name, type);
}
}
while (true) {
char buf[256];
printf("select one below:\r\n");
printf("m: master_service; d: db; h: http; q: exit\r\n");
printf(">"); fflush(stdout);
char buf[256];
int n = acl_vstream_gets_nonl(ACL_VSTREAM_IN, buf, sizeof(buf));
if (n == ACL_VSTREAM_EOF) {
break;
} else if (strcasecmp(buf, "m") == 0) {
master_creator();
}
if (strcasecmp(buf, "m") == 0) {
master_creator(NULL, NULL);
} else if (strcasecmp(buf, "d") == 0) {
create_db();
} else if (strcasecmp(buf, "h") == 0) {
http_creator();
http_creator(NULL, NULL);
} else if (strcasecmp(buf, "q") == 0) {
break;
} else {

View File

@ -114,20 +114,21 @@ install_file()
}
###############################################################################
INSTALL_PATH=
INSTALL_PATH=/opt/soft/acl-master
if [ $# -lt 1 ]
then
# echo "parameter not enougth($#)"
echo "usage:$0 install_path"
exit 1
# echo "usage:$0 install_path"
echo "Using the default path: $INSTALL_PATH"
fi
if [ $# -eq 2 ]
then
PREFIX_PATH=$1
INSTALL_PATH=$2
else
elif [ $# -gt 2 ]
then
INSTALL_PATH=$1
PREFIX_PATH=
fi

View File

@ -1,6 +1,10 @@
#ifndef ACL_DEFINE_UNIX_INCLUDE_H
#define ACL_DEFINE_UNIX_INCLUDE_H
#ifndef _GNU_SOURCE
# define _GNU_SOURCE
#endif
#include "acl_define_linux.h"
#include "acl_define_sunx86.h"
#include "acl_define_bsd.h"

View File

@ -86,12 +86,14 @@ int acl_write_wait_ms(ACL_SOCKET fd, int timeout)
}
if ((fds.revents & (POLLHUP | POLLERR))) {
/*
acl_msg_error("%s(%d), %s: %s, %s, %s, fd=%d",
__FILE__, __LINE__, myname,
acl_last_serror(),
fds.revents & POLLHUP ? "POLLHUP" : "0",
fds.revents & POLLERR ? "POLLERR" : "0",
fd);
*/
acl_set_error(ACL_ECONNREFUSED);
return -1;
}

View File

@ -6,6 +6,10 @@
#include <WinSock2.h>
#endif
#if __cplusplus >= 201103L
#include <memory>
#endif
namespace acl {
class socket_stream;
@ -13,9 +17,9 @@ class socket_stream;
enum {
OPEN_FLAG_NONE = 0,
OPEN_FLAG_NONBLOCK = 1, // 非阻塞模式
OPEN_FLAG_REUSEPORT = 1 << 1, // 端口复用,要求 Linux3.0 以上
OPEN_FLAG_FASTOPEN = 1 << 2, // 是否启用 Fast open实验阶段
OPEN_FLAG_EXCLUSIVE = 1 << 3, // 是否禁止复用地址
OPEN_FLAG_REUSEPORT = (1 << 1), // 端口复用,要求 Linux3.0 以上
OPEN_FLAG_FASTOPEN = (1 << 2), // 是否启用 Fast open实验阶段
OPEN_FLAG_EXCLUSIVE = (1 << 3), // 是否禁止复用地址
};
/**
@ -98,16 +102,26 @@ public:
* NULL
* @param etimed {bool*} NULL
* NULL true
* @return {socket_stream*}
* @return {socket_stream*} ,
* delete .
*/
socket_stream* accept(int timeout = -1, bool* etimed = NULL);
#if __cplusplus >= 201103L
// 使用 c++11 shared_ptr 方式获得客户端流对象, 更安全地使用流对象
using shared_stream = std::shared_ptr<socket_stream>;
shared_stream shared_accept(int timeout = -1, bool* etimed = NULL) {
shared_stream ss(accept(timeout, etimed));
return ss;
}
#endif
/**
*
* @return {const char*}
*/
const char* get_addr(void) const
{
const char* get_addr(void) const {
return addr_.c_str();
}
@ -116,11 +130,10 @@ public:
* @return {int}
*/
#if defined(_WIN32) || defined(_WIN64)
SOCKET sock_handle(void) const
SOCKET sock_handle(void) const {
#else
int sock_handle(void) const
int sock_handle(void) const {
#endif
{
return fd_;
}

View File

@ -47,6 +47,10 @@ typedef int socklen_t;
#else
# ifndef _GNU_SOURCE
# define _GNU_SOURCE
# endif
# include <errno.h>
# include <sys/types.h>
# include <sys/socket.h>
@ -79,7 +83,8 @@ typedef int socket_t;
# define FIBER_ECONNABORTED ECONNABORTED
# define FIBER_EINPROGRESS EINPROGRESS
# ifdef MSG_WAITFORONE
# include <sys/syscall.h>
# if defined(SYS_recvmmsg) && defined(SYS_sendmmsg)
# define HAS_MMSG
# endif
#endif

View File

@ -353,7 +353,9 @@ void event_del_read(EVENT *ev, FILE_EVENT *fe)
}
if (fe->mask & EVENT_READ) {
if (fe->me.parent == &fe->me) {
if (fe->mask & EVENT_DIRECT) {
(void) ev->del_read(ev, fe);
} else if (fe->me.parent == &fe->me) {
ring_prepend(&ev->events, &fe->me);
}
@ -370,7 +372,9 @@ void event_del_write(EVENT *ev, FILE_EVENT *fe)
}
if (fe->mask & EVENT_WRITE) {
if (fe->me.parent == &fe->me) {
if (fe->mask & EVENT_DIRECT) {
(void) ev->del_write(ev, fe);
} else if (fe->me.parent == &fe->me) {
ring_prepend(&ev->events, &fe->me);
}

View File

@ -197,7 +197,7 @@ struct FILE_EVENT {
event_proc *r_proc;
event_proc *w_proc;
#ifdef HAS_POLL
POLLFD *pfd;
RING pfds;
#endif
#ifdef HAS_EPOLL
EPOLL_CTX *epx;

View File

@ -136,9 +136,6 @@ void fiber_file_close(FILE_EVENT *fe);
FILE_EVENT *fiber_file_cache_get(socket_t fd);
void fiber_file_cache_put(FILE_EVENT *fe);
/* in hook/epoll.c */
int epoll_event_close(int epfd);
/* in fiber/fiber_unix.c, fiber/fiber_win.c */
ACL_FIBER *fiber_real_origin(void);
ACL_FIBER *fiber_real_alloc(const ACL_FIBER_ATTR *attr);

View File

@ -466,6 +466,8 @@ int fiber_wait_read(FILE_EVENT *fe)
acl_fiber_switch();
fe->fiber_r = NULL;
if (!(fe->type & TYPE_INTERNAL)) {
WAITER_DEC(__thread_fiber->event);
}
@ -509,6 +511,8 @@ int fiber_wait_write(FILE_EVENT *fe)
acl_fiber_switch();
fe->fiber_w = NULL;
if (!(fe->type & TYPE_INTERNAL)) {
WAITER_DEC(__thread_fiber->event);
}

View File

@ -19,7 +19,7 @@ void file_event_init(FILE_EVENT *fe, socket_t fd)
fe->w_proc = NULL;
#ifdef HAS_POLL
fe->pfd = NULL;
ring_init(&fe->pfds);
#endif
#ifdef HAS_IO_URING

View File

@ -10,14 +10,17 @@
/****************************************************************************/
/**
* One socket fd has one EPOLL_CTX in epoll mode witch was set in FILE_EVENT.
*/
struct EPOLL_CTX {
int fd;
int op;
int mask;
int rmask;
FILE_EVENT *fe;
EPOLL_EVENT *ee;
epoll_data_t data;
int fd; // The socket fd.
int op; // Same with the epoll_ctl's op.
int mask; // The event mask been set.
int rmask; // The result event mask.
FILE_EVENT *fe; // Refer to the FILE_EVENT with the socket fd.
EPOLL_EVENT *ee; // Refer to the fiber's EPOLL_EVENT.
epoll_data_t data; // Same as the system epoll_data_t.
};
/**
@ -29,10 +32,14 @@ struct EPOLL_CTX {
* |- socket EPOLL_CTX
* |- socket EPOLL_CTX
* |- ...
* Notice: one EPOLL_EVENT can only belong to one fiber, and one fiber can
* have only one EPOLL_EVENT; And EPOLL::ep_events holds multiple EPOLL_EVENT;
* One EPOLL object is bound with one epoll fd duplicated with the system
* epoll fd owned by the scheduling thread.
*/
struct EPOLL {
int epfd;
EPOLL_CTX **fds;
int epfd; // Duplicate the current thread's epoll fd.
EPOLL_CTX **fds; // Hold EPOLL_CTX objects of the epfd.
size_t nfds;
// Store all EPOLL_EVENT, every fiber should use its own EPOLL_EVENT,
@ -104,6 +111,9 @@ static EPOLL_EVENT *epoll_event_alloc(void)
/****************************************************************************/
static ARRAY *__main_epfds = NULL;
// Hold the EPOLL objects of one thread; And the EPOLL object was created in
// epoll_create/epoll_try_register as below.
static __thread ARRAY *__epfds = NULL;
static pthread_key_t __once_key;
@ -153,6 +163,7 @@ static void thread_init(void)
}
}
// Create one EPOLL for the specified epfd.
static EPOLL *epoll_alloc(int epfd)
{
EPOLL *ep;
@ -179,7 +190,7 @@ static EPOLL *epoll_alloc(int epfd)
}
}
ep = mem_malloc(sizeof(EPOLL));
ep = (EPOLL*) mem_malloc(sizeof(EPOLL));
array_append(__epfds, ep);
/* Duplicate the current thread's epoll fd, so we can assosiate the
@ -190,6 +201,7 @@ static EPOLL *epoll_alloc(int epfd)
ep->nfds = maxfd;
ep->fds = (EPOLL_CTX **) mem_malloc(maxfd * sizeof(EPOLL_CTX *));
for (i = 0; i < maxfd; i++) {
ep->fds[i] = NULL;
}
@ -198,6 +210,7 @@ static EPOLL *epoll_alloc(int epfd)
return ep;
}
// Maybe called by the fcntl API being hooked.
int epoll_try_register(int epfd)
{
int sys_epfd;
@ -258,7 +271,8 @@ static void epoll_free(EPOLL *ep)
mem_free(ep);
}
int epoll_event_close(int epfd)
// Close and free one EPOLL with the specified epfd.
int epoll_close(int epfd)
{
EVENT *ev;
int sys_epfd;
@ -303,9 +317,13 @@ int epoll_event_close(int epfd)
epoll_free(ep);
array_delete(__epfds, pos, NULL);
// Because the epfd was created by dup, so it should be closed by the
// system close API directly.
return (*sys_close)(epfd);
}
// Find the EPOLL_EVENT for the current fiber with the specified epfd, and
// new one will be created if not found and create is true.
static EPOLL_EVENT *epoll_event_find(int epfd, int create)
{
ACL_FIBER *curr = acl_fiber_running();
@ -320,6 +338,7 @@ static EPOLL_EVENT *epoll_event_find(int epfd, int create)
return NULL;
}
// First, we should find the EPOLL with the specified epfd.
foreach(iter, __epfds) {
EPOLL *tmp = (EPOLL *) iter.data;
if (tmp->epfd == epfd) {
@ -334,16 +353,22 @@ static EPOLL_EVENT *epoll_event_find(int epfd, int create)
return NULL;
}
// Then, trying to find the EPOLL_EVENT of the current fiber.
SNPRINTF(key, sizeof(key), "%u", curr->fid);
ee = (EPOLL_EVENT *) htable_find(ep->ep_events, key);
if (ee != NULL) {
ee->epoll = ep;
return ee;
}
// If not found, create one if needed by the caller or else return NULL.
if (create) {
ee = epoll_event_alloc();
ee->epoll = ep;
// Store the current fiber's EPOLL_EVENT into the htable with
// the key associated withe fiber's ID.
htable_enter(ep->ep_events, key, ee);
return ee;
@ -354,6 +379,7 @@ static EPOLL_EVENT *epoll_event_find(int epfd, int create)
/****************************************************************************/
// Hook the system API to create one epoll fd.
int epoll_create(int size fiber_unused)
{
EVENT *ev;
@ -386,6 +412,7 @@ int epoll_create(int size fiber_unused)
}
#ifdef EPOLL_CLOEXEC
// Hook the system API.
int epoll_create1(int flags)
{
int epfd = epoll_create(100);
@ -407,6 +434,7 @@ static void read_callback(EVENT *ev, FILE_EVENT *fe)
EPOLL *ep;
assert(epx);
assert(epx->fd == fe->fd);
assert(epx->mask & EVENT_READ);
ee = epx->ee;
@ -423,9 +451,14 @@ static void read_callback(EVENT *ev, FILE_EVENT *fe)
return;
}
assert(ep->fds[epx->fd] == epx);
// Save the fd IO event's result to the result receiver been set in
// epoll_wait as below.
ee->events[ee->nready].events |= EPOLLIN;
memcpy(&ee->events[ee->nready].data, &ep->fds[epx->fd]->data,
sizeof(ep->fds[epx->fd]->data));
// Restore the data been set in epoll_ctl_add.
memcpy(&ee->events[ee->nready].data, &epx->data, sizeof(epx->data));
if (ee->nready == 0) {
timer_cache_remove(ev->epoll_list, ee->expire, &ee->me);
@ -446,6 +479,7 @@ static void write_callback(EVENT *ev fiber_unused, FILE_EVENT *fe)
EPOLL *ep;
assert(epx);
assert(epx->fd == fe->fd);
assert(epx->mask & EVENT_WRITE);
ee = epx->ee;
@ -458,9 +492,10 @@ static void write_callback(EVENT *ev fiber_unused, FILE_EVENT *fe)
return;
}
assert(ep->fds[epx->fd] == epx);
ee->events[ee->nready].events |= EPOLLOUT;
memcpy(&ee->events[ee->nready].data, &ep->fds[epx->fd]->data,
sizeof(ep->fds[epx->fd]->data));
memcpy(&ee->events[ee->nready].data, &epx->data, sizeof(epx->data));
if (ee->nready == 0) {
timer_cache_remove(ev->epoll_list, ee->expire, &ee->me);
@ -478,61 +513,68 @@ static void epoll_ctl_add(EVENT *ev, EPOLL_EVENT *ee,
struct epoll_event *event, int fd, int op)
{
EPOLL *ep = ee->epoll;
EPOLL_CTX *epx = ep->fds[fd];
if (ep->fds[fd] == NULL) {
ep->fds[fd] = (EPOLL_CTX *) mem_malloc(sizeof(EPOLL_CTX));
if (epx == NULL) {
epx = ep->fds[fd] = (EPOLL_CTX *) mem_malloc(sizeof(EPOLL_CTX));
}
ep->fds[fd]->fd = fd;
ep->fds[fd]->op = op;
ep->fds[fd]->mask = EVENT_NONE;
ep->fds[fd]->rmask = EVENT_NONE;
ep->fds[fd]->ee = ee;
epx->fd = fd;
epx->op = op;
epx->mask = EVENT_NONE;
epx->rmask = EVENT_NONE;
epx->ee = ee;
memcpy(&ep->fds[fd]->data, &event->data, sizeof(event->data));
// Save the fd's context in epx bound with the fd.
memcpy(&epx->data, &event->data, sizeof(event->data));
if (event->events & EPOLLIN) {
ep->fds[fd]->mask |= EVENT_READ;
ep->fds[fd]->fe = fiber_file_open_read(fd);
ep->fds[fd]->fe->epx = ep->fds[fd];
epx->mask |= EVENT_READ;
epx->fe = fiber_file_open_read(fd);
epx->fe->epx = epx;
event_add_read(ev, ep->fds[fd]->fe, read_callback);
SET_READWAIT(ep->fds[fd]->fe);
event_add_read(ev, epx->fe, read_callback);
SET_READWAIT(epx->fe);
}
if (event->events & EPOLLOUT) {
ep->fds[fd]->mask |= EVENT_WRITE;
ep->fds[fd]->fe = fiber_file_open_write(fd);
ep->fds[fd]->fe->epx = ep->fds[fd];
epx->mask |= EVENT_WRITE;
epx->fe = fiber_file_open_write(fd);
epx->fe->epx = epx;
event_add_write(ev, ep->fds[fd]->fe, write_callback);
SET_WRITEWAIT(ep->fds[fd]->fe);
event_add_write(ev, epx->fe, write_callback);
SET_WRITEWAIT(epx->fe);
}
}
static void epoll_ctl_del(EVENT *ev, EPOLL_EVENT *ee, int fd)
{
EPOLL *ep = ee->epoll;
EPOLL_CTX *epx = ep->fds[fd];
if (ep->fds[fd]->mask & EVENT_READ) {
event_del_read(ev, ep->fds[fd]->fe);
CLR_READWAIT(ep->fds[fd]->fe);
assert(epx);
if (epx->mask & EVENT_READ) {
assert(epx->fe);
event_del_read(ev, epx->fe);
CLR_READWAIT(epx->fe);
}
if (ep->fds[fd]->mask & EVENT_WRITE) {
event_del_write(ev, ep->fds[fd]->fe);
CLR_WRITEWAIT(ep->fds[fd]->fe);
if (epx->mask & EVENT_WRITE) {
assert(epx->fe);
event_del_write(ev, epx->fe);
CLR_WRITEWAIT(epx->fe);
}
ep->fds[fd]->fd = -1;
ep->fds[fd]->op = 0;
ep->fds[fd]->mask = EVENT_NONE;
ep->fds[fd]->rmask = EVENT_NONE;
ep->fds[fd]->fe->epx = NULL;
ep->fds[fd]->fe = NULL;
memset(&ep->fds[fd]->data, 0, sizeof(ep->fds[fd]->data));
epx->fd = -1;
epx->op = 0;
epx->mask = EVENT_NONE;
epx->rmask = EVENT_NONE;
epx->fe->epx = NULL;
epx->fe = NULL;
memset(&epx->data, 0, sizeof(epx->data));
mem_free(ep->fds[fd]);
mem_free(epx);
ep->fds[fd] = NULL;
}
@ -549,6 +591,7 @@ int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
return sys_epoll_ctl ? (*sys_epoll_ctl)(epfd, op, fd, event) : -1;
}
// Get the fiber's EPOLL_EVENT with the specified epfd.
ee = epoll_event_find(epfd, 1);
if (ee == NULL) {
msg_error("%s(%d), %s: not exist epfd=%d",

View File

@ -224,78 +224,85 @@ static void hook_api(void)
#ifdef SYS_UNIX
# ifdef MINGW
# define LOAD_FN(name, type, fn, fp) do { \
# define LOAD_FN(name, type, fn, fp, fatal) do { \
(fn) = (type) dlsym(RTLD_DEFAULT, name); \
assert((fn)); \
if ((fn) == NULL) { \
assert((fatal) != 1); \
} \
(fp) = &(fn); \
} while (0)
# else
# define LOAD_FN(name, type, fn, fp) do { \
# define LOAD_FN(name, type, fn, fp, fatal) do { \
(fn) = (type) dlsym(RTLD_NEXT, name); \
assert((fn)); \
if ((fn) == NULL) { \
const char* e = dlerror(); \
printf("%s(%d): name=%s not found: %s\r\n", \
__FUNCTION__, __LINE__, name, e ? e : "unknown"); \
assert((fatal) != 1); \
} \
(fp) = &(fn); \
} while (0)
# endif
LOAD_FN("socket", socket_fn, __sys_socket, sys_socket);
LOAD_FN("close", close_fn, __sys_close, sys_close);
LOAD_FN("listen", listen_fn, __sys_listen, sys_listen);
LOAD_FN("accept", accept_fn, __sys_accept, sys_accept);
LOAD_FN("connect", connect_fn, __sys_connect, sys_connect);
LOAD_FN("setsockopt", setsockopt_fn, __sys_setsockopt, sys_setsockopt);
LOAD_FN("sleep", sleep_fn, __sys_sleep, sys_sleep);
LOAD_FN("fcntl", fcntl_fn, __sys_fcntl, sys_fcntl);
LOAD_FN("read", read_fn, __sys_read, sys_read);
LOAD_FN("readv", readv_fn, __sys_readv, sys_readv);
LOAD_FN("recv", recv_fn, __sys_recv, sys_recv);
LOAD_FN("recvfrom", recvfrom_fn, __sys_recvfrom, sys_recvfrom);
LOAD_FN("recvmsg", recvmsg_fn, __sys_recvmsg, sys_recvmsg);
LOAD_FN("write", write_fn, __sys_write, sys_write);
LOAD_FN("writev", writev_fn, __sys_writev, sys_writev);
LOAD_FN("send", send_fn, __sys_send, sys_send);
LOAD_FN("sendto", sendto_fn, __sys_sendto, sys_sendto);
LOAD_FN("sendmsg", sendmsg_fn, __sys_sendmsg, sys_sendmsg);
LOAD_FN("socket", socket_fn, __sys_socket, sys_socket, 1);
LOAD_FN("close", close_fn, __sys_close, sys_close, 1);
LOAD_FN("listen", listen_fn, __sys_listen, sys_listen, 1);
LOAD_FN("accept", accept_fn, __sys_accept, sys_accept, 1);
LOAD_FN("connect", connect_fn, __sys_connect, sys_connect, 1);
LOAD_FN("setsockopt", setsockopt_fn, __sys_setsockopt, sys_setsockopt, 1);
LOAD_FN("sleep", sleep_fn, __sys_sleep, sys_sleep, 1);
LOAD_FN("fcntl", fcntl_fn, __sys_fcntl, sys_fcntl, 1);
LOAD_FN("read", read_fn, __sys_read, sys_read, 1);
LOAD_FN("readv", readv_fn, __sys_readv, sys_readv, 1);
LOAD_FN("recv", recv_fn, __sys_recv, sys_recv, 1);
LOAD_FN("recvfrom", recvfrom_fn, __sys_recvfrom, sys_recvfrom, 1);
LOAD_FN("recvmsg", recvmsg_fn, __sys_recvmsg, sys_recvmsg, 1);
LOAD_FN("write", write_fn, __sys_write, sys_write, 1);
LOAD_FN("writev", writev_fn, __sys_writev, sys_writev, 1);
LOAD_FN("send", send_fn, __sys_send, sys_send, 1);
LOAD_FN("sendto", sendto_fn, __sys_sendto, sys_sendto, 1);
LOAD_FN("sendmsg", sendmsg_fn, __sys_sendmsg, sys_sendmsg, 1);
# ifdef HAS_MMSG
LOAD_FN("recvmmsg", recvmmsg_fn, __sys_recvmmsg, sys_recvmmsg);
LOAD_FN("sendmmsg", sendmmsg_fn, __sys_sendmmsg, sys_sendmmsg);
#endif
LOAD_FN("recvmmsg", recvmmsg_fn, __sys_recvmmsg, sys_recvmmsg, 0);
LOAD_FN("sendmmsg", sendmmsg_fn, __sys_sendmmsg, sys_sendmmsg, 0);
# endif
# ifdef __USE_LARGEFILE64
LOAD_FN("sendfile64", sendfile64_fn, __sys_sendfile64, sys_sendfile64);
LOAD_FN("sendfile64", sendfile64_fn, __sys_sendfile64, sys_sendfile64, 0);
# endif
LOAD_FN("pread", pread_fn, __sys_pread, sys_pread);
LOAD_FN("pwrite", pwrite_fn, __sys_pwrite, sys_pwrite);
LOAD_FN("poll", poll_fn, __sys_poll, sys_poll);
LOAD_FN("select", select_fn, __sys_select, sys_select);
LOAD_FN("pread", pread_fn, __sys_pread, sys_pread, 1);
LOAD_FN("pwrite", pwrite_fn, __sys_pwrite, sys_pwrite, 1);
LOAD_FN("poll", poll_fn, __sys_poll, sys_poll, 1);
LOAD_FN("select", select_fn, __sys_select, sys_select, 1);
# ifdef HAS_EPOLL
LOAD_FN("epoll_create", epoll_create_fn, __sys_epoll_create, sys_epoll_create);
LOAD_FN("epoll_create", epoll_create_fn, __sys_epoll_create, sys_epoll_create, 1);
LOAD_FN("epoll_wait", epoll_wait_fn, __sys_epoll_wait, sys_epoll_wait);
LOAD_FN("epoll_wait", epoll_wait_fn, __sys_epoll_wait, sys_epoll_wait, 1);
LOAD_FN("epoll_ctl", epoll_ctl_fn, __sys_epoll_ctl, sys_epoll_ctl);
LOAD_FN("epoll_ctl", epoll_ctl_fn, __sys_epoll_ctl, sys_epoll_ctl, 1);
# endif // HAS_EPOLL
# ifdef HAS_IO_URING
LOAD_FN("openat", openat_fn, __sys_openat, sys_openat);
LOAD_FN("unlink", unlink_fn, __sys_unlink, sys_unlink);
LOAD_FN("openat", openat_fn, __sys_openat, sys_openat, 1);
LOAD_FN("unlink", unlink_fn, __sys_unlink, sys_unlink, 1);
# ifdef HAS_STATX
LOAD_FN("statx", statx_fn, __sys_statx, sys_statx);
LOAD_FN("statx", statx_fn, __sys_statx, sys_statx, 1);
# endif
# ifdef HAS_RENAMEAT2
LOAD_FN("renameat2", renameat2_fn, __sys_renameat2, sys_renameat2);
LOAD_FN("renameat2", renameat2_fn, __sys_renameat2, sys_renameat2, 1);
# endif
LOAD_FN("mkdirat", mkdirat_fn, __sys_mkdirat, sys_mkdirat);
LOAD_FN("splice", splice_fn, __sys_splice, sys_splice);
LOAD_FN("mkdirat", mkdirat_fn, __sys_mkdirat, sys_mkdirat, 1);
LOAD_FN("splice", splice_fn, __sys_splice, sys_splice, 1);
# endif
LOAD_FN("getaddrinfo", getaddrinfo_fn, __sys_getaddrinfo, sys_getaddrinfo);
LOAD_FN("freeaddrinfo", freeaddrinfo_fn, __sys_freeaddrinfo, sys_freeaddrinfo);
LOAD_FN("gethostbyname", gethostbyname_fn, __sys_gethostbyname, sys_gethostbyname);
LOAD_FN("getaddrinfo", getaddrinfo_fn, __sys_getaddrinfo, sys_getaddrinfo, 1);
LOAD_FN("freeaddrinfo", freeaddrinfo_fn, __sys_freeaddrinfo, sys_freeaddrinfo, 1);
LOAD_FN("gethostbyname", gethostbyname_fn, __sys_gethostbyname, sys_gethostbyname, 1);
# ifndef __APPLE__
LOAD_FN("gethostbyname_r", gethostbyname_r_fn, __sys_gethostbyname_r, sys_gethostbyname_r);
LOAD_FN("gethostbyname_r", gethostbyname_r_fn, __sys_gethostbyname_r, sys_gethostbyname_r, 1);
# endif
#elif defined(SYS_WIN)
__sys_socket = socket;

View File

@ -200,6 +200,7 @@ void hook_once(void);
#if defined(__linux__)
// in epoll.c
int epoll_try_register(int epfd);
int epoll_close(int epfd);
#endif
#ifdef __cplusplus

View File

@ -64,11 +64,11 @@ int WINAPI acl_fiber_close(socket_t fd)
}
#ifdef HAS_EPOLL
/* when the fd was closed by epoll_event_close normally, the fd
/* when the fd was closed by epoll_close normally, the fd
* must be a epoll fd which was created by epoll_create function
* hooked in hook_net.c
*/
if (epoll_event_close(fd) == 0) {
if (epoll_close(fd) == 0) {
return 0;
}
#endif
@ -271,6 +271,11 @@ int acl_fiber_recvmmsg(int sockfd, struct mmsghdr *msgvec, unsigned int vlen,
hook_once();
}
if (sys_recvmmsg == NULL) {
printf("%s(%d): sys_recvmmsg NULL\r\n", __FUNCTION__, __LINE__);
return -1;
}
if (!var_hook_sys_api) {
return (*sys_recvmmsg)(sockfd, msgvec, vlen, flags, timeout);
}
@ -412,6 +417,11 @@ int acl_fiber_sendmmsg(int sockfd, struct mmsghdr *msgvec, unsigned int vlen,
hook_once();
}
if (sys_sendmmsg == NULL) {
printf("%s(%d): sys_sendmmsg NULL\r\n", __FUNCTION__, __LINE__);
return -1;
}
if (!var_hook_sys_api) {
return (*sys_sendmmsg)(sockfd, msgvec, vlen, flags);
}

View File

@ -8,6 +8,7 @@
#ifdef HAS_POLL
struct POLLFD {
RING me;
FILE_EVENT *fe;
POLL_EVENT *pe;
struct pollfd *pfd;
@ -23,21 +24,10 @@ struct POLLFD {
* and POLLIN flag will be set in the specified FILE_EVENT that will be used
* by the application called acl_fiber_poll().
*/
static void read_callback(EVENT *ev, FILE_EVENT *fe)
static void handle_poll_read(EVENT *ev, FILE_EVENT *fe, POLLFD *pfd)
{
POLLFD *pfd = fe->pfd;
/* In iocp mode on windows, the pfd maybe be set NULL when more
* overlapped events happened by IO or poll events.
*/
if (pfd == NULL) {
return;
}
assert(pfd->pfd->events & POLLIN);
event_del_read(ev, fe);
pfd->pfd->revents |= POLLIN;
if (fe->mask & EVENT_ERR) {
@ -51,7 +41,7 @@ static void read_callback(EVENT *ev, FILE_EVENT *fe)
}
if (!(pfd->pfd->events & POLLOUT)) {
fe->pfd = NULL;
ring_detach(&pfd->me);
pfd->fe = NULL;
}
@ -73,24 +63,46 @@ static void read_callback(EVENT *ev, FILE_EVENT *fe)
}
pfd->pe->nready++;
}
static void read_callback(EVENT *ev, FILE_EVENT *fe)
{
POLLFD *pfd;
//RING_ITER iter;
RING *iter = fe->pfds.succ, *next = iter;
event_del_read(ev, fe);
SET_READABLE(fe);
#if 1
// Walk througth the RING list, handle each poll event, and one RING
// node maybe be detached after it has been handled without any poll
// event bound with it again.
for (; iter != &fe->pfds; iter = next) {
next = next->succ;
pfd = ring_to_appl(iter, POLLFD, me);
if (pfd->pfd->events & POLLIN) {
handle_poll_read(ev, fe, pfd);
}
}
#else
ring_foreach(iter, &fe->pfds) {
pfd = ring_to_appl(iter.ptr, POLLFD, me);
if (pfd->pfd->events & POLLIN) {
handle_poll_read(ev, fe, pfd);
break;
}
}
#endif
}
/**
* Similiar to read_callback except that the POLLOUT flag will be set in it.
*/
static void write_callback(EVENT *ev, FILE_EVENT *fe)
static void handle_poll_write(EVENT *ev, FILE_EVENT *fe, POLLFD *pfd)
{
POLLFD *pfd = fe->pfd;
if (pfd == NULL) {
return;
}
assert(pfd->pfd->events & POLLOUT);
event_del_write(ev, fe);
pfd->pfd->revents |= POLLOUT;
if (fe->mask & EVENT_ERR) {
@ -104,7 +116,7 @@ static void write_callback(EVENT *ev, FILE_EVENT *fe)
}
if (!(pfd->pfd->events & POLLIN)) {
fe->pfd = NULL;
ring_detach(&pfd->me);
pfd->fe = NULL;
}
@ -117,7 +129,34 @@ static void write_callback(EVENT *ev, FILE_EVENT *fe)
}
pfd->pe->nready++;
}
static void write_callback(EVENT *ev, FILE_EVENT *fe)
{
POLLFD *pfd;
//RING_ITER iter;
RING *iter = fe->pfds.succ, *next = iter;
event_del_write(ev, fe);
SET_WRITABLE(fe);
#if 1
for (; iter != &fe->pfds; iter = next) {
next = next->succ;
pfd = ring_to_appl(iter, POLLFD, me);
if (pfd->pfd->events & POLLOUT) {
handle_poll_write(ev, fe, pfd);
}
}
#else
ring_foreach(iter, &fe->pfds) {
pfd = ring_to_appl(iter.ptr, POLLFD, me);
if (pfd->pfd->events & POLLOUT) {
handle_poll_write(ev, fe, pfd);
break;
}
}
#endif
}
/**
@ -150,7 +189,8 @@ static void poll_event_set(EVENT *ev, POLL_EVENT *pe, int timeout)
SET_WRITEWAIT(pfd->fe);
}
pfd->fe->pfd = pfd;
// Append the POLLFD to the FILE_EVENT's list.
ring_prepend(&pfd->fe->pfds, &pfd->me);
pfd->pfd->revents = 0;
// Add one reference to avoid fe being freeed in advanced.
@ -174,7 +214,7 @@ static void poll_event_clean(EVENT *ev, POLL_EVENT *pe)
for (i = 0; i < pe->nfds; i++) {
POLLFD *pfd = &pe->fds[i];
/* maybe has been cleaned in read_callback/write_callback */
/* Maybe has been cleaned in read_callback/write_callback. */
if (pfd->fe == NULL) {
continue;
}
@ -187,6 +227,7 @@ static void poll_event_clean(EVENT *ev, POLL_EVENT *pe)
pfd->fe->mask &= ~EVENT_POLLIN;
#endif
event_del_read(ev, pfd->fe);
pfd->fe->fiber_r = NULL;
}
if (pfd->pfd->events & POLLOUT) {
CLR_WRITEWAIT(pfd->fe);
@ -194,13 +235,15 @@ static void poll_event_clean(EVENT *ev, POLL_EVENT *pe)
pfd->fe->mask &= ~EVENT_POLLOUT;
#endif
event_del_write(ev, pfd->fe);
pfd->fe->fiber_w = NULL;
}
pfd->fe->pfd = NULL;
// Unrefer the fe because we don't need it again.
//file_event_unrefer(pfd->fe);
pfd->fe = NULL;
// Remove the POLLFD from the FILE_EVENT.
ring_detach(&pfd->me);
pfd->fe = NULL;
}
}
@ -234,6 +277,7 @@ static POLLFD *pollfd_alloc(POLL_EVENT *pe, struct pollfd *fds, nfds_t nfds)
pfds[i].pe = pe;
pfds[i].pfd = &fds[i];
pfds[i].pfd->revents = 0;
ring_init(&pfds[i].me);
SET_POLLING(pfds[i].fe);
}
@ -318,20 +362,19 @@ int WINAPI acl_fiber_poll(struct pollfd *fds, nfds_t nfds, int timeout)
return sys_poll ? (*sys_poll)(fds, nfds, timeout) : -1;
}
curr = acl_fiber_running();
curr = acl_fiber_running();
ev = fiber_io_event();
old_timeout = ev->timeout;
#ifdef SHARE_STACK
if (curr->oflag & ACL_FIBER_ATTR_SHARE_STACK) {
pfds = pollfds_save(fds, nfds);
pe = (POLL_EVENT *) mem_malloc(sizeof(POLL_EVENT));
pe->fds = pollfd_alloc(pe, pfds->fds, nfds);
pfds = pollfds_save(fds, nfds);
pe = (POLL_EVENT *) mem_malloc(sizeof(POLL_EVENT));
pe->fds = pollfd_alloc(pe, pfds->fds, nfds);
} else {
pfds = NULL;
pe = &pevent;
pe->fds = pollfd_alloc(pe, fds, nfds);
pfds = NULL;
pe = &pevent;
pe->fds = pollfd_alloc(pe, fds, nfds);
}
#else
pe = &pevent;

View File

@ -133,6 +133,13 @@ public:
*/
static void stdout_open(bool on);
/**
*
* @param max {int} >= 0
* @return {int}
*/
static int set_fdlimit(int max);
/**
*
* schedule schedule_with

View File

@ -8,13 +8,13 @@ struct ACL_FIBER_SEM;
namespace acl {
typedef enum {
fiber_sem_t_def = 0,
fiber_sem_t_sync = 0,
fiber_sem_t_async = (1 << 0),
} fiber_sem_attr_t;
class FIBER_CPP_API fiber_sem {
public:
fiber_sem(int max, fiber_sem_attr_t attr = fiber_sem_t_def);
fiber_sem(int max, fiber_sem_attr_t attr = fiber_sem_t_async);
~fiber_sem(void);
int wait(void);
@ -50,7 +50,7 @@ template<typename T>
class fiber_sbox {
public:
fiber_sbox(bool free_obj = true, bool async = true)
: sem_(0, async ? fiber_sem_t_async : fiber_sem_t_def)
: sem_(0, async ? fiber_sem_t_async : fiber_sem_t_sync)
, free_obj_(free_obj) {}
~fiber_sbox(void) { clear(free_obj_); }
@ -104,10 +104,29 @@ template<typename T>
class fiber_sbox2 {
public:
fiber_sbox2(bool async = true)
: sem_(0, async ? fiber_sem_t_async : fiber_sem_t_def) {}
: sem_(0, async ? fiber_sem_t_async : fiber_sem_t_sync) {}
~fiber_sbox2(void) {}
#if __cplusplus >= 201103L // Support c++11 ?
void push(T t) {
sbox_.emplace_back(std::move(t));
sem_.post();
}
bool pop(T& t) {
if (sem_.wait() < 0) {
return false;
}
t = std::move(sbox_.front());
sbox_.pop_front();
return true;
}
#else
void push(T t) {
sbox_.push_back(t);
sem_.post();
@ -123,6 +142,8 @@ public:
return true;
}
#endif
size_t size(void) const {
return sem_.num();
}

View File

@ -5,6 +5,13 @@
#include "fiber.hpp"
#include "fiber_tbox.hpp"
// __cplusplus:
// 199711L (C++98 or C++03)
// 201103L (C++11)
// 201402L (C++14)
// 201703L (C++17)
// 202002L (C++20)
#if __cplusplus >= 201103L // Support c++11 ?
struct ACL_FIBER;

View File

@ -308,6 +308,11 @@ void fiber::stdout_open(bool on)
acl_fiber_msg_stdout_enable(on ? 1 : 0);
}
int fiber::set_fdlimit(int max)
{
return acl_fiber_set_fdlimit(max);
}
ACL_FIBER* fiber::fiber_create(void (*fn)(ACL_FIBER*, void*), void* ctx,
size_t stack_size, bool share_stack /* false */)
{

View File

@ -1,68 +1,22 @@
#include "stdafx.h"
#include <memory>
#include <atomic>
class client_socket {
public:
client_socket(acl::socket_stream* conn, std::atomic<long>& nusers)
: conn_(conn), nusers_(nusers) {}
~client_socket(void) {
printf("delete conn=%p\r\n", conn_);
--nusers_;
delete conn_;
}
acl::socket_stream& get_conn(void) {
return *conn_;
}
private:
acl::socket_stream* conn_;
std::atomic<long>& nusers_;
};
using shared_client = std::shared_ptr<client_socket>;
class message {
public:
message(shared_client client, std::atomic<long>& nmsgs,
const char* buf, size_t len)
: client_(client), nmsgs_(nmsgs), buf_(buf, len) {}
~message(void) { --nmsgs_; }
const std::string& get_data(void) const {
return buf_;
}
shared_client get_client(void) {
return client_;
}
private:
shared_client client_;
std::atomic<long>& nmsgs_;
std::string buf_;
};
using shared_message = std::shared_ptr<message>;
#include "server_pool.h"
static void usage(const char* procname) {
printf("usage: %s -h[help]\r\n"
" -s ip:port, default: 127.0.0.1:9001\r\n"
" -c fiber_pool_count [default: 100] \r\n"
" -r timeout\r\n"
" -r read_timeout\r\n"
" -w write_timeout\r\n"
" -S [if in sync mode, default: false]\r\n"
, procname);
}
int main(int argc, char* argv[]) {
acl::string addr("127.0.0.1:9001");
bool sync = false;
int ch, nfibers = 100;
bool sync = false, use_unique = false;
int ch, nfibers = 100, rtimeo = -1, wtimeo = -1;
while ((ch = getopt(argc, argv, "hs:Sc:r:")) > 0) {
while ((ch = getopt(argc, argv, "hs:Sc:r:w:U")) > 0) {
switch (ch) {
case 'h':
usage(argv[0]);
@ -76,6 +30,15 @@ int main(int argc, char* argv[]) {
case 'c':
nfibers = atoi(optarg);
break;
case 'U':
use_unique = true;
break;
case 'r':
rtimeo = atoi(optarg);
break;
case 'w':
wtimeo = atoi(optarg);
break;
default:
break;
}
@ -84,79 +47,15 @@ int main(int argc, char* argv[]) {
acl::fiber::stdout_open(true);
acl::log::stdout_open(true);
acl::server_socket ss;
if (!ss.open(addr)) {
printf("listen %s error %s\r\n", addr.c_str(), acl::last_serror());
return 1;
if (use_unique) {
#if __cplusplus >= 201402L
server_pool2_run(addr, sync, nfibers);
#else
std::cout << "unique_ptr should be used for c++14" << std::endl;
#endif
} else {
server_pool_run(addr, sync, nfibers, rtimeo, wtimeo);
}
printf("listen on %s, fiber pool: %d\r\n", addr.c_str(), nfibers);
acl::fiber_sbox2<shared_message> box;
for (int i = 0; i < nfibers; i++) {
go[&box] {
while (true) {
shared_message msg;
if (!box.pop(msg)) {
printf("POP end!\r\n");
break;
}
auto client = msg->get_client();
auto data = msg->get_data();
if (client->get_conn().write(data.c_str(), data.size()) == -1) {
printf("write error: %s\r\n", acl::last_serror());
break;
}
}
};
}
std::atomic<long> nusers(0), nmsgs(0);
go[&nusers, &nmsgs] {
while (true) {
std::cout << "client count: " << nusers << "; message count: " << nmsgs << std::endl;
::sleep(1);
}
};
go[&ss, &box, &nusers, &nmsgs, sync] {
while (true) {
auto conn = ss.accept();
if (conn == NULL) {
printf("accept error %s\r\n", acl::last_serror());
break;
}
++nusers;
auto client = std::make_shared<client_socket>(conn, nusers);
go[&box, &nmsgs, client, sync] {
char buf[4096];
while (true) {
int ret = client->get_conn().read(buf, sizeof(buf), false);
if (ret <= 0) {
break;
}
if (sync) {
if (client->get_conn().write(buf, ret) != ret) {
break;
} else {
continue;
}
}
++nmsgs;
auto msg = std::make_shared<message>(client, nmsgs, buf, ret);
box.push(msg);
}
};
}
};
acl::fiber::schedule();
return 0;
}

View File

@ -0,0 +1,152 @@
#include "stdafx.h"
#include <memory>
#include <atomic>
#include "server_pool.h"
static int __rtimeo = 0, __wtimeo = 0;
using shared_stream = std::shared_ptr<acl::socket_stream>;
class socket_client {
public:
socket_client(shared_stream conn, std::atomic<long>& nusers)
: conn_(conn), nusers_(nusers) {}
~socket_client(void) {
--nusers_;
}
acl::socket_stream& get_conn(void) {
return *conn_;
}
private:
shared_stream conn_;
std::atomic<long>& nusers_;
};
using shared_client = std::shared_ptr<socket_client>;
class message {
public:
message(shared_client client, std::atomic<long>& nmsgs,
const char* buf, size_t len)
: client_(client), nmsgs_(nmsgs), buf_(buf, len) {}
~message(void) { --nmsgs_; }
const std::string& get_data(void) const {
return buf_;
}
shared_client get_client(void) {
return client_;
}
private:
shared_client client_;
std::atomic<long>& nmsgs_;
std::string buf_;
};
using shared_message = std::shared_ptr<message>;
void server_pool_run(const char* addr, bool sync, int nfibers,
int rtimeo, int wtimeo) {
acl::server_socket ss;
if (!ss.open(addr)) {
printf("listen %s error %s\r\n", addr, acl::last_serror());
return;
}
__rtimeo = rtimeo;
__wtimeo = wtimeo;
printf("listen on %s, fiber pool: %d\r\n", addr, nfibers);
acl::fiber_sbox2<shared_message> box;
for (int i = 0; i < nfibers; i++) {
go[&box] {
while (true) {
shared_message msg;
if (!box.pop(msg)) {
printf("POP end!\r\n");
break;
}
auto client = msg->get_client();
auto data = msg->get_data();
if (__wtimeo > 0) {
int fd = client->get_conn().sock_handle();
if (acl_write_wait(fd , __wtimeo) < 0) {
printf("write wait error\r\n");
break;
}
}
if (client->get_conn().write(data.c_str(), data.size()) == -1) {
printf("write error: %s\r\n", acl::last_serror());
break;
}
}
};
}
std::atomic<long> nusers(0), nmsgs(0);
#if 0
go[&nusers, &nmsgs] {
while (true) {
std::cout << "client count: " << nusers
<< "; message count: " << nmsgs << std::endl;
::sleep(1);
}
};
#endif
go[&ss, &box, &nusers, &nmsgs, sync] {
while (true) {
auto conn = ss.shared_accept();
if (conn.get() == NULL) {
printf("accept error %s\r\n", acl::last_serror());
break;
}
++nusers;
auto client = std::make_shared<socket_client>(conn, nusers);
go[&box, &nmsgs, client, sync] {
char buf[4096];
while (true) {
if (__rtimeo > 0) {
int fd = client->get_conn().sock_handle();
if (acl_read_wait(fd, __rtimeo) < 0) {
printf("read wait error\r\n");
break;
}
}
int ret = client->get_conn().read(buf, sizeof(buf), false);
if (ret <= 0) {
break;
}
if (sync) {
if (client->get_conn().write(buf, ret) != ret) {
break;
} else {
continue;
}
}
++nmsgs;
auto msg = std::make_shared<message>(client, nmsgs, buf, ret);
box.push(msg);
}
};
}
};
acl::fiber::schedule();
}

View File

@ -0,0 +1,4 @@
#pragma once
void server_pool_run(const char* addr, bool sync, int nfibers, int rtimeo, int wtimeo);
void server_pool2_run(const char* addr, bool sync, int nfibers);

View File

@ -0,0 +1,141 @@
#include "stdafx.h"
#include <memory>
#include <atomic>
#include "server_pool.h"
#if __cplusplus >= 201402L
using unique_stream = std::unique_ptr<acl::socket_stream>;
class socket_client2 {
public:
socket_client2(unique_stream conn, std::atomic<long>& nusers)
: conn_(std::move(conn)), nusers_(nusers) {}
~socket_client2(void) {
printf(">>>socket_client2 deleted<<<\r\n");
--nusers_;
}
acl::socket_stream& get_conn(void) {
return *conn_;
}
private:
unique_stream conn_;
std::atomic<long>& nusers_;
};
using shared_client = std::shared_ptr<socket_client2>;
class message2 {
public:
message2(shared_client client, std::atomic<long>& nmsgs,
const char* buf, size_t len)
: client_(client), nmsgs_(nmsgs), buf_(buf, len) {}
~message2(void) {
--nmsgs_;
}
const std::string& get_data(void) const {
return buf_;
}
shared_client get_client(void) {
return client_;
}
private:
shared_client client_;
std::atomic<long>& nmsgs_;
std::string buf_;
};
using unique_message2 = std::unique_ptr<message2>;
void server_pool2_run(const char* addr, bool sync, int nfibers) {
acl::server_socket ss;
if (!ss.open(addr)) {
printf("listen %s error %s\r\n", addr, acl::last_serror());
return;
}
printf("listen on %s, fiber pool: %d\r\n", addr, nfibers);
acl::fiber_sbox2<unique_message2> box;
for (int i = 0; i < nfibers; i++) {
go[&box] {
while (true) {
unique_message2 msg;
if (!box.pop(msg)) {
continue;
}
auto client = msg->get_client();
auto data = msg->get_data();
if (client->get_conn().write(data.c_str(), data.size()) == -1) {
printf("write error: %s\r\n", acl::last_serror());
break;
}
}
};
}
std::atomic<long> nusers(0), nmsgs(0);
go[&nusers, &nmsgs] {
while (true) {
std::cout << "client count: " << nusers
<< "; message2 count: " << nmsgs << std::endl;
::sleep(1);
}
};
go[&ss, &box, &nusers, &nmsgs, sync] {
while (true) {
auto conn = ss.accept();
if (conn == NULL) {
printf("accept error %s\r\n", acl::last_serror());
break;
}
++nusers;
go[&box, &nmsgs, conn, sync, &nusers] {
unique_stream stream(conn);
auto client = std::make_shared<socket_client2>(std::move(stream), nusers);
char buf[4096];
while (true) {
int ret = client->get_conn().read(buf, sizeof(buf), false);
if (ret <= 0) {
break;
}
if (sync) {
if (client->get_conn().write(buf, ret) != ret) {
break;
} else {
continue;
}
}
++nmsgs;
auto msg = std::make_unique<message2>(client, nmsgs, buf, ret);
box.push(std::move(msg));
}
};
}
};
acl::fiber::schedule();
}
#else
void server_pool2_run(const char*, bool, int) {
std::cout << "Need c++14!" << std::endl;
}
#endif

View File

@ -32,19 +32,51 @@ static long long int __total_count = 0;
static int __total_clients = 0;
static int __total_error_clients = 0;
static int __show_max = 10;
static int __show_count = 0;
static int __fiber_delay = 0;
static int __conn_timeout = -1;
static int __io_timeout = -1;
static int __max_loop = 10000;
static int __max_fibers = 100;
static int __left_fibers = 100;
static int __read_data = 1;
static int __stack_size = 32000;
static int __stack_share = 0;
static int __show_max = 10;
static int __show_count = 0;
static int __fiber_delay = 0;
static int __conn_timeout = -1;
static int __read_timeout = -1;
static int __write_timeout = -1;
static int __max_loop = 10000;
static int __max_fibers = 100;
static int __left_fibers = 100;
static int __read_data = 1;
static int __stack_size = 32000;
static int __stack_share = 0;
static struct timeval __begin;
static int check_read(SOCKET fd, int timeout)
{
struct pollfd pfd;
int n;
memset(&pfd, 0, sizeof(struct pollfd));
pfd.fd = fd;
pfd.events = POLLIN;
#if defined(_WIN32) || defined(_WIN64)
n = acl_fiber_poll(&pfd, 1, timeout);
#else
n = poll(&pfd, 1, timeout);
#endif
if (n < 0) {
printf("poll error: %s\r\n", acl_last_serror());
return -1;
}
if (n == 0) {
return 0;
}
if (pfd.revents & POLLIN) {
return 1;
} else {
printf(">>>poll return n=%d read no ready,fd=%d, pfd=%p\n", n, fd, &pfd);
return 0;
}
}
static int check_write(SOCKET fd, int timeout)
{
struct pollfd pfd;
@ -55,7 +87,7 @@ static int check_write(SOCKET fd, int timeout)
pfd.events = POLLOUT;
#if defined(_WIN32) || defined(_WIN64)
n = WSAPoll(&pfd, 1, timeout);
n = acl_fiber_poll(&pfd, 1, timeout);
#else
n = poll(&pfd, 1, timeout);
#endif
@ -91,6 +123,11 @@ static void echo_client(SOCKET fd)
const char *str = "hello world\r\n";
for (i = 0; i < __max_loop; i++) {
if (__write_timeout > 0 && check_write(fd, __write_timeout * 1000) <= 0) {
printf("write wait error=%s, fd=%d\r\n", acl_last_serror(), fd);
break;
}
#if defined(_WIN32) || defined(_WIN64)
if (acl_fiber_send(fd, str, strlen(str), 0) <= 0) {
#else
@ -112,6 +149,11 @@ static void echo_client(SOCKET fd)
continue;
}
if (__read_timeout > 0 && (ret = check_read(fd, __read_timeout * 1000)) <= 0) {
printf("read wait error=%s, fd=%d, ret=%d\r\n", acl_last_serror(), fd, ret);
break;
}
#if defined(_WIN32) || defined(_WIN64)
ret = acl_fiber_recv(fd, buf, BUF_SIZE, 0);
#else
@ -243,7 +285,8 @@ static void usage(const char *procname)
" -s server_ip\r\n"
" -p server_port\r\n"
" -t connt_timeout\r\n"
" -r io_timeout\r\n"
" -r read_timeout\r\n"
" -w write_timeout\r\n"
" -c max_fibers\r\n"
" -S [if using single IO, dafault: no]\r\n"
" -Z [if sharing fiber stack, default: no]\r\n"
@ -280,7 +323,7 @@ int main(int argc, char *argv[])
snprintf(__server_ip, sizeof(__server_ip), "%s", "127.0.0.1");
while ((ch = getopt(argc, argv, "hc:n:s:p:t:r:Sd:z:e:m:Z")) > 0) {
while ((ch = getopt(argc, argv, "hc:n:s:p:t:r:w:Sd:z:e:m:Z")) > 0) {
switch (ch) {
case 'h':
usage(argv[0]);
@ -293,7 +336,10 @@ int main(int argc, char *argv[])
__conn_timeout = atoi(optarg);
break;
case 'r':
__io_timeout = atoi(optarg);
__read_timeout = atoi(optarg);
break;
case 'w':
__write_timeout = atoi(optarg);
break;
case 'n':
__max_loop = atoi(optarg);

View File

@ -1,3 +1,4 @@
//#define _GNU_SOURCE
#include "lib_acl.h"
#include <stdio.h>
#include <stdlib.h>

View File

@ -47,18 +47,18 @@ static void fiber4(const acl::string& buf)
static void usage(const char* procname)
{
printf("usage: %s -h [help] -n fibers_count -S\r\n", procname);
printf("usage: %s -h [help] -n fibers_count -m maxfd -S\r\n", procname);
}
int main(int argc, char *argv[])
{
int ch, n = 10;
int ch, n = 10, maxfd = 0;
bool share_stack = false;
acl::acl_cpp_init();
acl::log::stdout_open(true);
while ((ch = getopt(argc, argv, "hn:S")) > 0) {
while ((ch = getopt(argc, argv, "hn:m:S")) > 0) {
switch (ch) {
case 'h':
usage(argv[0]);
@ -66,6 +66,9 @@ int main(int argc, char *argv[])
case 'n':
n = atoi(optarg);
break;
case 'm':
maxfd = atoi(optarg);
break;
case 'S':
share_stack = true;
break;
@ -74,6 +77,9 @@ int main(int argc, char *argv[])
}
}
int ret = acl::fiber::set_fdlimit(maxfd);
printf("Current maxfd=%d\r\n", ret);
for (int i = 0; i < n; i++) {
acl::fiber* f = new myfiber();
f->start(share_stack ? 8000 : 32000, share_stack);

View File

@ -1,3 +1,4 @@
#include "lib_acl.h"
#include <stdio.h>
#include <stdlib.h>
#include <signal.h>
@ -8,11 +9,11 @@
#include <unistd.h>
#include <poll.h>
#endif
#include "lib_acl.h"
#include "fiber/libfiber.h"
#if defined(_WIN32) || defined(_WIN64)
# define POLL WSAPoll
//# define POLL WSAPoll
# define POLL acl_fiber_poll
# define CLOSE acl_fiber_close
# define LISTEN acl_fiber_listen
# define ACCEPT acl_fiber_accept