milvus/internal/util/pipeline/pipeline.go

79 lines
1.9 KiB
Go
Raw Normal View History

// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pipeline
import (
"time"
"go.uber.org/zap"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/timerecord"
)
type Pipeline interface {
Add(node ...Node)
Start() error
Close()
}
type pipeline struct {
nodes []*nodeCtx
inputChannel chan Msg
nodeTtInterval time.Duration
enableTtChecker bool
}
func (p *pipeline) Add(nodes ...Node) {
for _, node := range nodes {
p.addNode(node)
}
}
func (p *pipeline) addNode(node Node) {
nodeCtx := newNodeCtx(node)
if p.enableTtChecker {
nodeCtx.checker = timerecord.GetGroupChecker("fgNode", p.nodeTtInterval, func(list []string) {
log.Warn("some node(s) haven't received input", zap.Strings("list", list), zap.Duration("duration ", p.nodeTtInterval))
})
}
if len(p.nodes) != 0 {
p.nodes[len(p.nodes)-1].next = nodeCtx
} else {
p.inputChannel = nodeCtx.inputChannel
}
p.nodes = append(p.nodes, nodeCtx)
}
func (p *pipeline) Start() error {
if len(p.nodes) == 0 {
return ErrEmptyPipeline
}
for _, node := range p.nodes {
node.Start()
}
return nil
}
func (p *pipeline) Close() {
for _, node := range p.nodes {
node.Close()
}
}