Prevent exclusive consumer exception in pulsar (#25376)

Signed-off-by: SimFG <bang.fu@zilliz.com>
This commit is contained in:
SimFG 2023-07-12 17:26:30 +08:00 committed by GitHub
parent fc12d3997c
commit f9e2d00f91
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 38 additions and 5 deletions

View File

@ -426,12 +426,12 @@ func (node *DataNode) handlePutEvent(watchInfo *datapb.ChannelWatchInfo, version
switch watchInfo.State {
case datapb.ChannelWatchState_Uncomplete, datapb.ChannelWatchState_ToWatch:
if err := node.flowgraphManager.addAndStart(node, watchInfo.GetVchan(), watchInfo.GetSchema(), tickler); err != nil {
log.Warn("handle put event: new data sync service failed", zap.String("vChanName", vChanName), zap.Error(err))
watchInfo.State = datapb.ChannelWatchState_WatchFailure
return fmt.Errorf("fail to add and start flowgraph for vChanName: %s, err: %v", vChanName, err)
} else {
log.Info("handle put event: new data sync service success", zap.String("vChanName", vChanName))
watchInfo.State = datapb.ChannelWatchState_WatchSuccess
}
log.Info("handle put event: new data sync service success", zap.String("vChanName", vChanName))
watchInfo.State = datapb.ChannelWatchState_WatchSuccess
case datapb.ChannelWatchState_ToRelease:
// there is no reason why releasing the flowgraph would fail
node.tryToReleaseFlowgraph(vChanName)

View File

@ -73,7 +73,11 @@ func (e *channelEventManager) Run() {
case event := <-e.eventChan:
switch event.eventType {
case putEventType:
e.handlePutEvent(event.info, event.version)
err := e.handlePutEvent(event.info, event.version)
if err != nil {
// log the error to aid follow-up investigation of problems
log.Warn("handle put event failed", zap.String("vChanName", event.vChanName), zap.Error(err))
}
case deleteEventType:
e.handleDeleteEvent(event.vChanName)
}

View File

@ -456,6 +456,7 @@ func (sd *shardDelegator) readDeleteFromMsgstream(ctx context.Context, position
if err != nil {
return nil, err
}
defer stream.Close()
vchannelName := position.ChannelName
pChannelName := funcutil.ToPhysicalChannel(vchannelName)
position.ChannelName = pChannelName

View File

@ -300,6 +300,32 @@ func (s *DelegatorDataSuite) TestProcessDelete() {
RowCount: 1,
},
}, 10)
// load sealed
s.delegator.LoadSegments(ctx, &querypb.LoadSegmentsRequest{
Base: commonpbutil.NewMsgBase(),
DstNodeID: 1,
CollectionID: s.collectionID,
Infos: []*querypb.SegmentLoadInfo{
{
SegmentID: 1000,
CollectionID: s.collectionID,
PartitionID: 500,
StartPosition: &msgpb.MsgPosition{Timestamp: 5000},
DeltaPosition: &msgpb.MsgPosition{Timestamp: 5000},
},
},
})
s.Require().NoError(err)
s.delegator.ProcessDelete([]*DeleteData{
{
PartitionID: 500,
PrimaryKeys: []storage.PrimaryKey{storage.NewInt64PrimaryKey(10)},
Timestamps: []uint64{10},
RowCount: 1,
},
}, 10)
}
func (s *DelegatorDataSuite) TestLoadSegments() {

View File

@ -373,6 +373,7 @@ func (suite *ServiceSuite) TestWatchDmChannels_Failed() {
// init msgstream failed
suite.factory.EXPECT().NewTtMsgStream(mock.Anything).Return(suite.msgStream, nil)
suite.msgStream.EXPECT().AsConsumer([]string{suite.pchannel}, mock.Anything, mock.Anything).Return()
suite.msgStream.EXPECT().Close().Return()
suite.msgStream.EXPECT().Seek(mock.Anything).Return(errors.New("mock error"))
status, err := suite.node.WatchDmChannels(ctx, req)

View File

@ -99,6 +99,7 @@ func NewDispatcher(factory msgstream.Factory,
stream.AsConsumer([]string{pchannel}, subName, mqwrapper.SubscriptionPositionUnknown)
err = stream.Seek([]*Pos{position})
if err != nil {
stream.Close()
log.Error("seek failed", zap.Error(err))
return nil, err
}