Support 64M Rpc Limit (#20845)

Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>

Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>
This commit is contained in:
Xiaofan 2022-12-07 10:07:18 +08:00 committed by GitHub
parent f8cff79804
commit e131915207
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 139 additions and 49 deletions

View File

@ -166,6 +166,11 @@ proxy:
accessLog:
localPath: /tmp/accesslog
filename: milvus_access_log.log
grpc:
serverMaxRecvSize: 67108864 # 64M
serverMaxSendSize: 67108864 # 64M
clientMaxRecvSize: 104857600 # 100 MB, 100 * 1024 * 1024
clientMaxSendSize: 104857600 # 100 MB, 100 * 1024 * 1024
# Related configuration of queryCoord, used to manage topology and load balancing for the query nodes, and handoff from growing segments to sealed segments.
@ -321,8 +326,8 @@ grpc:
log:
level: WARNING
serverMaxRecvSize: 2147483647 # math.MaxInt32
serverMaxSendSize: 2147483647 # math.MaxInt32
serverMaxRecvSize: 536870912 # 512MB
serverMaxSendSize: 536870912 # 512MB
clientMaxRecvSize: 104857600 # 100 MB, 100 * 1024 * 1024
clientMaxSendSize: 104857600 # 100 MB, 100 * 1024 * 1024

View File

@ -71,7 +71,7 @@ PayloadWriter::finish() {
AssertInfo(output_ == nullptr, "payload writer has been finished");
std::shared_ptr<arrow::Array> array;
auto ast = builder_->Finish(&array);
AssertInfo(ast.ok(), "builder failed to finish");
AssertInfo(ast.ok(), ast.ToString());
auto table = arrow::Table::Make(schema_, {array});
output_ = std::make_shared<storage::PayloadOutputStream>();
@ -79,7 +79,7 @@ PayloadWriter::finish() {
ast = parquet::arrow::WriteTable(
*table, mem_pool, output_, 1024 * 1024 * 1024,
parquet::WriterProperties::Builder().compression(arrow::Compression::ZSTD)->compression_level(3)->build());
AssertInfo(ast.ok(), "write data to output stream failed");
AssertInfo(ast.ok(), ast.ToString());
}
bool

View File

@ -39,7 +39,7 @@ func TestIndexCoordinateServer(t *testing.T) {
server, err := NewServer(ctx, factory)
assert.NoError(t, err)
assert.NotNil(t, server)
Params.Init()
Params.InitOnce(typeutil.IndexCoordRole)
indexCoordClient := indexcoord.NewIndexCoordMock()
err = server.SetClient(indexCoordClient)

View File

@ -0,0 +1,75 @@
package proxy
import (
"context"
"fmt"
"os"
"strings"
"sync"
"testing"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
grpcproxyclient "github.com/milvus-io/milvus/internal/distributed/proxy/client"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/proxypb"
"github.com/milvus-io/milvus/internal/util/commonpbutil"
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/stretchr/testify/assert"
)
func TestProxyRpcLimit(t *testing.T) {
var err error
var wg sync.WaitGroup
path := "/tmp/milvus/rocksmq" + funcutil.GenRandomStr()
t.Setenv("ROCKSMQ_PATH", path)
defer os.RemoveAll(path)
ctx := GetContext(context.Background(), "root:123456")
localMsg := true
factory := dependency.NewDefaultFactory(localMsg)
var p paramtable.GrpcServerConfig
assert.NoError(t, err)
p.InitOnce(typeutil.ProxyRole)
p.Save("proxy.grpc.serverMaxRecvSize", "1")
p.InitServerMaxRecvSize()
assert.Equal(t, p.ServerMaxRecvSize, 1)
log.Info("Initialize parameter table of Proxy")
proxy, err := NewProxy(ctx, factory)
assert.NoError(t, err)
assert.NotNil(t, proxy)
testServer := newProxyTestServer(proxy)
testServer.Proxy.SetAddress(p.GetAddress())
wg.Add(1)
go testServer.startGrpc(ctx, &wg, &p)
assert.NoError(t, testServer.waitForGrpcReady())
defer testServer.grpcServer.Stop()
client, err := grpcproxyclient.NewClient(ctx, "localhost:"+fmt.Sprint(p.Port))
assert.NoError(t, err)
proxy.stateCode.Store(commonpb.StateCode_Healthy)
rates := make([]*internalpb.Rate, 0)
req := &proxypb.SetRatesRequest{
Base: commonpbutil.NewMsgBase(
commonpbutil.WithMsgID(int64(0)),
commonpbutil.WithTimeStamp(0),
),
Rates: rates,
}
_, err = client.SetRates(ctx, req)
// should be limited because of the rpc limit
assert.Error(t, err)
assert.True(t, strings.Contains(err.Error(), "ResourceExhausted"))
p.Remove("proxy.grpc.serverMaxRecvSize")
p.Init(typeutil.ProxyRole)
}

View File

@ -333,13 +333,9 @@ func (s *proxyTestServer) GetStatisticsChannel(ctx context.Context, request *int
return s.Proxy.GetStatisticsChannel(ctx)
}
func (s *proxyTestServer) startGrpc(ctx context.Context, wg *sync.WaitGroup) {
func (s *proxyTestServer) startGrpc(ctx context.Context, wg *sync.WaitGroup, p *paramtable.GrpcServerConfig) {
defer wg.Done()
var p paramtable.GrpcServerConfig
p.InitOnce(typeutil.ProxyRole)
s.Proxy.SetAddress(p.GetAddress())
var kaep = keepalive.EnforcementPolicy{
MinTime: 5 * time.Second, // If a client pings more than once every 5 seconds, terminate the connection
PermitWithoutStream: true, // Allow pings even when there are no active streams
@ -517,7 +513,12 @@ func TestProxy(t *testing.T) {
testServer := newProxyTestServer(proxy)
wg.Add(1)
go testServer.startGrpc(ctx, &wg)
var p paramtable.GrpcServerConfig
p.InitOnce(typeutil.ProxyRole)
testServer.Proxy.SetAddress(p.GetAddress())
go testServer.startGrpc(ctx, &wg, &p)
assert.NoError(t, testServer.waitForGrpcReady())
rootCoordClient, err := rcc.NewClient(ctx, Params.EtcdCfg.MetaRootPath.GetValue(), etcdcli)

View File

@ -13,7 +13,6 @@ package paramtable
import (
"fmt"
"math"
"strconv"
"sync"
"time"
@ -25,10 +24,10 @@ import (
const (
// DefaultServerMaxSendSize defines the maximum size of data per grpc request can send by server side.
DefaultServerMaxSendSize = math.MaxInt32
DefaultServerMaxSendSize = 512 * 1024 * 1024
// DefaultServerMaxRecvSize defines the maximum size of data per grpc request can receive by server side.
DefaultServerMaxRecvSize = math.MaxInt32
DefaultServerMaxRecvSize = 512 * 1024 * 1024
// DefaultClientMaxSendSize defines the maximum size of data per grpc request can send by client side.
DefaultClientMaxSendSize = 100 * 1024 * 1024
@ -122,60 +121,55 @@ type GrpcServerConfig struct {
// InitOnce initialize grpc server config once
func (p *GrpcServerConfig) InitOnce(domain string) {
p.once.Do(func() {
p.init(domain)
p.Init(domain)
})
}
func (p *GrpcServerConfig) init(domain string) {
func (p *GrpcServerConfig) Init(domain string) {
p.grpcConfig.init(domain)
p.initServerMaxSendSize()
p.initServerMaxRecvSize()
p.InitServerMaxSendSize()
p.InitServerMaxRecvSize()
}
func (p *GrpcServerConfig) initServerMaxSendSize() {
func (p *GrpcServerConfig) InitServerMaxSendSize() {
var err error
valueStr, err := p.Load("grpc.serverMaxSendSize")
if err != nil {
valueStr, err = p.Load(p.Domain + ".grpc.serverMaxSendSize")
}
valueStr, err := p.LoadWithPriority([]string{p.Domain + ".grpc.serverMaxSendSize", "grpc.serverMaxSendSize"})
if err != nil {
p.ServerMaxSendSize = DefaultServerMaxSendSize
}
value, err := strconv.Atoi(valueStr)
if err != nil {
log.Warn("Failed to parse grpc.serverMaxSendSize, set to default",
zap.String("role", p.Domain), zap.String("grpc.serverMaxSendSize", valueStr),
zap.Error(err))
p.ServerMaxSendSize = DefaultServerMaxSendSize
} else {
value, err := strconv.Atoi(valueStr)
if err != nil {
log.Warn("Failed to parse grpc.serverMaxSendSize, set to default",
zap.String("role", p.Domain), zap.String("grpc.serverMaxSendSize", valueStr),
zap.Error(err))
p.ServerMaxSendSize = DefaultServerMaxSendSize
} else {
p.ServerMaxSendSize = value
}
p.ServerMaxSendSize = value
}
log.Debug("initServerMaxSendSize",
zap.String("role", p.Domain), zap.Int("grpc.serverMaxSendSize", p.ServerMaxSendSize))
}
func (p *GrpcServerConfig) initServerMaxRecvSize() {
func (p *GrpcServerConfig) InitServerMaxRecvSize() {
var err error
valueStr, err := p.Load("grpc.serverMaxRecvSize")
if err != nil {
valueStr, err = p.Load(p.Domain + ".grpc.serverMaxRecvSize")
}
valueStr, err := p.LoadWithPriority([]string{p.Domain + ".grpc.serverMaxRecvSize", "grpc.serverMaxRecvSize"})
if err != nil {
p.ServerMaxRecvSize = DefaultServerMaxRecvSize
}
value, err := strconv.Atoi(valueStr)
if err != nil {
log.Warn("Failed to parse grpc.serverMaxRecvSize, set to default",
zap.String("role", p.Domain), zap.String("grpc.serverMaxRecvSize", valueStr),
zap.Error(err))
p.ServerMaxRecvSize = DefaultServerMaxRecvSize
} else {
value, err := strconv.Atoi(valueStr)
if err != nil {
log.Warn("Failed to parse grpc.serverMaxRecvSize, set to default",
zap.String("role", p.Domain), zap.String("grpc.serverMaxRecvSize", valueStr),
zap.Error(err))
p.ServerMaxRecvSize = DefaultServerMaxRecvSize
} else {
p.ServerMaxRecvSize = value
}
p.ServerMaxRecvSize = value
}
log.Debug("initServerMaxRecvSize",

View File

@ -39,17 +39,32 @@ func TestGrpcServerParams(t *testing.T) {
t.Logf("ServerMaxRecvSize = %d", Params.ServerMaxRecvSize)
Params.Remove(role + ".grpc.serverMaxRecvSize")
Params.initServerMaxRecvSize()
Params.InitServerMaxRecvSize()
assert.Equal(t, Params.ServerMaxRecvSize, DefaultServerMaxRecvSize)
Params.Remove("grpc.serverMaxRecvSize")
Params.InitServerMaxRecvSize()
assert.Equal(t, Params.ServerMaxRecvSize, DefaultServerMaxRecvSize)
Params.Save("grpc.serverMaxRecvSize", "a")
Params.InitServerMaxRecvSize()
assert.Equal(t, Params.ServerMaxSendSize, DefaultServerMaxRecvSize)
assert.NotZero(t, Params.ServerMaxSendSize)
t.Logf("ServerMaxSendSize = %d", Params.ServerMaxSendSize)
Params.Remove(role + ".grpc.serverMaxSendSize")
Params.initServerMaxSendSize()
Params.InitServerMaxSendSize()
assert.Equal(t, Params.ServerMaxSendSize, DefaultServerMaxSendSize)
Params.Remove("grpc.serverMaxSendSize")
Params.InitServerMaxRecvSize()
assert.Equal(t, Params.ServerMaxSendSize, DefaultServerMaxSendSize)
Params.Save("grpc.serverMaxSendSize", "a")
Params.InitServerMaxRecvSize()
assert.Equal(t, Params.ServerMaxSendSize, DefaultServerMaxSendSize)
}
func TestGrpcClientParams(t *testing.T) {
role := typeutil.DataNodeRole
var Params GrpcClientConfig