Fix dup xa id problems when multi-processed

This commit is contained in:
wangbin579 2018-08-15 14:33:23 +08:00
parent ffa21bc963
commit 0183498c1e
14 changed files with 73 additions and 89 deletions

View File

@ -110,11 +110,6 @@ void admin_select_all_backends(network_mysqld_con* admin_con)
field->type = MYSQL_TYPE_STRING;
g_ptr_array_add(fields, field);
field = network_mysqld_proto_fielddef_new();
field->name = g_strdup("uuid");
field->type = MYSQL_TYPE_STRING;
g_ptr_array_add(fields, field);
field = network_mysqld_proto_fielddef_new();
field->name = g_strdup("idle_conns");
field->type = MYSQL_TYPE_STRING;
@ -166,8 +161,6 @@ void admin_select_all_backends(network_mysqld_con* admin_con)
sprintf(buffer, "%d", backend->slave_delay_msec);
g_ptr_array_add(row, (backend->type == BACKEND_TYPE_RO && chas->check_slave_delay == 1) ? g_strdup(buffer) : NULL);
g_ptr_array_add(row, backend->uuid->len ? g_strdup(backend->uuid->str) : NULL);
sprintf(buffer, "%d", backend->pool->cur_idle_connections);
g_ptr_array_add(row, g_strdup(buffer));

View File

@ -1590,8 +1590,9 @@ sharding_parse_groups(GString *default_db, sql_context_t *context, query_stats_t
case STMT_ROLLBACK:
g_ptr_array_free(groups, TRUE);
return USE_PREVIOUS_TRAN_CONNS;
case STMT_COMMON_DDL: /* ddl without comments sent to all */
case STMT_CALL:
return rc;
case STMT_COMMON_DDL: /* ddl without comments sent to all */
shard_conf_get_all_groups(groups);
sharding_plan_add_groups(plan, groups);
g_ptr_array_free(groups, TRUE);

View File

@ -603,6 +603,15 @@ cetus_worker_process_init(cetus_cycle_t *cycle, int worker)
chassis_event_add(cycle, &cetus_channel_event);
g_debug("%s: cetus_channel:%d is waiting for read, event base:%p, ev:%p",
G_STRLOC, cetus_channel, cycle->event_base, &cetus_channel_event);
#ifndef SIMPLE_PARSER
cycle->dist_tran_id = g_random_int_range(0, 100000000);
int master_id = cycle->guid_state.worker_id;
snprintf(cycle->dist_tran_prefix, MAX_DIST_TRAN_PREFIX, "clt-%d-%d", master_id, getpid());
g_message("Initial dist_tran_id:%llu", cycle->dist_tran_id);
g_message("dist_tran_prefix:%s, process id:%d", cycle->dist_tran_prefix, cetus_process_id);
incremental_guid_init(&(cycle->guid_state));
#endif
}

View File

@ -31,6 +31,7 @@ char **cetus_os_argv;
int cetus_process_slot;
int cetus_channel;
int cetus_last_process;
int cetus_process_id;
struct event cetus_channel_event;
cetus_process_t cetus_processes[CETUS_MAX_PROCESSES];
@ -249,6 +250,8 @@ cetus_spawn_process(cetus_cycle_t *cycle, cetus_spawn_proc_pt proc, void *data,
g_message("%s: cetus_last_process add,orig:%d, cetus_processes[s].parent_child_channel[0]:%d",
G_STRLOC, cetus_last_process, cetus_processes[s].parent_child_channel[0]);
cetus_last_process++;
/* TODO may have potential problems when having too many crashes */
cetus_process_id = (cetus_process_id + 1) % MAX_WORK_PROCESSES;
}
return pid;

View File

@ -78,6 +78,7 @@ extern pid_t cetus_parent;
extern int cetus_channel;
extern int cetus_process_slot;
extern int cetus_last_process;
extern int cetus_process_id;
extern struct event cetus_channel_event;
extern cetus_process_t cetus_processes[CETUS_MAX_PROCESSES];

View File

