Return error when create message stream failed in QueryNode (#15317)

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
bigsheeper 2022-03-09 15:17:59 +08:00 committed by GitHub
parent b8be44f190
commit 9ad5c14a4a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 140 additions and 90 deletions

View File

@ -43,67 +43,83 @@ type dataSyncService struct {
}
// addFlowGraphsForDMLChannels add flowGraphs to dmlChannel2FlowGraph
func (dsService *dataSyncService) addFlowGraphsForDMLChannels(collectionID UniqueID, dmlChannels []string) map[string]*queryNodeFlowGraph {
func (dsService *dataSyncService) addFlowGraphsForDMLChannels(collectionID UniqueID, dmlChannels []string) (map[string]*queryNodeFlowGraph, error) {
dsService.mu.Lock()
defer dsService.mu.Unlock()
results := make(map[string]*queryNodeFlowGraph)
for _, channel := range dmlChannels {
if fg, ok := dsService.dmlChannel2FlowGraph[channel]; ok {
if _, ok := dsService.dmlChannel2FlowGraph[channel]; ok {
log.Warn("dml flow graph has been existed",
zap.Any("collectionID", collectionID),
zap.Any("channel", channel),
)
results[channel] = fg
continue
}
newFlowGraph := newQueryNodeFlowGraph(dsService.ctx,
newFlowGraph, err := newQueryNodeFlowGraph(dsService.ctx,
collectionID,
dsService.streamingReplica,
dsService.tSafeReplica,
channel,
dsService.msFactory)
dsService.dmlChannel2FlowGraph[channel] = newFlowGraph
if err != nil {
for _, fg := range results {
fg.flowGraph.Close()
}
return nil, err
}
results[channel] = newFlowGraph
}
for channel, fg := range results {
dsService.dmlChannel2FlowGraph[channel] = fg
log.Debug("add DML flow graph",
zap.Any("collectionID", collectionID),
zap.Any("channel", channel))
results[channel] = newFlowGraph
metrics.QueryNodeNumFlowGraphs.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Inc()
}
return results
return results, nil
}
// addFlowGraphsForDeltaChannels add flowGraphs to deltaChannel2FlowGraph
func (dsService *dataSyncService) addFlowGraphsForDeltaChannels(collectionID UniqueID, deltaChannels []string) map[string]*queryNodeFlowGraph {
func (dsService *dataSyncService) addFlowGraphsForDeltaChannels(collectionID UniqueID, deltaChannels []string) (map[string]*queryNodeFlowGraph, error) {
dsService.mu.Lock()
defer dsService.mu.Unlock()
results := make(map[string]*queryNodeFlowGraph)
for _, channel := range deltaChannels {
if fg, ok := dsService.deltaChannel2FlowGraph[channel]; ok {
if _, ok := dsService.deltaChannel2FlowGraph[channel]; ok {
log.Warn("delta flow graph has been existed",
zap.Any("collectionID", collectionID),
zap.Any("channel", channel),
)
results[channel] = fg
continue
}
newFlowGraph := newQueryNodeDeltaFlowGraph(dsService.ctx,
newFlowGraph, err := newQueryNodeDeltaFlowGraph(dsService.ctx,
collectionID,
dsService.historicalReplica,
dsService.tSafeReplica,
channel,
dsService.msFactory)
dsService.deltaChannel2FlowGraph[channel] = newFlowGraph
if err != nil {
for _, fg := range results {
fg.flowGraph.Close()
}
return nil, err
}
results[channel] = newFlowGraph
}
for channel, fg := range results {
dsService.deltaChannel2FlowGraph[channel] = fg
log.Debug("add delta flow graph",
zap.Any("collectionID", collectionID),
zap.Any("channel", channel))
results[channel] = newFlowGraph
metrics.QueryNodeNumFlowGraphs.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Inc()
}
return results
return results, nil
}
// getFlowGraphByDMLChannel returns the DML flowGraph by channel

View File

@ -40,10 +40,12 @@ func TestDataSyncService_DMLFlowGraphs(t *testing.T) {
dataSyncService := newDataSyncService(ctx, streamingReplica, historicalReplica, tSafe, fac)
assert.NotNil(t, dataSyncService)
dataSyncService.addFlowGraphsForDMLChannels(defaultCollectionID, []Channel{defaultDMLChannel})
_, err = dataSyncService.addFlowGraphsForDMLChannels(defaultCollectionID, []Channel{defaultDMLChannel})
assert.NoError(t, err)
assert.Len(t, dataSyncService.dmlChannel2FlowGraph, 1)
dataSyncService.addFlowGraphsForDMLChannels(defaultCollectionID, []Channel{defaultDMLChannel})
_, err = dataSyncService.addFlowGraphsForDMLChannels(defaultCollectionID, []Channel{defaultDMLChannel})
assert.NoError(t, err)
assert.Len(t, dataSyncService.dmlChannel2FlowGraph, 1)
fg, err := dataSyncService.getFlowGraphByDMLChannel(defaultCollectionID, defaultDMLChannel)
@ -67,7 +69,8 @@ func TestDataSyncService_DMLFlowGraphs(t *testing.T) {
assert.Nil(t, fg)
assert.Error(t, err)
dataSyncService.addFlowGraphsForDMLChannels(defaultCollectionID, []Channel{defaultDMLChannel})
_, err = dataSyncService.addFlowGraphsForDMLChannels(defaultCollectionID, []Channel{defaultDMLChannel})
assert.NoError(t, err)
assert.Len(t, dataSyncService.dmlChannel2FlowGraph, 1)
dataSyncService.close()
@ -91,10 +94,12 @@ func TestDataSyncService_DeltaFlowGraphs(t *testing.T) {
dataSyncService := newDataSyncService(ctx, streamingReplica, historicalReplica, tSafe, fac)
assert.NotNil(t, dataSyncService)
dataSyncService.addFlowGraphsForDeltaChannels(defaultCollectionID, []Channel{defaultDeltaChannel})
_, err = dataSyncService.addFlowGraphsForDeltaChannels(defaultCollectionID, []Channel{defaultDeltaChannel})
assert.NoError(t, err)
assert.Len(t, dataSyncService.deltaChannel2FlowGraph, 1)
dataSyncService.addFlowGraphsForDeltaChannels(defaultCollectionID, []Channel{defaultDeltaChannel})
_, err = dataSyncService.addFlowGraphsForDeltaChannels(defaultCollectionID, []Channel{defaultDeltaChannel})
assert.NoError(t, err)
assert.Len(t, dataSyncService.deltaChannel2FlowGraph, 1)
fg, err := dataSyncService.getFlowGraphByDeltaChannel(defaultCollectionID, defaultDeltaChannel)
@ -118,7 +123,8 @@ func TestDataSyncService_DeltaFlowGraphs(t *testing.T) {
assert.Nil(t, fg)
assert.Error(t, err)
dataSyncService.addFlowGraphsForDeltaChannels(defaultCollectionID, []Channel{defaultDMLChannel})
_, err = dataSyncService.addFlowGraphsForDeltaChannels(defaultCollectionID, []Channel{defaultDMLChannel})
assert.NoError(t, err)
assert.Len(t, dataSyncService.deltaChannel2FlowGraph, 1)
dataSyncService.close()

View File

@ -41,14 +41,14 @@ func (dNode *deleteNode) Name() string {
// Operate handles input messages, do delete operations
func (dNode *deleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
if len(in) != 1 {
log.Error("Invalid operate message input in deleteNode", zap.Int("input length", len(in)))
// TODO: add error handling
log.Warn("Invalid operate message input in deleteNode", zap.Int("input length", len(in)))
return []Msg{}
}
dMsg, ok := in[0].(*deleteMsg)
if !ok {
log.Warn("type assertion failed for deleteMsg")
// TODO: add error handling
return []Msg{}
}
delData := &deleteData{

View File

@ -44,14 +44,14 @@ func (fddNode *filterDeleteNode) Name() string {
// Operate handles input messages, to filter invalid delete messages
func (fddNode *filterDeleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
if len(in) != 1 {
log.Error("Invalid operate message input in filterDDNode", zap.Int("input length", len(in)))
// TODO: add error handling
log.Warn("Invalid operate message input in filterDDNode", zap.Int("input length", len(in)))
return []Msg{}
}
msgStreamMsg, ok := in[0].(*MsgStreamMsg)
if !ok {
log.Warn("type assertion failed for MsgStreamMsg")
// TODO: add error handling
return []Msg{}
}
if msgStreamMsg == nil {

View File

@ -46,14 +46,14 @@ func (fdmNode *filterDmNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
//log.Debug("Do filterDmNode operation")
if len(in) != 1 {
log.Error("Invalid operate message input in filterDmNode", zap.Int("input length", len(in)))
// TODO: add error handling
log.Warn("Invalid operate message input in filterDmNode", zap.Int("input length", len(in)))
return []Msg{}
}
msgStreamMsg, ok := in[0].(*MsgStreamMsg)
if !ok {
log.Warn("type assertion failed for MsgStreamMsg")
// TODO: add error handling
return []Msg{}
}
if msgStreamMsg == nil {

View File

@ -71,14 +71,14 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
//log.Debug("Do insertNode operation")
if len(in) != 1 {
log.Error("Invalid operate message input in insertNode", zap.Int("input length", len(in)))
// TODO: add error handling
log.Warn("Invalid operate message input in insertNode", zap.Int("input length", len(in)))
return []Msg{}
}
iMsg, ok := in[0].(*insertMsg)
if !ok {
log.Warn("type assertion failed for insertMsg")
// TODO: add error handling
return []Msg{}
}
iData := insertData{

View File

@ -47,7 +47,7 @@ func newQueryNodeFlowGraph(ctx context.Context,
streamingReplica ReplicaInterface,
tSafeReplica TSafeReplicaInterface,
channel Channel,
factory msgstream.Factory) *queryNodeFlowGraph {
factory msgstream.Factory) (*queryNodeFlowGraph, error) {
ctx1, cancel := context.WithCancel(ctx)
@ -59,10 +59,13 @@ func newQueryNodeFlowGraph(ctx context.Context,
flowGraph: flowgraph.NewTimeTickedFlowGraph(ctx1),
}
var dmStreamNode node = q.newDmInputNode(ctx1, factory)
dmStreamNode, err := q.newDmInputNode(ctx1, factory)
if err != nil {
return nil, err
}
var filterDmNode node = newFilteredDmNode(streamingReplica, collectionID)
var insertNode node = newInsertNode(streamingReplica)
var serviceTimeNode node = newServiceTimeNode(ctx1, tSafeReplica, collectionID, channel)
var serviceTimeNode node = newServiceTimeNode(tSafeReplica, collectionID, channel)
q.flowGraph.AddNode(dmStreamNode)
q.flowGraph.AddNode(filterDmNode)
@ -70,12 +73,12 @@ func newQueryNodeFlowGraph(ctx context.Context,
q.flowGraph.AddNode(serviceTimeNode)
// dmStreamNode
var err = q.flowGraph.SetEdges(dmStreamNode.Name(),
err = q.flowGraph.SetEdges(dmStreamNode.Name(),
[]string{},
[]string{filterDmNode.Name()},
)
if err != nil {
log.Error("set edges failed in node:", zap.String("node name", dmStreamNode.Name()))
return nil, fmt.Errorf("set edges failed in node: %s, err = %s", dmStreamNode.Name(), err.Error())
}
// filterDmNode
@ -84,7 +87,7 @@ func newQueryNodeFlowGraph(ctx context.Context,
[]string{insertNode.Name()},
)
if err != nil {
log.Error("set edges failed in node:", zap.String("node name", filterDmNode.Name()))
return nil, fmt.Errorf("set edges failed in node: %s, err = %s", filterDmNode.Name(), err.Error())
}
// insertNode
@ -93,7 +96,7 @@ func newQueryNodeFlowGraph(ctx context.Context,
[]string{serviceTimeNode.Name()},
)
if err != nil {
log.Error("set edges failed in node:", zap.String("node name", insertNode.Name()))
return nil, fmt.Errorf("set edges failed in node: %s, err = %s", insertNode.Name(), err.Error())
}
// serviceTimeNode
@ -102,10 +105,10 @@ func newQueryNodeFlowGraph(ctx context.Context,
[]string{},
)
if err != nil {
log.Error("set edges failed in node:", zap.String("node name", serviceTimeNode.Name()))
return nil, fmt.Errorf("set edges failed in node: %s, err = %s", serviceTimeNode.Name(), err.Error())
}
return q
return q, nil
}
// newQueryNodeDeltaFlowGraph returns a new queryNodeFlowGraph
@ -114,7 +117,7 @@ func newQueryNodeDeltaFlowGraph(ctx context.Context,
historicalReplica ReplicaInterface,
tSafeReplica TSafeReplicaInterface,
channel Channel,
factory msgstream.Factory) *queryNodeFlowGraph {
factory msgstream.Factory) (*queryNodeFlowGraph, error) {
ctx1, cancel := context.WithCancel(ctx)
@ -126,10 +129,13 @@ func newQueryNodeDeltaFlowGraph(ctx context.Context,
flowGraph: flowgraph.NewTimeTickedFlowGraph(ctx1),
}
var dmStreamNode node = q.newDmInputNode(ctx1, factory)
dmStreamNode, err := q.newDmInputNode(ctx1, factory)
if err != nil {
return nil, err
}
var filterDeleteNode node = newFilteredDeleteNode(historicalReplica, collectionID)
var deleteNode node = newDeleteNode(historicalReplica)
var serviceTimeNode node = newServiceTimeNode(ctx1, tSafeReplica, collectionID, channel)
var serviceTimeNode node = newServiceTimeNode(tSafeReplica, collectionID, channel)
q.flowGraph.AddNode(dmStreamNode)
q.flowGraph.AddNode(filterDeleteNode)
@ -137,12 +143,12 @@ func newQueryNodeDeltaFlowGraph(ctx context.Context,
q.flowGraph.AddNode(serviceTimeNode)
// dmStreamNode
var err = q.flowGraph.SetEdges(dmStreamNode.Name(),
err = q.flowGraph.SetEdges(dmStreamNode.Name(),
[]string{},
[]string{filterDeleteNode.Name()},
)
if err != nil {
log.Error("set edges failed in node:", zap.String("node name", dmStreamNode.Name()))
return nil, fmt.Errorf("set edges failed in node: %s, err = %s", dmStreamNode.Name(), err.Error())
}
// filterDmNode
@ -151,7 +157,7 @@ func newQueryNodeDeltaFlowGraph(ctx context.Context,
[]string{deleteNode.Name()},
)
if err != nil {
log.Error("set edges failed in node:", zap.String("node name", filterDeleteNode.Name()))
return nil, fmt.Errorf("set edges failed in node: %s, err = %s", filterDeleteNode.Name(), err.Error())
}
// insertNode
@ -160,7 +166,7 @@ func newQueryNodeDeltaFlowGraph(ctx context.Context,
[]string{serviceTimeNode.Name()},
)
if err != nil {
log.Error("set edges failed in node:", zap.String("node name", deleteNode.Name()))
return nil, fmt.Errorf("set edges failed in node: %s, err = %s", deleteNode.Name(), err.Error())
}
// serviceTimeNode
@ -169,30 +175,30 @@ func newQueryNodeDeltaFlowGraph(ctx context.Context,
[]string{},
)
if err != nil {
log.Error("set edges failed in node:", zap.String("node name", serviceTimeNode.Name()))
return nil, fmt.Errorf("set edges failed in node: %s, err = %s", serviceTimeNode.Name(), err.Error())
}
return q
return q, nil
}
// newDmInputNode returns a new inputNode
func (q *queryNodeFlowGraph) newDmInputNode(ctx context.Context, factory msgstream.Factory) *flowgraph.InputNode {
func (q *queryNodeFlowGraph) newDmInputNode(ctx context.Context, factory msgstream.Factory) (*flowgraph.InputNode, error) {
insertStream, err := factory.NewTtMsgStream(ctx)
if err != nil {
log.Warn(err.Error())
} else {
q.dmlStream = insertStream
return nil, err
}
q.dmlStream = insertStream
maxQueueLength := Params.QueryNodeCfg.FlowGraphMaxQueueLength
maxParallelism := Params.QueryNodeCfg.FlowGraphMaxParallelism
node := flowgraph.NewInputNode(insertStream, "dmlInputNode", maxQueueLength, maxParallelism)
return node
return node, nil
}
// consumerFlowGraph would consume by channel and subName
func (q *queryNodeFlowGraph) consumerFlowGraph(channel Channel, subName ConsumeSubName) error {
// consumeFlowGraph would consume by channel and subName
func (q *queryNodeFlowGraph) consumeFlowGraph(channel Channel, subName ConsumeSubName) error {
if q.dmlStream == nil {
return errors.New("null dml message stream in flow graph")
}
@ -207,8 +213,8 @@ func (q *queryNodeFlowGraph) consumerFlowGraph(channel Channel, subName ConsumeS
return nil
}
// consumerFlowGraphLatest would consume from latest by channel and subName
func (q *queryNodeFlowGraph) consumerFlowGraphLatest(channel Channel, subName ConsumeSubName) error {
// consumeFlowGraphFromLatest would consume from latest by channel and subName
func (q *queryNodeFlowGraph) consumeFlowGraphFromLatest(channel Channel, subName ConsumeSubName) error {
if q.dmlStream == nil {
return errors.New("null dml message stream in flow graph")
}

View File

@ -37,14 +37,15 @@ func TestQueryNodeFlowGraph_consumerFlowGraph(t *testing.T) {
fac, err := genFactory()
assert.NoError(t, err)
fg := newQueryNodeFlowGraph(ctx,
fg, err := newQueryNodeFlowGraph(ctx,
defaultCollectionID,
streamingReplica,
tSafe,
defaultDMLChannel,
fac)
assert.NoError(t, err)
err = fg.consumerFlowGraph(defaultDMLChannel, defaultSubName)
err = fg.consumeFlowGraph(defaultDMLChannel, defaultSubName)
assert.NoError(t, err)
fg.close()
@ -62,12 +63,13 @@ func TestQueryNodeFlowGraph_seekQueryNodeFlowGraph(t *testing.T) {
tSafe := newTSafeReplica()
fg := newQueryNodeFlowGraph(ctx,
fg, err := newQueryNodeFlowGraph(ctx,
defaultCollectionID,
streamingReplica,
tSafe,
defaultDMLChannel,
fac)
assert.NoError(t, err)
position := &internalpb.MsgPosition{
ChannelName: defaultDMLChannel,

View File

@ -17,7 +17,6 @@
package querynode
import (
"context"
"fmt"
"go.uber.org/zap"
@ -39,23 +38,19 @@ func (stNode *serviceTimeNode) Name() string {
return fmt.Sprintf("stNode-%d-%s", stNode.collectionID, stNode.vChannel)
}
// Close would close serviceTimeNode
func (stNode *serviceTimeNode) Close() {
}
// Operate handles input messages, to execute insert operations
func (stNode *serviceTimeNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
//log.Debug("Do serviceTimeNode operation")
if len(in) != 1 {
log.Error("Invalid operate message input in serviceTimeNode, input length = ", zap.Int("input node", len(in)))
// TODO: add error handling
log.Warn("Invalid operate message input in serviceTimeNode, input length = ", zap.Int("input node", len(in)))
return []Msg{}
}
serviceTimeMsg, ok := in[0].(*serviceTimeMsg)
if !ok {
log.Warn("type assertion failed for serviceTimeMsg")
// TODO: add error handling
return []Msg{}
}
if serviceTimeMsg == nil {
@ -83,8 +78,7 @@ func (stNode *serviceTimeNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
}
// newServiceTimeNode returns a new serviceTimeNode
func newServiceTimeNode(ctx context.Context,
tSafeReplica TSafeReplicaInterface,
func newServiceTimeNode(tSafeReplica TSafeReplicaInterface,
collectionID UniqueID,
channel Channel) *serviceTimeNode {

View File

@ -17,22 +17,17 @@
package querynode
import (
"context"
"testing"
"github.com/milvus-io/milvus/internal/util/flowgraph"
)
func TestServiceTimeNode_Operate(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
genServiceTimeNode := func() *serviceTimeNode {
tSafe := newTSafeReplica()
tSafe.addTSafe(defaultDMLChannel)
node := newServiceTimeNode(ctx,
tSafe,
node := newServiceTimeNode(tSafe,
defaultCollectionID,
defaultDMLChannel)
return node

View File

@ -671,6 +671,21 @@ func genFactory() (msgstream.Factory, error) {
return msFactory, nil
}
func genInvalidFactory() (msgstream.Factory, error) {
const receiveBufSize = 1024
msFactory := msgstream.NewPmsFactory()
m := map[string]interface{}{
"receiveBufSize": receiveBufSize,
"pulsarAddress": "",
"pulsarBufSize": 1024}
err := msFactory.SetParams(m)
if err != nil {
return nil, err
}
return msFactory, nil
}
func genQueryMsgStream(ctx context.Context) (msgstream.MsgStream, error) {
fac, err := genFactory()
if err != nil {

View File

@ -380,15 +380,18 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
)
// add flow graph
channel2FlowGraph := w.node.dataSyncService.addFlowGraphsForDMLChannels(collectionID, vChannels)
channel2FlowGraph, err := w.node.dataSyncService.addFlowGraphsForDMLChannels(collectionID, vChannels)
if err != nil {
log.Warn("watchDMChannel, add flowGraph for dmChannels failed", zap.Int64("collectionID", collectionID), zap.Strings("vChannels", vChannels), zap.Error(err))
return err
}
log.Debug("Query node add DML flow graphs", zap.Int64("collectionID", collectionID), zap.Any("channels", vChannels))
// channels as consumer
for _, channel := range vChannels {
fg := channel2FlowGraph[channel]
for channel, fg := range channel2FlowGraph {
if _, ok := channel2AsConsumerPosition[channel]; ok {
// use pChannel to consume
err = fg.consumerFlowGraph(VPChannels[channel], consumeSubName)
err = fg.consumeFlowGraph(VPChannels[channel], consumeSubName)
if err != nil {
log.Error("msgStream as consumer failed for dmChannels", zap.Int64("collectionID", collectionID), zap.String("vChannel", channel))
break
@ -412,7 +415,11 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
for _, fg := range channel2FlowGraph {
fg.flowGraph.Close()
}
w.node.dataSyncService.removeFlowGraphsByDMLChannels(vChannels)
gcChannels := make([]Channel, 0)
for channel := range channel2FlowGraph {
gcChannels = append(gcChannels, channel)
}
w.node.dataSyncService.removeFlowGraphsByDMLChannels(gcChannels)
return err
}
@ -531,13 +538,16 @@ func (w *watchDeltaChannelsTask) Execute(ctx context.Context) error {
return err
}
channel2FlowGraph := w.node.dataSyncService.addFlowGraphsForDeltaChannels(collectionID, vDeltaChannels)
channel2FlowGraph, err := w.node.dataSyncService.addFlowGraphsForDeltaChannels(collectionID, vDeltaChannels)
if err != nil {
log.Warn("watchDeltaChannel, add flowGraph for deltaChannel failed", zap.Int64("collectionID", collectionID), zap.Strings("vDeltaChannels", vDeltaChannels), zap.Error(err))
return err
}
consumeSubName := funcutil.GenChannelSubName(Params.CommonCfg.QueryNodeSubName, collectionID, Params.QueryNodeCfg.QueryNodeID)
// channels as consumer
for _, channel := range vDeltaChannels {
fg := channel2FlowGraph[channel]
for channel, fg := range channel2FlowGraph {
// use pChannel to consume
err = fg.consumerFlowGraphLatest(VPDeltaChannels[channel], consumeSubName)
err = fg.consumeFlowGraphFromLatest(VPDeltaChannels[channel], consumeSubName)
if err != nil {
log.Error("msgStream as consumer failed for deltaChannels", zap.Int64("collectionID", collectionID), zap.Strings("vDeltaChannels", vDeltaChannels))
break
@ -553,7 +563,11 @@ func (w *watchDeltaChannelsTask) Execute(ctx context.Context) error {
for _, fg := range channel2FlowGraph {
fg.flowGraph.Close()
}
w.node.dataSyncService.removeFlowGraphsByDeltaChannels(vDeltaChannels)
gcChannels := make([]Channel, 0)
for channel := range channel2FlowGraph {
gcChannels = append(gcChannels, channel)
}
w.node.dataSyncService.removeFlowGraphsByDeltaChannels(gcChannels)
return err
}

View File

@ -738,7 +738,8 @@ func TestTask_releasePartitionTask(t *testing.T) {
req: genReleasePartitionsRequest(),
node: node,
}
task.node.dataSyncService.addFlowGraphsForDMLChannels(defaultCollectionID, []Channel{defaultDMLChannel})
_, err = task.node.dataSyncService.addFlowGraphsForDMLChannels(defaultCollectionID, []Channel{defaultDMLChannel})
assert.NoError(t, err)
err = task.Execute(ctx)
assert.NoError(t, err)
})
@ -781,7 +782,8 @@ func TestTask_releasePartitionTask(t *testing.T) {
req: genReleasePartitionsRequest(),
node: node,
}
task.node.dataSyncService.addFlowGraphsForDMLChannels(defaultCollectionID, []Channel{defaultDMLChannel})
_, err = task.node.dataSyncService.addFlowGraphsForDMLChannels(defaultCollectionID, []Channel{defaultDMLChannel})
assert.NoError(t, err)
err = task.Execute(ctx)
assert.NoError(t, err)
})