add fiber_cond.c

This commit is contained in:
zsx 2018-11-28 15:51:22 +08:00
parent 9b26998122
commit 9cd475d83f
6 changed files with 300 additions and 2 deletions

View File

@ -0,0 +1,76 @@
/**
* Copyright (C) 2015-2018 IQIYI
* All rights reserved.
*
* AUTHOR(S)
* Zheng Shuxin
* E-mail: zhengshuxin@qiyi.com
*
* VERSION
* Mon 26 Nov 2018 11:25:42 AM CST
*/
#ifndef FIBER_COND_INCLUDE_H
#define FIBER_COND_INCLUDE_H
#include "fiber_define.h"
#include "fiber_event.h"
#if !defined(_WIN32) && !defined(_WIN64)
#ifdef __cplusplus
extern "C" {
#endif
/* fiber_cond.h */
/**
* fiber_cond object look like pthread_cond_t which is used between threads
* and fibers
*/
typedef struct ACL_FIBER_COND ACL_FIBER_COND;
/**
* create fiber cond which can be used in fibers more or threads mode
* @param flag {unsigned} current not used, just for the future extend
* @return {ACL_FIBER_COND *}
*/
FIBER_API ACL_FIBER_COND *acl_fiber_cond_create(unsigned flag);
/**
* free cond created by acl_fiber_cond_create
* @param cond {ACL_FIBER_COND *}
*/
FIBER_API void acl_fiber_cond_free(ACL_FIBER_COND *cond);
/**
* wait for cond event to be signaled
* @param cond {ACL_FIBER_COND *}
* @param event {ACL_FIBER_EVENT *} must be owned by the current caller
* @return {int} return 0 if ok or return error value
*/
FIBER_API int acl_fiber_cond_wait(ACL_FIBER_COND *cond, ACL_FIBER_EVENT *event);
/**
* wait for cond event to be signaled with the specified timeout
* @param cond {ACL_FIBER_COND *}
* @return {int} return 0 if ok or return error value, when timedout ETIMEDOUT
* will be returned
*/
FIBER_API int acl_fiber_cond_timedwait(ACL_FIBER_COND *cond,
ACL_FIBER_EVENT *event, int delay_ms);
/**
* signle the cond which will wakeup one waiter for the cond to be signaled
* @param cond {ACL_FIBER_COND *}
* @return {int} return 0 if ok or return error value
*/
FIBER_API int acl_fiber_cond_signal(ACL_FIBER_COND *cond);
#ifdef __cplusplus
}
#endif
#endif // !defined(_WIN32) && !defined(_WIN64)
#endif

View File

@ -169,6 +169,7 @@
<ClInclude Include="include\fiber\fiber_base.h" />
<ClInclude Include="include\fiber\fiber_channel.h" />
<ClInclude Include="include\fiber\fiber_define.h" />
<ClInclude Include="include\fiber\fiber_cond.h" />
<ClInclude Include="include\fiber\fiber_event.h" />
<ClInclude Include="include\fiber\fiber_hook.h" />
<ClInclude Include="include\fiber\fiber_lock.h" />
@ -236,9 +237,11 @@
<ClCompile Include="src\event\event_poll.c" />
<ClCompile Include="src\event\event_select.c" />
<ClCompile Include="src\event\event_wmsg.c" />
<ClCompile Include="src\fbase_event.c" />
<ClCompile Include="src\fiber.c" />
<ClCompile Include="src\fiber\fiber_unix.c" />
<ClCompile Include="src\fiber\fiber_win.c" />
<ClCompile Include="src\fiber_cond.c" />
<ClCompile Include="src\fiber_event.c" />
<ClCompile Include="src\fiber_io.c" />
<ClCompile Include="src\fiber_lock.c" />
@ -262,4 +265,4 @@
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
<ImportGroup Label="ExtensionTargets">
</ImportGroup>
</Project>
</Project>

View File

