// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package datacoord

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/samber/lo"
	"go.uber.org/zap"

	"github.com/milvus-io/milvus/internal/datacoord/broker"
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
	"github.com/milvus-io/milvus/pkg/log"
	"github.com/milvus-io/milvus/pkg/metrics"
	"github.com/milvus-io/milvus/pkg/util/tsoutil"
)

// ImportChecker drives import jobs through their lifecycle in the background.
type ImportChecker interface {
	Start()
	Close()
}

type importChecker struct {
	meta         *meta
	broker       broker.Broker
	cluster      Cluster
	alloc        allocator
	sm           Manager
	imeta        ImportMeta
	buildIndexCh chan UniqueID

	closeOnce sync.Once
	closeChan chan struct{}
}

func NewImportChecker(meta *meta,
	broker broker.Broker,
	cluster Cluster,
	alloc allocator,
	sm Manager,
	imeta ImportMeta,
	buildIndexCh chan UniqueID,
) ImportChecker {
	return &importChecker{
		meta:         meta,
		broker:       broker,
		cluster:      cluster,
		alloc:        alloc,
		sm:           sm,
		imeta:        imeta,
		buildIndexCh: buildIndexCh,
		closeChan:    make(chan struct{}),
	}
}

// Start runs the check loops until Close is called: the high-frequency ticker
// advances job states, while the low-frequency ticker handles job timeouts,
// GC, dropped collections, and stats reporting.
func (c *importChecker) Start() {
	log.Info("start import checker")
	var (
		ticker1 = time.NewTicker(Params.DataCoordCfg.ImportCheckIntervalHigh.GetAsDuration(time.Second)) // 2s
		ticker2 = time.NewTicker(Params.DataCoordCfg.ImportCheckIntervalLow.GetAsDuration(time.Second))  // 2min
	)
	defer ticker1.Stop()
	defer ticker2.Stop()
	for {
		select {
		case <-c.closeChan:
			log.Info("import checker exited")
			return
		case <-ticker1.C:
			jobs := c.imeta.GetJobBy()
			for _, job := range jobs {
				switch job.GetState() {
				case internalpb.ImportJobState_Pending:
					c.checkPendingJob(job)
				case internalpb.ImportJobState_PreImporting:
					c.checkPreImportingJob(job)
				case internalpb.ImportJobState_Importing:
					c.checkImportingJob(job)
				case internalpb.ImportJobState_Failed:
					c.tryFailingTasks(job)
				}
			}
		case <-ticker2.C:
			jobs := c.imeta.GetJobBy()
			for _, job := range jobs {
				c.tryTimeoutJob(job)
				c.checkGC(job)
			}
			jobsByColl := lo.GroupBy(jobs, func(job ImportJob) int64 {
				return job.GetCollectionID()
			})
			for collID, collJobs := range jobsByColl {
				c.checkCollection(collID, collJobs)
			}
			c.LogStats()
		}
	}
}

// Close signals the checker to stop; it is safe to call multiple times.
func (c *importChecker) Close() {
	c.closeOnce.Do(func() {
		close(c.closeChan)
	})
}

