relate: https://github.com/milvus-io/milvus/issues/30927
pr: https://github.com/milvus-io/milvus/pull/30891

---------

Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>
Commit a20cb727eb (parent 8cd6718672)
@@ -691,6 +691,14 @@ quotaAndLimits:
   # collects metrics from Proxies, Query cluster and Data cluster.
   # seconds, (0 ~ 65536)
   quotaCenterCollectInterval: 3
+  limits:
+    allocRetryTimes: 15 # retry times when delete alloc forward data from rate limit failed
+    allocWaitInterval: 1000 # retry wait duration when delete alloc forward data rate failed, in millisecond
+    complexDeleteLimitEnable: false # whether complex delete check forward data by limiter
+    maxCollectionNum: 65536
+    maxCollectionNumPerDB: 65536
+    maxInsertSize: -1 # maximum size of a single insert request, in bytes, -1 means no limit
+    maxResourceGroupNumOfQueryNode: 1024 # maximum number of resource groups of query nodes
   ddl:
     enabled: false
     collectionRate: -1 # qps, default no limit, rate for CreateCollection, DropCollection, LoadCollection, ReleaseCollection
@@ -771,11 +779,6 @@ quotaAndLimits:
         max: -1 # qps, default no limit
       partition:
         max: -1 # qps, default no limit
-  limits:
-    maxCollectionNum: 65536
-    maxCollectionNumPerDB: 65536
-    maxInsertSize: -1 # maximum size of a single insert request, in bytes, -1 means no limit
-    maxResourceGroupNumOfQueryNode: 1024 # maximum number of resource groups of query nodes
   limitWriting:
     # forceDeny false means dml requests are allowed (except for some
     # specific conditions, such as memory of nodes to water marker), true means always reject all dml requests.
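For orientation only: the two new knobs above interact multiplicatively. A rejected complex-delete allocation is retried allocRetryTimes times with allocWaitInterval between attempts, so the defaults bound the extra blocking at roughly 15 seconds. A minimal Go sketch of that arithmetic; the helper name is illustrative, not a Milvus API.

package main

import (
	"fmt"
	"time"
)

// worstCaseWait is an illustrative helper: with the defaults above
// (15 retries, 1000 ms between attempts) a delete that keeps hitting
// the rate limit blocks for roughly 15 s before failing.
func worstCaseWait(retryTimes int, waitInterval time.Duration) time.Duration {
	return time.Duration(retryTimes) * waitInterval
}

func main() {
	fmt.Println(worstCaseWait(15, 1000*time.Millisecond)) // prints 15s
}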
@@ -1236,7 +1236,8 @@ func (t *clusteringCompactionTask) checkBuffersAfterCompaction() error {
 }
 
 func (t *clusteringCompactionTask) generatePkStats(ctx context.Context, segmentID int64,
-    numRows int64, binlogPaths [][]string) (*datapb.FieldBinlog, error) {
+    numRows int64, binlogPaths [][]string,
+) (*datapb.FieldBinlog, error) {
     stats, err := storage.NewPrimaryKeyStats(t.primaryKeyField.GetFieldID(), int64(t.primaryKeyField.GetDataType()), numRows)
     if err != nil {
         return nil, err
@@ -178,7 +178,8 @@ func statSerializeWrite(ctx context.Context, io io.BinlogIO, allocator allocator
 }
 
 func uploadStatsBlobs(ctx context.Context, collectionID, partitionID, segmentID, pkID, numRows int64,
-    io io.BinlogIO, allocator allocator.Allocator, blob *storage.Blob) (*datapb.FieldBinlog, error) {
+    io io.BinlogIO, allocator allocator.Allocator, blob *storage.Blob,
+) (*datapb.FieldBinlog, error) {
     logID, err := allocator.AllocOne()
     if err != nil {
         return nil, err
@@ -42,6 +42,7 @@ import (
     "github.com/milvus-io/milvus/internal/proto/proxypb"
     "github.com/milvus-io/milvus/internal/proto/querypb"
     "github.com/milvus-io/milvus/internal/proxy/connection"
+    "github.com/milvus-io/milvus/internal/types"
     "github.com/milvus-io/milvus/internal/util/hookutil"
     "github.com/milvus-io/milvus/internal/util/importutilv2"
     "github.com/milvus-io/milvus/pkg/common"
@@ -2645,6 +2646,11 @@ func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest)
     metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
         metrics.TotalLabel, request.GetDbName(), request.GetCollectionName()).Inc()
 
+    var limiter types.Limiter
+    if node.enableComplexDeleteLimit {
+        limiter, _ = node.GetRateLimiter()
+    }
+
     dr := &deleteRunner{
         req: request,
         idAllocator: node.rowIDAllocator,
@@ -2653,6 +2659,7 @@ func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest)
         chTicker: node.chTicker,
         queue: node.sched.dmQueue,
         lb: node.lbPolicy,
+        limiter: limiter,
     }
 
     log.Debug("init delete runner in Proxy")
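The wiring above treats a nil limiter as "disabled": node.GetRateLimiter() is only consulted when complexDeleteLimitEnable is on, and the delete runner later guards every allocation with a nil check. A self-contained sketch of that nil-interface-as-off pattern follows; the names (Limiter, runDelete, unlimited) are illustrative stand-ins, not the Proxy's real types.

package main

import (
	"context"
	"fmt"
)

// Limiter mirrors the shape used above: Alloc reserves quota for n units
// of work or returns an error when the rate is exceeded.
type Limiter interface {
	Alloc(ctx context.Context, n int) error
}

type unlimited struct{}

func (unlimited) Alloc(ctx context.Context, n int) error { return nil }

// runDelete only consults the limiter when one was injected; a nil
// interface means the complex-delete limit is switched off.
func runDelete(ctx context.Context, limiter Limiter, batch int) error {
	if limiter != nil {
		if err := limiter.Alloc(ctx, batch); err != nil {
			return fmt.Errorf("delete rejected by rate limiter: %w", err)
		}
	}
	// forward the delete batch here
	return nil
}

func main() {
	fmt.Println(runDelete(context.Background(), nil, 128))         // limiter disabled
	fmt.Println(runDelete(context.Background(), unlimited{}, 128)) // limiter enabled, always allows
}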
@@ -80,7 +80,7 @@ func TestProxy_InvalidateCollectionMetaCache_remove_stream(t *testing.T) {
 func TestProxy_CheckHealth(t *testing.T) {
     t.Run("not healthy", func(t *testing.T) {
         node := &Proxy{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}}
-        node.simpleLimiter = NewSimpleLimiter()
+        node.simpleLimiter = NewSimpleLimiter(0, 0)
         node.UpdateStateCode(commonpb.StateCode_Abnormal)
         ctx := context.Background()
         resp, err := node.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
@@ -98,7 +98,7 @@ func TestProxy_CheckHealth(t *testing.T) {
             dataCoord: NewDataCoordMock(),
             session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}},
         }
-        node.simpleLimiter = NewSimpleLimiter()
+        node.simpleLimiter = NewSimpleLimiter(0, 0)
         node.UpdateStateCode(commonpb.StateCode_Healthy)
         ctx := context.Background()
         resp, err := node.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
@@ -131,7 +131,7 @@ func TestProxy_CheckHealth(t *testing.T) {
             queryCoord: qc,
             dataCoord: dataCoordMock,
         }
-        node.simpleLimiter = NewSimpleLimiter()
+        node.simpleLimiter = NewSimpleLimiter(0, 0)
         node.UpdateStateCode(commonpb.StateCode_Healthy)
         ctx := context.Background()
         resp, err := node.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
@@ -148,7 +148,7 @@ func TestProxy_CheckHealth(t *testing.T) {
            dataCoord: NewDataCoordMock(),
            queryCoord: qc,
         }
-        node.simpleLimiter = NewSimpleLimiter()
+        node.simpleLimiter = NewSimpleLimiter(0, 0)
         node.UpdateStateCode(commonpb.StateCode_Healthy)
         resp, err := node.CheckHealth(context.Background(), &milvuspb.CheckHealthRequest{})
         assert.NoError(t, err)
@@ -243,7 +243,7 @@ func TestProxy_ResourceGroup(t *testing.T) {
 
     node, err := NewProxy(ctx, factory)
     assert.NoError(t, err)
-    node.simpleLimiter = NewSimpleLimiter()
+    node.simpleLimiter = NewSimpleLimiter(0, 0)
     node.UpdateStateCode(commonpb.StateCode_Healthy)
 
     qc := mocks.NewMockQueryCoordClient(t)
@@ -335,7 +335,7 @@ func TestProxy_InvalidResourceGroupName(t *testing.T) {
 
     node, err := NewProxy(ctx, factory)
     assert.NoError(t, err)
-    node.simpleLimiter = NewSimpleLimiter()
+    node.simpleLimiter = NewSimpleLimiter(0, 0)
     node.UpdateStateCode(commonpb.StateCode_Healthy)
 
     qc := mocks.NewMockQueryCoordClient(t)
@@ -936,7 +936,7 @@ func TestProxyCreateDatabase(t *testing.T) {
     node.tsoAllocator = &timestampAllocator{
         tso: newMockTimestampAllocatorInterface(),
     }
-    node.simpleLimiter = NewSimpleLimiter()
+    node.simpleLimiter = NewSimpleLimiter(0, 0)
     node.UpdateStateCode(commonpb.StateCode_Healthy)
     node.sched, err = newTaskScheduler(ctx, node.tsoAllocator, node.factory)
     node.sched.ddQueue.setMaxTaskNum(10)
@@ -996,7 +996,7 @@ func TestProxyDropDatabase(t *testing.T) {
     node.tsoAllocator = &timestampAllocator{
         tso: newMockTimestampAllocatorInterface(),
     }
-    node.simpleLimiter = NewSimpleLimiter()
+    node.simpleLimiter = NewSimpleLimiter(0, 0)
     node.UpdateStateCode(commonpb.StateCode_Healthy)
     node.sched, err = newTaskScheduler(ctx, node.tsoAllocator, node.factory)
     node.sched.ddQueue.setMaxTaskNum(10)
@@ -1055,7 +1055,7 @@ func TestProxyListDatabase(t *testing.T) {
     node.tsoAllocator = &timestampAllocator{
         tso: newMockTimestampAllocatorInterface(),
     }
-    node.simpleLimiter = NewSimpleLimiter()
+    node.simpleLimiter = NewSimpleLimiter(0, 0)
     node.UpdateStateCode(commonpb.StateCode_Healthy)
     node.sched, err = newTaskScheduler(ctx, node.tsoAllocator, node.factory)
     node.sched.ddQueue.setMaxTaskNum(10)
@@ -1111,7 +1111,7 @@ func TestProxyAlterDatabase(t *testing.T) {
     node.tsoAllocator = &timestampAllocator{
         tso: newMockTimestampAllocatorInterface(),
     }
-    node.simpleLimiter = NewSimpleLimiter()
+    node.simpleLimiter = NewSimpleLimiter(0, 0)
     node.UpdateStateCode(commonpb.StateCode_Healthy)
     node.sched, err = newTaskScheduler(ctx, node.tsoAllocator, node.factory)
     node.sched.ddQueue.setMaxTaskNum(10)
@@ -1164,7 +1164,7 @@ func TestProxyDescribeDatabase(t *testing.T) {
     node.tsoAllocator = &timestampAllocator{
         tso: newMockTimestampAllocatorInterface(),
     }
-    node.simpleLimiter = NewSimpleLimiter()
+    node.simpleLimiter = NewSimpleLimiter(0, 0)
     node.UpdateStateCode(commonpb.StateCode_Healthy)
     node.sched, err = newTaskScheduler(ctx, node.tsoAllocator, node.factory)
     node.sched.ddQueue.setMaxTaskNum(10)
@@ -1287,6 +1287,7 @@ func TestProxy_Delete(t *testing.T) {
         Expr: "pk in [1, 2, 3]",
     }
     cache := NewMockCache(t)
+    cache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 0}, nil)
     cache.On("GetCollectionID",
         mock.Anything, // context.Context
         mock.AnythingOfType("string"),
@@ -126,6 +126,9 @@ type Proxy struct {
 
     // materialized view
     enableMaterializedView bool
+
+    // delete rate limiter
+    enableComplexDeleteLimit bool
 }
 
 // NewProxy returns a Proxy struct.
@@ -144,7 +147,7 @@ func NewProxy(ctx context.Context, factory dependency.Factory) (*Proxy, error) {
         factory: factory,
         searchResultCh: make(chan *internalpb.SearchResults, n),
         shardMgr: mgr,
-        simpleLimiter: NewSimpleLimiter(),
+        simpleLimiter: NewSimpleLimiter(Params.QuotaConfig.AllocWaitInterval.GetAsDuration(time.Millisecond), Params.QuotaConfig.AllocRetryTimes.GetAsUint()),
         lbPolicy: lbPolicy,
         resourceManager: resourceManager,
         replicateStreamManager: replicateStreamManager,
@@ -284,6 +287,7 @@ func (node *Proxy) Init() error {
     node.chTicker = newChannelsTimeTicker(node.ctx, Params.ProxyCfg.TimeTickInterval.GetAsDuration(time.Millisecond)/2, []string{}, node.sched.getPChanStatistics, tsoAllocator)
     log.Debug("create channels time ticker done", zap.String("role", typeutil.ProxyRole), zap.Duration("syncTimeTickInterval", syncTimeTickInterval))
 
+    node.enableComplexDeleteLimit = Params.QuotaConfig.ComplexDeleteLimitEnable.GetAsBool()
     node.metricsCacheManager = metricsinfo.NewMetricsCacheManager()
     log.Debug("create metrics cache manager done", zap.String("role", typeutil.ProxyRole))
 
@@ -299,7 +299,7 @@ func (s *proxyTestServer) startGrpc(ctx context.Context, wg *sync.WaitGroup, p *
     ctx, cancel := context.WithCancel(ctx)
     defer cancel()
 
-    s.simpleLimiter = NewSimpleLimiter()
+    s.simpleLimiter = NewSimpleLimiter(0, 0)
 
     opts := tracer.GetInterceptorOpts()
     s.grpcServer = grpc.NewServer(
@@ -50,6 +50,10 @@ func (l *limiterMock) Check(dbID int64, collectionIDToPartIDs map[int64][]int64,
     return nil
 }
 
+func (l *limiterMock) Alloc(ctx context.Context, dbID int64, collectionIDToPartIDs map[int64][]int64, rt internalpb.RateType, n int) error {
+    return l.Check(dbID, collectionIDToPartIDs, rt, n)
+}
+
 func TestRateLimitInterceptor(t *testing.T) {
     t.Run("test getRequestInfo", func(t *testing.T) {
         mockCache := NewMockCache(t)
@@ -21,6 +21,7 @@ import (
     "fmt"
     "strconv"
     "sync"
+    "time"
 
     "go.uber.org/zap"
 
@@ -35,6 +36,7 @@ import (
     "github.com/milvus-io/milvus/pkg/util"
     "github.com/milvus-io/milvus/pkg/util/paramtable"
     "github.com/milvus-io/milvus/pkg/util/ratelimitutil"
+    "github.com/milvus-io/milvus/pkg/util/retry"
     "github.com/milvus-io/milvus/pkg/util/typeutil"
 )
 
@@ -42,15 +44,26 @@
 type SimpleLimiter struct {
     quotaStatesMu sync.RWMutex
     rateLimiter *rlinternal.RateLimiterTree
+
+    // for alloc
+    allocWaitInterval time.Duration
+    allocRetryTimes uint
 }
 
 // NewSimpleLimiter returns a new SimpleLimiter.
-func NewSimpleLimiter() *SimpleLimiter {
+func NewSimpleLimiter(allocWaitInterval time.Duration, allocRetryTimes uint) *SimpleLimiter {
     rootRateLimiter := newClusterLimiter()
-    m := &SimpleLimiter{rateLimiter: rlinternal.NewRateLimiterTree(rootRateLimiter)}
+    m := &SimpleLimiter{rateLimiter: rlinternal.NewRateLimiterTree(rootRateLimiter), allocWaitInterval: allocWaitInterval, allocRetryTimes: allocRetryTimes}
     return m
 }
 
+// Alloc will retry till check pass or out of times.
+func (m *SimpleLimiter) Alloc(ctx context.Context, dbID int64, collectionIDToPartIDs map[int64][]int64, rt internalpb.RateType, n int) error {
+    return retry.Do(ctx, func() error {
+        return m.Check(dbID, collectionIDToPartIDs, rt, n)
+    }, retry.Sleep(m.allocWaitInterval), retry.Attempts(m.allocRetryTimes))
+}
+
 // Check checks if request would be limited or denied.
 func (m *SimpleLimiter) Check(dbID int64, collectionIDToPartIDs map[int64][]int64, rt internalpb.RateType, n int) error {
     if !Params.QuotaConfig.QuotaAndLimitsEnabled.GetAsBool() {
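Alloc above simply retries Check through the repo's retry helper with a fixed sleep and a bounded attempt count; the tests construct the limiter as NewSimpleLimiter(0, 0), i.e. no wait between attempts. A rough, self-contained sketch of the retry-until-pass idea follows; it re-implements the loop for illustration and does not claim to reproduce the exact semantics of the milvus retry package.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// allocWithRetry keeps calling check until it succeeds, the attempts are
// exhausted, or the context is cancelled, sleeping `wait` between tries.
// In this sketch, attempts == 0 still performs one check; the real helper
// may behave differently.
func allocWithRetry(ctx context.Context, check func() error, wait time.Duration, attempts uint) error {
	if attempts == 0 {
		attempts = 1
	}
	var err error
	for i := uint(0); i < attempts; i++ {
		if err = check(); err == nil {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(wait):
		}
	}
	return err
}

func main() {
	calls := 0
	err := allocWithRetry(context.Background(), func() error {
		calls++
		if calls < 3 {
			return errors.New("rate limit exceeded")
		}
		return nil
	}, 10*time.Millisecond, 5)
	fmt.Println(calls, err) // 3 <nil>
}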
@@ -41,7 +41,7 @@ func TestSimpleRateLimiter(t *testing.T) {
         bak := Params.QuotaConfig.QuotaAndLimitsEnabled.GetValue()
         paramtable.Get().Save(Params.QuotaConfig.QuotaAndLimitsEnabled.Key, "true")
 
-        simpleLimiter := NewSimpleLimiter()
+        simpleLimiter := NewSimpleLimiter(0, 0)
         clusterRateLimiters := simpleLimiter.rateLimiter.GetRootLimiters()
 
         simpleLimiter.rateLimiter.GetOrCreateCollectionLimiters(0, collectionID, newDatabaseLimiter,
@@ -84,7 +84,7 @@ func TestSimpleRateLimiter(t *testing.T) {
     t.Run("test global static limit", func(t *testing.T) {
         bak := Params.QuotaConfig.QuotaAndLimitsEnabled.GetValue()
         paramtable.Get().Save(Params.QuotaConfig.QuotaAndLimitsEnabled.Key, "true")
-        simpleLimiter := NewSimpleLimiter()
+        simpleLimiter := NewSimpleLimiter(0, 0)
         clusterRateLimiters := simpleLimiter.rateLimiter.GetRootLimiters()
 
         collectionIDToPartIDs := map[int64][]int64{
@@ -148,7 +148,7 @@ func TestSimpleRateLimiter(t *testing.T) {
     })
 
     t.Run("not enable quotaAndLimit", func(t *testing.T) {
-        simpleLimiter := NewSimpleLimiter()
+        simpleLimiter := NewSimpleLimiter(0, 0)
         bak := Params.QuotaConfig.QuotaAndLimitsEnabled.GetValue()
         paramtable.Get().Save(Params.QuotaConfig.QuotaAndLimitsEnabled.Key, "false")
         for _, rt := range internalpb.RateType_value {
@@ -162,7 +162,7 @@ func TestSimpleRateLimiter(t *testing.T) {
     run := func(insertRate float64) {
         bakInsertRate := Params.QuotaConfig.DMLMaxInsertRate.GetValue()
         paramtable.Get().Save(Params.QuotaConfig.DMLMaxInsertRate.Key, fmt.Sprintf("%f", insertRate))
-        simpleLimiter := NewSimpleLimiter()
+        simpleLimiter := NewSimpleLimiter(0, 0)
         bak := Params.QuotaConfig.QuotaAndLimitsEnabled.GetValue()
         paramtable.Get().Save(Params.QuotaConfig.QuotaAndLimitsEnabled.Key, "true")
         err := simpleLimiter.Check(0, nil, internalpb.RateType_DMLInsert, 1*1024*1024)
@@ -178,7 +178,7 @@ func TestSimpleRateLimiter(t *testing.T) {
     })
 
     t.Run("test set rates", func(t *testing.T) {
-        simpleLimiter := NewSimpleLimiter()
+        simpleLimiter := NewSimpleLimiter(0, 0)
         zeroRates := getZeroCollectionRates()
 
         err := simpleLimiter.SetRates(newCollectionLimiterNode(map[int64]*proxypb.LimiterNode{
@@ -200,7 +200,7 @@ func TestSimpleRateLimiter(t *testing.T) {
     })
 
     t.Run("test quota states", func(t *testing.T) {
-        simpleLimiter := NewSimpleLimiter()
+        simpleLimiter := NewSimpleLimiter(0, 0)
         err := simpleLimiter.SetRates(newCollectionLimiterNode(map[int64]*proxypb.LimiterNode{
             1: {
                 // collection limiter
@@ -269,7 +269,7 @@ func newCollectionLimiterNode(collectionLimiterNodes map[int64]*proxypb.LimiterN
 
 func TestRateLimiter(t *testing.T) {
     t.Run("test limit", func(t *testing.T) {
-        simpleLimiter := NewSimpleLimiter()
+        simpleLimiter := NewSimpleLimiter(0, 0)
         rootLimiters := simpleLimiter.rateLimiter.GetRootLimiters()
         for _, rt := range internalpb.RateType_value {
             rootLimiters.GetLimiters().Insert(internalpb.RateType(rt), ratelimitutil.NewLimiter(ratelimitutil.Limit(1000), 1))
@@ -285,7 +285,7 @@ func TestRateLimiter(t *testing.T) {
     })
 
     t.Run("test setRates", func(t *testing.T) {
-        simpleLimiter := NewSimpleLimiter()
+        simpleLimiter := NewSimpleLimiter(0, 0)
 
         collectionRateLimiters := simpleLimiter.rateLimiter.GetOrCreateCollectionLimiters(0, int64(1), newDatabaseLimiter,
             func() *rlinternal.RateLimiterNode {
@@ -348,7 +348,7 @@ func TestRateLimiter(t *testing.T) {
     })
 
     t.Run("test get error code", func(t *testing.T) {
-        simpleLimiter := NewSimpleLimiter()
+        simpleLimiter := NewSimpleLimiter(0, 0)
 
         collectionRateLimiters := simpleLimiter.rateLimiter.GetOrCreateCollectionLimiters(0, int64(1), newDatabaseLimiter,
             func() *rlinternal.RateLimiterNode {
@@ -236,9 +236,11 @@ type deleteRunner struct {
 
     idAllocator allocator.Interface
     tsoAllocatorIns tsoAllocator
+    limiter types.Limiter
 
     // delete info
     schema *schemaInfo
+    dbID UniqueID
     collectionID UniqueID
     partitionID UniqueID
     partitionKeyMode bool
@@ -264,6 +266,12 @@ func (dr *deleteRunner) Init(ctx context.Context) error {
         return ErrWithLog(log, "Invalid collection name", err)
     }
 
+    db, err := globalMetaCache.GetDatabaseInfo(ctx, dr.req.GetDbName())
+    if err != nil {
+        return merr.WrapErrAsInputErrorWhen(err, merr.ErrDatabaseNotFound)
+    }
+    dr.dbID = db.dbID
+
     dr.collectionID, err = globalMetaCache.GetCollectionID(ctx, dr.req.GetDbName(), collName)
     if err != nil {
         return ErrWithLog(log, "Failed to get collection id", merr.WrapErrAsInputErrorWhen(err, merr.ErrCollectionNotFound))
@@ -435,7 +443,7 @@ func (dr *deleteRunner) getStreamingQueryAndDelteFunc(plan *planpb.PlanNode) exe
         taskCh := make(chan *deleteTask, 256)
         var receiveErr error
         go func() {
-            receiveErr = dr.receiveQueryResult(ctx, client, taskCh)
+            receiveErr = dr.receiveQueryResult(ctx, client, taskCh, partitionIDs)
             close(taskCh)
         }()
         var allQueryCnt int64
@@ -458,7 +466,7 @@ func (dr *deleteRunner) getStreamingQueryAndDelteFunc(plan *planpb.PlanNode) exe
     }
 }
 
-func (dr *deleteRunner) receiveQueryResult(ctx context.Context, client querypb.QueryNode_QueryStreamClient, taskCh chan *deleteTask) error {
+func (dr *deleteRunner) receiveQueryResult(ctx context.Context, client querypb.QueryNode_QueryStreamClient, taskCh chan *deleteTask, partitionIDs []int64) error {
     for {
         result, err := client.Recv()
         if err != nil {
@@ -475,6 +483,14 @@ func (dr *deleteRunner) receiveQueryResult(ctx context.Context, client querypb.Q
             return err
         }
 
+        if dr.limiter != nil {
+            err := dr.limiter.Alloc(ctx, dr.dbID, map[int64][]int64{dr.collectionID: partitionIDs}, internalpb.RateType_DMLDelete, proto.Size(result.GetIds()))
+            if err != nil {
+                log.Warn("query stream for delete failed because rate limiter", zap.Int64("msgID", dr.msgID), zap.Error(err))
+                return err
+            }
+        }
+
         task, err := dr.produce(ctx, result.GetIds())
         if err != nil {
             log.Warn("produce delete task failed", zap.Error(err))
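The net effect of this hunk is that every streamed batch of matched primary keys is charged against the DML-delete quota (sized from the serialized IDs) before a delete task is produced, and a failed allocation aborts the whole stream. A compact sketch of that per-batch gating, with hypothetical types standing in for the query-stream plumbing:

package main

import (
	"context"
	"fmt"
)

type allocator interface {
	Alloc(ctx context.Context, size int) error
}

type allowAll struct{}

func (allowAll) Alloc(ctx context.Context, size int) error { return nil }

// drainBatches charges each received batch against the limiter before
// forwarding it; the first failed allocation stops the whole stream.
func drainBatches(ctx context.Context, batches [][]int64, limiter allocator, forward func([]int64)) error {
	for _, ids := range batches {
		if limiter != nil {
			// 8*len(ids) is a stand-in for proto.Size(result.GetIds()) in the real code.
			if err := limiter.Alloc(ctx, 8*len(ids)); err != nil {
				return fmt.Errorf("delete stream stopped by rate limiter: %w", err)
			}
		}
		forward(ids)
	}
	return nil
}

func main() {
	total := 0
	_ = drainBatches(context.Background(), [][]int64{{1, 2, 3}, {4, 5}}, allowAll{}, func(ids []int64) {
		total += len(ids)
	})
	fmt.Println(total) // 5
}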
@@ -118,6 +118,7 @@ func TestDeleteTask_GetChannels(t *testing.T) {
     mock.AnythingOfType("string"),
     mock.AnythingOfType("string"),
 ).Return(collectionID, nil)
 
 globalMetaCache = cache
 chMgr := NewMockChannelsMgr(t)
 chMgr.EXPECT().getChannels(mock.Anything).Return(channels, nil)
@@ -265,6 +266,19 @@ func TestDeleteRunner_Init(t *testing.T) {
         assert.Error(t, dr.Init(context.Background()))
     })
 
+    t.Run("fail to get database info", func(t *testing.T) {
+        dr := deleteRunner{
+            req: &milvuspb.DeleteRequest{
+                CollectionName: collectionName,
+            },
+        }
+        cache := NewMockCache(t)
+        cache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(nil, fmt.Errorf("mock error"))
+        globalMetaCache = cache
+
+        assert.Error(t, dr.Init(context.Background()))
+    })
+
     t.Run("fail to get collection id", func(t *testing.T) {
         dr := deleteRunner{
             req: &milvuspb.DeleteRequest{
@@ -272,11 +286,13 @@
             },
         }
         cache := NewMockCache(t)
+        cache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 0}, nil)
         cache.On("GetCollectionID",
             mock.Anything, // context.Context
             mock.AnythingOfType("string"),
             mock.AnythingOfType("string"),
         ).Return(int64(0), errors.New("mock GetCollectionID err"))
 
         globalMetaCache = cache
         assert.Error(t, dr.Init(context.Background()))
     })
@@ -287,6 +303,7 @@
             DbName: dbName,
         }}
         cache := NewMockCache(t)
+        cache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 0}, nil)
         cache.On("GetCollectionID",
             mock.Anything, // context.Context
             mock.AnythingOfType("string"),
@@ -309,6 +326,7 @@
             PartitionName: partitionName,
         }}
         cache := NewMockCache(t)
+        cache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 0}, nil)
         cache.On("GetCollectionID",
             mock.Anything, // context.Context
             mock.AnythingOfType("string"),
@@ -347,6 +365,7 @@
             },
         }
         cache := NewMockCache(t)
+        cache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 0}, nil)
         cache.On("GetCollectionID",
             mock.Anything, // context.Context
             mock.AnythingOfType("string"),
@@ -372,6 +391,7 @@
             },
         }
         cache := NewMockCache(t)
+        cache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 0}, nil)
        cache.On("GetCollectionID",
             mock.Anything, // context.Context
             mock.AnythingOfType("string"),
@@ -405,6 +425,7 @@
             chMgr: chMgr,
         }
         cache := NewMockCache(t)
+        cache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 0}, nil)
         cache.On("GetCollectionID",
             mock.Anything, // context.Context
             mock.AnythingOfType("string"),
@@ -656,6 +677,65 @@ func TestDeleteRunner_Run(t *testing.T) {
         assert.Error(t, dr.Run(ctx))
     })
 
+    t.Run("complex delete rate limit check failed", func(t *testing.T) {
+        ctx, cancel := context.WithCancel(context.Background())
+        defer cancel()
+
+        mockMgr := NewMockChannelsMgr(t)
+        qn := mocks.NewMockQueryNodeClient(t)
+        lb := NewMockLBPolicy(t)
+
+        dr := deleteRunner{
+            chMgr: mockMgr,
+            queue: queue.dmQueue,
+            schema: schema,
+            collectionID: collectionID,
+            partitionID: partitionID,
+            vChannels: channels,
+            idAllocator: idAllocator,
+            tsoAllocatorIns: tsoAllocator,
+            lb: lb,
+            limiter: &limiterMock{},
+            result: &milvuspb.MutationResult{
+                Status: merr.Success(),
+                IDs: &schemapb.IDs{
+                    IdField: nil,
+                },
+            },
+            req: &milvuspb.DeleteRequest{
+                CollectionName: collectionName,
+                PartitionName: partitionName,
+                DbName: dbName,
+                Expr: "pk < 3",
+            },
+        }
+        lb.EXPECT().Execute(mock.Anything, mock.Anything).Call.Return(func(ctx context.Context, workload CollectionWorkLoad) error {
+            return workload.exec(ctx, 1, qn, "")
+        })
+
+        qn.EXPECT().QueryStream(mock.Anything, mock.Anything).Call.Return(
+            func(ctx context.Context, in *querypb.QueryRequest, opts ...grpc.CallOption) querypb.QueryNode_QueryStreamClient {
+                client := streamrpc.NewLocalQueryClient(ctx)
+                server := client.CreateServer()
+
+                server.Send(&internalpb.RetrieveResults{
+                    Status: merr.Success(),
+                    Ids: &schemapb.IDs{
+                        IdField: &schemapb.IDs_IntId{
+                            IntId: &schemapb.LongArray{
+                                Data: []int64{0, 1, 2},
+                            },
+                        },
+                    },
+                })
+                server.FinishSend(nil)
+                return client
+            }, nil)
+
+        assert.Error(t, dr.Run(ctx))
+        assert.Equal(t, int64(0), dr.result.DeleteCnt)
+    })
+
     t.Run("complex delete produce failed", func(t *testing.T) {
         ctx, cancel := context.WithCancel(context.Background())
         defer cancel()
@@ -38,6 +38,7 @@ import (
 // Otherwise, the request will pass. Limit also returns limit of limiter.
 type Limiter interface {
     Check(dbID int64, collectionIDToPartIDs map[int64][]int64, rt internalpb.RateType, n int) error
+    Alloc(ctx context.Context, dbID int64, collectionIDToPartIDs map[int64][]int64, rt internalpb.RateType, n int) error
 }
 
 // Component is the interface all services implement
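The interface now pairs the non-blocking Check with a context-aware Alloc. As an illustration only (signatures simplified, not the SimpleLimiter implementation), a toy limiter satisfying the same two-method shape could fail fast in Check and poll in Alloc until the budget frees up or the context ends:

package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// budgetLimiter is a toy stand-in for the Limiter interface above:
// Check fails fast when the remaining budget is too small, while Alloc
// polls Check until it passes or the context is cancelled.
type budgetLimiter struct {
	mu     sync.Mutex
	budget int
}

func (l *budgetLimiter) Check(n int) error {
	l.mu.Lock()
	defer l.mu.Unlock()
	if n > l.budget {
		return errors.New("rate limit exceeded")
	}
	l.budget -= n
	return nil
}

func (l *budgetLimiter) Alloc(ctx context.Context, n int) error {
	for {
		if err := l.Check(n); err == nil {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(5 * time.Millisecond):
		}
	}
}

func main() {
	l := &budgetLimiter{budget: 10}
	fmt.Println(l.Check(4))  // <nil>
	fmt.Println(l.Check(20)) // rate limit exceeded
	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
	defer cancel()
	fmt.Println(l.Alloc(ctx, 20)) // context deadline exceeded
}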
@@ -45,6 +45,9 @@ const (
 type quotaConfig struct {
     QuotaAndLimitsEnabled ParamItem `refreshable:"false"`
     QuotaCenterCollectInterval ParamItem `refreshable:"false"`
+    AllocRetryTimes ParamItem `refreshable:"false"`
+    AllocWaitInterval ParamItem `refreshable:"false"`
+    ComplexDeleteLimitEnable ParamItem `refreshable:"false"`
 
     // ddl
     DDLLimitEnabled ParamItem `refreshable:"true"`
@@ -2046,6 +2049,33 @@ MB/s, default no limit`,
         Export: true,
     }
     p.CoolOffSpeed.Init(base.mgr)
+
+    p.AllocRetryTimes = ParamItem{
+        Key: "quotaAndLimits.limits.allocRetryTimes",
+        Version: "2.4.0",
+        DefaultValue: "15",
+        Doc: `retry times when delete alloc forward data from rate limit failed`,
+        Export: true,
+    }
+    p.AllocRetryTimes.Init(base.mgr)
+
+    p.AllocWaitInterval = ParamItem{
+        Key: "quotaAndLimits.limits.allocWaitInterval",
+        Version: "2.4.0",
+        DefaultValue: "1000",
+        Doc: `retry wait duration when delete alloc forward data rate failed, in millisecond`,
+        Export: true,
+    }
+    p.AllocWaitInterval.Init(base.mgr)
+
+    p.ComplexDeleteLimitEnable = ParamItem{
+        Key: "quotaAndLimits.limits.complexDeleteLimitEnable",
+        Version: "2.4.0",
+        DefaultValue: "false",
+        Doc: `whether complex delete check forward data by limiter`,
+        Export: true,
+    }
+    p.ComplexDeleteLimitEnable.Init(base.mgr)
 }
 
 func megaBytes2Bytes(f float64) float64 {