2021-12-01 19:33:32 +08:00
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2021-11-05 22:25:00 +08:00
package datacoord
import (
"context"
"fmt"
"sync"
"time"
2023-02-26 11:31:49 +08:00
"github.com/cockroachdb/errors"
2023-11-07 03:18:18 +08:00
"github.com/samber/lo"
2023-04-06 19:14:32 +08:00
"go.uber.org/zap"
2023-02-26 11:31:49 +08:00
2023-06-09 01:28:37 +08:00
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
2021-11-05 22:25:00 +08:00
"github.com/milvus-io/milvus/internal/proto/datapb"
2023-04-06 19:14:32 +08:00
"github.com/milvus-io/milvus/pkg/log"
2023-11-07 03:18:18 +08:00
"github.com/milvus-io/milvus/pkg/util/conc"
2023-09-21 09:45:27 +08:00
"github.com/milvus-io/milvus/pkg/util/tsoutil"
2021-11-05 22:25:00 +08:00
)
// TODO: this number should be determined by the resources of the DataNode; for now it is set to a fixed value for simplicity.
2022-06-15 23:14:10 +08:00
// TODO we should split compaction into different priorities, small compaction helps to merge segment, large compaction helps to handle delta and expiration of large segments
2021-11-05 22:25:00 +08:00
// tsTimeout is the sentinel start timestamp stored on a plan when a real start
// timestamp could not be allocated (see notifyTasks).
// NOTE(review): presumably chosen so small that the timeout check trips on the
// next pass — confirm against tsoutil.ParseTS.
const tsTimeout uint64 = 1
type compactionPlanContext interface {
start ( )
stop ( )
// execCompactionPlan start to execute plan and return immediately
2021-11-11 15:54:42 +08:00
execCompactionPlan ( signal * compactionSignal , plan * datapb . CompactionPlan ) error
2021-11-05 22:25:00 +08:00
// getCompaction return compaction task. If planId does not exist, return nil.
getCompaction ( planID int64 ) * compactionTask
2022-08-23 15:50:52 +08:00
// updateCompaction set the compaction state to timeout or completed
updateCompaction ( ts Timestamp ) error
2021-11-05 22:25:00 +08:00
// isFull return true if the task pool is full
isFull ( ) bool
2021-11-09 14:47:02 +08:00
// get compaction tasks by signal id
getCompactionTasksBySignalID ( signalID int64 ) [ ] * compactionTask
2023-11-29 10:50:29 +08:00
removeTasksByChannel ( channel string )
2021-11-05 22:25:00 +08:00
}
// compactionTaskState is the lifecycle state of a compactionTask.
type compactionTaskState int8

// States start at 1, so the zero value of compactionTaskState is not a valid
// state.
const (
	// executing (1): the handler has tried to hand the plan to a DataNode.
	executing compactionTaskState = iota + 1
	// pipelining (2): the plan is enqueued and waiting to be scheduled.
	pipelining
	// completed (3): the result has been applied.
	completed
	// failed (4): the plan will not be completed.
	failed
	// timeout (5): the plan exceeded its time budget.
	timeout
)
// Sentinel errors describing why a channel cannot currently be used.
var (
	// errChannelNotWatched signals that the channel is not watched.
	errChannelNotWatched = errors.New("channel is not watched")
	// errChannelInBuffer signals that the channel is in the buffer.
	errChannelInBuffer = errors.New("channel is in buffer")
)
2023-12-05 18:44:37 +08:00
// CompactionMeta is the segment-metadata surface the compaction handler
// depends on. Declaring it as an interface lets the handler be driven by
// something other than the concrete *meta.
type CompactionMeta interface {
	// SelectSegments returns the segments accepted by selector.
	SelectSegments(selector SegmentInfoSelector) []*SegmentInfo
	// GetHealthySegment returns the segment with the given ID; callers in
	// this file treat a nil return as "no such healthy segment".
	GetHealthySegment(segID UniqueID) *SegmentInfo
	// UpdateSegmentsInfo applies the given update operators.
	UpdateSegmentsInfo(operators ...UpdateOperator) error
	// SetSegmentCompacting sets or clears the compacting flag of a segment.
	SetSegmentCompacting(segmentID int64, compacting bool)

	// PrepareCompleteCompactionMutation computes the metadata changes for a
	// finished plan: the modified source segments, the new segment, and the
	// pending metric mutation.
	PrepareCompleteCompactionMutation(plan *datapb.CompactionPlan, result *datapb.CompactionPlanResult) ([]*SegmentInfo, *SegmentInfo, *segMetricMutation, error)
	// alterMetaStoreAfterCompaction persists the outcome of a compaction.
	alterMetaStoreAfterCompaction(segmentCompactTo *SegmentInfo, segmentsCompactFrom []*SegmentInfo) error
}

