cetus/plugins/admin/admin-plugin.c

930 lines
29 KiB
C
Raw Normal View History

2018-03-14 11:27:27 +08:00
/* $%BEGINLICENSE%$
Copyright (c) 2007, 2012, Oracle and/or its affiliates. All rights reserved.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License as
published by the Free Software Foundation; version 2 of the
License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
02110-1301 USA
$%ENDLICENSE%$ */
2018-03-21 14:50:34 +08:00
#ifndef _GNU_SOURCE
2018-03-15 09:49:40 +08:00
#define _GNU_SOURCE
2018-03-21 14:50:34 +08:00
#endif
#include "admin-plugin.h"
2018-03-06 14:00:39 +08:00
#include <ctype.h>
#include <errno.h>
#include <fcntl.h>
#include <stdlib.h>
#include <string.h>
#include "cetus-users.h"
#include "cetus-util.h"
#include "cetus-variable.h"
#include "character-set.h"
#include "chassis-event.h"
#include "chassis-options.h"
#include "cetus-monitor.h"
#include "network-mysqld-packet.h"
#include "network-mysqld-proto.h"
#include "server-session.h"
#include "sys-pedantic.h"
#include "network-ssl.h"
#include "chassis-options-utils.h"
2018-07-04 14:13:50 +08:00
#include "cetus-channel.h"
#include "cetus-process-cycle.h"
2018-07-17 10:52:20 +08:00
#include "resultset_merge.h"
2018-08-21 19:04:33 +08:00
#include "cetus-acl.h"
2018-03-06 14:00:39 +08:00
#include "admin-lexer.l.h"
2018-06-05 11:38:19 +08:00
#include "admin-parser.y.h"
#include "admin-commands.h"
#include "admin-stats.h"
2018-03-06 14:00:39 +08:00
/* Configuration of the admin plugin. NULL until apply_config stores the parsed
 * config here; execute_admin_query() falls back to it when a connection
 * carries no config of its own. */
chassis_plugin_config *admin_config = NULL;
2018-03-06 14:00:39 +08:00
/* get config->has_shard_plugin */
2018-03-20 14:19:44 +08:00
static gboolean
has_shard_plugin(GPtrArray *modules)
2018-03-06 14:00:39 +08:00
{
int i;
for (i = 0; i < modules->len; i++) {
chassis_plugin *plugin = modules->pdata[i];
if (strcmp(plugin->name, "shard") == 0) {
return TRUE;
}
}
return FALSE;
}
/**
 * Check whether client_ip starts with one of the address prefixes stored as
 * keys of ip_table.
 *
 * Each key is scanned for its leading run of digits and dots (for example the
 * "192.168.1." of "192.168.1.*"); the client matches when that run is a
 * prefix of client_ip.
 *
 * @param ip_table  hash table whose keys are ip-range strings
 * @param client_ip textual address of the connecting client
 * @return TRUE on the first matching key, FALSE when no key matches
 */
