Merge pull request #342 from ChrisCatCP/master

refactor wait_group
This commit is contained in:
郑树新 2024-06-26 13:45:46 +08:00 committed by GitHub
commit fc6a460252
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 59 additions and 25 deletions

View File

@ -1,4 +1,5 @@
#pragma once
#include "acl_cpp/stdlib/atomic.hpp"
namespace acl {
@ -9,12 +10,12 @@ public:
wait_group(void);
~wait_group(void);
void add(size_t n);
void add(int n);
void done(void);
size_t wait(void);
void wait(void);
private:
size_t count_;
atomic_long state_;
fiber_tbox<unsigned long>* box_;
};

View File

@ -6,7 +6,7 @@ namespace acl {
wait_group::wait_group(void)
{
count_ = 0;
state_ = 0;
box_ = new acl::fiber_tbox<unsigned long>;
}
@ -15,13 +15,30 @@ wait_group::~wait_group(void)
delete box_;
}
void wait_group::add(size_t n)
void wait_group::add(int n)
{
count_ += n;
long long state = state_.add_fetch((long long)n << 32);
//高32位为任务数量
int c = (int)(state >> 32);
//低32位为等待者数量
uint32_t w = (uint32_t)state;
//count不能小于0
if(c < 0){
acl_msg_fatal("wait_group: negative wait_group counter");
}
void wait_group::done(void)
{
if(w != 0 && n > 0 && c == n){
acl_msg_fatal("wait_group: add called concurrently with wait");
}
if(c > 0 || w ==0){
return;
}
//检查state是否被修改
if(state_ != state){
acl_msg_fatal("wait_group: add called concurrently with wait");
}
//这里count为0了清空state并唤醒所有等待者
state_ = 0;
for (size_t i = 0; i < w; i++) {
#ifdef _DEBUG
unsigned long* tid = new unsigned long;
*tid = acl::thread::self();
@ -30,11 +47,23 @@ void wait_group::done(void)
box_->push(NULL);
#endif
}
}
size_t wait_group::wait(void)
void wait_group::done(void)
{
size_t i;
for (i = 0; i < count_; i++) {
add(-1);
}
void wait_group::wait(void)
{
for(;;){
long long state = state_;
int c = (int)(state >> 32);
uint32_t w = (uint32_t)state;
//没有任务直接返回
if(c == 0) return;
//等待者数量加一失败的话重新获取state
if(state_.cas(state, state + 1) == state){
bool found;
#ifdef _DEBUG
unsigned long* tid = box_->pop(-1, &found);
@ -44,8 +73,12 @@ size_t wait_group::wait(void)
(void) box_->pop(-1, &found);
assert(found);
#endif
if(state_ != 0){
acl_msg_fatal("wait_group: wait_group is reused before previous wait has returned");
}
return;
}
}
return i;
}
} // namespace acl

View File

@ -71,7 +71,7 @@ private:
// @override
void run(void) {
size_t ret = sync_.wait();
printf("All threads and fibers were done, ret=%zd\r\n", ret);
printf("All threads and fibers were done\r\n");
}
};