mirror of
https://gitee.com/wangbin579/cetus.git
synced 2024-11-30 10:57:37 +08:00
Merge branch 'master' into multi-process
This commit is contained in:
commit
aaac1cbc0c
@ -146,7 +146,7 @@ Default: : 2 (seconds)
|
||||
|
||||
### proxy-read-timeout
|
||||
|
||||
Default: : 10 (minutes)
|
||||
Default: : 600 (seconds)
|
||||
|
||||
读Proxy的超时时间
|
||||
|
||||
@ -154,7 +154,7 @@ Default: : 10 (minutes)
|
||||
|
||||
### proxy-write-timeout
|
||||
|
||||
Default: : 10 (minutes)
|
||||
Default: : 600 (seconds)
|
||||
|
||||
写Proxy的超时时间
|
||||
|
||||
|
@ -136,6 +136,8 @@ Cetus内置监控功能,可通过配置选择开启或关闭。开启后Cetus
|
||||
|
||||
注释的使用方式为在SQL中SELECT字段之后插入/\*# mode=READWRITE \*/;部分应用可能对数据准确性特别敏感,这种情况下,我们可以设置默认所有请求都走主节点,但是,对于部分后台的统计分析功能,主要分析历史数据时,我们可以通过SQL指定后端到只读节点,来减少批量查询业务对主库的影响,我们可以在SELECT字段后插入/\*# mode=READONLY \*/来指定Cetus将SQL发送到只读从库进行执行。
|
||||
|
||||
**注:若使用注释请在连接Cetus时加上-c参数,如 mysql --prompt="proxy> " --comments -hxxx.xxx.xxx.xxx -Pxxxx -uxxxx -pxxx -c**
|
||||
|
||||
### 5.不支持 Kill query
|
||||
|
||||
不支持在SQL执行过程中 kill query操作,一旦SQL语句开始执行就不能通过这种方式来终止,此时可以连接Cetus管理后端,通过执行 show connectionlist 命令查看正在执行的SQL,从而找到正在执行的后端信息,通过数据库中 kill query的命令进行终止操作。
|
||||
|
@ -331,7 +331,9 @@ Cetus提供注释功能,用以解决日常维护时的需求(DBA同学经常
|
||||
|
||||
Sharding版支持的key类型:table|group|mode|transaction,支持的value包括all/readwrite/readonly/single_node。
|
||||
|
||||
使用示例如下:
|
||||
**注:若使用注释请在连接Cetus时加上-c参数,如 mysql --prompt="proxy> " --comments -hxxx.xxx.xxx.xxx -Pxxxx -uxxxx -pxxx -c**
|
||||
|
||||
注释功能使用示例如下:
|
||||
|
||||
**1.Key类型为table的用法**
|
||||
|
||||
@ -349,12 +351,6 @@ Sharding版支持的key类型:table|group|mode|transaction,支持的value包
|
||||
|
||||
说明:查询后端节点dataA中,表employee的记录数。
|
||||
|
||||
用法:/\*# group=all \*/
|
||||
|
||||
SQL: create /\*# group=all \*/ table employee xxxx;
|
||||
|
||||
说明:在后端所有节点均创建此表。
|
||||
|
||||
**3.Key类型为mode的用法**
|
||||
|
||||
用法:/\*# mode=readwrite \*/
|
||||
|
@ -88,6 +88,12 @@ struct transact_feature_t {
|
||||
int isolation_level;
|
||||
};
|
||||
|
||||
struct idlist_opt_t {
|
||||
sql_id_list_t* list;
|
||||
const char* span_start;
|
||||
const char* span_end;
|
||||
};
|
||||
|
||||
} // end %include
|
||||
|
||||
// Input is a single SQL command
|
||||
@ -825,7 +831,9 @@ insert_stmt ::= insert_cmd(R) INTO fullname(X) idlist_opt(F) select(S). {
|
||||
sql_insert_t* p = sql_insert_new();
|
||||
p->is_replace = R;
|
||||
p->table = X;
|
||||
p->columns = F;
|
||||
p->columns = F.list;
|
||||
p->columns_start = F.span_start;
|
||||
p->columns_end = F.span_end;
|
||||
p->sel_val = S;
|
||||
sql_insert(context, p);
|
||||
}
|
||||
@ -833,7 +841,9 @@ insert_stmt ::= insert_cmd(R) INTO fullname(X) idlist_opt(F) DEFAULT VALUES. {
|
||||
sql_insert_t* p = sql_insert_new();
|
||||
p->is_replace = R;
|
||||
p->table = X;
|
||||
p->columns = F;
|
||||
p->columns = F.list;
|
||||
p->columns_start = F.span_start;
|
||||
p->columns_end = F.span_end;
|
||||
p->sel_val = 0;
|
||||
sql_insert(context, p);
|
||||
}
|
||||
@ -842,7 +852,9 @@ insert_stmt ::= insert_cmd(R) INTO fullname(X) idlist_opt(F) values(S)
|
||||
sql_insert_t* p = sql_insert_new();
|
||||
p->is_replace = R;
|
||||
p->table = X;
|
||||
p->columns = F;
|
||||
p->columns = F.list;
|
||||
p->columns_start = F.span_start;
|
||||
p->columns_end = F.span_end;
|
||||
p->sel_val = S;
|
||||
sql_insert(context, p);
|
||||
}
|
||||
@ -852,13 +864,20 @@ insert_cmd(A) ::= INSERT. {A=0;}
|
||||
insert_cmd(A) ::= REPLACE. {A = 1;}
|
||||
insert_cmd(A) ::= INSERT IGNORE. {A=1;}
|
||||
|
||||
%type idlist_opt {sql_id_list_t*}
|
||||
%destructor idlist_opt {sql_id_list_free($$);}
|
||||
%type idlist_opt {struct idlist_opt_t}
|
||||
%destructor idlist_opt {sql_id_list_free($$.list);}
|
||||
%type idlist {sql_id_list_t*}
|
||||
%destructor idlist {sql_id_list_free($$);}
|
||||
|
||||
idlist_opt(A) ::= . {A = 0;}
|
||||
idlist_opt(A) ::= LP idlist(X) RP. {A = X;}
|
||||
idlist_opt(A) ::= . {
|
||||
A.list = 0;
|
||||
A.span_start = A.span_end = 0;
|
||||
}
|
||||
idlist_opt(A) ::= LP(U) idlist(X) RP(V). {
|
||||
A.list = X;
|
||||
A.span_start = U.z;
|
||||
A.span_end = V.z + V.n;
|
||||
}
|
||||
idlist(A) ::= idlist(A) COMMA nm(Y).
|
||||
{A = sql_id_list_append(A,&Y);}
|
||||
idlist(A) ::= nm(Y).
|
||||
|
@ -1020,7 +1020,9 @@ collate(C) ::= . {C = 0;}
|
||||
collate(C) ::= COLLATE ids. {C = 1;}
|
||||
|
||||
///////////////////////LOCK TABLES///////////////////////////
|
||||
cmd ::= LOCK TABLES lock_tables.
|
||||
cmd ::= LOCK TABLES lock_tables. {
|
||||
context->rw_flag |= CF_WRITE;
|
||||
}
|
||||
lock_tables ::= fullname as lock_type.
|
||||
lock_tables ::= lock_tables COMMA fullname as lock_type.
|
||||
lock_type ::= READ opt_local.
|
||||
@ -1029,4 +1031,6 @@ opt_local ::= LOCAL.
|
||||
opt_local ::= .
|
||||
opt_priority ::= LOW_PRIORITY.
|
||||
opt_priority ::= .
|
||||
cmd ::= UNLOCK TABLES.
|
||||
cmd ::= UNLOCK TABLES. {
|
||||
context->rw_flag |= CF_WRITE;
|
||||
}
|
||||
|
@ -430,16 +430,9 @@ sql_construct_insert(GString *s, sql_insert_t *p)
|
||||
g_string_append_c(s, ' ');
|
||||
}
|
||||
if (p->columns && p->columns->len > 0) {
|
||||
g_string_append_c(s, '(');
|
||||
int i = 0;
|
||||
for (i = 0; i < p->columns->len; ++i) {
|
||||
const char *col = g_ptr_array_index(p->columns, i);
|
||||
g_string_append(s, col);
|
||||
if (i != p->columns->len - 1) {
|
||||
g_string_append_c(s, ',');
|
||||
}
|
||||
}
|
||||
g_string_append(s, ") ");
|
||||
g_string_append_len(s, p->columns_start,
|
||||
p->columns_end - p->columns_start);
|
||||
g_string_append_c(s, ' ');
|
||||
}
|
||||
if (p->sel_val) {
|
||||
if (p->sel_val->from_src) {
|
||||
|
@ -218,6 +218,8 @@ struct sql_insert_t {
|
||||
sql_src_list_t *table;
|
||||
sql_select_t *sel_val; /* [1] select... [2] values(...) */
|
||||
sql_id_list_t *columns;
|
||||
const char *columns_start; /* [start, end) span of columns */
|
||||
const char *columns_end;
|
||||
};
|
||||
|
||||
struct sql_column_t {
|
||||
|
@ -423,9 +423,7 @@ process_non_trans_prepare_stmt(network_mysqld_con *con)
|
||||
int type = BACKEND_TYPE_RO;
|
||||
if (!proxy_get_backend_ndx(con, type, FALSE)) {
|
||||
visit_slave = FALSE;
|
||||
if (con->server == NULL) {
|
||||
con->slave_conn_shortaged = 1;
|
||||
}
|
||||
con->slave_conn_shortaged = 1;
|
||||
}
|
||||
} else {
|
||||
if (con->server) {
|
||||
@ -634,6 +632,7 @@ process_non_trans_query(network_mysqld_con *con, sql_context_t *context, mysqld_
|
||||
if (!success) {
|
||||
con->master_conn_shortaged = 1;
|
||||
g_debug("%s:PROXY_NO_CONNECTION", G_STRLOC);
|
||||
return PROXY_NO_CONNECTION;
|
||||
}
|
||||
}
|
||||
} else { /* ro operation */
|
||||
@ -651,7 +650,7 @@ process_non_trans_query(network_mysqld_con *con, sql_context_t *context, mysqld_
|
||||
if (con->config->read_master_percentage != 100) {
|
||||
if (!is_orig_ro_server) {
|
||||
gboolean success = proxy_get_backend_ndx(con, BACKEND_TYPE_RO, FALSE);
|
||||
if (!success && con->server == NULL) {
|
||||
if (!success) {
|
||||
con->slave_conn_shortaged = 1;
|
||||
g_debug("%s:PROXY_NO_CONNECTION", G_STRLOC);
|
||||
}
|
||||
@ -662,6 +661,7 @@ process_non_trans_query(network_mysqld_con *con, sql_context_t *context, mysqld_
|
||||
if (!success) {
|
||||
con->master_conn_shortaged = 1;
|
||||
g_debug("%s:PROXY_NO_CONNECTION", G_STRLOC);
|
||||
return PROXY_NO_CONNECTION;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1056,11 +1056,9 @@ forced_visit(network_mysqld_con *con, proxy_plugin_con_t *st, sql_context_t *con
|
||||
context->rw_flag & CF_FORCE_SLAVE);
|
||||
if (!success) {
|
||||
if (type == BACKEND_TYPE_RO) {
|
||||
if (con->server == NULL) {
|
||||
con->slave_conn_shortaged = 1;
|
||||
g_debug("%s:slave_conn_shortaged is true", G_STRLOC);
|
||||
success = proxy_get_backend_ndx(con, BACKEND_TYPE_RW, FALSE);
|
||||
}
|
||||
con->slave_conn_shortaged = 1;
|
||||
g_debug("%s:slave_conn_shortaged is true", G_STRLOC);
|
||||
success = proxy_get_backend_ndx(con, BACKEND_TYPE_RW, FALSE);
|
||||
}
|
||||
|
||||
if (!success) {
|
||||
@ -2044,8 +2042,8 @@ mysqld_con_reserved_connections_free(network_mysqld_con *con)
|
||||
chassis_private *g = srv->priv;
|
||||
if (st->backend_ndx_array) {
|
||||
int i, checked = 0;
|
||||
for (i = 0; i < MAX_SERVER_NUM; i++) {
|
||||
if (st->backend_ndx_array[i] == 0) {
|
||||
for (i = 0; i < MAX_SERVER_NUM_FOR_PREPARE; i++) {
|
||||
if (st->backend_ndx_array[i] <= 0) {
|
||||
continue;
|
||||
}
|
||||
/* rw-edition: after filtering, now [i] is a valid backend index */
|
||||
@ -2666,6 +2664,9 @@ network_mysqld_proxy_plugin_apply_config(chassis *chas, chassis_plugin_config *c
|
||||
|
||||
if (network_backends_load_config(g->backends, chas) != -1) {
|
||||
network_connection_pool_create_conns(chas);
|
||||
evtimer_set(&chas->auto_create_conns_event, check_and_create_conns_func, chas);
|
||||
struct timeval check_interval = {10, 0};
|
||||
chassis_event_add_with_timeout(chas, &chas->auto_create_conns_event, &check_interval);
|
||||
}
|
||||
chassis_config_register_service(chas->config_manager, config->address, "proxy");
|
||||
|
||||
|
@ -270,6 +270,10 @@ NETWORK_MYSQLD_PLUGIN_PROTO(proxy_read_query)
|
||||
}
|
||||
}
|
||||
|
||||
con->master_conn_shortaged = 0;
|
||||
con->slave_conn_shortaged = 0;
|
||||
con->use_slave_forced = 0;
|
||||
|
||||
rc = proxy_get_server_list(con);
|
||||
|
||||
switch (rc) {
|
||||
@ -1257,6 +1261,12 @@ proxy_get_pooled_connection(network_mysqld_con *con,
|
||||
|
||||
*sock = network_connection_pool_get(backend->pool, con->client->response->username, is_robbed);
|
||||
if (*sock == NULL) {
|
||||
if (type == BACKEND_TYPE_RW) {
|
||||
con->master_conn_shortaged = 1;
|
||||
} else {
|
||||
con->slave_conn_shortaged = 1;
|
||||
}
|
||||
|
||||
return FALSE;
|
||||
}
|
||||
|
||||
@ -1654,11 +1664,6 @@ build_xa_end_command(network_mysqld_con *con, server_session_t *ss, int first)
|
||||
|
||||
NETWORK_MYSQLD_PLUGIN_PROTO(proxy_get_server_conn_list)
|
||||
{
|
||||
if (con->srv->complement_conn_cnt > 0) {
|
||||
network_connection_pool_create_conn(con);
|
||||
con->srv->complement_conn_cnt--;
|
||||
}
|
||||
|
||||
GList *chunk = con->client->recv_queue->chunks->head;
|
||||
GString *packet = (GString *)(chunk->data);
|
||||
gboolean do_query = FALSE;
|
||||
@ -1797,6 +1802,7 @@ NETWORK_MYSQLD_PLUGIN_PROTO(proxy_get_server_conn_list)
|
||||
if (con->dist_tran_failed) {
|
||||
network_queue_clear(con->client->recv_queue);
|
||||
network_mysqld_queue_reset(con->client);
|
||||
g_message("%s: clear recv queue", G_STRLOC);
|
||||
}
|
||||
} else {
|
||||
if (!ss->participated) {
|
||||
@ -1817,7 +1823,12 @@ NETWORK_MYSQLD_PLUGIN_PROTO(proxy_get_server_conn_list)
|
||||
snprintf(p_xa_log_buffer, XA_LOG_BUF_LEN - (p_xa_log_buffer - xa_log_buffer),
|
||||
"%s@%d", ss->server->dst->name->str, ss->server->challenge->thread_id);
|
||||
p_xa_log_buffer = p_xa_log_buffer + strlen(p_xa_log_buffer);
|
||||
shard_build_xa_query(con, ss);
|
||||
if (shard_build_xa_query(con, ss) == -1) {
|
||||
g_warning("%s:shard_build_xa_query failed for con:%p", G_STRLOC, con);
|
||||
con->server_to_be_closed = 1;
|
||||
con->dist_tran_state = NEXT_ST_XA_OVER;
|
||||
return NETWORK_SOCKET_ERROR;
|
||||
}
|
||||
is_xa_query = 1;
|
||||
if (con->is_auto_commit) {
|
||||
ss->dist_tran_state = NEXT_ST_XA_END;
|
||||
@ -2402,7 +2413,7 @@ network_mysqld_shard_plugin_get_options(chassis_plugin_config *config)
|
||||
|
||||
chassis_options_add(&opts, "proxy-read-timeout",
|
||||
0, 0, OPTION_ARG_DOUBLE, &(config->read_timeout_dbl),
|
||||
"read timeout in seconds (default: 10 minutes)", NULL,
|
||||
"read timeout in seconds (default: 600 seconds)", NULL,
|
||||
assign_proxy_read_timeout, show_proxy_read_timeout, ALL_OPTS_PROPERTY);
|
||||
|
||||
chassis_options_add(&opts, "proxy-xa-commit-or-rollback-read-timeout",
|
||||
@ -2413,7 +2424,7 @@ network_mysqld_shard_plugin_get_options(chassis_plugin_config *config)
|
||||
|
||||
chassis_options_add(&opts, "proxy-write-timeout",
|
||||
0, 0, OPTION_ARG_DOUBLE, &(config->write_timeout_dbl),
|
||||
"write timeout in seconds (default: 10 minutes)", NULL,
|
||||
"write timeout in seconds (default: 600 seconds)", NULL,
|
||||
assign_proxy_write_timeout, show_proxy_write_timeout, ALL_OPTS_PROPERTY);
|
||||
|
||||
chassis_options_add(&opts, "proxy-allow-ip",
|
||||
@ -2542,6 +2553,9 @@ network_mysqld_shard_plugin_apply_config(chassis *chas, chassis_plugin_config *c
|
||||
|
||||
if (network_backends_load_config(g->backends, chas) != -1) {
|
||||
network_connection_pool_create_conns(chas);
|
||||
evtimer_set(&chas->auto_create_conns_event, check_and_create_conns_func, chas);
|
||||
struct timeval check_interval = {10, 0};
|
||||
chassis_event_add_with_timeout(chas, &chas->auto_create_conns_event, &check_interval);
|
||||
}
|
||||
chassis_config_register_service(chas->config_manager, config->address, "shard");
|
||||
|
||||
|
@ -294,6 +294,15 @@ struct condition_t {
|
||||
} v;
|
||||
};
|
||||
|
||||
/**
|
||||
* ! we don't handle negative `b`
|
||||
*/
|
||||
static int32_t modulo(int64_t a, int32_t b)
|
||||
{
|
||||
int32_t r = a % b;
|
||||
return r < 0 ? r + b : r;
|
||||
}
|
||||
|
||||
/**
|
||||
* check if the group satisfies an inequation
|
||||
* suppose condition is "Greater Than 42", (op = TK_GT, v.num = 42)
|
||||
@ -308,7 +317,7 @@ partition_satisfies(sharding_partition_t *partition, struct condition_t cond)
|
||||
int64_t hash_value = (partition->key_type == SHARD_DATA_TYPE_STR)
|
||||
? cetus_str_hash((const unsigned char *)cond.v.str) : cond.v.num;
|
||||
|
||||
int32_t hash_mod = hash_value % partition->hash_count;
|
||||
int32_t hash_mod = modulo(hash_value, partition->hash_count);
|
||||
|
||||
if (cond.op == TK_EQ) {
|
||||
return sharding_partition_contain_hash(partition, hash_mod);
|
||||
|
@ -52,6 +52,7 @@ typedef struct chassis_private chassis_private;
|
||||
typedef struct chassis chassis;
|
||||
|
||||
#define MAX_SERVER_NUM 64
|
||||
#define MAX_SERVER_NUM_FOR_PREPARE 16
|
||||
#define MAX_QUERY_TIME 1000
|
||||
#define MAX_WAIT_TIME 1024
|
||||
#define MAX_TRY_NUM 6
|
||||
@ -134,7 +135,7 @@ struct chassis {
|
||||
unsigned int is_reduce_conns;
|
||||
unsigned int xa_log_detailed;
|
||||
unsigned int check_slave_delay;
|
||||
int complement_conn_cnt;
|
||||
int complement_conn_flag;
|
||||
int default_query_cache_timeout;
|
||||
int client_idle_timeout;
|
||||
double slave_delay_down_threshold_sec;
|
||||
@ -196,6 +197,8 @@ struct chassis {
|
||||
gint print_version;
|
||||
|
||||
gint group_replication_mode;
|
||||
|
||||
struct event auto_create_conns_event;
|
||||
};
|
||||
|
||||
CHASSIS_API chassis *chassis_new(void);
|
||||
|
@ -551,8 +551,8 @@ assign_max_alive_time(const gchar *newval, gpointer param) {
|
||||
gint value = 0;
|
||||
if (try_get_int_value(newval, &value)) {
|
||||
if (value >= 0) {
|
||||
if (value < 600) {
|
||||
value = 600;
|
||||
if (value < 60) {
|
||||
value = 60;
|
||||
}
|
||||
srv->max_alive_time = value;
|
||||
ret = ASSIGN_OK;
|
||||
|
@ -564,8 +564,8 @@ init_parameters(struct chassis_frontend_t *frontend, chassis *srv)
|
||||
g_message("set max resp len:%d", srv->max_resp_len);
|
||||
|
||||
srv->current_time = time(0);
|
||||
if (frontend->max_alive_time < 600) {
|
||||
frontend->max_alive_time = 600;
|
||||
if (frontend->max_alive_time < 60) {
|
||||
frontend->max_alive_time = 60;
|
||||
}
|
||||
srv->max_alive_time = frontend->max_alive_time;
|
||||
g_message("set max alive time:%d", srv->max_alive_time);
|
||||
|
@ -77,7 +77,7 @@ network_mysqld_con_idle_handle(int event_fd, short events, void *user_data)
|
||||
network_connection_pool_remove(pool, pool_entry);
|
||||
if (pool->srv) {
|
||||
chassis *srv = pool->srv;
|
||||
srv->complement_conn_cnt++;
|
||||
srv->complement_conn_flag = 1;
|
||||
}
|
||||
|
||||
g_message("%s:the server decided to close the connection", G_STRLOC);
|
||||
@ -85,7 +85,7 @@ network_mysqld_con_idle_handle(int event_fd, short events, void *user_data)
|
||||
} else if (events == EV_TIMEOUT) {
|
||||
if (pool->srv) {
|
||||
chassis *srv = pool->srv;
|
||||
srv->complement_conn_cnt++;
|
||||
srv->complement_conn_flag = 1;
|
||||
}
|
||||
network_connection_pool_remove(pool, pool_entry);
|
||||
}
|
||||
@ -97,18 +97,18 @@ network_pool_add_idle_conn(network_connection_pool *pool, chassis *srv, network_
|
||||
network_connection_pool_entry *pool_entry = NULL;
|
||||
pool_entry = network_connection_pool_add(pool, server);
|
||||
event_set(&(server->event), server->fd, EV_READ, network_mysqld_con_idle_handle, pool_entry);
|
||||
g_debug("%s: ev:%p add network_mysqld_con_idle_handle for server:%p, fd:%d",
|
||||
G_STRLOC, &(server->event), server, server->fd);
|
||||
int surplus_time = srv->current_time - server->create_time;
|
||||
surplus_time = srv->max_alive_time - surplus_time;
|
||||
surplus_time = srv->max_alive_time - surplus_time + g_random_int_range(0, 240);
|
||||
|
||||
if (surplus_time < 60) {
|
||||
g_debug("%s: negtive surplus_time:%d", G_STRLOC, surplus_time);
|
||||
surplus_time = 60 + g_random_int_range(0, 240);
|
||||
surplus_time = 60;
|
||||
}
|
||||
|
||||
struct timeval timeout;
|
||||
timeout.tv_sec = surplus_time;
|
||||
timeout.tv_usec = 0;
|
||||
g_debug("%s: ev:%p add network_mysqld_con_idle_handle for server:%p, fd:%d, timeout:%d",
|
||||
G_STRLOC, &(server->event), server, server->fd, surplus_time);
|
||||
|
||||
chassis_event_add_with_timeout(srv, &(server->event), &timeout);
|
||||
|
||||
@ -203,12 +203,14 @@ network_pool_add_conn(network_mysqld_con *con, int is_swap)
|
||||
network_socket *server;
|
||||
network_backend_t *backend;
|
||||
|
||||
for (i = 0; i < con->servers->len; i++) {
|
||||
for (i = 0; i < MAX_SERVER_NUM_FOR_PREPARE; i++) {
|
||||
|
||||
if (st->backend_ndx_array == NULL) {
|
||||
g_message("%s: st backend ndx array is null:%p", G_STRLOC, con);
|
||||
} else {
|
||||
if (st->backend_ndx_array[i] == 0) {
|
||||
if (st->backend_ndx_array[i] <= 0) {
|
||||
g_message("%s: i:%d backend_ndx_array value:%d for con:%p",
|
||||
G_STRLOC, i, st->backend_ndx_array[i], con);
|
||||
continue;
|
||||
}
|
||||
|
||||
@ -266,8 +268,9 @@ mysqld_con_reserved_connections_add(network_mysqld_con *con, network_socket *soc
|
||||
{
|
||||
proxy_plugin_con_t *st = con->plugin_con_state;
|
||||
if (st->backend_ndx_array == NULL) {
|
||||
st->backend_ndx_array = g_new0(short, MAX_SERVER_NUM);
|
||||
st->backend_ndx_array = g_new0(short, MAX_SERVER_NUM_FOR_PREPARE);
|
||||
st->backend_ndx_array[st->backend_ndx] = 1; /* current sock index = 0 */
|
||||
g_debug("%s: set st->backend_ndx:%d ndx array 1", G_STRLOC, st->backend_ndx);
|
||||
}
|
||||
|
||||
if (con->servers == NULL) {
|
||||
@ -277,6 +280,7 @@ mysqld_con_reserved_connections_add(network_mysqld_con *con, network_socket *soc
|
||||
}
|
||||
g_ptr_array_add(con->servers, sock);
|
||||
st->backend_ndx_array[backend_idx] = con->servers->len;
|
||||
g_debug("%s: set backend_ndx:%d ndx array:%d", G_STRLOC, backend_idx, con->servers->len);
|
||||
}
|
||||
|
||||
network_socket *
|
||||
@ -284,6 +288,8 @@ mysqld_con_reserved_connections_get(network_mysqld_con *con, int backend_idx)
|
||||
{
|
||||
proxy_plugin_con_t *st = con->plugin_con_state;
|
||||
if (con->servers) {
|
||||
g_debug("%s: backend_idx:%d backend_ndx_array value:%d for con:%p",
|
||||
G_STRLOC, backend_idx, st->backend_ndx_array[backend_idx], con);
|
||||
int conn_idx = st->backend_ndx_array[backend_idx];
|
||||
if (conn_idx > 0) {
|
||||
conn_idx -= 1;
|
||||
@ -350,9 +356,15 @@ network_connection_pool_swap(network_mysqld_con *con, int backend_ndx)
|
||||
GString *name = con->client->response ? con->client->response->username : &empty_name;
|
||||
network_socket *sock = network_connection_pool_get(backend->pool, name, &is_robbed);
|
||||
if (sock == NULL) {
|
||||
if (server_switch_need_add) {
|
||||
g_message("%s:retrieve master conn failed, but still hold read server", G_STRLOC);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (con->server) {
|
||||
if (network_pool_add_conn(con, 1) != 0) {
|
||||
g_warning("%s: move the curr conn back into the pool failed", G_STRLOC);
|
||||
g_warning("%s: move the curr conn back into the pool failed:%p",
|
||||
G_STRLOC, con);
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
@ -1710,11 +1710,17 @@ build_attr_statements(network_mysqld_con *con)
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
int
|
||||
shard_build_xa_query(network_mysqld_con *con, server_session_t *ss)
|
||||
{
|
||||
network_socket *recv_sock = con->client;
|
||||
GList *chunk = recv_sock->recv_queue->chunks->head;
|
||||
|
||||
if (chunk == NULL) {
|
||||
g_critical("%s:chunk is nil", G_STRLOC);
|
||||
return -1;
|
||||
}
|
||||
|
||||
GString *packet = (GString *)(chunk->data);
|
||||
|
||||
g_debug("%s:packet id:%d when get server", G_STRLOC, ss->server->last_packet_id);
|
||||
@ -1736,6 +1742,8 @@ shard_build_xa_query(network_mysqld_con *con, server_session_t *ss)
|
||||
con->is_xa_query_sent = 1;
|
||||
con->all_participate_num++;
|
||||
con->resp_expected_num++;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void
|
||||
@ -1885,7 +1893,7 @@ disp_xa_abnormal_resultset(network_mysqld_con *con, server_session_t *ss,
|
||||
}
|
||||
}
|
||||
|
||||
static void
|
||||
static int
|
||||
disp_xa_according_state(network_mysqld_con *con, server_session_t *ss,
|
||||
int *is_xa_cmd_met, int *is_xa_query, char **p_buffer, char *buffer, int end)
|
||||
{
|
||||
@ -1901,7 +1909,10 @@ disp_xa_according_state(network_mysqld_con *con, server_session_t *ss,
|
||||
snprintf(*p_buffer, XA_BUF_LEN - (*p_buffer - buffer), "%s@%d",
|
||||
ss->server->dst->name->str, ss->server->challenge->thread_id);
|
||||
*p_buffer = *p_buffer + strlen(*p_buffer);
|
||||
shard_build_xa_query(con, ss);
|
||||
if (shard_build_xa_query(con, ss) == -1) {
|
||||
g_warning("%s:shard_build_xa_query failed for con:%p", G_STRLOC, con);
|
||||
return -1;
|
||||
}
|
||||
*is_xa_query = 1;
|
||||
g_debug("%s:set is xa query true for con:%p", G_STRLOC, con);
|
||||
if (con->is_auto_commit) {
|
||||
@ -1931,6 +1942,8 @@ disp_xa_according_state(network_mysqld_con *con, server_session_t *ss,
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void
|
||||
@ -2010,7 +2023,13 @@ build_xa_statements(network_mysqld_con *con)
|
||||
if (result == -1) {
|
||||
disp_xa_abnormal_resultset(con, ss, &is_xa_cmd_met, &p_buffer, buffer, end);
|
||||
} else {
|
||||
disp_xa_according_state(con, ss, &is_xa_cmd_met, &is_xa_query, &p_buffer, buffer, end);
|
||||
if (disp_xa_according_state(con, ss, &is_xa_cmd_met,
|
||||
&is_xa_query, &p_buffer, buffer, end) == -1)
|
||||
{
|
||||
con->server_to_be_closed = 1;
|
||||
con->dist_tran_state = NEXT_ST_XA_OVER;
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
global_xa_state = ss->dist_tran_state;
|
||||
@ -2306,11 +2325,6 @@ handle_read_query(network_mysqld_con *con, network_mysqld_con_state_t ostate)
|
||||
|
||||
gettimeofday(&(con->req_recv_time), NULL);
|
||||
|
||||
if (srv->is_need_to_create_conns) {
|
||||
srv->is_need_to_create_conns = 0;
|
||||
network_connection_pool_create_conns(srv);
|
||||
}
|
||||
|
||||
if (!con->is_wait_server) {
|
||||
do {
|
||||
switch (network_mysqld_read(srv, recv_sock)) {
|
||||
@ -2359,6 +2373,12 @@ handle_read_query(network_mysqld_con *con, network_mysqld_con_state_t ostate)
|
||||
}
|
||||
|
||||
con->resp_too_long = 0;
|
||||
|
||||
/* check for tracing some problems and it will be removed later */
|
||||
if (con->client->recv_queue->chunks->head == NULL) {
|
||||
g_critical("%s:client recv queue head is nil", G_STRLOC);
|
||||
}
|
||||
|
||||
g_debug("%s:call read query", G_STRLOC);
|
||||
network_socket_retval_t ret = plugin_call(srv, con, con->state);
|
||||
switch (ret) {
|
||||
@ -4799,6 +4819,7 @@ network_connection_pool_create_conn(network_mysqld_con *con)
|
||||
} else if (total >= pool->mid_idle_connections) {
|
||||
int idle_conn = total - backend->connected_clients;
|
||||
if (idle_conn > backend->connected_clients) {
|
||||
g_debug("%s: idle conn num is enough:%d, %d", G_STRLOC, idle_conn, backend->connected_clients);
|
||||
continue;
|
||||
}
|
||||
int is_need_to_create = 0;
|
||||
@ -4806,17 +4827,23 @@ network_connection_pool_create_conn(network_mysqld_con *con)
|
||||
case BACKEND_TYPE_RW:
|
||||
if (con->master_conn_shortaged) {
|
||||
is_need_to_create = 1;
|
||||
} else {
|
||||
g_message("%s: master_conn_shortaged false", G_STRLOC);
|
||||
}
|
||||
break;
|
||||
case BACKEND_TYPE_RO:
|
||||
if (con->slave_conn_shortaged) {
|
||||
is_need_to_create = 1;
|
||||
} else {
|
||||
g_message("%s: slave_conn_shortaged false", G_STRLOC);
|
||||
}
|
||||
break;
|
||||
default:
|
||||
g_warning("%s: unknown type:%d", G_STRLOC, backend->type);
|
||||
break;
|
||||
}
|
||||
if (!is_need_to_create) {
|
||||
g_message("%s: is_need_to_create false", G_STRLOC);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
@ -4885,6 +4912,7 @@ network_connection_pool_create_conn(network_mysqld_con *con)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
network_connection_pool_create_conns(chassis *srv)
|
||||
{
|
||||
@ -4897,12 +4925,15 @@ network_connection_pool_create_conns(chassis *srv)
|
||||
if (backend->state != BACKEND_STATE_UP && backend->state != BACKEND_STATE_UNKNOWN) {
|
||||
continue;
|
||||
}
|
||||
int allowd_conn_num = backend->config->mid_conn_pool;
|
||||
|
||||
int total = backend->pool->cur_idle_connections + backend->connected_clients;
|
||||
if (total > 0) {
|
||||
if (total >= allowd_conn_num) {
|
||||
continue;
|
||||
}
|
||||
|
||||
int allowd_conn_num = backend->config->mid_conn_pool;
|
||||
allowd_conn_num = allowd_conn_num - total;
|
||||
|
||||
if (allowd_conn_num > MAX_CREATE_CONN_NUM) {
|
||||
allowd_conn_num = MAX_CREATE_CONN_NUM;
|
||||
}
|
||||
@ -4974,6 +5005,27 @@ network_connection_pool_create_conns(chassis *srv)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
void
|
||||
check_and_create_conns_func(int fd, short what, void *arg)
|
||||
{
|
||||
chassis* chas = arg;
|
||||
|
||||
if (chas->is_need_to_create_conns) {
|
||||
network_connection_pool_create_conns(chas);
|
||||
chas->is_need_to_create_conns = 0;
|
||||
} else {
|
||||
if (chas->complement_conn_flag) {
|
||||
network_connection_pool_create_conns(chas);
|
||||
chas->complement_conn_flag = 0;
|
||||
}
|
||||
}
|
||||
|
||||
g_message("%s: check_and_create_conns_func", G_STRLOC);
|
||||
struct timeval check_interval = {10, 0};
|
||||
chassis_event_add_with_timeout(chas, &chas->auto_create_conns_event, &check_interval);
|
||||
}
|
||||
|
||||
void
|
||||
|
@ -765,12 +765,13 @@ NETWORK_API int network_mysqld_queue_reset(network_socket *sock);
|
||||
|
||||
NETWORK_API void network_connection_pool_create_conn(network_mysqld_con *con);
|
||||
NETWORK_API void network_connection_pool_create_conns(chassis *srv);
|
||||
NETWORK_API void check_and_create_conns_func(int fd, short what, void *arg);
|
||||
|
||||
NETWORK_API void record_xa_log_for_mending(network_mysqld_con *con, network_socket *sock);
|
||||
NETWORK_API gboolean shard_set_autocommit(network_mysqld_con *con);
|
||||
NETWORK_API gboolean shard_set_charset_consistant(network_mysqld_con *con);
|
||||
NETWORK_API gboolean shard_set_default_db_consistant(network_mysqld_con *con);
|
||||
NETWORK_API gboolean shard_set_multi_stmt_consistant(network_mysqld_con *con);
|
||||
NETWORK_API void shard_build_xa_query(network_mysqld_con *con, server_session_t *ss);
|
||||
NETWORK_API int shard_build_xa_query(network_mysqld_con *con, server_session_t *ss);
|
||||
|
||||
#endif
|
||||
|
@ -629,7 +629,7 @@ proxy_put_shard_conn_to_pool(network_mysqld_con *con)
|
||||
G_STRLOC, server, con, (int)con->servers->len);
|
||||
network_socket_free(server);
|
||||
if (!is_reduced) {
|
||||
con->srv->complement_conn_cnt++;
|
||||
con->srv->complement_conn_flag = 1;
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user