mirror of
https://gitee.com/acl-dev/acl.git
synced 2024-12-02 11:57:43 +08:00
aio_istream/aio_ostream can be created with socket fd.
This commit is contained in:
parent
d4b6538dcd
commit
55e29c55b3
@ -3,6 +3,8 @@
|
||||
575) 2019.6.11
|
||||
575.1) bugfix: websocket::set_frame_pong 中没有设置 payload length
|
||||
575.2) bugfix: polarssl_io.cpp 中针对静态编译缺少函数声明,导致编译有错
|
||||
575.3) feature: aio_istream/aio_ostream 分别提供输入参数为套接字句柄的构造方法,
|
||||
方便将非阻塞管道纳入异步 IO 处理中
|
||||
|
||||
574) 2019.6.10
|
||||
574.1) feature: http_aclient 类增加自动解压功能
|
||||
|
@ -65,6 +65,17 @@ public:
|
||||
*/
|
||||
aio_istream(aio_handle* handle);
|
||||
|
||||
/**
|
||||
* 构造函数,创建异步读流对象,并 hook 读过程及关闭/超时过程
|
||||
* @param handle {aio_handle*} 异步事件引擎句柄
|
||||
* @param fd {int} 连接套接口句柄
|
||||
*/
|
||||
#if defined(_WIN32) || defined(_WIN64)
|
||||
aio_istream(aio_handle* handle, SOCKET fd);
|
||||
#else
|
||||
aio_istream(aio_handle* handle, int fd);
|
||||
#endif
|
||||
|
||||
/**
|
||||
* 添加异可读时的回调类对象指针,如果该回调类对象已经存在,则只是
|
||||
* 使该对象处于打开可用状态
|
||||
|
@ -65,6 +65,17 @@ public:
|
||||
*/
|
||||
aio_ostream(aio_handle* handle);
|
||||
|
||||
/**
|
||||
* 构造函数,创建异步写流对象,并 hook 写过程及关闭/超时过程
|
||||
* @param handle {aio_handle*} 异步事件引擎句柄
|
||||
* @param fd {int} 连接套接口句柄
|
||||
*/
|
||||
#if defined(_WIN32) || defined(_WIN64)
|
||||
aio_ostream(aio_handle* handle, SOCKET fd);
|
||||
#else
|
||||
aio_ostream(aio_handle* handle, int fd);
|
||||
#endif
|
||||
|
||||
/**
|
||||
* 添加异可写时的回调类对象指针,如果该回调类对象已经存在,则只是
|
||||
* 使该对象处于打开可用状态
|
||||
|
@ -54,7 +54,7 @@ public:
|
||||
/**
|
||||
* 构造函数,创建网络异步客户端流,并 hook 读写过程及关闭/超时过程
|
||||
* @param handle {aio_handle*} 异步引擎句柄
|
||||
* @param fd {ACL_SOCKET} Á¬½ÓÌ×½Ó¿Ú¾ä±ú
|
||||
* @param fd {int} Á¬½ÓÌ×½Ó¿Ú¾ä±ú
|
||||
*/
|
||||
#if defined(_WIN32) || defined(_WIN64)
|
||||
aio_socket_stream(aio_handle* handle, SOCKET fd);
|
||||
|
@ -2,9 +2,99 @@
|
||||
#include <assert.h>
|
||||
#include <getopt.h>
|
||||
#include <unistd.h>
|
||||
//#include "lib_acl.h"
|
||||
#include <sys/types.h>
|
||||
#include <sys/socket.h>
|
||||
#include "lib_acl.h"
|
||||
#include "acl_cpp/lib_acl.hpp"
|
||||
|
||||
static acl::atomic_long __refer = 2;
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
class pipe_reader : public acl::aio_callback
|
||||
{
|
||||
public:
|
||||
pipe_reader(acl::aio_handle& handle, int fd)
|
||||
: handle_(handle)
|
||||
{
|
||||
in_ = new acl::aio_istream(&handle, fd);
|
||||
}
|
||||
|
||||
void start(void)
|
||||
{
|
||||
in_->add_read_callback(this);
|
||||
in_->add_close_callback(this);
|
||||
in_->read();
|
||||
}
|
||||
|
||||
protected:
|
||||
// @override
|
||||
bool read_callback(char* data, int len)
|
||||
{
|
||||
const char* prompt = "reader->";
|
||||
(void) write(1, prompt, strlen(prompt));
|
||||
(void) write(1, data, len);
|
||||
return true;
|
||||
}
|
||||
|
||||
// @override
|
||||
void close_callback(void)
|
||||
{
|
||||
printf("reader->being closed!\r\n");
|
||||
fflush(stdout);
|
||||
delete this;
|
||||
}
|
||||
|
||||
private:
|
||||
acl::aio_handle& handle_;
|
||||
acl::aio_istream* in_;
|
||||
|
||||
~pipe_reader(void)
|
||||
{
|
||||
printf("reader be deleted!\r\n");
|
||||
|
||||
if (--__refer == 0) {
|
||||
printf("%s: stop aio engine now!\r\n", __FUNCTION__);
|
||||
handle_.stop();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
class pipe_writer : public acl::thread
|
||||
{
|
||||
public:
|
||||
pipe_writer(int fd)
|
||||
{
|
||||
out_ = new acl::socket_stream;
|
||||
out_->open(fd);
|
||||
}
|
||||
|
||||
protected:
|
||||
// @override
|
||||
void* run(void)
|
||||
{
|
||||
for (int i = 0; i < 5; i++) {
|
||||
sleep(1);
|
||||
out_->write("hello world!\r\n");
|
||||
}
|
||||
|
||||
delete this;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
private:
|
||||
acl::socket_stream* out_;
|
||||
|
||||
~pipe_writer(void)
|
||||
{
|
||||
delete out_;
|
||||
}
|
||||
};
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
class http_aio_client : public acl::http_aclient
|
||||
{
|
||||
public:
|
||||
@ -20,8 +110,11 @@ public:
|
||||
|
||||
~http_aio_client(void)
|
||||
{
|
||||
printf("delete http_aio_client and begin stop aio engine\r\n");
|
||||
handle_.stop();
|
||||
printf("delete http_aio_client!\r\n");
|
||||
if (--__refer == 0) {
|
||||
printf("%s: stop aio engine now!\r\n", __FUNCTION__);
|
||||
handle_.stop();
|
||||
}
|
||||
}
|
||||
|
||||
http_aio_client& enable_debug(bool on)
|
||||
@ -283,6 +376,20 @@ int main(int argc, char* argv[])
|
||||
// 定义 AIO 事件引擎
|
||||
acl::aio_handle handle(acl::ENGINE_KERNEL);
|
||||
|
||||
int fds[2];
|
||||
int ret = acl_sane_socketpair(AF_UNIX, SOCK_STREAM, 0, fds);
|
||||
if (ret < 0) {
|
||||
printf("acl_sane_socketpair error %s\r\n", acl::last_serror());
|
||||
return 1;
|
||||
}
|
||||
|
||||
pipe_reader* in = new pipe_reader(handle, fds[0]);
|
||||
pipe_writer* out = new pipe_writer(fds[1]);
|
||||
|
||||
in->start();
|
||||
out->start();
|
||||
|
||||
|
||||
// 设置 DNS 域名服务器地址
|
||||
handle.set_dns(name_server.c_str(), 5);
|
||||
|
||||
|
@ -8,18 +8,19 @@ namespace acl
|
||||
|
||||
void aio_timer_reader::timer_callback(unsigned int id acl_unused)
|
||||
{
|
||||
if (in_ == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
in_->timer_reader_ = NULL;
|
||||
if (delay_gets_)
|
||||
{
|
||||
if (delay_gets_) {
|
||||
int timeout = delay_timeout_;
|
||||
bool nonl = delay_nonl_;
|
||||
bool nonl = delay_nonl_;
|
||||
|
||||
in_->gets(timeout, nonl, 0);
|
||||
}
|
||||
else
|
||||
{
|
||||
} else {
|
||||
int timeout = delay_timeout_;
|
||||
int count = delay_count_;
|
||||
int count = delay_count_;
|
||||
|
||||
in_->read(count, timeout, 0);
|
||||
}
|
||||
@ -32,20 +33,39 @@ aio_istream::aio_istream(aio_handle* handle)
|
||||
, timer_reader_(NULL)
|
||||
, read_hooked_(false)
|
||||
{
|
||||
}
|
||||
|
||||
aio_istream::aio_istream(aio_handle* handle, ACL_SOCKET fd)
|
||||
: aio_stream(handle)
|
||||
, timer_reader_(NULL)
|
||||
, read_hooked_(false)
|
||||
{
|
||||
acl_assert(handle);
|
||||
|
||||
ACL_VSTREAM* vstream = acl_vstream_fdopen(fd, O_RDWR, 8192, 0,
|
||||
ACL_VSTREAM_TYPE_SOCK);
|
||||
stream_ = acl_aio_open(handle->get_handle(), vstream);
|
||||
|
||||
// 调用基类的 hook_error 以向 handle 中增加异步流计数,
|
||||
// 同时 hook 关闭及超时回调过程
|
||||
hook_error();
|
||||
|
||||
// 只有当流连接成功后才可 hook IO 读写状态
|
||||
// hook 读回调过程
|
||||
hook_read();
|
||||
}
|
||||
|
||||
aio_istream::~aio_istream()
|
||||
{
|
||||
if (timer_reader_ != NULL)
|
||||
{
|
||||
if (timer_reader_ != NULL) {
|
||||
handle_->del_timer(timer_reader_);
|
||||
timer_reader_->destroy();
|
||||
}
|
||||
|
||||
std::list<AIO_CALLBACK*>::iterator it = read_callbacks_.begin();
|
||||
for (; it != read_callbacks_.end(); ++it)
|
||||
for (; it != read_callbacks_.end(); ++it) {
|
||||
acl_myfree((*it));
|
||||
}
|
||||
read_callbacks_.clear();
|
||||
}
|
||||
|
||||
@ -60,22 +80,19 @@ void aio_istream::add_read_callback(aio_callback* callback)
|
||||
|
||||
// 先查询该回调对象已经存在
|
||||
std::list<AIO_CALLBACK*>::iterator it = read_callbacks_.begin();
|
||||
for (; it != read_callbacks_.end(); ++it)
|
||||
{
|
||||
if ((*it)->callback == callback)
|
||||
{
|
||||
if ((*it)->enable == false)
|
||||
for (; it != read_callbacks_.end(); ++it) {
|
||||
if ((*it)->callback == callback) {
|
||||
if ((*it)->enable == false) {
|
||||
(*it)->enable = true;
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// 找一个空位
|
||||
it = read_callbacks_.begin();
|
||||
for (; it != read_callbacks_.end(); ++it)
|
||||
{
|
||||
if ((*it)->callback == NULL)
|
||||
{
|
||||
for (; it != read_callbacks_.end(); ++it) {
|
||||
if ((*it)->callback == NULL) {
|
||||
(*it)->enable = true;
|
||||
(*it)->callback = callback;
|
||||
return;
|
||||
@ -83,9 +100,8 @@ void aio_istream::add_read_callback(aio_callback* callback)
|
||||
}
|
||||
|
||||
// 分配一个新的位置
|
||||
AIO_CALLBACK* ac = (AIO_CALLBACK*)
|
||||
acl_mycalloc(1, sizeof(AIO_CALLBACK));
|
||||
ac->enable = true;
|
||||
AIO_CALLBACK* ac = (AIO_CALLBACK*) acl_mycalloc(1, sizeof(AIO_CALLBACK));
|
||||
ac->enable = true;
|
||||
ac->callback = callback;
|
||||
|
||||
// 添加进回调对象队列中
|
||||
@ -97,23 +113,20 @@ int aio_istream::del_read_callback(aio_callback* callback /* = NULL */)
|
||||
std::list<AIO_CALLBACK*>::iterator it = read_callbacks_.begin();
|
||||
int n = 0;
|
||||
|
||||
if (callback == NULL)
|
||||
{
|
||||
for (; it != read_callbacks_.end(); ++it)
|
||||
{
|
||||
if ((*it)->callback == NULL)
|
||||
if (callback == NULL) {
|
||||
for (; it != read_callbacks_.end(); ++it) {
|
||||
if ((*it)->callback == NULL) {
|
||||
continue;
|
||||
}
|
||||
(*it)->enable = false;
|
||||
(*it)->callback = NULL;
|
||||
n++;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
for (; it != read_callbacks_.end(); ++it)
|
||||
{
|
||||
if ((*it)->callback != callback)
|
||||
} else {
|
||||
for (; it != read_callbacks_.end(); ++it) {
|
||||
if ((*it)->callback != callback) {
|
||||
continue;
|
||||
}
|
||||
(*it)->enable = false;
|
||||
(*it)->callback = NULL;
|
||||
n++;
|
||||
@ -129,22 +142,19 @@ int aio_istream::disable_read_callback(aio_callback* callback)
|
||||
std::list<AIO_CALLBACK*>::iterator it = read_callbacks_.begin();
|
||||
int n = 0;
|
||||
|
||||
if (callback == NULL)
|
||||
{
|
||||
for (; it != read_callbacks_.end(); ++it)
|
||||
{
|
||||
if ((*it)->callback == NULL || !(*it)->enable)
|
||||
if (callback == NULL) {
|
||||
for (; it != read_callbacks_.end(); ++it) {
|
||||
if ((*it)->callback == NULL || !(*it)->enable) {
|
||||
continue;
|
||||
}
|
||||
(*it)->enable = false;
|
||||
n++;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
for (; it != read_callbacks_.end(); ++it)
|
||||
{
|
||||
if ((*it)->callback != callback || !(*it)->enable)
|
||||
} else {
|
||||
for (; it != read_callbacks_.end(); ++it) {
|
||||
if ((*it)->callback != callback || !(*it)->enable) {
|
||||
continue;
|
||||
}
|
||||
(*it)->enable = false;
|
||||
n++;
|
||||
break;
|
||||
@ -159,23 +169,16 @@ int aio_istream::enable_read_callback(aio_callback* callback /* = NULL */)
|
||||
std::list<AIO_CALLBACK*>::iterator it = read_callbacks_.begin();
|
||||
int n = 0;
|
||||
|
||||
if (callback == NULL)
|
||||
{
|
||||
for (; it != read_callbacks_.end(); ++it)
|
||||
{
|
||||
if (!(*it)->enable && (*it)->callback != NULL)
|
||||
{
|
||||
if (callback == NULL) {
|
||||
for (; it != read_callbacks_.end(); ++it) {
|
||||
if (!(*it)->enable && (*it)->callback != NULL) {
|
||||
(*it)->enable = true;
|
||||
n++;
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
for (; it != read_callbacks_.end(); ++it)
|
||||
{
|
||||
if (!(*it)->enable && (*it)->callback == callback)
|
||||
{
|
||||
} else {
|
||||
for (; it != read_callbacks_.end(); ++it) {
|
||||
if (!(*it)->enable && (*it)->callback == callback) {
|
||||
(*it)->enable = true;
|
||||
n++;
|
||||
}
|
||||
@ -188,16 +191,11 @@ int aio_istream::enable_read_callback(aio_callback* callback /* = NULL */)
|
||||
void aio_istream::hook_read()
|
||||
{
|
||||
acl_assert(stream_);
|
||||
if (read_hooked_)
|
||||
if (read_hooked_) {
|
||||
return;
|
||||
}
|
||||
read_hooked_ = true;
|
||||
|
||||
/*
|
||||
acl_aio_ctl(stream_,
|
||||
ACL_AIO_CTL_READ_HOOK_ADD, read_callback, this,
|
||||
ACL_AIO_CTL_CTX, this,
|
||||
ACL_AIO_CTL_END);
|
||||
*/
|
||||
acl_aio_add_read_hook(stream_, read_callback, this);
|
||||
}
|
||||
|
||||
@ -235,36 +233,32 @@ int aio_istream::get_buf_max(void) const
|
||||
void aio_istream::gets(int timeout /* = 0 */, bool nonl /* = true */,
|
||||
acl_int64 delay /* = 0 */, aio_timer_reader* callback /* = NULL */)
|
||||
{
|
||||
if (delay > 0)
|
||||
{
|
||||
if (delay > 0) {
|
||||
// 设置新的或重置读延迟定时器
|
||||
|
||||
disable_read();
|
||||
|
||||
if (callback != NULL)
|
||||
{
|
||||
if (timer_reader_ != NULL)
|
||||
{
|
||||
if (callback != NULL) {
|
||||
if (timer_reader_ != NULL) {
|
||||
handle_->del_timer(timer_reader_);
|
||||
timer_reader_->destroy();
|
||||
}
|
||||
timer_reader_= callback;
|
||||
}
|
||||
|
||||
if (timer_reader_ == NULL)
|
||||
if (timer_reader_ == NULL) {
|
||||
timer_reader_ = NEW aio_timer_reader();
|
||||
}
|
||||
// 设置 timer_reader_ 对象的成员变量
|
||||
timer_reader_->in_ = this;
|
||||
timer_reader_->delay_gets_ = true;
|
||||
timer_reader_->in_ = this;
|
||||
timer_reader_->delay_gets_ = true;
|
||||
timer_reader_->delay_timeout_ = timeout;
|
||||
timer_reader_->delay_nonl_ = nonl;
|
||||
timer_reader_->delay_nonl_ = nonl;
|
||||
|
||||
// 设置异步读定时器
|
||||
handle_->set_timer(timer_reader_, delay);
|
||||
return;
|
||||
}
|
||||
else if (timer_reader_ != NULL)
|
||||
{
|
||||
} else if (timer_reader_ != NULL) {
|
||||
// 立即取消之前设置的异步读定时器
|
||||
handle_->del_timer(timer_reader_);
|
||||
timer_reader_->destroy();
|
||||
@ -272,48 +266,45 @@ void aio_istream::gets(int timeout /* = 0 */, bool nonl /* = true */,
|
||||
}
|
||||
|
||||
// 设置流的异步读超时时间
|
||||
if (timeout >= 0)
|
||||
if (timeout >= 0)
|
||||
ACL_AIO_SET_TIMEOUT(stream_, timeout);
|
||||
if (nonl)
|
||||
if (timeout >= 0) {
|
||||
ACL_AIO_SET_TIMEOUT(stream_, timeout);
|
||||
}
|
||||
if (nonl) {
|
||||
acl_aio_gets_nonl(stream_);
|
||||
else
|
||||
} else {
|
||||
acl_aio_gets(stream_);
|
||||
}
|
||||
}
|
||||
|
||||
void aio_istream::read(int count /* = 0 */, int timeout /* = 0 */,
|
||||
acl_int64 delay /* = 0 */, aio_timer_reader* callback /* = NULL */)
|
||||
{
|
||||
if (delay > 0)
|
||||
{
|
||||
if (delay > 0) {
|
||||
// 设置新的或重置读延迟定时器
|
||||
|
||||
disable_read();
|
||||
|
||||
if (callback != NULL)
|
||||
{
|
||||
if (timer_reader_ != NULL)
|
||||
{
|
||||
if (callback != NULL) {
|
||||
if (timer_reader_ != NULL) {
|
||||
handle_->del_timer(timer_reader_);
|
||||
timer_reader_->destroy();
|
||||
}
|
||||
timer_reader_= callback;
|
||||
}
|
||||
|
||||
if (timer_reader_ == NULL)
|
||||
if (timer_reader_ == NULL) {
|
||||
timer_reader_ = NEW aio_timer_reader();
|
||||
}
|
||||
// 设置 timer_reader_ 对象的成员变量
|
||||
timer_reader_->in_ = this;
|
||||
timer_reader_->delay_gets_ = false;
|
||||
timer_reader_->in_ = this;
|
||||
timer_reader_->delay_gets_ = false;
|
||||
timer_reader_->delay_timeout_ = timeout;
|
||||
timer_reader_->delay_count_ = count;
|
||||
timer_reader_->delay_count_ = count;
|
||||
|
||||
// 设置异步读定时器
|
||||
handle_->set_timer(timer_reader_, delay);
|
||||
return;
|
||||
}
|
||||
else if (timer_reader_ != NULL)
|
||||
{
|
||||
} else if (timer_reader_ != NULL) {
|
||||
// 立即取消之前设置的异步读定时器
|
||||
handle_->del_timer(timer_reader_);
|
||||
timer_reader_->destroy();
|
||||
@ -321,19 +312,22 @@ void aio_istream::read(int count /* = 0 */, int timeout /* = 0 */,
|
||||
}
|
||||
|
||||
// 设置流的异步读超时时间
|
||||
if (timeout >= 0)
|
||||
if (timeout >= 0) {
|
||||
ACL_AIO_SET_TIMEOUT(stream_, timeout);
|
||||
if (count > 0)
|
||||
}
|
||||
if (count > 0) {
|
||||
acl_aio_readn(stream_, count);
|
||||
else
|
||||
} else {
|
||||
acl_aio_read(stream_);
|
||||
}
|
||||
}
|
||||
|
||||
void aio_istream::read_wait(int timeout /* = 0 */)
|
||||
{
|
||||
// 设置流的异步读超时时间
|
||||
if (timeout >= 0)
|
||||
if (timeout >= 0) {
|
||||
ACL_AIO_SET_TIMEOUT(stream_, timeout);
|
||||
}
|
||||
acl_aio_enable_read(stream_, read_wakeup, this);
|
||||
}
|
||||
|
||||
@ -342,13 +336,14 @@ int aio_istream::read_callback(ACL_ASTREAM* stream acl_unused, void* ctx,
|
||||
{
|
||||
aio_istream* in = (aio_istream*) ctx;
|
||||
std::list<AIO_CALLBACK*>::iterator it = in->read_callbacks_.begin();
|
||||
for (; it != in->read_callbacks_.end(); ++it)
|
||||
{
|
||||
if ((*it)->enable == false || (*it)->callback == NULL)
|
||||
for (; it != in->read_callbacks_.end(); ++it) {
|
||||
if ((*it)->enable == false || (*it)->callback == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if ((*it)->callback->read_callback(data, len) == false)
|
||||
if ((*it)->callback->read_callback(data, len) == false) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
@ -357,13 +352,14 @@ int aio_istream::read_wakeup(ACL_ASTREAM* stream acl_unused, void* ctx)
|
||||
{
|
||||
aio_istream* in = (aio_istream*) ctx;
|
||||
std::list<AIO_CALLBACK*>::iterator it = in->read_callbacks_.begin();
|
||||
for (; it != in->read_callbacks_.end(); ++it)
|
||||
{
|
||||
if ((*it)->enable == false || (*it)->callback == NULL)
|
||||
for (; it != in->read_callbacks_.end(); ++it) {
|
||||
if ((*it)->enable == false || (*it)->callback == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if ((*it)->callback->read_wakeup() == false)
|
||||
if ((*it)->callback->read_wakeup() == false) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
@ -20,24 +20,24 @@ aio_timer_writer::~aio_timer_writer()
|
||||
|
||||
void aio_timer_writer::timer_callback(unsigned int id acl_unused)
|
||||
{
|
||||
if (out_ == NULL)
|
||||
if (out_ == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
bool findit = false;
|
||||
std::list<aio_timer_writer*>::iterator it =
|
||||
out_->timer_writers_.begin();
|
||||
for (; it != out_->timer_writers_.end(); ++it)
|
||||
{
|
||||
if ((*it) == this)
|
||||
{
|
||||
for (; it != out_->timer_writers_.end(); ++it) {
|
||||
if ((*it) == this) {
|
||||
out_->timer_writers_.erase(it);
|
||||
findit = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (findit == false)
|
||||
if (findit == false) {
|
||||
logger_warn("Warning: timer_writer is the end!");
|
||||
}
|
||||
|
||||
out_->write(buf_.c_str(), (int) buf_.length(), 0, NULL);
|
||||
}
|
||||
@ -45,26 +45,44 @@ void aio_timer_writer::timer_callback(unsigned int id acl_unused)
|
||||
//////////////////////////////////////////////////////////////////////
|
||||
|
||||
aio_ostream::aio_ostream(aio_handle* handle)
|
||||
: aio_stream(handle)
|
||||
, write_hooked_(false)
|
||||
: aio_stream(handle)
|
||||
, write_hooked_(false)
|
||||
{
|
||||
}
|
||||
|
||||
aio_ostream::aio_ostream(aio_handle* handle, ACL_SOCKET fd)
|
||||
: aio_stream(handle)
|
||||
, write_hooked_(false)
|
||||
{
|
||||
acl_assert(handle);
|
||||
|
||||
ACL_VSTREAM* vstream = acl_vstream_fdopen(fd, O_RDWR, 8192, 0,
|
||||
ACL_VSTREAM_TYPE_SOCK);
|
||||
stream_ = acl_aio_open(handle->get_handle(), vstream);
|
||||
|
||||
// 调用基类的 hook_error 以向 handle 中增加异步流计数,
|
||||
// 同时 hook 关闭及超时回调过程
|
||||
hook_error();
|
||||
|
||||
// 只有当流连接成功后才可 hook IO 写状态
|
||||
// hook 写回调过程
|
||||
hook_write();
|
||||
}
|
||||
|
||||
aio_ostream::~aio_ostream()
|
||||
{
|
||||
std::list<aio_timer_writer*>::iterator it = timer_writers_.begin();
|
||||
|
||||
for (; it != timer_writers_.end(); ++it)
|
||||
{
|
||||
for (; it != timer_writers_.end(); ++it) {
|
||||
handle_->del_timer(*it);
|
||||
(*it)->destroy();
|
||||
}
|
||||
timer_writers_.clear();
|
||||
|
||||
std::list<AIO_CALLBACK*>::iterator it2 = write_callbacks_.begin();
|
||||
for (; it2 != write_callbacks_.end(); ++it2)
|
||||
for (; it2 != write_callbacks_.end(); ++it2) {
|
||||
acl_myfree((*it2));
|
||||
}
|
||||
write_callbacks_.clear();
|
||||
}
|
||||
|
||||
@ -79,32 +97,28 @@ void aio_ostream::add_write_callback(aio_callback* callback)
|
||||
|
||||
// 先查询该回调对象已经存在
|
||||
std::list<AIO_CALLBACK*>::iterator it = write_callbacks_.begin();
|
||||
for (; it != write_callbacks_.end(); ++it)
|
||||
{
|
||||
if ((*it)->callback == callback)
|
||||
{
|
||||
if ((*it)->enable == false)
|
||||
for (; it != write_callbacks_.end(); ++it) {
|
||||
if ((*it)->callback == callback) {
|
||||
if ((*it)->enable == false) {
|
||||
(*it)->enable = true;
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// 找一个空位
|
||||
it = write_callbacks_.begin();
|
||||
for (; it != write_callbacks_.end(); ++it)
|
||||
{
|
||||
if ((*it)->callback == NULL)
|
||||
{
|
||||
(*it)->enable = true;
|
||||
for (; it != write_callbacks_.end(); ++it) {
|
||||
if ((*it)->callback == NULL) {
|
||||
(*it)->enable = true;
|
||||
(*it)->callback = callback;
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// 分配一个新的位置
|
||||
AIO_CALLBACK* ac = (AIO_CALLBACK*)
|
||||
acl_mycalloc(1, sizeof(AIO_CALLBACK));
|
||||
ac->enable = true;
|
||||
AIO_CALLBACK* ac = (AIO_CALLBACK*) acl_mycalloc(1, sizeof(AIO_CALLBACK));
|
||||
ac->enable = true;
|
||||
ac->callback = callback;
|
||||
|
||||
// 添加进回调对象队列中
|
||||
@ -116,24 +130,21 @@ int aio_ostream::del_write_callback(aio_callback* callback)
|
||||
std::list<AIO_CALLBACK*>::iterator it = write_callbacks_.begin();
|
||||
int n = 0;
|
||||
|
||||
if (callback == NULL)
|
||||
{
|
||||
for (; it != write_callbacks_.end(); ++it)
|
||||
{
|
||||
if ((*it)->callback == NULL)
|
||||
if (callback == NULL) {
|
||||
for (; it != write_callbacks_.end(); ++it) {
|
||||
if ((*it)->callback == NULL) {
|
||||
continue;
|
||||
(*it)->enable = false;
|
||||
}
|
||||
(*it)->enable = false;
|
||||
(*it)->callback = NULL;
|
||||
n++;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
for (; it != write_callbacks_.end(); ++it)
|
||||
{
|
||||
if ((*it)->callback != callback)
|
||||
} else {
|
||||
for (; it != write_callbacks_.end(); ++it) {
|
||||
if ((*it)->callback != callback) {
|
||||
continue;
|
||||
(*it)->enable = false;
|
||||
}
|
||||
(*it)->enable = false;
|
||||
(*it)->callback = NULL;
|
||||
n++;
|
||||
break;
|
||||
@ -148,22 +159,19 @@ int aio_ostream::disable_write_callback(aio_callback* callback)
|
||||
std::list<AIO_CALLBACK*>::iterator it = write_callbacks_.begin();
|
||||
int n = 0;
|
||||
|
||||
if (callback == NULL)
|
||||
{
|
||||
for (; it != write_callbacks_.end(); ++it)
|
||||
{
|
||||
if ((*it)->callback == NULL || !(*it)->enable)
|
||||
if (callback == NULL) {
|
||||
for (; it != write_callbacks_.end(); ++it) {
|
||||
if ((*it)->callback == NULL || !(*it)->enable) {
|
||||
continue;
|
||||
}
|
||||
(*it)->enable = false;
|
||||
n++;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
for (; it != write_callbacks_.end(); ++it)
|
||||
{
|
||||
if ((*it)->callback != callback || !(*it)->enable)
|
||||
} else {
|
||||
for (; it != write_callbacks_.end(); ++it) {
|
||||
if ((*it)->callback != callback || !(*it)->enable) {
|
||||
continue;
|
||||
}
|
||||
(*it)->enable = false;
|
||||
n++;
|
||||
break;
|
||||
@ -178,23 +186,16 @@ int aio_ostream::enable_write_callback(aio_callback* callback /* = NULL */)
|
||||
std::list<AIO_CALLBACK*>::iterator it = write_callbacks_.begin();
|
||||
int n = 0;
|
||||
|
||||
if (callback == NULL)
|
||||
{
|
||||
for (; it != write_callbacks_.end(); ++it)
|
||||
{
|
||||
if (!(*it)->enable && (*it)->callback != NULL)
|
||||
{
|
||||
if (callback == NULL) {
|
||||
for (; it != write_callbacks_.end(); ++it) {
|
||||
if (!(*it)->enable && (*it)->callback != NULL) {
|
||||
(*it)->enable = true;
|
||||
n++;
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
for (; it != write_callbacks_.end(); ++it)
|
||||
{
|
||||
if (!(*it)->enable && (*it)->callback == callback)
|
||||
{
|
||||
} else {
|
||||
for (; it != write_callbacks_.end(); ++it) {
|
||||
if (!(*it)->enable && (*it)->callback == callback) {
|
||||
(*it)->enable = true;
|
||||
n++;
|
||||
}
|
||||
@ -207,15 +208,11 @@ int aio_ostream::enable_write_callback(aio_callback* callback /* = NULL */)
|
||||
void aio_ostream::hook_write()
|
||||
{
|
||||
acl_assert(stream_);
|
||||
if (write_hooked_)
|
||||
if (write_hooked_) {
|
||||
return;
|
||||
}
|
||||
write_hooked_ = true;
|
||||
|
||||
/*
|
||||
acl_aio_ctl(stream_,
|
||||
ACL_AIO_CTL_WRITE_HOOK_ADD, write_callback, this,
|
||||
ACL_AIO_CTL_END);
|
||||
*/
|
||||
acl_aio_add_write_hook(stream_, write_callback, this);
|
||||
}
|
||||
|
||||
@ -229,16 +226,16 @@ void aio_ostream::write(const void* data, int len,
|
||||
acl_int64 delay /* = 0 */,
|
||||
aio_timer_writer* callback /* = NULL */)
|
||||
{
|
||||
if (delay > 0)
|
||||
{
|
||||
if (delay > 0) {
|
||||
disable_write();
|
||||
|
||||
aio_timer_writer* timer_writer;
|
||||
|
||||
if (callback != NULL)
|
||||
if (callback != NULL) {
|
||||
timer_writer= callback;
|
||||
else
|
||||
} else {
|
||||
timer_writer = NEW aio_timer_writer();
|
||||
}
|
||||
|
||||
// 设置 timer_writer_ 对象的成员变量
|
||||
timer_writer->out_ = this;
|
||||
@ -271,8 +268,9 @@ void aio_ostream::vformat(const char* fmt, va_list ap)
|
||||
void aio_ostream::write_wait(int timeout /* = 0 */)
|
||||
{
|
||||
// 设置流的异步读超时时间
|
||||
if (timeout >= 0)
|
||||
if (timeout >= 0) {
|
||||
ACL_AIO_SET_TIMEOUT(stream_, timeout);
|
||||
}
|
||||
acl_aio_enable_write(stream_, write_wakup, this);
|
||||
}
|
||||
|
||||
@ -280,13 +278,14 @@ int aio_ostream::write_callback(ACL_ASTREAM* stream acl_unused, void* ctx)
|
||||
{
|
||||
aio_ostream* aos = (aio_ostream*) ctx;
|
||||
std::list<AIO_CALLBACK*>::iterator it = aos->write_callbacks_.begin();
|
||||
for (; it != aos->write_callbacks_.end(); ++it)
|
||||
{
|
||||
if ((*it)->enable == false || (*it)->callback == NULL)
|
||||
for (; it != aos->write_callbacks_.end(); ++it) {
|
||||
if ((*it)->enable == false || (*it)->callback == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if ((*it)->callback->write_callback() == false)
|
||||
if ((*it)->callback->write_callback() == false) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
@ -295,13 +294,14 @@ int aio_ostream::write_wakup(ACL_ASTREAM* stream acl_unused, void* ctx)
|
||||
{
|
||||
aio_ostream* out = (aio_ostream*) ctx;
|
||||
std::list<AIO_CALLBACK*>::iterator it = out->write_callbacks_.begin();
|
||||
for (; it != out->write_callbacks_.end(); ++it)
|
||||
{
|
||||
if ((*it)->enable == false || (*it)->callback == NULL)
|
||||
for (; it != out->write_callbacks_.end(); ++it) {
|
||||
if ((*it)->enable == false || (*it)->callback == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if ((*it)->callback->write_wakeup() == false)
|
||||
if ((*it)->callback->write_wakeup() == false) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
@ -47,7 +47,7 @@ aio_socket_stream::aio_socket_stream(aio_handle* handle, ACL_SOCKET fd)
|
||||
// 同时 hook 关闭及超时回调过程
|
||||
hook_error();
|
||||
|
||||
// 只有当流连接成功后才可 hook IO 读写状态
|
||||
// 只有当流连接成功后才可 hook IO 读状态
|
||||
// hook 读回调过程
|
||||
hook_read();
|
||||
|
||||
|
@ -32,6 +32,7 @@ aio_stream::~aio_stream(void)
|
||||
handle_->decrease();
|
||||
acl_aio_iocp_close(stream_);
|
||||
}
|
||||
|
||||
std::list<AIO_CALLBACK*>::iterator it = close_callbacks_.begin();
|
||||
for (; it != close_callbacks_.end(); ++it) {
|
||||
acl_myfree((*it));
|
||||
|
Loading…
Reference in New Issue
Block a user