stealtask (continue)

This commit is contained in:
fasiondog 2020-04-30 01:50:27 +08:00
parent 785e515475
commit 899569941a
10 changed files with 226 additions and 212 deletions

View File

@ -14,26 +14,29 @@
namespace hku {
StealTaskBase::StealTaskBase() {
m_done = false;
m_runner = NULL;
}
StealTaskBase::StealTaskBase() : m_done(false) {}
StealTaskBase::~StealTaskBase() {}
void StealTaskBase::join() {
HKU_CHECK(m_group, "This taks had not be added to any task group!");
auto runner = m_group->getRunnerByThreadId(std::this_thread::get_id());
runner->taskJoin(shared_from_this());
/*if (m_runner) {
m_runner->taskJoin(shared_from_this());
if (StealTaskRunner::m_local_runner) {
// 当前在子线程中
StealTaskRunner::m_local_runner->taskJoin(shared_from_this());
} else {
HKU_ERROR("Invalid runner!");
}*/
// 当前在主线程中
HKU_CHECK(m_group, "This taks had not be added to any task group!");
m_group->taskJoinInMaster(shared_from_this());
}
}
void StealTaskBase::invoke() {
run();
try {
run();
} catch (std::exception& e) {
HKU_ERROR(e.what());
} catch (...) {
HKU_ERROR("Unknown error in task runnin!");
}
m_done = true;
}

View File

@ -54,18 +54,12 @@ private:
// StealTaskRunner 实际执行任务
void invoke();
// StealTaskGroup 设置
void setTaskRunner(StealTaskRunner *runner) {
m_runner = runner;
}
void setTaskGroup(StealTaskGroup *group) {
m_group = group;
}
private:
mutable bool m_done; // 标记该任务是否已执行完毕
mutable StealTaskRunner *m_runner;
mutable bool m_done; // 标记该任务是否已执行完毕
StealTaskGroup *m_group; // 任务组指针
};
@ -78,6 +72,17 @@ typedef StealTaskPtr TaskPtr;
typedef StealTaskList TaskList;
typedef StealTaskListPtr TaskListPtr;
/**
* 线
*/
class StopTask : public StealTaskBase {
public:
StopTask() {}
virtual ~StopTask() {}
void run(){};
};
} // namespace hku
#endif /* STEALTASKBASE_H_ */

View File

