refactor kdata driver(continue)

This commit is contained in:
fasiondog 2020-10-01 00:06:58 +08:00
parent fc737e4be5
commit 2dd7e1b54f
11 changed files with 277 additions and 60 deletions

View File

@ -83,6 +83,7 @@ typedef unsigned char uint8;
typedef double price_t;
using std::string;
using std::string_view;
using std::enable_shared_from_this;
using std::make_shared;

View File

@ -19,7 +19,7 @@
#include "utilities/util.h"
#include "StockManager.h"
#include "GlobalTaskGroup.h"
#include "data_driver/KDataTempCsvDriver.h"
#include "data_driver/kdata/cvs/KDataTempCsvDriver.h"
#include "data_driver/base_info/sqlite/SQLiteBaseInfoDriver.h"
#include "data_driver/base_info/mysql/MySQLBaseInfoDriver.h"
#include "data_driver/block_info/qianlong/QLBlockInfoDriver.h"

View File

@ -86,6 +86,11 @@ KRecord KDataDriver::getKRecord(const string& market, const string& code, size_t
return Null<KRecord>();
}
KRecordList KDataDriver::getKRecordList(const string& market, const string& code, KQuery query) {
HKU_INFO("The getKRecordList method has not been implemented! (KDataDriver: {})", m_name);
return KRecordList();
}
TimeLineList KDataDriver::getTimeLineList(const string& market, const string& code,
const KQuery& query) {
HKU_INFO("The getTimeLineList method has not been implemented! (KDataDriver: {})", m_name);

View File

@ -47,6 +47,11 @@ public:
return true;
}
/**
*
*/
virtual bool isIndexFirst() = 0;
/**
* K线数据加载至缓存
* @param market
@ -91,6 +96,14 @@ public:
virtual KRecord getKRecord(const string& market, const string& code, size_t pos,
KQuery::KType kType);
/**
* K 线
* @param market
* @param code
* @param query
*/
virtual KRecordList getKRecordList(const string& market, const string& code, KQuery query);
/**
* 线
* @param market

View File

@ -10,8 +10,8 @@
#include <boost/lexical_cast.hpp>
#include "KDataTempCsvDriver.h"
#include "../utilities/util.h"
#include "../Log.h"
#include "../../../utilities/util.h"
#include "../../../Log.h"
namespace hku {

View File

@ -9,7 +9,7 @@
#ifndef DATA_DRIVER_KDATATEMPCSVDRIVER_H_
#define DATA_DRIVER_KDATATEMPCSVDRIVER_H_
#include "KDataDriver.h"
#include "../../KDataDriver.h"
namespace hku {
@ -22,6 +22,10 @@ public:
KDataTempCsvDriver(const string& day_filename, const string& min_filename);
virtual ~KDataTempCsvDriver();
virtual bool isIndexFirst() override {
return false;
}
/**
* K线数据加载至缓存
* @param market

View File

@ -214,6 +214,12 @@ void H5KDataDriver::loadKData(const string& market, const string& code, KQuery::
return;
}
/*KRecordList result = getKRecordList(market, code, KQuery(start_ix, end_ix, kType));
for (auto& record : result) {
out_buffer->push_back(record);
return;
}*/
if (KQuery::DAY == kType || KQuery::MIN5 == kType || KQuery::MIN == kType) {
_loadBaseData(market, code, kType, start_ix, end_ix, out_buffer);
return;
@ -354,7 +360,7 @@ bool H5KDataDriver::_getH5FileAndGroup(const string& market, const string& code,
KQuery::KType kType, H5FilePtr& out_file,
H5::Group& out_group) {
try {
string key(market + "_" + kType); // KQuery::getKTypeName(kType));
string key(format("{}_{}", market, kType));
to_upper(key);
out_file = m_h5file_map[key];
@ -452,7 +458,7 @@ KRecord H5KDataDriver::getKRecord(const string& market, const string& code, size
KRecord H5KDataDriver::_getBaseRecord(const string& market, const string& code, size_t pos,
KQuery::KType kType) {
assert(KQuery::DAY == kType || KQuery::MIN == kType || KQuery::MIN5 == kType);
HKU_ASSERT(KQuery::DAY == kType || KQuery::MIN == kType || KQuery::MIN5 == kType);
KRecord result;
H5FilePtr h5file;
@ -810,6 +816,154 @@ bool H5KDataDriver::_getOtherIndexRangeByDate(const string& market, const string
return true;
}
KRecordList H5KDataDriver::getKRecordList(const string& market, const string& code, KQuery query) {
KRecordList result;
auto kType = query.kType();
if (query.queryType() == KQuery::INDEX) {
if (query.start() >= query.end()) {
return result;
}
if (KQuery::DAY == kType || KQuery::MIN5 == kType || KQuery::MIN == kType) {
result = _getBaseKRecordList(market, code, kType, query.start(), query.end());
} else {
result = _getIndexKRecordList(market, code, kType, query.start(), query.end());
}
}
return result;
}
KRecordList H5KDataDriver::_getBaseKRecordList(const string& market, const string& code,
KQuery::KType kType, size_t start_ix,
size_t end_ix) {
KRecordList result;
H5FilePtr h5file;
H5::Group group;
if (!_getH5FileAndGroup(market, code, kType, h5file, group)) {
return result;
}
try {
string tablename(format("{}{}", market, code));
CHECK_DATASET_EXISTS_RET(group, tablename, result);
H5::DataSet dataset(group.openDataSet(tablename));
H5::DataSpace dataspace = dataset.getSpace();
size_t all_total = dataspace.getSelectNpoints();
if (0 == all_total || start_ix >= all_total) {
return result;
}
size_t total = end_ix > all_total ? all_total - start_ix : end_ix - start_ix;
std::unique_ptr<H5Record[]> pBuf = std::make_unique<H5Record[]>(total);
H5ReadRecords(dataset, start_ix, total, pBuf.get());
KRecord record;
result.reserve(total + 2);
for (hsize_t i = 0; i < total; i++) {
record.datetime = Datetime(pBuf[i].datetime);
record.openPrice = price_t(pBuf[i].openPrice) * 0.001;
record.highPrice = price_t(pBuf[i].highPrice) * 0.001;
record.lowPrice = price_t(pBuf[i].lowPrice) * 0.001;
record.closePrice = price_t(pBuf[i].closePrice) * 0.001;
record.transAmount = price_t(pBuf[i].transAmount) * 0.1;
record.transCount = price_t(pBuf[i].transCount);
result.push_back(record);
}
} catch (std::out_of_range& e) {
HKU_WARN("Invalid date! market_code({}{}) {}", market, code, e.what());
} catch (std::exception& e) {
HKU_WARN(e.what());
} catch (...) {
//忽略
}
return result;
}
KRecordList H5KDataDriver::_getIndexKRecordList(const string& market, const string& code,
KQuery::KType kType, size_t start_ix,
size_t end_ix) {
KRecordList result;
string tablename(format("{}{}", market, code));
H5FilePtr h5file;
H5::Group index_group;
if (!_getH5FileAndGroup(market, code, kType, h5file, index_group)) {
return result;
}
try {
H5::Group base_group = h5file->openGroup("data");
CHECK_DATASET_EXISTS_RET(base_group, tablename, result);
H5::DataSet base_dataset(base_group.openDataSet(tablename));
H5::DataSpace base_dataspace = base_dataset.getSpace();
size_t base_total = base_dataspace.getSelectNpoints();
if (0 == base_total) {
return result;
}
CHECK_DATASET_EXISTS_RET(index_group, tablename, result);
H5::DataSet index_dataset(index_group.openDataSet(tablename));
H5::DataSpace index_dataspace = index_dataset.getSpace();
size_t index_total = index_dataspace.getSelectNpoints();
if (0 == index_total || start_ix >= index_total) {
return result;
}
size_t index_len = end_ix > index_total ? index_total - start_ix : end_ix - start_ix;
std::unique_ptr<H5IndexRecord[]> p_index_buf =
std::make_unique<H5IndexRecord[]>(index_len + 1);
if (end_ix >= index_total) {
H5ReadIndexRecords(index_dataset, start_ix, index_len, p_index_buf.get());
p_index_buf[index_len].start = base_total;
} else {
index_len = end_ix - start_ix;
H5ReadIndexRecords(index_dataset, start_ix, index_len + 1, p_index_buf.get());
}
size_t base_len = p_index_buf[index_len].start - p_index_buf[0].start;
std::unique_ptr<H5Record[]> p_base_buf = std::make_unique<H5Record[]>(base_len);
H5ReadRecords(base_dataset, p_index_buf[0].start, base_len, p_base_buf.get());
KRecord record;
result.reserve(index_len);
for (hsize_t i = 0; i < index_len; i++) {
record.datetime = Datetime(p_index_buf[i].datetime);
size_t start_pos = p_index_buf[i].start - p_index_buf[0].start;
size_t end_pos = p_index_buf[i + 1].start - p_index_buf[0].start;
record.openPrice = 0.001 * price_t(p_base_buf[start_pos].openPrice);
record.closePrice = 0.001 * price_t(p_base_buf[end_pos - 1].closePrice);
record.highPrice = 0.001 * price_t(p_base_buf[start_pos].highPrice);
record.lowPrice = 0.001 * price_t(p_base_buf[start_pos].lowPrice);
record.transAmount = 0.1 * price_t(p_base_buf[start_pos].transAmount);
record.transCount = price_t(p_base_buf[start_pos].transCount);
for (hsize_t j = start_pos + 1; j < end_pos; j++) {
price_t temp = 0.001 * price_t(p_base_buf[j].highPrice);
if (temp > record.highPrice) {
record.highPrice = temp;
}
temp = 0.001 * price_t(p_base_buf[j].lowPrice);
if (temp < record.lowPrice) {
record.lowPrice = temp;
}
record.transAmount += 0.1 * price_t(p_base_buf[j].transAmount);
record.transCount += p_base_buf[j].transCount;
}
result.push_back(record);
}
} catch (std::out_of_range& e) {
HKU_WARN("Invalid date! {}", e.what());
} catch (...) {
//忽略
}
return result;
}
TimeLineList H5KDataDriver::getTimeLineList(const string& market, const string& code,
const KQuery& query) {
return query.queryType() == KQuery::INDEX

View File

@ -20,6 +20,11 @@ public:
virtual ~H5KDataDriver();
virtual bool _init() override;
virtual bool isIndexFirst() override {
return true;
}
virtual void loadKData(const string& market, const string& code, KQuery::KType kType,
size_t start_ix, size_t end_ix, KRecordListPtr out_buffer) override;
virtual size_t getCount(const string& market, const string& code, KQuery::KType kType) override;
@ -27,6 +32,8 @@ public:
size_t& out_start, size_t& out_end) override;
virtual KRecord getKRecord(const string& market, const string& code, size_t pos,
KQuery::KType kType) override;
virtual KRecordList getKRecordList(const string& market, const string& code,
KQuery query) override;
virtual TimeLineList getTimeLineList(const string& market, const string& code,
const KQuery& query) override;
virtual TransList getTransList(const string& market, const string& code,
@ -54,6 +61,15 @@ private:
bool _getOtherIndexRangeByDate(const string&, const string&, const KQuery&, size_t& out_start,
size_t& out_end);
KRecordList _getBaseKRecordList(const string& market, const string& code, KQuery::KType kType,
size_t start_ix, size_t end_ix);
KRecordList _getIndexKRecordList(const string& market, const string& code, KQuery::KType kType,
size_t start_ix, size_t end_ix);
// KRecordList _getBaseKRecordList(std::const string& market, std::const string& code, Datetime
// start,
// Datetime end);
TimeLineList _getTimeLine(const string& market, const string& code, int64 start, int64 end);
TimeLineList _getTimeLine(const string& market, const string& code, const Datetime& start,
const Datetime& end);

View File

@ -29,6 +29,10 @@ public:
virtual bool _init() override;
virtual bool isIndexFirst() override {
return false;
}
virtual void loadKData(const string& market, const string& code, KQuery::KType kType,
size_t start_ix, size_t end_ix, KRecordListPtr out_buffer) override;

View File

@ -20,6 +20,10 @@ public:
virtual bool _init() override;
virtual bool isIndexFirst() override {
return true;
}
virtual void loadKData(const string& market, const string& code, KQuery::KType kType,
size_t start_ix, size_t end_ix, KRecordListPtr out_buffer) override;
virtual size_t getCount(const string& market, const string& code, KQuery::KType kType) override;

View File

@ -11,10 +11,10 @@
using namespace hku;
using namespace boost::python;
class KDataDriverWrap: public KDataDriver, public wrapper<KDataDriver> {
class KDataDriverWrap : public KDataDriver, public wrapper<KDataDriver> {
public:
KDataDriverWrap(): KDataDriver() {}
KDataDriverWrap(const string& name): KDataDriver(name) {}
KDataDriverWrap() : KDataDriver() {}
KDataDriverWrap(const string& name) : KDataDriver(name) {}
virtual ~KDataDriverWrap() {}
bool _init() {
@ -29,26 +29,25 @@ public:
return this->KDataDriver::_init();
}
void loadKData(const string& market, const string& code,
KQuery::KType ktype, size_t start_ix, size_t end_ix,
KRecordListPtr out_buffer) {
bool isIndexFirst() {
return this->get_override("isIndexFirst")();
}
void loadKData(const string& market, const string& code, KQuery::KType ktype, size_t start_ix,
size_t end_ix, KRecordListPtr out_buffer) {
if (override call = get_override("loadKData")) {
call(market, code, ktype, start_ix, end_ix, out_buffer);
} else {
KDataDriver::loadKData(market, code, ktype,
start_ix, end_ix, out_buffer);
KDataDriver::loadKData(market, code, ktype, start_ix, end_ix, out_buffer);
}
}
void default_loadKData(const string& market, const string& code,
KQuery::KType ktype, size_t start_ix, size_t end_ix,
KRecordListPtr out_buffer) {
this->KDataDriver::loadKData(market, code, ktype,
start_ix, end_ix, out_buffer);
void default_loadKData(const string& market, const string& code, KQuery::KType ktype,
size_t start_ix, size_t end_ix, KRecordListPtr out_buffer) {
this->KDataDriver::loadKData(market, code, ktype, start_ix, end_ix, out_buffer);
}
size_t getCount(const string& market, const string& code,
KQuery::KType ktype) {
size_t getCount(const string& market, const string& code, KQuery::KType ktype) {
if (override call = get_override("getCount")) {
return call(market, code, ktype);
} else {
@ -56,13 +55,12 @@ public:
}
}
size_t default_getCount(const string& market, const string& code,
KQuery::KType ktype) {
size_t default_getCount(const string& market, const string& code, KQuery::KType ktype) {
return this->KDataDriver::getCount(market, code, ktype);
}
virtual object _getIndexRangeByDate(const string& market,
const string& code, const KQuery& query) {
virtual object _getIndexRangeByDate(const string& market, const string& code,
const KQuery& query) {
if (override call = get_override("_getIndexRangeByDate")) {
return call(market, code, query);
}
@ -70,13 +68,13 @@ public:
return make_tuple(0, 0);
}
object default_getIndexRangeByDate(const string& market,
const string& code, const KQuery& query) {
object default_getIndexRangeByDate(const string& market, const string& code,
const KQuery& query) {
return make_tuple(0, 0);
}
bool getIndexRangeByDate(const string& market, const string& code,
const KQuery& query, size_t& out_start, size_t& out_end) {
bool getIndexRangeByDate(const string& market, const string& code, const KQuery& query,
size_t& out_start, size_t& out_end) {
out_start = 0;
out_end = 0;
@ -120,8 +118,7 @@ public:
query, out_start, out_end);
}*/
KRecord getKRecord(const string& market, const string& code,
size_t pos, KQuery::KType ktype) {
KRecord getKRecord(const string& market, const string& code, size_t pos, KQuery::KType ktype) {
if (override call = get_override("getKRecord")) {
return call(market, code, pos, ktype);
} else {
@ -129,13 +126,25 @@ public:
}
}
KRecord default_getKRecord(const string& market, const string& code,
size_t pos, KQuery::KType ktype) {
KRecord default_getKRecord(const string& market, const string& code, size_t pos,
KQuery::KType ktype) {
return this->KDataDriver::getKRecord(market, code, pos, ktype);
}
TimeLineList getTimeLineList(const string& market, const string& code,
const KQuery& query) {
KRecordList getKRecordList(const string& market, const string& code, const KQuery& query) {
if (override call = get_override("getKRecordList")) {
return call(market, code, query);
} else {
return KDataDriver::getKRecordList(market, code, query);
}
}
KRecordList default_getKRecordList(const string& market, const string& code,
const KQuery& query) {
return this->KDataDriver::getKRecordList(market, code, query);
}
TimeLineList getTimeLineList(const string& market, const string& code, const KQuery& query) {
if (override call = get_override("getTimeLineList")) {
return call(market, code, query);
} else {
@ -144,38 +153,45 @@ public:
}
TimeLineList default_getTimeLineList(const string& market, const string& code,
const KQuery& query) {
const KQuery& query) {
return this->KDataDriver::getTimeLineList(market, code, query);
}
TransList getTransList(const string& market, const string& code, const KQuery& query) {
if (override call = get_override("getTransList")) {
return call(market, code, query);
} else {
return KDataDriver::getTransList(market, code, query);
}
}
TransList default_getTransList(const string& market, const string& code, const KQuery& query) {
return this->KDataDriver::getTransList(market, code, query);
}
};
void export_KDataDriver() {
class_<KDataDriverWrap, boost::noncopyable>("KDataDriver", init<>())
.def(init<const string&>())
.def(self_ns::str(self))
.add_property("name", make_function(&KDataDriver::name,
return_value_policy<copy_const_reference>()))
.def("getParam", &KDataDriver::getParam<boost::any>)
.def(init<const string&>())
.def(self_ns::str(self))
.add_property("name",
make_function(&KDataDriver::name, return_value_policy<copy_const_reference>()))
.def("getParam", &KDataDriver::getParam<boost::any>)
.def("_init", &KDataDriver::_init, &KDataDriverWrap::default_init)
.def("loadKData", &KDataDriver::loadKData,
&KDataDriverWrap::default_loadKData)
.def("getCount", &KDataDriver::getCount,
&KDataDriverWrap::default_getCount)
//.def("getIndexRangeByDate", &KDataDriver::getIndexRangeByDate,
// &KDataDriverWrap::default_getIndexRangeByDate)
.def("getKRecord", &KDataDriver::getKRecord,
&KDataDriverWrap::default_getKRecord)
.def("_getIndexRangeByDate",
&KDataDriverWrap::_getIndexRangeByDate,
&KDataDriverWrap::default_getIndexRangeByDate)
.def("getTimeLine",
&KDataDriverWrap::getTimeLineList,
&KDataDriverWrap::default_getTimeLineList)
;
.def("_init", &KDataDriver::_init, &KDataDriverWrap::default_init)
.def("isIndexFirst", pure_virtual(&KDataDriver::isIndexFirst))
.def("loadKData", &KDataDriver::loadKData, &KDataDriverWrap::default_loadKData)
.def("getCount", &KDataDriver::getCount, &KDataDriverWrap::default_getCount)
//.def("getIndexRangeByDate", &KDataDriver::getIndexRangeByDate,
// &KDataDriverWrap::default_getIndexRangeByDate)
.def("getKRecord", &KDataDriver::getKRecord, &KDataDriverWrap::default_getKRecord)
.def("_getIndexRangeByDate", &KDataDriverWrap::_getIndexRangeByDate,
&KDataDriverWrap::default_getIndexRangeByDate)
.def("getKRecordList", &KDataDriverWrap::getKRecordList,
&KDataDriverWrap::default_getKRecordList)
.def("getTimeLine", &KDataDriverWrap::getTimeLineList,
&KDataDriverWrap::default_getTimeLineList)
.def("getTransList", &KDataDriverWrap::getTransList, &KDataDriverWrap::default_getTransList);
register_ptr_to_python<KDataDriverPtr>();
}