diff --git a/plugins/admin/admin-commands.c b/plugins/admin/admin-commands.c index afde71a..5e9254b 100644 --- a/plugins/admin/admin-commands.c +++ b/plugins/admin/admin-commands.c @@ -419,6 +419,16 @@ void admin_show_connectionlist(network_mysqld_con *con, int show_count) fields = g_ptr_array_new_with_free_func((void *) network_mysqld_proto_fielddef_free); + field = network_mysqld_proto_fielddef_new(); + field->name = g_strdup("PID"); + field->type = MYSQL_TYPE_STRING; + g_ptr_array_add(fields, field); + + field = network_mysqld_proto_fielddef_new(); + field->name = g_strdup("ThreadID"); + field->type = MYSQL_TYPE_STRING; + g_ptr_array_add(fields, field); + field = network_mysqld_proto_fielddef_new(); field->name = g_strdup("User"); field->type = MYSQL_TYPE_STRING; @@ -488,6 +498,8 @@ void admin_show_connectionlist(network_mysqld_con *con, int show_count) len = priv->cons->len; int count = 0; + cetus_pid_t process_id = getpid(); + for (i = 0; i < len; i++) { network_mysqld_con *con = priv->cons->pdata[i]; @@ -502,6 +514,13 @@ void admin_show_connectionlist(network_mysqld_con *con, int show_count) count++; row = g_ptr_array_new_with_free_func(g_free); + + sprintf(buffer, "%d", process_id); + g_ptr_array_add(row, g_strdup(buffer)); + + sprintf(buffer, "%d", con->client->challenge->thread_id); + g_ptr_array_add(row, g_strdup(buffer)); + if (con->client->response != NULL) { g_ptr_array_add(row, g_strdup(con->client->response->username->str)); } else { @@ -1483,6 +1502,23 @@ void admin_config_reload(network_mysqld_con* con, char* object) } } +void admin_kill_query(network_mysqld_con* con, unsigned int thread_id) +{ + if (con->is_admin_client) { + con->process_index = thread_id >> 24; + + if (con->process_index > cetus_last_process) { + con->direct_answer = 1; + network_mysqld_con_send_error(con->client, C("thread id is not correct")); + } else { + con->ask_the_given_worker = 1; + } + + return; + } + +} + void admin_reset_stats(network_mysqld_con* con) { if (con->is_admin_client) { @@ -1946,11 +1982,13 @@ void admin_save_settings(network_mysqld_con *con) network_mysqld_con_send_error_full(con->client, L(msg), 1066, "28000"); } } + void admin_compatible_cmd(network_mysqld_con* con) { con->direct_answer = 1; network_mysqld_con_send_ok(con->client); } + void admin_show_databases(network_mysqld_con* con) { g_debug("%s:call admin_show_databases", G_STRLOC); diff --git a/plugins/admin/admin-plugin.c b/plugins/admin/admin-plugin.c index 84200c9..d7afdab 100644 --- a/plugins/admin/admin-plugin.c +++ b/plugins/admin/admin-plugin.c @@ -471,10 +471,27 @@ int construct_channel_info(network_mysqld_con *con, char *sql) strncpy(ch.admin_sql, sql, len); g_message("%s:cetus_last_process:%d, ch admin sql:%s", G_STRLOC, cetus_last_process, ch.admin_sql); + if (con->ask_the_given_worker) { + int index = con->process_index; + g_message("%s: pass sql info to s:%i pid:%d to:%d", G_STRLOC, + ch.basics.slot, ch.basics.pid, cetus_processes[index].pid); + /* TODO: AGAIN */ + cetus_write_channel(cetus_processes[index].parent_child_channel[0], + &ch, sizeof(cetus_channel_t)); + int fd = cetus_processes[index].parent_child_channel[0]; + g_debug("%s:fd:%d for network_read_sql_resp", G_STRLOC, fd); + event_set(&(cetus_processes[index].event), fd, EV_READ, network_read_sql_resp, con); + chassis_event_add_with_timeout(cycle, &(cetus_processes[index].event), NULL); + con->num_read_pending++; + g_debug("%s:con num_read_pending:%d", G_STRLOC, con->num_read_pending); + return 0; + } + int num = cetus_last_process; if (con->ask_one_worker) { num = 1; } + int i; for (i = 0; i < num; i++) { g_message("%s: pass sql info to s:%i pid:%d to:%d", G_STRLOC, @@ -582,6 +599,7 @@ static network_mysqld_stmt_ret admin_process_query(network_mysqld_con *con) con->direct_answer = 0; con->ask_one_worker = 0; + con->ask_the_given_worker = 0; con->admin_read_merge = 0; visit_parser(con, con->orig_sql->str); diff --git a/src/cetus-process-cycle.c b/src/cetus-process-cycle.c index ef0d299..faa661a 100644 --- a/src/cetus-process-cycle.c +++ b/src/cetus-process-cycle.c @@ -503,6 +503,11 @@ cetus_worker_process_cycle(cetus_cycle_t *cycle, void *data) } } + cycle->priv->thread_id = 1 + (cetus_last_process << 24); + cycle->priv->max_thread_id = (cetus_last_process << 24) + (1 << 24) - 1; + g_message("%s: first thread id:%d, max thread id:%d", G_STRLOC, + cycle->priv->thread_id, cycle->priv->max_thread_id); + #ifndef SIMPLE_PARSER cycle->dist_tran_id = g_random_int_range(0, 100000000); struct ifreq buffer; diff --git a/src/network-mysqld.c b/src/network-mysqld.c index bbfc571..9e4f3c0 100644 --- a/src/network-mysqld.c +++ b/src/network-mysqld.c @@ -102,6 +102,8 @@ #define E_NET_WOULDBLOCK EWOULDBLOCK #endif +extern int cetus_last_process; + static void network_mysqld_self_con_handle(int event_fd, short events, void *user_data); /** @@ -177,7 +179,7 @@ network_mysqld_priv_init(void) priv->backends = network_backends_new(); priv->users = cetus_users_new(); priv->monitor = cetus_monitor_new(); - priv->thread_id = 1; + return priv; } diff --git a/src/network-mysqld.h b/src/network-mysqld.h index 2ff1f21..dc805e5 100644 --- a/src/network-mysqld.h +++ b/src/network-mysqld.h @@ -570,8 +570,10 @@ struct network_mysqld_con { unsigned int direct_answer:1; unsigned int admin_read_merge:1; unsigned int ask_one_worker:1; + unsigned int ask_the_given_worker:1; unsigned int is_client_to_be_closed:1; unsigned int last_backend_type:2; + unsigned int process_index:6; unsigned int all_participate_num:8; unsigned long long xa_id; @@ -754,6 +756,7 @@ struct chassis_private { struct cetus_variable_t *stats_variables; struct cetus_monitor_t *monitor; guint32 thread_id; + guint32 max_thread_id; }; NETWORK_API network_socket_retval_t diff --git a/src/plugin-common.c b/src/plugin-common.c index fa1745b..9d3ce71 100644 --- a/src/plugin-common.c +++ b/src/plugin-common.c @@ -74,6 +74,8 @@ typedef int socklen_t; #define MAX_CACHED_ITEMS 65536 +extern int cetus_last_process; + /* judge if client_ip_with_username is in allow or deny ip_table*/ static gboolean client_ip_table_lookup(GHashTable *ip_table, char *client_ip_with_username) @@ -430,6 +432,11 @@ do_connect_cetus(network_mysqld_con *con, network_backend_t **backend, int *back g_string_free(version, FALSE); challenge->thread_id = g->thread_id++; + if (g->thread_id > g->max_thread_id) { + g->thread_id = 1 + (cetus_last_process << 24); + g_message("%s: rewind first thread id:%d", G_STRLOC, g->thread_id); + } + GString *auth_packet = g_string_new(NULL); network_mysqld_proto_append_auth_challenge(auth_packet, challenge);