feat: add clustering key in create/describe collection (#29506)

#28410
/kind feature

Signed-off-by: wayblink <anyang.wang@zilliz.com>
This commit is contained in:
wayblink 2024-01-07 19:56:48 +08:00 committed by GitHub
parent 156a0dd450
commit 635a7f777c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 299 additions and 65 deletions

2
go.mod
View File

@ -23,7 +23,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/klauspost/compress v1.16.7
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231228051838-b5442d755fa4
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231229025438-39bce6abb18f
github.com/minio/minio-go/v7 v7.0.61
github.com/prometheus/client_golang v1.14.0
github.com/prometheus/client_model v0.3.0

4
go.sum
View File

@ -583,8 +583,8 @@ github.com/microcosm-cc/bluemonday v1.0.2/go.mod h1:iVP4YcDBq+n/5fb23BhYFvIMq/le
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231228051838-b5442d755fa4 h1:nxIohfJOCMbixFAC3q4Lclmv0xg/8q6D8T7D8l258To=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231228051838-b5442d755fa4/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231229025438-39bce6abb18f h1:8lNcRqhQgUROtmtiIEdpQHGW82KMI5oASVKxkaZ/tBg=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231229025438-39bce6abb18f/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek=
github.com/milvus-io/milvus-storage/go v0.0.0-20231109072809-1cd7b0866092 h1:UYJ7JB+QlMOoFHNdd8mUa3/lV63t9dnBX7ILXmEEWPY=
github.com/milvus-io/milvus-storage/go v0.0.0-20231109072809-1cd7b0866092/go.mod h1:GPETMcTZq1gLY1WA6Na5kiNAKnq8SEMMiVKUZrM3sho=
github.com/milvus-io/pulsar-client-go v0.6.10 h1:eqpJjU+/QX0iIhEo3nhOqMNXL+TyInAs1IAHZCrCM/A=

View File

@ -7,19 +7,20 @@ import (
)
type Field struct {
FieldID int64
Name string
IsPrimaryKey bool
Description string
DataType schemapb.DataType
TypeParams []*commonpb.KeyValuePair
IndexParams []*commonpb.KeyValuePair
AutoID bool
State schemapb.FieldState
IsDynamic bool
IsPartitionKey bool // partition key mode, multi logic partitions share a physical partition
DefaultValue *schemapb.ValueField
ElementType schemapb.DataType
FieldID int64
Name string
IsPrimaryKey bool
Description string
DataType schemapb.DataType
TypeParams []*commonpb.KeyValuePair
IndexParams []*commonpb.KeyValuePair
AutoID bool
State schemapb.FieldState
IsDynamic bool
IsPartitionKey bool // partition key mode, multi logic partitions share a physical partition
IsClusteringKey bool
DefaultValue *schemapb.ValueField
ElementType schemapb.DataType
}
func (f *Field) Available() bool {
@ -28,19 +29,20 @@ func (f *Field) Available() bool {
func (f *Field) Clone() *Field {
return &Field{
FieldID: f.FieldID,
Name: f.Name,
IsPrimaryKey: f.IsPrimaryKey,
Description: f.Description,
DataType: f.DataType,
TypeParams: common.CloneKeyValuePairs(f.TypeParams),
IndexParams: common.CloneKeyValuePairs(f.IndexParams),
AutoID: f.AutoID,
State: f.State,
IsDynamic: f.IsDynamic,
IsPartitionKey: f.IsPartitionKey,
DefaultValue: f.DefaultValue,
ElementType: f.ElementType,
FieldID: f.FieldID,
Name: f.Name,
IsPrimaryKey: f.IsPrimaryKey,
Description: f.Description,
DataType: f.DataType,
TypeParams: common.CloneKeyValuePairs(f.TypeParams),
IndexParams: common.CloneKeyValuePairs(f.IndexParams),
AutoID: f.AutoID,
State: f.State,
IsDynamic: f.IsDynamic,
IsPartitionKey: f.IsPartitionKey,
IsClusteringKey: f.IsClusteringKey,
DefaultValue: f.DefaultValue,
ElementType: f.ElementType,
}
}
@ -68,6 +70,7 @@ func (f *Field) Equal(other Field) bool {
f.AutoID == other.AutoID &&
f.IsPartitionKey == other.IsPartitionKey &&
f.IsDynamic == other.IsDynamic &&
f.IsClusteringKey == other.IsClusteringKey &&
f.DefaultValue == other.DefaultValue &&
f.ElementType == other.ElementType
}
@ -91,18 +94,19 @@ func MarshalFieldModel(field *Field) *schemapb.FieldSchema {
}
return &schemapb.FieldSchema{
FieldID: field.FieldID,
Name: field.Name,
IsPrimaryKey: field.IsPrimaryKey,
Description: field.Description,
DataType: field.DataType,
TypeParams: field.TypeParams,
IndexParams: field.IndexParams,
AutoID: field.AutoID,
IsDynamic: field.IsDynamic,
IsPartitionKey: field.IsPartitionKey,
DefaultValue: field.DefaultValue,
ElementType: field.ElementType,
FieldID: field.FieldID,
Name: field.Name,
IsPrimaryKey: field.IsPrimaryKey,
Description: field.Description,
DataType: field.DataType,
TypeParams: field.TypeParams,
IndexParams: field.IndexParams,
AutoID: field.AutoID,
IsDynamic: field.IsDynamic,
IsPartitionKey: field.IsPartitionKey,
IsClusteringKey: field.IsClusteringKey,
DefaultValue: field.DefaultValue,
ElementType: field.ElementType,
}
}
@ -124,18 +128,19 @@ func UnmarshalFieldModel(fieldSchema *schemapb.FieldSchema) *Field {
}
return &Field{
FieldID: fieldSchema.FieldID,
Name: fieldSchema.Name,
IsPrimaryKey: fieldSchema.IsPrimaryKey,
Description: fieldSchema.Description,
DataType: fieldSchema.DataType,
TypeParams: fieldSchema.TypeParams,
IndexParams: fieldSchema.IndexParams,
AutoID: fieldSchema.AutoID,
IsDynamic: fieldSchema.IsDynamic,
IsPartitionKey: fieldSchema.IsPartitionKey,
DefaultValue: fieldSchema.DefaultValue,
ElementType: fieldSchema.ElementType,
FieldID: fieldSchema.FieldID,
Name: fieldSchema.Name,
IsPrimaryKey: fieldSchema.IsPrimaryKey,
Description: fieldSchema.Description,
DataType: fieldSchema.DataType,
TypeParams: fieldSchema.TypeParams,
IndexParams: fieldSchema.IndexParams,
AutoID: fieldSchema.AutoID,
IsDynamic: fieldSchema.IsDynamic,
IsPartitionKey: fieldSchema.IsPartitionKey,
IsClusteringKey: fieldSchema.IsClusteringKey,
DefaultValue: fieldSchema.DefaultValue,
ElementType: fieldSchema.ElementType,
}
}

View File

@ -208,6 +208,36 @@ func (t *createCollectionTask) validatePartitionKey() error {
return nil
}
// validateClusteringKey checks the clustering-key flags of the schema fields.
//
// It returns an illegal-schema error when more than one field is flagged as
// clustering key, or when the clustering key field is also flagged as the
// primary key or the partition key. A schema without a clustering key is
// accepted. When a clustering key is present its field name is logged.
func (t *createCollectionTask) validateClusteringKey() error {
	idx := -1
	for i, field := range t.schema.Fields {
		if !field.GetIsClusteringKey() {
			continue
		}

		// Field names are handed over as format arguments instead of being
		// pre-rendered with fmt.Sprintf: the wrapper treats its first message
		// argument as a format string, so a '%' inside a (not yet validated)
		// field name would otherwise corrupt the error text.
		if idx != -1 {
			return merr.WrapErrCollectionIllegalSchema(t.CollectionName,
				"there are more than one clustering key, field name = %s, %s", t.schema.Fields[idx].Name, field.Name)
		}
		if field.GetIsPrimaryKey() {
			return merr.WrapErrCollectionIllegalSchema(t.CollectionName,
				"the clustering key field must not be primary key field, field name = %s", field.Name)
		}
		if field.GetIsPartitionKey() {
			return merr.WrapErrCollectionIllegalSchema(t.CollectionName,
				"the clustering key field must not be partition key field, field name = %s", field.Name)
		}
		idx = i
	}

	if idx == -1 {
		// No clustering key declared; nothing to report.
		return nil
	}

	log.Info("create collection with clustering key",
		zap.String("collectionName", t.CollectionName),
		zap.String("clusteringKeyField", t.schema.Fields[idx].Name))
	return nil
}
func (t *createCollectionTask) PreExecute(ctx context.Context) error {
t.Base.MsgType = commonpb.MsgType_CreateCollection
t.Base.SourceID = paramtable.GetNodeID()
@ -266,6 +296,11 @@ func (t *createCollectionTask) PreExecute(ctx context.Context) error {
return err
}
// validate clustering key
if err := t.validateClusteringKey(); err != nil {
return err
}
for _, field := range t.schema.Fields {
// validate field name
if err := validateFieldName(field.Name); err != nil {
@ -572,18 +607,19 @@ func (t *describeCollectionTask) Execute(ctx context.Context) error {
}
if field.FieldID >= common.StartOfUserFieldID {
t.result.Schema.Fields = append(t.result.Schema.Fields, &schemapb.FieldSchema{
FieldID: field.FieldID,
Name: field.Name,
IsPrimaryKey: field.IsPrimaryKey,
AutoID: field.AutoID,
Description: field.Description,
DataType: field.DataType,
TypeParams: field.TypeParams,
IndexParams: field.IndexParams,
IsDynamic: field.IsDynamic,
IsPartitionKey: field.IsPartitionKey,
DefaultValue: field.DefaultValue,
ElementType: field.ElementType,
FieldID: field.FieldID,
Name: field.Name,
IsPrimaryKey: field.IsPrimaryKey,
AutoID: field.AutoID,
Description: field.Description,
DataType: field.DataType,
TypeParams: field.TypeParams,
IndexParams: field.IndexParams,
IsDynamic: field.IsDynamic,
IsPartitionKey: field.IsPartitionKey,
IsClusteringKey: field.IsClusteringKey,
DefaultValue: field.DefaultValue,
ElementType: field.ElementType,
})
}
}

View File

@ -3493,3 +3493,186 @@ func TestPartitionKey(t *testing.T) {
assert.Error(t, err)
})
}
// TestClusteringKey exercises clustering-key handling in createCollectionTask:
// one clustering key is accepted, while a clustering key that is also the
// partition key or the primary key, or a second clustering key, makes
// PreExecute return an error.
func TestClusteringKey(t *testing.T) {
	rc := NewRootCoordMock()
	defer rc.Close()
	qc := getQueryCoordClient()

	ctx := context.Background()
	mgr := newShardClientMgr()
	err := InitMetaCache(ctx, rc, qc, mgr)
	assert.NoError(t, err)

	shardsNum := common.DefaultShardsNum
	prefix := "TestClusteringKey"
	collectionName := prefix + funcutil.GenRandomStr()

	// newTask marshals the schema and wraps it in a create-collection task
	// bound to the mocked root coordinator. Every subtest builds its task
	// through this helper so only the schema differs between cases.
	newTask := func(t *testing.T, schema proto.Message) *createCollectionTask {
		marshaledSchema, err := proto.Marshal(schema)
		assert.NoError(t, err)

		return &createCollectionTask{
			Condition: NewTaskCondition(ctx),
			CreateCollectionRequest: &milvuspb.CreateCollectionRequest{
				Base: &commonpb.MsgBase{
					MsgID:     UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt()),
					Timestamp: Timestamp(time.Now().UnixNano()),
				},
				DbName:         "",
				CollectionName: collectionName,
				Schema:         marshaledSchema,
				ShardsNum:      shardsNum,
			},
			ctx:       ctx,
			rootCoord: rc,
			result:    nil,
			schema:    nil,
		}
	}

	t.Run("create collection normal", func(t *testing.T) {
		fieldName2Type := map[string]schemapb.DataType{
			"int64_field":   schemapb.DataType_Int64,
			"varChar_field": schemapb.DataType_VarChar,
		}
		schema := constructCollectionSchemaByDataType(collectionName, fieldName2Type, "int64_field", false)
		schema.Fields = append(schema.Fields,
			&schemapb.FieldSchema{
				Name:            "cluster_key_field",
				DataType:        schemapb.DataType_Int64,
				IsClusteringKey: true,
			},
			&schemapb.FieldSchema{
				Name:     "fvec_field",
				DataType: schemapb.DataType_FloatVector,
				TypeParams: []*commonpb.KeyValuePair{
					{
						Key:   common.DimKey,
						Value: strconv.Itoa(testVecDim),
					},
				},
			},
		)

		task := newTask(t, schema)
		err := task.PreExecute(ctx)
		assert.NoError(t, err)
		err = task.Execute(ctx)
		assert.NoError(t, err)
	})

	t.Run("create collection clustering key can not be partition key", func(t *testing.T) {
		fieldName2Type := map[string]schemapb.DataType{
			"int64_field":   schemapb.DataType_Int64,
			"varChar_field": schemapb.DataType_VarChar,
			"fvec_field":    schemapb.DataType_FloatVector,
		}
		schema := constructCollectionSchemaByDataType(collectionName, fieldName2Type, "int64_field", false)
		schema.Fields = append(schema.Fields, &schemapb.FieldSchema{
			Name:            "cluster_key_field",
			DataType:        schemapb.DataType_Int64,
			IsClusteringKey: true,
			IsPartitionKey:  true,
		})

		err := newTask(t, schema).PreExecute(ctx)
		assert.Error(t, err)
	})

	t.Run("create collection clustering key can not be primary key", func(t *testing.T) {
		// "int64_field" is deliberately absent from the map here, so the only
		// field flagged as primary key is the clustering key itself.
		fieldName2Type := map[string]schemapb.DataType{
			"varChar_field": schemapb.DataType_VarChar,
			"fvec_field":    schemapb.DataType_FloatVector,
		}
		schema := constructCollectionSchemaByDataType(collectionName, fieldName2Type, "int64_field", false)
		schema.Fields = append(schema.Fields, &schemapb.FieldSchema{
			Name:            "cluster_key_field",
			DataType:        schemapb.DataType_Int64,
			IsClusteringKey: true,
			IsPrimaryKey:    true,
		})

		err := newTask(t, schema).PreExecute(ctx)
		assert.Error(t, err)
	})

	t.Run("create collection not support more than one clustering key", func(t *testing.T) {
		fieldName2Type := map[string]schemapb.DataType{
			"int64_field":   schemapb.DataType_Int64,
			"varChar_field": schemapb.DataType_VarChar,
		}
		schema := constructCollectionSchemaByDataType(collectionName, fieldName2Type, "int64_field", false)
		schema.Fields = append(schema.Fields,
			&schemapb.FieldSchema{
				Name:            "cluster_key_field",
				DataType:        schemapb.DataType_Int64,
				IsClusteringKey: true,
			},
			&schemapb.FieldSchema{
				Name:            "cluster_key_field2",
				DataType:        schemapb.DataType_Int64,
				IsClusteringKey: true,
			},
		)

		err := newTask(t, schema).PreExecute(ctx)
		assert.Error(t, err)
	})
}

