diff --git a/lib_fiber/samples-c++1x/aio_server/main.cpp b/lib_fiber/samples-c++1x/aio_server/main.cpp index 7fa038586..2c44a2e94 100644 --- a/lib_fiber/samples-c++1x/aio_server/main.cpp +++ b/lib_fiber/samples-c++1x/aio_server/main.cpp @@ -16,11 +16,9 @@ static int __timeout = 0; /** * 延迟读回调处理类 */ -class timer_reader: public acl::aio_timer_reader -{ +class timer_reader: public acl::aio_timer_reader { public: - timer_reader(int delay) - { + timer_reader(int delay) { delay_ = delay; std::cout << "timer_reader init, delay: " << delay << std::endl; } @@ -30,16 +28,14 @@ protected: // aio_timer_reader 的子类必须重载 destroy 方法 // @override - void destroy(void) - { + void destroy(void) { std::cout << "timer_reader delete, delay: " << delay_ << std::endl; delete this; } // 重载基类回调方法 // @override - void timer_callback(unsigned int id) - { + void timer_callback(unsigned int id) { std::cout << "timer_reader(" << id << "): timer_callback, delay: " << delay_ << std::endl; @@ -54,11 +50,9 @@ private: /** * 延迟写回调处理类 */ -class timer_writer: public acl::aio_timer_writer -{ +class timer_writer: public acl::aio_timer_writer { public: - timer_writer(int delay) - { + timer_writer(int delay) { delay_ = delay; std::cout << "timer_writer init, delay: " << delay << std::endl; } @@ -68,16 +62,14 @@ protected: // aio_timer_reader 的子类必须重载 destroy 方法 // @override - void destroy(void) - { + void destroy(void) { std::cout << "timer_writer delete, delay: " << delay_ << std::endl; delete this; } // 重载基类回调方法 // @override - void timer_callback(unsigned int id) - { + void timer_callback(unsigned int id) { std::cout << "timer_writer(" << id << "): timer_callback, delay: " << delay_ << std::endl; @@ -92,16 +84,13 @@ private: /** * 异步客户端流的回调类的子类 */ -class io_callback : public acl::aio_callback -{ +class io_callback : public acl::aio_callback { public: io_callback(acl::aio_socket_stream* client) - : client_(client) - , i_(0) {} + : client_(client), i_(0) {} protected: - ~io_callback(void) - { + ~io_callback(void) { std::cout << "delete io_callback now ..." << std::endl; } @@ -111,8 +100,7 @@ protected: * @param len {int} 读到的数据长度 * @return {bool} 返回 true 表示继续,否则希望关闭该异步流 */ - bool read_callback(char* data, int len) - { + bool read_callback(char* data, int len) { i_++; if (i_ < 5) { std::cout << ">>gets(i:" << i_ << "): " @@ -176,16 +164,14 @@ protected: * 实现父类中的虚函数,客户端流的写成功回调过程 * @return {bool} 返回 true 表示继续,否则希望关闭该异步流 */ - bool write_callback(void) - { + bool write_callback(void) { return true; } /** * 实现父类中的虚函数,客户端流的超时回调过程 */ - void close_callback(void) - { + void close_callback(void) { // 必须在此处删除该动态分配的回调类对象以防止内存泄露 delete this; } @@ -194,8 +180,7 @@ protected: * 实现父类中的虚函数,客户端流的超时回调过程 * @return {bool} 返回 true 表示继续,否则希望关闭该异步流 */ - bool timeout_callback(void) - { + bool timeout_callback(void) { std::cout << "Timeout, delete it ..." << std::endl; return (false); } @@ -213,8 +198,8 @@ class io_accept_callback : public acl::aio_accept_callback { public: io_accept_callback(void) {} - ~io_accept_callback(void) - { + + ~io_accept_callback(void) { printf(">>io_accept_callback over!\n"); } @@ -223,8 +208,7 @@ public: * @param client {aio_socket_stream*} 异步客户端流 * @return {bool} 返回 true 以通知监听流继续监听 */ - bool accept_callback(acl::aio_socket_stream* client) - { + bool accept_callback(acl::aio_socket_stream* client) { printf("proactor accept one\r\n"); return handle_client(client); } @@ -234,8 +218,7 @@ public: * @param server {acl::aio_listen_stream&} 异步监听流 * @return {bool} */ - bool listen_callback(acl::aio_listen_stream& server) - { + bool listen_callback(acl::aio_listen_stream& server) { // reactor 模式下需要用户自己调用 accept 方法 acl::aio_socket_stream* client = server.accept(); if (client == NULL) { @@ -248,8 +231,7 @@ public: } private: - bool handle_client(acl::aio_socket_stream* client) - { + bool handle_client(acl::aio_socket_stream* client) { // 创建异步客户端流的回调对象并与该异步流进行绑定 io_callback* callback = new io_callback(client); @@ -309,8 +291,7 @@ private: //#include static void aio_run(bool use_reactor, acl::aio_handle& handle, - acl::aio_listen_stream* sstream) -{ + acl::aio_listen_stream* sstream) { // 创建回调类对象,当有新连接到达时自动调用此类对象的回调过程 io_accept_callback callback; @@ -351,8 +332,7 @@ static void aio_run(bool use_reactor, acl::aio_handle& handle, } static acl::aio_listen_stream* bind_addr(acl::aio_handle& handle, - const acl::string& addr) -{ + const acl::string& addr) { // 创建监听异步流 acl::aio_listen_stream* sstream = new acl::aio_listen_stream(&handle); @@ -371,45 +351,43 @@ static acl::aio_listen_stream* bind_addr(acl::aio_handle& handle, return sstream; } -static void usage(const char* procname) -{ +static void usage(const char* procname) { printf("usage: %s -h[help]\r\n" - " -l ip:port\r\n" - " -L line_max_length\r\n" - " -t timeout\r\n" - " -r [use reactor mode other proactor mode, default: proactor mode]\r\n" - " -f [if use fiber mode]\r\n" - " -k[use kernel event: epoll/iocp/kqueue/devpool]\r\n", + " -s ip:port, default: 127.0.0.1:9001\r\n" + " -d line_max_length\r\n" + " -t timeout\r\n" + " -R [use reactor mode other proactor mode, default: proactor mode]\r\n" + " -F [if use fiber mode]\r\n" + " -K [use kernel event: epoll/iocp/kqueue/devpool]\r\n", procname); } -int main(int argc, char* argv[]) -{ +int main(int argc, char* argv[]) { bool use_kernel = false, use_reactor = false, use_fiber = false; + acl::string addr("127.0.0.1:9001"); int ch; - acl::string addr(":9001"); - while ((ch = getopt(argc, argv, "l:hkL:t:rf")) > 0) { + while ((ch = getopt(argc, argv, "s:hKd:t:RF")) > 0) { switch (ch) { case 'h': usage(argv[0]); return 0; - case 'l': + case 's': addr = optarg; break; - case 'k': + case 'K': use_kernel = true; break; - case 'L': + case 'd': __max = atoi(optarg); break; case 't': __timeout = atoi(optarg); break; - case 'r': + case 'R': use_reactor = true; break; - case 'f': + case 'F': use_fiber = true; break; default: @@ -423,7 +401,6 @@ int main(int argc, char* argv[]) acl::fiber::stdout_open(true); acl::log::stdout_open(true); - if (use_fiber) { go[&] { // 构建异步引擎类对象 diff --git a/lib_fiber/samples-c++1x/fiber_pool/Makefile b/lib_fiber/samples-c++1x/fiber_pool/Makefile new file mode 100644 index 000000000..7d81992ec --- /dev/null +++ b/lib_fiber/samples-c++1x/fiber_pool/Makefile @@ -0,0 +1,3 @@ +include ../Makefile_cpp.in +CFLAGS += -std=c++11 +PROG = fiber_pool diff --git a/lib_fiber/samples-c++1x/fiber_pool/main.cpp b/lib_fiber/samples-c++1x/fiber_pool/main.cpp new file mode 100644 index 000000000..17ba28013 --- /dev/null +++ b/lib_fiber/samples-c++1x/fiber_pool/main.cpp @@ -0,0 +1,173 @@ +#include "stdafx.h" +#include +#include + +class client_socket { +public: + client_socket(acl::socket_stream* conn, std::atomic& nusers) + : conn_(conn), nusers_(nusers) {} + + ~client_socket(void) { + printf("delete conn=%p\r\n", conn_); + --nusers_; + delete conn_; + } + + acl::socket_stream& get_conn(void) { + return *conn_; + } + +private: + acl::socket_stream* conn_; + std::atomic& nusers_; +}; + +using shared_client = std::shared_ptr; + +class message { +public: + message(shared_client client, std::atomic& nmsgs, + const char* buf, size_t len) + : client_(client), nmsgs_(nmsgs), buf_(buf, len) {} + + ~message(void) { --nmsgs_; } + + const std::string& get_data(void) const { + return buf_; + } + + shared_client get_client(void) { + return client_; + } + +private: + shared_client client_; + std::atomic& nmsgs_; + std::string buf_; +}; + +using shared_message = std::shared_ptr; + +class mybox { +public: + mybox(void) : sem_(0) {} + ~mybox(void) {} + + void push(shared_message msg) { + msgs_.emplace_back(msg); + sem_.post(); + } + + shared_message pop(void) { + (void) sem_.wait(); + shared_message msg = msgs_.front(); + msgs_.pop_front(); + return msg; + } + +private: + acl::fiber_sem sem_; + std::list msgs_; +}; + +static void usage(const char* procname) { + printf("usage: %s -h[help]\r\n" + " -s ip:port, default: 127.0.0.1:9001\r\n" + " -c fiber_pool_count [default: 100] \r\n" + " -r timeout\r\n" + , procname); +} + +int main(int argc, char* argv[]) { + acl::string addr("127.0.0.1:9001"); + int ch, nfibers = 100; + + while ((ch = getopt(argc, argv, "hs:c:r:")) > 0) { + switch (ch) { + case 'h': + usage(argv[0]); + return 0; + case 's': + addr = optarg; + break; + case 'c': + nfibers = atoi(optarg); + break; + default: + break; + } + } + + acl::fiber::stdout_open(true); + acl::log::stdout_open(true); + + acl::server_socket ss; + if (!ss.open(addr)) { + printf("listen %s error %s\r\n", addr.c_str(), acl::last_serror()); + return 1; + } + + printf("listen on %s, fiber pool: %d\r\n", addr.c_str(), nfibers); + + mybox box; + + for (int i = 0; i < nfibers; i++) { + go[&box] { + while (true) { + shared_message msg = box.pop(); + auto client = msg->get_client(); + auto data = msg->get_data(); + if (client->get_conn().write(data.c_str(), data.size()) == -1) { + printf("write error: %s\r\n", acl::last_serror()); + break; + } + } + }; + } + + std::atomic nusers, nmsgs; + + go[&nusers, &nmsgs] { + while (true) { + std::cout << "client count: " << nusers << "; message count: " << nmsgs << std::endl; + ::sleep(1); + } + }; + + go[&ss, &box, &nusers, &nmsgs] { + while (true) { + acl::socket_stream* conn = ss.accept(); + if (conn == NULL) { + printf("accept error %s\r\n", acl::last_serror()); + break; + } + + ++nusers; + + auto client = std::make_shared(conn, nusers); + + go[&box, &nmsgs, client] { + char buf[4096]; + while (true) { + int ret = client->get_conn().read(buf, sizeof(buf), false); + if (ret <= 0) { + break; + } + + if (client->get_conn().write(buf, ret) != ret) { + break; + } else { + continue; + } + + ++nmsgs; + auto msg = std::make_shared(client, nmsgs, buf, ret); + box.push(msg); + } + }; + } + }; + + acl::fiber::schedule(); + return 0; +} diff --git a/lib_fiber/samples-c++1x/fiber_pool/stdafx.h b/lib_fiber/samples-c++1x/fiber_pool/stdafx.h new file mode 100644 index 000000000..0020d0704 --- /dev/null +++ b/lib_fiber/samples-c++1x/fiber_pool/stdafx.h @@ -0,0 +1,18 @@ +// stdafx.h : 标准系统包含文件的包含文件, +// 或是常用但不常更改的项目特定的包含文件 +// + +#pragma once + + +//#include +//#include + +// TODO: 在此处引用程序要求的附加头文件 +#include +#include + +#include "lib_acl.h" // just for getopt on Windows +#include "acl_cpp/lib_acl.hpp" +#include "fiber/libfiber.hpp" +#include "fiber/go_fiber.hpp" diff --git a/lib_fiber/samples/client2/main.c b/lib_fiber/samples/client2/main.c index 01dcb9281..0e6d31b6c 100644 --- a/lib_fiber/samples/client2/main.c +++ b/lib_fiber/samples/client2/main.c @@ -1,3 +1,5 @@ +#define _GNU_SOURCE + #include #include #include @@ -189,7 +191,6 @@ static void fiber_connect(ACL_FIBER *fiber acl_unused, void *ctx acl_unused) printf("fiber-%d: connect %s:%d error %s\r\n", acl_fiber_self(), __server_ip, __server_port, acl_last_serror()); - return; } else { __total_clients++; printf("fiber-%d: connect %s:%d ok, clients: %d, fd: %d\r\n", diff --git a/lib_fiber/samples/patch.c b/lib_fiber/samples/patch.c index 7541be2e0..d8e9541dd 100644 --- a/lib_fiber/samples/patch.c +++ b/lib_fiber/samples/patch.c @@ -1,3 +1,4 @@ +#define _GNU_SOURCE #include #include #include