From 95e47bfcf8acaf4727a08532beb6a84f70b3a690 Mon Sep 17 00:00:00 2001 From: SimFG Date: Wed, 18 Sep 2024 19:21:11 +0800 Subject: [PATCH] fix: force to set the metric type in the search request (#36279) - issue: #35960 - pr: #35962 Signed-off-by: SimFG --- internal/querycoordv2/task/executor.go | 3 ++ internal/querynodev2/segments/collection.go | 32 ++++++++++++++++++++- internal/querynodev2/services.go | 4 +++ 3 files changed, 38 insertions(+), 1 deletion(-) diff --git a/internal/querycoordv2/task/executor.go b/internal/querycoordv2/task/executor.go index 3196c4ccbc..264911c659 100644 --- a/internal/querycoordv2/task/executor.go +++ b/internal/querycoordv2/task/executor.go @@ -366,6 +366,8 @@ func (ex *Executor) subscribeChannel(task *ChannelTask, step int) error { partitions..., ) + ex.setMetricTypeForMetaInfo(loadMeta, indexInfo) + dmChannel := ex.targetMgr.GetDmChannel(task.CollectionID(), action.ChannelName(), meta.NextTarget) if dmChannel == nil { msg := "channel does not exist in next target, skip it" @@ -738,4 +740,5 @@ func (ex *Executor) setMetricTypeForMetaInfo(metaInfo *querypb.LoadMetaInfo, ind } } } + log.Warn("metric type not found in index info, set it to default", zap.Any("indexInfos", indexInfos), zap.Any("metaInfo", metaInfo)) } diff --git a/internal/querynodev2/segments/collection.go b/internal/querynodev2/segments/collection.go index 5a5679c0c9..720da4dca8 100644 --- a/internal/querynodev2/segments/collection.go +++ b/internal/querynodev2/segments/collection.go @@ -40,6 +40,7 @@ import ( "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" + "github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/indexparamcheck" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" @@ -143,7 +144,8 @@ type Collection struct { // but Collection in Manager will be released before assign new replica of new resource group on these node. // so we don't need to update resource group in Collection. // if resource group is not updated, the reference count of collection manager works failed. - metricType atomic.String // deprecated + // Deprecated + metricType *atomic.String schema atomic.Pointer[schemapb.CollectionSchema] isGpuIndex bool loadFields typeutil.Set[int64] @@ -161,6 +163,12 @@ func (c *Collection) GetResourceGroup() string { return c.resourceGroup } +// Deprecated +// GetMetricType it'll deprecate in the 2.5.x, please don't use it +func (c *Collection) GetMetricType() string { + return c.metricType.Load() +} + // ID returns collection id func (c *Collection) ID() int64 { return c.id @@ -281,6 +289,7 @@ func NewCollection(collectionID int64, schema *schemapb.CollectionSchema, indexM refCount: atomic.NewUint32(0), isGpuIndex: isGpuIndex, loadFields: loadFieldIDs, + metricType: atomic.NewString(GetMetricType(schema, indexMeta)), // only for compatibility of rolling upgrade from 2.3.x to 2.4 } for _, partitionID := range loadMetaInfo.GetPartitionIDs() { coll.partitions.Insert(partitionID) @@ -290,6 +299,27 @@ func NewCollection(collectionID int64, schema *schemapb.CollectionSchema, indexM return coll } +func GetMetricType(schema *schemapb.CollectionSchema, indexMeta *segcorepb.CollectionIndexMeta) string { + vecField, err := typeutil.GetVectorFieldSchema(schema) + if err != nil { + log.Warn("get vector field failed", zap.String("collection", schema.GetName()), zap.Error(err)) + return "" + } + vecIndexMeta, ok := lo.Find(indexMeta.GetIndexMetas(), func(fieldIndexMeta *segcorepb.FieldIndexMeta) bool { + return fieldIndexMeta.GetFieldID() == vecField.GetFieldID() + }) + if !ok || vecIndexMeta == nil { + log.Warn("cannot find index info for field", zap.String("field", vecField.GetName()), zap.String("collection", schema.GetName())) + return "" + } + metricType, err := funcutil.GetAttrByKeyFromRepeatedKV(common.MetricTypeKey, vecIndexMeta.GetIndexParams()) + if err != nil { + log.Warn("get metric type failed", zap.String("collection", schema.GetName()), zap.Error(err)) + return "" + } + return metricType +} + func NewCollectionWithoutSchema(collectionID int64, loadType querypb.LoadType) *Collection { return &Collection{ id: collectionID, diff --git a/internal/querynodev2/services.go b/internal/querynodev2/services.go index dad0962d4d..a8e6b3d03d 100644 --- a/internal/querynodev2/services.go +++ b/internal/querynodev2/services.go @@ -648,6 +648,7 @@ func (node *QueryNode) SearchSegments(ctx context.Context, req *querypb.SearchRe zap.Int64("collectionID", req.Req.GetCollectionID()), zap.String("channel", channel), zap.String("scope", req.GetScope().String()), + zap.Any("metric_type", req.GetReq().GetMetricType()), ) channelsMvcc := make(map[string]uint64) for _, ch := range req.GetDmlChannels() { @@ -751,6 +752,9 @@ func (node *QueryNode) Search(ctx context.Context, req *querypb.SearchRequest) ( resp.Status = merr.Status(merr.WrapErrCollectionNotFound(req.GetReq().GetCollectionID())) return resp, nil } + if req.Req.MetricType == "" { + req.Req.MetricType = collection.GetMetricType() + } toReduceResults := make([]*internalpb.SearchResults, len(req.GetDmlChannels())) runningGp, runningCtx := errgroup.WithContext(ctx)