fix: force to set the metric type in the search request (#36279)

- issue: #35960
- pr: #35962

Signed-off-by: SimFG <bang.fu@zilliz.com>
This commit is contained in:
SimFG 2024-09-18 19:21:11 +08:00 committed by GitHub
parent 817fe486d6
commit 95e47bfcf8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 38 additions and 1 deletions

View File

@ -366,6 +366,8 @@ func (ex *Executor) subscribeChannel(task *ChannelTask, step int) error {
partitions...,
)
ex.setMetricTypeForMetaInfo(loadMeta, indexInfo)
dmChannel := ex.targetMgr.GetDmChannel(task.CollectionID(), action.ChannelName(), meta.NextTarget)
if dmChannel == nil {
msg := "channel does not exist in next target, skip it"
@ -738,4 +740,5 @@ func (ex *Executor) setMetricTypeForMetaInfo(metaInfo *querypb.LoadMetaInfo, ind
}
}
}
log.Warn("metric type not found in index info, set it to default", zap.Any("indexInfos", indexInfos), zap.Any("metaInfo", metaInfo))
}

View File

@ -40,6 +40,7 @@ import (
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
@ -143,7 +144,8 @@ type Collection struct {
// but Collection in Manager will be released before assign new replica of new resource group on these node.
// so we don't need to update resource group in Collection.
// if resource group is not updated, the reference count of collection manager works failed.
metricType atomic.String // deprecated
// Deprecated
metricType *atomic.String
schema atomic.Pointer[schemapb.CollectionSchema]
isGpuIndex bool
loadFields typeutil.Set[int64]
@ -161,6 +163,12 @@ func (c *Collection) GetResourceGroup() string {
return c.resourceGroup
}
// Deprecated
// GetMetricType it'll deprecate in the 2.5.x, please don't use it
func (c *Collection) GetMetricType() string {
return c.metricType.Load()
}
// ID returns collection id
func (c *Collection) ID() int64 {
return c.id
@ -281,6 +289,7 @@ func NewCollection(collectionID int64, schema *schemapb.CollectionSchema, indexM
refCount: atomic.NewUint32(0),
isGpuIndex: isGpuIndex,
loadFields: loadFieldIDs,
metricType: atomic.NewString(GetMetricType(schema, indexMeta)), // only for compatibility of rolling upgrade from 2.3.x to 2.4
}
for _, partitionID := range loadMetaInfo.GetPartitionIDs() {
coll.partitions.Insert(partitionID)
@ -290,6 +299,27 @@ func NewCollection(collectionID int64, schema *schemapb.CollectionSchema, indexM
return coll
}
func GetMetricType(schema *schemapb.CollectionSchema, indexMeta *segcorepb.CollectionIndexMeta) string {
vecField, err := typeutil.GetVectorFieldSchema(schema)
if err != nil {
log.Warn("get vector field failed", zap.String("collection", schema.GetName()), zap.Error(err))
return ""
}
vecIndexMeta, ok := lo.Find(indexMeta.GetIndexMetas(), func(fieldIndexMeta *segcorepb.FieldIndexMeta) bool {
return fieldIndexMeta.GetFieldID() == vecField.GetFieldID()
})
if !ok || vecIndexMeta == nil {
log.Warn("cannot find index info for field", zap.String("field", vecField.GetName()), zap.String("collection", schema.GetName()))
return ""
}
metricType, err := funcutil.GetAttrByKeyFromRepeatedKV(common.MetricTypeKey, vecIndexMeta.GetIndexParams())
if err != nil {
log.Warn("get metric type failed", zap.String("collection", schema.GetName()), zap.Error(err))
return ""
}
return metricType
}
func NewCollectionWithoutSchema(collectionID int64, loadType querypb.LoadType) *Collection {
return &Collection{
id: collectionID,

View File

@ -648,6 +648,7 @@ func (node *QueryNode) SearchSegments(ctx context.Context, req *querypb.SearchRe
zap.Int64("collectionID", req.Req.GetCollectionID()),
zap.String("channel", channel),
zap.String("scope", req.GetScope().String()),
zap.Any("metric_type", req.GetReq().GetMetricType()),
)
channelsMvcc := make(map[string]uint64)
for _, ch := range req.GetDmlChannels() {
@ -751,6 +752,9 @@ func (node *QueryNode) Search(ctx context.Context, req *querypb.SearchRequest) (
resp.Status = merr.Status(merr.WrapErrCollectionNotFound(req.GetReq().GetCollectionID()))
return resp, nil
}
if req.Req.MetricType == "" {
req.Req.MetricType = collection.GetMetricType()
}
toReduceResults := make([]*internalpb.SearchResults, len(req.GetDmlChannels()))
runningGp, runningCtx := errgroup.WithContext(ctx)