feature: the upstream data can be stored under /user_upstream and indexed by id. (#18)

* change: code style, used `apisix.core` if possible.
This commit is contained in:
YuanSheng Wang 2019-05-20 10:36:23 +08:00 committed by GitHub
parent 097ef81d69
commit 4fc74ea6e7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 127 additions and 54 deletions

View File

@ -1,8 +1,7 @@
-- Copyright (C) Yuansheng Wang
local require = require
local log = require("apisix.core.log")
local resp = require("apisix.core.resp")
local core = require("apisix.core")
local route_handler = require("apisix.route.handler")
local base_plugin = require("apisix.base_plugin")
local new_tab = require("table.new")
@ -11,8 +10,10 @@ local ngx = ngx
local ngx_req = ngx.req
local ngx_var = ngx.var
local _M = {version = 0.1}
function _M.init()
require("resty.core")
require("ngx.re").opt("jit_stack_size", 200 * 1024)
@ -24,10 +25,13 @@ function _M.init()
require("apisix.route.handler").init()
end
function _M.init_worker()
require("apisix.route.load").init_worker()
require("apisix.balancer").init_worker()
end
function _M.rewrite_phase()
local ngx_ctx = ngx.ctx
local api_ctx = ngx_ctx.api_ctx
@ -52,8 +56,8 @@ function _M.rewrite_phase()
end
if not ok then
log.info("not find any matched route")
return resp(404)
core.log.info("not find any matched route")
return core.resp(404)
end
-- todo: move those code to another single file

View File

@ -1,14 +1,17 @@
local core = require("apisix.core")
local resty_roundrobin = require("resty.roundrobin")
local config_etcd = require("apisix.core.config_etcd")
local roundrobin = require("resty.roundrobin")
local balancer = require("ngx.balancer")
local lrucache = require("resty.lrucache")
local upstreams_etcd
local ngx = ngx
local ngx_exit = ngx.exit
local ngx_ERROR = ngx.ERROR
local error = error
local module_name = "balancer"
local cache, err = lrucache.new(500) -- todo: config in yaml
local cache = lrucache.new(500) -- todo: config in yaml
local _M = {
@ -17,16 +20,16 @@ local _M = {
}
local function create_obj(typ, nodes)
local function create_server_piker(typ, nodes)
-- core.log.info("create create_obj, type: ", typ,
-- " nodes: ", core.json.encode(nodes))
if typ == "roundrobin" then
local obj = resty_roundrobin:new(nodes)
return obj
return roundrobin:new(nodes)
end
if typ == "chash" then
-- todo: support `chash`
return nil, "not supported balancer type: " .. typ, 0
end
@ -37,30 +40,55 @@ end
function _M.run(route, version)
-- core.log.warn("conf: ", core.json.encode(conf), " version: ", version)
local upstream = route.value.upstream
local up_id = upstream.id
local key = upstream.type .. "#" .. route.id .. "#" .. version
local key
if up_id then
if not upstreams_etcd then
core.log.warn("need to create a etcd instance for fetching ",
"upstream information")
ngx_exit(ngx_ERROR)
return
end
local upstream_obj = upstreams_etcd:get(up_id)
if not upstream_obj then
core.log.warn("failed to find upstream by id: ", up_id)
ngx_exit(ngx_ERROR)
return
end
-- core.log.info("upstream: ", core.json.encode(upstream_obj))
upstream = upstream_obj.value
key = upstream.type .. "#upstream_" .. up_id .. "#"
.. upstream_obj.modifiedIndex
else
key = upstream.type .. "#route_" .. route.id .. "#" .. version
end
local server_piker, stale_server_piker = cache:get(key)
if not server_piker then
if stale_server_piker and stale_server_piker.conf_version == version then
server_piker = stale_server_piker
local obj, stale_obj = cache:get(key)
if not obj then
if stale_obj and stale_obj.conf_version == version then
obj = stale_obj
else
obj, err = create_obj(upstream.type, upstream.nodes)
if not obj then
core.log.error("failed to get balancer object: ", err)
local err
server_piker, err = create_server_piker(upstream.type, upstream.nodes)
if not server_piker then
core.log.error("failed to get server piker: ", err)
ngx_exit(ngx_ERROR)
return
end
obj.conf_version = version
server_piker.conf_version = version
end
-- todo: need a way to clean the old cache
-- todo: config in yaml
cache:set(key, obj, 3600)
cache:set(key, server_piker, 3600)
end
local server
server, err = obj:find()
local server, err = server_piker:find()
if not server then
core.log.error("failed to find valid upstream server", err)
ngx_exit(ngx_ERROR)
@ -77,4 +105,15 @@ function _M.run(route, version)
end
function _M.init_worker()
local err
upstreams_etcd, err = config_etcd.new("/user_upstreams",
{automatic = true})
if not upstreams_etcd then
error("failed to create etcd instance to fetch upstream: " .. err)
return
end
end
return _M

View File

@ -2,10 +2,11 @@ return {
version = 0.1,
log = require("apisix.core.log"),
config = require("apisix.core.config"),
config_etcd = require("apisix.core.config_etcd"),
json = require("cjson.safe"),
table = {
new = require("table.new"),
new = require("table.new"),
clear = require("table.clear"),
nkeys = require("table.nkeys"),
},
resp = require("apisix.core.resp"),
}

View File

@ -3,7 +3,6 @@
local core = require("apisix.core")
local etcd = require("resty.etcd")
local new_tab = require("table.new")
local json_encode = require("cjson.safe").encode
local exiting = ngx.worker.exiting
local insert_tab = table.insert
local type = type
@ -11,6 +10,7 @@ local ipairs = ipairs
local setmetatable = setmetatable
local ngx_sleep = ngx.sleep
local ngx_timer_at = ngx.timer.at
local sub_str = string.sub
local _M = {version = 0.1}
@ -62,11 +62,12 @@ local function waitdir(etcd_cli, key, modified_index)
end
function _M.fetch(self)
if self.automatic then
return self.values
end
local function short_key(self, str)
return sub_str(str, #self.key + 2)
end
function _M.fetch(self)
if self.values == nil then
local dir_res, err = readdir(self.etcd_cli, self.key)
if not dir_res then
@ -82,7 +83,8 @@ function _M.fetch(self)
for _, item in ipairs(dir_res.nodes) do
insert_tab(self.values, item)
self.values_hash[item.key] = #self.values
local key = short_key(self, item.key)
self.values_hash[key] = #self.values
if not self.prev_index or item.modifiedIndex > self.prev_index then
self.prev_index = item.modifiedIndex
@ -92,43 +94,71 @@ function _M.fetch(self)
return self.values
end
local dir_res, err = waitdir(self.etcd_cli, self.key, self.prev_index + 1)
if not dir_res then
local res, err = waitdir(self.etcd_cli, self.key, self.prev_index + 1)
if not res then
return nil, err
end
if dir_res.dir then
if res.dir then
core.log.error("todo: support for parsing `dir` response structures. ",
json_encode(dir_res))
core.json.encode(res))
return self.values
end
-- log.warn("waitdir: ", require("cjson").encode(dir_res))
-- core.log.warn("waitdir: ", core.json.encode(res))
if not self.prev_index or dir_res.modifiedIndex > self.prev_index then
self.prev_index = dir_res.modifiedIndex
if not self.prev_index or res.modifiedIndex > self.prev_index then
self.prev_index = res.modifiedIndex
end
local pre_index = self.values_hash[dir_res.key]
local key = short_key(self, res.key)
local pre_index = self.values_hash[key]
if pre_index then
if dir_res.value then
self.values[pre_index] = dir_res.value
if res.value then
self.values[pre_index] = res
else
self.sync_times = self.sync_times + 1
self.values[pre_index] = false
end
return self.values
elseif res.value then
insert_tab(self.values, res)
self.values_hash[key] = #self.values
end
if dir_res.value then
insert_tab(self.values, dir_res)
self.values_hash[dir_res.key] = #self.values
-- avoid space waste
-- todo: need to cover this path, it is important.
if self.sync_times > 100 then
local count = 0
for i = 1, #self.values do
local val = self.values[i]
self.values[i] = nil
if val then
count = count + 1
self.values[count] = val
end
end
for i = 1, count do
key = short_key(self, self.values[i].key)
self.values_hash[key] = i
end
end
return self.values
end
function _M.get(self, key)
local arr_idx = self.values_hash[tostring(key)]
if not arr_idx then
return nil
end
return self.values[arr_idx]
end
local function _automatic_fetch(premature, self)
if premature then
return
@ -169,6 +199,7 @@ function _M.new(key, opts)
prev_index = nil,
key = key,
automatic = automatic,
sync_times = 0,
}, mt)
if automatic then

