diff --git a/internal/datanode/compactor.go b/internal/datanode/compactor.go index 043405ca4b..fde437dea4 100644 --- a/internal/datanode/compactor.go +++ b/internal/datanode/compactor.go @@ -19,6 +19,7 @@ package datanode import ( "context" "fmt" + "sync" "time" "github.com/cockroachdb/errors" @@ -81,8 +82,9 @@ type compactionTask struct { ctx context.Context cancel context.CancelFunc - done chan struct{} - tr *timerecord.TimeRecorder + injectDoneOnce sync.Once + done chan struct{} + tr *timerecord.TimeRecorder } func newCompactionTask( @@ -567,9 +569,11 @@ func (t *compactionTask) compact() (*datapb.CompactionPlanResult, error) { } func (t *compactionTask) injectDone() { - for _, binlog := range t.plan.SegmentBinlogs { - t.syncMgr.Unblock(binlog.SegmentID) - } + t.injectDoneOnce.Do(func() { + for _, binlog := range t.plan.SegmentBinlogs { + t.syncMgr.Unblock(binlog.SegmentID) + } + }) } // TODO copy maybe expensive, but this seems to be the only convinent way. diff --git a/internal/datanode/compactor_test.go b/internal/datanode/compactor_test.go index b5d084b2f1..9902be4555 100644 --- a/internal/datanode/compactor_test.go +++ b/internal/datanode/compactor_test.go @@ -1095,3 +1095,24 @@ func TestCompactorInterfaceMethods(t *testing.T) { assert.NotEmpty(t, segment.Field2StatslogPaths) }) } + +func TestInjectDone(t *testing.T) { + syncMgr := syncmgr.NewMockSyncManager(t) + + segmentIDs := []int64{100, 200, 300} + task := &compactionTask{ + plan: &datapb.CompactionPlan{ + SegmentBinlogs: lo.Map(segmentIDs, func(id int64, _ int) *datapb.CompactionSegmentBinlogs { + return &datapb.CompactionSegmentBinlogs{SegmentID: id} + }), + }, + syncMgr: syncMgr, + } + + for _, segmentID := range segmentIDs { + syncMgr.EXPECT().Unblock(segmentID).Return().Once() + } + + task.injectDone() + task.injectDone() +}