static gboolean
ip_range_lookup(GHashTable *ip_table, char *client_ip)
{
    char ip_range[128] = { 0 };
    char wildcard[128] = { 0 };
    gboolean matched = FALSE;
    /* g_hash_table_get_keys() returns a list container owned by the caller:
     * the keys stay with the table, the list itself needs g_list_free() */
    GList *ip_range_table = g_hash_table_get_keys(ip_table);
    GList *l;

    for (l = ip_range_table; l; l = l->next) {
        /* field widths keep both conversions inside their 128-byte buffers */
        sscanf(l->data, "%64[0-9.].%127s", ip_range, wildcard);
        /* strcasestr() yields the first occurrence, so a hit at offset 0 means "is a prefix" */
        if (strcasestr(client_ip, ip_range) == client_ip) {
            matched = TRUE;
            break;
        }
    }
    g_list_free(ip_range_table);
    return matched;
}
2018-03-20 14:19:44 +08:00
NETWORK_MYSQLD_PLUGIN_PROTO(server_con_init)
{
2018-03-06 14:00:39 +08:00
network_mysqld_auth_challenge *challenge;
GString *packet;
challenge = network_mysqld_auth_challenge_new();
challenge->server_version_str = g_strdup("5.7 admin");
challenge->server_version = 50700;
challenge->charset = charset_get_number("utf8");
2018-05-16 17:11:23 +08:00
challenge->capabilities = CETUS_DEFAULT_FLAGS & (~CLIENT_TRANSACTIONS);
#ifdef HAVE_OPENSSL
if (chas->ssl) {
challenge->capabilities |= CLIENT_SSL;
}
#endif
challenge->server_status = SERVER_STATUS_AUTOCOMMIT;
challenge->thread_id = 1;
2018-03-06 14:00:39 +08:00
/* generate a random challenge */
network_mysqld_auth_challenge_set_challenge(challenge);
packet = g_string_new(NULL);
network_mysqld_proto_append_auth_challenge(packet, challenge);
con->client->challenge = challenge;
network_mysqld_queue_append(con->client, con->client->send_queue, S(packet));
g_string_free(packet, TRUE);
con->state = ST_SEND_HANDSHAKE;
/* status code of parser */
con->plugin_con_state = g_new0(int, 1);
2018-03-06 14:00:39 +08:00
return NETWORK_SOCKET_SUCCESS;
}
NETWORK_MYSQLD_PLUGIN_PROTO(server_read_auth) {
2018-03-06 14:00:39 +08:00
network_packet packet;
network_socket *recv_sock, *send_sock;
network_mysqld_auth_response *auth;
GString *excepted_response;
GString *hashed_pwd;
2018-03-06 14:00:39 +08:00
recv_sock = con->client;
send_sock = con->client;
2018-03-06 14:00:39 +08:00
packet.data = g_queue_peek_head(recv_sock->recv_queue->chunks);
if (packet.data == NULL) {
g_critical("%s: packet.data is nil", G_STRLOC);
return NETWORK_SOCKET_ERROR;
}
2018-03-06 14:00:39 +08:00
packet.offset = 0;
2018-03-06 14:00:39 +08:00
/* decode the packet */
network_mysqld_proto_skip_network_header(&packet);
if (con->client->response == NULL) {
auth = network_mysqld_auth_response_new(con->client->challenge->capabilities);
if (network_mysqld_proto_get_auth_response(&packet, auth)) {
network_mysqld_auth_response_free(auth);
return NETWORK_SOCKET_ERROR;
}
if (!(auth->client_capabilities & CLIENT_PROTOCOL_41)) {
/* should use packet-id 0 */
network_mysqld_queue_append(con->client, con->client->send_queue,
C("\xff\xd7\x07" "4.0 protocol is not supported"));
network_mysqld_auth_response_free(auth);
return NETWORK_SOCKET_ERROR;
}
2018-03-06 14:00:39 +08:00
#ifdef HAVE_OPENSSL
if (auth->ssl_request) {
if (network_ssl_create_connection(con->client, NETWORK_SSL_SERVER) == FALSE) {
network_mysqld_con_send_error_full(con->client, C("SSL server failed"), 1045, "28000");
network_mysqld_auth_response_free(auth);
return NETWORK_SOCKET_ERROR;
} else {
g_string_free(g_queue_pop_tail(con->client->recv_queue->chunks), TRUE);
con->state = ST_FRONT_SSL_HANDSHAKE;
return NETWORK_SOCKET_SUCCESS;
}
}
#endif
con->client->response = auth;
if ((auth->client_capabilities & CLIENT_PLUGIN_AUTH)
&& (g_strcmp0(auth->auth_plugin_name->str, "mysql_native_password") != 0))
{
GString *packet = g_string_new(0);
network_mysqld_proto_append_auth_switch(packet, "mysql_native_password",
con->client->challenge->auth_plugin_data);
network_mysqld_queue_append(con->client, con->client->send_queue, S(packet));
con->state = ST_SEND_AUTH_RESULT;
con->auth_result_state = AUTH_SWITCH;
g_string_free(packet, TRUE);
g_string_free(g_queue_pop_tail(recv_sock->recv_queue->chunks), TRUE);
return NETWORK_SOCKET_SUCCESS;
2018-03-28 17:05:12 +08:00
}
} else {
/* auth switch response */
gsize auth_data_len = packet.data->len - 4;
GString *auth_data = g_string_sized_new(auth_data_len);
network_mysqld_proto_get_gstr_len(&packet, auth_data_len, auth_data);
g_string_assign_len(con->client->response->auth_plugin_data, S(auth_data));
g_string_free(auth_data, TRUE);
auth = con->client->response;
}
2018-03-06 14:00:39 +08:00
2018-08-21 19:04:33 +08:00
/* check if the password matches */
excepted_response = g_string_new(NULL);
hashed_pwd = g_string_new(NULL);
if (!strleq(S(con->client->response->username),
con->config->admin_username, strlen(con->config->admin_username))) {
network_mysqld_con_send_error_full(send_sock, C("unknown user"), 1045, "28000");
/* close the connection after we have sent this packet */
2018-03-06 14:00:39 +08:00
con->state = ST_SEND_ERROR;
2018-08-21 19:04:33 +08:00
} else if (network_mysqld_proto_password_hash(hashed_pwd,
con->config->admin_password,
strlen(con->config->admin_password))) {
} else if (network_mysqld_proto_password_scramble(excepted_response,
S(recv_sock->challenge->auth_plugin_data), S(hashed_pwd))) {
network_mysqld_con_send_error_full(send_sock, C("scrambling failed"), 1045, "28000");
/* close the connection after we have sent this packet */
con->state = ST_SEND_ERROR;
} else if (!g_string_equal(excepted_response, auth->auth_plugin_data)) {
network_mysqld_con_send_error_full(send_sock, C("password doesn't match"), 1045, "28000");
2018-03-06 14:00:39 +08:00
2018-08-21 19:04:33 +08:00
/* close the connection after we have sent this packet */
con->state = ST_SEND_ERROR;
} else {
network_mysqld_con_send_ok(send_sock);
2018-03-06 14:00:39 +08:00
2018-08-21 19:04:33 +08:00
con->state = ST_SEND_AUTH_RESULT;
}
2018-08-21 19:04:33 +08:00
g_string_free(hashed_pwd, TRUE);
g_string_free(excepted_response, TRUE);
2018-03-06 14:00:39 +08:00
g_string_free(g_queue_pop_tail(recv_sock->recv_queue->chunks), TRUE);
if (recv_sock->recv_queue->chunks->length > 0) {
2018-03-20 14:19:44 +08:00
g_warning("%s: client-recv-queue-len = %d", G_STRLOC, recv_sock->recv_queue->chunks->length);
}
2018-03-06 14:00:39 +08:00
return NETWORK_SOCKET_SUCCESS;
}
/*
 * Entry points of the admin SQL parser used by visit_parser(). Their
 * definitions are not in this file (presumably generated from the
 * admin-parser grammar -- confirm).
 */
