mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-02 11:59:00 +08:00
Support rate limit based on growing segments size (#24121)
Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
parent
a037f36891
commit
7384d83d2c
@ -501,6 +501,13 @@ quotaAndLimits:
|
||||
dataNodeMemoryHighWaterLevel: 0.95 # (0, 1], memoryHighWaterLevel in DataNodes
|
||||
queryNodeMemoryLowWaterLevel: 0.85 # (0, 1], memoryLowWaterLevel in QueryNodes
|
||||
queryNodeMemoryHighWaterLevel: 0.95 # (0, 1], memoryHighWaterLevel in QueryNodes
|
||||
growingSegmentsSizeProtection:
|
||||
# 1. No action will be taken if the ratio of growing segments size is less than the low water level.
|
||||
# 2. The DML rate will be reduced if the ratio of growing segments size is greater than the low water level and less than the high water level.
|
||||
# 3. All DML requests will be rejected if the ratio of growing segments size is greater than the high water level.
|
||||
enabled: false
|
||||
lowWaterLevel: 0.2
|
||||
highWaterLevel: 0.4
|
||||
diskProtection:
|
||||
enabled: true # When the total file size of object storage is greater than `diskQuota`, all dml requests would be rejected;
|
||||
diskQuota: -1 # MB, (0, +inf), default no limit
|
||||
|
@ -20,9 +20,12 @@ import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/samber/lo"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/commonpb"
|
||||
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
|
||||
"github.com/milvus-io/milvus/internal/querynodev2/collector"
|
||||
"github.com/milvus-io/milvus/internal/querynodev2/segments"
|
||||
"github.com/milvus-io/milvus/pkg/util/hardware"
|
||||
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
|
||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||
@ -104,6 +107,18 @@ func getQuotaMetrics(node *QueryNode) (*metricsinfo.QueryNodeQuotaMetrics, error
|
||||
}
|
||||
|
||||
minTsafeChannel, minTsafe := node.tSafeManager.Min()
|
||||
|
||||
growingSegments := node.manager.Segment.GetBy(segments.WithType(segments.SegmentTypeGrowing))
|
||||
growingSegmentsSize := lo.SumBy(growingSegments, func(seg segments.Segment) int64 {
|
||||
return seg.MemSize()
|
||||
})
|
||||
|
||||
allSegments := node.manager.Segment.GetBy()
|
||||
collections := typeutil.NewUniqueSet()
|
||||
for _, segment := range allSegments {
|
||||
collections.Insert(segment.Collection())
|
||||
}
|
||||
|
||||
return &metricsinfo.QueryNodeQuotaMetrics{
|
||||
Hms: metricsinfo.HardwareMetrics{},
|
||||
Rms: rms,
|
||||
@ -112,8 +127,13 @@ func getQuotaMetrics(node *QueryNode) (*metricsinfo.QueryNodeQuotaMetrics, error
|
||||
MinFlowGraphTt: minTsafe,
|
||||
NumFlowGraph: node.pipelineManager.Num(),
|
||||
},
|
||||
SearchQueue: sqms,
|
||||
QueryQueue: qqms,
|
||||
SearchQueue: sqms,
|
||||
QueryQueue: qqms,
|
||||
GrowingSegmentsSize: growingSegmentsSize,
|
||||
Effect: metricsinfo.NodeEffect{
|
||||
NodeID: paramtable.GetNodeID(),
|
||||
CollectionIDs: collections.Collect(),
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -80,6 +80,7 @@ type collectionStates = map[milvuspb.QuotaState]commonpb.ErrorCode
|
||||
// 4. DQL Queue length protection -> dqlRate = curDQLRate * CoolOffSpeed
|
||||
// 5. DQL queue latency protection -> dqlRate = curDQLRate * CoolOffSpeed
|
||||
// 6. Search result protection -> searchRate = curSearchRate * CoolOffSpeed
|
||||
// 7. GrowingSegsSize protection -> dmlRate = maxDMLRate * (high - cur) / (high - low)
|
||||
//
|
||||
// If necessary, user can also manually force to deny RW requests.
|
||||
type QuotaCenter struct {
|
||||
@ -422,14 +423,23 @@ func (q *QuotaCenter) calculateWriteRates() error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
collectionFactors := q.getTimeTickDelayFactor(ts)
|
||||
memFactors := q.getMemoryFactor()
|
||||
for collection, factor := range memFactors {
|
||||
_, ok := collectionFactors[collection]
|
||||
if !ok || collectionFactors[collection] > factor {
|
||||
collectionFactors[collection] = factor
|
||||
|
||||
var collectionFactors map[int64]float64
|
||||
updateCollectionFactor := func(factors map[int64]float64) {
|
||||
for collection, factor := range factors {
|
||||
_, ok := collectionFactors[collection]
|
||||
if !ok || collectionFactors[collection] > factor {
|
||||
collectionFactors[collection] = factor
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
collectionFactors = q.getTimeTickDelayFactor(ts)
|
||||
memFactors := q.getMemoryFactor()
|
||||
updateCollectionFactor(memFactors)
|
||||
growingSegFactors := q.getGrowingSegmentsSizeFactor()
|
||||
updateCollectionFactor(growingSegFactors)
|
||||
|
||||
for collection, factor := range collectionFactors {
|
||||
if factor <= 0 {
|
||||
q.forceDenyWriting(commonpb.ErrorCode_TimeTickLongDelay, collection)
|
||||
@ -598,6 +608,51 @@ func (q *QuotaCenter) getMemoryFactor() map[int64]float64 {
|
||||
return collectionFactor
|
||||
}
|
||||
|
||||
func (q *QuotaCenter) getGrowingSegmentsSizeFactor() map[int64]float64 {
|
||||
log := log.Ctx(context.Background()).WithRateGroup("rootcoord.QuotaCenter", 1.0, 60.0)
|
||||
if !Params.QuotaConfig.GrowingSegmentsSizeProtectionEnabled.GetAsBool() {
|
||||
return make(map[int64]float64)
|
||||
}
|
||||
|
||||
low := Params.QuotaConfig.GrowingSegmentsSizeLowWaterLevel.GetAsFloat()
|
||||
high := Params.QuotaConfig.GrowingSegmentsSizeHighWaterLevel.GetAsFloat()
|
||||
|
||||
collectionFactor := make(map[int64]float64)
|
||||
updateCollectionFactor := func(factor float64, collections []int64) {
|
||||
for _, collection := range collections {
|
||||
_, ok := collectionFactor[collection]
|
||||
if !ok || collectionFactor[collection] > factor {
|
||||
collectionFactor[collection] = factor
|
||||
}
|
||||
}
|
||||
}
|
||||
for nodeID, metric := range q.queryNodeMetrics {
|
||||
cur := float64(metric.GrowingSegmentsSize) / float64(metric.Hms.Memory)
|
||||
if cur <= low {
|
||||
continue
|
||||
}
|
||||
if cur >= high {
|
||||
log.RatedWarn(10, "QuotaCenter: QueryNode growing segments size to high water level",
|
||||
zap.String("Node", fmt.Sprintf("%s-%d", typeutil.QueryNodeRole, nodeID)),
|
||||
zap.Int64s("collections", metric.Effect.CollectionIDs),
|
||||
zap.Int64("segmentsSize", metric.GrowingSegmentsSize),
|
||||
zap.Uint64("TotalMem", metric.Hms.Memory),
|
||||
zap.Float64("highWaterLevel", high))
|
||||
updateCollectionFactor(0, metric.Effect.CollectionIDs)
|
||||
continue
|
||||
}
|
||||
factor := (high - cur) / (high - low)
|
||||
updateCollectionFactor(factor, metric.Effect.CollectionIDs)
|
||||
log.RatedWarn(10, "QuotaCenter: QueryNode growing segments size to low water level, limit writing rate",
|
||||
zap.String("Node", fmt.Sprintf("%s-%d", typeutil.QueryNodeRole, nodeID)),
|
||||
zap.Int64s("collections", metric.Effect.CollectionIDs),
|
||||
zap.Int64("segmentsSize", metric.GrowingSegmentsSize),
|
||||
zap.Uint64("TotalMem", metric.Hms.Memory),
|
||||
zap.Float64("lowWaterLevel", low))
|
||||
}
|
||||
return collectionFactor
|
||||
}
|
||||
|
||||
// calculateRates calculates target rates by different strategies.
|
||||
func (q *QuotaCenter) calculateRates() error {
|
||||
q.resetAllCurrentRates()
|
||||
|
@ -397,6 +397,58 @@ func TestQuotaCenter(t *testing.T) {
|
||||
paramtable.Get().Reset(Params.QuotaConfig.QueryNodeMemoryHighWaterLevel.Key)
|
||||
})
|
||||
|
||||
t.Run("test GrowingSegmentsSize factors", func(t *testing.T) {
|
||||
qc := types.NewMockQueryCoord(t)
|
||||
quotaCenter := NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{}, core.tsoAllocator)
|
||||
tests := []struct {
|
||||
low float64
|
||||
high float64
|
||||
growingSize int64
|
||||
memTotal uint64
|
||||
expectedFactor float64
|
||||
}{
|
||||
{0.8, 0.9, 10, 100, 1},
|
||||
{0.8, 0.9, 80, 100, 1},
|
||||
{0.8, 0.9, 82, 100, 0.8},
|
||||
{0.8, 0.9, 85, 100, 0.5},
|
||||
{0.8, 0.9, 88, 100, 0.2},
|
||||
{0.8, 0.9, 90, 100, 0},
|
||||
|
||||
{0.85, 0.95, 25, 100, 1},
|
||||
{0.85, 0.95, 85, 100, 1},
|
||||
{0.85, 0.95, 87, 100, 0.8},
|
||||
{0.85, 0.95, 90, 100, 0.5},
|
||||
{0.85, 0.95, 93, 100, 0.2},
|
||||
{0.85, 0.95, 95, 100, 0},
|
||||
}
|
||||
|
||||
quotaCenter.writableCollections = append(quotaCenter.writableCollections, 1, 2, 3)
|
||||
paramtable.Get().Save(Params.QuotaConfig.GrowingSegmentsSizeProtectionEnabled.Key, "true")
|
||||
for _, test := range tests {
|
||||
paramtable.Get().Save(Params.QuotaConfig.GrowingSegmentsSizeLowWaterLevel.Key, fmt.Sprintf("%f", test.low))
|
||||
paramtable.Get().Save(Params.QuotaConfig.GrowingSegmentsSizeHighWaterLevel.Key, fmt.Sprintf("%f", test.high))
|
||||
quotaCenter.queryNodeMetrics = map[UniqueID]*metricsinfo.QueryNodeQuotaMetrics{
|
||||
1: {
|
||||
Hms: metricsinfo.HardwareMetrics{
|
||||
Memory: test.memTotal,
|
||||
},
|
||||
Effect: metricsinfo.NodeEffect{
|
||||
NodeID: 1,
|
||||
CollectionIDs: []int64{1, 2, 3},
|
||||
},
|
||||
GrowingSegmentsSize: test.growingSize,
|
||||
},
|
||||
}
|
||||
factors := quotaCenter.getGrowingSegmentsSizeFactor()
|
||||
|
||||
for _, factor := range factors {
|
||||
assert.True(t, math.Abs(factor-test.expectedFactor) < 0.01)
|
||||
}
|
||||
}
|
||||
paramtable.Get().Reset(Params.QuotaConfig.GrowingSegmentsSizeLowWaterLevel.Key)
|
||||
paramtable.Get().Reset(Params.QuotaConfig.GrowingSegmentsSizeHighWaterLevel.Key)
|
||||
})
|
||||
|
||||
t.Run("test checkDiskQuota", func(t *testing.T) {
|
||||
qc := types.NewMockQueryCoord(t)
|
||||
quotaCenter := NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{}, core.tsoAllocator)
|
||||
|
@ -75,12 +75,13 @@ type NodeEffect struct {
|
||||
|
||||
// QueryNodeQuotaMetrics are metrics of QueryNode.
|
||||
type QueryNodeQuotaMetrics struct {
|
||||
Hms HardwareMetrics
|
||||
Rms []RateMetric
|
||||
Fgm FlowGraphMetric
|
||||
SearchQueue ReadInfoInQueue
|
||||
QueryQueue ReadInfoInQueue
|
||||
Effect NodeEffect
|
||||
Hms HardwareMetrics
|
||||
Rms []RateMetric
|
||||
Fgm FlowGraphMetric
|
||||
SearchQueue ReadInfoInQueue
|
||||
QueryQueue ReadInfoInQueue
|
||||
GrowingSegmentsSize int64
|
||||
Effect NodeEffect
|
||||
}
|
||||
|
||||
type DataCoordQuotaMetrics struct {
|
||||
|
@ -91,17 +91,20 @@ type quotaConfig struct {
|
||||
MaxCollectionNumPerDB ParamItem `refreshable:"true"`
|
||||
|
||||
// limit writing
|
||||
ForceDenyWriting ParamItem `refreshable:"true"`
|
||||
TtProtectionEnabled ParamItem `refreshable:"true"`
|
||||
MaxTimeTickDelay ParamItem `refreshable:"true"`
|
||||
MemProtectionEnabled ParamItem `refreshable:"true"`
|
||||
DataNodeMemoryLowWaterLevel ParamItem `refreshable:"true"`
|
||||
DataNodeMemoryHighWaterLevel ParamItem `refreshable:"true"`
|
||||
QueryNodeMemoryLowWaterLevel ParamItem `refreshable:"true"`
|
||||
QueryNodeMemoryHighWaterLevel ParamItem `refreshable:"true"`
|
||||
DiskProtectionEnabled ParamItem `refreshable:"true"`
|
||||
DiskQuota ParamItem `refreshable:"true"`
|
||||
DiskQuotaPerCollection ParamItem `refreshable:"true"`
|
||||
ForceDenyWriting ParamItem `refreshable:"true"`
|
||||
TtProtectionEnabled ParamItem `refreshable:"true"`
|
||||
MaxTimeTickDelay ParamItem `refreshable:"true"`
|
||||
MemProtectionEnabled ParamItem `refreshable:"true"`
|
||||
DataNodeMemoryLowWaterLevel ParamItem `refreshable:"true"`
|
||||
DataNodeMemoryHighWaterLevel ParamItem `refreshable:"true"`
|
||||
QueryNodeMemoryLowWaterLevel ParamItem `refreshable:"true"`
|
||||
QueryNodeMemoryHighWaterLevel ParamItem `refreshable:"true"`
|
||||
GrowingSegmentsSizeProtectionEnabled ParamItem `refreshable:"true"`
|
||||
GrowingSegmentsSizeLowWaterLevel ParamItem `refreshable:"true"`
|
||||
GrowingSegmentsSizeHighWaterLevel ParamItem `refreshable:"true"`
|
||||
DiskProtectionEnabled ParamItem `refreshable:"true"`
|
||||
DiskQuota ParamItem `refreshable:"true"`
|
||||
DiskQuotaPerCollection ParamItem `refreshable:"true"`
|
||||
|
||||
// limit reading
|
||||
ForceDenyReading ParamItem `refreshable:"true"`
|
||||
@ -880,6 +883,52 @@ When memory usage < memoryLowWaterLevel, no action.`,
|
||||
}
|
||||
p.QueryNodeMemoryHighWaterLevel.Init(base.mgr)
|
||||
|
||||
p.GrowingSegmentsSizeProtectionEnabled = ParamItem{
|
||||
Key: "quotaAndLimits.limitWriting.growingSegmentsSizeProtection.enabled",
|
||||
Version: "2.2.9",
|
||||
DefaultValue: "false",
|
||||
Doc: `1. No action will be taken if the ratio of growing segments size is less than the low water level.
|
||||
2. The DML rate will be reduced if the ratio of growing segments size is greater than the low water level and less than the high water level.
|
||||
3. All DML requests will be rejected if the ratio of growing segments size is greater than the high water level.`,
|
||||
Export: true,
|
||||
}
|
||||
p.GrowingSegmentsSizeProtectionEnabled.Init(base.mgr)
|
||||
|
||||
defaultGrowingSegSizeLowWaterLevel := "0.2"
|
||||
p.GrowingSegmentsSizeLowWaterLevel = ParamItem{
|
||||
Key: "quotaAndLimits.limitWriting.growingSegmentsSizeProtection.lowWaterLevel",
|
||||
Version: "2.2.9",
|
||||
DefaultValue: defaultGrowingSegSizeLowWaterLevel,
|
||||
Formatter: func(v string) string {
|
||||
level := getAsFloat(v)
|
||||
if level <= 0 || level > 1 {
|
||||
return defaultGrowingSegSizeLowWaterLevel
|
||||
}
|
||||
return v
|
||||
},
|
||||
Export: true,
|
||||
}
|
||||
p.GrowingSegmentsSizeLowWaterLevel.Init(base.mgr)
|
||||
|
||||
defaultGrowingSegSizeHighWaterLevel := "0.4"
|
||||
p.GrowingSegmentsSizeHighWaterLevel = ParamItem{
|
||||
Key: "quotaAndLimits.limitWriting.growingSegmentsSizeProtection.highWaterLevel",
|
||||
Version: "2.2.9",
|
||||
DefaultValue: defaultGrowingSegSizeHighWaterLevel,
|
||||
Formatter: func(v string) string {
|
||||
level := getAsFloat(v)
|
||||
if level <= 0 || level > 1 {
|
||||
return defaultGrowingSegSizeHighWaterLevel
|
||||
}
|
||||
if !p.checkMinMaxLegal(p.GrowingSegmentsSizeLowWaterLevel.GetAsFloat(), getAsFloat(v)) {
|
||||
return defaultGrowingSegSizeHighWaterLevel
|
||||
}
|
||||
return v
|
||||
},
|
||||
Export: true,
|
||||
}
|
||||
p.GrowingSegmentsSizeHighWaterLevel.Init(base.mgr)
|
||||
|
||||
p.DiskProtectionEnabled = ParamItem{
|
||||
Key: "quotaAndLimits.limitWriting.diskProtection.enabled",
|
||||
Version: "2.2.0",
|
||||
|
@ -122,6 +122,9 @@ func TestQuotaParam(t *testing.T) {
|
||||
assert.Equal(t, defaultHighWaterLevel, qc.DataNodeMemoryHighWaterLevel.GetAsFloat())
|
||||
assert.Equal(t, defaultLowWaterLevel, qc.QueryNodeMemoryLowWaterLevel.GetAsFloat())
|
||||
assert.Equal(t, defaultHighWaterLevel, qc.QueryNodeMemoryHighWaterLevel.GetAsFloat())
|
||||
assert.Equal(t, false, qc.GrowingSegmentsSizeProtectionEnabled.GetAsBool())
|
||||
assert.Equal(t, 0.2, qc.GrowingSegmentsSizeLowWaterLevel.GetAsFloat())
|
||||
assert.Equal(t, 0.4, qc.GrowingSegmentsSizeHighWaterLevel.GetAsFloat())
|
||||
assert.Equal(t, true, qc.DiskProtectionEnabled.GetAsBool())
|
||||
assert.Equal(t, defaultMax, qc.DiskQuota.GetAsFloat())
|
||||
assert.Equal(t, defaultMax, qc.DiskQuotaPerCollection.GetAsFloat())
|
||||
|
Loading…
Reference in New Issue
Block a user