更新mysql数据导入

This commit is contained in:
fasiondog 2020-10-26 00:01:28 +08:00
parent 4d438971c1
commit 14edaa5186
5 changed files with 353 additions and 64 deletions

View File

@ -23,11 +23,12 @@
# SOFTWARE.
import os
import datetime
from pathlib import Path
import mysql.connector
from .common import MARKETID, get_stktype_list
from hikyuu.data.common import MARKETID, get_stktype_list
def is_exist_db(connect):
@ -117,9 +118,27 @@ def get_stock_list(connect, market, quotations):
def get_table(connect, market, code, ktype):
"""note: ktype: 'DAY' | 'MIN' | 'MIN5' """
cur = connect.cursor()
schema = "{market}_{ktype}".format(market=market, ktype=ktype).lower()
ktype_dict = {
'day': 'day',
'week': 'week',
'month': 'month',
'quarter': 'quarter',
'halfyear': 'halfyear',
'year': 'year',
'min': 'min',
'1min': 'min',
'5min': 'min5',
'15min': 'min15',
'30min': 'min30',
'60min': 'min60',
'min1': 'min',
'min5': 'min5',
'min15': 'min15',
'min30': 'min30',
'min60': 'min60',
}
schema = "{market}_{ktype}".format(market=market, ktype=ktype_dict[ktype.lower()]).lower()
cur.execute("SELECT 1 FROM information_schema.SCHEMATA where SCHEMA_NAME='{}'".format(schema))
a = cur.fetchone()
if not a:
@ -164,6 +183,281 @@ def get_lastdatetime(connect, tablename):
return a[0]
def update_extern_data(connect, market, code, data_type):
    """Refresh the derived K-line tables (week, month, 15-minute, ...) of one security.

    Timestamps are integers laid out as YYYYMMDDhhmm. Base records are read
    from the day table (``data_type`` equal to 'day', case-insensitive) or
    otherwise from the 5-minute table, grouped into bars, and written to the
    matching index tables. Every bar is keyed by the *end* timestamp of its
    period. A bar whose end timestamp equals the newest timestamp already in
    the index table is updated in place; every other bar is inserted.

    :param connect: open DB-API connection (``cursor()`` / ``commit()``)
    :param market: market identifier, forwarded to ``get_table``
    :param code: security code, forwarded to ``get_table``
    :param data_type: 'day' selects week/month/quarter/halfyear/year;
                      any other value selects min15/min30/min60
    :return: None
    """
    import calendar  # local import: the rest of the file is not known to import it

    def split_date(stamp):
        """Return (year, month, day) of a YYYYMMDDhhmm integer."""
        return stamp // 100000000, stamp // 1000000 % 100, stamp // 10000 % 100

    def join_date(year, month, day):
        """Return the YYYYMMDD0000 integer for a calendar date."""
        return year * 100000000 + month * 1000000 + day * 10000

    def week_period(stamp):
        day = datetime.date(*split_date(stamp))
        # weekday(): Monday is 0 and Friday is 4, so a bar spans Monday..Friday
        monday = day - datetime.timedelta(day.weekday())
        friday = monday + datetime.timedelta(4)
        return (
            join_date(monday.year, monday.month, monday.day),
            join_date(friday.year, friday.month, friday.day),
        )

    def month_period(stamp):
        year, month, _ = split_date(stamp)
        _, last_day = calendar.monthrange(year, month)
        return (join_date(year, month, 1), join_date(year, month, last_day))

    def quarter_period(stamp):
        year, month, _ = split_date(stamp)
        first_month = (month - 1) // 3 * 3 + 1
        last_month = first_month + 2
        last_day = {3: 31, 6: 30, 9: 30, 12: 31}[last_month]
        return (join_date(year, first_month, 1), join_date(year, last_month, last_day))

    def halfyear_period(stamp):
        year, month, _ = split_date(stamp)
        if month < 7:
            return (join_date(year, 1, 1), join_date(year, 6, 30))
        return (join_date(year, 7, 1), join_date(year, 12, 31))

    def year_period(stamp):
        year = stamp // 100000000
        return (join_date(year, 1, 1), join_date(year, 12, 31))

    # Intraday bars as (first hhmm, last hhmm) covering 09:31-11:30 and 13:01-15:00.
    min15_bars = (
        (931, 945), (946, 1000), (1001, 1015), (1016, 1030),
        (1031, 1045), (1046, 1100), (1101, 1115), (1116, 1130),
        (1301, 1315), (1316, 1330), (1331, 1345), (1346, 1400),
        (1401, 1415), (1416, 1430), (1431, 1445), (1446, 1500),
    )
    min30_bars = (
        (931, 1000), (1001, 1030), (1031, 1100), (1101, 1130),
        (1301, 1330), (1331, 1400), (1401, 1430), (1431, 1500),
    )
    min60_bars = ((931, 1030), (1031, 1130), (1301, 1400), (1401, 1500))

    def intraday_period(stamp, bars):
        """Pick the first bar whose end is >= the hhmm part; fall back to the last bar."""
        day_part = stamp // 10000 * 10000
        hhmm = stamp - day_part
        for first, last in bars:
            if hhmm <= last:
                break
        # Without a break, (first, last) is the final bar, like the original else branch.
        return (day_part + first, day_part + last)

    period_funcs = {
        'week': week_period,
        'month': month_period,
        'quarter': quarter_period,
        'halfyear': halfyear_period,
        'year': year_period,
        'min15': lambda stamp: intraday_period(stamp, min15_bars),
        'min30': lambda stamp: intraday_period(stamp, min30_bars),
        'min60': lambda stamp: intraday_period(stamp, min60_bars),
    }

    def fetch_rows(sql):
        """Run a SELECT and return all rows; the cursor is closed even on error."""
        cur = connect.cursor()
        try:
            cur.execute(sql)
            return list(cur)
        finally:
            cur.close()

    def write_rows(sql, rows):
        """Run a batched write and commit it; the cursor is closed even on error."""
        cur = connect.cursor()
        try:
            cur.executemany(sql, rows)
            connect.commit()
        finally:
            cur.close()

    if data_type.lower() == 'day':
        index_list = ('week', 'month', 'quarter', 'halfyear', 'year')
        base_table = get_table(connect, market, code, 'day')
    else:
        index_list = ('min15', 'min30', 'min60')
        base_table = get_table(connect, market, code, 'min5')

    # NOTE(review): get_lastdatetime presumably returns the newest `date` of the
    # table, or None when the table is empty - confirm against its definition.
    if get_lastdatetime(connect, base_table) is None:
        return

    for index_type in index_list:
        period_of = period_funcs[index_type]
        index_table = get_table(connect, market, code, index_type)
        index_last_date = get_lastdatetime(connect, index_table)

        # Load the base records from the start of the newest indexed period on
        # (or everything when the index table is empty). The grouping below
        # relies on ascending dates, so both queries sort explicitly.
        if index_last_date is None:
            base_list = fetch_rows(
                'select date, open, high, low, close, amount, count from {} '
                'order by date asc'.format(base_table)
            )
        else:
            start_date, _ = period_of(index_last_date)
            base_list = fetch_rows(
                'select date, open, high, low, close, amount, count from {} '
                'where date>={} order by date asc'.format(base_table, start_date)
            )

        update_buffer = []
        insert_buffer = []
        total = len(base_list)
        last_end_date = 199012010000  # sentinel: records at or before it are ignored
        for pos in range(total):
            current_date = base_list[pos][0]
            if current_date <= last_end_date:
                continue  # already folded into the previous bar
            first_date, last_end_date = period_of(current_date)

            # Collect the consecutive records inside [first_date, last_end_date].
            # Each record's own date is tested before it is taken, so the first
            # record of the next period can no longer leak into this bar.
            stop = pos
            while stop < total and first_date <= base_list[stop][0] <= last_end_date:
                stop += 1
            bar_records = base_list[pos:stop]
            if not bar_records:
                continue  # the record lies outside its own period (e.g. a weekend date)

            _, open_price, high_price, low_price, _, amount, count = bar_records[0]
            close_price = bar_records[-1][4]
            for record in bar_records[1:]:
                if record[2] > high_price:
                    high_price = record[2]
                if record[3] < low_price:
                    low_price = record[3]
                amount += record[5]
                count += record[6]

            if last_end_date == index_last_date:
                update_buffer.append(
                    (open_price, high_price, low_price, close_price, amount, count, last_end_date)
                )
            else:
                insert_buffer.append(
                    (last_end_date, open_price, high_price, low_price, close_price, amount, count)
                )

        if update_buffer:
            write_rows(
                "update {} set open=%s, high=%s, low=%s, close=%s, amount=%s, count=%s "
                "where date=%s".format(index_table), update_buffer
            )

        if insert_buffer:
            write_rows(
                "insert into {} (date, open, high, low, close, amount, count) "
                "values (%s, %s, %s, %s, %s, %s, %s)".format(index_table), insert_buffer
            )
