From 72917467ec49dae80d744c3c3d6100efa4f2d902 Mon Sep 17 00:00:00 2001 From: Xiangyu Wang Date: Tue, 30 Mar 2021 10:52:42 +0800 Subject: [PATCH] Change singlenode logs output Signed-off-by: Xiangyu Wang --- cmd/singlenode/main.go | 15 ++++++---- internal/distributed/indexnode/service.go | 6 ++-- internal/distributed/masterservice/server.go | 2 +- internal/distributed/proxyservice/service.go | 1 - .../client/pulsar/pulsar_producer.go | 2 +- internal/msgstream/ms/msgstream_impl.go | 29 +++++++------------ internal/queryservice/queryservice.go | 7 ++--- 7 files changed, 29 insertions(+), 33 deletions(-) diff --git a/cmd/singlenode/main.go b/cmd/singlenode/main.go index e5961927ca..db97510a37 100644 --- a/cmd/singlenode/main.go +++ b/cmd/singlenode/main.go @@ -29,12 +29,15 @@ func initLogCfg() log.Config { logCfg.File.MaxSize = 300 logCfg.File.MaxBackups = 20 logCfg.File.MaxDays = 10 - ciFileDir := "/milvus-distributed/logs/" - if _, err := os.Stat(ciFileDir); err == nil { - logCfg.File.Filename = ciFileDir + "singlenode.log" - } else { - logCfg.File.Filename = "/tmp/milvus/singlenode.log" - } + + // FIXME(wxyu): Load from config files + logCfg.File.Filename = "" + //ciFileDir := "/milvus-distributed/logs/" + //if _, err := os.Stat(ciFileDir); err == nil { + // logCfg.File.Filename = ciFileDir + "singlenode.log" + //} else { + // logCfg.File.Filename = "/tmp/milvus/singlenode.log" + //} return logCfg } diff --git a/internal/distributed/indexnode/service.go b/internal/distributed/indexnode/service.go index 593e06988f..b3adb0423f 100644 --- a/internal/distributed/indexnode/service.go +++ b/internal/distributed/indexnode/service.go @@ -145,8 +145,10 @@ func (s *Server) start() error { } func (s *Server) Stop() error { - if err := s.closer.Close(); err != nil { - return err + if s.closer != nil { + if err := s.closer.Close(); err != nil { + return err + } } s.loopCancel() if s.indexnode != nil { diff --git a/internal/distributed/masterservice/server.go b/internal/distributed/masterservice/server.go index f625a0921a..b6b20e6984 100644 --- a/internal/distributed/masterservice/server.go +++ b/internal/distributed/masterservice/server.go @@ -226,7 +226,7 @@ func (s *Server) start() error { func (s *Server) Stop() error { if s.closer != nil { if err := s.closer.Close(); err != nil { - return err + log.Error("close opentracing", zap.Error(err)) } } if s.proxyService != nil { diff --git a/internal/distributed/proxyservice/service.go b/internal/distributed/proxyservice/service.go index 4ba943cec8..eda508bc04 100644 --- a/internal/distributed/proxyservice/service.go +++ b/internal/distributed/proxyservice/service.go @@ -139,7 +139,6 @@ func (s *Server) Stop() error { } } s.cancel() - s.closer.Close() err := s.proxyservice.Stop() if err != nil { return err diff --git a/internal/msgstream/client/pulsar/pulsar_producer.go b/internal/msgstream/client/pulsar/pulsar_producer.go index 845a4cfc8f..ead22f7bc9 100644 --- a/internal/msgstream/client/pulsar/pulsar_producer.go +++ b/internal/msgstream/client/pulsar/pulsar_producer.go @@ -16,7 +16,7 @@ func (pp *pulsarProducer) Topic() string { } func (pp *pulsarProducer) Send(ctx context.Context, message *client.ProducerMessage) error { - ppm := &pulsar.ProducerMessage{Payload: message.Payload} + ppm := &pulsar.ProducerMessage{Payload: message.Payload, Properties: message.Properties} _, err := pp.p.Send(ctx, ppm) return err } diff --git a/internal/msgstream/ms/msgstream_impl.go b/internal/msgstream/ms/msgstream_impl.go index 7fc30961c5..47b5405071 100644 --- a/internal/msgstream/ms/msgstream_impl.go +++ b/internal/msgstream/ms/msgstream_impl.go @@ -9,6 +9,7 @@ import ( "time" "github.com/golang/protobuf/proto" + "github.com/opentracing/opentracing-go" "github.com/zilliztech/milvus-distributed/internal/log" "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/msgstream/client" @@ -50,8 +51,6 @@ type msgStream struct { bufSize int64 producerLock *sync.Mutex consumerLock *sync.Mutex - - scMap *sync.Map } func NewMsgStream(ctx context.Context, @@ -67,13 +66,6 @@ func NewMsgStream(ctx context.Context, consumerChannels := make([]string, 0) receiveBuf := make(chan *MsgPack, receiveBufSize) - var err error - if err != nil { - defer streamCancel() - log.Error("Set client failed, error", zap.Error(err)) - return nil, err - } - stream := &msgStream{ ctx: streamCtx, client: client, @@ -88,7 +80,6 @@ func NewMsgStream(ctx context.Context, producerLock: &sync.Mutex{}, consumerLock: &sync.Mutex{}, wait: &sync.WaitGroup{}, - scMap: &sync.Map{}, } return stream, nil @@ -231,6 +222,8 @@ func (ms *msgStream) Produce(msgPack *MsgPack) error { for k, v := range result { channel := ms.producerChannels[k] for i := 0; i < len(v.Msgs); i++ { + sp, spanCtx := trace.MsgSpanFromCtx(v.Msgs[i].TraceCtx(), v.Msgs[i]) + mb, err := v.Msgs[i].Marshal(v.Msgs[i]) if err != nil { return err @@ -243,7 +236,6 @@ func (ms *msgStream) Produce(msgPack *MsgPack) error { msg := &client.ProducerMessage{Payload: m, Properties: map[string]string{}} - sp, spanCtx := trace.MsgSpanFromCtx(v.Msgs[i].TraceCtx(), v.Msgs[i]) trace.InjectContextToPulsarMsgProperties(sp.Context(), msg.Properties) if err := ms.producers[channel].Send( @@ -262,6 +254,8 @@ func (ms *msgStream) Produce(msgPack *MsgPack) error { func (ms *msgStream) Broadcast(msgPack *MsgPack) error { for _, v := range msgPack.Msgs { + sp, spanCtx := trace.MsgSpanFromCtx(v.TraceCtx(), v) + mb, err := v.Marshal(v) if err != nil { return err @@ -274,7 +268,6 @@ func (ms *msgStream) Broadcast(msgPack *MsgPack) error { msg := &client.ProducerMessage{Payload: m, Properties: map[string]string{}} - sp, spanCtx := trace.MsgSpanFromCtx(v.TraceCtx(), v) trace.InjectContextToPulsarMsgProperties(sp.Context(), msg.Properties) ms.producerLock.Lock() @@ -334,17 +327,17 @@ func (ms *msgStream) receiveMsg(consumer Consumer) { continue } + sp, ok := trace.ExtractFromPulsarMsgProperties(tsMsg, msg.Properties()) + if ok { + tsMsg.SetTraceCtx(opentracing.ContextWithSpan(context.Background(), sp)) + } + tsMsg.SetPosition(&msgstream.MsgPosition{ ChannelName: filepath.Base(msg.Topic()), //FIXME MsgID: msg.ID().Serialize(), }) - sp, ok := trace.ExtractFromPulsarMsgProperties(tsMsg, msg.Properties()) - if ok { - ms.scMap.Store(tsMsg.ID(), sp.Context()) - } - msgPack := MsgPack{Msgs: []TsMsg{tsMsg}} ms.receiveBuf <- &msgPack @@ -614,7 +607,7 @@ func (ms *TtMsgStream) findTimeTick(consumer Consumer, sp, ok := trace.ExtractFromPulsarMsgProperties(tsMsg, msg.Properties()) if ok { - ms.scMap.Store(tsMsg.ID(), sp.Context()) + tsMsg.SetTraceCtx(opentracing.ContextWithSpan(context.Background(), sp)) } ms.unsolvedMutex.Lock() diff --git a/internal/queryservice/queryservice.go b/internal/queryservice/queryservice.go index 677a102e97..2eb7c97b4c 100644 --- a/internal/queryservice/queryservice.go +++ b/internal/queryservice/queryservice.go @@ -285,7 +285,7 @@ func (qs *QueryService) LoadCollection(ctx context.Context, req *querypb.LoadCol return status, fmt.Errorf("load partitions: %s", err) } - err = qs.watchDmChannels(dbID, collectionID) + err = qs.watchDmChannels(ctx, dbID, collectionID) if err != nil { log.Error("LoadCollectionRequest failed", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Error(err)) return fn(err), err @@ -516,7 +516,7 @@ func (qs *QueryService) LoadPartitions(ctx context.Context, req *querypb.LoadPar } if watchNeeded { - err = qs.watchDmChannels(dbID, collectionID) + err = qs.watchDmChannels(ctx, dbID, collectionID) if err != nil { log.Debug("LoadPartitionRequest completed", zap.Int64("msgID", req.Base.MsgID), zap.Int64s("partitionIDs", partitionIDs), zap.Error(err)) return fn(err), err @@ -681,8 +681,7 @@ func (qs *QueryService) SetDataService(dataService types.DataService) { qs.dataServiceClient = dataService } -func (qs *QueryService) watchDmChannels(dbID UniqueID, collectionID UniqueID) error { - ctx := context.TODO() +func (qs *QueryService) watchDmChannels(ctx context.Context, dbID UniqueID, collectionID UniqueID) error { collection, err := qs.replica.getCollectionByID(0, collectionID) if err != nil { return err