From 9186d5527e96f9b35f871dce2e67a61fcdb31e40 Mon Sep 17 00:00:00 2001 From: godchen Date: Fri, 5 Nov 2021 11:59:02 +0800 Subject: [PATCH] Forward delete msg (#11210) Signed-off-by: godchen --- internal/datanode/data_node_test.go | 16 ++-- internal/datanode/data_sync_service.go | 2 +- internal/datanode/data_sync_service_test.go | 20 ++--- internal/datanode/flow_graph_dd_node.go | 81 +++++++++++++++++++- internal/datanode/flow_graph_dd_node_test.go | 35 ++++++++- internal/datanode/param_table.go | 25 ++++++ 6 files changed, 155 insertions(+), 24 deletions(-) diff --git a/internal/datanode/data_node_test.go b/internal/datanode/data_node_test.go index 0b5342427d..1639af3927 100644 --- a/internal/datanode/data_node_test.go +++ b/internal/datanode/data_node_test.go @@ -132,7 +132,7 @@ func TestDataNode(t *testing.T) { node2 := newIDLEDataNodeMock(ctx) err = node2.Start() assert.Nil(t, err) - dmChannelName := "fake-dm-channel-test-NewDataSyncService" + dmChannelName := "fake-by-dev-rootcoord-dml-channel-test-NewDataSyncService" vchan := &datapb.VchannelInfo{ CollectionID: 1, @@ -160,7 +160,7 @@ func TestDataNode(t *testing.T) { }) t.Run("Test FlushSegments", func(t *testing.T) { - dmChannelName := "fake-dm-channel-test-FlushSegments" + dmChannelName := "fake-by-dev-rootcoord-dml-channel-test-FlushSegments" node1 := newIDLEDataNodeMock(context.TODO()) err = node1.Init() @@ -368,9 +368,9 @@ func TestDataNode(t *testing.T) { collID UniqueID dmChannelName string }{ - {1, "fake-dm-backgroundgc-1"}, - {2, "fake-dm-backgroundgc-2"}, - {3, "fake-dm-backgroundgc-3"}, + {1, "fake-by-dev-rootcoord-dml-backgroundgc-1"}, + {2, "fake-by-dev-rootcoord-dml-backgroundgc-2"}, + {3, "fake-by-dev-rootcoord-dml-backgroundgc-3"}, {4, ""}, {1, ""}, } @@ -394,7 +394,7 @@ func TestDataNode(t *testing.T) { }) t.Run("Test ReleaseDataSyncService", func(t *testing.T) { - dmChannelName := "fake-dm-channel-test-NewDataSyncService" + dmChannelName := "fake-by-dev-rootcoord-dml-channel-test-NewDataSyncService" vchan := &datapb.VchannelInfo{ CollectionID: 1, @@ -485,12 +485,12 @@ func TestWatchChannel(t *testing.T) { t.Run("test watch channel", func(t *testing.T) { kv, err := etcdkv.NewEtcdKV(Params.EtcdEndpoints, Params.MetaRootPath) require.NoError(t, err) - oldInvalidCh := "datanode-etcd-test-channel-invalid" + oldInvalidCh := "datanode-etcd-test-by-dev-rootcoord-dml-channel-invalid" path := fmt.Sprintf("%s/%d/%s", Params.ChannelWatchSubPath, node.NodeID, oldInvalidCh) err = kv.Save(path, string([]byte{23})) assert.NoError(t, err) - ch := fmt.Sprintf("datanode-etcd-test-channel_%d", rand.Int31()) + ch := fmt.Sprintf("datanode-etcd-test-by-dev-rootcoord-dml-channel_%d", rand.Int31()) path = fmt.Sprintf("%s/%d/%s", Params.ChannelWatchSubPath, node.NodeID, ch) c := make(chan struct{}) go func() { diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go index 81455f7841..c2e932a3bb 100644 --- a/internal/datanode/data_sync_service.go +++ b/internal/datanode/data_sync_service.go @@ -279,7 +279,7 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro return err } - var ddNode Node = newDDNode(dsService.clearSignal, dsService.collectionID, vchanInfo) + var ddNode Node = newDDNode(dsService.ctx, dsService.clearSignal, dsService.collectionID, vchanInfo, dsService.msFactory) var insertBufferNode Node insertBufferNode, err = newInsertBufferNode( dsService.ctx, diff --git a/internal/datanode/data_sync_service_test.go b/internal/datanode/data_sync_service_test.go index 2aeb6d081e..5006c1bcb5 100644 --- a/internal/datanode/data_sync_service_test.go +++ b/internal/datanode/data_sync_service_test.go @@ -96,32 +96,32 @@ func TestDataSyncService_newDataSyncService(te *testing.T) { tests := []*testInfo{ {false, false, &mockMsgStreamFactory{false, true}, - 0, "", + 0, "by-dev-rootcoord-dml_test", 0, 0, "", 0, 0, 0, "", 0, "SetParamsReturnError"}, {true, false, &mockMsgStreamFactory{true, true}, - 0, "", + 0, "by-dev-rootcoord-dml_test", 1, 0, "", 0, 1, 1, "", 0, "CollID 0 mismach with seginfo collID 1"}, {true, false, &mockMsgStreamFactory{true, true}, - 1, "c1", - 1, 0, "c2", 0, - 1, 1, "c3", 0, + 1, "by-dev-rootcoord-dml_1", + 1, 0, "by-dev-rootcoord-dml_2", 0, + 1, 1, "by-dev-rootcoord-dml_3", 0, "chanName c1 mismach with seginfo chanName c2"}, {true, false, &mockMsgStreamFactory{true, true}, - 1, "c1", - 1, 0, "c1", 0, - 1, 1, "c2", 0, + 1, "by-dev-rootcoord-dml_1", + 1, 0, "by-dev-rootcoord-dml_1", 0, + 1, 1, "by-dev-rootcoord-dml_2", 0, "add normal segments"}, {false, false, &mockMsgStreamFactory{true, false}, - 0, "", + 0, "by-dev-rootcoord-dml", 0, 0, "", 0, 0, 0, "", 0, "error when newinsertbufernode"}, {false, true, &mockMsgStreamFactory{true, false}, - 0, "", + 0, "by-dev-rootcoord-dml", 0, 0, "", 0, 0, 0, "", 0, "replica nil"}, diff --git a/internal/datanode/flow_graph_dd_node.go b/internal/datanode/flow_graph_dd_node.go index 5ce6b66630..a986f90f39 100644 --- a/internal/datanode/flow_graph_dd_node.go +++ b/internal/datanode/flow_graph_dd_node.go @@ -17,6 +17,7 @@ package datanode import ( + "context" "sync" "go.uber.org/zap" @@ -26,6 +27,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/internalpb" + "github.com/milvus-io/milvus/internal/rootcoord" "github.com/milvus-io/milvus/internal/util/flowgraph" "github.com/milvus-io/milvus/internal/util/trace" "github.com/opentracing/opentracing-go" @@ -56,6 +58,8 @@ type ddNode struct { segID2SegInfo sync.Map // segment ID to *SegmentInfo flushedSegments []*datapb.SegmentInfo + + deltaMsgStream msgstream.MsgStream } // Name returns node name, implementing flowgraph.Node @@ -95,6 +99,7 @@ func (ddn *ddNode) Operate(in []Msg) []Msg { endPositions: make([]*internalpb.MsgPosition, 0), } + forwardMsgs := make([]msgstream.TsMsg, 0) for _, msg := range msMsg.TsMessages() { switch msg.Type() { case commonpb.MsgType_DropCollection: @@ -124,6 +129,7 @@ func (ddn *ddNode) Operate(in []Msg) []Msg { fgMsg.insertMessages = append(fgMsg.insertMessages, imsg) case commonpb.MsgType_Delete: log.Debug("DDNode receive delete messages") + forwardMsgs = append(forwardMsgs, msg) dmsg := msg.(*msgstream.DeleteMsg) if dmsg.CollectionID != ddn.collectionID { //log.Debug("filter invalid DeleteMsg, collection mis-match", @@ -134,6 +140,11 @@ func (ddn *ddNode) Operate(in []Msg) []Msg { fgMsg.deleteMessages = append(fgMsg.deleteMessages, dmsg) } } + err := ddn.forwardDeleteMsg(forwardMsgs, msMsg.TimestampMin(), msMsg.TimestampMax()) + if err != nil { + // TODO: proper deal with error + log.Warn("DDNode forward delete msg failed", zap.Error(err)) + } fgMsg.startPositions = append(fgMsg.startPositions, msMsg.StartPositions()...) fgMsg.endPositions = append(fgMsg.endPositions, msMsg.EndPositions()...) @@ -169,7 +180,60 @@ func (ddn *ddNode) isFlushed(segmentID UniqueID) bool { return false } -func newDDNode(clearSignal chan<- UniqueID, collID UniqueID, vchanInfo *datapb.VchannelInfo) *ddNode { +func (ddn *ddNode) forwardDeleteMsg(msgs []msgstream.TsMsg, minTs Timestamp, maxTs Timestamp) error { + if err := ddn.sendDeltaTimeTick(minTs); err != nil { + return err + } + if len(msgs) != 0 { + var msgPack = msgstream.MsgPack{ + Msgs: msgs, + BeginTs: minTs, + EndTs: maxTs, + } + if err := ddn.deltaMsgStream.Produce(&msgPack); err != nil { + return err + } + } + if err := ddn.sendDeltaTimeTick(maxTs); err != nil { + return err + } + return nil +} + +func (ddn *ddNode) sendDeltaTimeTick(ts Timestamp) error { + msgPack := msgstream.MsgPack{} + baseMsg := msgstream.BaseMsg{ + BeginTimestamp: ts, + EndTimestamp: ts, + HashValues: []uint32{0}, + } + timeTickResult := internalpb.TimeTickMsg{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_TimeTick, + MsgID: 0, + Timestamp: ts, + SourceID: Params.NodeID, + }, + } + timeTickMsg := &msgstream.TimeTickMsg{ + BaseMsg: baseMsg, + TimeTickMsg: timeTickResult, + } + msgPack.Msgs = append(msgPack.Msgs, timeTickMsg) + + if err := ddn.deltaMsgStream.Produce(&msgPack); err != nil { + return err + } + return nil +} + +func (ddn *ddNode) Close() { + if ddn.deltaMsgStream != nil { + ddn.deltaMsgStream.Close() + } +} + +func newDDNode(ctx context.Context, clearSignal chan<- UniqueID, collID UniqueID, vchanInfo *datapb.VchannelInfo, msFactory msgstream.Factory) *ddNode { baseNode := BaseNode{} baseNode.SetMaxQueueLength(Params.FlowGraphMaxQueueLength) baseNode.SetMaxParallelism(Params.FlowGraphMaxParallelism) @@ -181,11 +245,26 @@ func newDDNode(clearSignal chan<- UniqueID, collID UniqueID, vchanInfo *datapb.V zap.Int("No. Segment", len(vchanInfo.GetFlushedSegments())), ) + deltaStream, err := msFactory.NewMsgStream(ctx) + if err != nil { + return nil + } + deltaChannelName, err := rootcoord.ConvertChannelName(vchanInfo.ChannelName, Params.DmlChannelName, Params.DeltaChannelName) + if err != nil { + log.Error(err.Error()) + return nil + } + deltaStream.AsProducer([]string{deltaChannelName}) + log.Debug("datanode AsProducer", zap.String("DeltaChannelName", Params.SegmentStatisticsChannelName)) + var deltaMsgStream msgstream.MsgStream = deltaStream + deltaMsgStream.Start() + dd := &ddNode{ BaseNode: baseNode, clearSignal: clearSignal, collectionID: collID, flushedSegments: fs, + deltaMsgStream: deltaMsgStream, } for _, us := range vchanInfo.GetUnflushedSegments() { diff --git a/internal/datanode/flow_graph_dd_node_test.go b/internal/datanode/flow_graph_dd_node_test.go index 4a6219ed57..6399b25f45 100644 --- a/internal/datanode/flow_graph_dd_node_test.go +++ b/internal/datanode/flow_graph_dd_node_test.go @@ -17,6 +17,7 @@ package datanode import ( + "context" "testing" "github.com/stretchr/testify/assert" @@ -28,6 +29,10 @@ import ( "github.com/milvus-io/milvus/internal/util/flowgraph" ) +type mockFactory struct { + msgstream.Factory +} + func TestFlowGraph_DDNode_newDDNode(te *testing.T) { tests := []struct { inCollID UniqueID @@ -65,12 +70,15 @@ func TestFlowGraph_DDNode_newDDNode(te *testing.T) { } ddNode := newDDNode( + context.Background(), make(chan UniqueID), test.inCollID, &datapb.VchannelInfo{ FlushedSegments: fi, UnflushedSegments: []*datapb.SegmentInfo{di}, + ChannelName: "by-dev-rootcoord-dml-test", }, + msgstream.NewPmsFactory(), ) flushedSegIDs := make([]int64, 0) @@ -131,9 +139,13 @@ func TestFlowGraph_DDNode_Operate(to *testing.T) { for _, test := range tests { te.Run(test.description, func(t *testing.T) { + factory := msgstream.NewPmsFactory() + deltaStream, err := factory.NewMsgStream(context.Background()) + assert.Nil(t, err) ddn := ddNode{ - clearSignal: test.ddnClearSignal, - collectionID: test.ddnCollID, + clearSignal: test.ddnClearSignal, + collectionID: test.ddnCollID, + deltaMsgStream: deltaStream, } var dropCollMsg msgstream.TsMsg = &msgstream.DropCollectionMsg{ @@ -186,10 +198,14 @@ func TestFlowGraph_DDNode_Operate(to *testing.T) { for _, test := range tests { te.Run(test.description, func(t *testing.T) { fs := &datapb.SegmentInfo{ID: test.ddnFlushedSegment} + factory := msgstream.NewPmsFactory() + deltaStream, err := factory.NewMsgStream(context.Background()) + assert.Nil(t, err) // Prepare ddNode states ddn := ddNode{ flushedSegments: []*datapb.SegmentInfo{fs}, collectionID: test.ddnCollID, + deltaMsgStream: deltaStream, } FilterThreshold = test.threshold @@ -228,9 +244,13 @@ func TestFlowGraph_DDNode_Operate(to *testing.T) { for _, test := range tests { te.Run(test.description, func(t *testing.T) { + factory := msgstream.NewPmsFactory() + deltaStream, err := factory.NewMsgStream(context.Background()) + assert.Nil(t, err) // Prepare ddNode states ddn := ddNode{ - collectionID: test.ddnCollID, + collectionID: test.ddnCollID, + deltaMsgStream: deltaStream, } // Prepare delete messages @@ -286,9 +306,13 @@ func TestFlowGraph_DDNode_filterMessages(te *testing.T) { s := &datapb.SegmentInfo{ID: id} fs = append(fs, s) } + factory := msgstream.NewPmsFactory() + deltaStream, err := factory.NewMsgStream(context.Background()) + assert.Nil(t, err) // Prepare ddNode states ddn := ddNode{ flushedSegments: fs, + deltaMsgStream: deltaStream, } for k, v := range test.ddnSegID2Ts { @@ -346,7 +370,10 @@ func TestFlowGraph_DDNode_isFlushed(te *testing.T) { s := &datapb.SegmentInfo{ID: id} fs = append(fs, s) } - ddn := &ddNode{flushedSegments: fs} + factory := msgstream.NewPmsFactory() + deltaStream, err := factory.NewMsgStream(context.Background()) + assert.Nil(t, err) + ddn := &ddNode{flushedSegments: fs, deltaMsgStream: deltaStream} assert.Equal(t, test.expectedOut, ddn.isFlushed(test.inSeg)) }) } diff --git a/internal/datanode/param_table.go b/internal/datanode/param_table.go index 0b575a753b..90a6e3feb3 100644 --- a/internal/datanode/param_table.go +++ b/internal/datanode/param_table.go @@ -46,6 +46,10 @@ type ParamTable struct { DeleteBinlogRootPath string Alias string // Different datanode in one machine + // Channel Name + DmlChannelName string + DeltaChannelName string + // Pulsar address PulsarAddress string @@ -130,6 +134,9 @@ func (p *ParamTable) Init() { p.initMinioUseSSL() p.initMinioBucketName() + p.initDmlChannelName() + p.initDeltaChannelName() + p.initRoleName() } @@ -289,3 +296,21 @@ func (p *ParamTable) initMinioBucketName() { func (p *ParamTable) initRoleName() { p.RoleName = "datanode" } + +func (p *ParamTable) initDmlChannelName() { + config, err := p.Load("msgChannel.chanNamePrefix.rootCoordDml") + if err != nil { + panic(err) + } + s := []string{p.ClusterChannelPrefix, config} + p.DmlChannelName = strings.Join(s, "-") +} + +func (p *ParamTable) initDeltaChannelName() { + config, err := p.Load("msgChannel.chanNamePrefix.rootCoordDelta") + if err != nil { + config = "rootcoord-delta" + } + s := []string{p.ClusterChannelPrefix, config} + p.DeltaChannelName = strings.Join(s, "-") +}