feat(mqtt): balance by client id (#6079)

This commit is contained in:
罗泽轩 2022-01-14 16:47:18 +08:00 committed by GitHub
parent 92ac6d901d
commit e69f9bee98
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 171 additions and 10 deletions

View File

@ -143,14 +143,16 @@ do
var_x_forwarded_proto = true,
}
-- sort in alphabetical
local apisix_var_names = {
balancer_ip = true,
balancer_port = true,
consumer_name = true,
mqtt_client_id = true,
route_id = true,
route_name = true,
service_id = true,
service_name = true,
consumer_name = true,
balancer_ip = true,
balancer_port = true,
}
local mt = {

View File

@ -29,6 +29,7 @@ local schema = {
protocol_name = {type = "string"},
protocol_level = {type = "integer"},
upstream = {
description = "Deprecated. We should configure upstream outside of the plugin",
type = "object",
properties = {
ip = {type = "string"}, -- deprecated, use "host" instead
@ -57,13 +58,7 @@ local _M = {
function _M.check_schema(conf)
local ok, err = core.schema.check(schema, conf)
if not ok then
return false, err
end
return true
return core.schema.check(schema, conf)
end
@ -185,6 +180,11 @@ function _M.preread(conf, ctx)
core.log.info("mqtt client id: ", res.client_id)
-- when client id is missing, fallback to balance by client IP
if res.client_id ~= "" then
ctx.mqtt_client_id = res.client_id
end
if not conf.upstream then
return
end

View File

@ -34,6 +34,7 @@ List in alphabetical order:
| graphql_name | the [operation name](https://graphql.org/learn/queries/#operation-name) of GraphQL | HeroComparison |
| graphql_operation | the operation type of GraphQL | mutation |
| graphql_root_fields | the top level fields of GraphQL | ["hero"] |
| mqtt_client_id | the client id in MQTT protocol | |
| route_id | id of `route` | |
| route_name | name of `route` | |
| service_id | id of `service` | |

View File

@ -90,6 +90,38 @@ curl http://127.0.0.1:9080/apisix/admin/stream_routes/1 -H 'X-API-KEY: edd1c9f03
In case Docker is used in combination with MacOS `host.docker.internal` is the right parameter for `host`.
This plugin exposes a variable `mqtt_client_id`, and we can use it to load balance via the client id. For example:
```shell
curl http://127.0.0.1:9080/apisix/admin/stream_routes/1 -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
{
"plugins": {
"mqtt-proxy": {
"protocol_name": "MQTT",
"protocol_level": 4
}
},
"upstream": {
"type": "chash",
"key": "mqtt_client_id",
"nodes": [
{
"host": "127.0.0.1",
"port": 1995,
"weight": 1
},
{
"host": "127.0.0.2",
"port": 1995,
"weight": 1
}
]
}
}'
```
MQTT connections with different client ID will be forwarded to different node via the consistent hash algorithm. If the client ID is missing, we will balance via client IP instead.
## Delete Plugin
```shell

View File

@ -88,6 +88,38 @@ curl http://127.0.0.1:9080/apisix/admin/stream_routes/1 -H 'X-API-KEY: edd1c9f03
在 Docker 与 MacOS 结合使用的情况下,`host.docker.internal` 是 `host` 的正确参数。
这个插件暴露了一个变量 `mqtt_client_id`,我们可以用它来通过客户端 ID 进行负载均衡。比如说:
```shell
curl http://127.0.0.1:9080/apisix/admin/stream_routes/1 -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
{
"plugins": {
"mqtt-proxy": {
"protocol_name": "MQTT",
"protocol_level": 4
}
},
"upstream": {
"type": "chash",
"key": "mqtt_client_id",
"nodes": [
{
"host": "127.0.0.1",
"port": 1995,
"weight": 1
},
{
"host": "127.0.0.2",
"port": 1995,
"weight": 1
}
]
}
}'
```
不同客户端 ID 的 MQTT 连接将通过一致性哈希算法被转发到不同的节点。如果客户端 ID 为空,我们将通过客户端 IP 进行均衡。
#### 禁用插件
当你想去掉插件的时候,很简单,在插件的配置中把对应的 json 配置删除即可,无须重启服务,即刻生效:

View File

@ -398,3 +398,97 @@ qr/mqtt client id: \S+/
mqtt client id: clint-111
--- no_error_log
[error]
=== TEST 17: balance with mqtt_client_id
--- config
location /t {
content_by_lua_block {
local t = require("lib.test_admin").test
local code, body = t('/apisix/admin/stream_routes/1',
ngx.HTTP_PUT,
[[{
"remote_addr": "127.0.0.1",
"server_port": 1985,
"plugins": {
"mqtt-proxy": {
"protocol_name": "MQTT",
"protocol_level": 5
}
},
"upstream": {
"type": "chash",
"key": "mqtt_client_id",
"nodes": [
{
"host": "0.0.0.0",
"port": 1995,
"weight": 1
},
{
"host": "127.0.0.1",
"port": 1995,
"weight": 1
}
]
}
}]]
)
if code >= 300 then
ngx.status = code
end
ngx.say(body)
}
}
--- request
GET /t
--- response_body
passed
--- no_error_log
[error]
=== TEST 18: hit route with empty id
--- stream_request eval
"\x10\x0d\x00\x04\x4d\x51\x54\x54\x05\x02\x00\x3c\x00\x00\x00"
--- stream_response
hello world
--- grep_error_log eval
qr/(mqtt client id: \w+|proxy request to \S+)/
--- grep_error_log_out
proxy request to 127.0.0.1:1995
--- no_error_log
[error]
=== TEST 19: hit route with different client id, part 1
--- stream_request eval
"\x10\x0e\x00\x04\x4d\x51\x54\x54\x05\x02\x00\x3c\x00\x00\x01\x66"
--- stream_response
hello world
--- grep_error_log eval
qr/(mqtt client id: \w+|proxy request to \S+)/
--- grep_error_log_out
mqtt client id: f
proxy request to 0.0.0.0:1995
--- no_error_log
[error]
=== TEST 20: hit route with different client id, part 2
--- stream_request eval
"\x10\x0e\x00\x04\x4d\x51\x54\x54\x05\x02\x00\x3c\x00\x00\x01\x67"
--- stream_response
hello world
--- grep_error_log eval
qr/(mqtt client id: \w+|proxy request to \S+)/
--- grep_error_log_out
mqtt client id: g
proxy request to 127.0.0.1:1995
--- no_error_log
[error]