diff --git a/hikyuu_cpp/hikyuu/GlobalInitializer.cpp b/hikyuu_cpp/hikyuu/GlobalInitializer.cpp index 5da311f9..a01a3905 100644 --- a/hikyuu_cpp/hikyuu/GlobalInitializer.cpp +++ b/hikyuu_cpp/hikyuu/GlobalInitializer.cpp @@ -89,6 +89,14 @@ void GlobalInitializer::clean() { IndicatorImp::releaseDynEngine(); +#if !HKU_OS_WINDOWS + // 主动停止异步数据加载任务组,否则 hdf5 在 linux 下会报关闭异常 + auto *tg = StockManager::instance().getLoadTaskGroup(); + if (tg) { + tg->stop(); + } +#endif + #if HKU_ENABLE_LEAK_DETECT || defined(MSVC_LEAKER_DETECT) // 非内存泄漏检测时,内存让系统自动释放,避免某些场景下 windows 下退出速度过慢 StockManager::quit(); diff --git a/hikyuu_cpp/hikyuu/StockManager.cpp b/hikyuu_cpp/hikyuu/StockManager.cpp index 822e5513..d97f0b63 100644 --- a/hikyuu_cpp/hikyuu/StockManager.cpp +++ b/hikyuu_cpp/hikyuu/StockManager.cpp @@ -105,16 +105,23 @@ void StockManager::loadData() { std::chrono::system_clock::time_point start_time = std::chrono::system_clock::now(); m_data_ready = false; - loadAllHolidays(); + ThreadPool tg(2); + tg.submit([this]() { this->loadAllHolidays(); }); + tg.submit([this]() { this->loadHistoryFinanceField(); }); + tg.submit([this]() { this->loadAllStockTypeInfo(); }); + tg.submit([this]() { this->loadAllZhBond10(); }); + + // loadAllHolidays(); loadAllMarketInfos(); - loadAllStockTypeInfo(); + // loadAllStockTypeInfo(); loadAllStocks(); loadAllStockWeights(); - loadAllZhBond10(); - loadHistoryFinanceField(); + // loadAllZhBond10(); + // loadHistoryFinanceField(); HKU_INFO("Loading block..."); - m_blockDriver->load(); + // m_blockDriver->load(); + tg.submit([this]() { this->m_blockDriver->load(); }); // 获取K线数据驱动并预加载指定的数据 HKU_INFO("Loading KData..."); @@ -124,6 +131,8 @@ void StockManager::loadData() { // 加载K线及历史财务信息 loadAllKData(); + tg.join(); + std::chrono::duration sec = std::chrono::system_clock::now() - start_time; HKU_INFO("{:<.2f}s Loaded Data.", sec.count()); } @@ -171,14 +180,16 @@ void StockManager::loadAllKData() { } else { // 异步并行加载 std::thread t = std::thread([this, ktypes, low_ktypes]() { - ThreadPool tg(std::thread::hardware_concurrency()); + this->m_load_tg = std::make_unique(); + // ThreadPool tg(std::thread::hardware_concurrency()); for (size_t i = 0, len = ktypes.size(); i < len; i++) { std::lock_guard lock(*m_stockDict_mutex); for (auto iter = m_stockDict.begin(); iter != m_stockDict.end(); ++iter) { if (m_preloadParam.tryGet(low_ktypes[i], false)) { - tg.submit([stk = iter->second, ktype = std::move(ktypes[i])]() mutable { - stk.loadKDataToBuffer(ktype); - }); + m_load_tg->submit( + [stk = iter->second, ktype = std::move(ktypes[i])]() mutable { + stk.loadKDataToBuffer(ktype); + }); } } } @@ -186,11 +197,12 @@ void StockManager::loadAllKData() { if (m_hikyuuParam.tryGet("load_history_finance", true)) { std::lock_guard lock(*m_stockDict_mutex); for (auto iter = m_stockDict.begin(); iter != m_stockDict.end(); ++iter) { - tg.submit([stk = iter->second]() { stk.getHistoryFinance(); }); + m_load_tg->submit([stk = iter->second]() { stk.getHistoryFinance(); }); } } - tg.join(); + m_load_tg->join(); + m_load_tg.reset(); m_data_ready = true; }); t.detach(); diff --git a/hikyuu_cpp/hikyuu/StockManager.h b/hikyuu_cpp/hikyuu/StockManager.h index 33a78103..38b7136b 100644 --- a/hikyuu_cpp/hikyuu/StockManager.h +++ b/hikyuu_cpp/hikyuu/StockManager.h @@ -10,8 +10,9 @@ #include #include -#include "utilities/Parameter.h" -#include "data_driver/DataDriverFactory.h" +#include "hikyuu/utilities/Parameter.h" +#include "hikyuu/utilities/thread/thread.h" +#include "hikyuu/data_driver/DataDriverFactory.h" #include "Block.h" #include "MarketInfo.h" #include "StockTypeInfo.h" @@ -218,6 +219,11 @@ public: return m_thread_id; } + /** 仅由程序退出使使用!!! */ + ThreadPool* getLoadTaskGroup() { + return m_load_tg.get(); + } + public: typedef StockMapIterator const_iterator; const_iterator begin() const { @@ -293,6 +299,8 @@ private: Parameter m_preloadParam; Parameter m_hikyuuParam; StrategyContext m_context; + + std::unique_ptr m_load_tg; // 异步数据加载辅助线程组 }; inline size_t StockManager::size() const { diff --git a/hikyuu_cpp/hikyuu/global/agent/SpotAgent.cpp b/hikyuu_cpp/hikyuu/global/agent/SpotAgent.cpp index 7c2eb084..818a757a 100644 --- a/hikyuu_cpp/hikyuu/global/agent/SpotAgent.cpp +++ b/hikyuu_cpp/hikyuu/global/agent/SpotAgent.cpp @@ -25,14 +25,18 @@ const size_t SpotAgent::ms_endTagLength = strlen(SpotAgent::ms_endTag); Datetime SpotAgent::ms_start_rev_time; -void SpotAgent::setQuotationServer(const string& server) { - ms_pubUrl = server; +SpotAgent::SpotAgent() { + m_tg = std::make_unique(); } SpotAgent::~SpotAgent() { stop(); } +void SpotAgent::setQuotationServer(const string& server) { + ms_pubUrl = server; +} + void SpotAgent::start() { stop(); if (m_stop) { @@ -138,7 +142,7 @@ void SpotAgent::parseSpotData(const void* buf, size_t buf_len) { auto spot_record = parseFlatSpot(spot); if (spot_record) { for (const auto& process : m_processList) { - m_process_task_list.emplace_back(m_tg.submit(ProcessTask(process, *spot_record))); + m_process_task_list.emplace_back(m_tg->submit(ProcessTask(process, *spot_record))); } } } diff --git a/hikyuu_cpp/hikyuu/global/agent/SpotAgent.h b/hikyuu_cpp/hikyuu/global/agent/SpotAgent.h index cafc4b4e..aaa703b3 100644 --- a/hikyuu_cpp/hikyuu/global/agent/SpotAgent.h +++ b/hikyuu_cpp/hikyuu/global/agent/SpotAgent.h @@ -27,7 +27,7 @@ namespace hku { */ class HKU_API SpotAgent { public: - SpotAgent() = default; + SpotAgent(); /** 析构函数 */ virtual ~SpotAgent(); @@ -109,7 +109,8 @@ private: int m_revTimeout = 100; // 连接数据服务超时时长(毫秒) size_t m_batch_count = 0; // 记录本次批次接收的数据数量 std::thread m_receiveThread; // 数据接收线程 - ThreadPool m_tg; // 数据处理任务线程池 + // ThreadPool m_tg; // 数据处理任务线程池 + std::unique_ptr m_tg; vector> m_process_task_list; // 下面属性被修改时需要加锁,以便可以使用多线程方式运行 strategy