feat(mqtt-proxy): support domain (#4391)

Signed-off-by: spacewander <spacewanderlzx@gmail.com>
This commit is contained in:
罗泽轩 2021-06-09 19:23:40 +08:00 committed by GitHub
parent f7fc8599b1
commit 8bf8cc1916
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 187 additions and 22 deletions

View File

@ -17,6 +17,7 @@
local require = require
local core = require("apisix.core")
local config_util = require("apisix.core.config_util")
local ngx_exit = ngx.exit
local pkg_loaded = package.loaded
local sort_tab = table.sort
local pcall = pcall
@ -27,6 +28,7 @@ local local_plugins = core.table.new(32, 0)
local ngx = ngx
local tostring = tostring
local error = error
local is_http = ngx.config.subsystem == "http"
local local_plugins_hash = core.table.new(0, 32)
local stream_local_plugins = core.table.new(32, 0)
local stream_local_plugins_hash = core.table.new(0, 32)
@ -276,8 +278,6 @@ local function trace_plugins_info_for_debug(plugins)
return
end
local is_http = ngx.config.subsystem == "http"
if not plugins then
if is_http and not ngx.headers_sent then
core.response.add_header("Apisix-Plugins", "no plugin")
@ -641,11 +641,19 @@ function _M.run_plugin(phase, plugins, api_ctx)
if phase_func then
local code, body = phase_func(plugins[i + 1], api_ctx)
if code or body then
if code >= 400 then
core.log.warn(plugins[i].name, " exits with http status code ", code)
end
if is_http then
if code >= 400 then
core.log.warn(plugins[i].name, " exits with http status code ", code)
end
core.response.exit(code, body)
core.response.exit(code, body)
else
if code >= 400 then
core.log.warn(plugins[i].name, " exits with status code ", code)
end
ngx_exit(1)
end
end
end
end

View File

@ -16,6 +16,7 @@
--
local core = require("apisix.core")
local upstream = require("apisix.upstream")
local ipmatcher = require("resty.ipmatcher")
local bit = require("bit")
local ngx = ngx
local ngx_exit = ngx.exit
@ -31,9 +32,14 @@ local schema = {
upstream = {
type = "object",
properties = {
ip = {type = "string"},
ip = {type = "string"}, -- deprecated, use "host" instead
host = {type = "string"},
port = {type = "number"},
}
},
oneOf = {
{required = {"host", "port"}},
{required = {"ip", "port"}},
},
}
},
required = {"protocol_name", "protocol_level", "upstream"},
@ -159,16 +165,38 @@ function _M.preread(conf, ctx)
core.log.info("mqtt client id: ", res.client_id)
local host = conf.upstream.host
if not host then
host = conf.upstream.ip
end
if conf.host_is_domain == nil then
conf.host_is_domain = not ipmatcher.parse_ipv4(host)
and not ipmatcher.parse_ipv6(host)
end
if conf.host_is_domain then
local ip, err = core.resolver.parse_domain(host)
if not ip then
core.log.error("failed to parse host ", host, ", err: ", err)
return 500
end
host = ip
end
local up_conf = {
type = "roundrobin",
nodes = {
{host = conf.upstream.ip, port = conf.upstream.port, weight = 1},
{host = host, port = conf.upstream.port, weight = 1},
}
}
local ok, err = upstream.check_schema(up_conf)
if not ok then
return 500, err
core.log.error("failed to check schema ", core.json.delay_encode(up_conf),
", err: ", err)
return 500
end
local matched_route = ctx.matched_route

View File

@ -41,7 +41,8 @@ And this plugin both support MQTT protocol [3.1.*](http://docs.oasis-open.org/mq
| -------------- | ------- | ----------- | ------- | ----- | -------------------------------------------------------------------------------------- |
| protocol_name | string | required | | | Name of protocol, should be `MQTT` in normal. |
| protocol_level | integer | required | | | Level of protocol, it should be `4` for MQTT `3.1.*`. it should be `5` for MQTT `5.0`. |
| upstream.ip | string | required | | | IP address of upstream, will forward current request to. |
| upstream.host | string | required | | | the IP or host of upstream, will forward current request to. |
| upstream.ip | string | deprecated | | | Use "host" instead. IP address of upstream, will forward current request to.|
| upstream.port | number | required | | | Port of upstream, will forward current request to. |
## How To Enable
@ -74,7 +75,7 @@ curl http://127.0.0.1:9080/apisix/admin/stream_routes/1 -H 'X-API-KEY: edd1c9f03
"protocol_name": "MQTT",
"protocol_level": 4,
"upstream": {
"ip": "127.0.0.1",
"host": "127.0.0.1",
"port": 1980
}
}

View File

