/* * StockManager.cpp * * Created on: 2011-11-9 * Author: fasiondog */ #ifndef NOMINMAX #define NOMINMAX #endif #include "GlobalInitializer.h" #include #include #include #include #include "utilities/IniParser.h" #include "utilities/util.h" #include "utilities/thread/ThreadPool.h" #include "StockManager.h" #include "global/GlobalTaskGroup.h" #include "global/schedule/inner_tasks.h" #include "data_driver/kdata/cvs/KDataTempCsvDriver.h" #include "data_driver/base_info/sqlite/SQLiteBaseInfoDriver.h" #include "data_driver/base_info/mysql/MySQLBaseInfoDriver.h" #include "data_driver/block_info/qianlong/QLBlockInfoDriver.h" #include "data_driver/kdata/hdf5/H5KDataDriver.h" #include "data_driver/kdata/tdx/TdxKDataDriver.h" #include "data_driver/kdata/mysql/MySQLKDataDriver.h" namespace hku { StockManager* StockManager::m_sm = nullptr; void StockManager::quit() { if (m_sm) { delete m_sm; m_sm = nullptr; } } StockManager::StockManager() : m_initializing(false) { m_stockDict_mutex = new std::mutex; m_marketInfoDict_mutex = new std::mutex; m_stockTypeInfo_mutex = new std::mutex; m_holidays_mutex = new std::mutex; } StockManager::~StockManager() { delete m_stockDict_mutex; delete m_marketInfoDict_mutex; delete m_stockTypeInfo_mutex; delete m_holidays_mutex; fmt::print("Quit Hikyuu system!\n\n"); } StockManager& StockManager::instance() { if (!m_sm) { m_sm = new StockManager(); } return (*m_sm); } Parameter default_preload_param() { Parameter param; param.set("day", true); param.set("week", false); param.set("month", false); param.set("quarter", false); param.set("halfyear", false); param.set("year", false); param.set("min", false); param.set("min5", false); param.set("min15", false); param.set("min30", false); param.set("min60", false); param.set("ticks", false); param.set("day_max", 100000); param.set("week_max", 100000); param.set("month_max", 100000); param.set("quarter_max", 100000); param.set("halfyear_max", 100000); param.set("year_max", 100000); param.set("min_max", 5120); param.set("min5_max", 5120); param.set("min15_max", 5120); param.set("min30_max", 5120); param.set("min60_max", 5120); param.set("ticks_max", 5120); return param; } Parameter default_other_param() { Parameter param; param.set("tmpdir", "."); param.set("logger", ""); return param; } void StockManager::init(const Parameter& baseInfoParam, const Parameter& blockParam, const Parameter& kdataParam, const Parameter& preloadParam, const Parameter& hikyuuParam, const StrategyContext& context) { HKU_WARN_IF_RETURN(m_initializing, void(), "The last initialization has not finished. Please try again later!"); m_initializing = true; m_thread_id = std::this_thread::get_id(); m_baseInfoDriverParam = baseInfoParam; m_blockDriverParam = blockParam; m_kdataDriverParam = kdataParam; m_preloadParam = preloadParam; m_hikyuuParam = hikyuuParam; m_context = context; // 获取路径信息 m_tmpdir = hikyuuParam.tryGet("tmpdir", "."); m_datadir = hikyuuParam.tryGet("datadir", "."); m_stockDict.clear(); m_marketInfoDict.clear(); m_stockTypeInfo.clear(); string funcname(" [StockManager::init]"); //加载证券基本信息 m_baseInfoDriver = DataDriverFactory::getBaseInfoDriver(baseInfoParam); HKU_CHECK(m_baseInfoDriver, "Failed get base info driver!"); loadAllHolidays(); loadAllMarketInfos(); loadAllStockTypeInfo(); loadAllStocks(); loadAllStockWeights(); //获取板块驱动 m_blockDriver = DataDriverFactory::getBlockDriver(blockParam); //获取K线数据驱动并预加载指定的数据 HKU_INFO("Loading KData..."); std::chrono::system_clock::time_point start_time = std::chrono::system_clock::now(); setKDataDriver(DataDriverFactory::getKDataDriverPool(m_kdataDriverParam)); // add special Market, for temp csv file m_marketInfoDict["TMP"] = MarketInfo("TMP", "Temp Csv file", "temp load from csv file", "000001", Null(), TimeDelta(0), TimeDelta(0), TimeDelta(0), TimeDelta(0)); std::chrono::duration sec = std::chrono::system_clock::now() - start_time; HKU_INFO("{:<.2f}s Loaded Data.", sec.count()); m_initializing = false; } void StockManager::setKDataDriver(const KDataDriverConnectPoolPtr& driver) { HKU_ERROR_IF_RETURN(!driver, void(), "kdata driver is null!"); if (m_kdataDriverParam != driver->getPrototype()->getParameter()) { m_kdataDriverParam = driver->getPrototype()->getParameter(); } bool preload_day = m_preloadParam.tryGet("day", false); HKU_INFO_IF(preload_day, "Preloading all day kdata to buffer!"); bool preload_week = m_preloadParam.tryGet("week", false); HKU_INFO_IF(preload_week, "Preloading all week kdata to buffer!"); bool preload_month = m_preloadParam.tryGet("month", false); HKU_INFO_IF(preload_month, "Preloading all month kdata to buffer!"); bool preload_quarter = m_preloadParam.tryGet("quarter", false); HKU_INFO_IF(preload_quarter, "Preloading all quarter kdata to buffer!"); bool preload_halfyear = m_preloadParam.tryGet("halfyear", false); HKU_INFO_IF(preload_halfyear, "Preloading all halfyear kdata to buffer!"); bool preload_year = m_preloadParam.tryGet("year", false); HKU_INFO_IF(preload_year, "Preloading all year kdata to buffer!"); bool preload_min = m_preloadParam.tryGet("min", false); HKU_INFO_IF(preload_min, "Preloading all 1 min kdata to buffer!"); bool preload_min5 = m_preloadParam.tryGet("min5", false); HKU_INFO_IF(preload_min5, "Preloading all 5 min kdata to buffer!"); bool preload_min15 = m_preloadParam.tryGet("min15", false); HKU_INFO_IF(preload_min15, "Preloading all 15 min kdata to buffer!"); bool preload_min30 = m_preloadParam.tryGet("min30", false); HKU_INFO_IF(preload_min30, "Preloading all 30 min kdata to buffer!"); bool preload_min60 = m_preloadParam.tryGet("min60", false); HKU_INFO_IF(preload_min60, "Preloading all 60 min kdata to buffer!"); if (!driver->getPrototype()->canParallelLoad()) { for (auto iter = m_stockDict.begin(); iter != m_stockDict.end(); ++iter) { if (iter->second.market() == "TMP") continue; iter->second.setKDataDriver(driver); if (preload_day) iter->second.loadKDataToBuffer(KQuery::DAY); if (preload_week) iter->second.loadKDataToBuffer(KQuery::WEEK); if (preload_month) iter->second.loadKDataToBuffer(KQuery::MONTH); if (preload_quarter) iter->second.loadKDataToBuffer(KQuery::QUARTER); if (preload_halfyear) iter->second.loadKDataToBuffer(KQuery::HALFYEAR); if (preload_year) iter->second.loadKDataToBuffer(KQuery::YEAR); if (preload_min) iter->second.loadKDataToBuffer(KQuery::MIN); if (preload_min5) iter->second.loadKDataToBuffer(KQuery::MIN5); if (preload_min15) iter->second.loadKDataToBuffer(KQuery::MIN15); if (preload_min30) iter->second.loadKDataToBuffer(KQuery::MIN30); if (preload_min60) iter->second.loadKDataToBuffer(KQuery::MIN60); } } else { // 异步并行加载 auto& tg = *getGlobalTaskGroup(); for (auto iter = m_stockDict.begin(); iter != m_stockDict.end(); ++iter) { if (iter->second.market() == "TMP") continue; iter->second.setKDataDriver(driver); if (preload_day) tg.submit([=]() mutable { iter->second.loadKDataToBuffer(KQuery::DAY); }); if (preload_week) tg.submit([=]() mutable { iter->second.loadKDataToBuffer(KQuery::WEEK); }); if (preload_month) tg.submit([=]() mutable { iter->second.loadKDataToBuffer(KQuery::MONTH); }); if (preload_quarter) tg.submit([=]() mutable { iter->second.loadKDataToBuffer(KQuery::QUARTER); }); if (preload_halfyear) tg.submit([=]() mutable { iter->second.loadKDataToBuffer(KQuery::HALFYEAR); }); if (preload_year) tg.submit([=]() mutable { iter->second.loadKDataToBuffer(KQuery::YEAR); }); if (preload_min) tg.submit([=]() mutable { iter->second.loadKDataToBuffer(KQuery::MIN); }); if (preload_min5) tg.submit([=]() mutable { iter->second.loadKDataToBuffer(KQuery::MIN5); }); if (preload_min15) tg.submit([=]() mutable { iter->second.loadKDataToBuffer(KQuery::MIN15); }); if (preload_min30) tg.submit([=]() mutable { iter->second.loadKDataToBuffer(KQuery::MIN30); }); if (preload_min60) tg.submit([=]() mutable { iter->second.loadKDataToBuffer(KQuery::MIN60); }); } } initInnerTasek(); } void StockManager::reload() { loadAllHolidays(); loadAllMarketInfos(); loadAllStockTypeInfo(); loadAllStocks(); loadAllStockWeights(); HKU_INFO("start reload kdata to buffer"); std::vector can_not_parallel_stk_list; // 记录不支持并行加载的Stock { auto* tg = getGlobalTaskGroup(); std::lock_guard lock(*m_stockDict_mutex); for (auto iter = m_stockDict.begin(); iter != m_stockDict.end(); ++iter) { auto driver = iter->second.getKDataDirver(); if (!driver->getPrototype()->canParallelLoad()) { can_not_parallel_stk_list.push_back(iter->second); continue; } auto& ktype_list = KQuery::getAllKType(); for (auto& ktype : ktype_list) { if (iter->second.isBuffer(ktype)) { tg->submit([=]() mutable { Stock& stk = iter->second; stk.loadKDataToBuffer(ktype); }); } } } } for (auto& stk : can_not_parallel_stk_list) { auto& ktype_list = KQuery::getAllKType(); for (auto& ktype : ktype_list) { if (stk.isBuffer(ktype)) { stk.loadKDataToBuffer(ktype); } } } } string StockManager::tmpdir() const { return m_tmpdir; } string StockManager::datadir() const { return m_datadir; } Stock StockManager::getStock(const string& querystr) const { Stock result; string query_str = querystr; to_upper(query_str); std::lock_guard lock(*m_stockDict_mutex); auto iter = m_stockDict.find(query_str); return (iter != m_stockDict.end()) ? iter->second : result; } MarketInfo StockManager::getMarketInfo(const string& market) const { MarketInfo result; string market_tmp = market; to_upper(market_tmp); std::lock_guard lock(*m_marketInfoDict_mutex); auto iter = m_marketInfoDict.find(market_tmp); if (iter != m_marketInfoDict.end()) { result = iter->second; } else { result = m_baseInfoDriver->getMarketInfo(market_tmp); if (result != Null()) { m_marketInfoDict[market_tmp] = result; } } return result; } StockTypeInfo StockManager::getStockTypeInfo(uint32_t type) const { StockTypeInfo result; std::lock_guard lock(*m_stockTypeInfo_mutex); auto iter = m_stockTypeInfo.find(type); if (iter != m_stockTypeInfo.end()) { result = iter->second; } else { result = m_baseInfoDriver->getStockTypeInfo(type); if (result != Null()) { m_stockTypeInfo[type] = result; } } return result; } MarketList StockManager::getAllMarket() const { MarketList result; std::lock_guard lock(*m_marketInfoDict_mutex); auto iter = m_marketInfoDict.begin(); for (; iter != m_marketInfoDict.end(); ++iter) { result.push_back(iter->first); } return result; } Block StockManager::getBlock(const string& category, const string& name) { return m_blockDriver ? m_blockDriver->getBlock(category, name) : Block(); } BlockList StockManager::getBlockList(const string& category) { return m_blockDriver ? m_blockDriver->getBlockList(category) : BlockList(); } BlockList StockManager::getBlockList() { return m_blockDriver ? m_blockDriver->getBlockList() : BlockList(); } DatetimeList StockManager::getTradingCalendar(const KQuery& query, const string& market) { auto marketinfo = getMarketInfo(market); return getStock(fmt::format("{}{}", marketinfo.market(), marketinfo.code())) .getDatetimeList(query); } Stock StockManager::addTempCsvStock(const string& code, const string& day_filename, const string& min_filename, price_t tick, price_t tickValue, int precision, size_t minTradeNumber, size_t maxTradeNumber) { string new_code(code); to_upper(new_code); Stock result("TMP", new_code, day_filename, STOCKTYPE_TMP, true, Datetime(199901010000), Null(), tick, tickValue, precision, minTradeNumber, maxTradeNumber); Parameter param; param.set("type", "TMPCSV"); auto driver_pool = DataDriverFactory::getKDataDriverPool(param); auto driver = driver_pool->getPrototype(); KDataTempCsvDriver* p = dynamic_cast(driver.get()); p->setDayFileName(day_filename); p->setMinFileName(min_filename); result.setKDataDriver(driver_pool); const auto& preload_param = getPreloadParameter(); if (preload_param.tryGet("day", true)) { result.loadKDataToBuffer(KQuery::DAY); } if (preload_param.tryGet("min", false)) { result.loadKDataToBuffer(KQuery::MIN); } return addStock(result) ? result : Null(); } void StockManager::removeTempCsvStock(const string& code) { string query_str = "TMP" + code; to_upper(query_str); auto iter = m_stockDict.find(query_str); if (iter != m_stockDict.end()) { m_stockDict.erase(iter); } } bool StockManager::addStock(const Stock& stock) { string market_code(stock.market_code()); to_upper(market_code); std::lock_guard lock(*m_stockDict_mutex); HKU_ERROR_IF_RETURN(m_stockDict.find(market_code) != m_stockDict.end(), false, "The stock had exist! {}", market_code); m_stockDict[market_code] = stock; return true; } void StockManager::loadAllStocks() { HKU_INFO("Loading stock information..."); vector stockInfos; if (m_context.isAll()) { stockInfos = m_baseInfoDriver->getAllStockInfo(); } else { const vector& context_stock_code_list = m_context.getStockCodeList(); auto all_market = getAllMarket(); for (auto stkcode : context_stock_code_list) { to_upper(stkcode); bool find = false; for (auto& market : all_market) { auto pos = stkcode.find(market); if (pos != string::npos && market.size() <= stkcode.size()) { string stk_market = stkcode.substr(pos, market.size()); string stk_code = stkcode.substr(market.size(), stkcode.size()); stockInfos.push_back(m_baseInfoDriver->getStockInfo(stk_market, stk_code)); find = true; break; } } HKU_WARN_IF(!find, "Invalid stock code: {}", stkcode); } } std::lock_guard lock(*m_stockDict_mutex); for (auto& info : stockInfos) { Datetime startDate, endDate; try { startDate = Datetime(info.startDate * 10000LL); } catch (...) { startDate = Null(); } try { endDate = Datetime(info.endDate * 10000LL); } catch (...) { endDate = Null(); } string market_code = format("{}{}", info.market, info.code); to_upper(market_code); auto iter = m_stockDict.find(market_code); if (iter == m_stockDict.end()) { Stock stock(info.market, info.code, info.name, info.type, info.valid, startDate, endDate, info.tick, info.tickValue, info.precision, info.minTradeNumber, info.maxTradeNumber); m_stockDict[market_code] = stock; } else { Stock& stock = iter->second; if (!stock.m_data) { stock.m_data = shared_ptr( new Stock::Data(info.market, info.code, info.name, info.type, info.valid, startDate, endDate, info.tick, info.tickValue, info.precision, info.minTradeNumber, info.maxTradeNumber)); } else { stock.m_data->m_market = info.market; stock.m_data->m_code = info.code; stock.m_data->m_name = info.name; stock.m_data->m_type = info.type; stock.m_data->m_valid = info.valid; stock.m_data->m_startDate = startDate; stock.m_data->m_lastDate = endDate; stock.m_data->m_tick = info.tick; stock.m_data->m_tickValue = info.tickValue; stock.m_data->m_precision = info.precision; stock.m_data->m_minTradeNumber = info.minTradeNumber; stock.m_data->m_maxTradeNumber = info.maxTradeNumber; } } } } void StockManager::loadAllMarketInfos() { HKU_INFO("Loading market information..."); auto marketInfos = m_baseInfoDriver->getAllMarketInfo(); std::lock_guard lock(*m_marketInfoDict_mutex); m_marketInfoDict.clear(); m_marketInfoDict.reserve(marketInfos.size()); for (auto& marketInfo : marketInfos) { string market = marketInfo.market(); to_upper(market); m_marketInfoDict[market] = marketInfo; } } void StockManager::loadAllStockTypeInfo() { HKU_INFO("Loading stock type information..."); auto stkTypeInfos = m_baseInfoDriver->getAllStockTypeInfo(); std::lock_guard lock(*m_stockTypeInfo_mutex); m_stockTypeInfo.clear(); m_stockTypeInfo.reserve(stkTypeInfos.size()); for (auto& stkTypeInfo : stkTypeInfos) { m_stockTypeInfo[stkTypeInfo.type()] = stkTypeInfo; } } void StockManager::loadAllHolidays() { auto holidays = m_baseInfoDriver->getAllHolidays(); std::lock_guard lock(*m_holidays_mutex); m_holidays = std::move(holidays); } void StockManager::loadAllStockWeights() { HKU_INFO("Loading stock weight..."); ThreadPool tg; // 这里不用全局的线程池,可以避免在初始化后立即reload导致过长的等待 std::vector> task_list; std::lock_guard lock(*m_stockDict_mutex); for (auto iter = m_stockDict.begin(); iter != m_stockDict.end(); ++iter) { task_list.push_back(tg.submit([=]() mutable { Stock& stock = iter->second; StockWeightList weightList = m_baseInfoDriver->getStockWeightList( stock.market(), stock.code(), Datetime::min(), Null()); if (stock.m_data) { std::lock_guard lock(stock.m_data->m_weight_mutex); stock.m_data->m_weightList.swap(weightList); } })); } // 权息信息如果不等待加载完毕,在数据加载期间进行计算可能导致复权错误,所以这里需要等待 for (auto& task : task_list) { task.get(); } } } // namespace hku