// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package datacoord

import (
	"context"
	"fmt"
	"sort"
	"sync"
	"time"

	"github.com/samber/lo"
	"go.uber.org/zap"

	"github.com/milvus-io/milvus-proto/go-api/commonpb"
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/pkg/log"
	"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
	"github.com/milvus-io/milvus/pkg/util/logutil"
	"github.com/milvus-io/milvus/pkg/util/tsoutil"
)
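
// compactTime carries the timetravel point, the TTL expiration point and the
// collection TTL used when generating compaction plans.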
type compactTime struct {
	travelTime    Timestamp
	expireTime    Timestamp
	collectionTTL time.Duration
}

type trigger interface {
	start()
	stop()
	// triggerCompaction triggers a compaction if any compaction condition is satisfied.
	triggerCompaction() error
	// triggerSingleCompaction triggers a compaction scoped to a specific collection-partition-channel-segment.
	triggerSingleCompaction(collectionID, partitionID, segmentID int64, channel string) error
	// forceTriggerCompaction forces a compaction of a collection to start.
	forceTriggerCompaction(collectionID int64) (UniqueID, error)
}
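
// compactionSignal is a single compaction request flowing through the trigger's
// signal channel; isGlobal distinguishes collection-wide signals from single-segment ones.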
type compactionSignal struct {
	id           UniqueID
	isForce      bool
	isGlobal     bool
	collectionID UniqueID
	partitionID  UniqueID
	segmentID    UniqueID
	channel      string
}

var _ trigger = (*compactionTrigger)(nil)
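
// compactionTrigger generates compaction plans from flush signals and a periodic
// global ticker, and submits them to the compaction handler.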
type compactionTrigger struct {
	handler           Handler
	meta              *meta
	allocator         allocator
	signals           chan *compactionSignal
	compactionHandler compactionPlanContext
	globalTrigger     *time.Ticker
	forceMu           sync.Mutex
	quit              chan struct{}
	wg                sync.WaitGroup
	//segRefer *SegmentReferenceManager
	//indexCoord types.IndexCoord
	estimateNonDiskSegmentPolicy calUpperLimitPolicy
	estimateDiskSegmentPolicy    calUpperLimitPolicy
	// A sloppy hack, so we can test with different segment row counts without worrying that
	// they are re-calculated in every compaction.
	testingOnly bool
}
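
// newCompactionTrigger builds a compactionTrigger with a buffered signal channel
// and the default segment row-count estimation policies.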
func newCompactionTrigger(
	meta *meta,
	compactionHandler compactionPlanContext,
	allocator allocator,
	//segRefer *SegmentReferenceManager,
	//indexCoord types.IndexCoord,
	handler Handler,
) *compactionTrigger {
	return &compactionTrigger{
		meta:              meta,
		allocator:         allocator,
		signals:           make(chan *compactionSignal, 100),
		compactionHandler: compactionHandler,
		//segRefer: segRefer,
		//indexCoord: indexCoord,
		estimateDiskSegmentPolicy:    calBySchemaPolicyWithDiskIndex,
		estimateNonDiskSegmentPolicy: calBySchemaPolicy,
		handler:                      handler,
	}
}
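
// start spawns the signal-dispatch goroutine and the global compaction loop.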
func (t *compactionTrigger) start() {
	t.quit = make(chan struct{})
	t.globalTrigger = time.NewTicker(Params.DataCoordCfg.GlobalCompactionInterval.GetAsDuration(time.Second))
	t.wg.Add(2)
	go func() {
		defer logutil.LogPanic()
		defer t.wg.Done()

		for {
			select {
			case <-t.quit:
				log.Info("compaction trigger quit")
				return
			case signal := <-t.signals:
				switch {
				case signal.isGlobal:
					t.handleGlobalSignal(signal)
				default:
					t.handleSignal(signal)
					// shouldn't reset, otherwise a frequently flushed collection will affect other collections
					// t.globalTrigger.Reset(Params.DataCoordCfg.GlobalCompactionInterval)
				}
			}
		}
	}()

	go t.startGlobalCompactionLoop()
}
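
// startGlobalCompactionLoop fires triggerCompaction on every global ticker tick
// until the trigger is stopped.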
func (t *compactionTrigger) startGlobalCompactionLoop() {
	defer logutil.LogPanic()
	defer t.wg.Done()

	// If AutoCompaction is disabled, the global loop will not start
	if !Params.DataCoordCfg.EnableAutoCompaction.GetAsBool() {
		return
	}

	for {
		select {
		case <-t.quit:
			t.globalTrigger.Stop()
			log.Info("global compaction loop exit")
			return
		case <-t.globalTrigger.C:
			err := t.triggerCompaction()
			if err != nil {
				log.Warn("unable to triggerCompaction", zap.Error(err))
			}
		}
	}
}

func (t *compactionTrigger) stop() {
	close(t.quit)
	t.wg.Wait()
}
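
// allocTs allocates a fresh timestamp from the allocator with a 5-second timeout.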
func (t *compactionTrigger) allocTs() (Timestamp, error) {
	cctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	ts, err := t.allocator.allocTimestamp(cctx)
	if err != nil {
		return 0, err
	}
	return ts, nil
}
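
// getCompactTime computes the timetravel point from the configured retention duration and,
// when the collection has a TTL, the expiration point; both are returned as hybrid timestamps.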
func (t *compactionTrigger) getCompactTime(ts Timestamp, collectionID UniqueID) (*compactTime, error) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	coll, err := t.handler.GetCollection(ctx, collectionID)
	if err != nil {
		return nil, fmt.Errorf("collection ID %d not found, err: %w", collectionID, err)
	}
	collectionTTL, err := getCollectionTTL(coll.Properties)
	if err != nil {
		return nil, err
	}

	pts, _ := tsoutil.ParseTS(ts)
	ttRetention := pts.Add(Params.CommonCfg.RetentionDuration.GetAsDuration(time.Second) * -1)
	ttRetentionLogic := tsoutil.ComposeTS(ttRetention.UnixNano()/int64(time.Millisecond), 0)

	if collectionTTL > 0 {
		ttexpired := pts.Add(-collectionTTL)
		ttexpiredLogic := tsoutil.ComposeTS(ttexpired.UnixNano()/int64(time.Millisecond), 0)
		return &compactTime{ttRetentionLogic, ttexpiredLogic, collectionTTL}, nil
	}

	// no expiration time
	return &compactTime{ttRetentionLogic, 0, 0}, nil
}

// triggerCompaction triggers a compaction if any compaction condition is satisfied.
func (t *compactionTrigger) triggerCompaction() error {
	id, err := t.allocSignalID()
	if err != nil {
		return err
	}
	signal := &compactionSignal{
		id:       id,
		isForce:  false,
		isGlobal: true,
	}
	t.signals <- signal
	return nil
}

// triggerSingleCompaction triggers a compaction scoped to a specific collection-partition-channel-segment
func (t *compactionTrigger) triggerSingleCompaction(collectionID, partitionID, segmentID int64, channel string) error {
	// If AutoCompaction is disabled, a flush request will not trigger compaction
	if !Params.DataCoordCfg.EnableAutoCompaction.GetAsBool() {
		return nil
	}

	id, err := t.allocSignalID()
	if err != nil {
		return err
	}
	signal := &compactionSignal{
		id:           id,
		isForce:      false,
		isGlobal:     false,
		collectionID: collectionID,
		partitionID:  partitionID,
		segmentID:    segmentID,
		channel:      channel,
	}
	t.signals <- signal
	return nil
}

// forceTriggerCompaction forces a compaction of a collection to start,
// invoked by the user's `ManualCompaction` operation
func (t *compactionTrigger) forceTriggerCompaction(collectionID int64) (UniqueID, error) {
	id, err := t.allocSignalID()
	if err != nil {
		return -1, err
	}
	signal := &compactionSignal{
		id:           id,
		isForce:      true,
		isGlobal:     true,
		collectionID: collectionID,
	}
	t.handleGlobalSignal(signal)
	return id, nil
}
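
// allocSignalID allocates a unique ID for a compaction signal with a 5-second timeout.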
func (t *compactionTrigger) allocSignalID() (UniqueID, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	return t.allocator.allocID(ctx)
}
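
// reCalcSegmentMaxNumOfRows estimates the segment max row count from the collection schema,
// using the disk-index policy when isDisk is true and the default policy otherwise.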
func (t *compactionTrigger) reCalcSegmentMaxNumOfRows(collectionID UniqueID, isDisk bool) (int, error) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	collMeta, err := t.handler.GetCollection(ctx, collectionID)
	if err != nil {
		return -1, fmt.Errorf("failed to get collection %d", collectionID)
	}

	if isDisk {
		return t.estimateDiskSegmentPolicy(collMeta.Schema)
	}
	return t.estimateNonDiskSegmentPolicy(collMeta.Schema)
}
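
// updateSegmentMaxSize re-estimates the max row count for the given segments based on whether
// the collection carries a DiskANN index, updates their MaxRowNum in place, and reports
// whether a DiskANN index was found.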
// TODO: Update segment info should be written back to Etcd.
func (t *compactionTrigger) updateSegmentMaxSize(segments []*SegmentInfo) (bool, error) {
	if len(segments) == 0 {
		return false, nil
	}

	collectionID := segments[0].GetCollectionID()
	indexInfos := t.meta.GetIndexesForCollection(segments[0].GetCollectionID(), "")

	isDiskANN := false
	for _, indexInfo := range indexInfos {
		indexType := getIndexType(indexInfo.IndexParams)
		if indexType == indexparamcheck.IndexDISKANN {
			// If index type is DiskANN, recalc segment max size here.
			isDiskANN = true
			newMaxRows, err := t.reCalcSegmentMaxNumOfRows(collectionID, true)
			if err != nil {
				return false, err
			}
			if len(segments) > 0 && int64(newMaxRows) != segments[0].GetMaxRowNum() {
				log.Info("segment max rows recalculated for DiskANN collection",
					zap.Int64("old max rows", segments[0].GetMaxRowNum()),
					zap.Int64("new max rows", int64(newMaxRows)))
				for _, segment := range segments {
					segment.MaxRowNum = int64(newMaxRows)
				}
			}
		}
	}

	// If index type is not DiskANN, recalc segment max size using the default policy.
	if !isDiskANN && !t.testingOnly {
		newMaxRows, err := t.reCalcSegmentMaxNumOfRows(collectionID, false)
		if err != nil {
			return isDiskANN, err
		}
		if len(segments) > 0 && int64(newMaxRows) != segments[0].GetMaxRowNum() {
			log.Info("segment max rows recalculated for non-DiskANN collection",
				zap.Int64("old max rows", segments[0].GetMaxRowNum()),
				zap.Int64("new max rows", int64(newMaxRows)))
			for _, segment := range segments {
				segment.MaxRowNum = int64(newMaxRows)
			}
		}
	}
	return isDiskANN, nil
}
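
// handleGlobalSignal groups all flushed, healthy, non-compacting and non-importing segments
// (optionally restricted to the signal's collection) by channel and partition, then generates
// and submits compaction plans for each group.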
func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) {
	t.forceMu.Lock()
	defer t.forceMu.Unlock()

	m := t.meta.GetSegmentsChanPart(func(segment *SegmentInfo) bool {
		return (signal.collectionID == 0 || segment.CollectionID == signal.collectionID) &&
			isSegmentHealthy(segment) &&
			isFlush(segment) &&
			!segment.isCompacting && // not compacting now
			!segment.GetIsImporting() // not importing now
	}) // m is a list of chanPartSegments, i.e. segments grouped by channel and partition

	if len(m) == 0 {
		return
	}

	ts, err := t.allocTs()
	if err != nil {
		log.Warn("allocate ts failed, skip to handle compaction",
			zap.Int64("collectionID", signal.collectionID),
			zap.Int64("partitionID", signal.partitionID),
			zap.Int64("segmentID", signal.segmentID))
		return
	}

	for _, group := range m {
		if !signal.isForce && t.compactionHandler.isFull() {
			break
		}

		isDiskIndex, err := t.updateSegmentMaxSize(group.segments)
		if err != nil {
			log.Warn("failed to update segment max size", zap.Error(err))
			continue
		}

		ct, err := t.getCompactTime(ts, group.collectionID)
		if err != nil {
			log.Warn("get compact time failed, skip to handle compaction",
				zap.Int64("collectionID", group.collectionID),
				zap.Int64("partitionID", group.partitionID),
				zap.String("channel", group.channelName))
			return
		}

		plans := t.generatePlans(group.segments, signal.isForce, isDiskIndex, ct)
		for _, plan := range plans {
			segIDs := fetchSegIDs(plan.GetSegmentBinlogs())

			if !signal.isForce && t.compactionHandler.isFull() {
				log.Warn("compaction plan skipped due to handler full",
					zap.Int64("collection", signal.collectionID),
					zap.Int64s("segment IDs", segIDs))
				break
			}

			start := time.Now()
			if err := t.fillOriginPlan(plan); err != nil {
				log.Warn("failed to fill plan",
					zap.Int64s("segment IDs", segIDs),
					zap.Error(err))
				continue
			}
			err := t.compactionHandler.execCompactionPlan(signal, plan)
			if err != nil {
				log.Warn("failed to execute compaction plan",
					zap.Int64("collection", signal.collectionID),
					zap.Int64("planID", plan.PlanID),
					zap.Int64s("segment IDs", segIDs),
					zap.Error(err))
				continue
			}

			segIDMap := make(map[int64][]*datapb.FieldBinlog, len(plan.SegmentBinlogs))
			for _, seg := range plan.SegmentBinlogs {
				segIDMap[seg.SegmentID] = seg.Deltalogs
			}

			log.Info("time cost of generating global compaction",
				zap.Any("segID2DeltaLogs", segIDMap),
				zap.Int64("planID", plan.PlanID),
				zap.Any("time cost", time.Since(start).Milliseconds()),
				zap.Int64("collectionID", signal.collectionID),
				zap.String("channel", group.channelName),
				zap.Int64("partitionID", group.partitionID),
				zap.Int64s("segment IDs", segIDs))
		}
	}
}

// handleSignal processes the partition-channel level compaction signal caused by a segment flush
func (t *compactionTrigger) handleSignal(signal *compactionSignal) {
	t.forceMu.Lock()
	defer t.forceMu.Unlock()

	// 1. check whether the segment's binlogs should be compacted or not
	if t.compactionHandler.isFull() {
		return
	}

	segment := t.meta.GetHealthySegment(signal.segmentID)
	if segment == nil {
		log.Warn("segment in compaction signal not found in meta", zap.Int64("segmentID", signal.segmentID))
		return
	}

	channel := segment.GetInsertChannel()
	partitionID := segment.GetPartitionID()
	segments := t.getCandidateSegments(channel, partitionID)

	if len(segments) == 0 {
		return
	}

	isDiskIndex, err := t.updateSegmentMaxSize(segments)
	if err != nil {
		log.Warn("failed to update segment max size", zap.Error(err))
		return
	}

	ts, err := t.allocTs()
	if err != nil {
		log.Warn("allocate ts failed, skip to handle compaction", zap.Int64("collectionID", signal.collectionID),
			zap.Int64("partitionID", signal.partitionID), zap.Int64("segmentID", signal.segmentID))
		return
	}

	ct, err := t.getCompactTime(ts, segment.GetCollectionID())
	if err != nil {
		log.Warn("get compact time failed, skip to handle compaction", zap.Int64("collectionID", segment.GetCollectionID()),
			zap.Int64("partitionID", partitionID), zap.String("channel", channel))
		return
	}

	plans := t.generatePlans(segments, signal.isForce, isDiskIndex, ct)
	for _, plan := range plans {
		if t.compactionHandler.isFull() {
			log.Warn("compaction plan skipped due to handler full", zap.Int64("collection", signal.collectionID), zap.Int64("planID", plan.PlanID))
			break
		}
		start := time.Now()
		if err := t.fillOriginPlan(plan); err != nil {
			log.Warn("failed to fill plan", zap.Error(err))
			continue
		}
		if err := t.compactionHandler.execCompactionPlan(signal, plan); err != nil {
			log.Warn("failed to execute compaction plan",
				zap.Int64("collection", signal.collectionID),
				zap.Int64("planID", plan.PlanID),
				zap.Int64s("segment IDs", fetchSegIDs(plan.GetSegmentBinlogs())),
				zap.Error(err))
			continue
		}
		log.Info("time cost of generating compaction",
			zap.Int64("plan ID", plan.PlanID),
			zap.Any("time cost", time.Since(start).Milliseconds()),
			zap.Int64("collection ID", signal.collectionID),
			zap.String("channel", channel),
			zap.Int64("partition ID", partitionID),
			zap.Int64s("segment IDs", fetchSegIDs(plan.GetSegmentBinlogs())))
	}
}
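
// generatePlans splits the candidate segments into prioritized candidates (forced or flagged by
// ShouldDoSingleCompaction), small segments, and the remaining non-planned segments. It greedily
// packs prioritized and small candidates into plans bounded by the segments' max row count, then
// tries to squeeze leftover small segments into existing plans or into non-planned segments
// within the configured expansion rate.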
func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, force bool, isDiskIndex bool, compactTime *compactTime) []*datapb.CompactionPlan {
	// find segments that need internal compaction
	// TODO add low priority candidates, for example if the segment is smaller than full 0.9 * max segment size but larger than the small segment boundary, we only execute compaction when there are no compactions running actively
	var prioritizedCandidates []*SegmentInfo
	var smallCandidates []*SegmentInfo
	var nonPlannedSegments []*SegmentInfo

	// TODO, currently we lack a measurement of data distribution, there should be another compaction pass that redistributes segments based on scalar/vector field distribution
	for _, segment := range segments {
		segment := segment.ShadowClone()
		// TODO should we trigger compaction periodically even if the segment has no obvious reason to be compacted?
		if force || t.ShouldDoSingleCompaction(segment, isDiskIndex, compactTime) {
			prioritizedCandidates = append(prioritizedCandidates, segment)
		} else if t.isSmallSegment(segment) {
			smallCandidates = append(smallCandidates, segment)
		} else {
			nonPlannedSegments = append(nonPlannedSegments, segment)
		}
	}

	var plans []*datapb.CompactionPlan
	// sort segments from large to small
	sort.Slice(prioritizedCandidates, func(i, j int) bool {
		if prioritizedCandidates[i].GetNumOfRows() != prioritizedCandidates[j].GetNumOfRows() {
			return prioritizedCandidates[i].GetNumOfRows() > prioritizedCandidates[j].GetNumOfRows()
		}
		return prioritizedCandidates[i].GetID() < prioritizedCandidates[j].GetID()
	})

	sort.Slice(smallCandidates, func(i, j int) bool {
		if smallCandidates[i].GetNumOfRows() != smallCandidates[j].GetNumOfRows() {
			return smallCandidates[i].GetNumOfRows() > smallCandidates[j].GetNumOfRows()
		}
		return smallCandidates[i].GetID() < smallCandidates[j].GetID()
	})

	// Sort non-planned segments from small to large.
	sort.Slice(nonPlannedSegments, func(i, j int) bool {
		if nonPlannedSegments[i].GetNumOfRows() != nonPlannedSegments[j].GetNumOfRows() {
			return nonPlannedSegments[i].GetNumOfRows() < nonPlannedSegments[j].GetNumOfRows()
		}
		return nonPlannedSegments[i].GetID() > nonPlannedSegments[j].GetID()
	})

	getSegmentIDs := func(segment *SegmentInfo, _ int) int64 {
		return segment.GetID()
	}

	// greedily pick from large segments to small, the goal is to fill each segment to reach 512M
	// we must ensure all prioritized candidates are in a plan
	// TODO the compaction policy should consider segments with similar timestamps together so timetravel and data expiration could work better.
	// TODO the compaction selection policy should consider whether the compaction workload is high
	for len(prioritizedCandidates) > 0 {
		var bucket []*SegmentInfo
		// pop out the first element
		segment := prioritizedCandidates[0]
		bucket = append(bucket, segment)
		prioritizedCandidates = prioritizedCandidates[1:]

		// only do single-file compaction if the segment is already large enough
		if segment.GetNumOfRows() < segment.GetMaxRowNum() {
			var result []*SegmentInfo
			free := segment.GetMaxRowNum() - segment.GetNumOfRows()
			maxNum := Params.DataCoordCfg.MaxSegmentToMerge.GetAsInt() - 1
			prioritizedCandidates, result, free = greedySelect(prioritizedCandidates, free, maxNum)
			bucket = append(bucket, result...)
			maxNum -= len(result)
			if maxNum > 0 {
				smallCandidates, result, _ = greedySelect(smallCandidates, free, maxNum)
				bucket = append(bucket, result...)
			}
		}

		// since this is a priority compaction, we will execute it even if there is only one segment
		plan := segmentsToPlan(bucket, compactTime)
		var size int64
		var row int64
		for _, s := range bucket {
			size += s.getSegmentSize()
			row += s.GetNumOfRows()
		}
		log.Info("generate a plan for priority candidates", zap.Any("plan", plan),
			zap.Int64("target segment row", row), zap.Int64("target segment size", size))
		plans = append(plans, plan)
	}

	getSegIDsFromPlan := func(plan *datapb.CompactionPlan) []int64 {
		var segmentIDs []int64
		for _, binLog := range plan.GetSegmentBinlogs() {
			segmentIDs = append(segmentIDs, binLog.GetSegmentID())
		}
		return segmentIDs
	}

	var remainingSmallSegs []*SegmentInfo
	// check if there are small candidates left that can be merged into large segments
	for len(smallCandidates) > 0 {
		var bucket []*SegmentInfo
		// pop out the first element
		segment := smallCandidates[0]
		bucket = append(bucket, segment)
		smallCandidates = smallCandidates[1:]

		var result []*SegmentInfo
		free := segment.GetMaxRowNum() - segment.GetNumOfRows()
		// for small segment merge, we pick one largest segment and merge as many small segments as possible together with it
		// Why reverse? try to merge as many segments as expected.
		// for instance, if a 255M and a 255M are the largest small candidates, they will never be merged because of the MinSegmentToMerge limit.
		smallCandidates, result, _ = reverseGreedySelect(smallCandidates, free, Params.DataCoordCfg.MaxSegmentToMerge.GetAsInt()-1)
		bucket = append(bucket, result...)

		var size int64
		var targetRow int64
		for _, s := range bucket {
			size += s.getSegmentSize()
			targetRow += s.GetNumOfRows()
		}
		// only merge if the candidate number is larger than MinSegmentToMerge or if the target row count is large enough
		if len(bucket) >= Params.DataCoordCfg.MinSegmentToMerge.GetAsInt() ||
			len(bucket) > 1 &&
				targetRow > int64(float64(segment.GetMaxRowNum())*Params.DataCoordCfg.SegmentCompactableProportion.GetAsFloat()) {
			plan := segmentsToPlan(bucket, compactTime)
			log.Info("generate a plan for small candidates",
				zap.Int64s("plan segment IDs", lo.Map(bucket, getSegmentIDs)),
				zap.Int64("target segment row", targetRow),
				zap.Int64("target segment size", size))
			plans = append(plans, plan)
		} else {
			remainingSmallSegs = append(remainingSmallSegs, bucket...)
		}
	}

	// Try adding remaining segments to existing plans.
	for i := len(remainingSmallSegs) - 1; i >= 0; i-- {
		s := remainingSmallSegs[i]
		if !isExpandableSmallSegment(s) {
			continue
		}
		// Try to squeeze this segment into existing plans. This could cause the segment size to exceed maxSize.
		for _, plan := range plans {
			if plan.TotalRows+s.GetNumOfRows() <= int64(Params.DataCoordCfg.SegmentExpansionRate.GetAsFloat()*float64(s.GetMaxRowNum())) {
				segmentBinLogs := &datapb.CompactionSegmentBinlogs{
					SegmentID:           s.GetID(),
					FieldBinlogs:        s.GetBinlogs(),
					Field2StatslogPaths: s.GetStatslogs(),
					Deltalogs:           s.GetDeltalogs(),
				}
				plan.TotalRows += s.GetNumOfRows()
				plan.SegmentBinlogs = append(plan.SegmentBinlogs, segmentBinLogs)
				log.Info("small segment appended on existing plan",
					zap.Int64("segment ID", s.GetID()),
					zap.Int64("target rows", plan.GetTotalRows()),
					zap.Int64s("plan segment ID", getSegIDsFromPlan(plan)),
				)
				remainingSmallSegs = append(remainingSmallSegs[:i], remainingSmallSegs[i+1:]...)
				break
			}
		}
	}

	// If there are still remaining small segments, try adding them to non-planned segments.
	for _, npSeg := range nonPlannedSegments {
		bucket := []*SegmentInfo{npSeg}
		targetRow := npSeg.GetNumOfRows()
		for i := len(remainingSmallSegs) - 1; i >= 0; i-- {
			// Note: could also simply use MaxRowNum as the limit.
			if targetRow+remainingSmallSegs[i].GetNumOfRows() <=
				int64(Params.DataCoordCfg.SegmentExpansionRate.GetAsFloat()*float64(npSeg.GetMaxRowNum())) {
				bucket = append(bucket, remainingSmallSegs[i])
				targetRow += remainingSmallSegs[i].GetNumOfRows()
				remainingSmallSegs = append(remainingSmallSegs[:i], remainingSmallSegs[i+1:]...)
			}
		}
		if len(bucket) > 1 {
			plan := segmentsToPlan(bucket, compactTime)
			plans = append(plans, plan)
			log.Info("generate a plan to squeeze small candidates into a non-planned segment",
				zap.Int64s("plan segment IDs", lo.Map(bucket, getSegmentIDs)),
				zap.Int64("target segment row", targetRow),
			)
		}
	}
	return plans
}
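
// segmentsToPlan wraps the given segments into a MixCompaction plan carrying the timetravel
// point, the collection TTL and the accumulated row count.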
func segmentsToPlan(segments []*SegmentInfo, compactTime *compactTime) *datapb.CompactionPlan {
	plan := &datapb.CompactionPlan{
		Timetravel:    compactTime.travelTime,
		Type:          datapb.CompactionType_MixCompaction,
		Channel:       segments[0].GetInsertChannel(),
		CollectionTtl: compactTime.collectionTTL.Nanoseconds(),
	}

	for _, s := range segments {
		segmentBinlogs := &datapb.CompactionSegmentBinlogs{
			SegmentID:           s.GetID(),
			FieldBinlogs:        s.GetBinlogs(),
			Field2StatslogPaths: s.GetStatslogs(),
			Deltalogs:           s.GetDeltalogs(),
		}
		plan.TotalRows += s.GetNumOfRows()
		plan.SegmentBinlogs = append(plan.SegmentBinlogs, segmentBinlogs)
	}
	return plan
}
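
// greedySelect scans candidates front to back and moves every segment whose row count still fits
// into the remaining free rows (up to maxSegment picks) into the result. It returns the remaining
// candidates, the picked segments and the leftover free rows.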
func greedySelect(candidates []*SegmentInfo, free int64, maxSegment int) ([]*SegmentInfo, []*SegmentInfo, int64) {
	var result []*SegmentInfo

	for i := 0; i < len(candidates); {
		candidate := candidates[i]
		if len(result) < maxSegment && candidate.GetNumOfRows() < free {
			result = append(result, candidate)
			free -= candidate.GetNumOfRows()
			candidates = append(candidates[:i], candidates[i+1:]...)
		} else {
			i++
		}
	}

	return candidates, result, free
}
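
// reverseGreedySelect works like greedySelect but scans candidates back to front, so with the
// callers' large-to-small ordering it prefers the smallest segments first.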
func reverseGreedySelect(candidates []*SegmentInfo, free int64, maxSegment int) ([]*SegmentInfo, []*SegmentInfo, int64) {
	var result []*SegmentInfo

	for i := len(candidates) - 1; i >= 0; i-- {
		candidate := candidates[i]
		if (len(result) < maxSegment) && (candidate.GetNumOfRows() < free) {
			result = append(result, candidate)
			free -= candidate.GetNumOfRows()
			candidates = append(candidates[:i], candidates[i+1:]...)
		}
	}
	return candidates, result, free
}
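
// getCandidateSegments returns the healthy, flushed segments of the given channel and partition
// that are neither compacting nor being imported.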
func (t *compactionTrigger) getCandidateSegments(channel string, partitionID UniqueID) []*SegmentInfo {
	segments := t.meta.GetSegmentsByChannel(channel)
	var res []*SegmentInfo
	for _, s := range segments {
		if !isSegmentHealthy(s) ||
			!isFlush(s) ||
			s.GetInsertChannel() != channel ||
			s.GetPartitionID() != partitionID ||
			s.isCompacting ||
			s.GetIsImporting() {
			continue
		}
		res = append(res, s)
	}

	return res
}
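
// isSmallSegment reports whether the segment holds fewer rows than SegmentSmallProportion of its
// max row count.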
func (t *compactionTrigger) isSmallSegment(segment *SegmentInfo) bool {
	return segment.GetNumOfRows() < int64(float64(segment.GetMaxRowNum())*Params.DataCoordCfg.SegmentSmallProportion.GetAsFloat())
}
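
// isExpandableSmallSegment reports whether a small segment may still be squeezed into an existing
// plan, i.e. its row count is below (SegmentExpansionRate - 1) * MaxRowNum; for example, with an
// expansion rate of 1.25 a segment qualifies while it holds fewer than 25% of its max rows.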
func isExpandableSmallSegment(segment *SegmentInfo) bool {
	return segment.GetNumOfRows() < int64(float64(segment.GetMaxRowNum())*(Params.DataCoordCfg.SegmentExpansionRate.GetAsFloat()-1))
}
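
// fillOriginPlan assigns a newly allocated plan ID and the configured compaction timeout to the plan.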
func (t *compactionTrigger) fillOriginPlan(plan *datapb.CompactionPlan) error {
	// TODO context
	id, err := t.allocator.allocID(context.TODO())
	if err != nil {
		return err
	}

	plan.PlanID = id
	plan.TimeoutInSeconds = int32(Params.DataCoordCfg.CompactionTimeoutInSeconds.GetAsInt())
	return nil
}
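
// isStaleSegment reports whether the segment has not been flushed for at least
// segmentTimedFlushDuration minutes.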
func (t *compactionTrigger) isStaleSegment(segment *SegmentInfo) bool {
	return time.Since(segment.lastFlushTime).Minutes() >= segmentTimedFlushDuration
}
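
// ShouldDoSingleCompaction decides whether a single segment should be compacted on its own.
// A segment qualifies when any of the following holds:
//   - it was produced by flush and carries far more statslog files than expected for its max size,
//   - it carries more deltalogs than SingleCompactionDeltalogMaxNum,
//   - the rows or size expired by the collection TTL exceed the configured ratio or size threshold,
//   - the segment is entirely beyond the timetravel point and its deleted rows or delta log size
//     exceed the configured ratio or size threshold.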
func (t *compactionTrigger) ShouldDoSingleCompaction(segment *SegmentInfo, isDiskIndex bool, compactTime *compactTime) bool {
	// binlog count is no longer restricted on its own because it is related to the number of fields
	var binLog int
	for _, binlogs := range segment.GetBinlogs() {
		binLog += len(binlogs.GetBinlogs())
	}

	// count all the statslog files, only for segments generated by flush
	if len(segment.CompactionFrom) == 0 {
		var statsLog int
		for _, statsLogs := range segment.GetStatslogs() {
			statsLog += len(statsLogs.GetBinlogs())
		}

		var maxSize int
		if isDiskIndex {
			maxSize = int(Params.DataCoordCfg.DiskSegmentMaxSize.GetAsInt64() * 1024 * 1024 / Params.DataNodeCfg.BinLogMaxSize.GetAsInt64())
		} else {
			maxSize = int(Params.DataCoordCfg.SegmentMaxSize.GetAsInt64() * 1024 * 1024 / Params.DataNodeCfg.BinLogMaxSize.GetAsInt64())
		}

		// if there are more stats logs than expected, trigger compaction to reduce the stats log size.
		// TODO maybe we want to compact to a single statslog to reduce watch dml channel cost
		// TODO avoid rebuilding the index twice.
		if statsLog > maxSize*2.0 {
			log.Info("stats number is too much, trigger compaction", zap.Int64("segment", segment.ID), zap.Int("Bin logs", binLog), zap.Int("Stat logs", statsLog))
			return true
		}
	}

	var deltaLog int
	for _, deltaLogs := range segment.GetDeltalogs() {
		deltaLog += len(deltaLogs.GetBinlogs())
	}

	if deltaLog > Params.DataCoordCfg.SingleCompactionDeltalogMaxNum.GetAsInt() {
		log.Info("total delta number is too much, trigger compaction", zap.Int64("segment", segment.ID), zap.Int("Bin logs", binLog), zap.Int("Delta logs", deltaLog))
		return true
	}

	// if expiration is enabled, put the segment into the compaction candidates
	totalExpiredSize := int64(0)
	totalExpiredRows := 0
	for _, binlogs := range segment.GetBinlogs() {
		for _, l := range binlogs.GetBinlogs() {
			// TODO, we should probably estimate expired log entries by total rows in the binlog and the relationship of timeTo, timeFrom and the expire time
			if l.TimestampTo < compactTime.expireTime {
				totalExpiredRows += int(l.GetEntriesNum())
				totalExpiredSize += l.GetLogSize()
			}
		}
	}

	if float64(totalExpiredRows)/float64(segment.GetNumOfRows()) >= Params.DataCoordCfg.SingleCompactionRatioThreshold.GetAsFloat() || totalExpiredSize > Params.DataCoordCfg.SingleCompactionExpiredLogMaxSize.GetAsInt64() {
		log.Info("total expired entities is too much, trigger compaction", zap.Int64("segment", segment.ID),
			zap.Int("expired rows", totalExpiredRows), zap.Int64("expired log size", totalExpiredSize))
		return true
	}

	// single compaction only merges insert and delta logs beyond the timetravel point
	// a segment's insert binlogs don't have time range info, so we wait until the segment's last expire time is less than timetravel
	// to ensure that all insert logs are beyond the timetravel point.
	// TODO: add meta in insert binlog
	if segment.LastExpireTime >= compactTime.travelTime {
		return false
	}

	totalDeletedRows := 0
	totalDeleteLogSize := int64(0)
	for _, deltaLogs := range segment.GetDeltalogs() {
		for _, l := range deltaLogs.GetBinlogs() {
			if l.TimestampTo < compactTime.travelTime {
				totalDeletedRows += int(l.GetEntriesNum())
				totalDeleteLogSize += l.GetLogSize()
			}
		}
	}

	// currently the delta log size and delete ratio policies are applied
	if float64(totalDeletedRows)/float64(segment.GetNumOfRows()) >= Params.DataCoordCfg.SingleCompactionRatioThreshold.GetAsFloat() || totalDeleteLogSize > Params.DataCoordCfg.SingleCompactionDeltaLogMaxSize.GetAsInt64() {
		log.Info("total delete entities is too much, trigger compaction", zap.Int64("segment", segment.ID),
			zap.Int("deleted rows", totalDeletedRows), zap.Int64("delete log size", totalDeleteLogSize))
		return true
	}
	return false
}
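
// isFlush reports whether the segment is in the Flushing or Flushed state.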
func isFlush(segment *SegmentInfo) bool {
	return segment.GetState() == commonpb.SegmentState_Flushed || segment.GetState() == commonpb.SegmentState_Flushing
}
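
// fetchSegIDs collects the segment IDs referenced by the plan's segment binlogs, mainly for logging.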
func fetchSegIDs(segBinLogs []*datapb.CompactionSegmentBinlogs) []int64 {
	var segIDs []int64
	for _, segBinLog := range segBinLogs {
		segIDs = append(segIDs, segBinLog.GetSegmentID())
	}
	return segIDs
}