@ -150,9 +150,15 @@
<ClCompile Include="src\event.c">
<Filter>源文件</Filter>
</ClCompile>
<ClCompile Include="src\fbase_event.c">
<Filter>源文件</Filter>
</ClCompile>
<ClCompile Include="src\fiber.c">
<Filter>源文件</Filter>
</ClCompile>
<ClCompile Include="src\fiber_cond.c">
<Filter>源文件</Filter>
</ClCompile>
<ClCompile Include="src\fiber_event.c">
<Filter>源文件</Filter>
</ClCompile>
@ -269,6 +275,9 @@
<ClInclude Include="include\fiber\fiber_define.h">
<Filter>头文件</Filter>
</ClInclude>
<ClInclude Include="include\fiber\fiber_cond.h">
<Filter>头文件</Filter>
</ClInclude>
<ClInclude Include="include\fiber\fiber_event.h">
<Filter>头文件</Filter>
</ClInclude>
@ -297,4 +306,4 @@
<Filter>源文件\event</Filter>
</ClInclude>
</ItemGroup>
</Project>
</Project>

View File

@ -155,6 +155,7 @@
<ClInclude Include="include\fiber\fiber_base.h" />
<ClInclude Include="include\fiber\fiber_channel.h" />
<ClInclude Include="include\fiber\fiber_define.h" />
<ClInclude Include="include\fiber\fiber_cond.h" />
<ClInclude Include="include\fiber\fiber_event.h" />
<ClInclude Include="include\fiber\fiber_hook.h" />
<ClInclude Include="include\fiber\fiber_lock.h" />
@ -223,9 +224,11 @@
<ClCompile Include="src\event\event_poll.c" />
<ClCompile Include="src\event\event_select.c" />
<ClCompile Include="src\event\event_wmsg.c" />
<ClCompile Include="src\fbase_event.c" />
<ClCompile Include="src\fiber.c" />
<ClCompile Include="src\fiber\fiber_unix.c" />
<ClCompile Include="src\fiber\fiber_win.c" />
<ClCompile Include="src\fiber_cond.c" />
<ClCompile Include="src\fiber_event.c" />
<ClCompile Include="src\fiber_io.c" />
<ClCompile Include="src\fiber_lock.c" />

View File

@ -105,6 +105,9 @@
<ClInclude Include="include\fiber\fiber_define.h">
<Filter>头文件</Filter>
</ClInclude>
<ClInclude Include="include\fiber\fiber_cond.h">
<Filter>头文件</Filter>
</ClInclude>
<ClInclude Include="include\fiber\fiber_event.h">
<Filter>头文件</Filter>
</ClInclude>
@ -239,9 +242,15 @@
<ClCompile Include="src\event.c">
<Filter>源文件</Filter>
</ClCompile>
<ClCompile Include="src\fbase_event.c">
<Filter>源文件</Filter>
</ClCompile>
<ClCompile Include="src\fiber.c">
<Filter>源文件</Filter>
</ClCompile>
<ClCompile Include="src\fiber_cond.c">
<Filter>源文件</Filter>
</ClCompile>
<ClCompile Include="src\fiber_event.c">
<Filter>源文件</Filter>
</ClCompile>

View File

