From 5cf9bb236e2b84add831b9f56e7d00c0c2c0203e Mon Sep 17 00:00:00 2001 From: PowderLi <135960789+PowderLi@users.noreply.github.com> Date: Wed, 31 Jan 2024 17:57:04 +0800 Subject: [PATCH] enhance: restful support import jobs (#30343) issue: #28521 #29732 include 1. list collection's import jobs 2. create a new import job 3. get the progress of an import job fix: 1. mix the order of dbName & collectionName #29728 2. trace log keep the same as v1 3. support traceID 4. azure precheck, blob name cannot end with / #29703 --------- Signed-off-by: PowderLi --- .../AzureBlobChunkManager.cpp | 2 +- .../distributed/proxy/httpserver/constant.go | 2 +- .../proxy/httpserver/handler_v2.go | 525 +++++++++++------- .../proxy/httpserver/handler_v2_test.go | 104 +++- .../proxy/httpserver/request_v2.go | 40 +- internal/distributed/proxy/service.go | 19 + 6 files changed, 462 insertions(+), 230 deletions(-) diff --git a/internal/core/src/storage/azure-blob-storage/AzureBlobChunkManager.cpp b/internal/core/src/storage/azure-blob-storage/AzureBlobChunkManager.cpp index 42625b152a..9db84eedc7 100644 --- a/internal/core/src/storage/azure-blob-storage/AzureBlobChunkManager.cpp +++ b/internal/core/src/storage/azure-blob-storage/AzureBlobChunkManager.cpp @@ -82,7 +82,7 @@ AzureBlobChunkManager::AzureBlobChunkManager( try { Azure::Core::Context context; client_->GetBlobContainerClient("justforconnectioncheck") - .GetBlockBlobClient("") + .GetBlockBlobClient("justforconnectioncheck") .GetProperties(Azure::Storage::Blobs::GetBlobPropertiesOptions(), context); } catch (const Azure::Storage::StorageException& e) { diff --git a/internal/distributed/proxy/httpserver/constant.go b/internal/distributed/proxy/httpserver/constant.go index 558adddec0..d8ad07e82f 100644 --- a/internal/distributed/proxy/httpserver/constant.go +++ b/internal/distributed/proxy/httpserver/constant.go @@ -12,7 +12,7 @@ const ( RoleCategory = "/roles/" IndexCategory = "/indexes/" AliasCategory = "/aliases/" - JobCategory = "/jobs/" + ImportJobCategory = "/jobs/import/" ListAction = "list" HasAction = "has" diff --git a/internal/distributed/proxy/httpserver/handler_v2.go b/internal/distributed/proxy/httpserver/handler_v2.go index 3bf59aac0d..755be97720 100644 --- a/internal/distributed/proxy/httpserver/handler_v2.go +++ b/internal/distributed/proxy/httpserver/handler_v2.go @@ -3,8 +3,10 @@ package httpserver import ( "context" "encoding/json" + "io" "net/http" "strconv" + "time" "github.com/gin-gonic/gin" "github.com/gin-gonic/gin/binding" @@ -12,6 +14,7 @@ import ( "github.com/golang/protobuf/proto" "github.com/tidwall/gjson" "go.uber.org/zap" + "google.golang.org/grpc" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" @@ -93,8 +96,8 @@ func (h *HandlersV2) RegisterRoutesToV2(router gin.IRouter) { router.POST(UserCategory+GrantRoleAction, timeoutMiddleware(wrapperPost(func() any { return &UserRoleReq{} }, wrapperTraceLog(h.addRoleToUser)))) router.POST(UserCategory+RevokeRoleAction, timeoutMiddleware(wrapperPost(func() any { return &UserRoleReq{} }, wrapperTraceLog(h.removeRoleFromUser)))) - router.POST(RoleCategory+ListAction, timeoutMiddleware(wrapperPost(func() any { return &DatabaseReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.listRoles))))) - router.POST(RoleCategory+DescribeAction, timeoutMiddleware(wrapperPost(func() any { return &RoleReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.describeRole))))) + router.POST(RoleCategory+ListAction, timeoutMiddleware(wrapperPost(func() any { return &DatabaseReq{} }, wrapperTraceLog(h.listRoles)))) + router.POST(RoleCategory+DescribeAction, timeoutMiddleware(wrapperPost(func() any { return &RoleReq{} }, wrapperTraceLog(h.describeRole)))) router.POST(RoleCategory+CreateAction, timeoutMiddleware(wrapperPost(func() any { return &RoleReq{} }, wrapperTraceLog(h.createRole)))) router.POST(RoleCategory+DropAction, timeoutMiddleware(wrapperPost(func() any { return &RoleReq{} }, wrapperTraceLog(h.dropRole)))) @@ -114,23 +117,32 @@ func (h *HandlersV2) RegisterRoutesToV2(router gin.IRouter) { router.POST(AliasCategory+CreateAction, timeoutMiddleware(wrapperPost(func() any { return &AliasCollectionReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.createAlias))))) router.POST(AliasCategory+DropAction, timeoutMiddleware(wrapperPost(func() any { return &AliasReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.dropAlias))))) router.POST(AliasCategory+AlterAction, timeoutMiddleware(wrapperPost(func() any { return &AliasCollectionReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.alterAlias))))) + + router.POST(ImportJobCategory+ListAction, timeoutMiddleware(wrapperPost(func() any { return &CollectionNameReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.listImportJob))))) + router.POST(ImportJobCategory+CreateAction, timeoutMiddleware(wrapperPost(func() any { return &DataFilesReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.createImportJob))))) + router.POST(ImportJobCategory+GetProgressAction, timeoutMiddleware(wrapperPost(func() any { return &TaskIDReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.getImportJobProcess))))) } type ( newReqFunc func() any - handlerFuncV4 func(c *gin.Context, ctx *context.Context, req any, dbName string) (interface{}, error) + handlerFuncV2 func(ctx context.Context, c *gin.Context, req any, dbName string) (interface{}, error) ) -func wrapperPost(newReq newReqFunc, v2 handlerFuncV4) gin.HandlerFunc { +func wrapperPost(newReq newReqFunc, v2 handlerFuncV2) gin.HandlerFunc { return func(c *gin.Context) { req := newReq() if err := c.ShouldBindBodyWith(req, binding.JSON); err != nil { - log.Warn("high level restful api, the parameter of create collection is incorrect", zap.Any("request", req), zap.Error(err)) + log.Warn("high level restful api, read parameters from request body fail", zap.Any("request", req), zap.Error(err)) if _, ok := err.(validator.ValidationErrors); ok { c.AbortWithStatusJSON(http.StatusOK, gin.H{ HTTPReturnCode: merr.Code(merr.ErrMissingRequiredParameters), HTTPReturnMessage: merr.ErrMissingRequiredParameters.Error() + ", error: " + err.Error(), }) + } else if err == io.EOF { + c.AbortWithStatusJSON(http.StatusOK, gin.H{ + HTTPReturnCode: merr.Code(merr.ErrIncorrectParameterFormat), + HTTPReturnMessage: merr.ErrIncorrectParameterFormat.Error() + ", the request body should be nil, however {} is valid", + }) } else { c.AbortWithStatusJSON(http.StatusOK, gin.H{ HTTPReturnCode: merr.Code(merr.ErrIncorrectParameterFormat), @@ -139,7 +151,7 @@ func wrapperPost(newReq newReqFunc, v2 handlerFuncV4) gin.HandlerFunc { } return } - log.Debug("[wrapper post]bind post request", zap.Any("req", req)) + log.Debug("high level restful api, read parameters from request body", zap.Any("request", req)) dbName := "" if getter, ok := req.(requestutil.DBNameGetter); ok { dbName = getter.GetDbName() @@ -149,48 +161,55 @@ func wrapperPost(newReq newReqFunc, v2 handlerFuncV4) gin.HandlerFunc { } username, _ := c.Get(ContextUsername) ctx := proxy.NewContextWithMetadata(c, username.(string), dbName) - v2(c, &ctx, req, dbName) + traceID := strconv.FormatInt(time.Now().UnixNano(), 10) + ctx = log.WithTraceID(ctx, traceID) + c.Keys["traceID"] = traceID + v2(ctx, c, req, dbName) } } -func wrapperTraceLog(v2 handlerFuncV4) handlerFuncV4 { - return func(c *gin.Context, ctx *context.Context, req any, dbName string) (interface{}, error) { - log.Debug("[wrapper trace log]bind post request", zap.Any("req", req)) +func wrapperTraceLog(v2 handlerFuncV2) handlerFuncV2 { + return func(ctx context.Context, c *gin.Context, req any, dbName string) (interface{}, error) { switch proxy.Params.CommonCfg.TraceLogMode.GetAsInt() { case 1: // simple info - var fields []zap.Field - fields = append(fields, zap.String("request_name", c.Request.Method)) - log.Ctx(*ctx).Info("trace info: simple", fields...) + fields := proxy.GetRequestBaseInfo(ctx, req, &grpc.UnaryServerInfo{ + FullMethod: c.Request.URL.Path, + }, false) + log.Ctx(ctx).Info("trace info: simple", fields...) case 2: // detail info - var fields []zap.Field - fields = append(fields, zap.String("request_name", c.Request.Method)) - log.Ctx(*ctx).Info("trace info: detail", fields...) + fields := proxy.GetRequestBaseInfo(ctx, req, &grpc.UnaryServerInfo{ + FullMethod: c.Request.URL.Path, + }, true) + fields = append(fields, proxy.GetRequestFieldWithoutSensitiveInfo(req)) + log.Ctx(ctx).Info("trace info: detail", fields...) case 3: // detail info with request and response - var fields []zap.Field - fields = append(fields, zap.String("request_name", c.Request.Method)) - log.Ctx(*ctx).Info("trace info: all request", fields...) + fields := proxy.GetRequestBaseInfo(ctx, req, &grpc.UnaryServerInfo{ + FullMethod: c.Request.URL.Path, + }, true) + fields = append(fields, proxy.GetRequestFieldWithoutSensitiveInfo(req)) + log.Ctx(ctx).Info("trace info: all request", fields...) } - resp, err := v2(c, ctx, req, dbName) + resp, err := v2(ctx, c, req, dbName) if proxy.Params.CommonCfg.TraceLogMode.GetAsInt() > 2 { if err != nil { - log.Ctx(*ctx).Info("trace info: all, error", zap.Error(err)) + log.Ctx(ctx).Info("trace info: all, error", zap.Error(err)) } else { - log.Ctx(*ctx).Info("trace info: all, unknown", zap.Any("resp", resp)) + log.Ctx(ctx).Info("trace info: all, unknown", zap.Any("resp", resp)) } } return resp, err } } -func wrapperProxy(c *gin.Context, ctx *context.Context, req any, checkAuth bool, ignoreErr bool, handler func(reqCtx *context.Context, req any) (any, error)) (interface{}, error) { +func wrapperProxy(ctx context.Context, c *gin.Context, req any, checkAuth bool, ignoreErr bool, handler func(reqCtx context.Context, req any) (any, error)) (interface{}, error) { if checkAuth { - err := checkAuthorization(*ctx, c, req) + err := checkAuthorization(ctx, c, req) if err != nil { return nil, err } } // todo delete the message - log.Debug("todo grpc call", zap.Any("request", req)) + log.Ctx(ctx).Debug("high level restful api, try to do a grpc call", zap.Any("request", req)) response, err := handler(ctx, req) if err == nil { status, ok := requestutil.GetStatusFromResponse(response) @@ -199,7 +218,7 @@ func wrapperProxy(c *gin.Context, ctx *context.Context, req any, checkAuth bool, } } if err != nil { - log.Warn("did grpc call, but fail with error", zap.Error(err), zap.Any("request", req)) + log.Ctx(ctx).Warn("high level restful api, grpc call failed", zap.Error(err), zap.Any("request", req)) if !ignoreErr { c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()}) } @@ -207,23 +226,23 @@ func wrapperProxy(c *gin.Context, ctx *context.Context, req any, checkAuth bool, return response, err } -func (h *HandlersV2) wrapperCheckDatabase(v2 handlerFuncV4) handlerFuncV4 { - return func(c *gin.Context, ctx *context.Context, req any, dbName string) (interface{}, error) { - if dbName == DefaultDbName || proxy.CheckDatabase(*ctx, dbName) { - return v2(c, ctx, req, dbName) +func (h *HandlersV2) wrapperCheckDatabase(v2 handlerFuncV2) handlerFuncV2 { + return func(ctx context.Context, c *gin.Context, req any, dbName string) (interface{}, error) { + if dbName == DefaultDbName || proxy.CheckDatabase(ctx, dbName) { + return v2(ctx, c, req, dbName) } - resp, err := wrapperProxy(c, ctx, req, false, false, func(reqCtx *context.Context, req any) (interface{}, error) { - return h.proxy.ListDatabases(*reqCtx, &milvuspb.ListDatabasesRequest{}) + resp, err := wrapperProxy(ctx, c, req, false, false, func(reqCtx context.Context, req any) (interface{}, error) { + return h.proxy.ListDatabases(reqCtx, &milvuspb.ListDatabasesRequest{}) }) if err != nil { return resp, err } for _, db := range resp.(*milvuspb.ListDatabasesResponse).DbNames { if db == dbName { - return v2(c, ctx, req, dbName) + return v2(ctx, c, req, dbName) } } - log.Warn("non-exist database", zap.String("database", dbName)) + log.Ctx(ctx).Warn("high level restful api, non-exist database", zap.String("database", dbName), zap.Any("request", req)) c.AbortWithStatusJSON(http.StatusOK, gin.H{ HTTPReturnCode: merr.Code(merr.ErrDatabaseNotFound), HTTPReturnMessage: merr.ErrDatabaseNotFound.Error() + ", database: " + dbName, @@ -232,18 +251,18 @@ func (h *HandlersV2) wrapperCheckDatabase(v2 handlerFuncV4) handlerFuncV4 { } } -func (h *HandlersV2) hasCollection(c *gin.Context, ctx *context.Context, anyReq any, dbName string) (interface{}, error) { +func (h *HandlersV2) hasCollection(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) { getter, _ := anyReq.(requestutil.CollectionNameGetter) collectionName := getter.GetCollectionName() - _, err := proxy.GetCachedCollectionSchema(*ctx, dbName, collectionName) + _, err := proxy.GetCachedCollectionSchema(ctx, dbName, collectionName) has := true if err != nil { req := &milvuspb.HasCollectionRequest{ DbName: dbName, CollectionName: collectionName, } - resp, err := wrapperProxy(c, ctx, req, false, false, func(reqCtx *context.Context, req any) (interface{}, error) { - return h.proxy.HasCollection(*reqCtx, req.(*milvuspb.HasCollectionRequest)) + resp, err := wrapperProxy(ctx, c, req, false, false, func(reqCtx context.Context, req any) (interface{}, error) { + return h.proxy.HasCollection(reqCtx, req.(*milvuspb.HasCollectionRequest)) }) if err != nil { return nil, err @@ -251,15 +270,15 @@ func (h *HandlersV2) hasCollection(c *gin.Context, ctx *context.Context, anyReq has = resp.(*milvuspb.BoolResponse).Value } c.JSON(http.StatusOK, wrapperReturnHas(has)) - return nil, nil + return has, nil } -func (h *HandlersV2) listCollections(c *gin.Context, ctx *context.Context, anyReq any, dbName string) (interface{}, error) { +func (h *HandlersV2) listCollections(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) { req := &milvuspb.ShowCollectionsRequest{ DbName: dbName, } - resp, err := wrapperProxy(c, ctx, req, false, false, func(reqCtx *context.Context, req any) (interface{}, error) { - return h.proxy.ShowCollections(*reqCtx, req.(*milvuspb.ShowCollectionsRequest)) + resp, err := wrapperProxy(ctx, c, req, false, false, func(reqCtx context.Context, req any) (interface{}, error) { + return h.proxy.ShowCollections(reqCtx, req.(*milvuspb.ShowCollectionsRequest)) }) if err == nil { c.JSON(http.StatusOK, wrapperReturnList(resp.(*milvuspb.ShowCollectionsResponse).CollectionNames)) @@ -267,15 +286,15 @@ func (h *HandlersV2) listCollections(c *gin.Context, ctx *context.Context, anyRe return resp, err } -func (h *HandlersV2) getCollectionDetails(c *gin.Context, ctx *context.Context, anyReq any, dbName string) (interface{}, error) { +func (h *HandlersV2) getCollectionDetails(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) { collectionGetter, _ := anyReq.(requestutil.CollectionNameGetter) collectionName := collectionGetter.GetCollectionName() req := &milvuspb.DescribeCollectionRequest{ DbName: dbName, CollectionName: collectionName, } - resp, err := wrapperProxy(c, ctx, req, false, false, func(reqCtx *context.Context, req any) (any, error) { - return h.proxy.DescribeCollection(*reqCtx, req.(*milvuspb.DescribeCollectionRequest)) + resp, err := wrapperProxy(ctx, c, req, false, false, func(reqCtx context.Context, req any) (any, error) { + return h.proxy.DescribeCollection(reqCtx, req.(*milvuspb.DescribeCollectionRequest)) }) if err != nil { return resp, err @@ -284,7 +303,7 @@ func (h *HandlersV2) getCollectionDetails(c *gin.Context, ctx *context.Context, primaryField, ok := getPrimaryField(coll.Schema) autoID := false if !ok { - log.Warn("get primary field from collection schema fail", zap.Any("collection schema", coll.Schema)) + log.Ctx(ctx).Warn("high level restful api, get primary field from collection schema fail", zap.Any("collection schema", coll.Schema)) } else { autoID = primaryField.AutoID } @@ -292,8 +311,8 @@ func (h *HandlersV2) getCollectionDetails(c *gin.Context, ctx *context.Context, DbName: dbName, CollectionName: collectionName, } - stateResp, err := wrapperProxy(c, ctx, loadStateReq, h.checkAuth, true, func(reqCtx *context.Context, req any) (any, error) { - return h.proxy.GetLoadState(*reqCtx, req.(*milvuspb.GetLoadStateRequest)) + stateResp, err := wrapperProxy(ctx, c, loadStateReq, h.checkAuth, true, func(reqCtx context.Context, req any) (any, error) { + return h.proxy.GetLoadState(reqCtx, req.(*milvuspb.GetLoadStateRequest)) }) collLoadState := "" if err == nil { @@ -312,8 +331,8 @@ func (h *HandlersV2) getCollectionDetails(c *gin.Context, ctx *context.Context, CollectionName: collectionName, FieldName: vectorField, } - indexResp, err := wrapperProxy(c, ctx, descIndexReq, false, true, func(reqCtx *context.Context, req any) (any, error) { - return h.proxy.DescribeIndex(*reqCtx, req.(*milvuspb.DescribeIndexRequest)) + indexResp, err := wrapperProxy(ctx, c, descIndexReq, false, true, func(reqCtx context.Context, req any) (any, error) { + return h.proxy.DescribeIndex(reqCtx, req.(*milvuspb.DescribeIndexRequest)) }) if err == nil { indexDesc = printIndexes(indexResp.(*milvuspb.DescribeIndexResponse).IndexDescriptions) @@ -331,14 +350,14 @@ func (h *HandlersV2) getCollectionDetails(c *gin.Context, ctx *context.Context, return resp, nil } -func (h *HandlersV2) getCollectionStats(c *gin.Context, ctx *context.Context, anyReq any, dbName string) (interface{}, error) { +func (h *HandlersV2) getCollectionStats(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) { collectionGetter, _ := anyReq.(requestutil.CollectionNameGetter) req := &milvuspb.GetCollectionStatisticsRequest{ DbName: dbName, CollectionName: collectionGetter.GetCollectionName(), } - resp, err := wrapperProxy(c, ctx, req, h.checkAuth, false, func(reqCtx *context.Context, req any) (any, error) { - return h.proxy.GetCollectionStatistics(*reqCtx, req.(*milvuspb.GetCollectionStatisticsRequest)) + resp, err := wrapperProxy(ctx, c, req, h.checkAuth, false, func(reqCtx context.Context, req any) (any, error) { + return h.proxy.GetCollectionStatistics(reqCtx, req.(*milvuspb.GetCollectionStatisticsRequest)) }) if err == nil { c.JSON(http.StatusOK, wrapperReturnRowCount(resp.(*milvuspb.GetCollectionStatisticsResponse).Stats)) @@ -346,14 +365,14 @@ func (h *HandlersV2) getCollectionStats(c *gin.Context, ctx *context.Context, an return resp, err } -func (h *HandlersV2) getCollectionLoadState(c *gin.Context, ctx *context.Context, anyReq any, dbName string) (interface{}, error) { +func (h *HandlersV2) getCollectionLoadState(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) { collectionGetter, _ := anyReq.(requestutil.CollectionNameGetter) req := &milvuspb.GetLoadStateRequest{ DbName: dbName, CollectionName: collectionGetter.GetCollectionName(), } - resp, err := wrapperProxy(c, ctx, req, h.checkAuth, false, func(reqCtx *context.Context, req any) (any, error) { - return h.proxy.GetLoadState(*reqCtx, req.(*milvuspb.GetLoadStateRequest)) + resp, err := wrapperProxy(ctx, c, req, h.checkAuth, false, func(reqCtx context.Context, req any) (any, error) { + return h.proxy.GetLoadState(reqCtx, req.(*milvuspb.GetLoadStateRequest)) }) if err != nil { return resp, err @@ -374,8 +393,8 @@ func (h *HandlersV2) getCollectionLoadState(c *gin.Context, ctx *context.Context PartitionNames: partitionsGetter.GetPartitionNames(), DbName: dbName, } - progressResp, err := wrapperProxy(c, ctx, progressReq, h.checkAuth, true, func(reqCtx *context.Context, req any) (any, error) { - return h.proxy.GetLoadingProgress(*reqCtx, req.(*milvuspb.GetLoadingProgressRequest)) + progressResp, err := wrapperProxy(ctx, c, progressReq, h.checkAuth, true, func(reqCtx context.Context, req any) (any, error) { + return h.proxy.GetLoadingProgress(reqCtx, req.(*milvuspb.GetLoadingProgressRequest)) }) progress := int64(-1) if err == nil { @@ -392,14 +411,14 @@ func (h *HandlersV2) getCollectionLoadState(c *gin.Context, ctx *context.Context return resp, err } -func (h *HandlersV2) dropCollection(c *gin.Context, ctx *context.Context, anyReq any, dbName string) (interface{}, error) { +func (h *HandlersV2) dropCollection(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) { getter, _ := anyReq.(requestutil.CollectionNameGetter) req := &milvuspb.DropCollectionRequest{ DbName: dbName, CollectionName: getter.GetCollectionName(), } - resp, err := wrapperProxy(c, ctx, req, h.checkAuth, false, func(reqCtx *context.Context, req any) (interface{}, error) { - return h.proxy.DropCollection(*reqCtx, req.(*milvuspb.DropCollectionRequest)) + resp, err := wrapperProxy(ctx, c, req, h.checkAuth, false, func(reqCtx context.Context, req any) (interface{}, error) { + return h.proxy.DropCollection(reqCtx, req.(*milvuspb.DropCollectionRequest)) }) if err == nil { c.JSON(http.StatusOK, wrapperReturnDefault()) @@ -407,7 +426,7 @@ func (h *HandlersV2) dropCollection(c *gin.Context, ctx *context.Context, anyReq return resp, err } -func (h *HandlersV2) renameCollection(c *gin.Context, ctx *context.Context, anyReq any, dbName string) (interface{}, error) { +func (h *HandlersV2) renameCollection(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) { httpReq := anyReq.(*RenameCollectionReq) req := &milvuspb.RenameCollectionRequest{ DbName: dbName, @@ -415,8 +434,8 @@ func (h *HandlersV2) renameCollection(c *gin.Context, ctx *context.Context, anyR NewName: httpReq.NewCollectionName, NewDBName: dbName, } - resp, err := wrapperProxy(c, ctx, req, h.checkAuth, false, func(reqCtx *context.Context, req any) (interface{}, error) { - return h.proxy.RenameCollection(*reqCtx, req.(*milvuspb.RenameCollectionRequest)) + resp, err := wrapperProxy(ctx, c, req, h.checkAuth, false, func(reqCtx context.Context, req any) (interface{}, error) { + return h.proxy.RenameCollection(reqCtx, req.(*milvuspb.RenameCollectionRequest)) }) if err == nil { c.JSON(http.StatusOK, wrapperReturnDefault()) @@ -424,14 +443,14 @@ func (h *HandlersV2) renameCollection(c *gin.Context, ctx *context.Context, anyR return resp, err } -func (h *HandlersV2) loadCollection(c *gin.Context, ctx *context.Context, anyReq any, dbName string) (interface{}, error) { +func (h *HandlersV2) loadCollection(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) { getter, _ := anyReq.(requestutil.CollectionNameGetter) req := &milvuspb.LoadCollectionRequest{ DbName: dbName, CollectionName: getter.GetCollectionName(), } - resp, err := wrapperProxy(c, ctx, req, h.checkAuth, false, func(reqCtx *context.Context, req any) (interface{}, error) { - return h.proxy.LoadCollection(*reqCtx, req.(*milvuspb.LoadCollectionRequest)) + resp, err := wrapperProxy(ctx, c, req, h.checkAuth, false, func(reqCtx context.Context, req any) (interface{}, error) { + return h.proxy.LoadCollection(reqCtx, req.(*milvuspb.LoadCollectionRequest)) }) if err == nil { c.JSON(http.StatusOK, wrapperReturnDefault()) @@ -439,14 +458,14 @@ func (h *HandlersV2) loadCollection(c *gin.Context, ctx *context.Context, anyReq return resp, err } -func (h *HandlersV2) releaseCollection(c *gin.Context, ctx *context.Context, anyReq any, dbName string) (interface{}, error) { +func (h *HandlersV2) releaseCollection(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) { getter, _ := anyReq.(requestutil.CollectionNameGetter) req := &milvuspb.ReleaseCollectionRequest{ DbName: dbName, CollectionName: getter.GetCollectionName(), } - resp, err := wrapperProxy(c, ctx, req, h.checkAuth, false, func(reqCtx *context.Context, req any) (interface{}, error) { - return h.proxy.ReleaseCollection(*reqCtx, req.(*milvuspb.ReleaseCollectionRequest)) + resp, err := wrapperProxy(ctx, c, req, h.checkAuth, false, func(reqCtx context.Context, req any) (interface{}, error) { + return h.proxy.ReleaseCollection(reqCtx, req.(*milvuspb.ReleaseCollectionRequest)) }) if err == nil { c.JSON(http.StatusOK, wrapperReturnDefault()) @@ -454,7 +473,7 @@ func (h *HandlersV2) releaseCollection(c *gin.Context, ctx *context.Context, any return resp, err } -func (h *HandlersV2) query(c *gin.Context, ctx *context.Context, anyReq any, dbName string) (interface{}, error) { +func (h *HandlersV2) query(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) { httpReq := anyReq.(*QueryReqV2) req := &milvuspb.QueryRequest{ DbName: dbName, @@ -471,15 +490,15 @@ func (h *HandlersV2) query(c *gin.Context, ctx *context.Context, anyReq any, dbN if httpReq.Limit > 0 { req.QueryParams = append(req.QueryParams, &commonpb.KeyValuePair{Key: ParamLimit, Value: strconv.FormatInt(int64(httpReq.Limit), 10)}) } - resp, err := wrapperProxy(c, ctx, req, h.checkAuth, false, func(reqCtx *context.Context, req any) (interface{}, error) { - return h.proxy.Query(*reqCtx, req.(*milvuspb.QueryRequest)) + resp, err := wrapperProxy(ctx, c, req, h.checkAuth, false, func(reqCtx context.Context, req any) (interface{}, error) { + return h.proxy.Query(reqCtx, req.(*milvuspb.QueryRequest)) }) if err == nil { queryResp := resp.(*milvuspb.QueryResults) allowJS, _ := strconv.ParseBool(c.Request.Header.Get(HTTPHeaderAllowInt64)) outputData, err := buildQueryResp(int64(0), queryResp.OutputFields, queryResp.FieldsData, nil, nil, allowJS) if err != nil { - log.Warn("high level restful api, fail to deal with query result", zap.Any("response", resp), zap.Error(err)) + log.Ctx(ctx).Warn("high level restful api, fail to deal with query result", zap.Any("response", resp), zap.Error(err)) c.JSON(http.StatusOK, gin.H{ HTTPReturnCode: merr.Code(merr.ErrInvalidSearchResult), HTTPReturnMessage: merr.ErrInvalidSearchResult.Error() + ", error: " + err.Error(), @@ -491,9 +510,9 @@ func (h *HandlersV2) query(c *gin.Context, ctx *context.Context, anyReq any, dbN return resp, err } -func (h *HandlersV2) get(c *gin.Context, ctx *context.Context, anyReq any, dbName string) (interface{}, error) { +func (h *HandlersV2) get(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) { httpReq := anyReq.(*CollectionIDOutputReq) - collSchema, err := h.GetCollectionSchema(c, ctx, dbName, httpReq.CollectionName) + collSchema, err := h.GetCollectionSchema(ctx, c, dbName, httpReq.CollectionName) if err != nil { return nil, err } @@ -514,15 +533,15 @@ func (h *HandlersV2) get(c *gin.Context, ctx *context.Context, anyReq any, dbNam GuaranteeTimestamp: BoundedTimestamp, Expr: filter, } - resp, err := wrapperProxy(c, ctx, req, h.checkAuth, false, func(reqCtx *context.Context, req any) (interface{}, error) { - return h.proxy.Query(*reqCtx, req.(*milvuspb.QueryRequest)) + resp, err := wrapperProxy(ctx, c, req, h.checkAuth, false, func(reqCtx context.Context, req any) (interface{}, error) { + return h.proxy.Query(reqCtx, req.(*milvuspb.QueryRequest)) }) if err == nil { queryResp := resp.(*milvuspb.QueryResults) allowJS, _ := strconv.ParseBool(c.Request.Header.Get(HTTPHeaderAllowInt64)) outputData, err := buildQueryResp(int64(0), queryResp.OutputFields, queryResp.FieldsData, nil, nil, allowJS) if err != nil { - log.Warn("high level restful api, fail to deal with get result", zap.Any("response", resp), zap.Error(err)) + log.Ctx(ctx).Warn("high level restful api, fail to deal with get result", zap.Any("response", resp), zap.Error(err)) c.JSON(http.StatusOK, gin.H{ HTTPReturnCode: merr.Code(merr.ErrInvalidSearchResult), HTTPReturnMessage: merr.ErrInvalidSearchResult.Error() + ", error: " + err.Error(), @@ -534,9 +553,9 @@ func (h *HandlersV2) get(c *gin.Context, ctx *context.Context, anyReq any, dbNam return resp, err } -func (h *HandlersV2) delete(c *gin.Context, ctx *context.Context, anyReq any, dbName string) (interface{}, error) { +func (h *HandlersV2) delete(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) { httpReq := anyReq.(*CollectionIDFilterReq) - collSchema, err := h.GetCollectionSchema(c, ctx, dbName, httpReq.CollectionName) + collSchema, err := h.GetCollectionSchema(ctx, c, dbName, httpReq.CollectionName) if err != nil { return nil, err } @@ -558,8 +577,8 @@ func (h *HandlersV2) delete(c *gin.Context, ctx *context.Context, anyReq any, db } req.Expr = filter } - resp, err := wrapperProxy(c, ctx, req, h.checkAuth, false, func(reqCtx *context.Context, req any) (interface{}, error) { - return h.proxy.Delete(*reqCtx, req.(*milvuspb.DeleteRequest)) + resp, err := wrapperProxy(ctx, c, req, h.checkAuth, false, func(reqCtx context.Context, req any) (interface{}, error) { + return h.proxy.Delete(reqCtx, req.(*milvuspb.DeleteRequest)) }) if err == nil { c.JSON(http.StatusOK, wrapperReturnDefault()) @@ -567,16 +586,16 @@ func (h *HandlersV2) delete(c *gin.Context, ctx *context.Context, anyReq any, db return resp, err } -func (h *HandlersV2) insert(c *gin.Context, ctx *context.Context, anyReq any, dbName string) (interface{}, error) { +func (h *HandlersV2) insert(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) { httpReq := anyReq.(*CollectionDataReq) - collSchema, err := h.GetCollectionSchema(c, ctx, dbName, httpReq.CollectionName) + collSchema, err := h.GetCollectionSchema(ctx, c, dbName, httpReq.CollectionName) if err != nil { return nil, err } body, _ := c.Get(gin.BodyBytesKey) err, httpReq.Data = checkAndSetData(string(body.([]byte)), collSchema) if err != nil { - log.Warn("high level restful api, fail to deal with insert data", zap.Any("body", body), zap.Error(err)) + log.Ctx(ctx).Warn("high level restful api, fail to deal with insert data", zap.Any("body", body), zap.Error(err)) c.AbortWithStatusJSON(http.StatusOK, gin.H{ HTTPReturnCode: merr.Code(merr.ErrInvalidInsertData), HTTPReturnMessage: merr.ErrInvalidInsertData.Error() + ", error: " + err.Error(), @@ -592,15 +611,15 @@ func (h *HandlersV2) insert(c *gin.Context, ctx *context.Context, anyReq any, db } req.FieldsData, err = anyToColumns(httpReq.Data, collSchema) if err != nil { - log.Warn("high level restful api, fail to deal with insert data", zap.Any("data", httpReq.Data), zap.Error(err)) + log.Ctx(ctx).Warn("high level restful api, fail to deal with insert data", zap.Any("data", httpReq.Data), zap.Error(err)) c.AbortWithStatusJSON(http.StatusOK, gin.H{ HTTPReturnCode: merr.Code(merr.ErrInvalidInsertData), HTTPReturnMessage: merr.ErrInvalidInsertData.Error() + ", error: " + err.Error(), }) return nil, err } - resp, err := wrapperProxy(c, ctx, req, h.checkAuth, false, func(reqCtx *context.Context, req any) (interface{}, error) { - return h.proxy.Insert(*reqCtx, req.(*milvuspb.InsertRequest)) + resp, err := wrapperProxy(ctx, c, req, h.checkAuth, false, func(reqCtx context.Context, req any) (interface{}, error) { + return h.proxy.Insert(reqCtx, req.(*milvuspb.InsertRequest)) }) if err == nil { insertResp := resp.(*milvuspb.MutationResult) @@ -624,9 +643,9 @@ func (h *HandlersV2) insert(c *gin.Context, ctx *context.Context, anyReq any, db return resp, err } -func (h *HandlersV2) upsert(c *gin.Context, ctx *context.Context, anyReq any, dbName string) (interface{}, error) { +func (h *HandlersV2) upsert(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) { httpReq := anyReq.(*CollectionDataReq) - collSchema, err := h.GetCollectionSchema(c, ctx, dbName, httpReq.CollectionName) + collSchema, err := h.GetCollectionSchema(ctx, c, dbName, httpReq.CollectionName) if err != nil { return nil, err } @@ -638,7 +657,7 @@ func (h *HandlersV2) upsert(c *gin.Context, ctx *context.Context, anyReq any, db body, _ := c.Get(gin.BodyBytesKey) err, httpReq.Data = checkAndSetData(string(body.([]byte)), collSchema) if err != nil { - log.Warn("high level restful api, fail to deal with upsert data", zap.Any("body", body), zap.Error(err)) + log.Ctx(ctx).Warn("high level restful api, fail to deal with upsert data", zap.Any("body", body), zap.Error(err)) c.AbortWithStatusJSON(http.StatusOK, gin.H{ HTTPReturnCode: merr.Code(merr.ErrInvalidInsertData), HTTPReturnMessage: merr.ErrInvalidInsertData.Error() + ", error: " + err.Error(), @@ -654,15 +673,15 @@ func (h *HandlersV2) upsert(c *gin.Context, ctx *context.Context, anyReq any, db } req.FieldsData, err = anyToColumns(httpReq.Data, collSchema) if err != nil { - log.Warn("high level restful api, fail to deal with upsert data", zap.Any("data", httpReq.Data), zap.Error(err)) + log.Ctx(ctx).Warn("high level restful api, fail to deal with upsert data", zap.Any("data", httpReq.Data), zap.Error(err)) c.AbortWithStatusJSON(http.StatusOK, gin.H{ HTTPReturnCode: merr.Code(merr.ErrInvalidInsertData), HTTPReturnMessage: merr.ErrInvalidInsertData.Error() + ", error: " + err.Error(), }) return nil, err } - resp, err := wrapperProxy(c, ctx, req, h.checkAuth, false, func(reqCtx *context.Context, req any) (interface{}, error) { - return h.proxy.Upsert(*reqCtx, req.(*milvuspb.UpsertRequest)) + resp, err := wrapperProxy(ctx, c, req, h.checkAuth, false, func(reqCtx context.Context, req any) (interface{}, error) { + return h.proxy.Upsert(reqCtx, req.(*milvuspb.UpsertRequest)) }) if err == nil { upsertResp := resp.(*milvuspb.MutationResult) @@ -686,7 +705,7 @@ func (h *HandlersV2) upsert(c *gin.Context, ctx *context.Context, anyReq any, db return resp, err } -func (h *HandlersV2) search(c *gin.Context, ctx *context.Context, anyReq any, dbName string) (interface{}, error) { +func (h *HandlersV2) search(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) { httpReq := anyReq.(*SearchReqV2) params := map[string]interface{}{ // auto generated mapping "level": int(commonpb.ConsistencyLevel_Bounded), @@ -696,7 +715,7 @@ func (h *HandlersV2) search(c *gin.Context, ctx *context.Context, anyReq any, db rangeFilter, rangeFilterOk := httpReq.Params[ParamRangeFilter] if rangeFilterOk { if !radiusOk { - log.Warn("high level restful api, search params invalid, because only " + ParamRangeFilter) + log.Ctx(ctx).Warn("high level restful api, search params invalid, because only " + ParamRangeFilter) c.AbortWithStatusJSON(http.StatusOK, gin.H{ HTTPReturnCode: merr.Code(merr.ErrIncorrectParameterFormat), HTTPReturnMessage: merr.ErrIncorrectParameterFormat.Error() + ", error: invalid search params", @@ -728,8 +747,8 @@ func (h *HandlersV2) search(c *gin.Context, ctx *context.Context, anyReq any, db GuaranteeTimestamp: BoundedTimestamp, Nq: int64(1), } - resp, err := wrapperProxy(c, ctx, req, h.checkAuth, false, func(reqCtx *context.Context, req any) (interface{}, error) { - return h.proxy.Search(*reqCtx, req.(*milvuspb.SearchRequest)) + resp, err := wrapperProxy(ctx, c, req, h.checkAuth, false, func(reqCtx context.Context, req any) (interface{}, error) { + return h.proxy.Search(reqCtx, req.(*milvuspb.SearchRequest)) }) if err == nil { searchResp := resp.(*milvuspb.SearchResults) @@ -739,7 +758,7 @@ func (h *HandlersV2) search(c *gin.Context, ctx *context.Context, anyReq any, db allowJS, _ := strconv.ParseBool(c.Request.Header.Get(HTTPHeaderAllowInt64)) outputData, err := buildQueryResp(searchResp.Results.TopK, searchResp.Results.OutputFields, searchResp.Results.FieldsData, searchResp.Results.Ids, searchResp.Results.Scores, allowJS) if err != nil { - log.Warn("high level restful api, fail to deal with search result", zap.Any("result", searchResp.Results), zap.Error(err)) + log.Ctx(ctx).Warn("high level restful api, fail to deal with search result", zap.Any("result", searchResp.Results), zap.Error(err)) c.JSON(http.StatusOK, gin.H{ HTTPReturnCode: merr.Code(merr.ErrInvalidSearchResult), HTTPReturnMessage: merr.ErrInvalidSearchResult.Error() + ", error: " + err.Error(), @@ -752,7 +771,7 @@ func (h *HandlersV2) search(c *gin.Context, ctx *context.Context, anyReq any, db return resp, err } -func (h *HandlersV2) createCollection(c *gin.Context, ctx *context.Context, anyReq any, dbName string) (interface{}, error) { +func (h *HandlersV2) createCollection(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) { httpReq := anyReq.(*CollectionReq) var schema []byte vectorFieldNum := 0 @@ -836,7 +855,7 @@ func (h *HandlersV2) createCollection(c *gin.Context, ctx *context.Context, anyR schema, err = proto.Marshal(&collSchema) } if err != nil { - log.Warn("high level restful api, marshal collection schema fail", zap.Any("request", httpReq), zap.Error(err)) + log.Ctx(ctx).Warn("high level restful api, marshal collection schema fail", zap.Any("request", httpReq), zap.Error(err)) c.AbortWithStatusJSON(http.StatusOK, gin.H{ HTTPReturnCode: merr.Code(merr.ErrMarshalCollectionSchema), HTTPReturnMessage: merr.ErrMarshalCollectionSchema.Error() + ", error: " + err.Error(), @@ -850,8 +869,8 @@ func (h *HandlersV2) createCollection(c *gin.Context, ctx *context.Context, anyR ShardsNum: ShardNumDefault, ConsistencyLevel: commonpb.ConsistencyLevel_Bounded, } - resp, err := wrapperProxy(c, ctx, req, h.checkAuth, false, func(reqCtx *context.Context, req any) (interface{}, error) { - return h.proxy.CreateCollection(*reqCtx, req.(*milvuspb.CreateCollectionRequest)) + resp, err := wrapperProxy(ctx, c, req, h.checkAuth, false, func(reqCtx context.Context, req any) (interface{}, error) { + return h.proxy.CreateCollection(reqCtx, req.(*milvuspb.CreateCollectionRequest)) }) if err != nil { return resp, err @@ -868,8 +887,8 @@ func (h *HandlersV2) createCollection(c *gin.Context, ctx *context.Context, anyR IndexName: VectorFieldName, ExtraParams: []*commonpb.KeyValuePair{{Key: common.MetricTypeKey, Value: httpReq.MetricsType}}, } - statusResponse, err := wrapperProxy(c, ctx, createIndexReq, h.checkAuth, false, func(reqCtx *context.Context, req any) (interface{}, error) { - return h.proxy.CreateIndex(*ctx, req.(*milvuspb.CreateIndexRequest)) + statusResponse, err := wrapperProxy(ctx, c, createIndexReq, h.checkAuth, false, func(reqCtx context.Context, req any) (interface{}, error) { + return h.proxy.CreateIndex(ctx, req.(*milvuspb.CreateIndexRequest)) }) if err != nil { return statusResponse, err @@ -883,8 +902,8 @@ func (h *HandlersV2) createCollection(c *gin.Context, ctx *context.Context, anyR IndexName: indexParam.IndexName, ExtraParams: []*commonpb.KeyValuePair{{Key: common.MetricTypeKey, Value: indexParam.MetricsType}}, } - statusResponse, err := wrapperProxy(c, ctx, createIndexReq, h.checkAuth, false, func(reqCtx *context.Context, req any) (interface{}, error) { - return h.proxy.CreateIndex(*ctx, req.(*milvuspb.CreateIndexRequest)) + statusResponse, err := wrapperProxy(ctx, c, createIndexReq, h.checkAuth, false, func(reqCtx context.Context, req any) (interface{}, error) { + return h.proxy.CreateIndex(ctx, req.(*milvuspb.CreateIndexRequest)) }) if err != nil { return statusResponse, err @@ -895,8 +914,8 @@ func (h *HandlersV2) createCollection(c *gin.Context, ctx *context.Context, anyR DbName: dbName, CollectionName: httpReq.CollectionName, } - statusResponse, err := wrapperProxy(c, ctx, loadReq, h.checkAuth, false, func(reqCtx *context.Context, req any) (interface{}, error) { - return h.proxy.LoadCollection(*ctx, req.(*milvuspb.LoadCollectionRequest)) + statusResponse, err := wrapperProxy(ctx, c, loadReq, h.checkAuth, false, func(reqCtx context.Context, req any) (interface{}, error) { + return h.proxy.LoadCollection(ctx, req.(*milvuspb.LoadCollectionRequest)) }) if err == nil { c.JSON(http.StatusOK, wrapperReturnDefault()) @@ -904,14 +923,14 @@ func (h *HandlersV2) createCollection(c *gin.Context, ctx *context.Context, anyR return statusResponse, err } -func (h *HandlersV2) listPartitions(c *gin.Context, ctx *context.Context, anyReq any, dbName string) (interface{}, error) { +func (h *HandlersV2) listPartitions(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) { collectionGetter, _ := anyReq.(requestutil.CollectionNameGetter) req := &milvuspb.ShowPartitionsRequest{ DbName: dbName, CollectionName: collectionGetter.GetCollectionName(), } - resp, err := wrapperProxy(c, ctx, req, h.checkAuth, false, func(reqCtx *context.Context, req any) (interface{}, error) { - return h.proxy.ShowPartitions(*reqCtx, req.(*milvuspb.ShowPartitionsRequest)) + resp, err := wrapperProxy(ctx, c, req, h.checkAuth, false, func(reqCtx context.Context, req any) (interface{}, error) { + return h.proxy.ShowPartitions(reqCtx, req.(*milvuspb.ShowPartitionsRequest)) }) if err == nil { c.JSON(http.StatusOK, wrapperReturnList(resp.(*milvuspb.ShowPartitionsResponse).PartitionNames)) @@ -919,7 +938,7 @@ func (h *HandlersV2) listPartitions(c *gin.Context, ctx *context.Context, anyReq return resp, err } -func (h *HandlersV2) hasPartitions(c *gin.Context, ctx *context.Context, anyReq any, dbName string) (interface{}, error) { +func (h *HandlersV2) hasPartitions(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) { collectionGetter, _ := anyReq.(requestutil.CollectionNameGetter) partitionGetter, _ := anyReq.(requestutil.PartitionNameGetter) req := &milvuspb.HasPartitionRequest{ @@ -927,8 +946,8 @@ func (h *HandlersV2) hasPartitions(c *gin.Context, ctx *context.Context, anyReq CollectionName: collectionGetter.GetCollectionName(), PartitionName: partitionGetter.GetPartitionName(), } - resp, err := wrapperProxy(c, ctx, req, h.checkAuth, false, func(reqCtx *context.Context, req any) (interface{}, error) { - return h.proxy.HasPartition(*reqCtx, req.(*milvuspb.HasPartitionRequest)) + resp, err := wrapperProxy(ctx, c, req, h.checkAuth, false, func(reqCtx context.Context, req any) (interface{}, error) { + return h.proxy.HasPartition(reqCtx, req.(*milvuspb.HasPartitionRequest)) }) if err == nil { c.JSON(http.StatusOK, wrapperReturnHas(resp.(*milvuspb.BoolResponse).Value)) @@ -938,7 +957,7 @@ func (h *HandlersV2) hasPartitions(c *gin.Context, ctx *context.Context, anyReq // data coord will collect partitions' row_count // proxy grpc call only support partition not partitions -func (h *HandlersV2) statsPartition(c *gin.Context, ctx *context.Context, anyReq any, dbName string) (interface{}, error) { +func (h *HandlersV2) statsPartition(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) { collectionGetter, _ := anyReq.(requestutil.CollectionNameGetter) partitionGetter, _ := anyReq.(requestutil.PartitionNameGetter) req := &milvuspb.GetPartitionStatisticsRequest{ @@ -946,8 +965,8 @@ func (h *HandlersV2) statsPartition(c *gin.Context, ctx *context.Context, anyReq CollectionName: collectionGetter.GetCollectionName(), PartitionName: partitionGetter.GetPartitionName(), } - resp, err := wrapperProxy(c, ctx, req, h.checkAuth, false, func(reqCtx *context.Context, req any) (interface{}, error) { - return h.proxy.GetPartitionStatistics(*reqCtx, req.(*milvuspb.GetPartitionStatisticsRequest)) + resp, err := wrapperProxy(ctx, c, req, h.checkAuth, false, func(reqCtx context.Context, req any) (interface{}, error) { + return h.proxy.GetPartitionStatistics(reqCtx, req.(*milvuspb.GetPartitionStatisticsRequest)) }) if err == nil { c.JSON(http.StatusOK, wrapperReturnRowCount(resp.(*milvuspb.GetPartitionStatisticsResponse).Stats)) @@ -955,7 +974,7 @@ func (h *HandlersV2) statsPartition(c *gin.Context, ctx *context.Context, anyReq return resp, err } -func (h *HandlersV2) createPartition(c *gin.Context, ctx *context.Context, anyReq any, dbName string) (interface{}, error) { +func (h *HandlersV2) createPartition(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) { collectionGetter, _ := anyReq.(requestutil.CollectionNameGetter) partitionGetter, _ := anyReq.(requestutil.PartitionNameGetter) req := &milvuspb.CreatePartitionRequest{ @@ -963,8 +982,8 @@ func (h *HandlersV2) createPartition(c *gin.Context, ctx *context.Context, anyRe CollectionName: collectionGetter.GetCollectionName(), PartitionName: partitionGetter.GetPartitionName(), } - resp, err := wrapperProxy(c, ctx, req, h.checkAuth, false, func(reqCtx *context.Context, req any) (interface{}, error) { - return h.proxy.CreatePartition(*reqCtx, req.(*milvuspb.CreatePartitionRequest)) + resp, err := wrapperProxy(ctx, c, req, h.checkAuth, false, func(reqCtx context.Context, req any) (interface{}, error) { + return h.proxy.CreatePartition(reqCtx, req.(*milvuspb.CreatePartitionRequest)) }) if err == nil { c.JSON(http.StatusOK, wrapperReturnDefault()) @@ -972,7 +991,7 @@ func (h *HandlersV2) createPartition(c *gin.Context, ctx *context.Context, anyRe return resp, err } -func (h *HandlersV2) dropPartition(c *gin.Context, ctx *context.Context, anyReq any, dbName string) (interface{}, error) { +func (h *HandlersV2) dropPartition(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) { collectionGetter, _ := anyReq.(requestutil.CollectionNameGetter) partitionGetter, _ := anyReq.(requestutil.PartitionNameGetter) req := &milvuspb.DropPartitionRequest{ @@ -980,8 +999,8 @@ func (h *HandlersV2) dropPartition(c *gin.Context, ctx *context.Context, anyReq CollectionName: collectionGetter.GetCollectionName(), PartitionName: partitionGetter.GetPartitionName(), } - resp, err := wrapperProxy(c, ctx, req, h.checkAuth, false, func(reqCtx *context.Context, req any) (interface{}, error) { - return h.proxy.DropPartition(*reqCtx, req.(*milvuspb.DropPartitionRequest)) + resp, err := wrapperProxy(ctx, c, req, h.checkAuth, false, func(reqCtx context.Context, req any) (interface{}, error) { + return h.proxy.DropPartition(reqCtx, req.(*milvuspb.DropPartitionRequest)) }) if err == nil { c.JSON(http.StatusOK, wrapperReturnDefault()) @@ -989,15 +1008,15 @@ func (h *HandlersV2) dropPartition(c *gin.Context, ctx *context.Context, anyReq return resp, err } -func (h *HandlersV2) loadPartitions(c *gin.Context, ctx *context.Context, anyReq any, dbName string) (interface{}, error) { +func (h *HandlersV2) loadPartitions(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) { httpReq := anyReq.(*PartitionsReq) req := &milvuspb.LoadPartitionsRequest{ DbName: dbName, CollectionName: httpReq.CollectionName, PartitionNames: httpReq.PartitionNames, } - resp, err := wrapperProxy(c, ctx, req, h.checkAuth, false, func(reqCtx *context.Context, req any) (interface{}, error) { - return h.proxy.LoadPartitions(*reqCtx, req.(*milvuspb.LoadPartitionsRequest)) + resp, err := wrapperProxy(ctx, c, req, h.checkAuth, false, func(reqCtx context.Context, req any) (interface{}, error) { + return h.proxy.LoadPartitions(reqCtx, req.(*milvuspb.LoadPartitionsRequest)) }) if err == nil { c.JSON(http.StatusOK, wrapperReturnDefault()) @@ -1005,15 +1024,15 @@ func (h *HandlersV2) loadPartitions(c *gin.Context, ctx *context.Context, anyReq return resp, err } -func (h *HandlersV2) releasePartitions(c *gin.Context, ctx *context.Context, anyReq any, dbName string) (interface{}, error) { +func (h *HandlersV2) releasePartitions(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) { httpReq := anyReq.(*PartitionsReq) req := &milvuspb.ReleasePartitionsRequest{ DbName: dbName, CollectionName: httpReq.CollectionName, PartitionNames: httpReq.PartitionNames, } - resp, err := wrapperProxy(c, ctx, req, h.checkAuth, false, func(reqCtx *context.Context, req any) (interface{}, error) { - return h.proxy.ReleasePartitions(*reqCtx, req.(*milvuspb.ReleasePartitionsRequest)) + resp, err := wrapperProxy(ctx, c, req, h.checkAuth, false, func(reqCtx context.Context, req any) (interface{}, error) { + return h.proxy.ReleasePartitions(reqCtx, req.(*milvuspb.ReleasePartitionsRequest)) }) if err == nil { c.JSON(http.StatusOK, wrapperReturnDefault()) @@ -1021,10 +1040,10 @@ func (h *HandlersV2) releasePartitions(c *gin.Context, ctx *context.Context, any return resp, err } -func (h *HandlersV2) listUsers(c *gin.Context, ctx *context.Context, anyReq any, dbName string) (interface{}, error) { +func (h *HandlersV2) listUsers(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) { req := &milvuspb.ListCredUsersRequest{} - resp, err := wrapperProxy(c, ctx, req, h.checkAuth, false, func(reqCtx *context.Context, req any) (interface{}, error) { - return h.proxy.ListCredUsers(*reqCtx, req.(*milvuspb.ListCredUsersRequest)) + resp, err := wrapperProxy(ctx, c, req, h.checkAuth, false, func(reqCtx context.Context, req any) (interface{}, error) { + return h.proxy.ListCredUsers(reqCtx, req.(*milvuspb.ListCredUsersRequest)) }) if err == nil { c.JSON(http.StatusOK, wrapperReturnList(resp.(*milvuspb.ListCredUsersResponse).Usernames)) @@ -1032,7 +1051,7 @@ func (h *HandlersV2) listUsers(c *gin.Context, ctx *context.Context, anyReq any, return resp, err } -func (h *HandlersV2) describeUser(c *gin.Context, ctx *context.Context, anyReq any, dbName string) (interface{}, error) { +func (h *HandlersV2) describeUser(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) { userNameGetter, _ := anyReq.(UserNameGetter) userName := userNameGetter.GetUserName() req := &milvuspb.SelectUserRequest{ @@ -1041,8 +1060,8 @@ func (h *HandlersV2) describeUser(c *gin.Context, ctx *context.Context, anyReq a }, IncludeRoleInfo: true, } - resp, err := wrapperProxy(c, ctx, req, h.checkAuth, false, func(reqCtx *context.Context, req any) (interface{}, error) { - return h.proxy.SelectUser(*reqCtx, req.(*milvuspb.SelectUserRequest)) + resp, err := wrapperProxy(ctx, c, req, h.checkAuth, false, func(reqCtx context.Context, req any) (interface{}, error) { + return h.proxy.SelectUser(reqCtx, req.(*milvuspb.SelectUserRequest)) }) if err == nil { roleNames := []string{} @@ -1058,14 +1077,14 @@ func (h *HandlersV2) describeUser(c *gin.Context, ctx *context.Context, anyReq a return resp, err } -func (h *HandlersV2) createUser(c *gin.Context, ctx *context.Context, anyReq any, dbName string) (interface{}, error) { +func (h *HandlersV2) createUser(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) { httpReq := anyReq.(*PasswordReq) req := &milvuspb.CreateCredentialRequest{ Username: httpReq.UserName, Password: httpReq.Password, } - resp, err := wrapperProxy(c, ctx, req, h.checkAuth, false, func(reqCtx *context.Context, req any) (interface{}, error) { - return h.proxy.CreateCredential(*reqCtx, req.(*milvuspb.CreateCredentialRequest)) + resp, err := wrapperProxy(ctx, c, req, h.checkAuth, false, func(reqCtx context.Context, req any) (interface{}, error) { + return h.proxy.CreateCredential(reqCtx, req.(*milvuspb.CreateCredentialRequest)) }) if err == nil { c.JSON(http.StatusOK, wrapperReturnDefault()) @@ -1073,15 +1092,15 @@ func (h *HandlersV2) createUser(c *gin.Context, ctx *context.Context, anyReq any return resp, err } -func (h *HandlersV2) updateUser(c *gin.Context, ctx *context.Context, anyReq any, dbName string) (interface{}, error) { +func (h *HandlersV2) updateUser(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) { httpReq := anyReq.(*NewPasswordReq) req := &milvuspb.UpdateCredentialRequest{ Username: httpReq.UserName, OldPassword: httpReq.Password, NewPassword: httpReq.NewPassword, } - resp, err := wrapperProxy(c, ctx, req, h.checkAuth, false, func(reqCtx *context.Context, req any) (interface{}, error) { - return h.proxy.UpdateCredential(*reqCtx, req.(*milvuspb.UpdateCredentialRequest)) + resp, err := wrapperProxy(ctx, c, req, h.checkAuth, false, func(reqCtx context.Context, req any) (interface{}, error) { + return h.proxy.UpdateCredential(reqCtx, req.(*milvuspb.UpdateCredentialRequest)) }) if err == nil { c.JSON(http.StatusOK, wrapperReturnDefault()) @@ -1089,13 +1108,13 @@ func (h *HandlersV2) updateUser(c *gin.Context, ctx *context.Context, anyReq any return resp, err } -func (h *HandlersV2) dropUser(c *gin.Context, ctx *context.Context, anyReq any, dbName string) (interface{}, error) { +func (h *HandlersV2) dropUser(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) { getter, _ := anyReq.(UserNameGetter) req := &milvuspb.DeleteCredentialRequest{ Username: getter.GetUserName(), } - resp, err := wrapperProxy(c, ctx, req, h.checkAuth, false, func(reqCtx *context.Context, req any) (interface{}, error) { - return h.proxy.DeleteCredential(*reqCtx, req.(*milvuspb.DeleteCredentialRequest)) + resp, err := wrapperProxy(ctx, c, req, h.checkAuth, false, func(reqCtx context.Context, req any) (interface{}, error) { + return h.proxy.DeleteCredential(reqCtx, req.(*milvuspb.DeleteCredentialRequest)) }) if err == nil { c.JSON(http.StatusOK, wrapperReturnDefault()) @@ -1103,14 +1122,14 @@ func (h *HandlersV2) dropUser(c *gin.Context, ctx *context.Context, anyReq any, return resp, err } -func (h *HandlersV2) operateRoleToUser(c *gin.Context, ctx *context.Context, userName, roleName string, operateType milvuspb.OperateUserRoleType) (interface{}, error) { +func (h *HandlersV2) operateRoleToUser(ctx context.Context, c *gin.Context, userName, roleName string, operateType milvuspb.OperateUserRoleType) (interface{}, error) { req := &milvuspb.OperateUserRoleRequest{ Username: userName, RoleName: roleName, Type: operateType, } - resp, err := wrapperProxy(c, ctx, req, h.checkAuth, false, func(reqCtx *context.Context, req any) (interface{}, error) { - return h.proxy.OperateUserRole(*reqCtx, req.(*milvuspb.OperateUserRoleRequest)) + resp, err := wrapperProxy(ctx, c, req, h.checkAuth, false, func(reqCtx context.Context, req any) (interface{}, error) { + return h.proxy.OperateUserRole(reqCtx, req.(*milvuspb.OperateUserRoleRequest)) }) if err == nil { c.JSON(http.StatusOK, wrapperReturnDefault()) @@ -1118,18 +1137,18 @@ func (h *HandlersV2) operateRoleToUser(c *gin.Context, ctx *context.Context, use return resp, err } -func (h *HandlersV2) addRoleToUser(c *gin.Context, ctx *context.Context, anyReq any, dbName string) (interface{}, error) { - return h.operateRoleToUser(c, ctx, anyReq.(*UserRoleReq).UserName, anyReq.(*UserRoleReq).RoleName, milvuspb.OperateUserRoleType_AddUserToRole) +func (h *HandlersV2) addRoleToUser(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) { + return h.operateRoleToUser(ctx, c, anyReq.(*UserRoleReq).UserName, anyReq.(*UserRoleReq).RoleName, milvuspb.OperateUserRoleType_AddUserToRole) } -func (h *HandlersV2) removeRoleFromUser(c *gin.Context, ctx *context.Context, anyReq any, dbName string) (interface{}, error) { - return h.operateRoleToUser(c, ctx, anyReq.(*UserRoleReq).UserName, anyReq.(*UserRoleReq).RoleName, milvuspb.OperateUserRoleType_RemoveUserFromRole) +func (h *HandlersV2) removeRoleFromUser(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) { + return h.operateRoleToUser(ctx, c, anyReq.(*UserRoleReq).UserName, anyReq.(*UserRoleReq).RoleName, milvuspb.OperateUserRoleType_RemoveUserFromRole) } -func (h *HandlersV2) listRoles(c *gin.Context, ctx *context.Context, anyReq any, dbName string) (interface{}, error) { +func (h *HandlersV2) listRoles(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) { req := &milvuspb.SelectRoleRequest{} - resp, err := wrapperProxy(c, ctx, req, h.checkAuth, false, func(reqCtx *context.Context, req any) (interface{}, error) { - return h.proxy.SelectRole(*reqCtx, req.(*milvuspb.SelectRoleRequest)) + resp, err := wrapperProxy(ctx, c, req, h.checkAuth, false, func(reqCtx context.Context, req any) (interface{}, error) { + return h.proxy.SelectRole(reqCtx, req.(*milvuspb.SelectRoleRequest)) }) if err == nil { roleNames := []string{} @@ -1141,13 +1160,13 @@ func (h *HandlersV2) listRoles(c *gin.Context, ctx *context.Context, anyReq any, return resp, err } -func (h *HandlersV2) describeRole(c *gin.Context, ctx *context.Context, anyReq any, dbName string) (interface{}, error) { +func (h *HandlersV2) describeRole(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) { getter, _ := anyReq.(RoleNameGetter) req := &milvuspb.SelectGrantRequest{ Entity: &milvuspb.GrantEntity{Role: &milvuspb.RoleEntity{Name: getter.GetRoleName()}}, } - resp, err := wrapperProxy(c, ctx, req, h.checkAuth, false, func(reqCtx *context.Context, req any) (interface{}, error) { - return h.proxy.SelectGrant(*reqCtx, req.(*milvuspb.SelectGrantRequest)) + resp, err := wrapperProxy(ctx, c, req, h.checkAuth, false, func(reqCtx context.Context, req any) (interface{}, error) { + return h.proxy.SelectGrant(reqCtx, req.(*milvuspb.SelectGrantRequest)) }) if err == nil { privileges := [](map[string]string){} @@ -1166,13 +1185,13 @@ func (h *HandlersV2) describeRole(c *gin.Context, ctx *context.Context, anyReq a return resp, err } -func (h *HandlersV2) createRole(c *gin.Context, ctx *context.Context, anyReq any, dbName string) (interface{}, error) { +func (h *HandlersV2) createRole(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) { getter, _ := anyReq.(RoleNameGetter) req := &milvuspb.CreateRoleRequest{ Entity: &milvuspb.RoleEntity{Name: getter.GetRoleName()}, } - resp, err := wrapperProxy(c, ctx, req, h.checkAuth, false, func(reqCtx *context.Context, req any) (interface{}, error) { - return h.proxy.CreateRole(*reqCtx, req.(*milvuspb.CreateRoleRequest)) + resp, err := wrapperProxy(ctx, c, req, h.checkAuth, false, func(reqCtx context.Context, req any) (interface{}, error) { + return h.proxy.CreateRole(reqCtx, req.(*milvuspb.CreateRoleRequest)) }) if err == nil { c.JSON(http.StatusOK, wrapperReturnDefault()) @@ -1180,13 +1199,13 @@ func (h *HandlersV2) createRole(c *gin.Context, ctx *context.Context, anyReq any return resp, err } -func (h *HandlersV2) dropRole(c *gin.Context, ctx *context.Context, anyReq any, dbName string) (interface{}, error) { +func (h *HandlersV2) dropRole(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) { getter, _ := anyReq.(RoleNameGetter) req := &milvuspb.DropRoleRequest{ RoleName: getter.GetRoleName(), } - resp, err := wrapperProxy(c, ctx, req, h.checkAuth, false, func(reqCtx *context.Context, req any) (interface{}, error) { - return h.proxy.DropRole(*reqCtx, req.(*milvuspb.DropRoleRequest)) + resp, err := wrapperProxy(ctx, c, req, h.checkAuth, false, func(reqCtx context.Context, req any) (interface{}, error) { + return h.proxy.DropRole(reqCtx, req.(*milvuspb.DropRoleRequest)) }) if err == nil { c.JSON(http.StatusOK, wrapperReturnDefault()) @@ -1194,7 +1213,7 @@ func (h *HandlersV2) dropRole(c *gin.Context, ctx *context.Context, anyReq any, return resp, err } -func (h *HandlersV2) operatePrivilegeToRole(c *gin.Context, ctx *context.Context, httpReq *GrantReq, operateType milvuspb.OperatePrivilegeType, dbName string) (interface{}, error) { +func (h *HandlersV2) operatePrivilegeToRole(ctx context.Context, c *gin.Context, httpReq *GrantReq, operateType milvuspb.OperatePrivilegeType, dbName string) (interface{}, error) { req := &milvuspb.OperatePrivilegeRequest{ Entity: &milvuspb.GrantEntity{ Role: &milvuspb.RoleEntity{Name: httpReq.RoleName}, @@ -1207,8 +1226,8 @@ func (h *HandlersV2) operatePrivilegeToRole(c *gin.Context, ctx *context.Context }, Type: operateType, } - resp, err := wrapperProxy(c, ctx, req, h.checkAuth, false, func(reqCtx *context.Context, req any) (interface{}, error) { - return h.proxy.OperatePrivilege(*reqCtx, req.(*milvuspb.OperatePrivilegeRequest)) + resp, err := wrapperProxy(ctx, c, req, h.checkAuth, false, func(reqCtx context.Context, req any) (interface{}, error) { + return h.proxy.OperatePrivilege(reqCtx, req.(*milvuspb.OperatePrivilegeRequest)) }) if err == nil { c.JSON(http.StatusOK, wrapperReturnDefault()) @@ -1216,23 +1235,23 @@ func (h *HandlersV2) operatePrivilegeToRole(c *gin.Context, ctx *context.Context return resp, err } -func (h *HandlersV2) addPrivilegeToRole(c *gin.Context, ctx *context.Context, anyReq any, dbName string) (interface{}, error) { - return h.operatePrivilegeToRole(c, ctx, anyReq.(*GrantReq), milvuspb.OperatePrivilegeType_Grant, dbName) +func (h *HandlersV2) addPrivilegeToRole(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) { + return h.operatePrivilegeToRole(ctx, c, anyReq.(*GrantReq), milvuspb.OperatePrivilegeType_Grant, dbName) } -func (h *HandlersV2) removePrivilegeFromRole(c *gin.Context, ctx *context.Context, anyReq any, dbName string) (interface{}, error) { - return h.operatePrivilegeToRole(c, ctx, anyReq.(*GrantReq), milvuspb.OperatePrivilegeType_Revoke, dbName) +func (h *HandlersV2) removePrivilegeFromRole(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) { + return h.operatePrivilegeToRole(ctx, c, anyReq.(*GrantReq), milvuspb.OperatePrivilegeType_Revoke, dbName) } -func (h *HandlersV2) listIndexes(c *gin.Context, ctx *context.Context, anyReq any, dbName string) (interface{}, error) { +func (h *HandlersV2) listIndexes(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) { collectionGetter, _ := anyReq.(requestutil.CollectionNameGetter) indexNames := []string{} req := &milvuspb.DescribeIndexRequest{ DbName: dbName, CollectionName: collectionGetter.GetCollectionName(), } - resp, err := wrapperProxy(c, ctx, req, false, false, func(reqCtx *context.Context, req any) (any, error) { - return h.proxy.DescribeIndex(*reqCtx, req.(*milvuspb.DescribeIndexRequest)) + resp, err := wrapperProxy(ctx, c, req, false, false, func(reqCtx context.Context, req any) (any, error) { + return h.proxy.DescribeIndex(reqCtx, req.(*milvuspb.DescribeIndexRequest)) }) if err != nil { return resp, err @@ -1244,7 +1263,7 @@ func (h *HandlersV2) listIndexes(c *gin.Context, ctx *context.Context, anyReq an return resp, err } -func (h *HandlersV2) describeIndex(c *gin.Context, ctx *context.Context, anyReq any, dbName string) (interface{}, error) { +func (h *HandlersV2) describeIndex(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) { collectionGetter, _ := anyReq.(requestutil.CollectionNameGetter) indexGetter, _ := anyReq.(IndexNameGetter) req := &milvuspb.DescribeIndexRequest{ @@ -1252,8 +1271,8 @@ func (h *HandlersV2) describeIndex(c *gin.Context, ctx *context.Context, anyReq CollectionName: collectionGetter.GetCollectionName(), IndexName: indexGetter.GetIndexName(), } - resp, err := wrapperProxy(c, ctx, req, false, false, func(reqCtx *context.Context, req any) (interface{}, error) { - return h.proxy.DescribeIndex(*reqCtx, req.(*milvuspb.DescribeIndexRequest)) + resp, err := wrapperProxy(ctx, c, req, false, false, func(reqCtx context.Context, req any) (interface{}, error) { + return h.proxy.DescribeIndex(reqCtx, req.(*milvuspb.DescribeIndexRequest)) }) if err == nil { indexInfos := [](map[string]any){} @@ -1285,7 +1304,7 @@ func (h *HandlersV2) describeIndex(c *gin.Context, ctx *context.Context, anyReq return resp, err } -func (h *HandlersV2) createIndex(c *gin.Context, ctx *context.Context, anyReq any, dbName string) (interface{}, error) { +func (h *HandlersV2) createIndex(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) { httpReq := anyReq.(*IndexParamReq) for _, indexParam := range httpReq.IndexParams { req := &milvuspb.CreateIndexRequest{ @@ -1300,18 +1319,18 @@ func (h *HandlersV2) createIndex(c *gin.Context, ctx *context.Context, anyReq an if indexParam.IndexType != "" { req.ExtraParams = append(req.ExtraParams, &commonpb.KeyValuePair{Key: common.IndexTypeKey, Value: indexParam.IndexType}) } - resp, err := wrapperProxy(c, ctx, req, false, false, func(reqCtx *context.Context, req any) (interface{}, error) { - return h.proxy.CreateIndex(*reqCtx, req.(*milvuspb.CreateIndexRequest)) + resp, err := wrapperProxy(ctx, c, req, false, false, func(reqCtx context.Context, req any) (interface{}, error) { + return h.proxy.CreateIndex(reqCtx, req.(*milvuspb.CreateIndexRequest)) }) if err != nil { return resp, err } } c.JSON(http.StatusOK, wrapperReturnDefault()) - return nil, nil + return httpReq.IndexParams, nil } -func (h *HandlersV2) dropIndex(c *gin.Context, ctx *context.Context, anyReq any, dbName string) (interface{}, error) { +func (h *HandlersV2) dropIndex(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) { collGetter, _ := anyReq.(requestutil.CollectionNameGetter) indexGetter, _ := anyReq.(IndexNameGetter) req := &milvuspb.DropIndexRequest{ @@ -1319,8 +1338,8 @@ func (h *HandlersV2) dropIndex(c *gin.Context, ctx *context.Context, anyReq any, CollectionName: collGetter.GetCollectionName(), IndexName: indexGetter.GetIndexName(), } - resp, err := wrapperProxy(c, ctx, req, false, false, func(reqCtx *context.Context, req any) (interface{}, error) { - return h.proxy.DropIndex(*reqCtx, req.(*milvuspb.DropIndexRequest)) + resp, err := wrapperProxy(ctx, c, req, false, false, func(reqCtx context.Context, req any) (interface{}, error) { + return h.proxy.DropIndex(reqCtx, req.(*milvuspb.DropIndexRequest)) }) if err == nil { c.JSON(http.StatusOK, wrapperReturnDefault()) @@ -1328,12 +1347,12 @@ func (h *HandlersV2) dropIndex(c *gin.Context, ctx *context.Context, anyReq any, return resp, err } -func (h *HandlersV2) listAlias(c *gin.Context, ctx *context.Context, anyReq any, dbName string) (interface{}, error) { +func (h *HandlersV2) listAlias(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) { req := &milvuspb.ListAliasesRequest{ DbName: dbName, } - resp, err := wrapperProxy(c, ctx, req, h.checkAuth, false, func(reqCtx *context.Context, req any) (interface{}, error) { - return h.proxy.ListAliases(*reqCtx, req.(*milvuspb.ListAliasesRequest)) + resp, err := wrapperProxy(ctx, c, req, h.checkAuth, false, func(reqCtx context.Context, req any) (interface{}, error) { + return h.proxy.ListAliases(reqCtx, req.(*milvuspb.ListAliasesRequest)) }) if err == nil { c.JSON(http.StatusOK, wrapperReturnList(resp.(*milvuspb.ListAliasesResponse).Aliases)) @@ -1341,14 +1360,14 @@ func (h *HandlersV2) listAlias(c *gin.Context, ctx *context.Context, anyReq any, return resp, err } -func (h *HandlersV2) describeAlias(c *gin.Context, ctx *context.Context, anyReq any, dbName string) (interface{}, error) { +func (h *HandlersV2) describeAlias(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) { getter, _ := anyReq.(AliasNameGetter) req := &milvuspb.DescribeAliasRequest{ DbName: dbName, Alias: getter.GetAliasName(), } - resp, err := wrapperProxy(c, ctx, req, h.checkAuth, false, func(reqCtx *context.Context, req any) (interface{}, error) { - return h.proxy.DescribeAlias(*reqCtx, req.(*milvuspb.DescribeAliasRequest)) + resp, err := wrapperProxy(ctx, c, req, h.checkAuth, false, func(reqCtx context.Context, req any) (interface{}, error) { + return h.proxy.DescribeAlias(reqCtx, req.(*milvuspb.DescribeAliasRequest)) }) if err == nil { response := resp.(*milvuspb.DescribeAliasResponse) @@ -1361,7 +1380,7 @@ func (h *HandlersV2) describeAlias(c *gin.Context, ctx *context.Context, anyReq return resp, err } -func (h *HandlersV2) createAlias(c *gin.Context, ctx *context.Context, anyReq any, dbName string) (interface{}, error) { +func (h *HandlersV2) createAlias(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) { collectionGetter, _ := anyReq.(requestutil.CollectionNameGetter) aliasGetter, _ := anyReq.(AliasNameGetter) req := &milvuspb.CreateAliasRequest{ @@ -1369,8 +1388,8 @@ func (h *HandlersV2) createAlias(c *gin.Context, ctx *context.Context, anyReq an CollectionName: collectionGetter.GetCollectionName(), Alias: aliasGetter.GetAliasName(), } - resp, err := wrapperProxy(c, ctx, req, false, false, func(reqCtx *context.Context, req any) (interface{}, error) { - return h.proxy.CreateAlias(*reqCtx, req.(*milvuspb.CreateAliasRequest)) + resp, err := wrapperProxy(ctx, c, req, false, false, func(reqCtx context.Context, req any) (interface{}, error) { + return h.proxy.CreateAlias(reqCtx, req.(*milvuspb.CreateAliasRequest)) }) if err == nil { c.JSON(http.StatusOK, wrapperReturnDefault()) @@ -1378,14 +1397,14 @@ func (h *HandlersV2) createAlias(c *gin.Context, ctx *context.Context, anyReq an return resp, err } -func (h *HandlersV2) dropAlias(c *gin.Context, ctx *context.Context, anyReq any, dbName string) (interface{}, error) { +func (h *HandlersV2) dropAlias(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) { getter, _ := anyReq.(AliasNameGetter) req := &milvuspb.DropAliasRequest{ DbName: dbName, Alias: getter.GetAliasName(), } - resp, err := wrapperProxy(c, ctx, req, false, false, func(reqCtx *context.Context, req any) (interface{}, error) { - return h.proxy.DropAlias(*reqCtx, req.(*milvuspb.DropAliasRequest)) + resp, err := wrapperProxy(ctx, c, req, false, false, func(reqCtx context.Context, req any) (interface{}, error) { + return h.proxy.DropAlias(reqCtx, req.(*milvuspb.DropAliasRequest)) }) if err == nil { c.JSON(http.StatusOK, wrapperReturnDefault()) @@ -1393,7 +1412,7 @@ func (h *HandlersV2) dropAlias(c *gin.Context, ctx *context.Context, anyReq any, return resp, err } -func (h *HandlersV2) alterAlias(c *gin.Context, ctx *context.Context, anyReq any, dbName string) (interface{}, error) { +func (h *HandlersV2) alterAlias(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) { collectionGetter, _ := anyReq.(requestutil.CollectionNameGetter) aliasGetter, _ := anyReq.(AliasNameGetter) req := &milvuspb.AlterAliasRequest{ @@ -1401,8 +1420,8 @@ func (h *HandlersV2) alterAlias(c *gin.Context, ctx *context.Context, anyReq any CollectionName: collectionGetter.GetCollectionName(), Alias: aliasGetter.GetAliasName(), } - resp, err := wrapperProxy(c, ctx, req, false, false, func(reqCtx *context.Context, req any) (interface{}, error) { - return h.proxy.AlterAlias(*reqCtx, req.(*milvuspb.AlterAliasRequest)) + resp, err := wrapperProxy(ctx, c, req, false, false, func(reqCtx context.Context, req any) (interface{}, error) { + return h.proxy.AlterAlias(reqCtx, req.(*milvuspb.AlterAliasRequest)) }) if err == nil { c.JSON(http.StatusOK, wrapperReturnDefault()) @@ -1410,8 +1429,106 @@ func (h *HandlersV2) alterAlias(c *gin.Context, ctx *context.Context, anyReq any return resp, err } -func (h *HandlersV2) GetCollectionSchema(c *gin.Context, ctx *context.Context, collectionName, dbName string) (*schemapb.CollectionSchema, error) { - collSchema, err := proxy.GetCachedCollectionSchema(*ctx, dbName, collectionName) +func (h *HandlersV2) listImportJob(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) { + collectionGetter, _ := anyReq.(requestutil.CollectionNameGetter) + limitGetter, _ := anyReq.(LimitGetter) + req := &milvuspb.ListImportTasksRequest{ + CollectionName: collectionGetter.GetCollectionName(), + Limit: int64(limitGetter.GetLimit()), + DbName: dbName, + } + resp, err := wrapperProxy(ctx, c, req, h.checkAuth, false, func(reqCtx context.Context, req any) (interface{}, error) { + return h.proxy.ListImportTasks(reqCtx, req.(*milvuspb.ListImportTasksRequest)) + }) + if err == nil { + returnData := []map[string]interface{}{} + for _, job := range resp.(*milvuspb.ListImportTasksResponse).Tasks { + taskDetail := map[string]interface{}{ + "taskID": job.Id, + "state": job.State.String(), + "dbName": dbName, + "collectionName": collectionGetter.GetCollectionName(), + "createTimestamp": strconv.FormatInt(job.CreateTs, 10), + } + for _, info := range job.Infos { + switch info.Key { + case "collection": + taskDetail["collectionName"] = info.Value + case "partition": + taskDetail["partitionName"] = info.Value + case "persist_cost": + taskDetail["persistCost"] = info.Value + case "progress_percent": + taskDetail["progressPercent"] = info.Value + case "failed_reason": + if info.Value != "" { + taskDetail[HTTPReturnIndexFailReason] = info.Value + } + } + } + returnData = append(returnData, taskDetail) + } + c.JSON(http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: returnData}) + } + return resp, err +} + +func (h *HandlersV2) createImportJob(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) { + collectionGetter, _ := anyReq.(requestutil.CollectionNameGetter) + fileNamesGetter, _ := anyReq.(FileNamesGetter) + req := &milvuspb.ImportRequest{ + CollectionName: collectionGetter.GetCollectionName(), + DbName: dbName, + Files: fileNamesGetter.GetFileNames(), + } + resp, err := wrapperProxy(ctx, c, req, h.checkAuth, false, func(reqCtx context.Context, req any) (interface{}, error) { + return h.proxy.Import(reqCtx, req.(*milvuspb.ImportRequest)) + }) + if err == nil { + c.JSON(http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: resp.(*milvuspb.ImportResponse).Tasks}) + } + return resp, err +} + +func (h *HandlersV2) getImportJobProcess(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) { + taskIDGetter, _ := anyReq.(TaskIDGetter) + req := &milvuspb.GetImportStateRequest{ + Task: taskIDGetter.GetTaskID(), + } + resp, err := wrapperProxy(ctx, c, req, h.checkAuth, false, func(reqCtx context.Context, req any) (interface{}, error) { + return h.proxy.GetImportState(reqCtx, req.(*milvuspb.GetImportStateRequest)) + }) + if err == nil { + response := resp.(*milvuspb.GetImportStateResponse) + returnData := map[string]interface{}{ + "taskID": response.Id, + "state": response.State.String(), + "dbName": dbName, + "createTimestamp": strconv.FormatInt(response.CreateTs, 10), + } + for _, info := range response.Infos { + switch info.Key { + case "collection": + returnData["collectionName"] = info.Value + case "partition": + returnData["partitionName"] = info.Value + case "persist_cost": + returnData["persistCost"] = info.Value + case "progress_percent": + returnData["progressPercent"] = info.Value + case "failed_reason": + if info.Value != "" { + returnData[HTTPReturnIndexFailReason] = info.Value + } + } + } + c.JSON(http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: returnData}) + } + return resp, err +} + +func (h *HandlersV2) GetCollectionSchema(ctx context.Context, c *gin.Context, dbName, collectionName string) (*schemapb.CollectionSchema, error) { + collSchema, err := proxy.GetCachedCollectionSchema(ctx, dbName, collectionName) if err == nil { return collSchema.CollectionSchema, nil } @@ -1419,8 +1536,8 @@ func (h *HandlersV2) GetCollectionSchema(c *gin.Context, ctx *context.Context, c DbName: dbName, CollectionName: collectionName, } - descResp, err := wrapperProxy(c, ctx, descReq, h.checkAuth, false, func(reqCtx *context.Context, req any) (interface{}, error) { - return h.proxy.DescribeCollection(*reqCtx, req.(*milvuspb.DescribeCollectionRequest)) + descResp, err := wrapperProxy(ctx, c, descReq, h.checkAuth, false, func(reqCtx context.Context, req any) (interface{}, error) { + return h.proxy.DescribeCollection(reqCtx, req.(*milvuspb.DescribeCollectionRequest)) }) if err != nil { return nil, err diff --git a/internal/distributed/proxy/httpserver/handler_v2_test.go b/internal/distributed/proxy/httpserver/handler_v2_test.go index 00187bf933..76e628b2e5 100644 --- a/internal/distributed/proxy/httpserver/handler_v2_test.go +++ b/internal/distributed/proxy/httpserver/handler_v2_test.go @@ -50,7 +50,7 @@ func TestHTTPWrapper(t *testing.T) { ginHandler := gin.Default() app := ginHandler.Group("", genAuthMiddleWare(false)) path := "/wrapper/post" - app.POST(path, wrapperPost(func() any { return &DefaultReq{} }, func(c *gin.Context, ctx *context.Context, req any, dbName string) (interface{}, error) { + app.POST(path, wrapperPost(func() any { return &DefaultReq{} }, func(ctx context.Context, c *gin.Context, req any, dbName string) (interface{}, error) { return nil, nil })) postTestCases = append(postTestCases, requestBodyTestCase{ @@ -58,7 +58,7 @@ func TestHTTPWrapper(t *testing.T) { requestBody: []byte(`{}`), }) path = "/wrapper/post/param" - app.POST(path, wrapperPost(func() any { return &CollectionNameReq{} }, func(c *gin.Context, ctx *context.Context, req any, dbName string) (interface{}, error) { + app.POST(path, wrapperPost(func() any { return &CollectionNameReq{} }, func(ctx context.Context, c *gin.Context, req any, dbName string) (interface{}, error) { return nil, nil })) postTestCases = append(postTestCases, requestBodyTestCase{ @@ -71,6 +71,12 @@ func TestHTTPWrapper(t *testing.T) { errMsg: "missing required parameters, error: Key: 'CollectionNameReq.CollectionName' Error:Field validation for 'CollectionName' failed on the 'required' tag", errCode: 1802, // ErrMissingRequiredParameters }) + postTestCases = append(postTestCases, requestBodyTestCase{ + path: path, + requestBody: []byte(``), + errMsg: "can only accept json format request, the request body should be nil, however {} is valid", + errCode: 1801, // ErrIncorrectParameterFormat + }) postTestCases = append(postTestCases, requestBodyTestCase{ path: path, requestBody: []byte(`{"collectionName": "book", "dbName"}`), @@ -78,7 +84,7 @@ func TestHTTPWrapper(t *testing.T) { errCode: 1801, // ErrIncorrectParameterFormat }) path = "/wrapper/post/trace" - app.POST(path, wrapperPost(func() any { return &CollectionNameReq{} }, wrapperTraceLog(func(c *gin.Context, ctx *context.Context, req any, dbName string) (interface{}, error) { + app.POST(path, wrapperPost(func() any { return &CollectionNameReq{} }, wrapperTraceLog(func(ctx context.Context, c *gin.Context, req any, dbName string) (interface{}, error) { return nil, nil }))) postTestCasesTrace = append(postTestCasesTrace, requestBodyTestCase{ @@ -86,7 +92,7 @@ func TestHTTPWrapper(t *testing.T) { requestBody: []byte(`{"collectionName": "` + DefaultCollectionName + `"}`), }) path = "/wrapper/post/trace/wrong" - app.POST(path, wrapperPost(func() any { return &CollectionNameReq{} }, wrapperTraceLog(func(c *gin.Context, ctx *context.Context, req any, dbName string) (interface{}, error) { + app.POST(path, wrapperPost(func() any { return &CollectionNameReq{} }, wrapperTraceLog(func(ctx context.Context, c *gin.Context, req any, dbName string) (interface{}, error) { return nil, merr.ErrCollectionNotFound }))) postTestCasesTrace = append(postTestCasesTrace, requestBodyTestCase{ @@ -94,8 +100,8 @@ func TestHTTPWrapper(t *testing.T) { requestBody: []byte(`{"collectionName": "` + DefaultCollectionName + `"}`), }) path = "/wrapper/post/trace/call" - app.POST(path, wrapperPost(func() any { return &CollectionNameReq{} }, wrapperTraceLog(func(c *gin.Context, ctx *context.Context, req any, dbName string) (interface{}, error) { - return wrapperProxy(c, ctx, req, false, false, func(reqCtx *context.Context, req any) (any, error) { + app.POST(path, wrapperPost(func() any { return &CollectionNameReq{} }, wrapperTraceLog(func(ctx context.Context, c *gin.Context, req any, dbName string) (interface{}, error) { + return wrapperProxy(ctx, c, req, false, false, func(reqctx context.Context, req any) (any, error) { return nil, nil }) }))) @@ -150,17 +156,17 @@ func TestGrpcWrapper(t *testing.T) { app := ginHandler.Group("") appNeedAuth := ginHandler.Group(needAuthPrefix, genAuthMiddleWare(true)) path := "/wrapper/grpc/-0" - handle := func(reqCtx *context.Context, req any) (any, error) { + handle := func(reqctx context.Context, req any) (any, error) { return nil, nil } app.GET(path, func(c *gin.Context) { ctx := proxy.NewContextWithMetadata(c, "", DefaultDbName) - wrapperProxy(c, &ctx, &DefaultReq{}, false, false, handle) + wrapperProxy(ctx, c, &DefaultReq{}, false, false, handle) }) appNeedAuth.GET(path, func(c *gin.Context) { username, _ := c.Get(ContextUsername) ctx := proxy.NewContextWithMetadata(c, username.(string), DefaultDbName) - wrapperProxy(c, &ctx, &milvuspb.DescribeCollectionRequest{}, true, false, handle) + wrapperProxy(ctx, c, &milvuspb.DescribeCollectionRequest{}, true, false, handle) }) getTestCases = append(getTestCases, rawTestCase{ path: path, @@ -169,17 +175,17 @@ func TestGrpcWrapper(t *testing.T) { path: needAuthPrefix + path, }) path = "/wrapper/grpc/01" - handle = func(reqCtx *context.Context, req any) (any, error) { + handle = func(reqctx context.Context, req any) (any, error) { return nil, merr.ErrNeedAuthenticate // 1800 } app.GET(path, func(c *gin.Context) { ctx := proxy.NewContextWithMetadata(c, "", DefaultDbName) - wrapperProxy(c, &ctx, &DefaultReq{}, false, false, handle) + wrapperProxy(ctx, c, &DefaultReq{}, false, false, handle) }) appNeedAuth.GET(path, func(c *gin.Context) { username, _ := c.Get(ContextUsername) ctx := proxy.NewContextWithMetadata(c, username.(string), DefaultDbName) - wrapperProxy(c, &ctx, &milvuspb.DescribeCollectionRequest{}, true, false, handle) + wrapperProxy(ctx, c, &milvuspb.DescribeCollectionRequest{}, true, false, handle) }) getTestCases = append(getTestCases, rawTestCase{ path: path, @@ -189,19 +195,19 @@ func TestGrpcWrapper(t *testing.T) { path: needAuthPrefix + path, }) path = "/wrapper/grpc/00" - handle = func(reqCtx *context.Context, req any) (any, error) { + handle = func(reqctx context.Context, req any) (any, error) { return &milvuspb.BoolResponse{ Status: commonSuccessStatus, }, nil } app.GET(path, func(c *gin.Context) { ctx := proxy.NewContextWithMetadata(c, "", DefaultDbName) - wrapperProxy(c, &ctx, &DefaultReq{}, false, false, handle) + wrapperProxy(ctx, c, &DefaultReq{}, false, false, handle) }) appNeedAuth.GET(path, func(c *gin.Context) { username, _ := c.Get(ContextUsername) ctx := proxy.NewContextWithMetadata(c, username.(string), DefaultDbName) - wrapperProxy(c, &ctx, &milvuspb.DescribeCollectionRequest{}, true, false, handle) + wrapperProxy(ctx, c, &milvuspb.DescribeCollectionRequest{}, true, false, handle) }) getTestCases = append(getTestCases, rawTestCase{ path: path, @@ -210,7 +216,7 @@ func TestGrpcWrapper(t *testing.T) { path: needAuthPrefix + path, }) path = "/wrapper/grpc/10" - handle = func(reqCtx *context.Context, req any) (any, error) { + handle = func(reqctx context.Context, req any) (any, error) { return &milvuspb.BoolResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_CollectionNameNotFound, // 28 @@ -220,12 +226,12 @@ func TestGrpcWrapper(t *testing.T) { } app.GET(path, func(c *gin.Context) { ctx := proxy.NewContextWithMetadata(c, "", DefaultDbName) - wrapperProxy(c, &ctx, &DefaultReq{}, false, false, handle) + wrapperProxy(ctx, c, &DefaultReq{}, false, false, handle) }) appNeedAuth.GET(path, func(c *gin.Context) { username, _ := c.Get(ContextUsername) ctx := proxy.NewContextWithMetadata(c, username.(string), DefaultDbName) - wrapperProxy(c, &ctx, &milvuspb.DescribeCollectionRequest{}, true, false, handle) + wrapperProxy(ctx, c, &milvuspb.DescribeCollectionRequest{}, true, false, handle) }) getTestCases = append(getTestCases, rawTestCase{ path: path, @@ -294,7 +300,7 @@ func TestTimeout(t *testing.T) { headers: map[string]string{HTTPHeaderRequestTimeout: "5"}, }) path = "/middleware/timeout/10" - // app.GET(path, wrapper(wrapperTimeout(func(c *gin.Context, ctx *context.Context, req any, dbName string) (interface{}, error) { + // app.GET(path, wrapper(wrapperTimeout(func(ctx context.Context, c *gin.Context, req any, dbName string) (interface{}, error) { app.GET(path, timeoutMiddleware(func(c *gin.Context) { time.Sleep(10 * time.Second) })) @@ -310,7 +316,7 @@ func TestTimeout(t *testing.T) { status: http.StatusRequestTimeout, }) path = "/middleware/timeout/60" - // app.GET(path, wrapper(wrapperTimeout(func(c *gin.Context, ctx *context.Context, req any, dbName string) (interface{}, error) { + // app.GET(path, wrapper(wrapperTimeout(func(ctx context.Context, c *gin.Context, req any, dbName string) (interface{}, error) { app.GET(path, timeoutMiddleware(func(c *gin.Context) { time.Sleep(60 * time.Second) })) @@ -372,7 +378,7 @@ func TestDatabaseWrapper(t *testing.T) { ginHandler := gin.Default() app := ginHandler.Group("", genAuthMiddleWare(false)) path := "/wrapper/database" - app.POST(path, wrapperPost(func() any { return &DatabaseReq{} }, h.wrapperCheckDatabase(func(c *gin.Context, ctx *context.Context, req any, dbName string) (interface{}, error) { + app.POST(path, wrapperPost(func() any { return &DatabaseReq{} }, h.wrapperCheckDatabase(func(ctx context.Context, c *gin.Context, req any, dbName string) (interface{}, error) { return nil, nil }))) postTestCases = append(postTestCases, requestBodyTestCase{ @@ -656,6 +662,46 @@ func TestMethodGet(t *testing.T) { Status: &StatusSuccess, Alias: DefaultAliasName, }, nil).Once() + mp.EXPECT().ListImportTasks(mock.Anything, mock.Anything).Return(&milvuspb.ListImportTasksResponse{ + Status: &StatusSuccess, + Tasks: []*milvuspb.GetImportStateResponse{ + { + Status: &StatusSuccess, + State: 6, + Infos: []*commonpb.KeyValuePair{ + {Key: "collection", Value: DefaultCollectionName}, + {Key: "partition", Value: DefaultPartitionName}, + {Key: "persist_cost", Value: "0.23"}, + {Key: "progress_percent", Value: "100"}, + {Key: "failed_reason"}, + }, + Id: 1234567890, + }, + { + Status: &StatusSuccess, + State: 0, + Infos: []*commonpb.KeyValuePair{ + {Key: "collection", Value: DefaultCollectionName}, + {Key: "partition", Value: DefaultPartitionName}, + {Key: "progress_percent", Value: "0"}, + {Key: "failed_reason", Value: "failed to get file size of "}, + }, + Id: 123456789, + }, + }, + }, nil).Once() + mp.EXPECT().GetImportState(mock.Anything, mock.Anything).Return(&milvuspb.GetImportStateResponse{ + Status: &StatusSuccess, + State: 6, + Infos: []*commonpb.KeyValuePair{ + {Key: "collection", Value: DefaultCollectionName}, + {Key: "partition", Value: DefaultPartitionName}, + {Key: "persist_cost", Value: "0.23"}, + {Key: "progress_percent", Value: "100"}, + {Key: "failed_reason"}, + }, + Id: 1234567890, + }, nil).Once() testEngine := initHTTPServerV2(mp, false) queryTestCases := []rawTestCase{} @@ -720,6 +766,12 @@ func TestMethodGet(t *testing.T) { queryTestCases = append(queryTestCases, rawTestCase{ path: versionalV2(AliasCategory, DescribeAction), }) + queryTestCases = append(queryTestCases, rawTestCase{ + path: versionalV2(ImportJobCategory, ListAction), + }) + queryTestCases = append(queryTestCases, rawTestCase{ + path: versionalV2(ImportJobCategory, GetProgressAction), + }) for _, testcase := range queryTestCases { t.Run("query", func(t *testing.T) { @@ -729,7 +781,8 @@ func TestMethodGet(t *testing.T) { `"indexName": "` + DefaultIndexName + `",` + `"userName": "` + util.UserRoot + `",` + `"roleName": "` + util.RoleAdmin + `",` + - `"aliasName": "` + DefaultAliasName + `"` + + `"aliasName": "` + DefaultAliasName + `",` + + `"taskID": 1234567890` + `}`)) req := httptest.NewRequest(http.MethodPost, testcase.path, bodyReader) w := httptest.NewRecorder() @@ -829,6 +882,7 @@ func TestMethodPost(t *testing.T) { mp.EXPECT().CreateIndex(mock.Anything, mock.Anything).Return(commonErrorStatus, nil).Once() mp.EXPECT().CreateAlias(mock.Anything, mock.Anything).Return(commonSuccessStatus, nil).Once() mp.EXPECT().AlterAlias(mock.Anything, mock.Anything).Return(commonSuccessStatus, nil).Once() + mp.EXPECT().Import(mock.Anything, mock.Anything).Return(&milvuspb.ImportResponse{Status: commonSuccessStatus, Tasks: []int64{int64(1234567890)}}, nil).Once() testEngine := initHTTPServerV2(mp, false) queryTestCases := []rawTestCase{} queryTestCases = append(queryTestCases, rawTestCase{ @@ -887,6 +941,9 @@ func TestMethodPost(t *testing.T) { queryTestCases = append(queryTestCases, rawTestCase{ path: versionalV2(AliasCategory, AlterAction), }) + queryTestCases = append(queryTestCases, rawTestCase{ + path: versionalV2(ImportJobCategory, CreateAction), + }) for _, testcase := range queryTestCases { t.Run("query", func(t *testing.T) { @@ -897,7 +954,8 @@ func TestMethodPost(t *testing.T) { `"indexParams": [{"indexName": "` + DefaultIndexName + `", "fieldName": "book_intro", "metricsType": "L2", "indexType": "IVF_FLAT"}],` + `"userName": "` + util.UserRoot + `", "password": "Milvus", "newPassword": "milvus", "roleName": "` + util.RoleAdmin + `",` + `"roleName": "` + util.RoleAdmin + `", "objectType": "Global", "objectName": "*", "privilege": "*",` + - `"aliasName": "` + DefaultAliasName + `"` + + `"aliasName": "` + DefaultAliasName + `",` + + `"files": ["book.json"]` + `}`)) req := httptest.NewRequest(http.MethodPost, testcase.path, bodyReader) w := httptest.NewRecorder() diff --git a/internal/distributed/proxy/httpserver/request_v2.go b/internal/distributed/proxy/httpserver/request_v2.go index ce36a73100..da9428f9fc 100644 --- a/internal/distributed/proxy/httpserver/request_v2.go +++ b/internal/distributed/proxy/httpserver/request_v2.go @@ -17,7 +17,8 @@ func (req *DatabaseReq) GetDbName() string { return req.DbName } type CollectionNameReq struct { DbName string `json:"dbName"` CollectionName string `json:"collectionName" binding:"required"` - PartitionNames []string `json:"partitionNames"` + Limit int32 `json:"limit"` // list import jobs + PartitionNames []string `json:"partitionNames"` // get partitions load state } func (req *CollectionNameReq) GetDbName() string { @@ -28,6 +29,10 @@ func (req *CollectionNameReq) GetCollectionName() string { return req.CollectionName } +func (req *CollectionNameReq) GetLimit() int32 { + return req.Limit +} + func (req *CollectionNameReq) GetPartitionNames() []string { return req.PartitionNames } @@ -52,6 +57,30 @@ func (req *PartitionReq) GetDbName() string { return req.DbName } func (req *PartitionReq) GetCollectionName() string { return req.CollectionName } func (req *PartitionReq) GetPartitionName() string { return req.PartitionName } +type DataFilesReq struct { + DbName string `json:"dbName"` + CollectionName string `json:"collectionName" binding:"required"` + Files []string `json:"files" binding:"required"` +} + +func (req *DataFilesReq) GetDbName() string { + return req.DbName +} + +func (req *DataFilesReq) GetCollectionName() string { + return req.CollectionName +} + +func (req *DataFilesReq) GetFileNames() []string { + return req.Files +} + +type TaskIDReq struct { + TaskID int64 `json:"taskID" binding:"required"` +} + +func (req *TaskIDReq) GetTaskID() int64 { return req.TaskID } + type QueryReqV2 struct { DbName string `json:"dbName"` CollectionName string `json:"collectionName" binding:"required"` @@ -140,6 +169,15 @@ type IndexNameGetter interface { type AliasNameGetter interface { GetAliasName() string } +type LimitGetter interface { + GetLimit() int32 +} +type FileNamesGetter interface { + GetFileNames() []string +} +type TaskIDGetter interface { + GetTaskID() int64 +} type PasswordReq struct { UserName string `json:"userName"` diff --git a/internal/distributed/proxy/service.go b/internal/distributed/proxy/service.go index 26bdcc2447..d95f36a3a1 100644 --- a/internal/distributed/proxy/service.go +++ b/internal/distributed/proxy/service.go @@ -172,6 +172,25 @@ func (s *Server) startHTTPServer(errChan chan error) { ginHandler := gin.New() ginLogger := gin.LoggerWithConfig(gin.LoggerConfig{ SkipPaths: proxy.Params.ProxyCfg.GinLogSkipPaths.GetAsStrings(), + Formatter: func(param gin.LogFormatterParams) string { + if param.Latency > time.Minute { + param.Latency = param.Latency.Truncate(time.Second) + } + traceID, ok := param.Keys["traceID"] + if !ok { + traceID = "" + } + return fmt.Sprintf("[%v] [GIN] [%s] [traceID=%s] [code=%3d] [latency=%v] [client=%s] [method=%s] [error=%s]\n", + param.TimeStamp.Format("2006/01/02 15:04:05.000 Z07:00"), + param.Path, + traceID, + param.StatusCode, + param.Latency, + param.ClientIP, + param.Method, + param.ErrorMessage, + ) + }, }) ginHandler.Use(ginLogger, gin.Recovery()) httpHeaderAllowInt64 := "false"