enhance: [2.4] Add param item for segmentFlushInterval (#34629) (#34663)

Cherry-pick from master
pr: #34629
See also #28817

Add paramitem for segment flush interval

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
congqixia 2024-07-15 14:31:40 +08:00 committed by GitHub
parent 92afcb7a39
commit 10c04f33c7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 16 additions and 5 deletions

View File

@ -449,6 +449,7 @@ dataCoord:
# MUST BE GREATER THAN OR EQUAL TO <smallProportion>!!!
# During compaction, the size of segment # of rows is able to exceed segment max # of rows by (expansionRate-1) * 100%.
expansionRate: 1.25
segmentFlushInterval: 2 # the minimal interval duration(unit: Seconds) between flusing operation on same segment
autoUpgradeSegmentIndex: false # whether auto upgrade segment index to index engine's version
enableCompaction: true # Enable data segment compaction
compaction:

View File

@ -25,6 +25,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -180,12 +181,10 @@ func sortSegmentsByLastExpires(segs []*SegmentInfo) {
type flushPolicy func(segment *SegmentInfo, t Timestamp) bool
const flushInterval = 2 * time.Second
func flushPolicyL1(segment *SegmentInfo, t Timestamp) bool {
return segment.GetState() == commonpb.SegmentState_Sealed &&
segment.Level != datapb.SegmentLevel_L0 &&
time.Since(segment.lastFlushTime) >= flushInterval &&
time.Since(segment.lastFlushTime) >= paramtable.Get().DataCoordCfg.SegmentFlushInterval.GetAsDuration(time.Second) &&
segment.GetLastExpireTime() <= t &&
segment.currRows != 0 &&
// Decoupling the importing segment from the flush process,

View File

@ -28,6 +28,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
// SegmentsInfo wraps a map, which maintains ID to SegmentInfo relation
@ -64,7 +65,7 @@ func NewSegmentInfo(info *datapb.SegmentInfo) *SegmentInfo {
SegmentInfo: info,
currRows: info.GetNumOfRows(),
allocations: make([]*Allocation, 0, 16),
lastFlushTime: time.Now().Add(-1 * flushInterval),
lastFlushTime: time.Now().Add(-1 * paramtable.Get().DataCoordCfg.SegmentFlushInterval.GetAsDuration(time.Second)),
// A growing segment from recovery can be also considered idle.
lastWrittenTime: getZeroTime(),
}

View File

@ -531,7 +531,7 @@ func TestGetFlushableSegments(t *testing.T) {
assert.NoError(t, err)
assert.Empty(t, ids)
meta.SetLastFlushTime(allocations[0].SegmentID, time.Now().Local().Add(-flushInterval))
meta.SetLastFlushTime(allocations[0].SegmentID, time.Now().Local().Add(-1*paramtable.Get().DataCoordCfg.SegmentFlushInterval.GetAsDuration(time.Second)))
ids, err = segmentManager.GetFlushableSegments(context.TODO(), "c1", allocations[0].ExpireTime)
assert.NoError(t, err)
assert.EqualValues(t, 1, len(ids))

View File

@ -2844,6 +2844,7 @@ type dataCoordConfig struct {
SegmentMinSizeFromIdleToSealed ParamItem `refreshable:"false"`
SegmentMaxBinlogFileNumber ParamItem `refreshable:"false"`
AutoUpgradeSegmentIndex ParamItem `refreshable:"true"`
SegmentFlushInterval ParamItem `refreshable:"true"`
// compaction
EnableCompaction ParamItem `refreshable:"false"`
@ -3589,6 +3590,15 @@ During compaction, the size of segment # of rows is able to exceed segment max #
}
p.AutoUpgradeSegmentIndex.Init(base.mgr)
p.SegmentFlushInterval = ParamItem{
Key: "dataCoord.segmentFlushInterval",
Version: "2.4.6",
DefaultValue: "2",
Doc: "the minimal interval duration(unit: Seconds) between flusing operation on same segment",
Export: true,
}
p.SegmentFlushInterval.Init(base.mgr)
p.FilesPerPreImportTask = ParamItem{
Key: "dataCoord.import.filesPerPreImportTask",
Version: "2.4.0",