Merge pull request #45 from tsthght/feature/mgr1

Support single-primary mode for MySQL Group Replication (MGR).
This commit is contained in:
tsthght 2018-05-09 10:21:21 +08:00 committed by GitHub
commit 9e41f84163
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 325 additions and 1 deletions

View File

@ -40,10 +40,14 @@
#include "glib-ext.h"
#include "sharding-config.h"
#include <netdb.h>
/* Tunables for the backend alive check. NOTE(review): units of the first two
 * are not visible here -- presumably seconds and retry count; confirm. */
#define CHECK_ALIVE_INTERVAL 3
#define CHECK_ALIVE_TIMES 2
/* Parenthesized so the macro expands safely inside larger expressions
 * (e.g. division or multiplication by it). */
#define CHECK_DELAY_INTERVAL (300 * 1000) /* 300ms */
/* Size in bytes of the buffers that hold textual addresses ("ip:port"). */
#define ADDRESS_LEN 64
/* Each backend should have db <proxy_heart_beat> and table <tb_heartbeat> */
#define HEARTBEAT_DB "proxy_heart_beat"
@ -124,6 +128,255 @@ get_mysql_connection(cetus_monitor_t *monitor, char *addr)
return conn;
}
/**
 * Resolve a host name to a textual IP address.
 *
 * @param name  host name to look up; must not be NULL
 * @param ip    output buffer of at least ADDRESS_LEN bytes; receives the
 *              first address of the host that inet_ntop() can format
 * @return 0 when an address was written to @p ip, -1 on NULL arguments,
 *         lookup failure, or when no address could be formatted
 */
static gint
get_ip_by_name(const gchar *name, gchar *ip)
{
    struct hostent *host;
    char **addr;

    if (name == NULL || ip == NULL) {
        return -1;
    }

    host = gethostbyname(name);
    if (host == NULL) {
        g_debug("gethostbyname failed.");
        return -1;
    }

    /* Walk the NULL-terminated address list and stop at the first entry
     * that converts to presentation form. */
    addr = host->h_addr_list;
    while (*addr != NULL) {
        if (inet_ntop(host->h_addrtype, *addr, ip, ADDRESS_LEN) != NULL) {
            return 0;
        }
        addr++;
    }

    return -1;
}
/**
 * GCompareFunc used with g_list_find_custom(): case-insensitive comparison
 * of a list element against the address being searched for.
 *
 * @param a  list element data (NUL-terminated string)
 * @param b  address being searched for (NUL-terminated string)
 * @return 0 when both strings are equal ignoring case, otherwise the
 *         non-zero result of strcasecmp()
 */
static gint
slave_list_compare(gconstpointer a, gconstpointer b)
{
    const gchar *listed = a;
    const gchar *wanted = b;

    return strcasecmp(listed, wanted);
}
/**
 * Reconcile the backend list with the current MySQL Group Replication
 * topology (single-primary mode).
 *
 * Steps:
 *   1. Ask every backend that is not in maintenance which member is the
 *      ONLINE primary (performance_schema.replication_group_members), then
 *      ask that primary for the list of ONLINE secondaries.
 *   2. Inspect the first RW backend: keep it RW/UP if it is the primary,
 *      otherwise demote it to RO (or mark it deleted when it is no longer
 *      an online secondary).
 *   3. Walk the RO/UNKNOWN backends: revive known secondaries, promote the
 *      backend matching the primary, mark everything else deleted.
 *   4. Add the primary and any secondaries that were not configured yet.
 *
 * If two backends report different primaries the function logs a warning
 * and leaves the backend list untouched.
 *
 * @param bs       backend list to update; NULL is a no-op
 * @param monitor  monitor used to obtain MySQL connections and the chassis
 */
