Add release failed case unittest for query node (#7971)

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
bigsheeper 2021-09-17 11:35:49 +08:00 committed by GitHub
parent a865dfaba9
commit c9b6a92366
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 68 additions and 20 deletions

View File

@ -43,7 +43,7 @@ type dataSyncService struct {
}
// collection flow graph
func (dsService *dataSyncService) addCollectionFlowGraph(collectionID UniqueID, vChannels []string) error {
func (dsService *dataSyncService) addCollectionFlowGraph(collectionID UniqueID, vChannels []string) {
dsService.mu.Lock()
defer dsService.mu.Unlock()
@ -66,7 +66,6 @@ func (dsService *dataSyncService) addCollectionFlowGraph(collectionID UniqueID,
zap.Any("collectionID", collectionID),
zap.Any("channel", vChannel))
}
return nil
}
func (dsService *dataSyncService) getCollectionFlowGraphs(collectionID UniqueID, vChannels []string) (map[Channel]*queryNodeFlowGraph, error) {
@ -119,7 +118,7 @@ func (dsService *dataSyncService) removeCollectionFlowGraph(collectionID UniqueI
}
// partition flow graph
func (dsService *dataSyncService) addPartitionFlowGraph(collectionID UniqueID, partitionID UniqueID, vChannels []string) error {
func (dsService *dataSyncService) addPartitionFlowGraph(collectionID UniqueID, partitionID UniqueID, vChannels []string) {
dsService.mu.Lock()
defer dsService.mu.Unlock()
@ -137,7 +136,6 @@ func (dsService *dataSyncService) addPartitionFlowGraph(collectionID UniqueID, p
dsService.msFactory)
dsService.partitionFlowGraphs[partitionID][vChannel] = newFlowGraph
}
return nil
}
func (dsService *dataSyncService) getPartitionFlowGraphs(partitionID UniqueID, vChannels []string) (map[Channel]*queryNodeFlowGraph, error) {

View File

@ -117,8 +117,7 @@ func TestDataSyncService_Start(t *testing.T) {
assert.Nil(t, err)
channels := []Channel{"0"}
err = node.streaming.dataSyncService.addCollectionFlowGraph(collectionID, channels)
assert.NoError(t, err)
node.streaming.dataSyncService.addCollectionFlowGraph(collectionID, channels)
err = node.streaming.dataSyncService.startCollectionFlowGraph(collectionID, channels)
assert.NoError(t, err)
@ -142,8 +141,7 @@ func TestDataSyncService_collectionFlowGraphs(t *testing.T) {
dataSyncService := newDataSyncService(ctx, streaming.replica, streaming.tSafeReplica, fac)
assert.NotNil(t, dataSyncService)
err = dataSyncService.addCollectionFlowGraph(defaultCollectionID, []Channel{defaultVChannel})
assert.NoError(t, err)
dataSyncService.addCollectionFlowGraph(defaultCollectionID, []Channel{defaultVChannel})
fg, err := dataSyncService.getCollectionFlowGraphs(defaultCollectionID, []Channel{defaultVChannel})
assert.NotNil(t, fg)
@ -186,8 +184,7 @@ func TestDataSyncService_partitionFlowGraphs(t *testing.T) {
dataSyncService := newDataSyncService(ctx, streaming.replica, streaming.tSafeReplica, fac)
assert.NotNil(t, dataSyncService)
err = dataSyncService.addPartitionFlowGraph(defaultPartitionID, defaultPartitionID, []Channel{defaultVChannel})
assert.NoError(t, err)
dataSyncService.addPartitionFlowGraph(defaultPartitionID, defaultPartitionID, []Channel{defaultVChannel})
fg, err := dataSyncService.getPartitionFlowGraphs(defaultPartitionID, []Channel{defaultVChannel})
assert.NotNil(t, fg)

View File

@ -229,16 +229,10 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
// add flow graph
if loadPartition {
err = w.node.streaming.dataSyncService.addPartitionFlowGraph(collectionID, partitionID, vChannels)
if err != nil {
return err
}
w.node.streaming.dataSyncService.addPartitionFlowGraph(collectionID, partitionID, vChannels)
log.Debug("query node add partition flow graphs", zap.Any("channels", vChannels))
} else {
err = w.node.streaming.dataSyncService.addCollectionFlowGraph(collectionID, vChannels)
if err != nil {
return err
}
w.node.streaming.dataSyncService.addCollectionFlowGraph(collectionID, vChannels)
log.Debug("query node add collection flow graphs", zap.Any("channels", vChannels))
}

View File

@ -287,6 +287,9 @@ func TestTask_loadSegmentsTask(t *testing.T) {
}
func TestTask_releaseCollectionTask(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
genReleaseCollectionRequest := func() *querypb.ReleaseCollectionRequest {
req := &querypb.ReleaseCollectionRequest{
Base: genCommonMsgBase(commonpb.MsgType_LoadSegments),
@ -318,6 +321,33 @@ func TestTask_releaseCollectionTask(t *testing.T) {
err = task.OnEnqueue()
assert.NoError(t, err)
})
t.Run("test execute", func(t *testing.T) {
node, err := genSimpleQueryNode(ctx)
assert.NoError(t, err)
task := releaseCollectionTask{
req: genReleaseCollectionRequest(),
node: node,
}
err = task.Execute(ctx)
assert.NoError(t, err)
})
t.Run("test execute no collection in streaming", func(t *testing.T) {
node, err := genSimpleQueryNode(ctx)
assert.NoError(t, err)
err = node.streaming.replica.removeCollection(defaultCollectionID)
assert.NoError(t, err)
task := releaseCollectionTask{
req: genReleaseCollectionRequest(),
node: node,
}
err = task.Execute(ctx)
assert.Error(t, err)
})
}
func TestTask_releasePartitionTask(t *testing.T) {
@ -365,11 +395,40 @@ func TestTask_releasePartitionTask(t *testing.T) {
req: genReleasePartitionsRequest(),
node: node,
}
err = task.node.streaming.dataSyncService.addPartitionFlowGraph(defaultCollectionID,
task.node.streaming.dataSyncService.addPartitionFlowGraph(defaultCollectionID,
defaultPartitionID,
[]Channel{defaultVChannel})
assert.NoError(t, err)
err = task.Execute(ctx)
assert.NoError(t, err)
})
t.Run("test execute no collection in historical", func(t *testing.T) {
node, err := genSimpleQueryNode(ctx)
assert.NoError(t, err)
task := releasePartitionsTask{
req: genReleasePartitionsRequest(),
node: node,
}
err = node.historical.replica.removeCollection(defaultCollectionID)
assert.NoError(t, err)
err = task.Execute(ctx)
assert.Error(t, err)
})
t.Run("test execute no collection in streaming", func(t *testing.T) {
node, err := genSimpleQueryNode(ctx)
assert.NoError(t, err)
task := releasePartitionsTask{
req: genReleasePartitionsRequest(),
node: node,
}
err = node.streaming.replica.removeCollection(defaultCollectionID)
assert.NoError(t, err)
err = task.Execute(ctx)
assert.Error(t, err)
})
}