diff --git a/internal/datacoord/index_meta.go b/internal/datacoord/index_meta.go index 0a33c18764..8853f05cb3 100644 --- a/internal/datacoord/index_meta.go +++ b/internal/datacoord/index_meta.go @@ -42,9 +42,11 @@ import ( "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" + "github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/indexparams" "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/timerecord" + "github.com/milvus-io/milvus/pkg/util/tsoutil" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -1068,3 +1070,72 @@ func (m *indexMeta) TaskStatsJSON() string { } return string(ret) } + +func (m *indexMeta) GetIndexJSON(collectionID int64) string { + m.RLock() + defer m.RUnlock() + + var indexMetrics []*metricsinfo.Index + for collID, indexes := range m.indexes { + for _, index := range indexes { + if collectionID == 0 || collID == collectionID { + im := &metricsinfo.Index{ + CollectionID: collID, + IndexID: index.IndexID, + FieldID: index.FieldID, + Name: index.IndexName, + IsDeleted: index.IsDeleted, + CreateTime: tsoutil.PhysicalTimeFormat(index.CreateTime), + IndexParams: funcutil.KeyValuePair2Map(index.IndexParams), + IsAutoIndex: index.IsAutoIndex, + UserIndexParams: funcutil.KeyValuePair2Map(index.UserIndexParams), + } + indexMetrics = append(indexMetrics, im) + } + } + } + + ret, err := json.Marshal(indexMetrics) + if err != nil { + return "" + } + return string(ret) +} + +func (m *indexMeta) GetSegmentIndexedFields(collectionID UniqueID, segmentID UniqueID) (bool, []*metricsinfo.IndexedField) { + m.RLock() + defer m.RUnlock() + fieldIndexes, ok := m.indexes[collectionID] + if !ok { + // the segment should be unindexed status if the collection has no indexes + return false, []*metricsinfo.IndexedField{} + } + + // the segment should be unindexed status if the segment indexes is not found + segIndexInfos, ok := m.segmentIndexes[segmentID] + if !ok || len(segIndexInfos) == 0 { + return false, []*metricsinfo.IndexedField{} + } + + isIndexed := true + var segmentIndexes []*metricsinfo.IndexedField + for _, index := range fieldIndexes { + if si, ok := segIndexInfos[index.IndexID]; !index.IsDeleted { + buildID := int64(-1) + if !ok { + // the segment should be unindexed status if the segment index is not found within field indexes + isIndexed = false + } else { + buildID = si.BuildID + } + + segmentIndexes = append(segmentIndexes, &metricsinfo.IndexedField{ + IndexFieldID: index.IndexID, + IndexID: index.IndexID, + BuildID: buildID, + IndexSize: int64(si.IndexSize), + }) + } + } + return isIndexed, segmentIndexes +} diff --git a/internal/datacoord/index_meta_test.go b/internal/datacoord/index_meta_test.go index 991100a240..d56a55315e 100644 --- a/internal/datacoord/index_meta_test.go +++ b/internal/datacoord/index_meta_test.go @@ -1567,3 +1567,113 @@ func TestBuildIndexTaskStatsJSON(t *testing.T) { im.segmentBuildInfo.Remove(si1.BuildID) assert.Equal(t, 1, len(im.segmentBuildInfo.List())) } + +func TestMeta_GetIndexJSON(t *testing.T) { + m := &indexMeta{ + indexes: map[UniqueID]map[UniqueID]*model.Index{ + 1: { + 1: &model.Index{ + CollectionID: 1, + FieldID: 1, + IndexID: 1, + IndexName: "index1", + IsDeleted: false, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "param1", + Value: "value1", + }, + }, + IndexParams: []*commonpb.KeyValuePair{ + { + Key: "param1", + Value: "value1", + }, + }, + IsAutoIndex: true, + UserIndexParams: []*commonpb.KeyValuePair{ + { + Key: "param1", + Value: "value1", + }, + }, + }, + }, + }, + } + + actualJSON := m.GetIndexJSON(0) + var actualIndex []*metricsinfo.Index + err := json.Unmarshal([]byte(actualJSON), &actualIndex) + assert.NoError(t, err) + assert.Equal(t, int64(1), actualIndex[0].CollectionID) + assert.Equal(t, int64(1), actualIndex[0].FieldID) + assert.Equal(t, int64(1), actualIndex[0].IndexID) + assert.Equal(t, map[string]string{"param1": "value1"}, actualIndex[0].IndexParams) + assert.Equal(t, map[string]string{"param1": "value1"}, actualIndex[0].UserIndexParams) +} + +func TestMeta_GetSegmentIndexStatus(t *testing.T) { + var ( + collID = UniqueID(1) + partID = UniqueID(2) + indexID = UniqueID(10) + fieldID = UniqueID(100) + segID = UniqueID(1000) + buildID = UniqueID(10000) + ) + + m := &indexMeta{} + m.indexes = map[UniqueID]map[UniqueID]*model.Index{ + collID: { + indexID: { + CollectionID: collID, + FieldID: fieldID, + IndexID: indexID, + IndexName: "test_index", + IsDeleted: false, + }, + }, + } + m.segmentIndexes = map[UniqueID]map[UniqueID]*model.SegmentIndex{ + segID: { + indexID: { + SegmentID: segID, + CollectionID: collID, + PartitionID: partID, + NumRows: 10250, + IndexID: indexID, + BuildID: buildID, + NodeID: 1, + IndexVersion: 0, + IndexState: commonpb.IndexState_Finished, + FailReason: "", + IsDeleted: false, + CreatedUTCTime: 12, + IndexFileKeys: nil, + IndexSize: 0, + }, + }, + segID + 1: {}, + } + + t.Run("index exists", func(t *testing.T) { + isIndexed, segmentIndexes := m.GetSegmentIndexedFields(collID, segID) + assert.True(t, isIndexed) + assert.Len(t, segmentIndexes, 1) + assert.Equal(t, indexID, segmentIndexes[0].IndexID) + assert.Equal(t, buildID, segmentIndexes[0].BuildID) + }) + + t.Run("index does not exist", func(t *testing.T) { + isIndexed, segmentIndexes := m.GetSegmentIndexedFields(collID+1, segID) + assert.False(t, isIndexed) + assert.Empty(t, segmentIndexes) + }) + + t.Run("segment does not exist", func(t *testing.T) { + isIndexed, segmentIndexes := m.GetSegmentIndexedFields(collID, segID+1) + assert.False(t, isIndexed) + assert.Empty(t, segmentIndexes) + }) +} diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index 6384792f6c..d554f07aa5 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -2046,26 +2046,28 @@ func (m *meta) SaveStatsResultSegment(oldSegmentID int64, result *workerpb.Stats return metricMutation, nil } -func (m *meta) getSegmentsMetrics() []*metricsinfo.Segment { +func (m *meta) getSegmentsMetrics(collectionID int64) []*metricsinfo.Segment { m.RLock() defer m.RUnlock() segments := make([]*metricsinfo.Segment, 0, len(m.segments.segments)) for _, s := range m.segments.segments { - segments = append(segments, &metricsinfo.Segment{ - SegmentID: s.ID, - CollectionID: s.CollectionID, - PartitionID: s.PartitionID, - Channel: s.InsertChannel, - NumOfRows: s.NumOfRows, - State: s.State.String(), - MemSize: s.size.Load(), - Level: s.Level.String(), - IsImporting: s.IsImporting, - Compacted: s.Compacted, - IsSorted: s.IsSorted, - NodeID: paramtable.GetNodeID(), - }) + if collectionID <= 0 || s.GetCollectionID() == collectionID { + segments = append(segments, &metricsinfo.Segment{ + SegmentID: s.ID, + CollectionID: s.CollectionID, + PartitionID: s.PartitionID, + Channel: s.InsertChannel, + NumOfRows: s.NumOfRows, + State: s.State.String(), + MemSize: s.size.Load(), + Level: s.Level.String(), + IsImporting: s.IsImporting, + Compacted: s.Compacted, + IsSorted: s.IsSorted, + NodeID: paramtable.GetNodeID(), + }) + } } return segments diff --git a/internal/datacoord/meta_test.go b/internal/datacoord/meta_test.go index ec8b7d8329..7ef0593d47 100644 --- a/internal/datacoord/meta_test.go +++ b/internal/datacoord/meta_test.go @@ -1375,7 +1375,7 @@ func TestMeta_GetSegmentsJSON(t *testing.T) { }, } - segments := m.getSegmentsMetrics() + segments := m.getSegmentsMetrics(0) // Check the length of the segments assert.Equal(t, 2, len(segments)) diff --git a/internal/datacoord/metrics_info.go b/internal/datacoord/metrics_info.go index 31e090cdb8..9e83e7d289 100644 --- a/internal/datacoord/metrics_info.go +++ b/internal/datacoord/metrics_info.go @@ -18,9 +18,11 @@ package datacoord import ( "context" + "fmt" "sync" "github.com/cockroachdb/errors" + "github.com/tidwall/gjson" "go.uber.org/zap" "golang.org/x/sync/errgroup" @@ -132,8 +134,45 @@ func mergeChannels(dnChannels []*metricsinfo.Channel, dcChannels map[int64]map[s return mergedChannels } +func (s *Server) getSegmentsJSON(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) { + v := jsonReq.Get(metricsinfo.MetricRequestParamINKey) + if !v.Exists() { + // default to get all segments from dataanode + return s.getDataNodeSegmentsJSON(ctx, req) + } + + in := v.String() + if in == "dn" { + // TODO: support filter by collection id + return s.getDataNodeSegmentsJSON(ctx, req) + } + + if in == "dc" { + v = jsonReq.Get(metricsinfo.MetricRequestParamCollectionIDKey) + collectionID := int64(0) + if v.Exists() { + collectionID = v.Int() + } + + segments := s.meta.getSegmentsMetrics(collectionID) + for _, seg := range segments { + isIndexed, indexedFields := s.meta.indexMeta.GetSegmentIndexedFields(seg.CollectionID, seg.SegmentID) + seg.IndexedFields = indexedFields + seg.IsIndexed = isIndexed + } + + bs, err := json.Marshal(segments) + if err != nil { + log.Warn("marshal segment value failed", zap.Int64("collectionID", collectionID), zap.String("err", err.Error())) + return "", nil + } + return string(bs), nil + } + return "", fmt.Errorf("invalid param value in=[%s], it should be dc or dn", in) +} + func (s *Server) getDistJSON(ctx context.Context, req *milvuspb.GetMetricsRequest) string { - segments := s.meta.getSegmentsMetrics() + segments := s.meta.getSegmentsMetrics(-1) var channels []*metricsinfo.DmChannel for nodeID, ch := range s.channelManager.GetChannelWatchInfos() { for _, chInfo := range ch { @@ -158,7 +197,7 @@ func (s *Server) getDistJSON(ctx context.Context, req *milvuspb.GetMetricsReques return string(bs) } -func (s *Server) getSegmentsJSON(ctx context.Context, req *milvuspb.GetMetricsRequest) (string, error) { +func (s *Server) getDataNodeSegmentsJSON(ctx context.Context, req *milvuspb.GetMetricsRequest) (string, error) { ret, err := getMetrics[*metricsinfo.Segment](s, ctx, req) return metricsinfo.MarshalGetMetricsValues(ret, err) } diff --git a/internal/datacoord/metrics_info_test.go b/internal/datacoord/metrics_info_test.go index 514a70b3e6..c5d7c5250c 100644 --- a/internal/datacoord/metrics_info_test.go +++ b/internal/datacoord/metrics_info_test.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/errors" "github.com/stretchr/testify/assert" + "github.com/tidwall/gjson" "google.golang.org/grpc" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" @@ -29,6 +30,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus/internal/datacoord/session" "github.com/milvus-io/milvus/internal/json" + "github.com/milvus-io/milvus/internal/metastore/model" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/pkg/util/merr" @@ -371,7 +373,7 @@ func TestGetSegmentsJSON(t *testing.T) { mockCluster.EXPECT().GetSessions().Return([]*session.Session{session.NewSession(&session.NodeInfo{NodeID: 1}, dataNodeCreator)}) svr.cluster = mockCluster - actualJSON, err := svr.getSegmentsJSON(ctx, req) + actualJSON, err := svr.getDataNodeSegmentsJSON(ctx, req) assert.NoError(t, err) assert.Equal(t, expectedJSON, actualJSON) }) @@ -394,7 +396,7 @@ func TestGetSegmentsJSON(t *testing.T) { mockCluster.EXPECT().GetSessions().Return([]*session.Session{session.NewSession(&session.NodeInfo{NodeID: 1}, dataNodeCreator)}) svr.cluster = mockCluster - actualJSON, err := svr.getSegmentsJSON(ctx, req) + actualJSON, err := svr.getDataNodeSegmentsJSON(ctx, req) assert.Error(t, err) assert.Equal(t, "", actualJSON) }) @@ -422,7 +424,7 @@ func TestGetSegmentsJSON(t *testing.T) { mockCluster.EXPECT().GetSessions().Return([]*session.Session{session.NewSession(&session.NodeInfo{NodeID: 1}, dataNodeCreator)}) svr.cluster = mockCluster - actualJSON, err := svr.getSegmentsJSON(ctx, req) + actualJSON, err := svr.getDataNodeSegmentsJSON(ctx, req) assert.Error(t, err) assert.Equal(t, "", actualJSON) }) @@ -451,7 +453,7 @@ func TestGetSegmentsJSON(t *testing.T) { svr.cluster = mockCluster expectedJSON := "null" - actualJSON, err := svr.getSegmentsJSON(ctx, req) + actualJSON, err := svr.getDataNodeSegmentsJSON(ctx, req) assert.NoError(t, err) assert.Equal(t, expectedJSON, actualJSON) }) @@ -684,3 +686,115 @@ func TestGetDistJSON(t *testing.T) { assert.Equal(t, expectedJSON, actualJSON) }) } + +func TestServer_getSegmentsJSON(t *testing.T) { + s := &Server{ + meta: &meta{ + segments: &SegmentsInfo{ + segments: map[int64]*SegmentInfo{ + 1: { + SegmentInfo: &datapb.SegmentInfo{ + ID: 1, + CollectionID: 1, + PartitionID: 2, + InsertChannel: "channel1", + }, + }, + }, + }, + indexMeta: &indexMeta{ + segmentIndexes: map[UniqueID]map[UniqueID]*model.SegmentIndex{ + 1000: { + 10: &model.SegmentIndex{ + SegmentID: 1000, + CollectionID: 1, + PartitionID: 2, + NumRows: 10250, + IndexID: 10, + BuildID: 10000, + NodeID: 1, + IndexVersion: 0, + IndexState: commonpb.IndexState_Finished, + FailReason: "", + IsDeleted: false, + CreatedUTCTime: 12, + IndexFileKeys: nil, + IndexSize: 0, + }, + }, + }, + indexes: map[UniqueID]map[UniqueID]*model.Index{ + 1: { + 10: &model.Index{ + CollectionID: 1, + FieldID: 100, + IndexID: 10, + IndexName: "test_index", + IsDeleted: false, + }, + }, + }, + }, + }, + } + + ctx := context.TODO() + req := &milvuspb.GetMetricsRequest{} + t.Run("valid request in dc", func(t *testing.T) { + jsonReq := gjson.Parse(`{"in": "dc", "collection_id": 1}`) + result, err := s.getSegmentsJSON(ctx, req, jsonReq) + assert.NoError(t, err) + assert.NotEmpty(t, result) + }) + + t.Run("invalid request", func(t *testing.T) { + jsonReq := gjson.Parse(`{"in": "invalid"}`) + result, err := s.getSegmentsJSON(ctx, req, jsonReq) + assert.Error(t, err) + assert.Empty(t, result) + }) + + t.Run("vaild request in dn", func(t *testing.T) { + segments := []*metricsinfo.Segment{ + { + SegmentID: 1, + CollectionID: 100, + PartitionID: 10, + NumOfRows: 1000, + State: "Flushed", + }, + } + segmentsBytes, err := json.Marshal(segments) + assert.NoError(t, err) + expectedJSON := string(segmentsBytes) + + mockResp := &milvuspb.GetMetricsResponse{ + Status: merr.Success(), + Response: expectedJSON, + } + + mockClient := &mockMetricDataNodeClient{ + mock: func() (*milvuspb.GetMetricsResponse, error) { + return mockResp, nil + }, + } + + dataNodeCreator := func(ctx context.Context, addr string, nodeID int64) (types.DataNodeClient, error) { + return mockClient, nil + } + + mockCluster := NewMockCluster(t) + mockCluster.EXPECT().GetSessions().Return([]*session.Session{session.NewSession(&session.NodeInfo{NodeID: 1}, dataNodeCreator)}) + s.cluster = mockCluster + + jsonReq := gjson.Parse(`{"in": "dn"}`) + result, err := s.getSegmentsJSON(ctx, req, jsonReq) + assert.NoError(t, err) + assert.NotEmpty(t, result) + + jsonReq = gjson.Parse(`{}`) + result, err = s.getSegmentsJSON(ctx, req, jsonReq) + assert.NoError(t, err) + assert.NotEmpty(t, result) + }) +} diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index c735681951..e1ccd37c70 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -1145,41 +1145,50 @@ func (s *Server) registerMetricsRequest() { return s.getSystemInfoMetrics(ctx, req) }) - s.metricsRequest.RegisterMetricsRequest(metricsinfo.DataDist, + s.metricsRequest.RegisterMetricsRequest(metricsinfo.DistKey, func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) { return s.getDistJSON(ctx, req), nil }) - s.metricsRequest.RegisterMetricsRequest(metricsinfo.ImportTasks, + s.metricsRequest.RegisterMetricsRequest(metricsinfo.ImportTaskKey, func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) { return s.importMeta.TaskStatsJSON(), nil }) - s.metricsRequest.RegisterMetricsRequest(metricsinfo.CompactionTasks, + s.metricsRequest.RegisterMetricsRequest(metricsinfo.CompactionTaskKey, func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) { return s.meta.compactionTaskMeta.TaskStatsJSON(), nil }) - s.metricsRequest.RegisterMetricsRequest(metricsinfo.BuildIndexTasks, + s.metricsRequest.RegisterMetricsRequest(metricsinfo.BuildIndexTaskKey, func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) { return s.meta.indexMeta.TaskStatsJSON(), nil }) - s.metricsRequest.RegisterMetricsRequest(metricsinfo.SyncTasks, + s.metricsRequest.RegisterMetricsRequest(metricsinfo.SyncTaskKey, func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) { return s.getSyncTaskJSON(ctx, req) }) - s.metricsRequest.RegisterMetricsRequest(metricsinfo.DataSegments, + s.metricsRequest.RegisterMetricsRequest(metricsinfo.SegmentKey, func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) { - return s.getSegmentsJSON(ctx, req) + return s.getSegmentsJSON(ctx, req, jsonReq) }) - s.metricsRequest.RegisterMetricsRequest(metricsinfo.DataChannels, + s.metricsRequest.RegisterMetricsRequest(metricsinfo.ChannelKey, func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) { return s.getChannelsJSON(ctx, req) }) + s.metricsRequest.RegisterMetricsRequest(metricsinfo.IndexKey, + func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) { + v := jsonReq.Get(metricsinfo.MetricRequestParamCollectionIDKey) + collectionID := int64(0) + if v.Exists() { + collectionID = v.Int() + } + return s.meta.indexMeta.GetIndexJSON(collectionID), nil + }) log.Info("register metrics actions finished") } diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index bbf62f484d..df4d671e46 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -285,17 +285,17 @@ func (node *DataNode) registerMetricsRequest() { return node.getSystemInfoMetrics(ctx, req) }) - node.metricsRequest.RegisterMetricsRequest(metricsinfo.SyncTasks, + node.metricsRequest.RegisterMetricsRequest(metricsinfo.SyncTaskKey, func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) { return node.syncMgr.TaskStatsJSON(), nil }) - node.metricsRequest.RegisterMetricsRequest(metricsinfo.DataSegments, + node.metricsRequest.RegisterMetricsRequest(metricsinfo.SegmentKey, func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) { return node.flowgraphManager.GetSegmentsJSON(), nil }) - node.metricsRequest.RegisterMetricsRequest(metricsinfo.DataChannels, + node.metricsRequest.RegisterMetricsRequest(metricsinfo.ChannelKey, func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) { return node.flowgraphManager.GetChannelsJSON(), nil }) diff --git a/internal/http/router.go b/internal/http/router.go index 4ce447d69c..f457115183 100644 --- a/internal/http/router.go +++ b/internal/http/router.go @@ -88,6 +88,8 @@ const ( QCResourceGroupPath = "/_qc/resource_group" // QCAllTasksPath is the path to get all tasks in QueryCoord. QCAllTasksPath = "/_qc/tasks" + // QCSegmentsPath is the path to get segments in QueryCoord. + QCSegmentsPath = "/_qc/segments" // QNSegmentsPath is the path to get segments in QueryNode. QNSegmentsPath = "/_qn/segments" @@ -102,6 +104,8 @@ const ( DCCompactionTasksPath = "/_dc/tasks/compaction" // DCBuildIndexTasksPath is the path to get build index tasks in DataCoord. DCBuildIndexTasksPath = "/_dc/tasks/build_index" + // DCSegmentsPath is the path to get segments in DataCoord. + DCSegmentsPath = "/_dc/segments" // DNSyncTasksPath is the path to get sync tasks in DataNode. DNSyncTasksPath = "/_dn/tasks/sync" @@ -119,4 +123,7 @@ const ( CollectionListPath = "/_collection/list" // CollectionDescPath is the path to get collection description. CollectionDescPath = "/_collection/desc" + + // IndexListPath is the path to get all indexes. + IndexListPath = "/_index/list" ) diff --git a/internal/proxy/http_req_impl.go b/internal/proxy/http_req_impl.go index d986b9f061..375aefe001 100644 --- a/internal/proxy/http_req_impl.go +++ b/internal/proxy/http_req_impl.go @@ -210,7 +210,9 @@ func getDataComponentMetrics(node *Proxy, metricsType string) gin.HandlerFunc { // The Get request should be used to get the query parameters, not the body, such as Javascript // fetch API only support GET request with query parameter. -func listCollection(rootCoord types.RootCoordClient, queryCoord types.QueryCoordClient) gin.HandlerFunc { +func listCollection(node *Proxy) gin.HandlerFunc { + rootCoord := node.rootCoord + queryCoord := node.queryCoord return func(c *gin.Context) { dbName := c.Query(httpDBName) if len(dbName) == 0 { @@ -290,7 +292,8 @@ func listCollection(rootCoord types.RootCoordClient, queryCoord types.QueryCoord } } -func describeCollection(node types.ProxyComponent, rootCoord types.RootCoordClient) gin.HandlerFunc { +func describeCollection(node *Proxy) gin.HandlerFunc { + rootCoord := node.rootCoord return func(c *gin.Context) { dbName := c.Query(httpDBName) collectionName := c.Query(HTTPCollectionName) @@ -304,7 +307,7 @@ func describeCollection(node types.ProxyComponent, rootCoord types.RootCoordClie return } - describeCollectionResp, err := node.DescribeCollection(c, &milvuspb.DescribeCollectionRequest{ + describeCollectionResp, err := rootCoord.DescribeCollection(c, &milvuspb.DescribeCollectionRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_DescribeCollection, }, diff --git a/internal/proxy/http_req_impl_test.go b/internal/proxy/http_req_impl_test.go index 531951b7ff..a81199a6e5 100644 --- a/internal/proxy/http_req_impl_test.go +++ b/internal/proxy/http_req_impl_test.go @@ -178,7 +178,8 @@ func TestListCollection(t *testing.T) { QueryServiceAvailable: []bool{true, true}, }, nil) - handler := listCollection(mockRoortCoordClient, mockQueryCoordClient) + proxy := &Proxy{queryCoord: mockQueryCoordClient, rootCoord: mockRoortCoordClient} + handler := listCollection(proxy) handler(c) assert.Equal(t, http.StatusOK, w.Code) @@ -194,7 +195,8 @@ func TestListCollection(t *testing.T) { mockRoortCoordClient := mocks.NewMockRootCoordClient(t) mockRoortCoordClient.EXPECT().ShowCollections(mock.Anything, mock.Anything).Return(nil, errors.New("error")) - handler := listCollection(mockRoortCoordClient, nil) + proxy := &Proxy{rootCoord: mockRoortCoordClient} + handler := listCollection(proxy) handler(c) assert.Equal(t, http.StatusInternalServerError, w.Code) assert.Contains(t, w.Body.String(), "error") @@ -212,7 +214,8 @@ func TestListCollection(t *testing.T) { mockQueryCoordClient := mocks.NewMockQueryCoordClient(t) mockQueryCoordClient.EXPECT().ShowCollections(mock.Anything, mock.Anything).Return(nil, errors.New("error")) - handler := listCollection(mockRoortCoordClient, mockQueryCoordClient) + proxy := &Proxy{queryCoord: mockQueryCoordClient, rootCoord: mockRoortCoordClient} + handler := listCollection(proxy) handler(c) assert.Equal(t, http.StatusInternalServerError, w.Code) assert.Contains(t, w.Body.String(), "error") @@ -225,9 +228,8 @@ func TestDescribeCollection(t *testing.T) { c, _ := gin.CreateTestContext(w) c.Request, _ = http.NewRequest("GET", "/?db_name=default&collection_name=collection1", nil) - mockProxy := mocks.NewMockProxy(t) mockRootCoord := mocks.NewMockRootCoordClient(t) - mockProxy.EXPECT().DescribeCollection(mock.Anything, mock.Anything).Return(&milvuspb.DescribeCollectionResponse{ + mockRootCoord.EXPECT().DescribeCollection(mock.Anything, mock.Anything).Return(&milvuspb.DescribeCollectionResponse{ Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, CollectionID: 1, CollectionName: "collection1", @@ -255,7 +257,8 @@ func TestDescribeCollection(t *testing.T) { Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, }, nil) - handler := describeCollection(mockProxy, mockRootCoord) + proxy := &Proxy{rootCoord: mockRootCoord} + handler := describeCollection(proxy) handler(c) assert.Equal(t, http.StatusOK, w.Code) @@ -268,11 +271,11 @@ func TestDescribeCollection(t *testing.T) { c, _ := gin.CreateTestContext(w) c.Request, _ = http.NewRequest("GET", "/?db_name=default&collection_name=collection1", nil) - mockProxy := mocks.NewMockProxy(t) mockRootCoord := mocks.NewMockRootCoordClient(t) - mockProxy.EXPECT().DescribeCollection(mock.Anything, mock.Anything).Return(nil, errors.New("error")) + mockRootCoord.EXPECT().DescribeCollection(mock.Anything, mock.Anything).Return(nil, errors.New("error")) - handler := describeCollection(mockProxy, mockRootCoord) + proxy := &Proxy{rootCoord: mockRootCoord} + handler := describeCollection(proxy) handler(c) assert.Equal(t, http.StatusInternalServerError, w.Code) @@ -284,10 +287,9 @@ func TestDescribeCollection(t *testing.T) { c, _ := gin.CreateTestContext(w) c.Request, _ = http.NewRequest("GET", "/?db_name=default", nil) - mockProxy := mocks.NewMockProxy(t) mockRootCoord := mocks.NewMockRootCoordClient(t) - - handler := describeCollection(mockProxy, mockRootCoord) + proxy := &Proxy{rootCoord: mockRootCoord} + handler := describeCollection(proxy) handler(c) assert.Equal(t, http.StatusBadRequest, w.Code) diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go index f889b517af..3a1ab22c10 100644 --- a/internal/proxy/impl.go +++ b/internal/proxy/impl.go @@ -6617,34 +6617,37 @@ func (node *Proxy) RegisterRestRouter(router gin.IRouter) { router.GET(http.SlowQueryPath, getSlowQuery(node)) // QueryCoord requests that are forwarded from proxy - router.GET(http.QCTargetPath, getQueryComponentMetrics(node, metricsinfo.QueryTarget)) - router.GET(http.QCDistPath, getQueryComponentMetrics(node, metricsinfo.QueryDist)) - router.GET(http.QCReplicaPath, getQueryComponentMetrics(node, metricsinfo.QueryReplicas)) - router.GET(http.QCResourceGroupPath, getQueryComponentMetrics(node, metricsinfo.QueryResourceGroups)) - router.GET(http.QCAllTasksPath, getQueryComponentMetrics(node, metricsinfo.QueryCoordAllTasks)) + router.GET(http.QCTargetPath, getQueryComponentMetrics(node, metricsinfo.TargetKey)) + router.GET(http.QCDistPath, getQueryComponentMetrics(node, metricsinfo.DistKey)) + router.GET(http.QCReplicaPath, getQueryComponentMetrics(node, metricsinfo.ReplicaKey)) + router.GET(http.QCResourceGroupPath, getQueryComponentMetrics(node, metricsinfo.ResourceGroupKey)) + router.GET(http.QCAllTasksPath, getQueryComponentMetrics(node, metricsinfo.AllTaskKey)) + router.GET(http.QCSegmentsPath, getQueryComponentMetrics(node, metricsinfo.SegmentKey)) // QueryNode requests that are forwarded from querycoord - router.GET(http.QNSegmentsPath, getQueryComponentMetrics(node, metricsinfo.QuerySegments)) - router.GET(http.QNChannelsPath, getQueryComponentMetrics(node, metricsinfo.QueryChannels)) + router.GET(http.QNSegmentsPath, getQueryComponentMetrics(node, metricsinfo.SegmentKey)) + router.GET(http.QNChannelsPath, getQueryComponentMetrics(node, metricsinfo.ChannelKey)) // DataCoord requests that are forwarded from proxy - router.GET(http.DCDistPath, getDataComponentMetrics(node, metricsinfo.DataDist)) - router.GET(http.DCCompactionTasksPath, getDataComponentMetrics(node, metricsinfo.CompactionTasks)) - router.GET(http.DCImportTasksPath, getDataComponentMetrics(node, metricsinfo.ImportTasks)) - router.GET(http.DCBuildIndexTasksPath, getDataComponentMetrics(node, metricsinfo.BuildIndexTasks)) + router.GET(http.DCDistPath, getDataComponentMetrics(node, metricsinfo.DistKey)) + router.GET(http.DCCompactionTasksPath, getDataComponentMetrics(node, metricsinfo.CompactionTaskKey)) + router.GET(http.DCImportTasksPath, getDataComponentMetrics(node, metricsinfo.ImportTaskKey)) + router.GET(http.DCBuildIndexTasksPath, getDataComponentMetrics(node, metricsinfo.BuildIndexTaskKey)) + router.GET(http.IndexListPath, getDataComponentMetrics(node, metricsinfo.IndexKey)) + router.GET(http.DCSegmentsPath, getDataComponentMetrics(node, metricsinfo.SegmentKey)) // Datanode requests that are forwarded from datacoord - router.GET(http.DNSyncTasksPath, getDataComponentMetrics(node, metricsinfo.SyncTasks)) - router.GET(http.DNSegmentsPath, getDataComponentMetrics(node, metricsinfo.DataSegments)) - router.GET(http.DNChannelsPath, getDataComponentMetrics(node, metricsinfo.DataChannels)) + router.GET(http.DNSyncTasksPath, getDataComponentMetrics(node, metricsinfo.SyncTaskKey)) + router.GET(http.DNSegmentsPath, getDataComponentMetrics(node, metricsinfo.SegmentKey)) + router.GET(http.DNChannelsPath, getDataComponentMetrics(node, metricsinfo.ChannelKey)) // Database requests router.GET(http.DatabaseListPath, listDatabase(node)) router.GET(http.DatabaseDescPath, describeDatabase(node)) // Collection requests - router.GET(http.CollectionListPath, listCollection(node.rootCoord, node.queryCoord)) - router.GET(http.CollectionDescPath, describeCollection(node, node.rootCoord)) + router.GET(http.CollectionListPath, listCollection(node)) + router.GET(http.CollectionDescPath, describeCollection(node)) } func (node *Proxy) CreatePrivilegeGroup(ctx context.Context, req *milvuspb.CreatePrivilegeGroupRequest) (*commonpb.Status, error) { diff --git a/internal/querycoordv2/handlers.go b/internal/querycoordv2/handlers.go index b1d5aac1cf..7c69574980 100644 --- a/internal/querycoordv2/handlers.go +++ b/internal/querycoordv2/handlers.go @@ -18,11 +18,13 @@ package querycoordv2 import ( "context" + "fmt" "sync" "time" "github.com/cockroachdb/errors" "github.com/samber/lo" + "github.com/tidwall/gjson" "go.uber.org/zap" "golang.org/x/sync/errgroup" @@ -291,6 +293,36 @@ func (s *Server) getSegmentsFromQueryNode(ctx context.Context, req *milvuspb.Get return metricsinfo.MarshalGetMetricsValues(segments, err) } +func (s *Server) getSegmentsJSON(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) { + v := jsonReq.Get(metricsinfo.MetricRequestParamINKey) + if !v.Exists() { + // default to get all segments from dataanode + return s.getSegmentsFromQueryNode(ctx, req) + } + + in := v.String() + if in == "qn" { + // TODO: support filter by collection id + return s.getSegmentsFromQueryNode(ctx, req) + } + + if in == "qc" { + v = jsonReq.Get(metricsinfo.MetricRequestParamCollectionIDKey) + collectionID := int64(0) + if v.Exists() { + collectionID = v.Int() + } + filteredSegments := s.dist.SegmentDistManager.GetSegmentDist(collectionID) + bs, err := json.Marshal(filteredSegments) + if err != nil { + log.Warn("marshal segment value failed", zap.Int64("collectionID", collectionID), zap.String("err", err.Error())) + return "", nil + } + return string(bs), nil + } + return "", fmt.Errorf("invalid param value in=[%s], it should be qc or qn", in) +} + // TODO(dragondriver): add more detail metrics func (s *Server) getSystemInfoMetrics( ctx context.Context, diff --git a/internal/querycoordv2/handlers_test.go b/internal/querycoordv2/handlers_test.go index 1ac7d65467..e465c81ea4 100644 --- a/internal/querycoordv2/handlers_test.go +++ b/internal/querycoordv2/handlers_test.go @@ -21,9 +21,12 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "github.com/tidwall/gjson" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus/internal/json" + "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/querycoordv2/meta" "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/pkg/util/metricsinfo" ) @@ -106,3 +109,74 @@ func TestGetSegmentsFromQueryNode(t *testing.T) { assert.NoError(t, err) assert.Equal(t, expectedSegments, actualSegments) } + +func TestServer_getSegmentsJSON(t *testing.T) { + mockCluster := session.NewMockCluster(t) + nodeManager := session.NewNodeManager() + nodeManager.Add(session.NewNodeInfo(session.ImmutableNodeInfo{NodeID: 1})) + server := &Server{cluster: mockCluster, nodeMgr: nodeManager} + expectedSegments := []*metricsinfo.Segment{ + { + SegmentID: 1, + PartitionID: 1, + Channel: "channel1", + ResourceGroup: "default", + MemSize: int64(1024), + LoadedInsertRowCount: 100, + }, + { + SegmentID: 2, + PartitionID: 1, + Channel: "channel2", + ResourceGroup: "default", + MemSize: int64(1024), + LoadedInsertRowCount: 200, + }, + } + resp := &milvuspb.GetMetricsResponse{ + Response: func() string { + data, _ := json.Marshal(expectedSegments) + return string(data) + }(), + } + req := &milvuspb.GetMetricsRequest{} + mockCluster.EXPECT().GetMetrics(mock.Anything, mock.Anything, req).Return(resp, nil) + + server.dist = meta.NewDistributionManager() + server.dist.SegmentDistManager.Update(1, meta.SegmentFromInfo(&datapb.SegmentInfo{ + ID: 1, + CollectionID: 1, + PartitionID: 1, + InsertChannel: "dmc0", + })) + + ctx := context.TODO() + + t.Run("valid request in dc", func(t *testing.T) { + jsonReq := gjson.Parse(`{"in": "qc", "collection_id": 1}`) + result, err := server.getSegmentsJSON(ctx, req, jsonReq) + assert.NoError(t, err) + assert.NotEmpty(t, result) + }) + + t.Run("invalid request", func(t *testing.T) { + jsonReq := gjson.Parse(`{"in": "invalid"}`) + result, err := server.getSegmentsJSON(ctx, req, jsonReq) + assert.Error(t, err) + assert.Empty(t, result) + }) + + t.Run("valid request in qn", func(t *testing.T) { + jsonReq := gjson.Parse(`{"in": "qn"}`) + result, err := server.getSegmentsJSON(ctx, req, jsonReq) + assert.NoError(t, err) + assert.NotEmpty(t, result) + }) + + t.Run("valid request in qc", func(t *testing.T) { + jsonReq := gjson.Parse(`{"in": "qc", "collection_id": 1}`) + result, err := server.getSegmentsJSON(ctx, req, jsonReq) + assert.NoError(t, err) + assert.NotEmpty(t, result) + }) +} diff --git a/internal/querycoordv2/meta/dist_manager.go b/internal/querycoordv2/meta/dist_manager.go index 9c6239948b..05275f2135 100644 --- a/internal/querycoordv2/meta/dist_manager.go +++ b/internal/querycoordv2/meta/dist_manager.go @@ -43,7 +43,7 @@ func NewDistributionManager() *DistributionManager { // If there are no segments, channels, or leader views, it returns an empty string. // In case of an error during JSON marshaling, it returns the error. func (dm *DistributionManager) GetDistributionJSON() string { - segments := dm.GetSegmentDist() + segments := dm.GetSegmentDist(0) channels := dm.GetChannelDist() leaderView := dm.GetLeaderView() diff --git a/internal/querycoordv2/meta/leader_view_manager.go b/internal/querycoordv2/meta/leader_view_manager.go index 19d0915164..c0cfca82b4 100644 --- a/internal/querycoordv2/meta/leader_view_manager.go +++ b/internal/querycoordv2/meta/leader_view_manager.go @@ -351,7 +351,10 @@ func (mgr *LeaderViewManager) GetLeaderView() []*metricsinfo.LeaderView { } for _, seg := range lv.GrowingSegments { - leaderView.GrowingSegments = append(leaderView.GrowingSegments, newSegmentMetricsFrom(seg)) + leaderView.GrowingSegments = append(leaderView.GrowingSegments, &metricsinfo.Segment{ + SegmentID: seg.ID, + NodeID: seg.Node, + }) } leaderViews = append(leaderViews, leaderView) diff --git a/internal/querycoordv2/meta/segment_dist_manager.go b/internal/querycoordv2/meta/segment_dist_manager.go index ea858e1a6e..85519b7770 100644 --- a/internal/querycoordv2/meta/segment_dist_manager.go +++ b/internal/querycoordv2/meta/segment_dist_manager.go @@ -137,12 +137,13 @@ func newSegmentMetricsFrom(segment *Segment) *metricsinfo.Segment { convertedSegment := metrics.NewSegmentFrom(segment.SegmentInfo) convertedSegment.NodeID = segment.Node convertedSegment.LoadedTimestamp = tsoutil.PhysicalTimeFormat(segment.LastDeltaTimestamp) - convertedSegment.Index = lo.Map(lo.Values(segment.IndexInfo), func(e *querypb.FieldIndexInfo, i int) *metricsinfo.SegmentIndex { - return &metricsinfo.SegmentIndex{ + convertedSegment.IndexedFields = lo.Map(lo.Values(segment.IndexInfo), func(e *querypb.FieldIndexInfo, i int) *metricsinfo.IndexedField { + return &metricsinfo.IndexedField{ IndexFieldID: e.FieldID, IndexID: e.IndexID, BuildID: e.BuildID, IndexSize: e.IndexSize, + IsLoaded: true, } }) return convertedSegment @@ -246,14 +247,16 @@ func (m *SegmentDistManager) GetByFilter(filters ...SegmentDistFilter) []*Segmen return ret } -func (m *SegmentDistManager) GetSegmentDist() []*metricsinfo.Segment { +func (m *SegmentDistManager) GetSegmentDist(collectionID int64) []*metricsinfo.Segment { m.rwmutex.RLock() defer m.rwmutex.RUnlock() var segments []*metricsinfo.Segment for _, nodeSeg := range m.segments { for _, segment := range nodeSeg.segments { - segments = append(segments, newSegmentMetricsFrom(segment)) + if collectionID == 0 || segment.GetCollectionID() == collectionID { + segments = append(segments, newSegmentMetricsFrom(segment)) + } } } diff --git a/internal/querycoordv2/meta/segment_dist_manager_test.go b/internal/querycoordv2/meta/segment_dist_manager_test.go index 3f049ba7ad..43e75041e1 100644 --- a/internal/querycoordv2/meta/segment_dist_manager_test.go +++ b/internal/querycoordv2/meta/segment_dist_manager_test.go @@ -222,7 +222,7 @@ func TestGetSegmentDistJSON(t *testing.T) { manager.Update(1, segment1) manager.Update(2, segment2) - segments := manager.GetSegmentDist() + segments := manager.GetSegmentDist(0) assert.Equal(t, 2, len(segments)) checkResults := func(s *metricsinfo.Segment) { diff --git a/internal/querycoordv2/server.go b/internal/querycoordv2/server.go index 4704848773..43f0264448 100644 --- a/internal/querycoordv2/server.go +++ b/internal/querycoordv2/server.go @@ -221,7 +221,7 @@ func (s *Server) registerMetricsRequest() { } QuerySegmentsAction := func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) { - return s.getSegmentsFromQueryNode(ctx, req) + return s.getSegmentsJSON(ctx, req, jsonReq) } QueryChannelsAction := func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) { @@ -230,15 +230,15 @@ func (s *Server) registerMetricsRequest() { // register actions that requests are processed in querycoord s.metricsRequest.RegisterMetricsRequest(metricsinfo.SystemInfoMetrics, getSystemInfoAction) - s.metricsRequest.RegisterMetricsRequest(metricsinfo.QueryCoordAllTasks, QueryTasksAction) - s.metricsRequest.RegisterMetricsRequest(metricsinfo.QueryDist, QueryDistAction) - s.metricsRequest.RegisterMetricsRequest(metricsinfo.QueryTarget, QueryTargetAction) - s.metricsRequest.RegisterMetricsRequest(metricsinfo.QueryReplicas, QueryReplicasAction) - s.metricsRequest.RegisterMetricsRequest(metricsinfo.QueryResourceGroups, QueryResourceGroupsAction) + s.metricsRequest.RegisterMetricsRequest(metricsinfo.AllTaskKey, QueryTasksAction) + s.metricsRequest.RegisterMetricsRequest(metricsinfo.DistKey, QueryDistAction) + s.metricsRequest.RegisterMetricsRequest(metricsinfo.TargetKey, QueryTargetAction) + s.metricsRequest.RegisterMetricsRequest(metricsinfo.ReplicaKey, QueryReplicasAction) + s.metricsRequest.RegisterMetricsRequest(metricsinfo.ResourceGroupKey, QueryResourceGroupsAction) // register actions that requests are processed in querynode - s.metricsRequest.RegisterMetricsRequest(metricsinfo.QuerySegments, QuerySegmentsAction) - s.metricsRequest.RegisterMetricsRequest(metricsinfo.QueryChannels, QueryChannelsAction) + s.metricsRequest.RegisterMetricsRequest(metricsinfo.SegmentKey, QuerySegmentsAction) + s.metricsRequest.RegisterMetricsRequest(metricsinfo.ChannelKey, QueryChannelsAction) log.Info("register metrics actions finished") } diff --git a/internal/querynodev2/metrics_info.go b/internal/querynodev2/metrics_info.go index 04b514568a..e2ca77f512 100644 --- a/internal/querynodev2/metrics_info.go +++ b/internal/querynodev2/metrics_info.go @@ -189,9 +189,9 @@ func getSegmentJSON(node *QueryNode) string { allSegments := node.manager.Segment.GetBy() var ms []*metricsinfo.Segment for _, s := range allSegments { - indexes := make([]*metricsinfo.SegmentIndex, 0, len(s.Indexes())) + indexes := make([]*metricsinfo.IndexedField, 0, len(s.Indexes())) for _, index := range s.Indexes() { - indexes = append(indexes, &metricsinfo.SegmentIndex{ + indexes = append(indexes, &metricsinfo.IndexedField{ IndexFieldID: index.IndexInfo.FieldID, IndexID: index.IndexInfo.IndexID, IndexSize: index.IndexInfo.IndexSize, @@ -205,7 +205,7 @@ func getSegmentJSON(node *QueryNode) string { CollectionID: s.Collection(), PartitionID: s.Partition(), MemSize: s.MemSize(), - Index: indexes, + IndexedFields: indexes, State: s.Type().String(), ResourceGroup: s.ResourceGroup(), LoadedInsertRowCount: s.InsertCount(), diff --git a/internal/querynodev2/metrics_info_test.go b/internal/querynodev2/metrics_info_test.go index 447d548d34..be00650d57 100644 --- a/internal/querynodev2/metrics_info_test.go +++ b/internal/querynodev2/metrics_info_test.go @@ -120,12 +120,12 @@ func TestGetSegmentJSON(t *testing.T) { assert.Equal(t, int64(1001), segments[0].CollectionID) assert.Equal(t, int64(2001), segments[0].PartitionID) assert.Equal(t, int64(1024), segments[0].MemSize) - assert.Equal(t, 1, len(segments[0].Index)) - assert.Equal(t, int64(1), segments[0].Index[0].IndexFieldID) - assert.Equal(t, int64(101), segments[0].Index[0].IndexID) - assert.Equal(t, int64(512), segments[0].Index[0].IndexSize) - assert.Equal(t, int64(10001), segments[0].Index[0].BuildID) - assert.True(t, segments[0].Index[0].IsLoaded) + assert.Equal(t, 1, len(segments[0].IndexedFields)) + assert.Equal(t, int64(1), segments[0].IndexedFields[0].IndexFieldID) + assert.Equal(t, int64(101), segments[0].IndexedFields[0].IndexID) + assert.Equal(t, int64(512), segments[0].IndexedFields[0].IndexSize) + assert.Equal(t, int64(10001), segments[0].IndexedFields[0].BuildID) + assert.True(t, segments[0].IndexedFields[0].IsLoaded) assert.Equal(t, "Growing", segments[0].State) assert.Equal(t, "default", segments[0].ResourceGroup) assert.Equal(t, int64(100), segments[0].LoadedInsertRowCount) diff --git a/internal/querynodev2/server.go b/internal/querynodev2/server.go index f7ff0da91f..17fb05cba4 100644 --- a/internal/querynodev2/server.go +++ b/internal/querynodev2/server.go @@ -284,12 +284,12 @@ func (node *QueryNode) registerMetricsRequest() { return getSystemInfoMetrics(ctx, req, node) }) - node.metricsRequest.RegisterMetricsRequest(metricsinfo.QuerySegments, + node.metricsRequest.RegisterMetricsRequest(metricsinfo.SegmentKey, func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) { return getSegmentJSON(node), nil }) - node.metricsRequest.RegisterMetricsRequest(metricsinfo.QueryChannels, + node.metricsRequest.RegisterMetricsRequest(metricsinfo.ChannelKey, func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) { return getChannelJSON(node), nil }) diff --git a/pkg/util/metricsinfo/metric_request.go b/pkg/util/metricsinfo/metric_request.go index 212177bfde..02c0e5a6fe 100644 --- a/pkg/util/metricsinfo/metric_request.go +++ b/pkg/util/metricsinfo/metric_request.go @@ -44,54 +44,60 @@ const ( // MetricRequestParamsSeparator is a separator that parameter value will be joined be separator MetricRequestParamsSeparator = "," - // QuerySegmentDist request for segment distribution on the query node - QuerySegments = "qn_segments" + // SegmentKey request for get segments from the datanode/querynode/datacoord/querycoord + SegmentKey = "segments" - // QueryChannelDist request for channel distribution on the query node - QueryChannels = "qn_channels" + // ChannelKey request for get channels from the datanode/querynode/datacoord/querycoord + ChannelKey = "channels" - // QueryDist request for segment/channel/leader view distribution on querycoord - QueryDist = "qc_dist" + // DistKey request for segment/channel/leader view distribution on querycoord + // DistKey request for get segments on the datacoord + DistKey = "dist" - // QueryTarget request for segment/channel target on the querycoord - QueryTarget = "qc_target" + // TargetKey request for segment/channel target on the querycoord + TargetKey = "qc_target" - // QueryCoordAllTasks request for get tasks on the querycoord - QueryCoordAllTasks = "qc_tasks_all" + // AllTaskKey request for get all tasks on the querycoord + AllTaskKey = "tasks_all" - // QueryReplicas request for get replica on the querycoord - QueryReplicas = "qc_replica" + // ReplicaKey request for get replica on the querycoord + ReplicaKey = "replica" - // QueryResourceGroups request for get resource groups on the querycoord - QueryResourceGroups = "qc_resource_group" + // ResourceGroupKey request for get resource groups on the querycoord + ResourceGroupKey = "resource_group" - // DataDist request for get segments on the datacoord - DataDist = "dc_segments" + // ImportTaskKey request for get import tasks from the datacoord + ImportTaskKey = "import_tasks" - // ImportTasks request for get import tasks from the datacoord - ImportTasks = "dc_import_tasks" + // CompactionTaskKey request for get compaction tasks from the datacoord + CompactionTaskKey = "compaction_tasks" - // CompactionTasks request for get compaction tasks from the datacoord - CompactionTasks = "dc_compaction_tasks" + // BuildIndexTaskKey request for get building index tasks from the datacoord + BuildIndexTaskKey = "build_index_tasks" - // BuildIndexTasks request for get building index tasks from the datacoord - BuildIndexTasks = "dc_build_index_tasks" + // IndexKey request for get index list/detail from the datacoord + IndexKey = "index" - // SyncTasks request for get sync tasks from the datanode - SyncTasks = "dn_sync_tasks" - - // DataSegments request for get segments from the datanode - DataSegments = "dn_segments" - - // DataChannels request for get channels from the datanode - DataChannels = "dn_channels" + // SyncTaskKey request for get sync tasks from the datanode + SyncTaskKey = "sync_tasks" // MetricRequestParamVerboseKey as a request parameter decide to whether return verbose value MetricRequestParamVerboseKey = "verbose" MetricRequestParamTargetScopeKey = "target_scope" + + MetricRequestParamINKey = "in" + + MetricRequestParamCollectionIDKey = "collection_id" ) +var MetricRequestParamINValue = map[string]struct{}{ + "dc": {}, + "qc": {}, + "dn": {}, + "qn": {}, +} + type MetricsRequestAction func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) type MetricsRequest struct { diff --git a/pkg/util/metricsinfo/metrics_info.go b/pkg/util/metricsinfo/metrics_info.go index b321316885..dfa341ef8b 100644 --- a/pkg/util/metricsinfo/metrics_info.go +++ b/pkg/util/metricsinfo/metrics_info.go @@ -126,7 +126,7 @@ type Segment struct { // load related IsInvisible bool `json:"is_invisible,omitempty"` LoadedTimestamp string `json:"loaded_timestamp,omitempty,string"` - Index []*SegmentIndex `json:"index,omitempty"` + IndexedFields []*IndexedField `json:"index_fields,omitempty"` ResourceGroup string `json:"resource_group,omitempty"` LoadedInsertRowCount int64 `json:"loaded_insert_row_count,omitempty,string"` // inert row count for growing segment that excludes the deleted row count in QueryNode MemSize int64 `json:"mem_size,omitempty,string"` // memory size of segment in QueryNode @@ -135,9 +135,11 @@ type Segment struct { FlushedRows int64 `json:"flushed_rows,omitempty,string"` SyncBufferRows int64 `json:"sync_buffer_rows,omitempty,string"` SyncingRows int64 `json:"syncing_rows,omitempty,string"` + + IsIndexed bool `json:"is_indexed,omitempty"` // indicate whether the segment is indexed } -type SegmentIndex struct { +type IndexedField struct { IndexFieldID int64 `json:"field_id,omitempty,string"` IndexID int64 `json:"index_id,omitempty,string"` BuildID int64 `json:"build_id,omitempty,string"` @@ -463,3 +465,17 @@ type Databases struct { IDs []string `json:"db_ids,omitempty"` CreatedTimestamps []string `json:"created_timestamps,omitempty"` } + +type Index struct { + CollectionID int64 `json:"collection_id,omitempty,string"` + FieldID int64 `json:"field_id,omitempty,string"` + IndexID int64 `json:"index_id,omitempty,string"` + Name string `json:"name,omitempty"` + IsDeleted bool `json:"is_deleted"` + CreateTime string `json:"create_time,omitempty"` + IndexParams map[string]string `json:"index_params,omitempty"` + IsAutoIndex bool `json:"is_auto_index,omitempty"` + UserIndexParams map[string]string `json:"user_index_params"` + State string `json:"state,omitempty"` + IndexStateFailReason string `json:"index_state_fail_reason,omitempty"` +}