From f31a20faadc131daac0a2cebafd1b1c017776ba1 Mon Sep 17 00:00:00 2001 From: congqixia Date: Tue, 21 May 2024 11:59:39 +0800 Subject: [PATCH] fix: [Backport] Mark channel checkpoint dropped prevent cp lag metrics leakage (#32454) (#33198) Cherry-pick from 2.3 pr: #32454 See also #31506 #31508 --------- Signed-off-by: Congqi Xia --- internal/datacoord/meta.go | 22 ++++++++++++++++++++++ internal/datacoord/services.go | 2 ++ 2 files changed, 24 insertions(+) diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index ea8cbc4dd6..065802a634 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -1363,6 +1363,28 @@ func (m *meta) UpdateChannelCheckpoint(vChannel string, pos *msgpb.MsgPosition) return nil } +// MarkChannelCheckpointDropped sets the channel checkpoint to MaxUint64 to prevent future updates +// and removes the metrics for channel checkpoint lag. +func (m *meta) MarkChannelCheckpointDropped(ctx context.Context, channel string) error { + m.channelCPs.Lock() + defer m.channelCPs.Unlock() + + cp := &msgpb.MsgPosition{ + ChannelName: channel, + Timestamp: math.MaxUint64, + } + + err := m.catalog.SaveChannelCheckpoints(ctx, []*msgpb.MsgPosition{cp}) + if err != nil { + return err + } + + m.channelCPs.checkpoints[channel] = cp + + metrics.DataCoordCheckpointUnixSeconds.DeleteLabelValues(fmt.Sprint(paramtable.GetNodeID()), channel) + return nil +} + // UpdateChannelCheckpoints updates and saves channel checkpoints. 
func (m *meta) UpdateChannelCheckpoints(positions []*msgpb.MsgPosition) error { m.channelCPs.Lock() diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index 49ee83fda3..6a00585e8f 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -618,6 +618,8 @@ func (s *Server) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtual s.segmentManager.DropSegmentsOfChannel(ctx, channel) s.compactionHandler.removeTasksByChannel(channel) metrics.DataCoordCheckpointUnixSeconds.DeleteLabelValues(fmt.Sprint(paramtable.GetNodeID()), channel) + s.meta.MarkChannelCheckpointDropped(ctx, channel) + // no compaction triggered in Drop procedure return resp, nil }