From bf2c42760979972716c2ed0017c8a10c859d212b Mon Sep 17 00:00:00 2001 From: godchen Date: Fri, 20 Aug 2021 17:52:11 +0800 Subject: [PATCH] Fix connection timeout (#7203) Signed-off-by: godchen --- internal/distributed/datacoord/client/client.go | 5 ++++- internal/distributed/datanode/client/client.go | 5 ++++- internal/distributed/indexcoord/client/client.go | 5 ++++- internal/distributed/indexnode/client/client.go | 5 ++++- internal/distributed/proxy/client/client.go | 5 ++++- internal/distributed/querycoord/client/client.go | 5 ++++- internal/distributed/querynode/client/client.go | 5 ++++- internal/distributed/rootcoord/client/client.go | 5 ++++- 8 files changed, 32 insertions(+), 8 deletions(-) diff --git a/internal/distributed/datacoord/client/client.go b/internal/distributed/datacoord/client/client.go index 8d1c0e74a0..df19d26f67 100644 --- a/internal/distributed/datacoord/client/client.go +++ b/internal/distributed/datacoord/client/client.go @@ -15,6 +15,7 @@ import ( "context" "errors" "fmt" + "time" grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" @@ -90,7 +91,9 @@ func (c *Client) connect(retryOptions ...retry.Option) error { } opts := trace.GetInterceptorOpts() log.Debug("DataCoordClient try reconnect ", zap.String("address", c.addr)) - conn, err := grpc.DialContext(c.ctx, c.addr, + ctx, cancel := context.WithTimeout(c.ctx, 15*time.Second) + defer cancel() + conn, err := grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithDefaultCallOptions( grpc.MaxCallRecvMsgSize(Params.ClientMaxRecvSize), diff --git a/internal/distributed/datanode/client/client.go b/internal/distributed/datanode/client/client.go index 4e56fd7b78..b6d38e5414 100644 --- a/internal/distributed/datanode/client/client.go +++ b/internal/distributed/datanode/client/client.go @@ -15,6 +15,7 @@ import ( "context" "errors" "fmt" + "time" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/util/retry" @@ -68,7 +69,9 @@ func (c *Client) connect(retryOptions ...retry.Option) error { connectGrpcFunc := func() error { opts := trace.GetInterceptorOpts() log.Debug("DataNode connect ", zap.String("address", c.addr)) - conn, err := grpc.DialContext(c.ctx, c.addr, + ctx, cancel := context.WithTimeout(c.ctx, 15*time.Second) + defer cancel() + conn, err := grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithDefaultCallOptions( grpc.MaxCallRecvMsgSize(Params.ClientMaxRecvSize), diff --git a/internal/distributed/indexcoord/client/client.go b/internal/distributed/indexcoord/client/client.go index 5a7b23bf34..fc7dc6bc9f 100644 --- a/internal/distributed/indexcoord/client/client.go +++ b/internal/distributed/indexcoord/client/client.go @@ -15,6 +15,7 @@ import ( "context" "errors" "fmt" + "time" "google.golang.org/grpc" @@ -91,7 +92,9 @@ func (c *Client) connect(retryOptions ...retry.Option) error { } opts := trace.GetInterceptorOpts() log.Debug("IndexCoordClient try connect ", zap.String("address", c.addr)) - conn, err := grpc.DialContext(c.ctx, c.addr, + ctx, cancel := context.WithTimeout(c.ctx, 15*time.Second) + defer cancel() + conn, err := grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithDefaultCallOptions( grpc.MaxCallRecvMsgSize(Params.ClientMaxRecvSize), diff --git a/internal/distributed/indexnode/client/client.go b/internal/distributed/indexnode/client/client.go index 5f63f0926e..8695c90d50 100644 --- a/internal/distributed/indexnode/client/client.go +++ b/internal/distributed/indexnode/client/client.go @@ -15,6 +15,7 @@ import ( "context" "errors" "fmt" + "time" grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" @@ -64,7 +65,9 @@ func (c *Client) connect(retryOptions ...retry.Option) error { connectGrpcFunc := func() error { opts := trace.GetInterceptorOpts() log.Debug("IndexNodeClient try connect ", zap.String("address", c.addr)) - conn, err := grpc.DialContext(c.ctx, c.addr, + ctx, cancel := context.WithTimeout(c.ctx, 15*time.Second) + defer cancel() + conn, err := grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithDefaultCallOptions( grpc.MaxCallRecvMsgSize(Params.ClientMaxRecvSize), diff --git a/internal/distributed/proxy/client/client.go b/internal/distributed/proxy/client/client.go index 6e70fe5045..2b03e26879 100644 --- a/internal/distributed/proxy/client/client.go +++ b/internal/distributed/proxy/client/client.go @@ -15,6 +15,7 @@ import ( "context" "errors" "fmt" + "time" grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" @@ -63,7 +64,9 @@ func (c *Client) connect(retryOptions ...retry.Option) error { connectGrpcFunc := func() error { opts := trace.GetInterceptorOpts() log.Debug("ProxyClient try connect ", zap.String("address", c.addr)) - conn, err := grpc.DialContext(c.ctx, c.addr, + ctx, cancel := context.WithTimeout(c.ctx, 15*time.Second) + defer cancel() + conn, err := grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithDefaultCallOptions( grpc.MaxCallRecvMsgSize(Params.ClientMaxRecvSize), diff --git a/internal/distributed/querycoord/client/client.go b/internal/distributed/querycoord/client/client.go index 3e90c9e9f2..46d6dad814 100644 --- a/internal/distributed/querycoord/client/client.go +++ b/internal/distributed/querycoord/client/client.go @@ -15,6 +15,7 @@ import ( "context" "errors" "fmt" + "time" grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" @@ -91,7 +92,9 @@ func (c *Client) connect(retryOptions ...retry.Option) error { } opts := trace.GetInterceptorOpts() log.Debug("QueryCoordClient try reconnect ", zap.String("address", c.addr)) - conn, err := grpc.DialContext(c.ctx, c.addr, + ctx, cancel := context.WithTimeout(c.ctx, 15*time.Second) + defer cancel() + conn, err := grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithDefaultCallOptions( grpc.MaxCallRecvMsgSize(Params.ClientMaxRecvSize), diff --git a/internal/distributed/querynode/client/client.go b/internal/distributed/querynode/client/client.go index 6f0eb8d2a1..19a8d58258 100644 --- a/internal/distributed/querynode/client/client.go +++ b/internal/distributed/querynode/client/client.go @@ -15,6 +15,7 @@ import ( "context" "errors" "fmt" + "time" "google.golang.org/grpc" @@ -64,7 +65,9 @@ func (c *Client) connect(retryOptions ...retry.Option) error { connectGrpcFunc := func() error { opts := trace.GetInterceptorOpts() log.Debug("QueryNodeClient try connect ", zap.String("address", c.addr)) - conn, err := grpc.DialContext(c.ctx, c.addr, + ctx, cancel := context.WithTimeout(c.ctx, 15*time.Second) + defer cancel() + conn, err := grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithDefaultCallOptions( grpc.MaxCallRecvMsgSize(Params.ClientMaxRecvSize), diff --git a/internal/distributed/rootcoord/client/client.go b/internal/distributed/rootcoord/client/client.go index 532bfabb46..52b3f54adb 100644 --- a/internal/distributed/rootcoord/client/client.go +++ b/internal/distributed/rootcoord/client/client.go @@ -15,6 +15,7 @@ import ( "context" "errors" "fmt" + "time" grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" @@ -99,7 +100,9 @@ func (c *GrpcClient) connect(retryOptions ...retry.Option) error { } opts := trace.GetInterceptorOpts() log.Debug("RootCoordClient try reconnect ", zap.String("address", c.addr)) - conn, err := grpc.DialContext(c.ctx, c.addr, + ctx, cancel := context.WithTimeout(c.ctx, 15*time.Second) + defer cancel() + conn, err := grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithDefaultCallOptions( grpc.MaxCallRecvMsgSize(Params.ClientMaxRecvSize),