hikyuu2/hikyuu/extend.py
2020-07-07 00:34:25 +08:00

384 lines
12 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#
# 对 C++ 引出类和函数进行扩展
#
from datetime import *
from .cpp.core import *
# ------------------------------------------------------------------
# 常量定义各种C++中Null值
# ------------------------------------------------------------------
constant = Constant()
# ------------------------------------------------------------------
# 增加Datetime、Stock的hash支持以便可做为dict的key
# ------------------------------------------------------------------
Datetime.__hash__ = lambda self: self.number * 1000000 + self.millisecond * 1000 + self.microsecond
TimeDelta.__hash__ = lambda self: self.ticks
Stock.__hash__ = lambda self: self.id
# ------------------------------------------------------------------
# 增强 Datetime
# ------------------------------------------------------------------
__old_Datetime_init__ = Datetime.__init__
__old_Datetime_add__ = Datetime.__add__
__old_Datetime_sub__ = Datetime.__sub__
def __new_Datetime_init__(self, *args, **kwargs):
"""
- Datetime("2010-1-1 10:00:00")
- Python dateDatetime(date(2010,1,1))
- Python datetimeDatetime(datetime(2010,1,1,10)
- YYYYMMDDHHMM Datetime(201001011000)
- Datetime(year, month, day, hour=0, minute=0, second=0, millisecond=0, microsecond=0)
:py:func:`getDateRange`
:py:meth:`StockManager.getTradingCalendar`
"""
if not args:
__old_Datetime_init__(self, **kwargs)
# datetime实例同时也是date的实例判断必须放在date之前
elif isinstance(args[0], datetime):
d = args[0]
milliseconds = d.microsecond // 1000
microseconds = d.microsecond - milliseconds * 1000
__old_Datetime_init__(
self, d.year, d.month, d.day, d.hour, d.minute, d.second, milliseconds, microseconds
)
elif isinstance(args[0], date):
d = args[0]
__old_Datetime_init__(self, d.year, d.month, d.day, 0, 0, 0, 0)
elif isinstance(args[0], str):
__old_Datetime_init__(self, args[0])
else:
__old_Datetime_init__(self, *args)
def __new_Datetime_add__(self, td):
"""加上指定时长,时长对象可为 TimeDelta 或 datetime.timedelta 类型
:param TimeDelta td:
:rtype: Datetime
"""
if isinstance(td, TimeDelta):
return __old_Datetime_add__(self, td)
elif isinstance(td, timedelta):
return __old_Datetime_add__(self, TimeDelta(td))
else:
raise TypeError("unsupported operand type(s) for +: 'TimeDelta' and '{}'".format(type(td)))
def __new_Datetime_sub__(self, td):
"""减去指定的时长, 时长对象可为 TimeDelta 或 datetime.timedelta 类型
:param TimeDelta td:
:rtype: Datetime
"""
if isinstance(td, TimeDelta):
return __old_Datetime_sub__(self, td)
elif isinstance(td, timedelta):
return __old_Datetime_sub__(self, TimeDelta(td))
else:
raise TypeError("unsupported operand type(s) for +: 'TimeDelta' and '{}'".format(type(td)))
def Datetime_date(self):
"""转化生成 python 的 date"""
return date(self.year, self.month, self.day)
def Datetime_datetime(self):
"""转化生成 python 的 datetime"""
return datetime(
self.year, self.month, self.day, self.hour, self.minute, self.second, self.microsecond
)
Datetime.__init__ = __new_Datetime_init__
Datetime.__add__ = __new_Datetime_add__
Datetime.__radd__ = __new_Datetime_add__
Datetime.__sub__ = __new_Datetime_sub__
Datetime.date = Datetime_date
Datetime.datetime = Datetime_datetime
# ------------------------------------------------------------------
# 增强 TimeDelta
# ------------------------------------------------------------------
__old_TimeDelta_init__ = TimeDelta.__init__
__old_TimeDelta_add__ = TimeDelta.__add__
__old_TimeDelta_sub__ = TimeDelta.__sub__
def __new_TimeDelta_init__(self, *args, **kwargs):
"""
- datetime.timedelta TimdeDelta(timedelta实例)
- TimeDelta(days=0, hours=0, minutes=0, seconds=0, milliseconds=0, microseconds=0)
- -99999999 <= days <= 99999999
- -100000 <= hours <= 100000
- -100000 <= minutes <= 100000
- -8639900 <= seconds <= 8639900
- -86399000000 <= milliseconds <= 86399000000
- -86399000000 <= microseconds <= 86399000000
"""
if not args:
__old_TimeDelta_init__(self, **kwargs)
elif isinstance(args[0], timedelta):
days = args[0].days
secs = args[0].seconds
hours = secs // 3600
mins = secs // 60 - hours * 60
secs = secs - mins * 60 - hours * 3600
microsecs = args[0].microseconds
millisecs = microsecs // 1000
microsecs = microsecs - millisecs * 1000
__old_TimeDelta_init__(self, days, hours, mins, secs, millisecs, microsecs)
else:
__old_TimeDelta_init__(self, *args)
def __new_TimeDelta_add__(self, td):
"""可和 TimeDelta, datetime.timedelta, Datetime执行相加操作"""
if isinstance(td, TimeDelta):
return __old_TimeDelta_add__(self, td)
elif isinstance(td, timedelta):
return __old_TimeDelta_add__(self, TimeDelta(td))
elif isinstance(td, Datetime):
return td + self
elif isinstance(td, datetime):
return td + Datetime(datetime)
else:
raise TypeError("unsupported operand type(s) for +: 'TimeDelta' and '{}'".format(type(td)))
def __new_TimeDelta_sub__(self, td):
"""可减去TimeDelta, datetime.timedelta"""
return __old_TimeDelta_sub__(self, td) if isinstance(td, TimeDelta) else __old_TimeDelta_sub__(
self, TimeDelta(td)
)
def TimeDelta_timedelta(self):
""" 转化为 datetime.timedelta """
return timedelta(
days=self.days,
hours=self.hours,
minutes=self.minutes,
seconds=self.seconds,
milliseconds=self.milliseconds,
microseconds=self.microseconds
)
TimeDelta.__init__ = __new_TimeDelta_init__
TimeDelta.__add__ = __new_TimeDelta_add__
TimeDelta.__sub__ = __new_TimeDelta_sub__
TimeDelta.timedelta = TimeDelta_timedelta
# ------------------------------------------------------------------
# 增强 KData 的遍历
# ------------------------------------------------------------------
def KData_getitem(kdata, i):
"""
:param i: int | Datetime | slice | str
"""
if isinstance(i, int):
length = len(kdata)
index = length + i if i < 0 else i
if index < 0 or index >= length:
raise IndexError("index out of range: %d" % i)
return kdata.get(index)
elif isinstance(i, Datetime):
return kdata.getByDate(i)
elif isinstance(i, str):
return kdata.getByDate(Datetime(i))
elif isinstance(i, slice):
return [kdata.get(x) for x in range(*i.indices(len(kdata)))]
else:
raise IndexError("Error index type")
def KData_iter(kdata):
for i in range(len(kdata)):
yield kdata[i]
def KData_getPos(kdata, datetime):
"""
:param Datetime datetime:
:return: None
"""
pos = kdata._getPos(datetime)
return pos if pos != constant.null_size else None
KData.__getitem__ = KData_getitem
KData.__iter__ = KData_iter
KData.getPos = KData_getPos
# ------------------------------------------------------------------
# 封装增强其他C++ vector类型的遍历、打印
# ------------------------------------------------------------------
def list_getitem(data, i):
"""对C++引出的vector实现python的切片
vector类的__getitem__函数覆盖即可
"""
if isinstance(i, int):
length = len(data)
index = length + i if i < 0 else i
if index < 0 or index >= length:
raise IndexError("index out of range: %d" % i)
return data.get(index)
elif isinstance(i, slice):
return [data.get(x) for x in range(*i.indices(len(data)))]
else:
raise IndexError("Error index type")
PriceList.__getitem__ = list_getitem
StringList.__getitem__ = list_getitem
DatetimeList.__getitem__ = list_getitem
BlockList.__getitem__ = list_getitem
KRecordList.__getitem__ = list_getitem
TransList.__getitem__ = list_getitem
TimeLineList.__getitem__ = list_getitem
PriceList.__str__ = lambda self: str(list(self))
PriceList.__repr__ = lambda self: repr(list(self))
StringList.__str__ = lambda self: str(list(self))
StringList.__repr__ = lambda self: repr(list(self))
DatetimeList.__str__ = lambda self: str(list(self))
DatetimeList.__repr__ = lambda self: repr(list(self))
BlockList.__str__ = lambda self: str(list(self))
BlockList.__repr__ = lambda self: repr(list(self))
KRecordList.__str__ = lambda self: str(list(self))
KRecordList.__repr__ = lambda self: repr(list(self))
TransList.__str__ = lambda self: str(list(self))
TransList.__repr__ = lambda self: repr(list(self))
TimeLineList.__str__ = lambda self: str(list(self))
TimeLineList.__repr__ = lambda self: repr(list(self))
# ------------------------------------------------------------------
# 增加转化为 np.array、pandas.DataFrame 的功能
# ------------------------------------------------------------------
try:
import numpy as np
import pandas as pd
def KData_to_np(kdata):
"""转化为numpy结构数组"""
if kdata.getQuery().kType in ('DAY', 'WEEK', 'MONTH', 'QUARTER', 'HALFYEAR', 'YEAR'):
k_type = np.dtype(
{
'names': ['datetime', 'open', 'high', 'low', 'close', 'amount', 'volume'],
'formats': ['datetime64[D]', 'd', 'd', 'd', 'd', 'd', 'd']
}
)
else:
k_type = np.dtype(
{
'names': ['datetime', 'open', 'high', 'low', 'close', 'amount', 'volume'],
'formats': ['datetime64[ms]', 'd', 'd', 'd', 'd', 'd', 'd']
}
)
return np.array(
[
(k.datetime.datetime(), k.open, k.high, k.low, k.close, k.amount, k.volume)
for k in kdata
],
dtype=k_type
)
def KData_to_df(kdata):
"""转化为pandas的DataFrame"""
return pd.DataFrame.from_records(KData_to_np(kdata), index='datetime')
KData.to_np = KData_to_np
KData.to_df = KData_to_df
def PriceList_to_np(data):
"""仅在安装了numpy模块时生效转换为numpy.array"""
return np.array(data, dtype='d')
def PriceList_to_df(data):
"""仅在安装了pandas模块时生效转换为pandas.DataFrame"""
return pd.DataFrame(data.to_np(), columns=('Value', ))
PriceList.to_np = PriceList_to_np
PriceList.to_df = PriceList_to_df
def DatetimeList_to_np(data):
"""仅在安装了numpy模块时生效转换为numpy.array"""
return np.array(data, dtype='datetime64[D]')
def DatetimeList_to_df(data):
"""仅在安装了pandas模块时生效转换为pandas.DataFrame"""
return pd.DataFrame(data.to_np(), columns=('Datetime', ))
DatetimeList.to_np = DatetimeList_to_np
DatetimeList.to_df = DatetimeList_to_df
def TimeLine_to_np(data):
"""转化为numpy结构数组"""
t_type = np.dtype(
{
'names': ['datetime', 'price', 'vol'],
'formats': ['datetime64[ms]', 'd', 'd']
}
)
return np.array([(t.datetime.datetime(), t.price, t.vol) for t in data], dtype=t_type)
def TimeLine_to_df(kdata):
"""转化为pandas的DataFrame"""
return pd.DataFrame.from_records(TimeLine_to_np(kdata), index='datetime')
TimeLineList.to_np = TimeLine_to_np
TimeLineList.to_df = TimeLine_to_df
def TransList_to_np(data):
"""转化为numpy结构数组"""
t_type = np.dtype(
{
'names': ['datetime', 'price', 'vol', 'direct'],
'formats': ['datetime64[ms]', 'd', 'd', 'd']
}
)
return np.array(
[(t.datetime.datetime(), t.price, t.vol, t.direct) for t in data], dtype=t_type
)
def TransList_to_df(kdata):
"""转化为pandas的DataFrame"""
return pd.DataFrame.from_records(TransList_to_np(kdata), index='datetime')
TransList.to_np = TransList_to_np
TransList.to_df = TransList_to_df
except:
pass