在linux下测试了在终端中运行的功能

删除.api文件
This commit is contained in:
hzy15610046011 2020-09-25 16:31:45 +08:00
parent 98890d4748
commit 4ffff763b8
9 changed files with 516 additions and 82 deletions

View File

@ -1,62 +1,62 @@
{
"applications_toolbar": {
"enabled": true
},
"cftool": {
"enabled": true
},
"code_editor": {
"enabled": true
},
"dialog_demo": {
"enabled": true
},
"drawings_toolbar": {
"enabled": true
},
"extension_app_demo": {
"enabled": true
},
"extension_demo": {
"enabled": true
},
"extension_dialog_demo": {
"enabled": true
},
"file_tree": {
"enabled": true
},
"ipython_console": {
"enabled": true
},
"pmagg": {
"enabled": true
},
"setting_manager": {
"enabled": true
},
"test_ext": {
"enabled": true
},
"variable_viewer": {
"enabled": true
},
"workspace_inspector": {
"enabled": true
},
"pyminer_server": {
"enabled": true
},
"jsonrpc-dataserver": {
"enabled": true
},
"socket-dataserver": {
"enabled": true
},
"socket-server": {
"enabled": true
},
"data_miner": {
"enabled": true
}
{
"applications_toolbar": {
"enabled": true
},
"cftool": {
"enabled": true
},
"code_editor": {
"enabled": true
},
"dialog_demo": {
"enabled": true
},
"drawings_toolbar": {
"enabled": true
},
"extension_app_demo": {
"enabled": true
},
"extension_demo": {
"enabled": true
},
"extension_dialog_demo": {
"enabled": true
},
"file_tree": {
"enabled": true
},
"ipython_console": {
"enabled": true
},
"pmagg": {
"enabled": true
},
"setting_manager": {
"enabled": true
},
"test_ext": {
"enabled": true
},
"variable_viewer": {
"enabled": true
},
"workspace_inspector": {
"enabled": true
},
"pyminer_server": {
"enabled": true
},
"jsonrpc-dataserver": {
"enabled": true
},
"socket-dataserver": {
"enabled": true
},
"socket-server": {
"enabled": true
},
"data_miner": {
"enabled": true
}
}

View File

@ -0,0 +1,238 @@
# -*- coding: utf-8 -*-
"""
日期2020-09-21
作者: gandaiwei
说明
dbConnectAccountTool -> 连接账号管理工具
dbConnectTool -> 连接通道工具
dbFuncTool -> SQL命令处理工具
"""
from sqlalchemy import create_engine
from sshtunnel import SSHTunnelForwarder
import pandas as pd
import pickle
import os
class dbConnectAccountTool(object):
'''
数据库链接账号处理
注意
创建以后需要优先执行 LoadAccount
'''
def __init__(self):
self.pklroad = ".\dbConnectAccount.pkl"
self.dbconnectaccount = {}
self.dbtype = ""
self.connectname = ""
def attachConnetName(self, dbtype = "", connectname = ""):
'''
参数
1 dbtype(str)数据库类型
2 connectname(str)链接的名称用户填写用于区分同一个数据库下的不同链接
'''
self.dbtype = dbtype
self.connectname = connectname
def getConnectAccountSSH(self):
'''
提取SSH信息
'''
CA = self.getConnectAccount()
ssh = CA["SSH"]
account = CA["account"]
return(account["host"], account["port"], ssh["ssh_host"], ssh["ssh_port"], ssh["ssh_username"], ssh["ssh_password"])
def getConnectAccount(self):
'''
用途
获取连接
返回结果
连接的账号IPportpassword, SSH
'''
self.loadConnectAccount()
connectaccount = self.dbconnectaccount[self.dbtype].get(self.connectname)
return(connectaccount)
def getConnectAccountDesc(self):
'''
用途
获取连接的信息列表connectname desc
返回结果
字典格式数据结构{dbtype:{connectname: desc}}
'''
self.loadConnectAccount()
res = {}
for k, v in self.dbconnectaccount.items():
res.update({k: {}})
for vk, vv in v.items():
res[k].update({vk: vv.get("connectdescribe")})
return(res)
def delConnectAccount(self):
'''
用途
删除链接通道
注意
前端需要验证是否有选择需要删除的链接
'''
del self.dbconnectaccount[self.dbtype][self.connectname]
self.writeConnectAccount()
def updateConnectAccount(self, connectaccount={}):
'''
用途
新增更新连接通道
参数
3 connectaccount(dict)账号的信息包括用户密码地址等对应的JSON结构
注意上游传入的 connectaccount 结构说明
1account(json) -> 账号信息
|- 1user = 用户 2password = 密码 3host = 地址
|- 4port = 端口 5database = 数据库 6charset = 字符集
2usessh(boolean) -> 是否使用 SSH 加密通道默认为 False表示不使用该通道
暂时实现了 SSH 加密通道使用密码加密的方法
3SSH(dict) -> SSH加密通道
|- 1ssh_host = 地址 2ssh_port = 端口 3ssh_username = 用户名
|- 4ssh_authenmethod = 加密模式5ssh_password = 密码
4 connectdescribe(str) -> 对连接的描述前端传入时默认为空字符串
'''
if self.dbtype not in self.dbconnectaccount:
self.dbconnectaccount.update(dbtype={})
self.dbconnectaccount[self.dbtype].update({self.connectname: connectaccount})
# 如果连接不存在,则直接新增;
# 前端需要检验并提醒是否有同名连接,如果同名会覆盖
self.writeConnectAccount()
def writeConnectAccount(self):
with open(self.pklroad, 'wb') as wfile:
pickle.dump(self.dbconnectaccount, wfile)
wfile.close()
def loadConnectAccount(self):
'''
用途
pickle 文件中获取链接账号数据
返回
dbConnectAccount(dict)连接账号集合数据结构{数据库类型:{名称: {连接账号信息}}}
'''
if not os.path.exists(self.pklroad):
# 如果文件不存在,则创建一个测试用账号写入到 pkl 文件中
testdt = dict(
account = dict(
user="root", password="", host="localhost",
port="3306", database="", charset="utf-8"
),
usessh = False,
SSH = {},
connectdescribe = "这是一个测试模块"
)
self.dbconnectaccount = {"mysql": {"testdt": testdt}}
self.writeConnectAccount()
with open(self.pklroad, 'rb') as rfile:
self.dbconnectaccount = pickle.load(rfile)
rfile.close()
class dbConnectTool(object):
'''
数据库通用连接工具
'''
def __init__(self, account, conn_url):
'''
参数
1 account(class)dbConnectAccountTool(dbtype, connectname)
2 conn_url(str)通过 url 方式连接数据库
'''
self.account = account
self.conn_url = conn_url
def createSSHConn(self):
'''
开启 SSH 连接方式
'''
ssh = self.account["SSH"]
account = self.account["account"]
self.ssh_server = SSHTunnelForwarder(
(ssh["ssh_host"], ssh["ssh_port"]),
ssh_username=ssh["ssh_username"],
ssh_password=ssh["ssh_password"],
remote_bind_address=(account["host"], account["port"])
)
self.ssh_server.start()
self.account["port"] = str(self.ssh_server.local_bind_port)
def createConn(self):
'''
用途
通过 url 的方式创建连接通道
参数
conn_url连接url由数据类型进行定义
注意
暂时没有实现SSL的方法
'''
try:
if self.account["usessh"]:
self.createSSHConn()
self.engine = create_engine(self.conn_url.format(**self.account["account"]), encoding = "utf-8")
conn_status = {"status":"connect", "info":""}
except Exception as e:
conn_status = {"status":"error", "info":e}
return(conn_status)
def closeConn(self):
'''
关闭所有通道
'''
if self.account["usessh"]:
self.ssh_server.close()
class dbFuncTool(object):
'''
数据库通用执行方法
'''
def __init__(self, account, conn_url):
self.dbCT = dbConnectTool(account = account, conn_url = conn_url)
self.conn_status = self.dbCT.createConn()
def execute(self, sql):
'''
执行命令需要增加一个装饰器关于运行时间的装饰器
返回结果
字典结构包含内容
1data(pd.dataframe)数据
2execute_status(str)查询结果状态done = 正常error = 报错
3info(str)返回信息GetData = 返回数据需要呈现ExecuteSQL = 执行命令不用呈现
'''
try:
conn = self.dbCT.engine.execute(sql)
if conn.cursor.description:
df = pd.DataFrame(
data = list(conn.cursor.fetchall()),
columns = list(map(lambda x:x[0], conn.cursor.description))
)
res = {"data":df, "execute_status":"done", "info":"GetData"}
else:
df = pd.DataFrame([])
res = {"data":df, "execute_status":"done", "info":"ExecuteSQL"}
conn.close()
except Exception as e:
res = {"data":pd.DataFrame([]), "execute_status":"error", "info":str(e)}
self.dbCT.closeConn()
return(res)
# "mysql": "mysql+pymysql://{user}:{password}@{host}:{port}/{database}"
# "pgsql": "postgresql://{user}:{password}@{host}:{port}/{database}"
# if __name__ == "__main__":

View File

@ -0,0 +1,65 @@
import os
from dbBaseTool import *
def split_print(dt):
print(dt)
print("=" * 50)
dbCA = dbConnectAccountTool()
dbCA.pklroad = ".\dbConnectAccount.pkl"
# 模拟打开数据库模块后,选择需要连接的方案:
desc = dbCA.getConnectAccountDesc()
split_print(desc)
# 模拟获取某个账号的信息、SSH账号等信息
dbtype, connectname = "mysql", "testdt"
dbCA.attachConnetName(dbtype, connectname)
account = dbCA.getConnectAccount()
split_print(account)
# 模拟增加(或更新)一个新的连接
dbtype, connectname = "mysql", "testaccount"
dbCA.attachConnetName(dbtype, connectname)
connectaccount = dict(
account=dict(
user="gandw", password="123456", host="localhost",
port="3306", database="local_db", charset="utf-8"
),
usessh=False,
SSH={},
connectdescribe="这又是一个测试"
)
dbCA.updateConnectAccount(connectaccount)
dbCA.loadConnectAccount()
split_print(dbCA.dbconnectaccount)
# 模拟数据库的 "测试连接"
mysql_url = "mysql+pymysql://{user}:{password}@{host}:{port}/{database}"
dbtype, connectname = "mysql", "testaccount"
dbCA.attachConnetName(dbtype, connectname)
account = dbCA.getConnectAccount()
dbCT = dbConnectTool(account = account, conn_url = mysql_url)
connect_status = dbCT.createConn()
split_print(connect_status)
# 模拟一个查询操作(mysql)
mysql_url = "mysql+pymysql://{user}:{password}@{host}:{port}/{database}"
dbtype, connectname = "mysql", "testaccount"
dbCA.attachConnetName(dbtype, connectname)
account = dbCA.getConnectAccount()
mysql = dbFuncTool(account = account, conn_url = mysql_url)
print(mysql.dbCT.engine)
df = mysql.execute(sql = "select * from mysql.use")
print(df['execute_status'])
print(df['data'])
df2 = mysql.execute(sql = "select * from mysql.user")
print(df["data"])
# 模拟删除一个连接
dbtype, connectname = "mysql", "testaccount"
dbCA.attachConnetName(dbtype, connectname)
dbCA.delConnectAccount()
dbCA.loadConnectAccount()
# split_print(dbCA.dbconnectaccount)

View File

@ -1,3 +1,4 @@
import os
import platform
import subprocess
import sys
@ -5,6 +6,9 @@ import sys
def check_platform() -> str:
system = platform.system()
plat = platform.platform(1, 1)
print(sys.platform)
print(plat)
return system.lower()
@ -17,27 +21,41 @@ def run_command_in_terminal(cmd: str, close_mode: str = 'wait_key'):
'wait_key': 'start cmd.exe /k \"%s &&pause &&exit \"'
}
command = close_action[close_mode] % cmd
subprocess.Popen(command, shell=True)
elif platform_name == 'deepin':
command = 'deepin-terminal -x bash -c \" %s \" ' % (cmd)
elif platform_name == 'linux':
command = 'gnome-terminal -x bash -c \"%s ;read\" ' % (cmd)
ret = os.system('which gnome-terminal')
if ret == 0:
close_action = {'auto': 'deepin-terminal -C \"%s\"',
'no': 'deepin-terminal -C \"%s\" --keep-open',
'wait_key': 'deepin-terminal -C \"%s\" --keep-open'
}
command = close_action[close_mode] % cmd
subprocess.Popen(command, shell=True)
else:
close_action = {'auto': 'gnome-terminal -x bash -c "%s;"',
'no': 'gnome-terminal -x bash -c "%s; read"',
'wait_key': 'gnome-terminal -x bash -c "%s; read"'
}
command = close_action[close_mode] % (cmd)
subprocess.Popen(command, shell=True)
else:
return
subprocess.Popen(command, shell=True)
def run_python_file_in_terminal(file_path, interpreter_path: str = None, close_mode: str = 'wait_key'):
if interpreter_path is None:
interpreter_path = sys.executable
run_command_in_terminal('%s %s' % (interpreter_path, file_path),close_mode=close_mode)
run_command_in_terminal('%s %s' % (interpreter_path, file_path), close_mode=close_mode)
def check_application(app_name):
os.system(app_name)
if __name__ == '__main__':
def test_run_in_terminal():
import time
run_command_in_terminal('dir', close_mode='no')
time.sleep(1)
run_command_in_terminal('dir', close_mode='wait_key')
time.sleep(1)
run_command_in_terminal('dir', close_mode='auto')
plat = check_platform()
print(plat)

