From f58b658d9583313d2ec81f9cc5641266acb2f1ea Mon Sep 17 00:00:00 2001 From: lazio579 Date: Thu, 28 Mar 2019 10:36:53 +0800 Subject: [PATCH] Fix consistency problems --- plugins/proxy/proxy-plugin.c | 46 +++++++++++++++++++++++------------- src/network-injection.h | 15 ++++++------ src/network-mysqld.c | 10 +++----- 3 files changed, 41 insertions(+), 30 deletions(-) diff --git a/plugins/proxy/proxy-plugin.c b/plugins/proxy/proxy-plugin.c index 6ea71b5..e23d416 100644 --- a/plugins/proxy/proxy-plugin.c +++ b/plugins/proxy/proxy-plugin.c @@ -711,7 +711,8 @@ process_non_trans_query(network_mysqld_con *con, sql_context_t *context, mysqld_ } static void -proxy_inject_packet(network_mysqld_con *con, int type, int resp_type, GString *payload, gboolean resultset_is_needed) +proxy_inject_packet(network_mysqld_con *con, int type, int resp_type, GString *payload, + gboolean resultset_is_needed, gboolean is_fast_streamed) { proxy_plugin_con_t *st = con->plugin_con_state; GQueue *q = st->injected.queries; @@ -720,6 +721,7 @@ proxy_inject_packet(network_mysqld_con *con, int type, int resp_type, GString *p inj->ts_read_query = get_timer_microseconds(); } inj->resultset_is_needed = resultset_is_needed; + inj->is_fast_streamed = is_fast_streamed; switch (type) { case PROXY_QUEUE_ADD_APPEND: @@ -805,12 +807,12 @@ adjust_sql_mode(network_mysqld_con *con, mysqld_query_attr_t *query_attr) g_string_append(packet, "SET sql_mode='"); g_string_append_len(packet, con->client->sql_mode->str, con->client->sql_mode->len); g_string_append(packet, "'"); - proxy_inject_packet(con, PROXY_QUEUE_ADD_PREPEND, INJ_ID_CHANGE_SQL_MODE, packet, TRUE); + proxy_inject_packet(con, PROXY_QUEUE_ADD_PREPEND, INJ_ID_CHANGE_SQL_MODE, packet, TRUE, FALSE); } else { GString *packet = g_string_new(NULL); g_string_append_c(packet, (char)COM_QUERY); g_string_append(packet, "SET sql_mode=''"); - proxy_inject_packet(con, PROXY_QUEUE_ADD_PREPEND, INJ_ID_CHANGE_SQL_MODE, packet, TRUE); + proxy_inject_packet(con, PROXY_QUEUE_ADD_PREPEND, INJ_ID_CHANGE_SQL_MODE, packet, TRUE, FALSE); } g_string_assign_len(con->server->sql_mode, con->client->sql_mode->str, con->client->sql_mode->len); @@ -858,7 +860,7 @@ adjust_charset(network_mysqld_con *con, mysqld_query_attr_t *query_attr) g_string_append_c(packet, (char)COM_QUERY); g_string_append(packet, "SET character_set_client = "); g_string_append(packet, con->client->charset_client->str); - proxy_inject_packet(con, PROXY_QUEUE_ADD_PREPEND, INJ_ID_CHAR_SET_CLT, packet, TRUE); + proxy_inject_packet(con, PROXY_QUEUE_ADD_PREPEND, INJ_ID_CHAR_SET_CLT, packet, TRUE, FALSE); } GString *charset_client = con->client->charset_client; g_string_assign_len(con->server->charset_client, charset_client->str, charset_client->len); @@ -875,7 +877,7 @@ adjust_charset(network_mysqld_con *con, mysqld_query_attr_t *query_attr) g_string_append_c(packet, (char)COM_QUERY); g_string_append(packet, "SET character_set_connection = "); g_string_append(packet, con->client->charset_connection->str); - proxy_inject_packet(con, PROXY_QUEUE_ADD_PREPEND, INJ_ID_CHAR_SET_CONN, packet, TRUE); + proxy_inject_packet(con, PROXY_QUEUE_ADD_PREPEND, INJ_ID_CHAR_SET_CONN, packet, TRUE, FALSE); } GString *charset_conn = con->client->charset_connection; g_string_assign_len(con->server->charset_connection, charset_conn->str, charset_conn->len); @@ -892,12 +894,12 @@ adjust_charset(network_mysqld_con *con, mysqld_query_attr_t *query_attr) g_string_append_c(packet, (char)COM_QUERY); g_string_append(packet, "SET character_set_results = "); g_string_append(packet, con->client->charset_results->str); - proxy_inject_packet(con, PROXY_QUEUE_ADD_PREPEND, INJ_ID_CHAR_SET_RESULTS, packet, TRUE); + proxy_inject_packet(con, PROXY_QUEUE_ADD_PREPEND, INJ_ID_CHAR_SET_RESULTS, packet, TRUE, FALSE); } else { GString *packet = g_string_new(NULL); g_string_append_c(packet, (char)COM_QUERY); g_string_append(packet, "SET character_set_results = NULL"); - proxy_inject_packet(con, PROXY_QUEUE_ADD_PREPEND, INJ_ID_CHAR_SET_RESULTS, packet, TRUE); + proxy_inject_packet(con, PROXY_QUEUE_ADD_PREPEND, INJ_ID_CHAR_SET_RESULTS, packet, TRUE, FALSE); } GString *charset_results = con->client->charset_results; @@ -920,7 +922,7 @@ adjust_charset(network_mysqld_con *con, mysqld_query_attr_t *query_attr) g_string_append(packet, charset_str); } - proxy_inject_packet(con, PROXY_QUEUE_ADD_PREPEND, INJ_ID_SET_NAMES, packet, TRUE); + proxy_inject_packet(con, PROXY_QUEUE_ADD_PREPEND, INJ_ID_SET_NAMES, packet, TRUE, FALSE); } return 0; @@ -941,7 +943,7 @@ adjust_default_db(network_mysqld_con *con, enum enum_server_command cmd) g_string_append_c(packet, (char)COM_QUERY); g_string_append(packet, "use "); g_string_append_len(packet, clt_default_db->str, clt_default_db->len); - proxy_inject_packet(con, PROXY_QUEUE_ADD_PREPEND, INJ_ID_CHANGE_DB, packet, TRUE); + proxy_inject_packet(con, PROXY_QUEUE_ADD_PREPEND, INJ_ID_CHANGE_DB, packet, TRUE, FALSE); g_debug("%s: adjust default db", G_STRLOC); } } @@ -954,7 +956,7 @@ reset_connection(network_mysqld_con *con) GString *packet = g_string_new(NULL); g_string_append_c(packet, (char)COM_RESET_CONNECTION); - proxy_inject_packet(con, PROXY_QUEUE_ADD_PREPEND, INJ_ID_RESET_CONNECTION, packet, TRUE); + proxy_inject_packet(con, PROXY_QUEUE_ADD_PREPEND, INJ_ID_RESET_CONNECTION, packet, TRUE, FALSE); con->server->is_in_sess_context = 0; @@ -991,7 +993,7 @@ adjust_user(network_mysqld_con *con) GString *payload = g_string_new(NULL); mysqld_proto_append_change_user_packet(payload, &chuser); - proxy_inject_packet(con, PROXY_QUEUE_ADD_PREPEND, INJ_ID_CHANGE_USER, payload, TRUE); + proxy_inject_packet(con, PROXY_QUEUE_ADD_PREPEND, INJ_ID_CHANGE_USER, payload, TRUE, FALSE); con->server->is_in_sess_context = 0; g_string_free(hashed_password, TRUE); @@ -1011,7 +1013,7 @@ adjust_multi_stmt(network_mysqld_con *con, enum enum_server_command cmd) g_string_append_c(packet, (char)1); } g_string_append_c(packet, (char)0); - proxy_inject_packet(con, PROXY_QUEUE_ADD_PREPEND, INJ_ID_CHANGE_MULTI_STMT, packet, TRUE); + proxy_inject_packet(con, PROXY_QUEUE_ADD_PREPEND, INJ_ID_CHANGE_MULTI_STMT, packet, TRUE, FALSE); g_debug("%s: adjust multi stmt", G_STRLOC); con->server->is_multi_stmt_set = con->client->is_multi_stmt_set; } @@ -1314,6 +1316,7 @@ network_read_query(network_mysqld_con *con, proxy_plugin_con_t *st) con->master_conn_shortaged = 0; con->slave_conn_shortaged = 0; con->use_slave_forced = 0; + con->candidate_fast_streamed = 0; network_injection_queue_reset(st->injected.queries); @@ -1436,16 +1439,26 @@ network_read_query(network_mysqld_con *con, proxy_plugin_con_t *st) switch (command) { case COM_QUERY: if (context->stmt_type == STMT_SELECT && con->server->is_read_only) { - proxy_inject_packet(con, PROXY_QUEUE_ADD_APPEND, INJ_ID_COM_QUERY, payload, FALSE); + if (con->srv->is_fast_stream_enabled) { + if ((!con->srv->sql_mgr) || + (con->srv->sql_mgr->sql_log_switch != ON && con->srv->sql_mgr->sql_log_switch != REALTIME)) + { + proxy_inject_packet(con, PROXY_QUEUE_ADD_APPEND, INJ_ID_COM_QUERY, payload, FALSE, TRUE); + } else { + proxy_inject_packet(con, PROXY_QUEUE_ADD_APPEND, INJ_ID_COM_QUERY, payload, FALSE, FALSE); + } + } else { + proxy_inject_packet(con, PROXY_QUEUE_ADD_APPEND, INJ_ID_COM_QUERY, payload, FALSE, FALSE); + } } else { - proxy_inject_packet(con, PROXY_QUEUE_ADD_APPEND, INJ_ID_COM_QUERY, payload, TRUE); + proxy_inject_packet(con, PROXY_QUEUE_ADD_APPEND, INJ_ID_COM_QUERY, payload, TRUE, FALSE); } break; case COM_STMT_PREPARE: - proxy_inject_packet(con, PROXY_QUEUE_ADD_APPEND, INJ_ID_COM_STMT_PREPARE, payload, TRUE); + proxy_inject_packet(con, PROXY_QUEUE_ADD_APPEND, INJ_ID_COM_STMT_PREPARE, payload, TRUE, FALSE); break; default: - proxy_inject_packet(con, PROXY_QUEUE_ADD_APPEND, INJ_ID_COM_DEFAULT, payload, TRUE); + proxy_inject_packet(con, PROXY_QUEUE_ADD_APPEND, INJ_ID_COM_DEFAULT, payload, TRUE, FALSE); } if (context->stmt_type == STMT_SHOW_WARNINGS && con->last_warning_met) { @@ -1651,6 +1664,7 @@ NETWORK_MYSQLD_PLUGIN_PROTO(proxy_read_query) inj = g_queue_peek_head(st->injected.queries); con->resultset_is_needed = inj->resultset_is_needed; + con->candidate_fast_streamed = inj->is_fast_streamed; send_sock = con->server; diff --git a/src/network-injection.h b/src/network-injection.h index 6285c59..84439af 100644 --- a/src/network-injection.h +++ b/src/network-injection.h @@ -52,25 +52,26 @@ typedef struct { } query_status; typedef struct { - GString *query; - /**< a unique id set by the scripts to map the query to a handler */ int id; + /**< flag to announce if we have to buffer the result for later processing */ + unsigned int resultset_is_needed:1; + unsigned int is_fast_streamed:1; + + GString *query; /* the userdata's need them */ GQueue *result_queue; /**< the data to parse */ - /**< summary information about the query status */ - query_status qstat; - guint64 rows; guint64 bytes; guint64 ts_read_query; guint64 ts_read_query_result_last; - /**< flag to announce if we have to buffer the result for later processing */ - unsigned int resultset_is_needed:1; + /**< summary information about the query status */ + query_status qstat; + } injection; /** diff --git a/src/network-mysqld.c b/src/network-mysqld.c index 258047d..693ffaf 100644 --- a/src/network-mysqld.c +++ b/src/network-mysqld.c @@ -2739,7 +2739,6 @@ process_rw_write(network_mysqld_con *con, network_mysqld_con_state_t ostate, int } } - if (con->server->send_queue->offset == 0) { /* only parse the packets once */ network_packet packet; @@ -4060,12 +4059,9 @@ network_mysqld_read_rw_resp(network_mysqld_con *con, network_socket *server, int server->resp_len += read_len; if (!server->do_compress) { - if (read_len > 0 && !con->resultset_is_needed && con->srv->is_fast_stream_enabled) { - if ((!con->srv->sql_mgr) || - (con->srv->sql_mgr->sql_log_switch != ON && con->srv->sql_mgr->sql_log_switch != REALTIME)) - { - return network_mysqld_process_select_resp(con, server, NULL, disp_flag); - } + if (read_len > 0 && !con->resultset_is_needed && con->candidate_fast_streamed) { + g_debug("%s: visit network_mysqld_process_select_resp for con:%p", G_STRLOC, con); + return network_mysqld_process_select_resp(con, server, NULL, disp_flag); } ret = network_mysqld_con_get_packet(chas, server); } else {