/* create a parser instance; visit_parser() passes malloc as the allocator */
void *adminParserAlloc(void *(*mallocProc)(size_t));
/* destroy a parser instance; visit_parser() passes free as the deallocator */
void adminParserFree(void*, void (*freeProc)(void*));
/* feed one token (yymajor) to the parser; visit_parser() passes 0 after the last token */
void adminParser(void*, int yymajor, token_t, void*);
/* tracing hook; only referenced from a disabled "#if 0" section in this file */
void adminParserTrace(FILE*, char*);
2018-03-06 14:00:39 +08:00
2018-07-16 18:37:36 +08:00
static void
2018-07-17 10:52:20 +08:00
network_read_sql_resp(int G_GNUC_UNUSED fd, short events, void *user_data)
2018-07-16 18:37:36 +08:00
{
2018-07-17 10:52:20 +08:00
network_mysqld_con *con = user_data;
g_debug("%s: network_read_sql_resp, fd:%d", G_STRLOC, fd);
2018-07-25 14:49:28 +08:00
cetus_channel_t ch;
2018-07-17 10:52:20 +08:00
/* read header first */
int ret = cetus_read_channel(fd, &ch, sizeof(cetus_channel_t));
g_debug("%s: cetus_read_channel channel, fd:%d", G_STRLOC, fd);
if (ret == NETWORK_SOCKET_ERROR) {
return;
}
if (ret == NETWORK_SOCKET_WAIT_FOR_EVENT) {
return;
}
g_debug("%s: channel command: %u, need to read servers:%d",
G_STRLOC, ch.basics.command, con->num_read_pending);
if (ch.basics.command == CETUS_CMD_ADMIN_RESP) {
con->num_read_pending--;
2018-07-26 10:11:23 +08:00
int unread_len = ch.admin_sql_resp_len;
GString *raw_packet = g_string_sized_new(unread_len);
unsigned char *p = raw_packet->str;
do {
2018-08-21 14:13:07 +08:00
int len = recv(fd, p, unread_len, 0);
2018-07-26 10:11:23 +08:00
if (len > 0) {
g_debug("%s: resp_len:%d, len:%d, fd:%d",
G_STRLOC, unread_len, len, fd);
unread_len = unread_len - len;
p = p + len;
} else if (len == 0) {
g_critical("%s: broken socketpair, resp_len:%d, len:%d, fd:%d",
G_STRLOC, unread_len, len, fd);
break;
} else {
g_message("%s: resp_len:%d, len:%d, fd:%d",
G_STRLOC, unread_len, len, fd);
}
} while (unread_len > 0);
2018-07-17 10:52:20 +08:00
network_socket *sock = network_socket_new();
2018-07-25 14:49:28 +08:00
g_queue_push_tail(sock->recv_queue_raw->chunks, raw_packet);
2018-07-17 10:52:20 +08:00
2018-07-26 10:11:23 +08:00
sock->recv_queue_raw->len += ch.admin_sql_resp_len;
raw_packet->len = ch.admin_sql_resp_len;
2018-07-17 10:52:20 +08:00
network_socket_retval_t ret = network_mysqld_con_get_packet(con->srv, sock);
while (ret == NETWORK_SOCKET_SUCCESS) {
network_packet packet;
GList *chunk;
chunk = sock->recv_queue->chunks->tail;
packet.data = chunk->data;
packet.offset = 0;
int is_finished = network_mysqld_proto_get_query_result(&packet, con);
if (is_finished == 1) {
g_debug("%s: read finished", G_STRLOC);
break;
}
ret = network_mysqld_con_get_packet(con->srv, sock);
}
if (con->servers == NULL) {
con->servers = g_ptr_array_new();
}
g_ptr_array_add(con->servers, sock);
} else {
2018-07-26 10:11:23 +08:00
g_critical("%s: not admin sql response command", G_STRLOC);
return;
2018-07-17 10:52:20 +08:00
}
if (con->num_read_pending == 0) {
int len = con->servers->len;
GPtrArray *recv_queues = g_ptr_array_sized_new(len);
/* get all participants' receive queues */
int i;
for (i = 0; i < len; i++) {
network_socket *worker = g_ptr_array_index(con->servers, i);
g_ptr_array_add(recv_queues, worker->recv_queue);
}
/*TODO free all servers here */
2018-07-26 10:11:23 +08:00
GPtrArray *servers = con->servers;
2018-07-17 10:52:20 +08:00
con->servers = NULL;
result_merge_t result;
result.status = RM_SUCCESS;
result.detail = NULL;
g_debug("%s: call admin_resultset_merge", G_STRLOC);
admin_resultset_merge(con, con->client->send_queue, recv_queues, &result);
g_debug("%s: call admin_resultset_merge end", G_STRLOC);
2018-07-17 10:52:20 +08:00
con->state = ST_SEND_QUERY_RESULT;
network_mysqld_queue_reset(con->client);
network_queue_clear(con->client->recv_queue);
2018-07-26 10:11:23 +08:00
for (i = 0; i < len; i++) {
network_socket *worker = g_ptr_array_index(servers, i);
network_socket_free(worker);
}
g_ptr_array_free(servers, TRUE);
2018-07-17 10:52:20 +08:00
network_mysqld_con_handle(-1, 0, con);
}
2018-07-16 18:37:36 +08:00
}
2018-07-04 14:13:50 +08:00
static
2018-08-06 14:37:13 +08:00
int construct_channel_info(network_mysqld_con *con, char *sql)
2018-03-06 14:00:39 +08:00
{
2018-07-17 10:52:20 +08:00
chassis *cycle = con->srv;
2018-07-24 18:17:20 +08:00
g_message("%s:call construct_channel_info, cetus_process_slot:%d", G_STRLOC, cetus_process_slot);
2018-07-04 14:13:50 +08:00
cetus_channel_t ch;
memset(&ch, 0, sizeof(cetus_channel_t));
2018-07-05 15:19:53 +08:00
ch.basics.command = CETUS_CMD_ADMIN;
ch.basics.pid = cetus_processes[cetus_process_slot].pid;
ch.basics.slot = cetus_process_slot;
2018-07-25 14:49:28 +08:00
ch.basics.fd = cetus_processes[cetus_process_slot].parent_child_channel[0];
2018-07-05 15:19:53 +08:00
2018-07-17 10:52:20 +08:00
con->num_read_pending = 0;
2018-07-05 15:19:53 +08:00
int len = strlen(sql);
if (len >= MAX_ADMIN_SQL_LEN) {
2018-08-06 14:37:13 +08:00
g_message("%s:admin sql is too long:%d, sql:%s", G_STRLOC, len, sql);
network_mysqld_con_send_error(con->client, C("admin sql is too long"));
return -1;
2018-07-05 15:19:53 +08:00
} else {
strncpy(ch.admin_sql, sql, len);
2018-07-19 09:25:06 +08:00
g_message("%s:cetus_last_process:%d, ch admin sql:%s",
G_STRLOC, cetus_last_process, ch.admin_sql);
2018-08-20 15:19:00 +08:00
if (con->ask_the_given_worker) {
int index = con->process_index;
g_message("%s: pass sql info to s:%i pid:%d to:%d", G_STRLOC,
ch.basics.slot, ch.basics.pid, cetus_processes[index].pid);
/* TODO: AGAIN */
cetus_write_channel(cetus_processes[index].parent_child_channel[0],
&ch, sizeof(cetus_channel_t));
int fd = cetus_processes[index].parent_child_channel[0];
g_debug("%s:fd:%d for network_read_sql_resp", G_STRLOC, fd);
event_set(&(cetus_processes[index].event), fd, EV_READ, network_read_sql_resp, con);
chassis_event_add_with_timeout(cycle, &(cetus_processes[index].event), NULL);
con->num_read_pending++;
g_debug("%s:con num_read_pending:%d", G_STRLOC, con->num_read_pending);
return 0;
}
2018-07-19 11:00:33 +08:00
int num = cetus_last_process;
if (con->ask_one_worker) {
num = 1;
}
2018-08-20 15:19:00 +08:00
2018-07-05 15:19:53 +08:00
int i;
2018-07-19 11:00:33 +08:00
for (i = 0; i < num; i++) {
2018-07-16 18:37:36 +08:00
g_message("%s: pass sql info to s:%i pid:%d to:%d", G_STRLOC,
2018-07-05 15:19:53 +08:00
ch.basics.slot, ch.basics.pid, cetus_processes[i].pid);
/* TODO: AGAIN */
2018-07-25 14:49:28 +08:00
cetus_write_channel(cetus_processes[i].parent_child_channel[0],
2018-07-05 15:19:53 +08:00
&ch, sizeof(cetus_channel_t));
2018-07-25 14:49:28 +08:00
int fd = cetus_processes[i].parent_child_channel[0];
g_debug("%s:fd:%d for network_read_sql_resp", G_STRLOC, fd);
2018-07-17 10:52:20 +08:00
event_set(&(cetus_processes[i].event), fd, EV_READ, network_read_sql_resp, con);
2018-07-16 18:37:36 +08:00
chassis_event_add_with_timeout(cycle, &(cetus_processes[i].event), NULL);
2018-07-17 10:52:20 +08:00
con->num_read_pending++;
2018-07-05 15:19:53 +08:00
}
2018-07-24 18:17:20 +08:00
g_debug("%s:con num_read_pending:%d", G_STRLOC, con->num_read_pending);
2018-08-06 14:37:13 +08:00
return 0;
2018-07-05 15:19:53 +08:00
}
2018-07-04 14:13:50 +08:00
}
2018-03-06 14:00:39 +08:00
2018-07-19 11:00:33 +08:00
/**
 * Run one admin statement through the lexer and parser.
 *
 * Tokens produced by the lexer are fed to the parser until the input ends or
 * an error is flagged on the connection. If no error occurred, a TK_SEMI is
 * supplied when the statement did not end with one, followed by the
 * end-of-input token 0. On error a "syntax error" packet is queued for the
 * client, and for admin clients con->direct_answer is set.
 *
 * @param con connection the statement belongs to; passed to the parser
 * @param sql NUL-terminated statement text
 */