View File

@ -48,8 +48,7 @@ import logging
from PyQt5.QtCore import pyqtSignal, QTimer, Qt, QTranslator, QLocale, QSize
from PyQt5.QtGui import QCloseEvent, QTextCursor, QResizeEvent, QFontDatabase, QMoveEvent, QFont, QIcon, QPixmap
from PyQt5.QtWidgets import QApplication, \
QMenu, QTextEdit, QSizePolicy, QMessageBox, QToolBar, QLabel, QSplashScreen, QFileDialog
from PyQt5.QtWidgets import QApplication, QMenu, QTextEdit, QMessageBox, QToolBar, QSplashScreen, QFileDialog
from pyminer2.features.io import sample
from pyminer2.features import base
@ -122,7 +121,8 @@ class PMToolBarHome(PMGToolBar):
self.add_buttons(2, ['button_search_for_files', 'button_compare_files'],
[pmlocale._('Search'), pmlocale._('Compare')],
[":/color/source/theme/color/icons/mActionQueryByLine.svg", ":/color/source/theme/color/icons/mActionChangePolylineByExitline.svg"])
[":/color/source/theme/color/icons/mActionQueryByLine.svg",
":/color/source/theme/color/icons/mActionChangePolylineByExitline.svg"])
self.addSeparator()

