调整权息数据下载为多进程

This commit is contained in:
fasiondog 2024-09-04 02:26:18 +08:00
parent 9f3dbb078e
commit 3cf031e829
4 changed files with 69 additions and 37 deletions

View File

@ -631,6 +631,7 @@ class MyMainWindow(QMainWindow, Ui_MainWindow):
self.escape_time_thread = None
self.start_import_pushButton.setEnabled(True)
self.import_detail_textEdit.append("导入完毕!")
self.hdf5_weight_label.setText("导入完毕!")
if can_upgrade():
self.import_detail_textEdit.append("========================================================")
self.import_detail_textEdit.append(
@ -661,7 +662,6 @@ class MyMainWindow(QMainWindow, Ui_MainWindow):
self.import_detail_textEdit.append('导入 {} 分时记录数:{}'.format(msg[3], msg[5]))
elif msg_task_name == 'IMPORT_WEIGHT':
self.hdf5_weight_label.setText(msg[2])
if msg[2] == '导入权息数据完毕!':
self.import_detail_textEdit.append('导入权息记录数:{}'.format(msg[3]))
elif msg[2] == '导入通达信财务信息完毕!':

View File

@ -42,7 +42,7 @@ from hikyuu.util.check import hku_catch, hku_check
class ImportWeightToSqliteTask:
def __init__(self, log_queue, queue, config, dest_dir):
def __init__(self, log_queue, queue, config, dest_dir, market, cmd, host, port):
self.logger = logging.getLogger(self.__class__.__name__)
self.log_queue = log_queue
self.queue = queue
@ -50,6 +50,10 @@ class ImportWeightToSqliteTask:
self.dest_dir = dest_dir
self.msg_name = 'IMPORT_WEIGHT'
self.status = "no run"
self.market = market
self.cmd = cmd # "weight" | "finance"
self.host = host
self.port = port
@hku_catch(trace=True)
def __call__(self):
@ -76,44 +80,31 @@ class ImportWeightToSqliteTask:
self.logger.debug('use mysql import weight')
except Exception as e:
#self.queue.put([self.msg_name, str(e), -1, 0, total_count])
# self.queue.put([self.msg_name, str(e), -1, 0, total_count])
self.queue.put([self.msg_name, 'INFO', str(e), 0, 0])
self.queue.put([self.msg_name, '', 0, None, total_count])
self.status = "failure"
return
try:
hosts = search_best_tdx()
api = TdxHq_API()
hku_check(api.connect(hosts[0][2], hosts[0][3]), "failed connect pytdx {}:{}!", hosts[0][2], hosts[0][3])
hku_check(api.connect(self.host, self.port), "failed connect pytdx {}:{}!", self.host, self.port)
self.logger.info('正在导入权息数据')
self.queue.put([self.msg_name, '正在导入权息数据...', 0, 0, 0])
total_count = 0
for market in g_market_list:
count = pytdx_import_weight(api, connect, market)
self.logger.info("导入 {} 权息记录数: {}".format(market, count))
total_count += count
self.queue.put([self.msg_name, '导入权息数据完毕!', 0, 0, total_count])
self.logger.info('导入权息数据完毕')
self.queue.put([self.msg_name, '下载通达信财务信息(上证)...', 0, 0, 0])
x = pytdx_import_finance(connect, api, "SH")
self.queue.put([self.msg_name, '下载通达信财务信息(深证)...', 0, 0, 0])
x += pytdx_import_finance(connect, api, "SZ")
self.queue.put([self.msg_name, '下载通达信财务信息(北证)...', 0, 0, 0])
x += pytdx_import_finance(connect, api, "BJ")
self.queue.put([self.msg_name, '导入通达信财务信息完毕!', 0, 0, x])
if self.cmd == 'weight':
count = pytdx_import_weight(api, connect, self.market)
self.logger.info("导入 {} 权息记录数: {}".format(self.market, count))
self.queue.put([self.msg_name, '导入权息数据完毕!', 0, 0, f'{self.market} {total_count}'])
elif self.cmd == 'finance':
self.queue.put([self.msg_name, f'下载通达信当前财务信息({self.market})...', 0, 0, 0])
x = pytdx_import_finance(connect, api, self.market)
self.logger.info(f'导入 {self.market} 通达信当前财务信息数: {x}')
self.queue.put([self.msg_name, '导入通达信财务信息完毕!', 0, 0, f'{self.market} {x}'])
api.disconnect()
except Exception as e:
self.logger.error(e)
#self.queue.put([self.msg_name, str(e), -1, 0, total_count])
# self.queue.put([self.msg_name, str(e), -1, 0, total_count])
self.queue.put([self.msg_name, 'INFO', str(e), 0, 0])
finally:
connect.commit()

View File

@ -87,10 +87,10 @@ class UsePytdxImportToH5Thread(QThread):
sqlite_file_name = dest_dir + "/stock.db"
self.tasks = []
if self.config.getboolean('weight', 'enable', fallback=False):
self.tasks.append(
ImportWeightToSqliteTask(self.log_queue, self.queue,
self.config, dest_dir))
# if self.config.getboolean('weight', 'enable', fallback=False):
# self.tasks.append(
# ImportWeightToSqliteTask(self.log_queue, self.queue,
# self.config, dest_dir))
if self.config.getboolean('finance', 'enable', fallback=True):
self.tasks.append(
@ -112,6 +112,8 @@ class UsePytdxImportToH5Thread(QThread):
task_count += market_count
if self.config.getboolean('ktype', 'time', fallback=False):
task_count += market_count
if self.config.getboolean('weight', 'enable', fallback=False):
task_count += (market_count*2)
self.logger.info('搜索通达信服务器')
self.send_message(['INFO', '搜索通达信服务器'])
@ -210,6 +212,19 @@ class UsePytdxImportToH5Thread(QThread):
start_date.month * 1000000 + start_date.day * 10000))
cur_host += 1
if self.config.getboolean('weight', 'enable', fallback=False):
for market in g_market_list:
self.tasks.append(
ImportWeightToSqliteTask(self.log_queue, self.queue,
self.config, dest_dir, market, 'weight', use_hosts[cur_host][0],
use_hosts[cur_host][1]))
cur_host += 1
self.tasks.append(
ImportWeightToSqliteTask(self.log_queue, self.queue,
self.config, dest_dir, market, 'finance', use_hosts[cur_host][0],
use_hosts[cur_host][1]))
cur_host += 1
def run(self):
try:
self.init_task()
@ -298,6 +313,8 @@ class UsePytdxImportToH5Thread(QThread):
self.send_message([taskname, ktype])
elif taskname == 'IMPORT_ZH_BOND10':
self.send_message([taskname, ktype])
elif taskname == 'IMPORT_WEIGHT':
pass
else:
self.send_message([taskname, 'FINISHED'])
continue

