mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-11-29 18:38:44 +08:00
remove MultiRemoveWithPrefix (#26924)
Signed-off-by: yiwangdr <yiwangdr@gmail.com>
This commit is contained in:
parent
45d9fb5929
commit
f85af0732c
@ -475,19 +475,6 @@ func (kv *EmbedEtcdKV) WatchWithRevision(key string, revision int64) clientv3.Wa
|
||||
return rch
|
||||
}
|
||||
|
||||
func (kv *EmbedEtcdKV) MultiRemoveWithPrefix(keys []string) error {
|
||||
ops := make([]clientv3.Op, 0, len(keys))
|
||||
for _, k := range keys {
|
||||
op := clientv3.OpDelete(path.Join(kv.rootPath, k), clientv3.WithPrefix())
|
||||
ops = append(ops, op)
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
|
||||
defer cancel()
|
||||
|
||||
_, err := kv.client.Txn(ctx).If().Then(ops...).Commit()
|
||||
return err
|
||||
}
|
||||
|
||||
// MultiSaveAndRemoveWithPrefix saves kv in @saves and removes the keys with given prefix in @removals.
|
||||
func (kv *EmbedEtcdKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) error {
|
||||
ops := make([]clientv3.Op, 0, len(saves)+len(removals))
|
||||
|
@ -522,7 +522,7 @@ func TestEmbedEtcd(te *testing.T) {
|
||||
assert.Empty(t, vs)
|
||||
})
|
||||
|
||||
te.Run("etcdKV MultiRemoveWithPrefix", func(t *testing.T) {
|
||||
te.Run("etcdKV MultiSaveAndRemoveWithPrefix", func(t *testing.T) {
|
||||
rootPath := "/etcd/test/root/multi_remove_with_prefix"
|
||||
metaKv, err := embed_etcd_kv.NewMetaKvFactory(rootPath, ¶m.EtcdCfg)
|
||||
require.NoError(t, err)
|
||||
@ -539,45 +539,6 @@ func TestEmbedEtcd(te *testing.T) {
|
||||
"x/den/2": "200",
|
||||
}
|
||||
|
||||
err = metaKv.MultiSave(prepareTests)
|
||||
require.NoError(t, err)
|
||||
|
||||
multiRemoveWithPrefixTests := []struct {
|
||||
prefix []string
|
||||
|
||||
testKey string
|
||||
expectedValue string
|
||||
}{
|
||||
{[]string{"x/abc"}, "x/abc/1", ""},
|
||||
{[]string{}, "x/abc/2", ""},
|
||||
{[]string{}, "x/def/1", "10"},
|
||||
{[]string{}, "x/def/2", "20"},
|
||||
{[]string{}, "x/den/1", "100"},
|
||||
{[]string{}, "x/den/2", "200"},
|
||||
{[]string{}, "not-exist", ""},
|
||||
{[]string{"x/def", "x/den"}, "x/def/1", ""},
|
||||
{[]string{}, "x/def/1", ""},
|
||||
{[]string{}, "x/def/2", ""},
|
||||
{[]string{}, "x/den/1", ""},
|
||||
{[]string{}, "x/den/2", ""},
|
||||
{[]string{}, "not-exist", ""},
|
||||
}
|
||||
|
||||
for _, test := range multiRemoveWithPrefixTests {
|
||||
if len(test.prefix) > 0 {
|
||||
err = metaKv.MultiRemoveWithPrefix(test.prefix)
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
v, _ := metaKv.Load(test.testKey)
|
||||
assert.Equal(t, test.expectedValue, v)
|
||||
}
|
||||
|
||||
k, v, err := metaKv.LoadWithPrefix("/")
|
||||
assert.NoError(t, err)
|
||||
assert.Zero(t, len(k))
|
||||
assert.Zero(t, len(v))
|
||||
|
||||
// MultiSaveAndRemoveWithPrefix
|
||||
err = metaKv.MultiSave(prepareTests)
|
||||
require.NoError(t, err)
|
||||
@ -597,7 +558,7 @@ func TestEmbedEtcd(te *testing.T) {
|
||||
}
|
||||
|
||||
for _, test := range multiSaveAndRemoveWithPrefixTests {
|
||||
k, _, err = metaKv.LoadWithPrefix(test.loadPrefix)
|
||||
k, _, err := metaKv.LoadWithPrefix(test.loadPrefix)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, test.lengthBeforeRemove, len(k))
|
||||
|
||||
@ -628,40 +589,6 @@ func TestEmbedEtcd(te *testing.T) {
|
||||
"x/den/2": []byte("200"),
|
||||
}
|
||||
|
||||
err = metaKv.MultiSaveBytes(prepareTests)
|
||||
require.NoError(t, err)
|
||||
|
||||
multiRemoveWithPrefixTests := []struct {
|
||||
prefix []string
|
||||
|
||||
testKey string
|
||||
expectedValue []byte
|
||||
}{
|
||||
{[]string{"x/abc"}, "x/abc/1", nil},
|
||||
{[]string{}, "x/abc/2", nil},
|
||||
{[]string{}, "x/def/1", []byte("10")},
|
||||
{[]string{}, "x/def/2", []byte("20")},
|
||||
{[]string{}, "x/den/1", []byte("100")},
|
||||
{[]string{}, "x/den/2", []byte("200")},
|
||||
{[]string{}, "not-exist", nil},
|
||||
{[]string{"x/def", "x/den"}, "x/def/1", nil},
|
||||
{[]string{}, "x/def/1", nil},
|
||||
{[]string{}, "x/def/2", nil},
|
||||
{[]string{}, "x/den/1", nil},
|
||||
{[]string{}, "x/den/2", nil},
|
||||
{[]string{}, "not-exist", nil},
|
||||
}
|
||||
|
||||
for _, test := range multiRemoveWithPrefixTests {
|
||||
if len(test.prefix) > 0 {
|
||||
err = metaKv.MultiRemoveWithPrefix(test.prefix)
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
v, _ := metaKv.LoadBytes(test.testKey)
|
||||
assert.Equal(t, test.expectedValue, v)
|
||||
}
|
||||
|
||||
k, v, err := metaKv.LoadBytesWithPrefix("/")
|
||||
assert.NoError(t, err)
|
||||
assert.Zero(t, len(k))
|
||||
|
@ -529,25 +529,6 @@ func (kv *etcdKV) WatchWithRevision(key string, revision int64) clientv3.WatchCh
|
||||
return rch
|
||||
}
|
||||
|
||||
// MultiRemoveWithPrefix removes the keys with given prefix.
|
||||
func (kv *etcdKV) MultiRemoveWithPrefix(keys []string) error {
|
||||
start := time.Now()
|
||||
ops := make([]clientv3.Op, 0, len(keys))
|
||||
for _, k := range keys {
|
||||
op := clientv3.OpDelete(path.Join(kv.rootPath, k), clientv3.WithPrefix())
|
||||
ops = append(ops, op)
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
|
||||
defer cancel()
|
||||
|
||||
_, err := kv.executeTxn(kv.getTxnWithCmp(ctx), ops...)
|
||||
if err != nil {
|
||||
log.Warn("Etcd MultiRemoveWithPrefix error", zap.Strings("keys", keys), zap.Int("len", len(keys)), zap.Error(err))
|
||||
}
|
||||
CheckElapseAndWarn(start, "Slow etcd operation multi remove with prefix", zap.Strings("keys", keys))
|
||||
return err
|
||||
}
|
||||
|
||||
// MultiSaveAndRemoveWithPrefix saves kv in @saves and removes the keys with given prefix in @removals.
|
||||
func (kv *etcdKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) error {
|
||||
start := time.Now()
|
||||
|
@ -551,8 +551,8 @@ func TestEtcdKV_Load(te *testing.T) {
|
||||
assert.Empty(t, vs)
|
||||
})
|
||||
|
||||
te.Run("etcdKV MultiRemoveWithPrefix", func(t *testing.T) {
|
||||
rootPath := "/etcd/test/root/multi_remove_with_prefix"
|
||||
te.Run("etcdKV MultiSaveAndRemoveWithPrefix", func(t *testing.T) {
|
||||
rootPath := "/etcd/test/root/multi_save_and_remove_with_prefix"
|
||||
etcdKV := etcdkv.NewEtcdKV(etcdCli, rootPath)
|
||||
defer etcdKV.Close()
|
||||
defer etcdKV.RemoveWithPrefix("")
|
||||
@ -566,45 +566,6 @@ func TestEtcdKV_Load(te *testing.T) {
|
||||
"x/den/2": "200",
|
||||
}
|
||||
|
||||
err = etcdKV.MultiSave(prepareTests)
|
||||
require.NoError(t, err)
|
||||
|
||||
multiRemoveWithPrefixTests := []struct {
|
||||
prefix []string
|
||||
|
||||
testKey string
|
||||
expectedValue string
|
||||
}{
|
||||
{[]string{"x/abc"}, "x/abc/1", ""},
|
||||
{[]string{}, "x/abc/2", ""},
|
||||
{[]string{}, "x/def/1", "10"},
|
||||
{[]string{}, "x/def/2", "20"},
|
||||
{[]string{}, "x/den/1", "100"},
|
||||
{[]string{}, "x/den/2", "200"},
|
||||
{[]string{}, "not-exist", ""},
|
||||
{[]string{"x/def", "x/den"}, "x/def/1", ""},
|
||||
{[]string{}, "x/def/1", ""},
|
||||
{[]string{}, "x/def/2", ""},
|
||||
{[]string{}, "x/den/1", ""},
|
||||
{[]string{}, "x/den/2", ""},
|
||||
{[]string{}, "not-exist", ""},
|
||||
}
|
||||
|
||||
for _, test := range multiRemoveWithPrefixTests {
|
||||
if len(test.prefix) > 0 {
|
||||
err = etcdKV.MultiRemoveWithPrefix(test.prefix)
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
v, _ := etcdKV.Load(test.testKey)
|
||||
assert.Equal(t, test.expectedValue, v)
|
||||
}
|
||||
|
||||
k, v, err := etcdKV.LoadWithPrefix("/")
|
||||
assert.NoError(t, err)
|
||||
assert.Zero(t, len(k))
|
||||
assert.Zero(t, len(v))
|
||||
|
||||
// MultiSaveAndRemoveWithPrefix
|
||||
err = etcdKV.MultiSave(prepareTests)
|
||||
require.NoError(t, err)
|
||||
@ -624,7 +585,7 @@ func TestEtcdKV_Load(te *testing.T) {
|
||||
}
|
||||
|
||||
for _, test := range multiSaveAndRemoveWithPrefixTests {
|
||||
k, _, err = etcdKV.LoadWithPrefix(test.loadPrefix)
|
||||
k, _, err := etcdKV.LoadWithPrefix(test.loadPrefix)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, test.lengthBeforeRemove, len(k))
|
||||
|
||||
|
@ -58,7 +58,6 @@ type BaseKV interface {
|
||||
type TxnKV interface {
|
||||
BaseKV
|
||||
MultiSaveAndRemove(saves map[string]string, removals []string) error
|
||||
MultiRemoveWithPrefix(keys []string) error
|
||||
MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) error
|
||||
}
|
||||
|
||||
|
@ -20,7 +20,6 @@ import (
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/cockroachdb/errors"
|
||||
"github.com/google/btree"
|
||||
"github.com/milvus-io/milvus/pkg/common"
|
||||
)
|
||||
@ -282,11 +281,6 @@ func (kv *MemoryKV) LoadBytesWithPrefix(key string) ([]string, [][]byte, error)
|
||||
func (kv *MemoryKV) Close() {
|
||||
}
|
||||
|
||||
// MultiRemoveWithPrefix not implemented
|
||||
func (kv *MemoryKV) MultiRemoveWithPrefix(keys []string) error {
|
||||
return errors.New("not implement")
|
||||
}
|
||||
|
||||
// MultiSaveAndRemoveWithPrefix saves key-value pairs in @saves, & remove key with prefix in @removals in MemoryKV atomically.
|
||||
func (kv *MemoryKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) error {
|
||||
kv.Lock()
|
||||
|
@ -420,18 +420,6 @@ func (kv *RocksdbKV) DeleteRange(startKey, endKey string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// MultiRemoveWithPrefix is used to remove a batch of key-values with the same prefix
|
||||
func (kv *RocksdbKV) MultiRemoveWithPrefix(prefixes []string) error {
|
||||
if kv.DB == nil {
|
||||
return errors.New("rocksdb instance is nil when do RemoveWithPrefix")
|
||||
}
|
||||
writeBatch := gorocksdb.NewWriteBatch()
|
||||
defer writeBatch.Destroy()
|
||||
kv.prepareRemovePrefix(prefixes, writeBatch)
|
||||
err := kv.DB.Write(kv.WriteOptions, writeBatch)
|
||||
return err
|
||||
}
|
||||
|
||||
// MultiSaveAndRemoveWithPrefix is used to execute a batch operators with the same prefix
|
||||
func (kv *RocksdbKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) error {
|
||||
if kv.DB == nil {
|
||||
|
@ -215,30 +215,6 @@ func TestRocksdbKV_Txn(t *testing.T) {
|
||||
assert.Equal(t, len(keys), 3)
|
||||
assert.Equal(t, len(vals), 3)
|
||||
|
||||
removePrefix := []string{"abc", "abd"}
|
||||
rocksdbKV.MultiRemoveWithPrefix(removePrefix)
|
||||
|
||||
keys, vals, err = rocksdbKV.LoadWithPrefix("")
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, len(keys), 0)
|
||||
assert.Equal(t, len(vals), 0)
|
||||
|
||||
err = rocksdbKV.MultiSave(kvs)
|
||||
assert.NoError(t, err)
|
||||
keys, vals, err = rocksdbKV.LoadWithPrefix("")
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, len(keys), 3)
|
||||
assert.Equal(t, len(vals), 3)
|
||||
|
||||
// test delete the whole table
|
||||
removePrefix = []string{"", "hello"}
|
||||
rocksdbKV.MultiRemoveWithPrefix(removePrefix)
|
||||
|
||||
keys, vals, err = rocksdbKV.LoadWithPrefix("")
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, len(keys), 0)
|
||||
assert.Equal(t, len(vals), 0)
|
||||
|
||||
err = rocksdbKV.MultiSave(kvs)
|
||||
assert.NoError(t, err)
|
||||
keys, vals, err = rocksdbKV.LoadWithPrefix("")
|
||||
@ -247,7 +223,7 @@ func TestRocksdbKV_Txn(t *testing.T) {
|
||||
assert.Equal(t, len(vals), 3)
|
||||
|
||||
// test remove and save
|
||||
removePrefix = []string{"abc", "abd"}
|
||||
removePrefix := []string{"abc", "abd"}
|
||||
kvs2 := map[string]string{
|
||||
"abfad": "12345",
|
||||
}
|
||||
|
@ -473,64 +473,6 @@ func (kv *txnTiKV) MultiSaveAndRemove(saves map[string]string, removals []string
|
||||
return nil
|
||||
}
|
||||
|
||||
// MultiRemoveWithPrefix removes the keys with given prefix.
|
||||
func (kv *txnTiKV) MultiRemoveWithPrefix(prefixes []string) error {
|
||||
start := time.Now()
|
||||
ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout)
|
||||
defer cancel()
|
||||
|
||||
var logging_error error
|
||||
defer logWarnOnFailure(&logging_error, "txnTiKV MultiRemoveWithPrefix error", zap.Strings("keys", prefixes), zap.Int("len", len(prefixes)))
|
||||
|
||||
txn, err := beginTxn(kv.txn)
|
||||
if err != nil {
|
||||
logging_error = errors.Wrap(err, "Failed to create txn for MultiRemoveWithPrefix")
|
||||
return logging_error
|
||||
}
|
||||
|
||||
// Defer a rollback only if the transaction hasn't been committed
|
||||
defer rollbackOnFailure(&logging_error, txn)
|
||||
|
||||
// Need in order for err to be propogated to the defered logger
|
||||
// := within forloop will shadow the original variable, resulting in logger
|
||||
// missing it
|
||||
for _, prefix := range prefixes {
|
||||
prefix = path.Join(kv.rootPath, prefix)
|
||||
// Get the start and end keys for the prefix range
|
||||
startKey := []byte(prefix)
|
||||
endKey := tikv.PrefixNextKey([]byte(prefix))
|
||||
// Use Scan to iterate over keys in the prefix range
|
||||
iter, err := txn.Iter(startKey, endKey)
|
||||
if err != nil {
|
||||
logging_error = errors.Wrap(err, "Failed to create iterater for MultiRemoveWithPrefix")
|
||||
return logging_error
|
||||
}
|
||||
// Iterate over keys and delete them
|
||||
for iter.Valid() {
|
||||
key := iter.Key()
|
||||
err = txn.Delete(key)
|
||||
if err != nil {
|
||||
logging_error = errors.Wrap(err, fmt.Sprintf("Failed to delete %s for MultiRemoveWithPrefix", string(key)))
|
||||
return logging_error
|
||||
}
|
||||
// Move the iterator to the next key
|
||||
err = iter.Next()
|
||||
if err != nil {
|
||||
logging_error = errors.Wrap(err, fmt.Sprintf("Failed to move Iterator after key %s for MultiRemoveWithPrefix", string(key)))
|
||||
return logging_error
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
err = kv.executeTxn(txn, ctx)
|
||||
if err != nil {
|
||||
logging_error = errors.Wrap(err, "Failed to commit for MultiRemoveWithPrefix")
|
||||
return logging_error
|
||||
}
|
||||
CheckElapseAndWarn(start, "Slow txnTiKV MultiRemoveWithPrefix() operation", zap.Strings("keys", prefixes))
|
||||
return nil
|
||||
}
|
||||
|
||||
// MultiSaveAndRemoveWithPrefix saves kv in @saves and removes the keys with given prefix in @removals.
|
||||
func (kv *txnTiKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) error {
|
||||
start := time.Now()
|
||||
|
@ -243,7 +243,7 @@ func TestTiKVLoad(te *testing.T) {
|
||||
assert.Empty(t, vs)
|
||||
})
|
||||
|
||||
te.Run("kv MultiRemoveWithPrefix", func(t *testing.T) {
|
||||
te.Run("kv MultiSaveAndRemoveWithPrefix", func(t *testing.T) {
|
||||
rootPath := "/tikv/test/root/multi_remove_with_prefix"
|
||||
kv := NewTiKV(txnClient, rootPath)
|
||||
defer kv.Close()
|
||||
@ -258,47 +258,8 @@ func TestTiKVLoad(te *testing.T) {
|
||||
"x/den/2": "200",
|
||||
}
|
||||
|
||||
err := kv.MultiSave(prepareTests)
|
||||
require.NoError(t, err)
|
||||
|
||||
multiRemoveWithPrefixTests := []struct {
|
||||
prefix []string
|
||||
|
||||
testKey string
|
||||
expectedValue string
|
||||
}{
|
||||
{[]string{"x/abc"}, "x/abc/1", ""},
|
||||
{[]string{}, "x/abc/2", ""},
|
||||
{[]string{}, "x/def/1", "10"},
|
||||
{[]string{}, "x/def/2", "20"},
|
||||
{[]string{}, "x/den/1", "100"},
|
||||
{[]string{}, "x/den/2", "200"},
|
||||
{[]string{}, "not-exist", ""},
|
||||
{[]string{"x/def", "x/den"}, "x/def/1", ""},
|
||||
{[]string{}, "x/def/1", ""},
|
||||
{[]string{}, "x/def/2", ""},
|
||||
{[]string{}, "x/den/1", ""},
|
||||
{[]string{}, "x/den/2", ""},
|
||||
{[]string{}, "not-exist", ""},
|
||||
}
|
||||
|
||||
for _, test := range multiRemoveWithPrefixTests {
|
||||
if len(test.prefix) > 0 {
|
||||
err = kv.MultiRemoveWithPrefix(test.prefix)
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
v, _ := kv.Load(test.testKey)
|
||||
assert.Equal(t, test.expectedValue, v)
|
||||
}
|
||||
|
||||
k, v, err := kv.LoadWithPrefix("/")
|
||||
assert.NoError(t, err)
|
||||
assert.Zero(t, len(k))
|
||||
assert.Zero(t, len(v))
|
||||
|
||||
// MultiSaveAndRemoveWithPrefix
|
||||
err = kv.MultiSave(prepareTests)
|
||||
err := kv.MultiSave(prepareTests)
|
||||
require.NoError(t, err)
|
||||
multiSaveAndRemoveWithPrefixTests := []struct {
|
||||
multiSave map[string]string
|
||||
@ -316,7 +277,7 @@ func TestTiKVLoad(te *testing.T) {
|
||||
}
|
||||
|
||||
for _, test := range multiSaveAndRemoveWithPrefixTests {
|
||||
k, _, err = kv.LoadWithPrefix(test.loadPrefix)
|
||||
k, _, err := kv.LoadWithPrefix(test.loadPrefix)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, test.lengthBeforeRemove, len(k))
|
||||
|
||||
@ -352,8 +313,6 @@ func TestTiKVLoad(te *testing.T) {
|
||||
assert.Error(t, err)
|
||||
err = kv.MultiSaveAndRemoveWithPrefix(map[string]string{"y/c": "vvv"}, []string{"/"})
|
||||
assert.Error(t, err)
|
||||
err = kv.MultiRemoveWithPrefix([]string{"x/def", "x/den"})
|
||||
assert.Error(t, err)
|
||||
})
|
||||
|
||||
te.Run("kv failed to commit txn", func(t *testing.T) {
|
||||
@ -380,8 +339,6 @@ func TestTiKVLoad(te *testing.T) {
|
||||
assert.Error(t, err)
|
||||
err = kv.MultiSaveAndRemoveWithPrefix(map[string]string{"y/c": "vvv"}, []string{"/"})
|
||||
assert.Error(t, err)
|
||||
err = kv.MultiRemoveWithPrefix([]string{"x/def", "x/den"})
|
||||
assert.Error(t, err)
|
||||
})
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user