enhance: Use a non-blocking method to trigger compaction when saveBinlogPath is executed (#28941)

/kind improvement
issue: #28924

Signed-off-by: SimFG <bang.fu@zilliz.com>
This commit is contained in:
SimFG 2023-12-12 10:16:39 +08:00 committed by GitHub
parent a67fc08865
commit ef18e6a532
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 88 additions and 5 deletions

View File

@ -46,7 +46,7 @@ type trigger interface {
// triggerCompaction triggers a compaction if any compaction condition satisfy.
triggerCompaction() error
// triggerSingleCompaction triggers a compaction bundled with collection-partition-channel-segment
triggerSingleCompaction(collectionID, partitionID, segmentID int64, channel string) error
triggerSingleCompaction(collectionID, partitionID, segmentID int64, channel string, blockToSendSignal bool) error
// forceTriggerCompaction force to start a compaction
forceTriggerCompaction(collectionID int64) (UniqueID, error)
}
@ -232,7 +232,7 @@ func (t *compactionTrigger) triggerCompaction() error {
}
// triggerSingleCompaction triger a compaction bundled with collection-partition-channel-segment
func (t *compactionTrigger) triggerSingleCompaction(collectionID, partitionID, segmentID int64, channel string) error {
func (t *compactionTrigger) triggerSingleCompaction(collectionID, partitionID, segmentID int64, channel string, blockToSendSignal bool) error {
// If AutoCompaction disabled, flush request will not trigger compaction
if !Params.DataCoordCfg.EnableAutoCompaction.GetAsBool() {
return nil
@ -251,7 +251,16 @@ func (t *compactionTrigger) triggerSingleCompaction(collectionID, partitionID, s
segmentID: segmentID,
channel: channel,
}
t.signals <- signal
if blockToSendSignal {
t.signals <- signal
return nil
}
select {
case t.signals <- signal:
default:
log.Info("no space to send compaction signal", zap.Int64("collectionID", collectionID), zap.Int64("segmentID", segmentID), zap.String("channel", channel))
}
return nil
}

View File

@ -17,7 +17,9 @@
package datacoord
import (
"context"
"sort"
"sync/atomic"
"testing"
"time"
@ -1992,6 +1994,78 @@ func Test_compactionTrigger_getCompactTime(t *testing.T) {
assert.NotNil(t, ct)
}
func Test_triggerSingleCompaction(t *testing.T) {
originValue := Params.DataCoordCfg.EnableAutoCompaction.GetValue()
Params.Save(Params.DataCoordCfg.EnableAutoCompaction.Key, "true")
defer func() {
Params.Save(Params.DataCoordCfg.EnableAutoCompaction.Key, originValue)
}()
m := &meta{segments: NewSegmentsInfo(), collections: make(map[UniqueID]*collectionInfo)}
got := newCompactionTrigger(m, &compactionPlanHandler{}, newMockAllocator(),
&ServerHandler{
&Server{
meta: m,
},
}, newMockVersionManager())
got.signals = make(chan *compactionSignal, 1)
{
err := got.triggerSingleCompaction(1, 1, 1, "a", false)
assert.NoError(t, err)
}
{
err := got.triggerSingleCompaction(2, 2, 2, "b", false)
assert.NoError(t, err)
}
var i atomic.Value
i.Store(0)
check := func() {
for {
select {
case signal := <-got.signals:
x := i.Load().(int)
i.Store(x + 1)
assert.EqualValues(t, 1, signal.collectionID)
default:
return
}
}
}
check()
assert.Equal(t, 1, i.Load().(int))
{
err := got.triggerSingleCompaction(3, 3, 3, "c", true)
assert.NoError(t, err)
}
var j atomic.Value
j.Store(0)
go func() {
timeoutCtx, cancelFunc := context.WithTimeout(context.Background(), time.Second)
defer cancelFunc()
for {
select {
case signal := <-got.signals:
x := j.Load().(int)
j.Store(x + 1)
if x == 0 {
assert.EqualValues(t, 3, signal.collectionID)
} else if x == 1 {
assert.EqualValues(t, 4, signal.collectionID)
}
case <-timeoutCtx.Done():
return
}
}
}()
{
err := got.triggerSingleCompaction(4, 4, 4, "d", true)
assert.NoError(t, err)
}
assert.Eventually(t, func() bool {
return j.Load().(int) == 2
}, 2*time.Second, 500*time.Millisecond)
}
type CompactionTriggerSuite struct {
suite.Suite

View File

@ -607,7 +607,7 @@ func (t *mockCompactionTrigger) triggerCompaction() error {
}
// triggerSingleCompaction trigerr a compaction bundled with collection-partiiton-channel-segment
func (t *mockCompactionTrigger) triggerSingleCompaction(collectionID, partitionID, segmentID int64, channel string) error {
func (t *mockCompactionTrigger) triggerSingleCompaction(collectionID, partitionID, segmentID int64, channel string, blockToSendSignal bool) error {
if f, ok := t.methods["triggerSingleCompaction"]; ok {
if ff, ok := f.(func(collectionID int64, partitionID int64, segmentID int64, channel string) error); ok {
return ff(collectionID, partitionID, segmentID, channel)

View File

@ -513,7 +513,7 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
if !req.Importing && Params.DataCoordCfg.EnableCompaction.GetAsBool() {
if req.GetSegLevel() != datapb.SegmentLevel_L0 {
err = s.compactionTrigger.triggerSingleCompaction(segment.GetCollectionID(), segment.GetPartitionID(),
segmentID, segment.GetInsertChannel())
segmentID, segment.GetInsertChannel(), false)
}
if err != nil {