mirror of
https://gitee.com/iresty/apisix.git
synced 2024-12-05 05:27:35 +08:00
refactor: introduce hold_body_chunk to dismiss repeated code (#5167)
This commit is contained in:
parent
3e188389e3
commit
178d18aa35
@ -16,6 +16,7 @@
|
||||
--
|
||||
local encode_json = require("cjson.safe").encode
|
||||
local ngx = ngx
|
||||
local arg = ngx.arg
|
||||
local ngx_print = ngx.print
|
||||
local ngx_header = ngx.header
|
||||
local ngx_add_header
|
||||
@ -151,4 +152,50 @@ function _M.clear_header_as_body_modified()
|
||||
end
|
||||
|
||||
|
||||
-- Hold body chunks and return the final body once all chunks have been read.
|
||||
-- Usage:
|
||||
-- function _M.body_filter(conf, ctx)
|
||||
-- local final_body = core.response.hold_body_chunk(ctx)
|
||||
-- if not final_body then
|
||||
-- return
|
||||
-- end
|
||||
-- final_body = transform(final_body)
|
||||
-- ngx.arg[1] = final_body
|
||||
-- ...
|
||||
--
|
||||
-- Inspired by kong.response.get_raw_body()
|
||||
function _M.hold_body_chunk(ctx)
|
||||
local body_buffer
|
||||
local chunk, eof = arg[1], arg[2]
|
||||
if eof then
|
||||
body_buffer = ctx._body_buffer
|
||||
if not body_buffer then
|
||||
return chunk
|
||||
end
|
||||
|
||||
body_buffer = concat_tab(body_buffer, "", 1, body_buffer.n)
|
||||
ctx._body_buffer = nil
|
||||
return body_buffer
|
||||
end
|
||||
|
||||
if type(chunk) == "string" and chunk ~= "" then
|
||||
body_buffer = ctx._body_buffer
|
||||
if not body_buffer then
|
||||
body_buffer = {
|
||||
chunk,
|
||||
n = 1
|
||||
}
|
||||
ctx._body_buffer = body_buffer
|
||||
else
|
||||
local n = body_buffer.n + 1
|
||||
body_buffer.n = n
|
||||
body_buffer[n] = chunk
|
||||
end
|
||||
end
|
||||
|
||||
arg[1] = nil
|
||||
return nil
|
||||
end
|
||||
|
||||
|
||||
return _M
|
||||
|
@ -163,7 +163,7 @@ function _M.body_filter(conf, ctx)
|
||||
return
|
||||
end
|
||||
|
||||
local err = response(proto_obj, conf.service, conf.method, conf.pb_option)
|
||||
local err = response(ctx, proto_obj, conf.service, conf.method, conf.pb_option)
|
||||
if err then
|
||||
core.log.error("transform response error: ", err)
|
||||
return
|
||||
|
@ -19,55 +19,43 @@ local core = require("apisix.core")
|
||||
local pb = require("pb")
|
||||
local ngx = ngx
|
||||
local string = string
|
||||
local table = table
|
||||
local ipairs = ipairs
|
||||
|
||||
return function(proto, service, method, pb_option)
|
||||
return function(ctx, proto, service, method, pb_option)
|
||||
local buffer = core.response.hold_body_chunk(ctx)
|
||||
if not buffer then
|
||||
return nil
|
||||
end
|
||||
|
||||
local m = util.find_method(proto, service, method)
|
||||
if not m then
|
||||
return false, "2.Undefined service method: " .. service .. "/" .. method
|
||||
.. " end."
|
||||
end
|
||||
|
||||
local chunk, eof = ngx.arg[1], ngx.arg[2]
|
||||
local buffered = ngx.ctx.buffered
|
||||
if not buffered then
|
||||
buffered = {}
|
||||
ngx.ctx.buffered = buffered
|
||||
if not ngx.req.get_headers()["X-Grpc-Web"] then
|
||||
buffer = string.sub(buffer, 6)
|
||||
end
|
||||
|
||||
if chunk ~= "" then
|
||||
core.table.insert(buffered, chunk)
|
||||
ngx.arg[1] = nil
|
||||
if pb_option then
|
||||
for _, opt in ipairs(pb_option) do
|
||||
pb.option(opt)
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
if eof then
|
||||
ngx.ctx.buffered = nil
|
||||
local buffer = table.concat(buffered)
|
||||
if not ngx.req.get_headers()["X-Grpc-Web"] then
|
||||
buffer = string.sub(buffer, 6)
|
||||
end
|
||||
|
||||
if pb_option then
|
||||
for _, opt in ipairs(pb_option) do
|
||||
pb.option(opt)
|
||||
end
|
||||
end
|
||||
|
||||
local decoded = pb.decode(m.output_type, buffer)
|
||||
if not decoded then
|
||||
ngx.arg[1] = "failed to decode response data by protobuf"
|
||||
return "failed to decode response data by protobuf"
|
||||
end
|
||||
|
||||
local response, err = core.json.encode(decoded)
|
||||
if not response then
|
||||
core.log.error("failed to call json_encode data: ", err)
|
||||
response = "failed to json_encode response body"
|
||||
end
|
||||
|
||||
ngx.arg[1] = response
|
||||
local decoded = pb.decode(m.output_type, buffer)
|
||||
if not decoded then
|
||||
ngx.arg[1] = "failed to decode response data by protobuf"
|
||||
return "failed to decode response data by protobuf"
|
||||
end
|
||||
|
||||
local response, err = core.json.encode(decoded)
|
||||
if not response then
|
||||
core.log.error("failed to call json_encode data: ", err)
|
||||
response = "failed to json_encode response body"
|
||||
return response
|
||||
end
|
||||
|
||||
ngx.arg[1] = response
|
||||
return nil
|
||||
end
|
||||
|
Loading…
Reference in New Issue
Block a user