View File

@ -30,7 +30,10 @@ from hikyuu.gui.data.ImportTdxToH5Task import ImportTdxToH5Task
from hikyuu.gui.data.ImportWeightToSqliteTask import ImportWeightToSqliteTask
from hikyuu.gui.data.ImportHistoryFinanceTask import ImportHistoryFinanceTask
from pytdx.hq import TdxHq_API
from hikyuu.data.common import g_market_list
from hikyuu.data.common_sqlite3 import create_database
from hikyuu.data.common_pytdx import search_best_tdx
from hikyuu.data.tdx_to_h5 import tdx_import_stock_name_from_file
from hikyuu.util import *
@ -57,18 +60,39 @@ class UseTdxImportToH5Thread(QThread):
self.quotations.append('stock')
if self.config['quotation']['fund']:
self.quotations.append('fund')
#if self.config['quotation']['future']:
# if self.config['quotation']['future']:
# self.quotations.append('future')
#通达信盘后没有债券数据。另外如果用Pytdx下载债券数据
#每个债券本身的数据很少但债券种类太多占用空间和时间太多,用途较少不再考虑导入
#if self.config['quotation']['bond']:
# 通达信盘后没有债券数据。另外如果用Pytdx下载债券数据
# 每个债券本身的数据很少但债券种类太多占用空间和时间太多,用途较少不再考虑导入
# if self.config['quotation']['bond']:
# self.quotations.append('bond')
hosts = search_best_tdx()
api = TdxHq_API()
hku_check(api.connect(hosts[0][2], hosts[0][3]), "failed connect pytdx {}:{}!", hosts[0][2], hosts[0][3])
self.queue = Queue()
self.tasks = []
cur_host = 0
if self.config.getboolean('weight', 'enable', fallback=False):
self.tasks.append(ImportWeightToSqliteTask(self.log_queue, self.queue, self.config, dest_dir))
for market in g_market_list:
self.tasks.append(
ImportWeightToSqliteTask(self.log_queue, self.queue,
self.config, dest_dir, market, 'weight', hosts[cur_host][2],
hosts[cur_host][3]))
cur_host += 1
if cur_host >= len(hosts):
cur_host = 0
self.tasks.append(
ImportWeightToSqliteTask(self.log_queue, self.queue,
self.config, dest_dir, market, 'finance', hosts[cur_host][2],
hosts[cur_host][3]))
cur_host += 1
if cur_host >= len(hosts):
cur_host = 0
if self.config.getboolean('finance', 'enable', fallback=True):
self.tasks.append(ImportHistoryFinanceTask(self.log_queue, self.queue, self.config, dest_dir))
if self.config.getboolean('ktype', 'day', fallback=False):
@ -115,7 +139,7 @@ class UseTdxImportToH5Thread(QThread):
dest_dir = self.config['hdf5']['dir']
hdf5_import_progress = {'SH': {'DAY': 0, '1MIN': 0, '5MIN': 0}, 'SZ': {'DAY': 0, '1MIN': 0, '5MIN': 0}}
#正在导入代码表
# 正在导入代码表
self.send_message(['START_IMPORT_CODE'])
connect = sqlite3.connect(dest_dir + "/stock.db")