// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package rootcoord

import (
	"context"
	"fmt"
	"math"
	"strconv"
	"sync"
	"time"

	"go.uber.org/zap"
	"golang.org/x/sync/errgroup"

	"github.com/milvus-io/milvus-proto/go-api/commonpb"
	"github.com/milvus-io/milvus-proto/go-api/milvuspb"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
	"github.com/milvus-io/milvus/internal/proto/proxypb"
	"github.com/milvus-io/milvus/internal/tso"
	"github.com/milvus-io/milvus/internal/types"
	"github.com/milvus-io/milvus/pkg/log"
	"github.com/milvus-io/milvus/pkg/metrics"
	"github.com/milvus-io/milvus/pkg/util/commonpbutil"
	"github.com/milvus-io/milvus/pkg/util/metricsinfo"
	"github.com/milvus-io/milvus/pkg/util/ratelimitutil"
	"github.com/milvus-io/milvus/pkg/util/tsoutil"
	"github.com/milvus-io/milvus/pkg/util/typeutil"
	"github.com/samber/lo"
)

const (
	GetMetricsTimeout = 10 * time.Second
	SetRatesTimeout   = 10 * time.Second
)

type RateAllocateStrategy int32

const (
	Average      RateAllocateStrategy = 0
	ByRateWeight RateAllocateStrategy = 1
)

var DefaultRateAllocateStrategy = Average

const Inf = ratelimitutil.Inf

type Limit = ratelimitutil.Limit

type collectionRates = map[internalpb.RateType]Limit

type collectionStates = map[milvuspb.QuotaState]commonpb.ErrorCode

// QuotaCenter manages the quota and limitations of the whole cluster,
// it receives metrics info from DataNodes, QueryNodes and Proxies, and
// notifies Proxies to limit the rate of requests from clients, or to reject
// all requests when the cluster runs into resource issues.
// Limitations:
// 1. DML throughput limitation;
// 2. DDL, DQL qps/rps limitation;
//
// Protections:
// 1. TT protection -> dmlRate = maxDMLRate * (maxDelay - ttDelay) / maxDelay
// 2. Memory protection -> dmlRate = maxDMLRate * (highMem - curMem) / (highMem - lowMem)
// 3. Disk quota protection -> force deny writing if exceeded
// 4. DQL queue length protection -> dqlRate = curDQLRate * CoolOffSpeed
// 5. DQL queue latency protection -> dqlRate = curDQLRate * CoolOffSpeed
// 6. Search result protection -> searchRate = curSearchRate * CoolOffSpeed
//
// If necessary, the user can also manually force to deny RW requests.
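//
// For illustration only (assumed numbers, not defaults): with maxDelay = 300s and
// ttDelay = 120s, TT protection scales DML rates by (300-120)/300 = 0.6; with
// lowMem = 0.8, highMem = 0.9 and 85% of memory in use, memory protection scales
// them by (0.9-0.85)/(0.9-0.8) = 0.5.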
type QuotaCenter struct {
	// clients
	proxies    *proxyClientManager
	queryCoord types.QueryCoord
	dataCoord  types.DataCoord

	// metrics
	queryNodeMetrics map[UniqueID]*metricsinfo.QueryNodeQuotaMetrics
	dataNodeMetrics  map[UniqueID]*metricsinfo.DataNodeQuotaMetrics
	proxyMetrics     map[UniqueID]*metricsinfo.ProxyQuotaMetrics
	diskMu           sync.Mutex // guards dataCoordMetrics and totalBinlogSize
	dataCoordMetrics *metricsinfo.DataCoordQuotaMetrics
	totalBinlogSize  int64

	readableCollections []int64
	writableCollections []int64

	currentRates map[int64]collectionRates
	quotaStates  map[int64]collectionStates
	tsoAllocator tso.Allocator

	rateAllocateStrategy RateAllocateStrategy

	stopOnce sync.Once
	stopChan chan struct{}
}

// NewQuotaCenter returns a new QuotaCenter.
func NewQuotaCenter(proxies *proxyClientManager, queryCoord types.QueryCoord, dataCoord types.DataCoord, tsoAllocator tso.Allocator) *QuotaCenter {
	return &QuotaCenter{
		proxies:             proxies,
		queryCoord:          queryCoord,
		dataCoord:           dataCoord,
		currentRates:        make(map[int64]map[internalpb.RateType]Limit),
		quotaStates:         make(map[int64]map[milvuspb.QuotaState]commonpb.ErrorCode),
		tsoAllocator:        tsoAllocator,
		readableCollections: make([]int64, 0),
		writableCollections: make([]int64, 0),

		rateAllocateStrategy: DefaultRateAllocateStrategy,
		stopChan:             make(chan struct{}),
	}
}

// run starts the service of QuotaCenter.
func (q *QuotaCenter) run() {
	log.Info("Start QuotaCenter", zap.Float64("collectInterval/s", Params.QuotaConfig.QuotaCenterCollectInterval.GetAsFloat()))
	ticker := time.NewTicker(time.Duration(Params.QuotaConfig.QuotaCenterCollectInterval.GetAsFloat() * float64(time.Second)))
	defer ticker.Stop()
	for {
		select {
		case <-q.stopChan:
			log.Info("QuotaCenter exit")
			return
		case <-ticker.C:
			err := q.syncMetrics()
			if err != nil {
				log.Warn("quotaCenter sync metrics failed", zap.Error(err))
				break
			}
			err = q.calculateRates()
			if err != nil {
				log.Warn("quotaCenter calculate rates failed", zap.Error(err))
				break
			}
			err = q.setRates()
			if err != nil {
				log.Warn("quotaCenter setRates failed", zap.Error(err))
			}
			q.recordMetrics()
		}
	}
}

// stop would stop the service of QuotaCenter.
func (q *QuotaCenter) stop() {
	q.stopOnce.Do(func() {
		q.stopChan <- struct{}{}
	})
}

// clearMetrics removes all metrics stored in QuotaCenter.
func (q *QuotaCenter) clearMetrics() {
	q.dataNodeMetrics = make(map[UniqueID]*metricsinfo.DataNodeQuotaMetrics, 0)
	q.queryNodeMetrics = make(map[UniqueID]*metricsinfo.QueryNodeQuotaMetrics, 0)
	q.proxyMetrics = make(map[UniqueID]*metricsinfo.ProxyQuotaMetrics, 0)
}

// syncMetrics sends GetMetrics requests to DataCoord and QueryCoord to sync the metrics in DataNodes and QueryNodes.
func (q *QuotaCenter) syncMetrics() error {
	q.clearMetrics()
	ctx, cancel := context.WithTimeout(context.Background(), GetMetricsTimeout)
	defer cancel()

	group := &errgroup.Group{}
	req, err := metricsinfo.ConstructRequestByMetricType(metricsinfo.SystemInfoMetrics)
	if err != nil {
		return err
	}
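
	// The same SystemInfoMetrics request is sent concurrently to QueryCoord,
	// DataCoord and all Proxies below; group.Wait() returns the first error, if any.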
	// get Query cluster metrics
	group.Go(func() error {
		rsp, err := q.queryCoord.GetMetrics(ctx, req)
		if err != nil {
			return err
		}
		if rsp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
			return fmt.Errorf("quotaCenter get Query cluster failed, err = %s", rsp.GetStatus().GetReason())
		}
		queryCoordTopology := &metricsinfo.QueryCoordTopology{}
		err = metricsinfo.UnmarshalTopology(rsp.GetResponse(), queryCoordTopology)
		if err != nil {
			return err
		}

		collections := typeutil.NewUniqueSet()
		for _, queryNodeMetric := range queryCoordTopology.Cluster.ConnectedNodes {
			if queryNodeMetric.QuotaMetrics != nil {
				q.queryNodeMetrics[queryNodeMetric.ID] = queryNodeMetric.QuotaMetrics
				collections.Insert(queryNodeMetric.QuotaMetrics.Effect.CollectionIDs...)
			}
		}
		q.readableCollections = collections.Collect()
		return nil
	})
	// get Data cluster metrics
	group.Go(func() error {
		rsp, err := q.dataCoord.GetMetrics(ctx, req)
		if err != nil {
			return err
		}
		if rsp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
			return fmt.Errorf("quotaCenter get Data cluster failed, err = %s", rsp.GetStatus().GetReason())
		}
		dataCoordTopology := &metricsinfo.DataCoordTopology{}
		err = metricsinfo.UnmarshalTopology(rsp.GetResponse(), dataCoordTopology)
		if err != nil {
			return err
		}

		collections := typeutil.NewUniqueSet()
		for _, dataNodeMetric := range dataCoordTopology.Cluster.ConnectedDataNodes {
			if dataNodeMetric.QuotaMetrics != nil {
				q.dataNodeMetrics[dataNodeMetric.ID] = dataNodeMetric.QuotaMetrics
				collections.Insert(dataNodeMetric.QuotaMetrics.Effect.CollectionIDs...)
			}
		}
		q.writableCollections = collections.Collect()
		q.diskMu.Lock()
		if dataCoordTopology.Cluster.Self.QuotaMetrics != nil {
			q.dataCoordMetrics = dataCoordTopology.Cluster.Self.QuotaMetrics
		}
		q.diskMu.Unlock()
		return nil
	})
	// get Proxies metrics
	group.Go(func() error {
		// TODO: get more proxy metrics info
		rsps, err := q.proxies.GetProxyMetrics(ctx)
		if err != nil {
			return err
		}
		for _, rsp := range rsps {
			proxyMetric := &metricsinfo.ProxyInfos{}
			err = metricsinfo.UnmarshalComponentInfos(rsp.GetResponse(), proxyMetric)
			if err != nil {
				return err
			}
			if proxyMetric.QuotaMetrics != nil {
				q.proxyMetrics[proxyMetric.ID] = proxyMetric.QuotaMetrics
			}
		}
		return nil
	})
	err = group.Wait()
	if err != nil {
		return err
	}
	//log.Debug("QuotaCenter sync metrics done",
	//	zap.Any("dataNodeMetrics", q.dataNodeMetrics),
	//	zap.Any("queryNodeMetrics", q.queryNodeMetrics),
	//	zap.Any("proxyMetrics", q.proxyMetrics),
	//	zap.Any("dataCoordMetrics", q.dataCoordMetrics))
	return nil
}

// forceDenyWriting sets dml rates to 0 to reject all dml requests.
func (q *QuotaCenter) forceDenyWriting(errorCode commonpb.ErrorCode, collections ...int64) {
	if len(collections) == 0 && len(q.writableCollections) != 0 {
		// default to all writable collections
		collections = q.writableCollections
	}
	for _, collection := range collections {
		q.currentRates[collection][internalpb.RateType_DMLInsert] = 0
		q.currentRates[collection][internalpb.RateType_DMLDelete] = 0
		q.currentRates[collection][internalpb.RateType_DMLBulkLoad] = 0
		q.quotaStates[collection][milvuspb.QuotaState_DenyToWrite] = errorCode
	}
	log.Warn("QuotaCenter force to deny writing",
		zap.Int64s("collectionIDs", collections),
		zap.String("reason", errorCode.String()))
}

// forceDenyReading sets dql rates to 0 to reject all dql requests.
func (q *QuotaCenter) forceDenyReading(errorCode commonpb.ErrorCode, collections ...int64) {
	if len(collections) == 0 {
		// default to all readable collections
		collections = q.readableCollections
	}
	for _, collection := range collections {
		q.currentRates[collection][internalpb.RateType_DQLSearch] = 0
		q.currentRates[collection][internalpb.RateType_DQLQuery] = 0
		q.quotaStates[collection][milvuspb.QuotaState_DenyToRead] = errorCode
	}
	log.Warn("QuotaCenter force to deny reading",
		zap.Int64s("collectionIDs", collections),
		zap.String("reason", errorCode.String()))
}

// getRealTimeRate returns the real-time rate of the given rate type, summed over all Proxies.
func (q *QuotaCenter) getRealTimeRate(rateType internalpb.RateType) float64 {
	var rate float64
	for _, metric := range q.proxyMetrics {
		for _, r := range metric.Rms {
			if r.Label == rateType.String() {
				rate += r.Rate
			}
		}
	}
	return rate
}

// guaranteeMinRate makes sure the rate will not be less than the min rate.
func (q *QuotaCenter) guaranteeMinRate(minRate float64, rateType internalpb.RateType, collections ...int64) {
	for _, collection := range collections {
		if minRate > 0 && q.currentRates[collection][rateType] < Limit(minRate) {
			q.currentRates[collection][rateType] = Limit(minRate)
		}
	}
}

// calculateReadRates calculates and sets dql rates.
func (q *QuotaCenter) calculateReadRates() {
	if Params.QuotaConfig.ForceDenyReading.GetAsBool() {
		q.forceDenyReading(commonpb.ErrorCode_ForceDeny)
		return
	}

	enableQueueProtection := Params.QuotaConfig.QueueProtectionEnabled.GetAsBool()
	if !enableQueueProtection {
		return
	}

	limitCollectionSet := typeutil.NewUniqueSet()

	// query latency
	queueLatencyThreshold := Params.QuotaConfig.QueueLatencyThreshold.GetAsDuration(time.Second)
	// queueLatencyThreshold >= 0 means enable queue latency protection
	if queueLatencyThreshold >= 0 {
		for _, metric := range q.queryNodeMetrics {
			searchLatency := metric.SearchQueue.AvgQueueDuration
			queryLatency := metric.QueryQueue.AvgQueueDuration
			if searchLatency >= queueLatencyThreshold || queryLatency >= queueLatencyThreshold {
				limitCollectionSet.Insert(metric.Effect.CollectionIDs...)
			}
		}
	}

	// queue length
	nqInQueueThreshold := Params.QuotaConfig.NQInQueueThreshold.GetAsInt64()
	if nqInQueueThreshold >= 0 {
		// >= 0 means enable queue length protection
		sum := func(ri metricsinfo.ReadInfoInQueue) int64 {
			return ri.UnsolvedQueue + ri.ReadyQueue + ri.ReceiveChan + ri.ExecuteChan
		}
		for _, metric := range q.queryNodeMetrics {
			searchNQSum := sum(metric.SearchQueue)
			queryTasksSum := sum(metric.QueryQueue)
			nqInQueue := searchNQSum + queryTasksSum // We think of the NQ of query request as 1.
			if nqInQueue >= nqInQueueThreshold {
				limitCollectionSet.Insert(metric.Effect.CollectionIDs...)
			}
		}
	}

	// read result
	enableResultProtection := Params.QuotaConfig.ResultProtectionEnabled.GetAsBool()
	if enableResultProtection {
		maxRate := Params.QuotaConfig.MaxReadResultRate.GetAsFloat()
		rateCount := float64(0)
		for _, metric := range q.proxyMetrics {
			for _, rm := range metric.Rms {
				if rm.Label == metricsinfo.ReadResultThroughput {
					rateCount += rm.Rate
				}
			}
		}
		if rateCount >= maxRate {
			limitCollectionSet.Insert(q.readableCollections...)
		}
	}

	coolOffSpeed := Params.QuotaConfig.CoolOffSpeed.GetAsFloat()
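	// For illustration only (assumed values): with coolOffSpeed = 0.9 and a measured
	// real-time search rate of 1000 vectors/s, the limited collections get their search
	// rate cooled to 900 vectors/s, clamped below by DQLMinSearchRate.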
	coolOff := func(realTimeSearchRate float64, realTimeQueryRate float64, collections ...int64) {
		for _, collection := range collections {
			if q.currentRates[collection][internalpb.RateType_DQLSearch] != Inf && realTimeSearchRate > 0 {
				q.currentRates[collection][internalpb.RateType_DQLSearch] = Limit(realTimeSearchRate * coolOffSpeed)
				log.Warn("QuotaCenter cool read rates off done",
					zap.Int64("collectionID", collection),
					zap.Any("searchRate", q.currentRates[collection][internalpb.RateType_DQLSearch]))
			}
			if q.currentRates[collection][internalpb.RateType_DQLQuery] != Inf && realTimeQueryRate > 0 {
				q.currentRates[collection][internalpb.RateType_DQLQuery] = Limit(realTimeQueryRate * coolOffSpeed)
				log.Warn("QuotaCenter cool read rates off done",
					zap.Int64("collectionID", collection),
					zap.Any("queryRate", q.currentRates[collection][internalpb.RateType_DQLQuery]))
			}
		}

		q.guaranteeMinRate(Params.QuotaConfig.DQLMinSearchRate.GetAsFloat(), internalpb.RateType_DQLSearch, collections...)
		q.guaranteeMinRate(Params.QuotaConfig.DQLMinQueryRate.GetAsFloat(), internalpb.RateType_DQLQuery, collections...)
		log.Info("QueryNodeMetrics when cool-off",
			zap.Any("metrics", q.queryNodeMetrics))
	}

	// TODO: unify search and query?
	realTimeSearchRate := q.getRealTimeRate(internalpb.RateType_DQLSearch)
	realTimeQueryRate := q.getRealTimeRate(internalpb.RateType_DQLQuery)
	coolOff(realTimeSearchRate, realTimeQueryRate, limitCollectionSet.Collect()...)
}

// calculateWriteRates calculates and sets dml rates.
func (q *QuotaCenter) calculateWriteRates() error {
	if Params.QuotaConfig.ForceDenyWriting.GetAsBool() {
		q.forceDenyWriting(commonpb.ErrorCode_ForceDeny)
		return nil
	}

	q.checkDiskQuota()

	collectionFactor := map[int64]float64{}
	for _, collection := range q.writableCollections {
		collectionFactor[collection] = 1.0
	}

	ts, err := q.tsoAllocator.GenerateTSO(1)
	if err != nil {
		return err
	}
	maxDelay := Params.QuotaConfig.MaxTimeTickDelay.GetAsDuration(time.Second)
	curTs, _ := tsoutil.ParseTS(ts)
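	// The freshly allocated TSO is treated as the cluster's current physical time; each
	// flowgraph's minimal time tick is compared against it below to measure consumer delay.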
	processTtDelay := func(role string, nodeID int64, vchannel string, minTs time.Time, delay time.Duration, effectedCollections []int64) {
		log := log.With(zap.String("role", role),
			zap.Int64("nodeID", nodeID),
			zap.String("vchannel", vchannel),
			zap.Time("curTs", curTs),
			zap.Time("minTs", minTs),
			zap.Duration("delay", delay),
			zap.Duration("MaxDelay", maxDelay),
			zap.Int64s("effectedCollections", effectedCollections),
		)

		factor := float64(maxDelay.Nanoseconds()-delay.Nanoseconds()) / float64(maxDelay.Nanoseconds())

		if factor <= 0 {
			log.Warn("QuotaCenter: force deny writing due to long timeTick delay")
			q.forceDenyWriting(commonpb.ErrorCode_TimeTickLongDelay, effectedCollections...)
		}

		for _, collection := range effectedCollections {
			if factor < collectionFactor[collection] {
				collectionFactor[collection] = factor
			}
		}
	}

	processMemUsage := func(role string, nodeID int64, hms metricsinfo.HardwareMetrics, memLowLevel float64, memHighLevel float64, effectedCollections []int64) {
		memoryWaterLevel := float64(hms.MemoryUsage) / float64(hms.Memory)
		log := log.With(
			zap.String("role", role),
			zap.Int64("nodeID", nodeID),
			zap.Uint64("usedMem", hms.MemoryUsage),
			zap.Uint64("totalMem", hms.Memory),
			zap.Float64("memoryWaterLevel", memoryWaterLevel),
			zap.Float64("memoryHighWaterLevel", memHighLevel),
			zap.Int64s("effectedCollections", effectedCollections),
		)

		factor := (memHighLevel - memoryWaterLevel) / (memHighLevel - memLowLevel)

		if factor <= 0 {
			log.Warn("QuotaCenter: force deny writing due to memory reach high water")
			q.forceDenyWriting(commonpb.ErrorCode_MemoryQuotaExhausted, effectedCollections...)
		}

		if factor < 0.9 {
			log.Warn("QuotaCenter: node memory reaches low water level, limiting writing rate",
				zap.Float64("factor", factor))
		}

		for _, collection := range effectedCollections {
			if factor < collectionFactor[collection] {
				collectionFactor[collection] = factor
			}
		}
	}

	enableTtProtection := Params.QuotaConfig.TtProtectionEnabled.GetAsBool()
	enableMemProtection := Params.QuotaConfig.MemProtectionEnabled.GetAsBool()

	dataNodeMemoryLowWaterLevel := Params.QuotaConfig.DataNodeMemoryLowWaterLevel.GetAsFloat()
	dataNodeMemoryHighWaterLevel := Params.QuotaConfig.DataNodeMemoryHighWaterLevel.GetAsFloat()
	queryNodeMemoryLowWaterLevel := Params.QuotaConfig.QueryNodeMemoryLowWaterLevel.GetAsFloat()
	queryNodeMemoryHighWaterLevel := Params.QuotaConfig.QueryNodeMemoryHighWaterLevel.GetAsFloat()

	for nodeID, metric := range q.queryNodeMetrics {
		if enableTtProtection && maxDelay >= 0 && metric.Fgm.NumFlowGraph > 0 && metric.Fgm.MinFlowGraphChannel != "" {
			minTs, _ := tsoutil.ParseTS(metric.Fgm.MinFlowGraphTt)
			delay := curTs.Sub(minTs)
			processTtDelay(typeutil.QueryNodeRole, nodeID, metric.Fgm.MinFlowGraphChannel, minTs, delay, metric.Effect.CollectionIDs)
			metrics.RootCoordTtDelay.WithLabelValues(typeutil.QueryNodeRole, strconv.FormatInt(nodeID, 10)).Set(float64(delay.Milliseconds()))
		}

		if enableMemProtection {
			processMemUsage(typeutil.QueryNodeRole, nodeID, metric.Hms, queryNodeMemoryLowWaterLevel, queryNodeMemoryHighWaterLevel, metric.Effect.CollectionIDs)
		}
	}

	for nodeID, metric := range q.dataNodeMetrics {
		if enableTtProtection && maxDelay >= 0 && metric.Fgm.NumFlowGraph > 0 && metric.Fgm.MinFlowGraphChannel != "" {
			minTs, _ := tsoutil.ParseTS(metric.Fgm.MinFlowGraphTt)
			delay := curTs.Sub(minTs)
			processTtDelay(typeutil.DataNodeRole, nodeID, metric.Fgm.MinFlowGraphChannel, minTs, delay, metric.Effect.CollectionIDs)
			metrics.RootCoordTtDelay.WithLabelValues(typeutil.DataNodeRole, strconv.FormatInt(nodeID, 10)).Set(float64(delay.Milliseconds()))
		}

		if enableMemProtection {
			processMemUsage(typeutil.DataNodeRole, nodeID, metric.Hms, dataNodeMemoryLowWaterLevel, dataNodeMemoryHighWaterLevel, metric.Effect.CollectionIDs)
		}
	}

	for collection, cf := range collectionFactor {
		if q.currentRates[collection][internalpb.RateType_DMLInsert] != Inf && cf > 0 {
			q.currentRates[collection][internalpb.RateType_DMLInsert] *= Limit(cf)
		}
		if q.currentRates[collection][internalpb.RateType_DMLDelete] != Inf && cf > 0 {
			q.currentRates[collection][internalpb.RateType_DMLDelete] *= Limit(cf)
		}
		q.guaranteeMinRate(Params.QuotaConfig.DMLMinInsertRate.GetAsFloat(), internalpb.RateType_DMLInsert, collection)
		q.guaranteeMinRate(Params.QuotaConfig.DMLMinDeleteRate.GetAsFloat(), internalpb.RateType_DMLDelete, collection)
		log.Warn("QuotaCenter cool write rates off done",
			zap.Int64("collectionID", collection),
			zap.Float64("factor", cf))
	}

	return nil
}

// calculateRates calculates target rates by different strategies.
func (q *QuotaCenter) calculateRates() error {
	q.resetAllCurrentRates()

	err := q.calculateWriteRates()
	if err != nil {
		return err
	}
	q.calculateReadRates()

	// log.Debug("QuotaCenter calculates rate done", zap.Any("rates", q.currentRates))
	return nil
}

func (q *QuotaCenter) resetAllCurrentRates() {
	q.quotaStates = make(map[int64]map[milvuspb.QuotaState]commonpb.ErrorCode)
	q.currentRates = map[int64]map[internalpb.RateType]ratelimitutil.Limit{}
	for _, collection := range q.writableCollections {
		q.resetCurrentRate(internalpb.RateType_DMLInsert, collection)
		q.resetCurrentRate(internalpb.RateType_DMLDelete, collection)
		q.resetCurrentRate(internalpb.RateType_DMLBulkLoad, collection)
	}

	for _, collection := range q.readableCollections {
		q.resetCurrentRate(internalpb.RateType_DQLSearch, collection)
		q.resetCurrentRate(internalpb.RateType_DQLQuery, collection)
	}
}

// resetCurrentRate resets the current rate of the given rate type and collection to the configured limit.
func (q *QuotaCenter) resetCurrentRate(rt internalpb.RateType, collection int64) {
	if q.currentRates[collection] == nil {
		q.currentRates[collection] = make(map[internalpb.RateType]ratelimitutil.Limit)
	}

	if q.quotaStates[collection] == nil {
		q.quotaStates[collection] = make(map[milvuspb.QuotaState]commonpb.ErrorCode)
	}
	switch rt {
	case internalpb.RateType_DMLInsert:
		q.currentRates[collection][rt] = Limit(Params.QuotaConfig.DMLMaxInsertRate.GetAsFloat())
	case internalpb.RateType_DMLDelete:
		q.currentRates[collection][rt] = Limit(Params.QuotaConfig.DMLMaxDeleteRate.GetAsFloat())
	case internalpb.RateType_DMLBulkLoad:
		q.currentRates[collection][rt] = Limit(Params.QuotaConfig.DMLMaxBulkLoadRate.GetAsFloat())
	case internalpb.RateType_DQLSearch:
		q.currentRates[collection][rt] = Limit(Params.QuotaConfig.DQLMaxSearchRate.GetAsFloat())
	case internalpb.RateType_DQLQuery:
		q.currentRates[collection][rt] = Limit(Params.QuotaConfig.DQLMaxQueryRate.GetAsFloat())
	}
	if q.currentRates[collection][rt] < 0 {
		q.currentRates[collection][rt] = Inf // no limit
	}
}

// checkDiskQuota checks if disk quota exceeded.
func (q *QuotaCenter) checkDiskQuota() {
	q.diskMu.Lock()
	defer q.diskMu.Unlock()
	if !Params.QuotaConfig.DiskProtectionEnabled.GetAsBool() {
		return
	}
	if q.dataCoordMetrics == nil {
		return
	}
	collections := typeutil.NewUniqueSet()
	totalDiskQuota := Params.QuotaConfig.DiskQuota.GetAsFloat()
	colDiskQuota := Params.QuotaConfig.DiskQuotaPerCollection.GetAsFloat()
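	// Collections whose binlog size already exceeds the per-collection quota are denied
	// individually; if the cluster-wide total exceeds the total quota, writing is denied
	// for all writable collections.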
	for collection, binlogSize := range q.dataCoordMetrics.CollectionBinlogSize {
		if float64(binlogSize) >= colDiskQuota {
			log.Warn("collection disk quota exceeded",
				zap.Int64("collection", collection),
				zap.Int64("coll disk usage", binlogSize),
				zap.Float64("coll disk quota", colDiskQuota))
			collections.Insert(collection)
		}
	}
	if collections.Len() > 0 {
		q.forceDenyWriting(commonpb.ErrorCode_DiskQuotaExhausted, collections.Collect()...)
	}
	total := q.dataCoordMetrics.TotalBinlogSize
	if float64(total) >= totalDiskQuota {
		log.Warn("total disk quota exceeded",
			zap.Int64("total disk usage", total),
			zap.Float64("total disk quota", totalDiskQuota))
		q.forceDenyWriting(commonpb.ErrorCode_DiskQuotaExhausted)
	}
	q.totalBinlogSize = total
}

// setRates notifies Proxies to set rates for different rate types.
func (q *QuotaCenter) setRates() error {
	ctx, cancel := context.WithTimeout(context.Background(), SetRatesTimeout)
	defer cancel()

	toCollectionRate := func(collection int64, currentRates map[internalpb.RateType]ratelimitutil.Limit) *proxypb.CollectionRate {
		rates := make([]*internalpb.Rate, 0, len(q.currentRates))
		switch q.rateAllocateStrategy {
		case Average:
			proxyNum := q.proxies.GetProxyCount()
			if proxyNum == 0 {
				return nil
			}
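			// Average strategy: each Proxy enforces an equal share of the collection-level
			// limit, so the cluster-wide effective limit stays the configured value; an Inf
			// limit is passed through unchanged.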
			for rt, r := range currentRates {
				if r == Inf {
					rates = append(rates, &internalpb.Rate{Rt: rt, R: float64(r)})
				} else {
					rates = append(rates, &internalpb.Rate{Rt: rt, R: float64(r) / float64(proxyNum)})
				}
			}

		case ByRateWeight:
			// TODO: support ByRateWeight
		}

		return &proxypb.CollectionRate{
			Collection: collection,
			Rates:      rates,
			States:     lo.Keys(q.quotaStates[collection]),
			Codes:      lo.Values(q.quotaStates[collection]),
		}
	}

	collectionRates := make([]*proxypb.CollectionRate, 0)
	for collection, rates := range q.currentRates {
		collectionRates = append(collectionRates, toCollectionRate(collection, rates))
	}
	timestamp := tsoutil.ComposeTSByTime(time.Now(), 0)
	req := &proxypb.SetRatesRequest{
		Base: commonpbutil.NewMsgBase(
			commonpbutil.WithMsgID(int64(timestamp)),
			commonpbutil.WithTimeStamp(timestamp),
		),
		Rates: collectionRates,
	}
	return q.proxies.SetRates(ctx, req)
}

// recordMetrics records metrics of quota states.
func (q *QuotaCenter) recordMetrics() {
	record := func(errorCode commonpb.ErrorCode) {
		for _, states := range q.quotaStates {
			for _, state := range states {
				if state == errorCode {
					metrics.RootCoordQuotaStates.WithLabelValues(errorCode.String()).Set(1)
					return
				}
			}
		}
		metrics.RootCoordQuotaStates.WithLabelValues(errorCode.String()).Set(0)
	}
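	// Each quota-related error code is exported as a 0/1 gauge: 1 if any collection is
	// currently in that state, 0 otherwise.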
	record(commonpb.ErrorCode_MemoryQuotaExhausted)
	record(commonpb.ErrorCode_DiskQuotaExhausted)
	record(commonpb.ErrorCode_TimeTickLongDelay)
}

func (q *QuotaCenter) diskAllowance(collection UniqueID) float64 {
	q.diskMu.Lock()
	defer q.diskMu.Unlock()
	if !Params.QuotaConfig.DiskProtectionEnabled.GetAsBool() {
		return math.MaxInt64
	}
	if q.dataCoordMetrics == nil {
		return math.MaxInt64
	}
	totalDiskQuota := Params.QuotaConfig.DiskQuota.GetAsFloat()
	colDiskQuota := Params.QuotaConfig.DiskQuotaPerCollection.GetAsFloat()
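	// The allowance is the smaller of the collection's remaining per-collection quota and
	// the cluster's remaining total quota. For illustration only (assumed numbers): with a
	// 10 GiB per-collection quota, 8 GiB already used by this collection, and 3 GiB of
	// total headroom left, the allowance is min(2 GiB, 3 GiB) = 2 GiB.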
	allowance := float64(math.MaxInt64)
	if binlogSize, ok := q.dataCoordMetrics.CollectionBinlogSize[collection]; ok {
		allowance = math.Min(allowance, colDiskQuota-float64(binlogSize))
	}
	allowance = math.Min(allowance, totalDiskQuota-float64(q.totalBinlogSize))
	return allowance
}