Add scheduler (#27938)

See also: #27606
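The scheduler queues compaction plans via submit(); a 200ms ticker drives schedule(), which picks at most one task per DataNode, honoring the per-node parallel limit (calculateParallel) and giving Level0DeleteCompaction tasks exclusive use of their channel. Picked tasks are pushed to DataNodes asynchronously on a shared IO pool, and finish() releases the node's slot when a plan completes, fails, or times out.

A minimal usage sketch (illustrative only; it assumes the surrounding datacoord package types touched in this diff):

    s := newScheduler()
    // Queue a plan for DataNode 1; nothing is dispatched until schedule() runs.
    s.submit(&compactionTask{
        dataNodeID: 1,
        plan:       &datapb.CompactionPlan{PlanID: 10, Channel: "ch-1", Type: datapb.CompactionType_MixCompaction},
    })
    // Pick at most one task per node, honoring the parallel limit and
    // L0 channel exclusivity; the handler then calls notifyTasks(picked).
    picked := s.schedule()
    // Release each node's slot once its plan completes, fails, or times out.
    for _, t := range picked {
        s.finish(t.dataNodeID, t.plan.PlanID)
    }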

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
XuanYang-cn 2023-11-07 03:18:18 +08:00 committed by GitHub
parent ece592a42f
commit 22c089894e
3 changed files with 491 additions and 217 deletions


@@ -23,13 +23,17 @@ import (
"time"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"go.uber.org/atomic"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
// TODO: this number should be determined by the resources of the DataNode; for now, set it to a fixed value for simplicity
@@ -97,13 +101,162 @@ type compactionPlanHandler struct {
meta *meta
chManager *ChannelManager
mu sync.RWMutex
executingTaskNum int
allocator allocator
quit chan struct{}
wg sync.WaitGroup
flushCh chan UniqueID
// segRefer *SegmentReferenceManager
parallelCh map[int64]chan struct{}
scheduler *scheduler
}
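// scheduler queues compaction tasks and picks, for each DataNode, the next task
// to run. A node runs at most calculateParallel() tasks at a time. Channels with
// a running or queued Level0DeleteCompaction task are treated as exclusive: the
// L0 task starts only when nothing else is compacting its channel, and until
// then no other queued task on that channel is picked.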
type scheduler struct {
taskNumber *atomic.Int32
queuingTasks []*compactionTask
parallelTasks map[int64][]*compactionTask
mu sync.RWMutex
}
func newScheduler() *scheduler {
return &scheduler{
taskNumber: atomic.NewInt32(0),
queuingTasks: make([]*compactionTask, 0),
parallelTasks: make(map[int64][]*compactionTask),
}
}
// schedule picks at most one task per node
func (s *scheduler) schedule() []*compactionTask {
nodeTasks := make(map[int64][]*compactionTask) // nodeID
s.mu.Lock()
defer s.mu.Unlock()
for _, task := range s.queuingTasks {
if _, ok := nodeTasks[task.dataNodeID]; !ok {
nodeTasks[task.dataNodeID] = make([]*compactionTask, 0)
}
nodeTasks[task.dataNodeID] = append(nodeTasks[task.dataNodeID], task)
}
executable := make(map[int64]*compactionTask)
pickPriorPolicy := func(tasks []*compactionTask, exclusiveChannels []string, executing []string) *compactionTask {
for _, task := range tasks {
if lo.Contains(exclusiveChannels, task.plan.GetChannel()) {
continue
}
if task.plan.GetType() == datapb.CompactionType_Level0DeleteCompaction {
// A LevelZeroCompaction task runs only when nothing else is compacting on its channel
if !lo.Contains(executing, task.plan.GetChannel()) {
return task
}
// While compactions are executing on this channel, don't schedule any other
// task on it; the LevelZeroCompaction task must go first
exclusiveChannels = append(exclusiveChannels, task.plan.GetChannel())
continue
}
return task
}
return nil
}
// pick at most one task per node
for node, tasks := range nodeTasks {
parallel := s.parallelTasks[node]
if len(parallel) >= calculateParallel() {
log.Info("Compaction parallel in DataNode reaches the limit", zap.Int64("nodeID", node), zap.Int("parallel", len(parallel)))
continue
}
var (
executing = typeutil.NewSet[string]()
channelsExecPrior = typeutil.NewSet[string]()
)
for _, t := range parallel {
executing.Insert(t.plan.GetChannel())
if t.plan.GetType() == datapb.CompactionType_Level0DeleteCompaction {
channelsExecPrior.Insert(t.plan.GetChannel())
}
}
picked := pickPriorPolicy(tasks, channelsExecPrior.Collect(), executing.Collect())
if picked != nil {
executable[node] = picked
}
}
var pickPlans []int64
for node, task := range executable {
pickPlans = append(pickPlans, task.plan.PlanID)
if _, ok := s.parallelTasks[node]; !ok {
s.parallelTasks[node] = []*compactionTask{task}
} else {
s.parallelTasks[node] = append(s.parallelTasks[node], task)
}
}
s.queuingTasks = lo.Filter(s.queuingTasks, func(t *compactionTask, _ int) bool {
return !lo.Contains(pickPlans, t.plan.PlanID)
})
// remove nodes that have no running tasks from parallelTasks
for node, tasks := range s.parallelTasks {
if len(tasks) == 0 {
delete(s.parallelTasks, node)
}
}
return lo.Values(executable)
}
func (s *scheduler) finish(nodeID, planID UniqueID) {
s.mu.Lock()
if parallel, ok := s.parallelTasks[nodeID]; ok {
tasks := lo.Filter(parallel, func(t *compactionTask, _ int) bool {
return t.plan.PlanID != planID
})
s.parallelTasks[nodeID] = tasks
s.taskNumber.Dec()
}
s.mu.Unlock()
log.Info("Compaction finished", zap.Int64("planID", planID), zap.Int64("nodeID", nodeID))
s.logStatus()
}
func (s *scheduler) logStatus() {
s.mu.RLock()
defer s.mu.RUnlock()
waiting := lo.Map(s.queuingTasks, func(t *compactionTask, _ int) int64 {
return t.plan.PlanID
})
var executing []int64
for _, tasks := range s.parallelTasks {
executing = append(executing, lo.Map(tasks, func(t *compactionTask, _ int) int64 {
return t.plan.PlanID
})...)
}
if len(waiting) > 0 || len(executing) > 0 {
log.Info("Compaction scheduler status", zap.Int64s("waiting", waiting), zap.Int64s("executing", executing))
}
}
func (s *scheduler) submit(tasks ...*compactionTask) {
s.mu.Lock()
s.queuingTasks = append(s.queuingTasks, tasks...)
s.mu.Unlock()
s.taskNumber.Add(int32(len(tasks)))
s.logStatus()
}
func (s *scheduler) getExecutingTaskNum() int {
return int(s.taskNumber.Load())
}
func newCompactionPlanHandler(sessions *SessionManager, cm *ChannelManager, meta *meta,
@@ -116,8 +269,7 @@ func newCompactionPlanHandler(sessions *SessionManager, cm *ChannelManager, meta
sessions: sessions,
allocator: allocator,
flushCh: flush,
// segRefer: segRefer,
parallelCh: make(map[int64]chan struct{}),
scheduler: newScheduler(),
}
}
@@ -128,14 +280,18 @@ func (c *compactionPlanHandler) start() {
go func() {
defer c.wg.Done()
ticker := time.NewTicker(interval)
defer ticker.Stop()
checkResultTicker := time.NewTicker(interval)
scheduleTicker := time.NewTicker(200 * time.Millisecond)
log.Info("compaction handler start", zap.Any("check result interval", interval))
defer checkResultTicker.Stop()
defer scheduleTicker.Stop()
for {
select {
case <-c.quit:
log.Info("compaction handler quit")
return
case <-ticker.C:
case <-checkResultTicker.C:
// handle compaction results
cctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
ts, err := c.allocator.allocTimestamp(cctx)
if err != nil {
@@ -145,6 +301,15 @@ func (c *compactionPlanHandler) start() {
}
cancel()
_ = c.updateCompaction(ts)
case <-scheduleTicker.C:
// schedule queuing tasks
tasks := c.scheduler.schedule()
c.notifyTasks(tasks)
if len(tasks) > 0 {
c.scheduler.logStatus()
}
}
}
}()
@@ -161,11 +326,7 @@ func (c *compactionPlanHandler) updateTask(planID int64, opts ...compactionTaskO
c.plans[planID] = c.plans[planID].shadowClone(opts...)
}
// execCompactionPlan starts executing the plan and returns immediately
func (c *compactionPlanHandler) execCompactionPlan(signal *compactionSignal, plan *datapb.CompactionPlan) error {
c.mu.Lock()
defer c.mu.Unlock()
func (c *compactionPlanHandler) enqueuePlan(signal *compactionSignal, plan *datapb.CompactionPlan) error {
nodeID, err := c.chManager.FindWatcher(plan.GetChannel())
if err != nil {
log.Error("failed to find watcher", zap.Int64("planID", plan.GetPlanID()), zap.Error(err))
@@ -181,32 +342,44 @@ func (c *compactionPlanHandler) execCompactionPlan(signal *compactionSignal, pla
state: pipelining,
dataNodeID: nodeID,
}
c.mu.Lock()
c.plans[plan.PlanID] = task
c.executingTaskNum++
c.mu.Unlock()
go func() {
log.Info("acquire queue")
c.acquireQueue(nodeID)
c.scheduler.submit(task)
log.Info("Compaction plan submited")
return nil
}
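// notifyTasks dispatches scheduled tasks asynchronously on the shared IO pool:
// for each task it allocates a start timestamp, sends the plan to the target
// DataNode, and marks the task as executing, so the scheduling loop never
// blocks on RPCs.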
func (c *compactionPlanHandler) notifyTasks(tasks []*compactionTask) {
for _, task := range tasks {
getOrCreateIOPool().Submit(func() (any, error) {
plan := task.plan
log := log.With(zap.Int64("planID", plan.GetPlanID()), zap.Int64("nodeID", task.dataNodeID))
log.Info("Notify compaction task to DataNode")
ts, err := c.allocator.allocTimestamp(context.TODO())
if err != nil {
log.Warn("Alloc start time for CompactionPlan failed", zap.Error(err))
// update plan ts to TIMEOUT ts
c.updateTask(plan.PlanID, setState(executing), setStartTime(tsTimeout))
return
return nil, err
}
c.updateTask(plan.PlanID, setStartTime(ts))
err = c.sessions.Compaction(nodeID, plan)
c.updateTask(plan.PlanID, setState(executing))
c.updateTask(task.plan.PlanID, setStartTime(ts))
err = c.sessions.Compaction(task.dataNodeID, task.plan)
c.updateTask(task.plan.PlanID, setState(executing))
if err != nil {
log.Warn("try to Compaction but DataNode rejected", zap.Error(err))
// do nothing here, prevent double release, see issue#21014
// release queue will be done in `updateCompaction`
return
log.Warn("Failed to notify compaction tasks to DataNode", zap.Error(err))
return nil, err
}
log.Info("start compaction")
}()
return nil
log.Info("Compaction start")
return nil, nil
})
}
}
// execCompactionPlan starts executing the plan and returns immediately
func (c *compactionPlanHandler) execCompactionPlan(signal *compactionSignal, plan *datapb.CompactionPlan) error {
return c.enqueuePlan(signal, plan)
}
func (c *compactionPlanHandler) setSegmentsCompacting(plan *datapb.CompactionPlan, compacting bool) {
@@ -228,6 +401,8 @@ func (c *compactionPlanHandler) completeCompaction(result *datapb.CompactionResu
}
plan := c.plans[planID].plan
nodeID := c.plans[planID].dataNodeID
defer c.scheduler.finish(nodeID, plan.PlanID)
switch plan.GetType() {
case datapb.CompactionType_MergeCompaction, datapb.CompactionType_MixCompaction:
if err := c.handleMergeCompactionResult(plan, result); err != nil {
@@ -237,16 +412,12 @@ func (c *compactionPlanHandler) completeCompaction(result *datapb.CompactionResu
return errors.New("unknown compaction type")
}
c.plans[planID] = c.plans[planID].shadowClone(setState(completed), setResult(result))
c.executingTaskNum--
if c.plans[planID].plan.GetType() == datapb.CompactionType_MergeCompaction ||
c.plans[planID].plan.GetType() == datapb.CompactionType_MixCompaction {
c.flushCh <- result.GetSegmentID()
}
// TODO: when to clean task list
nodeID := c.plans[planID].dataNodeID
c.releaseQueue(nodeID)
metrics.DataCoordCompactedSegmentSize.WithLabelValues().Observe(float64(getCompactedSegmentSize(result)))
return nil
}
@@ -311,7 +482,7 @@ func (c *compactionPlanHandler) updateCompaction(ts Timestamp) error {
// check whether the state of CompactionPlan is working
if ok {
if state == commonpb.CompactionState_Completed {
log.Info("compaction completed", zap.Int64("planID", planID), zap.Int64("nodeID", task.dataNodeID))
log.Info("complete compaction", zap.Int64("planID", planID), zap.Int64("nodeID", task.dataNodeID))
err := c.completeCompaction(stateResult.GetResult())
if err != nil {
log.Warn("fail to complete compaction", zap.Int64("planID", planID), zap.Int64("nodeID", task.dataNodeID), zap.Error(err))
@@ -335,8 +506,7 @@ func (c *compactionPlanHandler) updateCompaction(ts Timestamp) error {
log.Info("compaction failed", zap.Int64("planID", task.plan.PlanID), zap.Int64("nodeID", task.dataNodeID))
c.plans[planID] = c.plans[planID].shadowClone(setState(failed))
c.setSegmentsCompacting(task.plan, false)
c.executingTaskNum--
c.releaseQueue(task.dataNodeID)
c.scheduler.finish(task.dataNodeID, task.plan.PlanID)
}
// Timed-out tasks will also time out and fail in the DataNode
@@ -350,8 +520,7 @@ func (c *compactionPlanHandler) updateCompaction(ts Timestamp) error {
log.Info("compaction failed for timeout", zap.Int64("planID", task.plan.PlanID), zap.Int64("nodeID", task.dataNodeID))
c.plans[planID] = c.plans[planID].shadowClone(setState(failed))
c.setSegmentsCompacting(task.plan, false)
c.executingTaskNum--
c.releaseQueue(task.dataNodeID)
c.scheduler.finish(task.dataNodeID, task.plan.PlanID)
}
// DataNode will check if plans are timed out, but it's not as sensitive as DataCoord,
@@ -372,35 +541,9 @@ func (c *compactionPlanHandler) isTimeout(now Timestamp, start Timestamp, timeou
return int32(ts.Sub(startTime).Seconds()) >= timeout
}
func (c *compactionPlanHandler) acquireQueue(nodeID int64) {
c.mu.Lock()
_, ok := c.parallelCh[nodeID]
if !ok {
c.parallelCh[nodeID] = make(chan struct{}, calculateParallel())
}
c.mu.Unlock()
c.mu.RLock()
ch := c.parallelCh[nodeID]
c.mu.RUnlock()
ch <- struct{}{}
}
func (c *compactionPlanHandler) releaseQueue(nodeID int64) {
log.Info("try to release queue", zap.Int64("nodeID", nodeID))
ch, ok := c.parallelCh[nodeID]
if !ok {
return
}
<-ch
}
// isFull returns true if the task pool is full
func (c *compactionPlanHandler) isFull() bool {
c.mu.RLock()
defer c.mu.RUnlock()
return c.executingTaskNum >= Params.DataCoordCfg.CompactionMaxParallelTasks.GetAsInt()
return c.scheduler.getExecutingTaskNum() >= Params.DataCoordCfg.CompactionMaxParallelTasks.GetAsInt()
}
func (c *compactionPlanHandler) getTasksByState(state compactionTaskState) []*compactionTask {
@@ -464,3 +607,22 @@ func calculateParallel() int {
//}
//return cores / 2
}
var (
ioPool *conc.Pool[any]
ioPoolInitOnce sync.Once
)
func initIOPool() {
capacity := Params.DataNodeCfg.IOConcurrency.GetAsInt()
if capacity > 32 {
capacity = 32
}
// error only happens with negative expiry duration or with negative pre-alloc size.
ioPool = conc.NewPool[any](capacity)
}
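// getOrCreateIOPool lazily initializes the shared pool on first use (capacity
// from DataNodeCfg.IOConcurrency, capped at 32) and is safe for concurrent use.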
func getOrCreateIOPool() *conc.Pool[any] {
ioPoolInitOnce.Do(initIOPool)
return ioPool
}


@@ -23,10 +23,11 @@ import (
"time"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"github.com/stretchr/testify/suite"
"google.golang.org/grpc"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
@@ -34,13 +35,178 @@ import (
"github.com/milvus-io/milvus/internal/metastore/kv/datacoord"
"github.com/milvus-io/milvus/internal/mocks"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/metautil"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
func TestSchedulerSuite(t *testing.T) {
suite.Run(t, new(SchedulerSuite))
}
type SchedulerSuite struct {
suite.Suite
scheduler *scheduler
}
func (s *SchedulerSuite) SetupTest() {
s.scheduler = newScheduler()
s.scheduler.parallelTasks = map[int64][]*compactionTask{
100: {
{dataNodeID: 100, plan: &datapb.CompactionPlan{PlanID: 1, Channel: "ch-1", Type: datapb.CompactionType_MinorCompaction}},
{dataNodeID: 100, plan: &datapb.CompactionPlan{PlanID: 2, Channel: "ch-1", Type: datapb.CompactionType_MinorCompaction}},
},
101: {
{dataNodeID: 101, plan: &datapb.CompactionPlan{PlanID: 3, Channel: "ch-2", Type: datapb.CompactionType_MinorCompaction}},
},
102: {
{dataNodeID: 102, plan: &datapb.CompactionPlan{PlanID: 4, Channel: "ch-3", Type: datapb.CompactionType_Level0DeleteCompaction}},
},
}
s.scheduler.taskNumber.Add(4)
}
func (s *SchedulerSuite) TestScheduleEmpty() {
emptySch := newScheduler()
tasks := emptySch.schedule()
s.Empty(tasks)
s.Equal(0, emptySch.getExecutingTaskNum())
s.Empty(emptySch.queuingTasks)
s.Empty(emptySch.parallelTasks)
}
func (s *SchedulerSuite) TestScheduleParallelTaskFull() {
// dataNode 100's parallelTasks is full
tests := []struct {
description string
tasks []*compactionTask
expectedOut []UniqueID // planID
}{
{"with L0 tasks", []*compactionTask{
{dataNodeID: 100, plan: &datapb.CompactionPlan{PlanID: 10, Channel: "ch-10", Type: datapb.CompactionType_Level0DeleteCompaction}},
{dataNodeID: 100, plan: &datapb.CompactionPlan{PlanID: 11, Channel: "ch-11", Type: datapb.CompactionType_MinorCompaction}},
}, []UniqueID{}},
{"without L0 tasks", []*compactionTask{
{dataNodeID: 100, plan: &datapb.CompactionPlan{PlanID: 10, Channel: "ch-10", Type: datapb.CompactionType_MinorCompaction}},
{dataNodeID: 100, plan: &datapb.CompactionPlan{PlanID: 11, Channel: "ch-11", Type: datapb.CompactionType_MinorCompaction}},
}, []UniqueID{}},
{"empty tasks", []*compactionTask{}, []UniqueID{}},
}
for _, test := range tests {
s.Run(test.description, func() {
s.SetupTest()
s.Require().Equal(4, s.scheduler.getExecutingTaskNum())
// submit the testing tasks
s.scheduler.submit(test.tasks...)
s.Equal(4+len(test.tasks), s.scheduler.getExecutingTaskNum())
gotTasks := s.scheduler.schedule()
s.Equal(test.expectedOut, lo.Map(gotTasks, func(t *compactionTask, _ int) int64 {
return t.plan.PlanID
}))
})
}
}
func (s *SchedulerSuite) TestScheduleNodeWith1ParallelTask() {
// dataNode 101's parallelTasks has 1 non-L0 task running
tests := []struct {
description string
tasks []*compactionTask
expectedOut []UniqueID // planID
}{
{"with L0 tasks diff channel", []*compactionTask{
{dataNodeID: 101, plan: &datapb.CompactionPlan{PlanID: 10, Channel: "ch-10", Type: datapb.CompactionType_Level0DeleteCompaction}},
{dataNodeID: 101, plan: &datapb.CompactionPlan{PlanID: 11, Channel: "ch-11", Type: datapb.CompactionType_MinorCompaction}},
}, []UniqueID{10}},
{"with L0 tasks same channel", []*compactionTask{
{dataNodeID: 101, plan: &datapb.CompactionPlan{PlanID: 10, Channel: "ch-2", Type: datapb.CompactionType_Level0DeleteCompaction}},
{dataNodeID: 101, plan: &datapb.CompactionPlan{PlanID: 11, Channel: "ch-11", Type: datapb.CompactionType_MinorCompaction}},
}, []UniqueID{11}},
{"without L0 tasks", []*compactionTask{
{dataNodeID: 101, plan: &datapb.CompactionPlan{PlanID: 14, Channel: "ch-2", Type: datapb.CompactionType_MinorCompaction}},
{dataNodeID: 101, plan: &datapb.CompactionPlan{PlanID: 13, Channel: "ch-11", Type: datapb.CompactionType_MinorCompaction}},
}, []UniqueID{14}},
{"empty tasks", []*compactionTask{}, []UniqueID{}},
}
for _, test := range tests {
s.Run(test.description, func() {
s.SetupTest()
s.Require().Equal(4, s.scheduler.getExecutingTaskNum())
// submit the testing tasks
s.scheduler.submit(test.tasks...)
s.Equal(4+len(test.tasks), s.scheduler.getExecutingTaskNum())
gotTasks := s.scheduler.schedule()
s.Equal(test.expectedOut, lo.Map(gotTasks, func(t *compactionTask, _ int) int64 {
return t.plan.PlanID
}))
// the second schedule returns empty because parallelTasks is full
gotTasks = s.scheduler.schedule()
s.Empty(gotTasks)
s.Equal(4+len(test.tasks), s.scheduler.getExecutingTaskNum())
})
}
}
func (s *SchedulerSuite) TestScheduleNodeWithL0Executing() {
// dataNode 102's parallelTasks has a running L0 task;
// nothing on the same channel can be scheduled
tests := []struct {
description string
tasks []*compactionTask
expectedOut []UniqueID // planID
}{
{"with L0 tasks diff channel", []*compactionTask{
{dataNodeID: 102, plan: &datapb.CompactionPlan{PlanID: 10, Channel: "ch-10", Type: datapb.CompactionType_Level0DeleteCompaction}},
{dataNodeID: 102, plan: &datapb.CompactionPlan{PlanID: 11, Channel: "ch-11", Type: datapb.CompactionType_MinorCompaction}},
}, []UniqueID{10}},
{"with L0 tasks same channel", []*compactionTask{
{dataNodeID: 102, plan: &datapb.CompactionPlan{PlanID: 10, Channel: "ch-3", Type: datapb.CompactionType_Level0DeleteCompaction}},
{dataNodeID: 102, plan: &datapb.CompactionPlan{PlanID: 11, Channel: "ch-11", Type: datapb.CompactionType_MinorCompaction}},
{dataNodeID: 102, plan: &datapb.CompactionPlan{PlanID: 13, Channel: "ch-3", Type: datapb.CompactionType_MinorCompaction}},
}, []UniqueID{11}},
{"without L0 tasks", []*compactionTask{
{dataNodeID: 102, plan: &datapb.CompactionPlan{PlanID: 14, Channel: "ch-3", Type: datapb.CompactionType_MinorCompaction}},
{dataNodeID: 102, plan: &datapb.CompactionPlan{PlanID: 13, Channel: "ch-11", Type: datapb.CompactionType_MinorCompaction}},
}, []UniqueID{13}},
{"empty tasks", []*compactionTask{}, []UniqueID{}},
}
for _, test := range tests {
s.Run(test.description, func() {
s.SetupTest()
s.Require().Equal(4, s.scheduler.getExecutingTaskNum())
// submit the testing tasks
s.scheduler.submit(test.tasks...)
s.Equal(4+len(test.tasks), s.scheduler.getExecutingTaskNum())
gotTasks := s.scheduler.schedule()
s.Equal(test.expectedOut, lo.Map(gotTasks, func(t *compactionTask, _ int) int64 {
return t.plan.PlanID
}))
// the second schedule returns empty because parallelTasks is full
if len(gotTasks) > 0 {
gotTasks = s.scheduler.schedule()
s.Empty(gotTasks)
}
s.Equal(4+len(test.tasks), s.scheduler.getExecutingTaskNum())
})
}
}
func Test_compactionPlanHandler_execCompactionPlan(t *testing.T) {
type fields struct {
plans map[int64]*compactionTask
@@ -93,20 +259,11 @@ func Test_compactionPlanHandler_execCompactionPlan(t *testing.T) {
"test exec compaction failed",
fields{
plans: map[int64]*compactionTask{},
sessions: &SessionManager{
sessions: struct {
sync.RWMutex
data map[int64]*Session
}{
data: map[int64]*Session{
1: {client: &mockDataNodeClient{ch: make(chan interface{}, 1), compactionResp: &commonpb.Status{ErrorCode: commonpb.ErrorCode_CacheFailed}}},
},
},
},
chManager: &ChannelManager{
store: &ChannelStore{
channelsInfo: map[int64]*NodeChannelInfo{
1: {NodeID: 1, Channels: []*channel{{Name: "ch1"}}},
1: {NodeID: 1, Channels: []*channel{}},
bufferID: {NodeID: bufferID, Channels: []*channel{}},
},
},
},
@@ -117,49 +274,7 @@ func Test_compactionPlanHandler_execCompactionPlan(t *testing.T) {
plan: &datapb.CompactionPlan{PlanID: 1, Channel: "ch1", Type: datapb.CompactionType_MergeCompaction},
},
true,
nil,
},
{
"test_allocate_ts_failed",
fields{
plans: map[int64]*compactionTask{},
sessions: &SessionManager{
sessions: struct {
sync.RWMutex
data map[int64]*Session
}{
data: map[int64]*Session{
1: {client: &mockDataNodeClient{ch: make(chan interface{}, 1), compactionResp: &commonpb.Status{ErrorCode: commonpb.ErrorCode_CacheFailed}}},
},
},
},
chManager: &ChannelManager{
store: &ChannelStore{
channelsInfo: map[int64]*NodeChannelInfo{
1: {NodeID: 1, Channels: []*channel{{Name: "ch1"}}},
},
},
},
allocatorFactory: func() allocator {
a := &NMockAllocator{}
start := time.Now()
a.EXPECT().allocTimestamp(mock.Anything).Call.Return(func(_ context.Context) Timestamp {
return tsoutil.ComposeTSByTime(time.Now(), 0)
}, func(_ context.Context) error {
if time.Since(start) > time.Second*2 {
return nil
}
return errors.New("mocked")
})
return a
},
},
args{
signal: &compactionSignal{id: 100},
plan: &datapb.CompactionPlan{PlanID: 1, Channel: "ch1", Type: datapb.CompactionType_MergeCompaction},
},
true,
nil,
errChannelNotWatched,
},
}
for _, tt := range tests {
@@ -168,29 +283,28 @@ func Test_compactionPlanHandler_execCompactionPlan(t *testing.T) {
plans: tt.fields.plans,
sessions: tt.fields.sessions,
chManager: tt.fields.chManager,
parallelCh: make(map[int64]chan struct{}),
allocator: tt.fields.allocatorFactory(),
scheduler: newScheduler(),
}
Params.Save(Params.DataCoordCfg.CompactionCheckIntervalInSeconds.Key, "1")
c.start()
err := c.execCompactionPlan(tt.args.signal, tt.args.plan)
assert.Equal(t, tt.err, err)
if err == nil {
assert.ErrorIs(t, tt.err, err)
task := c.getCompaction(tt.args.plan.PlanID)
if !tt.wantErr {
assert.Equal(t, tt.args.plan, task.plan)
assert.Equal(t, tt.args.signal, task.triggerInfo)
assert.Equal(t, 1, c.executingTaskNum)
assert.Equal(t, 1, c.scheduler.getExecutingTaskNum())
} else {
assert.Eventually(t,
func() bool {
c.mu.RLock()
defer c.mu.RUnlock()
return c.executingTaskNum == 0 && len(c.parallelCh[1]) == 0
c.scheduler.mu.RLock()
defer c.scheduler.mu.RUnlock()
return c.scheduler.getExecutingTaskNum() == 0 && len(c.scheduler.parallelTasks[1]) == 0
},
5*time.Second, 100*time.Millisecond)
}
}
c.stop()
})
}
@@ -198,7 +312,7 @@ func Test_compactionPlanHandler_execCompactionPlan(t *testing.T) {
func Test_compactionPlanHandler_execWithParallels(t *testing.T) {
mockDataNode := &mocks.MockDataNodeClient{}
paramtable.Get().Save(Params.DataCoordCfg.CompactionCheckIntervalInSeconds.Key, "1")
paramtable.Get().Save(Params.DataCoordCfg.CompactionCheckIntervalInSeconds.Key, "0.001")
defer paramtable.Get().Reset(Params.DataCoordCfg.CompactionCheckIntervalInSeconds.Key)
c := &compactionPlanHandler{
plans: map[int64]*compactionTask{},
@@ -219,8 +333,8 @@ func Test_compactionPlanHandler_execWithParallels(t *testing.T) {
},
},
},
parallelCh: make(map[int64]chan struct{}),
allocator: newMockAllocator(),
scheduler: newScheduler(),
}
signal := &compactionSignal{id: 100}
@@ -228,8 +342,6 @@ func Test_compactionPlanHandler_execWithParallels(t *testing.T) {
plan2 := &datapb.CompactionPlan{PlanID: 2, Channel: "ch1", Type: datapb.CompactionType_MergeCompaction}
plan3 := &datapb.CompactionPlan{PlanID: 3, Channel: "ch1", Type: datapb.CompactionType_MergeCompaction}
c.parallelCh[1] = make(chan struct{}, 2)
var mut sync.RWMutex
called := 0
@@ -238,38 +350,39 @@ func Test_compactionPlanHandler_execWithParallels(t *testing.T) {
mut.Lock()
defer mut.Unlock()
called++
}).Return(&commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil).Times(3)
go func() {
c.execCompactionPlan(signal, plan1)
c.execCompactionPlan(signal, plan2)
c.execCompactionPlan(signal, plan3)
}()
}).Return(&commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil).Times(2)
// wait for dispatch signal
<-c.parallelCh[1]
<-c.parallelCh[1]
<-c.parallelCh[1]
err := c.execCompactionPlan(signal, plan1)
require.NoError(t, err)
err = c.execCompactionPlan(signal, plan2)
require.NoError(t, err)
err = c.execCompactionPlan(signal, plan3)
require.NoError(t, err)
assert.Equal(t, 3, c.scheduler.getExecutingTaskNum())
// the parallel limit for a single node is 2
tasks := c.scheduler.schedule()
assert.Equal(t, 1, len(tasks))
assert.Equal(t, int64(1), tasks[0].plan.PlanID)
assert.Equal(t, int64(1), tasks[0].dataNodeID)
c.notifyTasks(tasks)
tasks = c.scheduler.schedule()
assert.Equal(t, 1, len(tasks))
assert.Equal(t, int64(2), tasks[0].plan.PlanID)
assert.Equal(t, int64(1), tasks[0].dataNodeID)
c.notifyTasks(tasks)
// wait for Compaction to be called
assert.Eventually(t, func() bool {
mut.RLock()
defer mut.RUnlock()
return called == 3
}, time.Second, time.Millisecond*10)
return called == 2
}, 3*time.Second, time.Millisecond*100)
tasks := c.getCompactionTasksBySignalID(0)
max, min := uint64(0), uint64(0)
for _, v := range tasks {
if max < v.plan.GetStartTime() {
max = v.plan.GetStartTime()
}
if min > v.plan.GetStartTime() {
min = v.plan.GetStartTime()
}
}
log.Debug("start time", zap.Uint64("min", min), zap.Uint64("max", max))
assert.Less(t, uint64(2), max-min)
tasks = c.scheduler.schedule()
assert.Equal(t, 0, len(tasks))
}
func getInsertLogPath(rootPath string, segmentID typeutil.UniqueID) string {
@@ -528,6 +641,7 @@ func TestCompactionPlanHandler_completeCompaction(t *testing.T) {
sessions: sessions,
meta: meta,
flushCh: flushCh,
scheduler: newScheduler(),
}
err := c.completeCompaction(&compactionResult)
@@ -629,6 +743,7 @@ func TestCompactionPlanHandler_completeCompaction(t *testing.T) {
sessions: sessions,
meta: meta,
flushCh: flushCh,
scheduler: newScheduler(),
}
err := c.completeCompaction(&compactionResult)
@@ -815,10 +930,7 @@ func Test_compactionPlanHandler_updateCompaction(t *testing.T) {
plans: tt.fields.plans,
sessions: tt.fields.sessions,
meta: tt.fields.meta,
parallelCh: make(map[int64]chan struct{}),
}
for range tt.failed {
c.acquireQueue(2)
scheduler: newScheduler(),
}
err := c.updateCompaction(tt.args.ts)
@@ -839,9 +951,9 @@ func Test_compactionPlanHandler_updateCompaction(t *testing.T) {
assert.NotEqual(t, failed, task.state)
}
c.mu.Lock()
assert.Equal(t, 0, len(c.parallelCh[2]))
c.mu.Unlock()
c.scheduler.mu.Lock()
assert.Equal(t, 0, len(c.scheduler.parallelTasks[2]))
c.scheduler.mu.Unlock()
})
}
}
@@ -875,7 +987,7 @@ func Test_newCompactionPlanHandler(t *testing.T) {
meta: &meta{},
allocator: newMockAllocator(),
flushCh: nil,
parallelCh: make(map[int64]chan struct{}),
scheduler: newScheduler(),
},
},
}


@@ -1848,7 +1848,7 @@ func Test_compactionTrigger_new(t *testing.T) {
}
func Test_compactionTrigger_handleSignal(t *testing.T) {
got := newCompactionTrigger(&meta{segments: NewSegmentsInfo()}, &compactionPlanHandler{}, newMockAllocator(), newMockHandler())
got := newCompactionTrigger(&meta{segments: NewSegmentsInfo()}, &compactionPlanHandler{scheduler: newScheduler()}, newMockAllocator(), newMockHandler())
signal := &compactionSignal{
segmentID: 1,
}
@@ -1858,12 +1858,12 @@ func Test_compactionTrigger_handleSignal(t *testing.T) {
}
func Test_compactionTrigger_allocTs(t *testing.T) {
got := newCompactionTrigger(&meta{segments: NewSegmentsInfo()}, &compactionPlanHandler{}, newMockAllocator(), newMockHandler())
got := newCompactionTrigger(&meta{segments: NewSegmentsInfo()}, &compactionPlanHandler{scheduler: newScheduler()}, newMockAllocator(), newMockHandler())
ts, err := got.allocTs()
assert.NoError(t, err)
assert.True(t, ts > 0)
got = newCompactionTrigger(&meta{segments: NewSegmentsInfo()}, &compactionPlanHandler{}, &FailsAllocator{}, newMockHandler())
got = newCompactionTrigger(&meta{segments: NewSegmentsInfo()}, &compactionPlanHandler{scheduler: newScheduler()}, &FailsAllocator{}, newMockHandler())
ts, err = got.allocTs()
assert.Error(t, err)
assert.Equal(t, uint64(0), ts)
@@ -1890,7 +1890,7 @@ func Test_compactionTrigger_getCompactTime(t *testing.T) {
}
m := &meta{segments: NewSegmentsInfo(), collections: collections}
got := newCompactionTrigger(m, &compactionPlanHandler{}, newMockAllocator(),
got := newCompactionTrigger(m, &compactionPlanHandler{scheduler: newScheduler()}, newMockAllocator(),
&ServerHandler{
&Server{
meta: m,