mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-02 20:09:57 +08:00
Cherry-pick from 2.3 pr: #32454 See also #31506 #31508 --------- Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent
9f81290c63
commit
f31a20faad
@ -1363,6 +1363,28 @@ func (m *meta) UpdateChannelCheckpoint(vChannel string, pos *msgpb.MsgPosition)
|
||||
return nil
|
||||
}
|
||||
|
||||
// MarkChannelCheckpointDropped set channel checkpoint to MaxUint64 preventing future update
|
||||
// and remove the metrics for channel checkpoint lag.
|
||||
func (m *meta) MarkChannelCheckpointDropped(ctx context.Context, channel string) error {
|
||||
m.channelCPs.Lock()
|
||||
defer m.channelCPs.Unlock()
|
||||
|
||||
cp := &msgpb.MsgPosition{
|
||||
ChannelName: channel,
|
||||
Timestamp: math.MaxUint64,
|
||||
}
|
||||
|
||||
err := m.catalog.SaveChannelCheckpoints(ctx, []*msgpb.MsgPosition{cp})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
m.channelCPs.checkpoints[channel] = cp
|
||||
|
||||
metrics.DataCoordCheckpointUnixSeconds.DeleteLabelValues(fmt.Sprint(paramtable.GetNodeID()), channel)
|
||||
return nil
|
||||
}
|
||||
|
||||
// UpdateChannelCheckpoints updates and saves channel checkpoints.
|
||||
func (m *meta) UpdateChannelCheckpoints(positions []*msgpb.MsgPosition) error {
|
||||
m.channelCPs.Lock()
|
||||
|
@ -618,6 +618,8 @@ func (s *Server) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtual
|
||||
s.segmentManager.DropSegmentsOfChannel(ctx, channel)
|
||||
s.compactionHandler.removeTasksByChannel(channel)
|
||||
metrics.DataCoordCheckpointUnixSeconds.DeleteLabelValues(fmt.Sprint(paramtable.GetNodeID()), channel)
|
||||
s.meta.MarkChannelCheckpointDropped(ctx, channel)
|
||||
|
||||
// no compaction triggered in Drop procedure
|
||||
return resp, nil
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user