diff --git a/internal/datacoord/channel_manager.go b/internal/datacoord/channel_manager.go index 044000bf0a..92b5e9e60b 100644 --- a/internal/datacoord/channel_manager.go +++ b/internal/datacoord/channel_manager.go @@ -18,11 +18,13 @@ package datacoord import ( "context" + "fmt" "sync" "time" "github.com/milvus-io/milvus/internal/kv" "github.com/milvus-io/milvus/internal/log" + "github.com/milvus-io/milvus/internal/msgstream" "github.com/milvus-io/milvus/internal/proto/datapb" "go.uber.org/zap" "stathat.com/c/consistent" @@ -44,6 +46,7 @@ type ChannelManager struct { assignPolicy ChannelAssignPolicy reassignPolicy ChannelReassignPolicy bgChecker ChannelBGChecker + msgstreamFactory msgstream.Factory } type channel struct { @@ -57,10 +60,15 @@ type ChannelManagerOpt func(c *ChannelManager) func withFactory(f ChannelPolicyFactory) ChannelManagerOpt { return func(c *ChannelManager) { c.factory = f } } + func defaultFactory(hash *consistent.Consistent) ChannelPolicyFactory { return NewConsistentHashChannelPolicyFactory(hash) } +func withMsgstreamFactory(f msgstream.Factory) ChannelManagerOpt { + return func(c *ChannelManager) { c.msgstreamFactory = f } +} + // NewChannelManager returns a new ChannelManager func NewChannelManager( kv kv.TxnKV, @@ -226,6 +234,13 @@ func (c *ChannelManager) DeleteNode(nodeID int64) error { c.mu.Lock() defer c.mu.Unlock() + nodeChannelInfo := c.store.GetNode(nodeID) + if nodeChannelInfo == nil { + return nil + } + + c.tryToUnsubscribe(nodeChannelInfo) + updates := c.deregisterPolicy(c.store, nodeID) log.Debug("deregister node", zap.Int64("unregistered node", nodeID), @@ -243,6 +258,41 @@ func (c *ChannelManager) DeleteNode(nodeID int64) error { return err } +func (c *ChannelManager) tryToUnsubscribe(nodeChannelInfo *NodeChannelInfo) { + if nodeChannelInfo == nil { + return + } + + if c.msgstreamFactory == nil { + log.Warn("msgstream factory is not set") + return + } + + nodeID := nodeChannelInfo.NodeID + for _, ch := range nodeChannelInfo.Channels { + subscriptionName := subscriptionGenerator(ch.CollectionID, nodeID) + err := c.unsubscribe(subscriptionName, ch.Name) + if err != nil { + log.Warn("failed to unsubcribe topic", zap.String("subscription name", subscriptionName), zap.String("channel name", ch.Name)) + } + } +} + +func subscriptionGenerator(collectionID int64, nodeID int64) string { + return fmt.Sprintf("%s-%s-%d-%d", Params.DataNodeCfg.MsgChannelSubName, Params.DataNodeCfg.SubscriptionNamePrefix, nodeID, collectionID) +} + +func (c *ChannelManager) unsubscribe(subscriptionName string, channel string) error { + msgStream, err := c.msgstreamFactory.NewMsgStream(context.TODO()) + if err != nil { + return err + } + + msgStream.AsConsumer([]string{channel}, subscriptionName) + msgStream.Close() + return nil +} + // Watch try to add the channel to cluster. If the channel already exists, do nothing func (c *ChannelManager) Watch(ch *channel) error { c.mu.Lock() diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index d309bfc805..05d88bf538 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -307,7 +307,7 @@ func (s *Server) initCluster() error { } var err error - s.channelManager, err = NewChannelManager(s.kvClient, s.handler) + s.channelManager, err = NewChannelManager(s.kvClient, s.handler, withMsgstreamFactory(s.msFactory)) if err != nil { return err } diff --git a/internal/util/paramtable/global_param.go b/internal/util/paramtable/global_param.go index 0a5116eb30..f074b24094 100644 --- a/internal/util/paramtable/global_param.go +++ b/internal/util/paramtable/global_param.go @@ -1246,6 +1246,8 @@ type dataNodeConfig struct { CreatedTime time.Time UpdatedTime time.Time + + SubscriptionNamePrefix string } func (p *dataNodeConfig) init(bp *BaseParamTable) { @@ -1267,6 +1269,7 @@ func (p *dataNodeConfig) init(bp *BaseParamTable) { p.initDmlChannelName() p.initDeltaChannelName() + p.initSubscriptionNamePrefix() } // Refresh is called after session init @@ -1364,6 +1367,13 @@ func (p *dataNodeConfig) initDeltaChannelName() { p.DeltaChannelName = strings.Join(s, "-") } +func (p *dataNodeConfig) initSubscriptionNamePrefix() { + prefix, err := p.BaseParams.Load("msgChannel.subNamePrefix.dataNodeSubNamePrefix") + if err != nil { + p.SubscriptionNamePrefix = prefix + } +} + /////////////////////////////////////////////////////////////////////////////// // --- indexcoord --- type indexCoordConfig struct {