improve tracing feture for package glog/gdb/gredis

This commit is contained in:
jianchenma 2021-01-26 01:19:55 +08:00
parent 99dd889ff0
commit 4c6d9f5eff
17 changed files with 186 additions and 95 deletions

View File

@ -2,9 +2,8 @@
# MySQL.
[database]
[database.logger]
Level = "all"
Stdout = true
CtxKeys = ["Trace-Id"]
level = "all"
stdout = true
[database.default]
link = "mysql:root:12345678@tcp(127.0.0.1:3306)/test"
debug = true

View File

@ -8,9 +8,30 @@ import (
sdkTrace "go.opentelemetry.io/otel/sdk/trace"
)
type tracingApi struct{}
func (api *tracingApi) Insert(r *ghttp.Request) {
result, err := g.Table("user").Ctx(r.Context()).Insert(g.Map{
"name": r.GetString("name"),
})
if err != nil {
r.Response.WriteExit(gerror.Current(err))
}
id, _ := result.LastInsertId()
r.Response.Write("id:", id)
}
func (api *tracingApi) Query(r *ghttp.Request) {
one, err := g.Table("user").Ctx(r.Context()).FindOne(r.GetInt("id"))
if err != nil {
r.Response.WriteExit(gerror.Current(err))
}
r.Response.Write("user:", one)
}
const (
JaegerEndpoint = "http://localhost:14268/api/traces"
ServiceName = "TracingHttpServerWithDatabase"
ServiceName = "TracingHttpServerWithDB"
)
// initTracer creates a new trace provider instance and registers it as global trace provider.
@ -36,29 +57,8 @@ func main() {
s := g.Server()
s.Group("/", func(group *ghttp.RouterGroup) {
group.Middleware(ghttp.MiddlewareServerTracing)
group.ALL("/user", new(dbTracingApi))
group.ALL("/user", new(tracingApi))
})
s.SetPort(8199)
s.Run()
}
type dbTracingApi struct{}
func (api *dbTracingApi) Insert(r *ghttp.Request) {
result, err := g.Table("user").Ctx(r.Context()).Insert(g.Map{
"name": r.GetString("name"),
})
if err != nil {
r.Response.WriteExit(gerror.Current(err))
}
id, _ := result.LastInsertId()
r.Response.Write("id:", id)
}
func (api *dbTracingApi) Query(r *ghttp.Request) {
one, err := g.Table("user").Ctx(r.Context()).FindOne(r.GetInt("id"))
if err != nil {
r.Response.WriteExit(gerror.Current(err))
}
r.Response.Write("user:", one)
}

View File

@ -1,8 +1,8 @@
# Redis.
[redis]
default = "127.0.0.1:6379,0?tracing=1"
cache = "127.0.0.1:6379,1?tracing=1"
default = "127.0.0.1:6379,0"
cache = "127.0.0.1:6379,1"

View File

