Fix logs of compaction_executor (#15365)

- Remove not useful log in `stopExecutingtaskByVChannelName`
- Add planID info in `mergeFlushedSegments`

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
This commit is contained in:
XuanYang-cn 2022-01-25 13:29:40 +08:00 committed by GitHub
parent 3f48a40ab0
commit 8ab9d769de
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 6 additions and 6 deletions

View File

@ -109,7 +109,7 @@ func (c *compactionExecutor) executeTask(task compactor) {
func (c *compactionExecutor) stopTask(planID UniqueID) {
task, loaded := c.executing.LoadAndDelete(planID)
if loaded {
log.Warn("compaction executor stop task", zap.Int64("planID", planID))
log.Warn("compaction executor stop task", zap.Int64("planID", planID), zap.String("vChannelName", task.(compactor).getChannelName()))
task.(compactor).stop()
}
}
@ -126,7 +126,6 @@ func (c *compactionExecutor) stopExecutingtaskByVChannelName(vChannelName string
if value.(compactor).getChannelName() == vChannelName {
c.stopTask(key.(UniqueID))
}
log.Warn(value.(compactor).getChannelName())
return true
})
}

View File

@ -461,7 +461,7 @@ func (t *compactionTask) compact() error {
t.refreshFlushedSegStatistics(targetSegID, numRows)
// no need to shorten the PK range of a segment, deleting dup PKs is valid
} else {
t.mergeFlushedSegments(targetSegID, collID, partID, segIDs, t.plan.GetChannel(), numRows)
t.mergeFlushedSegments(targetSegID, collID, partID, t.plan.GetPlanID(), segIDs, t.plan.GetChannel(), numRows)
}
ti.injectDone(true)

View File

@ -60,7 +60,7 @@ type Replica interface {
updateSegmentEndPosition(segID UniqueID, endPos *internalpb.MsgPosition)
updateSegmentCheckPoint(segID UniqueID)
updateSegmentPKRange(segID UniqueID, rowIDs []int64)
mergeFlushedSegments(segID, collID, partID UniqueID, compactedFrom []UniqueID, channelName string, numOfRows int64)
mergeFlushedSegments(segID, collID, partID, planID UniqueID, compactedFrom []UniqueID, channelName string, numOfRows int64)
hasSegment(segID UniqueID, countFlushed bool) bool
removeSegments(segID ...UniqueID)
listCompactedSegmentIDs() map[UniqueID][]UniqueID
@ -688,7 +688,7 @@ func (replica *SegmentReplica) updateSegmentCheckPoint(segID UniqueID) {
log.Warn("There's no segment", zap.Int64("ID", segID))
}
func (replica *SegmentReplica) mergeFlushedSegments(segID, collID, partID UniqueID, compactedFrom []UniqueID, channelName string, numOfRows int64) {
func (replica *SegmentReplica) mergeFlushedSegments(segID, collID, partID, planID UniqueID, compactedFrom []UniqueID, channelName string, numOfRows int64) {
if collID != replica.collectionID {
log.Warn("Mismatch collection",
zap.Int64("input ID", collID),
@ -697,6 +697,7 @@ func (replica *SegmentReplica) mergeFlushedSegments(segID, collID, partID Unique
}
log.Debug("merge flushed segments",
zap.Int64("planID", planID),
zap.Int64("compacted To segmentID", segID),
zap.Int64s("compacted From segmentIDs", compactedFrom),
zap.Int64("partition ID", partID),

View File

@ -628,7 +628,7 @@ func TestSegmentReplica_InterfaceMethod(t *testing.T) {
require.True(t, sr.hasSegment(1, true))
require.True(t, sr.hasSegment(2, true))
sr.mergeFlushedSegments(3, 1, 0, []UniqueID{1, 2}, "channel", 15)
sr.mergeFlushedSegments(3, 1, 0, 100, []UniqueID{1, 2}, "channel", 15)
assert.True(t, sr.hasSegment(3, true))
assert.False(t, sr.hasSegment(1, true))
assert.False(t, sr.hasSegment(2, true))