delete StealTask (无法解决在Python中的使用)

This commit is contained in:
fasiondog 2020-05-09 22:49:01 +08:00
parent be548312b0
commit 7c221d8624
12 changed files with 0 additions and 885 deletions

View File

@ -1,48 +0,0 @@
/*
* StealMasterQueue.cpp
*
* Copyright (c) hikyuu.org
*
* Created on: 2020-4-27
* Author: fasiondog
*/
#include "StealMasterQueue.h"
namespace hku {
// 将任务放入队列尾部
void StealMasterQueue::push(const StealTaskPtr& item) {
std::lock_guard<std::mutex> lk(m_mutex);
m_queue.push(item);
m_cond.notify_one();
}
// 工作线程等待并弹出头部任务,队列为空时,工作线程阻塞休眠并等待唤醒
StealTaskPtr StealMasterQueue::wait_and_pop() {
std::unique_lock<std::mutex> lk(m_mutex);
m_cond.wait(lk, [this] { return !m_queue.empty(); });
auto result = m_queue.front();
m_queue.pop();
return result;
}
// 尝试从队列头部弹出任务,如果队列为空则弹出空指针,不阻塞线程
StealTaskPtr StealMasterQueue::try_pop() {
std::lock_guard<std::mutex> lk(m_mutex);
StealTaskPtr result;
if (m_queue.empty()) {
return result;
}
result = m_queue.front();
m_queue.pop();
return result;
}
// 队列是否为空
bool StealMasterQueue::empty() const {
std::lock_guard<std::mutex> lk(m_mutex);
return m_queue.empty();
}
} // namespace hku

View File

@ -1,54 +0,0 @@
/*
* StealMasterQueue.h
*
* Copyright (c) hikyuu.org
*
* Created on: 2020-4-27
* Author: fasiondog
*/
#pragma once
#ifndef HIKYUU_UTILITIES_TASK_STEAL_MASTER_QUEUE_H
#define HIKYUU_UTILITIES_TASK_STEAL_MASTER_QUEUE_H
#include <queue>
#include <mutex>
#include "StealTaskBase.h"
namespace hku {
/**
* 线
* @ingroup TaskGroup
*/
class StealMasterQueue {
public:
StealMasterQueue() = default;
/**
*
* @param item
*/
void push(const StealTaskPtr& item);
/**
* 线线
* @return
*/
StealTaskPtr wait_and_pop();
/** 尝试从队列头部弹出任务,如果队列为空则弹出空指针,不阻塞线程 */
StealTaskPtr try_pop();
/** 队列是否为空 */
bool empty() const;
private:
mutable std::mutex m_mutex;
std::queue<StealTaskPtr> m_queue;
std::condition_variable m_cond;
};
} /* namespace hku */
#endif /* HIKYUU_UTILITIES_TASK_STEAL_MASTER_QUEUE_H */

View File

@ -1,58 +0,0 @@
/*
* StealRunnerQueue.h
*
* Copyright (c) hikyuu.org
*
* Created on: 2020-4-27
* Author: fasiondog
*/
#include "StealRunnerQueue.h"
namespace hku {
/* 将数据插入队列头部 */
void StealRunnerQueue::push_front(const StealTaskPtr& task) {
std::lock_guard<std::mutex> lock(m_mutex);
m_queue.push_front(task);
}
/* 将数据插入队列尾部 */
void StealRunnerQueue::push_back(const StealTaskPtr& task) {
std::lock_guard<std::mutex> lock(m_mutex);
m_queue.push_back(task);
}
/* 队列是否为空 */
bool StealRunnerQueue::empty() const {
std::lock_guard<std::mutex> lock(m_mutex);
return m_queue.empty();
}
/* 尝试从队列头部弹出一条数数据, 如果失败返回空指针 */
StealTaskPtr StealRunnerQueue::try_pop() {
std::lock_guard<std::mutex> lock(m_mutex);
StealTaskPtr result;
if (m_queue.empty()) {
return result;
}
result = m_queue.front();
m_queue.pop_front();
return result;
}
/* 尝试从队列尾部偷取一条数据,失败返回空指针 */
StealTaskPtr StealRunnerQueue::try_steal() {
std::lock_guard<std::mutex> lock(m_mutex);
StealTaskPtr result;
if (m_queue.empty()) {
return result;
}
result = m_queue.back();
m_queue.pop_back();
return result;
}
} /* namespace hku */

View File

