enhance: restful support import jobs (#30343)

issue: #28521 #29732

include
1. list collection's import jobs
2. create a new import job
3. get the progress of an import job

fix:
1. mix the order of dbName & collectionName #29728
2. trace log keep the same as v1
3. support traceID
4. azure precheck, blob name cannot end with / #29703

---------

Signed-off-by: PowderLi <min.li@zilliz.com>
This commit is contained in:
PowderLi 2024-01-31 17:57:04 +08:00 committed by GitHub
parent 2f778d9649
commit 5cf9bb236e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 462 additions and 230 deletions

View File

@ -82,7 +82,7 @@ AzureBlobChunkManager::AzureBlobChunkManager(
try {
Azure::Core::Context context;
client_->GetBlobContainerClient("justforconnectioncheck")
.GetBlockBlobClient("")
.GetBlockBlobClient("justforconnectioncheck")
.GetProperties(Azure::Storage::Blobs::GetBlobPropertiesOptions(),
context);
} catch (const Azure::Storage::StorageException& e) {

View File

@ -12,7 +12,7 @@ const (
RoleCategory = "/roles/"
IndexCategory = "/indexes/"
AliasCategory = "/aliases/"
JobCategory = "/jobs/"
ImportJobCategory = "/jobs/import/"
ListAction = "list"
HasAction = "has"

File diff suppressed because it is too large Load Diff

View File

@ -50,7 +50,7 @@ func TestHTTPWrapper(t *testing.T) {
ginHandler := gin.Default()
app := ginHandler.Group("", genAuthMiddleWare(false))
path := "/wrapper/post"
app.POST(path, wrapperPost(func() any { return &DefaultReq{} }, func(c *gin.Context, ctx *context.Context, req any, dbName string) (interface{}, error) {
app.POST(path, wrapperPost(func() any { return &DefaultReq{} }, func(ctx context.Context, c *gin.Context, req any, dbName string) (interface{}, error) {
return nil, nil
}))
postTestCases = append(postTestCases, requestBodyTestCase{
@ -58,7 +58,7 @@ func TestHTTPWrapper(t *testing.T) {
requestBody: []byte(`{}`),
})
path = "/wrapper/post/param"
app.POST(path, wrapperPost(func() any { return &CollectionNameReq{} }, func(c *gin.Context, ctx *context.Context, req any, dbName string) (interface{}, error) {
app.POST(path, wrapperPost(func() any { return &CollectionNameReq{} }, func(ctx context.Context, c *gin.Context, req any, dbName string) (interface{}, error) {
return nil, nil
}))
postTestCases = append(postTestCases, requestBodyTestCase{
@ -71,6 +71,12 @@ func TestHTTPWrapper(t *testing.T) {
errMsg: "missing required parameters, error: Key: 'CollectionNameReq.CollectionName' Error:Field validation for 'CollectionName' failed on the 'required' tag",
errCode: 1802, // ErrMissingRequiredParameters
})
postTestCases = append(postTestCases, requestBodyTestCase{
path: path,
requestBody: []byte(``),
errMsg: "can only accept json format request, the request body should be nil, however {} is valid",
errCode: 1801, // ErrIncorrectParameterFormat
})
postTestCases = append(postTestCases, requestBodyTestCase{
path: path,
requestBody: []byte(`{"collectionName": "book", "dbName"}`),
@ -78,7 +84,7 @@ func TestHTTPWrapper(t *testing.T) {
errCode: 1801, // ErrIncorrectParameterFormat
})
path = "/wrapper/post/trace"
app.POST(path, wrapperPost(func() any { return &CollectionNameReq{} }, wrapperTraceLog(func(c *gin.Context, ctx *context.Context, req any, dbName string) (interface{}, error) {
app.POST(path, wrapperPost(func() any { return &CollectionNameReq{} }, wrapperTraceLog(func(ctx context.Context, c *gin.Context, req any, dbName string) (interface{}, error) {
return nil, nil
})))
postTestCasesTrace = append(postTestCasesTrace, requestBodyTestCase{
@ -86,7 +92,7 @@ func TestHTTPWrapper(t *testing.T) {
requestBody: []byte(`{"collectionName": "` + DefaultCollectionName + `"}`),
})
path = "/wrapper/post/trace/wrong"
app.POST(path, wrapperPost(func() any { return &CollectionNameReq{} }, wrapperTraceLog(func(c *gin.Context, ctx *context.Context, req any, dbName string) (interface{}, error) {
app.POST(path, wrapperPost(func() any { return &CollectionNameReq{} }, wrapperTraceLog(func(ctx context.Context, c *gin.Context, req any, dbName string) (interface{}, error) {
return nil, merr.ErrCollectionNotFound
})))
postTestCasesTrace = append(postTestCasesTrace, requestBodyTestCase{
@ -94,8 +100,8 @@ func TestHTTPWrapper(t *testing.T) {
requestBody: []byte(`{"collectionName": "` + DefaultCollectionName + `"}`),
})
path = "/wrapper/post/trace/call"
app.POST(path, wrapperPost(func() any { return &CollectionNameReq{} }, wrapperTraceLog(func(c *gin.Context, ctx *context.Context, req any, dbName string) (interface{}, error) {
return wrapperProxy(c, ctx, req, false, false, func(reqCtx *context.Context, req any) (any, error) {
app.POST(path, wrapperPost(func() any { return &CollectionNameReq{} }, wrapperTraceLog(func(ctx context.Context, c *gin.Context, req any, dbName string) (interface{}, error) {
return wrapperProxy(ctx, c, req, false, false, func(reqctx context.Context, req any) (any, error) {
return nil, nil
})
})))
@ -150,17 +156,17 @@ func TestGrpcWrapper(t *testing.T) {
app := ginHandler.Group("")
appNeedAuth := ginHandler.Group(needAuthPrefix, genAuthMiddleWare(true))
path := "/wrapper/grpc/-0"
handle := func(reqCtx *context.Context, req any) (any, error) {
handle := func(reqctx context.Context, req any) (any, error) {
return nil, nil
}
app.GET(path, func(c *gin.Context) {
ctx := proxy.NewContextWithMetadata(c, "", DefaultDbName)
wrapperProxy(c, &ctx, &DefaultReq{}, false, false, handle)
wrapperProxy(ctx, c, &DefaultReq{}, false, false, handle)
})
appNeedAuth.GET(path, func(c *gin.Context) {
username, _ := c.Get(ContextUsername)
ctx := proxy.NewContextWithMetadata(c, username.(string), DefaultDbName)
wrapperProxy(c, &ctx, &milvuspb.DescribeCollectionRequest{}, true, false, handle)
wrapperProxy(ctx, c, &milvuspb.DescribeCollectionRequest{}, true, false, handle)
})
getTestCases = append(getTestCases, rawTestCase{
path: path,
@ -169,17 +175,17 @@ func TestGrpcWrapper(t *testing.T) {
path: needAuthPrefix + path,
})
path = "/wrapper/grpc/01"
handle = func(reqCtx *context.Context, req any) (any, error) {
handle = func(reqctx context.Context, req any) (any, error) {
return nil, merr.ErrNeedAuthenticate // 1800
}
app.GET(path, func(c *gin.Context) {
ctx := proxy.NewContextWithMetadata(c, "", DefaultDbName)
wrapperProxy(c, &ctx, &DefaultReq{}, false, false, handle)
wrapperProxy(ctx, c, &DefaultReq{}, false, false, handle)
})
appNeedAuth.GET(path, func(c *gin.Context) {
username, _ := c.Get(ContextUsername)
ctx := proxy.NewContextWithMetadata(c, username.(string), DefaultDbName)
wrapperProxy(c, &ctx, &milvuspb.DescribeCollectionRequest{}, true, false, handle)
wrapperProxy(ctx, c, &milvuspb.DescribeCollectionRequest{}, true, false, handle)
})
getTestCases = append(getTestCases, rawTestCase{
path: path,
@ -189,19 +195,19 @@ func TestGrpcWrapper(t *testing.T) {
path: needAuthPrefix + path,
})
path = "/wrapper/grpc/00"
handle = func(reqCtx *context.Context, req any) (any, error) {
handle = func(reqctx context.Context, req any) (any, error) {
return &milvuspb.BoolResponse{
Status: commonSuccessStatus,
}, nil
}
app.GET(path, func(c *gin.Context) {
ctx := proxy.NewContextWithMetadata(c, "", DefaultDbName)
wrapperProxy(c, &ctx, &DefaultReq{}, false, false, handle)
wrapperProxy(ctx, c, &DefaultReq{}, false, false, handle)
})
appNeedAuth.GET(path, func(c *gin.Context) {
username, _ := c.Get(ContextUsername)
ctx := proxy.NewContextWithMetadata(c, username.(string), DefaultDbName)
wrapperProxy(c, &ctx, &milvuspb.DescribeCollectionRequest{}, true, false, handle)
wrapperProxy(ctx, c, &milvuspb.DescribeCollectionRequest{}, true, false, handle)
})
getTestCases = append(getTestCases, rawTestCase{
path: path,
@ -210,7 +216,7 @@ func TestGrpcWrapper(t *testing.T) {
path: needAuthPrefix + path,
})
path = "/wrapper/grpc/10"
handle = func(reqCtx *context.Context, req any) (any, error) {
handle = func(reqctx context.Context, req any) (any, error) {
return &milvuspb.BoolResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_CollectionNameNotFound, // 28
@ -220,12 +226,12 @@ func TestGrpcWrapper(t *testing.T) {
}
app.GET(path, func(c *gin.Context) {
ctx := proxy.NewContextWithMetadata(c, "", DefaultDbName)
wrapperProxy(c, &ctx, &DefaultReq{}, false, false, handle)
wrapperProxy(ctx, c, &DefaultReq{}, false, false, handle)
})
appNeedAuth.GET(path, func(c *gin.Context) {
username, _ := c.Get(ContextUsername)
ctx := proxy.NewContextWithMetadata(c, username.(string), DefaultDbName)
wrapperProxy(c, &ctx, &milvuspb.DescribeCollectionRequest{}, true, false, handle)
wrapperProxy(ctx, c, &milvuspb.DescribeCollectionRequest{}, true, false, handle)
})
getTestCases = append(getTestCases, rawTestCase{
path: path,
@ -294,7 +300,7 @@ func TestTimeout(t *testing.T) {
headers: map[string]string{HTTPHeaderRequestTimeout: "5"},
})
path = "/middleware/timeout/10"
// app.GET(path, wrapper(wrapperTimeout(func(c *gin.Context, ctx *context.Context, req any, dbName string) (interface{}, error) {
// app.GET(path, wrapper(wrapperTimeout(func(ctx context.Context, c *gin.Context, req any, dbName string) (interface{}, error) {
app.GET(path, timeoutMiddleware(func(c *gin.Context) {
time.Sleep(10 * time.Second)
}))
@ -310,7 +316,7 @@ func TestTimeout(t *testing.T) {
status: http.StatusRequestTimeout,
})
path = "/middleware/timeout/60"
// app.GET(path, wrapper(wrapperTimeout(func(c *gin.Context, ctx *context.Context, req any, dbName string) (interface{}, error) {
// app.GET(path, wrapper(wrapperTimeout(func(ctx context.Context, c *gin.Context, req any, dbName string) (interface{}, error) {
app.GET(path, timeoutMiddleware(func(c *gin.Context) {
time.Sleep(60 * time.Second)
}))
@ -372,7 +378,7 @@ func TestDatabaseWrapper(t *testing.T) {
ginHandler := gin.Default()
app := ginHandler.Group("", genAuthMiddleWare(false))
path := "/wrapper/database"
app.POST(path, wrapperPost(func() any { return &DatabaseReq{} }, h.wrapperCheckDatabase(func(c *gin.Context, ctx *context.Context, req any, dbName string) (interface{}, error) {
app.POST(path, wrapperPost(func() any { return &DatabaseReq{} }, h.wrapperCheckDatabase(func(ctx context.Context, c *gin.Context, req any, dbName string) (interface{}, error) {
return nil, nil
})))
postTestCases = append(postTestCases, requestBodyTestCase{
@ -656,6 +662,46 @@ func TestMethodGet(t *testing.T) {
Status: &StatusSuccess,
Alias: DefaultAliasName,
}, nil).Once()
mp.EXPECT().ListImportTasks(mock.Anything, mock.Anything).Return(&milvuspb.ListImportTasksResponse{
Status: &StatusSuccess,
Tasks: []*milvuspb.GetImportStateResponse{
{
Status: &StatusSuccess,
State: 6,
Infos: []*commonpb.KeyValuePair{
{Key: "collection", Value: DefaultCollectionName},
{Key: "partition", Value: DefaultPartitionName},
{Key: "persist_cost", Value: "0.23"},
{Key: "progress_percent", Value: "100"},
{Key: "failed_reason"},
},
Id: 1234567890,
},
{
Status: &StatusSuccess,
State: 0,
Infos: []*commonpb.KeyValuePair{
{Key: "collection", Value: DefaultCollectionName},
{Key: "partition", Value: DefaultPartitionName},
{Key: "progress_percent", Value: "0"},
{Key: "failed_reason", Value: "failed to get file size of "},
},
Id: 123456789,
},
},
}, nil).Once()
mp.EXPECT().GetImportState(mock.Anything, mock.Anything).Return(&milvuspb.GetImportStateResponse{
Status: &StatusSuccess,
State: 6,
Infos: []*commonpb.KeyValuePair{
{Key: "collection", Value: DefaultCollectionName},
{Key: "partition", Value: DefaultPartitionName},
{Key: "persist_cost", Value: "0.23"},
{Key: "progress_percent", Value: "100"},
{Key: "failed_reason"},
},
Id: 1234567890,
}, nil).Once()
testEngine := initHTTPServerV2(mp, false)
queryTestCases := []rawTestCase{}
@ -720,6 +766,12 @@ func TestMethodGet(t *testing.T) {
queryTestCases = append(queryTestCases, rawTestCase{
path: versionalV2(AliasCategory, DescribeAction),
})
queryTestCases = append(queryTestCases, rawTestCase{
path: versionalV2(ImportJobCategory, ListAction),
})
queryTestCases = append(queryTestCases, rawTestCase{
path: versionalV2(ImportJobCategory, GetProgressAction),
})
for _, testcase := range queryTestCases {
t.Run("query", func(t *testing.T) {
@ -729,7 +781,8 @@ func TestMethodGet(t *testing.T) {
`"indexName": "` + DefaultIndexName + `",` +
`"userName": "` + util.UserRoot + `",` +
`"roleName": "` + util.RoleAdmin + `",` +
`"aliasName": "` + DefaultAliasName + `"` +
`"aliasName": "` + DefaultAliasName + `",` +
`"taskID": 1234567890` +
`}`))
req := httptest.NewRequest(http.MethodPost, testcase.path, bodyReader)
w := httptest.NewRecorder()
@ -829,6 +882,7 @@ func TestMethodPost(t *testing.T) {
mp.EXPECT().CreateIndex(mock.Anything, mock.Anything).Return(commonErrorStatus, nil).Once()
mp.EXPECT().CreateAlias(mock.Anything, mock.Anything).Return(commonSuccessStatus, nil).Once()
mp.EXPECT().AlterAlias(mock.Anything, mock.Anything).Return(commonSuccessStatus, nil).Once()
mp.EXPECT().Import(mock.Anything, mock.Anything).Return(&milvuspb.ImportResponse{Status: commonSuccessStatus, Tasks: []int64{int64(1234567890)}}, nil).Once()
testEngine := initHTTPServerV2(mp, false)
queryTestCases := []rawTestCase{}
queryTestCases = append(queryTestCases, rawTestCase{
@ -887,6 +941,9 @@ func TestMethodPost(t *testing.T) {
queryTestCases = append(queryTestCases, rawTestCase{
path: versionalV2(AliasCategory, AlterAction),
})
queryTestCases = append(queryTestCases, rawTestCase{
path: versionalV2(ImportJobCategory, CreateAction),
})
for _, testcase := range queryTestCases {
t.Run("query", func(t *testing.T) {
@ -897,7 +954,8 @@ func TestMethodPost(t *testing.T) {
`"indexParams": [{"indexName": "` + DefaultIndexName + `", "fieldName": "book_intro", "metricsType": "L2", "indexType": "IVF_FLAT"}],` +
`"userName": "` + util.UserRoot + `", "password": "Milvus", "newPassword": "milvus", "roleName": "` + util.RoleAdmin + `",` +
`"roleName": "` + util.RoleAdmin + `", "objectType": "Global", "objectName": "*", "privilege": "*",` +
`"aliasName": "` + DefaultAliasName + `"` +
`"aliasName": "` + DefaultAliasName + `",` +
`"files": ["book.json"]` +
`}`))
req := httptest.NewRequest(http.MethodPost, testcase.path, bodyReader)
w := httptest.NewRecorder()

View File

@ -17,7 +17,8 @@ func (req *DatabaseReq) GetDbName() string { return req.DbName }
type CollectionNameReq struct {
DbName string `json:"dbName"`
CollectionName string `json:"collectionName" binding:"required"`
PartitionNames []string `json:"partitionNames"`
Limit int32 `json:"limit"` // list import jobs
PartitionNames []string `json:"partitionNames"` // get partitions load state
}
func (req *CollectionNameReq) GetDbName() string {
@ -28,6 +29,10 @@ func (req *CollectionNameReq) GetCollectionName() string {
return req.CollectionName
}
func (req *CollectionNameReq) GetLimit() int32 {
return req.Limit
}
func (req *CollectionNameReq) GetPartitionNames() []string {
return req.PartitionNames
}
@ -52,6 +57,30 @@ func (req *PartitionReq) GetDbName() string { return req.DbName }
func (req *PartitionReq) GetCollectionName() string { return req.CollectionName }
func (req *PartitionReq) GetPartitionName() string { return req.PartitionName }
type DataFilesReq struct {
DbName string `json:"dbName"`
CollectionName string `json:"collectionName" binding:"required"`
Files []string `json:"files" binding:"required"`
}
func (req *DataFilesReq) GetDbName() string {
return req.DbName
}
func (req *DataFilesReq) GetCollectionName() string {
return req.CollectionName
}
func (req *DataFilesReq) GetFileNames() []string {
return req.Files
}
type TaskIDReq struct {
TaskID int64 `json:"taskID" binding:"required"`
}
func (req *TaskIDReq) GetTaskID() int64 { return req.TaskID }
type QueryReqV2 struct {
DbName string `json:"dbName"`
CollectionName string `json:"collectionName" binding:"required"`
@ -140,6 +169,15 @@ type IndexNameGetter interface {
type AliasNameGetter interface {
GetAliasName() string
}
type LimitGetter interface {
GetLimit() int32
}
type FileNamesGetter interface {
GetFileNames() []string
}
type TaskIDGetter interface {
GetTaskID() int64
}
type PasswordReq struct {
UserName string `json:"userName"`

View File

@ -172,6 +172,25 @@ func (s *Server) startHTTPServer(errChan chan error) {
ginHandler := gin.New()
ginLogger := gin.LoggerWithConfig(gin.LoggerConfig{
SkipPaths: proxy.Params.ProxyCfg.GinLogSkipPaths.GetAsStrings(),
Formatter: func(param gin.LogFormatterParams) string {
if param.Latency > time.Minute {
param.Latency = param.Latency.Truncate(time.Second)
}
traceID, ok := param.Keys["traceID"]
if !ok {
traceID = ""
}
return fmt.Sprintf("[%v] [GIN] [%s] [traceID=%s] [code=%3d] [latency=%v] [client=%s] [method=%s] [error=%s]\n",
param.TimeStamp.Format("2006/01/02 15:04:05.000 Z07:00"),
param.Path,
traceID,
param.StatusCode,
param.Latency,
param.ClientIP,
param.Method,
param.ErrorMessage,
)
},
})
ginHandler.Use(ginLogger, gin.Recovery())
httpHeaderAllowInt64 := "false"