* feature: support grpc-proxy REST <-> gRPC case. (#410)

This commit is contained in:
nic-chen 2019-08-21 23:10:34 +08:00 committed by YuanSheng Wang
parent 906bbc2cb6
commit e46e8f2130
28 changed files with 1065 additions and 10 deletions

View File

@ -23,23 +23,39 @@ before_install() {
do_install() {
wget -qO - https://openresty.org/package/pubkey.gpg | sudo apt-key add -
sudo apt-get -y update --fix-missing
sudo apt-get -y install software-properties-common
sudo add-apt-repository -y "deb http://openresty.org/package/ubuntu $(lsb_release -sc) main"
sudo apt-get update
sudo apt-get install openresty-debug
sudo add-apt-repository -y ppa:longsleep/golang-backports
sudo apt-get update
sudo apt-get install golang
export GO111MOUDULE=on
export_or_prefix
sudo luarocks make --lua-dir=${OPENRESTY_PREFIX}luajit rockspec/apisix-dev-1.0-0.rockspec --tree=deps --only-deps --local
sudo luarocks install --lua-dir=${OPENRESTY_PREFIX}luajit lua-resty-libr3 --tree=deps --local
git clone https://github.com/openresty/test-nginx.git test-nginx
git clone https://github.com/membphis/test-nginx.git test-nginx
git clone https://github.com/nic-chen/grpc_server_example.git grpc_server_example
cd grpc_server_example/
go build -o grpc_server_example main.go
cd ..
}
script() {
export_or_prefix
export PATH=$OPENRESTY_PREFIX/nginx/sbin:$OPENRESTY_PREFIX/luajit/bin:$OPENRESTY_PREFIX/bin:$PATH
sudo service etcd start
./grpc_server_example/grpc_server_example &
./bin/apisix help
./bin/apisix init
./bin/apisix init_etcd

View File

@ -18,6 +18,8 @@ export_or_prefix() {
before_install() {
HOMEBREW_NO_AUTO_UPDATE=1 brew install perl cpanminus etcd luarocks openresty/brew/openresty-debug
brew upgrade go
export GO111MOUDULE=on
sudo cpanm --notest Test::Nginx >build.log 2>&1 || (cat build.log && exit 1)
export_or_prefix
luarocks install --lua-dir=${OPENRESTY_PREFIX}/luajit luacov-coveralls --local --tree=deps
@ -29,7 +31,12 @@ do_install() {
make dev
make dev_r3
git clone https://github.com/openresty/test-nginx.git test-nginx
git clone https://github.com/membphis/test-nginx.git test-nginx
git clone https://github.com/nic-chen/grpc_server_example.git grpc_server_example
cd grpc_server_example/
go build -o grpc_server_example main.go
cd ..
}
script() {
@ -38,6 +45,9 @@ script() {
luarocks install luacheck
brew services start etcd
./grpc_server_example/grpc_server_example &
make help
make init
sudo make run

View File

@ -307,3 +307,58 @@ IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
%%%%%%%%%
lua-resty-grpc-gateway
https://github.com/ysugimoto/lua-resty-grpc-gateway
MIT License
Copyright (c) 2019 Yoshiaki Sugimoto
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
%%%%%%%%%
lua-protobuf
https://github.com/starwing/lua-protobuf
MIT License
Copyright (c) 2019 Yoshiaki Sugimoto
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@ -52,7 +52,8 @@ check:
lua/apisix/admin/*.lua \
lua/apisix/core/*.lua \
lua/apisix/http/*.lua \
lua/apisix/plugins/*.lua > \
lua/apisix/plugins/*.lua \
lua/apisix/plugins/grpc-proxy/*.lua > \
/tmp/check.log 2>&1 || (cat /tmp/check.log && exit 1)

View File

@ -44,6 +44,7 @@ For more detailed information, see the [White Paper](https://www.iresty.com/down
- **[Limit-concurrency](doc/plugins/limit-conn.md)**
- **OpenTracing: [Zipkin](doc/plugins/zipkin.md)**
- **Monitoring and Metrics**: [Prometheus](doc/plugins/prometheus.md)
- **[gRPC-Proxy](doc/plugins/grpc-proxy.md)**REST <-> gRPC proxying.
- **Custom plugins**: Allows hooking of common phases, such as `rewrite`, `access`, `header filer`, `body filter` and `log`, also allows to hook the `balancer` stage.
- **Dashboard**: Built-in dashboard to control APISIX.
- **CLI**: start\stop\reload APISIX through the command line.

View File

@ -41,6 +41,7 @@ APISIX 通过插件机制,提供动态负载平衡、身份验证、限流限
- **[限制并发](doc/plugins/limit-conn-cn.md)**
- **OpenTracing: [Zipkin](doc/plugins/zipkin.md)**
- **监控和指标**: [Prometheus](doc/plugins/prometheus-cn.md)
- **[gRPC-Proxy](doc/plugins/grpc-proxy-cn.md)**REST <-> gRPC proxying.
- **自定义插件**: 允许挂载常见阶段,例如`rewrite``access``header filer``body filter`和`log`,还允许挂载 `balancer` 阶段。
- **控制台**: 内置控制台来操作 APISIX 集群。
- **CLI**: 使用命令行来启动、关闭和重启 APISIX。

View File

@ -128,7 +128,7 @@ http {
apisix.http_balancer_phase()
}
keepalive 32;
keepalive 320;
}
init_by_lua_block {
@ -231,6 +231,33 @@ http {
apisix.http_header_filter_phase()
}
body_filter_by_lua_block {
apisix.http_body_filter_phase()
}
log_by_lua_block {
apisix.http_log_phase()
}
}
location @grpc_pass {
access_by_lua_block {
apisix.grpc_access_phase()
}
grpc_set_header Content-Type application/grpc;
grpc_socket_keepalive on;
grpc_pass grpc://apisix_backend;
header_filter_by_lua_block {
apisix.http_header_filter_phase()
}
body_filter_by_lua_block {
apisix.http_body_filter_phase()
}
log_by_lua_block {
apisix.http_log_phase()
}

View File

@ -35,3 +35,4 @@ plugins: # plugin list
- jwt-auth
- zipkin
- ip-restriction
- grpc-proxy

View File

@ -62,7 +62,7 @@ http {
apisix.http_balancer_phase()
}
keepalive 32;
keepalive 320;
}
init_by_lua_block {
@ -141,9 +141,37 @@ http {
apisix.http_header_filter_phase()
}
body_filter_by_lua_block {
apisix.http_body_filter_phase()
}
log_by_lua_block {
apisix.http_log_phase()
}
}
location @grpc_pass {
access_by_lua_block {
apisix.grpc_access_phase()
}
grpc_set_header Content-Type application/grpc;
grpc_socket_keepalive on;
grpc_pass grpc://apisix_backend;
header_filter_by_lua_block {
apisix.http_header_filter_phase()
}
body_filter_by_lua_block {
apisix.http_body_filter_phase()
}
log_by_lua_block {
apisix.http_log_phase()
}
}
}
}

View File

@ -0,0 +1,90 @@
[English](grpc-proxy.md)
# grpc-proxy
HTTP(s) -> APISIX -> gRPC server
### Proto
#### 参数
* `content`: `.proto` 文件的内容
#### 添加proto
路径中最后的数字,会被用作 proto 的 id 做唯一标识,比如下面示例的 proto `id``1`
```shell
curl http://127.0.0.1:9080/apisix/admin/proto/1 -X PUT -d '
{
"content" : "syntax = \"proto3\";
package helloworld;
service Greeter {
rpc SayHello (HelloRequest) returns (HelloReply) {}
}
message HelloRequest {
string name = 1;
}
message HelloReply {
string message = 1;
}"
}'
```
### 参数
* `proto_id`: `.proto`内容的id.
* `service`: grpc服务名.
* `method`: grpc服务中要调用的方法名.
### 示例
#### 使用 grpc-proxy 插件
在指定 route 中,代理 grpc 服务接口:
* 注意: 这个 route 的属性`service_protocal` 必须设置为 `grpc`
* 例子所代理的 grpc 服务可参考:[grpc_server_example](https://github.com/nic-chen/grpc_server_example)
```shell
curl http://127.0.0.1:9080/apisix/admin/routes/111 -X PUT -d '
{
"methods": ["GET"],
"uri": "/grpctest",
"service_protocol": "grpc",
"plugins": {
"grpc-proxy": {
"proto_id": "1",
"service": "helloworld.Greeter",
"method": "SayHello"
}
},
"upstream": {
"type": "roundrobin",
"nodes": {
"127.0.0.1:50051": 1
}
}
}'
```
#### 测试
访问上面配置的 route
```shell
$ curl -i http://127.0.0.1:9080/grpctest
HTTP/1.1 200 OK
Date: Fri, 16 Aug 2019 11:55:36 GMT
Content-Type: application/json
Transfer-Encoding: chunked
Connection: keep-alive
Server: APISIX web server
Proxy-Connection: keep-alive
{"message":"Hello world"}
```
这表示已成功代理。

91
doc/plugins/grpc-proxy.md Normal file
View File

@ -0,0 +1,91 @@
[中文](grpc-proxy-cn.md)
# grpc-proxy
HTTP(s) -> APISIX -> gRPC server
### Proto
#### Parameters
* `content`: `.proto` file's content.
#### Add a proto
Here's an example, adding a proto which `id` is `1`:
```shell
curl http://127.0.0.1:9080/apisix/admin/proto/1 -X PUT -d '
{
"content" : "syntax = \"proto3\";
package helloworld;
service Greeter {
rpc SayHello (HelloRequest) returns (HelloReply) {}
}
message HelloRequest {
string name = 1;
}
message HelloReply {
string message = 1;
}"
}'
```
### Parameters
* `proto_id`: `.proto` content id.
* `service`: the grpc service name.
* `method`: the method name of grpc service.
### example
#### enable plugin
Here's an example, to enable the grpc-proxy plugin to specified route:
* attention: the route's option `service_protocal` must be `grpc`
* the grpc server example[grpc_server_example](https://github.com/nic-chen/grpc_server_example)
```shell
curl http://127.0.0.1:9080/apisix/admin/routes/111 -X PUT -d '
{
"methods": ["GET"],
"uri": "/grpctest",
"service_protocol": "grpc",
"plugins": {
"grpc-proxy": {
"proto_id": "1",
"service": "helloworld.Greeter",
"method": "SayHello"
}
},
"upstream": {
"type": "roundrobin",
"nodes": {
"127.0.0.1:50051": 1
}
}
}'
```
#### test plugin
The above configuration proxy :
```shell
curl -i http://127.0.0.1:9080/grpctest
```
response:
```
HTTP/1.1 200 OK
Date: Fri, 16 Aug 2019 11:55:36 GMT
Content-Type: application/json
Transfer-Encoding: chunked
Connection: keep-alive
Server: APISIX web server
Proxy-Connection: keep-alive
{"message":"Hello world"}
```
This means that the proxying is working.

View File

@ -142,7 +142,7 @@ function _M.http_access_phase()
local ngx_ctx = ngx.ctx
local api_ctx = ngx_ctx.api_ctx
if api_ctx == nil then
if not api_ctx then
api_ctx = core.tablepool.fetch("api_ctx", 0, 32)
ngx_ctx.api_ctx = api_ctx
end
@ -159,6 +159,70 @@ function _M.http_access_phase()
return core.response.exit(404)
end
--
if route.value.service_protocol == "grpc" then
return ngx.exec("@grpc_pass")
end
if route.value.service_id then
-- core.log.info("matched route: ", core.json.delay_encode(route.value))
local service = service_fetch(route.value.service_id)
if not service then
core.log.error("failed to fetch service configuration by ",
"id: ", route.value.service_id)
return core.response.exit(404)
end
local changed
route, changed = plugin.merge_service_route(service, route)
api_ctx.matched_route = route
if changed then
api_ctx.conf_type = "route&service"
api_ctx.conf_version = route.modifiedIndex .. "&"
.. service.modifiedIndex
api_ctx.conf_id = route.value.id .. "&"
.. service.value.id
else
api_ctx.conf_type = "service"
api_ctx.conf_version = service.modifiedIndex
api_ctx.conf_id = service.value.id
end
else
api_ctx.conf_type = "route"
api_ctx.conf_version = route.modifiedIndex
api_ctx.conf_id = route.value.id
end
local plugins = core.tablepool.fetch("plugins", 32, 0)
api_ctx.plugins = plugin.filter(route, plugins)
run_plugin("rewrite", plugins, api_ctx)
run_plugin("access", plugins, api_ctx)
end
function _M.grpc_access_phase()
local ngx_ctx = ngx.ctx
local api_ctx = ngx_ctx.api_ctx
if not api_ctx then
api_ctx = core.tablepool.fetch("api_ctx", 0, 32)
ngx_ctx.api_ctx = api_ctx
end
core.ctx.set_vars_meta(api_ctx)
router.router_http.match(api_ctx)
core.log.info("route: ",
core.json.delay_encode(api_ctx.matched_route, true))
local route = api_ctx.matched_route
if not route then
return core.response.exit(404)
end
if route.value.service_id then
-- core.log.info("matched route: ", core.json.delay_encode(route.value))
local service = service_fetch(route.value.service_id)
@ -198,10 +262,14 @@ function _M.http_access_phase()
end
function _M.http_header_filter_phase()
run_plugin("header_filter")
end
function _M.http_body_filter_phase()
run_plugin("body_filter")
end
function _M.http_log_phase()
local api_ctx = run_plugin("log")

View File

@ -17,6 +17,7 @@ local resources = {
schema = require("apisix.admin.schema"),
ssl = require("apisix.admin.ssl"),
plugins = require("apisix.admin.plugins"),
proto = require("apisix.admin.proto"),
}

153
lua/apisix/admin/proto.lua Normal file
View File

@ -0,0 +1,153 @@
local type = type
local ipairs = ipairs
local core = require("apisix.core")
local get_routes = require("apisix.http.router").http_routes
local get_services = require("apisix.http.service").services
local tostring = tostring
local _M = {
version = 0.1,
}
local function check_conf(id, conf, need_id)
if not conf then
return nil, {error_msg = "missing configurations"}
end
id = id or conf.id
if need_id and not id then
return nil, {error_msg = "missing proto id"}
end
if not need_id and id then
return nil, {error_msg = "wrong proto id, do not need it"}
end
if need_id and conf.id and tostring(conf.id) ~= tostring(id) then
return nil, {error_msg = "wrong proto id"}
end
core.log.info("schema: ", core.json.delay_encode(core.schema.proto))
core.log.info("conf : ", core.json.delay_encode(conf))
local ok, err = core.schema.check(core.schema.proto, conf)
if not ok then
return nil, {error_msg = "invalid configuration: " .. err}
end
return need_id and id or true
end
function _M.put(id, conf)
local id, err = check_conf(id, conf, true)
if not id then
return 400, err
end
local key = "/proto/" .. id
local res, err = core.etcd.set(key, conf)
if not res then
core.log.error("failed to put proto[", key, "]: ", err)
return 500, {error_msg = err}
end
return res.status, res.body
end
function _M.get(id)
local key = "/proto"
if id then
key = key .. "/" .. id
end
local res, err = core.etcd.get(key)
if not res then
core.log.error("failed to get proto[", key, "]: ", err)
return 500, {error_msg = err}
end
return res.status, res.body
end
function _M.post(id, conf)
local id, err = check_conf(id, conf, false)
if not id then
return 400, err
end
local key = "/proto"
-- core.log.info("key: ", key)
local res, err = core.etcd.push("/proto", conf)
if not res then
core.log.error("failed to post proto[", key, "]: ", err)
return 500, {error_msg = err}
end
return res.status, res.body
end
function _M.check_proto_used(plugins, deleting, ptype, pid)
core.log.info("plugins1: ", core.json.delay_encode(plugins, true))
if plugins then
if type(plugins) == "table" and plugins["grpc-proxy"]
and plugins["grpc-proxy"].proto_id
and tostring(plugins["grpc-proxy"].proto_id) == deleting then
return 400, {error_msg = "can not delete this proto,"
.. ptype .. " [" .. pid
.. "] is still using it now"}
end
end
end
function _M.delete(id)
if not id then
return 400, {error_msg = "missing proto id"}
end
local routes, routes_ver = get_routes()
core.log.info("routes: ", core.json.delay_encode(routes, true))
core.log.info("routes_ver: ", routes_ver)
if routes_ver and routes then
for _, route in ipairs(routes) do
if type(route) == "table" and route.value
and route.value.plugins then
return _M.check_proto_used(route.value.plugins, id, "route", route.value.id)
end
end
end
local services, services_ver = get_services()
core.log.info("services: ", core.json.delay_encode(services, true))
core.log.info("services_ver: ", services_ver)
if services_ver and services then
for _, service in ipairs(services) do
if type(service) == "table" and service.value
and service.value.plugins then
return _M.check_proto_used(service.value.plugins, id, "service", service.value.id)
end
end
end
local key = "/proto/" .. id
-- core.log.info("key: ", key)
local res, err = core.etcd.delete(key)
if not res then
core.log.error("failed to delete proto[", key, "]: ", err)
return 500, {error_msg = err}
end
return res.status, res.body
end
return _M

View File

@ -27,7 +27,7 @@ local function check_conf(id, conf, need_id)
return nil, {error_msg = "wrong route id"}
end
core.log.info("schema: ", core.json.delay_encode(core.schema.route))
core.log.info("schema: ", core.schema.route)
core.log.info("conf : ", core.json.delay_encode(conf))
local ok, err = core.schema.check(core.schema.route, conf)
if not ok then

View File

@ -1,4 +1,5 @@
local json = require('rapidjson')
local cjson = require('cjson.safe')
local schema_validator = json.SchemaValidator
local schema_doc = json.SchemaDocument
local json_doc = json.Document
@ -244,7 +245,7 @@ local upstream_schema = {
}
_M.route = [[{
local route = [[{
"type": "object",
"properties": {
"methods": {
@ -257,6 +258,9 @@ _M.route = [[{
},
"uniqueItems": true
},
"service_protocol": {
"enum": [ "grpc", "http" ]
},
"desc": {"type": "string", "maxLength": 256},
"plugins": ]] .. json.encode(plugins_schema) .. [[,
"upstream": ]] .. json.encode(upstream_schema) .. [[,
@ -288,6 +292,13 @@ _M.route = [[{
],
"additionalProperties": false
}]]
do
local route_t, err = cjson.decode(route)
if err then
error("invalid route: " .. route)
end
_M.route = cjson.encode(route_t)
end
_M.service = {
@ -345,4 +356,16 @@ _M.ssl = {
}
_M.proto = {
type = "object",
properties = {
content = {
type = "string", minLength = 1, maxLength = 4096
}
},
required = {"content"},
additionalProperties = false,
}
return _M

View File

@ -10,6 +10,7 @@ function _M.init_worker()
local conf = local_conf()
local router_http_name = "r3_uri"
local router_ssl_name = "r3_sni"
if conf and conf.apisix and conf.apisix.router then
router_http_name = conf.apisix.router.http or router_http_name
router_ssl_name = conf.apisix.router.ssl or router_ssl_name

View File

@ -0,0 +1,83 @@
local ngx = ngx
local core = require("apisix.core")
local plugin_name = "grpc-proxy"
local proto = require("apisix.plugins.grpc-proxy.proto")
local request = require("apisix.plugins.grpc-proxy.request")
local response = require("apisix.plugins.grpc-proxy.response")
local schema = {
type = "object",
additionalProperties = true
}
local _M = {
version = 0.1,
priority = 506,
name = plugin_name,
schema = schema,
}
function _M.init()
proto.init()
end
function _M.check_schema(conf)
local ok, err = core.schema.check(schema, conf)
if not ok then
return false, err
end
return true
end
function _M.access(conf, ctx)
core.log.info("conf: ", core.json.delay_encode(conf))
local proto_id = conf.proto_id
if not proto_id then
core.log.error("proto id miss: ", proto_id)
return
end
local proto_obj, err = proto.fetch(proto_id)
if err then
core.log.error("proto load error: ", err)
return
end
local ok, err = request(proto_obj, conf.service, conf.method)
if not ok then
core.log.error("trasnform request error: ", err)
return
end
ctx.proto_obj = proto_obj
end
function _M.header_filter(conf, ctx)
ngx.header["Content-Type"] = "application/json"
ngx.header["Trailer"] = {"grpc-status", "grpc-message"}
end
function _M.body_filter(conf, ctx)
local proto_obj = ctx.proto_obj
if not proto_obj then
return
end
local err = response(proto_obj, conf.service, conf.method)
if err then
core.log.error("trasnform response error: ", err)
return
end
end
return _M

View File

@ -0,0 +1,64 @@
local core = require("apisix.core")
local protoc = require("protoc")
local ipairs = ipairs
local protos
local lrucache_proto = core.lrucache.new({
ttl = 300, count = 100
})
local function create_proto_obj(proto_id)
if protos.values == nil then
return nil
end
local content
for _, proto in ipairs(protos.values) do
if proto_id == proto.value.id then
content = proto.value.content
break
end
end
if not content then
return nil, "failed to find proto by id: " .. proto_id
end
local _p = protoc.new()
local res = _p:load(content)
if not res or not _p.loaded then
return nil, "failed to load proto content"
end
return _p.loaded
end
local _M = {version = 0.1}
function _M.fetch(proto_id)
return lrucache_proto(proto_id, protos.conf_version,
create_proto_obj, proto_id)
end
function _M.init()
local err
protos, err = core.config.new("/proto", {
automatic = true,
item_schema = core.schema.proto
})
if not protos then
core.log.error("failed to create etcd instance for fetching protos: ",
err)
return
end
end
return _M

View File

@ -0,0 +1,38 @@
local util = require("apisix.plugins.grpc-proxy.util")
local core = require("apisix.core")
local pb = require("pb")
local bit = require("bit")
local ngx = ngx
local string = string
local table = table
return function (proto, service, method, default_values)
core.log.info("proto: ", core.json.delay_encode(proto, true))
local m = util.find_method(proto, service, method)
if not m then
return false, "Undefined service method: " .. service .. "/" .. method
.. " end"
end
ngx.req.read_body()
local encoded = pb.encode(m.input_type,
util.map_message(m.input_type, default_values or {}))
local size = string.len(encoded)
local prefix = {
string.char(0),
string.char(bit.band(bit.rshift(size, 24), 0xFF)),
string.char(bit.band(bit.rshift(size, 16), 0xFF)),
string.char(bit.band(bit.rshift(size, 8), 0xFF)),
string.char(bit.band(size, 0xFF))
}
local message = table.concat(prefix, "") .. encoded
ngx.req.set_method(ngx.HTTP_POST)
ngx.req.set_uri("/" .. service .. "/" .. method, false)
ngx.req.set_uri_args({})
ngx.req.set_body_data(message)
return true
end

View File

@ -0,0 +1,41 @@
local util = require("apisix.plugins.grpc-proxy.util")
local core = require("apisix.core")
local pb = require("pb")
local ngx = ngx
local string = string
local table = table
return function(proto, service, method)
local m = util.find_method(proto, service, method)
if not m then
return false, "2.Undefined service method: " .. service .. "/" .. method
.. " end."
end
local chunk, eof = ngx.arg[1], ngx.arg[2]
local buffered = ngx.ctx.buffered
if not buffered then
buffered = {}
ngx.ctx.buffered = buffered
end
if chunk ~= "" then
core.table.insert(buffered, chunk)
ngx.arg[1] = nil
end
if not eof then
return
end
ngx.ctx.buffered = nil
local buffer = table.concat(buffered)
if not ngx.req.get_headers()["X-Grpc-Web"] then
buffer = string.sub(buffer, 6)
end
local decoded = pb.decode(m.output_type, buffer)
local response = core.json.encode(decoded)
ngx.arg[1] = response
end

View File

@ -0,0 +1,84 @@
local json = require("apisix.core.json")
local pb = require("pb")
local ngx = ngx
local pairs = pairs
local ipairs = ipairs
local string = string
local tonumber = tonumber
local type = type
local _M = {version = 0.1}
function _M.find_method(protos, service, method)
for k, loaded in pairs(protos) do
if type(loaded) == 'table' then
local package = loaded.package
for _, s in ipairs(loaded.service or {}) do
if package .. "." .. s.name == service then
for _, m in ipairs(s.method) do
if m.name == method then
return m
end
end
end
end
end
end
return nil
end
local function get_from_request(name, kind)
local request_table
if ngx.req.get_method() == "POST" then
if string.find(ngx.req.get_headers()["Content-Type"] or "",
"application/json", true) then
request_table = json.decode(ngx.req.get_body_data())
else
request_table = ngx.req.get_post_args()
end
else
request_table = ngx.req.get_uri_args()
end
local prefix = kind:sub(1, 3)
if prefix == "str" then
return request_table[name] or nil
end
if prefix == "int" then
if request_table[name] then
return tonumber(request_table[name])
end
end
return nil
end
function _M.map_message(field, default_values)
if not pb.type(field) then
return nil, "Field " .. field .. " is not defined"
end
local request = {}
local sub, err
for name, _, field_type in pb.fields(field) do
if field_type:sub(1, 1) == "." then
sub, err = _M.map_message(field_type, default_values)
if err then
return nil, err
end
request[name] = sub
else
request[name] = get_from_request(name, field_type) or default_values[name] or nil
end
end
return request
end
return _M

View File

@ -28,6 +28,7 @@ dependencies = {
"opentracing-openresty = 0.1",
"lua-resty-radixtree = 0.4",
"lua-resty-iputils = 0.3.0-1",
"lua-protobuf = 0.3.1",
}
build = {

27
t/APISix.pm vendored
View File

@ -8,6 +8,7 @@ repeat_each(1);
log_level('info');
no_long_string();
no_shuffle();
worker_connections(128);
my $pwd = cwd();
@ -157,6 +158,32 @@ _EOC_
apisix.http_header_filter_phase()
}
body_filter_by_lua_block {
apisix.http_body_filter_phase()
}
log_by_lua_block {
apisix.http_log_phase()
}
}
location \@grpc_pass {
access_by_lua_block {
apisix.grpc_access_phase()
}
grpc_set_header Content-Type application/grpc;
grpc_socket_keepalive on;
grpc_pass grpc://apisix_backend;
header_filter_by_lua_block {
apisix.http_header_filter_phase()
}
body_filter_by_lua_block {
apisix.http_body_filter_phase()
}
log_by_lua_block {
apisix.http_log_phase()
}

View File

@ -14,6 +14,6 @@ __DATA__
--- request
GET /apisix/admin/plugins/list
--- response_body_like eval
qr/\["limit-req","limit-count","limit-conn","key-auth","prometheus","node-status","jwt-auth","zipkin","ip-restriction"\]/
qr/\["limit-req","limit-count","limit-conn","key-auth","prometheus","node-status","jwt-auth","zipkin","ip-restriction","grpc-proxy"\]/
--- no_error_log
[error]

View File

@ -14,7 +14,7 @@ __DATA__
--- request
GET /apisix/admin/schema/route
--- response_body eval
qr/"plugins": \{"type":"object"}/
qr/"plugins":\{"type":"object"}/
--- no_error_log
[error]

1
t/debug-mode.t vendored
View File

@ -46,6 +46,7 @@ loaded plugin and sort by priority: 1003 name: limit-conn
loaded plugin and sort by priority: 1002 name: limit-count
loaded plugin and sort by priority: 1001 name: limit-req
loaded plugin and sort by priority: 1000 name: node-status
loaded plugin and sort by priority: 506 name: grpc-proxy
loaded plugin and sort by priority: 500 name: prometheus
loaded plugin and sort by priority: 0 name: example-plugin
loaded plugin and sort by priority: -1000 name: zipkin

149
t/plugin/grpc-proxy.t Normal file
View File

@ -0,0 +1,149 @@
BEGIN {
if ($ENV{TEST_NGINX_CHECK_LEAK}) {
$SkipReason = "unavailable for the hup tests";
} else {
$ENV{TEST_NGINX_USE_HUP} = 1;
undef $ENV{TEST_NGINX_USE_STAP};
}
}
use t::APISix 'no_plan';
repeat_each(1);
no_long_string();
no_shuffle();
no_root_location();
log_level('debug');
run_tests;
__DATA__
=== TEST 1: set proto(id: 1)
--- config
location /t {
content_by_lua_block {
local t = require("lib.test_admin").test
local code, body = t('/apisix/admin/proto/1',
ngx.HTTP_PUT,
[[{
"content" : "syntax = \"proto3\";
package helloworld;
service Greeter {
rpc SayHello (HelloRequest) returns (HelloReply) {}
}
message HelloRequest {
string name = 1;
}
message HelloReply {
string message = 1;
}"
}]]
)
if code >= 300 then
ngx.status = code
end
ngx.say(body)
}
}
--- request
GET /t
--- response_body
passed
--- no_error_log
[error]
=== TEST 2: set routes(id: 1)
--- config
location /t {
content_by_lua_block {
local t = require("lib.test_admin").test
local code, body = t('/apisix/admin/routes/1',
ngx.HTTP_PUT,
[[{
"methods": ["GET"],
"uri": "/grpctest",
"service_protocol": "grpc",
"plugins": {
"grpc-proxy": {
"proto_id": "1",
"service": "helloworld.Greeter",
"method": "SayHello"
}
},
"upstream": {
"type": "roundrobin",
"nodes": {
"127.0.0.1:50051": 1
}
}
}]]
)
if code >= 300 then
ngx.status = code
end
ngx.say(body)
}
}
--- request
GET /t
--- response_body
passed
--- no_error_log
[error]
=== TEST 3: hit route
--- request
GET /grpctest
--- response_body eval
qr/\{"message":"Hello "\}/
--- no_error_log
[error]
=== TEST 4: wrong service protocol
--- config
location /t {
content_by_lua_block {
local t = require("lib.test_admin").test
local code, body = t('/apisix/admin/routes/1',
ngx.HTTP_PUT,
[[{
"methods": ["GET"],
"uri": "/grpctest",
"service_protocol": "asf",
"plugins": {
"grpc-proxy": {
"proto_id": "1",
"service": "helloworld.Greeter",
"method": "SayHello"
}
},
"upstream": {
"type": "roundrobin",
"nodes": {
"127.0.0.1:50051": 1
}
}
}]]
)
if code >= 300 then
ngx.status = code
end
ngx.say(body)
}
}
--- request
GET /t
--- error_code: 400
--- no_error_log
[error]