From 46251ce2e0d358cb9b20aaedc6ce1140293bdeb7 Mon Sep 17 00:00:00 2001 From: tsthght Date: Wed, 9 May 2018 10:18:35 +0800 Subject: [PATCH] support single primary mode mgr --- src/cetus-monitor.c | 263 ++++++++++++++++++++++++++++++++++++ src/chassis-mainloop.c | 2 + src/chassis-mainloop.h | 2 + src/chassis-options-utils.c | 40 ++++++ src/chassis-options-utils.h | 2 + src/mysql-proxy-cli.c | 15 ++ src/network-mysqld.c | 2 +- 7 files changed, 325 insertions(+), 1 deletion(-) diff --git a/src/cetus-monitor.c b/src/cetus-monitor.c index 15d018c..a07d7f9 100644 --- a/src/cetus-monitor.c +++ b/src/cetus-monitor.c @@ -40,10 +40,14 @@ #include "glib-ext.h" #include "sharding-config.h" +#include + #define CHECK_ALIVE_INTERVAL 3 #define CHECK_ALIVE_TIMES 2 #define CHECK_DELAY_INTERVAL 300 * 1000 /* 300ms */ +#define ADDRESS_LEN 64 + /* Each backend should have db and table */ #define HEARTBEAT_DB "proxy_heart_beat" @@ -124,6 +128,255 @@ get_mysql_connection(cetus_monitor_t *monitor, char *addr) return conn; } +static gint +get_ip_by_name(const gchar *name, gchar *ip) { + if(ip == NULL || name == NULL) return -1; + char **pptr; + struct hostent *hptr; + hptr = gethostbyname(name); + if(hptr == NULL) { + g_debug("gethostbyname failed."); + return -1; + } + for(pptr = hptr->h_addr_list; *pptr != NULL; pptr++) { + if(inet_ntop(hptr->h_addrtype, *pptr, ip, ADDRESS_LEN)) { + return 0; + } + } + return -1; +} + +static gint +slave_list_compare(gconstpointer a, gconstpointer b) { + gchar *old_value = (gchar *)a; + gchar *search_value = (gchar *)b; + return strcasecmp(old_value, search_value); +} + +static void +group_replication_detect(network_backends_t *bs, cetus_monitor_t *monitor) +{ + if(bs == NULL) return ; + + GList *slave_list = NULL; + gchar master_addr[ADDRESS_LEN] = {""}; + gchar slave_addr[ADDRESS_LEN] = {""}; + gchar server_group[64] = {""}; + guint has_master = 0; + guint i = 0; + guint backends_num = 0; + + gchar *sql1 = "SELECT `MEMBER_HOST`, `MEMBER_PORT` FROM " + "performance_schema.replication_group_members " + "WHERE MEMBER_STATE = 'ONLINE' AND MEMBER_ID = " + "(SELECT VARIABLE_VALUE FROM performance_schema.global_status " + "WHERE VARIABLE_NAME = 'group_replication_primary_member') "; + gchar *sql2 = "SELECT `MEMBER_HOST`, `MEMBER_PORT` FROM " + "performance_schema.replication_group_members " + "WHERE MEMBER_STATE = 'ONLINE' AND MEMBER_ID <> " + "(SELECT VARIABLE_VALUE FROM performance_schema.global_status " + "WHERE VARIABLE_NAME = 'group_replication_primary_member') "; + + backends_num = network_backends_count(bs); + for (i = 0; i < backends_num; i++) { + network_backend_t *backend = network_backends_get(bs, i); + if (backend->state == BACKEND_STATE_MAINTAINING) + continue; + + char *backend_addr = backend->addr->name->str; + MYSQL *conn = get_mysql_connection(monitor, backend_addr); + int result = 0; + MYSQL_RES *rs_set = NULL; + MYSQL_ROW row = NULL; + gchar ip[ADDRESS_LEN] = {""}; + gchar old_master[ADDRESS_LEN] = {""}; + + if(conn == NULL) { + g_debug("get connection failed. error: %d, text: %s, backend: %s", + mysql_errno(conn), mysql_error(conn), backend_addr); + continue; + } + + if(mysql_real_query(conn, L(sql1))) { + g_debug("select primary info failed for group_replication. error: %d, text: %s, backend: %s", + mysql_errno(conn), mysql_error(conn), backend_addr); + continue; + } + + rs_set = mysql_store_result(conn); + if(rs_set == NULL) { + g_debug("get primary info result set failed for group_replication. error: %d, text: %s, backend: %s", + mysql_errno(conn), mysql_error(conn), backend_addr); + continue; + } + + row = mysql_fetch_row(rs_set); + if(row == NULL || row[0] == NULL || row[1] == NULL) { + g_debug("get primary info rows failed for group_replication. error: %d, text: %s, backend: %s", + mysql_errno(conn), mysql_error(conn), backend_addr); + mysql_free_result(rs_set); + continue; + } + + if((get_ip_by_name(row[0], ip) != 0) || ip[0] == '\0') { + g_debug("get master ip by name failed. error: %d, text: %s, backend: %s", + mysql_errno(conn), mysql_error(conn), backend_addr); + mysql_free_result(rs_set); + continue; + } + + memcpy(old_master, master_addr, strlen(master_addr)); + snprintf(master_addr, ADDRESS_LEN, "%s:%s", ip, row[1]); + + if(old_master[0] != '\0' && strcasecmp(old_master, master_addr) != 0) { + g_warning("exists more than one masters."); + return ; + } else if (old_master[0] != '\0' && strcasecmp(old_master, master_addr) == 0) { + continue; + } + + mysql_free_result(rs_set); + rs_set = NULL; + + if(master_addr[0] == '\0') { + g_debug("get master address failed. error: %d, text: %s, backend: %s", + mysql_errno(conn), mysql_error(conn), backend_addr); + continue; + } + + if(strcasecmp(backend_addr, master_addr)) { + conn = get_mysql_connection(monitor, master_addr); + if(conn == NULL) { + g_debug("get connection failed. error: %d, text: %s, backend: %s", + mysql_errno(conn), mysql_error(conn), master_addr); + continue; + } + } + + if(mysql_real_query(conn, L(sql2))) { + g_debug("select slave info failed for group_replication. error: %d, text: %s, backend: %s", + mysql_errno(conn), mysql_error(conn), master_addr); + continue; + } + + rs_set = mysql_store_result(conn); + if(rs_set == NULL) { + g_debug("get slave info result set failed for group_replication. error: %d, text: %s", + mysql_errno(conn), mysql_error(conn)); + continue; + } + while(row=mysql_fetch_row(rs_set)) { + memset(ip, 0, ADDRESS_LEN); + if((get_ip_by_name(row[0], ip) != 0) || ip[0] == '\0') { + g_debug("get slave ip by name failed. error: %d, text: %s", + mysql_errno(conn), mysql_error(conn)); + mysql_free_result(rs_set); + continue; + } + memset(slave_addr, 0, ADDRESS_LEN); + snprintf(slave_addr, ADDRESS_LEN, "%s:%s", ip, row[1]); + if(slave_addr[0] != '\0') { + slave_list = g_list_append(slave_list, strdup(slave_addr)); + g_debug("add slave %s in list, %d", slave_addr, g_list_length(slave_list)); + } else { + g_debug("get slave address failed. error: %d, text: %s", + mysql_errno(conn), mysql_error(conn)); + } + } + mysql_free_result(rs_set); + } + + backends_num = network_backends_count(bs); + for (i = 0; i < backends_num; i++) { + network_backend_t *backend = network_backends_get(bs, i); + + char *backend_addr = backend->addr->name->str; + + if(server_group[0] == '\0' && backend->server_group && backend->server_group->len) { + snprintf(server_group, 32, "%s", backend->server_group->str); + } + if(backend->type == BACKEND_TYPE_RW) { + has_master++; + if(!strcasecmp(backend_addr, master_addr)) { + if(backend->state != BACKEND_STATE_UP) { + network_backends_modify(bs, i, BACKEND_TYPE_RW, BACKEND_STATE_UP); + } + break; + } + GList *it = g_list_find_custom(slave_list, backend_addr, slave_list_compare); + if(it) { + if(backend->state == BACKEND_STATE_DELETED || backend->state == BACKEND_STATE_MAINTAINING) { + network_backends_modify(bs, i, BACKEND_TYPE_RO, BACKEND_STATE_UNKNOWN); + } else { + network_backends_modify(bs, i, BACKEND_TYPE_RO, backend->state); + } + slave_list = g_list_remove_link(slave_list, it); + g_free(it->data); + g_list_free(it); + } else { + network_backends_modify(bs, i, BACKEND_TYPE_RO, BACKEND_STATE_DELETED); + } + break; + } + } + + backends_num = network_backends_count(bs); + for (i = 0; i < backends_num; i++) { + network_backend_t *backend = network_backends_get(bs, i); + + char *backend_addr = backend->addr->name->str; + + if(server_group[0] == '\0' && backend->server_group && backend->server_group->len) { + snprintf(server_group, 32, "%s", backend->server_group->str); + } + if(backend->type == BACKEND_TYPE_RO || backend->type == BACKEND_TYPE_UNKNOWN) { + GList *it = g_list_find_custom(slave_list, backend_addr, slave_list_compare); + if(it) { + if(backend->state == BACKEND_STATE_DELETED || backend->state == BACKEND_STATE_MAINTAINING) { + network_backends_modify(bs, i, BACKEND_TYPE_RO, BACKEND_STATE_UNKNOWN); + } + slave_list = g_list_remove_link(slave_list, it); + g_free(it->data); + g_list_free(it); + } else { + if(master_addr[0] != '\0' && !strcasecmp(backend_addr, master_addr)) { + network_backends_modify(bs, i, BACKEND_TYPE_RW, BACKEND_STATE_UP); + has_master++; + } else { + if(backend->type != BACKEND_TYPE_RO || backend->state != BACKEND_STATE_DELETED) { + network_backends_modify(bs, i, BACKEND_TYPE_RO, BACKEND_STATE_DELETED); + } + } + } + } + } + + if(!has_master && master_addr[0] != '\0') { + if(server_group[0] != '\0') { + gchar master_addr_temp[ADDRESS_LEN] = {""}; + snprintf(master_addr_temp, ADDRESS_LEN, "%s@%s", master_addr, server_group); + network_backends_add(bs, master_addr_temp, BACKEND_TYPE_RW, BACKEND_STATE_UP, monitor->chas); + } else { + network_backends_add(bs, master_addr, BACKEND_TYPE_RW, BACKEND_STATE_UP, monitor->chas); + } + } + + if(g_list_length(slave_list)) { + GList *it = NULL; + for(it = slave_list; it; it = it->next) { + if(server_group[0] != '\0') { + gchar slave_addr_temp[ADDRESS_LEN] = {""}; + snprintf(slave_addr_temp, ADDRESS_LEN, "%s@%s", it->data, server_group); + network_backends_add(bs, slave_addr_temp, BACKEND_TYPE_RO, BACKEND_STATE_UNKNOWN, monitor->chas); + } else { + network_backends_add(bs, it->data, BACKEND_TYPE_RO, BACKEND_STATE_UNKNOWN, monitor->chas); + } + } + } + + g_list_free_full(slave_list, g_free); +} + #define ADD_MONITOR_TIMER(ev_struct, ev_cb, timeout) \ evtimer_set(&(monitor->ev_struct), ev_cb, monitor);\ event_base_set(monitor->evloop, &(monitor->ev_struct));\ @@ -137,6 +390,11 @@ check_backend_alive(int fd, short what, void *arg) int i; network_backends_t *bs = chas->priv->backends; + + if(chas->group_replication_mode ==1) { + group_replication_detect(bs, monitor); + } + for (i = 0; i < network_backends_count(bs); i++) { network_backend_t *backend = network_backends_get(bs, i); if (backend->state == BACKEND_STATE_DELETED || backend->state == BACKEND_STATE_MAINTAINING) @@ -184,6 +442,11 @@ update_master_timestamp(int fd, short what, void *arg) chassis *chas = monitor->chas; int i; network_backends_t *bs = chas->priv->backends; + + if(chas->group_replication_mode ==1) { + group_replication_detect(bs, monitor); + } + /* Catch RW time * Need a table to write from master and read from slave. * CREATE TABLE `tb_heartbeat` ( diff --git a/src/chassis-mainloop.c b/src/chassis-mainloop.c index e7dab88..d45b64b 100644 --- a/src/chassis-mainloop.c +++ b/src/chassis-mainloop.c @@ -110,6 +110,8 @@ chassis_new() chas->remote_config_url = NULL; chas->default_file = NULL; + chas->group_replication_mode = 0; + return chas; } diff --git a/src/chassis-mainloop.h b/src/chassis-mainloop.h index ef0ae70..6a846e2 100644 --- a/src/chassis-mainloop.h +++ b/src/chassis-mainloop.h @@ -189,6 +189,8 @@ struct chassis { char *remote_config_url; gchar *default_file; gint print_version; + + gint group_replication_mode; }; CHASSIS_API chassis *chassis_new(void); diff --git a/src/chassis-options-utils.c b/src/chassis-options-utils.c index aff573b..92043d7 100644 --- a/src/chassis-options-utils.c +++ b/src/chassis-options-utils.c @@ -1003,6 +1003,46 @@ show_remote_conf_url(gpointer param) { return NULL; } +gchar* +show_group_replication_mode(gpointer param) { + struct external_param *opt_param = (struct external_param *)param; + chassis *srv = opt_param->chas; + gint opt_type = opt_param->opt_type; + if(CAN_SHOW_OPTS_PROPERTY(opt_type)) { + return g_strdup_printf("%d", srv->group_replication_mode); + } + if(CAN_SAVE_OPTS_PROPERTY(opt_type)) { + return g_strdup_printf("%d", srv->group_replication_mode); + } + return NULL; +} + +gint +assign_group_replication(const gchar *newval, gpointer param) { + gint ret = ASSIGN_ERROR; + struct external_param *opt_param = (struct external_param *)param; + chassis *srv = opt_param->chas; + gint opt_type = opt_param->opt_type; + if(CAN_ASSIGN_OPTS_PROPERTY(opt_type)) { + if(NULL != newval) { + gint value = 0; + if(try_get_int_value(newval, &value)) { + if(value == 0 || value == 1) { + srv->group_replication_mode = value; + ret = ASSIGN_OK; + } else { + ret = ASSIGN_VALUE_INVALID; + } + } else { + ret = ASSIGN_VALUE_INVALID; + } + } else { + ret = ASSIGN_VALUE_INVALID; + } + } + return ret; +} + gint chassis_options_save(GKeyFile *keyfile, chassis_options_t *opts, chassis *chas) { diff --git a/src/chassis-options-utils.h b/src/chassis-options-utils.h index eac6507..4a722d1 100644 --- a/src/chassis-options-utils.h +++ b/src/chassis-options-utils.h @@ -89,6 +89,7 @@ CHASSIS_API gchar* show_disable_dns_cache(gpointer param); CHASSIS_API gchar* show_master_preferred(gpointer param); CHASSIS_API gchar* show_max_allowed_packet(gpointer param); CHASSIS_API gchar* show_remote_conf_url(gpointer param); +CHASSIS_API gchar* show_group_replication_mode(gpointer param); /* assign utils */ CHASSIS_API gint assign_log_level(const gchar *newval, gpointer param); @@ -106,6 +107,7 @@ CHASSIS_API gint assign_slave_delay_down(const gchar *newval, gpointer param); CHASSIS_API gint assign_default_query_cache_timeout(const gchar *newval, gpointer param); CHASSIS_API gint assign_long_query_time(const gchar *newval, gpointer param); CHASSIS_API gint assign_max_allowed_packet(const gchar *newval, gpointer param); +CHASSIS_API gint assign_group_replication(const gchar *newval, gpointer param); CHASSIS_API gint chassis_options_save(GKeyFile *keyfile, chassis_options_t *opts, chassis *chas); diff --git a/src/mysql-proxy-cli.c b/src/mysql-proxy-cli.c index 635a426..17347e7 100644 --- a/src/mysql-proxy-cli.c +++ b/src/mysql-proxy-cli.c @@ -143,6 +143,8 @@ struct chassis_frontend_t { char *default_db; char *remote_config_url; + + gint group_replication_mode; }; /** @@ -172,6 +174,8 @@ chassis_frontend_new(void) frontend->long_query_time = MAX_QUERY_TIME; frontend->cetus_max_allowed_packet = MAX_ALLOWED_PACKET_DEFAULT; frontend->disable_dns_cache = 0; + + frontend->group_replication_mode = 0; return frontend; } @@ -437,6 +441,11 @@ chassis_frontend_set_chassis_options(struct chassis_frontend_t *frontend, chassi 0, 0, OPTION_ARG_STRING, &(frontend->remote_config_url), "Remote config url, mysql://xx", "", NULL, show_remote_conf_url, SHOW_OPTS_PROPERTY); + chassis_options_add(opts, + "group-replication-mode", + 0, 0, OPTION_ARG_INT, &(frontend->group_replication_mode), + "mysql group replication mode, 0:not support(defaults) 1:support single primary mode 2:support multi primary mode(not implement yet)", "", + assign_group_replication, show_group_replication_mode, ALL_OPTS_PROPERTY); return 0; } @@ -793,6 +802,12 @@ main_cmdline(int argc, char **argv) } } + if(frontend->group_replication_mode != 0 && frontend->group_replication_mode != 1) { + g_critical("group-replication-mode is invalid, current value is %d", frontend->group_replication_mode); + GOTO_EXIT(EXIT_FAILURE); + } + srv->group_replication_mode = frontend->group_replication_mode; + if (chassis_frontend_init_basedir(argv[0], &(frontend->base_dir))) { GOTO_EXIT(EXIT_FAILURE); } diff --git a/src/network-mysqld.c b/src/network-mysqld.c index 5937ad3..bfb37f0 100644 --- a/src/network-mysqld.c +++ b/src/network-mysqld.c @@ -4434,7 +4434,7 @@ network_mysqld_self_con_handle(int event_fd, short events, void *user_data) case ST_ASYNC_CONN: switch (network_socket_connect_finish(con->server)) { case NETWORK_SOCKET_SUCCESS: - if (con->backend->state != BACKEND_STATE_UP) { + if (con->backend->state != BACKEND_STATE_UP && srv->group_replication_mode == 0) { con->backend->state = BACKEND_STATE_UP; g_get_current_time(&(con->backend->state_since)); g_message(G_STRLOC ": set backend: %s (%p) up", con->backend->addr->name->str, con->backend);