enhance: Remove adding import segments to the datanode (#31244)
With the presence of L0 segments, there's no longer a need to add import segments to the datanode.

issue: https://github.com/milvus-io/milvus/issues/28521

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
parent afa4193fc2
commit 7d7ef388df
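To make the effect of the change concrete, here is a minimal, self-contained sketch (not Milvus code; the checker type, the onImportingSegmentsDone helper, and the channel buffer size are assumptions for illustration) of the per-segment work that remains on the coordinator once AddImportSegment is removed: finished import segments are only pushed onto buildIndexCh to accelerate index building, and no RPC to the datanode is issued. The buildIndexCh name and the "accelerate index building" comment come from the unchanged context lines in the diff below.

// Hypothetical sketch, not Milvus code: illustrates the pattern kept after this change.
package main

import "fmt"

type checker struct {
	buildIndexCh chan int64 // segment IDs whose index build should start early
}

// onImportingSegmentsDone mirrors the simplified loop from the diff's context lines:
// no AddImportSegment call to the datanode, only a push to the index-building channel.
func (c *checker) onImportingSegmentsDone(unfinished []int64) {
	for _, segmentID := range unfinished {
		c.buildIndexCh <- segmentID // accelerate index building
	}
}

func main() {
	c := &checker{buildIndexCh: make(chan int64, 8)}
	c.onImportingSegmentsDone([]int64{100, 101, 102})
	close(c.buildIndexCh)
	for id := range c.buildIndexCh {
		fmt.Println("build index for segment", id)
	}
}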
@ -268,12 +268,6 @@ func (c *importChecker) checkImportingJob(job ImportJob) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
for _, segmentID := range unfinished {
|
for _, segmentID := range unfinished {
|
||||||
err = AddImportSegment(c.cluster, c.meta, segmentID)
|
|
||||||
if err != nil {
|
|
||||||
log.Warn("add import segment failed", zap.Int64("jobID", job.GetJobID()),
|
|
||||||
zap.Int64("collectionID", job.GetCollectionID()), zap.Error(err))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
c.buildIndexCh <- segmentID // accelerate index building
|
c.buildIndexCh <- segmentID // accelerate index building
|
||||||
channelCP := c.meta.GetChannelCheckpoint(channels[segmentID])
|
channelCP := c.meta.GetChannelCheckpoint(channels[segmentID])
|
||||||
if channelCP == nil {
|
if channelCP == nil {
|
||||||
|
@ -180,8 +180,6 @@ func (s *ImportCheckerSuite) TestCheckJob() {
|
|||||||
}
|
}
|
||||||
sm := s.checker.sm.(*MockManager)
|
sm := s.checker.sm.(*MockManager)
|
||||||
sm.EXPECT().FlushImportSegments(mock.Anything, mock.Anything, mock.Anything).Return(nil)
|
sm.EXPECT().FlushImportSegments(mock.Anything, mock.Anything, mock.Anything).Return(nil)
|
||||||
cluster := s.checker.cluster.(*MockCluster)
|
|
||||||
cluster.EXPECT().AddImportSegment(mock.Anything, mock.Anything).Return(nil, nil)
|
|
||||||
catalog.EXPECT().AddSegment(mock.Anything, mock.Anything).Return(nil)
|
catalog.EXPECT().AddSegment(mock.Anything, mock.Anything).Return(nil)
|
||||||
catalog.EXPECT().AlterSegments(mock.Anything, mock.Anything).Return(nil)
|
catalog.EXPECT().AlterSegments(mock.Anything, mock.Anything).Return(nil)
|
||||||
catalog.EXPECT().SaveChannelCheckpoint(mock.Anything, mock.Anything, mock.Anything).Return(nil)
|
catalog.EXPECT().SaveChannelCheckpoint(mock.Anything, mock.Anything, mock.Anything).Return(nil)
|
||||||
|
@ -30,7 +30,6 @@ import (
|
|||||||
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
||||||
"github.com/milvus-io/milvus/internal/storage"
|
"github.com/milvus-io/milvus/internal/storage"
|
||||||
"github.com/milvus-io/milvus/pkg/log"
|
"github.com/milvus-io/milvus/pkg/log"
|
||||||
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
|
|
||||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||||
)
|
)
|
||||||
@ -247,25 +246,6 @@ func RegroupImportFiles(job ImportJob, files []*datapb.ImportFileStats) [][]*dat
|
|||||||
return fileGroups
|
return fileGroups
|
||||||
}
|
}
|
||||||
|
|
||||||
func AddImportSegment(cluster Cluster, meta *meta, segmentID int64) error {
|
|
||||||
segment := meta.GetSegment(segmentID)
|
|
||||||
req := &datapb.AddImportSegmentRequest{
|
|
||||||
Base: commonpbutil.NewMsgBase(
|
|
||||||
commonpbutil.WithSourceID(paramtable.GetNodeID()),
|
|
||||||
),
|
|
||||||
SegmentId: segment.GetID(),
|
|
||||||
ChannelName: segment.GetInsertChannel(),
|
|
||||||
CollectionId: segment.GetCollectionID(),
|
|
||||||
PartitionId: segment.GetPartitionID(),
|
|
||||||
RowNum: segment.GetNumOfRows(),
|
|
||||||
StatsLog: segment.GetStatslogs(),
|
|
||||||
}
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
|
|
||||||
defer cancel()
|
|
||||||
_, err := cluster.AddImportSegment(ctx, req)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
func getPendingProgress(jobID int64, imeta ImportMeta) float32 {
|
func getPendingProgress(jobID int64, imeta ImportMeta) float32 {
|
||||||
tasks := imeta.GetTaskBy(WithJob(jobID), WithType(PreImportTaskType))
|
tasks := imeta.GetTaskBy(WithJob(jobID), WithType(PreImportTaskType))
|
||||||
preImportingFiles := lo.SumBy(tasks, func(task ImportTask) int {
|
preImportingFiles := lo.SumBy(tasks, func(task ImportTask) int {
|
||||||
|
@ -215,29 +215,6 @@ func TestImportUtil_RegroupImportFiles(t *testing.T) {
|
|||||||
assert.Equal(t, fileNum, total)
|
assert.Equal(t, fileNum, total)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestImportUtil_AddImportSegment(t *testing.T) {
|
|
||||||
cluster := NewMockCluster(t)
|
|
||||||
cluster.EXPECT().AddImportSegment(mock.Anything, mock.Anything).Return(nil, nil)
|
|
||||||
|
|
||||||
catalog := mocks.NewDataCoordCatalog(t)
|
|
||||||
catalog.EXPECT().ListSegments(mock.Anything).Return(nil, nil)
|
|
||||||
catalog.EXPECT().ListChannelCheckpoint(mock.Anything).Return(nil, nil)
|
|
||||||
catalog.EXPECT().ListIndexes(mock.Anything).Return(nil, nil)
|
|
||||||
catalog.EXPECT().ListSegmentIndexes(mock.Anything).Return(nil, nil)
|
|
||||||
catalog.EXPECT().AddSegment(mock.Anything, mock.Anything).Return(nil)
|
|
||||||
|
|
||||||
meta, err := newMeta(context.TODO(), catalog, nil)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
segment := &SegmentInfo{
|
|
||||||
SegmentInfo: &datapb.SegmentInfo{ID: 1, IsImporting: true},
|
|
||||||
}
|
|
||||||
err = meta.AddSegment(context.Background(), segment)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
|
|
||||||
err = AddImportSegment(cluster, meta, segment.GetID())
|
|
||||||
assert.NoError(t, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestImportUtil_DropImportTask(t *testing.T) {
|
func TestImportUtil_DropImportTask(t *testing.T) {
|
||||||
cluster := NewMockCluster(t)
|
cluster := NewMockCluster(t)
|
||||||
cluster.EXPECT().DropImport(mock.Anything, mock.Anything).Return(nil)
|
cluster.EXPECT().DropImport(mock.Anything, mock.Anything).Return(nil)
|
||||||