Mirror of https://gitee.com/milvus-io/milvus.git, synced 2024-12-03 04:19:18 +08:00.
Abstract Execute interface for import/preimport task, simplify import scheduler.
issue: https://github.com/milvus-io/milvus/issues/33157
pr: https://github.com/milvus-io/milvus/pull/33234
Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
parent 95582b0208
commit e282e1408e
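In short, this change moves task execution out of the scheduler and behind the Task interface: each task type (PreImportTask, ImportTask) now implements Execute(), the per-file work runs on a shared, lazily initialized pool (pool.go), and the scheduler's Start() loop no longer switches on the task type. A minimal sketch of the resulting dispatch loop, condensed from the scheduler hunk below (not verbatim; error handling omitted):

    // Scheduler run loop after the refactor (sketch).
    tasks := s.manager.GetBy(WithStates(datapb.ImportTaskStateV2_Pending))
    futures := make(map[int64][]*conc.Future[any])
    for _, task := range tasks {
        // Each Task knows how to run itself; the scheduler only tracks futures.
        futures[task.GetTaskID()] = task.Execute()
        tryFreeFutures(futures)
    }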
@@ -288,7 +288,7 @@ func (node *DataNode) Init() error {
         node.writeBufferManager = writebuffer.NewManager(syncMgr)

         node.importTaskMgr = importv2.NewTaskManager()
-        node.importScheduler = importv2.NewScheduler(node.importTaskMgr, node.syncMgr, node.chunkManager)
+        node.importScheduler = importv2.NewScheduler(node.importTaskMgr)
         node.channelCheckpointUpdater = newChannelCheckpointUpdater(node)
         node.flowgraphManager = newFlowgraphManager()

internal/datanode/importv2/pool.go (new file, 41 lines):
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package importv2

import (
    "sync"

    "github.com/milvus-io/milvus/pkg/util/conc"
    "github.com/milvus-io/milvus/pkg/util/paramtable"
)

var (
    execPool         *conc.Pool[any]
    execPoolInitOnce sync.Once
)

func initExecPool() {
    execPool = conc.NewPool[any](
        paramtable.Get().DataNodeCfg.MaxConcurrentImportTaskNum.GetAsInt(),
        conc.WithPreAlloc(true),
    )
}

func GetExecPool() *conc.Pool[any] {
    execPoolInitOnce.Do(initExecPool)
    return execPool
}
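This shared pool replaces the pool the scheduler used to own: both PreImportTask.Execute and ImportTask.Execute submit their per-file closures to it, so MaxConcurrentImportTaskNum bounds import concurrency process-wide rather than per scheduler instance. A hedged usage sketch, consistent with the task code later in this diff (processFile is a hypothetical stand-in for the real per-file closure):

    // Submit one unit of work per import file to the shared pool (sketch).
    f := GetExecPool().Submit(func() (any, error) {
        err := processFile() // hypothetical placeholder for the per-file work
        return err, err
    })
    futures = append(futures, f)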
@@ -17,20 +17,13 @@
 package importv2

 import (
-    "fmt"
-    "io"
     "sync"
     "time"

-    "github.com/cockroachdb/errors"
     "github.com/samber/lo"
     "go.uber.org/zap"

-    "github.com/milvus-io/milvus/internal/datanode/syncmgr"
     "github.com/milvus-io/milvus/internal/proto/datapb"
-    "github.com/milvus-io/milvus/internal/proto/internalpb"
-    "github.com/milvus-io/milvus/internal/storage"
-    "github.com/milvus-io/milvus/internal/util/importutilv2"
     "github.com/milvus-io/milvus/pkg/log"
     "github.com/milvus-io/milvus/pkg/util/conc"
     "github.com/milvus-io/milvus/pkg/util/paramtable"
@@ -44,25 +37,14 @@ type Scheduler interface {

 type scheduler struct {
     manager TaskManager
-    syncMgr syncmgr.SyncManager
-    cm      storage.ChunkManager
-
-    pool *conc.Pool[any]

     closeOnce sync.Once
     closeChan chan struct{}
 }

-func NewScheduler(manager TaskManager, syncMgr syncmgr.SyncManager, cm storage.ChunkManager) Scheduler {
-    pool := conc.NewPool[any](
-        paramtable.Get().DataNodeCfg.MaxConcurrentImportTaskNum.GetAsInt(),
-        conc.WithPreAlloc(true),
-    )
+func NewScheduler(manager TaskManager) Scheduler {
     return &scheduler{
         manager:   manager,
-        syncMgr:   syncMgr,
-        cm:        cm,
-        pool:      pool,
         closeChan: make(chan struct{}),
     }
 }
@@ -84,16 +66,9 @@ func (s *scheduler) Start() {
             tasks := s.manager.GetBy(WithStates(datapb.ImportTaskStateV2_Pending))
             futures := make(map[int64][]*conc.Future[any])
             for _, task := range tasks {
-                switch task.GetType() {
-                case PreImportTaskType:
-                    fs := s.PreImport(task)
-                    futures[task.GetTaskID()] = fs
-                    tryFreeFutures(futures)
-                case ImportTaskType:
-                    fs := s.Import(task)
-                    futures[task.GetTaskID()] = fs
-                    tryFreeFutures(futures)
-                }
+                fs := task.Execute()
+                futures[task.GetTaskID()] = fs
+                tryFreeFutures(futures)
             }
             for taskID, fs := range futures {
                 err := conc.AwaitAll(fs...)
@@ -120,17 +95,6 @@ func (s *scheduler) Close() {
     })
 }

-func WrapLogFields(task Task, fields ...zap.Field) []zap.Field {
-    res := []zap.Field{
-        zap.Int64("taskID", task.GetTaskID()),
-        zap.Int64("jobID", task.GetJobID()),
-        zap.Int64("collectionID", task.GetCollectionID()),
-        zap.String("type", task.GetType().String()),
-    }
-    res = append(res, fields...)
-    return res
-}
-
 func tryFreeFutures(futures map[int64][]*conc.Future[any]) {
     for k, fs := range futures {
         fs = lo.Filter(fs, func(f *conc.Future[any], _ int) bool {
@@ -143,207 +107,3 @@ func tryFreeFutures(futures map[int64][]*conc.Future[any]) {
         futures[k] = fs
     }
 }
-
-func (s *scheduler) handleErr(task Task, err error, msg string) {
-    log.Warn(msg, WrapLogFields(task, zap.Error(err))...)
-    s.manager.Update(task.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Failed), UpdateReason(err.Error()))
-}
-
-func (s *scheduler) PreImport(task Task) []*conc.Future[any] {
-    bufferSize := paramtable.Get().DataNodeCfg.ReadBufferSizeInMB.GetAsInt() * 1024 * 1024
-    log.Info("start to preimport", WrapLogFields(task,
-        zap.Int("bufferSize", bufferSize),
-        zap.Any("schema", task.GetSchema()))...)
-    s.manager.Update(task.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_InProgress))
-    files := lo.Map(task.(*PreImportTask).GetFileStats(),
-        func(fileStat *datapb.ImportFileStats, _ int) *internalpb.ImportFile {
-            return fileStat.GetImportFile()
-        })
-
-    fn := func(i int, file *internalpb.ImportFile) error {
-        reader, err := importutilv2.NewReader(task.GetCtx(), s.cm, task.GetSchema(), file, task.GetOptions(), bufferSize)
-        if err != nil {
-            s.handleErr(task, err, "new reader failed")
-            return err
-        }
-        defer reader.Close()
-        start := time.Now()
-        err = s.readFileStat(reader, task, i)
-        if err != nil {
-            s.handleErr(task, err, "preimport failed")
-            return err
-        }
-        log.Info("read file stat done", WrapLogFields(task, zap.Strings("files", file.GetPaths()),
-            zap.Duration("dur", time.Since(start)))...)
-        return nil
-    }
-
-    futures := make([]*conc.Future[any], 0, len(files))
-    for i, file := range files {
-        i := i
-        file := file
-        f := s.pool.Submit(func() (any, error) {
-            err := fn(i, file)
-            return err, err
-        })
-        futures = append(futures, f)
-    }
-    return futures
-}
-
-func (s *scheduler) readFileStat(reader importutilv2.Reader, task Task, fileIdx int) error {
-    fileSize, err := reader.Size()
-    if err != nil {
-        return err
-    }
-    maxSize := paramtable.Get().DataNodeCfg.MaxImportFileSizeInGB.GetAsFloat() * 1024 * 1024 * 1024
-    if fileSize > int64(maxSize) {
-        return errors.New(fmt.Sprintf(
-            "The import file size has reached the maximum limit allowed for importing, "+
-                "fileSize=%d, maxSize=%d", fileSize, int64(maxSize)))
-    }
-
-    totalRows := 0
-    totalSize := 0
-    hashedStats := make(map[string]*datapb.PartitionImportStats)
-    for {
-        data, err := reader.Read()
-        if err != nil {
-            if errors.Is(err, io.EOF) {
-                break
-            }
-            return err
-        }
-        err = CheckRowsEqual(task.GetSchema(), data)
-        if err != nil {
-            return err
-        }
-        rowsCount, err := GetRowsStats(task, data)
-        if err != nil {
-            return err
-        }
-        MergeHashedStats(rowsCount, hashedStats)
-        rows := data.GetRowNum()
-        size := data.GetMemorySize()
-        totalRows += rows
-        totalSize += size
-        log.Info("reading file stat...", WrapLogFields(task, zap.Int("readRows", rows), zap.Int("readSize", size))...)
-    }
-
-    stat := &datapb.ImportFileStats{
-        FileSize:        fileSize,
-        TotalRows:       int64(totalRows),
-        TotalMemorySize: int64(totalSize),
-        HashedStats:     hashedStats,
-    }
-    s.manager.Update(task.GetTaskID(), UpdateFileStat(fileIdx, stat))
-    return nil
-}
-
-func (s *scheduler) Import(task Task) []*conc.Future[any] {
-    bufferSize := paramtable.Get().DataNodeCfg.ReadBufferSizeInMB.GetAsInt() * 1024 * 1024
-    log.Info("start to import", WrapLogFields(task,
-        zap.Int("bufferSize", bufferSize),
-        zap.Any("schema", task.GetSchema()))...)
-    s.manager.Update(task.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_InProgress))
-
-    req := task.(*ImportTask).req
-
-    fn := func(file *internalpb.ImportFile) error {
-        reader, err := importutilv2.NewReader(task.GetCtx(), s.cm, task.GetSchema(), file, task.GetOptions(), bufferSize)
-        if err != nil {
-            s.handleErr(task, err, fmt.Sprintf("new reader failed, file: %s", file.String()))
-            return err
-        }
-        defer reader.Close()
-        start := time.Now()
-        err = s.importFile(reader, task)
-        if err != nil {
-            s.handleErr(task, err, fmt.Sprintf("do import failed, file: %s", file.String()))
-            return err
-        }
-        log.Info("import file done", WrapLogFields(task, zap.Strings("files", file.GetPaths()),
-            zap.Duration("dur", time.Since(start)))...)
-        return nil
-    }
-
-    futures := make([]*conc.Future[any], 0, len(req.GetFiles()))
-    for _, file := range req.GetFiles() {
-        file := file
-        f := s.pool.Submit(func() (any, error) {
-            err := fn(file)
-            return err, err
-        })
-        futures = append(futures, f)
-    }
-    return futures
-}
-
-func (s *scheduler) importFile(reader importutilv2.Reader, task Task) error {
-    iTask := task.(*ImportTask)
-    syncFutures := make([]*conc.Future[struct{}], 0)
-    syncTasks := make([]syncmgr.Task, 0)
-    for {
-        data, err := reader.Read()
-        if err != nil {
-            if errors.Is(err, io.EOF) {
-                break
-            }
-            return err
-        }
-        err = AppendSystemFieldsData(iTask, data)
-        if err != nil {
-            return err
-        }
-        hashedData, err := HashData(iTask, data)
-        if err != nil {
-            return err
-        }
-        fs, sts, err := s.Sync(iTask, hashedData)
-        if err != nil {
-            return err
-        }
-        syncFutures = append(syncFutures, fs...)
-        syncTasks = append(syncTasks, sts...)
-    }
-    err := conc.AwaitAll(syncFutures...)
-    if err != nil {
-        return err
-    }
-    for _, syncTask := range syncTasks {
-        segmentInfo, err := NewImportSegmentInfo(syncTask, iTask)
-        if err != nil {
-            return err
-        }
-        s.manager.Update(task.GetTaskID(), UpdateSegmentInfo(segmentInfo))
-        log.Info("sync import data done", WrapLogFields(task, zap.Any("segmentInfo", segmentInfo))...)
-    }
-    return nil
-}
-
-func (s *scheduler) Sync(task *ImportTask, hashedData HashedData) ([]*conc.Future[struct{}], []syncmgr.Task, error) {
-    log.Info("start to sync import data", WrapLogFields(task)...)
-    futures := make([]*conc.Future[struct{}], 0)
-    syncTasks := make([]syncmgr.Task, 0)
-    segmentImportedSizes := make(map[int64]int)
-    for channelIdx, datas := range hashedData {
-        channel := task.GetVchannels()[channelIdx]
-        for partitionIdx, data := range datas {
-            if data.GetRowNum() == 0 {
-                continue
-            }
-            partitionID := task.GetPartitionIDs()[partitionIdx]
-            size := data.GetMemorySize()
-            segmentID := PickSegment(task, segmentImportedSizes, channel, partitionID, size)
-            syncTask, err := NewSyncTask(task.GetCtx(), task, segmentID, partitionID, channel, data)
-            if err != nil {
-                return nil, nil, err
-            }
-            segmentImportedSizes[segmentID] += size
-            future := s.syncMgr.SyncData(task.GetCtx(), syncTask)
-            futures = append(futures, future)
-            syncTasks = append(syncTasks, syncTask)
-        }
-    }
-    return futures, syncTasks, nil
-}
@@ -116,7 +116,7 @@ func (s *SchedulerSuite) SetupTest() {

     s.manager = NewTaskManager()
     s.syncMgr = syncmgr.NewMockSyncManager(s.T())
-    s.scheduler = NewScheduler(s.manager, s.syncMgr, nil).(*scheduler)
+    s.scheduler = NewScheduler(s.manager).(*scheduler)
 }

 func createInsertData(t *testing.T, schema *schemapb.CollectionSchema, rowCount int) *storage.InsertData {
@@ -236,7 +236,7 @@ func (s *SchedulerSuite) TestScheduler_Slots() {
         Schema:      s.schema,
         ImportFiles: []*internalpb.ImportFile{{Paths: []string{"dummy.json"}}},
     }
-    preimportTask := NewPreImportTask(preimportReq)
+    preimportTask := NewPreImportTask(preimportReq, s.manager, s.cm)
     s.manager.Add(preimportTask)

     slots := s.scheduler.Slots()
@@ -262,7 +262,7 @@ func (s *SchedulerSuite) TestScheduler_Start_Preimport() {
     ioReader := strings.NewReader(string(bytes))
     cm.EXPECT().Size(mock.Anything, mock.Anything).Return(1024, nil)
     cm.EXPECT().Reader(mock.Anything, mock.Anything).Return(&mockReader{Reader: ioReader}, nil)
-    s.scheduler.cm = cm
+    s.cm = cm

     preimportReq := &datapb.PreImportRequest{
         JobID: 1,
@@ -273,7 +273,7 @@ func (s *SchedulerSuite) TestScheduler_Start_Preimport() {
         Schema:      s.schema,
         ImportFiles: []*internalpb.ImportFile{{Paths: []string{"dummy.json"}}},
     }
-    preimportTask := NewPreImportTask(preimportReq)
+    preimportTask := NewPreImportTask(preimportReq, s.manager, s.cm)
     s.manager.Add(preimportTask)

     go s.scheduler.Start()
@@ -316,7 +316,7 @@ func (s *SchedulerSuite) TestScheduler_Start_Preimport_Failed() {
     ioReader := strings.NewReader(string(bytes))
     cm.EXPECT().Size(mock.Anything, mock.Anything).Return(1024, nil)
     cm.EXPECT().Reader(mock.Anything, mock.Anything).Return(&mockReader{Reader: ioReader}, nil)
-    s.scheduler.cm = cm
+    s.cm = cm

     preimportReq := &datapb.PreImportRequest{
         JobID: 1,
@@ -327,7 +327,7 @@ func (s *SchedulerSuite) TestScheduler_Start_Preimport_Failed() {
         Schema:      s.schema,
         ImportFiles: []*internalpb.ImportFile{{Paths: []string{"dummy.json"}}},
     }
-    preimportTask := NewPreImportTask(preimportReq)
+    preimportTask := NewPreImportTask(preimportReq, s.manager, s.cm)
     s.manager.Add(preimportTask)

     go s.scheduler.Start()
@@ -355,7 +355,7 @@ func (s *SchedulerSuite) TestScheduler_Start_Import() {
     cm := mocks.NewChunkManager(s.T())
     ioReader := strings.NewReader(string(bytes))
     cm.EXPECT().Reader(mock.Anything, mock.Anything).Return(&mockReader{Reader: ioReader}, nil)
-    s.scheduler.cm = cm
+    s.cm = cm

     s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task) *conc.Future[struct{}] {
         future := conc.Go(func() (struct{}, error) {
@@ -388,7 +388,7 @@ func (s *SchedulerSuite) TestScheduler_Start_Import() {
             },
         },
     }
-    importTask := NewImportTask(importReq)
+    importTask := NewImportTask(importReq, s.manager, s.syncMgr, s.cm)
     s.manager.Add(importTask)

     go s.scheduler.Start()
@@ -416,7 +416,7 @@ func (s *SchedulerSuite) TestScheduler_Start_Import_Failed() {
     cm := mocks.NewChunkManager(s.T())
     ioReader := strings.NewReader(string(bytes))
     cm.EXPECT().Reader(mock.Anything, mock.Anything).Return(&mockReader{Reader: ioReader}, nil)
-    s.scheduler.cm = cm
+    s.cm = cm

     s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task) *conc.Future[struct{}] {
         future := conc.Go(func() (struct{}, error) {
@@ -449,7 +449,7 @@ func (s *SchedulerSuite) TestScheduler_Start_Import_Failed() {
             },
         },
     }
-    importTask := NewImportTask(importReq)
+    importTask := NewImportTask(importReq, s.manager, s.syncMgr, s.cm)
     s.manager.Add(importTask)

     go s.scheduler.Start()
@@ -487,9 +487,9 @@ func (s *SchedulerSuite) TestScheduler_ReadFileStat() {
         Schema:      s.schema,
         ImportFiles: []*internalpb.ImportFile{importFile},
     }
-    preimportTask := NewPreImportTask(preimportReq)
+    preimportTask := NewPreImportTask(preimportReq, s.manager, s.cm)
     s.manager.Add(preimportTask)
-    err := s.scheduler.readFileStat(s.reader, preimportTask, 0)
+    err := preimportTask.(*PreImportTask).readFileStat(s.reader, preimportTask, 0)
     s.NoError(err)
 }

@@ -538,9 +538,9 @@ func (s *SchedulerSuite) TestScheduler_ImportFile() {
             },
         },
     }
-    importTask := NewImportTask(importReq)
+    importTask := NewImportTask(importReq, s.manager, s.syncMgr, s.cm)
     s.manager.Add(importTask)
-    err := s.scheduler.importFile(s.reader, importTask)
+    err := importTask.(*ImportTask).importFile(s.reader, importTask)
     s.NoError(err)
 }

@@ -17,18 +17,12 @@
 package importv2

 import (
-    "context"
-
-    "github.com/golang/protobuf/proto"
     "github.com/samber/lo"
+    "go.uber.org/zap"

-    "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
     "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
-    "github.com/milvus-io/milvus/internal/datanode/metacache"
     "github.com/milvus-io/milvus/internal/proto/datapb"
-    "github.com/milvus-io/milvus/internal/proto/internalpb"
-    "github.com/milvus-io/milvus/internal/util/importutilv2"
-    "github.com/milvus-io/milvus/pkg/util/typeutil"
+    "github.com/milvus-io/milvus/pkg/util/conc"
 )

 type TaskType int
@@ -130,6 +124,7 @@ func UpdateSegmentInfo(info *datapb.ImportSegmentInfo) UpdateAction {
 }

 type Task interface {
+    Execute() []*conc.Future[any]
     GetJobID() int64
     GetTaskID() int64
     GetCollectionID() int64
@@ -139,183 +134,17 @@ type Task interface {
     GetState() datapb.ImportTaskStateV2
     GetReason() string
     GetSchema() *schemapb.CollectionSchema
-    GetCtx() context.Context
-    GetOptions() []*commonpb.KeyValuePair
     Cancel()
     Clone() Task
 }

-type PreImportTask struct {
-    *datapb.PreImportTask
-    ctx          context.Context
-    cancel       context.CancelFunc
-    partitionIDs []int64
-    vchannels    []string
-    schema       *schemapb.CollectionSchema
-    options      []*commonpb.KeyValuePair
-}
-
-func NewPreImportTask(req *datapb.PreImportRequest) Task {
-    fileStats := lo.Map(req.GetImportFiles(), func(file *internalpb.ImportFile, _ int) *datapb.ImportFileStats {
-        return &datapb.ImportFileStats{
-            ImportFile: file,
-        }
-    })
-    ctx, cancel := context.WithCancel(context.Background())
-    // During binlog import, even if the primary key's autoID is set to true,
-    // the primary key from the binlog should be used instead of being reassigned.
-    if importutilv2.IsBackup(req.GetOptions()) {
-        UnsetAutoID(req.GetSchema())
-    }
-    return &PreImportTask{
-        PreImportTask: &datapb.PreImportTask{
-            JobID:        req.GetJobID(),
-            TaskID:       req.GetTaskID(),
-            CollectionID: req.GetCollectionID(),
-            State:        datapb.ImportTaskStateV2_Pending,
-            FileStats:    fileStats,
-        },
-        ctx:          ctx,
-        cancel:       cancel,
-        partitionIDs: req.GetPartitionIDs(),
-        vchannels:    req.GetVchannels(),
-        schema:       req.GetSchema(),
-        options:      req.GetOptions(),
-    }
-}
-
-func (p *PreImportTask) GetPartitionIDs() []int64 {
-    return p.partitionIDs
-}
-
-func (p *PreImportTask) GetVchannels() []string {
-    return p.vchannels
-}
-
-func (p *PreImportTask) GetType() TaskType {
-    return PreImportTaskType
-}
-
-func (p *PreImportTask) GetSchema() *schemapb.CollectionSchema {
-    return p.schema
-}
-
-func (p *PreImportTask) GetOptions() []*commonpb.KeyValuePair {
-    return p.options
-}
-
-func (p *PreImportTask) GetCtx() context.Context {
-    return p.ctx
-}
-
-func (p *PreImportTask) Cancel() {
-    p.cancel()
-}
-
-func (p *PreImportTask) Clone() Task {
-    ctx, cancel := context.WithCancel(p.GetCtx())
-    return &PreImportTask{
-        PreImportTask: proto.Clone(p.PreImportTask).(*datapb.PreImportTask),
-        ctx:           ctx,
-        cancel:        cancel,
-        partitionIDs:  p.GetPartitionIDs(),
-        vchannels:     p.GetVchannels(),
-        schema:        p.GetSchema(),
-        options:       p.GetOptions(),
-    }
-}
-
-type ImportTask struct {
-    *datapb.ImportTaskV2
-    ctx          context.Context
-    cancel       context.CancelFunc
-    segmentsInfo map[int64]*datapb.ImportSegmentInfo
-    req          *datapb.ImportRequest
-    metaCaches   map[string]metacache.MetaCache
-}
-
-func NewImportTask(req *datapb.ImportRequest) Task {
-    ctx, cancel := context.WithCancel(context.Background())
-    // During binlog import, even if the primary key's autoID is set to true,
-    // the primary key from the binlog should be used instead of being reassigned.
-    if importutilv2.IsBackup(req.GetOptions()) {
-        UnsetAutoID(req.GetSchema())
-    }
-    task := &ImportTask{
-        ImportTaskV2: &datapb.ImportTaskV2{
-            JobID:        req.GetJobID(),
-            TaskID:       req.GetTaskID(),
-            CollectionID: req.GetCollectionID(),
-            State:        datapb.ImportTaskStateV2_Pending,
-        },
-        ctx:          ctx,
-        cancel:       cancel,
-        segmentsInfo: make(map[int64]*datapb.ImportSegmentInfo),
-        req:          req,
-    }
-    task.initMetaCaches(req)
-    return task
-}
-
-func (t *ImportTask) initMetaCaches(req *datapb.ImportRequest) {
-    metaCaches := make(map[string]metacache.MetaCache)
-    schema := typeutil.AppendSystemFields(req.GetSchema())
-    for _, channel := range req.GetVchannels() {
-        info := &datapb.ChannelWatchInfo{
-            Vchan: &datapb.VchannelInfo{
-                CollectionID: req.GetCollectionID(),
-                ChannelName:  channel,
-            },
-            Schema: schema,
-        }
-        metaCache := metacache.NewMetaCache(info, func(segment *datapb.SegmentInfo) *metacache.BloomFilterSet {
-            return metacache.NewBloomFilterSet()
-        })
-        metaCaches[channel] = metaCache
-    }
-    t.metaCaches = metaCaches
-}
-
-func (t *ImportTask) GetType() TaskType {
-    return ImportTaskType
-}
-
-func (t *ImportTask) GetPartitionIDs() []int64 {
-    return t.req.GetPartitionIDs()
-}
-
-func (t *ImportTask) GetVchannels() []string {
-    return t.req.GetVchannels()
-}
-
-func (t *ImportTask) GetSchema() *schemapb.CollectionSchema {
-    return t.req.GetSchema()
-}
-
-func (t *ImportTask) GetOptions() []*commonpb.KeyValuePair {
-    return t.req.GetOptions()
-}
-
-func (t *ImportTask) GetCtx() context.Context {
-    return t.ctx
-}
-
-func (t *ImportTask) Cancel() {
-    t.cancel()
-}
-
-func (t *ImportTask) GetSegmentsInfo() []*datapb.ImportSegmentInfo {
-    return lo.Values(t.segmentsInfo)
-}
-
-func (t *ImportTask) Clone() Task {
-    ctx, cancel := context.WithCancel(t.GetCtx())
-    return &ImportTask{
-        ImportTaskV2: proto.Clone(t.ImportTaskV2).(*datapb.ImportTaskV2),
-        ctx:          ctx,
-        cancel:       cancel,
-        segmentsInfo: t.segmentsInfo,
-        req:          t.req,
-        metaCaches:   t.metaCaches,
-    }
-}
+func WrapLogFields(task Task, fields ...zap.Field) []zap.Field {
+    res := []zap.Field{
+        zap.Int64("taskID", task.GetTaskID()),
+        zap.Int64("jobID", task.GetJobID()),
+        zap.Int64("collectionID", task.GetCollectionID()),
+        zap.String("type", task.GetType().String()),
+    }
+    res = append(res, fields...)
+    return res
+}
internal/datanode/importv2/task_import.go (new file, 248 lines):
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package importv2

import (
    "context"
    "io"
    "time"

    "github.com/cockroachdb/errors"
    "github.com/golang/protobuf/proto"
    "github.com/samber/lo"
    "go.uber.org/zap"

    "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
    "github.com/milvus-io/milvus/internal/datanode/metacache"
    "github.com/milvus-io/milvus/internal/datanode/syncmgr"
    "github.com/milvus-io/milvus/internal/proto/datapb"
    "github.com/milvus-io/milvus/internal/proto/internalpb"
    "github.com/milvus-io/milvus/internal/storage"
    "github.com/milvus-io/milvus/internal/util/importutilv2"
    "github.com/milvus-io/milvus/pkg/log"
    "github.com/milvus-io/milvus/pkg/util/conc"
    "github.com/milvus-io/milvus/pkg/util/paramtable"
    "github.com/milvus-io/milvus/pkg/util/typeutil"
)

type ImportTask struct {
    *datapb.ImportTaskV2
    ctx          context.Context
    cancel       context.CancelFunc
    segmentsInfo map[int64]*datapb.ImportSegmentInfo
    req          *datapb.ImportRequest

    manager    TaskManager
    syncMgr    syncmgr.SyncManager
    cm         storage.ChunkManager
    metaCaches map[string]metacache.MetaCache
}

func NewImportTask(req *datapb.ImportRequest,
    manager TaskManager,
    syncMgr syncmgr.SyncManager,
    cm storage.ChunkManager,
) Task {
    ctx, cancel := context.WithCancel(context.Background())
    // During binlog import, even if the primary key's autoID is set to true,
    // the primary key from the binlog should be used instead of being reassigned.
    if importutilv2.IsBackup(req.GetOptions()) {
        UnsetAutoID(req.GetSchema())
    }
    task := &ImportTask{
        ImportTaskV2: &datapb.ImportTaskV2{
            JobID:        req.GetJobID(),
            TaskID:       req.GetTaskID(),
            CollectionID: req.GetCollectionID(),
            State:        datapb.ImportTaskStateV2_Pending,
        },
        ctx:          ctx,
        cancel:       cancel,
        segmentsInfo: make(map[int64]*datapb.ImportSegmentInfo),
        req:          req,
        manager:      manager,
        syncMgr:      syncMgr,
        cm:           cm,
    }
    task.initMetaCaches(req)
    return task
}

func (t *ImportTask) initMetaCaches(req *datapb.ImportRequest) {
    metaCaches := make(map[string]metacache.MetaCache)
    schema := typeutil.AppendSystemFields(req.GetSchema())
    for _, channel := range req.GetVchannels() {
        info := &datapb.ChannelWatchInfo{
            Vchan: &datapb.VchannelInfo{
                CollectionID: req.GetCollectionID(),
                ChannelName:  channel,
            },
            Schema: schema,
        }
        metaCache := metacache.NewMetaCache(info, func(segment *datapb.SegmentInfo) *metacache.BloomFilterSet {
            return metacache.NewBloomFilterSet()
        })
        metaCaches[channel] = metaCache
    }
    t.metaCaches = metaCaches
}

func (t *ImportTask) GetType() TaskType {
    return ImportTaskType
}

func (t *ImportTask) GetPartitionIDs() []int64 {
    return t.req.GetPartitionIDs()
}

func (t *ImportTask) GetVchannels() []string {
    return t.req.GetVchannels()
}

func (t *ImportTask) GetSchema() *schemapb.CollectionSchema {
    return t.req.GetSchema()
}

func (t *ImportTask) Cancel() {
    t.cancel()
}

func (t *ImportTask) GetSegmentsInfo() []*datapb.ImportSegmentInfo {
    return lo.Values(t.segmentsInfo)
}

func (t *ImportTask) Clone() Task {
    ctx, cancel := context.WithCancel(t.ctx)
    return &ImportTask{
        ImportTaskV2: proto.Clone(t.ImportTaskV2).(*datapb.ImportTaskV2),
        ctx:          ctx,
        cancel:       cancel,
        segmentsInfo: t.segmentsInfo,
        req:          t.req,
        metaCaches:   t.metaCaches,
    }
}

func (t *ImportTask) Execute() []*conc.Future[any] {
    bufferSize := paramtable.Get().DataNodeCfg.ReadBufferSizeInMB.GetAsInt() * 1024 * 1024
    log.Info("start to import", WrapLogFields(t,
        zap.Int("bufferSize", bufferSize),
        zap.Any("schema", t.GetSchema()))...)
    t.manager.Update(t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_InProgress))

    req := t.req

    fn := func(file *internalpb.ImportFile) error {
        reader, err := importutilv2.NewReader(t.ctx, t.cm, t.GetSchema(), file, t.req.GetOptions(), bufferSize)
        if err != nil {
            log.Warn("new reader failed", WrapLogFields(t, zap.String("file", file.String()), zap.Error(err))...)
            t.manager.Update(t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Failed), UpdateReason(err.Error()))
            return err
        }
        defer reader.Close()
        start := time.Now()
        err = t.importFile(reader, t)
        if err != nil {
            log.Warn("do import failed", WrapLogFields(t, zap.String("file", file.String()), zap.Error(err))...)
            t.manager.Update(t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Failed), UpdateReason(err.Error()))
            return err
        }
        log.Info("import file done", WrapLogFields(t, zap.Strings("files", file.GetPaths()),
            zap.Duration("dur", time.Since(start)))...)
        return nil
    }

    futures := make([]*conc.Future[any], 0, len(req.GetFiles()))
    for _, file := range req.GetFiles() {
        file := file
        f := GetExecPool().Submit(func() (any, error) {
            err := fn(file)
            return err, err
        })
        futures = append(futures, f)
    }
    return futures
}

func (t *ImportTask) importFile(reader importutilv2.Reader, task Task) error {
    iTask := task.(*ImportTask)
    syncFutures := make([]*conc.Future[struct{}], 0)
    syncTasks := make([]syncmgr.Task, 0)
    for {
        data, err := reader.Read()
        if err != nil {
            if errors.Is(err, io.EOF) {
                break
            }
            return err
        }
        err = AppendSystemFieldsData(iTask, data)
        if err != nil {
            return err
        }
        hashedData, err := HashData(iTask, data)
        if err != nil {
            return err
        }
        fs, sts, err := t.sync(iTask, hashedData)
        if err != nil {
            return err
        }
        syncFutures = append(syncFutures, fs...)
        syncTasks = append(syncTasks, sts...)
    }
    err := conc.AwaitAll(syncFutures...)
    if err != nil {
        return err
    }
    for _, syncTask := range syncTasks {
        segmentInfo, err := NewImportSegmentInfo(syncTask, iTask)
        if err != nil {
            return err
        }
        t.manager.Update(task.GetTaskID(), UpdateSegmentInfo(segmentInfo))
        log.Info("sync import data done", WrapLogFields(task, zap.Any("segmentInfo", segmentInfo))...)
    }
    return nil
}

func (t *ImportTask) sync(task *ImportTask, hashedData HashedData) ([]*conc.Future[struct{}], []syncmgr.Task, error) {
    log.Info("start to sync import data", WrapLogFields(task)...)
    futures := make([]*conc.Future[struct{}], 0)
    syncTasks := make([]syncmgr.Task, 0)
    segmentImportedSizes := make(map[int64]int)
    for channelIdx, datas := range hashedData {
        channel := task.GetVchannels()[channelIdx]
        for partitionIdx, data := range datas {
            if data.GetRowNum() == 0 {
                continue
            }
            partitionID := task.GetPartitionIDs()[partitionIdx]
            size := data.GetMemorySize()
            segmentID := PickSegment(task, segmentImportedSizes, channel, partitionID, size)
            syncTask, err := NewSyncTask(task.ctx, task, segmentID, partitionID, channel, data)
            if err != nil {
                return nil, nil, err
            }
            segmentImportedSizes[segmentID] += size
            future := t.syncMgr.SyncData(task.ctx, syncTask)
            futures = append(futures, future)
            syncTasks = append(syncTasks, syncTask)
        }
    }
    return futures, syncTasks, nil
}
internal/datanode/importv2/task_preimport.go (new file, 212 lines):
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package importv2

import (
    "context"
    "fmt"
    "io"
    "time"

    "github.com/cockroachdb/errors"
    "github.com/golang/protobuf/proto"
    "github.com/samber/lo"
    "go.uber.org/zap"

    "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
    "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
    "github.com/milvus-io/milvus/internal/proto/datapb"
    "github.com/milvus-io/milvus/internal/proto/internalpb"
    "github.com/milvus-io/milvus/internal/storage"
    "github.com/milvus-io/milvus/internal/util/importutilv2"
    "github.com/milvus-io/milvus/pkg/log"
    "github.com/milvus-io/milvus/pkg/util/conc"
    "github.com/milvus-io/milvus/pkg/util/paramtable"
)

type PreImportTask struct {
    *datapb.PreImportTask
    ctx          context.Context
    cancel       context.CancelFunc
    partitionIDs []int64
    vchannels    []string
    schema       *schemapb.CollectionSchema
    options      []*commonpb.KeyValuePair

    manager TaskManager
    cm      storage.ChunkManager
}

func NewPreImportTask(req *datapb.PreImportRequest,
    manager TaskManager,
    cm storage.ChunkManager,
) Task {
    fileStats := lo.Map(req.GetImportFiles(), func(file *internalpb.ImportFile, _ int) *datapb.ImportFileStats {
        return &datapb.ImportFileStats{
            ImportFile: file,
        }
    })
    ctx, cancel := context.WithCancel(context.Background())
    // During binlog import, even if the primary key's autoID is set to true,
    // the primary key from the binlog should be used instead of being reassigned.
    if importutilv2.IsBackup(req.GetOptions()) {
        UnsetAutoID(req.GetSchema())
    }
    return &PreImportTask{
        PreImportTask: &datapb.PreImportTask{
            JobID:        req.GetJobID(),
            TaskID:       req.GetTaskID(),
            CollectionID: req.GetCollectionID(),
            State:        datapb.ImportTaskStateV2_Pending,
            FileStats:    fileStats,
        },
        ctx:          ctx,
        cancel:       cancel,
        partitionIDs: req.GetPartitionIDs(),
        vchannels:    req.GetVchannels(),
        schema:       req.GetSchema(),
        options:      req.GetOptions(),
        manager:      manager,
        cm:           cm,
    }
}

func (p *PreImportTask) GetPartitionIDs() []int64 {
    return p.partitionIDs
}

func (p *PreImportTask) GetVchannels() []string {
    return p.vchannels
}

func (p *PreImportTask) GetType() TaskType {
    return PreImportTaskType
}

func (p *PreImportTask) GetSchema() *schemapb.CollectionSchema {
    return p.schema
}

func (p *PreImportTask) Cancel() {
    p.cancel()
}

func (p *PreImportTask) Clone() Task {
    ctx, cancel := context.WithCancel(p.ctx)
    return &PreImportTask{
        PreImportTask: proto.Clone(p.PreImportTask).(*datapb.PreImportTask),
        ctx:           ctx,
        cancel:        cancel,
        partitionIDs:  p.GetPartitionIDs(),
        vchannels:     p.GetVchannels(),
        schema:        p.GetSchema(),
        options:       p.options,
    }
}

func (p *PreImportTask) Execute() []*conc.Future[any] {
    bufferSize := paramtable.Get().DataNodeCfg.ReadBufferSizeInMB.GetAsInt() * 1024 * 1024
    log.Info("start to preimport", WrapLogFields(p,
        zap.Int("bufferSize", bufferSize),
        zap.Any("schema", p.GetSchema()))...)
    p.manager.Update(p.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_InProgress))
    files := lo.Map(p.GetFileStats(),
        func(fileStat *datapb.ImportFileStats, _ int) *internalpb.ImportFile {
            return fileStat.GetImportFile()
        })

    fn := func(i int, file *internalpb.ImportFile) error {
        reader, err := importutilv2.NewReader(p.ctx, p.cm, p.GetSchema(), file, p.options, bufferSize)
        if err != nil {
            log.Warn("new reader failed", WrapLogFields(p, zap.String("file", file.String()), zap.Error(err))...)
            p.manager.Update(p.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Failed), UpdateReason(err.Error()))
            return err
        }
        defer reader.Close()
        start := time.Now()
        err = p.readFileStat(reader, p, i)
        if err != nil {
            log.Warn("preimport failed", WrapLogFields(p, zap.String("file", file.String()), zap.Error(err))...)
            p.manager.Update(p.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Failed), UpdateReason(err.Error()))
            return err
        }
        log.Info("read file stat done", WrapLogFields(p, zap.Strings("files", file.GetPaths()),
            zap.Duration("dur", time.Since(start)))...)
        return nil
    }

    futures := make([]*conc.Future[any], 0, len(files))
    for i, file := range files {
        i := i
        file := file
        f := GetExecPool().Submit(func() (any, error) {
            err := fn(i, file)
            return err, err
        })
        futures = append(futures, f)
    }
    return futures
}

func (p *PreImportTask) readFileStat(reader importutilv2.Reader, task Task, fileIdx int) error {
    fileSize, err := reader.Size()
    if err != nil {
        return err
    }
    maxSize := paramtable.Get().DataNodeCfg.MaxImportFileSizeInGB.GetAsFloat() * 1024 * 1024 * 1024
    if fileSize > int64(maxSize) {
        return errors.New(fmt.Sprintf(
            "The import file size has reached the maximum limit allowed for importing, "+
                "fileSize=%d, maxSize=%d", fileSize, int64(maxSize)))
    }

    totalRows := 0
    totalSize := 0
    hashedStats := make(map[string]*datapb.PartitionImportStats)
    for {
        data, err := reader.Read()
        if err != nil {
            if errors.Is(err, io.EOF) {
                break
            }
            return err
        }
        err = CheckRowsEqual(task.GetSchema(), data)
        if err != nil {
            return err
        }
        rowsCount, err := GetRowsStats(task, data)
        if err != nil {
            return err
        }
        MergeHashedStats(rowsCount, hashedStats)
        rows := data.GetRowNum()
        size := data.GetMemorySize()
        totalRows += rows
        totalSize += size
        log.Info("reading file stat...", WrapLogFields(task, zap.Int("readRows", rows), zap.Int("readSize", size))...)
    }

    stat := &datapb.ImportFileStats{
        FileSize:        fileSize,
        TotalRows:       int64(totalRows),
        TotalMemorySize: int64(totalSize),
        HashedStats:     hashedStats,
    }
    p.manager.Update(task.GetTaskID(), UpdateFileStat(fileIdx, stat))
    return nil
}
@@ -418,7 +418,7 @@ func (node *DataNode) PreImport(ctx context.Context, req *datapb.PreImportRequest
         return merr.Status(err), nil
     }

-    task := importv2.NewPreImportTask(req)
+    task := importv2.NewPreImportTask(req, node.importTaskMgr, node.chunkManager)
     node.importTaskMgr.Add(task)

     log.Info("datanode added preimport task")
@@ -437,7 +437,7 @@ func (node *DataNode) ImportV2(ctx context.Context, req *datapb.ImportRequest) (
     if err := merr.CheckHealthy(node.GetStateCode()); err != nil {
         return merr.Status(err), nil
     }
-    task := importv2.NewImportTask(req)
+    task := importv2.NewImportTask(req, node.importTaskMgr, node.syncMgr, node.chunkManager)
     node.importTaskMgr.Add(task)

    log.Info("datanode added import task")