#include "stdafx.h"
#include <atomic>
#include <cstdio>
#include <iostream>
#include <memory>
#include <string>
#include <utility>
#include "server_pool.h"

// NOTE(review): the counters are assumed to be std::atomic<long>; confirm the
// intended value type against the project's original source.

using shared_stream = std::shared_ptr<acl::socket_stream>;

/**
 * One accepted connection, shared between the fiber that reads from it and
 * any queued messages that still have to be written back to it.
 *
 * The owner increments the user counter before constructing the object; the
 * destructor decrements it again when the last shared owner goes away.
 */
class socket_client {
public:
	/**
	 * @param conn   the accepted stream (shared ownership)
	 * @param nusers live-connection counter, decremented on destruction;
	 *               it must outlive this object
	 */
	socket_client(shared_stream conn, std::atomic<long>& nusers)
	: conn_(std::move(conn)), nusers_(nusers) {}

	~socket_client() { --nusers_; }

	// A copy would decrement the counter a second time on destruction.
	socket_client(const socket_client&) = delete;
	socket_client& operator=(const socket_client&) = delete;

	/** @return the underlying stream used for reading and writing. */
	acl::socket_stream& get_conn() { return *conn_; }

private:
	shared_stream conn_;
	std::atomic<long>& nusers_;
};

using shared_client = std::shared_ptr<socket_client>;

/**
 * A chunk of data read from a client, queued so that a pool fiber can write
 * it back. Holding the shared_client keeps the connection alive until the
 * message has been handled.
 *
 * The producer increments the message counter before constructing the
 * object; the destructor decrements it again.
 */
class message {
public:
	/**
	 * @param client the connection the data came from (and is echoed to)
	 * @param nmsgs  pending-message counter, decremented on destruction;
	 *               it must outlive this object
	 * @param buf    start of the data; the bytes are copied
	 * @param len    number of bytes to copy from buf
	 */
	message(shared_client client, std::atomic<long>& nmsgs,
		const char* buf, size_t len)
	: client_(std::move(client)), nmsgs_(nmsgs), buf_(buf, len) {}

	~message() { --nmsgs_; }

	// A copy would decrement the counter a second time on destruction.
	message(const message&) = delete;
	message& operator=(const message&) = delete;

	/** @return the payload bytes copied at construction. */
	const std::string& get_data() const { return buf_; }

	/** @return a shared owner of the client this message belongs to. */
	shared_client get_client() { return client_; }

private:
	shared_client client_;
	std::atomic<long>& nmsgs_;
	std::string buf_;
};

using shared_message = std::shared_ptr<message>;

/**
 * Run an echo server on addr using fibers.
 *
 * Fibers created here:
 *  - nfibers pool fibers that pop messages from a shared box and write each
 *    payload back to the client it came from;
 *  - one fiber that prints the client and message counters once per
 *    ::sleep(1) interval;
 *  - one acceptor fiber, which starts one reader fiber per connection.
 *
 * @param addr    listening address handed to acl::server_socket::open()
 * @param sync    when true each reader fiber echoes the data itself and the
 *                pool is bypassed; when false the data is queued for the pool
 * @param nfibers number of pool fibers to start
 *
 * Returns early (after printing an error) when listening fails; otherwise
 * returns when acl::fiber::schedule() returns.
 */
void server_pool_run(const char* addr, bool sync, int nfibers) {
	acl::server_socket ss;
	if (!ss.open(addr)) {
		printf("listen %s error %s\r\n", addr, acl::last_serror());
		return;
	}

	printf("listen on %s, fiber pool: %d\r\n", addr, nfibers);

	// The counters are declared before the box so that they are destroyed
	// after it: messages still queued when the box goes away decrement
	// nmsgs (and possibly nusers) from their destructors.
	std::atomic<long> nusers(0), nmsgs(0);

	// Queue connecting the reader fibers (producers) to the pool fibers.
	acl::fiber_sbox2<shared_message> box;

	for (int i = 0; i < nfibers; i++) {
		go[&box] {
			while (true) {
				shared_message msg;
				if (!box.pop(msg)) {
					printf("POP end!\r\n");
					break;
				}

				auto client = msg->get_client();
				// msg keeps the payload alive for this iteration,
				// so no copy of the string is needed.
				const std::string& data = msg->get_data();
				if (client->get_conn().write(data.c_str(), data.size()) == -1) {
					printf("write error: %s\r\n", acl::last_serror());
					// A failure on one connection must not shrink
					// the pool: keep serving the other clients.
					continue;
				}
			}
		};
	}

	// Periodic statistics output.
	go[&nusers, &nmsgs] {
		while (true) {
			std::cout << "client count: " << nusers
				<< "; message count: " << nmsgs << std::endl;
			::sleep(1);
		}
	};

	// Acceptor: one reader fiber per accepted connection.
	go[&ss, &box, &nusers, &nmsgs, sync] {
		while (true) {
			auto conn = ss.shared_accept();
			if (conn.get() == nullptr) {
				printf("accept error %s\r\n", acl::last_serror());
				break;
			}

			// Matched by the decrement in ~socket_client().
			++nusers;
			auto client = std::make_shared<socket_client>(conn, nusers);

			// client is captured by value so the connection lives as
			// long as this reader fiber or any message still queued.
			go[&box, &nmsgs, client, sync] {
				char buf[4096];
				while (true) {
					int ret = client->get_conn().read(buf, sizeof(buf), false);
					if (ret <= 0) {
						break;
					}

					if (sync) {
						// Echo directly from the reader fiber.
						if (client->get_conn().write(buf, ret) != ret) {
							break;
						}
						continue;
					}

					// Matched by the decrement in ~message().
					++nmsgs;
					auto msg = std::make_shared<message>(client, nmsgs,
						buf, static_cast<size_t>(ret));
					box.push(msg);
				}
			};
		}
	};

	acl::fiber::schedule();
}