static void visit_parser(network_mysqld_con *con, const char *sql)
{
    admin_clear_error(con);

    /* init lexer & parser */
    yyscan_t scanner;

    adminyylex_init(&scanner);

    YY_BUFFER_STATE buf_state = adminyy_scan_string(sql, scanner);
    void* parser = adminParserAlloc(malloc);

#if 0
    adminParserTrace(stdout, "---ParserTrace: ");
#endif

    int code;
    int last_parsed_token = 0;
    token_t token;

    /* the token is handed to the parser even when the lexer yields nothing
     * (e.g. empty input), so it must never be left uninitialised */
    memset(&token, 0, sizeof(token));

    while ((code = adminyylex(scanner)) > 0) {
        token.z = adminyyget_text(scanner);
        token.n = adminyyget_leng(scanner);
        adminParser(parser, code, token, con);

        last_parsed_token = code;

        if (admin_get_error(con) != 0) {
            break;
        }
    }

    if (admin_get_error(con) == 0) {
        /* terminate the statement for the grammar if the client did not */
        if (last_parsed_token != TK_SEMI) {
            adminParser(parser, TK_SEMI, token, con);
        }
        /* token 0 signals end of input */
        adminParser(parser, 0, token, con);
    }

    if (admin_get_error(con) != 0) {
        g_message("%s:syntax error", G_STRLOC);
        network_mysqld_con_send_error(con->client,
                                      C("syntax error, 'select help' for usage"));
        if (con->is_admin_client) {
            con->direct_answer = 1;
        }
    }

    /* free lexer & parser */
    adminParserFree(parser, free);
    adminyy_delete_buffer(buf_state, scanner);
    adminyylex_destroy(scanner);
}
NETWORK_MYSQLD_PLUGIN_PROTO(execute_admin_query)
{
if (con->config == NULL) {
con->config = admin_config;
}
g_message("%s:call execute_admin_query", G_STRLOC);
char *sql = con->orig_sql->str;
g_message("%s:call execute_admin_query:%s", G_STRLOC, sql);
visit_parser(con, sql);
2018-08-20 17:46:43 +08:00
return NETWORK_SOCKET_SUCCESS;
2018-07-04 14:13:50 +08:00
}
/**
 * Decode the client's command packet and start processing it.
 *
 * COM_QUIT is reported as PROXY_CLIENT_QUIT. Otherwise the payload after the
 * command byte is stored in con->orig_sql and parsed. When the parser set
 * con->direct_answer the result is ready to send; if not, the statement is
 * forwarded to the workers through construct_channel_info().
 *
 * @param con connection whose receive queue holds the packet
 * @return PROXY_CLIENT_QUIT, PROXY_SEND_RESULT, PROXY_WAIT_QUERY_RESULT, or
 *         PROXY_SEND_QUERY for a missing or truncated packet
 */
