mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-11-30 02:48:45 +08:00
Support rename db name of the collection (#26543)
Signed-off-by: jaime <yun.zhang@zilliz.com>
This commit is contained in:
parent
c603f1c244
commit
9f61dba3f9
@ -452,12 +452,18 @@ func (kc *Catalog) alterModifyCollection(oldColl *model.Collection, newColl *mod
|
||||
oldCollClone.CreateTime = newColl.CreateTime
|
||||
oldCollClone.ConsistencyLevel = newColl.ConsistencyLevel
|
||||
oldCollClone.State = newColl.State
|
||||
key := BuildCollectionKey(newColl.DBID, oldColl.CollectionID)
|
||||
|
||||
oldKey := BuildCollectionKey(oldColl.DBID, oldColl.CollectionID)
|
||||
newKey := BuildCollectionKey(newColl.DBID, oldColl.CollectionID)
|
||||
value, err := proto.Marshal(model.MarshalCollectionModel(oldCollClone))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return kc.Snapshot.Save(key, string(value), ts)
|
||||
saves := map[string]string{newKey: string(value)}
|
||||
if oldKey == newKey {
|
||||
return kc.Snapshot.Save(newKey, string(value), ts)
|
||||
}
|
||||
return kc.Snapshot.MultiSaveAndRemoveWithPrefix(saves, []string{oldKey}, ts)
|
||||
}
|
||||
|
||||
func (kc *Catalog) AlterCollection(ctx context.Context, oldColl *model.Collection, newColl *model.Collection, alterType metastore.AlterType, ts typeutil.Timestamp) error {
|
||||
|
@ -28,6 +28,7 @@ import (
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/atomic"
|
||||
"golang.org/x/exp/maps"
|
||||
)
|
||||
|
||||
var (
|
||||
@ -1007,6 +1008,24 @@ func TestCatalog_AlterCollection(t *testing.T) {
|
||||
err := kc.AlterCollection(ctx, oldC, newC, metastore.MODIFY, 0)
|
||||
assert.Error(t, err)
|
||||
})
|
||||
|
||||
t.Run("modify db name", func(t *testing.T) {
|
||||
var collectionID int64 = 1
|
||||
snapshot := kv.NewMockSnapshotKV()
|
||||
snapshot.MultiSaveAndRemoveWithPrefixFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
|
||||
assert.ElementsMatch(t, []string{BuildCollectionKey(0, collectionID)}, removals)
|
||||
assert.Equal(t, len(saves), 1)
|
||||
assert.Contains(t, maps.Keys(saves), BuildCollectionKey(1, collectionID))
|
||||
return nil
|
||||
}
|
||||
|
||||
kc := &Catalog{Snapshot: snapshot}
|
||||
ctx := context.Background()
|
||||
oldC := &model.Collection{DBID: 0, CollectionID: collectionID, State: pb.CollectionState_CollectionCreated}
|
||||
newC := &model.Collection{DBID: 1, CollectionID: collectionID, State: pb.CollectionState_CollectionCreated}
|
||||
err := kc.AlterCollection(ctx, oldC, newC, metastore.MODIFY, 0)
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
}
|
||||
|
||||
func TestCatalog_AlterPartition(t *testing.T) {
|
||||
|
@ -42,9 +42,12 @@ func (t *listDatabaseTask) Execute(ctx context.Context) error {
|
||||
}
|
||||
|
||||
dbNames := make([]string, 0, len(ret))
|
||||
createdTimes := make([]uint64, 0, len(ret))
|
||||
for _, db := range ret {
|
||||
dbNames = append(dbNames, db.Name)
|
||||
createdTimes = append(createdTimes, db.CreatedTime)
|
||||
}
|
||||
t.Resp.DbNames = dbNames
|
||||
t.Resp.CreatedTimestamp = createdTimes
|
||||
return nil
|
||||
}
|
||||
|
@ -65,7 +65,7 @@ type IMetaTable interface {
|
||||
DropAlias(ctx context.Context, dbName string, alias string, ts Timestamp) error
|
||||
AlterAlias(ctx context.Context, dbName string, alias string, collectionName string, ts Timestamp) error
|
||||
AlterCollection(ctx context.Context, oldColl *model.Collection, newColl *model.Collection, ts Timestamp) error
|
||||
RenameCollection(ctx context.Context, dbName string, oldName string, newName string, ts Timestamp) error
|
||||
RenameCollection(ctx context.Context, dbName string, oldName string, newDBName string, newName string, ts Timestamp) error
|
||||
|
||||
// TODO: it'll be a big cost if we handle the time travel logic, since we should always list all aliases in catalog.
|
||||
IsAlias(db, name string) bool
|
||||
@ -700,14 +700,15 @@ func (mt *MetaTable) AlterCollection(ctx context.Context, oldColl *model.Collect
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mt *MetaTable) RenameCollection(ctx context.Context, dbName string, oldName string, newName string, ts Timestamp) error {
|
||||
func (mt *MetaTable) RenameCollection(ctx context.Context, dbName string, oldName string, newDBName string, newName string, ts Timestamp) error {
|
||||
mt.ddLock.Lock()
|
||||
defer mt.ddLock.Unlock()
|
||||
|
||||
ctx = contextutil.WithTenantID(ctx, Params.CommonCfg.ClusterName.GetValue())
|
||||
log := log.Ctx(ctx).With(
|
||||
zap.String("db", dbName),
|
||||
zap.String("oldDBName", dbName),
|
||||
zap.String("oldName", oldName),
|
||||
zap.String("newDBName", newDBName),
|
||||
zap.String("newName", newName),
|
||||
)
|
||||
|
||||
@ -717,17 +718,29 @@ func (mt *MetaTable) RenameCollection(ctx context.Context, dbName string, oldNam
|
||||
dbName = util.DefaultDBName
|
||||
}
|
||||
|
||||
if newDBName == "" {
|
||||
log.Warn("target db name is empty")
|
||||
newDBName = dbName
|
||||
}
|
||||
|
||||
// check target db
|
||||
targetDB, ok := mt.dbName2Meta[newDBName]
|
||||
if !ok {
|
||||
return fmt.Errorf("target database:%s not found", newDBName)
|
||||
}
|
||||
|
||||
//old collection should not be an alias
|
||||
_, ok := mt.aliases.get(dbName, oldName)
|
||||
_, ok = mt.aliases.get(dbName, oldName)
|
||||
if ok {
|
||||
log.Warn("unsupported use a alias to rename collection")
|
||||
return fmt.Errorf("unsupported use an alias to rename collection, alias:%s", oldName)
|
||||
}
|
||||
|
||||
// check new collection already exists
|
||||
newColl, err := mt.getCollectionByNameInternal(ctx, dbName, newName, ts)
|
||||
newColl, err := mt.getCollectionByNameInternal(ctx, newDBName, newName, ts)
|
||||
if newColl != nil {
|
||||
return fmt.Errorf("duplicated new collection name :%s with other collection name or alias", newName)
|
||||
log.Warn("check new collection fail")
|
||||
return fmt.Errorf("duplicated new collection name %s:%s with other collection name or alias", newDBName, newName)
|
||||
}
|
||||
if err != nil && !common.IsCollectionNotExistErrorV2(err) {
|
||||
log.Warn("check new collection name fail")
|
||||
@ -737,16 +750,24 @@ func (mt *MetaTable) RenameCollection(ctx context.Context, dbName string, oldNam
|
||||
// get old collection meta
|
||||
oldColl, err := mt.getCollectionByNameInternal(ctx, dbName, oldName, ts)
|
||||
if err != nil {
|
||||
log.Warn("get old collection fail")
|
||||
return err
|
||||
}
|
||||
|
||||
// unsupported rename collection while the collection has aliases
|
||||
aliases := mt.listAliasesByID(oldColl.CollectionID)
|
||||
if len(aliases) > 0 && oldColl.DBID != targetDB.ID {
|
||||
return fmt.Errorf("fail to rename db name, must drop all aliases of this collection before rename")
|
||||
}
|
||||
|
||||
newColl = oldColl.Clone()
|
||||
newColl.Name = newName
|
||||
newColl.DBID = targetDB.ID
|
||||
if err := mt.catalog.AlterCollection(ctx, oldColl, newColl, metastore.MODIFY, ts); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
mt.names.insert(dbName, newName, oldColl.CollectionID)
|
||||
mt.names.insert(newDBName, newName, oldColl.CollectionID)
|
||||
mt.names.remove(dbName, oldName)
|
||||
|
||||
mt.collID2Meta[oldColl.CollectionID] = newColl
|
||||
|
@ -1170,7 +1170,16 @@ func TestMetaTable_RenameCollection(t *testing.T) {
|
||||
aliases: newNameDb(),
|
||||
}
|
||||
meta.names.insert("", "alias", 1)
|
||||
err := meta.RenameCollection(context.TODO(), "", "alias", "new", typeutil.MaxTimestamp)
|
||||
err := meta.RenameCollection(context.TODO(), "", "alias", "", "new", typeutil.MaxTimestamp)
|
||||
assert.Error(t, err)
|
||||
})
|
||||
|
||||
t.Run("target db doesn't exist", func(t *testing.T) {
|
||||
meta := &MetaTable{
|
||||
names: newNameDb(),
|
||||
aliases: newNameDb(),
|
||||
}
|
||||
err := meta.RenameCollection(context.TODO(), "", "non-exists", "non-exists", "new", typeutil.MaxTimestamp)
|
||||
assert.Error(t, err)
|
||||
})
|
||||
|
||||
@ -1179,7 +1188,7 @@ func TestMetaTable_RenameCollection(t *testing.T) {
|
||||
names: newNameDb(),
|
||||
aliases: newNameDb(),
|
||||
}
|
||||
err := meta.RenameCollection(context.TODO(), "", "non-exists", "new", typeutil.MaxTimestamp)
|
||||
err := meta.RenameCollection(context.TODO(), "", "non-exists", "", "new", typeutil.MaxTimestamp)
|
||||
assert.Error(t, err)
|
||||
})
|
||||
|
||||
@ -1189,16 +1198,19 @@ func TestMetaTable_RenameCollection(t *testing.T) {
|
||||
aliases: newNameDb(),
|
||||
}
|
||||
meta.names.insert("", "old", 1)
|
||||
err := meta.RenameCollection(context.TODO(), "", "old", "new", typeutil.MaxTimestamp)
|
||||
err := meta.RenameCollection(context.TODO(), "", "old", "", "new", typeutil.MaxTimestamp)
|
||||
assert.Error(t, err)
|
||||
})
|
||||
|
||||
t.Run("new collection name already exist-1", func(t *testing.T) {
|
||||
meta := &MetaTable{
|
||||
dbName2Meta: map[string]*model.Database{
|
||||
util.DefaultDBName: model.NewDefaultDatabase(),
|
||||
},
|
||||
names: newNameDb(),
|
||||
aliases: newNameDb(),
|
||||
collID2Meta: map[typeutil.UniqueID]*model.Collection{
|
||||
2: {
|
||||
util.DefaultDBID: {
|
||||
CollectionID: 1,
|
||||
Name: "old",
|
||||
State: pb.CollectionState_CollectionCreated,
|
||||
@ -1206,8 +1218,7 @@ func TestMetaTable_RenameCollection(t *testing.T) {
|
||||
},
|
||||
}
|
||||
meta.names.insert(util.DefaultDBName, "old", 1)
|
||||
meta.names.insert(util.DefaultDBName, "new", 2)
|
||||
err := meta.RenameCollection(context.TODO(), "", "old", "new", 1000)
|
||||
err := meta.RenameCollection(context.TODO(), util.DefaultDBName, "old", util.DefaultDBName, "old", 1000)
|
||||
assert.Error(t, err)
|
||||
})
|
||||
|
||||
@ -1229,7 +1240,7 @@ func TestMetaTable_RenameCollection(t *testing.T) {
|
||||
}
|
||||
meta.names.insert(util.DefaultDBName, "old", 1)
|
||||
meta.names.insert(util.DefaultDBName, "new", 2)
|
||||
err := meta.RenameCollection(context.TODO(), util.DefaultDBName, "old", "new", 1000)
|
||||
err := meta.RenameCollection(context.TODO(), util.DefaultDBName, "old", "", "new", 1000)
|
||||
assert.Error(t, err)
|
||||
})
|
||||
|
||||
@ -1264,7 +1275,37 @@ func TestMetaTable_RenameCollection(t *testing.T) {
|
||||
},
|
||||
}
|
||||
meta.names.insert(util.DefaultDBName, "old", 1)
|
||||
err := meta.RenameCollection(context.TODO(), util.DefaultDBName, "old", "new", 1000)
|
||||
err := meta.RenameCollection(context.TODO(), util.DefaultDBName, "old", "", "new", 1000)
|
||||
assert.Error(t, err)
|
||||
})
|
||||
|
||||
t.Run("rename db name fails if aliases exists", func(t *testing.T) {
|
||||
catalog := mocks.NewRootCoordCatalog(t)
|
||||
catalog.On("GetCollectionByName",
|
||||
mock.Anything,
|
||||
mock.Anything,
|
||||
mock.Anything,
|
||||
mock.Anything,
|
||||
).Return(nil, common.NewCollectionNotExistError("error"))
|
||||
meta := &MetaTable{
|
||||
dbName2Meta: map[string]*model.Database{
|
||||
util.DefaultDBName: model.NewDefaultDatabase(),
|
||||
"db1": model.NewDatabase(2, "db1", pb.DatabaseState_DatabaseCreated),
|
||||
},
|
||||
catalog: catalog,
|
||||
names: newNameDb(),
|
||||
aliases: newNameDb(),
|
||||
collID2Meta: map[typeutil.UniqueID]*model.Collection{
|
||||
1: {
|
||||
CollectionID: 1,
|
||||
Name: "old",
|
||||
},
|
||||
},
|
||||
}
|
||||
meta.names.insert(util.DefaultDBName, "old", 1)
|
||||
meta.aliases.insert(util.DefaultDBName, "alias", 1)
|
||||
|
||||
err := meta.RenameCollection(context.TODO(), util.DefaultDBName, "old", "db1", "new", 1000)
|
||||
assert.Error(t, err)
|
||||
})
|
||||
|
||||
@ -1298,7 +1339,7 @@ func TestMetaTable_RenameCollection(t *testing.T) {
|
||||
},
|
||||
}
|
||||
meta.names.insert(util.DefaultDBName, "old", 1)
|
||||
err := meta.RenameCollection(context.TODO(), util.DefaultDBName, "old", "new", 1000)
|
||||
err := meta.RenameCollection(context.TODO(), util.DefaultDBName, "old", "", "new", 1000)
|
||||
assert.NoError(t, err)
|
||||
|
||||
id, ok := meta.names.get(util.DefaultDBName, "new")
|
||||
|
@ -155,7 +155,7 @@ func (m mockMetaTable) AlterCollection(ctx context.Context, oldColl *model.Colle
|
||||
return m.AlterCollectionFunc(ctx, oldColl, newColl, ts)
|
||||
}
|
||||
|
||||
func (m *mockMetaTable) RenameCollection(ctx context.Context, dbName string, oldName string, newName string, ts Timestamp) error {
|
||||
func (m *mockMetaTable) RenameCollection(ctx context.Context, dbName string, oldName string, newDBName string, newName string, ts Timestamp) error {
|
||||
return m.RenameCollectionFunc(ctx, oldName, newName, ts)
|
||||
}
|
||||
|
||||
|
@ -1569,12 +1569,12 @@ func (_c *IMetaTable_RemovePartition_Call) Return(_a0 error) *IMetaTable_RemoveP
|
||||
}
|
||||
|
||||
// RenameCollection provides a mock function with given fields: ctx, dbName, oldName, newName, ts
|
||||
func (_m *IMetaTable) RenameCollection(ctx context.Context, dbName string, oldName string, newName string, ts uint64) error {
|
||||
ret := _m.Called(ctx, dbName, oldName, newName, ts)
|
||||
func (_m *IMetaTable) RenameCollection(ctx context.Context, dbName string, oldName string, newDBName string, newName string, ts uint64) error {
|
||||
ret := _m.Called(ctx, dbName, oldName, newDBName, newName, ts)
|
||||
|
||||
var r0 error
|
||||
if rf, ok := ret.Get(0).(func(context.Context, string, string, string, uint64) error); ok {
|
||||
r0 = rf(ctx, dbName, oldName, newName, ts)
|
||||
if rf, ok := ret.Get(0).(func(context.Context, string, string, string, string, uint64) error); ok {
|
||||
r0 = rf(ctx, dbName, oldName, newDBName, newName, ts)
|
||||
} else {
|
||||
r0 = ret.Error(0)
|
||||
}
|
||||
|
@ -39,5 +39,5 @@ func (t *renameCollectionTask) Execute(ctx context.Context) error {
|
||||
if err := t.core.ExpireMetaCache(ctx, t.Req.GetDbName(), []string{t.Req.GetOldName()}, InvalidCollectionID, t.GetTs()); err != nil {
|
||||
return err
|
||||
}
|
||||
return t.core.meta.RenameCollection(ctx, t.Req.GetDbName(), t.Req.GetOldName(), t.Req.GetNewName(), t.GetTs())
|
||||
return t.core.meta.RenameCollection(ctx, t.Req.GetDbName(), t.Req.GetOldName(), t.Req.GetNewDBName(), t.Req.GetNewName(), t.GetTs())
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user