mirror of
https://gitee.com/wangbin579/cetus.git
synced 2024-11-30 10:57:37 +08:00
1024 lines
32 KiB
C
1024 lines
32 KiB
C
/* $%BEGINLICENSE%$
|
|
Copyright (c) 2007, 2012, Oracle and/or its affiliates. All rights reserved.
|
|
|
|
This program is free software; you can redistribute it and/or
|
|
modify it under the terms of the GNU General Public License as
|
|
published by the Free Software Foundation; version 2 of the
|
|
License.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software
|
|
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
|
|
02110-1301 USA
|
|
|
|
$%ENDLICENSE%$ */
|
|
|
|
#ifndef _GNU_SOURCE
|
|
#define _GNU_SOURCE
|
|
#endif
|
|
|
|
#include "admin-plugin.h"
|
|
|
|
#include <ctype.h>
|
|
#include <errno.h>
|
|
#include <fcntl.h>
|
|
#include <stdlib.h>
|
|
#include <string.h>
|
|
|
|
#include "cetus-users.h"
|
|
#include "cetus-util.h"
|
|
#include "cetus-variable.h"
|
|
#include "character-set.h"
|
|
#include "chassis-event.h"
|
|
#include "chassis-options.h"
|
|
#include "cetus-monitor.h"
|
|
#include "network-mysqld-packet.h"
|
|
#include "network-mysqld-proto.h"
|
|
#include "server-session.h"
|
|
#include "sys-pedantic.h"
|
|
#include "network-ssl.h"
|
|
#include "chassis-options-utils.h"
|
|
#include "cetus-channel.h"
|
|
#include "cetus-process-cycle.h"
|
|
#include "resultset_merge.h"
|
|
#include "cetus-acl.h"
|
|
|
|
#include "admin-lexer.l.h"
|
|
#include "admin-parser.y.h"
|
|
#include "admin-commands.h"
|
|
#include "admin-stats.h"
|
|
|
|
/* Admin plugin configuration. It is assigned at the end of
 * network_mysqld_admin_plugin_apply_config() and used by
 * execute_admin_query() as a fallback when con->config is NULL. */
