diff --git a/internal/datacoord/compaction.go b/internal/datacoord/compaction.go index fe730caf52..7ae50eb75c 100644 --- a/internal/datacoord/compaction.go +++ b/internal/datacoord/compaction.go @@ -535,9 +535,6 @@ func (c *compactionPlanHandler) getCompactionTask(planID int64) CompactionTask { func (c *compactionPlanHandler) enqueueCompaction(task *datapb.CompactionTask) error { log := log.With(zap.Int64("planID", task.GetPlanID()), zap.Int64("triggerID", task.GetTriggerID()), zap.Int64("collectionID", task.GetCollectionID()), zap.String("type", task.GetType().String())) - if c.isFull() { - return errCompactionBusy - } t, err := c.createCompactTask(task) if err != nil { return err @@ -571,12 +568,11 @@ func (c *compactionPlanHandler) createCompactTask(t *datapb.CompactionTask) (Com } case datapb.CompactionType_ClusteringCompaction: task = &clusteringCompactionTask{ - CompactionTask: t, - meta: c.meta, - sessions: c.sessions, - handler: c.handler, - analyzeScheduler: c.analyzeScheduler, - lastUpdateStateTime: time.Now().UnixMilli(), + CompactionTask: t, + meta: c.meta, + sessions: c.sessions, + handler: c.handler, + analyzeScheduler: c.analyzeScheduler, } default: return nil, merr.WrapErrIllegalCompactionPlan("illegal compaction type") diff --git a/internal/datacoord/compaction_task.go b/internal/datacoord/compaction_task.go index 57e52f21c3..6cfdcb9af8 100644 --- a/internal/datacoord/compaction_task.go +++ b/internal/datacoord/compaction_task.go @@ -100,3 +100,15 @@ func setStartTime(startTime int64) compactionTaskOpt { task.StartTime = startTime } } + +func setRetryTimes(retryTimes int32) compactionTaskOpt { + return func(task *datapb.CompactionTask) { + task.RetryTimes = retryTimes + } +} + +func setLastStateStartTime(lastStateStartTime int64) compactionTaskOpt { + return func(task *datapb.CompactionTask) { + task.LastStateStartTime = lastStateStartTime + } +} diff --git a/internal/datacoord/compaction_task_clustering.go 
b/internal/datacoord/compaction_task_clustering.go index 245a7c6375..fedd0902c4 100644 --- a/internal/datacoord/compaction_task_clustering.go +++ b/internal/datacoord/compaction_task_clustering.go @@ -47,10 +47,9 @@ const ( type clusteringCompactionTask struct { *datapb.CompactionTask - plan *datapb.CompactionPlan - result *datapb.CompactionPlanResult - span trace.Span - lastUpdateStateTime int64 + plan *datapb.CompactionPlan + result *datapb.CompactionPlanResult + span trace.Span meta CompactionMeta sessions SessionManager @@ -66,24 +65,22 @@ func (t *clusteringCompactionTask) Process() bool { log.Warn("fail in process task", zap.Error(err)) if merr.IsRetryableErr(err) && t.RetryTimes < taskMaxRetryTimes { // retry in next Process - t.RetryTimes = t.RetryTimes + 1 + t.updateAndSaveTaskMeta(setRetryTimes(t.RetryTimes + 1)) } else { log.Error("task fail with unretryable reason or meet max retry times", zap.Error(err)) - t.State = datapb.CompactionTaskState_failed - t.FailReason = err.Error() + t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed), setFailReason(err.Error())) } } // task state update, refresh retry times count currentState := t.State.String() if currentState != lastState { - t.RetryTimes = 0 ts := time.Now().UnixMilli() - lastStateDuration := ts - t.lastUpdateStateTime + lastStateDuration := ts - t.GetLastStateStartTime() + t.updateAndSaveTaskMeta(setRetryTimes(0), setLastStateStartTime(ts)) log.Info("clustering compaction task state changed", zap.String("lastState", lastState), zap.String("currentState", currentState), zap.Int64("elapse", lastStateDuration)) metrics.DataCoordCompactionLatency. WithLabelValues(fmt.Sprint(typeutil.IsVectorType(t.GetClusteringKeyField().DataType)), datapb.CompactionType_ClusteringCompaction.String(), lastState). 
Observe(float64(lastStateDuration)) - t.lastUpdateStateTime = ts if t.State == datapb.CompactionTaskState_completed { t.updateAndSaveTaskMeta(setEndTime(ts)) diff --git a/internal/datacoord/compaction_test.go b/internal/datacoord/compaction_test.go index cb48e69496..12baa0f34f 100644 --- a/internal/datacoord/compaction_test.go +++ b/internal/datacoord/compaction_test.go @@ -528,7 +528,7 @@ func (s *CompactionPlanHandlerSuite) TestGetCompactionTask() { func (s *CompactionPlanHandlerSuite) TestExecCompactionPlan() { s.SetupTest() - s.mockMeta.EXPECT().CheckAndSetSegmentsCompacting(mock.Anything).Return(true, true).Once() + s.mockMeta.EXPECT().CheckAndSetSegmentsCompacting(mock.Anything).Return(true, true).Maybe() s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil) handler := newCompactionPlanHandler(nil, s.mockSessMgr, s.mockCm, s.mockMeta, s.mockAlloc, nil, nil) @@ -545,7 +545,7 @@ func (s *CompactionPlanHandlerSuite) TestExecCompactionPlan() { s.handler.taskNumber.Add(1000) task.PlanID = 2 err = s.handler.enqueueCompaction(task) - s.Error(err) + s.NoError(err) } func (s *CompactionPlanHandlerSuite) TestCheckCompaction() { diff --git a/internal/datacoord/compaction_trigger_v2.go b/internal/datacoord/compaction_trigger_v2.go index 573eccf0b8..92fe48c5ed 100644 --- a/internal/datacoord/compaction_trigger_v2.go +++ b/internal/datacoord/compaction_trigger_v2.go @@ -289,6 +289,7 @@ func (m *CompactionTriggerManager) SubmitClusteringViewToScheduler(ctx context.C PreferSegmentRows: preferSegmentRows, TotalRows: totalRows, AnalyzeTaskID: taskID + 1, + LastStateStartTime: time.Now().UnixMilli(), } err = m.compactionHandler.enqueueCompaction(task) if err != nil { diff --git a/internal/proto/data_coord.proto b/internal/proto/data_coord.proto index 81af5196a6..c2b5a8e5e2 100644 --- a/internal/proto/data_coord.proto +++ b/internal/proto/data_coord.proto @@ -913,6 +913,7 @@ message CompactionTask{ int64 prefer_segment_rows = 22; int64 analyzeTaskID = 23; int64 
analyzeVersion = 24; + int64 lastStateStartTime = 25; } message PartitionStatsInfo {