mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-11-29 10:28:41 +08:00
enhance: add list index and segment index retrieval API for WebUI (#37861)
issue: #36621 Signed-off-by: jaime <yun.zhang@zilliz.com>
This commit is contained in:
parent
7c5a8012cf
commit
7bbfe86bcd
@ -42,9 +42,11 @@ import (
|
||||
"github.com/milvus-io/milvus/pkg/common"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/metrics"
|
||||
"github.com/milvus-io/milvus/pkg/util/funcutil"
|
||||
"github.com/milvus-io/milvus/pkg/util/indexparams"
|
||||
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
|
||||
"github.com/milvus-io/milvus/pkg/util/timerecord"
|
||||
"github.com/milvus-io/milvus/pkg/util/tsoutil"
|
||||
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
||||
)
|
||||
|
||||
@ -1068,3 +1070,72 @@ func (m *indexMeta) TaskStatsJSON() string {
|
||||
}
|
||||
return string(ret)
|
||||
}
|
||||
|
||||
func (m *indexMeta) GetIndexJSON(collectionID int64) string {
|
||||
m.RLock()
|
||||
defer m.RUnlock()
|
||||
|
||||
var indexMetrics []*metricsinfo.Index
|
||||
for collID, indexes := range m.indexes {
|
||||
for _, index := range indexes {
|
||||
if collectionID == 0 || collID == collectionID {
|
||||
im := &metricsinfo.Index{
|
||||
CollectionID: collID,
|
||||
IndexID: index.IndexID,
|
||||
FieldID: index.FieldID,
|
||||
Name: index.IndexName,
|
||||
IsDeleted: index.IsDeleted,
|
||||
CreateTime: tsoutil.PhysicalTimeFormat(index.CreateTime),
|
||||
IndexParams: funcutil.KeyValuePair2Map(index.IndexParams),
|
||||
IsAutoIndex: index.IsAutoIndex,
|
||||
UserIndexParams: funcutil.KeyValuePair2Map(index.UserIndexParams),
|
||||
}
|
||||
indexMetrics = append(indexMetrics, im)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ret, err := json.Marshal(indexMetrics)
|
||||
if err != nil {
|
||||
return ""
|
||||
}
|
||||
return string(ret)
|
||||
}
|
||||
|
||||
func (m *indexMeta) GetSegmentIndexedFields(collectionID UniqueID, segmentID UniqueID) (bool, []*metricsinfo.IndexedField) {
|
||||
m.RLock()
|
||||
defer m.RUnlock()
|
||||
fieldIndexes, ok := m.indexes[collectionID]
|
||||
if !ok {
|
||||
// the segment should be unindexed status if the collection has no indexes
|
||||
return false, []*metricsinfo.IndexedField{}
|
||||
}
|
||||
|
||||
// the segment should be unindexed status if the segment indexes is not found
|
||||
segIndexInfos, ok := m.segmentIndexes[segmentID]
|
||||
if !ok || len(segIndexInfos) == 0 {
|
||||
return false, []*metricsinfo.IndexedField{}
|
||||
}
|
||||
|
||||
isIndexed := true
|
||||
var segmentIndexes []*metricsinfo.IndexedField
|
||||
for _, index := range fieldIndexes {
|
||||
if si, ok := segIndexInfos[index.IndexID]; !index.IsDeleted {
|
||||
buildID := int64(-1)
|
||||
if !ok {
|
||||
// the segment should be unindexed status if the segment index is not found within field indexes
|
||||
isIndexed = false
|
||||
} else {
|
||||
buildID = si.BuildID
|
||||
}
|
||||
|
||||
segmentIndexes = append(segmentIndexes, &metricsinfo.IndexedField{
|
||||
IndexFieldID: index.IndexID,
|
||||
IndexID: index.IndexID,
|
||||
BuildID: buildID,
|
||||
IndexSize: int64(si.IndexSize),
|
||||
})
|
||||
}
|
||||
}
|
||||
return isIndexed, segmentIndexes
|
||||
}
|
||||
|
@ -1567,3 +1567,113 @@ func TestBuildIndexTaskStatsJSON(t *testing.T) {
|
||||
im.segmentBuildInfo.Remove(si1.BuildID)
|
||||
assert.Equal(t, 1, len(im.segmentBuildInfo.List()))
|
||||
}
|
||||
|
||||
func TestMeta_GetIndexJSON(t *testing.T) {
|
||||
m := &indexMeta{
|
||||
indexes: map[UniqueID]map[UniqueID]*model.Index{
|
||||
1: {
|
||||
1: &model.Index{
|
||||
CollectionID: 1,
|
||||
FieldID: 1,
|
||||
IndexID: 1,
|
||||
IndexName: "index1",
|
||||
IsDeleted: false,
|
||||
TypeParams: []*commonpb.KeyValuePair{
|
||||
{
|
||||
Key: "param1",
|
||||
Value: "value1",
|
||||
},
|
||||
},
|
||||
IndexParams: []*commonpb.KeyValuePair{
|
||||
{
|
||||
Key: "param1",
|
||||
Value: "value1",
|
||||
},
|
||||
},
|
||||
IsAutoIndex: true,
|
||||
UserIndexParams: []*commonpb.KeyValuePair{
|
||||
{
|
||||
Key: "param1",
|
||||
Value: "value1",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
actualJSON := m.GetIndexJSON(0)
|
||||
var actualIndex []*metricsinfo.Index
|
||||
err := json.Unmarshal([]byte(actualJSON), &actualIndex)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, int64(1), actualIndex[0].CollectionID)
|
||||
assert.Equal(t, int64(1), actualIndex[0].FieldID)
|
||||
assert.Equal(t, int64(1), actualIndex[0].IndexID)
|
||||
assert.Equal(t, map[string]string{"param1": "value1"}, actualIndex[0].IndexParams)
|
||||
assert.Equal(t, map[string]string{"param1": "value1"}, actualIndex[0].UserIndexParams)
|
||||
}
|
||||
|
||||
func TestMeta_GetSegmentIndexStatus(t *testing.T) {
|
||||
var (
|
||||
collID = UniqueID(1)
|
||||
partID = UniqueID(2)
|
||||
indexID = UniqueID(10)
|
||||
fieldID = UniqueID(100)
|
||||
segID = UniqueID(1000)
|
||||
buildID = UniqueID(10000)
|
||||
)
|
||||
|
||||
m := &indexMeta{}
|
||||
m.indexes = map[UniqueID]map[UniqueID]*model.Index{
|
||||
collID: {
|
||||
indexID: {
|
||||
CollectionID: collID,
|
||||
FieldID: fieldID,
|
||||
IndexID: indexID,
|
||||
IndexName: "test_index",
|
||||
IsDeleted: false,
|
||||
},
|
||||
},
|
||||
}
|
||||
m.segmentIndexes = map[UniqueID]map[UniqueID]*model.SegmentIndex{
|
||||
segID: {
|
||||
indexID: {
|
||||
SegmentID: segID,
|
||||
CollectionID: collID,
|
||||
PartitionID: partID,
|
||||
NumRows: 10250,
|
||||
IndexID: indexID,
|
||||
BuildID: buildID,
|
||||
NodeID: 1,
|
||||
IndexVersion: 0,
|
||||
IndexState: commonpb.IndexState_Finished,
|
||||
FailReason: "",
|
||||
IsDeleted: false,
|
||||
CreatedUTCTime: 12,
|
||||
IndexFileKeys: nil,
|
||||
IndexSize: 0,
|
||||
},
|
||||
},
|
||||
segID + 1: {},
|
||||
}
|
||||
|
||||
t.Run("index exists", func(t *testing.T) {
|
||||
isIndexed, segmentIndexes := m.GetSegmentIndexedFields(collID, segID)
|
||||
assert.True(t, isIndexed)
|
||||
assert.Len(t, segmentIndexes, 1)
|
||||
assert.Equal(t, indexID, segmentIndexes[0].IndexID)
|
||||
assert.Equal(t, buildID, segmentIndexes[0].BuildID)
|
||||
})
|
||||
|
||||
t.Run("index does not exist", func(t *testing.T) {
|
||||
isIndexed, segmentIndexes := m.GetSegmentIndexedFields(collID+1, segID)
|
||||
assert.False(t, isIndexed)
|
||||
assert.Empty(t, segmentIndexes)
|
||||
})
|
||||
|
||||
t.Run("segment does not exist", func(t *testing.T) {
|
||||
isIndexed, segmentIndexes := m.GetSegmentIndexedFields(collID, segID+1)
|
||||
assert.False(t, isIndexed)
|
||||
assert.Empty(t, segmentIndexes)
|
||||
})
|
||||
}
|
||||
|
@ -2046,26 +2046,28 @@ func (m *meta) SaveStatsResultSegment(oldSegmentID int64, result *workerpb.Stats
|
||||
return metricMutation, nil
|
||||
}
|
||||
|
||||
func (m *meta) getSegmentsMetrics() []*metricsinfo.Segment {
|
||||
func (m *meta) getSegmentsMetrics(collectionID int64) []*metricsinfo.Segment {
|
||||
m.RLock()
|
||||
defer m.RUnlock()
|
||||
|
||||
segments := make([]*metricsinfo.Segment, 0, len(m.segments.segments))
|
||||
for _, s := range m.segments.segments {
|
||||
segments = append(segments, &metricsinfo.Segment{
|
||||
SegmentID: s.ID,
|
||||
CollectionID: s.CollectionID,
|
||||
PartitionID: s.PartitionID,
|
||||
Channel: s.InsertChannel,
|
||||
NumOfRows: s.NumOfRows,
|
||||
State: s.State.String(),
|
||||
MemSize: s.size.Load(),
|
||||
Level: s.Level.String(),
|
||||
IsImporting: s.IsImporting,
|
||||
Compacted: s.Compacted,
|
||||
IsSorted: s.IsSorted,
|
||||
NodeID: paramtable.GetNodeID(),
|
||||
})
|
||||
if collectionID <= 0 || s.GetCollectionID() == collectionID {
|
||||
segments = append(segments, &metricsinfo.Segment{
|
||||
SegmentID: s.ID,
|
||||
CollectionID: s.CollectionID,
|
||||
PartitionID: s.PartitionID,
|
||||
Channel: s.InsertChannel,
|
||||
NumOfRows: s.NumOfRows,
|
||||
State: s.State.String(),
|
||||
MemSize: s.size.Load(),
|
||||
Level: s.Level.String(),
|
||||
IsImporting: s.IsImporting,
|
||||
Compacted: s.Compacted,
|
||||
IsSorted: s.IsSorted,
|
||||
NodeID: paramtable.GetNodeID(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
return segments
|
||||
|
@ -1375,7 +1375,7 @@ func TestMeta_GetSegmentsJSON(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
segments := m.getSegmentsMetrics()
|
||||
segments := m.getSegmentsMetrics(0)
|
||||
|
||||
// Check the length of the segments
|
||||
assert.Equal(t, 2, len(segments))
|
||||
|
@ -18,9 +18,11 @@ package datacoord
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/cockroachdb/errors"
|
||||
"github.com/tidwall/gjson"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/sync/errgroup"
|
||||
|
||||
@ -132,8 +134,45 @@ func mergeChannels(dnChannels []*metricsinfo.Channel, dcChannels map[int64]map[s
|
||||
return mergedChannels
|
||||
}
|
||||
|
||||
func (s *Server) getSegmentsJSON(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
|
||||
v := jsonReq.Get(metricsinfo.MetricRequestParamINKey)
|
||||
if !v.Exists() {
|
||||
// default to get all segments from dataanode
|
||||
return s.getDataNodeSegmentsJSON(ctx, req)
|
||||
}
|
||||
|
||||
in := v.String()
|
||||
if in == "dn" {
|
||||
// TODO: support filter by collection id
|
||||
return s.getDataNodeSegmentsJSON(ctx, req)
|
||||
}
|
||||
|
||||
if in == "dc" {
|
||||
v = jsonReq.Get(metricsinfo.MetricRequestParamCollectionIDKey)
|
||||
collectionID := int64(0)
|
||||
if v.Exists() {
|
||||
collectionID = v.Int()
|
||||
}
|
||||
|
||||
segments := s.meta.getSegmentsMetrics(collectionID)
|
||||
for _, seg := range segments {
|
||||
isIndexed, indexedFields := s.meta.indexMeta.GetSegmentIndexedFields(seg.CollectionID, seg.SegmentID)
|
||||
seg.IndexedFields = indexedFields
|
||||
seg.IsIndexed = isIndexed
|
||||
}
|
||||
|
||||
bs, err := json.Marshal(segments)
|
||||
if err != nil {
|
||||
log.Warn("marshal segment value failed", zap.Int64("collectionID", collectionID), zap.String("err", err.Error()))
|
||||
return "", nil
|
||||
}
|
||||
return string(bs), nil
|
||||
}
|
||||
return "", fmt.Errorf("invalid param value in=[%s], it should be dc or dn", in)
|
||||
}
|
||||
|
||||
func (s *Server) getDistJSON(ctx context.Context, req *milvuspb.GetMetricsRequest) string {
|
||||
segments := s.meta.getSegmentsMetrics()
|
||||
segments := s.meta.getSegmentsMetrics(-1)
|
||||
var channels []*metricsinfo.DmChannel
|
||||
for nodeID, ch := range s.channelManager.GetChannelWatchInfos() {
|
||||
for _, chInfo := range ch {
|
||||
@ -158,7 +197,7 @@ func (s *Server) getDistJSON(ctx context.Context, req *milvuspb.GetMetricsReques
|
||||
return string(bs)
|
||||
}
|
||||
|
||||
func (s *Server) getSegmentsJSON(ctx context.Context, req *milvuspb.GetMetricsRequest) (string, error) {
|
||||
func (s *Server) getDataNodeSegmentsJSON(ctx context.Context, req *milvuspb.GetMetricsRequest) (string, error) {
|
||||
ret, err := getMetrics[*metricsinfo.Segment](s, ctx, req)
|
||||
return metricsinfo.MarshalGetMetricsValues(ret, err)
|
||||
}
|
||||
|
@ -22,6 +22,7 @@ import (
|
||||
|
||||
"github.com/cockroachdb/errors"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/tidwall/gjson"
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||
@ -29,6 +30,7 @@ import (
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
|
||||
"github.com/milvus-io/milvus/internal/datacoord/session"
|
||||
"github.com/milvus-io/milvus/internal/json"
|
||||
"github.com/milvus-io/milvus/internal/metastore/model"
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/internal/types"
|
||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||
@ -371,7 +373,7 @@ func TestGetSegmentsJSON(t *testing.T) {
|
||||
mockCluster.EXPECT().GetSessions().Return([]*session.Session{session.NewSession(&session.NodeInfo{NodeID: 1}, dataNodeCreator)})
|
||||
svr.cluster = mockCluster
|
||||
|
||||
actualJSON, err := svr.getSegmentsJSON(ctx, req)
|
||||
actualJSON, err := svr.getDataNodeSegmentsJSON(ctx, req)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, expectedJSON, actualJSON)
|
||||
})
|
||||
@ -394,7 +396,7 @@ func TestGetSegmentsJSON(t *testing.T) {
|
||||
mockCluster.EXPECT().GetSessions().Return([]*session.Session{session.NewSession(&session.NodeInfo{NodeID: 1}, dataNodeCreator)})
|
||||
svr.cluster = mockCluster
|
||||
|
||||
actualJSON, err := svr.getSegmentsJSON(ctx, req)
|
||||
actualJSON, err := svr.getDataNodeSegmentsJSON(ctx, req)
|
||||
assert.Error(t, err)
|
||||
assert.Equal(t, "", actualJSON)
|
||||
})
|
||||
@ -422,7 +424,7 @@ func TestGetSegmentsJSON(t *testing.T) {
|
||||
mockCluster.EXPECT().GetSessions().Return([]*session.Session{session.NewSession(&session.NodeInfo{NodeID: 1}, dataNodeCreator)})
|
||||
svr.cluster = mockCluster
|
||||
|
||||
actualJSON, err := svr.getSegmentsJSON(ctx, req)
|
||||
actualJSON, err := svr.getDataNodeSegmentsJSON(ctx, req)
|
||||
assert.Error(t, err)
|
||||
assert.Equal(t, "", actualJSON)
|
||||
})
|
||||
@ -451,7 +453,7 @@ func TestGetSegmentsJSON(t *testing.T) {
|
||||
svr.cluster = mockCluster
|
||||
|
||||
expectedJSON := "null"
|
||||
actualJSON, err := svr.getSegmentsJSON(ctx, req)
|
||||
actualJSON, err := svr.getDataNodeSegmentsJSON(ctx, req)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, expectedJSON, actualJSON)
|
||||
})
|
||||
@ -684,3 +686,115 @@ func TestGetDistJSON(t *testing.T) {
|
||||
assert.Equal(t, expectedJSON, actualJSON)
|
||||
})
|
||||
}
|
||||
|
||||
func TestServer_getSegmentsJSON(t *testing.T) {
|
||||
s := &Server{
|
||||
meta: &meta{
|
||||
segments: &SegmentsInfo{
|
||||
segments: map[int64]*SegmentInfo{
|
||||
1: {
|
||||
SegmentInfo: &datapb.SegmentInfo{
|
||||
ID: 1,
|
||||
CollectionID: 1,
|
||||
PartitionID: 2,
|
||||
InsertChannel: "channel1",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
indexMeta: &indexMeta{
|
||||
segmentIndexes: map[UniqueID]map[UniqueID]*model.SegmentIndex{
|
||||
1000: {
|
||||
10: &model.SegmentIndex{
|
||||
SegmentID: 1000,
|
||||
CollectionID: 1,
|
||||
PartitionID: 2,
|
||||
NumRows: 10250,
|
||||
IndexID: 10,
|
||||
BuildID: 10000,
|
||||
NodeID: 1,
|
||||
IndexVersion: 0,
|
||||
IndexState: commonpb.IndexState_Finished,
|
||||
FailReason: "",
|
||||
IsDeleted: false,
|
||||
CreatedUTCTime: 12,
|
||||
IndexFileKeys: nil,
|
||||
IndexSize: 0,
|
||||
},
|
||||
},
|
||||
},
|
||||
indexes: map[UniqueID]map[UniqueID]*model.Index{
|
||||
1: {
|
||||
10: &model.Index{
|
||||
CollectionID: 1,
|
||||
FieldID: 100,
|
||||
IndexID: 10,
|
||||
IndexName: "test_index",
|
||||
IsDeleted: false,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
ctx := context.TODO()
|
||||
req := &milvuspb.GetMetricsRequest{}
|
||||
t.Run("valid request in dc", func(t *testing.T) {
|
||||
jsonReq := gjson.Parse(`{"in": "dc", "collection_id": 1}`)
|
||||
result, err := s.getSegmentsJSON(ctx, req, jsonReq)
|
||||
assert.NoError(t, err)
|
||||
assert.NotEmpty(t, result)
|
||||
})
|
||||
|
||||
t.Run("invalid request", func(t *testing.T) {
|
||||
jsonReq := gjson.Parse(`{"in": "invalid"}`)
|
||||
result, err := s.getSegmentsJSON(ctx, req, jsonReq)
|
||||
assert.Error(t, err)
|
||||
assert.Empty(t, result)
|
||||
})
|
||||
|
||||
t.Run("vaild request in dn", func(t *testing.T) {
|
||||
segments := []*metricsinfo.Segment{
|
||||
{
|
||||
SegmentID: 1,
|
||||
CollectionID: 100,
|
||||
PartitionID: 10,
|
||||
NumOfRows: 1000,
|
||||
State: "Flushed",
|
||||
},
|
||||
}
|
||||
segmentsBytes, err := json.Marshal(segments)
|
||||
assert.NoError(t, err)
|
||||
expectedJSON := string(segmentsBytes)
|
||||
|
||||
mockResp := &milvuspb.GetMetricsResponse{
|
||||
Status: merr.Success(),
|
||||
Response: expectedJSON,
|
||||
}
|
||||
|
||||
mockClient := &mockMetricDataNodeClient{
|
||||
mock: func() (*milvuspb.GetMetricsResponse, error) {
|
||||
return mockResp, nil
|
||||
},
|
||||
}
|
||||
|
||||
dataNodeCreator := func(ctx context.Context, addr string, nodeID int64) (types.DataNodeClient, error) {
|
||||
return mockClient, nil
|
||||
}
|
||||
|
||||
mockCluster := NewMockCluster(t)
|
||||
mockCluster.EXPECT().GetSessions().Return([]*session.Session{session.NewSession(&session.NodeInfo{NodeID: 1}, dataNodeCreator)})
|
||||
s.cluster = mockCluster
|
||||
|
||||
jsonReq := gjson.Parse(`{"in": "dn"}`)
|
||||
result, err := s.getSegmentsJSON(ctx, req, jsonReq)
|
||||
assert.NoError(t, err)
|
||||
assert.NotEmpty(t, result)
|
||||
|
||||
jsonReq = gjson.Parse(`{}`)
|
||||
result, err = s.getSegmentsJSON(ctx, req, jsonReq)
|
||||
assert.NoError(t, err)
|
||||
assert.NotEmpty(t, result)
|
||||
})
|
||||
}
|
||||
|
@ -1145,41 +1145,50 @@ func (s *Server) registerMetricsRequest() {
|
||||
return s.getSystemInfoMetrics(ctx, req)
|
||||
})
|
||||
|
||||
s.metricsRequest.RegisterMetricsRequest(metricsinfo.DataDist,
|
||||
s.metricsRequest.RegisterMetricsRequest(metricsinfo.DistKey,
|
||||
func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
|
||||
return s.getDistJSON(ctx, req), nil
|
||||
})
|
||||
|
||||
s.metricsRequest.RegisterMetricsRequest(metricsinfo.ImportTasks,
|
||||
s.metricsRequest.RegisterMetricsRequest(metricsinfo.ImportTaskKey,
|
||||
func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
|
||||
return s.importMeta.TaskStatsJSON(), nil
|
||||
})
|
||||
|
||||
s.metricsRequest.RegisterMetricsRequest(metricsinfo.CompactionTasks,
|
||||
s.metricsRequest.RegisterMetricsRequest(metricsinfo.CompactionTaskKey,
|
||||
func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
|
||||
return s.meta.compactionTaskMeta.TaskStatsJSON(), nil
|
||||
})
|
||||
|
||||
s.metricsRequest.RegisterMetricsRequest(metricsinfo.BuildIndexTasks,
|
||||
s.metricsRequest.RegisterMetricsRequest(metricsinfo.BuildIndexTaskKey,
|
||||
func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
|
||||
return s.meta.indexMeta.TaskStatsJSON(), nil
|
||||
})
|
||||
|
||||
s.metricsRequest.RegisterMetricsRequest(metricsinfo.SyncTasks,
|
||||
s.metricsRequest.RegisterMetricsRequest(metricsinfo.SyncTaskKey,
|
||||
func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
|
||||
return s.getSyncTaskJSON(ctx, req)
|
||||
})
|
||||
|
||||
s.metricsRequest.RegisterMetricsRequest(metricsinfo.DataSegments,
|
||||
s.metricsRequest.RegisterMetricsRequest(metricsinfo.SegmentKey,
|
||||
func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
|
||||
return s.getSegmentsJSON(ctx, req)
|
||||
return s.getSegmentsJSON(ctx, req, jsonReq)
|
||||
})
|
||||
|
||||
s.metricsRequest.RegisterMetricsRequest(metricsinfo.DataChannels,
|
||||
s.metricsRequest.RegisterMetricsRequest(metricsinfo.ChannelKey,
|
||||
func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
|
||||
return s.getChannelsJSON(ctx, req)
|
||||
})
|
||||
|
||||
s.metricsRequest.RegisterMetricsRequest(metricsinfo.IndexKey,
|
||||
func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
|
||||
v := jsonReq.Get(metricsinfo.MetricRequestParamCollectionIDKey)
|
||||
collectionID := int64(0)
|
||||
if v.Exists() {
|
||||
collectionID = v.Int()
|
||||
}
|
||||
return s.meta.indexMeta.GetIndexJSON(collectionID), nil
|
||||
})
|
||||
log.Info("register metrics actions finished")
|
||||
}
|
||||
|
||||
|
@ -285,17 +285,17 @@ func (node *DataNode) registerMetricsRequest() {
|
||||
return node.getSystemInfoMetrics(ctx, req)
|
||||
})
|
||||
|
||||
node.metricsRequest.RegisterMetricsRequest(metricsinfo.SyncTasks,
|
||||
node.metricsRequest.RegisterMetricsRequest(metricsinfo.SyncTaskKey,
|
||||
func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
|
||||
return node.syncMgr.TaskStatsJSON(), nil
|
||||
})
|
||||
|
||||
node.metricsRequest.RegisterMetricsRequest(metricsinfo.DataSegments,
|
||||
node.metricsRequest.RegisterMetricsRequest(metricsinfo.SegmentKey,
|
||||
func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
|
||||
return node.flowgraphManager.GetSegmentsJSON(), nil
|
||||
})
|
||||
|
||||
node.metricsRequest.RegisterMetricsRequest(metricsinfo.DataChannels,
|
||||
node.metricsRequest.RegisterMetricsRequest(metricsinfo.ChannelKey,
|
||||
func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
|
||||
return node.flowgraphManager.GetChannelsJSON(), nil
|
||||
})
|
||||
|
@ -88,6 +88,8 @@ const (
|
||||
QCResourceGroupPath = "/_qc/resource_group"
|
||||
// QCAllTasksPath is the path to get all tasks in QueryCoord.
|
||||
QCAllTasksPath = "/_qc/tasks"
|
||||
// QCSegmentsPath is the path to get segments in QueryCoord.
|
||||
QCSegmentsPath = "/_qc/segments"
|
||||
|
||||
// QNSegmentsPath is the path to get segments in QueryNode.
|
||||
QNSegmentsPath = "/_qn/segments"
|
||||
@ -102,6 +104,8 @@ const (
|
||||
DCCompactionTasksPath = "/_dc/tasks/compaction"
|
||||
// DCBuildIndexTasksPath is the path to get build index tasks in DataCoord.
|
||||
DCBuildIndexTasksPath = "/_dc/tasks/build_index"
|
||||
// DCSegmentsPath is the path to get segments in DataCoord.
|
||||
DCSegmentsPath = "/_dc/segments"
|
||||
|
||||
// DNSyncTasksPath is the path to get sync tasks in DataNode.
|
||||
DNSyncTasksPath = "/_dn/tasks/sync"
|
||||
@ -119,4 +123,7 @@ const (
|
||||
CollectionListPath = "/_collection/list"
|
||||
// CollectionDescPath is the path to get collection description.
|
||||
CollectionDescPath = "/_collection/desc"
|
||||
|
||||
// IndexListPath is the path to get all indexes.
|
||||
IndexListPath = "/_index/list"
|
||||
)
|
||||
|
@ -210,7 +210,9 @@ func getDataComponentMetrics(node *Proxy, metricsType string) gin.HandlerFunc {
|
||||
|
||||
// The Get request should be used to get the query parameters, not the body, such as Javascript
|
||||
// fetch API only support GET request with query parameter.
|
||||
func listCollection(rootCoord types.RootCoordClient, queryCoord types.QueryCoordClient) gin.HandlerFunc {
|
||||
func listCollection(node *Proxy) gin.HandlerFunc {
|
||||
rootCoord := node.rootCoord
|
||||
queryCoord := node.queryCoord
|
||||
return func(c *gin.Context) {
|
||||
dbName := c.Query(httpDBName)
|
||||
if len(dbName) == 0 {
|
||||
@ -290,7 +292,8 @@ func listCollection(rootCoord types.RootCoordClient, queryCoord types.QueryCoord
|
||||
}
|
||||
}
|
||||
|
||||
func describeCollection(node types.ProxyComponent, rootCoord types.RootCoordClient) gin.HandlerFunc {
|
||||
func describeCollection(node *Proxy) gin.HandlerFunc {
|
||||
rootCoord := node.rootCoord
|
||||
return func(c *gin.Context) {
|
||||
dbName := c.Query(httpDBName)
|
||||
collectionName := c.Query(HTTPCollectionName)
|
||||
@ -304,7 +307,7 @@ func describeCollection(node types.ProxyComponent, rootCoord types.RootCoordClie
|
||||
return
|
||||
}
|
||||
|
||||
describeCollectionResp, err := node.DescribeCollection(c, &milvuspb.DescribeCollectionRequest{
|
||||
describeCollectionResp, err := rootCoord.DescribeCollection(c, &milvuspb.DescribeCollectionRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgType: commonpb.MsgType_DescribeCollection,
|
||||
},
|
||||
|
@ -178,7 +178,8 @@ func TestListCollection(t *testing.T) {
|
||||
QueryServiceAvailable: []bool{true, true},
|
||||
}, nil)
|
||||
|
||||
handler := listCollection(mockRoortCoordClient, mockQueryCoordClient)
|
||||
proxy := &Proxy{queryCoord: mockQueryCoordClient, rootCoord: mockRoortCoordClient}
|
||||
handler := listCollection(proxy)
|
||||
handler(c)
|
||||
|
||||
assert.Equal(t, http.StatusOK, w.Code)
|
||||
@ -194,7 +195,8 @@ func TestListCollection(t *testing.T) {
|
||||
mockRoortCoordClient := mocks.NewMockRootCoordClient(t)
|
||||
mockRoortCoordClient.EXPECT().ShowCollections(mock.Anything, mock.Anything).Return(nil, errors.New("error"))
|
||||
|
||||
handler := listCollection(mockRoortCoordClient, nil)
|
||||
proxy := &Proxy{rootCoord: mockRoortCoordClient}
|
||||
handler := listCollection(proxy)
|
||||
handler(c)
|
||||
assert.Equal(t, http.StatusInternalServerError, w.Code)
|
||||
assert.Contains(t, w.Body.String(), "error")
|
||||
@ -212,7 +214,8 @@ func TestListCollection(t *testing.T) {
|
||||
mockQueryCoordClient := mocks.NewMockQueryCoordClient(t)
|
||||
mockQueryCoordClient.EXPECT().ShowCollections(mock.Anything, mock.Anything).Return(nil, errors.New("error"))
|
||||
|
||||
handler := listCollection(mockRoortCoordClient, mockQueryCoordClient)
|
||||
proxy := &Proxy{queryCoord: mockQueryCoordClient, rootCoord: mockRoortCoordClient}
|
||||
handler := listCollection(proxy)
|
||||
handler(c)
|
||||
assert.Equal(t, http.StatusInternalServerError, w.Code)
|
||||
assert.Contains(t, w.Body.String(), "error")
|
||||
@ -225,9 +228,8 @@ func TestDescribeCollection(t *testing.T) {
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Request, _ = http.NewRequest("GET", "/?db_name=default&collection_name=collection1", nil)
|
||||
|
||||
mockProxy := mocks.NewMockProxy(t)
|
||||
mockRootCoord := mocks.NewMockRootCoordClient(t)
|
||||
mockProxy.EXPECT().DescribeCollection(mock.Anything, mock.Anything).Return(&milvuspb.DescribeCollectionResponse{
|
||||
mockRootCoord.EXPECT().DescribeCollection(mock.Anything, mock.Anything).Return(&milvuspb.DescribeCollectionResponse{
|
||||
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
|
||||
CollectionID: 1,
|
||||
CollectionName: "collection1",
|
||||
@ -255,7 +257,8 @@ func TestDescribeCollection(t *testing.T) {
|
||||
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
|
||||
}, nil)
|
||||
|
||||
handler := describeCollection(mockProxy, mockRootCoord)
|
||||
proxy := &Proxy{rootCoord: mockRootCoord}
|
||||
handler := describeCollection(proxy)
|
||||
handler(c)
|
||||
|
||||
assert.Equal(t, http.StatusOK, w.Code)
|
||||
@ -268,11 +271,11 @@ func TestDescribeCollection(t *testing.T) {
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Request, _ = http.NewRequest("GET", "/?db_name=default&collection_name=collection1", nil)
|
||||
|
||||
mockProxy := mocks.NewMockProxy(t)
|
||||
mockRootCoord := mocks.NewMockRootCoordClient(t)
|
||||
mockProxy.EXPECT().DescribeCollection(mock.Anything, mock.Anything).Return(nil, errors.New("error"))
|
||||
mockRootCoord.EXPECT().DescribeCollection(mock.Anything, mock.Anything).Return(nil, errors.New("error"))
|
||||
|
||||
handler := describeCollection(mockProxy, mockRootCoord)
|
||||
proxy := &Proxy{rootCoord: mockRootCoord}
|
||||
handler := describeCollection(proxy)
|
||||
handler(c)
|
||||
|
||||
assert.Equal(t, http.StatusInternalServerError, w.Code)
|
||||
@ -284,10 +287,9 @@ func TestDescribeCollection(t *testing.T) {
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Request, _ = http.NewRequest("GET", "/?db_name=default", nil)
|
||||
|
||||
mockProxy := mocks.NewMockProxy(t)
|
||||
mockRootCoord := mocks.NewMockRootCoordClient(t)
|
||||
|
||||
handler := describeCollection(mockProxy, mockRootCoord)
|
||||
proxy := &Proxy{rootCoord: mockRootCoord}
|
||||
handler := describeCollection(proxy)
|
||||
handler(c)
|
||||
|
||||
assert.Equal(t, http.StatusBadRequest, w.Code)
|
||||
|
@ -6617,34 +6617,37 @@ func (node *Proxy) RegisterRestRouter(router gin.IRouter) {
|
||||
router.GET(http.SlowQueryPath, getSlowQuery(node))
|
||||
|
||||
// QueryCoord requests that are forwarded from proxy
|
||||
router.GET(http.QCTargetPath, getQueryComponentMetrics(node, metricsinfo.QueryTarget))
|
||||
router.GET(http.QCDistPath, getQueryComponentMetrics(node, metricsinfo.QueryDist))
|
||||
router.GET(http.QCReplicaPath, getQueryComponentMetrics(node, metricsinfo.QueryReplicas))
|
||||
router.GET(http.QCResourceGroupPath, getQueryComponentMetrics(node, metricsinfo.QueryResourceGroups))
|
||||
router.GET(http.QCAllTasksPath, getQueryComponentMetrics(node, metricsinfo.QueryCoordAllTasks))
|
||||
router.GET(http.QCTargetPath, getQueryComponentMetrics(node, metricsinfo.TargetKey))
|
||||
router.GET(http.QCDistPath, getQueryComponentMetrics(node, metricsinfo.DistKey))
|
||||
router.GET(http.QCReplicaPath, getQueryComponentMetrics(node, metricsinfo.ReplicaKey))
|
||||
router.GET(http.QCResourceGroupPath, getQueryComponentMetrics(node, metricsinfo.ResourceGroupKey))
|
||||
router.GET(http.QCAllTasksPath, getQueryComponentMetrics(node, metricsinfo.AllTaskKey))
|
||||
router.GET(http.QCSegmentsPath, getQueryComponentMetrics(node, metricsinfo.SegmentKey))
|
||||
|
||||
// QueryNode requests that are forwarded from querycoord
|
||||
router.GET(http.QNSegmentsPath, getQueryComponentMetrics(node, metricsinfo.QuerySegments))
|
||||
router.GET(http.QNChannelsPath, getQueryComponentMetrics(node, metricsinfo.QueryChannels))
|
||||
router.GET(http.QNSegmentsPath, getQueryComponentMetrics(node, metricsinfo.SegmentKey))
|
||||
router.GET(http.QNChannelsPath, getQueryComponentMetrics(node, metricsinfo.ChannelKey))
|
||||
|
||||
// DataCoord requests that are forwarded from proxy
|
||||
router.GET(http.DCDistPath, getDataComponentMetrics(node, metricsinfo.DataDist))
|
||||
router.GET(http.DCCompactionTasksPath, getDataComponentMetrics(node, metricsinfo.CompactionTasks))
|
||||
router.GET(http.DCImportTasksPath, getDataComponentMetrics(node, metricsinfo.ImportTasks))
|
||||
router.GET(http.DCBuildIndexTasksPath, getDataComponentMetrics(node, metricsinfo.BuildIndexTasks))
|
||||
router.GET(http.DCDistPath, getDataComponentMetrics(node, metricsinfo.DistKey))
|
||||
router.GET(http.DCCompactionTasksPath, getDataComponentMetrics(node, metricsinfo.CompactionTaskKey))
|
||||
router.GET(http.DCImportTasksPath, getDataComponentMetrics(node, metricsinfo.ImportTaskKey))
|
||||
router.GET(http.DCBuildIndexTasksPath, getDataComponentMetrics(node, metricsinfo.BuildIndexTaskKey))
|
||||
router.GET(http.IndexListPath, getDataComponentMetrics(node, metricsinfo.IndexKey))
|
||||
router.GET(http.DCSegmentsPath, getDataComponentMetrics(node, metricsinfo.SegmentKey))
|
||||
|
||||
// Datanode requests that are forwarded from datacoord
|
||||
router.GET(http.DNSyncTasksPath, getDataComponentMetrics(node, metricsinfo.SyncTasks))
|
||||
router.GET(http.DNSegmentsPath, getDataComponentMetrics(node, metricsinfo.DataSegments))
|
||||
router.GET(http.DNChannelsPath, getDataComponentMetrics(node, metricsinfo.DataChannels))
|
||||
router.GET(http.DNSyncTasksPath, getDataComponentMetrics(node, metricsinfo.SyncTaskKey))
|
||||
router.GET(http.DNSegmentsPath, getDataComponentMetrics(node, metricsinfo.SegmentKey))
|
||||
router.GET(http.DNChannelsPath, getDataComponentMetrics(node, metricsinfo.ChannelKey))
|
||||
|
||||
// Database requests
|
||||
router.GET(http.DatabaseListPath, listDatabase(node))
|
||||
router.GET(http.DatabaseDescPath, describeDatabase(node))
|
||||
|
||||
// Collection requests
|
||||
router.GET(http.CollectionListPath, listCollection(node.rootCoord, node.queryCoord))
|
||||
router.GET(http.CollectionDescPath, describeCollection(node, node.rootCoord))
|
||||
router.GET(http.CollectionListPath, listCollection(node))
|
||||
router.GET(http.CollectionDescPath, describeCollection(node))
|
||||
}
|
||||
|
||||
func (node *Proxy) CreatePrivilegeGroup(ctx context.Context, req *milvuspb.CreatePrivilegeGroupRequest) (*commonpb.Status, error) {
|
||||
|
@ -18,11 +18,13 @@ package querycoordv2
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cockroachdb/errors"
|
||||
"github.com/samber/lo"
|
||||
"github.com/tidwall/gjson"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/sync/errgroup"
|
||||
|
||||
@ -291,6 +293,36 @@ func (s *Server) getSegmentsFromQueryNode(ctx context.Context, req *milvuspb.Get
|
||||
return metricsinfo.MarshalGetMetricsValues(segments, err)
|
||||
}
|
||||
|
||||
func (s *Server) getSegmentsJSON(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
|
||||
v := jsonReq.Get(metricsinfo.MetricRequestParamINKey)
|
||||
if !v.Exists() {
|
||||
// default to get all segments from dataanode
|
||||
return s.getSegmentsFromQueryNode(ctx, req)
|
||||
}
|
||||
|
||||
in := v.String()
|
||||
if in == "qn" {
|
||||
// TODO: support filter by collection id
|
||||
return s.getSegmentsFromQueryNode(ctx, req)
|
||||
}
|
||||
|
||||
if in == "qc" {
|
||||
v = jsonReq.Get(metricsinfo.MetricRequestParamCollectionIDKey)
|
||||
collectionID := int64(0)
|
||||
if v.Exists() {
|
||||
collectionID = v.Int()
|
||||
}
|
||||
filteredSegments := s.dist.SegmentDistManager.GetSegmentDist(collectionID)
|
||||
bs, err := json.Marshal(filteredSegments)
|
||||
if err != nil {
|
||||
log.Warn("marshal segment value failed", zap.Int64("collectionID", collectionID), zap.String("err", err.Error()))
|
||||
return "", nil
|
||||
}
|
||||
return string(bs), nil
|
||||
}
|
||||
return "", fmt.Errorf("invalid param value in=[%s], it should be qc or qn", in)
|
||||
}
|
||||
|
||||
// TODO(dragondriver): add more detail metrics
|
||||
func (s *Server) getSystemInfoMetrics(
|
||||
ctx context.Context,
|
||||
|
@ -21,9 +21,12 @@ import (
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/tidwall/gjson"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
|
||||
"github.com/milvus-io/milvus/internal/json"
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/session"
|
||||
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
|
||||
)
|
||||
@ -106,3 +109,74 @@ func TestGetSegmentsFromQueryNode(t *testing.T) {
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, expectedSegments, actualSegments)
|
||||
}
|
||||
|
||||
func TestServer_getSegmentsJSON(t *testing.T) {
|
||||
mockCluster := session.NewMockCluster(t)
|
||||
nodeManager := session.NewNodeManager()
|
||||
nodeManager.Add(session.NewNodeInfo(session.ImmutableNodeInfo{NodeID: 1}))
|
||||
server := &Server{cluster: mockCluster, nodeMgr: nodeManager}
|
||||
expectedSegments := []*metricsinfo.Segment{
|
||||
{
|
||||
SegmentID: 1,
|
||||
PartitionID: 1,
|
||||
Channel: "channel1",
|
||||
ResourceGroup: "default",
|
||||
MemSize: int64(1024),
|
||||
LoadedInsertRowCount: 100,
|
||||
},
|
||||
{
|
||||
SegmentID: 2,
|
||||
PartitionID: 1,
|
||||
Channel: "channel2",
|
||||
ResourceGroup: "default",
|
||||
MemSize: int64(1024),
|
||||
LoadedInsertRowCount: 200,
|
||||
},
|
||||
}
|
||||
resp := &milvuspb.GetMetricsResponse{
|
||||
Response: func() string {
|
||||
data, _ := json.Marshal(expectedSegments)
|
||||
return string(data)
|
||||
}(),
|
||||
}
|
||||
req := &milvuspb.GetMetricsRequest{}
|
||||
mockCluster.EXPECT().GetMetrics(mock.Anything, mock.Anything, req).Return(resp, nil)
|
||||
|
||||
server.dist = meta.NewDistributionManager()
|
||||
server.dist.SegmentDistManager.Update(1, meta.SegmentFromInfo(&datapb.SegmentInfo{
|
||||
ID: 1,
|
||||
CollectionID: 1,
|
||||
PartitionID: 1,
|
||||
InsertChannel: "dmc0",
|
||||
}))
|
||||
|
||||
ctx := context.TODO()
|
||||
|
||||
t.Run("valid request in dc", func(t *testing.T) {
|
||||
jsonReq := gjson.Parse(`{"in": "qc", "collection_id": 1}`)
|
||||
result, err := server.getSegmentsJSON(ctx, req, jsonReq)
|
||||
assert.NoError(t, err)
|
||||
assert.NotEmpty(t, result)
|
||||
})
|
||||
|
||||
t.Run("invalid request", func(t *testing.T) {
|
||||
jsonReq := gjson.Parse(`{"in": "invalid"}`)
|
||||
result, err := server.getSegmentsJSON(ctx, req, jsonReq)
|
||||
assert.Error(t, err)
|
||||
assert.Empty(t, result)
|
||||
})
|
||||
|
||||
t.Run("valid request in qn", func(t *testing.T) {
|
||||
jsonReq := gjson.Parse(`{"in": "qn"}`)
|
||||
result, err := server.getSegmentsJSON(ctx, req, jsonReq)
|
||||
assert.NoError(t, err)
|
||||
assert.NotEmpty(t, result)
|
||||
})
|
||||
|
||||
t.Run("valid request in qc", func(t *testing.T) {
|
||||
jsonReq := gjson.Parse(`{"in": "qc", "collection_id": 1}`)
|
||||
result, err := server.getSegmentsJSON(ctx, req, jsonReq)
|
||||
assert.NoError(t, err)
|
||||
assert.NotEmpty(t, result)
|
||||
})
|
||||
}
|
||||
|
@ -43,7 +43,7 @@ func NewDistributionManager() *DistributionManager {
|
||||
// If there are no segments, channels, or leader views, it returns an empty string.
|
||||
// In case of an error during JSON marshaling, it returns the error.
|
||||
func (dm *DistributionManager) GetDistributionJSON() string {
|
||||
segments := dm.GetSegmentDist()
|
||||
segments := dm.GetSegmentDist(0)
|
||||
channels := dm.GetChannelDist()
|
||||
leaderView := dm.GetLeaderView()
|
||||
|
||||
|
@ -351,7 +351,10 @@ func (mgr *LeaderViewManager) GetLeaderView() []*metricsinfo.LeaderView {
|
||||
}
|
||||
|
||||
for _, seg := range lv.GrowingSegments {
|
||||
leaderView.GrowingSegments = append(leaderView.GrowingSegments, newSegmentMetricsFrom(seg))
|
||||
leaderView.GrowingSegments = append(leaderView.GrowingSegments, &metricsinfo.Segment{
|
||||
SegmentID: seg.ID,
|
||||
NodeID: seg.Node,
|
||||
})
|
||||
}
|
||||
|
||||
leaderViews = append(leaderViews, leaderView)
|
||||
|
@ -137,12 +137,13 @@ func newSegmentMetricsFrom(segment *Segment) *metricsinfo.Segment {
|
||||
convertedSegment := metrics.NewSegmentFrom(segment.SegmentInfo)
|
||||
convertedSegment.NodeID = segment.Node
|
||||
convertedSegment.LoadedTimestamp = tsoutil.PhysicalTimeFormat(segment.LastDeltaTimestamp)
|
||||
convertedSegment.Index = lo.Map(lo.Values(segment.IndexInfo), func(e *querypb.FieldIndexInfo, i int) *metricsinfo.SegmentIndex {
|
||||
return &metricsinfo.SegmentIndex{
|
||||
convertedSegment.IndexedFields = lo.Map(lo.Values(segment.IndexInfo), func(e *querypb.FieldIndexInfo, i int) *metricsinfo.IndexedField {
|
||||
return &metricsinfo.IndexedField{
|
||||
IndexFieldID: e.FieldID,
|
||||
IndexID: e.IndexID,
|
||||
BuildID: e.BuildID,
|
||||
IndexSize: e.IndexSize,
|
||||
IsLoaded: true,
|
||||
}
|
||||
})
|
||||
return convertedSegment
|
||||
@ -246,14 +247,16 @@ func (m *SegmentDistManager) GetByFilter(filters ...SegmentDistFilter) []*Segmen
|
||||
return ret
|
||||
}
|
||||
|
||||
func (m *SegmentDistManager) GetSegmentDist() []*metricsinfo.Segment {
|
||||
func (m *SegmentDistManager) GetSegmentDist(collectionID int64) []*metricsinfo.Segment {
|
||||
m.rwmutex.RLock()
|
||||
defer m.rwmutex.RUnlock()
|
||||
|
||||
var segments []*metricsinfo.Segment
|
||||
for _, nodeSeg := range m.segments {
|
||||
for _, segment := range nodeSeg.segments {
|
||||
segments = append(segments, newSegmentMetricsFrom(segment))
|
||||
if collectionID == 0 || segment.GetCollectionID() == collectionID {
|
||||
segments = append(segments, newSegmentMetricsFrom(segment))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -222,7 +222,7 @@ func TestGetSegmentDistJSON(t *testing.T) {
|
||||
manager.Update(1, segment1)
|
||||
manager.Update(2, segment2)
|
||||
|
||||
segments := manager.GetSegmentDist()
|
||||
segments := manager.GetSegmentDist(0)
|
||||
assert.Equal(t, 2, len(segments))
|
||||
|
||||
checkResults := func(s *metricsinfo.Segment) {
|
||||
|
@ -221,7 +221,7 @@ func (s *Server) registerMetricsRequest() {
|
||||
}
|
||||
|
||||
QuerySegmentsAction := func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
|
||||
return s.getSegmentsFromQueryNode(ctx, req)
|
||||
return s.getSegmentsJSON(ctx, req, jsonReq)
|
||||
}
|
||||
|
||||
QueryChannelsAction := func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
|
||||
@ -230,15 +230,15 @@ func (s *Server) registerMetricsRequest() {
|
||||
|
||||
// register actions that requests are processed in querycoord
|
||||
s.metricsRequest.RegisterMetricsRequest(metricsinfo.SystemInfoMetrics, getSystemInfoAction)
|
||||
s.metricsRequest.RegisterMetricsRequest(metricsinfo.QueryCoordAllTasks, QueryTasksAction)
|
||||
s.metricsRequest.RegisterMetricsRequest(metricsinfo.QueryDist, QueryDistAction)
|
||||
s.metricsRequest.RegisterMetricsRequest(metricsinfo.QueryTarget, QueryTargetAction)
|
||||
s.metricsRequest.RegisterMetricsRequest(metricsinfo.QueryReplicas, QueryReplicasAction)
|
||||
s.metricsRequest.RegisterMetricsRequest(metricsinfo.QueryResourceGroups, QueryResourceGroupsAction)
|
||||
s.metricsRequest.RegisterMetricsRequest(metricsinfo.AllTaskKey, QueryTasksAction)
|
||||
s.metricsRequest.RegisterMetricsRequest(metricsinfo.DistKey, QueryDistAction)
|
||||
s.metricsRequest.RegisterMetricsRequest(metricsinfo.TargetKey, QueryTargetAction)
|
||||
s.metricsRequest.RegisterMetricsRequest(metricsinfo.ReplicaKey, QueryReplicasAction)
|
||||
s.metricsRequest.RegisterMetricsRequest(metricsinfo.ResourceGroupKey, QueryResourceGroupsAction)
|
||||
|
||||
// register actions that requests are processed in querynode
|
||||
s.metricsRequest.RegisterMetricsRequest(metricsinfo.QuerySegments, QuerySegmentsAction)
|
||||
s.metricsRequest.RegisterMetricsRequest(metricsinfo.QueryChannels, QueryChannelsAction)
|
||||
s.metricsRequest.RegisterMetricsRequest(metricsinfo.SegmentKey, QuerySegmentsAction)
|
||||
s.metricsRequest.RegisterMetricsRequest(metricsinfo.ChannelKey, QueryChannelsAction)
|
||||
log.Info("register metrics actions finished")
|
||||
}
|
||||
|
||||
|
@ -189,9 +189,9 @@ func getSegmentJSON(node *QueryNode) string {
|
||||
allSegments := node.manager.Segment.GetBy()
|
||||
var ms []*metricsinfo.Segment
|
||||
for _, s := range allSegments {
|
||||
indexes := make([]*metricsinfo.SegmentIndex, 0, len(s.Indexes()))
|
||||
indexes := make([]*metricsinfo.IndexedField, 0, len(s.Indexes()))
|
||||
for _, index := range s.Indexes() {
|
||||
indexes = append(indexes, &metricsinfo.SegmentIndex{
|
||||
indexes = append(indexes, &metricsinfo.IndexedField{
|
||||
IndexFieldID: index.IndexInfo.FieldID,
|
||||
IndexID: index.IndexInfo.IndexID,
|
||||
IndexSize: index.IndexInfo.IndexSize,
|
||||
@ -205,7 +205,7 @@ func getSegmentJSON(node *QueryNode) string {
|
||||
CollectionID: s.Collection(),
|
||||
PartitionID: s.Partition(),
|
||||
MemSize: s.MemSize(),
|
||||
Index: indexes,
|
||||
IndexedFields: indexes,
|
||||
State: s.Type().String(),
|
||||
ResourceGroup: s.ResourceGroup(),
|
||||
LoadedInsertRowCount: s.InsertCount(),
|
||||
|
@ -120,12 +120,12 @@ func TestGetSegmentJSON(t *testing.T) {
|
||||
assert.Equal(t, int64(1001), segments[0].CollectionID)
|
||||
assert.Equal(t, int64(2001), segments[0].PartitionID)
|
||||
assert.Equal(t, int64(1024), segments[0].MemSize)
|
||||
assert.Equal(t, 1, len(segments[0].Index))
|
||||
assert.Equal(t, int64(1), segments[0].Index[0].IndexFieldID)
|
||||
assert.Equal(t, int64(101), segments[0].Index[0].IndexID)
|
||||
assert.Equal(t, int64(512), segments[0].Index[0].IndexSize)
|
||||
assert.Equal(t, int64(10001), segments[0].Index[0].BuildID)
|
||||
assert.True(t, segments[0].Index[0].IsLoaded)
|
||||
assert.Equal(t, 1, len(segments[0].IndexedFields))
|
||||
assert.Equal(t, int64(1), segments[0].IndexedFields[0].IndexFieldID)
|
||||
assert.Equal(t, int64(101), segments[0].IndexedFields[0].IndexID)
|
||||
assert.Equal(t, int64(512), segments[0].IndexedFields[0].IndexSize)
|
||||
assert.Equal(t, int64(10001), segments[0].IndexedFields[0].BuildID)
|
||||
assert.True(t, segments[0].IndexedFields[0].IsLoaded)
|
||||
assert.Equal(t, "Growing", segments[0].State)
|
||||
assert.Equal(t, "default", segments[0].ResourceGroup)
|
||||
assert.Equal(t, int64(100), segments[0].LoadedInsertRowCount)
|
||||
|
@ -284,12 +284,12 @@ func (node *QueryNode) registerMetricsRequest() {
|
||||
return getSystemInfoMetrics(ctx, req, node)
|
||||
})
|
||||
|
||||
node.metricsRequest.RegisterMetricsRequest(metricsinfo.QuerySegments,
|
||||
node.metricsRequest.RegisterMetricsRequest(metricsinfo.SegmentKey,
|
||||
func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
|
||||
return getSegmentJSON(node), nil
|
||||
})
|
||||
|
||||
node.metricsRequest.RegisterMetricsRequest(metricsinfo.QueryChannels,
|
||||
node.metricsRequest.RegisterMetricsRequest(metricsinfo.ChannelKey,
|
||||
func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
|
||||
return getChannelJSON(node), nil
|
||||
})
|
||||
|
@ -44,54 +44,60 @@ const (
|
||||
// MetricRequestParamsSeparator is a separator that parameter value will be joined be separator
|
||||
MetricRequestParamsSeparator = ","
|
||||
|
||||
// QuerySegmentDist request for segment distribution on the query node
|
||||
QuerySegments = "qn_segments"
|
||||
// SegmentKey request for get segments from the datanode/querynode/datacoord/querycoord
|
||||
SegmentKey = "segments"
|
||||
|
||||
// QueryChannelDist request for channel distribution on the query node
|
||||
QueryChannels = "qn_channels"
|
||||
// ChannelKey request for get channels from the datanode/querynode/datacoord/querycoord
|
||||
ChannelKey = "channels"
|
||||
|
||||
// QueryDist request for segment/channel/leader view distribution on querycoord
|
||||
QueryDist = "qc_dist"
|
||||
// DistKey request for segment/channel/leader view distribution on querycoord
|
||||
// DistKey request for get segments on the datacoord
|
||||
DistKey = "dist"
|
||||
|
||||
// QueryTarget request for segment/channel target on the querycoord
|
||||
QueryTarget = "qc_target"
|
||||
// TargetKey request for segment/channel target on the querycoord
|
||||
TargetKey = "qc_target"
|
||||
|
||||
// QueryCoordAllTasks request for get tasks on the querycoord
|
||||
QueryCoordAllTasks = "qc_tasks_all"
|
||||
// AllTaskKey request for get all tasks on the querycoord
|
||||
AllTaskKey = "tasks_all"
|
||||
|
||||
// QueryReplicas request for get replica on the querycoord
|
||||
QueryReplicas = "qc_replica"
|
||||
// ReplicaKey request for get replica on the querycoord
|
||||
ReplicaKey = "replica"
|
||||
|
||||
// QueryResourceGroups request for get resource groups on the querycoord
|
||||
QueryResourceGroups = "qc_resource_group"
|
||||
// ResourceGroupKey request for get resource groups on the querycoord
|
||||
ResourceGroupKey = "resource_group"
|
||||
|
||||
// DataDist request for get segments on the datacoord
|
||||
DataDist = "dc_segments"
|
||||
// ImportTaskKey request for get import tasks from the datacoord
|
||||
ImportTaskKey = "import_tasks"
|
||||
|
||||
// ImportTasks request for get import tasks from the datacoord
|
||||
ImportTasks = "dc_import_tasks"
|
||||
// CompactionTaskKey request for get compaction tasks from the datacoord
|
||||
CompactionTaskKey = "compaction_tasks"
|
||||
|
||||
// CompactionTasks request for get compaction tasks from the datacoord
|
||||
CompactionTasks = "dc_compaction_tasks"
|
||||
// BuildIndexTaskKey request for get building index tasks from the datacoord
|
||||
BuildIndexTaskKey = "build_index_tasks"
|
||||
|
||||
// BuildIndexTasks request for get building index tasks from the datacoord
|
||||
BuildIndexTasks = "dc_build_index_tasks"
|
||||
// IndexKey request for get index list/detail from the datacoord
|
||||
IndexKey = "index"
|
||||
|
||||
// SyncTasks request for get sync tasks from the datanode
|
||||
SyncTasks = "dn_sync_tasks"
|
||||
|
||||
// DataSegments request for get segments from the datanode
|
||||
DataSegments = "dn_segments"
|
||||
|
||||
// DataChannels request for get channels from the datanode
|
||||
DataChannels = "dn_channels"
|
||||
// SyncTaskKey request for get sync tasks from the datanode
|
||||
SyncTaskKey = "sync_tasks"
|
||||
|
||||
// MetricRequestParamVerboseKey as a request parameter decide to whether return verbose value
|
||||
MetricRequestParamVerboseKey = "verbose"
|
||||
|
||||
MetricRequestParamTargetScopeKey = "target_scope"
|
||||
|
||||
MetricRequestParamINKey = "in"
|
||||
|
||||
MetricRequestParamCollectionIDKey = "collection_id"
|
||||
)
|
||||
|
||||
var MetricRequestParamINValue = map[string]struct{}{
|
||||
"dc": {},
|
||||
"qc": {},
|
||||
"dn": {},
|
||||
"qn": {},
|
||||
}
|
||||
|
||||
type MetricsRequestAction func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error)
|
||||
|
||||
type MetricsRequest struct {
|
||||
|
@ -126,7 +126,7 @@ type Segment struct {
|
||||
// load related
|
||||
IsInvisible bool `json:"is_invisible,omitempty"`
|
||||
LoadedTimestamp string `json:"loaded_timestamp,omitempty,string"`
|
||||
Index []*SegmentIndex `json:"index,omitempty"`
|
||||
IndexedFields []*IndexedField `json:"index_fields,omitempty"`
|
||||
ResourceGroup string `json:"resource_group,omitempty"`
|
||||
LoadedInsertRowCount int64 `json:"loaded_insert_row_count,omitempty,string"` // inert row count for growing segment that excludes the deleted row count in QueryNode
|
||||
MemSize int64 `json:"mem_size,omitempty,string"` // memory size of segment in QueryNode
|
||||
@ -135,9 +135,11 @@ type Segment struct {
|
||||
FlushedRows int64 `json:"flushed_rows,omitempty,string"`
|
||||
SyncBufferRows int64 `json:"sync_buffer_rows,omitempty,string"`
|
||||
SyncingRows int64 `json:"syncing_rows,omitempty,string"`
|
||||
|
||||
IsIndexed bool `json:"is_indexed,omitempty"` // indicate whether the segment is indexed
|
||||
}
|
||||
|
||||
type SegmentIndex struct {
|
||||
type IndexedField struct {
|
||||
IndexFieldID int64 `json:"field_id,omitempty,string"`
|
||||
IndexID int64 `json:"index_id,omitempty,string"`
|
||||
BuildID int64 `json:"build_id,omitempty,string"`
|
||||
@ -463,3 +465,17 @@ type Databases struct {
|
||||
IDs []string `json:"db_ids,omitempty"`
|
||||
CreatedTimestamps []string `json:"created_timestamps,omitempty"`
|
||||
}
|
||||
|
||||
type Index struct {
|
||||
CollectionID int64 `json:"collection_id,omitempty,string"`
|
||||
FieldID int64 `json:"field_id,omitempty,string"`
|
||||
IndexID int64 `json:"index_id,omitempty,string"`
|
||||
Name string `json:"name,omitempty"`
|
||||
IsDeleted bool `json:"is_deleted"`
|
||||
CreateTime string `json:"create_time,omitempty"`
|
||||
IndexParams map[string]string `json:"index_params,omitempty"`
|
||||
IsAutoIndex bool `json:"is_auto_index,omitempty"`
|
||||
UserIndexParams map[string]string `json:"user_index_params"`
|
||||
State string `json:"state,omitempty"`
|
||||
IndexStateFailReason string `json:"index_state_fail_reason,omitempty"`
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user