mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-11-30 02:48:45 +08:00
Fix msgstream unsubscription (#16883)
Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>
This commit is contained in:
parent
34833680da
commit
000c5ff3de
@ -167,11 +167,13 @@ func (ms *mqMsgStream) AsConsumerWithPosition(channels []string, subName string,
|
||||
ms.consumerChannels = append(ms.consumerChannels, channel)
|
||||
return nil
|
||||
}
|
||||
err := retry.Do(context.TODO(), fn, retry.Attempts(20), retry.Sleep(time.Millisecond*200), retry.MaxSleepTime(5*time.Second))
|
||||
// TODO if know the former subscribe is invalid, should we use pulsarctl to accelerate recovery speed
|
||||
err := retry.Do(context.TODO(), fn, retry.Attempts(50), retry.Sleep(time.Millisecond*200), retry.MaxSleepTime(5*time.Second))
|
||||
if err != nil {
|
||||
errMsg := "Failed to create consumer " + channel + ", error = " + err.Error()
|
||||
panic(errMsg)
|
||||
}
|
||||
log.Info("Successfully create consumer", zap.String("channel", channel), zap.String("subname", subName))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -115,6 +115,12 @@ func (pc *Consumer) Close() {
|
||||
// Unsubscribe for the consumer
|
||||
fn := func() error {
|
||||
err := pc.c.Unsubscribe()
|
||||
if isPulsarError(err, pulsar.SubscriptionNotFound) || isPulsarError(err, pulsar.ConsumerNotFound) {
|
||||
log.Warn("failed to find consumer, skip unsubscribe",
|
||||
zap.String("subscription", pc.Subscription()),
|
||||
zap.Error(err))
|
||||
return nil
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -1121,6 +1121,7 @@ func (m *MetaReplica) getWatchedChannelsByNodeID(nodeID int64) *querypb.Unsubscr
|
||||
// 1. find all the search/dmChannel/deltaChannel the node has watched
|
||||
colID2DmChannels := make(map[UniqueID][]string)
|
||||
colID2DeltaChannels := make(map[UniqueID][]string)
|
||||
// TODO remove colID2QueryChannel since it's not used
|
||||
colID2QueryChannel := make(map[UniqueID]string)
|
||||
dmChannelInfos := m.getDmChannelInfosByNodeID(nodeID)
|
||||
// get dmChannel/search channel the node has watched
|
||||
@ -1137,9 +1138,13 @@ func (m *MetaReplica) getWatchedChannelsByNodeID(nodeID int64) *querypb.Unsubscr
|
||||
}
|
||||
}
|
||||
segmentInfos := m.getSegmentInfosByNode(nodeID)
|
||||
// get delta/search channel the node has watched
|
||||
colIDs := make(map[UniqueID]bool)
|
||||
// iterate through segments to find unique collection ids
|
||||
for _, segmentInfo := range segmentInfos {
|
||||
collectionID := segmentInfo.CollectionID
|
||||
colIDs[segmentInfo.CollectionID] = true
|
||||
}
|
||||
// get delta/search channel the node has watched
|
||||
for collectionID := range colIDs {
|
||||
if _, ok := colID2DeltaChannels[collectionID]; !ok {
|
||||
deltaChanelInfos, err := m.getDeltaChannelsByCollectionID(collectionID)
|
||||
if err != nil {
|
||||
|
Loading…
Reference in New Issue
Block a user