diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index 820fd21e84..2ac347cb45 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -288,7 +288,7 @@ func (node *DataNode) Init() error { node.writeBufferManager = writebuffer.NewManager(syncMgr) node.importTaskMgr = importv2.NewTaskManager() - node.importScheduler = importv2.NewScheduler(node.importTaskMgr, node.syncMgr, node.chunkManager) + node.importScheduler = importv2.NewScheduler(node.importTaskMgr) node.channelCheckpointUpdater = newChannelCheckpointUpdater(node) node.flowgraphManager = newFlowgraphManager() diff --git a/internal/datanode/importv2/pool.go b/internal/datanode/importv2/pool.go new file mode 100644 index 0000000000..3558477773 --- /dev/null +++ b/internal/datanode/importv2/pool.go @@ -0,0 +1,41 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package importv2 + +import ( + "sync" + + "github.com/milvus-io/milvus/pkg/util/conc" + "github.com/milvus-io/milvus/pkg/util/paramtable" +) + +var ( + execPool *conc.Pool[any] + execPoolInitOnce sync.Once +) + +func initExecPool() { + execPool = conc.NewPool[any]( + paramtable.Get().DataNodeCfg.MaxConcurrentImportTaskNum.GetAsInt(), + conc.WithPreAlloc(true), + ) +} + +func GetExecPool() *conc.Pool[any] { + execPoolInitOnce.Do(initExecPool) + return execPool +} diff --git a/internal/datanode/importv2/scheduler.go b/internal/datanode/importv2/scheduler.go index 37884d87d8..d1d58e8df0 100644 --- a/internal/datanode/importv2/scheduler.go +++ b/internal/datanode/importv2/scheduler.go @@ -17,20 +17,13 @@ package importv2 import ( - "fmt" - "io" "sync" "time" - "github.com/cockroachdb/errors" "github.com/samber/lo" "go.uber.org/zap" - "github.com/milvus-io/milvus/internal/datanode/syncmgr" "github.com/milvus-io/milvus/internal/proto/datapb" - "github.com/milvus-io/milvus/internal/proto/internalpb" - "github.com/milvus-io/milvus/internal/storage" - "github.com/milvus-io/milvus/internal/util/importutilv2" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/conc" "github.com/milvus-io/milvus/pkg/util/paramtable" @@ -44,25 +37,14 @@ type Scheduler interface { type scheduler struct { manager TaskManager - syncMgr syncmgr.SyncManager - cm storage.ChunkManager - - pool *conc.Pool[any] closeOnce sync.Once closeChan chan struct{} } -func NewScheduler(manager TaskManager, syncMgr syncmgr.SyncManager, cm storage.ChunkManager) Scheduler { - pool := conc.NewPool[any]( - paramtable.Get().DataNodeCfg.MaxConcurrentImportTaskNum.GetAsInt(), - conc.WithPreAlloc(true), - ) +func NewScheduler(manager TaskManager) Scheduler { return &scheduler{ manager: manager, - syncMgr: syncMgr, - cm: cm, - pool: pool, closeChan: make(chan struct{}), } } @@ -84,16 +66,9 @@ func (s *scheduler) Start() 
{ tasks := s.manager.GetBy(WithStates(datapb.ImportTaskStateV2_Pending)) futures := make(map[int64][]*conc.Future[any]) for _, task := range tasks { - switch task.GetType() { - case PreImportTaskType: - fs := s.PreImport(task) - futures[task.GetTaskID()] = fs - tryFreeFutures(futures) - case ImportTaskType: - fs := s.Import(task) - futures[task.GetTaskID()] = fs - tryFreeFutures(futures) - } + fs := task.Execute() + futures[task.GetTaskID()] = fs + tryFreeFutures(futures) } for taskID, fs := range futures { err := conc.AwaitAll(fs...) @@ -120,17 +95,6 @@ func (s *scheduler) Close() { }) } -func WrapLogFields(task Task, fields ...zap.Field) []zap.Field { - res := []zap.Field{ - zap.Int64("taskID", task.GetTaskID()), - zap.Int64("jobID", task.GetJobID()), - zap.Int64("collectionID", task.GetCollectionID()), - zap.String("type", task.GetType().String()), - } - res = append(res, fields...) - return res -} - func tryFreeFutures(futures map[int64][]*conc.Future[any]) { for k, fs := range futures { fs = lo.Filter(fs, func(f *conc.Future[any], _ int) bool { @@ -143,207 +107,3 @@ func tryFreeFutures(futures map[int64][]*conc.Future[any]) { futures[k] = fs } } - -func (s *scheduler) handleErr(task Task, err error, msg string) { - log.Warn(msg, WrapLogFields(task, zap.Error(err))...) - s.manager.Update(task.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Failed), UpdateReason(err.Error())) -} - -func (s *scheduler) PreImport(task Task) []*conc.Future[any] { - bufferSize := paramtable.Get().DataNodeCfg.ReadBufferSizeInMB.GetAsInt() * 1024 * 1024 - log.Info("start to preimport", WrapLogFields(task, - zap.Int("bufferSize", bufferSize), - zap.Any("schema", task.GetSchema()))...) - s.manager.Update(task.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_InProgress)) - files := lo.Map(task.(*PreImportTask).GetFileStats(), - func(fileStat *datapb.ImportFileStats, _ int) *internalpb.ImportFile { - return fileStat.GetImportFile() - }) - - fn := func(i int, file *internalpb.ImportFile) error { - reader, err := importutilv2.NewReader(task.GetCtx(), s.cm, task.GetSchema(), file, task.GetOptions(), bufferSize) - if err != nil { - s.handleErr(task, err, "new reader failed") - return err - } - defer reader.Close() - start := time.Now() - err = s.readFileStat(reader, task, i) - if err != nil { - s.handleErr(task, err, "preimport failed") - return err - } - log.Info("read file stat done", WrapLogFields(task, zap.Strings("files", file.GetPaths()), - zap.Duration("dur", time.Since(start)))...) 
- return nil - } - - futures := make([]*conc.Future[any], 0, len(files)) - for i, file := range files { - i := i - file := file - f := s.pool.Submit(func() (any, error) { - err := fn(i, file) - return err, err - }) - futures = append(futures, f) - } - return futures -} - -func (s *scheduler) readFileStat(reader importutilv2.Reader, task Task, fileIdx int) error { - fileSize, err := reader.Size() - if err != nil { - return err - } - maxSize := paramtable.Get().DataNodeCfg.MaxImportFileSizeInGB.GetAsFloat() * 1024 * 1024 * 1024 - if fileSize > int64(maxSize) { - return errors.New(fmt.Sprintf( - "The import file size has reached the maximum limit allowed for importing, "+ - "fileSize=%d, maxSize=%d", fileSize, int64(maxSize))) - } - - totalRows := 0 - totalSize := 0 - hashedStats := make(map[string]*datapb.PartitionImportStats) - for { - data, err := reader.Read() - if err != nil { - if errors.Is(err, io.EOF) { - break - } - return err - } - err = CheckRowsEqual(task.GetSchema(), data) - if err != nil { - return err - } - rowsCount, err := GetRowsStats(task, data) - if err != nil { - return err - } - MergeHashedStats(rowsCount, hashedStats) - rows := data.GetRowNum() - size := data.GetMemorySize() - totalRows += rows - totalSize += size - log.Info("reading file stat...", WrapLogFields(task, zap.Int("readRows", rows), zap.Int("readSize", size))...) - } - - stat := &datapb.ImportFileStats{ - FileSize: fileSize, - TotalRows: int64(totalRows), - TotalMemorySize: int64(totalSize), - HashedStats: hashedStats, - } - s.manager.Update(task.GetTaskID(), UpdateFileStat(fileIdx, stat)) - return nil -} - -func (s *scheduler) Import(task Task) []*conc.Future[any] { - bufferSize := paramtable.Get().DataNodeCfg.ReadBufferSizeInMB.GetAsInt() * 1024 * 1024 - log.Info("start to import", WrapLogFields(task, - zap.Int("bufferSize", bufferSize), - zap.Any("schema", task.GetSchema()))...) - s.manager.Update(task.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_InProgress)) - - req := task.(*ImportTask).req - - fn := func(file *internalpb.ImportFile) error { - reader, err := importutilv2.NewReader(task.GetCtx(), s.cm, task.GetSchema(), file, task.GetOptions(), bufferSize) - if err != nil { - s.handleErr(task, err, fmt.Sprintf("new reader failed, file: %s", file.String())) - return err - } - defer reader.Close() - start := time.Now() - err = s.importFile(reader, task) - if err != nil { - s.handleErr(task, err, fmt.Sprintf("do import failed, file: %s", file.String())) - return err - } - log.Info("import file done", WrapLogFields(task, zap.Strings("files", file.GetPaths()), - zap.Duration("dur", time.Since(start)))...) - return nil - } - - futures := make([]*conc.Future[any], 0, len(req.GetFiles())) - for _, file := range req.GetFiles() { - file := file - f := s.pool.Submit(func() (any, error) { - err := fn(file) - return err, err - }) - futures = append(futures, f) - } - return futures -} - -func (s *scheduler) importFile(reader importutilv2.Reader, task Task) error { - iTask := task.(*ImportTask) - syncFutures := make([]*conc.Future[struct{}], 0) - syncTasks := make([]syncmgr.Task, 0) - for { - data, err := reader.Read() - if err != nil { - if errors.Is(err, io.EOF) { - break - } - return err - } - err = AppendSystemFieldsData(iTask, data) - if err != nil { - return err - } - hashedData, err := HashData(iTask, data) - if err != nil { - return err - } - fs, sts, err := s.Sync(iTask, hashedData) - if err != nil { - return err - } - syncFutures = append(syncFutures, fs...) - syncTasks = append(syncTasks, sts...) 
- } - err := conc.AwaitAll(syncFutures...) - if err != nil { - return err - } - for _, syncTask := range syncTasks { - segmentInfo, err := NewImportSegmentInfo(syncTask, iTask) - if err != nil { - return err - } - s.manager.Update(task.GetTaskID(), UpdateSegmentInfo(segmentInfo)) - log.Info("sync import data done", WrapLogFields(task, zap.Any("segmentInfo", segmentInfo))...) - } - return nil -} - -func (s *scheduler) Sync(task *ImportTask, hashedData HashedData) ([]*conc.Future[struct{}], []syncmgr.Task, error) { - log.Info("start to sync import data", WrapLogFields(task)...) - futures := make([]*conc.Future[struct{}], 0) - syncTasks := make([]syncmgr.Task, 0) - segmentImportedSizes := make(map[int64]int) - for channelIdx, datas := range hashedData { - channel := task.GetVchannels()[channelIdx] - for partitionIdx, data := range datas { - if data.GetRowNum() == 0 { - continue - } - partitionID := task.GetPartitionIDs()[partitionIdx] - size := data.GetMemorySize() - segmentID := PickSegment(task, segmentImportedSizes, channel, partitionID, size) - syncTask, err := NewSyncTask(task.GetCtx(), task, segmentID, partitionID, channel, data) - if err != nil { - return nil, nil, err - } - segmentImportedSizes[segmentID] += size - future := s.syncMgr.SyncData(task.GetCtx(), syncTask) - futures = append(futures, future) - syncTasks = append(syncTasks, syncTask) - } - } - return futures, syncTasks, nil -} diff --git a/internal/datanode/importv2/scheduler_test.go b/internal/datanode/importv2/scheduler_test.go index bdb876f750..a49e232992 100644 --- a/internal/datanode/importv2/scheduler_test.go +++ b/internal/datanode/importv2/scheduler_test.go @@ -116,7 +116,7 @@ func (s *SchedulerSuite) SetupTest() { s.manager = NewTaskManager() s.syncMgr = syncmgr.NewMockSyncManager(s.T()) - s.scheduler = NewScheduler(s.manager, s.syncMgr, nil).(*scheduler) + s.scheduler = NewScheduler(s.manager).(*scheduler) } func createInsertData(t *testing.T, schema *schemapb.CollectionSchema, rowCount int) *storage.InsertData { @@ -236,7 +236,7 @@ func (s *SchedulerSuite) TestScheduler_Slots() { Schema: s.schema, ImportFiles: []*internalpb.ImportFile{{Paths: []string{"dummy.json"}}}, } - preimportTask := NewPreImportTask(preimportReq) + preimportTask := NewPreImportTask(preimportReq, s.manager, s.cm) s.manager.Add(preimportTask) slots := s.scheduler.Slots() @@ -262,7 +262,7 @@ func (s *SchedulerSuite) TestScheduler_Start_Preimport() { ioReader := strings.NewReader(string(bytes)) cm.EXPECT().Size(mock.Anything, mock.Anything).Return(1024, nil) cm.EXPECT().Reader(mock.Anything, mock.Anything).Return(&mockReader{Reader: ioReader}, nil) - s.scheduler.cm = cm + s.cm = cm preimportReq := &datapb.PreImportRequest{ JobID: 1, @@ -273,7 +273,7 @@ func (s *SchedulerSuite) TestScheduler_Start_Preimport() { Schema: s.schema, ImportFiles: []*internalpb.ImportFile{{Paths: []string{"dummy.json"}}}, } - preimportTask := NewPreImportTask(preimportReq) + preimportTask := NewPreImportTask(preimportReq, s.manager, s.cm) s.manager.Add(preimportTask) go s.scheduler.Start() @@ -316,7 +316,7 @@ func (s *SchedulerSuite) TestScheduler_Start_Preimport_Failed() { ioReader := strings.NewReader(string(bytes)) cm.EXPECT().Size(mock.Anything, mock.Anything).Return(1024, nil) cm.EXPECT().Reader(mock.Anything, mock.Anything).Return(&mockReader{Reader: ioReader}, nil) - s.scheduler.cm = cm + s.cm = cm preimportReq := &datapb.PreImportRequest{ JobID: 1, @@ -327,7 +327,7 @@ func (s *SchedulerSuite) TestScheduler_Start_Preimport_Failed() { Schema: s.schema, 
ImportFiles: []*internalpb.ImportFile{{Paths: []string{"dummy.json"}}}, } - preimportTask := NewPreImportTask(preimportReq) + preimportTask := NewPreImportTask(preimportReq, s.manager, s.cm) s.manager.Add(preimportTask) go s.scheduler.Start() @@ -355,7 +355,7 @@ func (s *SchedulerSuite) TestScheduler_Start_Import() { cm := mocks.NewChunkManager(s.T()) ioReader := strings.NewReader(string(bytes)) cm.EXPECT().Reader(mock.Anything, mock.Anything).Return(&mockReader{Reader: ioReader}, nil) - s.scheduler.cm = cm + s.cm = cm s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task) *conc.Future[struct{}] { future := conc.Go(func() (struct{}, error) { @@ -388,7 +388,7 @@ func (s *SchedulerSuite) TestScheduler_Start_Import() { }, }, } - importTask := NewImportTask(importReq) + importTask := NewImportTask(importReq, s.manager, s.syncMgr, s.cm) s.manager.Add(importTask) go s.scheduler.Start() @@ -416,7 +416,7 @@ func (s *SchedulerSuite) TestScheduler_Start_Import_Failed() { cm := mocks.NewChunkManager(s.T()) ioReader := strings.NewReader(string(bytes)) cm.EXPECT().Reader(mock.Anything, mock.Anything).Return(&mockReader{Reader: ioReader}, nil) - s.scheduler.cm = cm + s.cm = cm s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task) *conc.Future[struct{}] { future := conc.Go(func() (struct{}, error) { @@ -449,7 +449,7 @@ func (s *SchedulerSuite) TestScheduler_Start_Import_Failed() { }, }, } - importTask := NewImportTask(importReq) + importTask := NewImportTask(importReq, s.manager, s.syncMgr, s.cm) s.manager.Add(importTask) go s.scheduler.Start() @@ -487,9 +487,9 @@ func (s *SchedulerSuite) TestScheduler_ReadFileStat() { Schema: s.schema, ImportFiles: []*internalpb.ImportFile{importFile}, } - preimportTask := NewPreImportTask(preimportReq) + preimportTask := NewPreImportTask(preimportReq, s.manager, s.cm) s.manager.Add(preimportTask) - err := s.scheduler.readFileStat(s.reader, preimportTask, 0) + err := preimportTask.(*PreImportTask).readFileStat(s.reader, preimportTask, 0) s.NoError(err) } @@ -538,9 +538,9 @@ func (s *SchedulerSuite) TestScheduler_ImportFile() { }, }, } - importTask := NewImportTask(importReq) + importTask := NewImportTask(importReq, s.manager, s.syncMgr, s.cm) s.manager.Add(importTask) - err := s.scheduler.importFile(s.reader, importTask) + err := importTask.(*ImportTask).importFile(s.reader, importTask) s.NoError(err) } diff --git a/internal/datanode/importv2/task.go b/internal/datanode/importv2/task.go index a13f421f55..d349bf833b 100644 --- a/internal/datanode/importv2/task.go +++ b/internal/datanode/importv2/task.go @@ -17,18 +17,12 @@ package importv2 import ( - "context" - - "github.com/golang/protobuf/proto" "github.com/samber/lo" + "go.uber.org/zap" - "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" - "github.com/milvus-io/milvus/internal/datanode/metacache" "github.com/milvus-io/milvus/internal/proto/datapb" - "github.com/milvus-io/milvus/internal/proto/internalpb" - "github.com/milvus-io/milvus/internal/util/importutilv2" - "github.com/milvus-io/milvus/pkg/util/typeutil" + "github.com/milvus-io/milvus/pkg/util/conc" ) type TaskType int @@ -130,6 +124,7 @@ func UpdateSegmentInfo(info *datapb.ImportSegmentInfo) UpdateAction { } type Task interface { + Execute() []*conc.Future[any] GetJobID() int64 GetTaskID() int64 GetCollectionID() int64 @@ -139,183 +134,17 @@ type Task interface { GetState() 
datapb.ImportTaskStateV2 GetReason() string GetSchema() *schemapb.CollectionSchema - GetCtx() context.Context - GetOptions() []*commonpb.KeyValuePair Cancel() Clone() Task } -type PreImportTask struct { - *datapb.PreImportTask - ctx context.Context - cancel context.CancelFunc - partitionIDs []int64 - vchannels []string - schema *schemapb.CollectionSchema - options []*commonpb.KeyValuePair -} - -func NewPreImportTask(req *datapb.PreImportRequest) Task { - fileStats := lo.Map(req.GetImportFiles(), func(file *internalpb.ImportFile, _ int) *datapb.ImportFileStats { - return &datapb.ImportFileStats{ - ImportFile: file, - } - }) - ctx, cancel := context.WithCancel(context.Background()) - // During binlog import, even if the primary key's autoID is set to true, - // the primary key from the binlog should be used instead of being reassigned. - if importutilv2.IsBackup(req.GetOptions()) { - UnsetAutoID(req.GetSchema()) - } - return &PreImportTask{ - PreImportTask: &datapb.PreImportTask{ - JobID: req.GetJobID(), - TaskID: req.GetTaskID(), - CollectionID: req.GetCollectionID(), - State: datapb.ImportTaskStateV2_Pending, - FileStats: fileStats, - }, - ctx: ctx, - cancel: cancel, - partitionIDs: req.GetPartitionIDs(), - vchannels: req.GetVchannels(), - schema: req.GetSchema(), - options: req.GetOptions(), - } -} - -func (p *PreImportTask) GetPartitionIDs() []int64 { - return p.partitionIDs -} - -func (p *PreImportTask) GetVchannels() []string { - return p.vchannels -} - -func (p *PreImportTask) GetType() TaskType { - return PreImportTaskType -} - -func (p *PreImportTask) GetSchema() *schemapb.CollectionSchema { - return p.schema -} - -func (p *PreImportTask) GetOptions() []*commonpb.KeyValuePair { - return p.options -} - -func (p *PreImportTask) GetCtx() context.Context { - return p.ctx -} - -func (p *PreImportTask) Cancel() { - p.cancel() -} - -func (p *PreImportTask) Clone() Task { - ctx, cancel := context.WithCancel(p.GetCtx()) - return &PreImportTask{ - PreImportTask: proto.Clone(p.PreImportTask).(*datapb.PreImportTask), - ctx: ctx, - cancel: cancel, - partitionIDs: p.GetPartitionIDs(), - vchannels: p.GetVchannels(), - schema: p.GetSchema(), - options: p.GetOptions(), - } -} - -type ImportTask struct { - *datapb.ImportTaskV2 - ctx context.Context - cancel context.CancelFunc - segmentsInfo map[int64]*datapb.ImportSegmentInfo - req *datapb.ImportRequest - metaCaches map[string]metacache.MetaCache -} - -func NewImportTask(req *datapb.ImportRequest) Task { - ctx, cancel := context.WithCancel(context.Background()) - // During binlog import, even if the primary key's autoID is set to true, - // the primary key from the binlog should be used instead of being reassigned. 
- if importutilv2.IsBackup(req.GetOptions()) { - UnsetAutoID(req.GetSchema()) - } - task := &ImportTask{ - ImportTaskV2: &datapb.ImportTaskV2{ - JobID: req.GetJobID(), - TaskID: req.GetTaskID(), - CollectionID: req.GetCollectionID(), - State: datapb.ImportTaskStateV2_Pending, - }, - ctx: ctx, - cancel: cancel, - segmentsInfo: make(map[int64]*datapb.ImportSegmentInfo), - req: req, - } - task.initMetaCaches(req) - return task -} - -func (t *ImportTask) initMetaCaches(req *datapb.ImportRequest) { - metaCaches := make(map[string]metacache.MetaCache) - schema := typeutil.AppendSystemFields(req.GetSchema()) - for _, channel := range req.GetVchannels() { - info := &datapb.ChannelWatchInfo{ - Vchan: &datapb.VchannelInfo{ - CollectionID: req.GetCollectionID(), - ChannelName: channel, - }, - Schema: schema, - } - metaCache := metacache.NewMetaCache(info, func(segment *datapb.SegmentInfo) *metacache.BloomFilterSet { - return metacache.NewBloomFilterSet() - }) - metaCaches[channel] = metaCache - } - t.metaCaches = metaCaches -} - -func (t *ImportTask) GetType() TaskType { - return ImportTaskType -} - -func (t *ImportTask) GetPartitionIDs() []int64 { - return t.req.GetPartitionIDs() -} - -func (t *ImportTask) GetVchannels() []string { - return t.req.GetVchannels() -} - -func (t *ImportTask) GetSchema() *schemapb.CollectionSchema { - return t.req.GetSchema() -} - -func (t *ImportTask) GetOptions() []*commonpb.KeyValuePair { - return t.req.GetOptions() -} - -func (t *ImportTask) GetCtx() context.Context { - return t.ctx -} - -func (t *ImportTask) Cancel() { - t.cancel() -} - -func (t *ImportTask) GetSegmentsInfo() []*datapb.ImportSegmentInfo { - return lo.Values(t.segmentsInfo) -} - -func (t *ImportTask) Clone() Task { - ctx, cancel := context.WithCancel(t.GetCtx()) - return &ImportTask{ - ImportTaskV2: proto.Clone(t.ImportTaskV2).(*datapb.ImportTaskV2), - ctx: ctx, - cancel: cancel, - segmentsInfo: t.segmentsInfo, - req: t.req, - metaCaches: t.metaCaches, +func WrapLogFields(task Task, fields ...zap.Field) []zap.Field { + res := []zap.Field{ + zap.Int64("taskID", task.GetTaskID()), + zap.Int64("jobID", task.GetJobID()), + zap.Int64("collectionID", task.GetCollectionID()), + zap.String("type", task.GetType().String()), } + res = append(res, fields...) + return res } diff --git a/internal/datanode/importv2/task_import.go b/internal/datanode/importv2/task_import.go new file mode 100644 index 0000000000..0b99348843 --- /dev/null +++ b/internal/datanode/importv2/task_import.go @@ -0,0 +1,248 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package importv2 + +import ( + "context" + "io" + "time" + + "github.com/cockroachdb/errors" + "github.com/golang/protobuf/proto" + "github.com/samber/lo" + "go.uber.org/zap" + + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/datanode/metacache" + "github.com/milvus-io/milvus/internal/datanode/syncmgr" + "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/proto/internalpb" + "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/internal/util/importutilv2" + "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/conc" + "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/typeutil" +) + +type ImportTask struct { + *datapb.ImportTaskV2 + ctx context.Context + cancel context.CancelFunc + segmentsInfo map[int64]*datapb.ImportSegmentInfo + req *datapb.ImportRequest + + manager TaskManager + syncMgr syncmgr.SyncManager + cm storage.ChunkManager + metaCaches map[string]metacache.MetaCache +} + +func NewImportTask(req *datapb.ImportRequest, + manager TaskManager, + syncMgr syncmgr.SyncManager, + cm storage.ChunkManager, +) Task { + ctx, cancel := context.WithCancel(context.Background()) + // During binlog import, even if the primary key's autoID is set to true, + // the primary key from the binlog should be used instead of being reassigned. + if importutilv2.IsBackup(req.GetOptions()) { + UnsetAutoID(req.GetSchema()) + } + task := &ImportTask{ + ImportTaskV2: &datapb.ImportTaskV2{ + JobID: req.GetJobID(), + TaskID: req.GetTaskID(), + CollectionID: req.GetCollectionID(), + State: datapb.ImportTaskStateV2_Pending, + }, + ctx: ctx, + cancel: cancel, + segmentsInfo: make(map[int64]*datapb.ImportSegmentInfo), + req: req, + manager: manager, + syncMgr: syncMgr, + cm: cm, + } + task.initMetaCaches(req) + return task +} + +func (t *ImportTask) initMetaCaches(req *datapb.ImportRequest) { + metaCaches := make(map[string]metacache.MetaCache) + schema := typeutil.AppendSystemFields(req.GetSchema()) + for _, channel := range req.GetVchannels() { + info := &datapb.ChannelWatchInfo{ + Vchan: &datapb.VchannelInfo{ + CollectionID: req.GetCollectionID(), + ChannelName: channel, + }, + Schema: schema, + } + metaCache := metacache.NewMetaCache(info, func(segment *datapb.SegmentInfo) *metacache.BloomFilterSet { + return metacache.NewBloomFilterSet() + }) + metaCaches[channel] = metaCache + } + t.metaCaches = metaCaches +} + +func (t *ImportTask) GetType() TaskType { + return ImportTaskType +} + +func (t *ImportTask) GetPartitionIDs() []int64 { + return t.req.GetPartitionIDs() +} + +func (t *ImportTask) GetVchannels() []string { + return t.req.GetVchannels() +} + +func (t *ImportTask) GetSchema() *schemapb.CollectionSchema { + return t.req.GetSchema() +} + +func (t *ImportTask) Cancel() { + t.cancel() +} + +func (t *ImportTask) GetSegmentsInfo() []*datapb.ImportSegmentInfo { + return lo.Values(t.segmentsInfo) +} + +func (t *ImportTask) Clone() Task { + ctx, cancel := context.WithCancel(t.ctx) + return &ImportTask{ + ImportTaskV2: proto.Clone(t.ImportTaskV2).(*datapb.ImportTaskV2), + ctx: ctx, + cancel: cancel, + segmentsInfo: t.segmentsInfo, + req: t.req, + metaCaches: t.metaCaches, + } +} + +func (t *ImportTask) Execute() []*conc.Future[any] { + bufferSize := paramtable.Get().DataNodeCfg.ReadBufferSizeInMB.GetAsInt() * 1024 * 1024 + log.Info("start to import", WrapLogFields(t, + zap.Int("bufferSize", bufferSize), + zap.Any("schema", 
t.GetSchema()))...) + t.manager.Update(t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_InProgress)) + + req := t.req + + fn := func(file *internalpb.ImportFile) error { + reader, err := importutilv2.NewReader(t.ctx, t.cm, t.GetSchema(), file, t.req.GetOptions(), bufferSize) + if err != nil { + log.Warn("new reader failed", WrapLogFields(t, zap.String("file", file.String()), zap.Error(err))...) + t.manager.Update(t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Failed), UpdateReason(err.Error())) + return err + } + defer reader.Close() + start := time.Now() + err = t.importFile(reader, t) + if err != nil { + log.Warn("do import failed", WrapLogFields(t, zap.String("file", file.String()), zap.Error(err))...) + t.manager.Update(t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Failed), UpdateReason(err.Error())) + return err + } + log.Info("import file done", WrapLogFields(t, zap.Strings("files", file.GetPaths()), + zap.Duration("dur", time.Since(start)))...) + return nil + } + + futures := make([]*conc.Future[any], 0, len(req.GetFiles())) + for _, file := range req.GetFiles() { + file := file + f := GetExecPool().Submit(func() (any, error) { + err := fn(file) + return err, err + }) + futures = append(futures, f) + } + return futures +} + +func (t *ImportTask) importFile(reader importutilv2.Reader, task Task) error { + iTask := task.(*ImportTask) + syncFutures := make([]*conc.Future[struct{}], 0) + syncTasks := make([]syncmgr.Task, 0) + for { + data, err := reader.Read() + if err != nil { + if errors.Is(err, io.EOF) { + break + } + return err + } + err = AppendSystemFieldsData(iTask, data) + if err != nil { + return err + } + hashedData, err := HashData(iTask, data) + if err != nil { + return err + } + fs, sts, err := t.sync(iTask, hashedData) + if err != nil { + return err + } + syncFutures = append(syncFutures, fs...) + syncTasks = append(syncTasks, sts...) + } + err := conc.AwaitAll(syncFutures...) + if err != nil { + return err + } + for _, syncTask := range syncTasks { + segmentInfo, err := NewImportSegmentInfo(syncTask, iTask) + if err != nil { + return err + } + t.manager.Update(task.GetTaskID(), UpdateSegmentInfo(segmentInfo)) + log.Info("sync import data done", WrapLogFields(task, zap.Any("segmentInfo", segmentInfo))...) + } + return nil +} + +func (t *ImportTask) sync(task *ImportTask, hashedData HashedData) ([]*conc.Future[struct{}], []syncmgr.Task, error) { + log.Info("start to sync import data", WrapLogFields(task)...) 
+ futures := make([]*conc.Future[struct{}], 0) + syncTasks := make([]syncmgr.Task, 0) + segmentImportedSizes := make(map[int64]int) + for channelIdx, datas := range hashedData { + channel := task.GetVchannels()[channelIdx] + for partitionIdx, data := range datas { + if data.GetRowNum() == 0 { + continue + } + partitionID := task.GetPartitionIDs()[partitionIdx] + size := data.GetMemorySize() + segmentID := PickSegment(task, segmentImportedSizes, channel, partitionID, size) + syncTask, err := NewSyncTask(task.ctx, task, segmentID, partitionID, channel, data) + if err != nil { + return nil, nil, err + } + segmentImportedSizes[segmentID] += size + future := t.syncMgr.SyncData(task.ctx, syncTask) + futures = append(futures, future) + syncTasks = append(syncTasks, syncTask) + } + } + return futures, syncTasks, nil +} diff --git a/internal/datanode/importv2/task_preimport.go b/internal/datanode/importv2/task_preimport.go new file mode 100644 index 0000000000..4d2ce93de7 --- /dev/null +++ b/internal/datanode/importv2/task_preimport.go @@ -0,0 +1,212 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package importv2 + +import ( + "context" + "fmt" + "io" + "time" + + "github.com/cockroachdb/errors" + "github.com/golang/protobuf/proto" + "github.com/samber/lo" + "go.uber.org/zap" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/proto/internalpb" + "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/internal/util/importutilv2" + "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/conc" + "github.com/milvus-io/milvus/pkg/util/paramtable" +) + +type PreImportTask struct { + *datapb.PreImportTask + ctx context.Context + cancel context.CancelFunc + partitionIDs []int64 + vchannels []string + schema *schemapb.CollectionSchema + options []*commonpb.KeyValuePair + + manager TaskManager + cm storage.ChunkManager +} + +func NewPreImportTask(req *datapb.PreImportRequest, + manager TaskManager, + cm storage.ChunkManager, +) Task { + fileStats := lo.Map(req.GetImportFiles(), func(file *internalpb.ImportFile, _ int) *datapb.ImportFileStats { + return &datapb.ImportFileStats{ + ImportFile: file, + } + }) + ctx, cancel := context.WithCancel(context.Background()) + // During binlog import, even if the primary key's autoID is set to true, + // the primary key from the binlog should be used instead of being reassigned. 
+ if importutilv2.IsBackup(req.GetOptions()) { + UnsetAutoID(req.GetSchema()) + } + return &PreImportTask{ + PreImportTask: &datapb.PreImportTask{ + JobID: req.GetJobID(), + TaskID: req.GetTaskID(), + CollectionID: req.GetCollectionID(), + State: datapb.ImportTaskStateV2_Pending, + FileStats: fileStats, + }, + ctx: ctx, + cancel: cancel, + partitionIDs: req.GetPartitionIDs(), + vchannels: req.GetVchannels(), + schema: req.GetSchema(), + options: req.GetOptions(), + manager: manager, + cm: cm, + } +} + +func (p *PreImportTask) GetPartitionIDs() []int64 { + return p.partitionIDs +} + +func (p *PreImportTask) GetVchannels() []string { + return p.vchannels +} + +func (p *PreImportTask) GetType() TaskType { + return PreImportTaskType +} + +func (p *PreImportTask) GetSchema() *schemapb.CollectionSchema { + return p.schema +} + +func (p *PreImportTask) Cancel() { + p.cancel() +} + +func (p *PreImportTask) Clone() Task { + ctx, cancel := context.WithCancel(p.ctx) + return &PreImportTask{ + PreImportTask: proto.Clone(p.PreImportTask).(*datapb.PreImportTask), + ctx: ctx, + cancel: cancel, + partitionIDs: p.GetPartitionIDs(), + vchannels: p.GetVchannels(), + schema: p.GetSchema(), + options: p.options, + } +} + +func (p *PreImportTask) Execute() []*conc.Future[any] { + bufferSize := paramtable.Get().DataNodeCfg.ReadBufferSizeInMB.GetAsInt() * 1024 * 1024 + log.Info("start to preimport", WrapLogFields(p, + zap.Int("bufferSize", bufferSize), + zap.Any("schema", p.GetSchema()))...) + p.manager.Update(p.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_InProgress)) + files := lo.Map(p.GetFileStats(), + func(fileStat *datapb.ImportFileStats, _ int) *internalpb.ImportFile { + return fileStat.GetImportFile() + }) + + fn := func(i int, file *internalpb.ImportFile) error { + reader, err := importutilv2.NewReader(p.ctx, p.cm, p.GetSchema(), file, p.options, bufferSize) + if err != nil { + log.Warn("new reader failed", WrapLogFields(p, zap.String("file", file.String()), zap.Error(err))...) + p.manager.Update(p.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Failed), UpdateReason(err.Error())) + return err + } + defer reader.Close() + start := time.Now() + err = p.readFileStat(reader, p, i) + if err != nil { + log.Warn("preimport failed", WrapLogFields(p, zap.String("file", file.String()), zap.Error(err))...) + p.manager.Update(p.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Failed), UpdateReason(err.Error())) + return err + } + log.Info("read file stat done", WrapLogFields(p, zap.Strings("files", file.GetPaths()), + zap.Duration("dur", time.Since(start)))...) 
+ return nil + } + + futures := make([]*conc.Future[any], 0, len(files)) + for i, file := range files { + i := i + file := file + f := GetExecPool().Submit(func() (any, error) { + err := fn(i, file) + return err, err + }) + futures = append(futures, f) + } + return futures +} + +func (p *PreImportTask) readFileStat(reader importutilv2.Reader, task Task, fileIdx int) error { + fileSize, err := reader.Size() + if err != nil { + return err + } + maxSize := paramtable.Get().DataNodeCfg.MaxImportFileSizeInGB.GetAsFloat() * 1024 * 1024 * 1024 + if fileSize > int64(maxSize) { + return errors.New(fmt.Sprintf( + "The import file size has reached the maximum limit allowed for importing, "+ + "fileSize=%d, maxSize=%d", fileSize, int64(maxSize))) + } + + totalRows := 0 + totalSize := 0 + hashedStats := make(map[string]*datapb.PartitionImportStats) + for { + data, err := reader.Read() + if err != nil { + if errors.Is(err, io.EOF) { + break + } + return err + } + err = CheckRowsEqual(task.GetSchema(), data) + if err != nil { + return err + } + rowsCount, err := GetRowsStats(task, data) + if err != nil { + return err + } + MergeHashedStats(rowsCount, hashedStats) + rows := data.GetRowNum() + size := data.GetMemorySize() + totalRows += rows + totalSize += size + log.Info("reading file stat...", WrapLogFields(task, zap.Int("readRows", rows), zap.Int("readSize", size))...) + } + + stat := &datapb.ImportFileStats{ + FileSize: fileSize, + TotalRows: int64(totalRows), + TotalMemorySize: int64(totalSize), + HashedStats: hashedStats, + } + p.manager.Update(task.GetTaskID(), UpdateFileStat(fileIdx, stat)) + return nil +} diff --git a/internal/datanode/services.go b/internal/datanode/services.go index 1b66f157f9..8ade598bf1 100644 --- a/internal/datanode/services.go +++ b/internal/datanode/services.go @@ -418,7 +418,7 @@ func (node *DataNode) PreImport(ctx context.Context, req *datapb.PreImportReques return merr.Status(err), nil } - task := importv2.NewPreImportTask(req) + task := importv2.NewPreImportTask(req, node.importTaskMgr, node.chunkManager) node.importTaskMgr.Add(task) log.Info("datanode added preimport task") @@ -437,7 +437,7 @@ func (node *DataNode) ImportV2(ctx context.Context, req *datapb.ImportRequest) ( if err := merr.CheckHealthy(node.GetStateCode()); err != nil { return merr.Status(err), nil } - task := importv2.NewImportTask(req) + task := importv2.NewImportTask(req, node.importTaskMgr, node.syncMgr, node.chunkManager) node.importTaskMgr.Add(task) log.Info("datanode added import task")
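
For readers following the refactor, the sketch below is an illustrative, self-contained summary of the control flow this patch establishes — it uses plain Go stand-ins (hypothetical `future`, `submit`, and task types, not the Milvus `conc.Pool`, `conc.Future`, or `TaskManager` APIs): tasks now carry their own dependencies and schedule their work on a shared, lazily-initialized pool, while the scheduler merely polls pending tasks, calls `Execute()`, and awaits the returned futures. This is why `NewScheduler` now takes only the task manager and the type switch over `PreImportTaskType`/`ImportTaskType` disappears from `scheduler.Start()`.

```go
// Illustrative sketch only (not part of the patch): plain-Go stand-ins showing
// the post-refactor flow — a shared, lazily-initialized execution pool plus
// tasks that execute themselves and hand futures back to the scheduler.
package main

import (
	"fmt"
	"sync"
	"time"
)

// future is a minimal stand-in for conc.Future[any].
type future struct{ done chan error }

func (f *future) Await() error { return <-f.done }

// Shared pool, created once on first use (mirrors pool.go's sync.Once pattern).
var (
	execPool     chan struct{}
	execPoolOnce sync.Once
)

func getExecPool() chan struct{} {
	execPoolOnce.Do(func() {
		execPool = make(chan struct{}, 4) // stand-in for MaxConcurrentImportTaskNum
	})
	return execPool
}

func submit(fn func() error) *future {
	f := &future{done: make(chan error, 1)}
	go func() {
		getExecPool() <- struct{}{}        // acquire a worker slot
		defer func() { <-getExecPool() }() // release it when done
		f.done <- fn()
	}()
	return f
}

// task mirrors the new Task interface: each task executes itself.
type task interface {
	ID() int64
	Execute() []*future
}

type preImportTask struct {
	id    int64
	files []string
}

func (t *preImportTask) ID() int64 { return t.id }

func (t *preImportTask) Execute() []*future {
	futures := make([]*future, 0, len(t.files))
	for _, file := range t.files {
		file := file
		futures = append(futures, submit(func() error {
			time.Sleep(10 * time.Millisecond) // pretend to read file stats
			fmt.Println("task", t.id, "read stats of", file)
			return nil
		}))
	}
	return futures
}

// The scheduler no longer needs a ChunkManager or SyncManager: it just polls
// pending tasks, triggers Execute, and awaits the futures.
func main() {
	pending := []task{&preImportTask{id: 1, files: []string{"a.json", "b.json"}}}
	for _, t := range pending {
		for _, f := range t.Execute() {
			if err := f.Await(); err != nil {
				fmt.Println("task", t.ID(), "failed:", err)
			}
		}
	}
}
```

The design choice reflected in the patch is dependency inversion at the task level: `NewPreImportTask` and `NewImportTask` receive the task manager, sync manager, and chunk manager directly, so the scheduler shrinks to pure scheduling and the execution pool can be shared process-wide via `GetExecPool()`.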