mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-11-29 18:38:44 +08:00
Add web server in proxy with sample handler (#15647)
Signed-off-by: shaoyue.chen <shaoyue.chen@zilliz.com>
This commit is contained in:
parent
b0923f1299
commit
83fdc1da7c
@ -63,6 +63,12 @@ rootCoord:
|
||||
proxy:
|
||||
port: 19530
|
||||
|
||||
http:
|
||||
enabled: true # Whether to enable the http server
|
||||
port: 8080 # Whether to enable the http server
|
||||
readTimeout: 30000 # 30000 ms http read timeout
|
||||
writeTimeout: 30000 # 30000 ms http write timeout
|
||||
|
||||
grpc:
|
||||
serverMaxRecvSize: 536870912 # 512 MB, 512 * 1024 * 1024 Bytes
|
||||
serverMaxSendSize: 536870912 # 512 MB, 512 * 1024 * 1024 Bytes
|
||||
|
1
go.mod
1
go.mod
@ -13,6 +13,7 @@ require (
|
||||
github.com/facebookgo/ensure v0.0.0-20200202191622-63f1cf65ac4c // indirect
|
||||
github.com/facebookgo/stack v0.0.0-20160209184415-751773369052 // indirect
|
||||
github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4 // indirect
|
||||
github.com/gin-gonic/gin v1.7.7
|
||||
github.com/go-basic/ipv4 v1.0.0
|
||||
github.com/golang/mock v1.5.0
|
||||
github.com/golang/protobuf v1.5.2
|
||||
|
21
go.sum
21
go.sum
@ -168,6 +168,10 @@ github.com/gdamore/tcell v1.3.0/go.mod h1:Hjvr+Ofd+gLglo7RYKxxnzCBmev3BzsS67MebK
|
||||
github.com/getsentry/raven-go v0.2.0 h1:no+xWJRb5ZI7eE8TWgIq1jLulQiIoLG0IfYxv5JYMGs=
|
||||
github.com/getsentry/raven-go v0.2.0/go.mod h1:KungGk8q33+aIAZUIVWZDr2OfAEBsO49PX4NzFV5kcQ=
|
||||
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
|
||||
github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE=
|
||||
github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI=
|
||||
github.com/gin-gonic/gin v1.7.7 h1:3DoBmSbJbZAWqXJC3SLjAPfutPJJRN1U5pALB7EeTTs=
|
||||
github.com/gin-gonic/gin v1.7.7/go.mod h1:axIBovoeJpVj8S3BwE0uPMTeReE4+AfFtqpqaZ1qq1U=
|
||||
github.com/go-basic/ipv4 v1.0.0 h1:gjyFAa1USC1hhXTkPOwBWDPfMcUaIM+tvo1XzV9EZxs=
|
||||
github.com/go-basic/ipv4 v1.0.0/go.mod h1:etLBnaxbidQfuqE6wgZQfs38nEWNmzALkxDZe4xY8Dg=
|
||||
github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
|
||||
@ -181,6 +185,13 @@ github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V
|
||||
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
|
||||
github.com/go-ole/go-ole v1.2.5 h1:t4MGB5xEDZvXI+0rMjjsfBsD7yAgp/s9ZDkL1JndXwY=
|
||||
github.com/go-ole/go-ole v1.2.5/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
|
||||
github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4=
|
||||
github.com/go-playground/locales v0.13.0 h1:HyWk6mgj5qFqCT5fjGBuRArbVDfE4hi8+e8ceBS/t7Q=
|
||||
github.com/go-playground/locales v0.13.0/go.mod h1:taPMhCMXrRLJO55olJkUXHZBHCxTMfnGwq/HNwmWNS8=
|
||||
github.com/go-playground/universal-translator v0.17.0 h1:icxd5fm+REJzpZx7ZfpaD876Lmtgy7VtROAbHHXk8no=
|
||||
github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+Scu5vgOQjsIJAF8j9muTVoKLVtA=
|
||||
github.com/go-playground/validator/v10 v10.4.1 h1:pH2c5ADXtd66mxoE0Zm9SUhxE20r7aM3F26W0hOn+GE=
|
||||
github.com/go-playground/validator/v10 v10.4.1/go.mod h1:nlOn6nFhuKACm19sB/8EGNn9GlaMV7XkbRSipzJ0Ii4=
|
||||
github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
|
||||
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
|
||||
github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 h1:ZpnhV/YsD2/4cESfV5+Hoeu/iUR3ruzNvZ+yQfO03a0=
|
||||
@ -317,6 +328,7 @@ github.com/jonboulle/clockwork v0.2.2 h1:UOGuzwb1PwsrDAObMuhUnj0p5ULPj8V/xJ7Kx9q
|
||||
github.com/jonboulle/clockwork v0.2.2/go.mod h1:Pkfl5aHPm1nk2H9h0bjmnJD/BcgbGXUBGnn1kMkgxc8=
|
||||
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
|
||||
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
|
||||
github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
|
||||
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
|
||||
github.com/json-iterator/go v1.1.11 h1:uVUAXhF2To8cbw/3xN3pxj6kk7TYKs98NIrTqPlMWAQ=
|
||||
github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
|
||||
@ -348,6 +360,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
|
||||
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
|
||||
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
|
||||
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
|
||||
github.com/leodido/go-urn v1.2.0 h1:hpXL4XnriNwQ/ABnpepYM/1vCLWNDfUNts8dX3xTG6Y=
|
||||
github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII=
|
||||
github.com/lingdor/stackerror v0.0.0-20191119040541-976d8885ed76 h1:IVlcvV0CjvfBYYod5ePe89l+3LBAl//6n9kJ9Vr2i0k=
|
||||
github.com/lingdor/stackerror v0.0.0-20191119040541-976d8885ed76/go.mod h1:Iu9BHUvTh8/KpbuSoKx/CaJEdJvFxSverxIy7I+nq7s=
|
||||
github.com/linkedin/goavro/v2 v2.9.8 h1:jN50elxBsGBDGVDEKqUlDuU1cFwJ11K/yrJCBMe/7Wg=
|
||||
@ -359,6 +373,8 @@ github.com/magiconair/properties v1.8.5 h1:b6kJs+EmPFMYGkow9GiUyCyOvIwYetYJ3fSaW
|
||||
github.com/magiconair/properties v1.8.5/go.mod h1:y3VJvCyxH9uVvJTWEGAELF3aiYNyPKd5NZ3oSwXrF60=
|
||||
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
|
||||
github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
|
||||
github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY=
|
||||
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
|
||||
github.com/mattn/go-runewidth v0.0.4/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
|
||||
github.com/mattn/go-runewidth v0.0.8/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
|
||||
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
|
||||
@ -533,6 +549,10 @@ github.com/uber/jaeger-client-go v2.25.0+incompatible h1:IxcNZ7WRY1Y3G4poYlx24sz
|
||||
github.com/uber/jaeger-client-go v2.25.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
|
||||
github.com/uber/jaeger-lib v2.4.0+incompatible h1:fY7QsGQWiCt8pajv4r7JEvmATdCVaWxXbjwyYwsNaLQ=
|
||||
github.com/uber/jaeger-lib v2.4.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U=
|
||||
github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo=
|
||||
github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw=
|
||||
github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs=
|
||||
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
|
||||
github.com/urfave/cli v1.22.2/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
|
||||
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8=
|
||||
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
|
||||
@ -747,6 +767,7 @@ golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7w
|
||||
golang.org/x/sys v0.0.0-20191228213918-04cbcbbfeed8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200113162924-86b910548bc1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200122134326-e047566fdf82/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200212091648-12a6c2dcc1e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
|
41
internal/distributed/proxy/httpserver/handler.go
Normal file
41
internal/distributed/proxy/httpserver/handler.go
Normal file
@ -0,0 +1,41 @@
|
||||
package httpserver
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/milvus-io/milvus/internal/proto/milvuspb"
|
||||
"github.com/milvus-io/milvus/internal/types"
|
||||
)
|
||||
|
||||
// Handlers handles http requests
|
||||
type Handlers struct {
|
||||
proxy types.ProxyComponent
|
||||
}
|
||||
|
||||
// NewHandlers creates a new Handlers
|
||||
func NewHandlers(proxy types.ProxyComponent) *Handlers {
|
||||
return &Handlers{
|
||||
proxy: proxy,
|
||||
}
|
||||
}
|
||||
|
||||
// RegisterRouters registers routes to given router
|
||||
func (h *Handlers) RegisterRoutesTo(router gin.IRouter) {
|
||||
router.GET("/health", wrapHandler(h.handleGetHealth))
|
||||
router.POST("/dummy", wrapHandler(h.handlePostDummy))
|
||||
}
|
||||
|
||||
func (h *Handlers) handleGetHealth(c *gin.Context) (interface{}, error) {
|
||||
return gin.H{"status": "ok"}, nil
|
||||
}
|
||||
|
||||
func (h *Handlers) handlePostDummy(c *gin.Context) (interface{}, error) {
|
||||
req := milvuspb.DummyRequest{}
|
||||
// use ShouldBind to supports binding JSON, XML, YAML, and protobuf.
|
||||
err := c.ShouldBind(&req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("%w: parse json failed: %v", errBadRequest, err)
|
||||
}
|
||||
return h.proxy.Dummy(c, &req)
|
||||
}
|
89
internal/distributed/proxy/httpserver/handler_test.go
Normal file
89
internal/distributed/proxy/httpserver/handler_test.go
Normal file
@ -0,0 +1,89 @@
|
||||
package httpserver
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/gin-gonic/gin/binding"
|
||||
"github.com/milvus-io/milvus/internal/proto/milvuspb"
|
||||
"github.com/milvus-io/milvus/internal/types"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
type mockProxyComponent struct {
|
||||
// wrap the interface to avoid implement not used func.
|
||||
// and to let not implemented call panics
|
||||
// implement the method you want to mock
|
||||
types.ProxyComponent
|
||||
}
|
||||
|
||||
func (mockProxyComponent) Dummy(ctx context.Context, request *milvuspb.DummyRequest) (*milvuspb.DummyResponse, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func TestHandlers(t *testing.T) {
|
||||
mockProxy := mockProxyComponent{}
|
||||
h := NewHandlers(mockProxy)
|
||||
testEngine := gin.New()
|
||||
h.RegisterRoutesTo(testEngine)
|
||||
|
||||
t.Run("handleGetHealth default json ok", func(t *testing.T) {
|
||||
req := httptest.NewRequest(http.MethodGet, "/health", nil)
|
||||
w := httptest.NewRecorder()
|
||||
testEngine.ServeHTTP(w, req)
|
||||
assert.Equal(t, http.StatusOK, w.Code)
|
||||
assert.Equal(t, w.Body.Bytes(), []byte(`{"status":"ok"}`))
|
||||
})
|
||||
t.Run("handleGetHealth accept yaml ok", func(t *testing.T) {
|
||||
req := httptest.NewRequest(http.MethodGet, "/health", nil)
|
||||
req.Header = http.Header{
|
||||
"Accept": []string{binding.MIMEYAML},
|
||||
}
|
||||
w := httptest.NewRecorder()
|
||||
testEngine.ServeHTTP(w, req)
|
||||
assert.Equal(t, http.StatusOK, w.Code)
|
||||
assert.Equal(t, w.Body.Bytes(), []byte("status: ok\n"))
|
||||
})
|
||||
t.Run("handlePostDummy parsejson failed 400", func(t *testing.T) {
|
||||
req := httptest.NewRequest(http.MethodPost, "/dummy", nil)
|
||||
req.Header = http.Header{
|
||||
"Content-Type": []string{binding.MIMEJSON},
|
||||
}
|
||||
w := httptest.NewRecorder()
|
||||
testEngine.ServeHTTP(w, req)
|
||||
assert.Equal(t, http.StatusBadRequest, w.Code)
|
||||
})
|
||||
t.Run("handlePostDummy parseyaml failed 400", func(t *testing.T) {
|
||||
req := httptest.NewRequest(http.MethodPost, "/dummy", nil)
|
||||
req.Header = http.Header{
|
||||
"Content-Type": []string{binding.MIMEYAML},
|
||||
}
|
||||
w := httptest.NewRecorder()
|
||||
testEngine.ServeHTTP(w, req)
|
||||
assert.Equal(t, http.StatusBadRequest, w.Code)
|
||||
})
|
||||
t.Run("handlePostDummy default json ok", func(t *testing.T) {
|
||||
bodyBytes := []byte("{}")
|
||||
req := httptest.NewRequest(http.MethodPost, "/dummy", bytes.NewReader(bodyBytes))
|
||||
req.Header = http.Header{
|
||||
"Content-Type": []string{binding.MIMEJSON},
|
||||
}
|
||||
w := httptest.NewRecorder()
|
||||
testEngine.ServeHTTP(w, req)
|
||||
assert.Equal(t, http.StatusOK, w.Code)
|
||||
})
|
||||
t.Run("handlePostDummy yaml ok", func(t *testing.T) {
|
||||
bodyBytes := []byte("---")
|
||||
req := httptest.NewRequest(http.MethodPost, "/dummy", bytes.NewReader(bodyBytes))
|
||||
req.Header = http.Header{
|
||||
"Content-Type": []string{binding.MIMEYAML},
|
||||
}
|
||||
w := httptest.NewRecorder()
|
||||
testEngine.ServeHTTP(w, req)
|
||||
assert.Equal(t, http.StatusOK, w.Code)
|
||||
})
|
||||
}
|
53
internal/distributed/proxy/httpserver/wrapper.go
Normal file
53
internal/distributed/proxy/httpserver/wrapper.go
Normal file
@ -0,0 +1,53 @@
|
||||
package httpserver
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"net/http"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/gin-gonic/gin/binding"
|
||||
"github.com/milvus-io/milvus/internal/proto/commonpb"
|
||||
)
|
||||
|
||||
var (
|
||||
errBadRequest = errors.New("bad request")
|
||||
)
|
||||
|
||||
// handlerFunc handles http request with gin context
|
||||
type handlerFunc func(c *gin.Context) (interface{}, error)
|
||||
|
||||
// ErrResponse of server
|
||||
type ErrResponse = commonpb.Status
|
||||
|
||||
// wrapHandler wraps a handlerFunc into a gin.HandlerFunc
|
||||
func wrapHandler(handle handlerFunc) gin.HandlerFunc {
|
||||
return func(c *gin.Context) {
|
||||
data, err := handle(c)
|
||||
// format body by accept header, protobuf marshal not supported by gin by default
|
||||
// TODO: add marshal handler to support protobuf response
|
||||
formatOffered := []string{binding.MIMEJSON, binding.MIMEYAML, binding.MIMEXML}
|
||||
bodyFormatNegotiate := gin.Negotiate{
|
||||
Offered: formatOffered,
|
||||
Data: data,
|
||||
}
|
||||
if err != nil {
|
||||
switch {
|
||||
case errors.Is(err, errBadRequest):
|
||||
bodyFormatNegotiate.Data = ErrResponse{
|
||||
ErrorCode: commonpb.ErrorCode_IllegalArgument,
|
||||
Reason: err.Error(),
|
||||
}
|
||||
c.Negotiate(http.StatusBadRequest, bodyFormatNegotiate)
|
||||
return
|
||||
default:
|
||||
bodyFormatNegotiate.Data = ErrResponse{
|
||||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
Reason: err.Error(),
|
||||
}
|
||||
c.Negotiate(http.StatusInternalServerError, bodyFormatNegotiate)
|
||||
return
|
||||
}
|
||||
}
|
||||
c.Negotiate(http.StatusOK, bodyFormatNegotiate)
|
||||
}
|
||||
}
|
58
internal/distributed/proxy/httpserver/wrapper_test.go
Normal file
58
internal/distributed/proxy/httpserver/wrapper_test.go
Normal file
@ -0,0 +1,58 @@
|
||||
package httpserver
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestWrapHandler(t *testing.T) {
|
||||
var testWrapFunc = func(c *gin.Context) (interface{}, error) {
|
||||
Case := c.Param("case")
|
||||
switch Case {
|
||||
case "0":
|
||||
return gin.H{"status": "ok"}, nil
|
||||
case "1":
|
||||
return nil, errBadRequest
|
||||
case "2":
|
||||
return nil, errors.New("internal err")
|
||||
}
|
||||
panic("shall not reach")
|
||||
}
|
||||
wrappedHandler := wrapHandler(testWrapFunc)
|
||||
testEngine := gin.New()
|
||||
testEngine.GET("/test/:case", wrappedHandler)
|
||||
|
||||
t.Run("status ok", func(t *testing.T) {
|
||||
req := httptest.NewRequest(http.MethodGet, "/test/0", nil)
|
||||
w := httptest.NewRecorder()
|
||||
testEngine.ServeHTTP(w, req)
|
||||
assert.Equal(t, http.StatusOK, w.Code)
|
||||
})
|
||||
|
||||
t.Run("err notfound", func(t *testing.T) {
|
||||
req := httptest.NewRequest(http.MethodGet, "/test", nil)
|
||||
w := httptest.NewRecorder()
|
||||
testEngine.ServeHTTP(w, req)
|
||||
assert.Equal(t, http.StatusNotFound, w.Code)
|
||||
})
|
||||
|
||||
t.Run("err bad request", func(t *testing.T) {
|
||||
req := httptest.NewRequest(http.MethodGet, "/test/1", nil)
|
||||
w := httptest.NewRecorder()
|
||||
testEngine.ServeHTTP(w, req)
|
||||
assert.Equal(t, http.StatusBadRequest, w.Code)
|
||||
})
|
||||
|
||||
t.Run("err internal", func(t *testing.T) {
|
||||
req := httptest.NewRequest(http.MethodGet, "/test/2", nil)
|
||||
w := httptest.NewRecorder()
|
||||
testEngine.ServeHTTP(w, req)
|
||||
assert.Equal(t, http.StatusInternalServerError, w.Code)
|
||||
})
|
||||
|
||||
}
|
@ -21,13 +21,16 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
ot "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
|
||||
dcc "github.com/milvus-io/milvus/internal/distributed/datacoord/client"
|
||||
icc "github.com/milvus-io/milvus/internal/distributed/indexcoord/client"
|
||||
"github.com/milvus-io/milvus/internal/distributed/proxy/httpserver"
|
||||
qcc "github.com/milvus-io/milvus/internal/distributed/querycoord/client"
|
||||
rcc "github.com/milvus-io/milvus/internal/distributed/rootcoord/client"
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
@ -51,6 +54,7 @@ import (
|
||||
)
|
||||
|
||||
var Params paramtable.GrpcServerConfig
|
||||
var HTTPParams paramtable.HTTPConfig
|
||||
|
||||
// Server is the Proxy Server
|
||||
type Server struct {
|
||||
@ -58,6 +62,9 @@ type Server struct {
|
||||
wg sync.WaitGroup
|
||||
proxy types.ProxyComponent
|
||||
grpcServer *grpc.Server
|
||||
httpServer *http.Server
|
||||
// avoid race
|
||||
httpServerMtx sync.Mutex
|
||||
|
||||
grpcErrChan chan error
|
||||
|
||||
@ -87,6 +94,27 @@ func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error)
|
||||
return server, err
|
||||
}
|
||||
|
||||
// startHTTPServer starts the http server, panic when failed
|
||||
func (s *Server) startHTTPServer(port int) {
|
||||
defer s.wg.Done()
|
||||
ginHandler := gin.Default()
|
||||
apiv1 := ginHandler.Group("/api/v1")
|
||||
httpserver.NewHandlers(s.proxy).RegisterRoutesTo(apiv1)
|
||||
s.httpServerMtx.Lock()
|
||||
s.httpServer = &http.Server{
|
||||
Addr: fmt.Sprintf(":%d", port),
|
||||
Handler: ginHandler,
|
||||
ReadTimeout: HTTPParams.ReadTimeout,
|
||||
WriteTimeout: HTTPParams.WriteTimeout,
|
||||
}
|
||||
s.httpServerMtx.Unlock()
|
||||
if err := s.httpServer.ListenAndServe(); err != nil {
|
||||
if err != http.ErrServerClosed {
|
||||
panic("failed to start http server: " + err.Error())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) startGrpcLoop(grpcPort int) {
|
||||
defer s.wg.Done()
|
||||
|
||||
@ -159,6 +187,8 @@ func (s *Server) Run() error {
|
||||
func (s *Server) init() error {
|
||||
Params.InitOnce(typeutil.ProxyRole)
|
||||
log.Debug("Proxy init service's parameter table done")
|
||||
HTTPParams.InitOnce()
|
||||
log.Debug("Proxy init http server's parameter table done")
|
||||
|
||||
if !funcutil.CheckPortAvailable(Params.Port) {
|
||||
Params.Port = funcutil.GetAvailablePort()
|
||||
@ -188,7 +218,12 @@ func (s *Server) init() error {
|
||||
log.Warn("failed to start Proxy's grpc server", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
log.Debug("grpc server of Proxy has been started")
|
||||
log.Debug("grpc server of proxy has been started")
|
||||
if HTTPParams.Enabled {
|
||||
log.Info("start http server of proxy", zap.Int("port", HTTPParams.Port))
|
||||
s.wg.Add(1)
|
||||
go s.startHTTPServer(HTTPParams.Port)
|
||||
}
|
||||
|
||||
if s.rootCoordClient == nil {
|
||||
var err error
|
||||
@ -347,10 +382,26 @@ func (s *Server) Stop() error {
|
||||
defer s.etcdCli.Close()
|
||||
}
|
||||
|
||||
if s.grpcServer != nil {
|
||||
log.Debug("Graceful stop grpc server...")
|
||||
s.grpcServer.GracefulStop()
|
||||
}
|
||||
gracefulWg := sync.WaitGroup{}
|
||||
gracefulWg.Add(1)
|
||||
go func() {
|
||||
defer gracefulWg.Done()
|
||||
s.httpServerMtx.Lock()
|
||||
defer s.httpServerMtx.Unlock()
|
||||
if s.httpServer != nil {
|
||||
log.Debug("Graceful stop http server...")
|
||||
s.httpServer.Shutdown(context.TODO())
|
||||
}
|
||||
}()
|
||||
gracefulWg.Add(1)
|
||||
go func() {
|
||||
defer gracefulWg.Done()
|
||||
if s.grpcServer != nil {
|
||||
log.Debug("Graceful stop grpc server...")
|
||||
s.grpcServer.GracefulStop()
|
||||
}
|
||||
}()
|
||||
gracefulWg.Wait()
|
||||
|
||||
err = s.proxy.Stop()
|
||||
if err != nil {
|
||||
|
@ -880,3 +880,25 @@ func Test_NewServer(t *testing.T) {
|
||||
err = server.Stop()
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
|
||||
func Test_NewServer_HTTPServerDisabled(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
server, err := NewServer(ctx, nil)
|
||||
assert.NotNil(t, server)
|
||||
assert.Nil(t, err)
|
||||
|
||||
server.proxy = &MockProxy{}
|
||||
server.rootCoordClient = &MockRootCoord{}
|
||||
server.indexCoordClient = &MockIndexCoord{}
|
||||
server.queryCoordClient = &MockQueryCoord{}
|
||||
server.dataCoordClient = &MockDataCoord{}
|
||||
|
||||
HTTPParams.InitOnce()
|
||||
HTTPParams.Enabled = false
|
||||
|
||||
err = server.Run()
|
||||
assert.Nil(t, err)
|
||||
assert.Nil(t, server.httpServer)
|
||||
err = server.Stop()
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
|
50
internal/util/paramtable/http_param.go
Normal file
50
internal/util/paramtable/http_param.go
Normal file
@ -0,0 +1,50 @@
|
||||
package paramtable
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type HTTPConfig struct {
|
||||
BaseTable
|
||||
|
||||
once sync.Once
|
||||
Enabled bool
|
||||
Port int
|
||||
ReadTimeout time.Duration
|
||||
WriteTimeout time.Duration
|
||||
}
|
||||
|
||||
// InitOnce initialize HTTPConfig
|
||||
func (p *HTTPConfig) InitOnce() {
|
||||
p.once.Do(func() {
|
||||
p.init()
|
||||
})
|
||||
}
|
||||
|
||||
func (p *HTTPConfig) init() {
|
||||
p.BaseTable.Init()
|
||||
|
||||
p.initHTTPEnabled()
|
||||
p.initHTTPPort()
|
||||
p.initHTTPReadTimeout()
|
||||
p.initHTTPWriteTimeout()
|
||||
}
|
||||
|
||||
func (p *HTTPConfig) initHTTPEnabled() {
|
||||
p.Enabled = p.ParseBool("proxy.http.enabled", true)
|
||||
}
|
||||
|
||||
func (p *HTTPConfig) initHTTPPort() {
|
||||
p.Port = p.ParseIntWithDefault("proxy.http.port", 8080)
|
||||
}
|
||||
|
||||
func (p *HTTPConfig) initHTTPReadTimeout() {
|
||||
interval := p.ParseIntWithDefault("proxy.http.readTimeout", 30000)
|
||||
p.ReadTimeout = time.Duration(interval) * time.Millisecond
|
||||
}
|
||||
|
||||
func (p *HTTPConfig) initHTTPWriteTimeout() {
|
||||
interval := p.ParseIntWithDefault("proxy.http.writeTimeout", 30000)
|
||||
p.WriteTimeout = time.Duration(interval) * time.Millisecond
|
||||
}
|
17
internal/util/paramtable/http_param_test.go
Normal file
17
internal/util/paramtable/http_param_test.go
Normal file
@ -0,0 +1,17 @@
|
||||
package paramtable
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestHTTPConfig_Init(t *testing.T) {
|
||||
cf := new(HTTPConfig)
|
||||
cf.InitOnce()
|
||||
assert.Equal(t, cf.Enabled, true)
|
||||
assert.Equal(t, cf.Port, 8080)
|
||||
assert.Equal(t, cf.ReadTimeout, time.Second*30)
|
||||
assert.Equal(t, cf.WriteTimeout, time.Second*30)
|
||||
}
|
Loading…
Reference in New Issue
Block a user