Merge pull request #208 from fasiondog/feature/factor

pf 优化; 并行代码优化
This commit is contained in:
fasiondog 2024-03-28 15:47:40 +08:00 committed by GitHub
commit 56316f71d3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 238 additions and 109 deletions

View File

@ -19,37 +19,30 @@ vector<AnalysisSystemWithBlockOut> HKU_API analysisSystemList(const SystemList&
size_t total = sys_list.size();
HKU_IF_RETURN(0 == total, result);
MQStealThreadPool tg;
vector<std::future<AnalysisSystemWithBlockOut>> tasks;
for (size_t i = 0; i < total; i++) {
result = parallel_for_index(0, total, [&](size_t i) {
const auto& sys = sys_list[i];
const auto& stk = stk_list[i];
AnalysisSystemWithBlockOut ret;
if (!sys || stk.isNull()) {
continue;
return ret;
}
tasks.emplace_back(tg.submit([&sys, &stk, &query]() {
AnalysisSystemWithBlockOut ret;
try {
sys->run(stk, query);
Performance per;
per.statistics(sys->getTM());
ret.market_code = stk.market_code();
ret.name = stk.name();
ret.values = per.values();
} catch (const std::exception& e) {
HKU_ERROR(e.what());
} catch (...) {
HKU_ERROR("Unknown error!");
}
return ret;
}));
}
try {
sys->run(stk, query);
Performance per;
per.statistics(sys->getTM());
ret.market_code = stk.market_code();
ret.name = stk.name();
ret.values = per.values();
} catch (const std::exception& e) {
HKU_ERROR(e.what());
} catch (...) {
HKU_ERROR("Unknown error!");
}
return ret;
});
for (auto& task : tasks) {
result.emplace_back(task.get());
}
return result;
}

View File

@ -36,15 +36,13 @@ Indicator HKU_API POS(const Block& block, KQuery query, SignalPtr sg) {
}
}
ThreadPool tg;
vector<SGPtr> sgs;
for (auto stk_iter = block.begin(); stk_iter != block.end(); ++stk_iter) {
auto stks = block.getAllStocks();
vector<SGPtr> sgs = parallel_for_index(0, stks.size(), [&](size_t i) {
auto tmpsg = sg->clone();
sgs.push_back(tmpsg);
KData kdata = stk_iter->getKData(query);
tg.submit([tmpsg, k = std::move(kdata)]() { tmpsg->setTO(k); });
}
tg.join();
auto kdata = stks[i].getKData(query);
tmpsg->setTO(kdata);
return tmpsg;
});
// 计算每日持仓的股票数
vector<size_t> position(dayTotal);

View File

@ -43,6 +43,31 @@ public:
// 当前收益 = 当前净资产 - 当前投入本值资产
// = cash + market_value - short_market_value - borrow_cash - base_cash - base_asset
// 当前总资产
price_t total_assets() const {
return cash + market_value + borrow_asset - short_market_value;
}
// 当前净资产
price_t net_assets() const {
return cash + market_value - short_market_value - borrow_cash;
}
// 总负债
price_t total_borrow() const {
return borrow_cash + borrow_asset;
}
// 当前投入本值资产
price_t total_base() const {
return base_cash + base_asset;
}
// 当前收益
price_t profit() const {
return cash + market_value - short_market_value - borrow_cash - base_cash - base_asset;
}
FundsRecord operator+(const FundsRecord& other) const;
FundsRecord& operator+=(const FundsRecord& other);

View File

