mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-11-30 10:59:32 +08:00
Fix rootcoord restoration missing gcConfirmStep (#25293)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent
3112dad356
commit
412b2fab67
@ -66,6 +66,7 @@ func (c *bgGarbageCollector) ReDropCollection(collMeta *model.Collection, ts Tim
|
||||
baseStep: baseStep{core: c.s},
|
||||
pChannels: collMeta.PhysicalChannelNames,
|
||||
})
|
||||
redo.AddAsyncStep(newConfirmGCStep(c.s, collMeta.CollectionID, allPartition))
|
||||
redo.AddAsyncStep(&deleteCollectionMetaStep{
|
||||
baseStep: baseStep{core: c.s},
|
||||
collectionID: collMeta.CollectionID,
|
||||
@ -121,6 +122,7 @@ func (c *bgGarbageCollector) ReDropPartition(dbID int64, pChannels []string, par
|
||||
baseStep: baseStep{core: c.s},
|
||||
pChannels: pChannels,
|
||||
})
|
||||
redo.AddAsyncStep(newConfirmGCStep(c.s, partition.CollectionID, partition.PartitionID))
|
||||
redo.AddAsyncStep(&removePartitionMetaStep{
|
||||
baseStep: baseStep{core: c.s},
|
||||
dbID: dbID,
|
||||
|
@ -35,6 +35,11 @@ import (
|
||||
)
|
||||
|
||||
func TestGarbageCollectorCtx_ReDropCollection(t *testing.T) {
|
||||
oldValue := confirmGCInterval
|
||||
defer func() {
|
||||
confirmGCInterval = oldValue
|
||||
}()
|
||||
confirmGCInterval = 0
|
||||
t.Run("failed to release collection", func(t *testing.T) {
|
||||
broker := newMockBroker()
|
||||
broker.ReleaseCollectionFunc = func(ctx context.Context, collectionID UniqueID) error {
|
||||
@ -110,6 +115,13 @@ func TestGarbageCollectorCtx_ReDropCollection(t *testing.T) {
|
||||
releaseCollectionChan <- struct{}{}
|
||||
return nil
|
||||
}
|
||||
gcConfirmCalled := false
|
||||
gcConfirmChan := make(chan struct{})
|
||||
broker.GCConfirmFunc = func(ctx context.Context, collectionID, partitionID UniqueID) bool {
|
||||
gcConfirmCalled = true
|
||||
close(gcConfirmChan)
|
||||
return true
|
||||
}
|
||||
dropCollectionIndexCalled := false
|
||||
dropCollectionIndexChan := make(chan struct{}, 1)
|
||||
broker.DropCollectionIndexFunc = func(ctx context.Context, collID UniqueID, partIDs []UniqueID) error {
|
||||
@ -146,6 +158,8 @@ func TestGarbageCollectorCtx_ReDropCollection(t *testing.T) {
|
||||
assert.True(t, releaseCollectionCalled)
|
||||
<-dropCollectionIndexChan
|
||||
assert.True(t, dropCollectionIndexCalled)
|
||||
<-gcConfirmChan
|
||||
assert.True(t, gcConfirmCalled)
|
||||
<-dropMetaChan
|
||||
})
|
||||
|
||||
@ -165,6 +179,13 @@ func TestGarbageCollectorCtx_ReDropCollection(t *testing.T) {
|
||||
dropCollectionIndexChan <- struct{}{}
|
||||
return nil
|
||||
}
|
||||
gcConfirmCalled := false
|
||||
gcConfirmChan := make(chan struct{})
|
||||
broker.GCConfirmFunc = func(ctx context.Context, collectionID, partitionID UniqueID) bool {
|
||||
gcConfirmCalled = true
|
||||
close(gcConfirmChan)
|
||||
return true
|
||||
}
|
||||
meta := mockrootcoord.NewIMetaTable(t)
|
||||
removeCollectionCalled := false
|
||||
removeCollectionChan := make(chan struct{}, 1)
|
||||
@ -197,6 +218,8 @@ func TestGarbageCollectorCtx_ReDropCollection(t *testing.T) {
|
||||
assert.True(t, dropCollectionIndexCalled)
|
||||
<-removeCollectionChan
|
||||
assert.True(t, removeCollectionCalled)
|
||||
<-gcConfirmChan
|
||||
assert.True(t, gcConfirmCalled)
|
||||
})
|
||||
}
|
||||
|
||||
@ -312,6 +335,11 @@ func TestGarbageCollectorCtx_RemoveCreatingCollection(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestGarbageCollectorCtx_ReDropPartition(t *testing.T) {
|
||||
oldValue := confirmGCInterval
|
||||
defer func() {
|
||||
confirmGCInterval = oldValue
|
||||
}()
|
||||
confirmGCInterval = 0
|
||||
t.Run("failed to GcPartitionData", func(t *testing.T) {
|
||||
ticker := newTickerWithMockFailStream() // failed to broadcast drop msg.
|
||||
shardsNum := int(common.DefaultShardsNum)
|
||||
@ -347,15 +375,26 @@ func TestGarbageCollectorCtx_ReDropPartition(t *testing.T) {
|
||||
return errors.New("error mock RemovePartition")
|
||||
})
|
||||
|
||||
broker := newMockBroker()
|
||||
gcConfirmCalled := false
|
||||
gcConfirmChan := make(chan struct{})
|
||||
broker.GCConfirmFunc = func(ctx context.Context, collectionID, partitionID UniqueID) bool {
|
||||
gcConfirmCalled = true
|
||||
close(gcConfirmChan)
|
||||
return true
|
||||
}
|
||||
|
||||
tsoAllocator := newMockTsoAllocator()
|
||||
tsoAllocator.GenerateTSOF = func(count uint32) (uint64, error) {
|
||||
return 100, nil
|
||||
}
|
||||
core := newTestCore(withMeta(meta), withTtSynchronizer(ticker), withTsoAllocator(tsoAllocator), withDropIndex())
|
||||
core := newTestCore(withMeta(meta), withTtSynchronizer(ticker), withTsoAllocator(tsoAllocator), withDropIndex(), withBroker(broker))
|
||||
core.ddlTsLockManager = newDdlTsLockManager(core.tsoAllocator)
|
||||
gc := newBgGarbageCollector(core)
|
||||
core.garbageCollector = gc
|
||||
gc.ReDropPartition(0, pchans, &model.Partition{}, 100000)
|
||||
<-gcConfirmChan
|
||||
assert.True(t, gcConfirmCalled)
|
||||
<-removePartitionChan
|
||||
assert.True(t, removePartitionCalled)
|
||||
})
|
||||
@ -380,15 +419,27 @@ func TestGarbageCollectorCtx_ReDropPartition(t *testing.T) {
|
||||
return nil
|
||||
})
|
||||
|
||||
broker := newMockBroker()
|
||||
|
||||
gcConfirmCalled := false
|
||||
gcConfirmChan := make(chan struct{})
|
||||
broker.GCConfirmFunc = func(ctx context.Context, collectionID, partitionID UniqueID) bool {
|
||||
gcConfirmCalled = true
|
||||
close(gcConfirmChan)
|
||||
return true
|
||||
}
|
||||
|
||||
tsoAllocator := newMockTsoAllocator()
|
||||
tsoAllocator.GenerateTSOF = func(count uint32) (uint64, error) {
|
||||
return 100, nil
|
||||
}
|
||||
core := newTestCore(withMeta(meta), withTtSynchronizer(ticker), withTsoAllocator(tsoAllocator), withDropIndex())
|
||||
core := newTestCore(withMeta(meta), withTtSynchronizer(ticker), withTsoAllocator(tsoAllocator), withDropIndex(), withBroker(broker))
|
||||
core.ddlTsLockManager = newDdlTsLockManager(core.tsoAllocator)
|
||||
gc := newBgGarbageCollector(core)
|
||||
core.garbageCollector = gc
|
||||
gc.ReDropPartition(0, pchans, &model.Partition{}, 100000)
|
||||
<-gcConfirmChan
|
||||
assert.True(t, gcConfirmCalled)
|
||||
<-removePartitionChan
|
||||
assert.True(t, removePartitionCalled)
|
||||
})
|
||||
|
Loading…
Reference in New Issue
Block a user