diff --git a/go.mod b/go.mod index cdfd5c2da2..365c79f1d4 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/antonmedv/expr v1.8.9 github.com/apache/pulsar-client-go v0.5.0 github.com/bits-and-blooms/bitset v1.2.0 // indirect - github.com/bits-and-blooms/bloom/v3 v3.0.1 // indirect + github.com/bits-and-blooms/bloom/v3 v3.0.1 github.com/facebookgo/ensure v0.0.0-20200202191622-63f1cf65ac4c // indirect github.com/facebookgo/stack v0.0.0-20160209184415-751773369052 // indirect github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4 // indirect @@ -39,7 +39,7 @@ require ( go.etcd.io/etcd/api/v3 v3.5.0 go.etcd.io/etcd/client/v3 v3.5.0 go.etcd.io/etcd/server/v3 v3.5.0 - go.uber.org/atomic v1.7.0 // indirect + go.uber.org/atomic v1.7.0 go.uber.org/zap v1.17.0 golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6 google.golang.org/grpc v1.38.0 diff --git a/internal/datacoord/grpc_services.go b/internal/datacoord/grpc_services.go index 52db243b6a..cefa718787 100644 --- a/internal/datacoord/grpc_services.go +++ b/internal/datacoord/grpc_services.go @@ -472,6 +472,13 @@ func (s *Server) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest zap.String("metric_type", metricType)) if metricType == metricsinfo.SystemInfoMetrics { + ret, err := s.metricsCacheManager.GetSystemInfoMetrics() + if err == nil && ret != nil { + return ret, nil + } + log.Debug("failed to get system info metrics from cache, recompute instead", + zap.Error(err)) + metrics, err := s.getSystemInfoMetrics(ctx, req) log.Debug("DataCoord.GetMetrics", @@ -481,6 +488,8 @@ func (s *Server) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest zap.Any("metrics", metrics), // TODO(dragondriver): necessary? may be very large zap.Error(err)) + s.metricsCacheManager.UpdateSystemInfoMetrics(metrics) + return metrics, err } diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index 485c09490d..608e2c215b 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -20,6 +20,8 @@ import ( "sync/atomic" "time" + "github.com/milvus-io/milvus/internal/util/metricsinfo" + datanodeclient "github.com/milvus-io/milvus/internal/distributed/datanode/client" rootcoordclient "github.com/milvus-io/milvus/internal/distributed/rootcoord/client" "github.com/milvus-io/milvus/internal/logutil" @@ -93,6 +95,8 @@ type Server struct { rootCoordClient types.RootCoord ddChannelName string + metricsCacheManager *metricsinfo.MetricsCacheManager + flushCh chan UniqueID msFactory msgstream.Factory @@ -138,6 +142,8 @@ func CreateServer(ctx context.Context, factory msgstream.Factory, opts ...Option dataClientCreator: defaultDataNodeCreatorFunc, rootCoordClientCreator: defaultRootCoordCreatorFunc, helper: defaultServerHelper(), + + metricsCacheManager: metricsinfo.NewMetricsCacheManager(), } for _, opt := range opts { @@ -431,11 +437,13 @@ func (s *Server) startWatchService(ctx context.Context) { zap.String("address", info.Address), zap.Int64("serverID", info.Version)) s.cluster.Register(node) + s.metricsCacheManager.InvalidateSystemInfoMetrics() case sessionutil.SessionDelEvent: log.Info("received datanode unregister", zap.String("address", info.Address), zap.Int64("serverID", info.Version)) s.cluster.UnRegister(node) + s.metricsCacheManager.InvalidateSystemInfoMetrics() default: log.Warn("receive unknown service event type", zap.Any("type", event.EventType)) diff --git a/internal/indexcoord/index_coord.go b/internal/indexcoord/index_coord.go index 87d736811c..c0868958ec 100644 --- a/internal/indexcoord/index_coord.go +++ b/internal/indexcoord/index_coord.go @@ -72,6 +72,8 @@ type IndexCoord struct { metaTable *metaTable nodeManager *NodeManager + metricsCacheManager *metricsinfo.MetricsCacheManager + nodeLock sync.RWMutex // Add callback functions at different stages @@ -186,6 +188,9 @@ func (i *IndexCoord) Init() error { return err } log.Debug("IndexCoord new task scheduler success") + + i.metricsCacheManager = metricsinfo.NewMetricsCacheManager() + i.UpdateStateCode(internalpb.StateCode_Healthy) log.Debug("IndexCoord", zap.Any("State", i.stateCode.Load())) @@ -478,6 +483,13 @@ func (i *IndexCoord) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsReq zap.String("metric_type", metricType)) if metricType == metricsinfo.SystemInfoMetrics { + ret, err := i.metricsCacheManager.GetSystemInfoMetrics() + if err == nil && ret != nil { + return ret, nil + } + log.Debug("failed to get system info metrics from cache, recompute instead", + zap.Error(err)) + metrics, err := getSystemInfoMetrics(ctx, req, i) log.Debug("IndexCoord.GetMetrics", @@ -487,6 +499,8 @@ func (i *IndexCoord) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsReq zap.Any("metrics", metrics), // TODO(dragondriver): necessary? may be very large zap.Error(err)) + i.metricsCacheManager.UpdateSystemInfoMetrics(metrics) + return metrics, err } @@ -599,10 +613,12 @@ func (i *IndexCoord) watchNodeLoop() { } log.Debug("IndexCoord", zap.Any("IndexNode number", len(i.nodeManager.nodeClients))) }() + i.metricsCacheManager.InvalidateSystemInfoMetrics() case sessionutil.SessionDelEvent: serverID := event.Session.ServerID log.Debug("IndexCoord watchNodeLoop SessionDelEvent", zap.Any("serverID", serverID)) i.nodeManager.RemoveNode(serverID) + i.metricsCacheManager.InvalidateSystemInfoMetrics() } } } diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go index 8deabf3b32..cb53468fbe 100644 --- a/internal/proxy/impl.go +++ b/internal/proxy/impl.go @@ -2202,6 +2202,13 @@ func (node *Proxy) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsReque zap.String("metric_type", metricType)) if metricType == metricsinfo.SystemInfoMetrics { + ret, err := node.metricsCacheManager.GetSystemInfoMetrics() + if err == nil && ret != nil { + return ret, nil + } + log.Debug("failed to get system info metrics from cache, recompute instead", + zap.Error(err)) + metrics, err := getSystemInfoMetrics(ctx, req, node) log.Debug("Proxy.GetMetrics", @@ -2211,6 +2218,8 @@ func (node *Proxy) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsReque zap.Any("metrics", metrics), // TODO(dragondriver): necessary? may be very large zap.Error(err)) + node.metricsCacheManager.UpdateSystemInfoMetrics(metrics) + return metrics, err } diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go index 08a4c8c53e..6461b3a81d 100644 --- a/internal/proxy/proxy.go +++ b/internal/proxy/proxy.go @@ -20,6 +20,8 @@ import ( "sync/atomic" "time" + "github.com/milvus-io/milvus/internal/util/metricsinfo" + "github.com/milvus-io/milvus/internal/metrics" "go.uber.org/zap" @@ -70,6 +72,8 @@ type Proxy struct { tsoAllocator *TimestampAllocator segAssigner *SegIDAssigner + metricsCacheManager *metricsinfo.MetricsCacheManager + session *sessionutil.Session msFactory msgstream.Factory @@ -261,6 +265,8 @@ func (node *Proxy) Init() error { node.chTicker = newChannelsTimeTicker(node.ctx, channelMgrTickerInterval, []string{}, node.sched.getPChanStatistics, tsoAllocator) + node.metricsCacheManager = metricsinfo.NewMetricsCacheManager() + return nil } diff --git a/internal/querycoord/impl.go b/internal/querycoord/impl.go index 80f196815e..6bacead8ab 100644 --- a/internal/querycoord/impl.go +++ b/internal/querycoord/impl.go @@ -548,6 +548,13 @@ func (qc *QueryCoord) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRe zap.String("metric_type", metricType)) if metricType == metricsinfo.SystemInfoMetrics { + ret, err := qc.metricsCacheManager.GetSystemInfoMetrics() + if err == nil && ret != nil { + return ret, nil + } + log.Debug("failed to get system info metrics from cache, recompute instead", + zap.Error(err)) + metrics, err := getSystemInfoMetrics(ctx, req, qc) log.Debug("QueryCoord.GetMetrics", @@ -557,6 +564,8 @@ func (qc *QueryCoord) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRe zap.Any("metrics", metrics), // TODO(dragondriver): necessary? may be very large zap.Error(err)) + qc.metricsCacheManager.UpdateSystemInfoMetrics(metrics) + return metrics, err } log.Debug("QueryCoord.GetMetrics failed, request metric type is not implemented yet", diff --git a/internal/querycoord/query_coord.go b/internal/querycoord/query_coord.go index 475d72090b..cd5a067e18 100644 --- a/internal/querycoord/query_coord.go +++ b/internal/querycoord/query_coord.go @@ -20,6 +20,8 @@ import ( "sync/atomic" "time" + "github.com/milvus-io/milvus/internal/util/metricsinfo" + "github.com/golang/protobuf/proto" "go.etcd.io/etcd/api/v3/mvccpb" "go.uber.org/zap" @@ -55,6 +57,8 @@ type QueryCoord struct { newNodeFn newQueryNodeFn scheduler *TaskScheduler + metricsCacheManager *metricsinfo.MetricsCacheManager + dataCoordClient types.DataCoord rootCoordClient types.RootCoord @@ -111,6 +115,8 @@ func (qc *QueryCoord) Init() error { return err } + qc.metricsCacheManager = metricsinfo.NewMetricsCacheManager() + return nil } @@ -240,6 +246,7 @@ func (qc *QueryCoord) watchNodeLoop() { if err != nil { log.Error("query node failed to register", zap.Int64("nodeID", serverID), zap.String("error info", err.Error())) } + qc.metricsCacheManager.InvalidateSystemInfoMetrics() case sessionutil.SessionDelEvent: serverID := event.Session.ServerID log.Debug("get a del event after queryNode down", zap.Int64("nodeID", serverID)) @@ -272,6 +279,7 @@ func (qc *QueryCoord) watchNodeLoop() { meta: qc.meta, } qc.scheduler.Enqueue([]task{loadBalanceTask}) + qc.metricsCacheManager.InvalidateSystemInfoMetrics() } } } diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index 2deb7dcc60..c5e1f2def4 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -130,6 +130,9 @@ type Core struct { // proxy clients proxyClientManager *proxyClientManager + // metrics cache manager + metricsCacheManager *metricsinfo.MetricsCacheManager + // channel timetick chanTimeTick *timetickSync @@ -994,6 +997,8 @@ func (c *Core) Init() error { c.proxyManager.AddSession(c.chanTimeTick.AddProxy, c.proxyClientManager.AddProxyClient) c.proxyManager.DelSession(c.chanTimeTick.DelProxy, c.proxyClientManager.DelProxyClient) + c.metricsCacheManager = metricsinfo.NewMetricsCacheManager() + initError = c.setMsgStreams() if initError != nil { return @@ -1986,6 +1991,13 @@ func (c *Core) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) zap.String("metric_type", metricType)) if metricType == metricsinfo.SystemInfoMetrics { + ret, err := c.metricsCacheManager.GetSystemInfoMetrics() + if err == nil && ret != nil { + return ret, nil + } + log.Debug("failed to get system info metrics from cache, recompute instead", + zap.Error(err)) + systemInfoMetrics, err := c.getSystemInfoMetrics(ctx, req) log.Debug("RootCoord.GetMetrics", @@ -1995,6 +2007,8 @@ func (c *Core) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) zap.Any("systemInfoMetrics", systemInfoMetrics), // TODO(dragondriver): necessary? may be very large zap.Error(err)) + c.metricsCacheManager.UpdateSystemInfoMetrics(systemInfoMetrics) + return systemInfoMetrics, err } diff --git a/internal/util/metricsinfo/cache.go b/internal/util/metricsinfo/cache.go new file mode 100644 index 0000000000..b9a3bf5187 --- /dev/null +++ b/internal/util/metricsinfo/cache.go @@ -0,0 +1,109 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package metricsinfo + +import ( + "sync" + "time" + + "github.com/milvus-io/milvus/internal/proto/milvuspb" +) + +// TODO(dragondriver): load from config file +const DefaultMetricsRetention = time.Second * 5 + +// TODO(dragondriver): we can use a map to manage the metrics if there are too many kind metrics +type MetricsCacheManager struct { + systemInfoMetrics *milvuspb.GetMetricsResponse + systemInfoMetricsInvalid bool + systemInfoMetricsLastUpdatedTime time.Time + systemInfoMetricsMtx sync.RWMutex + + retention time.Duration + retentionMtx sync.RWMutex // necessary? +} + +func NewMetricsCacheManager() *MetricsCacheManager { + manager := &MetricsCacheManager{ + systemInfoMetrics: nil, + systemInfoMetricsInvalid: false, + systemInfoMetricsLastUpdatedTime: time.Now(), + systemInfoMetricsMtx: sync.RWMutex{}, + retention: DefaultMetricsRetention, + } + + return manager +} + +func (manager *MetricsCacheManager) GetRetention() time.Duration { + manager.retentionMtx.RLock() + defer manager.retentionMtx.RUnlock() + + return manager.retention +} + +func (manager *MetricsCacheManager) SetRetention(retention time.Duration) { + manager.retentionMtx.Lock() + defer manager.retentionMtx.Unlock() + + manager.retention = retention +} + +func (manager *MetricsCacheManager) ResetRetention() { + manager.retentionMtx.Lock() + defer manager.retentionMtx.Unlock() + + manager.retention = DefaultMetricsRetention +} + +func (manager *MetricsCacheManager) InvalidateSystemInfoMetrics() { + manager.systemInfoMetricsMtx.Lock() + defer manager.systemInfoMetricsMtx.Unlock() + + manager.systemInfoMetricsInvalid = true +} + +func (manager *MetricsCacheManager) IsSystemInfoMetricsValid() bool { + retention := manager.GetRetention() + + manager.systemInfoMetricsMtx.RLock() + defer manager.systemInfoMetricsMtx.RUnlock() + + return (!manager.systemInfoMetricsInvalid) && + (manager.systemInfoMetrics != nil) && + (time.Since(manager.systemInfoMetricsLastUpdatedTime) < retention) +} + +func (manager *MetricsCacheManager) GetSystemInfoMetrics() (*milvuspb.GetMetricsResponse, error) { + retention := manager.GetRetention() + + manager.systemInfoMetricsMtx.RLock() + defer manager.systemInfoMetricsMtx.RUnlock() + + if manager.systemInfoMetricsInvalid || + manager.systemInfoMetrics == nil || + time.Since(manager.systemInfoMetricsLastUpdatedTime) >= retention { + + return nil, ErrInvalidSystemInfosMetricCache + } + + return manager.systemInfoMetrics, nil +} + +func (manager *MetricsCacheManager) UpdateSystemInfoMetrics(infos *milvuspb.GetMetricsResponse) { + manager.systemInfoMetricsMtx.Lock() + defer manager.systemInfoMetricsMtx.Unlock() + + manager.systemInfoMetrics = infos + manager.systemInfoMetricsInvalid = false + manager.systemInfoMetricsLastUpdatedTime = time.Now() +} diff --git a/internal/util/metricsinfo/cache_test.go b/internal/util/metricsinfo/cache_test.go new file mode 100644 index 0000000000..deff1e80a2 --- /dev/null +++ b/internal/util/metricsinfo/cache_test.go @@ -0,0 +1,165 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package metricsinfo + +import ( + "testing" + "time" + + "github.com/milvus-io/milvus/internal/proto/milvuspb" + + "github.com/stretchr/testify/assert" +) + +func TestNewMetricsCacheManager(t *testing.T) { + manager := NewMetricsCacheManager() + assert.NotNil(t, manager) +} + +func TestMetricsCacheManager_GetRetention(t *testing.T) { + manager := NewMetricsCacheManager() + assert.NotNil(t, manager) + + assert.Equal(t, DefaultMetricsRetention, manager.GetRetention()) + + retention := time.Second * 3 + manager.SetRetention(retention) + assert.Equal(t, retention, manager.GetRetention()) + + manager.ResetRetention() + assert.Equal(t, DefaultMetricsRetention, manager.GetRetention()) +} + +func TestMetricsCacheManager_SetRetention(t *testing.T) { + manager := NewMetricsCacheManager() + assert.NotNil(t, manager) + + retention := time.Second * 3 + manager.SetRetention(retention) + assert.Equal(t, retention, manager.GetRetention()) +} + +func TestMetricsCacheManager_ResetRetention(t *testing.T) { + manager := NewMetricsCacheManager() + assert.NotNil(t, manager) + + assert.Equal(t, DefaultMetricsRetention, manager.GetRetention()) + + retention := time.Second * 3 + manager.SetRetention(retention) + assert.Equal(t, retention, manager.GetRetention()) + + manager.ResetRetention() + assert.Equal(t, DefaultMetricsRetention, manager.GetRetention()) +} + +func TestMetricsCacheManager_InvalidateSystemInfoMetrics(t *testing.T) { + manager := NewMetricsCacheManager() + assert.NotNil(t, manager) + + manager.InvalidateSystemInfoMetrics() + assert.Equal(t, true, manager.systemInfoMetricsInvalid) + assert.Equal(t, false, manager.IsSystemInfoMetricsValid()) +} + +func TestMetricsCacheManager_IsSystemInfoMetricsValid(t *testing.T) { + manager := NewMetricsCacheManager() + assert.NotNil(t, manager) + + manager.InvalidateSystemInfoMetrics() + assert.Equal(t, false, manager.IsSystemInfoMetricsValid()) + + bigRetention := time.Hour * 24 + smallRetention := time.Millisecond + + manager.SetRetention(bigRetention) + manager.UpdateSystemInfoMetrics(&milvuspb.GetMetricsResponse{}) + assert.Equal(t, true, manager.IsSystemInfoMetricsValid()) + + manager.UpdateSystemInfoMetrics(nil) + assert.Equal(t, false, manager.IsSystemInfoMetricsValid()) + + manager.SetRetention(smallRetention) + manager.UpdateSystemInfoMetrics(&milvuspb.GetMetricsResponse{}) + time.Sleep(smallRetention) + assert.Equal(t, false, manager.IsSystemInfoMetricsValid()) +} + +func TestMetricsCacheManager_UpdateSystemInfoMetrics(t *testing.T) { + manager := NewMetricsCacheManager() + assert.NotNil(t, manager) + + manager.InvalidateSystemInfoMetrics() + assert.Equal(t, false, manager.IsSystemInfoMetricsValid()) + resp, err := manager.GetSystemInfoMetrics() + assert.NotNil(t, err) + assert.Nil(t, resp) + + bigRetention := time.Hour * 24 + smallRetention := time.Millisecond + + manager.SetRetention(bigRetention) + manager.UpdateSystemInfoMetrics(&milvuspb.GetMetricsResponse{}) + assert.Equal(t, true, manager.IsSystemInfoMetricsValid()) + resp, err = manager.GetSystemInfoMetrics() + assert.Nil(t, err) + assert.NotNil(t, resp) + + manager.UpdateSystemInfoMetrics(nil) + assert.Equal(t, false, manager.IsSystemInfoMetricsValid()) + resp, err = manager.GetSystemInfoMetrics() + assert.NotNil(t, err) + assert.Nil(t, resp) + + manager.SetRetention(smallRetention) + manager.UpdateSystemInfoMetrics(&milvuspb.GetMetricsResponse{}) + time.Sleep(smallRetention) + assert.Equal(t, false, manager.IsSystemInfoMetricsValid()) + resp, err = manager.GetSystemInfoMetrics() + assert.NotNil(t, err) + assert.Nil(t, resp) +} + +func TestMetricsCacheManager_GetSystemInfoMetrics(t *testing.T) { + manager := NewMetricsCacheManager() + assert.NotNil(t, manager) + + manager.InvalidateSystemInfoMetrics() + assert.Equal(t, false, manager.IsSystemInfoMetricsValid()) + resp, err := manager.GetSystemInfoMetrics() + assert.NotNil(t, err) + assert.Nil(t, resp) + + bigRetention := time.Hour * 24 + smallRetention := time.Millisecond + + manager.SetRetention(bigRetention) + manager.UpdateSystemInfoMetrics(&milvuspb.GetMetricsResponse{}) + assert.Equal(t, true, manager.IsSystemInfoMetricsValid()) + resp, err = manager.GetSystemInfoMetrics() + assert.Nil(t, err) + assert.NotNil(t, resp) + + manager.UpdateSystemInfoMetrics(nil) + assert.Equal(t, false, manager.IsSystemInfoMetricsValid()) + resp, err = manager.GetSystemInfoMetrics() + assert.NotNil(t, err) + assert.Nil(t, resp) + + manager.SetRetention(smallRetention) + manager.UpdateSystemInfoMetrics(&milvuspb.GetMetricsResponse{}) + time.Sleep(smallRetention) + assert.Equal(t, false, manager.IsSystemInfoMetricsValid()) + resp, err = manager.GetSystemInfoMetrics() + assert.NotNil(t, err) + assert.Nil(t, resp) +} diff --git a/internal/util/metricsinfo/err.go b/internal/util/metricsinfo/err.go index 037d8796b0..c6e2927459 100644 --- a/internal/util/metricsinfo/err.go +++ b/internal/util/metricsinfo/err.go @@ -13,6 +13,10 @@ package metricsinfo import "errors" -const MsgUnimplementedMetric = "sorry, but this metric type is not implemented" +const ( + MsgUnimplementedMetric = "sorry, but this metric type is not implemented" + MsgInvalidSystemInfosMetricCache = "system infos metric is invalid" +) var ErrUnimplementedMetric = errors.New(MsgUnimplementedMetric) +var ErrInvalidSystemInfosMetricCache = errors.New(MsgInvalidSystemInfosMetricCache)