mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-03 12:29:36 +08:00
enhance: Check segment existence when FlushSegments and add some key logs (#34438)
Check if the segment exists during FlushSegments and add some key logs in write path. issue: https://github.com/milvus-io/milvus/issues/34255 --------- Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
parent
05df70973c
commit
43fd8d19c2
@ -198,7 +198,9 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
|
|||||||
log.Debug("DDNode receive insert messages",
|
log.Debug("DDNode receive insert messages",
|
||||||
zap.Int64("segmentID", imsg.GetSegmentID()),
|
zap.Int64("segmentID", imsg.GetSegmentID()),
|
||||||
zap.String("channel", ddn.vChannelName),
|
zap.String("channel", ddn.vChannelName),
|
||||||
zap.Int("numRows", len(imsg.GetRowIDs())))
|
zap.Int("numRows", len(imsg.GetRowIDs())),
|
||||||
|
zap.Uint64("startPosTs", msMsg.StartPositions()[0].GetTimestamp()),
|
||||||
|
zap.Uint64("endPosTs", msMsg.EndPositions()[0].GetTimestamp()))
|
||||||
fgMsg.InsertMessages = append(fgMsg.InsertMessages, imsg)
|
fgMsg.InsertMessages = append(fgMsg.InsertMessages, imsg)
|
||||||
|
|
||||||
case commonpb.MsgType_Delete:
|
case commonpb.MsgType_Delete:
|
||||||
|
@ -229,7 +229,7 @@ func TestFlowGraph_DDNode_Operate(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
tsMessages := []msgstream.TsMsg{getInsertMsg(100, 10000), getInsertMsg(200, 20000)}
|
tsMessages := []msgstream.TsMsg{getInsertMsg(100, 10000), getInsertMsg(200, 20000)}
|
||||||
var msgStreamMsg Msg = flowgraph.GenerateMsgStreamMsg(tsMessages, 0, 0, nil, nil)
|
var msgStreamMsg Msg = flowgraph.GenerateMsgStreamMsg(tsMessages, 0, 0, []*msgpb.MsgPosition{{Timestamp: 20000}}, []*msgpb.MsgPosition{{Timestamp: 20000}})
|
||||||
|
|
||||||
rt := ddn.Operate([]Msg{msgStreamMsg})
|
rt := ddn.Operate([]Msg{msgStreamMsg})
|
||||||
assert.Equal(t, 1, len(rt[0].(*FlowGraphMsg).InsertMessages))
|
assert.Equal(t, 1, len(rt[0].(*FlowGraphMsg).InsertMessages))
|
||||||
|
@ -179,6 +179,7 @@ func (t *SyncTask) Run(ctx context.Context) (err error) {
|
|||||||
|
|
||||||
if t.isDrop {
|
if t.isDrop {
|
||||||
t.metacache.RemoveSegments(metacache.WithSegmentIDs(t.segment.SegmentID()))
|
t.metacache.RemoveSegments(metacache.WithSegmentIDs(t.segment.SegmentID()))
|
||||||
|
log.Info("segment removed", zap.Int64("segmentID", t.segment.SegmentID()), zap.String("channel", t.channelName))
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Info("task done", zap.Float64("flushedSize", totalSize))
|
log.Info("task done", zap.Float64("flushedSize", totalSize))
|
||||||
|
@ -311,6 +311,13 @@ func (wb *writeBufferBase) triggerSync() (segmentIDs []int64) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (wb *writeBufferBase) sealSegments(_ context.Context, segmentIDs []int64) error {
|
func (wb *writeBufferBase) sealSegments(_ context.Context, segmentIDs []int64) error {
|
||||||
|
for _, segmentID := range segmentIDs {
|
||||||
|
_, ok := wb.metaCache.GetSegmentByID(segmentID)
|
||||||
|
if !ok {
|
||||||
|
log.Warn("cannot find segment when sealSegments", zap.Int64("segmentID", segmentID), zap.String("channel", wb.channelName))
|
||||||
|
return merr.WrapErrSegmentNotFound(segmentID)
|
||||||
|
}
|
||||||
|
}
|
||||||
// mark segment flushing if segment was growing
|
// mark segment flushing if segment was growing
|
||||||
wb.metaCache.UpdateSegments(metacache.UpdateState(commonpb.SegmentState_Sealed),
|
wb.metaCache.UpdateSegments(metacache.UpdateState(commonpb.SegmentState_Sealed),
|
||||||
metacache.WithSegmentIDs(segmentIDs...),
|
metacache.WithSegmentIDs(segmentIDs...),
|
||||||
@ -542,6 +549,7 @@ func (wb *writeBufferBase) bufferInsert(inData *inData, startPos, endPos *msgpb.
|
|||||||
}, func(_ *datapb.SegmentInfo) *metacache.BloomFilterSet {
|
}, func(_ *datapb.SegmentInfo) *metacache.BloomFilterSet {
|
||||||
return metacache.NewBloomFilterSetWithBatchSize(wb.getEstBatchSize())
|
return metacache.NewBloomFilterSetWithBatchSize(wb.getEstBatchSize())
|
||||||
}, metacache.SetStartPosRecorded(false))
|
}, metacache.SetStartPosRecorded(false))
|
||||||
|
log.Info("add growing segment", zap.Int64("segmentID", inData.segmentID), zap.String("channel", wb.channelName))
|
||||||
}
|
}
|
||||||
|
|
||||||
segBuf := wb.getOrCreateBuffer(inData.segmentID)
|
segBuf := wb.getOrCreateBuffer(inData.segmentID)
|
||||||
|
@ -112,6 +112,7 @@ func (s *WriteBufferSuite) TestFlushSegments() {
|
|||||||
segmentID := int64(1001)
|
segmentID := int64(1001)
|
||||||
|
|
||||||
s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything, mock.Anything).Return()
|
s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything, mock.Anything).Return()
|
||||||
|
s.metacache.EXPECT().GetSegmentByID(mock.Anything, mock.Anything, mock.Anything).Return(nil, true)
|
||||||
|
|
||||||
wb, err := NewWriteBuffer(s.channelName, s.metacache, s.storageCache, s.syncMgr, WithDeletePolicy(DeletePolicyBFPkOracle))
|
wb, err := NewWriteBuffer(s.channelName, s.metacache, s.storageCache, s.syncMgr, WithDeletePolicy(DeletePolicyBFPkOracle))
|
||||||
s.NoError(err)
|
s.NoError(err)
|
||||||
|
@ -111,8 +111,8 @@ func NewDispatcher(ctx context.Context,
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
posTime := tsoutil.PhysicalTime(position.GetTimestamp())
|
posTime := tsoutil.PhysicalTime(position.GetTimestamp())
|
||||||
log.Info("seek successfully", zap.Time("posTime", posTime),
|
log.Info("seek successfully", zap.Uint64("posTs", position.GetTimestamp()),
|
||||||
zap.Duration("tsLag", time.Since(posTime)))
|
zap.Time("posTime", posTime), zap.Duration("tsLag", time.Since(posTime)))
|
||||||
} else {
|
} else {
|
||||||
err := stream.AsConsumer(ctx, []string{pchannel}, subName, subPos)
|
err := stream.AsConsumer(ctx, []string{pchannel}, subName, subPos)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -205,6 +205,7 @@ func (c *dispatcherManager) tryMerge() {
|
|||||||
delete(candidates, vchannel)
|
delete(candidates, vchannel)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
mergeTs := c.mainDispatcher.CurTs()
|
||||||
for vchannel := range candidates {
|
for vchannel := range candidates {
|
||||||
t, err := c.soloDispatchers[vchannel].GetTarget(vchannel)
|
t, err := c.soloDispatchers[vchannel].GetTarget(vchannel)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
@ -215,7 +216,7 @@ func (c *dispatcherManager) tryMerge() {
|
|||||||
c.deleteMetric(vchannel)
|
c.deleteMetric(vchannel)
|
||||||
}
|
}
|
||||||
c.mainDispatcher.Handle(resume)
|
c.mainDispatcher.Handle(resume)
|
||||||
log.Info("merge done", zap.Any("vchannel", candidates))
|
log.Info("merge done", zap.Any("vchannel", candidates), zap.Uint64("mergeTs", mergeTs))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *dispatcherManager) split(t *target) {
|
func (c *dispatcherManager) split(t *target) {
|
||||||
|
Loading…
Reference in New Issue
Block a user