2021-04-19 13:42:47 +08:00
|
|
|
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
|
|
|
|
//
|
|
|
|
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
|
|
|
|
// with the License. You may obtain a copy of the License at
|
|
|
|
//
|
|
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
//
|
|
|
|
// Unless required by applicable law or agreed to in writing, software distributed under the License
|
|
|
|
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
|
|
|
// or implied. See the License for the specific language governing permissions and limitations under the License.
|
|
|
|
|
2020-11-09 16:27:11 +08:00
|
|
|
package flowgraph
|
|
|
|
|
|
|
|
import (
|
2021-04-22 14:45:57 +08:00
|
|
|
"github.com/milvus-io/milvus/internal/msgstream"
|
|
|
|
"github.com/milvus-io/milvus/internal/util/trace"
|
2021-02-25 17:35:36 +08:00
|
|
|
"github.com/opentracing/opentracing-go"
|
2020-11-09 16:27:11 +08:00
|
|
|
)
|
|
|
|
|
|
|
|
type InputNode struct {
|
|
|
|
BaseNode
|
|
|
|
inStream *msgstream.MsgStream
|
|
|
|
name string
|
|
|
|
}
|
|
|
|
|
|
|
|
func (inNode *InputNode) IsInputNode() bool {
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
|
2021-03-26 18:40:04 +08:00
|
|
|
func (inNode *InputNode) Close() {
|
|
|
|
(*inNode.inStream).Close()
|
|
|
|
}
|
|
|
|
|
2020-11-09 16:27:11 +08:00
|
|
|
func (inNode *InputNode) Name() string {
|
|
|
|
return inNode.name
|
|
|
|
}
|
|
|
|
|
|
|
|
func (inNode *InputNode) InStream() *msgstream.MsgStream {
|
|
|
|
return inNode.inStream
|
|
|
|
}
|
|
|
|
|
|
|
|
// empty input and return one *Msg
|
2021-03-25 14:41:46 +08:00
|
|
|
func (inNode *InputNode) Operate(in []Msg) []Msg {
|
2020-11-12 12:04:12 +08:00
|
|
|
//fmt.Println("Do InputNode operation")
|
2021-01-06 11:17:35 +08:00
|
|
|
|
2021-03-25 14:41:46 +08:00
|
|
|
msgPack := (*inNode.inStream).Consume()
|
2021-01-06 14:45:50 +08:00
|
|
|
|
2020-11-12 12:04:12 +08:00
|
|
|
// TODO: add status
|
|
|
|
if msgPack == nil {
|
2021-03-25 14:41:46 +08:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
var spans []opentracing.Span
|
|
|
|
for _, msg := range msgPack.Msgs {
|
|
|
|
sp, ctx := trace.StartSpanFromContext(msg.TraceCtx())
|
|
|
|
spans = append(spans, sp)
|
|
|
|
msg.SetTraceCtx(ctx)
|
2020-11-12 12:04:12 +08:00
|
|
|
}
|
|
|
|
|
2020-11-09 16:27:11 +08:00
|
|
|
var msgStreamMsg Msg = &MsgStreamMsg{
|
2021-01-26 09:44:39 +08:00
|
|
|
tsMessages: msgPack.Msgs,
|
|
|
|
timestampMin: msgPack.BeginTs,
|
|
|
|
timestampMax: msgPack.EndTs,
|
|
|
|
startPositions: msgPack.StartPositions,
|
2021-03-16 17:55:42 +08:00
|
|
|
endPositions: msgPack.EndPositions,
|
2020-11-09 16:27:11 +08:00
|
|
|
}
|
|
|
|
|
2021-03-25 14:41:46 +08:00
|
|
|
for _, span := range spans {
|
|
|
|
span.Finish()
|
|
|
|
}
|
|
|
|
|
|
|
|
return []Msg{msgStreamMsg}
|
2020-11-09 16:27:11 +08:00
|
|
|
}
|
|
|
|
|
2020-11-19 10:46:17 +08:00
|
|
|
func NewInputNode(inStream *msgstream.MsgStream, nodeName string, maxQueueLength int32, maxParallelism int32) *InputNode {
|
2020-11-09 16:27:11 +08:00
|
|
|
baseNode := BaseNode{}
|
2020-11-18 17:32:52 +08:00
|
|
|
baseNode.SetMaxQueueLength(maxQueueLength)
|
|
|
|
baseNode.SetMaxParallelism(maxParallelism)
|
2020-11-09 16:27:11 +08:00
|
|
|
|
|
|
|
return &InputNode{
|
|
|
|
BaseNode: baseNode,
|
|
|
|
inStream: inStream,
|
|
|
|
name: nodeName,
|
|
|
|
}
|
|
|
|
}
|