mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-02 11:59:00 +08:00
enhance: use different value to get related data size according to segment type (#33017)
issue: #30436 Signed-off-by: SimFG <bang.fu@zilliz.com>
This commit is contained in:
parent
4fc7915c70
commit
1d48d0aeb2
@ -352,7 +352,7 @@ message SegmentLoadInfo {
|
||||
repeated data.FieldBinlog deltalogs = 9;
|
||||
repeated int64 compactionFrom = 10; // segmentIDs compacted from
|
||||
repeated FieldIndexInfo index_infos = 11;
|
||||
int64 segment_size = 12;
|
||||
int64 segment_size = 12 [deprecated = true];
|
||||
string insert_channel = 13;
|
||||
msg.MsgPosition start_position = 14;
|
||||
msg.MsgPosition delta_position = 15;
|
||||
|
@ -86,52 +86,9 @@ func PackSegmentLoadInfo(segment *datapb.SegmentInfo, channelCheckpoint *msgpb.M
|
||||
Level: segment.GetLevel(),
|
||||
StorageVersion: segment.GetStorageVersion(),
|
||||
}
|
||||
loadInfo.SegmentSize = calculateSegmentSize(loadInfo)
|
||||
return loadInfo
|
||||
}
|
||||
|
||||
func calculateSegmentSize(segmentLoadInfo *querypb.SegmentLoadInfo) int64 {
|
||||
segmentSize := int64(0)
|
||||
|
||||
fieldIndex := make(map[int64]*querypb.FieldIndexInfo)
|
||||
for _, index := range segmentLoadInfo.IndexInfos {
|
||||
if index.EnableIndex {
|
||||
fieldID := index.FieldID
|
||||
fieldIndex[fieldID] = index
|
||||
}
|
||||
}
|
||||
|
||||
for _, fieldBinlog := range segmentLoadInfo.BinlogPaths {
|
||||
fieldID := fieldBinlog.FieldID
|
||||
if index, ok := fieldIndex[fieldID]; ok {
|
||||
segmentSize += index.IndexSize
|
||||
} else {
|
||||
segmentSize += getFieldSizeFromFieldBinlog(fieldBinlog)
|
||||
}
|
||||
}
|
||||
|
||||
// Get size of state data
|
||||
for _, fieldBinlog := range segmentLoadInfo.Statslogs {
|
||||
segmentSize += getFieldSizeFromFieldBinlog(fieldBinlog)
|
||||
}
|
||||
|
||||
// Get size of delete data
|
||||
for _, fieldBinlog := range segmentLoadInfo.Deltalogs {
|
||||
segmentSize += getFieldSizeFromFieldBinlog(fieldBinlog)
|
||||
}
|
||||
|
||||
return segmentSize
|
||||
}
|
||||
|
||||
func getFieldSizeFromFieldBinlog(fieldBinlog *datapb.FieldBinlog) int64 {
|
||||
fieldSize := int64(0)
|
||||
for _, binlog := range fieldBinlog.Binlogs {
|
||||
fieldSize += binlog.LogSize
|
||||
}
|
||||
|
||||
return fieldSize
|
||||
}
|
||||
|
||||
func MergeDmChannelInfo(infos []*datapb.VchannelInfo) *meta.DmChannel {
|
||||
var dmChannel *meta.DmChannel
|
||||
|
||||
|
@ -146,7 +146,7 @@ func retrieveOnSegmentsWithStream(ctx context.Context, segments []Segment, segTy
|
||||
Ids: result.GetIds(),
|
||||
FieldsData: result.GetFieldsData(),
|
||||
CostAggregation: &internalpb.CostAggregation{
|
||||
TotalRelatedDataSize: segment.MemSize(),
|
||||
TotalRelatedDataSize: GetSegmentRelatedDataSize(segment),
|
||||
},
|
||||
AllRetrieveCount: result.GetAllRetrieveCount(),
|
||||
}); err != nil {
|
||||
|
@ -23,7 +23,9 @@ import (
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||
"github.com/milvus-io/milvus/internal/querynodev2/segments/metricsutil"
|
||||
"github.com/milvus-io/milvus/internal/storage"
|
||||
"github.com/milvus-io/milvus/pkg/common"
|
||||
@ -206,3 +208,39 @@ func withLazyLoadTimeoutContext(ctx context.Context) (context.Context, context.C
|
||||
// TODO: use context.WithTimeoutCause instead of contextutil.WithTimeoutCause in go1.21
|
||||
return contextutil.WithTimeoutCause(ctx, lazyLoadTimeout, errLazyLoadTimeout)
|
||||
}
|
||||
|
||||
func GetSegmentRelatedDataSize(segment Segment) int64 {
|
||||
if segment.Type() == SegmentTypeSealed {
|
||||
return calculateSegmentLogSize(segment.LoadInfo())
|
||||
}
|
||||
return segment.MemSize()
|
||||
}
|
||||
|
||||
func calculateSegmentLogSize(segmentLoadInfo *querypb.SegmentLoadInfo) int64 {
|
||||
segmentSize := int64(0)
|
||||
|
||||
for _, fieldBinlog := range segmentLoadInfo.BinlogPaths {
|
||||
segmentSize += getFieldSizeFromFieldBinlog(fieldBinlog)
|
||||
}
|
||||
|
||||
// Get size of state data
|
||||
for _, fieldBinlog := range segmentLoadInfo.Statslogs {
|
||||
segmentSize += getFieldSizeFromFieldBinlog(fieldBinlog)
|
||||
}
|
||||
|
||||
// Get size of delete data
|
||||
for _, fieldBinlog := range segmentLoadInfo.Deltalogs {
|
||||
segmentSize += getFieldSizeFromFieldBinlog(fieldBinlog)
|
||||
}
|
||||
|
||||
return segmentSize
|
||||
}
|
||||
|
||||
func getFieldSizeFromFieldBinlog(fieldBinlog *datapb.FieldBinlog) int64 {
|
||||
fieldSize := int64(0)
|
||||
for _, binlog := range fieldBinlog.Binlogs {
|
||||
fieldSize += binlog.LogSize
|
||||
}
|
||||
|
||||
return fieldSize
|
||||
}
|
||||
|
@ -4,6 +4,9 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||
)
|
||||
|
||||
func TestFilterZeroValuesFromSlice(t *testing.T) {
|
||||
@ -18,3 +21,57 @@ func TestFilterZeroValuesFromSlice(t *testing.T) {
|
||||
assert.Equal(t, 3, len(filteredInts))
|
||||
assert.EqualValues(t, []int64{10, 5, 13}, filteredInts)
|
||||
}
|
||||
|
||||
func TestGetSegmentRelatedDataSize(t *testing.T) {
|
||||
t.Run("seal segment", func(t *testing.T) {
|
||||
segment := NewMockSegment(t)
|
||||
segment.EXPECT().Type().Return(SegmentTypeSealed)
|
||||
segment.EXPECT().LoadInfo().Return(&querypb.SegmentLoadInfo{
|
||||
BinlogPaths: []*datapb.FieldBinlog{
|
||||
{
|
||||
Binlogs: []*datapb.Binlog{
|
||||
{
|
||||
LogSize: 10,
|
||||
},
|
||||
{
|
||||
LogSize: 20,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Binlogs: []*datapb.Binlog{
|
||||
{
|
||||
LogSize: 30,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
Deltalogs: []*datapb.FieldBinlog{
|
||||
{
|
||||
Binlogs: []*datapb.Binlog{
|
||||
{
|
||||
LogSize: 30,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
Statslogs: []*datapb.FieldBinlog{
|
||||
{
|
||||
Binlogs: []*datapb.Binlog{
|
||||
{
|
||||
LogSize: 10,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
})
|
||||
assert.EqualValues(t, 100, GetSegmentRelatedDataSize(segment))
|
||||
})
|
||||
|
||||
t.Run("growing segment", func(t *testing.T) {
|
||||
segment := NewMockSegment(t)
|
||||
segment.EXPECT().Type().Return(SegmentTypeGrowing)
|
||||
segment.EXPECT().MemSize().Return(int64(100))
|
||||
assert.EqualValues(t, 100, GetSegmentRelatedDataSize(segment))
|
||||
})
|
||||
}
|
||||
|
@ -144,7 +144,7 @@ func (t *QueryTask) Execute() error {
|
||||
}
|
||||
|
||||
relatedDataSize := lo.Reduce(querySegments, func(acc int64, seg segments.Segment, _ int) int64 {
|
||||
return acc + seg.MemSize()
|
||||
return acc + segments.GetSegmentRelatedDataSize(seg)
|
||||
}, 0)
|
||||
|
||||
t.result = &internalpb.RetrieveResults{
|
||||
|
@ -215,7 +215,7 @@ func (t *SearchTask) Execute() error {
|
||||
}
|
||||
|
||||
relatedDataSize := lo.Reduce(searchedSegments, func(acc int64, seg segments.Segment, _ int) int64 {
|
||||
return acc + seg.MemSize()
|
||||
return acc + segments.GetSegmentRelatedDataSize(seg)
|
||||
}, 0)
|
||||
|
||||
tr.RecordSpan()
|
||||
@ -445,6 +445,7 @@ func (t *StreamingSearchTask) Execute() error {
|
||||
|
||||
// 1. search&&reduce or streaming-search&&streaming-reduce
|
||||
metricType := searchReq.Plan().GetMetricType()
|
||||
var relatedDataSize int64
|
||||
if req.GetScope() == querypb.DataScope_Historical {
|
||||
streamReduceFunc := func(result *segments.SearchResult) error {
|
||||
reduceErr := t.streamReduce(t.ctx, searchReq.Plan(), result, t.originNqs, t.originTopks)
|
||||
@ -470,6 +471,9 @@ func (t *StreamingSearchTask) Execute() error {
|
||||
log.Error("Failed to get stream-reduced search result")
|
||||
return err
|
||||
}
|
||||
relatedDataSize = lo.Reduce(pinnedSegments, func(acc int64, seg segments.Segment, _ int) int64 {
|
||||
return acc + segments.GetSegmentRelatedDataSize(seg)
|
||||
}, 0)
|
||||
} else if req.GetScope() == querypb.DataScope_Streaming {
|
||||
results, pinnedSegments, err := segments.SearchStreaming(
|
||||
t.ctx,
|
||||
@ -507,6 +511,9 @@ func (t *StreamingSearchTask) Execute() error {
|
||||
metrics.ReduceSegments,
|
||||
metrics.BatchReduce).
|
||||
Observe(float64(tr.RecordSpan().Milliseconds()))
|
||||
relatedDataSize = lo.Reduce(pinnedSegments, func(acc int64, seg segments.Segment, _ int) int64 {
|
||||
return acc + segments.GetSegmentRelatedDataSize(seg)
|
||||
}, 0)
|
||||
}
|
||||
|
||||
// 2. reorganize blobs to original search request
|
||||
@ -539,7 +546,8 @@ func (t *StreamingSearchTask) Execute() error {
|
||||
SlicedOffset: 1,
|
||||
SlicedNumCount: 1,
|
||||
CostAggregation: &internalpb.CostAggregation{
|
||||
ServiceTime: tr.ElapseSpan().Milliseconds(),
|
||||
ServiceTime: tr.ElapseSpan().Milliseconds(),
|
||||
TotalRelatedDataSize: relatedDataSize,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user