enhance: Optimize import scheduling and add time cost metric (#36601)

1. Optimize import scheduling strategic:
a. Revise slot weights, calculating them based on the number of files
and segments for both import and pre-import tasks.
b. Ensure that the DN executes tasks in ascending order of task ID.
2. Add time cost metric and log.

issue: https://github.com/milvus-io/milvus/issues/36600,
https://github.com/milvus-io/milvus/issues/36518

---------

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
yihao.dai 2024-10-09 14:41:20 +08:00 committed by GitHub
parent 2c3a8b7dea
commit 0fc2a4aa53
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
20 changed files with 235 additions and 49 deletions

View File

@ -650,6 +650,7 @@ dataNode:
maxConcurrentTaskNum: 16 # The maximum number of import/pre-import tasks allowed to run concurrently on a datanode.
maxImportFileSizeInGB: 16 # The maximum file size (in GB) for an import file, where an import file refers to either a Row-Based file or a set of Column-Based files.
readBufferSizeInMB: 16 # The data block size (in MB) read from chunk manager by the datanode during import.
maxTaskSlotNum: 16 # The maximum number of slots occupied by each import/pre-import task.
compaction:
levelZeroBatchMemoryRatio: 0.05 # The minimal memory ratio of free memory for level zero compaction executing in batch mode
levelZeroMaxBatchSize: -1 # Max batch size refers to the max number of L1/L2 segments in a batch when executing L0 compaction. Default to -1, any value that is less than 1 means no limit. Valid range: >= 1.

View File

@ -183,7 +183,7 @@ func (c *importChecker) getLackFilesForImports(job ImportJob) []*datapb.ImportFi
}
func (c *importChecker) checkPendingJob(job ImportJob) {
logger := log.With(zap.Int64("jobID", job.GetJobID()))
log := log.With(zap.Int64("jobID", job.GetJobID()))
lacks := c.getLackFilesForPreImports(job)
if len(lacks) == 0 {
return
@ -192,7 +192,7 @@ func (c *importChecker) checkPendingJob(job ImportJob) {
newTasks, err := NewPreImportTasks(fileGroups, job, c.alloc)
if err != nil {
logger.Warn("new preimport tasks failed", zap.Error(err))
log.Warn("new preimport tasks failed", zap.Error(err))
return
}
for _, t := range newTasks {
@ -203,14 +203,19 @@ func (c *importChecker) checkPendingJob(job ImportJob) {
}
log.Info("add new preimport task", WrapTaskLog(t)...)
}
err = c.imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_PreImporting))
if err != nil {
logger.Warn("failed to update job state to PreImporting", zap.Error(err))
log.Warn("failed to update job state to PreImporting", zap.Error(err))
return
}
pendingDuration := job.GetTR().RecordSpan()
metrics.ImportJobLatency.WithLabelValues(metrics.ImportStagePending).Observe(float64(pendingDuration.Milliseconds()))
log.Info("import job start to execute", zap.Duration("jobTimeCost/pending", pendingDuration))
}
func (c *importChecker) checkPreImportingJob(job ImportJob) {
logger := log.With(zap.Int64("jobID", job.GetJobID()))
log := log.With(zap.Int64("jobID", job.GetJobID()))
lacks := c.getLackFilesForImports(job)
if len(lacks) == 0 {
return
@ -218,10 +223,10 @@ func (c *importChecker) checkPreImportingJob(job ImportJob) {
requestSize, err := CheckDiskQuota(job, c.meta, c.imeta)
if err != nil {
logger.Warn("import failed, disk quota exceeded", zap.Error(err))
log.Warn("import failed, disk quota exceeded", zap.Error(err))
err = c.imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed), UpdateJobReason(err.Error()))
if err != nil {
logger.Warn("failed to update job state to Failed", zap.Error(err))
log.Warn("failed to update job state to Failed", zap.Error(err))
}
return
}
@ -230,7 +235,7 @@ func (c *importChecker) checkPreImportingJob(job ImportJob) {
groups := RegroupImportFiles(job, lacks, allDiskIndex)
newTasks, err := NewImportTasks(groups, job, c.alloc, c.meta)
if err != nil {
logger.Warn("new import tasks failed", zap.Error(err))
log.Warn("new import tasks failed", zap.Error(err))
return
}
for _, t := range newTasks {
@ -241,13 +246,19 @@ func (c *importChecker) checkPreImportingJob(job ImportJob) {
}
log.Info("add new import task", WrapTaskLog(t)...)
}
err = c.imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Importing), UpdateRequestedDiskSize(requestSize))
if err != nil {
logger.Warn("failed to update job state to Importing", zap.Error(err))
log.Warn("failed to update job state to Importing", zap.Error(err))
return
}
preImportDuration := job.GetTR().RecordSpan()
metrics.ImportJobLatency.WithLabelValues(metrics.ImportStagePreImport).Observe(float64(preImportDuration.Milliseconds()))
log.Info("import job preimport done", zap.Duration("jobTimeCost/preimport", preImportDuration))
}
func (c *importChecker) checkImportingJob(job ImportJob) {
log := log.With(zap.Int64("jobID", job.GetJobID()))
tasks := c.imeta.GetTaskBy(WithType(ImportTaskType), WithJob(job.GetJobID()))
for _, t := range tasks {
if t.GetState() != datapb.ImportTaskStateV2_Completed {
@ -259,18 +270,22 @@ func (c *importChecker) checkImportingJob(job ImportJob) {
log.Warn("failed to update job state to Stats", zap.Error(err))
return
}
log.Info("update import job state to Stats", zap.Int64("jobID", job.GetJobID()))
importDuration := job.GetTR().RecordSpan()
metrics.ImportJobLatency.WithLabelValues(metrics.ImportStageImport).Observe(float64(importDuration.Milliseconds()))
log.Info("import job import done", zap.Duration("jobTimeCost/import", importDuration))
}
func (c *importChecker) checkStatsJob(job ImportJob) {
logger := log.With(zap.Int64("jobID", job.GetJobID()))
log := log.With(zap.Int64("jobID", job.GetJobID()))
updateJobState := func(state internalpb.ImportJobState) {
err := c.imeta.UpdateJob(job.GetJobID(), UpdateJobState(state))
if err != nil {
logger.Warn("failed to update job state", zap.Error(err))
log.Warn("failed to update job state", zap.Error(err))
return
}
logger.Info("update import job state", zap.String("state", state.String()))
statsDuration := job.GetTR().RecordSpan()
metrics.ImportJobLatency.WithLabelValues(metrics.ImportStageStats).Observe(float64(statsDuration.Milliseconds()))
log.Info("import job stats done", zap.Duration("jobTimeCost/stats", statsDuration))
}
// Skip stats stage if not enable stats or is l0 import.
@ -290,18 +305,20 @@ func (c *importChecker) checkStatsJob(job ImportJob) {
statsSegmentIDs := task.(*importTask).GetStatsSegmentIDs()
taskCnt += len(originSegmentIDs)
for i, originSegmentID := range originSegmentIDs {
taskLogFields := WrapTaskLog(task, zap.Int64("origin", originSegmentID), zap.Int64("stats", statsSegmentIDs[i]))
state := c.sjm.GetStatsTaskState(originSegmentID, indexpb.StatsSubJob_Sort)
switch state {
case indexpb.JobState_JobStateNone:
err := c.sjm.SubmitStatsTask(originSegmentID, statsSegmentIDs[i], indexpb.StatsSubJob_Sort, false)
if err != nil {
logger.Warn("submit stats task failed", zap.Error(err))
log.Warn("submit stats task failed", zap.Error(err))
continue
}
log.Info("submit stats task done", WrapTaskLog(task, zap.Int64("origin", originSegmentID), zap.Int64("stats", statsSegmentIDs[i]))...)
log.Info("submit stats task done", taskLogFields...)
case indexpb.JobState_JobStateInit, indexpb.JobState_JobStateRetry, indexpb.JobState_JobStateInProgress:
logger.Debug("waiting for stats task...", WrapTaskLog(task, zap.Int64("origin", originSegmentID), zap.Int64("stats", statsSegmentIDs[i]))...)
log.Debug("waiting for stats task...", taskLogFields...)
case indexpb.JobState_JobStateFailed:
log.Warn("import job stats failed", taskLogFields...)
updateJobState(internalpb.ImportJobState_Failed)
return
case indexpb.JobState_JobStateFinished:
@ -317,7 +334,7 @@ func (c *importChecker) checkStatsJob(job ImportJob) {
}
func (c *importChecker) checkIndexBuildingJob(job ImportJob) {
logger := log.With(zap.Int64("jobID", job.GetJobID()))
log := log.With(zap.Int64("jobID", job.GetJobID()))
tasks := c.imeta.GetTaskBy(WithType(ImportTaskType), WithJob(job.GetJobID()))
originSegmentIDs := lo.FlatMap(tasks, func(t ImportTask, _ int) []int64 {
return t.(*importTask).GetSegmentIDs()
@ -339,28 +356,32 @@ func (c *importChecker) checkIndexBuildingJob(job ImportJob) {
default:
}
}
logger.Debug("waiting for import segments building index...", zap.Int64s("unindexed", unindexed))
log.Debug("waiting for import segments building index...", zap.Int64s("unindexed", unindexed))
return
}
buildIndexDuration := job.GetTR().RecordSpan()
metrics.ImportJobLatency.WithLabelValues(metrics.ImportStageBuildIndex).Observe(float64(buildIndexDuration.Milliseconds()))
log.Info("import job build index done", zap.Duration("jobTimeCost/buildIndex", buildIndexDuration))
// Here, all segment indexes have been successfully built, try unset isImporting flag for all segments.
isImportingSegments := lo.Filter(append(originSegmentIDs, statsSegmentIDs...), func(segmentID int64, _ int) bool {
segment := c.meta.GetSegment(segmentID)
if segment == nil {
logger.Warn("cannot find segment", zap.Int64("segmentID", segmentID))
log.Warn("cannot find segment", zap.Int64("segmentID", segmentID))
return false
}
return segment.GetIsImporting()
})
channels, err := c.meta.GetSegmentsChannels(isImportingSegments)
if err != nil {
logger.Warn("get segments channels failed", zap.Error(err))
log.Warn("get segments channels failed", zap.Error(err))
return
}
for _, segmentID := range isImportingSegments {
channelCP := c.meta.GetChannelCheckpoint(channels[segmentID])
if channelCP == nil {
logger.Warn("nil channel checkpoint")
log.Warn("nil channel checkpoint")
return
}
op1 := UpdateStartPosition([]*datapb.SegmentStartPosition{{StartPosition: channelCP, SegmentID: segmentID}})
@ -368,7 +389,7 @@ func (c *importChecker) checkIndexBuildingJob(job ImportJob) {
op3 := UpdateIsImporting(segmentID, false)
err = c.meta.UpdateSegmentsInfo(op1, op2, op3)
if err != nil {
logger.Warn("update import segment failed", zap.Error(err))
log.Warn("update import segment failed", zap.Error(err))
return
}
}
@ -377,10 +398,12 @@ func (c *importChecker) checkIndexBuildingJob(job ImportJob) {
completeTime := time.Now().Format("2006-01-02T15:04:05Z07:00")
err = c.imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Completed), UpdateJobCompleteTime(completeTime))
if err != nil {
logger.Warn("failed to update job state to Completed", zap.Error(err))
log.Warn("failed to update job state to Completed", zap.Error(err))
return
}
logger.Info("import job completed")
totalDuration := job.GetTR().ElapseSpan()
metrics.ImportJobLatency.WithLabelValues(metrics.TotalLabel).Observe(float64(totalDuration.Milliseconds()))
log.Info("import job all completed", zap.Duration("jobTimeCost/total", totalDuration))
}
func (c *importChecker) checkFailedJob(job ImportJob) {
@ -462,9 +485,9 @@ func (c *importChecker) checkGC(job ImportJob) {
}
cleanupTime := tsoutil.PhysicalTime(job.GetCleanupTs())
if time.Now().After(cleanupTime) {
logger := log.With(zap.Int64("jobID", job.GetJobID()))
log := log.With(zap.Int64("jobID", job.GetJobID()))
GCRetention := Params.DataCoordCfg.ImportTaskRetention.GetAsDuration(time.Second)
logger.Info("job has reached the GC retention",
log.Info("job has reached the GC retention",
zap.Time("cleanupTime", cleanupTime), zap.Duration("GCRetention", GCRetention))
tasks := c.imeta.GetTaskBy(WithJob(job.GetJobID()))
shouldRemoveJob := true
@ -492,9 +515,9 @@ func (c *importChecker) checkGC(job ImportJob) {
}
err := c.imeta.RemoveJob(job.GetJobID())
if err != nil {
logger.Warn("remove import job failed", zap.Error(err))
log.Warn("remove import job failed", zap.Error(err))
return
}
logger.Info("import job removed")
log.Info("import job removed")
}
}

View File

@ -34,6 +34,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/pkg/util/timerecord"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
)
@ -101,6 +102,7 @@ func (s *ImportCheckerSuite) SetupTest() {
},
},
},
tr: timerecord.NewTimeRecorder("import job"),
}
catalog.EXPECT().SaveImportJob(mock.Anything).Return(nil)
@ -120,6 +122,7 @@ func (s *ImportCheckerSuite) TestLogStats() {
TaskID: 1,
State: datapb.ImportTaskStateV2_Failed,
},
tr: timerecord.NewTimeRecorder("preimport task"),
}
err := s.imeta.AddTask(pit1)
s.NoError(err)
@ -131,6 +134,7 @@ func (s *ImportCheckerSuite) TestLogStats() {
SegmentIDs: []int64{10, 11, 12},
State: datapb.ImportTaskStateV2_Pending,
},
tr: timerecord.NewTimeRecorder("import task"),
}
err = s.imeta.AddTask(it1)
s.NoError(err)
@ -310,6 +314,7 @@ func (s *ImportCheckerSuite) TestCheckTimeout() {
TaskID: 1,
State: datapb.ImportTaskStateV2_InProgress,
},
tr: timerecord.NewTimeRecorder("preimport task"),
}
err := s.imeta.AddTask(task)
s.NoError(err)
@ -332,6 +337,7 @@ func (s *ImportCheckerSuite) TestCheckFailure() {
SegmentIDs: []int64{2},
StatsSegmentIDs: []int64{3},
},
tr: timerecord.NewTimeRecorder("import task"),
}
err := s.imeta.AddTask(it)
s.NoError(err)
@ -371,6 +377,7 @@ func (s *ImportCheckerSuite) TestCheckGC() {
SegmentIDs: []int64{2},
StatsSegmentIDs: []int64{3},
},
tr: timerecord.NewTimeRecorder("import task"),
}
err := s.imeta.AddTask(task)
s.NoError(err)
@ -447,6 +454,7 @@ func (s *ImportCheckerSuite) TestCheckCollection() {
TaskID: 1,
State: datapb.ImportTaskStateV2_Pending,
},
tr: timerecord.NewTimeRecorder("preimport task"),
}
err := s.imeta.AddTask(task)
s.NoError(err)

