mirror of
https://gitee.com/wangbin579/cetus.git
synced 2024-12-02 03:47:41 +08:00
Merge branch 'master' into shard-online
Conflicts: plugins/admin/admin-plugin.c
This commit is contained in:
commit
f073746e7a
@ -194,7 +194,7 @@ Default: 10485760 (10MB)
|
||||
|
||||
每个后端返回结果集的最大数量
|
||||
|
||||
> max-resp-size = 1024
|
||||
> max-resp-size = 1048576
|
||||
|
||||
### master-preferred
|
||||
|
||||
@ -322,6 +322,16 @@ Default: slave-delay-down / 2 (seconds)
|
||||
|
||||
> slave-delay-recover = 5
|
||||
|
||||
## MGR配置
|
||||
|
||||
### group-replication-mode
|
||||
|
||||
Default: 0 (普通MySQL集群)
|
||||
|
||||
当后端MySQL集群是单主模式的MGR时,该参数设置为1,Cetus可以自动检测MGR集群的主从状态及节点主从角色变换。目前Cetus只支持单主MGR模式。
|
||||
|
||||
> group-replication-mode = 1
|
||||
|
||||
## 其它
|
||||
|
||||
### verbose-shutdown
|
||||
|
@ -16,8 +16,8 @@
|
||||
| select conn_details from backend | display the idle conns |
|
||||
| select * from backends | list the backends and their state |
|
||||
| show connectionlist [\<num>] | show \<num> connections |
|
||||
| show allow_ip \<module> | show allow_ip rules of module, currently admin\|proxy\|shard |
|
||||
| show deny_ip \<module> | show deny_ip rules of module, currently admin\|proxy\|shard |
|
||||
| show allow_ip \<module> | show allow_ip rules of module, currently admin\|proxy |
|
||||
| show deny_ip \<module> | show deny_ip rules of module, currently admin\|proxy |
|
||||
| add allow_ip \<module> \<address> | add address to white list of module |
|
||||
| add deny_ip \<module> \<address> | add address to black list of module |
|
||||
| delete allow_ip \<module> \<address> | delete address from white list of module |
|
||||
@ -191,21 +191,13 @@ update后端的state只包括up|down|maintaining三种状态,delete/remove后
|
||||
* `pool.max_resp_len` 最大结果集长度
|
||||
* `pool.master_preferred` 是否只允许走主库
|
||||
|
||||
### 修改连接池/通用配置
|
||||
### 修改配置
|
||||
|
||||
`config set [<item>]`
|
||||
|
||||
`config set common.[option] = [value]`修改基本配置
|
||||
`config set <key>=<value>`
|
||||
|
||||
例如
|
||||
|
||||
>config set common.slave_delay_down = 3
|
||||
|
||||
`config set pool.[option] = [value]`修改连接池配置
|
||||
|
||||
例如
|
||||
|
||||
>config set pool.max_pool_size = 200
|
||||
>config set slave_delay_down = 3
|
||||
|
||||
### 查看参数配置
|
||||
|
||||
@ -315,14 +307,6 @@ update后端的state只包括up|down|maintaining三种状态,delete/remove后
|
||||
|
||||
`delete from user_pwd where user='<name>'`
|
||||
|
||||
删除特定用户的后端密码。
|
||||
|
||||
例如
|
||||
|
||||
>delete from user_pwd where user='root'
|
||||
|
||||
`delete from app_user_pwd where user='<name>'`
|
||||
|
||||
删除特定用户连接Proxy的密码。
|
||||
|
||||
例如
|
||||
@ -467,7 +451,7 @@ Proxy: 仅配置IP,代表限制该IP来源所有用户的访问;配置User@I
|
||||
| query_time_table.2 | 5 |
|
||||
| query_time_table.5 | 1 |
|
||||
|
||||
表示用时1秒的SQL有3条,用时2秒的SQL有5条,用时5秒的SQL有1条
|
||||
表示用时1毫秒的SQL有3条,用时2毫秒的SQL有5条,用时5毫秒的SQL有1条
|
||||
|
||||
```
|
||||
说明
|
||||
@ -507,3 +491,11 @@ Com_select_bad_key 分库键未识别导致走全库的SELECT数量
|
||||
### 减少系统占用的内存
|
||||
|
||||
`reduce memory`
|
||||
# Cetus 读写分离版本管理手册
|
||||
|
||||
## 前言
|
||||
|
||||
**有配置修改均能动态生效,配置更改后请务必修改原始配置文件,以确保下次重启时配置能够保留。**
|
||||
|
||||
## 查看帮助
|
||||
|
||||
|
@ -17,8 +17,8 @@
|
||||
| select * from backends | list the backends and their state |
|
||||
| select * from groups | list the backends and their groups |
|
||||
| show connectionlist [\<num>] | show \<num> connections |
|
||||
| show allow_ip \<module> | show allow_ip rules of module, currently admin\|proxy\|shard |
|
||||
| show deny_ip \<module> | show deny_ip rules of module, currently admin\|proxy\|shard |
|
||||
| show allow_ip \<module> | show allow_ip rules of module, currently admin\|shard |
|
||||
| show deny_ip \<module> | show deny_ip rules of module, currently admin\|shard |
|
||||
| add allow_ip \<module> \<address> | add address to white list of module |
|
||||
| add deny_ip \<module> \<address> | add address to black list of module |
|
||||
| delete allow_ip \<module> \<address> | delete address from white list of module |
|
||||
@ -215,21 +215,13 @@ update后端的state只包括up|down|maintaining三种状态,delete/remove后
|
||||
* `pool.max_resp_len` 最大结果集长度
|
||||
* `pool.master_preferred` 是否只允许走主库
|
||||
|
||||
### 修改连接池/通用配置
|
||||
### 修改配置
|
||||
|
||||
`config set [<item>]`
|
||||
|
||||
`config set common.[option] = [value]`修改基本配置
|
||||
`config set <key>=<value>`
|
||||
|
||||
例如
|
||||
|
||||
>config set common.slave_delay_down = 3
|
||||
|
||||
`config set pool.[option] = [value]`修改连接池配置
|
||||
|
||||
例如
|
||||
|
||||
>config set pool.max_pool_size = 200
|
||||
>config set slave_delay_down = 3
|
||||
|
||||
### 查看参数配置
|
||||
|
||||
@ -517,7 +509,7 @@ Shard: 仅配置IP,代表限制该IP来源所有用户的访问;配置User@I
|
||||
| query_time_table.2 | 5 |
|
||||
| query_time_table.5 | 1 |
|
||||
|
||||
表示用时1秒的SQL有3条,用时2秒的SQL有5条,用时5秒的SQL有1条
|
||||
表示用时1毫秒的SQL有3条,用时2毫秒的SQL有5条,用时5毫秒的SQL有1条
|
||||
|
||||
```
|
||||
说明
|
||||
|
@ -2556,12 +2556,12 @@ network_mysqld_proxy_plugin_get_options(chassis_plugin_config *config)
|
||||
|
||||
chassis_options_add(&opts, "proxy-read-timeout",
|
||||
0, 0, OPTION_ARG_DOUBLE, &(config->read_timeout_dbl),
|
||||
"read timeout in seconds (default: 10 minuates)", NULL,
|
||||
"read timeout in seconds (default: 10 minutes)", NULL,
|
||||
assign_proxy_read_timeout, show_proxy_read_timeout, ALL_OPTS_PROPERTY);
|
||||
|
||||
chassis_options_add(&opts, "proxy-write-timeout",
|
||||
0, 0, OPTION_ARG_DOUBLE, &(config->write_timeout_dbl),
|
||||
"write timeout in seconds (default: 10 minuates)", NULL,
|
||||
"write timeout in seconds (default: 10 minutes)", NULL,
|
||||
assign_proxy_write_timeout, show_proxy_write_timeout, ALL_OPTS_PROPERTY);
|
||||
|
||||
chassis_options_add(&opts, "proxy-allow-ip",
|
||||
|
@ -80,6 +80,8 @@ struct chassis_plugin_config {
|
||||
gdouble connect_timeout_dbl;
|
||||
/* exposed in the config as double */
|
||||
gdouble read_timeout_dbl;
|
||||
|
||||
gdouble dist_tran_decided_read_timeout_dbl;
|
||||
/* exposed in the config as double */
|
||||
gdouble write_timeout_dbl;
|
||||
|
||||
@ -1943,6 +1945,9 @@ NETWORK_MYSQLD_PLUGIN_PROTO(proxy_init)
|
||||
if (config->read_timeout_dbl >= 0) {
|
||||
chassis_timeval_from_double(&con->read_timeout, config->read_timeout_dbl);
|
||||
}
|
||||
if (config->dist_tran_decided_read_timeout_dbl >= 0) {
|
||||
chassis_timeval_from_double(&con->dist_tran_decided_read_timeout, config->dist_tran_decided_read_timeout_dbl);
|
||||
}
|
||||
if (config->write_timeout_dbl >= 0) {
|
||||
chassis_timeval_from_double(&con->write_timeout, config->write_timeout_dbl);
|
||||
}
|
||||
@ -2045,6 +2050,7 @@ network_mysqld_shard_plugin_new(void)
|
||||
/* use negative values as defaults to make them ignored */
|
||||
config->connect_timeout_dbl = -1.0;
|
||||
config->read_timeout_dbl = -1.0;
|
||||
config->dist_tran_decided_read_timeout_dbl = -1.0;
|
||||
config->write_timeout_dbl = -1.0;
|
||||
|
||||
return config;
|
||||
@ -2231,6 +2237,44 @@ assign_proxy_read_timeout(const gchar *newval, gpointer param) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
static gchar*
|
||||
show_proxy_dist_tran_decided_read_timeout(gpointer param) {
|
||||
struct external_param *opt_param = (struct external_param *)param;
|
||||
gint opt_type = opt_param->opt_type;
|
||||
if(CAN_SHOW_OPTS_PROPERTY(opt_type)) {
|
||||
return g_strdup_printf("%lf (s)", config->dist_tran_decided_read_timeout_dbl);
|
||||
}
|
||||
if(CAN_SAVE_OPTS_PROPERTY(opt_type)) {
|
||||
if(config->dist_tran_decided_read_timeout_dbl == -1) {
|
||||
return NULL;
|
||||
}
|
||||
return g_strdup_printf("%lf", config->dist_tran_decided_read_timeout_dbl);
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static gint
|
||||
assign_proxy_dist_tran_decided_read_timeout(const gchar *newval, gpointer param) {
|
||||
gint ret = ASSIGN_ERROR;
|
||||
struct external_param *opt_param = (struct external_param *)param;
|
||||
gint opt_type = opt_param->opt_type;
|
||||
if(CAN_ASSIGN_OPTS_PROPERTY(opt_type)) {
|
||||
if(NULL != newval) {
|
||||
gdouble value = 0;
|
||||
if(try_get_double_value(newval, &value)) {
|
||||
config->dist_tran_decided_read_timeout_dbl = value;
|
||||
ret = ASSIGN_OK;
|
||||
} else {
|
||||
ret = ASSIGN_VALUE_INVALID;
|
||||
}
|
||||
} else {
|
||||
ret = ASSIGN_VALUE_INVALID;
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
static gchar*
|
||||
show_proxy_write_timeout(gpointer param) {
|
||||
struct external_param *opt_param = (struct external_param *)param;
|
||||
@ -2358,12 +2402,18 @@ network_mysqld_shard_plugin_get_options(chassis_plugin_config *config)
|
||||
|
||||
chassis_options_add(&opts, "proxy-read-timeout",
|
||||
0, 0, OPTION_ARG_DOUBLE, &(config->read_timeout_dbl),
|
||||
"read timeout in seconds (default: 10 minuates)", NULL,
|
||||
"read timeout in seconds (default: 10 minutes)", NULL,
|
||||
assign_proxy_read_timeout, show_proxy_read_timeout, ALL_OPTS_PROPERTY);
|
||||
|
||||
chassis_options_add(&opts, "proxy-xa-commit-or-rollback-read-timeout",
|
||||
0, 0, OPTION_ARG_DOUBLE, &(config->dist_tran_decided_read_timeout_dbl),
|
||||
"xa commit or rollback read timeout in seconds (default: 30 seconds)", NULL,
|
||||
assign_proxy_dist_tran_decided_read_timeout,
|
||||
show_proxy_dist_tran_decided_read_timeout, ALL_OPTS_PROPERTY);
|
||||
|
||||
chassis_options_add(&opts, "proxy-write-timeout",
|
||||
0, 0, OPTION_ARG_DOUBLE, &(config->write_timeout_dbl),
|
||||
"write timeout in seconds (default: 10 minuates)", NULL,
|
||||
"write timeout in seconds (default: 10 minutes)", NULL,
|
||||
assign_proxy_write_timeout, show_proxy_write_timeout, ALL_OPTS_PROPERTY);
|
||||
|
||||
chassis_options_add(&opts, "proxy-allow-ip",
|
||||
|
@ -32,6 +32,7 @@
|
||||
#include <string.h>
|
||||
#include <sys/stat.h>
|
||||
#include <sys/time.h>
|
||||
#include <arpa/inet.h>
|
||||
|
||||
#include "cetus-users.h"
|
||||
#include "cetus-util.h"
|
||||
@ -40,10 +41,14 @@
|
||||
#include "glib-ext.h"
|
||||
#include "sharding-config.h"
|
||||
|
||||
#include <netdb.h>
|
||||
|
||||
#define CHECK_ALIVE_INTERVAL 3
|
||||
#define CHECK_ALIVE_TIMES 2
|
||||
#define CHECK_DELAY_INTERVAL 300 * 1000 /* 300ms */
|
||||
|
||||
#define ADDRESS_LEN 64
|
||||
|
||||
/* Each backend should have db <proxy_heart_beat> and table <tb_heartbeat> */
|
||||
#define HEARTBEAT_DB "proxy_heart_beat"
|
||||
|
||||
@ -124,6 +129,255 @@ get_mysql_connection(cetus_monitor_t *monitor, char *addr)
|
||||
return conn;
|
||||
}
|
||||
|
||||
static gint
|
||||
get_ip_by_name(const gchar *name, gchar *ip) {
|
||||
if(ip == NULL || name == NULL) return -1;
|
||||
char **pptr;
|
||||
struct hostent *hptr;
|
||||
hptr = gethostbyname(name);
|
||||
if(hptr == NULL) {
|
||||
g_debug("gethostbyname failed.");
|
||||
return -1;
|
||||
}
|
||||
for(pptr = hptr->h_addr_list; *pptr != NULL; pptr++) {
|
||||
if(inet_ntop(hptr->h_addrtype, *pptr, ip, ADDRESS_LEN)) {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
static gint
|
||||
slave_list_compare(gconstpointer a, gconstpointer b) {
|
||||
gchar *old_value = (gchar *)a;
|
||||
gchar *search_value = (gchar *)b;
|
||||
return strcasecmp(old_value, search_value);
|
||||
}
|
||||
|
||||
static void
|
||||
group_replication_detect(network_backends_t *bs, cetus_monitor_t *monitor)
|
||||
{
|
||||
if(bs == NULL) return ;
|
||||
|
||||
GList *slave_list = NULL;
|
||||
gchar master_addr[ADDRESS_LEN] = {""};
|
||||
gchar slave_addr[ADDRESS_LEN] = {""};
|
||||
gchar server_group[64] = {""};
|
||||
guint has_master = 0;
|
||||
guint i = 0;
|
||||
guint backends_num = 0;
|
||||
|
||||
gchar *sql1 = "SELECT `MEMBER_HOST`, `MEMBER_PORT` FROM "
|
||||
"performance_schema.replication_group_members "
|
||||
"WHERE MEMBER_STATE = 'ONLINE' AND MEMBER_ID = "
|
||||
"(SELECT VARIABLE_VALUE FROM performance_schema.global_status "
|
||||
"WHERE VARIABLE_NAME = 'group_replication_primary_member') ";
|
||||
gchar *sql2 = "SELECT `MEMBER_HOST`, `MEMBER_PORT` FROM "
|
||||
"performance_schema.replication_group_members "
|
||||
"WHERE MEMBER_STATE = 'ONLINE' AND MEMBER_ID <> "
|
||||
"(SELECT VARIABLE_VALUE FROM performance_schema.global_status "
|
||||
"WHERE VARIABLE_NAME = 'group_replication_primary_member') ";
|
||||
|
||||
backends_num = network_backends_count(bs);
|
||||
for (i = 0; i < backends_num; i++) {
|
||||
network_backend_t *backend = network_backends_get(bs, i);
|
||||
if (backend->state == BACKEND_STATE_MAINTAINING)
|
||||
continue;
|
||||
|
||||
char *backend_addr = backend->addr->name->str;
|
||||
MYSQL *conn = get_mysql_connection(monitor, backend_addr);
|
||||
int result = 0;
|
||||
MYSQL_RES *rs_set = NULL;
|
||||
MYSQL_ROW row = NULL;
|
||||
gchar ip[ADDRESS_LEN] = {""};
|
||||
gchar old_master[ADDRESS_LEN] = {""};
|
||||
|
||||
if(conn == NULL) {
|
||||
g_debug("get connection failed. error: %d, text: %s, backend: %s",
|
||||
mysql_errno(conn), mysql_error(conn), backend_addr);
|
||||
continue;
|
||||
}
|
||||
|
||||
if(mysql_real_query(conn, L(sql1))) {
|
||||
g_debug("select primary info failed for group_replication. error: %d, text: %s, backend: %s",
|
||||
mysql_errno(conn), mysql_error(conn), backend_addr);
|
||||
continue;
|
||||
}
|
||||
|
||||
rs_set = mysql_store_result(conn);
|
||||
if(rs_set == NULL) {
|
||||
g_debug("get primary info result set failed for group_replication. error: %d, text: %s, backend: %s",
|
||||
mysql_errno(conn), mysql_error(conn), backend_addr);
|
||||
continue;
|
||||
}
|
||||
|
||||
row = mysql_fetch_row(rs_set);
|
||||
if(row == NULL || row[0] == NULL || row[1] == NULL) {
|
||||
g_debug("get primary info rows failed for group_replication. error: %d, text: %s, backend: %s",
|
||||
mysql_errno(conn), mysql_error(conn), backend_addr);
|
||||
mysql_free_result(rs_set);
|
||||
continue;
|
||||
}
|
||||
|
||||
if((get_ip_by_name(row[0], ip) != 0) || ip[0] == '\0') {
|
||||
g_debug("get master ip by name failed. error: %d, text: %s, backend: %s",
|
||||
mysql_errno(conn), mysql_error(conn), backend_addr);
|
||||
mysql_free_result(rs_set);
|
||||
continue;
|
||||
}
|
||||
|
||||
memcpy(old_master, master_addr, strlen(master_addr));
|
||||
snprintf(master_addr, ADDRESS_LEN, "%s:%s", ip, row[1]);
|
||||
|
||||
if(old_master[0] != '\0' && strcasecmp(old_master, master_addr) != 0) {
|
||||
g_warning("exists more than one masters.");
|
||||
return ;
|
||||
} else if (old_master[0] != '\0' && strcasecmp(old_master, master_addr) == 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
mysql_free_result(rs_set);
|
||||
rs_set = NULL;
|
||||
|
||||
if(master_addr[0] == '\0') {
|
||||
g_debug("get master address failed. error: %d, text: %s, backend: %s",
|
||||
mysql_errno(conn), mysql_error(conn), backend_addr);
|
||||
continue;
|
||||
}
|
||||
|
||||
if(strcasecmp(backend_addr, master_addr)) {
|
||||
conn = get_mysql_connection(monitor, master_addr);
|
||||
if(conn == NULL) {
|
||||
g_debug("get connection failed. error: %d, text: %s, backend: %s",
|
||||
mysql_errno(conn), mysql_error(conn), master_addr);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
if(mysql_real_query(conn, L(sql2))) {
|
||||
g_debug("select slave info failed for group_replication. error: %d, text: %s, backend: %s",
|
||||
mysql_errno(conn), mysql_error(conn), master_addr);
|
||||
continue;
|
||||
}
|
||||
|
||||
rs_set = mysql_store_result(conn);
|
||||
if(rs_set == NULL) {
|
||||
g_debug("get slave info result set failed for group_replication. error: %d, text: %s",
|
||||
mysql_errno(conn), mysql_error(conn));
|
||||
continue;
|
||||
}
|
||||
while(row=mysql_fetch_row(rs_set)) {
|
||||
memset(ip, 0, ADDRESS_LEN);
|
||||
if((get_ip_by_name(row[0], ip) != 0) || ip[0] == '\0') {
|
||||
g_debug("get slave ip by name failed. error: %d, text: %s",
|
||||
mysql_errno(conn), mysql_error(conn));
|
||||
mysql_free_result(rs_set);
|
||||
continue;
|
||||
}
|
||||
memset(slave_addr, 0, ADDRESS_LEN);
|
||||
snprintf(slave_addr, ADDRESS_LEN, "%s:%s", ip, row[1]);
|
||||
if(slave_addr[0] != '\0') {
|
||||
slave_list = g_list_append(slave_list, strdup(slave_addr));
|
||||
g_debug("add slave %s in list, %d", slave_addr, g_list_length(slave_list));
|
||||
} else {
|
||||
g_debug("get slave address failed. error: %d, text: %s",
|
||||
mysql_errno(conn), mysql_error(conn));
|
||||
}
|
||||
}
|
||||
mysql_free_result(rs_set);
|
||||
}
|
||||
|
||||
backends_num = network_backends_count(bs);
|
||||
for (i = 0; i < backends_num; i++) {
|
||||
network_backend_t *backend = network_backends_get(bs, i);
|
||||
|
||||
char *backend_addr = backend->addr->name->str;
|
||||
|
||||
if(server_group[0] == '\0' && backend->server_group && backend->server_group->len) {
|
||||
snprintf(server_group, 32, "%s", backend->server_group->str);
|
||||
}
|
||||
if(backend->type == BACKEND_TYPE_RW) {
|
||||
has_master++;
|
||||
if(!strcasecmp(backend_addr, master_addr)) {
|
||||
if(backend->state != BACKEND_STATE_UP) {
|
||||
network_backends_modify(bs, i, BACKEND_TYPE_RW, BACKEND_STATE_UP, NO_PREVIOUS_STATE);
|
||||
}
|
||||
break;
|
||||
}
|
||||
GList *it = g_list_find_custom(slave_list, backend_addr, slave_list_compare);
|
||||
if(it) {
|
||||
if(backend->state == BACKEND_STATE_DELETED || backend->state == BACKEND_STATE_MAINTAINING) {
|
||||
network_backends_modify(bs, i, BACKEND_TYPE_RO, BACKEND_STATE_UNKNOWN, NO_PREVIOUS_STATE);
|
||||
} else {
|
||||
network_backends_modify(bs, i, BACKEND_TYPE_RO, backend->state, NO_PREVIOUS_STATE);
|
||||
}
|
||||
slave_list = g_list_remove_link(slave_list, it);
|
||||
g_free(it->data);
|
||||
g_list_free(it);
|
||||
} else {
|
||||
network_backends_modify(bs, i, BACKEND_TYPE_RO, BACKEND_STATE_DELETED, NO_PREVIOUS_STATE);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
backends_num = network_backends_count(bs);
|
||||
for (i = 0; i < backends_num; i++) {
|
||||
network_backend_t *backend = network_backends_get(bs, i);
|
||||
|
||||
char *backend_addr = backend->addr->name->str;
|
||||
|
||||
if(server_group[0] == '\0' && backend->server_group && backend->server_group->len) {
|
||||
snprintf(server_group, 32, "%s", backend->server_group->str);
|
||||
}
|
||||
if(backend->type == BACKEND_TYPE_RO || backend->type == BACKEND_TYPE_UNKNOWN) {
|
||||
GList *it = g_list_find_custom(slave_list, backend_addr, slave_list_compare);
|
||||
if(it) {
|
||||
if(backend->state == BACKEND_STATE_DELETED || backend->state == BACKEND_STATE_MAINTAINING) {
|
||||
network_backends_modify(bs, i, BACKEND_TYPE_RO, BACKEND_STATE_UNKNOWN, NO_PREVIOUS_STATE);
|
||||
}
|
||||
slave_list = g_list_remove_link(slave_list, it);
|
||||
g_free(it->data);
|
||||
g_list_free(it);
|
||||
} else {
|
||||
if(master_addr[0] != '\0' && !strcasecmp(backend_addr, master_addr)) {
|
||||
network_backends_modify(bs, i, BACKEND_TYPE_RW, BACKEND_STATE_UP, NO_PREVIOUS_STATE);
|
||||
has_master++;
|
||||
} else {
|
||||
if(backend->type != BACKEND_TYPE_RO || backend->state != BACKEND_STATE_DELETED) {
|
||||
network_backends_modify(bs, i, BACKEND_TYPE_RO, BACKEND_STATE_DELETED, NO_PREVIOUS_STATE);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if(!has_master && master_addr[0] != '\0') {
|
||||
if(server_group[0] != '\0') {
|
||||
gchar master_addr_temp[ADDRESS_LEN] = {""};
|
||||
snprintf(master_addr_temp, ADDRESS_LEN, "%s@%s", master_addr, server_group);
|
||||
network_backends_add(bs, master_addr_temp, BACKEND_TYPE_RW, BACKEND_STATE_UP, monitor->chas);
|
||||
} else {
|
||||
network_backends_add(bs, master_addr, BACKEND_TYPE_RW, BACKEND_STATE_UP, monitor->chas);
|
||||
}
|
||||
}
|
||||
|
||||
if(g_list_length(slave_list)) {
|
||||
GList *it = NULL;
|
||||
for(it = slave_list; it; it = it->next) {
|
||||
if(server_group[0] != '\0') {
|
||||
gchar slave_addr_temp[ADDRESS_LEN] = {""};
|
||||
snprintf(slave_addr_temp, ADDRESS_LEN, "%s@%s", (char *)(it->data), server_group);
|
||||
network_backends_add(bs, slave_addr_temp, BACKEND_TYPE_RO, BACKEND_STATE_UNKNOWN, monitor->chas);
|
||||
} else {
|
||||
network_backends_add(bs, it->data, BACKEND_TYPE_RO, BACKEND_STATE_UNKNOWN, monitor->chas);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
g_list_free_full(slave_list, g_free);
|
||||
}
|
||||
|
||||
#define ADD_MONITOR_TIMER(ev_struct, ev_cb, timeout) \
|
||||
evtimer_set(&(monitor->ev_struct), ev_cb, monitor);\
|
||||
event_base_set(monitor->evloop, &(monitor->ev_struct));\
|
||||
@ -137,6 +391,11 @@ check_backend_alive(int fd, short what, void *arg)
|
||||
|
||||
int i;
|
||||
network_backends_t *bs = chas->priv->backends;
|
||||
|
||||
if(chas->group_replication_mode ==1) {
|
||||
group_replication_detect(bs, monitor);
|
||||
}
|
||||
|
||||
for (i = 0; i < network_backends_count(bs); i++) {
|
||||
network_backend_t *backend = network_backends_get(bs, i);
|
||||
backend_state_t oldstate = backend->state;
|
||||
@ -194,6 +453,11 @@ update_master_timestamp(int fd, short what, void *arg)
|
||||
chassis *chas = monitor->chas;
|
||||
int i;
|
||||
network_backends_t *bs = chas->priv->backends;
|
||||
|
||||
if(chas->group_replication_mode ==1) {
|
||||
group_replication_detect(bs, monitor);
|
||||
}
|
||||
|
||||
/* Catch RW time
|
||||
* Need a table to write from master and read from slave.
|
||||
* CREATE TABLE `tb_heartbeat` (
|
||||
|
@ -110,6 +110,8 @@ chassis_new()
|
||||
chas->remote_config_url = NULL;
|
||||
chas->default_file = NULL;
|
||||
|
||||
chas->group_replication_mode = 0;
|
||||
|
||||
return chas;
|
||||
}
|
||||
|
||||
|
@ -191,6 +191,8 @@ struct chassis {
|
||||
char *remote_config_url;
|
||||
gchar *default_file;
|
||||
gint print_version;
|
||||
|
||||
gint group_replication_mode;
|
||||
};
|
||||
|
||||
CHASSIS_API chassis *chassis_new(void);
|
||||
|
@ -1095,6 +1095,50 @@ show_remote_conf_url(gpointer param) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
gchar*
|
||||
show_group_replication_mode(gpointer param) {
|
||||
struct external_param *opt_param = (struct external_param *)param;
|
||||
chassis *srv = opt_param->chas;
|
||||
gint opt_type = opt_param->opt_type;
|
||||
if(CAN_SHOW_OPTS_PROPERTY(opt_type)) {
|
||||
return g_strdup_printf("%d", srv->group_replication_mode);
|
||||
}
|
||||
if(CAN_SAVE_OPTS_PROPERTY(opt_type)) {
|
||||
if(srv->group_replication_mode == 0) {
|
||||
return NULL;
|
||||
} else {
|
||||
return g_strdup_printf("%d", srv->group_replication_mode);
|
||||
}
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
gint
|
||||
assign_group_replication(const gchar *newval, gpointer param) {
|
||||
gint ret = ASSIGN_ERROR;
|
||||
struct external_param *opt_param = (struct external_param *)param;
|
||||
chassis *srv = opt_param->chas;
|
||||
gint opt_type = opt_param->opt_type;
|
||||
if(CAN_ASSIGN_OPTS_PROPERTY(opt_type)) {
|
||||
if(NULL != newval) {
|
||||
gint value = 0;
|
||||
if(try_get_int_value(newval, &value)) {
|
||||
if(value == 0 || value == 1) {
|
||||
srv->group_replication_mode = value;
|
||||
ret = ASSIGN_OK;
|
||||
} else {
|
||||
ret = ASSIGN_VALUE_INVALID;
|
||||
}
|
||||
} else {
|
||||
ret = ASSIGN_VALUE_INVALID;
|
||||
}
|
||||
} else {
|
||||
ret = ASSIGN_VALUE_INVALID;
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
gint
|
||||
chassis_options_save(GKeyFile *keyfile, chassis_options_t *opts, chassis *chas)
|
||||
{
|
||||
|
@ -90,6 +90,7 @@ CHASSIS_API gchar* show_disable_dns_cache(gpointer param);
|
||||
CHASSIS_API gchar* show_master_preferred(gpointer param);
|
||||
CHASSIS_API gchar* show_max_allowed_packet(gpointer param);
|
||||
CHASSIS_API gchar* show_remote_conf_url(gpointer param);
|
||||
CHASSIS_API gchar* show_group_replication_mode(gpointer param);
|
||||
|
||||
/* assign utils */
|
||||
CHASSIS_API gint assign_log_level(const gchar *newval, gpointer param);
|
||||
@ -108,6 +109,7 @@ CHASSIS_API gint assign_default_query_cache_timeout(const gchar *newval, gpointe
|
||||
CHASSIS_API gint assign_default_client_idle_timeout(const gchar *newval, gpointer param);
|
||||
CHASSIS_API gint assign_long_query_time(const gchar *newval, gpointer param);
|
||||
CHASSIS_API gint assign_max_allowed_packet(const gchar *newval, gpointer param);
|
||||
CHASSIS_API gint assign_group_replication(const gchar *newval, gpointer param);
|
||||
|
||||
CHASSIS_API gint chassis_options_save(GKeyFile *keyfile, chassis_options_t *opts, chassis *chas);
|
||||
|
||||
|
@ -145,6 +145,8 @@ struct chassis_frontend_t {
|
||||
char *default_db;
|
||||
|
||||
char *remote_config_url;
|
||||
|
||||
gint group_replication_mode;
|
||||
};
|
||||
|
||||
/**
|
||||
@ -175,6 +177,8 @@ chassis_frontend_new(void)
|
||||
frontend->long_query_time = MAX_QUERY_TIME;
|
||||
frontend->cetus_max_allowed_packet = MAX_ALLOWED_PACKET_DEFAULT;
|
||||
frontend->disable_dns_cache = 0;
|
||||
|
||||
frontend->group_replication_mode = 0;
|
||||
return frontend;
|
||||
}
|
||||
|
||||
@ -452,6 +456,11 @@ chassis_frontend_set_chassis_options(struct chassis_frontend_t *frontend, chassi
|
||||
0, 0, OPTION_ARG_STRING, &(frontend->remote_config_url),
|
||||
"Remote config url, mysql://xx", "<string>",
|
||||
NULL, show_remote_conf_url, SHOW_OPTS_PROPERTY);
|
||||
chassis_options_add(opts,
|
||||
"group-replication-mode",
|
||||
0, 0, OPTION_ARG_INT, &(frontend->group_replication_mode),
|
||||
"mysql group replication mode, 0:not support(defaults) 1:support single primary mode 2:support multi primary mode(not implement yet)", "<int>",
|
||||
assign_group_replication, show_group_replication_mode, ALL_OPTS_PROPERTY);
|
||||
|
||||
return 0;
|
||||
}
|
||||
@ -993,6 +1002,12 @@ main_cmdline(int argc, char **argv)
|
||||
}
|
||||
}
|
||||
|
||||
if(frontend->group_replication_mode != 0 && frontend->group_replication_mode != 1) {
|
||||
g_critical("group-replication-mode is invalid, current value is %d", frontend->group_replication_mode);
|
||||
GOTO_EXIT(EXIT_FAILURE);
|
||||
}
|
||||
srv->group_replication_mode = frontend->group_replication_mode;
|
||||
|
||||
/*
|
||||
* log the versions of all loaded plugins
|
||||
*/
|
||||
|
@ -1155,7 +1155,7 @@ network_mysqld_auth_challenge_set_challenge(network_mysqld_auth_challenge *shake
|
||||
}
|
||||
|
||||
shake->auth_plugin_data->len = 21;
|
||||
shake->auth_plugin_data->str[shake->auth_plugin_data->len] = '\0';
|
||||
shake->auth_plugin_data->str[shake->auth_plugin_data->len - 1] = '\0';
|
||||
g_string_assign(shake->auth_plugin_name, "mysql_native_password");
|
||||
}
|
||||
|
||||
|
@ -282,6 +282,9 @@ network_mysqld_con_new()
|
||||
con->read_timeout.tv_sec = 10 * MINUTES;
|
||||
con->read_timeout.tv_usec = 0;
|
||||
|
||||
con->dist_tran_decided_read_timeout.tv_sec = 30;
|
||||
con->dist_tran_decided_read_timeout.tv_usec = 0;
|
||||
|
||||
con->write_timeout.tv_sec = 10 * MINUTES;
|
||||
con->write_timeout.tv_usec = 0;
|
||||
|
||||
@ -2617,7 +2620,14 @@ shard_read_response(network_mysqld_con *con, server_session_t *ss)
|
||||
if (ss->server->to_read == 0) {
|
||||
ss->state = NET_RW_STATE_READ;
|
||||
g_debug("%s:read wait here for con:%p", G_STRLOC, con);
|
||||
server_sess_wait_for_event(ss, EV_READ, &con->read_timeout);
|
||||
if (con->dist_tran_decided) {
|
||||
server_sess_wait_for_event(ss, EV_READ,
|
||||
&con->dist_tran_decided_read_timeout);
|
||||
g_debug("%s:use dist_tran_decided_read_timeout for con:%p",
|
||||
G_STRLOC, con);
|
||||
} else {
|
||||
server_sess_wait_for_event(ss, EV_READ, &con->read_timeout);
|
||||
}
|
||||
return DISP_CONTINUE;
|
||||
}
|
||||
break;
|
||||
@ -2654,7 +2664,13 @@ shard_read_response(network_mysqld_con *con, server_session_t *ss)
|
||||
} else {
|
||||
ss->state = NET_RW_STATE_READ;
|
||||
g_debug("%s:server_sess_wait_for_event for con:%p", G_STRLOC, con);
|
||||
server_sess_wait_for_event(ss, EV_READ, &con->read_timeout);
|
||||
if (con->dist_tran_decided) {
|
||||
server_sess_wait_for_event(ss, EV_READ,
|
||||
&con->dist_tran_decided_read_timeout);
|
||||
g_message("%s:use dist_tran_decided_read_timeout for con:%p", G_STRLOC, con);
|
||||
} else {
|
||||
server_sess_wait_for_event(ss, EV_READ, &con->read_timeout);
|
||||
}
|
||||
con->num_read_pending++;
|
||||
ss->read_cal_flag = 0;
|
||||
g_debug("%s:num_read_pending:%d, ss->index:%d for con:%p",
|
||||
@ -2691,7 +2707,12 @@ shard_read_response(network_mysqld_con *con, server_session_t *ss)
|
||||
ss->state = NET_RW_STATE_READ;
|
||||
g_debug("%s:num_read_pending:%d for fd:%d, ss->index:%d",
|
||||
G_STRLOC, con->num_read_pending, ss->server->fd, ss->index);
|
||||
server_sess_wait_for_event(ss, EV_READ, &con->read_timeout);
|
||||
if (con->dist_tran_decided) {
|
||||
server_sess_wait_for_event(ss, EV_READ, &con->dist_tran_decided_read_timeout);
|
||||
g_message("%s:use dist_tran_decided_read_timeout for con:%p", G_STRLOC, con);
|
||||
} else {
|
||||
server_sess_wait_for_event(ss, EV_READ, &con->read_timeout);
|
||||
}
|
||||
}
|
||||
break;
|
||||
case NETWORK_SOCKET_ERROR:
|
||||
@ -3911,11 +3932,11 @@ network_mysqld_con_handle(int event_fd, short events, void *user_data)
|
||||
case NETWORK_SOCKET_WAIT_FOR_EVENT:
|
||||
if (con->retry_serv_cnt < con->max_retry_serv_cnt) {
|
||||
con->master_conn_shortaged = 1;
|
||||
con->retry_serv_cnt++;
|
||||
con->is_wait_server = 1;
|
||||
if (con->retry_serv_cnt % 8 == 0) {
|
||||
network_connection_pool_create_conn(con);
|
||||
}
|
||||
con->retry_serv_cnt++;
|
||||
struct timeval timeout = network_mysqld_con_retry_timeout(con);
|
||||
|
||||
g_debug(G_STRLOC ": wait again:%d, con:%p, l:%d", con->retry_serv_cnt, con, (int)timeout.tv_usec);
|
||||
@ -4585,7 +4606,7 @@ network_mysqld_self_con_handle(int event_fd, short events, void *user_data)
|
||||
case ST_ASYNC_CONN:
|
||||
switch (network_socket_connect_finish(con->server)) {
|
||||
case NETWORK_SOCKET_SUCCESS:
|
||||
if (con->backend->state != BACKEND_STATE_UP) {
|
||||
if (con->backend->state != BACKEND_STATE_UP && srv->group_replication_mode == 0) {
|
||||
con->backend->state = BACKEND_STATE_UP;
|
||||
g_get_current_time(&(con->backend->state_since));
|
||||
g_message(G_STRLOC ": set backend: %s (%p) up", con->backend->addr->name->str, con->backend);
|
||||
@ -4688,6 +4709,9 @@ network_connection_pool_create_conn(network_mysqld_con *con)
|
||||
if (backend != NULL) {
|
||||
|
||||
if (backend->state != BACKEND_STATE_UP) {
|
||||
if (backend->state != BACKEND_STATE_UNKNOWN) {
|
||||
continue;
|
||||
}
|
||||
if (backend->last_check_time == cur) {
|
||||
g_debug("%s: omit create, backend:%d state:%d", G_STRLOC, i, backend->state);
|
||||
continue;
|
||||
|
@ -640,6 +640,7 @@ struct network_mysqld_con {
|
||||
struct timeval connect_timeout; /* default = 2 s */
|
||||
struct timeval read_timeout; /* default = 10 min */
|
||||
struct timeval write_timeout; /* default = 10 min */
|
||||
struct timeval dist_tran_decided_read_timeout; /* default = 30 sec */
|
||||
struct timeval wait_clt_next_sql;
|
||||
char xid_str[XID_LEN];
|
||||
char last_backends_type[MAX_SERVER_NUM];
|
||||
|
@ -73,20 +73,34 @@ typedef int socklen_t;
|
||||
|
||||
#define MAX_CACHED_ITEMS 65536
|
||||
|
||||
/* judge if client_ip is in ip range of allow or deny ip_table*/
|
||||
/* judge if client_ip_with_username is in allow or deny ip_table*/
|
||||
static gboolean
|
||||
ip_range_lookup(GHashTable *ip_table, char *client_ip)
|
||||
client_ip_table_lookup(GHashTable *ip_table, char *client_ip_with_username)
|
||||
{
|
||||
char ip_range[128] = { 0 };
|
||||
char wildcard[128] = { 0 };
|
||||
char client_user[128] = { 0 };
|
||||
char client_ip[128] = { 0 };
|
||||
sscanf(client_ip_with_username, "%64[a-zA-Z]@%64[0-9.]", client_user, client_ip);
|
||||
GList *ip_range_table = g_hash_table_get_keys(ip_table);
|
||||
GList *l;
|
||||
for (l = ip_range_table; l; l = l->next) {
|
||||
sscanf(l->data, "%64[0-9.].%s", ip_range, wildcard);
|
||||
char address[128] = { 0 };
|
||||
sscanf(l->data, "%64[a-zA-Z@0-9.]", address);
|
||||
gchar *pos = NULL;
|
||||
if ((pos = strcasestr(client_ip, ip_range))) {
|
||||
if(pos == client_ip) {
|
||||
return TRUE;
|
||||
if (strrchr(address, '@') == NULL) {
|
||||
sscanf(address, "%64[0-9.].%s", ip_range, wildcard);
|
||||
if ((pos = strcasestr(client_ip, ip_range))) {
|
||||
if(pos == client_ip) {
|
||||
return TRUE;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
sscanf(address, "%64[a-zA-Z@0-9.].%s", ip_range, wildcard);
|
||||
if ((pos = strcasestr(client_ip_with_username, ip_range))) {
|
||||
if(pos == client_ip_with_username) {
|
||||
return TRUE;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -198,15 +212,13 @@ do_read_auth(network_mysqld_con *con, GHashTable *allow_ip_table, GHashTable *de
|
||||
char *client_username = con->client->response->username->str;
|
||||
char *client_ip_with_username = g_strdup_printf("%s@%s", client_username, client_ip);
|
||||
char *ip_err_msg = NULL;
|
||||
if ((g_hash_table_size(allow_ip_table) != 0 &&
|
||||
(g_hash_table_lookup(allow_ip_table, client_ip) || g_hash_table_lookup(allow_ip_table, "*") || ip_range_lookup(allow_ip_table, client_ip))) ||
|
||||
g_hash_table_lookup(allow_ip_table, client_ip_with_username)) {
|
||||
if (g_hash_table_size(allow_ip_table) != 0
|
||||
&& (g_hash_table_lookup(allow_ip_table, "*") || client_ip_table_lookup(allow_ip_table, client_ip_with_username))) {
|
||||
check_ip = FALSE;
|
||||
} else if ((g_hash_table_size(deny_ip_table) != 0 &&
|
||||
(g_hash_table_lookup(deny_ip_table, client_ip) || g_hash_table_lookup(deny_ip_table, "*") || ip_range_lookup(deny_ip_table, client_ip))) ||
|
||||
g_hash_table_lookup(deny_ip_table, client_ip_with_username)) {
|
||||
} else if (g_hash_table_size(deny_ip_table) != 0
|
||||
&& (g_hash_table_lookup(deny_ip_table, "*") || client_ip_table_lookup(deny_ip_table, client_ip_with_username))) {
|
||||
check_ip = TRUE;
|
||||
ip_err_msg = g_strdup_printf("Access denied for user '%s'@'%s'", client_username, client_ip);
|
||||
ip_err_msg = g_strdup_printf("Access denied for user '%s'", client_ip_with_username);
|
||||
} else {
|
||||
check_ip = FALSE;
|
||||
}
|
||||
@ -441,7 +453,7 @@ plugin_add_backends(chassis *chas, gchar **backend_addresses, gchar **read_only_
|
||||
|
||||
GPtrArray *backends_arr = g->backends->backends;
|
||||
for (i = 0; backend_addresses[i]; i++) {
|
||||
if (-1 == network_backends_add(g->backends, backend_addresses[i], BACKEND_TYPE_RW, BACKEND_STATE_DOWN, chas)) {
|
||||
if (-1 == network_backends_add(g->backends, backend_addresses[i], BACKEND_TYPE_RW, BACKEND_STATE_UNKNOWN, chas)) {
|
||||
return -1;
|
||||
}
|
||||
network_backend_init_extra(backends_arr->pdata[backends_arr->len - 1], chas);
|
||||
@ -449,7 +461,7 @@ plugin_add_backends(chassis *chas, gchar **backend_addresses, gchar **read_only_
|
||||
|
||||
for (i = 0; read_only_backend_addresses && read_only_backend_addresses[i]; i++) {
|
||||
if (-1 == network_backends_add(g->backends,
|
||||
read_only_backend_addresses[i], BACKEND_TYPE_RO, BACKEND_STATE_DOWN, chas)) {
|
||||
read_only_backend_addresses[i], BACKEND_TYPE_RO, BACKEND_STATE_UNKNOWN, chas)) {
|
||||
return -1;
|
||||
}
|
||||
/* set conn-pool config */
|
||||
|
Loading…
Reference in New Issue
Block a user