chassis_plugin_config *admin_config = NULL;
|
|
|
|
/* get config->has_shard_plugin */
|
|
static gboolean
|
|
has_shard_plugin(GPtrArray *modules)
|
|
{
|
|
int i;
|
|
for (i = 0; i < modules->len; i++) {
|
|
chassis_plugin *plugin = modules->pdata[i];
|
|
if (strcmp(plugin->name, "shard") == 0) {
|
|
return TRUE;
|
|
}
|
|
}
|
|
return FALSE;
|
|
}
|
|
|
|
/* judge if client_ip is in allowed or denied ip range*/
|
|
static gboolean
|
|
ip_range_lookup(GHashTable *ip_table, char *client_ip)
|
|
{
|
|
char ip_range[128] = { 0 };
|
|
char wildcard[128] = { 0 };
|
|
GList *ip_range_table = g_hash_table_get_keys(ip_table);
|
|
GList *l;
|
|
for (l = ip_range_table; l; l = l->next) {
|
|
sscanf(l->data, "%64[0-9.].%s", ip_range, wildcard);
|
|
gchar *pos = NULL;
|
|
if ((pos = strcasestr(client_ip, ip_range))) {
|
|
if(pos == client_ip) {
|
|
return TRUE;
|
|
}
|
|
}
|
|
}
|
|
return FALSE;
|
|
}
|
|
|
|
|
|
NETWORK_MYSQLD_PLUGIN_PROTO(server_con_init)
|
|
{
|
|
network_mysqld_auth_challenge *challenge;
|
|
GString *packet;
|
|
|
|
challenge = network_mysqld_auth_challenge_new();
|
|
challenge->server_version_str = g_strdup("5.7 admin");
|
|
challenge->server_version = 50700;
|
|
challenge->charset = charset_get_number("utf8");
|
|
challenge->capabilities = CETUS_DEFAULT_FLAGS & (~CLIENT_TRANSACTIONS);
|
|
#ifdef HAVE_OPENSSL
|
|
if (chas->ssl) {
|
|
challenge->capabilities |= CLIENT_SSL;
|
|
}
|
|
#endif
|
|
challenge->server_status = SERVER_STATUS_AUTOCOMMIT;
|
|
challenge->thread_id = 1;
|
|
|
|
/* generate a random challenge */
|
|
network_mysqld_auth_challenge_set_challenge(challenge);
|
|
|
|
packet = g_string_new(NULL);
|
|
network_mysqld_proto_append_auth_challenge(packet, challenge);
|
|
con->client->challenge = challenge;
|
|
|
|
network_mysqld_queue_append(con->client, con->client->send_queue, S(packet));
|
|
|
|
g_string_free(packet, TRUE);
|
|
|
|
con->state = ST_SEND_HANDSHAKE;
|
|
|
|
/* status code of parser */
|
|
con->plugin_con_state = g_new0(int, 1);
|
|
|
|
return NETWORK_SOCKET_SUCCESS;
|
|
}
|
|
|
|
NETWORK_MYSQLD_PLUGIN_PROTO(server_read_auth) {
|
|
network_packet packet;
|
|
network_socket *recv_sock, *send_sock;
|
|
network_mysqld_auth_response *auth;
|
|
GString *excepted_response;
|
|
GString *hashed_pwd;
|
|
|
|
recv_sock = con->client;
|
|
send_sock = con->client;
|
|
|
|
packet.data = g_queue_peek_head(recv_sock->recv_queue->chunks);
|
|
if (packet.data == NULL) {
|
|
g_critical("%s: packet.data is nil", G_STRLOC);
|
|
return NETWORK_SOCKET_ERROR;
|
|
}
|
|
|
|
packet.offset = 0;
|
|
|
|
/* decode the packet */
|
|
network_mysqld_proto_skip_network_header(&packet);
|
|
|
|
if (con->client->response == NULL) {
|
|
auth = network_mysqld_auth_response_new(con->client->challenge->capabilities);
|
|
if (network_mysqld_proto_get_auth_response(&packet, auth)) {
|
|
network_mysqld_auth_response_free(auth);
|
|
return NETWORK_SOCKET_ERROR;
|
|
}
|
|
if (!(auth->client_capabilities & CLIENT_PROTOCOL_41)) {
|
|
/* should use packet-id 0 */
|
|
network_mysqld_queue_append(con->client, con->client->send_queue,
|
|
C("\xff\xd7\x07" "4.0 protocol is not supported"));
|
|
network_mysqld_auth_response_free(auth);
|
|
return NETWORK_SOCKET_ERROR;
|
|
}
|
|
|
|
#ifdef HAVE_OPENSSL
|
|
if (auth->ssl_request) {
|
|
if (network_ssl_create_connection(con->client, NETWORK_SSL_SERVER) == FALSE) {
|
|
network_mysqld_con_send_error_full(con->client, C("SSL server failed"), 1045, "28000");
|
|
network_mysqld_auth_response_free(auth);
|
|
return NETWORK_SOCKET_ERROR;
|
|
} else {
|
|
g_string_free(g_queue_pop_tail(con->client->recv_queue->chunks), TRUE);
|
|
con->state = ST_FRONT_SSL_HANDSHAKE;
|
|
return NETWORK_SOCKET_SUCCESS;
|
|
}
|
|
}
|
|
#endif
|
|
con->client->response = auth;
|
|
|
|
if ((auth->client_capabilities & CLIENT_PLUGIN_AUTH)
|
|
&& (g_strcmp0(auth->auth_plugin_name->str, "mysql_native_password") != 0))
|
|
{
|
|
GString *packet = g_string_new(0);
|
|
network_mysqld_proto_append_auth_switch(packet, "mysql_native_password",
|
|
con->client->challenge->auth_plugin_data);
|
|
network_mysqld_queue_append(con->client, con->client->send_queue, S(packet));
|
|
con->state = ST_SEND_AUTH_RESULT;
|
|
con->auth_result_state = AUTH_SWITCH;
|
|
g_string_free(packet, TRUE);
|
|
g_string_free(g_queue_pop_tail(recv_sock->recv_queue->chunks), TRUE);
|
|
return NETWORK_SOCKET_SUCCESS;
|
|
}
|
|
} else {
|
|
/* auth switch response */
|
|
gsize auth_data_len = packet.data->len - NET_HEADER_SIZE;
|
|
GString *auth_data = g_string_sized_new(calculate_alloc_len(auth_data_len));
|
|
network_mysqld_proto_get_gstr_len(&packet, auth_data_len, auth_data);
|
|
g_string_assign_len(con->client->response->auth_plugin_data, S(auth_data));
|
|
g_string_free(auth_data, TRUE);
|
|
auth = con->client->response;
|
|
}
|
|
|
|
char **client_addr_arr = g_strsplit(con->client->src->name->str, ":", -1);
|
|
char *client_ip = client_addr_arr[0];
|
|
char *client_username = con->client->response->username->str;
|
|
|
|
gboolean can_pass = cetus_acl_verify(con->srv->priv->acl, client_username, client_ip);
|
|
if (!can_pass) {
|
|
char *ip_err_msg = g_strdup_printf("Access denied for user '%s@%s'",
|
|
client_username, client_ip);
|
|
network_mysqld_con_send_error_full(recv_sock, L(ip_err_msg), 1045, "28000");
|
|
g_free(ip_err_msg);
|
|
g_strfreev(client_addr_arr);
|
|
con->state = ST_SEND_ERROR;
|
|
return NETWORK_SOCKET_SUCCESS;
|
|
}
|
|
|
|
g_strfreev(client_addr_arr);
|
|
|
|
/* check if the password matches */
|
|
excepted_response = g_string_new(NULL);
|
|
hashed_pwd = g_string_new(NULL);
|
|
|
|
if (!strleq(S(con->client->response->username),
|
|
con->config->admin_username, strlen(con->config->admin_username))) {
|
|
network_mysqld_con_send_error_full(send_sock, C("unknown user"), 1045, "28000");
|
|
|
|
/* close the connection after we have sent this packet */
|
|
con->state = ST_SEND_ERROR;
|
|
} else if (network_mysqld_proto_password_hash(hashed_pwd,
|
|
con->config->admin_password,
|
|
strlen(con->config->admin_password))) {
|
|
} else if (network_mysqld_proto_password_scramble(excepted_response,
|
|
S(recv_sock->challenge->auth_plugin_data), S(hashed_pwd))) {
|
|
network_mysqld_con_send_error_full(send_sock, C("scrambling failed"), 1045, "28000");
|
|
|
|
/* close the connection after we have sent this packet */
|
|
con->state = ST_SEND_ERROR;
|
|
} else if (!g_string_equal(excepted_response, auth->auth_plugin_data)) {
|
|
network_mysqld_con_send_error_full(send_sock, C("password doesn't match"), 1045, "28000");
|
|
|
|
/* close the connection after we have sent this packet */
|
|
con->state = ST_SEND_ERROR;
|
|
} else {
|
|
network_mysqld_con_send_ok(send_sock);
|
|
|
|
con->state = ST_SEND_AUTH_RESULT;
|
|
}
|
|
|
|
g_string_free(hashed_pwd, TRUE);
|
|
g_string_free(excepted_response, TRUE);
|
|
|
|
g_string_free(g_queue_pop_tail(recv_sock->recv_queue->chunks), TRUE);
|
|
|
|
if (recv_sock->recv_queue->chunks->length > 0) {
|
|
g_warning("%s: client-recv-queue-len = %d", G_STRLOC, recv_sock->recv_queue->chunks->length);
|
|
}
|
|
|
|
return NETWORK_SOCKET_SUCCESS;
|
|
}
|
|
|
|
void *adminParserAlloc(void *(*mallocProc)(size_t));
|
|
void adminParserFree(void*, void (*freeProc)(void*));
|
|
void adminParser(void*, int yymajor, token_t, void*);
|
|
void adminParserTrace(FILE*, char*);
|
|
|
|
static void
|
|
network_read_sql_resp(int G_GNUC_UNUSED fd, short events, void *user_data)
|
|
{
|
|
network_mysqld_con *con = user_data;
|
|
|
|
g_debug("%s: network_read_sql_resp, fd:%d", G_STRLOC, fd);
|
|
cetus_channel_t ch;
|
|
|
|
/* read header first */
|
|
int ret = cetus_read_channel(fd, &ch, sizeof(cetus_channel_t));
|
|
|
|
g_debug("%s: cetus_read_channel channel, fd:%d", G_STRLOC, fd);
|
|
|
|
if (ret == NETWORK_SOCKET_ERROR) {
|
|
con->num_read_pending--;
|
|
if (con->num_read_pending > 0) {
|
|
return;
|
|
} else {
|
|
con->srv->socketpair_mutex = 0;
|
|
network_mysqld_con_send_error(con->client, C("internal error"));
|
|
con->state = ST_SEND_QUERY_RESULT;
|
|
network_mysqld_queue_reset(con->client);
|
|
network_queue_clear(con->client->recv_queue);
|
|
network_mysqld_con_handle(-1, 0, con);
|
|
return;
|
|
}
|
|
} else if (ret == NETWORK_SOCKET_WAIT_FOR_EVENT) {
|
|
return;
|
|
}
|
|
|
|
g_debug("%s: channel command: %u, need to read servers:%d",
|
|
G_STRLOC, ch.basics.command, con->num_read_pending);
|
|
|
|
if (ch.basics.command == CETUS_CMD_ADMIN_RESP) {
|
|
con->num_read_pending--;
|
|
int unread_len = ch.admin_sql_resp_len;
|
|
GString *raw_packet = g_string_sized_new(calculate_alloc_len(unread_len));
|
|
unsigned char *p = raw_packet->str;
|
|
|
|
do {
|
|
int len = recv(fd, p, unread_len, 0);
|
|
if (len > 0) {
|
|
g_debug("%s: resp_len:%d, len:%d, fd:%d",
|
|
G_STRLOC, unread_len, len, fd);
|
|
unread_len = unread_len - len;
|
|
p = p + len;
|
|
} else if (len == 0) {
|
|
g_critical("%s: broken socketpair, resp_len:%d, len:%d, fd:%d",
|
|
G_STRLOC, unread_len, len, fd);
|
|
break;
|
|
} else {
|
|
g_message("%s: resp_len:%d, len:%d, fd:%d",
|
|
G_STRLOC, unread_len, len, fd);
|
|
}
|
|
} while (unread_len > 0);
|
|
|
|
network_socket *sock = network_socket_new();
|
|
g_queue_push_tail(sock->recv_queue_raw->chunks, raw_packet);
|
|
|
|
sock->recv_queue_raw->len += ch.admin_sql_resp_len;
|
|
raw_packet->len = ch.admin_sql_resp_len;
|
|
|
|
network_socket_retval_t ret = network_mysqld_con_get_packet(con->srv, sock);
|
|
|
|
while (ret == NETWORK_SOCKET_SUCCESS) {
|
|
network_packet packet;
|
|
GList *chunk;
|
|
|
|
chunk = sock->recv_queue->chunks->tail;
|
|
packet.data = chunk->data;
|
|
packet.offset = 0;
|
|
|
|
int is_finished = network_mysqld_proto_get_query_result(&packet, con);
|
|
if (is_finished == 1) {
|
|
g_debug("%s: read finished", G_STRLOC);
|
|
break;
|
|
}
|
|
|
|
ret = network_mysqld_con_get_packet(con->srv, sock);
|
|
}
|
|
|
|
if (con->servers == NULL) {
|
|
con->servers = g_ptr_array_new();
|
|
}
|
|
|
|
g_ptr_array_add(con->servers, sock);
|
|
|
|
} else {
|
|
g_critical("%s: not admin sql response command", G_STRLOC);
|
|
con->srv->socketpair_mutex = 0;
|
|
network_mysqld_con_send_error(con->client, C("internal error"));
|
|
con->state = ST_SEND_QUERY_RESULT;
|
|
network_mysqld_queue_reset(con->client);
|
|
network_queue_clear(con->client->recv_queue);
|
|
network_mysqld_con_handle(-1, 0, con);
|
|
return;
|
|
}
|
|
|
|
if (con->num_read_pending == 0) {
|
|
con->srv->socketpair_mutex = 0;
|
|
int len = con->servers->len;
|
|
GPtrArray *recv_queues = g_ptr_array_sized_new(len);
|
|
|
|
/* get all participants' receive queues */
|
|
int i;
|
|
for (i = 0; i < len; i++) {
|
|
network_socket *worker = g_ptr_array_index(con->servers, i);
|
|
g_ptr_array_add(recv_queues, worker->recv_queue);
|
|
}
|
|
|
|
GPtrArray *servers = con->servers;
|
|
con->servers = NULL;
|
|
result_merge_t result;
|
|
result.status = RM_SUCCESS;
|
|
result.detail = NULL;
|
|
g_debug("%s: call admin_resultset_merge", G_STRLOC);
|
|
admin_resultset_merge(con, con->client->send_queue, recv_queues, &result);
|
|
g_debug("%s: call admin_resultset_merge end", G_STRLOC);
|
|
con->state = ST_SEND_QUERY_RESULT;
|
|
network_mysqld_queue_reset(con->client);
|
|
network_queue_clear(con->client->recv_queue);
|
|
|
|
for (i = 0; i < len; i++) {
|
|
network_socket *worker = g_ptr_array_index(servers, i);
|
|
network_socket_free(worker);
|
|
}
|
|
g_ptr_array_free(servers, TRUE);
|
|
|
|
if (!con->data) {
|
|
g_ptr_array_free(recv_queues, TRUE);
|
|
}
|
|
network_mysqld_con_handle(-1, 0, con);
|
|
}
|
|
}
|
|
|
|
|
|
static
|
|
int construct_channel_info(network_mysqld_con *con, char *sql)
|
|
{
|
|
chassis *cycle = con->srv;
|
|
g_message("%s:call construct_channel_info, cetus_process_slot:%d", G_STRLOC, cetus_process_slot);
|
|
cetus_channel_t ch;
|
|
memset(&ch, 0, sizeof(cetus_channel_t));
|
|
ch.basics.command = CETUS_CMD_ADMIN;
|
|
ch.basics.pid = cetus_processes[cetus_process_slot].pid;
|
|
ch.basics.slot = cetus_process_slot;
|
|
ch.basics.fd = cetus_processes[cetus_process_slot].parent_child_channel[0];
|
|
|
|
con->num_read_pending = 0;
|
|
|
|
int len = strlen(sql);
|
|
if (len >= MAX_ADMIN_SQL_LEN) {
|
|
g_message("%s:admin sql is too long:%d, sql:%s", G_STRLOC, len, sql);
|
|
network_mysqld_con_send_error(con->client, C("admin sql is too long"));
|
|
return -1;
|
|
} else {
|
|
strncpy(ch.admin_sql, sql, len);
|
|
g_message("%s:cetus_last_process:%d, ch admin sql:%s",
|
|
G_STRLOC, cetus_last_process, ch.admin_sql);
|
|
if (con->ask_the_given_worker) {
|
|
int index = con->process_index;
|
|
g_message("%s: pass sql info to s:%i pid:%d to:%d", G_STRLOC,
|
|
ch.basics.slot, ch.basics.pid, cetus_processes[index].pid);
|
|
/* TODO: AGAIN */
|
|
cetus_write_channel(cetus_processes[index].parent_child_channel[0],
|
|
&ch, sizeof(cetus_channel_t));
|
|
int fd = cetus_processes[index].parent_child_channel[0];
|
|
g_debug("%s:fd:%d for network_read_sql_resp", G_STRLOC, fd);
|
|
event_set(&(cetus_processes[index].event), fd, EV_READ, network_read_sql_resp, con);
|
|
chassis_event_add_with_timeout(cycle, &(cetus_processes[index].event), NULL);
|
|
con->num_read_pending++;
|
|
g_debug("%s:con num_read_pending:%d", G_STRLOC, con->num_read_pending);
|
|
return 0;
|
|
}
|
|
|
|
int num = cetus_last_process;
|
|
if (con->ask_one_worker) {
|
|
num = 1;
|
|
}
|
|
|
|
int i;
|
|
for (i = 0; i < num; i++) {
|
|
g_message("%s: pass sql info to s:%i pid:%d to:%d", G_STRLOC,
|
|
ch.basics.slot, ch.basics.pid, cetus_processes[i].pid);
|
|
|
|
/* TODO: AGAIN */
|
|
cetus_write_channel(cetus_processes[i].parent_child_channel[0],
|
|
&ch, sizeof(cetus_channel_t));
|
|
int fd = cetus_processes[i].parent_child_channel[0];
|
|
g_debug("%s:fd:%d for network_read_sql_resp", G_STRLOC, fd);
|
|
event_set(&(cetus_processes[i].event), fd, EV_READ, network_read_sql_resp, con);
|
|
chassis_event_add_with_timeout(cycle, &(cetus_processes[i].event), NULL);
|
|
con->num_read_pending++;
|
|
}
|
|
g_debug("%s:con num_read_pending:%d", G_STRLOC, con->num_read_pending);
|
|
|
|
return 0;
|
|
}
|
|
}
|
|
|
|
static void
|
|
network_send_admin_sql_to_workers(int G_GNUC_UNUSED fd, short what, void *user_data)
|
|
{
|
|
network_mysqld_con *con = user_data;
|
|
|
|
if (con->srv->socketpair_mutex) {
|
|
event_set(&con->client->event, con->client->fd, EV_TIMEOUT, network_send_admin_sql_to_workers, con);
|
|
struct timeval timeout = {0, 10000};
|
|
chassis_event_add_with_timeout(con->srv, &con->client->event, &timeout);
|
|
return;
|
|
}
|
|
|
|
con->srv->socketpair_mutex = 1;
|
|
if (construct_channel_info(con, con->orig_sql->str) == -1) {
|
|
con->srv->socketpair_mutex = 0;
|
|
con->state = ST_SEND_QUERY_RESULT;
|
|
network_mysqld_queue_reset(con->client);
|
|
network_queue_clear(con->client->recv_queue);
|
|
network_mysqld_con_handle(-1, 0, con);
|
|
}
|
|
|
|
g_debug("%s: network_send_admin_sql_to_workers, fd:%d", G_STRLOC, fd);
|
|
|
|
}
|
|
|
|
|
|
/**
 * Run one admin SQL statement through the admin lexer and parser.
 *
 * Every token produced by the lexer is fed to adminParser() until the lexer
 * is exhausted or an error is recorded on the connection. Without an error,
 * a TK_SEMI is appended if the statement did not end with one, followed by
 * the end-of-input token 0. With an error, a syntax-error reply is queued
 * for the client, and con->direct_answer is set when
 * con->is_processed_by_subordinate is set.
 *
 * @param con connection the statement belongs to (passed on to the parser)
 * @param sql NUL-terminated statement text
 */
