diff --git a/plugins/admin/admin-commands.c b/plugins/admin/admin-commands.c index 5e9254b..cd9eb63 100644 --- a/plugins/admin/admin-commands.c +++ b/plugins/admin/admin-commands.c @@ -518,8 +518,12 @@ void admin_show_connectionlist(network_mysqld_con *con, int show_count) sprintf(buffer, "%d", process_id); g_ptr_array_add(row, g_strdup(buffer)); - sprintf(buffer, "%d", con->client->challenge->thread_id); - g_ptr_array_add(row, g_strdup(buffer)); + if (con->client->challenge) { + sprintf(buffer, "%d", con->client->challenge->thread_id); + g_ptr_array_add(row, g_strdup(buffer)); + } else { + g_ptr_array_add(row, NULL); + } if (con->client->response != NULL) { g_ptr_array_add(row, g_strdup(con->client->response->username->str)); @@ -1502,7 +1506,7 @@ void admin_config_reload(network_mysqld_con* con, char* object) } } -void admin_kill_query(network_mysqld_con* con, unsigned int thread_id) +void admin_kill_query(network_mysqld_con* con, guint32 thread_id) { if (con->is_admin_client) { con->process_index = thread_id >> 24; @@ -1517,6 +1521,9 @@ void admin_kill_query(network_mysqld_con* con, unsigned int thread_id) return; } + gboolean ok = network_mysqld_kill_connection(con->srv, thread_id); + network_mysqld_con_send_ok_full(con->client, ok ? 1 : 0, 0, SERVER_STATUS_AUTOCOMMIT, 0); + } void admin_reset_stats(network_mysqld_con* con) diff --git a/plugins/admin/admin-commands.h b/plugins/admin/admin-commands.h index db8ed56..4352793 100644 --- a/plugins/admin/admin-commands.h +++ b/plugins/admin/admin-commands.h @@ -61,4 +61,5 @@ void admin_select_single_table(network_mysqld_con*); void admin_sql_log_start(network_mysqld_con* con); void admin_sql_log_stop(network_mysqld_con* con); void admin_sql_log_status(network_mysqld_con* con); +void admin_kill_query(network_mysqld_con* con, guint32); #endif // ADMIN_COMMANDS_H diff --git a/plugins/admin/admin-parser.y b/plugins/admin/admin-parser.y index 849f1dc..f1caf3d 100644 --- a/plugins/admin/admin-parser.y +++ b/plugins/admin/admin-parser.y @@ -457,4 +457,7 @@ cmd ::= SQL LOG START SEMI. { } cmd ::= SQL LOG STOP SEMI. { admin_sql_log_stop(con); -} \ No newline at end of file +} +cmd ::= KILL QUERY INTEGER(X) SEMI. { + admin_kill_query(con, token2int(X)); +} diff --git a/plugins/admin/lexer.l b/plugins/admin/lexer.l index ddbe754..2bf50b7 100644 --- a/plugins/admin/lexer.l +++ b/plugins/admin/lexer.l @@ -99,6 +99,8 @@ "LOG" return TK_LOG; "START" return TK_START; "STOP" return TK_STOP; +"KILL" return TK_KILL; +"QUERY" return TK_QUERY; [0-9]+ return TK_INTEGER; /*sign symbol is handled in parser*/ diff --git a/plugins/shard/shard-plugin.c b/plugins/shard/shard-plugin.c index 1ab442c..9cfa4c2 100644 --- a/plugins/shard/shard-plugin.c +++ b/plugins/shard/shard-plugin.c @@ -1201,7 +1201,8 @@ proxy_get_server_list(network_mysqld_con *con) } break; default: - rv = sharding_parse_groups(con->client->default_db, st->sql_context, stats, con->key, plan); + rv = sharding_parse_groups(con->client->default_db, st->sql_context, + stats, con->key, plan); break; } diff --git a/plugins/shard/sharding-parser.c b/plugins/shard/sharding-parser.c index 0399112..0748bb1 100644 --- a/plugins/shard/sharding-parser.c +++ b/plugins/shard/sharding-parser.c @@ -1484,7 +1484,7 @@ routing_by_property(sql_context_t *context, sql_property_t *property, char *defa int sharding_parse_groups(GString *default_db, sql_context_t *context, query_stats_t *stats, - guint32 fixture, sharding_plan_t *plan) + guint64 fixture, sharding_plan_t *plan) { GPtrArray *groups = g_ptr_array_new(); if (context == NULL) { diff --git a/plugins/shard/sharding-parser.h b/plugins/shard/sharding-parser.h index 24340cc..6f2081c 100644 --- a/plugins/shard/sharding-parser.h +++ b/plugins/shard/sharding-parser.h @@ -37,7 +37,7 @@ #define USE_PREVIOUS_TRAN_CONNS 9 #define ERROR_UNPARSABLE -1 -NETWORK_API int sharding_parse_groups(GString *, sql_context_t *, query_stats_t *, unsigned int, sharding_plan_t *); +NETWORK_API int sharding_parse_groups(GString *, sql_context_t *, query_stats_t *, guint64, sharding_plan_t *); NETWORK_API GString *sharding_modify_sql(sql_context_t *, having_condition_t *); diff --git a/src/chassis-mainloop.h b/src/chassis-mainloop.h index d736904..b52f9b1 100644 --- a/src/chassis-mainloop.h +++ b/src/chassis-mainloop.h @@ -126,7 +126,7 @@ struct chassis { char *default_charset; char *default_hashed_pwd; - unsigned int sess_key; + guint64 sess_key; unsigned int maintain_close_mode; unsigned int disable_threads; int ssl; diff --git a/src/network-mysqld.c b/src/network-mysqld.c index 9e4f3c0..285c74c 100644 --- a/src/network-mysqld.c +++ b/src/network-mysqld.c @@ -309,6 +309,31 @@ network_mysqld_add_connection(chassis *srv, network_mysqld_con *con, gboolean li } } +gboolean +network_mysqld_kill_connection(chassis *srv, guint32 id) +{ + int i; + + for (i = 0; i < srv->priv->cons->len; ++i) { + network_mysqld_con* con = g_ptr_array_index(srv->priv->cons, i); + + if (!con->client || !con->client->challenge) { + continue; + } + + if (con->client->challenge->thread_id == id) { + g_ptr_array_remove_index(srv->priv->cons, i); + con->server_to_be_closed = 1; + plugin_call_cleanup(srv, con); + g_message(G_STRLOC "kill query %u", (unsigned int) id); + network_mysqld_con_free(con); + return TRUE; + } + } + + return FALSE; +} + static void cetus_clean_conn_data(network_mysqld_con *con) { @@ -4223,7 +4248,7 @@ network_mysqld_con_accept(int G_GNUC_UNUSED event_fd, short events, void *user_d network_mysqld_add_connection(listen_con->srv, client_con, FALSE); client_con->key = client_con->srv->sess_key++; - g_message("%s: accept a new client connection, sess key:%d", G_STRLOC, client_con->key); + g_message("%s: accept a new client connection", G_STRLOC); /** * inherit the config to the new connection @@ -5099,4 +5124,3 @@ check_and_create_conns_func(int fd, short what, void *arg) chassis_event_add_with_timeout(chas, &chas->auto_create_conns_event, &check_interval); } - diff --git a/src/network-mysqld.h b/src/network-mysqld.h index dc805e5..a5fe66a 100644 --- a/src/network-mysqld.h +++ b/src/network-mysqld.h @@ -767,6 +767,7 @@ NETWORK_API void send_part_content_to_client(network_mysqld_con *con); NETWORK_API void set_conn_attr(network_mysqld_con *con, network_socket *server); NETWORK_API int network_mysqld_init(chassis *srv); NETWORK_API void network_mysqld_add_connection(chassis *srv, network_mysqld_con *con, gboolean listen); +gboolean network_mysqld_kill_connection(chassis *srv, guint32 id); NETWORK_API void network_mysqld_con_handle(int event_fd, short events, void *user_data); NETWORK_API int network_mysqld_queue_append(network_socket *sock, network_queue *queue, const char *data, size_t len); NETWORK_API int network_mysqld_queue_append_raw(network_socket *sock, network_queue *queue, GString *data); diff --git a/src/sharding-config.c b/src/sharding-config.c index 0a5ff2e..39bedca 100644 --- a/src/sharding-config.c +++ b/src/sharding-config.c @@ -382,7 +382,7 @@ shard_conf_is_shard_table(const char *db, const char *table) } GPtrArray * -shard_conf_get_fixed_group(GPtrArray *groups, guint32 fixture) +shard_conf_get_fixed_group(GPtrArray *groups, guint64 fixture) { int len = g_list_length(shard_conf_all_groups); if (len == 0) { diff --git a/src/sharding-config.h b/src/sharding-config.h index e369231..4d265ca 100644 --- a/src/sharding-config.h +++ b/src/sharding-config.h @@ -99,7 +99,7 @@ GPtrArray *shard_conf_get_any_group(GPtrArray *groups, const char *db, const cha GPtrArray *shard_conf_get_all_groups(GPtrArray *groups); /* same fixture will get same group */ -GPtrArray *shard_conf_get_fixed_group(GPtrArray *groups, guint32 fixture); +GPtrArray *shard_conf_get_fixed_group(GPtrArray *groups, guint64 fixture); GPtrArray *shard_conf_get_table_groups(GPtrArray *visited_groups, const char *db, const char *table);