@ -8,11 +8,31 @@ import (
sdkTrace "go.opentelemetry.io/otel/sdk/trace"
)
type tracingApi struct{}
const (
JaegerEndpoint = "http://localhost:14268/api/traces"
ServiceName = "TracingHttpServerWithRedis"
)
func (api *tracingApi) Set(r *ghttp.Request) {
_, err := g.Redis().Ctx(r.Context()).Do("SET", r.GetString("key"), r.GetString("value"))
if err != nil {
r.Response.WriteExit(gerror.Current(err))
}
r.Response.Write("ok")
}
func (api *tracingApi) Get(r *ghttp.Request) {
value, err := g.Redis().Ctx(r.Context()).DoVar(
"GET", r.GetString("key"),
)
if err != nil {
r.Response.WriteExit(gerror.Current(err))
}
r.Response.Write(value.String())
}
// initTracer creates a new trace provider instance and registers it as global trace provider.
func initTracer() func() {
// Create and install Jaeger export pipeline.
@ -36,28 +56,8 @@ func main() {
s := g.Server()
s.Group("/", func(group *ghttp.RouterGroup) {
group.Middleware(ghttp.MiddlewareServerTracing)
group.ALL("/redis", new(redisTracingApi))
group.ALL("/redis", new(tracingApi))
})
s.SetPort(8199)
s.Run()
}
type redisTracingApi struct{}
func (api *redisTracingApi) Set(r *ghttp.Request) {
_, err := g.Redis().Ctx(r.Context()).Do("SET", r.GetString("key"), r.GetString("value"))
if err != nil {
r.Response.WriteExit(gerror.Current(err))
}
r.Response.Write("ok")
}
func (api *redisTracingApi) Get(r *ghttp.Request) {
value, err := g.Redis().Ctx(r.Context()).DoVar(
"GET", r.GetString("key"),
)
if err != nil {
r.Response.WriteExit(gerror.Current(err))
}
r.Response.Write(value.String())
}

View File

@ -0,0 +1,19 @@
# MySQL.
[database]
[database.logger]
level = "all"
stdout = true
[database.default]
link = "mysql:root:12345678@tcp(127.0.0.1:3306)/test"
debug = true
# Redis.
[redis]
default = "127.0.0.1:6379,0"
cache = "127.0.0.1:6379,1"

View File

@ -0,0 +1,71 @@
package main
import (
"github.com/gogf/gcache-adapter/adapter"
"github.com/gogf/gf/errors/gerror"
"github.com/gogf/gf/frame/g"
"github.com/gogf/gf/net/ghttp"
"go.opentelemetry.io/otel/exporters/trace/jaeger"
sdkTrace "go.opentelemetry.io/otel/sdk/trace"
"time"
)
type tracingApi struct{}
const (
JaegerEndpoint = "http://localhost:14268/api/traces"
ServiceName = "TracingHttpServerWithDBRedisLog"
)
// initTracer creates a new trace provider instance and registers it as global trace provider.
func initTracer() func() {
// Create and install Jaeger export pipeline.
flush, err := jaeger.InstallNewPipeline(
jaeger.WithCollectorEndpoint(JaegerEndpoint),
jaeger.WithProcess(jaeger.Process{
ServiceName: ServiceName,
}),
jaeger.WithSDK(&sdkTrace.Config{DefaultSampler: sdkTrace.AlwaysSample()}),
)
if err != nil {
g.Log().Fatal(err)
}
return flush
}
func (api *tracingApi) Insert(r *ghttp.Request) {
result, err := g.Table("user").Ctx(r.Context()).Insert(g.Map{
"name": r.GetString("name"),
})
if err != nil {
r.Response.WriteExit(gerror.Current(err))
}
id, _ := result.LastInsertId()
r.Response.Write("id:", id)
}
func (api *tracingApi) Query(r *ghttp.Request) {
one, err := g.Table("user").
Ctx(r.Context()).
Cache(5 * time.Second).
FindOne(r.GetInt("id"))
if err != nil {
r.Response.WriteExit(gerror.Current(err))
}
r.Response.Write("user:", one)
}
func main() {
flush := initTracer()
defer flush()
g.DB().GetCache().SetAdapter(adapter.NewRedis(g.Redis()))
s := g.Server()
s.Group("/", func(group *ghttp.RouterGroup) {
group.Middleware(ghttp.MiddlewareServerTracing)
group.ALL("/user", new(tracingApi))
})
s.SetPort(8199)
s.Run()
}

View File

@ -121,7 +121,11 @@ func (c *Core) DoQuery(link Link, sql string, args ...interface{}) (rows *sql.Ro
}
func (c *Core) addSqlToTracing(ctx context.Context, sql *Sql) {
if !c.DB.GetConfig().Tracing {
if ctx == nil {
return
}
spanCtx := trace.SpanContextFromContext(ctx)
if traceId := spanCtx.TraceID; !traceId.IsValid() {
return
}

View File

@ -51,7 +51,6 @@ type ConfigNode struct {
UpdatedAt string `json:"updatedAt"` // (Optional) The filed name of table for automatic-filled updated datetime.
DeletedAt string `json:"deletedAt"` // (Optional) The filed name of table for automatic-filled updated datetime.
TimeMaintainDisabled bool `json:"timeMaintainDisabled"` // (Optional) Disable the automatic time maintaining feature.
Tracing bool `json:"tracing"` // (Optional) Tracing enable the tracing feature for database.
}
// configs is internal used configuration object.
@ -205,7 +204,7 @@ func (c *Core) SetDryRun(enabled bool) {
// GetDryRun returns the DryRun value.
// Deprecated, use GetConfig instead.
func (c *Core) GetDryRun() bool {
return c.config.DryRun
return c.config.DryRun || allDryRun
}
// GetPrefix returns the table prefix string configured.

View File

@ -402,7 +402,7 @@ func (m *Model) FindValue(fieldsAndWhere ...interface{}) (Value, error) {
}
// FindArray queries and returns data values as slice from database.
// Note that if there're multiple columns in the result, it returns just one column values randomly.
// Note that if there are multiple columns in the result, it returns just one column values randomly.
// Also see Model.WherePri and Model.Value.
func (m *Model) FindArray(fieldsAndWhere ...interface{}) ([]Value, error) {
if len(fieldsAndWhere) >= 2 {

View File

@ -63,7 +63,7 @@ func Test_DB_Exec(t *testing.T) {
func Test_DB_Prepare(t *testing.T) {
gtest.C(t, func(t *gtest.T) {
st, err := db.Prepare("SELECT 100")
st, err := db.Prepare("SELECT 1")
t.Assert(err, nil)
rows, err := st.Query()
@ -71,10 +71,10 @@ func Test_DB_Prepare(t *testing.T) {
array, err := rows.Columns()
t.Assert(err, nil)
t.Assert(array[0], "100")
t.Assert(array[0], "1")
err = rows.Close()
t.Assert(err, nil)
//err = rows.Close()
//t.Assert(err, nil)
})
}

View File

@ -368,7 +368,7 @@ CREATE TABLE %s (
t.Assert(oneUpdate["name"].String(), "name_1000")
t.Assert(oneUpdate["deleted_at"].String(), "")
t.Assert(oneUpdate["created_at"].GTime().Timestamp(), oneInsert["created_at"].GTime().Timestamp())
t.AssertGE(oneUpdate["updated_at"].GTime().Timestamp(), gtime.Timestamp()-2)
t.AssertGE(oneUpdate["updated_at"].GTime().Timestamp(), gtime.Timestamp()-4)
// Replace
dataReplace := User{

View File

@ -51,7 +51,6 @@ type Config struct {
ConnectTimeout time.Duration `json:"connectTimeout"` // Dial connection timeout.
TLS bool `json:"tls"` // Specifies the config to use when a TLS connection is dialed.
TLSSkipVerify bool `json:"tlsSkipVerify"` // Disables server name verification when connecting over TLS.
Tracing bool `json:"tracing"` // Tracing enable the tracing feature for redis.
}
// Pool statistics.

View File

@ -7,10 +7,8 @@
package gredis
import (
"github.com/gogf/gf/internal/intlog"
"time"
"github.com/gogf/gf/errors/gerror"
"github.com/gogf/gf/internal/intlog"
"github.com/gogf/gf/container/gmap"
"github.com/gogf/gf/text/gregex"
@ -98,26 +96,8 @@ func ConfigFromStr(str string) (config *Config, err error) {
if config.Port == 0 {
config.Port = DefaultRedisPort
}
if v, ok := parse["maxIdle"]; ok {
config.MaxIdle = gconv.Int(v)
}
if v, ok := parse["maxActive"]; ok {
config.MaxActive = gconv.Int(v)
}
if v, ok := parse["idleTimeout"]; ok {
config.IdleTimeout = gconv.Duration(v) * time.Second
}
if v, ok := parse["maxConnLifetime"]; ok {
config.MaxConnLifetime = gconv.Duration(v) * time.Second
}
if v, ok := parse["tls"]; ok {
config.TLS = gconv.Bool(v)
}
if v, ok := parse["skipVerify"]; ok {
config.TLSSkipVerify = gconv.Bool(v)
}
if v, ok := parse["tracing"]; ok {
config.Tracing = gconv.Bool(v)
if err = gconv.Struct(parse, config); err != nil {
return nil, err
}
return
}

View File

@ -63,15 +63,24 @@ func (c *Conn) do(timeout time.Duration, commandName string, args ...interface{}
timestampMilli1 := gtime.TimestampMilli()
reply, err = c.Conn.Do(commandName, args...)
timestampMilli2 := gtime.TimestampMilli()
// Tracing.
if !c.redis.config.Tracing {
if c.ctx == nil {
return
}
spanCtx := trace.SpanContextFromContext(c.ctx)
if traceId := spanCtx.TraceID; !traceId.IsValid() {
return
}
tr := otel.GetTracerProvider().Tracer(
"github.com/gogf/gf/database/gredis",
trace.WithInstrumentationVersion(fmt.Sprintf(`%s`, gf.VERSION)),
)
_, span := tr.Start(c.ctx, commandName)
ctx := c.ctx
if ctx == nil {
ctx = context.Background()
}
_, span := tr.Start(ctx, commandName)
defer span.End()
if err != nil {
span.SetStatus(codes.Error, fmt.Sprintf(`%+v`, err))

1
go.mod
View File

@ -7,6 +7,7 @@ require (
github.com/clbanning/mxj v1.8.5-0.20200714211355-ff02cfb8ea28
github.com/fsnotify/fsnotify v1.4.9
github.com/go-sql-driver/mysql v1.5.0
github.com/gogf/gcache-adapter v0.0.3
github.com/gomodule/redigo v2.0.0+incompatible
github.com/gorilla/websocket v1.4.1
github.com/grokify/html-strip-tags-go v0.0.0-20190921062105-daaa06bf1aaf

View File

@ -15,6 +15,7 @@ import (
"github.com/gogf/gf/os/gfpool"
"github.com/gogf/gf/os/gmlock"
"github.com/gogf/gf/os/gtimer"
"go.opentelemetry.io/otel/trace"
"io"
"os"
"strings"
@ -161,21 +162,30 @@ func (l *Logger) print(std io.Writer, lead string, values ...interface{}) {
tempStr = ""
valueStr = ""
)
// Context values.
if l.ctx != nil && len(l.config.CtxKeys) > 0 {
ctxStr := ""
for _, key := range l.config.CtxKeys {
if v := l.ctx.Value(key); v != nil {
if ctxStr != "" {
ctxStr += ", "
if l.ctx != nil {
// Tracing values.
spanCtx := trace.SpanContextFromContext(l.ctx)
if traceId := spanCtx.TraceID; traceId.IsValid() {
buffer.WriteString(fmt.Sprintf("{TraceID:%s} ", traceId.String()))
}
// Context values.
if len(l.config.CtxKeys) > 0 {
ctxStr := ""
for _, key := range l.config.CtxKeys {
if v := l.ctx.Value(key); v != nil {
if ctxStr != "" {
ctxStr += ", "
}
ctxStr += fmt.Sprintf("%s: %+v", key, v)
}
ctxStr += fmt.Sprintf("%s: %+v", key, v)
}
if ctxStr != "" {
buffer.WriteString(fmt.Sprintf("{%s} ", ctxStr))
}
}
if ctxStr != "" {
buffer.WriteString(fmt.Sprintf("{%s} ", ctxStr))
}
}
for _, v := range values {
tempStr = gconv.String(v)
if len(valueStr) > 0 {