@ -1,75 +0,0 @@
/*
* StealRunnerQueue.h
*
* Copyright (c) hikyuu.org
*
* Created on: 2020-4-26
* Author: fasiondog
*/
#pragma once
#ifndef HIKYUU_UTILITIES_TASK_STEAL_RUNNER_QUEUE_H
#define HIKYUU_UTILITIES_TASK_STEAL_RUNNER_QUEUE_H
#include <deque>
#include <mutex>
#include "StealTaskBase.h"
namespace hku {
/**
* 线
* @ingroup TaskGroup
*/
class StealRunnerQueue {
public:
/** 构造函数 */
StealRunnerQueue() = default;
~StealRunnerQueue() = default;
// 禁用赋值构造和赋值重载
StealRunnerQueue(const StealRunnerQueue& other) = delete;
StealRunnerQueue& operator=(const StealRunnerQueue& other) = delete;
/**
*
* @param task
*/
void push_front(const StealTaskPtr& task);
/**
*
* @param task
*/
void push_back(const StealTaskPtr& task);
/** 队列是否为空 */
bool empty() const;
/** 队列当前大小 */
size_t size() const {
return m_queue.size();
}
/**
*
* @param res
* @return
*/
StealTaskPtr try_pop();
/**
*
* @param res
* @return
*/
StealTaskPtr try_steal();
private:
std::deque<StealTaskPtr> m_queue;
mutable std::mutex m_mutex;
};
} /* namespace hku */
#endif /* HIKYUU_UTILITIES_TASK_STEAL_RUNNER_QUEUE_H */

View File

@ -1,51 +0,0 @@
/*
* StealTaskBase.cpp
*
* Created on: 2010-3-19
* Author: fasiondog
*/
#include <iostream>
#include "../../Log.h"
#include "../exception.h"
#include "StealTaskBase.h"
#include "StealTaskRunner.h"
#include "StealTaskGroup.h"
namespace hku {
StealTaskBase::StealTaskBase() : m_done(false) {}
StealTaskBase::~StealTaskBase() {}
void StealTaskBase::join() {
if (m_done) {
return;
}
if (StealTaskRunner::m_local_runner) {
// 当前在子线程中
StealTaskRunner::m_local_runner->taskJoin(shared_from_this());
} else {
// 当前在主线程中
HKU_CHECK(m_group, "This taks had not be added to any task group!");
m_group->taskJoinInMaster(shared_from_this());
}
}
void StealTaskBase::invoke() {
try {
run();
} catch (std::exception& e) {
HKU_ERROR(e.what());
} catch (...) {
HKU_ERROR("Unknown error in task running!");
}
m_done = true;
}
void StealTaskBase::run() {
HKU_WARN("This is empty task!");
}
} // namespace hku

View File

@ -1,88 +0,0 @@
/*
* StealTaskBase.h
*
* Created on: 2010-3-19
* Author: fasiondog
*/
#pragma once
#ifndef STEALTASKBASE_H_
#define STEALTASKBASE_H_
#ifndef HKU_API
#define HKU_API
#endif
#include <thread>
#include <memory>
namespace hku {
class StealTaskRunner;
class StealTaskGroup;
/**
*
* @ingroup TaskGroup
*/
class HKU_API StealTaskBase : public std::enable_shared_from_this<StealTaskBase> {
friend class StealTaskRunner;
friend class StealTaskGroup;
public:
StealTaskBase();
virtual ~StealTaskBase();
/**
*
*/
virtual void run();
/**
*
*/
bool done() const {
return m_done;
}
/**
*
*/
void join();
private:
// StealTaskRunner 实际执行任务
void invoke();
void setTaskGroup(StealTaskGroup *group) {
m_group = group;
}
private:
mutable bool m_done; // 标记该任务是否已执行完毕
StealTaskGroup *m_group; // 任务组指针
};
typedef std::shared_ptr<StealTaskBase> StealTaskPtr;
typedef std::vector<StealTaskPtr> StealTaskList;
typedef std::shared_ptr<StealTaskList> StealTaskListPtr;
typedef StealTaskBase TaskBase;
typedef StealTaskPtr TaskPtr;
typedef StealTaskList TaskList;
typedef StealTaskListPtr TaskListPtr;
/**
* 线
*/
class StopTask : public StealTaskBase {
public:
StopTask() {}
virtual ~StopTask() {}
void run(){};
};
} // namespace hku
#endif /* STEALTASKBASE_H_ */

View File

