diff --git a/plugins/admin/admin-plugin.c b/plugins/admin/admin-plugin.c index de75a16..d977b62 100644 --- a/plugins/admin/admin-plugin.c +++ b/plugins/admin/admin-plugin.c @@ -43,6 +43,8 @@ #include "sys-pedantic.h" #include "network-ssl.h" #include "chassis-options-utils.h" +#include "cetus-channel.h" +#include "cetus-process-cycle.h" #include "admin-lexer.l.h" #include "admin-parser.y.h" @@ -325,29 +327,38 @@ void adminParserFree(void*, void (*freeProc)(void*)); void adminParser(void*, int yymajor, token_t, void*); void adminParserTrace(FILE*, char*); -static network_mysqld_stmt_ret admin_process_query(network_mysqld_con *con) +static +void construct_channel_info(chassis *cycle, char *sql) { - network_socket *recv_sock = con->client; - GList *chunk = recv_sock->recv_queue->chunks->head; - GString *packet = chunk->data; + g_message("%s:call construct_channel_info", G_STRLOC); + cetus_channel_t ch; + memset(&ch, 0, sizeof(cetus_channel_t)); + ch.basics.command = CETUS_CMD_ADMIN; + ch.basics.pid = cetus_processes[cetus_process_slot].pid; + ch.basics.slot = cetus_process_slot; + ch.basics.fd = cetus_processes[cetus_process_slot].channel[0]; - if (packet->len < NET_HEADER_SIZE) { - /* packet too short */ - return PROXY_SEND_QUERY; - } + int len = strlen(sql); + if (len >= MAX_ADMIN_SQL_LEN) { + } else { + strncpy(ch.admin_sql, sql, len); + int i; + for (i = 0; i < cetus_last_process; i++) { + g_message("%s: pass close channel s:%i pid:%d to:%d", G_STRLOC, + ch.basics.slot, ch.basics.pid, cetus_processes[i].pid); - char command = packet->str[NET_HEADER_SIZE + 0]; + /* TODO: AGAIN */ + cetus_write_channel(cetus_processes[i].channel[0], + &ch, sizeof(cetus_channel_t)); + } + } +} - if (COM_QUERY == command) { - /* we need some more data after the COM_QUERY */ - if (packet->len < NET_HEADER_SIZE + 2) return PROXY_SEND_QUERY; - } - g_string_assign_len(con->orig_sql, packet->str + (NET_HEADER_SIZE + 1), - packet->len - (NET_HEADER_SIZE + 1)); - - const char* sql = con->orig_sql->str; - admin_clear_error(con); +NETWORK_MYSQLD_PLUGIN_PROTO(execute_admin_query) +{ + g_message("%s:call execute_admin_query", G_STRLOC); + char *sql = con->orig_sql->str; /* init lexer & parser */ yyscan_t scanner; @@ -387,6 +398,33 @@ static network_mysqld_stmt_ret admin_process_query(network_mysqld_con *con) adminParserFree(parser, free); adminyy_delete_buffer(buf_state, scanner); adminyylex_destroy(scanner); +} + +static network_mysqld_stmt_ret admin_process_query(network_mysqld_con *con) +{ + network_socket *recv_sock = con->client; + GList *chunk = recv_sock->recv_queue->chunks->head; + GString *packet = chunk->data; + + if (packet->len < NET_HEADER_SIZE) { + /* packet too short */ + return PROXY_SEND_QUERY; + } + + char command = packet->str[NET_HEADER_SIZE + 0]; + + if (COM_QUERY == command) { + /* we need some more data after the COM_QUERY */ + if (packet->len < NET_HEADER_SIZE + 2) return PROXY_SEND_QUERY; + } + + g_string_assign_len(con->orig_sql, packet->str + (NET_HEADER_SIZE + 1), + packet->len - (NET_HEADER_SIZE + 1)); + + construct_channel_info(con->srv, con->orig_sql->str); + + admin_clear_error(con); + return PROXY_SEND_RESULT; } @@ -394,6 +432,7 @@ static network_mysqld_stmt_ret admin_process_query(network_mysqld_con *con) * gets called after a query has been read */ NETWORK_MYSQLD_PLUGIN_PROTO(server_read_query) { + g_message("%s:call server_read_query", G_STRLOC); network_socket *recv_sock; network_mysqld_stmt_ret ret; @@ -459,6 +498,8 @@ static int network_mysqld_server_connection_init(network_mysqld_con *con) { con->plugins.con_timeout = server_timeout; + con->plugins.con_exectute_sql = execute_admin_query; + con->plugins.con_cleanup = admin_disconnect_client; return 0; @@ -690,6 +731,7 @@ network_mysqld_admin_plugin_apply_config(chassis *chas, listen_sock = network_socket_new(); con->server = listen_sock; + /* * set the plugin hooks as we want to apply them to the new * connections too later @@ -718,6 +760,8 @@ network_mysqld_admin_plugin_apply_config(chassis *chas, EV_READ|EV_PERSIST, network_mysqld_con_accept, con); chassis_event_add(chas, &(listen_sock->event)); + chas->admin_plugin = &(con->plugins); + chassis_config_register_service(chas->config_manager, config->address, "admin"); config->admin_stats = admin_stats_init(chas); return 0; diff --git a/plugins/admin/admin-plugin.h b/plugins/admin/admin-plugin.h index 96d0691..6b2056a 100644 --- a/plugins/admin/admin-plugin.h +++ b/plugins/admin/admin-plugin.h @@ -4,6 +4,7 @@ #include "glib-ext.h" #include "network-mysqld.h" #include "admin-stats.h" +#include "cetus-process.h" #ifndef PLUGIN_VERSION #ifdef CHASSIS_BUILD_TAG @@ -33,5 +34,4 @@ struct chassis_plugin_config { admin_stats_t *admin_stats; }; - #endif diff --git a/src/cetus-process-cycle.c b/src/cetus-process-cycle.c index 9096766..62e3d08 100644 --- a/src/cetus-process-cycle.c +++ b/src/cetus-process-cycle.c @@ -4,6 +4,7 @@ #include #include +#include "network-mysqld.h" #include "cetus-channel.h" #include "cetus-process.h" #include "cetus-process-cycle.h" @@ -576,7 +577,7 @@ cetus_worker_process_init(cetus_cycle_t *cycle, int worker) cycle->event_base = mainloop; g_assert(cycle->event_base); - event_set(&cetus_channel_event, cetus_channel, EV_READ | EV_PERSIST, cetus_channel_handler, NULL); + event_set(&cetus_channel_event, cetus_channel, EV_READ | EV_PERSIST, cetus_channel_handler, cycle); chassis_event_add(cycle, &cetus_channel_event); g_debug("%s: cetus_channel:%d is waiting for read, event base:%p, ev:%p", G_STRLOC, cetus_channel, cycle->event_base, &cetus_channel_event); @@ -595,8 +596,23 @@ cetus_worker_process_exit(cetus_cycle_t *cycle) static void -process_admin_sql(cetus_channel_t *ch) +process_admin_sql(cetus_cycle_t *cycle, cetus_channel_t *ch) { + network_mysqld_con *con = network_mysqld_con_new(); + con->plugin_con_state = g_new0(int, 1); + network_socket *client = network_socket_new(); + con->client = client; + con->srv = cycle; + + g_string_assign_len(con->orig_sql, ch->admin_sql, strlen(ch->admin_sql)); + + if (cycle->admin_plugin) { + network_socket_retval_t retval = NETWORK_SOCKET_SUCCESS; + NETWORK_MYSQLD_PLUGIN_FUNC(func) = NULL; + network_mysqld_hooks *plugin = cycle->admin_plugin; + func = plugin->con_exectute_sql; + retval = (*func) (cycle, con); + } } static void @@ -628,7 +644,7 @@ cetus_channel_handler(int fd, short events, void *user_data) switch (ch.basics.command) { case CETUS_CMD_ADMIN: - process_admin_sql(&ch); + process_admin_sql(user_data, &ch); break; case CETUS_CMD_QUIT: cetus_quit = 1; diff --git a/src/cetus-process-cycle.h b/src/cetus-process-cycle.h index bbcc3af..4c2f4c6 100644 --- a/src/cetus-process-cycle.h +++ b/src/cetus-process-cycle.h @@ -3,6 +3,7 @@ #define _CETUS_PROCESS_CYCLE_H_INCLUDED_ #include "cetus-process.h" +#include "cetus-channel.h" #define CETUS_PROCESS_SINGLE 0 #define CETUS_PROCESS_MASTER 1 @@ -13,7 +14,6 @@ void cetus_master_process_cycle(cetus_cycle_t *cycle); - extern unsigned int cetus_process; extern unsigned int cetus_worker; extern pid_t cetus_pid; diff --git a/src/cetus-process.h b/src/cetus-process.h index c1339cb..16d08f2 100644 --- a/src/cetus-process.h +++ b/src/cetus-process.h @@ -4,6 +4,7 @@ #include "cetus-setaffinity.h" #include "chassis-mainloop.h" +#include "network-mysqld.h" typedef chassis cetus_cycle_t; diff --git a/src/chassis-mainloop.h b/src/chassis-mainloop.h index b334f5e..ec9a81a 100644 --- a/src/chassis-mainloop.h +++ b/src/chassis-mainloop.h @@ -106,6 +106,7 @@ struct chassis { /**< array(chassis_plugin) */ GPtrArray *modules; + void *admin_plugin; /**< base directory for all relative paths referenced */ gchar *base_dir; diff --git a/src/network-mysqld.c b/src/network-mysqld.c index 06a46fd..73aed55 100644 --- a/src/network-mysqld.c +++ b/src/network-mysqld.c @@ -2352,6 +2352,7 @@ handle_read_query(network_mysqld_con *con, network_mysqld_con_state_t ostate) } WAIT_FOR_EVENT(con->client, EV_READ, &timeout); + return DISP_STOP; case NETWORK_SOCKET_ERROR_RETRY: case NETWORK_SOCKET_ERROR: @@ -5023,7 +5024,7 @@ check_and_create_conns_func(int fd, short what, void *arg) } } - g_message("%s: check_and_create_conns_func", G_STRLOC); + g_debug("%s: check_and_create_conns_func", G_STRLOC); struct timeval check_interval = {10, 0}; chassis_event_add_with_timeout(chas, &chas->auto_create_conns_event, &check_interval); } diff --git a/src/network-mysqld.h b/src/network-mysqld.h index f194b6f..3c74dcb 100644 --- a/src/network-mysqld.h +++ b/src/network-mysqld.h @@ -176,6 +176,8 @@ typedef struct { */ NETWORK_MYSQLD_PLUGIN_FUNC(con_cleanup); + NETWORK_MYSQLD_PLUGIN_FUNC(con_exectute_sql); + NETWORK_MYSQLD_PLUGIN_FUNC(con_timeout); } network_mysqld_hooks;