// LogStats logs task counts grouped by state and reports them as metrics,
// for both preimport and import tasks.
func (c *importChecker) LogStats() {
	logFunc := func(tasks []ImportTask, taskType TaskType) {
		byState := lo.GroupBy(tasks, func(t ImportTask) datapb.ImportTaskStateV2 {
			return t.GetState()
		})
		pending := len(byState[datapb.ImportTaskStateV2_Pending])
		inProgress := len(byState[datapb.ImportTaskStateV2_InProgress])
		completed := len(byState[datapb.ImportTaskStateV2_Completed])
		failed := len(byState[datapb.ImportTaskStateV2_Failed])
		log.Info("import task stats", zap.String("type", taskType.String()),
			zap.Int("pending", pending), zap.Int("inProgress", inProgress),
			zap.Int("completed", completed), zap.Int("failed", failed))
		metrics.ImportTasks.WithLabelValues(taskType.String(), datapb.ImportTaskStateV2_Pending.String()).Set(float64(pending))
		metrics.ImportTasks.WithLabelValues(taskType.String(), datapb.ImportTaskStateV2_InProgress.String()).Set(float64(inProgress))
		metrics.ImportTasks.WithLabelValues(taskType.String(), datapb.ImportTaskStateV2_Completed.String()).Set(float64(completed))
		metrics.ImportTasks.WithLabelValues(taskType.String(), datapb.ImportTaskStateV2_Failed.String()).Set(float64(failed))
	}
	tasks := c.imeta.GetTaskBy(WithType(PreImportTaskType))
	logFunc(tasks, PreImportTaskType)
	tasks = c.imeta.GetTaskBy(WithType(ImportTaskType))
	logFunc(tasks, ImportTaskType)
}

// getLackFilesForPreImports returns the job files not yet covered by any preimport task.
func (c *importChecker) getLackFilesForPreImports(job ImportJob) []*internalpb.ImportFile {
	lacks := lo.KeyBy(job.GetFiles(), func(file *internalpb.ImportFile) int64 {
		return file.GetId()
	})
	exists := c.imeta.GetTaskBy(WithType(PreImportTaskType), WithJob(job.GetJobID()))
	for _, task := range exists {
		for _, file := range task.GetFileStats() {
			delete(lacks, file.GetImportFile().GetId())
		}
	}
	return lo.Values(lacks)
}

// getLackFilesForImports returns the file stats produced by the preimport tasks
// that are not yet covered by any import task. It returns nil if any preimport
// task is still incomplete.
func (c *importChecker) getLackFilesForImports(job ImportJob) []*datapb.ImportFileStats {
	preimports := c.imeta.GetTaskBy(WithType(PreImportTaskType), WithJob(job.GetJobID()))
	lacks := make(map[int64]*datapb.ImportFileStats, 0)
	for _, t := range preimports {
		if t.GetState() != datapb.ImportTaskStateV2_Completed {
			// Some preimport tasks have not completed yet, so import tasks must not be generated.
			return nil
		}
		for _, stat := range t.GetFileStats() {
			lacks[stat.GetImportFile().GetId()] = stat
		}
	}
	exists := c.imeta.GetTaskBy(WithType(ImportTaskType), WithJob(job.GetJobID()))
	for _, task := range exists {
		for _, file := range task.GetFileStats() {
			delete(lacks, file.GetImportFile().GetId())
		}
	}
	return lo.Values(lacks)
}

// checkPendingJob creates preimport tasks for uncovered files and advances
// the job to PreImporting.
func (c *importChecker) checkPendingJob(job ImportJob) {
	lacks := c.getLackFilesForPreImports(job)
	if len(lacks) == 0 {
		return
	}
	fileGroups := lo.Chunk(lacks, Params.DataCoordCfg.FilesPerPreImportTask.GetAsInt())
	newTasks, err := NewPreImportTasks(fileGroups, job, c.alloc)
	if err != nil {
		log.Warn("new preimport tasks failed", zap.Error(err))
		return
	}
	for _, t := range newTasks {
		err = c.imeta.AddTask(t)
		if err != nil {
			log.Warn("add preimport task failed", WrapTaskLog(t, zap.Error(err))...)
			return
		}
		log.Info("add new preimport task", WrapTaskLog(t)...)
	}
	err = c.imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_PreImporting))
	if err != nil {
		log.Warn("failed to update job state to PreImporting", zap.Int64("jobID", job.GetJobID()), zap.Error(err))
	}
}

