From ed8836cd15fc17951d4f060be2f0a058f692d314 Mon Sep 17 00:00:00 2001 From: "yihao.dai" Date: Wed, 26 Apr 2023 21:52:36 +0800 Subject: [PATCH] Add disk quota at the collection level (#23704) Signed-off-by: bigsheeper --- configs/milvus.yaml | 1 + internal/datacoord/meta.go | 19 ++--- internal/datacoord/meta_test.go | 12 ++-- internal/datacoord/metrics_info.go | 4 +- internal/rootcoord/quota_center.go | 81 +++++++++++++++------ internal/rootcoord/quota_center_test.go | 96 +++++++++++++++++-------- pkg/util/metricsinfo/quota_metric.go | 3 +- pkg/util/paramtable/quota_param.go | 22 ++++++ pkg/util/paramtable/quota_param_test.go | 1 + 9 files changed, 170 insertions(+), 69 deletions(-) diff --git a/configs/milvus.yaml b/configs/milvus.yaml index b4bdca79be..2bb133b495 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -487,6 +487,7 @@ quotaAndLimits: diskProtection: enabled: true # When the total file size of object storage is greater than `diskQuota`, all dml requests would be rejected; diskQuota: -1 # MB, (0, +inf), default no limit + diskQuotaPerCollection: -1 # MB, (0, +inf), default no limit limitReading: # forceDeny false means dql requests are allowed (except for some # specific conditions, such as collection has been dropped), true means always reject all dql requests. diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index de83d26bb9..4f906ca859 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -252,23 +252,26 @@ func (m *meta) GetNumRowsOfCollection(collectionID UniqueID) int64 { return ret } -// GetTotalBinlogSize returns the total size (bytes) of healthy segments in cluster. -func (m *meta) GetTotalBinlogSize() int64 { +// GetCollectionBinlogSize returns the total binlog size and binlog size of collections. 
+func (m *meta) GetCollectionBinlogSize() (int64, map[UniqueID]int64) { m.RLock() defer m.RUnlock() - var totalHealthySize int64 + collectionBinlogSize := make(map[UniqueID]int64) + sizesOfStates := make(map[commonpb.SegmentState]int64) segments := m.segments.GetSegments() - sizes := make(map[commonpb.SegmentState]int64) + var total int64 for _, segment := range segments { + segmentSize := segment.getSegmentSize() if isSegmentHealthy(segment) { - totalHealthySize += segment.getSegmentSize() + total += segmentSize + collectionBinlogSize[segment.GetCollectionID()] += segmentSize } - sizes[segment.GetState()] += segment.getSegmentSize() + sizesOfStates[segment.GetState()] += segmentSize } - for state, size := range sizes { + for state, size := range sizesOfStates { metrics.DataCoordStoredBinlogSize.WithLabelValues(state.String()).Set(float64(size)) } - return totalHealthySize + return total, collectionBinlogSize } // AddSegment records segment info, persisting info into kv store diff --git a/internal/datacoord/meta_test.go b/internal/datacoord/meta_test.go index c0ae45c831..f5c072ff49 100644 --- a/internal/datacoord/meta_test.go +++ b/internal/datacoord/meta_test.go @@ -367,14 +367,10 @@ func TestMeta_Basic(t *testing.T) { equalCollectionInfo(t, collInfo, ret) }) - t.Run("Test GetTotalBinlogSize", func(t *testing.T) { + t.Run("Test GetCollectionBinlogSize", func(t *testing.T) { const size0 = 1024 const size1 = 2048 - // no binlog - size := meta.GetTotalBinlogSize() - assert.EqualValues(t, 0, size) - // add seg0 with size0 segID0, err := mockAllocator.allocID(ctx) assert.Nil(t, err) @@ -392,8 +388,10 @@ func TestMeta_Basic(t *testing.T) { assert.Nil(t, err) // check TotalBinlogSize - size = meta.GetTotalBinlogSize() - assert.Equal(t, int64(size0+size1), size) + total, collectionBinlogSize := meta.GetCollectionBinlogSize() + assert.Len(t, collectionBinlogSize, 1) + assert.Equal(t, int64(size0+size1), collectionBinlogSize[collID]) + assert.Equal(t, int64(size0+size1), 
total) }) } diff --git a/internal/datacoord/metrics_info.go b/internal/datacoord/metrics_info.go index fa59a45e20..5f0cdd6d05 100644 --- a/internal/datacoord/metrics_info.go +++ b/internal/datacoord/metrics_info.go @@ -37,8 +37,10 @@ import ( // getQuotaMetrics returns DataCoordQuotaMetrics. func (s *Server) getQuotaMetrics() *metricsinfo.DataCoordQuotaMetrics { + total, colSizes := s.meta.GetCollectionBinlogSize() return &metricsinfo.DataCoordQuotaMetrics{ - TotalBinlogSize: s.meta.GetTotalBinlogSize(), + TotalBinlogSize: total, + CollectionBinlogSize: colSizes, } } diff --git a/internal/rootcoord/quota_center.go b/internal/rootcoord/quota_center.go index 49356078e3..c8619e6e51 100644 --- a/internal/rootcoord/quota_center.go +++ b/internal/rootcoord/quota_center.go @@ -19,6 +19,7 @@ package rootcoord import ( "context" "fmt" + "math" "strconv" "sync" "time" @@ -88,10 +89,13 @@ type QuotaCenter struct { dataCoord types.DataCoord // metrics - queryNodeMetrics map[UniqueID]*metricsinfo.QueryNodeQuotaMetrics - dataNodeMetrics map[UniqueID]*metricsinfo.DataNodeQuotaMetrics - proxyMetrics map[UniqueID]*metricsinfo.ProxyQuotaMetrics - dataCoordMetrics *metricsinfo.DataCoordQuotaMetrics + queryNodeMetrics map[UniqueID]*metricsinfo.QueryNodeQuotaMetrics + dataNodeMetrics map[UniqueID]*metricsinfo.DataNodeQuotaMetrics + proxyMetrics map[UniqueID]*metricsinfo.ProxyQuotaMetrics + diskMu sync.Mutex // guards dataCoordMetrics and totalBinlogSize + dataCoordMetrics *metricsinfo.DataCoordQuotaMetrics + totalBinlogSize int64 + readableCollections []int64 writableCollections []int64 @@ -226,9 +230,11 @@ func (q *QuotaCenter) syncMetrics() error { } } q.writableCollections = collections.Collect() + q.diskMu.Lock() if dataCoordTopology.Cluster.Self.QuotaMetrics != nil { q.dataCoordMetrics = dataCoordTopology.Cluster.Self.QuotaMetrics } + q.diskMu.Unlock() return nil }) // get Proxies metrics @@ -414,11 +420,7 @@ func (q *QuotaCenter) calculateWriteRates() error { return nil } - 
exceeded := q.ifDiskQuotaExceeded() - if exceeded { - q.forceDenyWriting(commonpb.ErrorCode_DiskQuotaExhausted) // disk quota protection - return nil - } + q.checkDiskQuota() collectionFactor := map[int64]float64{} for _, collection := range q.writableCollections { @@ -594,25 +596,39 @@ func (q *QuotaCenter) resetCurrentRate(rt internalpb.RateType, collection int64) } } -// ifDiskQuotaExceeded checks if disk quota exceeded. -func (q *QuotaCenter) ifDiskQuotaExceeded() bool { +// checkDiskQuota checks if disk quota exceeded. +func (q *QuotaCenter) checkDiskQuota() { + q.diskMu.Lock() + defer q.diskMu.Unlock() if !Params.QuotaConfig.DiskProtectionEnabled.GetAsBool() { - return false + return } if q.dataCoordMetrics == nil { - return false + return } - diskQuota := Params.QuotaConfig.DiskQuota.GetAsFloat() - totalSize := q.dataCoordMetrics.TotalBinlogSize - if float64(totalSize) >= diskQuota { - log.Warn("QuotaCenter: disk quota exceeded", - zap.Int64("curDiskUsage", totalSize), - zap.Float64("diskQuota", diskQuota)) - log.Info("DataCoordMetric", - zap.Any("metric", q.dataCoordMetrics)) - return true + collections := typeutil.NewUniqueSet() + totalDiskQuota := Params.QuotaConfig.DiskQuota.GetAsFloat() + colDiskQuota := Params.QuotaConfig.DiskQuotaPerCollection.GetAsFloat() + for collection, binlogSize := range q.dataCoordMetrics.CollectionBinlogSize { + if float64(binlogSize) >= colDiskQuota { + log.Warn("collection disk quota exceeded", + zap.Int64("collection", collection), + zap.Int64("coll disk usage", binlogSize), + zap.Float64("coll disk quota", colDiskQuota)) + collections.Insert(collection) + } } - return false + if collections.Len() > 0 { + q.forceDenyWriting(commonpb.ErrorCode_DiskQuotaExhausted, collections.Collect()...) 
+ } + total := q.dataCoordMetrics.TotalBinlogSize + if float64(total) >= totalDiskQuota { + log.Warn("total disk quota exceeded", + zap.Int64("total disk usage", total), + zap.Float64("total disk quota", totalDiskQuota)) + q.forceDenyWriting(commonpb.ErrorCode_DiskQuotaExhausted) + } + q.totalBinlogSize = total } // setRates notifies Proxies to set rates for different rate types. @@ -680,3 +696,22 @@ func (q *QuotaCenter) recordMetrics() { record(commonpb.ErrorCode_DiskQuotaExhausted) record(commonpb.ErrorCode_TimeTickLongDelay) } + +func (q *QuotaCenter) diskAllowance(collection UniqueID) float64 { + q.diskMu.Lock() + defer q.diskMu.Unlock() + if !Params.QuotaConfig.DiskProtectionEnabled.GetAsBool() { + return math.MaxInt64 + } + if q.dataCoordMetrics == nil { + return math.MaxInt64 + } + totalDiskQuota := Params.QuotaConfig.DiskQuota.GetAsFloat() + colDiskQuota := Params.QuotaConfig.DiskQuotaPerCollection.GetAsFloat() + allowance := float64(math.MaxInt64) + if binlogSize, ok := q.dataCoordMetrics.CollectionBinlogSize[collection]; ok { + allowance = math.Min(allowance, colDiskQuota-float64(binlogSize)) + } + allowance = math.Min(allowance, totalDiskQuota-float64(q.totalBinlogSize)) + return allowance +} diff --git a/internal/rootcoord/quota_center_test.go b/internal/rootcoord/quota_center_test.go index 0f88743d39..32209c5f88 100644 --- a/internal/rootcoord/quota_center_test.go +++ b/internal/rootcoord/quota_center_test.go @@ -19,6 +19,7 @@ package rootcoord import ( "context" "fmt" + "math" "testing" "time" @@ -272,20 +273,6 @@ func TestQuotaCenter(t *testing.T) { err = quotaCenter.calculateWriteRates() assert.NoError(t, err) - // DiskQuota exceeded - quotaBackup := Params.QuotaConfig.DiskQuota - paramtable.Get().Save(Params.QuotaConfig.DiskQuota.Key, "99") - quotaCenter.dataCoordMetrics = &metricsinfo.DataCoordQuotaMetrics{TotalBinlogSize: 100 * 1024 * 1024} - quotaCenter.writableCollections = []int64{1, 2, 3} - quotaCenter.resetAllCurrentRates() - err =
quotaCenter.calculateWriteRates() - assert.NoError(t, err) - for _, collection := range quotaCenter.writableCollections { - assert.Equal(t, Limit(0), quotaCenter.currentRates[collection][internalpb.RateType_DMLInsert]) - assert.Equal(t, Limit(0), quotaCenter.currentRates[collection][internalpb.RateType_DMLDelete]) - } - Params.QuotaConfig.DiskQuota = quotaBackup - // force deny forceBak := Params.QuotaConfig.ForceDenyWriting paramtable.Get().Save(Params.QuotaConfig.ForceDenyWriting.Key, "true") @@ -350,26 +337,41 @@ func TestQuotaCenter(t *testing.T) { paramtable.Get().Reset(Params.QuotaConfig.QueryNodeMemoryHighWaterLevel.Key) }) - t.Run("test ifDiskQuotaExceeded", func(t *testing.T) { + t.Run("test checkDiskQuota", func(t *testing.T) { qc := types.NewMockQueryCoord(t) quotaCenter := NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{}, core.tsoAllocator) + quotaCenter.checkDiskQuota() - paramtable.Get().Save(Params.QuotaConfig.DiskProtectionEnabled.Key, "false") - ok := quotaCenter.ifDiskQuotaExceeded() - assert.False(t, ok) - paramtable.Get().Save(Params.QuotaConfig.DiskProtectionEnabled.Key, "true") - + // total DiskQuota exceeded quotaBackup := Params.QuotaConfig.DiskQuota - paramtable.Get().Save(Params.QuotaConfig.DiskQuota.Key, fmt.Sprintf("%f", 99.0/1024/1024)) - quotaCenter.dataCoordMetrics = &metricsinfo.DataCoordQuotaMetrics{TotalBinlogSize: 100} - ok = quotaCenter.ifDiskQuotaExceeded() - assert.True(t, ok) + paramtable.Get().Save(Params.QuotaConfig.DiskQuota.Key, "99") + quotaCenter.dataCoordMetrics = &metricsinfo.DataCoordQuotaMetrics{ + TotalBinlogSize: 200 * 1024 * 1024, + CollectionBinlogSize: map[int64]int64{1: 100 * 1024 * 1024}} + quotaCenter.writableCollections = []int64{1, 2, 3} + quotaCenter.resetAllCurrentRates() + quotaCenter.checkDiskQuota() + for _, collection := range quotaCenter.writableCollections { + assert.Equal(t, Limit(0), quotaCenter.currentRates[collection][internalpb.RateType_DMLInsert]) + assert.Equal(t, Limit(0), 
quotaCenter.currentRates[collection][internalpb.RateType_DMLDelete]) + } + paramtable.Get().Save(Params.QuotaConfig.DiskQuota.Key, quotaBackup.GetValue()) - paramtable.Get().Save(Params.QuotaConfig.DiskQuota.Key, fmt.Sprintf("%f", 101.0/1024/1024)) - quotaCenter.dataCoordMetrics = &metricsinfo.DataCoordQuotaMetrics{TotalBinlogSize: 100} - ok = quotaCenter.ifDiskQuotaExceeded() - assert.False(t, ok) - Params.QuotaConfig.DiskQuota = quotaBackup + // collection DiskQuota exceeded + colQuotaBackup := Params.QuotaConfig.DiskQuotaPerCollection + paramtable.Get().Save(Params.QuotaConfig.DiskQuotaPerCollection.Key, "30") + quotaCenter.dataCoordMetrics = &metricsinfo.DataCoordQuotaMetrics{CollectionBinlogSize: map[int64]int64{ + 1: 20 * 1024 * 1024, 2: 30 * 1024 * 1024, 3: 60 * 1024 * 1024}} + quotaCenter.writableCollections = []int64{1, 2, 3} + quotaCenter.resetAllCurrentRates() + quotaCenter.checkDiskQuota() + assert.NotEqual(t, Limit(0), quotaCenter.currentRates[1][internalpb.RateType_DMLInsert]) + assert.NotEqual(t, Limit(0), quotaCenter.currentRates[1][internalpb.RateType_DMLDelete]) + assert.Equal(t, Limit(0), quotaCenter.currentRates[2][internalpb.RateType_DMLInsert]) + assert.Equal(t, Limit(0), quotaCenter.currentRates[2][internalpb.RateType_DMLDelete]) + assert.Equal(t, Limit(0), quotaCenter.currentRates[3][internalpb.RateType_DMLInsert]) + assert.Equal(t, Limit(0), quotaCenter.currentRates[3][internalpb.RateType_DMLDelete]) + paramtable.Get().Save(Params.QuotaConfig.DiskQuotaPerCollection.Key, colQuotaBackup.GetValue()) }) t.Run("test setRates", func(t *testing.T) { @@ -412,4 +414,40 @@ func TestQuotaCenter(t *testing.T) { quotaCenter.guaranteeMinRate(float64(minRate), internalpb.RateType_DQLSearch, 1) assert.Equal(t, minRate, quotaCenter.currentRates[collectionID][internalpb.RateType_DQLSearch]) }) + + t.Run("test diskAllowance", func(t *testing.T) { + tests := []struct { + name string + totalDiskQuota string + collDiskQuota string + totalDiskUsage int64 // in MB 
+ collDiskUsage int64 // in MB + expectAllowance int64 // in bytes + }{ + {"test max", "-1", "-1", 100, 100, math.MaxInt64}, + {"test total quota exceeded", "100", "-1", 100, 100, 0}, + {"test coll quota exceeded", "-1", "20", 100, 20, 0}, + {"test not exceeded", "100", "20", 80, 10, 10 * 1024 * 1024}, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + collection := UniqueID(0) + quotaCenter := NewQuotaCenter(pcm, nil, &dataCoordMockForQuota{}, core.tsoAllocator) + quotaCenter.resetAllCurrentRates() + quotaBackup := Params.QuotaConfig.DiskQuota + colQuotaBackup := Params.QuotaConfig.DiskQuotaPerCollection + paramtable.Get().Save(Params.QuotaConfig.DiskQuota.Key, test.totalDiskQuota) + paramtable.Get().Save(Params.QuotaConfig.DiskQuotaPerCollection.Key, test.collDiskQuota) + quotaCenter.diskMu.Lock() + quotaCenter.dataCoordMetrics = &metricsinfo.DataCoordQuotaMetrics{} + quotaCenter.dataCoordMetrics.CollectionBinlogSize = map[int64]int64{collection: test.collDiskUsage * 1024 * 1024} + quotaCenter.totalBinlogSize = test.totalDiskUsage * 1024 * 1024 + quotaCenter.diskMu.Unlock() + allowance := quotaCenter.diskAllowance(collection) + assert.Equal(t, float64(test.expectAllowance), allowance) + paramtable.Get().Save(Params.QuotaConfig.DiskQuota.Key, quotaBackup.GetValue()) + paramtable.Get().Save(Params.QuotaConfig.DiskQuotaPerCollection.Key, colQuotaBackup.GetValue()) + }) + } + }) } diff --git a/pkg/util/metricsinfo/quota_metric.go b/pkg/util/metricsinfo/quota_metric.go index 3673a9fa3c..48ba8fdc33 100644 --- a/pkg/util/metricsinfo/quota_metric.go +++ b/pkg/util/metricsinfo/quota_metric.go @@ -84,7 +84,8 @@ type QueryNodeQuotaMetrics struct { } type DataCoordQuotaMetrics struct { - TotalBinlogSize int64 + TotalBinlogSize int64 + CollectionBinlogSize map[int64]int64 } // DataNodeQuotaMetrics are metrics of DataNode. 
diff --git a/pkg/util/paramtable/quota_param.go b/pkg/util/paramtable/quota_param.go index c219e016eb..3ac103a170 100644 --- a/pkg/util/paramtable/quota_param.go +++ b/pkg/util/paramtable/quota_param.go @@ -90,6 +90,7 @@ type quotaConfig struct { QueryNodeMemoryHighWaterLevel ParamItem `refreshable:"true"` DiskProtectionEnabled ParamItem `refreshable:"true"` DiskQuota ParamItem `refreshable:"true"` + DiskQuotaPerCollection ParamItem `refreshable:"true"` // limit reading ForceDenyReading ParamItem `refreshable:"true"` @@ -678,6 +679,27 @@ When memory usage < memoryLowWaterLevel, no action.`, } p.DiskQuota.Init(base.mgr) + p.DiskQuotaPerCollection = ParamItem{ + Key: "quotaAndLimits.limitWriting.diskProtection.diskQuotaPerCollection", + Version: "2.2.8", + DefaultValue: quota, + Formatter: func(v string) string { + if !p.DiskProtectionEnabled.GetAsBool() { + return max + } + level := getAsFloat(v) + // (0, +inf) + if level <= 0 { + level = getAsFloat(quota) + } + // megabytes to bytes + return fmt.Sprintf("%f", megaBytes2Bytes(level)) + }, + Doc: "MB, (0, +inf), default no limit", + Export: true, + } + p.DiskQuotaPerCollection.Init(base.mgr) + // limit reading p.ForceDenyReading = ParamItem{ Key: "quotaAndLimits.limitReading.forceDeny", diff --git a/pkg/util/paramtable/quota_param_test.go b/pkg/util/paramtable/quota_param_test.go index a02bc5396c..2957642ed1 100644 --- a/pkg/util/paramtable/quota_param_test.go +++ b/pkg/util/paramtable/quota_param_test.go @@ -79,6 +79,7 @@ func TestQuotaParam(t *testing.T) { assert.Equal(t, defaultHighWaterLevel, qc.QueryNodeMemoryHighWaterLevel.GetAsFloat()) assert.Equal(t, true, qc.DiskProtectionEnabled.GetAsBool()) assert.Equal(t, defaultMax, qc.DiskQuota.GetAsFloat()) + assert.Equal(t, defaultMax, qc.DiskQuotaPerCollection.GetAsFloat()) }) t.Run("test limit reading", func(t *testing.T) {