Fix metric recording auto flush segments twice (#19967)

Signed-off-by: yangxuan <xuan.yang@zilliz.com>

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
This commit is contained in:
XuanYang-cn 2022-10-24 10:51:30 +08:00 committed by GitHub
parent 4e5d1c2e5f
commit eaae3aa49e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -238,29 +238,26 @@ func (ibNode *insertBufferNode) DisplayStatistics(seg2Upload []UniqueID) {
}
}
func (ibNode *insertBufferNode) Flush(fgMsg *flowGraphMsg, seg2Upload []UniqueID, endPosition *internalpb.MsgPosition) []UniqueID {
type flushTask struct {
buffer *BufferData
segmentID UniqueID
flushed bool
dropped bool
auto bool
}
type flushTask struct {
buffer *BufferData
segmentID UniqueID
flushed bool
dropped bool
auto bool
}
var (
flushTaskList []flushTask
segmentsToFlush []UniqueID
)
func (ibNode *insertBufferNode) FillInSyncTasks(fgMsg *flowGraphMsg, seg2Upload []UniqueID) []flushTask {
var flushTasks []flushTask
if fgMsg.dropCollection {
segmentsToFlush := ibNode.channel.listAllSegmentIDs()
segmentIDs := ibNode.channel.listAllSegmentIDs()
log.Info("Receive drop collection request and flushing all segments",
zap.Any("segments", segmentsToFlush),
zap.Any("segments", segmentIDs),
zap.String("channel", ibNode.channelName),
)
flushTaskList = make([]flushTask, 0, len(segmentsToFlush))
flushTasks = make([]flushTask, 0, len(segmentIDs))
for _, seg2Flush := range segmentsToFlush {
for _, seg2Flush := range segmentIDs {
var buf *BufferData
bd, ok := ibNode.insertBuffer.Load(seg2Flush)
if !ok {
@ -268,99 +265,103 @@ func (ibNode *insertBufferNode) Flush(fgMsg *flowGraphMsg, seg2Upload []UniqueID
} else {
buf = bd.(*BufferData)
}
flushTaskList = append(flushTaskList, flushTask{
flushTasks = append(flushTasks, flushTask{
buffer: buf,
segmentID: seg2Flush,
flushed: false,
dropped: true,
})
}
} else {
segmentsToFlush = make([]UniqueID, 0, len(seg2Upload)+1) //auto flush number + possible manual flush
flushTaskList = make([]flushTask, 0, len(seg2Upload)+1)
return flushTasks
}
// Auto Syncing
for _, segToFlush := range seg2Upload {
if bd, ok := ibNode.insertBuffer.Load(segToFlush); ok && bd.(*BufferData).effectiveCap() <= 0 {
log.Info("(Auto Flush)",
zap.Int64("segment id", segToFlush),
zap.String("vchannel name", ibNode.channelName),
)
ibuffer := bd.(*BufferData)
flushTaskList = append(flushTaskList, flushTask{
buffer: ibuffer,
segmentID: segToFlush,
flushed: false,
dropped: false,
auto: true,
})
}
}
mergeFlushTask := func(segmentID UniqueID, setupTask func(task *flushTask)) {
// Merge auto & manual flush tasks with the same segment ID.
dup := false
for i, task := range flushTaskList {
if task.segmentID == segmentID {
log.Info("merging flush task, updating flushed flag",
zap.Int64("segment ID", segmentID))
setupTask(&flushTaskList[i])
dup = true
break
}
}
// Load buffer and create new flush task if there's no existing flush task for this segment.
if !dup {
bd, ok := ibNode.insertBuffer.Load(segmentID)
var buf *BufferData
if ok {
buf = bd.(*BufferData)
}
task := flushTask{
buffer: buf,
segmentID: segmentID,
dropped: false,
}
setupTask(&task)
flushTaskList = append(flushTaskList, task)
}
}
// Manual Syncing
select {
case fmsg := <-ibNode.flushChan:
log.Info("(Manual Flush) receiving flush message",
zap.Int64("segmentID", fmsg.segmentID),
zap.Int64("collectionID", fmsg.collectionID),
zap.Bool("flushed", fmsg.flushed),
zap.String("v-channel name", ibNode.channelName),
// Auto Syncing
for _, segToFlush := range seg2Upload {
if bd, ok := ibNode.insertBuffer.Load(segToFlush); ok && bd.(*BufferData).effectiveCap() <= 0 {
log.Info("(Auto Flush)",
zap.Int64("segment id", segToFlush),
zap.String("vchannel name", ibNode.channelName),
)
mergeFlushTask(fmsg.segmentID, func(task *flushTask) {
task.flushed = fmsg.flushed
ibuffer := bd.(*BufferData)
flushTasks = append(flushTasks, flushTask{
buffer: ibuffer,
segmentID: segToFlush,
flushed: false,
dropped: false,
auto: true,
})
default:
}
// process drop partition
for _, partitionDrop := range fgMsg.dropPartitions {
segmentIDs := ibNode.channel.listPartitionSegments(partitionDrop)
log.Info("(Drop Partition) process drop partition",
zap.Int64("collectionID", ibNode.channel.getCollectionID()),
zap.Int64("partitionID", partitionDrop),
zap.Int64s("segmentIDs", segmentIDs),
zap.String("v-channel name", ibNode.channelName),
)
for _, segID := range segmentIDs {
mergeFlushTask(segID, func(task *flushTask) {
task.flushed = true
task.dropped = true
})
}
}
}
for _, task := range flushTaskList {
mergeFlushTask := func(segmentID UniqueID, setupTask func(task *flushTask)) {
// Merge auto & manual flush tasks with the same segment ID.
dup := false
for i, task := range flushTasks {
if task.segmentID == segmentID {
log.Info("merging flush task, updating flushed flag",
zap.Int64("segment ID", segmentID))
setupTask(&flushTasks[i])
dup = true
break
}
}
// Load buffer and create new flush task if there's no existing flush task for this segment.
if !dup {
bd, ok := ibNode.insertBuffer.Load(segmentID)
var buf *BufferData
if ok {
buf = bd.(*BufferData)
}
task := flushTask{
buffer: buf,
segmentID: segmentID,
dropped: false,
}
setupTask(&task)
flushTasks = append(flushTasks, task)
}
}
// Manual Syncing
select {
case fmsg := <-ibNode.flushChan:
log.Info("(Manual Flush) receiving flush message",
zap.Int64("segmentID", fmsg.segmentID),
zap.Int64("collectionID", fmsg.collectionID),
zap.Bool("flushed", fmsg.flushed),
zap.String("v-channel name", ibNode.channelName),
)
mergeFlushTask(fmsg.segmentID, func(task *flushTask) {
task.flushed = fmsg.flushed
})
default:
}
// process drop partition
for _, partitionDrop := range fgMsg.dropPartitions {
segmentIDs := ibNode.channel.listPartitionSegments(partitionDrop)
log.Info("(Drop Partition) process drop partition",
zap.Int64("collectionID", ibNode.channel.getCollectionID()),
zap.Int64("partitionID", partitionDrop),
zap.Int64s("segmentIDs", segmentIDs),
zap.String("v-channel name", ibNode.channelName),
)
for _, segID := range segmentIDs {
mergeFlushTask(segID, func(task *flushTask) {
task.flushed = true
task.dropped = true
})
}
}
return flushTasks
}
func (ibNode *insertBufferNode) Flush(fgMsg *flowGraphMsg, seg2Upload []UniqueID, endPosition *internalpb.MsgPosition) []UniqueID {
flushTasks := ibNode.FillInSyncTasks(fgMsg, seg2Upload)
segmentsToFlush := make([]UniqueID, 0, len(flushTasks))
for _, task := range flushTasks {
log.Debug("insertBufferNode flushing BufferData",
zap.Int64("segmentID", task.segmentID),
zap.Bool("flushed", task.flushed),