2021-12-30 10:06:06 +08:00
|
|
|
// Licensed to the LF AI & Data foundation under one
|
|
|
|
// or more contributor license agreements. See the NOTICE file
|
|
|
|
// distributed with this work for additional information
|
|
|
|
// regarding copyright ownership. The ASF licenses this file
|
|
|
|
// to you under the Apache License, Version 2.0 (the
|
|
|
|
// "License"); you may not use this file except in compliance
|
2021-04-19 13:42:47 +08:00
|
|
|
// with the License. You may obtain a copy of the License at
|
|
|
|
//
|
2021-12-30 10:06:06 +08:00
|
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
2021-04-19 13:42:47 +08:00
|
|
|
//
|
2021-12-30 10:06:06 +08:00
|
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
// See the License for the specific language governing permissions and
|
|
|
|
// limitations under the License.
|
2021-04-19 13:42:47 +08:00
|
|
|
|
2020-11-02 16:44:54 +08:00
|
|
|
package flowgraph
|
|
|
|
|
|
|
|
import (
|
2020-11-12 12:04:12 +08:00
|
|
|
"fmt"
|
2020-11-02 16:44:54 +08:00
|
|
|
"sync"
|
2021-02-25 15:08:50 +08:00
|
|
|
"time"
|
2021-08-03 22:43:25 +08:00
|
|
|
|
2021-12-29 14:55:21 +08:00
|
|
|
"github.com/milvus-io/milvus/internal/util/timerecord"
|
|
|
|
|
2021-08-03 22:43:25 +08:00
|
|
|
"github.com/milvus-io/milvus/internal/log"
|
2021-11-15 19:20:34 +08:00
|
|
|
"go.uber.org/zap"
|
2020-11-02 16:44:54 +08:00
|
|
|
)
|
|
|
|
|
2021-12-29 14:55:21 +08:00
|
|
|
const (
	// TODO: better to be configured

	// enableTtChecker switches the per-node time-tick checker in
	// nodeCtx.work on or off.
	enableTtChecker = true

	// nodeCtxTtInterval is the interval passed to the "fgNode" group checker;
	// nodes that have not checked in within it are reported in a warning log.
	nodeCtxTtInterval = 2 * time.Minute
)
|
|
|
|
|
2021-09-29 20:19:59 +08:00
|
|
|
// Node is the interface defines the behavior of flowgraph
|
2020-11-02 16:44:54 +08:00
|
|
|
type Node interface {
|
|
|
|
Name() string
|
|
|
|
MaxQueueLength() int32
|
|
|
|
MaxParallelism() int32
|
2021-03-25 14:41:46 +08:00
|
|
|
Operate(in []Msg) []Msg
|
2020-11-09 16:27:11 +08:00
|
|
|
IsInputNode() bool
|
2021-09-29 20:19:59 +08:00
|
|
|
Start()
|
2021-03-26 18:40:04 +08:00
|
|
|
Close()
|
2020-11-02 16:44:54 +08:00
|
|
|
}
|
|
|
|
|
2021-09-29 20:19:59 +08:00
|
|
|
// BaseNode carries the attributes and default behavior shared by nodes; it is
// meant to be embedded by concrete Node implementations.
type BaseNode struct {
	// maxQueueLength is the value reported by MaxQueueLength and set through
	// SetMaxQueueLength.
	maxQueueLength int32
	// maxParallelism is the value reported by MaxParallelism and set through
	// SetMaxParallelism.
	maxParallelism int32
}
|
|
|
|
|
2021-09-29 20:19:59 +08:00
|
|
|
// nodeCtx maintains the running context for a Node in flowgragh
|
2020-11-02 16:44:54 +08:00
|
|
|
type nodeCtx struct {
|
2021-02-25 17:35:36 +08:00
|
|
|
node Node
|
2021-03-25 14:41:46 +08:00
|
|
|
inputChannels []chan Msg
|
2021-02-25 17:35:36 +08:00
|
|
|
inputMessages []Msg
|
2020-11-02 16:44:54 +08:00
|
|
|
downstream []*nodeCtx
|
|
|
|
downstreamInputChanIdx map[string]int
|
2020-11-09 16:27:11 +08:00
|
|
|
|
2022-06-02 19:48:04 +08:00
|
|
|
closeCh chan struct{} // notify work to exit
|
|
|
|
closeWg sync.WaitGroup // block Close until work exit
|
2020-11-02 16:44:54 +08:00
|
|
|
}
|
|
|
|
|
2021-09-29 20:19:59 +08:00
|
|
|
// Start invoke Node `Start` method and start a worker goroutine
|
2022-06-02 19:48:04 +08:00
|
|
|
func (nodeCtx *nodeCtx) Start() {
|
2021-09-29 20:19:59 +08:00
|
|
|
nodeCtx.node.Start()
|
2020-11-09 16:27:11 +08:00
|
|
|
|
2022-06-02 19:48:04 +08:00
|
|
|
nodeCtx.closeWg.Add(1)
|
2021-09-29 20:19:59 +08:00
|
|
|
go nodeCtx.work()
|
|
|
|
}
|
|
|
|
|
|
|
|
// work handles node work spinning
|
|
|
|
// 1. collectMessage from upstream or just produce Msg from InputNode
|
|
|
|
// 2. invoke node.Operate
|
|
|
|
// 3. deliver the Operate result to downstream nodes
|
|
|
|
func (nodeCtx *nodeCtx) work() {
|
2022-06-02 19:48:04 +08:00
|
|
|
defer nodeCtx.closeWg.Done()
|
2021-12-29 14:55:21 +08:00
|
|
|
name := fmt.Sprintf("nodeCtxTtChecker-%s", nodeCtx.node.Name())
|
2022-01-06 19:27:25 +08:00
|
|
|
var checker *timerecord.GroupChecker
|
2021-12-29 14:55:21 +08:00
|
|
|
if enableTtChecker {
|
2022-01-06 19:27:25 +08:00
|
|
|
checker = timerecord.GetGroupChecker("fgNode", nodeCtxTtInterval, func(list []string) {
|
|
|
|
log.Warn("some node(s) haven't received input", zap.Strings("list", list), zap.Duration("duration ", nodeCtxTtInterval))
|
|
|
|
})
|
|
|
|
checker.Check(name)
|
|
|
|
defer checker.Remove(name)
|
2021-12-29 14:55:21 +08:00
|
|
|
}
|
|
|
|
|
2020-11-02 16:44:54 +08:00
|
|
|
for {
|
|
|
|
select {
|
2021-09-29 20:19:59 +08:00
|
|
|
case <-nodeCtx.closeCh:
|
2022-06-02 19:48:04 +08:00
|
|
|
log.Debug("flow graph node closed", zap.String("nodeName", nodeCtx.node.Name()))
|
2020-11-02 16:44:54 +08:00
|
|
|
return
|
|
|
|
default:
|
|
|
|
// inputs from inputsMessages for Operate
|
2021-11-30 09:51:51 +08:00
|
|
|
var inputs, res []Msg
|
2021-02-25 17:35:36 +08:00
|
|
|
if !nodeCtx.node.IsInputNode() {
|
2021-09-29 20:19:59 +08:00
|
|
|
nodeCtx.collectInputMessages()
|
2020-11-09 16:27:11 +08:00
|
|
|
inputs = nodeCtx.inputMessages
|
2020-11-02 16:44:54 +08:00
|
|
|
}
|
2021-02-25 17:35:36 +08:00
|
|
|
n := nodeCtx.node
|
2021-03-25 14:41:46 +08:00
|
|
|
res = n.Operate(inputs)
|
2020-11-12 12:04:12 +08:00
|
|
|
|
2021-12-29 14:55:21 +08:00
|
|
|
if enableTtChecker {
|
2022-01-06 19:27:25 +08:00
|
|
|
checker.Check(name)
|
2021-12-29 14:55:21 +08:00
|
|
|
}
|
|
|
|
|
2020-11-05 10:52:50 +08:00
|
|
|
downstreamLength := len(nodeCtx.downstreamInputChanIdx)
|
|
|
|
if len(nodeCtx.downstream) < downstreamLength {
|
2021-08-03 22:43:25 +08:00
|
|
|
log.Warn("", zap.Any("nodeCtx.downstream length", len(nodeCtx.downstream)))
|
2020-11-05 10:52:50 +08:00
|
|
|
}
|
|
|
|
if len(res) < downstreamLength {
|
2021-03-22 16:36:10 +08:00
|
|
|
// log.Println("node result length = ", len(res))
|
2020-11-12 12:04:12 +08:00
|
|
|
break
|
2020-11-05 10:52:50 +08:00
|
|
|
}
|
2020-11-12 12:04:12 +08:00
|
|
|
|
|
|
|
w := sync.WaitGroup{}
|
2020-11-05 10:52:50 +08:00
|
|
|
for i := 0; i < downstreamLength; i++ {
|
2020-11-12 12:04:12 +08:00
|
|
|
w.Add(1)
|
2021-09-29 20:19:59 +08:00
|
|
|
go nodeCtx.downstream[i].deliverMsg(&w, res[i], nodeCtx.downstreamInputChanIdx[nodeCtx.downstream[i].node.Name()])
|
2020-11-02 16:44:54 +08:00
|
|
|
}
|
2020-11-12 12:04:12 +08:00
|
|
|
w.Wait()
|
2020-11-02 16:44:54 +08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-09-29 20:19:59 +08:00
|
|
|
// Close handles cleanup logic and notify worker to quit
|
2020-11-02 16:44:54 +08:00
|
|
|
func (nodeCtx *nodeCtx) Close() {
|
2022-06-02 19:48:04 +08:00
|
|
|
if nodeCtx.node.IsInputNode() {
|
|
|
|
nodeCtx.node.Close() // close input msgStream
|
|
|
|
close(nodeCtx.closeCh)
|
|
|
|
nodeCtx.closeWg.Wait()
|
|
|
|
} else {
|
|
|
|
close(nodeCtx.closeCh)
|
|
|
|
nodeCtx.closeWg.Wait()
|
|
|
|
nodeCtx.node.Close() // close output msgStream, and etc...
|
|
|
|
}
|
2020-11-02 16:44:54 +08:00
|
|
|
}
|
|
|
|
|
2021-09-29 20:19:59 +08:00
|
|
|
// deliverMsg tries to put the Msg to specified downstream channel
|
|
|
|
func (nodeCtx *nodeCtx) deliverMsg(wg *sync.WaitGroup, msg Msg, inputChanIdx int) {
|
2021-03-26 18:40:04 +08:00
|
|
|
defer wg.Done()
|
|
|
|
defer func() {
|
|
|
|
err := recover()
|
|
|
|
if err != nil {
|
2021-08-03 22:43:25 +08:00
|
|
|
log.Warn(fmt.Sprintln(err))
|
2021-03-26 18:40:04 +08:00
|
|
|
}
|
|
|
|
}()
|
2021-09-29 20:19:59 +08:00
|
|
|
select {
|
|
|
|
case <-nodeCtx.closeCh:
|
|
|
|
case nodeCtx.inputChannels[inputChanIdx] <- msg:
|
|
|
|
}
|
2020-11-02 16:44:54 +08:00
|
|
|
}
|
|
|
|
|
2021-09-29 20:19:59 +08:00
|
|
|
func (nodeCtx *nodeCtx) collectInputMessages() {
|
2020-11-02 16:44:54 +08:00
|
|
|
inputsNum := len(nodeCtx.inputChannels)
|
2021-02-25 17:35:36 +08:00
|
|
|
nodeCtx.inputMessages = make([]Msg, inputsNum)
|
2020-11-02 16:44:54 +08:00
|
|
|
|
|
|
|
// init inputMessages,
|
|
|
|
// receive messages from inputChannels,
|
|
|
|
// and move them to inputMessages.
|
|
|
|
for i := 0; i < inputsNum; i++ {
|
|
|
|
channel := nodeCtx.inputChannels[i]
|
2021-03-23 01:49:50 +08:00
|
|
|
select {
|
2021-09-29 20:19:59 +08:00
|
|
|
case <-nodeCtx.closeCh:
|
2021-03-25 14:41:46 +08:00
|
|
|
return
|
|
|
|
case msg, ok := <-channel:
|
2021-03-23 01:49:50 +08:00
|
|
|
if !ok {
|
|
|
|
// TODO: add status
|
2021-08-03 22:43:25 +08:00
|
|
|
log.Warn("input channel closed")
|
2021-03-25 14:41:46 +08:00
|
|
|
return
|
2021-03-23 01:49:50 +08:00
|
|
|
}
|
2021-03-25 14:41:46 +08:00
|
|
|
nodeCtx.inputMessages[i] = msg
|
2020-11-12 12:04:12 +08:00
|
|
|
}
|
2021-02-25 17:35:36 +08:00
|
|
|
}
|
|
|
|
|
2020-12-10 16:31:09 +08:00
|
|
|
// timeTick alignment check
|
|
|
|
if len(nodeCtx.inputMessages) > 1 {
|
2021-02-25 17:35:36 +08:00
|
|
|
t := nodeCtx.inputMessages[0].TimeTick()
|
2021-02-25 15:08:50 +08:00
|
|
|
latestTime := t
|
2020-12-10 16:31:09 +08:00
|
|
|
for i := 1; i < len(nodeCtx.inputMessages); i++ {
|
2021-12-29 14:57:24 +08:00
|
|
|
if latestTime < nodeCtx.inputMessages[i].TimeTick() {
|
2021-02-25 17:35:36 +08:00
|
|
|
latestTime = nodeCtx.inputMessages[i].TimeTick()
|
2021-02-25 15:08:50 +08:00
|
|
|
}
|
|
|
|
}
|
2021-02-27 18:33:29 +08:00
|
|
|
|
2021-02-25 15:08:50 +08:00
|
|
|
// wait for time tick
|
2021-02-27 18:33:29 +08:00
|
|
|
sign := make(chan struct{})
|
|
|
|
go func() {
|
|
|
|
for i := 0; i < len(nodeCtx.inputMessages); i++ {
|
|
|
|
for nodeCtx.inputMessages[i].TimeTick() != latestTime {
|
2021-10-29 12:01:16 +08:00
|
|
|
log.Debug("Try to align timestamp", zap.Uint64("t1", latestTime), zap.Uint64("t2", nodeCtx.inputMessages[i].TimeTick()))
|
2021-02-27 18:33:29 +08:00
|
|
|
channel := nodeCtx.inputChannels[i]
|
2021-03-23 01:49:50 +08:00
|
|
|
select {
|
2021-09-29 20:19:59 +08:00
|
|
|
case <-nodeCtx.closeCh:
|
2021-02-27 18:33:29 +08:00
|
|
|
return
|
2021-03-23 01:49:50 +08:00
|
|
|
case msg, ok := <-channel:
|
|
|
|
if !ok {
|
2021-08-03 22:43:25 +08:00
|
|
|
log.Warn("input channel closed")
|
2021-03-23 01:49:50 +08:00
|
|
|
return
|
|
|
|
}
|
2021-03-25 14:41:46 +08:00
|
|
|
nodeCtx.inputMessages[i] = msg
|
2021-02-25 15:08:50 +08:00
|
|
|
}
|
|
|
|
}
|
2020-12-10 16:31:09 +08:00
|
|
|
}
|
2021-02-27 18:33:29 +08:00
|
|
|
sign <- struct{}{}
|
|
|
|
}()
|
|
|
|
|
|
|
|
select {
|
|
|
|
case <-time.After(10 * time.Second):
|
|
|
|
panic("Fatal, misaligned time tick, please restart pulsar")
|
|
|
|
case <-sign:
|
2021-09-29 20:19:59 +08:00
|
|
|
case <-nodeCtx.closeCh:
|
2020-12-10 16:31:09 +08:00
|
|
|
}
|
|
|
|
}
|
2020-11-02 16:44:54 +08:00
|
|
|
}
|
|
|
|
|
2021-10-04 22:38:02 +08:00
|
|
|
// MaxQueueLength returns the maximal queue length
|
2020-11-02 19:30:12 +08:00
|
|
|
func (node *BaseNode) MaxQueueLength() int32 {
|
2020-11-02 16:44:54 +08:00
|
|
|
return node.maxQueueLength
|
|
|
|
}
|
|
|
|
|
2021-10-04 22:38:02 +08:00
|
|
|
// MaxParallelism returns the maximal parallelism
|
2020-11-02 19:30:12 +08:00
|
|
|
func (node *BaseNode) MaxParallelism() int32 {
|
2020-11-02 16:44:54 +08:00
|
|
|
return node.maxParallelism
|
|
|
|
}
|
|
|
|
|
2021-10-04 22:38:02 +08:00
|
|
|
// SetMaxQueueLength is used to set the maximal queue length
|
2020-11-02 19:30:12 +08:00
|
|
|
func (node *BaseNode) SetMaxQueueLength(n int32) {
|
2020-11-02 16:44:54 +08:00
|
|
|
node.maxQueueLength = n
|
|
|
|
}
|
|
|
|
|
2021-10-04 22:38:02 +08:00
|
|
|
// SetMaxParallelism is used to set the maximal parallelism
|
2020-11-02 19:30:12 +08:00
|
|
|
func (node *BaseNode) SetMaxParallelism(n int32) {
|
2020-11-02 16:44:54 +08:00
|
|
|
node.maxParallelism = n
|
|
|
|
}
|
|
|
|
|
2021-09-29 20:19:59 +08:00
|
|
|
// IsInputNode returns whether Node is InputNode, BaseNode is not InputNode by default
|
2020-11-09 16:27:11 +08:00
|
|
|
func (node *BaseNode) IsInputNode() bool {
|
|
|
|
return false
|
2020-11-02 16:44:54 +08:00
|
|
|
}
|
2021-03-26 18:40:04 +08:00
|
|
|
|
2021-09-29 20:19:59 +08:00
|
|
|
// Start implementing Node, base node does nothing when starts
|
|
|
|
func (node *BaseNode) Start() {}
|
|
|
|
|
2021-10-04 22:38:02 +08:00
|
|
|
// Close implementing Node, base node does nothing when stops
|
2021-09-29 20:19:59 +08:00
|
|
|
func (node *BaseNode) Close() {}
|