// checkPreImportingJob creates import tasks once all preimport tasks have
// completed and advances the job to Importing.
func (c *importChecker) checkPreImportingJob(job ImportJob) {
	lacks := c.getLackFilesForImports(job)
	if len(lacks) == 0 {
		return
	}
	groups := RegroupImportFiles(job, lacks)
	newTasks, err := NewImportTasks(groups, job, c.sm, c.alloc)
	if err != nil {
		log.Warn("new import tasks failed", zap.Error(err))
		return
	}
	for _, t := range newTasks {
		err = c.imeta.AddTask(t)
		if err != nil {
			log.Warn("add new import task failed", WrapTaskLog(t, zap.Error(err))...)
			return
		}
		log.Info("add new import task", WrapTaskLog(t)...)
	}
	err = c.imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Importing))
	if err != nil {
		log.Warn("failed to update job state to Importing", zap.Int64("jobID", job.GetJobID()), zap.Error(err))
	}
}

// checkImportingJob finalizes a job whose import tasks have all completed:
// it flushes the imported segments, registers them with the cluster, updates
// their positions, and marks the job Completed.
func (c *importChecker) checkImportingJob(job ImportJob) {
	tasks := c.imeta.GetTaskBy(WithType(ImportTaskType), WithJob(job.GetJobID()))
	for _, t := range tasks {
		if t.GetState() != datapb.ImportTaskStateV2_Completed {
			return
		}
	}

	unfinished := make([]int64, 0)
	for _, task := range tasks {
		segmentIDs := task.(*importTask).GetSegmentIDs()
		for _, segmentID := range segmentIDs {
			segment := c.meta.GetSegment(segmentID)
			if segment == nil {
				log.Warn("cannot find segment, may be compacted", WrapTaskLog(task, zap.Int64("segmentID", segmentID))...)
				continue
			}
			if segment.GetIsImporting() {
				unfinished = append(unfinished, segmentID)
			}
		}
	}

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	err := c.sm.FlushImportSegments(ctx, job.GetCollectionID(), unfinished)
	if err != nil {
		log.Warn("flush imported segments failed", zap.Int64("jobID", job.GetJobID()),
			zap.Int64("collectionID", job.GetCollectionID()), zap.Int64s("segments", unfinished), zap.Error(err))
		return
	}

	channels, err := c.meta.GetSegmentsChannels(unfinished)
	if err != nil {
		log.Warn("get segments channels failed", zap.Int64("jobID", job.GetJobID()), zap.Error(err))
		return
	}
	for _, segmentID := range unfinished {
		err = AddImportSegment(c.cluster, c.meta, segmentID)
		if err != nil {
			log.Warn("add import segment failed", zap.Int64("jobID", job.GetJobID()),
				zap.Int64("collectionID", job.GetCollectionID()), zap.Error(err))
			return
		}
		c.buildIndexCh <- segmentID // accelerate index building
		channelCP := c.meta.GetChannelCheckpoint(channels[segmentID])
		if channelCP == nil {
			log.Warn("nil channel checkpoint", zap.Int64("jobID", job.GetJobID()))
			return
		}
		op1 := UpdateStartPosition([]*datapb.SegmentStartPosition{{StartPosition: channelCP, SegmentID: segmentID}})
		op2 := UpdateDmlPosition(segmentID, channelCP)
		op3 := UpdateIsImporting(segmentID, false)
		err = c.meta.UpdateSegmentsInfo(op1, op2, op3)
		if err != nil {
			log.Warn("update import segment failed", zap.Int64("jobID", job.GetJobID()), zap.Error(err))
			return
		}
	}

	err = c.imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Completed))
	if err != nil {
		log.Warn("failed to update job state to Completed", zap.Int64("jobID", job.GetJobID()), zap.Error(err))
	}
}

