From 55e29c55b357ee61417650e2414f929e41157810 Mon Sep 17 00:00:00 2001 From: zhengshuxin Date: Tue, 11 Jun 2019 23:16:03 +0800 Subject: [PATCH] aio_istream/aio_ostream can be created with socket fd. --- lib_acl_cpp/changes.txt | 2 + .../include/acl_cpp/stream/aio_istream.hpp | 11 + .../include/acl_cpp/stream/aio_ostream.hpp | 11 + .../acl_cpp/stream/aio_socket_stream.hpp | 2 +- .../samples/aio/http_aclient_ssl/main.cpp | 113 +++++++++- lib_acl_cpp/src/stream/aio_istream.cpp | 210 +++++++++--------- lib_acl_cpp/src/stream/aio_ostream.cpp | 160 ++++++------- lib_acl_cpp/src/stream/aio_socket_stream.cpp | 2 +- lib_acl_cpp/src/stream/aio_stream.cpp | 1 + 9 files changed, 320 insertions(+), 192 deletions(-) diff --git a/lib_acl_cpp/changes.txt b/lib_acl_cpp/changes.txt index ac450df17..4e3afc6e4 100644 --- a/lib_acl_cpp/changes.txt +++ b/lib_acl_cpp/changes.txt @@ -3,6 +3,8 @@ 575) 2019.6.11 575.1) bugfix: websocket::set_frame_pong 中没有设置 payload length 575.2) bugfix: polarssl_io.cpp 中针对静态编译缺少函数声明,导致编译有错 +575.3) feature: aio_istream/aio_ostream 分别提供输入参数为套接字句柄的构造方法, +方便将非阻塞管道纳入异步 IO 处理中 574) 2019.6.10 574.1) feature: http_aclient 类增加自动解压功能 diff --git a/lib_acl_cpp/include/acl_cpp/stream/aio_istream.hpp b/lib_acl_cpp/include/acl_cpp/stream/aio_istream.hpp index 8d57d6c7b..abe751706 100644 --- a/lib_acl_cpp/include/acl_cpp/stream/aio_istream.hpp +++ b/lib_acl_cpp/include/acl_cpp/stream/aio_istream.hpp @@ -65,6 +65,17 @@ public: */ aio_istream(aio_handle* handle); + /** + * 构造函数,创建异步读流对象,并 hook 读过程及关闭/超时过程 + * @param handle {aio_handle*} 异步事件引擎句柄 + * @param fd {int} 连接套接口句柄 + */ +#if defined(_WIN32) || defined(_WIN64) + aio_istream(aio_handle* handle, SOCKET fd); +#else + aio_istream(aio_handle* handle, int fd); +#endif + /** * 添加异可读时的回调类对象指针,如果该回调类对象已经存在,则只是 * 使该对象处于打开可用状态 diff --git a/lib_acl_cpp/include/acl_cpp/stream/aio_ostream.hpp b/lib_acl_cpp/include/acl_cpp/stream/aio_ostream.hpp index 4141b946a..f72391121 100644 --- a/lib_acl_cpp/include/acl_cpp/stream/aio_ostream.hpp +++ b/lib_acl_cpp/include/acl_cpp/stream/aio_ostream.hpp @@ -65,6 +65,17 @@ public: */ aio_ostream(aio_handle* handle); + /** + * 构造函数,创建异步写流对象,并 hook 写过程及关闭/超时过程 + * @param handle {aio_handle*} 异步事件引擎句柄 + * @param fd {int} 连接套接口句柄 + */ +#if defined(_WIN32) || defined(_WIN64) + aio_ostream(aio_handle* handle, SOCKET fd); +#else + aio_ostream(aio_handle* handle, int fd); +#endif + /** * 添加异可写时的回调类对象指针,如果该回调类对象已经存在,则只是 * 使该对象处于打开可用状态 diff --git a/lib_acl_cpp/include/acl_cpp/stream/aio_socket_stream.hpp b/lib_acl_cpp/include/acl_cpp/stream/aio_socket_stream.hpp index 395241a2e..e472bf224 100644 --- a/lib_acl_cpp/include/acl_cpp/stream/aio_socket_stream.hpp +++ b/lib_acl_cpp/include/acl_cpp/stream/aio_socket_stream.hpp @@ -54,7 +54,7 @@ public: /** * 构造函数,创建网络异步客户端流,并 hook 读写过程及关闭/超时过程 * @param handle {aio_handle*} 异步引擎句柄 - * @param fd {ACL_SOCKET} 连接套接口句柄 + * @param fd {int} 连接套接口句柄 */ #if defined(_WIN32) || defined(_WIN64) aio_socket_stream(aio_handle* handle, SOCKET fd); diff --git a/lib_acl_cpp/samples/aio/http_aclient_ssl/main.cpp b/lib_acl_cpp/samples/aio/http_aclient_ssl/main.cpp index 165866960..90f78f368 100644 --- a/lib_acl_cpp/samples/aio/http_aclient_ssl/main.cpp +++ b/lib_acl_cpp/samples/aio/http_aclient_ssl/main.cpp @@ -2,9 +2,99 @@ #include #include #include -//#include "lib_acl.h" +#include +#include +#include "lib_acl.h" #include "acl_cpp/lib_acl.hpp" +static acl::atomic_long __refer = 2; + +////////////////////////////////////////////////////////////////////////////// + +class pipe_reader : public acl::aio_callback +{ +public: + pipe_reader(acl::aio_handle& handle, int fd) + : handle_(handle) + { + in_ = new acl::aio_istream(&handle, fd); + } + + void start(void) + { + in_->add_read_callback(this); + in_->add_close_callback(this); + in_->read(); + } + +protected: + // @override + bool read_callback(char* data, int len) + { + const char* prompt = "reader->"; + (void) write(1, prompt, strlen(prompt)); + (void) write(1, data, len); + return true; + } + + // @override + void close_callback(void) + { + printf("reader->being closed!\r\n"); + fflush(stdout); + delete this; + } + +private: + acl::aio_handle& handle_; + acl::aio_istream* in_; + + ~pipe_reader(void) + { + printf("reader be deleted!\r\n"); + + if (--__refer == 0) { + printf("%s: stop aio engine now!\r\n", __FUNCTION__); + handle_.stop(); + } + } +}; + +////////////////////////////////////////////////////////////////////////////// + +class pipe_writer : public acl::thread +{ +public: + pipe_writer(int fd) + { + out_ = new acl::socket_stream; + out_->open(fd); + } + +protected: + // @override + void* run(void) + { + for (int i = 0; i < 5; i++) { + sleep(1); + out_->write("hello world!\r\n"); + } + + delete this; + return NULL; + } + +private: + acl::socket_stream* out_; + + ~pipe_writer(void) + { + delete out_; + } +}; + +////////////////////////////////////////////////////////////////////////////// + class http_aio_client : public acl::http_aclient { public: @@ -20,8 +110,11 @@ public: ~http_aio_client(void) { - printf("delete http_aio_client and begin stop aio engine\r\n"); - handle_.stop(); + printf("delete http_aio_client!\r\n"); + if (--__refer == 0) { + printf("%s: stop aio engine now!\r\n", __FUNCTION__); + handle_.stop(); + } } http_aio_client& enable_debug(bool on) @@ -283,6 +376,20 @@ int main(int argc, char* argv[]) // 定义 AIO 事件引擎 acl::aio_handle handle(acl::ENGINE_KERNEL); + int fds[2]; + int ret = acl_sane_socketpair(AF_UNIX, SOCK_STREAM, 0, fds); + if (ret < 0) { + printf("acl_sane_socketpair error %s\r\n", acl::last_serror()); + return 1; + } + + pipe_reader* in = new pipe_reader(handle, fds[0]); + pipe_writer* out = new pipe_writer(fds[1]); + + in->start(); + out->start(); + + // 设置 DNS 域名服务器地址 handle.set_dns(name_server.c_str(), 5); diff --git a/lib_acl_cpp/src/stream/aio_istream.cpp b/lib_acl_cpp/src/stream/aio_istream.cpp index 7e2fb8f50..32bf77055 100644 --- a/lib_acl_cpp/src/stream/aio_istream.cpp +++ b/lib_acl_cpp/src/stream/aio_istream.cpp @@ -8,18 +8,19 @@ namespace acl void aio_timer_reader::timer_callback(unsigned int id acl_unused) { + if (in_ == NULL) { + return; + } + in_->timer_reader_ = NULL; - if (delay_gets_) - { + if (delay_gets_) { int timeout = delay_timeout_; - bool nonl = delay_nonl_; + bool nonl = delay_nonl_; in_->gets(timeout, nonl, 0); - } - else - { + } else { int timeout = delay_timeout_; - int count = delay_count_; + int count = delay_count_; in_->read(count, timeout, 0); } @@ -32,20 +33,39 @@ aio_istream::aio_istream(aio_handle* handle) , timer_reader_(NULL) , read_hooked_(false) { +} +aio_istream::aio_istream(aio_handle* handle, ACL_SOCKET fd) +: aio_stream(handle) +, timer_reader_(NULL) +, read_hooked_(false) +{ + acl_assert(handle); + + ACL_VSTREAM* vstream = acl_vstream_fdopen(fd, O_RDWR, 8192, 0, + ACL_VSTREAM_TYPE_SOCK); + stream_ = acl_aio_open(handle->get_handle(), vstream); + + // 调用基类的 hook_error 以向 handle 中增加异步流计数, + // 同时 hook 关闭及超时回调过程 + hook_error(); + + // 只有当流连接成功后才可 hook IO 读写状态 + // hook 读回调过程 + hook_read(); } aio_istream::~aio_istream() { - if (timer_reader_ != NULL) - { + if (timer_reader_ != NULL) { handle_->del_timer(timer_reader_); timer_reader_->destroy(); } std::list::iterator it = read_callbacks_.begin(); - for (; it != read_callbacks_.end(); ++it) + for (; it != read_callbacks_.end(); ++it) { acl_myfree((*it)); + } read_callbacks_.clear(); } @@ -60,22 +80,19 @@ void aio_istream::add_read_callback(aio_callback* callback) // 先查询该回调对象已经存在 std::list::iterator it = read_callbacks_.begin(); - for (; it != read_callbacks_.end(); ++it) - { - if ((*it)->callback == callback) - { - if ((*it)->enable == false) + for (; it != read_callbacks_.end(); ++it) { + if ((*it)->callback == callback) { + if ((*it)->enable == false) { (*it)->enable = true; + } return; } } // 找一个空位 it = read_callbacks_.begin(); - for (; it != read_callbacks_.end(); ++it) - { - if ((*it)->callback == NULL) - { + for (; it != read_callbacks_.end(); ++it) { + if ((*it)->callback == NULL) { (*it)->enable = true; (*it)->callback = callback; return; @@ -83,9 +100,8 @@ void aio_istream::add_read_callback(aio_callback* callback) } // 分配一个新的位置 - AIO_CALLBACK* ac = (AIO_CALLBACK*) - acl_mycalloc(1, sizeof(AIO_CALLBACK)); - ac->enable = true; + AIO_CALLBACK* ac = (AIO_CALLBACK*) acl_mycalloc(1, sizeof(AIO_CALLBACK)); + ac->enable = true; ac->callback = callback; // 添加进回调对象队列中 @@ -97,23 +113,20 @@ int aio_istream::del_read_callback(aio_callback* callback /* = NULL */) std::list::iterator it = read_callbacks_.begin(); int n = 0; - if (callback == NULL) - { - for (; it != read_callbacks_.end(); ++it) - { - if ((*it)->callback == NULL) + if (callback == NULL) { + for (; it != read_callbacks_.end(); ++it) { + if ((*it)->callback == NULL) { continue; + } (*it)->enable = false; (*it)->callback = NULL; n++; } - } - else - { - for (; it != read_callbacks_.end(); ++it) - { - if ((*it)->callback != callback) + } else { + for (; it != read_callbacks_.end(); ++it) { + if ((*it)->callback != callback) { continue; + } (*it)->enable = false; (*it)->callback = NULL; n++; @@ -129,22 +142,19 @@ int aio_istream::disable_read_callback(aio_callback* callback) std::list::iterator it = read_callbacks_.begin(); int n = 0; - if (callback == NULL) - { - for (; it != read_callbacks_.end(); ++it) - { - if ((*it)->callback == NULL || !(*it)->enable) + if (callback == NULL) { + for (; it != read_callbacks_.end(); ++it) { + if ((*it)->callback == NULL || !(*it)->enable) { continue; + } (*it)->enable = false; n++; } - } - else - { - for (; it != read_callbacks_.end(); ++it) - { - if ((*it)->callback != callback || !(*it)->enable) + } else { + for (; it != read_callbacks_.end(); ++it) { + if ((*it)->callback != callback || !(*it)->enable) { continue; + } (*it)->enable = false; n++; break; @@ -159,23 +169,16 @@ int aio_istream::enable_read_callback(aio_callback* callback /* = NULL */) std::list::iterator it = read_callbacks_.begin(); int n = 0; - if (callback == NULL) - { - for (; it != read_callbacks_.end(); ++it) - { - if (!(*it)->enable && (*it)->callback != NULL) - { + if (callback == NULL) { + for (; it != read_callbacks_.end(); ++it) { + if (!(*it)->enable && (*it)->callback != NULL) { (*it)->enable = true; n++; } } - } - else - { - for (; it != read_callbacks_.end(); ++it) - { - if (!(*it)->enable && (*it)->callback == callback) - { + } else { + for (; it != read_callbacks_.end(); ++it) { + if (!(*it)->enable && (*it)->callback == callback) { (*it)->enable = true; n++; } @@ -188,16 +191,11 @@ int aio_istream::enable_read_callback(aio_callback* callback /* = NULL */) void aio_istream::hook_read() { acl_assert(stream_); - if (read_hooked_) + if (read_hooked_) { return; + } read_hooked_ = true; - /* - acl_aio_ctl(stream_, - ACL_AIO_CTL_READ_HOOK_ADD, read_callback, this, - ACL_AIO_CTL_CTX, this, - ACL_AIO_CTL_END); - */ acl_aio_add_read_hook(stream_, read_callback, this); } @@ -235,36 +233,32 @@ int aio_istream::get_buf_max(void) const void aio_istream::gets(int timeout /* = 0 */, bool nonl /* = true */, acl_int64 delay /* = 0 */, aio_timer_reader* callback /* = NULL */) { - if (delay > 0) - { + if (delay > 0) { // 设置新的或重置读延迟定时器 disable_read(); - if (callback != NULL) - { - if (timer_reader_ != NULL) - { + if (callback != NULL) { + if (timer_reader_ != NULL) { handle_->del_timer(timer_reader_); timer_reader_->destroy(); } timer_reader_= callback; } - if (timer_reader_ == NULL) + if (timer_reader_ == NULL) { timer_reader_ = NEW aio_timer_reader(); + } // 设置 timer_reader_ 对象的成员变量 - timer_reader_->in_ = this; - timer_reader_->delay_gets_ = true; + timer_reader_->in_ = this; + timer_reader_->delay_gets_ = true; timer_reader_->delay_timeout_ = timeout; - timer_reader_->delay_nonl_ = nonl; + timer_reader_->delay_nonl_ = nonl; // 设置异步读定时器 handle_->set_timer(timer_reader_, delay); return; - } - else if (timer_reader_ != NULL) - { + } else if (timer_reader_ != NULL) { // 立即取消之前设置的异步读定时器 handle_->del_timer(timer_reader_); timer_reader_->destroy(); @@ -272,48 +266,45 @@ void aio_istream::gets(int timeout /* = 0 */, bool nonl /* = true */, } // 设置流的异步读超时时间 - if (timeout >= 0) - if (timeout >= 0) - ACL_AIO_SET_TIMEOUT(stream_, timeout); - if (nonl) + if (timeout >= 0) { + ACL_AIO_SET_TIMEOUT(stream_, timeout); + } + if (nonl) { acl_aio_gets_nonl(stream_); - else + } else { acl_aio_gets(stream_); + } } void aio_istream::read(int count /* = 0 */, int timeout /* = 0 */, acl_int64 delay /* = 0 */, aio_timer_reader* callback /* = NULL */) { - if (delay > 0) - { + if (delay > 0) { // 设置新的或重置读延迟定时器 disable_read(); - if (callback != NULL) - { - if (timer_reader_ != NULL) - { + if (callback != NULL) { + if (timer_reader_ != NULL) { handle_->del_timer(timer_reader_); timer_reader_->destroy(); } timer_reader_= callback; } - if (timer_reader_ == NULL) + if (timer_reader_ == NULL) { timer_reader_ = NEW aio_timer_reader(); + } // 设置 timer_reader_ 对象的成员变量 - timer_reader_->in_ = this; - timer_reader_->delay_gets_ = false; + timer_reader_->in_ = this; + timer_reader_->delay_gets_ = false; timer_reader_->delay_timeout_ = timeout; - timer_reader_->delay_count_ = count; + timer_reader_->delay_count_ = count; // 设置异步读定时器 handle_->set_timer(timer_reader_, delay); return; - } - else if (timer_reader_ != NULL) - { + } else if (timer_reader_ != NULL) { // 立即取消之前设置的异步读定时器 handle_->del_timer(timer_reader_); timer_reader_->destroy(); @@ -321,19 +312,22 @@ void aio_istream::read(int count /* = 0 */, int timeout /* = 0 */, } // 设置流的异步读超时时间 - if (timeout >= 0) + if (timeout >= 0) { ACL_AIO_SET_TIMEOUT(stream_, timeout); - if (count > 0) + } + if (count > 0) { acl_aio_readn(stream_, count); - else + } else { acl_aio_read(stream_); + } } void aio_istream::read_wait(int timeout /* = 0 */) { // 设置流的异步读超时时间 - if (timeout >= 0) + if (timeout >= 0) { ACL_AIO_SET_TIMEOUT(stream_, timeout); + } acl_aio_enable_read(stream_, read_wakeup, this); } @@ -342,13 +336,14 @@ int aio_istream::read_callback(ACL_ASTREAM* stream acl_unused, void* ctx, { aio_istream* in = (aio_istream*) ctx; std::list::iterator it = in->read_callbacks_.begin(); - for (; it != in->read_callbacks_.end(); ++it) - { - if ((*it)->enable == false || (*it)->callback == NULL) + for (; it != in->read_callbacks_.end(); ++it) { + if ((*it)->enable == false || (*it)->callback == NULL) { continue; + } - if ((*it)->callback->read_callback(data, len) == false) + if ((*it)->callback->read_callback(data, len) == false) { return -1; + } } return 0; } @@ -357,13 +352,14 @@ int aio_istream::read_wakeup(ACL_ASTREAM* stream acl_unused, void* ctx) { aio_istream* in = (aio_istream*) ctx; std::list::iterator it = in->read_callbacks_.begin(); - for (; it != in->read_callbacks_.end(); ++it) - { - if ((*it)->enable == false || (*it)->callback == NULL) + for (; it != in->read_callbacks_.end(); ++it) { + if ((*it)->enable == false || (*it)->callback == NULL) { continue; + } - if ((*it)->callback->read_wakeup() == false) + if ((*it)->callback->read_wakeup() == false) { return -1; + } } return 0; } diff --git a/lib_acl_cpp/src/stream/aio_ostream.cpp b/lib_acl_cpp/src/stream/aio_ostream.cpp index b7687932f..f746375f0 100644 --- a/lib_acl_cpp/src/stream/aio_ostream.cpp +++ b/lib_acl_cpp/src/stream/aio_ostream.cpp @@ -20,24 +20,24 @@ aio_timer_writer::~aio_timer_writer() void aio_timer_writer::timer_callback(unsigned int id acl_unused) { - if (out_ == NULL) + if (out_ == NULL) { return; + } bool findit = false; std::list::iterator it = out_->timer_writers_.begin(); - for (; it != out_->timer_writers_.end(); ++it) - { - if ((*it) == this) - { + for (; it != out_->timer_writers_.end(); ++it) { + if ((*it) == this) { out_->timer_writers_.erase(it); findit = true; break; } } - if (findit == false) + if (findit == false) { logger_warn("Warning: timer_writer is the end!"); + } out_->write(buf_.c_str(), (int) buf_.length(), 0, NULL); } @@ -45,26 +45,44 @@ void aio_timer_writer::timer_callback(unsigned int id acl_unused) ////////////////////////////////////////////////////////////////////// aio_ostream::aio_ostream(aio_handle* handle) - : aio_stream(handle) - , write_hooked_(false) +: aio_stream(handle) +, write_hooked_(false) { +} +aio_ostream::aio_ostream(aio_handle* handle, ACL_SOCKET fd) +: aio_stream(handle) +, write_hooked_(false) +{ + acl_assert(handle); + + ACL_VSTREAM* vstream = acl_vstream_fdopen(fd, O_RDWR, 8192, 0, + ACL_VSTREAM_TYPE_SOCK); + stream_ = acl_aio_open(handle->get_handle(), vstream); + + // 调用基类的 hook_error 以向 handle 中增加异步流计数, + // 同时 hook 关闭及超时回调过程 + hook_error(); + + // 只有当流连接成功后才可 hook IO 写状态 + // hook 写回调过程 + hook_write(); } aio_ostream::~aio_ostream() { std::list::iterator it = timer_writers_.begin(); - for (; it != timer_writers_.end(); ++it) - { + for (; it != timer_writers_.end(); ++it) { handle_->del_timer(*it); (*it)->destroy(); } timer_writers_.clear(); std::list::iterator it2 = write_callbacks_.begin(); - for (; it2 != write_callbacks_.end(); ++it2) + for (; it2 != write_callbacks_.end(); ++it2) { acl_myfree((*it2)); + } write_callbacks_.clear(); } @@ -79,32 +97,28 @@ void aio_ostream::add_write_callback(aio_callback* callback) // 先查询该回调对象已经存在 std::list::iterator it = write_callbacks_.begin(); - for (; it != write_callbacks_.end(); ++it) - { - if ((*it)->callback == callback) - { - if ((*it)->enable == false) + for (; it != write_callbacks_.end(); ++it) { + if ((*it)->callback == callback) { + if ((*it)->enable == false) { (*it)->enable = true; + } return; } } // 找一个空位 it = write_callbacks_.begin(); - for (; it != write_callbacks_.end(); ++it) - { - if ((*it)->callback == NULL) - { - (*it)->enable = true; + for (; it != write_callbacks_.end(); ++it) { + if ((*it)->callback == NULL) { + (*it)->enable = true; (*it)->callback = callback; return; } } // 分配一个新的位置 - AIO_CALLBACK* ac = (AIO_CALLBACK*) - acl_mycalloc(1, sizeof(AIO_CALLBACK)); - ac->enable = true; + AIO_CALLBACK* ac = (AIO_CALLBACK*) acl_mycalloc(1, sizeof(AIO_CALLBACK)); + ac->enable = true; ac->callback = callback; // 添加进回调对象队列中 @@ -116,24 +130,21 @@ int aio_ostream::del_write_callback(aio_callback* callback) std::list::iterator it = write_callbacks_.begin(); int n = 0; - if (callback == NULL) - { - for (; it != write_callbacks_.end(); ++it) - { - if ((*it)->callback == NULL) + if (callback == NULL) { + for (; it != write_callbacks_.end(); ++it) { + if ((*it)->callback == NULL) { continue; - (*it)->enable = false; + } + (*it)->enable = false; (*it)->callback = NULL; n++; } - } - else - { - for (; it != write_callbacks_.end(); ++it) - { - if ((*it)->callback != callback) + } else { + for (; it != write_callbacks_.end(); ++it) { + if ((*it)->callback != callback) { continue; - (*it)->enable = false; + } + (*it)->enable = false; (*it)->callback = NULL; n++; break; @@ -148,22 +159,19 @@ int aio_ostream::disable_write_callback(aio_callback* callback) std::list::iterator it = write_callbacks_.begin(); int n = 0; - if (callback == NULL) - { - for (; it != write_callbacks_.end(); ++it) - { - if ((*it)->callback == NULL || !(*it)->enable) + if (callback == NULL) { + for (; it != write_callbacks_.end(); ++it) { + if ((*it)->callback == NULL || !(*it)->enable) { continue; + } (*it)->enable = false; n++; } - } - else - { - for (; it != write_callbacks_.end(); ++it) - { - if ((*it)->callback != callback || !(*it)->enable) + } else { + for (; it != write_callbacks_.end(); ++it) { + if ((*it)->callback != callback || !(*it)->enable) { continue; + } (*it)->enable = false; n++; break; @@ -178,23 +186,16 @@ int aio_ostream::enable_write_callback(aio_callback* callback /* = NULL */) std::list::iterator it = write_callbacks_.begin(); int n = 0; - if (callback == NULL) - { - for (; it != write_callbacks_.end(); ++it) - { - if (!(*it)->enable && (*it)->callback != NULL) - { + if (callback == NULL) { + for (; it != write_callbacks_.end(); ++it) { + if (!(*it)->enable && (*it)->callback != NULL) { (*it)->enable = true; n++; } } - } - else - { - for (; it != write_callbacks_.end(); ++it) - { - if (!(*it)->enable && (*it)->callback == callback) - { + } else { + for (; it != write_callbacks_.end(); ++it) { + if (!(*it)->enable && (*it)->callback == callback) { (*it)->enable = true; n++; } @@ -207,15 +208,11 @@ int aio_ostream::enable_write_callback(aio_callback* callback /* = NULL */) void aio_ostream::hook_write() { acl_assert(stream_); - if (write_hooked_) + if (write_hooked_) { return; + } write_hooked_ = true; - /* - acl_aio_ctl(stream_, - ACL_AIO_CTL_WRITE_HOOK_ADD, write_callback, this, - ACL_AIO_CTL_END); - */ acl_aio_add_write_hook(stream_, write_callback, this); } @@ -229,16 +226,16 @@ void aio_ostream::write(const void* data, int len, acl_int64 delay /* = 0 */, aio_timer_writer* callback /* = NULL */) { - if (delay > 0) - { + if (delay > 0) { disable_write(); aio_timer_writer* timer_writer; - if (callback != NULL) + if (callback != NULL) { timer_writer= callback; - else + } else { timer_writer = NEW aio_timer_writer(); + } // 设置 timer_writer_ 对象的成员变量 timer_writer->out_ = this; @@ -271,8 +268,9 @@ void aio_ostream::vformat(const char* fmt, va_list ap) void aio_ostream::write_wait(int timeout /* = 0 */) { // 设置流的异步读超时时间 - if (timeout >= 0) + if (timeout >= 0) { ACL_AIO_SET_TIMEOUT(stream_, timeout); + } acl_aio_enable_write(stream_, write_wakup, this); } @@ -280,13 +278,14 @@ int aio_ostream::write_callback(ACL_ASTREAM* stream acl_unused, void* ctx) { aio_ostream* aos = (aio_ostream*) ctx; std::list::iterator it = aos->write_callbacks_.begin(); - for (; it != aos->write_callbacks_.end(); ++it) - { - if ((*it)->enable == false || (*it)->callback == NULL) + for (; it != aos->write_callbacks_.end(); ++it) { + if ((*it)->enable == false || (*it)->callback == NULL) { continue; + } - if ((*it)->callback->write_callback() == false) + if ((*it)->callback->write_callback() == false) { return -1; + } } return 0; } @@ -295,13 +294,14 @@ int aio_ostream::write_wakup(ACL_ASTREAM* stream acl_unused, void* ctx) { aio_ostream* out = (aio_ostream*) ctx; std::list::iterator it = out->write_callbacks_.begin(); - for (; it != out->write_callbacks_.end(); ++it) - { - if ((*it)->enable == false || (*it)->callback == NULL) + for (; it != out->write_callbacks_.end(); ++it) { + if ((*it)->enable == false || (*it)->callback == NULL) { continue; + } - if ((*it)->callback->write_wakeup() == false) + if ((*it)->callback->write_wakeup() == false) { return -1; + } } return 0; } diff --git a/lib_acl_cpp/src/stream/aio_socket_stream.cpp b/lib_acl_cpp/src/stream/aio_socket_stream.cpp index 06ffdb043..0e306afc1 100644 --- a/lib_acl_cpp/src/stream/aio_socket_stream.cpp +++ b/lib_acl_cpp/src/stream/aio_socket_stream.cpp @@ -47,7 +47,7 @@ aio_socket_stream::aio_socket_stream(aio_handle* handle, ACL_SOCKET fd) // 同时 hook 关闭及超时回调过程 hook_error(); - // 只有当流连接成功后才可 hook IO 读写状态 + // 只有当流连接成功后才可 hook IO 读状态 // hook 读回调过程 hook_read(); diff --git a/lib_acl_cpp/src/stream/aio_stream.cpp b/lib_acl_cpp/src/stream/aio_stream.cpp index 4d892c4fb..9db5a3648 100644 --- a/lib_acl_cpp/src/stream/aio_stream.cpp +++ b/lib_acl_cpp/src/stream/aio_stream.cpp @@ -32,6 +32,7 @@ aio_stream::~aio_stream(void) handle_->decrease(); acl_aio_iocp_close(stream_); } + std::list::iterator it = close_callbacks_.begin(); for (; it != close_callbacks_.end(); ++it) { acl_myfree((*it));