From 28aadc988d206202b89c5cbb0df2e26671674534 Mon Sep 17 00:00:00 2001 From: congqixia Date: Wed, 3 Aug 2022 17:10:36 +0800 Subject: [PATCH] Use vchannel name in datanode subName (#18519) Signed-off-by: Congqi Xia --- internal/datacoord/channel_manager.go | 3 ++- internal/datanode/flow_graph_dmstream_input_node.go | 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/internal/datacoord/channel_manager.go b/internal/datacoord/channel_manager.go index 005ed24bf2..438808fb2d 100644 --- a/internal/datacoord/channel_manager.go +++ b/internal/datacoord/channel_manager.go @@ -403,7 +403,8 @@ func (c *ChannelManager) unsubAttempt(ncInfo *NodeChannelInfo) { nodeID := ncInfo.NodeID for _, ch := range ncInfo.Channels { - subName := fmt.Sprintf("%s-%d-%d", Params.CommonCfg.DataNodeSubName, nodeID, ch.CollectionID) + // align to datanode subname, using vchannel name + subName := fmt.Sprintf("%s-%d-%s", Params.CommonCfg.DataNodeSubName, nodeID, ch.Name) pchannelName := funcutil.ToPhysicalChannel(ch.Name) msgstream.UnsubscribeChannels(c.ctx, c.msgstreamFactory, subName, []string{pchannelName}) } diff --git a/internal/datanode/flow_graph_dmstream_input_node.go b/internal/datanode/flow_graph_dmstream_input_node.go index 86979098b7..2c17700c87 100644 --- a/internal/datanode/flow_graph_dmstream_input_node.go +++ b/internal/datanode/flow_graph_dmstream_input_node.go @@ -34,8 +34,8 @@ import ( // flowgraph ddNode. func newDmInputNode(ctx context.Context, seekPos *internalpb.MsgPosition, dmNodeConfig *nodeConfig) (*flowgraph.InputNode, error) { // subName should be unique, since pchannelName is shared among several collections - // consumeSubName := Params.MsgChannelSubName + "-" + strconv.FormatInt(collID, 10) - consumeSubName := fmt.Sprintf("%s-%d-%d", Params.CommonCfg.DataNodeSubName, Params.DataNodeCfg.GetNodeID(), dmNodeConfig.collectionID) + // use vchannel in case of reuse pchannel for same collection + consumeSubName := fmt.Sprintf("%s-%d-%s", Params.CommonCfg.DataNodeSubName, Params.DataNodeCfg.GetNodeID(), dmNodeConfig.vChannelName) insertStream, err := dmNodeConfig.msFactory.NewTtMsgStream(ctx) if err != nil { return nil, err