From 2e5724cbeb828cf283f57240ed148c0029409f40 Mon Sep 17 00:00:00 2001 From: ubuntu14 Date: Sun, 26 Jun 2016 12:17:17 +0800 Subject: [PATCH] add acl_mbox.c for thread communication --- lib_acl/changes.txt | 4 + lib_acl/include/stdlib/acl_mbox.h | 23 + .../stdlib/{unix => }/acl_sane_socketpair.h | 7 +- lib_acl/include/stdlib/acl_stdlib.h | 2 + lib_acl/include/stdlib/acl_sys_patch.h | 11 + lib_acl/include/stdlib/unix/acl_unix.h | 1 - lib_acl/lib_acl_vc2003.vcproj | 6 + lib_acl/lib_acl_vc2008.vcproj | 20 +- lib_acl/lib_acl_vc2010.vcxproj | 7 +- lib_acl/lib_acl_vc2010.vcxproj.filters | 17 +- lib_acl/lib_acl_vc2012.vcxproj | 6 +- lib_acl/lib_acl_vc2012.vcxproj.filters | 18 +- lib_acl/lib_acl_vc2015.vcxproj | 5 +- lib_acl/lib_acl_vc2015.vcxproj.filters | 17 +- lib_acl/src/stdlib/acl_mbox.c | 131 ++++++ lib_acl/src/stdlib/iostuff/acl_duplex_pipe.c | 2 +- lib_acl/src/stdlib/sys/acl_sane_socketpair.c | 107 +++++ .../src/stdlib/sys/unix/acl_sane_socketpair.c | 56 --- lib_fiber/c/src/fiber.c | 4 +- lib_fiber/c/src/fiber.h | 7 +- lib_fiber/c/src/fiber_io.c | 393 +--------------- lib_fiber/c/src/hook_io.c | 442 ++++++++++++++++++ lib_fiber/c/src/{fiber_net.c => hook_net.c} | 34 +- lib_fiber/samples/thread_mbox/Makefile | 2 + lib_fiber/samples/thread_mbox/main.c | 106 +++++ 25 files changed, 928 insertions(+), 500 deletions(-) create mode 100644 lib_acl/include/stdlib/acl_mbox.h rename lib_acl/include/stdlib/{unix => }/acl_sane_socketpair.h (50%) create mode 100644 lib_acl/src/stdlib/acl_mbox.c create mode 100644 lib_acl/src/stdlib/sys/acl_sane_socketpair.c delete mode 100644 lib_acl/src/stdlib/sys/unix/acl_sane_socketpair.c create mode 100644 lib_fiber/c/src/hook_io.c rename lib_fiber/c/src/{fiber_net.c => hook_net.c} (87%) create mode 100644 lib_fiber/samples/thread_mbox/Makefile create mode 100644 lib_fiber/samples/thread_mbox/main.c diff --git a/lib_acl/changes.txt b/lib_acl/changes.txt index 0ce015fce..fcdf9509c 100644 --- a/lib_acl/changes.txt +++ b/lib_acl/changes.txt @@ -1,6 +1,10 @@ 修改历史列表: ------------------------------------------------------------------------ +552) 2016.6.26 +552.1) feature: 增加 acl_mbox.c 用于线程之间的单向通信,内部使用了无锁队列方式 +效率更高 -- by niukey@qq.com + 551) 2016.6.21 551.1) safety: acl_vstream.c->acl_vstream_fdopen 中的 rw_timeout > 0 时,应该 再通过 acl_getsocktype 判断一下是否是需要设置非阻塞模式 diff --git a/lib_acl/include/stdlib/acl_mbox.h b/lib_acl/include/stdlib/acl_mbox.h new file mode 100644 index 000000000..a3614d969 --- /dev/null +++ b/lib_acl/include/stdlib/acl_mbox.h @@ -0,0 +1,23 @@ +#ifndef ACL_MBOX_INCLUDE_H +#define ACL_MBOX_INCLUDE_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include "acl_define.h" + +typedef struct ACL_MBOX ACL_MBOX; + +ACL_API ACL_MBOX *acl_mbox_create(void); +ACL_API void acl_mbox_free(ACL_MBOX *mbox, void (*free_fn)(void*)); +ACL_API int acl_mbox_send(ACL_MBOX *mbox, void *msg); +ACL_API void *acl_mbox_read(ACL_MBOX *mbox, int timeout, int *success); +ACL_API size_t acl_mbox_nsend(ACL_MBOX *mbox); +ACL_API size_t acl_mbox_nread(ACL_MBOX *mbox); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/lib_acl/include/stdlib/unix/acl_sane_socketpair.h b/lib_acl/include/stdlib/acl_sane_socketpair.h similarity index 50% rename from lib_acl/include/stdlib/unix/acl_sane_socketpair.h rename to lib_acl/include/stdlib/acl_sane_socketpair.h index fef2e7b49..bb90ad901 100644 --- a/lib_acl/include/stdlib/unix/acl_sane_socketpair.h +++ b/lib_acl/include/stdlib/acl_sane_socketpair.h @@ -5,14 +5,11 @@ extern "C" { #endif -#include "../acl_define.h" -#ifdef ACL_UNIX +#include "acl_define.h" /* External interface. */ -int acl_sane_socketpair(int domain, int type, int protocol, int result[2]); - -#endif /* ACL_UNIX */ +int acl_sane_socketpair(int domain, int type, int protocol, ACL_SOCKET result[2]); #ifdef __cplusplus } diff --git a/lib_acl/include/stdlib/acl_stdlib.h b/lib_acl/include/stdlib/acl_stdlib.h index a7709a748..9d9b286e6 100644 --- a/lib_acl/include/stdlib/acl_stdlib.h +++ b/lib_acl/include/stdlib/acl_stdlib.h @@ -80,6 +80,8 @@ extern "C" { #include "acl_yqueue.h" #include "acl_ypipe.h" +#include "acl_mbox.h" + #ifdef __cplusplus } #endif diff --git a/lib_acl/include/stdlib/acl_sys_patch.h b/lib_acl/include/stdlib/acl_sys_patch.h index 0688f38ac..eae255024 100644 --- a/lib_acl/include/stdlib/acl_sys_patch.h +++ b/lib_acl/include/stdlib/acl_sys_patch.h @@ -193,6 +193,17 @@ ACL_API acl_int64 acl_file_size(const char *filename); */ ACL_API acl_int64 acl_file_fsize(ACL_FILE_HANDLE fh, ACL_VSTREAM *fp, void *arg); +/** + * 创建 SOCKET 对 + * @param domain {int} + * @param type {int} + * @param protocol {int} + * @param result {ACL_SOCKET [2]} 存储结果 + * @return {int} 成功返回 0,失败返回 -1 + */ +ACL_API int acl_sane_socketpair(int domain, int type, int protocol, + ACL_SOCKET result[2]); + # ifdef __cplusplus } # endif diff --git a/lib_acl/include/stdlib/unix/acl_unix.h b/lib_acl/include/stdlib/unix/acl_unix.h index bed023c28..b5be0caa0 100644 --- a/lib_acl/include/stdlib/unix/acl_unix.h +++ b/lib_acl/include/stdlib/unix/acl_unix.h @@ -9,7 +9,6 @@ extern "C" { #include "acl_timed_wait.h" #include "acl_set_ugid.h" #include "acl_set_eugid.h" -#include "acl_sane_socketpair.h" #include "acl_mychown.h" #include "acl_chroot_uid.h" #include "acl_safe_open.h" diff --git a/lib_acl/lib_acl_vc2003.vcproj b/lib_acl/lib_acl_vc2003.vcproj index b08bb4d28..caa4e55bb 100644 --- a/lib_acl/lib_acl_vc2003.vcproj +++ b/lib_acl/lib_acl_vc2003.vcproj @@ -287,6 +287,9 @@ copy $(TargetName).pdb ..\dist\lib\win32\$(TargetName).pdb /Y + + @@ -1462,6 +1465,9 @@ copy $(TargetName).pdb ..\dist\lib\win32\$(TargetName).pdb /Y + + diff --git a/lib_acl/lib_acl_vc2008.vcproj b/lib_acl/lib_acl_vc2008.vcproj index c1d206c0a..91a0b577e 100644 --- a/lib_acl/lib_acl_vc2008.vcproj +++ b/lib_acl/lib_acl_vc2008.vcproj @@ -400,6 +400,10 @@ RelativePath=".\src\stdlib\acl_getopt.c" > + + @@ -663,6 +667,10 @@ RelativePath=".\src\stdlib\sys\acl_safe_getenv.c" > + + @@ -710,10 +718,6 @@ RelativePath=".\src\stdlib\sys\unix\acl_safe_open.c" > - - @@ -1917,6 +1921,10 @@ RelativePath=".\include\stdlib\acl_malloc.h" > + + @@ -2076,10 +2084,6 @@ RelativePath=".\include\stdlib\unix\acl_safe_open.h" > - - diff --git a/lib_acl/lib_acl_vc2010.vcxproj b/lib_acl/lib_acl_vc2010.vcxproj index 317c7ccbf..36ac593f5 100644 --- a/lib_acl/lib_acl_vc2010.vcxproj +++ b/lib_acl/lib_acl_vc2010.vcxproj @@ -224,6 +224,7 @@ copy $(TargetName).pdb ..\dist\lib\win32\$(TargetName).pdb /Y + @@ -281,6 +282,7 @@ copy $(TargetName).pdb ..\dist\lib\win32\$(TargetName).pdb /Y + @@ -292,7 +294,6 @@ copy $(TargetName).pdb ..\dist\lib\win32\$(TargetName).pdb /Y - @@ -569,6 +570,7 @@ copy $(TargetName).pdb ..\dist\lib\win32\$(TargetName).pdb /Y + @@ -606,7 +608,6 @@ copy $(TargetName).pdb ..\dist\lib\win32\$(TargetName).pdb /Y - @@ -682,4 +683,4 @@ copy $(TargetName).pdb ..\dist\lib\win32\$(TargetName).pdb /Y - \ No newline at end of file + diff --git a/lib_acl/lib_acl_vc2010.vcxproj.filters b/lib_acl/lib_acl_vc2010.vcxproj.filters index bc0bf6e57..c0b27e126 100644 --- a/lib_acl/lib_acl_vc2010.vcxproj.filters +++ b/lib_acl/lib_acl_vc2010.vcxproj.filters @@ -191,6 +191,9 @@ Source Files\stdlib + + Source Files\stdlib + Source Files\stdlib @@ -362,6 +365,9 @@ Source Files\stdlib\sys + + Source Files\stdlib\sys + Source Files\stdlib\sys @@ -395,9 +401,6 @@ Source Files\stdlib\sys\unix - - Source Files\stdlib\sys\unix - Source Files\stdlib\sys\unix @@ -1209,6 +1212,9 @@ Header Files\stdlib + + Header Files\stdlib + Header Files\stdlib @@ -1320,9 +1326,6 @@ Header Files\stdlib\unix - - Header Files\stdlib\unix - Header Files\stdlib\unix @@ -1561,4 +1564,4 @@ - \ No newline at end of file + diff --git a/lib_acl/lib_acl_vc2012.vcxproj b/lib_acl/lib_acl_vc2012.vcxproj index 984dfc1cf..7264c8e1d 100644 --- a/lib_acl/lib_acl_vc2012.vcxproj +++ b/lib_acl/lib_acl_vc2012.vcxproj @@ -431,6 +431,7 @@ copy $(TargetName).pdb ..\dist\lib\win64\$(TargetName).pdb /Y + @@ -499,7 +500,6 @@ copy $(TargetName).pdb ..\dist\lib\win64\$(TargetName).pdb /Y - @@ -665,6 +665,7 @@ copy $(TargetName).pdb ..\dist\lib\win64\$(TargetName).pdb /Y + @@ -734,6 +735,7 @@ copy $(TargetName).pdb ..\dist\lib\win64\$(TargetName).pdb /Y + @@ -777,6 +779,7 @@ copy $(TargetName).pdb ..\dist\lib\win64\$(TargetName).pdb /Y + @@ -814,7 +817,6 @@ copy $(TargetName).pdb ..\dist\lib\win64\$(TargetName).pdb /Y - diff --git a/lib_acl/lib_acl_vc2012.vcxproj.filters b/lib_acl/lib_acl_vc2012.vcxproj.filters index d6fb62a71..7de61f58e 100644 --- a/lib_acl/lib_acl_vc2012.vcxproj.filters +++ b/lib_acl/lib_acl_vc2012.vcxproj.filters @@ -191,6 +191,9 @@ Source Files\stdlib + + Source Files\stdlib + Source Files\stdlib @@ -395,9 +398,6 @@ Source Files\stdlib\sys\unix - - Source Files\stdlib\sys\unix - Source Files\stdlib\sys\unix @@ -935,6 +935,9 @@ Source Files\stdlib + + Source Files\stdlib\sys + @@ -1209,6 +1212,9 @@ Header Files\stdlib + + Header Files\stdlib + Header Files\stdlib @@ -1320,9 +1326,6 @@ Header Files\stdlib\unix - - Header Files\stdlib\unix - Header Files\stdlib\unix @@ -1554,6 +1557,9 @@ Header Files\stdlib + + Header Files\stdlib + diff --git a/lib_acl/lib_acl_vc2015.vcxproj b/lib_acl/lib_acl_vc2015.vcxproj index 0f140830a..5c97f128d 100644 --- a/lib_acl/lib_acl_vc2015.vcxproj +++ b/lib_acl/lib_acl_vc2015.vcxproj @@ -431,6 +431,7 @@ copy $(TargetName).pdb ..\dist\lib\win64\$(TargetName).pdb /Y + @@ -488,6 +489,7 @@ copy $(TargetName).pdb ..\dist\lib\win64\$(TargetName).pdb /Y + @@ -499,7 +501,6 @@ copy $(TargetName).pdb ..\dist\lib\win64\$(TargetName).pdb /Y - @@ -771,6 +772,7 @@ copy $(TargetName).pdb ..\dist\lib\win64\$(TargetName).pdb /Y + @@ -808,7 +810,6 @@ copy $(TargetName).pdb ..\dist\lib\win64\$(TargetName).pdb /Y - diff --git a/lib_acl/lib_acl_vc2015.vcxproj.filters b/lib_acl/lib_acl_vc2015.vcxproj.filters index 53897da03..26230772c 100644 --- a/lib_acl/lib_acl_vc2015.vcxproj.filters +++ b/lib_acl/lib_acl_vc2015.vcxproj.filters @@ -191,6 +191,9 @@ Source Files\stdlib + + Source Files\stdlib + Source Files\stdlib @@ -362,6 +365,9 @@ Source Files\stdlib\sys + + Source Files\stdlib\sys + Source Files\stdlib\sys @@ -395,9 +401,6 @@ Source Files\stdlib\sys\unix - - Source Files\stdlib\sys\unix - Source Files\stdlib\sys\unix @@ -1200,6 +1203,9 @@ Header Files\stdlib + + Header Files\stdlib + Header Files\stdlib @@ -1311,9 +1317,6 @@ Header Files\stdlib\unix - - Header Files\stdlib\unix - Header Files\stdlib\unix @@ -1548,4 +1551,4 @@ doc - \ No newline at end of file + diff --git a/lib_acl/src/stdlib/acl_mbox.c b/lib_acl/src/stdlib/acl_mbox.c new file mode 100644 index 000000000..388b92528 --- /dev/null +++ b/lib_acl/src/stdlib/acl_mbox.c @@ -0,0 +1,131 @@ +#include "StdAfx.h" +#ifndef ACL_PREPARE_COMPILE + +#include "stdlib/acl_define.h" +#include "thread/acl_pthread.h" +#include "stdlib/acl_msg.h" +#include "stdlib/acl_mymalloc.h" +#include "stdlib/acl_ypipe.h" +#include "stdlib/acl_vstream.h" +#include "stdlib/acl_iostuff.h" +#include "stdlib/acl_sys_patch.h" +#include "stdlib/acl_mbox.h" + +#endif + +struct ACL_MBOX { + ACL_VSTREAM *in; + ACL_VSTREAM *out; + size_t nsend; + size_t nread; + ACL_YPIPE *ypipe; + acl_pthread_mutex_t *lock; +}; + +static const char __key[] = "k"; + +ACL_MBOX *acl_mbox_create(void) +{ + ACL_MBOX *mbox; + ACL_SOCKET fds[2]; + + if (acl_sane_socketpair(AF_UNIX, SOCK_STREAM, 0, fds) < 0) { + acl_msg_error("%s(%d), %s: acl_duplex_pipe error %s", + __FILE__, __LINE__, __FUNCTION__, acl_last_serror()); + return NULL; + } + + mbox = (ACL_MBOX *) acl_mymalloc(sizeof(ACL_MBOX)); + mbox->in = acl_vstream_fdopen(fds[0], O_RDONLY, sizeof(__key), + 0, ACL_VSTREAM_TYPE_SOCK); + mbox->out = acl_vstream_fdopen(fds[1], O_WRONLY, sizeof(__key), + 0, ACL_VSTREAM_TYPE_SOCK); + mbox->nsend = 0; + mbox->nread = 0; + mbox->ypipe = acl_ypipe_new(); + mbox->lock = acl_pthread_mutex_create(); + + return mbox; +} + +void acl_mbox_free(ACL_MBOX *mbox, void (*free_fn)(void*)) +{ + acl_vstream_close(mbox->in); + acl_vstream_close(mbox->out); + acl_ypipe_free(mbox->ypipe, free_fn); + acl_pthread_mutex_destroy(mbox->lock); + acl_myfree(mbox); +} + +int acl_mbox_send(ACL_MBOX *mbox, void *msg) +{ + int ret; + + acl_pthread_mutex_lock(mbox->lock); + acl_ypipe_write(mbox->ypipe, msg); + ret = acl_ypipe_flush(mbox->ypipe); + acl_pthread_mutex_unlock(mbox->lock); + if (ret == 0) + return 0; + + mbox->nsend++; + + if (acl_vstream_writen(mbox->out, __key, sizeof(__key) - 1) + == ACL_VSTREAM_EOF) + { + return -1; + } else + return 0; + +} + +void *acl_mbox_read(ACL_MBOX *mbox, int timeout, int *success) +{ + int ret; + char kbuf[sizeof(__key)]; + void *msg = acl_ypipe_read(mbox->ypipe); + + if (msg != NULL) { + if (success) + *success = 0; + return msg; + } + + mbox->nread++; + mbox->in->rw_timeout = timeout; + + ret = acl_vstream_readn(mbox->in, kbuf, sizeof(kbuf) - 1); + if (ret == ACL_VSTREAM_EOF) { + if (mbox->in->errnum == ACL_ETIMEDOUT) { + if (success) + *success = 0; + return NULL; + } + + if (success) + *success = -1; + return NULL; + } + + if (kbuf[0] != __key[0]) { + acl_msg_error("%s(%d), %s: read invalid: %c", + __FILE__, __LINE__, __FUNCTION__, kbuf[0]); + if (success) + *success = -1; + return NULL; + } + + if (success) + *success = 0; + return acl_ypipe_read(mbox->ypipe); +} + +size_t acl_mbox_nsend(ACL_MBOX *mbox) +{ + return mbox->nsend; +} + +size_t acl_mbox_nread(ACL_MBOX *mbox) +{ + return mbox->nread; +} diff --git a/lib_acl/src/stdlib/iostuff/acl_duplex_pipe.c b/lib_acl/src/stdlib/iostuff/acl_duplex_pipe.c index 471327160..65b158eaa 100644 --- a/lib_acl/src/stdlib/iostuff/acl_duplex_pipe.c +++ b/lib_acl/src/stdlib/iostuff/acl_duplex_pipe.c @@ -9,7 +9,7 @@ #ifdef ACL_UNIX #include #include -#include "stdlib/unix/acl_sane_socketpair.h" +#include "stdlib/acl_sys_patch.h" #endif #ifdef ACL_WINDOWS diff --git a/lib_acl/src/stdlib/sys/acl_sane_socketpair.c b/lib_acl/src/stdlib/sys/acl_sane_socketpair.c new file mode 100644 index 000000000..70c9f5153 --- /dev/null +++ b/lib_acl/src/stdlib/sys/acl_sane_socketpair.c @@ -0,0 +1,107 @@ +/* System library. */ +#include "StdAfx.h" +#ifndef ACL_PREPARE_COMPILE + +#include "stdlib/acl_define.h" + +#ifdef ACL_BCB_COMPILER +#pragma hdrstop +#endif + +#endif + +#ifdef ACL_UNIX + +#include +#include +#include +#include + +/* Utility library. */ + +#include "stdlib/acl_msg.h" +#include "stdlib/acl_sys_patch.h" + +/* sane_socketpair - sanitize socketpair() error returns */ + +int acl_sane_socketpair(int domain, int type, int protocol, ACL_SOCKET result[2]) +{ + static int socketpair_ok_errors[] = { + EINTR, + 0, + }; + int count; + int err; + int ret; + + /* + * Solaris socketpair() can fail with EINTR. + */ + while ((ret = socketpair(domain, type, protocol, result)) < 0) { + for (count = 0; /* void */ ; count++) { + if ((err = socketpair_ok_errors[count]) == 0) + return (ret); + if (acl_last_error() == err) { + char tbuf[256]; + acl_msg_warn("socketpair: %s (trying again)", + acl_last_strerror(tbuf, sizeof(tbuf))); + sleep(1); + break; + } + } + } + return (ret); +} + +#elif defined(ACL_WINDOWS) + +int acl_sane_socketpair(int domain, int type, int protocol, ACL_SOCKET result[2]) +{ + ACL_SOCKET listener = acl_inet_listen("127.0.0.1:0", 1, ACL_BLOCKING); + char addr[64]; + + (void) domain; + + result[0] = ACL_SOCKET_INVALID; + result[1] = ACL_SOCKET_INVALID; + + if (listener == ACL_SOCKET_INVALID) { + acl_msg_error("%s(%d), %s: listen error %s", + __FILE__, __LINE__, __FUNCTION__, acl_last_serror()); + return -1; + } + + acl_tcp_set_nodelay(listener); + if (acl_getsockname(listener, addr, sizeof(addr)) < 0) { + acl_msg_error("%s(%d), %s: getoskname error %s", + __FILE__, __LINE__, __FUNCTION__, acl_last_serror()); + acl_socket_close(listener); + return -1; + } + + result[0] = acl_inet_connect(addr, ACL_BLOCKING, 0); + if (result[0] == ACL_SOCKET_INVALID) { + acl_msg_error("%s(%d), %s: connect %s error %s", + __FILE__, __LINE__, __FUNCTION__, addr, acl_last_serror()); + acl_socket_close(listener); + return -1; + } + + result[1] = acl_inet_accept(listener); + + acl_socket_close(listener); + + if (result[1] == ACL_SOCKET_INVALID) { + acl_msg_error("%s(%d), %s: accept error %s", + __FILE__, __LINE__, __FUNCTION__, acl_last_serror()); + acl_socket_close(result[0]); + result[0] = ACL_SOCKET_INVALID; + return -1; + } + + acl_tcp_set_nodelay(result[0]); + acl_tcp_set_nodelay(result[1]); + return 0; +} + +#endif /* ACL_WINDOWS */ diff --git a/lib_acl/src/stdlib/sys/unix/acl_sane_socketpair.c b/lib_acl/src/stdlib/sys/unix/acl_sane_socketpair.c deleted file mode 100644 index 8ae0748b6..000000000 --- a/lib_acl/src/stdlib/sys/unix/acl_sane_socketpair.c +++ /dev/null @@ -1,56 +0,0 @@ -/* System library. */ -#include "StdAfx.h" -#ifndef ACL_PREPARE_COMPILE - -#include "stdlib/acl_define.h" - -#ifdef ACL_BCB_COMPILER -#pragma hdrstop -#endif - -#endif - -#ifdef ACL_UNIX - -#include -#include -#include -#include - -/* Utility library. */ - -#include "stdlib/acl_msg.h" -#include "stdlib/unix/acl_sane_socketpair.h" - -/* sane_socketpair - sanitize socketpair() error returns */ - -int acl_sane_socketpair(int domain, int type, int protocol, int result[2]) -{ - static int socketpair_ok_errors[] = { - EINTR, - 0, - }; - int count; - int err; - int ret; - - /* - * Solaris socketpair() can fail with EINTR. - */ - while ((ret = socketpair(domain, type, protocol, result)) < 0) { - for (count = 0; /* void */ ; count++) { - if ((err = socketpair_ok_errors[count]) == 0) - return (ret); - if (acl_last_error() == err) { - char tbuf[256]; - acl_msg_warn("socketpair: %s (trying again)", - acl_last_strerror(tbuf, sizeof(tbuf))); - sleep(1); - break; - } - } - } - return (ret); -} -#endif /* ACL_UNIX*/ - diff --git a/lib_fiber/c/src/fiber.c b/lib_fiber/c/src/fiber.c index bb0893ef8..d1f3681da 100644 --- a/lib_fiber/c/src/fiber.c +++ b/lib_fiber/c/src/fiber.c @@ -345,8 +345,8 @@ static void fiber_init(void) __sys_errno = (errno_fn) dlsym(RTLD_NEXT, "__errno_location"); __sys_fcntl = (fcntl_fn) dlsym(RTLD_NEXT, "fcntl"); - fiber_io_hook(); - fiber_net_hook(); + fiber_hook_io(); + fiber_hook_net(); } void fiber_schedule(void) diff --git a/lib_fiber/c/src/fiber.h b/lib_fiber/c/src/fiber.h index c7106196f..320ba294c 100644 --- a/lib_fiber/c/src/fiber.h +++ b/lib_fiber/c/src/fiber.h @@ -103,15 +103,18 @@ void fiber_count_inc(void); void fiber_count_dec(void); /* in fiber_io.c */ -void fiber_io_hook(void); void fiber_io_check(void); +void fiber_io_close(int fd); void fiber_wait_read(int fd); void fiber_wait_write(int fd); void fiber_io_dec(void); void fiber_io_inc(void); EVENT *fiber_io_event(void); +/* in hook_io.c */ +void fiber_hook_io(void); + /* in fiber_net.c */ -void fiber_net_hook(void); +void fiber_hook_net(void); #endif diff --git a/lib_fiber/c/src/fiber_io.c b/lib_fiber/c/src/fiber_io.c index 39848f9f5..16bae46cf 100644 --- a/lib_fiber/c/src/fiber_io.c +++ b/lib_fiber/c/src/fiber_io.c @@ -1,39 +1,8 @@ #include "stdafx.h" -#include -#define __USE_GNU -#include -#include #include "fiber/lib_fiber.h" #include "event.h" #include "fiber.h" -typedef int (*close_fn)(int); -typedef ssize_t (*read_fn)(int, void *, size_t); -typedef ssize_t (*readv_fn)(int, const struct iovec *, int); -typedef ssize_t (*recv_fn)(int, void *, size_t, int); -typedef ssize_t (*recvfrom_fn)(int, void *, size_t, int, - struct sockaddr *, socklen_t *); -typedef ssize_t (*recvmsg_fn)(int, struct msghdr *, int); -typedef ssize_t (*write_fn)(int, const void *, size_t); -typedef ssize_t (*writev_fn)(int, const struct iovec *, int); -typedef ssize_t (*send_fn)(int, const void *, size_t, int); -typedef ssize_t (*sendto_fn)(int, const void *, size_t, int, - const struct sockaddr *, socklen_t); -typedef ssize_t (*sendmsg_fn)(int, const struct msghdr *, int); - -static close_fn __sys_close = NULL; -static read_fn __sys_read = NULL; -static readv_fn __sys_readv = NULL; -static recv_fn __sys_recv = NULL; -static recvfrom_fn __sys_recvfrom = NULL; -static recvmsg_fn __sys_recvmsg = NULL; - -static write_fn __sys_write = NULL; -static writev_fn __sys_writev = NULL; -static send_fn __sys_send = NULL; -static sendto_fn __sys_sendto = NULL; -static sendmsg_fn __sys_sendmsg = NULL; - typedef struct { EVENT *event; FIBER **io_fibers; @@ -53,29 +22,6 @@ static void fiber_io_loop(FIBER *fiber, void *ctx); #define STACK_SIZE 819200 static int __maxfd = 1024; -void fiber_io_hook(void) -{ - static int __called = 0; - - if (__called) - return; - - __called++; - - __sys_close = (close_fn) dlsym(RTLD_NEXT, "close"); - __sys_read = (read_fn) dlsym(RTLD_NEXT, "read"); - __sys_readv = (readv_fn) dlsym(RTLD_NEXT, "readv"); - __sys_recv = (recv_fn) dlsym(RTLD_NEXT, "recv"); - __sys_recvfrom = (recvfrom_fn) dlsym(RTLD_NEXT, "recvfrom"); - __sys_recvmsg = (recvmsg_fn) dlsym(RTLD_NEXT, "recvmsg"); - - __sys_write = (write_fn) dlsym(RTLD_NEXT, "write"); - __sys_writev = (writev_fn) dlsym(RTLD_NEXT, "writev"); - __sys_send = (send_fn) dlsym(RTLD_NEXT, "send"); - __sys_sendto = (sendto_fn) dlsym(RTLD_NEXT, "sendto"); - __sys_sendmsg = (sendmsg_fn) dlsym(RTLD_NEXT, "sendmsg"); -} - void fiber_io_stop(void) { fiber_io_check(); @@ -177,6 +123,12 @@ EVENT *fiber_io_event(void) return __thread_fiber->event; } +void fiber_io_close(int fd) +{ + if (__thread_fiber != NULL) + event_del(__thread_fiber->event, fd, EVENT_ERROR); +} + static void fiber_io_loop(FIBER *self acl_unused, void *ctx) { EVENT *ev = (EVENT *) ctx; @@ -381,336 +333,3 @@ void fiber_wait_write(int fd) fiber_switch(); } - -int close(int fd) -{ - int ret; - - if (fd < 0) { - acl_msg_error("%s: invalid fd: %d", __FUNCTION__, fd); - return -1; - } - - if (__thread_fiber != NULL) - event_del(__thread_fiber->event, fd, EVENT_ERROR); - - ret = __sys_close(fd); - if (ret == 0) - return ret; - - fiber_save_errno(); - return ret; -} - -#define READ_WAIT_FIRST - -#ifdef READ_WAIT_FIRST - -ssize_t read(int fd, void *buf, size_t count) -{ - ssize_t ret; - - if (fd < 0) { - acl_msg_error("%s: invalid fd: %d", __FUNCTION__, fd); - return -1; - } - - fiber_wait_read(fd); - - ret = __sys_read(fd, buf, count); - if (ret > 0) - return ret; - - fiber_save_errno(); - return ret; -} - -ssize_t readv(int fd, const struct iovec *iov, int iovcnt) -{ - ssize_t ret; - - if (fd < 0) { - acl_msg_error("%s: invalid fd: %d", __FUNCTION__, fd); - return -1; - } - - fiber_wait_read(fd); - ret = __sys_readv(fd, iov, iovcnt); - if (ret > 0) - return ret; - - fiber_save_errno(); - return ret; -} - -ssize_t recv(int sockfd, void *buf, size_t len, int flags) -{ - ssize_t ret; - - if (sockfd < 0) { - acl_msg_error("%s: invalid sockfd: %d", __FUNCTION__, sockfd); - return -1; - } - - fiber_wait_read(sockfd); - ret = __sys_recv(sockfd, buf, len, flags); - if (ret > 0) - return ret; - - fiber_save_errno(); - return ret; -} - -ssize_t recvfrom(int sockfd, void *buf, size_t len, int flags, - struct sockaddr *src_addr, socklen_t *addrlen) -{ - ssize_t ret; - - if (sockfd < 0) { - acl_msg_error("%s: invalid sockfd: %d", __FUNCTION__, sockfd); - return -1; - } - - fiber_wait_read(sockfd); - ret = __sys_recvfrom(sockfd, buf, len, flags, src_addr, addrlen); - if (ret > 0) - return ret; - - fiber_save_errno(); - return ret; -} - -ssize_t recvmsg(int sockfd, struct msghdr *msg, int flags) -{ - ssize_t ret; - - if (sockfd < 0) { - acl_msg_error("%s: invalid sockfd: %d", __FUNCTION__, sockfd); - return -1; - } - - fiber_wait_read(sockfd); - ret = __sys_recvmsg(sockfd, msg, flags); - if (ret > 0) - return ret; - - fiber_save_errno(); - return ret; -} - -#else - -ssize_t read(int fd, void *buf, size_t count) -{ - while (1) { - ssize_t n = __sys_read(fd, buf, count); - - if (n >= 0) - return n; - - fiber_save_errno(); - -#if EAGAIN == EWOULDBLOCK - if (errno != EAGAIN) -#else - if (errno != EAGAIN && errno != EWOULDBLOCK) -#endif - return -1; - fiber_wait_read(fd); - } -} - -ssize_t readv(int fd, const struct iovec *iov, int iovcnt) -{ - while (1) { - ssize_t n = __sys_readv(fd, iov, iovcnt); - - if (n >= 0) - return n; - - fiber_save_errno(); - -#if EAGAIN == EWOULDBLOCK - if (errno != EAGAIN) -#else - if (errno != EAGAIN && errno != EWOULDBLOCK) -#endif - return -1; - - fiber_wait_read(fd); - } -} - -ssize_t recv(int sockfd, void *buf, size_t len, int flags) -{ - while (1) { - ssize_t n = __sys_recv(sockfd, buf, len, flags); - - if (n >= 0) - return n; - - fiber_save_errno(); - -#if EAGAIN == EWOULDBLOCK - if (errno != EAGAIN) -#else - if (errno != EAGAIN && errno != EWOULDBLOCK) -#endif - return -1; - - fiber_wait_read(sockfd); - } -} - -ssize_t recvfrom(int sockfd, void *buf, size_t len, int flags, - struct sockaddr *src_addr, socklen_t *addrlen) -{ - while (1) { - ssize_t n = __sys_recvfrom(sockfd, buf, len, flags, - src_addr, addrlen); - - if (n >= 0) - return n; - - fiber_save_errno(); - -#if EAGAIN == EWOULDBLOCK - if (errno != EAGAIN) -#else - if (errno != EAGAIN && errno != EWOULDBLOCK) -#endif - return -1; - - fiber_wait_read(sockfd); - } -} - -ssize_t recvmsg(int sockfd, struct msghdr *msg, int flags) -{ - while (1) { - ssize_t n = __sys_recvmsg(sockfd, msg, flags); - - if (n >= 0) - return n; - - fiber_save_errno(); - -#if EAGAIN == EWOULDBLOCK - if (errno != EAGAIN) -#else - if (errno != EAGAIN && errno != EWOULDBLOCK) -#endif - return -1; - - fiber_wait_read(sockfd); - } -} - -#endif - -ssize_t write(int fd, const void *buf, size_t count) -{ - while (1) { - ssize_t n = __sys_write(fd, buf, count); - - if (n >= 0) - return n; - - fiber_save_errno(); - -#if EAGAIN == EWOULDBLOCK - if (errno != EAGAIN) -#else - if (errno != EAGAIN && errno != EWOULDBLOCK) -#endif - return -1; - - fiber_wait_write(fd); - } -} - -ssize_t writev(int fd, const struct iovec *iov, int iovcnt) -{ - while (1) { - ssize_t n = __sys_writev(fd, iov, iovcnt); - - if (n >= 0) - return n; - - fiber_save_errno(); - -#if EAGAIN == EWOULDBLOCK - if (errno != EAGAIN) -#else - if (errno != EAGAIN && errno != EWOULDBLOCK) -#endif - return -1; - - fiber_wait_write(fd); - } -} - -ssize_t send(int sockfd, const void *buf, size_t len, int flags) -{ - while (1) { - ssize_t n = __sys_send(sockfd, buf, len, flags); - - if (n >= 0) - return n; - - fiber_save_errno(); - -#if EAGAIN == EWOULDBLOCK - if (errno != EAGAIN) -#else - if (errno != EAGAIN && errno != EWOULDBLOCK) -#endif - return -1; - - fiber_wait_write(sockfd); - } -} - -ssize_t sendto(int sockfd, const void *buf, size_t len, int flags, - const struct sockaddr *dest_addr, socklen_t addrlen) -{ - while (1) { - ssize_t n = __sys_sendto(sockfd, buf, len, flags, - dest_addr, addrlen); - - if (n >= 0) - return n; - - fiber_save_errno(); - -#if EAGAIN == EWOULDBLOCK - if (errno != EAGAIN) -#else - if (errno != EAGAIN && errno != EWOULDBLOCK) -#endif - return -1; - - fiber_wait_write(sockfd); - } -} - -ssize_t sendmsg(int sockfd, const struct msghdr *msg, int flags) -{ - while (1) { - ssize_t n = __sys_sendmsg(sockfd, msg, flags); - - if (n >= 0) - return n; - - fiber_save_errno(); - -#if EAGAIN == EWOULDBLOCK - if (errno != EAGAIN) -#else - if (errno != EAGAIN && errno != EWOULDBLOCK) -#endif - return -1; - - fiber_wait_write(sockfd); - } -} diff --git a/lib_fiber/c/src/hook_io.c b/lib_fiber/c/src/hook_io.c new file mode 100644 index 000000000..7d9a40d35 --- /dev/null +++ b/lib_fiber/c/src/hook_io.c @@ -0,0 +1,442 @@ +#include "stdafx.h" + +#ifdef HAS_PIPE2 +#define _GNU_SOURCE +#endif + +#include +#include + +#define __USE_GNU +#include +#include +#include "fiber.h" + +typedef int (*pipe_fn)(int pipefd[2]); +#ifdef HAS_PIPE2 +typedef int (*pipe2_fn)(int pipefd[2], int flags); +#endif +typedef FILE *(*popen_fn)(const char *, const char *); +typedef int (*pclose_fn)(FILE *); +typedef int (*close_fn)(int); +typedef ssize_t (*read_fn)(int, void *, size_t); +typedef ssize_t (*readv_fn)(int, const struct iovec *, int); +typedef ssize_t (*recv_fn)(int, void *, size_t, int); +typedef ssize_t (*recvfrom_fn)(int, void *, size_t, int, + struct sockaddr *, socklen_t *); +typedef ssize_t (*recvmsg_fn)(int, struct msghdr *, int); +typedef ssize_t (*write_fn)(int, const void *, size_t); +typedef ssize_t (*writev_fn)(int, const struct iovec *, int); +typedef ssize_t (*send_fn)(int, const void *, size_t, int); +typedef ssize_t (*sendto_fn)(int, const void *, size_t, int, + const struct sockaddr *, socklen_t); +typedef ssize_t (*sendmsg_fn)(int, const struct msghdr *, int); + +static pipe_fn __sys_pipe = NULL; +#ifdef HAS_PIPE2 +static pipe2_fn __sys_pipe2 = NULL; +#endif +static popen_fn __sys_popen = NULL; +static pclose_fn __sys_pclose = NULL; +static close_fn __sys_close = NULL; +static read_fn __sys_read = NULL; +static readv_fn __sys_readv = NULL; +static recv_fn __sys_recv = NULL; +static recvfrom_fn __sys_recvfrom = NULL; +static recvmsg_fn __sys_recvmsg = NULL; + +static write_fn __sys_write = NULL; +static writev_fn __sys_writev = NULL; +static send_fn __sys_send = NULL; +static sendto_fn __sys_sendto = NULL; +static sendmsg_fn __sys_sendmsg = NULL; + +void fiber_hook_io(void) +{ + static int __called = 0; + + if (__called) + return; + + __called++; + + __sys_pipe = (pipe_fn) dlsym(RTLD_NEXT, "pipe"); +#ifdef HAS_PIPE2 + __sys_pipe2 = (pipe2_fn) dlsym(RTLD_NEXT, "pipe2"); +#endif + __sys_popen = (popen_fn) dlsym(RTLD_NEXT, "popen"); + __sys_pclose = (pclose_fn) dlsym(RTLD_NEXT, "pclose"); + __sys_close = (close_fn) dlsym(RTLD_NEXT, "close"); + __sys_read = (read_fn) dlsym(RTLD_NEXT, "read"); + __sys_readv = (readv_fn) dlsym(RTLD_NEXT, "readv"); + __sys_recv = (recv_fn) dlsym(RTLD_NEXT, "recv"); + __sys_recvfrom = (recvfrom_fn) dlsym(RTLD_NEXT, "recvfrom"); + __sys_recvmsg = (recvmsg_fn) dlsym(RTLD_NEXT, "recvmsg"); + + __sys_write = (write_fn) dlsym(RTLD_NEXT, "write"); + __sys_writev = (writev_fn) dlsym(RTLD_NEXT, "writev"); + __sys_send = (send_fn) dlsym(RTLD_NEXT, "send"); + __sys_sendto = (sendto_fn) dlsym(RTLD_NEXT, "sendto"); + __sys_sendmsg = (sendmsg_fn) dlsym(RTLD_NEXT, "sendmsg"); +} + +int pipe(int pipefd[2]) +{ + int ret = __sys_pipe(pipefd); + + if (ret < 0) + fiber_save_errno(); + return ret; +} + +#ifdef HAS_PIPE2 +int pipe2(int pipefd[2], int flags) +{ + int ret = __sys_pipe2(pipefd, flags); + + if (ret < 0) + fiber_save_errno(); + return ret; +} +#endif + +FILE *popen(const char *command, const char *type) +{ + FILE *fp = __sys_popen(command, type); + + if (fp == NULL) + fiber_save_errno(); + return fp; +} + +int close(int fd) +{ + int ret; + + if (fd < 0) { + acl_msg_error("%s: invalid fd: %d", __FUNCTION__, fd); + return -1; + } + + fiber_io_close(fd); + + ret = __sys_close(fd); + if (ret == 0) + return ret; + + fiber_save_errno(); + return ret; +} + +#define READ_WAIT_FIRST + +#ifdef READ_WAIT_FIRST + +ssize_t read(int fd, void *buf, size_t count) +{ + ssize_t ret; + + if (fd < 0) { + acl_msg_error("%s: invalid fd: %d", __FUNCTION__, fd); + return -1; + } + + fiber_wait_read(fd); + + ret = __sys_read(fd, buf, count); + if (ret > 0) + return ret; + + fiber_save_errno(); + return ret; +} + +ssize_t readv(int fd, const struct iovec *iov, int iovcnt) +{ + ssize_t ret; + + if (fd < 0) { + acl_msg_error("%s: invalid fd: %d", __FUNCTION__, fd); + return -1; + } + + fiber_wait_read(fd); + ret = __sys_readv(fd, iov, iovcnt); + if (ret > 0) + return ret; + + fiber_save_errno(); + return ret; +} + +ssize_t recv(int sockfd, void *buf, size_t len, int flags) +{ + ssize_t ret; + + if (sockfd < 0) { + acl_msg_error("%s: invalid sockfd: %d", __FUNCTION__, sockfd); + return -1; + } + + fiber_wait_read(sockfd); + ret = __sys_recv(sockfd, buf, len, flags); + if (ret > 0) + return ret; + + fiber_save_errno(); + return ret; +} + +ssize_t recvfrom(int sockfd, void *buf, size_t len, int flags, + struct sockaddr *src_addr, socklen_t *addrlen) +{ + ssize_t ret; + + if (sockfd < 0) { + acl_msg_error("%s: invalid sockfd: %d", __FUNCTION__, sockfd); + return -1; + } + + fiber_wait_read(sockfd); + ret = __sys_recvfrom(sockfd, buf, len, flags, src_addr, addrlen); + if (ret > 0) + return ret; + + fiber_save_errno(); + return ret; +} + +ssize_t recvmsg(int sockfd, struct msghdr *msg, int flags) +{ + ssize_t ret; + + if (sockfd < 0) { + acl_msg_error("%s: invalid sockfd: %d", __FUNCTION__, sockfd); + return -1; + } + + fiber_wait_read(sockfd); + ret = __sys_recvmsg(sockfd, msg, flags); + if (ret > 0) + return ret; + + fiber_save_errno(); + return ret; +} + +#else + +ssize_t read(int fd, void *buf, size_t count) +{ + while (1) { + ssize_t n = __sys_read(fd, buf, count); + + if (n >= 0) + return n; + + fiber_save_errno(); + +#if EAGAIN == EWOULDBLOCK + if (errno != EAGAIN) +#else + if (errno != EAGAIN && errno != EWOULDBLOCK) +#endif + return -1; + fiber_wait_read(fd); + } +} + +ssize_t readv(int fd, const struct iovec *iov, int iovcnt) +{ + while (1) { + ssize_t n = __sys_readv(fd, iov, iovcnt); + + if (n >= 0) + return n; + + fiber_save_errno(); + +#if EAGAIN == EWOULDBLOCK + if (errno != EAGAIN) +#else + if (errno != EAGAIN && errno != EWOULDBLOCK) +#endif + return -1; + + fiber_wait_read(fd); + } +} + +ssize_t recv(int sockfd, void *buf, size_t len, int flags) +{ + while (1) { + ssize_t n = __sys_recv(sockfd, buf, len, flags); + + if (n >= 0) + return n; + + fiber_save_errno(); + +#if EAGAIN == EWOULDBLOCK + if (errno != EAGAIN) +#else + if (errno != EAGAIN && errno != EWOULDBLOCK) +#endif + return -1; + + fiber_wait_read(sockfd); + } +} + +ssize_t recvfrom(int sockfd, void *buf, size_t len, int flags, + struct sockaddr *src_addr, socklen_t *addrlen) +{ + while (1) { + ssize_t n = __sys_recvfrom(sockfd, buf, len, flags, + src_addr, addrlen); + + if (n >= 0) + return n; + + fiber_save_errno(); + +#if EAGAIN == EWOULDBLOCK + if (errno != EAGAIN) +#else + if (errno != EAGAIN && errno != EWOULDBLOCK) +#endif + return -1; + + fiber_wait_read(sockfd); + } +} + +ssize_t recvmsg(int sockfd, struct msghdr *msg, int flags) +{ + while (1) { + ssize_t n = __sys_recvmsg(sockfd, msg, flags); + + if (n >= 0) + return n; + + fiber_save_errno(); + +#if EAGAIN == EWOULDBLOCK + if (errno != EAGAIN) +#else + if (errno != EAGAIN && errno != EWOULDBLOCK) +#endif + return -1; + + fiber_wait_read(sockfd); + } +} + +#endif + +ssize_t write(int fd, const void *buf, size_t count) +{ + while (1) { + ssize_t n = __sys_write(fd, buf, count); + + if (n >= 0) + return n; + + fiber_save_errno(); + +#if EAGAIN == EWOULDBLOCK + if (errno != EAGAIN) +#else + if (errno != EAGAIN && errno != EWOULDBLOCK) +#endif + return -1; + + fiber_wait_write(fd); + } +} + +ssize_t writev(int fd, const struct iovec *iov, int iovcnt) +{ + while (1) { + ssize_t n = __sys_writev(fd, iov, iovcnt); + + if (n >= 0) + return n; + + fiber_save_errno(); + +#if EAGAIN == EWOULDBLOCK + if (errno != EAGAIN) +#else + if (errno != EAGAIN && errno != EWOULDBLOCK) +#endif + return -1; + + fiber_wait_write(fd); + } +} + +ssize_t send(int sockfd, const void *buf, size_t len, int flags) +{ + while (1) { + ssize_t n = __sys_send(sockfd, buf, len, flags); + + if (n >= 0) + return n; + + fiber_save_errno(); + +#if EAGAIN == EWOULDBLOCK + if (errno != EAGAIN) +#else + if (errno != EAGAIN && errno != EWOULDBLOCK) +#endif + return -1; + + fiber_wait_write(sockfd); + } +} + +ssize_t sendto(int sockfd, const void *buf, size_t len, int flags, + const struct sockaddr *dest_addr, socklen_t addrlen) +{ + while (1) { + ssize_t n = __sys_sendto(sockfd, buf, len, flags, + dest_addr, addrlen); + + if (n >= 0) + return n; + + fiber_save_errno(); + +#if EAGAIN == EWOULDBLOCK + if (errno != EAGAIN) +#else + if (errno != EAGAIN && errno != EWOULDBLOCK) +#endif + return -1; + + fiber_wait_write(sockfd); + } +} + +ssize_t sendmsg(int sockfd, const struct msghdr *msg, int flags) +{ + while (1) { + ssize_t n = __sys_sendmsg(sockfd, msg, flags); + + if (n >= 0) + return n; + + fiber_save_errno(); + +#if EAGAIN == EWOULDBLOCK + if (errno != EAGAIN) +#else + if (errno != EAGAIN && errno != EWOULDBLOCK) +#endif + return -1; + + fiber_wait_write(sockfd); + } +} diff --git a/lib_fiber/c/src/fiber_net.c b/lib_fiber/c/src/hook_net.c similarity index 87% rename from lib_fiber/c/src/fiber_net.c rename to lib_fiber/c/src/hook_net.c index c0f986504..149ddfbae 100644 --- a/lib_fiber/c/src/fiber_net.c +++ b/lib_fiber/c/src/hook_net.c @@ -10,18 +10,20 @@ #include "fiber.h" typedef int (*socket_fn)(int, int, int); +typedef int (*socketpair_fn)(int, int, int, int sv[2]); typedef int (*bind_fn)(int, const struct sockaddr *, socklen_t); typedef int (*listen_fn)(int, int); typedef int (*accept_fn)(int, struct sockaddr *, socklen_t *); typedef int (*connect_fn)(int, const struct sockaddr *, socklen_t); -static socket_fn __sys_socket = NULL; -static bind_fn __sys_bind = NULL; -static listen_fn __sys_listen = NULL; -static accept_fn __sys_accept = NULL; -static connect_fn __sys_connect = NULL; +static socket_fn __sys_socket = NULL; +static socketpair_fn __sys_socketpair = NULL; +static bind_fn __sys_bind = NULL; +static listen_fn __sys_listen = NULL; +static accept_fn __sys_accept = NULL; +static connect_fn __sys_connect = NULL; -void fiber_net_hook(void) +void fiber_hook_net(void) { static int __called = 0; @@ -30,11 +32,12 @@ void fiber_net_hook(void) __called++; - __sys_socket = (socket_fn) dlsym(RTLD_NEXT, "socket"); - __sys_bind = (bind_fn) dlsym(RTLD_NEXT, "bind"); - __sys_listen = (listen_fn) dlsym(RTLD_NEXT, "listen"); - __sys_accept = (accept_fn) dlsym(RTLD_NEXT, "accept"); - __sys_connect = (connect_fn) dlsym(RTLD_NEXT, "connect"); + __sys_socket = (socket_fn) dlsym(RTLD_NEXT, "socket"); + __sys_socketpair = (socketpair_fn) dlsym(RTLD_NEXT, "socketpair"); + __sys_bind = (bind_fn) dlsym(RTLD_NEXT, "bind"); + __sys_listen = (listen_fn) dlsym(RTLD_NEXT, "listen"); + __sys_accept = (accept_fn) dlsym(RTLD_NEXT, "accept"); + __sys_connect = (connect_fn) dlsym(RTLD_NEXT, "connect"); } int socket(int domain, int type, int protocol) @@ -48,6 +51,15 @@ int socket(int domain, int type, int protocol) return sockfd; } +int socketpair(int domain, int type, int protocol, int sv[2]) +{ + int ret = __sys_socketpair(domain, type, protocol, sv); + + if (ret < 0) + fiber_save_errno(); + return ret; +} + int bind(int sockfd, const struct sockaddr *addr, socklen_t addrlen) { if (__sys_bind(sockfd, addr, addrlen) == 0) diff --git a/lib_fiber/samples/thread_mbox/Makefile b/lib_fiber/samples/thread_mbox/Makefile new file mode 100644 index 000000000..c63289264 --- /dev/null +++ b/lib_fiber/samples/thread_mbox/Makefile @@ -0,0 +1,2 @@ +include ../Makefile.in +PROG = thread_mbox diff --git a/lib_fiber/samples/thread_mbox/main.c b/lib_fiber/samples/thread_mbox/main.c new file mode 100644 index 000000000..c37268532 --- /dev/null +++ b/lib_fiber/samples/thread_mbox/main.c @@ -0,0 +1,106 @@ +#include "lib_acl.h" +#include +#include +#include +#include +#include "fiber/lib_fiber.h" + +static int __oper_count = 2; +static int __fibers_max = 2; +static int __fibers_cur = 2; + +static void *thread_main(void *ctx) +{ + ACL_MBOX *mbox = (ACL_MBOX *) ctx; + int i; + + for (i = 0; i < __oper_count; i++) { + char *ptr = acl_mystrdup("hello world!"); + if (acl_mbox_send(mbox, ptr) < 0) { + printf("send error!\r\n"); + break; + } + } + + return NULL; +} + +static void free_msg(void *msg) +{ + printf("---free one ---\r\n"); + acl_myfree(msg); +} + +static void fiber_main(FIBER *fiber acl_unused, void *ctx) +{ + ACL_MBOX *mbox = (ACL_MBOX *) ctx; + acl_pthread_attr_t attr; + acl_pthread_t tid; + int i; + + acl_pthread_attr_init(&attr); + acl_pthread_attr_setdetachstate(&attr, ACL_PTHREAD_CREATE_DETACHED); + acl_pthread_create(&tid, &attr, thread_main, mbox); + + for (i = 0; i < __oper_count; i++) { + char *ptr = (char *) acl_mbox_read(mbox, 0, NULL); + if (ptr == NULL) + break; + if (i < 10) + printf("read in: %s\r\n", ptr); + acl_myfree(ptr); + } + + printf(">>>nsend: %d / %d, nread: %d / %d\r\n", + (int) acl_mbox_nsend(mbox), __oper_count, + (int) acl_mbox_nread(mbox), __oper_count); + + acl_mbox_free(mbox, free_msg); + + if (--__fibers_cur == 0) { + printf("---- All over now! ----\r\n"); + printf("Enter any key to exit ..."); + fflush(stdout); + getchar(); + + fiber_io_stop(); + } +} + +static void usage(const char *procname) +{ + printf("usage: %s -h [help] -c nfibers\r\n", procname); +} + +int main(int argc, char *argv[]) +{ + int ch, i; + + while ((ch = getopt(argc, argv, "hc:n:")) > 0) { + switch (ch) { + case 'h': + usage(argv[0]); + return 0; + case 'c': + __fibers_max = atoi(optarg); + break; + case 'n': + __oper_count = atoi(optarg); + break; + default: + break; + } + } + + __fibers_cur = __fibers_max; + printf("fibers: %d\r\n", __fibers_max); + + for (i = 0; i < __fibers_max; i++) { + ACL_MBOX *mbox = acl_mbox_create(); + fiber_create(fiber_main, mbox, 64000); + } + + fiber_schedule(); + + return 0; +}