fixed 新异步加载linux下退出时hdf5关闭异常;其他小调整

This commit is contained in:
fasiondog 2024-08-31 01:31:25 +08:00
parent d360deb079
commit 9038473eb8
5 changed files with 51 additions and 18 deletions

View File

@ -89,6 +89,14 @@ void GlobalInitializer::clean() {
IndicatorImp::releaseDynEngine();
#if !HKU_OS_WINDOWS
// 主动停止异步数据加载任务组,否则 hdf5 在 linux 下会报关闭异常
auto *tg = StockManager::instance().getLoadTaskGroup();
if (tg) {
tg->stop();
}
#endif
#if HKU_ENABLE_LEAK_DETECT || defined(MSVC_LEAKER_DETECT)
// 非内存泄漏检测时,内存让系统自动释放,避免某些场景下 windows 下退出速度过慢
StockManager::quit();

View File

@ -105,16 +105,23 @@ void StockManager::loadData() {
std::chrono::system_clock::time_point start_time = std::chrono::system_clock::now();
m_data_ready = false;
loadAllHolidays();
ThreadPool tg(2);
tg.submit([this]() { this->loadAllHolidays(); });
tg.submit([this]() { this->loadHistoryFinanceField(); });
tg.submit([this]() { this->loadAllStockTypeInfo(); });
tg.submit([this]() { this->loadAllZhBond10(); });
// loadAllHolidays();
loadAllMarketInfos();
loadAllStockTypeInfo();
// loadAllStockTypeInfo();
loadAllStocks();
loadAllStockWeights();
loadAllZhBond10();
loadHistoryFinanceField();
// loadAllZhBond10();
// loadHistoryFinanceField();
HKU_INFO("Loading block...");
m_blockDriver->load();
// m_blockDriver->load();
tg.submit([this]() { this->m_blockDriver->load(); });
// 获取K线数据驱动并预加载指定的数据
HKU_INFO("Loading KData...");
@ -124,6 +131,8 @@ void StockManager::loadData() {
// 加载K线及历史财务信息
loadAllKData();
tg.join();
std::chrono::duration<double> sec = std::chrono::system_clock::now() - start_time;
HKU_INFO("{:<.2f}s Loaded Data.", sec.count());
}
@ -171,14 +180,16 @@ void StockManager::loadAllKData() {
} else {
// 异步并行加载
std::thread t = std::thread([this, ktypes, low_ktypes]() {
ThreadPool tg(std::thread::hardware_concurrency());
this->m_load_tg = std::make_unique<ThreadPool>();
// ThreadPool tg(std::thread::hardware_concurrency());
for (size_t i = 0, len = ktypes.size(); i < len; i++) {
std::lock_guard<std::mutex> lock(*m_stockDict_mutex);
for (auto iter = m_stockDict.begin(); iter != m_stockDict.end(); ++iter) {
if (m_preloadParam.tryGet<bool>(low_ktypes[i], false)) {
tg.submit([stk = iter->second, ktype = std::move(ktypes[i])]() mutable {
stk.loadKDataToBuffer(ktype);
});
m_load_tg->submit(
[stk = iter->second, ktype = std::move(ktypes[i])]() mutable {
stk.loadKDataToBuffer(ktype);
});
}
}
}
@ -186,11 +197,12 @@ void StockManager::loadAllKData() {
if (m_hikyuuParam.tryGet<bool>("load_history_finance", true)) {
std::lock_guard<std::mutex> lock(*m_stockDict_mutex);
for (auto iter = m_stockDict.begin(); iter != m_stockDict.end(); ++iter) {
tg.submit([stk = iter->second]() { stk.getHistoryFinance(); });
m_load_tg->submit([stk = iter->second]() { stk.getHistoryFinance(); });
}
}
tg.join();
m_load_tg->join();
m_load_tg.reset();
m_data_ready = true;
});
t.detach();

View File

@ -10,8 +10,9 @@
#include <mutex>
#include <thread>
#include "utilities/Parameter.h"
#include "data_driver/DataDriverFactory.h"
#include "hikyuu/utilities/Parameter.h"
#include "hikyuu/utilities/thread/thread.h"
#include "hikyuu/data_driver/DataDriverFactory.h"
#include "Block.h"
#include "MarketInfo.h"
#include "StockTypeInfo.h"
@ -218,6 +219,11 @@ public:
return m_thread_id;
}
/** 仅由程序退出使使用!!! */
ThreadPool* getLoadTaskGroup() {
return m_load_tg.get();
}
public:
typedef StockMapIterator const_iterator;
const_iterator begin() const {
@ -293,6 +299,8 @@ private:
Parameter m_preloadParam;
Parameter m_hikyuuParam;
StrategyContext m_context;
std::unique_ptr<ThreadPool> m_load_tg; // 异步数据加载辅助线程组
};
inline size_t StockManager::size() const {

View File

@ -25,14 +25,18 @@ const size_t SpotAgent::ms_endTagLength = strlen(SpotAgent::ms_endTag);
Datetime SpotAgent::ms_start_rev_time;
void SpotAgent::setQuotationServer(const string& server) {
ms_pubUrl = server;
SpotAgent::SpotAgent() {
m_tg = std::make_unique<ThreadPool>();
}
SpotAgent::~SpotAgent() {
stop();
}
void SpotAgent::setQuotationServer(const string& server) {
ms_pubUrl = server;
}
void SpotAgent::start() {
stop();
if (m_stop) {
@ -138,7 +142,7 @@ void SpotAgent::parseSpotData(const void* buf, size_t buf_len) {
auto spot_record = parseFlatSpot(spot);
if (spot_record) {
for (const auto& process : m_processList) {
m_process_task_list.emplace_back(m_tg.submit(ProcessTask(process, *spot_record)));
m_process_task_list.emplace_back(m_tg->submit(ProcessTask(process, *spot_record)));
}
}
}

View File

@ -27,7 +27,7 @@ namespace hku {
*/
class HKU_API SpotAgent {
public:
SpotAgent() = default;
SpotAgent();
/** 析构函数 */
virtual ~SpotAgent();
@ -109,7 +109,8 @@ private:
int m_revTimeout = 100; // 连接数据服务超时时长(毫秒)
size_t m_batch_count = 0; // 记录本次批次接收的数据数量
std::thread m_receiveThread; // 数据接收线程
ThreadPool m_tg; // 数据处理任务线程池
// ThreadPool m_tg; // 数据处理任务线程池
std::unique_ptr<ThreadPool> m_tg;
vector<std::future<void>> m_process_task_list;
// 下面属性被修改时需要加锁,以便可以使用多线程方式运行 strategy