From ef18e6a532331b108a59a50b7277ed0a99541728 Mon Sep 17 00:00:00 2001 From: SimFG Date: Tue, 12 Dec 2023 10:16:39 +0800 Subject: [PATCH] enhance: Use a non-blocking method to trigger compaction when saveBinlogPath is executed (#28941) /kind improvement issue: #28924 Signed-off-by: SimFG --- internal/datacoord/compaction_trigger.go | 15 +++- internal/datacoord/compaction_trigger_test.go | 74 +++++++++++++++++++ internal/datacoord/mock_test.go | 2 +- internal/datacoord/services.go | 2 +- 4 files changed, 88 insertions(+), 5 deletions(-) diff --git a/internal/datacoord/compaction_trigger.go b/internal/datacoord/compaction_trigger.go index 737bb29a01..c32c8c7496 100644 --- a/internal/datacoord/compaction_trigger.go +++ b/internal/datacoord/compaction_trigger.go @@ -46,7 +46,7 @@ type trigger interface { // triggerCompaction triggers a compaction if any compaction condition satisfy. triggerCompaction() error // triggerSingleCompaction triggers a compaction bundled with collection-partition-channel-segment - triggerSingleCompaction(collectionID, partitionID, segmentID int64, channel string) error + triggerSingleCompaction(collectionID, partitionID, segmentID int64, channel string, blockToSendSignal bool) error // forceTriggerCompaction force to start a compaction forceTriggerCompaction(collectionID int64) (UniqueID, error) } @@ -232,7 +232,7 @@ func (t *compactionTrigger) triggerCompaction() error { } // triggerSingleCompaction triger a compaction bundled with collection-partition-channel-segment -func (t *compactionTrigger) triggerSingleCompaction(collectionID, partitionID, segmentID int64, channel string) error { +func (t *compactionTrigger) triggerSingleCompaction(collectionID, partitionID, segmentID int64, channel string, blockToSendSignal bool) error { // If AutoCompaction disabled, flush request will not trigger compaction if !Params.DataCoordCfg.EnableAutoCompaction.GetAsBool() { return nil @@ -251,7 +251,16 @@ func (t *compactionTrigger) triggerSingleCompaction(collectionID, partitionID, s segmentID: segmentID, channel: channel, } - t.signals <- signal + if blockToSendSignal { + t.signals <- signal + return nil + } + select { + case t.signals <- signal: + default: + log.Info("no space to send compaction signal", zap.Int64("collectionID", collectionID), zap.Int64("segmentID", segmentID), zap.String("channel", channel)) + } + return nil } diff --git a/internal/datacoord/compaction_trigger_test.go b/internal/datacoord/compaction_trigger_test.go index bb5feb069e..d3ba8dca5e 100644 --- a/internal/datacoord/compaction_trigger_test.go +++ b/internal/datacoord/compaction_trigger_test.go @@ -17,7 +17,9 @@ package datacoord import ( + "context" "sort" + "sync/atomic" "testing" "time" @@ -1992,6 +1994,78 @@ func Test_compactionTrigger_getCompactTime(t *testing.T) { assert.NotNil(t, ct) } +func Test_triggerSingleCompaction(t *testing.T) { + originValue := Params.DataCoordCfg.EnableAutoCompaction.GetValue() + Params.Save(Params.DataCoordCfg.EnableAutoCompaction.Key, "true") + defer func() { + Params.Save(Params.DataCoordCfg.EnableAutoCompaction.Key, originValue) + }() + m := &meta{segments: NewSegmentsInfo(), collections: make(map[UniqueID]*collectionInfo)} + got := newCompactionTrigger(m, &compactionPlanHandler{}, newMockAllocator(), + &ServerHandler{ + &Server{ + meta: m, + }, + }, newMockVersionManager()) + got.signals = make(chan *compactionSignal, 1) + { + err := got.triggerSingleCompaction(1, 1, 1, "a", false) + assert.NoError(t, err) + } + { + err := got.triggerSingleCompaction(2, 2, 2, "b", false) + assert.NoError(t, err) + } + var i atomic.Value + i.Store(0) + check := func() { + for { + select { + case signal := <-got.signals: + x := i.Load().(int) + i.Store(x + 1) + assert.EqualValues(t, 1, signal.collectionID) + default: + return + } + } + } + check() + assert.Equal(t, 1, i.Load().(int)) + + { + err := got.triggerSingleCompaction(3, 3, 3, "c", true) + assert.NoError(t, err) + } + var j atomic.Value + j.Store(0) + go func() { + timeoutCtx, cancelFunc := context.WithTimeout(context.Background(), time.Second) + defer cancelFunc() + for { + select { + case signal := <-got.signals: + x := j.Load().(int) + j.Store(x + 1) + if x == 0 { + assert.EqualValues(t, 3, signal.collectionID) + } else if x == 1 { + assert.EqualValues(t, 4, signal.collectionID) + } + case <-timeoutCtx.Done(): + return + } + } + }() + { + err := got.triggerSingleCompaction(4, 4, 4, "d", true) + assert.NoError(t, err) + } + assert.Eventually(t, func() bool { + return j.Load().(int) == 2 + }, 2*time.Second, 500*time.Millisecond) +} + type CompactionTriggerSuite struct { suite.Suite diff --git a/internal/datacoord/mock_test.go b/internal/datacoord/mock_test.go index e7d5e309e2..a3f57fddae 100644 --- a/internal/datacoord/mock_test.go +++ b/internal/datacoord/mock_test.go @@ -607,7 +607,7 @@ func (t *mockCompactionTrigger) triggerCompaction() error { } // triggerSingleCompaction trigerr a compaction bundled with collection-partiiton-channel-segment -func (t *mockCompactionTrigger) triggerSingleCompaction(collectionID, partitionID, segmentID int64, channel string) error { +func (t *mockCompactionTrigger) triggerSingleCompaction(collectionID, partitionID, segmentID int64, channel string, blockToSendSignal bool) error { if f, ok := t.methods["triggerSingleCompaction"]; ok { if ff, ok := f.(func(collectionID int64, partitionID int64, segmentID int64, channel string) error); ok { return ff(collectionID, partitionID, segmentID, channel) diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index b0518551bb..3963acb79c 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -513,7 +513,7 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath if !req.Importing && Params.DataCoordCfg.EnableCompaction.GetAsBool() { if req.GetSegLevel() != datapb.SegmentLevel_L0 { err = s.compactionTrigger.triggerSingleCompaction(segment.GetCollectionID(), segment.GetPartitionID(), - segmentID, segment.GetInsertChannel()) + segmentID, segment.GetInsertChannel(), false) } if err != nil {