From e7b3bacbecd241ecb481c3ee44bf25f0f8150caf Mon Sep 17 00:00:00 2001 From: Jiquan Long Date: Wed, 10 Aug 2022 10:22:38 +0800 Subject: [PATCH] Refine catalog of partition & alias. (#18546) Signed-off-by: longjiquan --- internal/kv/mock_snapshot_kv.go | 53 ++ internal/kv/mock_snapshot_kv_test.go | 89 +++ internal/metastore/catalog.go | 12 +- internal/metastore/kv/kv_catalog.go | 419 +++++++++--- internal/metastore/kv/kv_catalog_test.go | 692 ++++++++++++++++++++ internal/metastore/kv/rootcorod_constant.go | 4 + internal/metastore/model/alias.go | 25 + internal/metastore/model/collection.go | 46 +- internal/metastore/model/collection_test.go | 36 - internal/metastore/model/partition.go | 21 + internal/proto/etcd_meta.proto | 8 +- internal/proto/etcdpb/etcd_meta.pb.go | 185 ++++-- internal/rootcoord/meta_table.go | 39 +- internal/rootcoord/meta_table_test.go | 20 +- 14 files changed, 1378 insertions(+), 271 deletions(-) create mode 100644 internal/kv/mock_snapshot_kv.go create mode 100644 internal/kv/mock_snapshot_kv_test.go create mode 100644 internal/metastore/model/alias.go diff --git a/internal/kv/mock_snapshot_kv.go b/internal/kv/mock_snapshot_kv.go new file mode 100644 index 0000000000..55cc2bb63e --- /dev/null +++ b/internal/kv/mock_snapshot_kv.go @@ -0,0 +1,53 @@ +package kv + +import ( + "github.com/milvus-io/milvus/internal/util/typeutil" +) + +type mockSnapshotKV struct { + SnapShotKV + SaveFunc func(key string, value string, ts typeutil.Timestamp) error + LoadFunc func(key string, ts typeutil.Timestamp) (string, error) + MultiSaveFunc func(kvs map[string]string, ts typeutil.Timestamp) error + LoadWithPrefixFunc func(key string, ts typeutil.Timestamp) ([]string, []string, error) + MultiSaveAndRemoveWithPrefixFunc func(saves map[string]string, removals []string, ts typeutil.Timestamp) error +} + +func NewMockSnapshotKV() *mockSnapshotKV { + return &mockSnapshotKV{} +} + +func (m mockSnapshotKV) Save(key string, value string, ts typeutil.Timestamp) error { + if m.SaveFunc != nil { + return m.SaveFunc(key, value, ts) + } + return nil +} + +func (m mockSnapshotKV) Load(key string, ts typeutil.Timestamp) (string, error) { + if m.LoadFunc != nil { + return m.LoadFunc(key, ts) + } + return "", nil +} + +func (m mockSnapshotKV) MultiSave(kvs map[string]string, ts typeutil.Timestamp) error { + if m.MultiSaveFunc != nil { + return m.MultiSaveFunc(kvs, ts) + } + return nil +} + +func (m mockSnapshotKV) LoadWithPrefix(key string, ts typeutil.Timestamp) ([]string, []string, error) { + if m.LoadWithPrefixFunc != nil { + return m.LoadWithPrefixFunc(key, ts) + } + return nil, nil, nil +} + +func (m mockSnapshotKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, ts typeutil.Timestamp) error { + if m.MultiSaveAndRemoveWithPrefixFunc != nil { + return m.MultiSaveAndRemoveWithPrefixFunc(saves, removals, ts) + } + return nil +} diff --git a/internal/kv/mock_snapshot_kv_test.go b/internal/kv/mock_snapshot_kv_test.go new file mode 100644 index 0000000000..319ce6b13e --- /dev/null +++ b/internal/kv/mock_snapshot_kv_test.go @@ -0,0 +1,89 @@ +package kv + +import ( + "testing" + + "github.com/milvus-io/milvus/internal/util/typeutil" + + "github.com/stretchr/testify/assert" +) + +func Test_mockSnapshotKV_Save(t *testing.T) { + t.Run("func not set", func(t *testing.T) { + snapshot := NewMockSnapshotKV() + err := snapshot.Save("k", "v", 0) + assert.NoError(t, err) + }) + t.Run("func set", func(t *testing.T) { + snapshot := NewMockSnapshotKV() + snapshot.SaveFunc = func(key string, value string, ts typeutil.Timestamp) error { + return nil + } + err := snapshot.Save("k", "v", 0) + assert.NoError(t, err) + }) +} + +func Test_mockSnapshotKV_Load(t *testing.T) { + t.Run("func not set", func(t *testing.T) { + snapshot := NewMockSnapshotKV() + _, err := snapshot.Load("k", 0) + assert.NoError(t, err) + }) + t.Run("func set", func(t *testing.T) { + snapshot := NewMockSnapshotKV() + snapshot.LoadFunc = func(key string, ts typeutil.Timestamp) (string, error) { + return "", nil + } + _, err := snapshot.Load("k", 0) + assert.NoError(t, err) + }) +} + +func Test_mockSnapshotKV_MultiSave(t *testing.T) { + t.Run("func not set", func(t *testing.T) { + snapshot := NewMockSnapshotKV() + err := snapshot.MultiSave(nil, 0) + assert.NoError(t, err) + }) + t.Run("func set", func(t *testing.T) { + snapshot := NewMockSnapshotKV() + snapshot.MultiSaveFunc = func(kvs map[string]string, ts typeutil.Timestamp) error { + return nil + } + err := snapshot.MultiSave(nil, 0) + assert.NoError(t, err) + }) +} + +func Test_mockSnapshotKV_LoadWithPrefix(t *testing.T) { + t.Run("func not set", func(t *testing.T) { + snapshot := NewMockSnapshotKV() + _, _, err := snapshot.LoadWithPrefix("prefix", 0) + assert.NoError(t, err) + }) + t.Run("func set", func(t *testing.T) { + snapshot := NewMockSnapshotKV() + snapshot.LoadWithPrefixFunc = func(key string, ts typeutil.Timestamp) ([]string, []string, error) { + return nil, nil, nil + } + _, _, err := snapshot.LoadWithPrefix("prefix", 0) + assert.NoError(t, err) + }) +} + +func Test_mockSnapshotKV_MultiSaveAndRemoveWithPrefix(t *testing.T) { + t.Run("func not set", func(t *testing.T) { + snapshot := NewMockSnapshotKV() + err := snapshot.MultiSaveAndRemoveWithPrefix(nil, nil, 0) + assert.NoError(t, err) + }) + t.Run("func set", func(t *testing.T) { + snapshot := NewMockSnapshotKV() + snapshot.MultiSaveAndRemoveWithPrefixFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error { + return nil + } + err := snapshot.MultiSaveAndRemoveWithPrefix(nil, nil, 0) + assert.NoError(t, err) + }) +} diff --git a/internal/metastore/catalog.go b/internal/metastore/catalog.go index 8c97f04c31..bf5fd83612 100644 --- a/internal/metastore/catalog.go +++ b/internal/metastore/catalog.go @@ -17,8 +17,8 @@ type Catalog interface { CollectionExists(ctx context.Context, collectionID typeutil.UniqueID, ts typeutil.Timestamp) bool DropCollection(ctx context.Context, collectionInfo *model.Collection, ts typeutil.Timestamp) error - CreatePartition(ctx context.Context, coll *model.Collection, ts typeutil.Timestamp) error - DropPartition(ctx context.Context, collectionInfo *model.Collection, partitionID typeutil.UniqueID, ts typeutil.Timestamp) error + CreatePartition(ctx context.Context, partition *model.Partition, ts typeutil.Timestamp) error + DropPartition(ctx context.Context, collectionID typeutil.UniqueID, partitionID typeutil.UniqueID, ts typeutil.Timestamp) error CreateIndex(ctx context.Context, col *model.Collection, index *model.Index) error // AlterIndex newIndex only contains updated parts @@ -26,10 +26,10 @@ type Catalog interface { DropIndex(ctx context.Context, collectionInfo *model.Collection, dropIdxID typeutil.UniqueID, ts typeutil.Timestamp) error ListIndexes(ctx context.Context) ([]*model.Index, error) - CreateAlias(ctx context.Context, collection *model.Collection, ts typeutil.Timestamp) error - DropAlias(ctx context.Context, collectionID typeutil.UniqueID, alias string, ts typeutil.Timestamp) error - AlterAlias(ctx context.Context, collection *model.Collection, ts typeutil.Timestamp) error - ListAliases(ctx context.Context) ([]*model.Collection, error) + CreateAlias(ctx context.Context, alias *model.Alias, ts typeutil.Timestamp) error + DropAlias(ctx context.Context, alias string, ts typeutil.Timestamp) error + AlterAlias(ctx context.Context, alias *model.Alias, ts typeutil.Timestamp) error + ListAliases(ctx context.Context, ts typeutil.Timestamp) ([]*model.Alias, error) GetCredential(ctx context.Context, username string) (*model.Credential, error) CreateCredential(ctx context.Context, credential *model.Credential) error diff --git a/internal/metastore/kv/kv_catalog.go b/internal/metastore/kv/kv_catalog.go index a6609a9ae8..66d5364762 100644 --- a/internal/metastore/kv/kv_catalog.go +++ b/internal/metastore/kv/kv_catalog.go @@ -28,13 +28,84 @@ import ( "go.uber.org/zap" ) +// prefix/collection/collection_id -> CollectionInfo +// prefix/partitions/collection_id/partition_id -> PartitionInfo +// prefix/aliases/alias_name -> AliasInfo +// prefix/fields/collection_id/field_id -> FieldSchema type Catalog struct { Txn kv.TxnKV Snapshot kv.SnapShotKV } +func buildCollectionKey(collectionID typeutil.UniqueID) string { + return fmt.Sprintf("%s/%d", CollectionMetaPrefix, collectionID) +} + +func buildPartitionPrefix(collectionID typeutil.UniqueID) string { + return fmt.Sprintf("%s/%d", PartitionMetaPrefix, collectionID) +} + +func buildPartitionKey(collectionID, partitionID typeutil.UniqueID) string { + return fmt.Sprintf("%s/%d", buildPartitionPrefix(collectionID), partitionID) +} + +func buildFieldPrefix(collectionID typeutil.UniqueID) string { + return fmt.Sprintf("%s/%d", FieldMetaPrefix, collectionID) +} + +func buildFieldKey(collectionID typeutil.UniqueID, fieldID int64) string { + return fmt.Sprintf("%s/%d", buildFieldPrefix(collectionID), fieldID) +} + +func buildAliasKey(aliasName string) string { + return fmt.Sprintf("%s/%s", AliasMetaPrefix, aliasName) +} + +func buildKvs(keys, values []string) (map[string]string, error) { + if len(keys) != len(values) { + return nil, fmt.Errorf("length of keys (%d) and values (%d) are not equal", len(keys), len(values)) + } + ret := make(map[string]string, len(keys)) + for i, k := range keys { + _, ok := ret[k] + if ok { + return nil, fmt.Errorf("duplicated key was found: %s", k) + } + ret[k] = values[i] + } + return ret, nil +} + +// TODO: atomicity should be promised outside. +func batchSave(snapshot kv.SnapShotKV, maxTxnNum int, kvs map[string]string, ts typeutil.Timestamp) error { + keys := make([]string, 0, len(kvs)) + values := make([]string, 0, len(kvs)) + for k, v := range kvs { + keys = append(keys, k) + values = append(values, v) + } + min := func(a, b int) int { + if a < b { + return a + } + return b + } + for i := 0; i < len(kvs); i = i + maxTxnNum { + end := min(i+maxTxnNum, len(keys)) + batch, err := buildKvs(keys[i:end], values[i:end]) + if err != nil { + return err + } + // TODO: atomicity is not promised. Garbage will be generated. + if err := snapshot.MultiSave(batch, ts); err != nil { + return err + } + } + return nil +} + func (kc *Catalog) CreateCollection(ctx context.Context, coll *model.Collection, ts typeutil.Timestamp) error { - k1 := fmt.Sprintf("%s/%d", CollectionMetaPrefix, coll.CollectionID) + k1 := buildCollectionKey(coll.CollectionID) collInfo := model.MarshalCollectionModel(coll) v1, err := proto.Marshal(collInfo) if err != nil { @@ -42,46 +113,100 @@ func (kc *Catalog) CreateCollection(ctx context.Context, coll *model.Collection, return err } - // save ddOpStr into etcd kvs := map[string]string{k1: string(v1)} - for k, v := range coll.Extra { - kvs[k] = v + + // save partition info to newly path. + for _, partition := range coll.Partitions { + k := buildPartitionKey(coll.CollectionID, partition.PartitionID) + partitionInfo := model.MarshalPartitionModel(partition) + v, err := proto.Marshal(partitionInfo) + if err != nil { + return err + } + kvs[k] = string(v) } - err = kc.Snapshot.MultiSave(kvs, ts) - if err != nil { - log.Error("create collection persist meta fail", zap.String("key", k1), zap.Error(err)) - return err + // no default aliases will be created. + + // save fields info to newly path. + for _, field := range coll.Fields { + k := buildFieldKey(coll.CollectionID, field.FieldID) + fieldInfo := model.MarshalFieldModel(field) + v, err := proto.Marshal(fieldInfo) + if err != nil { + return err + } + kvs[k] = string(v) } - return nil + // TODO: atomicity should be promised outside. + maxTxnNum := 64 + return batchSave(kc.Snapshot, maxTxnNum, kvs, ts) } -func (kc *Catalog) CreatePartition(ctx context.Context, coll *model.Collection, ts typeutil.Timestamp) error { - k1 := fmt.Sprintf("%s/%d", CollectionMetaPrefix, coll.CollectionID) - collInfo := model.MarshalCollectionModel(coll) - v1, err := proto.Marshal(collInfo) +func (kc *Catalog) loadCollection(ctx context.Context, collectionID typeutil.UniqueID, ts typeutil.Timestamp) (*pb.CollectionInfo, error) { + collKey := buildCollectionKey(collectionID) + collVal, err := kc.Snapshot.Load(collKey, ts) + if err != nil { + log.Error("get collection meta fail", zap.String("key", collKey), zap.Error(err)) + return nil, err + } + + collMeta := &pb.CollectionInfo{} + err = proto.Unmarshal([]byte(collVal), collMeta) + return collMeta, err +} + +func partitionVersionAfter210(collMeta *pb.CollectionInfo) bool { + return len(collMeta.GetPartitionIDs()) <= 0 && + len(collMeta.GetPartitionNames()) <= 0 && + len(collMeta.GetPartitionCreatedTimestamps()) <= 0 +} + +func partitionExistByID(collMeta *pb.CollectionInfo, partitionID typeutil.UniqueID) bool { + return funcutil.SliceContain(collMeta.GetPartitionIDs(), partitionID) +} + +func partitionExistByName(collMeta *pb.CollectionInfo, partitionName string) bool { + return funcutil.SliceContain(collMeta.GetPartitionNames(), partitionName) +} + +func (kc *Catalog) CreatePartition(ctx context.Context, partition *model.Partition, ts typeutil.Timestamp) error { + collMeta, err := kc.loadCollection(ctx, partition.CollectionID, ts) if err != nil { - log.Error("create partition marshal fail", zap.String("key", k1), zap.Error(err)) return err } - kvs := map[string]string{k1: string(v1)} - err = kc.Snapshot.MultiSave(kvs, ts) - if err != nil { - log.Error("create partition persist meta fail", zap.String("key", k1), zap.Error(err)) - return err + if partitionVersionAfter210(collMeta) { + // save to newly path. + k := buildPartitionKey(partition.CollectionID, partition.PartitionID) + partitionInfo := model.MarshalPartitionModel(partition) + v, err := proto.Marshal(partitionInfo) + if err != nil { + return err + } + return kc.Snapshot.Save(k, string(v), ts) } - // save ddOpStr into etcd - err = kc.Txn.MultiSave(coll.Extra) - if err != nil { - // will not panic, missing create msg - log.Warn("create partition persist ddop meta fail", zap.Int64("collectionID", coll.CollectionID), zap.Error(err)) - return err + if partitionExistByID(collMeta, partition.PartitionID) { + return fmt.Errorf("partition already exist: %d", partition.PartitionID) } - return nil + if partitionExistByName(collMeta, partition.PartitionName) { + return fmt.Errorf("partition already exist: %s", partition.PartitionName) + } + + // keep consistent with older version, otherwise it's hard to judge where to find partitions. + collMeta.PartitionIDs = append(collMeta.PartitionIDs, partition.PartitionID) + collMeta.PartitionNames = append(collMeta.PartitionNames, partition.PartitionName) + collMeta.PartitionCreatedTimestamps = append(collMeta.PartitionCreatedTimestamps, partition.PartitionCreatedTimestamp) + + k := buildCollectionKey(partition.CollectionID) + v, err := proto.Marshal(collMeta) + if err != nil { + return err + } + return kc.Snapshot.Save(k, string(v), ts) } func (kc *Catalog) CreateIndex(ctx context.Context, col *model.Collection, index *model.Index) error { @@ -192,21 +317,16 @@ func (kc *Catalog) AlterIndex(ctx context.Context, oldIndex *model.Index, newInd } } -func (kc *Catalog) CreateAlias(ctx context.Context, collection *model.Collection, ts typeutil.Timestamp) error { - k := fmt.Sprintf("%s/%s", CollectionAliasMetaPrefix, collection.Aliases[0]) - v, err := proto.Marshal(&pb.CollectionInfo{ID: collection.CollectionID, Schema: &schemapb.CollectionSchema{Name: collection.Aliases[0]}}) +func (kc *Catalog) CreateAlias(ctx context.Context, alias *model.Alias, ts typeutil.Timestamp) error { + oldKBefore210 := fmt.Sprintf("%s/%s", CollectionAliasMetaPrefix, alias.Name) + k := buildAliasKey(alias.Name) + aliasInfo := model.MarshalAliasModel(alias) + v, err := proto.Marshal(aliasInfo) if err != nil { - log.Error("create alias marshal fail", zap.String("key", k), zap.Error(err)) return err } - - err = kc.Snapshot.Save(k, string(v), ts) - if err != nil { - log.Error("create alias persist meta fail", zap.String("key", k), zap.Error(err)) - return err - } - - return nil + kvs := map[string]string{k: string(v)} + return kc.Snapshot.MultiSaveAndRemoveWithPrefix(kvs, []string{oldKBefore210}, ts) } func (kc *Catalog) CreateCredential(ctx context.Context, credential *model.Credential) error { @@ -226,22 +346,73 @@ func (kc *Catalog) CreateCredential(ctx context.Context, credential *model.Crede return nil } -func (kc *Catalog) GetCollectionByID(ctx context.Context, collectionID typeutil.UniqueID, ts typeutil.Timestamp) (*model.Collection, error) { - collKey := fmt.Sprintf("%s/%d", CollectionMetaPrefix, collectionID) - collVal, err := kc.Snapshot.Load(collKey, ts) +func (kc *Catalog) listPartitionsAfter210(ctx context.Context, collectionID typeutil.UniqueID, ts typeutil.Timestamp) ([]*model.Partition, error) { + prefix := buildPartitionPrefix(collectionID) + _, values, err := kc.Snapshot.LoadWithPrefix(prefix, ts) if err != nil { - log.Error("get collection meta fail", zap.String("key", collKey), zap.Error(err)) return nil, err } + partitions := make([]*model.Partition, 0, len(values)) + for _, v := range values { + partitionMeta := &pb.PartitionInfo{} + err := proto.Unmarshal([]byte(v), partitionMeta) + if err != nil { + return nil, err + } + partitions = append(partitions, model.UnmarshalPartitionModel(partitionMeta)) + } + return partitions, nil +} - collMeta := &pb.CollectionInfo{} - err = proto.Unmarshal([]byte(collVal), collMeta) +func fieldVersionAfter210(collMeta *pb.CollectionInfo) bool { + return len(collMeta.GetSchema().GetFields()) <= 0 +} + +func (kc *Catalog) listFieldsAfter210(ctx context.Context, collectionID typeutil.UniqueID, ts typeutil.Timestamp) ([]*model.Field, error) { + prefix := buildFieldPrefix(collectionID) + _, values, err := kc.Snapshot.LoadWithPrefix(prefix, ts) + if err != nil { + return nil, err + } + fields := make([]*model.Field, 0, len(values)) + for _, v := range values { + partitionMeta := &schemapb.FieldSchema{} + err := proto.Unmarshal([]byte(v), partitionMeta) + if err != nil { + return nil, err + } + fields = append(fields, model.UnmarshalFieldModel(partitionMeta)) + } + return fields, nil +} + +func (kc *Catalog) GetCollectionByID(ctx context.Context, collectionID typeutil.UniqueID, ts typeutil.Timestamp) (*model.Collection, error) { + collKey := buildCollectionKey(collectionID) + collMeta, err := kc.loadCollection(ctx, collectionID, ts) if err != nil { log.Error("collection meta marshal fail", zap.String("key", collKey), zap.Error(err)) return nil, err } - return model.UnmarshalCollectionModel(collMeta), nil + collection := model.UnmarshalCollectionModel(collMeta) + + if !partitionVersionAfter210(collMeta) && !fieldVersionAfter210(collMeta) { + return collection, nil + } + + partitions, err := kc.listPartitionsAfter210(ctx, collectionID, ts) + if err != nil { + return nil, err + } + collection.Partitions = partitions + + fields, err := kc.listFieldsAfter210(ctx, collectionID, ts) + if err != nil { + return nil, err + } + collection.Fields = fields + + return collection, nil } func (kc *Catalog) CollectionExists(ctx context.Context, collectionID typeutil.UniqueID, ts typeutil.Timestamp) bool { @@ -266,8 +437,8 @@ func (kc *Catalog) GetCredential(ctx context.Context, username string) (*model.C return &model.Credential{Username: username, EncryptedPassword: credentialInfo.EncryptedPassword}, nil } -func (kc *Catalog) AlterAlias(ctx context.Context, collection *model.Collection, ts typeutil.Timestamp) error { - return kc.CreateAlias(ctx, collection, ts) +func (kc *Catalog) AlterAlias(ctx context.Context, alias *model.Alias, ts typeutil.Timestamp) error { + return kc.CreateAlias(ctx, alias, ts) } func (kc *Catalog) DropCollection(ctx context.Context, collectionInfo *model.Collection, ts typeutil.Timestamp) error { @@ -306,45 +477,45 @@ func (kc *Catalog) DropCollection(ctx context.Context, collectionInfo *model.Col return nil } -func (kc *Catalog) DropPartition(ctx context.Context, collectionInfo *model.Collection, partitionID typeutil.UniqueID, ts typeutil.Timestamp) error { - collMeta := model.MarshalCollectionModel(collectionInfo) - k := path.Join(CollectionMetaPrefix, strconv.FormatInt(collectionInfo.CollectionID, 10)) +func dropPartition(collMeta *pb.CollectionInfo, partitionID typeutil.UniqueID) { + if collMeta == nil { + return + } + + { + loc := -1 + for idx, pid := range collMeta.GetPartitionIDs() { + if pid == partitionID { + loc = idx + break + } + } + if loc != -1 { + collMeta.PartitionIDs = append(collMeta.GetPartitionIDs()[:loc], collMeta.GetPartitionIDs()[loc+1:]...) + collMeta.PartitionNames = append(collMeta.GetPartitionNames()[:loc], collMeta.GetPartitionNames()[loc+1:]...) + collMeta.PartitionCreatedTimestamps = append(collMeta.GetPartitionCreatedTimestamps()[:loc], collMeta.GetPartitionCreatedTimestamps()[loc+1:]...) + } + } +} + +func (kc *Catalog) DropPartition(ctx context.Context, collectionID typeutil.UniqueID, partitionID typeutil.UniqueID, ts typeutil.Timestamp) error { + collMeta, err := kc.loadCollection(ctx, collectionID, ts) + if err != nil { + return err + } + + if partitionVersionAfter210(collMeta) { + k := buildPartitionKey(collectionID, partitionID) + return kc.Snapshot.MultiSaveAndRemoveWithPrefix(nil, []string{k}, ts) + } + + k := buildCollectionKey(collectionID) + dropPartition(collMeta, partitionID) v, err := proto.Marshal(collMeta) if err != nil { - log.Error("drop partition marshal fail", zap.String("key", k), zap.Error(err)) return err } - - err = kc.Snapshot.Save(k, string(v), ts) - if err != nil { - log.Error("drop partition update collection meta fail", - zap.Int64("collectionID", collectionInfo.CollectionID), - zap.Int64("partitionID", partitionID), - zap.Error(err)) - return err - } - - var delMetaKeys []string - for _, idxInfo := range collMeta.FieldIndexes { - k := fmt.Sprintf("%s/%d/%d/%d", SegmentIndexMetaPrefix, collMeta.ID, idxInfo.IndexID, partitionID) - delMetaKeys = append(delMetaKeys, k) - } - - // Txn operation - metaTxn := map[string]string{} - for k, v := range collectionInfo.Extra { - metaTxn[k] = v - } - err = kc.Txn.MultiSaveAndRemoveWithPrefix(metaTxn, delMetaKeys) - if err != nil { - log.Warn("drop partition update meta fail", - zap.Int64("collectionID", collectionInfo.CollectionID), - zap.Int64("partitionID", partitionID), - zap.Error(err)) - return err - } - - return nil + return kc.Snapshot.Save(k, string(v), ts) } func (kc *Catalog) DropIndex(ctx context.Context, collectionInfo *model.Collection, dropIdxID typeutil.UniqueID, ts typeutil.Timestamp) error { @@ -386,19 +557,10 @@ func (kc *Catalog) DropCredential(ctx context.Context, username string) error { return nil } -func (kc *Catalog) DropAlias(ctx context.Context, collectionID typeutil.UniqueID, alias string, ts typeutil.Timestamp) error { - delMetakeys := []string{ - fmt.Sprintf("%s/%s", CollectionAliasMetaPrefix, alias), - } - - meta := make(map[string]string) - err := kc.Snapshot.MultiSaveAndRemoveWithPrefix(meta, delMetakeys, ts) - if err != nil { - log.Error("drop alias update meta fail", zap.String("alias", alias), zap.Error(err)) - return err - } - - return nil +func (kc *Catalog) DropAlias(ctx context.Context, alias string, ts typeutil.Timestamp) error { + oldKBefore210 := fmt.Sprintf("%s/%s", CollectionAliasMetaPrefix, alias) + k := buildAliasKey(alias) + return kc.Snapshot.MultiSaveAndRemoveWithPrefix(nil, []string{k, oldKBefore210}, ts) } func (kc *Catalog) GetCollectionByName(ctx context.Context, collectionName string, ts typeutil.Timestamp) (*model.Collection, error) { @@ -416,7 +578,8 @@ func (kc *Catalog) GetCollectionByName(ctx context.Context, collectionName strin continue } if colMeta.Schema.Name == collectionName { - return model.UnmarshalCollectionModel(&colMeta), nil + // compatibility handled by kc.GetCollectionByID. + return kc.GetCollectionByID(ctx, colMeta.GetID(), ts) } } @@ -441,31 +604,71 @@ func (kc *Catalog) ListCollections(ctx context.Context, ts typeutil.Timestamp) ( log.Warn("unmarshal collection info failed", zap.Error(err)) continue } - colls[collMeta.Schema.Name] = model.UnmarshalCollectionModel(&collMeta) + collection, err := kc.GetCollectionByID(ctx, collMeta.GetID(), ts) + if err != nil { + return nil, err + } + colls[collMeta.Schema.Name] = collection } return colls, nil } -func (kc *Catalog) ListAliases(ctx context.Context) ([]*model.Collection, error) { - _, values, err := kc.Snapshot.LoadWithPrefix(CollectionAliasMetaPrefix, 0) +func (kc *Catalog) listAliasesBefore210(ctx context.Context, ts typeutil.Timestamp) ([]*model.Alias, error) { + _, values, err := kc.Snapshot.LoadWithPrefix(CollectionAliasMetaPrefix, ts) if err != nil { - log.Error("get aliases meta fail", zap.String("prefix", CollectionAliasMetaPrefix), zap.Error(err)) return nil, err } - - var colls []*model.Collection + // aliases before 210 stored by CollectionInfo. + aliases := make([]*model.Alias, 0, len(values)) for _, value := range values { - aliasInfo := pb.CollectionInfo{} - err = proto.Unmarshal([]byte(value), &aliasInfo) + coll := &pb.CollectionInfo{} + err := proto.Unmarshal([]byte(value), coll) if err != nil { - log.Warn("unmarshal aliases failed", zap.Error(err)) - continue + return nil, err } - colls = append(colls, model.UnmarshalCollectionModel(&aliasInfo)) + aliases = append(aliases, &model.Alias{ + Name: coll.GetSchema().GetName(), + CollectionID: coll.GetID(), + CreatedTime: 0, // not accurate. + }) } + return aliases, nil +} - return colls, nil +func (kc *Catalog) listAliasesAfter210(ctx context.Context, ts typeutil.Timestamp) ([]*model.Alias, error) { + _, values, err := kc.Snapshot.LoadWithPrefix(AliasMetaPrefix, ts) + if err != nil { + return nil, err + } + // aliases after 210 stored by AliasInfo. + aliases := make([]*model.Alias, 0, len(values)) + for _, value := range values { + info := &pb.AliasInfo{} + err := proto.Unmarshal([]byte(value), info) + if err != nil { + return nil, err + } + aliases = append(aliases, &model.Alias{ + Name: info.GetAliasName(), + CollectionID: info.GetCollectionId(), + CreatedTime: info.GetCreatedTime(), + }) + } + return aliases, nil +} + +func (kc *Catalog) ListAliases(ctx context.Context, ts typeutil.Timestamp) ([]*model.Alias, error) { + aliases1, err := kc.listAliasesBefore210(ctx, ts) + if err != nil { + return nil, err + } + aliases2, err := kc.listAliasesAfter210(ctx, ts) + if err != nil { + return nil, err + } + aliases := append(aliases1, aliases2...) + return aliases, nil } func (kc *Catalog) listSegmentIndexes(ctx context.Context) (map[int64]*model.Index, error) { diff --git a/internal/metastore/kv/kv_catalog_test.go b/internal/metastore/kv/kv_catalog_test.go index f0961e7e10..c21ef0cb6f 100644 --- a/internal/metastore/kv/kv_catalog_test.go +++ b/internal/metastore/kv/kv_catalog_test.go @@ -5,6 +5,10 @@ import ( "errors" "testing" + "github.com/milvus-io/milvus/internal/proto/schemapb" + + "github.com/milvus-io/milvus/internal/util/typeutil" + "github.com/golang/protobuf/proto" "github.com/milvus-io/milvus/internal/proto/commonpb" pb "github.com/milvus-io/milvus/internal/proto/etcdpb" @@ -164,3 +168,691 @@ func Test_ListIndexes(t *testing.T) { }) } + +func TestCatalog_loadCollection(t *testing.T) { + t.Run("load failed", func(t *testing.T) { + ctx := context.Background() + snapshot := kv.NewMockSnapshotKV() + snapshot.LoadFunc = func(key string, ts typeutil.Timestamp) (string, error) { + return "", errors.New("mock") + } + kc := Catalog{Snapshot: snapshot} + _, err := kc.loadCollection(ctx, 1, 0) + assert.Error(t, err) + }) + + t.Run("load, not collection info", func(t *testing.T) { + ctx := context.Background() + snapshot := kv.NewMockSnapshotKV() + snapshot.LoadFunc = func(key string, ts typeutil.Timestamp) (string, error) { + return "not in pb format", nil + } + kc := Catalog{Snapshot: snapshot} + _, err := kc.loadCollection(ctx, 1, 0) + assert.Error(t, err) + }) + + t.Run("load, normal collection info", func(t *testing.T) { + ctx := context.Background() + snapshot := kv.NewMockSnapshotKV() + coll := &pb.CollectionInfo{ID: 1} + value, err := proto.Marshal(coll) + assert.NoError(t, err) + snapshot.LoadFunc = func(key string, ts typeutil.Timestamp) (string, error) { + return string(value), nil + } + kc := Catalog{Snapshot: snapshot} + got, err := kc.loadCollection(ctx, 1, 0) + assert.NoError(t, err) + assert.Equal(t, got.GetID(), coll.GetID()) + }) +} + +func Test_partitionVersionAfter210(t *testing.T) { + t.Run("after 210", func(t *testing.T) { + coll := &pb.CollectionInfo{} + assert.True(t, partitionVersionAfter210(coll)) + }) + + t.Run("before 210", func(t *testing.T) { + coll := &pb.CollectionInfo{ + PartitionIDs: []int64{1}, + PartitionNames: []string{"partition"}, + PartitionCreatedTimestamps: []uint64{2}, + } + assert.False(t, partitionVersionAfter210(coll)) + }) +} + +func Test_partitionExist(t *testing.T) { + t.Run("in partitions ids", func(t *testing.T) { + coll := &pb.CollectionInfo{ + PartitionIDs: []int64{1}, + } + assert.True(t, partitionExistByID(coll, 1)) + }) + + t.Run("not exist", func(t *testing.T) { + coll := &pb.CollectionInfo{} + assert.False(t, partitionExistByID(coll, 1)) + }) +} + +func Test_partitionExistByName(t *testing.T) { + t.Run("in partitions ids", func(t *testing.T) { + coll := &pb.CollectionInfo{ + PartitionNames: []string{"partition"}, + } + assert.True(t, partitionExistByName(coll, "partition")) + }) + + t.Run("not exist", func(t *testing.T) { + coll := &pb.CollectionInfo{} + assert.False(t, partitionExistByName(coll, "partition")) + }) +} + +func TestCatalog_CreatePartitionV2(t *testing.T) { + t.Run("collection not exist", func(t *testing.T) { + ctx := context.Background() + snapshot := kv.NewMockSnapshotKV() + snapshot.LoadFunc = func(key string, ts typeutil.Timestamp) (string, error) { + return "", errors.New("mock") + } + kc := Catalog{Snapshot: snapshot} + err := kc.CreatePartition(ctx, &model.Partition{}, 0) + assert.Error(t, err) + }) + + t.Run("partition version after 210", func(t *testing.T) { + ctx := context.Background() + + coll := &pb.CollectionInfo{} + value, err := proto.Marshal(coll) + assert.NoError(t, err) + + snapshot := kv.NewMockSnapshotKV() + snapshot.LoadFunc = func(key string, ts typeutil.Timestamp) (string, error) { + return string(value), nil + } + snapshot.SaveFunc = func(key string, value string, ts typeutil.Timestamp) error { + return errors.New("mock") + } + + kc := Catalog{Snapshot: snapshot} + + err = kc.CreatePartition(ctx, &model.Partition{}, 0) + assert.Error(t, err) + + snapshot.SaveFunc = func(key string, value string, ts typeutil.Timestamp) error { + return nil + } + err = kc.CreatePartition(ctx, &model.Partition{}, 0) + assert.NoError(t, err) + }) + + t.Run("partition version before 210, id exist", func(t *testing.T) { + ctx := context.Background() + + partID := typeutil.UniqueID(1) + coll := &pb.CollectionInfo{PartitionIDs: []int64{partID}} + value, err := proto.Marshal(coll) + assert.NoError(t, err) + + snapshot := kv.NewMockSnapshotKV() + snapshot.LoadFunc = func(key string, ts typeutil.Timestamp) (string, error) { + return string(value), nil + } + + kc := Catalog{Snapshot: snapshot} + + err = kc.CreatePartition(ctx, &model.Partition{PartitionID: partID}, 0) + assert.Error(t, err) + }) + + t.Run("partition version before 210, name exist", func(t *testing.T) { + ctx := context.Background() + + partition := "partition" + coll := &pb.CollectionInfo{PartitionNames: []string{partition}} + value, err := proto.Marshal(coll) + assert.NoError(t, err) + + snapshot := kv.NewMockSnapshotKV() + snapshot.LoadFunc = func(key string, ts typeutil.Timestamp) (string, error) { + return string(value), nil + } + + kc := Catalog{Snapshot: snapshot} + + err = kc.CreatePartition(ctx, &model.Partition{PartitionName: partition}, 0) + assert.Error(t, err) + }) + + t.Run("partition version before 210, not exist", func(t *testing.T) { + ctx := context.Background() + + coll := &pb.CollectionInfo{ + PartitionNames: []string{"partition"}, + PartitionIDs: []int64{111}, + PartitionCreatedTimestamps: []uint64{111111}, + } + value, err := proto.Marshal(coll) + assert.NoError(t, err) + + snapshot := kv.NewMockSnapshotKV() + snapshot.LoadFunc = func(key string, ts typeutil.Timestamp) (string, error) { + return string(value), nil + } + snapshot.SaveFunc = func(key string, value string, ts typeutil.Timestamp) error { + return errors.New("mock") + } + + kc := Catalog{Snapshot: snapshot} + + err = kc.CreatePartition(ctx, &model.Partition{}, 0) + assert.Error(t, err) + + snapshot.SaveFunc = func(key string, value string, ts typeutil.Timestamp) error { + return nil + } + err = kc.CreatePartition(ctx, &model.Partition{}, 0) + assert.NoError(t, err) + }) +} + +func TestCatalog_CreateAliasV2(t *testing.T) { + ctx := context.Background() + + snapshot := kv.NewMockSnapshotKV() + snapshot.MultiSaveAndRemoveWithPrefixFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error { + return errors.New("mock") + } + + kc := Catalog{Snapshot: snapshot} + + err := kc.CreateAlias(ctx, &model.Alias{}, 0) + assert.Error(t, err) + + snapshot.MultiSaveAndRemoveWithPrefixFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error { + return nil + } + err = kc.CreateAlias(ctx, &model.Alias{}, 0) + assert.NoError(t, err) +} + +func TestCatalog_listPartitionsAfter210(t *testing.T) { + t.Run("load failed", func(t *testing.T) { + ctx := context.Background() + + snapshot := kv.NewMockSnapshotKV() + snapshot.LoadWithPrefixFunc = func(key string, ts typeutil.Timestamp) ([]string, []string, error) { + return nil, nil, errors.New("mock") + } + + kc := Catalog{Snapshot: snapshot} + + _, err := kc.listPartitionsAfter210(ctx, 1, 0) + assert.Error(t, err) + }) + + t.Run("not in pb format", func(t *testing.T) { + ctx := context.Background() + + snapshot := kv.NewMockSnapshotKV() + snapshot.LoadWithPrefixFunc = func(key string, ts typeutil.Timestamp) ([]string, []string, error) { + return []string{"key"}, []string{"not in pb format"}, nil + } + + kc := Catalog{Snapshot: snapshot} + + _, err := kc.listPartitionsAfter210(ctx, 1, 0) + assert.Error(t, err) + }) + + t.Run("normal case", func(t *testing.T) { + ctx := context.Background() + + partition := &pb.PartitionInfo{PartitionID: 100} + value, err := proto.Marshal(partition) + assert.NoError(t, err) + + snapshot := kv.NewMockSnapshotKV() + snapshot.LoadWithPrefixFunc = func(key string, ts typeutil.Timestamp) ([]string, []string, error) { + return []string{"key"}, []string{string(value)}, nil + } + + kc := Catalog{Snapshot: snapshot} + + got, err := kc.listPartitionsAfter210(ctx, 1, 0) + assert.NoError(t, err) + assert.Equal(t, 1, len(got)) + assert.Equal(t, int64(100), got[0].PartitionID) + }) +} + +func Test_fieldVersionAfter210(t *testing.T) { + coll := &pb.CollectionInfo{Schema: &schemapb.CollectionSchema{}} + assert.True(t, fieldVersionAfter210(coll)) + + coll.Schema.Fields = []*schemapb.FieldSchema{{FieldID: 101}} + assert.False(t, fieldVersionAfter210(coll)) +} + +func TestCatalog_listFieldsAfter210(t *testing.T) { + t.Run("load failed", func(t *testing.T) { + ctx := context.Background() + + snapshot := kv.NewMockSnapshotKV() + snapshot.LoadWithPrefixFunc = func(key string, ts typeutil.Timestamp) ([]string, []string, error) { + return nil, nil, errors.New("mock") + } + + kc := Catalog{Snapshot: snapshot} + + _, err := kc.listFieldsAfter210(ctx, 1, 0) + assert.Error(t, err) + }) + + t.Run("not in pb format", func(t *testing.T) { + ctx := context.Background() + + snapshot := kv.NewMockSnapshotKV() + snapshot.LoadWithPrefixFunc = func(key string, ts typeutil.Timestamp) ([]string, []string, error) { + return []string{"key"}, []string{"not in pb format"}, nil + } + + kc := Catalog{Snapshot: snapshot} + + _, err := kc.listFieldsAfter210(ctx, 1, 0) + assert.Error(t, err) + }) + + t.Run("normal case", func(t *testing.T) { + ctx := context.Background() + + field := &schemapb.FieldSchema{FieldID: 101} + value, err := proto.Marshal(field) + assert.NoError(t, err) + + snapshot := kv.NewMockSnapshotKV() + snapshot.LoadWithPrefixFunc = func(key string, ts typeutil.Timestamp) ([]string, []string, error) { + return []string{"key"}, []string{string(value)}, nil + } + + kc := Catalog{Snapshot: snapshot} + + got, err := kc.listFieldsAfter210(ctx, 1, 0) + assert.NoError(t, err) + assert.Equal(t, 1, len(got)) + assert.Equal(t, int64(101), got[0].FieldID) + }) +} + +func TestCatalog_AlterAliasV2(t *testing.T) { + ctx := context.Background() + + snapshot := kv.NewMockSnapshotKV() + snapshot.MultiSaveAndRemoveWithPrefixFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error { + return errors.New("mock") + } + + kc := Catalog{Snapshot: snapshot} + + err := kc.AlterAlias(ctx, &model.Alias{}, 0) + assert.Error(t, err) + + snapshot.MultiSaveAndRemoveWithPrefixFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error { + return nil + } + err = kc.AlterAlias(ctx, &model.Alias{}, 0) + assert.NoError(t, err) +} + +func Test_dropPartition(t *testing.T) { + t.Run("nil, won't panic", func(t *testing.T) { + dropPartition(nil, 1) + }) + + t.Run("not exist", func(t *testing.T) { + coll := &pb.CollectionInfo{} + dropPartition(coll, 100) + }) + + t.Run("in partition ids", func(t *testing.T) { + coll := &pb.CollectionInfo{ + PartitionIDs: []int64{100, 101, 102, 103, 104}, + PartitionNames: []string{"0", "1", "2", "3", "4"}, + PartitionCreatedTimestamps: []uint64{1, 2, 3, 4, 5}, + } + dropPartition(coll, 100) + assert.Equal(t, 4, len(coll.GetPartitionIDs())) + dropPartition(coll, 104) + assert.Equal(t, 3, len(coll.GetPartitionIDs())) + dropPartition(coll, 102) + assert.Equal(t, 2, len(coll.GetPartitionIDs())) + dropPartition(coll, 103) + assert.Equal(t, 1, len(coll.GetPartitionIDs())) + dropPartition(coll, 101) + assert.Equal(t, 0, len(coll.GetPartitionIDs())) + }) +} + +func TestCatalog_DropPartitionV2(t *testing.T) { + t.Run("failed to load collection", func(t *testing.T) { + ctx := context.Background() + + snapshot := kv.NewMockSnapshotKV() + snapshot.LoadFunc = func(key string, ts typeutil.Timestamp) (string, error) { + return "", errors.New("mock") + } + + kc := Catalog{Snapshot: snapshot} + + err := kc.DropPartition(ctx, 100, 101, 0) + assert.Error(t, err) + }) + + t.Run("partition version after 210", func(t *testing.T) { + ctx := context.Background() + + coll := &pb.CollectionInfo{} + value, err := proto.Marshal(coll) + assert.NoError(t, err) + + snapshot := kv.NewMockSnapshotKV() + snapshot.LoadFunc = func(key string, ts typeutil.Timestamp) (string, error) { + return string(value), nil + } + snapshot.MultiSaveAndRemoveWithPrefixFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error { + return errors.New("mock") + } + + kc := Catalog{Snapshot: snapshot} + + err = kc.DropPartition(ctx, 100, 101, 0) + assert.Error(t, err) + + snapshot.MultiSaveAndRemoveWithPrefixFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error { + return nil + } + err = kc.DropPartition(ctx, 100, 101, 0) + assert.NoError(t, err) + }) + + t.Run("partition before 210", func(t *testing.T) { + ctx := context.Background() + + coll := &pb.CollectionInfo{ + PartitionIDs: []int64{101, 102}, + PartitionNames: []string{"partition1", "partition2"}, + PartitionCreatedTimestamps: []uint64{101, 102}, + } + value, err := proto.Marshal(coll) + assert.NoError(t, err) + + snapshot := kv.NewMockSnapshotKV() + snapshot.LoadFunc = func(key string, ts typeutil.Timestamp) (string, error) { + return string(value), nil + } + snapshot.SaveFunc = func(key string, value string, ts typeutil.Timestamp) error { + return errors.New("mock") + } + + kc := Catalog{Snapshot: snapshot} + + err = kc.DropPartition(ctx, 100, 101, 0) + assert.Error(t, err) + + snapshot.SaveFunc = func(key string, value string, ts typeutil.Timestamp) error { + return nil + } + err = kc.DropPartition(ctx, 100, 102, 0) + assert.NoError(t, err) + }) +} + +func TestCatalog_DropAliasV2(t *testing.T) { + ctx := context.Background() + + snapshot := kv.NewMockSnapshotKV() + snapshot.MultiSaveAndRemoveWithPrefixFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error { + return errors.New("mock") + } + + kc := Catalog{Snapshot: snapshot} + + err := kc.DropAlias(ctx, "alias", 0) + assert.Error(t, err) + + snapshot.MultiSaveAndRemoveWithPrefixFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error { + return nil + } + err = kc.DropAlias(ctx, "alias", 0) + assert.NoError(t, err) +} + +func TestCatalog_listAliasesBefore210(t *testing.T) { + t.Run("load failed", func(t *testing.T) { + ctx := context.Background() + + snapshot := kv.NewMockSnapshotKV() + snapshot.LoadWithPrefixFunc = func(key string, ts typeutil.Timestamp) ([]string, []string, error) { + return nil, nil, errors.New("mock") + } + + kc := Catalog{Snapshot: snapshot} + + _, err := kc.listAliasesBefore210(ctx, 0) + assert.Error(t, err) + }) + + t.Run("not in pb format", func(t *testing.T) { + ctx := context.Background() + + snapshot := kv.NewMockSnapshotKV() + snapshot.LoadWithPrefixFunc = func(key string, ts typeutil.Timestamp) ([]string, []string, error) { + return []string{"key"}, []string{"not in pb format"}, nil + } + + kc := Catalog{Snapshot: snapshot} + + _, err := kc.listAliasesBefore210(ctx, 0) + assert.Error(t, err) + }) + + t.Run("normal case", func(t *testing.T) { + ctx := context.Background() + + coll := &pb.CollectionInfo{ID: 100} + value, err := proto.Marshal(coll) + assert.NoError(t, err) + + snapshot := kv.NewMockSnapshotKV() + snapshot.LoadWithPrefixFunc = func(key string, ts typeutil.Timestamp) ([]string, []string, error) { + return []string{"key"}, []string{string(value)}, nil + } + + kc := Catalog{Snapshot: snapshot} + + got, err := kc.listAliasesBefore210(ctx, 0) + assert.NoError(t, err) + assert.Equal(t, 1, len(got)) + assert.Equal(t, int64(100), got[0].CollectionID) + }) +} + +func TestCatalog_listAliasesAfter210(t *testing.T) { + t.Run("load failed", func(t *testing.T) { + ctx := context.Background() + + snapshot := kv.NewMockSnapshotKV() + snapshot.LoadWithPrefixFunc = func(key string, ts typeutil.Timestamp) ([]string, []string, error) { + return nil, nil, errors.New("mock") + } + + kc := Catalog{Snapshot: snapshot} + + _, err := kc.listAliasesAfter210(ctx, 0) + assert.Error(t, err) + }) + + t.Run("not in pb format", func(t *testing.T) { + ctx := context.Background() + + snapshot := kv.NewMockSnapshotKV() + snapshot.LoadWithPrefixFunc = func(key string, ts typeutil.Timestamp) ([]string, []string, error) { + return []string{"key"}, []string{"not in pb format"}, nil + } + + kc := Catalog{Snapshot: snapshot} + + _, err := kc.listAliasesAfter210(ctx, 0) + assert.Error(t, err) + }) + + t.Run("normal case", func(t *testing.T) { + ctx := context.Background() + + coll := &pb.AliasInfo{CollectionId: 100} + value, err := proto.Marshal(coll) + assert.NoError(t, err) + + snapshot := kv.NewMockSnapshotKV() + snapshot.LoadWithPrefixFunc = func(key string, ts typeutil.Timestamp) ([]string, []string, error) { + return []string{"key"}, []string{string(value)}, nil + } + + kc := Catalog{Snapshot: snapshot} + + got, err := kc.listAliasesAfter210(ctx, 0) + assert.NoError(t, err) + assert.Equal(t, 1, len(got)) + assert.Equal(t, int64(100), got[0].CollectionID) + }) +} + +func TestCatalog_ListAliasesV2(t *testing.T) { + t.Run("failed to list aliases before 210", func(t *testing.T) { + ctx := context.Background() + + snapshot := kv.NewMockSnapshotKV() + snapshot.LoadWithPrefixFunc = func(key string, ts typeutil.Timestamp) ([]string, []string, error) { + return []string{"key"}, []string{"not in pb format"}, nil + } + + kc := Catalog{Snapshot: snapshot} + + _, err := kc.ListAliases(ctx, 0) + assert.Error(t, err) + }) + + t.Run("failed to list aliases after 210", func(t *testing.T) { + ctx := context.Background() + + coll := &pb.CollectionInfo{ID: 100, ShardsNum: 50} + value, err := proto.Marshal(coll) + assert.NoError(t, err) + + snapshot := kv.NewMockSnapshotKV() + snapshot.LoadWithPrefixFunc = func(key string, ts typeutil.Timestamp) ([]string, []string, error) { + if key == AliasMetaPrefix { + return nil, nil, errors.New("mock") + } + return []string{"key"}, []string{string(value)}, nil + } + + kc := Catalog{Snapshot: snapshot} + + _, err = kc.ListAliases(ctx, 0) + assert.Error(t, err) + }) + + t.Run("normal case", func(t *testing.T) { + ctx := context.Background() + + coll := &pb.CollectionInfo{Schema: &schemapb.CollectionSchema{Name: "alias1"}, ID: 100, ShardsNum: 50} + value, err := proto.Marshal(coll) + assert.NoError(t, err) + + alias := &pb.AliasInfo{CollectionId: 101, AliasName: "alias2"} + value2, err := proto.Marshal(alias) + assert.NoError(t, err) + + snapshot := kv.NewMockSnapshotKV() + snapshot.LoadWithPrefixFunc = func(key string, ts typeutil.Timestamp) ([]string, []string, error) { + if key == AliasMetaPrefix { + return []string{"key1"}, []string{string(value2)}, nil + } + return []string{"key"}, []string{string(value)}, nil + } + + kc := Catalog{Snapshot: snapshot} + + got, err := kc.ListAliases(ctx, 0) + assert.NoError(t, err) + assert.Equal(t, 2, len(got)) + assert.Equal(t, "alias1", got[0].Name) + assert.Equal(t, "alias2", got[1].Name) + }) +} + +func Test_buildKvs(t *testing.T) { + t.Run("length not equal", func(t *testing.T) { + keys := []string{"k1", "k2"} + values := []string{"v1"} + _, err := buildKvs(keys, values) + assert.Error(t, err) + }) + + t.Run("duplicate", func(t *testing.T) { + keys := []string{"k1", "k1"} + values := []string{"v1", "v2"} + _, err := buildKvs(keys, values) + assert.Error(t, err) + }) + + t.Run("normal case", func(t *testing.T) { + keys := []string{"k1", "k2"} + values := []string{"v1", "v2"} + kvs, err := buildKvs(keys, values) + assert.NoError(t, err) + for i, k := range keys { + v, ok := kvs[k] + assert.True(t, ok) + assert.Equal(t, values[i], v) + } + }) +} + +func Test_batchSave(t *testing.T) { + t.Run("normal case", func(t *testing.T) { + snapshot := kv.NewMockSnapshotKV() + snapshot.MultiSaveFunc = func(kvs map[string]string, ts typeutil.Timestamp) error { + return nil + } + kvs := map[string]string{ + "k1": "v1", + "k2": "v2", + "k3": "v3", + } + maxTxnNum := 2 + err := batchSave(snapshot, maxTxnNum, kvs, 100) + assert.NoError(t, err) + }) + + t.Run("multi save failed", func(t *testing.T) { + snapshot := kv.NewMockSnapshotKV() + snapshot.MultiSaveFunc = func(kvs map[string]string, ts typeutil.Timestamp) error { + return errors.New("mock") + } + kvs := map[string]string{ + "k1": "v1", + "k2": "v2", + "k3": "v3", + } + maxTxnNum := 2 + err := batchSave(snapshot, maxTxnNum, kvs, 100) + assert.Error(t, err) + }) +} diff --git a/internal/metastore/kv/rootcorod_constant.go b/internal/metastore/kv/rootcorod_constant.go index 76bf1b1d14..aaf6e61eb1 100644 --- a/internal/metastore/kv/rootcorod_constant.go +++ b/internal/metastore/kv/rootcorod_constant.go @@ -7,6 +7,10 @@ const ( // CollectionMetaPrefix prefix for collection meta CollectionMetaPrefix = ComponentPrefix + "/collection" + PartitionMetaPrefix = ComponentPrefix + "/partitions" + AliasMetaPrefix = ComponentPrefix + "/aliases" + FieldMetaPrefix = ComponentPrefix + "/fields" + // SegmentIndexMetaPrefix prefix for segment index meta SegmentIndexMetaPrefix = ComponentPrefix + "/segment-index" diff --git a/internal/metastore/model/alias.go b/internal/metastore/model/alias.go new file mode 100644 index 0000000000..a1bc8cc991 --- /dev/null +++ b/internal/metastore/model/alias.go @@ -0,0 +1,25 @@ +package model + +import pb "github.com/milvus-io/milvus/internal/proto/etcdpb" + +type Alias struct { + Name string + CollectionID int64 + CreatedTime uint64 +} + +func MarshalAliasModel(alias *Alias) *pb.AliasInfo { + return &pb.AliasInfo{ + AliasName: alias.Name, + CollectionId: alias.CollectionID, + CreatedTime: alias.CreatedTime, + } +} + +func UnmarshalAliasModel(info *pb.AliasInfo) *Alias { + return &Alias{ + Name: info.GetAliasName(), + CollectionID: info.GetCollectionId(), + CreatedTime: info.GetCreatedTime(), + } +} diff --git a/internal/metastore/model/collection.go b/internal/metastore/model/collection.go index 152ee154c1..4b32af5055 100644 --- a/internal/metastore/model/collection.go +++ b/internal/metastore/model/collection.go @@ -22,7 +22,7 @@ type Collection struct { StartPositions []*commonpb.KeyDataPair CreateTime uint64 ConsistencyLevel commonpb.ConsistencyLevel - Aliases []string + Aliases []string // TODO: deprecate this. Extra map[string]string // extra kvs } @@ -53,24 +53,12 @@ func UnmarshalCollectionModel(coll *pb.CollectionInfo) *Collection { } // backward compatible for deprecated fields - var partitions []*Partition - if len(coll.Partitions) != 0 { - partitions = make([]*Partition, len(coll.Partitions)) - for idx, partition := range coll.Partitions { - partitions[idx] = &Partition{ - PartitionID: partition.GetPartitionID(), - PartitionName: partition.GetPartitionName(), - PartitionCreatedTimestamp: partition.GetPartitionCreatedTimestamp(), - } - } - } else { - partitions = make([]*Partition, len(coll.PartitionIDs)) - for idx := range coll.PartitionIDs { - partitions[idx] = &Partition{ - PartitionID: coll.PartitionIDs[idx], - PartitionName: coll.PartitionNames[idx], - PartitionCreatedTimestamp: coll.PartitionCreatedTimestamps[idx], - } + partitions := make([]*Partition, len(coll.PartitionIDs)) + for idx := range coll.PartitionIDs { + partitions[idx] = &Partition{ + PartitionID: coll.PartitionIDs[idx], + PartitionName: coll.PartitionNames[idx], + PartitionCreatedTimestamp: coll.PartitionCreatedTimestamps[idx], } } @@ -87,7 +75,7 @@ func UnmarshalCollectionModel(coll *pb.CollectionInfo) *Collection { Name: coll.Schema.Name, Description: coll.Schema.Description, AutoID: coll.Schema.AutoID, - Fields: UnmarshalFieldModels(coll.Schema.Fields), + Fields: UnmarshalFieldModels(coll.GetSchema().GetFields()), Partitions: partitions, FieldIDToIndexID: filedIDToIndexIDs, VirtualChannelNames: coll.VirtualChannelNames, @@ -99,29 +87,17 @@ func UnmarshalCollectionModel(coll *pb.CollectionInfo) *Collection { } } +// MarshalCollectionModel marshal only collection-related information. +// partitions, aliases and fields won't be marshaled. They should be written to newly path. func MarshalCollectionModel(coll *Collection) *pb.CollectionInfo { if coll == nil { return nil } - fields := make([]*schemapb.FieldSchema, len(coll.Fields)) - for idx, field := range coll.Fields { - fields[idx] = &schemapb.FieldSchema{ - FieldID: field.FieldID, - Name: field.Name, - IsPrimaryKey: field.IsPrimaryKey, - Description: field.Description, - DataType: field.DataType, - TypeParams: field.TypeParams, - IndexParams: field.IndexParams, - AutoID: field.AutoID, - } - } collSchema := &schemapb.CollectionSchema{ Name: coll.Name, Description: coll.Description, AutoID: coll.AutoID, - Fields: fields, } partitions := make([]*pb.PartitionInfo, len(coll.Partitions)) @@ -140,10 +116,10 @@ func MarshalCollectionModel(coll *Collection) *pb.CollectionInfo { IndexID: tuple.Value, } } + return &pb.CollectionInfo{ ID: coll.CollectionID, Schema: collSchema, - Partitions: partitions, FieldIndexes: fieldIndexes, CreateTime: coll.CreateTime, VirtualChannelNames: coll.VirtualChannelNames, diff --git a/internal/metastore/model/collection_test.go b/internal/metastore/model/collection_test.go index 9e5a30109c..5e7971cb11 100644 --- a/internal/metastore/model/collection_test.go +++ b/internal/metastore/model/collection_test.go @@ -87,35 +87,6 @@ var ( StartPositions: startPositions, ConsistencyLevel: commonpb.ConsistencyLevel_Strong, } - - newColPb = &pb.CollectionInfo{ - ID: colID, - Schema: &schemapb.CollectionSchema{ - Name: colName, - Description: "none", - AutoID: false, - Fields: []*schemapb.FieldSchema{filedSchemaPb}, - }, - CreateTime: 1, - Partitions: []*pb.PartitionInfo{ - { - PartitionID: partID, - PartitionName: partName, - PartitionCreatedTimestamp: 1, - }, - }, - FieldIndexes: []*pb.FieldIndexInfo{ - { - FiledID: fieldID, - IndexID: indexID, - }, - }, - VirtualChannelNames: []string{"vch"}, - PhysicalChannelNames: []string{"pch"}, - ShardsNum: 1, - StartPositions: startPositions, - ConsistencyLevel: commonpb.ConsistencyLevel_Strong, - } ) func TestUnmarshalCollectionModel(t *testing.T) { @@ -123,16 +94,9 @@ func TestUnmarshalCollectionModel(t *testing.T) { ret.TenantID = tenantID assert.Equal(t, ret, colModel) - ret = UnmarshalCollectionModel(newColPb) - ret.TenantID = tenantID - assert.Equal(t, ret, colModel) - assert.Nil(t, UnmarshalCollectionModel(nil)) } func TestMarshalCollectionModel(t *testing.T) { - ret := MarshalCollectionModel(colModel) - assert.Equal(t, ret, newColPb) - assert.Nil(t, MarshalCollectionModel(nil)) } diff --git a/internal/metastore/model/partition.go b/internal/metastore/model/partition.go index facfefee0c..81fa863d92 100644 --- a/internal/metastore/model/partition.go +++ b/internal/metastore/model/partition.go @@ -1,8 +1,29 @@ package model +import pb "github.com/milvus-io/milvus/internal/proto/etcdpb" + type Partition struct { PartitionID int64 PartitionName string PartitionCreatedTimestamp uint64 Extra map[string]string + CollectionID int64 +} + +func MarshalPartitionModel(partition *Partition) *pb.PartitionInfo { + return &pb.PartitionInfo{ + PartitionID: partition.PartitionID, + PartitionName: partition.PartitionName, + PartitionCreatedTimestamp: partition.PartitionCreatedTimestamp, + CollectionId: partition.CollectionID, + } +} + +func UnmarshalPartitionModel(info *pb.PartitionInfo) *Partition { + return &Partition{ + PartitionID: info.GetPartitionID(), + PartitionName: info.GetPartitionName(), + PartitionCreatedTimestamp: info.GetPartitionCreatedTimestamp(), + CollectionID: info.GetCollectionId(), + } } diff --git a/internal/proto/etcd_meta.proto b/internal/proto/etcd_meta.proto index 67b31932ea..7ee223c547 100644 --- a/internal/proto/etcd_meta.proto +++ b/internal/proto/etcd_meta.proto @@ -36,13 +36,19 @@ message CollectionInfo { int32 shards_num = 10; repeated common.KeyDataPair start_positions = 11; common.ConsistencyLevel consistency_level = 12; - repeated PartitionInfo partitions = 13; } message PartitionInfo { int64 partitionID = 1; string partitionName = 2; uint64 partition_created_timestamp = 3; + int64 collection_id = 4; +} + +message AliasInfo { + string alias_name = 1; + int64 collection_id = 2; + uint64 created_time = 3; } message SegmentIndexInfo { diff --git a/internal/proto/etcdpb/etcd_meta.pb.go b/internal/proto/etcdpb/etcd_meta.pb.go index bf337827b6..a5a05d2d60 100644 --- a/internal/proto/etcdpb/etcd_meta.pb.go +++ b/internal/proto/etcdpb/etcd_meta.pb.go @@ -156,7 +156,6 @@ type CollectionInfo struct { ShardsNum int32 `protobuf:"varint,10,opt,name=shards_num,json=shardsNum,proto3" json:"shards_num,omitempty"` StartPositions []*commonpb.KeyDataPair `protobuf:"bytes,11,rep,name=start_positions,json=startPositions,proto3" json:"start_positions,omitempty"` ConsistencyLevel commonpb.ConsistencyLevel `protobuf:"varint,12,opt,name=consistency_level,json=consistencyLevel,proto3,enum=milvus.proto.common.ConsistencyLevel" json:"consistency_level,omitempty"` - Partitions []*PartitionInfo `protobuf:"bytes,13,rep,name=partitions,proto3" json:"partitions,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -271,17 +270,11 @@ func (m *CollectionInfo) GetConsistencyLevel() commonpb.ConsistencyLevel { return commonpb.ConsistencyLevel_Strong } -func (m *CollectionInfo) GetPartitions() []*PartitionInfo { - if m != nil { - return m.Partitions - } - return nil -} - type PartitionInfo struct { PartitionID int64 `protobuf:"varint,1,opt,name=partitionID,proto3" json:"partitionID,omitempty"` PartitionName string `protobuf:"bytes,2,opt,name=partitionName,proto3" json:"partitionName,omitempty"` PartitionCreatedTimestamp uint64 `protobuf:"varint,3,opt,name=partition_created_timestamp,json=partitionCreatedTimestamp,proto3" json:"partition_created_timestamp,omitempty"` + CollectionId int64 `protobuf:"varint,4,opt,name=collection_id,json=collectionId,proto3" json:"collection_id,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -333,6 +326,68 @@ func (m *PartitionInfo) GetPartitionCreatedTimestamp() uint64 { return 0 } +func (m *PartitionInfo) GetCollectionId() int64 { + if m != nil { + return m.CollectionId + } + return 0 +} + +type AliasInfo struct { + AliasName string `protobuf:"bytes,1,opt,name=alias_name,json=aliasName,proto3" json:"alias_name,omitempty"` + CollectionId int64 `protobuf:"varint,2,opt,name=collection_id,json=collectionId,proto3" json:"collection_id,omitempty"` + CreatedTime uint64 `protobuf:"varint,3,opt,name=created_time,json=createdTime,proto3" json:"created_time,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *AliasInfo) Reset() { *m = AliasInfo{} } +func (m *AliasInfo) String() string { return proto.CompactTextString(m) } +func (*AliasInfo) ProtoMessage() {} +func (*AliasInfo) Descriptor() ([]byte, []int) { + return fileDescriptor_975d306d62b73e88, []int{4} +} + +func (m *AliasInfo) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_AliasInfo.Unmarshal(m, b) +} +func (m *AliasInfo) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_AliasInfo.Marshal(b, m, deterministic) +} +func (m *AliasInfo) XXX_Merge(src proto.Message) { + xxx_messageInfo_AliasInfo.Merge(m, src) +} +func (m *AliasInfo) XXX_Size() int { + return xxx_messageInfo_AliasInfo.Size(m) +} +func (m *AliasInfo) XXX_DiscardUnknown() { + xxx_messageInfo_AliasInfo.DiscardUnknown(m) +} + +var xxx_messageInfo_AliasInfo proto.InternalMessageInfo + +func (m *AliasInfo) GetAliasName() string { + if m != nil { + return m.AliasName + } + return "" +} + +func (m *AliasInfo) GetCollectionId() int64 { + if m != nil { + return m.CollectionId + } + return 0 +} + +func (m *AliasInfo) GetCreatedTime() uint64 { + if m != nil { + return m.CreatedTime + } + return 0 +} + type SegmentIndexInfo struct { CollectionID int64 `protobuf:"varint,1,opt,name=collectionID,proto3" json:"collectionID,omitempty"` PartitionID int64 `protobuf:"varint,2,opt,name=partitionID,proto3" json:"partitionID,omitempty"` @@ -351,7 +406,7 @@ func (m *SegmentIndexInfo) Reset() { *m = SegmentIndexInfo{} } func (m *SegmentIndexInfo) String() string { return proto.CompactTextString(m) } func (*SegmentIndexInfo) ProtoMessage() {} func (*SegmentIndexInfo) Descriptor() ([]byte, []int) { - return fileDescriptor_975d306d62b73e88, []int{4} + return fileDescriptor_975d306d62b73e88, []int{5} } func (m *SegmentIndexInfo) XXX_Unmarshal(b []byte) error { @@ -445,7 +500,7 @@ func (m *CollectionMeta) Reset() { *m = CollectionMeta{} } func (m *CollectionMeta) String() string { return proto.CompactTextString(m) } func (*CollectionMeta) ProtoMessage() {} func (*CollectionMeta) Descriptor() ([]byte, []int) { - return fileDescriptor_975d306d62b73e88, []int{5} + return fileDescriptor_975d306d62b73e88, []int{6} } func (m *CollectionMeta) XXX_Unmarshal(b []byte) error { @@ -525,7 +580,7 @@ func (m *CredentialInfo) Reset() { *m = CredentialInfo{} } func (m *CredentialInfo) String() string { return proto.CompactTextString(m) } func (*CredentialInfo) ProtoMessage() {} func (*CredentialInfo) Descriptor() ([]byte, []int) { - return fileDescriptor_975d306d62b73e88, []int{6} + return fileDescriptor_975d306d62b73e88, []int{7} } func (m *CredentialInfo) XXX_Unmarshal(b []byte) error { @@ -586,6 +641,7 @@ func init() { proto.RegisterType((*FieldIndexInfo)(nil), "milvus.proto.etcd.FieldIndexInfo") proto.RegisterType((*CollectionInfo)(nil), "milvus.proto.etcd.CollectionInfo") proto.RegisterType((*PartitionInfo)(nil), "milvus.proto.etcd.PartitionInfo") + proto.RegisterType((*AliasInfo)(nil), "milvus.proto.etcd.AliasInfo") proto.RegisterType((*SegmentIndexInfo)(nil), "milvus.proto.etcd.SegmentIndexInfo") proto.RegisterType((*CollectionMeta)(nil), "milvus.proto.etcd.CollectionMeta") proto.RegisterType((*CredentialInfo)(nil), "milvus.proto.etcd.CredentialInfo") @@ -594,58 +650,59 @@ func init() { func init() { proto.RegisterFile("etcd_meta.proto", fileDescriptor_975d306d62b73e88) } var fileDescriptor_975d306d62b73e88 = []byte{ - // 837 bytes of a gzipped FileDescriptorProto + // 861 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xbc, 0x55, 0xdd, 0x6e, 0xe3, 0x44, - 0x14, 0x96, 0xe3, 0xe6, 0xc7, 0x27, 0x69, 0xba, 0x1d, 0x60, 0x35, 0x5b, 0x16, 0xf0, 0x46, 0x2c, - 0xf8, 0x66, 0x5b, 0xd1, 0x05, 0xee, 0x40, 0x2b, 0x6a, 0xad, 0x14, 0x01, 0xab, 0x68, 0x5a, 0x71, - 0xc1, 0x8d, 0x35, 0xb1, 0x4f, 0x9b, 0x91, 0xec, 0xb1, 0xe5, 0x19, 0x17, 0xf2, 0x06, 0xbc, 0x01, - 0x8f, 0xc2, 0x2d, 0x37, 0x3c, 0x0d, 0x2f, 0x81, 0x3c, 0xfe, 0x89, 0x9d, 0xb4, 0x5c, 0xee, 0x5d, - 0xce, 0x77, 0xe6, 0x9c, 0xcc, 0x77, 0xce, 0x37, 0x9f, 0xe1, 0x04, 0x75, 0x18, 0x05, 0x09, 0x6a, - 0x7e, 0x9e, 0xe5, 0xa9, 0x4e, 0xc9, 0x69, 0x22, 0xe2, 0xfb, 0x42, 0x55, 0xd1, 0x79, 0x99, 0x3d, - 0x9b, 0x85, 0x69, 0x92, 0xa4, 0xb2, 0x82, 0xce, 0x66, 0x2a, 0xdc, 0x60, 0x52, 0x1f, 0x5f, 0xfc, - 0x63, 0x81, 0xb3, 0x94, 0x11, 0xfe, 0xbe, 0x94, 0xb7, 0x29, 0xf9, 0x04, 0x40, 0x94, 0x41, 0x20, - 0x79, 0x82, 0xd4, 0x72, 0x2d, 0xcf, 0x61, 0x8e, 0x41, 0xde, 0xf1, 0x04, 0x09, 0x85, 0xb1, 0x09, - 0x96, 0x3e, 0x1d, 0xb8, 0x96, 0x67, 0xb3, 0x26, 0x24, 0x3e, 0xcc, 0xaa, 0xc2, 0x8c, 0xe7, 0x3c, - 0x51, 0xd4, 0x76, 0x6d, 0x6f, 0x7a, 0xf9, 0xe2, 0xbc, 0x77, 0x99, 0xfa, 0x1a, 0x3f, 0xe2, 0xf6, - 0x17, 0x1e, 0x17, 0xb8, 0xe2, 0x22, 0x67, 0x53, 0x53, 0xb6, 0x32, 0x55, 0x65, 0xff, 0x08, 0x63, - 0xd4, 0x18, 0xd1, 0x23, 0xd7, 0xf2, 0x26, 0xac, 0x09, 0xc9, 0x67, 0x30, 0x0d, 0x73, 0xe4, 0x1a, - 0x03, 0x2d, 0x12, 0xa4, 0x43, 0xd7, 0xf2, 0x8e, 0x18, 0x54, 0xd0, 0x8d, 0x48, 0x70, 0xe1, 0xc3, - 0xfc, 0xad, 0xc0, 0x38, 0xda, 0x71, 0xa1, 0x30, 0xbe, 0x15, 0x31, 0x46, 0x4b, 0xdf, 0x10, 0xb1, - 0x59, 0x13, 0x3e, 0x4e, 0x63, 0xf1, 0xf7, 0x10, 0xe6, 0x57, 0x69, 0x1c, 0x63, 0xa8, 0x45, 0x2a, - 0x4d, 0x9b, 0x39, 0x0c, 0xda, 0x0e, 0x83, 0xa5, 0x4f, 0xbe, 0x83, 0x51, 0x35, 0x40, 0x53, 0x3b, - 0xbd, 0x7c, 0xd9, 0xe7, 0x58, 0x0f, 0x77, 0xd7, 0xe4, 0xda, 0x00, 0xac, 0x2e, 0xda, 0x27, 0x62, - 0xef, 0x13, 0x21, 0x0b, 0x98, 0x65, 0x3c, 0xd7, 0xc2, 0x5c, 0xc0, 0x57, 0xf4, 0xc8, 0xb5, 0x3d, - 0x9b, 0xf5, 0x30, 0xf2, 0x05, 0xcc, 0xdb, 0xb8, 0x5c, 0x8c, 0xa2, 0x43, 0xd7, 0xf6, 0x1c, 0xb6, - 0x87, 0x92, 0xb7, 0x70, 0x7c, 0x5b, 0x0e, 0x25, 0x30, 0xfc, 0x50, 0xd1, 0xd1, 0x43, 0x6b, 0x29, - 0x35, 0x72, 0xde, 0x1f, 0x1e, 0x9b, 0xdd, 0xb6, 0x31, 0x2a, 0x72, 0x09, 0x1f, 0xdd, 0x8b, 0x5c, - 0x17, 0x3c, 0x0e, 0xc2, 0x0d, 0x97, 0x12, 0x63, 0x23, 0x10, 0x45, 0xc7, 0xe6, 0x6f, 0x3f, 0xa8, - 0x93, 0x57, 0x55, 0xae, 0xfa, 0xef, 0xaf, 0xe1, 0x69, 0xb6, 0xd9, 0x2a, 0x11, 0x1e, 0x14, 0x4d, - 0x4c, 0xd1, 0x87, 0x4d, 0xb6, 0x57, 0xf5, 0x06, 0x9e, 0xb7, 0x1c, 0x82, 0x6a, 0x2a, 0x91, 0x99, - 0x94, 0xd2, 0x3c, 0xc9, 0x14, 0x75, 0x5c, 0xdb, 0x3b, 0x62, 0x67, 0xed, 0x99, 0xab, 0xea, 0xc8, - 0x4d, 0x7b, 0xa2, 0x94, 0xb0, 0xda, 0xf0, 0x3c, 0x52, 0x81, 0x2c, 0x12, 0x0a, 0xae, 0xe5, 0x0d, - 0x99, 0x53, 0x21, 0xef, 0x8a, 0x84, 0x2c, 0xe1, 0x44, 0x69, 0x9e, 0xeb, 0x20, 0x4b, 0x95, 0xe9, - 0xa0, 0xe8, 0xd4, 0x0c, 0xc5, 0x7d, 0x4c, 0xab, 0x3e, 0xd7, 0xdc, 0x48, 0x75, 0x6e, 0x0a, 0x57, - 0x4d, 0x1d, 0x61, 0x70, 0x1a, 0xa6, 0x52, 0x09, 0xa5, 0x51, 0x86, 0xdb, 0x20, 0xc6, 0x7b, 0x8c, - 0xe9, 0xcc, 0xb5, 0xbc, 0xf9, 0xbe, 0x28, 0xea, 0x66, 0x57, 0xbb, 0xd3, 0x3f, 0x95, 0x87, 0xd9, - 0x93, 0x70, 0x0f, 0x21, 0x6f, 0x00, 0x5a, 0x6e, 0x8a, 0x1e, 0x3f, 0x74, 0x33, 0xb3, 0xae, 0x55, - 0x2b, 0x87, 0x72, 0x5b, 0x9d, 0x9a, 0xc5, 0x9f, 0x16, 0x1c, 0xf7, 0xb2, 0xc4, 0x85, 0x69, 0x47, - 0x3d, 0xb5, 0x94, 0xbb, 0x10, 0xf9, 0x1c, 0x8e, 0x7b, 0xca, 0x31, 0xd2, 0x76, 0x58, 0x1f, 0x24, - 0xdf, 0xc3, 0xc7, 0xff, 0xb3, 0x9b, 0x5a, 0xca, 0xcf, 0x1e, 0x5d, 0xcd, 0xe2, 0x8f, 0x01, 0x3c, - 0xb9, 0xc6, 0xbb, 0x04, 0xa5, 0xde, 0xbd, 0xd2, 0x05, 0xcc, 0xc2, 0xdd, 0x83, 0x6b, 0x6e, 0xd7, - 0xc3, 0xf6, 0x09, 0x0c, 0x0e, 0x09, 0x3c, 0x07, 0x47, 0xd5, 0x9d, 0x7d, 0x73, 0x11, 0x9b, 0xed, - 0x80, 0xca, 0x09, 0x4a, 0x39, 0xfb, 0xc6, 0x56, 0x8c, 0x13, 0x98, 0xb0, 0xeb, 0x04, 0xc3, 0xbe, - 0xa1, 0x51, 0x18, 0xaf, 0x0b, 0x61, 0x6a, 0x46, 0x55, 0xa6, 0x0e, 0xc9, 0x0b, 0x98, 0xa1, 0xe4, - 0xeb, 0x18, 0xab, 0x57, 0x45, 0xc7, 0xc6, 0xa9, 0xa6, 0x15, 0x66, 0x88, 0xed, 0x3f, 0xf2, 0xc9, - 0x81, 0x5b, 0xfd, 0x6b, 0x75, 0x7d, 0xe6, 0x67, 0xd4, 0xfc, 0xbd, 0xfb, 0xcc, 0xa7, 0x00, 0xed, - 0x84, 0x1a, 0x97, 0xe9, 0x20, 0xe4, 0x65, 0xc7, 0x63, 0x02, 0xcd, 0xef, 0x1a, 0x8f, 0xd9, 0x89, - 0xe2, 0x86, 0xdf, 0xa9, 0x03, 0xbb, 0x1a, 0x1d, 0xda, 0xd5, 0xe2, 0xaf, 0x92, 0x6d, 0x8e, 0x11, - 0x4a, 0x2d, 0x78, 0x6c, 0xd6, 0x7e, 0x06, 0x93, 0x42, 0x61, 0xde, 0xf9, 0xcc, 0xb4, 0x31, 0x79, - 0x05, 0x04, 0x65, 0x98, 0x6f, 0xb3, 0x52, 0x5f, 0x19, 0x57, 0xea, 0xb7, 0x34, 0x8f, 0x6a, 0x49, - 0x9e, 0xb6, 0x99, 0x55, 0x9d, 0x20, 0x4f, 0x61, 0xa4, 0x51, 0x72, 0xa9, 0x0d, 0x49, 0x87, 0xd5, - 0x11, 0x79, 0x06, 0x13, 0xa1, 0x02, 0x55, 0x64, 0x98, 0x37, 0x5f, 0x13, 0xa1, 0xae, 0xcb, 0x90, - 0x7c, 0x09, 0x27, 0x6a, 0xc3, 0x2f, 0xbf, 0xf9, 0x76, 0xd7, 0x7e, 0x68, 0x6a, 0xe7, 0x15, 0xdc, - 0xf4, 0xfe, 0xe1, 0xf5, 0xaf, 0x5f, 0xdd, 0x09, 0xbd, 0x29, 0xd6, 0xe5, 0x13, 0xbe, 0xa8, 0x16, - 0xf0, 0x4a, 0xa4, 0xf5, 0xaf, 0x0b, 0x21, 0x75, 0x79, 0xe7, 0xf8, 0xc2, 0xec, 0xe4, 0xa2, 0x7c, - 0x99, 0xd9, 0x7a, 0x3d, 0x32, 0xd1, 0xeb, 0xff, 0x02, 0x00, 0x00, 0xff, 0xff, 0x26, 0x75, 0x94, - 0x72, 0x9b, 0x07, 0x00, 0x00, + 0x14, 0x96, 0xe3, 0xfc, 0xf9, 0x24, 0x4d, 0xb7, 0x03, 0xac, 0xbc, 0x65, 0x01, 0x6f, 0x60, 0xc1, + 0x37, 0xdb, 0x8a, 0x2e, 0x70, 0x07, 0x02, 0x6a, 0xad, 0x14, 0x01, 0x55, 0x34, 0xad, 0xb8, 0xe0, + 0xc6, 0x9a, 0xd8, 0xa7, 0xcd, 0x48, 0xfe, 0x93, 0x67, 0x5c, 0xe8, 0x1b, 0xf0, 0x46, 0xdc, 0x70, + 0xcb, 0xd3, 0xf0, 0x0e, 0x08, 0xcd, 0x78, 0xec, 0xd8, 0x09, 0xe5, 0x72, 0xef, 0xf2, 0x7d, 0x33, + 0xe7, 0xf8, 0xfc, 0x7c, 0xf3, 0x05, 0x8e, 0x51, 0x46, 0x71, 0x98, 0xa2, 0x64, 0x67, 0x45, 0x99, + 0xcb, 0x9c, 0x9c, 0xa4, 0x3c, 0xb9, 0xaf, 0x44, 0x8d, 0xce, 0xd4, 0xe9, 0xe9, 0x3c, 0xca, 0xd3, + 0x34, 0xcf, 0x6a, 0xea, 0x74, 0x2e, 0xa2, 0x2d, 0xa6, 0xe6, 0xfa, 0xf2, 0x2f, 0x0b, 0x9c, 0x55, + 0x16, 0xe3, 0x6f, 0xab, 0xec, 0x36, 0x27, 0x1f, 0x00, 0x70, 0x05, 0xc2, 0x8c, 0xa5, 0xe8, 0x5a, + 0x9e, 0xe5, 0x3b, 0xd4, 0xd1, 0xcc, 0x15, 0x4b, 0x91, 0xb8, 0x30, 0xd1, 0x60, 0x15, 0xb8, 0x03, + 0xcf, 0xf2, 0x6d, 0xda, 0x40, 0x12, 0xc0, 0xbc, 0x0e, 0x2c, 0x58, 0xc9, 0x52, 0xe1, 0xda, 0x9e, + 0xed, 0xcf, 0x2e, 0x5e, 0x9c, 0xf5, 0x8a, 0x31, 0x65, 0xfc, 0x80, 0x0f, 0x3f, 0xb3, 0xa4, 0xc2, + 0x35, 0xe3, 0x25, 0x9d, 0xe9, 0xb0, 0xb5, 0x8e, 0x52, 0xf9, 0x63, 0x4c, 0x50, 0x62, 0xec, 0x0e, + 0x3d, 0xcb, 0x9f, 0xd2, 0x06, 0x92, 0x8f, 0x60, 0x16, 0x95, 0xc8, 0x24, 0x86, 0x92, 0xa7, 0xe8, + 0x8e, 0x3c, 0xcb, 0x1f, 0x52, 0xa8, 0xa9, 0x1b, 0x9e, 0xe2, 0x32, 0x80, 0xc5, 0x1b, 0x8e, 0x49, + 0xbc, 0xeb, 0xc5, 0x85, 0xc9, 0x2d, 0x4f, 0x30, 0x5e, 0x05, 0xba, 0x11, 0x9b, 0x36, 0xf0, 0xf1, + 0x36, 0x96, 0xff, 0x0c, 0x61, 0x71, 0x99, 0x27, 0x09, 0x46, 0x92, 0xe7, 0x99, 0x4e, 0xb3, 0x80, + 0x41, 0x9b, 0x61, 0xb0, 0x0a, 0xc8, 0xd7, 0x30, 0xae, 0x07, 0xa8, 0x63, 0x67, 0x17, 0x2f, 0xfb, + 0x3d, 0x9a, 0xe1, 0xee, 0x92, 0x5c, 0x6b, 0x82, 0x9a, 0xa0, 0xfd, 0x46, 0xec, 0xfd, 0x46, 0xc8, + 0x12, 0xe6, 0x05, 0x2b, 0x25, 0xd7, 0x05, 0x04, 0xc2, 0x1d, 0x7a, 0xb6, 0x6f, 0xd3, 0x1e, 0x47, + 0x3e, 0x85, 0x45, 0x8b, 0xd5, 0x62, 0x84, 0x3b, 0xf2, 0x6c, 0xdf, 0xa1, 0x7b, 0x2c, 0x79, 0x03, + 0x47, 0xb7, 0x6a, 0x28, 0xa1, 0xee, 0x0f, 0x85, 0x3b, 0xfe, 0xaf, 0xb5, 0x28, 0x8d, 0x9c, 0xf5, + 0x87, 0x47, 0xe7, 0xb7, 0x2d, 0x46, 0x41, 0x2e, 0xe0, 0xbd, 0x7b, 0x5e, 0xca, 0x8a, 0x25, 0x61, + 0xb4, 0x65, 0x59, 0x86, 0x89, 0x16, 0x88, 0x70, 0x27, 0xfa, 0xb3, 0xef, 0x98, 0xc3, 0xcb, 0xfa, + 0xac, 0xfe, 0xf6, 0x17, 0xf0, 0xb4, 0xd8, 0x3e, 0x08, 0x1e, 0x1d, 0x04, 0x4d, 0x75, 0xd0, 0xbb, + 0xcd, 0x69, 0x2f, 0xea, 0x5b, 0x78, 0xde, 0xf6, 0x10, 0xd6, 0x53, 0x89, 0xf5, 0xa4, 0x84, 0x64, + 0x69, 0x21, 0x5c, 0xc7, 0xb3, 0xfd, 0x21, 0x3d, 0x6d, 0xef, 0x5c, 0xd6, 0x57, 0x6e, 0xda, 0x1b, + 0x4a, 0xc2, 0x62, 0xcb, 0xca, 0x58, 0x84, 0x59, 0x95, 0xba, 0xe0, 0x59, 0xfe, 0x88, 0x3a, 0x35, + 0x73, 0x55, 0xa5, 0x64, 0x05, 0xc7, 0x42, 0xb2, 0x52, 0x86, 0x45, 0x2e, 0x74, 0x06, 0xe1, 0xce, + 0xf4, 0x50, 0xbc, 0xc7, 0xb4, 0x1a, 0x30, 0xc9, 0xb4, 0x54, 0x17, 0x3a, 0x70, 0xdd, 0xc4, 0x11, + 0x0a, 0x27, 0x51, 0x9e, 0x09, 0x2e, 0x24, 0x66, 0xd1, 0x43, 0x98, 0xe0, 0x3d, 0x26, 0xee, 0xdc, + 0xb3, 0xfc, 0xc5, 0xbe, 0x28, 0x4c, 0xb2, 0xcb, 0xdd, 0xed, 0x1f, 0xd5, 0x65, 0xfa, 0x24, 0xda, + 0x63, 0x96, 0x7f, 0x5a, 0x70, 0xb4, 0x6e, 0x57, 0xad, 0xf4, 0xe7, 0xc1, 0xac, 0xb3, 0x7b, 0x23, + 0xc4, 0x2e, 0x45, 0x3e, 0x81, 0xa3, 0xde, 0xde, 0xb5, 0x30, 0x1d, 0xda, 0x27, 0xc9, 0x37, 0xf0, + 0xfe, 0xff, 0x4c, 0xd6, 0x08, 0xf1, 0xd9, 0xa3, 0x83, 0x25, 0x1f, 0xc3, 0x51, 0xd4, 0x8a, 0x3a, + 0xe4, 0xf5, 0x0b, 0xb5, 0xe9, 0x7c, 0x47, 0xae, 0xe2, 0x65, 0x09, 0xce, 0x77, 0x09, 0x67, 0xa2, + 0x31, 0x13, 0xa6, 0x40, 0xcf, 0x4c, 0x34, 0xa3, 0x0b, 0x3a, 0x48, 0x38, 0x38, 0x4c, 0x48, 0x5e, + 0xc0, 0xbc, 0x5b, 0xab, 0x29, 0xd3, 0x3c, 0x21, 0x5d, 0xdd, 0xf2, 0xf7, 0x01, 0x3c, 0xb9, 0xc6, + 0xbb, 0x14, 0x33, 0xb9, 0x7b, 0xfc, 0x4b, 0xe8, 0xe6, 0x69, 0xc6, 0xd6, 0xe3, 0xf6, 0x27, 0x3b, + 0x38, 0x9c, 0xec, 0x73, 0x70, 0x84, 0xc9, 0x1c, 0xe8, 0x4f, 0xdb, 0x74, 0x47, 0xd4, 0x06, 0xa3, + 0x5e, 0x49, 0x60, 0x66, 0xd1, 0xc0, 0xae, 0xc1, 0x8c, 0xfa, 0x3e, 0xe9, 0xc2, 0x64, 0x53, 0x71, + 0x1d, 0x33, 0xae, 0x4f, 0x0c, 0x54, 0x9d, 0x62, 0xc6, 0x36, 0x09, 0xd6, 0x8f, 0xd5, 0x9d, 0x68, + 0x03, 0x9c, 0xd5, 0x9c, 0x6e, 0x6c, 0xdf, 0x3b, 0xa6, 0x07, 0x26, 0xf8, 0xb7, 0xd5, 0xb5, 0xaf, + 0x9f, 0x50, 0xb2, 0xb7, 0x6e, 0x5f, 0x1f, 0x02, 0xb4, 0x13, 0x6a, 0xcc, 0xab, 0xc3, 0x90, 0x97, + 0x1d, 0xeb, 0x0a, 0x25, 0xbb, 0x6b, 0xac, 0x6b, 0xa7, 0xd6, 0x1b, 0x76, 0x27, 0x0e, 0x5c, 0x70, + 0x7c, 0xe8, 0x82, 0xcb, 0x3f, 0x54, 0xb7, 0x25, 0xc6, 0x98, 0x49, 0xce, 0x12, 0xbd, 0xf6, 0x53, + 0x98, 0x56, 0x02, 0xcb, 0x8e, 0xe0, 0x5a, 0x4c, 0x5e, 0x01, 0xc1, 0x2c, 0x2a, 0x1f, 0x0a, 0x25, + 0xa6, 0x82, 0x09, 0xf1, 0x6b, 0x5e, 0xc6, 0xe6, 0xad, 0x9c, 0xb4, 0x27, 0x6b, 0x73, 0x40, 0x9e, + 0xc2, 0x58, 0x62, 0xc6, 0x32, 0xa9, 0x9b, 0x74, 0xa8, 0x41, 0xe4, 0x19, 0x4c, 0xb9, 0x08, 0x45, + 0x55, 0x60, 0xd9, 0xfc, 0x49, 0x71, 0x71, 0xad, 0x20, 0xf9, 0x0c, 0x8e, 0xc5, 0x96, 0x5d, 0x7c, + 0xf9, 0xd5, 0x2e, 0xfd, 0x48, 0xc7, 0x2e, 0x6a, 0xba, 0xc9, 0xfd, 0xfd, 0xeb, 0x5f, 0x3e, 0xbf, + 0xe3, 0x72, 0x5b, 0x6d, 0x94, 0x33, 0x9c, 0xd7, 0x0b, 0x78, 0xc5, 0x73, 0xf3, 0xeb, 0x9c, 0x67, + 0x52, 0xd5, 0x9c, 0x9c, 0xeb, 0x9d, 0x9c, 0x2b, 0x7f, 0x2e, 0x36, 0x9b, 0xb1, 0x46, 0xaf, 0xff, + 0x0d, 0x00, 0x00, 0xff, 0xff, 0xc5, 0x28, 0x7e, 0xad, 0xf2, 0x07, 0x00, 0x00, } diff --git a/internal/rootcoord/meta_table.go b/internal/rootcoord/meta_table.go index a7fec24a58..cde41a8cd9 100644 --- a/internal/rootcoord/meta_table.go +++ b/internal/rootcoord/meta_table.go @@ -111,7 +111,7 @@ func (mt *MetaTable) reloadFromCatalog() error { mt.segID2IndexID = make(map[typeutil.UniqueID]typeutil.UniqueID) mt.indexID2Meta = make(map[typeutil.UniqueID]*model.Index) - collAliases, err := mt.catalog.ListAliases(mt.ctx) + collAliases, err := mt.catalog.ListAliases(mt.ctx, 0) if err != nil { return err } @@ -389,14 +389,15 @@ func (mt *MetaTable) AddPartition(collID typeutil.UniqueID, partitionName string // no necessary to check created timestamp } - coll.Partitions = append(coll.Partitions, - &model.Partition{ - PartitionID: partitionID, - PartitionName: partitionName, - PartitionCreatedTimestamp: ts, - }) + partition := &model.Partition{ + PartitionID: partitionID, + PartitionName: partitionName, + PartitionCreatedTimestamp: ts, + CollectionID: collID, + } + coll.Partitions = append(coll.Partitions, partition) - if err := mt.catalog.CreatePartition(mt.ctx, &coll, ts); err != nil { + if err := mt.catalog.CreatePartition(mt.ctx, partition, ts); err != nil { return err } @@ -507,7 +508,7 @@ func (mt *MetaTable) DeletePartition(collID typeutil.UniqueID, partitionName str } col.Partitions = parts - if err := mt.catalog.DropPartition(mt.ctx, &col, partID, ts); err != nil { + if err := mt.catalog.DropPartition(mt.ctx, col.CollectionID, partID, ts); err != nil { return 0, err } @@ -1230,11 +1231,12 @@ func (mt *MetaTable) AddAlias(collectionAlias string, collectionName string, ts return fmt.Errorf("aliased collection name does not exist, name = %s", collectionName) } - coll := &model.Collection{ + alias := &model.Alias{ CollectionID: id, - Aliases: []string{collectionAlias}, + Name: collectionAlias, + CreatedTime: ts, } - if err := mt.catalog.CreateAlias(mt.ctx, coll, ts); err != nil { + if err := mt.catalog.CreateAlias(mt.ctx, alias, ts); err != nil { return err } @@ -1246,12 +1248,13 @@ func (mt *MetaTable) AddAlias(collectionAlias string, collectionName string, ts func (mt *MetaTable) DropAlias(collectionAlias string, ts typeutil.Timestamp) error { mt.ddLock.Lock() defer mt.ddLock.Unlock() - collectionID, ok := mt.collAlias2ID[collectionAlias] + // TODO: drop alias should be idempotent. + _, ok := mt.collAlias2ID[collectionAlias] if !ok { return fmt.Errorf("alias does not exist, alias = %s", collectionAlias) } - if err := mt.catalog.DropAlias(mt.ctx, collectionID, collectionAlias, ts); err != nil { + if err := mt.catalog.DropAlias(mt.ctx, collectionAlias, ts); err != nil { return err } delete(mt.collAlias2ID, collectionAlias) @@ -1271,12 +1274,12 @@ func (mt *MetaTable) AlterAlias(collectionAlias string, collectionName string, t return fmt.Errorf("aliased collection name does not exist, name = %s", collectionName) } - coll := &model.Collection{ + alias := &model.Alias{ CollectionID: id, - Aliases: []string{collectionAlias}, + Name: collectionAlias, + CreatedTime: ts, } - - if err := mt.catalog.AlterAlias(mt.ctx, coll, ts); err != nil { + if err := mt.catalog.AlterAlias(mt.ctx, alias, ts); err != nil { return err } mt.collAlias2ID[collectionAlias] = id diff --git a/internal/rootcoord/meta_table_test.go b/internal/rootcoord/meta_table_test.go index d37a2bbd26..2230c82b16 100644 --- a/internal/rootcoord/meta_table_test.go +++ b/internal/rootcoord/meta_table_test.go @@ -632,7 +632,12 @@ func TestMetaTable(t *testing.T) { mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp) error { return fmt.Errorf("multi save error") } + tmpSaveFunc := mockKV.save + mockKV.save = func(key, value string, ts typeutil.Timestamp) error { + return errors.New("mock") + } assert.Error(t, mt.AddPartition(coll.CollectionID, "no-part", 22, ts, "")) + mockKV.save = tmpSaveFunc //err = mt.AddPartition(coll.CollectionID, "no-part", 22, ts, nil) //assert.NotNil(t, err) //assert.EqualError(t, err, "multi save error") @@ -2240,9 +2245,9 @@ func (mc *MockedCatalog) ListIndexes(ctx context.Context) ([]*model.Index, error return args.Get(0).([]*model.Index), nil } -func (mc *MockedCatalog) ListAliases(ctx context.Context) ([]*model.Collection, error) { +func (mc *MockedCatalog) ListAliases(ctx context.Context, ts typeutil.Timestamp) ([]*model.Alias, error) { args := mc.Called() - return args.Get(0).([]*model.Collection), nil + return args.Get(0).([]*model.Alias), nil } func (mc *MockedCatalog) AlterIndex(ctx context.Context, oldIndex *model.Index, newIndex *model.Index, alterType metastore.AlterType) error { @@ -2356,7 +2361,16 @@ func TestMetaTable_ReloadFromKV(t *testing.T) { alias2 := *collInfo alias2.Name = collInfo.Aliases[1] - mc.On("ListAliases").Return([]*model.Collection{&alias1, &alias2}, nil) + mc.On("ListAliases").Return([]*model.Alias{ + { + CollectionID: collInfo.CollectionID, + Name: collInfo.Aliases[0], + }, + { + CollectionID: collInfo.CollectionID, + Name: collInfo.Aliases[1], + }, + }, nil) mt := &MetaTable{} mt.catalog = mc