enhance: Refine flusher channel management (#35870)

Change the ChannelTask to ChannelLifetime, only removing it upon
unregistering.

issue: https://github.com/milvus-io/milvus/issues/33285

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
yihao.dai 2024-09-01 17:07:10 +08:00 committed by GitHub
parent ef451f5e1f
commit 2e090b2426
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 40 additions and 70 deletions

View File

@ -34,29 +34,30 @@ import (
"github.com/milvus-io/milvus/pkg/util/merr"
)
type TaskState int
type LifetimeState int
const (
Pending TaskState = iota
Pending LifetimeState = iota
Cancel
Done
)
type ChannelTask interface {
type ChannelLifetime interface {
Run() error
Cancel()
}
type channelTask struct {
type channelLifetime struct {
mu sync.Mutex
state TaskState
f *flusherImpl
state LifetimeState
vchannel string
wal wal.WAL
scanner wal.Scanner
f *flusherImpl
}
func NewChannelTask(f *flusherImpl, vchannel string, wal wal.WAL) ChannelTask {
return &channelTask{
func NewChannelLifetime(f *flusherImpl, vchannel string, wal wal.WAL) ChannelLifetime {
return &channelLifetime{
state: Pending,
f: f,
vchannel: vchannel,
@ -64,13 +65,10 @@ func NewChannelTask(f *flusherImpl, vchannel string, wal wal.WAL) ChannelTask {
}
}
func (c *channelTask) Run() error {
func (c *channelLifetime) Run() error {
c.mu.Lock()
defer c.mu.Unlock()
if c.state == Cancel {
return nil
}
if c.f.fgMgr.HasFlowgraph(c.vchannel) {
if c.state == Cancel || c.state == Done {
return nil
}
log.Info("start to build pipeline", zap.String("vchannel", c.vchannel))
@ -110,14 +108,14 @@ func (c *channelTask) Run() error {
}
ds.Start()
c.f.fgMgr.AddFlowgraph(ds)
c.f.scanners.Insert(c.vchannel, scanner)
c.scanner = scanner
c.state = Done
log.Info("build pipeline done", zap.String("vchannel", c.vchannel))
return nil
}
func (c *channelTask) Cancel() {
func (c *channelLifetime) Cancel() {
c.mu.Lock()
defer c.mu.Unlock()
switch c.state {
@ -126,11 +124,9 @@ func (c *channelTask) Cancel() {
case Cancel:
return
case Done:
if scanner, ok := c.f.scanners.GetAndRemove(c.vchannel); ok {
err := scanner.Close()
if err != nil {
log.Warn("scanner error", zap.String("vchannel", c.vchannel), zap.Error(err))
}
err := c.scanner.Close()
if err != nil {
log.Warn("scanner error", zap.String("vchannel", c.vchannel), zap.Error(err))
}
c.f.fgMgr.RemoveFlowgraph(c.vchannel)
c.f.wbMgr.RemoveChannel(c.vchannel)

View File

@ -50,8 +50,7 @@ type flusherImpl struct {
wbMgr writebuffer.BufferManager
cpUpdater *util.ChannelCheckpointUpdater
tasks *typeutil.ConcurrentMap[string, ChannelTask]
scanners *typeutil.ConcurrentMap[string, wal.Scanner] // watched scanners
channelLifetimes *typeutil.ConcurrentMap[string, ChannelLifetime]
notifyCh chan struct{}
stopChan lifetime.SafeChan
@ -67,16 +66,15 @@ func NewFlusher(chunkManager storage.ChunkManager) flusher.Flusher {
func newFlusherWithParam(params *util.PipelineParams) flusher.Flusher {
fgMgr := pipeline.NewFlowgraphManager()
return &flusherImpl{
broker: params.Broker,
fgMgr: fgMgr,
syncMgr: params.SyncMgr,
wbMgr: params.WriteBufferManager,
cpUpdater: params.CheckpointUpdater,
tasks: typeutil.NewConcurrentMap[string, ChannelTask](),
scanners: typeutil.NewConcurrentMap[string, wal.Scanner](),
notifyCh: make(chan struct{}, 1),
stopChan: lifetime.NewSafeChan(),
pipelineParams: params,
broker: params.Broker,
fgMgr: fgMgr,
syncMgr: params.SyncMgr,
wbMgr: params.WriteBufferManager,
cpUpdater: params.CheckpointUpdater,
channelLifetimes: typeutil.NewConcurrentMap[string, ChannelLifetime](),
notifyCh: make(chan struct{}, 1),
stopChan: lifetime.NewSafeChan(),
pipelineParams: params,
}
}
@ -96,22 +94,15 @@ func (f *flusherImpl) RegisterPChannel(pchannel string, wal wal.WAL) error {
}
func (f *flusherImpl) RegisterVChannel(vchannel string, wal wal.WAL) {
if f.scanners.Contain(vchannel) {
return
_, ok := f.channelLifetimes.GetOrInsert(vchannel, NewChannelLifetime(f, vchannel, wal))
if !ok {
log.Info("flusher register vchannel done", zap.String("vchannel", vchannel))
}
f.tasks.GetOrInsert(vchannel, NewChannelTask(f, vchannel, wal))
f.notify()
log.Info("flusher register vchannel done", zap.String("vchannel", vchannel))
}
func (f *flusherImpl) UnregisterPChannel(pchannel string) {
f.tasks.Range(func(vchannel string, task ChannelTask) bool {
if funcutil.ToPhysicalChannel(vchannel) == pchannel {
f.UnregisterVChannel(vchannel)
}
return true
})
f.scanners.Range(func(vchannel string, scanner wal.Scanner) bool {
f.channelLifetimes.Range(func(vchannel string, _ ChannelLifetime) bool {
if funcutil.ToPhysicalChannel(vchannel) == pchannel {
f.UnregisterVChannel(vchannel)
}
@ -120,18 +111,9 @@ func (f *flusherImpl) UnregisterPChannel(pchannel string) {
}
func (f *flusherImpl) UnregisterVChannel(vchannel string) {
if task, ok := f.tasks.Get(vchannel); ok {
task.Cancel()
if clt, ok := f.channelLifetimes.GetAndRemove(vchannel); ok {
clt.Cancel()
}
if scanner, ok := f.scanners.GetAndRemove(vchannel); ok {
err := scanner.Close()
if err != nil {
log.Warn("scanner error", zap.String("vchannel", vchannel), zap.Error(err))
}
}
f.fgMgr.RemoveFlowgraph(vchannel)
f.wbMgr.RemoveChannel(vchannel)
log.Info("flusher unregister vchannel done", zap.String("vchannel", vchannel))
}
func (f *flusherImpl) notify() {
@ -154,16 +136,14 @@ func (f *flusherImpl) Start() {
return
case <-f.notifyCh:
futures := make([]*conc.Future[any], 0)
f.tasks.Range(func(vchannel string, task ChannelTask) bool {
f.channelLifetimes.Range(func(vchannel string, lifetime ChannelLifetime) bool {
future := GetExecPool().Submit(func() (any, error) {
err := task.Run()
err := lifetime.Run()
if err != nil {
log.Warn("build pipeline failed", zap.String("vchannel", vchannel), zap.Error(err))
// Notify to trigger retry.
f.notify()
f.notify() // Notify to trigger retry.
return nil, err
}
f.tasks.Remove(vchannel)
return nil, nil
})
futures = append(futures, future)
@ -178,11 +158,8 @@ func (f *flusherImpl) Start() {
func (f *flusherImpl) Stop() {
f.stopChan.Close()
f.stopWg.Wait()
f.scanners.Range(func(vchannel string, scanner wal.Scanner) bool {
err := scanner.Close()
if err != nil {
log.Warn("scanner error", zap.String("vchannel", vchannel), zap.Error(err))
}
f.channelLifetimes.Range(func(vchannel string, lifetime ChannelLifetime) bool {
lifetime.Cancel()
return true
})
f.fgMgr.ClearFlowgraphs()

View File

@ -160,8 +160,7 @@ func TestFlusher_RegisterPChannel(t *testing.T) {
f.UnregisterPChannel(pchannel)
assert.Equal(t, 0, f.(*flusherImpl).fgMgr.GetFlowgraphCount())
assert.Equal(t, 0, f.(*flusherImpl).scanners.Len())
assert.Equal(t, 0, f.(*flusherImpl).tasks.Len())
assert.Equal(t, 0, f.(*flusherImpl).channelLifetimes.Len())
}
func TestFlusher_RegisterVChannel(t *testing.T) {
@ -199,8 +198,7 @@ func TestFlusher_RegisterVChannel(t *testing.T) {
f.UnregisterVChannel(vchannel)
}
assert.Equal(t, 0, f.(*flusherImpl).fgMgr.GetFlowgraphCount())
assert.Equal(t, 0, f.(*flusherImpl).scanners.Len())
assert.Equal(t, 0, f.(*flusherImpl).tasks.Len())
assert.Equal(t, 0, f.(*flusherImpl).channelLifetimes.Len())
}
func TestFlusher_Concurrency(t *testing.T) {
@ -248,6 +246,5 @@ func TestFlusher_Concurrency(t *testing.T) {
}
assert.Equal(t, 0, f.(*flusherImpl).fgMgr.GetFlowgraphCount())
assert.Equal(t, 0, f.(*flusherImpl).scanners.Len())
assert.Equal(t, 0, f.(*flusherImpl).tasks.Len())
assert.Equal(t, 0, f.(*flusherImpl).channelLifetimes.Len())
}