diff --git a/internal/datacoord/compaction.go b/internal/datacoord/compaction.go index 4c288fb5cc..6a6472f029 100644 --- a/internal/datacoord/compaction.go +++ b/internal/datacoord/compaction.go @@ -167,12 +167,11 @@ func (c *compactionPlanHandler) execCompactionPlan(signal *compactionSignal, pla nodeID, err := c.chManager.FindWatcher(plan.GetChannel()) if err != nil { - log.Error("failed to find watcher", - zap.Int64("plan ID", plan.GetPlanID()), - zap.Error(err)) + log.Error("failed to find watcher", zap.Int64("planID", plan.GetPlanID()), zap.Error(err)) return err } + log := log.With(zap.Int64("planID", plan.GetPlanID()), zap.Int64("nodeID", nodeID)) c.setSegmentsCompacting(plan, true) task := &compactionTask{ @@ -185,12 +184,12 @@ func (c *compactionPlanHandler) execCompactionPlan(signal *compactionSignal, pla c.executingTaskNum++ go func() { - log.Info("acquire queue", zap.Int64("nodeID", nodeID), zap.Int64("planID", plan.GetPlanID())) + log.Info("acquire queue") c.acquireQueue(nodeID) ts, err := c.allocator.allocTimestamp(context.TODO()) if err != nil { - log.Warn("Alloc start time for CompactionPlan failed", zap.Int64("planID", plan.GetPlanID())) + log.Warn("Alloc start time for CompactionPlan failed", zap.Error(err)) // update plan ts to TIMEOUT ts c.updateTask(plan.PlanID, setState(executing), setStartTime(tsTimeout)) return @@ -199,15 +198,12 @@ func (c *compactionPlanHandler) execCompactionPlan(signal *compactionSignal, pla err = c.sessions.Compaction(nodeID, plan) c.updateTask(plan.PlanID, setState(executing)) if err != nil { - log.Warn("try to Compaction but DataNode rejected", - zap.Int64("targetNodeID", nodeID), - zap.Int64("planID", plan.GetPlanID()), - ) + log.Warn("try to Compaction but DataNode rejected", zap.Error(err)) // do nothing here, prevent double release, see issue#21014 // release queue will be done in `updateCompaction` return } - log.Info("start compaction", zap.Int64("nodeID", nodeID), zap.Int64("planID", plan.GetPlanID())) + 
log.Info("start compaction") }() return nil } @@ -299,14 +295,15 @@ func (c *compactionPlanHandler) getCompaction(planID int64) *compactionTask { // expireCompaction set the compaction state to expired func (c *compactionPlanHandler) updateCompaction(ts Timestamp) error { - // Get executing tasks before GetCompactionState from DataNode to prevent false failure, + // Get executing and timeout tasks before GetCompactionState from DataNode to prevent false failure, // for DC might add new task while GetCompactionState. - tasks := c.getExecutingCompactions() + executingTasks := c.getTasksByState(executing) + timeoutTasks := c.getTasksByState(timeout) planStates := c.sessions.GetCompactionState() c.mu.Lock() defer c.mu.Unlock() - for _, task := range tasks { + for _, task := range executingTasks { stateResult, ok := planStates[task.plan.PlanID] state := stateResult.GetState() planID := task.plan.PlanID @@ -341,6 +338,30 @@ func (c *compactionPlanHandler) updateCompaction(ts Timestamp) error { c.releaseQueue(task.dataNodeID) } + // Timeout tasks will time out and fail in DataNode as well; + // wait for DataNode to report the failure, then + // clean up the status. + for _, task := range timeoutTasks { + stateResult, ok := planStates[task.plan.PlanID] + planID := task.plan.PlanID + + if !ok { + log.Info("compaction failed for timeout", zap.Int64("planID", task.plan.PlanID), zap.Int64("nodeID", task.dataNodeID)) + c.plans[planID] = c.plans[planID].shadowClone(setState(failed)) + c.setSegmentsCompacting(task.plan, false) + c.executingTaskNum-- + c.releaseQueue(task.dataNodeID) + } + + // DataNode also checks whether plans have timed out, but is not as sensitive as DataCoord, + // so just wait another round. 
+ if ok && stateResult.GetState() == commonpb.CompactionState_Executing { + log.Info("compaction timeout in DataCoord yet DataNode is still running", + zap.Int64("planID", planID), + zap.Int64("nodeID", task.dataNodeID)) + continue + } + } return nil } @@ -381,12 +402,12 @@ func (c *compactionPlanHandler) isFull() bool { return c.executingTaskNum >= Params.DataCoordCfg.CompactionMaxParallelTasks.GetAsInt() } -func (c *compactionPlanHandler) getExecutingCompactions() []*compactionTask { +func (c *compactionPlanHandler) getTasksByState(state compactionTaskState) []*compactionTask { c.mu.RLock() defer c.mu.RUnlock() tasks := make([]*compactionTask, 0, len(c.plans)) for _, plan := range c.plans { - if plan.state == executing { + if plan.state == state { tasks = append(tasks, plan) } } diff --git a/internal/datacoord/compaction_test.go b/internal/datacoord/compaction_test.go index 534a8936ac..f4cbc3a326 100644 --- a/internal/datacoord/compaction_test.go +++ b/internal/datacoord/compaction_test.go @@ -747,6 +747,24 @@ func Test_compactionPlanHandler_updateCompaction(t *testing.T) { TimeoutInSeconds: 1, }, }, + 5: { // timeout and failed + state: timeout, + dataNodeID: 2, + plan: &datapb.CompactionPlan{ + PlanID: 5, + StartTime: tsoutil.ComposeTS(ts.UnixNano()/int64(time.Millisecond), 0) - 200*1000, + TimeoutInSeconds: 1, + }, + }, + 6: { // timeout and executing + state: timeout, + dataNodeID: 2, + plan: &datapb.CompactionPlan{ + PlanID: 6, + StartTime: tsoutil.ComposeTS(ts.UnixNano()/int64(time.Millisecond), 0) - 200*1000, + TimeoutInSeconds: 1, + }, + }, }, meta: &meta{ segments: &SegmentsInfo{ @@ -761,12 +779,13 @@ func Test_compactionPlanHandler_updateCompaction(t *testing.T) { data map[int64]*Session }{ data: map[int64]*Session{ - 1: {client: &mockDataNodeClient{ + 2: {client: &mockDataNodeClient{ compactionStateResp: &datapb.CompactionStateResponse{ Results: []*datapb.CompactionStateResult{ {PlanID: 1, State: commonpb.CompactionState_Executing}, {PlanID: 3, State: 
commonpb.CompactionState_Completed, Result: &datapb.CompactionResult{PlanID: 3}}, {PlanID: 4, State: commonpb.CompactionState_Executing}, + {PlanID: 6, State: commonpb.CompactionState_Executing}, }, }, }}, @@ -776,17 +795,21 @@ func Test_compactionPlanHandler_updateCompaction(t *testing.T) { }, args{ts: tsoutil.ComposeTS(ts.Add(5*time.Second).UnixNano()/int64(time.Millisecond), 0)}, false, - []int64{4}, - []int64{2}, + []int64{4, 6}, + []int64{2, 5}, []int64{1, 3}, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { c := &compactionPlanHandler{ - plans: tt.fields.plans, - sessions: tt.fields.sessions, - meta: tt.fields.meta, + plans: tt.fields.plans, + sessions: tt.fields.sessions, + meta: tt.fields.meta, + parallelCh: make(map[int64]chan struct{}), + } + for range tt.failed { + c.acquireQueue(2) } err := c.updateCompaction(tt.args.ts) @@ -806,6 +829,10 @@ func Test_compactionPlanHandler_updateCompaction(t *testing.T) { task := c.getCompaction(id) assert.NotEqual(t, failed, task.state) } + + c.mu.Lock() + assert.Equal(t, 0, len(c.parallelCh[2])) + c.mu.Unlock() }) } }