// tryFailingTasks marks all pending, in-progress, and completed tasks of a
// failed job as failed.
func (c *importChecker) tryFailingTasks(job ImportJob) {
	tasks := c.imeta.GetTaskBy(WithJob(job.GetJobID()), WithStates(datapb.ImportTaskStateV2_Pending,
		datapb.ImportTaskStateV2_InProgress, datapb.ImportTaskStateV2_Completed))
	if len(tasks) == 0 {
		return
	}
	log.Warn("Import job has failed, all tasks with the same jobID"+
		" will be marked as failed", zap.Int64("jobID", job.GetJobID()))
	for _, task := range tasks {
		err := c.imeta.UpdateTask(task.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Failed),
			UpdateReason(job.GetReason()))
		if err != nil {
			log.Warn("failed to update import task state to failed", WrapTaskLog(task, zap.Error(err))...)
			continue
		}
	}
}

// tryTimeoutJob fails the job if it has outlived its timeout timestamp.
func (c *importChecker) tryTimeoutJob(job ImportJob) {
	timeoutTime := tsoutil.PhysicalTime(job.GetTimeoutTs())
	if time.Now().After(timeoutTime) {
		log.Warn("import job timed out, exceeding the specified time limit",
			zap.Int64("jobID", job.GetJobID()), zap.Time("timeoutTime", timeoutTime))
		err := c.imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed),
			UpdateJobReason("import timeout"))
		if err != nil {
			log.Warn("failed to update job state to Failed", zap.Int64("jobID", job.GetJobID()), zap.Error(err))
		}
	}
}

// checkCollection fails all import jobs whose target collection has been dropped.
func (c *importChecker) checkCollection(collectionID int64, jobs []ImportJob) {
	if len(jobs) == 0 {
		return
	}

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	has, err := c.broker.HasCollection(ctx, collectionID)
	if err != nil {
		log.Warn("verify existence of collection failed", zap.Int64("collection", collectionID), zap.Error(err))
		return
	}
	if !has {
		for _, job := range jobs {
			err = c.imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed),
				UpdateJobReason(fmt.Sprintf("collection %d dropped", collectionID)))
			if err != nil {
				log.Warn("failed to update job state to Failed", zap.Int64("jobID", job.GetJobID()), zap.Error(err))
			}
		}
	}
}

// checkGC removes the tasks, and finally the job itself, once a completed or
// failed job has exceeded the GC retention period.
func (c *importChecker) checkGC(job ImportJob) {
	if job.GetState() != internalpb.ImportJobState_Completed &&
		job.GetState() != internalpb.ImportJobState_Failed {
		return
	}
	GCRetention := Params.DataCoordCfg.ImportTaskRetention.GetAsDuration(time.Second)
	cleanupTime := tsoutil.PhysicalTime(job.GetCleanupTs())
	if time.Since(cleanupTime) >= GCRetention {
		log.Info("job has reached the GC retention",
			zap.Int64("jobID", job.GetJobID()), zap.Time("cleanupTime", cleanupTime),
			zap.Duration("GCRetention", GCRetention))
		tasks := c.imeta.GetTaskBy(WithJob(job.GetJobID()))
		shouldRemoveJob := true
		for _, task := range tasks {
			if job.GetState() == internalpb.ImportJobState_Failed && task.GetType() == ImportTaskType {
				if len(task.(*importTask).GetSegmentIDs()) != 0 {
					// The failed import task still references segments; keep it.
					shouldRemoveJob = false
					continue
				}
			}
			if task.GetNodeID() != NullNodeID {
				// The task is still assigned to a node; keep it for now.
				shouldRemoveJob = false
				continue
			}
			err := c.imeta.RemoveTask(task.GetTaskID())
			if err != nil {
				log.Warn("remove task failed during GC", WrapTaskLog(task, zap.Error(err))...)
				shouldRemoveJob = false
				continue
			}
			log.Info("reached GC retention, task removed", WrapTaskLog(task)...)
		}
		if !shouldRemoveJob {
			return
		}
		err := c.imeta.RemoveJob(job.GetJobID())
		if err != nil {
			log.Warn("remove import job failed", zap.Int64("jobID", job.GetJobID()), zap.Error(err))
		}
	}
}