Fix segment_num, stored_rows_num and stored_rows_count metrics (#20704)

TODO: better track segments with states other than flushed and dropped.

/kind bug

issue: #20146, #20145
Signed-off-by: Yuchen Gao <yuchen.gao@zilliz.com>

Signed-off-by: Yuchen Gao <yuchen.gao@zilliz.com>
This commit is contained in:
Ten Thousand Leaves 2022-11-22 19:21:13 +08:00 committed by GitHub
parent d1cb380ee0
commit a8c7199a52
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 115 additions and 55 deletions

View File

@ -249,7 +249,8 @@ func (c *compactionPlanHandler) completeCompaction(result *datapb.CompactionResu
}
func (c *compactionPlanHandler) handleMergeCompactionResult(plan *datapb.CompactionPlan, result *datapb.CompactionResult) error {
oldSegments, modSegments, newSegment, err := c.meta.PrepareCompleteCompactionMutation(plan.GetSegmentBinlogs(), result)
// Also prepare metric updates.
oldSegments, modSegments, newSegment, metricMutation, err := c.meta.PrepareCompleteCompactionMutation(plan.GetSegmentBinlogs(), result)
if err != nil {
return err
}
@ -281,6 +282,8 @@ func (c *compactionPlanHandler) handleMergeCompactionResult(plan *datapb.Compact
zap.Int64("nodeID", nodeID), zap.String("reason", err.Error()))
return c.meta.revertAlterMetaStoreAfterCompaction(oldSegments, newSegment.SegmentInfo)
}
// Apply metrics after successful meta update.
metricMutation.commit()
c.meta.alterInMemoryMetaAfterCompaction(newSegment, modSegments)
log.Info("handleCompactionResult: success to handle merge compaction result")

View File

@ -55,6 +55,13 @@ type meta struct {
chunkManager storage.ChunkManager
}
// segMetricMutation is a local, uncommitted cache of segment metric updates
// accumulated while preparing a meta change. Nothing is published to
// Prometheus until commit() is called, which should happen only AFTER the
// corresponding meta update has been successfully persisted.
type segMetricMutation struct {
	stateChange map[string]int // segment state name -> net change in segment count (positive to increase, negative to decrease).
	rowCountChange int64 // Net change in # of stored rows (backs a gauge, may be negative).
	rowCountAccChange int64 // Total # of newly added rows, accumulated (backs a monotonic counter, never decreased).
}
type collectionInfo struct {
ID int64
Schema *schemapb.CollectionSchema
@ -92,7 +99,7 @@ func (m *meta) reloadFromKV() error {
metrics.DataCoordNumSegments.WithLabelValues(metrics.GrowingSegmentLabel).Set(0)
metrics.DataCoordNumSegments.WithLabelValues(metrics.FlushedSegmentLabel).Set(0)
metrics.DataCoordNumSegments.WithLabelValues(metrics.FlushingSegmentLabel).Set(0)
metrics.DataCoordNumSegments.WithLabelValues(metrics.DropedSegmentLabel).Set(0)
metrics.DataCoordNumSegments.WithLabelValues(metrics.DroppedSegmentLabel).Set(0)
metrics.DataCoordNumStoredRows.WithLabelValues().Set(0)
numStoredRows := int64(0)
for _, segment := range segments {
@ -270,7 +277,7 @@ func (m *meta) DropSegment(segmentID UniqueID) error {
zap.Error(err))
return err
}
metrics.DataCoordNumSegments.WithLabelValues(metrics.DropedSegmentLabel).Inc()
metrics.DataCoordNumSegments.WithLabelValues(segment.GetState().String()).Dec()
m.segments.DropSegment(segmentID)
log.Info("meta update: dropping segment - complete",
zap.Int64("segment ID", segmentID))
@ -322,8 +329,11 @@ func (m *meta) SetState(segmentID UniqueID, targetState commonpb.SegmentState) e
}
// Persist segment updates first.
clonedSegment := curSegInfo.Clone()
clonedSegment.State = targetState
oldState := curSegInfo.GetState()
metricMutation := &segMetricMutation{
stateChange: make(map[string]int),
}
// Update segment state and prepare segment metric update.
updateSegStateAndPrepareMetrics(clonedSegment, targetState, metricMutation)
if clonedSegment != nil && isSegmentHealthy(clonedSegment) {
if err := m.catalog.AlterSegment(m.ctx, clonedSegment.SegmentInfo, curSegInfo.SegmentInfo); err != nil {
log.Error("meta update: setting segment state - failed to alter segments",
@ -332,14 +342,8 @@ func (m *meta) SetState(segmentID UniqueID, targetState commonpb.SegmentState) e
zap.Error(err))
return err
}
metrics.DataCoordNumSegments.WithLabelValues(oldState.String()).Dec()
metrics.DataCoordNumSegments.WithLabelValues(targetState.String()).Inc()
if targetState == commonpb.SegmentState_Flushed {
metrics.DataCoordNumStoredRows.WithLabelValues().Add(float64(curSegInfo.GetNumOfRows()))
metrics.DataCoordNumStoredRowsCounter.WithLabelValues().Add(float64(curSegInfo.GetNumOfRows()))
} else if oldState == commonpb.SegmentState_Flushed {
metrics.DataCoordNumStoredRows.WithLabelValues().Sub(float64(curSegInfo.GetNumOfRows()))
}
// Apply segment metric update after successful meta update.
metricMutation.commit()
}
// Update in-memory meta.
m.segments.SetState(segmentID, targetState)
@ -415,14 +419,19 @@ func (m *meta) UpdateFlushSegmentsInfo(
return nil
}
metricMutation := &segMetricMutation{
stateChange: make(map[string]int),
}
clonedSegment := segment.Clone()
modSegments := make(map[UniqueID]*SegmentInfo)
if flushed {
clonedSegment.State = commonpb.SegmentState_Flushing
// Update segment state and prepare metrics.
updateSegStateAndPrepareMetrics(clonedSegment, commonpb.SegmentState_Flushing, metricMutation)
modSegments[segmentID] = clonedSegment
}
if dropped {
clonedSegment.State = commonpb.SegmentState_Dropped
// Update segment state and prepare metrics.
updateSegStateAndPrepareMetrics(clonedSegment, commonpb.SegmentState_Dropped, metricMutation)
clonedSegment.DroppedAt = uint64(time.Now().UnixNano())
modSegments[segmentID] = clonedSegment
}
@ -514,16 +523,8 @@ func (m *meta) UpdateFlushSegmentsInfo(
zap.Error(err))
return err
}
oldSegmentState := segment.GetState()
newSegmentState := clonedSegment.GetState()
metrics.DataCoordNumSegments.WithLabelValues(oldSegmentState.String()).Dec()
metrics.DataCoordNumSegments.WithLabelValues(newSegmentState.String()).Inc()
if newSegmentState == commonpb.SegmentState_Flushed {
metrics.DataCoordNumStoredRows.WithLabelValues().Add(float64(clonedSegment.GetNumOfRows()))
metrics.DataCoordNumStoredRowsCounter.WithLabelValues().Add(float64(clonedSegment.GetNumOfRows()))
} else if oldSegmentState == commonpb.SegmentState_Flushed {
metrics.DataCoordNumStoredRows.WithLabelValues().Sub(float64(segment.GetNumOfRows()))
}
// Apply metric mutation after a successful meta update.
metricMutation.commit()
// update memory status
for id, s := range modSegments {
m.segments.SetSegment(id, s)
@ -540,13 +541,17 @@ func (m *meta) UpdateDropChannelSegmentInfo(channel string, segments []*SegmentI
zap.String("channel", channel))
m.Lock()
defer m.Unlock()
// Prepare segment metric mutation.
metricMutation := &segMetricMutation{
stateChange: make(map[string]int),
}
modSegments := make(map[UniqueID]*SegmentInfo)
originSegments := make(map[UniqueID]*SegmentInfo)
// save new segments flushed from buffer data
for _, seg2Drop := range segments {
segment := m.mergeDropSegment(seg2Drop)
var segment *SegmentInfo
segment, metricMutation = m.mergeDropSegment(seg2Drop)
if segment != nil {
originSegments[seg2Drop.GetID()] = seg2Drop
modSegments[seg2Drop.GetID()] = segment
}
}
@ -559,25 +564,17 @@ func (m *meta) UpdateDropChannelSegmentInfo(channel string, segments []*SegmentI
// seg inf mod segments are all in dropped state
if !ok {
clonedSeg := seg.Clone()
clonedSeg.State = commonpb.SegmentState_Dropped
updateSegStateAndPrepareMetrics(clonedSeg, commonpb.SegmentState_Dropped, metricMutation)
modSegments[seg.ID] = clonedSeg
originSegments[seg.GetID()] = seg
}
}
err := m.batchSaveDropSegments(channel, modSegments)
if err == nil {
for _, seg := range originSegments {
state := seg.GetState()
metrics.DataCoordNumSegments.WithLabelValues(state.String()).Dec()
if state == commonpb.SegmentState_Flushed {
metrics.DataCoordNumStoredRows.WithLabelValues().Sub(float64(seg.GetNumOfRows()))
}
}
}
if err != nil {
log.Error("meta update: update drop channel segment info failed",
zap.String("channel", channel),
zap.Error(err))
// Apply segment metric mutation on successful meta update.
metricMutation.commit()
} else {
log.Info("meta update: update drop channel segment info - complete",
zap.String("channel", channel))
@ -586,17 +583,20 @@ func (m *meta) UpdateDropChannelSegmentInfo(channel string, segments []*SegmentI
}
// mergeDropSegment merges drop segment information with meta segments
func (m *meta) mergeDropSegment(seg2Drop *SegmentInfo) *SegmentInfo {
func (m *meta) mergeDropSegment(seg2Drop *SegmentInfo) (*SegmentInfo, *segMetricMutation) {
segment := m.segments.GetSegment(seg2Drop.ID)
// health check ensures idempotence
if segment == nil || !isSegmentHealthy(segment) {
log.Warn("UpdateDropChannel skipping nil or unhealthy", zap.Bool("is nil", segment == nil),
zap.Bool("isHealthy", isSegmentHealthy(segment)))
return nil
return nil, nil
}
metricMutation := &segMetricMutation{
stateChange: make(map[string]int),
}
clonedSegment := segment.Clone()
clonedSegment.State = commonpb.SegmentState_Dropped
updateSegStateAndPrepareMetrics(clonedSegment, commonpb.SegmentState_Dropped, metricMutation)
currBinlogs := clonedSegment.GetBinlogs()
@ -641,7 +641,8 @@ func (m *meta) mergeDropSegment(seg2Drop *SegmentInfo) *SegmentInfo {
clonedSegment.DmlPosition = seg2Drop.GetDmlPosition()
}
clonedSegment.currRows = seg2Drop.currRows
return clonedSegment
clonedSegment.NumOfRows = seg2Drop.currRows
return clonedSegment, metricMutation
}
// batchSaveDropSegments saves drop segments info with channel removal flag
@ -676,7 +677,6 @@ func (m *meta) batchSaveDropSegments(channel string, modSegments map[int64]*Segm
m.segments.SetSegment(id, segment)
}
metrics.DataCoordNumSegments.WithLabelValues(metrics.DropedSegmentLabel).Add(float64(len(segments)))
return nil
}
@ -897,7 +897,8 @@ func (m *meta) SetSegmentCompacting(segmentID UniqueID, compacting bool) {
// - the segment info of compactedFrom segments after compaction to alter
// - the segment info of compactedTo segment after compaction to add
// The compactedTo segment could contain 0 numRows
func (m *meta) PrepareCompleteCompactionMutation(compactionLogs []*datapb.CompactionSegmentBinlogs, result *datapb.CompactionResult) ([]*datapb.SegmentInfo, []*SegmentInfo, *SegmentInfo, error) {
func (m *meta) PrepareCompleteCompactionMutation(compactionLogs []*datapb.CompactionSegmentBinlogs,
result *datapb.CompactionResult) ([]*datapb.SegmentInfo, []*SegmentInfo, *SegmentInfo, *segMetricMutation, error) {
log.Info("meta update: prepare for complete compaction mutation")
m.Lock()
defer m.Unlock()
@ -907,12 +908,15 @@ func (m *meta) PrepareCompleteCompactionMutation(compactionLogs []*datapb.Compac
modSegments = make([]*SegmentInfo, 0, len(compactionLogs))
)
metricMutation := &segMetricMutation{
stateChange: make(map[string]int),
}
for _, cl := range compactionLogs {
if segment := m.segments.GetSegment(cl.GetSegmentID()); segment != nil {
oldSegments = append(oldSegments, segment.Clone().SegmentInfo)
cloned := segment.Clone()
cloned.State = commonpb.SegmentState_Dropped
updateSegStateAndPrepareMetrics(cloned, commonpb.SegmentState_Dropped, metricMutation)
cloned.DroppedAt = uint64(time.Now().UnixNano())
modSegments = append(modSegments, cloned)
}
@ -945,7 +949,7 @@ func (m *meta) PrepareCompleteCompactionMutation(compactionLogs []*datapb.Compac
newAddedDeltalogs := m.updateDeltalogs(originDeltalogs, deletedDeltalogs, nil)
copiedDeltalogs, err := m.copyDeltaFiles(newAddedDeltalogs, modSegments[0].CollectionID, modSegments[0].PartitionID, result.GetSegmentID())
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}
deltalogs := append(result.GetDeltalogs(), copiedDeltalogs...)
@ -971,7 +975,7 @@ func (m *meta) PrepareCompleteCompactionMutation(compactionLogs []*datapb.Compac
CompactionFrom: compactionFrom,
}
segment := NewSegmentInfo(segmentInfo)
metricMutation.addNewSeg(segment.GetState(), segment.GetNumOfRows())
log.Info("meta update: prepare for complete compaction mutation - complete",
zap.Int64("collection ID", segment.GetCollectionID()),
zap.Int64("partition ID", segment.GetPartitionID()),
@ -979,7 +983,7 @@ func (m *meta) PrepareCompleteCompactionMutation(compactionLogs []*datapb.Compac
zap.Int64("new segment num of rows", segment.GetNumOfRows()),
zap.Any("compacted from", segment.GetCompactionFrom()))
return oldSegments, modSegments, segment, nil
return oldSegments, modSegments, segment, metricMutation, nil
}
func (m *meta) copyDeltaFiles(binlogs []*datapb.FieldBinlog, collectionID, partitionID, targetSegmentID int64) ([]*datapb.FieldBinlog, error) {
@ -1216,3 +1220,51 @@ func (m *meta) DropChannelCheckpoint(vChannel string) error {
log.Debug("DropChannelCheckpoint done", zap.String("vChannel", vChannel))
return nil
}
// addNewSeg registers a newly created segment in the metric mutation:
// its rows are added to both the current stored-rows gauge and the
// accumulated row counter, and one segment is counted under its state.
func (s *segMetricMutation) addNewSeg(state commonpb.SegmentState, rowCount int64) {
	s.rowCountChange += rowCount
	s.rowCountAccChange += rowCount
	key := state.String()
	s.stateChange[key] = s.stateChange[key] + 1
}
// commit publishes every update accumulated in this segMetricMutation to the
// DataCoord Prometheus metrics. It must be called only AFTER the segment
// state change has been persisted in etcd, so metrics never drift ahead of
// the durable meta state.
func (s *segMetricMutation) commit() {
	for state, delta := range s.stateChange {
		metrics.DataCoordNumSegments.WithLabelValues(state).Add(float64(delta))
	}
	metrics.DataCoordNumStoredRows.WithLabelValues().Add(float64(s.rowCountChange))
	metrics.DataCoordNumStoredRowsCounter.WithLabelValues().Add(float64(s.rowCountAccChange))
}
// append records one segment state transition (oldState -> newState with
// rowCountUpdate rows) into the mutation.
//
// Row accounting: the stored-rows gauge tracks rows of segments in a flush
// state (Flushing/Flushed), so it is increased when a segment ENTERS a flush
// state and decreased when it LEAVES one. The accumulated counter is
// monotonic and only ever increased.
//
// Bug fix vs. the previous version: rows were subtracted whenever newState
// was Dropped, even for segments (e.g. Growing) whose rows were never added
// to the gauge in the first place, skewing DataCoordNumStoredRows negative.
// Now the decrement requires the old state to have been a flush state.
func (s *segMetricMutation) append(oldState, newState commonpb.SegmentState, rowCountUpdate int64) {
	if oldState != newState {
		s.stateChange[oldState.String()]--
		s.stateChange[newState.String()]++
	}
	// Update # of rows on new flush operations and drop operations.
	if isFlushState(newState) && !isFlushState(oldState) {
		// Newly flushed: rows become "stored".
		s.rowCountChange += rowCountUpdate
		s.rowCountAccChange += rowCountUpdate
	} else if isFlushState(oldState) && !isFlushState(newState) {
		// Leaving the flushed/flushing set (e.g. dropped): only the gauge
		// goes down; the historical counter stays monotonic.
		s.rowCountChange -= rowCountUpdate
	}
}
// isFlushState reports whether the given segment state is one of the two
// flush states (Flushing or Flushed), i.e. whether the segment's rows count
// toward the stored-rows metric.
func isFlushState(state commonpb.SegmentState) bool {
	switch state {
	case commonpb.SegmentState_Flushing, commonpb.SegmentState_Flushed:
		return true
	default:
		return false
	}
}
// updateSegStateAndPrepareMetrics moves segToUpdate into targetState in
// memory and records the transition in metricMutation. The caller is
// responsible for committing metricMutation after the meta change persists.
func updateSegStateAndPrepareMetrics(segToUpdate *SegmentInfo, targetState commonpb.SegmentState, metricMutation *segMetricMutation) {
	oldState := segToUpdate.GetState()
	log.Debug("updating segment state and updating metrics",
		zap.String("old state", oldState.String()),
		zap.String("new state", targetState.String()),
		zap.Int64("# of rows", segToUpdate.GetNumOfRows()))
	// The mutation must see the pre-transition state, so record it before
	// overwriting State below.
	metricMutation.append(oldState, targetState, segToUpdate.GetNumOfRows())
	segToUpdate.State = targetState
}

View File

@ -745,6 +745,7 @@ func TestMeta_PrepareCompleteCompactionMutation(t *testing.T) {
Binlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "log1", "log2")},
Statslogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "statlog1", "statlog2")},
Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(0, "deltalog1", "deltalog2")},
NumOfRows: 1,
}},
2: {SegmentInfo: &datapb.SegmentInfo{
ID: 2,
@ -754,6 +755,7 @@ func TestMeta_PrepareCompleteCompactionMutation(t *testing.T) {
Binlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "log3", "log4")},
Statslogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "statlog3", "statlog4")},
Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(0, "deltalog3", "deltalog4")},
NumOfRows: 1,
}},
},
}
@ -783,13 +785,16 @@ func TestMeta_PrepareCompleteCompactionMutation(t *testing.T) {
InsertLogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "log5")},
Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "statlog5")},
Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(0, "deltalog5")},
NumOfRows: 1,
NumOfRows: 2,
}
beforeCompact, afterCompact, newSegment, err := m.PrepareCompleteCompactionMutation(inCompactionLogs, inCompactionResult)
beforeCompact, afterCompact, newSegment, metricMutation, err := m.PrepareCompleteCompactionMutation(inCompactionLogs, inCompactionResult)
assert.Nil(t, err)
assert.NotNil(t, beforeCompact)
assert.NotNil(t, afterCompact)
assert.NotNil(t, newSegment)
assert.Equal(t, 3, len(metricMutation.stateChange))
assert.Equal(t, int64(0), metricMutation.rowCountChange)
assert.Equal(t, int64(2), metricMutation.rowCountAccChange)
require.Equal(t, 2, len(beforeCompact))
assert.Equal(t, commonpb.SegmentState_Flushed, beforeCompact[0].GetState())