@ -1,152 +0,0 @@
/*
* StealTaskGroup.cpp
*
* Created on: 2010-3-19
* Author: fasiondog
*/
#include <iostream>
#include "../../Log.h"
#include "../exception.h"
#include "StealTaskGroup.h"
namespace hku {
StealTaskGroup::StealTaskGroup(size_t groupSize) : m_currentRunnerId(0), m_done(false) {
m_runnerNum = (groupSize != 0) ? groupSize : std::thread::hardware_concurrency();
m_runnerList.reserve(m_runnerNum);
m_master_queue = std::make_shared<StealMasterQueue>();
// 创建工作线程队列
for (auto i = 0; i < m_runnerNum; i++) {
m_runner_queues.push_back(std::make_shared<StealRunnerQueue>());
}
// 创建工作线程
for (size_t i = 0; i < m_runnerNum; i++) {
m_runnerList.push_back(std::make_shared<StealTaskRunner>(this, i));
}
// 启动各个工作线程
for (auto runnerIter = m_runnerList.begin(); runnerIter != m_runnerList.end(); ++runnerIter) {
(*runnerIter)->start();
}
}
StealTaskGroup::~StealTaskGroup() {
if (!m_done) {
join();
}
}
StealTaskRunnerPtr StealTaskGroup::getCurrentRunner() {
StealTaskRunnerPtr result = m_runnerList[m_currentRunnerId];
m_currentRunnerId++;
if (m_currentRunnerId >= m_runnerNum) {
m_currentRunnerId = 0;
}
return result;
}
StealTaskPtr StealTaskGroup::addTask(const StealTaskPtr& task, bool inMain) {
HKU_CHECK(task, "Input task is nullptr!");
task->setTaskGroup(this);
// 如果指定不在主线程且当前线程有效时,插入到当前线性任务队列头部
// 否则,放入主线程队列
if (!inMain && StealTaskRunner::m_local_queue) {
StealTaskRunner::m_local_queue->push_front(task);
} else {
m_master_queue->push(task);
}
return task;
}
void StealTaskGroup::join() {
if (m_done) {
return;
}
// 向主任务队列插入“停止”任务
std::vector<StealTaskPtr> stopTaskList;
for (auto i = 0; i < m_runnerNum; i++) {
auto stopTask = std::make_shared<StopTask>();
m_master_queue->push(stopTask);
}
// 等待“停止”任务被执行
for (auto& task : stopTaskList) {
task->join();
}
m_done = true;
RunnerList::iterator runnerIter;
for (runnerIter = m_runnerList.begin(); runnerIter != m_runnerList.end(); ++runnerIter) {
(*runnerIter)->join();
}
}
void StealTaskGroup::stop() {
if (m_done) {
return;
}
// 指示各子线程需要停止
for (auto runnerIter = m_runnerList.begin(); runnerIter != m_runnerList.end(); ++runnerIter) {
(*runnerIter)->stop();
}
// 同时向主任务队列插入任务停止标识,以便唤醒休眠状态的子线程
for (auto i = 0; i < m_runnerNum; i++) {
m_master_queue->push(std::make_shared<StopTask>());
}
m_done = true;
}
void StealTaskGroup::taskJoinInMaster(const StealTaskPtr& waitingFor) {
while (waitingFor && !waitingFor->done()) {
auto task = m_master_queue->try_pop();
if (task && !task->done()) {
task->invoke();
} else {
stealInMaster(waitingFor);
}
std::this_thread::yield();
}
}
void StealTaskGroup::stealInMaster(const StealTaskPtr& waitingFor) {
StealTaskPtr task;
#ifdef _WIN32
std::srand((unsigned)time(NULL));
#else
struct timeval tstart;
struct timezone tz;
gettimeofday(&tstart, &tz);
size_t temp = tstart.tv_usec;
std::srand(temp);
#endif
size_t total = m_runnerNum;
size_t ran_num = std::rand() % total;
for (size_t i = 0; i < total; i++) {
if (waitingFor && waitingFor->done()) {
return;
}
task = m_runner_queues[ran_num]->try_steal();
if (task && !task->done()) {
task->invoke();
return;
}
ran_num = (ran_num + 1) % total;
}
}
} // namespace hku

View File

