#!/usr/bin/env python
# coding: utf-8

###
# A script console for cetus.
# For changing the proxy config on a remote server.
###
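
# Usage (interactive, illustrative): run this script with Python 2 (it relies
# on raw_input and the MySQLdb driver), then type commands at the prompt,
# e.g. `show`, `use proxy01`, `add backend set addr='10.0.0.1' where
# proxy_id='proxy01'`, `apply proxy01`; type `help` for the full list.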

import os
import sys
import re
import traceback
import MySQLdb
import json

###
# DB settings
#
# Please EDIT this before running
###
HOST = '10.238.7.6'
PORT = 3306
USER = 'root'
PASSWD = '123456'
DB = 'proxy_management'

conn = None
PROXY_ID = None
BACKENDS_NDX_MAP = {}

# A pattern for checking IP format
IP_RE_PATTERN = ('^(?:[1-9]?\d|1\d{2}|2(?:[0-4]\d|5[0-5]))'
                 '(?:\.(?:[1-9]?\d|1\d{2}|2(?:[0-4]\d|5[0-5]))){3}$')
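# The pattern above matches dotted-quad IPv4 addresses with each octet in
# 0-255 (e.g. '10.238.7.6'); hostnames and IPv6 addresses are rejected.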


# Print msg to console with color
def print_console(msg):
    print('{0}{1}{2}'.format('\033[1;32m', msg, '\033[0m'))


def strwide(msg):
    return '\n{0}\n'.format(msg)


def print_console_wide(msg):
    print_console(strwide(msg))


# Decorator that connects to the db before running the wrapped query
def reconnect(f):
    def deco(*args, **kwargs):
        global conn
        if not conn:
            conn = connect_db(HOST, PORT, USER, PASSWD, DB)
        return f(*args, **kwargs)
    return deco
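
# Note: the wrapped functions below share one lazily opened global
# connection; it is created on the first query and closed only when the
# console exits (see the `finally` block at the bottom of the script).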


# Connect to database
def connect_db(host, port, user, passwd, db):
    return MySQLdb.Connection(
        host=host,
        port=port,
        user=user,
        passwd=passwd,
        db=db,
        charset='utf8')


# Select data
@reconnect
def query_data(sql):
    global conn
    cur = conn.cursor()
    cur.execute(sql)
    data = cur.fetchall()
    cur.close()
    return data


@reconnect
def query(sql):
    global conn
    cur = conn.cursor()
    ret = cur.execute(sql)
    conn.commit()
    cur.close()
    return ret


@reconnect
def querymany(sql, data):
    global conn
    cur = conn.cursor()
    ret = cur.executemany(sql, data)
    conn.commit()
    cur.close()
    return ret


@reconnect
def insertone(sql):
    global conn
    cur = conn.cursor()
    ret = 0
    try:
        cur.execute(sql)
        ret = cur.lastrowid
    except MySQLdb.Error:
        pass
    finally:
        conn.commit()
        cur.close()
    return ret


def gen_backend_ndx():
    global BACKENDS_NDX_MAP
    BACKENDS_NDX_MAP = {}


# Process user's command
def run_cmd(cmd):
    global PROXY_ID
    global BACKENDS_NDX_MAP

    ###
    # Show simple settings for proxy to start
    ###
    if cmd == 'show':
        cmap = {}
        data = query_data('select proxy_id,s_key,s_value from settings')
        max_key = 0
        for line in data:
            id, k, v = line
            if not id in cmap:
                cmap[id] = {}
            if not k in cmap[id]:
                cmap[id][k] = []
            cmap[id][k].append(v)
            if len(k) > max_key:
                max_key = len(k)
        msg_list = ['']
        for ndx in cmap:
            msg_list.append('-'*10)
            msg_list.append('PROXY_ID: {0}'.format(ndx))
            msg_list.append('')
            for k in sorted(cmap[ndx].keys()):
                i = 0
                for v in cmap[ndx][k]:
                    if i == 0:
                        msg_list.append('{0}{1}'.format(k.ljust(max_key+3), v))
                    else:
                        msg_list.append('{0}{1}'.format(''.ljust(max_key+3), v))
                    i += 1
            msg_list.append('-'*10)
        msg_list.append('')
        msg = '\n'.join(msg_list)

    ###
    # Show backends info details
    ###
    elif cmd == 'show backends':
        data = query_data(
            'select a.id, a.proxy_id, a.s_key, a.s_value, b.stat from '
            'settings as a, backends as b where a.s_key in '
            '("proxy-backend-addresses", "proxy-read-only-backend-addresses")'
            ' and a.id=b.settings_id')
        cmap = {}
        BACKENDS_NDX_MAP = {}
        ndx_map = {}
        for line in data:
            id, pxy_id, k, v, st = line
            if not pxy_id in BACKENDS_NDX_MAP:
                BACKENDS_NDX_MAP[pxy_id] = {}
                ndx_map[pxy_id] = 1
            BACKENDS_NDX_MAP[pxy_id][str(ndx_map[pxy_id])] = id
            if not pxy_id in cmap:
                cmap[pxy_id] = []
            bk_addr_list = re.split('[@:]', v)
            addr = bk_addr_list[0]
            port = bk_addr_list[1]
            if k == 'proxy-backend-addresses':
                tp = 'rw'
            else:
                tp = 'ro'
            cmap[pxy_id].append({'ndx': ndx_map[pxy_id], 'addr': addr, 'port': port,
                                 'type': tp, 'stat': st})
            ndx_map[pxy_id] += 1
        msg_list = ['']
        for pxy_id in cmap:
            msg_list.append('-'*10)
            msg_list.append('PROXY_ID: {0}'.format(pxy_id))
            msg_list.append('')
            for backend in cmap[pxy_id]:
                msg_list.append('{0}{1}'.format('NDX'.ljust(8), backend['ndx']))
                msg_list.append('{0}{1}'.format('ADDR'.ljust(8), backend['addr']))
                msg_list.append('{0}{1}'.format('PORT'.ljust(8), backend['port']))
                msg_list.append('{0}{1}'.format('TYPE'.ljust(8), backend['type']))
                msg_list.append('{0}{1}'.format('STAT'.ljust(8), backend['stat']))
                msg_list.append('')
            msg_list[-1] = '-'*10
        msg_list.append('')
        msg = '\n'.join(msg_list)

    ###
    # Show sharding info details
    ###
    elif cmd == 'show sharding':
        data = query_data('select proxy_id, shard_db, shard_table, pkey, shard_key_type, '
                          'method, partitions, logic_shard_num from sharding ')
        shard_map = {}
        for line in data:
            proxy_id = line[0]
            if not proxy_id in shard_map:
                shard_map[proxy_id] = []
            shard_map[proxy_id].append({
                'shard_db': line[1],
                'shard_table': line[2],
                'pkey': line[3],
                'shard_key_type': line[4],
                'method': line[5],
                'partitions': line[6],
                'logic_shard_num': line[7]
            })
        msg_list = ['']
        for pid in shard_map:
            msg_list.append('PROXY_ID: {0}'.format(pid))
            msg_list.append('='*20)
            for shard in shard_map[pid]:
                if shard['shard_db'] == '_default_group':
                    msg_list.append('{0}{1}'.format('DEFAULT_GROUP'.ljust(18), shard['partitions']))
                elif shard['shard_db'] == '_all_groups':
                    msg_list.append('{0}{1}'.format('ALL_GROUPS'.ljust(18), shard['partitions']))
                else:
                    msg_list.append('{0}{1}'.format('DB'.ljust(18), shard['shard_db']))
                    msg_list.append('{0}{1}'.format('TABLE'.ljust(18), shard['shard_table']))
                    msg_list.append('{0}{1}'.format('KEY'.ljust(18), shard['pkey']))
                    msg_list.append('{0}{1}'.format('KEYTYPE'.ljust(18), shard['shard_key_type']))
                    msg_list.append('{0}{1}'.format('METHOD'.ljust(18), shard['method']))
                    msg_list.append('{0}{1}'.format('PARTITIONS'.ljust(18), shard['partitions']))
                    msg_list.append('{0}{1}'.format('SHARDNUM'.ljust(18), shard['logic_shard_num']))
                msg_list.append('-'*10)
        msg_list.append('')
        msg = '\n'.join(msg_list)

    ###
    # Set the global proxy_id so "where proxy_id=..." can be omitted below.
    ###
    elif cmd.startswith('use '):
        try:
            PROXY_ID = cmd.split(' ', 1)[1]
            msg = strwide('Proxy changed.')
        except IndexError:
            msg = strwide('Unknown Proxy.')

    ###
    # Add or update settings
    ###
    elif cmd.startswith('add ') or cmd.startswith('update '):
        syntax_error = 0
        msg = ''
        ###
        # RE Pattern for add/update
        # match `update (cap1) set (cap2) where (cap3)`
        # Note: a JSON string value may contain any character, so quoted
        # values are matched up to an unescaped closing quote.
        ###
        up_pat = ('^\s*(add|update)\s+([a-zA-Z_]+)\s+'
                  'set\s+(\S+\s*=\s*(?:(\'|").*?(?<!\\\\)\\4|\d+)\s*'
                  '(?:,\s*\S+\s*=\s*(?:(\'|").*?(?<!\\\\)\\5|\d+)\s*)*)'
                  '(?:\s+where\s+(\S+\s*=\s*(?:(\'|").*?(?<!\\\\)\\7|\d+)'
                  '(?:\s+and\s+\S+\s*=\s*(?:(\'|").*?(?<!\\\\)\\8|\d+))*))?\s*$')
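        # Illustrative commands this pattern is meant to accept (values must
        # be quoted strings or bare integers), e.g.:
        #   add backend set addr='10.0.0.1', port=3306 where proxy_id='1'
        #   update settings set log-level='info' where proxy_id='1'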
        res = re.match(up_pat, cmd, re.IGNORECASE)
        # Parse add/update command
        if res is None:
            syntax_error = 1
        else:
            c0 = res.group(1)
            # settings/backend/sharding
            c1 = res.group(2)
            # set key/value
            c2 = res.group(3)
            # where clause, can be None if `use proxy_id` is set
            c3 = res.group(6)

            pxy = PROXY_ID
            set_map = {}
            cond_map = {}
            # Parse set clause
            set_pat = '(?:^|\s+)(\S+\s*=\s*(?:(\'|").*?(?<!\\\\)\\2|\d+))(?:\s+|,|$)'
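            # Illustrative: re.findall(set_pat, "addr='10.0.0.1', port=3306")
            # yields one tuple per key=value pair; the first element of each
            # tuple is the full pair, with the value either a quoted string
            # or a bare integer.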
            for set_group in re.findall(set_pat, c2):
                set_item = set_group[0]
                k, v = set_item.split('=', 1)
                k = k.strip().lower()
                v = v.strip()
                # If string is in single/double quotation marks, remove them
                if re.match(r'^(\'|").*?\1$', v):
                    v = v[1:-1]
                set_map[k] = v
            # Parse where clause if it exists
            if c3 is not None:
                where_pat = '(?:^|\s+)(\S+\s*=\s*(?:(\'|").*?(?<!\\\\)\\2|\d+))(?:\s+|$)'
                for where_group in re.findall(where_pat, c3):
                    where_item = where_group[0]
                    k, v = where_item.split('=')
                    k = k.strip().lower()
                    v = v.strip()
                    if re.match(r'^(\'|").*?\1$', v):
                        v = v[1:-1]
                    cond_map[k] = v
            if 'proxy_id' in cond_map:
                pxy = cond_map['proxy_id']
            # proxy_id should be set in the where clause or via `use`
            if not pxy:
                msg = strwide('`PROXY_ID` needs to be specified')
            elif c0 == 'add':
                r = 0
                if c1 == 'settings':
                    for k, v in set_map.items():
                        if not k in ['plugins', 'proxy-backend-addresses',
                                     'proxy-read-only-backend-addresses']:
                            exist = query_data(
                                ('select id from settings where proxy_id="{0}"'
                                 ' and s_key="{1}"').format(pxy, k))
                            if exist:
                                print_console('"{0}" currently exists, use `update` instead.'.format(k))
                                continue
                        r += query(
                            ('insert into settings set proxy_id="{0}", s_key="{1}",'
                             's_value="{2}"').format(pxy, k, v))
                elif c1 == 'backend':
                    if not 'addr' in set_map:
                        return strwide('"addr" is needed.')
                    bk_addr = set_map['addr']
                    if not re.match(IP_RE_PATTERN, bk_addr):
                        return strwide('IP format error.')
                    bk_port = set_map.get('port', '3306')
                    if int(bk_port) > 65535 or int(bk_port) < 0:
                        return strwide('PORT format error.\nShould be integer in 0~65535')
                    bk_type = set_map.get('type', 'ro')
                    if not bk_type in ['ro', 'rw']:
                        return strwide("TYPE format error.\nShould be 'rw' or 'ro'.")
                    bk_stat = set_map.get('stat', 'unknown')
                    if not bk_stat in ['unknown', 'up', 'down', 'delete', 'maintaining']:
                        return strwide("STAT format error.\n"
                                       "Should be 'unknown', 'up', 'down', 'delete' or 'maintaining'")
                    bk_shard = set_map.get('shard_group', None)
                    exist = query_data(
                        ('select a.s_key, a.s_value, b.stat from settings as a, backends as b'
                         ' where s_key in ("proxy-backend-addresses", "proxy-read-only-backend-addresses")'
                         ' and a.id=b.settings_id and proxy_id="{0}"').format(pxy))
                    bk_full_addr = '{0}:{1}'.format(bk_addr, bk_port)
                    if bk_shard:
                        bk_full = '{0}@{1}'.format(bk_full_addr, bk_shard)
                    else:
                        bk_full = bk_full_addr
                    for bk in exist:
                        if bk[1] == bk_full_addr or bk[1].startswith('{0}@'.format(bk_full_addr)):
                            return strwide('backend "{0}:{1}" exists'.format(bk_addr, bk_port))
                    if bk_type == 'rw':
                        bk_key = 'proxy-backend-addresses'
                    else:
                        bk_key = 'proxy-read-only-backend-addresses'
                    last_id = insertone(
                        ('insert into settings set proxy_id="{0}", '
                         's_key="{1}", s_value="{2}"').format(pxy, bk_key, bk_full))
                    if last_id:
                        r += 1
                        if insertone('insert into backends set settings_id={0}, stat="{1}"'.format(
                                last_id, bk_stat)):
                            r += 1
                        else:
                            query('delete from settings where id={0}'.format(last_id))
                            print_console('Insert failed. Dirty data exists in table backends.')
                elif c1 == 'sharding':
                    shard_keys = set_map.keys()
                    for k in ['default_group', 'all_groups']:
                        if k in shard_keys:
                            shard_keys.remove(k)
                            real_k = '_' + k
                            v = set_map[k]
                            exist = query_data(
                                ('select id from sharding where proxy_id="{0}" and '
                                 'shard_db="{1}"').format(pxy, real_k))
                            try:
                                json.loads(v)
                            except ValueError:
                                print_console('"{0}" value should be in JSON format.'.format(real_k))
                                continue
                            if exist:
                                print_console('"{0}" exists, skip.'.format(real_k))
                                continue
                            in_sql = ('insert into sharding set proxy_id="{0}",'
                                      'shard_db="{1}", shard_table="{1}", partitions="{2}"'
                                      ).format(pxy, real_k, v.replace('"', '\\"'))
                            r += query(in_sql)
                    # Check that all params are set.
                    if shard_keys:
                        for kk in ['db', 'table', 'key', 'keytype', 'method',
                                   'partitions', 'shardnum']:
                            if not kk in set_map:
                                return strwide('Lack parameter "{0}".'.format(kk))
                        if not set_map['keytype'] in ['int', 'str', 'datetime', 'date']:
                            return strwide('"keytype" should be [int|str|date|datetime].')
                        if not set_map['method'] in ['range', 'hash']:
                            return strwide('"method" should be hash or range.')
                        try:
                            json.loads(set_map['partitions'])
                        except ValueError:
                            return strwide('"partitions" should be in JSON format.')
                        set_map['partitions'] = set_map['partitions'].replace('"', '\\"')
                        r += query(
                            ('insert into sharding set proxy_id="{0}", shard_db="{1[db]}",'
                             'shard_table="{1[table]}", pkey="{1[key]}", shard_key_type='
                             '"{1[keytype]}", method="{1[method]}", partitions="{1[partitions]}",'
                             'logic_shard_num="{1[shardnum]}"').format(pxy, set_map))
                else:
                    syntax_error = 1
                print_console_wide('{0} rows added.'.format(r))

            elif c0 == 'update':
                r = 0
                # Set normal start config
                if c1 == 'settings':
                    for k, v in set_map.items():
                        exist = query_data(('select id from settings where proxy_id="{0}"'
                                            ' and s_key="{1}"').format(pxy, k))
                        if exist and exist[0]:
                            r += query('update settings set s_value="{0}" where id={1}'
                                       .format(v, exist[0][0]))
                        else:
                            print_console('"{0}" does not exist, use `add settings` instead.'.format(k))
                    print_console_wide('{0} rows changed.'.format(r))
                # Set backends config
                elif c1 == 'backend':
                    # `show backends` should be run before update
                    # so that the backend ndx map has been created
                    if not BACKENDS_NDX_MAP:
                        msg = strwide('Please type `show backends` to confirm backend index.')
                    elif not 'ndx' in cond_map:
                        msg = strwide("'ndx' should be specified.")
                    else:
                        try:
                            backend = query_data((
                                'select a.s_key, a.s_value, b.stat from settings as a,'
                                'backends as b where a.id={0} and a.id=b.settings_id'
                            ).format(BACKENDS_NDX_MAP[pxy][cond_map['ndx']]))
                        except (KeyError, IndexError):
                            return strwide('PROXY_ID or NDX error.')
                        if backend and backend[0]:
                            bk_spl = re.split('[@:]', backend[0][1])
                            bk_addr = bk_spl[0]
                            bk_port = bk_spl[1]
                            if len(bk_spl) == 3:
                                bk_shard = bk_spl[2]
                            else:
                                bk_shard = None
                            if backend[0][0] == 'proxy-backend-addresses':
                                bk_type = 'rw'
                            else:
                                bk_type = 'ro'
                            bk_stat = backend[0][2]
                        else:
                            return strwide('Backend info error.\n'
                                           'Retry `show backends` and update again.')
                        for k, v in set_map.items():
                            if k == 'addr':
                                if not re.match(IP_RE_PATTERN, v):
                                    return strwide('IP format error.')
                                bk_addr = v
                            elif k == 'port':
                                try:
                                    new_port = int(v)
                                    if new_port > 65535 or new_port < 0:
                                        raise ValueError
                                except ValueError:
                                    return strwide('PORT format error.\nShould be integer in 0~65535')
                                bk_port = v
                            elif k == 'type':
                                if v != 'rw' and v != 'ro':
                                    return strwide("TYPE format error.\nShould be 'rw' or 'ro'.")
                                bk_type = v
                            elif k == 'stat':
                                if not v in ['unknown', 'up', 'down', 'delete', 'maintaining']:
                                    return strwide("STAT format error.\n"
                                                   "Should be 'unknown', 'up', 'down', 'delete' or 'maintaining'")
                                bk_stat = v
                            elif k == 'shard_group':
                                bk_shard = v
                            else:
                                return strwide("Backend setting key should be 'addr', 'port', "
                                               "'type', 'stat' or 'shard_group'.")
                        if bk_shard:
                            new_bk = '{0}:{1}@{2}'.format(bk_addr, bk_port, bk_shard)
                        else:
                            new_bk = ':'.join([bk_addr, bk_port])
                        if bk_type == 'rw':
                            bk_key = 'proxy-backend-addresses'
                        else:
                            bk_key = 'proxy-read-only-backend-addresses'
                        r += query('update settings set s_key="{0}", s_value="{1}" where id={2}'.format(
                            bk_key, new_bk, BACKENDS_NDX_MAP[pxy][cond_map['ndx']]))
                        r += query('update backends set stat="{0}" where settings_id={1}'.format(
                            bk_stat, BACKENDS_NDX_MAP[pxy][cond_map['ndx']]))
                        print_console_wide('{0} rows changed.'.format(r))
                        msg_list = ['']
                        msg_list.append('PROXY_ID: {0}'.format(pxy))
                        msg_list.append('')
                        msg_list.append('{0}{1}'.format('NDX'.ljust(8), cond_map['ndx']))
                        msg_list.append('{0}{1}'.format('ADDR'.ljust(8), bk_addr))
                        msg_list.append('{0}{1}'.format('PORT'.ljust(8), bk_port))
                        msg_list.append('{0}{1}'.format('TYPE'.ljust(8), bk_type))
                        msg_list.append('{0}{1}'.format('STAT'.ljust(8), bk_stat))
                        msg_list.append('')
                        msg = '\n'.join(msg_list)
                elif c1 == 'sharding':
                    # shard settings for a table need 'db' and 'table'
                    shard_keys = set_map.keys()
                    try:
                        shard_keys.remove('default_group')
                    except ValueError:
                        pass
                    try:
                        shard_keys.remove('all_groups')
                    except ValueError:
                        pass
                    if shard_keys and (not 'db' in cond_map or not 'table' in cond_map):
                        return strwide('"db" and "table" are needed in where clause')
                    # set 'default_group' and 'all_groups'
                    if 'default_group' in set_map:
                        r += query(('update sharding set partitions="{0}" where proxy_id="{1}"'
                                    ' and shard_db="_default_group"').format(
                                        set_map['default_group'].replace('"', '\\"'), pxy))
                    if 'all_groups' in set_map:
                        r += query(('update sharding set partitions="{0}" where proxy_id="{1}"'
                                    ' and shard_db="_all_groups"').format(
                                        set_map['all_groups'].replace('"', '\\"'), pxy))
                    # Normal shard settings
                    if shard_keys:
                        exist = query_data(('select id from sharding where proxy_id="{0}"'
                                            ' and shard_db="{1}" and shard_table="{2}"').format(
                                                pxy, cond_map['db'], cond_map['table']))
                        if not exist or not exist[0]:
                            return strwide('shard rule does not exist, use "add sharding" instead')
                        rule_id = exist[0][0]
                        shard_set_list = []
                        for k, v in set_map.items():
                            if k == 'db':
                                shard_set_list.append('shard_db="{0}"'.format(v))
                            elif k == 'table':
                                shard_set_list.append('shard_table="{0}"'.format(v))
                            elif k == 'key':
                                shard_set_list.append('pkey="{0}"'.format(v))
                            elif k == 'keytype':
                                if not v in ['int', 'str', 'datetime', 'date']:
                                    return strwide('"keytype" should be [int|str|date|datetime]')
                                shard_set_list.append('shard_key_type="{0}"'.format(v))
                            elif k == 'method':
                                if not v in ['range', 'hash']:
                                    return strwide('"method" should be hash or range')
                                shard_set_list.append('method="{0}"'.format(v))
                            elif k == 'partitions':
                                try:
                                    json.loads(v)
                                except ValueError:
                                    return strwide('"partitions" should be a json string')
                                shard_set_list.append('partitions="{0}"'.format(v.replace('"', '\\"')))
                        r += query('update sharding set {0} where id={id}'.format(
                            ','.join(shard_set_list), id=rule_id))
                    print_console_wide('{0} rows changed.'.format(r))
                else:
                    syntax_error = 1
        if syntax_error == 1:
            msg = strwide('Syntax error with command `{0}`').format(cmd)

    ###
    # delete settings, including sharding and backend
    ###
    elif cmd.startswith('remove '):
        syntax_error = 0
        msg = ''
        r = 0
        rm_pat = ('^\s*remove\s+([a-zA-Z_]+)\s+'
                  'where\s+(\S+\s*=\s*(?:(\'|").*?(?<!\\\\)\\3|\d+)'
                  '(?:\s+and\s+\S+\s*=\s*(?:(\'|").*?(?<!\\\\)\\4|\d+))*)\s*$')
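        # Illustrative commands this pattern is meant to accept, e.g.:
        #   remove backend where proxy_id='proxy01' and ndx=1
        #   remove sharding where proxy_id='proxy01' and db='test' and table='t1'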
        res = re.match(rm_pat, cmd, re.IGNORECASE)
        if res is None:
            syntax_error = 1
        else:
            c1 = res.group(1)
            c2 = res.group(2)
            pxy = PROXY_ID
            cond_map = {}
            # Parse where clause
            where_pat = '(?:^|\s+)(\S+\s*=\s*(?:(\'|").*?(?<!\\\\)\\2|\d+))(?:\s+|$)'
            for where_group in re.findall(where_pat, c2):
                where_item = where_group[0]
                k, v = where_item.split('=')
                k = k.strip().lower()
                v = v.strip()
                if re.match(r'^(\'|").*?\1$', v):
                    v = v[1:-1]
                cond_map[k] = v
            if 'proxy_id' in cond_map:
                pxy = cond_map['proxy_id']
            if not pxy:
                msg = strwide('`PROXY_ID` needs to be specified')
            else:
                if c1 == 'settings':
                    del_sql = 'delete from settings where proxy_id="{0}"'.format(pxy)
                    if 'key' in cond_map:
                        del_sql += ' and s_key="{0}"'.format(cond_map['key'])
                    if 'value' in cond_map:
                        del_sql += ' and s_value="{0}"'.format(cond_map['value'])
                    r += query(del_sql)
                elif c1 == 'backend':
                    if not BACKENDS_NDX_MAP:
                        msg = strwide('Please type `show backends` to confirm backend index.')
                    elif not 'ndx' in cond_map:
                        msg = strwide("'ndx' should be specified.")
                    else:
                        try:
                            bk_id = BACKENDS_NDX_MAP[pxy][cond_map['ndx']]
                            r += query('delete from settings where id={0}'.format(bk_id))
                            r += query('delete from backends where settings_id={0}'
                                       .format(bk_id))
                        except (KeyError, IndexError):
                            return strwide('PROXY_ID or NDX error.')
                elif c1 == 'sharding':
                    if not 'db' in cond_map or not 'table' in cond_map:
                        return strwide('"db" and "table" must be specified.')
                    del_sql = ('delete from sharding where proxy_id="{0}"'
                               ' and shard_db="{1}" and shard_table="{2}"'.format(
                                   pxy, cond_map['db'], cond_map['table']))
                    r += query(del_sql)
                else:
                    syntax_error = 1
        if syntax_error == 1:
            msg = strwide('Syntax error with command `{0}`').format(cmd)
        else:
            print_console_wide('{0} rows deleted.'.format(r))

    ###
    # Set a flag in mysql so that the proxy reloads when it detects the change
    ###
    elif cmd.startswith('apply ') or cmd == 'apply':
        pxy = PROXY_ID
        try:
            proxy_id = cmd.split()[1].strip('\'" ')
        except IndexError:
            proxy_id = None
        if proxy_id:
            pxy = proxy_id
        if not pxy:
            return strwide('proxy_id should be specified.')
        query('update switch set apply_switch=1 where proxy_id="{0}"'.format(pxy))
        msg = strwide('Apply config success.\nProxy will reload in a short while.')

    ###
    # Initialize new proxy settings
    ###
    elif cmd.startswith('init '):
        proxy_id = cmd.split()[1]
        if not proxy_id:
            return strwide('No `proxy_id` specified.')
        try:
            r = init_new(proxy_id)
        except EOFError:
            print_console('User quit.')
            return ''
        if r == 0:
            print_console('User quit.')
            return ''
        msg = strwide('Init complete.')

    ###
    # Delete proxy settings
    ###
    elif cmd.startswith('delete '):
        proxy_id = cmd.split()[1]
        r = delete_one(proxy_id)
        if r:
            msg = strwide('Delete done.')
        else:
            msg = ''

    ###
    # Create the tables which store settings if they do not exist
    ###
    elif cmd == 'create tables':
        if create_tables() == 0:
            msg = 'Create tables DONE!'
        else:
            msg = 'Error!'

    ###
    # Show help message
    ###
    elif cmd == 'h' or cmd == 'help':
        msg = show_help()

    ###
    # Quit script
    ###
    elif cmd == 'q' or cmd == 'quit':
        #sys.exit()
        raise EOFError

    ###
    # Unrecognized command
    ###
    else:
        msg = strwide('Unknown command `{0}`.\n'
                      'Type `help` to see command list.').format(cmd)
    return msg


# Init settings for a proxy instance
def init_new(id):
    init = init_fetchone('Init {0}? [y/N]: '.format(id), 'n')
    if not init:
        return 0
    settings = {}
    settings['proxy-address'] = init_fetchone(
        'proxy-address=? [127.0.0.1:4440]: ', '127.0.0.1:4440', False)
    settings['proxy-backend-addresses'] = init_fetchmulti(
        'proxy-backend-addresses=? [127.0.0.1:3306]: ', '127.0.0.1:3306')
    settings['log-file'] = init_fetchone('log-file=? []: ', '', False)
    settings['plugin-dir'] = init_fetchone('plugin-dir=? []: ', '', False)
    admin = init_fetchone('Set admin? [Y/n]: ', 'y')
    if admin:
        plugins = ['admin', 'proxy']
        settings['admin-address'] = init_fetchone(
            'admin-address=? [127.0.0.1:4441]: ', '127.0.0.1:4441', False)
        settings['admin-lua-script'] = init_fetchone('admin-lua-script=? []: ', '', False)
        settings['admin-username'] = init_fetchone(
            'admin-username=? [admin]: ', 'admin', False)
        settings['admin-password'] = init_fetchone(
            'admin-password=? [admin_pass]: ', 'admin_pass', False)
    else:
        plugins = ['proxy']
    settings['plugins'] = plugins
    settings['default-username'] = init_fetchone('default-username=? [root]: ', 'root', False)
    settings['default-db'] = init_fetchone('default-db=? [test]: ', 'test', False)
    settings['default-pool-size'] = init_fetchone(
        'default-pool-size=? [50]: ', '50', False, check_func=check_int)
    settings['user-pwd'] = init_fetchone('user-pwd=? [root@123456]: ', 'root@123456', False)
    settings['auto-connect'] = init_fetchone('auto-connect=? [Y/n]: ', 'y')
    settings['app-user-pwd'] = init_fetchone('app-user-pwd=? [root@123]: ', 'root@123', False)
    settings['sharding-mode'] = init_fetchone('sharding-mode=? [y/N]: ', 'n')
    settings['log-level'] = init_fetchone('log-level=? [debug]: ', 'debug', False)
    print_console('\n'+'-'*20)
    print_console('Init confirmation:')
    # (key, value) rows for the settings insert
    init_data = []
    for k in sorted(settings.keys()):
        v = settings[k]
        print('{0}={1}'.format(k, v))
        if isinstance(v, list):
            for v_obj in v:
                init_data.append((k, v_obj))
        else:
            init_data.append((k, v))
    print_console('-'*20+'\n')
    if not init_fetchone('Continue? [Y/n]: ', 'y'):
        return 0
    init_sql = 'insert into settings set proxy_id="{0}", s_key=%s, s_value=%s'.format(id)
    querymany(init_sql, init_data)
    # create switch row
    query('insert into switch set proxy_id="{0}", apply_switch=0'.format(id))
    return
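
# A typical first-time flow (illustrative): `create tables`, then
# `init proxy01` to answer the prompts above, then `apply proxy01` so the
# running proxy picks up the new settings via the `switch` table.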


def init_fetchone(intro, default, yn=True, check_func=None):
    while 1:
        r = raw_input(intro)
        if r == '':
            r = default
        if yn:
            r = r.lower()
            if r == 'y':
                return 1
            elif r == 'n':
                return 0
            else:
                print('Input error, type `y` or `n`')
                continue
        else:
            if check_func is not None and not check_func(r):
                print('Value error.')
                continue
            return r


def init_fetchmulti(intro, default):
    ret = []
    while 1:
        r = raw_input(intro)
        if r == '':
            r = default
        ret.append(r)
        r_next = init_fetchone('Another one? [y/N]: ', 'n')
        if not r_next:
            break
    return ret


def check_int(n):
    try:
        int(n)
    except ValueError:
        print('Integer is needed')
        return 0
    return 1


# Delete settings for ONE proxy
def delete_one(id):
    r = init_fetchone('Delete {0}? (Cannot recover) [y/N]: '.format(id), 'n')
    if r:
        ret = query('delete from settings where proxy_id="{0}"'.format(id))
        print_console_wide('{0} rows changed.'.format(ret))
    return r


# Show prompt message
def show_prompt():
    print_console_wide('### Welcome to Mysql-Proxy Console. ###')


# Create db tables
def create_tables():
    csql = []
    csql.append(
        'CREATE TABLE IF NOT EXISTS `settings` ('
        '`id` int(11) NOT NULL AUTO_INCREMENT,'
        '`proxy_id` varchar(64) NOT NULL,'
        '`s_key` varchar(64) NOT NULL,'
        '`s_value` varchar(255) NOT NULL,'
        'PRIMARY KEY (`id`)'
        ') ENGINE=InnoDB DEFAULT CHARSET=utf8')
    csql.append(
        'CREATE TABLE IF NOT EXISTS `sharding` ('
        '`id` int(11) NOT NULL AUTO_INCREMENT,'
        '`proxy_id` varchar(64) NOT NULL,'
        '`shard_db` varchar(32) NOT NULL,'
        '`shard_table` varchar(64) NOT NULL,'
        '`pkey` varchar(32) DEFAULT NULL,'
        "`shard_key_type` enum('int','str','datetime') DEFAULT NULL,"
        "`method` enum('hash','range') DEFAULT NULL,"
        '`partitions` varchar(255) NOT NULL,'
        '`logic_shard_num` int(11) DEFAULT NULL,'
        'PRIMARY KEY (`id`)'
        ') ENGINE=InnoDB DEFAULT CHARSET=utf8')
    csql.append(
        'CREATE TABLE IF NOT EXISTS `backends` ('
        '`id` int(11) NOT NULL AUTO_INCREMENT,'
        '`settings_id` int(11) DEFAULT NULL,'
        '`stat` varchar(32) NOT NULL,'
        'PRIMARY KEY (`id`),'
        'UNIQUE KEY `settings_id` (`settings_id`)'
        ') ENGINE=InnoDB DEFAULT CHARSET=utf8')
    csql.append(
        'CREATE TABLE IF NOT EXISTS `switch` ('
        '`proxy_id` varchar(64) NOT NULL,'
        "`apply_switch` tinyint(4) NOT NULL DEFAULT '0',"
        'UNIQUE KEY `proxy_id` (`proxy_id`)'
        ') ENGINE=InnoDB DEFAULT CHARSET=utf8')
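    # How the tables relate: each row in `backends` points at its `settings`
    # row via settings_id, and the `apply` command flips `switch.apply_switch`
    # to 1 so the proxy reloads its config.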
    # Ignore table exists warning
    from warnings import filterwarnings
    filterwarnings('ignore', category=MySQLdb.Warning)
    for sql in csql:
        query(sql)
    return 0


# Show help message
def show_help():
    return '''
List all commands:

show                     Show all config
show backends            Show backends config
show sharding            Show sharding config

add settings set [option]=[value], ... where proxy_id=[id]
update settings set [key]=[value] where proxy_id=[id]
remove settings where proxy_id=[id] and key=[value] and value=[value]

add backend set [option]=[value], ... where proxy_id=[id]
update backend set [option]=[value], ... where proxy_id=[id] and ndx=[index]
remove backend where proxy_id=[id] and ndx=[index]

add sharding set [option]=[value], ... where proxy_id=[id]
update sharding set [option]=[value], ... where proxy_id=[id] and db=[value] and table=[value]
remove sharding where proxy_id=[id] and db=[value] and table=[value]

init [proxy_id]          Init config for `proxy_id`
use [proxy_id]           Omit "where proxy_id=..." in later add/update/remove commands
delete [proxy_id]        Delete config for `proxy_id`

apply [proxy_id]         Apply config to a proxy

create tables            Create tables on the server if they do not exist

help                     Show this message
quit                     Exit

--------

Backend options

addr           IP address of mysql
port           Port of mysql
type           read only or read write [rw|ro]
stat           status of backend [unknown|up|down|delete|maintaining]
shard_group    sharding group name

----

Sharding options

db             database name
table          table name
key            shard column
keytype        type of key [int|str|date|datetime]
method         range or hash [range|hash]
partitions     detail of sharding
shardnum       total number of shard partitions
default_group  default sharding group
all_groups     all groups list

'''


if __name__ == '__main__':
    show_prompt()
    try:
        while 1:
            cmd = raw_input('\033[1;32m>\033[0m ')
            cmd = cmd.strip().lower().rstrip(';')
            if not cmd:
                continue
            print_console(run_cmd(cmd))
    except EOFError:
        print_console_wide('### Good Bye! ###')
    except:
        traceback.print_exc()
        print_console_wide('Error detected! Exit.')
    finally:
        if conn:
            conn.close()