enhance: improve check health (#34265)

issue: https://github.com/milvus-io/milvus/issues/34264
pr: #33800

Signed-off-by: jaime <yun.zhang@zilliz.com>
Author: jaime, 2024-07-01 10:18:07 +08:00 (committed by GitHub)
parent fc6bd387b8
commit 0992f10694
17 changed files with 555 additions and 140 deletions
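In short, this change extends CheckHealth on each coordinator beyond node liveness: DataCoord now also verifies that every collection's vchannels are watched and that channel checkpoints are not lagging past a configurable threshold, QueryCoord verifies that loaded collections are actually queryable, and RootCoord verifies that the query/data time-tick lag stays under the configured maximum. A condensed sketch of the resulting DataCoord flow, mirroring the services diff below (the server-state guard that precedes these checks is omitted here):

func (s *Server) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) {
	if err := s.sessionManager.CheckHealth(ctx); err != nil {
		return componentutil.CheckHealthRespWithErr(err), nil
	}
	if err := CheckAllChannelsWatched(s.meta, s.channelManager); err != nil {
		return componentutil.CheckHealthRespWithErr(err), nil
	}
	if err := CheckCheckPointsHealth(s.meta); err != nil {
		return componentutil.CheckHealthRespWithErr(err), nil
	}
	return componentutil.CheckHealthRespWithErr(nil), nil
}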

View File

@@ -1474,6 +1474,17 @@ func (m *meta) DropChannelCheckpoint(vChannel string) error {
return nil
}
func (m *meta) GetChannelCheckpoints() map[string]*msgpb.MsgPosition {
m.channelCPs.RLock()
defer m.channelCPs.RUnlock()
checkpoints := make(map[string]*msgpb.MsgPosition, len(m.channelCPs.checkpoints))
for ch, cp := range m.channelCPs.checkpoints {
checkpoints[ch] = proto.Clone(cp).(*msgpb.MsgPosition)
}
return checkpoints
}
func (m *meta) GcConfirm(ctx context.Context, collectionID, partitionID UniqueID) bool {
return m.catalog.GcConfirm(ctx, collectionID, partitionID)
}

View File

@@ -3114,6 +3114,51 @@ func closeTestServer(t *testing.T, svr *Server) {
}
func Test_CheckHealth(t *testing.T) {
getSessionManager := func(isHealthy bool) *SessionManagerImpl {
var client *mockDataNodeClient
if isHealthy {
client = &mockDataNodeClient{
id: 1,
state: commonpb.StateCode_Healthy,
}
} else {
client = &mockDataNodeClient{
id: 1,
state: commonpb.StateCode_Abnormal,
}
}
sm := NewSessionManagerImpl()
sm.sessions = struct {
sync.RWMutex
data map[int64]*Session
}{data: map[int64]*Session{1: {
client: client,
clientCreator: func(ctx context.Context, addr string, nodeID int64) (types.DataNodeClient, error) {
return client, nil
},
}}}
return sm
}
getChannelManager := func(t *testing.T, findWatcherOk bool) ChannelManager {
channelManager := NewMockChannelManager(t)
if findWatcherOk {
channelManager.EXPECT().FindWatcher(mock.Anything).Return(0, nil)
} else {
channelManager.EXPECT().FindWatcher(mock.Anything).Return(0, errors.New("error"))
}
return channelManager
}
collections := map[UniqueID]*collectionInfo{
1: {
ID: 1,
VChannelNames: []string{"ch1", "ch2"},
},
2: nil,
}
t.Run("not healthy", func(t *testing.T) {
ctx := context.Background()
s := &Server{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}}
@@ -3124,56 +3169,76 @@ func Test_CheckHealth(t *testing.T) {
assert.NotEmpty(t, resp.Reasons)
})
t.Run("data node health check is ok", func(t *testing.T) {
svr := &Server{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}}
svr.stateCode.Store(commonpb.StateCode_Healthy)
healthClient := &mockDataNodeClient{
id: 1,
state: commonpb.StateCode_Healthy,
}
sm := NewSessionManagerImpl()
sm.sessions = struct {
sync.RWMutex
data map[int64]*Session
}{data: map[int64]*Session{1: {
client: healthClient,
clientCreator: func(ctx context.Context, addr string, nodeID int64) (types.DataNodeClient, error) {
return healthClient, nil
},
}}}
svr.sessionManager = sm
ctx := context.Background()
resp, err := svr.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
assert.NoError(t, err)
assert.Equal(t, true, resp.IsHealthy)
assert.Empty(t, resp.Reasons)
})
t.Run("data node health check is fail", func(t *testing.T) {
svr := &Server{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}}
svr.stateCode.Store(commonpb.StateCode_Healthy)
unhealthClient := &mockDataNodeClient{
id: 1,
state: commonpb.StateCode_Abnormal,
}
sm := NewSessionManagerImpl()
sm.sessions = struct {
sync.RWMutex
data map[int64]*Session
}{data: map[int64]*Session{1: {
client: unhealthClient,
clientCreator: func(ctx context.Context, addr string, nodeID int64) (types.DataNodeClient, error) {
return unhealthClient, nil
},
}}}
svr.sessionManager = sm
svr.sessionManager = getSessionManager(false)
ctx := context.Background()
resp, err := svr.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
assert.NoError(t, err)
assert.Equal(t, false, resp.IsHealthy)
assert.NotEmpty(t, resp.Reasons)
})
t.Run("check channel watched fail", func(t *testing.T) {
svr := &Server{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}}
svr.stateCode.Store(commonpb.StateCode_Healthy)
svr.sessionManager = getSessionManager(true)
svr.channelManager = getChannelManager(t, false)
svr.meta = &meta{collections: collections}
ctx := context.Background()
resp, err := svr.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
assert.NoError(t, err)
assert.Equal(t, false, resp.IsHealthy)
assert.NotEmpty(t, resp.Reasons)
})
t.Run("check checkpoint fail", func(t *testing.T) {
svr := &Server{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}}
svr.stateCode.Store(commonpb.StateCode_Healthy)
svr.sessionManager = getSessionManager(true)
svr.channelManager = getChannelManager(t, true)
svr.meta = &meta{
collections: collections,
channelCPs: &channelCPs{
checkpoints: map[string]*msgpb.MsgPosition{
"ch1": {
Timestamp: tsoutil.ComposeTSByTime(time.Now().Add(-1000*time.Hour), 0),
MsgID: []byte{1, 2, 3, 4},
},
},
},
}
ctx := context.Background()
resp, err := svr.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
assert.NoError(t, err)
assert.Equal(t, false, resp.IsHealthy)
assert.NotEmpty(t, resp.Reasons)
})
t.Run("ok", func(t *testing.T) {
svr := &Server{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}}
svr.stateCode.Store(commonpb.StateCode_Healthy)
svr.sessionManager = getSessionManager(true)
svr.channelManager = getChannelManager(t, true)
svr.meta = &meta{
collections: collections,
channelCPs: &channelCPs{
checkpoints: map[string]*msgpb.MsgPosition{
"ch1": {
Timestamp: tsoutil.ComposeTSByTime(time.Now(), 0),
MsgID: []byte{1, 2, 3, 4},
},
},
},
}
ctx := context.Background()
resp, err := svr.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
assert.NoError(t, err)
assert.Equal(t, true, resp.IsHealthy)
assert.Empty(t, resp.Reasons)
})
}
func Test_newChunkManagerFactory(t *testing.T) {

View File

@@ -35,6 +35,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/componentutil"
"github.com/milvus-io/milvus/internal/util/importutilv2"
"github.com/milvus-io/milvus/internal/util/segmentutil"
"github.com/milvus-io/milvus/pkg/common"
@@ -1591,10 +1592,18 @@ func (s *Server) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthReque
err := s.sessionManager.CheckHealth(ctx)
if err != nil {
return &milvuspb.CheckHealthResponse{Status: merr.Success(), IsHealthy: false, Reasons: []string{err.Error()}}, nil
return componentutil.CheckHealthRespWithErr(err), nil
}
return &milvuspb.CheckHealthResponse{Status: merr.Success(), IsHealthy: true, Reasons: []string{}}, nil
if err = CheckAllChannelsWatched(s.meta, s.channelManager); err != nil {
return componentutil.CheckHealthRespWithErr(err), nil
}
if err = CheckCheckPointsHealth(s.meta); err != nil {
return componentutil.CheckHealthRespWithErr(err), nil
}
return componentutil.CheckHealthRespWithErr(nil), nil
}
func (s *Server) GcConfirm(ctx context.Context, request *datapb.GcConfirmRequest) (*datapb.GcConfirmResponse, error) {

View File

@@ -18,6 +18,7 @@ package datacoord
import (
"context"
"fmt"
"strconv"
"strings"
"time"
@@ -32,6 +33,8 @@ import (
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@@ -237,3 +240,35 @@ func calculateL0SegmentSize(fields []*datapb.FieldBinlog) float64 {
}
return float64(size)
}
func CheckCheckPointsHealth(meta *meta) error {
for channel, cp := range meta.GetChannelCheckpoints() {
ts, _ := tsoutil.ParseTS(cp.Timestamp)
lag := time.Since(ts)
if lag > paramtable.Get().DataCoordCfg.ChannelCheckpointMaxLag.GetAsDuration(time.Second) {
return merr.WrapErrChannelCPExceededMaxLag(channel, fmt.Sprintf("checkpoint lag: %f(min)", lag.Minutes()))
}
}
return nil
}
func CheckAllChannelsWatched(meta *meta, channelManager ChannelManager) error {
collIDs := meta.ListCollections()
for _, collID := range collIDs {
collInfo := meta.GetCollection(collID)
if collInfo == nil {
log.Warn("collection info is nil, skip it", zap.Int64("collectionID", collID))
continue
}
for _, channelName := range collInfo.VChannelNames {
_, err := channelManager.FindWatcher(channelName)
if err != nil {
log.Warn("find watcher for channel failed", zap.Int64("collectionID", collID),
zap.String("channelName", channelName), zap.Error(err))
return err
}
}
}
return nil
}
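For reference, a channel checkpoint's timestamp is a TSO value whose physical component encodes wall-clock time, so the lag above is simply the current wall clock minus the parsed physical time. A small, self-contained sketch of that computation using the same tsoutil helpers (the 3-hour threshold is illustrative only; the real limit comes from the ChannelCheckpointMaxLag config used above):

package main

import (
	"fmt"
	"time"

	"github.com/milvus-io/milvus/pkg/util/tsoutil"
)

func main() {
	// Compose a checkpoint TSO whose physical time is one hour in the past.
	cp := tsoutil.ComposeTSByTime(time.Now().Add(-1*time.Hour), 0)

	// Recover the physical time and measure the lag, as CheckCheckPointsHealth does.
	physical, _ := tsoutil.ParseTS(cp)
	lag := time.Since(physical)

	maxLag := 3 * time.Hour // illustrative threshold only
	fmt.Printf("lag=%v, exceeded=%v\n", lag, lag > maxLag)
}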

View File

@@ -33,6 +33,7 @@ import (
"github.com/milvus-io/milvus/internal/querycoordv2/job"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
"github.com/milvus-io/milvus/internal/util/componentutil"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/merr"
@@ -913,10 +914,14 @@ func (s *Server) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthReque
errReasons, err := s.checkNodeHealth(ctx)
if err != nil || len(errReasons) != 0 {
return &milvuspb.CheckHealthResponse{Status: merr.Success(), IsHealthy: false, Reasons: errReasons}, nil
return componentutil.CheckHealthRespWithErrMsg(errReasons...), nil
}
return &milvuspb.CheckHealthResponse{Status: merr.Success(), IsHealthy: true, Reasons: errReasons}, nil
if err := utils.CheckCollectionsQueryable(s.meta, s.targetMgr, s.dist, s.nodeMgr); err != nil {
return componentutil.CheckHealthRespWithErr(err), nil
}
return componentutil.CheckHealthRespWithErr(nil), nil
}
func (s *Server) checkNodeHealth(ctx context.Context) ([]string, error) {

View File

@@ -82,12 +82,12 @@ func CheckLeaderAvailable(nodeMgr *session.NodeManager, leader *meta.LeaderView,
return nil
}
func GetShardLeaders(m *meta.Meta, targetMgr *meta.TargetManager, dist *meta.DistributionManager, nodeMgr *session.NodeManager, collectionID int64) ([]*querypb.ShardLeadersList, error) {
func checkLoadStatus(m *meta.Meta, collectionID int64) error {
percentage := m.CollectionManager.CalculateLoadPercentage(collectionID)
if percentage < 0 {
err := merr.WrapErrCollectionNotLoaded(collectionID)
log.Warn("failed to GetShardLeaders", zap.Error(err))
return nil, err
return err
}
collection := m.CollectionManager.GetCollection(collectionID)
if collection != nil && collection.GetStatus() == querypb.LoadStatus_Loaded {
@@ -99,17 +99,14 @@ func GetShardLeaders(m *meta.Meta, targetMgr *meta.TargetManager, dist *meta.Dis
err := merr.WrapErrCollectionNotFullyLoaded(collectionID)
msg := fmt.Sprintf("collection %v is not fully loaded", collectionID)
log.Warn(msg)
return nil, err
}
channels := targetMgr.GetDmChannelsByCollection(collectionID, meta.CurrentTarget)
if len(channels) == 0 {
msg := "loaded collection do not found any channel in target, may be in recovery"
err := merr.WrapErrCollectionOnRecovering(collectionID, msg)
log.Warn("failed to get channels", zap.Error(err))
return nil, err
return err
}
return nil
}
func GetShardLeadersWithChannels(m *meta.Meta, targetMgr *meta.TargetManager, dist *meta.DistributionManager,
nodeMgr *session.NodeManager, collectionID int64, channels map[string]*meta.DmChannel,
) ([]*querypb.ShardLeadersList, error) {
ret := make([]*querypb.ShardLeadersList, 0)
currentTargets := targetMgr.GetSealedSegmentsByCollection(collectionID, meta.CurrentTarget)
for _, channel := range channels {
@@ -166,6 +163,49 @@ func GetShardLeaders(m *meta.Meta, targetMgr *meta.TargetManager, dist *meta.Dis
return ret, nil
}
func GetShardLeaders(m *meta.Meta, targetMgr *meta.TargetManager, dist *meta.DistributionManager, nodeMgr *session.NodeManager, collectionID int64) ([]*querypb.ShardLeadersList, error) {
if err := checkLoadStatus(m, collectionID); err != nil {
return nil, err
}
channels := targetMgr.GetDmChannelsByCollection(collectionID, meta.CurrentTarget)
if len(channels) == 0 {
msg := "loaded collection do not found any channel in target, may be in recovery"
err := merr.WrapErrCollectionOnRecovering(collectionID, msg)
log.Warn("failed to get channels", zap.Error(err))
return nil, err
}
return GetShardLeadersWithChannels(m, targetMgr, dist, nodeMgr, collectionID, channels)
}
// CheckCollectionsQueryable checks that all channels are watched and all segments are loaded for every loaded collection
func CheckCollectionsQueryable(m *meta.Meta, targetMgr *meta.TargetManager, dist *meta.DistributionManager, nodeMgr *session.NodeManager) error {
for _, coll := range m.GetAllCollections() {
collectionID := coll.GetCollectionID()
if err := checkLoadStatus(m, collectionID); err != nil {
return err
}
channels := targetMgr.GetDmChannelsByCollection(collectionID, meta.CurrentTarget)
if len(channels) == 0 {
msg := "loaded collection do not found any channel in target, may be in recovery"
err := merr.WrapErrCollectionOnRecovering(collectionID, msg)
log.Warn("failed to get channels", zap.Error(err))
return err
}
shardList, err := GetShardLeadersWithChannels(m, targetMgr, dist, nodeMgr, collectionID, channels)
if err != nil {
return err
}
if len(channels) != len(shardList) {
return merr.WrapErrCollectionNotFullyLoaded(collectionID, "still have unwatched channels or unloaded segments")
}
}
return nil
}
func filterDupLeaders(replicaManager *meta.ReplicaManager, leaders map[int64]*meta.LeaderView) map[int64]*meta.LeaderView {
type leaderID struct {
ReplicaID int64

View File

@@ -677,18 +677,6 @@ func withDataCoord(dc types.DataCoordClient) Opt {
}
}
func withUnhealthyDataCoord() Opt {
dc := newMockDataCoord()
err := errors.New("mock error")
dc.GetComponentStatesFunc = func(ctx context.Context) (*milvuspb.ComponentStates, error) {
return &milvuspb.ComponentStates{
State: &milvuspb.ComponentInfo{StateCode: commonpb.StateCode_Abnormal},
Status: merr.Status(err),
}, retry.Unrecoverable(errors.New("error mock GetComponentStates"))
}
return withDataCoord(dc)
}
func withInvalidDataCoord() Opt {
dc := newMockDataCoord()
dc.GetComponentStatesFunc = func(ctx context.Context) (*milvuspb.ComponentStates, error) {

View File

@@ -43,7 +43,6 @@ import (
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/ratelimitutil"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
@@ -357,19 +356,10 @@ func (q *QuotaCenter) collectMetrics() error {
defer cancel()
group := &errgroup.Group{}
req, err := metricsinfo.ConstructRequestByMetricType(metricsinfo.SystemInfoMetrics)
if err != nil {
return err
}
// get Query cluster metrics
group.Go(func() error {
rsp, err := q.queryCoord.GetMetrics(ctx, req)
if err = merr.CheckRPCCall(rsp, err); err != nil {
return err
}
queryCoordTopology := &metricsinfo.QueryCoordTopology{}
err = metricsinfo.UnmarshalTopology(rsp.GetResponse(), queryCoordTopology)
queryCoordTopology, err := getQueryCoordMetrics(ctx, q.queryCoord)
if err != nil {
return err
}
@@ -414,12 +404,7 @@ func (q *QuotaCenter) collectMetrics() error {
})
// get Data cluster metrics
group.Go(func() error {
rsp, err := q.dataCoord.GetMetrics(ctx, req)
if err = merr.CheckRPCCall(rsp, err); err != nil {
return err
}
dataCoordTopology := &metricsinfo.DataCoordTopology{}
err = metricsinfo.UnmarshalTopology(rsp.GetResponse(), dataCoordTopology)
dataCoordTopology, err := getDataCoordMetrics(ctx, q.dataCoord)
if err != nil {
return err
}
@@ -505,17 +490,11 @@ func (q *QuotaCenter) collectMetrics() error {
})
// get Proxies metrics
group.Go(func() error {
// TODO: get more proxy metrics info
rsps, err := q.proxies.GetProxyMetrics(ctx)
ret, err := getProxyMetrics(ctx, q.proxies)
if err != nil {
return err
}
for _, rsp := range rsps {
proxyMetric := &metricsinfo.ProxyInfos{}
err = metricsinfo.UnmarshalComponentInfos(rsp.GetResponse(), proxyMetric)
if err != nil {
return err
}
for _, proxyMetric := range ret {
if proxyMetric.QuotaMetrics != nil {
q.proxyMetrics[proxyMetric.ID] = proxyMetric.QuotaMetrics
}
@@ -532,7 +511,8 @@ func (q *QuotaCenter) collectMetrics() error {
}
return nil
})
err = group.Wait()
err := group.Wait()
if err != nil {
return err
}

View File

@@ -2773,9 +2773,8 @@ func (c *Core) CheckHealth(ctx context.Context, in *milvuspb.CheckHealthRequest)
}, nil
}
mu := &sync.Mutex{}
group, ctx := errgroup.WithContext(ctx)
errReasons := make([]string, 0, c.proxyClientManager.GetProxyCount())
errs := typeutil.NewConcurrentSet[error]()
proxyClients := c.proxyClientManager.GetProxyClients()
proxyClients.Range(func(key int64, value types.ProxyClient) bool {
@@ -2784,28 +2783,41 @@ func (c *Core) CheckHealth(ctx context.Context, in *milvuspb.CheckHealthRequest)
group.Go(func() error {
sta, err := proxyClient.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{})
if err != nil {
errs.Insert(err)
return err
}
err = merr.AnalyzeState("Proxy", nodeID, sta)
if err != nil {
mu.Lock()
defer mu.Unlock()
errReasons = append(errReasons, err.Error())
errs.Insert(err)
}
return nil
return err
})
return true
})
maxDelay := Params.QuotaConfig.MaxTimeTickDelay.GetAsDuration(time.Second)
if maxDelay > 0 {
group.Go(func() error {
err := CheckTimeTickLagExceeded(ctx, c.queryCoord, c.dataCoord, maxDelay)
if err != nil {
errs.Insert(err)
}
return err
})
}
err := group.Wait()
if err != nil || len(errReasons) != 0 {
if err != nil {
return &milvuspb.CheckHealthResponse{
Status: merr.Success(),
IsHealthy: false,
Reasons: errReasons,
Reasons: lo.Map(errs.Collect(), func(e error, i int) string {
return e.Error()
}),
}, nil
}
return &milvuspb.CheckHealthResponse{Status: merr.Success(), IsHealthy: true, Reasons: errReasons}, nil
return &milvuspb.CheckHealthResponse{Status: merr.Success(), IsHealthy: true, Reasons: []string{}}, nil
}
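For clarity, the proxy probes and the new time-tick check now funnel failures into a concurrent set instead of a mutex-guarded slice, and the response reports every collected error rather than only the first one returned by the errgroup. A minimal sketch of that pattern using the same typeutil and lo helpers (collectReasons and the checks parameter are hypothetical stand-ins for the individual probes):

func collectReasons(ctx context.Context, checks ...func(context.Context) error) []string {
	errs := typeutil.NewConcurrentSet[error]()
	group, ctx := errgroup.WithContext(ctx)
	for _, check := range checks {
		check := check
		group.Go(func() error {
			if err := check(ctx); err != nil {
				errs.Insert(err)
				return err
			}
			return nil
		})
	}
	if err := group.Wait(); err != nil {
		// Map each collected error to its own message, not the single errgroup error.
		return lo.Map(errs.Collect(), func(e error, _ int) string { return e.Error() })
	}
	return nil
}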

View File

@@ -32,6 +32,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/mocks"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/proxypb"
@@ -47,6 +48,7 @@ import (
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/tikv"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@@ -1450,6 +1452,65 @@ func TestRootCoord_AlterCollection(t *testing.T) {
}
func TestRootCoord_CheckHealth(t *testing.T) {
getQueryCoordMetricsFunc := func(tt typeutil.Timestamp) (*milvuspb.GetMetricsResponse, error) {
clusterTopology := metricsinfo.QueryClusterTopology{
ConnectedNodes: []metricsinfo.QueryNodeInfos{
{
QuotaMetrics: &metricsinfo.QueryNodeQuotaMetrics{
Fgm: metricsinfo.FlowGraphMetric{
MinFlowGraphChannel: "ch1",
MinFlowGraphTt: tt,
NumFlowGraph: 1,
},
},
},
},
}
resp, _ := metricsinfo.MarshalTopology(metricsinfo.QueryCoordTopology{Cluster: clusterTopology})
return &milvuspb.GetMetricsResponse{
Status: merr.Success(),
Response: resp,
ComponentName: metricsinfo.ConstructComponentName(typeutil.QueryCoordRole, 0),
}, nil
}
getDataCoordMetricsFunc := func(tt typeutil.Timestamp) (*milvuspb.GetMetricsResponse, error) {
clusterTopology := metricsinfo.DataClusterTopology{
ConnectedDataNodes: []metricsinfo.DataNodeInfos{
{
QuotaMetrics: &metricsinfo.DataNodeQuotaMetrics{
Fgm: metricsinfo.FlowGraphMetric{
MinFlowGraphChannel: "ch1",
MinFlowGraphTt: tt,
NumFlowGraph: 1,
},
},
},
},
}
resp, _ := metricsinfo.MarshalTopology(metricsinfo.DataCoordTopology{Cluster: clusterTopology})
return &milvuspb.GetMetricsResponse{
Status: merr.Success(),
Response: resp,
ComponentName: metricsinfo.ConstructComponentName(typeutil.DataCoordRole, 0),
}, nil
}
querynodeTT := tsoutil.ComposeTSByTime(time.Now().Add(-1*time.Minute), 0)
datanodeTT := tsoutil.ComposeTSByTime(time.Now().Add(-2*time.Minute), 0)
dcClient := mocks.NewMockDataCoordClient(t)
dcClient.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(getDataCoordMetricsFunc(datanodeTT))
qcClient := mocks.NewMockQueryCoordClient(t)
qcClient.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(getQueryCoordMetricsFunc(querynodeTT))
errDataCoordClient := mocks.NewMockDataCoordClient(t)
errDataCoordClient.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(nil, errors.New("error"))
errQueryCoordClient := mocks.NewMockQueryCoordClient(t)
errQueryCoordClient.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(nil, errors.New("error"))
t.Run("not healthy", func(t *testing.T) {
ctx := context.Background()
c := newTestCore(withAbnormalCode())
@@ -1459,10 +1520,12 @@ func TestRootCoord_CheckHealth(t *testing.T) {
assert.NotEmpty(t, resp.Reasons)
})
t.Run("proxy health check is ok", func(t *testing.T) {
c := newTestCore(withHealthyCode(),
withValidProxyManager())
t.Run("ok with disabled tt lag configuration", func(t *testing.T) {
v := Params.QuotaConfig.MaxTimeTickDelay.GetValue()
Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, "-1")
defer Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, v)
c := newTestCore(withHealthyCode(), withValidProxyManager())
ctx := context.Background()
resp, err := c.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
assert.NoError(t, err)
@@ -1470,9 +1533,12 @@ func TestRootCoord_CheckHealth(t *testing.T) {
assert.Empty(t, resp.Reasons)
})
t.Run("proxy health check is fail", func(t *testing.T) {
c := newTestCore(withHealthyCode(),
withInvalidProxyManager())
t.Run("proxy health check fail with invalid proxy", func(t *testing.T) {
v := Params.QuotaConfig.MaxTimeTickDelay.GetValue()
Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, "6000")
defer Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, v)
c := newTestCore(withHealthyCode(), withInvalidProxyManager(), withDataCoord(dcClient), withQueryCoord(qcClient))
ctx := context.Background()
resp, err := c.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
@@ -1480,6 +1546,62 @@ func TestRootCoord_CheckHealth(t *testing.T) {
assert.Equal(t, false, resp.IsHealthy)
assert.NotEmpty(t, resp.Reasons)
})
t.Run("proxy health check fail with get metrics error", func(t *testing.T) {
v := Params.QuotaConfig.MaxTimeTickDelay.GetValue()
Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, "6000")
defer Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, v)
{
c := newTestCore(withHealthyCode(),
withValidProxyManager(), withDataCoord(dcClient), withQueryCoord(errQueryCoordClient))
ctx := context.Background()
resp, err := c.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
assert.NoError(t, err)
assert.Equal(t, false, resp.IsHealthy)
assert.NotEmpty(t, resp.Reasons)
}
{
c := newTestCore(withHealthyCode(),
withValidProxyManager(), withDataCoord(errDataCoordClient), withQueryCoord(qcClient))
ctx := context.Background()
resp, err := c.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
assert.NoError(t, err)
assert.Equal(t, false, resp.IsHealthy)
assert.NotEmpty(t, resp.Reasons)
}
})
t.Run("ok with tt lag exceeded", func(t *testing.T) {
v := Params.QuotaConfig.MaxTimeTickDelay.GetValue()
Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, "90")
defer Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, v)
c := newTestCore(withHealthyCode(),
withValidProxyManager(), withDataCoord(dcClient), withQueryCoord(qcClient))
ctx := context.Background()
resp, err := c.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
assert.NoError(t, err)
assert.Equal(t, false, resp.IsHealthy)
assert.NotEmpty(t, resp.Reasons)
})
t.Run("ok with tt lag checking", func(t *testing.T) {
v := Params.QuotaConfig.MaxTimeTickDelay.GetValue()
Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, "600")
defer Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, v)
c := newTestCore(withHealthyCode(),
withValidProxyManager(), withDataCoord(dcClient), withQueryCoord(qcClient))
ctx := context.Background()
resp, err := c.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
assert.NoError(t, err)
assert.Equal(t, true, resp.IsHealthy)
assert.Empty(t, resp.Reasons)
})
}
func TestRootCoord_DescribeDatabase(t *testing.T) {

View File

@@ -17,16 +17,24 @@
package rootcoord
import (
"context"
"encoding/json"
"fmt"
"strconv"
"time"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/proxyutil"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@@ -204,3 +212,136 @@ func getRateLimitConfig(properties map[string]string, configKey string, configVa
return configValue
}
func getQueryCoordMetrics(ctx context.Context, queryCoord types.QueryCoordClient) (*metricsinfo.QueryCoordTopology, error) {
req, err := metricsinfo.ConstructRequestByMetricType(metricsinfo.SystemInfoMetrics)
if err != nil {
return nil, err
}
rsp, err := queryCoord.GetMetrics(ctx, req)
if err = merr.CheckRPCCall(rsp, err); err != nil {
return nil, err
}
queryCoordTopology := &metricsinfo.QueryCoordTopology{}
if err := metricsinfo.UnmarshalTopology(rsp.GetResponse(), queryCoordTopology); err != nil {
return nil, err
}
return queryCoordTopology, nil
}
func getDataCoordMetrics(ctx context.Context, dataCoord types.DataCoordClient) (*metricsinfo.DataCoordTopology, error) {
req, err := metricsinfo.ConstructRequestByMetricType(metricsinfo.SystemInfoMetrics)
if err != nil {
return nil, err
}
rsp, err := dataCoord.GetMetrics(ctx, req)
if err = merr.CheckRPCCall(rsp, err); err != nil {
return nil, err
}
dataCoordTopology := &metricsinfo.DataCoordTopology{}
if err = metricsinfo.UnmarshalTopology(rsp.GetResponse(), dataCoordTopology); err != nil {
return nil, err
}
return dataCoordTopology, nil
}
func getProxyMetrics(ctx context.Context, proxies proxyutil.ProxyClientManagerInterface) ([]*metricsinfo.ProxyInfos, error) {
resp, err := proxies.GetProxyMetrics(ctx)
if err != nil {
return nil, err
}
ret := make([]*metricsinfo.ProxyInfos, 0, len(resp))
for _, rsp := range resp {
proxyMetric := &metricsinfo.ProxyInfos{}
err = metricsinfo.UnmarshalComponentInfos(rsp.GetResponse(), proxyMetric)
if err != nil {
return nil, err
}
ret = append(ret, proxyMetric)
}
return ret, nil
}
func CheckTimeTickLagExceeded(ctx context.Context, queryCoord types.QueryCoordClient, dataCoord types.DataCoordClient, maxDelay time.Duration) error {
ctx, cancel := context.WithTimeout(ctx, GetMetricsTimeout)
defer cancel()
now := time.Now()
group := &errgroup.Group{}
queryNodeTTDelay := typeutil.NewConcurrentMap[string, time.Duration]()
dataNodeTTDelay := typeutil.NewConcurrentMap[string, time.Duration]()
group.Go(func() error {
queryCoordTopology, err := getQueryCoordMetrics(ctx, queryCoord)
if err != nil {
return err
}
for _, queryNodeMetric := range queryCoordTopology.Cluster.ConnectedNodes {
qm := queryNodeMetric.QuotaMetrics
if qm != nil {
if qm.Fgm.NumFlowGraph > 0 && qm.Fgm.MinFlowGraphChannel != "" {
minTt, _ := tsoutil.ParseTS(qm.Fgm.MinFlowGraphTt)
delay := now.Sub(minTt)
if delay.Milliseconds() >= maxDelay.Milliseconds() {
queryNodeTTDelay.Insert(qm.Fgm.MinFlowGraphChannel, delay)
}
}
}
}
return nil
})
// get Data cluster metrics
group.Go(func() error {
dataCoordTopology, err := getDataCoordMetrics(ctx, dataCoord)
if err != nil {
return err
}
for _, dataNodeMetric := range dataCoordTopology.Cluster.ConnectedDataNodes {
dm := dataNodeMetric.QuotaMetrics
if dm.Fgm.NumFlowGraph > 0 && dm.Fgm.MinFlowGraphChannel != "" {
minTt, _ := tsoutil.ParseTS(dm.Fgm.MinFlowGraphTt)
delay := now.Sub(minTt)
if delay.Milliseconds() >= maxDelay.Milliseconds() {
dataNodeTTDelay.Insert(dm.Fgm.MinFlowGraphChannel, delay)
}
}
}
return nil
})
err := group.Wait()
if err != nil {
return err
}
var maxLagChannel string
var maxLag time.Duration
findMaxLagChannel := func(params ...*typeutil.ConcurrentMap[string, time.Duration]) {
for _, param := range params {
param.Range(func(k string, v time.Duration) bool {
if v > maxLag {
maxLag = v
maxLagChannel = k
}
return true
})
}
}
findMaxLagChannel(queryNodeTTDelay, dataNodeTTDelay)
if maxLag > 0 && len(maxLagChannel) != 0 {
return fmt.Errorf("max timetick lag execced threhold, max timetick lag:%s on channel:%s", maxLag, maxLagChannel)
}
return nil
}

View File

@@ -84,3 +84,17 @@ func WaitForComponentHealthy[T interface {
}](ctx context.Context, client T, serviceName string, attempts uint, sleep time.Duration) error {
return WaitForComponentStates(ctx, client, serviceName, []commonpb.StateCode{commonpb.StateCode_Healthy}, attempts, sleep)
}
func CheckHealthRespWithErr(err error) *milvuspb.CheckHealthResponse {
if err != nil {
return CheckHealthRespWithErrMsg(err.Error())
}
return CheckHealthRespWithErrMsg()
}
func CheckHealthRespWithErrMsg(errMsg ...string) *milvuspb.CheckHealthResponse {
if len(errMsg) != 0 {
return &milvuspb.CheckHealthResponse{Status: merr.Success(), IsHealthy: false, Reasons: errMsg}
}
return &milvuspb.CheckHealthResponse{Status: merr.Success(), IsHealthy: true, Reasons: []string{}}
}
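A brief usage note for the two helpers above: passing a nil error (or no messages) yields a healthy response with empty reasons, while any error or message flips IsHealthy to false.

// Healthy: IsHealthy=true, Reasons=[]
resp := componentutil.CheckHealthRespWithErr(nil)

// Unhealthy: the error text becomes the single reason.
resp = componentutil.CheckHealthRespWithErr(errors.New("data node 1 is abnormal"))

// Multiple reasons can be passed directly.
resp = componentutil.CheckHealthRespWithErrMsg("proxy 2 unreachable", "channel ch1 not watched")
_ = resp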

View File

@@ -21,7 +21,7 @@ import (
"github.com/tecbot/gorocksdb"
"go.uber.org/zap"
"github.com/milvus-io/milvus/pkg/kv/rocksdb"
rocksdbkv "github.com/milvus-io/milvus/pkg/kv/rocksdb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"

View File

@@ -76,10 +76,11 @@ var (
ErrReplicaNotAvailable = newMilvusError("replica not available", 401, false)
// Channel & Delegator related
ErrChannelNotFound = newMilvusError("channel not found", 500, false)
ErrChannelLack = newMilvusError("channel lacks", 501, false)
ErrChannelReduplicate = newMilvusError("channel reduplicates", 502, false)
ErrChannelNotAvailable = newMilvusError("channel not available", 503, false)
ErrChannelNotFound = newMilvusError("channel not found", 500, false)
ErrChannelLack = newMilvusError("channel lacks", 501, false)
ErrChannelReduplicate = newMilvusError("channel reduplicates", 502, false)
ErrChannelNotAvailable = newMilvusError("channel not available", 503, false)
ErrChannelCPExceededMaxLag = newMilvusError("channel checkpoint exceeds max lag", 504, false)
// Segment related
ErrSegmentNotFound = newMilvusError("segment not found", 600, false)

View File

@@ -629,36 +629,33 @@ func WrapErrReplicaNotAvailable(id int64, msg ...string) error {
}
// Channel related
func WrapErrChannelNotFound(name string, msg ...string) error {
err := wrapFields(ErrChannelNotFound, value("channel", name))
func wrapChannelErr(mErr milvusError, name string, msg ...string) error {
err := wrapFields(mErr, value("channel", name))
if len(msg) > 0 {
err = errors.Wrap(err, strings.Join(msg, "->"))
}
return err
}
func WrapErrChannelNotFound(name string, msg ...string) error {
return wrapChannelErr(ErrChannelNotFound, name, msg...)
}
func WrapErrChannelCPExceededMaxLag(name string, msg ...string) error {
return wrapChannelErr(ErrChannelCPExceededMaxLag, name, msg...)
}
func WrapErrChannelLack(name string, msg ...string) error {
err := wrapFields(ErrChannelLack, value("channel", name))
if len(msg) > 0 {
err = errors.Wrap(err, strings.Join(msg, "->"))
}
return err
return wrapChannelErr(ErrChannelLack, name, msg...)
}
func WrapErrChannelReduplicate(name string, msg ...string) error {
err := wrapFields(ErrChannelReduplicate, value("channel", name))
if len(msg) > 0 {
err = errors.Wrap(err, strings.Join(msg, "->"))
}
return err
return wrapChannelErr(ErrChannelReduplicate, name, msg...)
}
func WrapErrChannelNotAvailable(name string, msg ...string) error {
err := wrapFields(ErrChannelNotAvailable, value("channel", name))
if len(msg) > 0 {
err = errors.Wrap(err, strings.Join(msg, "->"))
}
return err
return wrapChannelErr(ErrChannelNotAvailable, name, msg...)
}
// Segment related

View File

@@ -1581,15 +1581,10 @@ specific conditions, such as memory of nodes to water marker), ` + "true" + ` me
Version: "2.2.0",
DefaultValue: defaultMaxTtDelay,
Formatter: func(v string) string {
if !p.TtProtectionEnabled.GetAsBool() {
return fmt.Sprintf("%d", math.MaxInt64)
if getAsFloat(v) < 0 {
return "0"
}
delay := getAsFloat(v)
// (0, 65536)
if delay <= 0 || delay >= 65536 {
return defaultMaxTtDelay
}
return fmt.Sprintf("%f", delay)
return v
},
Doc: `maxTimeTickDelay indicates the backpressure for DML Operations.
DML rates would be reduced according to the ratio of time tick delay to maxTimeTickDelay,

View File

@@ -190,7 +190,7 @@ func TestQuotaParam(t *testing.T) {
t.Run("test limit writing", func(t *testing.T) {
assert.False(t, qc.ForceDenyWriting.GetAsBool())
assert.Equal(t, false, qc.TtProtectionEnabled.GetAsBool())
assert.Equal(t, math.MaxInt64, qc.MaxTimeTickDelay.GetAsInt())
assert.Equal(t, 300, qc.MaxTimeTickDelay.GetAsInt())
assert.Equal(t, defaultLowWaterLevel, qc.DataNodeMemoryLowWaterLevel.GetAsFloat())
assert.Equal(t, defaultHighWaterLevel, qc.DataNodeMemoryHighWaterLevel.GetAsFloat())
assert.Equal(t, defaultLowWaterLevel, qc.QueryNodeMemoryLowWaterLevel.GetAsFloat())