diff --git a/internal/proxynode/proxy_node.go b/internal/proxynode/proxy_node.go index 7775884088..8838492db2 100644 --- a/internal/proxynode/proxy_node.go +++ b/internal/proxynode/proxy_node.go @@ -281,6 +281,12 @@ func (node *ProxyNode) sendChannelsTimeTickLoop() { case <-node.ctx.Done(): return case <-timer.C: + ts, err := node.tsoAllocator.AllocOne() + if err != nil { + log.Warn("Failed to get timestamp from tso", zap.Error(err)) + continue + } + stats, err := node.chTicker.getMinTsStatistics() if err != nil { log.Warn("sendChannelsTimeTickLoop.getMinTsStatistics", zap.Error(err)) @@ -290,10 +296,15 @@ func (node *ProxyNode) sendChannelsTimeTickLoop() { channels := make([]pChan, 0, len(stats)) tss := make([]Timestamp, 0, len(stats)) + maxTs := ts for channel, ts := range stats { channels = append(channels, channel) tss = append(tss, ts) + if ts > maxTs { + maxTs = ts + } } + log.Debug("send timestamp statistics of pchan", zap.Any("channels", channels), zap.Any("tss", tss)) req := &internalpb.ChannelTimeTickMsg{ @@ -303,8 +314,9 @@ func (node *ProxyNode) sendChannelsTimeTickLoop() { Timestamp: 0, // todo SourceID: node.session.ServerID, }, - ChannelNames: channels, - Timestamps: tss, + ChannelNames: channels, + Timestamps: tss, + DefaultTimestamp: maxTs, } status, err := node.masterService.UpdateChannelTimeTick(node.ctx, req)