@ -36,16 +36,12 @@ title: mqtt-proxy
## 属性
* `protocol_name`: 必选,协议名称,正常情况下应为“ MQTT” 。
* `protocol_level`: 必选协议级别MQTT `3.1.*` 应为 “4” MQTT `5.0` 应该是“5”。
* `upstream.ip`: 必选,将当前请求转发到的上游的 IP 地址,
* `upstream.port`: 必选,将当前请求转发到的上游的 端口,
| 名称 | 类型 | 必选项 | 默认值 | 有效值 | 描述 |
| -------------- | ------- | ------ | ------ | ------ | ------------------------------------------------------ |
| protocol_name | string | 必须 | | | 协议名称,正常情况下应为“ MQTT” |
| protocol_level | integer | 必须 | | | 协议级别MQTT `3.1.*` 应为 `4` MQTT `5.0` 应是`5`。 |
| upstream.ip | string | 必须 | | | 将当前请求转发到的上游的 IP 地址 |
| upstream.host | string | 必须 | | | 将当前请求转发到的上游的 IP 地址或域名 |
| upstream.ip | string | 废弃 | | | 推荐使用“host”代替。将当前请求转发到的上游的 IP 地址 |
| upstream.port | number | 必须 | | | 将当前请求转发到的上游的端口 |
## 如何启用
@ -77,7 +73,7 @@ curl http://127.0.0.1:9080/apisix/admin/stream_routes/1 -H 'X-API-KEY: edd1c9f03
"protocol_name": "MQTT",
"protocol_level": 4,
"upstream": {
"ip": "127.0.0.1",
"host": "127.0.0.1",
"port": 1980
}
}

View File

@ -17,7 +17,7 @@
use t::APISIX 'no_plan';
repeat_each(1);
repeat_each(2);
no_long_string();
no_shuffle();
no_root_location();
@ -41,7 +41,7 @@ __DATA__
"protocol_name": "MQTT",
"protocol_level": 4,
"upstream": {
"ip": "127.0.0.1",
"host": "127.0.0.1",
"port": 1995
}
}
@ -99,7 +99,7 @@ hello world
"protocol_name": "MQTT",
"protocol_level": 4,
"upstream": {
"ip": "127.0.0.1",
"host": "127.0.0.1",
"port": 1995
}
}
@ -132,3 +132,135 @@ receive stream response error: connection reset by peer
receive stream response error: connection reset by peer
--- error_log
match(): not hit any route
=== TEST 6: check schema
--- config
location /t {
content_by_lua_block {
local t = require("lib.test_admin").test
local code, body = t('/apisix/admin/stream_routes/1',
ngx.HTTP_PUT,
[[{
"remote_addr": "127.0.0.1",
"server_port": 1985,
"plugins": {
"mqtt-proxy": {
"protocol_name": "MQTT",
"protocol_level": 4,
"upstream": {
"host": "127.0.0.1"
}
}
}
}]]
)
if code >= 300 then
ngx.status = code
end
ngx.print(body)
}
}
--- request
GET /t
--- error_code: 400
--- response_body
{"error_msg":"failed to check the configuration of stream plugin [mqtt-proxy]: property \"upstream\" validation failed: value should match only one schema, but matches none"}
=== TEST 7: set route with host
--- config
location /t {
content_by_lua_block {
local t = require("lib.test_admin").test
local code, body = t('/apisix/admin/stream_routes/1',
ngx.HTTP_PUT,
[[{
"remote_addr": "127.0.0.1",
"server_port": 1985,
"plugins": {
"mqtt-proxy": {
"protocol_name": "MQTT",
"protocol_level": 4,
"upstream": {
"host": "localhost",
"port": 1995
}
}
}
}]]
)
if code >= 300 then
ngx.status = code
end
ngx.say(body)
}
}
--- request
GET /t
--- response_body
passed
--- no_error_log
[error]
=== TEST 8: hit route
--- stream_enable
--- stream_request eval
"\x10\x0f\x00\x04\x4d\x51\x54\x54\x04\x02\x00\x3c\x00\x03\x66\x6f\x6f"
--- stream_response
hello world
--- no_error_log
[error]
=== TEST 9: set route with invalid host
--- config
location /t {
content_by_lua_block {
local t = require("lib.test_admin").test
local code, body = t('/apisix/admin/stream_routes/1',
ngx.HTTP_PUT,
[[{
"remote_addr": "127.0.0.1",
"server_port": 1985,
"plugins": {
"mqtt-proxy": {
"protocol_name": "MQTT",
"protocol_level": 4,
"upstream": {
"host": "loc",
"port": 1995
}
}
}
}]]
)
if code >= 300 then
ngx.status = code
end
ngx.say(body)
}
}
--- request
GET /t
--- response_body
passed
--- no_error_log
[error]
=== TEST 10: hit route
--- stream_enable
--- stream_request eval
"\x10\x0f\x00\x04\x4d\x51\x54\x54\x04\x02\x00\x3c\x00\x03\x66\x6f\x6f"
--- error_log
failed to parse domain: loc, error: