From 2e090b242619cf9ed0c78c1307252765c81018d7 Mon Sep 17 00:00:00 2001 From: "yihao.dai" Date: Sun, 1 Sep 2024 17:07:10 +0800 Subject: [PATCH] enhance: Refine flusher channel management (#35870) Change the ChannelTask to ChannelLifetime, only removing it upon unregistering. issue: https://github.com/milvus-io/milvus/issues/33285 Signed-off-by: bigsheeper --- .../{channel_task.go => channel_lifetime.go} | 36 +++++----- .../flusher/flusherimpl/flusher_impl.go | 65 ++++++------------- .../flusher/flusherimpl/flusher_impl_test.go | 9 +-- 3 files changed, 40 insertions(+), 70 deletions(-) rename internal/streamingnode/server/flusher/flusherimpl/{channel_task.go => channel_lifetime.go} (84%) diff --git a/internal/streamingnode/server/flusher/flusherimpl/channel_task.go b/internal/streamingnode/server/flusher/flusherimpl/channel_lifetime.go similarity index 84% rename from internal/streamingnode/server/flusher/flusherimpl/channel_task.go rename to internal/streamingnode/server/flusher/flusherimpl/channel_lifetime.go index 3de3f12b23..c57693f7eb 100644 --- a/internal/streamingnode/server/flusher/flusherimpl/channel_task.go +++ b/internal/streamingnode/server/flusher/flusherimpl/channel_lifetime.go @@ -34,29 +34,30 @@ import ( "github.com/milvus-io/milvus/pkg/util/merr" ) -type TaskState int +type LifetimeState int const ( - Pending TaskState = iota + Pending LifetimeState = iota Cancel Done ) -type ChannelTask interface { +type ChannelLifetime interface { Run() error Cancel() } -type channelTask struct { +type channelLifetime struct { mu sync.Mutex - state TaskState - f *flusherImpl + state LifetimeState vchannel string wal wal.WAL + scanner wal.Scanner + f *flusherImpl } -func NewChannelTask(f *flusherImpl, vchannel string, wal wal.WAL) ChannelTask { - return &channelTask{ +func NewChannelLifetime(f *flusherImpl, vchannel string, wal wal.WAL) ChannelLifetime { + return &channelLifetime{ state: Pending, f: f, vchannel: vchannel, @@ -64,13 +65,10 @@ func NewChannelTask(f *flusherImpl, vchannel string, wal wal.WAL) ChannelTask { } } -func (c *channelTask) Run() error { +func (c *channelLifetime) Run() error { c.mu.Lock() defer c.mu.Unlock() - if c.state == Cancel { - return nil - } - if c.f.fgMgr.HasFlowgraph(c.vchannel) { + if c.state == Cancel || c.state == Done { return nil } log.Info("start to build pipeline", zap.String("vchannel", c.vchannel)) @@ -110,14 +108,14 @@ func (c *channelTask) Run() error { } ds.Start() c.f.fgMgr.AddFlowgraph(ds) - c.f.scanners.Insert(c.vchannel, scanner) + c.scanner = scanner c.state = Done log.Info("build pipeline done", zap.String("vchannel", c.vchannel)) return nil } -func (c *channelTask) Cancel() { +func (c *channelLifetime) Cancel() { c.mu.Lock() defer c.mu.Unlock() switch c.state { @@ -126,11 +124,9 @@ func (c *channelTask) Cancel() { case Cancel: return case Done: - if scanner, ok := c.f.scanners.GetAndRemove(c.vchannel); ok { - err := scanner.Close() - if err != nil { - log.Warn("scanner error", zap.String("vchannel", c.vchannel), zap.Error(err)) - } + err := c.scanner.Close() + if err != nil { + log.Warn("scanner error", zap.String("vchannel", c.vchannel), zap.Error(err)) } c.f.fgMgr.RemoveFlowgraph(c.vchannel) c.f.wbMgr.RemoveChannel(c.vchannel) diff --git a/internal/streamingnode/server/flusher/flusherimpl/flusher_impl.go b/internal/streamingnode/server/flusher/flusherimpl/flusher_impl.go index b22feeea83..38b25c538b 100644 --- a/internal/streamingnode/server/flusher/flusherimpl/flusher_impl.go +++ b/internal/streamingnode/server/flusher/flusherimpl/flusher_impl.go @@ -50,8 +50,7 @@ type flusherImpl struct { wbMgr writebuffer.BufferManager cpUpdater *util.ChannelCheckpointUpdater - tasks *typeutil.ConcurrentMap[string, ChannelTask] - scanners *typeutil.ConcurrentMap[string, wal.Scanner] // watched scanners + channelLifetimes *typeutil.ConcurrentMap[string, ChannelLifetime] notifyCh chan struct{} stopChan lifetime.SafeChan @@ -67,16 +66,15 @@ func NewFlusher(chunkManager storage.ChunkManager) flusher.Flusher { func newFlusherWithParam(params *util.PipelineParams) flusher.Flusher { fgMgr := pipeline.NewFlowgraphManager() return &flusherImpl{ - broker: params.Broker, - fgMgr: fgMgr, - syncMgr: params.SyncMgr, - wbMgr: params.WriteBufferManager, - cpUpdater: params.CheckpointUpdater, - tasks: typeutil.NewConcurrentMap[string, ChannelTask](), - scanners: typeutil.NewConcurrentMap[string, wal.Scanner](), - notifyCh: make(chan struct{}, 1), - stopChan: lifetime.NewSafeChan(), - pipelineParams: params, + broker: params.Broker, + fgMgr: fgMgr, + syncMgr: params.SyncMgr, + wbMgr: params.WriteBufferManager, + cpUpdater: params.CheckpointUpdater, + channelLifetimes: typeutil.NewConcurrentMap[string, ChannelLifetime](), + notifyCh: make(chan struct{}, 1), + stopChan: lifetime.NewSafeChan(), + pipelineParams: params, } } @@ -96,22 +94,15 @@ func (f *flusherImpl) RegisterPChannel(pchannel string, wal wal.WAL) error { } func (f *flusherImpl) RegisterVChannel(vchannel string, wal wal.WAL) { - if f.scanners.Contain(vchannel) { - return + _, ok := f.channelLifetimes.GetOrInsert(vchannel, NewChannelLifetime(f, vchannel, wal)) + if !ok { + log.Info("flusher register vchannel done", zap.String("vchannel", vchannel)) } - f.tasks.GetOrInsert(vchannel, NewChannelTask(f, vchannel, wal)) f.notify() - log.Info("flusher register vchannel done", zap.String("vchannel", vchannel)) } func (f *flusherImpl) UnregisterPChannel(pchannel string) { - f.tasks.Range(func(vchannel string, task ChannelTask) bool { - if funcutil.ToPhysicalChannel(vchannel) == pchannel { - f.UnregisterVChannel(vchannel) - } - return true - }) - f.scanners.Range(func(vchannel string, scanner wal.Scanner) bool { + f.channelLifetimes.Range(func(vchannel string, _ ChannelLifetime) bool { if funcutil.ToPhysicalChannel(vchannel) == pchannel { f.UnregisterVChannel(vchannel) } @@ -120,18 +111,9 @@ func (f *flusherImpl) UnregisterPChannel(pchannel string) { } func (f *flusherImpl) UnregisterVChannel(vchannel string) { - if task, ok := f.tasks.Get(vchannel); ok { - task.Cancel() + if clt, ok := f.channelLifetimes.GetAndRemove(vchannel); ok { + clt.Cancel() } - if scanner, ok := f.scanners.GetAndRemove(vchannel); ok { - err := scanner.Close() - if err != nil { - log.Warn("scanner error", zap.String("vchannel", vchannel), zap.Error(err)) - } - } - f.fgMgr.RemoveFlowgraph(vchannel) - f.wbMgr.RemoveChannel(vchannel) - log.Info("flusher unregister vchannel done", zap.String("vchannel", vchannel)) } func (f *flusherImpl) notify() { @@ -154,16 +136,14 @@ func (f *flusherImpl) Start() { return case <-f.notifyCh: futures := make([]*conc.Future[any], 0) - f.tasks.Range(func(vchannel string, task ChannelTask) bool { + f.channelLifetimes.Range(func(vchannel string, lifetime ChannelLifetime) bool { future := GetExecPool().Submit(func() (any, error) { - err := task.Run() + err := lifetime.Run() if err != nil { log.Warn("build pipeline failed", zap.String("vchannel", vchannel), zap.Error(err)) - // Notify to trigger retry. - f.notify() + f.notify() // Notify to trigger retry. return nil, err } - f.tasks.Remove(vchannel) return nil, nil }) futures = append(futures, future) @@ -178,11 +158,8 @@ func (f *flusherImpl) Start() { func (f *flusherImpl) Stop() { f.stopChan.Close() f.stopWg.Wait() - f.scanners.Range(func(vchannel string, scanner wal.Scanner) bool { - err := scanner.Close() - if err != nil { - log.Warn("scanner error", zap.String("vchannel", vchannel), zap.Error(err)) - } + f.channelLifetimes.Range(func(vchannel string, lifetime ChannelLifetime) bool { + lifetime.Cancel() return true }) f.fgMgr.ClearFlowgraphs() diff --git a/internal/streamingnode/server/flusher/flusherimpl/flusher_impl_test.go b/internal/streamingnode/server/flusher/flusherimpl/flusher_impl_test.go index ff0b0f5441..b737da1fef 100644 --- a/internal/streamingnode/server/flusher/flusherimpl/flusher_impl_test.go +++ b/internal/streamingnode/server/flusher/flusherimpl/flusher_impl_test.go @@ -160,8 +160,7 @@ func TestFlusher_RegisterPChannel(t *testing.T) { f.UnregisterPChannel(pchannel) assert.Equal(t, 0, f.(*flusherImpl).fgMgr.GetFlowgraphCount()) - assert.Equal(t, 0, f.(*flusherImpl).scanners.Len()) - assert.Equal(t, 0, f.(*flusherImpl).tasks.Len()) + assert.Equal(t, 0, f.(*flusherImpl).channelLifetimes.Len()) } func TestFlusher_RegisterVChannel(t *testing.T) { @@ -199,8 +198,7 @@ func TestFlusher_RegisterVChannel(t *testing.T) { f.UnregisterVChannel(vchannel) } assert.Equal(t, 0, f.(*flusherImpl).fgMgr.GetFlowgraphCount()) - assert.Equal(t, 0, f.(*flusherImpl).scanners.Len()) - assert.Equal(t, 0, f.(*flusherImpl).tasks.Len()) + assert.Equal(t, 0, f.(*flusherImpl).channelLifetimes.Len()) } func TestFlusher_Concurrency(t *testing.T) { @@ -248,6 +246,5 @@ func TestFlusher_Concurrency(t *testing.T) { } assert.Equal(t, 0, f.(*flusherImpl).fgMgr.GetFlowgraphCount()) - assert.Equal(t, 0, f.(*flusherImpl).scanners.Len()) - assert.Equal(t, 0, f.(*flusherImpl).tasks.Len()) + assert.Equal(t, 0, f.(*flusherImpl).channelLifetimes.Len()) }