static network_mysqld_stmt_ret admin_process_query(network_mysqld_con *con)
{
    network_socket *recv_sock = con->client;
    GList *chunk = recv_sock->recv_queue->chunks->head;

    if (chunk == NULL) {
        /* nothing queued */
        return PROXY_SEND_QUERY;
    }

    GString *packet = chunk->data;

    /* the command byte sits right after the header, so a header-only packet is
     * too short as well; accepting it would make the SQL length below wrap */
    if (packet->len < NET_HEADER_SIZE + 1) {
        /* packet too short */
        return PROXY_SEND_QUERY;
    }

    char command = packet->str[NET_HEADER_SIZE + 0];

    if (COM_QUIT == command) {
        return PROXY_CLIENT_QUIT;
    } else if (COM_QUERY == command) {
        /* we need some more data after the COM_QUERY */
        if (packet->len < NET_HEADER_SIZE + 2) return PROXY_SEND_QUERY;
    }

    g_string_assign_len(con->orig_sql, packet->str + (NET_HEADER_SIZE + 1),
                        packet->len - (NET_HEADER_SIZE + 1));

    g_message("%s:admin sql:%s", G_STRLOC, con->orig_sql->str);

    /* reset the per-statement routing flags before the parser sets them */
    con->direct_answer = 0;
    con->ask_one_worker = 0;
    con->ask_the_given_worker = 0;
    con->admin_read_merge = 0;

    visit_parser(con, con->orig_sql->str);

    if (con->direct_answer) {
        return PROXY_SEND_RESULT;
    } else {
        if (construct_channel_info(con, con->orig_sql->str) == -1) {
            /* an error packet has already been queued for the client */
            return PROXY_SEND_RESULT;
        } else {
            return PROXY_WAIT_QUERY_RESULT;
        }
    }
}
/**
* gets called after a query has been read
*/
NETWORK_MYSQLD_PLUGIN_PROTO(server_read_query) {
2018-07-04 14:13:50 +08:00
g_message("%s:call server_read_query", G_STRLOC);
2018-03-06 14:00:39 +08:00
network_socket *recv_sock;
network_mysqld_stmt_ret ret;
gettimeofday(&(con->req_recv_time), NULL);
con->is_admin_client = 1;
2018-03-06 14:00:39 +08:00
recv_sock = con->client;
if (recv_sock->recv_queue->chunks->length != 1) {
g_message("%s: client-recv-queue-len = %d", G_STRLOC,
recv_sock->recv_queue->chunks->length);
2018-03-06 14:00:39 +08:00
}
ret = admin_process_query(con);
switch (ret) {
2018-07-16 18:37:36 +08:00
case PROXY_WAIT_QUERY_RESULT:
return NETWORK_SOCKET_WAIT_FOR_EVENT;
2018-03-06 14:00:39 +08:00
case PROXY_NO_DECISION:
network_mysqld_con_send_error(con->client,
C("request error, \"select * from help\" for usage"));
2018-03-06 14:00:39 +08:00
con->state = ST_SEND_QUERY_RESULT;
break;
case PROXY_SEND_RESULT:
con->state = ST_SEND_QUERY_RESULT;
break;
case PROXY_CLIENT_QUIT:
con->state = ST_CLIENT_QUIT;
break;
2018-03-06 14:00:39 +08:00
default:
network_mysqld_con_send_error(con->client,
C("network packet error, closing connection"));
2018-03-06 14:00:39 +08:00
con->state = ST_SEND_ERROR;
break;
}
g_string_free(g_queue_pop_tail(recv_sock->recv_queue->chunks), TRUE);
return NETWORK_SOCKET_SUCCESS;
}
NETWORK_MYSQLD_PLUGIN_PROTO(server_timeout)
2018-06-01 17:06:50 +08:00
{
con->prev_state = con->state;
con->state = ST_SEND_ERROR;
2018-06-01 17:06:50 +08:00
return NETWORK_SOCKET_SUCCESS;
}
2018-03-06 14:00:39 +08:00
/**
* cleanup the admin specific data on the current connection
*
* @return NETWORK_SOCKET_SUCCESS
*/
NETWORK_MYSQLD_PLUGIN_PROTO(admin_disconnect_client) {
g_free(con->plugin_con_state);
2018-03-06 14:00:39 +08:00
con->plugin_con_state = NULL;
return NETWORK_SOCKET_SUCCESS;
}
/**
 * Install the admin plugin's hooks on a connection.
 *
 * @param con connection whose plugin table is filled in
 * @return always 0
 */
