UnWatch channels if failed to create collection (#19390)

Signed-off-by: longjiquan <jiquan.long@zilliz.com>

Signed-off-by: longjiquan <jiquan.long@zilliz.com>
This commit is contained in:
Jiquan Long 2022-09-23 16:56:50 +08:00 committed by GitHub
parent d4bc00423c
commit 22477d4601
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 320 additions and 42 deletions

View File

@ -268,7 +268,14 @@ func (t *createCollectionTask) Execute(ctx context.Context) error {
}, &deleteCollectionMetaStep{ }, &deleteCollectionMetaStep{
baseStep: baseStep{core: t.core}, baseStep: baseStep{core: t.core},
collectionID: collID, collectionID: collID,
ts: ts, // When we undo createCollectionTask, this ts may be less than the ts when unwatch channels.
ts: ts,
})
// serve for this case: watching channels succeed in datacoord but failed due to network failure.
undoTask.AddStep(&nullStep{}, &unwatchChannelsStep{
baseStep: baseStep{core: t.core},
collectionID: collID,
channels: t.channels,
}) })
undoTask.AddStep(&watchChannelsStep{ undoTask.AddStep(&watchChannelsStep{
baseStep: baseStep{core: t.core}, baseStep: baseStep{core: t.core},
@ -278,11 +285,7 @@ func (t *createCollectionTask) Execute(ctx context.Context) error {
vChannels: t.channels.virtualChannels, vChannels: t.channels.virtualChannels,
startPositions: toKeyDataPairs(startPositions), startPositions: toKeyDataPairs(startPositions),
}, },
}, &unwatchChannelsStep{ }, &nullStep{})
baseStep: baseStep{core: t.core},
collectionID: collID,
channels: t.channels,
})
undoTask.AddStep(&changeCollectionStateStep{ undoTask.AddStep(&changeCollectionStateStep{
baseStep: baseStep{core: t.core}, baseStep: baseStep{core: t.core},
collectionID: collID, collectionID: collID,

View File

@ -6,6 +6,10 @@ import (
"testing" "testing"
"time" "time"
mockrootcoord "github.com/milvus-io/milvus/internal/rootcoord/mocks"
"github.com/stretchr/testify/mock"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus/api/commonpb" "github.com/milvus-io/milvus/api/commonpb"
"github.com/milvus-io/milvus/api/milvuspb" "github.com/milvus-io/milvus/api/milvuspb"
@ -446,17 +450,26 @@ func Test_createCollectionTask_Execute(t *testing.T) {
broker.WatchChannelsFunc = func(ctx context.Context, info *watchInfo) error { broker.WatchChannelsFunc = func(ctx context.Context, info *watchInfo) error {
return nil return nil
} }
unwatchChannelsCalled := false unwatchChannelsCalled := false
unwatchChannelsChan := make(chan struct{}, 1) unwatchChannelsChan := make(chan struct{}, 1)
broker.UnwatchChannelsFunc = func(ctx context.Context, info *watchInfo) error { gc := mockrootcoord.NewGarbageCollector(t)
gc.On("GcCollectionData",
mock.Anything, // context.Context
mock.Anything, // *model.Collection
).Return(func(ctx context.Context, collection *model.Collection) (ddlTs Timestamp) {
for _, pchan := range pchans {
ticker.syncedTtHistogram.update(pchan, 101)
}
unwatchChannelsCalled = true unwatchChannelsCalled = true
unwatchChannelsChan <- struct{}{} unwatchChannelsChan <- struct{}{}
return nil return 100
} }, nil)
core := newTestCore(withValidIDAllocator(), core := newTestCore(withValidIDAllocator(),
withMeta(meta), withMeta(meta),
withTtSynchronizer(ticker), withTtSynchronizer(ticker),
withGarbageCollector(gc),
withBroker(broker)) withBroker(broker))
schema := &schemapb.CollectionSchema{ schema := &schemapb.CollectionSchema{

View File

@ -10,6 +10,7 @@ import (
"github.com/milvus-io/milvus/internal/metastore/model" "github.com/milvus-io/milvus/internal/metastore/model"
) )
//go:generate mockery --name=GarbageCollector --outpkg=mockrootcoord
type GarbageCollector interface { type GarbageCollector interface {
ReDropCollection(collMeta *model.Collection, ts Timestamp) ReDropCollection(collMeta *model.Collection, ts Timestamp)
RemoveCreatingCollection(collMeta *model.Collection) RemoveCreatingCollection(collMeta *model.Collection)
@ -62,7 +63,11 @@ func (c *bgGarbageCollector) ReDropCollection(collMeta *model.Collection, ts Tim
} }
func (c *bgGarbageCollector) RemoveCreatingCollection(collMeta *model.Collection) { func (c *bgGarbageCollector) RemoveCreatingCollection(collMeta *model.Collection) {
// TODO: remove this after data gc can be notified by rpc.
c.s.chanTimeTick.addDmlChannels(collMeta.PhysicalChannelNames...)
redo := newBaseRedoTask(c.s.stepExecutor) redo := newBaseRedoTask(c.s.stepExecutor)
redo.AddAsyncStep(&unwatchChannelsStep{ redo.AddAsyncStep(&unwatchChannelsStep{
baseStep: baseStep{core: c.s}, baseStep: baseStep{core: c.s},
collectionID: collMeta.CollectionID, collectionID: collMeta.CollectionID,
@ -71,10 +76,15 @@ func (c *bgGarbageCollector) RemoveCreatingCollection(collMeta *model.Collection
physicalChannels: collMeta.PhysicalChannelNames, physicalChannels: collMeta.PhysicalChannelNames,
}, },
}) })
redo.AddAsyncStep(&removeDmlChannelsStep{
baseStep: baseStep{core: c.s},
pChannels: collMeta.PhysicalChannelNames,
})
redo.AddAsyncStep(&deleteCollectionMetaStep{ redo.AddAsyncStep(&deleteCollectionMetaStep{
baseStep: baseStep{core: c.s}, baseStep: baseStep{core: c.s},
collectionID: collMeta.CollectionID, collectionID: collMeta.CollectionID,
ts: collMeta.CreateTime, // When we undo createCollectionTask, this ts may be less than the ts when unwatch channels.
ts: collMeta.CreateTime,
}) })
// err is ignored since no sync steps will be executed. // err is ignored since no sync steps will be executed.
_ = redo.Execute(context.Background()) _ = redo.Execute(context.Background())

View File

@ -3,8 +3,12 @@ package rootcoord
import ( import (
"context" "context"
"errors" "errors"
"fmt"
"testing" "testing"
mocktso "github.com/milvus-io/milvus/internal/tso/mocks"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/internal/metastore/model" "github.com/milvus-io/milvus/internal/metastore/model"
@ -164,45 +168,87 @@ func TestGarbageCollectorCtx_ReDropCollection(t *testing.T) {
func TestGarbageCollectorCtx_RemoveCreatingCollection(t *testing.T) { func TestGarbageCollectorCtx_RemoveCreatingCollection(t *testing.T) {
t.Run("failed to UnwatchChannels", func(t *testing.T) { t.Run("failed to UnwatchChannels", func(t *testing.T) {
broker := newMockBroker() defer cleanTestEnv()
broker.UnwatchChannelsFunc = func(ctx context.Context, info *watchInfo) error {
return errors.New("error mock UnwatchChannels") shardNum := 2
ticker := newRocksMqTtSynchronizer()
pchans := ticker.getDmlChannelNames(shardNum)
tsoAllocator := mocktso.NewAllocator(t)
tsoAllocator.
On("GenerateTSO", mock.AnythingOfType("uint32")).
Return(Timestamp(0), errors.New("error mock GenerateTSO"))
executed := make(chan struct{}, 1)
executor := newMockStepExecutor()
executor.AddStepsFunc = func(s *stepStack) {
s.Execute(context.Background())
executed <- struct{}{}
} }
core := newTestCore(withBroker(broker))
core := newTestCore(withTtSynchronizer(ticker), withTsoAllocator(tsoAllocator), withStepExecutor(executor))
gc := newBgGarbageCollector(core) gc := newBgGarbageCollector(core)
core.ddlTsLockManager = newDdlTsLockManagerV2(tsoAllocator)
core.garbageCollector = gc core.garbageCollector = gc
gc.RemoveCreatingCollection(&model.Collection{})
gc.RemoveCreatingCollection(&model.Collection{PhysicalChannelNames: pchans})
<-executed
}) })
t.Run("failed to RemoveCollection", func(t *testing.T) { t.Run("failed to RemoveCollection", func(t *testing.T) {
broker := newMockBroker() defer cleanTestEnv()
unwatchChannelsCalled := false
unwatchChannelsChan := make(chan struct{}, 1) shardNum := 2
broker.UnwatchChannelsFunc = func(ctx context.Context, info *watchInfo) error {
unwatchChannelsCalled = true ticker := newRocksMqTtSynchronizer()
unwatchChannelsChan <- struct{}{} pchans := ticker.getDmlChannelNames(shardNum)
return nil
tsoAllocator := mocktso.NewAllocator(t)
tsoAllocator.
On("GenerateTSO", mock.AnythingOfType("uint32")).
Return(Timestamp(100), nil)
for _, pchan := range pchans {
ticker.syncedTtHistogram.update(pchan, 101)
} }
meta := newMockMetaTable() meta := newMockMetaTable()
removeCollectionCalled := false
removeCollectionChan := make(chan struct{}, 1)
meta.RemoveCollectionFunc = func(ctx context.Context, collectionID UniqueID, ts Timestamp) error { meta.RemoveCollectionFunc = func(ctx context.Context, collectionID UniqueID, ts Timestamp) error {
return errors.New("error mock RemoveCollection") removeCollectionCalled = true
removeCollectionChan <- struct{}{}
return fmt.Errorf("error mock RemoveCollection")
} }
core := newTestCore(withBroker(broker), withMeta(meta))
core := newTestCore(withTtSynchronizer(ticker), withMeta(meta), withTsoAllocator(tsoAllocator))
gc := newBgGarbageCollector(core) gc := newBgGarbageCollector(core)
gc.RemoveCreatingCollection(&model.Collection{}) core.ddlTsLockManager = newDdlTsLockManagerV2(tsoAllocator)
<-unwatchChannelsChan core.garbageCollector = gc
assert.True(t, unwatchChannelsCalled)
gc.RemoveCreatingCollection(&model.Collection{PhysicalChannelNames: pchans})
<-removeCollectionChan
assert.True(t, removeCollectionCalled) // though it fail.
}) })
t.Run("normal case", func(t *testing.T) { t.Run("normal case", func(t *testing.T) {
broker := newMockBroker() defer cleanTestEnv()
unwatchChannelsCalled := false
unwatchChannelsChan := make(chan struct{}, 1) shardNum := 2
broker.UnwatchChannelsFunc = func(ctx context.Context, info *watchInfo) error {
unwatchChannelsCalled = true ticker := newRocksMqTtSynchronizer()
unwatchChannelsChan <- struct{}{} pchans := ticker.getDmlChannelNames(shardNum)
return nil
tsoAllocator := mocktso.NewAllocator(t)
tsoAllocator.
On("GenerateTSO", mock.AnythingOfType("uint32")).
Return(Timestamp(100), nil)
for _, pchan := range pchans {
ticker.syncedTtHistogram.update(pchan, 101)
} }
meta := newMockMetaTable() meta := newMockMetaTable()
removeCollectionCalled := false removeCollectionCalled := false
removeCollectionChan := make(chan struct{}, 1) removeCollectionChan := make(chan struct{}, 1)
@ -211,11 +257,13 @@ func TestGarbageCollectorCtx_RemoveCreatingCollection(t *testing.T) {
removeCollectionChan <- struct{}{} removeCollectionChan <- struct{}{}
return nil return nil
} }
core := newTestCore(withBroker(broker), withMeta(meta))
core := newTestCore(withTtSynchronizer(ticker), withMeta(meta), withTsoAllocator(tsoAllocator))
gc := newBgGarbageCollector(core) gc := newBgGarbageCollector(core)
gc.RemoveCreatingCollection(&model.Collection{}) core.ddlTsLockManager = newDdlTsLockManagerV2(tsoAllocator)
<-unwatchChannelsChan core.garbageCollector = gc
assert.True(t, unwatchChannelsCalled)
gc.RemoveCreatingCollection(&model.Collection{PhysicalChannelNames: pchans})
<-removeCollectionChan <-removeCollectionChan
assert.True(t, removeCollectionCalled) assert.True(t, removeCollectionCalled)
}) })

View File

@ -0,0 +1,87 @@
// Code generated by mockery v2.14.0. DO NOT EDIT.
package mockrootcoord
import (
context "context"
model "github.com/milvus-io/milvus/internal/metastore/model"
mock "github.com/stretchr/testify/mock"
)
// GarbageCollector is an autogenerated mock type for the GarbageCollector type
type GarbageCollector struct {
mock.Mock
}
// GcCollectionData provides a mock function with given fields: ctx, coll
func (_m *GarbageCollector) GcCollectionData(ctx context.Context, coll *model.Collection) (uint64, error) {
ret := _m.Called(ctx, coll)
var r0 uint64
if rf, ok := ret.Get(0).(func(context.Context, *model.Collection) uint64); ok {
r0 = rf(ctx, coll)
} else {
r0 = ret.Get(0).(uint64)
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, *model.Collection) error); ok {
r1 = rf(ctx, coll)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// GcPartitionData provides a mock function with given fields: ctx, pChannels, partition
func (_m *GarbageCollector) GcPartitionData(ctx context.Context, pChannels []string, partition *model.Partition) (uint64, error) {
ret := _m.Called(ctx, pChannels, partition)
var r0 uint64
if rf, ok := ret.Get(0).(func(context.Context, []string, *model.Partition) uint64); ok {
r0 = rf(ctx, pChannels, partition)
} else {
r0 = ret.Get(0).(uint64)
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, []string, *model.Partition) error); ok {
r1 = rf(ctx, pChannels, partition)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// ReDropCollection provides a mock function with given fields: collMeta, ts
func (_m *GarbageCollector) ReDropCollection(collMeta *model.Collection, ts uint64) {
_m.Called(collMeta, ts)
}
// ReDropPartition provides a mock function with given fields: pChannels, partition, ts
func (_m *GarbageCollector) ReDropPartition(pChannels []string, partition *model.Partition, ts uint64) {
_m.Called(pChannels, partition, ts)
}
// RemoveCreatingCollection provides a mock function with given fields: collMeta
func (_m *GarbageCollector) RemoveCreatingCollection(collMeta *model.Collection) {
_m.Called(collMeta)
}
type mockConstructorTestingTNewGarbageCollector interface {
mock.TestingT
Cleanup(func())
}
// NewGarbageCollector creates a new instance of GarbageCollector. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewGarbageCollector(t mockConstructorTestingTNewGarbageCollector) *GarbageCollector {
mock := &GarbageCollector{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@ -111,8 +111,11 @@ type unwatchChannelsStep struct {
} }
func (s *unwatchChannelsStep) Execute(ctx context.Context) ([]nestedStep, error) { func (s *unwatchChannelsStep) Execute(ctx context.Context) ([]nestedStep, error) {
err := s.core.broker.UnwatchChannels(ctx, &watchInfo{collectionID: s.collectionID, vChannels: s.channels.virtualChannels}) unwatchByDropMsg := &deleteCollectionDataStep{
return nil, err baseStep: baseStep{core: s.core},
coll: &model.Collection{CollectionID: s.collectionID, PhysicalChannelNames: s.channels.physicalChannels},
}
return unwatchByDropMsg.Execute(ctx)
} }
func (s *unwatchChannelsStep) Desc() string { func (s *unwatchChannelsStep) Desc() string {

View File

@ -42,6 +42,7 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
//go:generate mockery --name=Allocator --outpkg=mocktso
// Allocator is a Timestamp Oracle allocator. // Allocator is a Timestamp Oracle allocator.
type Allocator interface { type Allocator interface {
// Initialize is used to initialize a TSO allocator. // Initialize is used to initialize a TSO allocator.

View File

@ -4,6 +4,8 @@ import (
"time" "time"
) )
// TODO(longjiquan): replace this by mockery.
type MockAllocator struct { type MockAllocator struct {
Allocator Allocator
InitializeF func() error InitializeF func() error

View File

@ -0,0 +1,111 @@
// Code generated by mockery v2.14.0. DO NOT EDIT.
package mocktso
import (
time "time"
mock "github.com/stretchr/testify/mock"
)
// Allocator is an autogenerated mock type for the Allocator type
type Allocator struct {
mock.Mock
}
// GenerateTSO provides a mock function with given fields: count
func (_m *Allocator) GenerateTSO(count uint32) (uint64, error) {
ret := _m.Called(count)
var r0 uint64
if rf, ok := ret.Get(0).(func(uint32) uint64); ok {
r0 = rf(count)
} else {
r0 = ret.Get(0).(uint64)
}
var r1 error
if rf, ok := ret.Get(1).(func(uint32) error); ok {
r1 = rf(count)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// GetLastSavedTime provides a mock function with given fields:
func (_m *Allocator) GetLastSavedTime() time.Time {
ret := _m.Called()
var r0 time.Time
if rf, ok := ret.Get(0).(func() time.Time); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(time.Time)
}
return r0
}
// Initialize provides a mock function with given fields:
func (_m *Allocator) Initialize() error {
ret := _m.Called()
var r0 error
if rf, ok := ret.Get(0).(func() error); ok {
r0 = rf()
} else {
r0 = ret.Error(0)
}
return r0
}
// Reset provides a mock function with given fields:
func (_m *Allocator) Reset() {
_m.Called()
}
// SetTSO provides a mock function with given fields: _a0
func (_m *Allocator) SetTSO(_a0 uint64) error {
ret := _m.Called(_a0)
var r0 error
if rf, ok := ret.Get(0).(func(uint64) error); ok {
r0 = rf(_a0)
} else {
r0 = ret.Error(0)
}
return r0
}
// UpdateTSO provides a mock function with given fields:
func (_m *Allocator) UpdateTSO() error {
ret := _m.Called()
var r0 error
if rf, ok := ret.Get(0).(func() error); ok {
r0 = rf()
} else {
r0 = ret.Error(0)
}
return r0
}
type mockConstructorTestingTNewAllocator interface {
mock.TestingT
Cleanup(func())
}
// NewAllocator creates a new instance of Allocator. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewAllocator(t mockConstructorTestingTNewAllocator) *Allocator {
mock := &Allocator{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@ -30,10 +30,10 @@ echo "Running unittest under ./internal"
if [[ $(uname -s) == "Darwin" && "$(uname -m)" == "arm64" ]]; then if [[ $(uname -s) == "Darwin" && "$(uname -m)" == "arm64" ]]; then
APPLE_SILICON_FLAG="-tags dynamic" APPLE_SILICON_FLAG="-tags dynamic"
fi fi
for d in $(go list ./internal/... | grep -v -e vendor -e kafka -e planparserv2/generated -e internal/mocks); do for d in $(go list ./internal/... | grep -v -e vendor -e kafka -e planparserv2/generated -e mocks); do
go test -race ${APPLE_SILICON_FLAG} -v -coverpkg=./... -coverprofile=profile.out -covermode=atomic "$d" go test -race ${APPLE_SILICON_FLAG} -v -coverpkg=./... -coverprofile=profile.out -covermode=atomic "$d"
if [ -f profile.out ]; then if [ -f profile.out ]; then
grep -v kafka profile.out | grep -v planparserv2/generated | sed '1d' >> ${FILE_COVERAGE_INFO} grep -v kafka profile.out | grep -v planparserv2/generated | grep -v mocks | sed '1d' >> ${FILE_COVERAGE_INFO}
rm profile.out rm profile.out
fi fi
done done