Add code for kill query

This commit is contained in:
wangbin579 2018-08-20 15:19:00 +08:00
parent 8f3087d974
commit 87f6be8dc0
6 changed files with 74 additions and 1 deletions

View File

@ -419,6 +419,16 @@ void admin_show_connectionlist(network_mysqld_con *con, int show_count)
fields = g_ptr_array_new_with_free_func((void *) network_mysqld_proto_fielddef_free);
field = network_mysqld_proto_fielddef_new();
field->name = g_strdup("PID");
field->type = MYSQL_TYPE_STRING;
g_ptr_array_add(fields, field);
field = network_mysqld_proto_fielddef_new();
field->name = g_strdup("ThreadID");
field->type = MYSQL_TYPE_STRING;
g_ptr_array_add(fields, field);
field = network_mysqld_proto_fielddef_new();
field->name = g_strdup("User");
field->type = MYSQL_TYPE_STRING;
@ -488,6 +498,8 @@ void admin_show_connectionlist(network_mysqld_con *con, int show_count)
len = priv->cons->len;
int count = 0;
cetus_pid_t process_id = getpid();
for (i = 0; i < len; i++) {
network_mysqld_con *con = priv->cons->pdata[i];
@ -502,6 +514,13 @@ void admin_show_connectionlist(network_mysqld_con *con, int show_count)
count++;
row = g_ptr_array_new_with_free_func(g_free);
sprintf(buffer, "%d", process_id);
g_ptr_array_add(row, g_strdup(buffer));
sprintf(buffer, "%d", con->client->challenge->thread_id);
g_ptr_array_add(row, g_strdup(buffer));
if (con->client->response != NULL) {
g_ptr_array_add(row, g_strdup(con->client->response->username->str));
} else {
@ -1483,6 +1502,23 @@ void admin_config_reload(network_mysqld_con* con, char* object)
}
}
void admin_kill_query(network_mysqld_con* con, unsigned int thread_id)
{
if (con->is_admin_client) {
con->process_index = thread_id >> 24;
if (con->process_index > cetus_last_process) {
con->direct_answer = 1;
network_mysqld_con_send_error(con->client, C("thread id is not correct"));
} else {
con->ask_the_given_worker = 1;
}
return;
}
}
void admin_reset_stats(network_mysqld_con* con)
{
if (con->is_admin_client) {
@ -1946,11 +1982,13 @@ void admin_save_settings(network_mysqld_con *con)
network_mysqld_con_send_error_full(con->client, L(msg), 1066, "28000");
}
}
void admin_compatible_cmd(network_mysqld_con* con)
{
con->direct_answer = 1;
network_mysqld_con_send_ok(con->client);
}
void admin_show_databases(network_mysqld_con* con)
{
g_debug("%s:call admin_show_databases", G_STRLOC);

View File

@ -471,10 +471,27 @@ int construct_channel_info(network_mysqld_con *con, char *sql)
strncpy(ch.admin_sql, sql, len);
g_message("%s:cetus_last_process:%d, ch admin sql:%s",
G_STRLOC, cetus_last_process, ch.admin_sql);
if (con->ask_the_given_worker) {
int index = con->process_index;
g_message("%s: pass sql info to s:%i pid:%d to:%d", G_STRLOC,
ch.basics.slot, ch.basics.pid, cetus_processes[index].pid);
/* TODO: AGAIN */
cetus_write_channel(cetus_processes[index].parent_child_channel[0],
&ch, sizeof(cetus_channel_t));
int fd = cetus_processes[index].parent_child_channel[0];
g_debug("%s:fd:%d for network_read_sql_resp", G_STRLOC, fd);
event_set(&(cetus_processes[index].event), fd, EV_READ, network_read_sql_resp, con);
chassis_event_add_with_timeout(cycle, &(cetus_processes[index].event), NULL);
con->num_read_pending++;
g_debug("%s:con num_read_pending:%d", G_STRLOC, con->num_read_pending);
return 0;
}
int num = cetus_last_process;
if (con->ask_one_worker) {
num = 1;
}
int i;
for (i = 0; i < num; i++) {
g_message("%s: pass sql info to s:%i pid:%d to:%d", G_STRLOC,
@ -582,6 +599,7 @@ static network_mysqld_stmt_ret admin_process_query(network_mysqld_con *con)
con->direct_answer = 0;
con->ask_one_worker = 0;
con->ask_the_given_worker = 0;
con->admin_read_merge = 0;
visit_parser(con, con->orig_sql->str);

View File

@ -503,6 +503,11 @@ cetus_worker_process_cycle(cetus_cycle_t *cycle, void *data)
}
}
cycle->priv->thread_id = 1 + (cetus_last_process << 24);
cycle->priv->max_thread_id = (cetus_last_process << 24) + (1 << 24) - 1;
g_message("%s: first thread id:%d, max thread id:%d", G_STRLOC,
cycle->priv->thread_id, cycle->priv->max_thread_id);
#ifndef SIMPLE_PARSER
cycle->dist_tran_id = g_random_int_range(0, 100000000);
struct ifreq buffer;

View File

@ -102,6 +102,8 @@
#define E_NET_WOULDBLOCK EWOULDBLOCK
#endif
extern int cetus_last_process;
static void network_mysqld_self_con_handle(int event_fd, short events, void *user_data);
/**
@ -177,7 +179,7 @@ network_mysqld_priv_init(void)
priv->backends = network_backends_new();
priv->users = cetus_users_new();
priv->monitor = cetus_monitor_new();
priv->thread_id = 1;
return priv;
}

View File

@ -570,8 +570,10 @@ struct network_mysqld_con {
unsigned int direct_answer:1;
unsigned int admin_read_merge:1;
unsigned int ask_one_worker:1;
unsigned int ask_the_given_worker:1;
unsigned int is_client_to_be_closed:1;
unsigned int last_backend_type:2;
unsigned int process_index:6;
unsigned int all_participate_num:8;
unsigned long long xa_id;
@ -754,6 +756,7 @@ struct chassis_private {
struct cetus_variable_t *stats_variables;
struct cetus_monitor_t *monitor;
guint32 thread_id;
guint32 max_thread_id;
};
NETWORK_API network_socket_retval_t

View File

@ -74,6 +74,8 @@ typedef int socklen_t;
#define MAX_CACHED_ITEMS 65536
extern int cetus_last_process;
/* judge if client_ip_with_username is in allow or deny ip_table*/
static gboolean
client_ip_table_lookup(GHashTable *ip_table, char *client_ip_with_username)
@ -430,6 +432,11 @@ do_connect_cetus(network_mysqld_con *con, network_backend_t **backend, int *back
g_string_free(version, FALSE);
challenge->thread_id = g->thread_id++;
if (g->thread_id > g->max_thread_id) {
g->thread_id = 1 + (cetus_last_process << 24);
g_message("%s: rewind first thread id:%d", G_STRLOC, g->thread_id);
}
GString *auth_packet = g_string_new(NULL);
network_mysqld_proto_append_auth_challenge(auth_packet, challenge);