remove sharding feature from gdb.Model (#2758)

This commit is contained in:
John Guo 2023-07-13 21:15:53 +08:00 committed by GitHub
parent 6d7edb1479
commit a2fec50500
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 108 additions and 127 deletions

View File

@ -8,6 +8,7 @@ package mysql_test
import (
"context"
"database/sql"
"fmt"
"testing"
@ -27,15 +28,24 @@ func Test_Model_Sharding_Table(t *testing.T) {
createTable(table2)
defer dropTable(table2)
shardingModel := db.Model(table1).Sharding(
func(ctx context.Context, in gdb.ShardingInput) (out *gdb.ShardingOutput, err error) {
out = &gdb.ShardingOutput{
Table: table2,
Schema: in.Schema,
}
return
shardingModel := db.Model(table1).Hook(gdb.HookHandler{
Select: func(ctx context.Context, in *gdb.HookSelectInput) (result gdb.Result, err error) {
in.Table = table2
return in.Next(ctx)
},
)
Insert: func(ctx context.Context, in *gdb.HookInsertInput) (result sql.Result, err error) {
in.Table = table2
return in.Next(ctx)
},
Update: func(ctx context.Context, in *gdb.HookUpdateInput) (result sql.Result, err error) {
in.Table = table2
return in.Next(ctx)
},
Delete: func(ctx context.Context, in *gdb.HookDeleteInput) (result sql.Result, err error) {
in.Table = table2
return in.Next(ctx)
},
})
gtest.C(t, func(t *gtest.T) {
r, err := shardingModel.Insert(g.Map{
"id": 1,
@ -126,15 +136,28 @@ func Test_Model_Sharding_Schema(t *testing.T) {
createTableWithDb(db2, table)
defer dropTableWithDb(db2, table)
shardingModel := db.Model(table).Sharding(
func(ctx context.Context, in gdb.ShardingInput) (out *gdb.ShardingOutput, err error) {
out = &gdb.ShardingOutput{
Table: table,
Schema: db2.GetSchema(),
}
return
shardingModel := db.Model(table).Hook(gdb.HookHandler{
Select: func(ctx context.Context, in *gdb.HookSelectInput) (result gdb.Result, err error) {
in.Table = table
in.Schema = db2.GetSchema()
return in.Next(ctx)
},
)
Insert: func(ctx context.Context, in *gdb.HookInsertInput) (result sql.Result, err error) {
in.Table = table
in.Schema = db2.GetSchema()
return in.Next(ctx)
},
Update: func(ctx context.Context, in *gdb.HookUpdateInput) (result sql.Result, err error) {
in.Table = table
in.Schema = db2.GetSchema()
return in.Next(ctx)
},
Delete: func(ctx context.Context, in *gdb.HookDeleteInput) (result sql.Result, err error) {
in.Table = table
in.Schema = db2.GetSchema()
return in.Next(ctx)
},
})
gtest.C(t, func(t *gtest.T) {
r, err := shardingModel.Insert(g.Map{
"id": 1,

View File

@ -49,7 +49,6 @@ type Model struct {
safe bool // If true, it clones and returns a new model object whenever operation done; or else it changes the attribute of current model.
onDuplicate interface{} // onDuplicate is used for ON "DUPLICATE KEY UPDATE" statement.
onDuplicateEx interface{} // onDuplicateEx is used for excluding some columns ON "DUPLICATE KEY UPDATE" statement.
shardingFunc ShardingFunc // shardingFunc is the custom function for records sharding.
}
// ModelHandler is a function that handles given Model and returns a new Model that is custom modified.

View File

@ -11,6 +11,7 @@ import (
"database/sql"
"fmt"
"github.com/gogf/gf/v2/container/gvar"
"github.com/gogf/gf/v2/text/gregex"
"github.com/gogf/gf/v2/text/gstr"
)
@ -33,11 +34,11 @@ type HookHandler struct {
// internalParamHook manages all internal parameters for hook operations.
// The `internal` obviously means you cannot access these parameters outside this package.
type internalParamHook struct {
link Link // Connection object from third party sql driver.
handlerCalled bool // Simple mark for custom handler called, in case of recursive calling.
removedWhere bool // Removed mark for condition string that was removed `WHERE` prefix.
originalTableName string // The original table name.
originalSchemaName string // The original schema name.
link Link // Connection object from third party sql driver.
handlerCalled bool // Simple mark for custom handler called, in case of recursive calling.
removedWhere bool // Removed mark for condition string that was removed `WHERE` prefix.
originalTableName *gvar.Var // The original table name.
originalSchemaName *gvar.Var // The original schema name.
}
type internalParamHookSelect struct {
@ -65,17 +66,19 @@ type internalParamHookDelete struct {
// which is usually not be interesting for upper business hook handler.
type HookSelectInput struct {
internalParamHookSelect
Model *Model // Current operation Model, which takes no effect if updated.
Table string // The table name that to be used. Update this attribute to change target table name.
Sql string // The sql string that to be committed.
Args []interface{} // The arguments of sql.
Model *Model // Current operation Model.
Table string // The table name that to be used. Update this attribute to change target table name.
Schema string // The schema name that to be used. Update this attribute to change target schema name.
Sql string // The sql string that to be committed.
Args []interface{} // The arguments of sql.
}
// HookInsertInput holds the parameters for insert hook operation.
type HookInsertInput struct {
internalParamHookInsert
Model *Model // Current operation Model, which takes no effect if updated.
Model *Model // Current operation Model.
Table string // The table name that to be used. Update this attribute to change target table name.
Schema string // The schema name that to be used. Update this attribute to change target schema name.
Data List // The data records list to be inserted/saved into table.
Option DoInsertOption // The extra option for data inserting.
}
@ -83,8 +86,9 @@ type HookInsertInput struct {
// HookUpdateInput holds the parameters for update hook operation.
type HookUpdateInput struct {
internalParamHookUpdate
Model *Model // Current operation Model, which takes no effect if updated.
Model *Model // Current operation Model.
Table string // The table name that to be used. Update this attribute to change target table name.
Schema string // The schema name that to be used. Update this attribute to change target schema name.
Data interface{} // Data can be type of: map[string]interface{}/string. You can use type assertion on `Data`.
Condition string // The where condition string for updating.
Args []interface{} // The arguments for sql place-holders.
@ -93,8 +97,9 @@ type HookUpdateInput struct {
// HookDeleteInput holds the parameters for delete hook operation.
type HookDeleteInput struct {
internalParamHookDelete
Model *Model // Current operation Model, which takes no effect if updated.
Model *Model // Current operation Model.
Table string // The table name that to be used. Update this attribute to change target table name.
Schema string // The schema name that to be used. Update this attribute to change target schema name.
Condition string // The where condition string for deleting.
Args []interface{} // The arguments for sql place-holders.
}
@ -108,62 +113,22 @@ func (h *internalParamHook) IsTransaction() bool {
return h.link.IsTransaction()
}
func (h *internalParamHook) handlerSharding(
ctx context.Context, table string, model *Model, isOnMaster bool,
) (newTable string, err error) {
shardingInput := ShardingInput{
Table: table,
Schema: model.db.GetSchema(),
}
newTable = shardingInput.Table
h.originalTableName = shardingInput.Table
h.originalSchemaName = shardingInput.Schema
if model.shardingFunc != nil {
var shardingOutput *ShardingOutput
// Call custom sharding function.
shardingOutput, err = model.shardingFunc(ctx, shardingInput)
if err != nil {
return
}
if shardingOutput != nil {
// Table sharding.
if shardingOutput.Table != "" {
newTable = shardingOutput.Table
}
// Schema sharding.
if shardingOutput.Schema != "" && shardingOutput.Schema != shardingInput.Schema {
if isOnMaster {
// Insert/Update/Delete statements on master node.
h.link, err = model.db.GetCore().MasterLink(shardingOutput.Schema)
} else {
// Select statement on slave node.
h.link, err = model.db.GetCore().SlaveLink(shardingOutput.Schema)
}
if err != nil {
return
}
}
}
}
return
}
// Next calls the next hook handler.
func (h *HookSelectInput) Next(ctx context.Context) (result Result, err error) {
// Sharding feature.
if h.originalTableName == "" {
if h.Table, err = h.handlerSharding(ctx, h.Table, h.Model, false); err != nil {
return
}
if h.originalTableName.IsNil() {
h.originalTableName = gvar.New(h.Table)
}
if h.originalSchemaName.IsNil() {
h.originalSchemaName = gvar.New(h.Schema)
}
// Custom hook handler call.
if h.handler != nil && !h.handlerCalled {
h.handlerCalled = true
return h.handler(ctx, h)
}
var toBeCommittedSql = h.Sql
if h.Table != h.originalTableName {
// Replace table name the table name is changed by hook handler.
// Table change.
if h.Table != h.originalTableName.String() {
toBeCommittedSql, err = gregex.ReplaceStringFuncMatch(
`(?i) FROM ([\S]+)`,
toBeCommittedSql,
@ -173,32 +138,47 @@ func (h *HookSelectInput) Next(ctx context.Context) (result Result, err error) {
},
)
}
// Schema change.
if h.Schema != "" && h.Schema != h.originalSchemaName.String() {
h.link, err = h.Model.db.GetCore().SlaveLink(h.Schema)
if err != nil {
return
}
}
return h.Model.db.DoSelect(ctx, h.link, toBeCommittedSql, h.Args...)
}
// Next calls the next hook handler.
func (h *HookInsertInput) Next(ctx context.Context) (result sql.Result, err error) {
// Sharding feature.
if h.originalTableName == "" {
if h.Table, err = h.handlerSharding(ctx, h.Table, h.Model, true); err != nil {
return
}
if h.originalTableName.IsNil() {
h.originalTableName = gvar.New(h.Table)
}
if h.originalSchemaName.IsNil() {
h.originalSchemaName = gvar.New(h.Schema)
}
if h.handler != nil && !h.handlerCalled {
h.handlerCalled = true
return h.handler(ctx, h)
}
// Schema change.
if h.Schema != "" && h.Schema != h.originalSchemaName.String() {
h.link, err = h.Model.db.GetCore().MasterLink(h.Schema)
if err != nil {
return
}
}
return h.Model.db.DoInsert(ctx, h.link, h.Table, h.Data, h.Option)
}
// Next calls the next hook handler.
func (h *HookUpdateInput) Next(ctx context.Context) (result sql.Result, err error) {
// Sharding feature.
if h.originalTableName == "" {
if h.Table, err = h.handlerSharding(ctx, h.Table, h.Model, true); err != nil {
return
}
if h.originalTableName.IsNil() {
h.originalTableName = gvar.New(h.Table)
}
if h.originalSchemaName.IsNil() {
h.originalSchemaName = gvar.New(h.Schema)
}
if h.handler != nil && !h.handlerCalled {
@ -212,16 +192,23 @@ func (h *HookUpdateInput) Next(ctx context.Context) (result sql.Result, err erro
if h.removedWhere {
h.Condition = whereKeyInCondition + h.Condition
}
// Schema change.
if h.Schema != "" && h.Schema != h.originalSchemaName.String() {
h.link, err = h.Model.db.GetCore().MasterLink(h.Schema)
if err != nil {
return
}
}
return h.Model.db.DoUpdate(ctx, h.link, h.Table, h.Data, h.Condition, h.Args...)
}
// Next calls the next hook handler.
func (h *HookDeleteInput) Next(ctx context.Context) (result sql.Result, err error) {
// Sharding feature.
if h.originalTableName == "" {
if h.Table, err = h.handlerSharding(ctx, h.Table, h.Model, true); err != nil {
return
}
if h.originalTableName.IsNil() {
h.originalTableName = gvar.New(h.Table)
}
if h.originalSchemaName.IsNil() {
h.originalSchemaName = gvar.New(h.Schema)
}
if h.handler != nil && !h.handlerCalled {
@ -235,6 +222,13 @@ func (h *HookDeleteInput) Next(ctx context.Context) (result sql.Result, err erro
if h.removedWhere {
h.Condition = whereKeyInCondition + h.Condition
}
// Schema change.
if h.Schema != "" && h.Schema != h.originalSchemaName.String() {
h.link, err = h.Model.db.GetCore().MasterLink(h.Schema)
if err != nil {
return
}
}
return h.Model.db.DoDelete(ctx, h.link, h.Table, h.Condition, h.Args...)
}

View File

@ -1,35 +0,0 @@
// Copyright GoFrame Author(https://goframe.org). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://github.com/gogf/gf.
package gdb
import "context"
// ShardingInput holds the input parameters for sharding.
type ShardingInput struct {
Table string // The original table name.
Schema string // The original schema name. Note that this might be empty according database configuration.
}
// ShardingOutput holds the output parameters for sharding.
type ShardingOutput struct {
Table string // The target table name.
Schema string // The target schema name.
}
// ShardingFunc is custom function for records sharding by certain Model, which supports sharding on table and schema.
// It retrieves the original Table/Schema from ShardingInput, and returns the new Table/Schema by ShardingOutput.
// If the Table/Schema in ShardingOutput is empty string, it then ignores the returned value and uses the default
// Table/Schema of current Model to execute the sql statement.
type ShardingFunc func(ctx context.Context, in ShardingInput) (out *ShardingOutput, err error)
// Sharding sets custom sharding function for current model.
// More info please refer to ShardingFunc.
func (m *Model) Sharding(f ShardingFunc) *Model {
model := m.getModel()
model.shardingFunc = f
return model
}