View File

@ -50,6 +50,7 @@ var (
ErrCollectionNumLimitExceeded = newMilvusError("exceeded the limit number of collections", 102, false)
ErrCollectionNotFullyLoaded = newMilvusError("collection not fully loaded", 103, true)
ErrCollectionLoaded = newMilvusError("collection already loaded", 104, false)
ErrCollectionIllegalSchema = newMilvusError("illegal collection schema", 105, false)
// Partition related
ErrPartitionNotFound = newMilvusError("partition not found", 200, false)

View File

@ -465,6 +465,15 @@ func WrapErrCollectionLoaded(collection string, msgAndArgs ...any) error {
return err
}
// WrapErrCollectionIllegalSchema returns ErrCollectionIllegalSchema annotated
// with the collection name and, optionally, an extra message.
//
// msgAndArgs follows the usual "format, args..." convention:
//   - no values: only the collection field is attached;
//   - a single string: it is attached verbatim (it is NOT interpreted as a
//     format string, so a literal '%' is preserved);
//   - a string followed by more values: the string is the format and the
//     remaining values are its arguments;
//   - a non-string first value: all values are rendered with %v instead of
//     panicking on the type assertion.
func WrapErrCollectionIllegalSchema(collection string, msgAndArgs ...any) error {
	err := wrapFields(ErrCollectionIllegalSchema, value("collection", collection))
	if len(msgAndArgs) == 0 {
		return err
	}

	format, isString := msgAndArgs[0].(string)
	switch {
	case !isString:
		// Callers are expected to pass a string first; degrade gracefully
		// rather than crash the request path on a misuse.
		return errors.Wrapf(err, "%v", msgAndArgs)
	case len(msgAndArgs) == 1:
		return errors.Wrap(err, format)
	default:
		return errors.Wrapf(err, format, msgAndArgs[1:]...)
	}
}
func WrapErrAliasNotFound(db any, alias any, msg ...string) error {
err := wrapFields(ErrAliasNotFound,
value("database", db),