feat(redis): support pubsub (#7031)

Signed-off-by: spacewander <spacewanderlzx@gmail.com>
This commit is contained in:
罗泽轩 2022-05-13 16:04:39 +08:00 committed by GitHub
parent 4690feb421
commit 025710564a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 321 additions and 9 deletions

View File

@ -153,7 +153,12 @@ local function read_req(session, sk)
end
local s = ffi_str(p, n)
cmd_line[1] = s
local cmd = s:lower()
cmd_line[1] = cmd
if cmd == "subscribe" or cmd == "psubscribe" then
session.in_pub_sub = true
end
local key_finder
local matcher = session.matcher
@ -209,7 +214,7 @@ local function read_req(session, sk)
session.req_id_seq = session.req_id_seq + 1
local ctx = sdk.get_req_ctx(session, session.req_id_seq)
ctx.cmd_line = cmd_line
ctx.cmd = ctx.cmd_line[1]
ctx.cmd = cmd
local pipelined = sk:has_pending_data()
@ -228,7 +233,37 @@ local function read_req(session, sk)
end
local function read_reply(sk)
local function read_subscribe_reply(sk)
local line, err, n = read_line(sk)
if not line then
return nil, err
end
local prefix = line[0]
if prefix == PREFIX_STR then -- char '$'
local size = tonumber(ffi_str(line + 1, n - 1))
if size < 0 then
return true
end
local p, err = sk:read(size + 2)
if not p then
return nil, err
end
return ffi_str(p, size)
elseif prefix == PREFIX_INT then -- char ':'
return tonumber(ffi_str(line + 1, n - 1))
else
return nil, str_fmt("unknown prefix: \"%s\"", prefix)
end
end
local function read_reply(sk, session)
local line, err, n = read_line(sk)
if not line then
return nil, err
@ -263,12 +298,49 @@ local function read_reply(sk)
return true
end
for i = 1, narr do
if session and session.in_pub_sub and (narr == 3 or narr == 4) then
local msg_type, err = read_subscribe_reply(sk)
if msg_type == nil then
return nil, err
end
session.pub_sub_msg_type = msg_type
local res, err = read_reply(sk)
if res == nil then
return nil, err
end
if msg_type == "unsubscribe" or msg_type == "punsubscribe" then
local n_ch, err = read_subscribe_reply(sk)
if n_ch == nil then
return nil, err
end
if n_ch == 0 then
session.in_pub_sub = -1
-- clear this flag later at the end of `handle_reply`
end
else
local n = msg_type == "pmessage" and 2 or 1
for i = 1, n do
local res, err = read_reply(sk)
if res == nil then
return nil, err
end
end
end
else
for i = 1, narr do
local res, err = read_reply(sk)
if res == nil then
return nil, err
end
end
end
return true
elseif prefix == PREFIX_INT then -- char ':'
@ -286,14 +358,31 @@ end
local function handle_reply(session, sk)
local ok, err = read_reply(sk)
local ok, err = read_reply(sk, session)
if not ok then
return nil, err
end
-- TODO: don't update resp_id_seq if the reply is subscribed msg
session.resp_id_seq = session.resp_id_seq + 1
local ctx = sdk.get_req_ctx(session, session.resp_id_seq)
local ctx
if session.in_pub_sub and session.pub_sub_msg_type then
local msg_type = session.pub_sub_msg_type
session.pub_sub_msg_type = nil
if session.resp_id_seq < session.req_id_seq then
local cur_ctx = sdk.get_req_ctx(session, session.resp_id_seq + 1)
local cmd = cur_ctx.cmd
if cmd == msg_type then
ctx = cur_ctx
session.resp_id_seq = session.resp_id_seq + 1
end
end
if session.in_pub_sub == -1 then
session.in_pub_sub = nil
end
else
session.resp_id_seq = session.resp_id_seq + 1
ctx = sdk.get_req_ctx(session, session.resp_id_seq)
end
return ctx
end
@ -371,7 +460,7 @@ end
function _M.from_upstream(session, downstream, upstream)
local ctx, err = handle_reply(session, upstream)
if ctx == nil then
if err then
core.log.error("failed to handle upstream: ", err)
return DECLINED
end

View File

@ -376,9 +376,12 @@ _EOC_
apisix.stream_init(args)
_EOC_
my $stream_extra_init_by_lua = $block->stream_extra_init_by_lua // "";
$stream_config .= <<_EOC_;
init_by_lua_block {
$stream_init_by_lua_block
$stream_extra_init_by_lua
}
init_worker_by_lua_block {
apisix.stream_init_worker()

220
t/xrpc/redis.t vendored
View File

@ -565,3 +565,223 @@ passed
--- response_body
ok
--- stream_conf_enable
=== TEST 12: publish & subscribe
--- stream_extra_init_by_lua
local cjson = require "cjson"
local redis_proto = require("apisix.stream.xrpc.protocols.redis")
redis_proto.log = function(sess, ctx)
ngx.log(ngx.WARN, "log redis request ", cjson.encode(ctx.cmd_line))
end
--- config
location /t {
content_by_lua_block {
local cjson = require "cjson"
local redis = require "resty.redis"
local red = redis:new()
local red2 = redis:new()
red:set_timeout(1000) -- 1 sec
red2:set_timeout(1000) -- 1 sec
local ok, err = red:connect("127.0.0.1", $TEST_NGINX_REDIS_PORT)
if not ok then
ngx.say("1: failed to connect: ", err)
return
end
ok, err = red2:connect("127.0.0.1", $TEST_NGINX_REDIS_PORT)
if not ok then
ngx.say("2: failed to connect: ", err)
return
end
local res, err = red:subscribe("dog")
if not res then
ngx.say("1: failed to subscribe: ", err)
return
end
ngx.say("1: subscribe dog: ", cjson.encode(res))
res, err = red:subscribe("cat")
if not res then
ngx.say("1: failed to subscribe: ", err)
return
end
ngx.say("1: subscribe cat: ", cjson.encode(res))
res, err = red2:publish("dog", "Hello")
if not res then
ngx.say("2: failed to publish: ", err)
return
end
ngx.say("2: publish: ", cjson.encode(res))
res, err = red:read_reply()
if not res then
ngx.say("1: failed to read reply: ", err)
else
ngx.say("1: receive: ", cjson.encode(res))
end
red:set_timeout(10) -- 10ms
res, err = red:read_reply()
if not res then
ngx.say("1: failed to read reply: ", err)
else
ngx.say("1: receive: ", cjson.encode(res))
end
red:set_timeout(1000) -- 1s
res, err = red:unsubscribe()
if not res then
ngx.say("1: failed to unscribe: ", err)
else
ngx.say("1: unsubscribe: ", cjson.encode(res))
end
res, err = red:read_reply()
if not res then
ngx.say("1: failed to read reply: ", err)
else
ngx.say("1: receive: ", cjson.encode(res))
end
red:set_timeout(10) -- 10ms
res, err = red:read_reply()
if not res then
ngx.say("1: failed to read reply: ", err)
else
ngx.say("1: receive: ", cjson.encode(res))
end
red:set_timeout(1000) -- 1s
res, err = red:set("dog", 1)
if not res then
ngx.say("1: failed to set: ", err)
else
ngx.say("1: receive: ", cjson.encode(res))
end
red:close()
red2:close()
}
}
--- response_body_like chop
^1: subscribe dog: \["subscribe","dog",1\]
1: subscribe cat: \["subscribe","cat",2\]
2: publish: 1
1: receive: \["message","dog","Hello"\]
1: failed to read reply: timeout
1: unsubscribe: \[\["unsubscribe","(?:dog|cat)",1\],\["unsubscribe","(?:dog|cat)",0\]\]
1: failed to read reply: not subscribed
1: failed to read reply: not subscribed
1: receive: "OK"
$
--- stream_conf_enable
--- grep_error_log eval
qr/log redis request \[[^]]+\]/
--- grep_error_log_out
log redis request ["subscribe","dog"]
log redis request ["subscribe","cat"]
log redis request ["publish","dog","Hello"]
log redis request ["unsubscribe"]
log redis request ["set","dog","1"]
=== TEST 13: psubscribe & punsubscribe
--- stream_extra_init_by_lua
local cjson = require "cjson"
local redis_proto = require("apisix.stream.xrpc.protocols.redis")
redis_proto.log = function(sess, ctx)
ngx.log(ngx.WARN, "log redis request ", cjson.encode(ctx.cmd_line))
end
--- config
location /t {
content_by_lua_block {
local cjson = require "cjson"
local redis = require "resty.redis"
local red = redis:new()
local red2 = redis:new()
red:set_timeout(1000) -- 1 sec
red2:set_timeout(1000) -- 1 sec
local ok, err = red:connect("127.0.0.1", $TEST_NGINX_REDIS_PORT)
if not ok then
ngx.say("1: failed to connect: ", err)
return
end
ok, err = red2:connect("127.0.0.1", $TEST_NGINX_REDIS_PORT)
if not ok then
ngx.say("2: failed to connect: ", err)
return
end
local res, err = red:psubscribe("dog*", "cat*")
if not res then
ngx.say("1: failed to subscribe: ", err)
return
end
ngx.say("1: psubscribe: ", cjson.encode(res))
res, err = red2:publish("dog1", "Hello")
if not res then
ngx.say("2: failed to publish: ", err)
return
end
ngx.say("2: publish: ", cjson.encode(res))
res, err = red:read_reply()
if not res then
ngx.say("1: failed to read reply: ", err)
else
ngx.say("1: receive: ", cjson.encode(res))
end
res, err = red:punsubscribe("cat*", "dog*")
if not res then
ngx.say("1: failed to unscribe: ", err)
else
ngx.say("1: punsubscribe: ", cjson.encode(res))
end
res, err = red:set("dog", 1)
if not res then
ngx.say("1: failed to set: ", err)
else
ngx.say("1: receive: ", cjson.encode(res))
end
red:close()
red2:close()
}
}
--- response_body_like chop
^1: psubscribe: \[\["psubscribe","dog\*",1\],\["psubscribe","cat\*",2\]\]
2: publish: 1
1: receive: \["pmessage","dog\*","dog1","Hello"\]
1: punsubscribe: \[\["punsubscribe","cat\*",1\],\["punsubscribe","dog\*",0\]\]
1: receive: "OK"
$
--- stream_conf_enable
--- grep_error_log eval
qr/log redis request \[[^]]+\]/
--- grep_error_log_out
log redis request ["psubscribe","dog*","cat*"]
log redis request ["publish","dog1","Hello"]
log redis request ["punsubscribe","cat*","dog*"]
log redis request ["set","dog","1"]