From ecb2b242e2859114428c9b1605bfe9891dd346e8 Mon Sep 17 00:00:00 2001 From: "cai.zhang" Date: Mon, 30 Sep 2024 10:01:16 +0800 Subject: [PATCH] enhance: Add sorted for segment info (#36469) issue: #33744 Signed-off-by: Cai Zhang --- go.mod | 2 +- go.sum | 2 + internal/datacoord/meta.go | 66 ++++++++------ internal/datacoord/meta_test.go | 2 +- internal/datacoord/services.go | 1 + internal/datacoord/util.go | 7 ++ internal/proto/query_coord.proto | 4 +- internal/proxy/impl.go | 2 + internal/querycoordv2/dist/dist_handler.go | 1 + internal/querycoordv2/utils/types.go | 1 + internal/querynodev2/segments/mock_segment.go | 86 +++++++++---------- internal/querynodev2/segments/segment.go | 4 + .../querynodev2/segments/segment_interface.go | 1 + internal/querynodev2/services.go | 1 + pkg/metrics/datacoord_metrics.go | 1 + pkg/metrics/metrics.go | 1 + 16 files changed, 107 insertions(+), 75 deletions(-) diff --git a/go.mod b/go.mod index fe7154ec2b..c9eacfcb45 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,7 @@ require ( github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 github.com/klauspost/compress v1.17.7 github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d - github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240909041258-8f8ca67816cd + github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240923125106-ef9b8fd69497 github.com/minio/minio-go/v7 v7.0.61 github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81 github.com/prometheus/client_golang v1.14.0 diff --git a/go.sum b/go.sum index ef6c2cab13..b68189bb4e 100644 --- a/go.sum +++ b/go.sum @@ -602,6 +602,8 @@ github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240909041258-8f8ca67816cd github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240909041258-8f8ca67816cd/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= github.com/milvus-io/pulsar-client-go v0.11.1 h1:5jiRP5j93CrgqcC20XVn68DX27htZdhedP1NyoIwkVg= github.com/milvus-io/pulsar-client-go v0.11.1/go.mod h1:cipLojlpUzs3i3cDNrK2MdOVs4HWPD7MQsAoOUqWcec= +github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240923125106-ef9b8fd69497 h1:t4sQMbSy05p8qgMGvEGyLYYLoZ9fD1dushS1bj5X6+0= +github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240923125106-ef9b8fd69497/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs= github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod h1:mC1jAcsrzbxHt8iiaC+zU4b1ylILSosueou12R++wfY= github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8DFdX7uMikMLXX4oubIzJF4kv/wI= diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index b113fb82bf..f0eb451f48 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -121,9 +121,9 @@ func newChannelCps() *channelCPs { // A local cache of segment metric update. Must call commit() to take effect. type segMetricMutation struct { - stateChange map[string]map[string]int // segment state, seg level -> state change count (to increase or decrease). - rowCountChange int64 // Change in # of rows. - rowCountAccChange int64 // Total # of historical added rows, accumulated. + stateChange map[string]map[string]map[string]int // segment state, seg level -> state -> isSorted change count (to increase or decrease). + rowCountChange int64 // Change in # of rows. + rowCountAccChange int64 // Total # of historical added rows, accumulated. } type collectionInfo struct { @@ -197,7 +197,7 @@ func (m *meta) reloadFromKV() error { for _, segment := range segments { // segments from catalog.ListSegments will not have logPath m.segments.SetSegment(segment.ID, NewSegmentInfo(segment)) - metrics.DataCoordNumSegments.WithLabelValues(segment.GetState().String(), segment.GetLevel().String()).Inc() + metrics.DataCoordNumSegments.WithLabelValues(segment.GetState().String(), segment.GetLevel().String(), getSortStatus(segment.GetIsSorted())).Inc() if segment.State == commonpb.SegmentState_Flushed { numStoredRows += segment.NumOfRows @@ -511,7 +511,7 @@ func (m *meta) AddSegment(ctx context.Context, segment *SegmentInfo) error { } m.segments.SetSegment(segment.GetID(), segment) - metrics.DataCoordNumSegments.WithLabelValues(segment.GetState().String(), segment.GetLevel().String()).Inc() + metrics.DataCoordNumSegments.WithLabelValues(segment.GetState().String(), segment.GetLevel().String(), getSortStatus(segment.GetIsSorted())).Inc() log.Info("meta update: adding segment - complete", zap.Int64("segmentID", segment.GetID())) return nil } @@ -533,7 +533,7 @@ func (m *meta) DropSegment(segmentID UniqueID) error { zap.Error(err)) return err } - metrics.DataCoordNumSegments.WithLabelValues(segment.GetState().String(), segment.GetLevel().String()).Dec() + metrics.DataCoordNumSegments.WithLabelValues(segment.GetState().String(), segment.GetLevel().String(), getSortStatus(segment.GetIsSorted())).Dec() coll, ok := m.collections[segment.CollectionID] if ok { metrics.CleanupDataCoordSegmentMetrics(coll.DatabaseName, segment.CollectionID, segment.ID) @@ -637,7 +637,7 @@ func (m *meta) SetState(segmentID UniqueID, targetState commonpb.SegmentState) e // Persist segment updates first. clonedSegment := curSegInfo.Clone() metricMutation := &segMetricMutation{ - stateChange: make(map[string]map[string]int), + stateChange: make(map[string]map[string]map[string]int), } if clonedSegment != nil && isSegmentHealthy(clonedSegment) { // Update segment state and prepare segment metric update. @@ -748,7 +748,7 @@ func CreateL0Operator(collectionID, partitionID, segmentID int64, channel string Level: datapb.SegmentLevel_L0, }, } - modPack.metricMutation.addNewSeg(commonpb.SegmentState_Flushed, datapb.SegmentLevel_L0, 0) + modPack.metricMutation.addNewSeg(commonpb.SegmentState_Flushed, datapb.SegmentLevel_L0, false, 0) } return true } @@ -1016,7 +1016,7 @@ func (m *meta) UpdateSegmentsInfo(operators ...UpdateOperator) error { segments: make(map[int64]*SegmentInfo), increments: make(map[int64]metastore.BinlogsIncrement), metricMutation: &segMetricMutation{ - stateChange: make(map[string]map[string]int), + stateChange: make(map[string]map[string]map[string]int), }, } @@ -1055,7 +1055,7 @@ func (m *meta) UpdateDropChannelSegmentInfo(channel string, segments []*SegmentI // Prepare segment metric mutation. metricMutation := &segMetricMutation{ - stateChange: make(map[string]map[string]int), + stateChange: make(map[string]map[string]map[string]int), } modSegments := make(map[UniqueID]*SegmentInfo) // save new segments flushed from buffer data @@ -1096,7 +1096,7 @@ func (m *meta) UpdateDropChannelSegmentInfo(channel string, segments []*SegmentI // mergeDropSegment merges drop segment information with meta segments func (m *meta) mergeDropSegment(seg2Drop *SegmentInfo) (*SegmentInfo, *segMetricMutation) { metricMutation := &segMetricMutation{ - stateChange: make(map[string]map[string]int), + stateChange: make(map[string]map[string]map[string]int), } segment := m.segments.GetSegment(seg2Drop.ID) @@ -1412,7 +1412,7 @@ func (m *meta) completeClusterCompactionMutation(t *datapb.CompactionTask, resul zap.Int64("partitionID", t.PartitionID), zap.String("channel", t.GetChannel())) - metricMutation := &segMetricMutation{stateChange: make(map[string]map[string]int)} + metricMutation := &segMetricMutation{stateChange: make(map[string]map[string]map[string]int)} compactFromSegIDs := make([]int64, 0) compactToSegIDs := make([]int64, 0) compactFromSegInfos := make([]*SegmentInfo, 0) @@ -1460,7 +1460,7 @@ func (m *meta) completeClusterCompactionMutation(t *datapb.CompactionTask, resul segment := NewSegmentInfo(segmentInfo) compactToSegInfos = append(compactToSegInfos, segment) compactToSegIDs = append(compactToSegIDs, segment.GetID()) - metricMutation.addNewSeg(segment.GetState(), segment.GetLevel(), segment.GetNumOfRows()) + metricMutation.addNewSeg(segment.GetState(), segment.GetLevel(), segment.GetIsSorted(), segment.GetNumOfRows()) } log = log.With(zap.Int64s("compact from", compactFromSegIDs), zap.Int64s("compact to", compactToSegIDs)) @@ -1504,7 +1504,7 @@ func (m *meta) completeMixCompactionMutation(t *datapb.CompactionTask, result *d zap.Int64("partitionID", t.PartitionID), zap.String("channel", t.GetChannel())) - metricMutation := &segMetricMutation{stateChange: make(map[string]map[string]int)} + metricMutation := &segMetricMutation{stateChange: make(map[string]map[string]map[string]int)} var compactFromSegIDs []int64 var compactFromSegInfos []*SegmentInfo for _, segmentID := range t.GetInputSegments() { @@ -1558,7 +1558,7 @@ func (m *meta) completeMixCompactionMutation(t *datapb.CompactionTask, result *d // L1 segment with NumRows=0 will be discarded, so no need to change the metric if compactToSegmentInfo.GetNumOfRows() > 0 { // metrics mutation for compactTo segments - metricMutation.addNewSeg(compactToSegmentInfo.GetState(), compactToSegmentInfo.GetLevel(), compactToSegmentInfo.GetNumOfRows()) + metricMutation.addNewSeg(compactToSegmentInfo.GetState(), compactToSegmentInfo.GetLevel(), compactToSegmentInfo.GetIsSorted(), compactToSegmentInfo.GetNumOfRows()) } else { compactToSegmentInfo.State = commonpb.SegmentState_Dropped } @@ -1812,11 +1812,15 @@ func (m *meta) GetEarliestStartPositionOfGrowingSegments(label *CompactionGroupL } // addNewSeg update metrics update for a new segment. -func (s *segMetricMutation) addNewSeg(state commonpb.SegmentState, level datapb.SegmentLevel, rowCount int64) { +func (s *segMetricMutation) addNewSeg(state commonpb.SegmentState, level datapb.SegmentLevel, isSorted bool, rowCount int64) { if _, ok := s.stateChange[level.String()]; !ok { - s.stateChange[level.String()] = make(map[string]int) + s.stateChange[level.String()] = make(map[string]map[string]int) } - s.stateChange[level.String()][state.String()] += 1 + if _, ok := s.stateChange[level.String()][state.String()]; !ok { + s.stateChange[level.String()][state.String()] = make(map[string]int) + + } + s.stateChange[level.String()][state.String()][getSortStatus(isSorted)] += 1 s.rowCountChange += rowCount s.rowCountAccChange += rowCount @@ -1826,20 +1830,28 @@ func (s *segMetricMutation) addNewSeg(state commonpb.SegmentState, level datapb. // has persisted in Etcd. func (s *segMetricMutation) commit() { for level, submap := range s.stateChange { - for state, change := range submap { - metrics.DataCoordNumSegments.WithLabelValues(state, level).Add(float64(change)) + for state, sortedMap := range submap { + for sortedLabel, change := range sortedMap { + metrics.DataCoordNumSegments.WithLabelValues(state, level, sortedLabel).Add(float64(change)) + } } } } // append updates current segMetricMutation when segment state change happens. -func (s *segMetricMutation) append(oldState, newState commonpb.SegmentState, level datapb.SegmentLevel, rowCountUpdate int64) { +func (s *segMetricMutation) append(oldState, newState commonpb.SegmentState, level datapb.SegmentLevel, isSorted bool, rowCountUpdate int64) { if oldState != newState { if _, ok := s.stateChange[level.String()]; !ok { - s.stateChange[level.String()] = make(map[string]int) + s.stateChange[level.String()] = make(map[string]map[string]int) } - s.stateChange[level.String()][oldState.String()] -= 1 - s.stateChange[level.String()][newState.String()] += 1 + if _, ok := s.stateChange[level.String()][oldState.String()]; !ok { + s.stateChange[level.String()][oldState.String()] = make(map[string]int) + } + if _, ok := s.stateChange[level.String()][newState.String()]; !ok { + s.stateChange[level.String()][newState.String()] = make(map[string]int) + } + s.stateChange[level.String()][oldState.String()][getSortStatus(isSorted)] -= 1 + s.stateChange[level.String()][newState.String()][getSortStatus(isSorted)] += 1 } // Update # of rows on new flush operations and drop operations. if isFlushState(newState) && !isFlushState(oldState) { @@ -1863,7 +1875,7 @@ func updateSegStateAndPrepareMetrics(segToUpdate *SegmentInfo, targetState commo zap.String("old state", segToUpdate.GetState().String()), zap.String("new state", targetState.String()), zap.Int64("# of rows", segToUpdate.GetNumOfRows())) - metricMutation.append(segToUpdate.GetState(), targetState, segToUpdate.GetLevel(), segToUpdate.GetNumOfRows()) + metricMutation.append(segToUpdate.GetState(), targetState, segToUpdate.GetLevel(), segToUpdate.GetIsSorted(), segToUpdate.GetNumOfRows()) segToUpdate.State = targetState if targetState == commonpb.SegmentState_Dropped { segToUpdate.DroppedAt = uint64(time.Now().UnixNano()) @@ -1957,7 +1969,7 @@ func (m *meta) SaveStatsResultSegment(oldSegmentID int64, result *workerpb.Stats zap.Int64("old segmentID", oldSegmentID), zap.Int64("target segmentID", result.GetSegmentID())) - metricMutation := &segMetricMutation{stateChange: make(map[string]map[string]int)} + metricMutation := &segMetricMutation{stateChange: make(map[string]map[string]map[string]int)} oldSegment := m.segments.GetSegment(oldSegmentID) if oldSegment == nil { @@ -1994,7 +2006,7 @@ func (m *meta) SaveStatsResultSegment(oldSegmentID int64, result *workerpb.Stats } segment := NewSegmentInfo(segmentInfo) if segment.GetNumOfRows() > 0 { - metricMutation.addNewSeg(segment.GetState(), segment.GetLevel(), segment.GetNumOfRows()) + metricMutation.addNewSeg(segment.GetState(), segment.GetLevel(), segment.GetIsSorted(), segment.GetNumOfRows()) } else { segment.State = commonpb.SegmentState_Dropped } diff --git a/internal/datacoord/meta_test.go b/internal/datacoord/meta_test.go index e89221eb18..8492173123 100644 --- a/internal/datacoord/meta_test.go +++ b/internal/datacoord/meta_test.go @@ -124,7 +124,7 @@ func (suite *MetaReloadSuite) TestReloadFromKV() { _, err := newMeta(ctx, suite.catalog, nil) suite.NoError(err) - suite.MetricsEqual(metrics.DataCoordNumSegments.WithLabelValues(metrics.FlushedSegmentLabel, datapb.SegmentLevel_Legacy.String()), 1) + suite.MetricsEqual(metrics.DataCoordNumSegments.WithLabelValues(metrics.FlushedSegmentLabel, datapb.SegmentLevel_Legacy.String(), "unsorted"), 1) }) } diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index da0bdaca1b..b8552b3d42 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -912,6 +912,7 @@ func (s *Server) GetRecoveryInfoV2(ctx context.Context, req *datapb.GetRecoveryI InsertChannel: segment.InsertChannel, NumOfRows: rowCount, Level: segment.GetLevel(), + IsSorted: segment.GetIsSorted(), }) } diff --git a/internal/datacoord/util.go b/internal/datacoord/util.go index de7ce5a571..1712a9c5d6 100644 --- a/internal/datacoord/util.go +++ b/internal/datacoord/util.go @@ -349,3 +349,10 @@ func createStorageConfig() *indexpb.StorageConfig { return storageConfig } + +func getSortStatus(sorted bool) string { + if sorted { + return "sorted" + } + return "unsorted" +} diff --git a/internal/proto/query_coord.proto b/internal/proto/query_coord.proto index 51220841b2..93e5f09719 100644 --- a/internal/proto/query_coord.proto +++ b/internal/proto/query_coord.proto @@ -557,6 +557,7 @@ message SegmentInfo { bool enable_index = 16; bool is_fake = 17; data.SegmentLevel level = 18; + bool is_sorted = 19; } message CollectionInfo { @@ -634,7 +635,8 @@ message SegmentVersionInfo { int64 version = 5; uint64 last_delta_timestamp = 6; map index_info = 7; - data.SegmentLevel level = 8; + data.SegmentLevel level = 8; + bool is_sorted = 9; } message ChannelVersionInfo { diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go index 71579c6889..1969848742 100644 --- a/internal/proxy/impl.go +++ b/internal/proxy/impl.go @@ -4111,6 +4111,7 @@ func (node *Proxy) GetPersistentSegmentInfo(ctx context.Context, req *milvuspb.G NumRows: info.NumOfRows, State: info.State, Level: commonpb.SegmentLevel(info.Level), + IsSorted: info.GetIsSorted(), } } metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, @@ -4184,6 +4185,7 @@ func (node *Proxy) GetQuerySegmentInfo(ctx context.Context, req *milvuspb.GetQue State: info.SegmentState, NodeIds: info.NodeIds, Level: commonpb.SegmentLevel(info.Level), + IsSorted: info.GetIsSorted(), } } diff --git a/internal/querycoordv2/dist/dist_handler.go b/internal/querycoordv2/dist/dist_handler.go index c06ed5328f..57b2b44ee7 100644 --- a/internal/querycoordv2/dist/dist_handler.go +++ b/internal/querycoordv2/dist/dist_handler.go @@ -146,6 +146,7 @@ func (dh *distHandler) updateSegmentsDistribution(resp *querypb.GetDataDistribut PartitionID: s.GetPartition(), InsertChannel: s.GetChannel(), Level: s.GetLevel(), + IsSorted: s.GetIsSorted(), } } updates = append(updates, &meta.Segment{ diff --git a/internal/querycoordv2/utils/types.go b/internal/querycoordv2/utils/types.go index f77e5cca69..697ee46bc5 100644 --- a/internal/querycoordv2/utils/types.go +++ b/internal/querycoordv2/utils/types.go @@ -45,6 +45,7 @@ func MergeMetaSegmentIntoSegmentInfo(info *querypb.SegmentInfo, segments ...*met SegmentState: commonpb.SegmentState_Sealed, IndexInfos: make([]*querypb.FieldIndexInfo, 0), Level: first.Level, + IsSorted: first.GetIsSorted(), } for _, indexInfo := range first.IndexInfo { info.IndexName = indexInfo.IndexName diff --git a/internal/querynodev2/segments/mock_segment.go b/internal/querynodev2/segments/mock_segment.go index 3f470d2206..a0d3df97ba 100644 --- a/internal/querynodev2/segments/mock_segment.go +++ b/internal/querynodev2/segments/mock_segment.go @@ -17,8 +17,6 @@ import ( querypb "github.com/milvus-io/milvus/internal/proto/querypb" - schemapb "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" - segcorepb "github.com/milvus-io/milvus/internal/proto/segcorepb" storage "github.com/milvus-io/milvus/internal/storage" @@ -589,6 +587,47 @@ func (_c *MockSegment_IsLazyLoad_Call) RunAndReturn(run func() bool) *MockSegmen return _c } +// IsSorted provides a mock function with given fields: +func (_m *MockSegment) IsSorted() bool { + ret := _m.Called() + + var r0 bool + if rf, ok := ret.Get(0).(func() bool); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// MockSegment_IsSorted_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IsSorted' +type MockSegment_IsSorted_Call struct { + *mock.Call +} + +// IsSorted is a helper method to define mock.On call +func (_e *MockSegment_Expecter) IsSorted() *MockSegment_IsSorted_Call { + return &MockSegment_IsSorted_Call{Call: _e.mock.On("IsSorted")} +} + +func (_c *MockSegment_IsSorted_Call) Run(run func()) *MockSegment_IsSorted_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockSegment_IsSorted_Call) Return(_a0 bool) *MockSegment_IsSorted_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockSegment_IsSorted_Call) RunAndReturn(run func() bool) *MockSegment_IsSorted_Call { + _c.Call.Return(run) + return _c +} + // LastDeltaTimestamp provides a mock function with given fields: func (_m *MockSegment) LastDeltaTimestamp() uint64 { ret := _m.Called() @@ -714,49 +753,6 @@ func (_c *MockSegment_LoadDeltaData_Call) RunAndReturn(run func(context.Context, return _c } -// LoadDeltaData2 provides a mock function with given fields: ctx, schema -func (_m *MockSegment) LoadDeltaData2(ctx context.Context, schema *schemapb.CollectionSchema) error { - ret := _m.Called(ctx, schema) - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, *schemapb.CollectionSchema) error); ok { - r0 = rf(ctx, schema) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// MockSegment_LoadDeltaData2_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'LoadDeltaData2' -type MockSegment_LoadDeltaData2_Call struct { - *mock.Call -} - -// LoadDeltaData2 is a helper method to define mock.On call -// - ctx context.Context -// - schema *schemapb.CollectionSchema -func (_e *MockSegment_Expecter) LoadDeltaData2(ctx interface{}, schema interface{}) *MockSegment_LoadDeltaData2_Call { - return &MockSegment_LoadDeltaData2_Call{Call: _e.mock.On("LoadDeltaData2", ctx, schema)} -} - -func (_c *MockSegment_LoadDeltaData2_Call) Run(run func(ctx context.Context, schema *schemapb.CollectionSchema)) *MockSegment_LoadDeltaData2_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(*schemapb.CollectionSchema)) - }) - return _c -} - -func (_c *MockSegment_LoadDeltaData2_Call) Return(_a0 error) *MockSegment_LoadDeltaData2_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *MockSegment_LoadDeltaData2_Call) RunAndReturn(run func(context.Context, *schemapb.CollectionSchema) error) *MockSegment_LoadDeltaData2_Call { - _c.Call.Return(run) - return _c -} - // LoadInfo provides a mock function with given fields: func (_m *MockSegment) LoadInfo() *querypb.SegmentLoadInfo { ret := _m.Called() diff --git a/internal/querynodev2/segments/segment.go b/internal/querynodev2/segments/segment.go index 8d0c3d7ab8..d5cedc7fa4 100644 --- a/internal/querynodev2/segments/segment.go +++ b/internal/querynodev2/segments/segment.go @@ -161,6 +161,10 @@ func (s *baseSegment) Level() datapb.SegmentLevel { return s.loadInfo.Load().GetLevel() } +func (s *baseSegment) IsSorted() bool { + return s.loadInfo.Load().GetIsSorted() +} + func (s *baseSegment) StartPosition() *msgpb.MsgPosition { return s.loadInfo.Load().GetStartPosition() } diff --git a/internal/querynodev2/segments/segment_interface.go b/internal/querynodev2/segments/segment_interface.go index 164395b206..9ad7ef219a 100644 --- a/internal/querynodev2/segments/segment_interface.go +++ b/internal/querynodev2/segments/segment_interface.go @@ -53,6 +53,7 @@ type Segment interface { StartPosition() *msgpb.MsgPosition Type() SegmentType Level() datapb.SegmentLevel + IsSorted() bool LoadInfo() *querypb.SegmentLoadInfo // PinIfNotReleased the segment to prevent it from being released PinIfNotReleased() error diff --git a/internal/querynodev2/services.go b/internal/querynodev2/services.go index 8631ee94a8..883a42ecb8 100644 --- a/internal/querynodev2/services.go +++ b/internal/querynodev2/services.go @@ -1181,6 +1181,7 @@ func (node *QueryNode) GetDataDistribution(ctx context.Context, req *querypb.Get Channel: s.Shard().VirtualName(), Version: s.Version(), Level: s.Level(), + IsSorted: s.IsSorted(), LastDeltaTimestamp: s.LastDeltaTimestamp(), IndexInfo: lo.SliceToMap(s.Indexes(), func(info *segments.IndexedFieldInfo) (int64, *querypb.FieldIndexInfo) { return info.IndexInfo.FieldID, info.IndexInfo diff --git a/pkg/metrics/datacoord_metrics.go b/pkg/metrics/datacoord_metrics.go index 274256b353..cc79d8991a 100644 --- a/pkg/metrics/datacoord_metrics.go +++ b/pkg/metrics/datacoord_metrics.go @@ -51,6 +51,7 @@ var ( }, []string{ segmentStateLabelName, segmentLevelLabelName, + segmentIsSortedLabelName, }) // DataCoordCollectionNum records the num of collections managed by DataCoord. diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index 50d5c23778..dacdd709b4 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -99,6 +99,7 @@ const ( segmentStateLabelName = "segment_state" segmentIDLabelName = "segment_id" segmentLevelLabelName = "segment_level" + segmentIsSortedLabelName = "segment_is_sorted" usernameLabelName = "username" roleNameLabelName = "role_name" cacheNameLabelName = "cache_name"