mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-03 04:19:18 +08:00
The GetIndexFilePaths interface judges whether the index is complete (#17052)
Signed-off-by: Cai.Zhang <cai.zhang@zilliz.com>
This commit is contained in:
parent
77b0f290a8
commit
b37b87eb97
@ -579,7 +579,7 @@ func (i *IndexCoord) GetIndexFilePaths(ctx context.Context, req *indexpb.GetInde
|
||||
for _, buildID := range req.IndexBuildIDs {
|
||||
indexPathInfo, err := i.metaTable.GetIndexFilePathInfo(buildID)
|
||||
if err != nil {
|
||||
log.Warn("IndexCoord GetIndexFilePaths failed", zap.Int64("indexBuildID", buildID))
|
||||
log.Warn("IndexCoord GetIndexFilePaths failed", zap.Int64("indexBuildID", buildID), zap.Error(err))
|
||||
return &indexpb.GetIndexFilePathsResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
|
@ -350,3 +350,42 @@ func TestIndexCoord_NotHealthy(t *testing.T) {
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp4.Status.ErrorCode)
|
||||
}
|
||||
|
||||
func TestIndexCoord_GetIndexFilePaths(t *testing.T) {
|
||||
ic := &IndexCoord{
|
||||
metaTable: &metaTable{
|
||||
indexBuildID2Meta: map[UniqueID]Meta{
|
||||
1: {
|
||||
indexMeta: &indexpb.IndexMeta{
|
||||
IndexBuildID: 1,
|
||||
State: commonpb.IndexState_Finished,
|
||||
IndexFilePaths: []string{"indexFiles-1", "indexFiles-2"},
|
||||
},
|
||||
},
|
||||
2: {
|
||||
indexMeta: &indexpb.IndexMeta{
|
||||
IndexBuildID: 2,
|
||||
State: commonpb.IndexState_Failed,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
ic.stateCode.Store(internalpb.StateCode_Healthy)
|
||||
|
||||
t.Run("GetIndexFilePaths success", func(t *testing.T) {
|
||||
resp, err := ic.GetIndexFilePaths(context.Background(), &indexpb.GetIndexFilePathsRequest{IndexBuildIDs: []UniqueID{1}})
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||
assert.Equal(t, 1, len(resp.FilePaths))
|
||||
assert.ElementsMatch(t, resp.FilePaths[0].IndexFilePaths, []string{"indexFiles-1", "indexFiles-2"})
|
||||
})
|
||||
|
||||
t.Run("GetIndexFilePaths failed", func(t *testing.T) {
|
||||
resp, err := ic.GetIndexFilePaths(context.Background(), &indexpb.GetIndexFilePathsRequest{IndexBuildIDs: []UniqueID{2}})
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.Status.ErrorCode)
|
||||
assert.NotEqual(t, "", resp.Status.Reason)
|
||||
})
|
||||
}
|
||||
|
@ -322,6 +322,9 @@ func (mt *metaTable) GetIndexFilePathInfo(indexBuildID UniqueID) (*indexpb.Index
|
||||
if meta.indexMeta.MarkDeleted {
|
||||
return nil, fmt.Errorf("index not exists with ID = %d", indexBuildID)
|
||||
}
|
||||
if meta.indexMeta.State != commonpb.IndexState_Finished {
|
||||
return nil, fmt.Errorf("index not finished with ID = %d", indexBuildID)
|
||||
}
|
||||
ret.IndexFilePaths = meta.indexMeta.IndexFilePaths
|
||||
ret.SerializedSize = meta.indexMeta.GetSerializeSize()
|
||||
|
||||
|
@ -527,13 +527,8 @@ func (it *IndexBuildTask) saveIndex(ctx context.Context, blobs []*storage.Blob)
|
||||
return nil
|
||||
}
|
||||
|
||||
err := funcutil.ProcessFuncParallel(blobCnt, runtime.NumCPU(), saveIndexFile, "saveIndexFile")
|
||||
if err != nil {
|
||||
log.Warn("saveIndexFile to minio failed", zap.Error(err))
|
||||
// In this case, we intend not to return err, otherwise the task will be marked as failed.
|
||||
it.internalErr = err
|
||||
}
|
||||
return nil
|
||||
// If an error occurs, return the error that the task state will be set to retry.
|
||||
return funcutil.ProcessFuncParallel(blobCnt, runtime.NumCPU(), saveIndexFile, "saveIndexFile")
|
||||
}
|
||||
|
||||
func (it *IndexBuildTask) releaseMemory() {
|
||||
|
Loading…
Reference in New Issue
Block a user