From 2e6ddd7f2a371a4d3e2b8e330233db7a59fcb293 Mon Sep 17 00:00:00 2001
From: congqixia
Date: Sat, 22 Oct 2022 12:09:28 +0800
Subject: [PATCH] Add datanode&channel match check for Flush (#19985)

Signed-off-by: Congqi Xia
---
 internal/datacoord/cluster.go                 | 84 +++++++------
 internal/datacoord/cluster_test.go            | 16 +++-
 internal/datacoord/server.go                  | 10 ++-
 .../datanode/flow_graph_insert_buffer_node.go |  1 +
 4 files changed, 51 insertions(+), 60 deletions(-)

diff --git a/internal/datacoord/cluster.go b/internal/datacoord/cluster.go
index a336e273e2..9315c34547 100644
--- a/internal/datacoord/cluster.go
+++ b/internal/datacoord/cluster.go
@@ -18,12 +18,14 @@ package datacoord
 
 import (
 	"context"
+	"fmt"
 
 	"github.com/milvus-io/milvus-proto/go-api/commonpb"
 	"github.com/milvus-io/milvus/internal/log"
 	"github.com/milvus-io/milvus/internal/metrics"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/internal/util/commonpbutil"
+	"github.com/samber/lo"
 	"go.uber.org/zap"
 )
 
@@ -80,64 +82,36 @@ func (c *Cluster) Watch(ch string, collectionID UniqueID) error {
 	return c.channelManager.Watch(&channel{Name: ch, CollectionID: collectionID})
 }
 
-// Flush sends flush requests to corresponding dataNodes according to channels where segments are assigned to.
-func (c *Cluster) Flush(ctx context.Context, segments []*datapb.SegmentInfo, markSegments []*datapb.SegmentInfo) {
-	channels := c.channelManager.GetChannels()
-	nodeSegments := make(map[int64][]int64)
-	nodeMarks := make(map[int64][]int64)
-	channelNodes := make(map[string]int64)
-	targetNodes := make(map[int64]struct{})
-	// channel -> node
-	for _, c := range channels {
-		for _, ch := range c.Channels {
-			channelNodes[ch.Name] = c.NodeID
-		}
-	}
-	// collectionID shall be the same in single Flush call
-	var collectionID int64
-	// find node on which segment exists
-	for _, segment := range segments {
-		collectionID = segment.CollectionID
-		nodeID, ok := channelNodes[segment.GetInsertChannel()]
-		if !ok {
-			log.Warn("channel is not allocated to any node", zap.String("channel", segment.GetInsertChannel()))
-			continue
-		}
-		nodeSegments[nodeID] = append(nodeSegments[nodeID], segment.GetID())
-		targetNodes[nodeID] = struct{}{}
-	}
-	for _, segment := range markSegments {
-		collectionID = segment.CollectionID
-		nodeID, ok := channelNodes[segment.GetInsertChannel()]
-		if !ok {
-			log.Warn("channel is not allocated to any node", zap.String("channel", segment.GetInsertChannel()))
-			continue
-		}
-		nodeMarks[nodeID] = append(nodeMarks[nodeID], segment.GetID())
-		targetNodes[nodeID] = struct{}{}
+// Flush sends flush requests to the specified dataNode,
+// after checking that the node actually matches the given channel.
+func (c *Cluster) Flush(ctx context.Context, nodeID int64, channel string,
+	segments []*datapb.SegmentInfo, markSegments []*datapb.SegmentInfo) error {
+	if !c.channelManager.Match(nodeID, channel) {
+		log.Warn("node is not matched with channel",
+			zap.String("channel", channel),
+			zap.Int64("nodeID", nodeID),
+		)
+		return fmt.Errorf("channel %s is not watched on node %d", channel, nodeID)
 	}
-	for nodeID := range targetNodes {
-		segments := nodeSegments[nodeID]
-		marks := nodeMarks[nodeID]
-		if len(segments)+len(marks) == 0 { // no segment for this node
-			continue
-		}
-		req := &datapb.FlushSegmentsRequest{
-			Base: commonpbutil.NewMsgBase(
-				commonpbutil.WithMsgType(commonpb.MsgType_Flush),
-				commonpbutil.WithSourceID(Params.DataCoordCfg.GetNodeID()),
-			),
-			CollectionID:   collectionID,
-			SegmentIDs:     segments,
-			MarkSegmentIDs: marks,
-		}
-		log.Info("calling dataNode to flush",
-			zap.Int64("dataNode ID", nodeID),
-			zap.Int64s("segments", segments),
-			zap.Int64s("marks", marks))
-		c.sessionManager.Flush(ctx, nodeID, req)
+	ch := c.channelManager.getChannelByNodeAndName(nodeID, channel)
+
+	getSegmentID := func(segment *datapb.SegmentInfo, _ int) int64 {
+		return segment.GetID()
 	}
+
+	req := &datapb.FlushSegmentsRequest{
+		Base: commonpbutil.NewMsgBase(
+			commonpbutil.WithMsgType(commonpb.MsgType_Flush),
+			commonpbutil.WithSourceID(Params.DataCoordCfg.GetNodeID()),
+		),
+		CollectionID:   ch.CollectionID,
+		SegmentIDs:     lo.Map(segments, getSegmentID),
+		MarkSegmentIDs: lo.Map(markSegments, getSegmentID),
+	}
+
+	c.sessionManager.Flush(ctx, nodeID, req)
+	return nil
 }
 
 // Import sends import requests to DataNodes whose ID==nodeID.
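The rewritten Cluster.Flush above trades the old per-node bookkeeping for a single match guard plus lo.Map from the newly imported github.com/samber/lo. As a minimal, self-contained sketch of that mapping call, with a hypothetical segmentInfo struct standing in for datapb.SegmentInfo:

package main

import (
	"fmt"

	"github.com/samber/lo"
)

// segmentInfo is a hypothetical stand-in for datapb.SegmentInfo,
// reduced to the one field the new Flush actually reads.
type segmentInfo struct {
	ID int64
}

func (s *segmentInfo) GetID() int64 { return s.ID }

func main() {
	segments := []*segmentInfo{{ID: 1}, {ID: 2}, {ID: 3}}

	// Same shape as the getSegmentID closure in the patch: lo.Map hands
	// the callback each element plus its index, which the extractor ignores.
	getSegmentID := func(segment *segmentInfo, _ int) int64 {
		return segment.GetID()
	}

	fmt.Println(lo.Map(segments, getSegmentID)) // [1 2 3]
}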
diff --git a/internal/datacoord/cluster_test.go b/internal/datacoord/cluster_test.go
index 87ed0334bd..57e4e5c4c7 100644
--- a/internal/datacoord/cluster_test.go
+++ b/internal/datacoord/cluster_test.go
@@ -535,14 +535,24 @@ func TestCluster_Flush(t *testing.T) {
 
 	// flush empty should impact nothing
 	assert.NotPanics(t, func() {
-		cluster.Flush(context.Background(), []*datapb.SegmentInfo{}, []*datapb.SegmentInfo{})
+		err := cluster.Flush(context.Background(), 1, "chan-1", []*datapb.SegmentInfo{}, []*datapb.SegmentInfo{})
+		assert.NoError(t, err)
 	})
 
 	// flush not watched channel
 	assert.NotPanics(t, func() {
-		cluster.Flush(context.Background(), []*datapb.SegmentInfo{{ID: 1, InsertChannel: "chan-2"}},
-			[]*datapb.SegmentInfo{{ID: 2, InsertChannel: "chan-3"}})
+		err := cluster.Flush(context.Background(), 1, "chan-2", []*datapb.SegmentInfo{{ID: 1}},
+			[]*datapb.SegmentInfo{{ID: 2}})
+		assert.Error(t, err)
 	})
+
+	// flush from wrong datanode
+	assert.NotPanics(t, func() {
+		err := cluster.Flush(context.Background(), 2, "chan-1", []*datapb.SegmentInfo{{ID: 1}},
+			[]*datapb.SegmentInfo{{ID: 3}})
+		assert.Error(t, err)
+	})
+
+	// TODO: add a method to verify datanode has flush request after client injection is available
 }
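The three cases above pin down the new contract: Flush now succeeds only when the given nodeID is the registered watcher of the given channel, and returns an error otherwise instead of silently skipping segments. The channelManager itself is untouched by this patch, so the sketch below uses an assumed map-backed registry purely to illustrate the Match semantics the tests rely on:

package main

import "fmt"

// toyChannelManager is a deliberately simplified, hypothetical stand-in
// for datacoord's channel manager: a map from channel name to the node
// that watches it.
type toyChannelManager struct {
	watchers map[string]int64
}

// Match reports whether nodeID is the registered watcher of channel,
// mirroring the guard the patched Cluster.Flush runs before dispatching.
func (m *toyChannelManager) Match(nodeID int64, channel string) bool {
	owner, ok := m.watchers[channel]
	return ok && owner == nodeID
}

func flush(m *toyChannelManager, nodeID int64, channel string) error {
	if !m.Match(nodeID, channel) {
		return fmt.Errorf("channel %s is not watched on node %d", channel, nodeID)
	}
	// ... build and send the FlushSegmentsRequest here ...
	return nil
}

func main() {
	m := &toyChannelManager{watchers: map[string]int64{"chan-1": 1}}
	fmt.Println(flush(m, 1, "chan-1")) // <nil>: matched, flush proceeds
	fmt.Println(flush(m, 1, "chan-2")) // error: channel not watched
	fmt.Println(flush(m, 2, "chan-1")) // error: wrong datanode
}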
diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go
index 9ecea3fe3d..1b675ea366 100644
--- a/internal/datacoord/server.go
+++ b/internal/datacoord/server.go
@@ -577,7 +577,8 @@ func (s *Server) handleTimetickMessage(ctx context.Context, ttMsg *msgstream.Dat
 	log.Info("start flushing segments",
 		zap.Int64s("segment IDs", flushableIDs),
 		zap.Int("# of stale/mark segments", len(staleSegments)))
-
+	// update segment last update triggered time
+	// it's ok to fail flushing, since next timetick after duration will re-trigger
 	s.setLastFlushTime(flushableSegments)
 	s.setLastFlushTime(staleSegments)
 
@@ -588,7 +589,12 @@ func (s *Server) handleTimetickMessage(ctx context.Context, ttMsg *msgstream.Dat
 	for _, info := range staleSegments {
 		minfo = append(minfo, info.SegmentInfo)
 	}
-	s.cluster.Flush(s.ctx, finfo, minfo)
+	err = s.cluster.Flush(s.ctx, ttMsg.GetBase().GetSourceID(), ch, finfo, minfo)
+	if err != nil {
+		log.Warn("failed to handle flush", zap.Error(err))
+		return err
+	}
+
 	return nil
 }
 
diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go
index 90f54242f9..77d93ece7c 100644
--- a/internal/datanode/flow_graph_insert_buffer_node.go
+++ b/internal/datanode/flow_graph_insert_buffer_node.go
@@ -610,6 +610,7 @@ func newInsertBufferNode(ctx context.Context, collID UniqueID, flushCh <-chan fl
 			commonpbutil.WithMsgType(commonpb.MsgType_DataNodeTt),
 			commonpbutil.WithMsgID(0),
 			commonpbutil.WithTimeStamp(ts),
+			commonpbutil.WithSourceID(Params.DataNodeCfg.GetNodeID()),
 		),
 		ChannelName: config.vChannelName,
 		Timestamp:   ts,
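The one-line DataNode change is what makes the DataCoord-side check possible: timetick messages now carry the sender's node ID, which handleTimetickMessage forwards to Cluster.Flush via ttMsg.GetBase().GetSourceID(). commonpbutil builds the message base with Go's functional-options pattern; the stand-alone imitation below uses hypothetical lower-case names, not the actual Milvus API:

package main

import "fmt"

// msgBase mimics the few commonpb.MsgBase fields this patch touches;
// the real type and the commonpbutil option names are Milvus-internal.
type msgBase struct {
	MsgType   string
	Timestamp uint64
	SourceID  int64
}

type option func(*msgBase)

func withMsgType(t string) option    { return func(b *msgBase) { b.MsgType = t } }
func withTimeStamp(ts uint64) option { return func(b *msgBase) { b.Timestamp = ts } }
func withSourceID(id int64) option   { return func(b *msgBase) { b.SourceID = id } }

// newMsgBase applies the options in order, the same functional-options
// shape commonpbutil.NewMsgBase follows.
func newMsgBase(opts ...option) *msgBase {
	base := &msgBase{}
	for _, opt := range opts {
		opt(base)
	}
	return base
}

func main() {
	// Before the patch, DataNode timetick messages omitted SourceID, so
	// DataCoord had no node ID to validate against the channel.
	base := newMsgBase(
		withMsgType("DataNodeTt"),
		withTimeStamp(42),
		withSourceID(7), // the one-line fix: stamp the sender's node ID
	)
	fmt.Printf("%+v\n", base)
}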