优化 Strategy, SpotAgent 以便能用以多线程方式执行 strategy

This commit is contained in:
fasiondog 2024-08-20 00:38:00 +08:00
parent a86ba248df
commit b7068e7ed0
4 changed files with 74 additions and 61 deletions

View File

@ -171,68 +171,74 @@ void HKU_API startSpotAgent(bool print) {
agent.setPrintFlag(print);
const auto& preloadParam = sm.getPreloadParameter();
if (preloadParam.tryGet<bool>("min", false)) {
agent.addProcess(std::bind(updateStockMinData, std::placeholders::_1, KQuery::MIN));
}
// 防止调用 stopSpotAgent 后重新 startSpotAgent
static std::atomic_bool g_init_spot_agent{false};
if (!g_init_spot_agent) {
const auto& preloadParam = sm.getPreloadParameter();
if (preloadParam.tryGet<bool>("min", false)) {
agent.addProcess(std::bind(updateStockMinData, std::placeholders::_1, KQuery::MIN));
}
if (preloadParam.tryGet<bool>("day", false)) {
agent.addProcess(updateStockDayData);
}
if (preloadParam.tryGet<bool>("day", false)) {
agent.addProcess(updateStockDayData);
}
if (preloadParam.tryGet<bool>("week", false)) {
agent.addProcess(std::bind(updateStockDayUpData, std::placeholders::_1, KQuery::WEEK));
}
if (preloadParam.tryGet<bool>("week", false)) {
agent.addProcess(std::bind(updateStockDayUpData, std::placeholders::_1, KQuery::WEEK));
}
if (preloadParam.tryGet<bool>("month", false)) {
agent.addProcess(std::bind(updateStockDayUpData, std::placeholders::_1, KQuery::MONTH));
}
if (preloadParam.tryGet<bool>("month", false)) {
agent.addProcess(std::bind(updateStockDayUpData, std::placeholders::_1, KQuery::MONTH));
}
if (preloadParam.tryGet<bool>("quarter", false)) {
agent.addProcess(std::bind(updateStockDayUpData, std::placeholders::_1, KQuery::QUARTER));
}
if (preloadParam.tryGet<bool>("quarter", false)) {
agent.addProcess(
std::bind(updateStockDayUpData, std::placeholders::_1, KQuery::QUARTER));
}
if (preloadParam.tryGet<bool>("halfyear", false)) {
agent.addProcess(std::bind(updateStockDayUpData, std::placeholders::_1, KQuery::HALFYEAR));
}
if (preloadParam.tryGet<bool>("halfyear", false)) {
agent.addProcess(
std::bind(updateStockDayUpData, std::placeholders::_1, KQuery::HALFYEAR));
}
if (preloadParam.tryGet<bool>("year", false)) {
agent.addProcess(std::bind(updateStockDayUpData, std::placeholders::_1, KQuery::YEAR));
}
if (preloadParam.tryGet<bool>("year", false)) {
agent.addProcess(std::bind(updateStockDayUpData, std::placeholders::_1, KQuery::YEAR));
}
if (preloadParam.tryGet<bool>("min5", false)) {
agent.addProcess(std::bind(updateStockMinData, std::placeholders::_1, KQuery::MIN5));
}
if (preloadParam.tryGet<bool>("min5", false)) {
agent.addProcess(std::bind(updateStockMinData, std::placeholders::_1, KQuery::MIN5));
}
if (preloadParam.tryGet<bool>("min15", false)) {
agent.addProcess(std::bind(updateStockMinData, std::placeholders::_1, KQuery::MIN15));
}
if (preloadParam.tryGet<bool>("min15", false)) {
agent.addProcess(std::bind(updateStockMinData, std::placeholders::_1, KQuery::MIN15));
}
if (preloadParam.tryGet<bool>("min30", false)) {
agent.addProcess(std::bind(updateStockMinData, std::placeholders::_1, KQuery::MIN30));
}
if (preloadParam.tryGet<bool>("min30", false)) {
agent.addProcess(std::bind(updateStockMinData, std::placeholders::_1, KQuery::MIN30));
}
if (preloadParam.tryGet<bool>("min60", false)) {
agent.addProcess(std::bind(updateStockMinData, std::placeholders::_1, KQuery::MIN60));
}
if (preloadParam.tryGet<bool>("min3", false)) {
agent.addProcess(std::bind(updateStockMinData, std::placeholders::_1, KQuery::MIN3));
}
if (preloadParam.tryGet<bool>("min60", false)) {
agent.addProcess(std::bind(updateStockMinData, std::placeholders::_1, KQuery::MIN60));
}
if (preloadParam.tryGet<bool>("min3", false)) {
agent.addProcess(std::bind(updateStockMinData, std::placeholders::_1, KQuery::MIN3));
}
if (preloadParam.tryGet<bool>("hour2", false)) {
agent.addProcess(std::bind(updateStockMinData, std::placeholders::_1, KQuery::HOUR2));
}
if (preloadParam.tryGet<bool>("hour2", false)) {
agent.addProcess(std::bind(updateStockMinData, std::placeholders::_1, KQuery::HOUR2));
}
if (preloadParam.tryGet<bool>("hour4", false)) {
agent.addProcess(std::bind(updateStockMinData, std::placeholders::_1, KQuery::HOUR4));
}
if (preloadParam.tryGet<bool>("hour4", false)) {
agent.addProcess(std::bind(updateStockMinData, std::placeholders::_1, KQuery::HOUR4));
}
if (preloadParam.tryGet<bool>("hour6", false)) {
agent.addProcess(std::bind(updateStockMinData, std::placeholders::_1, KQuery::HOUR6));
}
if (preloadParam.tryGet<bool>("hour6", false)) {
agent.addProcess(std::bind(updateStockMinData, std::placeholders::_1, KQuery::HOUR6));
}
if (preloadParam.tryGet<bool>("hour12", false)) {
agent.addProcess(std::bind(updateStockMinData, std::placeholders::_1, KQuery::HOUR12));
if (preloadParam.tryGet<bool>("hour12", false)) {
agent.addProcess(std::bind(updateStockMinData, std::placeholders::_1, KQuery::HOUR12));
}
}
agent.start();

View File

@ -215,21 +215,25 @@ void SpotAgent::work_thread() {
void SpotAgent::addProcess(std::function<void(const SpotRecord&)> process) {
HKU_CHECK(m_stop, "SpotAgent is running, please stop agent first!");
std::lock_guard<std::mutex> lock(m_mutex);
m_processList.push_back(process);
}
void SpotAgent::addPostProcess(std::function<void(Datetime)> func) {
HKU_CHECK(m_stop, "SpotAgent is running, please stop agent first!");
std::lock_guard<std::mutex> lock(m_mutex);
m_postProcessList.push_back(func);
}
void SpotAgent::clearProcessList() {
HKU_CHECK(m_stop, "SpotAgent is running, please stop agent first!");
std::lock_guard<std::mutex> lock(m_mutex);
m_processList.clear();
}
void SpotAgent::clearPostProcessList() {
HKU_CHECK(m_stop, "SpotAgent is running, please stop agent first!");
std::lock_guard<std::mutex> lock(m_mutex);
m_postProcessList.clear();
}

View File

@ -45,6 +45,7 @@ public:
/** 设置是否打印数据接收进展情况,主要用于在交互环境下关闭打印 */
void setPrintFlag(bool print) {
std::lock_guard<std::mutex> lock(m_mutex);
m_print = print;
}
@ -104,14 +105,18 @@ private:
enum STATUS { WAITING, RECEIVING }; // 等待新的批次数据,正在接收批次数据中
enum STATUS m_status = WAITING; // 当前内部状态
std::atomic_bool m_stop = true; // 结束代理工作标识
bool m_print = true; // 是否打印接收进度,防止的交互模式的影响
int m_revTimeout = 100; // 连接数据服务超时时长(毫秒)
size_t m_batch_count = 0; // 记录本次批次接收的数据数量
std::thread m_receiveThread; // 数据接收线程
ThreadPool m_tg; // 数据处理任务线程池
vector<std::future<void>> m_process_task_list;
// 下面属性被修改时需要加锁,以便可以使用多线程方式运行 strategy
std::mutex m_mutex;
bool m_print = true; // 是否打印接收进度,防止的交互模式的影响
list<std::function<void(const SpotRecord&)>> m_processList; // 已注册的 spot 处理函数列表
list<std::function<void(Datetime)>> m_postProcessList; // 已注册的批次后处理函数列表
vector<std::future<void>> m_process_task_list;
};
} // namespace hku

View File

@ -71,19 +71,17 @@ void Strategy::run() {
StockManager& sm = StockManager::instance();
// 非独立进程方式运行 Stratege 或 重复执行,则直接返回
if (sm.thread_id() == std::this_thread::get_id()) {
return;
// sm 尚未初始化,则初始化
if (sm.thread_id() == std::thread::id()) {
// 注册 ctrl-c 终止信号
std::signal(SIGINT, sig_handler);
CLS_INFO("{} is running! You can press Ctrl-C to terminte ...", m_name);
// 初始化
hikyuu_init(m_config_file, false, m_context);
}
// 注册 ctrl-c 终止信号
std::signal(SIGINT, sig_handler);
CLS_INFO("{} is running! You can press Ctrl-C to terminte ...", m_name);
// 初始化
hikyuu_init(m_config_file, false, m_context);
// 先将行情接收代理停止,以便后面加入处理函数
stopSpotAgent();