Allow proactive warm-up of the chunk cache: original vector data is loaded into the chunk cache asynchronously during the load process. This can significantly reduce query/search latency for a period after the load, at the cost of increased disk usage. issue: https://github.com/milvus-io/milvus/issues/30181 Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package segments

import (
	"context"
	"fmt"
	"io"
	"path"
	"runtime/debug"
	"strconv"
	"sync"
	"time"

	"github.com/cockroachdb/errors"
	"github.com/samber/lo"
	"go.opentelemetry.io/otel"
	"go.uber.org/atomic"
	"go.uber.org/zap"
	"golang.org/x/sync/errgroup"

	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
	milvus_storage "github.com/milvus-io/milvus-storage/go/storage"
	"github.com/milvus-io/milvus-storage/go/storage/options"
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/internal/proto/querypb"
	"github.com/milvus-io/milvus/internal/querynodev2/pkoracle"
	"github.com/milvus-io/milvus/internal/storage"
	typeutil_internal "github.com/milvus-io/milvus/internal/util/typeutil"
	"github.com/milvus-io/milvus/pkg/common"
	"github.com/milvus-io/milvus/pkg/log"
	"github.com/milvus-io/milvus/pkg/metrics"
	"github.com/milvus-io/milvus/pkg/util/conc"
	"github.com/milvus-io/milvus/pkg/util/funcutil"
	"github.com/milvus-io/milvus/pkg/util/hardware"
	"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
	"github.com/milvus-io/milvus/pkg/util/merr"
	"github.com/milvus-io/milvus/pkg/util/paramtable"
	"github.com/milvus-io/milvus/pkg/util/timerecord"
	"github.com/milvus-io/milvus/pkg/util/typeutil"
)

const (
	UsedDiskMemoryRatio = 4
)

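// Loader loads segment data (binlogs, stats logs, delta logs and indexes)
// into query node segments. A minimal usage sketch, with the surrounding
// manager, IDs and load infos assumed rather than taken from this file:
//
//	loader := NewLoader(manager, chunkManager)
//	segs, err := loader.Load(ctx, collectionID, SegmentTypeSealed, targetVersion, loadInfos...)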
type Loader interface {
	// Load loads binlogs and spawns segments.
	// NOTE: make sure the ref count of the corresponding collection will never go down to 0 during this call.
	Load(ctx context.Context, collectionID int64, segmentType SegmentType, version int64, segments ...*querypb.SegmentLoadInfo) ([]Segment, error)

	LoadDeltaLogs(ctx context.Context, segment Segment, deltaLogs []*datapb.FieldBinlog) error

	// LoadBloomFilterSet loads the needed statslogs for RemoteSegment.
	LoadBloomFilterSet(ctx context.Context, collectionID int64, version int64, infos ...*querypb.SegmentLoadInfo) ([]*pkoracle.BloomFilterSet, error)

	// LoadIndex appends the index to the segment and removes the vector binlogs.
	LoadIndex(ctx context.Context, segment *LocalSegment, info *querypb.SegmentLoadInfo, version int64) error
}

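// LoadResource records the memory and disk size committed for a load request.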
type LoadResource struct {
	MemorySize uint64
	DiskSize   uint64
}

func (r *LoadResource) Add(resource LoadResource) {
	r.MemorySize += resource.MemorySize
	r.DiskSize += resource.DiskSize
}

func (r *LoadResource) Sub(resource LoadResource) {
	r.MemorySize -= resource.MemorySize
	r.DiskSize -= resource.DiskSize
}

type segmentLoaderV2 struct {
	*segmentLoader
}

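// NewLoaderV2 creates a loader for segments persisted in the storage v2 (milvus-storage) format,
// reusing the common segmentLoader internals.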
func NewLoaderV2(
	manager *Manager,
	cm storage.ChunkManager,
) *segmentLoaderV2 {
	return &segmentLoaderV2{
		segmentLoader: NewLoader(manager, cm),
	}
}

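// LoadDelta loads the delete records of the given segment using the collection schema.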
func (loader *segmentLoaderV2) LoadDelta(ctx context.Context, collectionID int64, segment *LocalSegment) error {
	collection := loader.manager.Collection.Get(collectionID)
	if collection == nil {
		err := merr.WrapErrCollectionNotFound(collectionID)
		log.Warn("failed to get collection while loading delta", zap.Error(err))
		return err
	}
	return segment.LoadDeltaData2(ctx, collection.Schema())
}

func (loader *segmentLoaderV2) Load(ctx context.Context,
	collectionID int64,
	segmentType SegmentType,
	version int64,
	segments ...*querypb.SegmentLoadInfo,
) ([]Segment, error) {
	log := log.Ctx(ctx).With(
		zap.Int64("collectionID", collectionID),
		zap.String("segmentType", segmentType.String()),
	)

	if len(segments) == 0 {
		log.Info("no segment to load")
		return nil, nil
	}
	// Filter out loaded & loading segments
	infos := loader.prepare(segmentType, version, segments...)
	defer loader.unregister(infos...)

	log = log.With(
		zap.Int64s("requestSegments", lo.Map(segments, func(s *querypb.SegmentLoadInfo, _ int) int64 { return s.GetSegmentID() })),
		zap.Int64s("preparedSegments", lo.Map(infos, func(s *querypb.SegmentLoadInfo, _ int) int64 { return s.GetSegmentID() })),
	)

	// continue to wait for the other tasks to finish
	log.Info("start loading...", zap.Int("segmentNum", len(segments)), zap.Int("afterFilter", len(infos)))

	// Check memory & storage limit
	resource, concurrencyLevel, err := loader.requestResource(ctx, infos...)
	if err != nil {
		log.Warn("request resource failed", zap.Error(err))
		return nil, err
	}
	defer loader.freeRequest(resource)

	newSegments := typeutil.NewConcurrentMap[int64, Segment]()
	loaded := typeutil.NewConcurrentMap[int64, Segment]()
	defer func() {
		newSegments.Range(func(_ int64, s Segment) bool {
			s.Release()
			return true
		})
		debug.FreeOSMemory()
	}()

	for _, info := range infos {
		segmentID := info.GetSegmentID()
		partitionID := info.GetPartitionID()
		collectionID := info.GetCollectionID()
		shard := info.GetInsertChannel()

		collection := loader.manager.Collection.Get(collectionID)
		if collection == nil {
			err := merr.WrapErrCollectionNotFound(collectionID)
			log.Warn("failed to get collection", zap.Error(err))
			return nil, err
		}

		segment, err := NewSegmentV2(ctx, collection, segmentID, partitionID, collectionID, shard, segmentType, version, info.GetStartPosition(), info.GetDeltaPosition(), info.GetStorageVersion(), info.GetLevel())
		if err != nil {
			log.Warn("load segment failed when create new segment",
				zap.Int64("partitionID", partitionID),
				zap.Int64("segmentID", segmentID),
				zap.Error(err),
			)
			return nil, err
		}

		newSegments.Insert(segmentID, segment)
	}

	loadSegmentFunc := func(idx int) error {
		loadInfo := infos[idx]
		partitionID := loadInfo.PartitionID
		segmentID := loadInfo.SegmentID
		segment, _ := newSegments.Get(segmentID)

		tr := timerecord.NewTimeRecorder("loadDurationPerSegment")

		var err error
		if loadInfo.GetLevel() == datapb.SegmentLevel_L0 {
			err = loader.LoadDelta(ctx, collectionID, segment.(*LocalSegment))
		} else {
			err = loader.loadSegment(ctx, segment.(*LocalSegment), loadInfo)
		}
		if err != nil {
			log.Warn("load segment failed when load data into memory",
				zap.Int64("partitionID", partitionID),
				zap.Int64("segmentID", segmentID),
				zap.Error(err),
			)
			return err
		}
		loader.manager.Segment.Put(segmentType, segment)
		newSegments.GetAndRemove(segmentID)
		loaded.Insert(segmentID, segment)
		log.Info("load segment done", zap.Int64("segmentID", segmentID))
		loader.notifyLoadFinish(loadInfo)

		metrics.QueryNodeLoadSegmentLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Observe(tr.ElapseSpan().Seconds())
		return nil
	}

	// Start to load.
	// Make sure we can always benefit from concurrency and not spawn too many idle goroutines.
	log.Info("start to load segments in parallel",
		zap.Int("segmentNum", len(infos)),
		zap.Int("concurrencyLevel", concurrencyLevel))
	err = funcutil.ProcessFuncParallel(len(infos),
		concurrencyLevel, loadSegmentFunc, "loadSegmentFunc")
	if err != nil {
		log.Warn("failed to load some segments", zap.Error(err))
		return nil, err
	}

	// Wait for all segments loaded
	if err := loader.waitSegmentLoadDone(ctx, segmentType, lo.Map(segments, func(info *querypb.SegmentLoadInfo, _ int) int64 { return info.GetSegmentID() })...); err != nil {
		log.Warn("failed to wait the filtered out segments load done", zap.Error(err))
		return nil, err
	}

	log.Info("all segment load done")
	var result []Segment
	loaded.Range(func(_ int64, s Segment) bool {
		result = append(result, s)
		return true
	})
	return result, nil
}

func (loader *segmentLoaderV2) LoadBloomFilterSet(ctx context.Context, collectionID int64, version int64, infos ...*querypb.SegmentLoadInfo) ([]*pkoracle.BloomFilterSet, error) {
	log := log.Ctx(ctx).With(
		zap.Int64("collectionID", collectionID),
		zap.Int64s("segmentIDs", lo.Map(infos, func(info *querypb.SegmentLoadInfo, _ int) int64 {
			return info.GetSegmentID()
		})),
	)

	segmentNum := len(infos)
	if segmentNum == 0 {
		log.Info("no segment to load")
		return nil, nil
	}

	collection := loader.manager.Collection.Get(collectionID)
	if collection == nil {
		err := merr.WrapErrCollectionNotFound(collectionID)
		log.Warn("failed to get collection while loading segment", zap.Error(err))
		return nil, err
	}

	log.Info("start loading remote...", zap.Int("segmentNum", segmentNum))

	loadedBfs := typeutil.NewConcurrentSet[*pkoracle.BloomFilterSet]()
	// TODO check memory for bf size
	loadRemoteFunc := func(idx int) error {
		loadInfo := infos[idx]
		partitionID := loadInfo.PartitionID
		segmentID := loadInfo.SegmentID
		bfs := pkoracle.NewBloomFilterSet(segmentID, partitionID, commonpb.SegmentState_Sealed)

		log.Info("loading bloom filter for remote...")
		err := loader.loadBloomFilter(ctx, segmentID, bfs, loadInfo.StorageVersion)
		if err != nil {
			log.Warn("load remote segment bloom filter failed",
				zap.Int64("partitionID", partitionID),
				zap.Int64("segmentID", segmentID),
				zap.Error(err),
			)
			return err
		}
		loadedBfs.Insert(bfs)

		return nil
	}

	err := funcutil.ProcessFuncParallel(segmentNum, segmentNum, loadRemoteFunc, "loadRemoteFunc")
	if err != nil {
		// no partial success here
		log.Warn("failed to load remote segment", zap.Error(err))
		return nil, err
	}

	return loadedBfs.Collect(), nil
}

func (loader *segmentLoaderV2) loadBloomFilter(ctx context.Context, segmentID int64, bfs *pkoracle.BloomFilterSet,
	storeVersion int64,
) error {
	log := log.Ctx(ctx).With(
		zap.Int64("segmentID", segmentID),
	)

	startTs := time.Now()

	url, err := typeutil_internal.GetStorageURI(paramtable.Get().CommonCfg.StorageScheme.GetValue(), paramtable.Get().CommonCfg.StoragePathPrefix.GetValue(), segmentID)
	if err != nil {
		return err
	}
	space, err := milvus_storage.Open(url, options.NewSpaceOptionBuilder().SetVersion(storeVersion).Build())
	if err != nil {
		return err
	}

	statsBlobs := space.StatisticsBlobs()
	blobs := []*storage.Blob{}

	for _, statsBlob := range statsBlobs {
		blob := make([]byte, statsBlob.Size)
		_, err := space.ReadBlob(statsBlob.Name, blob)
		if err != nil && err != io.EOF {
			return err
		}

		blobs = append(blobs, &storage.Blob{Value: blob})
	}

	var stats []*storage.PrimaryKeyStats

	stats, err = storage.DeserializeStats(blobs)
	if err != nil {
		log.Warn("failed to deserialize stats", zap.Error(err))
		return err
	}

	var size uint
	for _, stat := range stats {
		pkStat := &storage.PkStatistics{
			PkFilter: stat.BF,
			MinPK:    stat.MinPk,
			MaxPK:    stat.MaxPk,
		}
		size += stat.BF.Cap()
		bfs.AddHistoricalStats(pkStat)
	}
	log.Info("Successfully load pk stats", zap.Duration("time", time.Since(startTs)), zap.Uint("size", size))
	return nil
}

func (loader *segmentLoaderV2) loadSegment(ctx context.Context,
	segment *LocalSegment,
	loadInfo *querypb.SegmentLoadInfo,
) error {
	log := log.Ctx(ctx).With(
		zap.Int64("collectionID", segment.Collection()),
		zap.Int64("partitionID", segment.Partition()),
		zap.String("shard", segment.Shard()),
		zap.Int64("segmentID", segment.ID()),
	)
	log.Info("start loading segment files",
		zap.Int64("rowNum", loadInfo.GetNumOfRows()),
		zap.String("segmentType", segment.Type().String()))

	collection := loader.manager.Collection.Get(segment.Collection())
	if collection == nil {
		err := merr.WrapErrCollectionNotFound(segment.Collection())
		log.Warn("failed to get collection while loading segment", zap.Error(err))
		return err
	}
	// pkField := GetPkField(collection.Schema())

	// TODO(xige-16): Optimize the data loading process and reduce data copying
	// for now, there will be multiple copies in the process of data loading into segCore
	defer debug.FreeOSMemory()

	if segment.Type() == SegmentTypeSealed {
		fieldsMap := typeutil.NewConcurrentMap[int64, *schemapb.FieldSchema]()
		for _, field := range collection.Schema().Fields {
			fieldsMap.Insert(field.FieldID, field)
		}
		// fieldID2IndexInfo := make(map[int64]*querypb.FieldIndexInfo)
		indexedFieldInfos := make(map[int64]*IndexedFieldInfo)
		for _, indexInfo := range loadInfo.IndexInfos {
			if indexInfo.GetIndexStoreVersion() > 0 {
				fieldID := indexInfo.FieldID
				fieldInfo := &IndexedFieldInfo{
					IndexInfo: indexInfo,
				}
				indexedFieldInfos[fieldID] = fieldInfo
				fieldsMap.Remove(fieldID)
				// fieldID2IndexInfo[fieldID] = indexInfo
			}
		}

		if err := segment.AddFieldDataInfo(ctx, loadInfo.GetNumOfRows(), loadInfo.GetBinlogPaths()); err != nil {
			return err
		}

		log.Info("load fields...",
			zap.Int64s("indexedFields", lo.Keys(indexedFieldInfos)),
		)

		schemaHelper, err := typeutil.CreateSchemaHelper(collection.Schema())
		if err != nil {
			return err
		}
		if err := loader.loadFieldsIndex(ctx, schemaHelper, segment, loadInfo.GetNumOfRows(), indexedFieldInfos); err != nil {
			return err
		}

		// REMOVEME
		keys := make([]int64, 0)
		fieldsMap.Range(func(key int64, value *schemapb.FieldSchema) bool {
			keys = append(keys, key)
			return true
		})

		if err := loader.loadSealedSegmentFields(ctx, segment, fieldsMap, loadInfo.GetNumOfRows()); err != nil {
			return err
		}
		// https://github.com/milvus-io/milvus/23654
		// legacy entry num = 0
		if err := loader.patchEntryNumber(ctx, segment, loadInfo); err != nil {
			return err
		}
	} else {
		if err := segment.LoadMultiFieldData(ctx, loadInfo.GetNumOfRows(), loadInfo.BinlogPaths); err != nil {
			return err
		}
	}

	// load statslog if it's growing segment
	if segment.typ == SegmentTypeGrowing {
		log.Info("loading statslog...")
		// pkStatsBinlogs, logType := loader.filterPKStatsBinlogs(loadInfo.Statslogs, pkField.GetFieldID())
		err := loader.loadBloomFilter(ctx, segment.segmentID, segment.bloomFilterSet, loadInfo.StorageVersion)
		if err != nil {
			return err
		}
	}

	log.Info("loading delta...")
	return loader.LoadDelta(ctx, segment.Collection(), segment)
}

func (loader *segmentLoaderV2) loadSealedSegmentFields(ctx context.Context, segment *LocalSegment, fields *typeutil.ConcurrentMap[int64, *schemapb.FieldSchema], rowCount int64) error {
	runningGroup, _ := errgroup.WithContext(ctx)
	fields.Range(func(fieldID int64, fieldSchema *schemapb.FieldSchema) bool {
		runningGroup.Go(func() error {
			return segment.LoadFieldData(ctx, fieldID, rowCount, nil, common.IsMmapEnabled(fieldSchema.GetTypeParams()...))
		})
		return true
	})

	err := runningGroup.Wait()
	if err != nil {
		return err
	}

	log.Ctx(ctx).Info("load field binlogs done for sealed segment",
		zap.Int64("collection", segment.collectionID),
		zap.Int64("segment", segment.segmentID),
		zap.String("segmentType", segment.Type().String()))

	return nil
}

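// NewLoader creates a segmentLoader bound to the given segment manager and chunk
// manager. The effective IO pool size (8×CPU, clamped to [32, 256], or the
// configured queryNode ioPoolSize) is computed and logged for diagnostics.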
func NewLoader(
	manager *Manager,
	cm storage.ChunkManager,
) *segmentLoader {
	cpuNum := hardware.GetCPUNum()
	ioPoolSize := cpuNum * 8
	// make sure small machines can still load reasonably fast
	if ioPoolSize < 32 {
		ioPoolSize = 32
	}
	// limit the concurrency
	if ioPoolSize > 256 {
		ioPoolSize = 256
	}

	if configPoolSize := paramtable.Get().QueryNodeCfg.IoPoolSize.GetAsInt(); configPoolSize > 0 {
		ioPoolSize = configPoolSize
	}

	log.Info("SegmentLoader created", zap.Int("ioPoolSize", ioPoolSize))

	loader := &segmentLoader{
		manager:         manager,
		cm:              cm,
		loadingSegments: typeutil.NewConcurrentMap[int64, *loadResult](),
	}

	return loader
}

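// loadStatus describes the state of an in-flight segment load request.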
type loadStatus = int32

const (
	loading loadStatus = iota + 1
	success
	failure
)

type loadResult struct {
	status *atomic.Int32
	cond   *sync.Cond
}

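// newLoadResult returns a loadResult initialized to the loading state.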
func newLoadResult() *loadResult {
	return &loadResult{
		status: atomic.NewInt32(loading),
		cond:   sync.NewCond(&sync.Mutex{}),
	}
}

// SetResult marks the load as finished (success or failure) and wakes up all waiters.
func (r *loadResult) SetResult(status loadStatus) {
	r.status.CompareAndSwap(loading, status)
	r.cond.Broadcast()
}

// segmentLoader is only responsible for loading the field data from binlogs.
type segmentLoader struct {
	manager *Manager
	cm      storage.ChunkManager

	mut sync.Mutex
	// loadingSegments tracks in-flight loads; each entry is resolved once its segment finishes loading
	loadingSegments   *typeutil.ConcurrentMap[int64, *loadResult]
	committedResource LoadResource
}

var _ Loader = (*segmentLoader)(nil)

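// Load creates the requested segments, loads their data within the granted
// memory/disk budget, and registers them in the segment manager.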
func (loader *segmentLoader) Load(ctx context.Context,
	collectionID int64,
	segmentType SegmentType,
	version int64,
	segments ...*querypb.SegmentLoadInfo,
) ([]Segment, error) {
	log := log.Ctx(ctx).With(
		zap.Int64("collectionID", collectionID),
		zap.String("segmentType", segmentType.String()),
	)

	if len(segments) == 0 {
		log.Info("no segment to load")
		return nil, nil
	}
	// Filter out loaded & loading segments
	infos := loader.prepare(segmentType, version, segments...)
	defer loader.unregister(infos...)

	log = log.With(
		zap.Int64s("requestSegments", lo.Map(segments, func(s *querypb.SegmentLoadInfo, _ int) int64 { return s.GetSegmentID() })),
		zap.Int64s("preparedSegments", lo.Map(infos, func(s *querypb.SegmentLoadInfo, _ int) int64 { return s.GetSegmentID() })),
	)

	// continue to wait for the other tasks to finish
	log.Info("start loading...", zap.Int("segmentNum", len(segments)), zap.Int("afterFilter", len(infos)))

	// Check memory & storage limit
	resource, concurrencyLevel, err := loader.requestResource(ctx, infos...)
	if err != nil {
		log.Warn("request resource failed", zap.Error(err))
		return nil, err
	}
	defer loader.freeRequest(resource)

	newSegments := typeutil.NewConcurrentMap[int64, Segment]()
	loaded := typeutil.NewConcurrentMap[int64, Segment]()
	defer func() {
		newSegments.Range(func(segmentID int64, s Segment) bool {
			log.Warn("release new segment created due to load failure",
				zap.Int64("segmentID", segmentID),
				zap.Error(err),
			)
			s.Release()
			return true
		})
		debug.FreeOSMemory()
	}()

	for _, info := range infos {
		segmentID := info.GetSegmentID()
		partitionID := info.GetPartitionID()
		collectionID := info.GetCollectionID()
		shard := info.GetInsertChannel()

		collection := loader.manager.Collection.Get(collectionID)
		if collection == nil {
			err := merr.WrapErrCollectionNotFound(collectionID)
			log.Warn("failed to get collection", zap.Error(err))
			return nil, err
		}

		segment, err := NewSegment(
			ctx,
			collection,
			segmentID,
			partitionID,
			collectionID,
			shard,
			segmentType,
			version,
			info.GetStartPosition(),
			info.GetDeltaPosition(),
			info.GetLevel(),
		)
		if err != nil {
			log.Warn("load segment failed when create new segment",
				zap.Int64("partitionID", partitionID),
				zap.Int64("segmentID", segmentID),
				zap.Error(err),
			)
			return nil, err
		}

		newSegments.Insert(segmentID, segment)
	}

	loadSegmentFunc := func(idx int) error {
		loadInfo := infos[idx]
		partitionID := loadInfo.PartitionID
		segmentID := loadInfo.SegmentID
		segment, _ := newSegments.Get(segmentID)

		tr := timerecord.NewTimeRecorder("loadDurationPerSegment")

		var err error
		if loadInfo.GetLevel() == datapb.SegmentLevel_L0 {
			err = loader.LoadDeltaLogs(ctx, segment, loadInfo.GetDeltalogs())
		} else {
			err = loader.loadSegment(ctx, segment.(*LocalSegment), loadInfo)
		}
		if err != nil {
			log.Warn("load segment failed when load data into memory",
				zap.Int64("partitionID", partitionID),
				zap.Int64("segmentID", segmentID),
				zap.Error(err),
			)
			return err
		}
		loader.manager.Segment.Put(segmentType, segment)
		newSegments.GetAndRemove(segmentID)
		loaded.Insert(segmentID, segment)
		log.Info("load segment done", zap.Int64("segmentID", segmentID))
		loader.notifyLoadFinish(loadInfo)

		metrics.QueryNodeLoadSegmentLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Observe(tr.ElapseSpan().Seconds())
		return nil
	}

	// Start to load.
	// Make sure we can always benefit from concurrency and not spawn too many idle goroutines.
	log.Info("start to load segments in parallel",
		zap.Int("segmentNum", len(infos)),
		zap.Int("concurrencyLevel", concurrencyLevel))
	err = funcutil.ProcessFuncParallel(len(infos),
		concurrencyLevel, loadSegmentFunc, "loadSegmentFunc")
	if err != nil {
		log.Warn("failed to load some segments", zap.Error(err))
		return nil, err
	}

	// Wait for all segments loaded
	if err := loader.waitSegmentLoadDone(ctx, segmentType, lo.Map(segments, func(info *querypb.SegmentLoadInfo, _ int) int64 { return info.GetSegmentID() })...); err != nil {
		log.Warn("failed to wait the filtered out segments load done", zap.Error(err))
		return nil, err
	}

	log.Info("all segment load done")
	var result []Segment
	loaded.Range(func(_ int64, s Segment) bool {
		result = append(result, s)
		return true
	})
	return result, nil
}

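// prepare filters out segments that are already loaded or currently loading,
// registers the remaining ones as loading, and returns them.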
func (loader *segmentLoader) prepare(segmentType SegmentType, version int64, segments ...*querypb.SegmentLoadInfo) []*querypb.SegmentLoadInfo {
	loader.mut.Lock()
	defer loader.mut.Unlock()

	// filter out loaded & loading segments
	infos := make([]*querypb.SegmentLoadInfo, 0, len(segments))
	for _, segment := range segments {
		// neither loaded nor loading
		if len(loader.manager.Segment.GetBy(WithType(segmentType), WithID(segment.GetSegmentID()))) == 0 &&
			!loader.loadingSegments.Contain(segment.GetSegmentID()) {
			infos = append(infos, segment)
			loader.loadingSegments.Insert(segment.GetSegmentID(), newLoadResult())
		} else {
			// try to update the segment version before skipping the load operation
			loader.manager.Segment.UpdateBy(IncreaseVersion(version),
				WithType(segmentType), WithID(segment.SegmentID))
			log.Info("skip loaded/loading segment", zap.Int64("segmentID", segment.GetSegmentID()),
				zap.Bool("isLoaded", len(loader.manager.Segment.GetBy(WithType(segmentType), WithID(segment.GetSegmentID()))) > 0),
				zap.Bool("isLoading", loader.loadingSegments.Contain(segment.GetSegmentID())),
			)
		}
	}

	return infos
}

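// unregister removes the given segments from the loading map, marking any
// still-pending entries as failed so that waiters are released.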
func (loader *segmentLoader) unregister(segments ...*querypb.SegmentLoadInfo) {
	loader.mut.Lock()
	defer loader.mut.Unlock()
	for i := range segments {
		result, ok := loader.loadingSegments.GetAndRemove(segments[i].GetSegmentID())
		if ok {
			result.SetResult(failure)
		}
	}
}

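// notifyLoadFinish marks the given segments as successfully loaded and wakes up waiters.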
func (loader *segmentLoader) notifyLoadFinish(segments ...*querypb.SegmentLoadInfo) {
	for _, loadInfo := range segments {
		result, ok := loader.loadingSegments.Get(loadInfo.GetSegmentID())
		if ok {
			result.SetResult(success)
		}
	}
}

// requestResource requests memory & storage to load segments,
// returns the memory usage, disk usage and concurrency with the gained memory.
func (loader *segmentLoader) requestResource(ctx context.Context, infos ...*querypb.SegmentLoadInfo) (LoadResource, int, error) {
	resource := LoadResource{}
	// we need to deal with the empty infos case separately,
	// because the following judgment of requested resources is based on the current status and static config,
	// which may block empty-load operations by accident
	if len(infos) == 0 ||
		infos[0].GetLevel() == datapb.SegmentLevel_L0 {
		return resource, 0, nil
	}

	segmentIDs := lo.Map(infos, func(info *querypb.SegmentLoadInfo, _ int) int64 {
		return info.GetSegmentID()
	})
	log := log.Ctx(ctx).With(
		zap.Int64s("segmentIDs", segmentIDs),
	)

	loader.mut.Lock()
	defer loader.mut.Unlock()

	memoryUsage := hardware.GetUsedMemoryCount()
	totalMemory := hardware.GetMemoryCount()

	diskUsage, err := GetLocalUsedSize(ctx, paramtable.Get().LocalStorageCfg.Path.GetValue())
	if err != nil {
		return resource, 0, errors.Wrap(err, "get local used size failed")
	}
	diskCap := paramtable.Get().QueryNodeCfg.DiskCapacityLimit.GetAsUint64()

	if loader.committedResource.MemorySize+memoryUsage >= totalMemory {
		return resource, 0, merr.WrapErrServiceMemoryLimitExceeded(float32(loader.committedResource.MemorySize+memoryUsage), float32(totalMemory))
	} else if loader.committedResource.DiskSize+uint64(diskUsage) >= diskCap {
		return resource, 0, merr.WrapErrServiceDiskLimitExceeded(float32(loader.committedResource.DiskSize+uint64(diskUsage)), float32(diskCap))
	}

	concurrencyLevel := funcutil.Min(hardware.GetCPUNum(), len(infos))
	mu, du, err := loader.checkSegmentSize(ctx, infos)
	if err != nil {
		log.Warn("no sufficient resource to load segments", zap.Error(err))
		return resource, 0, err
	}

	resource.MemorySize += mu
	resource.DiskSize += du

	toMB := func(mem uint64) float64 {
		return float64(mem) / 1024 / 1024
	}
	loader.committedResource.Add(resource)
	log.Info("request resource for loading segments (unit in MiB)",
		zap.Float64("memory", toMB(resource.MemorySize)),
		zap.Float64("committedMemory", toMB(loader.committedResource.MemorySize)),
		zap.Float64("disk", toMB(resource.DiskSize)),
		zap.Float64("committedDisk", toMB(loader.committedResource.DiskSize)),
	)

	return resource, concurrencyLevel, nil
}

// freeRequest releases the memory & storage previously committed for a load request.
func (loader *segmentLoader) freeRequest(resource LoadResource) {
	loader.mut.Lock()
	defer loader.mut.Unlock()

	loader.committedResource.Sub(resource)
}

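// waitSegmentLoadDone blocks until every given segment is either present in the
// segment manager or its in-flight load finishes, or the context is done.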
func (loader *segmentLoader) waitSegmentLoadDone(ctx context.Context, segmentType SegmentType, segmentIDs ...int64) error {
	log := log.Ctx(ctx).With(
		zap.String("segmentType", segmentType.String()),
		zap.Int64s("segmentIDs", segmentIDs),
	)
	for _, segmentID := range segmentIDs {
		if loader.manager.Segment.GetWithType(segmentID, segmentType) != nil {
			continue
		}

		result, ok := loader.loadingSegments.Get(segmentID)
		if !ok {
			log.Warn("segment was removed from the loading map early", zap.Int64("segmentID", segmentID))
			return errors.New("segment was removed from the loading map early")
		}

		log.Info("wait segment loaded...", zap.Int64("segmentID", segmentID))

		signal := make(chan struct{})
		go func() {
			select {
			case <-signal:
			case <-ctx.Done():
				result.cond.Broadcast()
			}
		}()
		result.cond.L.Lock()
		for result.status.Load() == loading && ctx.Err() == nil {
			result.cond.Wait()
		}
		result.cond.L.Unlock()
		close(signal)

		if ctx.Err() != nil {
			log.Warn("failed to wait segment loaded due to context done", zap.Int64("segmentID", segmentID))
			return ctx.Err()
		}

		if result.status.Load() == failure {
			log.Warn("failed to wait segment loaded", zap.Int64("segmentID", segmentID))
			return merr.WrapErrSegmentLack(segmentID, "failed to wait segment loaded")
		}

		log.Info("segment loaded...", zap.Int64("segmentID", segmentID))
	}
	return nil
}

func (loader *segmentLoader) LoadBloomFilterSet(ctx context.Context, collectionID int64, version int64, infos ...*querypb.SegmentLoadInfo) ([]*pkoracle.BloomFilterSet, error) {
	log := log.Ctx(ctx).With(
		zap.Int64("collectionID", collectionID),
		zap.Int64s("segmentIDs", lo.Map(infos, func(info *querypb.SegmentLoadInfo, _ int) int64 {
			return info.GetSegmentID()
		})),
	)

	segmentNum := len(infos)
	if segmentNum == 0 {
		log.Info("no segment to load")
		return nil, nil
	}

	collection := loader.manager.Collection.Get(collectionID)
	if collection == nil {
		err := merr.WrapErrCollectionNotFound(collectionID)
		log.Warn("failed to get collection while loading segment", zap.Error(err))
		return nil, err
	}
	pkField := GetPkField(collection.Schema())

	log.Info("start loading remote...", zap.Int("segmentNum", segmentNum))

	loadedBfs := typeutil.NewConcurrentSet[*pkoracle.BloomFilterSet]()
	// TODO check memory for bf size
	loadRemoteFunc := func(idx int) error {
		loadInfo := infos[idx]
		partitionID := loadInfo.PartitionID
		segmentID := loadInfo.SegmentID
		bfs := pkoracle.NewBloomFilterSet(segmentID, partitionID, commonpb.SegmentState_Sealed)

		log.Info("loading bloom filter for remote...")
		pkStatsBinlogs, logType := loader.filterPKStatsBinlogs(loadInfo.Statslogs, pkField.GetFieldID())
		err := loader.loadBloomFilter(ctx, segmentID, bfs, pkStatsBinlogs, logType)
		if err != nil {
			log.Warn("load remote segment bloom filter failed",
				zap.Int64("partitionID", partitionID),
				zap.Int64("segmentID", segmentID),
				zap.Error(err),
			)
			return err
		}
		loadedBfs.Insert(bfs)

		return nil
	}

	err := funcutil.ProcessFuncParallel(segmentNum, segmentNum, loadRemoteFunc, "loadRemoteFunc")
	if err != nil {
		// no partial success here
		log.Warn("failed to load remote segment", zap.Error(err))
		return nil, err
	}

	return loadedBfs.Collect(), nil
}

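// loadSegment loads field data, indexes, stats logs and delta logs of a single
// segment from binlogs via the chunk manager.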
func (loader *segmentLoader) loadSegment(ctx context.Context,
	segment *LocalSegment,
	loadInfo *querypb.SegmentLoadInfo,
) error {
	log := log.Ctx(ctx).With(
		zap.Int64("collectionID", segment.Collection()),
		zap.Int64("partitionID", segment.Partition()),
		zap.String("shard", segment.Shard()),
		zap.Int64("segmentID", segment.ID()),
	)
	log.Info("start loading segment files",
		zap.Int64("rowNum", loadInfo.GetNumOfRows()),
		zap.String("segmentType", segment.Type().String()))

	collection := loader.manager.Collection.Get(segment.Collection())
	if collection == nil {
		err := merr.WrapErrCollectionNotFound(segment.Collection())
		log.Warn("failed to get collection while loading segment", zap.Error(err))
		return err
	}
	pkField := GetPkField(collection.Schema())

	// TODO(xige-16): Optimize the data loading process and reduce data copying
	// for now, there will be multiple copies in the process of data loading into segCore
	defer debug.FreeOSMemory()

	if segment.Type() == SegmentTypeSealed {
		fieldID2IndexInfo := make(map[int64]*querypb.FieldIndexInfo)
		for _, indexInfo := range loadInfo.IndexInfos {
			if len(indexInfo.GetIndexFilePaths()) > 0 {
				fieldID := indexInfo.FieldID
				fieldID2IndexInfo[fieldID] = indexInfo
			}
		}

		indexedFieldInfos := make(map[int64]*IndexedFieldInfo)
		fieldBinlogs := make([]*datapb.FieldBinlog, 0, len(loadInfo.BinlogPaths))

		for _, fieldBinlog := range loadInfo.BinlogPaths {
			fieldID := fieldBinlog.FieldID
			// check num rows of data meta and index meta are consistent
			if indexInfo, ok := fieldID2IndexInfo[fieldID]; ok {
				fieldInfo := &IndexedFieldInfo{
					FieldBinlog: fieldBinlog,
					IndexInfo:   indexInfo,
				}
				indexedFieldInfos[fieldID] = fieldInfo
			} else {
				fieldBinlogs = append(fieldBinlogs, fieldBinlog)
			}
		}

		schemaHelper, _ := typeutil.CreateSchemaHelper(collection.Schema())

		if err := segment.AddFieldDataInfo(ctx, loadInfo.GetNumOfRows(), loadInfo.GetBinlogPaths()); err != nil {
			return err
		}

		log.Info("load fields...",
			zap.Int64s("indexedFields", lo.Keys(indexedFieldInfos)),
		)
		if err := loader.loadFieldsIndex(ctx, schemaHelper, segment, loadInfo.GetNumOfRows(), indexedFieldInfos); err != nil {
			return err
		}
		for fieldID, info := range indexedFieldInfos {
			field, err := schemaHelper.GetFieldFromID(fieldID)
			if err != nil {
				return err
			}
			if !typeutil.IsVectorType(field.GetDataType()) && !segment.HasRawData(fieldID) {
				log.Info("field index doesn't include raw data, load binlog...", zap.Int64("fieldID", fieldID), zap.String("index", info.IndexInfo.GetIndexName()))
				if err = segment.LoadFieldData(ctx, fieldID, loadInfo.GetNumOfRows(), info.FieldBinlog, true); err != nil {
					log.Warn("load raw data failed", zap.Int64("fieldID", fieldID), zap.Error(err))
					return err
				}
			}
		}
		if err := loader.loadSealedSegmentFields(ctx, segment, fieldBinlogs, loadInfo.GetNumOfRows()); err != nil {
			return err
		}
		// https://github.com/milvus-io/milvus/23654
		// legacy entry num = 0
		if err := loader.patchEntryNumber(ctx, segment, loadInfo); err != nil {
			return err
		}
	} else {
		if err := segment.LoadMultiFieldData(ctx, loadInfo.GetNumOfRows(), loadInfo.BinlogPaths); err != nil {
			return err
		}
	}

	// load statslog if it's growing segment
	if segment.typ == SegmentTypeGrowing {
		log.Info("loading statslog...")
		pkStatsBinlogs, logType := loader.filterPKStatsBinlogs(loadInfo.Statslogs, pkField.GetFieldID())
		err := loader.loadBloomFilter(ctx, segment.segmentID, segment.bloomFilterSet, pkStatsBinlogs, logType)
		if err != nil {
			return err
		}
	}

	metrics.QueryNodeNumEntities.WithLabelValues(
		fmt.Sprint(paramtable.GetNodeID()),
		fmt.Sprint(segment.Collection()),
		fmt.Sprint(segment.Partition()),
		segment.Type().String(),
		strconv.FormatInt(int64(len(segment.Indexes())), 10),
	).Add(float64(loadInfo.GetNumOfRows()))

	log.Info("loading delta...")
	return loader.LoadDeltaLogs(ctx, segment, loadInfo.Deltalogs)
}

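// filterPKStatsBinlogs picks the primary-key stats log paths out of the field
// binlogs; if a compound stats log is present, only that single file is returned.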
func (loader *segmentLoader) filterPKStatsBinlogs(fieldBinlogs []*datapb.FieldBinlog, pkFieldID int64) ([]string, storage.StatsLogType) {
	result := make([]string, 0)
	for _, fieldBinlog := range fieldBinlogs {
		if fieldBinlog.FieldID == pkFieldID {
			for _, binlog := range fieldBinlog.GetBinlogs() {
				_, logidx := path.Split(binlog.GetLogPath())
				// if a compound stats log exists,
				// only load that single file
				switch logidx {
				case storage.CompoundStatsType.LogIdx():
					return []string{binlog.GetLogPath()}, storage.CompoundStatsType
				default:
					result = append(result, binlog.GetLogPath())
				}
			}
		}
	}
	return result, storage.DefaultStatsType
}

func (loader *segmentLoader) loadSealedSegmentFields(ctx context.Context, segment *LocalSegment, fields []*datapb.FieldBinlog, rowCount int64) error {
	collection := loader.manager.Collection.Get(segment.Collection())
	if collection == nil {
		return merr.WrapErrCollectionNotLoaded(segment.Collection(), "failed to load segment fields")
	}

	runningGroup, _ := errgroup.WithContext(ctx)
	for _, field := range fields {
		fieldBinLog := field
		fieldID := field.FieldID
		runningGroup.Go(func() error {
			return segment.LoadFieldData(ctx,
				fieldID,
				rowCount,
				fieldBinLog,
				common.IsFieldMmapEnabled(collection.Schema(), fieldID),
			)
		})
	}
	err := runningGroup.Wait()
	if err != nil {
		return err
	}

	log.Ctx(ctx).Info("load field binlogs done for sealed segment",
		zap.Int64("collection", segment.collectionID),
		zap.Int64("segment", segment.segmentID),
		zap.Int("len(field)", len(fields)),
		zap.String("segmentType", segment.Type().String()))

	return nil
}

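// loadFieldsIndex loads the index of every indexed field and, for variable-length
// fields, refreshes the average raw data size estimation.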
func (loader *segmentLoader) loadFieldsIndex(ctx context.Context,
	schemaHelper *typeutil.SchemaHelper,
	segment *LocalSegment,
	numRows int64,
	indexedFieldInfos map[int64]*IndexedFieldInfo,
) error {
	log := log.Ctx(ctx).With(
		zap.Int64("collectionID", segment.Collection()),
		zap.Int64("partitionID", segment.Partition()),
		zap.Int64("segmentID", segment.ID()),
		zap.Int64("rowCount", numRows),
	)

	for fieldID, fieldInfo := range indexedFieldInfos {
		indexInfo := fieldInfo.IndexInfo
		err := loader.loadFieldIndex(ctx, segment, indexInfo)
		if err != nil {
			return err
		}

		log.Info("load field binlogs done for sealed segment with index",
			zap.Int64("fieldID", fieldID),
			zap.Any("binlog", fieldInfo.FieldBinlog.Binlogs),
			zap.Int32("current_index_version", fieldInfo.IndexInfo.GetCurrentIndexVersion()),
		)

		// set average row data size of variable field
		field, err := schemaHelper.GetFieldFromID(fieldID)
		if err != nil {
			return err
		}
		if typeutil.IsVariableDataType(field.GetDataType()) {
			err = segment.UpdateFieldRawDataSize(ctx, numRows, fieldInfo.FieldBinlog)
			if err != nil {
				return err
			}
		}
	}

	return nil
}

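// loadFieldIndex loads a single field index into the segment, dropping the
// index-params file from the file list before handing it to segcore.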
func (loader *segmentLoader) loadFieldIndex(ctx context.Context, segment *LocalSegment, indexInfo *querypb.FieldIndexInfo) error {
	filteredPaths := make([]string, 0, len(indexInfo.IndexFilePaths))

	for _, indexPath := range indexInfo.IndexFilePaths {
		if path.Base(indexPath) != storage.IndexParamsKey {
			filteredPaths = append(filteredPaths, indexPath)
		}
	}

	indexInfo.IndexFilePaths = filteredPaths
	fieldType, err := loader.getFieldType(segment.Collection(), indexInfo.FieldID)
	if err != nil {
		return err
	}

	collection := loader.manager.Collection.Get(segment.Collection())
	if collection == nil {
		return merr.WrapErrCollectionNotLoaded(segment.Collection(), "failed to load field index")
	}

	return segment.LoadIndex(ctx, indexInfo, fieldType)
}

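// loadBloomFilter reads the primary-key stats binlogs and merges the decoded
// bloom filters into the segment's BloomFilterSet.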
func (loader *segmentLoader) loadBloomFilter(ctx context.Context, segmentID int64, bfs *pkoracle.BloomFilterSet,
	binlogPaths []string, logType storage.StatsLogType,
) error {
	log := log.Ctx(ctx).With(
		zap.Int64("segmentID", segmentID),
	)
	if len(binlogPaths) == 0 {
		log.Info("there are no stats logs saved with segment")
		return nil
	}

	startTs := time.Now()
	values, err := loader.cm.MultiRead(ctx, binlogPaths)
	if err != nil {
		return err
	}
	blobs := []*storage.Blob{}
	for i := 0; i < len(values); i++ {
		blobs = append(blobs, &storage.Blob{Value: values[i]})
	}

	var stats []*storage.PrimaryKeyStats
	if logType == storage.CompoundStatsType {
		stats, err = storage.DeserializeStatsList(blobs[0])
		if err != nil {
			log.Warn("failed to deserialize stats list", zap.Error(err))
			return err
		}
	} else {
		stats, err = storage.DeserializeStats(blobs)
		if err != nil {
			log.Warn("failed to deserialize stats", zap.Error(err))
			return err
		}
	}

	var size uint
	for _, stat := range stats {
		pkStat := &storage.PkStatistics{
			PkFilter: stat.BF,
			MinPK:    stat.MinPk,
			MaxPK:    stat.MaxPk,
		}
		size += stat.BF.Cap()
		bfs.AddHistoricalStats(pkStat)
	}
	log.Info("Successfully load pk stats", zap.Duration("time", time.Since(startTs)), zap.Uint("size", size))
	return nil
}

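// LoadDeltaLogs reads the segment's delta (delete) binlogs in parallel on the
// load pool, skipping logs already applied, and applies the decoded delete records.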
func (loader *segmentLoader) LoadDeltaLogs(ctx context.Context, segment Segment, deltaLogs []*datapb.FieldBinlog) error {
	ctx, sp := otel.Tracer(typeutil.QueryNodeRole).Start(ctx, fmt.Sprintf("LoadDeltalogs-%d", segment.ID()))
	defer sp.End()
	log := log.Ctx(ctx).With(
		zap.Int64("segmentID", segment.ID()),
	)
	dCodec := storage.DeleteCodec{}
	var blobs []*storage.Blob
	var futures []*conc.Future[any]
	for _, deltaLog := range deltaLogs {
		for _, bLog := range deltaLog.GetBinlogs() {
			bLog := bLog
			// the segment has already applied this delta log, skip it
			if bLog.GetTimestampTo() > 0 && // this field may be missing in legacy versions
				bLog.GetTimestampTo() < segment.LastDeltaTimestamp() {
				continue
			}
			future := GetLoadPool().Submit(func() (any, error) {
				value, err := loader.cm.Read(ctx, bLog.GetLogPath())
				if err != nil {
					return nil, err
				}
				blob := &storage.Blob{
					Key:   bLog.GetLogPath(),
					Value: value,
				}
				return blob, nil
			})
			futures = append(futures, future)
		}
	}
	for _, future := range futures {
		blob, err := future.Await()
		if err != nil {
			return err
		}
		blobs = append(blobs, blob.(*storage.Blob))
	}
	if len(blobs) == 0 {
		log.Info("there are no delta logs saved with segment, skip loading delete record")
		return nil
	}
	_, _, deltaData, err := dCodec.Deserialize(blobs)
	if err != nil {
		return err
	}

	err = segment.LoadDeltaData(ctx, deltaData)
	if err != nil {
		return err
	}

	log.Info("load delta logs done", zap.Int64("deleteCount", deltaData.RowCount))
	return nil
}

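// patchEntryNumber back-fills the per-binlog entry numbers of indexed fields for
// legacy segments whose binlog metadata recorded an entry num of 0, by counting
// the rows in the rowID binlogs.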
func (loader *segmentLoader) patchEntryNumber(ctx context.Context, segment *LocalSegment, loadInfo *querypb.SegmentLoadInfo) error {
	var needReset bool

	segment.fieldIndexes.Range(func(fieldID int64, info *IndexedFieldInfo) bool {
		for _, info := range info.FieldBinlog.GetBinlogs() {
			if info.GetEntriesNum() == 0 {
				needReset = true
				return false
			}
		}
		return true
	})
	if !needReset {
		return nil
	}

	log.Warn("legacy segment binlog found, start to patch entry num", zap.Int64("segmentID", segment.segmentID))
	rowIDField := lo.FindOrElse(loadInfo.BinlogPaths, nil, func(binlog *datapb.FieldBinlog) bool {
		return binlog.GetFieldID() == common.RowIDField
	})

	if rowIDField == nil {
		return errors.New("rowID field binlog not found")
	}

	counts := make([]int64, 0, len(rowIDField.GetBinlogs()))
	for _, binlog := range rowIDField.GetBinlogs() {
		// binlog.LogPath has already been filled
		bs, err := loader.cm.Read(ctx, binlog.LogPath)
		if err != nil {
			return err
		}

		// get the binlog entry num from the rowID field;
		// since the header does not store the entry num, we have to read all the data here

		reader, err := storage.NewBinlogReader(bs)
		if err != nil {
			return err
		}
		er, err := reader.NextEventReader()
		if err != nil {
			return err
		}

		rowIDs, err := er.GetInt64FromPayload()
		if err != nil {
			return err
		}
		counts = append(counts, int64(len(rowIDs)))
	}

	var err error
	segment.fieldIndexes.Range(func(fieldID int64, info *IndexedFieldInfo) bool {
		if len(info.FieldBinlog.GetBinlogs()) != len(counts) {
			err = errors.New("rowID & index binlog number not matched")
			return false
		}
		for i, binlog := range info.FieldBinlog.GetBinlogs() {
			binlog.EntriesNum = counts[i]
		}
		return true
	})
	return err
}

// JoinIDPath joins ids to a path format.
func JoinIDPath(ids ...int64) string {
	idStr := make([]string, 0, len(ids))
	for _, id := range ids {
		idStr = append(idStr, strconv.FormatInt(id, 10))
	}
	return path.Join(idStr...)
}

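// GetIndexResourceUsage estimates the (memory, disk) footprint of an index.
// For DISKANN the index is mostly kept on disk: memory = indexSize / UsedDiskMemoryRatio
// and disk = indexSize - memory; e.g. (illustrative numbers) a 4 GiB DISKANN index
// yields roughly 1 GiB of memory and 3 GiB of disk. All other index types are
// accounted fully in memory.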
func GetIndexResourceUsage(indexInfo *querypb.FieldIndexInfo) (uint64, uint64, error) {
	indexType, err := funcutil.GetAttrByKeyFromRepeatedKV(common.IndexTypeKey, indexInfo.IndexParams)
	if err != nil {
		return 0, 0, fmt.Errorf("index type not exist in index params")
	}
	if indexType == indexparamcheck.IndexDISKANN {
		neededMemSize := indexInfo.IndexSize / UsedDiskMemoryRatio
		neededDiskSize := indexInfo.IndexSize - neededMemSize
		return uint64(neededMemSize), uint64(neededDiskSize), nil
	}

	return uint64(indexInfo.IndexSize), 0, nil
}

// checkSegmentSize checks whether the memory & disk are sufficient to load the segments.
// It returns the estimated memory & disk usage incurred by loading if loading is possible;
// otherwise, it returns an error.
func (loader *segmentLoader) checkSegmentSize(ctx context.Context, segmentLoadInfos []*querypb.SegmentLoadInfo) (uint64, uint64, error) {
	if len(segmentLoadInfos) == 0 {
		return 0, 0, nil
	}

	log := log.Ctx(ctx).With(
		zap.Int64("collectionID", segmentLoadInfos[0].GetCollectionID()),
	)

	toMB := func(mem uint64) float64 {
		return float64(mem) / 1024 / 1024
	}

	memUsage := hardware.GetUsedMemoryCount() + loader.committedResource.MemorySize
	totalMem := hardware.GetMemoryCount()
	if memUsage == 0 || totalMem == 0 {
		return 0, 0, errors.New("get memory failed when checkSegmentSize")
	}

	localDiskUsage, err := GetLocalUsedSize(ctx, paramtable.Get().LocalStorageCfg.Path.GetValue())
	if err != nil {
		return 0, 0, errors.Wrap(err, "get local used size failed")
	}

	metrics.QueryNodeDiskUsedSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(toMB(uint64(localDiskUsage)))
	diskUsage := uint64(localDiskUsage) + loader.committedResource.DiskSize

	maxSegmentSize := uint64(0)
	predictMemUsage := memUsage
	predictDiskUsage := diskUsage
	mmapFieldCount := 0
	for _, loadInfo := range segmentLoadInfos {
		collection := loader.manager.Collection.Get(loadInfo.GetCollectionID())

		oldUsedMem := predictMemUsage
		vecFieldID2IndexInfo := make(map[int64]*querypb.FieldIndexInfo)
		for _, fieldIndexInfo := range loadInfo.IndexInfos {
			if fieldIndexInfo.EnableIndex {
				fieldID := fieldIndexInfo.FieldID
				vecFieldID2IndexInfo[fieldID] = fieldIndexInfo
			}
		}

		for _, fieldBinlog := range loadInfo.BinlogPaths {
			fieldID := fieldBinlog.FieldID
			mmapEnabled := common.IsFieldMmapEnabled(collection.Schema(), fieldID)
			if fieldIndexInfo, ok := vecFieldID2IndexInfo[fieldID]; ok {
				neededMemSize, neededDiskSize, err := GetIndexResourceUsage(fieldIndexInfo)
				if err != nil {
					log.Warn("failed to get index size",
						zap.Int64("collectionID", loadInfo.CollectionID),
						zap.Int64("segmentID", loadInfo.SegmentID),
						zap.Int64("indexBuildID", fieldIndexInfo.BuildID),
						zap.Error(err),
					)
					return 0, 0, err
				}
				if mmapEnabled {
					predictDiskUsage += neededMemSize + neededDiskSize
				} else {
					predictMemUsage += neededMemSize
					predictDiskUsage += neededDiskSize
				}
			} else {
				if mmapEnabled {
					predictDiskUsage += uint64(getBinlogDataSize(fieldBinlog))
				} else {
					predictMemUsage += uint64(getBinlogDataSize(fieldBinlog))
					enableBinlogIndex := paramtable.Get().QueryNodeCfg.EnableTempSegmentIndex.GetAsBool()
					if enableBinlogIndex {
						buildBinlogIndexRate := paramtable.Get().QueryNodeCfg.InterimIndexMemExpandRate.GetAsFloat()
						predictMemUsage += uint64(float32(getBinlogDataSize(fieldBinlog)) * float32(buildBinlogIndexRate))
					}
				}
			}

			if mmapEnabled {
				mmapFieldCount++
			}
		}

		// get size of stats data
		for _, fieldBinlog := range loadInfo.Statslogs {
			predictMemUsage += uint64(getBinlogDataSize(fieldBinlog))
		}

		// get size of delete data
		for _, fieldBinlog := range loadInfo.Deltalogs {
			predictMemUsage += uint64(getBinlogDataSize(fieldBinlog))
		}

		if predictMemUsage-oldUsedMem > maxSegmentSize {
			maxSegmentSize = predictMemUsage - oldUsedMem
		}
	}

	log.Info("predict memory and disk usage while loading (in MiB)",
		zap.Float64("maxSegmentSize(MB)", toMB(maxSegmentSize)),
		zap.Float64("committedMemSize(MB)", toMB(loader.committedResource.MemorySize)),
		zap.Float64("memLimit(MB)", toMB(totalMem)),
		zap.Float64("memUsage(MB)", toMB(memUsage)),
		zap.Float64("committedDiskSize(MB)", toMB(loader.committedResource.DiskSize)),
		zap.Float64("diskUsage(MB)", toMB(diskUsage)),
		zap.Float64("predictMemUsage(MB)", toMB(predictMemUsage)),
		zap.Float64("predictDiskUsage(MB)", toMB(predictDiskUsage)),
		zap.Int("mmapFieldCount", mmapFieldCount),
	)

	if predictMemUsage > uint64(float64(totalMem)*paramtable.Get().QueryNodeCfg.OverloadedMemoryThresholdPercentage.GetAsFloat()) {
		return 0, 0, fmt.Errorf("load segment failed, OOM if load, maxSegmentSize = %v MB, memUsage = %v MB, predictMemUsage = %v MB, totalMem = %v MB thresholdFactor = %f",
			toMB(maxSegmentSize),
			toMB(memUsage),
			toMB(predictMemUsage),
			toMB(totalMem),
			paramtable.Get().QueryNodeCfg.OverloadedMemoryThresholdPercentage.GetAsFloat())
	}

	if predictDiskUsage > uint64(float64(paramtable.Get().QueryNodeCfg.DiskCapacityLimit.GetAsInt64())*paramtable.Get().QueryNodeCfg.MaxDiskUsagePercentage.GetAsFloat()) {
		return 0, 0, fmt.Errorf("load segment failed, disk space is not enough, diskUsage = %v MB, predictDiskUsage = %v MB, totalDisk = %v MB, thresholdFactor = %f",
			toMB(diskUsage),
			toMB(predictDiskUsage),
			toMB(uint64(paramtable.Get().QueryNodeCfg.DiskCapacityLimit.GetAsInt64())),
			paramtable.Get().QueryNodeCfg.MaxDiskUsagePercentage.GetAsFloat())
	}

	return predictMemUsage - memUsage, predictDiskUsage - diskUsage, nil
}

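// getFieldType looks up the data type of the given field in the collection schema.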
func (loader *segmentLoader) getFieldType(collectionID, fieldID int64) (schemapb.DataType, error) {
	collection := loader.manager.Collection.Get(collectionID)
	if collection == nil {
		return 0, merr.WrapErrCollectionNotFound(collectionID)
	}

	for _, field := range collection.Schema().GetFields() {
		if field.GetFieldID() == fieldID {
			return field.GetDataType(), nil
		}
	}
	return 0, merr.WrapErrFieldNotFound(fieldID)
}

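// LoadIndex loads the field indexes described by loadInfo into an already
// loaded segment, requesting resources for the index files only.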
func (loader *segmentLoader) LoadIndex(ctx context.Context, segment *LocalSegment, loadInfo *querypb.SegmentLoadInfo, version int64) error {
	log := log.Ctx(ctx).With(
		zap.Int64("collection", segment.Collection()),
		zap.Int64("segment", segment.ID()),
	)

	// Filter out LOADING segments only
	// use None to avoid the loaded check
	infos := loader.prepare(commonpb.SegmentState_SegmentStateNone, version, loadInfo)
	defer loader.unregister(infos...)

	indexInfo := lo.Map(infos, func(info *querypb.SegmentLoadInfo, _ int) *querypb.SegmentLoadInfo {
		info = typeutil.Clone(info)
		info.BinlogPaths = nil
		info.Deltalogs = nil
		info.Statslogs = nil
		return info
	})
	resource, _, err := loader.requestResource(ctx, indexInfo...)
	if err != nil {
		return err
	}
	defer loader.freeRequest(resource)

	log.Info("segment loader start to load index", zap.Int("segmentNumAfterFilter", len(infos)))

	for _, loadInfo := range infos {
		fieldIDs := typeutil.NewSet(lo.Map(loadInfo.GetIndexInfos(), func(info *querypb.FieldIndexInfo, _ int) int64 { return info.GetFieldID() })...)
		fieldInfos := lo.SliceToMap(lo.Filter(loadInfo.GetBinlogPaths(), func(info *datapb.FieldBinlog, _ int) bool { return fieldIDs.Contain(info.GetFieldID()) }),
			func(info *datapb.FieldBinlog) (int64, *datapb.FieldBinlog) { return info.GetFieldID(), info })

		for _, info := range loadInfo.GetIndexInfos() {
			if len(info.GetIndexFilePaths()) == 0 {
				log.Warn("failed to add index for segment, index file list is empty, the segment may be too small")
				return merr.WrapErrIndexNotFound("index file list empty")
			}

			if _, ok := fieldInfos[info.GetFieldID()]; !ok {
				return merr.WrapErrParameterInvalid("index info with corresponding field info", "missing field info", strconv.FormatInt(info.GetFieldID(), 10))
			}
			err := loader.loadFieldIndex(ctx, segment, info)
			if err != nil {
				log.Warn("failed to load index for segment", zap.Error(err))
				return err
			}
		}
		loader.notifyLoadFinish(loadInfo)
	}

	return loader.waitSegmentLoadDone(ctx, commonpb.SegmentState_SegmentStateNone, loadInfo.GetSegmentID())
}

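// getBinlogDataSize sums the log sizes of all binlogs in a field binlog.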
func getBinlogDataSize(fieldBinlog *datapb.FieldBinlog) int64 {
	fieldSize := int64(0)
	for _, binlog := range fieldBinlog.Binlogs {
		fieldSize += binlog.LogSize
	}

	return fieldSize
}