diff --git a/lua/apisix.lua b/lua/apisix.lua index 9c899d64..6d191896 100644 --- a/lua/apisix.lua +++ b/lua/apisix.lua @@ -1,8 +1,7 @@ -- Copyright (C) Yuansheng Wang local require = require -local log = require("apisix.core.log") -local resp = require("apisix.core.resp") +local core = require("apisix.core") local route_handler = require("apisix.route.handler") local base_plugin = require("apisix.base_plugin") local new_tab = require("table.new") @@ -11,8 +10,10 @@ local ngx = ngx local ngx_req = ngx.req local ngx_var = ngx.var + local _M = {version = 0.1} + function _M.init() require("resty.core") require("ngx.re").opt("jit_stack_size", 200 * 1024) @@ -24,10 +25,13 @@ function _M.init() require("apisix.route.handler").init() end + function _M.init_worker() require("apisix.route.load").init_worker() + require("apisix.balancer").init_worker() end + function _M.rewrite_phase() local ngx_ctx = ngx.ctx local api_ctx = ngx_ctx.api_ctx @@ -52,8 +56,8 @@ function _M.rewrite_phase() end if not ok then - log.info("not find any matched route") - return resp(404) + core.log.info("not find any matched route") + return core.resp(404) end -- todo: move those code to another single file diff --git a/lua/apisix/balancer.lua b/lua/apisix/balancer.lua index 8b7acdc9..de01b1ee 100644 --- a/lua/apisix/balancer.lua +++ b/lua/apisix/balancer.lua @@ -1,14 +1,17 @@ local core = require("apisix.core") -local resty_roundrobin = require("resty.roundrobin") +local config_etcd = require("apisix.core.config_etcd") +local roundrobin = require("resty.roundrobin") local balancer = require("ngx.balancer") local lrucache = require("resty.lrucache") +local upstreams_etcd local ngx = ngx local ngx_exit = ngx.exit local ngx_ERROR = ngx.ERROR +local error = error local module_name = "balancer" -local cache, err = lrucache.new(500) -- todo: config in yaml +local cache = lrucache.new(500) -- todo: config in yaml local _M = { @@ -17,16 +20,16 @@ local _M = { } -local function create_obj(typ, nodes) +local function create_server_piker(typ, nodes) -- core.log.info("create create_obj, type: ", typ, -- " nodes: ", core.json.encode(nodes)) if typ == "roundrobin" then - local obj = resty_roundrobin:new(nodes) - return obj + return roundrobin:new(nodes) end if typ == "chash" then + -- todo: support `chash` return nil, "not supported balancer type: " .. typ, 0 end @@ -37,30 +40,55 @@ end function _M.run(route, version) -- core.log.warn("conf: ", core.json.encode(conf), " version: ", version) local upstream = route.value.upstream + local up_id = upstream.id - local key = upstream.type .. "#" .. route.id .. "#" .. version + local key + if up_id then + if not upstreams_etcd then + core.log.warn("need to create a etcd instance for fetching ", + "upstream information") + ngx_exit(ngx_ERROR) + return + end + + local upstream_obj = upstreams_etcd:get(up_id) + if not upstream_obj then + core.log.warn("failed to find upstream by id: ", up_id) + ngx_exit(ngx_ERROR) + return + end + -- core.log.info("upstream: ", core.json.encode(upstream_obj)) + + upstream = upstream_obj.value + key = upstream.type .. "#upstream_" .. up_id .. "#" + .. upstream_obj.modifiedIndex + + else + key = upstream.type .. "#route_" .. route.id .. "#" .. version + end + + local server_piker, stale_server_piker = cache:get(key) + if not server_piker then + if stale_server_piker and stale_server_piker.conf_version == version then + server_piker = stale_server_piker - local obj, stale_obj = cache:get(key) - if not obj then - if stale_obj and stale_obj.conf_version == version then - obj = stale_obj else - obj, err = create_obj(upstream.type, upstream.nodes) - if not obj then - core.log.error("failed to get balancer object: ", err) + local err + server_piker, err = create_server_piker(upstream.type, upstream.nodes) + if not server_piker then + core.log.error("failed to get server piker: ", err) ngx_exit(ngx_ERROR) return end - obj.conf_version = version + server_piker.conf_version = version end -- todo: need a way to clean the old cache -- todo: config in yaml - cache:set(key, obj, 3600) + cache:set(key, server_piker, 3600) end - local server - server, err = obj:find() + local server, err = server_piker:find() if not server then core.log.error("failed to find valid upstream server", err) ngx_exit(ngx_ERROR) @@ -77,4 +105,15 @@ function _M.run(route, version) end +function _M.init_worker() + local err + upstreams_etcd, err = config_etcd.new("/user_upstreams", + {automatic = true}) + if not upstreams_etcd then + error("failed to create etcd instance to fetch upstream: " .. err) + return + end +end + + return _M diff --git a/lua/apisix/core.lua b/lua/apisix/core.lua index 70416923..a7d49ba3 100644 --- a/lua/apisix/core.lua +++ b/lua/apisix/core.lua @@ -2,10 +2,11 @@ return { version = 0.1, log = require("apisix.core.log"), config = require("apisix.core.config"), - config_etcd = require("apisix.core.config_etcd"), json = require("cjson.safe"), table = { - new = require("table.new"), + new = require("table.new"), clear = require("table.clear"), + nkeys = require("table.nkeys"), }, + resp = require("apisix.core.resp"), } diff --git a/lua/apisix/core/config_etcd.lua b/lua/apisix/core/config_etcd.lua index 7b866dfc..97afba68 100644 --- a/lua/apisix/core/config_etcd.lua +++ b/lua/apisix/core/config_etcd.lua @@ -3,7 +3,6 @@ local core = require("apisix.core") local etcd = require("resty.etcd") local new_tab = require("table.new") -local json_encode = require("cjson.safe").encode local exiting = ngx.worker.exiting local insert_tab = table.insert local type = type @@ -11,6 +10,7 @@ local ipairs = ipairs local setmetatable = setmetatable local ngx_sleep = ngx.sleep local ngx_timer_at = ngx.timer.at +local sub_str = string.sub local _M = {version = 0.1} @@ -62,11 +62,12 @@ local function waitdir(etcd_cli, key, modified_index) end -function _M.fetch(self) - if self.automatic then - return self.values - end +local function short_key(self, str) + return sub_str(str, #self.key + 2) +end + +function _M.fetch(self) if self.values == nil then local dir_res, err = readdir(self.etcd_cli, self.key) if not dir_res then @@ -82,7 +83,8 @@ function _M.fetch(self) for _, item in ipairs(dir_res.nodes) do insert_tab(self.values, item) - self.values_hash[item.key] = #self.values + local key = short_key(self, item.key) + self.values_hash[key] = #self.values if not self.prev_index or item.modifiedIndex > self.prev_index then self.prev_index = item.modifiedIndex @@ -92,43 +94,71 @@ function _M.fetch(self) return self.values end - local dir_res, err = waitdir(self.etcd_cli, self.key, self.prev_index + 1) - if not dir_res then + local res, err = waitdir(self.etcd_cli, self.key, self.prev_index + 1) + if not res then return nil, err end - if dir_res.dir then + if res.dir then core.log.error("todo: support for parsing `dir` response structures. ", - json_encode(dir_res)) + core.json.encode(res)) return self.values end - -- log.warn("waitdir: ", require("cjson").encode(dir_res)) + -- core.log.warn("waitdir: ", core.json.encode(res)) - if not self.prev_index or dir_res.modifiedIndex > self.prev_index then - self.prev_index = dir_res.modifiedIndex + if not self.prev_index or res.modifiedIndex > self.prev_index then + self.prev_index = res.modifiedIndex end - local pre_index = self.values_hash[dir_res.key] + local key = short_key(self, res.key) + local pre_index = self.values_hash[key] if pre_index then - if dir_res.value then - self.values[pre_index] = dir_res.value + if res.value then + self.values[pre_index] = res else + self.sync_times = self.sync_times + 1 self.values[pre_index] = false end - return self.values + elseif res.value then + insert_tab(self.values, res) + self.values_hash[key] = #self.values end - if dir_res.value then - insert_tab(self.values, dir_res) - self.values_hash[dir_res.key] = #self.values + -- avoid space waste + -- todo: need to cover this path, it is important. + if self.sync_times > 100 then + local count = 0 + for i = 1, #self.values do + local val = self.values[i] + self.values[i] = nil + if val then + count = count + 1 + self.values[count] = val + end + end + + for i = 1, count do + key = short_key(self, self.values[i].key) + self.values_hash[key] = i + end end return self.values end +function _M.get(self, key) + local arr_idx = self.values_hash[tostring(key)] + if not arr_idx then + return nil + end + + return self.values[arr_idx] +end + + local function _automatic_fetch(premature, self) if premature then return @@ -169,6 +199,7 @@ function _M.new(key, opts) prev_index = nil, key = key, automatic = automatic, + sync_times = 0, }, mt) if automatic then diff --git a/lua/apisix/route/handler.lua b/lua/apisix/route/handler.lua index da0a47e8..28e09f21 100644 --- a/lua/apisix/route/handler.lua +++ b/lua/apisix/route/handler.lua @@ -3,7 +3,7 @@ local r3router = require("resty.r3") local new_tab = require("table.new") local route_load = require("apisix.route.load") -local log = require("apisix.core.log") +local core = require("apisix.core") local ipairs = ipairs local router @@ -15,7 +15,7 @@ local _M = {} local function run_route(matched_params, route, api_ctx) api_ctx.matched_params = matched_params api_ctx.matched_route = route - -- log.warn("run route id: ", route.id, " host: ", api_ctx.host) + -- core.log.warn("run route id: ", route.id, " host: ", api_ctx.host) end @@ -48,12 +48,12 @@ do local routes = {} function _M.set_routes(new_routes) routes = new_routes - log.info("update new routes: ", require("cjson.safe").encode(routes)) + core.log.info("update new routes: ", require("cjson.safe").encode(routes)) end function _M.get_router() if router == nil then - log.info("generate a empty router instance") + core.log.info("generate a empty router instance") return _load_route(routes) end diff --git a/lua/apisix/route/load.lua b/lua/apisix/route/load.lua index 078d02c2..31dd15b5 100644 --- a/lua/apisix/route/load.lua +++ b/lua/apisix/route/load.lua @@ -1,9 +1,7 @@ -- Copyright (C) Yuansheng Wang -local log = require("apisix.core.log") +local core = require("apisix.core") local etcd = require("apisix.core.config_etcd") -local table_nkeys = require("table.nkeys") -local new_tab = require("table.new") local insert_tab = table.insert local ngx = ngx local pcall = pcall @@ -18,12 +16,12 @@ local function load(etcd_routes) local routes, err = etcd_routes:fetch() if not routes then if err ~= "timeout" then - log.error("failed to fetch routes: ", err) + core.log.error("failed to fetch routes: ", err) end return end - local arr_routes = new_tab(table_nkeys(routes), 0) + local arr_routes = core.table.new(core.table.nkeys(routes), 0) for route_id, route in pairs(routes) do if type(route) == "table" then route.id = route_id @@ -31,7 +29,7 @@ local function load(etcd_routes) end end - -- log.warn(apisix.json.encode(arr_routes)) + -- core.log.warn(apisix.json.encode(arr_routes)) if callback then callback(arr_routes) end @@ -56,7 +54,7 @@ function _M.load(premature) running = false if not ok then - log.error("failed to call `load` function: ", err) + core.log.error("failed to call `load` function: ", err) end end