add acl_mbox.c for thread communication

This commit is contained in:
ubuntu14 2016-06-26 12:17:17 +08:00
parent 2d14abfec4
commit 2e5724cbeb
25 changed files with 928 additions and 500 deletions

View File

@ -1,6 +1,10 @@
修改历史列表:
------------------------------------------------------------------------
552) 2016.6.26
552.1) feature: 增加 acl_mbox.c 用于线程之间的单向通信,内部使用了无锁队列方式
效率更高 -- by niukey@qq.com
551) 2016.6.21
551.1) safety: acl_vstream.c->acl_vstream_fdopen 中的 rw_timeout > 0 时,应该
再通过 acl_getsocktype 判断一下是否是需要设置非阻塞模式

View File

@ -0,0 +1,23 @@
#ifndef ACL_MBOX_INCLUDE_H
#define ACL_MBOX_INCLUDE_H
#ifdef __cplusplus
extern "C" {
#endif
#include "acl_define.h"
typedef struct ACL_MBOX ACL_MBOX;
ACL_API ACL_MBOX *acl_mbox_create(void);
ACL_API void acl_mbox_free(ACL_MBOX *mbox, void (*free_fn)(void*));
ACL_API int acl_mbox_send(ACL_MBOX *mbox, void *msg);
ACL_API void *acl_mbox_read(ACL_MBOX *mbox, int timeout, int *success);
ACL_API size_t acl_mbox_nsend(ACL_MBOX *mbox);
ACL_API size_t acl_mbox_nread(ACL_MBOX *mbox);
#ifdef __cplusplus
}
#endif
#endif

View File

@ -5,14 +5,11 @@
extern "C" {
#endif
#include "../acl_define.h"
#ifdef ACL_UNIX
#include "acl_define.h"
/* External interface. */
int acl_sane_socketpair(int domain, int type, int protocol, int result[2]);
#endif /* ACL_UNIX */
int acl_sane_socketpair(int domain, int type, int protocol, ACL_SOCKET result[2]);
#ifdef __cplusplus
}

View File

@ -80,6 +80,8 @@ extern "C" {
#include "acl_yqueue.h"
#include "acl_ypipe.h"
#include "acl_mbox.h"
#ifdef __cplusplus
}
#endif

View File

@ -193,6 +193,17 @@ ACL_API acl_int64 acl_file_size(const char *filename);
*/
ACL_API acl_int64 acl_file_fsize(ACL_FILE_HANDLE fh, ACL_VSTREAM *fp, void *arg);
/**
* SOCKET
* @param domain {int}
* @param type {int}
* @param protocol {int}
* @param result {ACL_SOCKET [2]}
* @return {int} 0 -1
*/
ACL_API int acl_sane_socketpair(int domain, int type, int protocol,
ACL_SOCKET result[2]);
# ifdef __cplusplus
}
# endif

View File