static void visit_parser(network_mysqld_con *con, const char *sql)
{
    admin_clear_error(con);

    /* init lexer & parser */
    yyscan_t scanner;
    adminyylex_init(&scanner);
    YY_BUFFER_STATE buf_state = adminyy_scan_string(sql, scanner);
    void* parser = adminParserAlloc(malloc);
#if 0
    adminParserTrace(stdout, "---ParserTrace: ");
#endif
    int code;
    int last_parsed_token = 0;
    /* Zero-initialised because the token is passed by value to adminParser()
     * below even when the lexer returned no token at all (e.g. empty sql);
     * it used to be indeterminate in that case. */
    token_t token = { 0 };

    while ((code = adminyylex(scanner)) > 0) {
        token.z = adminyyget_text(scanner);
        token.n = adminyyget_leng(scanner);
        adminParser(parser, code, token, con);

        last_parsed_token = code;

        if (admin_get_error(con) != 0) {
            break;
        }
    }

    if (admin_get_error(con) == 0) {
        /* terminate the statement for the parser if the client did not */
        if (last_parsed_token != TK_SEMI) {
            adminParser(parser, TK_SEMI, token, con);
        }
        adminParser(parser, 0, token, con);
    }

    if (admin_get_error(con) != 0) {
        g_message("%s:syntax error", G_STRLOC);
        network_mysqld_con_send_error(con->client,
                                      C("syntax error, 'select help' for usage"));
        if (con->is_processed_by_subordinate) {
            con->direct_answer = 1;
        }
    }

    /* free lexer & parser */
    adminParserFree(parser, free);
    adminyy_delete_buffer(buf_state, scanner);
    adminyylex_destroy(scanner);
}
|
|
|
|
NETWORK_MYSQLD_PLUGIN_PROTO(execute_admin_query)
|
|
{
|
|
if (con->config == NULL) {
|
|
con->config = admin_config;
|
|
}
|
|
g_message("%s:call execute_admin_query", G_STRLOC);
|
|
char *sql = con->orig_sql->str;
|
|
|
|
g_message("%s:call execute_admin_query:%s", G_STRLOC, sql);
|
|
|
|
visit_parser(con, sql);
|
|
|
|
return NETWORK_SOCKET_SUCCESS;
|
|
}
|
|
|
|
/**
 * Decode the command packet at the head of the client's receive queue and
 * either answer it directly or forward it to the worker processes.
 *
 * The text after the command byte is copied to con->orig_sql and run through
 * visit_parser(). The statement is answered directly when the parser set
 * con->direct_answer or when there are no worker processes; otherwise it is
 * sent to the workers, or deferred by a timer while the socketpair mutex is
 * taken.
 *
 * @return PROXY_CLIENT_QUIT for COM_QUIT;
 *         PROXY_SEND_QUERY for a packet too short to carry a command (the
 *         caller, server_read_query(), reports that as a packet error);
 *         PROXY_SEND_RESULT when a reply is ready in the client send queue;
 *         PROXY_WAIT_QUERY_RESULT when the answer arrives asynchronously
 */
