From 5de837070044d1ea9f1b5aaa53b858106e97f135 Mon Sep 17 00:00:00 2001 From: zhengshuxin Date: Tue, 4 Apr 2023 16:35:31 +0800 Subject: [PATCH] Optimize and test sharing stack of fiber. --- lib_fiber/c/src/fiber.c | 10 ++ lib_fiber/cpp/include/fiber/fiber.hpp | 3 +- lib_fiber/cpp/include/fiber/go_fiber.hpp | 10 +- lib_fiber/cpp/src/fiber.cpp | 11 +- lib_fiber/samples/chat/client/valgrind.sh | 0 lib_fiber/samples/chat/server/main.cpp | 146 ++++++++------------ lib_fiber/samples/chat/server/user_client.h | 11 +- lib_fiber/samples/chat/server/valgrind.sh | 0 lib_fiber/samples/server3/main.cpp | 27 ++-- 9 files changed, 114 insertions(+), 104 deletions(-) mode change 100644 => 100755 lib_fiber/samples/chat/client/valgrind.sh mode change 100644 => 100755 lib_fiber/samples/chat/server/valgrind.sh diff --git a/lib_fiber/c/src/fiber.c b/lib_fiber/c/src/fiber.c index 9fe620148..3129e168f 100644 --- a/lib_fiber/c/src/fiber.c +++ b/lib_fiber/c/src/fiber.c @@ -36,6 +36,9 @@ typedef struct THREAD { int nlocal; #ifdef SHARE_STACK +# ifdef USE_VALGRIND + unsigned int vid; +# endif char *stack_buff; size_t stack_size; size_t stack_dlen; @@ -89,6 +92,9 @@ static void thread_free(void *ctx) } #ifdef SHARE_STACK +# ifdef USE_VALGRIND + VALGRIND_STACK_DEREGISTER(tf->vid); +# endif mem_free(tf->stack_buff); #endif @@ -159,6 +165,10 @@ static void fiber_check(void) #ifdef SHARE_STACK __thread_fiber->stack_size = __shared_stack_size; __thread_fiber->stack_buff = mem_malloc(__thread_fiber->stack_size); +# ifdef USE_VALGRIND + __thread_fiber->vid = VALGRIND_STACK_REGISTER(__thread_fiber->stack_buff, + __thread_fiber->stack_buff + __shared_stack_size); +# endif __thread_fiber->stack_dlen = 0; #endif diff --git a/lib_fiber/cpp/include/fiber/fiber.hpp b/lib_fiber/cpp/include/fiber/fiber.hpp index 31ef42afc..ba8895b8e 100644 --- a/lib_fiber/cpp/include/fiber/fiber.hpp +++ b/lib_fiber/cpp/include/fiber/fiber.hpp @@ -267,9 +267,10 @@ public: * @param fn {void (*)(ACL_FIBER*, void*)} 协程函数执行入口 * @param ctx {void*} 传递给协程执行函数的参数 * @param size {size_t} 协程栈大小 + * @param share_stack {bool} 是否创建共享栈协程 */ static void fiber_create(void (*fn)(ACL_FIBER*, void*), - void* ctx, size_t size); + void* ctx, size_t size, bool share_stack = false); /** * 获得指定协程的堆栈 diff --git a/lib_fiber/cpp/include/fiber/go_fiber.hpp b/lib_fiber/cpp/include/fiber/go_fiber.hpp index 9e725a2e6..a2a609f0a 100644 --- a/lib_fiber/cpp/include/fiber/go_fiber.hpp +++ b/lib_fiber/cpp/include/fiber/go_fiber.hpp @@ -22,7 +22,8 @@ public: }; #define go acl::go_fiber()> -#define go_stack(size) acl::go_fiber(size)> +#define go_stack(size) acl::go_fiber(size, false)> +#define go_share(size) acl::go_fiber(size, true)> #define go_wait_fiber acl::go_fiber()< #define go_wait_thread acl::go_fiber()<< @@ -32,12 +33,12 @@ class go_fiber { public: go_fiber(void) {} - go_fiber(size_t stack_size) : stack_size_(stack_size) {} + go_fiber(size_t stack_size, bool on) : stack_size_(stack_size), stack_share_(on) {} void operator > (std::function fn) { fiber_ctx* ctx = new fiber_ctx(fn); - fiber::fiber_create(fiber_main, (void*) ctx, stack_size_); + fiber::fiber_create(fiber_main, (void*) ctx, stack_size_, stack_share_); } void operator < (std::function fn) @@ -65,7 +66,8 @@ public: } private: - size_t stack_size_ = 320000; + size_t stack_size_ = 320000; + bool stack_share_ = false; static void fiber_main(ACL_FIBER*, void* ctx) { diff --git a/lib_fiber/cpp/src/fiber.cpp b/lib_fiber/cpp/src/fiber.cpp index 64a34e137..6c0353477 100644 --- a/lib_fiber/cpp/src/fiber.cpp +++ b/lib_fiber/cpp/src/fiber.cpp @@ -177,6 +177,7 @@ void fiber::start(size_t stack_size /* 64000 */, bool share_stack /* false */) acl_msg_fatal("%s(%d), %s: fiber-%u, already running!", __FILE__, __LINE__, __FUNCTION__, self()); } + ACL_FIBER_ATTR attr; acl_fiber_attr_init(&attr); acl_fiber_attr_setstacksize(&attr, stack_size); @@ -307,9 +308,15 @@ void fiber::stdout_open(bool on) acl_fiber_msg_stdout_enable(on ? 1 : 0); } -void fiber::fiber_create(void (*fn)(ACL_FIBER*, void*), void* ctx, size_t size) +void fiber::fiber_create(void (*fn)(ACL_FIBER*, void*), void* ctx, + size_t stack_size, bool share_stack /* false */) { - acl_fiber_create(fn, (void*) ctx, size); + ACL_FIBER_ATTR attr; + acl_fiber_attr_init(&attr); + acl_fiber_attr_setstacksize(&attr, stack_size); + acl_fiber_attr_setsharestack(&attr, share_stack ? 1 : 0); + + acl_fiber_create2(&attr, fn, ctx); } void fiber::stacktrace(const fiber& fb, std::vector& out, size_t max) diff --git a/lib_fiber/samples/chat/client/valgrind.sh b/lib_fiber/samples/chat/client/valgrind.sh old mode 100644 new mode 100755 diff --git a/lib_fiber/samples/chat/server/main.cpp b/lib_fiber/samples/chat/server/main.cpp index a71bbd334..1635a8884 100644 --- a/lib_fiber/samples/chat/server/main.cpp +++ b/lib_fiber/samples/chat/server/main.cpp @@ -16,18 +16,16 @@ static std::map __users; static void remove_user(user_client* uc) { const char* name = uc->get_name(); - if (name == NULL || *name == 0) - { + if (name == NULL || *name == 0) { printf("%s(%d): no name!\r\n", __FUNCTION__, __LINE__); return; } std::map::iterator it = __users.find(name); - if (it == __users.end()) + if (it == __users.end()) { printf("%s(%d): not exist, name: %s\r\n", __FUNCTION__, __LINE__, name); - else - { + } else { __users.erase(it); printf("delete user ok, name: %s\r\n", name); } @@ -37,20 +35,19 @@ static void remove_user(user_client* uc) static void client_logout(user_client* client) { // 浠庡鎴风鍒楄〃涓垹闄 - if (client->already_login()) + if (client->already_login()) { remove_user(client); + } // 濡傛灉璇ュ鎴风鐨勮鍗忕▼杩樺湪宸ヤ綔锛屽垯閫氱煡璇ヨ鍗忕▼閫鍑 - if (client->is_reading()) - { + if (client->is_reading()) { printf("%s(%d): user: %s, kill_reader\r\n", __FUNCTION__, __LINE__, client->get_name()); client->kill_reader(); } // 濡傛灉璇ュ鎴风鐨勫啓鍗忕▼杩樺湪宸ヤ綔锛屽垯閫氱煡璇ュ啓鍗忕▼閫鍑 - if (client->is_waiting()) - { + if (client->is_waiting()) { printf("fiber-%d: %s(%d): user: %s, notify logout\r\n", acl_fiber_self(), __FUNCTION__, __LINE__, client->get_name()); @@ -58,8 +55,9 @@ static void client_logout(user_client* client) } // 濡傛灉璇ュ鎴风鐨勮銆佸啓鍗忕▼閮藉凡缁忛鍑猴紝鍒欓氱煡璇ュ鎴风閫鍑 - if (!client->is_reading() && !client->is_waiting()) + if (!client->is_reading() && !client->is_waiting()) { client->notify_exit(); + } } static bool client_flush(user_client* client) @@ -70,10 +68,8 @@ static bool client_flush(user_client* client) bool ret = true; // 浠庡鎴风鐨勬秷鎭槦鍒椾腑鎻愬彇娑堟伅骞跺彂閫佽嚦璇ュ鎴风 - while ((msg = client->pop()) != NULL) - { - if (conn.write(*msg) == -1) - { + while ((msg = client->pop()) != NULL) { + if (conn.write(*msg) == -1) { printf("flush to user: %s error %s\r\n", client->get_name(), acl::last_serror()); delete msg; @@ -95,16 +91,14 @@ static void fiber_writer(user_client* client) client->set_waiter(); client->set_waiting(true); - while (true) - { + while (true) { int mtype; // 绛夊緟娑堟伅閫氱煡 client->wait(mtype); // 浠庢湰韬秷鎭槦鍒椾腑鎻愬彇娑堟伅骞跺彂閫佽嚦鏈鎴风 - if (client_flush(client) == false) - { + if (client_flush(client) == false) { printf("%s(%d), user: %s, flush error %s\r\n", __FUNCTION__, __LINE__, client->get_name(), acl::last_serror()); @@ -112,22 +106,19 @@ static void fiber_writer(user_client* client) } #ifdef USE_CHAN - if (mtype == MT_LOGOUT) - { + if (mtype == MT_LOGOUT) { printf("%s(%d), user: %s, MT_LOGOUT\r\n", __FUNCTION__, __LINE__, client->get_name()); break; } - if (mtype == MT_KICK) - { + if (mtype == MT_KICK) { printf("%s(%d), user: %s, MT_KICK\r\n", __FUNCTION__, __LINE__, client->get_name()); client->get_stream().write("You're kicked\r\n"); break; } #else - if (client->exiting()) - { + if (client->exiting()) { printf("%s(%d), user: %s exiting\r\n", __FUNCTION__, __LINE__, client->get_name()); break; @@ -150,29 +141,26 @@ static bool client_login(user_client* uc) { acl::string buf; - while (true) - { + while (true) { // 璇诲彇涓琛屾暟鎹紝涓旇嚜鍔ㄥ幓鎺夊熬閮ㄧ殑 \r\n - if (uc->get_stream().gets(buf) == false) - { + if (uc->get_stream().gets(buf) == false) { printf("%s(%d): gets error %s\r\n", __FUNCTION__, __LINE__, acl::last_serror()); - if (errno == FIBER_ETIME) - { + if (errno == FIBER_ETIME) { printf("Login timeout\r\n"); uc->get_stream().write("Login timeout\r\n"); } return false; } - if (!buf.empty()) + if (!buf.empty()) { break; + } } // 鍒嗘瀽鐧诲叆娑堟伅锛屾暟鎹牸寮忥細login|xxx std::vector& tokens = buf.split2("|"); - if (tokens.size() < 2) - { + if (tokens.size() < 2) { acl::string tmp; tmp.format("invalid argc: %d < 2\r\n", (int) tokens.size()); printf("%s", tmp.c_str()); @@ -185,14 +173,13 @@ static bool client_login(user_client* uc) // 褰撹瀹㈡埛绔笉瀛樺湪鏃舵坊鍔犺繘瀹㈡埛绔垪琛ㄤ腑 const acl::string& name = tokens[1]; std::map::iterator it = __users.find(name); - if (it == __users.end()) - { + if (it == __users.end()) { __users[name] = uc; uc->set_name(name); msg.format("user %s login ok\r\n", name.c_str()); - } - else + } else { msg.format("user %s already login\r\n", name.c_str()); + } printf("%s", msg.c_str()); @@ -203,8 +190,7 @@ static bool client_login(user_client* uc) // 涓庡叾瀹冨鎴风鑱婂ぉ杩囩▼ static bool client_chat(user_client* uc, std::vector& tokens) { - if (tokens.size() < 3) - { + if (tokens.size() < 3) { printf("invalid argc: %d < 3\r\n", (int) tokens.size()); return true; } @@ -214,8 +200,7 @@ static bool client_chat(user_client* uc, std::vector& tokens) // 鏌ユ壘鐩爣瀹㈡埛绔璞 std::map::iterator it = __users.find(to); - if (it == __users.end()) - { + if (it == __users.end()) { acl::string tmp; tmp.format("chat >> from user: %s, to user: %s not exist\r\n", uc->get_name(), to.c_str()); @@ -234,8 +219,7 @@ static bool client_chat(user_client* uc, std::vector& tokens) // 韪㈠嚭涓涓鎴风瀵硅薄 static bool client_kick(user_client* uc, std::vector& tokens) { - if (tokens.size() < 2) - { + if (tokens.size() < 2) { printf("invalid argc: %d < 2\r\n", (int) tokens.size()); return true; } @@ -244,8 +228,7 @@ static bool client_kick(user_client* uc, std::vector& tokens) // 鏌ユ壘灏嗚韪㈠嚭鐨勫鎴风瀵硅薄 std::map::iterator it = __users.find(to); - if (it == __users.end()) - { + if (it == __users.end()) { acl::string tmp; tmp.format("kick >> from user: %s, to user: %s not exist\r\n", uc->get_name(), to.c_str()); @@ -271,8 +254,7 @@ static void fiber_reader(user_client* client) client->set_reading(true); // 鐧诲叆鏈嶅姟鍣 - if (client_login(client) == false) - { + if (client_login(client) == false) { client->set_reading(false); printf("----------client_logout-------\r\n"); @@ -284,7 +266,7 @@ static void fiber_reader(user_client* client) } // 鐧诲叆鎴愬姛锛屽垯鍒涘缓鍐欏崗绋嬬敤鏉ュ悜瀹㈡埛绔彂閫佹秷鎭 - go_stack(STACK_SIZE) [&] { + go_share(STACK_SIZE) [&] { __nwriter++; fiber_writer(client); }; @@ -295,33 +277,27 @@ static void fiber_reader(user_client* client) acl::string buf; // 浠庡鎴风寰幆璇诲彇娑堟伅 - while (true) - { + while (true) { bool ret = conn.gets(buf); - if (ret == false) - { + if (ret == false) { printf("%s(%d): user: %s, gets error %s, fiber: %d\r\n", __FUNCTION__, __LINE__, client->get_name(), acl::last_serror(), acl_fiber_self()); // 瀹㈡埛绔鍑 - if (client->exiting()) - { + if (client->exiting()) { printf("----exiting now----\r\n"); break; } - if (errno == FIBER_ETIME) - { - if (conn.write("ping\r\n") == -1) - { + if (errno == FIBER_ETIME) { + if (conn.write("ping\r\n") == -1) { printf("ping error\r\n"); break; } - } - else if (errno == EAGAIN) + } else if (errno == EAGAIN) { printf("EAGAIN\r\n"); - else { + } else { printf("gets error: %d, %s\r\n", errno, acl::last_serror()); break; @@ -330,42 +306,41 @@ static void fiber_reader(user_client* client) continue; } - if (buf.empty()) + if (buf.empty()) { continue; + } // 鍒嗘瀽瀹㈡埛绔彂閫佺殑娑堟伅锛屼氦鐢变笉鍚岀殑澶勭悊杩囩▼ std::vector& tokens = buf.split2("|"); // 鏈鎴风瑕佹眰閫鍑 - if (tokens[0] == "quit" || tokens[0] == "exit") - { + if (tokens[0] == "quit" || tokens[0] == "exit") { conn.write("Bye!\r\n"); break; } // 鏈鎴风鍙戦佽亰澶╂秷鎭 - else if (tokens[0] == "chat") - { - if (client_chat(client, tokens) == false) + else if (tokens[0] == "chat") { + if (client_chat(client, tokens) == false) { break; + } } // 鏈鎴风韪㈠嚭鍏跺畠瀹㈡埛绔 - else if (tokens[0] == "kick") - { - if (client_kick(client, tokens) == false) + else if (tokens[0] == "kick") { + if (client_kick(client, tokens) == false) { break; + } } // 瑕佹眰鏁翠釜鏈嶅姟杩涚▼閫鍑 - else if (tokens[0] == "stop") - { + else if (tokens[0] == "stop") { stop = true; break; - } - else + } else { printf("invalid data: %s, cmd: [%s]\r\n", buf.c_str(), tokens[0].c_str()); + } } printf(">>%s(%d), user: %s, logout\r\n", __FUNCTION__, __LINE__, @@ -378,8 +353,7 @@ static void fiber_reader(user_client* client) printf("----__nreader: %d-----\r\n", --__nreader); - if (stop) - { + if (stop) { int dumy = 1; // 濡傛灉瑕佸仠姝㈡湇鍔★紝鍒欓氱煡鐩戞帶鍗忕▼ __chan_monitor.put(dumy); @@ -394,7 +368,7 @@ static void fiber_client(acl::socket_stream* conn) user_client* client = new user_client(*conn); // 鍒涘缓浠庡鎴风杩炴帴璇诲彇鏁版嵁鐨勫崗绋 - go_stack(STACK_SIZE) [=] { + go_share(STACK_SIZE) [=] { __nreader++; fiber_reader(client); }; @@ -418,18 +392,16 @@ static void fiber_accept(acl::server_socket& ss) { __fiber_accept = acl_fiber_running(); - while (true) - { + while (true) { // 绛夊緟鎺ユ敹瀹㈡埛绔繛鎺 acl::socket_stream* conn = ss.accept(); - if (conn == NULL) - { + if (conn == NULL) { printf("accept error %s\r\n", acl::last_serror()); break; } // 鍒涘缓澶勭悊瀹㈡埛绔璞$殑鍗忕▼ - go_stack(STACK_SIZE) [=] { + go_share(STACK_SIZE) [=] { __nclients++; fiber_client(conn); }; @@ -455,7 +427,7 @@ static void fiber_monitor(void) static void usage(const char *procname) { printf("usage: %s -h [help]\r\n" - " -s listen_addr\r\n" + " -s listen_addr[default: 127.0.0.1:9002]\r\n" " -r rw_timeout\r\n" , procname); } @@ -485,9 +457,9 @@ int main(int argc, char *argv[]) } acl::server_socket ss; // 鐩戝惉濂楁帴鍙e璞 + // 鐩戝惉鎸囧畾鍦板潃 - if (ss.open(addr) == false) - { + if (ss.open(addr) == false) { printf("listen %s error %s\r\n", addr, acl::last_serror()); return 1; } @@ -495,7 +467,7 @@ int main(int argc, char *argv[]) printf("listen %s ok\r\n", addr); // 鍒涘缓鏈嶅姟鍣ㄦ帴鏀惰繛鎺ュ崗绋 - go[&] { + go_share(8192)[&] { fiber_accept(ss); }; diff --git a/lib_fiber/samples/chat/server/user_client.h b/lib_fiber/samples/chat/server/user_client.h index c94f40f80..db1e00a40 100644 --- a/lib_fiber/samples/chat/server/user_client.h +++ b/lib_fiber/samples/chat/server/user_client.h @@ -128,14 +128,22 @@ public: void wait_exit(void) { +#ifdef USE_CHAN int mtype; chan_exit_.pop(mtype); +#else + chan_exit_.pop(); +#endif } void notify_exit(void) { +#ifdef USE_CHAN int mtype = MT_LOGOUT; chan_exit_.put(mtype); +#else + chan_exit_.push(NULL); +#endif } void set_reader(void) @@ -152,10 +160,11 @@ private: acl::socket_stream& conn_; #ifdef USE_CHAN acl::channel chan_msg_; + acl::channel chan_exit_; #else acl::fiber_sem sem_msg_; + acl::fiber_tbox chan_exit_; #endif - acl::channel chan_exit_; acl::string name_; std::list messages_; bool exiting_ = false; diff --git a/lib_fiber/samples/chat/server/valgrind.sh b/lib_fiber/samples/chat/server/valgrind.sh old mode 100644 new mode 100755 diff --git a/lib_fiber/samples/server3/main.cpp b/lib_fiber/samples/server3/main.cpp index a2cfece9e..5a9f50fd4 100644 --- a/lib_fiber/samples/server3/main.cpp +++ b/lib_fiber/samples/server3/main.cpp @@ -49,7 +49,8 @@ static void test_file(void) { class fiber_server : public acl::fiber { public: - fiber_server(acl::server_socket& ss) : ss_(ss) {} + fiber_server(acl::server_socket& ss, bool stack_share) + : ss_(ss), stack_share_(stack_share) {} ~fiber_server(void) {} protected: @@ -68,45 +69,50 @@ protected: // create one fiber for one connection fiber_client* fc = new fiber_client(conn); // start the client fiber - fc->start(); + fc->start(stack_share_ ? 8000 : 32000, stack_share_); } } private: acl::server_socket& ss_; + bool stack_share_; }; class thread_server : public acl::thread { public: - thread_server(acl::server_socket& ss) : ss_(ss) {} + thread_server(acl::server_socket& ss, bool stack_share) + : ss_(ss), stack_share_(stack_share) {} ~thread_server(void) {} protected: void* run(void) { - fiber_server fs(ss_); - fs.start(); // start listen fiber + fiber_server fs(ss_, stack_share_); + + // start listen fiber + fs.start(stack_share_ ? 8000 : 32000, stack_share_); acl::fiber::schedule(); // start fiber schedule - return NULL; } private: acl::server_socket& ss_; + bool stack_share_; }; static void usage(const char* procname) { - printf("usage: %s -h [help] -s listen_addr -t nthreads\r\n", procname); + printf("usage: %s -h [help] -s listen_addr -t nthreads -S [if use stack sharing]\r\n", procname); } int main(int argc, char *argv[]) { int ch, nthreads = 1; + bool stack_share = false; acl::acl_cpp_init(); acl::string addr("127.0.0.1:9006"); acl::log::stdout_open(true); - while ((ch = getopt(argc, argv, "hs:t:")) > 0) { + while ((ch = getopt(argc, argv, "hs:t:S")) > 0) { switch (ch) { case 'h': usage(argv[0]); @@ -117,6 +123,9 @@ int main(int argc, char *argv[]) { case 't': nthreads = atoi(optarg); break; + case 'S': + stack_share = true; + break; default: break; } @@ -132,7 +141,7 @@ int main(int argc, char *argv[]) { std::vector threads; for (int i = 0; i < nthreads; i++) { - acl::thread* thread = new thread_server(ss); + acl::thread* thread = new thread_server(ss, stack_share); thread->start(); threads.push_back(thread); }