fix: lazy load index data in cache (#31094)

issue: https://github.com/milvus-io/milvus/issues/31571

---------

Signed-off-by: sunby <sunbingyi1992@gmail.com>
This commit is contained in:
Bingyi Sun 2024-03-25 15:43:07 +08:00 committed by GitHub
parent fe1961ff14
commit 8e661f791a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 197 additions and 111 deletions

4
go.sum
View File

@ -287,7 +287,6 @@ github.com/go-latex/latex v0.0.0-20210118124228-b3d85cf34e07/go.mod h1:CO1AlKB2C
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.3.0 h1:2y3SDp0ZXuc6/cjLSZ+Q3ir+QB9T/iG5yYRXqsagWSY=
github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
@ -512,7 +511,6 @@ github.com/kataras/iris/v12 v12.1.8/go.mod h1:LMYy4VlP67TQ3Zgriz8RE2h2kMZV2SgMYb
github.com/kataras/neffos v0.0.14/go.mod h1:8lqADm8PnbeFfL7CLXh1WHw53dG27MC3pgi2R1rmoTE=
github.com/kataras/pio v0.0.2/go.mod h1:hAoW0t9UmXi4R5Oyq5Z4irTbaTsOemSrDGUtaTl7Dro=
github.com/kataras/sitemap v0.0.5/go.mod h1:KY2eugMKiPwsJgx7+U103YZehfvNGOXURubcGyk0Bz8=
github.com/keybase/go-keychain v0.0.0-20190712205309-48d3d31d256d/go.mod h1:JJNrCn9otv/2QP4D7SMJBgaleKpOf66PnW6F5WGNRIc=
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
@ -588,8 +586,6 @@ github.com/microcosm-cc/bluemonday v1.0.2/go.mod h1:iVP4YcDBq+n/5fb23BhYFvIMq/le
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240228061649-a922b16f2a46 h1:IgoGNTbsRPa2kdNI+IWuZrrortFEjTB42/gYDklZHVU=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240228061649-a922b16f2a46/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240317125658-67a0f065c1de h1:pxpJWrA1B94UNcKWC6H3Qg08Y5ZR77wdf/b5UU1Gizo=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240317125658-67a0f065c1de/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek=
github.com/milvus-io/milvus-storage/go v0.0.0-20231227072638-ebd0b8e56d70 h1:Z+sp64fmAOxAG7mU0dfVOXvAXlwRB0c8a96rIM5HevI=

View File

@ -63,8 +63,8 @@ type MockBroker_AllocTimestamp_Call struct {
}
// AllocTimestamp is a helper method to define mock.On call
// - ctx context.Context
// - num uint32
// - ctx context.Context
// - num uint32
func (_e *MockBroker_Expecter) AllocTimestamp(ctx interface{}, num interface{}) *MockBroker_AllocTimestamp_Call {
return &MockBroker_AllocTimestamp_Call{Call: _e.mock.On("AllocTimestamp", ctx, num)}
}
@ -125,8 +125,8 @@ type MockBroker_AssignSegmentID_Call struct {
}
// AssignSegmentID is a helper method to define mock.On call
// - ctx context.Context
// - reqs ...*datapb.SegmentIDRequest
// - ctx context.Context
// - reqs ...*datapb.SegmentIDRequest
func (_e *MockBroker_Expecter) AssignSegmentID(ctx interface{}, reqs ...interface{}) *MockBroker_AssignSegmentID_Call {
return &MockBroker_AssignSegmentID_Call{Call: _e.mock.On("AssignSegmentID",
append([]interface{}{ctx}, reqs...)...)}
@ -187,9 +187,9 @@ type MockBroker_DescribeCollection_Call struct {
}
// DescribeCollection is a helper method to define mock.On call
// - ctx context.Context
// - collectionID int64
// - ts uint64
// - ctx context.Context
// - collectionID int64
// - ts uint64
func (_e *MockBroker_Expecter) DescribeCollection(ctx interface{}, collectionID interface{}, ts interface{}) *MockBroker_DescribeCollection_Call {
return &MockBroker_DescribeCollection_Call{Call: _e.mock.On("DescribeCollection", ctx, collectionID, ts)}
}
@ -243,8 +243,8 @@ type MockBroker_DropVirtualChannel_Call struct {
}
// DropVirtualChannel is a helper method to define mock.On call
// - ctx context.Context
// - req *datapb.DropVirtualChannelRequest
// - ctx context.Context
// - req *datapb.DropVirtualChannelRequest
func (_e *MockBroker_Expecter) DropVirtualChannel(ctx interface{}, req interface{}) *MockBroker_DropVirtualChannel_Call {
return &MockBroker_DropVirtualChannel_Call{Call: _e.mock.On("DropVirtualChannel", ctx, req)}
}
@ -298,8 +298,8 @@ type MockBroker_GetSegmentInfo_Call struct {
}
// GetSegmentInfo is a helper method to define mock.On call
// - ctx context.Context
// - segmentIDs []int64
// - ctx context.Context
// - segmentIDs []int64
func (_e *MockBroker_Expecter) GetSegmentInfo(ctx interface{}, segmentIDs interface{}) *MockBroker_GetSegmentInfo_Call {
return &MockBroker_GetSegmentInfo_Call{Call: _e.mock.On("GetSegmentInfo", ctx, segmentIDs)}
}
@ -341,8 +341,8 @@ type MockBroker_ReportTimeTick_Call struct {
}
// ReportTimeTick is a helper method to define mock.On call
// - ctx context.Context
// - msgs []*msgpb.DataNodeTtMsg
// - ctx context.Context
// - msgs []*msgpb.DataNodeTtMsg
func (_e *MockBroker_Expecter) ReportTimeTick(ctx interface{}, msgs interface{}) *MockBroker_ReportTimeTick_Call {
return &MockBroker_ReportTimeTick_Call{Call: _e.mock.On("ReportTimeTick", ctx, msgs)}
}
@ -384,8 +384,8 @@ type MockBroker_SaveBinlogPaths_Call struct {
}
// SaveBinlogPaths is a helper method to define mock.On call
// - ctx context.Context
// - req *datapb.SaveBinlogPathsRequest
// - ctx context.Context
// - req *datapb.SaveBinlogPathsRequest
func (_e *MockBroker_Expecter) SaveBinlogPaths(ctx interface{}, req interface{}) *MockBroker_SaveBinlogPaths_Call {
return &MockBroker_SaveBinlogPaths_Call{Call: _e.mock.On("SaveBinlogPaths", ctx, req)}
}
@ -439,9 +439,9 @@ type MockBroker_ShowPartitions_Call struct {
}
// ShowPartitions is a helper method to define mock.On call
// - ctx context.Context
// - dbName string
// - collectionName string
// - ctx context.Context
// - dbName string
// - collectionName string
func (_e *MockBroker_Expecter) ShowPartitions(ctx interface{}, dbName interface{}, collectionName interface{}) *MockBroker_ShowPartitions_Call {
return &MockBroker_ShowPartitions_Call{Call: _e.mock.On("ShowPartitions", ctx, dbName, collectionName)}
}
@ -483,8 +483,8 @@ type MockBroker_UpdateChannelCheckpoint_Call struct {
}
// UpdateChannelCheckpoint is a helper method to define mock.On call
// - ctx context.Context
// - channelCPs []*msgpb.MsgPosition
// - ctx context.Context
// - channelCPs []*msgpb.MsgPosition
func (_e *MockBroker_Expecter) UpdateChannelCheckpoint(ctx interface{}, channelCPs interface{}) *MockBroker_UpdateChannelCheckpoint_Call {
return &MockBroker_UpdateChannelCheckpoint_Call{Call: _e.mock.On("UpdateChannelCheckpoint", ctx, channelCPs)}
}
@ -526,8 +526,8 @@ type MockBroker_UpdateSegmentStatistics_Call struct {
}
// UpdateSegmentStatistics is a helper method to define mock.On call
// - ctx context.Context
// - req *datapb.UpdateSegmentStatisticsRequest
// - ctx context.Context
// - req *datapb.UpdateSegmentStatisticsRequest
func (_e *MockBroker_Expecter) UpdateSegmentStatistics(ctx interface{}, req interface{}) *MockBroker_UpdateSegmentStatistics_Call {
return &MockBroker_UpdateSegmentStatistics_Call{Call: _e.mock.On("UpdateSegmentStatistics", ctx, req)}
}

View File

@ -1258,8 +1258,8 @@ type DataCoordCatalog_SaveChannelCheckpoints_Call struct {
}
// SaveChannelCheckpoints is a helper method to define mock.On call
// - ctx context.Context
// - positions []*msgpb.MsgPosition
// - ctx context.Context
// - positions []*msgpb.MsgPosition
func (_e *DataCoordCatalog_Expecter) SaveChannelCheckpoints(ctx interface{}, positions interface{}) *DataCoordCatalog_SaveChannelCheckpoints_Call {
return &DataCoordCatalog_SaveChannelCheckpoints_Call{Call: _e.mock.On("SaveChannelCheckpoints", ctx, positions)}
}

View File

@ -158,6 +158,7 @@ type Manager struct {
Collection CollectionManager
Segment SegmentManager
DiskCache cache.Cache[int64, Segment]
Loader Loader
}
func NewManager() *Manager {
@ -189,7 +190,7 @@ func NewManager() *Manager {
if collection == nil {
return nil, merr.WrapErrCollectionNotLoaded(segment.Collection(), "failed to load segment fields")
}
err := loadSealedSegmentFields(context.Background(), collection, segment.(*LocalSegment), info.BinlogPaths, info.GetNumOfRows(), WithLoadStatus(LoadStatusMapped))
err := manager.Loader.LoadSegment(context.Background(), segment.(*LocalSegment), info, LoadStatusMapped)
return nil, err
})
if err != nil {
@ -205,6 +206,10 @@ func NewManager() *Manager {
return manager
}
func (mgr *Manager) SetLoader(loader Loader) {
mgr.Loader = loader
}
type SegmentManager interface {
// Put puts the given segments in,
// and increases the ref count of the corresponding collection,

View File

@ -261,6 +261,51 @@ func (_c *MockLoader_LoadIndex_Call) RunAndReturn(run func(context.Context, *Loc
return _c
}
// LoadSegment provides a mock function with given fields: ctx, segment, loadInfo, loadStatus
func (_m *MockLoader) LoadSegment(ctx context.Context, segment *LocalSegment, loadInfo *querypb.SegmentLoadInfo, loadStatus LoadStatus) error {
ret := _m.Called(ctx, segment, loadInfo, loadStatus)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, *LocalSegment, *querypb.SegmentLoadInfo, LoadStatus) error); ok {
r0 = rf(ctx, segment, loadInfo, loadStatus)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockLoader_LoadSegment_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'LoadSegment'
type MockLoader_LoadSegment_Call struct {
*mock.Call
}
// LoadSegment is a helper method to define mock.On call
// - ctx context.Context
// - segment *LocalSegment
// - loadInfo *querypb.SegmentLoadInfo
// - loadStatus LoadStatus
func (_e *MockLoader_Expecter) LoadSegment(ctx interface{}, segment interface{}, loadInfo interface{}, loadStatus interface{}) *MockLoader_LoadSegment_Call {
return &MockLoader_LoadSegment_Call{Call: _e.mock.On("LoadSegment", ctx, segment, loadInfo, loadStatus)}
}
func (_c *MockLoader_LoadSegment_Call) Run(run func(ctx context.Context, segment *LocalSegment, loadInfo *querypb.SegmentLoadInfo, loadStatus LoadStatus)) *MockLoader_LoadSegment_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*LocalSegment), args[2].(*querypb.SegmentLoadInfo), args[3].(LoadStatus))
})
return _c
}
func (_c *MockLoader_LoadSegment_Call) Return(_a0 error) *MockLoader_LoadSegment_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockLoader_LoadSegment_Call) RunAndReturn(run func(context.Context, *LocalSegment, *querypb.SegmentLoadInfo, LoadStatus) error) *MockLoader_LoadSegment_Call {
_c.Call.Return(run)
return _c
}
// NewMockLoader creates a new instance of MockLoader. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewMockLoader(t interface {

