diff --git a/internal/querynode/segment_loader.go b/internal/querynode/segment_loader.go index 5a2c18a90b..9fefe6c7f0 100644 --- a/internal/querynode/segment_loader.go +++ b/internal/querynode/segment_loader.go @@ -310,15 +310,29 @@ func (loader *segmentLoader) loadFiledBinlogData(segment *Segment, fieldBinlogs segmentType := segment.getType() iCodec := storage.InsertCodec{} - blobs := make([]*storage.Blob, 0) + + // change all field bin log loading into concurrent + loadFutures := make([]*concurrency.Future, 0) for _, fieldBinlog := range fieldBinlogs { - fieldBlobs, err := loader.loadFieldBinlogs(fieldBinlog) - if err != nil { - return err - } - blobs = append(blobs, fieldBlobs...) + futures := loader.loadFieldBinlogsAsync(fieldBinlog) + loadFutures = append(loadFutures, futures...) } + // wait for async load result + blobs := make([]*storage.Blob, len(loadFutures)) + for index, future := range loadFutures { + if !future.OK() { + return future.Err() + } + + blob := future.Value().(*storage.Blob) + blobs[index] = blob + } + log.Info("log field binlogs done", + zap.Int64("collection", segment.collectionID), + zap.Int64("segment", segment.segmentID), + zap.Any("field", fieldBinlogs)) + _, _, insertData, err := iCodec.Deserialize(blobs) if err != nil { log.Warn(err.Error()) @@ -350,12 +364,8 @@ func (loader *segmentLoader) loadFiledBinlogData(segment *Segment, fieldBinlogs } } -// Load binlogs concurrently into memory from KV storage -func (loader *segmentLoader) loadFieldBinlogs(field *datapb.FieldBinlog) ([]*storage.Blob, error) { - log.Debug("load field binlogs", - zap.Int64("fieldID", field.FieldID), - zap.Int("len(binlogs)", len(field.Binlogs))) - +// Load binlogs concurrently into memory from KV storage asyncly +func (loader *segmentLoader) loadFieldBinlogsAsync(field *datapb.FieldBinlog) []*concurrency.Future { futures := make([]*concurrency.Future, 0, len(field.Binlogs)) for i := range field.Binlogs { path := field.Binlogs[i].GetLogPath() @@ -374,21 +384,7 @@ func (loader *segmentLoader) loadFieldBinlogs(field *datapb.FieldBinlog) ([]*sto futures = append(futures, future) } - - blobs := make([]*storage.Blob, 0, len(field.Binlogs)) - for _, future := range futures { - if !future.OK() { - return nil, future.Err() - } - - blob := future.Value().(*storage.Blob) - blobs = append(blobs, blob) - } - - log.Debug("log field binlogs done", - zap.Int64("fieldID", field.FieldID)) - - return blobs, nil + return futures } func (loader *segmentLoader) loadIndexedFieldData(segment *Segment, vecFieldInfos map[int64]*IndexedFieldInfo) error { @@ -587,7 +583,7 @@ func (loader *segmentLoader) loadDeltaLogs(segment *Segment, deltaLogs []*datapb } } if len(blobs) == 0 { - log.Info("there are no delta logs saved with segment", zap.Any("segmentID", segment.segmentID)) + log.Info("there are no delta logs saved with segment, skip loading delete record", zap.Any("segmentID", segment.segmentID)) return nil } _, _, deltaData, err := dCodec.Deserialize(blobs)