From 0992f106942cf9b5ebf0c34be46bc90de4d2d007 Mon Sep 17 00:00:00 2001 From: jaime Date: Mon, 1 Jul 2024 10:18:07 +0800 Subject: [PATCH] enhance: improve check health (#34265) issue: https://github.com/milvus-io/milvus/issues/34264 pr: #33800 Signed-off-by: jaime --- internal/datacoord/meta.go | 11 ++ internal/datacoord/server_test.go | 147 +++++++++++++----- internal/datacoord/services.go | 13 +- internal/datacoord/util.go | 35 +++++ internal/querycoordv2/services.go | 9 +- internal/querycoordv2/utils/util.go | 62 ++++++-- internal/rootcoord/mock_test.go | 12 -- internal/rootcoord/quota_center.go | 32 +--- internal/rootcoord/root_coord.go | 30 ++-- internal/rootcoord/root_coord_test.go | 134 +++++++++++++++- internal/rootcoord/util.go | 141 +++++++++++++++++ internal/util/componentutil/componentutil.go | 14 ++ .../rocksmq/server/rocksmq_retention.go | 2 +- pkg/util/merr/errors.go | 9 +- pkg/util/merr/utils.go | 31 ++-- pkg/util/paramtable/quota_param.go | 11 +- pkg/util/paramtable/quota_param_test.go | 2 +- 17 files changed, 555 insertions(+), 140 deletions(-) diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index 5587786a89..dd8e9effaf 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -1474,6 +1474,17 @@ func (m *meta) DropChannelCheckpoint(vChannel string) error { return nil } +func (m *meta) GetChannelCheckpoints() map[string]*msgpb.MsgPosition { + m.channelCPs.RLock() + defer m.channelCPs.RUnlock() + + checkpoints := make(map[string]*msgpb.MsgPosition, len(m.channelCPs.checkpoints)) + for ch, cp := range m.channelCPs.checkpoints { + checkpoints[ch] = proto.Clone(cp).(*msgpb.MsgPosition) + } + return checkpoints +} + func (m *meta) GcConfirm(ctx context.Context, collectionID, partitionID UniqueID) bool { return m.catalog.GcConfirm(ctx, collectionID, partitionID) } diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index 69b59f4ec9..02daa69cad 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -3114,6 +3114,51 @@ func closeTestServer(t *testing.T, svr *Server) { } func Test_CheckHealth(t *testing.T) { + getSessionManager := func(isHealthy bool) *SessionManagerImpl { + var client *mockDataNodeClient + if isHealthy { + client = &mockDataNodeClient{ + id: 1, + state: commonpb.StateCode_Healthy, + } + } else { + client = &mockDataNodeClient{ + id: 1, + state: commonpb.StateCode_Abnormal, + } + } + + sm := NewSessionManagerImpl() + sm.sessions = struct { + sync.RWMutex + data map[int64]*Session + }{data: map[int64]*Session{1: { + client: client, + clientCreator: func(ctx context.Context, addr string, nodeID int64) (types.DataNodeClient, error) { + return client, nil + }, + }}} + return sm + } + + getChannelManager := func(t *testing.T, findWatcherOk bool) ChannelManager { + channelManager := NewMockChannelManager(t) + if findWatcherOk { + channelManager.EXPECT().FindWatcher(mock.Anything).Return(0, nil) + } else { + channelManager.EXPECT().FindWatcher(mock.Anything).Return(0, errors.New("error")) + } + return channelManager + } + + collections := map[UniqueID]*collectionInfo{ + 1: { + ID: 1, + VChannelNames: []string{"ch1", "ch2"}, + }, + 2: nil, + } + t.Run("not healthy", func(t *testing.T) { ctx := context.Background() s := &Server{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}} @@ -3124,56 +3169,76 @@ func Test_CheckHealth(t *testing.T) { assert.NotEmpty(t, resp.Reasons) }) - t.Run("data node health check is ok", func(t *testing.T) { - svr := 
&Server{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}} - svr.stateCode.Store(commonpb.StateCode_Healthy) - healthClient := &mockDataNodeClient{ - id: 1, - state: commonpb.StateCode_Healthy, - } - sm := NewSessionManagerImpl() - sm.sessions = struct { - sync.RWMutex - data map[int64]*Session - }{data: map[int64]*Session{1: { - client: healthClient, - clientCreator: func(ctx context.Context, addr string, nodeID int64) (types.DataNodeClient, error) { - return healthClient, nil - }, - }}} - - svr.sessionManager = sm - ctx := context.Background() - resp, err := svr.CheckHealth(ctx, &milvuspb.CheckHealthRequest{}) - assert.NoError(t, err) - assert.Equal(t, true, resp.IsHealthy) - assert.Empty(t, resp.Reasons) - }) - t.Run("data node health check is fail", func(t *testing.T) { svr := &Server{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}} svr.stateCode.Store(commonpb.StateCode_Healthy) - unhealthClient := &mockDataNodeClient{ - id: 1, - state: commonpb.StateCode_Abnormal, - } - sm := NewSessionManagerImpl() - sm.sessions = struct { - sync.RWMutex - data map[int64]*Session - }{data: map[int64]*Session{1: { - client: unhealthClient, - clientCreator: func(ctx context.Context, addr string, nodeID int64) (types.DataNodeClient, error) { - return unhealthClient, nil - }, - }}} - svr.sessionManager = sm + svr.sessionManager = getSessionManager(false) ctx := context.Background() resp, err := svr.CheckHealth(ctx, &milvuspb.CheckHealthRequest{}) assert.NoError(t, err) assert.Equal(t, false, resp.IsHealthy) assert.NotEmpty(t, resp.Reasons) }) + + t.Run("check channel watched fail", func(t *testing.T) { + svr := &Server{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}} + svr.stateCode.Store(commonpb.StateCode_Healthy) + svr.sessionManager = getSessionManager(true) + svr.channelManager = getChannelManager(t, false) + svr.meta = &meta{collections: collections} + ctx := context.Background() + resp, err := svr.CheckHealth(ctx, &milvuspb.CheckHealthRequest{}) + assert.NoError(t, err) + assert.Equal(t, false, resp.IsHealthy) + assert.NotEmpty(t, resp.Reasons) + }) + + t.Run("check checkpoint fail", func(t *testing.T) { + svr := &Server{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}} + svr.stateCode.Store(commonpb.StateCode_Healthy) + svr.sessionManager = getSessionManager(true) + svr.channelManager = getChannelManager(t, true) + svr.meta = &meta{ + collections: collections, + channelCPs: &channelCPs{ + checkpoints: map[string]*msgpb.MsgPosition{ + "ch1": { + Timestamp: tsoutil.ComposeTSByTime(time.Now().Add(-1000*time.Hour), 0), + MsgID: []byte{1, 2, 3, 4}, + }, + }, + }, + } + + ctx := context.Background() + resp, err := svr.CheckHealth(ctx, &milvuspb.CheckHealthRequest{}) + assert.NoError(t, err) + assert.Equal(t, false, resp.IsHealthy) + assert.NotEmpty(t, resp.Reasons) + }) + + t.Run("ok", func(t *testing.T) { + svr := &Server{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}} + svr.stateCode.Store(commonpb.StateCode_Healthy) + svr.sessionManager = getSessionManager(true) + svr.channelManager = getChannelManager(t, true) + svr.meta = &meta{ + collections: collections, + channelCPs: &channelCPs{ + checkpoints: map[string]*msgpb.MsgPosition{ + "ch1": { + Timestamp: tsoutil.ComposeTSByTime(time.Now(), 0), + MsgID: []byte{1, 2, 3, 4}, + }, + }, + }, + } + ctx := context.Background() + resp, err := svr.CheckHealth(ctx, &milvuspb.CheckHealthRequest{}) + 
assert.NoError(t, err) + assert.Equal(t, true, resp.IsHealthy) + assert.Empty(t, resp.Reasons) + }) } func Test_newChunkManagerFactory(t *testing.T) { diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index 50e62b6a94..aba3f7e0b7 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -35,6 +35,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/internal/util/componentutil" "github.com/milvus-io/milvus/internal/util/importutilv2" "github.com/milvus-io/milvus/internal/util/segmentutil" "github.com/milvus-io/milvus/pkg/common" @@ -1591,10 +1592,18 @@ func (s *Server) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthReque err := s.sessionManager.CheckHealth(ctx) if err != nil { - return &milvuspb.CheckHealthResponse{Status: merr.Success(), IsHealthy: false, Reasons: []string{err.Error()}}, nil + return componentutil.CheckHealthRespWithErr(err), nil } - return &milvuspb.CheckHealthResponse{Status: merr.Success(), IsHealthy: true, Reasons: []string{}}, nil + if err = CheckAllChannelsWatched(s.meta, s.channelManager); err != nil { + return componentutil.CheckHealthRespWithErr(err), nil + } + + if err = CheckCheckPointsHealth(s.meta); err != nil { + return componentutil.CheckHealthRespWithErr(err), nil + } + + return componentutil.CheckHealthRespWithErr(nil), nil } func (s *Server) GcConfirm(ctx context.Context, request *datapb.GcConfirmRequest) (*datapb.GcConfirmResponse, error) { diff --git a/internal/datacoord/util.go b/internal/datacoord/util.go index f96a67008e..db148b4e9c 100644 --- a/internal/datacoord/util.go +++ b/internal/datacoord/util.go @@ -18,6 +18,7 @@ package datacoord import ( "context" + "fmt" "strconv" "strings" "time" @@ -32,6 +33,8 @@ import ( "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/util/indexparamcheck" "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/tsoutil" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -237,3 +240,35 @@ func calculateL0SegmentSize(fields []*datapb.FieldBinlog) float64 { } return float64(size) } + +func CheckCheckPointsHealth(meta *meta) error { + for channel, cp := range meta.GetChannelCheckpoints() { + ts, _ := tsoutil.ParseTS(cp.Timestamp) + lag := time.Since(ts) + if lag > paramtable.Get().DataCoordCfg.ChannelCheckpointMaxLag.GetAsDuration(time.Second) { + return merr.WrapErrChannelCPExceededMaxLag(channel, fmt.Sprintf("checkpoint lag: %f(min)", lag.Minutes())) + } + } + return nil +} + +func CheckAllChannelsWatched(meta *meta, channelManager ChannelManager) error { + collIDs := meta.ListCollections() + for _, collID := range collIDs { + collInfo := meta.GetCollection(collID) + if collInfo == nil { + log.Warn("collection info is nil, skip it", zap.Int64("collectionID", collID)) + continue + } + + for _, channelName := range collInfo.VChannelNames { + _, err := channelManager.FindWatcher(channelName) + if err != nil { + log.Warn("find watcher for channel failed", zap.Int64("collectionID", collID), + zap.String("channelName", channelName), zap.Error(err)) + return err + } + } + } + return nil +} diff --git a/internal/querycoordv2/services.go b/internal/querycoordv2/services.go index 159b933e80..75dcaf8f71 100644 --- a/internal/querycoordv2/services.go +++ b/internal/querycoordv2/services.go @@ -33,6 +33,7 @@ import ( 
"github.com/milvus-io/milvus/internal/querycoordv2/job" "github.com/milvus-io/milvus/internal/querycoordv2/meta" "github.com/milvus-io/milvus/internal/querycoordv2/utils" + "github.com/milvus-io/milvus/internal/util/componentutil" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/util/merr" @@ -913,10 +914,14 @@ func (s *Server) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthReque errReasons, err := s.checkNodeHealth(ctx) if err != nil || len(errReasons) != 0 { - return &milvuspb.CheckHealthResponse{Status: merr.Success(), IsHealthy: false, Reasons: errReasons}, nil + return componentutil.CheckHealthRespWithErrMsg(errReasons...), nil } - return &milvuspb.CheckHealthResponse{Status: merr.Success(), IsHealthy: true, Reasons: errReasons}, nil + if err := utils.CheckCollectionsQueryable(s.meta, s.targetMgr, s.dist, s.nodeMgr); err != nil { + return componentutil.CheckHealthRespWithErr(err), nil + } + + return componentutil.CheckHealthRespWithErr(nil), nil } func (s *Server) checkNodeHealth(ctx context.Context) ([]string, error) { diff --git a/internal/querycoordv2/utils/util.go b/internal/querycoordv2/utils/util.go index 286d7c7ab1..6ebf34232d 100644 --- a/internal/querycoordv2/utils/util.go +++ b/internal/querycoordv2/utils/util.go @@ -82,12 +82,12 @@ func CheckLeaderAvailable(nodeMgr *session.NodeManager, leader *meta.LeaderView, return nil } -func GetShardLeaders(m *meta.Meta, targetMgr *meta.TargetManager, dist *meta.DistributionManager, nodeMgr *session.NodeManager, collectionID int64) ([]*querypb.ShardLeadersList, error) { +func checkLoadStatus(m *meta.Meta, collectionID int64) error { percentage := m.CollectionManager.CalculateLoadPercentage(collectionID) if percentage < 0 { err := merr.WrapErrCollectionNotLoaded(collectionID) log.Warn("failed to GetShardLeaders", zap.Error(err)) - return nil, err + return err } collection := m.CollectionManager.GetCollection(collectionID) if collection != nil && collection.GetStatus() == querypb.LoadStatus_Loaded { @@ -99,17 +99,14 @@ func GetShardLeaders(m *meta.Meta, targetMgr *meta.TargetManager, dist *meta.Dis err := merr.WrapErrCollectionNotFullyLoaded(collectionID) msg := fmt.Sprintf("collection %v is not fully loaded", collectionID) log.Warn(msg) - return nil, err - } - - channels := targetMgr.GetDmChannelsByCollection(collectionID, meta.CurrentTarget) - if len(channels) == 0 { - msg := "loaded collection do not found any channel in target, may be in recovery" - err := merr.WrapErrCollectionOnRecovering(collectionID, msg) - log.Warn("failed to get channels", zap.Error(err)) - return nil, err + return err } + return nil +} +func GetShardLeadersWithChannels(m *meta.Meta, targetMgr *meta.TargetManager, dist *meta.DistributionManager, + nodeMgr *session.NodeManager, collectionID int64, channels map[string]*meta.DmChannel, +) ([]*querypb.ShardLeadersList, error) { ret := make([]*querypb.ShardLeadersList, 0) currentTargets := targetMgr.GetSealedSegmentsByCollection(collectionID, meta.CurrentTarget) for _, channel := range channels { @@ -166,6 +163,49 @@ func GetShardLeaders(m *meta.Meta, targetMgr *meta.TargetManager, dist *meta.Dis return ret, nil } +func GetShardLeaders(m *meta.Meta, targetMgr *meta.TargetManager, dist *meta.DistributionManager, nodeMgr *session.NodeManager, collectionID int64) ([]*querypb.ShardLeadersList, error) { + if err := checkLoadStatus(m, collectionID); err != nil { + return nil, err + } + + channels := targetMgr.GetDmChannelsByCollection(collectionID, 
meta.CurrentTarget) + if len(channels) == 0 { + msg := "loaded collection do not found any channel in target, may be in recovery" + err := merr.WrapErrCollectionOnRecovering(collectionID, msg) + log.Warn("failed to get channels", zap.Error(err)) + return nil, err + } + return GetShardLeadersWithChannels(m, targetMgr, dist, nodeMgr, collectionID, channels) +} + +// CheckCollectionsQueryable check all channels are watched and all segments are loaded for this collection +func CheckCollectionsQueryable(m *meta.Meta, targetMgr *meta.TargetManager, dist *meta.DistributionManager, nodeMgr *session.NodeManager) error { + for _, coll := range m.GetAllCollections() { + collectionID := coll.GetCollectionID() + if err := checkLoadStatus(m, collectionID); err != nil { + return err + } + + channels := targetMgr.GetDmChannelsByCollection(collectionID, meta.CurrentTarget) + if len(channels) == 0 { + msg := "loaded collection do not found any channel in target, may be in recovery" + err := merr.WrapErrCollectionOnRecovering(collectionID, msg) + log.Warn("failed to get channels", zap.Error(err)) + return err + } + + shardList, err := GetShardLeadersWithChannels(m, targetMgr, dist, nodeMgr, collectionID, channels) + if err != nil { + return err + } + + if len(channels) != len(shardList) { + return merr.WrapErrCollectionNotFullyLoaded(collectionID, "still have unwatched channels or loaded segments") + } + } + return nil +} + func filterDupLeaders(replicaManager *meta.ReplicaManager, leaders map[int64]*meta.LeaderView) map[int64]*meta.LeaderView { type leaderID struct { ReplicaID int64 diff --git a/internal/rootcoord/mock_test.go b/internal/rootcoord/mock_test.go index f17ff9d1e2..1828e2763b 100644 --- a/internal/rootcoord/mock_test.go +++ b/internal/rootcoord/mock_test.go @@ -677,18 +677,6 @@ func withDataCoord(dc types.DataCoordClient) Opt { } } -func withUnhealthyDataCoord() Opt { - dc := newMockDataCoord() - err := errors.New("mock error") - dc.GetComponentStatesFunc = func(ctx context.Context) (*milvuspb.ComponentStates, error) { - return &milvuspb.ComponentStates{ - State: &milvuspb.ComponentInfo{StateCode: commonpb.StateCode_Abnormal}, - Status: merr.Status(err), - }, retry.Unrecoverable(errors.New("error mock GetComponentStates")) - } - return withDataCoord(dc) -} - func withInvalidDataCoord() Opt { dc := newMockDataCoord() dc.GetComponentStatesFunc = func(ctx context.Context) (*milvuspb.ComponentStates, error) { diff --git a/internal/rootcoord/quota_center.go b/internal/rootcoord/quota_center.go index c16c5dbca9..4a00fc8e67 100644 --- a/internal/rootcoord/quota_center.go +++ b/internal/rootcoord/quota_center.go @@ -43,7 +43,6 @@ import ( "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/util/commonpbutil" - "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/ratelimitutil" "github.com/milvus-io/milvus/pkg/util/tsoutil" @@ -357,19 +356,10 @@ func (q *QuotaCenter) collectMetrics() error { defer cancel() group := &errgroup.Group{} - req, err := metricsinfo.ConstructRequestByMetricType(metricsinfo.SystemInfoMetrics) - if err != nil { - return err - } // get Query cluster metrics group.Go(func() error { - rsp, err := q.queryCoord.GetMetrics(ctx, req) - if err = merr.CheckRPCCall(rsp, err); err != nil { - return err - } - queryCoordTopology := &metricsinfo.QueryCoordTopology{} - err = metricsinfo.UnmarshalTopology(rsp.GetResponse(), queryCoordTopology) + 
queryCoordTopology, err := getQueryCoordMetrics(ctx, q.queryCoord) if err != nil { return err } @@ -414,12 +404,7 @@ func (q *QuotaCenter) collectMetrics() error { }) // get Data cluster metrics group.Go(func() error { - rsp, err := q.dataCoord.GetMetrics(ctx, req) - if err = merr.CheckRPCCall(rsp, err); err != nil { - return err - } - dataCoordTopology := &metricsinfo.DataCoordTopology{} - err = metricsinfo.UnmarshalTopology(rsp.GetResponse(), dataCoordTopology) + dataCoordTopology, err := getDataCoordMetrics(ctx, q.dataCoord) if err != nil { return err } @@ -505,17 +490,11 @@ func (q *QuotaCenter) collectMetrics() error { }) // get Proxies metrics group.Go(func() error { - // TODO: get more proxy metrics info - rsps, err := q.proxies.GetProxyMetrics(ctx) + ret, err := getProxyMetrics(ctx, q.proxies) if err != nil { return err } - for _, rsp := range rsps { - proxyMetric := &metricsinfo.ProxyInfos{} - err = metricsinfo.UnmarshalComponentInfos(rsp.GetResponse(), proxyMetric) - if err != nil { - return err - } + for _, proxyMetric := range ret { if proxyMetric.QuotaMetrics != nil { q.proxyMetrics[proxyMetric.ID] = proxyMetric.QuotaMetrics } @@ -532,7 +511,8 @@ func (q *QuotaCenter) collectMetrics() error { } return nil }) - err = group.Wait() + + err := group.Wait() if err != nil { return err } diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index b83c96add7..d88eb8e666 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -2773,9 +2773,8 @@ func (c *Core) CheckHealth(ctx context.Context, in *milvuspb.CheckHealthRequest) }, nil } - mu := &sync.Mutex{} group, ctx := errgroup.WithContext(ctx) - errReasons := make([]string, 0, c.proxyClientManager.GetProxyCount()) + errs := typeutil.NewConcurrentSet[error]() proxyClients := c.proxyClientManager.GetProxyClients() proxyClients.Range(func(key int64, value types.ProxyClient) bool { @@ -2784,28 +2783,41 @@ func (c *Core) CheckHealth(ctx context.Context, in *milvuspb.CheckHealthRequest) group.Go(func() error { sta, err := proxyClient.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{}) if err != nil { + errs.Insert(err) return err } err = merr.AnalyzeState("Proxy", nodeID, sta) if err != nil { - mu.Lock() - defer mu.Unlock() - errReasons = append(errReasons, err.Error()) + errs.Insert(err) } - return nil + + return err }) return true }) + maxDelay := Params.QuotaConfig.MaxTimeTickDelay.GetAsDuration(time.Second) + if maxDelay > 0 { + group.Go(func() error { + err := CheckTimeTickLagExceeded(ctx, c.queryCoord, c.dataCoord, maxDelay) + if err != nil { + errs.Insert(err) + } + return err + }) + } + err := group.Wait() - if err != nil || len(errReasons) != 0 { + if err != nil { return &milvuspb.CheckHealthResponse{ Status: merr.Success(), IsHealthy: false, - Reasons: errReasons, + Reasons: lo.Map(errs.Collect(), func(e error, i int) string { + return err.Error() + }), }, nil } - return &milvuspb.CheckHealthResponse{Status: merr.Success(), IsHealthy: true, Reasons: errReasons}, nil + return &milvuspb.CheckHealthResponse{Status: merr.Success(), IsHealthy: true, Reasons: []string{}}, nil } diff --git a/internal/rootcoord/root_coord_test.go b/internal/rootcoord/root_coord_test.go index bbdb896b52..5fea892a72 100644 --- a/internal/rootcoord/root_coord_test.go +++ b/internal/rootcoord/root_coord_test.go @@ -32,6 +32,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" 
"github.com/milvus-io/milvus/internal/metastore/model" + "github.com/milvus-io/milvus/internal/mocks" "github.com/milvus-io/milvus/internal/proto/etcdpb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/proxypb" @@ -47,6 +48,7 @@ import ( "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/tikv" + "github.com/milvus-io/milvus/pkg/util/tsoutil" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -1450,6 +1452,65 @@ func TestRootCoord_AlterCollection(t *testing.T) { } func TestRootCoord_CheckHealth(t *testing.T) { + getQueryCoordMetricsFunc := func(tt typeutil.Timestamp) (*milvuspb.GetMetricsResponse, error) { + clusterTopology := metricsinfo.QueryClusterTopology{ + ConnectedNodes: []metricsinfo.QueryNodeInfos{ + { + QuotaMetrics: &metricsinfo.QueryNodeQuotaMetrics{ + Fgm: metricsinfo.FlowGraphMetric{ + MinFlowGraphChannel: "ch1", + MinFlowGraphTt: tt, + NumFlowGraph: 1, + }, + }, + }, + }, + } + + resp, _ := metricsinfo.MarshalTopology(metricsinfo.QueryCoordTopology{Cluster: clusterTopology}) + return &milvuspb.GetMetricsResponse{ + Status: merr.Success(), + Response: resp, + ComponentName: metricsinfo.ConstructComponentName(typeutil.QueryCoordRole, 0), + }, nil + } + + getDataCoordMetricsFunc := func(tt typeutil.Timestamp) (*milvuspb.GetMetricsResponse, error) { + clusterTopology := metricsinfo.DataClusterTopology{ + ConnectedDataNodes: []metricsinfo.DataNodeInfos{ + { + QuotaMetrics: &metricsinfo.DataNodeQuotaMetrics{ + Fgm: metricsinfo.FlowGraphMetric{ + MinFlowGraphChannel: "ch1", + MinFlowGraphTt: tt, + NumFlowGraph: 1, + }, + }, + }, + }, + } + + resp, _ := metricsinfo.MarshalTopology(metricsinfo.DataCoordTopology{Cluster: clusterTopology}) + return &milvuspb.GetMetricsResponse{ + Status: merr.Success(), + Response: resp, + ComponentName: metricsinfo.ConstructComponentName(typeutil.DataCoordRole, 0), + }, nil + } + + querynodeTT := tsoutil.ComposeTSByTime(time.Now().Add(-1*time.Minute), 0) + datanodeTT := tsoutil.ComposeTSByTime(time.Now().Add(-2*time.Minute), 0) + + dcClient := mocks.NewMockDataCoordClient(t) + dcClient.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(getDataCoordMetricsFunc(datanodeTT)) + qcClient := mocks.NewMockQueryCoordClient(t) + qcClient.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(getQueryCoordMetricsFunc(querynodeTT)) + + errDataCoordClient := mocks.NewMockDataCoordClient(t) + errDataCoordClient.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(nil, errors.New("error")) + errQueryCoordClient := mocks.NewMockQueryCoordClient(t) + errQueryCoordClient.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(nil, errors.New("error")) + t.Run("not healthy", func(t *testing.T) { ctx := context.Background() c := newTestCore(withAbnormalCode()) @@ -1459,10 +1520,12 @@ func TestRootCoord_CheckHealth(t *testing.T) { assert.NotEmpty(t, resp.Reasons) }) - t.Run("proxy health check is ok", func(t *testing.T) { - c := newTestCore(withHealthyCode(), - withValidProxyManager()) + t.Run("ok with disabled tt lag configuration", func(t *testing.T) { + v := Params.QuotaConfig.MaxTimeTickDelay.GetValue() + Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, "-1") + defer Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, v) + c := newTestCore(withHealthyCode(), withValidProxyManager()) ctx := context.Background() resp, err := c.CheckHealth(ctx, &milvuspb.CheckHealthRequest{}) assert.NoError(t, err) @@ -1470,9 
+1533,12 @@ func TestRootCoord_CheckHealth(t *testing.T) { assert.Empty(t, resp.Reasons) }) - t.Run("proxy health check is fail", func(t *testing.T) { - c := newTestCore(withHealthyCode(), - withInvalidProxyManager()) + t.Run("proxy health check fail with invalid proxy", func(t *testing.T) { + v := Params.QuotaConfig.MaxTimeTickDelay.GetValue() + Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, "6000") + defer Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, v) + + c := newTestCore(withHealthyCode(), withInvalidProxyManager(), withDataCoord(dcClient), withQueryCoord(qcClient)) ctx := context.Background() resp, err := c.CheckHealth(ctx, &milvuspb.CheckHealthRequest{}) @@ -1480,6 +1546,62 @@ func TestRootCoord_CheckHealth(t *testing.T) { assert.Equal(t, false, resp.IsHealthy) assert.NotEmpty(t, resp.Reasons) }) + + t.Run("proxy health check fail with get metrics error", func(t *testing.T) { + v := Params.QuotaConfig.MaxTimeTickDelay.GetValue() + Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, "6000") + defer Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, v) + + { + c := newTestCore(withHealthyCode(), + withValidProxyManager(), withDataCoord(dcClient), withQueryCoord(errQueryCoordClient)) + + ctx := context.Background() + resp, err := c.CheckHealth(ctx, &milvuspb.CheckHealthRequest{}) + assert.NoError(t, err) + assert.Equal(t, false, resp.IsHealthy) + assert.NotEmpty(t, resp.Reasons) + } + + { + c := newTestCore(withHealthyCode(), + withValidProxyManager(), withDataCoord(errDataCoordClient), withQueryCoord(qcClient)) + + ctx := context.Background() + resp, err := c.CheckHealth(ctx, &milvuspb.CheckHealthRequest{}) + assert.NoError(t, err) + assert.Equal(t, false, resp.IsHealthy) + assert.NotEmpty(t, resp.Reasons) + } + }) + + t.Run("ok with tt lag exceeded", func(t *testing.T) { + v := Params.QuotaConfig.MaxTimeTickDelay.GetValue() + Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, "90") + defer Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, v) + + c := newTestCore(withHealthyCode(), + withValidProxyManager(), withDataCoord(dcClient), withQueryCoord(qcClient)) + ctx := context.Background() + resp, err := c.CheckHealth(ctx, &milvuspb.CheckHealthRequest{}) + assert.NoError(t, err) + assert.Equal(t, false, resp.IsHealthy) + assert.NotEmpty(t, resp.Reasons) + }) + + t.Run("ok with tt lag checking", func(t *testing.T) { + v := Params.QuotaConfig.MaxTimeTickDelay.GetValue() + Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, "600") + defer Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, v) + + c := newTestCore(withHealthyCode(), + withValidProxyManager(), withDataCoord(dcClient), withQueryCoord(qcClient)) + ctx := context.Background() + resp, err := c.CheckHealth(ctx, &milvuspb.CheckHealthRequest{}) + assert.NoError(t, err) + assert.Equal(t, true, resp.IsHealthy) + assert.Empty(t, resp.Reasons) + }) } func TestRootCoord_DescribeDatabase(t *testing.T) { diff --git a/internal/rootcoord/util.go b/internal/rootcoord/util.go index 1ac9a4d4b9..89b3ad5ae0 100644 --- a/internal/rootcoord/util.go +++ b/internal/rootcoord/util.go @@ -17,16 +17,24 @@ package rootcoord import ( + "context" "encoding/json" "fmt" "strconv" + "time" "go.uber.org/zap" + "golang.org/x/sync/errgroup" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus/internal/types" + "github.com/milvus-io/milvus/internal/util/proxyutil" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/mq/msgstream" + 
"github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/metricsinfo" + "github.com/milvus-io/milvus/pkg/util/tsoutil" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -204,3 +212,136 @@ func getRateLimitConfig(properties map[string]string, configKey string, configVa return configValue } + +func getQueryCoordMetrics(ctx context.Context, queryCoord types.QueryCoordClient) (*metricsinfo.QueryCoordTopology, error) { + req, err := metricsinfo.ConstructRequestByMetricType(metricsinfo.SystemInfoMetrics) + if err != nil { + return nil, err + } + + rsp, err := queryCoord.GetMetrics(ctx, req) + if err = merr.CheckRPCCall(rsp, err); err != nil { + return nil, err + } + queryCoordTopology := &metricsinfo.QueryCoordTopology{} + if err := metricsinfo.UnmarshalTopology(rsp.GetResponse(), queryCoordTopology); err != nil { + return nil, err + } + + return queryCoordTopology, nil +} + +func getDataCoordMetrics(ctx context.Context, dataCoord types.DataCoordClient) (*metricsinfo.DataCoordTopology, error) { + req, err := metricsinfo.ConstructRequestByMetricType(metricsinfo.SystemInfoMetrics) + if err != nil { + return nil, err + } + + rsp, err := dataCoord.GetMetrics(ctx, req) + if err = merr.CheckRPCCall(rsp, err); err != nil { + return nil, err + } + dataCoordTopology := &metricsinfo.DataCoordTopology{} + if err = metricsinfo.UnmarshalTopology(rsp.GetResponse(), dataCoordTopology); err != nil { + return nil, err + } + + return dataCoordTopology, nil +} + +func getProxyMetrics(ctx context.Context, proxies proxyutil.ProxyClientManagerInterface) ([]*metricsinfo.ProxyInfos, error) { + resp, err := proxies.GetProxyMetrics(ctx) + if err != nil { + return nil, err + } + + ret := make([]*metricsinfo.ProxyInfos, 0, len(resp)) + for _, rsp := range resp { + proxyMetric := &metricsinfo.ProxyInfos{} + err = metricsinfo.UnmarshalComponentInfos(rsp.GetResponse(), proxyMetric) + if err != nil { + return nil, err + } + ret = append(ret, proxyMetric) + } + + return ret, nil +} + +func CheckTimeTickLagExceeded(ctx context.Context, queryCoord types.QueryCoordClient, dataCoord types.DataCoordClient, maxDelay time.Duration) error { + ctx, cancel := context.WithTimeout(ctx, GetMetricsTimeout) + defer cancel() + + now := time.Now() + group := &errgroup.Group{} + queryNodeTTDelay := typeutil.NewConcurrentMap[string, time.Duration]() + dataNodeTTDelay := typeutil.NewConcurrentMap[string, time.Duration]() + + group.Go(func() error { + queryCoordTopology, err := getQueryCoordMetrics(ctx, queryCoord) + if err != nil { + return err + } + + for _, queryNodeMetric := range queryCoordTopology.Cluster.ConnectedNodes { + qm := queryNodeMetric.QuotaMetrics + if qm != nil { + if qm.Fgm.NumFlowGraph > 0 && qm.Fgm.MinFlowGraphChannel != "" { + minTt, _ := tsoutil.ParseTS(qm.Fgm.MinFlowGraphTt) + delay := now.Sub(minTt) + + if delay.Milliseconds() >= maxDelay.Milliseconds() { + queryNodeTTDelay.Insert(qm.Fgm.MinFlowGraphChannel, delay) + } + } + } + } + return nil + }) + + // get Data cluster metrics + group.Go(func() error { + dataCoordTopology, err := getDataCoordMetrics(ctx, dataCoord) + if err != nil { + return err + } + + for _, dataNodeMetric := range dataCoordTopology.Cluster.ConnectedDataNodes { + dm := dataNodeMetric.QuotaMetrics + if dm.Fgm.NumFlowGraph > 0 && dm.Fgm.MinFlowGraphChannel != "" { + minTt, _ := tsoutil.ParseTS(dm.Fgm.MinFlowGraphTt) + delay := now.Sub(minTt) + + if delay.Milliseconds() >= maxDelay.Milliseconds() { + dataNodeTTDelay.Insert(dm.Fgm.MinFlowGraphChannel, delay) + } + } + } + 
return nil + }) + + err := group.Wait() + if err != nil { + return err + } + + var maxLagChannel string + var maxLag time.Duration + findMaxLagChannel := func(params ...*typeutil.ConcurrentMap[string, time.Duration]) { + for _, param := range params { + param.Range(func(k string, v time.Duration) bool { + if v > maxLag { + maxLag = v + maxLagChannel = k + } + return true + }) + } + } + findMaxLagChannel(queryNodeTTDelay, dataNodeTTDelay) + + if maxLag > 0 && len(maxLagChannel) != 0 { + return fmt.Errorf("max timetick lag execced threhold, max timetick lag:%s on channel:%s", maxLag, maxLagChannel) + } + return nil +} diff --git a/internal/util/componentutil/componentutil.go b/internal/util/componentutil/componentutil.go index 93537d2445..d89c9db72b 100644 --- a/internal/util/componentutil/componentutil.go +++ b/internal/util/componentutil/componentutil.go @@ -84,3 +84,17 @@ func WaitForComponentHealthy[T interface { }](ctx context.Context, client T, serviceName string, attempts uint, sleep time.Duration) error { return WaitForComponentStates(ctx, client, serviceName, []commonpb.StateCode{commonpb.StateCode_Healthy}, attempts, sleep) } + +func CheckHealthRespWithErr(err error) *milvuspb.CheckHealthResponse { + if err != nil { + return CheckHealthRespWithErrMsg(err.Error()) + } + return CheckHealthRespWithErrMsg() +} + +func CheckHealthRespWithErrMsg(errMsg ...string) *milvuspb.CheckHealthResponse { + if len(errMsg) != 0 { + return &milvuspb.CheckHealthResponse{Status: merr.Success(), IsHealthy: false, Reasons: errMsg} + } + return &milvuspb.CheckHealthResponse{Status: merr.Success(), IsHealthy: true, Reasons: []string{}} +} diff --git a/pkg/mq/mqimpl/rocksmq/server/rocksmq_retention.go b/pkg/mq/mqimpl/rocksmq/server/rocksmq_retention.go index ccca8a0dc1..98ce3d03bc 100644 --- a/pkg/mq/mqimpl/rocksmq/server/rocksmq_retention.go +++ b/pkg/mq/mqimpl/rocksmq/server/rocksmq_retention.go @@ -21,7 +21,7 @@ import ( "github.com/tecbot/gorocksdb" "go.uber.org/zap" - "github.com/milvus-io/milvus/pkg/kv/rocksdb" + rocksdbkv "github.com/milvus-io/milvus/pkg/kv/rocksdb" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" diff --git a/pkg/util/merr/errors.go b/pkg/util/merr/errors.go index b25c04597f..8cae11ea3d 100644 --- a/pkg/util/merr/errors.go +++ b/pkg/util/merr/errors.go @@ -76,10 +76,11 @@ var ( ErrReplicaNotAvailable = newMilvusError("replica not available", 401, false) // Channel & Delegator related - ErrChannelNotFound = newMilvusError("channel not found", 500, false) - ErrChannelLack = newMilvusError("channel lacks", 501, false) - ErrChannelReduplicate = newMilvusError("channel reduplicates", 502, false) - ErrChannelNotAvailable = newMilvusError("channel not available", 503, false) + ErrChannelNotFound = newMilvusError("channel not found", 500, false) + ErrChannelLack = newMilvusError("channel lacks", 501, false) + ErrChannelReduplicate = newMilvusError("channel reduplicates", 502, false) + ErrChannelNotAvailable = newMilvusError("channel not available", 503, false) + ErrChannelCPExceededMaxLag = newMilvusError("channel checkpoint exceed max lag", 504, false) // Segment related ErrSegmentNotFound = newMilvusError("segment not found", 600, false) diff --git a/pkg/util/merr/utils.go b/pkg/util/merr/utils.go index 96f64a545a..10ed505902 100644 --- a/pkg/util/merr/utils.go +++ b/pkg/util/merr/utils.go @@ -629,36 +629,33 @@ func WrapErrReplicaNotAvailable(id int64, msg ...string) error { } // Channel related -func 
WrapErrChannelNotFound(name string, msg ...string) error { - err := wrapFields(ErrChannelNotFound, value("channel", name)) + +func warpChannelErr(mErr milvusError, name string, msg ...string) error { + err := wrapFields(mErr, value("channel", name)) if len(msg) > 0 { err = errors.Wrap(err, strings.Join(msg, "->")) } return err } +func WrapErrChannelNotFound(name string, msg ...string) error { + return warpChannelErr(ErrChannelNotFound, name, msg...) +} + +func WrapErrChannelCPExceededMaxLag(name string, msg ...string) error { + return warpChannelErr(ErrChannelCPExceededMaxLag, name, msg...) +} + func WrapErrChannelLack(name string, msg ...string) error { - err := wrapFields(ErrChannelLack, value("channel", name)) - if len(msg) > 0 { - err = errors.Wrap(err, strings.Join(msg, "->")) - } - return err + return warpChannelErr(ErrChannelLack, name, msg...) } func WrapErrChannelReduplicate(name string, msg ...string) error { - err := wrapFields(ErrChannelReduplicate, value("channel", name)) - if len(msg) > 0 { - err = errors.Wrap(err, strings.Join(msg, "->")) - } - return err + return warpChannelErr(ErrChannelReduplicate, name, msg...) } func WrapErrChannelNotAvailable(name string, msg ...string) error { - err := wrapFields(ErrChannelNotAvailable, value("channel", name)) - if len(msg) > 0 { - err = errors.Wrap(err, strings.Join(msg, "->")) - } - return err + return warpChannelErr(ErrChannelNotAvailable, name, msg...) } // Segment related diff --git a/pkg/util/paramtable/quota_param.go b/pkg/util/paramtable/quota_param.go index c7b1b86499..97a099fd2e 100644 --- a/pkg/util/paramtable/quota_param.go +++ b/pkg/util/paramtable/quota_param.go @@ -1581,15 +1581,10 @@ specific conditions, such as memory of nodes to water marker), ` + "true" + ` me Version: "2.2.0", DefaultValue: defaultMaxTtDelay, Formatter: func(v string) string { - if !p.TtProtectionEnabled.GetAsBool() { - return fmt.Sprintf("%d", math.MaxInt64) + if getAsFloat(v) < 0 { + return "0" } - delay := getAsFloat(v) - // (0, 65536) - if delay <= 0 || delay >= 65536 { - return defaultMaxTtDelay - } - return fmt.Sprintf("%f", delay) + return v }, Doc: `maxTimeTickDelay indicates the backpressure for DML Operations. DML rates would be reduced according to the ratio of time tick delay to maxTimeTickDelay, diff --git a/pkg/util/paramtable/quota_param_test.go b/pkg/util/paramtable/quota_param_test.go index 8fbf3ad1a2..2d7d747c31 100644 --- a/pkg/util/paramtable/quota_param_test.go +++ b/pkg/util/paramtable/quota_param_test.go @@ -190,7 +190,7 @@ func TestQuotaParam(t *testing.T) { t.Run("test limit writing", func(t *testing.T) { assert.False(t, qc.ForceDenyWriting.GetAsBool()) assert.Equal(t, false, qc.TtProtectionEnabled.GetAsBool()) - assert.Equal(t, math.MaxInt64, qc.MaxTimeTickDelay.GetAsInt()) + assert.Equal(t, 300, qc.MaxTimeTickDelay.GetAsInt()) assert.Equal(t, defaultLowWaterLevel, qc.DataNodeMemoryLowWaterLevel.GetAsFloat()) assert.Equal(t, defaultHighWaterLevel, qc.DataNodeMemoryHighWaterLevel.GetAsFloat()) assert.Equal(t, defaultLowWaterLevel, qc.QueryNodeMemoryLowWaterLevel.GetAsFloat())
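
The new componentutil.CheckHealthRespWithErr / CheckHealthRespWithErrMsg helpers in this patch let DataCoord and QueryCoord express CheckHealth as a chain of error-returning probes (session health, channel watch, checkpoint lag, collection queryable) that all collapse into one response shape. The sketch below is a standalone illustration of that pattern, not the Milvus API: the response struct and probe functions are stand-ins.

package main

import (
	"errors"
	"fmt"
)

// checkHealthResponse is a simplified stand-in for milvuspb.CheckHealthResponse.
type checkHealthResponse struct {
	IsHealthy bool
	Reasons   []string
}

// respWithErr mirrors the idea of componentutil.CheckHealthRespWithErr:
// a nil error means healthy, a non-nil error becomes the single reason string.
func respWithErr(err error) *checkHealthResponse {
	if err != nil {
		return &checkHealthResponse{IsHealthy: false, Reasons: []string{err.Error()}}
	}
	return &checkHealthResponse{IsHealthy: true, Reasons: []string{}}
}

// checkHealth runs probes in order and reports the first failure.
func checkHealth(probes ...func() error) *checkHealthResponse {
	for _, probe := range probes {
		if err := probe(); err != nil {
			return respWithErr(err)
		}
	}
	return respWithErr(nil)
}

func main() {
	resp := checkHealth(
		func() error { return nil },                                    // e.g. session health
		func() error { return errors.New("channel ch1 is not watched") }, // e.g. channel watch probe
	)
	fmt.Println(resp.IsHealthy, resp.Reasons)
}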
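
The DataCoord change adds a checkpoint-lag probe (CheckCheckPointsHealth plus the new meta.GetChannelCheckpoints accessor): a channel whose checkpoint timestamp trails wall-clock time by more than the configured maximum marks the service unhealthy. A minimal standalone sketch of that rule, assuming a plain map of channel timestamps in place of the meta store and an arbitrary threshold:

package main

import (
	"fmt"
	"time"
)

// checkCheckpointLag returns an error for the first channel whose checkpoint
// timestamp is older than maxLag relative to now.
func checkCheckpointLag(checkpoints map[string]time.Time, maxLag time.Duration) error {
	for channel, ts := range checkpoints {
		if lag := time.Since(ts); lag > maxLag {
			return fmt.Errorf("channel %s checkpoint exceeds max lag: %.1f min", channel, lag.Minutes())
		}
	}
	return nil
}

func main() {
	cps := map[string]time.Time{
		"ch1": time.Now().Add(-5 * time.Minute),
		"ch2": time.Now().Add(-90 * time.Minute), // stale checkpoint
	}
	if err := checkCheckpointLag(cps, time.Hour); err != nil {
		fmt.Println("unhealthy:", err)
	}
}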
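
RootCoord's new CheckTimeTickLagExceeded gathers the minimum flow-graph time ticks reported by query and data nodes, turns them into per-channel delays, and fails the health check on the single worst channel once its delay reaches MaxTimeTickDelay. The sketch below condenses that aggregation step; the channel names, timestamps, and function name are illustrative only, and the concurrent metric collection from the patch is omitted.

package main

import (
	"fmt"
	"time"
)

// maxTimeTickLag finds the channel with the largest time-tick delay and
// reports it as an error when the delay reaches maxDelay.
func maxTimeTickLag(minFlowGraphTs map[string]time.Time, maxDelay time.Duration) error {
	var worstChannel string
	var worstLag time.Duration
	now := time.Now()
	for channel, ts := range minFlowGraphTs {
		if lag := now.Sub(ts); lag > worstLag {
			worstLag, worstChannel = lag, channel
		}
	}
	if worstChannel != "" && worstLag >= maxDelay {
		return fmt.Errorf("max timetick lag exceeded threshold: %s on channel %s", worstLag, worstChannel)
	}
	return nil
}

func main() {
	ts := map[string]time.Time{
		"dml-channel-0": time.Now().Add(-2 * time.Minute),
		"dml-channel-1": time.Now().Add(-30 * time.Second),
	}
	fmt.Println(maxTimeTickLag(ts, 90*time.Second))
}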
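
RootCoord also fans the proxy state probes and the time-tick lag probe out through an errgroup and accumulates every failure as its own reason string instead of stopping at the first one. A standalone sketch of that fan-out, assuming placeholder probes and a mutex-guarded slice in place of the concurrent set used in the patch:

package main

import (
	"fmt"
	"sync"

	"golang.org/x/sync/errgroup"
)

// collectHealthReasons runs all probes concurrently and returns one reason
// string per failed probe.
func collectHealthReasons(probes ...func() error) []string {
	var mu sync.Mutex
	var reasons []string
	group := &errgroup.Group{}
	for _, probe := range probes {
		probe := probe
		group.Go(func() error {
			if err := probe(); err != nil {
				mu.Lock()
				reasons = append(reasons, err.Error())
				mu.Unlock()
				return err
			}
			return nil
		})
	}
	_ = group.Wait() // reasons are already gathered; Wait only surfaces the first error
	return reasons
}

func main() {
	reasons := collectHealthReasons(
		func() error { return nil },
		func() error { return fmt.Errorf("proxy 7 is in abnormal state") },
	)
	fmt.Println(len(reasons) == 0, reasons)
}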