enhance: add more tracing for search & query (#32734)

issue: https://github.com/milvus-io/milvus/issues/32728

---------

Signed-off-by: longjiquan <jiquan.long@zilliz.com>
Jiquan Long 2024-05-07 13:03:29 +08:00 committed by GitHub
parent 4de063ae14
commit 1f58cda957
12 changed files with 97 additions and 30 deletions
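The pattern is the same throughout the commit: on the Go side (proxy and querynode) each processing stage starts an OpenTelemetry child span from the incoming context.Context and ends it when the stage returns, while on the C++ side (segcore) the caller's trace context is threaded in explicitly so the reduce and retrieve phases attach to the same trace. A minimal sketch of the Go-side pattern, using the real go.opentelemetry.io/otel API but an illustrative stage name:

```go
package main

import (
	"context"

	"go.opentelemetry.io/otel"
)

// reduceStage is an illustrative stand-in for functions like the proxy's
// reduceResults or the querynode's DecodeSearchResults: derive a child span
// from the caller's context and close it when the stage finishes.
func reduceStage(ctx context.Context) {
	_, sp := otel.Tracer("proxy").Start(ctx, "reduceStage")
	defer sp.End()
	// ... stage work goes here ...
}

func main() {
	// Without a configured TracerProvider this records nothing, but the call
	// shape is exactly what the Go hunks below add.
	reduceStage(context.Background())
}
```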

View File

@ -62,6 +62,7 @@ ReduceHelper::Reduce() {
void
ReduceHelper::Marshal() {
tracer::AutoSpan span("ReduceHelper::Marshal", trace_ctx_, false);
// get search result data blobs of slices
search_result_data_blobs_ =
std::make_unique<milvus::segcore::SearchResultDataBlobs>();
@ -131,6 +132,7 @@ ReduceHelper::FilterInvalidSearchResult(SearchResult* search_result) {
void
ReduceHelper::FillPrimaryKey() {
tracer::AutoSpan span("ReduceHelper::FillPrimaryKey", trace_ctx_, false);
// get primary keys for duplicates removal
uint32_t valid_index = 0;
for (auto& search_result : search_results_) {
@ -153,6 +155,8 @@ ReduceHelper::FillPrimaryKey() {
void
ReduceHelper::RefreshSearchResult() {
tracer::AutoSpan span(
"ReduceHelper::RefreshSearchResult", trace_ctx_, false);
for (int i = 0; i < num_segments_; i++) {
std::vector<int64_t> real_topks(total_nq_, 0);
auto search_result = search_results_[i];
@ -212,6 +216,7 @@ ReduceHelper::RefreshSearchResult() {
void
ReduceHelper::FillEntryData() {
tracer::AutoSpan span("ReduceHelper::FillEntryData", trace_ctx_, false);
for (auto search_result : search_results_) {
auto segment = static_cast<milvus::segcore::SegmentInterface*>(
search_result->segment_);
@ -312,6 +317,7 @@ ReduceHelper::ReduceSearchResultForOneNQ(int64_t qi,
void
ReduceHelper::ReduceResultData() {
tracer::AutoSpan span("ReduceHelper::ReduceResultData", trace_ctx_, false);
for (int i = 0; i < num_segments_; i++) {
auto search_result = search_results_[i];
auto result_count = search_result->get_total_result_count();
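Each reduce phase (Marshal, FillPrimaryKey, RefreshSearchResult, FillEntryData, ReduceResultData) is wrapped in a tracer::AutoSpan bound to trace_ctx_, so every phase shows up as its own child span instead of one opaque reduce block. Roughly the same structure expressed in Go, as a sketch with placeholder phase bodies:

```go
package main

import (
	"context"

	"go.opentelemetry.io/otel"
)

// runPhase opens a child span for one reduce phase and runs it; this mirrors
// the per-phase AutoSpans added to ReduceHelper, it is not Milvus code.
func runPhase(ctx context.Context, name string, phase func()) {
	_, sp := otel.Tracer("segcore").Start(ctx, name)
	defer sp.End()
	phase()
}

func main() {
	ctx, root := otel.Tracer("segcore").Start(context.Background(), "ReduceSearchResultsAndFillData")
	defer root.End()

	// Each phase becomes a separate child of the root reduce span.
	runPhase(ctx, "ReduceHelper::ReduceResultData", func() { /* merge per-segment results */ })
	runPhase(ctx, "ReduceHelper::FillEntryData", func() { /* fetch field data for winners */ })
	runPhase(ctx, "ReduceHelper::Marshal", func() { /* serialize result blobs */ })
}
```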

View File

@ -22,6 +22,7 @@
#include "common/QueryResult.h"
#include "query/PlanImpl.h"
#include "ReduceStructure.h"
#include "common/Tracer.h"
#include "segment_c.h"
namespace milvus::segcore {
@ -37,11 +38,13 @@ class ReduceHelper {
milvus::query::Plan* plan,
int64_t* slice_nqs,
int64_t* slice_topKs,
int64_t slice_num)
int64_t slice_num,
tracer::TraceContext* trace_ctx)
: search_results_(search_results),
plan_(plan),
slice_nqs_(slice_nqs, slice_nqs + slice_num),
slice_topKs_(slice_topKs, slice_topKs + slice_num) {
slice_topKs_(slice_topKs, slice_topKs + slice_num),
trace_ctx_(trace_ctx) {
Initialize();
}
@ -109,6 +112,8 @@ class ReduceHelper {
// output
std::unique_ptr<SearchResultDataBlobs> search_result_data_blobs_;
tracer::TraceContext* trace_ctx_;
};
} // namespace milvus::segcore

View File

@ -83,15 +83,17 @@ std::unique_ptr<proto::segcore::RetrieveResults>
SegmentInternalInterface::Retrieve(const query::RetrievePlan* plan,
Timestamp timestamp,
int64_t limit_size) const {
return Retrieve(plan, timestamp, limit_size, false);
return Retrieve(nullptr, plan, timestamp, limit_size, false);
}
std::unique_ptr<proto::segcore::RetrieveResults>
SegmentInternalInterface::Retrieve(const query::RetrievePlan* plan,
SegmentInternalInterface::Retrieve(tracer::TraceContext* trace_ctx,
const query::RetrievePlan* plan,
Timestamp timestamp,
int64_t limit_size,
bool ignore_non_pk) const {
std::shared_lock lck(mutex_);
tracer::AutoSpan span("Retrieve", trace_ctx, false);
auto results = std::make_unique<proto::segcore::RetrieveResults>();
query::ExecPlanNodeVisitor visitor(*this, timestamp);
auto retrieve_results = visitor.get_retrieve_result(*plan->plan_node_);
@ -118,7 +120,8 @@ SegmentInternalInterface::Retrieve(const query::RetrievePlan* plan,
results->mutable_offset()->Add(retrieve_results.result_offsets_.begin(),
retrieve_results.result_offsets_.end());
FillTargetEntry(plan,
FillTargetEntry(trace_ctx,
plan,
results,
retrieve_results.result_offsets_.data(),
retrieve_results.result_offsets_.size(),
@ -130,12 +133,15 @@ SegmentInternalInterface::Retrieve(const query::RetrievePlan* plan,
void
SegmentInternalInterface::FillTargetEntry(
tracer::TraceContext* trace_ctx,
const query::RetrievePlan* plan,
const std::unique_ptr<proto::segcore::RetrieveResults>& results,
const int64_t* offsets,
int64_t size,
bool ignore_non_pk,
bool fill_ids) const {
tracer::AutoSpan span("FillTargetEntry", trace_ctx, false);
auto fields_data = results->mutable_fields_data();
auto ids = results->mutable_ids();
auto pk_field_id = plan->schema_.get_primary_field_id();
@ -215,12 +221,14 @@ SegmentInternalInterface::FillTargetEntry(
}
std::unique_ptr<proto::segcore::RetrieveResults>
SegmentInternalInterface::Retrieve(const query::RetrievePlan* Plan,
SegmentInternalInterface::Retrieve(tracer::TraceContext* trace_ctx,
const query::RetrievePlan* Plan,
const int64_t* offsets,
int64_t size) const {
std::shared_lock lck(mutex_);
tracer::AutoSpan span("RetrieveByOffsets", trace_ctx, false);
auto results = std::make_unique<proto::segcore::RetrieveResults>();
FillTargetEntry(Plan, results, offsets, size, false, false);
FillTargetEntry(trace_ctx, Plan, results, offsets, size, false, false);
return results;
}
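Note that the pre-existing two-argument Retrieve overload now simply forwards nullptr as the trace context (presumably so untraced callers keep working and the span helpers treat a missing context as "no parent"). OpenTelemetry's Go API degrades the same way when no span is active, as this small sketch shows:

```go
package main

import (
	"context"
	"fmt"

	"go.opentelemetry.io/otel/trace"
)

func main() {
	// A context with no active span yields a non-recording, invalid span;
	// every method on it is a no-op, so untraced call paths pay ~nothing.
	sp := trace.SpanFromContext(context.Background())
	fmt.Println(sp.SpanContext().IsValid(), sp.IsRecording()) // false false
	sp.End() // safe no-op
}
```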

View File

@ -70,13 +70,15 @@ class SegmentInterface {
int64_t limit_size) const = 0;
virtual std::unique_ptr<proto::segcore::RetrieveResults>
Retrieve(const query::RetrievePlan* Plan,
Retrieve(tracer::TraceContext* trace_ctx,
const query::RetrievePlan* Plan,
Timestamp timestamp,
int64_t limit_size,
bool ignore_non_pk) const = 0;
virtual std::unique_ptr<proto::segcore::RetrieveResults>
Retrieve(const query::RetrievePlan* Plan,
Retrieve(tracer::TraceContext* trace_ctx,
const query::RetrievePlan* Plan,
const int64_t* offsets,
int64_t size) const = 0;
@ -171,13 +173,15 @@ class SegmentInternalInterface : public SegmentInterface {
int64_t limit_size) const override;
std::unique_ptr<proto::segcore::RetrieveResults>
Retrieve(const query::RetrievePlan* Plan,
Retrieve(tracer::TraceContext* trace_ctx,
const query::RetrievePlan* Plan,
Timestamp timestamp,
int64_t limit_size,
bool ignore_non_pk) const override;
std::unique_ptr<proto::segcore::RetrieveResults>
Retrieve(const query::RetrievePlan* Plan,
Retrieve(tracer::TraceContext* trace_ctx,
const query::RetrievePlan* Plan,
const int64_t* offsets,
int64_t size) const override;
@ -303,6 +307,7 @@ class SegmentInternalInterface : public SegmentInterface {
void
FillTargetEntry(
tracer::TraceContext* trace_ctx,
const query::RetrievePlan* plan,
const std::unique_ptr<proto::segcore::RetrieveResults>& results,
const int64_t* offsets,

View File

@ -74,7 +74,8 @@ GetStreamReduceResult(CSearchStreamReducer c_stream_reducer,
}
CStatus
ReduceSearchResultsAndFillData(CSearchResultDataBlobs* cSearchResultDataBlobs,
ReduceSearchResultsAndFillData(CTraceContext c_trace,
CSearchResultDataBlobs* cSearchResultDataBlobs,
CSearchPlan c_plan,
CSearchResult* c_search_results,
int64_t num_segments,
@ -85,13 +86,21 @@ ReduceSearchResultsAndFillData(CSearchResultDataBlobs* cSearchResultDataBlobs,
// get SearchResult and SearchPlan
auto plan = static_cast<milvus::query::Plan*>(c_plan);
AssertInfo(num_segments > 0, "num_segments must be greater than 0");
auto trace_ctx = milvus::tracer::TraceContext{
c_trace.traceID, c_trace.spanID, c_trace.traceFlags};
milvus::tracer::AutoSpan span(
"ReduceSearchResultsAndFillData", &trace_ctx, true);
std::vector<SearchResult*> search_results(num_segments);
for (int i = 0; i < num_segments; ++i) {
search_results[i] = static_cast<SearchResult*>(c_search_results[i]);
}
auto reduce_helper = milvus::segcore::ReduceHelper(
search_results, plan, slice_nqs, slice_topKs, num_slices);
auto reduce_helper = milvus::segcore::ReduceHelper(search_results,
plan,
slice_nqs,
slice_topKs,
num_slices,
&trace_ctx);
reduce_helper.Reduce();
reduce_helper.Marshal();
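At the C boundary the trace arrives as raw identifiers (CTraceContext's traceID, spanID and traceFlags), and the entry point first rebuilds a tracer::TraceContext from them before opening the "ReduceSearchResultsAndFillData" span. The Go equivalent of reconstituting a remote parent from raw IDs looks roughly like this (a sketch; the byte values are made up):

```go
package main

import (
	"context"
	"fmt"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/trace"
)

func main() {
	// Raw identifiers as they might arrive across an RPC or cgo boundary.
	var traceID trace.TraceID
	var spanID trace.SpanID
	copy(traceID[:], []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16})
	copy(spanID[:], []byte{1, 2, 3, 4, 5, 6, 7, 8})

	// Rebuild a remote parent span context from the raw IDs ...
	parent := trace.NewSpanContext(trace.SpanContextConfig{
		TraceID:    traceID,
		SpanID:     spanID,
		TraceFlags: trace.FlagsSampled,
		Remote:     true,
	})
	ctx := trace.ContextWithRemoteSpanContext(context.Background(), parent)

	// ... so the span opened here joins the caller's trace instead of starting a new one.
	_, sp := otel.Tracer("segcore").Start(ctx, "ReduceSearchResultsAndFillData")
	defer sp.End()
	fmt.Println("joined trace", sp.SpanContext().TraceID())
}
```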

View File

@ -37,7 +37,8 @@ GetStreamReduceResult(CSearchStreamReducer c_stream_reducer,
CSearchResultDataBlobs* c_search_result_data_blobs);
CStatus
ReduceSearchResultsAndFillData(CSearchResultDataBlobs* cSearchResultDataBlobs,
ReduceSearchResultsAndFillData(CTraceContext c_trace,
CSearchResultDataBlobs* cSearchResultDataBlobs,
CSearchPlan c_plan,
CSearchResult* search_results,
int64_t num_segments,

View File

@ -141,8 +141,8 @@ Retrieve(CTraceContext c_trace,
c_trace.traceID, c_trace.spanID, c_trace.traceFlags};
milvus::tracer::AutoSpan span("SegCoreRetrieve", &trace_ctx, true);
auto retrieve_result =
segment->Retrieve(plan, timestamp, limit_size, ignore_non_pk);
auto retrieve_result = segment->Retrieve(
&trace_ctx, plan, timestamp, limit_size, ignore_non_pk);
auto size = retrieve_result->ByteSizeLong();
std::unique_ptr<uint8_t[]> buffer(new uint8_t[size]);
@ -174,7 +174,8 @@ RetrieveByOffsets(CTraceContext c_trace,
milvus::tracer::AutoSpan span(
"SegCoreRetrieveByOffsets", &trace_ctx, true);
auto retrieve_result = segment->Retrieve(plan, offsets, len);
auto retrieve_result =
segment->Retrieve(&trace_ctx, plan, offsets, len);
auto size = retrieve_result->ByteSizeLong();
std::unique_ptr<uint8_t[]> buffer(new uint8_t[size]);

View File

@ -1538,7 +1538,8 @@ TEST(CApiTest, ReduceNullResult) {
ASSERT_EQ(status.error_code, Success);
results.push_back(res);
CSearchResultDataBlobs cSearchResultData;
status = ReduceSearchResultsAndFillData(&cSearchResultData,
status = ReduceSearchResultsAndFillData({},
&cSearchResultData,
plan,
results.data(),
results.size(),
@ -1631,7 +1632,8 @@ TEST(CApiTest, ReduceRemoveDuplicates) {
results.push_back(res2);
CSearchResultDataBlobs cSearchResultData;
status = ReduceSearchResultsAndFillData(&cSearchResultData,
status = ReduceSearchResultsAndFillData({},
&cSearchResultData,
plan,
results.data(),
results.size(),
@ -1667,7 +1669,8 @@ TEST(CApiTest, ReduceRemoveDuplicates) {
results.push_back(res2);
results.push_back(res3);
CSearchResultDataBlobs cSearchResultData;
status = ReduceSearchResultsAndFillData(&cSearchResultData,
status = ReduceSearchResultsAndFillData({},
&cSearchResultData,
plan,
results.data(),
results.size(),
@ -1810,7 +1813,8 @@ testReduceSearchWithExpr(int N,
// 1. reduce
CSearchResultDataBlobs cSearchResultData;
status = ReduceSearchResultsAndFillData(&cSearchResultData,
status = ReduceSearchResultsAndFillData({},
&cSearchResultData,
plan,
results.data(),
results.size(),
@ -3504,7 +3508,8 @@ TEST(CApiTest, Indexing_With_binary_Predicate_Term) {
auto slice_topKs = std::vector<int64_t>{topK};
CSearchResultDataBlobs cSearchResultData;
status = ReduceSearchResultsAndFillData(&cSearchResultData,
status = ReduceSearchResultsAndFillData({},
&cSearchResultData,
plan,
results.data(),
results.size(),
@ -3700,7 +3705,8 @@ TEST(CApiTest, Indexing_Expr_With_binary_Predicate_Term) {
auto slice_topKs = std::vector<int64_t>{topK};
CSearchResultDataBlobs cSearchResultData;
status = ReduceSearchResultsAndFillData(&cSearchResultData,
status = ReduceSearchResultsAndFillData({},
&cSearchResultData,
plan,
results.data(),
results.size(),

View File

@ -621,7 +621,8 @@ TEST(GroupBY, Reduce) {
auto slice_nqs = std::vector<int64_t>{num_queries / 2, num_queries / 2};
auto slice_topKs = std::vector<int64_t>{topK / 2, topK};
CSearchResultDataBlobs cSearchResultData;
status = ReduceSearchResultsAndFillData(&cSearchResultData,
status = ReduceSearchResultsAndFillData({},
&cSearchResultData,
c_plan,
results.data(),
results.size(),

View File

@ -524,6 +524,9 @@ func (t *searchTask) reduceResults(ctx context.Context, toReduceResults []*inter
metricType = toReduceResults[0].GetMetricType()
}
ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "reduceResults")
defer sp.End()
// Decode all search results
validSearchResults, err := decodeSearchResults(ctx, toReduceResults)
if err != nil {
@ -838,6 +841,8 @@ func doRequery(ctx context.Context,
// 3 2 5 4 1 (result ids)
// v3 v2 v5 v4 v1 (result vectors)
// ===========================================
_, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "reorganizeRequeryResults")
defer sp.End()
pkFieldData, err := typeutil.GetPrimaryFieldData(queryResult.GetFieldsData(), pkField)
if err != nil {
return err
@ -867,6 +872,8 @@ func doRequery(ctx context.Context,
}
func decodeSearchResults(ctx context.Context, searchResults []*internalpb.SearchResults) ([]*schemapb.SearchResultData, error) {
ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "decodeSearchResults")
defer sp.End()
tr := timerecord.NewTimeRecorder("decodeSearchResults")
results := make([]*schemapb.SearchResultData, 0)
for _, partialSearchResult := range searchResults {
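Two variants appear in these proxy hunks: reduceResults reassigns ctx (ctx, sp := ...), so later calls such as decodeSearchResults nest under the new span, while the requery block uses _, sp := ... and leaves the original ctx untouched, so only that block itself is timed. A sketch of the difference, with illustrative function names:

```go
package main

import (
	"context"

	"go.opentelemetry.io/otel"
)

func reduceResults(ctx context.Context) {
	// Reassigning ctx: spans started by callees (decode below) become
	// children of "reduceResults".
	ctx, sp := otel.Tracer("proxy").Start(ctx, "reduceResults")
	defer sp.End()
	decode(ctx)
}

func decode(ctx context.Context) {
	_, sp := otel.Tracer("proxy").Start(ctx, "decodeSearchResults")
	defer sp.End()
}

func reorganize(ctx context.Context) {
	// Discarding the child context: the span only measures this block;
	// callees keep attaching to the caller's original span.
	_, sp := otel.Tracer("proxy").Start(ctx, "reorganizeRequeryResults")
	defer sp.End()
}

func main() {
	ctx := context.Background()
	reduceResults(ctx)
	reorganize(ctx)
}
```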

View File

@ -151,7 +151,8 @@ func ReduceSearchResultsAndFillData(ctx context.Context, plan *SearchPlan, searc
cSliceTopKSPtr := (*C.int64_t)(&sliceTopKs[0])
cNumSlices := C.int64_t(len(sliceNQs))
var cSearchResultDataBlobs SearchResultDataBlobs
status := C.ReduceSearchResultsAndFillData(&cSearchResultDataBlobs, plan.cSearchPlan, cSearchResultPtr,
traceCtx := ParseCTraceContext(ctx)
status := C.ReduceSearchResultsAndFillData(traceCtx.ctx, &cSearchResultDataBlobs, plan.cSearchPlan, cSearchResultPtr,
cNumSegments, cSliceNQSPtr, cSliceTopKSPtr, cNumSlices)
if err := HandleCStatus(ctx, &status, "ReduceSearchResultsAndFillData failed"); err != nil {
return nil, err
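Here the Go side converts the request context into the C-ABI trace handle via ParseCTraceContext and passes it as the new first argument of C.ReduceSearchResultsAndFillData; that is what lets the C++ AutoSpans above join the same trace. What actually has to cross the boundary is just the current span's trace ID, span ID and flags, which plain Go can extract like this (a sketch, not Milvus's ParseCTraceContext):

```go
package main

import (
	"context"
	"fmt"

	"go.opentelemetry.io/otel/trace"
)

// rawTraceIDs pulls the wire-level identifiers out of the current span so a
// non-Go callee (e.g. segcore via cgo) can reconstruct the parent context.
func rawTraceIDs(ctx context.Context) (traceID [16]byte, spanID [8]byte, flags byte) {
	sc := trace.SpanContextFromContext(ctx)
	return sc.TraceID(), sc.SpanID(), byte(sc.TraceFlags())
}

func main() {
	tid, sid, flags := rawTraceIDs(context.Background())
	// With no active span these are all zero; with a real span they identify
	// the parent that the C++ side should attach its spans to.
	fmt.Printf("traceID=%x spanID=%x flags=%d\n", tid, sid, flags)
}
```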

View File

@ -51,6 +51,9 @@ func ReduceSearchResults(ctx context.Context, results []*internalpb.SearchResult
return results[0], nil
}
ctx, sp := otel.Tracer(typeutil.QueryNodeRole).Start(ctx, "ReduceSearchResults")
defer sp.End()
channelsMvcc := make(map[string]uint64)
for _, r := range results {
for ch, ts := range r.GetChannelsMvcc() {
@ -63,7 +66,7 @@ func ReduceSearchResults(ctx context.Context, results []*internalpb.SearchResult
}
log := log.Ctx(ctx)
searchResultData, err := DecodeSearchResults(results)
searchResultData, err := DecodeSearchResults(ctx, results)
if err != nil {
log.Warn("shard leader decode search results errors", zap.Error(err))
return nil, err
@ -82,7 +85,7 @@ func ReduceSearchResults(ctx context.Context, results []*internalpb.SearchResult
log.Warn("shard leader reduce errors", zap.Error(err))
return nil, err
}
searchResults, err := EncodeSearchResultData(reducedResultData, nq, topk, metricType)
searchResults, err := EncodeSearchResultData(ctx, reducedResultData, nq, topk, metricType)
if err != nil {
log.Warn("shard leader encode search result errors", zap.Error(err))
return nil, err
@ -112,6 +115,9 @@ func ReduceSearchResults(ctx context.Context, results []*internalpb.SearchResult
}
func ReduceAdvancedSearchResults(ctx context.Context, results []*internalpb.SearchResults, nq int64) (*internalpb.SearchResults, error) {
_, sp := otel.Tracer(typeutil.QueryNodeRole).Start(ctx, "ReduceAdvancedSearchResults")
defer sp.End()
if len(results) == 1 {
return results[0], nil
}
@ -202,6 +208,8 @@ func MergeToAdvancedResults(ctx context.Context, results []*internalpb.SearchRes
}
func ReduceSearchResultData(ctx context.Context, searchResultData []*schemapb.SearchResultData, nq int64, topk int64) (*schemapb.SearchResultData, error) {
ctx, sp := otel.Tracer(typeutil.QueryNodeRole).Start(ctx, "ReduceSearchResultData")
defer sp.End()
log := log.Ctx(ctx)
if len(searchResultData) == 0 {
@ -329,7 +337,10 @@ func SelectSearchResultData(dataArray []*schemapb.SearchResultData, resultOffset
return sel
}
func DecodeSearchResults(searchResults []*internalpb.SearchResults) ([]*schemapb.SearchResultData, error) {
func DecodeSearchResults(ctx context.Context, searchResults []*internalpb.SearchResults) ([]*schemapb.SearchResultData, error) {
_, sp := otel.Tracer(typeutil.QueryNodeRole).Start(ctx, "DecodeSearchResults")
defer sp.End()
results := make([]*schemapb.SearchResultData, 0)
for _, partialSearchResult := range searchResults {
if partialSearchResult.SlicedBlob == nil {
@ -347,7 +358,10 @@ func DecodeSearchResults(searchResults []*internalpb.SearchResults) ([]*schemapb
return results, nil
}
func EncodeSearchResultData(searchResultData *schemapb.SearchResultData, nq int64, topk int64, metricType string) (searchResults *internalpb.SearchResults, err error) {
func EncodeSearchResultData(ctx context.Context, searchResultData *schemapb.SearchResultData, nq int64, topk int64, metricType string) (searchResults *internalpb.SearchResults, err error) {
_, sp := otel.Tracer(typeutil.QueryNodeRole).Start(ctx, "EncodeSearchResultData")
defer sp.End()
searchResults = &internalpb.SearchResults{
Status: merr.Success(),
NumQueries: nq,
@ -382,6 +396,9 @@ func MergeInternalRetrieveResult(ctx context.Context, retrieveResults []*interna
loopEnd int
)
_, sp := otel.Tracer(typeutil.QueryNodeRole).Start(ctx, "MergeInternalRetrieveResult")
defer sp.End()
validRetrieveResults := []*internalpb.RetrieveResults{}
relatedDataSize := int64(0)
for _, r := range retrieveResults {
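The spans added in this file (ReduceSearchResults, ReduceAdvancedSearchResults, ReduceSearchResultData, DecodeSearchResults, EncodeSearchResultData, MergeInternalRetrieveResult) only become visible once a TracerProvider with an exporter is installed; Milvus wires that up from its own configuration, but for inspecting spans like these in isolation a stdout exporter is enough, as in this self-contained sketch:

```go
package main

import (
	"context"
	"log"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/exporters/stdout/stdouttrace"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

func main() {
	// Export finished spans as pretty-printed JSON on stdout.
	exp, err := stdouttrace.New(stdouttrace.WithPrettyPrint())
	if err != nil {
		log.Fatal(err)
	}
	tp := sdktrace.NewTracerProvider(sdktrace.WithBatcher(exp))
	defer tp.Shutdown(context.Background())
	otel.SetTracerProvider(tp)

	// Any span started through the global tracer is now recorded, including
	// ones shaped like the ReduceSearchResults / DecodeSearchResults spans above.
	ctx, sp := otel.Tracer("querynode").Start(context.Background(), "ReduceSearchResults")
	_, child := otel.Tracer("querynode").Start(ctx, "DecodeSearchResults")
	child.End()
	sp.End()
}
```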