fix: Make submit idempotent (#33053)

issue: #33054

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
This commit is contained in:
XuanYang-cn 2024-05-14 22:09:34 +08:00 committed by GitHub
parent 6fc00e42d7
commit d4837307b3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 53 additions and 0 deletions

View File

@ -76,6 +76,27 @@ func NewChannelManager(dn *DataNode) *ChannelManagerImpl {
func (m *ChannelManagerImpl) Submit(info *datapb.ChannelWatchInfo) error {
channel := info.GetVchan().GetChannelName()
// skip enqueue datacoord re-submit the same operations
if runner, ok := m.opRunners.Get(channel); ok {
if runner.Exist(info.GetOpID()) {
log.Warn("op already exist, skip", zap.Int64("opID", info.GetOpID()), zap.String("channel", channel))
return nil
}
}
if info.GetState() == datapb.ChannelWatchState_ToWatch &&
m.fgManager.HasFlowgraphWithOpID(channel, info.GetOpID()) {
log.Warn("Watch op already finished, skip", zap.Int64("opID", info.GetOpID()), zap.String("channel", channel))
return nil
}
if info.GetState() == datapb.ChannelWatchState_ToRelease &&
!m.fgManager.HasFlowgraph(channel) {
log.Warn("Release op already finished, skip", zap.Int64("opID", info.GetOpID()), zap.String("channel", channel))
return nil
}
runner := m.getOrCreateRunner(channel)
return runner.Enqueue(info)
}

View File

@ -207,6 +207,32 @@ func (s *ChannelManagerSuite) TestSubmitIdempotent() {
s.Equal(1, runner.UnfinishedOpSize())
}
func (s *ChannelManagerSuite) TestSubmitSkip() {
channel := "by-dev-rootcoord-dml-1"
info := getWatchInfoByOpID(100, channel, datapb.ChannelWatchState_ToWatch)
s.Require().Equal(0, s.manager.opRunners.Len())
err := s.manager.Submit(info)
s.NoError(err)
s.Equal(1, s.manager.opRunners.Len())
s.True(s.manager.opRunners.Contain(channel))
opState := <-s.manager.communicateCh
s.NotNil(opState)
s.Equal(datapb.ChannelWatchState_WatchSuccess, opState.state)
s.NotNil(opState.fg)
s.Equal(info.GetOpID(), opState.fg.opID)
s.manager.handleOpState(opState)
err = s.manager.Submit(info)
s.NoError(err)
runner, ok := s.manager.opRunners.Get(channel)
s.False(ok)
s.Nil(runner)
}
func (s *ChannelManagerSuite) TestSubmitWatchAndRelease() {
channel := "by-dev-rootcoord-dml-0"
@ -253,4 +279,10 @@ func (s *ChannelManagerSuite) TestSubmitWatchAndRelease() {
s.Equal(0, s.manager.fgManager.GetFlowgraphCount())
s.False(s.manager.opRunners.Contain(info.GetVchan().GetChannelName()))
s.Equal(0, s.manager.opRunners.Len())
err = s.manager.Submit(info)
s.NoError(err)
runner, ok := s.manager.opRunners.Get(channel)
s.False(ok)
s.Nil(runner)
}