2021-12-01 19:33:32 +08:00
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2021-11-05 22:25:00 +08:00
package datacoord
import (
"context"
"fmt"
"sync"
"time"
2023-02-26 11:31:49 +08:00
"github.com/cockroachdb/errors"
2023-11-07 03:18:18 +08:00
"github.com/samber/lo"
2024-01-23 10:37:00 +08:00
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
2023-04-06 19:14:32 +08:00
"go.uber.org/zap"
2023-02-26 11:31:49 +08:00
2023-06-09 01:28:37 +08:00
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
2021-11-05 22:25:00 +08:00
"github.com/milvus-io/milvus/internal/proto/datapb"
2023-04-06 19:14:32 +08:00
"github.com/milvus-io/milvus/pkg/log"
2024-01-30 10:09:03 +08:00
"github.com/milvus-io/milvus/pkg/tracer"
2023-11-07 03:18:18 +08:00
"github.com/milvus-io/milvus/pkg/util/conc"
2024-03-06 21:36:59 +08:00
"github.com/milvus-io/milvus/pkg/util/merr"
2023-09-21 09:45:27 +08:00
"github.com/milvus-io/milvus/pkg/util/tsoutil"
2024-01-23 10:37:00 +08:00
"github.com/milvus-io/milvus/pkg/util/typeutil"
2021-11-05 22:25:00 +08:00
)
// TODO: this number should be determined by the resources of the DataNode; for now it is a fixed value for simplicity.
2022-06-15 23:14:10 +08:00
// TODO: split compaction into different priorities: small compactions help merge segments, while large compactions handle deltas and expiration of large segments.
2021-11-05 22:25:00 +08:00
const (
	// tsTimeout is a sentinel start timestamp. notifyTasks stores it as the
	// plan's start time when allocating a real timestamp fails.
	tsTimeout = uint64(1)
)
type compactionPlanContext interface {
start ( )
stop ( )
// execCompactionPlan start to execute plan and return immediately
2021-11-11 15:54:42 +08:00
execCompactionPlan ( signal * compactionSignal , plan * datapb . CompactionPlan ) error
2021-11-05 22:25:00 +08:00
// getCompaction return compaction task. If planId does not exist, return nil.
getCompaction ( planID int64 ) * compactionTask
2022-08-23 15:50:52 +08:00
// updateCompaction set the compaction state to timeout or completed
updateCompaction ( ts Timestamp ) error
2021-11-05 22:25:00 +08:00
// isFull return true if the task pool is full
isFull ( ) bool
2021-11-09 14:47:02 +08:00
// get compaction tasks by signal id
getCompactionTasksBySignalID ( signalID int64 ) [ ] * compactionTask
2023-11-29 10:50:29 +08:00
removeTasksByChannel ( channel string )
2021-11-05 22:25:00 +08:00
}
// compactionTaskState is the lifecycle state of a compactionTask.
type compactionTaskState int8

