feat(redis): support pipeline (#6959)

Signed-off-by: spacewander <spacewanderlzx@gmail.com>
This commit is contained in:
罗泽轩 2022-05-05 10:46:01 +08:00 committed by GitHub
parent f78d045983
commit ea3828afef
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 121 additions and 36 deletions

View File

@ -40,6 +40,8 @@ local PREFIX_ERR = str_byte("-")
function _M.init_downstream(session)
session.req_id_seq = 0
session.resp_id_seq = 0
return xrpc_socket.downstream.socket()
end
@ -73,13 +75,13 @@ local function read_len(sk)
end
local function read_req(sk, ctx)
local function read_req(session, sk)
local narg, err = read_len(sk)
if not narg then
return nil, err
end
ctx.cmd_line = core.table.new(narg, 0)
local cmd_line = core.tablepool.fetch("xrpc_redis_cmd_line", narg, 0)
for i = 1, narg do
local n, err = read_len(sk)
@ -110,11 +112,16 @@ local function read_req(sk, ctx)
s = ffi_str(p, n)
end
ctx.cmd_line[i] = s
cmd_line[i] = s
end
session.req_id_seq = session.req_id_seq + 1
local ctx = sdk.get_req_ctx(session, session.req_id_seq)
ctx.cmd_line = cmd_line
ctx.cmd = ctx.cmd_line[1]
return true
local pipelined = sk:has_pending_data()
return true, nil, pipelined
end
@ -131,7 +138,6 @@ local function read_reply(sk)
local size = tonumber(ffi_str(line + 1, n - 1))
if size < 0 then
-- return null
return true
end
@ -144,7 +150,6 @@ local function read_reply(sk)
elseif prefix == PREFIX_STA then -- char '+'
-- print("status reply")
-- return sub(line, 2)
return true
elseif prefix == PREFIX_ARR then -- char '*'
@ -152,38 +157,23 @@ local function read_reply(sk)
-- print("multi-bulk reply: ", narr)
if narr < 0 then
-- return null
return true
end
local vals = core.table.new(n, 0)
local nvals = 0
for i = 1, narr do
local res, err = read_reply(sk)
if res then
nvals = nvals + 1
vals[nvals] = res
elseif res == nil then
if res == nil then
return nil, err
else
-- be a valid redis error value
nvals = nvals + 1
vals[nvals] = {false, err}
end
end
return vals
return true
elseif prefix == PREFIX_INT then -- char ':'
-- print("integer reply")
-- return tonumber(str_sub(line, 2))
return true
elseif prefix == PREFIX_ERR then -- char '-'
-- print("error reply: ", n)
-- return false, str_sub(line, 2)
return true
else
@ -192,17 +182,53 @@ local function read_reply(sk)
end
local function handle_reply(session, sk)
local ok, err = read_reply(sk)
if not ok then
return nil, err
end
-- TODO: don't update resp_id_seq if the reply is subscribed msg
session.resp_id_seq = session.resp_id_seq + 1
local ctx = sdk.get_req_ctx(session, session.resp_id_seq)
return ctx
end
function _M.from_downstream(session, downstream)
local ctx = sdk.get_req_ctx(session, 0)
local ok, err = read_req(downstream, ctx)
local read_pipeline = false
while true do
local ok, err, pipelined = read_req(session, downstream)
if not ok then
if err ~= "timeout" and err ~= "closed" then
core.log.error("failed to read request: ", err)
end
if read_pipeline and err == "timeout" then
break
end
return DECLINED
end
return OK, ctx
if not pipelined then
break
end
if not read_pipeline then
read_pipeline = true
-- set minimal read timeout to read pipelined data
downstream:settimeouts(0, 0, 1)
end
end
if read_pipeline then
-- set timeout back
downstream:settimeouts(0, 0, 0)
end
return OK
end
@ -236,8 +262,13 @@ function _M.to_upstream(session, ctx, downstream, upstream)
return DECLINED
end
local p, err = read_reply(upstream)
if p == nil then
return OK
end
function _M.from_upstream(session, downstream, upstream)
local ctx, err = handle_reply(session, upstream)
if ctx == nil then
core.log.error("failed to handle upstream: ", err)
return DECLINED
end
@ -248,12 +279,13 @@ function _M.to_upstream(session, ctx, downstream, upstream)
return DECLINED
end
return DONE
return DONE, ctx
end
function _M.log(session, ctx)
-- TODO
core.tablepool.release("xrpc_redis_cmd_line", ctx.cmd_line)
ctx.cmd_line = nil
end

55
t/xrpc/redis.t vendored
View File

@ -40,7 +40,7 @@ _EOC_
}
if ((!defined $block->error_log) && (!defined $block->no_error_log)) {
$block->set_value("no_error_log", "[error]");
$block->set_value("no_error_log", "[error]\nRPC is not finished");
}
if (!defined $block->request) {
@ -50,6 +50,7 @@ _EOC_
$block;
});
worker_connections(1024);
run_tests;
__DATA__
@ -204,3 +205,55 @@ hget animals: bark
--- response_body eval
"\r\n" x 16777216
--- stream_conf_enable
=== TEST 5: pipeline
--- config
location /t {
content_by_lua_block {
local cjson = require("cjson")
local redis = require "resty.redis"
local t = {}
for i = 1, 180 do
local th = assert(ngx.thread.spawn(function(i)
local red = redis:new()
local ok, err = red:connect("127.0.0.1", $TEST_NGINX_REDIS_PORT)
if not ok then
ngx.say("failed to connect: ", err)
return
end
red:init_pipeline()
red:set("mark_" .. i, i)
red:get("mark_" .. i)
red:get("counter")
for j = 1, 4 do
red:incr("counter")
end
local results, err = red:commit_pipeline()
if not results then
ngx.say("failed to commit: ", err)
return
end
local begin = tonumber(results[3])
for j = 1, 4 do
local incred = results[3 + j]
if incred ~= results[2 + j] + 1 then
ngx.log(ngx.ERR, cjson.encode(results))
end
end
end, i))
table.insert(t, th)
end
for i, th in ipairs(t) do
ngx.thread.wait(th)
end
}
}
--- response_body
--- stream_conf_enable