View File

@ -3,7 +3,7 @@
local r3router = require("resty.r3")
local new_tab = require("table.new")
local route_load = require("apisix.route.load")
local log = require("apisix.core.log")
local core = require("apisix.core")
local ipairs = ipairs
local router
@ -15,7 +15,7 @@ local _M = {}
local function run_route(matched_params, route, api_ctx)
api_ctx.matched_params = matched_params
api_ctx.matched_route = route
-- log.warn("run route id: ", route.id, " host: ", api_ctx.host)
-- core.log.warn("run route id: ", route.id, " host: ", api_ctx.host)
end
@ -48,12 +48,12 @@ do
local routes = {}
function _M.set_routes(new_routes)
routes = new_routes
log.info("update new routes: ", require("cjson.safe").encode(routes))
core.log.info("update new routes: ", require("cjson.safe").encode(routes))
end
function _M.get_router()
if router == nil then
log.info("generate a empty router instance")
core.log.info("generate a empty router instance")
return _load_route(routes)
end

View File

@ -1,9 +1,7 @@
-- Copyright (C) Yuansheng Wang
local log = require("apisix.core.log")
local core = require("apisix.core")
local etcd = require("apisix.core.config_etcd")
local table_nkeys = require("table.nkeys")
local new_tab = require("table.new")
local insert_tab = table.insert
local ngx = ngx
local pcall = pcall
@ -18,12 +16,12 @@ local function load(etcd_routes)
local routes, err = etcd_routes:fetch()
if not routes then
if err ~= "timeout" then
log.error("failed to fetch routes: ", err)
core.log.error("failed to fetch routes: ", err)
end
return
end
local arr_routes = new_tab(table_nkeys(routes), 0)
local arr_routes = core.table.new(core.table.nkeys(routes), 0)
for route_id, route in pairs(routes) do
if type(route) == "table" then
route.id = route_id
@ -31,7 +29,7 @@ local function load(etcd_routes)
end
end
-- log.warn(apisix.json.encode(arr_routes))
-- core.log.warn(apisix.json.encode(arr_routes))
if callback then
callback(arr_routes)
end
@ -56,7 +54,7 @@ function _M.load(premature)
running = false
if not ok then
log.error("failed to call `load` function: ", err)
core.log.error("failed to call `load` function: ", err)
end
end