Add policy to seal long time idle segment (#19222)

Signed-off-by: longjiquan <jiquan.long@zilliz.com>

Signed-off-by: longjiquan <jiquan.long@zilliz.com>
This commit is contained in:
Jiquan Long 2022-09-20 20:54:50 +08:00 committed by GitHub
parent 297d75fc92
commit 5141e05c47
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 79 additions and 12 deletions

View File

@ -245,7 +245,7 @@ indexNode:
dataCoord:
address: localhost
port: 13333
enableCompaction: true # Enable data segment compression
enableCompaction: true # Enable data segment compaction
enableGarbageCollection: true
segment:
@ -253,6 +253,10 @@ dataCoord:
sealProportion: 0.25 # It's the minimum proportion for a segment which can be sealed
assignmentExpiration: 2000 # The time of the assignment expiration in ms
maxLife: 86400 # The max lifetime of segment in seconds, 24*60*60
# If a segment didn't accept dml records in `maxIdleTime` and the size of segment is greater than
# `minSizeFromIdleToSealed`, Milvus will automatically seal it.
maxIdleTime: 600 # The max idle time of segment in seconds, 10*60.
minSizeFromIdleToSealed: 16 # The min size in MB of segment which can be idle from sealed.
compaction:
enableAutoCompaction: true

View File

@ -110,6 +110,22 @@ func sealByLifetimePolicy(lifetime time.Duration) segmentSealPolicy {
}
}
// sealLongTimeIdlePolicy seal segment if the segment has been written with a high frequency before.
// serve for this case:
// If users insert entities into segment continuously within a certain period of time, but they forgot to flush/(seal)
// it and the size of segment didn't reach the seal proportion. Under this situation, Milvus will wait these segments to
// be expired and during this period search latency may be a little high. We can assume that entities won't be inserted
// into this segment anymore, so sealLongTimeIdlePolicy will seal these segments to trigger handoff of query cluster.
// Q: Why we don't decrease the expiry time directly?
// A: We don't want to influence segments which are accepting `frequent small` batch entities.
func sealLongTimeIdlePolicy(idleTimeTolerance time.Duration, minSizeToSealIdleSegment float64, maxSizeOfSegment float64) segmentSealPolicy {
return func(segment *SegmentInfo, ts Timestamp) bool {
limit := (minSizeToSealIdleSegment / maxSizeOfSegment) * float64(segment.GetMaxRowNum())
return time.Since(segment.lastWrittenTime) > idleTimeTolerance &&
float64(segment.currRows) > limit
}
}
// channelSealPolicy seal policy applies to channel
type channelSealPolicy func(string, []*SegmentInfo, Timestamp) []*SegmentInfo

View File

@ -177,3 +177,16 @@ func TestSealSegmentPolicy(t *testing.T) {
assert.True(t, shouldSeal)
})
}
func Test_sealLongTimeIdlePolicy(t *testing.T) {
idleTimeTolerance := 2 * time.Second
minSizeToSealIdleSegment := 16.0
maxSizeOfSegment := 512.0
policy := sealLongTimeIdlePolicy(idleTimeTolerance, minSizeToSealIdleSegment, maxSizeOfSegment)
seg1 := &SegmentInfo{lastWrittenTime: time.Now().Add(idleTimeTolerance * 5)}
assert.False(t, policy(seg1, 100))
seg2 := &SegmentInfo{lastWrittenTime: getZeroTime(), currRows: 1, SegmentInfo: &datapb.SegmentInfo{MaxRowNum: 10000}}
assert.False(t, policy(seg2, 100))
seg3 := &SegmentInfo{lastWrittenTime: getZeroTime(), currRows: 1000, SegmentInfo: &datapb.SegmentInfo{MaxRowNum: 10000}}
assert.True(t, policy(seg3, 100))
}

View File

