gdb包事务功能改进

This commit is contained in:
John 2018-03-09 17:55:42 +08:00
parent 6fb8746316
commit cdc8293087
7 changed files with 455 additions and 190 deletions

View File

@ -27,22 +27,54 @@ const (
// 数据库操作接口
type Link interface {
// 打开数据库连接,建立数据库操作对象
Open (c *ConfigNode) (*sql.DB, error)
Close() error
// SQL操作方法
Query(q string, args ...interface{}) (*sql.Rows, error)
Exec(q string, args ...interface{}) (sql.Result, error)
Prepare(q string) (*sql.Stmt, error)
// 数据库查询
GetAll(q string, args ...interface{}) (List, error)
GetOne(q string, args ...interface{}) (Map, error)
GetValue(q string, args ...interface{}) (interface{}, error)
// Ping
PingMaster() error
PingSlave() error
// 连接属性设置
SetMaxIdleConns(n int)
SetMaxOpenConns(n int)
// 开启事务操作
Begin() (*sql.Tx, error)
// 数据表插入/更新/保存操作
Insert(table string, data Map) (sql.Result, error)
Replace(table string, data Map) (sql.Result, error)
Save(table string, data Map) (sql.Result, error)
// 数据表插入/更新/保存操作(批量)
BatchInsert(table string, list List, batch int) (sql.Result, error)
BatchReplace(table string, list List, batch int) (sql.Result, error)
BatchSave(table string, list List, batch int) (sql.Result, error)
// 数据修改/删除
Update(table string, data interface{}, condition interface{}, args ...interface{}) (sql.Result, error)
Delete(table string, condition interface{}, args ...interface{}) (sql.Result, error)
// 创建链式操作对象(Table为From的别名)
Table(tables string) (*DbOp)
From(tables string) (*DbOp)
// 关闭数据库操作对象
Close() error
// 内部方法
insert(table string, data Map, option uint8) (sql.Result, error)
batchInsert(table string, list List, batch int, option uint8) (sql.Result, error)
setMaster(master *sql.DB)
setSlave(slave *sql.DB)
setQuoteChar(left string, right string)
@ -50,36 +82,15 @@ type Link interface {
getQuoteCharLeft () string
getQuoteCharRight () string
handleSqlBeforeExec(q *string) *string
Begin() (*sql.Tx, error)
Commit() error
Rollback() error
insert(table string, data Map, option uint8) (sql.Result, error)
Insert(table string, data Map) (sql.Result, error)
Replace(table string, data Map) (sql.Result, error)
Save(table string, data Map) (sql.Result, error)
batchInsert(table string, list List, batch int, option uint8) (sql.Result, error)
BatchInsert(table string, list List, batch int) (sql.Result, error)
BatchReplace(table string, list List, batch int) (sql.Result, error)
BatchSave(table string, list List, batch int) (sql.Result, error)
Update(table string, data interface{}, condition interface{}, args ...interface{}) (sql.Result, error)
Delete(table string, condition interface{}, args ...interface{}) (sql.Result, error)
Table(tables string) (*gLinkOp)
From(tables string) (*gLinkOp)
}
// 数据库链接对象
type dbLink struct {
link Link
transaction *sql.Tx
master *sql.DB
slave *sql.DB
charl string
charr string
type Db struct {
link Link
master *sql.DB
slave *sql.DB
charl string
charr string
}
// 关联数组,绑定一条数据表记录
@ -186,10 +197,10 @@ func newLink (masterNode *ConfigNode, slaveNode *ConfigNode) (Link, error) {
var link Link
switch masterNode.Type {
case "mysql":
link = Link(&mysqlLink{})
link = Link(&dbmysql{})
case "pgsql":
link = Link(&pgsqlLink{})
link = Link(&dbpgsql{})
default:
return nil, errors.New(fmt.Sprintf("unsupported db type '%s'", masterNode.Type))
@ -213,23 +224,23 @@ func newLink (masterNode *ConfigNode, slaveNode *ConfigNode) (Link, error) {
}
// 设置master链接对象
func (l *dbLink) setMaster(master *sql.DB) {
l.master = master
func (db *Db) setMaster(master *sql.DB) {
db.master = master
}
// 设置slave链接对象
func (l *dbLink) setSlave(slave *sql.DB) {
l.slave = slave
func (db *Db) setSlave(slave *sql.DB) {
db.slave = slave
}
// 设置当前数据库类型引用字符
func (l *dbLink) setQuoteChar(left string, right string) {
l.charl = left
l.charr = right
func (db *Db) setQuoteChar(left string, right string) {
db.charl = left
db.charr = right
}
// 设置挡脸操作的link接口
func (l *dbLink) setLink(link Link) {
l.link = link
func (db *Db) setLink(link Link) {
db.link = link
}

View File

@ -16,20 +16,20 @@ import (
)
// 关闭链接
func (l *dbLink) Close() error {
if l.master != nil {
err := l.master.Close()
if (err == nil) {
l.master = nil
func (db *Db) Close() error {
if db.master != nil {
err := db.master.Close()
if err == nil {
db.master = nil
} else {
glog.Fatal(err)
return err
}
}
if l.slave != nil {
err := l.slave.Close()
if (err == nil) {
l.slave = nil
if db.slave != nil {
err := db.slave.Close()
if err == nil {
db.slave = nil
} else {
glog.Fatal(err)
return err
@ -39,44 +39,44 @@ func (l *dbLink) Close() error {
}
// 数据库sql查询操作主要执行查询
func (l *dbLink) Query(q string, args ...interface{}) (*sql.Rows, error) {
p := l.link.handleSqlBeforeExec(&q)
rows, err := l.slave.Query(*p, args ...)
err = l.formatError(err, p, args...)
if (err == nil) {
func (db *Db) Query(q string, args ...interface{}) (*sql.Rows, error) {
p := db.link.handleSqlBeforeExec(&q)
rows, err := db.slave.Query(*p, args ...)
err = db.formatError(err, p, args...)
if err == nil {
return rows, nil
}
return nil, err
}
// 执行一条sql并返回执行情况主要用于非查询操作
func (l *dbLink) Exec(q string, args ...interface{}) (sql.Result, error) {
func (db *Db) Exec(q string, args ...interface{}) (sql.Result, error) {
//fmt.Println(q)
//fmt.Println(args)
p := l.link.handleSqlBeforeExec(&q)
r, err := l.master.Exec(*p, args ...)
err = l.formatError(err, p, args...)
p := db.link.handleSqlBeforeExec(&q)
r, err := db.master.Exec(*p, args ...)
err = db.formatError(err, p, args...)
return r, err
}
// 格式化错误信息
func (l *dbLink) formatError(err error, q *string, args ...interface{}) error {
func (db *Db) formatError(err error, q *string, args ...interface{}) error {
if err != nil {
errstr := fmt.Sprintf("DB ERROR: %s\n", err.Error())
errstr += fmt.Sprintf("DB QUERY: %s\n", *q)
if len(args) > 0 {
errstr += fmt.Sprintf("DB PARAM: %v\n", args)
}
err = errors.New(errstr)
err = errors.New(errstr)
}
return err
}
// 数据库查询,获取查询结果集,以列表结构返回
func (l *dbLink) GetAll(q string, args ...interface{}) (List, error) {
func (db *Db) GetAll(q string, args ...interface{}) (List, error) {
// 执行sql
rows, err := l.Query(q, args ...)
rows, err := db.Query(q, args ...)
if err != nil || rows == nil {
return nil, err
}
@ -107,8 +107,8 @@ func (l *dbLink) GetAll(q string, args ...interface{}) (List, error) {
}
// 数据库查询,获取查询结果集,以关联数组结构返回
func (l *dbLink) GetOne(q string, args ...interface{}) (Map, error) {
list, err := l.GetAll(q, args ...)
func (db *Db) GetOne(q string, args ...interface{}) (Map, error) {
list, err := db.GetAll(q, args ...)
if err != nil {
return nil, err
}
@ -116,8 +116,8 @@ func (l *dbLink) GetOne(q string, args ...interface{}) (Map, error) {
}
// 数据库查询,获取查询字段值
func (l *dbLink) GetValue(q string, args ...interface{}) (interface{}, error) {
one, err := l.GetOne(q, args ...)
func (db *Db) GetValue(q string, args ...interface{}) (interface{}, error) {
one, err := db.GetOne(q, args ...)
if err != nil {
return "", err
}
@ -129,61 +129,39 @@ func (l *dbLink) GetValue(q string, args ...interface{}) (interface{}, error) {
// sql预处理执行完成后调用返回值sql.Stmt.Exec完成sql操作
// 记得调用sql.Stmt.Close关闭操作对象
func (l *dbLink) Prepare(q string) (*sql.Stmt, error) {
return l.master.Prepare(q)
func (db *Db) Prepare(q string) (*sql.Stmt, error) {
return db.master.Prepare(q)
}
// ping一下判断或保持数据库链接(master)
func (l *dbLink) PingMaster() error {
err := l.master.Ping();
func (db *Db) PingMaster() error {
err := db.master.Ping();
return err
}
// ping一下判断或保持数据库链接(slave)
func (l *dbLink) PingSlave() error {
err := l.slave.Ping();
func (db *Db) PingSlave() error {
err := db.slave.Ping();
return err
}
// 设置数据库连接池中空闲链接的大小
func (l *dbLink) SetMaxIdleConns(n int) {
l.master.SetMaxIdleConns(n);
func (db *Db) SetMaxIdleConns(n int) {
db.master.SetMaxIdleConns(n);
}
// 设置数据库连接池最大打开的链接数量
func (l *dbLink) SetMaxOpenConns(n int) {
l.master.SetMaxOpenConns(n);
func (db *Db) SetMaxOpenConns(n int) {
db.master.SetMaxOpenConns(n);
}
// 事务操作,开启,会返回一个底层的事务操作对象链接如需要嵌套事务,那么可以使用该对象,否则请忽略
func (l *dbLink) Begin() (*sql.Tx, error) {
tx, err := l.master.Begin()
if err == nil {
l.transaction = tx
}
return tx, err
}
// 事务操作,提交
func (l *dbLink) Commit() error {
if l.transaction == nil {
return errors.New("transaction not start")
}
err := l.transaction.Commit()
return err
}
// 事务操作,回滚
func (l *dbLink) Rollback() error {
if l.transaction == nil {
return errors.New("transaction not start")
}
err := l.transaction.Rollback()
return err
func (db *Db) Begin() (*sql.Tx, error) {
return db.master.Begin()
}
// 根据insert选项获得操作名称
func (l *dbLink) getInsertOperationByOption(option uint8) string {
func (db *Db) getInsertOperationByOption(option uint8) string {
oper := "INSERT"
switch option {
case OPTION_INSERT:
@ -201,47 +179,47 @@ func (l *dbLink) getInsertOperationByOption(option uint8) string {
// 1: replace: 如果数据存在(主键或者唯一索引),那么删除后重新写入一条
// 2: save: 如果数据存在(主键或者唯一索引),那么更新,否则写入一条新数据
// 3: ignore: 如果数据存在(主键或者唯一索引),那么什么也不做
func (l *dbLink) insert(table string, data Map, option uint8) (sql.Result, error) {
func (db *Db) insert(table string, data Map, option uint8) (sql.Result, error) {
var keys []string
var values []string
var params []interface{}
for k, v := range data {
keys = append(keys, l.charl + k + l.charr)
keys = append(keys, db.charl + k + db.charr)
values = append(values, "?")
params = append(params, v)
}
operation := l.getInsertOperationByOption(option)
operation := db.getInsertOperationByOption(option)
updatestr := ""
if option == OPTION_SAVE {
var updates []string
for k, _ := range data {
updates = append(updates, fmt.Sprintf("%s%s%s=VALUES(%s)", l.charl, k, l.charr, k))
updates = append(updates, fmt.Sprintf("%s%s%s=VALUES(%s)", db.charl, k, db.charr, k))
}
updatestr = fmt.Sprintf(" ON DUPLICATE KEY UPDATE %s", strings.Join(updates, ","))
}
return l.Exec(
return db.Exec(
fmt.Sprintf("%s INTO %s%s%s(%s) VALUES(%s) %s",
operation, l.charl, table, l.charr, strings.Join(keys, ","), strings.Join(values, ","), updatestr), params...
operation, db.charl, table, db.charr, strings.Join(keys, ","), strings.Join(values, ","), updatestr), params...
)
}
// CURD操作:单条数据写入, 仅仅执行写入操作,如果存在冲突的主键或者唯一索引,那么报错返回
func (l *dbLink) Insert(table string, data Map) (sql.Result, error) {
return l.link.insert(table, data, OPTION_INSERT)
func (db *Db) Insert(table string, data Map) (sql.Result, error) {
return db.link.insert(table, data, OPTION_INSERT)
}
// CURD操作:单条数据写入, 如果数据存在(主键或者唯一索引),那么删除后重新写入一条
func (l *dbLink) Replace(table string, data Map) (sql.Result, error) {
return l.link.insert(table, data, OPTION_REPLACE)
func (db *Db) Replace(table string, data Map) (sql.Result, error) {
return db.link.insert(table, data, OPTION_REPLACE)
}
// CURD操作:单条数据写入, 如果数据存在(主键或者唯一索引),那么更新,否则写入一条新数据
func (l *dbLink) Save(table string, data Map) (sql.Result, error) {
return l.link.insert(table, data, OPTION_SAVE)
func (db *Db) Save(table string, data Map) (sql.Result, error) {
return db.link.insert(table, data, OPTION_SAVE)
}
// 批量写入数据
func (l *dbLink) batchInsert(table string, list List, batch int, option uint8) (sql.Result, error) {
func (db *Db) batchInsert(table string, list List, batch int, option uint8) (sql.Result, error) {
var keys []string
var values []string
var bvalues []string
@ -257,14 +235,14 @@ func (l *dbLink) batchInsert(table string, list List, batch int, option uint8) (
keys = append(keys, k)
values = append(values, "?")
}
var kstr = l.charl + strings.Join(keys, l.charl + "," + l.charr) + l.charr
var kstr = db.charl + strings.Join(keys, db.charl + "," + db.charr) + db.charr
// 操作判断
operation := l.getInsertOperationByOption(option)
operation := db.getInsertOperationByOption(option)
updatestr := ""
if option == OPTION_SAVE {
var updates []string
for _, k := range keys {
updates = append(updates, fmt.Sprintf("%s=VALUES(%s)", l.charl, k, l.charr, k))
updates = append(updates, fmt.Sprintf("%s=VALUES(%s)", db.charl, k, db.charr, k))
}
updatestr = fmt.Sprintf(" ON DUPLICATE KEY UPDATE %s", strings.Join(updates, ","))
}
@ -275,7 +253,7 @@ func (l *dbLink) batchInsert(table string, list List, batch int, option uint8) (
}
bvalues = append(bvalues, "(" + strings.Join(values, ",") + ")")
if len(bvalues) == batch {
r, err := l.Exec(fmt.Sprintf("%s INTO %s%s%s(%s) VALUES%s %s", operation, l.charl, table, l.charr, kstr, strings.Join(bvalues, ","), updatestr), params...)
r, err := db.Exec(fmt.Sprintf("%s INTO %s%s%s(%s) VALUES%s %s", operation, db.charl, table, db.charr, kstr, strings.Join(bvalues, ","), updatestr), params...)
if err != nil {
return result, err
}
@ -285,7 +263,7 @@ func (l *dbLink) batchInsert(table string, list List, batch int, option uint8) (
}
// 处理最后不构成指定批量的数据
if len(bvalues) > 0 {
r, err := l.Exec(fmt.Sprintf("%s INTO %s%s%s(%s) VALUES%s %s", operation, l.charl, table, l.charr, kstr, strings.Join(bvalues, ","), updatestr), params...)
r, err := db.Exec(fmt.Sprintf("%s INTO %s%s%s(%s) VALUES%s %s", operation, db.charl, table, db.charr, kstr, strings.Join(bvalues, ","), updatestr), params...)
if err != nil {
return result, err
}
@ -295,23 +273,23 @@ func (l *dbLink) batchInsert(table string, list List, batch int, option uint8) (
}
// CURD操作:批量数据指定批次量写入
func (l *dbLink) BatchInsert(table string, list List, batch int) (sql.Result, error) {
return l.link.batchInsert(table, list, batch, OPTION_INSERT)
func (db *Db) BatchInsert(table string, list List, batch int) (sql.Result, error) {
return db.link.batchInsert(table, list, batch, OPTION_INSERT)
}
// CURD操作:批量数据指定批次量写入, 如果数据存在(主键或者唯一索引),那么删除后重新写入一条
func (l *dbLink) BatchReplace(table string, list List, batch int) (sql.Result, error) {
return l.link.batchInsert(table, list, batch, OPTION_REPLACE)
func (db *Db) BatchReplace(table string, list List, batch int) (sql.Result, error) {
return db.link.batchInsert(table, list, batch, OPTION_REPLACE)
}
// CURD操作:批量数据指定批次量写入, 如果数据存在(主键或者唯一索引),那么更新,否则写入一条新数据
func (l *dbLink) BatchSave(table string, list List, batch int) (sql.Result, error) {
return l.link.batchInsert(table, list, batch, OPTION_SAVE)
func (db *Db) BatchSave(table string, list List, batch int) (sql.Result, error) {
return db.link.batchInsert(table, list, batch, OPTION_SAVE)
}
// CURD操作:数据更新统一采用sql预处理
// data参数支持字符串或者关联数组类型内部会自行做判断处理
func (l *dbLink) Update(table string, data interface{}, condition interface{}, args ...interface{}) (sql.Result, error) {
func (db *Db) Update(table string, data interface{}, condition interface{}, args ...interface{}) (sql.Result, error) {
var params []interface{}
var updates string
switch data.(type) {
@ -320,7 +298,7 @@ func (l *dbLink) Update(table string, data interface{}, condition interface{}, a
case Map:
var keys []string
for k, v := range data.(Map) {
keys = append(keys, fmt.Sprintf("%s%s%s=?", l.charl, k, l.charr))
keys = append(keys, fmt.Sprintf("%s%s%s=?", db.charl, k, db.charr))
params = append(params, v)
}
updates = strings.Join(keys, ",")
@ -337,11 +315,11 @@ func (l *dbLink) Update(table string, data interface{}, condition interface{}, a
}
}
return l.Exec(fmt.Sprintf("UPDATE %s%s%s SET %s WHERE %s", l.charl, table, l.charr, updates, condition), params...)
return db.Exec(fmt.Sprintf("UPDATE %s%s%s SET %s WHERE %s", db.charl, table, db.charr, updates, condition), params...)
}
// CURD操作:删除数据
func (l *dbLink) Delete(table string, condition interface{}, args ...interface{}) (sql.Result, error) {
return l.Exec(fmt.Sprintf("DELETE FROM %s WHERE %s", l.charl, table, l.charr, condition), args...)
func (db *Db) Delete(table string, condition interface{}, args ...interface{}) (sql.Result, error) {
return db.Exec(fmt.Sprintf("DELETE FROM %s WHERE %s", db.charl, table, db.charr, condition), args...)
}

View File

@ -14,7 +14,7 @@ import (
)
// 数据库链式操作对象
type gLinkOp struct {
type DbOp struct {
link Link // 数据库链接对象
tables string // 数据库操作表
fields string // 操作字段
@ -29,76 +29,76 @@ type gLinkOp struct {
}
// 链式操作,数据表字段,可支持多个表,以半角逗号连接
func (l *dbLink) Table(tables string) (*gLinkOp) {
return &gLinkOp {
link : l.link,
func (db *Db) Table(tables string) (*DbOp) {
return &DbOp {
link : db.link,
tables: tables,
}
}
// 链式操作,数据表字段,可支持多个表,以半角逗号连接
func (l *dbLink) From(tables string) (*gLinkOp) {
return l.Table(tables)
func (db *Db) From(tables string) (*DbOp) {
return db.Table(tables)
}
// 链式操作,左联表
func (op *gLinkOp) LeftJoin(joinTable string, on string) (*gLinkOp) {
func (op *DbOp) LeftJoin(joinTable string, on string) (*DbOp) {
op.tables += fmt.Sprintf(" LEFT JOIN %s ON (%s)", joinTable, on)
return op
}
// 链式操作,右联表
func (op *gLinkOp) RightJoin(joinTable string, on string) (*gLinkOp) {
func (op *DbOp) RightJoin(joinTable string, on string) (*DbOp) {
op.tables += fmt.Sprintf(" RIGHT JOIN %s ON (%s)", joinTable, on)
return op
}
// 链式操作,内联表
func (op *gLinkOp) InnerJoin(joinTable string, on string) (*gLinkOp) {
func (op *DbOp) InnerJoin(joinTable string, on string) (*DbOp) {
op.tables += fmt.Sprintf(" INNER JOIN %s ON (%s)", joinTable, on)
return op
}
// 链式操作,查询字段
func (op *gLinkOp) Fields(fields string) (*gLinkOp) {
func (op *DbOp) Fields(fields string) (*DbOp) {
op.fields = fields
return op
}
// 链式操作consition
func (op *gLinkOp) Where(where string, args...interface{}) (*gLinkOp) {
func (op *DbOp) Where(where string, args...interface{}) (*DbOp) {
op.where = where
op.whereArgs = args
return op
}
// 链式操作group by
func (op *gLinkOp) GroupBy(groupby string) (*gLinkOp) {
func (op *DbOp) GroupBy(groupby string) (*DbOp) {
op.groupby = groupby
return op
}
// 链式操作order by
func (op *gLinkOp) OrderBy(orderby string) (*gLinkOp) {
func (op *DbOp) OrderBy(orderby string) (*DbOp) {
op.orderby = orderby
return op
}
// 链式操作limit
func (op *gLinkOp) Limit(start int, limit int) (*gLinkOp) {
func (op *DbOp) Limit(start int, limit int) (*DbOp) {
op.start = start
op.limit = limit
return op
}
// 链式操作,操作数据记录项
func (op *gLinkOp) Data(data interface{}) (*gLinkOp) {
func (op *DbOp) Data(data interface{}) (*DbOp) {
op.data = data
return op
}
// 链式操作, CURD - Insert/BatchInsert
func (op *gLinkOp) Insert() (sql.Result, error) {
func (op *DbOp) Insert() (sql.Result, error) {
// 批量操作
if list, ok := op.data.(List); ok {
batch := 10
@ -118,7 +118,7 @@ func (op *gLinkOp) Insert() (sql.Result, error) {
}
// 链式操作, CURD - Replace/BatchReplace
func (op *gLinkOp) Replace() (sql.Result, error) {
func (op *DbOp) Replace() (sql.Result, error) {
// 批量操作
if list, ok := op.data.(List); ok {
batch := 10
@ -138,7 +138,7 @@ func (op *gLinkOp) Replace() (sql.Result, error) {
}
// 链式操作, CURD - Save/BatchSave
func (op *gLinkOp) Save() (sql.Result, error) {
func (op *DbOp) Save() (sql.Result, error) {
// 批量操作
if list, ok := op.data.(List); ok {
batch := 10
@ -158,7 +158,7 @@ func (op *gLinkOp) Save() (sql.Result, error) {
}
// 链式操作, CURD - Update
func (op *gLinkOp) Update() (sql.Result, error) {
func (op *DbOp) Update() (sql.Result, error) {
if op.data == nil {
return nil, errors.New("updating table with empty data")
}
@ -166,7 +166,7 @@ func (op *gLinkOp) Update() (sql.Result, error) {
}
// 链式操作, CURD - Delete
func (op *gLinkOp) Delete() (sql.Result, error) {
func (op *DbOp) Delete() (sql.Result, error) {
if op.where == "" {
return nil, errors.New("where is required while deleting")
}
@ -174,13 +174,13 @@ func (op *gLinkOp) Delete() (sql.Result, error) {
}
// 设置批处理的大小
func (op *gLinkOp) Batch(batch int) *gLinkOp {
func (op *DbOp) Batch(batch int) *DbOp {
op.batch = batch
return op
}
// 链式操作select
func (op *gLinkOp) Select() (List, error) {
func (op *DbOp) Select() (List, error) {
if op.fields == "" {
op.fields = "*"
}
@ -201,12 +201,12 @@ func (op *gLinkOp) Select() (List, error) {
}
// 链式操作,查询所有记录
func (op *gLinkOp) All() (List, error) {
func (op *DbOp) All() (List, error) {
return op.Select()
}
// 链式操作,查询单条记录
func (op *gLinkOp) One() (Map, error) {
func (op *DbOp) One() (Map, error) {
list, err := op.All()
if err != nil {
return nil, err
@ -215,7 +215,7 @@ func (op *gLinkOp) One() (Map, error) {
}
// 链式操作,查询字段值
func (op *gLinkOp) Value() (interface{}, error) {
func (op *DbOp) Value() (interface{}, error) {
one, err := op.One()
if err != nil {
return "", err

View File

@ -8,42 +8,41 @@
package gdb
import (
"database/sql"
"fmt"
"gitee.com/johng/gf/g/os/glog"
"database/sql"
)
// 数据库链接对象
type mysqlLink struct {
dbLink
type dbmysql struct {
Db
}
// 创建SQL操作对象内部采用了lazy link处理
func (l *mysqlLink) Open (c *ConfigNode) (*sql.DB, error) {
func (db *dbmysql) Open (c *ConfigNode) (*sql.DB, error) {
var dbsource string
if c.Linkinfo != "" {
dbsource = c.Linkinfo
} else {
dbsource = fmt.Sprintf("%s:%s@tcp(%s:%s)/%s", c.User, c.Pass, c.Host, c.Port, c.Name)
}
db, err := sql.Open("mysql", dbsource)
if err != nil {
glog.Fatal(err)
if db, err := sql.Open("mysql", dbsource); err == nil {
return db, nil
} else {
return nil, err
}
return db, err
}
// 获得关键字操作符 - 左
func (l *mysqlLink) getQuoteCharLeft () string {
func (db *dbmysql) getQuoteCharLeft () string {
return "`"
}
// 获得关键字操作符 - 右
func (l *mysqlLink) getQuoteCharRight () string {
func (db *dbmysql) getQuoteCharRight () string {
return "`"
}
// 在执行sql之前对sql进行进一步处理
func (l *mysqlLink) handleSqlBeforeExec(q *string) *string {
func (db *dbmysql) handleSqlBeforeExec(q *string) *string {
return q
}

View File

@ -8,47 +8,46 @@
package gdb
import (
"database/sql"
"fmt"
"regexp"
"gitee.com/johng/gf/g/os/glog"
"database/sql"
)
// postgresql的适配
// @todo 需要完善replace和save的操作覆盖
// 数据库链接对象
type pgsqlLink struct {
dbLink
type dbpgsql struct {
Db
}
// 创建SQL操作对象内部采用了lazy link处理
func (l *pgsqlLink) Open (c *ConfigNode) (*sql.DB, error) {
func (db *dbpgsql) Open (c *ConfigNode) (*sql.DB, error) {
var dbsource string
if c.Linkinfo != "" {
dbsource = c.Linkinfo
} else {
dbsource = fmt.Sprintf("user=%s password=%s host=%s port=%s dbname=%s", c.User, c.Pass, c.Host, c.Port, c.Name)
}
db, err := sql.Open("postgres", dbsource)
if err != nil {
glog.Fatal(err)
if db, err := sql.Open("postgres", dbsource); err == nil {
return db, nil
} else {
return nil, err
}
return db, err
}
// 获得关键字操作符 - 左
func (l *pgsqlLink) getQuoteCharLeft () string {
func (db *dbpgsql) getQuoteCharLeft () string {
return "\""
}
// 获得关键字操作符 - 右
func (l *pgsqlLink) getQuoteCharRight () string {
func (db *dbpgsql) getQuoteCharRight () string {
return "\""
}
// 在执行sql之前对sql进行进一步处理
func (l *pgsqlLink) handleSqlBeforeExec(q *string) *string {
func (db *dbpgsql) handleSqlBeforeExec(q *string) *string {
reg := regexp.MustCompile("\\?")
index := 0
str := reg.ReplaceAllStringFunc(*q, func (s string) string {

View File

@ -0,0 +1,276 @@
// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
package gdb
import (
"fmt"
"errors"
"database/sql"
"gitee.com/johng/gf/g/os/glog"
"gitee.com/johng/gf/g/os/gcache"
"gitee.com/johng/gf/g/util/grand"
_ "github.com/lib/pq"
_ "github.com/go-sql-driver/mysql"
"strings"
)
// 数据库事务对象
type Tx struct {
db Db
tx *sql.Tx
//Query(q string, args ...interface{}) (*sql.Rows, error)
//Exec(q string, args ...interface{}) (sql.Result, error)
//Prepare(q string) (*sql.Stmt, error)
//
//GetAll(q string, args ...interface{}) (List, error)
//GetOne(q string, args ...interface{}) (Map, error)
//GetValue(q string, args ...interface{}) (interface{}, error)
//
//
//Insert(table string, data Map) (sql.Result, error)
//Replace(table string, data Map) (sql.Result, error)
//Save(table string, data Map) (sql.Result, error)
//
//BatchInsert(table string, list List, batch int) (sql.Result, error)
//BatchReplace(table string, list List, batch int) (sql.Result, error)
//BatchSave(table string, list List, batch int) (sql.Result, error)
//
//Update(table string, data interface{}, condition interface{}, args ...interface{}) (sql.Result, error)
//Delete(table string, condition interface{}, args ...interface{}) (sql.Result, error)
//
//Table(tables string) (*gLinkOp)
//From(tables string) (*gLinkOp)
}
// 数据库sql查询操作主要执行查询
func (tx *Tx) Query(q string, args ...interface{}) (*sql.Rows, error) {
p := tx.db.link.handleSqlBeforeExec(&q)
rows, err := tx.tx.Query(*p, args ...)
err = tx.db.formatError(err, p, args...)
if err == nil {
return rows, nil
}
return nil, err
}
// 执行一条sql并返回执行情况主要用于非查询操作
func (tx *Tx) Exec(q string, args ...interface{}) (sql.Result, error) {
p := tx.db.link.handleSqlBeforeExec(&q)
r, err := tx.tx.Exec(*p, args ...)
err = tx.db.formatError(err, p, args...)
return r, err
}
// 数据库查询,获取查询结果集,以列表结构返回
func (tx *Tx) GetAll(q string, args ...interface{}) (List, error) {
// 执行sql
rows, err := tx.Query(q, args ...)
if err != nil || rows == nil {
return nil, err
}
// 列名称列表
columns, err := rows.Columns()
if err != nil {
return nil, err
}
// 返回结构组装
values := make([]sql.RawBytes, len(columns))
scanArgs := make([]interface{}, len(values))
var list List
for i := range values {
scanArgs[i] = &values[i]
}
for rows.Next() {
err = rows.Scan(scanArgs...)
if err != nil {
return list, err
}
row := make(Map)
for i, col := range values {
row[columns[i]] = string(col)
}
list = append(list, row)
}
return list, nil
}
// 数据库查询,获取查询结果集,以关联数组结构返回
func (tx *Tx) GetOne(q string, args ...interface{}) (Map, error) {
list, err := tx.GetAll(q, args ...)
if err != nil {
return nil, err
}
return list[0], nil
}
// 数据库查询,获取查询字段值
func (tx *Tx) GetValue(q string, args ...interface{}) (interface{}, error) {
one, err := tx.GetOne(q, args ...)
if err != nil {
return "", err
}
for _, v := range one {
return v, nil
}
return "", nil
}
// sql预处理执行完成后调用返回值sql.Stmt.Exec完成sql操作
// 记得调用sql.Stmt.Close关闭操作对象
func (tx *Tx) Prepare(q string) (*sql.Stmt, error) {
return tx.Prepare(q)
}
// insert、replace, save ignore操作
// 0: insert: 仅仅执行写入操作,如果存在冲突的主键或者唯一索引,那么报错返回
// 1: replace: 如果数据存在(主键或者唯一索引),那么删除后重新写入一条
// 2: save: 如果数据存在(主键或者唯一索引),那么更新,否则写入一条新数据
// 3: ignore: 如果数据存在(主键或者唯一索引),那么什么也不做
func (tx *Tx) insert(table string, data Map, option uint8) (sql.Result, error) {
var keys []string
var values []string
var params []interface{}
for k, v := range data {
keys = append(keys, tx.db.charl + k + tx.db.charr)
values = append(values, "?")
params = append(params, v)
}
operation := tx.db.getInsertOperationByOption(option)
updatestr := ""
if option == OPTION_SAVE {
var updates []string
for k, _ := range data {
updates = append(updates, fmt.Sprintf("%s%s%s=VALUES(%s)", db.charl, k, db.charr, k))
}
updatestr = fmt.Sprintf(" ON DUPLICATE KEY UPDATE %s", strings.Join(updates, ","))
}
return tx.Exec(
fmt.Sprintf("%s INTO %s%s%s(%s) VALUES(%s) %s",
operation, tx.db.charl, table, db.charr, strings.Join(keys, ","), strings.Join(values, ","), updatestr), params...
)
}
// CURD操作:单条数据写入, 仅仅执行写入操作,如果存在冲突的主键或者唯一索引,那么报错返回
func (tx *Tx) Insert(table string, data Map) (sql.Result, error) {
return db.link.insert(table, data, OPTION_INSERT)
}
// CURD操作:单条数据写入, 如果数据存在(主键或者唯一索引),那么删除后重新写入一条
func (tx *Tx) Replace(table string, data Map) (sql.Result, error) {
return db.link.insert(table, data, OPTION_REPLACE)
}
// CURD操作:单条数据写入, 如果数据存在(主键或者唯一索引),那么更新,否则写入一条新数据
func (tx *Tx) Save(table string, data Map) (sql.Result, error) {
return db.link.insert(table, data, OPTION_SAVE)
}
// 批量写入数据
func (tx *Tx) batchInsert(table string, list List, batch int, option uint8) (sql.Result, error) {
var keys []string
var values []string
var bvalues []string
var params []interface{}
var result sql.Result
var size = len(list)
// 判断长度
if size < 1 {
return result, errors.New("empty data list")
}
// 首先获取字段名称及记录长度
for k, _ := range list[0] {
keys = append(keys, k)
values = append(values, "?")
}
var kstr = db.charl + strings.Join(keys, db.charl + "," + db.charr) + db.charr
// 操作判断
operation := db.getInsertOperationByOption(option)
updatestr := ""
if option == OPTION_SAVE {
var updates []string
for _, k := range keys {
updates = append(updates, fmt.Sprintf("%s=VALUES(%s)", db.charl, k, db.charr, k))
}
updatestr = fmt.Sprintf(" ON DUPLICATE KEY UPDATE %s", strings.Join(updates, ","))
}
// 构造批量写入数据格式(注意map的遍历是无序的)
for i := 0; i < size; i++ {
for _, k := range keys {
params = append(params, list[i][k])
}
bvalues = append(bvalues, "(" + strings.Join(values, ",") + ")")
if len(bvalues) == batch {
r, err := db.Exec(fmt.Sprintf("%s INTO %s%s%s(%s) VALUES%s %s", operation, db.charl, table, db.charr, kstr, strings.Join(bvalues, ","), updatestr), params...)
if err != nil {
return result, err
}
result = r
bvalues = bvalues[:0]
}
}
// 处理最后不构成指定批量的数据
if len(bvalues) > 0 {
r, err := db.Exec(fmt.Sprintf("%s INTO %s%s%s(%s) VALUES%s %s", operation, db.charl, table, db.charr, kstr, strings.Join(bvalues, ","), updatestr), params...)
if err != nil {
return result, err
}
result = r
}
return result, nil
}
// CURD操作:批量数据指定批次量写入
func (tx *Tx) BatchInsert(table string, list List, batch int) (sql.Result, error) {
return db.link.batchInsert(table, list, batch, OPTION_INSERT)
}
// CURD操作:批量数据指定批次量写入, 如果数据存在(主键或者唯一索引),那么删除后重新写入一条
func (tx *Tx) BatchReplace(table string, list List, batch int) (sql.Result, error) {
return db.link.batchInsert(table, list, batch, OPTION_REPLACE)
}
// CURD操作:批量数据指定批次量写入, 如果数据存在(主键或者唯一索引),那么更新,否则写入一条新数据
func (tx *Tx) BatchSave(table string, list List, batch int) (sql.Result, error) {
return db.link.batchInsert(table, list, batch, OPTION_SAVE)
}
// CURD操作:数据更新统一采用sql预处理
// data参数支持字符串或者关联数组类型内部会自行做判断处理
func (tx *Tx) Update(table string, data interface{}, condition interface{}, args ...interface{}) (sql.Result, error) {
var params []interface{}
var updates string
switch data.(type) {
case string:
updates = data.(string)
case Map:
var keys []string
for k, v := range data.(Map) {
keys = append(keys, fmt.Sprintf("%s%s%s=?", db.charl, k, db.charr))
params = append(params, v)
}
updates = strings.Join(keys, ",")
default:
return nil, errors.New("invalid data type for 'data' field, string or Map expected")
}
for _, v := range args {
if r, ok := v.(string); ok {
params = append(params, r)
} else if r, ok := v.(int); ok {
params = append(params, string(r))
} else {
}
}
return db.Exec(fmt.Sprintf("UPDATE %s%s%s SET %s WHERE %s", db.charl, table, db.charr, updates, condition), params...)
}
// CURD操作:删除数据
func (tx *Tx) Delete(table string, condition interface{}, args ...interface{}) (sql.Result, error) {
return db.Exec(fmt.Sprintf("DELETE FROM %s WHERE %s", db.charl, table, db.charr, condition), args...)
}

View File

@ -376,23 +376,25 @@ func linkopBatchSave() {
// 事务操作示例1
func transaction1() {
fmt.Println("transaction1:")
db.Begin()
r, err := db.Save("user", gdb.Map{
"uid" : 1,
"name" : "john",
})
db.Rollback()
fmt.Println(r, err)
if tx, err := db.Begin(); err == nil {
r, err := db.Save("user", gdb.Map{
"uid" : 1,
"name" : "john",
})
tx.Rollback()
fmt.Println(r, err)
}
fmt.Println()
}
// 事务操作示例2
func transaction2() {
fmt.Println("transaction2:")
db.Begin()
r, err := db.Table("user").Data(gdb.Map{"uid":1, "name": "john_1"}).Save()
db.Commit()
fmt.Println(r, err)
if tx, err := db.Begin(); err == nil {
r, err := db.Table("user").Data(gdb.Map{"uid":1, "name": "john_1"}).Save()
tx.Commit()
fmt.Println(r, err)
}
fmt.Println()
}
@ -454,5 +456,5 @@ func main() {
//linkopUpdate3()
//keepPing()
transaction1()
transaction2()
//transaction2()
}