static int network_mysqld_server_connection_init(network_mysqld_con *con) {
    /* connection setup and teardown */
    con->plugins.con_init = server_con_init;
    con->plugins.con_cleanup = admin_disconnect_client;
    con->plugins.con_timeout = server_timeout;

    /* authentication and statement handling */
    con->plugins.con_read_auth = server_read_auth;
    con->plugins.con_read_query = server_read_query;
    con->plugins.con_exectute_sql = execute_admin_query;

    return 0;
}
chassis_plugin_config *config;
2018-05-31 18:36:31 +08:00
static chassis_plugin_config *network_mysqld_admin_plugin_new(void)
2018-03-20 14:19:44 +08:00
{
2018-03-06 14:00:39 +08:00
config = g_new0(chassis_plugin_config, 1);
return config;
}
/**
 * Destroy the plugin configuration.
 *
 * Unregisters the admin service for config->address, frees the option strings
 * and the admin statistics, then the configuration itself. The listen
 * connection is not touched here.
 *
 * @param chas   chassis the plugin belongs to
 * @param config configuration to free (this parameter hides the file-level
 *               `config` pointer inside the function)
 */
static void network_mysqld_admin_plugin_free(chassis *chas, chassis_plugin_config *config) {
    /* config->listen_con: the socket will be freed by network_mysqld_free() */

    if (config->address) {
        chassis_config_unregister_service(chas->config_manager, config->address);
        g_free(config->address);
    }

    g_free(config->admin_username);
    g_free(config->admin_password);
    g_free(config->allow_ip);
    g_free(config->deny_ip);

    if (config->admin_stats) admin_stats_free(config->admin_stats);

    /* do not leave the fallback pointer used by execute_admin_query()
     * referring to freed memory */
    if (admin_config == config) {
        admin_config = NULL;
    }

    g_free(config);
}
/**
 * Option callback: render the admin-address value.
 *
 * @param param pointer to a struct external_param; its opt_type selects the mode
 * @return newly allocated string: the address (or "NULL" when unset) for
 *         showing, the address for saving when it is set; NULL otherwise
 */
gchar*
show_admin_address(gpointer param) {
    gint opt_type = ((struct external_param *)param)->opt_type;

    if (CAN_SHOW_OPTS_PROPERTY(opt_type)) {
        return g_strdup(config->address != NULL ? config->address : "NULL");
    }
    if (CAN_SAVE_OPTS_PROPERTY(opt_type) && config->address != NULL) {
        return g_strdup(config->address);
    }
    return NULL;
}
/**
 * Option callback: render the admin-username value.
 *
 * @param param pointer to a struct external_param; its opt_type selects the mode
 * @return newly allocated string: the user name (or "NULL" when unset) for
 *         showing, the user name for saving when it is set; NULL otherwise
 */
gchar*
show_admin_username(gpointer param) {
    gint opt_type = ((struct external_param *)param)->opt_type;

    if (CAN_SHOW_OPTS_PROPERTY(opt_type)) {
        return g_strdup(config->admin_username != NULL ? config->admin_username : "NULL");
    }
    if (CAN_SAVE_OPTS_PROPERTY(opt_type) && config->admin_username != NULL) {
        return g_strdup(config->admin_username);
    }
    return NULL;
}
/**
 * Option callback: render the admin-password value.
 *
 * When showing, the password is never returned in clear text: it is hashed
 * and the hash is returned as a hex string ("NULL" when no password is set,
 * like the other show_admin_*() helpers). When saving, a copy of the
 * configured password is returned.
 *
 * @param param pointer to a struct external_param; its opt_type selects the mode
 * @return newly allocated string, or NULL when neither mode applies or there
 *         is no password to save
 */
