enhance: Add CompactionTaskNum metrics (#29518)

See also: #27606

---------

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
XuanYang-cn 2023-12-28 15:46:55 +08:00 committed by GitHub
parent a8a0aa9357
commit 4b406e5973
8 changed files with 103 additions and 82 deletions
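In brief: the commit adds a DataCoordCompactionTaskNum gauge labeled by node ID, compaction type, and task status (pending, executing, done), and widens Scheduler.Finish to take the whole *datapb.CompactionPlan so the type label is available when a task completes. Below is a minimal, self-contained sketch of that gauge, not the project's actual wiring; the namespace and subsystem strings are assumed stand-ins for milvusNamespace and typeutil.DataCoordRole.

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// Stand-in for metrics.DataCoordCompactionTaskNum: one series per
// (node_id, compaction_type, status) combination.
var compactionTaskNum = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Namespace: "milvus",    // assumed stand-in for milvusNamespace
		Subsystem: "datacoord", // assumed stand-in for typeutil.DataCoordRole
		Name:      "compaction_task_num",
		Help:      "Current number of compaction tasks",
	},
	[]string{"node_id", "compaction_type", "status"},
)

func main() {
	reg := prometheus.NewRegistry()
	reg.MustRegister(compactionTaskNum)

	// A freshly submitted MixCompaction task on node 100 counts as pending.
	compactionTaskNum.WithLabelValues("100", "MixCompaction", "pending").Inc()
	fmt.Println("gauge registered; pending count bumped for node 100")
}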

View File

@@ -252,7 +252,7 @@ func (c *compactionPlanHandler) removeTasksByChannel(channel string) {
zap.Int64("planID", task.plan.GetPlanID()),
zap.Int64("node", task.dataNodeID),
)
c.scheduler.Finish(task.dataNodeID, task.plan.PlanID)
c.scheduler.Finish(task.dataNodeID, task.plan)
delete(c.plans, id)
}
}
@@ -383,7 +383,7 @@ func (c *compactionPlanHandler) completeCompaction(result *datapb.CompactionPlan
plan := c.plans[planID].plan
nodeID := c.plans[planID].dataNodeID
defer c.scheduler.Finish(nodeID, plan.PlanID)
defer c.scheduler.Finish(nodeID, plan)
switch plan.GetType() {
case datapb.CompactionType_MergeCompaction, datapb.CompactionType_MixCompaction:
if err := c.handleMergeCompactionResult(plan, result); err != nil {
@@ -523,7 +523,7 @@ func (c *compactionPlanHandler) updateCompaction(ts Timestamp) error {
log.Info("compaction failed", zap.Int64("planID", task.plan.PlanID), zap.Int64("nodeID", task.dataNodeID))
c.plans[planID] = c.plans[planID].shadowClone(setState(failed))
c.setSegmentsCompacting(task.plan, false)
c.scheduler.Finish(task.dataNodeID, task.plan.PlanID)
c.scheduler.Finish(task.dataNodeID, task.plan)
}
// Timeout tasks will time out and fail in DataNode
@@ -537,7 +537,7 @@ func (c *compactionPlanHandler) updateCompaction(ts Timestamp) error {
log.Info("compaction failed for timeout", zap.Int64("planID", task.plan.PlanID), zap.Int64("nodeID", task.dataNodeID))
c.plans[planID] = c.plans[planID].shadowClone(setState(failed))
c.setSegmentsCompacting(task.plan, false)
c.scheduler.Finish(task.dataNodeID, task.plan.PlanID)
c.scheduler.Finish(task.dataNodeID, task.plan)
}
// DataNode will check if plans are timed out, but it is not as sensitive as DataCoord,

View File

@@ -1,6 +1,7 @@
package datacoord
import (
"fmt"
"sync"
"github.com/samber/lo"
@@ -9,13 +10,14 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
type Scheduler interface {
Submit(t ...*compactionTask)
Schedule() []*compactionTask
Finish(nodeID, planID int64)
Finish(nodeID int64, plan *datapb.CompactionPlan)
GetTaskCount() int
LogStatus()
@@ -50,6 +52,10 @@ func (s *CompactionScheduler) Submit(tasks ...*compactionTask) {
s.mu.Unlock()
s.taskNumber.Add(int32(len(tasks)))
lo.ForEach(tasks, func(t *compactionTask, _ int) {
metrics.DataCoordCompactionTaskNum.
WithLabelValues(fmt.Sprint(t.dataNodeID), t.plan.GetType().String(), metrics.Pending).Inc()
})
s.LogStatus()
}
@@ -126,6 +132,10 @@ func (s *CompactionScheduler) Schedule() []*compactionTask {
} else {
s.parallelTasks[node] = append(s.parallelTasks[node], task)
}
metrics.DataCoordCompactionTaskNum.
WithLabelValues(fmt.Sprint(node), task.plan.GetType().String(), metrics.Executing).Inc()
metrics.DataCoordCompactionTaskNum.
WithLabelValues(fmt.Sprint(node), task.plan.GetType().String(), metrics.Pending).Dec()
}
s.queuingTasks = lo.Filter(s.queuingTasks, func(t *compactionTask, _ int) bool {
@@ -142,7 +152,8 @@ func (s *CompactionScheduler) Schedule() []*compactionTask {
return lo.Values(executable)
}
func (s *CompactionScheduler) Finish(nodeID, planID UniqueID) {
func (s *CompactionScheduler) Finish(nodeID UniqueID, plan *datapb.CompactionPlan) {
planID := plan.GetPlanID()
log := log.With(zap.Int64("planID", planID), zap.Int64("nodeID", nodeID))
s.mu.Lock()
@@ -152,6 +163,10 @@ func (s *CompactionScheduler) Finish(nodeID, planID UniqueID) {
})
s.parallelTasks[nodeID] = tasks
s.taskNumber.Dec()
metrics.DataCoordCompactionTaskNum.
WithLabelValues(fmt.Sprint(nodeID), plan.GetType().String(), metrics.Executing).Dec()
metrics.DataCoordCompactionTaskNum.
WithLabelValues(fmt.Sprint(nodeID), plan.GetType().String(), metrics.Done).Inc()
log.Info("Compaction scheduler remove task from executing")
}
@@ -161,6 +176,10 @@ func (s *CompactionScheduler) Finish(nodeID, planID UniqueID) {
if len(filtered) < len(s.queuingTasks) {
s.queuingTasks = filtered
s.taskNumber.Dec()
metrics.DataCoordCompactionTaskNum.
WithLabelValues(fmt.Sprint(nodeID), plan.GetType().String(), metrics.Pending).Dec()
metrics.DataCoordCompactionTaskNum.
WithLabelValues(fmt.Sprint(nodeID), plan.GetType().String(), metrics.Done).Inc()
log.Info("Compaction scheduler remove task from queue")
}
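Read together, Submit, Schedule, and Finish move each task's gauge through pending, executing, and done, with Finish also covering tasks that are dropped straight from the queue (pending to done). Below is a standalone sketch of that bookkeeping using a local gauge in place of metrics.DataCoordCompactionTaskNum; the transition points mirror the diff above, while the helper functions and values are illustrative only.

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

var taskNum = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{Name: "compaction_task_num_sketch"},
	[]string{"node_id", "compaction_type", "status"},
)

// submit/schedule/finish mimic the transitions the scheduler performs.
func submit(node, typ string) { taskNum.WithLabelValues(node, typ, "pending").Inc() }

func schedule(node, typ string) {
	taskNum.WithLabelValues(node, typ, "pending").Dec()
	taskNum.WithLabelValues(node, typ, "executing").Inc()
}

func finishExecuting(node, typ string) {
	taskNum.WithLabelValues(node, typ, "executing").Dec()
	taskNum.WithLabelValues(node, typ, "done").Inc()
}

func finishQueued(node, typ string) {
	taskNum.WithLabelValues(node, typ, "pending").Dec()
	taskNum.WithLabelValues(node, typ, "done").Inc()
}

func main() {
	// Task A runs to completion; task B is finished while still queued.
	submit("100", "MixCompaction")
	schedule("100", "MixCompaction")
	finishExecuting("100", "MixCompaction")

	submit("101", "Level0DeleteCompaction")
	finishQueued("101", "Level0DeleteCompaction")

	done := testutil.ToFloat64(taskNum.WithLabelValues("100", "MixCompaction", "done")) +
		testutil.ToFloat64(taskNum.WithLabelValues("101", "Level0DeleteCompaction", "done"))
	fmt.Println("done tasks recorded:", done) // 2
}

Note that the done series is only ever incremented, so it effectively counts completed tasks over time, while pending and executing track in-flight work.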

View File

@@ -1,12 +1,15 @@
package datacoord
import (
"fmt"
"testing"
"github.com/samber/lo"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/testutils"
)
func TestSchedulerSuite(t *testing.T) {
@@ -14,7 +17,7 @@ func TestSchedulerSuite(t *testing.T) {
}
type SchedulerSuite struct {
suite.Suite
testutils.PromMetricsSuite
scheduler *CompactionScheduler
}
@@ -22,11 +25,11 @@ func (s *SchedulerSuite) SetupTest() {
s.scheduler = NewCompactionScheduler()
s.scheduler.parallelTasks = map[int64][]*compactionTask{
100: {
{dataNodeID: 100, plan: &datapb.CompactionPlan{PlanID: 1, Channel: "ch-1", Type: datapb.CompactionType_MinorCompaction}},
{dataNodeID: 100, plan: &datapb.CompactionPlan{PlanID: 2, Channel: "ch-1", Type: datapb.CompactionType_MinorCompaction}},
{dataNodeID: 100, plan: &datapb.CompactionPlan{PlanID: 1, Channel: "ch-1", Type: datapb.CompactionType_MixCompaction}},
{dataNodeID: 100, plan: &datapb.CompactionPlan{PlanID: 2, Channel: "ch-1", Type: datapb.CompactionType_MixCompaction}},
},
101: {
{dataNodeID: 101, plan: &datapb.CompactionPlan{PlanID: 3, Channel: "ch-2", Type: datapb.CompactionType_MinorCompaction}},
{dataNodeID: 101, plan: &datapb.CompactionPlan{PlanID: 3, Channel: "ch-2", Type: datapb.CompactionType_MixCompaction}},
},
102: {
{dataNodeID: 102, plan: &datapb.CompactionPlan{PlanID: 4, Channel: "ch-3", Type: datapb.CompactionType_Level0DeleteCompaction}},
@@ -174,3 +177,41 @@ func (s *SchedulerSuite) TestScheduleNodeWithL0Executing() {
})
}
}
func (s *SchedulerSuite) TestFinish() {
s.Run("finish from parallelTasks", func() {
s.SetupTest()
metrics.DataCoordCompactionTaskNum.Reset()
s.scheduler.Finish(100, &datapb.CompactionPlan{PlanID: 1, Type: datapb.CompactionType_MixCompaction})
taskNum, err := metrics.DataCoordCompactionTaskNum.GetMetricWithLabelValues("100", datapb.CompactionType_MixCompaction.String(), metrics.Executing)
s.NoError(err)
s.MetricsEqual(taskNum, -1)
taskNum, err = metrics.DataCoordCompactionTaskNum.GetMetricWithLabelValues("100", datapb.CompactionType_MixCompaction.String(), metrics.Done)
s.NoError(err)
s.MetricsEqual(taskNum, 1)
})
s.Run("finish from queuingTasks", func() {
s.SetupTest()
metrics.DataCoordCompactionTaskNum.Reset()
var datanodeID int64 = 10000
plan := &datapb.CompactionPlan{PlanID: 19530, Type: datapb.CompactionType_Level0DeleteCompaction}
s.scheduler.Submit(&compactionTask{plan: plan, dataNodeID: datanodeID})
taskNum, err := metrics.DataCoordCompactionTaskNum.GetMetricWithLabelValues(fmt.Sprint(datanodeID), datapb.CompactionType_Level0DeleteCompaction.String(), metrics.Pending)
s.NoError(err)
s.MetricsEqual(taskNum, 1)
s.scheduler.Finish(datanodeID, plan)
taskNum, err = metrics.DataCoordCompactionTaskNum.GetMetricWithLabelValues(fmt.Sprint(datanodeID), datapb.CompactionType_Level0DeleteCompaction.String(), metrics.Pending)
s.NoError(err)
s.MetricsEqual(taskNum, 0)
taskNum, err = metrics.DataCoordCompactionTaskNum.GetMetricWithLabelValues(fmt.Sprint(datanodeID), datapb.CompactionType_Level0DeleteCompaction.String(), metrics.Done)
s.NoError(err)
s.MetricsEqual(taskNum, 1)
})
}
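The test asserts gauge values through the project's testutils.PromMetricsSuite helper (MetricsEqual). Outside that helper, the same check can be written against client_golang directly; a hedged, standalone sketch of the equivalent assertion (the metric here is a local stand-in, not metrics.DataCoordCompactionTaskNum):

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	taskNum := prometheus.NewGaugeVec(
		prometheus.GaugeOpts{Name: "compaction_task_num_sketch"},
		[]string{"node_id", "compaction_type", "status"},
	)

	taskNum.WithLabelValues("10000", "Level0DeleteCompaction", "pending").Inc()

	// Equivalent of s.MetricsEqual(taskNum, 1): fetch the labeled child and
	// read its current value.
	child, err := taskNum.GetMetricWithLabelValues("10000", "Level0DeleteCompaction", "pending")
	if err != nil {
		panic(err)
	}
	if got := testutil.ToFloat64(child); got != 1 {
		panic(fmt.Sprintf("expected 1, got %v", got))
	}
	fmt.Println("pending gauge equals 1")
}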

View File

@@ -189,49 +189,6 @@ func (_c *MockChannelManager_FindWatcher_Call) RunAndReturn(run func(string) (in
return _c
}
// GetBufferChannels provides a mock function with given fields:
func (_m *MockChannelManager) GetBufferChannels() *NodeChannelInfo {
ret := _m.Called()
var r0 *NodeChannelInfo
if rf, ok := ret.Get(0).(func() *NodeChannelInfo); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*NodeChannelInfo)
}
}
return r0
}
// MockChannelManager_GetBufferChannels_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetBufferChannels'
type MockChannelManager_GetBufferChannels_Call struct {
*mock.Call
}
// GetBufferChannels is a helper method to define mock.On call
func (_e *MockChannelManager_Expecter) GetBufferChannels() *MockChannelManager_GetBufferChannels_Call {
return &MockChannelManager_GetBufferChannels_Call{Call: _e.mock.On("GetBufferChannels")}
}
func (_c *MockChannelManager_GetBufferChannels_Call) Run(run func()) *MockChannelManager_GetBufferChannels_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockChannelManager_GetBufferChannels_Call) Return(_a0 *NodeChannelInfo) *MockChannelManager_GetBufferChannels_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockChannelManager_GetBufferChannels_Call) RunAndReturn(run func() *NodeChannelInfo) *MockChannelManager_GetBufferChannels_Call {
_c.Call.Return(run)
return _c
}
// GetChannelsByCollectionID provides a mock function with given fields: collectionID
func (_m *MockChannelManager) GetChannelsByCollectionID(collectionID int64) []RWChannel {
ret := _m.Called(collectionID)

View File

@@ -2,7 +2,10 @@
package datacoord
import mock "github.com/stretchr/testify/mock"
import (
datapb "github.com/milvus-io/milvus/internal/proto/datapb"
mock "github.com/stretchr/testify/mock"
)
// MockScheduler is an autogenerated mock type for the Scheduler type
type MockScheduler struct {
@@ -17,9 +20,9 @@ func (_m *MockScheduler) EXPECT() *MockScheduler_Expecter {
return &MockScheduler_Expecter{mock: &_m.Mock}
}
// Finish provides a mock function with given fields: nodeID, planID
func (_m *MockScheduler) Finish(nodeID int64, planID int64) {
_m.Called(nodeID, planID)
// Finish provides a mock function with given fields: nodeID, plan
func (_m *MockScheduler) Finish(nodeID int64, plan *datapb.CompactionPlan) {
_m.Called(nodeID, plan)
}
// MockScheduler_Finish_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Finish'
@@ -29,14 +32,14 @@ type MockScheduler_Finish_Call struct {
// Finish is a helper method to define mock.On call
// - nodeID int64
// - planID int64
func (_e *MockScheduler_Expecter) Finish(nodeID interface{}, planID interface{}) *MockScheduler_Finish_Call {
return &MockScheduler_Finish_Call{Call: _e.mock.On("Finish", nodeID, planID)}
// - plan *datapb.CompactionPlan
func (_e *MockScheduler_Expecter) Finish(nodeID interface{}, plan interface{}) *MockScheduler_Finish_Call {
return &MockScheduler_Finish_Call{Call: _e.mock.On("Finish", nodeID, plan)}
}
func (_c *MockScheduler_Finish_Call) Run(run func(nodeID int64, planID int64)) *MockScheduler_Finish_Call {
func (_c *MockScheduler_Finish_Call) Run(run func(nodeID int64, plan *datapb.CompactionPlan)) *MockScheduler_Finish_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64), args[1].(int64))
run(args[0].(int64), args[1].(*datapb.CompactionPlan))
})
return _c
}
@@ -46,7 +49,7 @@ func (_c *MockScheduler_Finish_Call) Return() *MockScheduler_Finish_Call {
return _c
}
func (_c *MockScheduler_Finish_Call) RunAndReturn(run func(int64, int64)) *MockScheduler_Finish_Call {
func (_c *MockScheduler_Finish_Call) RunAndReturn(run func(int64, *datapb.CompactionPlan)) *MockScheduler_Finish_Call {
_c.Call.Return(run)
return _c
}
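Since the generated mock's Finish now takes (int64, *datapb.CompactionPlan), tests that stub it must match the new shape. A hedged sketch of setting that expectation with the expecter shown above; it assumes placement in the datacoord package next to the generated mock, and the test name is made up for illustration:

package datacoord

import (
	"testing"

	"github.com/stretchr/testify/mock"

	"github.com/milvus-io/milvus/internal/proto/datapb"
)

// Sketch only: how a caller might stub the updated Finish signature.
func TestFinishExpectationSketch(t *testing.T) {
	sched := &MockScheduler{}
	sched.EXPECT().
		Finish(mock.Anything, mock.Anything).
		Run(func(nodeID int64, plan *datapb.CompactionPlan) {
			t.Logf("finish called for node %d, plan %d", nodeID, plan.GetPlanID())
		}).
		Return()

	sched.Finish(100, &datapb.CompactionPlan{PlanID: 1, Type: datapb.CompactionType_MixCompaction})
	sched.AssertExpectations(t)
}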

View File

@@ -26,14 +26,6 @@ import (
)
const (
CompactTypeI = "compactTypeI"
CompactTypeII = "compactTypeII"
CompactInputLabel = "input"
CompactInput2Label = "input2"
CompactOutputLabel = "output"
compactIOLabelName = "IO"
compactTypeLabelName = "compactType"
InsertFileLabel = "insert_file"
DeleteFileLabel = "delete_file"
StatFileLabel = "stat_file"
@@ -172,6 +164,18 @@ var (
Buckets: buckets,
}, []string{})
DataCoordCompactionTaskNum = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.DataCoordRole,
Name: "compaction_task_num",
Help: "Number of compaction tasks currently",
}, []string{
nodeIDLabelName,
compactionTypeLabelName,
statusLabelName,
})
FlushedSegmentFileNum = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: milvusNamespace,
@@ -226,7 +230,7 @@ var (
Name: "segment_compact_duration",
Help: "time spent on each segment flush",
Buckets: []float64{0.1, 0.5, 1, 5, 10, 20, 50, 100, 250, 500, 1000, 3600, 5000, 10000}, // unit seconds
}, []string{compactTypeLabelName})
}, []string{})
DataCoordCompactLoad = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
@@ -234,15 +238,8 @@ var (
Subsystem: typeutil.DataCoordRole,
Name: "compaction_load",
Help: "Information on the input and output of compaction",
}, []string{compactTypeLabelName, compactIOLabelName})
}, []string{})
DataCoordNumCompactionTask = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.DataCoordRole,
Name: "num_compaction_tasks",
Help: "Number of compaction tasks currently",
}, []string{statusLabelName})
*/
// IndexRequestCounter records the number of the index requests.
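Once the gauge is registered (the project does this through its own metrics registration helpers, not shown in this diff), each (node, type, status) combination is exported as a separate series. A self-contained sketch of what the exposition might look like; the endpoint, port, and sample values are illustrative assumptions:

package main

import (
	"fmt"
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	reg := prometheus.NewRegistry()
	taskNum := prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Namespace: "milvus",    // assumed stand-in for milvusNamespace
			Subsystem: "datacoord", // assumed stand-in for typeutil.DataCoordRole
			Name:      "compaction_task_num",
			Help:      "Current number of compaction tasks",
		},
		[]string{"node_id", "compaction_type", "status"},
	)
	reg.MustRegister(taskNum)
	taskNum.WithLabelValues("100", "MixCompaction", "executing").Set(2)

	// Scraping /metrics would then include lines roughly like:
	//   # HELP milvus_datacoord_compaction_task_num Current number of compaction tasks
	//   # TYPE milvus_datacoord_compaction_task_num gauge
	//   milvus_datacoord_compaction_task_num{compaction_type="MixCompaction",node_id="100",status="executing"} 2
	http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
	fmt.Println("metrics endpoint ready on :2112/metrics")
	_ = http.ListenAndServe(":2112", nil)
}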

View File

@@ -161,7 +161,7 @@ var (
Buckets: longTaskBuckets,
}, []string{
nodeIDLabelName,
compactionTypeLabel,
compactionTypeLabelName,
})
DataNodeCompactionLatencyInQueue = prometheus.NewHistogramVec(

View File

@@ -64,7 +64,11 @@ const (
ReduceSegments = "segments"
ReduceShards = "shards"
compactionTypeLabel = "compaction_type"
Pending = "pending"
Executing = "executing"
Done = "done"
compactionTypeLabelName = "compaction_type"
nodeIDLabelName = "node_id"
statusLabelName = "status"
indexTaskStatusLabelName = "index_task_status"