mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-11-30 10:59:32 +08:00
336 lines
10 KiB
Go
336 lines
10 KiB
Go
|
// Licensed to the LF AI & Data foundation under one
|
||
|
// or more contributor license agreements. See the NOTICE file
|
||
|
// distributed with this work for additional information
|
||
|
// regarding copyright ownership. The ASF licenses this file
|
||
|
// to you under the Apache License, Version 2.0 (the
|
||
|
// "License"); you may not use this file except in compliance
|
||
|
// with the License. You may obtain a copy of the License at
|
||
|
//
|
||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||
|
//
|
||
|
// Unless required by applicable law or agreed to in writing, software
|
||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||
|
// See the License for the specific language governing permissions and
|
||
|
// limitations under the License.
|
||
|
|
||
|
package datacoord
|
||
|
|
||
|
import (
|
||
|
"sort"
|
||
|
"sync"
|
||
|
"time"
|
||
|
|
||
|
"github.com/cockroachdb/errors"
|
||
|
"github.com/samber/lo"
|
||
|
"go.uber.org/zap"
|
||
|
|
||
|
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||
|
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
||
|
"github.com/milvus-io/milvus/pkg/log"
|
||
|
"github.com/milvus-io/milvus/pkg/util/merr"
|
||
|
)
|
||
|
|
||
|
// NullNodeID is the sentinel node ID used when no node can be assigned to a
// task. The pending-task handlers in this scheduler return without doing
// anything when they are given NullNodeID.
const (
	NullNodeID = -1
)
|
||
|
|
||
|
// ImportScheduler drives import tasks through their lifecycle.
// Start runs the scheduling loop; Close asks that loop to stop.
type ImportScheduler interface {
	Start()
	Close()
}
|
||
|
|
||
|
// importScheduler is the ImportScheduler implementation. On every tick it
// reads import jobs and tasks from imeta and advances each task according to
// its current state.
type importScheduler struct {
	meta    *meta      // segment metadata; imported rows and binlogs are written here
	cluster Cluster    // node RPCs: preimport, import, progress and slot queries
	alloc   allocator  // handed to AssembleImportRequest
	imeta   ImportMeta // import job and task metadata

	closeOnce sync.Once     // makes Close idempotent
	closeChan chan struct{} // closed by Close to end the Start loop
}
|
||
|
|
||
|
func NewImportScheduler(meta *meta,
|
||
|
cluster Cluster,
|
||
|
alloc allocator,
|
||
|
imeta ImportMeta,
|
||
|
) ImportScheduler {
|
||
|
return &importScheduler{
|
||
|
meta: meta,
|
||
|
cluster: cluster,
|
||
|
alloc: alloc,
|
||
|
imeta: imeta,
|
||
|
closeChan: make(chan struct{}),
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (s *importScheduler) Start() {
|
||
|
log.Info("start import scheduler")
|
||
|
ticker := time.NewTicker(Params.DataCoordCfg.ImportScheduleInterval.GetAsDuration(time.Second))
|
||
|
defer ticker.Stop()
|
||
|
for {
|
||
|
select {
|
||
|
case <-s.closeChan:
|
||
|
log.Info("import scheduler exited")
|
||
|
return
|
||
|
case <-ticker.C:
|
||
|
s.process()
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (s *importScheduler) Close() {
|
||
|
s.closeOnce.Do(func() {
|
||
|
close(s.closeChan)
|
||
|
})
|
||
|
}
|
||
|
|
||
|
func (s *importScheduler) process() {
|
||
|
getNodeID := func(nodeSlots map[int64]int64) int64 {
|
||
|
for nodeID, slots := range nodeSlots {
|
||
|
if slots > 0 {
|
||
|
nodeSlots[nodeID]--
|
||
|
return nodeID
|
||
|
}
|
||
|
}
|
||
|
return NullNodeID
|
||
|
}
|
||
|
|
||
|
jobs := s.imeta.GetJobBy()
|
||
|
sort.Slice(jobs, func(i, j int) bool {
|
||
|
return jobs[i].GetJobID() < jobs[j].GetJobID()
|
||
|
})
|
||
|
nodeSlots := s.peekSlots()
|
||
|
for _, job := range jobs {
|
||
|
tasks := s.imeta.GetTaskBy(WithJob(job.GetJobID()))
|
||
|
for _, task := range tasks {
|
||
|
switch task.GetState() {
|
||
|
case datapb.ImportTaskStateV2_Pending:
|
||
|
nodeID := getNodeID(nodeSlots)
|
||
|
switch task.GetType() {
|
||
|
case PreImportTaskType:
|
||
|
s.processPendingPreImport(task, nodeID)
|
||
|
case ImportTaskType:
|
||
|
s.processPendingImport(task, nodeID)
|
||
|
}
|
||
|
case datapb.ImportTaskStateV2_InProgress:
|
||
|
switch task.GetType() {
|
||
|
case PreImportTaskType:
|
||
|
s.processInProgressPreImport(task)
|
||
|
case ImportTaskType:
|
||
|
s.processInProgressImport(task)
|
||
|
}
|
||
|
case datapb.ImportTaskStateV2_Completed:
|
||
|
s.processCompleted(task)
|
||
|
case datapb.ImportTaskStateV2_Failed:
|
||
|
s.processFailed(task)
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (s *importScheduler) checkErr(task ImportTask, err error) {
|
||
|
if merr.IsRetryableErr(err) || merr.IsCanceledOrTimeout(err) || errors.Is(err, merr.ErrNodeNotFound) {
|
||
|
err = s.imeta.UpdateTask(task.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Pending))
|
||
|
if err != nil {
|
||
|
log.Warn("failed to update import task state to pending", WrapTaskLog(task, zap.Error(err))...)
|
||
|
return
|
||
|
}
|
||
|
log.Info("reset task state to pending due to error occurs", WrapTaskLog(task, zap.Error(err))...)
|
||
|
} else {
|
||
|
err = s.imeta.UpdateJob(task.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed), UpdateJobReason(err.Error()))
|
||
|
if err != nil {
|
||
|
log.Warn("failed to update job state to Failed", zap.Int64("jobID", task.GetJobID()), zap.Error(err))
|
||
|
return
|
||
|
}
|
||
|
log.Info("import task failed", WrapTaskLog(task, zap.Error(err))...)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (s *importScheduler) peekSlots() map[int64]int64 {
|
||
|
nodeIDs := lo.Map(s.cluster.GetSessions(), func(s *Session, _ int) int64 {
|
||
|
return s.info.NodeID
|
||
|
})
|
||
|
nodeSlots := make(map[int64]int64)
|
||
|
mu := &sync.Mutex{}
|
||
|
wg := &sync.WaitGroup{}
|
||
|
for _, nodeID := range nodeIDs {
|
||
|
wg.Add(1)
|
||
|
go func(nodeID int64) {
|
||
|
defer wg.Done()
|
||
|
resp, err := s.cluster.QueryImport(nodeID, &datapb.QueryImportRequest{QuerySlot: true})
|
||
|
if err != nil {
|
||
|
log.Warn("query import failed", zap.Error(err))
|
||
|
return
|
||
|
}
|
||
|
mu.Lock()
|
||
|
defer mu.Unlock()
|
||
|
nodeSlots[nodeID] = resp.GetSlots()
|
||
|
}(nodeID)
|
||
|
}
|
||
|
wg.Wait()
|
||
|
log.Info("peek slots done", zap.Any("nodeSlots", nodeSlots))
|
||
|
return nodeSlots
|
||
|
}
|
||
|
|
||
|
func (s *importScheduler) processPendingPreImport(task ImportTask, nodeID int64) {
|
||
|
if nodeID == NullNodeID {
|
||
|
return
|
||
|
}
|
||
|
log.Info("processing pending preimport task...", WrapTaskLog(task)...)
|
||
|
job := s.imeta.GetJob(task.GetJobID())
|
||
|
req := AssemblePreImportRequest(task, job)
|
||
|
err := s.cluster.PreImport(nodeID, req)
|
||
|
if err != nil {
|
||
|
log.Warn("preimport failed", WrapTaskLog(task, zap.Error(err))...)
|
||
|
return
|
||
|
}
|
||
|
err = s.imeta.UpdateTask(task.GetTaskID(),
|
||
|
UpdateState(datapb.ImportTaskStateV2_InProgress),
|
||
|
UpdateNodeID(nodeID))
|
||
|
if err != nil {
|
||
|
log.Warn("update import task failed", WrapTaskLog(task, zap.Error(err))...)
|
||
|
return
|
||
|
}
|
||
|
log.Info("process pending preimport task done", WrapTaskLog(task)...)
|
||
|
}
|
||
|
|
||
|
func (s *importScheduler) processPendingImport(task ImportTask, nodeID int64) {
|
||
|
if nodeID == NullNodeID {
|
||
|
return
|
||
|
}
|
||
|
log.Info("processing pending import task...", WrapTaskLog(task)...)
|
||
|
job := s.imeta.GetJob(task.GetJobID())
|
||
|
req, err := AssembleImportRequest(task, job, s.meta, s.alloc)
|
||
|
if err != nil {
|
||
|
log.Warn("assemble import request failed", WrapTaskLog(task, zap.Error(err))...)
|
||
|
return
|
||
|
}
|
||
|
err = s.cluster.ImportV2(nodeID, req)
|
||
|
if err != nil {
|
||
|
log.Warn("import failed", WrapTaskLog(task, zap.Error(err))...)
|
||
|
return
|
||
|
}
|
||
|
err = s.imeta.UpdateTask(task.GetTaskID(),
|
||
|
UpdateState(datapb.ImportTaskStateV2_InProgress),
|
||
|
UpdateNodeID(nodeID))
|
||
|
if err != nil {
|
||
|
log.Warn("update import task failed", WrapTaskLog(task, zap.Error(err))...)
|
||
|
return
|
||
|
}
|
||
|
log.Info("processing pending import task done", WrapTaskLog(task)...)
|
||
|
}
|
||
|
|
||
|
func (s *importScheduler) processInProgressPreImport(task ImportTask) {
|
||
|
req := &datapb.QueryPreImportRequest{
|
||
|
JobID: task.GetJobID(),
|
||
|
TaskID: task.GetTaskID(),
|
||
|
}
|
||
|
resp, err := s.cluster.QueryPreImport(task.GetNodeID(), req)
|
||
|
if err != nil {
|
||
|
log.Warn("query preimport failed", WrapTaskLog(task, zap.Error(err))...)
|
||
|
s.checkErr(task, err)
|
||
|
return
|
||
|
}
|
||
|
if resp.GetState() == datapb.ImportTaskStateV2_Failed {
|
||
|
err = s.imeta.UpdateJob(task.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed),
|
||
|
UpdateJobReason(resp.GetReason()))
|
||
|
if err != nil {
|
||
|
log.Warn("failed to update job state to Failed", zap.Int64("jobID", task.GetJobID()), zap.Error(err))
|
||
|
}
|
||
|
log.Warn("preimport failed", WrapTaskLog(task, zap.String("reason", resp.GetReason()))...)
|
||
|
return
|
||
|
}
|
||
|
actions := []UpdateAction{UpdateFileStats(resp.GetFileStats())}
|
||
|
if resp.GetState() == datapb.ImportTaskStateV2_Completed {
|
||
|
actions = append(actions, UpdateState(datapb.ImportTaskStateV2_Completed))
|
||
|
}
|
||
|
err = s.imeta.UpdateTask(task.GetTaskID(), actions...)
|
||
|
if err != nil {
|
||
|
log.Warn("update preimport task failed", WrapTaskLog(task, zap.Error(err))...)
|
||
|
return
|
||
|
}
|
||
|
log.Info("query preimport", WrapTaskLog(task, zap.String("state", resp.GetState().String()),
|
||
|
zap.Any("fileStats", resp.GetFileStats()))...)
|
||
|
}
|
||
|
|
||
|
func (s *importScheduler) processInProgressImport(task ImportTask) {
|
||
|
req := &datapb.QueryImportRequest{
|
||
|
JobID: task.GetJobID(),
|
||
|
TaskID: task.GetTaskID(),
|
||
|
}
|
||
|
resp, err := s.cluster.QueryImport(task.GetNodeID(), req)
|
||
|
if err != nil {
|
||
|
log.Warn("query import failed", WrapTaskLog(task, zap.Error(err))...)
|
||
|
s.checkErr(task, err)
|
||
|
return
|
||
|
}
|
||
|
if resp.GetState() == datapb.ImportTaskStateV2_Failed {
|
||
|
err = s.imeta.UpdateJob(task.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed),
|
||
|
UpdateJobReason(resp.GetReason()))
|
||
|
if err != nil {
|
||
|
log.Warn("failed to update job state to Failed", zap.Int64("jobID", task.GetJobID()), zap.Error(err))
|
||
|
}
|
||
|
log.Warn("import failed", WrapTaskLog(task, zap.String("reason", resp.GetReason()))...)
|
||
|
return
|
||
|
}
|
||
|
for _, info := range resp.GetImportSegmentsInfo() {
|
||
|
segment := s.meta.GetSegment(info.GetSegmentID())
|
||
|
if info.GetImportedRows() <= segment.GetNumOfRows() {
|
||
|
continue // rows not changed, no need to update
|
||
|
}
|
||
|
op := UpdateImportedRows(info.GetSegmentID(), info.GetImportedRows())
|
||
|
err = s.meta.UpdateSegmentsInfo(op)
|
||
|
if err != nil {
|
||
|
log.Warn("update import segment rows failed", WrapTaskLog(task, zap.Error(err))...)
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
if resp.GetState() == datapb.ImportTaskStateV2_Completed {
|
||
|
for _, info := range resp.GetImportSegmentsInfo() {
|
||
|
op := ReplaceBinlogsOperator(info.GetSegmentID(), info.GetBinlogs(), info.GetStatslogs(), nil)
|
||
|
err = s.meta.UpdateSegmentsInfo(op)
|
||
|
if err != nil {
|
||
|
log.Warn("update import segment binlogs failed", WrapTaskLog(task, zap.Error(err))...)
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
err = s.imeta.UpdateTask(task.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Completed))
|
||
|
if err != nil {
|
||
|
log.Warn("update import task failed", WrapTaskLog(task, zap.Error(err))...)
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
log.Info("query import", WrapTaskLog(task, zap.String("state", resp.GetState().String()),
|
||
|
zap.String("reason", resp.GetReason()))...)
|
||
|
}
|
||
|
|
||
|
func (s *importScheduler) processCompleted(task ImportTask) {
|
||
|
err := DropImportTask(task, s.cluster, s.imeta)
|
||
|
if err != nil {
|
||
|
log.Warn("drop import failed", WrapTaskLog(task, zap.Error(err))...)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (s *importScheduler) processFailed(task ImportTask) {
|
||
|
if task.GetType() == ImportTaskType {
|
||
|
segments := task.(*importTask).GetSegmentIDs()
|
||
|
for _, segment := range segments {
|
||
|
err := s.meta.DropSegment(segment)
|
||
|
if err != nil {
|
||
|
log.Warn("drop import segment failed",
|
||
|
WrapTaskLog(task, zap.Int64("segment", segment), zap.Error(err))...)
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
err := s.imeta.UpdateTask(task.GetTaskID(), UpdateSegmentIDs(nil))
|
||
|
if err != nil {
|
||
|
log.Warn("update import task segments failed", WrapTaskLog(task, zap.Error(err))...)
|
||
|
}
|
||
|
}
|
||
|
err := DropImportTask(task, s.cluster, s.imeta)
|
||
|
if err != nil {
|
||
|
log.Warn("drop import failed", WrapTaskLog(task, zap.Error(err))...)
|
||
|
}
|
||
|
}
|