gchar*
show_admin_password(gpointer param) {
    struct external_param *opt_param = (struct external_param *)param;
    gint opt_type = opt_param->opt_type;

    if (CAN_SHOW_OPTS_PROPERTY(opt_type)) {
        if (config->admin_password == NULL) {
            /* hashing would call strlen() on a NULL pointer */
            return g_strdup("NULL");
        }

        GString* hashed_pwd = g_string_new(0);

        network_mysqld_proto_password_hash(hashed_pwd, L(config->admin_password));

        /* two hex digits per hashed byte, plus room for the terminator */
        char* pwdhex = g_malloc0(hashed_pwd->len * 2 + 10);

        bytes_to_hex_str(hashed_pwd->str, hashed_pwd->len, pwdhex);
        g_string_free(hashed_pwd, TRUE);
        return pwdhex;
    }
    if (CAN_SAVE_OPTS_PROPERTY(opt_type)) {
        return g_strdup(config->admin_password);
    }
    return NULL;
}
2018-03-06 14:00:39 +08:00
/**
 * Describe the admin plugin's options for the command line / config parser.
 *
 * Each option stores its value directly in the matching field of config.
 *
 * @param config plugin configuration receiving the parsed values
 * @return the option list built by chassis_options_add()
 */
static GList *
network_mysqld_admin_plugin_get_options(chassis_plugin_config *config)
{
    chassis_options_t opts = { 0 };

    /* listen address */
    chassis_options_add(&opts, "admin-address", 0, 0, OPTION_ARG_STRING,
                        &(config->address),
                        "listening address:port of the admin-server (default: :4041)",
                        "<host:port>",
                        NULL, show_admin_address,
                        SHOW_OPTS_PROPERTY|SAVE_OPTS_PROPERTY);

    /* credentials */
    chassis_options_add(&opts, "admin-username", 0, 0, OPTION_ARG_STRING,
                        &(config->admin_username),
                        "username to allow to log in",
                        "<string>",
                        NULL, show_admin_username,
                        SHOW_OPTS_PROPERTY|SAVE_OPTS_PROPERTY);

    chassis_options_add(&opts, "admin-password", 0, 0, OPTION_ARG_STRING,
                        &(config->admin_password),
                        "password to allow to log in",
                        "<string>",
                        NULL, show_admin_password,
                        SHOW_OPTS_PROPERTY|SAVE_OPTS_PROPERTY);

    /* access control lists: saved only, no show callback */
    chassis_options_add(&opts, "admin-allow-ip", 0, 0, OPTION_ARG_STRING,
                        &(config->allow_ip),
                        "ip address allowed to connect to admin",
                        "<string>",
                        NULL, NULL,
                        SAVE_OPTS_PROPERTY);

    chassis_options_add(&opts, "admin-deny-ip", 0, 0, OPTION_ARG_STRING,
                        &(config->deny_ip),
                        "ip address denyed to connect to admin",
                        "<string>",
                        NULL, NULL,
                        SAVE_OPTS_PROPERTY);

    return opts.options;
}
#define MAX_CMD_OR_PATH_LEN 128

/**
 * Remove the unix socket file when nothing appears to listen on the admin
 * address.
 *
 * Runs "netstat -npl|grep '<proxy_address>'"; when the command prints
 * nothing, chas->unix_socket_name is unlinked.
 *
 * @param chas chassis providing proxy_address and unix_socket_name
 */
static void remove_unix_socket_if_stale(chassis *chas)
{
    char command[MAX_CMD_OR_PATH_LEN];
    int written = snprintf(command, sizeof(command),
                           "netstat -npl|grep '%s'", chas->proxy_address);

    if (written < 0 || (size_t)written >= sizeof(command)) {
        /* never run a truncated command line */
        g_message("%s:netstat command is too long", G_STRLOC);
        return;
    }

    FILE *p = popen(command, "r");

    if (p == NULL) {
        return;
    }

    char result[256];
    size_t count = fread(result, 1, sizeof(result), p);

    /* every popen() needs its pclose(): it closes the stream and waits for the child */
    pclose(p);

    if (count == 0) {
        g_message("%s:call unlink", G_STRLOC);
        /* no matter if it does not exist */
        unlink(chas->unix_socket_name);
    }
}
2018-09-28 10:42:29 +08:00
/**
 * Check that no other instance holds the admin address, using a unix socket
 * named "/tmp/<proxy_address>" as the marker.
 *
 * The name is stored in chas->unix_socket_name, a stale socket file is
 * removed, and a bind() on that path is attempted.
 *
 * @param chas chassis providing proxy_address
 * @return 0 when the bind succeeded, -1 when the address string is too long,
 *         the socket cannot be created or the bind fails
 */
static int
check_allowed_running(chassis *chas)
{
    if (strlen(chas->proxy_address) > (MAX_CMD_OR_PATH_LEN / 2)) {
        g_message("%s:ip:port string is too long", G_STRLOC);
        return -1;
    }

    chas->unix_socket_name = g_strdup_printf("/tmp/%s", chas->proxy_address);
    remove_unix_socket_if_stale(chas);

    int sock_fd = socket(AF_UNIX, SOCK_STREAM, 0);

    if (sock_fd < 0) {
        g_message("%s:create socket error", G_STRLOC);
        return -1;
    }

    struct sockaddr_un addr;

    memset(&addr, 0, sizeof(addr));
    addr.sun_family = AF_UNIX;
    strcpy(addr.sun_path, chas->unix_socket_name);

    int addr_len = offsetof(struct sockaddr_un, sun_path) + strlen(chas->unix_socket_name);
    int rc = 0;

    if (bind(sock_fd, (struct sockaddr *)&addr, addr_len) < 0) {
        g_message("%s:already running", G_STRLOC);
        rc = -1;
    }

    /* the descriptor is closed on both outcomes */
    close(sock_fd);
    return rc;
}
2018-03-06 14:00:39 +08:00
/**
 * Initialise the plugin from the parsed configuration.
 *
 * When config->listen_con is already set, the call only closes that listen
 * socket and returns. Otherwise it defaults the address to ":4041", validates
 * user name and password, registers the allow-ip rules, creates the admin
 * connection with its hooks and, if chas->enable_admin_listen is set, binds
 * the listen socket and registers the accept event.
 *
 * @param chas   chassis the plugin runs in
 * @param config parsed plugin configuration
 * @return 0 on success, -1 on any configuration or socket error
 */
