admin-plugin implemented with lex&parser

This commit is contained in:
jingxiaobing 2018-05-24 20:37:21 +08:00
parent 50de6a3b20
commit cbd0e2b1e6
11 changed files with 2454 additions and 2487 deletions

View File

@ -25,16 +25,44 @@ INCLUDE_DIRECTORIES(${GLIB_INCLUDE_DIRS})
INCLUDE_DIRECTORIES(${MYSQL_INCLUDE_DIRS}) INCLUDE_DIRECTORIES(${MYSQL_INCLUDE_DIRS})
INCLUDE_DIRECTORIES(${EVENT_INCLUDE_DIRS}) INCLUDE_DIRECTORIES(${EVENT_INCLUDE_DIRS})
include_directories(${CMAKE_CURRENT_SOURCE_DIR})
include_directories(${CMAKE_CURRENT_BINARY_DIR})
LINK_DIRECTORIES(${GLIB_LIBRARY_DIRS}) LINK_DIRECTORIES(${GLIB_LIBRARY_DIRS})
LINK_DIRECTORIES(${LIBINTL_LIBRARY_DIRS}) LINK_DIRECTORIES(${LIBINTL_LIBRARY_DIRS})
LINK_DIRECTORIES(${MYSQL_LIBRARY_DIRS}) LINK_DIRECTORIES(${MYSQL_LIBRARY_DIRS})
SET(_plugin_name admin) set(_plugin_name admin)
ADD_LIBRARY(${_plugin_name} SHARED "${_plugin_name}-plugin.c")
TARGET_LINK_LIBRARIES(${_plugin_name} mysql-chassis-proxy)
if(HAVE_TCMALLOC)
TARGET_LINK_LIBRARIES(${_plugin_name} tcmalloc)
endif(HAVE_TCMALLOC)
find_package(FLEX)
set(ADMIN_LEXER_SRC ${CMAKE_CURRENT_BINARY_DIR}/admin-lexer.l.c)
set(ADMIN_LEXER_HEADER ${CMAKE_CURRENT_BINARY_DIR}/admin-lexer.l.h)
FLEX_TARGET(AdminLexer lexer.l ${ADMIN_LEXER_SRC} COMPILE_FLAGS
"--header-file=${ADMIN_LEXER_HEADER}")
set_source_files_properties(${ADMIN_LEXER_SRC} PROPERTIES GENERATED 1)
set_source_files_properties(${ADMIN_LEXER_HEADER} PROPERTIES GENERATED 1)
set(PARSER_OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/admin-parser.y.c)
set(OUTPUT_HEADER ${CMAKE_CURRENT_BINARY_DIR}/admin-parser.y.h)
set(GRAMMAR_FILE ${CMAKE_CURRENT_SOURCE_DIR}/admin-parser.y)
set(LEMON_TEMPLATE_FILE ${CETUS_TOOLS_DIR}/lempar.c)
add_custom_command(
OUTPUT ${PARSER_OUTPUT} ${SQL_TOKENS_HEADER}
DEPENDS ${LEMON_TEMPLATE_FILE} ${GRAMMAR_FILE}
COMMAND lemon -q -T${LEMON_TEMPLATE_FILE} ${GRAMMAR_FILE} -o${PARSER_OUTPUT} -h${OUTPUT_HEADER}
)
set_source_files_properties(${PARSER_OUTPUT} PROPERTIES GENERATED 1)
set_source_files_properties(${OUTPUT_HEADER} PROPERTIES GENERATED 1)
ADD_LIBRARY(${_plugin_name} SHARED
admin-plugin.c
admin-commands.c
admin-stats.c
${ADMIN_LEXER_SRC}
${PARSER_OUTPUT}
)
TARGET_LINK_LIBRARIES(${_plugin_name} mysql-chassis-proxy)
CHASSIS_PLUGIN_INSTALL(${_plugin_name}) CHASSIS_PLUGIN_INSTALL(${_plugin_name})

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,55 @@
#ifndef ADMIN_COMMANDS_H
#define ADMIN_COMMANDS_H
#include "glib-ext.h"
typedef struct token_t {
char* z;
int n;
} token_t;
typedef struct network_mysqld_con network_mysqld_con;
void admin_clear_error(network_mysqld_con*);
int admin_get_error(network_mysqld_con*);
void admin_syntax_error(network_mysqld_con*);
void admin_stack_overflow(network_mysqld_con*);
void admin_select_conn_details(network_mysqld_con* con);
void admin_select_all_backends(network_mysqld_con*);
void admin_select_all_groups(network_mysqld_con* con);
void admin_show_connectionlist(network_mysqld_con *admin_con, int show_count);
void admin_show_allow_ip(network_mysqld_con *con, const char* module_name);
void admin_add_allow_ip(network_mysqld_con *con, char *module, char *addr);
void admin_delete_allow_ip(network_mysqld_con *con, char* module, char* ip);
void admin_set_reduce_conns(network_mysqld_con* con, int mode);
void admin_set_maintain(network_mysqld_con* con, int mode);
void admin_show_status(network_mysqld_con* con, const char* like);
void admin_show_variables(network_mysqld_con* con, const char* like);
void admin_select_version(network_mysqld_con* con);
void admin_select_connection_stat(network_mysqld_con* con, int backend_ndx, char *user);
void admin_select_user_password(network_mysqld_con* con, char* from_table, char *user);
void admin_update_user_password(network_mysqld_con* con, char *from_table,
char *user, char *password);
void admin_delete_user_password(network_mysqld_con* con, char* user);
void admin_insert_backend(network_mysqld_con* con, char *addr, char *type, char *state);
void admin_update_backend(network_mysqld_con* con, char *key1, char *val1,
char *key2, char *val2,
char *cond_key, char *cond_val);
void admin_delete_backend(network_mysqld_con* con, char *key, char *val);
void admin_get_stats(network_mysqld_con* con, char* p);
void admin_get_config(network_mysqld_con* con, char* p);
void admin_set_config(network_mysqld_con* con, char* key, char* value);
void admin_reset_stats(network_mysqld_con* con);
void admin_select_help(network_mysqld_con* con);
void admin_send_overview(network_mysqld_con* con);
enum sharding_method_t;
void admin_create_vdb(network_mysqld_con* con, int id, GPtrArray* partitions,
enum sharding_method_t method, int key_type, int shard_num);
void admin_create_sharded_table(network_mysqld_con*, const char* schema, const char* table,
const char* key, int vdb_id);
void admin_select_vdb(network_mysqld_con* con);
void admin_select_sharded_table(network_mysqld_con* con);
#endif // ADMIN_COMMANDS_H

