gf/contrib/drivers/clickhouse/clickhouse.go

308 lines
9.5 KiB
Go
Raw Normal View History

// Copyright GoFrame Author(https://goframe.org). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://github.com/gogf/gf.
// Package clickhouse implements gdb.Driver, which supports operations for ClickHouse.
package clickhouse
import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"net/url"
	"strings"

	"github.com/ClickHouse/clickhouse-go"
	"github.com/gogf/gf/v2/container/gmap"
	"github.com/gogf/gf/v2/database/gdb"
	"github.com/gogf/gf/v2/errors/gcode"
	"github.com/gogf/gf/v2/errors/gerror"
	"github.com/gogf/gf/v2/text/gregex"
	"github.com/gogf/gf/v2/text/gstr"
	"github.com/gogf/gf/v2/util/gconv"
)
// Driver is the driver for ClickHouse database.
// It embeds *gdb.Core, so the methods of gdb.Core are promoted to Driver.
type Driver struct {
*gdb.Core
}
var (
// tableFieldsMap caches the table information retrieved from database.
2022-02-19 16:59:17 +08:00
tableFieldsMap = gmap.New(true)
2022-04-08 09:44:42 +08:00
errUnsupportedInsertIgnore = errors.New("unsupported method:InsertIgnore")
errUnsupportedInsertGetId = errors.New("unsupported method:InsertGetId")
errUnsupportedReplace = errors.New("unsupported method:Replace")
errUnsupportedBegin = errors.New("unsupported method:Begin")
errUnsupportedTransaction = errors.New("unsupported method:Transaction")
errSQLNull = errors.New("SQL cannot be null")
)
2022-02-19 15:09:44 +08:00
// init registers a new driver instance under the name `clickhouse` and panics
// when the registration fails.
func init() {
	err := gdb.Register(`clickhouse`, New())
	if err != nil {
		panic(err)
	}
}
// New creates and returns a driver that implements gdb.Driver, which supports
// operations for clickhouse. The returned driver has no Core attached yet.
func New() gdb.Driver {
	return new(Driver)
}
// New creates and returns a database object for clickhouse.
// It implements the interface of gdb.Driver for extra database driver installation.
//
// The returned object wraps the given core; node is not used and the returned
// error is always nil.
func (d *Driver) New(core *gdb.Core, node *gdb.ConfigNode) (gdb.DB, error) {
	db := &Driver{Core: core}
	return db, nil
}
// Open creates and returns an underlying sql.DB object for clickhouse.
func (d *Driver) Open(config *gdb.ConfigNode) (*sql.DB, error) {
var (
source string
driver = "clickhouse"
)
2022-02-24 21:06:26 +08:00
if config.Link != "" {
source = config.Link
} else if config.Pass != "" {
2022-02-19 15:09:44 +08:00
source = fmt.Sprintf(
2022-02-24 21:06:26 +08:00
"clickhouse://%s:%s@%s:%s/%s?charset=%s&debug=%s",
config.User, config.Pass, config.Host, config.Port, config.Name, config.Charset, gconv.String(config.Debug))
2022-02-19 15:09:44 +08:00
} else {
source = fmt.Sprintf(
2022-02-24 21:06:26 +08:00
"clickhouse://%s@%s:%s/%s?charset=%s&debug=%s",
config.User, config.Host, config.Port, config.Name, config.Charset, gconv.String(config.Debug))
2022-02-19 15:09:44 +08:00
}
db, err := sql.Open(driver, source)
if err != nil {
return nil, err
}
2022-02-19 16:59:17 +08:00
2022-02-19 15:09:44 +08:00
return db, nil
}
// Tables retrieves and returns the tables of current schema.
// It's mainly used in cli tool chain for automatically generating the models.
func (d *Driver) Tables(ctx context.Context, schema ...string) (tables []string, err error) {
var result gdb.Result
link, err := d.SlaveLink(schema...)
if err != nil {
return nil, err
}
2022-02-19 16:59:17 +08:00
query := fmt.Sprintf("select name from `system`.tables where database = '%s'", d.GetConfig().Name)
2022-04-04 12:46:11 +08:00
result, err = d.DoSelect(ctx, link, query)
2022-02-19 15:09:44 +08:00
if err != nil {
return
}
for _, m := range result {
tables = append(tables, m["name"].String())
}
return
}
// TableFields retrieves and returns the fields' information of specified table of current schema.
// Also see DriverMysql.TableFields.
func (d *Driver) TableFields(ctx context.Context, table string, schema ...string) (fields map[string]*gdb.TableField, err error) {
charL, charR := d.GetChars()
table = gstr.Trim(table, charL+charR)
if gstr.Contains(table, " ") {
return nil, gerror.NewCode(gcode.CodeInvalidParameter, "function TableFields supports only single table operations")
}
useSchema := d.GetSchema()
if len(schema) > 0 && schema[0] != "" {
useSchema = schema[0]
}
v := tableFieldsMap.GetOrSetFuncLock(
fmt.Sprintf(`clickhouse_table_fields_%s_%s@group:%s`, table, useSchema, d.GetGroup()),
func() interface{} {
var (
result gdb.Result
link gdb.Link
)
if link, err = d.SlaveLink(useSchema); err != nil {
return nil
}
2022-02-19 16:59:17 +08:00
getColumnsSql := fmt.Sprintf("select name,position,default_expression,comment from `system`.columns c where database = '%s' and `table` = '%s'", d.GetConfig().Name, table)
2022-04-04 12:46:11 +08:00
result, err = d.DoSelect(ctx, link, getColumnsSql)
2022-02-19 15:09:44 +08:00
if err != nil {
return nil
}
fields = make(map[string]*gdb.TableField)
for _, m := range result {
var (
isNull = false
fieldType = m["type"].String()
)
// in clickhouse , filed type like is Nullable(int)
fieldsResult, _ := gregex.MatchString(`^Nullable\((.*?)\)`, fieldType)
if len(fieldsResult) == 2 {
isNull = true
fieldType = fieldsResult[1]
}
fields[m["name"].String()] = &gdb.TableField{
Index: m["position"].Int(),
Name: m["name"].String(),
Default: m["default_expression"].Val(),
Comment: m["comment"].String(),
//Key: m["Key"].String(),
Type: fieldType,
Null: isNull,
}
}
return fields
},
)
if v != nil {
fields = v.(map[string]*gdb.TableField)
}
return
}
// FilteredLink retrieves and returns filtered `linkInfo` that can be using for
// logging or tracing purpose.
//
// It returns an empty string when no link is configured. Otherwise the
// password inside the link is replaced with "xxx". Three link shapes are
// recognised:
//
//	user:pass@tcp(host:port)/db          (legacy pattern, kept unchanged)
//	scheme://user:pass@host:port/db      (URL user information)
//	...?password=pass&...                (password passed as query parameter)
func (d *Driver) FilteredLink() string {
	linkInfo := d.GetConfig().Link
	if linkInfo == "" {
		return ""
	}
	// Each entry is a {pattern, replacement} pair, applied in order.
	masks := [][2]string{
		{`(.+?):(.+)@tcp(.+)`, `$1:xxx@tcp$3`},
		// '/' is excluded from the password so that an '@' appearing later in
		// the path or query of a credential-less URL is never matched.
		{`(://[^:@/]+):[^@/]*@`, `$1:xxx@`},
		{`(?i)([?&]password=)[^&]*`, `${1}xxx`},
	}
	for _, mask := range masks {
		// The patterns are constants that are known to compile, so the error
		// result carries no information here.
		linkInfo, _ = gregex.ReplaceString(mask[0], mask[1], linkInfo)
	}
	return linkInfo
}
// PingMaster pings the master node to check authentication or keeps the connection alive.
// An error from obtaining the master connection is returned as is.
func (d *Driver) PingMaster() error {
	master, err := d.Master()
	if err == nil {
		err = d.ping(master)
	}
	return err
}
// PingSlave pings the slave node to check authentication or keeps the connection alive.
// An error from obtaining the slave connection is returned as is.
func (d *Driver) PingSlave() error {
	slave, err := d.Slave()
	if err == nil {
		err = d.ping(slave)
	}
	return err
}
// ping Returns the Clickhouse specific error.
func (d *Driver) ping(conn *sql.DB) error {
err := conn.Ping()
if exception, ok := err.(*clickhouse.Exception); ok {
return errors.New(fmt.Sprintf("[%d]%s", exception.Code, exception.Message))
}
return err
}
2022-02-23 22:51:37 +08:00
// DoFilter handles the sql before posts it to database.
2022-04-04 12:46:11 +08:00
func (d *Driver) DoFilter(ctx context.Context, link gdb.Link, originSql string, args []interface{}) (newSql string, newArgs []interface{}, err error) {
2022-04-08 09:44:42 +08:00
// replace STD SQL to Clickhouse SQL grammar
2022-02-23 22:51:37 +08:00
// MySQL eg: UPDATE visits SET xxx
// Clickhouse eg: ALTER TABLE visits UPDATE xxx
// MySQL eg: DELETE FROM VISIT
// Clickhouse eg: ALTER TABLE VISIT DELETE WHERE filter_expr
2022-04-04 12:46:11 +08:00
result, err := gregex.MatchString("(?i)^UPDATE|DELETE", originSql)
2022-02-23 22:51:37 +08:00
if err != nil {
return "", nil, err
}
if len(result) != 0 {
2022-04-04 12:46:11 +08:00
sqlSlice := strings.Split(originSql, " ")
2022-02-23 22:51:37 +08:00
if len(sqlSlice) < 3 {
2022-04-08 09:44:42 +08:00
return "", nil, errSQLNull
2022-02-23 22:51:37 +08:00
}
ck := []string{"ALTER", "TABLE"}
switch strings.ToUpper(result[0]) {
case "UPDATE":
sqlSlice = append(append(append(ck, sqlSlice[1]), result[0]), sqlSlice[3:]...)
return strings.Join(sqlSlice, " "), args, nil
case "DELETE":
sqlSlice = append(append(append(ck, sqlSlice[2]), result[0]), sqlSlice[3:]...)
return strings.Join(sqlSlice, " "), args, nil
}
}
2022-04-04 12:46:11 +08:00
return originSql, args, nil
2022-02-19 16:59:17 +08:00
}
2022-02-19 17:49:53 +08:00
// DoCommit commits current sql and arguments to underlying sql driver.
2022-02-19 16:59:17 +08:00
func (d *Driver) DoCommit(ctx context.Context, in gdb.DoCommitInput) (out gdb.DoCommitOutput, err error) {
2022-04-04 12:46:11 +08:00
ctx = d.InjectIgnoreResult(ctx)
return d.Core.DoCommit(ctx, in)
2022-02-19 16:59:17 +08:00
}
2022-02-19 17:49:53 +08:00
func (d *Driver) DoInsert(ctx context.Context, link gdb.Link, table string, list gdb.List, option gdb.DoInsertOption) (result sql.Result, err error) {
2022-02-19 23:10:31 +08:00
var (
keys []string // Field names.
valueHolder = make([]string, 0)
)
// Handle the field names and placeholders.
for k := range list[0] {
keys = append(keys, k)
valueHolder = append(valueHolder, "?")
}
// Prepare the batch result pointer.
var (
charL, charR = d.Core.GetChars()
keysStr = charL + strings.Join(keys, charR+","+charL) + charR
holderStr = strings.Join(valueHolder, ",")
tx = &gdb.TX{}
stdSqlResult sql.Result
stmt *gdb.Stmt
)
tx, err = d.Core.Begin(ctx)
if err != nil {
return
}
stmt, err = tx.Prepare(fmt.Sprintf(
"INSERT INTO %s(%s) VALUES (%s)",
d.QuotePrefixTableName(table), keysStr,
holderStr,
))
if err != nil {
return
}
2022-02-24 12:58:57 +08:00
for i := 0; i < len(list); i++ {
2022-02-19 23:10:31 +08:00
params := []interface{}{} // Values that will be committed to underlying database driver.
for _, k := range keys {
params = append(params, list[i][k])
}
// Prepare is allowed to execute only once in a transaction opened by clickhouse
stdSqlResult, err = stmt.ExecContext(ctx, params...)
if err != nil {
return stdSqlResult, err
}
}
2022-02-24 12:58:57 +08:00
return stdSqlResult, tx.Commit()
2022-02-19 17:49:53 +08:00
}
2022-02-19 16:59:17 +08:00
// InsertIgnore Other queries for modifying data parts are not supported: REPLACE, MERGE, UPSERT, INSERT UPDATE.
func (d *Driver) InsertIgnore(ctx context.Context, table string, data interface{}, batch ...int) (sql.Result, error) {
2022-04-08 09:44:42 +08:00
return nil, errUnsupportedInsertIgnore
2022-02-19 16:59:17 +08:00
}
// InsertAndGetId Other queries for modifying data parts are not supported: REPLACE, MERGE, UPSERT, INSERT UPDATE.
func (d *Driver) InsertAndGetId(ctx context.Context, table string, data interface{}, batch ...int) (int64, error) {
2022-04-08 09:44:42 +08:00
return 0, errUnsupportedInsertGetId
2022-02-19 16:59:17 +08:00
}
// Replace Other queries for modifying data parts are not supported: REPLACE, MERGE, UPSERT, INSERT UPDATE.
func (d *Driver) Replace(ctx context.Context, table string, data interface{}, batch ...int) (sql.Result, error) {
2022-04-08 09:44:42 +08:00
return nil, errUnsupportedReplace
2022-02-19 16:59:17 +08:00
}
2022-02-19 23:10:31 +08:00
func (d *Driver) Begin(ctx context.Context) (tx *gdb.TX, err error) {
2022-04-08 09:44:42 +08:00
return nil, errUnsupportedBegin
2022-02-19 23:10:31 +08:00
}
func (d *Driver) Transaction(ctx context.Context, f func(ctx context.Context, tx *gdb.TX) error) error {
2022-04-08 09:44:42 +08:00
return errUnsupportedTransaction
2022-02-19 23:10:31 +08:00
}