Change singlenode logs output

Signed-off-by: Xiangyu Wang <xiangyu.wang@zilliz.com>
This commit is contained in:
Xiangyu Wang 2021-03-30 10:52:42 +08:00 committed by yefu.chen
parent 91f5c48037
commit 72917467ec
7 changed files with 29 additions and 33 deletions

View File

@ -29,12 +29,15 @@ func initLogCfg() log.Config {
logCfg.File.MaxSize = 300 logCfg.File.MaxSize = 300
logCfg.File.MaxBackups = 20 logCfg.File.MaxBackups = 20
logCfg.File.MaxDays = 10 logCfg.File.MaxDays = 10
ciFileDir := "/milvus-distributed/logs/"
if _, err := os.Stat(ciFileDir); err == nil { // FIXME(wxyu): Load from config files
logCfg.File.Filename = ciFileDir + "singlenode.log" logCfg.File.Filename = ""
} else { //ciFileDir := "/milvus-distributed/logs/"
logCfg.File.Filename = "/tmp/milvus/singlenode.log" //if _, err := os.Stat(ciFileDir); err == nil {
} // logCfg.File.Filename = ciFileDir + "singlenode.log"
//} else {
// logCfg.File.Filename = "/tmp/milvus/singlenode.log"
//}
return logCfg return logCfg
} }

View File

