diff --git a/internal/datacoord/compaction.go b/internal/datacoord/compaction.go index e237cf15ae..96d7c47353 100644 --- a/internal/datacoord/compaction.go +++ b/internal/datacoord/compaction.go @@ -164,6 +164,15 @@ func (c *compactionPlanHandler) execCompactionPlan(signal *compactionSignal, pla c.setSegmentsCompacting(plan, true) + task := &compactionTask{ + triggerInfo: signal, + plan: plan, + state: executing, + dataNodeID: nodeID, + } + c.plans[plan.PlanID] = task + c.executingTaskNum++ + go func() { log.Debug("acquire queue", zap.Int64("nodeID", nodeID), zap.Int64("planID", plan.GetPlanID())) c.acquireQueue(nodeID) @@ -173,17 +182,11 @@ func (c *compactionPlanHandler) execCompactionPlan(signal *compactionSignal, pla log.Warn("Alloc start time for CompactionPlan failed", zap.Int64("planID", plan.GetPlanID())) return } - plan.StartTime = ts c.mu.Lock() - task := &compactionTask{ - triggerInfo: signal, - plan: plan, - state: executing, - dataNodeID: nodeID, - } - c.plans[plan.PlanID] = task - c.executingTaskNum++ + c.plans[plan.PlanID] = c.plans[plan.PlanID].shadowClone(func(task *compactionTask) { + task.plan.StartTime = ts + }) c.mu.Unlock() err = c.sessions.Compaction(nodeID, plan) @@ -192,6 +195,7 @@ func (c *compactionPlanHandler) execCompactionPlan(signal *compactionSignal, pla c.mu.Lock() delete(c.plans, plan.PlanID) c.executingTaskNum-- + c.releaseQueue(nodeID) c.mu.Unlock() return } @@ -274,20 +278,34 @@ func (c *compactionPlanHandler) updateCompaction(ts Timestamp) error { stateResult, ok := planStates[task.plan.PlanID] state := stateResult.GetState() planID := task.plan.PlanID + startTime := task.plan.GetStartTime() + // start time is 0 means this task have not started, skip checker + if startTime == 0 { + continue + } // check wether the state of CompactionPlan is working if ok { + if state == commonpb.CompactionState_Completed { + log.Info("compaction completed", zap.Int64("planID", planID), zap.Int64("nodeID", task.dataNodeID)) + c.completeCompaction(stateResult.GetResult()) + continue + } // check wether the CompactionPlan is timeout if state == commonpb.CompactionState_Executing && !c.isTimeout(ts, task.plan.GetStartTime(), task.plan.GetTimeoutInSeconds()) { continue } - if state == commonpb.CompactionState_Completed { - c.completeCompaction(stateResult.GetResult()) - continue - } + log.Info("compaction timeout", + zap.Int64("planID", task.plan.PlanID), + zap.Int64("nodeID", task.dataNodeID), + zap.Uint64("startTime", task.plan.GetStartTime()), + zap.Uint64("now", ts), + ) c.plans[planID] = c.plans[planID].shadowClone(setState(timeout)) + continue } + log.Info("compaction failed", zap.Int64("planID", task.plan.PlanID), zap.Int64("nodeID", task.dataNodeID)) c.plans[planID] = c.plans[planID].shadowClone(setState(failed)) c.setSegmentsCompacting(task.plan, false) c.executingTaskNum-- diff --git a/internal/datacoord/compaction_test.go b/internal/datacoord/compaction_test.go index 8219de7a2b..0d90c21eb2 100644 --- a/internal/datacoord/compaction_test.go +++ b/internal/datacoord/compaction_test.go @@ -78,6 +78,35 @@ func Test_compactionPlanHandler_execCompactionPlan(t *testing.T) { false, nil, }, + { + "test exec compaction failed", + fields{ + plans: map[int64]*compactionTask{}, + sessions: &SessionManager{ + sessions: struct { + sync.RWMutex + data map[int64]*Session + }{ + data: map[int64]*Session{ + 1: {client: &mockDataNodeClient{ch: ch, compactionResp: &commonpb.Status{ErrorCode: commonpb.ErrorCode_CacheFailed}}}, + }, + }, + }, + chManager: &ChannelManager{ + store: &ChannelStore{ + channelsInfo: map[int64]*NodeChannelInfo{ + 1: {NodeID: 1, Channels: []*channel{{Name: "ch1"}}}, + }, + }, + }, + }, + args{ + signal: &compactionSignal{id: 100}, + plan: &datapb.CompactionPlan{PlanID: 1, Channel: "ch1", Type: datapb.CompactionType_MergeCompaction}, + }, + true, + nil, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -93,9 +122,19 @@ func Test_compactionPlanHandler_execCompactionPlan(t *testing.T) { if err == nil { <-ch task := c.getCompaction(tt.args.plan.PlanID) - assert.Equal(t, tt.args.plan, task.plan) - assert.Equal(t, tt.args.signal, task.triggerInfo) - assert.Equal(t, 1, c.executingTaskNum) + if !tt.wantErr { + assert.Equal(t, tt.args.plan, task.plan) + assert.Equal(t, tt.args.signal, task.triggerInfo) + assert.Equal(t, 1, c.executingTaskNum) + } else { + assert.Eventually(t, + func() bool { + c.mu.RLock() + defer c.mu.RUnlock() + return c.executingTaskNum == 0 && len(c.parallelCh[1]) == 0 + }, + 5*time.Second, 100*time.Millisecond) + } } }) } @@ -455,7 +494,8 @@ func Test_compactionPlanHandler_updateCompaction(t *testing.T) { fields fields args args wantErr bool - expired []int64 + timeout []int64 + failed []int64 unexpired []int64 }{ { @@ -484,7 +524,7 @@ func Test_compactionPlanHandler_updateCompaction(t *testing.T) { }, }, 3: { - state: completed, + state: executing, dataNodeID: 2, plan: &datapb.CompactionPlan{ PlanID: 3, @@ -519,6 +559,8 @@ func Test_compactionPlanHandler_updateCompaction(t *testing.T) { compactionStateResp: &datapb.CompactionStateResponse{ Results: []*datapb.CompactionStateResult{ {PlanID: 1, State: commonpb.CompactionState_Executing}, + {PlanID: 3, State: commonpb.CompactionState_Completed, Result: &datapb.CompactionResult{PlanID: 3}}, + {PlanID: 4, State: commonpb.CompactionState_Executing}, }, }, }}, @@ -528,7 +570,8 @@ func Test_compactionPlanHandler_updateCompaction(t *testing.T) { }, args{ts: tsoutil.ComposeTS(ts.Add(5*time.Second).UnixNano()/int64(time.Millisecond), 0)}, false, - []int64{2, 4}, + []int64{4}, + []int64{2}, []int64{1, 3}, }, } @@ -543,7 +586,12 @@ func Test_compactionPlanHandler_updateCompaction(t *testing.T) { err := c.updateCompaction(tt.args.ts) assert.Equal(t, tt.wantErr, err != nil) - for _, id := range tt.expired { + for _, id := range tt.timeout { + task := c.getCompaction(id) + assert.Equal(t, timeout, task.state) + } + + for _, id := range tt.failed { task := c.getCompaction(id) assert.Equal(t, failed, task.state) } diff --git a/internal/datacoord/compaction_trigger.go b/internal/datacoord/compaction_trigger.go index 154f60eb5c..5a5f5f63ab 100644 --- a/internal/datacoord/compaction_trigger.go +++ b/internal/datacoord/compaction_trigger.go @@ -478,12 +478,7 @@ func (t *compactionTrigger) fillOriginPlan(plan *datapb.CompactionPlan) error { if err != nil { return err } - ts, err := t.allocator.allocTimestamp(context.TODO()) - if err != nil { - return err - } plan.PlanID = id - plan.StartTime = ts plan.TimeoutInSeconds = Params.DataCoordCfg.CompactionTimeoutInSeconds return nil } diff --git a/internal/datacoord/compaction_trigger_test.go b/internal/datacoord/compaction_trigger_test.go index 7614dd32ef..c82b693938 100644 --- a/internal/datacoord/compaction_trigger_test.go +++ b/internal/datacoord/compaction_trigger_test.go @@ -203,7 +203,7 @@ func Test_compactionTrigger_force(t *testing.T) { }, }, }, - StartTime: 3, + StartTime: 0, TimeoutInSeconds: Params.DataCoordCfg.CompactionTimeoutInSeconds, Type: datapb.CompactionType_MixCompaction, Timetravel: 200, diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index ddee89bad9..c28672babc 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -960,10 +960,6 @@ func getCompactionMergeInfo(task *compactionTask) *milvuspb.CompactionMergeInfo } func getCompactionState(tasks []*compactionTask) (state commonpb.CompactionState, executingCnt, completedCnt, failedCnt, timeoutCnt int) { - if len(tasks) == 0 { - state = commonpb.CompactionState_Executing - return - } for _, t := range tasks { switch t.state { case executing: