enhance: make slotUsage a field of CompactionTask (#36510)

#36509

Signed-off-by: wayblink <anyang.wang@zilliz.com>
This commit is contained in:
wayblink 2024-09-26 20:19:14 +08:00 committed by GitHub
parent c056620899
commit 7ff41697f9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 74 additions and 36 deletions

View File

@ -39,7 +39,6 @@ import (
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/lock"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -589,19 +588,9 @@ func (c *compactionPlanHandler) createCompactTask(t *datapb.CompactionTask) (Com
var task CompactionTask
switch t.GetType() {
case datapb.CompactionType_MixCompaction:
task = &mixCompactionTask{
CompactionTask: t,
allocator: c.allocator,
meta: c.meta,
sessions: c.sessions,
}
task = newMixCompactionTask(t, c.allocator, c.meta, c.sessions)
case datapb.CompactionType_Level0DeleteCompaction:
task = &l0CompactionTask{
CompactionTask: t,
allocator: c.allocator,
meta: c.meta,
sessions: c.sessions,
}
task = newL0CompactionTask(t, c.allocator, c.meta, c.sessions)
case datapb.CompactionType_ClusteringCompaction:
task = newClusteringCompactionTask(t, c.allocator, c.meta, c.sessions, c.handler, c.analyzeScheduler)
default:
@ -688,13 +677,10 @@ func (c *compactionPlanHandler) pickAnyNode(nodeSlots map[int64]int64, task Comp
nodeID = NullNodeID
var maxSlots int64 = -1
switch task.GetType() {
case datapb.CompactionType_ClusteringCompaction:
useSlot = paramtable.Get().DataCoordCfg.ClusteringCompactionSlotUsage.GetAsInt64()
case datapb.CompactionType_MixCompaction:
useSlot = paramtable.Get().DataCoordCfg.MixCompactionSlotUsage.GetAsInt64()
case datapb.CompactionType_Level0DeleteCompaction:
useSlot = paramtable.Get().DataCoordCfg.L0DeleteCompactionSlotUsage.GetAsInt64()
useSlot = task.GetSlotUsage()
if useSlot <= 0 {
log.Warn("task slot should not be 0", zap.Int64("planID", task.GetPlanID()), zap.String("type", task.GetType().String()))
return NullNodeID, useSlot
}
for id, slots := range nodeSlots {

View File

@ -26,6 +26,7 @@ import (
type CompactionTask interface {
Process() bool
BuildCompactionRequest() (*datapb.CompactionPlan, error)
GetSlotUsage() int64
GetTriggerID() UniqueID
GetPlanID() UniqueID

View File

@ -37,6 +37,7 @@ import (
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metautil"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -55,6 +56,7 @@ type clusteringCompactionTask struct {
analyzeScheduler *taskScheduler
maxRetryTimes int32
slotUsage int64
}
func newClusteringCompactionTask(t *datapb.CompactionTask, allocator allocator.Allocator, meta CompactionMeta, session session.DataNodeManager, handler Handler, analyzeScheduler *taskScheduler) *clusteringCompactionTask {
@ -66,6 +68,7 @@ func newClusteringCompactionTask(t *datapb.CompactionTask, allocator allocator.A
handler: handler,
analyzeScheduler: analyzeScheduler,
maxRetryTimes: 3,
slotUsage: paramtable.Get().DataCoordCfg.ClusteringCompactionSlotUsage.GetAsInt64(),
}
}
@ -179,7 +182,7 @@ func (t *clusteringCompactionTask) BuildCompactionRequest() (*datapb.CompactionP
Begin: t.GetResultSegments()[0],
End: t.GetResultSegments()[1],
},
SlotUsage: Params.DataCoordCfg.ClusteringCompactionSlotUsage.GetAsInt64(),
SlotUsage: t.GetSlotUsage(),
}
log := log.With(zap.Int64("taskID", t.GetTriggerID()), zap.Int64("planID", plan.GetPlanID()))
@ -615,6 +618,10 @@ func (t *clusteringCompactionTask) NeedReAssignNodeID() bool {
return t.GetState() == datapb.CompactionTaskState_pipelining && (t.GetNodeID() == 0 || t.GetNodeID() == NullNodeID)
}
func (t *clusteringCompactionTask) GetSlotUsage() int64 {
return t.slotUsage
}
func (t *clusteringCompactionTask) CleanLogPath() {
if t.plan.GetSegmentBinlogs() != nil {
for _, binlogs := range t.plan.GetSegmentBinlogs() {

View File

@ -33,6 +33,7 @@ import (
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
var _ CompactionTask = (*l0CompactionTask)(nil)
@ -46,6 +47,18 @@ type l0CompactionTask struct {
allocator allocator.Allocator
sessions session.DataNodeManager
meta CompactionMeta
slotUsage int64
}
func newL0CompactionTask(t *datapb.CompactionTask, allocator allocator.Allocator, meta CompactionMeta, session session.DataNodeManager) *l0CompactionTask {
return &l0CompactionTask{
CompactionTask: t,
allocator: allocator,
meta: meta,
sessions: session,
slotUsage: paramtable.Get().DataCoordCfg.L0DeleteCompactionSlotUsage.GetAsInt64(),
}
}
// Note: return True means exit this state machine.
@ -295,7 +308,7 @@ func (t *l0CompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, err
TotalRows: t.GetTotalRows(),
Schema: t.GetSchema(),
BeginLogID: beginLogID,
SlotUsage: Params.DataCoordCfg.L0DeleteCompactionSlotUsage.GetAsInt64(),
SlotUsage: t.GetSlotUsage(),
}
log := log.With(zap.Int64("taskID", t.GetTriggerID()), zap.Int64("planID", plan.GetPlanID()))
@ -424,3 +437,7 @@ func (t *l0CompactionTask) saveSegmentMeta() error {
return t.meta.UpdateSegmentsInfo(operators...)
}
func (t *l0CompactionTask) GetSlotUsage() int64 {
return t.slotUsage
}

View File

@ -15,6 +15,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
var _ CompactionTask = (*mixCompactionTask)(nil)
@ -29,6 +30,17 @@ type mixCompactionTask struct {
sessions session.DataNodeManager
meta CompactionMeta
newSegmentIDs []int64
slotUsage int64
}
func newMixCompactionTask(t *datapb.CompactionTask, allocator allocator.Allocator, meta CompactionMeta, session session.DataNodeManager) *mixCompactionTask {
return &mixCompactionTask{
CompactionTask: t,
allocator: allocator,
meta: meta,
sessions: session,
slotUsage: paramtable.Get().DataCoordCfg.MixCompactionSlotUsage.GetAsInt64(),
}
}
func (t *mixCompactionTask) processPipelining() bool {
@ -178,12 +190,6 @@ func (t *mixCompactionTask) GetPlan() *datapb.CompactionPlan {
return t.plan
}
/*
func (t *mixCompactionTask) GetState() datapb.CompactionTaskState {
return t.CompactionTask.GetState()
}
*/
func (t *mixCompactionTask) GetLabel() string {
return fmt.Sprintf("%d-%s", t.PartitionID, t.GetChannel())
}
@ -296,12 +302,6 @@ func (t *mixCompactionTask) SetSpan(span trace.Span) {
t.span = span
}
/*
func (t *mixCompactionTask) SetPlan(plan *datapb.CompactionPlan) {
t.plan = plan
}
*/
func (t *mixCompactionTask) SetResult(result *datapb.CompactionPlanResult) {
t.result = result
}
@ -351,7 +351,7 @@ func (t *mixCompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, er
Begin: t.GetResultSegments()[0],
End: t.GetResultSegments()[1],
},
SlotUsage: Params.DataCoordCfg.MixCompactionSlotUsage.GetAsInt64(),
SlotUsage: t.GetSlotUsage(),
MaxSize: t.GetMaxSize(),
}
log := log.With(zap.Int64("taskID", t.GetTriggerID()), zap.Int64("planID", plan.GetPlanID()))
@ -378,3 +378,7 @@ func (t *mixCompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, er
log.Info("Compaction handler refreshed mix compaction plan", zap.Int64("maxSize", plan.GetMaxSize()), zap.Any("segID2DeltaLogs", segIDMap))
return plan, nil
}
func (t *mixCompactionTask) GetSlotUsage() int64 {
return t.slotUsage
}

View File

@ -32,6 +32,7 @@ import (
"github.com/milvus-io/milvus/internal/metastore/kv/datacoord"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/util/metautil"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -357,6 +358,7 @@ func (s *CompactionPlanHandlerSuite) TestPickAnyNode() {
CompactionTask: &datapb.CompactionTask{
Type: datapb.CompactionType_MixCompaction,
},
slotUsage: paramtable.Get().DataCoordCfg.MixCompactionSlotUsage.GetAsInt64(),
})
s.Equal(int64(101), node)
nodeSlots[node] = nodeSlots[node] - useSlot
@ -365,6 +367,7 @@ func (s *CompactionPlanHandlerSuite) TestPickAnyNode() {
CompactionTask: &datapb.CompactionTask{
Type: datapb.CompactionType_MixCompaction,
},
slotUsage: paramtable.Get().DataCoordCfg.MixCompactionSlotUsage.GetAsInt64(),
})
s.Equal(int64(100), node)
nodeSlots[node] = nodeSlots[node] - useSlot
@ -373,6 +376,7 @@ func (s *CompactionPlanHandlerSuite) TestPickAnyNode() {
CompactionTask: &datapb.CompactionTask{
Type: datapb.CompactionType_MixCompaction,
},
slotUsage: paramtable.Get().DataCoordCfg.MixCompactionSlotUsage.GetAsInt64(),
})
s.Equal(int64(101), node)
nodeSlots[node] = nodeSlots[node] - useSlot
@ -381,6 +385,22 @@ func (s *CompactionPlanHandlerSuite) TestPickAnyNode() {
s.Equal(int64(NullNodeID), node)
}
func (s *CompactionPlanHandlerSuite) TestPickAnyNodeSlotUsageShouldNotBeZero() {
s.SetupTest()
nodeSlots := map[int64]int64{
100: 16,
101: 23,
}
nodeID, useSlot := s.handler.pickAnyNode(nodeSlots, &mixCompactionTask{
CompactionTask: &datapb.CompactionTask{
Type: datapb.CompactionType_MixCompaction,
},
slotUsage: 0,
})
s.Equal(int64(NullNodeID), nodeID)
s.Equal(int64(0), useSlot)
}
func (s *CompactionPlanHandlerSuite) TestPickAnyNodeForClusteringTask() {
s.SetupTest()
nodeSlots := map[int64]int64{
@ -393,17 +413,20 @@ func (s *CompactionPlanHandlerSuite) TestPickAnyNodeForClusteringTask() {
CompactionTask: &datapb.CompactionTask{
Type: datapb.CompactionType_ClusteringCompaction,
},
slotUsage: paramtable.Get().DataCoordCfg.ClusteringCompactionSlotUsage.GetAsInt64(),
}
executingTasks[2] = &clusteringCompactionTask{
CompactionTask: &datapb.CompactionTask{
Type: datapb.CompactionType_ClusteringCompaction,
},
slotUsage: paramtable.Get().DataCoordCfg.ClusteringCompactionSlotUsage.GetAsInt64(),
}
s.handler.executingTasks = executingTasks
node, useSlot := s.handler.pickAnyNode(nodeSlots, &clusteringCompactionTask{
CompactionTask: &datapb.CompactionTask{
Type: datapb.CompactionType_ClusteringCompaction,
},
slotUsage: paramtable.Get().DataCoordCfg.ClusteringCompactionSlotUsage.GetAsInt64(),
})
s.Equal(int64(101), node)
nodeSlots[node] = nodeSlots[node] - useSlot
@ -412,6 +435,7 @@ func (s *CompactionPlanHandlerSuite) TestPickAnyNodeForClusteringTask() {
CompactionTask: &datapb.CompactionTask{
Type: datapb.CompactionType_ClusteringCompaction,
},
slotUsage: paramtable.Get().DataCoordCfg.ClusteringCompactionSlotUsage.GetAsInt64(),
})
s.Equal(int64(NullNodeID), node)
}

View File

@ -492,7 +492,6 @@ func Test_compactionTrigger_force(t *testing.T) {
TotalRows: 200,
Schema: schema,
PreAllocatedSegmentIDs: &datapb.IDRange{Begin: 100, End: 200},
SlotUsage: 8,
MaxSize: 1342177280,
},
},