static int
network_mysqld_admin_plugin_apply_config(chassis *chas,
                                         chassis_plugin_config *config)
{
    if (config->listen_con) {
        g_message("%s:close listen socket", G_STRLOC);
        config->listen_con->server->event.ev_base = NULL;
        network_socket_free(config->listen_con->server);
        config->listen_con = NULL;
        return 0;
    }

    g_message("%s:call network_mysqld_admin_plugin_apply_config", G_STRLOC);

    network_mysqld_con *admin_con;
    network_socket *listener = NULL;

    if (config->address == NULL) {
        config->address = g_strdup(":4041");
    }
    chas->proxy_address = config->address;
    g_message("set admin address for chassis:%s", config->address);

#if defined(SO_REUSEPORT)
    if (chas->enable_admin_listen && check_allowed_running(chas) == -1) {
        return -1;
    }
#endif

    if (config->admin_username == NULL) {
        g_critical("%s: --admin-username needs to be set",
                   G_STRLOC);
        return -1;
    }
    if (config->admin_password == NULL) {
        g_critical("%s: --admin-password needs to be set",
                   G_STRLOC);
        return -1;
    }
    if (g_strcmp0(config->admin_password, "") == 0) {
        g_critical("%s: --admin-password cannot be empty",
                   G_STRLOC);
        return -1;
    }

    /* NOTE(review): only allow_ip is handed to the ACL in this function;
     * confirm that config->deny_ip is applied somewhere else */
    cetus_acl_add_rules(chas->priv->acl, ACL_WHITELIST, config->allow_ip);

    admin_con = network_mysqld_con_new();
    admin_con->config = config;
    network_mysqld_add_connection(chas, admin_con, TRUE);

    /**
     * create a connection handle for the listen socket
     */
    if (chas->enable_admin_listen) {
        g_message("%s:enable_admin_listen true", G_STRLOC);
        config->listen_con = admin_con;
        listener = network_socket_new();
        admin_con->server = listener;
    }

    g_message("%s:before set hooks", G_STRLOC);
    /*
     * set the plugin hooks as we want to apply them to the new
     * connections too later
     */
    network_mysqld_server_connection_init(admin_con);
    g_message("%s:after set hooks", G_STRLOC);

    if (chas->enable_admin_listen) {
        /* FIXME: network_socket_set_address() */
        if (network_address_set_address(listener->dst, config->address) != 0) {
            return -1;
        }

        if (network_socket_bind(listener, 1) != 0) {
            return -1;
        }

        /**
         * call network_mysqld_con_accept() with this connection when we are done
         */
        event_set(&(listener->event), listener->fd,
                  EV_READ|EV_PERSIST, network_mysqld_con_accept, admin_con);
        chassis_event_add(chas, &(listener->event));
    }

    chas->admin_plugin = &(admin_con->plugins);
    config->has_shard_plugin = has_shard_plugin(chas->modules);

    chassis_config_register_service(chas->config_manager, config->address, "admin");

    config->admin_stats = admin_stats_init(chas);
    /* publish the config for connections that have none of their own */
    admin_config = config;

    return 0;
}
2018-07-31 10:10:30 +08:00
/**
 * Close the admin listen socket, if there is one.
 *
 * @param chas   chassis the plugin runs in (not used)
 * @param config plugin configuration holding listen_con
 */
static void
network_mysqld_admin_plugin_stop_listening(chassis *chas,
                                           chassis_plugin_config *config)
{
    g_message("%s:call network_mysqld_admin_plugin_stop_listening", G_STRLOC);

    if (!config->listen_con) {
        return;
    }

    g_message("%s:close listen socket:%d", G_STRLOC, config->listen_con->server->fd);
    network_socket_free(config->listen_con->server);
    config->listen_con = NULL;
}
/**
 * Module entry point: fill in the plugin descriptor for the "admin" plugin.
 *
 * @param p descriptor to populate
 * @return always 0
 */
G_MODULE_EXPORT int plugin_init(chassis_plugin *p) {
    /* identity */
    p->magic = CHASSIS_PLUGIN_MAGIC;
    p->name = g_strdup("admin");
    p->version = g_strdup(PLUGIN_VERSION);

    /* lifecycle callbacks */
    p->init = network_mysqld_admin_plugin_new;
    p->destroy = network_mysqld_admin_plugin_free;

    /* configuration and listening */
    p->get_options = network_mysqld_admin_plugin_get_options;
    p->apply_config = network_mysqld_admin_plugin_apply_config;
    p->stop_listening = network_mysqld_admin_plugin_stop_listening;

    return 0;
}