mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-02 03:48:37 +08:00
Fix drop collection failed due to etcd txn limit. (#19362)
Signed-off-by: longjiquan <jiquan.long@zilliz.com> Signed-off-by: longjiquan <jiquan.long@zilliz.com>
This commit is contained in:
parent
5117017355
commit
cfd01a188c
@ -373,8 +373,17 @@ func (kc *Catalog) DropCollection(ctx context.Context, collectionInfo *model.Col
|
||||
fmt.Sprintf("%s/%s", CollectionAliasMetaPrefix, alias),
|
||||
)
|
||||
}
|
||||
delMetakeysSnap = append(delMetakeysSnap, buildPartitionPrefix(collectionInfo.CollectionID))
|
||||
delMetakeysSnap = append(delMetakeysSnap, buildFieldPrefix(collectionInfo.CollectionID))
|
||||
// Snapshot will list all (k, v) pairs and then use Txn.MultiSave to save tombstone for these keys when it prepares
|
||||
// to remove a prefix, so though we have very few prefixes, the final operations may exceed the max txn number.
|
||||
// TODO(longjiquan): should we list all partitions & fields in KV anyway?
|
||||
for _, partition := range collectionInfo.Partitions {
|
||||
delMetakeysSnap = append(delMetakeysSnap, buildPartitionKey(collectionInfo.CollectionID, partition.PartitionID))
|
||||
}
|
||||
for _, field := range collectionInfo.Fields {
|
||||
delMetakeysSnap = append(delMetakeysSnap, buildFieldKey(collectionInfo.CollectionID, field.FieldID))
|
||||
}
|
||||
// delMetakeysSnap = append(delMetakeysSnap, buildPartitionPrefix(collectionInfo.CollectionID))
|
||||
// delMetakeysSnap = append(delMetakeysSnap, buildFieldPrefix(collectionInfo.CollectionID))
|
||||
|
||||
// Though batchMultiSaveAndRemoveWithPrefix is not atomic enough, we can promise atomicity outside.
|
||||
// If we found collection under dropping state, we'll know that gc is not completely on this collection.
|
||||
|
@ -86,6 +86,9 @@ func (t *dropCollectionTask) Execute(ctx context.Context) error {
|
||||
redoTask.AddAsyncStep(&deleteCollectionMetaStep{
|
||||
baseStep: baseStep{core: t.core},
|
||||
collectionID: collMeta.CollectionID,
|
||||
// This ts is less than the ts when we notify data nodes to drop collection, but it's OK since we have already
|
||||
// marked this collection as deleted. If we want to make this ts greater than the notification's ts, we should
|
||||
// wrap a step who will have these three children and connect them with ts.
|
||||
ts: ts,
|
||||
})
|
||||
|
||||
|
@ -82,6 +82,9 @@ func (t *dropPartitionTask) Execute(ctx context.Context) error {
|
||||
baseStep: baseStep{core: t.core},
|
||||
collectionID: t.collMeta.CollectionID,
|
||||
partitionID: partID,
|
||||
// This ts is less than the ts when we notify data nodes to drop partition, but it's OK since we have already
|
||||
// marked this partition as deleted. If we want to make this ts greater than the notification's ts, we should
|
||||
// wrap a step who will have these children and connect them with ts.
|
||||
ts: t.GetTs(),
|
||||
})
|
||||
|
||||
|
@ -50,6 +50,9 @@ func (c *bgGarbageCollector) ReDropCollection(collMeta *model.Collection, ts Tim
|
||||
redo.AddAsyncStep(&deleteCollectionMetaStep{
|
||||
baseStep: baseStep{core: c.s},
|
||||
collectionID: collMeta.CollectionID,
|
||||
// This ts is less than the ts when we notify data nodes to drop collection, but it's OK since we have already
|
||||
// marked this collection as deleted. If we want to make this ts greater than the notification's ts, we should
|
||||
// wrap a step who will have these three children and connect them with ts.
|
||||
ts: ts,
|
||||
})
|
||||
|
||||
@ -94,6 +97,9 @@ func (c *bgGarbageCollector) ReDropPartition(pChannels []string, partition *mode
|
||||
baseStep: baseStep{core: c.s},
|
||||
collectionID: partition.CollectionID,
|
||||
partitionID: partition.PartitionID,
|
||||
// This ts is less than the ts when we notify data nodes to drop partition, but it's OK since we have already
|
||||
// marked this partition as deleted. If we want to make this ts greater than the notification's ts, we should
|
||||
// wrap a step who will have these children and connect them with ts.
|
||||
ts: ts,
|
||||
})
|
||||
|
||||
|
@ -223,7 +223,6 @@ func (mt *MetaTable) RemoveCollection(ctx context.Context, collectionID UniqueID
|
||||
if err := mt.catalog.DropCollection(ctx1, &model.Collection{CollectionID: collectionID, Aliases: aliases}, ts); err != nil {
|
||||
return err
|
||||
}
|
||||
delete(mt.collID2Meta, collectionID)
|
||||
|
||||
var name string
|
||||
coll, ok := mt.collID2Meta[collectionID]
|
||||
@ -236,6 +235,8 @@ func (mt *MetaTable) RemoveCollection(ctx context.Context, collectionID UniqueID
|
||||
delete(mt.collAlias2ID, alias)
|
||||
}
|
||||
|
||||
delete(mt.collID2Meta, collectionID)
|
||||
|
||||
log.Info("remove collection", zap.String("name", name), zap.Int64("id", collectionID), zap.Strings("aliases", aliases))
|
||||
return nil
|
||||
}
|
||||
|
@ -198,6 +198,8 @@ type waitForTsSyncedStep struct {
|
||||
func (s *waitForTsSyncedStep) Execute(ctx context.Context) ([]nestedStep, error) {
|
||||
syncedTs := s.core.chanTimeTick.getSyncedTimeTick(s.channel)
|
||||
if syncedTs < s.ts {
|
||||
// TODO: there may be frequent log here.
|
||||
// time.Sleep(Params.ProxyCfg.TimeTickInterval)
|
||||
return nil, fmt.Errorf("ts not synced yet, channel: %s, synced: %d, want: %d", s.channel, syncedTs, s.ts)
|
||||
}
|
||||
return nil, nil
|
||||
|
@ -95,7 +95,7 @@ type bgStepExecutor struct {
|
||||
bufferedSteps map[*stepStack]struct{}
|
||||
selector selectStepPolicy
|
||||
mu sync.Mutex
|
||||
notify chan struct{}
|
||||
notifyChan chan struct{}
|
||||
interval time.Duration
|
||||
}
|
||||
|
||||
@ -108,7 +108,7 @@ func newBgStepExecutor(ctx context.Context, opts ...bgOpt) *bgStepExecutor {
|
||||
bufferedSteps: make(map[*stepStack]struct{}),
|
||||
selector: defaultSelectPolicy(),
|
||||
mu: sync.Mutex{},
|
||||
notify: make(chan struct{}, 1),
|
||||
notifyChan: make(chan struct{}, 1),
|
||||
interval: defaultBgExecutingInterval,
|
||||
}
|
||||
for _, opt := range opts {
|
||||
@ -128,14 +128,8 @@ func (bg *bgStepExecutor) Stop() {
|
||||
}
|
||||
|
||||
func (bg *bgStepExecutor) AddSteps(s *stepStack) {
|
||||
bg.mu.Lock()
|
||||
bg.addStepsInternal(s)
|
||||
bg.mu.Unlock()
|
||||
|
||||
select {
|
||||
case bg.notify <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
bg.notify()
|
||||
}
|
||||
|
||||
func (bg *bgStepExecutor) process(steps []*stepStack) {
|
||||
@ -150,7 +144,8 @@ func (bg *bgStepExecutor) process(steps []*stepStack) {
|
||||
defer wg.Done()
|
||||
child := s.Execute(bg.ctx)
|
||||
if child != nil {
|
||||
bg.AddSteps(child)
|
||||
// don't notify, wait for reschedule.
|
||||
bg.addStepsInternal(child)
|
||||
}
|
||||
}()
|
||||
}
|
||||
@ -161,7 +156,7 @@ func (bg *bgStepExecutor) schedule() {
|
||||
bg.mu.Lock()
|
||||
selected := bg.selector(bg.bufferedSteps)
|
||||
for _, s := range selected {
|
||||
bg.removeStepsInternal(s)
|
||||
bg.unlockRemoveSteps(s)
|
||||
}
|
||||
bg.mu.Unlock()
|
||||
|
||||
@ -178,7 +173,7 @@ func (bg *bgStepExecutor) scheduleLoop() {
|
||||
select {
|
||||
case <-bg.ctx.Done():
|
||||
return
|
||||
case <-bg.notify:
|
||||
case <-bg.notifyChan:
|
||||
bg.schedule()
|
||||
case <-ticker.C:
|
||||
bg.schedule()
|
||||
@ -187,9 +182,22 @@ func (bg *bgStepExecutor) scheduleLoop() {
|
||||
}
|
||||
|
||||
func (bg *bgStepExecutor) addStepsInternal(s *stepStack) {
|
||||
bg.mu.Lock()
|
||||
bg.unlockAddSteps(s)
|
||||
bg.mu.Unlock()
|
||||
}
|
||||
|
||||
func (bg *bgStepExecutor) unlockAddSteps(s *stepStack) {
|
||||
bg.bufferedSteps[s] = struct{}{}
|
||||
}
|
||||
|
||||
func (bg *bgStepExecutor) removeStepsInternal(s *stepStack) {
|
||||
func (bg *bgStepExecutor) unlockRemoveSteps(s *stepStack) {
|
||||
delete(bg.bufferedSteps, s)
|
||||
}
|
||||
|
||||
func (bg *bgStepExecutor) notify() {
|
||||
select {
|
||||
case bg.notifyChan <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
29
internal/rootcoord/step_test.go
Normal file
29
internal/rootcoord/step_test.go
Normal file
@ -0,0 +1,29 @@
|
||||
package rootcoord
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func Test_waitForTsSyncedStep_Execute(t *testing.T) {
|
||||
//Params.InitOnce()
|
||||
//Params.ProxyCfg.TimeTickInterval = time.Millisecond
|
||||
|
||||
ticker := newRocksMqTtSynchronizer()
|
||||
core := newTestCore(withTtSynchronizer(ticker))
|
||||
core.chanTimeTick.syncedTtHistogram.update("ch1", 100)
|
||||
s := &waitForTsSyncedStep{
|
||||
baseStep: baseStep{core: core},
|
||||
ts: 101,
|
||||
channel: "ch1",
|
||||
}
|
||||
children, err := s.Execute(context.Background())
|
||||
assert.Equal(t, 0, len(children))
|
||||
assert.Error(t, err)
|
||||
core.chanTimeTick.syncedTtHistogram.update("ch1", 102)
|
||||
children, err = s.Execute(context.Background())
|
||||
assert.Equal(t, 0, len(children))
|
||||
assert.NoError(t, err)
|
||||
}
|
Loading…
Reference in New Issue
Block a user