From fd626ae7e19803a43de7c16f2f0c1f7ae2970309 Mon Sep 17 00:00:00 2001 From: Nirojan Selvanathan Date: Thu, 7 May 2020 05:22:06 +0200 Subject: [PATCH] bugfix: Adding function to remove stale objects from kafka logger (#1526) --- apisix/plugins/kafka-logger.lua | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/apisix/plugins/kafka-logger.lua b/apisix/plugins/kafka-logger.lua index a9050b9d..1641c5e1 100644 --- a/apisix/plugins/kafka-logger.lua +++ b/apisix/plugins/kafka-logger.lua @@ -21,7 +21,11 @@ local batch_processor = require("apisix.utils.batch-processor") local pairs = pairs local type = type local table = table +local ipairs = ipairs local plugin_name = "kafka-logger" +local stale_timer_running = false; +local timer_at = ngx.timer.at +local tostring = tostring local ngx = ngx local buffers = {} @@ -90,6 +94,22 @@ local function send_kafka_data(conf, log_message) end +local function remove_stale_objects(premature) + if premature then + return + end + + for key, batch in ipairs(buffers) do + if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then + core.log.debug("removing batch processor stale object, route id:", tostring(key)) + buffers[key] = nil + end + end + + stale_timer_running = false +end + + function _M.log(conf) local entry = log_util.get_full_log(ngx) @@ -100,6 +120,12 @@ function _M.log(conf) local log_buffer = buffers[entry.route_id] + if not stale_timer_running then + -- run the timer every 30 mins if any log is present + timer_at(1800, remove_stale_objects) + stale_timer_running = true + end + if log_buffer then log_buffer:push(entry) return