// Compile-time check that *meta satisfies CompactionMeta.
var _ CompactionMeta = (*meta)(nil)
2021-11-05 22:25:00 +08:00
type compactionTask struct {
triggerInfo * compactionSignal
plan * datapb . CompactionPlan
state compactionTaskState
dataNodeID int64
2023-11-14 15:56:19 +08:00
result * datapb . CompactionPlanResult
2021-11-05 22:25:00 +08:00
}
func ( t * compactionTask ) shadowClone ( opts ... compactionTaskOpt ) * compactionTask {
task := & compactionTask {
2021-11-11 15:54:42 +08:00
triggerInfo : t . triggerInfo ,
plan : t . plan ,
state : t . state ,
dataNodeID : t . dataNodeID ,
2021-11-05 22:25:00 +08:00
}
for _ , opt := range opts {
opt ( task )
}
return task
}
var _ compactionPlanContext = ( * compactionPlanHandler ) ( nil )
type compactionPlanHandler struct {
2023-11-29 10:50:29 +08:00
mu sync . RWMutex
plans map [ int64 ] * compactionTask // planID -> task
2023-12-05 18:44:37 +08:00
meta CompactionMeta
2023-11-07 03:18:18 +08:00
allocator allocator
2023-11-29 10:50:29 +08:00
chManager * ChannelManager
2023-12-05 18:44:37 +08:00
scheduler Scheduler
2023-12-11 17:52:37 +08:00
sessions SessionManager
2023-11-29 10:50:29 +08:00
stopCh chan struct { }
stopOnce sync . Once
stopWg sync . WaitGroup
2021-11-05 22:25:00 +08:00
}
2023-12-11 17:52:37 +08:00
func newCompactionPlanHandler ( sessions SessionManager , cm * ChannelManager , meta CompactionMeta , allocator allocator ,
2023-09-21 09:45:27 +08:00
) * compactionPlanHandler {
2021-11-05 22:25:00 +08:00
return & compactionPlanHandler {
2023-01-04 19:37:36 +08:00
plans : make ( map [ int64 ] * compactionTask ) ,
chManager : cm ,
meta : meta ,
sessions : sessions ,
allocator : allocator ,
2023-11-23 17:30:25 +08:00
scheduler : NewCompactionScheduler ( ) ,
2021-11-05 22:25:00 +08:00
}
}
2023-12-05 18:44:37 +08:00
func ( c * compactionPlanHandler ) checkResult ( ) {
// deal results
2023-12-22 12:00:43 +08:00
ts , err := c . GetCurrentTS ( )
if err != nil {
log . Warn ( "fail to check result" , zap . Error ( err ) )
return
}
_ = c . updateCompaction ( ts )
}
func ( c * compactionPlanHandler ) GetCurrentTS ( ) ( Timestamp , error ) {
2023-12-13 18:48:39 +08:00
interval := Params . DataCoordCfg . CompactionRPCTimeout . GetAsDuration ( time . Second )
ctx , cancel := context . WithTimeout ( context . Background ( ) , interval )
2023-12-05 18:44:37 +08:00
defer cancel ( )
ts , err := c . allocator . allocTimestamp ( ctx )
if err != nil {
log . Warn ( "unable to alloc timestamp" , zap . Error ( err ) )
2023-12-22 12:00:43 +08:00
return 0 , err
2023-12-05 18:44:37 +08:00
}
2023-12-22 12:00:43 +08:00
return ts , nil
2023-12-05 18:44:37 +08:00
}
// schedule asks the scheduler for the queued tasks that may run now, notifies
// them, and logs the scheduler status. It does nothing when no task is ready.
func (c *compactionPlanHandler) schedule() {
	ready := c.scheduler.Schedule()
	if len(ready) == 0 {
		return
	}
	c.notifyTasks(ready)
	c.scheduler.LogStatus()
}
2021-11-05 22:25:00 +08:00
func ( c * compactionPlanHandler ) start ( ) {
2022-12-07 18:01:19 +08:00
interval := Params . DataCoordCfg . CompactionCheckIntervalInSeconds . GetAsDuration ( time . Second )
2023-11-29 10:50:29 +08:00
c . stopCh = make ( chan struct { } )
2023-12-22 12:00:43 +08:00
c . stopWg . Add ( 3 )
2021-11-05 22:25:00 +08:00
go func ( ) {
2023-11-29 10:50:29 +08:00
defer c . stopWg . Done ( )
2023-11-07 03:18:18 +08:00
checkResultTicker := time . NewTicker ( interval )
2023-11-29 10:50:29 +08:00
log . Info ( "Compaction handler check result loop start" , zap . Any ( "check result interval" , interval ) )
2023-11-07 03:18:18 +08:00
defer checkResultTicker . Stop ( )
2021-11-05 22:25:00 +08:00
for {
select {
2023-11-29 10:50:29 +08:00
case <- c . stopCh :
log . Info ( "compaction handler check result loop quit" )
2021-11-05 22:25:00 +08:00
return
2023-11-07 03:18:18 +08:00
case <- checkResultTicker . C :
2023-12-05 18:44:37 +08:00
c . checkResult ( )
2023-11-29 10:50:29 +08:00
}
}
} ( )
// saperate check results and schedule goroutine so that check results doesn't
// influence the schedule
go func ( ) {
defer c . stopWg . Done ( )
scheduleTicker := time . NewTicker ( 200 * time . Millisecond )
defer scheduleTicker . Stop ( )
log . Info ( "compaction handler start schedule" )
for {
select {
case <- c . stopCh :
log . Info ( "Compaction handler quit schedule" )
return
2023-11-07 03:18:18 +08:00
case <- scheduleTicker . C :
2023-12-05 18:44:37 +08:00
c . schedule ( )
2021-11-05 22:25:00 +08:00
}
}
} ( )
2023-12-22 12:00:43 +08:00
go func ( ) {
defer c . stopWg . Done ( )
cleanTicker := time . NewTicker ( 30 * time . Minute )
defer cleanTicker . Stop ( )
for {
select {
case <- c . stopCh :
log . Info ( "Compaction handler quit clean" )
return
case <- cleanTicker . C :
c . Clean ( )
}
}
} ( )
}
func ( c * compactionPlanHandler ) Clean ( ) {
current := tsoutil . GetCurrentTime ( )
c . mu . Lock ( )
defer c . mu . Unlock ( )
for id , task := range c . plans {
if task . state == executing || task . state == pipelining {
continue
}
// after timeout + 1h, the plan will be cleaned
if c . isTimeout ( current , task . plan . GetStartTime ( ) , task . plan . GetTimeoutInSeconds ( ) + 60 * 60 ) {
delete ( c . plans , id )
}
}
2021-11-05 22:25:00 +08:00
}
func ( c * compactionPlanHandler ) stop ( ) {
2023-11-29 10:50:29 +08:00
c . stopOnce . Do ( func ( ) {
close ( c . stopCh )
} )
c . stopWg . Wait ( )
}
func ( c * compactionPlanHandler ) removeTasksByChannel ( channel string ) {
c . mu . Lock ( )
defer c . mu . Unlock ( )
for id , task := range c . plans {
if task . triggerInfo . channel == channel {
2023-12-05 18:44:37 +08:00
log . Info ( "Compaction handler removing tasks by channel" ,
zap . String ( "channel" , channel ) ,
zap . Int64 ( "planID" , task . plan . GetPlanID ( ) ) ,
zap . Int64 ( "node" , task . dataNodeID ) ,
)
c . scheduler . Finish ( task . dataNodeID , task . plan . PlanID )
2023-11-29 10:50:29 +08:00
delete ( c . plans , id )
}
}
2021-11-05 22:25:00 +08:00
}
2023-02-15 16:00:33 +08:00
func ( c * compactionPlanHandler ) updateTask ( planID int64 , opts ... compactionTaskOpt ) {
c . mu . Lock ( )
defer c . mu . Unlock ( )
2023-12-05 18:44:37 +08:00
if plan , ok := c . plans [ planID ] ; ok {
c . plans [ planID ] = plan . shadowClone ( opts ... )
2023-11-29 18:54:27 +08:00
}
2023-02-15 16:00:33 +08:00
}
2023-11-07 03:18:18 +08:00
func ( c * compactionPlanHandler ) enqueuePlan ( signal * compactionSignal , plan * datapb . CompactionPlan ) error {
2021-11-05 22:25:00 +08:00
nodeID , err := c . chManager . FindWatcher ( plan . GetChannel ( ) )
if err != nil {
2023-08-30 11:12:27 +08:00
log . Error ( "failed to find watcher" , zap . Int64 ( "planID" , plan . GetPlanID ( ) ) , zap . Error ( err ) )
2021-11-05 22:25:00 +08:00
return err
}
2023-08-30 11:12:27 +08:00
log := log . With ( zap . Int64 ( "planID" , plan . GetPlanID ( ) ) , zap . Int64 ( "nodeID" , nodeID ) )
2021-11-05 22:25:00 +08:00
c . setSegmentsCompacting ( plan , true )
2022-08-26 14:22:55 +08:00
task := & compactionTask {
triggerInfo : signal ,
plan : plan ,
2023-02-15 16:00:33 +08:00
state : pipelining ,
2022-08-26 14:22:55 +08:00
dataNodeID : nodeID ,
}
2023-11-07 03:18:18 +08:00
c . mu . Lock ( )
2022-08-26 14:22:55 +08:00
c . plans [ plan . PlanID ] = task
2023-11-07 03:18:18 +08:00
c . mu . Unlock ( )
2022-08-26 14:22:55 +08:00
2023-11-23 17:30:25 +08:00
c . scheduler . Submit ( task )
2023-11-07 03:18:18 +08:00
log . Info ( "Compaction plan submited" )
2021-11-05 22:25:00 +08:00
return nil
}
2023-11-23 17:30:25 +08:00
func ( c * compactionPlanHandler ) RefreshPlan ( task * compactionTask ) {
plan := task . plan
2023-12-05 18:44:37 +08:00
log := log . With ( zap . Int64 ( "taskID" , task . triggerInfo . id ) , zap . Int64 ( "planID" , plan . GetPlanID ( ) ) )
2023-11-23 17:30:25 +08:00
if plan . GetType ( ) == datapb . CompactionType_Level0DeleteCompaction {
sealedSegments := c . meta . SelectSegments ( func ( info * SegmentInfo ) bool {
return info . GetCollectionID ( ) == task . triggerInfo . collectionID &&
2023-12-05 18:44:37 +08:00
( task . triggerInfo . partitionID == - 1 || info . GetPartitionID ( ) == task . triggerInfo . partitionID ) &&
2023-11-23 17:30:25 +08:00
info . GetInsertChannel ( ) == plan . GetChannel ( ) &&
isFlushState ( info . GetState ( ) ) &&
! info . isCompacting &&
! info . GetIsImporting ( ) &&
info . GetLevel ( ) != datapb . SegmentLevel_L0 &&
info . GetDmlPosition ( ) . GetTimestamp ( ) < task . triggerInfo . pos . GetTimestamp ( )
} )
sealedSegBinlogs := lo . Map ( sealedSegments , func ( info * SegmentInfo , _ int ) * datapb . CompactionSegmentBinlogs {
return & datapb . CompactionSegmentBinlogs {
SegmentID : info . GetID ( ) ,
Level : datapb . SegmentLevel_L1 ,
}
} )
plan . SegmentBinlogs = append ( plan . SegmentBinlogs , sealedSegBinlogs ... )
2023-12-05 18:44:37 +08:00
log . Info ( "Compaction handler refreshed level zero compaction plan" , zap . Any ( "target segments" , sealedSegBinlogs ) )
2023-11-23 17:30:25 +08:00
return
}
if plan . GetType ( ) == datapb . CompactionType_MixCompaction {
for _ , seg := range plan . GetSegmentBinlogs ( ) {
2023-12-05 18:44:37 +08:00
if info := c . meta . GetHealthySegment ( seg . GetSegmentID ( ) ) ; info != nil {
seg . Deltalogs = info . GetDeltalogs ( )
}
2023-11-23 17:30:25 +08:00
}
2023-12-13 18:48:39 +08:00
log . Info ( "Compaction handler refreshed mix compaction plan" )
2023-11-23 17:30:25 +08:00
return
}
}
2023-11-07 03:18:18 +08:00
func ( c * compactionPlanHandler ) notifyTasks ( tasks [ ] * compactionTask ) {
for _ , task := range tasks {
2023-11-14 16:38:18 +08:00
// avoid closure capture iteration variable
innerTask := task
2023-11-23 17:30:25 +08:00
c . RefreshPlan ( innerTask )
2023-11-07 03:18:18 +08:00
getOrCreateIOPool ( ) . Submit ( func ( ) ( any , error ) {
2023-11-14 16:38:18 +08:00
plan := innerTask . plan
log := log . With ( zap . Int64 ( "planID" , plan . GetPlanID ( ) ) , zap . Int64 ( "nodeID" , innerTask . dataNodeID ) )
2023-11-07 03:18:18 +08:00
log . Info ( "Notify compaction task to DataNode" )
ts , err := c . allocator . allocTimestamp ( context . TODO ( ) )
if err != nil {
log . Warn ( "Alloc start time for CompactionPlan failed" , zap . Error ( err ) )
// update plan ts to TIMEOUT ts
c . updateTask ( plan . PlanID , setState ( executing ) , setStartTime ( tsTimeout ) )
return nil , err
}
2023-11-14 16:38:18 +08:00
c . updateTask ( plan . PlanID , setStartTime ( ts ) )
err = c . sessions . Compaction ( innerTask . dataNodeID , plan )
c . updateTask ( plan . PlanID , setState ( executing ) )
2023-11-07 03:18:18 +08:00
if err != nil {
log . Warn ( "Failed to notify compaction tasks to DataNode" , zap . Error ( err ) )
return nil , err
}
log . Info ( "Compaction start" )
return nil , nil
} )
}
}
// execCompactionPlan enqueues the plan for execution and returns immediately;
// the error, if any, is the one reported by enqueuePlan.
func (c *compactionPlanHandler) execCompactionPlan(signal *compactionSignal, plan *datapb.CompactionPlan) error {
	err := c.enqueuePlan(signal, plan)
	return err
}
2021-11-05 22:25:00 +08:00
// setSegmentsCompacting sets or clears the compacting flag on every segment
// listed in the plan.
func (c *compactionPlanHandler) setSegmentsCompacting(plan *datapb.CompactionPlan, compacting bool) {
	for _, binlogs := range plan.GetSegmentBinlogs() {
		segmentID := binlogs.GetSegmentID()
		c.meta.SetSegmentCompacting(segmentID, compacting)
	}
}
2022-08-23 15:50:52 +08:00
// complete a compaction task
// not threadsafe, only can be used internally
2023-11-14 15:56:19 +08:00
func ( c * compactionPlanHandler ) completeCompaction ( result * datapb . CompactionPlanResult ) error {
2021-11-05 22:25:00 +08:00
planID := result . PlanID
if _ , ok := c . plans [ planID ] ; ! ok {
return fmt . Errorf ( "plan %d is not found" , planID )
}
if c . plans [ planID ] . state != executing {
return fmt . Errorf ( "plan %d's state is %v" , planID , c . plans [ planID ] . state )
}
plan := c . plans [ planID ] . plan
2023-11-07 03:18:18 +08:00
nodeID := c . plans [ planID ] . dataNodeID
2023-12-05 18:44:37 +08:00
defer c . scheduler . Finish ( nodeID , plan . PlanID )
2021-11-05 22:25:00 +08:00
switch plan . GetType ( ) {
2022-02-18 14:51:49 +08:00
case datapb . CompactionType_MergeCompaction , datapb . CompactionType_MixCompaction :
2022-08-25 15:48:54 +08:00
if err := c . handleMergeCompactionResult ( plan , result ) ; err != nil {
2021-11-05 22:25:00 +08:00
return err
}
2023-12-05 18:44:37 +08:00
case datapb . CompactionType_Level0DeleteCompaction :
if err := c . handleL0CompactionResult ( plan , result ) ; err != nil {
return err
}
2021-11-05 22:25:00 +08:00
default :
return errors . New ( "unknown compaction type" )
}
2023-11-14 15:56:19 +08:00
UpdateCompactionSegmentSizeMetrics ( result . GetSegments ( ) )
2023-12-22 12:00:43 +08:00
c . plans [ planID ] = c . plans [ planID ] . shadowClone ( setState ( completed ) , setResult ( result ) , cleanLogPath ( ) )
2021-11-05 22:25:00 +08:00
return nil
}
2023-12-05 18:44:37 +08:00
// handleL0CompactionResult records the outcome of a level-zero delete
// compaction in a single metadata update: every result segment gets the delta
// logs reported for it, and every L0 segment listed in the plan is marked
// dropped.
func (c *compactionPlanHandler) handleL0CompactionResult(plan *datapb.CompactionPlan, result *datapb.CompactionPlanResult) error {
	var ops []UpdateOperator

	// Attach the reported delta logs to each result segment.
	for _, seg := range result.GetSegments() {
		ops = append(ops, UpdateBinlogsOperator(seg.GetSegmentID(), nil, nil, seg.GetDeltalogs()))
	}

	// Drop the L0 segments listed in the plan.
	isLevelZero := func(b *datapb.CompactionSegmentBinlogs, _ int) bool {
		return b.GetLevel() == datapb.SegmentLevel_L0
	}
	for _, seg := range lo.Filter(plan.GetSegmentBinlogs(), isLevelZero) {
		ops = append(ops, UpdateStatusOperator(seg.SegmentID, commonpb.SegmentState_Dropped))
	}

	log.Info("meta update: update segments info for level zero compaction",
		zap.Int64("planID", plan.GetPlanID()),
	)
	return c.meta.UpdateSegmentsInfo(ops...)
}
2023-11-14 15:56:19 +08:00
func ( c * compactionPlanHandler ) handleMergeCompactionResult ( plan * datapb . CompactionPlan , result * datapb . CompactionPlanResult ) error {
2022-09-27 16:02:53 +08:00
log := log . With ( zap . Int64 ( "planID" , plan . GetPlanID ( ) ) )
2023-11-29 10:50:29 +08:00
if len ( result . GetSegments ( ) ) == 0 || len ( result . GetSegments ( ) ) > 1 {
// should never happen
log . Warn ( "illegal compaction results" )
return fmt . Errorf ( "Illegal compaction results: %v" , result )
}
2022-09-27 16:02:53 +08:00
2023-11-29 10:50:29 +08:00
// Merge compaction has one and only one segment
newSegmentInfo := c . meta . GetHealthySegment ( result . GetSegments ( ) [ 0 ] . SegmentID )
if newSegmentInfo != nil {
log . Info ( "meta has already been changed, skip meta change and retry sync segments" )
} else {
// Also prepare metric updates.
2023-12-05 18:44:37 +08:00
modSegments , newSegment , metricMutation , err := c . meta . PrepareCompleteCompactionMutation ( plan , result )
2023-11-29 10:50:29 +08:00
if err != nil {
return err
}
if err := c . meta . alterMetaStoreAfterCompaction ( newSegment , modSegments ) ; err != nil {
log . Warn ( "fail to alert meta store" , zap . Error ( err ) )
return err
}
// Apply metrics after successful meta update.
metricMutation . commit ( )
newSegmentInfo = newSegment
2022-09-27 16:02:53 +08:00
}
2023-09-21 09:45:27 +08:00
nodeID := c . plans [ plan . GetPlanID ( ) ] . dataNodeID
2022-09-27 16:02:53 +08:00
req := & datapb . SyncSegmentsRequest {
PlanID : plan . PlanID ,
2023-11-29 10:50:29 +08:00
CompactedTo : newSegmentInfo . GetID ( ) ,
CompactedFrom : newSegmentInfo . GetCompactionFrom ( ) ,
NumOfRows : newSegmentInfo . GetNumOfRows ( ) ,
StatsLogs : newSegmentInfo . GetStatslogs ( ) ,
2023-11-07 10:06:17 +08:00
ChannelName : plan . GetChannel ( ) ,
2023-11-29 10:50:29 +08:00
PartitionId : newSegmentInfo . GetPartitionID ( ) ,
CollectionId : newSegmentInfo . GetCollectionID ( ) ,
2022-09-27 16:02:53 +08:00
}
2022-10-25 19:31:30 +08:00
log . Info ( "handleCompactionResult: syncing segments with node" , zap . Int64 ( "nodeID" , nodeID ) )
2022-09-27 16:02:53 +08:00
if err := c . sessions . SyncSegments ( nodeID , req ) ; err != nil {
2023-11-29 10:50:29 +08:00
log . Warn ( "handleCompactionResult: fail to sync segments with node" ,
2023-03-17 17:27:56 +08:00
zap . Int64 ( "nodeID" , nodeID ) , zap . Error ( err ) )
return err
2022-09-27 16:02:53 +08:00
}
log . Info ( "handleCompactionResult: success to handle merge compaction result" )
return nil
2021-11-05 22:25:00 +08:00
}
// getCompaction looks up the task for planID under the read lock. The result
// is nil when the plan does not exist.
func (c *compactionPlanHandler) getCompaction(planID int64) *compactionTask {
	c.mu.RLock()
	defer c.mu.RUnlock()

	task := c.plans[planID]
	return task
}
// expireCompaction set the compaction state to expired
2022-08-23 15:50:52 +08:00
func ( c * compactionPlanHandler ) updateCompaction ( ts Timestamp ) error {
2023-08-30 11:12:27 +08:00
// Get executing executingTasks before GetCompactionState from DataNode to prevent false failure,
2023-08-01 08:55:04 +08:00
// for DC might add new task while GetCompactionState.
2023-08-30 11:12:27 +08:00
executingTasks := c . getTasksByState ( executing )
timeoutTasks := c . getTasksByState ( timeout )
2023-11-14 15:56:19 +08:00
planStates := c . sessions . GetCompactionPlansResults ( )
2022-08-23 15:50:52 +08:00
2021-11-05 22:25:00 +08:00
c . mu . Lock ( )
defer c . mu . Unlock ( )
2023-08-30 11:12:27 +08:00
for _ , task := range executingTasks {
2023-11-14 15:56:19 +08:00
planResult , ok := planStates [ task . plan . PlanID ]
state := planResult . GetState ( )
2022-08-23 15:50:52 +08:00
planID := task . plan . PlanID
2023-05-04 12:22:40 +08:00
// check whether the state of CompactionPlan is working
2022-08-23 15:50:52 +08:00
if ok {
if state == commonpb . CompactionState_Completed {
2023-11-07 03:18:18 +08:00
log . Info ( "complete compaction" , zap . Int64 ( "planID" , planID ) , zap . Int64 ( "nodeID" , task . dataNodeID ) )
2023-11-14 15:56:19 +08:00
err := c . completeCompaction ( planResult )
2023-06-19 14:18:41 +08:00
if err != nil {
log . Warn ( "fail to complete compaction" , zap . Int64 ( "planID" , planID ) , zap . Int64 ( "nodeID" , task . dataNodeID ) , zap . Error ( err ) )
}
2022-08-23 15:50:52 +08:00
continue
}
2022-08-26 14:22:55 +08:00
// check wether the CompactionPlan is timeout
if state == commonpb . CompactionState_Executing && ! c . isTimeout ( ts , task . plan . GetStartTime ( ) , task . plan . GetTimeoutInSeconds ( ) ) {
continue
}
2023-07-14 15:56:31 +08:00
log . Warn ( "compaction timeout" ,
2022-08-26 14:22:55 +08:00
zap . Int64 ( "planID" , task . plan . PlanID ) ,
zap . Int64 ( "nodeID" , task . dataNodeID ) ,
zap . Uint64 ( "startTime" , task . plan . GetStartTime ( ) ) ,
zap . Uint64 ( "now" , ts ) ,
)
2022-08-23 15:50:52 +08:00
c . plans [ planID ] = c . plans [ planID ] . shadowClone ( setState ( timeout ) )
2022-08-26 14:22:55 +08:00
continue
2021-11-05 22:25:00 +08:00
}
2022-08-26 14:22:55 +08:00
log . Info ( "compaction failed" , zap . Int64 ( "planID" , task . plan . PlanID ) , zap . Int64 ( "nodeID" , task . dataNodeID ) )
2022-08-23 15:50:52 +08:00
c . plans [ planID ] = c . plans [ planID ] . shadowClone ( setState ( failed ) )
2021-11-05 22:25:00 +08:00
c . setSegmentsCompacting ( task . plan , false )
2023-12-05 18:44:37 +08:00
c . scheduler . Finish ( task . dataNodeID , task . plan . PlanID )
2021-11-05 22:25:00 +08:00
}
2023-08-30 11:12:27 +08:00
// Timeout tasks will be timeout and failed in DataNode
// need to wait for DataNode reporting failure and
// clean the status.
for _ , task := range timeoutTasks {
stateResult , ok := planStates [ task . plan . PlanID ]
planID := task . plan . PlanID
if ! ok {
log . Info ( "compaction failed for timeout" , zap . Int64 ( "planID" , task . plan . PlanID ) , zap . Int64 ( "nodeID" , task . dataNodeID ) )
c . plans [ planID ] = c . plans [ planID ] . shadowClone ( setState ( failed ) )
c . setSegmentsCompacting ( task . plan , false )
2023-12-05 18:44:37 +08:00
c . scheduler . Finish ( task . dataNodeID , task . plan . PlanID )
2023-08-30 11:12:27 +08:00
}
// DataNode will check if plan's are timeout but not as sensitive as DataCoord,
// just wait another round.
if ok && stateResult . GetState ( ) == commonpb . CompactionState_Executing {
log . Info ( "compaction timeout in DataCoord yet DataNode is still running" ,
zap . Int64 ( "planID" , planID ) ,
zap . Int64 ( "nodeID" , task . dataNodeID ) )
continue
}
}
2021-11-05 22:25:00 +08:00
return nil
}
func ( c * compactionPlanHandler ) isTimeout ( now Timestamp , start Timestamp , timeout int32 ) bool {
2021-12-17 19:14:59 +08:00
startTime , _ := tsoutil . ParseTS ( start )
2021-11-05 22:25:00 +08:00
ts , _ := tsoutil . ParseTS ( now )
2021-12-17 19:14:59 +08:00
return int32 ( ts . Sub ( startTime ) . Seconds ( ) ) >= timeout
2021-11-05 22:25:00 +08:00
}
// isFull return true if the task pool is full
func ( c * compactionPlanHandler ) isFull ( ) bool {
2023-12-05 18:44:37 +08:00
return c . scheduler . GetTaskCount ( ) >= Params . DataCoordCfg . CompactionMaxParallelTasks . GetAsInt ( )
2021-11-05 22:25:00 +08:00
}
2023-08-30 11:12:27 +08:00
func ( c * compactionPlanHandler ) getTasksByState ( state compactionTaskState ) [ ] * compactionTask {
2023-08-01 08:55:04 +08:00
c . mu . RLock ( )
defer c . mu . RUnlock ( )
2021-11-05 22:25:00 +08:00
tasks := make ( [ ] * compactionTask , 0 , len ( c . plans ) )
for _ , plan := range c . plans {
2023-08-30 11:12:27 +08:00
if plan . state == state {
2021-11-05 22:25:00 +08:00
tasks = append ( tasks , plan )
}
}
return tasks
}
2022-08-23 15:50:52 +08:00
// get compaction tasks by signal id; if signalID == 0 return all tasks
2021-11-09 14:47:02 +08:00
func ( c * compactionPlanHandler ) getCompactionTasksBySignalID ( signalID int64 ) [ ] * compactionTask {
2021-11-05 22:25:00 +08:00
c . mu . RLock ( )
defer c . mu . RUnlock ( )
2021-11-09 14:47:02 +08:00
var tasks [ ] * compactionTask
2021-11-05 22:25:00 +08:00
for _ , t := range c . plans {
2022-08-23 15:50:52 +08:00
if signalID == 0 {
tasks = append ( tasks , t )
continue
}
2021-11-05 22:25:00 +08:00
if t . triggerInfo . id != signalID {
continue
}
2021-11-09 14:47:02 +08:00
tasks = append ( tasks , t )
2021-11-05 22:25:00 +08:00
}
2021-11-09 14:47:02 +08:00
return tasks
2021-11-05 22:25:00 +08:00
}
// compactionTaskOpt mutates a compactionTask; shadowClone applies such options
// to the copy it creates.
type compactionTaskOpt func(task *compactionTask)

