milvus/internal/querynode/segment_loader.go

// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package querynode

import (
"context"
"errors"
"fmt"
"path"
"strconv"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
minioKV "github.com/milvus-io/milvus/internal/kv/minio"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/funcutil"
)

const (
queryNodeSegmentMetaPrefix = "queryNode-segmentMeta"
)

// segmentLoader is only responsible for loading field data from binlogs
type segmentLoader struct {
historicalReplica ReplicaInterface
dataCoord types.DataCoord
minioKV kv.DataKV // minio kv client
etcdKV *etcdkv.EtcdKV
indexLoader *indexLoader
}
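
// loadSegment loads all sealed segments described in req. It first estimates
// the size of every segment and checks the memory budget, then loads each
// segment's field data, bloom filter, delta logs and indexes, and finally
// registers the segments with the historical replica. On any failure the
// segments created so far are released.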
func (loader *segmentLoader) loadSegment(req *querypb.LoadSegmentsRequest) error {
// no segments to load, return early
if len(req.Infos) == 0 {
return nil
}
newSegments := make(map[UniqueID]*Segment)
segmentGC := func() {
for _, s := range newSegments {
deleteSegment(s)
}
}
segmentFieldBinLogs := make(map[UniqueID][]*datapb.FieldBinlog)
segmentIndexedFieldIDs := make(map[UniqueID][]FieldID)
segmentSizes := make(map[UniqueID]int64)
// prepare segments and estimate the size of each one
for _, info := range req.Infos {
segmentID := info.SegmentID
partitionID := info.PartitionID
collectionID := info.CollectionID
collection, err := loader.historicalReplica.getCollectionByID(collectionID)
if err != nil {
segmentGC()
return err
}
segment := newSegment(collection, segmentID, partitionID, collectionID, "", segmentTypeSealed, true)
newSegments[segmentID] = segment
fieldBinlog, indexedFieldID, err := loader.getFieldAndIndexInfo(segment, info)
if err != nil {
segmentGC()
return err
}
segmentSize, err := loader.estimateSegmentSize(segment, fieldBinlog, indexedFieldID)
if err != nil {
segmentGC()
return err
}
segmentFieldBinLogs[segmentID] = fieldBinlog
segmentIndexedFieldIDs[segmentID] = indexedFieldID
segmentSizes[segmentID] = segmentSize
}
// check memory limit
err := loader.checkSegmentSize(req.Infos[0].CollectionID, segmentSizes)
if err != nil {
segmentGC()
return err
}
// start to load
for _, info := range req.Infos {
segmentID := info.SegmentID
if newSegments[segmentID] == nil || segmentFieldBinLogs[segmentID] == nil || segmentIndexedFieldIDs[segmentID] == nil {
segmentGC()
return fmt.Errorf("unexpected error: cannot find load info for segment %d of collection %d", segmentID, req.Infos[0].CollectionID)
}
err = loader.loadSegmentInternal(newSegments[segmentID],
segmentFieldBinLogs[segmentID],
segmentIndexedFieldIDs[segmentID],
info)
if err != nil {
segmentGC()
return err
}
}
// set segments
for _, s := range newSegments {
err := loader.historicalReplica.setSegment(s)
if err != nil {
segmentGC()
return err
}
}
return nil
}
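
// loadSegmentInternal loads a single sealed segment: the insert field data
// first, then the primary-key bloom filter, the delta (delete) logs, and
// finally the index of every indexed vector field.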
func (loader *segmentLoader) loadSegmentInternal(segment *Segment,
fieldBinLogs []*datapb.FieldBinlog,
indexFieldIDs []FieldID,
segmentLoadInfo *querypb.SegmentLoadInfo) error {
log.Debug("loading insert...")
err := loader.loadSegmentFieldsData(segment, fieldBinLogs)
if err != nil {
return err
}
pkIDField, err := loader.historicalReplica.getPKFieldIDByCollectionID(segment.collectionID)
if err != nil {
return err
}
if pkIDField == common.InvalidFieldID {
log.Warn("segment primary key field doesn't exist when load segment")
} else {
log.Debug("loading bloom filter...")
pkStatsBinlogs := loader.filterPKStatsBinlogs(segmentLoadInfo.Statslogs, pkIDField)
err = loader.loadSegmentBloomFilter(segment, pkStatsBinlogs)
if err != nil {
return err
}
}
log.Debug("loading delta...")
err = loader.loadDeltaLogs(segment, segmentLoadInfo.Deltalogs)
if err != nil {
return err
}
for _, id := range indexFieldIDs {
log.Debug("loading index...")
err = loader.indexLoader.loadIndex(segment, id)
if err != nil {
return err
}
}
return nil
}
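
// filterPKStatsBinlogs picks the stats binlog paths that belong to the
// primary key field.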
func (loader *segmentLoader) filterPKStatsBinlogs(fieldBinlogs []*datapb.FieldBinlog, pkFieldID int64) []string {
result := make([]string, 0)
for _, fieldBinlog := range fieldBinlogs {
if fieldBinlog.FieldID == pkFieldID {
result = append(result, fieldBinlog.Binlogs...)
}
}
return result
}
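
// filterFieldBinlogs drops the binlogs of the fields listed in skipFieldIDs
// and returns the remaining field binlogs.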
func (loader *segmentLoader) filterFieldBinlogs(fieldBinlogs []*datapb.FieldBinlog, skipFieldIDs []int64) []*datapb.FieldBinlog {
result := make([]*datapb.FieldBinlog, 0)
for _, fieldBinlog := range fieldBinlogs {
if !funcutil.SliceContain(skipFieldIDs, fieldBinlog.FieldID) {
result = append(result, fieldBinlog)
}
}
return result
}
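
// loadSegmentFieldsData downloads all field binlogs from object storage,
// deserializes them into insert data and loads each field into the segment.
// The row counts of the timestamp field are also recorded as the segment's
// binlog row sizes.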
func (loader *segmentLoader) loadSegmentFieldsData(segment *Segment, fieldBinlogs []*datapb.FieldBinlog) error {
iCodec := storage.InsertCodec{}
defer func() {
err := iCodec.Close()
if err != nil {
log.Warn(err.Error())
}
}()
blobs := make([]*storage.Blob, 0)
for _, fb := range fieldBinlogs {
log.Debug("load segment fields data",
zap.Int64("segmentID", segment.segmentID),
zap.Any("fieldID", fb.FieldID),
zap.String("paths", fmt.Sprintln(fb.Binlogs)),
)
for _, binlogPath := range fb.Binlogs {
binLog, err := loader.minioKV.Load(binlogPath)
if err != nil {
// TODO: return or continue?
return err
}
blob := &storage.Blob{
Key: binlogPath,
Value: []byte(binLog),
}
blobs = append(blobs, blob)
}
}
_, _, insertData, err := iCodec.Deserialize(blobs)
if err != nil {
log.Warn(err.Error())
return err
}
for fieldID, value := range insertData.Data {
var numRows []int64
var data interface{}
switch fieldData := value.(type) {
case *storage.BoolFieldData:
numRows = fieldData.NumRows
data = fieldData.Data
case *storage.Int8FieldData:
numRows = fieldData.NumRows
data = fieldData.Data
case *storage.Int16FieldData:
numRows = fieldData.NumRows
data = fieldData.Data
case *storage.Int32FieldData:
numRows = fieldData.NumRows
data = fieldData.Data
case *storage.Int64FieldData:
numRows = fieldData.NumRows
data = fieldData.Data
case *storage.FloatFieldData:
numRows = fieldData.NumRows
data = fieldData.Data
case *storage.DoubleFieldData:
numRows = fieldData.NumRows
data = fieldData.Data
case *storage.StringFieldData:
numRows = fieldData.NumRows
data = fieldData.Data
case *storage.FloatVectorFieldData:
numRows = fieldData.NumRows
data = fieldData.Data
case *storage.BinaryVectorFieldData:
numRows = fieldData.NumRows
data = fieldData.Data
default:
return errors.New("unexpected field data type")
}
if fieldID == common.TimeStampField {
segment.setIDBinlogRowSizes(numRows)
}
totalNumRows := int64(0)
for _, numRow := range numRows {
totalNumRows += numRow
}
err = segment.segmentLoadFieldData(fieldID, int(totalNumRows), data)
if err != nil {
// TODO: return or continue?
return err
}
}
return nil
}
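
// loadSegmentBloomFilter merges the bloom filters stored in the primary-key
// stats binlogs into the segment's pkFilter.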
func (loader *segmentLoader) loadSegmentBloomFilter(segment *Segment, binlogPaths []string) error {
if len(binlogPaths) == 0 {
log.Info("there are no stats logs saved with segment", zap.Any("segmentID", segment.segmentID))
return nil
}
values, err := loader.minioKV.MultiLoad(binlogPaths)
if err != nil {
return err
}
blobs := make([]*storage.Blob, 0)
for i := 0; i < len(values); i++ {
blobs = append(blobs, &storage.Blob{Value: []byte(values[i])})
}
stats, err := storage.DeserializeStats(blobs)
if err != nil {
return err
}
for _, stat := range stats {
if stat.BF == nil {
log.Warn("stat log with nil bloom filter", zap.Int64("segmentID", segment.segmentID), zap.Any("stat", stat))
continue
}
err = segment.pkFilter.Merge(stat.BF)
if err != nil {
return err
}
}
return nil
}
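
// loadDeltaLogs downloads and deserializes the segment's delta logs and
// applies the deleted primary keys and their timestamps to the segment.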
func (loader *segmentLoader) loadDeltaLogs(segment *Segment, deltaLogs []*datapb.DeltaLogInfo) error {
if len(deltaLogs) == 0 {
log.Info("there are no delta logs saved with segment", zap.Any("segmentID", segment.segmentID))
return nil
}
dCodec := storage.DeleteCodec{}
blobs := make([]*storage.Blob, 0)
for _, deltaLog := range deltaLogs {
value, err := loader.minioKV.Load(deltaLog.DeltaLogPath)
if err != nil {
return err
}
blob := &storage.Blob{
Key: deltaLog.DeltaLogPath,
Value: []byte(value),
}
blobs = append(blobs, blob)
}
_, _, deltaData, err := dCodec.Deserialize(blobs)
if err != nil {
return err
}
rowCount := len(deltaData.Data)
pks := make([]int64, 0)
tss := make([]Timestamp, 0)
for pk, ts := range deltaData.Data {
pks = append(pks, pk)
tss = append(tss, Timestamp(ts))
}
err = segment.segmentLoadDeletedRecord(pks, tss, int64(rowCount))
if err != nil {
return err
}
return nil
}
// JoinIDPath joins ids to path format.
func JoinIDPath(ids ...UniqueID) string {
idStr := make([]string, 0, len(ids))
for _, id := range ids {
idStr = append(idStr, strconv.FormatInt(id, 10))
}
return path.Join(idStr...)
}
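
// getFieldAndIndexInfo returns the field binlogs that need to be loaded and
// the vector fields that already have an index. Raw data of indexed vector
// fields is filtered out, since the index is loaded instead.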
func (loader *segmentLoader) getFieldAndIndexInfo(segment *Segment,
segmentLoadInfo *querypb.SegmentLoadInfo) ([]*datapb.FieldBinlog, []FieldID, error) {
collectionID := segment.collectionID
vectorFieldIDs, err := loader.historicalReplica.getVecFieldIDsByCollectionID(collectionID)
if err != nil {
return nil, nil, err
}
if len(vectorFieldIDs) == 0 {
return nil, nil, fmt.Errorf("no vector field in collection %d", collectionID)
}
// add VectorFieldInfo for vector fields
for _, fieldBinlog := range segmentLoadInfo.BinlogPaths {
if funcutil.SliceContain(vectorFieldIDs, fieldBinlog.FieldID) {
vectorFieldInfo := newVectorFieldInfo(fieldBinlog)
segment.setVectorFieldInfo(fieldBinlog.FieldID, vectorFieldInfo)
}
}
indexedFieldIDs := make([]FieldID, 0)
for _, vecFieldID := range vectorFieldIDs {
err = loader.indexLoader.setIndexInfo(collectionID, segment, vecFieldID)
if err != nil {
log.Warn(err.Error())
continue
}
indexedFieldIDs = append(indexedFieldIDs, vecFieldID)
}
// we don't need to load raw data for indexed vector fields
fieldBinlogs := loader.filterFieldBinlogs(segmentLoadInfo.BinlogPaths, indexedFieldIDs)
return fieldBinlogs, indexedFieldIDs, nil
}
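
// estimateSegmentSize sums the estimated in-memory size of every field binlog
// (falling back to the raw binlog size when memory estimation fails) plus the
// binlog size of every field index.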
func (loader *segmentLoader) estimateSegmentSize(segment *Segment,
fieldBinLogs []*datapb.FieldBinlog,
indexFieldIDs []FieldID) (int64, error) {
segmentSize := int64(0)
// get the field data size; if len(indexFieldIDs) == 0, the vector fields are included in fieldBinLogs
for _, fb := range fieldBinLogs {
log.Debug("estimate segment fields size",
zap.Any("collectionID", segment.collectionID),
zap.Any("segmentID", segment.ID()),
zap.Any("fieldID", fb.FieldID),
zap.Any("paths", fb.Binlogs),
)
for _, binlogPath := range fb.Binlogs {
logSize, err := storage.EstimateMemorySize(loader.minioKV, binlogPath)
if err != nil {
logSize, err = storage.GetBinlogSize(loader.minioKV, binlogPath)
if err != nil {
return 0, err
}
}
segmentSize += logSize
}
}
// get index size
for _, fieldID := range indexFieldIDs {
indexSize, err := loader.indexLoader.estimateIndexBinlogSize(segment, fieldID)
if err != nil {
return 0, err
}
segmentSize += indexSize
}
return segmentSize, nil
}
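
// checkSegmentSize checks whether loading all pending segments would push
// memory usage above thresholdFactor of the total memory, and returns an
// error instead of risking an OOM.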
func (loader *segmentLoader) checkSegmentSize(collectionID UniqueID, segmentSizes map[UniqueID]int64) error {
const thresholdFactor = 0.9
usedMem, err := getUsedMemory()
if err != nil {
return err
}
totalMem, err := getTotalMemory()
if err != nil {
return err
}
segmentTotalSize := int64(0)
for _, size := range segmentSizes {
segmentTotalSize += size
}
for segmentID, size := range segmentSizes {
log.Debug("memory stats when load segment",
zap.Any("collectionIDs", collectionID),
zap.Any("segmentID", segmentID),
zap.Any("totalMem", totalMem),
zap.Any("usedMem", usedMem),
zap.Any("segmentTotalSize", segmentTotalSize),
zap.Any("currentSegmentSize", size),
zap.Any("thresholdFactor", thresholdFactor),
)
if int64(usedMem)+segmentTotalSize+size > int64(float64(totalMem)*thresholdFactor) {
return fmt.Errorf("load segment failed, OOM if load, collectionID = %d, usedMem = %d, segmentTotalSize = %d, currentSegmentSize = %d, totalMem = %d, thresholdFactor = %f",
collectionID, usedMem, segmentTotalSize, size, totalMem, thresholdFactor)
}
}
return nil
}
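
// newSegmentLoader creates a segmentLoader backed by a MinIO client and an
// index loader. It panics if the MinIO client cannot be created.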
func newSegmentLoader(ctx context.Context, rootCoord types.RootCoord, indexCoord types.IndexCoord, replica ReplicaInterface, etcdKV *etcdkv.EtcdKV) *segmentLoader {
option := &minioKV.Option{
Address: Params.MinioEndPoint,
AccessKeyID: Params.MinioAccessKeyID,
SecretAccessKeyID: Params.MinioSecretAccessKey,
UseSSL: Params.MinioUseSSLStr,
CreateBucket: true,
BucketName: Params.MinioBucketName,
}
client, err := minioKV.NewMinIOKV(ctx, option)
if err != nil {
panic(err)
}
iLoader := newIndexLoader(ctx, rootCoord, indexCoord, replica)
return &segmentLoader{
historicalReplica: replica,
minioKV: client,
etcdKV: etcdKV,
indexLoader: iLoader,
}
}