优化 ostream 重定向至 python,C++多线程可能引起 jupyter 执行崩溃

This commit is contained in:
fasiondog 2024-03-26 15:43:30 +08:00
parent 01279141ae
commit ff9494f6e7
9 changed files with 89 additions and 186 deletions

View File

@ -73,11 +73,22 @@ __version__ = get_version()
sm = StockManager.instance()
class iodog:
# Only for compatibility with old code
@staticmethod
def open():
open_ostream_to_python()
@staticmethod
def close():
close_ostream_to_python()
# 如果是在 jupyter 环境中运行重定向C++ stdout/stderr输出至python
if in_ipython_frontend():
sm.python_in_jupyter = True
hku_info("hikyuu version: {}", get_version_with_build())
iodog = OstreamRedirect()
iodog.open()

View File

@ -26,24 +26,11 @@
namespace hku {
static std::thread::id g_main_thread_id = std::this_thread::get_id();
static std::atomic<int> g_ioredirect_to_python_count = 0;
bool isLogInMainThread() {
return std::this_thread::get_id() == g_main_thread_id;
}
int getIORedirectToPythonCount() {
return g_ioredirect_to_python_count;
}
void increaseIORedicrectToPythonCount() {
g_ioredirect_to_python_count++;
}
void decreaseIORedicrectToPythonCount() {
g_ioredirect_to_python_count--;
}
static LOG_LEVEL g_log_level = LOG_LEVEL::LOG_TRACE;
std::string g_unknown_error_msg{"Unknown error!"};

View File

@ -50,10 +50,6 @@ namespace hku {
bool HKU_API isLogInMainThread();
void HKU_API increaseIORedicrectToPythonCount();
void HKU_API decreaseIORedicrectToPythonCount();
int HKU_API getIORedirectToPythonCount();
/**********************************************
* Use SPDLOG for logging
*********************************************/

View File

@ -52,8 +52,14 @@ static py::dict combinate_ind_analysis(const Stock& stk, const KQuery& query, Tr
c_sell_inds.emplace_back(sell_inds[i].cast<Indicator>());
}
std::map<std::string, Performance> pers;
{
OStreamToPython guard(false);
py::gil_scoped_release release;
pers = combinateIndicatorAnalysis(stk, query, tm, sys, c_buy_inds, c_sell_inds, n);
}
py::dict result;
auto pers = combinateIndicatorAnalysis(stk, query, tm, sys, c_buy_inds, c_sell_inds, n);
for (auto iter = pers.begin(); iter != pers.end(); ++iter) {
result[iter->first.c_str()] = std::move(iter->second);
}
@ -74,8 +80,13 @@ static py::dict combinate_ind_analysis_with_block(const Block& blk, const KQuery
c_sell_inds.emplace_back(sell_inds[i].cast<Indicator>());
}
auto records =
combinateIndicatorAnalysisWithBlock(blk, query, tm, sys, c_buy_inds, c_sell_inds, n);
vector<CombinateAnalysisOutput> records;
{
OStreamToPython guard(false);
py::gil_scoped_release release;
records =
combinateIndicatorAnalysisWithBlock(blk, query, tm, sys, c_buy_inds, c_sell_inds, n);
}
std::vector<py::list> tmp;
@ -140,6 +151,7 @@ static py::dict analysis_sys_list(const py::object& pystk_list, const KQuery& qu
vector<AnalysisSystemWithBlockOut> records;
{
OStreamToPython guard(false);
py::gil_scoped_release release;
records = analysisSystemList(sys_list, stk_list, query);
}

View File

@ -5,16 +5,34 @@
* Author: fasiondog
*/
#include <pybind11/pybind11.h>
#include <pybind11/iostream.h>
#include "ioredirect.h"
namespace py = pybind11;
namespace hku {
void export_io_redirect(py::module& m) {
py::class_<OstreamRedirect>(m, "OstreamRedirect")
.def(py::init<bool, bool>(), py::arg("stdout") = true, py::arg("stderr") = true)
.def("__enter__", &OstreamRedirect::enter)
.def("__exit__", &OstreamRedirect::exit)
.def("open", &OstreamRedirect::enter)
.def("close", &OstreamRedirect::exit);
pybind11::detail::OstreamRedirect OStreamToPython::ms_io_redirect(true, true);
bool OStreamToPython::ms_opened{false};
void open_ostream_to_python() {
if (!OStreamToPython::ms_opened) {
OStreamToPython::ms_io_redirect.enter();
OStreamToPython::ms_opened = true;
}
}
void close_ostream_to_python() {
if (OStreamToPython::ms_opened) {
OStreamToPython::ms_io_redirect.exit();
OStreamToPython::ms_opened = false;
}
}
} // namespace hku
namespace py = pybind11;
using namespace hku;
void export_io_redirect(py::module &m) {
m.def("open_ostream_to_python", open_ostream_to_python);
m.def("close_ostream_to_python", close_ostream_to_python);
}

View File

@ -12,164 +12,42 @@
*/
#pragma once
#ifndef IOREDIRECT_H_
#define IOREDIRECT_H_
#include <streambuf>
#include <ostream>
#include <string>
#include <memory>
#include <iostream>
#include <hikyuu/Log.h>
#include <pybind11/iostream.h>
#include <hikyuu/DataType.h>
using namespace pybind11;
namespace hku {
#ifdef __GNUC__
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wattributes"
#endif
void open_ostream_to_python();
void close_ostream_to_python();
class OStreamToPython final {
friend void open_ostream_to_python();
friend void close_ostream_to_python();
public:
explicit OStreamToPython(bool open) : m_old_opened(open) {
if (open && !ms_opened) {
ms_io_redirect.enter();
} else if (!open && ms_opened) {
ms_io_redirect.exit();
}
}
~OStreamToPython() {
if (m_old_opened && !ms_opened) {
ms_io_redirect.enter();
} else if (!m_old_opened && ms_opened) {
ms_io_redirect.exit();
}
}
// Buffer that writes to Python instead of C++
class pythonbuf : public std::streambuf {
private:
using traits_type = std::streambuf::traits_type;
bool m_old_opened;
char d_buffer[1024];
object pywrite;
object pyflush;
int overflow(int c) {
if (!traits_type::eq_int_type(c, traits_type::eof())) {
*pptr() = traits_type::to_char_type(c);
pbump(1);
}
return sync() ? traits_type::not_eof(c) : traits_type::eof();
}
int sync() {
if (pbase() != pptr()) {
// This subtraction cannot be negative, so dropping the sign
str line(pbase(), static_cast<size_t>(pptr() - pbase()));
pywrite(line);
pyflush();
setp(pbase(), epptr());
}
return 0;
}
public:
pythonbuf(object pyostream)
: pywrite(pyostream.attr("write")), pyflush(pyostream.attr("flush")) {
setp(d_buffer, d_buffer + sizeof(d_buffer) - 1);
}
/// Sync before destroy
~pythonbuf() {
sync();
}
private:
static pybind11::detail::OstreamRedirect ms_io_redirect;
static bool ms_opened;
};
class scoped_ostream_redirect {
protected:
std::streambuf *old;
std::ostream &costream;
pythonbuf buffer;
public:
scoped_ostream_redirect(std::ostream &costream = std::cout,
object pyostream = module::import("sys").attr("stdout"))
: costream(costream), buffer(pyostream) {
old = costream.rdbuf(&buffer);
}
~scoped_ostream_redirect() {
costream.rdbuf(old);
}
scoped_ostream_redirect(const scoped_ostream_redirect &) = delete;
scoped_ostream_redirect(scoped_ostream_redirect &&other) = default;
scoped_ostream_redirect &operator=(const scoped_ostream_redirect &) = delete;
scoped_ostream_redirect &operator=(scoped_ostream_redirect &&) = delete;
};
class scoped_estream_redirect : public scoped_ostream_redirect {
public:
scoped_estream_redirect(std::ostream &costream = std::cerr,
object pyostream = module::import("sys").attr("stderr"))
: scoped_ostream_redirect(costream, pyostream) {}
};
class OstreamRedirect {
bool do_stdout_;
bool do_stderr_;
bool had_stdout_; // prevent the reentrant
bool had_stderr_; // prevent the reentrant
std::unique_ptr<scoped_ostream_redirect> redirect_stdout;
std::unique_ptr<scoped_estream_redirect> redirect_stderr;
public:
// OstreamRedirect()
// : do_stdout_(false), do_stderr_(false) {}
OstreamRedirect(bool do_stdout = true, bool do_stderr = true)
: do_stdout_(do_stdout), do_stderr_(do_stderr), had_stdout_(false), had_stderr_(false) {}
OstreamRedirect(const OstreamRedirect &src) {
do_stdout_ = src.do_stdout_;
do_stderr_ = src.do_stderr_;
had_stdout_ = src.had_stdout_;
had_stderr_ = src.had_stderr_;
}
~OstreamRedirect() {
if (had_stdout_ && do_stdout_) {
hku::decreaseIORedicrectToPythonCount();
redirect_stdout.reset();
had_stdout_ = false;
std::cout << "redirected std::cout has been returned" << std::endl;
}
if (had_stderr_ && do_stderr_) {
hku::decreaseIORedicrectToPythonCount();
redirect_stderr.reset();
had_stderr_ = false;
std::cout << "redirected std::cerr has been returned" << std::endl;
}
}
void enter() {
if (!had_stdout_ && do_stdout_) {
hku::increaseIORedicrectToPythonCount();
redirect_stdout.reset(new scoped_ostream_redirect());
had_stdout_ = true;
std::cout << "std::cout are redirected to python::stdout" << std::endl;
}
if (!had_stderr_ && do_stderr_) {
hku::increaseIORedicrectToPythonCount();
redirect_stderr.reset(new scoped_estream_redirect());
had_stderr_ = true;
std::cout << "std::cerr are redirected to python::stderr" << std::endl;
}
}
void exit() {
if (had_stdout_ && do_stdout_) {
hku::decreaseIORedicrectToPythonCount();
redirect_stdout.reset();
had_stdout_ = false;
std::cout << "redirected std::cout has been returned" << std::endl;
}
if (had_stderr_ && do_stderr_) {
hku::decreaseIORedicrectToPythonCount();
redirect_stderr.reset();
had_stderr_ = false;
std::cout << "redirected std::cerr has been returned" << std::endl;
}
}
};
#ifdef __GNUC__
#pragma GCC diagnostic pop
#endif
#endif /** IOREDIRECT_H_ **/
} // namespace hku

View File

@ -62,14 +62,14 @@ PYBIND11_MODULE(core312, m) {
PYBIND11_MODULE(core, m) {
#endif
py::register_exception<hku::exception>(m, "HKUException");
StockManager::instance().runningInPython(true);
#if HKU_ENABLE_SEND_FEEDBACK
sendPythonVersionFeedBack(PY_MAJOR_VERSION, PY_MINOR_VERSION, PY_MICRO_VERSION);
#endif
py::register_exception<hku::exception>(m, "HKUException");
export_bind_stl(m);
export_DataType(m);
export_Constant(m);

View File

@ -18,6 +18,7 @@
#include <string>
#include "convert_any.h"
#include "pickle_support.h"
#include "ioredirect.h"
namespace py = pybind11;

View File

@ -124,7 +124,7 @@ else
set_configvar("HKU_DEBUG_MODE", 0)
end
set_configvar("USE_SPDLOG_LOGGER", 1) -- 是否使用spdlog作为日志输出
set_configvar("USE_SPDLOG_ASYNC_LOGGER", 0) -- 使用异步的spdlog
set_configvar("USE_SPDLOG_ASYNC_LOGGER", 1) -- 使用异步的spdlog
set_configvar("CHECK_ACCESS_BOUND", 1)
if is_plat("macosx") then
set_configvar("SUPPORT_SERIALIZATION", 0)