mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-11-30 19:08:30 +08:00
Wait for flow graph to exit (#17341)
Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
parent
76eaa3fc50
commit
48cf63a2d7
@ -79,12 +79,9 @@ func (fg *TimeTickedFlowGraph) SetEdges(nodeName string, in []string, out []stri
|
||||
// Start starts all nodes in timetick flowgragh
|
||||
func (fg *TimeTickedFlowGraph) Start() {
|
||||
fg.startOnce.Do(func() {
|
||||
wg := sync.WaitGroup{}
|
||||
for _, v := range fg.nodeCtx {
|
||||
wg.Add(1)
|
||||
v.Start(&wg)
|
||||
v.Start()
|
||||
}
|
||||
wg.Wait()
|
||||
})
|
||||
}
|
||||
|
||||
@ -92,9 +89,17 @@ func (fg *TimeTickedFlowGraph) Start() {
|
||||
func (fg *TimeTickedFlowGraph) Close() {
|
||||
fg.stopOnce.Do(func() {
|
||||
for _, v := range fg.nodeCtx {
|
||||
// maybe need to stop in order
|
||||
if v.node.IsInputNode() {
|
||||
// close inputNode first
|
||||
v.Close()
|
||||
}
|
||||
}
|
||||
for _, v := range fg.nodeCtx {
|
||||
if !v.node.IsInputNode() {
|
||||
// close other nodes
|
||||
v.Close()
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -64,7 +64,7 @@ func (inNode *InputNode) InStream() msgstream.MsgStream {
|
||||
func (inNode *InputNode) Operate(in []Msg) []Msg {
|
||||
msgPack, ok := <-inNode.inStream.Chan()
|
||||
if !ok {
|
||||
log.Warn("Receive Msg failed from upstream node", zap.Any("input node", inNode.Name()))
|
||||
log.Warn("MsgStream closed", zap.Any("input node", inNode.Name()))
|
||||
return []Msg{}
|
||||
}
|
||||
|
||||
|
@ -58,15 +58,16 @@ type nodeCtx struct {
|
||||
downstream []*nodeCtx
|
||||
downstreamInputChanIdx map[string]int
|
||||
|
||||
closeCh chan struct{}
|
||||
closeCh chan struct{} // notify work to exit
|
||||
closeWg sync.WaitGroup // block Close until work exit
|
||||
}
|
||||
|
||||
// Start invoke Node `Start` method and start a worker goroutine
|
||||
func (nodeCtx *nodeCtx) Start(wg *sync.WaitGroup) {
|
||||
func (nodeCtx *nodeCtx) Start() {
|
||||
nodeCtx.node.Start()
|
||||
|
||||
nodeCtx.closeWg.Add(1)
|
||||
go nodeCtx.work()
|
||||
wg.Done()
|
||||
}
|
||||
|
||||
// work handles node work spinning
|
||||
@ -74,6 +75,7 @@ func (nodeCtx *nodeCtx) Start(wg *sync.WaitGroup) {
|
||||
// 2. invoke node.Operate
|
||||
// 3. deliver the Operate result to downstream nodes
|
||||
func (nodeCtx *nodeCtx) work() {
|
||||
defer nodeCtx.closeWg.Done()
|
||||
name := fmt.Sprintf("nodeCtxTtChecker-%s", nodeCtx.node.Name())
|
||||
var checker *timerecord.GroupChecker
|
||||
if enableTtChecker {
|
||||
@ -87,10 +89,10 @@ func (nodeCtx *nodeCtx) work() {
|
||||
for {
|
||||
select {
|
||||
case <-nodeCtx.closeCh:
|
||||
log.Debug("flow graph node closed", zap.String("nodeName", nodeCtx.node.Name()))
|
||||
return
|
||||
default:
|
||||
// inputs from inputsMessages for Operate
|
||||
|
||||
var inputs, res []Msg
|
||||
if !nodeCtx.node.IsInputNode() {
|
||||
nodeCtx.collectInputMessages()
|
||||
@ -124,10 +126,15 @@ func (nodeCtx *nodeCtx) work() {
|
||||
|
||||
// Close handles cleanup logic and notify worker to quit
|
||||
func (nodeCtx *nodeCtx) Close() {
|
||||
// close Node
|
||||
nodeCtx.node.Close()
|
||||
// notify worker
|
||||
if nodeCtx.node.IsInputNode() {
|
||||
nodeCtx.node.Close() // close input msgStream
|
||||
close(nodeCtx.closeCh)
|
||||
nodeCtx.closeWg.Wait()
|
||||
} else {
|
||||
close(nodeCtx.closeCh)
|
||||
nodeCtx.closeWg.Wait()
|
||||
nodeCtx.node.Close() // close output msgStream, and etc...
|
||||
}
|
||||
}
|
||||
|
||||
// deliverMsg tries to put the Msg to specified downstream channel
|
||||
|
@ -20,7 +20,6 @@ import (
|
||||
"context"
|
||||
"math"
|
||||
"os"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@ -90,9 +89,9 @@ func TestNodeCtx_Start(t *testing.T) {
|
||||
node.inputChannels[i] = make(chan Msg)
|
||||
}
|
||||
|
||||
var waitGroup sync.WaitGroup
|
||||
waitGroup.Add(1)
|
||||
node.Start(&waitGroup)
|
||||
assert.NotPanics(t, func() {
|
||||
node.Start()
|
||||
})
|
||||
|
||||
node.Close()
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user