enhance: Add sorted for segment info (#36469)

issue: #33744

Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
This commit is contained in:
cai.zhang 2024-09-30 10:01:16 +08:00 committed by GitHub
parent a78a6b33ab
commit ecb2b242e2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 107 additions and 75 deletions

2
go.mod
View File

@ -22,7 +22,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/klauspost/compress v1.17.7
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240909041258-8f8ca67816cd
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240923125106-ef9b8fd69497
github.com/minio/minio-go/v7 v7.0.61
github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81
github.com/prometheus/client_golang v1.14.0

2
go.sum
View File

@ -602,6 +602,8 @@ github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240909041258-8f8ca67816cd
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240909041258-8f8ca67816cd/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/pulsar-client-go v0.11.1 h1:5jiRP5j93CrgqcC20XVn68DX27htZdhedP1NyoIwkVg=
github.com/milvus-io/pulsar-client-go v0.11.1/go.mod h1:cipLojlpUzs3i3cDNrK2MdOVs4HWPD7MQsAoOUqWcec=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240923125106-ef9b8fd69497 h1:t4sQMbSy05p8qgMGvEGyLYYLoZ9fD1dushS1bj5X6+0=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240923125106-ef9b8fd69497/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs=
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod h1:mC1jAcsrzbxHt8iiaC+zU4b1ylILSosueou12R++wfY=
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8DFdX7uMikMLXX4oubIzJF4kv/wI=

View File

@ -121,9 +121,9 @@ func newChannelCps() *channelCPs {
// A local cache of segment metric update. Must call commit() to take effect.
type segMetricMutation struct {
stateChange map[string]map[string]int // segment state, seg level -> state change count (to increase or decrease).
rowCountChange int64 // Change in # of rows.
rowCountAccChange int64 // Total # of historical added rows, accumulated.
stateChange map[string]map[string]map[string]int // segment state, seg level -> state -> isSorted change count (to increase or decrease).
rowCountChange int64 // Change in # of rows.
rowCountAccChange int64 // Total # of historical added rows, accumulated.
}
type collectionInfo struct {
@ -197,7 +197,7 @@ func (m *meta) reloadFromKV() error {
for _, segment := range segments {
// segments from catalog.ListSegments will not have logPath
m.segments.SetSegment(segment.ID, NewSegmentInfo(segment))
metrics.DataCoordNumSegments.WithLabelValues(segment.GetState().String(), segment.GetLevel().String()).Inc()
metrics.DataCoordNumSegments.WithLabelValues(segment.GetState().String(), segment.GetLevel().String(), getSortStatus(segment.GetIsSorted())).Inc()
if segment.State == commonpb.SegmentState_Flushed {
numStoredRows += segment.NumOfRows
@ -511,7 +511,7 @@ func (m *meta) AddSegment(ctx context.Context, segment *SegmentInfo) error {
}
m.segments.SetSegment(segment.GetID(), segment)
metrics.DataCoordNumSegments.WithLabelValues(segment.GetState().String(), segment.GetLevel().String()).Inc()
metrics.DataCoordNumSegments.WithLabelValues(segment.GetState().String(), segment.GetLevel().String(), getSortStatus(segment.GetIsSorted())).Inc()
log.Info("meta update: adding segment - complete", zap.Int64("segmentID", segment.GetID()))
return nil
}
@ -533,7 +533,7 @@ func (m *meta) DropSegment(segmentID UniqueID) error {
zap.Error(err))
return err
}
metrics.DataCoordNumSegments.WithLabelValues(segment.GetState().String(), segment.GetLevel().String()).Dec()
metrics.DataCoordNumSegments.WithLabelValues(segment.GetState().String(), segment.GetLevel().String(), getSortStatus(segment.GetIsSorted())).Dec()
coll, ok := m.collections[segment.CollectionID]
if ok {
metrics.CleanupDataCoordSegmentMetrics(coll.DatabaseName, segment.CollectionID, segment.ID)
@ -637,7 +637,7 @@ func (m *meta) SetState(segmentID UniqueID, targetState commonpb.SegmentState) e
// Persist segment updates first.
clonedSegment := curSegInfo.Clone()
metricMutation := &segMetricMutation{
stateChange: make(map[string]map[string]int),
stateChange: make(map[string]map[string]map[string]int),
}
if clonedSegment != nil && isSegmentHealthy(clonedSegment) {
// Update segment state and prepare segment metric update.
@ -748,7 +748,7 @@ func CreateL0Operator(collectionID, partitionID, segmentID int64, channel string
Level: datapb.SegmentLevel_L0,
},
}
modPack.metricMutation.addNewSeg(commonpb.SegmentState_Flushed, datapb.SegmentLevel_L0, 0)
modPack.metricMutation.addNewSeg(commonpb.SegmentState_Flushed, datapb.SegmentLevel_L0, false, 0)
}
return true
}
@ -1016,7 +1016,7 @@ func (m *meta) UpdateSegmentsInfo(operators ...UpdateOperator) error {
segments: make(map[int64]*SegmentInfo),
increments: make(map[int64]metastore.BinlogsIncrement),
metricMutation: &segMetricMutation{
stateChange: make(map[string]map[string]int),
stateChange: make(map[string]map[string]map[string]int),
},
}
@ -1055,7 +1055,7 @@ func (m *meta) UpdateDropChannelSegmentInfo(channel string, segments []*SegmentI
// Prepare segment metric mutation.
metricMutation := &segMetricMutation{
stateChange: make(map[string]map[string]int),
stateChange: make(map[string]map[string]map[string]int),
}
modSegments := make(map[UniqueID]*SegmentInfo)
// save new segments flushed from buffer data
@ -1096,7 +1096,7 @@ func (m *meta) UpdateDropChannelSegmentInfo(channel string, segments []*SegmentI
// mergeDropSegment merges drop segment information with meta segments
func (m *meta) mergeDropSegment(seg2Drop *SegmentInfo) (*SegmentInfo, *segMetricMutation) {
metricMutation := &segMetricMutation{
stateChange: make(map[string]map[string]int),
stateChange: make(map[string]map[string]map[string]int),
}
segment := m.segments.GetSegment(seg2Drop.ID)
@ -1412,7 +1412,7 @@ func (m *meta) completeClusterCompactionMutation(t *datapb.CompactionTask, resul
zap.Int64("partitionID", t.PartitionID),
zap.String("channel", t.GetChannel()))
metricMutation := &segMetricMutation{stateChange: make(map[string]map[string]int)}
metricMutation := &segMetricMutation{stateChange: make(map[string]map[string]map[string]int)}
compactFromSegIDs := make([]int64, 0)
compactToSegIDs := make([]int64, 0)
compactFromSegInfos := make([]*SegmentInfo, 0)
@ -1460,7 +1460,7 @@ func (m *meta) completeClusterCompactionMutation(t *datapb.CompactionTask, resul
segment := NewSegmentInfo(segmentInfo)
compactToSegInfos = append(compactToSegInfos, segment)
compactToSegIDs = append(compactToSegIDs, segment.GetID())
metricMutation.addNewSeg(segment.GetState(), segment.GetLevel(), segment.GetNumOfRows())
metricMutation.addNewSeg(segment.GetState(), segment.GetLevel(), segment.GetIsSorted(), segment.GetNumOfRows())
}
log = log.With(zap.Int64s("compact from", compactFromSegIDs), zap.Int64s("compact to", compactToSegIDs))
@ -1504,7 +1504,7 @@ func (m *meta) completeMixCompactionMutation(t *datapb.CompactionTask, result *d
zap.Int64("partitionID", t.PartitionID),
zap.String("channel", t.GetChannel()))
metricMutation := &segMetricMutation{stateChange: make(map[string]map[string]int)}
metricMutation := &segMetricMutation{stateChange: make(map[string]map[string]map[string]int)}
var compactFromSegIDs []int64
var compactFromSegInfos []*SegmentInfo
for _, segmentID := range t.GetInputSegments() {
@ -1558,7 +1558,7 @@ func (m *meta) completeMixCompactionMutation(t *datapb.CompactionTask, result *d
// L1 segment with NumRows=0 will be discarded, so no need to change the metric
if compactToSegmentInfo.GetNumOfRows() > 0 {
// metrics mutation for compactTo segments
metricMutation.addNewSeg(compactToSegmentInfo.GetState(), compactToSegmentInfo.GetLevel(), compactToSegmentInfo.GetNumOfRows())
metricMutation.addNewSeg(compactToSegmentInfo.GetState(), compactToSegmentInfo.GetLevel(), compactToSegmentInfo.GetIsSorted(), compactToSegmentInfo.GetNumOfRows())
} else {
compactToSegmentInfo.State = commonpb.SegmentState_Dropped
}
@ -1812,11 +1812,15 @@ func (m *meta) GetEarliestStartPositionOfGrowingSegments(label *CompactionGroupL
}
// addNewSeg update metrics update for a new segment.
func (s *segMetricMutation) addNewSeg(state commonpb.SegmentState, level datapb.SegmentLevel, rowCount int64) {
func (s *segMetricMutation) addNewSeg(state commonpb.SegmentState, level datapb.SegmentLevel, isSorted bool, rowCount int64) {
if _, ok := s.stateChange[level.String()]; !ok {
s.stateChange[level.String()] = make(map[string]int)
s.stateChange[level.String()] = make(map[string]map[string]int)
}
s.stateChange[level.String()][state.String()] += 1
if _, ok := s.stateChange[level.String()][state.String()]; !ok {
s.stateChange[level.String()][state.String()] = make(map[string]int)
}
s.stateChange[level.String()][state.String()][getSortStatus(isSorted)] += 1
s.rowCountChange += rowCount
s.rowCountAccChange += rowCount
@ -1826,20 +1830,28 @@ func (s *segMetricMutation) addNewSeg(state commonpb.SegmentState, level datapb.
// has persisted in Etcd.
func (s *segMetricMutation) commit() {
for level, submap := range s.stateChange {
for state, change := range submap {
metrics.DataCoordNumSegments.WithLabelValues(state, level).Add(float64(change))
for state, sortedMap := range submap {
for sortedLabel, change := range sortedMap {
metrics.DataCoordNumSegments.WithLabelValues(state, level, sortedLabel).Add(float64(change))
}
}
}
}
// append updates current segMetricMutation when segment state change happens.
func (s *segMetricMutation) append(oldState, newState commonpb.SegmentState, level datapb.SegmentLevel, rowCountUpdate int64) {
func (s *segMetricMutation) append(oldState, newState commonpb.SegmentState, level datapb.SegmentLevel, isSorted bool, rowCountUpdate int64) {
if oldState != newState {
if _, ok := s.stateChange[level.String()]; !ok {
s.stateChange[level.String()] = make(map[string]int)
s.stateChange[level.String()] = make(map[string]map[string]int)
}
s.stateChange[level.String()][oldState.String()] -= 1
s.stateChange[level.String()][newState.String()] += 1
if _, ok := s.stateChange[level.String()][oldState.String()]; !ok {
s.stateChange[level.String()][oldState.String()] = make(map[string]int)
}
if _, ok := s.stateChange[level.String()][newState.String()]; !ok {
s.stateChange[level.String()][newState.String()] = make(map[string]int)
}
s.stateChange[level.String()][oldState.String()][getSortStatus(isSorted)] -= 1
s.stateChange[level.String()][newState.String()][getSortStatus(isSorted)] += 1
}
// Update # of rows on new flush operations and drop operations.
if isFlushState(newState) && !isFlushState(oldState) {
@ -1863,7 +1875,7 @@ func updateSegStateAndPrepareMetrics(segToUpdate *SegmentInfo, targetState commo
zap.String("old state", segToUpdate.GetState().String()),
zap.String("new state", targetState.String()),
zap.Int64("# of rows", segToUpdate.GetNumOfRows()))
metricMutation.append(segToUpdate.GetState(), targetState, segToUpdate.GetLevel(), segToUpdate.GetNumOfRows())
metricMutation.append(segToUpdate.GetState(), targetState, segToUpdate.GetLevel(), segToUpdate.GetIsSorted(), segToUpdate.GetNumOfRows())
segToUpdate.State = targetState
if targetState == commonpb.SegmentState_Dropped {
segToUpdate.DroppedAt = uint64(time.Now().UnixNano())
@ -1957,7 +1969,7 @@ func (m *meta) SaveStatsResultSegment(oldSegmentID int64, result *workerpb.Stats
zap.Int64("old segmentID", oldSegmentID),
zap.Int64("target segmentID", result.GetSegmentID()))
metricMutation := &segMetricMutation{stateChange: make(map[string]map[string]int)}
metricMutation := &segMetricMutation{stateChange: make(map[string]map[string]map[string]int)}
oldSegment := m.segments.GetSegment(oldSegmentID)
if oldSegment == nil {
@ -1994,7 +2006,7 @@ func (m *meta) SaveStatsResultSegment(oldSegmentID int64, result *workerpb.Stats
}
segment := NewSegmentInfo(segmentInfo)
if segment.GetNumOfRows() > 0 {
metricMutation.addNewSeg(segment.GetState(), segment.GetLevel(), segment.GetNumOfRows())
metricMutation.addNewSeg(segment.GetState(), segment.GetLevel(), segment.GetIsSorted(), segment.GetNumOfRows())
} else {
segment.State = commonpb.SegmentState_Dropped
}

View File

@ -124,7 +124,7 @@ func (suite *MetaReloadSuite) TestReloadFromKV() {
_, err := newMeta(ctx, suite.catalog, nil)
suite.NoError(err)
suite.MetricsEqual(metrics.DataCoordNumSegments.WithLabelValues(metrics.FlushedSegmentLabel, datapb.SegmentLevel_Legacy.String()), 1)
suite.MetricsEqual(metrics.DataCoordNumSegments.WithLabelValues(metrics.FlushedSegmentLabel, datapb.SegmentLevel_Legacy.String(), "unsorted"), 1)
})
}

View File

@ -912,6 +912,7 @@ func (s *Server) GetRecoveryInfoV2(ctx context.Context, req *datapb.GetRecoveryI
InsertChannel: segment.InsertChannel,
NumOfRows: rowCount,
Level: segment.GetLevel(),
IsSorted: segment.GetIsSorted(),
})
}

View File

@ -349,3 +349,10 @@ func createStorageConfig() *indexpb.StorageConfig {
return storageConfig
}
func getSortStatus(sorted bool) string {
if sorted {
return "sorted"
}
return "unsorted"
}

View File

@ -557,6 +557,7 @@ message SegmentInfo {
bool enable_index = 16;
bool is_fake = 17;
data.SegmentLevel level = 18;
bool is_sorted = 19;
}
message CollectionInfo {
@ -635,6 +636,7 @@ message SegmentVersionInfo {
uint64 last_delta_timestamp = 6;
map<int64, FieldIndexInfo> index_info = 7;
data.SegmentLevel level = 8;
bool is_sorted = 9;
}
message ChannelVersionInfo {

View File

@ -4111,6 +4111,7 @@ func (node *Proxy) GetPersistentSegmentInfo(ctx context.Context, req *milvuspb.G
NumRows: info.NumOfRows,
State: info.State,
Level: commonpb.SegmentLevel(info.Level),
IsSorted: info.GetIsSorted(),
}
}
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
@ -4184,6 +4185,7 @@ func (node *Proxy) GetQuerySegmentInfo(ctx context.Context, req *milvuspb.GetQue
State: info.SegmentState,
NodeIds: info.NodeIds,
Level: commonpb.SegmentLevel(info.Level),
IsSorted: info.GetIsSorted(),
}
}

View File

@ -146,6 +146,7 @@ func (dh *distHandler) updateSegmentsDistribution(resp *querypb.GetDataDistribut
PartitionID: s.GetPartition(),
InsertChannel: s.GetChannel(),
Level: s.GetLevel(),
IsSorted: s.GetIsSorted(),
}
}
updates = append(updates, &meta.Segment{

View File

@ -45,6 +45,7 @@ func MergeMetaSegmentIntoSegmentInfo(info *querypb.SegmentInfo, segments ...*met
SegmentState: commonpb.SegmentState_Sealed,
IndexInfos: make([]*querypb.FieldIndexInfo, 0),
Level: first.Level,
IsSorted: first.GetIsSorted(),
}
for _, indexInfo := range first.IndexInfo {
info.IndexName = indexInfo.IndexName

View File

@ -17,8 +17,6 @@ import (
querypb "github.com/milvus-io/milvus/internal/proto/querypb"
schemapb "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
segcorepb "github.com/milvus-io/milvus/internal/proto/segcorepb"
storage "github.com/milvus-io/milvus/internal/storage"
@ -589,6 +587,47 @@ func (_c *MockSegment_IsLazyLoad_Call) RunAndReturn(run func() bool) *MockSegmen
return _c
}
// IsSorted provides a mock function with given fields:
func (_m *MockSegment) IsSorted() bool {
ret := _m.Called()
var r0 bool
if rf, ok := ret.Get(0).(func() bool); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(bool)
}
return r0
}
// MockSegment_IsSorted_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IsSorted'
type MockSegment_IsSorted_Call struct {
*mock.Call
}
// IsSorted is a helper method to define mock.On call
func (_e *MockSegment_Expecter) IsSorted() *MockSegment_IsSorted_Call {
return &MockSegment_IsSorted_Call{Call: _e.mock.On("IsSorted")}
}
func (_c *MockSegment_IsSorted_Call) Run(run func()) *MockSegment_IsSorted_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockSegment_IsSorted_Call) Return(_a0 bool) *MockSegment_IsSorted_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockSegment_IsSorted_Call) RunAndReturn(run func() bool) *MockSegment_IsSorted_Call {
_c.Call.Return(run)
return _c
}
// LastDeltaTimestamp provides a mock function with given fields:
func (_m *MockSegment) LastDeltaTimestamp() uint64 {
ret := _m.Called()
@ -714,49 +753,6 @@ func (_c *MockSegment_LoadDeltaData_Call) RunAndReturn(run func(context.Context,
return _c
}
// LoadDeltaData2 provides a mock function with given fields: ctx, schema
func (_m *MockSegment) LoadDeltaData2(ctx context.Context, schema *schemapb.CollectionSchema) error {
ret := _m.Called(ctx, schema)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, *schemapb.CollectionSchema) error); ok {
r0 = rf(ctx, schema)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockSegment_LoadDeltaData2_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'LoadDeltaData2'
type MockSegment_LoadDeltaData2_Call struct {
*mock.Call
}
// LoadDeltaData2 is a helper method to define mock.On call
// - ctx context.Context
// - schema *schemapb.CollectionSchema
func (_e *MockSegment_Expecter) LoadDeltaData2(ctx interface{}, schema interface{}) *MockSegment_LoadDeltaData2_Call {
return &MockSegment_LoadDeltaData2_Call{Call: _e.mock.On("LoadDeltaData2", ctx, schema)}
}
func (_c *MockSegment_LoadDeltaData2_Call) Run(run func(ctx context.Context, schema *schemapb.CollectionSchema)) *MockSegment_LoadDeltaData2_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*schemapb.CollectionSchema))
})
return _c
}
func (_c *MockSegment_LoadDeltaData2_Call) Return(_a0 error) *MockSegment_LoadDeltaData2_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockSegment_LoadDeltaData2_Call) RunAndReturn(run func(context.Context, *schemapb.CollectionSchema) error) *MockSegment_LoadDeltaData2_Call {
_c.Call.Return(run)
return _c
}
// LoadInfo provides a mock function with given fields:
func (_m *MockSegment) LoadInfo() *querypb.SegmentLoadInfo {
ret := _m.Called()

View File

@ -161,6 +161,10 @@ func (s *baseSegment) Level() datapb.SegmentLevel {
return s.loadInfo.Load().GetLevel()
}
func (s *baseSegment) IsSorted() bool {
return s.loadInfo.Load().GetIsSorted()
}
func (s *baseSegment) StartPosition() *msgpb.MsgPosition {
return s.loadInfo.Load().GetStartPosition()
}

View File

@ -53,6 +53,7 @@ type Segment interface {
StartPosition() *msgpb.MsgPosition
Type() SegmentType
Level() datapb.SegmentLevel
IsSorted() bool
LoadInfo() *querypb.SegmentLoadInfo
// PinIfNotReleased the segment to prevent it from being released
PinIfNotReleased() error

View File

@ -1181,6 +1181,7 @@ func (node *QueryNode) GetDataDistribution(ctx context.Context, req *querypb.Get
Channel: s.Shard().VirtualName(),
Version: s.Version(),
Level: s.Level(),
IsSorted: s.IsSorted(),
LastDeltaTimestamp: s.LastDeltaTimestamp(),
IndexInfo: lo.SliceToMap(s.Indexes(), func(info *segments.IndexedFieldInfo) (int64, *querypb.FieldIndexInfo) {
return info.IndexInfo.FieldID, info.IndexInfo

View File

@ -51,6 +51,7 @@ var (
}, []string{
segmentStateLabelName,
segmentLevelLabelName,
segmentIsSortedLabelName,
})
// DataCoordCollectionNum records the num of collections managed by DataCoord.

View File

@ -99,6 +99,7 @@ const (
segmentStateLabelName = "segment_state"
segmentIDLabelName = "segment_id"
segmentLevelLabelName = "segment_level"
segmentIsSortedLabelName = "segment_is_sorted"
usernameLabelName = "username"
roleNameLabelName = "role_name"
cacheNameLabelName = "cache_name"