continue for strategy

This commit is contained in:
fasiondog 2021-02-22 00:50:27 +08:00
parent 7e0800a73e
commit f00a31694d
3 changed files with 103 additions and 17 deletions

View File

@ -6,13 +6,23 @@
*/
#include <csignal>
#include <unordered_set>
#include "../utilities/os.h"
#include "../utilities/IniParser.h"
#include "../agent/SpotAgent.h"
#include "../global/GlobalSpotAgent.h"
#include "../global/schedule/scheduler.h"
#include "StrategyBase.h"
namespace hku {
std::atomic_bool StrategyBase::ms_keep_running = true;
void StrategyBase::sig_handler(int sig) {
if (sig == SIGINT) {
ms_keep_running = false;
}
}
StrategyBase::StrategyBase() : StrategyBase("Strategy") {}
StrategyBase::StrategyBase(const string& name) {
@ -28,14 +38,8 @@ StrategyBase::StrategyBase(const string& name) {
StrategyBase::StrategyBase(const string& name, const string& config_file)
: m_name(name), m_config_file(config_file) {}
StrategyBase::~StrategyBase() {}
static bool g_stratege_keep_running = true;
static void sig_handler(int sig) {
if (sig == SIGINT) {
g_stratege_keep_running = false;
}
StrategyBase::~StrategyBase() {
HKU_INFO("Quit Strategy {}", m_name);
}
void StrategyBase::run() {
@ -111,12 +115,34 @@ void StrategyBase::run() {
sm.init(baseParam, blockParam, kdataParam, preloadParam, hkuParam, m_context);
// 启动行情接收代理
auto& agent = SpotAgent::instance();
agent.addPostProcess([this]() { this->on_bar(); });
auto& agent = *getGlobalSpotAgent();
agent.addPostProcess([this]() { this->onSpot(); });
startSpotAgent(true);
while (g_stratege_keep_running) {
std::this_thread::sleep_for(std::chrono::seconds(3));
_startEventLoop();
}
void StrategyBase::onSpot() {
event([this]() { this->onTick(); });
const auto& ktype_list = getKTypeList();
for (const auto& ktype : ktype_list) {
event([this, ktype]() { this->onBar(ktype); });
}
}
/*
* 线 python GIL
*/
void StrategyBase::_startEventLoop() {
while (ms_keep_running) {
event_type task;
m_event_queue.wait_and_pop(task);
if (task.isNullTask()) {
ms_keep_running = false;
} else {
task();
}
}
}

View File

@ -7,9 +7,12 @@
#pragma once
#include <future>
#include "../DataType.h"
#include "../StrategyContext.h"
#include "../utilities/Parameter.h"
#include "../utilities/thread/FuncWrapper.h"
#include "../utilities/thread/ThreadSafeQueue.h"
#include "../trade_sys/portfolio/Portfolio.h"
namespace hku {
@ -32,6 +35,14 @@ public:
m_name = name;
}
TMPtr getTM() const {
return m_tm;
}
void setTM(const TMPtr& tm) {
m_tm = tm;
}
const StrategyContext& context() const {
return m_context;
}
@ -70,14 +81,49 @@ public:
void run();
void onSpot();
virtual void init() = 0;
virtual void on_bar() = 0;
virtual void onTick() {}
virtual void onBar(const KQuery::KType& ktype) = 0;
private:
string m_name;
string m_config_file;
StrategyContext m_context;
TMPtr m_tm;
private:
static std::atomic_bool ms_keep_running;
static void sig_handler(int sig);
typedef FuncWrapper event_type;
ThreadSafeQueue<event_type> m_event_queue; // 消息队列
/** 先消息队列提交任务后返回的对应 future 的类型 */
template <typename ResultType>
using event_handle = std::future<ResultType>;
#ifdef _MSC_VER
#pragma warning(push)
#pragma warning(disable : 4996)
#endif
/** 向线程池提交任务 */
template <typename FunctionType>
event_handle<typename std::result_of<FunctionType()>::type> event(FunctionType f) {
typedef typename std::result_of<FunctionType()>::type result_type;
std::packaged_task<result_type()> task(f);
event_handle<result_type> res(task.get_future());
m_event_queue.push(std::move(task));
return res;
}
#ifdef _MSC_VER
#pragma warning(pop)
#endif
void _startEventLoop();
};
typedef shared_ptr<StrategyBase> StrategyPtr;

View File

@ -21,8 +21,20 @@ public:
this->get_override("init")();
}
void on_spot() override {
this->get_override("on_spot")();
void onTick() override {
if (override func = this->get_override("on_tick")) {
func();
} else {
this->StrategyBase::onTick();
}
}
void default_onTick() {
this->StrategyBase::onTick();
}
void onBar(const KQuery::KType& ktype) override {
this->get_override("on_bar")(ktype);
}
};
@ -61,6 +73,7 @@ void export_Strategy() {
.add_property("name",
make_function(strategy_get_name, return_value_policy<copy_const_reference>()),
strategy_set_name)
.add_property("tm", &StrategyBase::getTM, &StrategyBase::setTM, "账户管理")
.add_property("start_datetime", get_start_datetime, set_start_datetime, "起始日期")
.add_property(
"stock_list",
@ -73,5 +86,6 @@ void export_Strategy() {
.def("run", &StrategyBase::run)
.def("init", pure_virtual(&StrategyBase::init))
.def("on_spot", pure_virtual(&StrategyBase::on_spot));
.def("on_tick", &StrategyBase::onTick, &StrategyBaseWrap::default_onTick)
.def("on_bar", pure_virtual(&StrategyBase::onBar));
}