fixed 并行加载错误

This commit is contained in:
fasiondog 2021-01-04 00:16:19 +08:00
parent 39e5d03aa6
commit 89458028e4
8 changed files with 84 additions and 57 deletions

View File

@ -111,11 +111,11 @@ class MyMainWindow(QMainWindow, Ui_MainWindow):
'preload', 'halfyear_max', fallback=100000
),
year_max=current_config.getint('preload', 'year_max', fallback=100000),
min1_max=current_config.getint('preload', 'min_max', fallback=5120),
min5_max=current_config.getint('preload', 'min5_max', fallback=5120),
min15_max=current_config.getint('preload', 'min15_max', fallback=5120),
min30_max=current_config.getint('preload', 'min30_max', fallback=5120),
min60_max=current_config.getint('preload', 'min60_max', fallback=5120),
min1_max=current_config.getint('preload', 'min_max', fallback=4096),
min5_max=current_config.getint('preload', 'min5_max', fallback=4096),
min15_max=current_config.getint('preload', 'min15_max', fallback=4096),
min30_max=current_config.getint('preload', 'min30_max', fallback=4096),
min60_max=current_config.getint('preload', 'min60_max', fallback=4096),
)
)
else:
@ -149,11 +149,11 @@ class MyMainWindow(QMainWindow, Ui_MainWindow):
'preload', 'halfyear_max', fallback=100000
),
year_max=current_config.getint('preload', 'year_max', fallback=100000),
min1_max=current_config.getint('preload', 'min_max', fallback=5120),
min5_max=current_config.getint('preload', 'min5_max', fallback=5120),
min15_max=current_config.getint('preload', 'min15_max', fallback=5120),
min30_max=current_config.getint('preload', 'min30_max', fallback=5120),
min60_max=current_config.getint('preload', 'min60_max', fallback=5120),
min1_max=current_config.getint('preload', 'min_max', fallback=4096),
min5_max=current_config.getint('preload', 'min5_max', fallback=4096),
min15_max=current_config.getint('preload', 'min15_max', fallback=4096),
min30_max=current_config.getint('preload', 'min30_max', fallback=4096),
min60_max=current_config.getint('preload', 'min60_max', fallback=4096),
)
)

View File

@ -11,7 +11,7 @@ target("demo")
set_default(false)
end
add_packages("spdlog", "fmt")
add_packages("spdlog", "fmt", "flatbuffers")
add_includedirs("..")
if is_plat("windows") then

View File

@ -28,6 +28,14 @@ const string KQuery::HOUR6("HOUR5");
const string KQuery::HOUR12("HOUR12");
// const string KQuery::INVALID_KTYPE("Z");
// 获取所有的 KType
vector<string> KQuery::getAllKType() {
return vector<string>{KQuery::MIN, KQuery::MIN5, KQuery::MIN15, KQuery::MIN30,
KQuery::MIN60, KQuery::DAY, KQuery::WEEK, KQuery::MONTH,
KQuery::QUARTER, KQuery::HALFYEAR, KQuery::YEAR, KQuery::MIN3,
KQuery::HOUR2, KQuery::HOUR4, KQuery::HOUR6, KQuery::HOUR12};
}
KQuery::KQuery(Datetime start, Datetime end, KType ktype, RecoverType recoverType)
: m_start(start == Null<Datetime>() ? (int64_t)start.number()
: (int64_t)(start.number() * 100 + start.second())),

View File

