fix: Correct the update logic of timerecorder (#34339)

Correct the update logic of timerecorder in the flowgraph to avoid false
failure: "some node(s) haven't received input".

issue: https://github.com/milvus-io/milvus/issues/34337

---------

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
yihao.dai 2024-07-04 16:34:17 +08:00 committed by GitHub
parent 7611128e57
commit 0b404bff22
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 23 additions and 16 deletions

View File

@ -76,7 +76,7 @@ type ddNode struct {
// Name returns node name, implementing flowgraph.Node
func (ddn *ddNode) Name() string {
return fmt.Sprintf("ddNode-%d-%s", ddn.collectionID, ddn.vChannelName)
return fmt.Sprintf("ddNode-%s", ddn.vChannelName)
}
func (ddn *ddNode) IsValidInMsg(in []Msg) bool {

View File

@ -83,7 +83,7 @@ func TestFlowGraph_DDNode_newDDNode(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, ddNode)
assert.Equal(t, fmt.Sprintf("ddNode-%d-%s", ddNode.collectionID, ddNode.vChannelName), ddNode.Name())
assert.Equal(t, fmt.Sprintf("ddNode-%s", ddNode.vChannelName), ddNode.Name())
assert.Equal(t, len(test.inSealedSegs), len(ddNode.sealedSegInfo))
assert.Equal(t, len(test.inGrowingSegs), len(ddNode.growingSegInfo))

View File

@ -62,7 +62,7 @@ func newDmInputNode(initCtx context.Context, dispatcherClient msgdispatcher.Clie
log.Info("datanode consume successfully when register to msgDispatcher")
}
name := fmt.Sprintf("dmInputNode-data-%d-%s", dmNodeConfig.collectionID, dmNodeConfig.vChannelName)
name := fmt.Sprintf("dmInputNode-data-%s", dmNodeConfig.vChannelName)
node := flowgraph.NewInputNode(
input,
name,

View File

@ -2,6 +2,7 @@ package pipeline
import (
"context"
"fmt"
"github.com/golang/protobuf/proto"
"github.com/samber/lo"
@ -27,6 +28,11 @@ type writeNode struct {
metacache metacache.MetaCache
}
// Name returns node name, implementing flowgraph.Node
func (wNode *writeNode) Name() string {
return fmt.Sprintf("writeNode-%s", wNode.channelName)
}
func (wNode *writeNode) Operate(in []Msg) []Msg {
fgMsg := in[0].(*FlowGraphMsg)

View File

@ -75,27 +75,23 @@ func (nodeCtxManager *nodeCtxManager) Start() {
// in dmInputNode, message from mq to channel, alloc goroutines
// limit the goroutines in other node to prevent huge goroutines numbers
nodeCtxManager.closeWg.Add(1)
go nodeCtxManager.workNodeStart()
}
func (nodeCtxManager *nodeCtxManager) workNodeStart() {
defer nodeCtxManager.closeWg.Done()
inputNode := nodeCtxManager.inputNodeCtx
curNode := inputNode
curNode := nodeCtxManager.inputNodeCtx
// tt checker start
var checker *timerecord.Checker
if enableTtChecker {
manager := timerecord.GetCheckerManger("fgNode", nodeCtxTtInterval, func(list []string) {
log.Warn("some node(s) haven't received input", zap.Strings("list", list), zap.Duration("duration ", nodeCtxTtInterval))
})
for curNode != nil {
name := fmt.Sprintf("nodeCtxTtChecker-%s", curNode.node.Name())
checker = timerecord.NewChecker(name, manager)
curNode.checker = timerecord.NewChecker(name, manager)
curNode = curNode.downstream
defer checker.Close()
}
}
go nodeCtxManager.workNodeStart()
}
func (nodeCtxManager *nodeCtxManager) workNodeStart() {
defer nodeCtxManager.closeWg.Done()
for {
select {
case <-nodeCtxManager.closeCh:
@ -105,7 +101,8 @@ func (nodeCtxManager *nodeCtxManager) workNodeStart() {
// 2. invoke node.Operate
// 3. deliver the Operate result to downstream nodes
default:
curNode = inputNode
inputNode := nodeCtxManager.inputNodeCtx
curNode := inputNode
for curNode != nil {
// inputs from inputsMessages for Operate
var input, output []Msg
@ -137,8 +134,8 @@ func (nodeCtxManager *nodeCtxManager) workNodeStart() {
if curNode.downstream != nil {
curNode.downstream.inputChannel <- output
}
if enableTtChecker {
checker.Check()
if enableTtChecker && curNode.checker != nil {
curNode.checker.Check()
}
curNode = curNode.downstream
}
@ -157,6 +154,7 @@ type nodeCtx struct {
node Node
inputChannel chan []Msg
downstream *nodeCtx
checker *timerecord.Checker
blockMutex sync.RWMutex
}
@ -192,6 +190,9 @@ func (nodeCtx *nodeCtx) Close() {
if nodeCtx.node.IsInputNode() {
for nodeCtx != nil {
nodeCtx.node.Close()
if nodeCtx.checker != nil {
nodeCtx.checker.Close()
}
log.Debug("flow graph node closed", zap.String("nodeName", nodeCtx.node.Name()))
nodeCtx = nodeCtx.downstream
}