diff --git a/internal/metastore/kv/rootcoord/kv_catalog.go b/internal/metastore/kv/rootcoord/kv_catalog.go index dc88227b6d..e8c9266ed7 100644 --- a/internal/metastore/kv/rootcoord/kv_catalog.go +++ b/internal/metastore/kv/rootcoord/kv_catalog.go @@ -373,8 +373,17 @@ func (kc *Catalog) DropCollection(ctx context.Context, collectionInfo *model.Col fmt.Sprintf("%s/%s", CollectionAliasMetaPrefix, alias), ) } - delMetakeysSnap = append(delMetakeysSnap, buildPartitionPrefix(collectionInfo.CollectionID)) - delMetakeysSnap = append(delMetakeysSnap, buildFieldPrefix(collectionInfo.CollectionID)) + // Snapshot will list all (k, v) pairs and then use Txn.MultiSave to save tombstone for these keys when it prepares + // to remove a prefix, so though we have very few prefixes, the final operations may exceed the max txn number. + // TODO(longjiquan): should we list all partitions & fields in KV anyway? + for _, partition := range collectionInfo.Partitions { + delMetakeysSnap = append(delMetakeysSnap, buildPartitionKey(collectionInfo.CollectionID, partition.PartitionID)) + } + for _, field := range collectionInfo.Fields { + delMetakeysSnap = append(delMetakeysSnap, buildFieldKey(collectionInfo.CollectionID, field.FieldID)) + } + // delMetakeysSnap = append(delMetakeysSnap, buildPartitionPrefix(collectionInfo.CollectionID)) + // delMetakeysSnap = append(delMetakeysSnap, buildFieldPrefix(collectionInfo.CollectionID)) // Though batchMultiSaveAndRemoveWithPrefix is not atomic enough, we can promise atomicity outside. // If we found collection under dropping state, we'll know that gc is not completely on this collection. diff --git a/internal/rootcoord/drop_collection_task.go b/internal/rootcoord/drop_collection_task.go index 4954194458..eafc0514b1 100644 --- a/internal/rootcoord/drop_collection_task.go +++ b/internal/rootcoord/drop_collection_task.go @@ -86,7 +86,10 @@ func (t *dropCollectionTask) Execute(ctx context.Context) error { redoTask.AddAsyncStep(&deleteCollectionMetaStep{ baseStep: baseStep{core: t.core}, collectionID: collMeta.CollectionID, - ts: ts, + // This ts is less than the ts when we notify data nodes to drop collection, but it's OK since we have already + // marked this collection as deleted. If we want to make this ts greater than the notification's ts, we should + // wrap a step who will have these three children and connect them with ts. + ts: ts, }) return redoTask.Execute(ctx) diff --git a/internal/rootcoord/drop_partition_task.go b/internal/rootcoord/drop_partition_task.go index b0342b88d5..9b33a1296b 100644 --- a/internal/rootcoord/drop_partition_task.go +++ b/internal/rootcoord/drop_partition_task.go @@ -82,7 +82,10 @@ func (t *dropPartitionTask) Execute(ctx context.Context) error { baseStep: baseStep{core: t.core}, collectionID: t.collMeta.CollectionID, partitionID: partID, - ts: t.GetTs(), + // This ts is less than the ts when we notify data nodes to drop partition, but it's OK since we have already + // marked this partition as deleted. If we want to make this ts greater than the notification's ts, we should + // wrap a step who will have these children and connect them with ts. + ts: t.GetTs(), }) return redoTask.Execute(ctx) diff --git a/internal/rootcoord/garbage_collector.go b/internal/rootcoord/garbage_collector.go index b975c8ec00..02b8b40bd9 100644 --- a/internal/rootcoord/garbage_collector.go +++ b/internal/rootcoord/garbage_collector.go @@ -50,7 +50,10 @@ func (c *bgGarbageCollector) ReDropCollection(collMeta *model.Collection, ts Tim redo.AddAsyncStep(&deleteCollectionMetaStep{ baseStep: baseStep{core: c.s}, collectionID: collMeta.CollectionID, - ts: ts, + // This ts is less than the ts when we notify data nodes to drop collection, but it's OK since we have already + // marked this collection as deleted. If we want to make this ts greater than the notification's ts, we should + // wrap a step who will have these three children and connect them with ts. + ts: ts, }) // err is ignored since no sync steps will be executed. @@ -94,7 +97,10 @@ func (c *bgGarbageCollector) ReDropPartition(pChannels []string, partition *mode baseStep: baseStep{core: c.s}, collectionID: partition.CollectionID, partitionID: partition.PartitionID, - ts: ts, + // This ts is less than the ts when we notify data nodes to drop partition, but it's OK since we have already + // marked this partition as deleted. If we want to make this ts greater than the notification's ts, we should + // wrap a step who will have these children and connect them with ts. + ts: ts, }) // err is ignored since no sync steps will be executed. diff --git a/internal/rootcoord/meta_table.go b/internal/rootcoord/meta_table.go index 8f96278672..16febec072 100644 --- a/internal/rootcoord/meta_table.go +++ b/internal/rootcoord/meta_table.go @@ -223,7 +223,6 @@ func (mt *MetaTable) RemoveCollection(ctx context.Context, collectionID UniqueID if err := mt.catalog.DropCollection(ctx1, &model.Collection{CollectionID: collectionID, Aliases: aliases}, ts); err != nil { return err } - delete(mt.collID2Meta, collectionID) var name string coll, ok := mt.collID2Meta[collectionID] @@ -236,6 +235,8 @@ func (mt *MetaTable) RemoveCollection(ctx context.Context, collectionID UniqueID delete(mt.collAlias2ID, alias) } + delete(mt.collID2Meta, collectionID) + log.Info("remove collection", zap.String("name", name), zap.Int64("id", collectionID), zap.Strings("aliases", aliases)) return nil } diff --git a/internal/rootcoord/step.go b/internal/rootcoord/step.go index 07ada29aae..6fbc36bbec 100644 --- a/internal/rootcoord/step.go +++ b/internal/rootcoord/step.go @@ -198,6 +198,8 @@ type waitForTsSyncedStep struct { func (s *waitForTsSyncedStep) Execute(ctx context.Context) ([]nestedStep, error) { syncedTs := s.core.chanTimeTick.getSyncedTimeTick(s.channel) if syncedTs < s.ts { + // TODO: there may be frequent log here. + // time.Sleep(Params.ProxyCfg.TimeTickInterval) return nil, fmt.Errorf("ts not synced yet, channel: %s, synced: %d, want: %d", s.channel, syncedTs, s.ts) } return nil, nil diff --git a/internal/rootcoord/step_executor.go b/internal/rootcoord/step_executor.go index 611000ee15..84fca842b7 100644 --- a/internal/rootcoord/step_executor.go +++ b/internal/rootcoord/step_executor.go @@ -95,7 +95,7 @@ type bgStepExecutor struct { bufferedSteps map[*stepStack]struct{} selector selectStepPolicy mu sync.Mutex - notify chan struct{} + notifyChan chan struct{} interval time.Duration } @@ -108,7 +108,7 @@ func newBgStepExecutor(ctx context.Context, opts ...bgOpt) *bgStepExecutor { bufferedSteps: make(map[*stepStack]struct{}), selector: defaultSelectPolicy(), mu: sync.Mutex{}, - notify: make(chan struct{}, 1), + notifyChan: make(chan struct{}, 1), interval: defaultBgExecutingInterval, } for _, opt := range opts { @@ -128,14 +128,8 @@ func (bg *bgStepExecutor) Stop() { } func (bg *bgStepExecutor) AddSteps(s *stepStack) { - bg.mu.Lock() bg.addStepsInternal(s) - bg.mu.Unlock() - - select { - case bg.notify <- struct{}{}: - default: - } + bg.notify() } func (bg *bgStepExecutor) process(steps []*stepStack) { @@ -150,7 +144,8 @@ func (bg *bgStepExecutor) process(steps []*stepStack) { defer wg.Done() child := s.Execute(bg.ctx) if child != nil { - bg.AddSteps(child) + // don't notify, wait for reschedule. + bg.addStepsInternal(child) } }() } @@ -161,7 +156,7 @@ func (bg *bgStepExecutor) schedule() { bg.mu.Lock() selected := bg.selector(bg.bufferedSteps) for _, s := range selected { - bg.removeStepsInternal(s) + bg.unlockRemoveSteps(s) } bg.mu.Unlock() @@ -178,7 +173,7 @@ func (bg *bgStepExecutor) scheduleLoop() { select { case <-bg.ctx.Done(): return - case <-bg.notify: + case <-bg.notifyChan: bg.schedule() case <-ticker.C: bg.schedule() @@ -187,9 +182,22 @@ func (bg *bgStepExecutor) scheduleLoop() { } func (bg *bgStepExecutor) addStepsInternal(s *stepStack) { + bg.mu.Lock() + bg.unlockAddSteps(s) + bg.mu.Unlock() +} + +func (bg *bgStepExecutor) unlockAddSteps(s *stepStack) { bg.bufferedSteps[s] = struct{}{} } -func (bg *bgStepExecutor) removeStepsInternal(s *stepStack) { +func (bg *bgStepExecutor) unlockRemoveSteps(s *stepStack) { delete(bg.bufferedSteps, s) } + +func (bg *bgStepExecutor) notify() { + select { + case bg.notifyChan <- struct{}{}: + default: + } +} diff --git a/internal/rootcoord/step_test.go b/internal/rootcoord/step_test.go new file mode 100644 index 0000000000..f1c4215968 --- /dev/null +++ b/internal/rootcoord/step_test.go @@ -0,0 +1,29 @@ +package rootcoord + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" +) + +func Test_waitForTsSyncedStep_Execute(t *testing.T) { + //Params.InitOnce() + //Params.ProxyCfg.TimeTickInterval = time.Millisecond + + ticker := newRocksMqTtSynchronizer() + core := newTestCore(withTtSynchronizer(ticker)) + core.chanTimeTick.syncedTtHistogram.update("ch1", 100) + s := &waitForTsSyncedStep{ + baseStep: baseStep{core: core}, + ts: 101, + channel: "ch1", + } + children, err := s.Execute(context.Background()) + assert.Equal(t, 0, len(children)) + assert.Error(t, err) + core.chanTimeTick.syncedTtHistogram.update("ch1", 102) + children, err = s.Execute(context.Background()) + assert.Equal(t, 0, len(children)) + assert.NoError(t, err) +}