milvus/internal/rootcoord/quota_center.go

// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rootcoord
import (
"context"
"fmt"
"math"
"strconv"
"strings"
"sync"
"time"
"github.com/samber/lo"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/proxypb"
"github.com/milvus-io/milvus/internal/tso"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/proxyutil"
"github.com/milvus-io/milvus/internal/util/quota"
rlinternal "github.com/milvus-io/milvus/internal/util/ratelimitutil"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/ratelimitutil"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
const (
GetMetricsTimeout = 10 * time.Second
SetRatesTimeout = 10 * time.Second
)
type RateAllocateStrategy int32
const (
Average RateAllocateStrategy = 0
ByRateWeight RateAllocateStrategy = 1
)
var DefaultRateAllocateStrategy = Average
const Inf = ratelimitutil.Inf
type Limit = ratelimitutil.Limit
func GetInfLimiter(_ internalpb.RateType) *ratelimitutil.Limiter {
// It returns an infinite rate limiter with a burst of 0.
return ratelimitutil.NewLimiter(Inf, 0)
}
func GetEarliestLimiter() *ratelimitutil.Limiter {
// It returns a zero-rate limiter with a burst of 0, which denies all requests.
return ratelimitutil.NewLimiter(0, 0)
}
type opType int
const (
ddl opType = iota
dml
dql
allOps
)
var ddlRateTypes = typeutil.NewSet(
internalpb.RateType_DDLCollection,
internalpb.RateType_DDLPartition,
internalpb.RateType_DDLIndex,
internalpb.RateType_DDLFlush,
internalpb.RateType_DDLCompaction,
)
var dmlRateTypes = typeutil.NewSet(
internalpb.RateType_DMLInsert,
internalpb.RateType_DMLUpsert,
internalpb.RateType_DMLDelete,
internalpb.RateType_DMLBulkLoad,
)
var dqlRateTypes = typeutil.NewSet(
internalpb.RateType_DQLSearch,
internalpb.RateType_DQLQuery,
)
// QuotaCenter manages the quota and limitations of the whole cluster.
// It receives metrics from DataNodes, QueryNodes, and Proxies, and
// notifies Proxies to limit the rate of client requests, or to reject
// all requests when the cluster runs into resource issues.
// Limitations:
// 1. DML throughput limitation;
// 2. DDL, DQL qps/rps limitation;
//
// Protections:
// 1. TT protection -> dqlRate = maxDQLRate * (maxDelay - ttDelay) / maxDelay
// 2. Memory protection -> dmlRate = maxDMLRate * (highMem - curMem) / (highMem - lowMem)
// 3. Disk quota protection -> force deny writing if exceeded
// 4. DQL Queue length protection -> dqlRate = curDQLRate * CoolOffSpeed
// 5. DQL queue latency protection -> dqlRate = curDQLRate * CoolOffSpeed
// 6. Search result protection -> searchRate = curSearchRate * CoolOffSpeed
// 7. GrowingSegsSize protection -> dmlRate = maxDMLRate * (high - cur) / (high - low)
//
// If necessary, users can also manually force-deny read and write requests.
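//
// For illustration only, not part of this file's API: a minimal sketch of how the
// water-level protections above (memory, growing segments size) map the current
// usage into a write-rate factor; the variable names are hypothetical and the
// caller is assumed to clamp the result into [0, 1]:
//
//	factor := (high - cur) / (high - low)
//	if cur <= low {
//		factor = 1 // below the low water level: no throttling
//	} else if cur >= high {
//		factor = 0 // at or above the high water level: deny writing
//	}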
type QuotaCenter struct {
ctx context.Context
cancel context.CancelFunc
// clients
proxies proxyutil.ProxyClientManagerInterface
queryCoord types.QueryCoordClient
dataCoord types.DataCoordClient
meta IMetaTable
// metrics
queryNodeMetrics map[UniqueID]*metricsinfo.QueryNodeQuotaMetrics
dataNodeMetrics map[UniqueID]*metricsinfo.DataNodeQuotaMetrics
proxyMetrics map[UniqueID]*metricsinfo.ProxyQuotaMetrics
diskMu sync.Mutex // guards dataCoordMetrics and totalBinlogSize
dataCoordMetrics *metricsinfo.DataCoordQuotaMetrics
totalBinlogSize int64
readableCollections map[int64]map[int64][]int64 // db id -> collection id -> partition id
writableCollections map[int64]map[int64][]int64 // db id -> collection id -> partition id
dbs *typeutil.ConcurrentMap[string, int64] // db name -> db id
collections *typeutil.ConcurrentMap[string, int64] // db id + collection name -> collection id
// collectionIDToDBID is a transitional cache of the db id for each collection.
// TODO: many metrics currently carry only the collection id; this cache can be removed once the db id is added to all metrics.
collectionIDToDBID *typeutil.ConcurrentMap[int64, int64] // collection id -> db id
rateLimiter *rlinternal.RateLimiterTree
tsoAllocator tso.Allocator
rateAllocateStrategy RateAllocateStrategy
stopOnce sync.Once
stopChan chan struct{}
wg sync.WaitGroup
}
// NewQuotaCenter returns a new QuotaCenter.
func NewQuotaCenter(proxies proxyutil.ProxyClientManagerInterface, queryCoord types.QueryCoordClient,
dataCoord types.DataCoordClient, tsoAllocator tso.Allocator, meta IMetaTable,
) *QuotaCenter {
ctx, cancel := context.WithCancel(context.TODO())
q := &QuotaCenter{
ctx: ctx,
cancel: cancel,
proxies: proxies,
queryCoord: queryCoord,
dataCoord: dataCoord,
tsoAllocator: tsoAllocator,
meta: meta,
readableCollections: make(map[int64]map[int64][]int64, 0),
writableCollections: make(map[int64]map[int64][]int64, 0),
rateLimiter: rlinternal.NewRateLimiterTree(initInfLimiter(internalpb.RateScope_Cluster, allOps)),
rateAllocateStrategy: DefaultRateAllocateStrategy,
stopChan: make(chan struct{}),
}
q.clearMetrics()
return q
}
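// initInfLimiter builds a rate limiter node whose limiters are all infinite for the given scope and op type.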
func initInfLimiter(rateScope internalpb.RateScope, opType opType) *rlinternal.RateLimiterNode {
return initLimiter(GetInfLimiter, rateScope, opType)
}
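// newParamLimiterFunc returns a factory that builds a rate limiter node whose limits are read from the quota configuration for the given scope and op type.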
func newParamLimiterFunc(rateScope internalpb.RateScope, opType opType) func() *rlinternal.RateLimiterNode {
return func() *rlinternal.RateLimiterNode {
return initLimiter(func(rt internalpb.RateType) *ratelimitutil.Limiter {
limitVal := quota.GetQuotaValue(rateScope, rt, Params)
return ratelimitutil.NewLimiter(Limit(limitVal), 0)
}, rateScope, opType)
}
}
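// newParamLimiterFuncWithLimitFunc is like newParamLimiterFunc, but the limit of each rate type is taken from the given limitFunc instead of the quota configuration.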
func newParamLimiterFuncWithLimitFunc(rateScope internalpb.RateScope,
opType opType,
limitFunc func(internalpb.RateType) Limit,
) func() *rlinternal.RateLimiterNode {
return func() *rlinternal.RateLimiterNode {
return initLimiter(func(rt internalpb.RateType) *ratelimitutil.Limiter {
limitVal := limitFunc(rt)
return ratelimitutil.NewLimiter(limitVal, 0)
}, rateScope, opType)
}
}
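// initLimiter creates a rate limiter node for the given scope and registers one limiter, built by limiterFunc, for every rate type that applies to the scope and op type.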
func initLimiter(limiterFunc func(internalpb.RateType) *ratelimitutil.Limiter, rateScope internalpb.RateScope, opType opType) *rlinternal.RateLimiterNode {
rateLimiters := rlinternal.NewRateLimiterNode(rateScope)
getRateTypes(rateScope, opType).Range(func(rt internalpb.RateType) bool {
rateLimiters.GetLimiters().GetOrInsert(rt, limiterFunc(rt))
return true
})
return rateLimiters
}
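// updateLimiter applies the given limiter's limit to every limiter of the node that matches the scope's rate types for the given op type.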
func updateLimiter(node *rlinternal.RateLimiterNode, limiter *ratelimitutil.Limiter, rateScope internalpb.RateScope, opType opType) {
if node == nil {
log.Warn("update limiter failed, node is nil", zap.Any("rateScope", rateScope), zap.Any("opType", opType))
return
}
limiters := node.GetLimiters()
getRateTypes(rateScope, opType).Range(func(rt internalpb.RateType) bool {
originLimiter, ok := limiters.Get(rt)
if !ok {
log.Warn("update limiter failed, limiter not found",
zap.Any("rateScope", rateScope),
zap.Any("opType", opType),
zap.Any("rateType", rt))
return true
}
originLimiter.SetLimit(limiter.Limit())
return true
})
}
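// getRateTypes returns the rate types that apply to the given scope, narrowed to the given op type (ddl, dml, dql, or all).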
func getRateTypes(scope internalpb.RateScope, opType opType) typeutil.Set[internalpb.RateType] {
var allRateTypes typeutil.Set[internalpb.RateType]
switch scope {
case internalpb.RateScope_Cluster:
fallthrough
case internalpb.RateScope_Database:
allRateTypes = ddlRateTypes.Union(dmlRateTypes).Union(dqlRateTypes)
case internalpb.RateScope_Collection:
allRateTypes = typeutil.NewSet(internalpb.RateType_DDLFlush).Union(dmlRateTypes).Union(dqlRateTypes)
case internalpb.RateScope_Partition:
allRateTypes = dmlRateTypes.Union(dqlRateTypes)
default:
panic("Unknown rate scope:" + scope.String())
}
switch opType {
case ddl:
return ddlRateTypes.Intersection(allRateTypes)
case dml:
return dmlRateTypes.Intersection(allRateTypes)
case dql:
return dqlRateTypes.Intersection(allRateTypes)
default:
return allRateTypes
}
}
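// Start runs the QuotaCenter service loop in a background goroutine.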
func (q *QuotaCenter) Start() {
q.wg.Add(1)
go q.run()
}
// run starts the service of QuotaCenter.
func (q *QuotaCenter) run() {
defer q.wg.Done()
interval := Params.QuotaConfig.QuotaCenterCollectInterval.GetAsDuration(time.Second)
log.Info("Start QuotaCenter", zap.Duration("collectInterval", interval))
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-q.stopChan:
log.Info("QuotaCenter exit")
return
case <-ticker.C:
err := q.collectMetrics()
if err != nil {
log.Warn("quotaCenter collect metrics failed", zap.Error(err))
break
}
err = q.calculateRates()
if err != nil {
log.Warn("quotaCenter calculate rates failed", zap.Error(err))
break
}
err = q.sendRatesToProxy()
if err != nil {
log.Warn("quotaCenter send rates to proxy failed", zap.Error(err))
}
q.recordMetrics()
}
}
}
// stop stops the service of QuotaCenter.
func (q *QuotaCenter) stop() {
log.Info("stop quota center")
q.stopOnce.Do(func() {
// cancel all blocking request to coord
q.cancel()
close(q.stopChan)
})
q.wg.Wait()
}
// clearMetrics removes all metrics stored in QuotaCenter.
func (q *QuotaCenter) clearMetrics() {
q.dataNodeMetrics = make(map[UniqueID]*metricsinfo.DataNodeQuotaMetrics, 0)
q.queryNodeMetrics = make(map[UniqueID]*metricsinfo.QueryNodeQuotaMetrics, 0)
q.proxyMetrics = make(map[UniqueID]*metricsinfo.ProxyQuotaMetrics, 0)
q.collectionIDToDBID = typeutil.NewConcurrentMap[int64, int64]()
q.collections = typeutil.NewConcurrentMap[string, int64]()
q.dbs = typeutil.NewConcurrentMap[string, int64]()
}
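// updateNumEntitiesLoaded accumulates the per-collection loaded row counts reported by a QueryNode into current and returns it.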
func updateNumEntitiesLoaded(current map[int64]int64, qn *metricsinfo.QueryNodeCollectionMetrics) map[int64]int64 {
for collectionID, rowNum := range qn.CollectionRows {
current[collectionID] += rowNum
}
return current
}
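// FormatCollectionKey builds the "<dbID>.<collectionName>" key used by the collections map.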
func FormatCollectionKey(dbID int64, collectionName string) string {
return fmt.Sprintf("%d.%s", dbID, collectionName)
}
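// SplitCollectionKey splits a "<dbID>.<collectionName>" key back into the db id and the collection name.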
func SplitCollectionKey(key string) (dbID int64, collectionName string) {
splits := strings.Split(key, ".")
if len(splits) == 2 {
dbID, _ = strconv.ParseInt(splits[0], 10, 64)
collectionName = splits[1]
}
return
}
// collectMetrics sends GetMetrics requests to DataCoord and QueryCoord to sync the metrics in DataNodes and QueryNodes.
func (q *QuotaCenter) collectMetrics() error {
oldDataNodes := typeutil.NewSet(lo.Keys(q.dataNodeMetrics)...)
oldQueryNodes := typeutil.NewSet(lo.Keys(q.queryNodeMetrics)...)
q.clearMetrics()
ctx, cancel := context.WithTimeout(q.ctx, GetMetricsTimeout)
defer cancel()
group := &errgroup.Group{}
req, err := metricsinfo.ConstructRequestByMetricType(metricsinfo.SystemInfoMetrics)
if err != nil {
return err
}
// get Query cluster metrics
group.Go(func() error {
rsp, err := q.queryCoord.GetMetrics(ctx, req)
if err = merr.CheckRPCCall(rsp, err); err != nil {
return err
}
queryCoordTopology := &metricsinfo.QueryCoordTopology{}
err = metricsinfo.UnmarshalTopology(rsp.GetResponse(), queryCoordTopology)
if err != nil {
return err
}
collections := typeutil.NewUniqueSet()
numEntitiesLoaded := make(map[int64]int64)
for _, queryNodeMetric := range queryCoordTopology.Cluster.ConnectedNodes {
if queryNodeMetric.QuotaMetrics != nil {
oldQueryNodes.Remove(queryNodeMetric.ID)
q.queryNodeMetrics[queryNodeMetric.ID] = queryNodeMetric.QuotaMetrics
collections.Insert(queryNodeMetric.QuotaMetrics.Effect.CollectionIDs...)
}
if queryNodeMetric.CollectionMetrics != nil {
numEntitiesLoaded = updateNumEntitiesLoaded(numEntitiesLoaded, queryNodeMetric.CollectionMetrics)
}
}
q.readableCollections = make(map[int64]map[int64][]int64, 0)
var rangeErr error
collections.Range(func(collectionID int64) bool {
coll, getErr := q.meta.GetCollectionByIDWithMaxTs(context.TODO(), collectionID)
if getErr != nil {
rangeErr = getErr
return false
}
collIDToPartIDs, ok := q.readableCollections[coll.DBID]
if !ok {
collIDToPartIDs = make(map[int64][]int64)
q.readableCollections[coll.DBID] = collIDToPartIDs
}
collIDToPartIDs[collectionID] = append(collIDToPartIDs[collectionID],
lo.Map(coll.Partitions, func(part *model.Partition, _ int) int64 { return part.PartitionID })...)
q.collectionIDToDBID.Insert(collectionID, coll.DBID)
q.collections.Insert(FormatCollectionKey(coll.DBID, coll.Name), collectionID)
if numEntity, ok := numEntitiesLoaded[collectionID]; ok {
metrics.RootCoordNumEntities.WithLabelValues(coll.Name, metrics.LoadedLabel).Set(float64(numEntity))
}
return true
})
return rangeErr
})
// get Data cluster metrics
group.Go(func() error {
rsp, err := q.dataCoord.GetMetrics(ctx, req)
if err = merr.CheckRPCCall(rsp, err); err != nil {
return err
}
dataCoordTopology := &metricsinfo.DataCoordTopology{}
err = metricsinfo.UnmarshalTopology(rsp.GetResponse(), dataCoordTopology)
if err != nil {
return err
}
collections := typeutil.NewUniqueSet()
for _, dataNodeMetric := range dataCoordTopology.Cluster.ConnectedDataNodes {
if dataNodeMetric.QuotaMetrics != nil {
oldDataNodes.Remove(dataNodeMetric.ID)
q.dataNodeMetrics[dataNodeMetric.ID] = dataNodeMetric.QuotaMetrics
collections.Insert(dataNodeMetric.QuotaMetrics.Effect.CollectionIDs...)
}
}
datacoordQuotaCollections := make([]int64, 0)
q.diskMu.Lock()
if dataCoordTopology.Cluster.Self.QuotaMetrics != nil {
q.dataCoordMetrics = dataCoordTopology.Cluster.Self.QuotaMetrics
for _, metricCollections := range q.dataCoordMetrics.PartitionsBinlogSize {
for metricCollection := range metricCollections {
datacoordQuotaCollections = append(datacoordQuotaCollections, metricCollection)
}
}
}
q.diskMu.Unlock()
q.writableCollections = make(map[int64]map[int64][]int64, 0)
var collectionMetrics map[int64]*metricsinfo.DataCoordCollectionInfo
cm := dataCoordTopology.Cluster.Self.CollectionMetrics
if cm != nil {
collectionMetrics = cm.Collections
}
var rangeErr error
collections.Range(func(collectionID int64) bool {
coll, getErr := q.meta.GetCollectionByIDWithMaxTs(context.TODO(), collectionID)
if getErr != nil {
rangeErr = getErr
return false
}
collIDToPartIDs, ok := q.writableCollections[coll.DBID]
if !ok {
collIDToPartIDs = make(map[int64][]int64)
q.writableCollections[coll.DBID] = collIDToPartIDs
}
collIDToPartIDs[collectionID] = append(collIDToPartIDs[collectionID],
lo.Map(coll.Partitions, func(part *model.Partition, _ int) int64 { return part.PartitionID })...)
q.collectionIDToDBID.Insert(collectionID, coll.DBID)
q.collections.Insert(FormatCollectionKey(coll.DBID, coll.Name), collectionID)
if collectionMetrics == nil {
return true
}
if datacoordCollectionMetric, ok := collectionMetrics[collectionID]; ok {
metrics.RootCoordNumEntities.WithLabelValues(coll.Name, metrics.TotalLabel).Set(float64(datacoordCollectionMetric.NumEntitiesTotal))
fields := lo.KeyBy(coll.Fields, func(v *model.Field) int64 { return v.FieldID })
for _, indexInfo := range datacoordCollectionMetric.IndexInfo {
if _, ok := fields[indexInfo.FieldID]; !ok {
continue
}
field := fields[indexInfo.FieldID]
metrics.RootCoordIndexedNumEntities.WithLabelValues(
coll.Name,
indexInfo.IndexName,
strconv.FormatBool(typeutil.IsVectorType(field.DataType))).Set(float64(indexInfo.NumEntitiesIndexed))
}
}
return true
})
if rangeErr != nil {
return rangeErr
}
for _, collectionID := range datacoordQuotaCollections {
_, ok := q.collectionIDToDBID.Get(collectionID)
if ok {
continue
}
coll, getErr := q.meta.GetCollectionByIDWithMaxTs(context.TODO(), collectionID)
if getErr != nil {
return getErr
}
q.collectionIDToDBID.Insert(collectionID, coll.DBID)
q.collections.Insert(FormatCollectionKey(coll.DBID, coll.Name), collectionID)
}
return nil
})
// get Proxies metrics
group.Go(func() error {
// TODO: get more proxy metrics info
rsps, err := q.proxies.GetProxyMetrics(ctx)
if err != nil {
return err
}
for _, rsp := range rsps {
proxyMetric := &metricsinfo.ProxyInfos{}
err = metricsinfo.UnmarshalComponentInfos(rsp.GetResponse(), proxyMetric)
if err != nil {
return err
}
if proxyMetric.QuotaMetrics != nil {
q.proxyMetrics[proxyMetric.ID] = proxyMetric.QuotaMetrics
}
}
return nil
})
group.Go(func() error {
dbs, err := q.meta.ListDatabases(ctx, typeutil.MaxTimestamp)
if err != nil {
return err
}
for _, db := range dbs {
q.dbs.Insert(db.Name, db.ID)
}
return nil
})
err = group.Wait()
if err != nil {
return err
}
for oldDN := range oldDataNodes {
metrics.RootCoordTtDelay.DeleteLabelValues(typeutil.DataNodeRole, strconv.FormatInt(oldDN, 10))
}
for oldQN := range oldQueryNodes {
metrics.RootCoordTtDelay.DeleteLabelValues(typeutil.QueryNodeRole, strconv.FormatInt(oldQN, 10))
}
return nil
}
// forceDenyWriting sets dml rates to 0 to reject all dml requests.
func (q *QuotaCenter) forceDenyWriting(errorCode commonpb.ErrorCode, cluster bool, dbIDs, collectionIDs []int64, col2partitionIDs map[int64][]int64) error {
if cluster {
clusterLimiters := q.rateLimiter.GetRootLimiters()
updateLimiter(clusterLimiters, GetEarliestLimiter(), internalpb.RateScope_Cluster, dml)
clusterLimiters.GetQuotaStates().Insert(milvuspb.QuotaState_DenyToWrite, errorCode)
}
for _, dbID := range dbIDs {
dbLimiters := q.rateLimiter.GetDatabaseLimiters(dbID)
if dbLimiters == nil {
log.Warn("db limiter not found of db ID", zap.Int64("dbID", dbID))
return fmt.Errorf("db limiter not found of db ID: %d", dbID)
}
updateLimiter(dbLimiters, GetEarliestLimiter(), internalpb.RateScope_Database, dml)
dbLimiters.GetQuotaStates().Insert(milvuspb.QuotaState_DenyToWrite, errorCode)
}
for _, collectionID := range collectionIDs {
dbID, ok := q.collectionIDToDBID.Get(collectionID)
if !ok {
return fmt.Errorf("db ID not found of collection ID: %d", collectionID)
}
collectionLimiter := q.rateLimiter.GetCollectionLimiters(dbID, collectionID)
if collectionLimiter == nil {
log.Warn("collection limiter not found of collection ID",
zap.Int64("dbID", dbID),
zap.Int64("collectionID", collectionID))
return fmt.Errorf("collection limiter not found of collection ID: %d", collectionID)
}
updateLimiter(collectionLimiter, GetEarliestLimiter(), internalpb.RateScope_Collection, dml)
collectionLimiter.GetQuotaStates().Insert(milvuspb.QuotaState_DenyToWrite, errorCode)
}
for collectionID, partitionIDs := range col2partitionIDs {
for _, partitionID := range partitionIDs {
dbID, ok := q.collectionIDToDBID.Get(collectionID)
if !ok {
return fmt.Errorf("db ID not found of collection ID: %d", collectionID)
}
partitionLimiter := q.rateLimiter.GetPartitionLimiters(dbID, collectionID, partitionID)
if partitionLimiter == nil {
log.Warn("partition limiter not found of partition ID",
zap.Int64("dbID", dbID),
zap.Int64("collectionID", collectionID),
zap.Int64("partitionID", partitionID))
return fmt.Errorf("partition limiter not found of partition ID: %d", partitionID)
}
updateLimiter(partitionLimiter, GetEarliestLimiter(), internalpb.RateScope_Partition, dml)
partitionLimiter.GetQuotaStates().Insert(milvuspb.QuotaState_DenyToWrite, errorCode)
}
}
if cluster || len(dbIDs) > 0 || len(collectionIDs) > 0 || len(col2partitionIDs) > 0 {
log.RatedWarn(10, "QuotaCenter force to deny writing",
zap.Bool("cluster", cluster),
zap.Int64s("dbIDs", dbIDs),
zap.Int64s("collectionIDs", collectionIDs),
zap.Any("partitionIDs", col2partitionIDs),
zap.String("reason", errorCode.String()))
}
return nil
}
// forceDenyReading sets dql rates to 0 to reject all dql requests.
func (q *QuotaCenter) forceDenyReading(errorCode commonpb.ErrorCode) {
var collectionIDs []int64
for dbID, collectionIDToPartIDs := range q.readableCollections {
for collectionID := range collectionIDToPartIDs {
collectionLimiter := q.rateLimiter.GetCollectionLimiters(dbID, collectionID)
updateLimiter(collectionLimiter, GetEarliestLimiter(), internalpb.RateScope_Collection, dql)
collectionLimiter.GetQuotaStates().Insert(milvuspb.QuotaState_DenyToRead, errorCode)
collectionIDs = append(collectionIDs, collectionID)
}
}
log.Warn("QuotaCenter force to deny reading",
zap.Int64s("collectionIDs", collectionIDs),
zap.String("reason", errorCode.String()))
}
// getRealTimeRate returns the real-time rate of the given label, summed across all Proxies.
func (q *QuotaCenter) getRealTimeRate(label string) float64 {
var rate float64
for _, metric := range q.proxyMetrics {
for _, r := range metric.Rms {
if r.Label == label {
rate += r.Rate
break
}
}
}
return rate
}
// guaranteeMinRate makes sure the rate is not set below the configured minimum rate.
func (q *QuotaCenter) guaranteeMinRate(minRate float64, rt internalpb.RateType, rln *rlinternal.RateLimiterNode) {
v, ok := rln.GetLimiters().Get(rt)
if ok && minRate > 0 && v.Limit() < Limit(minRate) {
v.SetLimit(Limit(minRate))
}
}
// calculateReadRates calculates and sets dql rates.
func (q *QuotaCenter) calculateReadRates() error {
log := log.Ctx(context.Background()).WithRateGroup("rootcoord.QuotaCenter", 1.0, 60.0)
if Params.QuotaConfig.ForceDenyReading.GetAsBool() {
q.forceDenyReading(commonpb.ErrorCode_ForceDeny)
return nil
}
limitCollectionSet := typeutil.NewUniqueSet()
limitDBNameSet := typeutil.NewSet[string]()
limitCollectionNameSet := typeutil.NewSet[string]()
clusterLimit := false
formatCollctionRateKey := func(dbName, collectionName string) string {
return fmt.Sprintf("%s.%s", dbName, collectionName)
}
splitCollctionRateKey := func(key string) (string, string) {
parts := strings.Split(key, ".")
return parts[0], parts[1]
}
// query latency
queueLatencyThreshold := Params.QuotaConfig.QueueLatencyThreshold.GetAsDuration(time.Second)
// enableQueueProtection && queueLatencyThreshold >= 0 means enable queue latency protection
if queueLatencyThreshold >= 0 {
for _, metric := range q.queryNodeMetrics {
searchLatency := metric.SearchQueue.AvgQueueDuration
queryLatency := metric.QueryQueue.AvgQueueDuration
if searchLatency >= queueLatencyThreshold || queryLatency >= queueLatencyThreshold {
limitCollectionSet.Insert(metric.Effect.CollectionIDs...)
}
}
}
// queue length
enableQueueProtection := Params.QuotaConfig.QueueProtectionEnabled.GetAsBool()
nqInQueueThreshold := Params.QuotaConfig.NQInQueueThreshold.GetAsInt64()
if enableQueueProtection && nqInQueueThreshold >= 0 {
// >= 0 means enable queue length protection
sum := func(ri metricsinfo.ReadInfoInQueue) int64 {
return ri.UnsolvedQueue + ri.ReadyQueue + ri.ReceiveChan + ri.ExecuteChan
}
for _, metric := range q.queryNodeMetrics {
// We treat the NQ of a query request as 1.
// search shares the same queue length counter with query.
if sum(metric.SearchQueue) >= nqInQueueThreshold {
limitCollectionSet.Insert(metric.Effect.CollectionIDs...)
}
}
}
metricMap := make(map[string]float64) // main label -> aggregated rate
collectionMetricMap := make(map[string]map[string]map[string]float64) // sub-label metrics: main label -> db -> collection -> rate
for _, metric := range q.proxyMetrics {
for _, rm := range metric.Rms {
if !ratelimitutil.IsSubLabel(rm.Label) {
metricMap[rm.Label] += rm.Rate
continue
}
mainLabel, database, collection, ok := ratelimitutil.SplitCollectionSubLabel(rm.Label)
if !ok {
continue
}
labelMetric, ok := collectionMetricMap[mainLabel]
if !ok {
labelMetric = make(map[string]map[string]float64)
collectionMetricMap[mainLabel] = labelMetric
}
databaseMetric, ok := labelMetric[database]
if !ok {
databaseMetric = make(map[string]float64)
labelMetric[database] = databaseMetric
}
databaseMetric[collection] += rm.Rate
}
}
// read result
enableResultProtection := Params.QuotaConfig.ResultProtectionEnabled.GetAsBool()
if enableResultProtection {
maxRate := Params.QuotaConfig.MaxReadResultRate.GetAsFloat()
maxDBRate := Params.QuotaConfig.MaxReadResultRatePerDB.GetAsFloat()
maxCollectionRate := Params.QuotaConfig.MaxReadResultRatePerCollection.GetAsFloat()
dbRateCount := make(map[string]float64)
collectionRateCount := make(map[string]float64)
rateCount := metricMap[metricsinfo.ReadResultThroughput]
for mainLabel, labelMetric := range collectionMetricMap {
if mainLabel != metricsinfo.ReadResultThroughput {
continue
}
for database, databaseMetric := range labelMetric {
for collection, metricValue := range databaseMetric {
dbRateCount[database] += metricValue
collectionRateCount[formatCollctionRateKey(database, collection)] = metricValue
}
}
}
if rateCount >= maxRate {
clusterLimit = true
}
for s, f := range dbRateCount {
if f >= maxDBRate {
limitDBNameSet.Insert(s)
}
}
for s, f := range collectionRateCount {
if f >= maxCollectionRate {
limitCollectionNameSet.Insert(s)
}
}
}
dbIDs := make(map[int64]string, q.dbs.Len())
collectionIDs := make(map[int64]string, q.collections.Len())
q.dbs.Range(func(name string, id int64) bool {
dbIDs[id] = name
return true
})
q.collections.Range(func(name string, id int64) bool {
_, collectionName := SplitCollectionKey(name)
collectionIDs[id] = collectionName
return true
})
coolOffSpeed := Params.QuotaConfig.CoolOffSpeed.GetAsFloat()
if clusterLimit {
realTimeClusterSearchRate := metricMap[internalpb.RateType_DQLSearch.String()]
realTimeClusterQueryRate := metricMap[internalpb.RateType_DQLQuery.String()]
q.coolOffReading(realTimeClusterSearchRate, realTimeClusterQueryRate, coolOffSpeed, q.rateLimiter.GetRootLimiters(), log)
}
var updateLimitErr error
if limitDBNameSet.Len() > 0 {
databaseSearchRate := make(map[string]float64)
databaseQueryRate := make(map[string]float64)
for mainLabel, labelMetric := range collectionMetricMap {
var databaseRate map[string]float64
if mainLabel == internalpb.RateType_DQLSearch.String() {
databaseRate = databaseSearchRate
} else if mainLabel == internalpb.RateType_DQLQuery.String() {
databaseRate = databaseQueryRate
} else {
continue
}
for database, databaseMetric := range labelMetric {
for _, metricValue := range databaseMetric {
databaseRate[database] += metricValue
}
}
}
limitDBNameSet.Range(func(name string) bool {
dbID, ok := q.dbs.Get(name)
if !ok {
log.Warn("db not found", zap.String("dbName", name))
updateLimitErr = fmt.Errorf("db not found: %s", name)
return false
}
dbLimiter := q.rateLimiter.GetDatabaseLimiters(dbID)
if dbLimiter == nil {
log.Warn("database limiter not found", zap.Int64("dbID", dbID))
updateLimitErr = fmt.Errorf("database limiter not found")
return false
}
realTimeSearchRate := databaseSearchRate[name]
realTimeQueryRate := databaseQueryRate[name]
q.coolOffReading(realTimeSearchRate, realTimeQueryRate, coolOffSpeed, dbLimiter, log)
return true
})
if updateLimitErr != nil {
return updateLimitErr
}
}
limitCollectionNameSet.Range(func(name string) bool {
dbName, collectionName := splitCollctionRateKey(name)
dbID, ok := q.dbs.Get(dbName)
if !ok {
log.Warn("db not found", zap.String("dbName", dbName))
updateLimitErr = fmt.Errorf("db not found: %s", dbName)
return false
}
collectionID, ok := q.collections.Get(FormatCollectionKey(dbID, collectionName))
if !ok {
log.Warn("collection not found", zap.String("collectionName", name))
updateLimitErr = fmt.Errorf("collection not found: %s", name)
return false
}
limitCollectionSet.Insert(collectionID)
return true
})
if updateLimitErr != nil {
return updateLimitErr
}
safeGetCollectionRate := func(label, dbName, collectionName string) float64 {
if labelMetric, ok := collectionMetricMap[label]; ok {
if dbMetric, ok := labelMetric[dbName]; ok {
if rate, ok := dbMetric[collectionName]; ok {
return rate
}
}
}
return 0
}
coolOffCollectionID := func(collections ...int64) error {
for _, collection := range collections {
dbID, ok := q.collectionIDToDBID.Get(collection)
if !ok {
return fmt.Errorf("db ID not found of collection ID: %d", collection)
}
collectionLimiter := q.rateLimiter.GetCollectionLimiters(dbID, collection)
if collectionLimiter == nil {
return fmt.Errorf("collection limiter not found: %d", collection)
}
dbName, ok := dbIDs[dbID]
if !ok {
return fmt.Errorf("db name not found of db ID: %d", dbID)
}
collectionName, ok := collectionIDs[collection]
if !ok {
return fmt.Errorf("collection name not found of collection ID: %d", collection)
}
realTimeSearchRate := safeGetCollectionRate(internalpb.RateType_DQLSearch.String(), dbName, collectionName)
realTimeQueryRate := safeGetCollectionRate(internalpb.RateType_DQLQuery.String(), dbName, collectionName)
q.coolOffReading(realTimeSearchRate, realTimeQueryRate, coolOffSpeed, collectionLimiter, log)
collectionProps := q.getCollectionLimitProperties(collection)
q.guaranteeMinRate(getCollectionRateLimitConfig(collectionProps, common.CollectionSearchRateMinKey),
internalpb.RateType_DQLSearch, collectionLimiter)
q.guaranteeMinRate(getCollectionRateLimitConfig(collectionProps, common.CollectionQueryRateMinKey),
internalpb.RateType_DQLQuery, collectionLimiter)
}
return nil
}
if updateLimitErr = coolOffCollectionID(limitCollectionSet.Collect()...); updateLimitErr != nil {
return updateLimitErr
}
return nil
}
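// coolOffReading scales the search and query limits of the given node down to the observed real-time rates multiplied by coolOffSpeed, leaving infinite limits untouched.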
func (q *QuotaCenter) coolOffReading(realTimeSearchRate, realTimeQueryRate, coolOffSpeed float64,
node *rlinternal.RateLimiterNode, mlog *log.MLogger,
) {
limiter := node.GetLimiters()
v, ok := limiter.Get(internalpb.RateType_DQLSearch)
if ok && v.Limit() != Inf && realTimeSearchRate > 0 {
v.SetLimit(Limit(realTimeSearchRate * coolOffSpeed))
mlog.RatedWarn(10, "QuotaCenter cool read rates off done",
zap.Any("level", node.Level()),
zap.Any("id", node.GetID()),
zap.Any("searchRate", v.Limit()))
}
v, ok = limiter.Get(internalpb.RateType_DQLQuery)
if ok && v.Limit() != Inf && realTimeQueryRate > 0 {
v.SetLimit(Limit(realTimeQueryRate * coolOffSpeed))
mlog.RatedWarn(10, "QuotaCenter cool read rates off done",
zap.Any("level", node.Level()),
zap.Any("id", node.GetID()),
zap.Any("queryRate", v.Limit()))
}
}
// calculateWriteRates calculates and sets dml rates.
func (q *QuotaCenter) calculateWriteRates() error {
log := log.Ctx(context.Background()).WithRateGroup("rootcoord.QuotaCenter", 1.0, 60.0)
if Params.QuotaConfig.ForceDenyWriting.GetAsBool() {
return q.forceDenyWriting(commonpb.ErrorCode_ForceDeny, true, nil, nil, nil)
}
if err := q.checkDiskQuota(); err != nil {
return err
}
ts, err := q.tsoAllocator.GenerateTSO(1)
if err != nil {
return err
}
collectionFactors := make(map[int64]float64)
updateCollectionFactor := func(factors map[int64]float64) {
for collection, factor := range factors {
_, ok := collectionFactors[collection]
if !ok || collectionFactors[collection] > factor {
collectionFactors[collection] = factor
}
}
}
ttFactors := q.getTimeTickDelayFactor(ts)
updateCollectionFactor(ttFactors)
memFactors := q.getMemoryFactor()
updateCollectionFactor(memFactors)
growingSegFactors := q.getGrowingSegmentsSizeFactor()
updateCollectionFactor(growingSegFactors)
ttCollections := make([]int64, 0)
memoryCollections := make([]int64, 0)
for collection, factor := range collectionFactors {
metrics.RootCoordRateLimitRatio.WithLabelValues(fmt.Sprint(collection)).Set(1 - factor)
if factor <= 0 {
if _, ok := ttFactors[collection]; ok && factor == ttFactors[collection] {
// factor comes from ttFactor
ttCollections = append(ttCollections, collection)
} else {
memoryCollections = append(memoryCollections, collection)
}
}
dbID, ok := q.collectionIDToDBID.Get(collection)
if !ok {
return fmt.Errorf("db ID not found of collection ID: %d", collection)
}
collectionLimiter := q.rateLimiter.GetCollectionLimiters(dbID, collection)
if collectionLimiter == nil {
return fmt.Errorf("collection limiter not found: %d", collection)
}
limiter := collectionLimiter.GetLimiters()
for _, rt := range []internalpb.RateType{
internalpb.RateType_DMLInsert,
internalpb.RateType_DMLUpsert,
internalpb.RateType_DMLDelete,
} {
v, ok := limiter.Get(rt)
if ok {
if v.Limit() != Inf {
v.SetLimit(v.Limit() * Limit(factor))
}
}
}
collectionProps := q.getCollectionLimitProperties(collection)
q.guaranteeMinRate(getCollectionRateLimitConfig(collectionProps, common.CollectionInsertRateMinKey),
internalpb.RateType_DMLInsert, collectionLimiter)
q.guaranteeMinRate(getCollectionRateLimitConfig(collectionProps, common.CollectionUpsertRateMinKey),
internalpb.RateType_DMLUpsert, collectionLimiter)
q.guaranteeMinRate(getCollectionRateLimitConfig(collectionProps, common.CollectionDeleteRateMinKey),
internalpb.RateType_DMLDelete, collectionLimiter)
log.RatedDebug(10, "QuotaCenter cool write rates off done",
zap.Int64("collectionID", collection),
zap.Float64("factor", factor))
}
if len(ttCollections) > 0 {
if err = q.forceDenyWriting(commonpb.ErrorCode_TimeTickLongDelay, false, nil, ttCollections, nil); err != nil {
log.Warn("fail to force deny writing for time tick delay", zap.Error(err))
return err
}
}
if len(memoryCollections) > 0 {
if err = q.forceDenyWriting(commonpb.ErrorCode_MemoryQuotaExhausted, false, nil, memoryCollections, nil); err != nil {
log.Warn("fail to force deny writing for memory quota", zap.Error(err))
return err
}
}
return nil
}
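// getTimeTickDelayFactor returns a write-rate factor per collection based on the max time tick delay observed on QueryNodes and DataNodes: factor = (maxDelay - curDelay) / maxDelay, dropping to 0 once the delay reaches maxDelay.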
func (q *QuotaCenter) getTimeTickDelayFactor(ts Timestamp) map[int64]float64 {
log := log.Ctx(context.Background()).WithRateGroup("rootcoord.QuotaCenter", 1.0, 60.0)
if !Params.QuotaConfig.TtProtectionEnabled.GetAsBool() {
return make(map[int64]float64)
}
maxDelay := Params.QuotaConfig.MaxTimeTickDelay.GetAsDuration(time.Second)
if maxDelay < 0 {
// < 0 means disable tt protection
return make(map[int64]float64)
}
collectionsMaxDelay := make(map[int64]time.Duration)
updateCollectionDelay := func(delay time.Duration, collections []int64) {
for _, collection := range collections {
_, ok := collectionsMaxDelay[collection]
if !ok || collectionsMaxDelay[collection] < delay {
collectionsMaxDelay[collection] = delay
}
}
}
t1, _ := tsoutil.ParseTS(ts)
for nodeID, metric := range q.queryNodeMetrics {
if metric.Fgm.NumFlowGraph > 0 && metric.Fgm.MinFlowGraphChannel != "" {
t2, _ := tsoutil.ParseTS(metric.Fgm.MinFlowGraphTt)
delay := t1.Sub(t2)
updateCollectionDelay(delay, metric.Effect.CollectionIDs)
metrics.RootCoordTtDelay.WithLabelValues(typeutil.QueryNodeRole, strconv.FormatInt(nodeID, 10)).Set(float64(delay.Milliseconds()))
}
}
for nodeID, metric := range q.dataNodeMetrics {
if metric.Fgm.NumFlowGraph > 0 && metric.Fgm.MinFlowGraphChannel != "" {
t2, _ := tsoutil.ParseTS(metric.Fgm.MinFlowGraphTt)
delay := t1.Sub(t2)
updateCollectionDelay(delay, metric.Effect.CollectionIDs)
metrics.RootCoordTtDelay.WithLabelValues(typeutil.DataNodeRole, strconv.FormatInt(nodeID, 10)).Set(float64(delay.Milliseconds()))
}
}
collectionFactor := make(map[int64]float64)
for collectionID, curMaxDelay := range collectionsMaxDelay {
if curMaxDelay.Nanoseconds() >= maxDelay.Nanoseconds() {
log.RatedWarn(10, "QuotaCenter force deny writing due to long timeTick delay",
zap.Int64("collectionID", collectionID),
zap.Time("curTs", t1),
zap.Duration("delay", curMaxDelay),
zap.Duration("MaxDelay", maxDelay))
log.RatedInfo(10, "DataNode and QueryNode Metrics",
zap.Any("QueryNodeMetrics", q.queryNodeMetrics),
zap.Any("DataNodeMetrics", q.dataNodeMetrics))
collectionFactor[collectionID] = 0
continue
}
factor := float64(maxDelay.Nanoseconds()-curMaxDelay.Nanoseconds()) / float64(maxDelay.Nanoseconds())
if factor <= 0.9 {
log.RatedWarn(10, "QuotaCenter: limit writing due to long timeTick delay",
zap.Int64("collectionID", collectionID),
zap.Time("curTs", t1),
zap.Duration("delay", curMaxDelay),
zap.Duration("MaxDelay", maxDelay),
zap.Float64("factor", factor))
}
collectionFactor[collectionID] = factor
}
return collectionFactor
}
// getMemoryFactor checks whether any node has a memory resource issue,
// and returns the factor according to the max memory water level.
func (q *QuotaCenter) getMemoryFactor() map[int64]float64 {
log := log.Ctx(context.Background()).WithRateGroup("rootcoord.QuotaCenter", 1.0, 60.0)
if !Params.QuotaConfig.MemProtectionEnabled.GetAsBool() {
return make(map[int64]float64)
}
dataNodeMemoryLowWaterLevel := Params.QuotaConfig.DataNodeMemoryLowWaterLevel.GetAsFloat()
dataNodeMemoryHighWaterLevel := Params.QuotaConfig.DataNodeMemoryHighWaterLevel.GetAsFloat()
queryNodeMemoryLowWaterLevel := Params.QuotaConfig.QueryNodeMemoryLowWaterLevel.GetAsFloat()
queryNodeMemoryHighWaterLevel := Params.QuotaConfig.QueryNodeMemoryHighWaterLevel.GetAsFloat()
collectionFactor := make(map[int64]float64)
updateCollectionFactor := func(factor float64, collections []int64) {
for _, collection := range collections {
_, ok := collectionFactor[collection]
if !ok || collectionFactor[collection] > factor {
collectionFactor[collection] = factor
}
}
}
for nodeID, metric := range q.queryNodeMetrics {
memoryWaterLevel := float64(metric.Hms.MemoryUsage) / float64(metric.Hms.Memory)
if memoryWaterLevel <= queryNodeMemoryLowWaterLevel {
continue
}
if memoryWaterLevel >= queryNodeMemoryHighWaterLevel {
log.RatedWarn(10, "QuotaCenter: QueryNode memory to high water level",
zap.String("Node", fmt.Sprintf("%s-%d", typeutil.QueryNodeRole, nodeID)),
zap.Int64s("collections", metric.Effect.CollectionIDs),
zap.Uint64("UsedMem", metric.Hms.MemoryUsage),
zap.Uint64("TotalMem", metric.Hms.Memory),
zap.Float64("curWatermark", memoryWaterLevel),
zap.Float64("lowWatermark", queryNodeMemoryLowWaterLevel),
zap.Float64("highWatermark", queryNodeMemoryHighWaterLevel))
updateCollectionFactor(0, metric.Effect.CollectionIDs)
continue
}
factor := (queryNodeMemoryHighWaterLevel - memoryWaterLevel) / (queryNodeMemoryHighWaterLevel - queryNodeMemoryLowWaterLevel)
updateCollectionFactor(factor, metric.Effect.CollectionIDs)
log.RatedWarn(10, "QuotaCenter: QueryNode memory to low water level, limit writing rate",
zap.String("Node", fmt.Sprintf("%s-%d", typeutil.QueryNodeRole, nodeID)),
zap.Int64s("collections", metric.Effect.CollectionIDs),
zap.Uint64("UsedMem", metric.Hms.MemoryUsage),
zap.Uint64("TotalMem", metric.Hms.Memory),
zap.Float64("curWatermark", memoryWaterLevel),
zap.Float64("lowWatermark", queryNodeMemoryLowWaterLevel),
zap.Float64("highWatermark", queryNodeMemoryHighWaterLevel))
}
for nodeID, metric := range q.dataNodeMetrics {
memoryWaterLevel := float64(metric.Hms.MemoryUsage) / float64(metric.Hms.Memory)
if memoryWaterLevel <= dataNodeMemoryLowWaterLevel {
continue
}
if memoryWaterLevel >= dataNodeMemoryHighWaterLevel {
log.RatedWarn(10, "QuotaCenter: DataNode memory to high water level",
zap.String("Node", fmt.Sprintf("%s-%d", typeutil.DataNodeRole, nodeID)),
zap.Int64s("collections", metric.Effect.CollectionIDs),
zap.Uint64("UsedMem", metric.Hms.MemoryUsage),
zap.Uint64("TotalMem", metric.Hms.Memory),
zap.Float64("curWatermark", memoryWaterLevel),
zap.Float64("lowWatermark", dataNodeMemoryLowWaterLevel),
zap.Float64("highWatermark", dataNodeMemoryHighWaterLevel))
updateCollectionFactor(0, metric.Effect.CollectionIDs)
continue
}
factor := (dataNodeMemoryHighWaterLevel - memoryWaterLevel) / (dataNodeMemoryHighWaterLevel - dataNodeMemoryLowWaterLevel)
log.RatedWarn(10, "QuotaCenter: DataNode memory to low water level, limit writing rate",
zap.String("Node", fmt.Sprintf("%s-%d", typeutil.DataNodeRole, nodeID)),
zap.Int64s("collections", metric.Effect.CollectionIDs),
zap.Uint64("UsedMem", metric.Hms.MemoryUsage),
zap.Uint64("TotalMem", metric.Hms.Memory),
zap.Float64("curWatermark", memoryWaterLevel),
zap.Float64("lowWatermark", dataNodeMemoryLowWaterLevel),
zap.Float64("highWatermark", dataNodeMemoryHighWaterLevel))
updateCollectionFactor(factor, metric.Effect.CollectionIDs)
}
return collectionFactor
}
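// getGrowingSegmentsSizeFactor returns a write-rate factor per collection based on the growing segments size on QueryNodes: factor = (high - cur) / (high - low), floored at the configured minimum rate ratio.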
func (q *QuotaCenter) getGrowingSegmentsSizeFactor() map[int64]float64 {
log := log.Ctx(context.Background()).WithRateGroup("rootcoord.QuotaCenter", 1.0, 60.0)
if !Params.QuotaConfig.GrowingSegmentsSizeProtectionEnabled.GetAsBool() {
return make(map[int64]float64)
}
low := Params.QuotaConfig.GrowingSegmentsSizeLowWaterLevel.GetAsFloat()
high := Params.QuotaConfig.GrowingSegmentsSizeHighWaterLevel.GetAsFloat()
collectionFactor := make(map[int64]float64)
updateCollectionFactor := func(factor float64, collections []int64) {
for _, collection := range collections {
_, ok := collectionFactor[collection]
if !ok || collectionFactor[collection] > factor {
collectionFactor[collection] = factor
}
}
}
for nodeID, metric := range q.queryNodeMetrics {
cur := float64(metric.GrowingSegmentsSize) / float64(metric.Hms.Memory)
if cur <= low {
continue
}
factor := (high - cur) / (high - low)
if factor < Params.QuotaConfig.GrowingSegmentsSizeMinRateRatio.GetAsFloat() {
factor = Params.QuotaConfig.GrowingSegmentsSizeMinRateRatio.GetAsFloat()
}
updateCollectionFactor(factor, metric.Effect.CollectionIDs)
log.RatedWarn(10, "QuotaCenter: QueryNode growing segments size exceeds watermark, limit writing rate",
zap.String("Node", fmt.Sprintf("%s-%d", typeutil.QueryNodeRole, nodeID)),
zap.Int64s("collections", metric.Effect.CollectionIDs),
zap.Int64("segmentsSize", metric.GrowingSegmentsSize),
zap.Uint64("TotalMem", metric.Hms.Memory),
zap.Float64("highWatermark", high),
zap.Float64("lowWatermark", low),
zap.Float64("factor", factor))
}
return collectionFactor
}
// calculateRates calculates target rates by different strategies.
func (q *QuotaCenter) calculateRates() error {
err := q.resetAllCurrentRates()
if err != nil {
log.Warn("QuotaCenter resetAllCurrentRates failed", zap.Error(err))
return err
}
err = q.calculateWriteRates()
if err != nil {
log.Warn("QuotaCenter calculateWriteRates failed", zap.Error(err))
return err
}
err = q.calculateReadRates()
if err != nil {
log.Warn("QuotaCenter calculateReadRates failed", zap.Error(err))
return err
}
// log.Debug("QuotaCenter calculates rate done", zap.Any("rates", q.currentRates))
return nil
}
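// resetAllCurrentRates rebuilds the rate limiter tree from the readable and writable collections, initializing limits from the quota configuration and the collections' own properties.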
func (q *QuotaCenter) resetAllCurrentRates() error {
q.rateLimiter = rlinternal.NewRateLimiterTree(initInfLimiter(internalpb.RateScope_Cluster, allOps))
initLimiters := func(sourceCollections map[int64]map[int64][]int64) {
for dbID, collections := range sourceCollections {
for collectionID, partitionIDs := range collections {
getCollectionLimitVal := func(rateType internalpb.RateType) Limit {
limitVal, err := q.getCollectionMaxLimit(rateType, collectionID)
if err != nil {
return Limit(quota.GetQuotaValue(internalpb.RateScope_Collection, rateType, Params))
}
return limitVal
}
for _, partitionID := range partitionIDs {
q.rateLimiter.GetOrCreatePartitionLimiters(dbID, collectionID, partitionID,
newParamLimiterFunc(internalpb.RateScope_Database, allOps),
newParamLimiterFuncWithLimitFunc(internalpb.RateScope_Collection, allOps, getCollectionLimitVal),
newParamLimiterFunc(internalpb.RateScope_Partition, allOps))
}
if len(partitionIDs) == 0 {
q.rateLimiter.GetOrCreateCollectionLimiters(dbID, collectionID,
newParamLimiterFunc(internalpb.RateScope_Database, allOps),
newParamLimiterFuncWithLimitFunc(internalpb.RateScope_Collection, allOps, getCollectionLimitVal))
}
}
if len(collections) == 0 {
q.rateLimiter.GetOrCreateDatabaseLimiters(dbID, newParamLimiterFunc(internalpb.RateScope_Database, allOps))
}
}
}
initLimiters(q.readableCollections)
initLimiters(q.writableCollections)
return nil
}
// getCollectionMaxLimit gets the max limit value from the collection's properties.
func (q *QuotaCenter) getCollectionMaxLimit(rt internalpb.RateType, collectionID int64) (ratelimitutil.Limit, error) {
collectionProps := q.getCollectionLimitProperties(collectionID)
switch rt {
case internalpb.RateType_DMLInsert:
return Limit(getCollectionRateLimitConfig(collectionProps, common.CollectionInsertRateMaxKey)), nil
case internalpb.RateType_DMLUpsert:
return Limit(getCollectionRateLimitConfig(collectionProps, common.CollectionUpsertRateMaxKey)), nil
case internalpb.RateType_DMLDelete:
return Limit(getCollectionRateLimitConfig(collectionProps, common.CollectionDeleteRateMaxKey)), nil
case internalpb.RateType_DMLBulkLoad:
return Limit(getCollectionRateLimitConfig(collectionProps, common.CollectionBulkLoadRateMaxKey)), nil
case internalpb.RateType_DQLSearch:
return Limit(getCollectionRateLimitConfig(collectionProps, common.CollectionSearchRateMaxKey)), nil
case internalpb.RateType_DQLQuery:
return Limit(getCollectionRateLimitConfig(collectionProps, common.CollectionQueryRateMaxKey)), nil
default:
return 0, fmt.Errorf("unsupportd rate type:%s", rt.String())
}
}
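// getCollectionLimitProperties returns the collection's properties as a key-value map; an empty map is returned if the collection meta cannot be fetched.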
func (q *QuotaCenter) getCollectionLimitProperties(collection int64) map[string]string {
log := log.Ctx(context.Background()).WithRateGroup("rootcoord.QuotaCenter", 1.0, 60.0)
collectionInfo, err := q.meta.GetCollectionByIDWithMaxTs(context.TODO(), collection)
if err != nil {
log.RatedWarn(10, "failed to get rate limit properties from collection meta",
zap.Int64("collectionID", collection),
zap.Error(err))
return make(map[string]string)
}
properties := make(map[string]string)
for _, pair := range collectionInfo.Properties {
properties[pair.GetKey()] = pair.GetValue()
}
return properties
}
// checkDiskQuota checks whether the disk quota has been exceeded.
func (q *QuotaCenter) checkDiskQuota() error {
q.diskMu.Lock()
defer q.diskMu.Unlock()
if !Params.QuotaConfig.DiskProtectionEnabled.GetAsBool() {
return nil
}
if q.dataCoordMetrics == nil {
return nil
}
totalDiskQuota := Params.QuotaConfig.DiskQuota.GetAsFloat()
total := q.dataCoordMetrics.TotalBinlogSize
if float64(total) >= totalDiskQuota {
err := q.forceDenyWriting(commonpb.ErrorCode_DiskQuotaExhausted, true, nil, nil, nil)
if err != nil {
log.Warn("fail to force deny writing", zap.Error(err))
}
return err
}
collectionDiskQuota := Params.QuotaConfig.DiskQuotaPerCollection.GetAsFloat()
dbSizeInfo := make(map[int64]int64)
collections := make([]int64, 0)
for collection, binlogSize := range q.dataCoordMetrics.CollectionBinlogSize {
collectionProps := q.getCollectionLimitProperties(collection)
colDiskQuota := getRateLimitConfig(collectionProps, common.CollectionDiskQuotaKey, collectionDiskQuota)
if float64(binlogSize) >= colDiskQuota {
log.RatedWarn(10, "collection disk quota exceeded",
zap.Int64("collection", collection),
zap.Int64("coll disk usage", binlogSize),
zap.Float64("coll disk quota", colDiskQuota))
collections = append(collections, collection)
}
dbID, ok := q.collectionIDToDBID.Get(collection)
if !ok {
log.Warn("cannot find db id for collection", zap.Int64("collection", collection))
continue
}
dbSizeInfo[dbID] += binlogSize
}
dbs := make([]int64, 0)
dbDiskQuota := Params.QuotaConfig.DiskQuotaPerDB.GetAsFloat()
for dbID, binlogSize := range dbSizeInfo {
if float64(binlogSize) >= dbDiskQuota {
log.RatedWarn(10, "db disk quota exceeded",
zap.Int64("db", dbID),
zap.Int64("db disk usage", binlogSize),
zap.Float64("db disk quota", dbDiskQuota))
dbs = append(dbs, dbID)
}
}
col2partitions := make(map[int64][]int64)
partitionDiskQuota := Params.QuotaConfig.DiskQuotaPerPartition.GetAsFloat()
for collection, partitions := range q.dataCoordMetrics.PartitionsBinlogSize {
for partition, binlogSize := range partitions {
if float64(binlogSize) >= partitionDiskQuota {
log.RatedWarn(10, "partition disk quota exceeded",
zap.Int64("collection", collection),
zap.Int64("partition", partition),
zap.Int64("part disk usage", binlogSize),
zap.Float64("part disk quota", partitionDiskQuota))
col2partitions[collection] = append(col2partitions[collection], partition)
}
}
}
err := q.forceDenyWriting(commonpb.ErrorCode_DiskQuotaExhausted, false, dbs, collections, col2partitions)
if err != nil {
log.Warn("fail to force deny writing", zap.Error(err))
return err
}
q.totalBinlogSize = total
return nil
}
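// toRequestLimiter converts a rate limiter node into a proxypb.Limiter along with the node's quota states; with the Average strategy, every updated non-infinite limit is divided evenly among the proxies.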
func (q *QuotaCenter) toRequestLimiter(limiter *rlinternal.RateLimiterNode) *proxypb.Limiter {
var rates []*internalpb.Rate
switch q.rateAllocateStrategy {
case Average:
proxyNum := q.proxies.GetProxyCount()
if proxyNum == 0 {
return nil
}
limiter.GetLimiters().Range(func(rt internalpb.RateType, limiter *ratelimitutil.Limiter) bool {
if !limiter.HasUpdated() {
return true
}
r := limiter.Limit()
if r != Inf {
rates = append(rates, &internalpb.Rate{Rt: rt, R: float64(r) / float64(proxyNum)})
}
return true
})
case ByRateWeight:
// TODO: support ByRateWeight
}
size := limiter.GetQuotaStates().Len()
states := make([]milvuspb.QuotaState, 0, size)
codes := make([]commonpb.ErrorCode, 0, size)
limiter.GetQuotaStates().Range(func(state milvuspb.QuotaState, code commonpb.ErrorCode) bool {
states = append(states, state)
codes = append(codes, code)
return true
})
return &proxypb.Limiter{
Rates: rates,
States: states,
Codes: codes,
}
}
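// toRatesRequest flattens the cluster/database/collection/partition rate limiter tree into a proxypb.SetRatesRequest.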
func (q *QuotaCenter) toRatesRequest() *proxypb.SetRatesRequest {
clusterRateLimiter := q.rateLimiter.GetRootLimiters()
// collect db rate limit if clusterRateLimiter has database limiter children
dbLimiters := make(map[int64]*proxypb.LimiterNode, clusterRateLimiter.GetChildren().Len())
clusterRateLimiter.GetChildren().Range(func(dbID int64, dbRateLimiters *rlinternal.RateLimiterNode) bool {
dbLimiter := q.toRequestLimiter(dbRateLimiters)
// collect collection rate limit if dbRateLimiters has collection limiter children
collectionLimiters := make(map[int64]*proxypb.LimiterNode, dbRateLimiters.GetChildren().Len())
dbRateLimiters.GetChildren().Range(func(collectionID int64, collectionRateLimiters *rlinternal.RateLimiterNode) bool {
collectionLimiter := q.toRequestLimiter(collectionRateLimiters)
// collect partitions rate limit if collectionRateLimiters has partition limiter children
partitionLimiters := make(map[int64]*proxypb.LimiterNode, collectionRateLimiters.GetChildren().Len())
collectionRateLimiters.GetChildren().Range(func(partitionID int64, partitionRateLimiters *rlinternal.RateLimiterNode) bool {
partitionLimiters[partitionID] = &proxypb.LimiterNode{
Limiter: q.toRequestLimiter(partitionRateLimiters),
Children: make(map[int64]*proxypb.LimiterNode, 0),
}
return true
})
collectionLimiters[collectionID] = &proxypb.LimiterNode{
Limiter: collectionLimiter,
Children: partitionLimiters,
}
return true
})
dbLimiters[dbID] = &proxypb.LimiterNode{
Limiter: dbLimiter,
Children: collectionLimiters,
}
return true
})
clusterLimiter := &proxypb.LimiterNode{
Limiter: q.toRequestLimiter(clusterRateLimiter),
Children: dbLimiters,
}
timestamp := tsoutil.ComposeTSByTime(time.Now(), 0)
return &proxypb.SetRatesRequest{
Base: commonpbutil.NewMsgBase(
commonpbutil.WithMsgID(int64(timestamp)),
commonpbutil.WithTimeStamp(timestamp),
),
Rates: []*proxypb.CollectionRate{},
RootLimiter: clusterLimiter,
}
}
// sendRatesToProxy notifies Proxies to set rates for different rate types.
func (q *QuotaCenter) sendRatesToProxy() error {
ctx, cancel := context.WithTimeout(context.Background(), SetRatesTimeout)
defer cancel()
return q.proxies.SetRates(ctx, q.toRatesRequest())
}
// recordMetrics records metrics of quota states.
func (q *QuotaCenter) recordMetrics() {
dbIDs := make(map[int64]string, q.dbs.Len())
collectionIDs := make(map[int64]string, q.collections.Len())
q.dbs.Range(func(name string, id int64) bool {
dbIDs[id] = name
return true
})
q.collections.Range(func(name string, id int64) bool {
_, collectionName := SplitCollectionKey(name)
collectionIDs[id] = collectionName
return true
})
record := func(errorCode commonpb.ErrorCode) {
rlinternal.TraverseRateLimiterTree(q.rateLimiter.GetRootLimiters(), nil,
func(node *rlinternal.RateLimiterNode, state milvuspb.QuotaState, errCode commonpb.ErrorCode) bool {
if errCode == errorCode {
var name string
switch node.Level() {
case internalpb.RateScope_Cluster:
name = "cluster"
case internalpb.RateScope_Database:
name = "db_" + dbIDs[node.GetID()]
case internalpb.RateScope_Collection:
name = "collection_" + collectionIDs[node.GetID()]
default:
return false
}
metrics.RootCoordQuotaStates.WithLabelValues(errorCode.String(), name).Set(1.0)
return false
}
return true
})
}
record(commonpb.ErrorCode_MemoryQuotaExhausted)
record(commonpb.ErrorCode_DiskQuotaExhausted)
record(commonpb.ErrorCode_TimeTickLongDelay)
}
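// diskAllowance returns how much disk space the given collection may still use before it hits either its per-collection disk quota or the total disk quota.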
func (q *QuotaCenter) diskAllowance(collection UniqueID) float64 {
q.diskMu.Lock()
defer q.diskMu.Unlock()
if !Params.QuotaConfig.DiskProtectionEnabled.GetAsBool() {
return math.MaxInt64
}
if q.dataCoordMetrics == nil {
return math.MaxInt64
}
totalDiskQuota := Params.QuotaConfig.DiskQuota.GetAsFloat()
colDiskQuota := Params.QuotaConfig.DiskQuotaPerCollection.GetAsFloat()
allowance := math.Min(totalDiskQuota, colDiskQuota)
if binlogSize, ok := q.dataCoordMetrics.CollectionBinlogSize[collection]; ok {
allowance = math.Min(allowance, colDiskQuota-float64(binlogSize))
}
allowance = math.Min(allowance, totalDiskQuota-float64(q.totalBinlogSize))
return allowance
}