enhance: avoid the repeated metric info in the proxy (#32380)

issue: #30577

Signed-off-by: SimFG <bang.fu@zilliz.com>
This commit is contained in:
SimFG 2024-04-19 10:21:20 +08:00 committed by GitHub
parent 18c3cbe46d
commit 31a29a2451
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 223 additions and 116 deletions

View File

@ -2706,17 +2706,16 @@ func (node *Proxy) Upsert(ctx context.Context, request *milvuspb.UpsertRequest)
return it.result, nil
}
func GetDBAndCollectionRateSubLabels(req any) []string {
subLabels := make([]string, 2)
func GetCollectionRateSubLabel(req any) string {
dbName, _ := requestutil.GetDbNameFromRequest(req)
if dbName != "" {
subLabels[0] = ratelimitutil.GetDBSubLabel(dbName.(string))
if dbName == "" {
return ""
}
collectionName, _ := requestutil.GetCollectionNameFromRequest(req)
if collectionName != "" {
subLabels[1] = ratelimitutil.GetCollectionSubLabel(dbName.(string), collectionName.(string))
if collectionName == "" {
return ""
}
return subLabels
return ratelimitutil.GetCollectionSubLabel(dbName.(string), collectionName.(string))
}
// Search searches the most similar records of requests.
@ -2753,8 +2752,8 @@ func (node *Proxy) search(ctx context.Context, request *milvuspb.SearchRequest)
request.GetCollectionName(),
).Add(float64(request.GetNq()))
subLabels := GetDBAndCollectionRateSubLabels(request)
rateCol.Add(internalpb.RateType_DQLSearch.String(), float64(request.GetNq()), subLabels...)
subLabel := GetCollectionRateSubLabel(request)
rateCol.Add(internalpb.RateType_DQLSearch.String(), float64(request.GetNq()), subLabel)
if err := merr.CheckHealthy(node.GetStateCode()); err != nil {
return &milvuspb.SearchResults{
@ -2931,7 +2930,7 @@ func (node *Proxy) search(ctx context.Context, request *milvuspb.SearchRequest)
}
metrics.ProxyReadReqSendBytes.WithLabelValues(nodeID).Add(float64(sentSize))
rateCol.Add(metricsinfo.ReadResultThroughput, float64(sentSize), subLabels...)
rateCol.Add(metricsinfo.ReadResultThroughput, float64(sentSize), subLabel)
}
return qt.result, nil
}
@ -2962,12 +2961,12 @@ func (node *Proxy) hybridSearch(ctx context.Context, request *milvuspb.HybridSea
request.GetCollectionName(),
).Add(float64(receiveSize))
subLabels := GetDBAndCollectionRateSubLabels(request)
subLabel := GetCollectionRateSubLabel(request)
allNQ := int64(0)
for _, searchRequest := range request.Requests {
allNQ += searchRequest.GetNq()
}
rateCol.Add(internalpb.RateType_DQLSearch.String(), float64(allNQ), subLabels...)
rateCol.Add(internalpb.RateType_DQLSearch.String(), float64(allNQ), subLabel)
if err := merr.CheckHealthy(node.GetStateCode()); err != nil {
return &milvuspb.SearchResults{
@ -3128,7 +3127,7 @@ func (node *Proxy) hybridSearch(ctx context.Context, request *milvuspb.HybridSea
}
metrics.ProxyReadReqSendBytes.WithLabelValues(nodeID).Add(float64(sentSize))
rateCol.Add(metricsinfo.ReadResultThroughput, float64(sentSize), subLabels...)
rateCol.Add(metricsinfo.ReadResultThroughput, float64(sentSize), subLabel)
}
return qt.result, nil
}
@ -3275,8 +3274,8 @@ func (node *Proxy) query(ctx context.Context, qt *queryTask) (*milvuspb.QueryRes
request.GetCollectionName(),
).Add(float64(1))
subLabels := GetDBAndCollectionRateSubLabels(request)
rateCol.Add(internalpb.RateType_DQLQuery.String(), 1, subLabels...)
subLabel := GetCollectionRateSubLabel(request)
rateCol.Add(internalpb.RateType_DQLQuery.String(), 1, subLabel)
if err := merr.CheckHealthy(node.GetStateCode()); err != nil {
return &milvuspb.QueryResults{
@ -3394,7 +3393,7 @@ func (node *Proxy) query(ctx context.Context, qt *queryTask) (*milvuspb.QueryRes
).Observe(float64(tr.ElapseSpan().Milliseconds()))
sentSize := proto.Size(qt.result)
rateCol.Add(metricsinfo.ReadResultThroughput, float64(sentSize), subLabels...)
rateCol.Add(metricsinfo.ReadResultThroughput, float64(sentSize), subLabel)
metrics.ProxyReadReqSendBytes.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Add(float64(sentSize))
return qt.result, nil

View File

@ -50,6 +50,7 @@ import (
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/ratelimitutil"
"github.com/milvus-io/milvus/pkg/util/resource"
)
@ -1636,3 +1637,33 @@ func TestProxy_ImportV2(t *testing.T) {
assert.Equal(t, int32(0), rsp.GetStatus().GetCode())
})
}
func TestGetCollectionRateSubLabel(t *testing.T) {
d := "db1"
collectionName := "test1"
t.Run("normal", func(t *testing.T) {
subLabel := GetCollectionRateSubLabel(&milvuspb.QueryRequest{
DbName: d,
CollectionName: collectionName,
})
assert.Equal(t, ratelimitutil.GetCollectionSubLabel(d, collectionName), subLabel)
})
t.Run("fail", func(t *testing.T) {
{
subLabel := GetCollectionRateSubLabel(&milvuspb.QueryRequest{
DbName: "",
CollectionName: collectionName,
})
assert.Equal(t, "", subLabel)
}
{
subLabel := GetCollectionRateSubLabel(&milvuspb.QueryRequest{
DbName: d,
CollectionName: "",
})
assert.Equal(t, "", subLabel)
}
})
}

View File

@ -681,6 +681,32 @@ func (q *QuotaCenter) calculateReadRates() error {
}
}
metricMap := make(map[string]float64) // label metric
collectionMetricMap := make(map[string]map[string]map[string]float64) // sub label metric, label -> db -> collection -> value
for _, metric := range q.proxyMetrics {
for _, rm := range metric.Rms {
if !ratelimitutil.IsSubLabel(rm.Label) {
metricMap[rm.Label] += rm.Rate
continue
}
mainLabel, database, collection, ok := ratelimitutil.SplitCollectionSubLabel(rm.Label)
if !ok {
continue
}
labelMetric, ok := collectionMetricMap[mainLabel]
if !ok {
labelMetric = make(map[string]map[string]float64)
collectionMetricMap[mainLabel] = labelMetric
}
databaseMetric, ok := labelMetric[database]
if !ok {
databaseMetric = make(map[string]float64)
labelMetric[database] = databaseMetric
}
databaseMetric[collection] += rm.Rate
}
}
// read result
enableResultProtection := Params.QuotaConfig.ResultProtectionEnabled.GetAsBool()
if enableResultProtection {
@ -688,24 +714,17 @@ func (q *QuotaCenter) calculateReadRates() error {
maxDBRate := Params.QuotaConfig.MaxReadResultRatePerDB.GetAsFloat()
maxCollectionRate := Params.QuotaConfig.MaxReadResultRatePerCollection.GetAsFloat()
rateCount := float64(0)
dbRateCount := make(map[string]float64)
collectionRateCount := make(map[string]float64)
for _, metric := range q.proxyMetrics {
for _, rm := range metric.Rms {
if rm.Label == metricsinfo.ReadResultThroughput {
rateCount += rm.Rate
continue
}
dbName, ok := ratelimitutil.GetDBFromSubLabel(metricsinfo.ReadResultThroughput, rm.Label)
if ok {
dbRateCount[dbName] += rm.Rate
continue
}
dbName, collectionName, ok := ratelimitutil.GetCollectionFromSubLabel(metricsinfo.ReadResultThroughput, rm.Label)
if ok {
collectionRateCount[formatCollctionRateKey(dbName, collectionName)] += rm.Rate
continue
rateCount := metricMap[metricsinfo.ReadResultThroughput]
for mainLabel, labelMetric := range collectionMetricMap {
if mainLabel != metricsinfo.ReadResultThroughput {
continue
}
for database, databaseMetric := range labelMetric {
for collection, metricValue := range databaseMetric {
dbRateCount[database] += metricValue
collectionRateCount[formatCollctionRateKey(database, collection)] = metricValue
}
}
}
@ -737,74 +756,55 @@ func (q *QuotaCenter) calculateReadRates() error {
})
coolOffSpeed := Params.QuotaConfig.CoolOffSpeed.GetAsFloat()
coolOffCollectionID := func(collections ...int64) error {
for _, collection := range collections {
dbID, ok := q.collectionIDToDBID.Get(collection)
if !ok {
return fmt.Errorf("db ID not found of collection ID: %d", collection)
}
collectionLimiter := q.rateLimiter.GetCollectionLimiters(dbID, collection)
if collectionLimiter == nil {
return fmt.Errorf("collection limiter not found: %d", collection)
}
dbName, ok := dbIDs[dbID]
if !ok {
return fmt.Errorf("db name not found of db ID: %d", dbID)
}
collectionName, ok := collectionIDs[collection]
if !ok {
return fmt.Errorf("collection name not found of collection ID: %d", collection)
}
realTimeSearchRate := q.getRealTimeRate(
ratelimitutil.FormatSubLabel(internalpb.RateType_DQLSearch.String(),
ratelimitutil.GetCollectionSubLabel(dbName, collectionName)))
realTimeQueryRate := q.getRealTimeRate(
ratelimitutil.FormatSubLabel(internalpb.RateType_DQLQuery.String(),
ratelimitutil.GetCollectionSubLabel(dbName, collectionName)))
q.coolOffReading(realTimeSearchRate, realTimeQueryRate, coolOffSpeed, collectionLimiter, log)
collectionProps := q.getCollectionLimitProperties(collection)
q.guaranteeMinRate(getCollectionRateLimitConfig(collectionProps, common.CollectionSearchRateMinKey),
internalpb.RateType_DQLSearch, collectionLimiter)
q.guaranteeMinRate(getCollectionRateLimitConfig(collectionProps, common.CollectionQueryRateMinKey),
internalpb.RateType_DQLQuery, collectionLimiter)
}
return nil
}
if clusterLimit {
realTimeClusterSearchRate := q.getRealTimeRate(internalpb.RateType_DQLSearch.String())
realTimeClusterQueryRate := q.getRealTimeRate(internalpb.RateType_DQLQuery.String())
realTimeClusterSearchRate := metricMap[internalpb.RateType_DQLSearch.String()]
realTimeClusterQueryRate := metricMap[internalpb.RateType_DQLQuery.String()]
q.coolOffReading(realTimeClusterSearchRate, realTimeClusterQueryRate, coolOffSpeed, q.rateLimiter.GetRootLimiters(), log)
}
var updateLimitErr error
limitDBNameSet.Range(func(name string) bool {
dbID, ok := q.dbs.Get(name)
if !ok {
log.Warn("db not found", zap.String("dbName", name))
updateLimitErr = fmt.Errorf("db not found: %s", name)
return false
}
dbLimiter := q.rateLimiter.GetDatabaseLimiters(dbID)
if dbLimiter == nil {
log.Warn("database limiter not found", zap.Int64("dbID", dbID))
updateLimitErr = fmt.Errorf("database limiter not found")
return false
if limitDBNameSet.Len() > 0 {
databaseSearchRate := make(map[string]float64)
databaseQueryRate := make(map[string]float64)
for mainLabel, labelMetric := range collectionMetricMap {
var databaseRate map[string]float64
if mainLabel == internalpb.RateType_DQLSearch.String() {
databaseRate = databaseSearchRate
} else if mainLabel == internalpb.RateType_DQLQuery.String() {
databaseRate = databaseQueryRate
} else {
continue
}
for database, databaseMetric := range labelMetric {
for _, metricValue := range databaseMetric {
databaseRate[database] += metricValue
}
}
}
realTimeSearchRate := q.getRealTimeRate(
ratelimitutil.FormatSubLabel(internalpb.RateType_DQLSearch.String(),
ratelimitutil.GetDBSubLabel(name)))
realTimeQueryRate := q.getRealTimeRate(
ratelimitutil.FormatSubLabel(internalpb.RateType_DQLQuery.String(),
ratelimitutil.GetDBSubLabel(name)))
q.coolOffReading(realTimeSearchRate, realTimeQueryRate, coolOffSpeed, dbLimiter, log)
return true
})
if updateLimitErr != nil {
return updateLimitErr
limitDBNameSet.Range(func(name string) bool {
dbID, ok := q.dbs.Get(name)
if !ok {
log.Warn("db not found", zap.String("dbName", name))
updateLimitErr = fmt.Errorf("db not found: %s", name)
return false
}
dbLimiter := q.rateLimiter.GetDatabaseLimiters(dbID)
if dbLimiter == nil {
log.Warn("database limiter not found", zap.Int64("dbID", dbID))
updateLimitErr = fmt.Errorf("database limiter not found")
return false
}
realTimeSearchRate := databaseSearchRate[name]
realTimeQueryRate := databaseQueryRate[name]
q.coolOffReading(realTimeSearchRate, realTimeQueryRate, coolOffSpeed, dbLimiter, log)
return true
})
if updateLimitErr != nil {
return updateLimitErr
}
}
limitCollectionNameSet.Range(func(name string) bool {
@ -828,6 +828,49 @@ func (q *QuotaCenter) calculateReadRates() error {
return updateLimitErr
}
safeGetCollectionRate := func(label, dbName, collectionName string) float64 {
if labelMetric, ok := collectionMetricMap[label]; ok {
if dbMetric, ok := labelMetric[dbName]; ok {
if rate, ok := dbMetric[collectionName]; ok {
return rate
}
}
}
return 0
}
coolOffCollectionID := func(collections ...int64) error {
for _, collection := range collections {
dbID, ok := q.collectionIDToDBID.Get(collection)
if !ok {
return fmt.Errorf("db ID not found of collection ID: %d", collection)
}
collectionLimiter := q.rateLimiter.GetCollectionLimiters(dbID, collection)
if collectionLimiter == nil {
return fmt.Errorf("collection limiter not found: %d", collection)
}
dbName, ok := dbIDs[dbID]
if !ok {
return fmt.Errorf("db name not found of db ID: %d", dbID)
}
collectionName, ok := collectionIDs[collection]
if !ok {
return fmt.Errorf("collection name not found of collection ID: %d", collection)
}
realTimeSearchRate := safeGetCollectionRate(internalpb.RateType_DQLSearch.String(), dbName, collectionName)
realTimeQueryRate := safeGetCollectionRate(internalpb.RateType_DQLQuery.String(), dbName, collectionName)
q.coolOffReading(realTimeSearchRate, realTimeQueryRate, coolOffSpeed, collectionLimiter, log)
collectionProps := q.getCollectionLimitProperties(collection)
q.guaranteeMinRate(getCollectionRateLimitConfig(collectionProps, common.CollectionSearchRateMinKey),
internalpb.RateType_DQLSearch, collectionLimiter)
q.guaranteeMinRate(getCollectionRateLimitConfig(collectionProps, common.CollectionQueryRateMinKey),
internalpb.RateType_DQLQuery, collectionLimiter)
}
return nil
}
if updateLimitErr = coolOffCollectionID(limitCollectionSet.Collect()...); updateLimitErr != nil {
return updateLimitErr
}

View File

@ -1546,18 +1546,18 @@ func TestCalculateReadRates(t *testing.T) {
Label: metricsinfo.ReadResultThroughput,
Rate: 40 * 1024 * 1024,
},
{
Label: ratelimitutil.FormatSubLabel(metricsinfo.ReadResultThroughput, ratelimitutil.GetDBSubLabel("default")),
Rate: 20 * 1024 * 1024,
},
//{
// Label: ratelimitutil.FormatSubLabel(metricsinfo.ReadResultThroughput, ratelimitutil.GetDBSubLabel("default")),
// Rate: 20 * 1024 * 1024,
//},
{
Label: ratelimitutil.FormatSubLabel(metricsinfo.ReadResultThroughput, ratelimitutil.GetCollectionSubLabel("default", "col1")),
Rate: 15 * 1024 * 1024,
},
{
Label: ratelimitutil.FormatSubLabel(metricsinfo.ReadResultThroughput, ratelimitutil.GetDBSubLabel("test")),
Rate: 20 * 1024 * 1024,
},
//{
// Label: ratelimitutil.FormatSubLabel(metricsinfo.ReadResultThroughput, ratelimitutil.GetDBSubLabel("test")),
// Rate: 20 * 1024 * 1024,
//},
{
Label: ratelimitutil.FormatSubLabel(metricsinfo.ReadResultThroughput, ratelimitutil.GetCollectionSubLabel("test", "col2")),
Rate: 10 * 1024 * 1024,
@ -1574,10 +1574,10 @@ func TestCalculateReadRates(t *testing.T) {
Label: ratelimitutil.FormatSubLabel(searchLabel, ratelimitutil.GetDBSubLabel("default")),
Rate: 10,
},
{
Label: ratelimitutil.FormatSubLabel(searchLabel, ratelimitutil.GetDBSubLabel("test")),
Rate: 10,
},
//{
// Label: ratelimitutil.FormatSubLabel(searchLabel, ratelimitutil.GetDBSubLabel("test")),
// Rate: 10,
//},
{
Label: ratelimitutil.FormatSubLabel(searchLabel, ratelimitutil.GetCollectionSubLabel("default", "col1")),
Rate: 10,
@ -1598,31 +1598,31 @@ func TestCalculateReadRates(t *testing.T) {
Label: metricsinfo.ReadResultThroughput,
Rate: 20 * 1024 * 1024,
},
{
Label: ratelimitutil.FormatSubLabel(metricsinfo.ReadResultThroughput, ratelimitutil.GetDBSubLabel("default")),
Rate: 20 * 1024 * 1024,
},
//{
// Label: ratelimitutil.FormatSubLabel(metricsinfo.ReadResultThroughput, ratelimitutil.GetDBSubLabel("default")),
// Rate: 20 * 1024 * 1024,
//},
{
Label: ratelimitutil.FormatSubLabel(metricsinfo.ReadResultThroughput, ratelimitutil.GetCollectionSubLabel("default", "col1")),
Rate: 15 * 1024 * 1024,
Rate: 20 * 1024 * 1024,
},
{
Label: searchLabel,
Rate: 20,
},
{
Label: ratelimitutil.FormatSubLabel(searchLabel, ratelimitutil.GetDBSubLabel("default")),
Rate: 20,
},
//{
// Label: ratelimitutil.FormatSubLabel(searchLabel, ratelimitutil.GetDBSubLabel("default")),
// Rate: 20,
//},
{
Label: ratelimitutil.FormatSubLabel(searchLabel, ratelimitutil.GetCollectionSubLabel("default", "col1")),
Rate: 10,
Rate: 20,
},
},
},
}
quotaCenter.rateLimiter.GetRootLimiters().GetLimiters().Insert(internalpb.RateType_DQLSearch, ratelimitutil.NewLimiter(500, 500))
quotaCenter.rateLimiter.GetRootLimiters().GetLimiters().Insert(internalpb.RateType_DQLSearch, ratelimitutil.NewLimiter(1000, 1000))
quotaCenter.rateLimiter.GetOrCreateCollectionLimiters(1, 10,
newParamLimiterFunc(internalpb.RateScope_Database, allOps),
newParamLimiterFunc(internalpb.RateScope_Collection, allOps))
@ -1646,7 +1646,7 @@ func TestCalculateReadRates(t *testing.T) {
checkRate(quotaCenter.rateLimiter.GetRootLimiters(), float64(32)) // (20 + 20) * 0.8
checkRate(quotaCenter.rateLimiter.GetDatabaseLimiters(1), float64(24)) // (20 + 10) * 0.8
checkRate(quotaCenter.rateLimiter.GetDatabaseLimiters(2), float64(500)) // not cool off
checkRate(quotaCenter.rateLimiter.GetCollectionLimiters(1, 10), float64(16)) // (10 + 10) * 0.8
checkRate(quotaCenter.rateLimiter.GetCollectionLimiters(1, 10), float64(24)) // (20 + 10) * 0.8
checkRate(quotaCenter.rateLimiter.GetCollectionLimiters(2, 20), float64(500)) // not cool off
checkRate(quotaCenter.rateLimiter.GetCollectionLimiters(2, 30), float64(500)) // not cool off
}

View File

@ -140,6 +140,21 @@ func FormatSubLabel(label, subLabel string) string {
return fmt.Sprintf("%s-%s", label, subLabel)
}
func IsSubLabel(label string) bool {
return strings.Contains(label, "-")
}
func SplitCollectionSubLabel(label string) (mainLabel, database, collection string, ok bool) {
if !IsSubLabel(label) {
ok = false
return
}
subMark := strings.Index(label, "-")
mainLabel = label[:subMark]
database, collection, ok = GetCollectionFromSubLabel(mainLabel, label)
return
}
func GetDBFromSubLabel(label, fullLabel string) (string, bool) {
if !strings.HasPrefix(fullLabel, FormatSubLabel(label, GetDBSubLabel(""))) {
return "", false

View File

@ -246,4 +246,23 @@ func TestLabelUtil(t *testing.T) {
_, _, ok := GetCollectionFromSubLabel("foo", "aaa")
assert.False(t, ok)
}
{
ok := IsSubLabel(FormatSubLabel("foo", "bar"))
assert.True(t, ok)
}
{
_, _, _, ok := SplitCollectionSubLabel("foo")
assert.False(t, ok)
}
{
label := FormatSubLabel("foo", GetCollectionSubLabel("db1", "col1"))
mainLabel, db, col, ok := SplitCollectionSubLabel(label)
assert.True(t, ok)
assert.Equal(t, "foo", mainLabel)
assert.Equal(t, "db1", db)
assert.Equal(t, "col1", col)
}
}