static network_mysqld_stmt_ret admin_process_query(network_mysqld_con *con)
{
    network_socket *recv_sock = con->client;
    GList *chunk = recv_sock->recv_queue->chunks->head;
    GString *packet = chunk->data;

    /* The packet must hold the header plus the command byte. A header-only
     * packet used to pass this check, and the length handed to
     * g_string_assign_len() below then wrapped around. */
    if (packet->len < NET_HEADER_SIZE + 1) {
        /* packet too short */
        return PROXY_SEND_QUERY;
    }

    char command = packet->str[NET_HEADER_SIZE + 0];

    if (COM_QUIT == command) {
        return PROXY_CLIENT_QUIT;
    } else if (COM_QUERY == command) {
        /* we need some more data after the COM_QUERY */
        if (packet->len < NET_HEADER_SIZE + 2) {
            return PROXY_SEND_QUERY;
        }
    }

    g_string_assign_len(con->orig_sql, packet->str + (NET_HEADER_SIZE + 1),
                        packet->len - (NET_HEADER_SIZE + 1));
    g_message("%s:admin sql:%s", G_STRLOC, con->orig_sql->str);

    /* reset the per-statement routing flags before parsing */
    con->direct_answer = 0;
    con->ask_one_worker = 0;
    con->ask_the_given_worker = 0;
    con->admin_read_merge = 0;

    visit_parser(con, con->orig_sql->str);
    if (con->srv->worker_processes == 0) {
        con->direct_answer = 1;
    }

    if (con->direct_answer) {
        return PROXY_SEND_RESULT;
    }

    if (con->srv->socketpair_mutex) {
        /* another admin request owns the channel: retry from a timer */
        event_set(&con->client->event, con->client->fd, EV_TIMEOUT, network_send_admin_sql_to_workers, con);
        struct timeval timeout = {0, 10000};
        chassis_event_add_with_timeout(con->srv, &con->client->event, &timeout);
        return PROXY_WAIT_QUERY_RESULT;
    }

    con->srv->socketpair_mutex = 1;
    if (construct_channel_info(con, con->orig_sql->str) == -1) {
        con->srv->socketpair_mutex = 0;
        return PROXY_SEND_RESULT;
    }

    return PROXY_WAIT_QUERY_RESULT;
}
|
|
|
|
/**
|
|
* gets called after a query has been read
|
|
*/
|
|
NETWORK_MYSQLD_PLUGIN_PROTO(server_read_query) {
|
|
g_message("%s:call server_read_query", G_STRLOC);
|
|
network_socket *recv_sock;
|
|
network_mysqld_stmt_ret ret;
|
|
|
|
gettimeofday(&(con->req_recv_time), NULL);
|
|
|
|
con->is_admin_client = 1;
|
|
|
|
if (con->srv->worker_processes == 0) {
|
|
con->is_processed_by_subordinate = 0;
|
|
} else {
|
|
con->is_processed_by_subordinate = 1;
|
|
}
|
|
recv_sock = con->client;
|
|
|
|
if (recv_sock->recv_queue->chunks->length != 1) {
|
|
g_message("%s: client-recv-queue-len = %d", G_STRLOC,
|
|
recv_sock->recv_queue->chunks->length);
|
|
}
|
|
|
|
ret = admin_process_query(con);
|
|
|
|
switch (ret) {
|
|
case PROXY_WAIT_QUERY_RESULT:
|
|
return NETWORK_SOCKET_WAIT_FOR_EVENT;
|
|
case PROXY_NO_DECISION:
|
|
network_mysqld_con_send_error(con->client,
|
|
C("request error, \"select * from help\" for usage"));
|
|
con->state = ST_SEND_QUERY_RESULT;
|
|
break;
|
|
case PROXY_SEND_RESULT:
|
|
con->state = ST_SEND_QUERY_RESULT;
|
|
break;
|
|
case PROXY_CLIENT_QUIT:
|
|
con->state = ST_CLIENT_QUIT;
|
|
break;
|
|
default:
|
|
network_mysqld_con_send_error(con->client,
|
|
C("network packet error, closing connection"));
|
|
con->state = ST_SEND_ERROR;
|
|
break;
|
|
}
|
|
|
|
g_string_free(g_queue_pop_tail(recv_sock->recv_queue->chunks), TRUE);
|
|
|
|
return NETWORK_SOCKET_SUCCESS;
|
|
}
|
|
|
|
NETWORK_MYSQLD_PLUGIN_PROTO(server_timeout)
|
|
{
|
|
con->prev_state = con->state;
|
|
con->state = ST_SEND_ERROR;
|
|
|
|
g_debug("%s:call server_timeout", G_STRLOC);
|
|
|
|
return NETWORK_SOCKET_SUCCESS;
|
|
}
|
|
|
|
/**
|
|
* cleanup the admin specific data on the current connection
|
|
*
|
|
* @return NETWORK_SOCKET_SUCCESS
|
|
*/
|
|
NETWORK_MYSQLD_PLUGIN_PROTO(admin_disconnect_client) {
|
|
g_free(con->plugin_con_state);
|
|
con->plugin_con_state = NULL;
|
|
|
|
return NETWORK_SOCKET_SUCCESS;
|
|
}
|
|
|
|
|
|
/**
 * Install the admin plugin's hooks on a connection.
 *
 * @param con connection whose plugins table is filled in
 * @return always 0
 */
