diff --git a/internal/mq/msgstream/mq_msgstream.go b/internal/mq/msgstream/mq_msgstream.go index 57732ed725..444f9dc55e 100644 --- a/internal/mq/msgstream/mq_msgstream.go +++ b/internal/mq/msgstream/mq_msgstream.go @@ -167,11 +167,13 @@ func (ms *mqMsgStream) AsConsumerWithPosition(channels []string, subName string, ms.consumerChannels = append(ms.consumerChannels, channel) return nil } - err := retry.Do(context.TODO(), fn, retry.Attempts(20), retry.Sleep(time.Millisecond*200), retry.MaxSleepTime(5*time.Second)) + // TODO if know the former subscribe is invalid, should we use pulsarctl to accelerate recovery speed + err := retry.Do(context.TODO(), fn, retry.Attempts(50), retry.Sleep(time.Millisecond*200), retry.MaxSleepTime(5*time.Second)) if err != nil { errMsg := "Failed to create consumer " + channel + ", error = " + err.Error() panic(errMsg) } + log.Info("Successfully create consumer", zap.String("channel", channel), zap.String("subname", subName)) } } diff --git a/internal/mq/msgstream/mqwrapper/pulsar/pulsar_consumer.go b/internal/mq/msgstream/mqwrapper/pulsar/pulsar_consumer.go index 2d8898d0d4..bd2687ee76 100644 --- a/internal/mq/msgstream/mqwrapper/pulsar/pulsar_consumer.go +++ b/internal/mq/msgstream/mqwrapper/pulsar/pulsar_consumer.go @@ -115,6 +115,12 @@ func (pc *Consumer) Close() { // Unsubscribe for the consumer fn := func() error { err := pc.c.Unsubscribe() + if isPulsarError(err, pulsar.SubscriptionNotFound) || isPulsarError(err, pulsar.ConsumerNotFound) { + log.Warn("failed to find consumer, skip unsubscribe", + zap.String("subscription", pc.Subscription()), + zap.Error(err)) + return nil + } if err != nil { return err } diff --git a/internal/querycoord/meta.go b/internal/querycoord/meta.go index 9c7cafd9f9..6a74345462 100644 --- a/internal/querycoord/meta.go +++ b/internal/querycoord/meta.go @@ -1121,6 +1121,7 @@ func (m *MetaReplica) getWatchedChannelsByNodeID(nodeID int64) *querypb.Unsubscr // 1. find all the search/dmChannel/deltaChannel the node has watched colID2DmChannels := make(map[UniqueID][]string) colID2DeltaChannels := make(map[UniqueID][]string) + // TODO remove colID2QueryChannel since it's not used colID2QueryChannel := make(map[UniqueID]string) dmChannelInfos := m.getDmChannelInfosByNodeID(nodeID) // get dmChannel/search channel the node has watched @@ -1137,9 +1138,13 @@ func (m *MetaReplica) getWatchedChannelsByNodeID(nodeID int64) *querypb.Unsubscr } } segmentInfos := m.getSegmentInfosByNode(nodeID) - // get delta/search channel the node has watched + colIDs := make(map[UniqueID]bool) + // iterate through segments to find unique collection ids for _, segmentInfo := range segmentInfos { - collectionID := segmentInfo.CollectionID + colIDs[segmentInfo.CollectionID] = true + } + // get delta/search channel the node has watched + for collectionID := range colIDs { if _, ok := colID2DeltaChannels[collectionID]; !ok { deltaChanelInfos, err := m.getDeltaChannelsByCollectionID(collectionID) if err != nil {