fixed bugs of acl_fiber_cond_timedwait in acl_fiber_cond.c

This commit is contained in:
zsx 2018-12-25 11:32:42 +08:00
parent 50be7b26a6
commit 0004373bcd
10 changed files with 161 additions and 41 deletions

View File

@ -1,5 +1,8 @@
修改历史列表: 修改历史列表:
558) 2018.12.25
558.1) workaround: tbox::push 增加 bool 类型的返回值
557) 2018.12.19 557) 2018.12.19
557.1) feature: connect_pool 支持放置非自己创建的连接对象,同时支持在 peek 连接 557.1) feature: connect_pool 支持放置非自己创建的连接对象,同时支持在 peek 连接
时,仅从已有的连接中提取而不是创建新的连接 时,仅从已有的连接中提取而不是创建新的连接

View File

@ -1,6 +1,7 @@
#pragma once #pragma once
#include "../acl_cpp_define.hpp" #include "../acl_cpp_define.hpp"
#include <list> #include <list>
#include <stdlib.h>
#include "thread_mutex.hpp" #include "thread_mutex.hpp"
#include "thread_cond.hpp" #include "thread_cond.hpp"
#include "noncopyable.hpp" #include "noncopyable.hpp"
@ -74,14 +75,22 @@ public:
/** /**
* *
* @param t {T*} * @param t {T*}
* @return {bool}
*/ */
void push(T* t) bool push(T* t)
{ {
lock_.lock(); if (lock_.lock() == false) {
abort();
}
tbox_.push_back(t); tbox_.push_back(t);
size_++; size_++;
lock_.unlock(); if (lock_.unlock() == false) {
cond_.notify(); abort();
}
if (cond_.notify() == false) {
abort();
}
return true;
} }
/** /**
@ -101,11 +110,15 @@ public:
{ {
long long n = ((long long) wait_ms) * 1000; long long n = ((long long) wait_ms) * 1000;
bool found_flag; bool found_flag;
lock_.lock(); if (lock_.lock() == false) {
abort();
}
while (true) { while (true) {
T* t = peek(found_flag); T* t = peek(found_flag);
if (found_flag) { if (found_flag) {
lock_.unlock(); if (lock_.unlock() == false) {
abort();
}
if (found) { if (found) {
*found = found_flag; *found = found_flag;
} }
@ -114,7 +127,9 @@ public:
// 注意调用顺序,必须先调用 wait 再判断 wait_ms // 注意调用顺序,必须先调用 wait 再判断 wait_ms
if (!cond_.wait(n, true) && wait_ms >= 0) { if (!cond_.wait(n, true) && wait_ms >= 0) {
lock_.unlock(); if (lock_.unlock() == false) {
abort();
}
if (found) { if (found) {
*found = false; *found = false;
} }
@ -135,12 +150,16 @@ public:
public: public:
void lock(void) void lock(void)
{ {
lock_.lock(); if (lock_.lock() == false) {
abort();
}
} }
void unlock(void) void unlock(void)
{ {
lock_.unlock(); if (lock_.unlock() == false) {
abort();
}
} }
private: private:

View File

@ -25,9 +25,15 @@ FIBER_API ACL_FIBER* acl_fiber_create(void (*fn)(ACL_FIBER*, void*),
/** /**
* get the fibers count in deading status * get the fibers count in deading status
* @retur {int} * @return {unsigned}
*/ */
FIBER_API int acl_fiber_ndead(void); FIBER_API unsigned acl_fiber_ndead(void);
/**
* get the fibers count in aliving status
* @return {unsigned}
*/
FIBER_API unsigned acl_fiber_number(void);
/** /**
* create one fiber in background for freeing the dead fibers, specify the * create one fiber in background for freeing the dead fibers, specify the

View File

@ -492,12 +492,20 @@ int acl_fiber_yield(void)
#endif #endif
} }
int acl_fiber_ndead(void) unsigned acl_fiber_ndead(void)
{ {
if (__thread_fiber == NULL) { if (__thread_fiber == NULL) {
return 0; return 0;
} }
return ring_size(&__thread_fiber->dead); return (unsigned) ring_size(&__thread_fiber->dead);
}
unsigned acl_fiber_number(void)
{
if (__thread_fiber == NULL) {
return 0;
}
return __thread_fiber->slot;
} }
static void fbase_init(FIBER_BASE *fbase, int flag) static void fbase_init(FIBER_BASE *fbase, int flag)

View File

@ -157,6 +157,7 @@ int acl_fiber_cond_timedwait(ACL_FIBER_COND *cond, ACL_FIBER_EVENT *event,
return EINVAL; return EINVAL;
} }
while (1) {
if (read_wait(fbase->event_in, delay_ms) == -1) { if (read_wait(fbase->event_in, delay_ms) == -1) {
if (acl_fiber_event_wait(event) == -1) { if (acl_fiber_event_wait(event) == -1) {
msg_fatal("%s(%d), %s: wait event error", msg_fatal("%s(%d), %s: wait event error",
@ -166,7 +167,20 @@ int acl_fiber_cond_timedwait(ACL_FIBER_COND *cond, ACL_FIBER_EVENT *event,
return ETIMEDOUT; return ETIMEDOUT;
} }
__ll_lock(cond);
if (atomic_int64_cas(cond->atomic, 0, 1) == 0) {
break;
}
__ll_unlock(cond);
}
if (fbase_event_wait(fbase) == -1) { if (fbase_event_wait(fbase) == -1) {
if (atomic_int64_cas(cond->atomic, 1, 0) != 1) {
msg_fatal("%s(%d), %s: cond corrupt",
__FILE__, __LINE__, __FUNCTION__);
}
__ll_unlock(cond);
if (acl_fiber_event_wait(event) == -1) { if (acl_fiber_event_wait(event) == -1) {
msg_fatal("%s(%d), %s: wait event error", msg_fatal("%s(%d), %s: wait event error",
__FILE__, __LINE__, __FUNCTION__); __FILE__, __LINE__, __FUNCTION__);
@ -175,6 +189,12 @@ int acl_fiber_cond_timedwait(ACL_FIBER_COND *cond, ACL_FIBER_EVENT *event,
return EINVAL; return EINVAL;
} }
if (atomic_int64_cas(cond->atomic, 1, 0) != 1) {
msg_fatal("%s(%d), %s: cond corrupt",
__FILE__, __LINE__, __FUNCTION__);
}
__ll_unlock(cond);
if (acl_fiber_event_wait(event) == -1) { if (acl_fiber_event_wait(event) == -1) {
DETACHE; DETACHE;
msg_error("acl_fiber_event_wait error"); msg_error("acl_fiber_event_wait error");

View File

@ -1,4 +1,9 @@
99) 2018.12.25
99.1) bugfix: acl_fiber_cond.c 中如果多个线程调用同一个条件变量的的
acl_fiber_cond_timedwait 方法时,可能会存在超时值失效的情况
99.2) workaround: fiber_tbox::push 增加 bool 返回值
98) 2018.12.20 98) 2018.12.20
98.1) buffix: fiber_server.cpp 当有连接未断开时,进程无法正常退出 98.1) buffix: fiber_server.cpp 当有连接未断开时,进程无法正常退出

View File

@ -168,6 +168,18 @@ public:
*/ */
static unsigned int delay(unsigned int milliseconds); static unsigned int delay(unsigned int milliseconds);
/**
*
* @return {unsigned}
*/
static unsigned alive_number(void);
/**
* 退
* @return {unsigned}
*/
static unsigned dead_number(void);
/** /**
* 线线 hook API * 线线 hook API
* hook API * hook API

View File

@ -1,6 +1,7 @@
#pragma once #pragma once
#include "fiber_cpp_define.hpp" #include "fiber_cpp_define.hpp"
#include <list> #include <list>
#include <stdlib.h>
#include "fiber_event.hpp" #include "fiber_event.hpp"
#include "fiber_cond.hpp" #include "fiber_cond.hpp"
@ -82,22 +83,35 @@ public:
* ( pop fiber_tbox ), * ( pop fiber_tbox ),
* true push fiber_tbox * true push fiber_tbox
* 访 * 访
* @return {bool}
*/ */
void push(T* t, bool notify_first = true) bool push(T* t, bool notify_first = true)
{ {
// 先加锁 // 先加锁
event_.wait(); if (event_.wait() == false) {
abort();
}
// 向队列中添加消息对象 // 向队列中添加消息对象
tbox_.push_back(t); tbox_.push_back(t);
size_++; size_++;
if (notify_first) { if (notify_first) {
cond_.notify(); if (cond_.notify() == false) {
event_.notify(); abort();
}
if (event_.notify() == false) {
abort();
}
return true;
} else { } else {
event_.notify(); if (event_.notify() == false) {
cond_.notify(); abort();
}
if (cond_.notify() == false) {
abort();
}
return true;
} }
} }
@ -117,11 +131,15 @@ public:
T* pop(int wait_ms = -1, bool* found = NULL) T* pop(int wait_ms = -1, bool* found = NULL)
{ {
bool found_flag; bool found_flag;
event_.wait(); if (event_.wait() == false) {
abort();
}
while (true) { while (true) {
T* t = peek(found_flag); T* t = peek(found_flag);
if (found_flag) { if (found_flag) {
event_.notify(); if (event_.notify() == false) {
abort();
}
if (found) { if (found) {
*found = found_flag; *found = found_flag;
} }
@ -130,7 +148,9 @@ public:
// 注意调用顺序,必须先调用 wait 再判断 wait_ms // 注意调用顺序,必须先调用 wait 再判断 wait_ms
if (!cond_.wait(event_, wait_ms) && wait_ms >= 0) { if (!cond_.wait(event_, wait_ms) && wait_ms >= 0) {
event_.notify(); if (event_.notify() == false) {
abort();
}
if (found) { if (found) {
*found = false; *found = false;
} }
@ -151,12 +171,16 @@ public:
public: public:
void lock(void) void lock(void)
{ {
event_.wait(); if (event_.wait() == false) {
abort();
}
} }
void unlock(void) void unlock(void)
{ {
event_.notify(); if (event_.notify() == false) {
abort();
}
} }
private: private:

View File

@ -80,6 +80,16 @@ unsigned int fiber::delay(unsigned int milliseconds)
return acl_fiber_delay(milliseconds); return acl_fiber_delay(milliseconds);
} }
unsigned fiber::alive_number(void)
{
return acl_fiber_number();
}
unsigned fiber::dead_number(void)
{
return acl_fiber_ndead();
}
void fiber::hook_api(bool on) void fiber::hook_api(bool on)
{ {
acl_fiber_hook_api(on ? 1 : 0); acl_fiber_hook_api(on ? 1 : 0);

View File

@ -8,6 +8,7 @@ static int __delay = 0;
static acl::atomic_long __producing = 0; static acl::atomic_long __producing = 0;
static acl::atomic_long __consuming = 0; static acl::atomic_long __consuming = 0;
static acl::atomic_long __timedout = 0;
class myobj class myobj
{ {
@ -56,8 +57,9 @@ private:
class consumer : public acl::thread class consumer : public acl::thread
{ {
public: public:
consumer(acl::fiber_tbox<myobj>& tbox) consumer(acl::fiber_tbox<myobj>& tbox, int timeout)
: tbox_(tbox) : tbox_(tbox)
, timeout_(timeout)
{ {
this->set_detachable(false); this->set_detachable(false);
} }
@ -67,22 +69,29 @@ private:
private: private:
acl::fiber_tbox<myobj>& tbox_; acl::fiber_tbox<myobj>& tbox_;
int timeout_;
void* run(void) void* run(void)
{ {
int n = 0;
for (int i = 0; i < __nloop; i++) { for (int i = 0; i < __nloop; i++) {
myobj* o = tbox_.pop(10); myobj* o = tbox_.pop(timeout_);
if (o && i <= 10) { if (o && i <= 10) {
o->test(); o->test();
} }
if (!o) { if (o) {
__consuming++;
n++;
} else {
__timedout++;
printf("pop timeout\r\n"); printf("pop timeout\r\n");
} }
delete o; delete o;
__consuming++;
} }
printf("consumer over=%lld\r\n", __consuming.value()); printf("consumer-%lu over=%lld, n=%d\r\n",
acl::thread::self(), __consuming.value(), n);
return NULL; return NULL;
} }
}; };
@ -94,18 +103,19 @@ static void usage(const char* procname)
printf("usage: %s -h [help]\r\n" printf("usage: %s -h [help]\r\n"
" -c threads[default: 1]\r\n" " -c threads[default: 1]\r\n"
" -n nloop[default: 2]\r\n" " -n nloop[default: 2]\r\n"
" -t timeout[default: -1 ms]\r\n"
" -d delay[default: 0 ms]\r\n" " -d delay[default: 0 ms]\r\n"
, procname); , procname);
} }
int main(int argc, char *argv[]) int main(int argc, char *argv[])
{ {
int ch, nthreads = 1; int ch, nthreads = 1, timeout = -1;
acl::acl_cpp_init(); acl::acl_cpp_init();
acl::log::stdout_open(true); acl::log::stdout_open(true);
while ((ch = getopt(argc, argv, "hc:n:d:")) > 0) { while ((ch = getopt(argc, argv, "hc:n:d:t:")) > 0) {
switch (ch) { switch (ch) {
case 'h': case 'h':
usage(argv[0]); usage(argv[0]);
@ -119,6 +129,9 @@ int main(int argc, char *argv[])
case 'd': case 'd':
__delay = atoi(optarg); __delay = atoi(optarg);
break; break;
case 't':
timeout = atoi(optarg);
break;
default: default:
break; break;
} }
@ -130,12 +143,12 @@ int main(int argc, char *argv[])
std::vector<acl::thread*> threads; std::vector<acl::thread*> threads;
for (int i = 0; i < nthreads; i++) { for (int i = 0; i < nthreads; i++) {
acl::thread* thr = new consumer(tbox); acl::thread* thr = new consumer(tbox, timeout);
threads.push_back(thr); threads.push_back(thr);
thr->start(); thr->start();
} }
sleep(10); sleep(1);
for (int i = 0; i < nthreads; i++) { for (int i = 0; i < nthreads; i++) {
acl::thread* thr = new producer(tbox); acl::thread* thr = new producer(tbox);
threads.push_back(thr); threads.push_back(thr);
@ -149,8 +162,8 @@ int main(int argc, char *argv[])
delete *it; delete *it;
} }
printf("all over, nloop=%d, producing=%lld, consuming=%lld\r\n", printf("all over, nloop=%d, producing=%lld, consuming=%lld, timedout=%lld\r\n",
__nloop, __producing.value(), __consuming.value()); __nloop, __producing.value(), __consuming.value(), __timedout.value());
return 0; return 0;
} }