View File

@ -74,6 +74,7 @@ var ErrSegmentUnhealthy = errors.New("segment unhealthy")
type IndexedFieldInfo struct {
FieldBinlog *datapb.FieldBinlog
IndexInfo *querypb.FieldIndexInfo
LazyLoad bool
}
type baseSegment struct {
@ -1137,9 +1138,19 @@ func (s *LocalSegment) LoadIndex(ctx context.Context, indexInfo *querypb.FieldIn
opt(options)
}
if options.LoadStatus == LoadStatusMeta {
s.addIndex(indexInfo.GetFieldID(), &IndexedFieldInfo{
FieldBinlog: &datapb.FieldBinlog{
FieldID: indexInfo.GetFieldID(),
},
IndexInfo: indexInfo,
LazyLoad: true,
})
return nil
}
old := s.GetIndex(indexInfo.GetFieldID())
// the index loaded
if old != nil && old.IndexInfo.GetIndexID() == indexInfo.GetIndexID() {
if old != nil && old.IndexInfo.GetIndexID() == indexInfo.GetIndexID() && !old.LazyLoad {
log.Warn("index already loaded")
return nil
}
@ -1228,6 +1239,7 @@ func (s *LocalSegment) UpdateIndexInfo(ctx context.Context, indexInfo *querypb.F
FieldID: indexInfo.GetFieldID(),
},
IndexInfo: indexInfo,
LazyLoad: false,
})
log.Info("updateSegmentIndex done")
return nil

