fix: make compactor inject done called no more than once (#30603)

See also #30571

When `compactionExecutor` stops one compaction task, the `stop` method
will case `injectDone` called.

However in `executeTask` when `compact` method returns error, it shall
also invoke `injectDone` as well. That the reason `Unlock of unlocked
RWMutex` panicking happened.

This PR add sync.Once to make sure that `injectDone` is called only
once. We did not remove any of the `injectDone` since removal any of
those invocation may cause logic problem.

---------

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
congqixia 2024-02-18 14:08:49 +08:00 committed by GitHub
parent 91b02b5d22
commit 3c2e0375df
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 30 additions and 5 deletions

View File

@ -19,6 +19,7 @@ package datanode
import (
"context"
"fmt"
"sync"
"time"
"github.com/cockroachdb/errors"
@ -81,8 +82,9 @@ type compactionTask struct {
ctx context.Context
cancel context.CancelFunc
done chan struct{}
tr *timerecord.TimeRecorder
injectDoneOnce sync.Once
done chan struct{}
tr *timerecord.TimeRecorder
}
func newCompactionTask(
@ -567,9 +569,11 @@ func (t *compactionTask) compact() (*datapb.CompactionPlanResult, error) {
}
func (t *compactionTask) injectDone() {
for _, binlog := range t.plan.SegmentBinlogs {
t.syncMgr.Unblock(binlog.SegmentID)
}
t.injectDoneOnce.Do(func() {
for _, binlog := range t.plan.SegmentBinlogs {
t.syncMgr.Unblock(binlog.SegmentID)
}
})
}
// TODO copy maybe expensive, but this seems to be the only convinent way.

View File

@ -1095,3 +1095,24 @@ func TestCompactorInterfaceMethods(t *testing.T) {
assert.NotEmpty(t, segment.Field2StatslogPaths)
})
}
func TestInjectDone(t *testing.T) {
syncMgr := syncmgr.NewMockSyncManager(t)
segmentIDs := []int64{100, 200, 300}
task := &compactionTask{
plan: &datapb.CompactionPlan{
SegmentBinlogs: lo.Map(segmentIDs, func(id int64, _ int) *datapb.CompactionSegmentBinlogs {
return &datapb.CompactionSegmentBinlogs{SegmentID: id}
}),
},
syncMgr: syncMgr,
}
for _, segmentID := range segmentIDs {
syncMgr.EXPECT().Unblock(segmentID).Return().Once()
}
task.injectDone()
task.injectDone()
}