diff --git a/cmd/dataservice/main.go b/cmd/dataservice/main.go index ad1c81e61d..2332cde84a 100644 --- a/cmd/dataservice/main.go +++ b/cmd/dataservice/main.go @@ -2,18 +2,23 @@ package main import ( "context" - "log" "os" "os/signal" "syscall" + "github.com/zilliztech/milvus-distributed/internal/dataservice" + "github.com/zilliztech/milvus-distributed/cmd/distributed/components" + "github.com/zilliztech/milvus-distributed/internal/log" "github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms" ) func main() { ctx, cancel := context.WithCancel(context.Background()) + dataservice.Params.Init() + log.SetupLogger(&dataservice.Params.Log) + defer log.Sync() msFactory := pulsarms.NewFactory() svr, err := components.NewDataService(ctx, msFactory) @@ -35,5 +40,5 @@ func main() { if err := svr.Stop(); err != nil { panic(err) } - log.Println("shut down data service") + log.Debug("shut down data service") } diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 0b9c081b8c..21cad65361 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -77,3 +77,13 @@ dataService: dataNode: address: localhost port: 21124 + +log: + level: debug + file: + rootPath: /tmp/milvus + maxSize: 300 # MB + maxAge: 10 # day + maxBackups: 20 + dev: true + format: text # text/json \ No newline at end of file diff --git a/go.mod b/go.mod index 0f160674e9..f1cff2081b 100644 --- a/go.mod +++ b/go.mod @@ -4,44 +4,45 @@ go 1.15 require ( code.cloudfoundry.org/bytefmt v0.0.0-20200131002437-cf55d5288a48 // indirect + github.com/99designs/keyring v1.1.5 // indirect github.com/HdrHistogram/hdrhistogram-go v1.0.1 // indirect - github.com/akamensky/argparse v1.2.2 - github.com/apache/pulsar-client-go v0.1.1 + github.com/akamensky/argparse v1.2.2 // indirect + github.com/apache/pulsar-client-go v0.3.0 github.com/apache/thrift v0.13.0 github.com/aws/aws-sdk-go v1.30.8 // indirect github.com/coreos/etcd v3.3.25+incompatible // indirect github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548 // indirect + github.com/datadog/zstd v1.4.6-0.20200617134701-89f69fb7df32 // indirect github.com/facebookgo/ensure v0.0.0-20200202191622-63f1cf65ac4c // indirect github.com/facebookgo/stack v0.0.0-20160209184415-751773369052 // indirect github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4 // indirect github.com/frankban/quicktest v1.10.2 // indirect github.com/fsnotify/fsnotify v1.4.9 // indirect github.com/go-basic/ipv4 v1.0.0 - github.com/go-kit/kit v0.9.0 - github.com/gogo/protobuf v1.2.1 + github.com/go-kit/kit v0.9.0 // indirect + github.com/gogo/protobuf v1.3.1 github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect github.com/golang/mock v1.3.1 - github.com/golang/protobuf v1.3.2 + github.com/golang/protobuf v1.4.2 github.com/google/btree v1.0.0 - github.com/ilyakaznacheev/cleanenv v1.2.5 + github.com/ilyakaznacheev/cleanenv v1.2.5 // indirect github.com/klauspost/compress v1.10.11 // indirect github.com/kr/text v0.2.0 // indirect + github.com/linkedin/goavro/v2 v2.9.8 // indirect github.com/minio/minio-go/v7 v7.0.5 github.com/mitchellh/mapstructure v1.1.2 - github.com/modern-go/reflect2 v1.0.1 + github.com/modern-go/reflect2 v1.0.1 // indirect github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect - github.com/oklog/run v1.1.0 - github.com/onsi/ginkgo v1.12.1 // indirect - github.com/onsi/gomega v1.10.0 // indirect - github.com/opentracing/opentracing-go v1.2.0 + github.com/oklog/run v1.1.0 // indirect + github.com/onsi/gomega v1.10.5 // indirect + github.com/opentracing/opentracing-go v1.2.0 // indirect github.com/pierrec/lz4 v2.5.2+incompatible // indirect github.com/pingcap/check v0.0.0-20200212061837-5e12011dc712 // indirect github.com/pingcap/errors v0.11.4 // indirect github.com/pingcap/log v0.0.0-20200828042413-fce0951f1463 // indirect github.com/pivotal-golang/bytefmt v0.0.0-20200131002437-cf55d5288a48 // indirect - github.com/pkg/errors v0.9.1 - github.com/prometheus/client_golang v1.5.1 // indirect - github.com/prometheus/common v0.10.0 + github.com/pkg/errors v0.9.1 // indirect + github.com/prometheus/common v0.10.0 // indirect github.com/prometheus/procfs v0.1.3 // indirect github.com/quasilyte/go-ruleguard v0.2.1 // indirect github.com/sirupsen/logrus v1.6.0 // indirect @@ -50,24 +51,26 @@ require ( github.com/spf13/viper v1.7.1 github.com/stretchr/testify v1.6.1 github.com/tecbot/gorocksdb v0.0.0-20191217155057-f0fad39f321c - github.com/uber/jaeger-client-go v2.25.0+incompatible + github.com/uber/jaeger-client-go v2.25.0+incompatible // indirect github.com/uber/jaeger-lib v2.4.0+incompatible // indirect github.com/urfave/cli v1.22.5 // indirect + github.com/valyala/gozstd v1.7.0 // indirect github.com/yahoo/athenz v1.9.16 // indirect go.etcd.io/etcd v0.5.0-alpha.5.0.20191023171146-3cf2f69b5738 go.uber.org/zap v1.15.0 golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a // indirect golang.org/x/lint v0.0.0-20200302205851-738671d3881b // indirect - golang.org/x/net v0.0.0-20200904194848-62affa334b73 - golang.org/x/sys v0.0.0-20200905004654-be1d3432aa8f // indirect - golang.org/x/text v0.3.3 + golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb + golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d // indirect + golang.org/x/text v0.3.3 // indirect golang.org/x/time v0.0.0-20191024005414-555d28b269f0 // indirect golang.org/x/tools v0.0.0-20200825202427-b303f430e36d // indirect golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect google.golang.org/genproto v0.0.0-20200122232147-0452cf42e150 // indirect google.golang.org/grpc v1.31.0 - gopkg.in/alecthomas/kingpin.v2 v2.2.6 + gopkg.in/alecthomas/kingpin.v2 v2.2.6 // indirect gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect + gopkg.in/natefinch/lumberjack.v2 v2.0.0 gopkg.in/yaml.v2 v2.3.0 // indirect honnef.co/go/tools v0.0.1-2020.1.4 // indirect sigs.k8s.io/yaml v1.2.0 // indirect diff --git a/go.sum b/go.sum index 0ec8b96ed4..f22abee87c 100644 --- a/go.sum +++ b/go.sum @@ -12,9 +12,12 @@ cloud.google.com/go/storage v1.0.0/go.mod h1:IhtSnM/ZTZV8YYJWCY8RULGVqBDmpoyjwiy code.cloudfoundry.org/bytefmt v0.0.0-20200131002437-cf55d5288a48 h1:/EMHruHCFXR9xClkGV/t0rmHrdhX4+trQUcBqjwc9xE= code.cloudfoundry.org/bytefmt v0.0.0-20200131002437-cf55d5288a48/go.mod h1:wN/zk7mhREp/oviagqUXY3EwuHhWyOvAdsn5Y4CzOrc= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= +github.com/99designs/keyring v1.1.5 h1:wLv7QyzYpFIyMSwOADq1CLTF9KbjbBfcnfmOGJ64aO4= +github.com/99designs/keyring v1.1.5/go.mod h1:7hsVvt2qXgtadGevGJ4ujg+u8m6SpJ5TpHqTozIPqf0= github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= +github.com/DataDog/zstd v1.4.6-0.20200617134701-89f69fb7df32/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo= github.com/HdrHistogram/hdrhistogram-go v1.0.1 h1:GX8GAYDuhlFQnI2fRDHQhTlkHMz8bEn0jTI6LJU0mpw= github.com/HdrHistogram/hdrhistogram-go v1.0.1/go.mod h1:BWJ+nMSHY3L41Zj7CA3uXnloDp7xxV0YvstAE7nKTaM= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= @@ -28,6 +31,13 @@ github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4 h1:Hs82Z41s6SdL1C github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/apache/pulsar-client-go v0.1.1 h1:v/kU+2ZCC6yFIcbZrFtWa9/nvVzVr18L+xYJUvZSxEQ= github.com/apache/pulsar-client-go v0.1.1/go.mod h1:mlxC65KL1BLhGO2bnT9zWMttVzR2czVPb27D477YpyU= +github.com/apache/pulsar-client-go v0.2.0-candidate-1/go.mod h1:POSPPmXv1RuoM7FzHaS3NurCSOopwin2ekGK2PcOgVM= +github.com/apache/pulsar-client-go v0.2.0/go.mod h1:POSPPmXv1RuoM7FzHaS3NurCSOopwin2ekGK2PcOgVM= +github.com/apache/pulsar-client-go v0.3.0-candidate-1/go.mod h1:9eSgOadVhCfb2DfWtS1SCYaYIMk9VDOZztr4u3FO8cQ= +github.com/apache/pulsar-client-go v0.3.0 h1:rNhJ/ENwoEfZPHHwUHNxPBTNqNQE2LQEm7DXu043giM= +github.com/apache/pulsar-client-go v0.3.0/go.mod h1:9eSgOadVhCfb2DfWtS1SCYaYIMk9VDOZztr4u3FO8cQ= +github.com/apache/pulsar-client-go/oauth2 v0.0.0-20200715083626-b9f8c5cedefb h1:E1P0FudxDdj2RhbveZC9i3PwukLCA/4XQSkBS/dw6/I= +github.com/apache/pulsar-client-go/oauth2 v0.0.0-20200715083626-b9f8c5cedefb/go.mod h1:0UtvvETGDdvXNDCHa8ZQpxl+w3HbdFtfYZvDHLgWGTY= github.com/apache/thrift v0.13.0 h1:5hryIiq9gtn+MiLVn0wP37kb/uTeRZgN08WoCsAhIhI= github.com/apache/thrift v0.13.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= github.com/ardielle/ardielle-go v1.5.2 h1:TilHTpHIQJ27R1Tl/iITBzMwiUGSlVfiVhwDNGM3Zj4= @@ -74,6 +84,10 @@ github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7Do github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548 h1:iwZdTE0PVqJCos1vaoKsclOGD3ADKpshg3SRtYBbwso= github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548/go.mod h1:e6NPNENfs9mPDVNRekM7lKScauxd5kXTr1Mfyig6TDM= +github.com/danieljoos/wincred v1.0.2 h1:zf4bhty2iLuwgjgpraD2E9UbvO+fe54XXGJbOwe23fU= +github.com/danieljoos/wincred v1.0.2/go.mod h1:SnuYRW9lp1oJrZX/dXJqr0cPK5gYXqx3EJbmjhLdK9U= +github.com/datadog/zstd v1.4.6-0.20200617134701-89f69fb7df32 h1:QWqadCIHYA5zja4b6h9uGQn93u1vL+G/aewImumdg/M= +github.com/datadog/zstd v1.4.6-0.20200617134701-89f69fb7df32/go.mod h1:inRp+etsHuvVqMPNTXaFlpf/Tj7wqviBtdJoPVrPEFQ= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -86,6 +100,8 @@ github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4 h1:qk/FSDDxo05w github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= +github.com/dvsekhvalnov/jose2go v0.0.0-20180829124132-7f401d37b68a h1:mq+R6XEM6lJX5VlLyZIrUSP8tSuJp82xTK89hvBwJbU= +github.com/dvsekhvalnov/jose2go v0.0.0-20180829124132-7f401d37b68a/go.mod h1:7BvyPhdbLxMXIYTFPLsyJRFMsKmOZnQmzh6Gb+uquuM= github.com/eknkc/amber v0.0.0-20171010120322-cdade1c07385 h1:clC1lXBpe2kTj2VHdaIu9ajZQe4kcEY9j0NsnDDBZ3o= github.com/eknkc/amber v0.0.0-20171010120322-cdade1c07385/go.mod h1:0vRUJqYpeSZifjYj7uP3BG/gKcuzL9xWVV/Y+cK33KM= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= @@ -115,10 +131,14 @@ github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 h1:ZpnhV/YsD2/4cESfV5+Hoeu/iUR3ruzNvZ+yQfO03a0= +github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2/go.mod h1:bBOAhwG1umN6/6ZUMtDFBMQR8jRg9O75tm9K00oMsK4= github.com/gogo/protobuf v0.0.0-20180717141946-636bf0302bc9/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= +github.com/gogo/protobuf v1.3.1 h1:DqDEcV5aeaTmdFBePNpYsp3FlcVH/2ISVVM9Qf8PSls= +github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= @@ -134,6 +154,13 @@ github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= +github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= +github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= +github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= +github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0= +github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= @@ -176,6 +203,8 @@ github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t github.com/grpc-ecosystem/grpc-gateway v1.9.5 h1:UImYN5qQ8tuGpGE16ZmjvcTtTw24zw1QAp/SlnNrZhI= github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645/go.mod h1:6iZfnjpejD4L/4DwD7NryNaJyCQdzwWwH2MWhCA90Kw= +github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c h1:6rhixN/i8ZofjG1Y75iExal34USq5p+wiN1tpie8IrU= +github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c/go.mod h1:NMPJylDgVpX0MLRlPy15sqSwOFv/U1GZ2m21JhFfek0= github.com/hashicorp/consul/api v1.1.0/go.mod h1:VmuI/Lkw1nC05EYQWNKwWGbkg+FbDBtguAZLlVdkD9Q= github.com/hashicorp/consul/sdk v0.1.1/go.mod h1:VKf9jXwCTEY1QZP2MOLRhb5i/I/ssyNV1vwHyQBF0x8= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= @@ -219,7 +248,10 @@ github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1 github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= +github.com/keybase/go-keychain v0.0.0-20190712205309-48d3d31d256d h1:Z+RDyXzjKE0i2sTjZ/b1uxiGtPhFy34Ou/Tk0qwN0kM= +github.com/keybase/go-keychain v0.0.0-20190712205309-48d3d31d256d/go.mod h1:JJNrCn9otv/2QP4D7SMJBgaleKpOf66PnW6F5WGNRIc= github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= +github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.10.8/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/klauspost/compress v1.10.11 h1:K9z59aO18Aywg2b/WSgBaUX99mHy2BES18Cr5lBKZHk= @@ -240,6 +272,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/linkedin/goavro/v2 v2.9.8 h1:jN50elxBsGBDGVDEKqUlDuU1cFwJ11K/yrJCBMe/7Wg= +github.com/linkedin/goavro/v2 v2.9.8/go.mod h1:UgQUb2N/pmueQYH9bfqFioWxzYCZXSfF8Jw03O5sjqA= github.com/magiconair/properties v1.8.1 h1:ZC2Vc7/ZFkGmsVC9KvOjumD+G5lXy2RtTKyzRKO2BQ4= github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= @@ -273,6 +307,8 @@ github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9 github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/montanaflynn/stats v0.5.0 h1:2EkzeTSqBB4V4bJwWrt5gIIrZmpJBcoIRGS2kWLgzmk= github.com/montanaflynn/stats v0.5.0/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= +github.com/mtibben/percent v0.2.1 h1:5gssi8Nqo8QU/r2pynCm+hBQHpkB/uNK7BJCFogWdzs= +github.com/mtibben/percent v0.2.1/go.mod h1:KG9uO+SZkUp+VkRHsCdYQV3XSZrrSpR3O9ibNBTZrns= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= @@ -286,10 +322,19 @@ github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+W github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.12.1 h1:mFwc4LvZ0xpSvDZ3E+k8Yte0hLOMxXUlP+yXtJqkYfQ= github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk= +github.com/onsi/ginkgo v1.12.2/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY= +github.com/onsi/ginkgo v1.12.3/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY= +github.com/onsi/ginkgo v1.13.0/go.mod h1:+REjRxOmWfHCjfv9TTWB1jD1Frx4XydAD3zm1lskyM0= +github.com/onsi/ginkgo v1.14.0 h1:2mOpI4JVVPBN+WQRa0WKH2eXR+Ey+uK4n7Zj0aYpIQA= +github.com/onsi/ginkgo v1.14.0/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY= github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= github.com/onsi/gomega v1.10.0 h1:Gwkk+PTu/nfOwNMtUB/mRUv0X7ewW5dO4AERT1ThVKo= github.com/onsi/gomega v1.10.0/go.mod h1:Ho0h+IUsWyvy1OpqCwxlQ/21gkhVunqlU8fDGcoTdcA= +github.com/onsi/gomega v1.10.1 h1:o0+MgICZLuZ7xjH7Vx6zS/zcu93/BEp1VwkIW1mEXCE= +github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= +github.com/onsi/gomega v1.10.5 h1:7n6FEkpFmfCoo2t+YYqXH0evK+a9ICQz0xcAy9dYcaQ= +github.com/onsi/gomega v1.10.5/go.mod h1:gza4q3jKQJijlu05nKWRCW/GavJumGt8aNRxWg7mt48= github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs= github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc= @@ -335,6 +380,10 @@ github.com/prometheus/client_golang v1.0.0 h1:vrDKnkGzuGvhNAL56c7DBz29ZL+KxnoR0x github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= github.com/prometheus/client_golang v1.5.1 h1:bdHYieyGlH+6OLEk2YQha8THib30KP0/yD0YH9m6xcA= github.com/prometheus/client_golang v1.5.1/go.mod h1:e9GMxYsXl05ICDXkRhurwBS4Q3OK1iX/F2sw+iXX5zU= +github.com/prometheus/client_golang v1.6.0/go.mod h1:ZLOG9ck3JLRdB5MgO8f+lLTe83AXG6ro35rLTxvnIl4= +github.com/prometheus/client_golang v1.7.0/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M= +github.com/prometheus/client_golang v1.7.1 h1:NTGy1Ja9pByO+xAeH/qiWnLrKtr3hJPNjaVUwnjpdpA= +github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4 h1:gQz4mCbXsO+nc9n1hCxHcGA3Zx3Eo+UHZoInFGUIXNM= @@ -513,13 +562,18 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20200904194848-62affa334b73 h1:MXfv8rhZWmFeqX3GNZRsd6vOLoaCHjYEX3qkRo3YBUA= golang.org/x/net v0.0.0-20200904194848-62affa334b73/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= +golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb h1:eBmm0M9fYhWpKZLjQUUKka/LtIxf46G4fxeEz5KJr9U= +golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= +golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d h1:TzXSXBo42m9gQenoE3b9BGiEpg5IG2JkU5FkPIawgtw= +golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -541,6 +595,7 @@ golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190507160741-ecd444e8653b/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190804053845-51ab0e2deafa/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190826190057-c7b8b68b1456/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -550,9 +605,13 @@ golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200122134326-e047566fdf82/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200905004654-be1d3432aa8f h1:Fqb3ao1hUmOR3GkUOg/Y+BadLwykBIzs5q8Ez2SbHyc= golang.org/x/sys v0.0.0-20200905004654-be1d3432aa8f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f h1:+Nyd8tzPX9R7BWHguqsrbFdRx3WQ/1ib8I44HXV5yTA= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= @@ -565,6 +624,7 @@ golang.org/x/time v0.0.0-20191024005414-555d28b269f0 h1:/5xXl8Y5W96D+TtHSlonuFqG golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20181030221726-6c7e314b6563/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190312151545-0bb0c0a6e846/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= @@ -602,6 +662,7 @@ google.golang.org/api v0.13.0/go.mod h1:iLdEw5Ide6rF15KTC1Kkl0iskquN2gFfn9o9XIsb google.golang.org/api v0.14.0/go.mod h1:iLdEw5Ide6rF15KTC1Kkl0iskquN2gFfn9o9XIsbkAI= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/appengine v1.6.1 h1:QzqyMA1tlu6CgqCDUtU9V+ZKhLFT2dkJuANu5QaxI3I= google.golang.org/appengine v1.6.1/go.mod h1:i06prIuMbXzDqacNJfV5OdTW448YApPu5ww/cMBSeb0= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20181004005441-af9cb2a35e7f/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= @@ -617,6 +678,16 @@ google.golang.org/genproto v0.0.0-20200122232147-0452cf42e150 h1:VPpdpQkGvFicX9y google.golang.org/genproto v0.0.0-20200122232147-0452cf42e150/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc= google.golang.org/grpc v1.26.0 h1:2dTRdpdFEEhJYQD8EMLB61nnrzSCTbG38PhqdhvOltg= google.golang.org/grpc v1.26.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= +google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= +google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= +google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= +google.golang.org/protobuf v1.20.0/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= +google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= +google.golang.org/protobuf v1.20.1/go.mod h1:KqelGeouBkcbcuB3HCk4/YH2tmNLk6YSWA5LIWeI/lY= +google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= +google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM= +google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/internal/dataservice/param.go b/internal/dataservice/param.go index 84a29d1932..a1a7c10551 100644 --- a/internal/dataservice/param.go +++ b/internal/dataservice/param.go @@ -1,9 +1,12 @@ package dataservice import ( + "path" "strconv" "sync" + "github.com/zilliztech/milvus-distributed/internal/log" + "github.com/zilliztech/milvus-distributed/internal/util/paramtable" ) @@ -35,6 +38,7 @@ type ParamTable struct { K2SChannelNames []string SegmentFlushMetaPath string + Log log.Config } var Params ParamTable @@ -72,6 +76,7 @@ func (p *ParamTable) Init() { p.initDataServiceSubscriptionName() p.initK2SChannelNames() p.initSegmentFlushMetaPath() + p.initLogCfg() }) } @@ -219,3 +224,34 @@ func (p *ParamTable) initSegmentFlushMetaPath() { } p.SegmentFlushMetaPath = subPath } + +func (p *ParamTable) initLogCfg() { + p.Log = log.Config{} + format, err := p.Load("log.format") + if err != nil { + panic(err) + } + p.Log.Format = format + level, err := p.Load("log.level") + if err != nil { + panic(err) + } + p.Log.Level = level + devStr, err := p.Load("log.dev") + if err != nil { + panic(err) + } + dev, err := strconv.ParseBool(devStr) + if err != nil { + panic(err) + } + p.Log.Development = dev + p.Log.File.MaxSize = p.ParseInt("log.file.maxSize") + p.Log.File.MaxBackups = p.ParseInt("log.file.maxBackups") + p.Log.File.MaxDays = p.ParseInt("log.file.maxAge") + rootPath, err := p.Load("log.file.rootPath") + if err != nil { + panic(err) + } + p.Log.File.Filename = path.Join(rootPath, "dataservice-"+strconv.FormatInt(p.NodeID, 10)+".log") +} diff --git a/internal/dataservice/server.go b/internal/dataservice/server.go index d6a3e60513..91cf889e33 100644 --- a/internal/dataservice/server.go +++ b/internal/dataservice/server.go @@ -4,13 +4,16 @@ import ( "context" "errors" "fmt" - "log" "path" "strconv" "sync" "sync/atomic" "time" + "go.uber.org/zap" + + "github.com/zilliztech/milvus-distributed/internal/log" + "github.com/zilliztech/milvus-distributed/internal/distributed/datanode" "github.com/golang/protobuf/proto" @@ -99,7 +102,6 @@ type ( ) func CreateServer(ctx context.Context, factory msgstream.Factory) (*Server, error) { - Params.Init() ch := make(chan struct{}) s := &Server{ ctx: ctx, @@ -158,7 +160,7 @@ func (s *Server) Start() error { } s.startServerLoop() s.state.Store(internalpb2.StateCode_HEALTHY) - log.Println("start success") + log.Debug("start success") return nil } @@ -210,7 +212,7 @@ func (s *Server) initMsgProducer() error { } func (s *Server) loadMetaFromMaster() error { - log.Println("loading collection meta from master") + log.Debug("loading collection meta from master") if err := s.checkMasterIsHealthy(); err != nil { return err } @@ -245,7 +247,7 @@ func (s *Server) loadMetaFromMaster() error { CollectionName: collectionName, }) if err != nil { - log.Println(err.Error()) + log.Error("describe collection error", zap.Error(err)) continue } partitions, err := s.masterClient.ShowPartitions(&milvuspb.ShowPartitionRequest{ @@ -260,7 +262,7 @@ func (s *Server) loadMetaFromMaster() error { CollectionID: collection.CollectionID, }) if err != nil { - log.Println(err.Error()) + log.Error("show partitions error", zap.Error(err)) continue } err = s.meta.AddCollection(&collectionInfo{ @@ -269,11 +271,11 @@ func (s *Server) loadMetaFromMaster() error { Partitions: partitions.PartitionIDs, }) if err != nil { - log.Println(err.Error()) + log.Error("add collection error", zap.Error(err)) continue } } - log.Println("load collection meta from master complete") + log.Debug("load collection meta from master complete") return nil } @@ -331,7 +333,7 @@ func (s *Server) startStatsChannel(ctx context.Context) { statistics := msg.(*msgstream.SegmentStatisticsMsg) for _, stat := range statistics.SegStats { if err := s.statsHandler.HandleSegmentStat(stat); err != nil { - log.Println(err.Error()) + log.Error("handle segment stat error", zap.Error(err)) continue } } @@ -348,7 +350,7 @@ func (s *Server) startSegmentFlushChannel(ctx context.Context) { for { select { case <-ctx.Done(): - log.Println("segment flush channel shut down") + log.Debug("segment flush channel shut down") return default: } @@ -361,13 +363,13 @@ func (s *Server) startSegmentFlushChannel(ctx context.Context) { segmentInfo, err := s.meta.GetSegment(realMsg.SegmentID) if err != nil { - log.Println(err.Error()) + log.Error("get segment error", zap.Error(err)) continue } segmentInfo.FlushedTime = realMsg.BeginTimestamp segmentInfo.State = commonpb.SegmentState_SegmentFlushed if err = s.meta.UpdateSegment(segmentInfo); err != nil { - log.Println(err.Error()) + log.Error("update segment error", zap.Error(err)) continue } } @@ -383,14 +385,14 @@ func (s *Server) startDDChannel(ctx context.Context) { for { select { case <-ctx.Done(): - log.Println("dd channel shut down") + log.Debug("dd channel shut down") return default: } msgPack := ddStream.Consume() for _, msg := range msgPack.Msgs { if err := s.ddHandler.HandleDDMsg(msg); err != nil { - log.Println(err.Error()) + log.Error("handle dd msg error", zap.Error(err)) continue } } @@ -398,9 +400,9 @@ func (s *Server) startDDChannel(ctx context.Context) { } func (s *Server) waitDataNodeRegister() { - log.Println("waiting data node to register") + log.Debug("waiting data node to register") <-s.registerFinishCh - log.Println("all data nodes register") + log.Debug("all data nodes register") } func (s *Server) Stop() error { @@ -549,8 +551,6 @@ func (s *Server) AssignSegmentID(req *datapb.AssignSegIDRequest) (*datapb.Assign continue } - log.Printf("no enough space for allocation of Collection %d, Partition %d, Channel %s, Count %d", - r.CollectionID, r.PartitionID, r.ChannelName, r.Count) if err = s.openNewSegment(r.CollectionID, r.PartitionID, r.ChannelName); err != nil { result.Status.Reason = fmt.Sprintf("open new segment of Collection %d, Partition %d, Channel %s, Count %d error: %s", r.CollectionID, r.PartitionID, r.ChannelName, r.Count, err.Error()) diff --git a/internal/log/LICENSE.pingcap b/internal/log/LICENSE.pingcap new file mode 100644 index 0000000000..261eeb9e9f --- /dev/null +++ b/internal/log/LICENSE.pingcap @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/internal/log/config.go b/internal/log/config.go new file mode 100644 index 0000000000..45ab1d3e3d --- /dev/null +++ b/internal/log/config.go @@ -0,0 +1,106 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package log + +import ( + "time" + + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +const ( + defaultLogMaxSize = 300 // MB +) + +// FileLogConfig serializes file log related config in toml/json. +type FileLogConfig struct { + // Log filename, leave empty to disable file log. + Filename string `toml:"filename" json:"filename"` + // Max size for a single file, in MB. + MaxSize int `toml:"max-size" json:"max-size"` + // Max log keep days, default is never deleting. + MaxDays int `toml:"max-days" json:"max-days"` + // Maximum number of old log files to retain. + MaxBackups int `toml:"max-backups" json:"max-backups"` +} + +// Config serializes log related config in toml/json. +type Config struct { + // Log level. + Level string `toml:"level" json:"level"` + // Log format. one of json, text, or console. + Format string `toml:"format" json:"format"` + // Disable automatic timestamps in output. + DisableTimestamp bool `toml:"disable-timestamp" json:"disable-timestamp"` + // File log config. + File FileLogConfig `toml:"file" json:"file"` + // Development puts the logger in development mode, which changes the + // behavior of DPanicLevel and takes stacktraces more liberally. + Development bool `toml:"development" json:"development"` + // DisableCaller stops annotating logs with the calling function's file + // name and line number. By default, all logs are annotated. + DisableCaller bool `toml:"disable-caller" json:"disable-caller"` + // DisableStacktrace completely disables automatic stacktrace capturing. By + // default, stacktraces are captured for WarnLevel and above logs in + // development and ErrorLevel and above in production. + DisableStacktrace bool `toml:"disable-stacktrace" json:"disable-stacktrace"` + // DisableErrorVerbose stops annotating logs with the full verbose error + // message. + DisableErrorVerbose bool `toml:"disable-error-verbose" json:"disable-error-verbose"` + // SamplingConfig sets a sampling strategy for the logger. Sampling caps the + // global CPU and I/O load that logging puts on your process while attempting + // to preserve a representative subset of your logs. + // + // Values configured here are per-second. See zapcore.NewSampler for details. + Sampling *zap.SamplingConfig `toml:"sampling" json:"sampling"` +} + +// ZapProperties records some information about zap. +type ZapProperties struct { + Core zapcore.Core + Syncer zapcore.WriteSyncer + Level zap.AtomicLevel +} + +func newZapTextEncoder(cfg *Config) zapcore.Encoder { + return NewTextEncoder(cfg) +} + +func (cfg *Config) buildOptions(errSink zapcore.WriteSyncer) []zap.Option { + opts := []zap.Option{zap.ErrorOutput(errSink)} + + if cfg.Development { + opts = append(opts, zap.Development()) + } + + if !cfg.DisableCaller { + opts = append(opts, zap.AddCaller()) + } + + stackLevel := zap.ErrorLevel + if cfg.Development { + stackLevel = zap.WarnLevel + } + if !cfg.DisableStacktrace { + opts = append(opts, zap.AddStacktrace(stackLevel)) + } + + if cfg.Sampling != nil { + opts = append(opts, zap.WrapCore(func(core zapcore.Core) zapcore.Core { + return zapcore.NewSampler(core, time.Second, cfg.Sampling.Initial, cfg.Sampling.Thereafter) + })) + } + return opts +} diff --git a/internal/log/global.go b/internal/log/global.go new file mode 100644 index 0000000000..a33b478e6d --- /dev/null +++ b/internal/log/global.go @@ -0,0 +1,76 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package log + +import ( + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +// Debug logs a message at DebugLevel. The message includes any fields passed +// at the log site, as well as any fields accumulated on the logger. +func Debug(msg string, fields ...zap.Field) { + L().WithOptions(zap.AddCallerSkip(1)).Debug(msg, fields...) +} + +// Info logs a message at InfoLevel. The message includes any fields passed +// at the log site, as well as any fields accumulated on the logger. +func Info(msg string, fields ...zap.Field) { + L().WithOptions(zap.AddCallerSkip(1)).Info(msg, fields...) +} + +// Warn logs a message at WarnLevel. The message includes any fields passed +// at the log site, as well as any fields accumulated on the logger. +func Warn(msg string, fields ...zap.Field) { + L().WithOptions(zap.AddCallerSkip(1)).Warn(msg, fields...) +} + +// Error logs a message at ErrorLevel. The message includes any fields passed +// at the log site, as well as any fields accumulated on the logger. +func Error(msg string, fields ...zap.Field) { + L().WithOptions(zap.AddCallerSkip(1)).Error(msg, fields...) +} + +// Panic logs a message at PanicLevel. The message includes any fields passed +// at the log site, as well as any fields accumulated on the logger. +// +// The logger then panics, even if logging at PanicLevel is disabled. +func Panic(msg string, fields ...zap.Field) { + L().WithOptions(zap.AddCallerSkip(1)).Panic(msg, fields...) +} + +// Fatal logs a message at FatalLevel. The message includes any fields passed +// at the log site, as well as any fields accumulated on the logger. +// +// The logger then calls os.Exit(1), even if logging at FatalLevel is +// disabled. +func Fatal(msg string, fields ...zap.Field) { + L().WithOptions(zap.AddCallerSkip(1)).Fatal(msg, fields...) +} + +// With creates a child logger and adds structured context to it. +// Fields added to the child don't affect the parent, and vice versa. +func With(fields ...zap.Field) *zap.Logger { + return L().WithOptions(zap.AddCallerSkip(1)).With(fields...) +} + +// SetLevel alters the logging level. +func SetLevel(l zapcore.Level) { + _globalP.Load().(*ZapProperties).Level.SetLevel(l) +} + +// GetLevel gets the logging level. +func GetLevel() zapcore.Level { + return _globalP.Load().(*ZapProperties).Level.Level() +} diff --git a/internal/log/log.go b/internal/log/log.go new file mode 100644 index 0000000000..8b04be8f0a --- /dev/null +++ b/internal/log/log.go @@ -0,0 +1,128 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package log + +import ( + "errors" + "os" + "sync/atomic" + + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + lumberjack "gopkg.in/natefinch/lumberjack.v2" +) + +var _globalL, _globalP, _globalS atomic.Value + +func init() { + l, p := newStdLogger() + _globalL.Store(l) + _globalP.Store(p) + + s := _globalL.Load().(*zap.Logger).Sugar() + _globalS.Store(s) +} + +// InitLogger initializes a zap logger. +func InitLogger(cfg *Config, opts ...zap.Option) (*zap.Logger, *ZapProperties, error) { + var output zapcore.WriteSyncer + if len(cfg.File.Filename) > 0 { + lg, err := initFileLog(&cfg.File) + if err != nil { + return nil, nil, err + } + output = zapcore.AddSync(lg) + } else { + stdOut, _, err := zap.Open([]string{"stdout"}...) + if err != nil { + return nil, nil, err + } + output = stdOut + } + return InitLoggerWithWriteSyncer(cfg, output, opts...) +} + +// InitLoggerWithWriteSyncer initializes a zap logger with specified write syncer. +func InitLoggerWithWriteSyncer(cfg *Config, output zapcore.WriteSyncer, opts ...zap.Option) (*zap.Logger, *ZapProperties, error) { + level := zap.NewAtomicLevel() + err := level.UnmarshalText([]byte(cfg.Level)) + if err != nil { + return nil, nil, err + } + core := NewTextCore(newZapTextEncoder(cfg), output, level) + opts = append(cfg.buildOptions(output), opts...) + lg := zap.New(core, opts...) + r := &ZapProperties{ + Core: core, + Syncer: output, + Level: level, + } + return lg, r, nil +} + +// initFileLog initializes file based logging options. +func initFileLog(cfg *FileLogConfig) (*lumberjack.Logger, error) { + if st, err := os.Stat(cfg.Filename); err == nil { + if st.IsDir() { + return nil, errors.New("can't use directory as log file name") + } + } + if cfg.MaxSize == 0 { + cfg.MaxSize = defaultLogMaxSize + } + + // use lumberjack to logrotate + return &lumberjack.Logger{ + Filename: cfg.Filename, + MaxSize: cfg.MaxSize, + MaxBackups: cfg.MaxBackups, + MaxAge: cfg.MaxDays, + LocalTime: true, + }, nil +} + +func newStdLogger() (*zap.Logger, *ZapProperties) { + conf := &Config{Level: "info", File: FileLogConfig{}} + lg, r, _ := InitLogger(conf) + return lg, r +} + +// L returns the global Logger, which can be reconfigured with ReplaceGlobals. +// It's safe for concurrent use. +func L() *zap.Logger { + return _globalL.Load().(*zap.Logger) +} + +// S returns the global SugaredLogger, which can be reconfigured with +// ReplaceGlobals. It's safe for concurrent use. +func S() *zap.SugaredLogger { + return _globalS.Load().(*zap.SugaredLogger) +} + +// ReplaceGlobals replaces the global Logger and SugaredLogger. +// It's safe for concurrent use. +func ReplaceGlobals(logger *zap.Logger, props *ZapProperties) { + _globalL.Store(logger) + _globalS.Store(logger.Sugar()) + _globalP.Store(props) +} + +// Sync flushes any buffered log entries. +func Sync() error { + err := L().Sync() + if err != nil { + return err + } + return S().Sync() +} diff --git a/internal/log/log_test.go b/internal/log/log_test.go new file mode 100644 index 0000000000..2ef8523cc1 --- /dev/null +++ b/internal/log/log_test.go @@ -0,0 +1,22 @@ +package log + +import ( + "testing" + + "github.com/stretchr/testify/assert" + lumberjack "gopkg.in/natefinch/lumberjack.v2" +) + +func TestLumberjack(t *testing.T) { + l := &lumberjack.Logger{ + Filename: "/var/log/milvus/xxoo", + MaxSize: 100, + MaxAge: 1, + MaxBackups: 2, + LocalTime: true, + } + _, err := l.Write([]byte{1, 1}) + assert.Nil(t, err) + err = l.Close() + assert.Nil(t, err) +} diff --git a/internal/log/logutil.go b/internal/log/logutil.go new file mode 100644 index 0000000000..82c9a7e650 --- /dev/null +++ b/internal/log/logutil.go @@ -0,0 +1,167 @@ +package log + +import ( + "sync" + "sync/atomic" + + "github.com/apache/pulsar-client-go/pulsar/log" + + "go.uber.org/zap/zapcore" + + etcd "go.etcd.io/etcd/clientv3" + "google.golang.org/grpc/grpclog" + + "go.uber.org/zap" +) + +type zapWrapper struct { + logger *zap.Logger +} + +func (w *zapWrapper) Info(args ...interface{}) { + w.logger.WithOptions(zap.AddCallerSkip(1)).Sugar().Info(args...) +} + +func (w *zapWrapper) Infoln(args ...interface{}) { + w.logger.WithOptions(zap.AddCallerSkip(1)).Sugar().Info(args...) +} + +func (w zapWrapper) Infof(format string, args ...interface{}) { + w.logger.WithOptions(zap.AddCallerSkip(1)).Sugar().Infof(format, args...) +} + +func (w zapWrapper) Warning(args ...interface{}) { + w.logger.WithOptions(zap.AddCallerSkip(1)).Sugar().Warn(args...) +} + +func (w zapWrapper) Warningln(args ...interface{}) { + w.logger.WithOptions(zap.AddCallerSkip(1)).Sugar().Warn(args...) +} + +func (w *zapWrapper) Warningf(format string, args ...interface{}) { + w.logger.WithOptions(zap.AddCallerSkip(1)).Sugar().Warnf(format, args...) +} + +func (w zapWrapper) Error(args ...interface{}) { + w.logger.WithOptions(zap.AddCallerSkip(1)).Sugar().Error(args...) +} + +func (w *zapWrapper) Errorln(args ...interface{}) { + w.logger.WithOptions(zap.AddCallerSkip(1)).Sugar().Error(args...) +} + +func (w zapWrapper) Errorf(format string, args ...interface{}) { + w.logger.WithOptions(zap.AddCallerSkip(1)).Sugar().Errorf(format, args...) +} + +func (w *zapWrapper) Fatal(args ...interface{}) { + w.logger.WithOptions(zap.AddCallerSkip(1)).Sugar().Fatal(args...) +} + +func (w zapWrapper) Fatalln(args ...interface{}) { + w.logger.WithOptions(zap.AddCallerSkip(1)).Sugar().Fatal(args...) +} + +func (w *zapWrapper) Fatalf(format string, args ...interface{}) { + w.logger.WithOptions(zap.AddCallerSkip(1)).Sugar().Fatalf(format, args...) +} + +// V reports whether verbosity level l is at least the requested verbose level. +// grpc LoggerV2 +// 0=info, 1=warning, 2=error, 3=fatal +// zap +// -1=debug, 0=info, 1=warning, 2=error, 3=dpanic, 4=panic, 5=fatal +func (w *zapWrapper) V(l int) bool { + zapLevel := l + if l == 3 { + zapLevel = 5 + } + return w.logger.Core().Enabled(zapcore.Level(zapLevel)) +} + +func (w *zapWrapper) SubLogger(fields log.Fields) log.Logger { + return w.WithFields(fields).(log.Logger) +} + +func (w *zapWrapper) WithFields(fields log.Fields) log.Entry { + if len(fields) == 0 { + return w + } + kv := make([]interface{}, 0, 2*len(fields)) + for k, v := range fields { + kv = append(kv, k, v) + } + return &zapWrapper{ + logger: w.logger.Sugar().With(kv...).Desugar(), + } +} + +func (w *zapWrapper) WithField(name string, value interface{}) log.Entry { + return &zapWrapper{ + logger: w.logger.Sugar().With(name, value).Desugar(), + } +} + +func (w *zapWrapper) WithError(err error) log.Entry { + return &zapWrapper{ + logger: w.logger.Sugar().With("error", err).Desugar(), + } +} + +func (w *zapWrapper) Debug(args ...interface{}) { + w.logger.WithOptions(zap.AddCallerSkip(1)).Sugar().Debug(args...) +} + +func (w *zapWrapper) Warn(args ...interface{}) { + w.logger.WithOptions(zap.AddCallerSkip(1)).Sugar().Warn(args...) +} + +func (w *zapWrapper) Debugf(format string, args ...interface{}) { + w.logger.WithOptions(zap.AddCallerSkip(1)).Sugar().Debugf(format, args...) +} + +func (w *zapWrapper) Warnf(format string, args ...interface{}) { + w.logger.WithOptions(zap.AddCallerSkip(1)).Sugar().Warnf(format, args...) +} + +var once sync.Once +var _globalZapWrapper atomic.Value + +const defaultLogLevel = "info" + +func init() { + conf := &Config{Level: defaultLogLevel, File: FileLogConfig{}} + lg, _, _ := InitLogger(conf) + _globalZapWrapper.Store(&zapWrapper{ + logger: lg, + }) +} + +func SetupLogger(cfg *Config) { + once.Do(func() { + // initialize logger + logger, p, err := InitLogger(cfg, zap.AddStacktrace(zap.ErrorLevel)) + if err == nil { + ReplaceGlobals(logger, p) + } else { + Fatal("initialize logger error", zap.Error(err)) + } + + // initialize grpc and etcd logger + c := *cfg + c.Level = defaultLogLevel + lg, _, err := InitLogger(&c) + if err != nil { + Fatal("initialize grpc/etcd logger error", zap.Error(err)) + } + + wrapper := &zapWrapper{lg} + grpclog.SetLoggerV2(wrapper) + etcd.SetLogger(wrapper) + _globalZapWrapper.Store(wrapper) + }) +} + +func GetZapWrapper() *zapWrapper { + return _globalZapWrapper.Load().(*zapWrapper) +} diff --git a/internal/log/zap_text_core.go b/internal/log/zap_text_core.go new file mode 100644 index 0000000000..301df8b88a --- /dev/null +++ b/internal/log/zap_text_core.go @@ -0,0 +1,90 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package log + +import ( + "fmt" + + "go.uber.org/zap/zapcore" +) + +// NewTextCore creates a Core that writes logs to a WriteSyncer. +func NewTextCore(enc zapcore.Encoder, ws zapcore.WriteSyncer, enab zapcore.LevelEnabler) zapcore.Core { + return &textIOCore{ + LevelEnabler: enab, + enc: enc, + out: ws, + } +} + +// textIOCore is a copy of zapcore.ioCore that only accept *textEncoder +// it can be removed after https://github.com/uber-go/zap/pull/685 be merged +type textIOCore struct { + zapcore.LevelEnabler + enc zapcore.Encoder + out zapcore.WriteSyncer +} + +func (c *textIOCore) With(fields []zapcore.Field) zapcore.Core { + clone := c.clone() + // it's different to ioCore, here call textEncoder#addFields to fix https://github.com/pingcap/log/issues/3 + switch e := clone.enc.(type) { + case *textEncoder: + e.addFields(fields) + case zapcore.ObjectEncoder: + for _, field := range fields { + field.AddTo(e) + } + default: + panic(fmt.Sprintf("unsupported encode type: %T for With operation", clone.enc)) + } + return clone +} + +func (c *textIOCore) Check(ent zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry { + if c.Enabled(ent.Level) { + return ce.AddCore(ent, c) + } + return ce +} + +func (c *textIOCore) Write(ent zapcore.Entry, fields []zapcore.Field) error { + buf, err := c.enc.EncodeEntry(ent, fields) + if err != nil { + return err + } + _, err = c.out.Write(buf.Bytes()) + buf.Free() + if err != nil { + return err + } + if ent.Level > zapcore.ErrorLevel { + // Since we may be crashing the program, sync the output. Ignore Sync + // errors, pending a clean solution to issue https://github.com/uber-go/zap/issues/370. + c.Sync() + } + return nil +} + +func (c *textIOCore) Sync() error { + return c.out.Sync() +} + +func (c *textIOCore) clone() *textIOCore { + return &textIOCore{ + LevelEnabler: c.LevelEnabler, + enc: c.enc.Clone(), + out: c.out, + } +} diff --git a/internal/log/zap_text_encoder.go b/internal/log/zap_text_encoder.go new file mode 100644 index 0000000000..7b03b9220d --- /dev/null +++ b/internal/log/zap_text_encoder.go @@ -0,0 +1,663 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +// Copyright (c) 2016 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package log + +import ( + "encoding/base64" + "encoding/json" + "fmt" + "math" + "strings" + "sync" + "time" + "unicode/utf8" + + "go.uber.org/zap/buffer" + "go.uber.org/zap/zapcore" +) + +// DefaultTimeEncoder serializes time.Time to a human-readable formatted string +func DefaultTimeEncoder(t time.Time, enc zapcore.PrimitiveArrayEncoder) { + s := t.Format("2006/01/02 15:04:05.000 -07:00") + if e, ok := enc.(*textEncoder); ok { + for _, c := range []byte(s) { + e.buf.AppendByte(c) + } + return + } + enc.AppendString(s) +} + +// ShortCallerEncoder serializes a caller in file:line format. +func ShortCallerEncoder(caller zapcore.EntryCaller, enc zapcore.PrimitiveArrayEncoder) { + enc.AppendString(getCallerString(caller)) +} + +func getCallerString(ec zapcore.EntryCaller) string { + if !ec.Defined { + return "" + } + + idx := strings.LastIndexByte(ec.File, '/') + buf := _pool.Get() + for i := idx + 1; i < len(ec.File); i++ { + b := ec.File[i] + switch { + case b >= 'A' && b <= 'Z': + buf.AppendByte(b) + case b >= 'a' && b <= 'z': + buf.AppendByte(b) + case b >= '0' && b <= '9': + buf.AppendByte(b) + case b == '.' || b == '-' || b == '_': + buf.AppendByte(b) + default: + } + } + buf.AppendByte(':') + buf.AppendInt(int64(ec.Line)) + caller := buf.String() + buf.Free() + return caller +} + +// For JSON-escaping; see textEncoder.safeAddString below. +const _hex = "0123456789abcdef" + +var _textPool = sync.Pool{New: func() interface{} { + return &textEncoder{} +}} + +var ( + _pool = buffer.NewPool() + // Get retrieves a buffer from the pool, creating one if necessary. + Get = _pool.Get +) + +func getTextEncoder() *textEncoder { + return _textPool.Get().(*textEncoder) +} + +func putTextEncoder(enc *textEncoder) { + if enc.reflectBuf != nil { + enc.reflectBuf.Free() + } + enc.EncoderConfig = nil + enc.buf = nil + enc.spaced = false + enc.openNamespaces = 0 + enc.reflectBuf = nil + enc.reflectEnc = nil + _textPool.Put(enc) +} + +type textEncoder struct { + *zapcore.EncoderConfig + buf *buffer.Buffer + spaced bool // include spaces after colons and commas + openNamespaces int + disableErrorVerbose bool + + // for encoding generic values by reflection + reflectBuf *buffer.Buffer + reflectEnc *json.Encoder +} + +// NewTextEncoder creates a fast, low-allocation Text encoder. The encoder +// appropriately escapes all field keys and values. +func NewTextEncoder(cfg *Config) zapcore.Encoder { + cc := zapcore.EncoderConfig{ + // Keys can be anything except the empty string. + TimeKey: "time", + LevelKey: "level", + NameKey: "name", + CallerKey: "caller", + MessageKey: "message", + StacktraceKey: "stack", + LineEnding: zapcore.DefaultLineEnding, + EncodeLevel: zapcore.CapitalLevelEncoder, + EncodeTime: DefaultTimeEncoder, + EncodeDuration: zapcore.StringDurationEncoder, + EncodeCaller: ShortCallerEncoder, + } + if cfg.DisableTimestamp { + cc.TimeKey = "" + } + switch cfg.Format { + case "text", "": + return &textEncoder{ + EncoderConfig: &cc, + buf: _pool.Get(), + spaced: false, + disableErrorVerbose: cfg.DisableErrorVerbose, + } + case "json": + return zapcore.NewJSONEncoder(cc) + default: + panic(fmt.Sprintf("unsupport log format: %s", cfg.Format)) + } +} + +func (enc *textEncoder) AddArray(key string, arr zapcore.ArrayMarshaler) error { + enc.addKey(key) + return enc.AppendArray(arr) +} + +func (enc *textEncoder) AddObject(key string, obj zapcore.ObjectMarshaler) error { + enc.addKey(key) + return enc.AppendObject(obj) +} + +func (enc *textEncoder) AddBinary(key string, val []byte) { + enc.AddString(key, base64.StdEncoding.EncodeToString(val)) +} + +func (enc *textEncoder) AddByteString(key string, val []byte) { + enc.addKey(key) + enc.AppendByteString(val) +} + +func (enc *textEncoder) AddBool(key string, val bool) { + enc.addKey(key) + enc.AppendBool(val) +} + +func (enc *textEncoder) AddComplex128(key string, val complex128) { + enc.addKey(key) + enc.AppendComplex128(val) +} + +func (enc *textEncoder) AddDuration(key string, val time.Duration) { + enc.addKey(key) + enc.AppendDuration(val) +} + +func (enc *textEncoder) AddFloat64(key string, val float64) { + enc.addKey(key) + enc.AppendFloat64(val) +} + +func (enc *textEncoder) AddInt64(key string, val int64) { + enc.addKey(key) + enc.AppendInt64(val) +} + +func (enc *textEncoder) resetReflectBuf() { + if enc.reflectBuf == nil { + enc.reflectBuf = _pool.Get() + enc.reflectEnc = json.NewEncoder(enc.reflectBuf) + } else { + enc.reflectBuf.Reset() + } +} + +func (enc *textEncoder) AddReflected(key string, obj interface{}) error { + enc.resetReflectBuf() + err := enc.reflectEnc.Encode(obj) + if err != nil { + return err + } + enc.reflectBuf.TrimNewline() + enc.addKey(key) + enc.AppendByteString(enc.reflectBuf.Bytes()) + return nil +} + +func (enc *textEncoder) OpenNamespace(key string) { + enc.addKey(key) + enc.buf.AppendByte('{') + enc.openNamespaces++ +} + +func (enc *textEncoder) AddString(key, val string) { + enc.addKey(key) + enc.AppendString(val) +} + +func (enc *textEncoder) AddTime(key string, val time.Time) { + enc.addKey(key) + enc.AppendTime(val) +} + +func (enc *textEncoder) AddUint64(key string, val uint64) { + enc.addKey(key) + enc.AppendUint64(val) +} + +func (enc *textEncoder) AppendArray(arr zapcore.ArrayMarshaler) error { + enc.addElementSeparator() + ne := enc.cloned() + ne.buf.AppendByte('[') + err := arr.MarshalLogArray(ne) + ne.buf.AppendByte(']') + enc.AppendByteString(ne.buf.Bytes()) + ne.buf.Free() + putTextEncoder(ne) + return err +} + +func (enc *textEncoder) AppendObject(obj zapcore.ObjectMarshaler) error { + enc.addElementSeparator() + ne := enc.cloned() + ne.buf.AppendByte('{') + err := obj.MarshalLogObject(ne) + ne.buf.AppendByte('}') + enc.AppendByteString(ne.buf.Bytes()) + ne.buf.Free() + putTextEncoder(ne) + return err +} + +func (enc *textEncoder) AppendBool(val bool) { + enc.addElementSeparator() + enc.buf.AppendBool(val) +} + +func (enc *textEncoder) AppendByteString(val []byte) { + enc.addElementSeparator() + if !enc.needDoubleQuotes(string(val)) { + enc.safeAddByteString(val) + return + } + enc.buf.AppendByte('"') + enc.safeAddByteString(val) + enc.buf.AppendByte('"') +} + +func (enc *textEncoder) AppendComplex128(val complex128) { + enc.addElementSeparator() + // Cast to a platform-independent, fixed-size type. + r, i := real(val), imag(val) + enc.buf.AppendFloat(r, 64) + enc.buf.AppendByte('+') + enc.buf.AppendFloat(i, 64) + enc.buf.AppendByte('i') +} + +func (enc *textEncoder) AppendDuration(val time.Duration) { + cur := enc.buf.Len() + enc.EncodeDuration(val, enc) + if cur == enc.buf.Len() { + // User-supplied EncodeDuration is a no-op. Fall back to nanoseconds to keep + // JSON valid. + enc.AppendInt64(int64(val)) + } +} + +func (enc *textEncoder) AppendInt64(val int64) { + enc.addElementSeparator() + enc.buf.AppendInt(val) +} + +func (enc *textEncoder) AppendReflected(val interface{}) error { + enc.resetReflectBuf() + err := enc.reflectEnc.Encode(val) + if err != nil { + return err + } + enc.reflectBuf.TrimNewline() + enc.AppendByteString(enc.reflectBuf.Bytes()) + return nil +} + +func (enc *textEncoder) AppendString(val string) { + enc.addElementSeparator() + enc.safeAddStringWithQuote(val) +} + +func (enc *textEncoder) AppendTime(val time.Time) { + cur := enc.buf.Len() + enc.EncodeTime(val, enc) + if cur == enc.buf.Len() { + // User-supplied EncodeTime is a no-op. Fall back to nanos since epoch to keep + // output JSON valid. + enc.AppendInt64(val.UnixNano()) + } +} + +func (enc *textEncoder) beginQuoteFiled() { + if enc.buf.Len() > 0 { + enc.buf.AppendByte(' ') + } + enc.buf.AppendByte('[') +} + +func (enc *textEncoder) endQuoteFiled() { + enc.buf.AppendByte(']') +} + +func (enc *textEncoder) AppendUint64(val uint64) { + enc.addElementSeparator() + enc.buf.AppendUint(val) +} + +func (enc *textEncoder) AddComplex64(k string, v complex64) { enc.AddComplex128(k, complex128(v)) } +func (enc *textEncoder) AddFloat32(k string, v float32) { enc.AddFloat64(k, float64(v)) } +func (enc *textEncoder) AddInt(k string, v int) { enc.AddInt64(k, int64(v)) } +func (enc *textEncoder) AddInt32(k string, v int32) { enc.AddInt64(k, int64(v)) } +func (enc *textEncoder) AddInt16(k string, v int16) { enc.AddInt64(k, int64(v)) } +func (enc *textEncoder) AddInt8(k string, v int8) { enc.AddInt64(k, int64(v)) } +func (enc *textEncoder) AddUint(k string, v uint) { enc.AddUint64(k, uint64(v)) } +func (enc *textEncoder) AddUint32(k string, v uint32) { enc.AddUint64(k, uint64(v)) } +func (enc *textEncoder) AddUint16(k string, v uint16) { enc.AddUint64(k, uint64(v)) } +func (enc *textEncoder) AddUint8(k string, v uint8) { enc.AddUint64(k, uint64(v)) } +func (enc *textEncoder) AddUintptr(k string, v uintptr) { enc.AddUint64(k, uint64(v)) } +func (enc *textEncoder) AppendComplex64(v complex64) { enc.AppendComplex128(complex128(v)) } +func (enc *textEncoder) AppendFloat64(v float64) { enc.appendFloat(v, 64) } +func (enc *textEncoder) AppendFloat32(v float32) { enc.appendFloat(float64(v), 32) } +func (enc *textEncoder) AppendInt(v int) { enc.AppendInt64(int64(v)) } +func (enc *textEncoder) AppendInt32(v int32) { enc.AppendInt64(int64(v)) } +func (enc *textEncoder) AppendInt16(v int16) { enc.AppendInt64(int64(v)) } +func (enc *textEncoder) AppendInt8(v int8) { enc.AppendInt64(int64(v)) } +func (enc *textEncoder) AppendUint(v uint) { enc.AppendUint64(uint64(v)) } +func (enc *textEncoder) AppendUint32(v uint32) { enc.AppendUint64(uint64(v)) } +func (enc *textEncoder) AppendUint16(v uint16) { enc.AppendUint64(uint64(v)) } +func (enc *textEncoder) AppendUint8(v uint8) { enc.AppendUint64(uint64(v)) } +func (enc *textEncoder) AppendUintptr(v uintptr) { enc.AppendUint64(uint64(v)) } + +func (enc *textEncoder) Clone() zapcore.Encoder { + clone := enc.cloned() + clone.buf.Write(enc.buf.Bytes()) + return clone +} + +func (enc *textEncoder) cloned() *textEncoder { + clone := getTextEncoder() + clone.EncoderConfig = enc.EncoderConfig + clone.spaced = enc.spaced + clone.openNamespaces = enc.openNamespaces + clone.disableErrorVerbose = enc.disableErrorVerbose + clone.buf = _pool.Get() + return clone +} + +func (enc *textEncoder) EncodeEntry(ent zapcore.Entry, fields []zapcore.Field) (*buffer.Buffer, error) { + final := enc.cloned() + if final.TimeKey != "" { + final.beginQuoteFiled() + final.AppendTime(ent.Time) + final.endQuoteFiled() + } + + if final.LevelKey != "" { + final.beginQuoteFiled() + cur := final.buf.Len() + final.EncodeLevel(ent.Level, final) + if cur == final.buf.Len() { + // User-supplied EncodeLevel was a no-op. Fall back to strings to keep + // output JSON valid. + final.AppendString(ent.Level.String()) + } + final.endQuoteFiled() + } + + if ent.LoggerName != "" && final.NameKey != "" { + final.beginQuoteFiled() + cur := final.buf.Len() + nameEncoder := final.EncodeName + + // if no name encoder provided, fall back to FullNameEncoder for backwards + // compatibility + if nameEncoder == nil { + nameEncoder = zapcore.FullNameEncoder + } + + nameEncoder(ent.LoggerName, final) + if cur == final.buf.Len() { + // User-supplied EncodeName was a no-op. Fall back to strings to + // keep output JSON valid. + final.AppendString(ent.LoggerName) + } + final.endQuoteFiled() + } + if ent.Caller.Defined && final.CallerKey != "" { + final.beginQuoteFiled() + cur := final.buf.Len() + final.EncodeCaller(ent.Caller, final) + if cur == final.buf.Len() { + // User-supplied EncodeCaller was a no-op. Fall back to strings to + // keep output JSON valid. + final.AppendString(ent.Caller.String()) + } + final.endQuoteFiled() + } + // add Message + if len(ent.Message) > 0 { + final.beginQuoteFiled() + final.AppendString(ent.Message) + final.endQuoteFiled() + } + if enc.buf.Len() > 0 { + final.buf.AppendByte(' ') + final.buf.Write(enc.buf.Bytes()) + } + final.addFields(fields) + final.closeOpenNamespaces() + if ent.Stack != "" && final.StacktraceKey != "" { + final.beginQuoteFiled() + final.AddString(final.StacktraceKey, ent.Stack) + final.endQuoteFiled() + } + + if final.LineEnding != "" { + final.buf.AppendString(final.LineEnding) + } else { + final.buf.AppendString(zapcore.DefaultLineEnding) + } + + ret := final.buf + putTextEncoder(final) + return ret, nil +} + +func (enc *textEncoder) truncate() { + enc.buf.Reset() +} + +func (enc *textEncoder) closeOpenNamespaces() { + for i := 0; i < enc.openNamespaces; i++ { + enc.buf.AppendByte('}') + } +} + +func (enc *textEncoder) addKey(key string) { + enc.addElementSeparator() + enc.safeAddStringWithQuote(key) + enc.buf.AppendByte('=') +} + +func (enc *textEncoder) addElementSeparator() { + last := enc.buf.Len() - 1 + if last < 0 { + return + } + switch enc.buf.Bytes()[last] { + case '{', '[', ':', ',', ' ', '=': + return + default: + enc.buf.AppendByte(',') + } +} + +func (enc *textEncoder) appendFloat(val float64, bitSize int) { + enc.addElementSeparator() + switch { + case math.IsNaN(val): + enc.buf.AppendString("NaN") + case math.IsInf(val, 1): + enc.buf.AppendString("+Inf") + case math.IsInf(val, -1): + enc.buf.AppendString("-Inf") + default: + enc.buf.AppendFloat(val, bitSize) + } +} + +// safeAddString JSON-escapes a string and appends it to the internal buffer. +// Unlike the standard library's encoder, it doesn't attempt to protect the +// user from browser vulnerabilities or JSONP-related problems. +func (enc *textEncoder) safeAddString(s string) { + for i := 0; i < len(s); { + if enc.tryAddRuneSelf(s[i]) { + i++ + continue + } + r, size := utf8.DecodeRuneInString(s[i:]) + if enc.tryAddRuneError(r, size) { + i++ + continue + } + enc.buf.AppendString(s[i : i+size]) + i += size + } +} + +// safeAddStringWithQuote will automatically add quotoes. +func (enc *textEncoder) safeAddStringWithQuote(s string) { + if !enc.needDoubleQuotes(s) { + enc.safeAddString(s) + return + } + enc.buf.AppendByte('"') + enc.safeAddString(s) + enc.buf.AppendByte('"') +} + +// safeAddByteString is no-alloc equivalent of safeAddString(string(s)) for s []byte. +func (enc *textEncoder) safeAddByteString(s []byte) { + for i := 0; i < len(s); { + if enc.tryAddRuneSelf(s[i]) { + i++ + continue + } + r, size := utf8.DecodeRune(s[i:]) + if enc.tryAddRuneError(r, size) { + i++ + continue + } + enc.buf.Write(s[i : i+size]) + i += size + } +} + +// See [log-fileds](https://github.com/tikv/rfcs/blob/master/text/2018-12-19-unified-log-format.md#log-fields-section). +func (enc *textEncoder) needDoubleQuotes(s string) bool { + for i := 0; i < len(s); { + b := s[i] + if b <= 0x20 { + return true + } + switch b { + case '\\', '"', '[', ']', '=': + return true + } + i++ + } + return false +} + +// tryAddRuneSelf appends b if it is valid UTF-8 character represented in a single byte. +func (enc *textEncoder) tryAddRuneSelf(b byte) bool { + if b >= utf8.RuneSelf { + return false + } + if 0x20 <= b && b != '\\' && b != '"' { + enc.buf.AppendByte(b) + return true + } + switch b { + case '\\', '"': + enc.buf.AppendByte('\\') + enc.buf.AppendByte(b) + case '\n': + enc.buf.AppendByte('\\') + enc.buf.AppendByte('n') + case '\r': + enc.buf.AppendByte('\\') + enc.buf.AppendByte('r') + case '\t': + enc.buf.AppendByte('\\') + enc.buf.AppendByte('t') + + default: + // Encode bytes < 0x20, except for the escape sequences above. + enc.buf.AppendString(`\u00`) + enc.buf.AppendByte(_hex[b>>4]) + enc.buf.AppendByte(_hex[b&0xF]) + } + return true +} + +func (enc *textEncoder) tryAddRuneError(r rune, size int) bool { + if r == utf8.RuneError && size == 1 { + enc.buf.AppendString(`\ufffd`) + return true + } + return false +} + +func (enc *textEncoder) addFields(fields []zapcore.Field) { + for _, f := range fields { + if f.Type == zapcore.ErrorType { + // handle ErrorType in pingcap/log to fix "[key=?,keyVerbose=?]" problem. + // see more detail at https://github.com/pingcap/log/pull/5 + enc.encodeError(f) + continue + } + enc.beginQuoteFiled() + f.AddTo(enc) + enc.endQuoteFiled() + } +} + +func (enc *textEncoder) encodeError(f zapcore.Field) { + err := f.Interface.(error) + basic := err.Error() + enc.beginQuoteFiled() + enc.AddString(f.Key, basic) + enc.endQuoteFiled() + if enc.disableErrorVerbose { + return + } + if e, isFormatter := err.(fmt.Formatter); isFormatter { + verbose := fmt.Sprintf("%+v", e) + if verbose != basic { + // This is a rich error type, like those produced by + // github.com/pkg/errors. + enc.beginQuoteFiled() + enc.AddString(f.Key+"Verbose", verbose) + enc.endQuoteFiled() + } + } +} diff --git a/internal/msgstream/pulsarms/pulsar_msgstream.go b/internal/msgstream/pulsarms/pulsar_msgstream.go index 03d0b2a22a..ef8f951a96 100644 --- a/internal/msgstream/pulsarms/pulsar_msgstream.go +++ b/internal/msgstream/pulsarms/pulsar_msgstream.go @@ -2,7 +2,6 @@ package pulsarms import ( "context" - "log" "path/filepath" "reflect" "strconv" @@ -10,6 +9,9 @@ import ( "sync" "time" + "github.com/zilliztech/milvus-distributed/internal/log" + "go.uber.org/zap" + "github.com/apache/pulsar-client-go/pulsar" "github.com/golang/protobuf/proto" "github.com/zilliztech/milvus-distributed/internal/errors" @@ -64,10 +66,15 @@ func newPulsarMsgStream(ctx context.Context, consumerReflects := make([]reflect.SelectCase, 0) receiveBuf := make(chan *MsgPack, receiveBufSize) - client, err := pulsar.NewClient(pulsar.ClientOptions{URL: address}) + var client pulsar.Client + var err error + opts := pulsar.ClientOptions{ + URL: address, + } + client, err = pulsar.NewClient(opts) if err != nil { defer streamCancel() - log.Printf("Set pulsar client failed, error = %v", err) + log.Error("Set pulsar client failed, error", zap.Error(err)) return nil, err } @@ -203,7 +210,7 @@ func (ppRW *propertiesReaderWriter) ForeachKey(handler func(key, val string) err func (ms *PulsarMsgStream) Produce(msgPack *MsgPack) error { tsMsgs := msgPack.Msgs if len(tsMsgs) <= 0 { - log.Printf("Warning: Receive empty msgPack") + log.Debug("Warning: Receive empty msgPack") return nil } if len(ms.producers) <= 0 { @@ -303,12 +310,12 @@ func (ms *PulsarMsgStream) Consume() *MsgPack { select { case cm, ok := <-ms.receiveBuf: if !ok { - log.Println("buf chan closed") + log.Debug("buf chan closed") return nil } return cm case <-ms.ctx.Done(): - log.Printf("context closed") + log.Debug("context closed") return nil } } @@ -320,7 +327,7 @@ func (ms *PulsarMsgStream) bufMsgPackToChannel() { for { select { case <-ms.ctx.Done(): - log.Println("done") + log.Debug("done") return default: tsMsgList := make([]TsMsg, 0) @@ -387,12 +394,12 @@ func (ms *PulsarMsgStream) bufMsgPackToChannel() { headerMsg := commonpb.MsgHeader{} err := proto.Unmarshal(pulsarMsg.Payload(), &headerMsg) if err != nil { - log.Printf("Failed to unmarshal message header, error = %v", err) + log.Error("Failed to unmarshal message header", zap.Error(err)) continue } tsMsg, err := ms.unmarshal.Unmarshal(pulsarMsg.Payload(), headerMsg.Base.MsgType) if err != nil { - log.Printf("Failed to unmarshal tsMsg, error = %v", err) + log.Error("Failed to unmarshal tsMsg", zap.Error(err)) continue } @@ -590,7 +597,7 @@ func (ms *PulsarTtMsgStream) findTimeTick(consumer Consumer, return case pulsarMsg, ok := <-consumer.Chan(): if !ok { - log.Printf("consumer closed!") + log.Debug("consumer closed!") return } consumer.Ack(pulsarMsg) @@ -598,12 +605,12 @@ func (ms *PulsarTtMsgStream) findTimeTick(consumer Consumer, headerMsg := commonpb.MsgHeader{} err := proto.Unmarshal(pulsarMsg.Payload(), &headerMsg) if err != nil { - log.Printf("Failed to unmarshal message header, error = %v", err) + log.Error("Failed to unmarshal message header", zap.Error(err)) continue } tsMsg, err := ms.unmarshal.Unmarshal(pulsarMsg.Payload(), headerMsg.Base.MsgType) if err != nil { - log.Printf("Failed to unmarshal tsMsg, error = %v", err) + log.Error("Failed to unmarshal tsMsg", zap.Error(err)) continue } // set pulsar info to tsMsg @@ -662,11 +669,11 @@ func (ms *PulsarTtMsgStream) Seek(mp *internalpb2.MsgPosition) error { headerMsg := commonpb.MsgHeader{} err := proto.Unmarshal(pulsarMsg.Payload(), &headerMsg) if err != nil { - log.Printf("Failed to unmarshal message header, error = %v", err) + log.Error("Failed to unmarshal message header", zap.Error(err)) } tsMsg, err := ms.unmarshal.Unmarshal(pulsarMsg.Payload(), headerMsg.Base.MsgType) if err != nil { - log.Printf("Failed to unmarshal tsMsg, error = %v", err) + log.Error("Failed to unmarshal tsMsg", zap.Error(err)) } if tsMsg.Type() == commonpb.MsgType_kTimeTick { if tsMsg.BeginTs() >= mp.Timestamp {