diff --git a/conf/config.yaml b/conf/config.yaml index affa72db..cad3c17f 100644 --- a/conf/config.yaml +++ b/conf/config.yaml @@ -4,3 +4,4 @@ etcd: plugins: - example-plugin + - limit-req diff --git a/conf/nginx.conf b/conf/nginx.conf index 3dda428e..0eb4a489 100644 --- a/conf/nginx.conf +++ b/conf/nginx.conf @@ -21,6 +21,8 @@ http { lua_package_path "/usr/share/lua/5.1/?.lua;$prefix/lua/?.lua;;"; lua_package_cpath '/usr/lib64/lua/5.1/?.so;;'; + lua_shared_dict plugin-limit-req 10m; + lua_ssl_verify_depth 5; ssl_session_timeout 86400; diff --git a/lua/apisix.lua b/lua/apisix.lua index 7b0f03e4..683121e4 100644 --- a/lua/apisix.lua +++ b/lua/apisix.lua @@ -7,8 +7,6 @@ local base_plugin = require("apisix.base_plugin") local new_tab = require("table.new") local load_balancer = require("apisix.balancer") .run local ngx = ngx -local ngx_req = ngx.req -local ngx_var = ngx.var local _M = {version = 0.1} @@ -41,17 +39,16 @@ function _M.rewrite_phase() ngx_ctx.api_ctx = api_ctx end - api_ctx.method = api_ctx.method or ngx_req.get_method() - api_ctx.uri = api_ctx.uri or ngx_var.uri - api_ctx.host = api_ctx.host or ngx_var.host + local method = core.ctx.get(api_ctx, "method") + local uri = core.ctx.get(api_ctx, "uri") + local host = core.ctx.get(api_ctx, "host") local router, dispatch_uri = route_handler.get_router() local ok if dispatch_uri then - ok = router:dispatch(api_ctx.method, api_ctx.uri, api_ctx) + ok = router:dispatch(method, uri, api_ctx) else - ok = router:dispatch(api_ctx.method, api_ctx.host .. api_ctx.uri, - api_ctx) + ok = router:dispatch(method, host .. uri, api_ctx) end if not ok then @@ -66,19 +63,25 @@ function _M.rewrite_phase() ngx.say("failed to load plugins: ", err) end + if api_ctx.matched_route.service_id then + error("todo: suppport to use service fetch user config") + else + api_ctx.conf_type = "route" + api_ctx.conf_version = api_ctx.matched_route.modifiedIndex + api_ctx.conf_id = api_ctx.matched_route.id + end + local filter_plugins = base_plugin.filter_plugin( api_ctx.matched_route, local_supported_plugins) api_ctx.filter_plugins = filter_plugins - api_ctx.conf_version = api_ctx.matched_route.modifiedIndex -- todo: fetch the upstream node status, it may be stored in -- different places. for i = 1, #filter_plugins, 2 do local plugin = filter_plugins[i] if plugin.rewrite then - plugin.rewrite(filter_plugins[i + 1], - api_ctx.conf_version) + plugin.rewrite(filter_plugins[i + 1], api_ctx) end end end @@ -93,8 +96,7 @@ function _M.access_phase() for i = 1, #filter_plugins, 2 do local plugin = filter_plugins[i] if plugin.access then - plugin.access(filter_plugins[i + 1], - api_ctx.conf_version) + plugin.access(filter_plugins[i + 1], api_ctx) end end end @@ -109,8 +111,7 @@ function _M.header_filter_phase() for i = 1, #filter_plugins, 2 do local plugin = filter_plugins[i] if plugin.header_filter then - plugin.header_filter(filter_plugins[i + 1], - api_ctx.conf_version) + plugin.header_filter(filter_plugins[i + 1], api_ctx) end end end @@ -125,8 +126,7 @@ function _M.log_phase() for i = 1, #filter_plugins, 2 do local plugin = filter_plugins[i] if plugin.log then - plugin.log(filter_plugins[i + 1], - api_ctx.conf_version) + plugin.log(filter_plugins[i + 1], api_ctx) end end end diff --git a/lua/apisix/core.lua b/lua/apisix/core.lua index 1eff8351..31576f51 100644 --- a/lua/apisix/core.lua +++ b/lua/apisix/core.lua @@ -1,8 +1,49 @@ +local tostring = tostring +local json_encode = require("cjson.safe").encode + + +local function serialise_obj(data) + if type(data) == "function" or type(data) == "userdata" + or type(data) == "table" then + return tostring(data) + end + + return data +end + +local function tab_clone_with_serialise(data) + if type(data) ~= "table" then + return data + end + + local t = {} + for k, v in pairs(data) do + if type(v) == "table" then + t[serialise_obj(k)] = tab_clone_with_serialise(v) + + else + t[serialise_obj(k)] = serialise_obj(v) + end + end + + return t +end + + return { version = 0.1, log = require("apisix.core.log"), config = require("apisix.core.config"), - json = require("cjson.safe"), + json = { + encode = function(data, force) + if force then + data = tab_clone_with_serialise(data) + end + + return json_encode(data) + end, + decode = require("cjson.safe").decode, + }, table = { new = require("table.new"), clear = require("table.clear"), @@ -10,4 +51,6 @@ return { }, resp = require("apisix.core.resp"), typeof = require("apisix.core.typeof"), + global_lru = require("apisix.core.global_lrucache"), + ctx = require("apisix.core.ctx"), } diff --git a/lua/apisix/core/ctx.lua b/lua/apisix/core/ctx.lua new file mode 100644 index 00000000..5750b889 --- /dev/null +++ b/lua/apisix/core/ctx.lua @@ -0,0 +1,37 @@ +-- Copyright (C) Yuansheng Wang + +local ngx_req = ngx.req +local ngx_var = ngx.var +local new_tab = require("table.new") + + +local _M = {version = 0.1} + + +function _M.get(api_ctx, name) + local vars = api_ctx.vars + if not vars then + vars = new_tab(0, 8) + api_ctx.vars = vars + end + + local val = vars[name] + if val then + return val + end + + if name == "method" then + val = ngx_req.get_method() + else + val = ngx_var[name] + end + + if val then + vars[name] = val + end + + return val +end + + +return _M diff --git a/lua/apisix/core/global_lrucache.lua b/lua/apisix/core/global_lrucache.lua new file mode 100644 index 00000000..f685e9c7 --- /dev/null +++ b/lua/apisix/core/global_lrucache.lua @@ -0,0 +1,33 @@ +-- Copyright (C) Yuansheng Wang + +local lru_new = require("resty.lrucache").new +local lru_items = lru_new(200) -- todo: support to config in yaml. + + +local _M = {version = 0.1} + + +function _M.fetch(name, count, version) + local cache = lru_items:get(name) + if cache and cache.version == version then + return cache + end + + cache = lru_new(count) + cache.version = version + lru_items:set(name, cache) + return cache +end + + +function _M.set(self, name, lru) + return lru_items:set(name, lru) +end + + +function _M.delete(name) + return lru_items:delete(name) +end + + +return _M diff --git a/lua/apisix/core/log.lua b/lua/apisix/core/log.lua index dc83cdeb..6e3563db 100644 --- a/lua/apisix/core/log.lua +++ b/lua/apisix/core/log.lua @@ -4,6 +4,7 @@ local ngx = ngx local ngx_log = ngx.log local ngx_DEBUG= ngx.DEBUG local DEBUG = ngx.config.debug +-- todo: support stream module local cur_level = ngx.config.subsystem == "http" and require "ngx.errlog" .get_sys_filter_level() diff --git a/lua/apisix/plugins/example-plugin.lua b/lua/apisix/plugins/example-plugin.lua index 5ec7b8c2..586ffab2 100644 --- a/lua/apisix/plugins/example-plugin.lua +++ b/lua/apisix/plugins/example-plugin.lua @@ -1,6 +1,5 @@ local core = require("apisix.core") local base_plugin = require("apisix.base_plugin") -local encode_json = require("cjson.safe").encode -- TODO: need a more powerful way to define the schema @@ -32,13 +31,15 @@ function _M.check_args(conf) end -function _M.rewrite(conf) - core.log.warn("plugin rewrite phase, conf: ", encode_json(conf)) +function _M.rewrite(conf, api_ctx) + core.log.warn("plugin rewrite phase, conf: ", core.json.encode(conf), + " ctx: ", core.json.encode(api_ctx, true)) end -function _M.access(conf) - core.log.warn("plugin access phase, conf: ", encode_json(conf)) +function _M.access(conf, api_ctx) + core.log.warn("plugin access phase, conf: ", core.json.encode(conf), + " ctx: ", core.json.encode(api_ctx, true)) -- ngx.say("hit example plugin") end diff --git a/lua/apisix/plugins/limit-req.lua b/lua/apisix/plugins/limit-req.lua new file mode 100644 index 00000000..0bf0919e --- /dev/null +++ b/lua/apisix/plugins/limit-req.lua @@ -0,0 +1,45 @@ +local limit_req = require("resty.limit.req") +local core = require("apisix.core") +-- todo: support to config it in yaml +local cache = core.global_lru.fetch("/plugin/limit-req", 200) + + +local _M = { + version = 0.1, + priority = 1001, -- TODO: add a type field, may be a good idea + name = "limit-req", +} + + +function _M.check_args(conf) + return true +end + + +function _M.access(conf, api_ctx) + local key = api_ctx.conf_type .. "#" .. api_ctx.conf_id + -- core.log.warn("key: ", key, " conf: ", core.json.encode(conf)) + + local limit_ins = cache:get(key) + if not limit_ins or limit_ins.version ~= api_ctx.conf_version then + limit_ins = limit_req.new("plugin-limit-req", conf.rate, conf.burst) + cache:set(key, limit_ins) + end + + key = core.ctx.get(api_ctx, conf.key) + + local delay, err = limit_ins:incoming(key, true) + if not delay then + if err == "rejected" then + return core.resp(conf.rejected_code) + end + + core.log.error("failed to limit req: ", err) + return core.resp(500) + end + + core.log.info("hit limit-req access") +end + + +return _M