diff --git a/apisix/plugin.lua b/apisix/plugin.lua index d9d8e2d1..7dead03c 100644 --- a/apisix/plugin.lua +++ b/apisix/plugin.lua @@ -17,6 +17,7 @@ local require = require local core = require("apisix.core") local config_util = require("apisix.core.config_util") +local ngx_exit = ngx.exit local pkg_loaded = package.loaded local sort_tab = table.sort local pcall = pcall @@ -27,6 +28,7 @@ local local_plugins = core.table.new(32, 0) local ngx = ngx local tostring = tostring local error = error +local is_http = ngx.config.subsystem == "http" local local_plugins_hash = core.table.new(0, 32) local stream_local_plugins = core.table.new(32, 0) local stream_local_plugins_hash = core.table.new(0, 32) @@ -276,8 +278,6 @@ local function trace_plugins_info_for_debug(plugins) return end - local is_http = ngx.config.subsystem == "http" - if not plugins then if is_http and not ngx.headers_sent then core.response.add_header("Apisix-Plugins", "no plugin") @@ -641,11 +641,19 @@ function _M.run_plugin(phase, plugins, api_ctx) if phase_func then local code, body = phase_func(plugins[i + 1], api_ctx) if code or body then - if code >= 400 then - core.log.warn(plugins[i].name, " exits with http status code ", code) - end + if is_http then + if code >= 400 then + core.log.warn(plugins[i].name, " exits with http status code ", code) + end - core.response.exit(code, body) + core.response.exit(code, body) + else + if code >= 400 then + core.log.warn(plugins[i].name, " exits with status code ", code) + end + + ngx_exit(1) + end end end end diff --git a/apisix/stream/plugins/mqtt-proxy.lua b/apisix/stream/plugins/mqtt-proxy.lua index f7cfd889..dfd05ae2 100644 --- a/apisix/stream/plugins/mqtt-proxy.lua +++ b/apisix/stream/plugins/mqtt-proxy.lua @@ -16,6 +16,7 @@ -- local core = require("apisix.core") local upstream = require("apisix.upstream") +local ipmatcher = require("resty.ipmatcher") local bit = require("bit") local ngx = ngx local ngx_exit = ngx.exit @@ -31,9 +32,14 @@ local schema = { upstream = { type = "object", properties = { - ip = {type = "string"}, + ip = {type = "string"}, -- deprecated, use "host" instead + host = {type = "string"}, port = {type = "number"}, - } + }, + oneOf = { + {required = {"host", "port"}}, + {required = {"ip", "port"}}, + }, } }, required = {"protocol_name", "protocol_level", "upstream"}, @@ -159,16 +165,38 @@ function _M.preread(conf, ctx) core.log.info("mqtt client id: ", res.client_id) + local host = conf.upstream.host + if not host then + host = conf.upstream.ip + end + + if conf.host_is_domain == nil then + conf.host_is_domain = not ipmatcher.parse_ipv4(host) + and not ipmatcher.parse_ipv6(host) + end + + if conf.host_is_domain then + local ip, err = core.resolver.parse_domain(host) + if not ip then + core.log.error("failed to parse host ", host, ", err: ", err) + return 500 + end + + host = ip + end + local up_conf = { type = "roundrobin", nodes = { - {host = conf.upstream.ip, port = conf.upstream.port, weight = 1}, + {host = host, port = conf.upstream.port, weight = 1}, } } local ok, err = upstream.check_schema(up_conf) if not ok then - return 500, err + core.log.error("failed to check schema ", core.json.delay_encode(up_conf), + ", err: ", err) + return 500 end local matched_route = ctx.matched_route diff --git a/docs/en/latest/plugins/mqtt-proxy.md b/docs/en/latest/plugins/mqtt-proxy.md index ab7f502f..e90a0713 100644 --- a/docs/en/latest/plugins/mqtt-proxy.md +++ b/docs/en/latest/plugins/mqtt-proxy.md @@ -41,7 +41,8 @@ And this plugin both support MQTT protocol [3.1.*](http://docs.oasis-open.org/mq | -------------- | ------- | ----------- | ------- | ----- | -------------------------------------------------------------------------------------- | | protocol_name | string | required | | | Name of protocol, should be `MQTT` in normal. | | protocol_level | integer | required | | | Level of protocol, it should be `4` for MQTT `3.1.*`. it should be `5` for MQTT `5.0`. | -| upstream.ip | string | required | | | IP address of upstream, will forward current request to. | +| upstream.host | string | required | | | the IP or host of upstream, will forward current request to. | +| upstream.ip | string | deprecated | | | Use "host" instead. IP address of upstream, will forward current request to.| | upstream.port | number | required | | | Port of upstream, will forward current request to. | ## How To Enable @@ -74,7 +75,7 @@ curl http://127.0.0.1:9080/apisix/admin/stream_routes/1 -H 'X-API-KEY: edd1c9f03 "protocol_name": "MQTT", "protocol_level": 4, "upstream": { - "ip": "127.0.0.1", + "host": "127.0.0.1", "port": 1980 } } diff --git a/docs/zh/latest/plugins/mqtt-proxy.md b/docs/zh/latest/plugins/mqtt-proxy.md index 5eb26612..7a848c06 100644 --- a/docs/zh/latest/plugins/mqtt-proxy.md +++ b/docs/zh/latest/plugins/mqtt-proxy.md @@ -36,16 +36,12 @@ title: mqtt-proxy ## 属性 -* `protocol_name`: 必选,协议名称,正常情况下应为“ MQTT” 。 -* `protocol_level`: 必选,协议级别,MQTT `3.1.*` 应为 “4” ,MQTT `5.0` 应该是“5”。 -* `upstream.ip`: 必选,将当前请求转发到的上游的 IP 地址, -* `upstream.port`: 必选,将当前请求转发到的上游的 端口, - | 名称 | 类型 | 必选项 | 默认值 | 有效值 | 描述 | | -------------- | ------- | ------ | ------ | ------ | ------------------------------------------------------ | | protocol_name | string | 必须 | | | 协议名称,正常情况下应为“ MQTT” | | protocol_level | integer | 必须 | | | 协议级别,MQTT `3.1.*` 应为 `4` ,MQTT `5.0` 应是`5`。 | -| upstream.ip | string | 必须 | | | 将当前请求转发到的上游的 IP 地址 | +| upstream.host | string | 必须 | | | 将当前请求转发到的上游的 IP 地址或域名 | +| upstream.ip | string | 废弃 | | | 推荐使用“host”代替。将当前请求转发到的上游的 IP 地址 | | upstream.port | number | 必须 | | | 将当前请求转发到的上游的端口 | ## 如何启用 @@ -77,7 +73,7 @@ curl http://127.0.0.1:9080/apisix/admin/stream_routes/1 -H 'X-API-KEY: edd1c9f03 "protocol_name": "MQTT", "protocol_level": 4, "upstream": { - "ip": "127.0.0.1", + "host": "127.0.0.1", "port": 1980 } } diff --git a/t/stream-plugin/mqtt-proxy.t b/t/stream-plugin/mqtt-proxy.t index 82f5453b..5e74823b 100644 --- a/t/stream-plugin/mqtt-proxy.t +++ b/t/stream-plugin/mqtt-proxy.t @@ -17,7 +17,7 @@ use t::APISIX 'no_plan'; -repeat_each(1); +repeat_each(2); no_long_string(); no_shuffle(); no_root_location(); @@ -41,7 +41,7 @@ __DATA__ "protocol_name": "MQTT", "protocol_level": 4, "upstream": { - "ip": "127.0.0.1", + "host": "127.0.0.1", "port": 1995 } } @@ -99,7 +99,7 @@ hello world "protocol_name": "MQTT", "protocol_level": 4, "upstream": { - "ip": "127.0.0.1", + "host": "127.0.0.1", "port": 1995 } } @@ -132,3 +132,135 @@ receive stream response error: connection reset by peer receive stream response error: connection reset by peer --- error_log match(): not hit any route + + + +=== TEST 6: check schema +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/stream_routes/1', + ngx.HTTP_PUT, + [[{ + "remote_addr": "127.0.0.1", + "server_port": 1985, + "plugins": { + "mqtt-proxy": { + "protocol_name": "MQTT", + "protocol_level": 4, + "upstream": { + "host": "127.0.0.1" + } + } + } + }]] + ) + + if code >= 300 then + ngx.status = code + end + ngx.print(body) + } + } +--- request +GET /t +--- error_code: 400 +--- response_body +{"error_msg":"failed to check the configuration of stream plugin [mqtt-proxy]: property \"upstream\" validation failed: value should match only one schema, but matches none"} + + + +=== TEST 7: set route with host +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/stream_routes/1', + ngx.HTTP_PUT, + [[{ + "remote_addr": "127.0.0.1", + "server_port": 1985, + "plugins": { + "mqtt-proxy": { + "protocol_name": "MQTT", + "protocol_level": 4, + "upstream": { + "host": "localhost", + "port": 1995 + } + } + } + }]] + ) + + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- request +GET /t +--- response_body +passed +--- no_error_log +[error] + + + +=== TEST 8: hit route +--- stream_enable +--- stream_request eval +"\x10\x0f\x00\x04\x4d\x51\x54\x54\x04\x02\x00\x3c\x00\x03\x66\x6f\x6f" +--- stream_response +hello world +--- no_error_log +[error] + + + +=== TEST 9: set route with invalid host +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/stream_routes/1', + ngx.HTTP_PUT, + [[{ + "remote_addr": "127.0.0.1", + "server_port": 1985, + "plugins": { + "mqtt-proxy": { + "protocol_name": "MQTT", + "protocol_level": 4, + "upstream": { + "host": "loc", + "port": 1995 + } + } + } + }]] + ) + + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- request +GET /t +--- response_body +passed +--- no_error_log +[error] + + + +=== TEST 10: hit route +--- stream_enable +--- stream_request eval +"\x10\x0f\x00\x04\x4d\x51\x54\x54\x04\x02\x00\x3c\x00\x03\x66\x6f\x6f" +--- error_log +failed to parse domain: loc, error: