From b4c55f6b3cb471f1ce59a8bf103a2596eb79a886 Mon Sep 17 00:00:00 2001 From: godchen Date: Wed, 25 Aug 2021 11:41:52 +0800 Subject: [PATCH] Add delete node (#7261) Signed-off-by: godchen --- internal/datanode/data_sync_service.go | 18 ++++- internal/datanode/flow_graph_delete_node.go | 66 +++++++++++++++++++ .../datanode/flow_graph_delete_node_test.go | 37 +++++++++++ 3 files changed, 120 insertions(+), 1 deletion(-) create mode 100644 internal/datanode/flow_graph_delete_node.go create mode 100644 internal/datanode/flow_graph_delete_node_test.go diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go index 24f7c3fcb5..1795042714 100644 --- a/internal/datanode/data_sync_service.go +++ b/internal/datanode/data_sync_service.go @@ -166,6 +166,11 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro vchanInfo.GetChannelName(), ) + var deleteNode Node = newDeleteDNode( + dsService.ctx, + dsService.replica, + ) + // recover segment checkpoints for _, us := range vchanInfo.GetUnflushedSegments() { if us.CollectionID != dsService.collectionID || @@ -192,6 +197,7 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro dsService.fg.AddNode(dmStreamNode) dsService.fg.AddNode(ddNode) dsService.fg.AddNode(insertBufferNode) + dsService.fg.AddNode(deleteNode) // ddStreamNode err = dsService.fg.SetEdges(dmStreamNode.Name(), @@ -216,11 +222,21 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro // insertBufferNode err = dsService.fg.SetEdges(insertBufferNode.Name(), []string{ddNode.Name()}, - []string{}, + []string{deleteNode.Name()}, ) if err != nil { log.Error("set edges failed in node", zap.String("name", insertBufferNode.Name()), zap.Error(err)) return err } + + //deleteNode + err = dsService.fg.SetEdges(deleteNode.Name(), + []string{insertBufferNode.Name()}, + []string{}, + ) + if err != nil { + log.Error("set edges failed in node", zap.String("name", deleteNode.Name()), zap.Error(err)) + return err + } return nil } diff --git a/internal/datanode/flow_graph_delete_node.go b/internal/datanode/flow_graph_delete_node.go new file mode 100644 index 0000000000..bf66355577 --- /dev/null +++ b/internal/datanode/flow_graph_delete_node.go @@ -0,0 +1,66 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package datanode + +import ( + "context" + + "go.uber.org/zap" + + "github.com/milvus-io/milvus/internal/log" +) + +type deleteNode struct { + BaseNode + + replica Replica +} + +func (ddn *deleteNode) Name() string { + return "deletedNode" +} + +func (ddn *deleteNode) Operate(in []Msg) []Msg { + // log.Debug("DDNode Operating") + + if len(in) != 1 { + log.Error("Invalid operate message input in deleteNode", zap.Int("input length", len(in))) + return []Msg{} + } + + if len(in) == 0 { + return []Msg{} + } + + msMsg, ok := in[0].(*MsgStreamMsg) + if !ok { + log.Error("type assertion failed for MsgStreamMsg") + return []Msg{} + // TODO: add error handling + } + + if msMsg == nil { + return []Msg{} + } + + return []Msg{} +} + +func newDeleteDNode(ctx context.Context, replica Replica) *deleteNode { + baseNode := BaseNode{} + baseNode.SetMaxParallelism(Params.FlowGraphMaxQueueLength) + + return &deleteNode{ + BaseNode: baseNode, + replica: replica, + } +} diff --git a/internal/datanode/flow_graph_delete_node_test.go b/internal/datanode/flow_graph_delete_node_test.go new file mode 100644 index 0000000000..1f3a7e02c1 --- /dev/null +++ b/internal/datanode/flow_graph_delete_node_test.go @@ -0,0 +1,37 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package datanode + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestFlowGraphDeleteNode_Operate_Nil(t *testing.T) { + ctx := context.Background() + var replica Replica + deleteNode := newDeleteDNode(ctx, replica) + result := deleteNode.Operate([]Msg{}) + assert.Equal(t, len(result), 0) +} + +func TestFlowGraphDeleteNode_Operate_Invalid_Size(t *testing.T) { + ctx := context.Background() + var replica Replica + deleteNode := newDeleteDNode(ctx, replica) + var Msg1 Msg + var Msg2 Msg + result := deleteNode.Operate([]Msg{Msg1, Msg2}) + assert.Equal(t, len(result), 0) +}