View File

@ -78,6 +78,12 @@ type Loader interface {
// LoadIndex append index for segment and remove vector binlogs.
LoadIndex(ctx context.Context, segment *LocalSegment, info *querypb.SegmentLoadInfo, version int64) error
LoadSegment(ctx context.Context,
segment *LocalSegment,
loadInfo *querypb.SegmentLoadInfo,
loadStatus LoadStatus,
) error
}
type LoadResource struct {
@ -208,7 +214,7 @@ func (loader *segmentLoaderV2) Load(ctx context.Context,
if loadInfo.GetLevel() == datapb.SegmentLevel_L0 {
err = loader.LoadDelta(ctx, collectionID, segment.(*LocalSegment))
} else {
err = loader.loadSegment(ctx, segment.(*LocalSegment), loadInfo)
err = loader.LoadSegment(ctx, segment.(*LocalSegment), loadInfo, LoadStatusInMemory)
}
if err != nil {
log.Warn("load segment failed when load data into memory",
@ -365,9 +371,10 @@ func (loader *segmentLoaderV2) loadBloomFilter(ctx context.Context, segmentID in
return nil
}
func (loader *segmentLoaderV2) loadSegment(ctx context.Context,
func (loader *segmentLoaderV2) LoadSegment(ctx context.Context,
segment *LocalSegment,
loadInfo *querypb.SegmentLoadInfo,
loadstatus LoadStatus,
) error {
log := log.Ctx(ctx).With(
zap.Int64("collectionID", segment.Collection()),
@ -596,16 +603,21 @@ func (loader *segmentLoader) Load(ctx context.Context,
debug.FreeOSMemory()
}()
loadStatus := LoadStatusInMemory
collection := loader.manager.Collection.Get(collectionID)
if collection == nil {
err := merr.WrapErrCollectionNotFound(collectionID)
log.Warn("failed to get collection", zap.Error(err))
return nil, err
}
if common.IsCollectionLazyLoadEnabled(collection.Schema().Properties...) {
loadStatus = LoadStatusMeta
}
for _, info := range infos {
loadInfo := info
collection := loader.manager.Collection.Get(collectionID)
if collection == nil {
err := merr.WrapErrCollectionNotFound(collectionID)
log.Warn("failed to get collection", zap.Error(err))
return nil, err
}
segment, err := NewSegment(
ctx,
collection,
@ -640,7 +652,7 @@ func (loader *segmentLoader) Load(ctx context.Context,
if loadInfo.GetLevel() == datapb.SegmentLevel_L0 {
err = loader.LoadDeltaLogs(ctx, segment, loadInfo.GetDeltalogs())
} else {
err = loader.loadSegment(ctx, segment.(*LocalSegment), loadInfo)
err = loader.LoadSegment(ctx, segment.(*LocalSegment), loadInfo, loadStatus)
}
if err != nil {
log.Warn("load segment failed when load data into memory",
@ -913,9 +925,88 @@ func (loader *segmentLoader) LoadBloomFilterSet(ctx context.Context, collectionI
return loadedBfs.Collect(), nil
}
func (loader *segmentLoader) loadSegment(ctx context.Context,
func separateIndexAndBinlog(loadInfo *querypb.SegmentLoadInfo) (map[int64]*IndexedFieldInfo, []*datapb.FieldBinlog) {
fieldID2IndexInfo := make(map[int64]*querypb.FieldIndexInfo)
for _, indexInfo := range loadInfo.IndexInfos {
if len(indexInfo.GetIndexFilePaths()) > 0 {
fieldID := indexInfo.FieldID
fieldID2IndexInfo[fieldID] = indexInfo
}
}
indexedFieldInfos := make(map[int64]*IndexedFieldInfo)
fieldBinlogs := make([]*datapb.FieldBinlog, 0, len(loadInfo.BinlogPaths))
for _, fieldBinlog := range loadInfo.BinlogPaths {
fieldID := fieldBinlog.FieldID
// check num rows of data meta and index meta are consistent
if indexInfo, ok := fieldID2IndexInfo[fieldID]; ok {
fieldInfo := &IndexedFieldInfo{
FieldBinlog: fieldBinlog,
IndexInfo: indexInfo,
}
indexedFieldInfos[fieldID] = fieldInfo
} else {
fieldBinlogs = append(fieldBinlogs, fieldBinlog)
}
}
return indexedFieldInfos, fieldBinlogs
}
func (loader *segmentLoader) loadSealedSegment(ctx context.Context, loadInfo *querypb.SegmentLoadInfo, segment *LocalSegment, collection *Collection, loadStatus LoadStatus) error {
indexedFieldInfos, fieldBinlogs := separateIndexAndBinlog(loadInfo)
schemaHelper, _ := typeutil.CreateSchemaHelper(collection.Schema())
if err := segment.AddFieldDataInfo(ctx, loadInfo.GetNumOfRows(), loadInfo.GetBinlogPaths()); err != nil {
return err
}
tr := timerecord.NewTimeRecorder("segmentLoader.LoadIndex")
log.Info("load fields...",
zap.Int64s("indexedFields", lo.Keys(indexedFieldInfos)),
)
if err := loader.loadFieldsIndex(ctx, schemaHelper, segment, loadInfo.GetNumOfRows(), indexedFieldInfos, WithLoadStatus(loadStatus)); err != nil {
return err
}
metrics.QueryNodeLoadIndexLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Observe(float64(tr.ElapseSpan().Milliseconds()))
for fieldID, info := range indexedFieldInfos {
field, err := schemaHelper.GetFieldFromID(fieldID)
if err != nil {
return err
}
if !typeutil.IsVectorType(field.GetDataType()) && !segment.HasRawData(fieldID) {
log.Info("field index doesn't include raw data, load binlog...",
zap.Int64("fieldID", fieldID),
zap.String("index", info.IndexInfo.GetIndexName()),
)
status := loadStatus
if status != LoadStatusMeta {
status = LoadStatusMapped
}
// for scalar index's raw data, only load to mmap not memory
if err = segment.LoadFieldData(ctx, fieldID, loadInfo.GetNumOfRows(), info.FieldBinlog, WithLoadStatus(status)); err != nil {
log.Warn("load raw data failed", zap.Int64("fieldID", fieldID), zap.Error(err))
return err
}
}
}
if err := loadSealedSegmentFields(ctx, collection, segment, fieldBinlogs, loadInfo.GetNumOfRows(), WithLoadStatus(loadStatus)); err != nil {
return err
}
// https://github.com/milvus-io/milvus/23654
// legacy entry num = 0
if err := loader.patchEntryNumber(ctx, segment, loadInfo); err != nil {
return err
}
return nil
}
func (loader *segmentLoader) LoadSegment(ctx context.Context,
segment *LocalSegment,
loadInfo *querypb.SegmentLoadInfo,
loadStatus LoadStatus,
) error {
log := log.Ctx(ctx).With(
zap.Int64("collectionID", segment.Collection()),
@ -940,75 +1031,10 @@ func (loader *segmentLoader) loadSegment(ctx context.Context,
defer debug.FreeOSMemory()
if segment.Type() == SegmentTypeSealed {
loadStatus := LoadStatusInMemory
if common.IsCollectionLazyLoadEnabled(collection.Schema().GetProperties()...) {
if loadStatus == LoadStatusMeta {
segment.baseSegment.loadInfo = loadInfo
loadStatus = LoadStatusMeta
}
fieldID2IndexInfo := make(map[int64]*querypb.FieldIndexInfo)
for _, indexInfo := range loadInfo.IndexInfos {
if len(indexInfo.GetIndexFilePaths()) > 0 {
fieldID := indexInfo.FieldID
fieldID2IndexInfo[fieldID] = indexInfo
}
}
indexedFieldInfos := make(map[int64]*IndexedFieldInfo)
fieldBinlogs := make([]*datapb.FieldBinlog, 0, len(loadInfo.BinlogPaths))
for _, fieldBinlog := range loadInfo.BinlogPaths {
fieldID := fieldBinlog.FieldID
// check num rows of data meta and index meta are consistent
if indexInfo, ok := fieldID2IndexInfo[fieldID]; ok {
fieldInfo := &IndexedFieldInfo{
FieldBinlog: fieldBinlog,
IndexInfo: indexInfo,
}
indexedFieldInfos[fieldID] = fieldInfo
} else {
fieldBinlogs = append(fieldBinlogs, fieldBinlog)
}
}
schemaHelper, _ := typeutil.CreateSchemaHelper(collection.Schema())
if err := segment.AddFieldDataInfo(ctx, loadInfo.GetNumOfRows(), loadInfo.GetBinlogPaths()); err != nil {
return err
}
tr := timerecord.NewTimeRecorder("segmentLoader.LoadIndex")
log.Info("load fields...",
zap.Int64s("indexedFields", lo.Keys(indexedFieldInfos)),
)
if err := loader.loadFieldsIndex(ctx, schemaHelper, segment, loadInfo.GetNumOfRows(), indexedFieldInfos); err != nil {
return err
}
metrics.QueryNodeLoadIndexLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Observe(float64(tr.ElapseSpan().Milliseconds()))
for fieldID, info := range indexedFieldInfos {
field, err := schemaHelper.GetFieldFromID(fieldID)
if err != nil {
return err
}
if !typeutil.IsVectorType(field.GetDataType()) && !segment.HasRawData(fieldID) {
log.Info("field index doesn't include raw data, load binlog...",
zap.Int64("fieldID", fieldID),
zap.String("index", info.IndexInfo.GetIndexName()),
)
// for scalar index's raw data, only load to mmap not memory
if err = segment.LoadFieldData(ctx, fieldID, loadInfo.GetNumOfRows(), info.FieldBinlog, WithLoadStatus(LoadStatusMapped)); err != nil {
log.Warn("load raw data failed", zap.Int64("fieldID", fieldID), zap.Error(err))
return err
}
}
}
if err := loadSealedSegmentFields(ctx, collection, segment, fieldBinlogs, loadInfo.GetNumOfRows(), WithLoadStatus(loadStatus)); err != nil {
return err
}
// https://github.com/milvus-io/milvus/23654
// legacy entry num = 0
if err := loader.patchEntryNumber(ctx, segment, loadInfo); err != nil {
if err := loader.loadSealedSegment(ctx, loadInfo, segment, collection, loadStatus); err != nil {
return err
}
} else {
@ -1102,6 +1128,7 @@ func (loader *segmentLoader) loadFieldsIndex(ctx context.Context,
segment *LocalSegment,
numRows int64,
indexedFieldInfos map[int64]*IndexedFieldInfo,
opts ...loadOption,
) error {
log := log.Ctx(ctx).With(
zap.Int64("collectionID", segment.Collection()),
@ -1112,7 +1139,7 @@ func (loader *segmentLoader) loadFieldsIndex(ctx context.Context,
for fieldID, fieldInfo := range indexedFieldInfos {
indexInfo := fieldInfo.IndexInfo
err := loader.loadFieldIndex(ctx, segment, indexInfo)
err := loader.loadFieldIndex(ctx, segment, indexInfo, opts...)
if err != nil {
return err
}
@ -1139,7 +1166,7 @@ func (loader *segmentLoader) loadFieldsIndex(ctx context.Context,
return nil
}
func (loader *segmentLoader) loadFieldIndex(ctx context.Context, segment *LocalSegment, indexInfo *querypb.FieldIndexInfo) error {
func (loader *segmentLoader) loadFieldIndex(ctx context.Context, segment *LocalSegment, indexInfo *querypb.FieldIndexInfo, opts ...loadOption) error {
filteredPaths := make([]string, 0, len(indexInfo.IndexFilePaths))
for _, indexPath := range indexInfo.IndexFilePaths {
@ -1159,7 +1186,7 @@ func (loader *segmentLoader) loadFieldIndex(ctx context.Context, segment *LocalS
return merr.WrapErrCollectionNotLoaded(segment.Collection(), "failed to load field index")
}
return segment.LoadIndex(ctx, indexInfo, fieldType)
return segment.LoadIndex(ctx, indexInfo, fieldType, opts...)
}
func (loader *segmentLoader) loadBloomFilter(ctx context.Context, segmentID int64, bfs *pkoracle.BloomFilterSet,

View File

@ -354,6 +354,7 @@ func (node *QueryNode) Init() error {
} else {
node.loader = segments.NewLoader(node.manager, node.chunkManager)
}
node.manager.SetLoader(node.loader)
node.dispClient = msgdispatcher.NewClient(node.factory, typeutil.QueryNodeRole, node.GetNodeID())
// init pipeline manager
node.pipelineManager = pipeline.NewManager(node.manager, node.tSafeManager, node.dispClient, node.delegators)