Use vchannel name in datanode subName (#18519)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
congqixia 2022-08-03 17:10:36 +08:00 committed by GitHub
parent 91ab8373b3
commit 28aadc988d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 4 additions and 3 deletions

View File

@ -403,7 +403,8 @@ func (c *ChannelManager) unsubAttempt(ncInfo *NodeChannelInfo) {
nodeID := ncInfo.NodeID
for _, ch := range ncInfo.Channels {
subName := fmt.Sprintf("%s-%d-%d", Params.CommonCfg.DataNodeSubName, nodeID, ch.CollectionID)
// align to datanode subname, using vchannel name
subName := fmt.Sprintf("%s-%d-%s", Params.CommonCfg.DataNodeSubName, nodeID, ch.Name)
pchannelName := funcutil.ToPhysicalChannel(ch.Name)
msgstream.UnsubscribeChannels(c.ctx, c.msgstreamFactory, subName, []string{pchannelName})
}

View File

@ -34,8 +34,8 @@ import (
// flowgraph ddNode.
func newDmInputNode(ctx context.Context, seekPos *internalpb.MsgPosition, dmNodeConfig *nodeConfig) (*flowgraph.InputNode, error) {
// subName should be unique, since pchannelName is shared among several collections
// consumeSubName := Params.MsgChannelSubName + "-" + strconv.FormatInt(collID, 10)
consumeSubName := fmt.Sprintf("%s-%d-%d", Params.CommonCfg.DataNodeSubName, Params.DataNodeCfg.GetNodeID(), dmNodeConfig.collectionID)
// use vchannel in case of reuse pchannel for same collection
consumeSubName := fmt.Sprintf("%s-%d-%s", Params.CommonCfg.DataNodeSubName, Params.DataNodeCfg.GetNodeID(), dmNodeConfig.vChannelName)
insertStream, err := dmNodeConfig.msFactory.NewTtMsgStream(ctx)
if err != nil {
return nil, err