线程池服务器模板支持 accept 回调处理过程在子线程中进行

可以通过配置项(ioctl_thread_accpet=1)将 accept 的回调处理过程交由线程池中的子线程处理,这样可以在 accept
回调过程处理一些比较花费时间的处理过程而不会阻塞主线程
This commit is contained in:
zsxxsz 2014-09-10 00:08:43 +08:00
parent 65ab0a7659
commit d8ec1e924d
19 changed files with 341 additions and 323 deletions

View File

@ -1,6 +1,12 @@
修改历史列表:
------------------------------------------------------------------------
458) 2014.9.9
458.1) feature: acl_threads_server.c 线程池服务器模板支持通过配置项
(ioctl_thread_accept = 1) 将 on_accept 回调过程放在线程池的子线程中处理
458.2) feature: 事件引擎模块可以区分 ACL_EVENT_ACCEPT/ACL_EVENT_CONNECT 与
ACL_EVENT_READ/ACL_EVENT_WRITE 事件类型了
457) 2014.9.6
457.1) 整理库的所有头文件,使之更规范

View File

@ -12,16 +12,14 @@ extern "C" {
/*+++++++++++++++++++++++++++ 全局宏定义 +++++++++++++++++++++++++++++++++++*/
/* Event codes. */
#define ACL_EVENT_READ (1<<0) /**< read event */
#define ACL_EVENT_WRITE (1<<1) /**< write event */
#define ACL_EVENT_CONNECT ACL_EVENT_WRITE /**< client has connected the server*/
#define ACL_EVENT_XCPT (1<<2) /**< exception */
#define ACL_EVENT_TIME (1<<3) /**< timer event */
#define ACL_EVENT_RW_TIMEOUT (1<<4) /**< read/write timeout event */
#define ACL_EVENT_TIMEOUT ACL_EVENT_RW_TIMEOUT
#define ACL_EVENT_ACCEPT ACL_EVENT_READ
#define ACL_EVENT_CLIENT (ACL_EVENT_ACCEPT)
#define ACL_EVENT_READ (1 << 0) /**< read event */
#define ACL_EVENT_ACCEPT (1 << 1) /**< accept one connection */
#define ACL_EVENT_WRITE (1 << 2) /**< write event */
#define ACL_EVENT_CONNECT (1 << 3) /**< client has connected the server*/
#define ACL_EVENT_XCPT (1 << 4) /**< exception */
#define ACL_EVENT_TIME (1 << 5) /**< timer event */
#define ACL_EVENT_RW_TIMEOUT (1 << 6) /**< read/write timeout event */
#define ACL_EVENT_TIMEOUT ACL_EVENT_RW_TIMEOUT
#define ACL_EVENT_FD_IDLE 0
#define ACL_EVENT_FD_BUSY 1

View File

@ -131,6 +131,10 @@ extern char *acl_var_threads_dispatch_addr;
#define ACL_DEF_THREADS_DISPATCH_TYPE "default"
extern char *acl_var_threads_dispatch_type;
#define ACL_VAR_THREADS_THREAD_ACCEPT "ioctl_thread_accept"
#define ACL_DEF_THREADS_THREAD_ACCEPT 0
extern int acl_var_threads_thread_accept;
#endif /* ACL_UNIX */
#ifdef __cplusplus

View File

@ -61,26 +61,18 @@ static void __connect_notify_callback(int event_type, ACL_EVENT *event,
WRITE_SAFE_DIABLE(astream);
/* 先判断是否是超时导致返回 */
if (event_type == ACL_EVENT_RW_TIMEOUT) {
ret = aio_timeout_callback(astream);
if (ret < 0) {
if ((event_type & ACL_EVENT_RW_TIMEOUT) != 0) {
if (aio_timeout_callback(astream) < 0)
acl_aio_iocp_close(astream);
return;
}
/* 增加引用计数,以防异常关闭 */
ret = aio_timeout_callback(astream);
if (ret < 0) {
acl_aio_iocp_close(astream);
} else if (astream->flag & ACL_AIO_FLAG_IOCP_CLOSE) {
else if (astream->flag & ACL_AIO_FLAG_IOCP_CLOSE)
/* 该流正处于IO延迟关闭状态因为本次写IO已经成功完成
* IO延迟关闭过程
*/
acl_aio_iocp_close(astream);
} else {
else
acl_event_enable_write(event, astream->stream,
astream->timeout, __connect_notify_callback,
astream);
}
return;
}
@ -90,7 +82,7 @@ static void __connect_notify_callback(int event_type, ACL_EVENT *event,
acl_aio_cancel_timer(astream->aio, ConnectTimer, astream);
#endif
if (event_type == ACL_EVENT_XCPT) {
if ((event_type & ACL_EVENT_XCPT) != 0) {
acl_aio_iocp_close(astream);
return;
}
@ -112,20 +104,19 @@ static void __connect_notify_callback(int event_type, ACL_EVENT *event,
acl_set_error(ACL_ENOTCONN);
#endif
if (errno == 0 || errno == ACL_EISCONN) {
if (errno == 0 || errno == ACL_EISCONN)
event_type = ACL_EVENT_CONNECT;
} else if (event_type == ACL_EVENT_CONNECT) {
event_type = ACL_EVENT_XCPT;
}
else if ((event_type & ACL_EVENT_CONNECT) == 0)
event_type |= ACL_EVENT_XCPT;
if (event_type == ACL_EVENT_XCPT) {
if ((event_type & ACL_EVENT_XCPT) != 0) {
astream->flag |= ACL_AIO_FLAG_DEAD;
acl_aio_iocp_close(astream);
return;
}
if (event_type != ACL_EVENT_CONNECT)
acl_msg_fatal("%s: unknown event type(%d)", myname, event_type);
if ((event_type & ACL_EVENT_CONNECT) == 0)
acl_msg_fatal("%s: unknown event: %d", myname, event_type);
/* 将引用计数加1以防止在 connect_fn 内部调用了关闭过程connect_fn
* -1
@ -136,7 +127,7 @@ static void __connect_notify_callback(int event_type, ACL_EVENT *event,
ACL_ITER iter;
ACL_FIFO connect_handles;
/* XXX: 必须将各个回调句柄从回调队列中一一提出置入一个单独队列中,
/* 必须将各个回调句柄从回调队列中一一提出置入一个单独队列中,
* ACL_AIO
*/
@ -153,25 +144,25 @@ static void __connect_notify_callback(int event_type, ACL_EVENT *event,
if (handle == NULL)
break;
ret = handle->callback(astream, handle->ctx);
if (ret != 0) {
astream->nrefer--;
if (ret < 0 || astream->flag & ACL_AIO_FLAG_IOCP_CLOSE)
acl_aio_iocp_close(astream);
return;
}
if (ret == 0)
continue;
astream->nrefer--;
if (ret < 0 || astream->flag & ACL_AIO_FLAG_IOCP_CLOSE)
acl_aio_iocp_close(astream);
return;
}
}
astream->nrefer--;
if (ret < 0) {
if (ret < 0)
acl_aio_iocp_close(astream);
} else if ((astream->flag & ACL_AIO_FLAG_IOCP_CLOSE)) {
else if ((astream->flag & ACL_AIO_FLAG_IOCP_CLOSE))
/* 之前该流已经被设置了IO完成延迟关闭标志位
* IO完成延迟关闭过程
*/
acl_aio_iocp_close(astream);
}
}
ACL_ASTREAM *acl_aio_connect(ACL_AIO *aio, const char *addr, int timeout)
@ -202,21 +193,20 @@ ACL_ASTREAM *acl_aio_connect(ACL_AIO *aio, const char *addr, int timeout)
return (NULL);
}
cstream->flag |= ACL_VSTREAM_FLAG_CONNECTING;
astream = acl_aio_open(aio, cstream);
if (astream == NULL) {
if (astream == NULL)
acl_msg_fatal("%s: open astream error", myname);
}
#ifdef WIN32
if (timeout > 0 && aio->event_mode == ACL_EVENT_WMSG)
acl_aio_request_timer(aio, ConnectTimer, astream, timeout * 1000000, 0);
acl_aio_request_timer(aio, ConnectTimer, astream,
timeout * 1000000, 0);
#endif
astream->error = acl_last_error();
acl_aio_ctl(astream, ACL_AIO_CTL_TIMEOUT, timeout, ACL_AIO_CTL_END);
WRITE_SAFE_ENABLE(astream, __connect_notify_callback);
return (astream);
return astream;
}

View File

@ -28,44 +28,42 @@ static void __accept_notify_callback(int event_type,
ACL_VSTREAM *cstream;
int i;
if (event_type == ACL_EVENT_XCPT) {
char ebuf[256];
if ((event_type & ACL_EVENT_XCPT) != 0) {
acl_msg_error("%s: listen error, sleep 1 second(%s)",
myname, acl_last_strerror(ebuf, sizeof(ebuf)));
myname, acl_last_serror());
sleep(1);
/* not reached here */
return;
} else if (event_type == ACL_EVENT_RW_TIMEOUT) {
} else if ((event_type & ACL_EVENT_RW_TIMEOUT) != 0) {
(void) aio_timeout_callback(astream);
return;
}
if (event_type != ACL_EVENT_READ)
acl_msg_fatal("%s: unknown event type(%d)", myname, event_type);
if ((event_type & ACL_EVENT_READ) == 0)
acl_msg_fatal("%s: unknown event: %d", myname, event_type);
for (i = 0; i < astream->accept_nloop; i++) {
/* cstream read_buf µÄ³¤¶È read_buf_len ¼Ì³Ð×Ô¼àÌýÁ÷µÄ read_buf_len */
/* cstream read_buf ľÄł¤śČ read_buf_len źĚłĐ×ÔźŕĚýÁ÷ľÄ
* read_buf_len
*/
cstream = acl_vstream_accept(stream, NULL, 0);
if (cstream == NULL) {
int ret;
char ebuf[256];
ret = acl_last_error();
if (ret == ACL_EAGAIN || ret == ACL_ECONNABORTED)
break;
acl_msg_fatal("%s: listen exception, error(%s)",
myname, acl_last_strerror(ebuf, sizeof(ebuf)));
myname, acl_last_serror());
break;
}
client = acl_aio_open(astream->aio, cstream);
if (astream->accept_fn(client, astream->context) < 0) {
char ebuf[256];
acl_aio_iocp_close(client);
acl_msg_warn("%s(%d): accept_fn return < 0, "
"close client and break, err(%s)", myname,
__LINE__, acl_last_strerror(ebuf, sizeof(ebuf)));
__LINE__, acl_last_serror());
break;
}
}
@ -91,21 +89,19 @@ static void __listen_notify_callback(int event_type,
ACL_ASTREAM *astream = (ACL_ASTREAM *) context;
int i;
if (event_type == ACL_EVENT_XCPT) {
char ebuf[256];
if ((event_type & ACL_EVENT_XCPT) != 0) {
acl_msg_error("%s: listen error, sleep 1 second(%s)",
myname, acl_last_strerror(ebuf, sizeof(ebuf)));
myname, acl_last_serror());
sleep(1);
return;
} else if (event_type == ACL_EVENT_RW_TIMEOUT) {
} else if ((event_type & ACL_EVENT_RW_TIMEOUT) != 0) {
(void) aio_timeout_callback(astream);
return;
}
for (i = 0; i < astream->accept_nloop; i++) {
if (astream->listen_fn(astream, astream->context) < 0) {
if (astream->listen_fn(astream, astream->context) < 0)
break;
}
}
}

View File

@ -114,13 +114,13 @@ static int read_complete_callback(ACL_ASTREAM *astream, char *data, int len)
ret = handle->callback(astream, handle->ctx, data, len);
if (ret != 0) {
astream->nrefer--;
return (ret);
return ret;
}
}
}
astream->nrefer--;
return (ret);
return ret;
}
/* 尝试性读一行数据
@ -142,7 +142,7 @@ static int __gets_peek(ACL_ASTREAM *astream)
|| astream->stream->errnum == ACL_EAGAIN))
{
READ_SAFE_ENABLE(astream, main_read_callback);
return (0);
return 0;
}
/* XXX: 必须查看缓冲区中是否还有数据,
@ -157,7 +157,7 @@ static int __gets_peek(ACL_ASTREAM *astream)
/* 读出错,需要关闭流 */
astream->flag |= ACL_AIO_FLAG_DEAD;
READ_IOCP_CLOSE(astream);
return (-1);
return -1;
} else if (ready) {
char *ptr = acl_vstring_str(&astream->strbuf);
int len = ACL_VSTRING_LEN(&astream->strbuf);
@ -170,18 +170,18 @@ static int __gets_peek(ACL_ASTREAM *astream)
*/
if (n < 0 || (astream->flag & ACL_AIO_FLAG_IOCP_CLOSE)) {
READ_IOCP_CLOSE(astream);
return (-1);
return -1;
} else if (astream->keep_read == 0
|| (astream->flag & ACL_AIO_FLAG_ISRD) == 0)
{
return (0);
return 0;
}
return (len);
return len;
}
/* 未读到所要求的一行数据,继续监控该流的读行事件 */
READ_SAFE_ENABLE(astream, main_read_callback);
return (0);
return 0;
}
/* 由事件监控过程回调触发的读行事件处理过程 */
@ -193,7 +193,7 @@ static void __gets_notify_callback(int event_type, ACL_ASTREAM *astream)
if (astream->keep_read == 0)
READ_SAFE_DISABLE(astream);
if (event_type == ACL_EVENT_XCPT) {
if ((event_type & ACL_EVENT_XCPT) != 0) {
/* 该流出错,但是有可能关闭的事件通知到达时流依然可读,
*
*
@ -209,12 +209,11 @@ static void __gets_notify_callback(int event_type, ACL_ASTREAM *astream)
} while (ret > 0);
READ_IOCP_CLOSE(astream);
return;
} else if (event_type == ACL_EVENT_RW_TIMEOUT) {
} else if ((event_type & ACL_EVENT_RW_TIMEOUT) != 0) {
/* 读流超时如果应用返回值大于等于0则希望继续读,
* 0
*/
int ret = aio_timeout_callback(astream);
if (ret < 0) {
if (aio_timeout_callback(astream) < 0) {
READ_IOCP_CLOSE(astream);
} else if (astream->flag & ACL_AIO_FLAG_IOCP_CLOSE) {
/* 该流正处于IO延迟关闭状态因为本次读IO已经
@ -228,8 +227,8 @@ static void __gets_notify_callback(int event_type, ACL_ASTREAM *astream)
return;
}
if (event_type != ACL_EVENT_READ)
acl_msg_fatal("%s: unknown event(%d)", myname, event_type);
if ((event_type & ACL_EVENT_READ) == 0)
acl_msg_fatal("%s: unknown event: %d", myname, event_type);
/* 尝试性地读数据 */
while (1) {
@ -331,10 +330,10 @@ static int __read_peek(ACL_ASTREAM *astream)
|| astream->stream->errnum == ACL_EWOULDBLOCK)
{
READ_SAFE_ENABLE(astream, main_read_callback);
return (0);
return 0;
}
/* XXX: 必须查看缓冲区中是否还有数据, 必须兼容数据读不够的情况! */
/* 必须查看缓冲区中是否还有数据, 必须兼容数据读不够的情况! */
if (ACL_VSTRING_LEN(&astream->strbuf) > 0) {
char *ptr = acl_vstring_str(&astream->strbuf);
int len = ACL_VSTRING_LEN(&astream->strbuf);
@ -344,7 +343,7 @@ static int __read_peek(ACL_ASTREAM *astream)
/* 读出错,需要关闭流 */
astream->flag |= ACL_AIO_FLAG_DEAD;
READ_IOCP_CLOSE(astream);
return (-1);
return -1;
} else if (n > 0) {
char *ptr = acl_vstring_str(&astream->strbuf);
int len = ACL_VSTRING_LEN(&astream->strbuf);
@ -357,17 +356,17 @@ static int __read_peek(ACL_ASTREAM *astream)
*/
if (n < 0 || astream->flag & ACL_AIO_FLAG_IOCP_CLOSE) {
READ_IOCP_CLOSE(astream);
return (-1);
return -1;
} else if (astream->keep_read == 0
|| (astream->flag & ACL_AIO_FLAG_ISRD) == 0)
{
return (0);
return 0;
}
return (len);
return len;
} else {
/* 读数据不符合要求,继续监控该读事件 */
READ_SAFE_ENABLE(astream, main_read_callback);
return (0);
return 0;
}
}
@ -380,7 +379,7 @@ static void __read_notify_callback(int event_type, ACL_ASTREAM *astream)
if (astream->keep_read == 0)
READ_SAFE_DISABLE(astream);
if (event_type == ACL_EVENT_XCPT) {
if ((event_type & ACL_EVENT_XCPT) != 0) {
/* 该流出错,但是有可能关闭的事件通知到达时流依然可读,
*
*
@ -392,14 +391,14 @@ static void __read_notify_callback(int event_type, ACL_ASTREAM *astream)
astream->stream->sys_read_ready = 1;
ret = __read_peek(astream);
} while (ret > 0);
READ_IOCP_CLOSE(astream);
return;
} else if (event_type == ACL_EVENT_RW_TIMEOUT) {
} else if ((event_type & ACL_EVENT_RW_TIMEOUT) != 0) {
/* 读流超时如果应用返回值大于等于0则希望继续读,
* 0
*/
int ret = aio_timeout_callback(astream);
if (ret < 0) {
if (aio_timeout_callback(astream) < 0) {
/* 用户希望关闭流 */
READ_IOCP_CLOSE(astream);
} else if (astream->flag & ACL_AIO_FLAG_IOCP_CLOSE) {
@ -410,11 +409,12 @@ static void __read_notify_callback(int event_type, ACL_ASTREAM *astream)
} else {
READ_SAFE_ENABLE(astream, main_read_callback);
}
return;
}
if (event_type != ACL_EVENT_READ)
acl_msg_fatal("%s: unknown event(%d)", myname, event_type);
if ((event_type & ACL_EVENT_READ) == 0)
acl_msg_fatal("%s: unknown event: %d", myname, event_type);
/* 尝试性地读数据 */
while (1) {
@ -499,7 +499,7 @@ static int __readn_peek(ACL_ASTREAM *astream)
|| astream->stream->errnum == ACL_EWOULDBLOCK)
{
READ_SAFE_ENABLE(astream, main_read_callback);
return (0);
return 0;
}
/* XXX: 查看缓冲区中是否还有数据, 必须兼容数据读不够的情况! */
if (ACL_VSTRING_LEN(&astream->strbuf) > 0) {
@ -516,7 +516,7 @@ static int __readn_peek(ACL_ASTREAM *astream)
/* 读出错或读关闭,需要关闭流 */
astream->flag |= ACL_AIO_FLAG_DEAD;
READ_IOCP_CLOSE(astream);
return (-1);
return -1;
} else if (ready) {
/* ok, 已经满足读条件,即已经获得了所要求数据长度的数据 */
char *ptr = acl_vstring_str(&astream->strbuf);
@ -530,17 +530,17 @@ static int __readn_peek(ACL_ASTREAM *astream)
n = read_complete_callback(astream, ptr, len);
if (n < 0 || astream->flag & ACL_AIO_FLAG_IOCP_CLOSE) {
READ_IOCP_CLOSE(astream);
return (-1);
return -1;
} else if (astream->keep_read == 0
|| (astream->flag & ACL_AIO_FLAG_ISRD) == 0)
{
return (0);
return 0;
}
return (len);
return len;
} else {
/* 读数据不符合要求,继续监控该读事件 */
READ_SAFE_ENABLE(astream, main_read_callback);
return (0);
return 0;
}
}
@ -549,12 +549,11 @@ static int __readn_peek(ACL_ASTREAM *astream)
static void __readn_notify_callback(int event_type, ACL_ASTREAM *astream)
{
const char *myname = "__readn_notify_callback";
int n;
if (astream->keep_read == 0)
READ_SAFE_DISABLE(astream);
if (event_type == ACL_EVENT_XCPT) {
if ((event_type & ACL_EVENT_XCPT) != 0) {
/* 该流出错,但是有可能关闭的事件通知到达时流依然可读,
*
*
@ -566,15 +565,14 @@ static void __readn_notify_callback(int event_type, ACL_ASTREAM *astream)
astream->stream->sys_read_ready = 1;
ret = __readn_peek(astream);
} while (astream->keep_read && ret > 0);
READ_IOCP_CLOSE(astream);
return;
} else if (event_type == ACL_EVENT_RW_TIMEOUT) {
} else if ((event_type & ACL_EVENT_RW_TIMEOUT) != 0) {
/* 读流超时如果应用返回值大于等于0则希望继续读,
* 0
*/
n = aio_timeout_callback(astream);
if (n < 0) {
if (aio_timeout_callback(astream) < 0) {
READ_IOCP_CLOSE(astream);
} else if (astream->flag & ACL_AIO_FLAG_IOCP_CLOSE) {
/* 该流正处于IO延迟关闭状态因为本次读IO已经成功完成
@ -587,8 +585,9 @@ static void __readn_notify_callback(int event_type, ACL_ASTREAM *astream)
return;
}
if (event_type != ACL_EVENT_READ)
acl_msg_fatal("%s: unknown event type(%d)", myname, event_type);
if ((event_type & ACL_EVENT_READ) == 0)
acl_msg_fatal("%s: unknown event: %d", myname, event_type);
if (astream->stream == NULL)
acl_msg_fatal("%s: stream null", myname);
@ -654,22 +653,21 @@ ACL_VSTRING *acl_aio_gets_peek(ACL_ASTREAM *astream)
int ready = 0;
if ((astream->flag & ACL_AIO_FLAG_DELAY_CLOSE))
return (NULL);
return NULL;
if (acl_vstream_gets_peek(astream->stream,
&astream->strbuf, &ready) == ACL_VSTREAM_EOF
&& astream->stream->errnum != ACL_EAGAIN
&& astream->stream->errnum != ACL_EWOULDBLOCK)
{
astream->flag |= ACL_AIO_FLAG_DEAD;
if (ACL_VSTRING_LEN(&astream->strbuf) > 0) {
if (ACL_VSTRING_LEN(&astream->strbuf) > 0)
return (&astream->strbuf);
} else {
return (NULL);
}
} else if (ready) {
return (&astream->strbuf);
} else
return (NULL);
else
return NULL;
} else if (ready)
return &astream->strbuf;
else
return NULL;
}
ACL_VSTRING *acl_aio_gets_nonl_peek(ACL_ASTREAM *astream)
@ -677,22 +675,21 @@ ACL_VSTRING *acl_aio_gets_nonl_peek(ACL_ASTREAM *astream)
int ready = 0;
if ((astream->flag & ACL_AIO_FLAG_DELAY_CLOSE))
return (NULL);
return NULL;
if (acl_vstream_gets_nonl_peek(astream->stream,
&astream->strbuf, &ready) == ACL_VSTREAM_EOF
&& astream->stream->errnum != ACL_EAGAIN
&& astream->stream->errnum != ACL_EWOULDBLOCK)
{
astream->flag |= ACL_AIO_FLAG_DEAD;
if (ACL_VSTRING_LEN(&astream->strbuf) > 0) {
return (&astream->strbuf);
} else {
return (NULL);
}
} else if (ready) {
return (&astream->strbuf);
} else
return (NULL);
if (ACL_VSTRING_LEN(&astream->strbuf) > 0)
return &astream->strbuf;
else
return NULL;
} else if (ready)
return &astream->strbuf;
else
return NULL;
}
ACL_VSTRING *acl_aio_read_peek(ACL_ASTREAM *astream)
@ -700,22 +697,21 @@ ACL_VSTRING *acl_aio_read_peek(ACL_ASTREAM *astream)
int n;
if ((astream->flag & ACL_AIO_FLAG_DELAY_CLOSE))
return (NULL);
return NULL;
if ((n = acl_vstream_read_peek(astream->stream,
&astream->strbuf)) == ACL_VSTREAM_EOF
&& astream->stream->errnum != ACL_EAGAIN
&& astream->stream->errnum != ACL_EWOULDBLOCK)
{
astream->flag |= ACL_AIO_FLAG_DEAD;
if (ACL_VSTRING_LEN(&astream->strbuf) > 0) {
return (&astream->strbuf);
} else {
return (NULL);
}
} else if (n > 0) {
return (&astream->strbuf);
} else
return (NULL);
if (ACL_VSTRING_LEN(&astream->strbuf) > 0)
return &astream->strbuf;
else
return NULL;
} else if (n > 0)
return &astream->strbuf;
else
return NULL;
}
ACL_VSTRING *acl_aio_readn_peek(ACL_ASTREAM *astream, int count)
@ -723,27 +719,26 @@ ACL_VSTRING *acl_aio_readn_peek(ACL_ASTREAM *astream, int count)
int ready = 0;
if ((astream->flag & ACL_AIO_FLAG_DELAY_CLOSE))
return (NULL);
return NULL;
if (acl_vstream_readn_peek(astream->stream,
&astream->strbuf, count, &ready) == ACL_VSTREAM_EOF
&& astream->stream->errnum != ACL_EAGAIN
&& astream->stream->errnum != ACL_EWOULDBLOCK)
{
astream->flag |= ACL_AIO_FLAG_DEAD;
if (ACL_VSTRING_LEN(&astream->strbuf) > 0) {
return (&astream->strbuf);
} else {
return (NULL);
}
} else if (ready) {
return (&astream->strbuf);
} else
return (NULL);
if (ACL_VSTRING_LEN(&astream->strbuf) > 0)
return &astream->strbuf;
else
return NULL;
} else if (ready)
return &astream->strbuf;
else
return NULL;
}
int acl_aio_can_read(ACL_ASTREAM *astream)
{
return (acl_vstream_can_read(astream->stream));
return acl_vstream_can_read(astream->stream);
}
static void can_read_callback(int event_type, ACL_EVENT *event acl_unused,
@ -754,12 +749,11 @@ static void can_read_callback(int event_type, ACL_EVENT *event acl_unused,
if (astream->keep_read == 0)
READ_SAFE_DISABLE(astream);
if (event_type == ACL_EVENT_XCPT) {
if ((event_type & ACL_EVENT_XCPT) != 0) {
READ_IOCP_CLOSE(astream);
return;
} else if (event_type == ACL_EVENT_RW_TIMEOUT) {
int ret = aio_timeout_callback(astream);
if (ret < 0) {
} else if ((event_type & ACL_EVENT_RW_TIMEOUT) != 0) {
if (aio_timeout_callback(astream) < 0) {
READ_IOCP_CLOSE(astream);
} else if (astream->flag & ACL_AIO_FLAG_IOCP_CLOSE) {
/* 该流正处于IO延迟关闭状态因为本次读IO已经成功完成
@ -779,9 +773,8 @@ static void can_read_callback(int event_type, ACL_EVENT *event acl_unused,
} else if (astream->flag & ACL_AIO_FLAG_IOCP_CLOSE) {
astream->nrefer--;
READ_IOCP_CLOSE(astream);
} else {
} else
astream->nrefer--;
}
}
void acl_aio_enable_read(ACL_ASTREAM *astream,
@ -823,14 +816,10 @@ void acl_aio_disable_read(ACL_ASTREAM *astream)
int acl_aio_isrset(ACL_ASTREAM *astream)
{
const char *myname = "acl_aio_isrset";
if (astream == NULL)
acl_msg_fatal("%s: input invalid", myname);
if (astream->stream == NULL)
return (0);
return 0;
return (acl_event_isrset(astream->aio->event, astream->stream));
return acl_event_isrset(astream->aio->event, astream->stream);
}
void acl_aio_stream_set_line_length(ACL_ASTREAM *astream, int len)

View File

@ -171,17 +171,13 @@ static void __writen_notify_callback(int event_type, ACL_EVENT *event acl_unused
WRITE_SAFE_DIABLE(astream);
if (event_type == ACL_EVENT_XCPT) {
if ((event_type & ACL_EVENT_XCPT) != 0) {
/* 流发生了错误启动IO完成延迟关闭关闭 */
WRITE_IOCP_CLOSE(astream);
return;
} else if (event_type == ACL_EVENT_RW_TIMEOUT) {
} else if ((event_type & ACL_EVENT_RW_TIMEOUT) != 0) {
/* 写操作超时,如果用户的回调函数返回 -1则启动IO完成延迟关闭过程 */
int ret;
ret = aio_timeout_callback(astream);
if (ret < 0) {
if (aio_timeout_callback(astream) < 0) {
WRITE_IOCP_CLOSE(astream);
} else if (astream->flag & ACL_AIO_FLAG_IOCP_CLOSE) {
/* 该流正处于IO延迟关闭状态因为本次写IO已经成功完成
@ -195,8 +191,8 @@ static void __writen_notify_callback(int event_type, ACL_EVENT *event acl_unused
return;
}
if (event_type != ACL_EVENT_WRITE)
acl_msg_fatal("%s: unknown event type(%d)", myname, event_type);
if ((event_type & ACL_EVENT_WRITE) == 0)
acl_msg_fatal("%s: unknown event: %d", myname, event_type);
/* 尝试发送流的写队列里的数据 */
nleft = __try_fflush(astream);
@ -532,14 +528,11 @@ static void can_write_callback(int event_type, ACL_EVENT *event acl_unused,
WRITE_SAFE_DIABLE(astream);
if (event_type == ACL_EVENT_XCPT) {
if ((event_type & ACL_EVENT_XCPT) != 0) {
WRITE_IOCP_CLOSE(astream);
return;
} else if (event_type == ACL_EVENT_RW_TIMEOUT) {
int ret;
ret = aio_timeout_callback(astream);
if (ret < 0) {
} else if ((event_type & ACL_EVENT_RW_TIMEOUT) != 0) {
if (aio_timeout_callback(astream) < 0) {
WRITE_IOCP_CLOSE(astream);
} else if (astream->flag & ACL_AIO_FLAG_IOCP_CLOSE) {
/* 该流正处于IO延迟关闭状态因为本次读IO已经成功完成

View File

@ -88,7 +88,7 @@ int event_prepare(ACL_EVENT *ev)
void event_fire(ACL_EVENT *ev)
{
ACL_EVENT_FDTABLE *fdp;
int i, event_type;
int i, type;
acl_int64 r_timeout, w_timeout;
ACL_EVENT_NOTIFY_RDWR r_callback, w_callback;
@ -101,9 +101,10 @@ void event_fire(ACL_EVENT *ev)
/* ev->fdtabs_ready[i] maybe be set NULL in timer callback */
if (fdp == NULL || fdp->stream == NULL)
continue;
event_type = fdp->event_type;
if ((event_type & ACL_EVENT_XCPT) != 0) {
type = fdp->event_type;
if ((type & ACL_EVENT_XCPT) != 0) {
fdp->event_type &= ~ACL_EVENT_XCPT;
r_callback = fdp->r_callback;
w_callback = fdp->w_callback;
@ -119,7 +120,7 @@ void event_fire(ACL_EVENT *ev)
continue;
}
if ((event_type & ACL_EVENT_RW_TIMEOUT) != 0) {
if ((type & ACL_EVENT_RW_TIMEOUT) != 0) {
fdp->event_type &= ~ACL_EVENT_RW_TIMEOUT;
r_timeout = fdp->r_timeout;
w_timeout = fdp->w_timeout;
@ -141,21 +142,22 @@ void event_fire(ACL_EVENT *ev)
continue;
}
if ((event_type & ACL_EVENT_READ) != 0) {
fdp->event_type &= ~ACL_EVENT_READ;
if ((type & (ACL_EVENT_READ | ACL_EVENT_ACCEPT))) {
fdp->event_type &= ~(ACL_EVENT_READ | ACL_EVENT_ACCEPT);
if (fdp->r_timeout > 0)
fdp->r_ttl = ev->present + fdp->r_timeout;
fdp->r_callback(ACL_EVENT_READ, ev,
fdp->stream, fdp->r_context);
fdp->r_callback(type, ev, fdp->stream, fdp->r_context);
}
/* ev->fdtabs_ready[i] maybe be set NULL in fdp->r_callback() */
if ((event_type & ACL_EVENT_WRITE) && ev->fdtabs_ready[i]) {
if (ev->fdtabs_ready[i] == NULL)
continue;
if ((type & (ACL_EVENT_WRITE | ACL_EVENT_CONNECT))) {
if (fdp->w_timeout > 0)
fdp->w_ttl = ev->present + fdp->w_timeout;
fdp->event_type &= ~ACL_EVENT_WRITE;
fdp->w_callback(ACL_EVENT_WRITE, ev,
fdp->stream, fdp->w_context);
fdp->event_type &= ~(ACL_EVENT_WRITE | ACL_EVENT_CONNECT);
fdp->w_callback(type, ev, fdp->stream, fdp->w_context);
}
}
@ -215,9 +217,10 @@ int event_thr_prepare(ACL_EVENT *ev)
void event_thr_fire(ACL_EVENT *ev)
{
ACL_EVENT_FDTABLE *fdp;
ACL_EVENT_NOTIFY_RDWR worker_fn;
ACL_EVENT_NOTIFY_RDWR callback;
ACL_VSTREAM *stream;
void *worker_arg;
void *context;
int type;
int i;
if (ev->fire_begin)
@ -231,54 +234,54 @@ void event_thr_fire(ACL_EVENT *ev)
continue;
stream = fdp->stream;
type = fdp->event_type;
if (fdp->event_type & ACL_EVENT_READ) {
fdp->event_type &= ~ACL_EVENT_READ;
worker_fn = fdp->r_callback;
worker_arg = fdp->r_context;
if ((type & (ACL_EVENT_READ | ACL_EVENT_ACCEPT))) {
fdp->event_type &= ~(ACL_EVENT_READ | ACL_EVENT_ACCEPT);
callback = fdp->r_callback;
context = fdp->r_context;
if (!fdp->listener)
ev->disable_readwrite_fn(ev, stream);
worker_fn(ACL_EVENT_READ, ev, stream, worker_arg);
} else if (fdp->event_type & ACL_EVENT_WRITE) {
fdp->event_type &= ~ACL_EVENT_WRITE;
worker_fn = fdp->w_callback;
worker_arg = fdp->w_context;
callback(ACL_EVENT_READ, ev, stream, context);
} else if ((type & (ACL_EVENT_WRITE | ACL_EVENT_CONNECT))) {
fdp->event_type &= ~(ACL_EVENT_WRITE | ACL_EVENT_CONNECT);
callback = fdp->w_callback;
context = fdp->w_context;
ev->disable_readwrite_fn(ev, stream);
worker_fn(ACL_EVENT_WRITE, ev, stream, worker_arg);
} else if (fdp->event_type & ACL_EVENT_RW_TIMEOUT) {
callback(ACL_EVENT_WRITE, ev, stream, context);
} else if ((type & ACL_EVENT_RW_TIMEOUT)) {
fdp->event_type &= ~ACL_EVENT_RW_TIMEOUT;
if (fdp->r_callback) {
worker_fn = fdp->r_callback;
worker_arg = fdp->r_context;
callback = fdp->r_callback;
context = fdp->r_context;
} else if (fdp->w_callback) {
worker_fn = fdp->w_callback;
worker_arg = fdp->w_context;
callback = fdp->w_callback;
context = fdp->w_context;
} else {
worker_fn = NULL;
worker_arg = NULL;
callback = NULL;
context = NULL;
}
if (!fdp->listener)
ev->disable_readwrite_fn(ev, stream);
if (worker_fn)
worker_fn(ACL_EVENT_RW_TIMEOUT, ev,
stream, worker_arg);
} else if (fdp->event_type & ACL_EVENT_XCPT) {
if (callback)
callback(ACL_EVENT_RW_TIMEOUT, ev,
stream, context);
} else if ((type & ACL_EVENT_XCPT)) {
fdp->event_type &= ~ACL_EVENT_XCPT;
if (fdp->r_callback) {
worker_fn = fdp->r_callback;
worker_arg = fdp->r_context;
callback = fdp->r_callback;
context = fdp->r_context;
} else if (fdp->w_callback) {
worker_fn = fdp->w_callback;
worker_arg = fdp->w_context;
callback = fdp->w_callback;
context = fdp->w_context;
} else {
worker_fn = NULL;
worker_arg = NULL;
callback = NULL;
context = NULL;
}
if (!fdp->listener)
ev->disable_readwrite_fn(ev, stream);
if (worker_fn)
worker_fn(ACL_EVENT_XCPT, ev,
stream, worker_arg);
if (callback)
callback(ACL_EVENT_XCPT, ev, stream, context);
}
}

View File

@ -483,13 +483,15 @@ static void event_loop(ACL_EVENT *eventp)
continue;
if ((bp->events & EPOLLIN) != 0) {
fdp->stream->sys_read_ready = 1;
if ((fdp->event_type & ACL_EVENT_READ) == 0) {
fdp->event_type |= ACL_EVENT_READ;
fdp->fdidx_ready = eventp->fdcnt_ready;
eventp->fdtabs_ready[eventp->fdcnt_ready] = fdp;
eventp->fdcnt_ready++;
}
if (fdp->listener)
fdp->event_type |= ACL_EVENT_ACCEPT;
fdp->stream->sys_read_ready = 1;
} else if ((bp->events & EPOLLOUT) != 0) {
fdp->event_type |= ACL_EVENT_WRITE;
fdp->fdidx_ready = eventp->fdcnt_ready;

View File

@ -302,7 +302,7 @@ DEL_READ_TAG:
fdp->r_ttl = 0;
fdp->r_timeout = 0;
fdp->r_callback = NULL;
fdp->event_type &= ~ACL_EVENT_READ;
fdp->event_type &= ~(ACL_EVENT_READ | ACL_EVENT_ACCEPT);
if ((fdp->flag & EVENT_FDTABLE_FLAG_WRITE)
|| (fdp->flag & EVENT_FDTABLE_FLAG_ADD_WRITE))
@ -369,7 +369,7 @@ DEL_WRITE_TAG:
fdp->w_ttl = 0;
fdp->w_timeout = 0;
fdp->w_callback = NULL;
fdp->event_type &= ~ACL_EVENT_WRITE;
fdp->event_type &= ~(ACL_EVENT_WRITE | ACL_EVENT_CONNECT);
if ((fdp->flag & EVENT_FDTABLE_FLAG_READ)
|| (fdp->flag & EVENT_FDTABLE_FLAG_ADD_READ))
@ -620,7 +620,7 @@ static int disable_read(EVENT_KERNEL *ev, ACL_EVENT_FDTABLE *fdp)
fdp->flag &= ~EVENT_FDTABLE_FLAG_DEL_READ;
fdp->flag &= ~EVENT_FDTABLE_FLAG_READ;
fdp->event_type &= ~ACL_EVENT_READ;
fdp->event_type &= ~(ACL_EVENT_READ | ACL_EVENT_ACCEPT);
return 1;
}
@ -630,7 +630,7 @@ static int disable_write(EVENT_KERNEL *ev, ACL_EVENT_FDTABLE *fdp)
fdp->flag &= ~EVENT_FDTABLE_FLAG_DEL_WRITE;
fdp->flag &= ~EVENT_FDTABLE_FLAG_WRITE;
fdp->event_type &= ~ACL_EVENT_WRITE;
fdp->event_type &= ~(ACL_EVENT_WRITE | ACL_EVENT_CONNECT);
return 1;
}
@ -815,6 +815,8 @@ TAG_DONE:
| ACL_EVENT_WRITE)) == 0)
{
fdp->event_type |= ACL_EVENT_READ;
if (fdp->listener)
fdp->event_type |= ACL_EVENT_ACCEPT;
fdp->fdidx_ready = eventp->fdcnt_ready;
eventp->fdtabs_ready[eventp->fdcnt_ready] = fdp;
eventp->fdcnt_ready++;

View File

@ -308,7 +308,7 @@ DEL_READ_TAG:
fdp->r_ttl = 0;
fdp->r_timeout = 0;
fdp->r_callback = NULL;
fdp->event_type &= ~ACL_EVENT_READ;
fdp->event_type &= ~(ACL_EVENT_READ | ACL_EVENT_ACCEPT);
if ((fdp->flag & EVENT_FDTABLE_FLAG_WRITE)
|| (fdp->flag & EVENT_FDTABLE_FLAG_ADD_WRITE))
@ -375,7 +375,7 @@ DEL_WRITE_TAG:
fdp->w_ttl = 0;
fdp->w_timeout = 0;
fdp->w_callback = NULL;
fdp->event_type &= ~ACL_EVENT_WRITE;
fdp->event_type &= ~(ACL_EVENT_WRITE | ACL_EVENT_CONNECT);
if ((fdp->flag & EVENT_FDTABLE_FLAG_READ)
|| (fdp->flag & EVENT_FDTABLE_FLAG_ADD_READ))
@ -520,7 +520,7 @@ static int disable_read(EVENT_KERNEL *ev, ACL_EVENT_FDTABLE *fdp)
fdp->flag &= ~EVENT_FDTABLE_FLAG_DEL_READ;
fdp->flag &= ~EVENT_FDTABLE_FLAG_READ;
fdp->event_type &= ~ACL_EVENT_READ;
fdp->event_type &= ~(ACL_EVENT_READ | ACL_EVENT_ACCEPT);
if ((fdp->flag & EVENT_FDTABLE_FLAG_WRITE)) {
#if (ACL_EVENTS_KERNEL_STYLE == ACL_EVENTS_STYLE_KQUEUE)
@ -556,7 +556,7 @@ static int disable_write(EVENT_KERNEL *ev, ACL_EVENT_FDTABLE *fdp)
fdp->flag &= ~EVENT_FDTABLE_FLAG_DEL_WRITE;
fdp->flag &= ~EVENT_FDTABLE_FLAG_WRITE;
fdp->event_type &= ~ACL_EVENT_WRITE;
fdp->event_type &= ~(ACL_EVENT_WRITE | ACL_EVENT_CONNECT);
if ((fdp->flag & EVENT_FDTABLE_FLAG_READ)) {
#if (ACL_EVENTS_KERNEL_STYLE == ACL_EVENTS_STYLE_KQUEUE)
@ -726,6 +726,9 @@ static void event_loop(ACL_EVENT *eventp)
if ((fdp->event_type & (ACL_EVENT_READ | ACL_EVENT_WRITE)) == 0)
{
fdp->event_type |= ACL_EVENT_READ;
if (fdp->listener)
fdp->event_type |= ACL_EVENT_ACCEPT;
fdp->fdidx_ready = eventp->fdcnt_ready;
eventp->fdtabs_ready[eventp->fdcnt_ready++] = fdp;
}

View File

@ -455,12 +455,14 @@ static void event_loop(ACL_EVENT *eventp)
if ((fdp->flag & EVENT_FDTABLE_FLAG_READ) && EVENT_TEST_READ(bp))
{
fdp->stream->sys_read_ready = 1;
if ((fdp->event_type & ACL_EVENT_READ) == 0) {
fdp->event_type |= ACL_EVENT_READ;
fdp->fdidx_ready = eventp->fdcnt_ready;
eventp->fdtabs_ready[eventp->fdcnt_ready++] = fdp;
}
if (fdp->listener)
fdp->event_type |= ACL_EVENT_ACCEPT;
fdp->stream->sys_read_ready = 1;
} else if ((fdp->flag & EVENT_FDTABLE_FLAG_WRITE)
&& EVENT_TEST_WRITE(bp))
{

View File

@ -207,7 +207,7 @@ static void event_disable_read(ACL_EVENT *eventp, ACL_VSTREAM *stream)
fdp->r_ttl = 0;
fdp->r_timeout = 0;
fdp->r_callback = NULL;
fdp->event_type &= ~ACL_EVENT_READ;
fdp->event_type &= ~(ACL_EVENT_READ | ACL_EVENT_ACCEPT);
fdp->flag &= ~EVENT_FDTABLE_FLAG_READ;
if ((fdp->flag & EVENT_FDTABLE_FLAG_WRITE)) {
@ -265,7 +265,7 @@ static void event_disable_write(ACL_EVENT *eventp, ACL_VSTREAM *stream)
fdp->w_ttl = 0;
fdp->w_timeout = 0;
fdp->w_callback = NULL;
fdp->event_type &= ~ACL_EVENT_WRITE;
fdp->event_type &= ~(ACL_EVENT_WRITE | ACL_EVENT_CONNECT);
fdp->flag &= ~EVENT_FDTABLE_FLAG_WRITE;
if ((fdp->flag & EVENT_FDTABLE_FLAG_READ)) {
@ -442,6 +442,8 @@ static void event_loop(ACL_EVENT *eventp)
if ((fdp->event_type & (ACL_EVENT_READ | ACL_EVENT_WRITE)) == 0)
{
fdp->event_type |= ACL_EVENT_READ;
if (fdp->listener)
fdp->event_type |= ACL_EVENT_ACCEPT;
fdp->fdidx_ready = eventp->fdcnt_ready;
eventp->fdtabs_ready[eventp->fdcnt_ready++] = fdp;
}

View File

@ -407,6 +407,8 @@ static void event_loop(ACL_EVENT *eventp)
fdp->stream->sys_read_ready = 1;
if ((fdp->event_type & ACL_EVENT_READ) == 0) {
fdp->event_type |= ACL_EVENT_READ;
if (fdp->listener)
fdp->event_type |= ACL_EVENT_ACCEPT;
fdp->fdidx_ready = eventp->fdcnt_ready;
eventp->fdtabs_ready[eventp->fdcnt_ready] = fdp;
eventp->fdcnt_ready++;

View File

@ -197,7 +197,7 @@ static void event_disable_read(ACL_EVENT *eventp, ACL_VSTREAM *stream)
fdp->r_ttl = 0;
fdp->r_timeout = 0;
fdp->r_callback = NULL;
fdp->event_type &= ~ACL_EVENT_READ;
fdp->event_type &= ~(ACL_EVENT_READ | ACL_EVENT_ACCEPT);
fdp->flag &= ~EVENT_FDTABLE_FLAG_READ;
if ((fdp->flag & EVENT_FDTABLE_FLAG_WRITE)) {
@ -259,7 +259,7 @@ static void event_disable_write(ACL_EVENT *eventp, ACL_VSTREAM *stream)
fdp->w_ttl = 0;
fdp->w_timeout = 0;
fdp->w_callback = NULL;
fdp->event_type &= ~ACL_EVENT_WRITE;
fdp->event_type &= ~(ACL_EVENT_WRITE | ACL_EVENT_CONNECT);
fdp->flag &= ~EVENT_FDTABLE_FLAG_WRITE;
if ((fdp->flag & EVENT_FDTABLE_FLAG_READ)) {
@ -471,6 +471,8 @@ static void event_loop(ACL_EVENT *eventp)
if ((fdp->event_type & (ACL_EVENT_READ | ACL_EVENT_WRITE)) == 0)
{
fdp->event_type |= ACL_EVENT_READ;
if (fdp->listener)
fdp->event_type |= ACL_EVENT_ACCEPT;
fdp->fdidx_ready = eventp->fdcnt_ready;
eventp->fdtabs_ready[eventp->fdcnt_ready++] = fdp;
}

View File

@ -433,6 +433,8 @@ static void event_loop(ACL_EVENT *eventp)
/* has been set in fdtabs_ready ? */
if ((fdp->event_type & ACL_EVENT_READ) == 0) {
fdp->event_type |= ACL_EVENT_READ;
if (fdp->listener)
fdp->event_type |= ACL_EVENT_ACCEPT;
fdp->fdidx_ready = eventp->fdcnt_ready;
eventp->fdtabs_ready[eventp->fdcnt_ready++] = fdp;
}

View File

@ -236,7 +236,7 @@ static void event_disable_read(ACL_EVENT *eventp, ACL_VSTREAM *stream)
fdp->r_ttl = 0;
fdp->r_timeout = 0;
fdp->r_callback = NULL;
fdp->event_type &= ~ACL_EVENT_READ;
fdp->event_type &= ~(ACL_EVENT_READ | ACL_EVENT_ACCEPT);
fdp->flag &= ~EVENT_FDTABLE_FLAG_READ;
if ((fdp->flag & EVENT_FDTABLE_FLAG_WRITE)) {
@ -293,7 +293,7 @@ static void event_disable_write(ACL_EVENT *eventp, ACL_VSTREAM *stream)
fdp->w_ttl = 0;
fdp->w_timeout = 0;
fdp->w_callback = NULL;
fdp->event_type &= ~ACL_EVENT_WRITE;
fdp->event_type &= ~(ACL_EVENT_WRITE | ACL_EVENT_CONNECT);
fdp->flag &= ~EVENT_FDTABLE_FLAG_WRITE;
if ((fdp->flag & EVENT_FDTABLE_FLAG_READ)) {
@ -505,8 +505,8 @@ static void handleConnect(EVENT_WMSG *ev, ACL_SOCKET sockfd)
myname, __LINE__, (int) sockfd);
else {
fdp->stream->flag &= ~ACL_VSTREAM_FLAG_CONNECTING;
fdp->w_callback(ACL_EVENT_WRITE, &ev->event,
fdp->stream, fdp->w_context);
fdp->w_callback(ACL_EVENT_WRITE | ACL_EVENT_CONNECT,
&ev->event, fdp->stream, fdp->w_context);
}
}
@ -520,7 +520,7 @@ static void handleAccept(EVENT_WMSG *ev, ACL_SOCKET sockfd)
else if (fdp->r_callback == NULL)
acl_msg_fatal("%s(%d): fdp callback null", myname, __LINE__);
fdp->r_callback(ACL_EVENT_READ, &ev->event,
fdp->r_callback(ACL_EVENT_READ | ACL_EVENT_ACCEPT, &ev->event,
fdp->stream, fdp->r_context);
}

View File

@ -84,6 +84,7 @@ int acl_var_threads_check_inter;
int acl_var_threads_qlen_warn;
int acl_var_threads_schedule_warn;
int acl_var_threads_schedule_wait;
int acl_var_threads_thread_accept;
static ACL_CONFIG_INT_TABLE __conf_int_tab[] = {
{ ACL_VAR_THREADS_BUF_SIZE, ACL_DEF_THREADS_BUF_SIZE, &acl_var_threads_buf_size, 0, 0 },
@ -108,6 +109,7 @@ static ACL_CONFIG_INT_TABLE __conf_int_tab[] = {
{ ACL_VAR_THREADS_SCHEDULE_WARN, ACL_DEF_THREADS_SCHEDULE_WARN, &acl_var_threads_schedule_warn, 0, 0 },
{ ACL_VAR_THREADS_SCHEDULE_WAIT, ACL_DEF_THREADS_SCHEDULE_WAIT, &acl_var_threads_schedule_wait, 0, 0 },
{ ACL_VAR_THREADS_CHECK_INTER, ACL_DEF_THREADS_CHECK_INTER, &acl_var_threads_check_inter, 0, 0 },
{ ACL_VAR_THREADS_THREAD_ACCEPT, ACL_DEF_THREADS_THREAD_ACCEPT, &acl_var_threads_thread_accept, 0, 0 },
{ 0, 0, 0, 0, 0 },
};
@ -273,14 +275,15 @@ static void listen_cleanup(ACL_EVENT *event)
acl_event_disable_readwrite(event, __sstreams[i]);
/**
*
* 线线线线
* 线 select()
* 线线
* 线 select(),
* select() 使
* select()
*
*
* 线线线
* 线线 select()
* 线
* 线
* 线 select(), select()
* 使
*
* select
*/
acl_event_request_timer(event, listen_cleanup_timer, NULL, 1000000, 0);
}
@ -396,24 +399,37 @@ typedef struct {
void *serv_arg;
} READ_CTX;
static void client_wakeup(ACL_EVENT *event, acl_pthread_pool_t *threads,
ACL_VSTREAM *stream)
{
READ_CTX *ctx = (READ_CTX*) stream->ioctl_read_ctx;
(void) threads;
if (acl_var_threads_status_notify && acl_var_threads_master_maxproc > 1
&& acl_master_notify(acl_var_threads_pid, __server_generation,
ACL_MASTER_STAT_TAKEN) < 0)
{
server_abort(ACL_EVENT_NULL_TYPE, event, stream,
ACL_EVENT_NULL_CONTEXT);
}
acl_event_enable_read(event, stream, stream->rw_timeout,
ctx->read_callback, ctx);
if (acl_var_threads_status_notify && acl_var_threads_master_maxproc > 1
&& acl_master_notify(acl_var_threads_pid, __server_generation,
ACL_MASTER_STAT_AVAIL) < 0)
{
server_abort(ACL_EVENT_NULL_TYPE, event, stream,
ACL_EVENT_NULL_CONTEXT);
}
}
static void thread_callback(void *arg)
{
READ_CTX *ctx = (READ_CTX*) arg;
int ret;
switch (ctx->event_type) {
case ACL_EVENT_READ:
ret = ctx->serv_callback(ctx->stream, ctx->serv_arg);
if (ret < 0) {
if (__server_on_close != NULL)
__server_on_close(ctx->stream, ctx->serv_arg);
acl_vstream_close(ctx->stream);
} else if (ret == 0)
acl_event_enable_read(ctx->event, ctx->stream,
ctx->stream->rw_timeout,
ctx->read_callback, ctx);
break;
case ACL_EVENT_RW_TIMEOUT:
if ((ctx->event_type & ACL_EVENT_RW_TIMEOUT) != 0) {
if (__server_on_timeout == NULL) {
if (__server_on_close != NULL)
__server_on_close(ctx->stream, ctx->serv_arg);
@ -426,17 +442,32 @@ static void thread_callback(void *arg)
acl_event_enable_read(ctx->event, ctx->stream,
ctx->stream->rw_timeout,
ctx->read_callback, ctx);
break;
case ACL_EVENT_XCPT:
} else if ((ctx->event_type & ACL_EVENT_XCPT) != 0) {
if (__server_on_close != NULL)
__server_on_close(ctx->stream, ctx->serv_arg);
acl_vstream_close(ctx->stream);
break;
default:
} else if ((ctx->event_type & ACL_EVENT_ACCEPT) != 0) {
acl_assert(__server_on_accept != NULL);
if (__server_on_accept(ctx->stream) >= 0)
client_wakeup(ctx->event, ctx->threads, ctx->stream);
else if (__server_on_close != NULL) {
__server_on_close(ctx->stream, __service_ctx);
acl_vstream_close(ctx->stream);
}
} else if ((ctx->event_type & ACL_EVENT_READ) != 0) {
int ret = ctx->serv_callback(ctx->stream, ctx->serv_arg);
if (ret < 0) {
if (__server_on_close != NULL)
__server_on_close(ctx->stream, ctx->serv_arg);
acl_vstream_close(ctx->stream);
} else if (ret == 0)
acl_event_enable_read(ctx->event, ctx->stream,
ctx->stream->rw_timeout,
ctx->read_callback, ctx);
} else
acl_msg_fatal("%s, %s(%d): unknown event type(%d)",
__FILE__, __FUNCTION__, __LINE__, ctx->event_type);
break;
}
}
static void read_callback1(int event_type, ACL_EVENT *event acl_unused,
@ -475,51 +506,6 @@ static void free_ctx(ACL_VSTREAM *stream acl_unused, void *context)
acl_myfree(ctx);
}
static void server_execute(ACL_EVENT *event, acl_pthread_pool_t *threads,
ACL_VSTREAM *stream)
{
READ_CTX *ctx;
if (acl_var_threads_status_notify && acl_var_threads_master_maxproc > 1
&& acl_master_notify(acl_var_threads_pid, __server_generation,
ACL_MASTER_STAT_TAKEN) < 0)
{
server_abort(ACL_EVENT_NULL_TYPE, event, stream,
ACL_EVENT_NULL_CONTEXT);
}
if (stream->ioctl_read_ctx == NULL) {
ctx = (READ_CTX*) acl_mymalloc(sizeof(READ_CTX));
ctx->stream = stream;
ctx->threads = threads;
ctx->event = event;
ctx->event_type = -1;
ctx->serv_callback = __service_main;
ctx->serv_arg = __service_ctx;
ctx->job = acl_pthread_pool_alloc_job(thread_callback, ctx, 1);
if (acl_var_threads_batadd)
ctx->read_callback = read_callback1;
else
ctx->read_callback = read_callback2;
stream->ioctl_read_ctx = ctx;
acl_vstream_add_close_handle(stream, free_ctx, ctx);
} else
ctx = (READ_CTX*) stream->ioctl_read_ctx;
acl_event_enable_read(event, stream, stream->rw_timeout,
ctx->read_callback, ctx);
if (acl_var_threads_status_notify && acl_var_threads_master_maxproc > 1
&& acl_master_notify(acl_var_threads_pid, __server_generation,
ACL_MASTER_STAT_AVAIL) < 0)
{
server_abort(ACL_EVENT_NULL_TYPE, event, stream,
ACL_EVENT_NULL_CONTEXT);
}
}
static void decrease_counter_callback(ACL_VSTREAM *stream acl_unused,
void *arg acl_unused)
{
@ -527,7 +513,47 @@ static void decrease_counter_callback(ACL_VSTREAM *stream acl_unused,
decrease_client_counter();
}
static void server_wakeup(ACL_EVENT *event, acl_pthread_pool_t *threads,
static void create_job(ACL_EVENT *event, acl_pthread_pool_t *threads,
ACL_VSTREAM *stream)
{
READ_CTX *ctx = (READ_CTX*) acl_mymalloc(sizeof(READ_CTX));
ctx->stream = stream;
ctx->threads = threads;
ctx->event = event;
ctx->event_type = -1;
ctx->serv_callback = __service_main;
ctx->serv_arg = __service_ctx;
ctx->job = acl_pthread_pool_alloc_job(thread_callback, ctx, 1);
if (acl_var_threads_batadd)
ctx->read_callback = read_callback1;
else
ctx->read_callback = read_callback2;
stream->ioctl_read_ctx = ctx;
acl_vstream_add_close_handle(stream, free_ctx, ctx);
/* 如果没有 on_accept 回调函数,则直接在主线程中处理 */
if (__server_on_accept == NULL)
client_wakeup(event, threads, stream);
/* 为了防止 on_accept 的回调处理过程过长阻塞了主线程,可以通过配置
* 线线
*/
else if (acl_var_threads_thread_accept) {
ctx->event_type = ACL_EVENT_ACCEPT;
acl_pthread_pool_add_job(ctx->threads, ctx->job);
} else if (__server_on_accept(stream) >= 0)
client_wakeup(event, threads, stream);
else {
if (__server_on_close != NULL)
__server_on_close(stream, __service_ctx);
acl_vstream_close(stream);
}
}
static void client_open(ACL_EVENT *event, acl_pthread_pool_t *threads,
int fd, const char *remote, const char *local)
{
ACL_VSTREAM *stream;
@ -550,6 +576,7 @@ static void server_wakeup(ACL_EVENT *event, acl_pthread_pool_t *threads,
*ptr = 0;
} else
addr[0] = 0;
if (local)
acl_vstream_set_local(stream, local);
@ -559,25 +586,14 @@ static void server_wakeup(ACL_EVENT *event, acl_pthread_pool_t *threads,
*/
acl_vstream_add_close_handle(stream, decrease_counter_callback, NULL);
if (__server_on_accept) {
if (__server_on_accept(stream) < 0) {
if (__server_on_close != NULL)
__server_on_close(stream, __service_ctx);
acl_vstream_close(stream);
return;
}
}
if (addr[0] != 0 && !acl_access_permit(addr)) {
if (__deny_info && *__deny_info)
acl_vstream_fprintf(stream, "%s\r\n", __deny_info);
if (__server_on_close != NULL)
__server_on_close(stream, __service_ctx);
acl_vstream_close(stream);
return;
}
server_execute(event, threads, stream);
} else
create_job(event, threads, stream);
}
/* restart listening */
@ -627,7 +643,7 @@ static void server_accept_sock(int event_type, ACL_EVENT *event,
acl_tcp_set_nodelay(fd);
if (acl_getsockname(fd, local, sizeof(local)) < 0)
memset(local, 0, sizeof(local));
server_wakeup(event, threads, fd, remote, local);
client_open(event, threads, fd, remote, local);
continue;
}
@ -876,7 +892,7 @@ static void dispatch_receive(int event_type acl_unused, ACL_EVENT *event,
remote[0] = 0;
/* begin handle one client connection same as accept */
server_wakeup(event, threads, fd, remote, local);
client_open(event, threads, fd, remote, local);
acl_event_enable_read(event, conn, 0, dispatch_receive, threads);
}

View File

@ -47,6 +47,8 @@ master_service::~master_service()
bool master_service::thread_on_read(acl::socket_stream* stream)
{
logger("thread id: %lu", acl_pthread_self());
acl::http_response res(stream);
// 响应数据体为 xml 格式
res.response_header().set_content_type("text/html");
@ -82,6 +84,7 @@ bool master_service::thread_on_accept(acl::socket_stream* conn)
{
if (0)
acl_tcp_so_linger(conn->sock_handle(), 1, 0);
logger("thread id: %lu", acl_pthread_self());
return true;
}
@ -92,6 +95,7 @@ bool master_service::thread_on_timeout(acl::socket_stream*)
void master_service::thread_on_close(acl::socket_stream*)
{
logger("thread id: %lu", acl_pthread_self());
}
void master_service::thread_on_init()
@ -104,6 +108,8 @@ void master_service::thread_on_exit()
void master_service::proc_on_init()
{
logger("thread id: %lu", acl_pthread_self());
if (var_cfg_buf_size <= 0)
var_cfg_buf_size = 1024;
res_buf_ = (char*) acl_mymalloc(var_cfg_buf_size + 1);