From 3c2e0375df4cf4fe65ff21dfc4149ce011d956f3 Mon Sep 17 00:00:00 2001 From: congqixia Date: Sun, 18 Feb 2024 14:08:49 +0800 Subject: [PATCH] fix: make compactor inject done called no more than once (#30603) See also #30571 When `compactionExecutor` stops one compaction task, the `stop` method will case `injectDone` called. However in `executeTask` when `compact` method returns error, it shall also invoke `injectDone` as well. That the reason `Unlock of unlocked RWMutex` panicking happened. This PR add sync.Once to make sure that `injectDone` is called only once. We did not remove any of the `injectDone` since removal any of those invocation may cause logic problem. --------- Signed-off-by: Congqi Xia --- internal/datanode/compactor.go | 14 +++++++++----- internal/datanode/compactor_test.go | 21 +++++++++++++++++++++ 2 files changed, 30 insertions(+), 5 deletions(-) diff --git a/internal/datanode/compactor.go b/internal/datanode/compactor.go index 043405ca4b..fde437dea4 100644 --- a/internal/datanode/compactor.go +++ b/internal/datanode/compactor.go @@ -19,6 +19,7 @@ package datanode import ( "context" "fmt" + "sync" "time" "github.com/cockroachdb/errors" @@ -81,8 +82,9 @@ type compactionTask struct { ctx context.Context cancel context.CancelFunc - done chan struct{} - tr *timerecord.TimeRecorder + injectDoneOnce sync.Once + done chan struct{} + tr *timerecord.TimeRecorder } func newCompactionTask( @@ -567,9 +569,11 @@ func (t *compactionTask) compact() (*datapb.CompactionPlanResult, error) { } func (t *compactionTask) injectDone() { - for _, binlog := range t.plan.SegmentBinlogs { - t.syncMgr.Unblock(binlog.SegmentID) - } + t.injectDoneOnce.Do(func() { + for _, binlog := range t.plan.SegmentBinlogs { + t.syncMgr.Unblock(binlog.SegmentID) + } + }) } // TODO copy maybe expensive, but this seems to be the only convinent way. diff --git a/internal/datanode/compactor_test.go b/internal/datanode/compactor_test.go index b5d084b2f1..9902be4555 100644 --- a/internal/datanode/compactor_test.go +++ b/internal/datanode/compactor_test.go @@ -1095,3 +1095,24 @@ func TestCompactorInterfaceMethods(t *testing.T) { assert.NotEmpty(t, segment.Field2StatslogPaths) }) } + +func TestInjectDone(t *testing.T) { + syncMgr := syncmgr.NewMockSyncManager(t) + + segmentIDs := []int64{100, 200, 300} + task := &compactionTask{ + plan: &datapb.CompactionPlan{ + SegmentBinlogs: lo.Map(segmentIDs, func(id int64, _ int) *datapb.CompactionSegmentBinlogs { + return &datapb.CompactionSegmentBinlogs{SegmentID: id} + }), + }, + syncMgr: syncMgr, + } + + for _, segmentID := range segmentIDs { + syncMgr.EXPECT().Unblock(segmentID).Return().Once() + } + + task.injectDone() + task.injectDone() +}