@ -0,0 +1,198 @@
#include "stdafx.h"
#include "common.h"
#ifdef SYS_UNIX
#include "fiber/libfiber.h"
#include "fiber/fiber_cond.h"
#include "fiber.h"
struct ACL_FIBER_COND {
RING waiters;
ATOMIC *atomic;
long long value;
pthread_mutex_t mutex;
};
ACL_FIBER_COND *acl_fiber_cond_create(unsigned flag fiber_unused)
{
pthread_mutexattr_t attr;
ACL_FIBER_COND *cond = (ACL_FIBER_COND *)
calloc(1, sizeof(ACL_FIBER_COND));
ring_init(&cond->waiters);
cond->atomic = atomic_new();
atomic_set(cond->atomic, &cond->value);
atomic_int64_set(cond->atomic, 0);
pthread_mutexattr_init(&attr);
pthread_mutex_init(&cond->mutex, &attr);
pthread_mutexattr_destroy(&attr);
return cond;
}
void acl_fiber_cond_free(ACL_FIBER_COND *cond)
{
atomic_free(cond->atomic);
free(cond);
}
static void __ll_lock(ACL_FIBER_COND *cond)
{
pthread_mutex_lock(&cond->mutex);
}
static void __ll_unlock(ACL_FIBER_COND *cond)
{
pthread_mutex_unlock(&cond->mutex);
}
#define DETACHE do { \
__ll_lock(cond); \
fbase_event_close(fbase); \
ring_detach(&fbase->event_waiter); \
__ll_unlock(cond); \
if (fbase->flag & FBASE_F_BASE) { \
fbase_free(fbase); \
} \
} while (0)
int acl_fiber_cond_wait(ACL_FIBER_COND *cond, ACL_FIBER_EVENT *event)
{
ACL_FIBER *fiber = acl_fiber_running();
FIBER_BASE *fbase;
fbase = fiber ? &fiber->base : fbase_alloc();
fbase_event_open(fbase);
__ll_lock(cond);
ring_prepend(&cond->waiters, &fbase->event_waiter);
__ll_unlock(cond);
if (acl_fiber_event_notify(event) != 0) {
DETACHE;
msg_error("acl_fiber_event_notify failed");
return EINVAL;
}
if (fbase_event_wait(fbase) == -1) {
DETACHE;
return EINVAL;
}
if (acl_fiber_event_wait(event) == -1) {
DETACHE;
msg_error("acl_fiber_event_wait error");
return EINVAL;
}
fbase_event_close(fbase);
if (fbase->flag & FBASE_F_BASE) {
fbase_free(fbase);
}
return 0;
}
static int read_wait(int fd, int delay)
{
struct pollfd fds;
fds.events = POLLIN;
fds.fd = fd;
for (;;) {
switch (poll(&fds, 1, delay)) {
#ifdef SYS_WIN
case SOCKET_ERROR:
#else
case -1:
#endif
if (acl_fiber_last_error() == EINTR) {
continue;
}
return -1;
case 0:
acl_fiber_set_error(ETIMEDOUT);
return -1;
default:
if ((fds.revents & POLLIN)) {
return 0;
}
if (fds.revents & (POLLHUP | POLLERR | POLLNVAL)) {
return 0;
}
return -1;
}
}
}
int acl_fiber_cond_timedwait(ACL_FIBER_COND *cond, ACL_FIBER_EVENT *event,
int delay_ms)
{
ACL_FIBER *fiber = acl_fiber_running();
FIBER_BASE *fbase;
fbase = fiber ? &fiber->base : fbase_alloc();
fbase_event_open(fbase);
__ll_lock(cond);
ring_prepend(&cond->waiters, &fbase->event_waiter);
__ll_unlock(cond);
if (acl_fiber_event_notify(event) != 0) {
DETACHE;
msg_error("acl_fiber_event_notify failed");
return EINVAL;
}
if (read_wait(fbase->event_in, delay_ms) == -1) {
DETACHE;
return ETIMEDOUT;
}
if (fbase_event_wait(fbase) == -1) {
DETACHE;
return EINVAL;
}
if (acl_fiber_event_wait(event) == -1) {
DETACHE;
msg_error("acl_fiber_event_wait error");
return EINVAL;
}
fbase_event_close(fbase);
if (fbase->flag & FBASE_F_BASE) {
fbase_free(fbase);
}
return 0;
}
int acl_fiber_cond_signal(ACL_FIBER_COND *cond)
{
FIBER_BASE *waiter;
RING *head;
__ll_lock(cond);
head = ring_pop_head(&cond->waiters);
if (head) {
waiter = RING_TO_APPL(head, FIBER_BASE, event_waiter);
} else {
waiter = NULL;
}
__ll_unlock(cond);
if (waiter && fbase_event_wakeup(waiter) == -1) {
return EINVAL;
}
return 0;
}
#endif // SYS_UNIX