@ -32,16 +32,13 @@ HKU_API std::ostream& operator<<(std::ostream& os, const PortfolioPtr& pf) {
return os;
}
Portfolio::Portfolio()
: m_name("Portfolio"), m_query(Null<KQuery>()), m_is_ready(false), m_need_calculate(true) {
setParam<int>("adjust_cycle", 1); // 调仓周期
setParam<bool>("trace", false); // 打印跟踪
Portfolio::Portfolio() : m_name("Portfolio"), m_query(Null<KQuery>()), m_need_calculate(true) {
initParam();
}
Portfolio::Portfolio(const string& name)
: m_name(name), m_query(Null<KQuery>()), m_is_ready(false), m_need_calculate(true) {
setParam<int>("adjust_cycle", 1); // 调仓周期
setParam<bool>("trace", false);
: m_name(name), m_query(Null<KQuery>()), m_need_calculate(true) {
initParam();
}
Portfolio::Portfolio(const TradeManagerPtr& tm, const SelectorPtr& se, const AFPtr& af)
@ -50,14 +47,17 @@ Portfolio::Portfolio(const TradeManagerPtr& tm, const SelectorPtr& se, const AFP
m_se(se),
m_af(af),
m_query(Null<KQuery>()),
m_is_ready(false),
m_need_calculate(true) {
setParam<int>("adjust_cycle", 1); // 调仓周期
setParam<bool>("trace", false);
initParam();
}
Portfolio::~Portfolio() {}
void Portfolio::initParam() {
setParam<int>("adjust_cycle", 1); // 调仓周期
setParam<bool>("trace", false); // 打印跟踪
}
void Portfolio::baseCheckParam(const string& name) const {
if ("adjust_cycle" == name) {
int adjust_cycle = getParam<int>("adjust_cycle");
@ -70,8 +70,6 @@ void Portfolio::paramChanged() {
}
void Portfolio::reset() {
m_is_ready = false;
m_pro_sys_list.clear();
m_real_sys_list.clear();
m_running_sys_set.clear();
m_delay_adjust_sys_list.clear();
@ -85,6 +83,7 @@ void Portfolio::reset() {
m_se->reset();
if (m_af)
m_af->reset();
m_need_calculate = true;
}
PortfolioPtr Portfolio::clone() {
@ -92,9 +91,7 @@ PortfolioPtr Portfolio::clone() {
p->m_params = m_params;
p->m_name = m_name;
p->m_query = m_query;
p->m_pro_sys_list = m_pro_sys_list;
p->m_real_sys_list = m_real_sys_list;
p->m_is_ready = m_is_ready;
p->m_need_calculate = m_need_calculate;
if (m_se)
p->m_se = m_se->clone();
@ -107,85 +104,67 @@ PortfolioPtr Portfolio::clone() {
return p;
}
bool Portfolio::_readyForRun() {
if (!m_se) {
HKU_WARN("m_se is null!");
m_is_ready = false;
return false;
}
void Portfolio::_readyForRun() {
SPEND_TIME(Portfolio_readyForRun);
HKU_CHECK(m_se, "m_se is null!");
HKU_CHECK(m_tm, "m_tm is null!");
HKU_CHECK(m_af, "m_am is null!");
if (!m_tm) {
HKU_WARN("m_tm is null!");
m_is_ready = false;
return false;
}
// se算法和af算法不匹配
HKU_CHECK(m_se->isMatchAF(m_af), "The current SE and AF do not match!");
if (!m_af) {
HKU_WARN("m_am is null!");
m_is_ready = false;
return false;
}
// se算法和af算法不匹配时给出告警
HKU_WARN_IF(!m_se->isMatchAF(m_af), "The current SE and AF do not match!");
// 检查账户是否存在初始资产
FundsRecord funds = m_tm->getFunds();
HKU_CHECK(funds.total_assets() > 0.0, "The current tm is zero assets!");
reset();
// 将影子账户指定给资产分配器
// 生成资金账户
m_cash_tm = m_tm->clone();
// 配置资产分配器
m_af->setTM(m_tm);
m_af->setCashTM(m_cash_tm);
// 为资金分配器设置关联查询条件
m_af->setQuery(m_query);
// 从 se 获取原型系统列表
m_pro_sys_list = m_se->getProtoSystemList();
auto pro_sys_list = m_se->getProtoSystemList();
// 获取所有备选子系统,为无关联账户的子系统分配子账号,对所有子系统做好启动准备
TMPtr pro_tm = crtTM(m_tm->initDatetime(), 0.0, m_tm->costFunc(), "TM_SUB");
size_t total = m_pro_sys_list.size();
size_t total = pro_sys_list.size();
m_real_sys_list.reserve(total);
for (size_t i = 0; i < total; i++) {
SystemPtr& pro_sys = m_pro_sys_list[i];
SystemPtr& pro_sys = pro_sys_list[i];
SystemPtr sys = pro_sys->clone();
m_real_sys_list.emplace_back(sys);
// 如果原型子系统没有关联账户,则为其创建一个和总资金金额相同的账户,以便能够运行
if (!pro_sys->getTM()) {
pro_sys->setTM(m_tm->clone());
}
// 为内部实际执行的系统创建初始资金为0的子账户
sys->setTM(pro_tm->clone());
sys->getTM()->name(fmt::format("TM_SUB{}", i));
sys->name(fmt::format("PF_Real_{}_{}_{}", i, sys->name(), sys->getStock().market_code()));
string sys_name = fmt::format("{}_{}", sys->name(), sys->getStock().market_code());
sys->getTM()->name(fmt::format("TM_SUB_{}", sys_name));
sys->name(fmt::format("PF_{}", sys_name));
HKU_CHECK(sys->readyForRun() && pro_sys->readyForRun(),
"Exist invalid system, it could not ready for run!");
HKU_CHECK(sys->readyForRun(), "Exist invalid system, it could not ready for run!");
KData k = sys->getStock().getKData(m_query);
sys->setTO(k);
pro_sys->setTO(k);
}
// 告知 se 当前实际运行的系统列表
m_se->calculate(m_real_sys_list, m_query);
m_is_ready = true;
return true;
}
void Portfolio::run(const KQuery& query, int adjust_cycle, bool force) {
HKU_CHECK(adjust_cycle > 0, "Invalid param adjust_cycle! {}", adjust_cycle);
SPEND_TIME(Portfolio_run);
setParam<int>("adjust_cycle", adjust_cycle);
setQuery(query);
if (force) {
m_need_calculate = true;
}
HKU_IF_RETURN(!m_need_calculate, void());
HKU_CHECK(_readyForRun(),
"readyForRun fails, check to see if a valid TradeManager, Selector, or "
"AllocateFunds instance have been specified.");
_readyForRun();
DatetimeList datelist = StockManager::instance().getTradingCalendar(query);
HKU_IF_RETURN(datelist.empty(), void());

View File

@ -90,14 +90,16 @@ public:
PortfolioPtr clone();
/** 获取所有原型系统列表,与 SE 同 */
const SystemList& getProtoSystemList() const;
const SystemList& getSystemList() const;
/** 获取所有实际运行的系统列表,与 SE 同 */
const SystemList& getRealSystemList() const;
private:
void initParam();
/** 运行前准备 */
bool _readyForRun();
void _readyForRun();
void _runMoment(const Datetime& date, bool adjust);
@ -109,10 +111,8 @@ protected:
AFPtr m_af;
KQuery m_query; // 关联的查询条件
bool m_is_ready; // 是否已做好运行准备
bool m_need_calculate; // 是否需要计算标志
SystemList m_pro_sys_list; // 所有原型系统列表,来自 SE
SystemList m_real_sys_list; // 所有实际运行的子系统列表
// 用于中间计算的临时数据
@ -136,7 +136,6 @@ private:
ar& BOOST_SERIALIZATION_NVP(m_se);
ar& BOOST_SERIALIZATION_NVP(m_af);
ar& BOOST_SERIALIZATION_NVP(m_query);
ar& BOOST_SERIALIZATION_NVP(m_is_ready);
ar& BOOST_SERIALIZATION_NVP(m_need_calculate);
}
@ -149,7 +148,6 @@ private:
ar& BOOST_SERIALIZATION_NVP(m_se);
ar& BOOST_SERIALIZATION_NVP(m_af);
ar& BOOST_SERIALIZATION_NVP(m_query);
ar& BOOST_SERIALIZATION_NVP(m_is_ready);
ar& BOOST_SERIALIZATION_NVP(m_need_calculate);
}
@ -219,10 +217,6 @@ inline void Portfolio::setAF(const AFPtr& af) {
}
}
inline const SystemList& Portfolio::getProtoSystemList() const {
return m_pro_sys_list;
}
inline const SystemList& Portfolio::getRealSystemList() const {
return m_real_sys_list;
}

View File

@ -17,8 +17,6 @@
namespace hku {
class HKU_API Portfolio;
/**
*
* @ingroup Selector
@ -102,7 +100,6 @@ public:
virtual bool isMatchAF(const AFPtr& af) = 0;
friend class HKU_API Portfolio;
/* 仅供PF调用由PF通知其实际运行的系统列表并启动计算 */
void calculate(const SystemList& sysList, const KQuery& query);

View File

@ -0,0 +1,87 @@
/*
* Copyright (c) 2024 hikyuu.org
*
* Created on: 2024-03-27
* Author: fasiondog
*/
#pragma once
#include <future>
#include <functional>
#include <vector>
#include "ThreadPool.h"
#include "StealThreadPool.h"
#include "MQThreadPool.h"
#include "MQStealThreadPool.h"
namespace hku {
inline std::vector<std::pair<size_t, size_t>> parallelIndexRange(size_t start, size_t end) {
std::vector<std::pair<size_t, size_t>> ret;
if (start >= end) {
return ret;
}
size_t total = end - start;
size_t cpu_num = std::thread::hardware_concurrency();
if (total <= cpu_num) {
ret.emplace_back(start, end);
return ret;
}
size_t per_num = total / cpu_num;
for (size_t i = start; i < per_num; i++) {
size_t first = i * cpu_num;
ret.emplace_back(first, first + cpu_num);
}
for (size_t i = per_num * cpu_num; i < total; i++) {
ret.emplace_back(i, i + 1);
}
return ret;
}
template <typename FunctionType, class TaskGroup = MQStealThreadPool>
void parallel_for_index_void(size_t start, size_t end, FunctionType f) {
auto ranges = parallelIndexRange(start, end);
TaskGroup tg;
for (size_t i = 0, total = ranges.size(); i < total; i++) {
tg.submit([=, range = ranges[i]]() {
for (size_t ix = range.first; ix < range.second; ix++) {
f(ix);
}
});
}
tg.join();
return;
}
template <typename FunctionType, class TaskGroup = MQStealThreadPool>
auto parallel_for_index(size_t start, size_t end, FunctionType f) {
auto ranges = parallelIndexRange(start, end);
TaskGroup tg;
std::vector<std::future<std::vector<typename std::result_of<FunctionType(size_t)>::type>>>
tasks;
for (size_t i = 0, total = ranges.size(); i < total; i++) {
tasks.emplace_back(tg.submit([&, range = ranges[i]]() {
std::vector<typename std::result_of<FunctionType(size_t)>::type> one_ret;
for (size_t ix = range.first; ix < range.second; ix++) {
one_ret.emplace_back(f(ix));
}
return one_ret;
}));
}
std::vector<typename std::result_of<FunctionType(size_t)>::type> ret;
for (auto& task : tasks) {
auto one = task.get();
for (auto&& value : one) {
ret.emplace_back(std::move(value));
}
}
return ret;
}
} // namespace hku

View File

@ -7,7 +7,4 @@
#pragma once
#include "ThreadPool.h"
#include "StealThreadPool.h"
#include "MQThreadPool.h"
#include "MQStealThreadPool.h"
#include "algorithm.h"

View File

@ -0,0 +1,61 @@
/*
* Copyright (c) 2024 hikyuu.org
*
* Created on: 2024-03-27
* Author: fasiondog
*/
#include "../../test_config.h"
#include <hikyuu/utilities/thread/algorithm.h>
using namespace hku;
TEST_CASE("test_parallelIndexRange") {
std::vector<std::pair<size_t, size_t>> expect{{0, 2}};
auto result = parallelIndexRange(0, 2);
CHECK_EQ(result.size(), 1);
for (size_t i = 0, len = expect.size(); i < len; i++) {
CHECK_EQ(result[i].first, expect[i].first);
CHECK_EQ(result[i].second, expect[i].second);
}
size_t cpu_num = std::thread::hardware_concurrency();
if (cpu_num == 32) {
result = parallelIndexRange(0, 100);
expect = {{0, 32}, {32, 64}, {64, 96}, {96, 97}, {97, 98}, {98, 99}, {99, 100}};
CHECK_EQ(result.size(), expect.size());
for (size_t i = 0, len = expect.size(); i < len; i++) {
CHECK_EQ(result[i].first, expect[i].first);
CHECK_EQ(result[i].second, expect[i].second);
}
} else if (cpu_num == 8) {
result = parallelIndexRange(0, 35);
expect = {{0, 8}, {8, 16}, {16, 24}, {24, 32}, {32, 33}, {33, 34}, {34, 35}};
CHECK_EQ(result.size(), expect.size());
for (size_t i = 0, len = expect.size(); i < len; i++) {
CHECK_EQ(result[i].first, expect[i].first);
CHECK_EQ(result[i].second, expect[i].second);
}
}
}
TEST_CASE("test_parallel_for_index") {
std::vector<int> values(100);
for (size_t i = 0, len = values.size(); i < len; i++) {
values[i] = i;
}
auto result = parallel_for_index(0, values.size(), [](size_t i) { return i + 1; });
std::vector<int> expect(100);
for (size_t i = 0, len = expect.size(); i < len; i++) {
expect[i] = i + 1;
}
CHECK_EQ(result.size(), expect.size());
for (size_t i = 0, len = expect.size(); i < len; i++) {
CHECK_EQ(result[i], expect[i]);
}
// parallel_for_index<void>(0, values.size(), [](size_t i) { HKU_INFO("i: {}", i); });
}

View File

@ -31,8 +31,6 @@ void export_Portfolio(py::module& m) {
.def_property("tm", &Portfolio::getTM, &Portfolio::setTM, "设置或获取交易管理对象")
.def_property("se", &Portfolio::getSE, &Portfolio::setSE, "设置或获取交易对象选择算法")
.def_property("af", &Portfolio::getAF, &Portfolio::setAF, "设置或获取资产分配算法")
.def_property_readonly("proto_sys_list", &Portfolio::getProtoSystemList,
py::return_value_policy::copy, "获取原型系统列")
.def_property_readonly("real_sys_list", &Portfolio::getRealSystemList,
py::return_value_policy::copy, "由 PF 运行时设定的实际运行系统列表")