mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-04 21:09:06 +08:00
change_coord_connect_retry (#6064)
Signed-off-by: godchen <qingxiang.chen@zilliz.com>
This commit is contained in:
parent
2a33f66ef6
commit
064f8f76cd
@ -83,23 +83,16 @@ func (c *Client) Init() error {
|
|||||||
|
|
||||||
func (c *Client) connect() error {
|
func (c *Client) connect() error {
|
||||||
var err error
|
var err error
|
||||||
getDataCoordAddressFn := func() error {
|
connectDataCoordFn := func() error {
|
||||||
c.addr, err = getDataCoordAddress(c.sess)
|
c.addr, err = getDataCoordAddress(c.sess)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
log.Debug("DataCoordClient getDataCoordAddr failed", zap.Error(err))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return nil
|
|
||||||
}
|
|
||||||
err = retry.Do(c.ctx, getDataCoordAddressFn, c.retryOptions...)
|
|
||||||
if err != nil {
|
|
||||||
log.Debug("DataCoordClient try reconnect getDataCoordAddressFn failed", zap.Error(err))
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
connectGrpcFunc := func() error {
|
|
||||||
opts := trace.GetInterceptorOpts()
|
opts := trace.GetInterceptorOpts()
|
||||||
log.Debug("DataCoordClient try reconnect ", zap.String("address", c.addr))
|
log.Debug("DataCoordClient try reconnect ", zap.String("address", c.addr))
|
||||||
conn, err := grpc.DialContext(c.ctx, c.addr,
|
conn, err := grpc.DialContext(c.ctx, c.addr,
|
||||||
grpc.WithInsecure(), grpc.WithBlock(), grpc.WithTimeout(5*time.Second),
|
grpc.WithInsecure(), grpc.WithBlock(), grpc.WithTimeout(3*time.Second),
|
||||||
grpc.WithUnaryInterceptor(
|
grpc.WithUnaryInterceptor(
|
||||||
grpc_middleware.ChainUnaryClient(
|
grpc_middleware.ChainUnaryClient(
|
||||||
grpc_retry.UnaryClientInterceptor(),
|
grpc_retry.UnaryClientInterceptor(),
|
||||||
@ -118,7 +111,7 @@ func (c *Client) connect() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
err = retry.Do(c.ctx, connectGrpcFunc, c.retryOptions...)
|
err = retry.Do(c.ctx, connectDataCoordFn, c.retryOptions...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Debug("DataCoord try reconnect failed", zap.Error(err))
|
log.Debug("DataCoord try reconnect failed", zap.Error(err))
|
||||||
return err
|
return err
|
||||||
|
@ -85,24 +85,16 @@ func (c *Client) Init() error {
|
|||||||
|
|
||||||
func (c *Client) connect() error {
|
func (c *Client) connect() error {
|
||||||
var err error
|
var err error
|
||||||
getIndexCoordaddrFn := func() error {
|
connectIndexCoordaddrFn := func() error {
|
||||||
c.addr, err = getIndexCoordAddr(c.sess)
|
c.addr, err = getIndexCoordAddr(c.sess)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
log.Debug("IndexCoordClient getIndexCoordAddress failed")
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return nil
|
|
||||||
}
|
|
||||||
err = retry.Do(c.ctx, getIndexCoordaddrFn, c.retryOptions...)
|
|
||||||
if err != nil {
|
|
||||||
log.Debug("IndexCoordClient getIndexCoordAddress failed", zap.Error(err))
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
log.Debug("IndexCoordClient getIndexCoordAddress success")
|
|
||||||
connectGrpcFunc := func() error {
|
|
||||||
opts := trace.GetInterceptorOpts()
|
opts := trace.GetInterceptorOpts()
|
||||||
log.Debug("IndexCoordClient try connect ", zap.String("address", c.addr))
|
log.Debug("IndexCoordClient try connect ", zap.String("address", c.addr))
|
||||||
conn, err := grpc.DialContext(c.ctx, c.addr,
|
conn, err := grpc.DialContext(c.ctx, c.addr,
|
||||||
grpc.WithInsecure(), grpc.WithBlock(), grpc.WithTimeout(5*time.Second),
|
grpc.WithInsecure(), grpc.WithBlock(), grpc.WithTimeout(3*time.Second),
|
||||||
grpc.WithUnaryInterceptor(
|
grpc.WithUnaryInterceptor(
|
||||||
grpc_middleware.ChainUnaryClient(
|
grpc_middleware.ChainUnaryClient(
|
||||||
grpc_retry.UnaryClientInterceptor(),
|
grpc_retry.UnaryClientInterceptor(),
|
||||||
@ -121,7 +113,7 @@ func (c *Client) connect() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
err = retry.Do(c.ctx, connectGrpcFunc, c.retryOptions...)
|
err = retry.Do(c.ctx, connectIndexCoordaddrFn, c.retryOptions...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Debug("IndexCoordClient try connect failed", zap.Error(err))
|
log.Debug("IndexCoordClient try connect failed", zap.Error(err))
|
||||||
return err
|
return err
|
||||||
|
@ -84,19 +84,12 @@ func (c *Client) Init() error {
|
|||||||
|
|
||||||
func (c *Client) connect() error {
|
func (c *Client) connect() error {
|
||||||
var err error
|
var err error
|
||||||
getQueryCoordAddressFn := func() error {
|
connectQueryCoordAddressFn := func() error {
|
||||||
c.addr, err = getQueryCoordAddress(c.sess)
|
c.addr, err = getQueryCoordAddress(c.sess)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
log.Debug("QueryCoordClient getQueryCoordAddress failed", zap.Error(err))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return nil
|
|
||||||
}
|
|
||||||
err = retry.Do(c.ctx, getQueryCoordAddressFn, c.retryOptions...)
|
|
||||||
if err != nil {
|
|
||||||
log.Debug("QueryCoordClient getQueryCoordAddress failed", zap.Error(err))
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
connectGrpcFunc := func() error {
|
|
||||||
opts := trace.GetInterceptorOpts()
|
opts := trace.GetInterceptorOpts()
|
||||||
log.Debug("QueryCoordClient try reconnect ", zap.String("address", c.addr))
|
log.Debug("QueryCoordClient try reconnect ", zap.String("address", c.addr))
|
||||||
conn, err := grpc.DialContext(c.ctx, c.addr,
|
conn, err := grpc.DialContext(c.ctx, c.addr,
|
||||||
@ -119,7 +112,7 @@ func (c *Client) connect() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
err = retry.Do(c.ctx, connectGrpcFunc, c.retryOptions...)
|
err = retry.Do(c.ctx, connectQueryCoordAddressFn, c.retryOptions...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Debug("QueryCoordClient try reconnect failed", zap.Error(err))
|
log.Debug("QueryCoordClient try reconnect failed", zap.Error(err))
|
||||||
return err
|
return err
|
||||||
|
@ -91,19 +91,12 @@ func (c *GrpcClient) Init() error {
|
|||||||
|
|
||||||
func (c *GrpcClient) connect() error {
|
func (c *GrpcClient) connect() error {
|
||||||
var err error
|
var err error
|
||||||
getRootCoordAddrFn := func() error {
|
connectRootCoordAddrFn := func() error {
|
||||||
c.addr, err = getRootCoordAddr(c.sess)
|
c.addr, err = getRootCoordAddr(c.sess)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
log.Debug("RootCoordClient getRootCoordAddr failed", zap.Error(err))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return nil
|
|
||||||
}
|
|
||||||
err = retry.Do(c.ctx, getRootCoordAddrFn, c.retryOptions...)
|
|
||||||
if err != nil {
|
|
||||||
log.Debug("RootCoordClient getRootCoordAddr failed", zap.Error(err))
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
connectGrpcFunc := func() error {
|
|
||||||
opts := trace.GetInterceptorOpts()
|
opts := trace.GetInterceptorOpts()
|
||||||
log.Debug("RootCoordClient try reconnect ", zap.String("address", c.addr))
|
log.Debug("RootCoordClient try reconnect ", zap.String("address", c.addr))
|
||||||
conn, err := grpc.DialContext(c.ctx, c.addr,
|
conn, err := grpc.DialContext(c.ctx, c.addr,
|
||||||
@ -126,7 +119,7 @@ func (c *GrpcClient) connect() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
err = retry.Do(c.ctx, connectGrpcFunc, c.retryOptions...)
|
err = retry.Do(c.ctx, connectRootCoordAddrFn, c.retryOptions...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Debug("RootCoordClient try reconnect failed", zap.Error(err))
|
log.Debug("RootCoordClient try reconnect failed", zap.Error(err))
|
||||||
return err
|
return err
|
||||||
|
Loading…
Reference in New Issue
Block a user