Merge pull request #37 from fasiondog/realbuffer

Realbuffer
This commit is contained in:
fasiondog 2020-11-20 00:02:09 +08:00 committed by GitHub
commit 1ac0793f75
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 88 additions and 32 deletions

View File

@ -1,3 +1,4 @@
dist: bionic
sudo: true
language: C++
matrix:
@ -36,8 +37,9 @@ matrix:
addons:
apt:
sources:
- ubuntu-toolchain-r-test
- llvm-toolchain-bionic-5.0
packages:
- g++-8
- clang-5.0
env:
- MATRIX_EVAL="CC=clang-5.0 && CXX=clang++-5.0"
@ -46,8 +48,9 @@ matrix:
addons:
apt:
sources:
- ubuntu-toolchain-r-test
- llvm-toolchain-bionic-6.0
packages:
- g++-8
- clang-6.0
env:
- MATRIX_EVAL="CC=clang-6.0 && CXX=clang++-6.0"
@ -91,6 +94,6 @@ script:
- ls .
- echo $CXX
- xmake f --cxx=$CXX --confirm=y --test=small
- xmake -b small-test
- xmake -bvD small-test
- xmake r small-test

View File

@ -99,6 +99,12 @@ Stock::Data::~Data() {
delete iter->second;
}
}
for (auto iter = pMutex.begin(); iter != pMutex.end(); ++iter) {
if (iter->second) {
delete iter->second;
}
}
}
Stock::Stock() {}
@ -219,6 +225,11 @@ void Stock::setKDataDriver(const KDataDriverPtr& kdataDriver) {
delete iter->second;
}
m_data->pKData.clear();
for (auto iter = m_data->pMutex.begin(); iter != m_data->pMutex.end(); ++iter) {
delete iter->second;
}
m_data->pMutex.clear();
}
}
@ -256,17 +267,25 @@ void Stock::releaseKDataBuffer(KQuery::KType inkType) {
delete iter->second;
m_data->pKData.erase(kType);
}
auto mutex_iter = m_data->pMutex.find(kType);
if (mutex_iter != m_data->pMutex.end()) {
delete mutex_iter->second;
m_data->pMutex.erase(kType);
}
}
}
// 仅在初始化时调用
void Stock::loadKDataToBuffer(KQuery::KType inkType) {
if (m_data) {
string kType(inkType);
to_upper(kType);
releaseKDataBuffer(kType);
m_data->pKData[kType] = new KRecordList;
if (m_kdataDriver) {
m_data->pKData[kType] = new KRecordList;
m_data->pMutex[kType] = new std::shared_mutex();
*(m_data->pKData[kType]) = m_kdataDriver->getKRecordList(
m_data->m_market, m_data->m_code, KQuery(0, Null<int64_t>(), kType));
}
@ -308,6 +327,11 @@ KData Stock::getKData(const KQuery& query) const {
return KData(*this, query);
}
size_t Stock::_getCountFromBuffer(KQuery::KType ktype) const {
std::shared_lock<std::shared_mutex> lock(*(m_data->pMutex[ktype]));
return m_data->pKData[ktype]->size();
}
size_t Stock::getCount(KQuery::KType kType) const {
if (!m_data)
return 0;
@ -315,7 +339,7 @@ size_t Stock::getCount(KQuery::KType kType) const {
string nktype(kType);
to_upper(nktype);
if (m_data->pKData.find(nktype) != m_data->pKData.end()) {
return m_data->pKData[nktype]->size();
return _getCountFromBuffer(nktype);
}
return m_kdataDriver ? m_kdataDriver->getCount(market(), code(), nktype) : 0;
@ -456,16 +480,16 @@ bool Stock::_getIndexRangeByIndex(const KQuery& query, size_t& out_start, size_t
bool Stock::_getIndexRangeByDateFromBuffer(const KQuery& query, size_t& out_start,
size_t& out_end) const {
std::shared_lock<std::shared_mutex> lock(*(m_data->pMutex[query.kType()]));
out_start = 0;
out_end = 0;
//总数为0视为失败
size_t total = getCount(query.kType());
const KRecordList& kdata = *(m_data->pKData[query.kType()]);
size_t total = kdata.size();
if (0 == total) {
return false;
}
const KRecordList& kdata = *(m_data->pKData[query.kType()]);
size_t mid = total, low = 0, high = total - 1;
size_t startpos, endpos;
while (low <= high) {
@ -525,12 +549,18 @@ bool Stock::_getIndexRangeByDateFromBuffer(const KQuery& query, size_t& out_star
return true;
}
KRecord Stock::_getKRecordFromBuffer(size_t pos, KQuery::KType ktype) const {
std::shared_lock<std::shared_mutex> lock(*(m_data->pMutex[ktype]));
return m_data->pKData[ktype]->at(pos);
}
KRecord Stock::getKRecord(size_t pos, KQuery::KType kType) const {
if (!m_data)
return Null<KRecord>();
if (m_data->pKData.find(kType) != m_data->pKData.end())
return m_data->pKData[kType]->at(pos);
if (m_data->pKData.find(kType) != m_data->pKData.end()) {
return _getKRecordFromBuffer(pos, kType);
}
if (!m_kdataDriver || pos >= size_t(Null<int64_t>()))
return Null<KRecord>();
@ -556,6 +586,22 @@ KRecord Stock::getKRecord(const Datetime& datetime, KQuery::KType ktype) const {
return klist.size() > 0 ? klist[0] : Null<KRecord>();
}
KRecordList Stock::_getKRecordListFromBuffer(size_t start_ix, size_t end_ix,
KQuery::KType ktype) const {
std::shared_lock<std::shared_mutex> lock(*(m_data->pMutex[ktype]));
KRecordList result;
size_t total = m_data->pKData[ktype]->size();
if (start_ix >= end_ix || start_ix >= total) {
HKU_WARN("Invalid param! ({}, {})", start_ix, end_ix);
return result;
}
size_t length = end_ix > total ? total - start_ix : end_ix - start_ix;
result.resize(length);
std::memcpy(&(result.front()), &((*m_data->pKData[ktype])[start_ix]), sizeof(KRecord) * length);
return result;
}
KRecordList Stock::getKRecordList(const KQuery& query) const {
KRecordList result;
if (isNull()) {
@ -580,17 +626,7 @@ KRecordList Stock::getKRecordList(const KQuery& query) const {
end_ix = query.end();
}
}
size_t total = m_data->pKData[query.kType()]->size();
if (start_ix >= end_ix || start_ix >= total) {
HKU_WARN("Invalid param! ({}, {})", start_ix, end_ix);
return result;
}
size_t length = end_ix > total ? total - start_ix : end_ix - start_ix;
result.resize(length);
std::memcpy(&(result.front()), &((*m_data->pKData[query.kType()])[start_ix]),
sizeof(KRecord) * length);
result = _getKRecordListFromBuffer(start_ix, end_ix, query.kType());
} else {
if (query.queryType() == KQuery::DATE) {
@ -658,22 +694,29 @@ PriceList Stock::getHistoryFinanceInfo(const Datetime& date) const {
return PriceList();
}
void Stock::realtimeUpdate(const KRecord& record) {
// if (!m_data || !m_data->pKData[KQuery::DAY] ||
if (!isBuffer(KQuery::DAY) || record.datetime == Null<Datetime>()) {
void Stock::realtimeUpdate(const KRecord& record, KQuery::KType inktype) {
string ktype(inktype);
to_lower(ktype);
if (!m_data || m_data->pKData.find(ktype) == m_data->pKData.end() ||
record.datetime == Null<Datetime>()) {
return;
}
if (m_data->pKData[KQuery::DAY]->empty()) {
m_data->pKData[KQuery::DAY]->push_back(record);
// 加写锁
std::unique_lock<std::shared_mutex> lock(*(m_data->pMutex[ktype]));
if (m_data->pKData[ktype]->empty()) {
m_data->pKData[ktype]->push_back(record);
return;
}
KRecord& tmp = m_data->pKData[KQuery::DAY]->back();
KRecord& tmp = m_data->pKData[ktype]->back();
// 如果传入的记录日期等于最后一条记录日期,则更新最后一条记录;否则,追加入缓存
if (tmp.datetime == record.datetime) {
tmp = record;
} else if (tmp.datetime < record.datetime) {
m_data->pKData[KQuery::DAY]->push_back(record);
m_data->pKData[ktype]->push_back(record);
} else {
HKU_INFO("Ignore record, datetime < last record.datetime!");
}

View File

@ -9,6 +9,7 @@
#ifndef STOCK_H_
#define STOCK_H_
#include <shared_mutex>
#include "StockWeight.h"
#include "KQuery.h"
#include "TimeLineRecord.h"
@ -194,13 +195,19 @@ public:
bool isNull() const;
/** (临时函数)只用于更新缓存中的日线数据 **/
void realtimeUpdate(const KRecord&);
void realtimeUpdate(const KRecord&, KQuery::KType ktype = KQuery::DAY);
/** 仅用于python的__str__ */
string toString() const;
private:
bool _getIndexRangeByIndex(const KQuery&, size_t& out_start, size_t& out_end) const;
// 以下函数属于基础操作添加了读锁
size_t _getCountFromBuffer(KQuery::KType ktype) const;
KRecord _getKRecordFromBuffer(size_t pos, KQuery::KType ktype) const;
KRecordList _getKRecordListFromBuffer(size_t start_ix, size_t end_ix,
KQuery::KType ktype) const;
bool _getIndexRangeByDateFromBuffer(const KQuery&, size_t&, size_t&) const;
private:
@ -230,6 +237,7 @@ struct HKU_API Stock::Data {
size_t m_maxTradeNumber;
unordered_map<string, KRecordList*> pKData;
unordered_map<string, std::shared_mutex*> pMutex;
Data();
Data(const string& market, const string& code, const string& name, uint32_t type, bool valid,

View File

@ -151,11 +151,13 @@ void export_Stock() {
:param Datetime date: 0331063009301231 Datetime(201109300000)
:rtype: PriceList)")
.def("realtime_update", &Stock::realtimeUpdate, R"(realtime_update(self, krecord)
.def("realtime_update", &Stock::realtimeUpdate, (arg("krecord"), arg("ktype") = KQuery::DAY),
R"(realtime_update(self, krecord)
线
线
:param KRecord krecord: K线记录)")
:param KRecord krecord: K线记录
:param KQuery.KType ktype: K 线)")
.def("get_weight", &Stock::getWeight,
(arg("start") = Datetime::min(), arg("end") = Datetime()),