improve clickhouse driver

This commit is contained in:
daguang 2022-04-08 09:44:42 +08:00
parent a594592151
commit c90a9d45ee
4 changed files with 27 additions and 28 deletions

View File

@ -32,12 +32,12 @@ type Driver struct {
var ( var (
// tableFieldsMap caches the table information retrieved from database. // tableFieldsMap caches the table information retrieved from database.
tableFieldsMap = gmap.New(true) tableFieldsMap = gmap.New(true)
ErrUnsupportedInsertIgnore = errors.New("unsupported method:InsertIgnore") errUnsupportedInsertIgnore = errors.New("unsupported method:InsertIgnore")
ErrUnsupportedInsertGetId = errors.New("unsupported method:InsertGetId") errUnsupportedInsertGetId = errors.New("unsupported method:InsertGetId")
ErrUnsupportedReplace = errors.New("unsupported method:Replace") errUnsupportedReplace = errors.New("unsupported method:Replace")
ErrUnsupportedBegin = errors.New("unsupported method:Begin") errUnsupportedBegin = errors.New("unsupported method:Begin")
ErrUnsupportedTransaction = errors.New("unsupported method:Transaction") errUnsupportedTransaction = errors.New("unsupported method:Transaction")
ErrSQLNull = errors.New("SQL cannot be null") errSQLNull = errors.New("SQL cannot be null")
) )
func init() { func init() {
@ -205,7 +205,7 @@ func (d *Driver) ping(conn *sql.DB) error {
// DoFilter handles the sql before posts it to database. // DoFilter handles the sql before posts it to database.
func (d *Driver) DoFilter(ctx context.Context, link gdb.Link, originSql string, args []interface{}) (newSql string, newArgs []interface{}, err error) { func (d *Driver) DoFilter(ctx context.Context, link gdb.Link, originSql string, args []interface{}) (newSql string, newArgs []interface{}, err error) {
// replace MySQL to Clickhouse SQL grammar // replace STD SQL to Clickhouse SQL grammar
// MySQL eg: UPDATE visits SET xxx // MySQL eg: UPDATE visits SET xxx
// Clickhouse eg: ALTER TABLE visits UPDATE xxx // Clickhouse eg: ALTER TABLE visits UPDATE xxx
// MySQL eg: DELETE FROM VISIT // MySQL eg: DELETE FROM VISIT
@ -217,7 +217,7 @@ func (d *Driver) DoFilter(ctx context.Context, link gdb.Link, originSql string,
if len(result) != 0 { if len(result) != 0 {
sqlSlice := strings.Split(originSql, " ") sqlSlice := strings.Split(originSql, " ")
if len(sqlSlice) < 3 { if len(sqlSlice) < 3 {
return "", nil, ErrSQLNull return "", nil, errSQLNull
} }
ck := []string{"ALTER", "TABLE"} ck := []string{"ALTER", "TABLE"}
switch strings.ToUpper(result[0]) { switch strings.ToUpper(result[0]) {
@ -285,23 +285,23 @@ func (d *Driver) DoInsert(ctx context.Context, link gdb.Link, table string, list
// InsertIgnore Other queries for modifying data parts are not supported: REPLACE, MERGE, UPSERT, INSERT UPDATE. // InsertIgnore Other queries for modifying data parts are not supported: REPLACE, MERGE, UPSERT, INSERT UPDATE.
func (d *Driver) InsertIgnore(ctx context.Context, table string, data interface{}, batch ...int) (sql.Result, error) { func (d *Driver) InsertIgnore(ctx context.Context, table string, data interface{}, batch ...int) (sql.Result, error) {
return nil, ErrUnsupportedInsertIgnore return nil, errUnsupportedInsertIgnore
} }
// InsertAndGetId Other queries for modifying data parts are not supported: REPLACE, MERGE, UPSERT, INSERT UPDATE. // InsertAndGetId Other queries for modifying data parts are not supported: REPLACE, MERGE, UPSERT, INSERT UPDATE.
func (d *Driver) InsertAndGetId(ctx context.Context, table string, data interface{}, batch ...int) (int64, error) { func (d *Driver) InsertAndGetId(ctx context.Context, table string, data interface{}, batch ...int) (int64, error) {
return 0, ErrUnsupportedInsertGetId return 0, errUnsupportedInsertGetId
} }
// Replace Other queries for modifying data parts are not supported: REPLACE, MERGE, UPSERT, INSERT UPDATE. // Replace Other queries for modifying data parts are not supported: REPLACE, MERGE, UPSERT, INSERT UPDATE.
func (d *Driver) Replace(ctx context.Context, table string, data interface{}, batch ...int) (sql.Result, error) { func (d *Driver) Replace(ctx context.Context, table string, data interface{}, batch ...int) (sql.Result, error) {
return nil, ErrUnsupportedReplace return nil, errUnsupportedReplace
} }
func (d *Driver) Begin(ctx context.Context) (tx *gdb.TX, err error) { func (d *Driver) Begin(ctx context.Context) (tx *gdb.TX, err error) {
return nil, ErrUnsupportedBegin return nil, errUnsupportedBegin
} }
func (d *Driver) Transaction(ctx context.Context, f func(ctx context.Context, tx *gdb.TX) error) error { func (d *Driver) Transaction(ctx context.Context, f func(ctx context.Context, tx *gdb.TX) error) error {
return ErrUnsupportedTransaction return errUnsupportedTransaction
} }

View File

@ -113,19 +113,19 @@ func TestDriverClickhouse_Select(t *testing.T) {
func TestDriver_InsertIgnore(t *testing.T) { func TestDriver_InsertIgnore(t *testing.T) {
connect := InitClickhouse() connect := InitClickhouse()
_, err := connect.InsertIgnore(context.Background(), "", nil) _, err := connect.InsertIgnore(context.Background(), "", nil)
gtest.AssertEQ(err, ErrUnsupportedInsertIgnore) gtest.AssertEQ(err, errUnsupportedInsertIgnore)
} }
func TestDriver_InsertAndGetId(t *testing.T) { func TestDriver_InsertAndGetId(t *testing.T) {
connect := InitClickhouse() connect := InitClickhouse()
_, err := connect.InsertAndGetId(context.Background(), "", nil) _, err := connect.InsertAndGetId(context.Background(), "", nil)
gtest.AssertEQ(err, ErrUnsupportedInsertGetId) gtest.AssertEQ(err, errUnsupportedInsertGetId)
} }
func TestDriver_Replace(t *testing.T) { func TestDriver_Replace(t *testing.T) {
connect := InitClickhouse() connect := InitClickhouse()
_, err := connect.Replace(context.Background(), "", nil) _, err := connect.Replace(context.Background(), "", nil)
gtest.AssertEQ(err, ErrUnsupportedReplace) gtest.AssertEQ(err, errUnsupportedReplace)
} }
func TestDriverClickhouse_DoInsertOne(t *testing.T) { func TestDriverClickhouse_DoInsertOne(t *testing.T) {
@ -146,7 +146,7 @@ func TestDriver_DoInsertMany(t *testing.T) {
gtest.AssertEQ(createClickhouseTable(connect), nil) gtest.AssertEQ(createClickhouseTable(connect), nil)
defer dropClickhouseTable(connect) defer dropClickhouseTable(connect)
tx, err := connect.Begin(context.Background()) tx, err := connect.Begin(context.Background())
gtest.AssertEQ(err, ErrUnsupportedBegin) gtest.AssertEQ(err, errUnsupportedBegin)
gtest.AssertNil(tx) gtest.AssertNil(tx)
} }

View File

@ -27,12 +27,11 @@ type internalCtxData struct {
const ( const (
internalCtxDataKeyInCtx gctx.StrKey = "InternalCtxData" internalCtxDataKeyInCtx gctx.StrKey = "InternalCtxData"
// IgnoreResultInCtx // `ignoreResultKeyInCtx` is a mark for some db drivers that do not support `RowsAffected` function,
// This option is only available in ClickHouse. // for example: `clickhouse`. The `clickhouse` does not support fetching insert/update results,
// Because ClickHouse does not support fetching insert/update results and returns errors when executed // but returns errors when execute `RowsAffected`. It here ignores the calling of `RowsAffected`
// So need to ignore the results to avoid triggering errors // to avoid triggering errors, rather than ignoring errors after they are triggered.
// Rather than ignoring errors after they are triggered ignoreResultInCtx gctx.StrKey = "IgnoreResult"
IgnoreResultInCtx gctx.StrKey = "IgnoreResult"
) )
func (c *Core) injectInternalCtxData(ctx context.Context) context.Context { func (c *Core) injectInternalCtxData(ctx context.Context) context.Context {
@ -46,16 +45,16 @@ func (c *Core) injectInternalCtxData(ctx context.Context) context.Context {
} }
func (c *Core) InjectIgnoreResult(ctx context.Context) context.Context { func (c *Core) InjectIgnoreResult(ctx context.Context) context.Context {
if ctx.Value(IgnoreResultInCtx) != nil { if ctx.Value(ignoreResultInCtx) != nil {
return ctx return ctx
} }
return context.WithValue(ctx, IgnoreResultInCtx, &internalCtxData{ return context.WithValue(ctx, ignoreResultInCtx, &internalCtxData{
DB: c.db, DB: c.db,
}) })
} }
func (c *Core) GetIgnoreResultFromCtx(ctx context.Context) *internalCtxData { func (c *Core) getIgnoreResultFromCtx(ctx context.Context) *internalCtxData {
if v := ctx.Value(IgnoreResultInCtx); v != nil { if v := ctx.Value(ignoreResultInCtx); v != nil {
return v.(*internalCtxData) return v.(*internalCtxData)
} }
return nil return nil

View File

@ -261,7 +261,7 @@ func (c *Core) DoCommit(ctx context.Context, in DoCommitInput) (out DoCommitOutp
} }
// Result handling. // Result handling.
switch { switch {
case sqlResult != nil && c.GetIgnoreResultFromCtx(ctx) == nil: case sqlResult != nil && c.getIgnoreResultFromCtx(ctx) == nil:
rowsAffected, err = sqlResult.RowsAffected() rowsAffected, err = sqlResult.RowsAffected()
out.Result = sqlResult out.Result = sqlResult