From 43fd8d19c22f12fcf2814f6a01ca6c7849e0c071 Mon Sep 17 00:00:00 2001 From: "yihao.dai" Date: Sat, 6 Jul 2024 08:50:11 +0800 Subject: [PATCH] enhance: Check segment existence when FlushSegments and add some key logs (#34438) Check if the segment exists during FlushSegments and add some key logs in write path. issue: https://github.com/milvus-io/milvus/issues/34255 --------- Signed-off-by: bigsheeper --- internal/datanode/pipeline/flow_graph_dd_node.go | 4 +++- internal/datanode/pipeline/flow_graph_dd_node_test.go | 2 +- internal/datanode/syncmgr/task.go | 1 + internal/datanode/writebuffer/write_buffer.go | 8 ++++++++ internal/datanode/writebuffer/write_buffer_test.go | 1 + pkg/mq/msgdispatcher/dispatcher.go | 4 ++-- pkg/mq/msgdispatcher/manager.go | 3 ++- 7 files changed, 18 insertions(+), 5 deletions(-) diff --git a/internal/datanode/pipeline/flow_graph_dd_node.go b/internal/datanode/pipeline/flow_graph_dd_node.go index f5264161d2..6dc596d4ad 100644 --- a/internal/datanode/pipeline/flow_graph_dd_node.go +++ b/internal/datanode/pipeline/flow_graph_dd_node.go @@ -198,7 +198,9 @@ func (ddn *ddNode) Operate(in []Msg) []Msg { log.Debug("DDNode receive insert messages", zap.Int64("segmentID", imsg.GetSegmentID()), zap.String("channel", ddn.vChannelName), - zap.Int("numRows", len(imsg.GetRowIDs()))) + zap.Int("numRows", len(imsg.GetRowIDs())), + zap.Uint64("startPosTs", msMsg.StartPositions()[0].GetTimestamp()), + zap.Uint64("endPosTs", msMsg.EndPositions()[0].GetTimestamp())) fgMsg.InsertMessages = append(fgMsg.InsertMessages, imsg) case commonpb.MsgType_Delete: diff --git a/internal/datanode/pipeline/flow_graph_dd_node_test.go b/internal/datanode/pipeline/flow_graph_dd_node_test.go index ee05b72d80..794c6f5cab 100644 --- a/internal/datanode/pipeline/flow_graph_dd_node_test.go +++ b/internal/datanode/pipeline/flow_graph_dd_node_test.go @@ -229,7 +229,7 @@ func TestFlowGraph_DDNode_Operate(t *testing.T) { } tsMessages := []msgstream.TsMsg{getInsertMsg(100, 10000), getInsertMsg(200, 20000)} - var msgStreamMsg Msg = flowgraph.GenerateMsgStreamMsg(tsMessages, 0, 0, nil, nil) + var msgStreamMsg Msg = flowgraph.GenerateMsgStreamMsg(tsMessages, 0, 0, []*msgpb.MsgPosition{{Timestamp: 20000}}, []*msgpb.MsgPosition{{Timestamp: 20000}}) rt := ddn.Operate([]Msg{msgStreamMsg}) assert.Equal(t, 1, len(rt[0].(*FlowGraphMsg).InsertMessages)) diff --git a/internal/datanode/syncmgr/task.go b/internal/datanode/syncmgr/task.go index 26bfc81089..b6c07a781b 100644 --- a/internal/datanode/syncmgr/task.go +++ b/internal/datanode/syncmgr/task.go @@ -179,6 +179,7 @@ func (t *SyncTask) Run(ctx context.Context) (err error) { if t.isDrop { t.metacache.RemoveSegments(metacache.WithSegmentIDs(t.segment.SegmentID())) + log.Info("segment removed", zap.Int64("segmentID", t.segment.SegmentID()), zap.String("channel", t.channelName)) } log.Info("task done", zap.Float64("flushedSize", totalSize)) diff --git a/internal/datanode/writebuffer/write_buffer.go b/internal/datanode/writebuffer/write_buffer.go index 900bd54d0d..f5ecf9d85a 100644 --- a/internal/datanode/writebuffer/write_buffer.go +++ b/internal/datanode/writebuffer/write_buffer.go @@ -311,6 +311,13 @@ func (wb *writeBufferBase) triggerSync() (segmentIDs []int64) { } func (wb *writeBufferBase) sealSegments(_ context.Context, segmentIDs []int64) error { + for _, segmentID := range segmentIDs { + _, ok := wb.metaCache.GetSegmentByID(segmentID) + if !ok { + log.Warn("cannot find segment when sealSegments", zap.Int64("segmentID", segmentID), zap.String("channel", wb.channelName)) + return merr.WrapErrSegmentNotFound(segmentID) + } + } // mark segment flushing if segment was growing wb.metaCache.UpdateSegments(metacache.UpdateState(commonpb.SegmentState_Sealed), metacache.WithSegmentIDs(segmentIDs...), @@ -542,6 +549,7 @@ func (wb *writeBufferBase) bufferInsert(inData *inData, startPos, endPos *msgpb. }, func(_ *datapb.SegmentInfo) *metacache.BloomFilterSet { return metacache.NewBloomFilterSetWithBatchSize(wb.getEstBatchSize()) }, metacache.SetStartPosRecorded(false)) + log.Info("add growing segment", zap.Int64("segmentID", inData.segmentID), zap.String("channel", wb.channelName)) } segBuf := wb.getOrCreateBuffer(inData.segmentID) diff --git a/internal/datanode/writebuffer/write_buffer_test.go b/internal/datanode/writebuffer/write_buffer_test.go index c507d70162..27fbf90407 100644 --- a/internal/datanode/writebuffer/write_buffer_test.go +++ b/internal/datanode/writebuffer/write_buffer_test.go @@ -112,6 +112,7 @@ func (s *WriteBufferSuite) TestFlushSegments() { segmentID := int64(1001) s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything, mock.Anything).Return() + s.metacache.EXPECT().GetSegmentByID(mock.Anything, mock.Anything, mock.Anything).Return(nil, true) wb, err := NewWriteBuffer(s.channelName, s.metacache, s.storageCache, s.syncMgr, WithDeletePolicy(DeletePolicyBFPkOracle)) s.NoError(err) diff --git a/pkg/mq/msgdispatcher/dispatcher.go b/pkg/mq/msgdispatcher/dispatcher.go index 34a84b04de..690301ecf6 100644 --- a/pkg/mq/msgdispatcher/dispatcher.go +++ b/pkg/mq/msgdispatcher/dispatcher.go @@ -111,8 +111,8 @@ func NewDispatcher(ctx context.Context, return nil, err } posTime := tsoutil.PhysicalTime(position.GetTimestamp()) - log.Info("seek successfully", zap.Time("posTime", posTime), - zap.Duration("tsLag", time.Since(posTime))) + log.Info("seek successfully", zap.Uint64("posTs", position.GetTimestamp()), + zap.Time("posTime", posTime), zap.Duration("tsLag", time.Since(posTime))) } else { err := stream.AsConsumer(ctx, []string{pchannel}, subName, subPos) if err != nil { diff --git a/pkg/mq/msgdispatcher/manager.go b/pkg/mq/msgdispatcher/manager.go index 195b588dce..902d5ee5d2 100644 --- a/pkg/mq/msgdispatcher/manager.go +++ b/pkg/mq/msgdispatcher/manager.go @@ -205,6 +205,7 @@ func (c *dispatcherManager) tryMerge() { delete(candidates, vchannel) } } + mergeTs := c.mainDispatcher.CurTs() for vchannel := range candidates { t, err := c.soloDispatchers[vchannel].GetTarget(vchannel) if err == nil { @@ -215,7 +216,7 @@ func (c *dispatcherManager) tryMerge() { c.deleteMetric(vchannel) } c.mainDispatcher.Handle(resume) - log.Info("merge done", zap.Any("vchannel", candidates)) + log.Info("merge done", zap.Any("vchannel", candidates), zap.Uint64("mergeTs", mergeTs)) } func (c *dispatcherManager) split(t *target) {