Add files via upload

This commit is contained in:
liuyanlei25 2018-03-07 17:13:33 +08:00 committed by GitHub
parent 9ce14cfee5
commit 3a19bc8658
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -0,0 +1,430 @@
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from multiprocessing import Pool
import os
import re
import time
import json
import logging
import datetime
import MySQLdb
CONFIG = {"logs": "/data/cetus/xa_suspension_logs/xa-suspension.log",
"backend": "/home/mysql-cetus/cetus_install/conf/cetus.conf",
"user": "/home/mysql-cetus/cetus_install/conf/users.json",
"temp_file": "/data/cetus/xa_suspension_logs/"
}
BACKEND_CONF = []
USER_CONF = []
'''
悬挂事务查找模块
'''
def set_search_log():
"""日志记录"""
global CONFIG
logger = logging.getLogger()
logger.setLevel(logging.NOTSET)
log_path = CONFIG['logs']
hdlr = logging.FileHandler(log_path)
logger.addHandler(hdlr)
log_format = "%(asctime)s - %(levelname)s - %(message)s"
time_format = "%m/%d/%Y %H:%M:%S %p"
formatter = logging.Formatter(log_format, time_format)
hdlr.setFormatter(formatter)
return logger
def get_config():
"""读取cetus后端配置信息"""
global CONFIG
global BACKEND_CONF
global USER_CONF
back_path = CONFIG['backend']
with open(back_path, 'r') as backend_conf_file:
conf_info = backend_conf_file.read()
line = re.findall(r"proxy-backend-addresses.*", conf_info)[0]
BACKEND_CONF = re.findall(r"(\d+\.\d+\.\d+\.\d+)(\:\d{4})?", line)
for x in BACKEND_CONF:
if not x[1]:
x[1] = ':3306'
user_path = CONFIG['user']
with open(user_path, 'r') as user_conf_file:
data = json.load(user_conf_file)
USER_CONF = data['users']
def xa_recover(backend_conf):
"""读取mysql中xa recover的内容"""
global USER_CONF
host = backend_conf[0]
port = backend_conf[1][1:]
user = USER_CONF[0]['user']
passwd = USER_CONF[0]['server_pwd']
config = {
'host': backend_conf[0],
'port': int(backend_conf[1][1:]),
'user': USER_CONF[0]['user'],
'passwd': USER_CONF[0]['server_pwd']
}
try:
db = MySQLdb.connect(**config)
cursor = db.cursor()
sql = "xa recover"
cursor.execute(sql)
results = cursor.fetchall()
except Exception, e:
# logging.error("mysql: %s:%s xa recover 查询失败! %s,%s", host, port, Exception, e)
return False
else:
xid_list = []
if results:
for i in results:
xid_list.append(i[3])
return xid_list
def get_suspension_xid(xid_list, next_xid_list):
"""读取xa recover中长时间处于悬挂的事务xid列表"""
final_xid_list = []
for i in xid_list:
for j in next_xid_list:
if i == j:
final_xid_list.append(i)
return final_xid_list
def get_all_suspesion_xid():
"""读取间隔时间的所有后端xa recover中的xid列表确定长时间悬挂的事务"""
global BACKEND_CONF
final_xid = []
all_backend_xid = map(xa_recover, BACKEND_CONF)
time.sleep(10)
all_backend_xid_next = map(xa_recover, BACKEND_CONF)
for i in range(len(BACKEND_CONF)):
final_xid_list = get_suspension_xid(all_backend_xid[i], all_backend_xid_next[i])
final_xid.append(final_xid_list)
return final_xid
def xid_final_is_null(final_xid):
"""判断final_xid是否为空"""
element_number = 0
for i in final_xid:
if not i:
continue
else:
element_number += 1
return bool(element_number)
def get_all_xid(final_xid):
"""将所有后端xid去重存入字符串"""
all_xid = []
for i in final_xid:
all_xid.extend(i)
all_xid = list(set(all_xid))
return all_xid
def get_binlog_name(host, port):
"""读取mysql中show binlog的内容获取binlog列表"""
global USER_CONF
config = {
'host': host,
'port': int(port),
'user': USER_CONF[0]['user'],
'passwd': USER_CONF[0]['server_pwd']
}
try:
db = MySQLdb.connect(**config)
cursor = db.cursor()
sql = "show binary logs"
cursor.execute(sql)
results = cursor.fetchall()
except Exception, e:
# logging.error("mysql: %s:%s show binary logs 查询失败! %s,%s", host, port, Exception, e)
return False
else:
length = len(results)
binlog_name = []
for i in range(length):
binlog_name.append(results[length - i - 1][0])
return binlog_name
def get_binlog(host, port, binlog_name, xid_time):
"""读取binlog中指定时间的日志"""
global USER_CONF
global CONFIG
user = USER_CONF[0]['user']
passwd = USER_CONF[0]['server_pwd']
current_time = time.strftime('%Y-%m-%d', time.localtime())
current_hour = time.strftime("%H", time.localtime())
mysql_link = 'mysqlbinlog -R --start-datetime="' + current_time + ' ' + xid_time +\
':00:00" --stop-datetime="' + current_time + ' ' + str(int(xid_time)+1) +\
':00:00" -h' + host +' -u' + user + ' -p' + passwd + ' -P' + port + ' -vv '
if xid_time == '23' and int(current_hour) == 0:
yesterday = (datetime.date.today() - datetime.timedelta(days=1)).strftime("%Y-%m-%d")
mysql_link = 'mysqlbinlog -R --start-datetime=\"' + yesterday + ' ' + xid_time +\
':00:00" --stop-datetime="' + current_time + ' ' + '00:00:00" -h' +\
host +' -u' + user + ' -p' + passwd + ' -P' + port + ' -vv '
sql = binlog_name + ' 2>/dev/null|egrep "^XA(.*),1$" >> ' +\
CONFIG['temp_file'] + host + port +'.txt 2>/dev/null'
os.system(mysql_link + sql)
def grep_status(host, port, xid):
global CONFIG
shell = 'cat ' + CONFIG['temp_file'] + host + port +\
'.txt|grep ' + xid.encode('hex') + ' 2>/dev/null'
grep_status_file = os.popen(shell)
r = grep_status_file.read()
grep_status_file.close()
status_list = re.findall(r"XA\s(\w+)", r)
return status_list
def get_status(args):
global USER_CONF
global CONFIG
host = args[0][0]
port = args[0][1][1:]
all_xid = args[1]
xid_time_dict = args[2]
time_list = xid_time_dict.keys()
user = USER_CONF[0]['user']
passwd = USER_CONF[0]['server_pwd']
binlog_name = get_binlog_name(host, port)
status = {}
if not binlog_name:
return status
for xid_time in time_list:
os.system('rm ' + CONFIG['temp_file'] + host + port +'.txt 2>/dev/null')
for binlog in binlog_name:
get_binlog(host, port, binlog, xid_time)
binlog_start = os.popen('mysqlbinlog -R -h' + host + ' -u' + user + ' -p' +
passwd + ' -P' + port + ' -vv ' + binlog +
' 2>/dev/null|egrep "^#(\w{6})" 2>/dev/null|head -n 1')
r = binlog_start.read()
binlog_start.close()
binlog_start_time = re.findall(r"^#(\d{6})", r)
current_time = time.strftime('%y%m%d', time.localtime())
if binlog_start_time[0] < current_time:
break
xid_is_time_same = xid_time_dict[xid_time]
for xid in xid_is_time_same:
status_list = grep_status(host, port, xid)
print "-------status_list----------"
print status_list
if status_list:
if 'COMMIT' in status_list:
logging.info("Mysql后端:%s:%s,悬挂事务xid:%s,最终状态:COMMIT", host, port, xid)
status[xid] = ('COMMIT')
continue
elif 'ROLLBACK' in status_list:
logging.info("Mysql后端:%s:%s,悬挂事务xid:%s,最终状态:ROLLBACK", host, port, xid)
status[xid] = ('ROLLBACK')
continue
elif 'PREPARE' in status_list:
logging.info("Mysql后端:%s:%s,悬挂事务xid:%s,最终状态:PREPARE", host, port, xid)
status[xid] = ('PREPARE')
continue
elif 'END' in status_list:
logging.info("Mysql后端:%s:%s,悬挂事务xid:%s,最终状态:END", host, port, xid)
status[xid] = ('END')
continue
elif 'START' in status_list:
logging.info("Mysql后端:%s:%s,悬挂事务xid:%s,最终状态:START", host, port, xid)
status[xid] = ('START')
continue
else:
logging.info("Mysql后端:%s:%s,没有该悬挂事务xid:%s", host, port, xid)
status[xid] = ('NULL')
continue
print "-------status----------"
print status
return status
def suspension_status(final_xid):
"""获得所有后端的xid的最终状态"""
global BACKEND_CONF
all_xid = get_all_xid(final_xid)
xid_time_dict = {}
for i in range(len(all_xid)):
xid_time = re.findall(r"_(\d{2})_", all_xid[i])
if xid_time[0] in xid_time_dict.keys():
xid_time_dict[xid_time[0]].append(all_xid[i])
else:
xid_time_dict.setdefault(xid_time[0],[all_xid[i]])
args = []
for m in BACKEND_CONF:
mysql = []
mysql.append(m)
mysql.append(all_xid)
mysql.append(xid_time_dict)
args.append(mysql)
print "-------all_xid----------"
print all_xid
pool = Pool(len(BACKEND_CONF))
status = pool.map(get_status, args)
print "============== status all============="
print status
pool.close()
final_status = []
for xid in all_xid:
xid_status = []
for j in range(len(BACKEND_CONF)):
if status[j].has_key(xid):
xid_status.append(status[j][xid])
if 'COMMIT' in xid_status:
final_status.append('COMMIT')
continue
elif 'ROLLBACK' in xid_status:
final_status.append('ROLLBACK')
continue
elif 'PREPARE' in xid_status:
final_status.append('PREPARE')
continue
elif 'END' in xid_status:
final_status.append('END')
continue
elif 'START' in xid_status:
final_status.append('START')
continue
else:
final_status.append('NULL')
continue
return final_status, all_xid
'''
悬挂事务处理模块
'''
#在mysql中执行commit操作
def xa_commit(backend_conf, xid):
global USER_CONF
config = {
'host': backend_conf[0],
'port': int(backend_conf[1][1:]),
'user': USER_CONF[0]['user'],
'passwd': USER_CONF[0]['server_pwd']
}
try:
db = MySQLdb.connect(**config)
cursor = db.cursor()
sql_set = "set autocommit = on"
sql = "xa commit '%s'"%xid
cursor.execute(sql_set)
cursor.execute(sql)
db.commit()
logging.info("mysql: %s:%s xid:%s 提交成功!",
config['host'], config['port'], xid)
except Exception, e:
logging.error("mysql: %s:%s xid:%s 提交失败! %s,%s",
config['host'], config['port'], xid, Exception, e)
return False
#在mysql中执行rollback操作
def xa_rollback(backend_conf, xid):
global USER_CONF
config = {
'host': backend_conf[0],
'port': int(backend_conf[1][1:]),
'user': USER_CONF[0]['user'],
'passwd': USER_CONF[0]['server_pwd']
}
try:
db = MySQLdb.connect(**config)
cursor = db.cursor()
sql_set = "set autocommit = on"
sql = "xa rollback '%s';"%xid
cursor.execute(sql_set)
cursor.execute(sql)
db.commit()
logging.info("mysql: %s:%s xid:%s 回滚成功!",
config['host'], config['port'], xid)
except Exception, e:
logging.error("mysql: %s:%s xid:%s 回滚失败! %s,%s",
config['host'], config['port'], xid, Exception, e)
return False
#根据悬挂事务的最终状态,处理悬挂事务
def handle_suspension(final_xid, status, all_xid):
global BACKEND_CONF
for i in range(len(BACKEND_CONF)):
for xid in final_xid[i]:
for j in range(len(all_xid)):
if xid == all_xid[j]:
if status[j] in 'PREPARE ROLLBACK END START':
xa_rollback(BACKEND_CONF[i], xid)
if status[j] == 'COMMIT':
xa_commit(BACKEND_CONF[i], xid)
def main():
"""功能主体"""
get_config()
logging = set_search_log()
while True:
try:
final_xid = get_all_suspesion_xid()
except Exception, e:
logging.error("后端连接有问题!%s,%s", Exception, e)
continue
if xid_final_is_null(final_xid):
logging.info("发现悬挂事务,开始处理!")
(status, all_xid) = suspension_status(final_xid)
logging.info("状态查找结束!")
handle_suspension(final_xid, status, all_xid)
else:
continue
if __name__ == '__main__':
main()