mirror of
https://gitee.com/wangbin579/cetus.git
synced 2024-12-02 03:47:41 +08:00
Merge branch 'master' into shard-online
This commit is contained in:
commit
2670055022
@ -8,7 +8,7 @@ plugins=shard,admin
|
||||
# Proxy Configuration, For eaxmlpe: MySQL master host ip is 192.0.0.1 and salve host ip is 192.0.0.2
|
||||
proxy-address=0.0.0.0:6001
|
||||
proxy-backend-addresses=192.0.0.1:3306@data1,192.0.0.1:3307@data2,192.0.0.1:3308@data3,192.0.0.1:3309@data4
|
||||
proxy-backend-addresses=192.0.0.2:3306@data1,192.0.0.2:3307@data2,192.0.0.2:3308@data3,192.0.0.2:3309@data4
|
||||
proxy-read-only-backend-addresses=192.0.0.2:3306@data1,192.0.0.2:3307@data2,192.0.0.2:3308@data3,192.0.0.2:3309@data4
|
||||
|
||||
# Admin Configuration
|
||||
admin-address=0.0.0.0:7001
|
||||
|
@ -125,3 +125,37 @@ try_get_double_value(const gchar *option_value, gdouble *return_value)
|
||||
return FALSE;
|
||||
}
|
||||
}
|
||||
|
||||
int make_iso8601_timestamp(char *buf, uint64_t utime)
|
||||
{
|
||||
struct tm my_tm;
|
||||
char tzinfo[7]="Z"; // max 6 chars plus \0
|
||||
size_t len;
|
||||
time_t seconds;
|
||||
|
||||
seconds= utime / 1000000;
|
||||
utime = utime % 1000000;
|
||||
{
|
||||
localtime_r(&seconds, &my_tm);
|
||||
long tim= timezone; // seconds West of UTC.
|
||||
char dir= '-';
|
||||
|
||||
if (tim < 0) {
|
||||
dir= '+';
|
||||
tim= -tim;
|
||||
}
|
||||
snprintf(tzinfo, sizeof(tzinfo), "%c%02d:%02d",
|
||||
dir, (int) (tim / (60 * 60)), (int) ((tim / 60) % 60));
|
||||
}
|
||||
|
||||
len = snprintf(buf, 64, "%04d-%02d-%02dT%02d:%02d:%02d.%06lu%s",
|
||||
my_tm.tm_year + 1900,
|
||||
my_tm.tm_mon + 1,
|
||||
my_tm.tm_mday,
|
||||
my_tm.tm_hour,
|
||||
my_tm.tm_min,
|
||||
my_tm.tm_sec,
|
||||
(unsigned long) utime,
|
||||
tzinfo);
|
||||
return len;
|
||||
}
|
||||
|
@ -49,4 +49,6 @@ gboolean read_file_to_buffer(const char *filename, char **buffer);
|
||||
gboolean try_get_int_value(const gchar *option_value, gint *return_value);
|
||||
gboolean try_get_double_value(const gchar *option_value, gdouble *return_value);
|
||||
|
||||
int make_iso8601_timestamp(char *buf, uint64_t utime);
|
||||
|
||||
#endif
|
||||
|
@ -56,9 +56,7 @@ void
|
||||
chassis_event_add_with_timeout(chassis *chas, struct event *ev, struct timeval *tv)
|
||||
{
|
||||
event_base_set(chas->event_base, ev);
|
||||
#if NETWORK_DEBUG_TRACE_EVENT
|
||||
CHECK_PENDING_EVENT(ev);
|
||||
#endif
|
||||
event_add(ev, tv);
|
||||
g_debug("%s:event add ev:%p", G_STRLOC, ev);
|
||||
}
|
||||
|
@ -329,6 +329,7 @@ chassis_mainloop(void *_chas)
|
||||
}
|
||||
}
|
||||
|
||||
#ifndef SIMPLE_PARSER
|
||||
chas->dist_tran_id = g_random_int_range(0, 100000000);
|
||||
int srv_id = g_random_int_range(0, 10000);
|
||||
if (chas->proxy_address) {
|
||||
@ -338,6 +339,7 @@ chassis_mainloop(void *_chas)
|
||||
}
|
||||
g_message("Initial dist_tran_id:%llu", chas->dist_tran_id);
|
||||
g_message("dist_tran_prefix:%s", chas->dist_tran_prefix);
|
||||
#endif
|
||||
|
||||
/*
|
||||
* drop root privileges if requested
|
||||
|
@ -54,7 +54,10 @@ typedef struct chassis chassis;
|
||||
#define MAX_SERVER_NUM 64
|
||||
#define MAX_QUERY_TIME 1000
|
||||
#define MAX_WAIT_TIME 1024
|
||||
#define MAX_TRY_NUM 6
|
||||
#define MAX_CREATE_CONN_NUM 512
|
||||
#define MAX_DIST_TRAN_PREFIX 32
|
||||
#define DEFAULT_LIVE_TIME 7200
|
||||
|
||||
#define MAX_ALLOWED_PACKET_CEIL (1 * GB)
|
||||
#define MAX_ALLOWED_PACKET_DEFAULT (32 * MB)
|
||||
@ -148,6 +151,7 @@ struct chassis {
|
||||
int merged_output_size;
|
||||
int max_header_size;
|
||||
int compressed_merged_output_size;
|
||||
int is_need_to_create_conns;
|
||||
|
||||
/* Conn-pool initialize settings */
|
||||
int max_idle_connections;
|
||||
|
@ -532,7 +532,7 @@ show_max_alive_time(gpointer param) {
|
||||
return g_strdup_printf("%d (s)", srv->max_alive_time);
|
||||
}
|
||||
if(CAN_SAVE_OPTS_PROPERTY(opt_type)) {
|
||||
if(srv->max_alive_time == 7200) {
|
||||
if(srv->max_alive_time == DEFAULT_LIVE_TIME) {
|
||||
return NULL;
|
||||
}
|
||||
return g_strdup_printf("%d", srv->max_alive_time);
|
||||
|
@ -166,7 +166,7 @@ chassis_frontend_new(void)
|
||||
|
||||
frontend->default_pool_size = 100;
|
||||
frontend->max_resp_len = 10 * 1024 * 1024; /* 10M */
|
||||
frontend->max_alive_time = 7200;
|
||||
frontend->max_alive_time = DEFAULT_LIVE_TIME;
|
||||
frontend->merged_output_size = 8192;
|
||||
frontend->max_header_size = 65536;
|
||||
frontend->config_port = 3306;
|
||||
@ -679,12 +679,6 @@ static void
|
||||
slow_query_log_handler(const gchar *log_domain, GLogLevelFlags log_level, const gchar *message, gpointer user_data)
|
||||
{
|
||||
FILE *fp = user_data;
|
||||
time_t t = time(0);
|
||||
struct tm *tm = localtime(&t);
|
||||
char timestr[32] = { 0 };
|
||||
const int len = 20;
|
||||
strftime(timestr, sizeof(timestr), "%Y-%m-%d %H:%M:%S ", tm);
|
||||
fwrite(timestr, 1, len, fp);
|
||||
fwrite(message, 1, strlen(message), fp);
|
||||
fwrite("\n", 1, 1, fp);
|
||||
}
|
||||
|
@ -31,7 +31,7 @@
|
||||
#include "cetus-users.h"
|
||||
|
||||
const char *backend_state_t_str[] = {
|
||||
"unkown",
|
||||
"unknown",
|
||||
"online",
|
||||
"down",
|
||||
"maintaining",
|
||||
@ -39,7 +39,7 @@ const char *backend_state_t_str[] = {
|
||||
};
|
||||
|
||||
const char *backend_type_t_str[] = {
|
||||
"unkown",
|
||||
"unknown",
|
||||
"read/write",
|
||||
"readonly"
|
||||
};
|
||||
@ -197,7 +197,7 @@ static void set_backend_config(network_backend_t *backend, chassis *srv) {
|
||||
*/
|
||||
int
|
||||
network_backends_add(network_backends_t *bs, const gchar *address,
|
||||
backend_type_t type, backend_state_t state, void *srv)
|
||||
backend_type_t type, backend_state_t state, chassis *srv)
|
||||
{
|
||||
network_backend_t *new_backend = network_backend_new();
|
||||
new_backend->type = type;
|
||||
@ -237,7 +237,7 @@ network_backends_add(network_backends_t *bs, const gchar *address,
|
||||
}
|
||||
|
||||
set_backend_config(new_backend, srv);
|
||||
network_connection_pool_create_conns(srv);
|
||||
srv->is_need_to_create_conns = 1;
|
||||
network_backends_into_group(bs, new_backend);
|
||||
g_message("added %s backend: %s, state: %s", backend_type_t_str[type], address, backend_state_t_str[state]);
|
||||
|
||||
@ -343,7 +343,8 @@ network_backends_modify(network_backends_t *bs, guint ndx,
|
||||
cur->state_since = now;
|
||||
if (state == BACKEND_STATE_UP || state == BACKEND_TYPE_UNKNOWN) {
|
||||
if (cur->pool->srv) {
|
||||
network_connection_pool_create_conns(cur->pool->srv);
|
||||
chassis *srv = cur->pool->srv;
|
||||
srv->is_need_to_create_conns = 1;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
@ -110,7 +110,7 @@ typedef struct {
|
||||
|
||||
NETWORK_API network_backends_t *network_backends_new();
|
||||
NETWORK_API void network_backends_free(network_backends_t *);
|
||||
NETWORK_API int network_backends_add(network_backends_t *, const gchar *, backend_type_t, backend_state_t, void *);
|
||||
NETWORK_API int network_backends_add(network_backends_t *, const gchar *, backend_type_t, backend_state_t, chassis *);
|
||||
NETWORK_API int network_backends_remove(network_backends_t *bs, guint index);
|
||||
NETWORK_API int network_backends_check(network_backends_t *bs);
|
||||
NETWORK_API int network_backends_modify(network_backends_t *, guint, backend_type_t, backend_state_t, backend_state_t);
|
||||
|
@ -2235,6 +2235,23 @@ disp_query_after_consistant_attr(network_mysqld_con *con)
|
||||
}
|
||||
}
|
||||
|
||||
void log_slowquery(int interval_ms, char* host, char* user, char* sql)
|
||||
{
|
||||
uint64_t usec;
|
||||
struct timeval t;
|
||||
gettimeofday(&t, NULL);
|
||||
usec = (uint64_t)t.tv_sec * 1000000 + t.tv_usec;
|
||||
char time_str[64];
|
||||
make_iso8601_timestamp(time_str, usec);
|
||||
|
||||
float interval = interval_ms / 1000.0;
|
||||
g_log("slowquery", G_LOG_LEVEL_MESSAGE,
|
||||
"# Time: %s\n"
|
||||
"# User@Host: %s @ %s Id: 0\n"
|
||||
"# Query_time: %f Lock_time: 0.000000 Rows_sent: 0 Rows_examined: 0\n"
|
||||
"SET timestamp=%ld\n%s", time_str, user, host, interval, t.tv_sec, sql);
|
||||
}
|
||||
|
||||
static void
|
||||
handle_query_time_stats(network_mysqld_con *con)
|
||||
{
|
||||
@ -2243,9 +2260,8 @@ handle_query_time_stats(network_mysqld_con *con)
|
||||
|
||||
diff = MAX(0, diff);
|
||||
if (diff >= con->srv->long_query_time) {
|
||||
g_log("slowquery", G_LOG_LEVEL_MESSAGE,
|
||||
"time: %dms, client: %s, user: %s, sql: %s",
|
||||
diff, con->client->src->name->str, con->client->response->username->str, con->orig_sql->str);
|
||||
log_slowquery(diff, con->client->src->name->str,
|
||||
con->client->response->username->str, con->orig_sql->str);
|
||||
diff = con->srv->long_query_time - 1;
|
||||
}
|
||||
con->srv->query_stats.query_time_table[diff]++;
|
||||
@ -2290,6 +2306,11 @@ handle_read_query(network_mysqld_con *con, network_mysqld_con_state_t ostate)
|
||||
|
||||
gettimeofday(&(con->req_recv_time), NULL);
|
||||
|
||||
if (srv->is_need_to_create_conns) {
|
||||
srv->is_need_to_create_conns = 0;
|
||||
network_connection_pool_create_conns(srv);
|
||||
}
|
||||
|
||||
if (!con->is_wait_server) {
|
||||
do {
|
||||
switch (network_mysqld_read(srv, recv_sock)) {
|
||||
@ -3304,6 +3325,7 @@ process_service_unavailable(network_mysqld_con *con)
|
||||
for (i = 0; i < con->servers->len; i++) {
|
||||
server_session_t *ss = g_ptr_array_index(con->servers, i);
|
||||
if (ss->fresh) {
|
||||
CHECK_PENDING_EVENT(&(ss->server->event));
|
||||
network_pool_add_idle_conn(ss->backend->pool, con->srv, ss->server);
|
||||
ss->backend->connected_clients--;
|
||||
g_message("%s: connected_clients sub:%d, %d ndx for con:%p", G_STRLOC,
|
||||
@ -4460,7 +4482,9 @@ chassis_event_add_with_timeout(srv, &(sock->event), timeout);
|
||||
static int
|
||||
process_self_event(server_connection_state_t *con, int events, int event_fd)
|
||||
{
|
||||
g_debug("%s:events:%d, ev:%p, state:%d", G_STRLOC, events, (&con->server->event), con->state);
|
||||
if (events == EV_READ) {
|
||||
g_debug("%s:EV_READ, ev:%p, state:%d", G_STRLOC, (&con->server->event), con->state);
|
||||
int b = -1;
|
||||
if (ioctl(con->server->fd, FIONREAD, &b)) {
|
||||
g_warning("ioctl(%d, FIONREAD, ...) failed: %s", event_fd, strerror(errno));
|
||||
@ -4477,7 +4501,7 @@ process_self_event(server_connection_state_t *con, int events, int event_fd)
|
||||
}
|
||||
}
|
||||
} else if (events == EV_TIMEOUT) {
|
||||
g_debug("%s:timeout, ev:%p", G_STRLOC, (&con->server->event));
|
||||
g_debug("%s:timeout, ev:%p, state:%d", G_STRLOC, (&con->server->event), con->state);
|
||||
if (con->state == ST_ASYNC_CONN) {
|
||||
g_message("%s: self conn timeout, state:%d, con:%p, server:%p", G_STRLOC, con->state, con, con->server);
|
||||
con->state = ST_ASYNC_ERROR;
|
||||
@ -4504,8 +4528,18 @@ process_self_server_read(server_connection_state_t *con)
|
||||
case NETWORK_SOCKET_SUCCESS:
|
||||
break;
|
||||
case NETWORK_SOCKET_WAIT_FOR_EVENT:{
|
||||
con->retry_cnt++;
|
||||
if (con->retry_cnt >= MAX_TRY_NUM) {
|
||||
con->state = ST_ASYNC_ERROR;
|
||||
break;
|
||||
}
|
||||
struct timeval timeout;
|
||||
timeout.tv_sec = 3;
|
||||
timeout.tv_usec = 0;
|
||||
g_debug("%s: set timeout:%d for new conn:%p", G_STRLOC,
|
||||
(int)timeout.tv_sec, con);
|
||||
/* call us again when you have a event */
|
||||
ASYNC_WAIT_FOR_EVENT(con->server, EV_READ, NULL, con);
|
||||
ASYNC_WAIT_FOR_EVENT(con->server, EV_READ, &timeout, con);
|
||||
return 0;
|
||||
}
|
||||
case NETWORK_SOCKET_ERROR:
|
||||
@ -4570,12 +4604,10 @@ process_self_read_auth_result(server_connection_state_t *con)
|
||||
network_mysqld_queue_reset(con->server);
|
||||
network_queue_clear(con->server->recv_queue);
|
||||
con->server->is_multi_stmt_set = con->is_multi_stmt_set;
|
||||
#if NETWORK_DEBUG_TRACE_EVENT
|
||||
CHECK_PENDING_EVENT(&(con->server->event));
|
||||
#endif
|
||||
if (con->srv->is_back_compressed) {
|
||||
con->server->do_compress = 1;
|
||||
}
|
||||
CHECK_PENDING_EVENT(&(con->server->event));
|
||||
network_pool_add_idle_conn(con->pool, con->srv, con->server);
|
||||
con->server = NULL; /* tell _self_con_free we succeed */
|
||||
network_mysqld_self_con_free(con);
|
||||
@ -4639,10 +4671,18 @@ network_mysqld_self_con_handle(int event_fd, short events, void *user_data)
|
||||
case ST_ASYNC_READ_HANDSHAKE:
|
||||
g_assert(events == 0 || event_fd == con->server->fd);
|
||||
|
||||
g_debug("%s: ST_ASYNC_READ_HANDSHAKE for con:%p, connection %s and %s",
|
||||
G_STRLOC, con, con->server->src->name->str, con->server->dst->name->str);
|
||||
|
||||
|
||||
if (!process_self_server_read(con)) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (con->state == ST_ASYNC_ERROR) {
|
||||
break;
|
||||
}
|
||||
|
||||
switch (proxy_self_read_handshake(srv, con)) {
|
||||
case RET_SUCCESS:
|
||||
break;
|
||||
@ -4856,7 +4896,12 @@ network_connection_pool_create_conns(chassis *srv)
|
||||
continue;
|
||||
}
|
||||
|
||||
for (j = 0; j < backend->config->mid_conn_pool; j++) {
|
||||
int allowd_conn_num = backend->config->mid_conn_pool;
|
||||
if (allowd_conn_num > MAX_CREATE_CONN_NUM) {
|
||||
allowd_conn_num = MAX_CREATE_CONN_NUM;
|
||||
}
|
||||
|
||||
for (j = 0; j < allowd_conn_num; j++) {
|
||||
server_connection_state_t *scs = network_mysqld_self_con_init(srv);
|
||||
if (srv->disable_dns_cache)
|
||||
network_address_set_address(scs->server->dst, backend->address->str);
|
||||
@ -4882,6 +4927,8 @@ network_connection_pool_create_conns(chassis *srv)
|
||||
G_STRLOC, i, scs->server, scs);
|
||||
|
||||
scs->backend->connected_clients++;
|
||||
int create_err = 0;
|
||||
|
||||
switch (network_socket_connect(scs->server)) {
|
||||
case NETWORK_SOCKET_ERROR_RETRY:{
|
||||
scs->state = ST_ASYNC_CONN;
|
||||
@ -4900,6 +4947,7 @@ network_connection_pool_create_conns(chassis *srv)
|
||||
g_message("%s: set backend conn:%p read handshake", G_STRLOC, scs);
|
||||
break;
|
||||
default:
|
||||
create_err = 1;
|
||||
scs->backend->connected_clients--;
|
||||
network_mysqld_self_con_free(scs);
|
||||
if (scs->srv->disable_threads) {
|
||||
@ -4913,6 +4961,10 @@ network_connection_pool_create_conns(chassis *srv)
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
if (create_err) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -340,6 +340,7 @@ struct server_connection_state_t {
|
||||
chassis *srv;
|
||||
network_connection_pool *pool;
|
||||
unsigned int is_multi_stmt_set:1;
|
||||
unsigned int retry_cnt:4;
|
||||
guint8 charset_code;
|
||||
};
|
||||
|
||||
|
@ -584,6 +584,7 @@ proxy_put_shard_conn_to_pool(network_mysqld_con *con)
|
||||
|
||||
int alive_time = con->srv->current_time - server->create_time;
|
||||
if (alive_time > con->srv->max_alive_time) {
|
||||
g_debug("%s: reach max_alive_time", G_STRLOC);
|
||||
is_put_to_pool_allowed = 0;
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user