Cherry-pick from master
pr: #32945
issue: #31822
---------
Signed-off-by: longjiquan <jiquan.long@zilliz.com>
This commit is contained in:
parent 32bfd9befa
commit d98e1f6ff5
@@ -44,18 +44,20 @@ func newDefaultLimitReducer(req *querypb.QueryRequest, schema *schemapb.Collecti
 }
 
 type defaultLimitReducerSegcore struct {
-	req    *querypb.QueryRequest
-	schema *schemapb.CollectionSchema
+	req     *querypb.QueryRequest
+	schema  *schemapb.CollectionSchema
+	manager *Manager
 }
 
 func (r *defaultLimitReducerSegcore) Reduce(ctx context.Context, results []*segcorepb.RetrieveResults, segments []Segment, plan *RetrievePlan) (*segcorepb.RetrieveResults, error) {
 	mergeParam := NewMergeParam(r.req.GetReq().GetLimit(), r.req.GetReq().GetOutputFieldsId(), r.schema, r.req.GetReq().GetReduceStopForBest())
-	return mergeSegcoreRetrieveResultsAndFillIfEmpty(ctx, results, mergeParam, segments, plan)
+	return mergeSegcoreRetrieveResultsAndFillIfEmpty(ctx, results, mergeParam, segments, plan, r.manager)
 }
 
-func newDefaultLimitReducerSegcore(req *querypb.QueryRequest, schema *schemapb.CollectionSchema) *defaultLimitReducerSegcore {
+func newDefaultLimitReducerSegcore(req *querypb.QueryRequest, schema *schemapb.CollectionSchema, manager *Manager) *defaultLimitReducerSegcore {
 	return &defaultLimitReducerSegcore{
-		req:    req,
-		schema: schema,
+		req:     req,
+		schema:  schema,
+		manager: manager,
 	}
 }
@@ -24,9 +24,9 @@ type segCoreReducer interface {
 	Reduce(context.Context, []*segcorepb.RetrieveResults, []Segment, *RetrievePlan) (*segcorepb.RetrieveResults, error)
 }
 
-func CreateSegCoreReducer(req *querypb.QueryRequest, schema *schemapb.CollectionSchema) segCoreReducer {
+func CreateSegCoreReducer(req *querypb.QueryRequest, schema *schemapb.CollectionSchema, manager *Manager) segCoreReducer {
 	if req.GetReq().GetIsCount() {
 		return &cntReducerSegCore{}
 	}
-	return newDefaultLimitReducerSegcore(req, schema)
+	return newDefaultLimitReducerSegcore(req, schema, manager)
 }
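As a hedged illustration of the wiring this hunk asks of callers, the sketch below shows a factory passing a manager through to the concrete reducer. All types here are simplified stand-ins invented for the example (this QueryRequest, RetrieveResults, and Manager are not the real querypb/segcorepb/segments definitions); only the extra constructor parameter mirrors the diff.

```go
package main

import (
	"context"
	"fmt"
)

// Simplified stand-ins for the real request/result/manager types.
type QueryRequest struct{ IsCount bool }
type RetrieveResults struct{}
type Manager struct{}

// segCoreReducer mirrors the interface shape used in the diff.
type segCoreReducer interface {
	Reduce(ctx context.Context, results []*RetrieveResults) (*RetrieveResults, error)
}

type cntReducer struct{}

func (cntReducer) Reduce(ctx context.Context, results []*RetrieveResults) (*RetrieveResults, error) {
	return &RetrieveResults{}, nil
}

// defaultLimitReducer carries the manager so Reduce can hand it to the merge path.
type defaultLimitReducer struct {
	req     *QueryRequest
	manager *Manager
}

func (r *defaultLimitReducer) Reduce(ctx context.Context, results []*RetrieveResults) (*RetrieveResults, error) {
	// The real code forwards r.manager into mergeSegcoreRetrieveResultsAndFillIfEmpty here.
	return &RetrieveResults{}, nil
}

// CreateReducer shows the factory threading the manager to the default reducer only.
func CreateReducer(req *QueryRequest, manager *Manager) segCoreReducer {
	if req.IsCount {
		return cntReducer{}
	}
	return &defaultLimitReducer{req: req, manager: manager}
}

func main() {
	r := CreateReducer(&QueryRequest{}, &Manager{})
	res, err := r.Reduce(context.Background(), nil)
	fmt.Println(res, err)
}
```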
@@ -49,12 +49,12 @@ func (suite *ReducerFactorySuite) TestCreateSegCoreReducer() {
 		},
 	}
 
-	suite.sr = CreateSegCoreReducer(req, nil)
+	suite.sr = CreateSegCoreReducer(req, nil, nil)
 	_, suite.ok = suite.sr.(*defaultLimitReducerSegcore)
 	suite.True(suite.ok)
 
 	req.Req.IsCount = true
-	suite.sr = CreateSegCoreReducer(req, nil)
+	suite.sr = CreateSegCoreReducer(req, nil, nil)
 	_, suite.ok = suite.sr.(*cntReducerSegCore)
 	suite.True(suite.ok)
 }
@@ -494,7 +494,7 @@ func getTS(i *internalpb.RetrieveResults, idx int64) uint64 {
 	return 0
 }
 
-func MergeSegcoreRetrieveResults(ctx context.Context, retrieveResults []*segcorepb.RetrieveResults, param *mergeParam, segments []Segment, plan *RetrievePlan) (*segcorepb.RetrieveResults, error) {
+func MergeSegcoreRetrieveResults(ctx context.Context, retrieveResults []*segcorepb.RetrieveResults, param *mergeParam, segments []Segment, plan *RetrievePlan, manager *Manager) (*segcorepb.RetrieveResults, error) {
 	ctx, span := otel.Tracer(typeutil.QueryNodeRole).Start(ctx, "MergeSegcoreResults")
 	defer span.End()
 
@@ -605,8 +605,12 @@ func MergeSegcoreRetrieveResults(ctx context.Context, retrieveResults []*segcore
 		}
 		idx, theOffsets := i, offsets
 		future := GetSQPool().Submit(func() (any, error) {
-			r, err := validSegments[idx].RetrieveByOffsets(ctx, plan, theOffsets)
-			if err != nil {
+			var r *segcorepb.RetrieveResults
+			var err error
+			if err := doOnSegment(ctx, manager, validSegments[idx], func(ctx context.Context, segment Segment) error {
+				r, err = segment.RetrieveByOffsets(ctx, plan, theOffsets)
+				return err
+			}); err != nil {
 				return nil, err
 			}
 			segmentResults[idx] = r
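Because doOnSegment reports only an error, the result of RetrieveByOffsets has to be captured by the closure, as the new lines above do with r. Below is a minimal, self-contained sketch of that capture pattern, using invented stand-in names (runOnItem, fetch) rather than the real Milvus API.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// runOnItem is a stand-in for doOnSegment: it decides how to run the callback
// (e.g. through a disk cache for lazy-loaded segments) but returns only an error.
func runOnItem(ctx context.Context, item string, do func(ctx context.Context, item string) error) error {
	return do(ctx, item)
}

// fetch is a stand-in for Segment.RetrieveByOffsets.
func fetch(ctx context.Context, item string) (string, error) {
	if item == "" {
		return "", errors.New("empty item")
	}
	return "result for " + item, nil
}

func main() {
	ctx := context.Background()

	// The wrapper only reports errors, so the value is captured by the closure,
	// mirroring how the diff captures *segcorepb.RetrieveResults into r.
	var result string
	if err := runOnItem(ctx, "segment-1", func(ctx context.Context, item string) error {
		var err error
		result, err = fetch(ctx, item)
		return err
	}); err != nil {
		fmt.Println("retrieve failed:", err)
		return
	}
	fmt.Println(result)
}
```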
@@ -666,8 +670,9 @@ func mergeSegcoreRetrieveResultsAndFillIfEmpty(
 	param *mergeParam,
 	segments []Segment,
 	plan *RetrievePlan,
+	manager *Manager,
 ) (*segcorepb.RetrieveResults, error) {
-	mergedResult, err := MergeSegcoreRetrieveResults(ctx, retrieveResults, param, segments, plan)
+	mergedResult, err := MergeSegcoreRetrieveResults(ctx, retrieveResults, param, segments, plan, manager)
 	if err != nil {
 		return nil, err
 	}
@@ -39,7 +39,7 @@ type ResultSuite struct {
 
 func MergeSegcoreRetrieveResultsV1(ctx context.Context, retrieveResults []*segcorepb.RetrieveResults, param *mergeParam) (*segcorepb.RetrieveResults, error) {
 	plan := &RetrievePlan{ignoreNonPk: false}
-	return MergeSegcoreRetrieveResults(ctx, retrieveResults, param, nil, plan)
+	return MergeSegcoreRetrieveResults(ctx, retrieveResults, param, nil, plan, nil)
 }
 
 func (suite *ResultSuite) TestResult_MergeSegcoreRetrieveResults() {
@@ -22,13 +22,11 @@ import (
	"sync"

	"go.uber.org/zap"
	"golang.org/x/sync/errgroup"

	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
	"github.com/milvus-io/milvus/internal/proto/querypb"
	"github.com/milvus-io/milvus/internal/proto/segcorepb"
	"github.com/milvus-io/milvus/internal/querynodev2/segments/metricsutil"
	"github.com/milvus-io/milvus/internal/util/streamrpc"
	"github.com/milvus-io/milvus/pkg/log"
	"github.com/milvus-io/milvus/pkg/metrics"
@@ -48,9 +46,15 @@ type RetrieveSegmentResult struct {
 func retrieveOnSegments(ctx context.Context, mgr *Manager, segments []Segment, segType SegmentType, plan *RetrievePlan, req *querypb.QueryRequest) ([]RetrieveSegmentResult, error) {
 	resultCh := make(chan RetrieveSegmentResult, len(segments))
 
-	// TODO(longjiquan): remove this limit after two-phase retrieval can be applied on lru-segment.
-	plan.ignoreNonPk = !paramtable.Get().QueryNodeCfg.UseStreamComputing.GetAsBool() &&
-		len(segments) > 1 && req.GetReq().GetLimit() != typeutil.Unlimited && plan.ShouldIgnoreNonPk()
+	anySegIsLazyLoad := func() bool {
+		for _, seg := range segments {
+			if seg.IsLazyLoad() {
+				return true
+			}
+		}
+		return false
+	}()
+	plan.ignoreNonPk = !anySegIsLazyLoad && len(segments) > 1 && req.GetReq().GetLimit() != typeutil.Unlimited && plan.ShouldIgnoreNonPk()
 
 	label := metrics.SealedSegmentLabel
 	if segType == commonpb.SegmentState_Growing {
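The replacement ties ignoreNonPk to whether any target segment is lazy-loaded, computed with an immediately invoked function literal. Below is a small hedged sketch of the same idiom with placeholder types; the segment type and the unlimited constant are illustrative stand-ins, not the real Segment interface or typeutil.Unlimited.

```go
package main

import "fmt"

// segment is a placeholder for the real Segment interface; only the
// lazy-load flag matters for this check.
type segment struct {
	id       int64
	lazyLoad bool
}

func main() {
	segments := []segment{{id: 1}, {id: 2, lazyLoad: true}}

	// Immediately invoked function literal, as in the diff: evaluate the
	// predicate once and keep only the resulting boolean.
	anySegIsLazyLoad := func() bool {
		for _, seg := range segments {
			if seg.lazyLoad {
				return true
			}
		}
		return false
	}()

	limit := int64(10)
	const unlimited = int64(-1) // stand-in for the "no limit" sentinel

	// Ignoring non-PK fields in the first retrieval pass is only attempted
	// when no segment is lazy-loaded and the query carries a limit.
	ignoreNonPk := !anySegIsLazyLoad && len(segments) > 1 && limit != unlimited
	fmt.Println("ignoreNonPk:", ignoreNonPk)
}
```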
@@ -72,41 +76,8 @@ func retrieveOnSegments(ctx context.Context, mgr *Manager, segments []Segment, s
 		return nil
 	}
 
-	errGroup, ctx := errgroup.WithContext(ctx)
-	for _, segment := range segments {
-		seg := segment
-		errGroup.Go(func() error {
-			if ctx.Err() != nil {
-				return ctx.Err()
-			}
-
-			// record search time and cache miss
-			var err error
-			accessRecord := metricsutil.NewQuerySegmentAccessRecord(getSegmentMetricLabel(seg))
-			defer func() {
-				accessRecord.Finish(err)
-			}()
-
-			if seg.IsLazyLoad() {
-				ctx, cancel := withLazyLoadTimeoutContext(ctx)
-				defer cancel()
-
-				var missing bool
-				missing, err = mgr.DiskCache.Do(ctx, seg.ID(), retriever)
-				if missing {
-					accessRecord.CacheMissing()
-				}
-				if err != nil {
-					log.Warn("failed to do query disk cache", zap.Int64("segID", seg.ID()), zap.Error(err))
-				}
-				return err
-			}
-			return retriever(ctx, seg)
-		})
-	}
-	err := errGroup.Wait()
+	err := doOnSegments(ctx, mgr, segments, retriever)
 	close(resultCh)
 
 	if err != nil {
 		return nil, err
 	}
@@ -118,7 +89,7 @@ func retrieveOnSegments(ctx context.Context, mgr *Manager, segments []Segment, s
 	return results, nil
 }
 
-func retrieveOnSegmentsWithStream(ctx context.Context, segments []Segment, segType SegmentType, plan *RetrievePlan, svr streamrpc.QueryStreamServer) error {
+func retrieveOnSegmentsWithStream(ctx context.Context, mgr *Manager, segments []Segment, segType SegmentType, plan *RetrievePlan, svr streamrpc.QueryStreamServer) error {
 	var (
 		errs = make([]error, len(segments))
 		wg   sync.WaitGroup
@@ -134,7 +105,12 @@ func retrieveOnSegmentsWithStream(ctx context.Context, segments []Segment, segTy
 		go func(segment Segment, i int) {
 			defer wg.Done()
 			tr := timerecord.NewTimeRecorder("retrieveOnSegmentsWithStream")
-			result, err := segment.Retrieve(ctx, plan)
+			var result *segcorepb.RetrieveResults
+			err := doOnSegment(ctx, mgr, segment, func(ctx context.Context, segment Segment) error {
+				var err error
+				result, err = segment.Retrieve(ctx, plan)
+				return err
+			})
 			if err != nil {
 				errs[i] = err
 				return

@@ -214,6 +190,6 @@ func RetrieveStream(ctx context.Context, manager *Manager, plan *RetrievePlan, r
 		return retrieveSegments, err
 	}
 
-	err = retrieveOnSegmentsWithStream(ctx, retrieveSegments, SegType, plan, srv)
+	err = retrieveOnSegmentsWithStream(ctx, manager, retrieveSegments, SegType, plan, srv)
 	return retrieveSegments, err
 }
internal/querynodev2/segments/segment_do.go (new file, 66 lines)
@@ -0,0 +1,66 @@
+package segments
+
+import (
+	"context"
+
+	"go.uber.org/zap"
+	"golang.org/x/sync/errgroup"
+
+	"github.com/milvus-io/milvus/internal/querynodev2/segments/metricsutil"
+	"github.com/milvus-io/milvus/pkg/log"
+	"github.com/milvus-io/milvus/pkg/util/conc"
+)
+
+type doOnSegmentFunc func(ctx context.Context, segment Segment) error
+
+func doOnSegment(ctx context.Context, mgr *Manager, seg Segment, do doOnSegmentFunc) error {
+	// record search time and cache miss
+	var err error
+	accessRecord := metricsutil.NewQuerySegmentAccessRecord(getSegmentMetricLabel(seg))
+	defer func() {
+		accessRecord.Finish(err)
+	}()
+	if seg.IsLazyLoad() {
+		ctx, cancel := withLazyLoadTimeoutContext(ctx)
+		defer cancel()
+
+		var missing bool
+		missing, err = mgr.DiskCache.Do(ctx, seg.ID(), do)
+		if missing {
+			accessRecord.CacheMissing()
+		}
+		if err != nil {
+			log.Ctx(ctx).Warn("failed to do query disk cache", zap.Int64("segID", seg.ID()), zap.Error(err))
+		}
+		return err
+	}
+	return do(ctx, seg)
+}
+
+// doOnSegments Be careful to use this, since no any pool is used.
+func doOnSegments(ctx context.Context, mgr *Manager, segments []Segment, do doOnSegmentFunc) error {
+	errGroup, ctx := errgroup.WithContext(ctx)
+	for _, segment := range segments {
+		seg := segment
+		errGroup.Go(func() error {
+			if ctx.Err() != nil {
+				return ctx.Err()
+			}
+			return doOnSegment(ctx, mgr, seg, do)
+		})
+	}
+	return errGroup.Wait()
+}
+
+func doOnSegmentsWithPool(ctx context.Context, mgr *Manager, segments []Segment, do doOnSegmentFunc, pool *conc.Pool[any]) error {
+	futures := make([]*conc.Future[any], 0, len(segments))
+	for _, segment := range segments {
+		seg := segment
+		future := pool.Submit(func() (any, error) {
+			err := doOnSegment(ctx, mgr, seg, do)
+			return nil, err
+		})
+		futures = append(futures, future)
+	}
+	return conc.BlockOnAll(futures...)
+}
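The core of the new file is the dispatch in doOnSegment: lazy-loaded segments run the callback through the disk cache, resident ones run it directly. Below is a self-contained, simplified sketch of that routing with stand-in seg and cache types (no metrics, no lazy-load timeout); it is not the real Manager/DiskCache API.

```go
package main

import (
	"context"
	"fmt"
)

// seg is a stand-in for the Segment interface.
type seg struct {
	id       int64
	lazyLoad bool
}

func (s seg) ID() int64        { return s.id }
func (s seg) IsLazyLoad() bool { return s.lazyLoad }

// cache is a stand-in for a disk cache: Do pins the segment's data
// (reloading it if evicted) and then runs the callback against it.
type cache struct{}

func (cache) Do(ctx context.Context, id int64, do func(ctx context.Context, s seg) error) (missing bool, err error) {
	// Pretend the data was evicted and had to be reloaded.
	return true, do(ctx, seg{id: id, lazyLoad: true})
}

type doFunc func(ctx context.Context, s seg) error

// doOnSeg mirrors doOnSegment: lazy-loaded segments go through the cache,
// resident segments run the callback directly.
func doOnSeg(ctx context.Context, c cache, s seg, do doFunc) error {
	if s.IsLazyLoad() {
		missing, err := c.Do(ctx, s.ID(), do)
		if missing {
			fmt.Println("cache miss for segment", s.ID())
		}
		return err
	}
	return do(ctx, s)
}

func main() {
	ctx := context.Background()
	work := func(ctx context.Context, s seg) error {
		fmt.Println("retrieving from segment", s.ID())
		return nil
	}
	_ = doOnSeg(ctx, cache{}, seg{id: 1}, work)                 // direct path
	_ = doOnSeg(ctx, cache{}, seg{id: 2, lazyLoad: true}, work) // cache path
}
```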
@@ -123,6 +123,7 @@ func (t *QueryTask) Execute() error {
 	reducer := segments.CreateSegCoreReducer(
 		t.req,
 		t.collection.Schema(),
+		t.segmentManager,
 	)
 	beforeReduce := time.Now()
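Looping back to the new helper file: doOnSegmentsWithPool is added but not yet called in this change. As a hedged illustration of the bounded fan-out it is meant to provide (in contrast to doOnSegments, which spawns one goroutine per segment via errgroup), the sketch below reproduces the same submit-all-then-wait shape with a plain buffered-channel worker limit standing in for conc.Pool; all names here are illustrative.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

type item struct{ id int }

type doFunc func(ctx context.Context, it item) error

// doOnItemsWithPool mimics doOnSegmentsWithPool: every item is submitted under a
// bounded concurrency limit and the first error (if any) is reported after all finish.
func doOnItemsWithPool(ctx context.Context, items []item, do doFunc, workers int) error {
	sem := make(chan struct{}, workers) // bounded concurrency, like a worker pool
	errCh := make(chan error, len(items))
	var wg sync.WaitGroup

	for _, it := range items {
		it := it
		wg.Add(1)
		sem <- struct{}{}
		go func() {
			defer wg.Done()
			defer func() { <-sem }()
			errCh <- do(ctx, it)
		}()
	}
	wg.Wait()
	close(errCh)

	for err := range errCh {
		if err != nil {
			return err
		}
	}
	return nil
}

func main() {
	items := []item{{1}, {2}, {3}, {4}}
	err := doOnItemsWithPool(context.Background(), items, func(ctx context.Context, it item) error {
		fmt.Println("processed", it.id)
		return nil
	}, 2)
	fmt.Println("err:", err)
}
```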