static int network_mysqld_server_connection_init(network_mysqld_con *con) {
    /* connection life cycle */
    con->plugins.con_init = server_con_init;
    con->plugins.con_timeout = server_timeout;
    con->plugins.con_cleanup = admin_disconnect_client;

    /* authentication and statements */
    con->plugins.con_read_auth = server_read_auth;
    con->plugins.con_read_query = server_read_query;
    con->plugins.con_exectute_sql = execute_admin_query;

    return 0;
}
|
|
|
|
chassis_plugin_config *config;
|
|
|
|
static chassis_plugin_config *network_mysqld_admin_plugin_new(void)
|
|
{
|
|
config = g_new0(chassis_plugin_config, 1);
|
|
|
|
return config;
|
|
}
|
|
|
|
/**
 * Plugin destroy hook: unregister the admin service and release everything
 * owned by the plugin configuration, including the configuration itself.
 *
 * @param chas   chassis whose config manager holds the service registration
 * @param config configuration to destroy
 */
static void network_mysqld_admin_plugin_free(chassis *chas, chassis_plugin_config *config) {
    /* config->listen_con is left alone on purpose:
     * the socket will be freed by network_mysqld_free() */

    if (config->address != NULL) {
        chassis_config_unregister_service(chas->config_manager, config->address);
        g_free(config->address);
    }

    g_free(config->admin_username);
    g_free(config->admin_password);
    g_free(config->allow_ip);
    g_free(config->deny_ip);

    if (config->admin_stats != NULL) {
        admin_stats_free(config->admin_stats);
    }

    g_free(config);
}
|
|
|
|
gchar*
|
|
show_admin_address(gpointer param) {
|
|
struct external_param *opt_param = (struct external_param *)param;
|
|
gint opt_type = opt_param->opt_type;
|
|
if(CAN_SHOW_OPTS_PROPERTY(opt_type)) {
|
|
return g_strdup_printf("%s", config->address != NULL ? config->address: "NULL");
|
|
}
|
|
if(CAN_SAVE_OPTS_PROPERTY(opt_type)) {
|
|
if(config->address != NULL) {
|
|
return g_strdup_printf("%s", config->address);
|
|
}
|
|
}
|
|
return NULL;
|
|
}
|
|
|
|
gchar*
|
|
show_admin_username(gpointer param) {
|
|
struct external_param *opt_param = (struct external_param *)param;
|
|
gint opt_type = opt_param->opt_type;
|
|
if(CAN_SHOW_OPTS_PROPERTY(opt_type)) {
|
|
return g_strdup_printf("%s", config->admin_username != NULL ? config->admin_username: "NULL");
|
|
}
|
|
if(CAN_SAVE_OPTS_PROPERTY(opt_type)) {
|
|
if(config->admin_username != NULL) {
|
|
return g_strdup_printf("%s", config->admin_username);
|
|
}
|
|
}
|
|
return NULL;
|
|
}
|
|
|
|
gchar*
|
|
show_admin_password(gpointer param) {
|
|
struct external_param *opt_param = (struct external_param *)param;
|
|
gint opt_type = opt_param->opt_type;
|
|
|
|
if(CAN_SHOW_OPTS_PROPERTY(opt_type)) {
|
|
GString* hashed_pwd = g_string_new(0);
|
|
network_mysqld_proto_password_hash(hashed_pwd, L(config->admin_password));
|
|
char* pwdhex = g_malloc0(hashed_pwd->len * 2 + 10);
|
|
bytes_to_hex_str(hashed_pwd->str, hashed_pwd->len, pwdhex);
|
|
g_string_free(hashed_pwd, TRUE);
|
|
return pwdhex;
|
|
}
|
|
if(CAN_SAVE_OPTS_PROPERTY(opt_type)) {
|
|
return g_strdup(config->admin_password);
|
|
}
|
|
return NULL;
|
|
}
|
|
|
|
/**
|
|
* add the proxy specific options to the cmdline interface
|
|
*/
|
|
static GList *
|
|
network_mysqld_admin_plugin_get_options(chassis_plugin_config *config)
|
|
{
|
|
chassis_options_t opts = { 0 };
|
|
|
|
chassis_options_add(&opts, "admin-address",
|
|
0, 0, OPTION_ARG_STRING, &(config->address),
|
|
"listening address:port of the admin-server (default: :4041)", "<host:port>",
|
|
NULL, show_admin_address, SHOW_OPTS_PROPERTY|SAVE_OPTS_PROPERTY);
|
|
|
|
chassis_options_add(&opts, "admin-username",
|
|
0, 0, OPTION_ARG_STRING, &(config->admin_username), "username to allow to log in", "<string>",
|
|
NULL, show_admin_username, SHOW_OPTS_PROPERTY|SAVE_OPTS_PROPERTY);
|
|
|
|
chassis_options_add(&opts, "admin-password",
|
|
0, 0, OPTION_ARG_STRING, &(config->admin_password), "password to allow to log in", "<string>",
|
|
NULL, show_admin_password, SHOW_OPTS_PROPERTY|SAVE_OPTS_PROPERTY);
|
|
|
|
chassis_options_add(&opts, "admin-allow-ip",
|
|
0, 0, OPTION_ARG_STRING, &(config->allow_ip),
|
|
"ip address allowed to connect to admin", "<string>",
|
|
NULL, NULL, SAVE_OPTS_PROPERTY);
|
|
chassis_options_add(&opts, "admin-deny-ip",
|
|
0, 0, OPTION_ARG_STRING, &(config->deny_ip),
|
|
"ip address denyed to connect to admin", "<string>",
|
|
NULL, NULL, SAVE_OPTS_PROPERTY);
|
|
|
|
return opts.options;
|
|
}
|
|
|
|
#define MAX_CMD_OR_PATH_LEN 108
|
|
static void remove_unix_socket_if_stale(chassis *chas)
|
|
{
|
|
char command[MAX_CMD_OR_PATH_LEN];
|
|
|
|
memset(command, 0, MAX_CMD_OR_PATH_LEN);
|
|
|
|
sprintf(command, "netstat -npl|grep '%s'", chas->proxy_address);
|
|
|
|
FILE *p = popen(command, "r");
|
|
|
|
if (p) {
|
|
char result[256];
|
|
int count = fread(result, 1, sizeof(result), p);
|
|
|
|
if (count == 0) {
|
|
g_message("%s:call unlink", G_STRLOC);
|
|
/* no matter if it does not exist */
|
|
unlink(chas->unix_socket_name);
|
|
}
|
|
pclose(p);
|
|
}
|
|
|
|
}
|
|
|
|
static int
|
|
check_allowed_running(chassis *chas)
|
|
{
|
|
char buffer[MAX_CMD_OR_PATH_LEN];
|
|
|
|
if (strlen(chas->proxy_address) > (MAX_CMD_OR_PATH_LEN / 2)) {
|
|
g_message("%s:ip:port string is too long", G_STRLOC);
|
|
return -1;
|
|
}
|
|
|
|
memset(buffer, 0 ,MAX_CMD_OR_PATH_LEN);
|
|
|
|
sprintf(buffer, "/tmp/%s", chas->proxy_address);
|
|
chas->unix_socket_name = g_strdup(buffer);
|
|
|
|
remove_unix_socket_if_stale(chas);
|
|
|
|
int fd;
|
|
|
|
if ((fd = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) {
|
|
g_message("%s:create socket error", G_STRLOC);
|
|
return -1;
|
|
}
|
|
|
|
|
|
struct sockaddr_un un;
|
|
memset(&un, 0, sizeof(un));
|
|
un.sun_family = AF_UNIX;
|
|
const char *name = chas->unix_socket_name;
|
|
if (strlen(name) >= sizeof(un.sun_path)) {
|
|
strncpy(un.sun_path, name, sizeof(un.sun_path) - 1);
|
|
} else {
|
|
strncpy(un.sun_path, name, strlen(name));
|
|
}
|
|
int len = offsetof(struct sockaddr_un, sun_path) + strlen(name);
|
|
|
|
if (bind(fd, (struct sockaddr *)&un, len) < 0) {
|
|
g_message("%s:already running", G_STRLOC);
|
|
close(fd);
|
|
return -1;
|
|
}
|
|
|
|
close(fd);
|
|
return 0;
|
|
}
|
|
|
|
|
|
/**
|
|
* init the plugin with the parsed config
|
|
*/
|
|
static int
|
|
network_mysqld_admin_plugin_apply_config(chassis *chas,
|
|
chassis_plugin_config *config)
|
|
{
|
|
if (config->listen_con) {
|
|
g_message("%s:close listen socket", G_STRLOC);
|
|
config->listen_con->server->event.ev_base = NULL;
|
|
network_socket_free(config->listen_con->server);
|
|
config->listen_con = NULL;
|
|
return 0;
|
|
}
|
|
|
|
g_message("%s:call network_mysqld_admin_plugin_apply_config", G_STRLOC);
|
|
network_mysqld_con *con;
|
|
network_socket *listen_sock;
|
|
|
|
if (!config->address) {
|
|
config->address = g_strdup(":4041");
|
|
}
|
|
|
|
chas->proxy_address = config->address;
|
|
g_message("set admin address for chassis:%s", config->address);
|
|
|
|
#if defined(SO_REUSEPORT)
|
|
if (chas->enable_admin_listen) {
|
|
if (check_allowed_running(chas) == -1) {
|
|
return -1;
|
|
}
|
|
}
|
|
#endif
|
|
|
|
if (!config->admin_username) {
|
|
g_critical("%s: --admin-username needs to be set",
|
|
G_STRLOC);
|
|
return -1;
|
|
}
|
|
if (!config->admin_password) {
|
|
g_critical("%s: --admin-password needs to be set",
|
|
G_STRLOC);
|
|
return -1;
|
|
}
|
|
if (!g_strcmp0(config->admin_password, "")) {
|
|
g_critical("%s: --admin-password cannot be empty",
|
|
G_STRLOC);
|
|
return -1;
|
|
}
|
|
|
|
if (config->allow_ip) {
|
|
cetus_acl_add_rules(chas->priv->acl, ACL_WHITELIST, config->allow_ip);
|
|
}
|
|
|
|
if (config->deny_ip) {
|
|
cetus_acl_add_rules(chas->priv->acl, ACL_BLACKLIST, config->deny_ip);
|
|
}
|
|
con = network_mysqld_con_new();
|
|
con->config = config;
|
|
network_mysqld_add_connection(chas, con, TRUE);
|
|
|
|
/**
|
|
* create a connection handle for the listen socket
|
|
*/
|
|
if (chas->enable_admin_listen) {
|
|
g_message("%s:enable_admin_listen true", G_STRLOC);
|
|
config->listen_con = con;
|
|
listen_sock = network_socket_new();
|
|
con->server = listen_sock;
|
|
}
|
|
|
|
g_message("%s:before set hooks", G_STRLOC);
|
|
|
|
/*
|
|
* set the plugin hooks as we want to apply them to the new
|
|
* connections too later
|
|
*/
|
|
network_mysqld_server_connection_init(con);
|
|
|
|
g_message("%s:after set hooks", G_STRLOC);
|
|
|
|
if (chas->enable_admin_listen) {
|
|
/* FIXME: network_socket_set_address() */
|
|
if (0 != network_address_set_address(listen_sock->dst,
|
|
config->address))
|
|
{
|
|
return -1;
|
|
}
|
|
|
|
if (0 != network_socket_bind(listen_sock, 1)) {
|
|
return -1;
|
|
}
|
|
|
|
/**
|
|
* call network_mysqld_con_accept() with this connection when we are done
|
|
*/
|
|
event_set(&(listen_sock->event), listen_sock->fd,
|
|
EV_READ|EV_PERSIST, network_mysqld_con_accept, con);
|
|
chassis_event_add(chas, &(listen_sock->event));
|
|
}
|
|
|
|
chas->admin_plugin = &(con->plugins);
|
|
|
|
config->has_shard_plugin = has_shard_plugin(chas->modules);
|
|
|
|
chassis_config_register_service(chas->config_manager, config->address, "admin");
|
|
config->admin_stats = admin_stats_init(chas);
|
|
|
|
admin_config = config;
|
|
|
|
return 0;
|
|
}
|
|
|
|
static void
|
|
network_mysqld_admin_plugin_stop_listening(chassis *chas,
|
|
chassis_plugin_config *config)
|
|
{
|
|
g_message("%s:call network_mysqld_admin_plugin_stop_listening", G_STRLOC);
|
|
if (config->listen_con) {
|
|
g_message("%s:close listen socket:%d", G_STRLOC, config->listen_con->server->fd);
|
|
network_socket_free(config->listen_con->server);
|
|
config->listen_con = NULL;
|
|
}
|
|
}
|
|
|
|
|
|
/**
 * Exported module entry point: describe the "admin" plugin and register its
 * life-cycle callbacks.
 *
 * @param p plugin descriptor to fill in
 * @return always 0
 */
G_MODULE_EXPORT int plugin_init(chassis_plugin *p) {
    /* identity */
    p->magic = CHASSIS_PLUGIN_MAGIC;
    p->name = g_strdup("admin");
    p->version = g_strdup(PLUGIN_VERSION);

    /* life-cycle callbacks, from creation to destruction */
    p->init = network_mysqld_admin_plugin_new;
    p->get_options = network_mysqld_admin_plugin_get_options;
    p->apply_config = network_mysqld_admin_plugin_apply_config;
    p->stop_listening = network_mysqld_admin_plugin_stop_listening;
    p->destroy = network_mysqld_admin_plugin_free;

    return 0;
}
|