@ -1,85 +0,0 @@
/*
* StealTaskGroup.h
*
* Created on: 2010-3-19
* Author: fasiondog
*/
#pragma once
#ifndef HKU_UTILITIES_STEALTASKGROUP_H_
#define HKU_UTILITIES_STEALTASKGROUP_H_
#include "StealMasterQueue.h"
#include "StealTaskRunner.h"
#include "StealRunnerQueue.h"
namespace hku {
/**
*
* @ingroup TaskGroup
*/
class HKU_API StealTaskGroup {
friend class StealTaskRunner;
public:
/**
*
* @param taskCount -
* @param groupSize - 线 0CPU数量的线程数
* @return
*/
StealTaskGroup(size_t groupSize = 0);
/**
*
*/
virtual ~StealTaskGroup();
/**
* 线
*/
size_t size() const {
return m_runnerNum;
}
StealTaskRunnerPtr getCurrentRunner();
//增加一个任务
StealTaskPtr addTask(const StealTaskPtr& task, bool inMain = true);
//所有任务都已加入指示
void stop();
//强制终止
// void cancel();
//等待执行结束
void join();
bool done() const {
return m_done;
}
void taskJoinInMaster(const StealTaskPtr& waitingFor);
void stealInMaster(const StealTaskPtr& waitingFor);
private:
typedef std::vector<StealTaskRunnerPtr> RunnerList;
RunnerList m_runnerList;
size_t m_runnerNum;
size_t m_currentRunnerId; //记录当前执行addTask任务时需放入的TaskRunnerId用于均衡任务分配
bool m_done; // 任务组执行结束标志
std::shared_ptr<StealMasterQueue> m_master_queue; // 主任务队列
std::vector<std::shared_ptr<StealRunnerQueue>> m_runner_queues; // 任务队列(每个工作线程一个)
};
typedef std::shared_ptr<StealTaskGroup> StealTaskGroupPtr;
typedef StealTaskGroup TaskGroup;
typedef StealTaskGroupPtr TaskGroupPtr;
} // namespace hku
#endif /* HKU_UTILITIES_STEALTASKGROUP_H_ */

View File

@ -1,132 +0,0 @@
/*
* StealTaskRunner.cpp
*
* Created on: 2010-3-19
* Author: fasiondog
*/
#ifdef _WIN32
#include <ctime>
#endif
#include <functional>
#include <iostream>
#include "../../Log.h"
#include "StealTaskRunner.h"
#include "StealTaskGroup.h"
#define QUEUE_LOCK std::lock_guard<std::mutex> lock(m_queue_mutex);
namespace hku {
StealTaskRunner::StealTaskRunner(StealTaskGroup* group, size_t id) : m_done(false) {
m_index = id;
m_group = group;
}
StealTaskRunner::~StealTaskRunner() {}
// 尝试从自己的任务队列中提取一个任务,供自己执行
StealTaskPtr StealTaskRunner::takeTaskFromLocal() {
return m_local_queue->try_pop();
}
// 阻塞等待直至从主线程任务队列中获取到任务
StealTaskPtr StealTaskRunner::takeTaskFromMasterAndWait() {
return m_local_group->m_master_queue->wait_and_pop();
}
// 尝试从其他子线程任务队列中偷取任务
StealTaskPtr StealTaskRunner::takeTaskFromOther() {
StealTaskPtr task;
auto total = m_local_group->m_runnerNum;
for (size_t i = 0; i < total; ++i) {
size_t index = (m_local_index + i + 1) % total;
task = m_local_group->m_runner_queues[index]->try_steal();
if (task && typeid(*task) != typeid(StopTask)) {
return task;
}
}
return task;
}
/**
* 线
*/
void StealTaskRunner::start() {
m_thread = std::thread(std::bind(&StealTaskRunner::run, this));
}
/**
* 线
*/
void StealTaskRunner::join() {
if (m_thread.joinable()) {
m_thread.join();
}
}
void StealTaskRunner::stop() {
m_done = true;
}
/**
* 线
*/
void StealTaskRunner::run() {
m_thread_id = std::this_thread::get_id();
m_local_group = m_group;
m_local_runner = this;
m_local_queue = m_local_group->m_runner_queues[m_index].get();
m_local_index = m_index;
m_locla_need_stop = false;
StealTaskPtr task;
try {
while (!m_done && (!task || typeid(*task) != typeid(StopTask))) {
// 从本地队列中获取待执行任务
task = takeTaskFromLocal();
// 如果本地队列中没有取到任务,则尝试从其他子线程队列中偷取任务
if (!task) {
task = takeTaskFromOther();
}
// 如果本地和其他子线程任务队列中都无法获取任务,则等待并直至从主任务队列中获取任务
if (!task) {
task = takeTaskFromMasterAndWait();
}
if (task && !task->done()) {
task->invoke();
}
std::this_thread::yield();
}
} catch (std::exception& e) {
HKU_ERROR(e.what());
} catch (...) {
HKU_ERROR("Unknown error!");
}
}
/**
* 线
* @param waitingFor -
*/
void StealTaskRunner::taskJoin(const StealTaskPtr& waitingFor) {
while (waitingFor && !waitingFor->done()) {
// 如果获取的任务有效且任务未执行,则执行该任务;
// 否则从其他子线程任务队列中进行偷取
auto task = takeTaskFromLocal();
if (!task) {
task = takeTaskFromOther();
}
if (task && !task->done()) {
task->invoke();
}
std::this_thread::yield();
}
}
} // namespace hku

