driver pool (continue)

This commit is contained in:
fasiondog 2021-01-20 00:32:01 +08:00
parent 03c9a1c5b5
commit ff30c6e1a0
13 changed files with 307 additions and 3 deletions

View File

@ -298,8 +298,15 @@ Stock StockManager::addTempCsvStock(const string& code, const string& day_filena
Stock result("TMP", new_code, day_filename, STOCKTYPE_TMP, true, Datetime(199901010000),
Null<Datetime>(), tick, tickValue, precision, minTradeNumber, maxTradeNumber);
KDataTempCsvDriver* p = new KDataTempCsvDriver(day_filename, min_filename);
result.setKDataDriver(KDataDriverPtr(p));
Parameter param;
param.set<string>("type", "TMPCSV");
KDataDriverPtr driver = DataDriverFactory::getKDataDriver(param);
KDataTempCsvDriver* p = dynamic_cast<KDataTempCsvDriver*>(driver.get());
p->setDayFileName(day_filename);
p->setMinFileName(min_filename);
result.setKDataDriver(driver);
// KDataTempCsvDriver* p = new KDataTempCsvDriver(day_filename, min_filename);
// result.setKDataDriver(KDataDriverPtr(p));
const auto& preload_param = getPreloadParameter();
if (preload_param.tryGet<bool>("day", true)) {
result.loadKDataToBuffer(KQuery::DAY);

View File

@ -13,6 +13,7 @@
#include "kdata/hdf5/H5KDataDriver.h"
#include "kdata/mysql/MySQLKDataDriver.h"
#include "kdata/tdx/TdxKDataDriver.h"
#include "kdata/cvs/KDataTempCsvDriver.h"
#include "DataDriverFactory.h"
#include "KDataDriver.h"
@ -22,6 +23,8 @@ map<string, BaseInfoDriverPtr>* DataDriverFactory::m_baseInfoDrivers{nullptr};
map<string, BlockInfoDriverPtr>* DataDriverFactory::m_blockDrivers{nullptr};
map<string, KDataDriverPtr>* DataDriverFactory::m_kdataDrivers{nullptr};
map<string, DataDriverFactory::KDataDriverPool*>* DataDriverFactory::m_kdataDriverPools{nullptr};
void DataDriverFactory::init() {
m_baseInfoDrivers = new map<string, BaseInfoDriverPtr>();
DataDriverFactory::regBaseInfoDriver(make_shared<SQLiteBaseInfoDriver>());
@ -30,10 +33,13 @@ void DataDriverFactory::init() {
m_blockDrivers = new map<string, BlockInfoDriverPtr>();
DataDriverFactory::regBlockDriver(make_shared<QLBlockInfoDriver>());
m_kdataDriverPools = new map<string, KDataDriverPool*>();
m_kdataDrivers = new map<string, KDataDriverPtr>();
DataDriverFactory::regKDataDriver(make_shared<TdxKDataDriver>());
DataDriverFactory::regKDataDriver(make_shared<H5KDataDriver>());
DataDriverFactory::regKDataDriver(make_shared<MySQLKDataDriver>());
DataDriverFactory::regKDataDriver(make_shared<KDataTempCsvDriver>());
}
void DataDriverFactory::release() {
@ -45,6 +51,12 @@ void DataDriverFactory::release() {
m_kdataDrivers->clear();
delete m_kdataDrivers;
for (auto iter = m_kdataDriverPools->begin(); iter != m_kdataDriverPools->end(); ++iter) {
delete iter->second;
}
m_kdataDriverPools->clear();
delete m_kdataDriverPools;
}
void DataDriverFactory::regBaseInfoDriver(const BaseInfoDriverPtr& driver) {
@ -103,13 +115,21 @@ BlockInfoDriverPtr DataDriverFactory::getBlockDriver(const Parameter& params) {
void DataDriverFactory::regKDataDriver(const KDataDriverPtr& driver) {
string new_type(driver->name());
to_upper(new_type);
HKU_CHECK(m_kdataDriverPools->find(new_type) == m_kdataDriverPools->end(),
"Repeat regKDataDriver!");
(*m_kdataDrivers)[new_type] = driver;
(*m_kdataDriverPools)[new_type] = new KDataDriverPool(driver);
}
void DataDriverFactory::removeKDataDriver(const string& name) {
string new_name(name);
to_upper(new_name);
m_kdataDrivers->erase(new_name);
auto iter = m_kdataDriverPools->find(new_name);
if (iter != m_kdataDriverPools->end()) {
delete iter->second;
m_kdataDriverPools->erase(iter);
}
}
KDataDriverPtr DataDriverFactory::getKDataDriver(const Parameter& params) {
@ -124,4 +144,17 @@ KDataDriverPtr DataDriverFactory::getKDataDriver(const Parameter& params) {
return result;
}
DataDriverFactory::KDataDriverPool* DataDriverFactory::getKDataDriverPool(const Parameter& params) {
KDataDriverPool* result;
string name = params.get<string>("type");
to_upper(name);
auto iter = m_kdataDriverPools->find(name);
if (iter != m_kdataDriverPools->end()) {
result = iter->second;
auto prototype = result->getPrototype();
prototype->init(params);
}
return result;
}
} /* namespace hku */

View File

@ -9,6 +9,7 @@
#ifndef DATADRIVERFACTORY_H_
#define DATADRIVERFACTORY_H_
#include "DriverPool.h"
#include "BaseInfoDriver.h"
#include "KDataDriver.h"
#include "BlockInfoDriver.h"
@ -43,10 +44,15 @@ public:
static void removeKDataDriver(const string &name);
static KDataDriverPtr getKDataDriver(const Parameter &);
typedef DriverPool<KDataDriver> KDataDriverPool;
static KDataDriverPool *getKDataDriverPool(const Parameter &);
private:
static map<string, BaseInfoDriverPtr> *m_baseInfoDrivers;
static map<string, BlockInfoDriverPtr> *m_blockDrivers;
static map<string, KDataDriverPtr> *m_kdataDrivers;
static map<string, KDataDriverPool *> *m_kdataDriverPools;
};
} /* namespace hku */

View File

@ -0,0 +1,165 @@
/*
* Copyright(C) 2021 hikyuu.org
*
* Create on: 2021-01-19
* Author: fasiondog
*/
#pragma once
#include <thread>
#include <mutex>
#include <queue>
#include <memory>
#include "../utilities/Parameter.h"
namespace hku {
/**
*
* @tparam DriverType DriverType *clone()
* @ingroup DataDriver
*/
template <typename DriverType>
class DriverPool {
public:
DriverPool() = delete;
DriverPool(const DriverPool &) = delete;
DriverPool &operator=(const DriverPool &) = delete;
/**
*
* @param param pool
* @param maxConnect 0
* @param maxIdleConnect 0
*/
explicit DriverPool(const std::shared_ptr<DriverType> &prototype, size_t maxConnect = 0,
size_t maxIdleConnect = 100)
: m_maxSize(maxConnect),
m_maxIdelSize(maxIdleConnect),
m_count(0),
m_prototype(prototype),
m_closer(this) {}
/**
*
*/
virtual ~DriverPool() {
while (!m_driverList.empty()) {
DriverType *p = m_driverList.front();
m_driverList.pop();
if (p) {
delete p;
}
}
}
/** 连接实例指针类型 */
typedef std::shared_ptr<DriverType> DriverPtr;
/** 获取可用连接,如超出允许的最大连接数将返回空指针 */
DriverPtr get() noexcept {
std::lock_guard<std::mutex> lock(m_mutex);
if (m_driverList.empty()) {
if (m_maxSize > 0 && m_count >= m_maxSize) {
HKU_WARN(
"There are no idle connections. The current maximum number of connections: {}",
m_maxSize);
return DriverPtr();
}
m_count++;
return DriverPtr(m_prototype->clone(), m_closer);
}
DriverType *p = m_driverList.front();
m_driverList.pop();
return DriverPtr(p, m_closer);
}
/**
*
* @param timeout 0
* @return
*/
DriverPtr getAndWait(int timeout = 0) {
int sleep = 100;
int count = 0, max_count = timeout / sleep;
DriverPtr result = get();
while (!result) {
std::this_thread::sleep_for(std::chrono::milliseconds(sleep));
result = get();
if (timeout > 0) {
YH_CHECK(count++ < max_count, "Can't get connect, timeout!");
}
}
return result;
}
DriverPtr getPrototype() {
return m_prototype;
}
/** 当前活动的连接数 */
size_t count() const {
return m_count;
}
/** 当前空闲的资源数 */
size_t idleCount() const {
return m_driverList.size();
}
/** 释放当前所有的空闲资源 */
void releaseIdleConnect() {
std::lock_guard<std::mutex> lock(m_mutex);
while (!m_driverList.empty()) {
DriverType *p = m_driverList.front();
m_driverList.pop();
m_count--;
if (p) {
delete p;
}
}
}
private:
/** 归还至连接池 */
void returnDriver(DriverType *p) {
std::lock_guard<std::mutex> lock(m_mutex);
if (p) {
if (m_driverList.size() < m_maxIdelSize) {
m_driverList.push(p);
} else {
delete p;
m_count--;
}
} else {
m_count--;
HKU_WARN("Trying to return an empty pointer!");
}
}
private:
size_t m_maxSize; //允许的最大连接数
size_t m_maxIdelSize; //允许的最大空闲连接数
size_t m_count; //当前活动的连接数
std::shared_ptr<DriverType> m_prototype; // 驱动原型
std::mutex m_mutex;
std::queue<DriverType *> m_driverList;
class DriverCloser {
public:
explicit DriverCloser(DriverPool *pool) : m_pool(pool) {}
void operator()(DriverType *conn) {
if (m_pool && conn) {
m_pool->returnDriver(conn);
}
}
private:
DriverPool *m_pool;
};
DriverCloser m_closer;
};
} // namespace hku

View File

@ -27,10 +27,20 @@ HKU_API std::ostream& operator<<(std::ostream& os, const KDataDriverPtr& driver)
KDataDriver::KDataDriver() : m_name("") {}
KDataDriver::KDataDriver(const Parameter& params) : m_params(params) {}
KDataDriver::KDataDriver(const string& name) : m_name(name) {
to_upper(m_name);
}
shared_ptr<KDataDriver> KDataDriver::clone() {
shared_ptr<KDataDriver> ptr = _clone();
ptr->m_params = m_params;
ptr->m_name = m_name;
ptr->_init();
return ptr;
}
bool KDataDriver::checkType() {
bool result = false;
try {

View File

@ -26,6 +26,8 @@ class HKU_API KDataDriver {
public:
KDataDriver();
KDataDriver(const Parameter& params);
/**
*
* @param name
@ -39,6 +41,17 @@ public:
/** 驱动初始化 */
bool init(const Parameter&);
typedef shared_ptr<KDataDriver> KDataDriverPtr;
/**
*
*/
KDataDriverPtr clone();
/**
*
*/
virtual KDataDriverPtr _clone() = 0;
/**
*
* @return

View File

@ -17,8 +17,10 @@ namespace hku {
KDataTempCsvDriver::~KDataTempCsvDriver() {}
KDataTempCsvDriver::KDataTempCsvDriver() : KDataTempCsvDriver("", "") {}
KDataTempCsvDriver::KDataTempCsvDriver(const string& day_filename, const string& min_filename)
: m_day_filename(day_filename), m_min_filename(min_filename) {
: KDataDriver("TMPCSV"), m_day_filename(day_filename), m_min_filename(min_filename) {
for (int i = 0; i < LAST; ++i) {
m_column[i] = Null<size_t>();
}

View File

@ -19,9 +19,22 @@ namespace hku {
*/
class KDataTempCsvDriver : public KDataDriver {
public:
KDataTempCsvDriver();
KDataTempCsvDriver(const string& day_filename, const string& min_filename);
virtual ~KDataTempCsvDriver();
void setDayFileName(const string& day_filename) {
m_day_filename = day_filename;
}
void setMinFileName(const string& min_filename) {
m_min_filename = min_filename;
}
virtual KDataDriverPtr _clone() override {
return std::make_shared<KDataTempCsvDriver>(m_day_filename, m_min_filename);
}
virtual bool isIndexFirst() override {
return false;
}

View File

@ -20,6 +20,10 @@ public:
H5KDataDriver();
virtual ~H5KDataDriver();
virtual KDataDriverPtr _clone() override {
return std::make_shared<H5KDataDriver>();
}
virtual bool _init() override;
virtual bool isIndexFirst() override {

View File

@ -27,6 +27,10 @@ public:
MySQLKDataDriver();
virtual ~MySQLKDataDriver();
virtual KDataDriverPtr _clone() override {
return std::make_shared<MySQLKDataDriver>();
}
virtual bool _init() override;
virtual bool isIndexFirst() override {

View File

@ -18,6 +18,10 @@ public:
TdxKDataDriver();
virtual ~TdxKDataDriver();
virtual KDataDriverPtr _clone() override {
return std::make_shared<TdxKDataDriver>();
}
virtual bool _init() override;
virtual bool isIndexFirst() override {

View File

@ -75,11 +75,48 @@ public:
return ConnectPtr(p, m_closer);
}
/**
*
* @param timeout 0
* @return
*/
ConnectPtr getAndWait(int timeout = 0) {
int sleep = 100;
int count = 0, max_count = timeout / sleep;
ConnectPtr result = getConnect();
while (!result) {
std::this_thread::sleep_for(std::chrono::milliseconds(sleep));
result = getConnect();
if (timeout > 0) {
YH_CHECK(count++ < max_count, "Can't get connect, timeout!");
}
}
return result;
}
/** 当前活动的连接数 */
size_t count() const {
return m_count;
}
/** 当前空闲的资源数 */
size_t idleCount() const {
return m_connectList.size();
}
/** 释放当前所有的空闲资源 */
void releaseIdleConnect() {
std::lock_guard<std::mutex> lock(m_mutex);
while (!m_connectList.empty()) {
ConnectType *p = m_connectList.front();
m_connectList.pop();
m_count--;
if (p) {
delete p;
}
}
}
private:
/** 归还至连接池 */
void returnDriver(ConnectType *p) {

View File

@ -17,6 +17,10 @@ public:
KDataDriverWrap(const string& name) : KDataDriver(name) {}
virtual ~KDataDriverWrap() {}
KDataDriverPtr _clone() {
return this->get_override("_clone")();
}
bool _init() {
if (override call = get_override("_init")) {
return call();
@ -154,7 +158,9 @@ void export_KDataDriver() {
.add_property("name",
make_function(&KDataDriver::name, return_value_policy<copy_const_reference>()))
.def("getParam", &KDataDriver::getParam<boost::any>)
.def("clone", &KDataDriver::clone)
.def("_clone", pure_virtual(&KDataDriver::_clone))
.def("_init", &KDataDriver::_init, &KDataDriverWrap::default_init)
.def("isIndexFirst", pure_virtual(&KDataDriver::isIndexFirst))
.def("canParallelLoad", pure_virtual(&KDataDriver::canParallelLoad))