Fix more join problems for partition mode

This commit is contained in:
lazio579 2019-01-17 18:14:30 +08:00
parent 76bc1e1692
commit 080a512a79
3 changed files with 109 additions and 40 deletions

View File

@ -408,7 +408,12 @@ sql_construct_select(sql_select_t *select, int explain)
}
if (select->where_clause) {
g_string_append(s, " WHERE ");
append_sql_expr(s, select->where_clause);
sql_expr_t *expr = select->where_clause;
if (expr->modify_flag) {
sql_expr_traverse(s, expr);
} else {
append_sql_expr(s, expr);
}
}
if (select->groupby_clause) {
sql_expr_list_t *groupby = select->groupby_clause;

View File

@ -180,7 +180,9 @@ struct sql_table_reference_t {
typedef struct sql_table_reference_t sql_table_reference_t;
struct sql_expr_t {
uint16_t height; /* Height of the tree headed by this node */
uint16_t op; /* Operation performed by this node */
unsigned int modify_flag:1;
char *token_text; /* Token value. Zero terminated and dequoted */
int64_t num_value;
sql_expr_t *left;
@ -189,7 +191,6 @@ struct sql_expr_t {
sql_expr_list_t *list; /* op = IN, EXISTS, SELECT, CASE, FUNCTION, BETWEEN */
sql_select_t *select; /* EP_xIsSelect and op = IN, EXISTS, SELECT */
int height; /* Height of the tree headed by this node */
char *alias;
enum sql_expr_flags_t flags;
enum sql_var_scope_t var_scope; /* variable scope: SESSION(default) or GLOBAL */

View File

@ -146,10 +146,10 @@ prepare_for_sql_modify_orderby(sql_select_t *select)
{
sql_expr_list_t *columns = select->columns;
sql_column_list_t *orderby = NULL;
int i = 0;
if (select->orderby_clause) {
orderby = select->orderby_clause;
}
int i;
for (i = 0; i < columns->len; ++i) {
sql_expr_t *mcol = g_ptr_array_index(columns, i);
if (!(mcol->flags & EP_ORDER_BY)) {
@ -293,7 +293,7 @@ sql_select_contains_sharding_table(sql_select_t *select, char **current_db /* in
char *db = *current_db;
while (select) {
sql_src_list_t *sources = select->from_src;
int i = 0;
int i;
for (i = 0; sources && i < sources->len; ++i) {
sql_src_item_t *src = g_ptr_array_index(sources, i);
if (src->dbname) {
@ -407,7 +407,7 @@ partition_satisfies(sharding_partition_t *partition, struct condition_t cond)
static void
partitions_filter(GPtrArray *partitions, struct condition_t cond)
{
int i = 0;
int i;
for (i = 0; i < partitions->len; ++i) {
sharding_partition_t *gp = g_ptr_array_index(partitions, i);
if (!partition_satisfies(gp, cond)) {
@ -421,7 +421,7 @@ partitions_filter(GPtrArray *partitions, struct condition_t cond)
static void
partitions_collect(GPtrArray *from_partitions, struct condition_t cond, GPtrArray *to_partitions)
{
int i = 0;
int i;
for (i = 0; i < from_partitions->len; ++i) {
sharding_partition_t *gp = g_ptr_array_index(from_partitions, i);
if (partition_satisfies(gp, cond)) {
@ -434,7 +434,7 @@ partitions_collect(GPtrArray *from_partitions, struct condition_t cond, GPtrArra
sharding_partition_t *
partitions_get(GPtrArray *from_partitions, struct condition_t cond)
{
int i = 0;
int i;
for (i = 0; i < from_partitions->len; ++i) {
sharding_partition_t *gp = g_ptr_array_index(from_partitions, i);
if (partition_satisfies(gp, cond)) {
@ -447,7 +447,7 @@ partitions_get(GPtrArray *from_partitions, struct condition_t cond)
static void
partitions_merge(GPtrArray *partitions, GPtrArray *other)
{
int i = 0;
int i;
for (i = 0; i < other->len; ++i) {
sharding_partition_t *gp = g_ptr_array_index(other, i);
g_ptr_array_add(partitions, gp);
@ -458,8 +458,8 @@ partitions_merge(GPtrArray *partitions, GPtrArray *other)
static GPtrArray *
partitions_dup(GPtrArray *partitions)
{
int i = 0;
GPtrArray *dup = g_ptr_array_new();
int i;
for (i = 0; i < partitions->len; ++i) {
sharding_partition_t *gp = g_ptr_array_index(partitions, i);
g_ptr_array_add(dup, gp);
@ -760,7 +760,7 @@ optimize_sharding_condition(sql_expr_t *where, const sql_src_item_t *src, const
static void
partitions_get_group_names(GPtrArray *partitions, GPtrArray *groups)
{
int i = 0;
int i;
for (i = 0; i < partitions->len; ++i) {
sharding_partition_t *gp = g_ptr_array_index(partitions, i);
g_ptr_array_add(groups, gp->group_name);
@ -899,7 +899,7 @@ sql_select_get_single_tables(sql_select_t *select, char *current_db, GList **sin
char *db = current_db;
while (select) {
sql_src_list_t *sources = select->from_src;
int i = 0;
int i;
for (i = 0; sources && i < sources->len; ++i) {
sql_src_item_t *src = g_ptr_array_index(sources, i);
if (src->dbname) {
@ -913,13 +913,61 @@ sql_select_get_single_tables(sql_select_t *select, char *current_db, GList **sin
}
}
static void
dup_groups(sql_src_item_t *table, GPtrArray *groups)
{
sql_src_item_t *src = table;
if (src->table_name == NULL) {
if (src->select) {
sql_src_list_t *sources = src->select->from_src;
int i;
for (i = 0; sources && i < sources->len; ++i) {
src = g_ptr_array_index(sources, i);
dup_groups(src, groups);
}
return;
} else {
return;
}
}
src->groups = g_ptr_array_new();
int i;
for (i = 0; i < groups->len; i++) {
GString *group = g_ptr_array_index(groups, i);
g_ptr_array_add(table->groups, group);
}
}
static void
sql_select_check_and_set_shard_table(sql_expr_t *where, sql_select_t *select, char *current_db, GPtrArray *groups)
{
char *db = current_db;
while (select) {
sql_src_list_t *sources = select->from_src;
int i;
for (i = 0; sources && i < sources->len; ++i) {
sql_src_item_t *src = g_ptr_array_index(sources, i);
if (src->dbname) {
db = src->dbname;
}
if (src->table_name && shard_conf_is_shard_table(db, src->table_name)) {
dup_groups(src, groups);
where->modify_flag = 1;
}
}
select = select->prior;
}
}
static gboolean
sql_select_has_single_table(sql_select_t *select, char *current_db)
{
char *db = current_db;
while (select) {
sql_src_list_t *sources = select->from_src;
int i = 0;
int i;
for (i = 0; sources && i < sources->len; ++i) {
sql_src_item_t *src = g_ptr_array_index(sources, i);
if (src->dbname) {
@ -934,27 +982,24 @@ sql_select_has_single_table(sql_select_t *select, char *current_db)
return FALSE;
}
static void
dup_groups(sql_src_item_t *table, GPtrArray *groups)
{
table->groups = g_ptr_array_new();
int i;
for (i = 0; i < groups->len; i++) {
GString *group = g_ptr_array_index(groups, i);
g_ptr_array_add(table->groups, group);
}
}
static void
dup_groups_for_partition(sql_src_list_t *sources, GPtrArray *groups)
dup_groups_for_partition(sql_expr_t *where, sql_src_list_t *sources, GList *subqueries, char *default_db, GPtrArray *groups)
{
int i;
for (i = 0; i < sources->len; ++i) {
sql_src_item_t *src = g_ptr_array_index(sources, i);
if (src->groups == NULL || src->groups->len == 0) {
dup_groups(src, groups);
if (sources) {
int i;
for (i = 0; i < sources->len; ++i) {
sql_src_item_t *src = g_ptr_array_index(sources, i);
if (src->groups == NULL || src->groups->len == 0) {
dup_groups(src, groups);
}
}
}
GList *l;
for (l = subqueries; l; l = l->next) {
sql_select_check_and_set_shard_table(where, l->data, default_db, groups);
}
}
static int
@ -967,6 +1012,10 @@ routing_select(sql_context_t *context, const sql_select_t *select,
stats->com_select_global += 1;
return USE_NON_SHARDING_TABLE;
}
GList *subqueries = NULL;
sql_expr_find_subqueries(select->where_clause, &subqueries);
GPtrArray *sharding_tables = g_ptr_array_new();
GList *single_tables = NULL;
int i;
@ -978,12 +1027,15 @@ routing_select(sql_context_t *context, const sql_select_t *select,
sharding_filter_sql(context); /* sharding table inside sub-query, should be filterd */
if (context->rc == PARSE_NOT_SUPPORT) {
g_ptr_array_free(sharding_tables, TRUE);
g_list_free(subqueries);
return ERROR_UNPARSABLE;
}
shard_conf_get_table_groups(groups, db, table);
if (partition_mode) {
dup_groups_for_partition(sources, groups);
dup_groups_for_partition(select->where_clause, sources, subqueries, default_db, groups);
context->sql_needs_reconstruct = 1;
}
g_list_free(subqueries);
g_ptr_array_free(sharding_tables, TRUE);
return USE_ALL_SHARDINGS;
}
@ -1005,6 +1057,7 @@ routing_select(sql_context_t *context, const sql_select_t *select,
if (sharding_tables->len > 0) {
g_ptr_array_free(sharding_tables, TRUE);
g_list_free(single_tables);
g_list_free(subqueries);
sql_context_append_msg(context, "(cetus) JOIN single-table WITH sharding-table");
return ERROR_UNPARSABLE;
}
@ -1018,18 +1071,18 @@ routing_select(sql_context_t *context, const sql_select_t *select,
if (groups->len > 1) {
g_ptr_array_free(sharding_tables, TRUE);
g_list_free(single_tables);
g_list_free(subqueries);
sql_context_append_msg(context, "(cetus)JOIN multiple single-tables not allowed");
return ERROR_UNPARSABLE;
} else {
g_ptr_array_free(sharding_tables, TRUE);
g_list_free(single_tables);
g_list_free(subqueries);
return USE_NON_SHARDING_TABLE;
}
}
/* handle subquery in where clause */
GList *subqueries = NULL;
sql_expr_find_subqueries(select->where_clause, &subqueries);
GList *l;
for (l = subqueries; l; l = l->next) {
if (sql_select_has_single_table(l->data, default_db)) {
@ -1039,27 +1092,24 @@ routing_select(sql_context_t *context, const sql_select_t *select,
return ERROR_UNPARSABLE;
}
}
g_list_free(subqueries);
if (sharding_tables->len == 0) {
shard_conf_get_fixed_group(groups, fixture);
g_ptr_array_free(sharding_tables, TRUE);
stats->com_select_global += 1;
g_list_free(subqueries);
return USE_NON_SHARDING_TABLE;
}
if (sharding_tables->len >= 2) {
if (!join_on_sharding_key(default_db, sharding_tables, select->where_clause)) {
g_ptr_array_free(sharding_tables, TRUE);
g_list_free(subqueries);
sql_context_append_msg(context, "(proxy)JOIN must inside VDB and have explicit join-on condition");
return ERROR_UNPARSABLE;
}
}
if (partition_mode) {
context->sql_needs_reconstruct = 1;
}
gboolean has_sharding_key = FALSE;
for (i = 0; i < sharding_tables->len; ++i) {
sql_src_item_t *shard_table = g_ptr_array_index(sharding_tables, i);
@ -1086,6 +1136,7 @@ routing_select(sql_context_t *context, const sql_select_t *select,
g_warning(G_STRLOC ":unrecognized key ranges");
g_ptr_array_free(partitions, TRUE);
g_ptr_array_free(sharding_tables, TRUE);
g_list_free(subqueries);
sql_context_append_msg(context, "(proxy)sharding key parse error");
return ERROR_UNPARSABLE;
} else if (rc == PARSE_UNRECOGNIZED) {
@ -1094,8 +1145,10 @@ routing_select(sql_context_t *context, const sql_select_t *select,
shard_conf_get_table_groups(groups, db, shard_table->table_name);
stats->com_select_bad_key += 1;
if (partition_mode) {
dup_groups_for_partition(sources, groups);
dup_groups_for_partition(select->where_clause, sources, subqueries, default_db, groups);
context->sql_needs_reconstruct = 1;
}
g_list_free(subqueries);
return USE_ALL_SHARDINGS;
}
partitions_get_group_names(partitions, groups);
@ -1108,6 +1161,11 @@ routing_select(sql_context_t *context, const sql_select_t *select,
if (groups->len > 0) {
g_ptr_array_free(sharding_tables, TRUE);
if (partition_mode) {
dup_groups_for_partition(select->where_clause, NULL, subqueries, default_db, groups);
context->sql_needs_reconstruct = 1;
}
g_list_free(subqueries);
return USE_SHARDING;
} else {
/* has sharding table, but no sharding key
@ -1121,6 +1179,11 @@ routing_select(sql_context_t *context, const sql_select_t *select,
}
}
g_ptr_array_free(sharding_tables, TRUE);
if (partition_mode) {
dup_groups_for_partition(select->where_clause, NULL, subqueries, default_db, groups);
context->sql_needs_reconstruct = 1;
}
g_list_free(subqueries);
return USE_ALL_SHARDINGS;
}
@ -1193,7 +1256,7 @@ routing_update(sql_context_t *context, sql_update_t *update,
sharding_table_t *shard_info = shard_conf_get_info(db, table->table_name);
int key_occur = optimize_sharding_condition(update->where_clause,
table, shard_info->pkey->str);
int i = 0;
int i;
/* update sharding key is not allowed */
for (i = 0; update->set_list && i < update->set_list->len; ++i) {
sql_expr_t *equation = g_ptr_array_index(update->set_list, i);
@ -1780,7 +1843,7 @@ select_compare_orderby(sql_select_t *select)
}
if (columns->len >= ord_cols->len) {
int i = 0;
int i;
for (i = 0; i < ord_cols->len; ++i) {
sql_column_t *ordcol = g_ptr_array_index(ord_cols, i);
sql_expr_t *ord = ordcol->expr;
@ -1953,7 +2016,7 @@ sharding_filter_sql(sql_context_t *context)
if (select->groupby_clause) {
sql_expr_list_t *groupby = select->groupby_clause;
int i = 0;
int i;
for (i = 0; i < groupby->len; ++i) {
sql_expr_t *col = g_ptr_array_index(groupby, i);
if (col->op == TK_CASE) {