@ -38,7 +38,8 @@ type SegmentInfo struct {
lastFlushTime time.Time
isCompacting bool
// a cache to avoid calculate twice
size int64
size int64
lastWrittenTime time.Time
}
// NewSegmentInfo create `SegmentInfo` wrapper from `datapb.SegmentInfo`
@ -51,6 +52,8 @@ func NewSegmentInfo(info *datapb.SegmentInfo) *SegmentInfo {
currRows: info.GetNumOfRows(),
allocations: make([]*Allocation, 0, 16),
lastFlushTime: time.Now().Add(-1 * flushInterval),
// A growing segment from recovery can be also considered idle.
lastWrittenTime: getZeroTime(),
}
}
@ -193,6 +196,7 @@ func (s *SegmentInfo) Clone(opts ...SegmentInfoOption) *SegmentInfo {
lastFlushTime: s.lastFlushTime,
isCompacting: s.isCompacting,
//cannot copy size, since binlog may be changed
lastWrittenTime: s.lastWrittenTime,
}
for _, opt := range opts {
opt(cloned)
@ -203,12 +207,13 @@ func (s *SegmentInfo) Clone(opts ...SegmentInfoOption) *SegmentInfo {
// ShadowClone shadow clone the segment and return a new instance
func (s *SegmentInfo) ShadowClone(opts ...SegmentInfoOption) *SegmentInfo {
cloned := &SegmentInfo{
SegmentInfo: s.SegmentInfo,
currRows: s.currRows,
allocations: s.allocations,
lastFlushTime: s.lastFlushTime,
isCompacting: s.isCompacting,
size: s.size,
SegmentInfo: s.SegmentInfo,
currRows: s.currRows,
allocations: s.allocations,
lastFlushTime: s.lastFlushTime,
isCompacting: s.isCompacting,
size: s.size,
lastWrittenTime: s.lastWrittenTime,
}
for _, opt := range opts {
@ -274,6 +279,7 @@ func AddAllocation(allocation *Allocation) SegmentInfoOption {
func SetCurrentRows(rows int64) SegmentInfoOption {
return func(segment *SegmentInfo) {
segment.currRows = rows
segment.lastWrittenTime = time.Now()
}
}

View File

@ -182,6 +182,7 @@ func defaultSegmentSealPolicy() []segmentSealPolicy {
return []segmentSealPolicy{
sealByLifetimePolicy(Params.DataCoordCfg.SegmentMaxLifetime),
getSegmentCapacityPolicy(Params.DataCoordCfg.SegmentSealProportion),
sealLongTimeIdlePolicy(Params.DataCoordCfg.SegmentMaxIdleTime, Params.DataCoordCfg.SegmentMinSizeFromIdleToSealed, Params.DataCoordCfg.SegmentMaxSize),
}
}

View File

@ -194,3 +194,8 @@ func IsParentDropped(meta *meta, segment *SegmentInfo) bool {
}
return true
}
func getZeroTime() time.Time {
var t time.Time
return t
}

View File

@ -204,3 +204,11 @@ func (f *fixedTSOAllocator) allocTimestamp(_ context.Context) (Timestamp, error)
func (f *fixedTSOAllocator) allocID(_ context.Context) (UniqueID, error) {
panic("not implemented") // TODO: Implement
}
func (suite *UtilSuite) TestGetZeroTime() {
n := 10
for i := 0; i < n; i++ {
timeGot := getZeroTime()
suite.True(timeGot.IsZero())
}
}

View File

@ -984,10 +984,12 @@ type dataCoordConfig struct {
ChannelWatchSubPath string
// --- SEGMENTS ---
SegmentMaxSize float64
SegmentSealProportion float64
SegAssignmentExpiration int64
SegmentMaxLifetime time.Duration
SegmentMaxSize float64
SegmentSealProportion float64
SegAssignmentExpiration int64
SegmentMaxLifetime time.Duration
SegmentMaxIdleTime time.Duration
SegmentMinSizeFromIdleToSealed float64
CreatedTime time.Time
UpdatedTime time.Time
@ -1022,6 +1024,8 @@ func (p *dataCoordConfig) init(base *BaseTable) {
p.initSegmentSealProportion()
p.initSegAssignmentExpiration()
p.initSegmentMaxLifetime()
p.initSegmentMaxIdleTime()
p.initSegmentMinSizeFromIdleToSealed()
p.initEnableCompaction()
p.initEnableAutoCompaction()
@ -1059,6 +1063,16 @@ func (p *dataCoordConfig) initSegmentMaxLifetime() {
p.SegmentMaxLifetime = time.Duration(p.Base.ParseInt64WithDefault("dataCoord.segment.maxLife", 24*60*60)) * time.Second
}
func (p *dataCoordConfig) initSegmentMaxIdleTime() {
p.SegmentMaxIdleTime = time.Duration(p.Base.ParseInt64WithDefault("dataCoord.segment.maxIdleTime", 60*60)) * time.Second
log.Info("init segment max idle time", zap.String("value", p.SegmentMaxIdleTime.String()))
}
func (p *dataCoordConfig) initSegmentMinSizeFromIdleToSealed() {
p.SegmentMinSizeFromIdleToSealed = p.Base.ParseFloatWithDefault("dataCoord.segment.minSizeFromIdleToSealed", 16.0)
log.Info("init segment min size from idle to sealed", zap.Float64("value", p.SegmentMinSizeFromIdleToSealed))
}
func (p *dataCoordConfig) initChannelWatchPrefix() {
// WARN: this value should not be put to milvus.yaml. It's a default value for channel watch path.
// This will be removed after we reconstruct our config module.