View File

@ -0,0 +1,394 @@
%token_prefix TK_
%token_type {token_t}
%default_type {token_t}
%extra_argument {struct network_mysqld_con *con}
%syntax_error {
UNUSED_PARAMETER(yymajor); /* Silence some compiler warnings */
admin_syntax_error(con);
}
%stack_overflow {
admin_stack_overflow(con);
}
%name adminParser
%include {
#include <assert.h>
#include <inttypes.h>
#include <ctype.h>
#include <stdlib.h>
#include <string.h>
#include "admin-parser.y.h"
#include "admin-commands.h"
#include "sharding-config.h"
struct network_mysqld_con;
#define UNUSED_PARAMETER(x) (void)(x)
#define YYNOERRORRECOVERY 1
#define YYPARSEFREENEVERNULL 1
#define YYMALLOCARGTYPE uint64_t
typedef struct equation_t {
token_t left;
token_t right;
} equation_t;
static int64_t token2int(token_t token)
{
/*TODO: HEX*/
int64_t value = 0;
int sign = 1;
const char* c = token.z;
int i = 0;
if( *c == '+' || *c == '-' ) {
if( *c == '-' ) sign = -1;
c++;
i++;
}
while (isdigit(*c) && i++ < token.n) {
value *= 10;
value += (int) (*c-'0');
c++;
}
return (value * sign);
}
static void string_dequote(char* z)
{
int quote;
int i, j;
if( z==0 ) return;
quote = z[0];
switch( quote ){
case '\'': break;
case '"': break;
case '`': break; /* For MySQL compatibility */
default: return;
}
for (i=1, j=0; z[i]; i++) {
if (z[i] == quote) {
if (z[i+1]==quote) { /*quote escape*/
z[j++] = quote;
i++;
} else {
z[j++] = 0;
break;
}
} else if (z[i] == '\\') { /* slash escape */
i++;
z[j++] = z[i];
} else {
z[j++] = z[i];
}
}
}
static char* token_strdup(token_t token)
{
if (token.n == 0)
return NULL;
char* s = malloc(token.n + 1);
memcpy(s, token.z, token.n);
s[token.n] = '\0';
string_dequote(s);
return s;
}
} // end %include
input ::= cmd.
%left OR.
%left AND.
%right NOT.
%left LIKE NE EQ.
%left GT LE LT GE.
%fallback ID
CONN_DETAILS BACKENDS AT_SIGN REDUCE_CONNS ADD MAINTAIN STATUS
CONN_NUM BACKEND_NDX RESET CETUS VDB HASH RANGE SHARDKEY
.
%wildcard ANY.
%type opt_where_user {char*}
%destructor opt_where_user {free($$);}
opt_where_user(A) ::= WHERE USER EQ STRING(E). {A = token_strdup(E);}
opt_where_user(A) ::= . {A = NULL;}
%type equation {equation_t*}
%destructor equation {free($$);}
equation(A) ::= ID(X) EQ STRING|ID|INTEGER|FLOAT(Y). {
A = calloc(1, sizeof(equation_t));
A->left = X;
A->right = Y;
}
%type opt_like {char*}
%destructor opt_like {free($$);}
opt_like(A) ::= LIKE STRING(X). {A = token_strdup(X);}
opt_like(A) ::= . {A = NULL; }
%type boolean {int}
boolean(A) ::= TRUE. {A = 1;}
boolean(A) ::= FALSE. {A = 0;}
boolean(A) ::= INTEGER(X). {A = token2int(X)==0 ? 0:1;}
%type opt_integer {int}
opt_integer(A) ::= . {A = -1;}
opt_integer(A) ::= INTEGER(X). {A = token2int(X);}
%token_class ids STRING|ID.
cmd ::= SELECT CONN_DETAILS FROM BACKENDS. {
admin_select_conn_details(con);
}
cmd ::= SELECT STAR FROM BACKENDS. {
admin_select_all_backends(con);
}
cmd ::= SELECT STAR FROM GROUPS. {
admin_select_all_groups(con);
}
cmd ::= SHOW CONNECTIONLIST opt_integer(X). {
admin_show_connectionlist(con, X);
}
cmd ::= SHOW ALLOW_IP ids(X). {
char* module = token_strdup(X);
admin_show_allow_ip(con, module);
free(module);
}
cmd ::= ADD ALLOW_IP ids(X) STRING(Y). {
char* module = token_strdup(X);
char* ip = token_strdup(Y);
admin_add_allow_ip(con, module, ip);
free(module);
free(ip);
}
cmd ::= DELETE ALLOW_IP ids(X) STRING(Y). {
char* module = token_strdup(X);
char* ip = token_strdup(Y);
admin_delete_allow_ip(con, module, ip);
free(module);
free(ip);
}
cmd ::= SET REDUCE_CONNS boolean(X). {
admin_set_reduce_conns(con, X);
}
cmd ::= SET MAINTAIN boolean(X). {
admin_set_maintain(con, X);
}
cmd ::= SHOW STATUS opt_like(X). {
admin_show_status(con, X);
if (X) free(X);
}
cmd ::= SHOW VARIABLES opt_like(X). {
admin_show_variables(con, X);
if (X) free(X);
}
cmd ::= SELECT VERSION. {
admin_select_version(con);
}
cmd ::= SELECT CONN_NUM FROM BACKENDS WHERE BACKEND_NDX EQ INTEGER(X) AND USER EQ STRING(Y). {
char* user = token_strdup(Y);
admin_select_connection_stat(con, token2int(X), user);
free(user);
}
cmd ::= SELECT STAR FROM USER_PWD|APP_USER_PWD(T) opt_where_user(X). {
char* table = (@T == TK_USER_PWD)?"user_pwd":"app_user_pwd";
admin_select_user_password(con, table, X);
if (X) free(X);
}
cmd ::= UPDATE USER_PWD|APP_USER_PWD(T) SET PASSWORD EQ STRING(P) WHERE USER EQ STRING(U). {
char* table = (@T == TK_USER_PWD)?"user_pwd":"app_user_pwd";
char* user = token_strdup(U);
char* pass = token_strdup(P);
admin_update_user_password(con, table, user, pass);
free(user);
free(pass);
}
cmd ::= DELETE FROM USER_PWD|APP_USER_PWD WHERE USER EQ STRING(U). {
char* user = token_strdup(U);
admin_delete_user_password(con, user);
free(user);
}
cmd ::= INSERT INTO BACKENDS VALUES LP STRING(X) COMMA STRING(Y) COMMA STRING(Z) RP. {
char* addr = token_strdup(X);
char* type = token_strdup(Y);
char* state = token_strdup(Z);
admin_insert_backend(con, addr, type, state);
free(addr); free(type); free(state);
}
cmd ::= UPDATE BACKENDS SET equation(X) COMMA equation(Y) WHERE equation(Z). {
//TODO: equation list
char* key1 = token_strdup(X->left);
char* val1 = token_strdup(X->right);
char* key2 = token_strdup(Y->left);
char* val2 = token_strdup(Y->right);
char* cond_key = token_strdup(Z->left);
char* cond_val = token_strdup(Z->right);
admin_update_backend(con, key1, val1, key2, val2, cond_key, cond_val);
free(key1); free(val1);
free(key2); free(val2);
free(cond_key); free(cond_val);
free(X); free(Y);
}
cmd ::= DELETE FROM BACKENDS WHERE equation(Z). {
char* key = token_strdup(Z->left);
char* val = token_strdup(Z->right);
admin_delete_backend(con, key, val);
free(key);
free(val);
free(Z);
}
cmd ::= ADD MASTER STRING(X). {
char* addr = token_strdup(X);
admin_insert_backend(con, addr, "rw", "unknown");
free(addr);
}
cmd ::= ADD SLAVE STRING(X). {
char* addr = token_strdup(X);
admin_insert_backend(con, addr, "ro", "unknown");
free(addr);
}
cmd ::= STATS GET opt_id(X). {
admin_get_stats(con, X);
if (X) free(X);
}
cmd ::= CONFIG GET opt_id(X). {
admin_get_config(con, X);
if (X) free(X);
}
cmd ::= CONFIG SET equation(X). {
char* key = token_strdup(X->left);
char* val = token_strdup(X->right);
admin_set_config(con, key, val);
free(key);
free(val);
}
cmd ::= STATS RESET. {
admin_reset_stats(con);
}
cmd ::= SELECT STAR FROM HELP. {
admin_select_help(con);
}
cmd ::= SELECT HELP. {
admin_select_help(con);
}
cmd ::= CETUS. {
admin_send_overview(con);
}
%include {
struct vdb_method {
enum sharding_method_t method;
int key_type;
int logic_shard_num;
};
} //end %include
cmd ::= CREATE VDB INTEGER(X) LP partitions(Y) RP USING method(Z). {
admin_create_vdb(con, token2int(X), Y, Z.method, Z.key_type, Z.logic_shard_num);
g_ptr_array_free(Y, TRUE);
}
%type int_array_prefix {GArray*}
%type int_array {GArray*}
%destructor int_array_prefix {g_array_free($$, TRUE);}
%destructor int_array {g_array_free($$, TRUE);}
int_array_prefix(A) ::= int_array(A) COMMA.
int_array_prefix(A) ::= . { A = NULL; }
int_array(A) ::= int_array_prefix(X) INTEGER(Y). {
if (X == NULL) {
A = g_array_new(0,0,sizeof(int32_t));
} else {
A = X;
}
int32_t n = token2int(Y);
g_array_append_val(A, n);
}
%type partition {sharding_partition_t*}
%destructor partition {sharding_partition_free($$);}
partition(A) ::= ids(X) COLON LBRACKET int_array(Y) RBRACKET. {
A = g_new0(sharding_partition_t, 1);
A->group_name = g_string_new_len(X.z, X.n);
A->key_type = SHARD_DATA_TYPE_INT;
A->method = SHARD_METHOD_HASH;
int i;
for (i = 0; i < Y->len; ++i) {
int32_t val = g_array_index(Y, int, i);
SetBit(A->hash_set, val);
}
g_array_free(Y, TRUE);
}
partition(A) ::= ids(X) COLON ids(Y). {
A = g_new0(sharding_partition_t, 1);
A->group_name = g_string_new_len(X.z, X.n);
A->method = SHARD_METHOD_RANGE;
A->value = token_strdup(Y);
}
partition(A) ::= ids(X) COLON INTEGER(Y). {
A = g_new0(sharding_partition_t, 1);
A->group_name = g_string_new_len(X.z, X.n);
A->value = (void*)(int64_t)token2int(Y);
}
%type partitions_prefix {GPtrArray*}
%type partitions {GPtrArray*}
%destructor partitions_prefix {g_ptr_array_free($$, TRUE);}
%destructor partitions {g_ptr_array_free($$, TRUE);}
partitions_prefix(A) ::= partitions(A) COMMA.
partitions_prefix(A) ::= . { A = NULL;}
partitions(A) ::= partitions_prefix(X) partition(Y). {
if (X == NULL) {
A = g_ptr_array_new();
} else {
A = X;
}
g_ptr_array_add(A, Y);
}
%type opt_id {char*}
%destructor opt_id {free($$);}
opt_id(A) ::= ID(X). { A = token_strdup(X); }
opt_id(A) ::= . {A=0;}
%type method {struct vdb_method}
method(A) ::= HASH LP ID(X) COMMA INTEGER(Y) RP. {
A.method = SHARD_METHOD_HASH;
A.logic_shard_num = token2int(Y);
char* key = token_strdup(X);
A.key_type = sharding_key_type(key);
g_free(key);
}
method(A) ::= RANGE LP ID(X) RP. {
A.method = SHARD_METHOD_RANGE;
A.logic_shard_num = 0;
char* key = token_strdup(X);
A.key_type = sharding_key_type(key);
g_free(key);
}
cmd ::= CREATE SHARDED TABLE ids(X) DOT ids(Y) VDB INTEGER(Z) SHARDKEY ids(W). {
char* schema = token_strdup(X);
char* table = token_strdup(Y);
char* key = token_strdup(W);
admin_create_sharded_table(con, schema, table, key, token2int(Z));
g_free(schema);
g_free(table);
g_free(key);
}
cmd ::= SELECT STAR FROM VDB. {
admin_select_vdb(con);
}
cmd ::= SELECT SHARDED TABLE. {
admin_select_sharded_table(con);
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,35 @@
#ifndef ADMIN_PLUGIN_H
#define ADMIN_PLUGIN_H
#include "glib-ext.h"
#include "network-mysqld.h"
#include "admin-stats.h"
#ifndef PLUGIN_VERSION
#ifdef CHASSIS_BUILD_TAG
#define PLUGIN_VERSION CHASSIS_BUILD_TAG
#else
#define PLUGIN_VERSION PACKAGE_VERSION
#endif
#endif
struct chassis_plugin_config {
gchar *address; /**< listening address of the admin interface */
gchar *admin_username; /**< login username */
gchar *admin_password; /**< login password */
gchar *allow_ip; /**< allow ip addr list */
GHashTable *allow_ip_table;
gchar *deny_ip; /**< deny ip addr list */
GHashTable *deny_ip_table;
network_mysqld_con *listen_con;
admin_stats_t *admin_stats;
};
#endif

View File

@ -0,0 +1,81 @@
#include "admin-stats.h"
#include "admin-plugin.h"
#include "chassis-event.h"
/* ring buffer from: https://github.com/AndersKaloer/Ring-Buffer */
#define RING_BUFFER_SIZE 128 /* must be power of 2, !! index [0, 126] !!*/
#define RING_BUFFER_MASK (RING_BUFFER_SIZE-1)
typedef struct ring_buffer_t {
int head;
int tail;
guint64 buffer[RING_BUFFER_SIZE];
} ring_buffer_t;
static void ring_buffer_add(ring_buffer_t *buffer, guint64 data) {
if (((buffer->head - buffer->tail) & RING_BUFFER_MASK) == RING_BUFFER_MASK)
buffer->tail = ((buffer->tail + 1) & RING_BUFFER_MASK);
buffer->buffer[buffer->head] = data;
buffer->head = ((buffer->head + 1) & RING_BUFFER_MASK);
}
static guint64 ring_buffer_get(ring_buffer_t *buffer, int index) {
if (index >= ((buffer->head - buffer->tail) & RING_BUFFER_MASK))
return 0;
int data_index = ((buffer->tail + index) & RING_BUFFER_MASK);
return buffer->buffer[data_index];
}
struct admin_stats_t {
struct event sampling_timer;
chassis* chas;
ring_buffer_t sql_count_ring;
ring_buffer_t trx_count_ring;
};
/* sample interval is 10-sec, 127 samples takes about 21-min */
static void sql_stats_sampling_func(int fd, short what, void *arg)
{
admin_stats_t* a = arg;
query_stats_t* stats = &(a->chas->query_stats);
ring_buffer_add(&a->sql_count_ring,
stats->client_query.ro + stats->client_query.rw);
ring_buffer_add(&a->trx_count_ring, stats->xa_count);
static struct timeval ten_sec = {10, 0};
/* EV_PERSIST not work for libevent1.4, re-activate timer each time */
chassis_event_add_with_timeout(a->chas, &a->sampling_timer, &ten_sec);
}
admin_stats_t* admin_stats_init(chassis* chas)
{
admin_stats_t* stats = g_new0(admin_stats_t, 1);
stats->chas = chas;
stats->sql_count_ring.head = 126;
stats->trx_count_ring.head = 126;
/* EV_PERSIST not working for libevent 1.4 */
evtimer_set(&stats->sampling_timer, sql_stats_sampling_func, stats);
struct timeval ten_sec = {10, 0};
chassis_event_add_with_timeout(chas, &stats->sampling_timer, &ten_sec);
return stats;
}
void admin_stats_free(admin_stats_t* stats)
{
evtimer_del(&stats->sampling_timer);
g_free(stats);
}
void admin_stats_get_average(admin_stats_t* stats, int type, char* buf, int len)
{
ring_buffer_t* ring = type==1 ? &stats->sql_count_ring : &stats->trx_count_ring;
const int MOST_RECENT = 126;
guint64 c_now = ring_buffer_get(ring, MOST_RECENT);
guint64 c_1min = ring_buffer_get(ring, MOST_RECENT - 6);
guint64 c_5min = ring_buffer_get(ring, MOST_RECENT - 6*5);
guint64 c_15min = ring_buffer_get(ring, MOST_RECENT - 6*15);
snprintf(buf, len, "%.2f, %.2f, %.2f",
(c_now-c_1min)/60.0, (c_now-c_5min)/300.0, (c_now-c_15min)/900.0);
}

View File

@ -0,0 +1,13 @@
#ifndef ADMIN_STATS_H
#define ADMIN_STATS_H
struct chassis;
typedef struct admin_stats_t admin_stats_t;
admin_stats_t* admin_stats_init(struct chassis* chas);
void admin_stats_free(admin_stats_t* stats);
#define ADMIN_STATS_QPS 1
#define ADMIN_STATS_TPS 2
void admin_stats_get_average(admin_stats_t* stats, int type, char* buf, int len);
#endif // ADMIN_STATS_H

109
plugins/admin/lexer.l Normal file
View File

@ -0,0 +1,109 @@
%{
#include <stdio.h>
#include "admin-parser.y.h"
%}
%option reentrant
%option noyywrap
%option never-interactive
/*yyin and yyout set to NULL*/
%option nostdinit
%option nodefault
%option warn
%option case-insensitive
%option prefix="adminyy"
%x COMMENT
%%
[ \t\n]+ /* ignore whitespace */;
"(" return TK_LP ;
")" return TK_RP ;
"," return TK_COMMA ;
"<>" return TK_NE ;
"!=" return TK_NE ;
"=" return TK_EQ ;
"==" return TK_EQ ;
">" return TK_GT ;
"<=" return TK_LE ;
"<" return TK_LT ;
">=" return TK_GE ;
"*" return TK_STAR;
"@" return TK_AT_SIGN;
":" return TK_COLON;
"[" return TK_LBRACKET;
"]" return TK_RBRACKET;
"." return TK_DOT;
"!" return TK_NOT;
"OR" return TK_OR ;
"AND" return TK_AND ;
"LIKE" return TK_LIKE;
"FROM" return TK_FROM ;
"DELETE" return TK_DELETE ;
"WHERE" return TK_WHERE ;
"UPDATE" return TK_UPDATE ;
"SET" return TK_SET ;
"INTO" return TK_INTO ;
"VALUES" return TK_VALUES ;
"INSERT" return TK_INSERT ;
"SHOW" return TK_SHOW;
"SELECT" return TK_SELECT;
"TRUE" return TK_TRUE;
"FALSE" return TK_FALSE;
"connectionlist" return TK_CONNECTIONLIST;
"groups" return TK_GROUPS;
"backends" return TK_BACKENDS;
"user_pwd" return TK_USER_PWD;
"app_user_pwd" return TK_APP_USER_PWD;
"allow_ip" return TK_ALLOW_IP;
"conn_details" return TK_CONN_DETAILS;
"version" return TK_VERSION;
"get" return TK_GET;
"user" return TK_USER;
"password" return TK_PASSWORD;
"help" return TK_HELP;
"stats" return TK_STATS;
"config" return TK_CONFIG;
"reset" return TK_RESET;
"reduce_conns" return TK_REDUCE_CONNS;
"add" return TK_ADD;
"master" return TK_MASTER;
"slave" return TK_SLAVE;
"maintain" return TK_MAINTAIN;
"status" return TK_STATUS;
"variables" return TK_VARIABLES;
"conn_num" return TK_CONN_NUM;
"backend_ndx" return TK_BACKEND_NDX;
"cetus" return TK_CETUS;
"create" return TK_CREATE;
"vdb" return TK_VDB;
"using" return TK_USING;
"hash" return TK_HASH;
"range" return TK_RANGE;
"sharded" return TK_SHARDED;
"table" return TK_TABLE;
"shardkey" return TK_SHARDKEY;
[0-9]+ return TK_INTEGER; /*sign symbol is handled in parser*/
[0-9]*\.[0-9]+([eE][-+]?[0-9]+)? return TK_FLOAT;
[a-zA-Z][a-zA-Z0-9_]* return TK_ID;
'([^'\\]|\\.|'')*' return TK_STRING;
\"([^"\\]|\"\"|\\.)*\" return TK_STRING;
`([^`]|``)*` return TK_ID;
"/*" { BEGIN(COMMENT); }
<COMMENT>[^*]*
<COMMENT>"*"+[^*/]*
<COMMENT>"*"+"/" { BEGIN(INITIAL); }
"#".* { /* useless comment */ }
"--"[ \t\n].* { /* useless comment */ }
. {printf("### flex: bad input character: 0x%02x\n", yytext[0]);return 0;}

View File

@ -90,6 +90,16 @@ sharding_tables_get(const char *schema, const char *table)
return tinfo; return tinfo;
} }
gboolean sharding_tables_add(sharding_table_t* table)
{
if (sharding_tables_get(table->schema->str, table->name->str)) {
return FALSE; /* !! DON'T REPLACE ONLINE */
}
struct schema_table_t *st = schema_table_new(table->schema->str, table->name->str);
g_hash_table_insert(shard_conf_tables, st, table);
return TRUE;
}
static sharding_vdb_t * static sharding_vdb_t *
shard_vdbs_get_by_id(GList *vdbs, int id) shard_vdbs_get_by_id(GList *vdbs, int id)
{ {
@ -159,16 +169,53 @@ void sharding_partition_free(sharding_partition_t *p)
g_free(p); g_free(p);
} }
static sharding_vdb_t * void sharding_partition_to_string(sharding_partition_t* p, GString* repr)
sharding_vdb_new() {
g_string_truncate(repr, 0);
if (p->method == SHARD_METHOD_RANGE) {
if (p->key_type == SHARD_DATA_TYPE_STR) {
g_string_printf(repr, "(%s, %s]->%s", (char*)p->low_value, (char*)p->value,
p->group_name->str);
} else {
g_string_printf(repr, "(%ld, %ld]->%s",(int64_t)p->low_value, (int64_t)p->value,
p->group_name->str);
}
} else {
int i = 0;
g_string_append_c(repr, '[');
for (i = 0; i < p->hash_count; ++i) {
if (TestBit(p->hash_set, i)) {
g_string_append_printf(repr, "%d,", i);
}
}
g_string_truncate(repr, repr->len-1);
g_string_append_printf(repr, "]->%s", p->group_name->str);
}
}
void sharding_vdb_partitions_to_string(sharding_vdb_t* vdb, GString* repr)
{
g_string_truncate(repr, 0);
GString* str = g_string_new(0);
int i = 0;
for (i=0; i < vdb->partitions->len; ++i) {
sharding_partition_t* part = g_ptr_array_index(vdb->partitions, i);
sharding_partition_to_string(part, str);
g_string_append(repr, str->str);
if (i != vdb->partitions->len - 1)
g_string_append(repr, "; ");
}
g_string_free(str, TRUE);
}
sharding_vdb_t *sharding_vdb_new()
{ {
sharding_vdb_t *vdb = g_new0(struct sharding_vdb_t, 1); sharding_vdb_t *vdb = g_new0(struct sharding_vdb_t, 1);
vdb->partitions = g_ptr_array_new(); vdb->partitions = g_ptr_array_new();
return vdb; return vdb;
} }
static void void sharding_vdb_free(sharding_vdb_t *vdb)
sharding_vdb_free(sharding_vdb_t *vdb)
{ {
if (!vdb) { if (!vdb) {
return; return;
@ -183,8 +230,7 @@ sharding_vdb_free(sharding_vdb_t *vdb)
g_free(vdb); g_free(vdb);
} }
static gboolean gboolean sharding_vdb_is_valid(sharding_vdb_t *vdb, int num_groups)
sharding_vdb_is_valid(sharding_vdb_t *vdb, int num_groups)
{ {
if (vdb->method == SHARD_METHOD_HASH) { if (vdb->method == SHARD_METHOD_HASH) {
if (vdb->logic_shard_num <= 0 || vdb->logic_shard_num > MAX_HASH_VALUE_COUNT) { if (vdb->logic_shard_num <= 0 || vdb->logic_shard_num > MAX_HASH_VALUE_COUNT) {
@ -372,6 +418,11 @@ shard_conf_set_vdb_list(GList *vdbs)
shard_conf_vdbs = vdbs; shard_conf_vdbs = vdbs;
} }
GList* shard_conf_get_vdb_list()
{
return shard_conf_vdbs;
}
static void static void
shard_conf_set_tables(GHashTable *tables) shard_conf_set_tables(GHashTable *tables)
{ {
@ -380,6 +431,26 @@ shard_conf_set_tables(GHashTable *tables)
shard_conf_tables = tables; shard_conf_tables = tables;
} }
static gboolean
sharding_table_equal(gconstpointer v1, gconstpointer v2)
{
const sharding_table_t *st1 = v1;
const sharding_table_t *st2 = v2;
int a = strcasecmp(st1->schema->str, st2->schema->str);
if (a == 0) {
return strcasecmp(st1->name->str, st2->name->str);
} else {
return a;
}
}
GList* shard_conf_get_tables()
{
GList* tables = g_hash_table_get_values(shard_conf_tables);
tables = g_list_sort(tables, sharding_table_equal);
return tables;
}
static void static void
shard_conf_set_single_tables(GList *tables) shard_conf_set_single_tables(GList *tables)
{ {
@ -541,27 +612,37 @@ shard_conf_get_single_table_distinct_group(GPtrArray *groups, const char *db, co
return groups; return groups;
} }
static int struct code_map_t {
sharding_type(const char *str) const char *name;
int code;
} key_type_map[] = {
{"INT", SHARD_DATA_TYPE_INT},
{"STR", SHARD_DATA_TYPE_STR},
{"DATE", SHARD_DATA_TYPE_DATE},
{"DATETIME", SHARD_DATA_TYPE_DATETIME},
};
int sharding_key_type(const char *str)
{ {
struct code_map_t {
const char *name;
int code;
} map[] = {
{
"INT", SHARD_DATA_TYPE_INT}, {
"STR", SHARD_DATA_TYPE_STR}, {
"DATE", SHARD_DATA_TYPE_DATE}, {
"DATETIME", SHARD_DATA_TYPE_DATETIME},};
int i; int i;
for (i = 0; i < sizeof(map) / sizeof(*map); ++i) { for (i = 0; i < sizeof(key_type_map) / sizeof(*key_type_map); ++i) {
if (strcasecmp(map[i].name, str) == 0) if (strcasecmp(key_type_map[i].name, str) == 0)
return map[i].code; return key_type_map[i].code;
} }
g_critical("Wrong sharding setting <key_type:%s>", str); g_critical("Wrong sharding setting <key_type:%s>", str);
return -1; return -1;
} }
const char* sharding_key_type_str(int type)
{
int i;
for (i = 0; i < sizeof(key_type_map) / sizeof(*key_type_map); ++i) {
if (key_type_map[i].code == type)
return key_type_map[i].name;
}
return "error";
}
static int static int
sharding_method(const char *str) sharding_method(const char *str)
{ {
@ -744,7 +825,7 @@ parse_vdbs(cJSON *vdb_root)
} else { } else {
vdb->id = atoi(id->valuestring); vdb->id = atoi(id->valuestring);
} }
vdb->key_type = sharding_type(key_type->valuestring); vdb->key_type = sharding_key_type(key_type->valuestring);
if (vdb->key_type < 0) { if (vdb->key_type < 0) {
g_critical("Wrong sharding settings <key_type:%s>", key_type->valuestring); g_critical("Wrong sharding settings <key_type:%s>", key_type->valuestring);
} }
@ -858,3 +939,28 @@ load_shard_from_json(gchar *json_str)
g_hash_table_insert(shard_hash, "single_tables", single_list); /* NULLable */ g_hash_table_insert(shard_hash, "single_tables", single_list); /* NULLable */
return shard_hash; return shard_hash;
} }
gboolean shard_conf_add_vdb(sharding_vdb_t* vdb)
{
GList* l = shard_conf_vdbs;
for (l; l; l = l->next) {
sharding_vdb_t* base = l->data;
if (base->id == vdb->id) {
g_warning("add vdb dup id");
return FALSE;
}
}
setup_partitions(vdb->partitions, vdb);
shard_conf_vdbs = g_list_append(shard_conf_vdbs, vdb);
return TRUE;
}
gboolean shard_conf_add_sharded_table(sharding_table_t* t)
{
sharding_vdb_t* vdb = shard_vdbs_get_by_id(shard_conf_vdbs, t->vdb_id);
if (vdb) {
return sharding_tables_add(t);
} else {
return FALSE;
}
}

View File

@ -60,7 +60,9 @@ typedef struct sharding_partition_t {
} sharding_partition_t; } sharding_partition_t;
gboolean sharding_partition_contain_hash(sharding_partition_t *, int); gboolean sharding_partition_contain_hash(sharding_partition_t *, int);
void sharding_partition_free(sharding_partition_t *);
//gboolean sharding_partition_cover_range(sharding_partition_t *, ); //gboolean sharding_partition_cover_range(sharding_partition_t *, );
void sharding_partition_to_string(sharding_partition_t *, GString*);
struct sharding_vdb_t { struct sharding_vdb_t {
int id; int id;
@ -70,6 +72,8 @@ struct sharding_vdb_t {
GPtrArray *partitions; /* GPtrArray<sharding_partition_t *> */ GPtrArray *partitions; /* GPtrArray<sharding_partition_t *> */
}; };
void sharding_vdb_partitions_to_string(sharding_vdb_t* vdb, GString* repr);
struct sharding_table_t { struct sharding_table_t {
GString *schema; GString *schema;
GString *name; GString *name;
@ -78,7 +82,8 @@ struct sharding_table_t {
int vdb_id; int vdb_id;
struct sharding_vdb_t *vdb_ref; struct sharding_vdb_t *vdb_ref;
}; };
int sharding_key_type(const char *str);
const char* sharding_key_type_str(int type);
GPtrArray *shard_conf_get_any_group(GPtrArray *groups, const char *db, const char *table); GPtrArray *shard_conf_get_any_group(GPtrArray *groups, const char *db, const char *table);
GPtrArray *shard_conf_get_all_groups(GPtrArray *groups); GPtrArray *shard_conf_get_all_groups(GPtrArray *groups);
@ -113,4 +118,15 @@ gboolean shard_conf_load(char *, int);
void shard_conf_destroy(void); void shard_conf_destroy(void);
gboolean shard_conf_add_vdb(sharding_vdb_t* vdb);
sharding_vdb_t *sharding_vdb_new();
gboolean sharding_vdb_is_valid(sharding_vdb_t *vdb, int num_groups);
void sharding_vdb_free(sharding_vdb_t *vdb);
gboolean shard_conf_add_sharded_table(sharding_table_t* t);
GList* shard_conf_get_vdb_list();
GList* shard_conf_get_tables();
#endif /* __SHARDING_CONFIG_H__ */ #endif /* __SHARDING_CONFIG_H__ */