Merge branch 'vdb2table'

This commit is contained in:
jingxiaobing 2018-05-18 14:49:15 +08:00
commit e2defcf76c
4 changed files with 166 additions and 207 deletions

View File

@ -776,7 +776,7 @@ process_init_db_when_get_server_list(network_mysqld_con *con, sharding_plan_t *p
} else {
name_len = name_len - 1;
network_mysqld_proto_get_str_len(&packet, &db_name, name_len);
shard_conf_get_fixed_group(groups, db_name, con->key);
shard_conf_get_fixed_group(groups, con->key);
}
if (groups->len > 0) { /* has database */

View File

@ -858,7 +858,7 @@ routing_select(sql_context_t *context, const sql_select_t *select,
{
sql_src_list_t *sources = select->from_src;
if (!sources) {
shard_conf_get_fixed_group(groups, default_db, fixture);
shard_conf_get_fixed_group(groups, fixture);
stats->com_select_global += 1;
return USE_NON_SHARDING_TABLE;
}
@ -920,7 +920,7 @@ routing_select(sql_context_t *context, const sql_select_t *select,
}
if (sharding_tables->len == 0) {
shard_conf_get_fixed_group(groups, db, fixture);
shard_conf_get_fixed_group(groups, fixture);
g_ptr_array_free(sharding_tables, TRUE);
stats->com_select_global += 1;
return USE_NON_SHARDING_TABLE;
@ -1036,7 +1036,7 @@ routing_update(sql_context_t *context, sql_update_t *update,
return USE_NON_SHARDING_TABLE;
}
shard_conf_get_all_groups(groups, db);
shard_conf_get_all_groups(groups);
plan->table_type = GLOBAL_TABLE;
if (groups->len > 1) {
return USE_DIS_TRAN;
@ -1208,7 +1208,7 @@ routing_insert(sql_context_t *context, sql_insert_t *insert, char *default_db, s
return USE_NON_SHARDING_TABLE;
}
shard_conf_get_all_groups(groups, db);
shard_conf_get_all_groups(groups);
plan->table_type = GLOBAL_TABLE;
if (groups->len > 1) {
sharding_plan_add_groups(plan, groups);
@ -1299,13 +1299,6 @@ routing_delete(sql_context_t *context, sql_delete_t *delete,
}
char *db = default_db;
sql_src_list_t *from = delete->from_src;
if (from->len < 1) {
shard_conf_get_all_groups(groups, db);
if (groups->len > 1) {
return USE_DIS_TRAN;
}
return USE_NON_SHARDING_TABLE;
}
sql_src_item_t *table = g_ptr_array_index(from, 0);
if (table->dbname)
db = table->dbname;
@ -1317,7 +1310,7 @@ routing_delete(sql_context_t *context, sql_delete_t *delete,
return USE_NON_SHARDING_TABLE;
}
shard_conf_get_all_groups(groups, db);
shard_conf_get_all_groups(groups);
plan->table_type = GLOBAL_TABLE;
if (groups->len > 1) {
return USE_DIS_TRAN;
@ -1406,7 +1399,7 @@ routing_by_property(sql_context_t *context, sql_property_t *property, char *defa
return USE_SHARDING;
} else if (property->group) {
shard_conf_find_groups(groups, property->group, default_db);
shard_conf_find_groups(groups, property->group);
if (groups->len > 0) {
enum sql_clause_flag_t f = context->rw_flag;
if ((f & CF_WRITE) && !(f & CF_DDL) && groups->len > 1) {
@ -1432,7 +1425,7 @@ sharding_parse_groups(GString *default_db, sql_context_t *context, query_stats_t
GPtrArray *groups = g_ptr_array_new();
if (context == NULL) {
g_warning("%s:sql is not parsed", G_STRLOC);
shard_conf_get_fixed_group(groups, default_db->str, fixture);
shard_conf_get_fixed_group(groups, fixture);
sharding_plan_add_groups(plan, groups);
g_ptr_array_free(groups, TRUE);
return USE_NON_SHARDING_TABLE;
@ -1508,7 +1501,7 @@ sharding_parse_groups(GString *default_db, sql_context_t *context, query_stats_t
g_ptr_array_free(groups, TRUE);
return USE_NON_SHARDING_TABLE;
}
shard_conf_get_fixed_group(groups, db, fixture);
shard_conf_get_fixed_group(groups, fixture);
sharding_plan_add_groups(plan, groups);
g_ptr_array_free(groups, TRUE);
return USE_NON_SHARDING_TABLE;
@ -1521,7 +1514,7 @@ sharding_parse_groups(GString *default_db, sql_context_t *context, query_stats_t
g_ptr_array_free(groups, TRUE);
return USE_NONE;
} else {
shard_conf_get_all_groups(groups, default_db->str);
shard_conf_get_all_groups(groups);
sharding_plan_add_groups(plan, groups);
g_ptr_array_free(groups, TRUE);
return USE_ALL;
@ -1535,18 +1528,18 @@ sharding_parse_groups(GString *default_db, sql_context_t *context, query_stats_t
return USE_PREVIOUS_TRAN_CONNS;
case STMT_COMMON_DDL: /* ddl without comments sent to all */
case STMT_CALL:
shard_conf_get_all_groups(groups, default_db->str);
shard_conf_get_all_groups(groups);
sharding_plan_add_groups(plan, groups);
g_ptr_array_free(groups, TRUE);
return USE_ALL;
case STMT_SHOW:
shard_conf_get_fixed_group(groups, default_db->str, fixture);
shard_conf_get_fixed_group(groups, fixture);
sharding_plan_add_groups(plan, groups);
g_ptr_array_free(groups, TRUE);
return USE_NON_SHARDING_TABLE;
default:
g_debug("unrecognized query, using default master db, sql:%s", plan->orig_sql->str);
shard_conf_get_fixed_group(groups, default_db->str, fixture);
shard_conf_get_fixed_group(groups, fixture);
sharding_plan_add_groups(plan, groups);
g_ptr_array_free(groups, TRUE);
return USE_NON_SHARDING_TABLE;

View File

@ -29,46 +29,65 @@
static GList *shard_conf_vdbs = NULL;
static GList *shard_conf_tables = NULL;
static GHashTable *shard_conf_vdb_map = NULL;
static GHashTable *shard_conf_tables = NULL; /* mapping< schema_table_t*, sharding_table_t* > */
static GList *shard_conf_single_tables = NULL;
struct sharding_database_t {
char *name;
GHashTable *tables; /* <char *, const sharding_table_t *> */
static GList *shard_conf_all_groups = NULL;
struct schema_table_t {
const char *schema;
const char *table;
};
static struct sharding_database_t *
sharding_database_new(const char *name)
struct schema_table_t*
schema_table_new(const char* s, const char* t)
{
struct sharding_database_t *db = g_new0(struct sharding_database_t, 1);
db->tables = g_hash_table_new(g_str_hash, g_str_equal);
db->name = g_strdup(name);
return db;
struct schema_table_t *st = g_new0(struct schema_table_t, 1);
st->schema = g_strdup(s);
st->table = g_strdup(t);
return st;
}
static void
sharding_database_free(struct sharding_database_t *db)
void schema_table_free(struct schema_table_t *st)
{
g_free(db->name);
g_hash_table_destroy(db->tables);
g_free(db);
g_free((char*)st->schema);
g_free((char*)st->table);
g_free(st);
}
static void
sharding_database_add_table(struct sharding_database_t *db, sharding_table_t *table)
/* djb hash, same as g_str_hash */
static guint
schema_table_hash(gconstpointer v)
{
g_hash_table_insert(db->tables, table->name->str, table);
const struct schema_table_t *st = v;
const signed char *p;
guint32 h = 5381;
for (p = st->schema; *p != '\0'; p++)
h = (h << 5) + h + *p;
h = (h << 5) + h + '.';
for (p = st->table; *p != '\0'; p++)
h = (h << 5) + h + *p;
return h;
}
gboolean
schema_table_equal(gconstpointer v1,
gconstpointer v2)
{
const struct schema_table_t *st1 = v1;
const struct schema_table_t *st2 = v2;
return strcmp(st1->schema, st2->schema) == 0
&& strcmp(st1->table, st2->table) == 0;
}
static sharding_table_t *
sharding_database_get_table(struct sharding_database_t *db, const char *table)
sharding_tables_get(const char *schema, const char *table)
{
if (!table)
return NULL;
return g_hash_table_lookup(db->tables, table);
struct schema_table_t st = {schema, table};
gpointer tinfo = g_hash_table_lookup(shard_conf_tables, &st);
return tinfo;
}
static sharding_vdb_t *
@ -84,12 +103,22 @@ shard_vdbs_get_by_id(GList *vdbs, int id)
return NULL;
}
static sharding_vdb_t *
sharding_vdbs_get_by_table(const char *schema, const char *table)
{
sharding_table_t *tinfo = sharding_tables_get(schema, table);
if (tinfo)
return tinfo->vdb_ref;
else
return NULL;
}
void
sharding_table_free(gpointer q)
{
sharding_table_t *info = q;
if (NULL != info->db)
g_string_free(info->db, TRUE);
if (NULL != info->schema)
g_string_free(info->schema, TRUE);
if (NULL != info->name)
g_string_free(info->name, TRUE);
if (NULL != info->pkey)
@ -110,7 +139,6 @@ static sharding_vdb_t *
sharding_vdb_new()
{
sharding_vdb_t *vdb = g_new0(struct sharding_vdb_t, 1);
vdb->databases = g_ptr_array_new_with_free_func((GDestroyNotify) sharding_database_free);
vdb->partitions = g_ptr_array_new();
return vdb;
}
@ -141,8 +169,6 @@ sharding_vdb_free(sharding_vdb_t *vdb)
g_free(item);
}
g_ptr_array_free(vdb->partitions, TRUE);
g_ptr_array_free(vdb->databases, TRUE);
g_free(vdb);
}
@ -180,77 +206,42 @@ sharding_vdb_is_valid(sharding_vdb_t *vdb, int num_groups)
return TRUE;
}
static struct sharding_database_t *
sharding_vdb_get_database(sharding_vdb_t *vdb, const char *db_name)
{
int i = 0;
for (i = 0; i < vdb->databases->len; ++i) {
struct sharding_database_t *db = g_ptr_array_index(vdb->databases, i);
if (strcasecmp(db_name, db->name) == 0) {
return db;
}
}
return NULL;
}
static struct sharding_database_t *
sharding_vdb_add_database(sharding_vdb_t *vdb, const char *name)
{
struct sharding_database_t *db = sharding_database_new(name);
g_ptr_array_add(vdb->databases, db);
return db;
}
GPtrArray *
shard_conf_get_all_groups(GPtrArray *visited_groups, const char *db)
shard_conf_get_all_groups(GPtrArray *all_groups)
{
if (!db) {
g_warning(G_STRLOC " db name is NULL");
return visited_groups;
GList *l = shard_conf_all_groups;
for (; l; l = l->next) {
GString* gp = l->data;
g_ptr_array_add(all_groups, gp);
}
sharding_vdb_t *vdb = g_hash_table_lookup(shard_conf_vdb_map, db);
if (vdb) {
int i = 0;
for (i = 0; i < vdb->partitions->len; ++i) {
sharding_partition_t *part = g_ptr_array_index(vdb->partitions, i);
GString *gp = part->group_name;
g_ptr_array_add(visited_groups, gp);
}
} else {
g_warning(G_STRLOC " fail to get all groups for db: %s", db);
}
return visited_groups;
return all_groups;
}
void
shard_conf_find_groups(GPtrArray *groups, const char *match, const char *db)
shard_conf_find_groups(GPtrArray *groups, const char *pattern)
{
if (strcasecmp(match, "all") == 0) {
shard_conf_get_all_groups(groups, db);
if (strcasecmp(pattern, "all") == 0 || strcasecmp(pattern, "*") == 0) {
shard_conf_get_all_groups(groups);
return;
}
sharding_vdb_t *vdb = g_hash_table_lookup(shard_conf_vdb_map, db);
if (vdb) {
int i = 0;
for (i = 0; i < vdb->partitions->len; ++i) {
sharding_partition_t *part = g_ptr_array_index(vdb->partitions, i);
GString *gp = part->group_name;
if (strcmp(gp->str, match) == 0) {
g_ptr_array_add(groups, gp);
return;
}
GList *l = shard_conf_all_groups;
for (; l; l = l->next) {
GString *gp = l->data;
if (strcmp(gp->str, pattern) == 0) {
g_ptr_array_add(groups, gp);
return;
}
}
}
GPtrArray *
shard_conf_get_any_group(GPtrArray *visited_groups, char *db, char *UNUSED_PARAM(table))
shard_conf_get_any_group(GPtrArray *any_group, const char *db, const char *table)
{
if (!db) {
g_warning(G_STRLOC " db name is NULL");
if (!db || !table) {
g_warning(G_STRLOC " db or table name is NULL");
return NULL;
}
sharding_vdb_t *vdb = g_hash_table_lookup(shard_conf_vdb_map, db);
sharding_vdb_t *vdb = sharding_vdbs_get_by_table(db, table);
if (vdb == NULL) {
return NULL;
}
@ -259,20 +250,20 @@ shard_conf_get_any_group(GPtrArray *visited_groups, char *db, char *UNUSED_PARAM
int i = rand() % partitions->len;
sharding_partition_t *part = g_ptr_array_index(partitions, i);
g_ptr_array_add(visited_groups, part->group_name);
g_ptr_array_add(any_group, part->group_name);
return visited_groups;
return any_group;
}
GPtrArray *
shard_conf_get_table_groups(GPtrArray *visited_groups, char *db, char *UNUSED_PARAM(table))
shard_conf_get_table_groups(GPtrArray *visited_groups, const char *db, const char *table)
{
if (!db) {
g_warning(G_STRLOC " db name is NULL");
if (!db || !table) {
g_warning(G_STRLOC " schema or table name is NULL");
return NULL;
}
sharding_vdb_t *vdb = g_hash_table_lookup(shard_conf_vdb_map, db);
sharding_vdb_t *vdb = sharding_vdbs_get_by_table(db, table);
if (vdb == NULL) {
return NULL;
}
@ -301,14 +292,14 @@ shard_conf_get_table_groups(GPtrArray *visited_groups, char *db, char *UNUSED_PA
* no more duplication check cause one group correspond multiple range value
*/
GPtrArray *
shard_conf_table_partitions(GPtrArray *partitions, const char *db, const char *UNUSED_PARAM(table))
shard_conf_table_partitions(GPtrArray *partitions, const char *db, const char *table)
{
if (!db) {
g_warning(G_STRLOC " db name is NULL");
if (!db || !table) {
g_warning(G_STRLOC " db or table name is NULL");
return NULL;
}
sharding_vdb_t *vdb = g_hash_table_lookup(shard_conf_vdb_map, db);
sharding_vdb_t *vdb = sharding_vdbs_get_by_table(db, table);
if (!vdb) {
return NULL;
}
@ -324,48 +315,31 @@ shard_conf_table_partitions(GPtrArray *partitions, const char *db, const char *U
sharding_table_t *
shard_conf_get_info(const char *db_name, const char *table)
{
if (!db_name)
return NULL;
sharding_vdb_t *vdb = g_hash_table_lookup(shard_conf_vdb_map, db_name);
if (!vdb) {
return NULL;
}
struct sharding_database_t *db = sharding_vdb_get_database(vdb, db_name);
if (!db) {
return NULL;
}
return sharding_database_get_table(db, table);
return sharding_tables_get(db_name, table);
}
gboolean
shard_conf_is_shard_table(const char *db, const char *table)
{
return shard_conf_get_info(db, table) ? TRUE : FALSE;
return sharding_tables_get(db, table) ? TRUE : FALSE;
}
GPtrArray *
shard_conf_get_fixed_group(GPtrArray *groups, const char *db, guint32 fixture)
shard_conf_get_fixed_group(GPtrArray *groups, guint32 fixture)
{
if (!db) {
g_warning(G_STRLOC " db name is NULL");
int len = g_list_length(shard_conf_all_groups);
if (len == 0) {
return groups;
}
sharding_vdb_t *vdb = g_hash_table_lookup(shard_conf_vdb_map, db);
if (vdb) {
int base = vdb->partitions->len;
if (base == 0) {
return groups;
}
int index = fixture % base;
sharding_partition_t *part = g_ptr_array_index(vdb->partitions, index);
g_ptr_array_add(groups, part->group_name);
}
int index = fixture % len;
GString *grp = g_list_nth_data(shard_conf_all_groups, index);
g_ptr_array_add(groups, grp);
return groups;
}
struct single_table_t { /* single table only resides on 1 group */
GString *name;
GString *db;
GString *schema;
GString *group;
};
@ -374,7 +348,7 @@ single_table_free(struct single_table_t *t)
{
if (t) {
g_string_free(t->name, TRUE);
g_string_free(t->db, TRUE);
g_string_free(t->schema, TRUE);
g_string_free(t->group, TRUE);
g_free(t);
}
@ -388,21 +362,13 @@ shard_conf_set_vdb_list(GList *vdbs)
}
static void
shard_conf_set_table_list(GList *tables)
shard_conf_set_tables(GHashTable *tables)
{
g_list_free_full(shard_conf_tables, (GDestroyNotify) sharding_table_free);
if (shard_conf_tables)
g_hash_table_destroy(shard_conf_tables);
shard_conf_tables = tables;
}
static void
shard_conf_set_vdb_map(GHashTable *vdbmap)
{
if (shard_conf_vdb_map) {
g_hash_table_destroy(shard_conf_vdb_map);
}
shard_conf_vdb_map = vdbmap;
}
static void
shard_conf_set_single_tables(GList *tables)
{
@ -410,6 +376,24 @@ shard_conf_set_single_tables(GList *tables)
shard_conf_single_tables = tables;
}
static void
shard_conf_set_all_groups(GList *groups)
{
g_list_free_full(shard_conf_all_groups, g_string_true_free);
shard_conf_all_groups = groups;
}
static GList *
string_list_distinct_append(GList *strlist, const GString *str)
{
GList *l = strlist;
for (; l; l = l->next) {
GString* s = l->data;
if (g_string_equal(s, str))
return strlist;
}
return g_list_append(strlist, g_string_new(str->str));
}
/**
* setup index & validate configurations
*/
@ -428,53 +412,40 @@ shard_conf_try_setup(GList *vdbs, GList *tables, GList *single_tables, int num_g
return FALSE;
}
}
GHashTable *vdbmap = g_hash_table_new(g_str_hash, g_str_equal);
GList *all_groups = NULL;
GHashTable *table_dict = g_hash_table_new_full(schema_table_hash, schema_table_equal,
(GDestroyNotify)schema_table_free,
sharding_table_free);
l = tables;
for (; l != NULL; l = l->next) {
sharding_table_t *table = l->data;
sharding_vdb_t *vdb = shard_vdbs_get_by_id(vdbs, table->vdb_id);
/* Fill table with vdb data */
if (!vdb) {
g_critical(G_STRLOC " table:%s VDB ID cannot be found: %d", table->name->str, table->vdb_id);
g_hash_table_destroy(vdbmap);
return FALSE;
} else {
/* Fill table with vdb info */
if (vdb) {
table->vdb_ref = vdb;
table->shard_key_type = vdb->key_type;
table->logic_shard_num = vdb->logic_shard_num;
table->method = vdb->method;
table->partitions = vdb->partitions;
}
/* collect database into vdb */
struct sharding_database_t *database = sharding_vdb_get_database(vdb, table->db->str);
if (!database) {
database = sharding_vdb_add_database(vdb, table->db->str);
}
/* setup table map in database */
if (sharding_database_get_table(database, table->name->str)) {
g_hash_table_destroy(vdbmap);
g_critical(G_STRLOC " same table name inside same db: %s", table->name->str);
return FALSE;
}
sharding_database_add_table(database, table);
/* setup db to vdb map */
sharding_vdb_t *vdb_res = g_hash_table_lookup(vdbmap, database->name);
if (vdb_res && vdb_res != vdb) {
g_hash_table_destroy(vdbmap);
g_critical(G_STRLOC " same db inside different vdb: %s", database->name);
return FALSE;
} else {
g_hash_table_insert(vdbmap, database->name, vdb);
g_critical(G_STRLOC " table:%s VDB ID cannot be found: %d",
table->name->str, table->vdb_id);
g_hash_table_destroy(table_dict);
return FALSE;
}
int i = 0;
for (i = 0; i < vdb->partitions->len; ++i) {
sharding_partition_t *part = g_ptr_array_index(vdb->partitions, i);
all_groups = string_list_distinct_append(all_groups, part->group_name);
}
struct schema_table_t *st = schema_table_new(table->schema->str, table->name->str);
g_hash_table_insert(table_dict, st, table);
}
/* `tables` has been transferred to `table_dict`, free it */
g_list_free(tables);
shard_conf_set_vdb_list(vdbs);
shard_conf_set_table_list(tables);
shard_conf_set_vdb_map(vdbmap);
shard_conf_set_tables(table_dict);
shard_conf_set_single_tables(single_tables);
shard_conf_set_all_groups(all_groups);
return TRUE;
}
@ -484,10 +455,9 @@ shard_conf_destroy(void)
if (shard_conf_vdbs) {
g_list_free_full(shard_conf_vdbs, (GDestroyNotify) sharding_vdb_free);
}
g_list_free_full(shard_conf_tables, (GDestroyNotify) sharding_table_free);
if (shard_conf_vdb_map) {
g_hash_table_destroy(shard_conf_vdb_map);
}
g_hash_table_destroy(shard_conf_tables);
g_list_free_full(shard_conf_single_tables, (GDestroyNotify) single_table_free);
g_list_free_full(shard_conf_all_groups, g_string_true_free);
}
static GHashTable *load_shard_from_json(gchar *json_str);
@ -517,7 +487,7 @@ shard_conf_get_single_table(const char *db, const char *name)
GList *l = shard_conf_single_tables;
for (; l; l = l->next) {
struct single_table_t *t = l->data;
if (strcasecmp(t->name->str, name) == 0 && strcasecmp(t->db->str, db) == 0) {
if (strcasecmp(t->name->str, name) == 0 && strcasecmp(t->schema->str, db) == 0) {
return t;
}
}
@ -805,7 +775,7 @@ parse_tables(cJSON *root)
} else if (vdb->type == cJSON_Number) {
table->vdb_id = vdb->valueint;
}
table->db = g_string_new(db->valuestring);
table->schema = g_string_new(db->valuestring);
table->name = g_string_new(table_root->valuestring);
table->pkey = g_string_new(pkey->valuestring);
@ -830,7 +800,7 @@ parse_single_tables(cJSON *root)
if (name && db && group) {
struct single_table_t *table = g_new0(struct single_table_t, 1);
table->group = g_string_new(group->valuestring);
table->db = g_string_new(db->valuestring);
table->schema = g_string_new(db->valuestring);
table->name = g_string_new(name->valuestring);
tables = g_list_append(tables, table);
} else {

View File

@ -65,30 +65,26 @@ struct sharding_vdb_t {
int key_type;
int logic_shard_num;
GPtrArray *partitions; /* GPtrArray<sharding_partition_t *> */
GPtrArray *databases; /* GPtrArray<sharding_database_t *> */
};
struct sharding_table_t {
GString *db;
GString *schema;
GString *name;
GString *pkey;
int shard_key_type;
int logic_shard_num;
enum sharding_method_t method;
GPtrArray *partitions; /* GPtrArray<sharding_partition_t *> ref from VDB */
int vdb_id;
struct sharding_vdb_t *vdb;
struct sharding_vdb_t *vdb_ref;
};
GPtrArray *shard_conf_get_table_groups(GPtrArray *groups, char *db, char *table);
GPtrArray *shard_conf_get_any_group(GPtrArray *groups, const char *db, const char *table);
GPtrArray *shard_conf_get_any_group(GPtrArray *groups, char *db, char *table);
GPtrArray *shard_conf_get_all_groups(GPtrArray *groups, const char *db);
GPtrArray *shard_conf_get_all_groups(GPtrArray *groups);
/* same fixture will get same group */
GPtrArray *shard_conf_get_fixed_group(GPtrArray *groups, const char *db, guint32 fixture);
GPtrArray *shard_conf_get_fixed_group(GPtrArray *groups, guint32 fixture);
GPtrArray *shard_conf_get_table_groups(GPtrArray *visited_groups,
const char *db, const char *table);
gboolean shard_conf_is_shard_table(const char *db, const char *table);
@ -108,7 +104,7 @@ GPtrArray *shard_conf_table_partitions(GPtrArray *partitions, const char *db, co
* find partition by group name
* special name "all" will get all groups
*/
void shard_conf_find_groups(GPtrArray *groups, const char *match, const char *db);
void shard_conf_find_groups(GPtrArray *groups, const char *match);
gboolean shard_conf_load(char *, int);