@ -69,6 +69,9 @@ public:
static const string HOUR12;
// static const string INVALID_KTYPE;
/** 获取所有的 KType */
static vector<string> getAllKType();
/**
*
* @note 线线/线

View File

@ -64,7 +64,13 @@ Stock::Data::Data()
m_unit(default_unit),
m_precision(default_precision),
m_minTradeNumber(default_minTradeNumber),
m_maxTradeNumber(default_maxTradeNumber) {}
m_maxTradeNumber(default_maxTradeNumber) {
auto ktype_list = KQuery::getAllKType();
for (auto& ktype : ktype_list) {
pKData[ktype] = nullptr;
pMutex[ktype] = nullptr;
}
}
Stock::Data::Data(const string& market, const string& code, const string& name, uint32_t type,
bool valid, const Datetime& startDate, const Datetime& lastDate, price_t tick,
@ -91,18 +97,25 @@ Stock::Data::Data(const string& market, const string& code, const string& name,
to_upper(m_market);
m_market_code = m_market + m_code;
auto ktype_list = KQuery::getAllKType();
for (auto& ktype : ktype_list) {
pKData[ktype] = nullptr;
pMutex[ktype] = nullptr;
}
}
Stock::Data::~Data() {
for (auto iter = pKData.begin(); iter != pKData.end(); ++iter) {
if (iter->second) {
delete iter->second;
}
}
auto ktype_list = KQuery::getAllKType();
for (auto& ktype : ktype_list) {
if (pMutex[ktype]) {
std::unique_lock<std::shared_mutex> lock(*pMutex[ktype]);
if (pKData[ktype]) {
delete pKData[ktype];
pKData[ktype] = nullptr;
}
for (auto iter = pMutex.begin(); iter != pMutex.end(); ++iter) {
if (iter->second) {
delete iter->second;
delete pMutex[ktype];
}
}
}
@ -212,15 +225,14 @@ void Stock::setKDataDriver(const KDataDriverPtr& kdataDriver) {
HKU_CHECK(kdataDriver, "kdataDriver is nullptr!");
m_kdataDriver = kdataDriver;
if (m_data) {
for (auto iter = m_data->pKData.begin(); iter != m_data->pKData.end(); ++iter) {
delete iter->second;
auto ktype_list = KQuery::getAllKType();
for (auto& ktype : ktype_list) {
if (m_data->pMutex[ktype]) {
std::unique_lock<std::shared_mutex> lock(*(m_data->pMutex[ktype]));
delete m_data->pKData[ktype];
m_data->pKData[ktype] = nullptr;
}
}
m_data->pKData.clear();
for (auto iter = m_data->pMutex.begin(); iter != m_data->pMutex.end(); ++iter) {
delete iter->second;
}
m_data->pMutex.clear();
}
}
@ -241,7 +253,7 @@ bool Stock::isBuffer(KQuery::KType ktype) const {
HKU_IF_RETURN(!m_data, false);
string nktype(ktype);
to_upper(nktype);
return m_data->pKData.find(nktype) != m_data->pKData.end();
return m_data->pKData.find(nktype) != m_data->pKData.end() && m_data->pKData[nktype];
}
bool Stock::isNull() const {
@ -250,18 +262,13 @@ bool Stock::isNull() const {
void Stock::releaseKDataBuffer(KQuery::KType inkType) {
if (m_data) {
string kType(inkType);
to_upper(kType);
auto iter = m_data->pKData.find(kType);
if (iter != m_data->pKData.end()) {
delete iter->second;
m_data->pKData.erase(kType);
}
string ktype(inkType);
to_upper(ktype);
auto mutex_iter = m_data->pMutex.find(kType);
if (mutex_iter != m_data->pMutex.end()) {
delete mutex_iter->second;
m_data->pMutex.erase(kType);
if (m_data->pKData.find(ktype) != m_data->pKData.end() && m_data->pMutex[ktype]) {
std::unique_lock<std::shared_mutex> lock(*(m_data->pMutex[ktype]));
delete m_data->pKData[ktype];
m_data->pKData[ktype] = nullptr;
}
}
}
@ -273,19 +280,27 @@ void Stock::loadKDataToBuffer(KQuery::KType inkType) {
to_upper(kType);
releaseKDataBuffer(kType);
if (m_kdataDriver) {
m_data->pKData[kType] = new KRecordList;
m_data->pMutex[kType] = new std::shared_mutex();
if (m_kdataDriver && m_data->pKData.find(kType) != m_data->pKData.end()) {
KRecordList* ptr_klist = new KRecordList;
std::shared_mutex* ptr_mutex = m_data->pMutex[kType];
if (!ptr_mutex) {
ptr_mutex = new std::shared_mutex();
}
m_data->pKData[kType] = ptr_klist;
m_data->pMutex[kType] = ptr_mutex;
const auto& param = StockManager::instance().getPreloadParameter();
string preload_type = fmt::format("{}_max", kType);
to_lower(preload_type);
int max_num = param.tryGet<int>(preload_type, 5120);
int max_num = param.tryGet<int>(preload_type, 4096);
HKU_ERROR_IF_RETURN(max_num < 0, void(), "Invalid preload {} param: {}", preload_type,
max_num);
size_t total = m_kdataDriver->getCount(m_data->m_market, m_data->m_code, kType);
int start = total <= max_num ? 0 : total - max_num;
*(m_data->pKData[kType]) = m_kdataDriver->getKRecordList(
m_data->m_market, m_data->m_code, KQuery(start, Null<int64_t>(), kType));
{
std::unique_lock<std::shared_mutex> lock(*ptr_mutex);
(*ptr_klist) = m_kdataDriver->getKRecordList(m_data->m_market, m_data->m_code,
KQuery(start, Null<int64_t>(), kType));
}
}
}
}
@ -332,7 +347,7 @@ size_t Stock::getCount(KQuery::KType kType) const {
HKU_IF_RETURN(!m_data, 0);
string nktype(kType);
to_upper(nktype);
if (m_data->pKData.find(nktype) != m_data->pKData.end()) {
if (m_data->pKData.find(nktype) != m_data->pKData.end() && m_data->pKData[nktype]) {
return _getCountFromBuffer(nktype);
}
@ -347,7 +362,7 @@ price_t Stock::getMarketValue(const Datetime& datetime, KQuery::KType inktype) c
to_upper(ktype);
// 如果为内存缓存或者数据驱动为索引优先,则按索引方式获取
if (m_data->pKData.find(ktype) != m_data->pKData.end() || m_kdataDriver->isIndexFirst()) {
if (m_data->pKData[ktype] || m_kdataDriver->isIndexFirst()) {
KQuery query = KQueryByDate(datetime, Null<Datetime>(), ktype);
size_t out_start, out_end;
if (getIndexRange(query, out_start, out_end)) {
@ -542,7 +557,7 @@ KRecord Stock::_getKRecordFromBuffer(size_t pos, KQuery::KType ktype) const {
KRecord Stock::getKRecord(size_t pos, KQuery::KType kType) const {
HKU_IF_RETURN(!m_data, Null<KRecord>());
if (m_data->pKData.find(kType) != m_data->pKData.end()) {
if (m_data->pKData[kType]) {
return _getKRecordFromBuffer(pos, kType);
}
@ -558,7 +573,7 @@ KRecord Stock::getKRecord(const Datetime& datetime, KQuery::KType ktype) const {
// string ktype(inktype);
// to_upper(ktype);
KQuery query = KQueryByDate(datetime, datetime + Minutes(1), ktype);
if (m_data->pKData.find(ktype) != m_data->pKData.end() || m_kdataDriver->isIndexFirst()) {
if (m_data->pKData[ktype] || m_kdataDriver->isIndexFirst()) {
size_t startix = 0, endix = 0;
return getIndexRange(query, startix, endix) ? getKRecord(startix, ktype) : Null<KRecord>();
}
@ -585,7 +600,7 @@ KRecordList Stock::getKRecordList(const KQuery& query) const {
HKU_IF_RETURN(isNull(), result);
// 如果是在内存缓存中
if (m_data->pKData.find(query.kType()) != m_data->pKData.end()) {
if (m_data->pKData[query.kType()]) {
size_t start_ix = 0, end_ix = 0;
if (query.queryType() == KQuery::DATE) {
if (!_getIndexRangeByDateFromBuffer(query, start_ix, end_ix)) {
@ -694,7 +709,7 @@ bool Stock::_isTransactionTime(Datetime time) {
void Stock::realtimeUpdate(KRecord record, KQuery::KType inktype) {
string ktype(inktype);
to_upper(ktype);
if (!m_data || m_data->pKData.find(ktype) == m_data->pKData.end() || record.datetime.isNull()) {
if (!m_data || m_data->pKData[ktype] || record.datetime.isNull()) {
return;
}

View File

@ -150,7 +150,7 @@ void StockManager::setKDataDriver(const KDataDriverPtr& driver) {
HKU_INFO_IF(preload_week, "Preloading all week kdata to buffer!");
bool preload_month = m_preloadParam.tryGet<bool>("month", false);
HKU_INFO_IF(preload_week, "Preloading all month kdata to buffer!");
HKU_INFO_IF(preload_month, "Preloading all month kdata to buffer!");
bool preload_quarter = m_preloadParam.tryGet<bool>("quarter", false);
HKU_INFO_IF(preload_quarter, "Preloading all quarter kdata to buffer!");

View File

@ -55,7 +55,7 @@ bool KDataDriver::init(const Parameter& params) {
HKU_IF_RETURN(m_params == params, true);
m_params = params;
HKU_IF_RETURN(!checkType(), false);
HKU_INFO("Using {} KDataDriver", name());
HKU_INFO("Using {} KDataDriver, load parallel: {}", name(), canParallelLoad());
return _init();
}

View File

@ -339,14 +339,15 @@ bool H5KDataDriver::getIndexRangeByDate(const string& market, const string& code
HKU_IF_RETURN(!h5file, false);
std::lock_guard<std::mutex> lock(*(m_mutex_map[h5file->getId()]));
bool result = false;
if (KQuery::MIN5 == query.kType() || KQuery::MIN == query.kType() ||
KQuery::DAY == query.kType()) {
return _getBaseIndexRangeByDate(market, code, query, out_start, out_end);
result = _getBaseIndexRangeByDate(market, code, query, out_start, out_end);
} else {
return _getOtherIndexRangeByDate(market, code, query, out_start, out_end);
result = _getOtherIndexRangeByDate(market, code, query, out_start, out_end);
}
return false;
return result;
}
bool H5KDataDriver::_getBaseIndexRangeByDate(const string& market, const string& code,