Refine catalog of partition & alias. (#18546)

Signed-off-by: longjiquan <jiquan.long@zilliz.com>
This commit is contained in:
Jiquan Long 2022-08-10 10:22:38 +08:00 committed by GitHub
parent 097f144db1
commit e7b3bacbec
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 1378 additions and 271 deletions

View File

@ -0,0 +1,53 @@
package kv
import (
"github.com/milvus-io/milvus/internal/util/typeutil"
)
type mockSnapshotKV struct {
SnapShotKV
SaveFunc func(key string, value string, ts typeutil.Timestamp) error
LoadFunc func(key string, ts typeutil.Timestamp) (string, error)
MultiSaveFunc func(kvs map[string]string, ts typeutil.Timestamp) error
LoadWithPrefixFunc func(key string, ts typeutil.Timestamp) ([]string, []string, error)
MultiSaveAndRemoveWithPrefixFunc func(saves map[string]string, removals []string, ts typeutil.Timestamp) error
}
func NewMockSnapshotKV() *mockSnapshotKV {
return &mockSnapshotKV{}
}
func (m mockSnapshotKV) Save(key string, value string, ts typeutil.Timestamp) error {
if m.SaveFunc != nil {
return m.SaveFunc(key, value, ts)
}
return nil
}
func (m mockSnapshotKV) Load(key string, ts typeutil.Timestamp) (string, error) {
if m.LoadFunc != nil {
return m.LoadFunc(key, ts)
}
return "", nil
}
func (m mockSnapshotKV) MultiSave(kvs map[string]string, ts typeutil.Timestamp) error {
if m.MultiSaveFunc != nil {
return m.MultiSaveFunc(kvs, ts)
}
return nil
}
func (m mockSnapshotKV) LoadWithPrefix(key string, ts typeutil.Timestamp) ([]string, []string, error) {
if m.LoadWithPrefixFunc != nil {
return m.LoadWithPrefixFunc(key, ts)
}
return nil, nil, nil
}
func (m mockSnapshotKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
if m.MultiSaveAndRemoveWithPrefixFunc != nil {
return m.MultiSaveAndRemoveWithPrefixFunc(saves, removals, ts)
}
return nil
}

View File

@ -0,0 +1,89 @@
package kv
import (
"testing"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/stretchr/testify/assert"
)
func Test_mockSnapshotKV_Save(t *testing.T) {
t.Run("func not set", func(t *testing.T) {
snapshot := NewMockSnapshotKV()
err := snapshot.Save("k", "v", 0)
assert.NoError(t, err)
})
t.Run("func set", func(t *testing.T) {
snapshot := NewMockSnapshotKV()
snapshot.SaveFunc = func(key string, value string, ts typeutil.Timestamp) error {
return nil
}
err := snapshot.Save("k", "v", 0)
assert.NoError(t, err)
})
}
func Test_mockSnapshotKV_Load(t *testing.T) {
t.Run("func not set", func(t *testing.T) {
snapshot := NewMockSnapshotKV()
_, err := snapshot.Load("k", 0)
assert.NoError(t, err)
})
t.Run("func set", func(t *testing.T) {
snapshot := NewMockSnapshotKV()
snapshot.LoadFunc = func(key string, ts typeutil.Timestamp) (string, error) {
return "", nil
}
_, err := snapshot.Load("k", 0)
assert.NoError(t, err)
})
}
func Test_mockSnapshotKV_MultiSave(t *testing.T) {
t.Run("func not set", func(t *testing.T) {
snapshot := NewMockSnapshotKV()
err := snapshot.MultiSave(nil, 0)
assert.NoError(t, err)
})
t.Run("func set", func(t *testing.T) {
snapshot := NewMockSnapshotKV()
snapshot.MultiSaveFunc = func(kvs map[string]string, ts typeutil.Timestamp) error {
return nil
}
err := snapshot.MultiSave(nil, 0)
assert.NoError(t, err)
})
}
func Test_mockSnapshotKV_LoadWithPrefix(t *testing.T) {
t.Run("func not set", func(t *testing.T) {
snapshot := NewMockSnapshotKV()
_, _, err := snapshot.LoadWithPrefix("prefix", 0)
assert.NoError(t, err)
})
t.Run("func set", func(t *testing.T) {
snapshot := NewMockSnapshotKV()
snapshot.LoadWithPrefixFunc = func(key string, ts typeutil.Timestamp) ([]string, []string, error) {
return nil, nil, nil
}
_, _, err := snapshot.LoadWithPrefix("prefix", 0)
assert.NoError(t, err)
})
}
func Test_mockSnapshotKV_MultiSaveAndRemoveWithPrefix(t *testing.T) {
t.Run("func not set", func(t *testing.T) {
snapshot := NewMockSnapshotKV()
err := snapshot.MultiSaveAndRemoveWithPrefix(nil, nil, 0)
assert.NoError(t, err)
})
t.Run("func set", func(t *testing.T) {
snapshot := NewMockSnapshotKV()
snapshot.MultiSaveAndRemoveWithPrefixFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
return nil
}
err := snapshot.MultiSaveAndRemoveWithPrefix(nil, nil, 0)
assert.NoError(t, err)
})
}

View File

