enhance: Avoid creating a schema helper for each read task (#30981)

See also #30806

---------

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
congqixia 2024-03-04 19:39:00 +08:00 committed by GitHub
parent bd063a352d
commit 9b3005f1be
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 80 additions and 104 deletions

View File

@ -88,12 +88,7 @@ func ParseIdentifier(schema *typeutil.SchemaHelper, identifier string, checkFunc
return checkFunc(predicate.expr)
}
func CreateRetrievePlan(schemaPb *schemapb.CollectionSchema, exprStr string) (*planpb.PlanNode, error) {
schema, err := typeutil.CreateSchemaHelper(schemaPb)
if err != nil {
return nil, err
}
func CreateRetrievePlan(schema *typeutil.SchemaHelper, exprStr string) (*planpb.PlanNode, error) {
expr, err := ParseExpr(schema, exprStr)
if err != nil {
return nil, err
@ -109,12 +104,7 @@ func CreateRetrievePlan(schemaPb *schemapb.CollectionSchema, exprStr string) (*p
return planNode, nil
}
func CreateSearchPlan(schemaPb *schemapb.CollectionSchema, exprStr string, vectorFieldName string, queryInfo *planpb.QueryInfo) (*planpb.PlanNode, error) {
schema, err := typeutil.CreateSchemaHelper(schemaPb)
if err != nil {
return nil, err
}
func CreateSearchPlan(schema *typeutil.SchemaHelper, exprStr string, vectorFieldName string, queryInfo *planpb.QueryInfo) (*planpb.PlanNode, error) {
parse := func() (*planpb.Expr, error) {
if len(exprStr) <= 0 {
return nil, nil

View File

@ -5,6 +5,7 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/proto/planpb"
@ -47,6 +48,13 @@ func newTestSchema() *schemapb.CollectionSchema {
}
}
// newTestSchemaHelper builds a typeutil.SchemaHelper from the schema returned
// by newTestSchema. If the helper cannot be created, the calling test is
// failed immediately via require.NoError.
func newTestSchemaHelper(t *testing.T) *typeutil.SchemaHelper {
	// Mark this function as a test helper so that a failure is attributed to
	// the line in the calling test rather than to this function.
	t.Helper()

	schemaHelper, err := typeutil.CreateSchemaHelper(newTestSchema())
	require.NoError(t, err)
	return schemaHelper
}
func assertValidExpr(t *testing.T, helper *typeutil.SchemaHelper, exprStr string) {
_, err := ParseExpr(helper, exprStr)
assert.NoError(t, err, exprStr)
@ -382,13 +390,13 @@ func TestExpr_Combinations(t *testing.T) {
}
func TestCreateRetrievePlan(t *testing.T) {
schema := newTestSchema()
schema := newTestSchemaHelper(t)
_, err := CreateRetrievePlan(schema, "Int64Field > 0")
assert.NoError(t, err)
}
func TestCreateSearchPlan(t *testing.T) {
schema := newTestSchema()
schema := newTestSchemaHelper(t)
_, err := CreateSearchPlan(schema, `$meta["A"] != 10`, "FloatVectorField", &planpb.QueryInfo{
Topk: 0,
MetricType: "",
@ -399,7 +407,7 @@ func TestCreateSearchPlan(t *testing.T) {
}
func TestCreateFloat16SearchPlan(t *testing.T) {
schema := newTestSchema()
schema := newTestSchemaHelper(t)
_, err := CreateSearchPlan(schema, `$meta["A"] != 10`, "Float16VectorField", &planpb.QueryInfo{
Topk: 0,
MetricType: "",
@ -410,7 +418,7 @@ func TestCreateFloat16SearchPlan(t *testing.T) {
}
func TestCreateBFloat16earchPlan(t *testing.T) {
schema := newTestSchema()
schema := newTestSchemaHelper(t)
_, err := CreateSearchPlan(schema, `$meta["A"] != 10`, "BFloat16VectorField", &planpb.QueryInfo{
Topk: 0,
MetricType: "",
@ -557,42 +565,28 @@ func TestExpr_Invalid(t *testing.T) {
}
func TestCreateRetrievePlan_Invalid(t *testing.T) {
t.Run("invalid schema", func(t *testing.T) {
schema := newTestSchema()
schema.Fields = append(schema.Fields, schema.Fields[0])
_, err := CreateRetrievePlan(schema, "")
assert.Error(t, err)
})
t.Run("invalid expr", func(t *testing.T) {
schema := newTestSchema()
schema := newTestSchemaHelper(t)
_, err := CreateRetrievePlan(schema, "invalid expression")
assert.Error(t, err)
})
}
func TestCreateSearchPlan_Invalid(t *testing.T) {
t.Run("invalid schema", func(t *testing.T) {
schema := newTestSchema()
schema.Fields = append(schema.Fields, schema.Fields[0])
_, err := CreateSearchPlan(schema, "", "", nil)
assert.Error(t, err)
})
t.Run("invalid expr", func(t *testing.T) {
schema := newTestSchema()
schema := newTestSchemaHelper(t)
_, err := CreateSearchPlan(schema, "invalid expression", "", nil)
assert.Error(t, err)
})
t.Run("invalid vector field", func(t *testing.T) {
schema := newTestSchema()
schema := newTestSchemaHelper(t)
_, err := CreateSearchPlan(schema, "Int64Field > 0", "not_exist", nil)
assert.Error(t, err)
})
t.Run("not vector type", func(t *testing.T) {
schema := newTestSchema()
schema := newTestSchemaHelper(t)
_, err := CreateSearchPlan(schema, "Int64Field > 0", "VarCharField", nil)
assert.Error(t, err)
})
@ -641,7 +635,7 @@ func Test_handleExpr_17126_26662(t *testing.T) {
}
func Test_JSONExpr(t *testing.T) {
schema := newTestSchema()
schema := newTestSchemaHelper(t)
expr := ""
var err error
// search
@ -700,7 +694,7 @@ func Test_JSONExpr(t *testing.T) {
}
func Test_InvalidExprOnJSONField(t *testing.T) {
schema := newTestSchema()
schema := newTestSchemaHelper(t)
expr := ""
var err error
// search
@ -747,8 +741,9 @@ func Test_InvalidExprWithoutJSONField(t *testing.T) {
AutoID: true,
Fields: fields,
}
schemaHelper, err := typeutil.CreateSchemaHelper(schema)
require.NoError(t, err)
expr := ""
var err error
exprs := []string{
`A == 0`,
@ -761,7 +756,7 @@ func Test_InvalidExprWithoutJSONField(t *testing.T) {
}
for _, expr = range exprs {
_, err = CreateSearchPlan(schema, expr, "FloatVectorField", &planpb.QueryInfo{
_, err = CreateSearchPlan(schemaHelper, expr, "FloatVectorField", &planpb.QueryInfo{
Topk: 0,
MetricType: "",
SearchParams: "",
@ -785,9 +780,10 @@ func Test_InvalidExprWithMultipleJSONField(t *testing.T) {
AutoID: true,
Fields: fields,
}
schemaHelper, err := typeutil.CreateSchemaHelper(schema)
require.NoError(t, err)
expr := ""
var err error
exprs := []string{
`A == 0`,
`A in [1, 2, 3]`,
@ -797,7 +793,7 @@ func Test_InvalidExprWithMultipleJSONField(t *testing.T) {
}
for _, expr = range exprs {
_, err = CreateSearchPlan(schema, expr, "FloatVectorField", &planpb.QueryInfo{
_, err = CreateSearchPlan(schemaHelper, expr, "FloatVectorField", &planpb.QueryInfo{
Topk: 0,
MetricType: "",
SearchParams: "",
@ -808,7 +804,7 @@ func Test_InvalidExprWithMultipleJSONField(t *testing.T) {
}
func Test_exprWithSingleQuotes(t *testing.T) {
schema := newTestSchema()
schema := newTestSchemaHelper(t)
expr := ""
var err error
exprs := []string{
@ -844,7 +840,7 @@ func Test_exprWithSingleQuotes(t *testing.T) {
}
func Test_JSONContains(t *testing.T) {
schema := newTestSchema()
schema := newTestSchemaHelper(t)
expr := ""
var err error
exprs := []string{
@ -876,7 +872,7 @@ func Test_JSONContains(t *testing.T) {
}
func Test_InvalidJSONContains(t *testing.T) {
schema := newTestSchema()
schema := newTestSchemaHelper(t)
expr := ""
var err error
exprs := []string{
@ -938,7 +934,7 @@ func Test_isEmptyExpression(t *testing.T) {
}
func Test_EscapeString(t *testing.T) {
schema := newTestSchema()
schema := newTestSchemaHelper(t)
expr := ""
var err error
exprs := []string{
@ -989,7 +985,7 @@ c'`,
}
func Test_JSONContainsAll(t *testing.T) {
schema := newTestSchema()
schema := newTestSchemaHelper(t)
expr := ""
var err error
var plan *planpb.PlanNode
@ -1035,7 +1031,7 @@ func Test_JSONContainsAll(t *testing.T) {
}
func Test_JSONContainsAny(t *testing.T) {
schema := newTestSchema()
schema := newTestSchemaHelper(t)
expr := ""
var err error
var plan *planpb.PlanNode
@ -1081,7 +1077,7 @@ func Test_JSONContainsAny(t *testing.T) {
}
func Test_ArrayExpr(t *testing.T) {
schema := newTestSchema()
schema := newTestSchemaHelper(t)
expr := ""
var err error
@ -1163,7 +1159,7 @@ func Test_ArrayExpr(t *testing.T) {
}
func Test_ArrayLength(t *testing.T) {
schema := newTestSchema()
schema := newTestSchemaHelper(t)
expr := ""
var err error

View File

@ -4,12 +4,14 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/parser/planparserv2"
"github.com/milvus-io/milvus/internal/proto/planpb"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
func TestParsePartitionKeys(t *testing.T) {
@ -27,6 +29,9 @@ func TestParsePartitionKeys(t *testing.T) {
IsPartitionKey: true,
}
schema.Fields = append(schema.Fields, partitionKeyField)
schemaHelper, err := typeutil.CreateSchemaHelper(schema)
require.NoError(t, err)
fieldID := common.StartOfUserFieldID
for _, field := range schema.Fields {
field.FieldID = int64(fieldID)
@ -109,7 +114,7 @@ func TestParsePartitionKeys(t *testing.T) {
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
// test search plan
searchPlan, err := planparserv2.CreateSearchPlan(schema, tc.expr, "fvec_field", queryInfo)
searchPlan, err := planparserv2.CreateSearchPlan(schemaHelper, tc.expr, "fvec_field", queryInfo)
assert.NoError(t, err)
expr, err := ParseExprFromPlan(searchPlan)
assert.NoError(t, err)
@ -122,7 +127,7 @@ func TestParsePartitionKeys(t *testing.T) {
}
// test query plan
queryPlan, err := planparserv2.CreateRetrievePlan(schema, tc.expr)
queryPlan, err := planparserv2.CreateRetrievePlan(schemaHelper, tc.expr)
assert.NoError(t, err)
expr, err = ParseExprFromPlan(queryPlan)
assert.NoError(t, err)

View File

@ -114,6 +114,7 @@ type schemaInfo struct {
fieldMap *typeutil.ConcurrentMap[string, int64] // field name to id mapping
hasPartitionKeyField bool
pkField *schemapb.FieldSchema
schemaHelper *typeutil.SchemaHelper
}
func newSchemaInfo(schema *schemapb.CollectionSchema) *schemaInfo {
@ -129,11 +130,14 @@ func newSchemaInfo(schema *schemapb.CollectionSchema) *schemaInfo {
pkField = field
}
}
// schema is expected to have been verified beforehand, so the error is ignored here
schemaHelper, _ := typeutil.CreateSchemaHelper(schema)
return &schemaInfo{
CollectionSchema: schema,
fieldMap: fieldMap,
hasPartitionKeyField: hasPartitionkey,
pkField: pkField,
schemaHelper: schemaHelper,
}
}

View File

@ -80,7 +80,7 @@ func initSearchRequest(ctx context.Context, t *searchTask) error {
}
t.offset = offset
plan, err := planparserv2.CreateSearchPlan(t.schema.CollectionSchema, t.request.Dsl, annsField, queryInfo)
plan, err := planparserv2.CreateSearchPlan(t.schema.schemaHelper, t.request.Dsl, annsField, queryInfo)
if err != nil {
log.Warn("failed to create query plan", zap.Error(err),
zap.String("dsl", t.request.Dsl), // may be very large if large term passed.

View File

@ -301,7 +301,7 @@ func (dr *deleteRunner) Init(ctx context.Context) error {
}
func (dr *deleteRunner) Run(ctx context.Context) error {
plan, err := planparserv2.CreateRetrievePlan(dr.schema.CollectionSchema, dr.req.Expr)
plan, err := planparserv2.CreateRetrievePlan(dr.schema.schemaHelper, dr.req.Expr)
if err != nil {
return merr.WrapErrParameterInvalidMsg("failed to create delete plan: %v", err)
}

View File

@ -8,6 +8,7 @@ import (
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
@ -24,10 +25,11 @@ import (
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
func Test_getPrimaryKeysFromPlan(t *testing.T) {
schema := &schemapb.CollectionSchema{
collSchema := &schemapb.CollectionSchema{
Name: "test_delete",
Description: "",
AutoID: false,
@ -46,11 +48,14 @@ func Test_getPrimaryKeysFromPlan(t *testing.T) {
},
},
}
t.Run("delelte with complex pk expr", func(t *testing.T) {
schema, err := typeutil.CreateSchemaHelper(collSchema)
require.NoError(t, err)
t.Run("delete with complex pk expr", func(t *testing.T) {
expr := "pk < 4"
plan, err := planparserv2.CreateRetrievePlan(schema, expr)
assert.NoError(t, err)
isSimple, _, _ := getPrimaryKeysFromPlan(schema, plan)
isSimple, _, _ := getPrimaryKeysFromPlan(collSchema, plan)
assert.False(t, isSimple)
})
@ -58,7 +63,7 @@ func Test_getPrimaryKeysFromPlan(t *testing.T) {
expr := "non_pk == 1"
plan, err := planparserv2.CreateRetrievePlan(schema, expr)
assert.NoError(t, err)
isSimple, _, _ := getPrimaryKeysFromPlan(schema, plan)
isSimple, _, _ := getPrimaryKeysFromPlan(collSchema, plan)
assert.False(t, isSimple)
})
@ -66,7 +71,7 @@ func Test_getPrimaryKeysFromPlan(t *testing.T) {
expr := "pk in [1, 2, 3]"
plan, err := planparserv2.CreateRetrievePlan(schema, expr)
assert.NoError(t, err)
isSimple, _, rowNum := getPrimaryKeysFromPlan(schema, plan)
isSimple, _, rowNum := getPrimaryKeysFromPlan(collSchema, plan)
assert.True(t, isSimple)
assert.Equal(t, int64(3), rowNum)
})
@ -78,7 +83,7 @@ func Test_getPrimaryKeysFromPlan(t *testing.T) {
termExpr := plan.Node.(*planpb.PlanNode_Query).Query.Predicates.Expr.(*planpb.Expr_TermExpr)
termExpr.TermExpr.ColumnInfo.DataType = -1
isSimple, _, _ := getPrimaryKeysFromPlan(schema, plan)
isSimple, _, _ := getPrimaryKeysFromPlan(collSchema, plan)
assert.False(t, isSimple)
})
@ -86,7 +91,7 @@ func Test_getPrimaryKeysFromPlan(t *testing.T) {
expr := "pk == 1"
plan, err := planparserv2.CreateRetrievePlan(schema, expr)
assert.NoError(t, err)
isSimple, _, rowNum := getPrimaryKeysFromPlan(schema, plan)
isSimple, _, rowNum := getPrimaryKeysFromPlan(collSchema, plan)
assert.True(t, isSimple)
assert.Equal(t, int64(1), rowNum)
})
@ -98,7 +103,7 @@ func Test_getPrimaryKeysFromPlan(t *testing.T) {
unaryRangeExpr := plan.Node.(*planpb.PlanNode_Query).Query.Predicates.Expr.(*planpb.Expr_UnaryRangeExpr)
unaryRangeExpr.UnaryRangeExpr.ColumnInfo.DataType = -1
isSimple, _, _ := getPrimaryKeysFromPlan(schema, plan)
isSimple, _, _ := getPrimaryKeysFromPlan(collSchema, plan)
assert.False(t, isSimple)
})
}
@ -935,7 +940,9 @@ func TestDeleteRunner_StreamingQueryAndDelteFunc(t *testing.T) {
globalMetaCache = mockCache
defer func() { globalMetaCache = nil }()
plan, err := planparserv2.CreateRetrievePlan(dr.schema.CollectionSchema, dr.req.Expr)
schemaHelper, err := typeutil.CreateSchemaHelper(dr.schema.CollectionSchema)
require.NoError(t, err)
plan, err := planparserv2.CreateRetrievePlan(schemaHelper, dr.req.Expr)
assert.NoError(t, err)
queryFunc := dr.getStreamingQueryAndDelteFunc(plan)
assert.Error(t, queryFunc(ctx, 1, qn, ""))
@ -978,7 +985,9 @@ func TestDeleteRunner_StreamingQueryAndDelteFunc(t *testing.T) {
globalMetaCache = mockCache
defer func() { globalMetaCache = nil }()
plan, err := planparserv2.CreateRetrievePlan(dr.schema.CollectionSchema, dr.req.Expr)
schemaHelper, err := typeutil.CreateSchemaHelper(dr.schema.CollectionSchema)
require.NoError(t, err)
plan, err := planparserv2.CreateRetrievePlan(schemaHelper, dr.req.Expr)
assert.NoError(t, err)
queryFunc := dr.getStreamingQueryAndDelteFunc(plan)
assert.Error(t, queryFunc(ctx, 1, qn, ""))

View File

@ -182,7 +182,7 @@ func matchCountRule(outputs []string) bool {
return len(outputs) == 1 && strings.ToLower(strings.TrimSpace(outputs[0])) == "count(*)"
}
func createCntPlan(expr string, schema *schemapb.CollectionSchema) (*planpb.PlanNode, error) {
func createCntPlan(expr string, schemaHelper *typeutil.SchemaHelper) (*planpb.PlanNode, error) {
if expr == "" {
return &planpb.PlanNode{
Node: &planpb.PlanNode_Query{
@ -194,7 +194,7 @@ func createCntPlan(expr string, schema *schemapb.CollectionSchema) (*planpb.Plan
}, nil
}
plan, err := planparserv2.CreateRetrievePlan(schema, expr)
plan, err := planparserv2.CreateRetrievePlan(schemaHelper, expr)
if err != nil {
return nil, merr.WrapErrParameterInvalidMsg("failed to create query plan: %v", err)
}
@ -210,14 +210,14 @@ func (t *queryTask) createPlan(ctx context.Context) error {
cntMatch := matchCountRule(t.request.GetOutputFields())
if cntMatch {
var err error
t.plan, err = createCntPlan(t.request.GetExpr(), schema.CollectionSchema)
t.plan, err = createCntPlan(t.request.GetExpr(), schema.schemaHelper)
t.userOutputFields = []string{"count(*)"}
return err
}
var err error
if t.plan == nil {
t.plan, err = planparserv2.CreateRetrievePlan(schema.CollectionSchema, t.request.Expr)
t.plan, err = planparserv2.CreateRetrievePlan(schema.schemaHelper, t.request.Expr)
if err != nil {
return merr.WrapErrParameterInvalidMsg("failed to create query plan: %v", err)
}

View File

@ -824,11 +824,6 @@ func Test_createCntPlan(t *testing.T) {
assert.Nil(t, plan.GetQuery().GetPredicates())
})
t.Run("invalid schema", func(t *testing.T) {
_, err := createCntPlan("a > b", nil)
assert.Error(t, err)
})
t.Run("invalid schema", func(t *testing.T) {
schema := &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
@ -840,7 +835,9 @@ func Test_createCntPlan(t *testing.T) {
},
},
}
plan, err := createCntPlan("a > 4", schema)
schemaHelper, err := typeutil.CreateSchemaHelper(schema)
require.NoError(t, err)
plan, err := createCntPlan("a > 4", schemaHelper)
assert.NoError(t, err)
assert.True(t, plan.GetQuery().GetIsCount())
assert.NotNil(t, plan.GetQuery().GetPredicates())
@ -848,17 +845,8 @@ func Test_createCntPlan(t *testing.T) {
}
func Test_queryTask_createPlan(t *testing.T) {
collSchema := newTestSchema()
t.Run("match count rule", func(t *testing.T) {
collSchema := &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: 100,
Name: "a",
IsPrimaryKey: true,
DataType: schemapb.DataType_Int64,
},
},
}
schema := newSchemaInfo(collSchema)
tsk := &queryTask{
request: &milvuspb.QueryRequest{
@ -874,27 +862,18 @@ func Test_queryTask_createPlan(t *testing.T) {
})
t.Run("query without expression", func(t *testing.T) {
schema := newSchemaInfo(collSchema)
tsk := &queryTask{
request: &milvuspb.QueryRequest{
OutputFields: []string{"a"},
OutputFields: []string{"Int64"},
},
schema: &schemaInfo{},
schema: schema,
}
err := tsk.createPlan(context.TODO())
assert.Error(t, err)
})
t.Run("invalid expression", func(t *testing.T) {
collSchema := &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: 100,
Name: "a",
IsPrimaryKey: true,
DataType: schemapb.DataType_Int64,
},
},
}
schema := newSchemaInfo(collSchema)
tsk := &queryTask{
@ -909,16 +888,6 @@ func Test_queryTask_createPlan(t *testing.T) {
})
t.Run("invalid output fields", func(t *testing.T) {
collSchema := &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: 100,
Name: "a",
IsPrimaryKey: true,
DataType: schemapb.DataType_Int64,
},
},
}
schema := newSchemaInfo(collSchema)
tsk := &queryTask{

View File

@ -184,11 +184,14 @@ func ConstructCollectionSchemaWithPartitionKey(collectionName string, fieldName2
func constructCollectionSchemaByDataType(collectionName string, fieldName2DataType map[string]schemapb.DataType, primaryFieldName string, autoID bool) *schemapb.CollectionSchema {
fieldsSchema := make([]*schemapb.FieldSchema, 0)
idx := int64(100)
for fieldName, dataType := range fieldName2DataType {
fieldSchema := &schemapb.FieldSchema{
FieldID: idx,
Name: fieldName,
DataType: dataType,
}
idx++
if isVectorType(dataType) {
fieldSchema.TypeParams = []*commonpb.KeyValuePair{
{