const (
	// executing: the plan has been dispatched by notifyTasks.
	executing compactionTaskState = iota + 1
	// pipelining: the plan has been enqueued and waits to be scheduled.
	pipelining
	// completed: the result has been applied by completeCompaction.
	completed
	// failed: the plan could not be refreshed, or it is no longer usable
	// according to updateCompaction.
	failed
	// timeout: the plan exceeded its timeout while still executing.
	timeout
)
// Sentinel errors describing channel states. They are not referenced by the
// code visible in this file.
var (
	errChannelNotWatched = errors.New("channel is not watched")
	errChannelInBuffer   = errors.New("channel is in buffer")
)
2023-12-05 18:44:37 +08:00
type CompactionMeta interface {
2024-05-08 21:37:29 +08:00
SelectSegments ( filters ... SegmentFilter ) [ ] * SegmentInfo
2023-12-05 18:44:37 +08:00
GetHealthySegment ( segID UniqueID ) * SegmentInfo
UpdateSegmentsInfo ( operators ... UpdateOperator ) error
SetSegmentCompacting ( segmentID int64 , compacting bool )
2024-02-19 20:52:50 +08:00
CompleteCompactionMutation ( plan * datapb . CompactionPlan , result * datapb . CompactionPlanResult ) ( [ ] * SegmentInfo , * segMetricMutation , error )
2023-12-05 18:44:37 +08:00
}
var _ CompactionMeta = ( * meta ) ( nil )
2021-11-05 22:25:00 +08:00
type compactionTask struct {
triggerInfo * compactionSignal
plan * datapb . CompactionPlan
state compactionTaskState
dataNodeID int64
2023-11-14 15:56:19 +08:00
result * datapb . CompactionPlanResult
2024-01-23 10:37:00 +08:00
span trace . Span
2021-11-05 22:25:00 +08:00
}
func ( t * compactionTask ) shadowClone ( opts ... compactionTaskOpt ) * compactionTask {
task := & compactionTask {
2021-11-11 15:54:42 +08:00
triggerInfo : t . triggerInfo ,
plan : t . plan ,
state : t . state ,
dataNodeID : t . dataNodeID ,
2024-01-23 10:37:00 +08:00
span : t . span ,
2021-11-05 22:25:00 +08:00
}
for _ , opt := range opts {
opt ( task )
}
return task
}
var _ compactionPlanContext = ( * compactionPlanHandler ) ( nil )
type compactionPlanHandler struct {
2023-11-29 10:50:29 +08:00
mu sync . RWMutex
plans map [ int64 ] * compactionTask // planID -> task
2023-12-05 18:44:37 +08:00
meta CompactionMeta
2023-11-07 03:18:18 +08:00
allocator allocator
2024-01-02 18:08:49 +08:00
chManager ChannelManager
2023-12-05 18:44:37 +08:00
scheduler Scheduler
2023-12-11 17:52:37 +08:00
sessions SessionManager
2023-11-29 10:50:29 +08:00
stopCh chan struct { }
stopOnce sync . Once
stopWg sync . WaitGroup
2021-11-05 22:25:00 +08:00
}
2024-01-02 18:08:49 +08:00
func newCompactionPlanHandler ( sessions SessionManager , cm ChannelManager , meta CompactionMeta , allocator allocator ,
2023-09-21 09:45:27 +08:00
) * compactionPlanHandler {
2021-11-05 22:25:00 +08:00
return & compactionPlanHandler {
2023-01-04 19:37:36 +08:00
plans : make ( map [ int64 ] * compactionTask ) ,
chManager : cm ,
meta : meta ,
sessions : sessions ,
allocator : allocator ,
2023-11-23 17:30:25 +08:00
scheduler : NewCompactionScheduler ( ) ,
2021-11-05 22:25:00 +08:00
}
}
2023-12-05 18:44:37 +08:00
func ( c * compactionPlanHandler ) checkResult ( ) {
// deal results
2023-12-22 12:00:43 +08:00
ts , err := c . GetCurrentTS ( )
if err != nil {
log . Warn ( "fail to check result" , zap . Error ( err ) )
return
}
2024-03-19 01:01:36 +08:00
err = c . updateCompaction ( ts )
if err != nil {
log . Warn ( "fail to update compaction" , zap . Error ( err ) )
return
}
2023-12-22 12:00:43 +08:00
}
func ( c * compactionPlanHandler ) GetCurrentTS ( ) ( Timestamp , error ) {
2023-12-13 18:48:39 +08:00
interval := Params . DataCoordCfg . CompactionRPCTimeout . GetAsDuration ( time . Second )
ctx , cancel := context . WithTimeout ( context . Background ( ) , interval )
2023-12-05 18:44:37 +08:00
defer cancel ( )
ts , err := c . allocator . allocTimestamp ( ctx )
if err != nil {
log . Warn ( "unable to alloc timestamp" , zap . Error ( err ) )
2023-12-22 12:00:43 +08:00
return 0 , err
2023-12-05 18:44:37 +08:00
}
2023-12-22 12:00:43 +08:00
return ts , nil
2023-12-05 18:44:37 +08:00
}
// schedule asks the scheduler for tasks that can run now, notifies them and
// logs the scheduler status. It does nothing when no task is runnable.
func (c *compactionPlanHandler) schedule() {
	runnable := c.scheduler.Schedule()
	if len(runnable) == 0 {
		return
	}
	c.notifyTasks(runnable)
	c.scheduler.LogStatus()
}
2021-11-05 22:25:00 +08:00
func ( c * compactionPlanHandler ) start ( ) {
2022-12-07 18:01:19 +08:00
interval := Params . DataCoordCfg . CompactionCheckIntervalInSeconds . GetAsDuration ( time . Second )
2023-11-29 10:50:29 +08:00
c . stopCh = make ( chan struct { } )
2023-12-22 12:00:43 +08:00
c . stopWg . Add ( 3 )
2021-11-05 22:25:00 +08:00
go func ( ) {
2023-11-29 10:50:29 +08:00
defer c . stopWg . Done ( )
2023-11-07 03:18:18 +08:00
checkResultTicker := time . NewTicker ( interval )
2023-11-29 10:50:29 +08:00
log . Info ( "Compaction handler check result loop start" , zap . Any ( "check result interval" , interval ) )
2023-11-07 03:18:18 +08:00
defer checkResultTicker . Stop ( )
2021-11-05 22:25:00 +08:00
for {
select {
2023-11-29 10:50:29 +08:00
case <- c . stopCh :
log . Info ( "compaction handler check result loop quit" )
2021-11-05 22:25:00 +08:00
return
2023-11-07 03:18:18 +08:00
case <- checkResultTicker . C :
2023-12-05 18:44:37 +08:00
c . checkResult ( )
2023-11-29 10:50:29 +08:00
}
}
} ( )
// saperate check results and schedule goroutine so that check results doesn't
// influence the schedule
go func ( ) {
defer c . stopWg . Done ( )
scheduleTicker := time . NewTicker ( 200 * time . Millisecond )
defer scheduleTicker . Stop ( )
log . Info ( "compaction handler start schedule" )
for {
select {
case <- c . stopCh :
log . Info ( "Compaction handler quit schedule" )
return
2023-11-07 03:18:18 +08:00
case <- scheduleTicker . C :
2023-12-05 18:44:37 +08:00
c . schedule ( )
2021-11-05 22:25:00 +08:00
}
}
} ( )
2023-12-22 12:00:43 +08:00
go func ( ) {
defer c . stopWg . Done ( )
cleanTicker := time . NewTicker ( 30 * time . Minute )
defer cleanTicker . Stop ( )
for {
select {
case <- c . stopCh :
log . Info ( "Compaction handler quit clean" )
return
case <- cleanTicker . C :
c . Clean ( )
}
}
} ( )
}
func ( c * compactionPlanHandler ) Clean ( ) {
current := tsoutil . GetCurrentTime ( )
c . mu . Lock ( )
defer c . mu . Unlock ( )
for id , task := range c . plans {
if task . state == executing || task . state == pipelining {
continue
}
// after timeout + 1h, the plan will be cleaned
if c . isTimeout ( current , task . plan . GetStartTime ( ) , task . plan . GetTimeoutInSeconds ( ) + 60 * 60 ) {
delete ( c . plans , id )
}
}
2021-11-05 22:25:00 +08:00
}
func ( c * compactionPlanHandler ) stop ( ) {
2023-11-29 10:50:29 +08:00
c . stopOnce . Do ( func ( ) {
close ( c . stopCh )
} )
c . stopWg . Wait ( )
}
func ( c * compactionPlanHandler ) removeTasksByChannel ( channel string ) {
c . mu . Lock ( )
defer c . mu . Unlock ( )
for id , task := range c . plans {
if task . triggerInfo . channel == channel {
2023-12-05 18:44:37 +08:00
log . Info ( "Compaction handler removing tasks by channel" ,
zap . String ( "channel" , channel ) ,
zap . Int64 ( "planID" , task . plan . GetPlanID ( ) ) ,
zap . Int64 ( "node" , task . dataNodeID ) ,
)
2023-12-28 15:46:55 +08:00
c . scheduler . Finish ( task . dataNodeID , task . plan )
2023-11-29 10:50:29 +08:00
delete ( c . plans , id )
}
}
2021-11-05 22:25:00 +08:00
}
2023-02-15 16:00:33 +08:00
func ( c * compactionPlanHandler ) updateTask ( planID int64 , opts ... compactionTaskOpt ) {
c . mu . Lock ( )
defer c . mu . Unlock ( )
2023-12-05 18:44:37 +08:00
if plan , ok := c . plans [ planID ] ; ok {
c . plans [ planID ] = plan . shadowClone ( opts ... )
2023-11-29 18:54:27 +08:00
}
2023-02-15 16:00:33 +08:00
}
2023-11-07 03:18:18 +08:00
func ( c * compactionPlanHandler ) enqueuePlan ( signal * compactionSignal , plan * datapb . CompactionPlan ) error {
2021-11-05 22:25:00 +08:00
nodeID , err := c . chManager . FindWatcher ( plan . GetChannel ( ) )
if err != nil {
2023-08-30 11:12:27 +08:00
log . Error ( "failed to find watcher" , zap . Int64 ( "planID" , plan . GetPlanID ( ) ) , zap . Error ( err ) )
2021-11-05 22:25:00 +08:00
return err
}
2023-08-30 11:12:27 +08:00
log := log . With ( zap . Int64 ( "planID" , plan . GetPlanID ( ) ) , zap . Int64 ( "nodeID" , nodeID ) )
2021-11-05 22:25:00 +08:00
c . setSegmentsCompacting ( plan , true )
2024-01-23 10:37:00 +08:00
_ , span := otel . Tracer ( typeutil . DataCoordRole ) . Start ( context . Background ( ) , fmt . Sprintf ( "Compaction-%s" , plan . GetType ( ) ) )
2022-08-26 14:22:55 +08:00
task := & compactionTask {
triggerInfo : signal ,
plan : plan ,
2023-02-15 16:00:33 +08:00
state : pipelining ,
2022-08-26 14:22:55 +08:00
dataNodeID : nodeID ,
2024-01-23 10:37:00 +08:00
span : span ,
2022-08-26 14:22:55 +08:00
}
2023-11-07 03:18:18 +08:00
c . mu . Lock ( )
2022-08-26 14:22:55 +08:00
c . plans [ plan . PlanID ] = task
2023-11-07 03:18:18 +08:00
c . mu . Unlock ( )
2022-08-26 14:22:55 +08:00
2023-11-23 17:30:25 +08:00
c . scheduler . Submit ( task )
2023-11-07 03:18:18 +08:00
log . Info ( "Compaction plan submited" )
2021-11-05 22:25:00 +08:00
return nil
}
2024-03-06 21:36:59 +08:00
func ( c * compactionPlanHandler ) RefreshPlan ( task * compactionTask ) error {
2023-11-23 17:30:25 +08:00
plan := task . plan
2023-12-05 18:44:37 +08:00
log := log . With ( zap . Int64 ( "taskID" , task . triggerInfo . id ) , zap . Int64 ( "planID" , plan . GetPlanID ( ) ) )
2023-11-23 17:30:25 +08:00
if plan . GetType ( ) == datapb . CompactionType_Level0DeleteCompaction {
2024-03-05 16:37:00 +08:00
// Fill in deltalogs for L0 segments
2024-03-06 21:36:59 +08:00
for _ , seg := range plan . GetSegmentBinlogs ( ) {
2024-03-05 16:37:00 +08:00
if seg . GetLevel ( ) == datapb . SegmentLevel_L0 {
segInfo := c . meta . GetHealthySegment ( seg . GetSegmentID ( ) )
2024-03-06 21:36:59 +08:00
if segInfo == nil {
return merr . WrapErrSegmentNotFound ( seg . GetSegmentID ( ) )
}
2024-03-05 16:37:00 +08:00
seg . Deltalogs = segInfo . GetDeltalogs ( )
}
2024-03-06 21:36:59 +08:00
}
2024-03-05 16:37:00 +08:00
// Select sealed L1 segments for LevelZero compaction that meets the condition:
// dmlPos < triggerInfo.pos
// TODO: select L2 segments too
2024-05-08 21:37:29 +08:00
sealedSegments := c . meta . SelectSegments ( WithCollection ( task . triggerInfo . collectionID ) , SegmentFilterFunc ( func ( info * SegmentInfo ) bool {
return ( task . triggerInfo . partitionID == - 1 || info . GetPartitionID ( ) == task . triggerInfo . partitionID ) &&
2023-11-23 17:30:25 +08:00
info . GetInsertChannel ( ) == plan . GetChannel ( ) &&
isFlushState ( info . GetState ( ) ) &&
! info . isCompacting &&
! info . GetIsImporting ( ) &&
info . GetLevel ( ) != datapb . SegmentLevel_L0 &&
info . GetDmlPosition ( ) . GetTimestamp ( ) < task . triggerInfo . pos . GetTimestamp ( )
2024-05-08 21:37:29 +08:00
} ) )
2024-03-19 10:13:14 +08:00
if len ( sealedSegments ) == 0 {
return errors . Errorf ( "Selected zero L1/L2 segments for the position=%v" , task . triggerInfo . pos )
}
2023-11-23 17:30:25 +08:00
sealedSegBinlogs := lo . Map ( sealedSegments , func ( info * SegmentInfo , _ int ) * datapb . CompactionSegmentBinlogs {
return & datapb . CompactionSegmentBinlogs {
2024-01-26 10:51:02 +08:00
SegmentID : info . GetID ( ) ,
Level : datapb . SegmentLevel_L1 ,
CollectionID : info . GetCollectionID ( ) ,
PartitionID : info . GetPartitionID ( ) ,
2023-11-23 17:30:25 +08:00
}
} )
plan . SegmentBinlogs = append ( plan . SegmentBinlogs , sealedSegBinlogs ... )
2024-03-05 16:37:00 +08:00
log . Info ( "Compaction handler refreshed level zero compaction plan" ,
zap . Any ( "target position" , task . triggerInfo . pos ) ,
zap . Any ( "target segments count" , len ( sealedSegBinlogs ) ) )
2024-03-06 21:36:59 +08:00
return nil
2023-11-23 17:30:25 +08:00
}
if plan . GetType ( ) == datapb . CompactionType_MixCompaction {
2024-03-01 11:31:00 +08:00
segIDMap := make ( map [ int64 ] [ ] * datapb . FieldBinlog , len ( plan . SegmentBinlogs ) )
2023-11-23 17:30:25 +08:00
for _ , seg := range plan . GetSegmentBinlogs ( ) {
2024-03-06 21:36:59 +08:00
info := c . meta . GetHealthySegment ( seg . GetSegmentID ( ) )
if info == nil {
return merr . WrapErrSegmentNotFound ( seg . GetSegmentID ( ) )
2023-12-05 18:44:37 +08:00
}
2024-03-06 21:36:59 +08:00
seg . Deltalogs = info . GetDeltalogs ( )
segIDMap [ seg . SegmentID ] = info . GetDeltalogs ( )
2023-11-23 17:30:25 +08:00
}
2024-03-01 11:31:00 +08:00
log . Info ( "Compaction handler refreshed mix compaction plan" , zap . Any ( "segID2DeltaLogs" , segIDMap ) )
2023-11-23 17:30:25 +08:00
}
2024-03-06 21:36:59 +08:00
return nil
2023-11-23 17:30:25 +08:00
}
2023-11-07 03:18:18 +08:00
func ( c * compactionPlanHandler ) notifyTasks ( tasks [ ] * compactionTask ) {
for _ , task := range tasks {
2023-11-14 16:38:18 +08:00
// avoid closure capture iteration variable
innerTask := task
2024-03-06 21:36:59 +08:00
err := c . RefreshPlan ( innerTask )
if err != nil {
c . updateTask ( innerTask . plan . GetPlanID ( ) , setState ( failed ) , endSpan ( ) )
c . scheduler . Finish ( innerTask . dataNodeID , innerTask . plan )
log . Warn ( "failed to refresh task" ,
zap . Int64 ( "plan" , task . plan . PlanID ) ,
zap . Error ( err ) )
continue
}
2023-11-07 03:18:18 +08:00
getOrCreateIOPool ( ) . Submit ( func ( ) ( any , error ) {
2024-01-30 10:09:03 +08:00
ctx := tracer . SetupSpan ( context . Background ( ) , innerTask . span )
2023-11-14 16:38:18 +08:00
plan := innerTask . plan
2024-01-30 10:09:03 +08:00
log := log . Ctx ( ctx ) . With ( zap . Int64 ( "planID" , plan . GetPlanID ( ) ) , zap . Int64 ( "nodeID" , innerTask . dataNodeID ) )
2023-11-07 03:18:18 +08:00
log . Info ( "Notify compaction task to DataNode" )
2024-01-30 10:09:03 +08:00
ts , err := c . allocator . allocTimestamp ( ctx )
2023-11-07 03:18:18 +08:00
if err != nil {
log . Warn ( "Alloc start time for CompactionPlan failed" , zap . Error ( err ) )
// update plan ts to TIMEOUT ts
c . updateTask ( plan . PlanID , setState ( executing ) , setStartTime ( tsTimeout ) )
return nil , err
}
2023-11-14 16:38:18 +08:00
c . updateTask ( plan . PlanID , setStartTime ( ts ) )
2024-01-23 10:37:00 +08:00
err = c . sessions . Compaction ( ctx , innerTask . dataNodeID , plan )
2023-11-14 16:38:18 +08:00
c . updateTask ( plan . PlanID , setState ( executing ) )
2023-11-07 03:18:18 +08:00
if err != nil {
log . Warn ( "Failed to notify compaction tasks to DataNode" , zap . Error ( err ) )
return nil , err
}
log . Info ( "Compaction start" )
return nil , nil
} )
}
}
// execCompactionPlan enqueues the plan for execution and returns immediately;
// the returned error is the one reported by enqueuePlan.
func (c *compactionPlanHandler) execCompactionPlan(signal *compactionSignal, plan *datapb.CompactionPlan) error {
	err := c.enqueuePlan(signal, plan)
	return err
}
2021-11-05 22:25:00 +08:00
// setSegmentsCompacting sets or clears the compacting flag in meta for every
// segment listed in the plan.
func (c *compactionPlanHandler) setSegmentsCompacting(plan *datapb.CompactionPlan, compacting bool) {
	segments := plan.GetSegmentBinlogs()
	for i := range segments {
		c.meta.SetSegmentCompacting(segments[i].GetSegmentID(), compacting)
	}
}
2022-08-23 15:50:52 +08:00
// complete a compaction task
// not threadsafe, only can be used internally
2023-11-14 15:56:19 +08:00
func ( c * compactionPlanHandler ) completeCompaction ( result * datapb . CompactionPlanResult ) error {
2021-11-05 22:25:00 +08:00
planID := result . PlanID
if _ , ok := c . plans [ planID ] ; ! ok {
return fmt . Errorf ( "plan %d is not found" , planID )
}
if c . plans [ planID ] . state != executing {
return fmt . Errorf ( "plan %d's state is %v" , planID , c . plans [ planID ] . state )
}
plan := c . plans [ planID ] . plan
2023-11-07 03:18:18 +08:00
nodeID := c . plans [ planID ] . dataNodeID
2023-12-28 15:46:55 +08:00
defer c . scheduler . Finish ( nodeID , plan )
2021-11-05 22:25:00 +08:00
switch plan . GetType ( ) {
2022-02-18 14:51:49 +08:00
case datapb . CompactionType_MergeCompaction , datapb . CompactionType_MixCompaction :
2022-08-25 15:48:54 +08:00
if err := c . handleMergeCompactionResult ( plan , result ) ; err != nil {
2021-11-05 22:25:00 +08:00
return err
}
2023-12-05 18:44:37 +08:00
case datapb . CompactionType_Level0DeleteCompaction :
if err := c . handleL0CompactionResult ( plan , result ) ; err != nil {
return err
}
2021-11-05 22:25:00 +08:00
default :
return errors . New ( "unknown compaction type" )
}
2023-11-14 15:56:19 +08:00
UpdateCompactionSegmentSizeMetrics ( result . GetSegments ( ) )
2024-01-23 10:37:00 +08:00
c . plans [ planID ] = c . plans [ planID ] . shadowClone ( setState ( completed ) , setResult ( result ) , cleanLogPath ( ) , endSpan ( ) )
2021-11-05 22:25:00 +08:00
return nil
}
2023-12-05 18:44:37 +08:00
func ( c * compactionPlanHandler ) handleL0CompactionResult ( plan * datapb . CompactionPlan , result * datapb . CompactionPlanResult ) error {
var operators [ ] UpdateOperator
for _ , seg := range result . GetSegments ( ) {
2024-03-20 17:53:14 +08:00
operators = append ( operators , AddBinlogsOperator ( seg . GetSegmentID ( ) , nil , nil , seg . GetDeltalogs ( ) ) )
2023-12-05 18:44:37 +08:00
}
levelZeroSegments := lo . Filter ( plan . GetSegmentBinlogs ( ) , func ( b * datapb . CompactionSegmentBinlogs , _ int ) bool {
return b . GetLevel ( ) == datapb . SegmentLevel_L0
} )
for _ , seg := range levelZeroSegments {
2024-01-09 10:52:49 +08:00
operators = append ( operators , UpdateStatusOperator ( seg . GetSegmentID ( ) , commonpb . SegmentState_Dropped ) , UpdateCompactedOperator ( seg . GetSegmentID ( ) ) )
2023-12-05 18:44:37 +08:00
}
log . Info ( "meta update: update segments info for level zero compaction" ,
zap . Int64 ( "planID" , plan . GetPlanID ( ) ) ,
)
return c . meta . UpdateSegmentsInfo ( operators ... )
}
2023-11-14 15:56:19 +08:00
func ( c * compactionPlanHandler ) handleMergeCompactionResult ( plan * datapb . CompactionPlan , result * datapb . CompactionPlanResult ) error {
2022-09-27 16:02:53 +08:00
log := log . With ( zap . Int64 ( "planID" , plan . GetPlanID ( ) ) )
2023-11-29 10:50:29 +08:00
if len ( result . GetSegments ( ) ) == 0 || len ( result . GetSegments ( ) ) > 1 {
// should never happen
log . Warn ( "illegal compaction results" )
return fmt . Errorf ( "Illegal compaction results: %v" , result )
}
2022-09-27 16:02:53 +08:00
2023-11-29 10:50:29 +08:00
// Merge compaction has one and only one segment
newSegmentInfo := c . meta . GetHealthySegment ( result . GetSegments ( ) [ 0 ] . SegmentID )
if newSegmentInfo != nil {
log . Info ( "meta has already been changed, skip meta change and retry sync segments" )
} else {
// Also prepare metric updates.
2024-02-19 20:52:50 +08:00
newSegments , metricMutation , err := c . meta . CompleteCompactionMutation ( plan , result )
2023-11-29 10:50:29 +08:00
if err != nil {
return err
}
// Apply metrics after successful meta update.
metricMutation . commit ( )
2024-02-19 20:52:50 +08:00
newSegmentInfo = newSegments [ 0 ]
2022-09-27 16:02:53 +08:00
}
2023-09-21 09:45:27 +08:00
nodeID := c . plans [ plan . GetPlanID ( ) ] . dataNodeID
2022-09-27 16:02:53 +08:00
req := & datapb . SyncSegmentsRequest {
PlanID : plan . PlanID ,
2023-11-29 10:50:29 +08:00
CompactedTo : newSegmentInfo . GetID ( ) ,
CompactedFrom : newSegmentInfo . GetCompactionFrom ( ) ,
NumOfRows : newSegmentInfo . GetNumOfRows ( ) ,
StatsLogs : newSegmentInfo . GetStatslogs ( ) ,
2023-11-07 10:06:17 +08:00
ChannelName : plan . GetChannel ( ) ,
2023-11-29 10:50:29 +08:00
PartitionId : newSegmentInfo . GetPartitionID ( ) ,
CollectionId : newSegmentInfo . GetCollectionID ( ) ,
2022-09-27 16:02:53 +08:00
}
2022-10-25 19:31:30 +08:00
log . Info ( "handleCompactionResult: syncing segments with node" , zap . Int64 ( "nodeID" , nodeID ) )
2022-09-27 16:02:53 +08:00
if err := c . sessions . SyncSegments ( nodeID , req ) ; err != nil {
2023-11-29 10:50:29 +08:00
log . Warn ( "handleCompactionResult: fail to sync segments with node" ,
2023-03-17 17:27:56 +08:00
zap . Int64 ( "nodeID" , nodeID ) , zap . Error ( err ) )
return err
2022-09-27 16:02:53 +08:00
}
log . Info ( "handleCompactionResult: success to handle merge compaction result" )
return nil
2021-11-05 22:25:00 +08:00
}
// getCompaction looks up the task tracked for planID under the read lock.
// It returns nil when the plan is unknown.
func (c *compactionPlanHandler) getCompaction(planID int64) *compactionTask {
	c.mu.RLock()
	task := c.plans[planID]
	c.mu.RUnlock()
	return task
}
2022-08-23 15:50:52 +08:00
func ( c * compactionPlanHandler ) updateCompaction ( ts Timestamp ) error {
2023-08-30 11:12:27 +08:00
// Get executing executingTasks before GetCompactionState from DataNode to prevent false failure,
2023-08-01 08:55:04 +08:00
// for DC might add new task while GetCompactionState.
2023-08-30 11:12:27 +08:00
executingTasks := c . getTasksByState ( executing )
timeoutTasks := c . getTasksByState ( timeout )
2024-03-19 01:01:36 +08:00
planStates , err := c . sessions . GetCompactionPlansResults ( )
if err != nil {
// if there is a data node alive but we failed to get info,
log . Warn ( "failed to get compaction plans from all nodes" , zap . Error ( err ) )
return err
}
2024-03-01 11:31:00 +08:00
cachedPlans := [ ] int64 { }
2022-08-23 15:50:52 +08:00
2024-03-01 11:31:00 +08:00
// TODO reduce the lock range
2021-11-05 22:25:00 +08:00
c . mu . Lock ( )
2023-08-30 11:12:27 +08:00
for _ , task := range executingTasks {
2024-04-10 15:09:18 +08:00
log := log . With (
zap . Int64 ( "planID" , task . plan . PlanID ) ,
zap . Int64 ( "nodeID" , task . dataNodeID ) ,
zap . String ( "channel" , task . plan . GetChannel ( ) ) )
2022-08-23 15:50:52 +08:00
planID := task . plan . PlanID
2024-03-01 11:31:00 +08:00
cachedPlans = append ( cachedPlans , planID )
2024-04-10 15:09:18 +08:00
2024-03-01 11:31:00 +08:00
if nodePlan , ok := planStates [ planID ] ; ok {
planResult := nodePlan . B
switch planResult . GetState ( ) {
case commonpb . CompactionState_Completed :
log . Info ( "start to complete compaction" )
2024-04-10 15:09:18 +08:00
// channels are balanced to other nodes, yet the old datanode still have the compaction results
// task.dataNodeID == planState.A, but
// task.dataNodeID not match with channel
// Mark this compaction as failure and skip processing the meta
if ! c . chManager . Match ( task . dataNodeID , task . plan . GetChannel ( ) ) {
// Sync segments without CompactionFrom segmentsIDs to make sure DN clear the task
// without changing the meta
log . Warn ( "compaction failed for channel nodeID not match" )
if err := c . sessions . SyncSegments ( task . dataNodeID , & datapb . SyncSegmentsRequest { PlanID : planID } ) ; err != nil {
log . Warn ( "compaction failed to sync segments with node" , zap . Error ( err ) )
continue
}
c . plans [ planID ] = c . plans [ planID ] . shadowClone ( setState ( failed ) , endSpan ( ) )
c . setSegmentsCompacting ( task . plan , false )
c . scheduler . Finish ( task . dataNodeID , task . plan )
}
2024-03-01 11:31:00 +08:00
if err := c . completeCompaction ( planResult ) ; err != nil {
log . Warn ( "fail to complete compaction" , zap . Error ( err ) )
}
case commonpb . CompactionState_Executing :
if c . isTimeout ( ts , task . plan . GetStartTime ( ) , task . plan . GetTimeoutInSeconds ( ) ) {
log . Warn ( "compaction timeout" ,
zap . Int32 ( "timeout in seconds" , task . plan . GetTimeoutInSeconds ( ) ) ,
zap . Uint64 ( "startTime" , task . plan . GetStartTime ( ) ) ,
zap . Uint64 ( "now" , ts ) ,
)
c . plans [ planID ] = c . plans [ planID ] . shadowClone ( setState ( timeout ) , endSpan ( ) )
2023-06-19 14:18:41 +08:00
}
2022-08-26 14:22:55 +08:00
}
2024-03-01 11:31:00 +08:00
} else {
// compaction task in DC but not found in DN means the compaction plan has failed
log . Info ( "compaction failed" )
c . plans [ planID ] = c . plans [ planID ] . shadowClone ( setState ( failed ) , endSpan ( ) )
c . setSegmentsCompacting ( task . plan , false )
c . scheduler . Finish ( task . dataNodeID , task . plan )
2021-11-05 22:25:00 +08:00
}
}
2023-08-30 11:12:27 +08:00
// Timeout tasks will be timeout and failed in DataNode
2024-03-01 11:31:00 +08:00
// need to wait for DataNode reporting failure and clean the status.
2023-08-30 11:12:27 +08:00
for _ , task := range timeoutTasks {
2024-04-10 15:09:18 +08:00
log := log . With (
zap . Int64 ( "planID" , task . plan . PlanID ) ,
zap . Int64 ( "nodeID" , task . dataNodeID ) ,
zap . String ( "channel" , task . plan . GetChannel ( ) ) ,
)
2023-08-30 11:12:27 +08:00
planID := task . plan . PlanID
2024-03-01 11:31:00 +08:00
cachedPlans = append ( cachedPlans , planID )
if nodePlan , ok := planStates [ planID ] ; ok {
if nodePlan . B . GetState ( ) == commonpb . CompactionState_Executing {
log . RatedInfo ( 1 , "compaction timeout in DataCoord yet DataNode is still running" )
}
} else {
// compaction task in DC but not found in DN means the compaction plan has failed
log . Info ( "compaction failed for timeout" )
2024-01-23 10:37:00 +08:00
c . plans [ planID ] = c . plans [ planID ] . shadowClone ( setState ( failed ) , endSpan ( ) )
2023-08-30 11:12:27 +08:00
c . setSegmentsCompacting ( task . plan , false )
2023-12-28 15:46:55 +08:00
c . scheduler . Finish ( task . dataNodeID , task . plan )
2023-08-30 11:12:27 +08:00
}
2024-03-01 11:31:00 +08:00
}
c . mu . Unlock ( )
2023-08-30 11:12:27 +08:00
2024-03-01 11:31:00 +08:00
// Compaction plans in DN but not in DC are unknown plans, need to notify DN to clear it.
// No locks needed, because no changes in DC memeory
completedPlans := lo . PickBy ( planStates , func ( planID int64 , planState * typeutil . Pair [ int64 , * datapb . CompactionPlanResult ] ) bool {
return planState . B . GetState ( ) == commonpb . CompactionState_Completed
} )
unkonwnPlansInWorker , _ := lo . Difference ( lo . Keys ( completedPlans ) , cachedPlans )
for _ , planID := range unkonwnPlansInWorker {
if nodeUnkonwnPlan , ok := completedPlans [ planID ] ; ok {
nodeID , plan := nodeUnkonwnPlan . A , nodeUnkonwnPlan . B
2024-04-10 15:09:18 +08:00
log := log . With ( zap . Int64 ( "planID" , planID ) , zap . Int64 ( "nodeID" , nodeID ) , zap . String ( "channel" , plan . GetChannel ( ) ) )
2024-03-01 11:31:00 +08:00
// Sync segments without CompactionFrom segmentsIDs to make sure DN clear the task
// without changing the meta
2024-04-10 15:09:18 +08:00
log . Info ( "compaction syncing unknown plan with node" )
if err := c . sessions . SyncSegments ( nodeID , & datapb . SyncSegmentsRequest {
2024-03-01 11:31:00 +08:00
PlanID : planID ,
ChannelName : plan . GetChannel ( ) ,
2024-04-10 15:09:18 +08:00
} ) ; err != nil {
2024-03-01 11:31:00 +08:00
log . Warn ( "compaction failed to sync segments with node" , zap . Error ( err ) )
return err
}
2023-08-30 11:12:27 +08:00
}
}
2024-03-01 11:31:00 +08:00
2021-11-05 22:25:00 +08:00
return nil
}
func ( c * compactionPlanHandler ) isTimeout ( now Timestamp , start Timestamp , timeout int32 ) bool {
2021-12-17 19:14:59 +08:00
startTime , _ := tsoutil . ParseTS ( start )
2021-11-05 22:25:00 +08:00
ts , _ := tsoutil . ParseTS ( now )
2021-12-17 19:14:59 +08:00
return int32 ( ts . Sub ( startTime ) . Seconds ( ) ) >= timeout
2021-11-05 22:25:00 +08:00
}
// isFull return true if the task pool is full
func ( c * compactionPlanHandler ) isFull ( ) bool {
2023-12-05 18:44:37 +08:00
return c . scheduler . GetTaskCount ( ) >= Params . DataCoordCfg . CompactionMaxParallelTasks . GetAsInt ( )
2021-11-05 22:25:00 +08:00
}
2023-08-30 11:12:27 +08:00
func ( c * compactionPlanHandler ) getTasksByState ( state compactionTaskState ) [ ] * compactionTask {
2023-08-01 08:55:04 +08:00
c . mu . RLock ( )
defer c . mu . RUnlock ( )
2021-11-05 22:25:00 +08:00
tasks := make ( [ ] * compactionTask , 0 , len ( c . plans ) )
for _ , plan := range c . plans {
2023-08-30 11:12:27 +08:00
if plan . state == state {
2021-11-05 22:25:00 +08:00
tasks = append ( tasks , plan )
}
}
return tasks
}
2022-08-23 15:50:52 +08:00
// get compaction tasks by signal id; if signalID == 0 return all tasks
2021-11-09 14:47:02 +08:00
func ( c * compactionPlanHandler ) getCompactionTasksBySignalID ( signalID int64 ) [ ] * compactionTask {
2021-11-05 22:25:00 +08:00
c . mu . RLock ( )
defer c . mu . RUnlock ( )
2021-11-09 14:47:02 +08:00
var tasks [ ] * compactionTask
2021-11-05 22:25:00 +08:00
for _ , t := range c . plans {
2022-08-23 15:50:52 +08:00
if signalID == 0 {
tasks = append ( tasks , t )
continue
}
2021-11-05 22:25:00 +08:00
if t . triggerInfo . id != signalID {
continue
}
2021-11-09 14:47:02 +08:00
tasks = append ( tasks , t )
2021-11-05 22:25:00 +08:00
}
2021-11-09 14:47:02 +08:00
return tasks
2021-11-05 22:25:00 +08:00
}
// compactionTaskOpt mutates a compactionTask in place; shadowClone applies
// such options to the freshly created copy.
type compactionTaskOpt func(task *compactionTask)
// setState returns an option that sets the task's state.
func setState(state compactionTaskState) compactionTaskOpt {
	apply := func(t *compactionTask) {
		t.state = state
	}
	return apply
}
2021-11-09 14:47:02 +08:00
2024-01-23 10:37:00 +08:00
// endSpan returns an option that ends the task's tracing span, if it has one.
func endSpan() compactionTaskOpt {
	return func(t *compactionTask) {
		if t.span == nil {
			return
		}
		t.span.End()
	}
}
2023-02-15 16:00:33 +08:00
// setStartTime returns an option that writes startTime into the task's plan.
// The write goes through the task's plan pointer, which shadowClone copies
// as-is, so it is also visible through the task that was cloned.
func setStartTime(startTime uint64) compactionTaskOpt {
	apply := func(t *compactionTask) {
		t.plan.StartTime = startTime
	}
	return apply
}
2023-11-14 15:56:19 +08:00
func setResult ( result * datapb . CompactionPlanResult ) compactionTaskOpt {
2021-11-09 14:47:02 +08:00
return func ( task * compactionTask ) {
task . result = result
}
}
2022-08-23 15:50:52 +08:00
2023-12-22 12:00:43 +08:00
// cleanLogPath returns an option that clears the binlog, statslog and
// deltalog references held by the task's plan and result, to avoid keeping
// that memory alive for finished tasks.
func cleanLogPath() compactionTaskOpt {
	return func(task *compactionTask) {
		// Ranging over a nil slice is a no-op, so no explicit nil checks are needed.
		for _, planSegment := range task.plan.GetSegmentBinlogs() {
			planSegment.FieldBinlogs = nil
			planSegment.Field2StatslogPaths = nil
			planSegment.Deltalogs = nil
		}
		for _, resultSegment := range task.result.GetSegments() {
			resultSegment.InsertLogs = nil
			resultSegment.Deltalogs = nil
			resultSegment.Field2StatslogPaths = nil
		}
	}
}
2022-08-23 15:50:52 +08:00
// 0.5*min(8, NumCPU/2)
func calculateParallel ( ) int {
2023-07-24 10:17:00 +08:00
// TODO after node memory management enabled, use this config as hard limit
return Params . DataCoordCfg . CompactionWorkerParalleTasks . GetAsInt ( )
2023-11-06 06:02:16 +08:00
//cores := hardware.GetCPUNum()
2022-08-23 15:50:52 +08:00
//if cores < 16 {
//return 4
//}
//return cores / 2
}
2023-11-07 03:18:18 +08:00
// ioPool is the shared pool returned by getOrCreateIOPool; ioPoolInitOnce
// ensures initIOPool creates it only once.
var (
	ioPool         *conc.Pool[any]
	ioPoolInitOnce sync.Once
)
// initIOPool creates the shared IO pool. Its capacity is the configured
// DataNode IOConcurrency, capped at 32.
func initIOPool() {
	const maxIOPoolCapacity = 32

	size := Params.DataNodeCfg.IOConcurrency.GetAsInt()
	if size > maxIOPoolCapacity {
		size = maxIOPoolCapacity
	}
	ioPool = conc.NewPool[any](size)
}
// getOrCreateIOPool returns the shared IO pool, creating it on first use.
func getOrCreateIOPool() *conc.Pool[any] {
	ioPoolInitOnce.Do(func() {
		initIOPool()
	})
	return ioPool
}