From 225f4a6134cdb142580a581969134c3653e5851a Mon Sep 17 00:00:00 2001 From: smellthemoon <64083300+smellthemoon@users.noreply.github.com> Date: Fri, 17 May 2024 14:27:42 +0800 Subject: [PATCH] enhance: use the only MaxEtcdTxnNum (#33070) #33071 Signed-off-by: lixinguo Co-authored-by: lixinguo --- internal/metastore/kv/datacoord/kv_catalog.go | 34 +++---------------- internal/metastore/kv/rootcoord/kv_catalog.go | 22 ++++++------ .../metastore/kv/rootcoord/kv_catalog_test.go | 6 ++-- .../metastore/kv/rootcoord/suffix_snapshot.go | 3 +- pkg/util/constant.go | 2 ++ pkg/util/etcd/etcd_util.go | 11 ------ pkg/util/etcd/etcd_util_test.go | 24 ++++++------- 7 files changed, 35 insertions(+), 67 deletions(-) diff --git a/internal/metastore/kv/datacoord/kv_catalog.go b/internal/metastore/kv/datacoord/kv_catalog.go index a7cae134c9..f6b4190f75 100644 --- a/internal/metastore/kv/datacoord/kv_catalog.go +++ b/internal/metastore/kv/datacoord/kv_catalog.go @@ -45,8 +45,6 @@ import ( "github.com/milvus-io/milvus/pkg/util/typeutil" ) -var maxEtcdTxnNum = 128 - var paginationSize = 2000 type Catalog struct { @@ -341,32 +339,10 @@ func (kc *Catalog) SaveByBatch(kvs map[string]string) error { saveFn := func(partialKvs map[string]string) error { return kc.MetaKv.MultiSave(partialKvs) } - if len(kvs) <= maxEtcdTxnNum { - if err := etcd.SaveByBatch(kvs, saveFn); err != nil { - log.Error("failed to save by batch", zap.Error(err)) - return err - } - } else { - // Split kvs into multiple operations to avoid over-sized operations. - // Also make sure kvs of the same segment are not split into different operations. 
- batch := make(map[string]string) - for k, v := range kvs { - if len(batch) == maxEtcdTxnNum { - if err := etcd.SaveByBatch(batch, saveFn); err != nil { - log.Error("failed to save by batch", zap.Error(err)) - return err - } - maps.Clear(batch) - } - batch[k] = v - } - - if len(batch) > 0 { - if err := etcd.SaveByBatch(batch, saveFn); err != nil { - log.Error("failed to save by batch", zap.Error(err)) - return err - } - } + err := etcd.SaveByBatchWithLimit(kvs, util.MaxEtcdTxnNum, saveFn) + if err != nil { + log.Error("failed to save by batch", zap.Error(err)) + return err } return nil } @@ -434,7 +410,7 @@ func (kc *Catalog) SaveDroppedSegmentsInBatch(ctx context.Context, segments []*d saveFn := func(partialKvs map[string]string) error { return kc.MetaKv.MultiSave(partialKvs) } - if err := etcd.SaveByBatch(kvs, saveFn); err != nil { + if err := etcd.SaveByBatchWithLimit(kvs, util.MaxEtcdTxnNum, saveFn); err != nil { return err } diff --git a/internal/metastore/kv/rootcoord/kv_catalog.go b/internal/metastore/kv/rootcoord/kv_catalog.go index 7bf240c0b5..9edcfe13f6 100644 --- a/internal/metastore/kv/rootcoord/kv_catalog.go +++ b/internal/metastore/kv/rootcoord/kv_catalog.go @@ -27,10 +27,6 @@ import ( "github.com/milvus-io/milvus/pkg/util/typeutil" ) -const ( - maxTxnNum = 64 -) - // prefix/collection/collection_id -> CollectionInfo // prefix/partitions/collection_id/partition_id -> PartitionInfo // prefix/aliases/alias_name -> AliasInfo @@ -87,11 +83,13 @@ func BuildAliasPrefixWithDB(dbID int64) string { return fmt.Sprintf("%s/%s/%d", DatabaseMetaPrefix, Aliases, dbID) } -func batchMultiSaveAndRemoveWithPrefix(snapshot kv.SnapShotKV, maxTxnNum int, saves map[string]string, removals []string, ts typeutil.Timestamp) error { +// since SnapshotKV may save both snapshot key and the original key if the original key is newest +// MaxEtcdTxnNum needs to be divided by 2 +func batchMultiSaveAndRemoveWithPrefix(snapshot kv.SnapShotKV, limit int, saves map[string]string, removals 
[]string, ts typeutil.Timestamp) error { saveFn := func(partialKvs map[string]string) error { return snapshot.MultiSave(partialKvs, ts) } - if err := etcd.SaveByBatchWithLimit(saves, maxTxnNum, saveFn); err != nil { + if err := etcd.SaveByBatchWithLimit(saves, limit, saveFn); err != nil { return err } @@ -104,7 +102,7 @@ func batchMultiSaveAndRemoveWithPrefix(snapshot kv.SnapShotKV, maxTxnNum int, sa removeFn := func(partialKeys []string) error { return snapshot.MultiSaveAndRemoveWithPrefix(nil, partialKeys, ts) } - return etcd.RemoveByBatchWithLimit(removals, maxTxnNum/2, removeFn) + return etcd.RemoveByBatchWithLimit(removals, limit, removeFn) } func (kc *Catalog) CreateDatabase(ctx context.Context, db *model.Database, ts typeutil.Timestamp) error { @@ -200,7 +198,9 @@ func (kc *Catalog) CreateCollection(ctx context.Context, coll *model.Collection, // Though batchSave is not atomic enough, we can promise the atomicity outside. // Recovering from failure, if we found collection is creating, we should remove all these related meta. - return etcd.SaveByBatchWithLimit(kvs, maxTxnNum/2, func(partialKvs map[string]string) error { + // since SnapshotKV may save both snapshot key and the original key if the original key is newest + // MaxEtcdTxnNum needs to be divided by 2 + return etcd.SaveByBatchWithLimit(kvs, util.MaxEtcdTxnNum/2, func(partialKvs map[string]string) error { return kc.Snapshot.MultiSave(partialKvs, ts) }) } @@ -453,9 +453,9 @@ func (kc *Catalog) DropCollection(ctx context.Context, collectionInfo *model.Col // Though batchMultiSaveAndRemoveWithPrefix is not atomic enough, we can promise atomicity outside. // If we found collection under dropping state, we'll know that gc is not completely on this collection. // However, if we remove collection first, we cannot remove other metas. - // We set maxTxnNum to 64, since SnapshotKV may save both snapshot key and the original key if the original key is - // newest. 
- if err := batchMultiSaveAndRemoveWithPrefix(kc.Snapshot, maxTxnNum, nil, delMetakeysSnap, ts); err != nil { + // since SnapshotKV may save both snapshot key and the original key if the original key is newest + // MaxEtcdTxnNum needs to be divided by 2 + if err := batchMultiSaveAndRemoveWithPrefix(kc.Snapshot, util.MaxEtcdTxnNum/2, nil, delMetakeysSnap, ts); err != nil { return err } diff --git a/internal/metastore/kv/rootcoord/kv_catalog_test.go b/internal/metastore/kv/rootcoord/kv_catalog_test.go index b5c4502eb6..7523c82167 100644 --- a/internal/metastore/kv/rootcoord/kv_catalog_test.go +++ b/internal/metastore/kv/rootcoord/kv_catalog_test.go @@ -949,7 +949,7 @@ func Test_batchMultiSaveAndRemoveWithPrefix(t *testing.T) { return errors.New("error mock MultiSave") } saves := map[string]string{"k": "v"} - err := batchMultiSaveAndRemoveWithPrefix(snapshot, maxTxnNum, saves, []string{}, 0) + err := batchMultiSaveAndRemoveWithPrefix(snapshot, util.MaxEtcdTxnNum/2, saves, []string{}, 0) assert.Error(t, err) }) t.Run("failed to remove", func(t *testing.T) { @@ -962,7 +962,7 @@ func Test_batchMultiSaveAndRemoveWithPrefix(t *testing.T) { } saves := map[string]string{"k": "v"} removals := []string{"prefix1", "prefix2"} - err := batchMultiSaveAndRemoveWithPrefix(snapshot, maxTxnNum, saves, removals, 0) + err := batchMultiSaveAndRemoveWithPrefix(snapshot, util.MaxEtcdTxnNum/2, saves, removals, 0) assert.Error(t, err) }) t.Run("normal case", func(t *testing.T) { @@ -983,7 +983,7 @@ func Test_batchMultiSaveAndRemoveWithPrefix(t *testing.T) { saves[fmt.Sprintf("k%d", i)] = fmt.Sprintf("v%d", i) removals = append(removals, fmt.Sprintf("k%d", i)) } - err := batchMultiSaveAndRemoveWithPrefix(snapshot, 64, saves, removals, 0) + err := batchMultiSaveAndRemoveWithPrefix(snapshot, util.MaxEtcdTxnNum/2, saves, removals, 0) assert.NoError(t, err) }) } diff --git a/internal/metastore/kv/rootcoord/suffix_snapshot.go b/internal/metastore/kv/rootcoord/suffix_snapshot.go index 
832bfd45d9..f945dc958d 100644 --- a/internal/metastore/kv/rootcoord/suffix_snapshot.go +++ b/internal/metastore/kv/rootcoord/suffix_snapshot.go @@ -33,6 +33,7 @@ import ( "github.com/milvus-io/milvus/internal/kv" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util" "github.com/milvus-io/milvus/pkg/util/etcd" "github.com/milvus-io/milvus/pkg/util/retry" "github.com/milvus-io/milvus/pkg/util/tsoutil" @@ -596,7 +597,7 @@ func (ss *SuffixSnapshot) batchRemoveExpiredKvs(keyGroup []string, originalKey s removeFn := func(partialKeys []string) error { return ss.MetaKv.MultiRemove(keyGroup) } - return etcd.RemoveByBatch(keyGroup, removeFn) + return etcd.RemoveByBatchWithLimit(keyGroup, util.MaxEtcdTxnNum, removeFn) } func (ss *SuffixSnapshot) removeExpiredKvs(now time.Time) error { diff --git a/pkg/util/constant.go b/pkg/util/constant.go index 7bdad8a371..36e52d83dc 100644 --- a/pkg/util/constant.go +++ b/pkg/util/constant.go @@ -70,6 +70,8 @@ const ( RoleConfigObjectName = "object_name" RoleConfigDBName = "db_name" RoleConfigPrivilege = "privilege" + + MaxEtcdTxnNum = 128 ) const ( diff --git a/pkg/util/etcd/etcd_util.go b/pkg/util/etcd/etcd_util.go index b92d651a2f..37717ed4d2 100644 --- a/pkg/util/etcd/etcd_util.go +++ b/pkg/util/etcd/etcd_util.go @@ -33,8 +33,6 @@ import ( "github.com/milvus-io/milvus/pkg/log" ) -var maxTxnNum = 128 - // GetEtcdClient returns etcd client // should only used for test func GetEtcdClient( @@ -191,11 +189,6 @@ func SaveByBatchWithLimit(kvs map[string]string, limit int, op func(partialKvs m return nil } -// SaveByBatch there will not guarantee atomicity. 
-func SaveByBatch(kvs map[string]string, op func(partialKvs map[string]string) error) error { - return SaveByBatchWithLimit(kvs, maxTxnNum, op) -} - func RemoveByBatchWithLimit(removals []string, limit int, op func(partialKeys []string) error) error { if len(removals) == 0 { return nil @@ -211,10 +204,6 @@ func RemoveByBatchWithLimit(removals []string, limit int, op func(partialKeys [] return nil } -func RemoveByBatch(removals []string, op func(partialKeys []string) error) error { - return RemoveByBatchWithLimit(removals, maxTxnNum, op) -} - func buildKvGroup(keys, values []string) (map[string]string, error) { if len(keys) != len(values) { return nil, fmt.Errorf("length of keys (%d) and values (%d) are not equal", len(keys), len(values)) diff --git a/pkg/util/etcd/etcd_util_test.go b/pkg/util/etcd/etcd_util_test.go index 86a60ae4ea..aa94d49dcb 100644 --- a/pkg/util/etcd/etcd_util_test.go +++ b/pkg/util/etcd/etcd_util_test.go @@ -104,8 +104,8 @@ func Test_SaveByBatch(t *testing.T) { return nil } - maxTxnNum = 2 - err := SaveByBatch(kvs, saveFn) + limit := 2 + err := SaveByBatchWithLimit(kvs, limit, saveFn) assert.NoError(t, err) assert.Equal(t, 0, group) assert.Equal(t, 0, count) @@ -126,8 +126,8 @@ func Test_SaveByBatch(t *testing.T) { return nil } - maxTxnNum = 2 - err := SaveByBatch(kvs, saveFn) + limit := 2 + err := SaveByBatchWithLimit(kvs, limit, saveFn) assert.NoError(t, err) assert.Equal(t, 2, group) assert.Equal(t, 3, count) @@ -142,8 +142,8 @@ func Test_SaveByBatch(t *testing.T) { "k2": "v2", "k3": "v3", } - maxTxnNum = 2 - err := SaveByBatch(kvs, saveFn) + limit := 2 + err := SaveByBatchWithLimit(kvs, limit, saveFn) assert.Error(t, err) }) } @@ -160,8 +160,8 @@ func Test_RemoveByBatch(t *testing.T) { return nil } - maxTxnNum = 2 - err := RemoveByBatch(kvs, removeFn) + limit := 2 + err := RemoveByBatchWithLimit(kvs, limit, removeFn) assert.NoError(t, err) assert.Equal(t, 0, group) assert.Equal(t, 0, count) @@ -178,8 +178,8 @@ func Test_RemoveByBatch(t 
*testing.T) { return nil } - maxTxnNum = 2 - err := RemoveByBatch(kvs, removeFn) + limit := 2 + err := RemoveByBatchWithLimit(kvs, limit, removeFn) assert.NoError(t, err) assert.Equal(t, 3, group) assert.Equal(t, 5, count) @@ -190,8 +190,8 @@ func Test_RemoveByBatch(t *testing.T) { return errors.New("mock") } kvs := []string{"k1", "k2", "k3", "k4", "k5"} - maxTxnNum = 2 - err := RemoveByBatch(kvs, removeFn) + limit := 2 + err := RemoveByBatchWithLimit(kvs, limit, removeFn) assert.Error(t, err) }) }