static void
group_replication_detect(network_backends_t *bs, cetus_monitor_t *monitor)
{
    if (bs == NULL) {
        return;
    }

    GList *slave_list = NULL;   /* owned "ip:port" strings, released with g_free */
    gchar master_addr[ADDRESS_LEN] = {""};
    gchar server_group[64] = {""};
    guint has_master = 0;
    guint i = 0;
    guint backends_num = 0;

    gchar *sql1 = "SELECT `MEMBER_HOST`, `MEMBER_PORT` FROM "
        "performance_schema.replication_group_members "
        "WHERE MEMBER_STATE = 'ONLINE' AND MEMBER_ID = "
        "(SELECT VARIABLE_VALUE FROM performance_schema.global_status "
        "WHERE VARIABLE_NAME = 'group_replication_primary_member') ";

    gchar *sql2 = "SELECT `MEMBER_HOST`, `MEMBER_PORT` FROM "
        "performance_schema.replication_group_members "
        "WHERE MEMBER_STATE = 'ONLINE' AND MEMBER_ID <> "
        "(SELECT VARIABLE_VALUE FROM performance_schema.global_status "
        "WHERE VARIABLE_NAME = 'group_replication_primary_member') ";

    /* Step 1: discover the primary and the online secondaries. */
    backends_num = network_backends_count(bs);
    for (i = 0; i < backends_num; i++) {
        network_backend_t *backend = network_backends_get(bs, i);
        if (backend->state == BACKEND_STATE_MAINTAINING) {
            continue;
        }

        char *backend_addr = backend->addr->name->str;
        MYSQL *conn = get_mysql_connection(monitor, backend_addr);
        MYSQL_RES *rs_set = NULL;
        MYSQL_ROW row = NULL;
        gchar ip[ADDRESS_LEN] = {""};
        gchar old_master[ADDRESS_LEN] = {""};

        if (conn == NULL) {
            g_debug("get connection failed. error: %d, text: %s, backend: %s",
                    mysql_errno(conn), mysql_error(conn), backend_addr);
            continue;
        }

        if (mysql_real_query(conn, L(sql1))) {
            g_debug("select primary info failed for group_replication. error: %d, text: %s, backend: %s",
                    mysql_errno(conn), mysql_error(conn), backend_addr);
            continue;
        }

        rs_set = mysql_store_result(conn);
        if (rs_set == NULL) {
            g_debug("get primary info result set failed for group_replication. error: %d, text: %s, backend: %s",
                    mysql_errno(conn), mysql_error(conn), backend_addr);
            continue;
        }

        row = mysql_fetch_row(rs_set);
        if (row == NULL || row[0] == NULL || row[1] == NULL) {
            g_debug("get primary info rows failed for group_replication. error: %d, text: %s, backend: %s",
                    mysql_errno(conn), mysql_error(conn), backend_addr);
            mysql_free_result(rs_set);
            continue;
        }

        if ((get_ip_by_name(row[0], ip) != 0) || ip[0] == '\0') {
            g_debug("get master ip by name failed. error: %d, text: %s, backend: %s",
                    mysql_errno(conn), mysql_error(conn), backend_addr);
            mysql_free_result(rs_set);
            continue;
        }

        /* Remember the primary reported by earlier backends, then record
         * the one reported by this backend. */
        snprintf(old_master, sizeof(old_master), "%s", master_addr);
        snprintf(master_addr, sizeof(master_addr), "%s:%s", ip, row[1]);

        /* The row is no longer needed; release the result set here so that
         * none of the exits below can leak it. */
        mysql_free_result(rs_set);
        rs_set = NULL;

        if (old_master[0] != '\0') {
            if (strcasecmp(old_master, master_addr) != 0) {
                g_warning("exists more than one masters.");
                g_list_free_full(slave_list, g_free);
                return;
            }
            /* Same primary as already seen: secondaries were (or will be)
             * collected by the backend that reported it first. */
            continue;
        }

        /* Query the secondaries on the primary itself; reuse the current
         * connection when this backend is the primary. */
        if (strcasecmp(backend_addr, master_addr)) {
            conn = get_mysql_connection(monitor, master_addr);
            if (conn == NULL) {
                g_debug("get connection failed. error: %d, text: %s, backend: %s",
                        mysql_errno(conn), mysql_error(conn), master_addr);
                continue;
            }
        }

        if (mysql_real_query(conn, L(sql2))) {
            g_debug("select slave info failed for group_replication. error: %d, text: %s, backend: %s",
                    mysql_errno(conn), mysql_error(conn), master_addr);
            continue;
        }

        rs_set = mysql_store_result(conn);
        if (rs_set == NULL) {
            g_debug("get slave info result set failed for group_replication. error: %d, text: %s",
                    mysql_errno(conn), mysql_error(conn));
            continue;
        }

        while ((row = mysql_fetch_row(rs_set)) != NULL) {
            gchar slave_addr[ADDRESS_LEN] = {""};

            if (row[0] == NULL || row[1] == NULL) {
                g_debug("get slave info rows failed for group_replication. backend: %s",
                        master_addr);
                continue;
            }

            memset(ip, 0, sizeof(ip));
            if ((get_ip_by_name(row[0], ip) != 0) || ip[0] == '\0') {
                /* Skip only this row; the result set stays valid for the
                 * remaining rows and is freed once after the loop. */
                g_debug("get slave ip by name failed. error: %d, text: %s",
                        mysql_errno(conn), mysql_error(conn));
                continue;
            }

            snprintf(slave_addr, sizeof(slave_addr), "%s:%s", ip, row[1]);
            slave_list = g_list_append(slave_list, g_strdup(slave_addr));
            g_debug("add slave %s in list, %u", slave_addr, g_list_length(slave_list));
        }
        mysql_free_result(rs_set);
    }

    /* Step 2: handle the first RW backend only (the loop breaks after it). */
    backends_num = network_backends_count(bs);
    for (i = 0; i < backends_num; i++) {
        network_backend_t *backend = network_backends_get(bs, i);
        char *backend_addr = backend->addr->name->str;

        /* NOTE(review): only the first 31 bytes of the group name are kept
         * although the buffer holds 64 -- confirm this cap is intended. */
        if (server_group[0] == '\0' && backend->server_group && backend->server_group->len) {
            snprintf(server_group, 32, "%s", backend->server_group->str);
        }

        if (backend->type == BACKEND_TYPE_RW) {
            has_master++;
            if (!strcasecmp(backend_addr, master_addr)) {
                if (backend->state != BACKEND_STATE_UP) {
                    network_backends_modify(bs, i, BACKEND_TYPE_RW, BACKEND_STATE_UP);
                }
                break;
            }

            GList *it = g_list_find_custom(slave_list, backend_addr, slave_list_compare);
            if (it) {
                if (backend->state == BACKEND_STATE_DELETED || backend->state == BACKEND_STATE_MAINTAINING) {
                    network_backends_modify(bs, i, BACKEND_TYPE_RO, BACKEND_STATE_UNKNOWN);
                } else {
                    network_backends_modify(bs, i, BACKEND_TYPE_RO, backend->state);
                }
                /* Already configured: drop it from the "to be added" list. */
                slave_list = g_list_remove_link(slave_list, it);
                g_free(it->data);
                g_list_free(it);
            } else {
                network_backends_modify(bs, i, BACKEND_TYPE_RO, BACKEND_STATE_DELETED);
            }
            break;
        }
    }

    /* Step 3: reconcile the RO / UNKNOWN backends. */
    backends_num = network_backends_count(bs);
    for (i = 0; i < backends_num; i++) {
        network_backend_t *backend = network_backends_get(bs, i);
        char *backend_addr = backend->addr->name->str;

        if (server_group[0] == '\0' && backend->server_group && backend->server_group->len) {
            snprintf(server_group, 32, "%s", backend->server_group->str);
        }

        if (backend->type == BACKEND_TYPE_RO || backend->type == BACKEND_TYPE_UNKNOWN) {
            GList *it = g_list_find_custom(slave_list, backend_addr, slave_list_compare);
            if (it) {
                if (backend->state == BACKEND_STATE_DELETED || backend->state == BACKEND_STATE_MAINTAINING) {
                    network_backends_modify(bs, i, BACKEND_TYPE_RO, BACKEND_STATE_UNKNOWN);
                }
                slave_list = g_list_remove_link(slave_list, it);
                g_free(it->data);
                g_list_free(it);
            } else if (master_addr[0] != '\0' && !strcasecmp(backend_addr, master_addr)) {
                network_backends_modify(bs, i, BACKEND_TYPE_RW, BACKEND_STATE_UP);
                has_master++;
            } else if (backend->type != BACKEND_TYPE_RO || backend->state != BACKEND_STATE_DELETED) {
                network_backends_modify(bs, i, BACKEND_TYPE_RO, BACKEND_STATE_DELETED);
            }
        }
    }

    /* Step 4: add members that are not part of the backend list yet. */
    if (!has_master && master_addr[0] != '\0') {
        if (server_group[0] != '\0') {
            gchar master_addr_temp[ADDRESS_LEN] = {""};
            snprintf(master_addr_temp, ADDRESS_LEN, "%s@%s", master_addr, server_group);
            network_backends_add(bs, master_addr_temp, BACKEND_TYPE_RW, BACKEND_STATE_UP, monitor->chas);
        } else {
            network_backends_add(bs, master_addr, BACKEND_TYPE_RW, BACKEND_STATE_UP, monitor->chas);
        }
    }

    for (GList *it = slave_list; it; it = it->next) {
        gchar *addr = it->data;
        if (server_group[0] != '\0') {
            gchar slave_addr_temp[ADDRESS_LEN] = {""};
            snprintf(slave_addr_temp, ADDRESS_LEN, "%s@%s", addr, server_group);
            network_backends_add(bs, slave_addr_temp, BACKEND_TYPE_RO, BACKEND_STATE_UNKNOWN, monitor->chas);
        } else {
            network_backends_add(bs, addr, BACKEND_TYPE_RO, BACKEND_STATE_UNKNOWN, monitor->chas);
        }
    }

    g_list_free_full(slave_list, g_free);
}
#define ADD_MONITOR_TIMER(ev_struct, ev_cb, timeout) \
evtimer_set(&(monitor->ev_struct), ev_cb, monitor);\
event_base_set(monitor->evloop, &(monitor->ev_struct));\
@ -137,6 +390,11 @@ check_backend_alive(int fd, short what, void *arg)
int i;
network_backends_t *bs = chas->priv->backends;
if(chas->group_replication_mode ==1) {
group_replication_detect(bs, monitor);
}
for (i = 0; i < network_backends_count(bs); i++) {
network_backend_t *backend = network_backends_get(bs, i);
if (backend->state == BACKEND_STATE_DELETED || backend->state == BACKEND_STATE_MAINTAINING)
@ -184,6 +442,11 @@ update_master_timestamp(int fd, short what, void *arg)
chassis *chas = monitor->chas;
int i;
network_backends_t *bs = chas->priv->backends;
if(chas->group_replication_mode ==1) {
group_replication_detect(bs, monitor);
}
/* Catch RW time
* Need a table to write from master and read from slave.
* CREATE TABLE `tb_heartbeat` (

View File

@ -110,6 +110,8 @@ chassis_new()
chas->remote_config_url = NULL;
chas->default_file = NULL;
chas->group_replication_mode = 0;
return chas;
}

View File

@ -189,6 +189,8 @@ struct chassis {
char *remote_config_url;
gchar *default_file;
gint print_version;
gint group_replication_mode;
};
CHASSIS_API chassis *chassis_new(void);

View File

@ -1003,6 +1003,46 @@ show_remote_conf_url(gpointer param) {
return NULL;
}
/**
 * Render the current group-replication-mode setting as a string.
 *
 * @param param  pointer to a struct external_param carrying the chassis and
 *               the requested option operation type
 * @return newly allocated decimal string (caller frees) when the operation
 *         type allows showing or saving the option, otherwise NULL
 */
gchar*
show_group_replication_mode(gpointer param)
{
    struct external_param *ext = (struct external_param *)param;
    chassis *chas = ext->chas;
    gint type = ext->opt_type;

    /* Showing and saving use the same textual representation. */
    if (CAN_SHOW_OPTS_PROPERTY(type) || CAN_SAVE_OPTS_PROPERTY(type)) {
        return g_strdup_printf("%d", chas->group_replication_mode);
    }

    return NULL;
}
/**
 * Assign a new group-replication-mode value at runtime.
 *
 * @param newval  textual value to assign; only "0" and "1" are accepted
 * @param param   pointer to a struct external_param carrying the chassis and
 *                the requested option operation type
 * @return ASSIGN_OK when the value was stored, ASSIGN_VALUE_INVALID when
 *         @p newval is NULL, not an integer, or outside {0, 1}, and
 *         ASSIGN_ERROR when the operation type does not permit assignment
 */
gint
assign_group_replication(const gchar *newval, gpointer param)
{
    struct external_param *ext = (struct external_param *)param;
    chassis *chas = ext->chas;
    gint type = ext->opt_type;
    gint mode = 0;

    if (!CAN_ASSIGN_OPTS_PROPERTY(type)) {
        return ASSIGN_ERROR;
    }

    if (NULL == newval) {
        return ASSIGN_VALUE_INVALID;
    }

    if (!try_get_int_value(newval, &mode)) {
        return ASSIGN_VALUE_INVALID;
    }

    if (mode != 0 && mode != 1) {
        return ASSIGN_VALUE_INVALID;
    }

    chas->group_replication_mode = mode;
    return ASSIGN_OK;
}
gint
chassis_options_save(GKeyFile *keyfile, chassis_options_t *opts, chassis *chas)
{

View File

@ -89,6 +89,7 @@ CHASSIS_API gchar* show_disable_dns_cache(gpointer param);
CHASSIS_API gchar* show_master_preferred(gpointer param);
CHASSIS_API gchar* show_max_allowed_packet(gpointer param);
CHASSIS_API gchar* show_remote_conf_url(gpointer param);
CHASSIS_API gchar* show_group_replication_mode(gpointer param);
/* assign utils */
CHASSIS_API gint assign_log_level(const gchar *newval, gpointer param);
@ -106,6 +107,7 @@ CHASSIS_API gint assign_slave_delay_down(const gchar *newval, gpointer param);
CHASSIS_API gint assign_default_query_cache_timeout(const gchar *newval, gpointer param);
CHASSIS_API gint assign_long_query_time(const gchar *newval, gpointer param);
CHASSIS_API gint assign_max_allowed_packet(const gchar *newval, gpointer param);
CHASSIS_API gint assign_group_replication(const gchar *newval, gpointer param);
CHASSIS_API gint chassis_options_save(GKeyFile *keyfile, chassis_options_t *opts, chassis *chas);

View File

@ -143,6 +143,8 @@ struct chassis_frontend_t {
char *default_db;
char *remote_config_url;
gint group_replication_mode;
};
/**
@ -172,6 +174,8 @@ chassis_frontend_new(void)
frontend->long_query_time = MAX_QUERY_TIME;
frontend->cetus_max_allowed_packet = MAX_ALLOWED_PACKET_DEFAULT;
frontend->disable_dns_cache = 0;
frontend->group_replication_mode = 0;
return frontend;
}
@ -437,6 +441,11 @@ chassis_frontend_set_chassis_options(struct chassis_frontend_t *frontend, chassi
0, 0, OPTION_ARG_STRING, &(frontend->remote_config_url),
"Remote config url, mysql://xx", "<string>",
NULL, show_remote_conf_url, SHOW_OPTS_PROPERTY);
chassis_options_add(opts,
"group-replication-mode",
0, 0, OPTION_ARG_INT, &(frontend->group_replication_mode),
"mysql group replication mode, 0:not support(defaults) 1:support single primary mode 2:support multi primary mode(not implement yet)", "<int>",
assign_group_replication, show_group_replication_mode, ALL_OPTS_PROPERTY);
return 0;
}
@ -793,6 +802,12 @@ main_cmdline(int argc, char **argv)
}
}
if(frontend->group_replication_mode != 0 && frontend->group_replication_mode != 1) {
g_critical("group-replication-mode is invalid, current value is %d", frontend->group_replication_mode);
GOTO_EXIT(EXIT_FAILURE);
}
srv->group_replication_mode = frontend->group_replication_mode;
if (chassis_frontend_init_basedir(argv[0], &(frontend->base_dir))) {
GOTO_EXIT(EXIT_FAILURE);
}

View File

@ -4434,7 +4434,7 @@ network_mysqld_self_con_handle(int event_fd, short events, void *user_data)
case ST_ASYNC_CONN:
switch (network_socket_connect_finish(con->server)) {
case NETWORK_SOCKET_SUCCESS:
if (con->backend->state != BACKEND_STATE_UP) {
if (con->backend->state != BACKEND_STATE_UP && srv->group_replication_mode == 0) {
con->backend->state = BACKEND_STATE_UP;
g_get_current_time(&(con->backend->state_since));
g_message(G_STRLOC ": set backend: %s (%p) up", con->backend->addr->name->str, con->backend);