// setState returns an option that sets the task's state.
func setState(state compactionTaskState) compactionTaskOpt {
	return func(t *compactionTask) {
		t.state = state
	}
}
2021-11-09 14:47:02 +08:00
2023-02-15 16:00:33 +08:00
// setStartTime returns an option that writes startTime into the task's plan.
// The write goes through the plan pointer, so it is visible to every task that
// shares that plan.
func setStartTime(startTime uint64) compactionTaskOpt {
	return func(t *compactionTask) {
		t.plan.StartTime = startTime
	}
}
2023-11-14 15:56:19 +08:00
func setResult ( result * datapb . CompactionPlanResult ) compactionTaskOpt {
2021-11-09 14:47:02 +08:00
return func ( task * compactionTask ) {
task . result = result
}
}
2022-08-23 15:50:52 +08:00
2023-12-22 12:00:43 +08:00
// cleanLogPath returns an option that clears the log-path fields kept in the
// task's plan and result, to avoid holding that memory after the task is done.
// Ranging over a nil slice is a no-op, so no explicit nil checks are needed.
func cleanLogPath() compactionTaskOpt {
	return func(t *compactionTask) {
		for _, seg := range t.plan.GetSegmentBinlogs() {
			seg.FieldBinlogs = nil
			seg.Field2StatslogPaths = nil
			seg.Deltalogs = nil
		}
		for _, seg := range t.result.GetSegments() {
			seg.InsertLogs = nil
			seg.Deltalogs = nil
			seg.Field2StatslogPaths = nil
		}
	}
}
2022-08-23 15:50:52 +08:00
// 0.5*min(8, NumCPU/2)
func calculateParallel ( ) int {
2023-07-24 10:17:00 +08:00
// TODO after node memory management enabled, use this config as hard limit
return Params . DataCoordCfg . CompactionWorkerParalleTasks . GetAsInt ( )
2023-11-06 06:02:16 +08:00
//cores := hardware.GetCPUNum()
2022-08-23 15:50:52 +08:00
//if cores < 16 {
//return 4
//}
//return cores / 2
}
2023-11-07 03:18:18 +08:00
// Lazily initialised worker pool shared by the compaction handler.
var (
	// ioPool is the pool itself; obtain it through getOrCreateIOPool.
	ioPool *conc.Pool[any]
	// ioPoolInitOnce makes sure ioPool is built only once.
	ioPoolInitOnce sync.Once
)
// initIOPool builds ioPool with the configured IO concurrency, capped at 32
// workers.
// NOTE(review): the capacity is read from DataNodeCfg although this is the
// datacoord package — confirm this is intended.
func initIOPool() {
	const maxIOPoolCapacity = 32

	size := Params.DataNodeCfg.IOConcurrency.GetAsInt()
	if size > maxIOPoolCapacity {
		size = maxIOPoolCapacity
	}
	ioPool = conc.NewPool[any](size)
}
// getOrCreateIOPool returns the shared IO pool, building it on first use.
func getOrCreateIOPool() *conc.Pool[any] {
	// initIOPool runs at most once, no matter how many callers race here.
	ioPoolInitOnce.Do(initIOPool)
	return ioPool
}