milvus/internal/querynode/flow_graph_msg_stream_input_nodes.go
bigsheeper a7eae3a4c1 Remove consumer in query node's initialization
Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
2021-02-05 14:15:10 +08:00

41 lines
1.3 KiB
Go

package querynode
import (
"context"
"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
"github.com/zilliztech/milvus-distributed/internal/util/flowgraph"
)
func (dsService *dataSyncService) newDmInputNode(ctx context.Context) *flowgraph.InputNode {
factory := pulsarms.NewFactory(Params.PulsarAddress, Params.InsertReceiveBufSize, Params.InsertPulsarBufSize)
// query node doesn't need to consume any topic
insertStream, _ := factory.NewTtMsgStream(ctx)
dsService.dmStream = insertStream
maxQueueLength := Params.FlowGraphMaxQueueLength
maxParallelism := Params.FlowGraphMaxParallelism
node := flowgraph.NewInputNode(&insertStream, "dmInputNode", maxQueueLength, maxParallelism)
return node
}
func (dsService *dataSyncService) newDDInputNode(ctx context.Context) *flowgraph.InputNode {
factory := pulsarms.NewFactory(Params.PulsarAddress, Params.DDReceiveBufSize, Params.DDPulsarBufSize)
consumeChannels := Params.DDChannelNames
consumeSubName := Params.MsgChannelSubName
ddStream, _ := factory.NewTtMsgStream(ctx)
ddStream.AsConsumer(consumeChannels, consumeSubName)
dsService.ddStream = ddStream
maxQueueLength := Params.FlowGraphMaxQueueLength
maxParallelism := Params.FlowGraphMaxParallelism
node := flowgraph.NewInputNode(&ddStream, "ddInputNode", maxQueueLength, maxParallelism)
return node
}