@ -145,8 +145,10 @@ func (s *Server) start() error {
} }
func (s *Server) Stop() error { func (s *Server) Stop() error {
if err := s.closer.Close(); err != nil { if s.closer != nil {
return err if err := s.closer.Close(); err != nil {
return err
}
} }
s.loopCancel() s.loopCancel()
if s.indexnode != nil { if s.indexnode != nil {

View File

@ -226,7 +226,7 @@ func (s *Server) start() error {
func (s *Server) Stop() error { func (s *Server) Stop() error {
if s.closer != nil { if s.closer != nil {
if err := s.closer.Close(); err != nil { if err := s.closer.Close(); err != nil {
return err log.Error("close opentracing", zap.Error(err))
} }
} }
if s.proxyService != nil { if s.proxyService != nil {

View File

@ -139,7 +139,6 @@ func (s *Server) Stop() error {
} }
} }
s.cancel() s.cancel()
s.closer.Close()
err := s.proxyservice.Stop() err := s.proxyservice.Stop()
if err != nil { if err != nil {
return err return err

View File

@ -16,7 +16,7 @@ func (pp *pulsarProducer) Topic() string {
} }
func (pp *pulsarProducer) Send(ctx context.Context, message *client.ProducerMessage) error { func (pp *pulsarProducer) Send(ctx context.Context, message *client.ProducerMessage) error {
ppm := &pulsar.ProducerMessage{Payload: message.Payload} ppm := &pulsar.ProducerMessage{Payload: message.Payload, Properties: message.Properties}
_, err := pp.p.Send(ctx, ppm) _, err := pp.p.Send(ctx, ppm)
return err return err
} }

View File

@ -9,6 +9,7 @@ import (
"time" "time"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
"github.com/opentracing/opentracing-go"
"github.com/zilliztech/milvus-distributed/internal/log" "github.com/zilliztech/milvus-distributed/internal/log"
"github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/msgstream/client" "github.com/zilliztech/milvus-distributed/internal/msgstream/client"
@ -50,8 +51,6 @@ type msgStream struct {
bufSize int64 bufSize int64
producerLock *sync.Mutex producerLock *sync.Mutex
consumerLock *sync.Mutex consumerLock *sync.Mutex
scMap *sync.Map
} }
func NewMsgStream(ctx context.Context, func NewMsgStream(ctx context.Context,
@ -67,13 +66,6 @@ func NewMsgStream(ctx context.Context,
consumerChannels := make([]string, 0) consumerChannels := make([]string, 0)
receiveBuf := make(chan *MsgPack, receiveBufSize) receiveBuf := make(chan *MsgPack, receiveBufSize)
var err error
if err != nil {
defer streamCancel()
log.Error("Set client failed, error", zap.Error(err))
return nil, err
}
stream := &msgStream{ stream := &msgStream{
ctx: streamCtx, ctx: streamCtx,
client: client, client: client,
@ -88,7 +80,6 @@ func NewMsgStream(ctx context.Context,
producerLock: &sync.Mutex{}, producerLock: &sync.Mutex{},
consumerLock: &sync.Mutex{}, consumerLock: &sync.Mutex{},
wait: &sync.WaitGroup{}, wait: &sync.WaitGroup{},
scMap: &sync.Map{},
} }
return stream, nil return stream, nil
@ -231,6 +222,8 @@ func (ms *msgStream) Produce(msgPack *MsgPack) error {
for k, v := range result { for k, v := range result {
channel := ms.producerChannels[k] channel := ms.producerChannels[k]
for i := 0; i < len(v.Msgs); i++ { for i := 0; i < len(v.Msgs); i++ {
sp, spanCtx := trace.MsgSpanFromCtx(v.Msgs[i].TraceCtx(), v.Msgs[i])
mb, err := v.Msgs[i].Marshal(v.Msgs[i]) mb, err := v.Msgs[i].Marshal(v.Msgs[i])
if err != nil { if err != nil {
return err return err
@ -243,7 +236,6 @@ func (ms *msgStream) Produce(msgPack *MsgPack) error {
msg := &client.ProducerMessage{Payload: m, Properties: map[string]string{}} msg := &client.ProducerMessage{Payload: m, Properties: map[string]string{}}
sp, spanCtx := trace.MsgSpanFromCtx(v.Msgs[i].TraceCtx(), v.Msgs[i])
trace.InjectContextToPulsarMsgProperties(sp.Context(), msg.Properties) trace.InjectContextToPulsarMsgProperties(sp.Context(), msg.Properties)
if err := ms.producers[channel].Send( if err := ms.producers[channel].Send(
@ -262,6 +254,8 @@ func (ms *msgStream) Produce(msgPack *MsgPack) error {
func (ms *msgStream) Broadcast(msgPack *MsgPack) error { func (ms *msgStream) Broadcast(msgPack *MsgPack) error {
for _, v := range msgPack.Msgs { for _, v := range msgPack.Msgs {
sp, spanCtx := trace.MsgSpanFromCtx(v.TraceCtx(), v)
mb, err := v.Marshal(v) mb, err := v.Marshal(v)
if err != nil { if err != nil {
return err return err
@ -274,7 +268,6 @@ func (ms *msgStream) Broadcast(msgPack *MsgPack) error {
msg := &client.ProducerMessage{Payload: m, Properties: map[string]string{}} msg := &client.ProducerMessage{Payload: m, Properties: map[string]string{}}
sp, spanCtx := trace.MsgSpanFromCtx(v.TraceCtx(), v)
trace.InjectContextToPulsarMsgProperties(sp.Context(), msg.Properties) trace.InjectContextToPulsarMsgProperties(sp.Context(), msg.Properties)
ms.producerLock.Lock() ms.producerLock.Lock()
@ -334,17 +327,17 @@ func (ms *msgStream) receiveMsg(consumer Consumer) {
continue continue
} }
sp, ok := trace.ExtractFromPulsarMsgProperties(tsMsg, msg.Properties())
if ok {
tsMsg.SetTraceCtx(opentracing.ContextWithSpan(context.Background(), sp))
}
tsMsg.SetPosition(&msgstream.MsgPosition{ tsMsg.SetPosition(&msgstream.MsgPosition{
ChannelName: filepath.Base(msg.Topic()), ChannelName: filepath.Base(msg.Topic()),
//FIXME //FIXME
MsgID: msg.ID().Serialize(), MsgID: msg.ID().Serialize(),
}) })
sp, ok := trace.ExtractFromPulsarMsgProperties(tsMsg, msg.Properties())
if ok {
ms.scMap.Store(tsMsg.ID(), sp.Context())
}
msgPack := MsgPack{Msgs: []TsMsg{tsMsg}} msgPack := MsgPack{Msgs: []TsMsg{tsMsg}}
ms.receiveBuf <- &msgPack ms.receiveBuf <- &msgPack
@ -614,7 +607,7 @@ func (ms *TtMsgStream) findTimeTick(consumer Consumer,
sp, ok := trace.ExtractFromPulsarMsgProperties(tsMsg, msg.Properties()) sp, ok := trace.ExtractFromPulsarMsgProperties(tsMsg, msg.Properties())
if ok { if ok {
ms.scMap.Store(tsMsg.ID(), sp.Context()) tsMsg.SetTraceCtx(opentracing.ContextWithSpan(context.Background(), sp))
} }
ms.unsolvedMutex.Lock() ms.unsolvedMutex.Lock()

View File

@ -285,7 +285,7 @@ func (qs *QueryService) LoadCollection(ctx context.Context, req *querypb.LoadCol
return status, fmt.Errorf("load partitions: %s", err) return status, fmt.Errorf("load partitions: %s", err)
} }
err = qs.watchDmChannels(dbID, collectionID) err = qs.watchDmChannels(ctx, dbID, collectionID)
if err != nil { if err != nil {
log.Error("LoadCollectionRequest failed", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Error(err)) log.Error("LoadCollectionRequest failed", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Error(err))
return fn(err), err return fn(err), err
@ -516,7 +516,7 @@ func (qs *QueryService) LoadPartitions(ctx context.Context, req *querypb.LoadPar
} }
if watchNeeded { if watchNeeded {
err = qs.watchDmChannels(dbID, collectionID) err = qs.watchDmChannels(ctx, dbID, collectionID)
if err != nil { if err != nil {
log.Debug("LoadPartitionRequest completed", zap.Int64("msgID", req.Base.MsgID), zap.Int64s("partitionIDs", partitionIDs), zap.Error(err)) log.Debug("LoadPartitionRequest completed", zap.Int64("msgID", req.Base.MsgID), zap.Int64s("partitionIDs", partitionIDs), zap.Error(err))
return fn(err), err return fn(err), err
@ -681,8 +681,7 @@ func (qs *QueryService) SetDataService(dataService types.DataService) {
qs.dataServiceClient = dataService qs.dataServiceClient = dataService
} }
func (qs *QueryService) watchDmChannels(dbID UniqueID, collectionID UniqueID) error { func (qs *QueryService) watchDmChannels(ctx context.Context, dbID UniqueID, collectionID UniqueID) error {
ctx := context.TODO()
collection, err := qs.replica.getCollectionByID(0, collectionID) collection, err := qs.replica.getCollectionByID(0, collectionID)
if err != nil { if err != nil {
return err return err