enhance: Increase task capacity and clean illegal task (#37896)

1. taskQueueCapacity 256 is too small for production when we want to
re-write the entire collection

2. tasks should be cleaned when unable to recover, or the meta will
remain in etcd forever later.

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
This commit is contained in:
XuanYang-cn 2024-11-21 18:48:32 +08:00 committed by GitHub
parent 0a440e0d38
commit d7dcc752f1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 5 additions and 4 deletions

View File

@ -562,7 +562,7 @@ dataCoord:
# level is prioritized by level: L0 compactions first, then mix compactions, then clustering compactions.
# mix is prioritized by level: mix compactions first, then L0 compactions, then clustering compactions.
taskPrioritizer: default
taskQueueCapacity: 256 # compaction task queue size
taskQueueCapacity: 100000 # compaction task queue size
rpcTimeout: 10
maxParallelTaskNum: 10
dropTolerance: 86400 # Compaction task will be cleaned after finish longer than this time(in seconds)

View File

@ -337,15 +337,16 @@ func (c *compactionPlanHandler) loadMeta() {
zap.String("state", task.GetState().String()))
continue
} else {
// TODO: how to deal with the create failed tasks, leave it in meta forever?
t, err := c.createCompactTask(task)
if err != nil {
log.Warn("compactionPlanHandler loadMeta create compactionTask failed",
log.Info("compactionPlanHandler loadMeta create compactionTask failed, try to clean it",
zap.Int64("planID", task.GetPlanID()),
zap.String("type", task.GetType().String()),
zap.String("state", task.GetState().String()),
zap.Error(err),
)
// ignore the drop error
c.meta.DropCompactionTask(task)
continue
}
if t.NeedReAssignNodeID() {

View File

@ -3530,7 +3530,7 @@ mix is prioritized by level: mix compactions first, then L0 compactions, then cl
p.CompactionTaskQueueCapacity = ParamItem{
Key: "dataCoord.compaction.taskQueueCapacity",
Version: "2.5.0",
DefaultValue: "256",
DefaultValue: "100000",
Doc: `compaction task queue size`,
Export: true,
}