@ -12,17 +12,6 @@
namespace hku {
/**
*
*/
class StopTask : public StealTaskBase {
public:
StopTask() {}
virtual ~StopTask() {}
void run(){};
};
/**
* 线StopTask
*/
@ -34,24 +23,22 @@ public:
virtual ~WatchTask() {}
void run() {
const StealTaskList& taskList = _pTaskGroup->getTaskList();
/*const StealTaskList& taskList = _pTaskGroup->getTaskList();
size_t total = taskList.size();
for (size_t i = 0; i < total; i++) {
taskList[i]->join();
}
}*/
_pTaskGroup->cancel();
//_pTaskGroup->cancel();
}
private:
StealTaskGroup* _pTaskGroup;
};
StealTaskGroup::StealTaskGroup(size_t groupSize) {
StealTaskGroup::StealTaskGroup(size_t groupSize) : m_currentRunnerId(0), m_done(false) {
m_runnerNum = (groupSize != 0) ? groupSize : std::thread::hardware_concurrency();
m_runnerList.reserve(m_runnerNum);
_stopTask = StealTaskPtr(new StopTask());
_currentRunnerId = 0;
// 创建工作线程队列
for (auto i = 0; i < m_runnerNum; i++) {
@ -60,33 +47,26 @@ StealTaskGroup::StealTaskGroup(size_t groupSize) {
// 创建工作线程
for (size_t i = 0; i < m_runnerNum; i++) {
m_runnerList.push_back(std::make_shared<StealTaskRunner>(this, i, _stopTask));
m_runnerList.push_back(std::make_shared<StealTaskRunner>(this, i));
}
// 启动各个工作线程
for (auto runnerIter = m_runnerList.begin(); runnerIter != m_runnerList.end(); ++runnerIter) {
(*runnerIter)->start();
m_thread_runner_map[(*runnerIter)->get_thread_id()] = *runnerIter;
}
}
StealTaskGroup::~StealTaskGroup() {}
StealTaskRunnerPtr StealTaskGroup::getRunner(size_t id) {
StealTaskRunnerPtr result;
if (id >= m_runnerNum) {
std::cerr << "[StealTaskGroup::getRunner] Invalid id: " << id << std::endl;
return result;
StealTaskGroup::~StealTaskGroup() {
if (!m_done) {
join();
}
return m_runnerList[id];
}
StealTaskRunnerPtr StealTaskGroup::getCurrentRunner() {
StealTaskRunnerPtr result = m_runnerList[_currentRunnerId];
_currentRunnerId++;
if (_currentRunnerId >= m_runnerNum) {
_currentRunnerId = 0;
StealTaskRunnerPtr result = m_runnerList[m_currentRunnerId];
m_currentRunnerId++;
if (m_currentRunnerId >= m_runnerNum) {
m_currentRunnerId = 0;
}
return result;
}
@ -94,43 +74,113 @@ StealTaskRunnerPtr StealTaskGroup::getCurrentRunner() {
StealTaskPtr StealTaskGroup::addTask(const StealTaskPtr& task) {
HKU_CHECK(task, "Input task is nullptr!");
task->setTaskGroup(this);
if (StealTaskRunner::m_local_queue) {
// 如果是在子线程中执行增加任务,则直接插入子任务队列的头部
StealTaskRunner::m_local_queue->push_front(task);
} else {
m_master_queue.push(task);
// 如果是在主线程中增加任务,则轮流插入主线程和各子线程任务队列的尾部
if (m_currentRunnerId >= m_runnerNum) {
m_master_queue.push(task);
} else {
m_runner_queues[m_currentRunnerId]->push_back(task);
}
m_currentRunnerId++;
if (m_currentRunnerId > m_runnerNum) {
m_currentRunnerId = 0;
}
}
_taskList.push_back(task);
StealTaskRunnerPtr runner = getCurrentRunner();
task->setTaskRunner(runner.get());
runner->putTask(task);
return task;
}
void StealTaskGroup::cancel() {
RunnerList::iterator runnerIter;
for (runnerIter = m_runnerList.begin(); runnerIter != m_runnerList.end(); runnerIter++) {
(*runnerIter)->putTask(_stopTask);
}
}
void StealTaskGroup::join() {
if (m_done) {
return;
}
// 向主任务队列插入“停止”任务
std::vector<StealTaskPtr> stopTaskList;
for (auto i = 0; i < m_runnerNum; i++) {
auto stopTask = std::make_shared<StopTask>();
m_master_queue.push(stopTask);
}
// 等待“停止”任务被执行
for (auto& task : stopTaskList) {
task->join();
}
m_done = true;
RunnerList::iterator runnerIter;
for (runnerIter = m_runnerList.begin(); runnerIter != m_runnerList.end(); runnerIter++) {
for (runnerIter = m_runnerList.begin(); runnerIter != m_runnerList.end(); ++runnerIter) {
(*runnerIter)->join();
}
}
void StealTaskGroup::stop() {
StealTaskPtr watchTask(new WatchTask(this));
StealTaskRunnerPtr runner = getCurrentRunner();
watchTask->setTaskRunner(runner.get());
runner->putWatchTask(watchTask);
if (m_done) {
return;
}
m_done = true;
// 指示各子线程需要停止
for (auto runnerIter = m_runnerList.begin(); runnerIter != m_runnerList.end(); ++runnerIter) {
(*runnerIter)->stop();
}
// 同时向主任务队列插入任务停止标识,以便唤醒休眠状态的子线程
for (auto i = 0; i < m_runnerNum; i++) {
m_master_queue.push(std::make_shared<StopTask>());
}
}
void StealTaskGroup::run() {
stop();
join();
void StealTaskGroup::taskJoinInMaster(const StealTaskPtr& waitingFor) {
while (!waitingFor->done()) {
auto task = m_master_queue.try_pop();
if (task && !task->done()) {
task->invoke();
} else {
// steal(waitingFor);
}
}
}
void StealTaskGroup::stealInMaster(const StealTaskPtr& waitingFor) {
StealTaskPtr task;
#ifdef _WIN32
std::srand((unsigned)time(NULL));
#else
struct timeval tstart;
struct timezone tz;
gettimeofday(&tstart, &tz);
size_t temp = tstart.tv_usec;
std::srand(temp);
#endif
size_t total = m_runnerNum;
size_t ran_num = std::rand() % total;
for (size_t i = 0; i < total; i++) {
if (waitingFor && waitingFor->done()) {
break;
}
task = m_runner_queues[ran_num]->try_steal();
if (task) {
break;
}
std::this_thread::yield();
ran_num = (ran_num + 1) % total;
}
if (task && !task->done()) {
task->invoke();
}
}
} // namespace hku

View File

@ -43,11 +43,6 @@ public:
return m_runnerNum;
}
StealTaskRunnerPtr getRunnerByThreadId(std::thread::id thread_id) {
return m_thread_runner_map[thread_id];
}
StealTaskRunnerPtr getRunner(size_t id);
StealTaskRunnerPtr getCurrentRunner();
//增加一个任务
@ -57,28 +52,23 @@ public:
void stop();
//强制终止
void cancel();
// void cancel();
//等待执行结束
void join();
void run();
const StealTaskList& getTaskList() const {
return _taskList;
}
void taskJoinInMaster(const StealTaskPtr& waitingFor);
void stealInMaster(const StealTaskPtr& waitingFor);
private:
typedef std::vector<StealTaskRunnerPtr> RunnerList;
RunnerList m_runnerList;
StealTaskList _taskList;
size_t m_runnerNum;
StealTaskPtr _stopTask;
size_t _currentRunnerId; //记录当前执行addTask任务时需放入的TaskRunnerid用于均衡任务分配
size_t m_currentRunnerId; //记录当前执行addTask任务时需放入的TaskRunnerId用于均衡任务分配
bool m_done; // 任务组执行结束标志
StealMasterQueue m_master_queue; // 主任务队列
std::vector<std::shared_ptr<StealRunnerQueue>> m_runner_queues; // 任务队列(每个工作线程一个)
std::unordered_map<std::thread::id, StealTaskRunnerPtr> m_thread_runner_map;
};
typedef std::shared_ptr<StealTaskGroup> StealTaskGroupPtr;

View File

@ -19,61 +19,35 @@
namespace hku {
StealTaskRunner::StealTaskRunner(StealTaskGroup* group, size_t id, StealTaskPtr stopTask) {
StealTaskRunner::StealTaskRunner(StealTaskGroup* group, size_t id) : m_done(false) {
m_index = id;
m_group = group;
_stopTask = stopTask;
}
StealTaskRunner::~StealTaskRunner() {}
// 加入一个普通任务,将其放入私有队列的后端
void StealTaskRunner::putTask(const StealTaskPtr& task) {
QUEUE_LOCK;
m_queue.push_back(task);
// 尝试从自己的任务队列中提取一个任务,供自己执行
StealTaskPtr StealTaskRunner::takeTaskFromLocal() {
return m_local_queue->try_pop();
}
/**
*
* @param task -
*/
void StealTaskRunner::putWatchTask(const StealTaskPtr& task) {
QUEUE_LOCK;
m_queue.push_front(task);
// 阻塞等待直至从主线程任务队列中获取到任务
StealTaskPtr StealTaskRunner::takeTaskFromMasterAndWait() {
return m_local_group->m_master_queue.wait_and_pop();
}
/**
*
* @return StealTaskPtr -
*/
StealTaskPtr StealTaskRunner::takeTaskBySelf() {
QUEUE_LOCK;
StealTaskPtr result;
if (!m_queue.empty()) {
result = m_queue.back();
m_queue.pop_back();
}
return result;
}
/**
* 线使
* @return StealTaskPtr -
*/
StealTaskPtr StealTaskRunner::takeTaskByOther() {
QUEUE_LOCK;
StealTaskPtr result;
if (!m_queue.empty()) {
StealTaskPtr front = m_queue.front();
//如果提取的任务是停止任务,则放弃并返回空
if (front != _stopTask) {
result = front;
m_queue.pop_front();
// 尝试从其他子线程任务队列中偷取任务
StealTaskPtr StealTaskRunner::takeTaskFromOther() {
StealTaskPtr task;
auto total = m_local_group->m_runnerNum;
for (size_t i = 0; i < total; ++i) {
size_t index = (m_local_index + i + 1) % total;
task = m_local_group->m_runner_queues[index]->try_steal();
if (task && typeid(*task) != typeid(StopTask)) {
return task;
}
}
return result;
return task;
}
/**
@ -87,87 +61,73 @@ void StealTaskRunner::start() {
* 线
*/
void StealTaskRunner::join() {
m_thread.join();
if (m_thread.joinable()) {
m_thread.join();
}
}
void StealTaskRunner::stop() {
// 设置结束表示,同时在本地任务队列头部插入停止任务
m_done = true;
m_local_queue->push_front(std::make_shared<StopTask>());
}
/**
*
* 线
*/
void StealTaskRunner::run() {
m_thread_id = std::this_thread::get_id();
m_local_queue = m_group->m_runner_queues[m_index].get();
m_local_group = m_group;
m_local_runner = this;
m_local_queue = m_local_group->m_runner_queues[m_index].get();
m_local_index = m_index;
m_locla_need_stop = false;
StealTaskPtr task;
try {
while (task != _stopTask) {
task = takeTaskBySelf();
if (task) {
if (!task->done()) {
task->invoke();
}
while (!m_done && (!task || typeid(*task) != typeid(StopTask))) {
// 从本地队列中获取待执行任务
task = takeTaskFromLocal();
} else {
steal(StealTaskPtr());
// 如果本地队列中没有取到任务,则尝试从其他子线程队列中偷取任务
if (!task) {
task = takeTaskFromOther();
}
}
HKU_INFO("{} local size: {}", std::this_thread::get_id(), m_local_queue->size());
// 如果本地和其他子线程任务队列中都无法获取任务,则等待并直至从主任务队列中获取任务
if (!task) {
task = takeTaskFromMasterAndWait();
}
if (!task->done()) {
task->invoke();
}
std::this_thread::yield();
}
} catch (std::exception& e) {
HKU_ERROR(e.what());
} catch (...) {
std::cerr << "[TaskRunner::run] Some error!" << std::endl;
HKU_ERROR("Unknown error!");
}
}
/**
*
* 线
* @param waitingFor -
*/
void StealTaskRunner::taskJoin(const StealTaskPtr& waitingFor) {
while (!waitingFor->done()) {
StealTaskPtr task = takeTaskBySelf();
if (task) {
if (!task->done()) {
task->invoke();
}
} else {
steal(waitingFor);
}
}
}
void StealTaskRunner::steal(const StealTaskPtr& waitingFor) {
StealTaskPtr task;
#ifdef _WIN32
std::srand((unsigned)time(NULL));
#else
struct timeval tstart;
struct timezone tz;
gettimeofday(&tstart, &tz);
size_t temp = tstart.tv_usec;
std::srand(temp);
#endif
size_t total = m_group->size();
size_t ran_num = std::rand() % total;
for (size_t i = 0; i < total; i++) {
StealTaskRunnerPtr tr = m_group->getRunner(ran_num);
if (waitingFor && waitingFor->done()) {
break;
while (waitingFor && !waitingFor->done()) {
// 如果获取的任务有效且任务未执行,则执行该任务;
// 否则从其他子线程任务队列中进行偷取
auto task = takeTaskFromLocal();
if (!task) {
task = takeTaskFromOther();
}
if (tr && tr.get() != this) {
task = tr->takeTaskByOther();
if (task) {
break;
}
std::this_thread::yield();
ran_num = (ran_num + 1) % total;
if (task && !task->done()) {
task->invoke();
}
}
if (task && !task->done()) {
task->invoke();
std::this_thread::yield();
}
}

View File

@ -9,9 +9,10 @@
#ifndef STEALTASKRUNNER_H_
#define STEALTASKRUNNER_H_
#include <thread>
#include <mutex>
#include <atomic>
#include <deque>
#include <list>
#include "StealTaskBase.h"
#include "StealRunnerQueue.h"
@ -28,27 +29,28 @@ class StealTaskRunner {
friend class StealTaskBase;
public:
StealTaskRunner(StealTaskGroup* group, size_t id, StealTaskPtr stopTask);
/**
* 线
* @param group
* @param id id
* @param stopTask 线
*/
StealTaskRunner(StealTaskGroup* group, size_t id);
virtual ~StealTaskRunner();
private:
/**
*
* @param task
*/
void putTask(const StealTaskPtr&);
void putWatchTask(const StealTaskPtr&);
StealTaskPtr takeTaskBySelf();
StealTaskPtr takeTaskByOther();
StealTaskPtr takeTaskFromLocal(); // 从自己的任务队列中获取任务
StealTaskPtr takeTaskFromMasterAndWait(); // 等待直至从主线程队列中获取任务
StealTaskPtr takeTaskFromOther(); // 从其他线程队列偷取任务
void start();
void join();
void run();
void steal(const StealTaskPtr& waitingFor);
void taskJoin(const StealTaskPtr& waitingFor);
void stop();
StealTaskGroup* getTaskRunnerGroup() {
return m_group;
}
@ -60,19 +62,16 @@ private:
private:
size_t m_index; // 表示在任务组中的第几个线程
StealTaskGroup* m_group; // 所属任务组的指针
StealTaskPtr _stopTask;
inline static thread_local StealRunnerQueue* m_local_queue = nullptr; //本地任务队列
inline static thread_local StealTaskGroup* m_local_group = nullptr; //任务组指针
inline static thread_local StealTaskRunner* m_local_runner = nullptr; // 记录自身runner指针
inline static thread_local size_t m_local_index = 0; // 在任务组中的序号(m_index)
inline static thread_local bool m_locla_need_stop = false; // 线程停止运行指示
std::atomic_bool m_done; //// 线程终止指示
std::thread m_thread; // 本地工作线程
std::thread::id m_thread_id; // 线程id
// 线程内工作任务队列
std::mutex m_queue_mutex;
typedef std::deque<StealTaskPtr> Queue;
Queue m_queue;
};
typedef std::shared_ptr<StealTaskRunner> StealTaskRunnerPtr;

View File

@ -10,7 +10,7 @@
#include "doctest/doctest.h"
#include <hikyuu/utilities/thread/ThreadPool.h>
#include <hikyuu/utilities/SpendTimer.h>
#include <fmt/format.h>
#include <hikyuu/Log.h>
using namespace hku;
@ -25,8 +25,8 @@ TEST_CASE("test_ThreadPool") {
{
SPEND_TIME(test_temp);
ThreadPool tg;
for (int i = 0; i < 10; i++) {
tg.submit([=]() { std::cout << fmt::format("{}---------------------\n", i); });
for (int i = 0; i < 100; i++) {
tg.submit([=]() { fmt::print("{}: ----------------------\n", i); });
}
tg.join();
}

View File

@ -7,6 +7,7 @@
#include "doctest/doctest.h"
#include <hikyuu/Log.h>
#include <hikyuu/utilities/SpendTimer.h>
#include <hikyuu/utilities/task/StealTaskBase.h>
#include <hikyuu/utilities/task/StealTaskGroup.h>
@ -25,6 +26,7 @@ public:
virtual void run() {
HKU_INFO("{}: *****************", m_i);
// fmt::print("{}: ----------------------\n", m_i);
}
private:
@ -33,11 +35,15 @@ private:
/** @par 检测点 */
TEST_CASE("test_TaskGroup") {
TaskGroup tg;
for (int i = 0; i < 30; i++) {
tg.addTask(std::make_shared<TestTask>(i));
{
SPEND_TIME(test_temp);
TaskGroup tg(6);
for (int i = 0; i < 100; i++) {
tg.addTask(std::make_shared<TestTask>(i));
}
tg.join();
}
tg.run();
// tg.run();
}
/** @} */

View File

@ -86,6 +86,7 @@ target("small-test")
-- add files
add_files("./hikyuu/utilities/test_TaskGroup.cpp");
add_files("./hikyuu/utilities/test_ThreadPool.cpp");
--add_files("./hikyuu/hikyuu/**.cpp");
--add_files("./hikyuu/hikyuu/test_StockManager.cpp");
add_files("./hikyuu/test_main.cpp")

View File

@ -10,7 +10,7 @@ if is_mode("debug") then
else
set_configvar("LOG_ACTIVE_LEVEL", 2) -- 激活的日志级别
end
set_configvar("USE_SPDLOG_LOGGER", 1) -- 是否使用spdlog作为日志输出
set_configvar("USE_SPDLOG_LOGGER", 0) -- 是否使用spdlog作为日志输出
set_configvar("USE_SPDLOG_ASYNC_LOGGER", 0) -- 使用异步的spdlog
set_configvar("CHECK_ACCESS_BOUND", 1)
set_configvar("SUPPORT_SERIALIZATION", is_mode("release") and 1 or 0)