优化DriverPool

This commit is contained in:
fasiondog 2021-01-24 20:47:38 +08:00
parent 4fe7210542
commit 77e1dfbd10
8 changed files with 120 additions and 90 deletions

View File

@ -220,7 +220,7 @@ size_t Stock::maxTradeNumber() const {
return m_data ? m_data->m_maxTradeNumber : default_maxTradeNumber;
}
void Stock::setKDataDriver(const KDataDriverPoolPtr& kdataDriver) {
void Stock::setKDataDriver(const KDataDriverConnectPoolPtr& kdataDriver) {
HKU_CHECK(kdataDriver, "kdataDriver is nullptr!");
m_kdataDriver = kdataDriver;
if (m_data) {
@ -285,15 +285,15 @@ void Stock::loadKDataToBuffer(KQuery::KType inkType) {
HKU_ERROR_IF_RETURN(max_num < 0, void(), "Invalid preload {} param: {}", preload_type, max_num);
auto driver = m_kdataDriver->getConnect();
size_t total = driver->get()->getCount(m_data->m_market, m_data->m_code, kType);
size_t total = driver->getCount(m_data->m_market, m_data->m_code, kType);
HKU_IF_RETURN(total == 0, void());
int start = total <= max_num ? 0 : total - max_num;
{
std::unique_lock<std::shared_mutex> lock(*(m_data->pMutex[kType]));
KRecordList* ptr_klist = new KRecordList;
m_data->pKData[kType] = ptr_klist;
(*ptr_klist) = driver->get()->getKRecordList(m_data->m_market, m_data->m_code,
KQuery(start, Null<int64_t>(), kType));
(*ptr_klist) = driver->getKRecordList(m_data->m_market, m_data->m_code,
KQuery(start, Null<int64_t>(), kType));
}
}
@ -343,8 +343,7 @@ size_t Stock::getCount(KQuery::KType kType) const {
return _getCountFromBuffer(nktype);
}
return m_kdataDriver ? m_kdataDriver->getConnect()->get()->getCount(market(), code(), nktype)
: 0;
return m_kdataDriver ? m_kdataDriver->getConnect()->getCount(market(), code(), nktype) : 0;
}
price_t Stock::getMarketValue(const Datetime& datetime, KQuery::KType inktype) const {
@ -355,7 +354,7 @@ price_t Stock::getMarketValue(const Datetime& datetime, KQuery::KType inktype) c
to_upper(ktype);
// 如果为内存缓存或者数据驱动为索引优先,则按索引方式获取
if (isBuffer(ktype) || m_kdataDriver->getConnect()->get()->isIndexFirst()) {
if (isBuffer(ktype) || m_kdataDriver->getConnect()->isIndexFirst()) {
KQuery query = KQueryByDate(datetime, Null<Datetime>(), ktype);
size_t out_start, out_end;
if (getIndexRange(query, out_start, out_end)) {
@ -412,8 +411,8 @@ bool Stock::getIndexRange(const KQuery& query, size_t& out_start, size_t& out_en
return _getIndexRangeByDateFromBuffer(query, out_start, out_end);
}
if (!m_kdataDriver->getConnect()->get()->getIndexRangeByDate(m_data->m_market, m_data->m_code,
query, out_start, out_end)) {
if (!m_kdataDriver->getConnect()->getIndexRangeByDate(m_data->m_market, m_data->m_code, query,
out_start, out_end)) {
out_start = 0;
out_end = 0;
return false;
@ -555,8 +554,8 @@ KRecord Stock::getKRecord(size_t pos, KQuery::KType kType) const {
}
HKU_IF_RETURN(!m_kdataDriver || pos >= size_t(Null<int64_t>()), Null<KRecord>());
auto klist = m_kdataDriver->getConnect()->get()->getKRecordList(market(), code(),
KQuery(pos, pos + 1, kType));
auto klist =
m_kdataDriver->getConnect()->getKRecordList(market(), code(), KQuery(pos, pos + 1, kType));
return klist.size() > 0 ? klist[0] : Null<KRecord>();
}
@ -566,12 +565,12 @@ KRecord Stock::getKRecord(const Datetime& datetime, KQuery::KType ktype) const {
KQuery query = KQueryByDate(datetime, datetime + Minutes(1), ktype);
auto driver = m_kdataDriver->getConnect();
if (isBuffer(query.kType()) || driver->get()->isIndexFirst()) {
if (isBuffer(query.kType()) || driver->isIndexFirst()) {
size_t startix = 0, endix = 0;
return getIndexRange(query, startix, endix) ? getKRecord(startix, ktype) : Null<KRecord>();
}
auto klist = driver->get()->getKRecordList(market(), code(), query);
auto klist = driver->getKRecordList(market(), code(), query);
return klist.size() > 0 ? klist[0] : Null<KRecord>();
}
@ -616,8 +615,8 @@ KRecordList Stock::getKRecordList(const KQuery& query) const {
} else {
if (query.queryType() == KQuery::DATE) {
result = m_kdataDriver->getConnect()->get()->getKRecordList(m_data->m_market,
m_data->m_code, query);
result =
m_kdataDriver->getConnect()->getKRecordList(m_data->m_market, m_data->m_code, query);
} else {
size_t start_ix = 0, end_ix = 0;
if (query.queryType() == KQuery::INDEX) {
@ -631,7 +630,7 @@ KRecordList Stock::getKRecordList(const KQuery& query) const {
end_ix = query.end();
}
}
result = m_kdataDriver->getConnect()->get()->getKRecordList(
result = m_kdataDriver->getConnect()->getKRecordList(
m_data->m_market, m_data->m_code, KQuery(start_ix, end_ix, query.kType()));
}
}
@ -650,13 +649,12 @@ DatetimeList Stock::getDatetimeList(const KQuery& query) const {
}
TimeLineList Stock::getTimeLineList(const KQuery& query) const {
return m_kdataDriver
? m_kdataDriver->getConnect()->get()->getTimeLineList(market(), code(), query)
: TimeLineList();
return m_kdataDriver ? m_kdataDriver->getConnect()->getTimeLineList(market(), code(), query)
: TimeLineList();
}
TransList Stock::getTransList(const KQuery& query) const {
return m_kdataDriver ? m_kdataDriver->getConnect()->get()->getTransList(market(), code(), query)
return m_kdataDriver ? m_kdataDriver->getConnect()->getTransList(market(), code(), query)
: TransList();
}

View File

@ -18,14 +18,13 @@
namespace hku {
class HKU_API StockManager;
class KDataDriver;
class KDataDriverConnect;
template <class DriverType>
class DriverPool;
template <class DriverConnectT>
class DriverConnectPool;
typedef shared_ptr<KDataDriver> KDataDriverPtr;
typedef DriverPool<KDataDriver> KDataDriverPool;
typedef shared_ptr<KDataDriverPool> KDataDriverPoolPtr;
typedef DriverConnectPool<KDataDriverConnect> KDataDriverConnectPool;
typedef shared_ptr<KDataDriverConnectPool> KDataDriverConnectPoolPtr;
class HKU_API KData;
class HKU_API Parameter;
@ -186,7 +185,7 @@ public:
bool isTransactionTime(Datetime time);
/** 设置K线数据获取驱动 */
void setKDataDriver(const KDataDriverPoolPtr& kdataDriver);
void setKDataDriver(const KDataDriverConnectPoolPtr& kdataDriver);
/**
* K线数据做自身缓存
@ -222,7 +221,7 @@ private:
private:
struct HKU_API Data;
shared_ptr<Data> m_data;
KDataDriverPoolPtr m_kdataDriver;
KDataDriverConnectPoolPtr m_kdataDriver;
};
struct HKU_API Stock::Data {

View File

@ -132,7 +132,7 @@ void StockManager::init(const Parameter& baseInfoParam, const Parameter& blockPa
HKU_INFO("{:<.2f}s Loaded Data.", sec.count());
}
void StockManager::setKDataDriver(const KDataDriverPoolPtr& driver) {
void StockManager::setKDataDriver(const KDataDriverConnectPoolPtr& driver) {
HKU_ERROR_IF_RETURN(!driver, void(), "kdata driver is null!");
if (m_kdataDriverParam != driver->getPrototype()->getParameter()) {

View File

@ -183,7 +183,7 @@ public:
private:
/* 设置K线驱动 */
void setKDataDriver(const KDataDriverPoolPtr&);
void setKDataDriver(const KDataDriverConnectPoolPtr&);
private:
StockManager();

View File

@ -23,7 +23,7 @@ map<string, BaseInfoDriverPtr>* DataDriverFactory::m_baseInfoDrivers{nullptr};
map<string, BlockInfoDriverPtr>* DataDriverFactory::m_blockDrivers{nullptr};
map<string, KDataDriverPtr>* DataDriverFactory::m_kdataPrototypeDrivers{nullptr};
map<string, KDataDriverPoolPtr>* DataDriverFactory::m_kdataDriverPools{nullptr};
map<string, KDataDriverConnectPoolPtr>* DataDriverFactory::m_kdataDriverPools{nullptr};
void DataDriverFactory::init() {
m_baseInfoDrivers = new map<string, BaseInfoDriverPtr>();
@ -34,7 +34,7 @@ void DataDriverFactory::init() {
DataDriverFactory::regBlockDriver(make_shared<QLBlockInfoDriver>());
m_kdataPrototypeDrivers = new map<string, KDataDriverPtr>();
m_kdataDriverPools = new map<string, KDataDriverPoolPtr>();
m_kdataDriverPools = new map<string, KDataDriverConnectPoolPtr>();
DataDriverFactory::regKDataDriver(make_shared<TdxKDataDriver>());
DataDriverFactory::regKDataDriver(make_shared<H5KDataDriver>());
@ -129,8 +129,8 @@ void DataDriverFactory::removeKDataDriver(const string& name) {
}
}
KDataDriverPoolPtr DataDriverFactory::getKDataDriverPool(const Parameter& params) {
KDataDriverPoolPtr result;
KDataDriverConnectPoolPtr DataDriverFactory::getKDataDriverPool(const Parameter& params) {
KDataDriverConnectPoolPtr result;
string name = params.get<string>("type");
to_upper(name);
auto iter = m_kdataDriverPools->find(name);
@ -140,8 +140,8 @@ KDataDriverPoolPtr DataDriverFactory::getKDataDriverPool(const Parameter& params
auto prototype_iter = m_kdataPrototypeDrivers->find(name);
HKU_CHECK(prototype_iter != m_kdataPrototypeDrivers->end(), "Unregistered driver{}",
name);
HKU_CHECK(prototype_iter->second->init(params),"Failed init driver: {}", name);
(*m_kdataDriverPools)[name] = make_shared<KDataDriverPool>(prototype_iter->second);
HKU_CHECK(prototype_iter->second->init(params), "Failed init driver: {}", name);
(*m_kdataDriverPools)[name] = make_shared<KDataDriverConnectPool>(prototype_iter->second);
result = (*m_kdataDriverPools)[name];
}
return result;

View File

@ -9,15 +9,15 @@
#ifndef DATADRIVERFACTORY_H_
#define DATADRIVERFACTORY_H_
#include "DriverPool.h"
#include "DriverConnectPool.h"
#include "BaseInfoDriver.h"
#include "KDataDriver.h"
#include "BlockInfoDriver.h"
namespace hku {
typedef DriverPool<KDataDriver> KDataDriverPool;
typedef shared_ptr<KDataDriverPool> KDataDriverPoolPtr;
typedef DriverConnectPool<KDataDriverConnect> KDataDriverConnectPool;
typedef shared_ptr<KDataDriverConnectPool> KDataDriverConnectPoolPtr;
/**
*
@ -45,13 +45,13 @@ public:
static void regKDataDriver(const KDataDriverPtr &);
static void removeKDataDriver(const string &name);
static KDataDriverPoolPtr getKDataDriverPool(const Parameter &);
static KDataDriverConnectPoolPtr getKDataDriverPool(const Parameter &);
private:
static map<string, BaseInfoDriverPtr> *m_baseInfoDrivers;
static map<string, BlockInfoDriverPtr> *m_blockDrivers;
static map<string, KDataDriverPtr> *m_kdataPrototypeDrivers; // K线驱动原型
static map<string, KDataDriverPoolPtr> *m_kdataDriverPools; // K线驱动池
static map<string, KDataDriverPtr> *m_kdataPrototypeDrivers; // K线驱动原型
static map<string, KDataDriverConnectPoolPtr> *m_kdataDriverPools; // K线驱动池
};
} /* namespace hku */

View File

@ -15,37 +15,20 @@
namespace hku {
template <typename DriverType>
class DriverWrap {
public:
DriverWrap() = default;
typedef std::shared_ptr<DriverType> DriverPtr;
DriverWrap(const DriverPtr &driver) : m_driver(driver) {}
DriverPtr get() {
return m_driver;
}
explicit operator bool() const noexcept {
return m_driver.get() != nullptr;
}
private:
DriverPtr m_driver;
};
/**
*
* @tparam DriverType DriverType *clone()
* @tparam DriverConnectT DriverType *clone()
* @ingroup DataDriver
*/
template <typename DriverType>
class DriverPool {
template <class DriverConnectT>
class DriverConnectPool {
public:
DriverPool() = delete;
DriverPool(const DriverPool &) = delete;
DriverPool &operator=(const DriverPool &) = delete;
DriverConnectPool() = delete;
DriverConnectPool(const DriverConnectPool &) = delete;
DriverConnectPool &operator=(const DriverConnectPool &) = delete;
typedef typename DriverConnectT::DriverTypePtr DriverPtr;
typedef std::shared_ptr<DriverConnectT> DriverConnectPtr;
/**
*
@ -53,24 +36,20 @@ public:
* @param maxConnect 0
* @param maxIdleConnect 0 CPU数
*/
explicit DriverPool(const std::shared_ptr<DriverType> &prototype, size_t maxConnect = 0,
size_t maxIdleConnect = std::thread::hardware_concurrency())
explicit DriverConnectPool(const DriverPtr &prototype, size_t maxConnect = 0,
size_t maxIdleConnect = std::thread::hardware_concurrency())
: m_maxSize(maxConnect),
m_maxIdelSize(maxIdleConnect),
m_count(0),
m_prototype(prototype),
m_closer(this) {}
typedef DriverWrap<DriverType> Wrap;
typedef std::shared_ptr<Wrap> WrapPtr;
typedef std::shared_ptr<DriverType> DriverPtr;
/**
*
*/
virtual ~DriverPool() {
virtual ~DriverConnectPool() {
while (!m_driverList.empty()) {
Wrap *p = m_driverList.front();
DriverConnectT *p = m_driverList.front();
m_driverList.pop();
if (p) {
delete p;
@ -79,19 +58,19 @@ public:
}
/** 获取可用连接,如超出允许的最大连接数,将阻塞等待,直到获得空闲资源 */
WrapPtr getConnect() noexcept {
DriverConnectPtr getConnect() noexcept {
std::unique_lock<std::mutex> lock(m_mutex);
if (m_driverList.empty()) {
if (m_maxSize > 0 && m_count >= m_maxSize) {
m_cond.wait(lock, [this] { return !m_driverList.empty(); });
} else {
m_count++;
return WrapPtr(new Wrap(m_prototype->clone()), m_closer);
return DriverConnectPtr(new DriverConnectT(m_prototype->clone()), m_closer);
}
}
Wrap *p = m_driverList.front();
DriverConnectT *p = m_driverList.front();
m_driverList.pop();
return WrapPtr(p, m_closer);
return DriverConnectPtr(p, m_closer);
}
DriverPtr getPrototype() {
@ -112,7 +91,7 @@ public:
void releaseIdleConnect() {
std::lock_guard<std::mutex> lock(m_mutex);
while (!m_driverList.empty()) {
Wrap *p = m_driverList.front();
DriverConnectT *p = m_driverList.front();
m_driverList.pop();
m_count--;
if (p) {
@ -123,7 +102,7 @@ public:
private:
/** 归还至连接池 */
void returnDriver(Wrap *p) {
void returnDriver(DriverConnectT *p) {
std::unique_lock<std::mutex> lock(m_mutex);
if (p) {
if (m_driverList.size() < m_maxIdelSize) {
@ -140,25 +119,25 @@ private:
}
private:
size_t m_maxSize; //允许的最大连接数
size_t m_maxIdelSize; //允许的最大空闲连接数
size_t m_count; //当前活动的连接数
std::shared_ptr<DriverType> m_prototype; // 驱动原型
size_t m_maxSize; //允许的最大连接数
size_t m_maxIdelSize; //允许的最大空闲连接数
size_t m_count; //当前活动的连接数
DriverPtr m_prototype; // 驱动原型
std::mutex m_mutex;
std::condition_variable m_cond;
std::queue<Wrap *> m_driverList;
std::queue<DriverConnectT *> m_driverList;
class DriverCloser {
public:
explicit DriverCloser(DriverPool *pool) : m_pool(pool) {}
void operator()(Wrap *conn) {
explicit DriverCloser(DriverConnectPool *pool) : m_pool(pool) {}
void operator()(DriverConnectT *conn) {
if (m_pool && conn) {
m_pool->returnDriver(conn);
}
}
private:
DriverPool *m_pool;
DriverConnectPool *m_pool;
};
DriverCloser m_closer;

View File

@ -135,6 +135,60 @@ inline const string& KDataDriver::name() const {
return m_name;
}
class KDataDriverConnect {
public:
typedef KDataDriver DriverType;
typedef KDataDriverPtr DriverTypePtr;
KDataDriverConnect(const KDataDriverPtr& driver) : m_driver(driver) {}
~KDataDriverConnect() = default;
KDataDriverConnect(const KDataDriverConnect&) = delete;
KDataDriverConnect(KDataDriverConnect&&) = delete;
KDataDriverConnect& operator=(const KDataDriverConnect&) = delete;
KDataDriverConnect& operator=(KDataDriverConnect&&) = delete;
explicit operator bool() const noexcept {
return m_driver.get() != nullptr;
}
const string& name() const {
return m_driver->name();
}
bool isIndexFirst() {
return m_driver->isIndexFirst();
}
bool canParallelLoad() {
return m_driver->canParallelLoad();
}
size_t getCount(const string& market, const string& code, KQuery::KType kType) {
return m_driver->getCount(market, code, kType);
}
bool getIndexRangeByDate(const string& market, const string& code, const KQuery& query,
size_t& out_start, size_t& out_end) {
return m_driver->getIndexRangeByDate(market, code, query, out_start, out_end);
}
KRecordList getKRecordList(const string& market, const string& code, const KQuery& query) {
return m_driver->getKRecordList(market, code, query);
}
TimeLineList getTimeLineList(const string& market, const string& code, const KQuery& query) {
return m_driver->getTimeLineList(market, code, query);
}
TransList getTransList(const string& market, const string& code, const KQuery& query) {
return m_driver->getTransList(market, code, query);
}
private:
KDataDriverPtr m_driver;
};
} // namespace hku
#endif /* KDATADRIVER_H_ */