@ -52,6 +52,7 @@
#include "chassis-sql-log.h"
static volatile sig_atomic_t signal_shutdown;
extern int cetus_process_id;
/**
* check if the libevent headers we built against match the
@ -102,8 +103,6 @@ chassis_new()
chas->shutdown_hooks = chassis_shutdown_hooks_new();
incremental_guid_init(&(chas->guid_state));
chas->startup_time = time(0);
chas->pid_file = NULL;
@ -322,18 +321,6 @@ chassis_mainloop(void *_chas)
chas->event_base = mainloop;
g_assert(chas->event_base);
#ifndef SIMPLE_PARSER
chas->dist_tran_id = g_random_int_range(0, 100000000);
int srv_id = g_random_int_range(0, 10000);
if (chas->proxy_address) {
snprintf(chas->dist_tran_prefix, MAX_DIST_TRAN_PREFIX, "clt-%s-%d", chas->proxy_address, srv_id);
} else {
snprintf(chas->dist_tran_prefix, MAX_DIST_TRAN_PREFIX, "clt-%d", srv_id);
}
g_message("Initial dist_tran_id:%llu", chas->dist_tran_id);
g_message("dist_tran_prefix:%s", chas->dist_tran_prefix);
#endif
/*
* drop root privileges if requested
*/
@ -386,29 +373,33 @@ chassis_mainloop(void *_chas)
return 0;
}
#ifndef SIMPLE_PARSER
uint64_t
incremental_guid_get_next(struct incremental_guid_state_t *s)
{
uint64_t uniq_id = 0;
uint64_t cur_time = time(0);
static uint64_t SEQ_MASK = (-1L ^ (-1L << 16L));
static uint64_t SEQ_MASK = (-1L ^ (-1L << 10L));
struct timeval tp;
gettimeofday(&tp, NULL);
unsigned int msec = tp.tv_usec / 1000;
uint64_t cur_time = tp.tv_sec;
uniq_id = cur_time << 32;
uniq_id |= (s->worker_id & 0xff) << 24;
uniq_id |= (msec << 22);
uniq_id |= (s->worker_id & 0xfff) << 10;
if (cur_time == s->last_sec) {
if (cur_time == s->last_sec && msec == s->last_msec) {
s->seq_id = (s->seq_id + 1) & SEQ_MASK;
if (s->seq_id == 0) {
s->rand_id = (s->rand_id + 1) & 0x3ff;
g_message("%s:rand id changed:%llu", G_STRLOC, (unsigned long long)s->rand_id);
g_critical("%s:too many calls in one millisecond", G_STRLOC);
}
} else {
s->seq_id = 0;
s->rand_id = s->init_rand_id;
}
s->last_sec = cur_time;
uniq_id |= s->rand_id << 16;
s->last_msec = msec;
uniq_id |= s->seq_id;
return uniq_id;
@ -419,14 +410,12 @@ incremental_guid_init(struct incremental_guid_state_t *s)
{
struct timeval tp;
gettimeofday(&tp, NULL);
unsigned int seed = tp.tv_usec;
if (s->worker_id == 0) {
s->worker_id = (int)((rand_r(&seed) / (RAND_MAX + 1.0)) * 64);
}
s->rand_id = (int)((rand_r(&seed) / (RAND_MAX + 1.0)) * 1024);
s->init_rand_id = s->rand_id;
s->last_sec = time(0);
s->worker_id = (s->worker_id << MAX_WORK_PROCESSES_SHIFT) + cetus_process_id;
g_message("internal worker id:%d", s->worker_id);
s->last_sec = tp.tv_sec;
s->last_msec = tp.tv_usec;
s->seq_id = 0;
}
#endif

View File

@ -53,6 +53,8 @@ typedef struct chassis chassis;
#define MAX_SERVER_NUM 64
#define MAX_SERVER_NUM_FOR_PREPARE 32
#define MAX_WORK_PROCESSES 64
#define MAX_WORK_PROCESSES_SHIFT 6
#define MAX_QUERY_TIME 1000
#define MAX_WAIT_TIME 1024
#define MAX_TRY_NUM 6
@ -88,17 +90,18 @@ typedef struct query_stats_t {
rw_op_t server_query_details[MAX_SERVER_NUM];
} query_stats_t;
#ifndef SIMPLE_PARSER
/* For generating unique global ids for MySQL */
struct incremental_guid_state_t {
unsigned int last_sec;
unsigned int last_msec;
int worker_id;
int rand_id;
int init_rand_id;
int seq_id;
};
void incremental_guid_init(struct incremental_guid_state_t *s);
uint64_t incremental_guid_get_next(struct incremental_guid_state_t *s);
#endif
struct chassis {
struct event_base *event_base;
@ -176,7 +179,10 @@ struct chassis {
query_stats_t query_stats;
#ifndef SIMPLE_PARSER
struct incremental_guid_state_t guid_state;
#endif
time_t startup_time;
time_t current_time;
struct chassis_options_t *options;

View File

@ -516,8 +516,8 @@ assign_worker_processes(const gchar *newval, gpointer param) {
if (try_get_int_value(newval, &value)) {
if (value <= 0) {
srv->worker_processes = 1;
} else if (value > 64) {
srv->worker_processes = 64;
} else if (value > MAX_WORK_PROCESSES) {
srv->worker_processes = MAX_WORK_PROCESSES;
} else {
srv->worker_processes = value;
}
@ -708,6 +708,7 @@ assign_max_header_size(const gchar *newval, gpointer param) {
return ret;
}
#ifndef SIMPLE_PARSER
gchar*
show_worker_id(gpointer param) {
struct external_param *opt_param = (struct external_param *)param;
@ -721,6 +722,7 @@ show_worker_id(gpointer param) {
}
return NULL;
}
#endif
gchar*
show_disable_threads(gpointer param) {

View File

@ -102,7 +102,9 @@ struct chassis_frontend_t {
int max_resp_len;
int max_alive_time;
int master_preferred;
#ifndef SIMPLE_PARSER
int worker_id;
#endif
int config_port;
int disable_threads;
int is_tcp_stream_enabled;
@ -377,11 +379,13 @@ chassis_frontend_set_chassis_options(struct chassis_frontend_t *frontend, chassi
"set the max header size for tcp streaming", "<integer>",
assign_max_header_size, show_max_header_size, ALL_OPTS_PROPERTY);
#ifndef SIMPLE_PARSER
chassis_options_add(opts,
"worker-id",
0, 0, OPTION_ARG_INT, &(frontend->worker_id),
"Set the worker id and the maximum value allowed is 63 and the min value is 1", "<integer>",
NULL, show_worker_id, SHOW_OPTS_PROPERTY|SAVE_OPTS_PROPERTY);
#endif
chassis_options_add(opts,
"disable-threads",
@ -621,10 +625,11 @@ init_parameters(struct chassis_frontend_t *frontend, chassis *srv)
srv->default_charset = DUP_STRING(frontend->default_charset, NULL);
srv->default_db = DUP_STRING(frontend->default_db, NULL);
frontend->worker_processes = 1;
if (frontend->worker_processes < 1) {
srv->worker_processes = 4;
} else if (frontend->worker_processes > 64) {
srv->worker_processes = 64;
} else if (frontend->worker_processes > MAX_WORK_PROCESSES) {
srv->worker_processes = MAX_WORK_PROCESSES;
} else {
srv->worker_processes = frontend->worker_processes;
}
@ -662,9 +667,20 @@ init_parameters(struct chassis_frontend_t *frontend, chassis *srv)
srv->max_header_size = frontend->max_header_size;
g_message("%s:set max header size:%d", G_STRLOC, srv->max_header_size);
#ifndef SIMPLE_PARSER
if (frontend->worker_id > 0) {
srv->guid_state.worker_id = frontend->worker_id & 0x3f;
} else {
struct timeval tp;
gettimeofday(&tp, NULL);
unsigned int seed = tp.tv_usec;
srv->guid_state.worker_id = (int)((rand_r(&seed) / (RAND_MAX + 1.0)) * 64);
g_warning("%s:please set worker id first, different instances should have different worker ids", G_STRLOC);
g_message("%s: the system chooses worker id automatically although it may have potential conflicts:%d",
G_STRLOC, srv->guid_state.worker_id);
}
#endif
#undef DUP_STRING
srv->client_found_rows = frontend->set_client_found_rows;

View File

@ -52,7 +52,6 @@ network_backend_new()
b = g_new0(network_backend_t, 1);
b->pool = network_connection_pool_new();
b->uuid = g_string_new(NULL);
b->addr = network_address_new();
b->server_group = g_string_new(NULL);
b->address = g_string_new(NULL);
@ -70,7 +69,6 @@ network_backend_free(network_backend_t *b)
network_connection_pool_free(b->pool);
network_address_free(b->addr);
g_string_free(b->uuid, TRUE);
g_string_free(b->server_version, TRUE);
g_string_free(b->server_group, TRUE);

View File

@ -74,9 +74,6 @@ typedef struct {
/**< number of open connections to this backend for SQF */
int connected_clients;
/**< the UUID of the backend */
GString *uuid;
backend_config *config;
time_t last_check_time;

View File

@ -2172,12 +2172,6 @@ normal_result_merge(network_mysqld_con *con)
network_mysqld_con_send_error_full(con->client, C("merge failed"), ER_CETUS_RESULT_MERGE, "HY000");
}
break;
case RM_CALL_FAIL:{
char msg[128] = { 0 };
snprintf(msg, sizeof(msg), "id:%lu '%s' failed", uniq_id, con->orig_sql->str);
network_mysqld_con_send_error_full(con->client, msg, strlen(msg), ER_CETUS_RESULT_MERGE, "HY000");
break;
}
default:
break;
}

View File

@ -355,7 +355,6 @@ typedef enum {
typedef enum {
RM_SUCCESS,
RM_FAIL,
RM_CALL_FAIL
} result_merge_status_t;
typedef struct result_merge_t {

View File

@ -2509,12 +2509,6 @@ merge_for_modify(sql_context_t *context, network_queue *send_queue, GPtrArray *r
case MYSQLD_PACKET_ERR:
network_queue_append(send_queue, pkt);
g_queue_remove(recv_q->chunks, pkt);
if (context && context->stmt_type == STMT_CALL) {
g_message("%s: stored procedure failed", G_STRLOC);
cetus_result_destroy(res_merge);
merged_result->status = RM_CALL_FAIL;
return 0;
}
cetus_result_destroy(res_merge);
merged_result->status = RM_FAIL;
return 0;
@ -2886,7 +2880,7 @@ merge_for_admin(network_queue *send_queue, GPtrArray *recv_queues,
static int
check_fail_met(sql_context_t *context, network_queue *send_queue, GPtrArray *recv_queues,
network_mysqld_con *con, uint64_t *uniq_id, int *call_fail_met, result_merge_t *merged_result)
network_mysqld_con *con, uint64_t *uniq_id, result_merge_t *merged_result)
{
int p;
char *orig_sql = con->orig_sql->str;
@ -2901,23 +2895,14 @@ check_fail_met(sql_context_t *context, network_queue *send_queue, GPtrArray *rec
if (pkt_type == MYSQLD_PACKET_ERR) {
g_warning("%s: failed query:%s, server:%s", G_STRLOC, orig_sql, ss->server->dst->name->str);
}
if (context->stmt_type == STMT_CALL) {
if (!(*call_fail_met)) {
*call_fail_met = 1;
chassis *srv = con->srv;
*uniq_id = incremental_guid_get_next(&(srv->guid_state));
}
log_packet_error_info(con->client, ss->server, orig_sql, pkt, *uniq_id);
continue;
} else {
network_queue_append(send_queue, pkt);
g_queue_pop_head(recv_q->chunks);
if (context->rw_flag & CF_DDL) {
g_warning("%s: failed ddl query:%s, server:%s",
G_STRLOC, orig_sql, ss->server->dst->name->str);
}
return 0;
network_queue_append(send_queue, pkt);
g_queue_pop_head(recv_q->chunks);
if (context->rw_flag & CF_DDL) {
g_warning("%s: failed ddl query:%s, server:%s",
G_STRLOC, orig_sql, ss->server->dst->name->str);
}
return 0;
}
} else {
g_warning("%s: merge failed for con:%p", G_STRLOC, con);
@ -3160,15 +3145,7 @@ resultset_merge(network_queue *send_queue, GPtrArray *recv_queues,
return;
}
int call_fail_met = 0;
if (!check_fail_met(context, send_queue, recv_queues, con, uniq_id, &call_fail_met, merged_result)) {
return;
}
if (call_fail_met) {
g_warning("%s: call failed for con:%p", G_STRLOC, con);
merged_result->status = RM_CALL_FAIL;
if (!check_fail_met(context, send_queue, recv_queues, con, uniq_id, merged_result)) {
return;
}
@ -3199,7 +3176,6 @@ resultset_merge(network_queue *send_queue, GPtrArray *recv_queues,
case STMT_INSERT:
case STMT_UPDATE:
case STMT_DELETE:
case STMT_CALL: /* Response should have no records */
case STMT_SET:
case STMT_START:
case STMT_COMMIT: