Add code for sending sql to workers

This commit is contained in:
wangbin579 2018-07-04 14:13:50 +08:00
parent 323f1e46f9
commit 5743ea5441
8 changed files with 89 additions and 24 deletions

View File

@ -43,6 +43,8 @@
#include "sys-pedantic.h" #include "sys-pedantic.h"
#include "network-ssl.h" #include "network-ssl.h"
#include "chassis-options-utils.h" #include "chassis-options-utils.h"
#include "cetus-channel.h"
#include "cetus-process-cycle.h"
#include "admin-lexer.l.h" #include "admin-lexer.l.h"
#include "admin-parser.y.h" #include "admin-parser.y.h"
@ -325,29 +327,38 @@ void adminParserFree(void*, void (*freeProc)(void*));
void adminParser(void*, int yymajor, token_t, void*); void adminParser(void*, int yymajor, token_t, void*);
void adminParserTrace(FILE*, char*); void adminParserTrace(FILE*, char*);
static network_mysqld_stmt_ret admin_process_query(network_mysqld_con *con) static
void construct_channel_info(chassis *cycle, char *sql)
{ {
network_socket *recv_sock = con->client; g_message("%s:call construct_channel_info", G_STRLOC);
GList *chunk = recv_sock->recv_queue->chunks->head; cetus_channel_t ch;
GString *packet = chunk->data; memset(&ch, 0, sizeof(cetus_channel_t));
ch.basics.command = CETUS_CMD_ADMIN;
ch.basics.pid = cetus_processes[cetus_process_slot].pid;
ch.basics.slot = cetus_process_slot;
ch.basics.fd = cetus_processes[cetus_process_slot].channel[0];
if (packet->len < NET_HEADER_SIZE) { int len = strlen(sql);
/* packet too short */ if (len >= MAX_ADMIN_SQL_LEN) {
return PROXY_SEND_QUERY; } else {
} strncpy(ch.admin_sql, sql, len);
int i;
for (i = 0; i < cetus_last_process; i++) {
g_message("%s: pass close channel s:%i pid:%d to:%d", G_STRLOC,
ch.basics.slot, ch.basics.pid, cetus_processes[i].pid);
char command = packet->str[NET_HEADER_SIZE + 0]; /* TODO: AGAIN */
cetus_write_channel(cetus_processes[i].channel[0],
&ch, sizeof(cetus_channel_t));
}
}
}
if (COM_QUERY == command) {
/* we need some more data after the COM_QUERY */
if (packet->len < NET_HEADER_SIZE + 2) return PROXY_SEND_QUERY;
}
g_string_assign_len(con->orig_sql, packet->str + (NET_HEADER_SIZE + 1), NETWORK_MYSQLD_PLUGIN_PROTO(execute_admin_query)
packet->len - (NET_HEADER_SIZE + 1)); {
g_message("%s:call execute_admin_query", G_STRLOC);
const char* sql = con->orig_sql->str; char *sql = con->orig_sql->str;
admin_clear_error(con);
/* init lexer & parser */ /* init lexer & parser */
yyscan_t scanner; yyscan_t scanner;
@ -387,6 +398,33 @@ static network_mysqld_stmt_ret admin_process_query(network_mysqld_con *con)
adminParserFree(parser, free); adminParserFree(parser, free);
adminyy_delete_buffer(buf_state, scanner); adminyy_delete_buffer(buf_state, scanner);
adminyylex_destroy(scanner); adminyylex_destroy(scanner);
}
static network_mysqld_stmt_ret admin_process_query(network_mysqld_con *con)
{
network_socket *recv_sock = con->client;
GList *chunk = recv_sock->recv_queue->chunks->head;
GString *packet = chunk->data;
if (packet->len < NET_HEADER_SIZE) {
/* packet too short */
return PROXY_SEND_QUERY;
}
char command = packet->str[NET_HEADER_SIZE + 0];
if (COM_QUERY == command) {
/* we need some more data after the COM_QUERY */
if (packet->len < NET_HEADER_SIZE + 2) return PROXY_SEND_QUERY;
}
g_string_assign_len(con->orig_sql, packet->str + (NET_HEADER_SIZE + 1),
packet->len - (NET_HEADER_SIZE + 1));
construct_channel_info(con->srv, con->orig_sql->str);
admin_clear_error(con);
return PROXY_SEND_RESULT; return PROXY_SEND_RESULT;
} }
@ -394,6 +432,7 @@ static network_mysqld_stmt_ret admin_process_query(network_mysqld_con *con)
* gets called after a query has been read * gets called after a query has been read
*/ */
NETWORK_MYSQLD_PLUGIN_PROTO(server_read_query) { NETWORK_MYSQLD_PLUGIN_PROTO(server_read_query) {
g_message("%s:call server_read_query", G_STRLOC);
network_socket *recv_sock; network_socket *recv_sock;
network_mysqld_stmt_ret ret; network_mysqld_stmt_ret ret;
@ -459,6 +498,8 @@ static int network_mysqld_server_connection_init(network_mysqld_con *con) {
con->plugins.con_timeout = server_timeout; con->plugins.con_timeout = server_timeout;
con->plugins.con_exectute_sql = execute_admin_query;
con->plugins.con_cleanup = admin_disconnect_client; con->plugins.con_cleanup = admin_disconnect_client;
return 0; return 0;
@ -690,6 +731,7 @@ network_mysqld_admin_plugin_apply_config(chassis *chas,
listen_sock = network_socket_new(); listen_sock = network_socket_new();
con->server = listen_sock; con->server = listen_sock;
/* /*
* set the plugin hooks as we want to apply them to the new * set the plugin hooks as we want to apply them to the new
* connections too later * connections too later
@ -718,6 +760,8 @@ network_mysqld_admin_plugin_apply_config(chassis *chas,
EV_READ|EV_PERSIST, network_mysqld_con_accept, con); EV_READ|EV_PERSIST, network_mysqld_con_accept, con);
chassis_event_add(chas, &(listen_sock->event)); chassis_event_add(chas, &(listen_sock->event));
chas->admin_plugin = &(con->plugins);
chassis_config_register_service(chas->config_manager, config->address, "admin"); chassis_config_register_service(chas->config_manager, config->address, "admin");
config->admin_stats = admin_stats_init(chas); config->admin_stats = admin_stats_init(chas);
return 0; return 0;

View File

@ -4,6 +4,7 @@
#include "glib-ext.h" #include "glib-ext.h"
#include "network-mysqld.h" #include "network-mysqld.h"
#include "admin-stats.h" #include "admin-stats.h"
#include "cetus-process.h"
#ifndef PLUGIN_VERSION #ifndef PLUGIN_VERSION
#ifdef CHASSIS_BUILD_TAG #ifdef CHASSIS_BUILD_TAG
@ -33,5 +34,4 @@ struct chassis_plugin_config {
admin_stats_t *admin_stats; admin_stats_t *admin_stats;
}; };
#endif #endif

View File

@ -4,6 +4,7 @@
#include <unistd.h> #include <unistd.h>
#include <string.h> #include <string.h>
#include "network-mysqld.h"
#include "cetus-channel.h" #include "cetus-channel.h"
#include "cetus-process.h" #include "cetus-process.h"
#include "cetus-process-cycle.h" #include "cetus-process-cycle.h"
@ -576,7 +577,7 @@ cetus_worker_process_init(cetus_cycle_t *cycle, int worker)
cycle->event_base = mainloop; cycle->event_base = mainloop;
g_assert(cycle->event_base); g_assert(cycle->event_base);
event_set(&cetus_channel_event, cetus_channel, EV_READ | EV_PERSIST, cetus_channel_handler, NULL); event_set(&cetus_channel_event, cetus_channel, EV_READ | EV_PERSIST, cetus_channel_handler, cycle);
chassis_event_add(cycle, &cetus_channel_event); chassis_event_add(cycle, &cetus_channel_event);
g_debug("%s: cetus_channel:%d is waiting for read, event base:%p, ev:%p", g_debug("%s: cetus_channel:%d is waiting for read, event base:%p, ev:%p",
G_STRLOC, cetus_channel, cycle->event_base, &cetus_channel_event); G_STRLOC, cetus_channel, cycle->event_base, &cetus_channel_event);
@ -595,8 +596,23 @@ cetus_worker_process_exit(cetus_cycle_t *cycle)
static void static void
process_admin_sql(cetus_channel_t *ch) process_admin_sql(cetus_cycle_t *cycle, cetus_channel_t *ch)
{ {
network_mysqld_con *con = network_mysqld_con_new();
con->plugin_con_state = g_new0(int, 1);
network_socket *client = network_socket_new();
con->client = client;
con->srv = cycle;
g_string_assign_len(con->orig_sql, ch->admin_sql, strlen(ch->admin_sql));
if (cycle->admin_plugin) {
network_socket_retval_t retval = NETWORK_SOCKET_SUCCESS;
NETWORK_MYSQLD_PLUGIN_FUNC(func) = NULL;
network_mysqld_hooks *plugin = cycle->admin_plugin;
func = plugin->con_exectute_sql;
retval = (*func) (cycle, con);
}
} }
static void static void
@ -628,7 +644,7 @@ cetus_channel_handler(int fd, short events, void *user_data)
switch (ch.basics.command) { switch (ch.basics.command) {
case CETUS_CMD_ADMIN: case CETUS_CMD_ADMIN:
process_admin_sql(&ch); process_admin_sql(user_data, &ch);
break; break;
case CETUS_CMD_QUIT: case CETUS_CMD_QUIT:
cetus_quit = 1; cetus_quit = 1;

View File

@ -3,6 +3,7 @@
#define _CETUS_PROCESS_CYCLE_H_INCLUDED_ #define _CETUS_PROCESS_CYCLE_H_INCLUDED_
#include "cetus-process.h" #include "cetus-process.h"
#include "cetus-channel.h"
#define CETUS_PROCESS_SINGLE 0 #define CETUS_PROCESS_SINGLE 0
#define CETUS_PROCESS_MASTER 1 #define CETUS_PROCESS_MASTER 1
@ -13,7 +14,6 @@
void cetus_master_process_cycle(cetus_cycle_t *cycle); void cetus_master_process_cycle(cetus_cycle_t *cycle);
extern unsigned int cetus_process; extern unsigned int cetus_process;
extern unsigned int cetus_worker; extern unsigned int cetus_worker;
extern pid_t cetus_pid; extern pid_t cetus_pid;

View File

@ -4,6 +4,7 @@
#include "cetus-setaffinity.h" #include "cetus-setaffinity.h"
#include "chassis-mainloop.h" #include "chassis-mainloop.h"
#include "network-mysqld.h"
typedef chassis cetus_cycle_t; typedef chassis cetus_cycle_t;

View File

@ -106,6 +106,7 @@ struct chassis {
/**< array(chassis_plugin) */ /**< array(chassis_plugin) */
GPtrArray *modules; GPtrArray *modules;
void *admin_plugin;
/**< base directory for all relative paths referenced */ /**< base directory for all relative paths referenced */
gchar *base_dir; gchar *base_dir;

View File

@ -2352,6 +2352,7 @@ handle_read_query(network_mysqld_con *con, network_mysqld_con_state_t ostate)
} }
WAIT_FOR_EVENT(con->client, EV_READ, &timeout); WAIT_FOR_EVENT(con->client, EV_READ, &timeout);
return DISP_STOP; return DISP_STOP;
case NETWORK_SOCKET_ERROR_RETRY: case NETWORK_SOCKET_ERROR_RETRY:
case NETWORK_SOCKET_ERROR: case NETWORK_SOCKET_ERROR:
@ -5023,7 +5024,7 @@ check_and_create_conns_func(int fd, short what, void *arg)
} }
} }
g_message("%s: check_and_create_conns_func", G_STRLOC); g_debug("%s: check_and_create_conns_func", G_STRLOC);
struct timeval check_interval = {10, 0}; struct timeval check_interval = {10, 0};
chassis_event_add_with_timeout(chas, &chas->auto_create_conns_event, &check_interval); chassis_event_add_with_timeout(chas, &chas->auto_create_conns_event, &check_interval);
} }

View File

@ -176,6 +176,8 @@ typedef struct {
*/ */
NETWORK_MYSQLD_PLUGIN_FUNC(con_cleanup); NETWORK_MYSQLD_PLUGIN_FUNC(con_cleanup);
NETWORK_MYSQLD_PLUGIN_FUNC(con_exectute_sql);
NETWORK_MYSQLD_PLUGIN_FUNC(con_timeout); NETWORK_MYSQLD_PLUGIN_FUNC(con_timeout);
} network_mysqld_hooks; } network_mysqld_hooks;