milvus/internal/querynode/metrics_info.go
bigsheeper 8c510a52b2
Use replica memory size and cacheSize in getSystemInfoMetrics (#9579)
Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
2021-10-11 15:06:43 +08:00

137 lines
4.9 KiB
Go

// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package querynode
import (
"context"
"errors"
"fmt"
"os"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
// getSystemInfoMetrics collects the query node's hardware, deployment and
// configuration metrics and returns them marshalled inside a GetMetricsResponse.
//
// ctx and req are accepted for signature compatibility and are not read here.
// node supplies the session address and the historical/streaming replicas whose
// segment memory is reported.
//
// The returned Go error is always nil: a marshalling failure is reported through
// the response Status (ErrorCode_UnexpectedError with the error text as Reason)
// and an empty Response string.
func getSystemInfoMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest, node *QueryNode) (*milvuspb.GetMetricsResponse, error) {
	// The same component name is used in the metrics payload and in both responses.
	componentName := metricsinfo.ConstructComponentName(typeutil.QueryNodeRole, Params.QueryNodeID)

	nodeInfos := metricsinfo.QueryNodeInfos{
		BaseComponentInfos: metricsinfo.BaseComponentInfos{
			Name: componentName,
			HardwareInfos: metricsinfo.HardwareMetrics{
				IP:           node.session.Address,
				CPUCoreCount: metricsinfo.GetCPUCoreCount(false),
				CPUCoreUsage: metricsinfo.GetCPUUsage(),
				// Memory carries the capacity and MemoryUsage the consumption,
				// mirroring the Disk/DiskUsage and CPUCoreCount/CPUCoreUsage pairs.
				// The two values were previously assigned the other way round.
				// NOTE(review): confirm against the HardwareMetrics field definitions.
				Memory:      uint64(getTotalMemory()),
				MemoryUsage: uint64(getUsedMemory(node.historical.replica, node.streaming.replica)),
				Disk:        metricsinfo.GetDiskCount(),
				DiskUsage:   metricsinfo.GetDiskUsage(),
			},
			SystemInfo: metricsinfo.DeployMetrics{
				SystemVersion: os.Getenv(metricsinfo.GitCommitEnvKey),
				DeployMode:    os.Getenv(metricsinfo.DeployModeEnvKey),
			},
			// TODO(dragondriver): CreatedTime & UpdatedTime, easy but time-costing
			Type: typeutil.QueryNodeRole,
		},
		SystemConfigurations: metricsinfo.QueryNodeConfiguration{
			SearchReceiveBufSize:         Params.SearchReceiveBufSize,
			SearchPulsarBufSize:          Params.SearchPulsarBufSize,
			SearchResultReceiveBufSize:   Params.SearchResultReceiveBufSize,
			RetrieveReceiveBufSize:       Params.RetrieveReceiveBufSize,
			RetrievePulsarBufSize:        Params.RetrievePulsarBufSize,
			RetrieveResultReceiveBufSize: Params.RetrieveResultReceiveBufSize,
			SimdType:                     Params.SimdType,
		},
	}

	resp, err := metricsinfo.MarshalComponentInfos(nodeInfos)
	if err != nil {
		// Report the failure through the response status; the Go error stays nil.
		return &milvuspb.GetMetricsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
			Response:      "",
			ComponentName: componentName,
		}, nil
	}

	return &milvuspb.GetMetricsResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
			Reason:    "",
		},
		Response:      resp,
		ComponentName: componentName,
	}, nil
}
// getUsedMemory returns the sum of getSegmentsMemSize() for the historical
// replica and the streaming replica, queried in that order.
// NOTE(review): callers divide the result by 1024*1024 and label it MB, so the
// value is presumably in bytes — confirm against getSegmentsMemSize.
func getUsedMemory(historicalReplica, streamingReplica ReplicaInterface) int64 {
	used := historicalReplica.getSegmentsMemSize()
	used += streamingReplica.getSegmentsMemSize()
	return used
}
// getTotalMemory returns Params.CacheSize scaled by 1024^3.
// NOTE(review): this presumably converts a cache size configured in GB into
// bytes — confirm the unit of Params.CacheSize.
func getTotalMemory() int64 {
	const scale = 1024 * 1024 * 1024
	return Params.CacheSize * scale
}
// checkSegmentMemory estimates whether loading the given segments would exceed
// the memory threshold and returns an error as soon as the running estimate
// crosses it.
//
// For every entry in segmentLoadInfos the collection is looked up in
// historicalReplica, the per-record size is estimated from the collection schema,
// and size-per-record * NumOfRows is added to a running total. The check fails
// when used memory plus that running total (both in whole MB) is greater than
// 70% of getTotalMemory() in MB.
//
// Returns nil when all segments fit (or segmentLoadInfos is empty), the lookup or
// estimation error if either fails, or an "OOM if load" error when the threshold
// would be exceeded.
func checkSegmentMemory(segmentLoadInfos []*querypb.SegmentLoadInfo, historicalReplica, streamingReplica ReplicaInterface) error {
	const bytesPerMB = 1024 * 1024

	usedRAMInMB := getUsedMemory(historicalReplica, streamingReplica) / bytesPerMB
	totalRAMInMB := getTotalMemory() / bytesPerMB
	// The threshold does not depend on the segment, so compute it once.
	// TODO: get threshold factor from param table
	thresholdMemSize := float64(totalRAMInMB) * 0.7

	// Accumulate in bytes and convert the running total to MB afterwards.
	// Truncating each segment to whole MB before summing would count every
	// segment smaller than 1 MiB as zero and underestimate the load.
	segmentTotalBytes := int64(0)
	for _, segInfo := range segmentLoadInfos {
		collectionID := segInfo.CollectionID
		segmentID := segInfo.SegmentID

		col, err := historicalReplica.getCollectionByID(collectionID)
		if err != nil {
			return err
		}

		sizePerRecord, err := typeutil.EstimateSizePerRecord(col.schema)
		if err != nil {
			return err
		}

		segmentTotalBytes += int64(sizePerRecord) * segInfo.NumOfRows
		segmentTotalSize := segmentTotalBytes / bytesPerMB

		log.Debug("memory stats when load segment",
			zap.Any("collectionIDs", collectionID),
			zap.Any("segmentID", segmentID),
			zap.Any("numOfRows", segInfo.NumOfRows),
			zap.Any("totalRAM(MB)", totalRAMInMB),
			zap.Any("usedRAM(MB)", usedRAMInMB),
			zap.Any("segmentTotalSize(MB)", segmentTotalSize),
			zap.Any("thresholdMemSize(MB)", thresholdMemSize),
		)

		if usedRAMInMB+segmentTotalSize > int64(thresholdMemSize) {
			return errors.New(fmt.Sprintln("load segment failed, OOM if load, "+
				"collectionID = ", collectionID, ", ",
				"usedRAM(MB) = ", usedRAMInMB, ", ",
				"segmentTotalSize(MB) = ", segmentTotalSize, ", ",
				"thresholdMemSize(MB) = ", thresholdMemSize))
		}
	}
	return nil
}