diff --git a/internal/datacoord/compaction.go b/internal/datacoord/compaction.go
index bade2d2b78..a57d19aad9 100644
--- a/internal/datacoord/compaction.go
+++ b/internal/datacoord/compaction.go
@@ -23,13 +23,17 @@ import (
 	"time"
 
 	"github.com/cockroachdb/errors"
+	"github.com/samber/lo"
+	"go.uber.org/atomic"
 	"go.uber.org/zap"
 
 	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/pkg/log"
 	"github.com/milvus-io/milvus/pkg/metrics"
+	"github.com/milvus-io/milvus/pkg/util/conc"
 	"github.com/milvus-io/milvus/pkg/util/tsoutil"
+	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )
 
 // TODO this num should be determined by resources of datanode, for now, we set to a fixed value for simple
@@ -92,18 +96,167 @@ func (t *compactionTask) shadowClone(opts ...compactionTaskOpt) *compactionTask
 var _ compactionPlanContext = (*compactionPlanHandler)(nil)
 
 type compactionPlanHandler struct {
-	plans            map[int64]*compactionTask // planID -> task
-	sessions         *SessionManager
-	meta             *meta
-	chManager        *ChannelManager
-	mu               sync.RWMutex
-	executingTaskNum int
-	allocator        allocator
-	quit             chan struct{}
-	wg               sync.WaitGroup
-	flushCh          chan UniqueID
-	// segRefer *SegmentReferenceManager
-	parallelCh map[int64]chan struct{}
+	plans     map[int64]*compactionTask // planID -> task
+	sessions  *SessionManager
+	meta      *meta
+	chManager *ChannelManager
+	mu        sync.RWMutex
+	allocator allocator
+	quit      chan struct{}
+	wg        sync.WaitGroup
+	flushCh   chan UniqueID
+	scheduler *scheduler
+}
+
+type scheduler struct {
+	taskNumber    *atomic.Int32
+	queuingTasks  []*compactionTask
+	parallelTasks map[int64][]*compactionTask
+	mu            sync.RWMutex
+}
+
+func newScheduler() *scheduler {
+	return &scheduler{
+		taskNumber:    atomic.NewInt32(0),
+		queuingTasks:  make([]*compactionTask, 0),
+		parallelTasks: make(map[int64][]*compactionTask),
+	}
+}
+
+// schedule picks at most one task per node
+func (s *scheduler) schedule() []*compactionTask {
+	nodeTasks := make(map[int64][]*compactionTask) // nodeID
+
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	for _, task := range s.queuingTasks {
+		if _, ok := nodeTasks[task.dataNodeID]; !ok {
+			nodeTasks[task.dataNodeID] = make([]*compactionTask, 0)
+		}
+
+		nodeTasks[task.dataNodeID] = append(nodeTasks[task.dataNodeID], task)
+	}
+
+	executable := make(map[int64]*compactionTask)
+
+	pickPriorPolicy := func(tasks []*compactionTask, exclusiveChannels []string, executing []string) *compactionTask {
+		for _, task := range tasks {
+			if lo.Contains(exclusiveChannels, task.plan.GetChannel()) {
+				continue
+			}
+
+			if task.plan.GetType() == datapb.CompactionType_Level0DeleteCompaction {
+				// An L0 compaction can start only when its channel has no executing compactions
+				if !lo.Contains(executing, task.plan.GetChannel()) {
+					return task
+				}
+
+				// Don't schedule any tasks for a channel with a pending LevelZeroCompaction task
+				// while that channel still has executing compactions
+				exclusiveChannels = append(exclusiveChannels, task.plan.GetChannel())
+				continue
+			}
+
+			return task
+		}
+
+		return nil
+	}
+
+	// pick at most one task for each node
+	for node, tasks := range nodeTasks {
+		parallel := s.parallelTasks[node]
+		if len(parallel) >= calculateParallel() {
+			log.Info("Compaction parallel in DataNode reaches the limit", zap.Int64("nodeID", node), zap.Int("parallel", len(parallel)))
+			continue
+		}
+
+		var (
+			executing         = typeutil.NewSet[string]()
+			channelsExecPrior = typeutil.NewSet[string]()
+		)
+		for _, t := range parallel {
+			executing.Insert(t.plan.GetChannel())
+			if t.plan.GetType() == datapb.CompactionType_Level0DeleteCompaction {
+				channelsExecPrior.Insert(t.plan.GetChannel())
+			}
+		}
+
+		picked := pickPriorPolicy(tasks, channelsExecPrior.Collect(), executing.Collect())
+		if picked != nil {
+			executable[node] = picked
+		}
+	}
+
+	var pickPlans []int64
+	for node, task := range executable {
+		pickPlans = append(pickPlans, task.plan.PlanID)
+		if _, ok := s.parallelTasks[node]; !ok {
+			s.parallelTasks[node] = []*compactionTask{task}
+		} else {
+			s.parallelTasks[node] = append(s.parallelTasks[node], task)
+		}
+	}
+
+	s.queuingTasks = lo.Filter(s.queuingTasks, func(t *compactionTask, _ int) bool {
+		return !lo.Contains(pickPlans, t.plan.PlanID)
+	})
+
+	// clean up parallelTasks entries for nodes with no running tasks
+	for node, tasks := range s.parallelTasks {
+		if len(tasks) == 0 {
+			delete(s.parallelTasks, node)
+		}
+	}
+
+	return lo.Values(executable)
+}
+
+func (s *scheduler) finish(nodeID, planID UniqueID) {
+	s.mu.Lock()
+	if parallel, ok := s.parallelTasks[nodeID]; ok {
+		tasks := lo.Filter(parallel, func(t *compactionTask, _ int) bool {
+			return t.plan.PlanID != planID
+		})
+		s.parallelTasks[nodeID] = tasks
+		s.taskNumber.Dec()
+	}
+	s.mu.Unlock()
+
+	log.Info("Compaction finished", zap.Int64("planID", planID), zap.Int64("nodeID", nodeID))
+	s.logStatus()
+}
+
+func (s *scheduler) logStatus() {
+	s.mu.RLock()
+	defer s.mu.RUnlock()
+	waiting := lo.Map(s.queuingTasks, func(t *compactionTask, _ int) int64 {
+		return t.plan.PlanID
+	})
+
+	var executing []int64
+	for _, tasks := range s.parallelTasks {
+		executing = append(executing, lo.Map(tasks, func(t *compactionTask, _ int) int64 {
+			return t.plan.PlanID
+		})...)
+	}
+
+	if len(waiting) > 0 || len(executing) > 0 {
+		log.Info("Compaction scheduler status", zap.Int64s("waiting", waiting), zap.Int64s("executing", executing))
+	}
+}
+
+func (s *scheduler) submit(tasks ...*compactionTask) {
+	s.mu.Lock()
+	s.queuingTasks = append(s.queuingTasks, tasks...)
+	s.mu.Unlock()
+
+	s.taskNumber.Add(int32(len(tasks)))
+	s.logStatus()
+}
+
+func (s *scheduler) getExecutingTaskNum() int {
+	return int(s.taskNumber.Load())
 }
 
 func newCompactionPlanHandler(sessions *SessionManager, cm *ChannelManager, meta *meta,
@@ -116,8 +269,7 @@ func newCompactionPlanHandler(sessions *SessionManager, cm *ChannelManager, meta
 		sessions:  sessions,
 		allocator: allocator,
 		flushCh:   flush,
-		// segRefer: segRefer,
-		parallelCh: make(map[int64]chan struct{}),
+		scheduler: newScheduler(),
 	}
 }
 
@@ -128,14 +280,18 @@ func (c *compactionPlanHandler) start() {
 	go func() {
 		defer c.wg.Done()
-		ticker := time.NewTicker(interval)
-		defer ticker.Stop()
+		checkResultTicker := time.NewTicker(interval)
+		scheduleTicker := time.NewTicker(200 * time.Millisecond)
+		log.Info("compaction handler start", zap.Any("check result interval", interval))
+		defer checkResultTicker.Stop()
+		defer scheduleTicker.Stop()
 		for {
 			select {
 			case <-c.quit:
 				log.Info("compaction handler quit")
 				return
-			case <-ticker.C:
+			case <-checkResultTicker.C:
+				// handle compaction results
 				cctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 				ts, err := c.allocator.allocTimestamp(cctx)
 				if err != nil {
@@ -145,6 +301,15 @@ func (c *compactionPlanHandler) start() {
 				}
 				cancel()
 				_ = c.updateCompaction(ts)
+
+			case <-scheduleTicker.C:
+				// schedule queuing tasks
+				tasks := c.scheduler.schedule()
+				c.notifyTasks(tasks)
+
+				if len(tasks) > 0 {
+					c.scheduler.logStatus()
+				}
 			}
 		}
 	}()
@@ -161,11 +326,7 @@ func (c *compactionPlanHandler) updateTask(planID int64, opts ...compactionTaskO
 	c.plans[planID] = c.plans[planID].shadowClone(opts...)
 }
 
-// execCompactionPlan start to execute plan and return immediately
-func (c *compactionPlanHandler) execCompactionPlan(signal *compactionSignal, plan *datapb.CompactionPlan) error {
-	c.mu.Lock()
-	defer c.mu.Unlock()
-
+func (c *compactionPlanHandler) enqueuePlan(signal *compactionSignal, plan *datapb.CompactionPlan) error {
 	nodeID, err := c.chManager.FindWatcher(plan.GetChannel())
 	if err != nil {
 		log.Error("failed to find watcher", zap.Int64("planID", plan.GetPlanID()), zap.Error(err))
@@ -181,34 +342,46 @@ func (c *compactionPlanHandler) execCompactionPlan(signal *compactionSignal, pla
 		state:       pipelining,
 		dataNodeID:  nodeID,
 	}
+	c.mu.Lock()
 	c.plans[plan.PlanID] = task
-	c.executingTaskNum++
+	c.mu.Unlock()
 
-	go func() {
-		log.Info("acquire queue")
-		c.acquireQueue(nodeID)
-
-		ts, err := c.allocator.allocTimestamp(context.TODO())
-		if err != nil {
-			log.Warn("Alloc start time for CompactionPlan failed", zap.Error(err))
-			// update plan ts to TIMEOUT ts
-			c.updateTask(plan.PlanID, setState(executing), setStartTime(tsTimeout))
-			return
-		}
-		c.updateTask(plan.PlanID, setStartTime(ts))
-		err = c.sessions.Compaction(nodeID, plan)
-		c.updateTask(plan.PlanID, setState(executing))
-		if err != nil {
-			log.Warn("try to Compaction but DataNode rejected", zap.Error(err))
-			// do nothing here, prevent double release, see issue#21014
-			// release queue will be done in `updateCompaction`
-			return
-		}
-		log.Info("start compaction")
-	}()
+	c.scheduler.submit(task)
+	log.Info("Compaction plan submitted")
 	return nil
 }
 
+func (c *compactionPlanHandler) notifyTasks(tasks []*compactionTask) {
+	for _, task := range tasks {
+		task := task // capture the loop variable for the async closure (pre-Go 1.22 loop semantics)
+		getOrCreateIOPool().Submit(func() (any, error) {
+			plan := task.plan
+			log := log.With(zap.Int64("planID", plan.GetPlanID()), zap.Int64("nodeID", task.dataNodeID))
+			log.Info("Notify compaction task to DataNode")
+			ts, err := c.allocator.allocTimestamp(context.TODO())
+			if err != nil {
+				log.Warn("Alloc start time for CompactionPlan failed", zap.Error(err))
+				// update plan ts to TIMEOUT ts
+				c.updateTask(plan.PlanID, setState(executing), setStartTime(tsTimeout))
+				return nil, err
+			}
+			c.updateTask(task.plan.PlanID, setStartTime(ts))
+			err = c.sessions.Compaction(task.dataNodeID, task.plan)
+			c.updateTask(task.plan.PlanID, setState(executing))
+			if err != nil {
+				log.Warn("Failed to notify compaction tasks to DataNode", zap.Error(err))
+				return nil, err
+			}
+			log.Info("Compaction start")
+			return nil, nil
+		})
+	}
+}
+
+// execCompactionPlan starts to execute the plan and returns immediately
+func (c *compactionPlanHandler) execCompactionPlan(signal *compactionSignal, plan *datapb.CompactionPlan) error {
+	return c.enqueuePlan(signal, plan)
+}
+
 func (c *compactionPlanHandler) setSegmentsCompacting(plan *datapb.CompactionPlan, compacting bool) {
 	for _, segmentBinlogs := range plan.GetSegmentBinlogs() {
 		c.meta.SetSegmentCompacting(segmentBinlogs.GetSegmentID(), compacting)
@@ -228,6 +401,8 @@ func (c *compactionPlanHandler) completeCompaction(result *datapb.CompactionResu
 	}
 
 	plan := c.plans[planID].plan
+	nodeID := c.plans[planID].dataNodeID
+	defer c.scheduler.finish(nodeID, plan.PlanID)
 	switch plan.GetType() {
 	case datapb.CompactionType_MergeCompaction, datapb.CompactionType_MixCompaction:
 		if err := c.handleMergeCompactionResult(plan, result); err != nil {
@@ -237,16 +412,12 @@ func (c *compactionPlanHandler) completeCompaction(result *datapb.CompactionResu
 		}
 		return errors.New("unknown compaction type")
 	}
 	c.plans[planID] = c.plans[planID].shadowClone(setState(completed), setResult(result))
-	c.executingTaskNum--
 
 	if c.plans[planID].plan.GetType() == datapb.CompactionType_MergeCompaction ||
 		c.plans[planID].plan.GetType() == datapb.CompactionType_MixCompaction {
 		c.flushCh <- result.GetSegmentID()
 	}
 
 	// TODO: when to clean task list
-	nodeID := c.plans[planID].dataNodeID
-	c.releaseQueue(nodeID)
-
 	metrics.DataCoordCompactedSegmentSize.WithLabelValues().Observe(float64(getCompactedSegmentSize(result)))
 	return nil
 }
@@ -311,7 +482,7 @@ func (c *compactionPlanHandler) updateCompaction(ts Timestamp) error {
 		// check whether the state of CompactionPlan is working
 		if ok {
 			if state == commonpb.CompactionState_Completed {
-				log.Info("compaction completed", zap.Int64("planID", planID), zap.Int64("nodeID", task.dataNodeID))
+				log.Info("complete compaction", zap.Int64("planID", planID), zap.Int64("nodeID", task.dataNodeID))
 				err := c.completeCompaction(stateResult.GetResult())
 				if err != nil {
 					log.Warn("fail to complete compaction", zap.Int64("planID", planID), zap.Int64("nodeID", task.dataNodeID), zap.Error(err))
@@ -335,8 +506,7 @@ func (c *compactionPlanHandler) updateCompaction(ts Timestamp) error {
 			log.Info("compaction failed", zap.Int64("planID", task.plan.PlanID), zap.Int64("nodeID", task.dataNodeID))
 			c.plans[planID] = c.plans[planID].shadowClone(setState(failed))
 			c.setSegmentsCompacting(task.plan, false)
-			c.executingTaskNum--
-			c.releaseQueue(task.dataNodeID)
+			c.scheduler.finish(task.dataNodeID, task.plan.PlanID)
 		}
 
 		// Timeout tasks will be timeout and failed in DataNode
@@ -350,8 +520,7 @@ func (c *compactionPlanHandler) updateCompaction(ts Timestamp) error {
 			log.Info("compaction failed for timeout", zap.Int64("planID", task.plan.PlanID), zap.Int64("nodeID", task.dataNodeID))
 			c.plans[planID] = c.plans[planID].shadowClone(setState(failed))
 			c.setSegmentsCompacting(task.plan, false)
-			c.executingTaskNum--
-			c.releaseQueue(task.dataNodeID)
+			c.scheduler.finish(task.dataNodeID, task.plan.PlanID)
 		}
 
 		// DataNode will check if plan's are timeout but not as sensitive as DataCoord,
@@ -372,35 +541,9 @@ func (c *compactionPlanHandler) isTimeout(now Timestamp, start Timestamp, timeou
 	return int32(ts.Sub(startTime).Seconds()) >= timeout
 }
 
-func (c *compactionPlanHandler) acquireQueue(nodeID int64) {
-	c.mu.Lock()
-	_, ok := c.parallelCh[nodeID]
-	if !ok {
-		c.parallelCh[nodeID] = make(chan struct{}, calculateParallel())
-	}
-	c.mu.Unlock()
-
-	c.mu.RLock()
-	ch := c.parallelCh[nodeID]
-	c.mu.RUnlock()
-	ch <- struct{}{}
-}
-
-func (c *compactionPlanHandler) releaseQueue(nodeID int64) {
-	log.Info("try to release queue", zap.Int64("nodeID", nodeID))
-	ch, ok := c.parallelCh[nodeID]
-	if !ok {
-		return
-	}
-	<-ch
-}
-
 // isFull return true if the task pool is full
 func (c *compactionPlanHandler) isFull() bool {
-	c.mu.RLock()
-	defer c.mu.RUnlock()
-
-	return c.executingTaskNum >= Params.DataCoordCfg.CompactionMaxParallelTasks.GetAsInt()
+	return c.scheduler.getExecutingTaskNum() >= Params.DataCoordCfg.CompactionMaxParallelTasks.GetAsInt()
 }
 
 func (c *compactionPlanHandler) getTasksByState(state compactionTaskState) []*compactionTask {
@@ -464,3 +607,22 @@ func calculateParallel() int {
 	//}
 	//return cores / 2
 }
+
+var (
+	ioPool         *conc.Pool[any]
+	ioPoolInitOnce sync.Once
+)
+
+func initIOPool() {
+	capacity := Params.DataNodeCfg.IOConcurrency.GetAsInt()
+	if capacity > 32 {
+		capacity = 32
+	}
+	// error only happens with negative expiry duration or with negative pre-alloc size.
+	ioPool = conc.NewPool[any](capacity)
+}
+
+func getOrCreateIOPool() *conc.Pool[any] {
+	ioPoolInitOnce.Do(initIOPool)
+	return ioPool
+}
diff --git a/internal/datacoord/compaction_test.go b/internal/datacoord/compaction_test.go
index 3f2e7f6b21..8c3489636f 100644
--- a/internal/datacoord/compaction_test.go
+++ b/internal/datacoord/compaction_test.go
@@ -23,10 +23,11 @@ import (
 	"time"
 
 	"github.com/cockroachdb/errors"
+	"github.com/samber/lo"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"
-	"go.uber.org/zap"
+	"github.com/stretchr/testify/suite"
 	"google.golang.org/grpc"
 
 	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
@@ -34,13 +35,178 @@ import (
 	"github.com/milvus-io/milvus/internal/metastore/kv/datacoord"
 	"github.com/milvus-io/milvus/internal/mocks"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
-	"github.com/milvus-io/milvus/pkg/log"
 	"github.com/milvus-io/milvus/pkg/util/metautil"
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
 	"github.com/milvus-io/milvus/pkg/util/tsoutil"
 	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )
 
+func TestSchedulerSuite(t *testing.T) {
+	suite.Run(t, new(SchedulerSuite))
+}
+
+type SchedulerSuite struct {
+	suite.Suite
+	scheduler *scheduler
+}
+
+func (s *SchedulerSuite) SetupTest() {
+	s.scheduler = newScheduler()
+	s.scheduler.parallelTasks = map[int64][]*compactionTask{
+		100: {
+			{dataNodeID: 100, plan: &datapb.CompactionPlan{PlanID: 1, Channel: "ch-1", Type: datapb.CompactionType_MinorCompaction}},
+			{dataNodeID: 100, plan: &datapb.CompactionPlan{PlanID: 2, Channel: "ch-1", Type: datapb.CompactionType_MinorCompaction}},
+		},
+		101: {
+			{dataNodeID: 101, plan: &datapb.CompactionPlan{PlanID: 3, Channel: "ch-2", Type: datapb.CompactionType_MinorCompaction}},
+		},
+		102: {
+			{dataNodeID: 102, plan: &datapb.CompactionPlan{PlanID: 4, Channel: "ch-3", Type: datapb.CompactionType_Level0DeleteCompaction}},
+		},
+	}
+	s.scheduler.taskNumber.Add(4)
+}
+
+func (s *SchedulerSuite) TestScheduleEmpty() {
+	emptySch := newScheduler()
+
+	tasks := emptySch.schedule()
+	s.Empty(tasks)
+
+	s.Equal(0, emptySch.getExecutingTaskNum())
+	s.Empty(emptySch.queuingTasks)
+	s.Empty(emptySch.parallelTasks)
+}
+
+func (s *SchedulerSuite) TestScheduleParallelTaskFull() {
+	// dataNode 100's parallelTasks is full
+	tests := []struct {
+		description string
+		tasks       []*compactionTask
+		expectedOut []UniqueID // planID
+	}{
+		{"with L0 tasks", []*compactionTask{
+			{dataNodeID: 100, plan: &datapb.CompactionPlan{PlanID: 10, Channel: "ch-10", Type: datapb.CompactionType_Level0DeleteCompaction}},
+			{dataNodeID: 100, plan: &datapb.CompactionPlan{PlanID: 11, Channel: "ch-11", Type: datapb.CompactionType_MinorCompaction}},
+		}, []UniqueID{}},
+		{"without L0 tasks", []*compactionTask{
+			{dataNodeID: 100, plan: &datapb.CompactionPlan{PlanID: 10, Channel: "ch-10", Type: datapb.CompactionType_MinorCompaction}},
+			{dataNodeID: 100, plan: &datapb.CompactionPlan{PlanID: 11, Channel: "ch-11", Type: datapb.CompactionType_MinorCompaction}},
+		}, []UniqueID{}},
+		{"empty tasks", []*compactionTask{}, []UniqueID{}},
+	}
+
+	for _, test := range tests {
+		s.Run(test.description, func() {
+			s.SetupTest()
+			s.Require().Equal(4, s.scheduler.getExecutingTaskNum())
+
+			// submit the testing tasks
+			s.scheduler.submit(test.tasks...)
+			s.Equal(4+len(test.tasks), s.scheduler.getExecutingTaskNum())
+
+			gotTasks := s.scheduler.schedule()
+			s.Equal(test.expectedOut, lo.Map(gotTasks, func(t *compactionTask, _ int) int64 {
+				return t.plan.PlanID
+			}))
+		})
+	}
+}
+
+func (s *SchedulerSuite) TestScheduleNodeWith1ParallelTask() {
+	// dataNode 101's parallelTasks has 1 task running, not an L0 task
+	tests := []struct {
+		description string
+		tasks       []*compactionTask
+		expectedOut []UniqueID // planID
+	}{
+		{"with L0 tasks diff channel", []*compactionTask{
+			{dataNodeID: 101, plan: &datapb.CompactionPlan{PlanID: 10, Channel: "ch-10", Type: datapb.CompactionType_Level0DeleteCompaction}},
+			{dataNodeID: 101, plan: &datapb.CompactionPlan{PlanID: 11, Channel: "ch-11", Type: datapb.CompactionType_MinorCompaction}},
+		}, []UniqueID{10}},
+		{"with L0 tasks same channel", []*compactionTask{
+			{dataNodeID: 101, plan: &datapb.CompactionPlan{PlanID: 10, Channel: "ch-2", Type: datapb.CompactionType_Level0DeleteCompaction}},
+			{dataNodeID: 101, plan: &datapb.CompactionPlan{PlanID: 11, Channel: "ch-11", Type: datapb.CompactionType_MinorCompaction}},
+		}, []UniqueID{11}},
+		{"without L0 tasks", []*compactionTask{
+			{dataNodeID: 101, plan: &datapb.CompactionPlan{PlanID: 14, Channel: "ch-2", Type: datapb.CompactionType_MinorCompaction}},
+			{dataNodeID: 101, plan: &datapb.CompactionPlan{PlanID: 13, Channel: "ch-11", Type: datapb.CompactionType_MinorCompaction}},
+		}, []UniqueID{14}},
+		{"empty tasks", []*compactionTask{}, []UniqueID{}},
+	}
+
+	for _, test := range tests {
+		s.Run(test.description, func() {
+			s.SetupTest()
+			s.Require().Equal(4, s.scheduler.getExecutingTaskNum())
+
+			// submit the testing tasks
+			s.scheduler.submit(test.tasks...)
+			s.Equal(4+len(test.tasks), s.scheduler.getExecutingTaskNum())
+
+			gotTasks := s.scheduler.schedule()
+			s.Equal(test.expectedOut, lo.Map(gotTasks, func(t *compactionTask, _ int) int64 {
+				return t.plan.PlanID
+			}))
+
+			// the second schedule returns empty for full parallelTasks
+			gotTasks = s.scheduler.schedule()
+			s.Empty(gotTasks)
+
+			s.Equal(4+len(test.tasks), s.scheduler.getExecutingTaskNum())
+		})
+	}
+}
+
+func (s *SchedulerSuite) TestScheduleNodeWithL0Executing() {
+	// dataNode 102's parallelTasks has running L0 tasks
+	// nothing on the same channel can be scheduled
+	tests := []struct {
+		description string
+		tasks       []*compactionTask
+		expectedOut []UniqueID // planID
+	}{
+		{"with L0 tasks diff channel", []*compactionTask{
+			{dataNodeID: 102, plan: &datapb.CompactionPlan{PlanID: 10, Channel: "ch-10", Type: datapb.CompactionType_Level0DeleteCompaction}},
+			{dataNodeID: 102, plan: &datapb.CompactionPlan{PlanID: 11, Channel: "ch-11", Type: datapb.CompactionType_MinorCompaction}},
+		}, []UniqueID{10}},
+		{"with L0 tasks same channel", []*compactionTask{
+			{dataNodeID: 102, plan: &datapb.CompactionPlan{PlanID: 10, Channel: "ch-3", Type: datapb.CompactionType_Level0DeleteCompaction}},
+			{dataNodeID: 102, plan: &datapb.CompactionPlan{PlanID: 11, Channel: "ch-11", Type: datapb.CompactionType_MinorCompaction}},
+			{dataNodeID: 102, plan: &datapb.CompactionPlan{PlanID: 13, Channel: "ch-3", Type: datapb.CompactionType_MinorCompaction}},
+		}, []UniqueID{11}},
+		{"without L0 tasks", []*compactionTask{
+			{dataNodeID: 102, plan: &datapb.CompactionPlan{PlanID: 14, Channel: "ch-3", Type: datapb.CompactionType_MinorCompaction}},
+			{dataNodeID: 102, plan: &datapb.CompactionPlan{PlanID: 13, Channel: "ch-11", Type: datapb.CompactionType_MinorCompaction}},
+		}, []UniqueID{13}},
+		{"empty tasks", []*compactionTask{}, []UniqueID{}},
+	}
+
+	for _, test := range tests {
+		s.Run(test.description, func() {
+			s.SetupTest()
+			s.Require().Equal(4, s.scheduler.getExecutingTaskNum())
+
+			// submit the testing tasks
+			s.scheduler.submit(test.tasks...)
+			s.Equal(4+len(test.tasks), s.scheduler.getExecutingTaskNum())
+
+			gotTasks := s.scheduler.schedule()
+			s.Equal(test.expectedOut, lo.Map(gotTasks, func(t *compactionTask, _ int) int64 {
+				return t.plan.PlanID
+			}))
+
+			// the second schedule returns empty for full parallelTasks
+			if len(gotTasks) > 0 {
+				gotTasks = s.scheduler.schedule()
+				s.Empty(gotTasks)
+			}
+
+			s.Equal(4+len(test.tasks), s.scheduler.getExecutingTaskNum())
+		})
+	}
+}
+
 func Test_compactionPlanHandler_execCompactionPlan(t *testing.T) {
 	type fields struct {
 		plans map[int64]*compactionTask
@@ -93,20 +259,11 @@ func Test_compactionPlanHandler_execCompactionPlan(t *testing.T) {
 			"test exec compaction failed",
 			fields{
 				plans: map[int64]*compactionTask{},
-				sessions: &SessionManager{
-					sessions: struct {
-						sync.RWMutex
-						data map[int64]*Session
-					}{
-						data: map[int64]*Session{
-							1: {client: &mockDataNodeClient{ch: make(chan interface{}, 1), compactionResp: &commonpb.Status{ErrorCode: commonpb.ErrorCode_CacheFailed}}},
-						},
-					},
-				},
 				chManager: &ChannelManager{
 					store: &ChannelStore{
 						channelsInfo: map[int64]*NodeChannelInfo{
-							1: {NodeID: 1, Channels: []*channel{{Name: "ch1"}}},
+							1:        {NodeID: 1, Channels: []*channel{}},
+							bufferID: {NodeID: bufferID, Channels: []*channel{}},
 						},
 					},
 				},
@@ -117,79 +274,36 @@ func Test_compactionPlanHandler_execCompactionPlan(t *testing.T) {
 				plan:   &datapb.CompactionPlan{PlanID: 1, Channel: "ch1", Type: datapb.CompactionType_MergeCompaction},
 			},
 			true,
-			nil,
-		},
-		{
-			"test_allocate_ts_failed",
-			fields{
-				plans: map[int64]*compactionTask{},
-				sessions: &SessionManager{
-					sessions: struct {
-						sync.RWMutex
-						data map[int64]*Session
-					}{
-						data: map[int64]*Session{
-							1: {client: &mockDataNodeClient{ch: make(chan interface{}, 1), compactionResp: &commonpb.Status{ErrorCode: commonpb.ErrorCode_CacheFailed}}},
-						},
-					},
-				},
-				chManager: &ChannelManager{
-					store: &ChannelStore{
-						channelsInfo: map[int64]*NodeChannelInfo{
-							1: {NodeID: 1, Channels: []*channel{{Name: "ch1"}}},
-						},
-					},
-				},
-				allocatorFactory: func() allocator {
-					a := &NMockAllocator{}
-					start := time.Now()
-					a.EXPECT().allocTimestamp(mock.Anything).Call.Return(func(_ context.Context) Timestamp {
-						return tsoutil.ComposeTSByTime(time.Now(), 0)
-					}, func(_ context.Context) error {
-						if time.Since(start) > time.Second*2 {
-							return nil
-						}
-						return errors.New("mocked")
-					})
-					return a
-				},
-			},
-			args{
-				signal: &compactionSignal{id: 100},
-				plan:   &datapb.CompactionPlan{PlanID: 1, Channel: "ch1", Type: datapb.CompactionType_MergeCompaction},
-			},
-			true,
-			nil,
+			errChannelNotWatched,
 		},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			c := &compactionPlanHandler{
-				plans:      tt.fields.plans,
-				sessions:   tt.fields.sessions,
-				chManager:  tt.fields.chManager,
-				parallelCh: make(map[int64]chan struct{}),
-				allocator:  tt.fields.allocatorFactory(),
+				plans:     tt.fields.plans,
+				sessions:  tt.fields.sessions,
+				chManager: tt.fields.chManager,
+				allocator: tt.fields.allocatorFactory(),
+				scheduler: newScheduler(),
 			}
 			Params.Save(Params.DataCoordCfg.CompactionCheckIntervalInSeconds.Key, "1")
 			c.start()
 			err := c.execCompactionPlan(tt.args.signal, tt.args.plan)
-			assert.Equal(t, tt.err, err)
-			if err == nil {
-				task := c.getCompaction(tt.args.plan.PlanID)
-				if !tt.wantErr {
-					assert.Equal(t, tt.args.plan, task.plan)
-					assert.Equal(t, tt.args.signal, task.triggerInfo)
-					assert.Equal(t, 1, c.executingTaskNum)
-				} else {
-					assert.Eventually(t,
-						func() bool {
-							c.mu.RLock()
-							defer c.mu.RUnlock()
-							return c.executingTaskNum == 0 && len(c.parallelCh[1]) == 0
-						},
-						5*time.Second, 100*time.Millisecond)
-				}
+			assert.ErrorIs(t, tt.err, err)
+
+			task := c.getCompaction(tt.args.plan.PlanID)
+			if !tt.wantErr {
+				assert.Equal(t, tt.args.plan, task.plan)
+				assert.Equal(t, tt.args.signal, task.triggerInfo)
+				assert.Equal(t, 1, c.scheduler.getExecutingTaskNum())
+			} else {
+				assert.Eventually(t,
+					func() bool {
+						c.scheduler.mu.RLock()
+						defer c.scheduler.mu.RUnlock()
+						return c.scheduler.getExecutingTaskNum() == 0 && len(c.scheduler.parallelTasks[1]) == 0
+					},
+					5*time.Second, 100*time.Millisecond)
 			}
 			c.stop()
 		})
@@ -198,7 +312,7 @@ func Test_compactionPlanHandler_execCompactionPlan(t *testing.T) {
 
 func Test_compactionPlanHandler_execWithParallels(t *testing.T) {
 	mockDataNode := &mocks.MockDataNodeClient{}
-	paramtable.Get().Save(Params.DataCoordCfg.CompactionCheckIntervalInSeconds.Key, "1")
+	paramtable.Get().Save(Params.DataCoordCfg.CompactionCheckIntervalInSeconds.Key, "0.001")
 	defer paramtable.Get().Reset(Params.DataCoordCfg.CompactionCheckIntervalInSeconds.Key)
 	c := &compactionPlanHandler{
 		plans: map[int64]*compactionTask{},
@@ -219,8 +333,8 @@ func Test_compactionPlanHandler_execWithParallels(t *testing.T) {
 				},
 			},
 		},
-		parallelCh: make(map[int64]chan struct{}),
-		allocator:  newMockAllocator(),
+		allocator: newMockAllocator(),
+		scheduler: newScheduler(),
 	}
 
 	signal := &compactionSignal{id: 100}
@@ -228,8 +342,6 @@ func Test_compactionPlanHandler_execWithParallels(t *testing.T) {
 	plan2 := &datapb.CompactionPlan{PlanID: 2, Channel: "ch1", Type: datapb.CompactionType_MergeCompaction}
 	plan3 := &datapb.CompactionPlan{PlanID: 3, Channel: "ch1", Type: datapb.CompactionType_MergeCompaction}
 
-	c.parallelCh[1] = make(chan struct{}, 2)
-
 	var mut sync.RWMutex
 	called := 0
 
@@ -238,38 +350,39 @@ func Test_compactionPlanHandler_execWithParallels(t *testing.T) {
 		mut.Lock()
 		defer mut.Unlock()
 		called++
-	}).Return(&commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil).Times(3)
-	go func() {
-		c.execCompactionPlan(signal, plan1)
-		c.execCompactionPlan(signal, plan2)
-		c.execCompactionPlan(signal, plan3)
-	}()
+	}).Return(&commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil).Times(2)
 
-	// wait for dispatch signal
-	<-c.parallelCh[1]
-	<-c.parallelCh[1]
-	<-c.parallelCh[1]
+	err := c.execCompactionPlan(signal, plan1)
+	require.NoError(t, err)
+	err = c.execCompactionPlan(signal, plan2)
+	require.NoError(t, err)
+	err = c.execCompactionPlan(signal, plan3)
+	require.NoError(t, err)
+
+	assert.Equal(t, 3, c.scheduler.getExecutingTaskNum())
+
+	// the parallel limit for the same node is 2
+	tasks := c.scheduler.schedule()
+	assert.Equal(t, 1, len(tasks))
+	assert.Equal(t, int64(1), tasks[0].plan.PlanID)
+	assert.Equal(t, int64(1), tasks[0].dataNodeID)
+	c.notifyTasks(tasks)
+
+	tasks = c.scheduler.schedule()
+	assert.Equal(t, 1, len(tasks))
+	assert.Equal(t, int64(2), tasks[0].plan.PlanID)
+	assert.Equal(t, int64(1), tasks[0].dataNodeID)
+	c.notifyTasks(tasks)
 
 	// wait for compaction called
 	assert.Eventually(t, func() bool {
 		mut.RLock()
 		defer mut.RUnlock()
-		return called == 3
-	}, time.Second, time.Millisecond*10)
+		return called == 2
+	}, 3*time.Second, time.Millisecond*100)
 
-	tasks := c.getCompactionTasksBySignalID(0)
-	max, min := uint64(0), uint64(0)
-	for _, v := range tasks {
-		if max < v.plan.GetStartTime() {
-			max = v.plan.GetStartTime()
-		}
-		if min > v.plan.GetStartTime() {
-			min = v.plan.GetStartTime()
-		}
-	}
-
-	log.Debug("start time", zap.Uint64("min", min), zap.Uint64("max", max))
-	assert.Less(t, uint64(2), max-min)
+	tasks = c.scheduler.schedule()
+	assert.Equal(t, 0, len(tasks))
 }
 
 func getInsertLogPath(rootPath string, segmentID typeutil.UniqueID) string {
@@ -524,10 +637,11 @@ func TestCompactionPlanHandler_completeCompaction(t *testing.T) {
 
 		flushCh := make(chan UniqueID, 1)
 		c := &compactionPlanHandler{
-			plans:    plans,
-			sessions: sessions,
-			meta:     meta,
-			flushCh:  flushCh,
+			plans:     plans,
+			sessions:  sessions,
+			meta:      meta,
+			flushCh:   flushCh,
+			scheduler: newScheduler(),
 		}
 
 		err := c.completeCompaction(&compactionResult)
@@ -625,10 +739,11 @@ func TestCompactionPlanHandler_completeCompaction(t *testing.T) {
 
 		flushCh := make(chan UniqueID, 1)
 		c := &compactionPlanHandler{
-			plans:    plans,
-			sessions: sessions,
-			meta:     meta,
-			flushCh:  flushCh,
+			plans:     plans,
+			sessions:  sessions,
+			meta:      meta,
+			flushCh:   flushCh,
+			scheduler: newScheduler(),
 		}
 
 		err := c.completeCompaction(&compactionResult)
@@ -812,13 +927,10 @@ func Test_compactionPlanHandler_updateCompaction(t *testing.T) {
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			c := &compactionPlanHandler{
-				plans:      tt.fields.plans,
-				sessions:   tt.fields.sessions,
-				meta:       tt.fields.meta,
-				parallelCh: make(map[int64]chan struct{}),
-			}
-			for range tt.failed {
-				c.acquireQueue(2)
+				plans:     tt.fields.plans,
+				sessions:  tt.fields.sessions,
+				meta:      tt.fields.meta,
+				scheduler: newScheduler(),
 			}
 
 			err := c.updateCompaction(tt.args.ts)
@@ -839,9 +951,9 @@ func Test_compactionPlanHandler_updateCompaction(t *testing.T) {
 				assert.NotEqual(t, failed, task.state)
 			}
 
-			c.mu.Lock()
-			assert.Equal(t, 0, len(c.parallelCh[2]))
-			c.mu.Unlock()
+			c.scheduler.mu.Lock()
+			assert.Equal(t, 0, len(c.scheduler.parallelTasks[2]))
+			c.scheduler.mu.Unlock()
 		})
 	}
 }
@@ -869,13 +981,13 @@ func Test_newCompactionPlanHandler(t *testing.T) {
 				nil,
 			},
 			&compactionPlanHandler{
-				plans:      map[int64]*compactionTask{},
-				sessions:   &SessionManager{},
-				chManager:  &ChannelManager{},
-				meta:       &meta{},
-				allocator:  newMockAllocator(),
-				flushCh:    nil,
-				parallelCh: make(map[int64]chan struct{}),
+				plans:     map[int64]*compactionTask{},
+				sessions:  &SessionManager{},
+				chManager: &ChannelManager{},
+				meta:      &meta{},
+				allocator: newMockAllocator(),
+				flushCh:   nil,
+				scheduler: newScheduler(),
 			},
 		},
 	}
diff --git a/internal/datacoord/compaction_trigger_test.go b/internal/datacoord/compaction_trigger_test.go
index c00f460514..9fb8f74879 100644
--- a/internal/datacoord/compaction_trigger_test.go
+++ b/internal/datacoord/compaction_trigger_test.go
@@ -1848,7 +1848,7 @@ func Test_compactionTrigger_new(t *testing.T) {
 }
 
 func Test_compactionTrigger_handleSignal(t *testing.T) {
-	got := newCompactionTrigger(&meta{segments: NewSegmentsInfo()}, &compactionPlanHandler{}, newMockAllocator(), newMockHandler())
+	got := newCompactionTrigger(&meta{segments: NewSegmentsInfo()}, &compactionPlanHandler{scheduler: newScheduler()}, newMockAllocator(), newMockHandler())
 	signal := &compactionSignal{
 		segmentID: 1,
 	}
@@ -1858,12 +1858,12 @@ func Test_compactionTrigger_handleSignal(t *testing.T) {
 }
 
 func Test_compactionTrigger_allocTs(t *testing.T) {
-	got := newCompactionTrigger(&meta{segments: NewSegmentsInfo()}, &compactionPlanHandler{}, newMockAllocator(), newMockHandler())
+	got := newCompactionTrigger(&meta{segments: NewSegmentsInfo()}, &compactionPlanHandler{scheduler: newScheduler()}, newMockAllocator(), newMockHandler())
 	ts, err := got.allocTs()
 	assert.NoError(t, err)
 	assert.True(t, ts > 0)
 
-	got = newCompactionTrigger(&meta{segments: NewSegmentsInfo()}, &compactionPlanHandler{}, &FailsAllocator{}, newMockHandler())
+	got = newCompactionTrigger(&meta{segments: NewSegmentsInfo()}, &compactionPlanHandler{scheduler: newScheduler()}, &FailsAllocator{}, newMockHandler())
 	ts, err = got.allocTs()
 	assert.Error(t, err)
 	assert.Equal(t, uint64(0), ts)
@@ -1890,7 +1890,7 @@ func Test_compactionTrigger_getCompactTime(t *testing.T) {
 	}
 
 	m := &meta{segments: NewSegmentsInfo(), collections: collections}
-	got := newCompactionTrigger(m, &compactionPlanHandler{}, newMockAllocator(),
+	got := newCompactionTrigger(m, &compactionPlanHandler{scheduler: newScheduler()}, newMockAllocator(),
 		&ServerHandler{
 			&Server{
 				meta: m,
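
Reviewer note (not part of the patch): the per-node pick rule in scheduler.schedule() can be summarized with a small standalone Go sketch. The task struct and pickOne function below are simplified stand-ins, not identifiers from this PR; the sketch only illustrates the rule that at most one task is picked per node per round, an L0 delete compaction starts only when its channel has no executing compaction, and a waiting L0 task blocks other queued tasks on the same channel.

package main

import "fmt"

// task is a simplified stand-in for compactionTask; it is not a type from this patch.
type task struct {
	planID  int64
	channel string
	isL0    bool
}

// pickOne models the pickPriorPolicy closure in scheduler.schedule(): skip
// channels reserved by a pending L0 task, start an L0 task only when its
// channel is idle, and let a blocked L0 task reserve its channel.
func pickOne(queued []task, executing, exclusive map[string]bool) *task {
	for i := range queued {
		t := queued[i]
		if exclusive[t.channel] {
			continue
		}
		if t.isL0 {
			if !executing[t.channel] {
				return &t
			}
			// The L0 task must wait; block other tasks on this channel.
			exclusive[t.channel] = true
			continue
		}
		return &t
	}
	return nil
}

func main() {
	executing := map[string]bool{"ch-1": true} // ch-1 already runs a compaction
	exclusive := map[string]bool{}             // channels reserved by pending L0 tasks
	queued := []task{
		{planID: 10, channel: "ch-1", isL0: true},  // blocked: ch-1 is busy
		{planID: 11, channel: "ch-1", isL0: false}, // blocked: ch-1 reserved by plan 10
		{planID: 12, channel: "ch-2", isL0: false}, // picked
	}
	if picked := pickOne(queued, executing, exclusive); picked != nil {
		fmt.Println("picked plan", picked.planID) // Output: picked plan 12
	}
}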