if __name__ == '__main__':
host = '127.0.0.1'
port = 3306
@ -172,18 +466,5 @@ if __name__ == '__main__':
cnx = mysql.connector.connect(user=usr, password=pwd, host=host, port=port)
create_database(cnx)
#print(get_codepre_list(cnx, 2, ['stock']))
#update_last_date(cnx, 1, 20180101)
#print(get_last_date(cnx, 1))
#print(get_stock_list(cnx, 'sh', ['stock']))
#print(get_lastdatetime(cnx, "`hb__min`.`bch_usd`"))
print(get_table(cnx, 'sh', '000001', 'MIN'))
from pathlib import Path
#x = list(Path("./mysql_upgrade").glob("*.sql"))
#print(x)
cur = cnx.cursor()
cur.close()
cnx.commit()
update_extern_data(cnx, 'SH', '000001', 'day')
cnx.close()

View File

@ -28,7 +28,7 @@ CREATE TABLE `hku_base`.`stkweight` (
`totalCount` DOUBLE UNSIGNED NOT NULL,
`freeCount` DOUBLE UNSIGNED NOT NULL,
PRIMARY KEY (`id`),
INDEX `stockid` (`stockid`)
INDEX `ix_stockid_date` (`stockid`, `date`)
)
COLLATE='utf8_general_ci'
ENGINE=InnoDB

