mirror of
https://gitee.com/iresty/apisix.git
synced 2024-12-14 00:41:35 +08:00
bugfix: Adding function to remove stale objects from kafka logger (#1526)
This commit is contained in:
parent
e8091f6e2c
commit
fd626ae7e1
@ -21,7 +21,11 @@ local batch_processor = require("apisix.utils.batch-processor")
|
||||
local pairs = pairs
|
||||
local type = type
|
||||
local table = table
|
||||
local ipairs = ipairs
|
||||
local plugin_name = "kafka-logger"
|
||||
local stale_timer_running = false;
|
||||
local timer_at = ngx.timer.at
|
||||
local tostring = tostring
|
||||
local ngx = ngx
|
||||
local buffers = {}
|
||||
|
||||
@ -90,6 +94,22 @@ local function send_kafka_data(conf, log_message)
|
||||
end
|
||||
|
||||
|
||||
local function remove_stale_objects(premature)
|
||||
if premature then
|
||||
return
|
||||
end
|
||||
|
||||
for key, batch in ipairs(buffers) do
|
||||
if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
|
||||
core.log.debug("removing batch processor stale object, route id:", tostring(key))
|
||||
buffers[key] = nil
|
||||
end
|
||||
end
|
||||
|
||||
stale_timer_running = false
|
||||
end
|
||||
|
||||
|
||||
function _M.log(conf)
|
||||
local entry = log_util.get_full_log(ngx)
|
||||
|
||||
@ -100,6 +120,12 @@ function _M.log(conf)
|
||||
|
||||
local log_buffer = buffers[entry.route_id]
|
||||
|
||||
if not stale_timer_running then
|
||||
-- run the timer every 30 mins if any log is present
|
||||
timer_at(1800, remove_stale_objects)
|
||||
stale_timer_running = true
|
||||
end
|
||||
|
||||
if log_buffer then
|
||||
log_buffer:push(entry)
|
||||
return
|
||||
|
Loading…
Reference in New Issue
Block a user