From c6e2900cfc2831c479397e37008fb7cf5cfca0df Mon Sep 17 00:00:00 2001 From: SimFG Date: Thu, 4 Jul 2024 10:18:09 +0800 Subject: [PATCH] enhance: add the tick log for the tt msgstream seek method (#34397) /kind improvement Signed-off-by: SimFG --- pkg/mq/msgstream/mq_msgstream.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/pkg/mq/msgstream/mq_msgstream.go b/pkg/mq/msgstream/mq_msgstream.go index 40f9b21cf0..ccf1d6bd86 100644 --- a/pkg/mq/msgstream/mq_msgstream.go +++ b/pkg/mq/msgstream/mq_msgstream.go @@ -888,6 +888,9 @@ func (ms *MqTtMsgStream) Seek(ctx context.Context, msgPositions []*MsgPosition, ms.consumerLock.Lock() defer ms.consumerLock.Unlock() + loopTick := time.NewTicker(5 * time.Second) + defer loopTick.Stop() + for idx := range msgPositions { mp = msgPositions[idx] if len(mp.MsgID) == 0 { @@ -903,16 +906,21 @@ func (ms *MqTtMsgStream) Seek(ctx context.Context, msgPositions []*MsgPosition, // skip all data before current tt runLoop := true + loopMsgCnt := 0 + loopStarTime := time.Now() for runLoop { select { case <-ms.ctx.Done(): return ms.ctx.Err() case <-ctx.Done(): return ctx.Err() + case <-loopTick.C: + log.Info("seek loop tick", zap.Int("loopMsgCnt", loopMsgCnt), zap.String("channel", mp.ChannelName)) case msg, ok := <-consumer.Chan(): if !ok { return fmt.Errorf("consumer closed") } + loopMsgCnt++ consumer.Ack(msg) headerMsg := commonpb.MsgHeader{} @@ -926,6 +934,12 @@ func (ms *MqTtMsgStream) Seek(ctx context.Context, msgPositions []*MsgPosition, } if tsMsg.Type() == commonpb.MsgType_TimeTick && tsMsg.BeginTs() >= mp.Timestamp { runLoop = false + if time.Since(loopStarTime) > 30*time.Second { + log.Info("seek loop finished long time", + zap.Int("loopMsgCnt", loopMsgCnt), + zap.String("channel", mp.ChannelName), + zap.Duration("cost", time.Since(loopStarTime))) + } } else if tsMsg.BeginTs() > mp.Timestamp { ctx, _ := ExtractCtx(tsMsg, msg.Properties()) tsMsg.SetTraceCtx(ctx)