View File

@ -31,7 +31,8 @@ import mysql.connector
from .common import MARKETID, STOCKTYPE, get_stktype_list
from .common_mysql import (
create_database, get_marketid, get_codepre_list, get_stock_list, get_table, get_lastdatetime
create_database, get_marketid, get_codepre_list, get_stock_list, get_table, get_lastdatetime,
update_extern_data
)
from .weight_to_mysql import qianlong_import_weight
@ -316,13 +317,12 @@ def import_data(
this_count = import_one_stock_data(connect, api, market, ktype, stock, startDate)
add_record_count += this_count
"""
if this_count > 0:
if ktype == 'DAY':
update_hdf5_extern_data(h5file, market.upper() + stock[2], 'DAY')
update_extern_data(connect, market.upper(), stock[2], 'DAY')
elif ktype == '5MIN':
update_hdf5_extern_data(h5file, market.upper() + stock[2], '5MIN')
"""
update_extern_data(connect, market.upper(), stock[2], '5MIN')
if progress:
progress(i, total)

View File

@ -178,7 +178,7 @@ if __name__ == '__main__':
host = '127.0.0.1'
port = 3306
usr = 'root'
pwd = 'Hikyuu2020*'
pwd = ''
tdx_server = '119.147.212.81'
tdx_port = 7709
quotations = ['stock', 'fund']

View File

@ -65,9 +65,10 @@ KRecordList MySQLKDataDriver::_getKRecordList(const string& market, const string
return result;
};
try {
KRecordTable r(market, code, kType);
SQLStatementPtr st = con->getStatement(
fmt::format("{} order by date limit {}, {}", r.getSelectSQL(), start_ix, end_ix - start_ix));
SQLStatementPtr st = con->getStatement(fmt::format(
"{} order by date limit {}, {}", r.getSelectSQL(), start_ix, end_ix - start_ix));
st->exec();
while (st->moveNext()) {
@ -87,6 +88,9 @@ KRecordList MySQLKDataDriver::_getKRecordList(const string& market, const string
HKU_ERROR("Failed get record: {}", record.str());
}
}
} catch (...) {
// 表可能不存在
}
return result;
}
@ -103,6 +107,7 @@ KRecordList MySQLKDataDriver::_getKRecordList(const string& market, const string
return result;
};
try {
KRecordTable r(market, code, ktype);
SQLStatementPtr st =
con->getStatement(fmt::format("{} order by date where date >= {} and date < {}",
@ -125,6 +130,9 @@ KRecordList MySQLKDataDriver::_getKRecordList(const string& market, const string
HKU_ERROR("Failed get record: {}", record.str());
}
}
} catch (...) {
// 表可能不存在
}
return result;
}