mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-11-30 10:59:32 +08:00
Prevent limiter burst overflow (#20153)
Signed-off-by: bigsheeper <yihao.dai@zilliz.com> Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
parent
ec7a98d231
commit
bddd47c60f
@ -122,14 +122,12 @@ func (rl *rateLimiter) registerLimiters() {
|
||||
case internalpb.RateType_DQLQuery:
|
||||
r = Params.QuotaConfig.DQLMaxQueryRate
|
||||
}
|
||||
log.Info("RateLimiter register for rateType",
|
||||
zap.String("rateType", internalpb.RateType_name[rt]),
|
||||
zap.String("rate", ratelimitutil.Limit(r).String()))
|
||||
limit := ratelimitutil.Limit(r)
|
||||
if limit < 0 {
|
||||
limit = ratelimitutil.Inf
|
||||
}
|
||||
burst := int(r) // use rate as burst, because Limiter is with punishment mechanism, burst is insignificant.
|
||||
rl.limiters[internalpb.RateType(rt)] = ratelimitutil.NewLimiter(limit, burst)
|
||||
log.Info("RateLimiter register for rateType",
|
||||
zap.String("rateType", internalpb.RateType_name[rt]),
|
||||
zap.String("rate", ratelimitutil.Limit(r).String()),
|
||||
zap.String("burst", fmt.Sprintf("%v", burst)))
|
||||
}
|
||||
}
|
||||
|
@ -49,11 +49,33 @@ func TestMultiRateLimiter(t *testing.T) {
|
||||
multiLimiter := NewMultiRateLimiter()
|
||||
bak := Params.QuotaConfig.QuotaAndLimitsEnabled
|
||||
Params.QuotaConfig.QuotaAndLimitsEnabled = false
|
||||
ok, r := multiLimiter.Limit(internalpb.RateType(0), 1)
|
||||
assert.False(t, ok)
|
||||
assert.NotEqual(t, float64(0), r)
|
||||
for _, rt := range internalpb.RateType_value {
|
||||
ok, r := multiLimiter.Limit(internalpb.RateType(rt), 1)
|
||||
assert.False(t, ok)
|
||||
assert.NotEqual(t, float64(0), r)
|
||||
}
|
||||
Params.QuotaConfig.QuotaAndLimitsEnabled = bak
|
||||
})
|
||||
|
||||
t.Run("test limit", func(t *testing.T) {
|
||||
run := func(insertRate float64) {
|
||||
bakInsertRate := Params.QuotaConfig.DMLMaxInsertRate
|
||||
Params.QuotaConfig.DMLMaxInsertRate = insertRate
|
||||
multiLimiter := NewMultiRateLimiter()
|
||||
bak := Params.QuotaConfig.QuotaAndLimitsEnabled
|
||||
Params.QuotaConfig.QuotaAndLimitsEnabled = true
|
||||
ok, r := multiLimiter.Limit(internalpb.RateType_DMLInsert, 1*1024*1024)
|
||||
assert.False(t, ok)
|
||||
assert.NotEqual(t, float64(0), r)
|
||||
Params.QuotaConfig.QuotaAndLimitsEnabled = bak
|
||||
Params.QuotaConfig.DMLMaxInsertRate = bakInsertRate
|
||||
}
|
||||
run(math.MaxInt)
|
||||
run(math.MaxInt / 1.2)
|
||||
run(math.MaxInt / 2)
|
||||
run(math.MaxInt / 3)
|
||||
run(math.MaxInt / 10000)
|
||||
})
|
||||
}
|
||||
|
||||
func TestRateLimiter(t *testing.T) {
|
||||
@ -90,14 +112,8 @@ func TestRateLimiter(t *testing.T) {
|
||||
assert.NoError(t, err)
|
||||
for _, rt := range internalpb.RateType_value {
|
||||
for i := 0; i < 100; i++ {
|
||||
if i == 0 {
|
||||
// Consume token initialed in bucket
|
||||
ok, _ := limiter.limit(internalpb.RateType(rt), 1)
|
||||
assert.False(t, ok)
|
||||
} else {
|
||||
ok, _ := limiter.limit(internalpb.RateType(rt), 1)
|
||||
assert.True(t, ok)
|
||||
}
|
||||
ok, _ := limiter.limit(internalpb.RateType(rt), 1)
|
||||
assert.True(t, ok)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
@ -654,7 +654,11 @@ func (q *QuotaCenter) setRates() error {
|
||||
}
|
||||
rates := make([]*internalpb.Rate, 0, len(q.currentRates))
|
||||
for rt, r := range q.currentRates {
|
||||
rates = append(rates, &internalpb.Rate{Rt: rt, R: float64(r) / float64(proxyNum)})
|
||||
if r == Inf {
|
||||
rates = append(rates, &internalpb.Rate{Rt: rt, R: float64(r)})
|
||||
} else {
|
||||
rates = append(rates, &internalpb.Rate{Rt: rt, R: float64(r) / float64(proxyNum)})
|
||||
}
|
||||
}
|
||||
return rates
|
||||
}
|
||||
|
@ -29,7 +29,7 @@ import (
|
||||
|
||||
const (
|
||||
// defaultMax is the default unlimited rate or threshold.
|
||||
defaultMax = float64(math.MaxFloat64)
|
||||
defaultMax = float64(math.MaxInt)
|
||||
// MBSize used to convert megabytes and bytes.
|
||||
MBSize = 1024.0 * 1024.0
|
||||
// defaultDiskQuotaInMB is the default disk quota in megabytes.
|
||||
|
@ -31,7 +31,7 @@ import (
|
||||
type Limit float64
|
||||
|
||||
// Inf is the infinite rate limit; it allows all events.
|
||||
const Inf = Limit(math.MaxFloat64)
|
||||
const Inf = Limit(math.MaxInt)
|
||||
|
||||
// A Limiter controls how frequently events are allowed to happen.
|
||||
// It implements a "token bucket" of size b, initially full and refilled
|
||||
@ -113,6 +113,12 @@ func (lim *Limiter) SetLimit(newLimit Limit) {
|
||||
lim.last = now
|
||||
lim.tokens = tokens
|
||||
lim.limit = newLimit
|
||||
if newLimit >= math.MaxInt {
|
||||
lim.burst = math.MaxInt
|
||||
} else {
|
||||
// use rate as burst, because Limiter is with punishment mechanism, burst is insignificant.
|
||||
lim.burst = int(newLimit)
|
||||
}
|
||||
}
|
||||
|
||||
// advance calculates and returns an updated state for lim resulting from the passage of time.
|
||||
|
@ -18,6 +18,7 @@ package ratelimitutil
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
@ -63,6 +64,15 @@ func run(t *testing.T, lim *Limiter, allows []allow) {
|
||||
}
|
||||
}
|
||||
|
||||
func runWithoutCheckToken(t *testing.T, lim *Limiter, allows []allow) {
|
||||
for i, a := range allows {
|
||||
ok := lim.AllowN(a.t, a.n)
|
||||
if ok != a.ok {
|
||||
t.Errorf("step %d: lim.AllowN(%v, %v) = %v want %v", i, a.t, a.n, ok, a.ok)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestLimit(t *testing.T) {
|
||||
t.Run("test limit", func(t *testing.T) {
|
||||
// test base
|
||||
@ -119,18 +129,30 @@ func TestLimit(t *testing.T) {
|
||||
{t2, 1, false, -1},
|
||||
{t2, 1, false, -1},
|
||||
})
|
||||
|
||||
limit := Inf
|
||||
burst := math.MaxInt
|
||||
limiter := NewLimiter(limit, burst)
|
||||
runWithoutCheckToken(t, limiter, []allow{
|
||||
{t0, 1 * 1024 * 1024, true, 0},
|
||||
{t1, 1 * 1024 * 1024, true, 0},
|
||||
{t2, 1 * 1024 * 1024, true, 0},
|
||||
{t3, 1 * 1024 * 1024, true, 0},
|
||||
{t4, 1 * 1024 * 1024, true, 0},
|
||||
{t5, 1 * 1024 * 1024, true, 0},
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("test SetLimit", func(t *testing.T) {
|
||||
lim := NewLimiter(10, 2)
|
||||
lim := NewLimiter(10, 10)
|
||||
|
||||
run(t, lim, []allow{
|
||||
{t0, 5, true, -3},
|
||||
{t0, 1, false, -3},
|
||||
{t1, 1, false, -3},
|
||||
{t0, 5, true, 5},
|
||||
{t0, 1, true, 4},
|
||||
{t1, 1, true, 4},
|
||||
})
|
||||
lim.SetLimit(100)
|
||||
run(t, lim, []allow{{t2, 10, true, -8}})
|
||||
runWithoutCheckToken(t, lim, []allow{{t2, 10, true, 0}})
|
||||
})
|
||||
|
||||
t.Run("test no truncation error", func(t *testing.T) {
|
||||
|
Loading…
Reference in New Issue
Block a user