From bcb2f1b85c26bcedb0b6eb9711650d2428960db2 Mon Sep 17 00:00:00 2001 From: ubuntu14 Date: Sun, 27 Nov 2016 21:03:24 +0800 Subject: [PATCH] fixed one bug in fiber.c for memory leak add some samples for lib_liber --- lib_acl/Makefile | 1 + lib_acl/changes.txt | 3 + lib_acl/include/stdlib/acl_malloc.h | 2 + lib_acl/src/stdlib/iostuff/acl_read_wait.c | 2 + .../src/stdlib/memory/acl_default_malloc.c | 63 ++- lib_acl_cpp/changes.txt | 3 + .../include/acl_cpp/redis/redis_string.hpp | 6 +- lib_acl_cpp/src/stream/istream.cpp | 86 ++-- lib_fiber/c/Makefile | 4 +- lib_fiber/c/include/fiber/lib_fiber.h | 32 ++ lib_fiber/c/src/channel.c | 23 +- lib_fiber/c/src/event.c | 3 +- lib_fiber/c/src/fiber.c | 135 ++++-- lib_fiber/c/src/fiber.h | 2 +- lib_fiber/c/src/fiber_io.c | 6 +- lib_fiber/c/src/fiber_lock.c | 10 +- lib_fiber/c/src/fiber_sem.c | 2 +- lib_fiber/c/src/hook_io.c | 148 +++++- lib_fiber/c/src/hook_net.c | 42 +- lib_fiber/c/src/master/fiber_server.c | 2 +- lib_fiber/changes.txt | 8 + lib_fiber/samples/chat/Makefile | 21 +- lib_fiber/samples/chat/Makefile.in | 126 +++++ lib_fiber/samples/chat/client/Makefile | 2 + lib_fiber/samples/chat/client/main.cpp | 213 +++++++++ lib_fiber/samples/chat/client/stdafx.cpp | 8 + lib_fiber/samples/chat/client/stdafx.h | 20 + lib_fiber/samples/chat/client/user_client.h | 63 +++ lib_fiber/samples/chat/client/valgrind.sh | 3 + lib_fiber/samples/chat/connect/Makefile | 2 + lib_fiber/samples/chat/connect/main.cpp | 113 +++++ lib_fiber/samples/chat/demo/Makefile | 2 + lib_fiber/samples/chat/demo/main.cpp | 330 +++++++++++++ lib_fiber/samples/chat/demo/stdafx.cpp | 8 + lib_fiber/samples/chat/demo/stdafx.h | 20 + lib_fiber/samples/chat/demo/user_client.h | 80 ++++ lib_fiber/samples/chat/reader/Makefile | 2 + lib_fiber/samples/chat/reader/main.cpp | 180 +++++++ lib_fiber/samples/chat/reader/stdafx.cpp | 8 + lib_fiber/samples/chat/reader/stdafx.h | 20 + lib_fiber/samples/chat/reader/user_client.h | 56 +++ lib_fiber/samples/chat/reader/valgrind.sh | 3 + lib_fiber/samples/chat/server/Makefile | 2 + lib_fiber/samples/chat/server/main.cpp | 446 ++++++++++++++++++ lib_fiber/samples/chat/server/stdafx.cpp | 8 + lib_fiber/samples/chat/server/stdafx.h | 20 + lib_fiber/samples/chat/server/user_client.h | 165 +++++++ lib_fiber/samples/chat/server/valgrind.sh | 4 + lib_fiber/samples/chat/writer/Makefile | 2 + lib_fiber/samples/chat/writer/main.cpp | 192 ++++++++ lib_fiber/samples/chat/writer/stdafx.cpp | 8 + lib_fiber/samples/chat/writer/stdafx.h | 20 + lib_fiber/samples/chat/writer/user_client.h | 63 +++ lib_fiber/samples/chat/writer/valgrind.sh | 3 + lib_fiber/samples/redis_channel/main.cpp | 50 +- lib_fiber/samples/redis_channel/valgrind.sh | 3 + lib_fiber/samples/timer_server/main.c | 10 +- 57 files changed, 2735 insertions(+), 124 deletions(-) create mode 100644 lib_fiber/samples/chat/Makefile.in create mode 100644 lib_fiber/samples/chat/client/Makefile create mode 100644 lib_fiber/samples/chat/client/main.cpp create mode 100644 lib_fiber/samples/chat/client/stdafx.cpp create mode 100644 lib_fiber/samples/chat/client/stdafx.h create mode 100644 lib_fiber/samples/chat/client/user_client.h create mode 100644 lib_fiber/samples/chat/client/valgrind.sh create mode 100644 lib_fiber/samples/chat/connect/Makefile create mode 100644 lib_fiber/samples/chat/connect/main.cpp create mode 100644 lib_fiber/samples/chat/demo/Makefile create mode 100644 lib_fiber/samples/chat/demo/main.cpp create mode 100644 lib_fiber/samples/chat/demo/stdafx.cpp create mode 100644 lib_fiber/samples/chat/demo/stdafx.h create mode 100644 lib_fiber/samples/chat/demo/user_client.h create mode 100644 lib_fiber/samples/chat/reader/Makefile create mode 100644 lib_fiber/samples/chat/reader/main.cpp create mode 100644 lib_fiber/samples/chat/reader/stdafx.cpp create mode 100644 lib_fiber/samples/chat/reader/stdafx.h create mode 100644 lib_fiber/samples/chat/reader/user_client.h create mode 100644 lib_fiber/samples/chat/reader/valgrind.sh create mode 100644 lib_fiber/samples/chat/server/Makefile create mode 100644 lib_fiber/samples/chat/server/main.cpp create mode 100644 lib_fiber/samples/chat/server/stdafx.cpp create mode 100644 lib_fiber/samples/chat/server/stdafx.h create mode 100644 lib_fiber/samples/chat/server/user_client.h create mode 100644 lib_fiber/samples/chat/server/valgrind.sh create mode 100644 lib_fiber/samples/chat/writer/Makefile create mode 100644 lib_fiber/samples/chat/writer/main.cpp create mode 100644 lib_fiber/samples/chat/writer/stdafx.cpp create mode 100644 lib_fiber/samples/chat/writer/stdafx.h create mode 100644 lib_fiber/samples/chat/writer/user_client.h create mode 100644 lib_fiber/samples/chat/writer/valgrind.sh create mode 100644 lib_fiber/samples/redis_channel/valgrind.sh diff --git a/lib_acl/Makefile b/lib_acl/Makefile index 09ba75cc8..4a26025b3 100644 --- a/lib_acl/Makefile +++ b/lib_acl/Makefile @@ -30,6 +30,7 @@ CFLAGS = -c -g -W \ -D_POSIX_PTHREAD_SEMANTICS \ -DACL_PREPARE_COMPILE \ -Winvalid-pch +#-DDEBUG_MEM #-DUSE_EPOLL \ #-Wno-tautological-compare \ #-Wno-invalid-source-encoding \ diff --git a/lib_acl/changes.txt b/lib_acl/changes.txt index 0a2f94dfb..6520d19e3 100644 --- a/lib_acl/changes.txt +++ b/lib_acl/changes.txt @@ -1,5 +1,8 @@ 修改历史列表: +569) 2016.11.27 +569.1) feature: 增加了内存调试函数 acl_default_meminfo 用来显示内存分配及释放情况 + 568) 2016.11.18 568.1) bugfix: acl_vstream.c 中的函数 acl_vstream_fdopen 设置 IO 句柄为非阻塞 模式是存在问题的,上层应用应该自己决定是否设为非阻塞模式 diff --git a/lib_acl/include/stdlib/acl_malloc.h b/lib_acl/include/stdlib/acl_malloc.h index ec61dd1b5..d0b0715b7 100644 --- a/lib_acl/include/stdlib/acl_malloc.h +++ b/lib_acl/include/stdlib/acl_malloc.h @@ -68,6 +68,8 @@ ACL_API int acl_mempool_total_allocated(void); ACL_API void acl_default_memstat(const char *filename, int line, void *ptr, size_t *len, size_t *real_len); +ACL_API void acl_default_meminfo(void); + /** * 设置内存分配最大报警值,即当调用者分配的内存大小达到此报警值后,内部会自动 * 记录报警日志,同时将调用堆栈打印至日志中;内部缺少值是 100000000 diff --git a/lib_acl/src/stdlib/iostuff/acl_read_wait.c b/lib_acl/src/stdlib/iostuff/acl_read_wait.c index b3427885c..a2df0b946 100644 --- a/lib_acl/src/stdlib/iostuff/acl_read_wait.c +++ b/lib_acl/src/stdlib/iostuff/acl_read_wait.c @@ -268,10 +268,12 @@ int acl_read_wait(ACL_SOCKET fd, int timeout) acl_last_serror(), (int) fd); return -1; case 0: + /* acl_msg_warn("%s(%d), %s: poll timeout: %s, fd: %d, " "delay: %d, spent: %ld", __FILE__, __LINE__, myname, acl_last_serror(), fd, delay, (long) (time(NULL) - begin)); + */ acl_set_error(ACL_ETIMEDOUT); return -1; default: diff --git a/lib_acl/src/stdlib/memory/acl_default_malloc.c b/lib_acl/src/stdlib/memory/acl_default_malloc.c index 7b81da49b..25f94bab3 100644 --- a/lib_acl/src/stdlib/memory/acl_default_malloc.c +++ b/lib_acl/src/stdlib/memory/acl_default_malloc.c @@ -126,6 +126,17 @@ static char empty_string[] = ""; } while (0) #endif /* ACL_WINDOWS */ +#ifdef DEBUG_MEM +static __thread int __nmalloc = 0; +static __thread int __ncalloc = 0; +static __thread int __nrealloc = 0; +static __thread int __nfree = 0; +static __thread int __nstrdup = 0; +static __thread int __nstrndup = 0; +static __thread int __nmemdup = 0; +static __thread ssize_t __nsize = 0; +#endif + void acl_default_memstat(const char *filename, int line, void *ptr, size_t *len, size_t *real_len) { @@ -145,6 +156,17 @@ void acl_default_memstat(const char *filename, int line, *real_len = SPACE_FOR(*len); } +void acl_default_meminfo(void) +{ +#ifdef DEBUG_MEM + printf("%s(%d): __nmalloc: %d, __ncalloc: %d, __nrealloc: %d, " + "__nfree: %d, diff: %d, __nsize: %ld\r\n", + __FUNCTION__, __LINE__, __nmalloc, __ncalloc, __nrealloc, + __nfree, __nmalloc + __nrealloc - __nfree, + (unsigned long) __nsize); +#endif +} + void acl_default_set_memlimit(size_t len) { acl_assert(len > 0); @@ -163,6 +185,7 @@ void *acl_default_malloc(const char *filename, int line, size_t len) char *ptr; MBLOCK *real_ptr; const char *pname = NULL; + #if 0 printf("%s:%d, len: %d\r\n", filename, line, (int) len); acl_trace_info(); @@ -190,6 +213,11 @@ void *acl_default_malloc(const char *filename, int line, size_t len) acl_trace_info(); } +#ifdef DEBUG_MEM + __nmalloc++; + __nsize += new_len; +#endif + #ifdef _USE_GLIB if ((real_ptr = (MBLOCK *) g_malloc(new_len)) == 0) { acl_msg_error("%s(%d)->%s: new_len: %d, g_malloc error(%s)", @@ -204,11 +232,8 @@ void *acl_default_malloc(const char *filename, int line, size_t len) return 0; } #endif - CHECK_OUT_PTR(ptr, real_ptr, len); -#if 0 - memset(ptr, FILLER, len); -#endif + CHECK_OUT_PTR(ptr, real_ptr, len); return ptr; } @@ -218,6 +243,9 @@ void *acl_default_calloc(const char *filename, int line, void *ptr; int n; +#ifdef DEBUG_MEM + __ncalloc++; +#endif n = (int) (nmemb * size); ptr = acl_default_malloc(filename, line, n); memset(ptr, FILLER, n); @@ -266,6 +294,11 @@ void *acl_default_realloc(const char *filename, int line, acl_trace_info(); } +#ifdef DEBUG_MEM + __nrealloc++; + __nsize += new_len - old_len; +#endif + #ifdef _USE_GLIB if ((real_ptr = (MBLOCK *) g_realloc((char *) real_ptr, new_len)) == 0) acl_msg_fatal("%s(%d)->%s: realloc: insufficient memory: %s", @@ -310,6 +343,12 @@ void acl_default_free(const char *filename, int line, void *ptr) /* memset((char *) real_ptr, FILLER, SPACE_FOR(len)); */ + +#ifdef DEBUG_MEM + __nfree++; + __nsize -= len; +#endif + #ifdef _USE_GLIB g_free(real_ptr); #else @@ -340,6 +379,11 @@ char *acl_default_strdup(const char *filename, int line, const char *str) if (*str == 0) return (char *) empty_string; #endif + +#ifdef DEBUG_MEM + __nstrdup++; +#endif + return strcpy(acl_default_malloc(pname, line, strlen(str) + 1), str); } @@ -366,8 +410,14 @@ char *acl_default_strndup(const char *filename, int line, if (*str == 0) return (char *) empty_string; #endif + if ((cp = memchr(str, 0, len)) != 0) len = cp - str; + +#ifdef DEBUG_MEM + __nstrndup++; +#endif + result = memcpy(acl_default_malloc(pname, line, len + 1), str, len); result[len] = 0; return result; @@ -389,5 +439,10 @@ void *acl_default_memdup(const char *filename, int line, if (ptr == 0) acl_msg_fatal("%s(%d)->%s: null pointer argument", pname, line, myname); + +#ifdef DEBUG_MEM + __nmemdup++; +#endif + return memcpy(acl_default_malloc(pname, line, len), ptr, len); } diff --git a/lib_acl_cpp/changes.txt b/lib_acl_cpp/changes.txt index 4535f562d..eebf99fa3 100644 --- a/lib_acl_cpp/changes.txt +++ b/lib_acl_cpp/changes.txt @@ -1,6 +1,9 @@ 修改历史列表: ----------------------------------------------------------------------- +453) 2016.11.24 +453.1) bugfix: istream.cpp 中的 gets 函数不应当读超时时设置 eof_ 标志位 + 452) 2016.11.18 452.1) bugfix: redis_list::bpop 及 redis_pubsub::get_message 的超时时间应 不受 socket_stream 里的超时时间影响 diff --git a/lib_acl_cpp/include/acl_cpp/redis/redis_string.hpp b/lib_acl_cpp/include/acl_cpp/redis/redis_string.hpp index e6c7837e1..e04f0c860 100644 --- a/lib_acl_cpp/include/acl_cpp/redis/redis_string.hpp +++ b/lib_acl_cpp/include/acl_cpp/redis/redis_string.hpp @@ -130,8 +130,10 @@ public: * get the value of a key * @param key {const char*} 字符串对象的 key * the key of a string - * @param buf {string&} 操作成功后存储字符串对象的值 - * store the value of a key after GET executed correctly + * @param buf {string&} 操作成功后存储字符串对象的值,如果返回 true 且 + * 该缓冲区为空则表示对应 key 不存在 + * store the value of a key after GET executed correctly, key not + * exist if the buf is empty when return true * @return {bool} 操作是否成功,返回 false 表示出错或 key 非字符串对象 * if the GET was executed correctly, false if error happened or * is is not a string of the key diff --git a/lib_acl_cpp/src/stream/istream.cpp b/lib_acl_cpp/src/stream/istream.cpp index 75e1eb794..c4858b64c 100644 --- a/lib_acl_cpp/src/stream/istream.cpp +++ b/lib_acl_cpp/src/stream/istream.cpp @@ -7,6 +7,21 @@ namespace acl { +#if ACL_EWOULDBLOCK == ACL_EAGAIN +# define CHECK_ERROR(e) do { \ + if (e != ACL_EINTR && e != ACL_ETIMEDOUT && e != ACL_EWOULDBLOCK) \ + eof_ = true; \ +} while (0) +#else +# define CHECK_ERROR(e) do { \ + if (e != ACL_EINTR && e != ACL_ETIMEDOUT \ + && e != ACL_EWOULDBLOCK && e != ACL_EAGAIN) \ + { \ + eof_ = true; \ + } \ +} while (0) +#endif + int istream::read(void* buf, size_t size, bool loop /* = true */) { int ret; @@ -16,7 +31,7 @@ int istream::read(void* buf, size_t size, bool loop /* = true */) ret = acl_vstream_read(stream_, buf, size); if (ret == ACL_VSTREAM_EOF) { - eof_ = true; + CHECK_ERROR(errno); return -1; } else return ret; @@ -28,7 +43,7 @@ bool istream::readtags(void *buf, size_t* size, const char *tag, size_t taglen) if (ret == ACL_VSTREAM_EOF) { *size = 0; - eof_ = true; + CHECK_ERROR(errno); return false; } @@ -40,25 +55,6 @@ bool istream::readtags(void *buf, size_t* size, const char *tag, size_t taglen) return false; } -bool istream::gets(void* buf, size_t* size, bool nonl /* = true */) -{ - int ret; - if (nonl) - ret = acl_vstream_gets_nonl(stream_, buf, *size); - else - ret = acl_vstream_gets(stream_, buf, *size); - if (ret == ACL_VSTREAM_EOF) { - eof_ = true; - *size = 0; - return false; - } else { - *size = ret; - if ((stream_->flag & ACL_VSTREAM_FLAG_TAGYES)) - return true; - return false; - } -} - bool istream::read(acl_int64& n, bool loop /* = true */) { return read(&n, sizeof(n), loop) == (int) sizeof(n); @@ -113,6 +109,25 @@ bool istream::read(string* s, size_t max, bool loop /* = true */) return read(*s, max, loop); } +bool istream::gets(void* buf, size_t* size, bool nonl /* = true */) +{ + int ret; + if (nonl) + ret = acl_vstream_gets_nonl(stream_, buf, *size); + else + ret = acl_vstream_gets(stream_, buf, *size); + if (ret == ACL_VSTREAM_EOF) { + CHECK_ERROR(errno); + *size = 0; + return false; + } else { + *size = ret; + if ((stream_->flag & ACL_VSTREAM_FLAG_TAGYES)) + return true; + return false; + } +} + bool istream::gets(string& s, bool nonl /* = true */, size_t max /* = 0 */) { char buf[8192]; @@ -132,8 +147,12 @@ bool istream::gets(string& s, bool nonl /* = true */, size_t max /* = 0 */) return true; } - if (size > 0) - s.append(buf, size); + + if (size == 0) + break; + + printf(">>>size: %d\r\n", (int) size); + s.append(buf, size); } return false; @@ -151,8 +170,10 @@ bool istream::gets(string& s, bool nonl /* = true */, size_t max /* = 0 */) return true; } - if (size > 0) - s.append(buf, size); + if (size == 0) + break; + + s.append(buf, size); max -= size; if (max == 0) @@ -187,9 +208,13 @@ bool istream::readtags(string& s, const string& tag) s.append(buf, size); return true; } - if (size > 0) - s.append(buf, size); + + if (size == 0) + break; + + s.append(buf, size); } + return false; } @@ -203,15 +228,15 @@ int istream::getch() { int ret = acl_vstream_getc(stream_); if (ret == ACL_VSTREAM_EOF) - eof_ = true; + CHECK_ERROR(errno); return ret; } int istream::ugetch(int ch) { - int ret = acl_vstream_ungetc(stream_, ch); + int ret = acl_vstream_ungetc(stream_, ch); if (ret == ACL_VSTREAM_EOF) - eof_ = true; + CHECK_ERROR(errno); return ret; } @@ -242,6 +267,7 @@ bool istream::gets_peek(string& buf, bool nonl /* = true */, eof_ = true; } } + return ready ? true : false; } diff --git a/lib_fiber/c/Makefile b/lib_fiber/c/Makefile index 4a7cb2c2e..01661ae36 100644 --- a/lib_fiber/c/Makefile +++ b/lib_fiber/c/Makefile @@ -18,8 +18,8 @@ CFLAGS = -c -g -W \ -D_POSIX_PTHREAD_SEMANTICS \ -D_USE_FAST_MACRO \ -Wno-long-long \ --DUSE_JMP -#-DUSE_VALGRIND +-DUSE_JMP \ +-DUSE_VALGRIND #-Wno-clobbered #-O3 diff --git a/lib_fiber/c/include/fiber/lib_fiber.h b/lib_fiber/c/include/fiber/lib_fiber.h index 0dec52f29..429cef5cf 100644 --- a/lib_fiber/c/include/fiber/lib_fiber.h +++ b/lib_fiber/c/include/fiber/lib_fiber.h @@ -26,6 +26,18 @@ void acl_fiber_hook_api(int onoff); ACL_FIBER *acl_fiber_create(void (*fn)(ACL_FIBER *, void *), void *arg, size_t size); +/** + * 杩斿洖褰撳墠绾跨▼涓浜庢秷浜$姸鎬佺殑鍗忕▼鏁 + * @retur {int} + */ +int acl_fiber_ndead(void); + +/** + * 杩斿洖褰撳墠姝e湪杩愯鐨勫崗绋嬪璞 + * @retur {ACL_FIBER*} 杩斿洖 NULL 琛ㄧず褰撳墠娌℃湁姝e湪杩愯鐨勫崗绋 + */ +ACL_FIBER *acl_fiber_running(void); + /** * 鑾峰緱鎵缁欏崗绋嬬殑鍗忕▼ ID 鍙 * @param fiber {const ACL_FIBER*} 鐢 acl_fiber_create 鍒涘缓鐨勫崗绋嬪璞 @@ -53,6 +65,14 @@ void acl_fiber_set_errno(ACL_FIBER *fiber, int errnum); */ int acl_fiber_errno(ACL_FIBER *fiber); +/** + * 鏄惁淇濇寔鎵鎸囧畾鍗忕▼鐨勯敊璇彿锛屽綋璁剧疆涓衡滀繚鎸佲濆悗锛屽垯璇ュ崗绋嬩粎淇濇寔褰撳墠鐘舵佷笅鐨 + * 閿欒鍙凤紝涔嬪悗璇ュ崗绋嬬殑閿欒鍙 errno 灏嗕笉鍐嶆敼鍙橈紝璧板埌鍐嶆璋冪敤鏈嚱鏁板彇娑堜繚鎸 + * @param fiber {ACL_FIBER*} 鍗忕▼瀵硅薄 + * @param yesno {int} 鏄惁淇濇寔 + */ +void acl_fiber_keep_errno(ACL_FIBER *fiber, int yesno); + /** * 鑾峰緱鎸囧畾鍗忕▼鐨勫綋鍓嶇姸鎬 * @param fiber {const ACL_FIBER*} 鍗忕▼瀵硅薄 @@ -60,6 +80,18 @@ int acl_fiber_errno(ACL_FIBER *fiber); */ int acl_fiber_status(const ACL_FIBER *fiber); +/** + * 閫氱煡鎸囧畾鍗忕▼閫鍑 + * @param fiber {const ACL_FIBER*} 鍗忕▼瀵硅薄 + */ +void acl_fiber_kill(ACL_FIBER *fiber); + +/** + * 妫鏌ユ湰鍗忕▼鏄惁琚叾瀹冨崗绋嬮氱煡閫鍑 + * @param fiber {const ACL_FIBER*} 鍗忕▼瀵硅薄 + */ +int acl_fiber_killed(ACL_FIBER *fiber); + /** * 灏嗗綋鍓嶈繍琛岀殑鍗忕▼鎸傝捣锛岀敱璋冨害鍣ㄩ夋嫨涓嬩竴涓渶瑕佽繍琛岀殑鍗忕▼ * @return {int} diff --git a/lib_fiber/c/src/channel.c b/lib_fiber/c/src/channel.c index 9582766f5..97aa6103c 100644 --- a/lib_fiber/c/src/channel.c +++ b/lib_fiber/c/src/channel.c @@ -229,7 +229,7 @@ static int channel_alt(FIBER_ALT a[]) n = i; canblock = a[i].op == CHANEND; - t = fiber_running(); + t = acl_fiber_running(); for (i = 0; i < n; i++) { a[i].fiber = t; @@ -297,6 +297,9 @@ static int channel_alt(FIBER_ALT a[]) acl_fiber_switch(); + if (acl_fiber_killed(t)) + return -1; + /* * the guy who ran the op took care of dequeueing us * and then set a[0].alt to the one that was executed. @@ -345,9 +348,10 @@ int acl_channel_sendp(ACL_CHANNEL *c, void *v) void *acl_channel_recvp(ACL_CHANNEL *c) { - void *v; + void *v = NULL; - channel_op(c, CHANRCV, (void *) &v, 1); + if (channel_op(c, CHANRCV, (void *) &v, 1) < 0) + return NULL; return v; } @@ -358,9 +362,10 @@ int acl_channel_sendp_nb(ACL_CHANNEL *c, void *v) void *acl_channel_recvp_nb(ACL_CHANNEL *c) { - void *v; + void *v = NULL; - channel_op(c, CHANRCV, (void *) &v, 0); + if (channel_op(c, CHANRCV, (void *) &v, 0) < 0) + return NULL; return v; } @@ -371,9 +376,10 @@ int acl_channel_sendul(ACL_CHANNEL *c, unsigned long val) unsigned long acl_channel_recvul(ACL_CHANNEL *c) { - unsigned long val; + unsigned long val = (unsigned long) -1; - channel_op(c, CHANRCV, &val, 1); + if (channel_op(c, CHANRCV, &val, 1) < 0) + return (unsigned long) -1; return val; } @@ -386,6 +392,7 @@ unsigned long acl_channel_recvul_nb(ACL_CHANNEL *c) { unsigned long val; - channel_op(c, CHANRCV, &val, 0); + if (channel_op(c, CHANRCV, &val, 0) < 0) + return (unsigned long) -1; return val; } diff --git a/lib_fiber/c/src/event.c b/lib_fiber/c/src/event.c index 844ba3e97..7f775b9b6 100644 --- a/lib_fiber/c/src/event.c +++ b/lib_fiber/c/src/event.c @@ -71,7 +71,8 @@ static int check_fdtype(int fd) struct stat s; if (fstat(fd, &s) < 0) { - acl_msg_info("fd: %d fstat error", fd); + acl_msg_info("%s(%d), %s: fd: %d fstat error %s", __FILE__, + __LINE__, __FUNCTION__, fd, acl_last_serror()); return -1; } diff --git a/lib_fiber/c/src/fiber.c b/lib_fiber/c/src/fiber.c index 95e8a8d3f..c153fafe3 100644 --- a/lib_fiber/c/src/fiber.c +++ b/lib_fiber/c/src/fiber.c @@ -10,6 +10,8 @@ #include "event_epoll.h" /* just for hook_epoll */ #include "fiber.h" +#define MAX_CACHE 100 + typedef int *(*errno_fn)(void); typedef int (*fcntl_fn)(int, int, ...); @@ -21,6 +23,7 @@ typedef struct { ACL_RING dead; /* dead fiber queue */ ACL_FIBER **fibers; size_t size; + size_t slot; int exitcode; ACL_FIBER *running; ACL_FIBER original; @@ -39,6 +42,7 @@ static acl_pthread_key_t __fiber_key; /* forward declare */ static ACL_FIBER *fiber_alloc(void (*fn)(ACL_FIBER *, void *), void *arg, size_t size); +static void fiber_free(ACL_FIBER *fiber); void acl_fiber_hook_api(int onoff) { @@ -99,6 +103,7 @@ static void fiber_check(void) #endif __thread_fiber->fibers = NULL; __thread_fiber->size = 0; + __thread_fiber->slot = 0; __thread_fiber->idgen = 0; __thread_fiber->count = 0; @@ -174,10 +179,17 @@ void acl_fiber_set_errno(ACL_FIBER *fiber, int errnum) int acl_fiber_errno(ACL_FIBER *fiber) { - fiber->flag |= FIBER_F_SAVE_ERRNO; return fiber->errnum; } +void acl_fiber_keep_errno(ACL_FIBER *fiber, int yesno) +{ + if (yesno) + fiber->flag |= FIBER_F_SAVE_ERRNO; + else + fiber->flag &= ~FIBER_F_SAVE_ERRNO; +} + void fiber_save_errno(void) { ACL_FIBER *curr; @@ -189,14 +201,14 @@ void fiber_save_errno(void) curr = &__thread_fiber->original; if (curr->flag & FIBER_F_SAVE_ERRNO) { - curr->flag &= ~FIBER_F_SAVE_ERRNO; + //curr->flag &= ~FIBER_F_SAVE_ERRNO; return; } if (__sys_errno != NULL) - curr->errnum = *__sys_errno(); + acl_fiber_set_errno(curr, *__sys_errno()); else - curr->errnum = errno; + acl_fiber_set_errno(curr, errno); } #if defined(__x86_64__) @@ -286,8 +298,45 @@ void fiber_save_errno(void) siglongjmp(ctx, 1) #endif +static void fiber_kick(int max) +{ + ACL_RING *head; + ACL_FIBER *fiber; + + while (max > 0) { + head = acl_ring_pop_head(&__thread_fiber->dead); + if (head == NULL) + break; + fiber = ACL_RING_TO_APPL(head, ACL_FIBER,me); + fiber_free(fiber); + max--; + } +} + static void fiber_swap(ACL_FIBER *from, ACL_FIBER *to) { + if (from->status == FIBER_STATUS_EXITING) { + size_t slot = from->slot; + int n = acl_ring_size(&__thread_fiber->dead); + + /* if the cached dead fibers reached the limit, + * some will be freed + */ + if (n > MAX_CACHE) { + n -= MAX_CACHE; + fiber_kick(n); + } + + if (!from->sys) + __thread_fiber->count--; + + __thread_fiber->fibers[slot] = + __thread_fiber->fibers[--__thread_fiber->slot]; + __thread_fiber->fibers[slot]->slot = slot; + + acl_ring_prepend(&__thread_fiber->dead, &from->me); + } + #ifdef USE_JMP /* use setcontext() for the initial jump, as it allows us to set up * a stack, but continue with longjmp() as it's much faster. @@ -309,12 +358,36 @@ static void fiber_swap(ACL_FIBER *from, ACL_FIBER *to) #endif } -ACL_FIBER *fiber_running(void) +ACL_FIBER *acl_fiber_running(void) { fiber_check(); return __thread_fiber->running; } +void acl_fiber_kill(ACL_FIBER *fiber) +{ + if (fiber == NULL) { + acl_msg_error("%s(%d), %s: fiber NULL", + __FILE__, __LINE__, __FUNCTION__); + return; + } + + fiber->flag |= FIBER_F_KILLED; + + if (fiber == acl_fiber_running()) { + acl_msg_error("%s(%d), %s: fiber-%d kill itself disable!", + __FILE__, __LINE__, __FUNCTION__, acl_fiber_id(fiber)); + return; + } + + acl_fiber_ready(fiber); +} + +int acl_fiber_killed(ACL_FIBER *fiber) +{ + return fiber->flag & FIBER_F_KILLED; +} + void fiber_exit(int exit_code) { fiber_check(); @@ -386,7 +459,12 @@ static void fiber_start(unsigned int x, unsigned int y) fiber_exit(0); } -#define MAX_CACHE 100 +int acl_fiber_ndead(void) +{ + if (__thread_fiber == NULL) + return 0; + return acl_ring_size(&__thread_fiber->dead); +} static void fiber_free(ACL_FIBER *fiber) { @@ -406,24 +484,9 @@ static ACL_FIBER *fiber_alloc(void (*fn)(ACL_FIBER *, void *), sigset_t zero; union cc_arg carg; ACL_RING *head; - size_t n; fiber_check(); - n = acl_ring_size(&__thread_fiber->dead); - - /* if the cached dead fibers reached the limit, some will be freed */ - if (n > MAX_CACHE) { - n -= MAX_CACHE; - while (n > 0) { - head = acl_ring_pop_head(&__thread_fiber->dead); - acl_assert(head != NULL); - fiber = ACL_RING_TO_APPL(head, ACL_FIBER,me); - fiber_free(fiber); - n--; - } - } - #define APPL ACL_RING_TO_APPL /* try to reuse the fiber memory in dead queue */ @@ -441,10 +504,12 @@ static ACL_FIBER *fiber_alloc(void (*fn)(ACL_FIBER *, void *), fiber->arg = arg; fiber->size = size; fiber->id = ++__thread_fiber->idgen; + fiber->flag = 0; carg.p = fiber; - fiber->context = (ucontext_t *) acl_mymalloc(sizeof(ucontext_t)); + if (fiber->context == NULL) + fiber->context = (ucontext_t *) acl_mymalloc(sizeof(ucontext_t)); sigemptyset(&zero); sigprocmask(SIG_BLOCK, &zero, &fiber->context->uc_sigmask); @@ -479,13 +544,17 @@ ACL_FIBER *acl_fiber_create(void (*fn)(ACL_FIBER *, void *), ACL_FIBER *fiber = fiber_alloc(fn, arg, size); __thread_fiber->count++; - if (__thread_fiber->size % 64 == 0) + + if (__thread_fiber->slot >= __thread_fiber->size) { + __thread_fiber->size += 128; __thread_fiber->fibers = (ACL_FIBER **) acl_myrealloc( __thread_fiber->fibers, - (__thread_fiber->size + 64) * sizeof(ACL_FIBER *)); + __thread_fiber->size * sizeof(ACL_FIBER *)); + } + + fiber->slot = __thread_fiber->slot; + __thread_fiber->fibers[__thread_fiber->slot++] = fiber; - fiber->slot = __thread_fiber->size; - __thread_fiber->fibers[__thread_fiber->size++] = fiber; acl_fiber_ready(fiber); return fiber; @@ -498,7 +567,7 @@ int acl_fiber_id(const ACL_FIBER *fiber) int acl_fiber_self(void) { - ACL_FIBER *curr = fiber_running(); + ACL_FIBER *curr = acl_fiber_running(); return acl_fiber_id(curr); } @@ -586,18 +655,6 @@ void acl_fiber_switch(void) acl_assert(current); #endif - if (current->status == FIBER_STATUS_EXITING) { - size_t slot = current->slot; - - if (!current->sys) - __thread_fiber->count--; - - __thread_fiber->fibers[slot] = - __thread_fiber->fibers[--__thread_fiber->size]; - __thread_fiber->fibers[slot]->slot = slot; - acl_ring_append(&__thread_fiber->dead, ¤t->me); - } - head = acl_ring_pop_head(&__thread_fiber->ready); if (head == NULL) { diff --git a/lib_fiber/c/src/fiber.h b/lib_fiber/c/src/fiber.h index dde1de618..a0450e93e 100644 --- a/lib_fiber/c/src/fiber.h +++ b/lib_fiber/c/src/fiber.h @@ -29,6 +29,7 @@ struct ACL_FIBER { int sys; unsigned int flag; #define FIBER_F_SAVE_ERRNO (unsigned) 1 << 0 +#define FIBER_F_KILLED (unsigned) 1 << 1 FIBER_LOCAL **locals; int nlocal; @@ -109,7 +110,6 @@ struct ACL_FIBER_SEM { extern __thread int acl_var_hook_sys_api; /* in fiber_schedule.c */ -ACL_FIBER *fiber_running(void); void fiber_save_errno(void); void fiber_exit(int exit_code); void fiber_system(void); diff --git a/lib_fiber/c/src/fiber_io.c b/lib_fiber/c/src/fiber_io.c index 49ee46f83..586c9973d 100644 --- a/lib_fiber/c/src/fiber_io.c +++ b/lib_fiber/c/src/fiber_io.c @@ -236,7 +236,7 @@ unsigned int acl_fiber_delay(unsigned int milliseconds) ev->timeout = 1; - fiber = fiber_running(); + fiber = acl_fiber_running(); fiber->when = when; acl_ring_detach(&fiber->me); @@ -337,7 +337,7 @@ void fiber_wait_read(int fd) return; } - __thread_fiber->io_fibers[fd] = fiber_running(); + __thread_fiber->io_fibers[fd] = acl_fiber_running(); __thread_fiber->io_count++; acl_fiber_switch(); @@ -362,7 +362,7 @@ void fiber_wait_write(int fd) return; } - __thread_fiber->io_fibers[fd] = fiber_running(); + __thread_fiber->io_fibers[fd] = acl_fiber_running(); __thread_fiber->io_count++; acl_fiber_switch(); diff --git a/lib_fiber/c/src/fiber_lock.c b/lib_fiber/c/src/fiber_lock.c index da963c036..a7aaa4873 100644 --- a/lib_fiber/c/src/fiber_lock.c +++ b/lib_fiber/c/src/fiber_lock.c @@ -22,14 +22,14 @@ static int __lock(ACL_FIBER_MUTEX *l, int block) ACL_FIBER *curr; if (l->owner == NULL){ - l->owner = fiber_running(); + l->owner = acl_fiber_running(); return 1; } if(!block) return 0; - curr = fiber_running(); + curr = acl_fiber_running(); acl_ring_prepend(&l->waiting, &curr->me); acl_fiber_switch(); @@ -105,7 +105,7 @@ static int __rlock(ACL_FIBER_RWLOCK *l, int block) if (!block) return 0; - curr = fiber_running(); + curr = acl_fiber_running(); acl_ring_prepend(&l->rwaiting, &curr->me); acl_fiber_switch(); @@ -127,14 +127,14 @@ static int __wlock(ACL_FIBER_RWLOCK *l, int block) ACL_FIBER *curr; if (l->writer == NULL && l->readers == 0) { - l->writer = fiber_running(); + l->writer = acl_fiber_running(); return 1; } if (!block) return 0; - curr = fiber_running(); + curr = acl_fiber_running(); acl_ring_prepend(&l->wwaiting, &curr->me); acl_fiber_switch(); diff --git a/lib_fiber/c/src/fiber_sem.c b/lib_fiber/c/src/fiber_sem.c index 51ac0a979..654df819e 100644 --- a/lib_fiber/c/src/fiber_sem.c +++ b/lib_fiber/c/src/fiber_sem.c @@ -26,7 +26,7 @@ int acl_fiber_sem_wait(ACL_FIBER_SEM *sem) return sem->num; } - curr = fiber_running(); + curr = acl_fiber_running(); acl_ring_prepend(&sem->waiting, &curr->me); acl_fiber_switch(); diff --git a/lib_fiber/c/src/hook_io.c b/lib_fiber/c/src/hook_io.c index 86254ca85..cb4669ed4 100644 --- a/lib_fiber/c/src/hook_io.c +++ b/lib_fiber/c/src/hook_io.c @@ -209,6 +209,7 @@ inline ssize_t fiber_read(int fd, void *buf, size_t count) { ssize_t ret; EVENT *ev; + ACL_FIBER *me; if (fd < 0) { acl_msg_error("%s: invalid fd: %d", __FUNCTION__, fd); @@ -233,8 +234,16 @@ inline ssize_t fiber_read(int fd, void *buf, size_t count) event_clear_readable(ev, fd); ret = __sys_read(fd, buf, count); - if (ret < 0) - fiber_save_errno(); + if (ret >= 0) + return ret; + + fiber_save_errno(); + + me = acl_fiber_running(); + if ((me->flag & FIBER_F_KILLED) != 0) + acl_msg_info("%s(%d), %s: fiber-%d is existing", + __FILE__, __LINE__, __FUNCTION__, acl_fiber_id(me)); + return ret; } @@ -242,6 +251,7 @@ inline ssize_t fiber_readv(int fd, const struct iovec *iov, int iovcnt) { ssize_t ret; EVENT *ev; + ACL_FIBER *me; if (fd < 0) { acl_msg_error("%s: invalid fd: %d", __FUNCTION__, fd); @@ -266,8 +276,16 @@ inline ssize_t fiber_readv(int fd, const struct iovec *iov, int iovcnt) event_clear_readable(ev, fd); ret = __sys_readv(fd, iov, iovcnt); - if (ret < 0) - fiber_save_errno(); + if (ret >= 0) + return ret; + + fiber_save_errno(); + + me = acl_fiber_running(); + if ((me->flag & FIBER_F_KILLED) != 0) + acl_msg_info("%s(%d), %s: fiber-%d is existing", + __FILE__, __LINE__, __FUNCTION__, acl_fiber_id(me)); + return ret; } @@ -275,6 +293,7 @@ inline ssize_t fiber_recv(int sockfd, void *buf, size_t len, int flags) { ssize_t ret; EVENT *ev; + ACL_FIBER *me; if (sockfd < 0) { acl_msg_error("%s: invalid sockfd: %d", __FUNCTION__, sockfd); @@ -298,9 +317,18 @@ inline ssize_t fiber_recv(int sockfd, void *buf, size_t len, int flags) if (ev) event_clear_readable(ev, sockfd); + ret = __sys_recv(sockfd, buf, len, flags); - if (ret < 0) - fiber_save_errno(); + if (ret >= 0) + return ret; + + fiber_save_errno(); + + me = acl_fiber_running(); + if ((me->flag & FIBER_F_KILLED) != 0) + acl_msg_info("%s(%d), %s: fiber-%d is existing", + __FILE__, __LINE__, __FUNCTION__, acl_fiber_id(me)); + return ret; } @@ -309,6 +337,7 @@ inline ssize_t fiber_recvfrom(int sockfd, void *buf, size_t len, int flags, { ssize_t ret; EVENT *ev; + ACL_FIBER *me; if (sockfd < 0) { acl_msg_error("%s: invalid sockfd: %d", __FUNCTION__, sockfd); @@ -335,8 +364,18 @@ inline ssize_t fiber_recvfrom(int sockfd, void *buf, size_t len, int flags, event_clear_readable(ev, sockfd); ret = __sys_recvfrom(sockfd, buf, len, flags, src_addr, addrlen); - if (ret < 0) - fiber_save_errno(); + if (ret >= 0) + return ret; + + fiber_save_errno(); + + me = acl_fiber_running(); + if ((me->flag & FIBER_F_KILLED) != 0) { + acl_msg_info("%s(%d), %s: fiber-%d is existing", + __FILE__, __LINE__, __FUNCTION__, acl_fiber_id(me)); + return -1; + } + return ret; } @@ -344,6 +383,7 @@ inline ssize_t fiber_recvmsg(int sockfd, struct msghdr *msg, int flags) { ssize_t ret; EVENT *ev; + ACL_FIBER *me; if (sockfd < 0) { acl_msg_error("%s: invalid sockfd: %d", __FUNCTION__, sockfd); @@ -368,8 +408,16 @@ inline ssize_t fiber_recvmsg(int sockfd, struct msghdr *msg, int flags) event_clear_readable(ev, sockfd); ret = __sys_recvmsg(sockfd, msg, flags); - if (ret < 0) - fiber_save_errno(); + if (ret >= 0) + return ret; + + fiber_save_errno(); + + me = acl_fiber_running(); + if ((me->flag & FIBER_F_KILLED) != 0) + acl_msg_info("%s(%d), %s: fiber-%d is existing", + __FILE__, __LINE__, __FUNCTION__, acl_fiber_id(me)); + return ret; } @@ -377,6 +425,8 @@ inline ssize_t fiber_recvmsg(int sockfd, struct msghdr *msg, int flags) inline ssize_t fiber_read(int fd, void *buf, size_t count) { + ACL_FIBER *me; + while (1) { ssize_t n = __sys_read(fd, buf, count); @@ -395,11 +445,19 @@ inline ssize_t fiber_read(int fd, void *buf, size_t count) #endif return -1; fiber_wait_read(fd); + + me = acl_fiber_running(); + if ((me->flag & FIBER_F_KILLED) != 0) + acl_msg_info("%s(%d), %s: fiber-%d is existing", + __FILE__, __LINE__, __FUNCTION__, + acl_fiber_id(me)); } } inline ssize_t fiber_readv(int fd, const struct iovec *iov, int iovcnt) { + ACL_FIBER *me; + while (1) { ssize_t n = __sys_readv(fd, iov, iovcnt); @@ -419,11 +477,19 @@ inline ssize_t fiber_readv(int fd, const struct iovec *iov, int iovcnt) return -1; fiber_wait_read(fd); + + me = acl_fiber_running(); + if ((me->flag & FIBER_F_KILLED) != 0) + acl_msg_info("%s(%d), %s: fiber-%d is existing", + __FILE__, __LINE__, __FUNCTION__, + acl_fiber_id(me)); } } inline ssize_t fiber_recv(int sockfd, void *buf, size_t len, int flags) { + ACL_FIBER *me; + while (1) { ssize_t n = __sys_recv(sockfd, buf, len, flags); @@ -443,12 +509,20 @@ inline ssize_t fiber_recv(int sockfd, void *buf, size_t len, int flags) return -1; fiber_wait_read(sockfd); + + me = acl_fiber_running(); + if ((me->flag & FIBER_F_KILLED) != 0) + acl_msg_info("%s(%d), %s: fiber-%d is existing", + __FILE__, __LINE__, __FUNCTION__, + acl_fiber_id(me)); } } inline ssize_t fiber_recvfrom(int sockfd, void *buf, size_t len, int flags, struct sockaddr *src_addr, socklen_t *addrlen) { + ACL_FIBER *me; + while (1) { ssize_t n = __sys_recvfrom(sockfd, buf, len, flags, src_addr, addrlen); @@ -469,11 +543,19 @@ inline ssize_t fiber_recvfrom(int sockfd, void *buf, size_t len, int flags, return -1; fiber_wait_read(sockfd); + + me = acl_fiber_running(); + if ((me->flag & FIBER_F_KILLED) != 0) + acl_msg_info("%s(%d), %s: fiber-%d is existing", + __FILE__, __LINE__, __FUNCTION__, + acl_fiber_id(me)); } } inline ssize_t fiber_recvmsg(int sockfd, struct msghdr *msg, int flags) { + ACL_FIBER *me; + while (1) { ssize_t n = __sys_recvmsg(sockfd, msg, flags); @@ -493,6 +575,12 @@ inline ssize_t fiber_recvmsg(int sockfd, struct msghdr *msg, int flags) return -1; fiber_wait_read(sockfd); + + me = acl_fiber_running(); + if ((me->flag & FIBER_F_KILLED) != 0) + acl_msg_info("%s(%d), %s: fiber-%d is existing", + __FILE__, __LINE__, __FUNCTION__, + acl_fiber_id(me)); } } @@ -502,6 +590,8 @@ inline ssize_t fiber_recvmsg(int sockfd, struct msghdr *msg, int flags) inline ssize_t fiber_write(int fd, const void *buf, size_t count) { + ACL_FIBER *me; + while (1) { ssize_t n = __sys_write(fd, buf, count); @@ -521,11 +611,19 @@ inline ssize_t fiber_write(int fd, const void *buf, size_t count) return -1; fiber_wait_write(fd); + + me = acl_fiber_running(); + if ((me->flag & FIBER_F_KILLED) != 0) + acl_msg_info("%s(%d), %s: fiber-%d is existing", + __FILE__, __LINE__, __FUNCTION__, + acl_fiber_id(me)); } } inline ssize_t fiber_writev(int fd, const struct iovec *iov, int iovcnt) { + ACL_FIBER *me; + while (1) { ssize_t n = __sys_writev(fd, iov, iovcnt); @@ -545,11 +643,19 @@ inline ssize_t fiber_writev(int fd, const struct iovec *iov, int iovcnt) return -1; fiber_wait_write(fd); + + me = acl_fiber_running(); + if ((me->flag & FIBER_F_KILLED) != 0) + acl_msg_info("%s(%d), %s: fiber-%d is existing", + __FILE__, __LINE__, __FUNCTION__, + acl_fiber_id(me)); } } inline ssize_t fiber_send(int sockfd, const void *buf, size_t len, int flags) { + ACL_FIBER *me; + while (1) { ssize_t n = __sys_send(sockfd, buf, len, flags); @@ -569,12 +675,20 @@ inline ssize_t fiber_send(int sockfd, const void *buf, size_t len, int flags) return -1; fiber_wait_write(sockfd); + + me = acl_fiber_running(); + if ((me->flag & FIBER_F_KILLED) != 0) + acl_msg_info("%s(%d), %s: fiber-%d is existing", + __FILE__, __LINE__, __FUNCTION__, + acl_fiber_id(me)); } } inline ssize_t fiber_sendto(int sockfd, const void *buf, size_t len, int flags, const struct sockaddr *dest_addr, socklen_t addrlen) { + ACL_FIBER *me; + while (1) { ssize_t n = __sys_sendto(sockfd, buf, len, flags, dest_addr, addrlen); @@ -595,11 +709,19 @@ inline ssize_t fiber_sendto(int sockfd, const void *buf, size_t len, int flags, return -1; fiber_wait_write(sockfd); + + me = acl_fiber_running(); + if ((me->flag & FIBER_F_KILLED) != 0) + acl_msg_info("%s(%d), %s: fiber-%d is existing", + __FILE__, __LINE__, __FUNCTION__, + acl_fiber_id(me)); } } inline ssize_t fiber_sendmsg(int sockfd, const struct msghdr *msg, int flags) { + ACL_FIBER *me; + while (1) { ssize_t n = __sys_sendmsg(sockfd, msg, flags); @@ -619,6 +741,12 @@ inline ssize_t fiber_sendmsg(int sockfd, const struct msghdr *msg, int flags) return -1; fiber_wait_write(sockfd); + + me = acl_fiber_running(); + if ((me->flag & FIBER_F_KILLED) != 0) + acl_msg_info("%s(%d), %s: fiber-%d is existing", + __FILE__, __LINE__, __FUNCTION__, + acl_fiber_id(me)); } } diff --git a/lib_fiber/c/src/hook_net.c b/lib_fiber/c/src/hook_net.c index ac937a038..0799b74a3 100644 --- a/lib_fiber/c/src/hook_net.c +++ b/lib_fiber/c/src/hook_net.c @@ -127,6 +127,7 @@ int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen) { int clifd; EVENT *ev; + ACL_FIBER *me = acl_fiber_running(); if (sockfd < 0) { acl_msg_error("%s: invalid sockfd %d", __FUNCTION__, sockfd); @@ -163,6 +164,12 @@ int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen) if (ev) event_clear_readable(ev, sockfd); + if ((me->flag & FIBER_F_KILLED) != 0) { + acl_msg_info("%s(%d), %s: fiber-%d is existing", + __FILE__, __LINE__, __FUNCTION__, acl_fiber_id(me)); + return -1; + } + clifd = __sys_accept(sockfd, addr, addrlen); if (clifd >= 0) { @@ -190,6 +197,12 @@ int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen) if (ev) event_clear_readable(ev, sockfd); + if ((me->flag & FIBER_F_KILLED) != 0) { + acl_msg_info("%s(%d), %s: fiber-%d is existing", + __FILE__, __LINE__, __FUNCTION__, acl_fiber_id(me)); + return -1; + } + clifd = __sys_accept(sockfd, addr, addrlen); if (clifd >= 0) { @@ -207,6 +220,7 @@ int connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen) { int err; socklen_t len = sizeof(err); + ACL_FIBER *me = acl_fiber_running(); if (!acl_var_hook_sys_api) return __sys_connect(sockfd, addr, addrlen); @@ -258,6 +272,12 @@ int connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen) fiber_wait_write(sockfd); + if ((me->flag & FIBER_F_KILLED) != 0) { + acl_msg_info("%s(%d), %s: fiber-%d is existing", + __FILE__, __LINE__, __FUNCTION__, acl_fiber_id(me)); + return -1; + } + ret = getsockopt(sockfd, SOL_SOCKET, SO_ERROR, (char *) &err, &len); if (ret == 0 && err == 0) { @@ -365,7 +385,7 @@ int poll(struct pollfd *fds, nfds_t nfds, int timeout) ev = fiber_io_event(); pe.fds = fds; pe.nfds = nfds; - pe.fiber = fiber_running(); + pe.fiber = acl_fiber_running(); pe.proc = poll_callback; SET_TIME(begin); @@ -375,8 +395,17 @@ int poll(struct pollfd *fds, nfds_t nfds, int timeout) fiber_io_inc(); acl_fiber_switch(); + if ((pe.fiber->flag & FIBER_F_KILLED) != 0) { + acl_ring_detach(&pe.me); + acl_msg_info("%s(%d), %s: fiber-%d is existing", + __FILE__, __LINE__, __FUNCTION__, + acl_fiber_id(pe.fiber)); + break; + } + if (acl_ring_size(&ev->poll_list) == 0) ev->timeout = -1; + if (pe.nready != 0 || timeout == 0) break; @@ -775,7 +804,7 @@ int epoll_wait(int epfd, struct epoll_event *events, ee->events = events; ee->maxevents = maxevents; - ee->fiber = fiber_running(); + ee->fiber = acl_fiber_running(); ee->proc = epoll_callback; SET_TIME(begin); @@ -786,6 +815,15 @@ int epoll_wait(int epfd, struct epoll_event *events, acl_fiber_switch(); ev->timeout = -1; + + if ((ee->fiber->flag & FIBER_F_KILLED) != 0) { + acl_ring_detach(&ee->me); + acl_msg_info("%s(%d), %s: fiber-%d is existing", + __FILE__, __LINE__, __FUNCTION__, + acl_fiber_id(ee->fiber)); + break; + } + if (ee->nready != 0 || timeout == 0) break; diff --git a/lib_fiber/c/src/master/fiber_server.c b/lib_fiber/c/src/master/fiber_server.c index 8b34b223c..674effd60 100644 --- a/lib_fiber/c/src/master/fiber_server.c +++ b/lib_fiber/c/src/master/fiber_server.c @@ -327,7 +327,7 @@ static void fiber_accept_main(ACL_FIBER *fiber, void *ctx) acl_msg_warn("accept connection: %s(%d, %d), stoping ...", acl_last_serror(), errno, ACL_EAGAIN); - server_abort(fiber_running()); + server_abort(acl_fiber_running()); } acl_msg_info("%s(%d), %s: fiber-%d exit now", __FILE__, __LINE__, diff --git a/lib_fiber/changes.txt b/lib_fiber/changes.txt index a470583f4..61a390bcd 100644 --- a/lib_fiber/changes.txt +++ b/lib_fiber/changes.txt @@ -1,4 +1,12 @@ +30) 2016.11.27 +30.1) bugfix: fiber.c 涓嚱鏁 acl_fiber_create 涓瓨鍦ㄨ繍琛屾椂鍐呭瓨澧為暱闂锛屽嵆 +__thread_fiber->fibers 瀵硅薄鍦 realloc 杩囩▼涓唴瀛樹笉鏂闀匡紝鍥犱负 __thread_fier->size +璁$畻鏂规硶鏈夐棶棰 + +29) 2016.11.24 +29.1) feature: 澧炲姞 acl_fiber_kill 鎺ュ彛鐢ㄦ潵閫氱煡鎸囧畾鍗忕▼閫鍑 + 28) 2016.10.31 28.1) feature: fiber.c 涓疄鐜颁簡 __i386__ 涓 SETJMP锛孡ONGJMP 鐨勬眹缂栨柟寮忥紝浠 libdill 鍊熼壌鑰屾潵 diff --git a/lib_fiber/samples/chat/Makefile b/lib_fiber/samples/chat/Makefile index ea955a8bf..77c388bb9 100644 --- a/lib_fiber/samples/chat/Makefile +++ b/lib_fiber/samples/chat/Makefile @@ -1,2 +1,19 @@ -include ../Makefile_cpp.in -PROG = chat +.PHONEY = all clean cl rebuild rb + +all: + @(cd server; make) + @(cd client; make) + @(cd reader; make) + @(cd writer; make) + @(cd connect; make) + @(cd demo; make) + +clean cl: + @(cd server; make clean) + @(cd client; make clean) + @(cd reader; make clean) + @(cd writer; make clean) + @(cd connect; make clean) + @(cd demo; make clean) + +rebuild rb: cl all diff --git a/lib_fiber/samples/chat/Makefile.in b/lib_fiber/samples/chat/Makefile.in new file mode 100644 index 000000000..2e90f4a51 --- /dev/null +++ b/lib_fiber/samples/chat/Makefile.in @@ -0,0 +1,126 @@ +CC = g++ + +CFLAGS = -c -g -W -Wall -Wcast-qual -Wcast-align \ +-Waggregate-return -Wno-long-long \ +-Wpointer-arith -Werror -Wshadow -O3 \ +-D_REENTRANT -D_POSIX_PTHREAD_SEMANTICS -D_USE_FAST_MACRO + +########################################################### +#Check system: +# Linux, SunOS, Solaris, BSD variants, AIX, HP-UX +SYSLIB = -ldl -lpthread -lz +CHECKSYSRES = @echo "Unknow system type!";exit 1 +UNIXNAME = $(shell uname -sm) +OSTYPE = $(shell uname -p) +RPATH = linux64 + +ifeq ($(CC),) + CC = gcc +endif + +# For FreeBSD +ifeq ($(findstring FreeBSD, $(UNIXNAME)), FreeBSD) + ifeq ($(findstring gcc, $(CC)), gcc) + CFLAGS += -Wstrict-prototypes + endif + CFLAGS += -DFREEBSD -D_REENTRANT + SYSLIB = -lcrypt -lpthread -lz + RPATH = freebsd +endif + +# For Darwin +ifeq ($(findstring Darwin, $(UNIXNAME)), Darwin) + CFLAGS += -DMACOSX -Wno-invalid-source-encoding \ + -Wno-extended-offsetof + UNIXTYPE = MACOSX + SYSLIB += -liconv -rdynamic + RPATH = macos +endif + +#Path for Linux +ifeq ($(findstring Linux, $(UNIXNAME)), Linux) + ifeq ($CC, "gcc") + CFLAGS += -Wstrict-prototypes + endif + ifeq ($(findstring i686, $(OSTYPE)), i686) + RPATH = linux32 + endif + ifeq ($(findstring x86_64, $(OSTYPE)), x86_64) + RPATH = linux64 + endif + CFLAGS += -DLINUX2 -D_REENTRANT + SYSLIB += -lcrypt +endif + +#Path for SunOS +ifeq ($(findstring SunOS, $(UNIXNAME)), SunOS) + ifeq ($(findstring 86, $(UNIXNAME)), 86) + SYSLIB += -lsocket -lnsl -lrt + endif + ifeq ($(findstring sun4u, $(UNIXNAME)), sun4u) + SYSLIB += -lsocket -lnsl -lrt + endif + ifeq ($CC, "gcc") + CFLAGS += -Wstrict-prototypes + endif + CFLAGS += -DSUNOS5 -D_REENTRANT + RPATH = sunos_x86 +endif + +#Path for HP-UX +ifeq ($(findstring HP-UX, $(UNIXNAME)), HP-UX) + ifeq ($CC, "gcc") + CFLAGS += -Wstrict-prototypes + endif + CFLAGS += -DHP_UX -DHPUX11 + PLAT_NAME=hp-ux +endif + +#Find system type. +ifneq ($(SYSPATH),) + CHECKSYSRES = @echo "System is $(shell uname -sm)" +endif +########################################################### + +CFLAGS += -I../.. -I../../../../lib_acl/include \ + -I../../../../lib_protocol/include \ + -I../../../../lib_acl_cpp/include \ + -I../../../../lib_fiber/c/include \ + -I../../../../lib_fiber/cpp/include +EXTLIBS = +LDFLAGS = -L../../../../lib_fiber/lib -l_fiber_cpp -l_fiber \ + -L../../../../lib_acl_cpp/lib -l_acl_cpp \ + -L../../../../lib_protocol/lib -l_protocol \ + -L../../../../lib_acl/lib -l_acl \ + $(EXTLIBS) $(SYSLIB) + +COMPILE = $(CC) $(CFLAGS) +LINK = $(CC) $(OBJ) $(LDFLAGS) +########################################################### +OBJ_PATH = . + +#Project's objs +SRC = $(wildcard *.cpp) +OBJ = $(patsubst %.cpp, $(OBJ_PATH)/%.o, $(notdir $(SRC))) + +$(OBJ_PATH)/%.o: %.cpp + $(COMPILE) $< -o $@ + +.PHONY = all clean +all: RM $(OBJ) + $(LINK) -o $(PROG) + @echo "" + @echo "All ok! Output:$(PROG)" + @echo "" +RM: + rm -f $(PROG) +clean cl: + rm -f $(PROG) + rm -f $(OBJ) + +rebuild rb: clean all + +install: + cp $(PROG) ../../../dist/master/libexec/$(RPATH)/ + cp $(PROG).cf ../../../dist/master/conf/service/ +########################################################### diff --git a/lib_fiber/samples/chat/client/Makefile b/lib_fiber/samples/chat/client/Makefile new file mode 100644 index 000000000..69489ae78 --- /dev/null +++ b/lib_fiber/samples/chat/client/Makefile @@ -0,0 +1,2 @@ +include ../Makefile.in +PROG = chat_client diff --git a/lib_fiber/samples/chat/client/main.cpp b/lib_fiber/samples/chat/client/main.cpp new file mode 100644 index 000000000..97f7aab3a --- /dev/null +++ b/lib_fiber/samples/chat/client/main.cpp @@ -0,0 +1,213 @@ +#include "stdafx.h" +#include +#include +#include +#include +#include "fiber/lib_fiber.hpp" +#include "stamp.h" +#include "user_client.h" + +#define STACK_SIZE 128000 + +static const char* __user_prefix = "user"; +static int __rw_timeout = 0; +static int __max_client = 10; +static int __cur_client = 10; +static struct timeval __begin; +static int __total_read = 0; + +static bool client_login(user_client* uc) +{ + acl::string buf; + buf.format("login|%s\r\n", uc->get_name()); + + if (uc->get_stream().write(buf) == -1) + { + printf("login failed error, user: %s\r\n", uc->get_name()); + return false; + } + + if (uc->get_stream().gets(buf) == false) + { + printf("login failed, gets error %s\r\n", acl::last_serror()); + return false; + } + + return true; +} + +static void client_logout(user_client* client) +{ + client->notify_exit(); +} + +static void fiber_reader(user_client* client) +{ + acl::socket_stream& conn = client->get_stream(); + conn.set_rw_timeout(5); + + if (client_login(client) == false) + { + client_logout(client); + return; + } + + printf("fiber-%d: user: %s login OK\r\n", acl_fiber_self(), + client->get_name()); + + acl::string buf; + + const char* to = client->get_to(); + acl::string msg; + int max_loop = client->get_max_loop(), i = 0, n = 0; + + for (;;) + { + msg.format("chat|%s|hello world\r\n", to); + if (conn.write(msg) != (int) msg.size()) + { + printf("fiber-%d: msg(%s) write error %s\r\n", + acl_fiber_self(), msg.c_str(), + acl::last_serror()); + } + + if (conn.gets(buf)) + { + if (++i <= 1) + printf("fiber-%d: gets->%s\r\n", + acl_fiber_self(), buf.c_str()); + n++; + __total_read++; + if (n == max_loop) + break; + continue; + } + + printf("%s(%d): user: %s, gets error %s, fiber: %d\r\n", + __FUNCTION__, __LINE__, client->get_name(), + acl::last_serror(), acl_fiber_self()); + + if (client->existing()) + { + printf("----existing now----\r\n"); + break; + } + + if (errno == ETIMEDOUT) + printf("ETIMEDOUT\r\n"); + else if (errno == EAGAIN) + printf("EAGAIN\r\n"); + else { + printf("gets error: %d, %s\r\n", + errno, acl::last_serror()); + break; + } + } + + printf(">>%s(%d), user: %s, logout, loop: %d, ngets: %d\r\n", + __FUNCTION__, __LINE__, client->get_name(), i, n); + + client_logout(client); +} + +static void fiber_client(const char* addr, const char* user, + const char* to, int loop) +{ + acl::socket_stream conn; + if (conn.open(addr, 0, 10) == false) + { + printf("connect %s error %s\r\n", addr, acl::last_serror()); + } + else + { + printf("fiber-%d, connect %s ok\r\n", acl_fiber_self(), addr); + + user_client* client = new user_client(conn, user, to, loop); + go[=] { + fiber_reader(client); + }; + + client->wait_exit(); + printf("--- client %s exit now ----\r\n", client->get_name()); + delete client; + } + + __cur_client--; + + if (__cur_client == 0) + { + printf("-----All fibers over!-----\r\n"); + struct timeval end; + gettimeofday(&end, NULL); + double spent = stamp_sub(&end, &__begin); + printf("---Total read: %d, spent: %.2f, speed: %.2f---\r\n", + __total_read, spent, + (1000 * __total_read) / (spent < 1 ? 1 : spent)); + + acl_fiber_schedule_stop(); + } +} + +static void usage(const char *procname) +{ + printf("usage: %s -h [help]\r\n" + " -s server_addr\r\n" + " -n max_loop\r\n" + " -c max_client\r\n" + " -r rw_timeout\r\n" , procname); +} + +int main(int argc, char *argv[]) +{ + char addr[64]; + int ch, max_loop = 100; + + acl::log::stdout_open(true); + snprintf(addr, sizeof(addr), "%s", "127.0.0.1:9002"); + + while ((ch = getopt(argc, argv, "hs:r:c:n:")) > 0) { + switch (ch) { + case 'h': + usage(argv[0]); + return 0; + case 's': + snprintf(addr, sizeof(addr), "%s", optarg); + break; + case 'r': + __rw_timeout = atoi(optarg); + break; + case 'c': + __max_client = atoi(optarg); + break; + case 'n': + max_loop = atoi(optarg); + break; + default: + break; + } + } + + __cur_client = __max_client; + + gettimeofday(&__begin, NULL); + + for (int i = 0; i < __max_client; i++) + { + char user[128], to[128]; + + snprintf(user, sizeof(user), "%s-%d", __user_prefix, i); + + if (i + 1 == __max_client) + snprintf(to, sizeof(to), "%s-0", __user_prefix); + else + snprintf(to, sizeof(to), "%s-%d", __user_prefix, i); + + go[=] { + fiber_client(addr, user, to, max_loop); + }; + } + + acl::fiber::schedule(); + + return 0; +} diff --git a/lib_fiber/samples/chat/client/stdafx.cpp b/lib_fiber/samples/chat/client/stdafx.cpp new file mode 100644 index 000000000..f01a2ff42 --- /dev/null +++ b/lib_fiber/samples/chat/client/stdafx.cpp @@ -0,0 +1,8 @@ +// stdafx.cpp : 只包括标准包含文件的源文件 +// master_threads.pch 将成为预编译头 +// stdafx.obj 将包含预编译类型信息 + +#include "stdafx.h" + +// TODO: 在 STDAFX.H 中 +//引用任何所需的附加头文件,而不是在此文件中引用 diff --git a/lib_fiber/samples/chat/client/stdafx.h b/lib_fiber/samples/chat/client/stdafx.h new file mode 100644 index 000000000..ab94a5314 --- /dev/null +++ b/lib_fiber/samples/chat/client/stdafx.h @@ -0,0 +1,20 @@ +// stdafx.h : 标准系统包含文件的包含文件, +// 或是常用但不常更改的项目特定的包含文件 +// + +#pragma once + + +//#include +//#include + +// TODO: 在此处引用程序要求的附加头文件 + +#include "lib_acl.h" +#include "fiber/lib_fiber.h" +#include "acl_cpp/lib_acl.hpp" + +#ifdef WIN32 +#define snprintf _snprintf +#endif + diff --git a/lib_fiber/samples/chat/client/user_client.h b/lib_fiber/samples/chat/client/user_client.h new file mode 100644 index 000000000..f12c95333 --- /dev/null +++ b/lib_fiber/samples/chat/client/user_client.h @@ -0,0 +1,63 @@ +#pragma once +#include + +class user_client +{ +public: + user_client(acl::socket_stream& conn, const char* user, + const char* to, int max_loop) + : conn_(conn), name_(user), to_(to), max_loop_(max_loop) {} + ~user_client(void) + { + } + + acl::socket_stream& get_stream(void) const + { + return conn_; + } + + const char* get_name(void) const + { + return name_.c_str(); + } + + const char* get_to(void) const + { + return to_.c_str(); + } + + int get_max_loop(void) const + { + return max_loop_; + } + + bool existing(void) const + { + return existing_; + } + + void set_existing(void) + { + existing_ = true; + } + + void wait_exit(void) + { + int mtype; + chan_exit_.pop(mtype); + } + + void notify_exit(void) + { + int mtype = 1; + chan_exit_.put(mtype); + } + +private: + acl::socket_stream& conn_; + acl::channel chan_exit_; + acl::string name_; + acl::string to_; + int max_loop_; + bool existing_ = false; +}; diff --git a/lib_fiber/samples/chat/client/valgrind.sh b/lib_fiber/samples/chat/client/valgrind.sh new file mode 100644 index 000000000..3b90585ff --- /dev/null +++ b/lib_fiber/samples/chat/client/valgrind.sh @@ -0,0 +1,3 @@ +#!/bin/sh + +valgrind --tool=memcheck --leak-check=yes --leak-check=full --show-reachable=yes -v ./chat_client diff --git a/lib_fiber/samples/chat/connect/Makefile b/lib_fiber/samples/chat/connect/Makefile new file mode 100644 index 000000000..172f87db9 --- /dev/null +++ b/lib_fiber/samples/chat/connect/Makefile @@ -0,0 +1,2 @@ +include ../Makefile.in +PROG = client diff --git a/lib_fiber/samples/chat/connect/main.cpp b/lib_fiber/samples/chat/connect/main.cpp new file mode 100644 index 000000000..9c91d6301 --- /dev/null +++ b/lib_fiber/samples/chat/connect/main.cpp @@ -0,0 +1,113 @@ +#include +#include +#include +#include +#include +#include +#include +#include "lib_acl.h" +#include "acl_cpp/lib_acl.hpp" + +static void usage(const char *procname) +{ + printf("usage: %s -h [help]\r\n" + " -s server_ip\r\n" + " -n max_loop\r\n", procname); +} + +struct A { + int m; + ACL_RING entry; +}; + +static void test(void) +{ + ACL_RING ring; + + acl_ring_init(&ring); + for (int i = 0; i < 10; i++) + { + A* a = new A; + a->m = i; + acl_ring_prepend(&ring, &a->entry); + printf("put %d\r\n", i); + } + + printf("-------------------------------------------------------\r\n"); + + while (true) + { + ACL_RING* head = acl_ring_pop_head(&ring); + if (head == NULL) + break; + A* a = ACL_RING_TO_APPL(head, A, entry); + printf("pop %d\r\n", a->m); + delete a; + } + + exit (0); +} + +int main(int argc, char *argv[]) +{ + int ch, max_loop = 100, n = 100; + acl::string addr("127.0.0.1:9002"); + + if (0) + test(); + + signal(SIGPIPE, SIG_IGN); + + while ((ch = getopt(argc, argv, "hn:s:l:")) > 0) { + switch (ch) { + case 'h': + usage(argv[0]); + return 0; + case 'n': + max_loop = atoi(optarg); + break; + case 'l': + n = atoi(optarg); + break; + case 's': + addr = optarg; + break; + default: + break; + } + } + + for (int j = 0; j < n; j++) + { + std::vector conns; + int i = 0; + for (; i < max_loop; i++) + { + acl::socket_stream* conn = new acl::socket_stream; + if (conn->open(addr, 0, 0) == false) + { + printf("connect %s error %s\r\n", + addr.c_str(), acl::last_serror()); + delete conn; + break; + } + conns.push_back(conn); + } + + printf("i: %d, max_loop: %d\r\n", i, max_loop); + + sleep(1); + + for (std::vector::iterator + it = conns.begin(); it != conns.end(); ++it) + { + //int sock = (*it)->sock_handle(); + //shutdown(sock, SHUT_RDWR); + delete *it; + } + + conns.clear(); + } + + return 0; +} diff --git a/lib_fiber/samples/chat/demo/Makefile b/lib_fiber/samples/chat/demo/Makefile new file mode 100644 index 000000000..585b78197 --- /dev/null +++ b/lib_fiber/samples/chat/demo/Makefile @@ -0,0 +1,2 @@ +include ../Makefile.in +PROG = chat diff --git a/lib_fiber/samples/chat/demo/main.cpp b/lib_fiber/samples/chat/demo/main.cpp new file mode 100644 index 000000000..b0e33f165 --- /dev/null +++ b/lib_fiber/samples/chat/demo/main.cpp @@ -0,0 +1,330 @@ +#include "stdafx.h" +#include +#include +#include +#include +#include "fiber/lib_fiber.h" +#include "user_client.h" + +#define STACK_SIZE 128000 + +static void client_read_callback(int type, ACL_EVENT *event, + ACL_VSTREAM *conn, void *ctx); + +static int __rw_timeout = 0; +static int __stop = 0; +static int __use_kernel = 0; + +static std::map __users; + +static void remove_user(user_client* uc) +{ + const char* name = uc->get_name(); + if (name == NULL || *name == 0) + { + printf("no name!\r\n"); + return; + } + + std::map::iterator it = __users.find(name); + if (it == __users.end()) + printf("not exist, name: %s\r\n", name); + else + { + __users.erase(it); + printf("delete user ok, name: %s\r\n", name); + } +} + +static bool client_login(user_client* uc, ACL_ARGV* tokens) +{ + if (tokens->argc < 2) + { + acl::string tmp; + tmp.format("invalid argc: %d < 2\r\n", tokens->argc); + printf("%s", tmp.c_str()); + + return acl_vstream_writen(uc->get_stream(), tmp.c_str(), + (int) tmp.size()) != ACL_VSTREAM_EOF; + } + + acl::string msg; + + const char* name = tokens->argv[1]; + std::map::iterator it = __users.find(name); + if (it == __users.end()) + { + __users[name] = uc; + uc->set_name(name); + msg.format("user %s login ok\r\n", name); + } + else + msg.format("user %s already login\r\n", name); + + printf("%s", msg.c_str()); + + return acl_vstream_writen(uc->get_stream(), msg.c_str(), + (int) msg.size()) != ACL_VSTREAM_EOF; +} + +static bool client_chat(user_client* uc, ACL_ARGV* tokens) +{ + if (tokens->argc < 3) + { + acl::string tmp; + tmp.format("invalid argc: %d < 3\r\n", tokens->argc); + printf("%s", tmp.c_str()); + + return acl_vstream_writen(uc->get_stream(), tmp.c_str(), + (int) tmp.size()) != ACL_VSTREAM_EOF; + } + + const char* to = tokens->argv[1]; + const char* msg = tokens->argv[2]; + + std::map::iterator it = __users.find(to); + if (it == __users.end()) + { + acl::string tmp; + tmp.format("from user: %s, to user: %s not exist\r\n", + uc->get_name(), to); + printf("%s", tmp.c_str()); + + return acl_vstream_writen(uc->get_stream(), tmp.c_str(), + (int) tmp.size()) != ACL_VSTREAM_EOF; + } + else + { + it->second->push(msg); + return true; + } +} + +static void fiber_client_read(ACL_FIBER *fiber acl_unused, void *ctx) +{ + user_client* uc = (user_client*) ctx; + ACL_VSTREAM *conn = uc->get_stream(); + ACL_EVENT *event = uc->get_event(); + char buf[8192]; + + int ret = acl_vstream_gets_nonl(conn, buf, sizeof(buf) - 1); + printf(">>>ret: %d\r\n", ret); + if (ret == ACL_VSTREAM_EOF) + { + if (uc->already_login()) + remove_user(uc); + acl_vstream_close(conn); + delete uc; + return; + } + buf[ret] = 0; + + printf("gets: %s\r\n", buf); + + bool status; + ACL_ARGV* tokens = acl_argv_split(buf, "|"); + + if (strcasecmp(tokens->argv[0], "login") == 0) + { + if (uc->already_login()) + { + status = true; + printf("user: %s already logined\r\n", uc->get_name()); + } + else + status = client_login(uc, tokens); + } + else if (!uc->already_login()) + { + acl::string tmp; + tmp.format("you're not login yet!\r\n"); + printf("%s", tmp.c_str()); + + status = acl_vstream_writen(conn, tmp.c_str(), (int) tmp.size()) + == ACL_VSTREAM_EOF ? false : true; + } + else if (strcasecmp(tokens->argv[0], "chat") == 0) + status = client_chat(uc, tokens); + else + { + acl::string tmp; + tmp.format("invalid data: %s\r\n", buf); + printf("%s", tmp.c_str()); + + status = acl_vstream_writen(conn, tmp.c_str(), (int) tmp.size()) + == ACL_VSTREAM_EOF ? false : true; + } + + acl_argv_free(tokens); + + if (status == false) + { + if (uc->already_login()) + remove_user(uc); + + acl_vstream_close(conn); + delete uc; + return; + } + + acl_event_enable_read(event, conn, 120, client_read_callback, uc); +} + +static void client_read_callback(int type acl_unused, ACL_EVENT *event, + ACL_VSTREAM *conn, void *ctx) +{ + user_client* uc = (user_client*) ctx; + + acl_event_disable_readwrite(event, conn); + acl_fiber_create(fiber_client_read, uc, STACK_SIZE); +} + +static void listen_callback(int type acl_unused, ACL_EVENT *event, + ACL_VSTREAM *sstream, void *ctx acl_unused) +{ + char ip[64]; + ACL_VSTREAM *conn = acl_vstream_accept(sstream, ip, sizeof(ip)); + + if (conn == NULL) { + printf("accept error %s\r\n", acl_last_serror()); + return; + } + + printf(">>>accept one, fd: %d\r\n", ACL_VSTREAM_SOCK(conn)); + + user_client* uc = new user_client(event, conn); + acl_event_enable_read(event, conn, 120, client_read_callback, uc); +} + +static void fiber_flush(ACL_FIBER* fiber acl_unused, void* ctx) +{ + user_client* uc = (user_client*) ctx; + ACL_EVENT* event = uc->get_event(); + ACL_VSTREAM* conn = uc->get_stream(); + acl::string* msg; + + uc->set_busy(true); + + while ((msg = uc->pop()) != NULL) + { + if (acl_vstream_writen(conn, msg->c_str(), (int) msg->size()) + == ACL_VSTREAM_EOF) + { + printf("flush to user: %s error\r\n", uc->get_name()); + delete msg; + break; + + // don't delete uc here, which will be remove in + // read callback + //acl_vstream_close(conn); + //delete uc; + //return; + } + } + + uc->set_busy(false); + acl_event_enable_read(event, conn, 120, client_read_callback, uc); +} + +static void fflush_all(void) +{ + std::map::iterator it = __users.begin(); + std::map::iterator next = it; + for (; it != __users.end(); it = next) + { + ++next; + if (it->second->empty() || it->second->is_busy()) + continue; + + ACL_EVENT* event = it->second->get_event(); + ACL_VSTREAM* conn = it->second->get_stream(); + acl_event_disable_readwrite(event, conn); + + acl_fiber_create(fiber_flush, it->second, STACK_SIZE); + } +} + +static void fiber_event(ACL_FIBER *fiber acl_unused, void *ctx) +{ + ACL_VSTREAM *sstream = (ACL_VSTREAM *) ctx; + ACL_EVENT *event; + time_t last_ping = time(NULL); + + if (__use_kernel) + event = acl_event_new(ACL_EVENT_KERNEL, 0, 0, 100000); + else + event = acl_event_new(ACL_EVENT_POLL, 0, 0, 100000); + + printf(">>>enable read fd: %d\r\n", ACL_VSTREAM_SOCK(sstream)); + acl_event_enable_listen(event, sstream, 0, listen_callback, NULL); + + while (!__stop) + { + acl_event_loop(event); + fflush_all(); + + if (time(NULL) - last_ping >= 1) + { + //ping_all(); + last_ping = time(NULL); + } + } + + acl_vstream_close(sstream); + acl_event_free(event); + + acl_fiber_schedule_stop(); +} + +static void usage(const char *procname) +{ + printf("usage: %s -h [help] -s listen_addr\r\n" + " -r rw_timeout\r\n" + " -R [use real http]\r\n" + " -k [use kernel event]\r\n", procname); +} + +int main(int argc, char *argv[]) +{ + char addr[64]; + ACL_VSTREAM *sstream; + int ch; + + acl::log::stdout_open(true); + snprintf(addr, sizeof(addr), "%s", "127.0.0.1:9002"); + + while ((ch = getopt(argc, argv, "hs:r:k")) > 0) { + switch (ch) { + case 'h': + usage(argv[0]); + return 0; + case 's': + snprintf(addr, sizeof(addr), "%s", optarg); + break; + case 'r': + __rw_timeout = atoi(optarg); + break; + case 'k': + __use_kernel = 1; + break; + default: + break; + } + } + + sstream = acl_vstream_listen(addr, 128); + if (sstream == NULL) { + printf("acl_vstream_listen error %s\r\n", acl_last_serror()); + return 1; + } + + printf("listen %s ok\r\n", addr); + + printf("%s: call fiber_creater\r\n", __FUNCTION__); + acl_fiber_create(fiber_event, sstream, STACK_SIZE); + + printf("call fiber_schedule\r\n"); + acl_fiber_schedule(); + + return 0; +} diff --git a/lib_fiber/samples/chat/demo/stdafx.cpp b/lib_fiber/samples/chat/demo/stdafx.cpp new file mode 100644 index 000000000..f01a2ff42 --- /dev/null +++ b/lib_fiber/samples/chat/demo/stdafx.cpp @@ -0,0 +1,8 @@ +// stdafx.cpp : 只包括标准包含文件的源文件 +// master_threads.pch 将成为预编译头 +// stdafx.obj 将包含预编译类型信息 + +#include "stdafx.h" + +// TODO: 在 STDAFX.H 中 +//引用任何所需的附加头文件,而不是在此文件中引用 diff --git a/lib_fiber/samples/chat/demo/stdafx.h b/lib_fiber/samples/chat/demo/stdafx.h new file mode 100644 index 000000000..ab94a5314 --- /dev/null +++ b/lib_fiber/samples/chat/demo/stdafx.h @@ -0,0 +1,20 @@ +// stdafx.h : 标准系统包含文件的包含文件, +// 或是常用但不常更改的项目特定的包含文件 +// + +#pragma once + + +//#include +//#include + +// TODO: 在此处引用程序要求的附加头文件 + +#include "lib_acl.h" +#include "fiber/lib_fiber.h" +#include "acl_cpp/lib_acl.hpp" + +#ifdef WIN32 +#define snprintf _snprintf +#endif + diff --git a/lib_fiber/samples/chat/demo/user_client.h b/lib_fiber/samples/chat/demo/user_client.h new file mode 100644 index 000000000..2069c97c7 --- /dev/null +++ b/lib_fiber/samples/chat/demo/user_client.h @@ -0,0 +1,80 @@ +#pragma once +#include + +class user_client +{ +public: + user_client(ACL_EVENT*event, ACL_VSTREAM* conn) + : event_(event), conn_(conn), busy_(false) {} + ~user_client(void) + { + for (std::list::iterator it = messages_.begin(); + it != messages_.end(); ++it) + { + delete *it; + } + } + + ACL_EVENT* get_event(void) const + { + return event_; + } + + ACL_VSTREAM* get_stream(void) const + { + return conn_; + } + + bool already_login(void) const + { + return !name_.empty(); + } + + bool empty(void) const + { + return messages_.empty(); + } + + acl::string* pop(void) + { + if (messages_.empty()) + return NULL; + acl::string* msg = messages_.front(); + messages_.pop_front(); + return msg; + } + + void push(const char* msg) + { + acl::string* buf = new acl::string(msg); + (*buf) << "\r\n"; + messages_.push_back(buf); + } + + void set_name(const char* name) + { + name_ = name; + } + + const char* get_name(void) const + { + return name_.c_str(); + } + + bool is_busy(void) const + { + return busy_; + } + + void set_busy(bool yes) + { + busy_ = yes; + } + +private: + ACL_EVENT* event_; + ACL_VSTREAM* conn_; + acl::string name_; + std::list messages_; + bool busy_; +}; diff --git a/lib_fiber/samples/chat/reader/Makefile b/lib_fiber/samples/chat/reader/Makefile new file mode 100644 index 000000000..048394d6c --- /dev/null +++ b/lib_fiber/samples/chat/reader/Makefile @@ -0,0 +1,2 @@ +include ../Makefile.in +PROG = reader diff --git a/lib_fiber/samples/chat/reader/main.cpp b/lib_fiber/samples/chat/reader/main.cpp new file mode 100644 index 000000000..4e04263d3 --- /dev/null +++ b/lib_fiber/samples/chat/reader/main.cpp @@ -0,0 +1,180 @@ +#include "stdafx.h" +#include +#include +#include +#include +#include "fiber/lib_fiber.hpp" +#include "stamp.h" +#include "user_client.h" + +#define STACK_SIZE 128000 + +static const char* __user_prefix = "reader"; +static int __rw_timeout = 0; +static int __max_client = 10; +static int __cur_client = 10; +static struct timeval __begin; +static int __total_read = 0; + +static bool client_login(user_client* uc) +{ + acl::string buf; + buf.format("login|%s\r\n", uc->get_name()); + + if (uc->get_stream().write(buf) == -1) + { + printf("login failed error, user: %s\r\n", uc->get_name()); + return false; + } + + if (uc->get_stream().gets(buf) == false) + { + printf("login failed, gets error %s\r\n", acl::last_serror()); + return false; + } + + return true; +} + +static void client_logout(user_client* client) +{ + client->notify_exit(); +} + +static void fiber_reader(user_client* client) +{ + acl::socket_stream& conn = client->get_stream(); + conn.set_rw_timeout(0); + + if (client_login(client) == false) + { + client_logout(client); + return; + } + + printf("fiber-%d: user: %s login OK\r\n", acl_fiber_self(), + client->get_name()); + + acl::string buf; + + int max_loop = client->get_max_loop(), i; + + for (i = 0; i < max_loop; ++i) + { + if (conn.gets(buf)) + { + if (i <= 1) + printf("fiber-%d: gets->%s\r\n", + acl_fiber_self(), buf.c_str()); + __total_read++; + continue; + } + + printf("%s(%d): user: %s, gets error %s, fiber: %d\r\n", + __FUNCTION__, __LINE__, client->get_name(), + acl::last_serror(), acl_fiber_self()); + + break; + } + + printf(">>%s(%d), user: %s, logout, loop: %d\r\n", + __FUNCTION__, __LINE__, client->get_name(), i); + + client_logout(client); +} + +static void fiber_client(const char* addr, const char* user, int max_loop) +{ + acl::socket_stream conn; + if (conn.open(addr, 0, 0) == false) + { + printf("connect %s error %s\r\n", addr, acl::last_serror()); + } + else + { + printf("fiber-%d, connect %s ok\r\n", acl_fiber_self(), addr); + + user_client* client = new user_client(conn, user, max_loop); + go[=] { + fiber_reader(client); + }; + + client->wait_exit(); + printf("--- client %s exit now ----\r\n", client->get_name()); + delete client; + } + + __cur_client--; + + if (__cur_client == 0) + { + printf("-----All fibers over!-----\r\n"); + struct timeval end; + gettimeofday(&end, NULL); + double spent = stamp_sub(&end, &__begin); + printf("---Total read: %d, spent: %.2f, speed: %.2f---\r\n", + __total_read, spent, + (1000 * __total_read) / (spent < 1 ? 1 : spent)); + + acl_fiber_schedule_stop(); + } +} + +static void usage(const char *procname) +{ + printf("usage: %s -h [help]\r\n" + " -s server_addr\r\n" + " -n max_loop\r\n" + " -c max_client\r\n" + " -r rw_timeout\r\n" , procname); +} + +int main(int argc, char *argv[]) +{ + char addr[64]; + int ch, max_loop = 100; + + acl::log::stdout_open(true); + snprintf(addr, sizeof(addr), "%s", "127.0.0.1:9002"); + + while ((ch = getopt(argc, argv, "hs:r:c:n:")) > 0) { + switch (ch) { + case 'h': + usage(argv[0]); + return 0; + case 's': + snprintf(addr, sizeof(addr), "%s", optarg); + break; + case 'r': + __rw_timeout = atoi(optarg); + break; + case 'c': + __max_client = atoi(optarg); + break; + case 'n': + max_loop = atoi(optarg); + break; + default: + break; + } + } + + __cur_client = __max_client; + + gettimeofday(&__begin, NULL); + + for (int i = 0; i < __max_client; i++) + { + char user[128]; + + snprintf(user, sizeof(user), "%s-%d", __user_prefix, i); + + go[=] { + fiber_client(addr, user, max_loop); + }; + } + + acl::fiber::schedule(); + + return 0; +} diff --git a/lib_fiber/samples/chat/reader/stdafx.cpp b/lib_fiber/samples/chat/reader/stdafx.cpp new file mode 100644 index 000000000..f01a2ff42 --- /dev/null +++ b/lib_fiber/samples/chat/reader/stdafx.cpp @@ -0,0 +1,8 @@ +// stdafx.cpp : 只包括标准包含文件的源文件 +// master_threads.pch 将成为预编译头 +// stdafx.obj 将包含预编译类型信息 + +#include "stdafx.h" + +// TODO: 在 STDAFX.H 中 +//引用任何所需的附加头文件,而不是在此文件中引用 diff --git a/lib_fiber/samples/chat/reader/stdafx.h b/lib_fiber/samples/chat/reader/stdafx.h new file mode 100644 index 000000000..ab94a5314 --- /dev/null +++ b/lib_fiber/samples/chat/reader/stdafx.h @@ -0,0 +1,20 @@ +// stdafx.h : 标准系统包含文件的包含文件, +// 或是常用但不常更改的项目特定的包含文件 +// + +#pragma once + + +//#include +//#include + +// TODO: 在此处引用程序要求的附加头文件 + +#include "lib_acl.h" +#include "fiber/lib_fiber.h" +#include "acl_cpp/lib_acl.hpp" + +#ifdef WIN32 +#define snprintf _snprintf +#endif + diff --git a/lib_fiber/samples/chat/reader/user_client.h b/lib_fiber/samples/chat/reader/user_client.h new file mode 100644 index 000000000..821b1bf6e --- /dev/null +++ b/lib_fiber/samples/chat/reader/user_client.h @@ -0,0 +1,56 @@ +#pragma once +#include + +class user_client +{ +public: + user_client(acl::socket_stream& conn, const char* user, int max_loop) + : conn_(conn), name_(user), max_loop_(max_loop) {} + ~user_client(void) + { + } + + acl::socket_stream& get_stream(void) const + { + return conn_; + } + + const char* get_name(void) const + { + return name_.c_str(); + } + + int get_max_loop(void) const + { + return max_loop_; + } + + bool existing(void) const + { + return existing_; + } + + void set_existing(void) + { + existing_ = true; + } + + void wait_exit(void) + { + int mtype; + chan_exit_.pop(mtype); + } + + void notify_exit(void) + { + int mtype = 1; + chan_exit_.put(mtype); + } + +private: + acl::socket_stream& conn_; + acl::channel chan_exit_; + acl::string name_; + int max_loop_; + bool existing_ = false; +}; diff --git a/lib_fiber/samples/chat/reader/valgrind.sh b/lib_fiber/samples/chat/reader/valgrind.sh new file mode 100644 index 000000000..3b90585ff --- /dev/null +++ b/lib_fiber/samples/chat/reader/valgrind.sh @@ -0,0 +1,3 @@ +#!/bin/sh + +valgrind --tool=memcheck --leak-check=yes --leak-check=full --show-reachable=yes -v ./chat_client diff --git a/lib_fiber/samples/chat/server/Makefile b/lib_fiber/samples/chat/server/Makefile new file mode 100644 index 000000000..5f9058071 --- /dev/null +++ b/lib_fiber/samples/chat/server/Makefile @@ -0,0 +1,2 @@ +include ../Makefile.in +PROG = chat_server diff --git a/lib_fiber/samples/chat/server/main.cpp b/lib_fiber/samples/chat/server/main.cpp new file mode 100644 index 000000000..2e5b6f307 --- /dev/null +++ b/lib_fiber/samples/chat/server/main.cpp @@ -0,0 +1,446 @@ +#include "stdafx.h" +#include +#include +#include +#include +#include "fiber/lib_fiber.hpp" +#include "user_client.h" + +#define STACK_SIZE 32000 + +static int __rw_timeout = 0; + +static acl::channel __chan_monitor; +static std::map __users; + +static void remove_user(user_client* uc) +{ + const char* name = uc->get_name(); + if (name == NULL || *name == 0) + { + printf("%s(%d): no name!\r\n", __FUNCTION__, __LINE__); + return; + } + + std::map::iterator it = __users.find(name); + if (it == __users.end()) + printf("%s(%d): not exist, name: %s\r\n", + __FUNCTION__, __LINE__, name); + else + { + __users.erase(it); + printf("delete user ok, name: %s\r\n", name); + } +} + +static void client_logout(user_client* client) +{ + if (client->already_login()) + remove_user(client); + + if (client->is_reading()) + { + printf("%s(%d): user: %s, kill_reader\r\n", + __FUNCTION__, __LINE__, client->get_name()); + client->kill_reader(); + } + + if (client->is_waiting()) + { + printf("fiber-%d: %s(%d): user: %s, notify logout\r\n", + acl_fiber_self(), __FUNCTION__, __LINE__, + client->get_name()); + client->notify(MT_LOGOUT); + } + + if (!client->is_reading() && !client->is_waiting()) + client->notify_exit(); +} + +static bool client_flush(user_client* client) +{ + acl::socket_stream& conn = client->get_stream(); + acl::string* msg; + + bool ret = true; + + while ((msg = client->pop()) != NULL) + { + if (conn.write(*msg) == -1) + { + printf("flush to user: %s error %s\r\n", + client->get_name(), acl::last_serror()); + delete msg; + ret = false; + break; + } + + delete msg; + } + + return ret; +} + +static int __nwriter = 0; + +static void fiber_writer(user_client* client) +{ + client->set_waiter(); + client->set_waiting(true); + + while (true) + { + int mtype; + client->wait(mtype); + if (client_flush(client) == false) + { + printf("%s(%d), user: %s, flush error %s\r\n", + __FUNCTION__, __LINE__, client->get_name(), + acl::last_serror()); + break; + } + +#ifdef USE_CHAN + if (mtype == MT_LOGOUT) + { + printf("%s(%d), user: %s, MT_LOGOUT\r\n", + __FUNCTION__, __LINE__, client->get_name()); + break; + } + if (mtype == MT_KICK) + { + printf("%s(%d), user: %s, MT_KICK\r\n", + __FUNCTION__, __LINE__, client->get_name()); + client->get_stream().write("You're kicked\r\n"); + break; + } +#endif + } + + client->set_waiting(false); + printf(">>%s(%d), user: %s, logout\r\n", __FUNCTION__, __LINE__, + client->get_name()); + client_logout(client); + + printf("-------__nwriter: %d-----\r\n", --__nwriter); +} + +static bool client_login(user_client* uc) +{ + acl::string buf; + + while (true) + { + if (uc->get_stream().gets(buf) == false) + { + printf("%s(%d): gets error %s\r\n", + __FUNCTION__, __LINE__, acl::last_serror()); + + if (errno == ETIMEDOUT) + { + printf("Login timeout\r\n"); + uc->get_stream().write("Login timeout\r\n"); + } + return false; + } + if (!buf.empty()) + break; + } + + std::vector& tokens = buf.split2("|"); + if (tokens.size() < 2) + { + acl::string tmp; + tmp.format("invalid argc: %d < 2\r\n", (int) tokens.size()); + printf("%s", tmp.c_str()); + + return uc->get_stream().write(tmp) != -1; + } + + acl::string msg; + + const acl::string& name = tokens[1]; + std::map::iterator it = __users.find(name); + if (it == __users.end()) + { + __users[name] = uc; + uc->set_name(name); + msg.format("user %s login ok\r\n", name.c_str()); + } + else + msg.format("user %s already login\r\n", name.c_str()); + + printf("%s", msg.c_str()); + + return uc->get_stream().write(msg) != -1; +} + +static bool client_chat(user_client* uc, std::vector& tokens) +{ + if (tokens.size() < 3) + { + printf("invalid argc: %d < 3\r\n", (int) tokens.size()); + return true; + } + + const acl::string& to = tokens[1]; + const acl::string& msg = tokens[2]; + + std::map::iterator it = __users.find(to); + if (it == __users.end()) + { + acl::string tmp; + tmp.format("chat >> from user: %s, to user: %s not exist\r\n", + uc->get_name(), to.c_str()); + printf("%s", tmp.c_str()); + + return uc->get_stream().write(tmp) != -1; + } + + it->second->push(msg); + it->second->notify(MT_MSG); + return true; +} + +static bool client_kick(user_client* uc, std::vector& tokens) +{ + if (tokens.size() < 2) + { + printf("invalid argc: %d < 2\r\n", (int) tokens.size()); + return true; + } + + const acl::string& to = tokens[1]; + + std::map::iterator it = __users.find(to); + if (it == __users.end()) + { + acl::string tmp; + tmp.format("kick >> from user: %s, to user: %s not exist\r\n", + uc->get_name(), to.c_str()); + printf("%s", tmp.c_str()); + + return uc->get_stream().write(tmp) != -1; + } + + it->second->notify(MT_KICK); + + return true; +} + +static int __nreader = 0; + +static void fiber_reader(user_client* client) +{ + acl::socket_stream& conn = client->get_stream(); + conn.set_rw_timeout(0); + + client->set_reader(); + client->set_reading(true); + + if (client_login(client) == false) + { + client->set_reading(false); + printf("----------client_logout-------\r\n"); + client_logout(client); + + printf("----__nreader: %d-----\r\n", --__nreader); + return; + } + + go_stack(STACK_SIZE) [&] { + __nwriter++; + fiber_writer(client); + }; + + conn.set_rw_timeout(0); + + bool stop = false; + acl::string buf; + + while (true) + { + bool ret = conn.gets(buf); + if (ret == false) + { + printf("%s(%d): user: %s, gets error %s, fiber: %d\r\n", + __FUNCTION__, __LINE__, client->get_name(), + acl::last_serror(), acl_fiber_self()); + + if (client->exiting()) + { + printf("----exiting now----\r\n"); + break; + } + + if (errno == ETIMEDOUT) + { + if (conn.write("ping\r\n") == -1) + { + printf("ping error\r\n"); + break; + } + } + else if (errno == EAGAIN) + printf("EAGAIN\r\n"); + else { + printf("gets error: %d, %s\r\n", + errno, acl::last_serror()); + break; + } + + continue; + } + + if (buf.empty()) + continue; + + std::vector& tokens = buf.split2("|"); + if (tokens[0] == "quit" || tokens[0] == "exit") + { + conn.write("Bye!\r\n"); + break; + } + else if (tokens[0] == "chat") + { + if (client_chat(client, tokens) == false) + break; + } + else if (tokens[0] == "kick") + { + if (client_kick(client, tokens) == false) + break; + } + else if (tokens[0] == "stop") + { + stop = true; + break; + } + else + printf("invalid data: %s, cmd: [%s]\r\n", + buf.c_str(), tokens[0].c_str()); + } + + printf(">>%s(%d), user: %s, logout\r\n", __FUNCTION__, __LINE__, + client->get_name()); + + client->set_reading(false); + client_logout(client); + + printf("----__nreader: %d-----\r\n", --__nreader); + + if (stop) + { + int dumy = 1; + __chan_monitor.put(dumy); + } +} + +static int __nclients = 0; + +static void fiber_client(acl::socket_stream* conn) +{ + user_client* client = new user_client(*conn); + + go_stack(STACK_SIZE) [=] { + __nreader++; + fiber_reader(client); + }; + + client->wait_exit(); + + printf("----- client (%s), exit now -----\r\n", client->get_name()); + delete client; + delete conn; + + printf("----__nclients: %d-----\r\n", --__nclients); + printf("----dead fibers: %d---\r\n", acl_fiber_ndead()); +} + +static ACL_FIBER *__fiber_accept = NULL; + +static void fiber_accept(acl::server_socket& ss) +{ + __fiber_accept = acl_fiber_running(); + + while (true) + { + acl::socket_stream* conn = ss.accept(); + if (conn == NULL) + { + printf("accept error %s\r\n", acl::last_serror()); + break; + } + + go_stack(STACK_SIZE) [=] { + __nclients++; + fiber_client(conn); + }; + } +} + +static void fiber_monitor(void) +{ + int n; + + __chan_monitor.pop(n); + + printf("--- kill fiber_accept ---\r\n"); + acl_fiber_kill(__fiber_accept); + + printf("--- stop fiber schedule ---\r\n"); + acl_fiber_schedule_stop(); +} + +static void usage(const char *procname) +{ + printf("usage: %s -h [help]\r\n" + " -s listen_addr\r\n" + " -r rw_timeout\r\n" , procname); +} + +int main(int argc, char *argv[]) +{ + char addr[64]; + int ch; + + acl::log::stdout_open(true); + snprintf(addr, sizeof(addr), "%s", "127.0.0.1:9002"); + + while ((ch = getopt(argc, argv, "hs:r:")) > 0) { + switch (ch) { + case 'h': + usage(argv[0]); + return 0; + case 's': + snprintf(addr, sizeof(addr), "%s", optarg); + break; + case 'r': + __rw_timeout = atoi(optarg); + break; + default: + break; + } + } + + acl::server_socket ss; + if (ss.open(addr) == false) + { + printf("listen %s error %s\r\n", addr, acl::last_serror()); + return 1; + } + + printf("listen %s ok\r\n", addr); + + go[&] { + fiber_accept(ss); + }; + + go[] { + fiber_monitor(); + }; + + acl::fiber::schedule(); + + return 0; +} diff --git a/lib_fiber/samples/chat/server/stdafx.cpp b/lib_fiber/samples/chat/server/stdafx.cpp new file mode 100644 index 000000000..f01a2ff42 --- /dev/null +++ b/lib_fiber/samples/chat/server/stdafx.cpp @@ -0,0 +1,8 @@ +// stdafx.cpp : 只包括标准包含文件的源文件 +// master_threads.pch 将成为预编译头 +// stdafx.obj 将包含预编译类型信息 + +#include "stdafx.h" + +// TODO: 在 STDAFX.H 中 +//引用任何所需的附加头文件,而不是在此文件中引用 diff --git a/lib_fiber/samples/chat/server/stdafx.h b/lib_fiber/samples/chat/server/stdafx.h new file mode 100644 index 000000000..ab94a5314 --- /dev/null +++ b/lib_fiber/samples/chat/server/stdafx.h @@ -0,0 +1,20 @@ +// stdafx.h : 标准系统包含文件的包含文件, +// 或是常用但不常更改的项目特定的包含文件 +// + +#pragma once + + +//#include +//#include + +// TODO: 在此处引用程序要求的附加头文件 + +#include "lib_acl.h" +#include "fiber/lib_fiber.h" +#include "acl_cpp/lib_acl.hpp" + +#ifdef WIN32 +#define snprintf _snprintf +#endif + diff --git a/lib_fiber/samples/chat/server/user_client.h b/lib_fiber/samples/chat/server/user_client.h new file mode 100644 index 000000000..b9a7f432c --- /dev/null +++ b/lib_fiber/samples/chat/server/user_client.h @@ -0,0 +1,165 @@ +#pragma once +#include + +enum +{ + MT_MSG, + MT_LOGOUT, + MT_KICK, +}; + +class user_client +{ +public: +#ifdef USE_CHAN + user_client(acl::socket_stream& conn) : conn_(conn) {} +#else + user_client(acl::socket_stream& conn) : conn_(conn), sem_msg_(1) {} +#endif + ~user_client(void) + { + for (std::list::iterator it = messages_.begin(); + it != messages_.end(); ++it) + { + delete *it; + } + } + + acl::socket_stream& get_stream(void) const + { + return conn_; + } + + bool already_login(void) const + { + return !name_.empty(); + } + + bool empty(void) const + { + return messages_.empty(); + } + + acl::string* pop(void) + { + if (messages_.empty()) + return NULL; + acl::string* msg = messages_.front(); + messages_.pop_front(); + return msg; + } + + void push(const char* msg) + { + acl::string* buf = new acl::string(msg); + (*buf) << "\r\n"; + messages_.push_back(buf); + } + + void set_name(const char* name) + { + name_ = name; + } + + const char* get_name(void) const + { + return name_.c_str(); + } + + void wait(int& mtype) + { +#ifdef USE_CHAN + chan_msg_.pop(mtype); +#else + (void) mtype; + sem_msg_.wait(); +#endif + } + + void notify(int mtype) + { +#ifdef USE_CHAN + chan_msg_.put(mtype); +#else + (void) mtype; + (void) sem_msg_.post(); +#endif + } + + void kill_reader(void) + { + if (fiber_reader_) + { + exiting_ = true; + acl_fiber_kill(fiber_reader_); + } + } + + bool exiting(void) const + { + return exiting_; + } + + void set_exiting(void) + { + exiting_ = true; + } + + void set_reading(bool yes) + { + reading_ = yes; + } + + bool is_reading(void) const + { + return reading_; + } + + void set_waiting(bool yes) + { + waiting_ = yes; + } + + bool is_waiting(void) const + { + return waiting_; + } + + void wait_exit(void) + { + int mtype; + chan_exit_.pop(mtype); + } + + void notify_exit(void) + { + int mtype = MT_LOGOUT; + chan_exit_.put(mtype); + } + + void set_reader(void) + { + fiber_reader_ = acl_fiber_running(); + } + + void set_waiter(void) + { + fiber_waiter_ = acl_fiber_running(); + } + +private: + acl::socket_stream& conn_; +#ifdef USE_CHAN + acl::channel chan_msg_; +#else + acl::fiber_sem sem_msg_; +#endif + acl::channel chan_exit_; + acl::string name_; + std::list messages_; + bool exiting_ = false; + bool reading_ = false; + bool waiting_ = false; + ACL_FIBER* fiber_reader_ = NULL; + ACL_FIBER* fiber_waiter_ = NULL; +}; diff --git a/lib_fiber/samples/chat/server/valgrind.sh b/lib_fiber/samples/chat/server/valgrind.sh new file mode 100644 index 000000000..670d565c4 --- /dev/null +++ b/lib_fiber/samples/chat/server/valgrind.sh @@ -0,0 +1,4 @@ +#!/bin/sh + +#valgrind --tool=memcheck --leak-check=yes --leak-check=full --show-reachable=yes --max-stackframe=3426305034400000 -v ./fiber -n 10 -m 20 +valgrind --tool=memcheck --leak-check=yes --leak-check=full --show-reachable=yes -v ./chat_server diff --git a/lib_fiber/samples/chat/writer/Makefile b/lib_fiber/samples/chat/writer/Makefile new file mode 100644 index 000000000..24ca23a86 --- /dev/null +++ b/lib_fiber/samples/chat/writer/Makefile @@ -0,0 +1,2 @@ +include ../Makefile.in +PROG = writer diff --git a/lib_fiber/samples/chat/writer/main.cpp b/lib_fiber/samples/chat/writer/main.cpp new file mode 100644 index 000000000..acf3d8a60 --- /dev/null +++ b/lib_fiber/samples/chat/writer/main.cpp @@ -0,0 +1,192 @@ +#include "stdafx.h" +#include +#include +#include +#include +#include "fiber/lib_fiber.hpp" +#include "stamp.h" +#include "user_client.h" + +#define STACK_SIZE 128000 + +static const char* __writer_prefix = "writer"; +static const char* __reader_prefix = "reader"; +static int __rw_timeout = 0; +static int __max_client = 10; +static int __cur_client = 10; +static struct timeval __begin; +static int __total_write = 0; + +static bool client_login(user_client* uc) +{ + acl::string buf; + buf.format("login|%s\r\n", uc->get_name()); + + if (uc->get_stream().write(buf) == -1) + { + printf("login failed error, user: %s\r\n", uc->get_name()); + return false; + } + + if (uc->get_stream().gets(buf) == false) + { + printf("login failed, gets error %s\r\n", acl::last_serror()); + return false; + } + + return true; +} + +static void client_logout(user_client* client) +{ + client->notify_exit(); +} + +static void fiber_writer(user_client* client) +{ + acl::socket_stream& conn = client->get_stream(); + conn.set_rw_timeout(0); + + if (client_login(client) == false) + { + client_logout(client); + return; + } + + printf("fiber-%d: user: %s login OK\r\n", acl_fiber_self(), + client->get_name()); + + acl::string buf; + + const char* to = client->get_to(); + acl::string msg; + int max_loop = client->get_max_loop(), i; + + for (i = 0; i < max_loop; i++) + { + msg.format("chat|%s|hello world\r\n", to); + + if (conn.write(msg) != (int) msg.size()) + { + printf("fiber-%d: msg(%s) write error %s\r\n", + acl_fiber_self(), msg.c_str(), + acl::last_serror()); + + if (client->existing()) + { + printf("----existing now----\r\n"); + break; + } + } + + __total_write++; + + if (i < 1) + printf(">>>msg: %s", msg.c_str()); + + acl::fiber::yield(); + } + + printf(">>%s(%d), user: %s, logout, nwrite: %d\r\n", + __FUNCTION__, __LINE__, client->get_name(), i); + + client_logout(client); +} + +static void fiber_client(const char* addr, const char* user, + const char* to, int loop) +{ + acl::socket_stream conn; + if (conn.open(addr, 0, 0) == false) + { + printf("connect %s error %s\r\n", addr, acl::last_serror()); + } + else + { + printf("fiber-%d, connect %s ok\r\n", acl_fiber_self(), addr); + + user_client* client = new user_client(conn, user, to, loop); + go[=] { + fiber_writer(client); + }; + + client->wait_exit(); + printf("--- client %s exit now ----\r\n", client->get_name()); + delete client; + } + + __cur_client--; + + if (__cur_client == 0) + { + printf("-----All fibers over!-----\r\n"); + struct timeval end; + gettimeofday(&end, NULL); + double spent = stamp_sub(&end, &__begin); + printf("---Total write: %d, spent: %.2f, speed: %.2f---\r\n", + __total_write, spent, + (1000 * __total_write) / (spent < 1 ? 1 : spent)); + + acl_fiber_schedule_stop(); + } +} + +static void usage(const char *procname) +{ + printf("usage: %s -h [help]\r\n" + " -s server_addr\r\n" + " -n max_loop\r\n" + " -c max_client\r\n" + " -r rw_timeout\r\n" , procname); +} + +int main(int argc, char *argv[]) +{ + char addr[64]; + int ch, max_loop = 100; + + acl::log::stdout_open(true); + snprintf(addr, sizeof(addr), "%s", "127.0.0.1:9002"); + + while ((ch = getopt(argc, argv, "hs:r:c:n:")) > 0) { + switch (ch) { + case 'h': + usage(argv[0]); + return 0; + case 's': + snprintf(addr, sizeof(addr), "%s", optarg); + break; + case 'r': + __rw_timeout = atoi(optarg); + break; + case 'c': + __max_client = atoi(optarg); + break; + case 'n': + max_loop = atoi(optarg); + break; + default: + break; + } + } + + __cur_client = __max_client; + + gettimeofday(&__begin, NULL); + + for (int i = 0; i < __max_client; i++) + { + char user[128], to[128]; + + snprintf(user, sizeof(user), "%s-%d", __writer_prefix, i); + snprintf(to, sizeof(to), "%s-%d", __reader_prefix, i); + + go[=] { + fiber_client(addr, user, to, max_loop); + }; + } + + acl::fiber::schedule(); + + return 0; +} diff --git a/lib_fiber/samples/chat/writer/stdafx.cpp b/lib_fiber/samples/chat/writer/stdafx.cpp new file mode 100644 index 000000000..f01a2ff42 --- /dev/null +++ b/lib_fiber/samples/chat/writer/stdafx.cpp @@ -0,0 +1,8 @@ +// stdafx.cpp : 只包括标准包含文件的源文件 +// master_threads.pch 将成为预编译头 +// stdafx.obj 将包含预编译类型信息 + +#include "stdafx.h" + +// TODO: 在 STDAFX.H 中 +//引用任何所需的附加头文件,而不是在此文件中引用 diff --git a/lib_fiber/samples/chat/writer/stdafx.h b/lib_fiber/samples/chat/writer/stdafx.h new file mode 100644 index 000000000..ab94a5314 --- /dev/null +++ b/lib_fiber/samples/chat/writer/stdafx.h @@ -0,0 +1,20 @@ +// stdafx.h : 标准系统包含文件的包含文件, +// 或是常用但不常更改的项目特定的包含文件 +// + +#pragma once + + +//#include +//#include + +// TODO: 在此处引用程序要求的附加头文件 + +#include "lib_acl.h" +#include "fiber/lib_fiber.h" +#include "acl_cpp/lib_acl.hpp" + +#ifdef WIN32 +#define snprintf _snprintf +#endif + diff --git a/lib_fiber/samples/chat/writer/user_client.h b/lib_fiber/samples/chat/writer/user_client.h new file mode 100644 index 000000000..f12c95333 --- /dev/null +++ b/lib_fiber/samples/chat/writer/user_client.h @@ -0,0 +1,63 @@ +#pragma once +#include + +class user_client +{ +public: + user_client(acl::socket_stream& conn, const char* user, + const char* to, int max_loop) + : conn_(conn), name_(user), to_(to), max_loop_(max_loop) {} + ~user_client(void) + { + } + + acl::socket_stream& get_stream(void) const + { + return conn_; + } + + const char* get_name(void) const + { + return name_.c_str(); + } + + const char* get_to(void) const + { + return to_.c_str(); + } + + int get_max_loop(void) const + { + return max_loop_; + } + + bool existing(void) const + { + return existing_; + } + + void set_existing(void) + { + existing_ = true; + } + + void wait_exit(void) + { + int mtype; + chan_exit_.pop(mtype); + } + + void notify_exit(void) + { + int mtype = 1; + chan_exit_.put(mtype); + } + +private: + acl::socket_stream& conn_; + acl::channel chan_exit_; + acl::string name_; + acl::string to_; + int max_loop_; + bool existing_ = false; +}; diff --git a/lib_fiber/samples/chat/writer/valgrind.sh b/lib_fiber/samples/chat/writer/valgrind.sh new file mode 100644 index 000000000..3b90585ff --- /dev/null +++ b/lib_fiber/samples/chat/writer/valgrind.sh @@ -0,0 +1,3 @@ +#!/bin/sh + +valgrind --tool=memcheck --leak-check=yes --leak-check=full --show-reachable=yes -v ./chat_client diff --git a/lib_fiber/samples/redis_channel/main.cpp b/lib_fiber/samples/redis_channel/main.cpp index bfc83d960..7b92b4b27 100644 --- a/lib_fiber/samples/redis_channel/main.cpp +++ b/lib_fiber/samples/redis_channel/main.cpp @@ -5,6 +5,8 @@ static int __fibers_count = 2; static int __fibers_max = 2; static int __oper_count = 100; //static struct timeval __begin, __end; +static ACL_FIBER **__workers = NULL; +static ACL_CHANNEL *__chan_exit = NULL; static acl::redis_client_cluster __redis_cluster; @@ -150,7 +152,7 @@ static bool redis_del(ACL_FIBER& fiber, ACL_CHANNEL &chan, PKT &pkt) return true; } -static void fiber_redis_worker(ACL_FIBER *fiber, void *ctx) +static void fiber_worker(ACL_FIBER *fiber, void *ctx) { ACL_CHANNEL *chan = ((MYCHAN *) ctx)->chan; @@ -199,11 +201,11 @@ static void fiber_redis_worker(ACL_FIBER *fiber, void *ctx) static int __display = 0; -static void fiber_redis(ACL_FIBER *fiber, void *ctx) +static void fiber_result(ACL_FIBER *fiber, void *ctx) { - MYCHANS *mychans = (MYCHANS *) ctx; - MYCHAN *mychan = &mychans->chans[mychans->off++]; - ACL_CHANNEL *chan = mychan->chan; + MYCHANS *mychans = (MYCHANS *) ctx; + MYCHAN *mychan = &mychans->chans[mychans->off++]; + ACL_CHANNEL *chan = mychan->chan; PKT pkt; if (mychans->off == mychans->size) @@ -227,7 +229,7 @@ static void fiber_redis(ACL_FIBER *fiber, void *ctx) PKT* res = (PKT *) acl_channel_recvp(chan); if (res == NULL) { - printf("%s(%d): fiber-%d: acl_channel_recvp errork, key = %s\r\n", + printf("%s(%d): fiber-%d: acl_channel_recvp error, key = %s\r\n", __FUNCTION__, __LINE__, acl_fiber_id(fiber), pkt.key.c_str()); break; @@ -259,10 +261,28 @@ static void fiber_redis(ACL_FIBER *fiber, void *ctx) if (--__fibers_count == 0) { printf("---All fibers are over!---\r\n"); - acl_fiber_schedule_stop(); + unsigned long n = 100; + acl_channel_sendul(__chan_exit, n); } } +static void fiber_wait(ACL_FIBER *, void *ctx) +{ + ACL_CHANNEL *chan = (ACL_CHANNEL *) ctx; + unsigned long n = acl_channel_recvul(chan); + + printf("----fiber-%d: get n: %lu---\r\n", acl_fiber_self(), n); + + for (int i = 0; __workers[i] != NULL; i++) + { + printf("kill fiber-%d\r\n", acl_fiber_id(__workers[i])); + acl_fiber_kill(__workers[i]); + } + + printf("---- fiber schedul stopping now ----\r\n"); + acl_fiber_schedule_stop(); +} + static void usage(const char *procname) { printf("usage: %s -h [help]\r\n" @@ -333,13 +353,20 @@ int main(int argc, char *argv[]) mychans.chans[i].cmd = cmd; } + __workers = new ACL_FIBER*[nworkers + 1]; for (int i = 0; i < nworkers; i++) - acl_fiber_create(fiber_redis_worker, &mychans.chans[i], 32000); + __workers[i] = + acl_fiber_create(fiber_worker, &mychans.chans[i], 32000); - __fibers_count = __fibers_max; + __workers[nworkers] = NULL; + + __fibers_count = nworkers; for (int i = 0; i < __fibers_max; i++) - acl_fiber_create(fiber_redis, &mychans, 32000); + (void) acl_fiber_create(fiber_result, &mychans, 32000); + + __chan_exit = acl_channel_create(sizeof(unsigned long), 1000); + acl_fiber_create(fiber_wait, __chan_exit, 32000); acl_fiber_schedule(); @@ -347,6 +374,9 @@ int main(int argc, char *argv[]) acl_channel_free(mychans.chans[i].chan); delete [] mychans.chans; + acl_channel_free(__chan_exit); + + delete [] __workers; return 0; } diff --git a/lib_fiber/samples/redis_channel/valgrind.sh b/lib_fiber/samples/redis_channel/valgrind.sh new file mode 100644 index 000000000..df80e378f --- /dev/null +++ b/lib_fiber/samples/redis_channel/valgrind.sh @@ -0,0 +1,3 @@ +#!/bin/sh + +valgrind --tool=memcheck --leak-check=yes --leak-check=full --show-reachable=yes -v ./redis -c 100 -n 100 -w 10 diff --git a/lib_fiber/samples/timer_server/main.c b/lib_fiber/samples/timer_server/main.c index 1ad88ea7c..68e23bb57 100644 --- a/lib_fiber/samples/timer_server/main.c +++ b/lib_fiber/samples/timer_server/main.c @@ -20,10 +20,11 @@ static void io_timer(ACL_FIBER *fiber, void *ctx) assert(fiber == ft->timer); acl_fiber_set_errno(ft->fiber, ETIMEDOUT); + acl_fiber_keep_errno(ft->fiber, 1); - printf("timer-%d wakeup, set fiber-%d, errno: %d, %d\r\n", - acl_fiber_id(fiber), acl_fiber_id(ft->fiber), - ETIMEDOUT, acl_fiber_errno(ft->fiber)); +// printf("timer-%d wakeup, set fiber-%d, errno: %d, %d\r\n", +// acl_fiber_id(fiber), acl_fiber_id(ft->fiber), +// ETIMEDOUT, acl_fiber_errno(ft->fiber)); acl_fiber_ready(ft->fiber); } @@ -55,7 +56,10 @@ static void echo_client(ACL_FIBER *fiber, void *ctx) break; if (++ntimeout > 2) + { + printf("too many timeout: %d\r\n", ntimeout); break; + } printf("ntimeout: %d\r\n", ntimeout); ft->timer = acl_fiber_create_timer(__rw_timeout * 1000,