2021-11-10 23:55:32 +08:00
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
2021-04-19 10:09:43 +08:00
// with the License. You may obtain a copy of the License at
//
2021-11-10 23:55:32 +08:00
// http://www.apache.org/licenses/LICENSE-2.0
2021-04-19 10:09:43 +08:00
//
2021-11-10 23:55:32 +08:00
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2021-04-19 10:09:43 +08:00
2021-06-22 14:40:07 +08:00
package proxy
2020-11-26 16:01:31 +08:00
import (
2022-08-04 11:04:34 +08:00
"context"
2023-08-30 10:52:26 +08:00
"encoding/json"
2021-03-05 10:15:27 +08:00
"fmt"
2020-11-26 16:01:31 +08:00
"strconv"
"strings"
2022-05-24 12:05:59 +08:00
"time"
2020-11-26 16:01:31 +08:00
2023-02-26 11:31:49 +08:00
"github.com/cockroachdb/errors"
2022-08-04 11:04:34 +08:00
"go.uber.org/zap"
2022-08-25 15:48:54 +08:00
"golang.org/x/crypto/bcrypt"
2022-08-04 11:04:34 +08:00
"google.golang.org/grpc/metadata"
2023-06-09 01:28:37 +08:00
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
2023-05-23 10:19:26 +08:00
"github.com/milvus-io/milvus/internal/parser/planparserv2"
"github.com/milvus-io/milvus/internal/proto/planpb"
2023-04-06 19:14:32 +08:00
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/types"
2023-06-06 10:24:34 +08:00
typeutil2 "github.com/milvus-io/milvus/internal/util/typeutil"
2023-05-16 17:41:22 +08:00
"github.com/milvus-io/milvus/pkg/common"
2023-04-06 19:14:32 +08:00
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util"
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
2023-11-27 20:14:26 +08:00
"github.com/milvus-io/milvus/pkg/util/contextutil"
2023-04-06 19:14:32 +08:00
"github.com/milvus-io/milvus/pkg/util/crypto"
2024-03-27 19:33:09 +08:00
"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
2023-04-06 19:14:32 +08:00
"github.com/milvus-io/milvus/pkg/util/merr"
2023-07-19 16:16:58 +08:00
"github.com/milvus-io/milvus/pkg/util/metric"
2023-12-03 19:22:33 +08:00
"github.com/milvus-io/milvus/pkg/util/paramtable"
2023-04-06 19:14:32 +08:00
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
2020-11-26 16:01:31 +08:00
)
2022-08-25 15:48:54 +08:00
const (
strongTS = 0
boundedTS = 2
// enableMultipleVectorFields indicates whether to enable multiple vector fields.
2023-12-28 16:40:46 +08:00
enableMultipleVectorFields = true
2022-05-24 12:05:59 +08:00
2022-08-25 15:48:54 +08:00
defaultMaxVarCharLength = 65535
2023-09-19 14:23:23 +08:00
defaultMaxArrayCapacity = 4096
2024-01-08 15:34:48 +08:00
defaultMaxSearchRequest = 1024
2023-10-15 13:52:06 +08:00
// DefaultArithmeticIndexType name of default index type for scalar field
2024-03-27 19:33:09 +08:00
DefaultArithmeticIndexType = indexparamcheck . IndexINVERTED
2022-08-25 15:48:54 +08:00
// DefaultStringIndexType name of default index type for varChar/string field
2024-03-27 19:33:09 +08:00
DefaultStringIndexType = indexparamcheck . IndexINVERTED
2024-01-08 15:34:48 +08:00
defaultRRFParamsValue = 60
2024-01-17 20:28:58 +08:00
maxRRFParamsValue = 16384
2022-08-25 15:48:54 +08:00
)
2022-03-14 23:20:02 +08:00
2022-08-04 11:04:34 +08:00
var logger = log . L ( ) . WithOptions ( zap . Fields ( zap . String ( "role" , typeutil . ProxyRole ) ) )
2021-12-24 19:27:38 +08:00
// isAlpha check if c is alpha.
2020-11-26 16:01:31 +08:00
func isAlpha ( c uint8 ) bool {
if ( c < 'A' || c > 'Z' ) && ( c < 'a' || c > 'z' ) {
return false
}
return true
}
2021-12-17 21:52:46 +08:00
// isNumber check if c is a number.
2020-11-26 16:01:31 +08:00
func isNumber ( c uint8 ) bool {
if c < '0' || c > '9' {
return false
}
return true
}
2023-06-25 14:42:43 +08:00
func validateMaxQueryResultWindow ( offset int64 , limit int64 ) error {
if offset < 0 {
return fmt . Errorf ( "%s [%d] is invalid, should be gte than 0" , OffsetKey , offset )
}
if limit <= 0 {
return fmt . Errorf ( "%s [%d] is invalid, should be greater than 0" , LimitKey , limit )
}
depth := offset + limit
maxQueryResultWindow := Params . QuotaConfig . MaxQueryResultWindow . GetAsInt64 ( )
if depth <= 0 || depth > maxQueryResultWindow {
return fmt . Errorf ( "(offset+limit) should be in range [1, %d], but got %d" , maxQueryResultWindow , depth )
}
return nil
}
2024-06-26 22:14:04 +08:00
func validateLimit ( limit int64 ) error {
2023-06-07 10:38:36 +08:00
topKLimit := Params . QuotaConfig . TopKLimit . GetAsInt64 ( )
2024-06-26 22:14:04 +08:00
if limit <= 0 || limit > topKLimit {
return fmt . Errorf ( "it should be in range [1, %d], but got %d" , topKLimit , limit )
2023-06-07 10:38:36 +08:00
}
return nil
}
func validateNQLimit ( limit int64 ) error {
nqLimit := Params . QuotaConfig . NQLimit . GetAsInt64 ( )
if limit <= 0 || limit > nqLimit {
return fmt . Errorf ( "nq (number of search vector per search request) should be in range [1, %d], but got %d" , nqLimit , limit )
2022-05-24 21:27:59 +08:00
}
return nil
}
2021-09-18 11:13:51 +08:00
func validateCollectionNameOrAlias ( entity , entityType string ) error {
entity = strings . TrimSpace ( entity )
2020-11-26 16:01:31 +08:00
2021-09-18 11:13:51 +08:00
if entity == "" {
2023-10-09 10:09:33 +08:00
return merr . WrapErrParameterInvalidMsg ( "collection %s should not be empty" , entityType )
2020-11-26 16:01:31 +08:00
}
2021-09-18 11:13:51 +08:00
invalidMsg := fmt . Sprintf ( "Invalid collection %s: %s. " , entityType , entity )
2022-12-07 18:01:19 +08:00
if len ( entity ) > Params . ProxyCfg . MaxNameLength . GetAsInt ( ) {
2023-10-09 10:09:33 +08:00
return merr . WrapErrParameterInvalidMsg ( "%s the length of a collection %s must be less than %s characters" , invalidMsg , entityType ,
2023-02-07 17:52:31 +08:00
Params . ProxyCfg . MaxNameLength . GetValue ( ) )
2020-11-26 16:01:31 +08:00
}
2021-09-18 11:13:51 +08:00
firstChar := entity [ 0 ]
2020-11-26 16:01:31 +08:00
if firstChar != '_' && ! isAlpha ( firstChar ) {
2023-10-09 10:09:33 +08:00
return merr . WrapErrParameterInvalidMsg ( "%s the first character of a collection %s must be an underscore or letter" , invalidMsg , entityType )
2020-11-26 16:01:31 +08:00
}
2021-09-18 11:13:51 +08:00
for i := 1 ; i < len ( entity ) ; i ++ {
c := entity [ i ]
2022-09-08 10:16:34 +08:00
if c != '_' && ! isAlpha ( c ) && ! isNumber ( c ) {
2023-10-09 10:09:33 +08:00
return merr . WrapErrParameterInvalidMsg ( "%s collection %s can only contain numbers, letters and underscores" , invalidMsg , entityType )
2023-02-07 17:52:31 +08:00
}
}
return nil
}
func ValidateResourceGroupName ( entity string ) error {
if entity == "" {
2023-02-16 10:48:34 +08:00
return errors . New ( "resource group name couldn't be empty" )
2023-02-07 17:52:31 +08:00
}
invalidMsg := fmt . Sprintf ( "Invalid resource group name %s." , entity )
if len ( entity ) > Params . ProxyCfg . MaxNameLength . GetAsInt ( ) {
2023-10-09 10:09:33 +08:00
return merr . WrapErrParameterInvalidMsg ( "%s the length of a resource group name must be less than %s characters" ,
2023-02-07 17:52:31 +08:00
invalidMsg , Params . ProxyCfg . MaxNameLength . GetValue ( ) )
}
firstChar := entity [ 0 ]
if firstChar != '_' && ! isAlpha ( firstChar ) {
2023-10-09 10:09:33 +08:00
return merr . WrapErrParameterInvalidMsg ( "%s the first character of a resource group name must be an underscore or letter" , invalidMsg )
2023-02-07 17:52:31 +08:00
}
for i := 1 ; i < len ( entity ) ; i ++ {
c := entity [ i ]
if c != '_' && ! isAlpha ( c ) && ! isNumber ( c ) {
2023-10-09 10:09:33 +08:00
return merr . WrapErrParameterInvalidMsg ( "%s resource group name can only contain numbers, letters and underscores" , invalidMsg )
2020-11-26 16:01:31 +08:00
}
}
return nil
}
2023-06-25 17:20:43 +08:00
func ValidateDatabaseName ( dbName string ) error {
if dbName == "" {
2023-10-07 11:29:32 +08:00
return merr . WrapErrDatabaseNameInvalid ( dbName , "database name couldn't be empty" )
2023-06-25 17:20:43 +08:00
}
if len ( dbName ) > Params . ProxyCfg . MaxNameLength . GetAsInt ( ) {
2023-10-07 11:29:32 +08:00
return merr . WrapErrDatabaseNameInvalid ( dbName ,
2023-06-25 17:20:43 +08:00
fmt . Sprintf ( "the length of a database name must be less than %d characters" , Params . ProxyCfg . MaxNameLength . GetAsInt ( ) ) )
}
firstChar := dbName [ 0 ]
if firstChar != '_' && ! isAlpha ( firstChar ) {
2023-10-07 11:29:32 +08:00
return merr . WrapErrDatabaseNameInvalid ( dbName ,
2023-06-25 17:20:43 +08:00
"the first character of a database name must be an underscore or letter" )
}
for i := 1 ; i < len ( dbName ) ; i ++ {
c := dbName [ i ]
if c != '_' && ! isAlpha ( c ) && ! isNumber ( c ) {
2023-10-07 11:29:32 +08:00
return merr . WrapErrDatabaseNameInvalid ( dbName ,
2023-06-25 17:20:43 +08:00
"database name can only contain numbers, letters and underscores" )
}
}
return nil
}
2021-10-02 12:04:12 +08:00
// ValidateCollectionAlias returns true if collAlias is a valid alias name for collection, otherwise returns false.
2021-09-18 11:13:51 +08:00
func ValidateCollectionAlias ( collAlias string ) error {
return validateCollectionNameOrAlias ( collAlias , "alias" )
}
2021-10-23 10:53:12 +08:00
func validateCollectionName ( collName string ) error {
2021-09-18 11:13:51 +08:00
return validateCollectionNameOrAlias ( collName , "name" )
}
2021-10-23 18:23:44 +08:00
func validatePartitionTag ( partitionTag string , strictCheck bool ) error {
2020-11-26 16:01:31 +08:00
partitionTag = strings . TrimSpace ( partitionTag )
2021-06-17 16:08:01 +08:00
invalidMsg := "Invalid partition name: " + partitionTag + ". "
2020-11-26 16:01:31 +08:00
if partitionTag == "" {
2021-06-17 16:08:01 +08:00
msg := invalidMsg + "Partition name should not be empty."
2020-11-26 16:01:31 +08:00
return errors . New ( msg )
}
2022-12-07 18:01:19 +08:00
if len ( partitionTag ) > Params . ProxyCfg . MaxNameLength . GetAsInt ( ) {
msg := invalidMsg + "The length of a partition name must be less than " + Params . ProxyCfg . MaxNameLength . GetValue ( ) + " characters."
2020-11-26 16:01:31 +08:00
return errors . New ( msg )
}
if strictCheck {
firstChar := partitionTag [ 0 ]
if firstChar != '_' && ! isAlpha ( firstChar ) && ! isNumber ( firstChar ) {
2022-09-08 10:16:34 +08:00
msg := invalidMsg + "The first character of a partition name must be an underscore or letter."
2020-11-26 16:01:31 +08:00
return errors . New ( msg )
}
tagSize := len ( partitionTag )
for i := 1 ; i < tagSize ; i ++ {
c := partitionTag [ i ]
2022-09-08 10:16:34 +08:00
if c != '_' && ! isAlpha ( c ) && ! isNumber ( c ) {
msg := invalidMsg + "Partition name can only contain numbers, letters and underscores."
2020-11-26 16:01:31 +08:00
return errors . New ( msg )
}
}
}
return nil
}
2021-10-23 18:25:37 +08:00
func validateFieldName ( fieldName string ) error {
2020-11-26 16:01:31 +08:00
fieldName = strings . TrimSpace ( fieldName )
if fieldName == "" {
2023-10-19 17:24:07 +08:00
return merr . WrapErrFieldNameInvalid ( fieldName , "field name should not be empty" )
2020-11-26 16:01:31 +08:00
}
invalidMsg := "Invalid field name: " + fieldName + ". "
2022-12-07 18:01:19 +08:00
if len ( fieldName ) > Params . ProxyCfg . MaxNameLength . GetAsInt ( ) {
msg := invalidMsg + "The length of a field name must be less than " + Params . ProxyCfg . MaxNameLength . GetValue ( ) + " characters."
2023-10-19 17:24:07 +08:00
return merr . WrapErrFieldNameInvalid ( fieldName , msg )
2020-11-26 16:01:31 +08:00
}
firstChar := fieldName [ 0 ]
if firstChar != '_' && ! isAlpha ( firstChar ) {
msg := invalidMsg + "The first character of a field name must be an underscore or letter."
2023-10-19 17:24:07 +08:00
return merr . WrapErrFieldNameInvalid ( fieldName , msg )
2020-11-26 16:01:31 +08:00
}
fieldNameSize := len ( fieldName )
for i := 1 ; i < fieldNameSize ; i ++ {
c := fieldName [ i ]
if c != '_' && ! isAlpha ( c ) && ! isNumber ( c ) {
2024-06-17 10:37:58 +08:00
msg := invalidMsg + "Field name can only contain numbers, letters, and underscores."
2023-10-19 17:24:07 +08:00
return merr . WrapErrFieldNameInvalid ( fieldName , msg )
2020-11-26 16:01:31 +08:00
}
}
return nil
}
2022-04-29 13:35:49 +08:00
func validateDimension ( field * schemapb . FieldSchema ) error {
exist := false
var dim int64
for _ , param := range field . TypeParams {
2023-05-16 17:41:22 +08:00
if param . Key == common . DimKey {
2022-04-29 13:35:49 +08:00
exist = true
tmp , err := strconv . ParseInt ( param . Value , 10 , 64 )
if err != nil {
return err
}
dim = tmp
break
}
}
2024-04-07 14:27:22 +08:00
if typeutil . IsSparseFloatVectorType ( field . DataType ) {
2024-03-14 05:32:54 +08:00
if exist {
return fmt . Errorf ( "dim should not be specified for sparse vector field %s(%d)" , field . Name , field . FieldID )
}
return nil
}
2022-04-29 13:35:49 +08:00
if ! exist {
return errors . New ( "dimension is not defined in field type params, check type param `dim` for vector field" )
}
2024-03-05 14:21:00 +08:00
if dim <= 1 {
return fmt . Errorf ( "invalid dimension: %d. should be in range 2 ~ %d" , dim , Params . ProxyCfg . MaxDimension . GetAsInt ( ) )
2020-11-26 16:01:31 +08:00
}
2024-03-05 14:21:00 +08:00
2024-04-07 14:27:22 +08:00
if typeutil . IsFloatVectorType ( field . DataType ) {
2024-03-05 14:21:00 +08:00
if dim > Params . ProxyCfg . MaxDimension . GetAsInt64 ( ) {
return fmt . Errorf ( "invalid dimension: %d. float vector dimension should be in range 2 ~ %d" , dim , Params . ProxyCfg . MaxDimension . GetAsInt ( ) )
}
} else {
if dim % 8 != 0 {
return fmt . Errorf ( "invalid dimension: %d. binary vector dimension should be multiple of 8. " , dim )
}
if dim > Params . ProxyCfg . MaxDimension . GetAsInt64 ( ) * 8 {
return fmt . Errorf ( "invalid dimension: %d. binary vector dimension should be in range 2 ~ %d" , dim , Params . ProxyCfg . MaxDimension . GetAsInt ( ) * 8 )
}
2020-11-26 16:01:31 +08:00
}
return nil
}
2020-11-30 19:38:23 +08:00
2022-04-29 13:35:49 +08:00
func validateMaxLengthPerRow ( collectionName string , field * schemapb . FieldSchema ) error {
exist := false
for _ , param := range field . TypeParams {
2023-05-16 17:41:22 +08:00
if param . Key != common . MaxLengthKey {
2023-06-30 11:50:26 +08:00
continue
2022-04-29 13:35:49 +08:00
}
maxLengthPerRow , err := strconv . ParseInt ( param . Value , 10 , 64 )
if err != nil {
return err
}
if maxLengthPerRow > defaultMaxVarCharLength || maxLengthPerRow <= 0 {
2023-10-19 17:24:07 +08:00
return merr . WrapErrParameterInvalidMsg ( "the maximum length specified for a VarChar should be in (0, 65535]" )
2022-04-29 13:35:49 +08:00
}
exist = true
}
2022-06-07 15:58:06 +08:00
// if not exist type params max_length, return error
2022-04-29 13:35:49 +08:00
if ! exist {
2022-06-07 15:58:06 +08:00
return fmt . Errorf ( "type param(max_length) should be specified for varChar field of collection %s" , collectionName )
2022-04-29 13:35:49 +08:00
}
return nil
}
2023-09-19 14:23:23 +08:00
func validateMaxCapacityPerRow ( collectionName string , field * schemapb . FieldSchema ) error {
exist := false
for _ , param := range field . TypeParams {
if param . Key != common . MaxCapacityKey {
continue
}
maxCapacityPerRow , err := strconv . ParseInt ( param . Value , 10 , 64 )
if err != nil {
return fmt . Errorf ( "the value of %s must be an integer" , common . MaxCapacityKey )
}
if maxCapacityPerRow > defaultMaxArrayCapacity || maxCapacityPerRow <= 0 {
2023-10-19 17:24:07 +08:00
return fmt . Errorf ( "the maximum capacity specified for a Array should be in (0, 4096]" )
2023-09-19 14:23:23 +08:00
}
exist = true
}
// if not exist type params max_length, return error
if ! exist {
return fmt . Errorf ( "type param(max_capacity) should be specified for array field of collection %s" , collectionName )
}
return nil
}
2021-10-25 23:32:20 +08:00
func validateVectorFieldMetricType ( field * schemapb . FieldSchema ) error {
2024-04-07 14:27:22 +08:00
if ! typeutil . IsVectorType ( field . DataType ) {
2020-11-30 19:38:23 +08:00
return nil
}
for _ , params := range field . IndexParams {
2023-05-16 17:41:22 +08:00
if params . Key == common . MetricTypeKey {
2020-11-30 19:38:23 +08:00
return nil
}
}
return errors . New ( "vector float without metric_type" )
}
2021-10-26 10:38:41 +08:00
func validateDuplicatedFieldName ( fields [ ] * schemapb . FieldSchema ) error {
2020-11-30 19:38:23 +08:00
names := make ( map [ string ] bool )
for _ , field := range fields {
_ , ok := names [ field . Name ]
if ok {
2021-03-08 12:41:46 +08:00
return errors . New ( "duplicated field name" )
2020-11-30 19:38:23 +08:00
}
names [ field . Name ] = true
}
return nil
}
2023-09-19 14:23:23 +08:00
func validateElementType ( dataType schemapb . DataType ) error {
switch dataType {
case schemapb . DataType_Bool , schemapb . DataType_Int8 , schemapb . DataType_Int16 , schemapb . DataType_Int32 ,
schemapb . DataType_Int64 , schemapb . DataType_Float , schemapb . DataType_Double , schemapb . DataType_VarChar :
return nil
case schemapb . DataType_String :
return errors . New ( "string data type not supported yet, please use VarChar type instead" )
case schemapb . DataType_None :
return errors . New ( "element data type None is not valid" )
}
return fmt . Errorf ( "element type %s is not supported" , dataType . String ( ) )
}
2022-03-03 16:57:56 +08:00
func validateFieldType ( schema * schemapb . CollectionSchema ) error {
for _ , field := range schema . GetFields ( ) {
switch field . GetDataType ( ) {
case schemapb . DataType_String :
2022-03-14 23:20:02 +08:00
return errors . New ( "string data type not supported yet, please use VarChar type instead" )
2022-03-03 16:57:56 +08:00
case schemapb . DataType_None :
return errors . New ( "data type None is not valid" )
2023-09-19 14:23:23 +08:00
case schemapb . DataType_Array :
if err := validateElementType ( field . GetElementType ( ) ) ; err != nil {
return err
}
2022-03-03 16:57:56 +08:00
}
}
return nil
}
2022-10-12 18:37:23 +08:00
// ValidateFieldAutoID call after validatePrimaryKey
2021-06-21 11:42:18 +08:00
func ValidateFieldAutoID ( coll * schemapb . CollectionSchema ) error {
2023-06-07 10:38:36 +08:00
idx := - 1
2021-06-21 11:42:18 +08:00
for i , field := range coll . Fields {
if field . AutoID {
if idx != - 1 {
return fmt . Errorf ( "only one field can speficy AutoID with true, field name = %s, %s" , coll . Fields [ idx ] . Name , field . Name )
}
idx = i
if ! field . IsPrimaryKey {
return fmt . Errorf ( "only primary field can speficy AutoID with true, field name = %s" , field . Name )
2020-11-30 19:38:23 +08:00
}
}
}
2021-06-21 11:42:18 +08:00
return nil
}
2021-10-25 23:42:29 +08:00
func validatePrimaryKey ( coll * schemapb . CollectionSchema ) error {
2020-11-30 19:38:23 +08:00
idx := - 1
for i , field := range coll . Fields {
if field . IsPrimaryKey {
if idx != - 1 {
2021-03-08 12:41:46 +08:00
return fmt . Errorf ( "there are more than one primary key, field name = %s, %s" , coll . Fields [ idx ] . Name , field . Name )
2020-11-30 19:38:23 +08:00
}
2022-03-14 23:20:02 +08:00
// The type of the primary key field can only be int64 and varchar
if field . DataType != schemapb . DataType_Int64 && field . DataType != schemapb . DataType_VarChar {
return errors . New ( "the data type of primary key should be Int64 or VarChar" )
2020-11-30 19:38:23 +08:00
}
2022-03-14 23:20:02 +08:00
// varchar field do not support autoID
// If autoID is required, it is recommended to use int64 field as the primary key
2023-06-16 17:00:40 +08:00
//if field.DataType == schemapb.DataType_VarChar {
// if field.AutoID {
// return fmt.Errorf("autoID is not supported when the VarChar field is the primary key")
// }
//}
2022-03-14 23:20:02 +08:00
2020-11-30 19:38:23 +08:00
idx = i
}
}
if idx == - 1 {
2021-06-21 11:42:18 +08:00
return errors . New ( "primary key is not specified" )
2020-11-30 19:38:23 +08:00
}
return nil
}
2021-03-08 12:41:46 +08:00
2023-05-18 09:33:24 +08:00
func validateDynamicField ( coll * schemapb . CollectionSchema ) error {
for _ , field := range coll . Fields {
if field . IsDynamic {
return fmt . Errorf ( "cannot explicitly set a field as a dynamic field" )
}
}
return nil
}
2021-11-02 21:43:08 +08:00
// RepeatedKeyValToMap transfer the kv pairs to map.
2021-03-08 12:41:46 +08:00
func RepeatedKeyValToMap ( kvPairs [ ] * commonpb . KeyValuePair ) ( map [ string ] string , error ) {
resMap := make ( map [ string ] string )
for _ , kv := range kvPairs {
_ , ok := resMap [ kv . Key ]
if ok {
return nil , fmt . Errorf ( "duplicated param key: %s" , kv . Key )
}
resMap [ kv . Key ] = kv . Value
}
return resMap , nil
}
2021-12-17 21:54:42 +08:00
// isVector check if dataType belongs to vector type.
2021-03-08 12:41:46 +08:00
func isVector ( dataType schemapb . DataType ) ( bool , error ) {
switch dataType {
2021-03-12 14:22:09 +08:00
case schemapb . DataType_Bool , schemapb . DataType_Int8 ,
schemapb . DataType_Int16 , schemapb . DataType_Int32 ,
schemapb . DataType_Int64 ,
schemapb . DataType_Float , schemapb . DataType_Double :
2021-03-08 12:41:46 +08:00
return false , nil
2024-03-14 05:32:54 +08:00
case schemapb . DataType_FloatVector , schemapb . DataType_BinaryVector , schemapb . DataType_Float16Vector , schemapb . DataType_BFloat16Vector , schemapb . DataType_SparseFloatVector :
2021-03-08 12:41:46 +08:00
return true , nil
}
return false , fmt . Errorf ( "invalid data type: %d" , dataType )
}
2021-10-25 23:44:21 +08:00
func validateMetricType ( dataType schemapb . DataType , metricTypeStrRaw string ) error {
2021-03-08 12:41:46 +08:00
metricTypeStr := strings . ToUpper ( metricTypeStrRaw )
switch metricTypeStr {
2023-07-19 16:16:58 +08:00
case metric . L2 , metric . IP , metric . COSINE :
2024-04-07 14:27:22 +08:00
if typeutil . IsFloatVectorType ( dataType ) {
2021-03-08 12:41:46 +08:00
return nil
}
2023-08-31 20:07:00 +08:00
case metric . JACCARD , metric . HAMMING , metric . SUBSTRUCTURE , metric . SUPERSTRUCTURE :
2021-03-12 14:22:09 +08:00
if dataType == schemapb . DataType_BinaryVector {
2021-03-08 12:41:46 +08:00
return nil
}
}
return fmt . Errorf ( "data_type %s mismatch with metric_type %s" , dataType . String ( ) , metricTypeStrRaw )
}
2021-10-25 23:56:19 +08:00
func validateSchema ( coll * schemapb . CollectionSchema ) error {
2021-03-08 12:41:46 +08:00
autoID := coll . AutoID
primaryIdx := - 1
idMap := make ( map [ int64 ] int ) // fieldId -> idx
nameMap := make ( map [ string ] int ) // name -> idx
for idx , field := range coll . Fields {
// check system field
if field . FieldID < 100 {
// System Fields, not injected yet
2021-11-29 12:25:17 +08:00
return fmt . Errorf ( "fieldID(%d) that is less than 100 is reserved for system fields: %s" , field . FieldID , field . Name )
2021-03-08 12:41:46 +08:00
}
// primary key detector
if field . IsPrimaryKey {
if autoID {
return fmt . Errorf ( "autoId forbids primary key" )
} else if primaryIdx != - 1 {
return fmt . Errorf ( "there are more than one primary key, field name = %s, %s" , coll . Fields [ primaryIdx ] . Name , field . Name )
}
2021-03-12 14:22:09 +08:00
if field . DataType != schemapb . DataType_Int64 {
2023-10-19 17:24:07 +08:00
return fmt . Errorf ( "type of primary key should be int64" )
2021-03-08 12:41:46 +08:00
}
primaryIdx = idx
}
// check unique
elemIdx , ok := idMap [ field . FieldID ]
if ok {
return fmt . Errorf ( "duplicate field ids: %d" , coll . Fields [ elemIdx ] . FieldID )
}
idMap [ field . FieldID ] = idx
elemIdx , ok = nameMap [ field . Name ]
if ok {
return fmt . Errorf ( "duplicate field names: %s" , coll . Fields [ elemIdx ] . Name )
}
nameMap [ field . Name ] = idx
isVec , err3 := isVector ( field . DataType )
if err3 != nil {
return err3
}
if isVec {
indexKv , err1 := RepeatedKeyValToMap ( field . IndexParams )
if err1 != nil {
return err1
}
typeKv , err2 := RepeatedKeyValToMap ( field . TypeParams )
if err2 != nil {
return err2
}
2024-04-07 14:27:22 +08:00
if ! typeutil . IsSparseFloatVectorType ( field . DataType ) {
2024-03-14 05:32:54 +08:00
dimStr , ok := typeKv [ common . DimKey ]
if ! ok {
return fmt . Errorf ( "dim not found in type_params for vector field %s(%d)" , field . Name , field . FieldID )
}
dim , err := strconv . Atoi ( dimStr )
if err != nil || dim < 0 {
return fmt . Errorf ( "invalid dim; %s" , dimStr )
}
2021-03-08 12:41:46 +08:00
}
2023-05-16 17:41:22 +08:00
metricTypeStr , ok := indexKv [ common . MetricTypeKey ]
2021-03-08 12:41:46 +08:00
if ok {
2021-10-25 23:44:21 +08:00
err4 := validateMetricType ( field . DataType , metricTypeStr )
2021-03-08 12:41:46 +08:00
if err4 != nil {
return err4
}
}
2023-09-19 10:05:22 +08:00
// in C++, default type will be specified
// do nothing
2021-03-08 12:41:46 +08:00
} else {
if len ( field . IndexParams ) != 0 {
return fmt . Errorf ( "index params is not empty for scalar field: %s(%d)" , field . Name , field . FieldID )
}
if len ( field . TypeParams ) != 0 {
return fmt . Errorf ( "type params is not empty for scalar field: %s(%d)" , field . Name , field . FieldID )
}
}
}
if ! autoID && primaryIdx == - 1 {
return fmt . Errorf ( "primary key is required for non autoid mode" )
}
return nil
}
2021-12-06 10:03:34 +08:00
2021-12-17 22:19:03 +08:00
// validateMultipleVectorFields check if schema has multiple vector fields.
2021-12-06 10:03:34 +08:00
func validateMultipleVectorFields ( schema * schemapb . CollectionSchema ) error {
vecExist := false
var vecName string
for i := range schema . Fields {
name := schema . Fields [ i ] . Name
dType := schema . Fields [ i ] . DataType
2024-04-07 14:27:22 +08:00
isVec := typeutil . IsVectorType ( dType )
2021-12-06 10:03:34 +08:00
if isVec && vecExist && ! enableMultipleVectorFields {
return fmt . Errorf (
"multiple vector fields is not supported, fields name: %s, %s" ,
vecName ,
name ,
)
} else if isVec {
vecExist = true
vecName = name
}
}
return nil
}
2022-03-25 14:27:25 +08:00
// parsePrimaryFieldData2IDs get IDs to fill grpc result, for example insert request, delete request etc.
func parsePrimaryFieldData2IDs ( fieldData * schemapb . FieldData ) ( * schemapb . IDs , error ) {
primaryData := & schemapb . IDs { }
switch fieldData . Field . ( type ) {
case * schemapb . FieldData_Scalars :
scalarField := fieldData . GetScalars ( )
switch scalarField . Data . ( type ) {
case * schemapb . ScalarField_LongData :
primaryData . IdField = & schemapb . IDs_IntId {
IntId : scalarField . GetLongData ( ) ,
}
case * schemapb . ScalarField_StringData :
primaryData . IdField = & schemapb . IDs_StrId {
StrId : scalarField . GetStringData ( ) ,
}
default :
2024-07-01 14:56:12 +08:00
return nil , merr . WrapErrParameterInvalidMsg ( "currently only support DataType Int64 or VarChar as PrimaryField" )
2022-03-25 14:27:25 +08:00
}
default :
2024-07-01 14:56:12 +08:00
return nil , merr . WrapErrParameterInvalidMsg ( "currently not support vector field as PrimaryField" )
2022-03-25 14:27:25 +08:00
}
return primaryData , nil
}
// autoGenPrimaryFieldData generate primary data when autoID == true
func autoGenPrimaryFieldData ( fieldSchema * schemapb . FieldSchema , data interface { } ) ( * schemapb . FieldData , error ) {
var fieldData schemapb . FieldData
fieldData . FieldName = fieldSchema . Name
fieldData . Type = fieldSchema . DataType
switch data := data . ( type ) {
case [ ] int64 :
2023-06-16 17:00:40 +08:00
switch fieldData . Type {
case schemapb . DataType_Int64 :
fieldData . Field = & schemapb . FieldData_Scalars {
Scalars : & schemapb . ScalarField {
Data : & schemapb . ScalarField_LongData {
LongData : & schemapb . LongArray {
Data : data ,
} ,
2022-03-25 14:27:25 +08:00
} ,
} ,
2023-06-16 17:00:40 +08:00
}
case schemapb . DataType_VarChar :
2024-03-28 06:33:11 +08:00
strIDs := make ( [ ] string , len ( data ) )
2023-06-16 17:00:40 +08:00
for i , v := range data {
2024-03-28 06:33:11 +08:00
strIDs [ i ] = strconv . FormatInt ( v , 10 )
2023-06-16 17:00:40 +08:00
}
fieldData . Field = & schemapb . FieldData_Scalars {
Scalars : & schemapb . ScalarField {
Data : & schemapb . ScalarField_StringData {
StringData : & schemapb . StringArray {
2024-03-28 06:33:11 +08:00
Data : strIDs ,
2023-06-16 17:00:40 +08:00
} ,
} ,
} ,
}
default :
return nil , errors . New ( "currently only support autoID for int64 and varchar PrimaryField" )
2022-03-25 14:27:25 +08:00
}
default :
2023-06-16 17:00:40 +08:00
return nil , errors . New ( "currently only int64 is supported as the data source for the autoID of a PrimaryField" )
2022-03-25 14:27:25 +08:00
}
return & fieldData , nil
}
2023-05-23 14:27:25 +08:00
func autoGenDynamicFieldData ( data [ ] [ ] byte ) * schemapb . FieldData {
return & schemapb . FieldData {
2023-05-18 09:33:24 +08:00
FieldName : common . MetaFieldName ,
Type : schemapb . DataType_JSON ,
Field : & schemapb . FieldData_Scalars {
Scalars : & schemapb . ScalarField {
Data : & schemapb . ScalarField_JsonData {
JsonData : & schemapb . JSONArray {
Data : data ,
} ,
} ,
} ,
} ,
2023-05-23 14:27:25 +08:00
IsDynamic : true ,
2023-05-18 09:33:24 +08:00
}
}
2022-03-25 14:27:25 +08:00
// fillFieldIDBySchema set fieldID to fieldData according FieldSchemas
func fillFieldIDBySchema ( columns [ ] * schemapb . FieldData , schema * schemapb . CollectionSchema ) error {
if len ( columns ) != len ( schema . GetFields ( ) ) {
return fmt . Errorf ( "len(columns) mismatch the len(fields), len(columns): %d, len(fields): %d" ,
len ( columns ) , len ( schema . GetFields ( ) ) )
}
fieldName2Schema := make ( map [ string ] * schemapb . FieldSchema )
for _ , field := range schema . GetFields ( ) {
fieldName2Schema [ field . Name ] = field
}
for _ , fieldData := range columns {
if fieldSchema , ok := fieldName2Schema [ fieldData . FieldName ] ; ok {
fieldData . FieldId = fieldSchema . FieldID
fieldData . Type = fieldSchema . DataType
} else {
return fmt . Errorf ( "fieldName %v not exist in collection schema" , fieldData . FieldName )
}
}
return nil
}
2022-04-11 19:49:34 +08:00
func ValidateUsername ( username string ) error {
username = strings . TrimSpace ( username )
if username == "" {
2023-10-09 10:09:33 +08:00
return merr . WrapErrParameterInvalidMsg ( "username must be not empty" )
2022-04-11 19:49:34 +08:00
}
2022-12-07 18:01:19 +08:00
if len ( username ) > Params . ProxyCfg . MaxUsernameLength . GetAsInt ( ) {
2023-10-09 10:09:33 +08:00
return merr . WrapErrParameterInvalidMsg ( "invalid username %s with length %d, the length of username must be less than %d" , username , len ( username ) , Params . ProxyCfg . MaxUsernameLength . GetValue ( ) )
2022-04-11 19:49:34 +08:00
}
firstChar := username [ 0 ]
if ! isAlpha ( firstChar ) {
2024-01-18 22:18:55 +08:00
return merr . WrapErrParameterInvalidMsg ( "invalid user name %s, the first character must be a letter, but got %s" , username , string ( firstChar ) )
2022-04-11 19:49:34 +08:00
}
usernameSize := len ( username )
for i := 1 ; i < usernameSize ; i ++ {
c := username [ i ]
if c != '_' && ! isAlpha ( c ) && ! isNumber ( c ) {
2023-10-09 10:09:33 +08:00
return merr . WrapErrParameterInvalidMsg ( "invalid user name %s, username must contain only numbers, letters and underscores, but got %s" , username , c )
2022-04-11 19:49:34 +08:00
}
}
return nil
}
func ValidatePassword ( password string ) error {
2022-12-07 18:01:19 +08:00
if len ( password ) < Params . ProxyCfg . MinPasswordLength . GetAsInt ( ) || len ( password ) > Params . ProxyCfg . MaxPasswordLength . GetAsInt ( ) {
2023-10-09 10:09:33 +08:00
return merr . WrapErrParameterInvalidRange ( Params . ProxyCfg . MinPasswordLength . GetAsInt ( ) ,
Params . ProxyCfg . MaxPasswordLength . GetAsInt ( ) ,
len ( password ) , "invalid password length" )
2022-04-11 19:49:34 +08:00
}
return nil
}
2022-05-11 09:47:53 +08:00
func ReplaceID2Name ( oldStr string , id int64 , name string ) string {
return strings . ReplaceAll ( oldStr , strconv . FormatInt ( id , 10 ) , name )
}
2022-05-24 12:05:59 +08:00
2023-05-30 21:01:29 +08:00
func parseGuaranteeTsFromConsistency ( ts , tMax typeutil . Timestamp , consistency commonpb . ConsistencyLevel ) typeutil . Timestamp {
switch consistency {
case commonpb . ConsistencyLevel_Strong :
ts = tMax
case commonpb . ConsistencyLevel_Bounded :
ratio := Params . CommonCfg . GracefulTime . GetAsDuration ( time . Millisecond )
ts = tsoutil . AddPhysicalDurationOnTs ( tMax , - ratio )
case commonpb . ConsistencyLevel_Eventually :
ts = 1
}
return ts
}
2022-05-24 12:05:59 +08:00
func parseGuaranteeTs ( ts , tMax typeutil . Timestamp ) typeutil . Timestamp {
switch ts {
case strongTS :
ts = tMax
case boundedTS :
2023-02-15 17:22:34 +08:00
ratio := Params . CommonCfg . GracefulTime . GetAsDuration ( time . Millisecond )
ts = tsoutil . AddPhysicalDurationOnTs ( tMax , - ratio )
2022-05-24 12:05:59 +08:00
}
return ts
}
2022-08-04 11:04:34 +08:00
func validateName ( entity string , nameType string ) error {
entity = strings . TrimSpace ( entity )
if entity == "" {
2023-09-04 09:57:09 +08:00
return merr . WrapErrParameterInvalid ( "not empty" , entity , nameType + " should be not empty" )
2022-08-04 11:04:34 +08:00
}
2022-12-07 18:01:19 +08:00
if len ( entity ) > Params . ProxyCfg . MaxNameLength . GetAsInt ( ) {
2023-09-04 09:57:09 +08:00
return merr . WrapErrParameterInvalidRange ( 0 ,
Params . ProxyCfg . MaxNameLength . GetAsInt ( ) ,
len ( entity ) ,
fmt . Sprintf ( "the length of %s must be not greater than limit" , nameType ) )
2022-08-04 11:04:34 +08:00
}
firstChar := entity [ 0 ]
if firstChar != '_' && ! isAlpha ( firstChar ) {
2023-09-04 09:57:09 +08:00
return merr . WrapErrParameterInvalid ( '_' ,
firstChar ,
fmt . Sprintf ( "the first character of %s must be an underscore or letter" , nameType ) )
2022-08-04 11:04:34 +08:00
}
for i := 1 ; i < len ( entity ) ; i ++ {
c := entity [ i ]
if c != '_' && c != '$' && ! isAlpha ( c ) && ! isNumber ( c ) {
2023-09-04 09:57:09 +08:00
return merr . WrapErrParameterInvalidMsg ( "%s can only contain numbers, letters, dollars and underscores, found %c at %d" , nameType , c , i )
2022-08-04 11:04:34 +08:00
}
}
return nil
}
func ValidateRoleName ( entity string ) error {
return validateName ( entity , "role name" )
}
2022-08-15 16:40:48 +08:00
func IsDefaultRole ( roleName string ) bool {
for _ , defaultRole := range util . DefaultRoles {
if defaultRole == roleName {
return true
}
2022-08-04 11:04:34 +08:00
}
2022-08-15 16:40:48 +08:00
return false
}
2022-08-04 11:04:34 +08:00
2022-08-15 16:40:48 +08:00
func ValidateObjectName ( entity string ) error {
if util . IsAnyWord ( entity ) {
return nil
2022-08-04 11:04:34 +08:00
}
2022-08-15 16:40:48 +08:00
return validateName ( entity , "role name" )
2022-08-04 11:04:34 +08:00
}
func ValidateObjectType ( entity string ) error {
return validateName ( entity , "ObjectType" )
}
func ValidatePrincipalName ( entity string ) error {
return validateName ( entity , "PrincipalName" )
}
func ValidatePrincipalType ( entity string ) error {
return validateName ( entity , "PrincipalType" )
}
func ValidatePrivilege ( entity string ) error {
2022-08-15 16:40:48 +08:00
if util . IsAnyWord ( entity ) {
return nil
}
2022-08-04 11:04:34 +08:00
return validateName ( entity , "Privilege" )
}
func GetCurUserFromContext ( ctx context . Context ) ( string , error ) {
2024-03-28 07:13:10 +08:00
return contextutil . GetCurUserFromContext ( ctx )
2022-08-04 11:04:34 +08:00
}
2024-03-28 06:33:11 +08:00
func GetCurUserFromContextOrDefault ( ctx context . Context ) string {
username , _ := GetCurUserFromContext ( ctx )
return username
}
2023-06-25 17:20:43 +08:00
func GetCurDBNameFromContextOrDefault ( ctx context . Context ) string {
md , ok := metadata . FromIncomingContext ( ctx )
if ! ok {
return util . DefaultDBName
}
dbNameData := md [ strings . ToLower ( util . HeaderDBName ) ]
if len ( dbNameData ) < 1 || dbNameData [ 0 ] == "" {
return util . DefaultDBName
}
return dbNameData [ 0 ]
}
2023-10-17 20:00:14 +08:00
func NewContextWithMetadata ( ctx context . Context , username string , dbName string ) context . Context {
2024-01-28 16:03:01 +08:00
dbKey := strings . ToLower ( util . HeaderDBName )
if username == "" {
return contextutil . AppendToIncomingContext ( ctx , dbKey , dbName )
}
2023-10-17 20:00:14 +08:00
originValue := fmt . Sprintf ( "%s%s%s" , username , util . CredentialSeperator , username )
authKey := strings . ToLower ( util . HeaderAuthorize )
authValue := crypto . Base64Encode ( originValue )
2023-11-27 20:14:26 +08:00
return contextutil . AppendToIncomingContext ( ctx , authKey , authValue , dbKey , dbName )
2023-10-17 20:00:14 +08:00
}
2024-04-29 19:09:34 +08:00
func AppendUserInfoForRPC ( ctx context . Context ) context . Context {
curUser , _ := GetCurUserFromContext ( ctx )
if curUser != "" {
originValue := fmt . Sprintf ( "%s%s%s" , curUser , util . CredentialSeperator , curUser )
authKey := strings . ToLower ( util . HeaderAuthorize )
authValue := crypto . Base64Encode ( originValue )
ctx = metadata . AppendToOutgoingContext ( ctx , authKey , authValue )
}
return ctx
}
2022-08-04 11:04:34 +08:00
func GetRole ( username string ) ( [ ] string , error ) {
if globalMetaCache == nil {
2023-03-24 15:27:58 +08:00
return [ ] string { } , merr . WrapErrServiceUnavailable ( "internal: Milvus Proxy is not ready yet. please wait" )
2022-08-04 11:04:34 +08:00
}
return globalMetaCache . GetUserRole ( username ) , nil
}
2022-08-19 19:42:50 +08:00
2023-08-08 10:15:07 +08:00
func PasswordVerify ( ctx context . Context , username , rawPwd string ) bool {
return passwordVerify ( ctx , username , rawPwd , globalMetaCache )
}
2023-10-18 16:36:12 +08:00
func VerifyAPIKey ( rawToken string ) ( string , error ) {
if hoo == nil {
return "" , merr . WrapErrServiceInternal ( "internal: Milvus Proxy is not ready yet. please wait" )
}
user , err := hoo . VerifyAPIKey ( rawToken )
if err != nil {
log . Warn ( "fail to verify apikey" , zap . String ( "api_key" , rawToken ) , zap . Error ( err ) )
return "" , merr . WrapErrParameterInvalidMsg ( "invalid apikey: [%s]" , rawToken )
}
return user , nil
}
2022-08-19 19:42:50 +08:00
// PasswordVerify verify password
func passwordVerify ( ctx context . Context , username , rawPwd string , globalMetaCache Cache ) bool {
// it represents the cache miss if Sha256Password is empty within credInfo, which shall be updated first connection.
// meanwhile, generating Sha256Password depends on raw password and encrypted password will not cache.
credInfo , err := globalMetaCache . GetCredentialInfo ( ctx , username )
if err != nil {
log . Error ( "found no credential" , zap . String ( "username" , username ) , zap . Error ( err ) )
return false
}
// hit cache
sha256Pwd := crypto . SHA256 ( rawPwd , credInfo . Username )
if credInfo . Sha256Password != "" {
return sha256Pwd == credInfo . Sha256Password
}
// miss cache, verify against encrypted password from etcd
if err := bcrypt . CompareHashAndPassword ( [ ] byte ( credInfo . EncryptedPassword ) , [ ] byte ( rawPwd ) ) ; err != nil {
log . Error ( "Verify password failed" , zap . Error ( err ) )
return false
}
// update cache after miss cache
credInfo . Sha256Password = sha256Pwd
log . Debug ( "get credential miss cache, update cache with" , zap . Any ( "credential" , credInfo ) )
globalMetaCache . UpdateCredential ( credInfo )
return true
}
2022-10-08 15:38:58 +08:00
2023-09-12 10:19:17 +08:00
func translatePkOutputFields ( schema * schemapb . CollectionSchema ) ( [ ] string , [ ] int64 ) {
pkNames := [ ] string { }
fieldIDs := [ ] int64 { }
for _ , field := range schema . Fields {
if field . IsPrimaryKey {
pkNames = append ( pkNames , field . GetName ( ) )
fieldIDs = append ( fieldIDs , field . GetFieldID ( ) )
}
}
return pkNames , fieldIDs
}
2022-10-08 15:38:58 +08:00
// Support wildcard in output fields:
2022-10-12 18:37:23 +08:00
//
2023-05-17 12:41:22 +08:00
// "*" - all fields
2022-10-12 18:37:23 +08:00
//
2022-10-08 15:38:58 +08:00
// For example, A and B are scalar fields, C and D are vector fields, duplicated fields will automatically be removed.
2022-10-12 18:37:23 +08:00
//
2023-05-17 12:41:22 +08:00
// output_fields=["*"] ==> [A,B,C,D]
// output_fields=["*",A] ==> [A,B,C,D]
// output_fields=["*",C] ==> [A,B,C,D]
2024-01-04 17:28:46 +08:00
func translateOutputFields ( outputFields [ ] string , schema * schemaInfo , addPrimary bool ) ( [ ] string , [ ] string , error ) {
2022-10-08 15:38:58 +08:00
var primaryFieldName string
2023-05-19 09:41:25 +08:00
allFieldNameMap := make ( map [ string ] bool )
2022-10-08 15:38:58 +08:00
resultFieldNameMap := make ( map [ string ] bool )
resultFieldNames := make ( [ ] string , 0 )
2023-05-23 10:19:26 +08:00
userOutputFieldsMap := make ( map [ string ] bool )
userOutputFields := make ( [ ] string , 0 )
2022-10-08 15:38:58 +08:00
for _ , field := range schema . Fields {
if field . IsPrimaryKey {
primaryFieldName = field . Name
}
2023-05-19 09:41:25 +08:00
allFieldNameMap [ field . Name ] = true
2022-10-08 15:38:58 +08:00
}
for _ , outputFieldName := range outputFields {
outputFieldName = strings . TrimSpace ( outputFieldName )
if outputFieldName == "*" {
2023-05-19 09:41:25 +08:00
for fieldName := range allFieldNameMap {
2022-10-08 15:38:58 +08:00
resultFieldNameMap [ fieldName ] = true
2023-05-23 10:19:26 +08:00
userOutputFieldsMap [ fieldName ] = true
2022-10-08 15:38:58 +08:00
}
} else {
2023-05-19 09:41:25 +08:00
if _ , ok := allFieldNameMap [ outputFieldName ] ; ok {
resultFieldNameMap [ outputFieldName ] = true
2023-05-23 10:19:26 +08:00
userOutputFieldsMap [ outputFieldName ] = true
2023-05-19 09:41:25 +08:00
} else {
if schema . EnableDynamicField {
2024-01-04 17:28:46 +08:00
schemaH , err := typeutil . CreateSchemaHelper ( schema . CollectionSchema )
2023-05-23 10:19:26 +08:00
if err != nil {
return nil , nil , err
}
err = planparserv2 . ParseIdentifier ( schemaH , outputFieldName , func ( expr * planpb . Expr ) error {
if len ( expr . GetColumnExpr ( ) . GetInfo ( ) . GetNestedPath ( ) ) == 1 &&
expr . GetColumnExpr ( ) . GetInfo ( ) . GetNestedPath ( ) [ 0 ] == outputFieldName {
return nil
}
2023-10-30 14:40:27 +08:00
return fmt . Errorf ( "not support getting subkeys of json field yet" )
2023-05-23 10:19:26 +08:00
} )
if err != nil {
log . Info ( "parse output field name failed" , zap . String ( "field name" , outputFieldName ) )
return nil , nil , fmt . Errorf ( "parse output field name failed: %s" , outputFieldName )
}
2023-05-19 09:41:25 +08:00
resultFieldNameMap [ common . MetaFieldName ] = true
2023-05-23 10:19:26 +08:00
userOutputFieldsMap [ outputFieldName ] = true
2023-05-19 09:41:25 +08:00
} else {
2023-05-23 10:19:26 +08:00
return nil , nil , fmt . Errorf ( "field %s not exist" , outputFieldName )
2023-05-19 09:41:25 +08:00
}
}
2022-10-08 15:38:58 +08:00
}
}
if addPrimary {
resultFieldNameMap [ primaryFieldName ] = true
2023-05-23 10:19:26 +08:00
userOutputFieldsMap [ primaryFieldName ] = true
2022-10-08 15:38:58 +08:00
}
for fieldName := range resultFieldNameMap {
resultFieldNames = append ( resultFieldNames , fieldName )
}
2023-05-23 10:19:26 +08:00
for fieldName := range userOutputFieldsMap {
userOutputFields = append ( userOutputFields , fieldName )
}
return resultFieldNames , userOutputFields , nil
2022-10-08 15:38:58 +08:00
}
2022-10-16 21:05:25 +08:00
func validateIndexName ( indexName string ) error {
indexName = strings . TrimSpace ( indexName )
if indexName == "" {
return nil
}
invalidMsg := "Invalid index name: " + indexName + ". "
2022-12-07 18:01:19 +08:00
if len ( indexName ) > Params . ProxyCfg . MaxNameLength . GetAsInt ( ) {
msg := invalidMsg + "The length of a index name must be less than " + Params . ProxyCfg . MaxNameLength . GetValue ( ) + " characters."
2022-10-16 21:05:25 +08:00
return errors . New ( msg )
}
firstChar := indexName [ 0 ]
if firstChar != '_' && ! isAlpha ( firstChar ) {
msg := invalidMsg + "The first character of a index name must be an underscore or letter."
return errors . New ( msg )
}
indexNameSize := len ( indexName )
for i := 1 ; i < indexNameSize ; i ++ {
c := indexName [ i ]
if c != '_' && ! isAlpha ( c ) && ! isNumber ( c ) {
2024-06-17 10:37:58 +08:00
msg := invalidMsg + "Index name can only contain numbers, letters, and underscores."
2022-10-16 21:05:25 +08:00
return errors . New ( msg )
}
}
return nil
}
2022-10-21 14:41:28 +08:00
2023-09-26 09:57:25 +08:00
func isCollectionLoaded ( ctx context . Context , qc types . QueryCoordClient , collID int64 ) ( bool , error ) {
2022-10-21 14:41:28 +08:00
// get all loading collections
resp , err := qc . ShowCollections ( ctx , & querypb . ShowCollectionsRequest {
CollectionIDs : nil ,
} )
if err != nil {
return false , err
}
2023-09-12 16:07:18 +08:00
if resp . GetStatus ( ) . GetErrorCode ( ) != commonpb . ErrorCode_Success {
2023-10-07 11:29:32 +08:00
return false , merr . Error ( resp . GetStatus ( ) )
2022-10-21 14:41:28 +08:00
}
for _ , loadedCollID := range resp . GetCollectionIDs ( ) {
2022-10-27 13:05:31 +08:00
if collID == loadedCollID {
return true , nil
2022-10-21 14:41:28 +08:00
}
}
2022-10-27 13:05:31 +08:00
return false , nil
2022-10-21 14:41:28 +08:00
}
2023-09-26 09:57:25 +08:00
func isPartitionLoaded ( ctx context . Context , qc types . QueryCoordClient , collID int64 , partIDs [ ] int64 ) ( bool , error ) {
2022-10-21 14:41:28 +08:00
// get all loading collections
resp , err := qc . ShowPartitions ( ctx , & querypb . ShowPartitionsRequest {
2022-10-27 13:05:31 +08:00
CollectionID : collID ,
2022-10-21 14:41:28 +08:00
PartitionIDs : nil ,
} )
if err != nil {
return false , err
}
2023-09-12 16:07:18 +08:00
if resp . GetStatus ( ) . GetErrorCode ( ) != commonpb . ErrorCode_Success {
2023-10-07 11:29:32 +08:00
return false , merr . Error ( resp . GetStatus ( ) )
2022-10-21 14:41:28 +08:00
}
for _ , loadedPartID := range resp . GetPartitionIDs ( ) {
for _ , partID := range partIDs {
if partID == loadedPartID {
2022-10-27 13:05:31 +08:00
return true , nil
2022-10-21 14:41:28 +08:00
}
}
}
2022-10-27 13:05:31 +08:00
return false , nil
2022-10-21 14:41:28 +08:00
}
2022-12-08 18:37:19 +08:00
2024-07-11 16:53:35 +08:00
func checkFieldsDataBySchema ( schema * schemapb . CollectionSchema , insertMsg * msgstream . InsertMsg , inInsert bool ) error {
2024-05-16 11:57:35 +08:00
log := log . With ( zap . String ( "collection" , schema . GetName ( ) ) )
2023-11-24 15:08:24 +08:00
primaryKeyNum := 0
2024-05-16 11:57:35 +08:00
autoGenFieldNum := 0
2023-05-15 16:15:21 +08:00
2023-06-07 10:38:36 +08:00
dataNameSet := typeutil . NewSet [ string ] ( )
2023-05-15 16:15:21 +08:00
for _ , data := range insertMsg . FieldsData {
2023-05-31 20:32:31 +08:00
fieldName := data . GetFieldName ( )
if dataNameSet . Contain ( fieldName ) {
2023-11-24 15:08:24 +08:00
return merr . WrapErrParameterInvalidMsg ( "duplicated field %s found" , fieldName )
2023-05-31 20:32:31 +08:00
}
dataNameSet . Insert ( fieldName )
2023-05-15 16:15:21 +08:00
}
for _ , fieldSchema := range schema . Fields {
if fieldSchema . AutoID && ! fieldSchema . IsPrimaryKey {
2023-11-24 15:08:24 +08:00
log . Warn ( "not primary key field, but set autoID true" , zap . String ( "field" , fieldSchema . GetName ( ) ) )
return merr . WrapErrParameterInvalidMsg ( "only primary key could be with AutoID enabled" )
2023-05-15 16:15:21 +08:00
}
2024-07-11 16:53:35 +08:00
2024-05-16 11:57:35 +08:00
if fieldSchema . IsPrimaryKey {
primaryKeyNum ++
}
2023-05-15 16:15:21 +08:00
if fieldSchema . GetDefaultValue ( ) != nil && fieldSchema . IsPrimaryKey {
2023-11-24 15:08:24 +08:00
return merr . WrapErrParameterInvalidMsg ( "primary key can't be with default value" )
2023-05-15 16:15:21 +08:00
}
2024-07-11 16:53:35 +08:00
if fieldSchema . IsPrimaryKey && fieldSchema . AutoID && ! Params . ProxyCfg . SkipAutoIDCheck . GetAsBool ( ) && inInsert {
// when inInsert, no need to pass when pk is autoid and SkipAutoIDCheck is false
autoGenFieldNum ++
}
2023-05-15 16:15:21 +08:00
if _ , ok := dataNameSet [ fieldSchema . GetName ( ) ] ; ! ok {
2024-07-11 16:53:35 +08:00
if fieldSchema . IsPrimaryKey && fieldSchema . AutoID && ! Params . ProxyCfg . SkipAutoIDCheck . GetAsBool ( ) && inInsert {
// autoGenField
2023-05-15 16:15:21 +08:00
continue
}
2024-05-16 11:57:35 +08:00
if fieldSchema . GetDefaultValue ( ) == nil && ! fieldSchema . GetNullable ( ) {
log . Warn ( "no corresponding fieldData pass in" , zap . String ( "fieldSchema" , fieldSchema . GetName ( ) ) )
return merr . WrapErrParameterInvalidMsg ( "fieldSchema(%s) has no corresponding fieldData pass in" , fieldSchema . GetName ( ) )
}
// when use default_value or has set Nullable
// it's ok that no corresponding fieldData found
2023-05-15 16:15:21 +08:00
dataToAppend := & schemapb . FieldData {
Type : fieldSchema . GetDataType ( ) ,
FieldName : fieldSchema . GetName ( ) ,
}
insertMsg . FieldsData = append ( insertMsg . FieldsData , dataToAppend )
}
}
2023-11-24 15:08:24 +08:00
if primaryKeyNum > 1 {
log . Warn ( "more than 1 primary keys not supported" ,
2024-05-16 11:57:35 +08:00
zap . Int64 ( "primaryKeyNum" , int64 ( primaryKeyNum ) ) )
2023-11-24 15:08:24 +08:00
return merr . WrapErrParameterInvalidMsg ( "more than 1 primary keys not supported, got %d" , primaryKeyNum )
2022-12-08 18:37:19 +08:00
}
2024-05-16 11:57:35 +08:00
expectedNum := len ( schema . Fields )
2024-07-11 16:53:35 +08:00
actualNum := len ( insertMsg . FieldsData ) + autoGenFieldNum
2024-05-16 11:57:35 +08:00
if expectedNum != actualNum {
log . Warn ( "the number of fields is not the same as needed" , zap . Int ( "expected" , expectedNum ) , zap . Int ( "actual" , actualNum ) )
return merr . WrapErrParameterInvalid ( expectedNum , actualNum , "more fieldData has pass in" )
2022-12-08 18:37:19 +08:00
}
return nil
}
2024-07-11 16:53:35 +08:00
func checkPrimaryFieldData ( schema * schemapb . CollectionSchema , insertMsg * msgstream . InsertMsg , inInsert bool ) ( * schemapb . IDs , error ) {
log := log . With ( zap . String ( "collectionName" , insertMsg . CollectionName ) )
2022-12-08 18:37:19 +08:00
rowNums := uint32 ( insertMsg . NRows ( ) )
// TODO(dragondriver): in fact, NumRows is not trustable, we should check all input fields
if insertMsg . NRows ( ) <= 0 {
2023-03-24 15:27:58 +08:00
return nil , merr . WrapErrParameterInvalid ( "invalid num_rows" , fmt . Sprint ( rowNums ) , "num_rows should be greater than 0" )
2022-12-08 18:37:19 +08:00
}
2024-07-11 16:53:35 +08:00
if err := checkFieldsDataBySchema ( schema , insertMsg , inInsert ) ; err != nil {
2022-12-08 18:37:19 +08:00
return nil , err
}
primaryFieldSchema , err := typeutil . GetPrimaryFieldSchema ( schema )
if err != nil {
2024-07-11 16:53:35 +08:00
log . Error ( "get primary field schema failed" , zap . Any ( "schema" , schema ) , zap . Error ( err ) )
2022-12-08 18:37:19 +08:00
return nil , err
}
2024-05-16 11:57:35 +08:00
if primaryFieldSchema . GetNullable ( ) {
return nil , merr . WrapErrParameterInvalidMsg ( "primary field not support null" )
}
2022-12-08 18:37:19 +08:00
// get primaryFieldData whether autoID is true or not
var primaryFieldData * schemapb . FieldData
2023-01-17 17:53:42 +08:00
if inInsert {
// when checkPrimaryFieldData in insert
2024-04-29 10:29:26 +08:00
skipAutoIDCheck := Params . ProxyCfg . SkipAutoIDCheck . GetAsBool ( ) &&
primaryFieldSchema . AutoID &&
typeutil . IsPrimaryFieldDataExist ( insertMsg . GetFieldsData ( ) , primaryFieldSchema )
if ! primaryFieldSchema . AutoID || skipAutoIDCheck {
2023-01-17 17:53:42 +08:00
primaryFieldData , err = typeutil . GetPrimaryFieldData ( insertMsg . GetFieldsData ( ) , primaryFieldSchema )
if err != nil {
2024-07-11 16:53:35 +08:00
log . Info ( "get primary field data failed" , zap . Error ( err ) )
2023-01-17 17:53:42 +08:00
return nil , err
}
} else {
// check primary key data not exist
if typeutil . IsPrimaryFieldDataExist ( insertMsg . GetFieldsData ( ) , primaryFieldSchema ) {
2024-07-11 16:53:35 +08:00
return nil , merr . WrapErrParameterInvalidMsg ( fmt . Sprintf ( "can not assign primary field data when auto id enabled %v" , primaryFieldSchema . Name ) )
2023-01-17 17:53:42 +08:00
}
2023-06-16 17:00:40 +08:00
// if autoID == true, currently support autoID for int64 and varchar PrimaryField
2023-01-17 17:53:42 +08:00
primaryFieldData , err = autoGenPrimaryFieldData ( primaryFieldSchema , insertMsg . GetRowIDs ( ) )
if err != nil {
2024-07-11 16:53:35 +08:00
log . Info ( "generate primary field data failed when autoID == true" , zap . Error ( err ) )
2023-01-17 17:53:42 +08:00
return nil , err
}
// if autoID == true, set the primary field data
// insertMsg.fieldsData need append primaryFieldData
insertMsg . FieldsData = append ( insertMsg . FieldsData , primaryFieldData )
2022-12-08 18:37:19 +08:00
}
} else {
2024-07-11 16:53:35 +08:00
primaryFieldID := primaryFieldSchema . FieldID
primaryFieldName := primaryFieldSchema . Name
for i , field := range insertMsg . GetFieldsData ( ) {
if field . FieldId == primaryFieldID || field . FieldName == primaryFieldName {
primaryFieldData = field
if primaryFieldSchema . AutoID {
// use the passed pk as new pk when autoID == false
// automatic generate pk as new pk wehen autoID == true
newPrimaryFieldData , err := autoGenPrimaryFieldData ( primaryFieldSchema , insertMsg . GetRowIDs ( ) )
if err != nil {
log . Info ( "generate new primary field data failed when upsert" , zap . Error ( err ) )
return nil , err
}
insertMsg . FieldsData = append ( insertMsg . GetFieldsData ( ) [ : i ] , insertMsg . GetFieldsData ( ) [ i + 1 : ] ... )
insertMsg . FieldsData = append ( insertMsg . FieldsData , newPrimaryFieldData )
}
break
}
2022-12-08 18:37:19 +08:00
}
2024-07-11 16:53:35 +08:00
// must assign primary field data when upsert
if primaryFieldData == nil {
return nil , merr . WrapErrParameterInvalidMsg ( fmt . Sprintf ( "must assign pk when upsert, primary field: %v" , primaryFieldName ) )
2022-12-08 18:37:19 +08:00
}
2023-01-04 17:21:36 +08:00
}
// parse primaryFieldData to result.IDs, and as returned primary keys
ids , err := parsePrimaryFieldData2IDs ( primaryFieldData )
if err != nil {
2024-07-11 16:53:35 +08:00
log . Warn ( "parse primary field data to IDs failed" , zap . Error ( err ) )
2023-01-04 17:21:36 +08:00
return nil , err
}
return ids , nil
}
2023-06-06 10:24:34 +08:00
func getPartitionKeyFieldData ( fieldSchema * schemapb . FieldSchema , insertMsg * msgstream . InsertMsg ) ( * schemapb . FieldData , error ) {
2024-04-29 10:29:26 +08:00
if len ( insertMsg . GetPartitionName ( ) ) > 0 && ! Params . ProxyCfg . SkipPartitionKeyCheck . GetAsBool ( ) {
2023-06-06 10:24:34 +08:00
return nil , errors . New ( "not support manually specifying the partition names if partition key mode is used" )
}
for _ , fieldData := range insertMsg . GetFieldsData ( ) {
if fieldData . GetFieldId ( ) == fieldSchema . GetFieldID ( ) {
return fieldData , nil
}
}
return nil , errors . New ( "partition key not specify when insert" )
}
2023-04-12 15:06:28 +08:00
func getCollectionProgress (
ctx context . Context ,
2023-09-26 09:57:25 +08:00
queryCoord types . QueryCoordClient ,
2023-04-12 15:06:28 +08:00
msgBase * commonpb . MsgBase ,
collectionID int64 ,
) ( loadProgress int64 , refreshProgress int64 , err error ) {
2022-12-16 14:39:24 +08:00
resp , err := queryCoord . ShowCollections ( ctx , & querypb . ShowCollectionsRequest {
Base : commonpbutil . UpdateMsgBase (
msgBase ,
2023-06-25 17:20:43 +08:00
commonpbutil . WithMsgType ( commonpb . MsgType_ShowCollections ) ,
2022-12-16 14:39:24 +08:00
) ,
CollectionIDs : [ ] int64 { collectionID } ,
} )
if err != nil {
2023-09-30 10:31:28 +08:00
log . Warn ( "fail to show collections" ,
zap . Int64 ( "collectionID" , collectionID ) ,
zap . Error ( err ) ,
)
2023-04-12 15:06:28 +08:00
return
2022-12-16 14:39:24 +08:00
}
2023-09-30 10:31:28 +08:00
err = merr . Error ( resp . GetStatus ( ) )
if err != nil {
log . Warn ( "fail to show collections" ,
zap . Int64 ( "collectionID" , collectionID ) ,
zap . Error ( err ) )
2023-04-12 15:06:28 +08:00
return
2022-12-16 14:39:24 +08:00
}
2023-04-12 15:06:28 +08:00
loadProgress = resp . GetInMemoryPercentages ( ) [ 0 ]
if len ( resp . GetRefreshProgress ( ) ) > 0 { // Compatibility for new Proxy with old QueryCoord
refreshProgress = resp . GetRefreshProgress ( ) [ 0 ]
}
return
2022-12-16 14:39:24 +08:00
}
2023-04-12 15:06:28 +08:00
func getPartitionProgress (
ctx context . Context ,
2023-09-26 09:57:25 +08:00
queryCoord types . QueryCoordClient ,
2023-04-12 15:06:28 +08:00
msgBase * commonpb . MsgBase ,
partitionNames [ ] string ,
collectionName string ,
collectionID int64 ,
2023-08-22 17:06:22 +08:00
dbName string ,
2023-04-12 15:06:28 +08:00
) ( loadProgress int64 , refreshProgress int64 , err error ) {
2022-12-16 14:39:24 +08:00
IDs2Names := make ( map [ int64 ] string )
partitionIDs := make ( [ ] int64 , 0 )
for _ , partitionName := range partitionNames {
2023-04-12 15:06:28 +08:00
var partitionID int64
2023-08-22 17:06:22 +08:00
partitionID , err = globalMetaCache . GetPartitionID ( ctx , dbName , collectionName , partitionName )
2022-12-16 14:39:24 +08:00
if err != nil {
2023-04-12 15:06:28 +08:00
return
2022-12-16 14:39:24 +08:00
}
IDs2Names [ partitionID ] = partitionName
partitionIDs = append ( partitionIDs , partitionID )
}
2023-06-25 17:20:43 +08:00
var resp * querypb . ShowPartitionsResponse
resp , err = queryCoord . ShowPartitions ( ctx , & querypb . ShowPartitionsRequest {
2022-12-16 14:39:24 +08:00
Base : commonpbutil . UpdateMsgBase (
msgBase ,
commonpbutil . WithMsgType ( commonpb . MsgType_ShowPartitions ) ,
) ,
CollectionID : collectionID ,
PartitionIDs : partitionIDs ,
} )
if err != nil {
log . Warn ( "fail to show partitions" , zap . Int64 ( "collection_id" , collectionID ) ,
zap . String ( "collection_name" , collectionName ) ,
zap . Strings ( "partition_names" , partitionNames ) ,
zap . Error ( err ) )
2023-04-12 15:06:28 +08:00
return
2022-12-16 14:39:24 +08:00
}
2023-06-15 11:14:39 +08:00
2023-09-30 10:31:28 +08:00
err = merr . Error ( resp . GetStatus ( ) )
if err != nil {
2023-06-15 11:14:39 +08:00
err = merr . Error ( resp . GetStatus ( ) )
log . Warn ( "fail to show partitions" ,
2023-09-30 10:31:28 +08:00
zap . String ( "collectionName" , collectionName ) ,
zap . Strings ( "partitionNames" , partitionNames ) ,
zap . Error ( err ) )
2023-06-15 11:14:39 +08:00
return
}
2022-12-16 14:39:24 +08:00
for _ , p := range resp . InMemoryPercentages {
2023-04-12 15:06:28 +08:00
loadProgress += p
}
loadProgress /= int64 ( len ( partitionIDs ) )
if len ( resp . GetRefreshProgress ( ) ) > 0 { // Compatibility for new Proxy with old QueryCoord
refreshProgress = resp . GetRefreshProgress ( ) [ 0 ]
2022-12-16 14:39:24 +08:00
}
2023-04-12 15:06:28 +08:00
return
2022-12-16 14:39:24 +08:00
}
2023-05-15 16:15:21 +08:00
2023-06-25 17:20:43 +08:00
func isPartitionKeyMode ( ctx context . Context , dbName string , colName string ) ( bool , error ) {
colSchema , err := globalMetaCache . GetCollectionSchema ( ctx , dbName , colName )
2023-06-06 10:24:34 +08:00
if err != nil {
return false , err
}
for _ , fieldSchema := range colSchema . GetFields ( ) {
if fieldSchema . IsPartitionKey {
return true , nil
}
}
return false , nil
}
2023-12-26 19:52:48 +08:00
func hasParitionKeyModeField ( schema * schemapb . CollectionSchema ) bool {
for _ , fieldSchema := range schema . GetFields ( ) {
if fieldSchema . IsPartitionKey {
return true
}
}
return false
}
2024-05-23 11:13:40 +08:00
// getDefaultPartitionsInPartitionKeyMode only used in partition key mode
2023-06-25 17:20:43 +08:00
func getDefaultPartitionsInPartitionKeyMode ( ctx context . Context , dbName string , collectionName string ) ( [ ] string , error ) {
partitions , err := globalMetaCache . GetPartitions ( ctx , dbName , collectionName )
if err != nil {
return nil , err
}
// Make sure the order of the partition names got every time is the same
2023-07-11 15:18:28 +08:00
partitionNames , _ , err := typeutil . RearrangePartitionsForPartitionKey ( partitions )
if err != nil {
return nil , err
2023-06-25 17:20:43 +08:00
}
return partitionNames , nil
}
2023-06-06 10:24:34 +08:00
func assignChannelsByPK ( pks * schemapb . IDs , channelNames [ ] string , insertMsg * msgstream . InsertMsg ) map [ string ] [ ] int {
insertMsg . HashValues = typeutil . HashPK2Channels ( pks , channelNames )
// groupedHashKeys represents the dmChannel index
channel2RowOffsets := make ( map [ string ] [ ] int ) // channelName to count
// assert len(it.hashValues) < maxInt
for offset , channelID := range insertMsg . HashValues {
channelName := channelNames [ channelID ]
if _ , ok := channel2RowOffsets [ channelName ] ; ! ok {
channel2RowOffsets [ channelName ] = [ ] int { }
}
channel2RowOffsets [ channelName ] = append ( channel2RowOffsets [ channelName ] , offset )
}
return channel2RowOffsets
}
2023-06-25 17:20:43 +08:00
func assignPartitionKeys ( ctx context . Context , dbName string , collName string , keys [ ] * planpb . GenericValue ) ( [ ] string , error ) {
2023-12-20 10:02:43 +08:00
partitionNames , err := globalMetaCache . GetPartitionsIndex ( ctx , dbName , collName )
2023-06-06 10:24:34 +08:00
if err != nil {
return nil , err
}
2023-06-25 17:20:43 +08:00
schema , err := globalMetaCache . GetCollectionSchema ( ctx , dbName , collName )
2023-06-06 10:24:34 +08:00
if err != nil {
return nil , err
}
2024-01-04 17:28:46 +08:00
partitionKeyFieldSchema , err := typeutil . GetPartitionKeyFieldSchema ( schema . CollectionSchema )
2023-06-06 10:24:34 +08:00
if err != nil {
return nil , err
}
hashedPartitionNames , err := typeutil2 . HashKey2Partitions ( partitionKeyFieldSchema , keys , partitionNames )
return hashedPartitionNames , err
}
2023-08-30 14:47:00 +08:00
func ErrWithLog ( logger * log . MLogger , msg string , err error ) error {
wrapErr := errors . Wrap ( err , msg )
if logger != nil {
logger . Warn ( msg , zap . Error ( err ) )
return wrapErr
}
log . Warn ( msg , zap . Error ( err ) )
return wrapErr
}
2023-08-30 10:52:26 +08:00
func verifyDynamicFieldData ( schema * schemapb . CollectionSchema , insertMsg * msgstream . InsertMsg ) error {
for _ , field := range insertMsg . FieldsData {
if field . GetFieldName ( ) == common . MetaFieldName {
if ! schema . EnableDynamicField {
return fmt . Errorf ( "without dynamic schema enabled, the field name cannot be set to %s" , common . MetaFieldName )
}
for _ , rowData := range field . GetScalars ( ) . GetJsonData ( ) . GetData ( ) {
jsonData := make ( map [ string ] interface { } )
if err := json . Unmarshal ( rowData , & jsonData ) ; err != nil {
return err
}
if _ , ok := jsonData [ common . MetaFieldName ] ; ok {
return fmt . Errorf ( "cannot set json key to: %s" , common . MetaFieldName )
}
}
}
}
return nil
}
func checkDynamicFieldData ( schema * schemapb . CollectionSchema , insertMsg * msgstream . InsertMsg ) error {
for _ , data := range insertMsg . FieldsData {
if data . IsDynamic {
data . FieldName = common . MetaFieldName
return verifyDynamicFieldData ( schema , insertMsg )
}
}
defaultData := make ( [ ] [ ] byte , insertMsg . NRows ( ) )
for i := range defaultData {
defaultData [ i ] = [ ] byte ( "{}" )
}
dynamicData := autoGenDynamicFieldData ( defaultData )
insertMsg . FieldsData = append ( insertMsg . FieldsData , dynamicData )
return nil
}
2023-10-20 14:26:09 +08:00
func SendReplicateMessagePack ( ctx context . Context , replicateMsgStream msgstream . MsgStream , request interface { GetBase ( ) * commonpb . MsgBase } ) {
if replicateMsgStream == nil || request == nil {
log . Warn ( "replicate msg stream or request is nil" , zap . Any ( "request" , request ) )
return
}
msgBase := request . GetBase ( )
ts := msgBase . GetTimestamp ( )
if msgBase . GetReplicateInfo ( ) . GetIsReplicate ( ) {
ts = msgBase . GetReplicateInfo ( ) . GetMsgTimestamp ( )
}
getBaseMsg := func ( ctx context . Context , ts uint64 ) msgstream . BaseMsg {
return msgstream . BaseMsg {
Ctx : ctx ,
HashValues : [ ] uint32 { 0 } ,
BeginTimestamp : ts ,
EndTimestamp : ts ,
}
}
var tsMsg msgstream . TsMsg
switch r := request . ( type ) {
case * milvuspb . CreateDatabaseRequest :
tsMsg = & msgstream . CreateDatabaseMsg {
BaseMsg : getBaseMsg ( ctx , ts ) ,
CreateDatabaseRequest : * r ,
}
case * milvuspb . DropDatabaseRequest :
tsMsg = & msgstream . DropDatabaseMsg {
BaseMsg : getBaseMsg ( ctx , ts ) ,
DropDatabaseRequest : * r ,
}
case * milvuspb . FlushRequest :
tsMsg = & msgstream . FlushMsg {
BaseMsg : getBaseMsg ( ctx , ts ) ,
FlushRequest : * r ,
}
case * milvuspb . LoadCollectionRequest :
tsMsg = & msgstream . LoadCollectionMsg {
BaseMsg : getBaseMsg ( ctx , ts ) ,
LoadCollectionRequest : * r ,
}
case * milvuspb . ReleaseCollectionRequest :
tsMsg = & msgstream . ReleaseCollectionMsg {
BaseMsg : getBaseMsg ( ctx , ts ) ,
ReleaseCollectionRequest : * r ,
}
case * milvuspb . CreateIndexRequest :
tsMsg = & msgstream . CreateIndexMsg {
BaseMsg : getBaseMsg ( ctx , ts ) ,
CreateIndexRequest : * r ,
}
case * milvuspb . DropIndexRequest :
tsMsg = & msgstream . DropIndexMsg {
BaseMsg : getBaseMsg ( ctx , ts ) ,
DropIndexRequest : * r ,
}
2023-11-23 15:38:24 +08:00
case * milvuspb . LoadPartitionsRequest :
tsMsg = & msgstream . LoadPartitionsMsg {
BaseMsg : getBaseMsg ( ctx , ts ) ,
LoadPartitionsRequest : * r ,
}
case * milvuspb . ReleasePartitionsRequest :
tsMsg = & msgstream . ReleasePartitionsMsg {
BaseMsg : getBaseMsg ( ctx , ts ) ,
ReleasePartitionsRequest : * r ,
}
2023-12-21 18:07:24 +08:00
case * milvuspb . AlterIndexRequest :
tsMsg = & msgstream . AlterIndexMsg {
BaseMsg : getBaseMsg ( ctx , ts ) ,
AlterIndexRequest : * r ,
}
2023-10-20 14:26:09 +08:00
default :
log . Warn ( "unknown request" , zap . Any ( "request" , request ) )
return
}
msgPack := & msgstream . MsgPack {
BeginTs : ts ,
EndTs : ts ,
Msgs : [ ] msgstream . TsMsg { tsMsg } ,
}
msgErr := replicateMsgStream . Produce ( msgPack )
// ignore the error if the msg stream failed to produce the msg,
// because it can be manually fixed in this error
if msgErr != nil {
log . Warn ( "send replicate msg failed" , zap . Any ( "pack" , msgPack ) , zap . Error ( msgErr ) )
}
}
2023-12-03 19:22:33 +08:00
2024-01-04 17:28:46 +08:00
func GetCachedCollectionSchema ( ctx context . Context , dbName string , colName string ) ( * schemaInfo , error ) {
2023-12-03 19:22:33 +08:00
if globalMetaCache != nil {
return globalMetaCache . GetCollectionSchema ( ctx , dbName , colName )
}
return nil , merr . WrapErrServiceNotReady ( paramtable . GetRole ( ) , paramtable . GetNodeID ( ) , "initialization" )
}
func CheckDatabase ( ctx context . Context , dbName string ) bool {
if globalMetaCache != nil {
return globalMetaCache . HasDatabase ( ctx , dbName )
}
return false
}
2024-03-28 06:33:11 +08:00
func SetReportValue ( status * commonpb . Status , value int ) {
if value <= 0 {
return
}
if ! merr . Ok ( status ) {
return
}
if status . ExtraInfo == nil {
status . ExtraInfo = make ( map [ string ] string )
}
status . ExtraInfo [ "report_value" ] = strconv . Itoa ( value )
}
2024-05-08 11:53:29 +08:00
func GetCostValue ( status * commonpb . Status ) int {
if status == nil || status . ExtraInfo == nil {
return 0
}
value , err := strconv . Atoi ( status . ExtraInfo [ "report_value" ] )
if err != nil {
return 0
}
return value
}
2024-05-24 14:19:41 +08:00
type isProxyRequestKeyType struct { }
var ctxProxyRequestKey = isProxyRequestKeyType { }
func SetRequestLabelForContext ( ctx context . Context ) context . Context {
return context . WithValue ( ctx , ctxProxyRequestKey , true )
}
func GetRequestLabelFromContext ( ctx context . Context ) bool {
if ctx == nil {
return false
}
v := ctx . Value ( ctxProxyRequestKey )
if v == nil {
return false
}
return v . ( bool )
}