mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-02 03:48:37 +08:00
Check last heartbeat response time while fetching shard leaders (#20968)
Signed-off-by: yah01 <yang.cen@zilliz.com> Signed-off-by: yah01 <yang.cen@zilliz.com>
This commit is contained in:
parent
47b76e13be
commit
aec347e591
@ -181,6 +181,7 @@ queryCoord:
|
||||
channelTaskTimeout: 60000 # 1 minute
|
||||
segmentTaskTimeout: 120000 # 2 minute
|
||||
distPullInterval: 500
|
||||
heartbeatAvailableInterval: 2500 # Only QueryNodes which fetched heartbeats within the duration are available
|
||||
loadTimeoutSeconds: 600
|
||||
checkHandoffInterval: 5000
|
||||
taskMergeCap: 16
|
||||
|
8
internal/querycoordv2/dist/dist_handler.go
vendored
8
internal/querycoordv2/dist/dist_handler.go
vendored
@ -81,6 +81,13 @@ func (dh *distHandler) start(ctx context.Context) {
|
||||
if err != nil || resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
|
||||
failures++
|
||||
dh.logFailureInfo(resp, err)
|
||||
node := dh.nodeManager.Get(dh.nodeID)
|
||||
if node != nil {
|
||||
log.RatedDebug(30.0, "failed to get node's data distribution",
|
||||
zap.Int64("nodeID", dh.nodeID),
|
||||
zap.Time("lastHeartbeat", node.LastHeartbeat()),
|
||||
)
|
||||
}
|
||||
} else {
|
||||
failures = 0
|
||||
dh.handleDistResp(resp)
|
||||
@ -115,6 +122,7 @@ func (dh *distHandler) handleDistResp(resp *querypb.GetDataDistributionResponse)
|
||||
session.WithSegmentCnt(len(resp.GetSegments())),
|
||||
session.WithChannelCnt(len(resp.GetChannels())),
|
||||
)
|
||||
node.SetLastHeartbeat(time.Now())
|
||||
}
|
||||
|
||||
dh.updateSegmentsDistribution(resp)
|
||||
|
@ -21,6 +21,7 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/commonpb"
|
||||
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
|
||||
@ -657,12 +658,13 @@ func (s *Server) GetShardLeaders(ctx context.Context, req *querypb.GetShardLeade
|
||||
addrs := make([]string, 0, len(leaders))
|
||||
for _, leader := range leaders {
|
||||
info := s.nodeMgr.Get(leader.ID)
|
||||
if info == nil {
|
||||
if info == nil || time.Since(info.LastHeartbeat()) > Params.QueryCoordCfg.HeartbeatAvailableInterval {
|
||||
continue
|
||||
}
|
||||
isAllNodeAvailable := true
|
||||
for _, version := range leader.Segments {
|
||||
if s.nodeMgr.Get(version.NodeID) == nil {
|
||||
info := s.nodeMgr.Get(version.NodeID)
|
||||
if info == nil || time.Since(info.LastHeartbeat()) > Params.QueryCoordCfg.HeartbeatAvailableInterval {
|
||||
isAllNodeAvailable = false
|
||||
break
|
||||
}
|
||||
|
@ -20,6 +20,7 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/commonpb"
|
||||
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
|
||||
@ -891,6 +892,8 @@ func (suite *ServiceSuite) TestGetShardLeaders() {
|
||||
req := &querypb.GetShardLeadersRequest{
|
||||
CollectionID: collection,
|
||||
}
|
||||
|
||||
suite.fetchHeartbeats()
|
||||
resp, err := server.GetShardLeaders(ctx, req)
|
||||
suite.NoError(err)
|
||||
suite.Equal(commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||
@ -1109,6 +1112,13 @@ func (suite *ServiceSuite) updateCollectionStatus(collectionID int64, status que
|
||||
}
|
||||
}
|
||||
|
||||
func (suite *ServiceSuite) fetchHeartbeats() {
|
||||
for _, node := range suite.nodes {
|
||||
node := suite.nodeMgr.Get(node)
|
||||
node.SetLastHeartbeat(time.Now())
|
||||
}
|
||||
}
|
||||
|
||||
func TestService(t *testing.T) {
|
||||
suite.Run(t, new(ServiceSuite))
|
||||
}
|
||||
|
@ -18,8 +18,10 @@ package session
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/metrics"
|
||||
"go.uber.org/atomic"
|
||||
)
|
||||
|
||||
type Manager interface {
|
||||
@ -72,9 +74,10 @@ func NewNodeManager() *NodeManager {
|
||||
|
||||
type NodeInfo struct {
|
||||
stats
|
||||
mu sync.RWMutex
|
||||
id int64
|
||||
addr string
|
||||
mu sync.RWMutex
|
||||
id int64
|
||||
addr string
|
||||
lastHeartbeat *atomic.Int64
|
||||
}
|
||||
|
||||
func (n *NodeInfo) ID() int64 {
|
||||
@ -97,6 +100,14 @@ func (n *NodeInfo) ChannelCnt() int {
|
||||
return n.stats.getChannelCnt()
|
||||
}
|
||||
|
||||
func (n *NodeInfo) SetLastHeartbeat(time time.Time) {
|
||||
n.lastHeartbeat.Store(time.UnixNano())
|
||||
}
|
||||
|
||||
func (n *NodeInfo) LastHeartbeat() time.Time {
|
||||
return time.Unix(0, n.lastHeartbeat.Load())
|
||||
}
|
||||
|
||||
func (n *NodeInfo) UpdateStats(opts ...StatsOption) {
|
||||
n.mu.Lock()
|
||||
for _, opt := range opts {
|
||||
@ -107,9 +118,10 @@ func (n *NodeInfo) UpdateStats(opts ...StatsOption) {
|
||||
|
||||
func NewNodeInfo(id int64, addr string) *NodeInfo {
|
||||
return &NodeInfo{
|
||||
stats: newStats(),
|
||||
id: id,
|
||||
addr: addr,
|
||||
stats: newStats(),
|
||||
id: id,
|
||||
addr: addr,
|
||||
lastHeartbeat: atomic.NewInt64(0),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -723,6 +723,7 @@ type queryCoordConfig struct {
|
||||
ChannelTaskTimeout time.Duration
|
||||
SegmentTaskTimeout time.Duration
|
||||
DistPullInterval time.Duration
|
||||
HeartbeatAvailableInterval time.Duration
|
||||
LoadTimeoutSeconds time.Duration
|
||||
CheckHandoffInterval time.Duration
|
||||
EnableActiveStandby bool
|
||||
@ -751,6 +752,7 @@ func (p *queryCoordConfig) init(base *BaseTable) {
|
||||
p.initChannelTaskTimeout()
|
||||
p.initSegmentTaskTimeout()
|
||||
p.initDistPullInterval()
|
||||
p.initHeartbeatAvailableInterval()
|
||||
p.initLoadTimeoutSeconds()
|
||||
p.initEnableActiveStandby()
|
||||
p.initNextTargetSurviveTime()
|
||||
@ -860,6 +862,11 @@ func (p *queryCoordConfig) initDistPullInterval() {
|
||||
p.DistPullInterval = time.Duration(pullInterval) * time.Millisecond
|
||||
}
|
||||
|
||||
func (p *queryCoordConfig) initHeartbeatAvailableInterval() {
|
||||
interval := p.Base.ParseInt32WithDefault("queryCoord.heartbeatAvailableInterval", 2500)
|
||||
p.HeartbeatAvailableInterval = time.Duration(interval) * time.Millisecond
|
||||
}
|
||||
|
||||
func (p *queryCoordConfig) initLoadTimeoutSeconds() {
|
||||
timeout := p.Base.LoadWithDefault("queryCoord.loadTimeoutSeconds", "600")
|
||||
loadTimeout, err := strconv.ParseInt(timeout, 10, 64)
|
||||
|
Loading…
Reference in New Issue
Block a user