mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-01 03:18:29 +08:00
Add more specific logs (#11724)
Signed-off-by: yangxuan <xuan.yang@zilliz.com>
This commit is contained in:
parent
4d58ff2df7
commit
f3852c1db0
@ -168,12 +168,17 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro
|
|||||||
Position: pack.pos,
|
Position: pack.pos,
|
||||||
})
|
})
|
||||||
|
|
||||||
|
startPos := dsService.replica.listNewSegmentsStartPositions()
|
||||||
|
|
||||||
log.Debug("SaveBinlogPath",
|
log.Debug("SaveBinlogPath",
|
||||||
zap.Int64("SegmentID", pack.segmentID),
|
zap.Int64("SegmentID", pack.segmentID),
|
||||||
zap.Int64("CollectionID", dsService.collectionID),
|
zap.Int64("CollectionID", dsService.collectionID),
|
||||||
|
zap.Bool("IsFlushed", pack.flushed),
|
||||||
|
zap.Bool("IsDropped", pack.dropped),
|
||||||
zap.Int("Length of Field2BinlogPaths", len(fieldInsert)),
|
zap.Int("Length of Field2BinlogPaths", len(fieldInsert)),
|
||||||
zap.Int("Length of Field2Stats", len(fieldStats)),
|
zap.Int("Length of Field2Stats", len(fieldStats)),
|
||||||
zap.Int("Length of Field2Deltalogs", len(deltaInfos)),
|
zap.Int("Length of Field2Deltalogs", len(deltaInfos)),
|
||||||
|
zap.Any("Listed start positions", startPos),
|
||||||
)
|
)
|
||||||
|
|
||||||
req := &datapb.SaveBinlogPathsRequest{
|
req := &datapb.SaveBinlogPathsRequest{
|
||||||
@ -189,14 +194,15 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro
|
|||||||
Field2StatslogPaths: fieldStats,
|
Field2StatslogPaths: fieldStats,
|
||||||
Deltalogs: deltaInfos,
|
Deltalogs: deltaInfos,
|
||||||
|
|
||||||
CheckPoints: checkPoints,
|
CheckPoints: checkPoints,
|
||||||
|
StartPositions: startPos,
|
||||||
|
|
||||||
StartPositions: dsService.replica.listNewSegmentsStartPositions(),
|
Flushed: pack.flushed,
|
||||||
Flushed: pack.flushed,
|
Dropped: pack.dropped,
|
||||||
Dropped: pack.dropped,
|
|
||||||
}
|
}
|
||||||
rsp, err := dsService.dataCoord.SaveBinlogPaths(context.Background(), req)
|
rsp, err := dsService.dataCoord.SaveBinlogPaths(context.Background(), req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
log.Warn(err.Error())
|
||||||
return fmt.Errorf(err.Error())
|
return fmt.Errorf(err.Error())
|
||||||
}
|
}
|
||||||
if rsp.ErrorCode != commonpb.ErrorCode_Success {
|
if rsp.ErrorCode != commonpb.ErrorCode_Success {
|
||||||
|
@ -58,6 +58,7 @@ type ddNode struct {
|
|||||||
|
|
||||||
segID2SegInfo sync.Map // segment ID to *SegmentInfo
|
segID2SegInfo sync.Map // segment ID to *SegmentInfo
|
||||||
flushedSegments []*datapb.SegmentInfo
|
flushedSegments []*datapb.SegmentInfo
|
||||||
|
vchannelName string
|
||||||
|
|
||||||
deltaMsgStream msgstream.MsgStream
|
deltaMsgStream msgstream.MsgStream
|
||||||
dropMode atomic.Value
|
dropMode atomic.Value
|
||||||
@ -91,7 +92,9 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if load := ddn.dropMode.Load(); load != nil && load.(bool) {
|
if load := ddn.dropMode.Load(); load != nil && load.(bool) {
|
||||||
log.Debug("ddNode in dropMode")
|
log.Debug("ddNode in dropMode",
|
||||||
|
zap.String("vchannel name", ddn.vchannelName),
|
||||||
|
zap.Int64("collection ID", ddn.collectionID))
|
||||||
return []Msg{}
|
return []Msg{}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -274,6 +277,7 @@ func newDDNode(ctx context.Context, collID UniqueID, vchanInfo *datapb.VchannelI
|
|||||||
BaseNode: baseNode,
|
BaseNode: baseNode,
|
||||||
collectionID: collID,
|
collectionID: collID,
|
||||||
flushedSegments: fs,
|
flushedSegments: fs,
|
||||||
|
vchannelName: vchanInfo.ChannelName,
|
||||||
deltaMsgStream: deltaMsgStream,
|
deltaMsgStream: deltaMsgStream,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -279,64 +279,62 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
|
|||||||
dropped: true,
|
dropped: true,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
goto flush // Jump over the auto-flush and manual flush procedure
|
} else {
|
||||||
}
|
segmentsToFlush = make([]UniqueID, 0, len(seg2Upload)+1) //auto flush number + possible manual flush
|
||||||
|
flushTaskList = make([]flushTask, 0, len(seg2Upload)+1)
|
||||||
|
|
||||||
segmentsToFlush = make([]UniqueID, 0, len(seg2Upload)+1) //auto flush number + possible manual flush
|
// Auto Flush
|
||||||
flushTaskList = make([]flushTask, 0, len(seg2Upload)+1)
|
for _, segToFlush := range seg2Upload {
|
||||||
|
// If full, auto flush
|
||||||
|
if bd, ok := ibNode.insertBuffer.Load(segToFlush); ok && bd.(*BufferData).effectiveCap() <= 0 {
|
||||||
|
log.Warn("Auto flush", zap.Int64("segment id", segToFlush))
|
||||||
|
ibuffer := bd.(*BufferData)
|
||||||
|
|
||||||
// Auto Flush
|
flushTaskList = append(flushTaskList, flushTask{
|
||||||
for _, segToFlush := range seg2Upload {
|
buffer: ibuffer,
|
||||||
// If full, auto flush
|
segmentID: segToFlush,
|
||||||
if bd, ok := ibNode.insertBuffer.Load(segToFlush); ok && bd.(*BufferData).effectiveCap() <= 0 {
|
flushed: false,
|
||||||
log.Warn("Auto flush", zap.Int64("segment id", segToFlush))
|
dropped: false,
|
||||||
ibuffer := bd.(*BufferData)
|
})
|
||||||
|
|
||||||
flushTaskList = append(flushTaskList, flushTask{
|
|
||||||
buffer: ibuffer,
|
|
||||||
segmentID: segToFlush,
|
|
||||||
flushed: false,
|
|
||||||
dropped: false,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Manual Flush
|
|
||||||
select {
|
|
||||||
case fmsg := <-ibNode.flushChan:
|
|
||||||
|
|
||||||
log.Debug(". Receiving flush message",
|
|
||||||
zap.Int64("segmentID", fmsg.segmentID),
|
|
||||||
zap.Int64("collectionID", fmsg.collectionID),
|
|
||||||
)
|
|
||||||
// merging auto&manual flush segment same segment id
|
|
||||||
dup := false
|
|
||||||
for i, task := range flushTaskList {
|
|
||||||
if task.segmentID == fmsg.segmentID {
|
|
||||||
flushTaskList[i].flushed = fmsg.flushed
|
|
||||||
dup = true
|
|
||||||
break
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// if merged, skip load buffer and create task
|
|
||||||
if !dup {
|
// Manual Flush
|
||||||
currentSegID := fmsg.segmentID
|
select {
|
||||||
bd, ok := ibNode.insertBuffer.Load(currentSegID)
|
case fmsg := <-ibNode.flushChan:
|
||||||
var buf *BufferData
|
|
||||||
if ok {
|
log.Debug(". Receiving flush message",
|
||||||
buf = bd.(*BufferData)
|
zap.Int64("segmentID", fmsg.segmentID),
|
||||||
|
zap.Int64("collectionID", fmsg.collectionID),
|
||||||
|
)
|
||||||
|
// merging auto&manual flush segment same segment id
|
||||||
|
dup := false
|
||||||
|
for i, task := range flushTaskList {
|
||||||
|
if task.segmentID == fmsg.segmentID {
|
||||||
|
flushTaskList[i].flushed = fmsg.flushed
|
||||||
|
dup = true
|
||||||
|
break
|
||||||
|
}
|
||||||
}
|
}
|
||||||
flushTaskList = append(flushTaskList, flushTask{
|
// if merged, skip load buffer and create task
|
||||||
buffer: buf,
|
if !dup {
|
||||||
segmentID: currentSegID,
|
currentSegID := fmsg.segmentID
|
||||||
flushed: fmsg.flushed,
|
bd, ok := ibNode.insertBuffer.Load(currentSegID)
|
||||||
dropped: false,
|
var buf *BufferData
|
||||||
})
|
if ok {
|
||||||
|
buf = bd.(*BufferData)
|
||||||
|
}
|
||||||
|
flushTaskList = append(flushTaskList, flushTask{
|
||||||
|
buffer: buf,
|
||||||
|
segmentID: currentSegID,
|
||||||
|
flushed: fmsg.flushed,
|
||||||
|
dropped: false,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
default:
|
||||||
}
|
}
|
||||||
default:
|
|
||||||
}
|
}
|
||||||
|
|
||||||
flush:
|
|
||||||
for _, task := range flushTaskList {
|
for _, task := range flushTaskList {
|
||||||
err := ibNode.flushManager.flushBufferData(task.buffer, task.segmentID, task.flushed, task.dropped, endPositions[0])
|
err := ibNode.flushManager.flushBufferData(task.buffer, task.segmentID, task.flushed, task.dropped, endPositions[0])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
Loading…
Reference in New Issue
Block a user