mirror of
https://gitee.com/wangbin579/cetus.git
synced 2024-12-02 03:47:41 +08:00
Optimize xa transactions when only one server does write operations
This commit is contained in:
parent
a6228a287f
commit
2889741311
@ -300,7 +300,6 @@ NETWORK_MYSQLD_PLUGIN_PROTO(proxy_read_query)
|
||||
if (con->srv->query_cache_enabled) {
|
||||
shard_plugin_con_t *st = con->plugin_con_state;
|
||||
if (sql_context_is_cacheable(st->sql_context)) {
|
||||
shard_plugin_con_t *st = con->plugin_con_state;
|
||||
if (!con->is_in_transaction && !con->srv->master_preferred &&
|
||||
!(st->sql_context->rw_flag & CF_FORCE_MASTER) && !(st->sql_context->rw_flag & CF_FORCE_SLAVE)) {
|
||||
if (try_to_get_resp_from_query_cache(con)) {
|
||||
@ -1193,6 +1192,7 @@ proxy_get_server_list(network_mysqld_con *con)
|
||||
}
|
||||
}
|
||||
|
||||
con->write_flag = 0;
|
||||
con->use_all_prev_servers = 0;
|
||||
|
||||
query_stats_t *stats = &(con->srv->query_stats);
|
||||
@ -1201,6 +1201,10 @@ proxy_get_server_list(network_mysqld_con *con)
|
||||
|
||||
shard_plugin_con_t *st = con->plugin_con_state;
|
||||
|
||||
if (st->sql_context->rw_flag & CF_WRITE) {
|
||||
con->write_flag = 1;
|
||||
}
|
||||
|
||||
switch (con->parse.command) {
|
||||
case COM_INIT_DB:
|
||||
if (!process_init_db_when_get_server_list(con, plan, &rv, &disp_flag)) {
|
||||
@ -1244,7 +1248,6 @@ proxy_get_server_list(network_mysqld_con *con)
|
||||
con->server_to_be_closed = 0;
|
||||
con->server_closed = 0;
|
||||
con->resp_too_long = 0;
|
||||
con->all_participate_num = 0;
|
||||
|
||||
if (con->last_record_updated || con->srv->master_preferred ||
|
||||
st->sql_context->rw_flag & CF_WRITE ||
|
||||
@ -1418,6 +1421,7 @@ proxy_add_server_connection(network_mysqld_con *con, GString *group, int *server
|
||||
ss->server->last_packet_id = 0;
|
||||
ss->server->parse.qs_state = PARSE_COM_QUERY_INIT;
|
||||
ss->participated = 1;
|
||||
ss->has_xa_write = 0;
|
||||
ss->state = NET_RW_STATE_NONE;
|
||||
ss->fresh = 1;
|
||||
ss->is_xa_over = 0;
|
||||
|
@ -1471,7 +1471,7 @@ record_xa_log_for_mending(network_mysqld_con *con, network_socket *sock)
|
||||
tc_log_info(LOG_WARN, 0, "XA ROLLBACK %s %s@%u failed",
|
||||
con->xid_str, sock->dst->name->str, sock->challenge->thread_id);
|
||||
} else {
|
||||
if (con->servers->len == 1) {
|
||||
if (con->servers->len == 1 || con->write_server_num <= 1) {
|
||||
tc_log_info(LOG_WARN, 0,
|
||||
"XA COMMIT %s %s@%u ONE PHASE failed",
|
||||
con->xid_str, sock->dst->name->str, sock->challenge->thread_id);
|
||||
@ -1788,9 +1788,12 @@ shard_build_xa_query(network_mysqld_con *con, server_session_t *ss)
|
||||
ss->state = NET_RW_STATE_NONE;
|
||||
|
||||
con->is_xa_query_sent = 1;
|
||||
con->all_participate_num++;
|
||||
con->resp_expected_num++;
|
||||
|
||||
if (con->write_flag) {
|
||||
ss->has_xa_write = 1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -1814,7 +1817,7 @@ build_xa_command(network_mysqld_con *con, server_session_t *ss, int end, char *b
|
||||
|
||||
break;
|
||||
case NEXT_ST_XA_PREPARE:
|
||||
if (con->servers->len == 1) {
|
||||
if (con->servers->len == 1 || con->write_server_num <= 1) {
|
||||
snprintf(buffer, XA_CMD_BUF_LEN, "XA COMMIT %s ONE PHASE", con->xid_str);
|
||||
ss->dist_tran_state = NEXT_ST_XA_CANDIDATE_OVER;
|
||||
con->dist_tran_decided = 1;
|
||||
@ -2012,10 +2015,16 @@ build_xa_statements(network_mysqld_con *con)
|
||||
char buffer[XA_BUF_LEN] = { 0 };
|
||||
char *p_buffer = buffer;
|
||||
|
||||
con->write_server_num = 0;
|
||||
|
||||
for (iter = 0; iter < len; iter++) {
|
||||
server_session_t *ss = g_ptr_array_index(con->servers, iter);
|
||||
g_debug("%s: ss %d, xa state:%d for con:%p", G_STRLOC, iter, ss->dist_tran_state, con);
|
||||
|
||||
if (ss->has_xa_write) {
|
||||
con->write_server_num++;
|
||||
}
|
||||
|
||||
if (!con->is_commit_or_rollback && !ss->participated) {
|
||||
g_debug("%s: stop processing for this server:%d", G_STRLOC, iter);
|
||||
continue;
|
||||
|
@ -575,6 +575,7 @@ struct network_mysqld_con {
|
||||
unsigned int last_record_updated:1;
|
||||
unsigned int query_cache_judged:1;
|
||||
unsigned int is_client_compressed:1;
|
||||
unsigned int write_flag:1;
|
||||
unsigned int is_processed_by_subordinate:1;
|
||||
unsigned int is_admin_client:1;
|
||||
unsigned int is_admin_waiting_resp:1;
|
||||
@ -610,7 +611,7 @@ struct network_mysqld_con {
|
||||
unsigned int last_payload_len:4;
|
||||
unsigned int process_index:6;
|
||||
unsigned int last_packet_id:8;
|
||||
unsigned int all_participate_num:8;
|
||||
unsigned int write_server_num:8;
|
||||
|
||||
unsigned long long xa_id;
|
||||
guint32 auth_switch_to_round;
|
||||
@ -693,6 +694,7 @@ typedef enum {
|
||||
typedef struct server_session_t {
|
||||
unsigned int fresh:1;
|
||||
unsigned int participated:1;
|
||||
unsigned int has_xa_write:1;
|
||||
unsigned int xa_start_already_sent:1;
|
||||
unsigned int dist_tran_participated:1;
|
||||
unsigned int xa_query_status_error_and_abort:1;
|
||||
|
Loading…
Reference in New Issue
Block a user