Fix timeout task never release queue (#26593)
See also: #26413, #26566

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
This commit is contained in:
parent 87ecaac703
commit 9598a8b236
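Background for the fix: before this commit, updateCompaction scanned only tasks in the executing state, so a task that DataCoord had already moved to timeout never reached releaseQueue, and its per-node queue slot leaked, eventually blocking new compaction tasks on that node. The diff below touches the DataCoord compaction plan handler and its unit test (file names are not shown in this view). A minimal, self-contained Go sketch of the leak pattern; all names here (slots, maxParallel) are invented for illustration and this is not Milvus code:

package main

import "fmt"

type state int

const (
	executing state = iota
	timeout
)

func main() {
	const maxParallel = 2
	slots := make(chan struct{}, maxParallel) // stands in for parallelCh[nodeID]

	tasks := map[int64]state{1: executing, 2: timeout}
	for range tasks {
		slots <- struct{}{} // acquireQueue: one slot per started task
	}

	// Pre-fix behaviour: only executing tasks are scanned for cleanup,
	// so the slot held by the timeout task is never given back.
	for id, st := range tasks {
		if st == executing {
			<-slots // releaseQueue
			fmt.Println("released slot for task", id)
		}
	}
	fmt.Println("slots still held:", len(slots)) // prints 1: the leak
}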
@@ -167,12 +167,11 @@ func (c *compactionPlanHandler) execCompactionPlan(signal *compactionSignal, pla
 	nodeID, err := c.chManager.FindWatcher(plan.GetChannel())
 	if err != nil {
-		log.Error("failed to find watcher",
-			zap.Int64("plan ID", plan.GetPlanID()),
-			zap.Error(err))
+		log.Error("failed to find watcher", zap.Int64("planID", plan.GetPlanID()), zap.Error(err))
 		return err
 	}
 
+	log := log.With(zap.Int64("planID", plan.GetPlanID()), zap.Int64("nodeID", nodeID))
 	c.setSegmentsCompacting(plan, true)
 
 	task := &compactionTask{
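Note: the added log := log.With(...) creates a contextual logger, which is why the later messages inside the goroutine can drop their repeated planID and nodeID fields. A minimal sketch of the same pattern using go.uber.org/zap directly (Milvus wraps zap in its own log package, so treat this as an approximation rather than the project's exact API):

package main

import "go.uber.org/zap"

func main() {
	base, _ := zap.NewDevelopment()
	defer base.Sync()

	// Attach the shared fields once...
	log := base.With(zap.Int64("planID", 42), zap.Int64("nodeID", 7))

	// ...and every later call carries them automatically.
	log.Info("acquire queue")
	log.Info("start compaction")
}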
@@ -185,12 +184,12 @@ func (c *compactionPlanHandler) execCompactionPlan(signal *compactionSignal, pla
 	c.executingTaskNum++
 
 	go func() {
-		log.Info("acquire queue", zap.Int64("nodeID", nodeID), zap.Int64("planID", plan.GetPlanID()))
+		log.Info("acquire queue")
 		c.acquireQueue(nodeID)
 
 		ts, err := c.allocator.allocTimestamp(context.TODO())
 		if err != nil {
-			log.Warn("Alloc start time for CompactionPlan failed", zap.Int64("planID", plan.GetPlanID()))
+			log.Warn("Alloc start time for CompactionPlan failed", zap.Error(err))
 			// update plan ts to TIMEOUT ts
 			c.updateTask(plan.PlanID, setState(executing), setStartTime(tsTimeout))
 			return
@@ -199,15 +198,12 @@ func (c *compactionPlanHandler) execCompactionPlan(signal *compactionSignal, pla
 		err = c.sessions.Compaction(nodeID, plan)
 		c.updateTask(plan.PlanID, setState(executing))
 		if err != nil {
-			log.Warn("try to Compaction but DataNode rejected",
-				zap.Int64("targetNodeID", nodeID),
-				zap.Int64("planID", plan.GetPlanID()),
-			)
+			log.Warn("try to Compaction but DataNode rejected", zap.Error(err))
 			// do nothing here, prevent double release, see issue#21014
 			// release queue will be done in `updateCompaction`
 			return
 		}
-		log.Info("start compaction", zap.Int64("nodeID", nodeID), zap.Int64("planID", plan.GetPlanID()))
+		log.Info("start compaction")
 	}()
 	return nil
 }
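acquireQueue and releaseQueue themselves are not shown in this diff. Judging from the test fixture's parallelCh map[int64]chan struct{} and the len(...) assertion added at the end, they plausibly form a per-node counting semaphore over a buffered channel. The following is a hypothetical reconstruction under that assumption (the capacity of 2 is invented, not taken from the diff):

package sketch

import "sync"

type handler struct {
	mu         sync.Mutex
	parallelCh map[int64]chan struct{} // per-DataNode task slots
}

func (h *handler) queueFor(nodeID int64) chan struct{} {
	h.mu.Lock()
	defer h.mu.Unlock()
	ch, ok := h.parallelCh[nodeID]
	if !ok {
		ch = make(chan struct{}, 2) // capacity assumed for illustration
		h.parallelCh[nodeID] = ch
	}
	return ch
}

// acquireQueue blocks while the node is already running its maximum of tasks.
func (h *handler) acquireQueue(nodeID int64) { h.queueFor(nodeID) <- struct{}{} }

// releaseQueue frees one slot; the bug fixed here was that timeout tasks never got this call.
func (h *handler) releaseQueue(nodeID int64) { <-h.queueFor(nodeID) }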
@@ -299,14 +295,15 @@ func (c *compactionPlanHandler) getCompaction(planID int64) *compactionTask {
 
 // expireCompaction set the compaction state to expired
 func (c *compactionPlanHandler) updateCompaction(ts Timestamp) error {
-	// Get executing tasks before GetCompactionState from DataNode to prevent false failure,
+	// Get executing executingTasks before GetCompactionState from DataNode to prevent false failure,
 	//  for DC might add new task while GetCompactionState.
-	tasks := c.getExecutingCompactions()
+	executingTasks := c.getTasksByState(executing)
+	timeoutTasks := c.getTasksByState(timeout)
 	planStates := c.sessions.GetCompactionState()
 
 	c.mu.Lock()
 	defer c.mu.Unlock()
-	for _, task := range tasks {
+	for _, task := range executingTasks {
 		stateResult, ok := planStates[task.plan.PlanID]
 		state := stateResult.GetState()
 		planID := task.plan.PlanID
@@ -341,6 +338,30 @@ func (c *compactionPlanHandler) updateCompaction(ts Timestamp) error {
 		c.releaseQueue(task.dataNodeID)
 	}
 
+	// Timeout tasks will be timeout and failed in DataNode
+	// need to wait for DataNode reporting failure and
+	// clean the status.
+	for _, task := range timeoutTasks {
+		stateResult, ok := planStates[task.plan.PlanID]
+		planID := task.plan.PlanID
+
+		if !ok {
+			log.Info("compaction failed for timeout", zap.Int64("planID", task.plan.PlanID), zap.Int64("nodeID", task.dataNodeID))
+			c.plans[planID] = c.plans[planID].shadowClone(setState(failed))
+			c.setSegmentsCompacting(task.plan, false)
+			c.executingTaskNum--
+			c.releaseQueue(task.dataNodeID)
+		}
+
+		// DataNode will check if plan's are timeout but not as sensitive as DataCoord,
+		// just wait another round.
+		if ok && stateResult.GetState() == commonpb.CompactionState_Executing {
+			log.Info("compaction timeout in DataCoord yet DataNode is still running",
+				zap.Int64("planID", planID),
+				zap.Int64("nodeID", task.dataNodeID))
+			continue
+		}
+	}
 	return nil
 }
 
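The new loop above distinguishes two outcomes for a task that DataCoord has already marked timeout. Condensed into a small standalone sketch (the bool parameters and return strings are illustrative stand-ins, not Milvus types):

package sketch

// handleTimeoutTask condenses the branch logic of the new timeout loop.
// reported says whether the DataNode's GetCompactionState response still
// contains the plan; stillExecuting is the state it reports.
func handleTimeoutTask(reported, stillExecuting bool) string {
	switch {
	case !reported:
		// DataNode already dropped the plan: mark it failed, un-flag the
		// segments, decrement executingTaskNum, release the queue slot.
		return "fail and release"
	case stillExecuting:
		// DataNode's own timeout check is less sensitive than DataCoord's,
		// so just wait another round.
		return "wait another round"
	default:
		return "no action this round"
	}
}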
@@ -381,12 +402,12 @@ func (c *compactionPlanHandler) isFull() bool {
 	return c.executingTaskNum >= Params.DataCoordCfg.CompactionMaxParallelTasks.GetAsInt()
 }
 
-func (c *compactionPlanHandler) getExecutingCompactions() []*compactionTask {
+func (c *compactionPlanHandler) getTasksByState(state compactionTaskState) []*compactionTask {
 	c.mu.RLock()
 	defer c.mu.RUnlock()
 	tasks := make([]*compactionTask, 0, len(c.plans))
 	for _, plan := range c.plans {
-		if plan.state == executing {
+		if plan.state == state {
 			tasks = append(tasks, plan)
 		}
 	}
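Generalizing getExecutingCompactions into getTasksByState lets updateCompaction take both snapshots through one helper, each call under its own read lock, exactly as the earlier hunk shows:

executingTasks := c.getTasksByState(executing)
timeoutTasks := c.getTasksByState(timeout)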
@@ -747,6 +747,24 @@ func Test_compactionPlanHandler_updateCompaction(t *testing.T) {
 					TimeoutInSeconds: 1,
 				},
 			},
+			5: { // timeout and failed
+				state:      timeout,
+				dataNodeID: 2,
+				plan: &datapb.CompactionPlan{
+					PlanID:           5,
+					StartTime:        tsoutil.ComposeTS(ts.UnixNano()/int64(time.Millisecond), 0) - 200*1000,
+					TimeoutInSeconds: 1,
+				},
+			},
+			6: { // timeout and executing
+				state:      timeout,
+				dataNodeID: 2,
+				plan: &datapb.CompactionPlan{
+					PlanID:           6,
+					StartTime:        tsoutil.ComposeTS(ts.UnixNano()/int64(time.Millisecond), 0) - 200*1000,
+					TimeoutInSeconds: 1,
+				},
+			},
 		},
 		meta: &meta{
 			segments: &SegmentsInfo{
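The two new fixtures (plans 5 and 6) use tsoutil.ComposeTS to place StartTime slightly in the past relative to the ts later passed into updateCompaction. A hedged sketch of the usual TSO composition; the 18-bit logical width is an assumption based on the common TSO layout, not something this diff states:

package main

import (
	"fmt"
	"time"
)

const logicalBits = 18 // assumed width of the logical counter

// composeTS packs physical milliseconds and a logical counter into one
// uint64, mirroring what tsoutil.ComposeTS is used for in the fixtures.
func composeTS(physicalMs, logical int64) uint64 {
	return uint64(physicalMs)<<logicalBits | uint64(logical)
}

func main() {
	now := time.Now()
	start := composeTS(now.UnixNano()/int64(time.Millisecond), 0) - 200*1000
	fmt.Println(start) // a fixture-style StartTime, nudged slightly earlier
}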
@@ -761,12 +779,13 @@ func Test_compactionPlanHandler_updateCompaction(t *testing.T) {
 			data map[int64]*Session
 		}{
 			data: map[int64]*Session{
-				1: {client: &mockDataNodeClient{
+				2: {client: &mockDataNodeClient{
 					compactionStateResp: &datapb.CompactionStateResponse{
 						Results: []*datapb.CompactionStateResult{
 							{PlanID: 1, State: commonpb.CompactionState_Executing},
 							{PlanID: 3, State: commonpb.CompactionState_Completed, Result: &datapb.CompactionResult{PlanID: 3}},
 							{PlanID: 4, State: commonpb.CompactionState_Executing},
+							{PlanID: 6, State: commonpb.CompactionState_Executing},
 						},
 					},
 				}},
@@ -776,17 +795,21 @@ func Test_compactionPlanHandler_updateCompaction(t *testing.T) {
 			},
 			args{ts: tsoutil.ComposeTS(ts.Add(5*time.Second).UnixNano()/int64(time.Millisecond), 0)},
 			false,
-			[]int64{4},
-			[]int64{2},
+			[]int64{4, 6},
+			[]int64{2, 5},
 			[]int64{1, 3},
 		},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			c := &compactionPlanHandler{
-				plans:    tt.fields.plans,
-				sessions: tt.fields.sessions,
-				meta:     tt.fields.meta,
+				plans:      tt.fields.plans,
+				sessions:   tt.fields.sessions,
+				meta:       tt.fields.meta,
+				parallelCh: make(map[int64]chan struct{}),
 			}
+			for range tt.failed {
+				c.acquireQueue(2)
+			}
 
 			err := c.updateCompaction(tt.args.ts)
@@ -806,6 +829,10 @@ func Test_compactionPlanHandler_updateCompaction(t *testing.T) {
 				task := c.getCompaction(id)
 				assert.NotEqual(t, failed, task.state)
 			}
+
+			c.mu.Lock()
+			assert.Equal(t, 0, len(c.parallelCh[2]))
+			c.mu.Unlock()
 		})
 	}
 }
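The closing assertion is the heart of the regression test: after updateCompaction has processed the timeout tasks, node 2's buffered channel must be empty again, meaning every slot acquired in the setup loop was released. Stripped to its essence as a sketch (using testify, as the test above does):

package sketch

import (
	"testing"

	"github.com/stretchr/testify/assert"
)

func TestQueueDrained(t *testing.T) {
	ch := make(chan struct{}, 4)
	ch <- struct{}{}            // acquireQueue for a task that later times out
	<-ch                        // releaseQueue on the timeout-failure path (the fix)
	assert.Equal(t, 0, len(ch)) // len() of a buffered channel counts held slots
}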