mirror of
https://gitee.com/iresty/apisix.git
synced 2024-11-30 11:07:59 +08:00
feature: implemented new feature limit-req
.
This commit is contained in:
parent
ee1d508e04
commit
20e240bb96
@ -4,3 +4,4 @@ etcd:
|
||||
|
||||
plugins:
|
||||
- example-plugin
|
||||
- limit-req
|
||||
|
@ -21,6 +21,8 @@ http {
|
||||
lua_package_path "/usr/share/lua/5.1/?.lua;$prefix/lua/?.lua;;";
|
||||
lua_package_cpath '/usr/lib64/lua/5.1/?.so;;';
|
||||
|
||||
lua_shared_dict plugin-limit-req 10m;
|
||||
|
||||
lua_ssl_verify_depth 5;
|
||||
ssl_session_timeout 86400;
|
||||
|
||||
|
@ -7,8 +7,6 @@ local base_plugin = require("apisix.base_plugin")
|
||||
local new_tab = require("table.new")
|
||||
local load_balancer = require("apisix.balancer") .run
|
||||
local ngx = ngx
|
||||
local ngx_req = ngx.req
|
||||
local ngx_var = ngx.var
|
||||
|
||||
|
||||
local _M = {version = 0.1}
|
||||
@ -41,17 +39,16 @@ function _M.rewrite_phase()
|
||||
ngx_ctx.api_ctx = api_ctx
|
||||
end
|
||||
|
||||
api_ctx.method = api_ctx.method or ngx_req.get_method()
|
||||
api_ctx.uri = api_ctx.uri or ngx_var.uri
|
||||
api_ctx.host = api_ctx.host or ngx_var.host
|
||||
local method = core.ctx.get(api_ctx, "method")
|
||||
local uri = core.ctx.get(api_ctx, "uri")
|
||||
local host = core.ctx.get(api_ctx, "host")
|
||||
|
||||
local router, dispatch_uri = route_handler.get_router()
|
||||
local ok
|
||||
if dispatch_uri then
|
||||
ok = router:dispatch(api_ctx.method, api_ctx.uri, api_ctx)
|
||||
ok = router:dispatch(method, uri, api_ctx)
|
||||
else
|
||||
ok = router:dispatch(api_ctx.method, api_ctx.host .. api_ctx.uri,
|
||||
api_ctx)
|
||||
ok = router:dispatch(method, host .. uri, api_ctx)
|
||||
end
|
||||
|
||||
if not ok then
|
||||
@ -66,19 +63,25 @@ function _M.rewrite_phase()
|
||||
ngx.say("failed to load plugins: ", err)
|
||||
end
|
||||
|
||||
if api_ctx.matched_route.service_id then
|
||||
error("todo: suppport to use service fetch user config")
|
||||
else
|
||||
api_ctx.conf_type = "route"
|
||||
api_ctx.conf_version = api_ctx.matched_route.modifiedIndex
|
||||
api_ctx.conf_id = api_ctx.matched_route.id
|
||||
end
|
||||
|
||||
local filter_plugins = base_plugin.filter_plugin(
|
||||
api_ctx.matched_route, local_supported_plugins)
|
||||
|
||||
api_ctx.filter_plugins = filter_plugins
|
||||
api_ctx.conf_version = api_ctx.matched_route.modifiedIndex
|
||||
-- todo: fetch the upstream node status, it may be stored in
|
||||
-- different places.
|
||||
|
||||
for i = 1, #filter_plugins, 2 do
|
||||
local plugin = filter_plugins[i]
|
||||
if plugin.rewrite then
|
||||
plugin.rewrite(filter_plugins[i + 1],
|
||||
api_ctx.conf_version)
|
||||
plugin.rewrite(filter_plugins[i + 1], api_ctx)
|
||||
end
|
||||
end
|
||||
end
|
||||
@ -93,8 +96,7 @@ function _M.access_phase()
|
||||
for i = 1, #filter_plugins, 2 do
|
||||
local plugin = filter_plugins[i]
|
||||
if plugin.access then
|
||||
plugin.access(filter_plugins[i + 1],
|
||||
api_ctx.conf_version)
|
||||
plugin.access(filter_plugins[i + 1], api_ctx)
|
||||
end
|
||||
end
|
||||
end
|
||||
@ -109,8 +111,7 @@ function _M.header_filter_phase()
|
||||
for i = 1, #filter_plugins, 2 do
|
||||
local plugin = filter_plugins[i]
|
||||
if plugin.header_filter then
|
||||
plugin.header_filter(filter_plugins[i + 1],
|
||||
api_ctx.conf_version)
|
||||
plugin.header_filter(filter_plugins[i + 1], api_ctx)
|
||||
end
|
||||
end
|
||||
end
|
||||
@ -125,8 +126,7 @@ function _M.log_phase()
|
||||
for i = 1, #filter_plugins, 2 do
|
||||
local plugin = filter_plugins[i]
|
||||
if plugin.log then
|
||||
plugin.log(filter_plugins[i + 1],
|
||||
api_ctx.conf_version)
|
||||
plugin.log(filter_plugins[i + 1], api_ctx)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
@ -1,8 +1,49 @@
|
||||
local tostring = tostring
|
||||
local json_encode = require("cjson.safe").encode
|
||||
|
||||
|
||||
local function serialise_obj(data)
|
||||
if type(data) == "function" or type(data) == "userdata"
|
||||
or type(data) == "table" then
|
||||
return tostring(data)
|
||||
end
|
||||
|
||||
return data
|
||||
end
|
||||
|
||||
local function tab_clone_with_serialise(data)
|
||||
if type(data) ~= "table" then
|
||||
return data
|
||||
end
|
||||
|
||||
local t = {}
|
||||
for k, v in pairs(data) do
|
||||
if type(v) == "table" then
|
||||
t[serialise_obj(k)] = tab_clone_with_serialise(v)
|
||||
|
||||
else
|
||||
t[serialise_obj(k)] = serialise_obj(v)
|
||||
end
|
||||
end
|
||||
|
||||
return t
|
||||
end
|
||||
|
||||
|
||||
return {
|
||||
version = 0.1,
|
||||
log = require("apisix.core.log"),
|
||||
config = require("apisix.core.config"),
|
||||
json = require("cjson.safe"),
|
||||
json = {
|
||||
encode = function(data, force)
|
||||
if force then
|
||||
data = tab_clone_with_serialise(data)
|
||||
end
|
||||
|
||||
return json_encode(data)
|
||||
end,
|
||||
decode = require("cjson.safe").decode,
|
||||
},
|
||||
table = {
|
||||
new = require("table.new"),
|
||||
clear = require("table.clear"),
|
||||
@ -10,4 +51,6 @@ return {
|
||||
},
|
||||
resp = require("apisix.core.resp"),
|
||||
typeof = require("apisix.core.typeof"),
|
||||
global_lru = require("apisix.core.global_lrucache"),
|
||||
ctx = require("apisix.core.ctx"),
|
||||
}
|
||||
|
37
lua/apisix/core/ctx.lua
Normal file
37
lua/apisix/core/ctx.lua
Normal file
@ -0,0 +1,37 @@
|
||||
-- Copyright (C) Yuansheng Wang
|
||||
|
||||
local ngx_req = ngx.req
|
||||
local ngx_var = ngx.var
|
||||
local new_tab = require("table.new")
|
||||
|
||||
|
||||
local _M = {version = 0.1}
|
||||
|
||||
|
||||
function _M.get(api_ctx, name)
|
||||
local vars = api_ctx.vars
|
||||
if not vars then
|
||||
vars = new_tab(0, 8)
|
||||
api_ctx.vars = vars
|
||||
end
|
||||
|
||||
local val = vars[name]
|
||||
if val then
|
||||
return val
|
||||
end
|
||||
|
||||
if name == "method" then
|
||||
val = ngx_req.get_method()
|
||||
else
|
||||
val = ngx_var[name]
|
||||
end
|
||||
|
||||
if val then
|
||||
vars[name] = val
|
||||
end
|
||||
|
||||
return val
|
||||
end
|
||||
|
||||
|
||||
return _M
|
33
lua/apisix/core/global_lrucache.lua
Normal file
33
lua/apisix/core/global_lrucache.lua
Normal file
@ -0,0 +1,33 @@
|
||||
-- Copyright (C) Yuansheng Wang
|
||||
|
||||
local lru_new = require("resty.lrucache").new
|
||||
local lru_items = lru_new(200) -- todo: support to config in yaml.
|
||||
|
||||
|
||||
local _M = {version = 0.1}
|
||||
|
||||
|
||||
function _M.fetch(name, count, version)
|
||||
local cache = lru_items:get(name)
|
||||
if cache and cache.version == version then
|
||||
return cache
|
||||
end
|
||||
|
||||
cache = lru_new(count)
|
||||
cache.version = version
|
||||
lru_items:set(name, cache)
|
||||
return cache
|
||||
end
|
||||
|
||||
|
||||
function _M.set(self, name, lru)
|
||||
return lru_items:set(name, lru)
|
||||
end
|
||||
|
||||
|
||||
function _M.delete(name)
|
||||
return lru_items:delete(name)
|
||||
end
|
||||
|
||||
|
||||
return _M
|
@ -4,6 +4,7 @@ local ngx = ngx
|
||||
local ngx_log = ngx.log
|
||||
local ngx_DEBUG= ngx.DEBUG
|
||||
local DEBUG = ngx.config.debug
|
||||
-- todo: support stream module
|
||||
local cur_level = ngx.config.subsystem == "http" and
|
||||
require "ngx.errlog" .get_sys_filter_level()
|
||||
|
||||
|
@ -1,6 +1,5 @@
|
||||
local core = require("apisix.core")
|
||||
local base_plugin = require("apisix.base_plugin")
|
||||
local encode_json = require("cjson.safe").encode
|
||||
|
||||
|
||||
-- TODO: need a more powerful way to define the schema
|
||||
@ -32,13 +31,15 @@ function _M.check_args(conf)
|
||||
end
|
||||
|
||||
|
||||
function _M.rewrite(conf)
|
||||
core.log.warn("plugin rewrite phase, conf: ", encode_json(conf))
|
||||
function _M.rewrite(conf, api_ctx)
|
||||
core.log.warn("plugin rewrite phase, conf: ", core.json.encode(conf),
|
||||
" ctx: ", core.json.encode(api_ctx, true))
|
||||
end
|
||||
|
||||
|
||||
function _M.access(conf)
|
||||
core.log.warn("plugin access phase, conf: ", encode_json(conf))
|
||||
function _M.access(conf, api_ctx)
|
||||
core.log.warn("plugin access phase, conf: ", core.json.encode(conf),
|
||||
" ctx: ", core.json.encode(api_ctx, true))
|
||||
-- ngx.say("hit example plugin")
|
||||
end
|
||||
|
||||
|
45
lua/apisix/plugins/limit-req.lua
Normal file
45
lua/apisix/plugins/limit-req.lua
Normal file
@ -0,0 +1,45 @@
|
||||
local limit_req = require("resty.limit.req")
|
||||
local core = require("apisix.core")
|
||||
-- todo: support to config it in yaml
|
||||
local cache = core.global_lru.fetch("/plugin/limit-req", 200)
|
||||
|
||||
|
||||
local _M = {
|
||||
version = 0.1,
|
||||
priority = 1001, -- TODO: add a type field, may be a good idea
|
||||
name = "limit-req",
|
||||
}
|
||||
|
||||
|
||||
function _M.check_args(conf)
|
||||
return true
|
||||
end
|
||||
|
||||
|
||||
function _M.access(conf, api_ctx)
|
||||
local key = api_ctx.conf_type .. "#" .. api_ctx.conf_id
|
||||
-- core.log.warn("key: ", key, " conf: ", core.json.encode(conf))
|
||||
|
||||
local limit_ins = cache:get(key)
|
||||
if not limit_ins or limit_ins.version ~= api_ctx.conf_version then
|
||||
limit_ins = limit_req.new("plugin-limit-req", conf.rate, conf.burst)
|
||||
cache:set(key, limit_ins)
|
||||
end
|
||||
|
||||
key = core.ctx.get(api_ctx, conf.key)
|
||||
|
||||
local delay, err = limit_ins:incoming(key, true)
|
||||
if not delay then
|
||||
if err == "rejected" then
|
||||
return core.resp(conf.rejected_code)
|
||||
end
|
||||
|
||||
core.log.error("failed to limit req: ", err)
|
||||
return core.resp(500)
|
||||
end
|
||||
|
||||
core.log.info("hit limit-req access")
|
||||
end
|
||||
|
||||
|
||||
return _M
|
Loading…
Reference in New Issue
Block a user