Simplify flow graph node to pipeline mode (#19667)

Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>

Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>
This commit is contained in:
Xiaofan 2022-10-10 22:15:22 +08:00 committed by GitHub
parent 31db01b6ae
commit db3f4be49d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 149 additions and 375 deletions

View File

@ -326,7 +326,6 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro
// ddStreamNode
err = dsService.fg.SetEdges(dmStreamNode.Name(),
[]string{},
[]string{ddNode.Name()},
)
if err != nil {
@ -336,7 +335,6 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro
// ddNode
err = dsService.fg.SetEdges(ddNode.Name(),
[]string{dmStreamNode.Name()},
[]string{insertBufferNode.Name()},
)
if err != nil {
@ -346,7 +344,6 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro
// insertBufferNode
err = dsService.fg.SetEdges(insertBufferNode.Name(),
[]string{ddNode.Name()},
[]string{deleteNode.Name()},
)
if err != nil {
@ -356,7 +353,6 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro
//deleteNode
err = dsService.fg.SetEdges(deleteNode.Name(),
[]string{insertBufferNode.Name()},
[]string{},
)
if err != nil {

View File

@ -82,6 +82,11 @@ func (ddn *ddNode) Name() string {
// Operate handles input messages, implementing flowgraph.Node
func (ddn *ddNode) Operate(in []Msg) []Msg {
if in == nil {
log.Debug("type assertion failed for MsgStreamMsg because it's nil")
return []Msg{}
}
if len(in) != 1 {
log.Warn("Invalid operate message input in ddNode", zap.Int("input length", len(in)))
return []Msg{}
@ -89,11 +94,7 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
msMsg, ok := in[0].(*MsgStreamMsg)
if !ok {
if in[0] == nil {
log.Debug("type assertion failed for MsgStreamMsg because it's nil")
} else {
log.Warn("type assertion failed for MsgStreamMsg", zap.String("name", reflect.TypeOf(in[0]).Name()))
}
log.Warn("type assertion failed for MsgStreamMsg", zap.String("name", reflect.TypeOf(in[0]).Name()))
return []Msg{}
}

View File

@ -120,20 +120,19 @@ func (dn *deleteNode) showDelBuf(segIDs []UniqueID, ts Timestamp) {
// Operate implementing flowgraph.Node, performs delete data process
func (dn *deleteNode) Operate(in []Msg) []Msg {
//log.Debug("deleteNode Operating")
if in == nil {
log.Debug("type assertion failed for flowGraphMsg because it's nil")
return []Msg{}
}
if len(in) != 1 {
log.Error("Invalid operate message input in deleteNode", zap.Int("input length", len(in)))
return nil
return []Msg{}
}
fgMsg, ok := in[0].(*flowGraphMsg)
if !ok {
if in[0] == nil {
log.Debug("type assertion failed for flowGraphMsg because it's nil")
} else {
log.Warn("type assertion failed for flowGraphMsg", zap.String("name", reflect.TypeOf(in[0]).Name()))
}
log.Warn("type assertion failed for flowGraphMsg", zap.String("name", reflect.TypeOf(in[0]).Name()))
return []Msg{}
}
@ -205,7 +204,7 @@ func (dn *deleteNode) Operate(in []Msg) []Msg {
for _, sp := range spans {
sp.Finish()
}
return nil
return in
}
// update delBuf for compacted segments

View File

@ -209,8 +209,6 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
"Invalid input length == 0"},
{[]Msg{&flowGraphMsg{}, &flowGraphMsg{}, &flowGraphMsg{}},
"Invalid input length == 3"},
{[]Msg{&flowGraphMsg{}},
"Invalid input length == 1 but input message is not msgStreamMsg"},
{[]Msg{&flowgraph.MsgStreamMsg{}},
"Invalid input length == 1 but input message is not flowGraphMsg"},
}

View File

@ -175,7 +175,10 @@ func (ibNode *insertBufferNode) Close() {
}
func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
// log.Debug("InsertBufferNode Operating")
if in == nil {
log.Debug("type assertion failed for flowGraphMsg because it's nil")
return []Msg{}
}
if len(in) != 1 {
log.Error("Invalid operate message input in insertBufferNode", zap.Int("input length", len(in)))
@ -184,11 +187,7 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
fgMsg, ok := in[0].(*flowGraphMsg)
if !ok {
if in[0] == nil {
log.Debug("type assertion failed for flowGraphMsg because it's nil")
} else {
log.Warn("type assertion failed for flowGraphMsg", zap.String("name", reflect.TypeOf(in[0]).Name()))
}
log.Warn("type assertion failed for flowGraphMsg", zap.String("name", reflect.TypeOf(in[0]).Name()))
return []Msg{}
}

View File

@ -52,6 +52,11 @@ func (dNode *deleteNode) Name() string {
// Operate handles input messages, do delete operations
func (dNode *deleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
if in == nil {
log.Debug("type assertion failed for deleteMsg because it's nil", zap.String("name", dNode.Name()))
return []Msg{}
}
if len(in) != 1 {
log.Warn("Invalid operate message input in deleteNode", zap.Int("input length", len(in)), zap.String("name", dNode.Name()))
return []Msg{}
@ -59,11 +64,7 @@ func (dNode *deleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
dMsg, ok := in[0].(*deleteMsg)
if !ok {
if in[0] == nil {
log.Debug("type assertion failed for deleteMsg because it's nil", zap.String("name", dNode.Name()))
} else {
log.Warn("type assertion failed for deleteMsg", zap.String("msgType", reflect.TypeOf(in[0]).Name()), zap.String("name", dNode.Name()))
}
log.Warn("type assertion failed for deleteMsg", zap.String("msgType", reflect.TypeOf(in[0]).Name()), zap.String("name", dNode.Name()))
return []Msg{}
}
@ -73,10 +74,6 @@ func (dNode *deleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
deleteOffset: map[UniqueID]int64{},
}
if dMsg == nil {
return []Msg{}
}
var spans []opentracing.Span
for _, msg := range dMsg.deleteMessages {
sp, ctx := trace.StartSpanFromContext(msg.TraceCtx())

View File

@ -45,6 +45,11 @@ func (fddNode *filterDeleteNode) Name() string {
// Operate handles input messages, to filter invalid delete messages
func (fddNode *filterDeleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
if in == nil {
log.Debug("type assertion failed for MsgStreamMsg because it's nil", zap.String("name", fddNode.Name()))
return []Msg{}
}
if len(in) != 1 {
log.Warn("Invalid operate message input in filterDDNode", zap.Int("input length", len(in)), zap.String("name", fddNode.Name()))
return []Msg{}
@ -52,15 +57,7 @@ func (fddNode *filterDeleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
msgStreamMsg, ok := in[0].(*MsgStreamMsg)
if !ok {
if in[0] == nil {
log.Debug("type assertion failed for MsgStreamMsg because it's nil", zap.String("name", fddNode.Name()))
} else {
log.Warn("type assertion failed for MsgStreamMsg", zap.String("msgType", reflect.TypeOf(in[0]).Name()), zap.String("name", fddNode.Name()))
}
return []Msg{}
}
if msgStreamMsg == nil {
log.Warn("type assertion failed for MsgStreamMsg", zap.String("msgType", reflect.TypeOf(in[0]).Name()), zap.String("name", fddNode.Name()))
return []Msg{}
}

View File

@ -49,6 +49,10 @@ func (fdmNode *filterDmNode) Name() string {
// Operate handles input messages, to filter invalid insert messages
func (fdmNode *filterDmNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
if in == nil {
log.Debug("type assertion failed for MsgStreamMsg because it's nil", zap.String("name", fdmNode.Name()))
return []Msg{}
}
if len(in) != 1 {
log.Warn("Invalid operate message input in filterDmNode", zap.Int("input length", len(in)), zap.String("name", fdmNode.Name()))
return []Msg{}
@ -56,15 +60,7 @@ func (fdmNode *filterDmNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
msgStreamMsg, ok := in[0].(*MsgStreamMsg)
if !ok {
if in[0] == nil {
log.Debug("type assertion failed for MsgStreamMsg because it's nil", zap.String("name", fdmNode.Name()))
} else {
log.Warn("type assertion failed for MsgStreamMsg", zap.String("msgType", reflect.TypeOf(in[0]).Name()), zap.String("name", fdmNode.Name()))
}
return []Msg{}
}
if msgStreamMsg == nil {
log.Warn("type assertion failed for MsgStreamMsg", zap.String("msgType", reflect.TypeOf(in[0]).Name()), zap.String("name", fdmNode.Name()))
return []Msg{}
}

View File

@ -72,6 +72,11 @@ func (iNode *insertNode) Name() string {
// Operate handles input messages, to execute insert operations
func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
if in == nil {
log.Debug("type assertion failed for insertMsg because it's nil", zap.String("name", iNode.Name()))
return []Msg{}
}
if len(in) != 1 {
log.Warn("Invalid operate message input in insertNode", zap.Int("input length", len(in)), zap.String("name", iNode.Name()))
return []Msg{}
@ -79,11 +84,7 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
iMsg, ok := in[0].(*insertMsg)
if !ok {
if in[0] == nil {
log.Debug("type assertion failed for insertMsg because it's nil", zap.String("name", iNode.Name()))
} else {
log.Warn("type assertion failed for insertMsg", zap.String("msgType", reflect.TypeOf(in[0]).Name()), zap.String("name", iNode.Name()))
}
log.Warn("type assertion failed for insertMsg", zap.String("msgType", reflect.TypeOf(in[0]).Name()), zap.String("name", iNode.Name()))
return []Msg{}
}
@ -95,10 +96,6 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
insertPKs: make(map[UniqueID][]primaryKey),
}
if iMsg == nil {
return []Msg{}
}
var spans []opentracing.Span
for _, msg := range iMsg.insertMessages {
sp, ctx := trace.StartSpanFromContext(msg.TraceCtx())

View File

@ -75,7 +75,6 @@ func newQueryNodeFlowGraph(ctx context.Context,
// dmStreamNode
err = q.flowGraph.SetEdges(dmStreamNode.Name(),
[]string{},
[]string{filterDmNode.Name()},
)
if err != nil {
@ -84,7 +83,6 @@ func newQueryNodeFlowGraph(ctx context.Context,
// filterDmNode
err = q.flowGraph.SetEdges(filterDmNode.Name(),
[]string{dmStreamNode.Name()},
[]string{insertNode.Name()},
)
if err != nil {
@ -93,7 +91,6 @@ func newQueryNodeFlowGraph(ctx context.Context,
// insertNode
err = q.flowGraph.SetEdges(insertNode.Name(),
[]string{filterDmNode.Name()},
[]string{serviceTimeNode.Name()},
)
if err != nil {
@ -102,7 +99,6 @@ func newQueryNodeFlowGraph(ctx context.Context,
// serviceTimeNode
err = q.flowGraph.SetEdges(serviceTimeNode.Name(),
[]string{insertNode.Name()},
[]string{},
)
if err != nil {
@ -145,7 +141,6 @@ func newQueryNodeDeltaFlowGraph(ctx context.Context,
// dmStreamNode
err = q.flowGraph.SetEdges(dmStreamNode.Name(),
[]string{},
[]string{filterDeleteNode.Name()},
)
if err != nil {
@ -154,7 +149,6 @@ func newQueryNodeDeltaFlowGraph(ctx context.Context,
// filterDeleteNode
err = q.flowGraph.SetEdges(filterDeleteNode.Name(),
[]string{dmStreamNode.Name()},
[]string{deleteNode.Name()},
)
if err != nil {
@ -163,7 +157,6 @@ func newQueryNodeDeltaFlowGraph(ctx context.Context,
// deleteNode
err = q.flowGraph.SetEdges(deleteNode.Name(),
[]string{filterDeleteNode.Name()},
[]string{serviceTimeNode.Name()},
)
if err != nil {
@ -172,7 +165,6 @@ func newQueryNodeDeltaFlowGraph(ctx context.Context,
// serviceTimeNode
err = q.flowGraph.SetEdges(serviceTimeNode.Name(),
[]string{deleteNode.Name()},
[]string{},
)
if err != nil {

View File

@ -42,6 +42,11 @@ func (stNode *serviceTimeNode) Name() string {
// Operate handles input messages, to execute insert operations
func (stNode *serviceTimeNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
if in == nil {
log.Debug("type assertion failed for serviceTimeMsg because it's nil", zap.String("name", stNode.Name()))
return []Msg{}
}
if len(in) != 1 {
log.Warn("Invalid operate message input in serviceTimeNode, input length = ", zap.Int("input node", len(in)), zap.String("name", stNode.Name()))
return []Msg{}
@ -49,15 +54,7 @@ func (stNode *serviceTimeNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
serviceTimeMsg, ok := in[0].(*serviceTimeMsg)
if !ok {
if in[0] == nil {
log.Debug("type assertion failed for serviceTimeMsg because it's nil", zap.String("name", stNode.Name()))
} else {
log.Warn("type assertion failed for serviceTimeMsg", zap.String("msgType", reflect.TypeOf(in[0]).Name()), zap.String("name", stNode.Name()))
}
return []Msg{}
}
if serviceTimeMsg == nil {
log.Warn("type assertion failed for serviceTimeMsg", zap.String("msgType", reflect.TypeOf(in[0]).Name()), zap.String("name", stNode.Name()))
return []Msg{}
}
@ -75,7 +72,7 @@ func (stNode *serviceTimeNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
zap.Any("channel", stNode.vChannel),
)
return []Msg{}
return in
}
// newServiceTimeNode returns a new serviceTimeNode

View File

@ -22,6 +22,8 @@ import (
"sync"
)
// Flow Graph is no longer a graph but rather a simple pipeline; this simplifies our code and increases recovery speed - xiaofan.
// TimeTickedFlowGraph flowgraph with input from tt msg stream
type TimeTickedFlowGraph struct {
nodeCtx map[NodeName]*nodeCtx
@ -33,45 +35,37 @@ type TimeTickedFlowGraph struct {
// AddNode add Node into flowgraph
func (fg *TimeTickedFlowGraph) AddNode(node Node) {
nodeCtx := nodeCtx{
node: node,
downstreamInputChanIdx: make(map[string]int),
closeCh: make(chan struct{}),
closeWg: fg.closeWg,
node: node,
closeCh: make(chan struct{}),
closeWg: fg.closeWg,
}
fg.nodeCtx[node.Name()] = &nodeCtx
}
// SetEdges set directed edges from in nodes to out nodes
func (fg *TimeTickedFlowGraph) SetEdges(nodeName string, in []string, out []string) error {
func (fg *TimeTickedFlowGraph) SetEdges(nodeName string, out []string) error {
currentNode, ok := fg.nodeCtx[nodeName]
if !ok {
errMsg := "Cannot find node:" + nodeName
return errors.New(errMsg)
}
// init current node's downstream
currentNode.downstream = make([]*nodeCtx, len(out))
// set in nodes
for i, inNodeName := range in {
inNode, ok := fg.nodeCtx[inNodeName]
if !ok {
errMsg := "Cannot find in node:" + inNodeName
return errors.New(errMsg)
}
inNode.downstreamInputChanIdx[nodeName] = i
if len(out) > 1 {
errMsg := "Flow graph now support only pipeline mode, with only one or zero output:" + nodeName
return errors.New(errMsg)
}
// init current node's downstream
// set out nodes
for i, n := range out {
outNode, ok := fg.nodeCtx[n]
for _, name := range out {
outNode, ok := fg.nodeCtx[name]
if !ok {
errMsg := "Cannot find out node:" + n
errMsg := "Cannot find out node:" + name
return errors.New(errMsg)
}
maxQueueLength := outNode.node.MaxQueueLength()
outNode.inputChannels = append(outNode.inputChannels, make(chan Msg, maxQueueLength))
currentNode.downstream[i] = outNode
outNode.inputChannel = make(chan []Msg, maxQueueLength)
currentNode.downstream = outNode
}
return nil

View File

@ -18,7 +18,6 @@ package flowgraph
import (
"context"
"log"
"math"
"math/rand"
"testing"
@ -27,11 +26,10 @@ import (
"github.com/stretchr/testify/assert"
)
// Flow graph basic example: count `d = pow(a) + sqrt(a)`
// Flow graph basic example: count `c = pow(a, 2) + 2`
// nodeA: receive input value a from input channel
// nodeB: count b = pow(a, 2)
// nodeC: count c = sqrt(a)
// nodeD: count d = b + c
// nodeC: count c = b + 2
type nodeA struct {
BaseNode
@ -45,11 +43,6 @@ type nodeB struct {
}
type nodeC struct {
BaseNode
c float64
}
type nodeD struct {
BaseNode
d float64
outputChan chan float64
@ -73,7 +66,7 @@ func (n *nodeA) Operate(in []Msg) []Msg {
var res Msg = &numMsg{
num: a,
}
return []Msg{res, res}
return []Msg{res}
}
func (n *nodeB) Name() string {
@ -81,12 +74,9 @@ func (n *nodeB) Name() string {
}
func (n *nodeB) Operate(in []Msg) []Msg {
if len(in) != 1 {
panic("illegal in")
}
a, ok := in[0].(*numMsg)
if !ok {
return []Msg{}
return nil
}
b := math.Pow(a.num, 2)
var res Msg = &numMsg{
@ -100,43 +90,17 @@ func (n *nodeC) Name() string {
}
func (n *nodeC) Operate(in []Msg) []Msg {
if len(in) != 1 {
panic("illegal in")
}
a, ok := in[0].(*numMsg)
if !ok {
return []Msg{}
}
c := math.Sqrt(a.num)
var res Msg = &numMsg{
num: c,
}
return []Msg{res}
}
func (n *nodeD) Name() string {
return "NodeD"
}
func (n *nodeD) Operate(in []Msg) []Msg {
if len(in) != 2 {
panic("illegal in")
}
b, ok := in[0].(*numMsg)
if !ok {
return nil
}
c, ok := in[1].(*numMsg)
if !ok {
return nil
}
d := b.num + c.num
n.outputChan <- d
c := b.num + 2
n.outputChan <- c
// return nil because nodeD doesn't have any downstream node.
return nil
}
func createExampleFlowGraph() (*TimeTickedFlowGraph, chan float64, chan float64, context.CancelFunc) {
func createExampleFlowGraph() (*TimeTickedFlowGraph, chan float64, chan float64, context.CancelFunc, error) {
const MaxQueueLength = 1024
ctx, cancel := context.WithCancel(context.Background())
@ -160,52 +124,35 @@ func createExampleFlowGraph() (*TimeTickedFlowGraph, chan float64, chan float64,
BaseNode: BaseNode{
maxQueueLength: MaxQueueLength,
},
}
var d Node = &nodeD{
BaseNode: BaseNode{
maxQueueLength: MaxQueueLength,
},
outputChan: outputChan,
}
fg.AddNode(a)
fg.AddNode(b)
fg.AddNode(c)
fg.AddNode(d)
var err = fg.SetEdges(a.Name(),
[]string{},
[]string{b.Name(), c.Name()},
[]string{b.Name()},
)
if err != nil {
log.Fatal("set edges failed")
return nil, nil, nil, cancel, err
}
err = fg.SetEdges(b.Name(),
[]string{a.Name()},
[]string{d.Name()},
[]string{c.Name()},
)
if err != nil {
log.Fatal("set edges failed")
return nil, nil, nil, cancel, err
}
err = fg.SetEdges(c.Name(),
[]string{a.Name()},
[]string{d.Name()},
)
if err != nil {
log.Fatal("set edges failed")
}
err = fg.SetEdges(d.Name(),
[]string{b.Name(), c.Name()},
[]string{},
)
if err != nil {
log.Fatal("set edges failed")
return nil, nil, nil, cancel, err
}
return fg, inputChan, outputChan, cancel
return fg, inputChan, outputChan, cancel, nil
}
func TestTimeTickedFlowGraph_AddNode(t *testing.T) {
@ -232,60 +179,9 @@ func TestTimeTickedFlowGraph_AddNode(t *testing.T) {
assert.Equal(t, len(fg.nodeCtx), 2)
}
func TestTimeTickedFlowGraph_SetEdges(t *testing.T) {
const MaxQueueLength = 1024
inputChan := make(chan float64, MaxQueueLength)
fg := NewTimeTickedFlowGraph(context.TODO())
var a Node = &nodeA{
BaseNode: BaseNode{
maxQueueLength: MaxQueueLength,
},
inputChan: inputChan,
}
var b Node = &nodeB{
BaseNode: BaseNode{
maxQueueLength: MaxQueueLength,
},
}
var c Node = &nodeC{
BaseNode: BaseNode{
maxQueueLength: MaxQueueLength,
},
}
fg.AddNode(a)
fg.AddNode(b)
fg.AddNode(c)
var err = fg.SetEdges(a.Name(),
[]string{b.Name()},
[]string{c.Name()},
)
assert.Nil(t, err)
err = fg.SetEdges("Invalid",
[]string{b.Name()},
[]string{c.Name()},
)
assert.Error(t, err)
err = fg.SetEdges(a.Name(),
[]string{"Invalid"},
[]string{c.Name()},
)
assert.Error(t, err)
err = fg.SetEdges(a.Name(),
[]string{b.Name()},
[]string{"Invalid"},
)
assert.Error(t, err)
}
func TestTimeTickedFlowGraph_Start(t *testing.T) {
fg, inputChan, outputChan, cancel := createExampleFlowGraph()
fg, inputChan, outputChan, cancel, err := createExampleFlowGraph()
assert.NoError(t, err)
defer cancel()
fg.Start()
@ -297,7 +193,7 @@ func TestTimeTickedFlowGraph_Start(t *testing.T) {
// output check
d := <-outputChan
res := math.Pow(a, 2) + math.Sqrt(a)
res := math.Pow(a, 2) + 2
assert.Equal(t, d, res)
}
}()
@ -305,7 +201,8 @@ func TestTimeTickedFlowGraph_Start(t *testing.T) {
}
func TestTimeTickedFlowGraph_Close(t *testing.T) {
fg, _, _, cancel := createExampleFlowGraph()
fg, _, _, cancel, err := createExampleFlowGraph()
assert.NoError(t, err)
defer cancel()
fg.Close()
}

View File

@ -17,6 +17,8 @@
package flowgraph
import (
"sync"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/mq/msgstream"
"github.com/milvus-io/milvus/internal/util/trace"
@ -28,9 +30,9 @@ import (
// InputNode is the entry point of flowgraph
type InputNode struct {
BaseNode
inStream msgstream.MsgStream
name string
closeMsgChan chan struct{}
inStream msgstream.MsgStream
name string
closeOnce sync.Once
}
// IsInputNode returns whether Node is InputNode
@ -45,15 +47,9 @@ func (inNode *InputNode) Start() {
// Close implements node
func (inNode *InputNode) Close() {
select {
case <-inNode.closeMsgChan:
return
default:
close(inNode.closeMsgChan)
log.Debug("message stream closed",
zap.String("node name", inNode.name),
)
}
inNode.closeOnce.Do(func() {
inNode.inStream.Close()
})
}
// Name returns node name
@ -68,44 +64,40 @@ func (inNode *InputNode) InStream() msgstream.MsgStream {
// Operate consume a message pack from msgstream and return
func (inNode *InputNode) Operate(in []Msg) []Msg {
select {
case <-inNode.closeMsgChan:
inNode.inStream.Close()
msgPack, ok := <-inNode.inStream.Chan()
if !ok {
log.Warn("MsgStream closed", zap.Any("input node", inNode.Name()))
return []Msg{&MsgStreamMsg{
isCloseMsg: true,
}}
case msgPack, ok := <-inNode.inStream.Chan():
if !ok {
log.Warn("MsgStream closed", zap.Any("input node", inNode.Name()))
return []Msg{}
}
// TODO: add status
if msgPack == nil {
return nil
}
var spans []opentracing.Span
for _, msg := range msgPack.Msgs {
sp, ctx := trace.StartSpanFromContext(msg.TraceCtx())
sp.LogFields(oplog.String("input_node name", inNode.Name()))
spans = append(spans, sp)
msg.SetTraceCtx(ctx)
}
var msgStreamMsg Msg = &MsgStreamMsg{
tsMessages: msgPack.Msgs,
timestampMin: msgPack.BeginTs,
timestampMax: msgPack.EndTs,
startPositions: msgPack.StartPositions,
endPositions: msgPack.EndPositions,
}
for _, span := range spans {
span.Finish()
}
return []Msg{msgStreamMsg}
}
// TODO: add status
if msgPack == nil {
return []Msg{}
}
var spans []opentracing.Span
for _, msg := range msgPack.Msgs {
sp, ctx := trace.StartSpanFromContext(msg.TraceCtx())
sp.LogFields(oplog.String("input_node name", inNode.Name()))
spans = append(spans, sp)
msg.SetTraceCtx(ctx)
}
var msgStreamMsg Msg = &MsgStreamMsg{
tsMessages: msgPack.Msgs,
timestampMin: msgPack.BeginTs,
timestampMax: msgPack.EndTs,
startPositions: msgPack.StartPositions,
endPositions: msgPack.EndPositions,
}
for _, span := range spans {
span.Finish()
}
// TODO batch operate msg
return []Msg{msgStreamMsg}
}
// NewInputNode composes an InputNode with provided MsgStream, name and parameters
@ -115,9 +107,8 @@ func NewInputNode(inStream msgstream.MsgStream, nodeName string, maxQueueLength
baseNode.SetMaxParallelism(maxParallelism)
return &InputNode{
BaseNode: baseNode,
inStream: inStream,
name: nodeName,
closeMsgChan: make(chan struct{}),
BaseNode: baseNode,
inStream: inStream,
name: nodeName,
}
}

View File

@ -53,8 +53,11 @@ func TestInputNode(t *testing.T) {
stream := inputNode.InStream()
assert.NotNil(t, stream)
output := inputNode.Operate([]Msg{})
assert.Greater(t, len(output), 0)
output := inputNode.Operate(nil)
assert.NotNil(t, output)
msg, ok := output[0].(*MsgStreamMsg)
assert.True(t, ok)
assert.False(t, msg.isCloseMsg)
}
func Test_NewInputNode(t *testing.T) {

View File

@ -52,11 +52,9 @@ type BaseNode struct {
// nodeCtx maintains the running context for a Node in flowgraph
type nodeCtx struct {
node Node
inputChannels []chan Msg
inputMessages []Msg
downstream []*nodeCtx
downstreamInputChanIdx map[string]int
node Node
inputChannel chan []Msg
downstream *nodeCtx
closeCh chan struct{} // notify work to exit
closeWg *sync.WaitGroup
@ -100,45 +98,37 @@ func (nodeCtx *nodeCtx) work() {
return
default:
// inputs from inputsMessages for Operate
var inputs, res []Msg
var input, output []Msg
if !nodeCtx.node.IsInputNode() {
nodeCtx.collectInputMessages()
inputs = nodeCtx.inputMessages
input = <-nodeCtx.inputChannel
}
// the input message decides whether the operate method is executed
if isCloseMsg(inputs) {
res = inputs
if isCloseMsg(input) {
output = input
}
if len(res) == 0 {
if len(output) == 0 {
n := nodeCtx.node
res = n.Operate(inputs)
output = n.Operate(input)
}
// the res decide whether the node should be closed.
if isCloseMsg(res) {
// the output decides whether the node should be closed.
if isCloseMsg(output) {
close(nodeCtx.closeCh)
nodeCtx.closeWg.Done()
nodeCtx.node.Close()
if nodeCtx.inputChannel != nil {
close(nodeCtx.inputChannel)
}
}
if enableTtChecker {
checker.Check(name)
}
downstreamLength := len(nodeCtx.downstreamInputChanIdx)
if len(nodeCtx.downstream) < downstreamLength {
log.Warn("", zap.Any("nodeCtx.downstream length", len(nodeCtx.downstream)))
// deliver to the downstream flow graph node.
if nodeCtx.downstream != nil {
nodeCtx.downstream.inputChannel <- output
}
if len(res) < downstreamLength {
// log.Println("node result length = ", len(res))
break
}
w := sync.WaitGroup{}
for i := 0; i < downstreamLength; i++ {
w.Add(1)
go nodeCtx.downstream[i].deliverMsg(&w, res[i], nodeCtx.downstreamInputChanIdx[nodeCtx.downstream[i].node.Name()])
}
w.Wait()
}
}
}
@ -150,72 +140,6 @@ func (nodeCtx *nodeCtx) Close() {
}
}
// deliverMsg tries to put the Msg to specified downstream channel
func (nodeCtx *nodeCtx) deliverMsg(wg *sync.WaitGroup, msg Msg, inputChanIdx int) {
defer wg.Done()
defer func() {
err := recover()
if err != nil {
log.Warn(fmt.Sprintln(err))
}
}()
nodeCtx.inputChannels[inputChanIdx] <- msg
}
func (nodeCtx *nodeCtx) collectInputMessages() {
inputsNum := len(nodeCtx.inputChannels)
nodeCtx.inputMessages = make([]Msg, inputsNum)
// init inputMessages,
// receive messages from inputChannels,
// and move them to inputMessages.
for i := 0; i < inputsNum; i++ {
channel := nodeCtx.inputChannels[i]
msg, ok := <-channel
if !ok {
// TODO: add status
log.Warn("input channel closed")
return
}
nodeCtx.inputMessages[i] = msg
}
// timeTick alignment check
if len(nodeCtx.inputMessages) > 1 {
t := nodeCtx.inputMessages[0].TimeTick()
latestTime := t
for i := 1; i < len(nodeCtx.inputMessages); i++ {
if latestTime < nodeCtx.inputMessages[i].TimeTick() {
latestTime = nodeCtx.inputMessages[i].TimeTick()
}
}
// wait for time tick
sign := make(chan struct{})
go func() {
for i := 0; i < len(nodeCtx.inputMessages); i++ {
for nodeCtx.inputMessages[i].TimeTick() != latestTime {
log.Debug("Try to align timestamp", zap.Uint64("t1", latestTime), zap.Uint64("t2", nodeCtx.inputMessages[i].TimeTick()))
channel := nodeCtx.inputChannels[i]
msg, ok := <-channel
if !ok {
log.Warn("input channel closed")
return
}
nodeCtx.inputMessages[i] = msg
}
}
sign <- struct{}{}
}()
select {
case <-time.After(10 * time.Second):
panic("Fatal, misaligned time tick, please restart pulsar")
case <-sign:
}
}
}
// MaxQueueLength returns the maximal queue length
func (node *BaseNode) MaxQueueLength() int32 {
return node.maxQueueLength

View File

@ -77,16 +77,12 @@ func TestNodeCtx_Start(t *testing.T) {
inputNode := NewInputNode(msgStream, nodeName, 100, 100)
node := &nodeCtx{
node: inputNode,
inputChannels: make([]chan Msg, 2),
downstreamInputChanIdx: make(map[string]int),
closeCh: make(chan struct{}),
closeWg: &sync.WaitGroup{},
node: inputNode,
closeCh: make(chan struct{}),
closeWg: &sync.WaitGroup{},
}
for i := 0; i < len(node.inputChannels); i++ {
node.inputChannels[i] = make(chan Msg)
}
node.inputChannel = make(chan []Msg)
assert.NotPanics(t, func() {
node.Start()