From 412b2fab674e4a8380fbb61c2417d5f79f0e5f26 Mon Sep 17 00:00:00 2001 From: congqixia Date: Mon, 10 Jul 2023 10:16:26 +0800 Subject: [PATCH] Fix rootcoord restoration missing gcConfirmStep (#25293) Signed-off-by: Congqi Xia --- internal/rootcoord/garbage_collector.go | 2 + internal/rootcoord/garbage_collector_test.go | 55 +++++++++++++++++++- 2 files changed, 55 insertions(+), 2 deletions(-) diff --git a/internal/rootcoord/garbage_collector.go b/internal/rootcoord/garbage_collector.go index a3970685c7..bbca6c058d 100644 --- a/internal/rootcoord/garbage_collector.go +++ b/internal/rootcoord/garbage_collector.go @@ -66,6 +66,7 @@ func (c *bgGarbageCollector) ReDropCollection(collMeta *model.Collection, ts Tim baseStep: baseStep{core: c.s}, pChannels: collMeta.PhysicalChannelNames, }) + redo.AddAsyncStep(newConfirmGCStep(c.s, collMeta.CollectionID, allPartition)) redo.AddAsyncStep(&deleteCollectionMetaStep{ baseStep: baseStep{core: c.s}, collectionID: collMeta.CollectionID, @@ -121,6 +122,7 @@ func (c *bgGarbageCollector) ReDropPartition(dbID int64, pChannels []string, par baseStep: baseStep{core: c.s}, pChannels: pChannels, }) + redo.AddAsyncStep(newConfirmGCStep(c.s, partition.CollectionID, partition.PartitionID)) redo.AddAsyncStep(&removePartitionMetaStep{ baseStep: baseStep{core: c.s}, dbID: dbID, diff --git a/internal/rootcoord/garbage_collector_test.go b/internal/rootcoord/garbage_collector_test.go index 7f1d7c3672..948a0f9a65 100644 --- a/internal/rootcoord/garbage_collector_test.go +++ b/internal/rootcoord/garbage_collector_test.go @@ -35,6 +35,11 @@ import ( ) func TestGarbageCollectorCtx_ReDropCollection(t *testing.T) { + oldValue := confirmGCInterval + defer func() { + confirmGCInterval = oldValue + }() + confirmGCInterval = 0 t.Run("failed to release collection", func(t *testing.T) { broker := newMockBroker() broker.ReleaseCollectionFunc = func(ctx context.Context, collectionID UniqueID) error { @@ -110,6 +115,13 @@ func TestGarbageCollectorCtx_ReDropCollection(t *testing.T) { releaseCollectionChan <- struct{}{} return nil } + gcConfirmCalled := false + gcConfirmChan := make(chan struct{}) + broker.GCConfirmFunc = func(ctx context.Context, collectionID, partitionID UniqueID) bool { + gcConfirmCalled = true + close(gcConfirmChan) + return true + } dropCollectionIndexCalled := false dropCollectionIndexChan := make(chan struct{}, 1) broker.DropCollectionIndexFunc = func(ctx context.Context, collID UniqueID, partIDs []UniqueID) error { @@ -146,6 +158,8 @@ func TestGarbageCollectorCtx_ReDropCollection(t *testing.T) { assert.True(t, releaseCollectionCalled) <-dropCollectionIndexChan assert.True(t, dropCollectionIndexCalled) + <-gcConfirmChan + assert.True(t, gcConfirmCalled) <-dropMetaChan }) @@ -165,6 +179,13 @@ func TestGarbageCollectorCtx_ReDropCollection(t *testing.T) { dropCollectionIndexChan <- struct{}{} return nil } + gcConfirmCalled := false + gcConfirmChan := make(chan struct{}) + broker.GCConfirmFunc = func(ctx context.Context, collectionID, partitionID UniqueID) bool { + gcConfirmCalled = true + close(gcConfirmChan) + return true + } meta := mockrootcoord.NewIMetaTable(t) removeCollectionCalled := false removeCollectionChan := make(chan struct{}, 1) @@ -197,6 +218,8 @@ func TestGarbageCollectorCtx_ReDropCollection(t *testing.T) { assert.True(t, dropCollectionIndexCalled) <-removeCollectionChan assert.True(t, removeCollectionCalled) + <-gcConfirmChan + assert.True(t, gcConfirmCalled) }) } @@ -312,6 +335,11 @@ func TestGarbageCollectorCtx_RemoveCreatingCollection(t *testing.T) { } func TestGarbageCollectorCtx_ReDropPartition(t *testing.T) { + oldValue := confirmGCInterval + defer func() { + confirmGCInterval = oldValue + }() + confirmGCInterval = 0 t.Run("failed to GcPartitionData", func(t *testing.T) { ticker := newTickerWithMockFailStream() // failed to broadcast drop msg. shardsNum := int(common.DefaultShardsNum) @@ -347,15 +375,26 @@ func TestGarbageCollectorCtx_ReDropPartition(t *testing.T) { return errors.New("error mock RemovePartition") }) + broker := newMockBroker() + gcConfirmCalled := false + gcConfirmChan := make(chan struct{}) + broker.GCConfirmFunc = func(ctx context.Context, collectionID, partitionID UniqueID) bool { + gcConfirmCalled = true + close(gcConfirmChan) + return true + } + tsoAllocator := newMockTsoAllocator() tsoAllocator.GenerateTSOF = func(count uint32) (uint64, error) { return 100, nil } - core := newTestCore(withMeta(meta), withTtSynchronizer(ticker), withTsoAllocator(tsoAllocator), withDropIndex()) + core := newTestCore(withMeta(meta), withTtSynchronizer(ticker), withTsoAllocator(tsoAllocator), withDropIndex(), withBroker(broker)) core.ddlTsLockManager = newDdlTsLockManager(core.tsoAllocator) gc := newBgGarbageCollector(core) core.garbageCollector = gc gc.ReDropPartition(0, pchans, &model.Partition{}, 100000) + <-gcConfirmChan + assert.True(t, gcConfirmCalled) <-removePartitionChan assert.True(t, removePartitionCalled) }) @@ -380,15 +419,27 @@ func TestGarbageCollectorCtx_ReDropPartition(t *testing.T) { return nil }) + broker := newMockBroker() + + gcConfirmCalled := false + gcConfirmChan := make(chan struct{}) + broker.GCConfirmFunc = func(ctx context.Context, collectionID, partitionID UniqueID) bool { + gcConfirmCalled = true + close(gcConfirmChan) + return true + } + tsoAllocator := newMockTsoAllocator() tsoAllocator.GenerateTSOF = func(count uint32) (uint64, error) { return 100, nil } - core := newTestCore(withMeta(meta), withTtSynchronizer(ticker), withTsoAllocator(tsoAllocator), withDropIndex()) + core := newTestCore(withMeta(meta), withTtSynchronizer(ticker), withTsoAllocator(tsoAllocator), withDropIndex(), withBroker(broker)) core.ddlTsLockManager = newDdlTsLockManager(core.tsoAllocator) gc := newBgGarbageCollector(core) core.garbageCollector = gc gc.ReDropPartition(0, pchans, &model.Partition{}, 100000) + <-gcConfirmChan + assert.True(t, gcConfirmCalled) <-removePartitionChan assert.True(t, removePartitionCalled) })