diff --git a/apisix/plugins/udp-logger.lua b/apisix/plugins/udp-logger.lua index b1b565fb..957e732f 100644 --- a/apisix/plugins/udp-logger.lua +++ b/apisix/plugins/udp-logger.lua @@ -22,6 +22,9 @@ local tostring = tostring local buffers = {} local ngx = ngx local udp = ngx.socket.udp +local ipairs = ipairs +local stale_timer_running = false; +local timer_at = ngx.timer.at local schema = { type = "object", @@ -78,6 +81,22 @@ local function send_udp_data(conf, log_message) end +local function remove_stale_objects(premature) + if premature then + return + end + + for key, batch in ipairs(buffers) do + if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then + core.log.debug("removing batch processor stale object, route id:", tostring(key)) + buffers[key] = nil + end + end + + stale_timer_running = false +end + + function _M.log(conf) local entry = log_util.get_full_log(ngx) @@ -88,6 +107,12 @@ function _M.log(conf) local log_buffer = buffers[entry.route_id] + if not stale_timer_running then + -- run the timer every 30 mins if any log is present + timer_at(1800, remove_stale_objects) + stale_timer_running = true + end + if log_buffer then log_buffer:push(entry) return