hikyuu2/hikyuu/draw/drawplot/bokeh_draw.py
2020-07-27 00:35:54 +08:00

456 lines
16 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf8 -*-
# cp936
"""
K线图K线图
"""
from hikyuu import *
from .common import get_draw_title
from bokeh.plotting import figure,ColumnDataSource
from bokeh.models import DatetimeTickFormatter,HoverTool, Title, Label
from bokeh.layouts import column
from bokeh.io import output_notebook, output_file, show
g_use_in_notbook = False
g_figure = None
g_axes = None
g_use_in_notbook = False
def use_bokeh_in_notebook(in_notebook=False):
global g_use_in_notbook
g_use_in_notbook = in_notebook
if g_use_in_notbook:
output_notebook()
def gcf():
"""获取当前Axis"""
global g_figure
return g_figure
def gca():
"""获取当前figure"""
global g_axes
return g_axes
def show_gcf():
global g_use_in_notbook
if not g_use_in_notbook:
output_file("{}/bokeh.html".format(StockManager.instance().tmpdir()))
show(gcf())
def create_one_axes_figure(figsize=(800,450)):
"""生成一个仅含有1个坐标轴的figure并返回其坐标轴对象
:param figsize: (, )
:return: ax
"""
global g_figure
global g_axes
g_axes = figure(width=figsize[0], height=figsize[1])
g_figure = column(g_axes)
return g_axes
def create_two_axes_figure(figsize=(800,450)):
"""生成一个含有2个坐标轴的figure并返回坐标轴列表
:param figsize: (, )
:return: (ax1, ax2)
"""
global g_figure
global g_axes
ax1 = figure(width=figsize[0], height=int(figsize[1] * 0.667))
ax2 = figure(width=figsize[0], height=int(figsize[1] * 0.333))
g_figure = column(ax1, ax2)
g_axes = ax2
return ax1, ax2
def create_three_axes_figure(figsize=(800,450)):
"""生成一个含有2个坐标轴的figure并返回坐标轴列表
:param figsize: (, )
:return: (ax1, ax2)
"""
global g_figure
global g_axes
ax1 = figure(width=figsize[0], height=int(figsize[1] * 0.5))
ax2 = figure(width=figsize[0], height=int(figsize[1] * 0.25))
ax3 = figure(width=figsize[0], height=int(figsize[1] * 0.25))
g_figure = column(ax1, ax2, ax3)
g_axes = ax3
return ax1, ax2, ax3
def create_figure(n=1, figsize=(800, 450)):
"""生成含有指定坐标轴数量的窗口最大只支持4个坐标轴。
:param int n:
:param figsize: (, )
:return: (ax1, ax2, ...) [1,4]None
"""
if n == 1:
return create_one_axes_figure(figsize)
elif n == 2:
return create_two_axes_figure(figsize)
elif n == 3:
return create_three_axes_figure(figsize)
else:
pass
def trans_color(color):
"""将 matplotlib 常用的 color 转换为对应的 bokeh color如果无法转换则返回原值"""
color_dict = {'r': 'red', 'g': 'green', 'k': 'black', 'b': 'blue'}
return color_dict[color] if color in color_dict else color
def get_date_format(kdata):
return '@datetime{%F}' if kdata.get_query().ktype \
in (Query.DAY, Query.WEEK, Query.MONTH, Query.QUARTER, Query.HALFYEAR, Query.YEAR) \
else '@datetime{%F %H:%M:%S}'
def kplot(kdata, new=True, axes=None, colorup='r', colordown='g'):
"""绘制K线图
:param KData kdata: K线数据
:param bool new: axes时生效
:param axes:
:param colorup: the color of the rectangle where close >= open
:param colordown: the color of the rectangle where close < open
"""
if not kdata:
print("kdata is None")
return
if not axes:
axes = create_figure() if new or gca() is None else gca()
k = kdata
inc_k = [r for r in k if r.close > r.open]
dec_k = [r for r in k if r.close <= r.open]
inc_source = ColumnDataSource(dict(datetime=[r.datetime.datetime() for r in inc_k],
open=[r.open for r in inc_k],
high=[r.high for r in inc_k],
low=[r.low for r in inc_k],
close=[r.close for r in inc_k],
amount=[r.amount for r in inc_k],
volume=[r.volume for r in inc_k]))
dec_source = ColumnDataSource(dict(datetime=[r.datetime.datetime() for r in dec_k],
open=[r.open for r in dec_k],
high=[r.high for r in dec_k],
low=[r.low for r in dec_k],
close=[r.close for r in dec_k],
amount=[r.amount for r in dec_k],
volume=[r.volume for r in dec_k]))
w = 12*60*60*1000
colorup = trans_color(colorup)
colordown = trans_color(colordown)
axes.segment(x0='datetime', y0='high', x1='datetime', y1='low', color=colorup, source=inc_source)
axes.segment(x0='datetime', y0='high', x1='datetime', y1='low', color=colordown, source=dec_source)
axes.vbar(x='datetime', width=w, top='close', bottom='open', fill_color="white",
line_color=colorup, source=inc_source)
axes.vbar(x='datetime', width=w, top='open', bottom='close', fill_color="green",
line_color=colordown, source=dec_source)
axes.add_tools(HoverTool(tooltips=[("index", "$index"), ('', get_date_format(k)),
("开盘价", "@open{0.0000}"), ("最高价", "@high{0.0000}"),
("最低价", "@low{0.0000}"),("收盘价", "@close{0.0000}"),
("成交金额", "@amount{0.0000}"), ("成交量", "@volume{0.0000}")],
formatters = { "datetime": "datetime"}))
axes.xaxis[0].formatter = DatetimeTickFormatter()
axes.title.text = k.get_stock().name
axes.title.align = "center"
axes.title.text_font_size = "16px"
last_record = kdata[-1]
color = colorup if last_record.close > kdata[-2].close else colordown
text = u'%s :%.2f :%.2f :%.2f :%.2f :%.2f%%' % (
last_record.datetime, last_record.open, last_record.high, last_record.low,
last_record.close, 100 * (last_record.close - kdata[-2].close) / kdata[-2].close
)
label = Label(
x=axes.plot_width * 0.01,
y=axes.plot_height * 0.82,
x_units='screen', y_units='screen',
text=text,
render_mode='css',
text_font_size='14px',
text_color=color,
background_fill_color='white',
background_fill_alpha=0.5
)
axes.add_layout(label)
show_gcf()
def mkplot(kdata, new=True, axes=None, colorup='r', colordown='g', ticksize=3):
"""绘制美式K线图
:param KData kdata: K线数据
:param bool new: axes时生效
:param axes:
:param colorup: the color of the lines where close >= open
:param colordown: the color of the lines where close < open
:param ticksize: open/close tick marker in points
"""
print("Bokeh 暂不支持绘制美式K线图, 请使用 matplotlib")
def get_color(index):
"""获取 index 指定的颜色,如果没有固定返回 index=0 的颜色"""
color_list = ['blue', 'orange', 'green', 'red', 'purple', 'darkgoldenrod', 'pink', 'darkcyan']
return color_list[index] if index in range(len(color_list)) else color_list[0]
def iplot(
indicator,
new=True,
axes=None,
kref=None,
legend_on=False,
text_on=False,
text_color='k',
zero_on=False,
label=None,
*args,
**kwargs
):
"""绘制indicator曲线
:param Indicator indicator: indicator实例
:param axes:
:param new: axes时生效
:param kref: K线数据便X坐标
:param legend_on:
:param text_on:
:param text_color:
:param zero_on: y=0线
:param str label: label显示文字信息text_on legend_on True
:param args: pylab plot参数
:param kwargs: pylab plot参数marker
markerfacecolor
markeredgecolor
"""
if not indicator:
print("indicator is None")
return
if not axes:
axes = create_figure() if new or gca() is None else gca()
if not label:
label = "%s %.2f" % (indicator.long_name, indicator[-1])
line_color = get_color(len(axes.tags))
width = 1
py_indicator = [None if x == constant.null_price else x for x in indicator]
if kref:
x_value = [r.datetime() for r in kref.get_datetime_list()]
source = ColumnDataSource(dict(datetime=x_value, value=py_indicator))
if legend_on:
axes.line(x='datetime', y='value', legend_label=label, line_width=width,
line_color=line_color, source=source)
else:
axes.line(x='datetime', y='value', line_width=width, line_color=line_color,
source=source)
axes.add_tools(HoverTool(tooltips=[("index", "$index"), ('', indicator.name),
('', get_date_format(kref)), ("", "@value{0.0000}")],
formatters = { "datetime": "datetime"}))
axes.xaxis[0].formatter = DatetimeTickFormatter()
else:
x_value = list(range(len(indicator)))
source = ColumnDataSource(dict(datetime=x_value, value=py_indicator))
if legend_on:
axes.line(x='datetime', y='value', legend_label=label, line_width=width,
line_color=line_color, source=source)
else:
axes.line(x='datetime', y='value', line_width=width, line_color=line_color,
source=source)
axes.add_tools(HoverTool(tooltips=[("index", "$index"), ('', indicator.name),
("", "@value{0.0000}")]))
if zero_on:
axes.line(x=x_value, y=[0 for i in range(len(indicator))], line_color='black')
if text_on:
label = Label(
x=int(axes.plot_width * 0.01),
y=int(axes.plot_height * 0.88),
x_units='screen', y_units='screen',
text=label,
render_mode='css',
text_font_size='14px',
text_color=trans_color(text_color),
background_fill_color='white',
background_fill_alpha=0.5
)
axes.add_layout(label)
axes.tags.append(line_color)
show_gcf()
def ibar(
indicator,
new=True,
axes=None,
kref=None,
legend_on=False,
text_on=False,
text_color='k',
label=None,
width=0.4,
color='r',
edgecolor='r',
zero_on=False,
*args,
**kwargs
):
"""绘制indicator柱状图
:param Indicator indicator: Indicator实例
:param axes:
:param new: axes时生效
:param kref: K线数据便X坐标
:param legend_on:
:param text_on:
:param text_color:
:param str label: label显示文字信息text_on legend_on True
:param zero_on: y=0线
:param width: Bar的宽度
:param color: Bar的颜色
:param edgecolor: Bar边缘颜色
:param args: pylab plot参数
:param kwargs: pylab plot参数
"""
if not indicator:
print("indicator is None")
return
if not axes:
axes = create_figure() if new or gca() is None else gca()
if not label:
label = "%s %.2f" % (indicator.long_name, indicator[-1])
line_color = get_color(len(axes.tags))
py_indicator = [None if x == constant.null_price else x for x in indicator]
if kref:
width = 12*60*60*1000
x_value = [r.datetime() for r in kref.get_datetime_list()]
source = ColumnDataSource(dict(datetime=x_value, value=py_indicator))
if legend_on:
axes.vbar(x='datetime', top='value', legend_label=label, width=width,
color=line_color, source=source)
else:
axes.vbar(x='datetime', top='value', width=width, color=line_color,
source=source)
axes.add_tools(HoverTool(tooltips=[("index", "$index"), ('', indicator.name),
('', get_date_format(kref)), ("", "@value{0.0000}")],
formatters = { "datetime": "datetime"}))
axes.xaxis[0].formatter = DatetimeTickFormatter()
else:
width = 0.5
x_value = list(range(len(indicator)))
source = ColumnDataSource(dict(datetime=x_value, value=py_indicator))
if legend_on:
axes.vbar(x='datetime', top='value', legend_label=label, width=width,
color=line_color, source=source)
else:
axes.vbar(x='datetime', top='value', width=width, color=line_color,
source=source)
axes.add_tools(HoverTool(tooltips=[("index", "$index"), ('', indicator.name),
("", "@value{0.0000}")]))
if text_on:
label = Label(
x=int(axes.plot_width * 0.01),
y=int(axes.plot_height * 0.88),
x_units='screen', y_units='screen',
text=label,
render_mode='css',
text_font_size='14px',
text_color=trans_color(text_color),
background_fill_color='white',
background_fill_alpha=0.5
)
axes.add_layout(label)
axes.tags.append(line_color)
show_gcf()
def ax_draw_macd2(axes, ref, kdata, n1=12, n2=26, n3=9):
"""绘制MACD。
BAR值变化与参考序列ref变化不一致时
BAR和参考序列ref同时上涨
BAR和参考序列ref同时下跌绿
:param axes:
:param ref: EMA
:param KData kdata: KData
:param int n1: MACD 1
:param int n2: MACD 2
:param int n3: MACD 3
"""
macd = MACD(CLOSE(kdata), n1, n2, n3)
bmacd, fmacd, smacd = macd.get_result(0), macd.get_result(1), macd.get_result(2)
text = 'MACD(%s,%s,%s) DIF:%.2f, DEA:%.2f, BAR:%.2f' % (
n1, n2, n3, fmacd[-1], smacd[-1], bmacd[-1]
)
label = Label(
x=int(axes.plot_width * 0.01),
y=int(axes.plot_height * 0.88),
x_units='screen', y_units='screen',
text=text,
render_mode='css',
text_font_size='14px',
background_fill_color='white',
background_fill_alpha=0.5
)
axes.add_layout(label)
total = len(kdata)
x = [i - 0.2 for i in range(0, total)]
y = bmacd
x1, x2, x3 = [x[0]], [], []
y1, y2, y3 = [y[0]], [], []
for i in range(1, total):
if ref[i] - ref[i - 1] > 0 and y[i] - y[i - 1] > 0:
x2.append(x[i])
y2.append(y[i])
elif ref[i] - ref[i - 1] < 0 and y[i] - y[i - 1] < 0:
x3.append(x[i])
y3.append(y[i])
else:
x1.append(x[i])
y1.append(y[i])
#axes.vbar(x1, y1, width=0.4, color='#BFBFBF', edgecolor='#BFBFBF')
#axes.vbar(x2, y2, width=0.4, color='r', edgecolor='r')
#axes.vbar(x3, y3, width=0.4, color='g', edgecolor='g')
#axt = axes.twinx()
#axt.grid(False)
#axt.set_yticks([])
#fmacd.plot(axes=axt, linestyle='--', legend_on=False, text_on=False)
#smacd.plot(axes=axt, legend_on=False, text_on=False)
fmacd.plot(axes=axes, linestyle='--', legend_on=False, text_on=False)
smacd.plot(axes=axes, legend_on=False, text_on=False)
#for label in axt.get_xticklabels():
# label.set_visible(False)