View File

@ -27,6 +27,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/timerecord"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
)
@ -91,15 +92,23 @@ type ImportJob interface {
GetCompleteTime() string
GetFiles() []*internalpb.ImportFile
GetOptions() []*commonpb.KeyValuePair
GetTR() *timerecord.TimeRecorder
Clone() ImportJob
}
type importJob struct {
*datapb.ImportJob
tr *timerecord.TimeRecorder
}
func (j *importJob) GetTR() *timerecord.TimeRecorder {
return j.tr
}
func (j *importJob) Clone() ImportJob {
return &importJob{
ImportJob: proto.Clone(j.ImportJob).(*datapb.ImportJob),
tr: j.tr,
}
}

View File

@ -19,6 +19,7 @@ package datacoord
import (
"github.com/milvus-io/milvus/internal/metastore"
"github.com/milvus-io/milvus/pkg/util/lock"
"github.com/milvus-io/milvus/pkg/util/timerecord"
)
type ImportMeta interface {
@ -61,11 +62,13 @@ func NewImportMeta(catalog metastore.DataCoordCatalog) (ImportMeta, error) {
for _, task := range restoredPreImportTasks {
tasks[task.GetTaskID()] = &preImportTask{
PreImportTask: task,
tr: timerecord.NewTimeRecorder("preimport task"),
}
}
for _, task := range restoredImportTasks {
tasks[task.GetTaskID()] = &importTask{
ImportTaskV2: task,
tr: timerecord.NewTimeRecorder("import task"),
}
}
@ -73,6 +76,7 @@ func NewImportMeta(catalog metastore.DataCoordCatalog) (ImportMeta, error) {
for _, job := range restoredJobs {
jobs[job.GetJobID()] = &importJob{
ImportJob: job,
tr: timerecord.NewTimeRecorder("import job"),
}
}

View File

@ -91,23 +91,6 @@ func (s *importScheduler) Close() {
}
func (s *importScheduler) process() {
getNodeID := func(nodeSlots map[int64]int64) int64 {
var (
nodeID int64 = NullNodeID
maxSlots int64 = -1
)
for id, slots := range nodeSlots {
if slots > 0 && slots > maxSlots {
nodeID = id
maxSlots = slots
}
}
if nodeID != NullNodeID {
nodeSlots[nodeID]--
}
return nodeID
}
jobs := s.imeta.GetJobBy()
sort.Slice(jobs, func(i, j int) bool {
return jobs[i].GetJobID() < jobs[j].GetJobID()
@ -118,7 +101,7 @@ func (s *importScheduler) process() {
for _, task := range tasks {
switch task.GetState() {
case datapb.ImportTaskStateV2_Pending:
nodeID := getNodeID(nodeSlots)
nodeID := s.getNodeID(task, nodeSlots)
switch task.GetType() {
case PreImportTaskType:
s.processPendingPreImport(task, nodeID)
@ -167,6 +150,25 @@ func (s *importScheduler) peekSlots() map[int64]int64 {
return nodeSlots
}
func (s *importScheduler) getNodeID(task ImportTask, nodeSlots map[int64]int64) int64 {
var (
nodeID int64 = NullNodeID
maxSlots int64 = -1
)
require := task.GetSlots()
for id, slots := range nodeSlots {
// find the most idle datanode
if slots > 0 && slots >= require && slots > maxSlots {
nodeID = id
maxSlots = slots
}
}
if nodeID != NullNodeID {
nodeSlots[nodeID] -= require
}
return nodeID
}
func (s *importScheduler) processPendingPreImport(task ImportTask, nodeID int64) {
if nodeID == NullNodeID {
return
@ -186,7 +188,9 @@ func (s *importScheduler) processPendingPreImport(task ImportTask, nodeID int64)
log.Warn("update import task failed", WrapTaskLog(task, zap.Error(err))...)
return
}
log.Info("process pending preimport task done", WrapTaskLog(task)...)
pendingDuration := task.GetTR().RecordSpan()
metrics.ImportTaskLatency.WithLabelValues(metrics.ImportStagePending).Observe(float64(pendingDuration.Milliseconds()))
log.Info("preimport task start to execute", WrapTaskLog(task, zap.Int64("scheduledNodeID", nodeID), zap.Duration("taskTimeCost/pending", pendingDuration))...)
}
func (s *importScheduler) processPendingImport(task ImportTask, nodeID int64) {
@ -212,7 +216,9 @@ func (s *importScheduler) processPendingImport(task ImportTask, nodeID int64) {
log.Warn("update import task failed", WrapTaskLog(task, zap.Error(err))...)
return
}
log.Info("processing pending import task done", WrapTaskLog(task)...)
pendingDuration := task.GetTR().RecordSpan()
metrics.ImportTaskLatency.WithLabelValues(metrics.ImportStagePending).Observe(float64(pendingDuration.Milliseconds()))
log.Info("import task start to execute", WrapTaskLog(task, zap.Int64("scheduledNodeID", nodeID), zap.Duration("taskTimeCost/pending", pendingDuration))...)
}
func (s *importScheduler) processInProgressPreImport(task ImportTask) {
@ -249,6 +255,11 @@ func (s *importScheduler) processInProgressPreImport(task ImportTask) {
}
log.Info("query preimport", WrapTaskLog(task, zap.String("state", resp.GetState().String()),
zap.Any("fileStats", resp.GetFileStats()))...)
if resp.GetState() == datapb.ImportTaskStateV2_Completed {
preimportDuration := task.GetTR().RecordSpan()
metrics.ImportTaskLatency.WithLabelValues(metrics.ImportStagePreImport).Observe(float64(preimportDuration.Milliseconds()))
log.Info("preimport done", WrapTaskLog(task, zap.Duration("taskTimeCost/preimport", preimportDuration))...)
}
}
func (s *importScheduler) processInProgressImport(task ImportTask) {
@ -322,6 +333,9 @@ func (s *importScheduler) processInProgressImport(task ImportTask) {
log.Warn("update import task failed", WrapTaskLog(task, zap.Error(err))...)
return
}
importDuration := task.GetTR().RecordSpan()
metrics.ImportTaskLatency.WithLabelValues(metrics.ImportStageImport).Observe(float64(importDuration.Milliseconds()))
log.Info("import done", WrapTaskLog(task, zap.Duration("taskTimeCost/import", importDuration))...)
}
log.Info("query import", WrapTaskLog(task, zap.String("state", resp.GetState().String()),
zap.String("reason", resp.GetReason()))...)

View File

@ -31,6 +31,7 @@ import (
"github.com/milvus-io/milvus/internal/datacoord/session"
"github.com/milvus-io/milvus/internal/metastore/mocks"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/util/timerecord"
)
type ImportSchedulerSuite struct {
@ -87,6 +88,7 @@ func (s *ImportSchedulerSuite) TestProcessPreImport() {
CollectionID: s.collectionID,
State: datapb.ImportTaskStateV2_Pending,
},
tr: timerecord.NewTimeRecorder("preimport task"),
}
err := s.imeta.AddTask(task)
s.NoError(err)
@ -97,6 +99,7 @@ func (s *ImportSchedulerSuite) TestProcessPreImport() {
TimeoutTs: math.MaxUint64,
Schema: &schemapb.CollectionSchema{},
},
tr: timerecord.NewTimeRecorder("import job"),
}
err = s.imeta.AddJob(job)
s.NoError(err)
@ -157,6 +160,7 @@ func (s *ImportSchedulerSuite) TestProcessImport() {
},
},
},
tr: timerecord.NewTimeRecorder("import task"),
}
err := s.imeta.AddTask(task)
s.NoError(err)
@ -169,6 +173,7 @@ func (s *ImportSchedulerSuite) TestProcessImport() {
Schema: &schemapb.CollectionSchema{},
TimeoutTs: math.MaxUint64,
},
tr: timerecord.NewTimeRecorder("import job"),
}
err = s.imeta.AddJob(job)
s.NoError(err)
@ -223,6 +228,7 @@ func (s *ImportSchedulerSuite) TestProcessFailed() {
StatsSegmentIDs: []int64{4, 5},
State: datapb.ImportTaskStateV2_Failed,
},
tr: timerecord.NewTimeRecorder("import task"),
}
err := s.imeta.AddTask(task)
s.NoError(err)
@ -235,6 +241,7 @@ func (s *ImportSchedulerSuite) TestProcessFailed() {
Schema: &schemapb.CollectionSchema{},
TimeoutTs: math.MaxUint64,
},
tr: timerecord.NewTimeRecorder("import job"),
}
err = s.imeta.AddJob(job)
s.NoError(err)

View File

@ -17,9 +17,12 @@
package datacoord
import (
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"google.golang.org/protobuf/proto"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/util/timerecord"
)
type TaskType int
@ -139,33 +142,62 @@ type ImportTask interface {
GetState() datapb.ImportTaskStateV2
GetReason() string
GetFileStats() []*datapb.ImportFileStats
GetTR() *timerecord.TimeRecorder
GetSlots() int64
Clone() ImportTask
}
type preImportTask struct {
*datapb.PreImportTask
tr *timerecord.TimeRecorder
}
func (p *preImportTask) GetType() TaskType {
return PreImportTaskType
}
func (p *preImportTask) GetTR() *timerecord.TimeRecorder {
return p.tr
}
func (p *preImportTask) GetSlots() int64 {
return int64(funcutil.Min(len(p.GetFileStats()), paramtable.Get().DataNodeCfg.MaxTaskSlotNum.GetAsInt()))
}
func (p *preImportTask) Clone() ImportTask {
return &preImportTask{
PreImportTask: proto.Clone(p.PreImportTask).(*datapb.PreImportTask),
tr: p.tr,
}
}
type importTask struct {
*datapb.ImportTaskV2
tr *timerecord.TimeRecorder
}
func (t *importTask) GetType() TaskType {
return ImportTaskType
}
func (t *importTask) GetTR() *timerecord.TimeRecorder {
return t.tr
}
func (t *importTask) GetSlots() int64 {
// Consider the following two scenarios:
// 1. Importing a large number of small files results in
// a small total data size, making file count unsuitable as a slot number.
// 2. Importing a file with many shards number results in many segments and a small total data size,
// making segment count unsuitable as a slot number.
// Taking these factors into account, we've decided to use the
// minimum value between segment count and file count as the slot number.
return int64(funcutil.Min(len(t.GetFileStats()), len(t.GetSegmentIDs()), paramtable.Get().DataNodeCfg.MaxTaskSlotNum.GetAsInt()))
}
func (t *importTask) Clone() ImportTask {
return &importTask{
ImportTaskV2: proto.Clone(t.ImportTaskV2).(*datapb.ImportTaskV2),
tr: t.tr,
}
}

View File

@ -39,6 +39,7 @@ import (
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/timerecord"
)
func WrapTaskLog(task ImportTask, fields ...zap.Field) []zap.Field {
@ -47,6 +48,7 @@ func WrapTaskLog(task ImportTask, fields ...zap.Field) []zap.Field {
zap.Int64("jobID", task.GetJobID()),
zap.Int64("collectionID", task.GetCollectionID()),
zap.String("type", task.GetType().String()),
zap.Int64("nodeID", task.GetNodeID()),
}
res = append(res, fields...)
return res
@ -75,6 +77,7 @@ func NewPreImportTasks(fileGroups [][]*internalpb.ImportFile,
State: datapb.ImportTaskStateV2_Pending,
FileStats: fileStats,
},
tr: timerecord.NewTimeRecorder("preimport task"),
}
tasks = append(tasks, task)
}
@ -99,6 +102,7 @@ func NewImportTasks(fileGroups [][]*datapb.ImportFileStats,
State: datapb.ImportTaskStateV2_Pending,
FileStats: group,
},
tr: timerecord.NewTimeRecorder("import task"),
}
segments, err := AssignSegments(job, task, alloc, meta)
if err != nil {

View File

@ -47,6 +47,7 @@ import (
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/retry"
"github.com/milvus-io/milvus/pkg/util/timerecord"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -1723,6 +1724,7 @@ func (s *Server) ImportV2(ctx context.Context, in *internalpb.ImportRequestInter
return importFile
})
startTime := time.Now()
job := &importJob{
ImportJob: &datapb.ImportJob{
JobID: idStart,
@ -1736,8 +1738,9 @@ func (s *Server) ImportV2(ctx context.Context, in *internalpb.ImportRequestInter
State: internalpb.ImportJobState_Pending,
Files: files,
Options: in.GetOptions(),
StartTime: time.Now().Format("2006-01-02T15:04:05Z07:00"),
StartTime: startTime.Format("2006-01-02T15:04:05Z07:00"),
},
tr: timerecord.NewTimeRecorder("import job"),
}
err = s.importMeta.AddJob(job)
if err != nil {

View File

@ -17,6 +17,7 @@
package importv2
import (
"sort"
"sync"
"time"
@ -64,6 +65,9 @@ func (s *scheduler) Start() {
return
case <-exeTicker.C:
tasks := s.manager.GetBy(WithStates(datapb.ImportTaskStateV2_Pending))
sort.Slice(tasks, func(i, j int) bool {
return tasks[i].GetTaskID() < tasks[j].GetTaskID()
})
futures := make(map[int64][]*conc.Future[any])
for _, task := range tasks {
fs := task.Execute()
@ -86,7 +90,15 @@ func (s *scheduler) Start() {
func (s *scheduler) Slots() int64 {
tasks := s.manager.GetBy(WithStates(datapb.ImportTaskStateV2_Pending, datapb.ImportTaskStateV2_InProgress))
return paramtable.Get().DataNodeCfg.MaxConcurrentImportTaskNum.GetAsInt64() - int64(len(tasks))
used := lo.SumBy(tasks, func(t Task) int64 {
return t.GetSlots()
})
total := paramtable.Get().DataNodeCfg.MaxConcurrentImportTaskNum.GetAsInt64()
free := total - used
if free >= 0 {
return free
}
return 0
}
func (s *scheduler) Close() {

View File

@ -161,6 +161,7 @@ type Task interface {
GetState() datapb.ImportTaskStateV2
GetReason() string
GetSchema() *schemapb.CollectionSchema
GetSlots() int64
Cancel()
Clone() Task
}

View File

@ -36,6 +36,7 @@ import (
"github.com/milvus-io/milvus/internal/util/importutilv2"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -103,6 +104,17 @@ func (t *ImportTask) GetSchema() *schemapb.CollectionSchema {
return t.req.GetSchema()
}
func (t *ImportTask) GetSlots() int64 {
// Consider the following two scenarios:
// 1. Importing a large number of small files results in
// a small total data size, making file count unsuitable as a slot number.
// 2. Importing a file with many shards number results in many segments and a small total data size,
// making segment count unsuitable as a slot number.
// Taking these factors into account, we've decided to use the
// minimum value between segment count and file count as the slot number.
return int64(funcutil.Min(len(t.GetFileStats()), len(t.GetSegmentIDs()), paramtable.Get().DataNodeCfg.MaxTaskSlotNum.GetAsInt()))
}
func (t *ImportTask) Cancel() {
t.cancel()
}

View File

@ -99,6 +99,10 @@ func (t *L0ImportTask) GetSchema() *schemapb.CollectionSchema {
return t.req.GetSchema()
}
func (t *L0ImportTask) GetSlots() int64 {
return 1
}
func (t *L0ImportTask) Cancel() {
t.cancel()
}

View File

@ -94,6 +94,10 @@ func (t *L0PreImportTask) GetSchema() *schemapb.CollectionSchema {
return t.schema
}
func (t *L0PreImportTask) GetSlots() int64 {
return 1
}
func (t *L0PreImportTask) Cancel() {
t.cancel()
}

View File

@ -34,6 +34,7 @@ import (
"github.com/milvus-io/milvus/internal/util/importutilv2"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -101,6 +102,10 @@ func (t *PreImportTask) GetSchema() *schemapb.CollectionSchema {
return t.schema
}
func (t *PreImportTask) GetSlots() int64 {
return int64(funcutil.Min(len(t.GetFileStats()), paramtable.Get().DataNodeCfg.MaxTaskSlotNum.GetAsInt()))
}
func (t *PreImportTask) Cancel() {
t.cancel()
}

View File

@ -213,6 +213,28 @@ var (
stageLabelName,
})
ImportJobLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.DataCoordRole,
Name: "import_job_latency",
Help: "latency of import job",
Buckets: longTaskBuckets,
}, []string{
importStageLabelName,
})
ImportTaskLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.DataCoordRole,
Name: "import_task_latency",
Help: "latency of import task",
Buckets: longTaskBuckets,
}, []string{
importStageLabelName,
})
FlushedSegmentFileNum = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: milvusNamespace,
@ -352,6 +374,8 @@ func RegisterDataCoord(registry *prometheus.Registry) {
registry.MustRegister(DataCoordCompactedSegmentSize)
registry.MustRegister(DataCoordCompactionTaskNum)
registry.MustRegister(DataCoordCompactionLatency)
registry.MustRegister(ImportJobLatency)
registry.MustRegister(ImportTaskLatency)
registry.MustRegister(DataCoordSizeStoredL0Segment)
registry.MustRegister(DataCoordL0DeleteEntriesNum)
registry.MustRegister(FlushedSegmentFileNum)

View File

@ -77,6 +77,12 @@ const (
Executing = "executing"
Done = "done"
ImportStagePending = "pending"
ImportStagePreImport = "preimport"
ImportStageImport = "import"
ImportStageStats = "stats"
ImportStageBuildIndex = "build_index"
compactionTypeLabelName = "compaction_type"
isVectorFieldLabelName = "is_vector_field"
segmentPruneLabelName = "segment_prune_label"
@ -105,6 +111,7 @@ const (
cacheStateLabelName = "cache_state"
indexCountLabelName = "indexed_field_count"
dataSourceLabelName = "data_source"
importStageLabelName = "import_stage"
requestScope = "scope"
fullMethodLabelName = "full_method"
reduceLevelName = "reduce_level"

View File

@ -4172,6 +4172,7 @@ type dataNodeConfig struct {
MaxConcurrentImportTaskNum ParamItem `refreshable:"true"`
MaxImportFileSizeInGB ParamItem `refreshable:"true"`
ReadBufferSizeInMB ParamItem `refreshable:"true"`
MaxTaskSlotNum ParamItem `refreshable:"true"`
// Compaction
L0BatchMemoryRatio ParamItem `refreshable:"true"`
@ -4479,6 +4480,16 @@ if this parameter <= 0, will set it as 10`,
}
p.ReadBufferSizeInMB.Init(base.mgr)
p.MaxTaskSlotNum = ParamItem{
Key: "dataNode.import.maxTaskSlotNum",
Version: "2.4.13",
Doc: "The maximum number of slots occupied by each import/pre-import task.",
DefaultValue: "16",
PanicIfEmpty: false,
Export: true,
}
p.MaxTaskSlotNum.Init(base.mgr)
p.L0BatchMemoryRatio = ParamItem{
Key: "dataNode.compaction.levelZeroBatchMemoryRatio",
Version: "2.4.0",

View File

@ -561,6 +561,7 @@ func TestComponentParam(t *testing.T) {
assert.Equal(t, 16, maxConcurrentImportTaskNum)
assert.Equal(t, int64(16), Params.MaxImportFileSizeInGB.GetAsInt64())
assert.Equal(t, 16, Params.ReadBufferSizeInMB.GetAsInt())
assert.Equal(t, 16, Params.MaxTaskSlotNum.GetAsInt())
params.Save("datanode.gracefulStopTimeout", "100")
assert.Equal(t, 100*time.Second, Params.GracefulStopTimeout.GetAsDuration(time.Second))
assert.Equal(t, 16, Params.SlotCap.GetAsInt())