@ -9,7 +9,6 @@ extern "C" {
#include "acl_timed_wait.h"
#include "acl_set_ugid.h"
#include "acl_set_eugid.h"
#include "acl_sane_socketpair.h"
#include "acl_mychown.h"
#include "acl_chroot_uid.h"
#include "acl_safe_open.h"

View File

@ -287,6 +287,9 @@ copy $(TargetName).pdb ..\dist\lib\win32\$(TargetName).pdb /Y
<File
RelativePath=".\src\stdlib\acl_getopt.c">
</File>
<File
RelativePath=".\src\stdlib\acl_mbox.c">
</File>
<File
RelativePath=".\src\stdlib\acl_meter_time.c">
</File>
@ -1462,6 +1465,9 @@ copy $(TargetName).pdb ..\dist\lib\win32\$(TargetName).pdb /Y
<File
RelativePath=".\include\stdlib\acl_malloc.h">
</File>
<File
RelativePath=".\include\stdlib\acl_mbox.h">
</File>
<File
RelativePath=".\include\stdlib\acl_mem_hook.h">
</File>

View File

@ -400,6 +400,10 @@
RelativePath=".\src\stdlib\acl_getopt.c"
>
</File>
<File
RelativePath=".\src\stdlib\acl_mbox.c"
>
</File>
<File
RelativePath=".\src\stdlib\acl_meter_time.c"
>
@ -663,6 +667,10 @@
RelativePath=".\src\stdlib\sys\acl_safe_getenv.c"
>
</File>
<File
RelativePath=".\src\stdlib\sys\acl_sane_socketpair.c"
>
</File>
<File
RelativePath=".\src\stdlib\sys\acl_sys_file.c"
>
@ -710,10 +718,6 @@
RelativePath=".\src\stdlib\sys\unix\acl_safe_open.c"
>
</File>
<File
RelativePath=".\src\stdlib\sys\unix\acl_sane_socketpair.c"
>
</File>
<File
RelativePath=".\src\stdlib\sys\unix\acl_set_eugid.c"
>
@ -1917,6 +1921,10 @@
RelativePath=".\include\stdlib\acl_malloc.h"
>
</File>
<File
RelativePath=".\include\stdlib\acl_mbox.h"
>
</File>
<File
RelativePath=".\include\stdlib\acl_mem_hook.h"
>
@ -2076,10 +2084,6 @@
RelativePath=".\include\stdlib\unix\acl_safe_open.h"
>
</File>
<File
RelativePath=".\include\stdlib\unix\acl_sane_socketpair.h"
>
</File>
<File
RelativePath=".\include\stdlib\unix\acl_set_eugid.h"
>

View File

@ -224,6 +224,7 @@ copy $(TargetName).pdb ..\dist\lib\win32\$(TargetName).pdb /Y
<ClCompile Include=".\src\stdlib\acl_debug.c" />
<ClCompile Include=".\src\stdlib\acl_file.c" />
<ClCompile Include=".\src\stdlib\acl_getopt.c" />
<ClCompile Include=".\src\stdlib\acl_mbox.c" />
<ClCompile Include=".\src\stdlib\acl_meter_time.c" />
<ClCompile Include=".\src\stdlib\acl_msg.c" />
<ClCompile Include=".\src\stdlib\acl_myflock.c" />
@ -281,6 +282,7 @@ copy $(TargetName).pdb ..\dist\lib\win32\$(TargetName).pdb /Y
<ClCompile Include=".\src\stdlib\sys\acl_exec_command.c" />
<ClCompile Include=".\src\stdlib\sys\acl_process.c" />
<ClCompile Include=".\src\stdlib\sys\acl_safe_getenv.c" />
<ClCompile Include=".\src\stdlib\sys\acl_sane_socketpair.c" />
<ClCompile Include=".\src\stdlib\sys\acl_sys_file.c" />
<ClCompile Include=".\src\stdlib\sys\acl_sys_socket.c" />
<ClCompile Include=".\src\stdlib\sys\acl_unsafe.c" />
@ -292,7 +294,6 @@ copy $(TargetName).pdb ..\dist\lib\win32\$(TargetName).pdb /Y
<ClCompile Include=".\src\stdlib\sys\unix\acl_mychown.c" />
<ClCompile Include=".\src\stdlib\sys\unix\acl_open_lock.c" />
<ClCompile Include=".\src\stdlib\sys\unix\acl_safe_open.c" />
<ClCompile Include=".\src\stdlib\sys\unix\acl_sane_socketpair.c" />
<ClCompile Include=".\src\stdlib\sys\unix\acl_set_eugid.c" />
<ClCompile Include=".\src\stdlib\sys\unix\acl_set_ugid.c" />
<ClCompile Include=".\src\stdlib\sys\unix\acl_timed_wait.c" />
@ -569,6 +570,7 @@ copy $(TargetName).pdb ..\dist\lib\win32\$(TargetName).pdb /Y
<ClInclude Include=".\include\stdlib\acl_loadcfg.h" />
<ClInclude Include=".\include\stdlib\acl_make_dirs.h" />
<ClInclude Include=".\include\stdlib\acl_malloc.h" />
<ClInclude Include=".\include\stdlib\acl_mbox.h" />
<ClInclude Include=".\include\stdlib\acl_mem_hook.h" />
<ClInclude Include=".\include\stdlib\acl_mem_slice.h" />
<ClInclude Include=".\include\stdlib\acl_meter_time.h" />
@ -606,7 +608,6 @@ copy $(TargetName).pdb ..\dist\lib\win32\$(TargetName).pdb /Y
<ClInclude Include=".\include\stdlib\unix\acl_mychown.h" />
<ClInclude Include=".\include\stdlib\unix\acl_open_lock.h" />
<ClInclude Include=".\include\stdlib\unix\acl_safe_open.h" />
<ClInclude Include=".\include\stdlib\unix\acl_sane_socketpair.h" />
<ClInclude Include=".\include\stdlib\unix\acl_set_eugid.h" />
<ClInclude Include=".\include\stdlib\unix\acl_set_ugid.h" />
<ClInclude Include=".\include\stdlib\unix\acl_timed_wait.h" />
@ -682,4 +683,4 @@ copy $(TargetName).pdb ..\dist\lib\win32\$(TargetName).pdb /Y
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
<ImportGroup Label="ExtensionTargets">
</ImportGroup>
</Project>
</Project>

View File

@ -191,6 +191,9 @@
<ClCompile Include=".\src\stdlib\acl_getopt.c">
<Filter>Source Files\stdlib</Filter>
</ClCompile>
<ClCompile Include=".\src\stdlib\acl_mbox.c">
<Filter>Source Files\stdlib</Filter>
</ClCompile>
<ClCompile Include=".\src\stdlib\acl_meter_time.c">
<Filter>Source Files\stdlib</Filter>
</ClCompile>
@ -362,6 +365,9 @@
<ClCompile Include=".\src\stdlib\sys\acl_safe_getenv.c">
<Filter>Source Files\stdlib\sys</Filter>
</ClCompile>
<ClCompile Include=".\src\stdlib\sys\acl_sane_socketpair.c">
<Filter>Source Files\stdlib\sys</Filter>
</ClCompile>
<ClCompile Include=".\src\stdlib\sys\acl_sys_file.c">
<Filter>Source Files\stdlib\sys</Filter>
</ClCompile>
@ -395,9 +401,6 @@
<ClCompile Include=".\src\stdlib\sys\unix\acl_safe_open.c">
<Filter>Source Files\stdlib\sys\unix</Filter>
</ClCompile>
<ClCompile Include=".\src\stdlib\sys\unix\acl_sane_socketpair.c">
<Filter>Source Files\stdlib\sys\unix</Filter>
</ClCompile>
<ClCompile Include=".\src\stdlib\sys\unix\acl_set_eugid.c">
<Filter>Source Files\stdlib\sys\unix</Filter>
</ClCompile>
@ -1209,6 +1212,9 @@
<ClInclude Include=".\include\stdlib\acl_malloc.h">
<Filter>Header Files\stdlib</Filter>
</ClInclude>
<ClInclude Include=".\include\stdlib\acl_mbox.h">
<Filter>Header Files\stdlib</Filter>
</ClInclude>
<ClInclude Include=".\include\stdlib\acl_mem_hook.h">
<Filter>Header Files\stdlib</Filter>
</ClInclude>
@ -1320,9 +1326,6 @@
<ClInclude Include=".\include\stdlib\unix\acl_safe_open.h">
<Filter>Header Files\stdlib\unix</Filter>
</ClInclude>
<ClInclude Include=".\include\stdlib\unix\acl_sane_socketpair.h">
<Filter>Header Files\stdlib\unix</Filter>
</ClInclude>
<ClInclude Include=".\include\stdlib\unix\acl_set_eugid.h">
<Filter>Header Files\stdlib\unix</Filter>
</ClInclude>
@ -1561,4 +1564,4 @@
</None>
<None Include="Readme.txt" />
</ItemGroup>
</Project>
</Project>

View File

@ -431,6 +431,7 @@ copy $(TargetName).pdb ..\dist\lib\win64\$(TargetName).pdb /Y
<ClCompile Include=".\src\stdlib\acl_debug.c" />
<ClCompile Include=".\src\stdlib\acl_file.c" />
<ClCompile Include=".\src\stdlib\acl_getopt.c" />
<ClCompile Include=".\src\stdlib\acl_mbox.c" />
<ClCompile Include=".\src\stdlib\acl_meter_time.c" />
<ClCompile Include=".\src\stdlib\acl_msg.c" />
<ClCompile Include=".\src\stdlib\acl_myflock.c" />
@ -499,7 +500,6 @@ copy $(TargetName).pdb ..\dist\lib\win64\$(TargetName).pdb /Y
<ClCompile Include=".\src\stdlib\sys\unix\acl_mychown.c" />
<ClCompile Include=".\src\stdlib\sys\unix\acl_open_lock.c" />
<ClCompile Include=".\src\stdlib\sys\unix\acl_safe_open.c" />
<ClCompile Include=".\src\stdlib\sys\unix\acl_sane_socketpair.c" />
<ClCompile Include=".\src\stdlib\sys\unix\acl_set_eugid.c" />
<ClCompile Include=".\src\stdlib\sys\unix\acl_set_ugid.c" />
<ClCompile Include=".\src\stdlib\sys\unix\acl_timed_wait.c" />
@ -665,6 +665,7 @@ copy $(TargetName).pdb ..\dist\lib\win64\$(TargetName).pdb /Y
<ClCompile Include="src\stdlib\acl_atomic.c" />
<ClCompile Include="src\stdlib\common\acl_ypipe.c" />
<ClCompile Include="src\stdlib\common\acl_yqueue.c" />
<ClCompile Include="src\stdlib\sys\acl_sane_socketpair.c" />
<ClCompile Include="src\stdlib\sys\unix\acl_trace.c" />
<ClCompile Include="src\xml\acl_xml2.c" />
<ClCompile Include="src\xml\acl_xml2_parse.c" />
@ -734,6 +735,7 @@ copy $(TargetName).pdb ..\dist\lib\win64\$(TargetName).pdb /Y
<ClInclude Include=".\include\lib_acl.h" />
<ClInclude Include="include\code\acl_xmlcode.h" />
<ClInclude Include="include\stdlib\acl_atomic.h" />
<ClInclude Include="include\stdlib\acl_sane_socketpair.h" />
<ClInclude Include="include\stdlib\acl_ypipe.h" />
<ClInclude Include="include\stdlib\acl_yqueue.h" />
<ClInclude Include="include\stdlib\unix\acl_trace.h" />
@ -777,6 +779,7 @@ copy $(TargetName).pdb ..\dist\lib\win64\$(TargetName).pdb /Y
<ClInclude Include=".\include\stdlib\acl_loadcfg.h" />
<ClInclude Include=".\include\stdlib\acl_make_dirs.h" />
<ClInclude Include=".\include\stdlib\acl_malloc.h" />
<ClInclude Include=".\include\stdlib\acl_mbox.h" />
<ClInclude Include=".\include\stdlib\acl_mem_hook.h" />
<ClInclude Include=".\include\stdlib\acl_mem_slice.h" />
<ClInclude Include=".\include\stdlib\acl_meter_time.h" />
@ -814,7 +817,6 @@ copy $(TargetName).pdb ..\dist\lib\win64\$(TargetName).pdb /Y
<ClInclude Include=".\include\stdlib\unix\acl_mychown.h" />
<ClInclude Include=".\include\stdlib\unix\acl_open_lock.h" />
<ClInclude Include=".\include\stdlib\unix\acl_safe_open.h" />
<ClInclude Include=".\include\stdlib\unix\acl_sane_socketpair.h" />
<ClInclude Include=".\include\stdlib\unix\acl_set_eugid.h" />
<ClInclude Include=".\include\stdlib\unix\acl_set_ugid.h" />
<ClInclude Include=".\include\stdlib\unix\acl_timed_wait.h" />

View File

@ -191,6 +191,9 @@
<ClCompile Include=".\src\stdlib\acl_getopt.c">
<Filter>Source Files\stdlib</Filter>
</ClCompile>
<ClCompile Include=".\src\stdlib\acl_mbox.c">
<Filter>Source Files\stdlib</Filter>
</ClCompile>
<ClCompile Include=".\src\stdlib\acl_meter_time.c">
<Filter>Source Files\stdlib</Filter>
</ClCompile>
@ -395,9 +398,6 @@
<ClCompile Include=".\src\stdlib\sys\unix\acl_safe_open.c">
<Filter>Source Files\stdlib\sys\unix</Filter>
</ClCompile>
<ClCompile Include=".\src\stdlib\sys\unix\acl_sane_socketpair.c">
<Filter>Source Files\stdlib\sys\unix</Filter>
</ClCompile>
<ClCompile Include=".\src\stdlib\sys\unix\acl_set_eugid.c">
<Filter>Source Files\stdlib\sys\unix</Filter>
</ClCompile>
@ -935,6 +935,9 @@
<ClCompile Include="src\stdlib\acl_atomic.c">
<Filter>Source Files\stdlib</Filter>
</ClCompile>
<ClCompile Include="src\stdlib\sys\acl_sane_socketpair.c">
<Filter>Source Files\stdlib\sys</Filter>
</ClCompile>
</ItemGroup>
<ItemGroup>
<ResourceCompile Include="lib_acl.rc">
@ -1209,6 +1212,9 @@
<ClInclude Include=".\include\stdlib\acl_malloc.h">
<Filter>Header Files\stdlib</Filter>
</ClInclude>
<ClInclude Include=".\include\stdlib\acl_mbox.h">
<Filter>Header Files\stdlib</Filter>
</ClInclude>
<ClInclude Include=".\include\stdlib\acl_mem_hook.h">
<Filter>Header Files\stdlib</Filter>
</ClInclude>
@ -1320,9 +1326,6 @@
<ClInclude Include=".\include\stdlib\unix\acl_safe_open.h">
<Filter>Header Files\stdlib\unix</Filter>
</ClInclude>
<ClInclude Include=".\include\stdlib\unix\acl_sane_socketpair.h">
<Filter>Header Files\stdlib\unix</Filter>
</ClInclude>
<ClInclude Include=".\include\stdlib\unix\acl_set_eugid.h">
<Filter>Header Files\stdlib\unix</Filter>
</ClInclude>
@ -1554,6 +1557,9 @@
<ClInclude Include="include\stdlib\acl_yqueue.h">
<Filter>Header Files\stdlib</Filter>
</ClInclude>
<ClInclude Include="include\stdlib\acl_sane_socketpair.h">
<Filter>Header Files\stdlib</Filter>
</ClInclude>
</ItemGroup>
<ItemGroup>
<None Include=".\changes.txt">

View File

@ -431,6 +431,7 @@ copy $(TargetName).pdb ..\dist\lib\win64\$(TargetName).pdb /Y
<ClCompile Include=".\src\stdlib\acl_debug.c" />
<ClCompile Include=".\src\stdlib\acl_file.c" />
<ClCompile Include=".\src\stdlib\acl_getopt.c" />
<ClCompile Include=".\src\stdlib\acl_mbox.c" />
<ClCompile Include=".\src\stdlib\acl_meter_time.c" />
<ClCompile Include=".\src\stdlib\acl_msg.c" />
<ClCompile Include=".\src\stdlib\acl_myflock.c" />
@ -488,6 +489,7 @@ copy $(TargetName).pdb ..\dist\lib\win64\$(TargetName).pdb /Y
<ClCompile Include=".\src\stdlib\sys\acl_exec_command.c" />
<ClCompile Include=".\src\stdlib\sys\acl_process.c" />
<ClCompile Include=".\src\stdlib\sys\acl_safe_getenv.c" />
<ClCompile Include=".\src\stdlib\sys\acl_sane_socketpair.c" />
<ClCompile Include=".\src\stdlib\sys\acl_sys_file.c" />
<ClCompile Include=".\src\stdlib\sys\acl_sys_socket.c" />
<ClCompile Include=".\src\stdlib\sys\acl_unsafe.c" />
@ -499,7 +501,6 @@ copy $(TargetName).pdb ..\dist\lib\win64\$(TargetName).pdb /Y
<ClCompile Include=".\src\stdlib\sys\unix\acl_mychown.c" />
<ClCompile Include=".\src\stdlib\sys\unix\acl_open_lock.c" />
<ClCompile Include=".\src\stdlib\sys\unix\acl_safe_open.c" />
<ClCompile Include=".\src\stdlib\sys\unix\acl_sane_socketpair.c" />
<ClCompile Include=".\src\stdlib\sys\unix\acl_set_eugid.c" />
<ClCompile Include=".\src\stdlib\sys\unix\acl_set_ugid.c" />
<ClCompile Include=".\src\stdlib\sys\unix\acl_timed_wait.c" />
@ -771,6 +772,7 @@ copy $(TargetName).pdb ..\dist\lib\win64\$(TargetName).pdb /Y
<ClInclude Include=".\include\stdlib\acl_loadcfg.h" />
<ClInclude Include=".\include\stdlib\acl_make_dirs.h" />
<ClInclude Include=".\include\stdlib\acl_malloc.h" />
<ClInclude Include=".\include\stdlib\acl_mbox.h" />
<ClInclude Include=".\include\stdlib\acl_mem_hook.h" />
<ClInclude Include=".\include\stdlib\acl_mem_slice.h" />
<ClInclude Include=".\include\stdlib\acl_meter_time.h" />
@ -808,7 +810,6 @@ copy $(TargetName).pdb ..\dist\lib\win64\$(TargetName).pdb /Y
<ClInclude Include=".\include\stdlib\unix\acl_mychown.h" />
<ClInclude Include=".\include\stdlib\unix\acl_open_lock.h" />
<ClInclude Include=".\include\stdlib\unix\acl_safe_open.h" />
<ClInclude Include=".\include\stdlib\unix\acl_sane_socketpair.h" />
<ClInclude Include=".\include\stdlib\unix\acl_set_eugid.h" />
<ClInclude Include=".\include\stdlib\unix\acl_set_ugid.h" />
<ClInclude Include=".\include\stdlib\unix\acl_timed_wait.h" />

View File

@ -191,6 +191,9 @@
<ClCompile Include=".\src\stdlib\acl_getopt.c">
<Filter>Source Files\stdlib</Filter>
</ClCompile>
<ClCompile Include=".\src\stdlib\acl_mbox.c">
<Filter>Source Files\stdlib</Filter>
</ClCompile>
<ClCompile Include=".\src\stdlib\acl_meter_time.c">
<Filter>Source Files\stdlib</Filter>
</ClCompile>
@ -362,6 +365,9 @@
<ClCompile Include=".\src\stdlib\sys\acl_safe_getenv.c">
<Filter>Source Files\stdlib\sys</Filter>
</ClCompile>
<ClCompile Include=".\src\stdlib\sys\acl_sane_socketpair.c">
<Filter>Source Files\stdlib\sys</Filter>
</ClCompile>
<ClCompile Include=".\src\stdlib\sys\acl_sys_file.c">
<Filter>Source Files\stdlib\sys</Filter>
</ClCompile>
@ -395,9 +401,6 @@
<ClCompile Include=".\src\stdlib\sys\unix\acl_safe_open.c">
<Filter>Source Files\stdlib\sys\unix</Filter>
</ClCompile>
<ClCompile Include=".\src\stdlib\sys\unix\acl_sane_socketpair.c">
<Filter>Source Files\stdlib\sys\unix</Filter>
</ClCompile>
<ClCompile Include=".\src\stdlib\sys\unix\acl_set_eugid.c">
<Filter>Source Files\stdlib\sys\unix</Filter>
</ClCompile>
@ -1200,6 +1203,9 @@
<ClInclude Include=".\include\stdlib\acl_malloc.h">
<Filter>Header Files\stdlib</Filter>
</ClInclude>
<ClInclude Include=".\include\stdlib\acl_mbox.h">
<Filter>Header Files\stdlib</Filter>
</ClInclude>
<ClInclude Include=".\include\stdlib\acl_mem_hook.h">
<Filter>Header Files\stdlib</Filter>
</ClInclude>
@ -1311,9 +1317,6 @@
<ClInclude Include=".\include\stdlib\unix\acl_safe_open.h">
<Filter>Header Files\stdlib\unix</Filter>
</ClInclude>
<ClInclude Include=".\include\stdlib\unix\acl_sane_socketpair.h">
<Filter>Header Files\stdlib\unix</Filter>
</ClInclude>
<ClInclude Include=".\include\stdlib\unix\acl_set_eugid.h">
<Filter>Header Files\stdlib\unix</Filter>
</ClInclude>
@ -1548,4 +1551,4 @@
<Filter>doc</Filter>
</Text>
</ItemGroup>
</Project>
</Project>

View File

@ -0,0 +1,131 @@
#include "StdAfx.h"
#ifndef ACL_PREPARE_COMPILE
#include "stdlib/acl_define.h"
#include "thread/acl_pthread.h"
#include "stdlib/acl_msg.h"
#include "stdlib/acl_mymalloc.h"
#include "stdlib/acl_ypipe.h"
#include "stdlib/acl_vstream.h"
#include "stdlib/acl_iostuff.h"
#include "stdlib/acl_sys_patch.h"
#include "stdlib/acl_mbox.h"
#endif
struct ACL_MBOX {
ACL_VSTREAM *in;
ACL_VSTREAM *out;
size_t nsend;
size_t nread;
ACL_YPIPE *ypipe;
acl_pthread_mutex_t *lock;
};
static const char __key[] = "k";
ACL_MBOX *acl_mbox_create(void)
{
ACL_MBOX *mbox;
ACL_SOCKET fds[2];
if (acl_sane_socketpair(AF_UNIX, SOCK_STREAM, 0, fds) < 0) {
acl_msg_error("%s(%d), %s: acl_duplex_pipe error %s",
__FILE__, __LINE__, __FUNCTION__, acl_last_serror());
return NULL;
}
mbox = (ACL_MBOX *) acl_mymalloc(sizeof(ACL_MBOX));
mbox->in = acl_vstream_fdopen(fds[0], O_RDONLY, sizeof(__key),
0, ACL_VSTREAM_TYPE_SOCK);
mbox->out = acl_vstream_fdopen(fds[1], O_WRONLY, sizeof(__key),
0, ACL_VSTREAM_TYPE_SOCK);
mbox->nsend = 0;
mbox->nread = 0;
mbox->ypipe = acl_ypipe_new();
mbox->lock = acl_pthread_mutex_create();
return mbox;
}
void acl_mbox_free(ACL_MBOX *mbox, void (*free_fn)(void*))
{
acl_vstream_close(mbox->in);
acl_vstream_close(mbox->out);
acl_ypipe_free(mbox->ypipe, free_fn);
acl_pthread_mutex_destroy(mbox->lock);
acl_myfree(mbox);
}
int acl_mbox_send(ACL_MBOX *mbox, void *msg)
{
int ret;
acl_pthread_mutex_lock(mbox->lock);
acl_ypipe_write(mbox->ypipe, msg);
ret = acl_ypipe_flush(mbox->ypipe);
acl_pthread_mutex_unlock(mbox->lock);
if (ret == 0)
return 0;
mbox->nsend++;
if (acl_vstream_writen(mbox->out, __key, sizeof(__key) - 1)
== ACL_VSTREAM_EOF)
{
return -1;
} else
return 0;
}
void *acl_mbox_read(ACL_MBOX *mbox, int timeout, int *success)
{
int ret;
char kbuf[sizeof(__key)];
void *msg = acl_ypipe_read(mbox->ypipe);
if (msg != NULL) {
if (success)
*success = 0;
return msg;
}
mbox->nread++;
mbox->in->rw_timeout = timeout;
ret = acl_vstream_readn(mbox->in, kbuf, sizeof(kbuf) - 1);
if (ret == ACL_VSTREAM_EOF) {
if (mbox->in->errnum == ACL_ETIMEDOUT) {
if (success)
*success = 0;
return NULL;
}
if (success)
*success = -1;
return NULL;
}
if (kbuf[0] != __key[0]) {
acl_msg_error("%s(%d), %s: read invalid: %c",
__FILE__, __LINE__, __FUNCTION__, kbuf[0]);
if (success)
*success = -1;
return NULL;
}
if (success)
*success = 0;
return acl_ypipe_read(mbox->ypipe);
}
size_t acl_mbox_nsend(ACL_MBOX *mbox)
{
return mbox->nsend;
}
size_t acl_mbox_nread(ACL_MBOX *mbox)
{
return mbox->nread;
}

View File

@ -9,7 +9,7 @@
#ifdef ACL_UNIX
#include <sys/socket.h>
#include <unistd.h>
#include "stdlib/unix/acl_sane_socketpair.h"
#include "stdlib/acl_sys_patch.h"
#endif
#ifdef ACL_WINDOWS

View File

@ -0,0 +1,107 @@
/* System library. */
#include "StdAfx.h"
#ifndef ACL_PREPARE_COMPILE
#include "stdlib/acl_define.h"
#ifdef ACL_BCB_COMPILER
#pragma hdrstop
#endif
#endif
#ifdef ACL_UNIX
#include <errno.h>
#include <sys/socket.h>
#include <unistd.h>
#include <string.h>
/* Utility library. */
#include "stdlib/acl_msg.h"
#include "stdlib/acl_sys_patch.h"
/* sane_socketpair - sanitize socketpair() error returns */
int acl_sane_socketpair(int domain, int type, int protocol, ACL_SOCKET result[2])
{
static int socketpair_ok_errors[] = {
EINTR,
0,
};
int count;
int err;
int ret;
/*
* Solaris socketpair() can fail with EINTR.
*/
while ((ret = socketpair(domain, type, protocol, result)) < 0) {
for (count = 0; /* void */ ; count++) {
if ((err = socketpair_ok_errors[count]) == 0)
return (ret);
if (acl_last_error() == err) {
char tbuf[256];
acl_msg_warn("socketpair: %s (trying again)",
acl_last_strerror(tbuf, sizeof(tbuf)));
sleep(1);
break;
}
}
}
return (ret);
}
#elif defined(ACL_WINDOWS)
int acl_sane_socketpair(int domain, int type, int protocol, ACL_SOCKET result[2])
{
ACL_SOCKET listener = acl_inet_listen("127.0.0.1:0", 1, ACL_BLOCKING);
char addr[64];
(void) domain;
result[0] = ACL_SOCKET_INVALID;
result[1] = ACL_SOCKET_INVALID;
if (listener == ACL_SOCKET_INVALID) {
acl_msg_error("%s(%d), %s: listen error %s",
__FILE__, __LINE__, __FUNCTION__, acl_last_serror());
return -1;
}
acl_tcp_set_nodelay(listener);
if (acl_getsockname(listener, addr, sizeof(addr)) < 0) {
acl_msg_error("%s(%d), %s: getoskname error %s",
__FILE__, __LINE__, __FUNCTION__, acl_last_serror());
acl_socket_close(listener);
return -1;
}
result[0] = acl_inet_connect(addr, ACL_BLOCKING, 0);
if (result[0] == ACL_SOCKET_INVALID) {
acl_msg_error("%s(%d), %s: connect %s error %s",
__FILE__, __LINE__, __FUNCTION__, addr, acl_last_serror());
acl_socket_close(listener);
return -1;
}
result[1] = acl_inet_accept(listener);
acl_socket_close(listener);
if (result[1] == ACL_SOCKET_INVALID) {
acl_msg_error("%s(%d), %s: accept error %s",
__FILE__, __LINE__, __FUNCTION__, acl_last_serror());
acl_socket_close(result[0]);
result[0] = ACL_SOCKET_INVALID;
return -1;
}
acl_tcp_set_nodelay(result[0]);
acl_tcp_set_nodelay(result[1]);
return 0;
}
#endif /* ACL_WINDOWS */

View File

@ -1,56 +0,0 @@
/* System library. */
#include "StdAfx.h"
#ifndef ACL_PREPARE_COMPILE
#include "stdlib/acl_define.h"
#ifdef ACL_BCB_COMPILER
#pragma hdrstop
#endif
#endif
#ifdef ACL_UNIX
#include <errno.h>
#include <sys/socket.h>
#include <unistd.h>
#include <string.h>
/* Utility library. */
#include "stdlib/acl_msg.h"
#include "stdlib/unix/acl_sane_socketpair.h"
/* sane_socketpair - sanitize socketpair() error returns */
int acl_sane_socketpair(int domain, int type, int protocol, int result[2])
{
static int socketpair_ok_errors[] = {
EINTR,
0,
};
int count;
int err;
int ret;
/*
* Solaris socketpair() can fail with EINTR.
*/
while ((ret = socketpair(domain, type, protocol, result)) < 0) {
for (count = 0; /* void */ ; count++) {
if ((err = socketpair_ok_errors[count]) == 0)
return (ret);
if (acl_last_error() == err) {
char tbuf[256];
acl_msg_warn("socketpair: %s (trying again)",
acl_last_strerror(tbuf, sizeof(tbuf)));
sleep(1);
break;
}
}
}
return (ret);
}
#endif /* ACL_UNIX*/

View File

@ -345,8 +345,8 @@ static void fiber_init(void)
__sys_errno = (errno_fn) dlsym(RTLD_NEXT, "__errno_location");
__sys_fcntl = (fcntl_fn) dlsym(RTLD_NEXT, "fcntl");
fiber_io_hook();
fiber_net_hook();
fiber_hook_io();
fiber_hook_net();
}
void fiber_schedule(void)

