diff --git a/xa-suspension-tool/xa-suspension.py b/xa-suspension-tool/xa-suspension.py new file mode 100644 index 0000000..e354575 --- /dev/null +++ b/xa-suspension-tool/xa-suspension.py @@ -0,0 +1,430 @@ +#!/usr/bin/env python +# -*- coding:utf-8 -*- + +from multiprocessing import Pool +import os +import re +import time +import json +import logging +import datetime +import MySQLdb + +CONFIG = {"logs": "/data/cetus/xa_suspension_logs/xa-suspension.log", + "backend": "/home/mysql-cetus/cetus_install/conf/cetus.conf", + "user": "/home/mysql-cetus/cetus_install/conf/users.json", + "temp_file": "/data/cetus/xa_suspension_logs/" + } +BACKEND_CONF = [] +USER_CONF = [] + +''' +悬挂事务查找模块 +''' + +def set_search_log(): + """日志记录""" + + global CONFIG + logger = logging.getLogger() + logger.setLevel(logging.NOTSET) + log_path = CONFIG['logs'] + hdlr = logging.FileHandler(log_path) + logger.addHandler(hdlr) + log_format = "%(asctime)s - %(levelname)s - %(message)s" + time_format = "%m/%d/%Y %H:%M:%S %p" + formatter = logging.Formatter(log_format, time_format) + hdlr.setFormatter(formatter) + return logger + +def get_config(): + """读取cetus后端配置信息""" + + global CONFIG + global BACKEND_CONF + global USER_CONF + + back_path = CONFIG['backend'] + with open(back_path, 'r') as backend_conf_file: + conf_info = backend_conf_file.read() + line = re.findall(r"proxy-backend-addresses.*", conf_info)[0] + BACKEND_CONF = re.findall(r"(\d+\.\d+\.\d+\.\d+)(\:\d{4})?", line) + for x in BACKEND_CONF: + if not x[1]: + x[1] = ':3306' + + user_path = CONFIG['user'] + with open(user_path, 'r') as user_conf_file: + data = json.load(user_conf_file) + USER_CONF = data['users'] + +def xa_recover(backend_conf): + """读取mysql中xa recover的内容""" + + global USER_CONF + + host = backend_conf[0] + port = backend_conf[1][1:] + user = USER_CONF[0]['user'] + passwd = USER_CONF[0]['server_pwd'] + + config = { + 'host': backend_conf[0], + 'port': int(backend_conf[1][1:]), + 'user': USER_CONF[0]['user'], + 'passwd': USER_CONF[0]['server_pwd'] + } + + try: + db = MySQLdb.connect(**config) + cursor = db.cursor() + sql = "xa recover" + cursor.execute(sql) + results = cursor.fetchall() + except Exception, e: + # logging.error("mysql: %s:%s xa recover 查询失败! %s,%s", host, port, Exception, e) + return False + else: + xid_list = [] + if results: + for i in results: + xid_list.append(i[3]) + return xid_list + + +def get_suspension_xid(xid_list, next_xid_list): + """读取xa recover中长时间处于悬挂的事务xid列表""" + + final_xid_list = [] + for i in xid_list: + for j in next_xid_list: + if i == j: + final_xid_list.append(i) + return final_xid_list + +def get_all_suspesion_xid(): + """读取间隔时间的所有后端xa recover中的xid列表,确定长时间悬挂的事务""" + + global BACKEND_CONF + + final_xid = [] + all_backend_xid = map(xa_recover, BACKEND_CONF) + + time.sleep(10) + + all_backend_xid_next = map(xa_recover, BACKEND_CONF) + + for i in range(len(BACKEND_CONF)): + final_xid_list = get_suspension_xid(all_backend_xid[i], all_backend_xid_next[i]) + final_xid.append(final_xid_list) + return final_xid + +def xid_final_is_null(final_xid): + """判断final_xid是否为空""" + + element_number = 0 + for i in final_xid: + if not i: + continue + else: + element_number += 1 + return bool(element_number) + +def get_all_xid(final_xid): + """将所有后端xid去重,存入字符串""" + + all_xid = [] + for i in final_xid: + all_xid.extend(i) + all_xid = list(set(all_xid)) + return all_xid + +def get_binlog_name(host, port): + """读取mysql中show binlog的内容,获取binlog列表""" + + global USER_CONF + + config = { + 'host': host, + 'port': int(port), + 'user': USER_CONF[0]['user'], + 'passwd': USER_CONF[0]['server_pwd'] + } + try: + db = MySQLdb.connect(**config) + cursor = db.cursor() + sql = "show binary logs" + cursor.execute(sql) + results = cursor.fetchall() + except Exception, e: + # logging.error("mysql: %s:%s show binary logs 查询失败! %s,%s", host, port, Exception, e) + return False + else: + length = len(results) + binlog_name = [] + for i in range(length): + binlog_name.append(results[length - i - 1][0]) + return binlog_name + +def get_binlog(host, port, binlog_name, xid_time): + """读取binlog中指定时间的日志""" + + global USER_CONF + global CONFIG + + user = USER_CONF[0]['user'] + passwd = USER_CONF[0]['server_pwd'] + current_time = time.strftime('%Y-%m-%d', time.localtime()) + current_hour = time.strftime("%H", time.localtime()) + + mysql_link = 'mysqlbinlog -R --start-datetime="' + current_time + ' ' + xid_time +\ + ':00:00" --stop-datetime="' + current_time + ' ' + str(int(xid_time)+1) +\ + ':00:00" -h' + host +' -u' + user + ' -p' + passwd + ' -P' + port + ' -vv ' + + if xid_time == '23' and int(current_hour) == 0: + yesterday = (datetime.date.today() - datetime.timedelta(days=1)).strftime("%Y-%m-%d") + mysql_link = 'mysqlbinlog -R --start-datetime=\"' + yesterday + ' ' + xid_time +\ + ':00:00" --stop-datetime="' + current_time + ' ' + '00:00:00" -h' +\ + host +' -u' + user + ' -p' + passwd + ' -P' + port + ' -vv ' + + sql = binlog_name + ' 2>/dev/null|egrep "^XA(.*),1$" >> ' +\ + CONFIG['temp_file'] + host + port +'.txt 2>/dev/null' + + os.system(mysql_link + sql) + +def grep_status(host, port, xid): + + global CONFIG + + shell = 'cat ' + CONFIG['temp_file'] + host + port +\ + '.txt|grep ' + xid.encode('hex') + ' 2>/dev/null' + grep_status_file = os.popen(shell) + r = grep_status_file.read() + grep_status_file.close() + status_list = re.findall(r"XA\s(\w+)", r) + return status_list + +def get_status(args): + + global USER_CONF + global CONFIG + + host = args[0][0] + port = args[0][1][1:] + all_xid = args[1] + xid_time_dict = args[2] + + time_list = xid_time_dict.keys() + + user = USER_CONF[0]['user'] + passwd = USER_CONF[0]['server_pwd'] + binlog_name = get_binlog_name(host, port) + + status = {} + + if not binlog_name: + return status + + for xid_time in time_list: + os.system('rm ' + CONFIG['temp_file'] + host + port +'.txt 2>/dev/null') + for binlog in binlog_name: + get_binlog(host, port, binlog, xid_time) + binlog_start = os.popen('mysqlbinlog -R -h' + host + ' -u' + user + ' -p' + + passwd + ' -P' + port + ' -vv ' + binlog + + ' 2>/dev/null|egrep "^#(\w{6})" 2>/dev/null|head -n 1') + r = binlog_start.read() + binlog_start.close() + binlog_start_time = re.findall(r"^#(\d{6})", r) + current_time = time.strftime('%y%m%d', time.localtime()) + if binlog_start_time[0] < current_time: + break + + xid_is_time_same = xid_time_dict[xid_time] + + for xid in xid_is_time_same: + status_list = grep_status(host, port, xid) + print "-------status_list----------" + print status_list + + if status_list: + if 'COMMIT' in status_list: + logging.info("Mysql后端:%s:%s,悬挂事务xid:%s,最终状态:COMMIT", host, port, xid) + status[xid] = ('COMMIT') + continue + elif 'ROLLBACK' in status_list: + logging.info("Mysql后端:%s:%s,悬挂事务xid:%s,最终状态:ROLLBACK", host, port, xid) + status[xid] = ('ROLLBACK') + continue + elif 'PREPARE' in status_list: + logging.info("Mysql后端:%s:%s,悬挂事务xid:%s,最终状态:PREPARE", host, port, xid) + status[xid] = ('PREPARE') + continue + elif 'END' in status_list: + logging.info("Mysql后端:%s:%s,悬挂事务xid:%s,最终状态:END", host, port, xid) + status[xid] = ('END') + continue + elif 'START' in status_list: + logging.info("Mysql后端:%s:%s,悬挂事务xid:%s,最终状态:START", host, port, xid) + status[xid] = ('START') + continue + else: + logging.info("Mysql后端:%s:%s,没有该悬挂事务xid:%s", host, port, xid) + status[xid] = ('NULL') + continue + print "-------status----------" + print status + return status + + +def suspension_status(final_xid): + """获得所有后端的xid的最终状态""" + + global BACKEND_CONF + + all_xid = get_all_xid(final_xid) + + xid_time_dict = {} + for i in range(len(all_xid)): + xid_time = re.findall(r"_(\d{2})_", all_xid[i]) + if xid_time[0] in xid_time_dict.keys(): + xid_time_dict[xid_time[0]].append(all_xid[i]) + else: + xid_time_dict.setdefault(xid_time[0],[all_xid[i]]) + + args = [] + for m in BACKEND_CONF: + mysql = [] + mysql.append(m) + mysql.append(all_xid) + mysql.append(xid_time_dict) + args.append(mysql) + + print "-------all_xid----------" + print all_xid + pool = Pool(len(BACKEND_CONF)) + status = pool.map(get_status, args) + print "============== status all=============" + print status + + pool.close() + final_status = [] + for xid in all_xid: + xid_status = [] + for j in range(len(BACKEND_CONF)): + if status[j].has_key(xid): + xid_status.append(status[j][xid]) + + if 'COMMIT' in xid_status: + final_status.append('COMMIT') + continue + elif 'ROLLBACK' in xid_status: + final_status.append('ROLLBACK') + continue + elif 'PREPARE' in xid_status: + final_status.append('PREPARE') + continue + elif 'END' in xid_status: + final_status.append('END') + continue + elif 'START' in xid_status: + final_status.append('START') + continue + else: + final_status.append('NULL') + continue + return final_status, all_xid + + +''' +悬挂事务处理模块 +''' + +#在mysql中执行commit操作 +def xa_commit(backend_conf, xid): + + global USER_CONF + + config = { + 'host': backend_conf[0], + 'port': int(backend_conf[1][1:]), + 'user': USER_CONF[0]['user'], + 'passwd': USER_CONF[0]['server_pwd'] + } + try: + db = MySQLdb.connect(**config) + cursor = db.cursor() + sql_set = "set autocommit = on" + sql = "xa commit '%s'"%xid + cursor.execute(sql_set) + cursor.execute(sql) + db.commit() + logging.info("mysql: %s:%s xid:%s 提交成功!", + config['host'], config['port'], xid) + except Exception, e: + logging.error("mysql: %s:%s xid:%s 提交失败! %s,%s", + config['host'], config['port'], xid, Exception, e) + return False + +#在mysql中执行rollback操作 +def xa_rollback(backend_conf, xid): + + global USER_CONF + + config = { + 'host': backend_conf[0], + 'port': int(backend_conf[1][1:]), + 'user': USER_CONF[0]['user'], + 'passwd': USER_CONF[0]['server_pwd'] + } + + try: + db = MySQLdb.connect(**config) + cursor = db.cursor() + sql_set = "set autocommit = on" + sql = "xa rollback '%s';"%xid + cursor.execute(sql_set) + cursor.execute(sql) + db.commit() + logging.info("mysql: %s:%s xid:%s 回滚成功!", + config['host'], config['port'], xid) + except Exception, e: + logging.error("mysql: %s:%s xid:%s 回滚失败! %s,%s", + config['host'], config['port'], xid, Exception, e) + return False + +#根据悬挂事务的最终状态,处理悬挂事务 +def handle_suspension(final_xid, status, all_xid): + + global BACKEND_CONF + + for i in range(len(BACKEND_CONF)): + for xid in final_xid[i]: + for j in range(len(all_xid)): + if xid == all_xid[j]: + if status[j] in 'PREPARE ROLLBACK END START': + xa_rollback(BACKEND_CONF[i], xid) + if status[j] == 'COMMIT': + xa_commit(BACKEND_CONF[i], xid) + + +def main(): + """功能主体""" + + get_config() + logging = set_search_log() + while True: + try: + final_xid = get_all_suspesion_xid() + except Exception, e: + logging.error("后端连接有问题!%s,%s", Exception, e) + continue + if xid_final_is_null(final_xid): + logging.info("发现悬挂事务,开始处理!") + (status, all_xid) = suspension_status(final_xid) + logging.info("状态查找结束!") + handle_suspension(final_xid, status, all_xid) + else: + continue + +if __name__ == '__main__': + main()