View File

@ -142,7 +142,7 @@ var (
)
//RegisterDataCoord registers DataCoord metrics
// RegisterDataCoord registers DataCoord metrics
func RegisterDataCoord(registry *prometheus.Registry) {
registry.MustRegister(DataCoordNumDataNodes)
registry.MustRegister(DataCoordNumSegments)

View File

@ -17,7 +17,6 @@
package metrics
import (
// nolint:gosec
_ "net/http/pprof"
@ -49,11 +48,12 @@ const (
FailedIndexTaskLabel = "failed"
RecycledIndexTaskLabel = "recycled"
// Note: below must match commonpb.SegmentState_name fields.
SealedSegmentLabel = "Sealed"
GrowingSegmentLabel = "Growing"
FlushedSegmentLabel = "Flushed"
FlushingSegmentLabel = "Flushing"
DropedSegmentLabel = "Dropped"
DroppedSegmentLabel = "Dropped"
Leader = "OnLeader"
FromLeader = "FromLeader"
@ -81,7 +81,7 @@ var (
buckets = prometheus.ExponentialBuckets(1, 2, 18)
)
//Register serves prometheus http service
// Register serves prometheus http service
func Register(r *prometheus.Registry) {
management.Register(&management.HTTPHandler{
Path: "/metrics",