fix: Check if the import job exists (#33672) (#33673)

issue: https://github.com/milvus-io/milvus/issues/33671

pr: https://github.com/milvus-io/milvus/pull/33672

---------

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
yihao.dai 2024-06-10 21:50:29 +08:00 committed by GitHub
parent ed1dee9e38
commit b71a404776
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 22 additions and 2 deletions

View File

@ -382,6 +382,9 @@ func getImportingProgress(jobID int64, imeta ImportMeta, meta *meta) (float32, i
func GetJobProgress(jobID int64, imeta ImportMeta, meta *meta) (int64, internalpb.ImportJobState, int64, int64, string) {
job := imeta.GetJob(jobID)
if job == nil {
return 0, internalpb.ImportJobState_Failed, 0, 0, fmt.Sprintf("import job does not exist, jobID=%d", jobID)
}
switch job.GetState() {
case internalpb.ImportJobState_Pending:
progress := getPendingProgress(jobID, imeta)

View File

@ -538,6 +538,12 @@ func TestImportUtil_GetImportProgress(t *testing.T) {
assert.Equal(t, internalpb.ImportJobState_Failed, state)
assert.Equal(t, mockErr, reason)
// job does not exist
progress, state, _, _, reason = GetJobProgress(-1, imeta, meta)
assert.Equal(t, int64(0), progress)
assert.Equal(t, internalpb.ImportJobState_Failed, state)
assert.NotEqual(t, "", reason)
// pending state
err = imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Pending))
assert.NoError(t, err)

View File

@ -1758,6 +1758,10 @@ func (s *Server) GetImportProgress(ctx context.Context, in *internalpb.GetImport
return resp, nil
}
job := s.importMeta.GetJob(jobID)
if job == nil {
resp.Status = merr.Status(merr.WrapErrImportFailed(fmt.Sprintf("import job does not exist, jobID=%d", jobID)))
return resp, nil
}
progress, state, importedRows, totalRows, reason := GetJobProgress(jobID, s.importMeta, s.meta)
resp.State = state
resp.Reason = reason

View File

@ -1698,7 +1698,7 @@ func TestImportV2(t *testing.T) {
assert.NoError(t, err)
assert.True(t, errors.Is(merr.Error(resp.GetStatus()), merr.ErrImportFailed))
// normal case
// job does not exist
catalog := mocks.NewDataCoordCatalog(t)
catalog.EXPECT().ListImportJobs().Return(nil, nil)
catalog.EXPECT().ListPreImportTasks().Return(nil, nil)
@ -1706,6 +1706,13 @@ func TestImportV2(t *testing.T) {
catalog.EXPECT().SaveImportJob(mock.Anything).Return(nil)
s.importMeta, err = NewImportMeta(catalog)
assert.NoError(t, err)
resp, err = s.GetImportProgress(ctx, &internalpb.GetImportProgressRequest{
JobID: "-1",
})
assert.NoError(t, err)
assert.True(t, errors.Is(merr.Error(resp.GetStatus()), merr.ErrImportFailed))
// normal case
var job ImportJob = &importJob{
ImportJob: &datapb.ImportJob{
JobID: 0,

View File

@ -152,7 +152,7 @@ func Test_PickSegment(t *testing.T) {
importedSize := map[int64]int{}
totalSize := 8 * 1024 * 1024 * 1024
batchSize := 16 * 1024 * 1024
batchSize := 1 * 1024 * 1024
for totalSize > 0 {
picked := PickSegment(task.req.GetRequestSegments(), vchannel, partitionID)