@ -17,8 +17,8 @@ type Catalog interface {
CollectionExists(ctx context.Context, collectionID typeutil.UniqueID, ts typeutil.Timestamp) bool
DropCollection(ctx context.Context, collectionInfo *model.Collection, ts typeutil.Timestamp) error
CreatePartition(ctx context.Context, coll *model.Collection, ts typeutil.Timestamp) error
DropPartition(ctx context.Context, collectionInfo *model.Collection, partitionID typeutil.UniqueID, ts typeutil.Timestamp) error
CreatePartition(ctx context.Context, partition *model.Partition, ts typeutil.Timestamp) error
DropPartition(ctx context.Context, collectionID typeutil.UniqueID, partitionID typeutil.UniqueID, ts typeutil.Timestamp) error
CreateIndex(ctx context.Context, col *model.Collection, index *model.Index) error
// AlterIndex newIndex only contains updated parts
@ -26,10 +26,10 @@ type Catalog interface {
DropIndex(ctx context.Context, collectionInfo *model.Collection, dropIdxID typeutil.UniqueID, ts typeutil.Timestamp) error
ListIndexes(ctx context.Context) ([]*model.Index, error)
CreateAlias(ctx context.Context, collection *model.Collection, ts typeutil.Timestamp) error
DropAlias(ctx context.Context, collectionID typeutil.UniqueID, alias string, ts typeutil.Timestamp) error
AlterAlias(ctx context.Context, collection *model.Collection, ts typeutil.Timestamp) error
ListAliases(ctx context.Context) ([]*model.Collection, error)
CreateAlias(ctx context.Context, alias *model.Alias, ts typeutil.Timestamp) error
DropAlias(ctx context.Context, alias string, ts typeutil.Timestamp) error
AlterAlias(ctx context.Context, alias *model.Alias, ts typeutil.Timestamp) error
ListAliases(ctx context.Context, ts typeutil.Timestamp) ([]*model.Alias, error)
GetCredential(ctx context.Context, username string) (*model.Credential, error)
CreateCredential(ctx context.Context, credential *model.Credential) error

View File

@ -28,13 +28,84 @@ import (
"go.uber.org/zap"
)
// prefix/collection/collection_id -> CollectionInfo
// prefix/partitions/collection_id/partition_id -> PartitionInfo
// prefix/aliases/alias_name -> AliasInfo
// prefix/fields/collection_id/field_id -> FieldSchema
type Catalog struct {
Txn kv.TxnKV
Snapshot kv.SnapShotKV
}
func buildCollectionKey(collectionID typeutil.UniqueID) string {
return fmt.Sprintf("%s/%d", CollectionMetaPrefix, collectionID)
}
func buildPartitionPrefix(collectionID typeutil.UniqueID) string {
return fmt.Sprintf("%s/%d", PartitionMetaPrefix, collectionID)
}
func buildPartitionKey(collectionID, partitionID typeutil.UniqueID) string {
return fmt.Sprintf("%s/%d", buildPartitionPrefix(collectionID), partitionID)
}
func buildFieldPrefix(collectionID typeutil.UniqueID) string {
return fmt.Sprintf("%s/%d", FieldMetaPrefix, collectionID)
}
func buildFieldKey(collectionID typeutil.UniqueID, fieldID int64) string {
return fmt.Sprintf("%s/%d", buildFieldPrefix(collectionID), fieldID)
}
func buildAliasKey(aliasName string) string {
return fmt.Sprintf("%s/%s", AliasMetaPrefix, aliasName)
}
func buildKvs(keys, values []string) (map[string]string, error) {
if len(keys) != len(values) {
return nil, fmt.Errorf("length of keys (%d) and values (%d) are not equal", len(keys), len(values))
}
ret := make(map[string]string, len(keys))
for i, k := range keys {
_, ok := ret[k]
if ok {
return nil, fmt.Errorf("duplicated key was found: %s", k)
}
ret[k] = values[i]
}
return ret, nil
}
// TODO: atomicity should be promised outside.
func batchSave(snapshot kv.SnapShotKV, maxTxnNum int, kvs map[string]string, ts typeutil.Timestamp) error {
keys := make([]string, 0, len(kvs))
values := make([]string, 0, len(kvs))
for k, v := range kvs {
keys = append(keys, k)
values = append(values, v)
}
min := func(a, b int) int {
if a < b {
return a
}
return b
}
for i := 0; i < len(kvs); i = i + maxTxnNum {
end := min(i+maxTxnNum, len(keys))
batch, err := buildKvs(keys[i:end], values[i:end])
if err != nil {
return err
}
// TODO: atomicity is not promised. Garbage will be generated.
if err := snapshot.MultiSave(batch, ts); err != nil {
return err
}
}
return nil
}
func (kc *Catalog) CreateCollection(ctx context.Context, coll *model.Collection, ts typeutil.Timestamp) error {
k1 := fmt.Sprintf("%s/%d", CollectionMetaPrefix, coll.CollectionID)
k1 := buildCollectionKey(coll.CollectionID)
collInfo := model.MarshalCollectionModel(coll)
v1, err := proto.Marshal(collInfo)
if err != nil {
@ -42,46 +113,100 @@ func (kc *Catalog) CreateCollection(ctx context.Context, coll *model.Collection,
return err
}
// save ddOpStr into etcd
kvs := map[string]string{k1: string(v1)}
for k, v := range coll.Extra {
kvs[k] = v
// save partition info to newly path.
for _, partition := range coll.Partitions {
k := buildPartitionKey(coll.CollectionID, partition.PartitionID)
partitionInfo := model.MarshalPartitionModel(partition)
v, err := proto.Marshal(partitionInfo)
if err != nil {
return err
}
kvs[k] = string(v)
}
err = kc.Snapshot.MultiSave(kvs, ts)
if err != nil {
log.Error("create collection persist meta fail", zap.String("key", k1), zap.Error(err))
return err
// no default aliases will be created.
// save fields info to newly path.
for _, field := range coll.Fields {
k := buildFieldKey(coll.CollectionID, field.FieldID)
fieldInfo := model.MarshalFieldModel(field)
v, err := proto.Marshal(fieldInfo)
if err != nil {
return err
}
kvs[k] = string(v)
}
return nil
// TODO: atomicity should be promised outside.
maxTxnNum := 64
return batchSave(kc.Snapshot, maxTxnNum, kvs, ts)
}
func (kc *Catalog) CreatePartition(ctx context.Context, coll *model.Collection, ts typeutil.Timestamp) error {
k1 := fmt.Sprintf("%s/%d", CollectionMetaPrefix, coll.CollectionID)
collInfo := model.MarshalCollectionModel(coll)
v1, err := proto.Marshal(collInfo)
func (kc *Catalog) loadCollection(ctx context.Context, collectionID typeutil.UniqueID, ts typeutil.Timestamp) (*pb.CollectionInfo, error) {
collKey := buildCollectionKey(collectionID)
collVal, err := kc.Snapshot.Load(collKey, ts)
if err != nil {
log.Error("get collection meta fail", zap.String("key", collKey), zap.Error(err))
return nil, err
}
collMeta := &pb.CollectionInfo{}
err = proto.Unmarshal([]byte(collVal), collMeta)
return collMeta, err
}
func partitionVersionAfter210(collMeta *pb.CollectionInfo) bool {
return len(collMeta.GetPartitionIDs()) <= 0 &&
len(collMeta.GetPartitionNames()) <= 0 &&
len(collMeta.GetPartitionCreatedTimestamps()) <= 0
}
func partitionExistByID(collMeta *pb.CollectionInfo, partitionID typeutil.UniqueID) bool {
return funcutil.SliceContain(collMeta.GetPartitionIDs(), partitionID)
}
func partitionExistByName(collMeta *pb.CollectionInfo, partitionName string) bool {
return funcutil.SliceContain(collMeta.GetPartitionNames(), partitionName)
}
func (kc *Catalog) CreatePartition(ctx context.Context, partition *model.Partition, ts typeutil.Timestamp) error {
collMeta, err := kc.loadCollection(ctx, partition.CollectionID, ts)
if err != nil {
log.Error("create partition marshal fail", zap.String("key", k1), zap.Error(err))
return err
}
kvs := map[string]string{k1: string(v1)}
err = kc.Snapshot.MultiSave(kvs, ts)
if err != nil {
log.Error("create partition persist meta fail", zap.String("key", k1), zap.Error(err))
return err
if partitionVersionAfter210(collMeta) {
// save to newly path.
k := buildPartitionKey(partition.CollectionID, partition.PartitionID)
partitionInfo := model.MarshalPartitionModel(partition)
v, err := proto.Marshal(partitionInfo)
if err != nil {
return err
}
return kc.Snapshot.Save(k, string(v), ts)
}
// save ddOpStr into etcd
err = kc.Txn.MultiSave(coll.Extra)
if err != nil {
// will not panic, missing create msg
log.Warn("create partition persist ddop meta fail", zap.Int64("collectionID", coll.CollectionID), zap.Error(err))
return err
if partitionExistByID(collMeta, partition.PartitionID) {
return fmt.Errorf("partition already exist: %d", partition.PartitionID)
}
return nil
if partitionExistByName(collMeta, partition.PartitionName) {
return fmt.Errorf("partition already exist: %s", partition.PartitionName)
}
// keep consistent with older version, otherwise it's hard to judge where to find partitions.
collMeta.PartitionIDs = append(collMeta.PartitionIDs, partition.PartitionID)
collMeta.PartitionNames = append(collMeta.PartitionNames, partition.PartitionName)
collMeta.PartitionCreatedTimestamps = append(collMeta.PartitionCreatedTimestamps, partition.PartitionCreatedTimestamp)
k := buildCollectionKey(partition.CollectionID)
v, err := proto.Marshal(collMeta)
if err != nil {
return err
}
return kc.Snapshot.Save(k, string(v), ts)
}
func (kc *Catalog) CreateIndex(ctx context.Context, col *model.Collection, index *model.Index) error {
@ -192,21 +317,16 @@ func (kc *Catalog) AlterIndex(ctx context.Context, oldIndex *model.Index, newInd
}
}
func (kc *Catalog) CreateAlias(ctx context.Context, collection *model.Collection, ts typeutil.Timestamp) error {
k := fmt.Sprintf("%s/%s", CollectionAliasMetaPrefix, collection.Aliases[0])
v, err := proto.Marshal(&pb.CollectionInfo{ID: collection.CollectionID, Schema: &schemapb.CollectionSchema{Name: collection.Aliases[0]}})
func (kc *Catalog) CreateAlias(ctx context.Context, alias *model.Alias, ts typeutil.Timestamp) error {
oldKBefore210 := fmt.Sprintf("%s/%s", CollectionAliasMetaPrefix, alias.Name)
k := buildAliasKey(alias.Name)
aliasInfo := model.MarshalAliasModel(alias)
v, err := proto.Marshal(aliasInfo)
if err != nil {
log.Error("create alias marshal fail", zap.String("key", k), zap.Error(err))
return err
}
err = kc.Snapshot.Save(k, string(v), ts)
if err != nil {
log.Error("create alias persist meta fail", zap.String("key", k), zap.Error(err))
return err
}
return nil
kvs := map[string]string{k: string(v)}
return kc.Snapshot.MultiSaveAndRemoveWithPrefix(kvs, []string{oldKBefore210}, ts)
}
func (kc *Catalog) CreateCredential(ctx context.Context, credential *model.Credential) error {
@ -226,22 +346,73 @@ func (kc *Catalog) CreateCredential(ctx context.Context, credential *model.Crede
return nil
}
func (kc *Catalog) GetCollectionByID(ctx context.Context, collectionID typeutil.UniqueID, ts typeutil.Timestamp) (*model.Collection, error) {
collKey := fmt.Sprintf("%s/%d", CollectionMetaPrefix, collectionID)
collVal, err := kc.Snapshot.Load(collKey, ts)
func (kc *Catalog) listPartitionsAfter210(ctx context.Context, collectionID typeutil.UniqueID, ts typeutil.Timestamp) ([]*model.Partition, error) {
prefix := buildPartitionPrefix(collectionID)
_, values, err := kc.Snapshot.LoadWithPrefix(prefix, ts)
if err != nil {
log.Error("get collection meta fail", zap.String("key", collKey), zap.Error(err))
return nil, err
}
partitions := make([]*model.Partition, 0, len(values))
for _, v := range values {
partitionMeta := &pb.PartitionInfo{}
err := proto.Unmarshal([]byte(v), partitionMeta)
if err != nil {
return nil, err
}
partitions = append(partitions, model.UnmarshalPartitionModel(partitionMeta))
}
return partitions, nil
}
collMeta := &pb.CollectionInfo{}
err = proto.Unmarshal([]byte(collVal), collMeta)
func fieldVersionAfter210(collMeta *pb.CollectionInfo) bool {
return len(collMeta.GetSchema().GetFields()) <= 0
}
func (kc *Catalog) listFieldsAfter210(ctx context.Context, collectionID typeutil.UniqueID, ts typeutil.Timestamp) ([]*model.Field, error) {
prefix := buildFieldPrefix(collectionID)
_, values, err := kc.Snapshot.LoadWithPrefix(prefix, ts)
if err != nil {
return nil, err
}
fields := make([]*model.Field, 0, len(values))
for _, v := range values {
partitionMeta := &schemapb.FieldSchema{}
err := proto.Unmarshal([]byte(v), partitionMeta)
if err != nil {
return nil, err
}
fields = append(fields, model.UnmarshalFieldModel(partitionMeta))
}
return fields, nil
}
func (kc *Catalog) GetCollectionByID(ctx context.Context, collectionID typeutil.UniqueID, ts typeutil.Timestamp) (*model.Collection, error) {
collKey := buildCollectionKey(collectionID)
collMeta, err := kc.loadCollection(ctx, collectionID, ts)
if err != nil {
log.Error("collection meta marshal fail", zap.String("key", collKey), zap.Error(err))
return nil, err
}
return model.UnmarshalCollectionModel(collMeta), nil
collection := model.UnmarshalCollectionModel(collMeta)
if !partitionVersionAfter210(collMeta) && !fieldVersionAfter210(collMeta) {
return collection, nil
}
partitions, err := kc.listPartitionsAfter210(ctx, collectionID, ts)
if err != nil {
return nil, err
}
collection.Partitions = partitions
fields, err := kc.listFieldsAfter210(ctx, collectionID, ts)
if err != nil {
return nil, err
}
collection.Fields = fields
return collection, nil
}
func (kc *Catalog) CollectionExists(ctx context.Context, collectionID typeutil.UniqueID, ts typeutil.Timestamp) bool {
@ -266,8 +437,8 @@ func (kc *Catalog) GetCredential(ctx context.Context, username string) (*model.C
return &model.Credential{Username: username, EncryptedPassword: credentialInfo.EncryptedPassword}, nil
}
func (kc *Catalog) AlterAlias(ctx context.Context, collection *model.Collection, ts typeutil.Timestamp) error {
return kc.CreateAlias(ctx, collection, ts)
func (kc *Catalog) AlterAlias(ctx context.Context, alias *model.Alias, ts typeutil.Timestamp) error {
return kc.CreateAlias(ctx, alias, ts)
}
func (kc *Catalog) DropCollection(ctx context.Context, collectionInfo *model.Collection, ts typeutil.Timestamp) error {
@ -306,45 +477,45 @@ func (kc *Catalog) DropCollection(ctx context.Context, collectionInfo *model.Col
return nil
}
func (kc *Catalog) DropPartition(ctx context.Context, collectionInfo *model.Collection, partitionID typeutil.UniqueID, ts typeutil.Timestamp) error {
collMeta := model.MarshalCollectionModel(collectionInfo)
k := path.Join(CollectionMetaPrefix, strconv.FormatInt(collectionInfo.CollectionID, 10))
func dropPartition(collMeta *pb.CollectionInfo, partitionID typeutil.UniqueID) {
if collMeta == nil {
return
}
{
loc := -1
for idx, pid := range collMeta.GetPartitionIDs() {
if pid == partitionID {
loc = idx
break
}
}
if loc != -1 {
collMeta.PartitionIDs = append(collMeta.GetPartitionIDs()[:loc], collMeta.GetPartitionIDs()[loc+1:]...)
collMeta.PartitionNames = append(collMeta.GetPartitionNames()[:loc], collMeta.GetPartitionNames()[loc+1:]...)
collMeta.PartitionCreatedTimestamps = append(collMeta.GetPartitionCreatedTimestamps()[:loc], collMeta.GetPartitionCreatedTimestamps()[loc+1:]...)
}
}
}
func (kc *Catalog) DropPartition(ctx context.Context, collectionID typeutil.UniqueID, partitionID typeutil.UniqueID, ts typeutil.Timestamp) error {
collMeta, err := kc.loadCollection(ctx, collectionID, ts)
if err != nil {
return err
}
if partitionVersionAfter210(collMeta) {
k := buildPartitionKey(collectionID, partitionID)
return kc.Snapshot.MultiSaveAndRemoveWithPrefix(nil, []string{k}, ts)
}
k := buildCollectionKey(collectionID)
dropPartition(collMeta, partitionID)
v, err := proto.Marshal(collMeta)
if err != nil {
log.Error("drop partition marshal fail", zap.String("key", k), zap.Error(err))
return err
}
err = kc.Snapshot.Save(k, string(v), ts)
if err != nil {
log.Error("drop partition update collection meta fail",
zap.Int64("collectionID", collectionInfo.CollectionID),
zap.Int64("partitionID", partitionID),
zap.Error(err))
return err
}
var delMetaKeys []string
for _, idxInfo := range collMeta.FieldIndexes {
k := fmt.Sprintf("%s/%d/%d/%d", SegmentIndexMetaPrefix, collMeta.ID, idxInfo.IndexID, partitionID)
delMetaKeys = append(delMetaKeys, k)
}
// Txn operation
metaTxn := map[string]string{}
for k, v := range collectionInfo.Extra {
metaTxn[k] = v
}
err = kc.Txn.MultiSaveAndRemoveWithPrefix(metaTxn, delMetaKeys)
if err != nil {
log.Warn("drop partition update meta fail",
zap.Int64("collectionID", collectionInfo.CollectionID),
zap.Int64("partitionID", partitionID),
zap.Error(err))
return err
}
return nil
return kc.Snapshot.Save(k, string(v), ts)
}
func (kc *Catalog) DropIndex(ctx context.Context, collectionInfo *model.Collection, dropIdxID typeutil.UniqueID, ts typeutil.Timestamp) error {
@ -386,19 +557,10 @@ func (kc *Catalog) DropCredential(ctx context.Context, username string) error {
return nil
}
func (kc *Catalog) DropAlias(ctx context.Context, collectionID typeutil.UniqueID, alias string, ts typeutil.Timestamp) error {
delMetakeys := []string{
fmt.Sprintf("%s/%s", CollectionAliasMetaPrefix, alias),
}
meta := make(map[string]string)
err := kc.Snapshot.MultiSaveAndRemoveWithPrefix(meta, delMetakeys, ts)
if err != nil {
log.Error("drop alias update meta fail", zap.String("alias", alias), zap.Error(err))
return err
}
return nil
func (kc *Catalog) DropAlias(ctx context.Context, alias string, ts typeutil.Timestamp) error {
oldKBefore210 := fmt.Sprintf("%s/%s", CollectionAliasMetaPrefix, alias)
k := buildAliasKey(alias)
return kc.Snapshot.MultiSaveAndRemoveWithPrefix(nil, []string{k, oldKBefore210}, ts)
}
func (kc *Catalog) GetCollectionByName(ctx context.Context, collectionName string, ts typeutil.Timestamp) (*model.Collection, error) {
@ -416,7 +578,8 @@ func (kc *Catalog) GetCollectionByName(ctx context.Context, collectionName strin
continue
}
if colMeta.Schema.Name == collectionName {
return model.UnmarshalCollectionModel(&colMeta), nil
// compatibility handled by kc.GetCollectionByID.
return kc.GetCollectionByID(ctx, colMeta.GetID(), ts)
}
}
@ -441,31 +604,71 @@ func (kc *Catalog) ListCollections(ctx context.Context, ts typeutil.Timestamp) (
log.Warn("unmarshal collection info failed", zap.Error(err))
continue
}
colls[collMeta.Schema.Name] = model.UnmarshalCollectionModel(&collMeta)
collection, err := kc.GetCollectionByID(ctx, collMeta.GetID(), ts)
if err != nil {
return nil, err
}
colls[collMeta.Schema.Name] = collection
}
return colls, nil
}
func (kc *Catalog) ListAliases(ctx context.Context) ([]*model.Collection, error) {
_, values, err := kc.Snapshot.LoadWithPrefix(CollectionAliasMetaPrefix, 0)
func (kc *Catalog) listAliasesBefore210(ctx context.Context, ts typeutil.Timestamp) ([]*model.Alias, error) {
_, values, err := kc.Snapshot.LoadWithPrefix(CollectionAliasMetaPrefix, ts)
if err != nil {
log.Error("get aliases meta fail", zap.String("prefix", CollectionAliasMetaPrefix), zap.Error(err))
return nil, err
}
var colls []*model.Collection
// aliases before 210 stored by CollectionInfo.
aliases := make([]*model.Alias, 0, len(values))
for _, value := range values {
aliasInfo := pb.CollectionInfo{}
err = proto.Unmarshal([]byte(value), &aliasInfo)
coll := &pb.CollectionInfo{}
err := proto.Unmarshal([]byte(value), coll)
if err != nil {
log.Warn("unmarshal aliases failed", zap.Error(err))
continue
return nil, err
}
colls = append(colls, model.UnmarshalCollectionModel(&aliasInfo))
aliases = append(aliases, &model.Alias{
Name: coll.GetSchema().GetName(),
CollectionID: coll.GetID(),
CreatedTime: 0, // not accurate.
})
}
return aliases, nil
}
return colls, nil
func (kc *Catalog) listAliasesAfter210(ctx context.Context, ts typeutil.Timestamp) ([]*model.Alias, error) {
_, values, err := kc.Snapshot.LoadWithPrefix(AliasMetaPrefix, ts)
if err != nil {
return nil, err
}
// aliases after 210 stored by AliasInfo.
aliases := make([]*model.Alias, 0, len(values))
for _, value := range values {
info := &pb.AliasInfo{}
err := proto.Unmarshal([]byte(value), info)
if err != nil {
return nil, err
}
aliases = append(aliases, &model.Alias{
Name: info.GetAliasName(),
CollectionID: info.GetCollectionId(),
CreatedTime: info.GetCreatedTime(),
})
}
return aliases, nil
}
func (kc *Catalog) ListAliases(ctx context.Context, ts typeutil.Timestamp) ([]*model.Alias, error) {
aliases1, err := kc.listAliasesBefore210(ctx, ts)
if err != nil {
return nil, err
}
aliases2, err := kc.listAliasesAfter210(ctx, ts)
if err != nil {
return nil, err
}
aliases := append(aliases1, aliases2...)
return aliases, nil
}
func (kc *Catalog) listSegmentIndexes(ctx context.Context) (map[int64]*model.Index, error) {

View File

@ -5,6 +5,10 @@ import (
"errors"
"testing"
"github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus/internal/proto/commonpb"
pb "github.com/milvus-io/milvus/internal/proto/etcdpb"
@ -164,3 +168,691 @@ func Test_ListIndexes(t *testing.T) {
})
}
func TestCatalog_loadCollection(t *testing.T) {
t.Run("load failed", func(t *testing.T) {
ctx := context.Background()
snapshot := kv.NewMockSnapshotKV()
snapshot.LoadFunc = func(key string, ts typeutil.Timestamp) (string, error) {
return "", errors.New("mock")
}
kc := Catalog{Snapshot: snapshot}
_, err := kc.loadCollection(ctx, 1, 0)
assert.Error(t, err)
})
t.Run("load, not collection info", func(t *testing.T) {
ctx := context.Background()
snapshot := kv.NewMockSnapshotKV()
snapshot.LoadFunc = func(key string, ts typeutil.Timestamp) (string, error) {
return "not in pb format", nil
}
kc := Catalog{Snapshot: snapshot}
_, err := kc.loadCollection(ctx, 1, 0)
assert.Error(t, err)
})
t.Run("load, normal collection info", func(t *testing.T) {
ctx := context.Background()
snapshot := kv.NewMockSnapshotKV()
coll := &pb.CollectionInfo{ID: 1}
value, err := proto.Marshal(coll)
assert.NoError(t, err)
snapshot.LoadFunc = func(key string, ts typeutil.Timestamp) (string, error) {
return string(value), nil
}
kc := Catalog{Snapshot: snapshot}
got, err := kc.loadCollection(ctx, 1, 0)
assert.NoError(t, err)
assert.Equal(t, got.GetID(), coll.GetID())
})
}
func Test_partitionVersionAfter210(t *testing.T) {
t.Run("after 210", func(t *testing.T) {
coll := &pb.CollectionInfo{}
assert.True(t, partitionVersionAfter210(coll))
})
t.Run("before 210", func(t *testing.T) {
coll := &pb.CollectionInfo{
PartitionIDs: []int64{1},
PartitionNames: []string{"partition"},
PartitionCreatedTimestamps: []uint64{2},
}
assert.False(t, partitionVersionAfter210(coll))
})
}
func Test_partitionExist(t *testing.T) {
t.Run("in partitions ids", func(t *testing.T) {
coll := &pb.CollectionInfo{
PartitionIDs: []int64{1},
}
assert.True(t, partitionExistByID(coll, 1))
})
t.Run("not exist", func(t *testing.T) {
coll := &pb.CollectionInfo{}
assert.False(t, partitionExistByID(coll, 1))
})
}
func Test_partitionExistByName(t *testing.T) {
t.Run("in partitions ids", func(t *testing.T) {
coll := &pb.CollectionInfo{
PartitionNames: []string{"partition"},
}
assert.True(t, partitionExistByName(coll, "partition"))
})
t.Run("not exist", func(t *testing.T) {
coll := &pb.CollectionInfo{}
assert.False(t, partitionExistByName(coll, "partition"))
})
}
func TestCatalog_CreatePartitionV2(t *testing.T) {
t.Run("collection not exist", func(t *testing.T) {
ctx := context.Background()
snapshot := kv.NewMockSnapshotKV()
snapshot.LoadFunc = func(key string, ts typeutil.Timestamp) (string, error) {
return "", errors.New("mock")
}
kc := Catalog{Snapshot: snapshot}
err := kc.CreatePartition(ctx, &model.Partition{}, 0)
assert.Error(t, err)
})
t.Run("partition version after 210", func(t *testing.T) {
ctx := context.Background()
coll := &pb.CollectionInfo{}
value, err := proto.Marshal(coll)
assert.NoError(t, err)
snapshot := kv.NewMockSnapshotKV()
snapshot.LoadFunc = func(key string, ts typeutil.Timestamp) (string, error) {
return string(value), nil
}
snapshot.SaveFunc = func(key string, value string, ts typeutil.Timestamp) error {
return errors.New("mock")
}
kc := Catalog{Snapshot: snapshot}
err = kc.CreatePartition(ctx, &model.Partition{}, 0)
assert.Error(t, err)
snapshot.SaveFunc = func(key string, value string, ts typeutil.Timestamp) error {
return nil
}
err = kc.CreatePartition(ctx, &model.Partition{}, 0)
assert.NoError(t, err)
})
t.Run("partition version before 210, id exist", func(t *testing.T) {
ctx := context.Background()
partID := typeutil.UniqueID(1)
coll := &pb.CollectionInfo{PartitionIDs: []int64{partID}}
value, err := proto.Marshal(coll)
assert.NoError(t, err)
snapshot := kv.NewMockSnapshotKV()
snapshot.LoadFunc = func(key string, ts typeutil.Timestamp) (string, error) {
return string(value), nil
}
kc := Catalog{Snapshot: snapshot}
err = kc.CreatePartition(ctx, &model.Partition{PartitionID: partID}, 0)
assert.Error(t, err)
})
t.Run("partition version before 210, name exist", func(t *testing.T) {
ctx := context.Background()
partition := "partition"
coll := &pb.CollectionInfo{PartitionNames: []string{partition}}
value, err := proto.Marshal(coll)
assert.NoError(t, err)
snapshot := kv.NewMockSnapshotKV()
snapshot.LoadFunc = func(key string, ts typeutil.Timestamp) (string, error) {
return string(value), nil
}
kc := Catalog{Snapshot: snapshot}
err = kc.CreatePartition(ctx, &model.Partition{PartitionName: partition}, 0)
assert.Error(t, err)
})
t.Run("partition version before 210, not exist", func(t *testing.T) {
ctx := context.Background()
coll := &pb.CollectionInfo{
PartitionNames: []string{"partition"},
PartitionIDs: []int64{111},
PartitionCreatedTimestamps: []uint64{111111},
}
value, err := proto.Marshal(coll)
assert.NoError(t, err)
snapshot := kv.NewMockSnapshotKV()
snapshot.LoadFunc = func(key string, ts typeutil.Timestamp) (string, error) {
return string(value), nil
}
snapshot.SaveFunc = func(key string, value string, ts typeutil.Timestamp) error {
return errors.New("mock")
}
kc := Catalog{Snapshot: snapshot}
err = kc.CreatePartition(ctx, &model.Partition{}, 0)
assert.Error(t, err)
snapshot.SaveFunc = func(key string, value string, ts typeutil.Timestamp) error {
return nil
}
err = kc.CreatePartition(ctx, &model.Partition{}, 0)
assert.NoError(t, err)
})
}
func TestCatalog_CreateAliasV2(t *testing.T) {
ctx := context.Background()
snapshot := kv.NewMockSnapshotKV()
snapshot.MultiSaveAndRemoveWithPrefixFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
return errors.New("mock")
}
kc := Catalog{Snapshot: snapshot}
err := kc.CreateAlias(ctx, &model.Alias{}, 0)
assert.Error(t, err)
snapshot.MultiSaveAndRemoveWithPrefixFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
return nil
}
err = kc.CreateAlias(ctx, &model.Alias{}, 0)
assert.NoError(t, err)
}
func TestCatalog_listPartitionsAfter210(t *testing.T) {
t.Run("load failed", func(t *testing.T) {
ctx := context.Background()
snapshot := kv.NewMockSnapshotKV()
snapshot.LoadWithPrefixFunc = func(key string, ts typeutil.Timestamp) ([]string, []string, error) {
return nil, nil, errors.New("mock")
}
kc := Catalog{Snapshot: snapshot}
_, err := kc.listPartitionsAfter210(ctx, 1, 0)
assert.Error(t, err)
})
t.Run("not in pb format", func(t *testing.T) {
ctx := context.Background()
snapshot := kv.NewMockSnapshotKV()
snapshot.LoadWithPrefixFunc = func(key string, ts typeutil.Timestamp) ([]string, []string, error) {
return []string{"key"}, []string{"not in pb format"}, nil
}
kc := Catalog{Snapshot: snapshot}
_, err := kc.listPartitionsAfter210(ctx, 1, 0)
assert.Error(t, err)
})
t.Run("normal case", func(t *testing.T) {
ctx := context.Background()
partition := &pb.PartitionInfo{PartitionID: 100}
value, err := proto.Marshal(partition)
assert.NoError(t, err)
snapshot := kv.NewMockSnapshotKV()
snapshot.LoadWithPrefixFunc = func(key string, ts typeutil.Timestamp) ([]string, []string, error) {
return []string{"key"}, []string{string(value)}, nil
}
kc := Catalog{Snapshot: snapshot}
got, err := kc.listPartitionsAfter210(ctx, 1, 0)
assert.NoError(t, err)
assert.Equal(t, 1, len(got))
assert.Equal(t, int64(100), got[0].PartitionID)
})
}
func Test_fieldVersionAfter210(t *testing.T) {
coll := &pb.CollectionInfo{Schema: &schemapb.CollectionSchema{}}
assert.True(t, fieldVersionAfter210(coll))
coll.Schema.Fields = []*schemapb.FieldSchema{{FieldID: 101}}
assert.False(t, fieldVersionAfter210(coll))
}
func TestCatalog_listFieldsAfter210(t *testing.T) {
t.Run("load failed", func(t *testing.T) {
ctx := context.Background()
snapshot := kv.NewMockSnapshotKV()
snapshot.LoadWithPrefixFunc = func(key string, ts typeutil.Timestamp) ([]string, []string, error) {
return nil, nil, errors.New("mock")
}
kc := Catalog{Snapshot: snapshot}
_, err := kc.listFieldsAfter210(ctx, 1, 0)
assert.Error(t, err)
})
t.Run("not in pb format", func(t *testing.T) {
ctx := context.Background()
snapshot := kv.NewMockSnapshotKV()
snapshot.LoadWithPrefixFunc = func(key string, ts typeutil.Timestamp) ([]string, []string, error) {
return []string{"key"}, []string{"not in pb format"}, nil
}
kc := Catalog{Snapshot: snapshot}
_, err := kc.listFieldsAfter210(ctx, 1, 0)
assert.Error(t, err)
})
t.Run("normal case", func(t *testing.T) {
ctx := context.Background()
field := &schemapb.FieldSchema{FieldID: 101}
value, err := proto.Marshal(field)
assert.NoError(t, err)
snapshot := kv.NewMockSnapshotKV()
snapshot.LoadWithPrefixFunc = func(key string, ts typeutil.Timestamp) ([]string, []string, error) {
return []string{"key"}, []string{string(value)}, nil
}
kc := Catalog{Snapshot: snapshot}
got, err := kc.listFieldsAfter210(ctx, 1, 0)
assert.NoError(t, err)
assert.Equal(t, 1, len(got))
assert.Equal(t, int64(101), got[0].FieldID)
})
}
func TestCatalog_AlterAliasV2(t *testing.T) {
ctx := context.Background()
snapshot := kv.NewMockSnapshotKV()
snapshot.MultiSaveAndRemoveWithPrefixFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
return errors.New("mock")
}
kc := Catalog{Snapshot: snapshot}
err := kc.AlterAlias(ctx, &model.Alias{}, 0)
assert.Error(t, err)
snapshot.MultiSaveAndRemoveWithPrefixFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
return nil
}
err = kc.AlterAlias(ctx, &model.Alias{}, 0)
assert.NoError(t, err)
}
func Test_dropPartition(t *testing.T) {
t.Run("nil, won't panic", func(t *testing.T) {
dropPartition(nil, 1)
})
t.Run("not exist", func(t *testing.T) {
coll := &pb.CollectionInfo{}
dropPartition(coll, 100)
})
t.Run("in partition ids", func(t *testing.T) {
coll := &pb.CollectionInfo{
PartitionIDs: []int64{100, 101, 102, 103, 104},
PartitionNames: []string{"0", "1", "2", "3", "4"},
PartitionCreatedTimestamps: []uint64{1, 2, 3, 4, 5},
}
dropPartition(coll, 100)
assert.Equal(t, 4, len(coll.GetPartitionIDs()))
dropPartition(coll, 104)
assert.Equal(t, 3, len(coll.GetPartitionIDs()))
dropPartition(coll, 102)
assert.Equal(t, 2, len(coll.GetPartitionIDs()))
dropPartition(coll, 103)
assert.Equal(t, 1, len(coll.GetPartitionIDs()))
dropPartition(coll, 101)
assert.Equal(t, 0, len(coll.GetPartitionIDs()))
})
}
func TestCatalog_DropPartitionV2(t *testing.T) {
t.Run("failed to load collection", func(t *testing.T) {
ctx := context.Background()
snapshot := kv.NewMockSnapshotKV()
snapshot.LoadFunc = func(key string, ts typeutil.Timestamp) (string, error) {
return "", errors.New("mock")
}
kc := Catalog{Snapshot: snapshot}
err := kc.DropPartition(ctx, 100, 101, 0)
assert.Error(t, err)
})
t.Run("partition version after 210", func(t *testing.T) {
ctx := context.Background()
coll := &pb.CollectionInfo{}
value, err := proto.Marshal(coll)
assert.NoError(t, err)
snapshot := kv.NewMockSnapshotKV()
snapshot.LoadFunc = func(key string, ts typeutil.Timestamp) (string, error) {
return string(value), nil
}
snapshot.MultiSaveAndRemoveWithPrefixFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
return errors.New("mock")
}
kc := Catalog{Snapshot: snapshot}
err = kc.DropPartition(ctx, 100, 101, 0)
assert.Error(t, err)
snapshot.MultiSaveAndRemoveWithPrefixFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
return nil
}
err = kc.DropPartition(ctx, 100, 101, 0)
assert.NoError(t, err)
})
t.Run("partition before 210", func(t *testing.T) {
ctx := context.Background()
coll := &pb.CollectionInfo{
PartitionIDs: []int64{101, 102},
PartitionNames: []string{"partition1", "partition2"},
PartitionCreatedTimestamps: []uint64{101, 102},
}
value, err := proto.Marshal(coll)
assert.NoError(t, err)
snapshot := kv.NewMockSnapshotKV()
snapshot.LoadFunc = func(key string, ts typeutil.Timestamp) (string, error) {
return string(value), nil
}
snapshot.SaveFunc = func(key string, value string, ts typeutil.Timestamp) error {
return errors.New("mock")
}
kc := Catalog{Snapshot: snapshot}
err = kc.DropPartition(ctx, 100, 101, 0)
assert.Error(t, err)
snapshot.SaveFunc = func(key string, value string, ts typeutil.Timestamp) error {
return nil
}
err = kc.DropPartition(ctx, 100, 102, 0)
assert.NoError(t, err)
})
}
func TestCatalog_DropAliasV2(t *testing.T) {
ctx := context.Background()
snapshot := kv.NewMockSnapshotKV()
snapshot.MultiSaveAndRemoveWithPrefixFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
return errors.New("mock")
}
kc := Catalog{Snapshot: snapshot}
err := kc.DropAlias(ctx, "alias", 0)
assert.Error(t, err)
snapshot.MultiSaveAndRemoveWithPrefixFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
return nil
}
err = kc.DropAlias(ctx, "alias", 0)
assert.NoError(t, err)
}
func TestCatalog_listAliasesBefore210(t *testing.T) {
t.Run("load failed", func(t *testing.T) {
ctx := context.Background()
snapshot := kv.NewMockSnapshotKV()
snapshot.LoadWithPrefixFunc = func(key string, ts typeutil.Timestamp) ([]string, []string, error) {
return nil, nil, errors.New("mock")
}
kc := Catalog{Snapshot: snapshot}
_, err := kc.listAliasesBefore210(ctx, 0)
assert.Error(t, err)
})
t.Run("not in pb format", func(t *testing.T) {
ctx := context.Background()
snapshot := kv.NewMockSnapshotKV()
snapshot.LoadWithPrefixFunc = func(key string, ts typeutil.Timestamp) ([]string, []string, error) {
return []string{"key"}, []string{"not in pb format"}, nil
}
kc := Catalog{Snapshot: snapshot}
_, err := kc.listAliasesBefore210(ctx, 0)
assert.Error(t, err)
})
t.Run("normal case", func(t *testing.T) {
ctx := context.Background()
coll := &pb.CollectionInfo{ID: 100}
value, err := proto.Marshal(coll)
assert.NoError(t, err)
snapshot := kv.NewMockSnapshotKV()
snapshot.LoadWithPrefixFunc = func(key string, ts typeutil.Timestamp) ([]string, []string, error) {
return []string{"key"}, []string{string(value)}, nil
}
kc := Catalog{Snapshot: snapshot}
got, err := kc.listAliasesBefore210(ctx, 0)
assert.NoError(t, err)
assert.Equal(t, 1, len(got))
assert.Equal(t, int64(100), got[0].CollectionID)
})
}
func TestCatalog_listAliasesAfter210(t *testing.T) {
t.Run("load failed", func(t *testing.T) {
ctx := context.Background()
snapshot := kv.NewMockSnapshotKV()
snapshot.LoadWithPrefixFunc = func(key string, ts typeutil.Timestamp) ([]string, []string, error) {
return nil, nil, errors.New("mock")
}
kc := Catalog{Snapshot: snapshot}
_, err := kc.listAliasesAfter210(ctx, 0)
assert.Error(t, err)
})
t.Run("not in pb format", func(t *testing.T) {
ctx := context.Background()
snapshot := kv.NewMockSnapshotKV()
snapshot.LoadWithPrefixFunc = func(key string, ts typeutil.Timestamp) ([]string, []string, error) {
return []string{"key"}, []string{"not in pb format"}, nil
}
kc := Catalog{Snapshot: snapshot}
_, err := kc.listAliasesAfter210(ctx, 0)
assert.Error(t, err)
})
t.Run("normal case", func(t *testing.T) {
ctx := context.Background()
coll := &pb.AliasInfo{CollectionId: 100}
value, err := proto.Marshal(coll)
assert.NoError(t, err)
snapshot := kv.NewMockSnapshotKV()
snapshot.LoadWithPrefixFunc = func(key string, ts typeutil.Timestamp) ([]string, []string, error) {
return []string{"key"}, []string{string(value)}, nil
}
kc := Catalog{Snapshot: snapshot}
got, err := kc.listAliasesAfter210(ctx, 0)
assert.NoError(t, err)
assert.Equal(t, 1, len(got))
assert.Equal(t, int64(100), got[0].CollectionID)
})
}
func TestCatalog_ListAliasesV2(t *testing.T) {
t.Run("failed to list aliases before 210", func(t *testing.T) {
ctx := context.Background()
snapshot := kv.NewMockSnapshotKV()
snapshot.LoadWithPrefixFunc = func(key string, ts typeutil.Timestamp) ([]string, []string, error) {
return []string{"key"}, []string{"not in pb format"}, nil
}
kc := Catalog{Snapshot: snapshot}
_, err := kc.ListAliases(ctx, 0)
assert.Error(t, err)
})
t.Run("failed to list aliases after 210", func(t *testing.T) {
ctx := context.Background()
coll := &pb.CollectionInfo{ID: 100, ShardsNum: 50}
value, err := proto.Marshal(coll)
assert.NoError(t, err)
snapshot := kv.NewMockSnapshotKV()
snapshot.LoadWithPrefixFunc = func(key string, ts typeutil.Timestamp) ([]string, []string, error) {
if key == AliasMetaPrefix {
return nil, nil, errors.New("mock")
}
return []string{"key"}, []string{string(value)}, nil
}
kc := Catalog{Snapshot: snapshot}
_, err = kc.ListAliases(ctx, 0)
assert.Error(t, err)
})
t.Run("normal case", func(t *testing.T) {
ctx := context.Background()
coll := &pb.CollectionInfo{Schema: &schemapb.CollectionSchema{Name: "alias1"}, ID: 100, ShardsNum: 50}
value, err := proto.Marshal(coll)
assert.NoError(t, err)
alias := &pb.AliasInfo{CollectionId: 101, AliasName: "alias2"}
value2, err := proto.Marshal(alias)
assert.NoError(t, err)
snapshot := kv.NewMockSnapshotKV()
snapshot.LoadWithPrefixFunc = func(key string, ts typeutil.Timestamp) ([]string, []string, error) {
if key == AliasMetaPrefix {
return []string{"key1"}, []string{string(value2)}, nil
}
return []string{"key"}, []string{string(value)}, nil
}
kc := Catalog{Snapshot: snapshot}
got, err := kc.ListAliases(ctx, 0)
assert.NoError(t, err)
assert.Equal(t, 2, len(got))
assert.Equal(t, "alias1", got[0].Name)
assert.Equal(t, "alias2", got[1].Name)
})
}
func Test_buildKvs(t *testing.T) {
t.Run("length not equal", func(t *testing.T) {
keys := []string{"k1", "k2"}
values := []string{"v1"}
_, err := buildKvs(keys, values)
assert.Error(t, err)
})
t.Run("duplicate", func(t *testing.T) {
keys := []string{"k1", "k1"}
values := []string{"v1", "v2"}
_, err := buildKvs(keys, values)
assert.Error(t, err)
})
t.Run("normal case", func(t *testing.T) {
keys := []string{"k1", "k2"}
values := []string{"v1", "v2"}
kvs, err := buildKvs(keys, values)
assert.NoError(t, err)
for i, k := range keys {
v, ok := kvs[k]
assert.True(t, ok)
assert.Equal(t, values[i], v)
}
})
}
func Test_batchSave(t *testing.T) {
t.Run("normal case", func(t *testing.T) {
snapshot := kv.NewMockSnapshotKV()
snapshot.MultiSaveFunc = func(kvs map[string]string, ts typeutil.Timestamp) error {
return nil
}
kvs := map[string]string{
"k1": "v1",
"k2": "v2",
"k3": "v3",
}
maxTxnNum := 2
err := batchSave(snapshot, maxTxnNum, kvs, 100)
assert.NoError(t, err)
})
t.Run("multi save failed", func(t *testing.T) {
snapshot := kv.NewMockSnapshotKV()
snapshot.MultiSaveFunc = func(kvs map[string]string, ts typeutil.Timestamp) error {
return errors.New("mock")
}
kvs := map[string]string{
"k1": "v1",
"k2": "v2",
"k3": "v3",
}
maxTxnNum := 2
err := batchSave(snapshot, maxTxnNum, kvs, 100)
assert.Error(t, err)
})
}

View File

@ -7,6 +7,10 @@ const (
// CollectionMetaPrefix prefix for collection meta
CollectionMetaPrefix = ComponentPrefix + "/collection"
PartitionMetaPrefix = ComponentPrefix + "/partitions"
AliasMetaPrefix = ComponentPrefix + "/aliases"
FieldMetaPrefix = ComponentPrefix + "/fields"
// SegmentIndexMetaPrefix prefix for segment index meta
SegmentIndexMetaPrefix = ComponentPrefix + "/segment-index"

View File

@ -0,0 +1,25 @@
package model
import pb "github.com/milvus-io/milvus/internal/proto/etcdpb"
type Alias struct {
Name string
CollectionID int64
CreatedTime uint64
}
func MarshalAliasModel(alias *Alias) *pb.AliasInfo {
return &pb.AliasInfo{
AliasName: alias.Name,
CollectionId: alias.CollectionID,
CreatedTime: alias.CreatedTime,
}
}
func UnmarshalAliasModel(info *pb.AliasInfo) *Alias {
return &Alias{
Name: info.GetAliasName(),
CollectionID: info.GetCollectionId(),
CreatedTime: info.GetCreatedTime(),
}
}

View File

@ -22,7 +22,7 @@ type Collection struct {
StartPositions []*commonpb.KeyDataPair
CreateTime uint64
ConsistencyLevel commonpb.ConsistencyLevel
Aliases []string
Aliases []string // TODO: deprecate this.
Extra map[string]string // extra kvs
}
@ -53,24 +53,12 @@ func UnmarshalCollectionModel(coll *pb.CollectionInfo) *Collection {
}
// backward compatible for deprecated fields
var partitions []*Partition
if len(coll.Partitions) != 0 {
partitions = make([]*Partition, len(coll.Partitions))
for idx, partition := range coll.Partitions {
partitions[idx] = &Partition{
PartitionID: partition.GetPartitionID(),
PartitionName: partition.GetPartitionName(),
PartitionCreatedTimestamp: partition.GetPartitionCreatedTimestamp(),
}
}
} else {
partitions = make([]*Partition, len(coll.PartitionIDs))
for idx := range coll.PartitionIDs {
partitions[idx] = &Partition{
PartitionID: coll.PartitionIDs[idx],
PartitionName: coll.PartitionNames[idx],
PartitionCreatedTimestamp: coll.PartitionCreatedTimestamps[idx],
}
partitions := make([]*Partition, len(coll.PartitionIDs))
for idx := range coll.PartitionIDs {
partitions[idx] = &Partition{
PartitionID: coll.PartitionIDs[idx],
PartitionName: coll.PartitionNames[idx],
PartitionCreatedTimestamp: coll.PartitionCreatedTimestamps[idx],
}
}
@ -87,7 +75,7 @@ func UnmarshalCollectionModel(coll *pb.CollectionInfo) *Collection {
Name: coll.Schema.Name,
Description: coll.Schema.Description,
AutoID: coll.Schema.AutoID,
Fields: UnmarshalFieldModels(coll.Schema.Fields),
Fields: UnmarshalFieldModels(coll.GetSchema().GetFields()),
Partitions: partitions,
FieldIDToIndexID: filedIDToIndexIDs,
VirtualChannelNames: coll.VirtualChannelNames,
@ -99,29 +87,17 @@ func UnmarshalCollectionModel(coll *pb.CollectionInfo) *Collection {
}
}
// MarshalCollectionModel marshal only collection-related information.
// partitions, aliases and fields won't be marshaled. They should be written to newly path.
func MarshalCollectionModel(coll *Collection) *pb.CollectionInfo {
if coll == nil {
return nil
}
fields := make([]*schemapb.FieldSchema, len(coll.Fields))
for idx, field := range coll.Fields {
fields[idx] = &schemapb.FieldSchema{
FieldID: field.FieldID,
Name: field.Name,
IsPrimaryKey: field.IsPrimaryKey,
Description: field.Description,
DataType: field.DataType,
TypeParams: field.TypeParams,
IndexParams: field.IndexParams,
AutoID: field.AutoID,
}
}
collSchema := &schemapb.CollectionSchema{
Name: coll.Name,
Description: coll.Description,
AutoID: coll.AutoID,
Fields: fields,
}
partitions := make([]*pb.PartitionInfo, len(coll.Partitions))
@ -140,10 +116,10 @@ func MarshalCollectionModel(coll *Collection) *pb.CollectionInfo {
IndexID: tuple.Value,
}
}
return &pb.CollectionInfo{
ID: coll.CollectionID,
Schema: collSchema,
Partitions: partitions,
FieldIndexes: fieldIndexes,
CreateTime: coll.CreateTime,
VirtualChannelNames: coll.VirtualChannelNames,

View File

@ -87,35 +87,6 @@ var (
StartPositions: startPositions,
ConsistencyLevel: commonpb.ConsistencyLevel_Strong,
}
newColPb = &pb.CollectionInfo{
ID: colID,
Schema: &schemapb.CollectionSchema{
Name: colName,
Description: "none",
AutoID: false,
Fields: []*schemapb.FieldSchema{filedSchemaPb},
},
CreateTime: 1,
Partitions: []*pb.PartitionInfo{
{
PartitionID: partID,
PartitionName: partName,
PartitionCreatedTimestamp: 1,
},
},
FieldIndexes: []*pb.FieldIndexInfo{
{
FiledID: fieldID,
IndexID: indexID,
},
},
VirtualChannelNames: []string{"vch"},
PhysicalChannelNames: []string{"pch"},
ShardsNum: 1,
StartPositions: startPositions,
ConsistencyLevel: commonpb.ConsistencyLevel_Strong,
}
)
func TestUnmarshalCollectionModel(t *testing.T) {
@ -123,16 +94,9 @@ func TestUnmarshalCollectionModel(t *testing.T) {
ret.TenantID = tenantID
assert.Equal(t, ret, colModel)
ret = UnmarshalCollectionModel(newColPb)
ret.TenantID = tenantID
assert.Equal(t, ret, colModel)
assert.Nil(t, UnmarshalCollectionModel(nil))
}
func TestMarshalCollectionModel(t *testing.T) {
ret := MarshalCollectionModel(colModel)
assert.Equal(t, ret, newColPb)
assert.Nil(t, MarshalCollectionModel(nil))
}

View File

@ -1,8 +1,29 @@
package model
import pb "github.com/milvus-io/milvus/internal/proto/etcdpb"
type Partition struct {
PartitionID int64
PartitionName string
PartitionCreatedTimestamp uint64
Extra map[string]string
CollectionID int64
}
func MarshalPartitionModel(partition *Partition) *pb.PartitionInfo {
return &pb.PartitionInfo{
PartitionID: partition.PartitionID,
PartitionName: partition.PartitionName,
PartitionCreatedTimestamp: partition.PartitionCreatedTimestamp,
CollectionId: partition.CollectionID,
}
}
func UnmarshalPartitionModel(info *pb.PartitionInfo) *Partition {
return &Partition{
PartitionID: info.GetPartitionID(),
PartitionName: info.GetPartitionName(),
PartitionCreatedTimestamp: info.GetPartitionCreatedTimestamp(),
CollectionID: info.GetCollectionId(),
}
}

View File

@ -36,13 +36,19 @@ message CollectionInfo {
int32 shards_num = 10;
repeated common.KeyDataPair start_positions = 11;
common.ConsistencyLevel consistency_level = 12;
repeated PartitionInfo partitions = 13;
}
message PartitionInfo {
int64 partitionID = 1;
string partitionName = 2;
uint64 partition_created_timestamp = 3;
int64 collection_id = 4;
}
message AliasInfo {
string alias_name = 1;
int64 collection_id = 2;
uint64 created_time = 3;
}
message SegmentIndexInfo {

View File

@ -156,7 +156,6 @@ type CollectionInfo struct {
ShardsNum int32 `protobuf:"varint,10,opt,name=shards_num,json=shardsNum,proto3" json:"shards_num,omitempty"`
StartPositions []*commonpb.KeyDataPair `protobuf:"bytes,11,rep,name=start_positions,json=startPositions,proto3" json:"start_positions,omitempty"`
ConsistencyLevel commonpb.ConsistencyLevel `protobuf:"varint,12,opt,name=consistency_level,json=consistencyLevel,proto3,enum=milvus.proto.common.ConsistencyLevel" json:"consistency_level,omitempty"`
Partitions []*PartitionInfo `protobuf:"bytes,13,rep,name=partitions,proto3" json:"partitions,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
@ -271,17 +270,11 @@ func (m *CollectionInfo) GetConsistencyLevel() commonpb.ConsistencyLevel {
return commonpb.ConsistencyLevel_Strong
}
func (m *CollectionInfo) GetPartitions() []*PartitionInfo {
if m != nil {
return m.Partitions
}
return nil
}
type PartitionInfo struct {
PartitionID int64 `protobuf:"varint,1,opt,name=partitionID,proto3" json:"partitionID,omitempty"`
PartitionName string `protobuf:"bytes,2,opt,name=partitionName,proto3" json:"partitionName,omitempty"`
PartitionCreatedTimestamp uint64 `protobuf:"varint,3,opt,name=partition_created_timestamp,json=partitionCreatedTimestamp,proto3" json:"partition_created_timestamp,omitempty"`
CollectionId int64 `protobuf:"varint,4,opt,name=collection_id,json=collectionId,proto3" json:"collection_id,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
@ -333,6 +326,68 @@ func (m *PartitionInfo) GetPartitionCreatedTimestamp() uint64 {
return 0
}
func (m *PartitionInfo) GetCollectionId() int64 {
if m != nil {
return m.CollectionId
}
return 0
}
type AliasInfo struct {
AliasName string `protobuf:"bytes,1,opt,name=alias_name,json=aliasName,proto3" json:"alias_name,omitempty"`
CollectionId int64 `protobuf:"varint,2,opt,name=collection_id,json=collectionId,proto3" json:"collection_id,omitempty"`
CreatedTime uint64 `protobuf:"varint,3,opt,name=created_time,json=createdTime,proto3" json:"created_time,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *AliasInfo) Reset() { *m = AliasInfo{} }
func (m *AliasInfo) String() string { return proto.CompactTextString(m) }
func (*AliasInfo) ProtoMessage() {}
func (*AliasInfo) Descriptor() ([]byte, []int) {
return fileDescriptor_975d306d62b73e88, []int{4}
}
func (m *AliasInfo) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_AliasInfo.Unmarshal(m, b)
}
func (m *AliasInfo) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_AliasInfo.Marshal(b, m, deterministic)
}
func (m *AliasInfo) XXX_Merge(src proto.Message) {
xxx_messageInfo_AliasInfo.Merge(m, src)
}
func (m *AliasInfo) XXX_Size() int {
return xxx_messageInfo_AliasInfo.Size(m)
}
func (m *AliasInfo) XXX_DiscardUnknown() {
xxx_messageInfo_AliasInfo.DiscardUnknown(m)
}
var xxx_messageInfo_AliasInfo proto.InternalMessageInfo
func (m *AliasInfo) GetAliasName() string {
if m != nil {
return m.AliasName
}
return ""
}
func (m *AliasInfo) GetCollectionId() int64 {
if m != nil {
return m.CollectionId
}
return 0
}
func (m *AliasInfo) GetCreatedTime() uint64 {
if m != nil {
return m.CreatedTime
}
return 0
}
type SegmentIndexInfo struct {
CollectionID int64 `protobuf:"varint,1,opt,name=collectionID,proto3" json:"collectionID,omitempty"`
PartitionID int64 `protobuf:"varint,2,opt,name=partitionID,proto3" json:"partitionID,omitempty"`
@ -351,7 +406,7 @@ func (m *SegmentIndexInfo) Reset() { *m = SegmentIndexInfo{} }
func (m *SegmentIndexInfo) String() string { return proto.CompactTextString(m) }
func (*SegmentIndexInfo) ProtoMessage() {}
func (*SegmentIndexInfo) Descriptor() ([]byte, []int) {
return fileDescriptor_975d306d62b73e88, []int{4}
return fileDescriptor_975d306d62b73e88, []int{5}
}
func (m *SegmentIndexInfo) XXX_Unmarshal(b []byte) error {
@ -445,7 +500,7 @@ func (m *CollectionMeta) Reset() { *m = CollectionMeta{} }
func (m *CollectionMeta) String() string { return proto.CompactTextString(m) }
func (*CollectionMeta) ProtoMessage() {}
func (*CollectionMeta) Descriptor() ([]byte, []int) {
return fileDescriptor_975d306d62b73e88, []int{5}
return fileDescriptor_975d306d62b73e88, []int{6}
}
func (m *CollectionMeta) XXX_Unmarshal(b []byte) error {
@ -525,7 +580,7 @@ func (m *CredentialInfo) Reset() { *m = CredentialInfo{} }
func (m *CredentialInfo) String() string { return proto.CompactTextString(m) }
func (*CredentialInfo) ProtoMessage() {}
func (*CredentialInfo) Descriptor() ([]byte, []int) {
return fileDescriptor_975d306d62b73e88, []int{6}
return fileDescriptor_975d306d62b73e88, []int{7}
}
func (m *CredentialInfo) XXX_Unmarshal(b []byte) error {
@ -586,6 +641,7 @@ func init() {
proto.RegisterType((*FieldIndexInfo)(nil), "milvus.proto.etcd.FieldIndexInfo")
proto.RegisterType((*CollectionInfo)(nil), "milvus.proto.etcd.CollectionInfo")
proto.RegisterType((*PartitionInfo)(nil), "milvus.proto.etcd.PartitionInfo")
proto.RegisterType((*AliasInfo)(nil), "milvus.proto.etcd.AliasInfo")
proto.RegisterType((*SegmentIndexInfo)(nil), "milvus.proto.etcd.SegmentIndexInfo")
proto.RegisterType((*CollectionMeta)(nil), "milvus.proto.etcd.CollectionMeta")
proto.RegisterType((*CredentialInfo)(nil), "milvus.proto.etcd.CredentialInfo")
@ -594,58 +650,59 @@ func init() {
func init() { proto.RegisterFile("etcd_meta.proto", fileDescriptor_975d306d62b73e88) }
var fileDescriptor_975d306d62b73e88 = []byte{
// 837 bytes of a gzipped FileDescriptorProto
// 861 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xbc, 0x55, 0xdd, 0x6e, 0xe3, 0x44,
0x14, 0x96, 0xe3, 0xe6, 0xc7, 0x27, 0x69, 0xba, 0x1d, 0x60, 0x35, 0x5b, 0x16, 0xf0, 0x46, 0x2c,
0xf8, 0x66, 0x5b, 0xd1, 0x05, 0xee, 0x40, 0x2b, 0x6a, 0xad, 0x14, 0x01, 0xab, 0x68, 0x5a, 0x71,
0xc1, 0x8d, 0x35, 0xb1, 0x4f, 0x9b, 0x91, 0xec, 0xb1, 0xe5, 0x19, 0x17, 0xf2, 0x06, 0xbc, 0x01,
0x8f, 0xc2, 0x2d, 0x37, 0x3c, 0x0d, 0x2f, 0x81, 0x3c, 0xfe, 0x89, 0x9d, 0xb4, 0x5c, 0xee, 0x5d,
0xce, 0x77, 0xe6, 0x9c, 0xcc, 0x77, 0xce, 0x37, 0x9f, 0xe1, 0x04, 0x75, 0x18, 0x05, 0x09, 0x6a,
0x7e, 0x9e, 0xe5, 0xa9, 0x4e, 0xc9, 0x69, 0x22, 0xe2, 0xfb, 0x42, 0x55, 0xd1, 0x79, 0x99, 0x3d,
0x9b, 0x85, 0x69, 0x92, 0xa4, 0xb2, 0x82, 0xce, 0x66, 0x2a, 0xdc, 0x60, 0x52, 0x1f, 0x5f, 0xfc,
0x63, 0x81, 0xb3, 0x94, 0x11, 0xfe, 0xbe, 0x94, 0xb7, 0x29, 0xf9, 0x04, 0x40, 0x94, 0x41, 0x20,
0x79, 0x82, 0xd4, 0x72, 0x2d, 0xcf, 0x61, 0x8e, 0x41, 0xde, 0xf1, 0x04, 0x09, 0x85, 0xb1, 0x09,
0x96, 0x3e, 0x1d, 0xb8, 0x96, 0x67, 0xb3, 0x26, 0x24, 0x3e, 0xcc, 0xaa, 0xc2, 0x8c, 0xe7, 0x3c,
0x51, 0xd4, 0x76, 0x6d, 0x6f, 0x7a, 0xf9, 0xe2, 0xbc, 0x77, 0x99, 0xfa, 0x1a, 0x3f, 0xe2, 0xf6,
0x17, 0x1e, 0x17, 0xb8, 0xe2, 0x22, 0x67, 0x53, 0x53, 0xb6, 0x32, 0x55, 0x65, 0xff, 0x08, 0x63,
0xd4, 0x18, 0xd1, 0x23, 0xd7, 0xf2, 0x26, 0xac, 0x09, 0xc9, 0x67, 0x30, 0x0d, 0x73, 0xe4, 0x1a,
0x03, 0x2d, 0x12, 0xa4, 0x43, 0xd7, 0xf2, 0x8e, 0x18, 0x54, 0xd0, 0x8d, 0x48, 0x70, 0xe1, 0xc3,
0xfc, 0xad, 0xc0, 0x38, 0xda, 0x71, 0xa1, 0x30, 0xbe, 0x15, 0x31, 0x46, 0x4b, 0xdf, 0x10, 0xb1,
0x59, 0x13, 0x3e, 0x4e, 0x63, 0xf1, 0xf7, 0x10, 0xe6, 0x57, 0x69, 0x1c, 0x63, 0xa8, 0x45, 0x2a,
0x4d, 0x9b, 0x39, 0x0c, 0xda, 0x0e, 0x83, 0xa5, 0x4f, 0xbe, 0x83, 0x51, 0x35, 0x40, 0x53, 0x3b,
0xbd, 0x7c, 0xd9, 0xe7, 0x58, 0x0f, 0x77, 0xd7, 0xe4, 0xda, 0x00, 0xac, 0x2e, 0xda, 0x27, 0x62,
0xef, 0x13, 0x21, 0x0b, 0x98, 0x65, 0x3c, 0xd7, 0xc2, 0x5c, 0xc0, 0x57, 0xf4, 0xc8, 0xb5, 0x3d,
0x9b, 0xf5, 0x30, 0xf2, 0x05, 0xcc, 0xdb, 0xb8, 0x5c, 0x8c, 0xa2, 0x43, 0xd7, 0xf6, 0x1c, 0xb6,
0x87, 0x92, 0xb7, 0x70, 0x7c, 0x5b, 0x0e, 0x25, 0x30, 0xfc, 0x50, 0xd1, 0xd1, 0x43, 0x6b, 0x29,
0x35, 0x72, 0xde, 0x1f, 0x1e, 0x9b, 0xdd, 0xb6, 0x31, 0x2a, 0x72, 0x09, 0x1f, 0xdd, 0x8b, 0x5c,
0x17, 0x3c, 0x0e, 0xc2, 0x0d, 0x97, 0x12, 0x63, 0x23, 0x10, 0x45, 0xc7, 0xe6, 0x6f, 0x3f, 0xa8,
0x93, 0x57, 0x55, 0xae, 0xfa, 0xef, 0xaf, 0xe1, 0x69, 0xb6, 0xd9, 0x2a, 0x11, 0x1e, 0x14, 0x4d,
0x4c, 0xd1, 0x87, 0x4d, 0xb6, 0x57, 0xf5, 0x06, 0x9e, 0xb7, 0x1c, 0x82, 0x6a, 0x2a, 0x91, 0x99,
0x94, 0xd2, 0x3c, 0xc9, 0x14, 0x75, 0x5c, 0xdb, 0x3b, 0x62, 0x67, 0xed, 0x99, 0xab, 0xea, 0xc8,
0x4d, 0x7b, 0xa2, 0x94, 0xb0, 0xda, 0xf0, 0x3c, 0x52, 0x81, 0x2c, 0x12, 0x0a, 0xae, 0xe5, 0x0d,
0x99, 0x53, 0x21, 0xef, 0x8a, 0x84, 0x2c, 0xe1, 0x44, 0x69, 0x9e, 0xeb, 0x20, 0x4b, 0x95, 0xe9,
0xa0, 0xe8, 0xd4, 0x0c, 0xc5, 0x7d, 0x4c, 0xab, 0x3e, 0xd7, 0xdc, 0x48, 0x75, 0x6e, 0x0a, 0x57,
0x4d, 0x1d, 0x61, 0x70, 0x1a, 0xa6, 0x52, 0x09, 0xa5, 0x51, 0x86, 0xdb, 0x20, 0xc6, 0x7b, 0x8c,
0xe9, 0xcc, 0xb5, 0xbc, 0xf9, 0xbe, 0x28, 0xea, 0x66, 0x57, 0xbb, 0xd3, 0x3f, 0x95, 0x87, 0xd9,
0x93, 0x70, 0x0f, 0x21, 0x6f, 0x00, 0x5a, 0x6e, 0x8a, 0x1e, 0x3f, 0x74, 0x33, 0xb3, 0xae, 0x55,
0x2b, 0x87, 0x72, 0x5b, 0x9d, 0x9a, 0xc5, 0x9f, 0x16, 0x1c, 0xf7, 0xb2, 0xc4, 0x85, 0x69, 0x47,
0x3d, 0xb5, 0x94, 0xbb, 0x10, 0xf9, 0x1c, 0x8e, 0x7b, 0xca, 0x31, 0xd2, 0x76, 0x58, 0x1f, 0x24,
0xdf, 0xc3, 0xc7, 0xff, 0xb3, 0x9b, 0x5a, 0xca, 0xcf, 0x1e, 0x5d, 0xcd, 0xe2, 0x8f, 0x01, 0x3c,
0xb9, 0xc6, 0xbb, 0x04, 0xa5, 0xde, 0xbd, 0xd2, 0x05, 0xcc, 0xc2, 0xdd, 0x83, 0x6b, 0x6e, 0xd7,
0xc3, 0xf6, 0x09, 0x0c, 0x0e, 0x09, 0x3c, 0x07, 0x47, 0xd5, 0x9d, 0x7d, 0x73, 0x11, 0x9b, 0xed,
0x80, 0xca, 0x09, 0x4a, 0x39, 0xfb, 0xc6, 0x56, 0x8c, 0x13, 0x98, 0xb0, 0xeb, 0x04, 0xc3, 0xbe,
0xa1, 0x51, 0x18, 0xaf, 0x0b, 0x61, 0x6a, 0x46, 0x55, 0xa6, 0x0e, 0xc9, 0x0b, 0x98, 0xa1, 0xe4,
0xeb, 0x18, 0xab, 0x57, 0x45, 0xc7, 0xc6, 0xa9, 0xa6, 0x15, 0x66, 0x88, 0xed, 0x3f, 0xf2, 0xc9,
0x81, 0x5b, 0xfd, 0x6b, 0x75, 0x7d, 0xe6, 0x67, 0xd4, 0xfc, 0xbd, 0xfb, 0xcc, 0xa7, 0x00, 0xed,
0x84, 0x1a, 0x97, 0xe9, 0x20, 0xe4, 0x65, 0xc7, 0x63, 0x02, 0xcd, 0xef, 0x1a, 0x8f, 0xd9, 0x89,
0xe2, 0x86, 0xdf, 0xa9, 0x03, 0xbb, 0x1a, 0x1d, 0xda, 0xd5, 0xe2, 0xaf, 0x92, 0x6d, 0x8e, 0x11,
0x4a, 0x2d, 0x78, 0x6c, 0xd6, 0x7e, 0x06, 0x93, 0x42, 0x61, 0xde, 0xf9, 0xcc, 0xb4, 0x31, 0x79,
0x05, 0x04, 0x65, 0x98, 0x6f, 0xb3, 0x52, 0x5f, 0x19, 0x57, 0xea, 0xb7, 0x34, 0x8f, 0x6a, 0x49,
0x9e, 0xb6, 0x99, 0x55, 0x9d, 0x20, 0x4f, 0x61, 0xa4, 0x51, 0x72, 0xa9, 0x0d, 0x49, 0x87, 0xd5,
0x11, 0x79, 0x06, 0x13, 0xa1, 0x02, 0x55, 0x64, 0x98, 0x37, 0x5f, 0x13, 0xa1, 0xae, 0xcb, 0x90,
0x7c, 0x09, 0x27, 0x6a, 0xc3, 0x2f, 0xbf, 0xf9, 0x76, 0xd7, 0x7e, 0x68, 0x6a, 0xe7, 0x15, 0xdc,
0xf4, 0xfe, 0xe1, 0xf5, 0xaf, 0x5f, 0xdd, 0x09, 0xbd, 0x29, 0xd6, 0xe5, 0x13, 0xbe, 0xa8, 0x16,
0xf0, 0x4a, 0xa4, 0xf5, 0xaf, 0x0b, 0x21, 0x75, 0x79, 0xe7, 0xf8, 0xc2, 0xec, 0xe4, 0xa2, 0x7c,
0x99, 0xd9, 0x7a, 0x3d, 0x32, 0xd1, 0xeb, 0xff, 0x02, 0x00, 0x00, 0xff, 0xff, 0x26, 0x75, 0x94,
0x72, 0x9b, 0x07, 0x00, 0x00,
0x14, 0x96, 0xe3, 0xfc, 0xf9, 0x24, 0x4d, 0xb7, 0x03, 0xac, 0xbc, 0x65, 0x01, 0x6f, 0x60, 0xc1,
0x37, 0xdb, 0x8a, 0x2e, 0x70, 0x07, 0x02, 0x6a, 0xad, 0x14, 0x01, 0x55, 0x34, 0xad, 0xb8, 0xe0,
0xc6, 0x9a, 0xd8, 0xa7, 0xcd, 0x48, 0xfe, 0x93, 0x67, 0x5c, 0xe8, 0x1b, 0xf0, 0x46, 0xdc, 0x70,
0xcb, 0xd3, 0xf0, 0x0e, 0x08, 0xcd, 0x78, 0xec, 0xd8, 0x09, 0xe5, 0x72, 0xef, 0xf2, 0x7d, 0x33,
0xe7, 0xf8, 0xfc, 0x7c, 0xf3, 0x05, 0x8e, 0x51, 0x46, 0x71, 0x98, 0xa2, 0x64, 0x67, 0x45, 0x99,
0xcb, 0x9c, 0x9c, 0xa4, 0x3c, 0xb9, 0xaf, 0x44, 0x8d, 0xce, 0xd4, 0xe9, 0xe9, 0x3c, 0xca, 0xd3,
0x34, 0xcf, 0x6a, 0xea, 0x74, 0x2e, 0xa2, 0x2d, 0xa6, 0xe6, 0xfa, 0xf2, 0x2f, 0x0b, 0x9c, 0x55,
0x16, 0xe3, 0x6f, 0xab, 0xec, 0x36, 0x27, 0x1f, 0x00, 0x70, 0x05, 0xc2, 0x8c, 0xa5, 0xe8, 0x5a,
0x9e, 0xe5, 0x3b, 0xd4, 0xd1, 0xcc, 0x15, 0x4b, 0x91, 0xb8, 0x30, 0xd1, 0x60, 0x15, 0xb8, 0x03,
0xcf, 0xf2, 0x6d, 0xda, 0x40, 0x12, 0xc0, 0xbc, 0x0e, 0x2c, 0x58, 0xc9, 0x52, 0xe1, 0xda, 0x9e,
0xed, 0xcf, 0x2e, 0x5e, 0x9c, 0xf5, 0x8a, 0x31, 0x65, 0xfc, 0x80, 0x0f, 0x3f, 0xb3, 0xa4, 0xc2,
0x35, 0xe3, 0x25, 0x9d, 0xe9, 0xb0, 0xb5, 0x8e, 0x52, 0xf9, 0x63, 0x4c, 0x50, 0x62, 0xec, 0x0e,
0x3d, 0xcb, 0x9f, 0xd2, 0x06, 0x92, 0x8f, 0x60, 0x16, 0x95, 0xc8, 0x24, 0x86, 0x92, 0xa7, 0xe8,
0x8e, 0x3c, 0xcb, 0x1f, 0x52, 0xa8, 0xa9, 0x1b, 0x9e, 0xe2, 0x32, 0x80, 0xc5, 0x1b, 0x8e, 0x49,
0xbc, 0xeb, 0xc5, 0x85, 0xc9, 0x2d, 0x4f, 0x30, 0x5e, 0x05, 0xba, 0x11, 0x9b, 0x36, 0xf0, 0xf1,
0x36, 0x96, 0xff, 0x0c, 0x61, 0x71, 0x99, 0x27, 0x09, 0x46, 0x92, 0xe7, 0x99, 0x4e, 0xb3, 0x80,
0x41, 0x9b, 0x61, 0xb0, 0x0a, 0xc8, 0xd7, 0x30, 0xae, 0x07, 0xa8, 0x63, 0x67, 0x17, 0x2f, 0xfb,
0x3d, 0x9a, 0xe1, 0xee, 0x92, 0x5c, 0x6b, 0x82, 0x9a, 0xa0, 0xfd, 0x46, 0xec, 0xfd, 0x46, 0xc8,
0x12, 0xe6, 0x05, 0x2b, 0x25, 0xd7, 0x05, 0x04, 0xc2, 0x1d, 0x7a, 0xb6, 0x6f, 0xd3, 0x1e, 0x47,
0x3e, 0x85, 0x45, 0x8b, 0xd5, 0x62, 0x84, 0x3b, 0xf2, 0x6c, 0xdf, 0xa1, 0x7b, 0x2c, 0x79, 0x03,
0x47, 0xb7, 0x6a, 0x28, 0xa1, 0xee, 0x0f, 0x85, 0x3b, 0xfe, 0xaf, 0xb5, 0x28, 0x8d, 0x9c, 0xf5,
0x87, 0x47, 0xe7, 0xb7, 0x2d, 0x46, 0x41, 0x2e, 0xe0, 0xbd, 0x7b, 0x5e, 0xca, 0x8a, 0x25, 0x61,
0xb4, 0x65, 0x59, 0x86, 0x89, 0x16, 0x88, 0x70, 0x27, 0xfa, 0xb3, 0xef, 0x98, 0xc3, 0xcb, 0xfa,
0xac, 0xfe, 0xf6, 0x17, 0xf0, 0xb4, 0xd8, 0x3e, 0x08, 0x1e, 0x1d, 0x04, 0x4d, 0x75, 0xd0, 0xbb,
0xcd, 0x69, 0x2f, 0xea, 0x5b, 0x78, 0xde, 0xf6, 0x10, 0xd6, 0x53, 0x89, 0xf5, 0xa4, 0x84, 0x64,
0x69, 0x21, 0x5c, 0xc7, 0xb3, 0xfd, 0x21, 0x3d, 0x6d, 0xef, 0x5c, 0xd6, 0x57, 0x6e, 0xda, 0x1b,
0x4a, 0xc2, 0x62, 0xcb, 0xca, 0x58, 0x84, 0x59, 0x95, 0xba, 0xe0, 0x59, 0xfe, 0x88, 0x3a, 0x35,
0x73, 0x55, 0xa5, 0x64, 0x05, 0xc7, 0x42, 0xb2, 0x52, 0x86, 0x45, 0x2e, 0x74, 0x06, 0xe1, 0xce,
0xf4, 0x50, 0xbc, 0xc7, 0xb4, 0x1a, 0x30, 0xc9, 0xb4, 0x54, 0x17, 0x3a, 0x70, 0xdd, 0xc4, 0x11,
0x0a, 0x27, 0x51, 0x9e, 0x09, 0x2e, 0x24, 0x66, 0xd1, 0x43, 0x98, 0xe0, 0x3d, 0x26, 0xee, 0xdc,
0xb3, 0xfc, 0xc5, 0xbe, 0x28, 0x4c, 0xb2, 0xcb, 0xdd, 0xed, 0x1f, 0xd5, 0x65, 0xfa, 0x24, 0xda,
0x63, 0x96, 0x7f, 0x5a, 0x70, 0xb4, 0x6e, 0x57, 0xad, 0xf4, 0xe7, 0xc1, 0xac, 0xb3, 0x7b, 0x23,
0xc4, 0x2e, 0x45, 0x3e, 0x81, 0xa3, 0xde, 0xde, 0xb5, 0x30, 0x1d, 0xda, 0x27, 0xc9, 0x37, 0xf0,
0xfe, 0xff, 0x4c, 0xd6, 0x08, 0xf1, 0xd9, 0xa3, 0x83, 0x25, 0x1f, 0xc3, 0x51, 0xd4, 0x8a, 0x3a,
0xe4, 0xf5, 0x0b, 0xb5, 0xe9, 0x7c, 0x47, 0xae, 0xe2, 0x65, 0x09, 0xce, 0x77, 0x09, 0x67, 0xa2,
0x31, 0x13, 0xa6, 0x40, 0xcf, 0x4c, 0x34, 0xa3, 0x0b, 0x3a, 0x48, 0x38, 0x38, 0x4c, 0x48, 0x5e,
0xc0, 0xbc, 0x5b, 0xab, 0x29, 0xd3, 0x3c, 0x21, 0x5d, 0xdd, 0xf2, 0xf7, 0x01, 0x3c, 0xb9, 0xc6,
0xbb, 0x14, 0x33, 0xb9, 0x7b, 0xfc, 0x4b, 0xe8, 0xe6, 0x69, 0xc6, 0xd6, 0xe3, 0xf6, 0x27, 0x3b,
0x38, 0x9c, 0xec, 0x73, 0x70, 0x84, 0xc9, 0x1c, 0xe8, 0x4f, 0xdb, 0x74, 0x47, 0xd4, 0x06, 0xa3,
0x5e, 0x49, 0x60, 0x66, 0xd1, 0xc0, 0xae, 0xc1, 0x8c, 0xfa, 0x3e, 0xe9, 0xc2, 0x64, 0x53, 0x71,
0x1d, 0x33, 0xae, 0x4f, 0x0c, 0x54, 0x9d, 0x62, 0xc6, 0x36, 0x09, 0xd6, 0x8f, 0xd5, 0x9d, 0x68,
0x03, 0x9c, 0xd5, 0x9c, 0x6e, 0x6c, 0xdf, 0x3b, 0xa6, 0x07, 0x26, 0xf8, 0xb7, 0xd5, 0xb5, 0xaf,
0x9f, 0x50, 0xb2, 0xb7, 0x6e, 0x5f, 0x1f, 0x02, 0xb4, 0x13, 0x6a, 0xcc, 0xab, 0xc3, 0x90, 0x97,
0x1d, 0xeb, 0x0a, 0x25, 0xbb, 0x6b, 0xac, 0x6b, 0xa7, 0xd6, 0x1b, 0x76, 0x27, 0x0e, 0x5c, 0x70,
0x7c, 0xe8, 0x82, 0xcb, 0x3f, 0x54, 0xb7, 0x25, 0xc6, 0x98, 0x49, 0xce, 0x12, 0xbd, 0xf6, 0x53,
0x98, 0x56, 0x02, 0xcb, 0x8e, 0xe0, 0x5a, 0x4c, 0x5e, 0x01, 0xc1, 0x2c, 0x2a, 0x1f, 0x0a, 0x25,
0xa6, 0x82, 0x09, 0xf1, 0x6b, 0x5e, 0xc6, 0xe6, 0xad, 0x9c, 0xb4, 0x27, 0x6b, 0x73, 0x40, 0x9e,
0xc2, 0x58, 0x62, 0xc6, 0x32, 0xa9, 0x9b, 0x74, 0xa8, 0x41, 0xe4, 0x19, 0x4c, 0xb9, 0x08, 0x45,
0x55, 0x60, 0xd9, 0xfc, 0x49, 0x71, 0x71, 0xad, 0x20, 0xf9, 0x0c, 0x8e, 0xc5, 0x96, 0x5d, 0x7c,
0xf9, 0xd5, 0x2e, 0xfd, 0x48, 0xc7, 0x2e, 0x6a, 0xba, 0xc9, 0xfd, 0xfd, 0xeb, 0x5f, 0x3e, 0xbf,
0xe3, 0x72, 0x5b, 0x6d, 0x94, 0x33, 0x9c, 0xd7, 0x0b, 0x78, 0xc5, 0x73, 0xf3, 0xeb, 0x9c, 0x67,
0x52, 0xd5, 0x9c, 0x9c, 0xeb, 0x9d, 0x9c, 0x2b, 0x7f, 0x2e, 0x36, 0x9b, 0xb1, 0x46, 0xaf, 0xff,
0x0d, 0x00, 0x00, 0xff, 0xff, 0xc5, 0x28, 0x7e, 0xad, 0xf2, 0x07, 0x00, 0x00,
}

View File

@ -111,7 +111,7 @@ func (mt *MetaTable) reloadFromCatalog() error {
mt.segID2IndexID = make(map[typeutil.UniqueID]typeutil.UniqueID)
mt.indexID2Meta = make(map[typeutil.UniqueID]*model.Index)
collAliases, err := mt.catalog.ListAliases(mt.ctx)
collAliases, err := mt.catalog.ListAliases(mt.ctx, 0)
if err != nil {
return err
}
@ -389,14 +389,15 @@ func (mt *MetaTable) AddPartition(collID typeutil.UniqueID, partitionName string
// no necessary to check created timestamp
}
coll.Partitions = append(coll.Partitions,
&model.Partition{
PartitionID: partitionID,
PartitionName: partitionName,
PartitionCreatedTimestamp: ts,
})
partition := &model.Partition{
PartitionID: partitionID,
PartitionName: partitionName,
PartitionCreatedTimestamp: ts,
CollectionID: collID,
}
coll.Partitions = append(coll.Partitions, partition)
if err := mt.catalog.CreatePartition(mt.ctx, &coll, ts); err != nil {
if err := mt.catalog.CreatePartition(mt.ctx, partition, ts); err != nil {
return err
}
@ -507,7 +508,7 @@ func (mt *MetaTable) DeletePartition(collID typeutil.UniqueID, partitionName str
}
col.Partitions = parts
if err := mt.catalog.DropPartition(mt.ctx, &col, partID, ts); err != nil {
if err := mt.catalog.DropPartition(mt.ctx, col.CollectionID, partID, ts); err != nil {
return 0, err
}
@ -1230,11 +1231,12 @@ func (mt *MetaTable) AddAlias(collectionAlias string, collectionName string, ts
return fmt.Errorf("aliased collection name does not exist, name = %s", collectionName)
}
coll := &model.Collection{
alias := &model.Alias{
CollectionID: id,
Aliases: []string{collectionAlias},
Name: collectionAlias,
CreatedTime: ts,
}
if err := mt.catalog.CreateAlias(mt.ctx, coll, ts); err != nil {
if err := mt.catalog.CreateAlias(mt.ctx, alias, ts); err != nil {
return err
}
@ -1246,12 +1248,13 @@ func (mt *MetaTable) AddAlias(collectionAlias string, collectionName string, ts
func (mt *MetaTable) DropAlias(collectionAlias string, ts typeutil.Timestamp) error {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
collectionID, ok := mt.collAlias2ID[collectionAlias]
// TODO: drop alias should be idempotent.
_, ok := mt.collAlias2ID[collectionAlias]
if !ok {
return fmt.Errorf("alias does not exist, alias = %s", collectionAlias)
}
if err := mt.catalog.DropAlias(mt.ctx, collectionID, collectionAlias, ts); err != nil {
if err := mt.catalog.DropAlias(mt.ctx, collectionAlias, ts); err != nil {
return err
}
delete(mt.collAlias2ID, collectionAlias)
@ -1271,12 +1274,12 @@ func (mt *MetaTable) AlterAlias(collectionAlias string, collectionName string, t
return fmt.Errorf("aliased collection name does not exist, name = %s", collectionName)
}
coll := &model.Collection{
alias := &model.Alias{
CollectionID: id,
Aliases: []string{collectionAlias},
Name: collectionAlias,
CreatedTime: ts,
}
if err := mt.catalog.AlterAlias(mt.ctx, coll, ts); err != nil {
if err := mt.catalog.AlterAlias(mt.ctx, alias, ts); err != nil {
return err
}
mt.collAlias2ID[collectionAlias] = id

View File

@ -632,7 +632,12 @@ func TestMetaTable(t *testing.T) {
mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp) error {
return fmt.Errorf("multi save error")
}
tmpSaveFunc := mockKV.save
mockKV.save = func(key, value string, ts typeutil.Timestamp) error {
return errors.New("mock")
}
assert.Error(t, mt.AddPartition(coll.CollectionID, "no-part", 22, ts, ""))
mockKV.save = tmpSaveFunc
//err = mt.AddPartition(coll.CollectionID, "no-part", 22, ts, nil)
//assert.NotNil(t, err)
//assert.EqualError(t, err, "multi save error")
@ -2240,9 +2245,9 @@ func (mc *MockedCatalog) ListIndexes(ctx context.Context) ([]*model.Index, error
return args.Get(0).([]*model.Index), nil
}
func (mc *MockedCatalog) ListAliases(ctx context.Context) ([]*model.Collection, error) {
func (mc *MockedCatalog) ListAliases(ctx context.Context, ts typeutil.Timestamp) ([]*model.Alias, error) {
args := mc.Called()
return args.Get(0).([]*model.Collection), nil
return args.Get(0).([]*model.Alias), nil
}
func (mc *MockedCatalog) AlterIndex(ctx context.Context, oldIndex *model.Index, newIndex *model.Index, alterType metastore.AlterType) error {
@ -2356,7 +2361,16 @@ func TestMetaTable_ReloadFromKV(t *testing.T) {
alias2 := *collInfo
alias2.Name = collInfo.Aliases[1]
mc.On("ListAliases").Return([]*model.Collection{&alias1, &alias2}, nil)
mc.On("ListAliases").Return([]*model.Alias{
{
CollectionID: collInfo.CollectionID,
Name: collInfo.Aliases[0],
},
{
CollectionID: collInfo.CollectionID,
Name: collInfo.Aliases[1],
},
}, nil)
mt := &MetaTable{}
mt.catalog = mc