2022-01-24 14:41:23 +08:00
// Copyright GoFrame Author(https://goframe.org). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://github.com/gogf/gf.
// Package clickhouse implements gdb.Driver, which supports operations for ClickHouse.
package clickhouse
import (
2022-02-19 15:09:44 +08:00
"context"
"database/sql"
"errors"
"fmt"
2022-02-19 23:10:31 +08:00
"strings"
2022-02-19 15:09:44 +08:00
2022-04-12 21:31:51 +08:00
"github.com/ClickHouse/clickhouse-go"
2022-02-19 15:09:44 +08:00
"github.com/gogf/gf/v2/container/gmap"
"github.com/gogf/gf/v2/database/gdb"
"github.com/gogf/gf/v2/errors/gcode"
"github.com/gogf/gf/v2/errors/gerror"
"github.com/gogf/gf/v2/text/gregex"
"github.com/gogf/gf/v2/text/gstr"
"github.com/gogf/gf/v2/util/gconv"
)
// Driver is the gdb driver for ClickHouse databases.
// It embeds *gdb.Core, so the methods of gdb.Core are promoted onto Driver
// unless Driver declares its own method of the same name.
type Driver struct {
* gdb . Core
}
var (
// tableFieldsMap caches the table information retrieved from database.
2022-02-19 16:59:17 +08:00
tableFieldsMap = gmap . New ( true )
2022-04-12 21:31:51 +08:00
errUnsupportedInsertIgnore = errors . New ( "unsupported method: InsertIgnore" )
errUnsupportedInsertGetId = errors . New ( "unsupported method: InsertGetId" )
errUnsupportedReplace = errors . New ( "unsupported method: Replace" )
errUnsupportedBegin = errors . New ( "unsupported method: Begin" )
errUnsupportedTransaction = errors . New ( "unsupported method: Transaction" )
2022-04-08 09:44:42 +08:00
errSQLNull = errors . New ( "SQL cannot be null" )
2022-01-24 14:41:23 +08:00
)
2022-02-19 15:09:44 +08:00
// init registers this driver in gdb under the type name "clickhouse".
// It panics when the registration fails, as the package is unusable then.
func init() {
	// The name must not carry surrounding whitespace, otherwise it is a
	// different key than the `clickhouse` type name.
	if err := gdb.Register(`clickhouse`, New()); err != nil {
		panic(err)
	}
}
// New creates and returns a driver that implements gdb.Driver, which supports
// operations for clickhouse. The returned driver has no gdb.Core attached yet.
func New() gdb.Driver {
	return new(Driver)
}
// New creates and returns a database object for clickhouse that is bound to
// the given core. The node argument is not used.
// It implements the interface of gdb.Driver for extra database driver installation.
func (d *Driver) New(core *gdb.Core, node *gdb.ConfigNode) (gdb.DB, error) {
	db := &Driver{Core: core}
	return db, nil
}
// Open creates and returns an underlying sql.DB object for clickhouse.
func ( d * Driver ) Open ( config * gdb . ConfigNode ) ( * sql . DB , error ) {
var (
source string
driver = "clickhouse"
)
2022-04-27 17:15:26 +08:00
// clickhouse://username:password@host1:9000,host2:9000/database?dial_timeout=200ms&max_execution_time=60
2022-02-24 21:06:26 +08:00
if config . Link != "" {
source = config . Link
2022-04-27 17:15:26 +08:00
// Custom changing the schema in runtime.
if config . Name != "" {
source , _ = gregex . ReplaceString ( ` @(.+?)/([\w\.\-]+)+ ` , "@$1/" + config . Name , source )
}
2022-02-24 21:06:26 +08:00
} else if config . Pass != "" {
2022-02-19 15:09:44 +08:00
source = fmt . Sprintf (
2022-02-24 21:06:26 +08:00
"clickhouse://%s:%s@%s:%s/%s?charset=%s&debug=%s" ,
config . User , config . Pass , config . Host , config . Port , config . Name , config . Charset , gconv . String ( config . Debug ) )
2022-02-19 15:09:44 +08:00
} else {
source = fmt . Sprintf (
2022-02-24 21:06:26 +08:00
"clickhouse://%s@%s:%s/%s?charset=%s&debug=%s" ,
config . User , config . Host , config . Port , config . Name , config . Charset , gconv . String ( config . Debug ) )
2022-02-19 15:09:44 +08:00
}
db , err := sql . Open ( driver , source )
if err != nil {
return nil , err
}
2022-02-19 16:59:17 +08:00
2022-02-19 15:09:44 +08:00
return db , nil
}
// Tables retrieves and returns the tables of current schema.
// It's mainly used in cli tool chain for automatically generating the models.
func ( d * Driver ) Tables ( ctx context . Context , schema ... string ) ( tables [ ] string , err error ) {
var result gdb . Result
link , err := d . SlaveLink ( schema ... )
if err != nil {
return nil , err
}
2022-02-19 16:59:17 +08:00
query := fmt . Sprintf ( "select name from `system`.tables where database = '%s'" , d . GetConfig ( ) . Name )
2022-04-04 12:46:11 +08:00
result , err = d . DoSelect ( ctx , link , query )
2022-02-19 15:09:44 +08:00
if err != nil {
return
}
for _ , m := range result {
tables = append ( tables , m [ "name" ] . String ( ) )
}
return
}
// TableFields retrieves and returns the fields' information of specified table of current schema.
// Also see DriverMysql.TableFields.
2022-04-12 21:31:51 +08:00
func ( d * Driver ) TableFields (
ctx context . Context , table string , schema ... string ,
) ( fields map [ string ] * gdb . TableField , err error ) {
2022-02-19 15:09:44 +08:00
charL , charR := d . GetChars ( )
table = gstr . Trim ( table , charL + charR )
if gstr . Contains ( table , " " ) {
return nil , gerror . NewCode ( gcode . CodeInvalidParameter , "function TableFields supports only single table operations" )
}
useSchema := d . GetSchema ( )
if len ( schema ) > 0 && schema [ 0 ] != "" {
useSchema = schema [ 0 ]
}
v := tableFieldsMap . GetOrSetFuncLock (
fmt . Sprintf ( ` clickhouse_table_fields_%s_%s@group:%s ` , table , useSchema , d . GetGroup ( ) ) ,
func ( ) interface { } {
var (
result gdb . Result
link gdb . Link
)
if link , err = d . SlaveLink ( useSchema ) ; err != nil {
return nil
}
2022-02-19 16:59:17 +08:00
getColumnsSql := fmt . Sprintf ( "select name,position,default_expression,comment from `system`.columns c where database = '%s' and `table` = '%s'" , d . GetConfig ( ) . Name , table )
2022-04-04 12:46:11 +08:00
result , err = d . DoSelect ( ctx , link , getColumnsSql )
2022-02-19 15:09:44 +08:00
if err != nil {
return nil
}
fields = make ( map [ string ] * gdb . TableField )
for _ , m := range result {
var (
isNull = false
fieldType = m [ "type" ] . String ( )
)
// in clickhouse , filed type like is Nullable(int)
fieldsResult , _ := gregex . MatchString ( ` ^Nullable\((.*?)\) ` , fieldType )
if len ( fieldsResult ) == 2 {
isNull = true
fieldType = fieldsResult [ 1 ]
}
fields [ m [ "name" ] . String ( ) ] = & gdb . TableField {
Index : m [ "position" ] . Int ( ) ,
Name : m [ "name" ] . String ( ) ,
Default : m [ "default_expression" ] . Val ( ) ,
Comment : m [ "comment" ] . String ( ) ,
//Key: m["Key"].String(),
Type : fieldType ,
Null : isNull ,
}
}
return fields
} ,
)
if v != nil {
fields = v . ( map [ string ] * gdb . TableField )
}
return
}
// FilteredLink retrieves and returns filtered `linkInfo` that can be using for
// logging or tracing purpose.
//
// It returns an empty string when no link is configured. Otherwise the
// password of the link is replaced with "xxx". Two link forms are handled:
//
//	clickhouse://username:password@host:9000/database?key=value
//	username:password@tcp(host:port)/database
//
// A link that matches neither form is returned unchanged.
func (d *Driver) FilteredLink() string {
	linkInfo := d.GetConfig().Link
	if linkInfo == "" {
		return ""
	}
	pattern, replacement := `(.+?):(.+)@tcp(.+)`, `$1:xxx@tcp$3`
	if strings.Contains(linkInfo, "://") {
		// URL form: the user info ends at the last '@' in front of the first
		// '/', so the password part must not run across a '/'.
		pattern, replacement = `^(.+?://[^:@/]*):([^/]+)@(.+)$`, `$1:xxx@$3`
	}
	// The error is ignored on purpose: both patterns are constant and valid.
	s, _ := gregex.ReplaceString(pattern, replacement, linkInfo)
	return s
}
// PingMaster pings the master node to check authentication or keeps the connection alive.
// It returns the error of retrieving the master node, or the result of ping.
func (d *Driver) PingMaster() error {
	master, err := d.Master()
	if err == nil {
		err = d.ping(master)
	}
	return err
}
// PingSlave pings the slave node to check authentication or keeps the connection alive.
// It returns the error of retrieving the slave node, or the result of ping.
func (d *Driver) PingSlave() error {
	slave, err := d.Slave()
	if err == nil {
		err = d.ping(slave)
	}
	return err
}
// ping pings the given connection and returns the Clickhouse specific error.
//
// When the ping error is, or wraps, a *clickhouse.Exception, it is converted
// into an error of the form "[code]message". Any other error, including nil,
// is returned as it is.
func (d *Driver) ping(conn *sql.DB) error {
	err := conn.Ping()
	var exception *clickhouse.Exception
	if errors.As(err, &exception) {
		return fmt.Errorf("[%d]%s", exception.Code, exception.Message)
	}
	return err
}
2022-02-23 22:51:37 +08:00
// DoFilter handles the sql before posts it to database.
2022-04-12 21:31:51 +08:00
func ( d * Driver ) DoFilter (
ctx context . Context , link gdb . Link , originSql string , args [ ] interface { } ,
) ( newSql string , newArgs [ ] interface { } , err error ) {
// It replaces STD SQL to Clickhouse SQL grammar.
// MySQL eg: UPDATE visits SET xxx
2022-02-23 22:51:37 +08:00
// Clickhouse eg: ALTER TABLE visits UPDATE xxx
2022-04-12 21:31:51 +08:00
// MySQL eg: DELETE FROM VISIT
2022-02-23 22:51:37 +08:00
// Clickhouse eg: ALTER TABLE VISIT DELETE WHERE filter_expr
2022-04-04 12:46:11 +08:00
result , err := gregex . MatchString ( "(?i)^UPDATE|DELETE" , originSql )
2022-02-23 22:51:37 +08:00
if err != nil {
return "" , nil , err
}
if len ( result ) != 0 {
2022-04-04 12:46:11 +08:00
sqlSlice := strings . Split ( originSql , " " )
2022-02-23 22:51:37 +08:00
if len ( sqlSlice ) < 3 {
2022-04-08 09:44:42 +08:00
return "" , nil , errSQLNull
2022-02-23 22:51:37 +08:00
}
ck := [ ] string { "ALTER" , "TABLE" }
switch strings . ToUpper ( result [ 0 ] ) {
case "UPDATE" :
sqlSlice = append ( append ( append ( ck , sqlSlice [ 1 ] ) , result [ 0 ] ) , sqlSlice [ 3 : ] ... )
return strings . Join ( sqlSlice , " " ) , args , nil
case "DELETE" :
sqlSlice = append ( append ( append ( ck , sqlSlice [ 2 ] ) , result [ 0 ] ) , sqlSlice [ 3 : ] ... )
return strings . Join ( sqlSlice , " " ) , args , nil
}
}
2022-04-04 12:46:11 +08:00
return originSql , args , nil
2022-02-19 16:59:17 +08:00
}
2022-02-19 17:49:53 +08:00
// DoCommit commits current sql and arguments to underlying sql driver.
2022-02-19 16:59:17 +08:00
func ( d * Driver ) DoCommit ( ctx context . Context , in gdb . DoCommitInput ) ( out gdb . DoCommitOutput , err error ) {
2022-04-04 12:46:11 +08:00
ctx = d . InjectIgnoreResult ( ctx )
return d . Core . DoCommit ( ctx , in )
2022-02-19 16:59:17 +08:00
}
2022-04-12 21:31:51 +08:00
func ( d * Driver ) DoInsert (
ctx context . Context , link gdb . Link , table string , list gdb . List , option gdb . DoInsertOption ,
) ( result sql . Result , err error ) {
2022-02-19 23:10:31 +08:00
var (
keys [ ] string // Field names.
valueHolder = make ( [ ] string , 0 )
)
// Handle the field names and placeholders.
for k := range list [ 0 ] {
keys = append ( keys , k )
valueHolder = append ( valueHolder , "?" )
}
// Prepare the batch result pointer.
var (
charL , charR = d . Core . GetChars ( )
keysStr = charL + strings . Join ( keys , charR + "," + charL ) + charR
holderStr = strings . Join ( valueHolder , "," )
tx = & gdb . TX { }
stdSqlResult sql . Result
stmt * gdb . Stmt
)
tx , err = d . Core . Begin ( ctx )
if err != nil {
return
}
stmt , err = tx . Prepare ( fmt . Sprintf (
"INSERT INTO %s(%s) VALUES (%s)" ,
d . QuotePrefixTableName ( table ) , keysStr ,
holderStr ,
) )
if err != nil {
return
}
2022-02-24 12:58:57 +08:00
for i := 0 ; i < len ( list ) ; i ++ {
2022-04-12 21:31:51 +08:00
params := make ( [ ] interface { } , 0 ) // Values that will be committed to underlying database driver.
2022-02-19 23:10:31 +08:00
for _ , k := range keys {
params = append ( params , list [ i ] [ k ] )
}
// Prepare is allowed to execute only once in a transaction opened by clickhouse
stdSqlResult , err = stmt . ExecContext ( ctx , params ... )
if err != nil {
return stdSqlResult , err
}
}
2022-02-24 12:58:57 +08:00
return stdSqlResult , tx . Commit ( )
2022-02-19 17:49:53 +08:00
}
2022-02-19 16:59:17 +08:00
// InsertIgnore Other queries for modifying data parts are not supported: REPLACE, MERGE, UPSERT, INSERT UPDATE.
func ( d * Driver ) InsertIgnore ( ctx context . Context , table string , data interface { } , batch ... int ) ( sql . Result , error ) {
2022-04-08 09:44:42 +08:00
return nil , errUnsupportedInsertIgnore
2022-02-19 16:59:17 +08:00
}
// InsertAndGetId Other queries for modifying data parts are not supported: REPLACE, MERGE, UPSERT, INSERT UPDATE.
func ( d * Driver ) InsertAndGetId ( ctx context . Context , table string , data interface { } , batch ... int ) ( int64 , error ) {
2022-04-08 09:44:42 +08:00
return 0 , errUnsupportedInsertGetId
2022-02-19 16:59:17 +08:00
}
// Replace Other queries for modifying data parts are not supported: REPLACE, MERGE, UPSERT, INSERT UPDATE.
func ( d * Driver ) Replace ( ctx context . Context , table string , data interface { } , batch ... int ) ( sql . Result , error ) {
2022-04-08 09:44:42 +08:00
return nil , errUnsupportedReplace
2022-02-19 16:59:17 +08:00
}
2022-02-19 23:10:31 +08:00
func ( d * Driver ) Begin ( ctx context . Context ) ( tx * gdb . TX , err error ) {
2022-04-08 09:44:42 +08:00
return nil , errUnsupportedBegin
2022-02-19 23:10:31 +08:00
}
func ( d * Driver ) Transaction ( ctx context . Context , f func ( ctx context . Context , tx * gdb . TX ) error ) error {
2022-04-08 09:44:42 +08:00
return errUnsupportedTransaction
2022-02-19 23:10:31 +08:00
}