View File

@ -1,7 +1,7 @@
from pyminer2.features.util.platformutil import run_command_in_terminal,run_python_file_in_terminal
def test_run_command():
def test_run_command_win():
import time
run_command_in_terminal('dir', close_mode='auto')
time.sleep(3)
@ -10,7 +10,15 @@ def test_run_command():
run_command_in_terminal('dir', close_mode='no')
time.sleep(3)
pass
def test_run_command_linux():
import time
run_command_in_terminal('ls', close_mode='auto')
time.sleep(3)
run_command_in_terminal('ls', close_mode='wait_key')
time.sleep(3)
run_command_in_terminal('ls', close_mode='no')
time.sleep(3)
pass
def test_run_python_file_in_terminal():
run_python_file_in_terminal('python_file_to_run.py',close_mode='wait_key')
@ -18,5 +26,5 @@ def test_run_python_file_in_terminal():
if __name__ == '__main__':
test_run_command()
test_run_python_file_in_terminal()
test_run_command_linux()
# test_run_python_file_in_terminal()

View File

@ -0,0 +1,69 @@
import queue
import subprocess
import sys
import threading
import time
import chardet
from typing import List
class PMProcess():
def __init__(self, args: List[str]):
self.terminate = False
self.q = queue.Queue()
self.on_command_received = lambda cmd: print(cmd)
self.on_error_received = lambda error: print(error)
self.args = args
self.process = subprocess.Popen(self.args,
stdin=subprocess.PIPE,
shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
self.to = threading.Thread(
target=self.enqueue_stream, args=(
self.process.stdout, self.q, 1))
self.te = threading.Thread(
target=self.enqueue_stream, args=(
self.process.stderr, self.q, 2))
self.tp = threading.Thread(target=self.consoleLoop)
self.to.setDaemon(True)
self.te.setDaemon(True)
self.tp.setDaemon(True)
self.te.start()
self.to.start()
self.tp.start()
def enqueue_stream(self, stream, queue, type): # 将stderr或者stdout写入到队列q中。
for line in iter(stream.readline, b''):
if self.terminate:break
encoding = chardet.detect(line)['encoding']
queue.put(str(type) + line.decode(encoding))
stream.close()
def consoleLoop(self): # 封装后的内容。
return
idleLoops = 0
while True:
if not self.q.empty():
line = self.q.get()
if line[0] == '1':
self.on_command_received(line[1:])
else:
self.on_error_received(line[1:])
sys.stdout.flush()
else:
time.sleep(0.01)
if idleLoops >= 5:
idleLoops = 0
# print('write!!')
self.process.stdin.write(
'messsage\n'.encode('ascii')) # 模拟输入
self.process.stdin.flush()
continue
idleLoops += 1
if __name__ == '__main__':
pmp = PMProcess(['python', '-u',
'test_open_app.py'])
while (1):
time.sleep(2)
pass

View File

@ -0,0 +1,37 @@
import platform
import subprocess
def check_platform() -> str:
system = platform.system()
print(system)
return system.lower()
def run_command_in_terminal(cmd: str, close_mode: str = 'wait_key'):
print(check_platform())
platform_name = check_platform()
if platform_name == 'windows':
close_action = {'auto': 'start cmd.exe /k \"%s &&exit \"',
'no': 'start cmd.exe /k \"%s \"',
'wait_key': 'start cmd.exe /k \"%s &&pause &&exit \"'
}
command = close_action[close_mode] % cmd
elif platform_name == 'deepin':
command = 'deepin-terminal -x bash -c \" %s \" ' % (cmd)
elif platform_name == 'linux':
command = 'gnome-terminal -x bash -c \"%s ;read\" ' % (cmd)
else:
return
subprocess.Popen(command, shell=True)
if __name__ == '__main__':
def test_run_in_terminal():
import time
run_command_in_terminal('dir', close_mode='no')
time.sleep(1)
run_command_in_terminal('dir', close_mode='wait_key')
time.sleep(1)
run_command_in_terminal('dir', close_mode='auto')

View File

@ -25,7 +25,6 @@ six>=1.15.0
threadpoolctl>=2.1.0
xlrd>=1.2.0
qtconsole>=4.7.7
pywin32>=228
pretty_errors
json-rpc>=1.13.0
Werkzeug>=1.0.1
@ -41,4 +40,4 @@ configparser
vermanager>=0.1.3
send2trash>=1.5.0
pytest
PyQtWebEngine
PyQtWebEngine