From 10c04f33c7639164ab516025bb96d6a0421176be Mon Sep 17 00:00:00 2001 From: congqixia Date: Mon, 15 Jul 2024 14:31:40 +0800 Subject: [PATCH] enhance: [2.4] Add param item for segmentFlushInterval (#34629) (#34663) Cherry-pick from master pr: #34629 See also #28817 Add paramitem for segment flush interval Signed-off-by: Congqi Xia --- configs/milvus.yaml | 1 + internal/datacoord/segment_allocation_policy.go | 5 ++--- internal/datacoord/segment_info.go | 3 ++- internal/datacoord/segment_manager_test.go | 2 +- pkg/util/paramtable/component_param.go | 10 ++++++++++ 5 files changed, 16 insertions(+), 5 deletions(-) diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 0bfcef6b15..f5b153f0f4 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -449,6 +449,7 @@ dataCoord: # MUST BE GREATER THAN OR EQUAL TO !!! # During compaction, the size of segment # of rows is able to exceed segment max # of rows by (expansionRate-1) * 100%. expansionRate: 1.25 + segmentFlushInterval: 2 # the minimal interval duration(unit: Seconds) between flushing operation on same segment autoUpgradeSegmentIndex: false # whether auto upgrade segment index to index engine's version enableCompaction: true # Enable data segment compaction compaction: diff --git a/internal/datacoord/segment_allocation_policy.go b/internal/datacoord/segment_allocation_policy.go index 423e9faeac..d180a837fb 100644 --- a/internal/datacoord/segment_allocation_policy.go +++ b/internal/datacoord/segment_allocation_policy.go @@ -25,6 +25,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/tsoutil" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -180,12 +181,10 @@ func sortSegmentsByLastExpires(segs []*SegmentInfo) { type flushPolicy func(segment *SegmentInfo, t Timestamp) bool -const 
flushInterval = 2 * time.Second - func flushPolicyL1(segment *SegmentInfo, t Timestamp) bool { return segment.GetState() == commonpb.SegmentState_Sealed && segment.Level != datapb.SegmentLevel_L0 && - time.Since(segment.lastFlushTime) >= flushInterval && + time.Since(segment.lastFlushTime) >= paramtable.Get().DataCoordCfg.SegmentFlushInterval.GetAsDuration(time.Second) && segment.GetLastExpireTime() <= t && segment.currRows != 0 && // Decoupling the importing segment from the flush process, diff --git a/internal/datacoord/segment_info.go b/internal/datacoord/segment_info.go index 35ed52f001..8dcd183632 100644 --- a/internal/datacoord/segment_info.go +++ b/internal/datacoord/segment_info.go @@ -28,6 +28,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/paramtable" ) // SegmentsInfo wraps a map, which maintains ID to SegmentInfo relation @@ -64,7 +65,7 @@ func NewSegmentInfo(info *datapb.SegmentInfo) *SegmentInfo { SegmentInfo: info, currRows: info.GetNumOfRows(), allocations: make([]*Allocation, 0, 16), - lastFlushTime: time.Now().Add(-1 * flushInterval), + lastFlushTime: time.Now().Add(-1 * paramtable.Get().DataCoordCfg.SegmentFlushInterval.GetAsDuration(time.Second)), // A growing segment from recovery can be also considered idle. 
lastWrittenTime: getZeroTime(), } diff --git a/internal/datacoord/segment_manager_test.go b/internal/datacoord/segment_manager_test.go index 27bc9a9b02..fbc98c0a41 100644 --- a/internal/datacoord/segment_manager_test.go +++ b/internal/datacoord/segment_manager_test.go @@ -531,7 +531,7 @@ func TestGetFlushableSegments(t *testing.T) { assert.NoError(t, err) assert.Empty(t, ids) - meta.SetLastFlushTime(allocations[0].SegmentID, time.Now().Local().Add(-flushInterval)) + meta.SetLastFlushTime(allocations[0].SegmentID, time.Now().Local().Add(-1*paramtable.Get().DataCoordCfg.SegmentFlushInterval.GetAsDuration(time.Second))) ids, err = segmentManager.GetFlushableSegments(context.TODO(), "c1", allocations[0].ExpireTime) assert.NoError(t, err) assert.EqualValues(t, 1, len(ids)) diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index fdce4ef9cf..f810ae0667 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -2844,6 +2844,7 @@ type dataCoordConfig struct { SegmentMinSizeFromIdleToSealed ParamItem `refreshable:"false"` SegmentMaxBinlogFileNumber ParamItem `refreshable:"false"` AutoUpgradeSegmentIndex ParamItem `refreshable:"true"` + SegmentFlushInterval ParamItem `refreshable:"true"` // compaction EnableCompaction ParamItem `refreshable:"false"` @@ -3589,6 +3590,15 @@ During compaction, the size of segment # of rows is able to exceed segment max # } p.AutoUpgradeSegmentIndex.Init(base.mgr) + p.SegmentFlushInterval = ParamItem{ + Key: "dataCoord.segmentFlushInterval", + Version: "2.4.6", + DefaultValue: "2", + Doc: "the minimal interval duration(unit: Seconds) between flushing operation on same segment", + Export: true, + } + p.SegmentFlushInterval.Init(base.mgr) + p.FilesPerPreImportTask = ParamItem{ Key: "dataCoord.import.filesPerPreImportTask", Version: "2.4.0",