2021-12-01 19:33:32 +08:00
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2021-11-05 22:25:00 +08:00
package datacoord
import (
"context"
"fmt"
"sync"
"time"
2023-02-26 11:31:49 +08:00
"github.com/cockroachdb/errors"
2023-04-06 19:14:32 +08:00
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"go.uber.org/zap"
2023-02-26 11:31:49 +08:00
2023-06-09 01:28:37 +08:00
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
2021-11-05 22:25:00 +08:00
"github.com/milvus-io/milvus/internal/proto/datapb"
2023-04-06 19:14:32 +08:00
"github.com/milvus-io/milvus/pkg/log"
2023-06-26 17:52:44 +08:00
"github.com/milvus-io/milvus/pkg/metrics"
2021-11-05 22:25:00 +08:00
)
// TODO: this number should be determined by the resources of the datanode; for now it is set to a fixed value for simplicity.
2022-06-15 23:14:10 +08:00
// TODO: we should split compaction into different priorities: small compactions help to merge segments, while large compactions help to handle deltas and the expiration of large segments.
2021-11-05 22:25:00 +08:00
// Tuning constants for the compaction plan handler.
const (
	// maxParallelCompactionTaskNum is the threshold isFull compares
	// executingTaskNum against.
	maxParallelCompactionTaskNum = 100

	// rpcCompactionTimeout is a ten second timeout. It is not referenced in
	// this file; presumably it bounds compaction RPCs elsewhere in the
	// package -- TODO confirm.
	rpcCompactionTimeout time.Duration = 10 * time.Second

	// tsTimeout is the placeholder start timestamp execCompactionPlan stores
	// on a plan when no real timestamp could be allocated.
	tsTimeout uint64 = 1
)
type compactionPlanContext interface {
start ( )
stop ( )
// execCompactionPlan start to execute plan and return immediately
2021-11-11 15:54:42 +08:00
execCompactionPlan ( signal * compactionSignal , plan * datapb . CompactionPlan ) error
2021-11-05 22:25:00 +08:00
// getCompaction return compaction task. If planId does not exist, return nil.
getCompaction ( planID int64 ) * compactionTask
2022-08-23 15:50:52 +08:00
// updateCompaction set the compaction state to timeout or completed
updateCompaction ( ts Timestamp ) error
2021-11-05 22:25:00 +08:00
// isFull return true if the task pool is full
isFull ( ) bool
2021-11-09 14:47:02 +08:00
// get compaction tasks by signal id
getCompactionTasksBySignalID ( signalID int64 ) [ ] * compactionTask
2021-11-05 22:25:00 +08:00
}
// compactionTaskState is the lifecycle state of a compactionTask.
type compactionTaskState int8