View File

@ -1,81 +0,0 @@
/*
* StealTaskRunner.h
*
* Created on: 2010-3-19
* Author: fasiondog
*/
#pragma once
#ifndef STEALTASKRUNNER_H_
#define STEALTASKRUNNER_H_
#include <thread>
#include <mutex>
#include <atomic>
#include <deque>
#include "StealTaskBase.h"
#include "StealRunnerQueue.h"
namespace hku {
class StealTaskGroup;
/**
*
* @ingroup TaskGroup
*/
class StealTaskRunner {
friend class StealTaskGroup;
friend class StealTaskBase;
public:
/**
* 线
* @param group
* @param id id
* @param stopTask 线
*/
StealTaskRunner(StealTaskGroup* group, size_t id);
virtual ~StealTaskRunner();
private:
StealTaskPtr takeTaskFromLocal(); // 从自己的任务队列中获取任务
StealTaskPtr takeTaskFromMasterAndWait(); // 等待直至从主线程队列中获取任务
StealTaskPtr takeTaskFromOther(); // 从其他线程队列偷取任务
void start();
void join();
void run();
void taskJoin(const StealTaskPtr& waitingFor);
void stop();
StealTaskGroup* getTaskRunnerGroup() {
return m_group;
}
std::thread::id get_thread_id() const {
return m_thread_id;
}
private:
size_t m_index; // 表示在任务组中的第几个线程
StealTaskGroup* m_group; // 所属任务组的指针
inline static thread_local StealRunnerQueue* m_local_queue = nullptr; //本地任务队列
inline static thread_local StealTaskGroup* m_local_group = nullptr; //任务组指针
inline static thread_local StealTaskRunner* m_local_runner = nullptr; // 记录自身runner指针
inline static thread_local size_t m_local_index = 0; // 在任务组中的序号(m_index)
inline static thread_local bool m_locla_need_stop = false; // 线程停止运行指示
std::atomic_bool m_done; //// 线程终止指示
std::thread m_thread; // 本地工作线程
std::thread::id m_thread_id; // 线程id
};
typedef std::shared_ptr<StealTaskRunner> StealTaskRunnerPtr;
} // namespace hku
#endif /* STEALTASKRUNNER_H_ */

View File

@ -1,60 +0,0 @@
/*
* test_Parameter.cpp
*
* Created on: 2020-4-26
* Author: fasiondog
*/
#include "doctest/doctest.h"
#include <hikyuu/Log.h>
#include <hikyuu/utilities/SpendTimer.h>
#include <hikyuu/utilities/task/StealTaskBase.h>
#include <hikyuu/utilities/task/StealTaskGroup.h>
using namespace hku;
/**
* @defgroup test_hikyuu_TaskGroup test_hikyuu_TaskGroup
* @ingroup test_hikyuu_utilities
* @{
*/
class TestTask : public TaskBase {
public:
TestTask(int i) : m_i(i) {}
virtual ~TestTask() = default;
virtual void run() {
HKU_INFO("{}: ------------------- [{}]", m_i, std::this_thread::get_id());
// fmt::print("{}: ----------------------\n", m_i);
}
private:
int m_i;
};
/** @par 检测点 */
TEST_CASE("test_TaskGroup") {
{
std::vector<TaskPtr> task_list;
{
SPEND_TIME(test_TaskGroup);
TaskGroup tg(8);
HKU_INFO("worker_num: {}", tg.size());
HKU_INFO("main thread: {}", std::this_thread::get_id());
for (int i = 0; i < 100; i++) {
task_list.push_back(tg.addTask(std::make_shared<TestTask>(i)));
// CHECK(!task_list[i]->done());
}
tg.join();
}
CHECK(task_list.size() == 100);
for (auto& task : task_list) {
CHECK(task->done());
}
}
// tg.run();
}
/** @} */

View File

@ -85,7 +85,6 @@ target("small-test")
end
-- add files
add_files("./hikyuu/utilities/test_TaskGroup.cpp");
add_files("./hikyuu/utilities/test_ThreadPool.cpp");
--add_files("./hikyuu/hikyuu/**.cpp");
--add_files("./hikyuu/hikyuu/test_StockManager.cpp");