Modify sql when partitioned

This commit is contained in:
lazio579 2019-01-14 19:20:28 +08:00
parent 391cba3c2a
commit 3fa5bc437e
3 changed files with 26 additions and 2 deletions

View File

@ -59,8 +59,9 @@ typedef struct sql_context_t {
enum sql_parsing_place_t parsing_place;
struct sql_property_t *property;
int is_parsing_subquery;
int allow_subquery_nesting;
unsigned int is_parsing_subquery:1;
unsigned int allow_subquery_nesting:1;
unsigned int sql_needs_reconstruct:1;
} sql_context_t;
void sql_context_init(sql_context_t *);

View File

@ -382,6 +382,7 @@ sharding_get_sql(network_mysqld_con *con, const GString *group)
if (!con->srv->is_partition_mode) {
return sharding_plan_get_sql(con->sharding_plan, group);
} else {
g_message("%s: first group:%s, now group:%s for con:%p", G_STRLOC, con->first_group->str, group->str, con);
if (g_string_equal(con->first_group, group)) {
return sharding_plan_get_sql(con->sharding_plan, group);
} else {
@ -389,6 +390,7 @@ sharding_get_sql(network_mysqld_con *con, const GString *group)
sql_context_t *context = st->sql_context;
GString *new_sql = sharding_modify_sql(context, &(con->hav_condi),
con->srv->is_groupby_need_reconstruct, con->srv->is_partition_mode, con->sharding_plan->groups->len);
g_message("%s: new sql:%s for con:%p", G_STRLOC, new_sql->str, con);
return new_sql;
}
}

View File

@ -222,6 +222,8 @@ modify_select(sql_context_t *context, having_condition_t *hav_condi, int is_grou
GString *new_sql = NULL;
if (having) {
need_reconstruct = TRUE;
} else if (context->sql_needs_reconstruct) {
need_reconstruct = TRUE;
}
if (need_reconstruct) {
@ -1039,6 +1041,10 @@ routing_select(sql_context_t *context, const sql_select_t *select,
}
}
if (partition_mode) {
context->sql_needs_reconstruct = 1;
}
gboolean has_sharding_key = FALSE;
for (i = 0; i < sharding_tables->len; ++i) {
sql_src_item_t *shard_table = g_ptr_array_index(sharding_tables, i);
@ -1092,6 +1098,9 @@ routing_select(sql_context_t *context, const sql_select_t *select,
sql_src_item_t *shard_table = g_ptr_array_index(sharding_tables, 0);
char *db = shard_table->dbname ? shard_table->dbname : default_db;
shard_conf_get_table_groups(groups, db, shard_table->table_name);
if (partition_mode) {
dup_groups(shard_table, groups);
}
}
g_ptr_array_free(sharding_tables, TRUE);
return USE_ALL_SHARDINGS;
@ -1151,6 +1160,10 @@ routing_update(sql_context_t *context, sql_update_t *update,
}
return USE_NON_SHARDING_TABLE;
}
if (partition_mode) {
context->sql_needs_reconstruct = 1;
}
plan->table_type = SHARDED_TABLE;
sharding_table_t *shard_info = shard_conf_get_info(db, table->table_name);
int key_occur = optimize_sharding_condition(update->where_clause,
@ -1356,6 +1369,9 @@ routing_insert(sql_context_t *context, sql_insert_t *insert, char *default_db, s
return USE_NON_SHARDING_TABLE;
}
if (partition_mode) {
context->sql_needs_reconstruct = 1;
}
plan->table_type = SHARDED_TABLE;
sql_id_list_t *cols = insert->columns;
if (cols == NULL) {
@ -1459,6 +1475,9 @@ routing_delete(sql_context_t *context, sql_delete_t *delete,
return USE_NON_SHARDING_TABLE;
}
if (partition_mode) {
context->sql_needs_reconstruct = 1;
}
table->groups = g_ptr_array_new();
plan->table_type = SHARDED_TABLE;
if (!delete->where_clause) {
@ -1566,6 +1585,8 @@ int
sharding_parse_groups(GString *default_db, sql_context_t *context, query_stats_t *stats,
guint64 fixture, sharding_plan_t *plan, int partition_mode)
{
context->sql_needs_reconstruct = 0;
GPtrArray *groups = g_ptr_array_new();
if (context == NULL) {
g_warning("%s:sql is not parsed", G_STRLOC);