// Task states. Numbering starts at 1, so the zero value of
// compactionTaskState is not a valid state.
const (
	// executing (1): the plan is being tracked as running.
	executing compactionTaskState = iota + 1
	// pipelining (2): the plan is registered but has not been dispatched yet.
	pipelining
	// completed (3): the compaction result has been applied.
	completed
	// failed (4): the plan failed and its resources were released.
	failed
	// timeout (5): the plan exceeded its allowed running time.
	timeout
)
// Sentinel errors describing why a channel cannot be used. Neither is
// referenced in this file; presumably they are returned by channel lookup
// code elsewhere in the package -- TODO confirm.
var (
	// errChannelNotWatched: the channel is not watched.
	errChannelNotWatched = errors.New("channel is not watched")
	// errChannelInBuffer: the channel is in the buffer.
	errChannelInBuffer = errors.New("channel is in buffer")
)
type compactionTask struct {
triggerInfo * compactionSignal
plan * datapb . CompactionPlan
state compactionTaskState
dataNodeID int64
2021-11-09 14:47:02 +08:00
result * datapb . CompactionResult
2021-11-05 22:25:00 +08:00
}
func ( t * compactionTask ) shadowClone ( opts ... compactionTaskOpt ) * compactionTask {
task := & compactionTask {
2021-11-11 15:54:42 +08:00
triggerInfo : t . triggerInfo ,
plan : t . plan ,
state : t . state ,
dataNodeID : t . dataNodeID ,
2021-11-05 22:25:00 +08:00
}
for _ , opt := range opts {
opt ( task )
}
return task
}
var _ compactionPlanContext = ( * compactionPlanHandler ) ( nil )
type compactionPlanHandler struct {
2021-12-21 09:19:10 +08:00
plans map [ int64 ] * compactionTask // planID -> task
2021-11-05 22:25:00 +08:00
sessions * SessionManager
meta * meta
chManager * ChannelManager
mu sync . RWMutex
executingTaskNum int
allocator allocator
quit chan struct { }
wg sync . WaitGroup
flushCh chan UniqueID
2023-01-04 19:37:36 +08:00
//segRefer *SegmentReferenceManager
parallelCh map [ int64 ] chan struct { }
2021-11-05 22:25:00 +08:00
}
func newCompactionPlanHandler ( sessions * SessionManager , cm * ChannelManager , meta * meta ,
2023-01-04 19:37:36 +08:00
allocator allocator , flush chan UniqueID ) * compactionPlanHandler {
2021-11-05 22:25:00 +08:00
return & compactionPlanHandler {
2023-01-04 19:37:36 +08:00
plans : make ( map [ int64 ] * compactionTask ) ,
chManager : cm ,
meta : meta ,
sessions : sessions ,
allocator : allocator ,
flushCh : flush ,
//segRefer: segRefer,
2022-08-23 15:50:52 +08:00
parallelCh : make ( map [ int64 ] chan struct { } ) ,
2021-11-05 22:25:00 +08:00
}
}
func ( c * compactionPlanHandler ) start ( ) {
2022-12-07 18:01:19 +08:00
interval := Params . DataCoordCfg . CompactionCheckIntervalInSeconds . GetAsDuration ( time . Second )
2021-11-05 22:25:00 +08:00
c . quit = make ( chan struct { } )
c . wg . Add ( 1 )
go func ( ) {
defer c . wg . Done ( )
2023-02-23 18:59:45 +08:00
ticker := time . NewTicker ( interval )
defer ticker . Stop ( )
2021-11-05 22:25:00 +08:00
for {
select {
case <- c . quit :
log . Info ( "compaction handler quit" )
return
case <- ticker . C :
cctx , cancel := context . WithTimeout ( context . Background ( ) , 5 * time . Second )
ts , err := c . allocator . allocTimestamp ( cctx )
if err != nil {
log . Warn ( "unable to alloc timestamp" , zap . Error ( err ) )
cancel ( )
continue
}
cancel ( )
2022-08-23 15:50:52 +08:00
_ = c . updateCompaction ( ts )
2021-11-05 22:25:00 +08:00
}
}
} ( )
}
// stop signals the background loop to exit and waits until it has done so.
// It closes the quit channel that start creates, so it must be called after
// start and at most once; closing a nil or already closed channel panics.
func (c *compactionPlanHandler) stop() {
	// Closing quit wakes the select in the loop started by start.
	close(c.quit)

	// Block until that loop has returned.
	c.wg.Wait()
}
2023-02-15 16:00:33 +08:00
// updateTask replaces the task stored under planID with a shadow clone of
// itself that has opts applied. It takes the write lock itself, so callers
// must not already hold c.mu.
func (c *compactionPlanHandler) updateTask(planID int64, opts ...compactionTaskOpt) {
	c.mu.Lock()
	defer c.mu.Unlock()

	updated := c.plans[planID].shadowClone(opts...)
	c.plans[planID] = updated
}
2021-11-05 22:25:00 +08:00
// execCompactionPlan start to execute plan and return immediately
2021-11-11 15:54:42 +08:00
func ( c * compactionPlanHandler ) execCompactionPlan ( signal * compactionSignal , plan * datapb . CompactionPlan ) error {
2021-11-05 22:25:00 +08:00
c . mu . Lock ( )
defer c . mu . Unlock ( )
nodeID , err := c . chManager . FindWatcher ( plan . GetChannel ( ) )
if err != nil {
2022-11-15 11:13:07 +08:00
log . Error ( "failed to find watcher" ,
zap . Int64 ( "plan ID" , plan . GetPlanID ( ) ) ,
zap . Error ( err ) )
2021-11-05 22:25:00 +08:00
return err
}
c . setSegmentsCompacting ( plan , true )
2022-08-26 14:22:55 +08:00
task := & compactionTask {
triggerInfo : signal ,
plan : plan ,
2023-02-15 16:00:33 +08:00
state : pipelining ,
2022-08-26 14:22:55 +08:00
dataNodeID : nodeID ,
}
c . plans [ plan . PlanID ] = task
c . executingTaskNum ++
2022-08-23 15:50:52 +08:00
go func ( ) {
2022-10-25 19:31:30 +08:00
log . Info ( "acquire queue" , zap . Int64 ( "nodeID" , nodeID ) , zap . Int64 ( "planID" , plan . GetPlanID ( ) ) )
2022-08-23 15:50:52 +08:00
c . acquireQueue ( nodeID )
2021-11-05 22:25:00 +08:00
2022-08-23 15:50:52 +08:00
ts , err := c . allocator . allocTimestamp ( context . TODO ( ) )
if err != nil {
log . Warn ( "Alloc start time for CompactionPlan failed" , zap . Int64 ( "planID" , plan . GetPlanID ( ) ) )
2023-02-07 19:02:31 +08:00
// update plan ts to TIMEOUT ts
2023-02-15 16:00:33 +08:00
c . updateTask ( plan . PlanID , setState ( executing ) , setStartTime ( tsTimeout ) )
2022-08-23 15:50:52 +08:00
return
}
2023-02-15 16:00:33 +08:00
c . updateTask ( plan . PlanID , setStartTime ( ts ) )
2022-08-23 15:50:52 +08:00
err = c . sessions . Compaction ( nodeID , plan )
2023-02-15 16:00:33 +08:00
c . updateTask ( plan . PlanID , setState ( executing ) )
2022-08-23 15:50:52 +08:00
if err != nil {
2022-12-06 20:09:18 +08:00
log . Warn ( "try to Compaction but DataNode rejected" ,
zap . Int64 ( "targetNodeID" , nodeID ) ,
zap . Int64 ( "planID" , plan . GetPlanID ( ) ) ,
)
// do nothing here, prevent double release, see issue#21014
// release queue will be done in `updateCompaction`
2022-08-23 15:50:52 +08:00
return
}
2022-10-25 19:31:30 +08:00
log . Info ( "start compaction" , zap . Int64 ( "nodeID" , nodeID ) , zap . Int64 ( "planID" , plan . GetPlanID ( ) ) )
2022-08-23 15:50:52 +08:00
} ( )
2021-11-05 22:25:00 +08:00
return nil
}
// setSegmentsCompacting sets the compacting flag in meta to the given value
// for every segment listed in the plan's segment binlogs.
func (c *compactionPlanHandler) setSegmentsCompacting(plan *datapb.CompactionPlan, compacting bool) {
	binlogs := plan.GetSegmentBinlogs()
	for i := range binlogs {
		segmentID := binlogs[i].GetSegmentID()
		c.meta.SetSegmentCompacting(segmentID, compacting)
	}
}
2022-08-23 15:50:52 +08:00
// complete a compaction task
// not threadsafe, only can be used internally
2021-11-05 22:25:00 +08:00
func ( c * compactionPlanHandler ) completeCompaction ( result * datapb . CompactionResult ) error {
planID := result . PlanID
if _ , ok := c . plans [ planID ] ; ! ok {
return fmt . Errorf ( "plan %d is not found" , planID )
}
if c . plans [ planID ] . state != executing {
return fmt . Errorf ( "plan %d's state is %v" , planID , c . plans [ planID ] . state )
}
plan := c . plans [ planID ] . plan
switch plan . GetType ( ) {
2022-02-18 14:51:49 +08:00
case datapb . CompactionType_MergeCompaction , datapb . CompactionType_MixCompaction :
2022-08-25 15:48:54 +08:00
if err := c . handleMergeCompactionResult ( plan , result ) ; err != nil {
2021-11-05 22:25:00 +08:00
return err
}
default :
return errors . New ( "unknown compaction type" )
}
2021-11-11 15:54:42 +08:00
c . plans [ planID ] = c . plans [ planID ] . shadowClone ( setState ( completed ) , setResult ( result ) )
2021-11-05 22:25:00 +08:00
c . executingTaskNum --
2022-02-18 14:51:49 +08:00
if c . plans [ planID ] . plan . GetType ( ) == datapb . CompactionType_MergeCompaction ||
c . plans [ planID ] . plan . GetType ( ) == datapb . CompactionType_MixCompaction {
2021-11-05 22:25:00 +08:00
c . flushCh <- result . GetSegmentID ( )
}
// TODO: when to clean task list
2022-08-23 15:50:52 +08:00
nodeID := c . plans [ planID ] . dataNodeID
c . releaseQueue ( nodeID )
2023-06-26 17:52:44 +08:00
metrics . DataCoordCompactedSegmentSize . WithLabelValues ( ) . Observe ( float64 ( getCompactedSegmentSize ( result ) ) )
2021-11-05 22:25:00 +08:00
return nil
}
2022-08-25 15:48:54 +08:00
func ( c * compactionPlanHandler ) handleMergeCompactionResult ( plan * datapb . CompactionPlan , result * datapb . CompactionResult ) error {
2022-11-22 19:21:13 +08:00
// Also prepare metric updates.
2023-03-17 17:27:56 +08:00
_ , modSegments , newSegment , metricMutation , err := c . meta . PrepareCompleteCompactionMutation ( plan . GetSegmentBinlogs ( ) , result )
2022-11-17 20:37:10 +08:00
if err != nil {
return err
}
2022-09-27 16:02:53 +08:00
log := log . With ( zap . Int64 ( "planID" , plan . GetPlanID ( ) ) )
2023-03-17 17:27:56 +08:00
if err := c . meta . alterMetaStoreAfterCompaction ( newSegment , modSegments ) ; err != nil {
log . Warn ( "fail to alert meta store" , zap . Error ( err ) )
return err
2022-09-27 16:02:53 +08:00
}
var nodeID = c . plans [ plan . GetPlanID ( ) ] . dataNodeID
req := & datapb . SyncSegmentsRequest {
PlanID : plan . PlanID ,
CompactedTo : newSegment . GetID ( ) ,
CompactedFrom : newSegment . GetCompactionFrom ( ) ,
NumOfRows : newSegment . GetNumOfRows ( ) ,
StatsLogs : newSegment . GetStatslogs ( ) ,
}
2022-10-25 19:31:30 +08:00
log . Info ( "handleCompactionResult: syncing segments with node" , zap . Int64 ( "nodeID" , nodeID ) )
2022-09-27 16:02:53 +08:00
if err := c . sessions . SyncSegments ( nodeID , req ) ; err != nil {
log . Warn ( "handleCompactionResult: fail to sync segments with node, reverting metastore" ,
2023-03-17 17:27:56 +08:00
zap . Int64 ( "nodeID" , nodeID ) , zap . Error ( err ) )
return err
2022-09-27 16:02:53 +08:00
}
2022-11-22 19:21:13 +08:00
// Apply metrics after successful meta update.
metricMutation . commit ( )
2022-09-27 16:02:53 +08:00
log . Info ( "handleCompactionResult: success to handle merge compaction result" )
return nil
2021-11-05 22:25:00 +08:00
}
// getCompaction looks up the task registered under planID while holding the
// read lock. The result is nil when the plan ID is unknown.
func (c *compactionPlanHandler) getCompaction(planID int64) *compactionTask {
	c.mu.RLock()
	defer c.mu.RUnlock()

	task := c.plans[planID]
	return task
}
// expireCompaction set the compaction state to expired
2022-08-23 15:50:52 +08:00
func ( c * compactionPlanHandler ) updateCompaction ( ts Timestamp ) error {
planStates := c . sessions . GetCompactionState ( )
2021-11-05 22:25:00 +08:00
c . mu . Lock ( )
defer c . mu . Unlock ( )
tasks := c . getExecutingCompactions ( )
for _ , task := range tasks {
2022-08-23 15:50:52 +08:00
stateResult , ok := planStates [ task . plan . PlanID ]
state := stateResult . GetState ( )
planID := task . plan . PlanID
2023-05-04 12:22:40 +08:00
// check whether the state of CompactionPlan is working
2022-08-23 15:50:52 +08:00
if ok {
if state == commonpb . CompactionState_Completed {
2022-08-26 14:22:55 +08:00
log . Info ( "compaction completed" , zap . Int64 ( "planID" , planID ) , zap . Int64 ( "nodeID" , task . dataNodeID ) )
2023-06-19 14:18:41 +08:00
err := c . completeCompaction ( stateResult . GetResult ( ) )
if err != nil {
log . Warn ( "fail to complete compaction" , zap . Int64 ( "planID" , planID ) , zap . Int64 ( "nodeID" , task . dataNodeID ) , zap . Error ( err ) )
}
2022-08-23 15:50:52 +08:00
continue
}
2022-08-26 14:22:55 +08:00
// check wether the CompactionPlan is timeout
if state == commonpb . CompactionState_Executing && ! c . isTimeout ( ts , task . plan . GetStartTime ( ) , task . plan . GetTimeoutInSeconds ( ) ) {
continue
}
log . Info ( "compaction timeout" ,
zap . Int64 ( "planID" , task . plan . PlanID ) ,
zap . Int64 ( "nodeID" , task . dataNodeID ) ,
zap . Uint64 ( "startTime" , task . plan . GetStartTime ( ) ) ,
zap . Uint64 ( "now" , ts ) ,
)
2022-08-23 15:50:52 +08:00
c . plans [ planID ] = c . plans [ planID ] . shadowClone ( setState ( timeout ) )
2022-08-26 14:22:55 +08:00
continue
2021-11-05 22:25:00 +08:00
}
2022-08-26 14:22:55 +08:00
log . Info ( "compaction failed" , zap . Int64 ( "planID" , task . plan . PlanID ) , zap . Int64 ( "nodeID" , task . dataNodeID ) )
2022-08-23 15:50:52 +08:00
c . plans [ planID ] = c . plans [ planID ] . shadowClone ( setState ( failed ) )
2021-11-05 22:25:00 +08:00
c . setSegmentsCompacting ( task . plan , false )
c . executingTaskNum --
2022-08-23 15:50:52 +08:00
c . releaseQueue ( task . dataNodeID )
2021-11-05 22:25:00 +08:00
}
return nil
}
func ( c * compactionPlanHandler ) isTimeout ( now Timestamp , start Timestamp , timeout int32 ) bool {
2021-12-17 19:14:59 +08:00
startTime , _ := tsoutil . ParseTS ( start )
2021-11-05 22:25:00 +08:00
ts , _ := tsoutil . ParseTS ( now )
2021-12-17 19:14:59 +08:00
return int32 ( ts . Sub ( startTime ) . Seconds ( ) ) >= timeout
2021-11-05 22:25:00 +08:00
}
2022-08-23 15:50:52 +08:00
// acquireQueue takes one slot in the queue of the given node, blocking until
// a slot is free.
//
// The node's queue is a buffered channel with capacity calculateParallel(),
// created on first use. The channel is looked up (or created) under a single
// write lock; the send that may block happens after the lock is released, so
// waiting for a slot never blocks other users of c.mu. Callers must not hold
// c.mu.
func (c *compactionPlanHandler) acquireQueue(nodeID int64) {
	c.mu.Lock()
	ch, ok := c.parallelCh[nodeID]
	if !ok {
		ch = make(chan struct{}, calculateParallel())
		c.parallelCh[nodeID] = ch
	}
	c.mu.Unlock()

	ch <- struct{}{}
}
func ( c * compactionPlanHandler ) releaseQueue ( nodeID int64 ) {
2022-10-25 19:31:30 +08:00
log . Info ( "try to release queue" , zap . Int64 ( "nodeID" , nodeID ) )
2022-08-23 15:50:52 +08:00
ch , ok := c . parallelCh [ nodeID ]
if ! ok {
return
}
<- ch
}
2021-11-05 22:25:00 +08:00
// isFull reports whether the number of tasks counted in executingTaskNum has
// reached maxParallelCompactionTaskNum. The counter is read under the read
// lock.
func (c *compactionPlanHandler) isFull() bool {
	c.mu.RLock()
	running := c.executingTaskNum
	c.mu.RUnlock()

	return running >= maxParallelCompactionTaskNum
}
// getExecutingCompactions collects every task whose state is executing. The
// order of the returned slice follows map iteration and is therefore
// unspecified. No lock is taken here.
func (c *compactionPlanHandler) getExecutingCompactions() []*compactionTask {
	running := make([]*compactionTask, 0, len(c.plans))
	for _, task := range c.plans {
		if task.state != executing {
			continue
		}
		running = append(running, task)
	}
	return running
}
2022-08-23 15:50:52 +08:00
// get compaction tasks by signal id; if signalID == 0 return all tasks
2021-11-09 14:47:02 +08:00
func ( c * compactionPlanHandler ) getCompactionTasksBySignalID ( signalID int64 ) [ ] * compactionTask {
2021-11-05 22:25:00 +08:00
c . mu . RLock ( )
defer c . mu . RUnlock ( )
2021-11-09 14:47:02 +08:00
var tasks [ ] * compactionTask
2021-11-05 22:25:00 +08:00
for _ , t := range c . plans {
2022-08-23 15:50:52 +08:00
if signalID == 0 {
tasks = append ( tasks , t )
continue
}
2021-11-05 22:25:00 +08:00
if t . triggerInfo . id != signalID {
continue
}
2021-11-09 14:47:02 +08:00
tasks = append ( tasks , t )
2021-11-05 22:25:00 +08:00
}
2021-11-09 14:47:02 +08:00
return tasks
2021-11-05 22:25:00 +08:00
}
// compactionTaskOpt is a functional option that modifies a compactionTask in
// place; shadowClone applies such options to the copy it creates.
type compactionTaskOpt func(task *compactionTask)
// setState returns an option that sets the state of a task to the given value.
func setState(state compactionTaskState) compactionTaskOpt {
	return func(target *compactionTask) {
		target.state = state
	}
}
2021-11-09 14:47:02 +08:00
2023-02-15 16:00:33 +08:00
// setStartTime returns an option that stores startTime as the StartTime of
// the task's plan. The write goes through the task's plan pointer, so every
// task sharing that plan observes the new start time.
func setStartTime(startTime uint64) compactionTaskOpt {
	return func(target *compactionTask) {
		target.plan.StartTime = startTime
	}
}
2021-11-09 14:47:02 +08:00
// setResult returns an option that attaches the given compaction result to a task.
func setResult(result *datapb.CompactionResult) compactionTaskOpt {
	return func(target *compactionTask) {
		target.result = result
	}
}
2022-08-23 15:50:52 +08:00
// calculateParallel returns the capacity used for each per-node compaction
// queue. The value is currently fixed at 2.
//
// A CPU-based sizing was used at some point and is kept here, disabled, for
// reference:
//
//	cores := runtime.NumCPU()
//	if cores < 16 {
//		return 4
//	}
//	return cores / 2
func calculateParallel() int {
	const slotsPerNode = 2
	return slotsPerNode
}