diff --git a/internal/proxyservice/impl.go b/internal/proxyservice/impl.go index 1543b0eeed..379694f3af 100644 --- a/internal/proxyservice/impl.go +++ b/internal/proxyservice/impl.go @@ -121,7 +121,7 @@ func (s *ServiceImpl) Init() error { "proxyservicesub") // TODO: add config log.Println("create node time tick consumer channel: ", Params.NodeTimeTickChannel) - ttBarrier := newSoftTimeTickBarrier(s.ctx, nodeTimeTickMsgStream, []UniqueID{0}, 10) + ttBarrier := newSoftTimeTickBarrier(s.ctx, nodeTimeTickMsgStream, []UniqueID{1}, 10) log.Println("create soft time tick barrier ...") s.tick = newTimeTick(s.ctx, ttBarrier, serviceTimeTickMsgStream, insertTickMsgStream) log.Println("create time tick ...") diff --git a/internal/querynode/flow_graph_msg_stream_input_nodes.go b/internal/querynode/flow_graph_msg_stream_input_nodes.go index 1816d0e836..0d229606bc 100644 --- a/internal/querynode/flow_graph_msg_stream_input_nodes.go +++ b/internal/querynode/flow_graph_msg_stream_input_nodes.go @@ -3,6 +3,7 @@ package querynode import ( "context" + "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms" "github.com/zilliztech/milvus-distributed/internal/util/flowgraph" ) @@ -10,14 +11,19 @@ import ( func (dsService *dataSyncService) newDmInputNode(ctx context.Context) *flowgraph.InputNode { factory := pulsarms.NewFactory(Params.PulsarAddress, Params.InsertReceiveBufSize, Params.InsertPulsarBufSize) - // query node doesn't need to consume any topic + consumeChannels := Params.InsertChannelNames + consumeSubName := Params.MsgChannelSubName + insertStream, _ := factory.NewTtMsgStream(ctx) - dsService.dmStream = insertStream + insertStream.AsConsumer(consumeChannels, consumeSubName) + + var stream msgstream.MsgStream = insertStream + dsService.dmStream = stream maxQueueLength := Params.FlowGraphMaxQueueLength maxParallelism := Params.FlowGraphMaxParallelism - node := flowgraph.NewInputNode(&insertStream, "dmInputNode", maxQueueLength, maxParallelism) + node := flowgraph.NewInputNode(&stream, "dmInputNode", maxQueueLength, maxParallelism) return node } @@ -30,11 +36,12 @@ func (dsService *dataSyncService) newDDInputNode(ctx context.Context) *flowgraph ddStream, _ := factory.NewTtMsgStream(ctx) ddStream.AsConsumer(consumeChannels, consumeSubName) - dsService.ddStream = ddStream + var stream msgstream.MsgStream = ddStream + dsService.ddStream = stream maxQueueLength := Params.FlowGraphMaxQueueLength maxParallelism := Params.FlowGraphMaxParallelism - node := flowgraph.NewInputNode(&ddStream, "ddInputNode", maxQueueLength, maxParallelism) + node := flowgraph.NewInputNode(&stream, "ddInputNode", maxQueueLength, maxParallelism) return node } diff --git a/scripts/start.sh b/scripts/start.sh new file mode 100755 index 0000000000..5cfa8061a4 --- /dev/null +++ b/scripts/start.sh @@ -0,0 +1,34 @@ +cd .. +make + +echo "starting master" +nohup ./bin/masterservice > ~/masterservice.out 2>&1 & + +echo "starting proxyservice" +nohup ./bin/proxyservice > ~/proxyservice.out 2>&1 & + +echo "starting proxynode" +nohup ./bin/proxynode > ~/proxynode.out 2>&1 & + +echo "starting queryservice" +nohup ./bin/queryservice > ~/queryservice.out 2>&1 & + +echo "starting querynode1" +export QUERY_NODE_ID=1 +nohup ./bin/querynode > ~/querynode1.out 2>&1 & + +echo "starting querynode2" +export QUERY_NODE_ID=2 +nohup ./bin/querynode > ~/querynode2.out 2>&1 & + +echo "starting dataservice" +nohup ./bin/dataservice > ~/dataservice.out 2>&1 & + +echo "starting datanode" +nohup ./bin/datanode > ~/datanode.out 2>&1 & + +echo "starting indexservice" +nohup ./bin/indexservice > ~/indexservice.out 2>&1 & + +echo "starting indexnode" +nohup ./bin/indexnode > ~/indexnode.out 2>&1 & diff --git a/scripts/stop.sh b/scripts/stop.sh new file mode 100755 index 0000000000..e0806199aa --- /dev/null +++ b/scripts/stop.sh @@ -0,0 +1,29 @@ +echo "stopping masterservice" +kill -9 $(ps -e | grep masterservice | awk '{print $1}') + +echo "stopping proxyservice" +kill -9 $(ps -e | grep proxyservice | awk '{print $1}') + +echo "stopping proxynode" +kill -9 $(ps -e | grep proxynode | awk '{print $1}') + +echo "stopping queryservice" +kill -9 $(ps -e | grep queryservice | awk '{print $1}') + +echo "stopping querynode" +kill -9 $(ps -e | grep querynode | awk '{print $1}') + +echo "stopping dataservice" +kill -9 $(ps -e | grep dataservice | awk '{print $1}') + +echo "stopping datanode" +kill -9 $(ps -e | grep datanode | awk '{print $1}') + +echo "stopping indexservice" +kill -9 $(ps -e | grep indexservice | awk '{print $1}') + +echo "stopping indexnode" +kill -9 $(ps -e | grep indexnode | awk '{print $1}') + +echo "completed" +