View File

@ -103,15 +103,18 @@ void fiber_count_inc(void);
void fiber_count_dec(void);
/* in fiber_io.c */
void fiber_io_hook(void);
void fiber_io_check(void);
void fiber_io_close(int fd);
void fiber_wait_read(int fd);
void fiber_wait_write(int fd);
void fiber_io_dec(void);
void fiber_io_inc(void);
EVENT *fiber_io_event(void);
/* in hook_io.c */
void fiber_hook_io(void);
/* in fiber_net.c */
void fiber_net_hook(void);
void fiber_hook_net(void);
#endif

View File

@ -1,39 +1,8 @@
#include "stdafx.h"
#include <fcntl.h>
#define __USE_GNU
#include <dlfcn.h>
#include <sys/stat.h>
#include "fiber/lib_fiber.h"
#include "event.h"
#include "fiber.h"
typedef int (*close_fn)(int);
typedef ssize_t (*read_fn)(int, void *, size_t);
typedef ssize_t (*readv_fn)(int, const struct iovec *, int);
typedef ssize_t (*recv_fn)(int, void *, size_t, int);
typedef ssize_t (*recvfrom_fn)(int, void *, size_t, int,
struct sockaddr *, socklen_t *);
typedef ssize_t (*recvmsg_fn)(int, struct msghdr *, int);
typedef ssize_t (*write_fn)(int, const void *, size_t);
typedef ssize_t (*writev_fn)(int, const struct iovec *, int);
typedef ssize_t (*send_fn)(int, const void *, size_t, int);
typedef ssize_t (*sendto_fn)(int, const void *, size_t, int,
const struct sockaddr *, socklen_t);
typedef ssize_t (*sendmsg_fn)(int, const struct msghdr *, int);
static close_fn __sys_close = NULL;
static read_fn __sys_read = NULL;
static readv_fn __sys_readv = NULL;
static recv_fn __sys_recv = NULL;
static recvfrom_fn __sys_recvfrom = NULL;
static recvmsg_fn __sys_recvmsg = NULL;
static write_fn __sys_write = NULL;
static writev_fn __sys_writev = NULL;
static send_fn __sys_send = NULL;
static sendto_fn __sys_sendto = NULL;
static sendmsg_fn __sys_sendmsg = NULL;
typedef struct {
EVENT *event;
FIBER **io_fibers;
@ -53,29 +22,6 @@ static void fiber_io_loop(FIBER *fiber, void *ctx);
#define STACK_SIZE 819200
static int __maxfd = 1024;
void fiber_io_hook(void)
{
static int __called = 0;
if (__called)
return;
__called++;
__sys_close = (close_fn) dlsym(RTLD_NEXT, "close");
__sys_read = (read_fn) dlsym(RTLD_NEXT, "read");
__sys_readv = (readv_fn) dlsym(RTLD_NEXT, "readv");
__sys_recv = (recv_fn) dlsym(RTLD_NEXT, "recv");
__sys_recvfrom = (recvfrom_fn) dlsym(RTLD_NEXT, "recvfrom");
__sys_recvmsg = (recvmsg_fn) dlsym(RTLD_NEXT, "recvmsg");
__sys_write = (write_fn) dlsym(RTLD_NEXT, "write");
__sys_writev = (writev_fn) dlsym(RTLD_NEXT, "writev");
__sys_send = (send_fn) dlsym(RTLD_NEXT, "send");
__sys_sendto = (sendto_fn) dlsym(RTLD_NEXT, "sendto");
__sys_sendmsg = (sendmsg_fn) dlsym(RTLD_NEXT, "sendmsg");
}
void fiber_io_stop(void)
{
fiber_io_check();
@ -177,6 +123,12 @@ EVENT *fiber_io_event(void)
return __thread_fiber->event;
}
void fiber_io_close(int fd)
{
if (__thread_fiber != NULL)
event_del(__thread_fiber->event, fd, EVENT_ERROR);
}
static void fiber_io_loop(FIBER *self acl_unused, void *ctx)
{
EVENT *ev = (EVENT *) ctx;
@ -381,336 +333,3 @@ void fiber_wait_write(int fd)
fiber_switch();
}
int close(int fd)
{
int ret;
if (fd < 0) {
acl_msg_error("%s: invalid fd: %d", __FUNCTION__, fd);
return -1;
}
if (__thread_fiber != NULL)
event_del(__thread_fiber->event, fd, EVENT_ERROR);
ret = __sys_close(fd);
if (ret == 0)
return ret;
fiber_save_errno();
return ret;
}
#define READ_WAIT_FIRST
#ifdef READ_WAIT_FIRST
ssize_t read(int fd, void *buf, size_t count)
{
ssize_t ret;
if (fd < 0) {
acl_msg_error("%s: invalid fd: %d", __FUNCTION__, fd);
return -1;
}
fiber_wait_read(fd);
ret = __sys_read(fd, buf, count);
if (ret > 0)
return ret;
fiber_save_errno();
return ret;
}
ssize_t readv(int fd, const struct iovec *iov, int iovcnt)
{
ssize_t ret;
if (fd < 0) {
acl_msg_error("%s: invalid fd: %d", __FUNCTION__, fd);
return -1;
}
fiber_wait_read(fd);
ret = __sys_readv(fd, iov, iovcnt);
if (ret > 0)
return ret;
fiber_save_errno();
return ret;
}
ssize_t recv(int sockfd, void *buf, size_t len, int flags)
{
ssize_t ret;
if (sockfd < 0) {
acl_msg_error("%s: invalid sockfd: %d", __FUNCTION__, sockfd);
return -1;
}
fiber_wait_read(sockfd);
ret = __sys_recv(sockfd, buf, len, flags);
if (ret > 0)
return ret;
fiber_save_errno();
return ret;
}
ssize_t recvfrom(int sockfd, void *buf, size_t len, int flags,
struct sockaddr *src_addr, socklen_t *addrlen)
{
ssize_t ret;
if (sockfd < 0) {
acl_msg_error("%s: invalid sockfd: %d", __FUNCTION__, sockfd);
return -1;
}
fiber_wait_read(sockfd);
ret = __sys_recvfrom(sockfd, buf, len, flags, src_addr, addrlen);
if (ret > 0)
return ret;
fiber_save_errno();
return ret;
}
ssize_t recvmsg(int sockfd, struct msghdr *msg, int flags)
{
ssize_t ret;
if (sockfd < 0) {
acl_msg_error("%s: invalid sockfd: %d", __FUNCTION__, sockfd);
return -1;
}
fiber_wait_read(sockfd);
ret = __sys_recvmsg(sockfd, msg, flags);
if (ret > 0)
return ret;
fiber_save_errno();
return ret;
}
#else
ssize_t read(int fd, void *buf, size_t count)
{
while (1) {
ssize_t n = __sys_read(fd, buf, count);
if (n >= 0)
return n;
fiber_save_errno();
#if EAGAIN == EWOULDBLOCK
if (errno != EAGAIN)
#else
if (errno != EAGAIN && errno != EWOULDBLOCK)
#endif
return -1;
fiber_wait_read(fd);
}
}
ssize_t readv(int fd, const struct iovec *iov, int iovcnt)
{
while (1) {
ssize_t n = __sys_readv(fd, iov, iovcnt);
if (n >= 0)
return n;
fiber_save_errno();
#if EAGAIN == EWOULDBLOCK
if (errno != EAGAIN)
#else
if (errno != EAGAIN && errno != EWOULDBLOCK)
#endif
return -1;
fiber_wait_read(fd);
}
}
ssize_t recv(int sockfd, void *buf, size_t len, int flags)
{
while (1) {
ssize_t n = __sys_recv(sockfd, buf, len, flags);
if (n >= 0)
return n;
fiber_save_errno();
#if EAGAIN == EWOULDBLOCK
if (errno != EAGAIN)
#else
if (errno != EAGAIN && errno != EWOULDBLOCK)
#endif
return -1;
fiber_wait_read(sockfd);
}
}
ssize_t recvfrom(int sockfd, void *buf, size_t len, int flags,
struct sockaddr *src_addr, socklen_t *addrlen)
{
while (1) {
ssize_t n = __sys_recvfrom(sockfd, buf, len, flags,
src_addr, addrlen);
if (n >= 0)
return n;
fiber_save_errno();
#if EAGAIN == EWOULDBLOCK
if (errno != EAGAIN)
#else
if (errno != EAGAIN && errno != EWOULDBLOCK)
#endif
return -1;
fiber_wait_read(sockfd);
}
}
ssize_t recvmsg(int sockfd, struct msghdr *msg, int flags)
{
while (1) {
ssize_t n = __sys_recvmsg(sockfd, msg, flags);
if (n >= 0)
return n;
fiber_save_errno();
#if EAGAIN == EWOULDBLOCK
if (errno != EAGAIN)
#else
if (errno != EAGAIN && errno != EWOULDBLOCK)
#endif
return -1;
fiber_wait_read(sockfd);
}
}
#endif
ssize_t write(int fd, const void *buf, size_t count)
{
while (1) {
ssize_t n = __sys_write(fd, buf, count);
if (n >= 0)
return n;
fiber_save_errno();
#if EAGAIN == EWOULDBLOCK
if (errno != EAGAIN)
#else
if (errno != EAGAIN && errno != EWOULDBLOCK)
#endif
return -1;
fiber_wait_write(fd);
}
}
ssize_t writev(int fd, const struct iovec *iov, int iovcnt)
{
while (1) {
ssize_t n = __sys_writev(fd, iov, iovcnt);
if (n >= 0)
return n;
fiber_save_errno();
#if EAGAIN == EWOULDBLOCK
if (errno != EAGAIN)
#else
if (errno != EAGAIN && errno != EWOULDBLOCK)
#endif
return -1;
fiber_wait_write(fd);
}
}
ssize_t send(int sockfd, const void *buf, size_t len, int flags)
{
while (1) {
ssize_t n = __sys_send(sockfd, buf, len, flags);
if (n >= 0)
return n;
fiber_save_errno();
#if EAGAIN == EWOULDBLOCK
if (errno != EAGAIN)
#else
if (errno != EAGAIN && errno != EWOULDBLOCK)
#endif
return -1;
fiber_wait_write(sockfd);
}
}
ssize_t sendto(int sockfd, const void *buf, size_t len, int flags,
const struct sockaddr *dest_addr, socklen_t addrlen)
{
while (1) {
ssize_t n = __sys_sendto(sockfd, buf, len, flags,
dest_addr, addrlen);
if (n >= 0)
return n;
fiber_save_errno();
#if EAGAIN == EWOULDBLOCK
if (errno != EAGAIN)
#else
if (errno != EAGAIN && errno != EWOULDBLOCK)
#endif
return -1;
fiber_wait_write(sockfd);
}
}
ssize_t sendmsg(int sockfd, const struct msghdr *msg, int flags)
{
while (1) {
ssize_t n = __sys_sendmsg(sockfd, msg, flags);
if (n >= 0)
return n;
fiber_save_errno();
#if EAGAIN == EWOULDBLOCK
if (errno != EAGAIN)
#else
if (errno != EAGAIN && errno != EWOULDBLOCK)
#endif
return -1;
fiber_wait_write(sockfd);
}
}

442
lib_fiber/c/src/hook_io.c Normal file
View File

@ -0,0 +1,442 @@
#include "stdafx.h"
#ifdef HAS_PIPE2
#define _GNU_SOURCE
#endif
#include <fcntl.h>
#include <unistd.h>
#define __USE_GNU
#include <dlfcn.h>
#include <sys/stat.h>
#include "fiber.h"
typedef int (*pipe_fn)(int pipefd[2]);
#ifdef HAS_PIPE2
typedef int (*pipe2_fn)(int pipefd[2], int flags);
#endif
typedef FILE *(*popen_fn)(const char *, const char *);
typedef int (*pclose_fn)(FILE *);
typedef int (*close_fn)(int);
typedef ssize_t (*read_fn)(int, void *, size_t);
typedef ssize_t (*readv_fn)(int, const struct iovec *, int);
typedef ssize_t (*recv_fn)(int, void *, size_t, int);
typedef ssize_t (*recvfrom_fn)(int, void *, size_t, int,
struct sockaddr *, socklen_t *);
typedef ssize_t (*recvmsg_fn)(int, struct msghdr *, int);
typedef ssize_t (*write_fn)(int, const void *, size_t);
typedef ssize_t (*writev_fn)(int, const struct iovec *, int);
typedef ssize_t (*send_fn)(int, const void *, size_t, int);
typedef ssize_t (*sendto_fn)(int, const void *, size_t, int,
const struct sockaddr *, socklen_t);
typedef ssize_t (*sendmsg_fn)(int, const struct msghdr *, int);
static pipe_fn __sys_pipe = NULL;
#ifdef HAS_PIPE2
static pipe2_fn __sys_pipe2 = NULL;
#endif
static popen_fn __sys_popen = NULL;
static pclose_fn __sys_pclose = NULL;
static close_fn __sys_close = NULL;
static read_fn __sys_read = NULL;
static readv_fn __sys_readv = NULL;
static recv_fn __sys_recv = NULL;
static recvfrom_fn __sys_recvfrom = NULL;
static recvmsg_fn __sys_recvmsg = NULL;
static write_fn __sys_write = NULL;
static writev_fn __sys_writev = NULL;
static send_fn __sys_send = NULL;
static sendto_fn __sys_sendto = NULL;
static sendmsg_fn __sys_sendmsg = NULL;
void fiber_hook_io(void)
{
static int __called = 0;
if (__called)
return;
__called++;
__sys_pipe = (pipe_fn) dlsym(RTLD_NEXT, "pipe");
#ifdef HAS_PIPE2
__sys_pipe2 = (pipe2_fn) dlsym(RTLD_NEXT, "pipe2");
#endif
__sys_popen = (popen_fn) dlsym(RTLD_NEXT, "popen");
__sys_pclose = (pclose_fn) dlsym(RTLD_NEXT, "pclose");
__sys_close = (close_fn) dlsym(RTLD_NEXT, "close");
__sys_read = (read_fn) dlsym(RTLD_NEXT, "read");
__sys_readv = (readv_fn) dlsym(RTLD_NEXT, "readv");
__sys_recv = (recv_fn) dlsym(RTLD_NEXT, "recv");
__sys_recvfrom = (recvfrom_fn) dlsym(RTLD_NEXT, "recvfrom");
__sys_recvmsg = (recvmsg_fn) dlsym(RTLD_NEXT, "recvmsg");
__sys_write = (write_fn) dlsym(RTLD_NEXT, "write");
__sys_writev = (writev_fn) dlsym(RTLD_NEXT, "writev");
__sys_send = (send_fn) dlsym(RTLD_NEXT, "send");
__sys_sendto = (sendto_fn) dlsym(RTLD_NEXT, "sendto");
__sys_sendmsg = (sendmsg_fn) dlsym(RTLD_NEXT, "sendmsg");
}
int pipe(int pipefd[2])
{
int ret = __sys_pipe(pipefd);
if (ret < 0)
fiber_save_errno();
return ret;
}
#ifdef HAS_PIPE2
int pipe2(int pipefd[2], int flags)
{
int ret = __sys_pipe2(pipefd, flags);
if (ret < 0)
fiber_save_errno();
return ret;
}
#endif
FILE *popen(const char *command, const char *type)
{
FILE *fp = __sys_popen(command, type);
if (fp == NULL)
fiber_save_errno();
return fp;
}
int close(int fd)
{
int ret;
if (fd < 0) {
acl_msg_error("%s: invalid fd: %d", __FUNCTION__, fd);
return -1;
}
fiber_io_close(fd);
ret = __sys_close(fd);
if (ret == 0)
return ret;
fiber_save_errno();
return ret;
}
#define READ_WAIT_FIRST
#ifdef READ_WAIT_FIRST
ssize_t read(int fd, void *buf, size_t count)
{
ssize_t ret;
if (fd < 0) {
acl_msg_error("%s: invalid fd: %d", __FUNCTION__, fd);
return -1;
}
fiber_wait_read(fd);
ret = __sys_read(fd, buf, count);
if (ret > 0)
return ret;
fiber_save_errno();
return ret;
}
ssize_t readv(int fd, const struct iovec *iov, int iovcnt)
{
ssize_t ret;
if (fd < 0) {
acl_msg_error("%s: invalid fd: %d", __FUNCTION__, fd);
return -1;
}
fiber_wait_read(fd);
ret = __sys_readv(fd, iov, iovcnt);
if (ret > 0)
return ret;
fiber_save_errno();
return ret;
}
ssize_t recv(int sockfd, void *buf, size_t len, int flags)
{
ssize_t ret;
if (sockfd < 0) {
acl_msg_error("%s: invalid sockfd: %d", __FUNCTION__, sockfd);
return -1;
}
fiber_wait_read(sockfd);
ret = __sys_recv(sockfd, buf, len, flags);
if (ret > 0)
return ret;
fiber_save_errno();
return ret;
}
ssize_t recvfrom(int sockfd, void *buf, size_t len, int flags,
struct sockaddr *src_addr, socklen_t *addrlen)
{
ssize_t ret;
if (sockfd < 0) {
acl_msg_error("%s: invalid sockfd: %d", __FUNCTION__, sockfd);
return -1;
}
fiber_wait_read(sockfd);
ret = __sys_recvfrom(sockfd, buf, len, flags, src_addr, addrlen);
if (ret > 0)
return ret;
fiber_save_errno();
return ret;
}
ssize_t recvmsg(int sockfd, struct msghdr *msg, int flags)
{
ssize_t ret;
if (sockfd < 0) {
acl_msg_error("%s: invalid sockfd: %d", __FUNCTION__, sockfd);
return -1;
}
fiber_wait_read(sockfd);
ret = __sys_recvmsg(sockfd, msg, flags);
if (ret > 0)
return ret;
fiber_save_errno();
return ret;
}
#else
ssize_t read(int fd, void *buf, size_t count)
{
while (1) {
ssize_t n = __sys_read(fd, buf, count);
if (n >= 0)
return n;
fiber_save_errno();
#if EAGAIN == EWOULDBLOCK
if (errno != EAGAIN)
#else
if (errno != EAGAIN && errno != EWOULDBLOCK)
#endif
return -1;
fiber_wait_read(fd);
}
}
ssize_t readv(int fd, const struct iovec *iov, int iovcnt)
{
while (1) {
ssize_t n = __sys_readv(fd, iov, iovcnt);
if (n >= 0)
return n;
fiber_save_errno();
#if EAGAIN == EWOULDBLOCK
if (errno != EAGAIN)
#else
if (errno != EAGAIN && errno != EWOULDBLOCK)
#endif
return -1;
fiber_wait_read(fd);
}
}
ssize_t recv(int sockfd, void *buf, size_t len, int flags)
{
while (1) {
ssize_t n = __sys_recv(sockfd, buf, len, flags);
if (n >= 0)
return n;
fiber_save_errno();
#if EAGAIN == EWOULDBLOCK
if (errno != EAGAIN)
#else
if (errno != EAGAIN && errno != EWOULDBLOCK)
#endif
return -1;
fiber_wait_read(sockfd);
}
}
ssize_t recvfrom(int sockfd, void *buf, size_t len, int flags,
struct sockaddr *src_addr, socklen_t *addrlen)
{
while (1) {
ssize_t n = __sys_recvfrom(sockfd, buf, len, flags,
src_addr, addrlen);
if (n >= 0)
return n;
fiber_save_errno();
#if EAGAIN == EWOULDBLOCK
if (errno != EAGAIN)
#else
if (errno != EAGAIN && errno != EWOULDBLOCK)
#endif
return -1;
fiber_wait_read(sockfd);
}
}
ssize_t recvmsg(int sockfd, struct msghdr *msg, int flags)
{
while (1) {
ssize_t n = __sys_recvmsg(sockfd, msg, flags);
if (n >= 0)
return n;
fiber_save_errno();
#if EAGAIN == EWOULDBLOCK
if (errno != EAGAIN)
#else
if (errno != EAGAIN && errno != EWOULDBLOCK)
#endif
return -1;
fiber_wait_read(sockfd);
}
}
#endif
ssize_t write(int fd, const void *buf, size_t count)
{
while (1) {
ssize_t n = __sys_write(fd, buf, count);
if (n >= 0)
return n;
fiber_save_errno();
#if EAGAIN == EWOULDBLOCK
if (errno != EAGAIN)
#else
if (errno != EAGAIN && errno != EWOULDBLOCK)
#endif
return -1;
fiber_wait_write(fd);
}
}
ssize_t writev(int fd, const struct iovec *iov, int iovcnt)
{
while (1) {
ssize_t n = __sys_writev(fd, iov, iovcnt);
if (n >= 0)
return n;
fiber_save_errno();
#if EAGAIN == EWOULDBLOCK
if (errno != EAGAIN)
#else
if (errno != EAGAIN && errno != EWOULDBLOCK)
#endif
return -1;
fiber_wait_write(fd);
}
}
ssize_t send(int sockfd, const void *buf, size_t len, int flags)
{
while (1) {
ssize_t n = __sys_send(sockfd, buf, len, flags);
if (n >= 0)
return n;
fiber_save_errno();
#if EAGAIN == EWOULDBLOCK
if (errno != EAGAIN)
#else
if (errno != EAGAIN && errno != EWOULDBLOCK)
#endif
return -1;
fiber_wait_write(sockfd);
}
}
ssize_t sendto(int sockfd, const void *buf, size_t len, int flags,
const struct sockaddr *dest_addr, socklen_t addrlen)
{
while (1) {
ssize_t n = __sys_sendto(sockfd, buf, len, flags,
dest_addr, addrlen);
if (n >= 0)
return n;
fiber_save_errno();
#if EAGAIN == EWOULDBLOCK
if (errno != EAGAIN)
#else
if (errno != EAGAIN && errno != EWOULDBLOCK)
#endif
return -1;
fiber_wait_write(sockfd);
}
}
ssize_t sendmsg(int sockfd, const struct msghdr *msg, int flags)
{
while (1) {
ssize_t n = __sys_sendmsg(sockfd, msg, flags);
if (n >= 0)
return n;
fiber_save_errno();
#if EAGAIN == EWOULDBLOCK
if (errno != EAGAIN)
#else
if (errno != EAGAIN && errno != EWOULDBLOCK)
#endif
return -1;
fiber_wait_write(sockfd);
}
}

View File

@ -10,18 +10,20 @@
#include "fiber.h"
typedef int (*socket_fn)(int, int, int);
typedef int (*socketpair_fn)(int, int, int, int sv[2]);
typedef int (*bind_fn)(int, const struct sockaddr *, socklen_t);
typedef int (*listen_fn)(int, int);
typedef int (*accept_fn)(int, struct sockaddr *, socklen_t *);
typedef int (*connect_fn)(int, const struct sockaddr *, socklen_t);
static socket_fn __sys_socket = NULL;
static bind_fn __sys_bind = NULL;
static listen_fn __sys_listen = NULL;
static accept_fn __sys_accept = NULL;
static connect_fn __sys_connect = NULL;
static socket_fn __sys_socket = NULL;
static socketpair_fn __sys_socketpair = NULL;
static bind_fn __sys_bind = NULL;
static listen_fn __sys_listen = NULL;
static accept_fn __sys_accept = NULL;
static connect_fn __sys_connect = NULL;
void fiber_net_hook(void)
void fiber_hook_net(void)
{
static int __called = 0;
@ -30,11 +32,12 @@ void fiber_net_hook(void)
__called++;
__sys_socket = (socket_fn) dlsym(RTLD_NEXT, "socket");
__sys_bind = (bind_fn) dlsym(RTLD_NEXT, "bind");
__sys_listen = (listen_fn) dlsym(RTLD_NEXT, "listen");
__sys_accept = (accept_fn) dlsym(RTLD_NEXT, "accept");
__sys_connect = (connect_fn) dlsym(RTLD_NEXT, "connect");
__sys_socket = (socket_fn) dlsym(RTLD_NEXT, "socket");
__sys_socketpair = (socketpair_fn) dlsym(RTLD_NEXT, "socketpair");
__sys_bind = (bind_fn) dlsym(RTLD_NEXT, "bind");
__sys_listen = (listen_fn) dlsym(RTLD_NEXT, "listen");
__sys_accept = (accept_fn) dlsym(RTLD_NEXT, "accept");
__sys_connect = (connect_fn) dlsym(RTLD_NEXT, "connect");
}
int socket(int domain, int type, int protocol)
@ -48,6 +51,15 @@ int socket(int domain, int type, int protocol)
return sockfd;
}
int socketpair(int domain, int type, int protocol, int sv[2])
{
int ret = __sys_socketpair(domain, type, protocol, sv);
if (ret < 0)
fiber_save_errno();
return ret;
}
int bind(int sockfd, const struct sockaddr *addr, socklen_t addrlen)
{
if (__sys_bind(sockfd, addr, addrlen) == 0)

View File

@ -0,0 +1,2 @@
include ../Makefile.in
PROG = thread_mbox

View File

@ -0,0 +1,106 @@
#include "lib_acl.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "fiber/lib_fiber.h"
static int __oper_count = 2;
static int __fibers_max = 2;
static int __fibers_cur = 2;
static void *thread_main(void *ctx)
{
ACL_MBOX *mbox = (ACL_MBOX *) ctx;
int i;
for (i = 0; i < __oper_count; i++) {
char *ptr = acl_mystrdup("hello world!");
if (acl_mbox_send(mbox, ptr) < 0) {
printf("send error!\r\n");
break;
}
}
return NULL;
}
static void free_msg(void *msg)
{
printf("---free one ---\r\n");
acl_myfree(msg);
}
static void fiber_main(FIBER *fiber acl_unused, void *ctx)
{
ACL_MBOX *mbox = (ACL_MBOX *) ctx;
acl_pthread_attr_t attr;
acl_pthread_t tid;
int i;
acl_pthread_attr_init(&attr);
acl_pthread_attr_setdetachstate(&attr, ACL_PTHREAD_CREATE_DETACHED);
acl_pthread_create(&tid, &attr, thread_main, mbox);
for (i = 0; i < __oper_count; i++) {
char *ptr = (char *) acl_mbox_read(mbox, 0, NULL);
if (ptr == NULL)
break;
if (i < 10)
printf("read in: %s\r\n", ptr);
acl_myfree(ptr);
}
printf(">>>nsend: %d / %d, nread: %d / %d\r\n",
(int) acl_mbox_nsend(mbox), __oper_count,
(int) acl_mbox_nread(mbox), __oper_count);
acl_mbox_free(mbox, free_msg);
if (--__fibers_cur == 0) {
printf("---- All over now! ----\r\n");
printf("Enter any key to exit ...");
fflush(stdout);
getchar();
fiber_io_stop();
}
}
static void usage(const char *procname)
{
printf("usage: %s -h [help] -c nfibers\r\n", procname);
}
int main(int argc, char *argv[])
{
int ch, i;
while ((ch = getopt(argc, argv, "hc:n:")) > 0) {
switch (ch) {
case 'h':
usage(argv[0]);
return 0;
case 'c':
__fibers_max = atoi(optarg);
break;
case 'n':
__oper_count = atoi(optarg);
break;
default:
break;
}
}
__fibers_cur = __fibers_max;
printf("fibers: %d\r\n", __fibers_max);
for (i = 0; i < __fibers_max; i++) {
ACL_MBOX *mbox = acl_mbox_create();
fiber_create(fiber_main, mbox, 64000);
}
fiber_schedule();
return 0;
}