mirror of
https://gitee.com/iresty/apisix.git
synced 2024-12-04 04:57:35 +08:00
fix(batch-processor): we didn't free stale object actually (#5700)
This commit is contained in:
parent
319f6208ba
commit
4df272c548
@ -26,6 +26,7 @@ local format = string.format
|
|||||||
local concat = table.concat
|
local concat = table.concat
|
||||||
local buffers = {}
|
local buffers = {}
|
||||||
local ipairs = ipairs
|
local ipairs = ipairs
|
||||||
|
local pairs = pairs
|
||||||
local tostring = tostring
|
local tostring = tostring
|
||||||
local stale_timer_running = false
|
local stale_timer_running = false
|
||||||
local timer_at = ngx.timer.at
|
local timer_at = ngx.timer.at
|
||||||
@ -120,7 +121,7 @@ local function remove_stale_objects(premature)
|
|||||||
return
|
return
|
||||||
end
|
end
|
||||||
|
|
||||||
for key, batch in ipairs(buffers) do
|
for key, batch in pairs(buffers) do
|
||||||
if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
|
if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
|
||||||
core.log.warn("removing batch processor stale object, conf: ",
|
core.log.warn("removing batch processor stale object, conf: ",
|
||||||
core.json.delay_encode(key))
|
core.json.delay_encode(key))
|
||||||
|
@ -18,7 +18,7 @@
|
|||||||
local core = require("apisix.core")
|
local core = require("apisix.core")
|
||||||
local ngx = ngx
|
local ngx = ngx
|
||||||
local tostring = tostring
|
local tostring = tostring
|
||||||
local ipairs = ipairs
|
local pairs = pairs
|
||||||
local ngx_timer_at = ngx.timer.at
|
local ngx_timer_at = ngx.timer.at
|
||||||
local http = require("resty.http")
|
local http = require("resty.http")
|
||||||
local log_util = require("apisix.utils.log-util")
|
local log_util = require("apisix.utils.log-util")
|
||||||
@ -128,7 +128,7 @@ local function remove_stale_objects(premature)
|
|||||||
return
|
return
|
||||||
end
|
end
|
||||||
|
|
||||||
for key, batch in ipairs(buffers) do
|
for key, batch in pairs(buffers) do
|
||||||
if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
|
if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
|
||||||
core.log.warn("removing batch processor stale object, route id:", tostring(key))
|
core.log.warn("removing batch processor stale object, route id:", tostring(key))
|
||||||
buffers[key] = nil
|
buffers[key] = nil
|
||||||
|
@ -25,6 +25,7 @@ local plugin = require("apisix.plugin")
|
|||||||
local ngx = ngx
|
local ngx = ngx
|
||||||
local tostring = tostring
|
local tostring = tostring
|
||||||
local ipairs = ipairs
|
local ipairs = ipairs
|
||||||
|
local pairs = pairs
|
||||||
local timer_at = ngx.timer.at
|
local timer_at = ngx.timer.at
|
||||||
|
|
||||||
local plugin_name = "http-logger"
|
local plugin_name = "http-logger"
|
||||||
@ -166,7 +167,7 @@ local function remove_stale_objects(premature)
|
|||||||
return
|
return
|
||||||
end
|
end
|
||||||
|
|
||||||
for key, batch in ipairs(buffers) do
|
for key, batch in pairs(buffers) do
|
||||||
if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
|
if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
|
||||||
core.log.warn("removing batch processor stale object, conf: ",
|
core.log.warn("removing batch processor stale object, conf: ",
|
||||||
core.json.delay_encode(key))
|
core.json.delay_encode(key))
|
||||||
|
@ -23,7 +23,6 @@ local plugin = require("apisix.plugin")
|
|||||||
local math = math
|
local math = math
|
||||||
local pairs = pairs
|
local pairs = pairs
|
||||||
local type = type
|
local type = type
|
||||||
local ipairs = ipairs
|
|
||||||
local plugin_name = "kafka-logger"
|
local plugin_name = "kafka-logger"
|
||||||
local stale_timer_running = false
|
local stale_timer_running = false
|
||||||
local timer_at = ngx.timer.at
|
local timer_at = ngx.timer.at
|
||||||
@ -164,7 +163,7 @@ local function remove_stale_objects(premature)
|
|||||||
return
|
return
|
||||||
end
|
end
|
||||||
|
|
||||||
for key, batch in ipairs(buffers) do
|
for key, batch in pairs(buffers) do
|
||||||
if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
|
if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
|
||||||
core.log.warn("removing batch processor stale object, conf: ",
|
core.log.warn("removing batch processor stale object, conf: ",
|
||||||
core.json.delay_encode(key))
|
core.json.delay_encode(key))
|
||||||
|
@ -28,7 +28,7 @@ local ngx_re = require("ngx.re")
|
|||||||
local ngx = ngx
|
local ngx = ngx
|
||||||
local tostring = tostring
|
local tostring = tostring
|
||||||
local tonumber = tonumber
|
local tonumber = tonumber
|
||||||
local ipairs = ipairs
|
local pairs = pairs
|
||||||
local timer_at = ngx.timer.at
|
local timer_at = ngx.timer.at
|
||||||
|
|
||||||
local plugin_name = "skywalking-logger"
|
local plugin_name = "skywalking-logger"
|
||||||
@ -130,7 +130,7 @@ local function remove_stale_objects(premature)
|
|||||||
return
|
return
|
||||||
end
|
end
|
||||||
|
|
||||||
for key, batch in ipairs(buffers) do
|
for key, batch in pairs(buffers) do
|
||||||
if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
|
if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
|
||||||
core.log.warn("removing batch processor stale object, conf: ",
|
core.log.warn("removing batch processor stale object, conf: ",
|
||||||
core.json.delay_encode(key))
|
core.json.delay_encode(key))
|
||||||
|
@ -26,6 +26,7 @@ local tcp = ngx.socket.tcp
|
|||||||
local buffers = {}
|
local buffers = {}
|
||||||
local tostring = tostring
|
local tostring = tostring
|
||||||
local ipairs = ipairs
|
local ipairs = ipairs
|
||||||
|
local pairs = pairs
|
||||||
local table = table
|
local table = table
|
||||||
local schema = {
|
local schema = {
|
||||||
type = "object",
|
type = "object",
|
||||||
@ -116,7 +117,7 @@ local function remove_stale_objects(premature)
|
|||||||
return
|
return
|
||||||
end
|
end
|
||||||
|
|
||||||
for key, batch in ipairs(buffers) do
|
for key, batch in pairs(buffers) do
|
||||||
if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
|
if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
|
||||||
core.log.warn("removing batch processor stale object, route id:", tostring(key))
|
core.log.warn("removing batch processor stale object, route id:", tostring(key))
|
||||||
buffers[key] = nil
|
buffers[key] = nil
|
||||||
|
@ -22,7 +22,7 @@ local logger_socket = require("resty.logger.socket")
|
|||||||
local plugin_name = "syslog"
|
local plugin_name = "syslog"
|
||||||
local ngx = ngx
|
local ngx = ngx
|
||||||
local buffers = {}
|
local buffers = {}
|
||||||
local ipairs = ipairs
|
local pairs = pairs
|
||||||
local stale_timer_running = false;
|
local stale_timer_running = false;
|
||||||
local timer_at = ngx.timer.at
|
local timer_at = ngx.timer.at
|
||||||
|
|
||||||
@ -121,7 +121,7 @@ local function remove_stale_objects(premature)
|
|||||||
return
|
return
|
||||||
end
|
end
|
||||||
|
|
||||||
for key, batch in ipairs(buffers) do
|
for key, batch in pairs(buffers) do
|
||||||
if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
|
if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
|
||||||
core.log.warn("removing batch processor stale object, conf: ",
|
core.log.warn("removing batch processor stale object, conf: ",
|
||||||
core.json.delay_encode(key))
|
core.json.delay_encode(key))
|
||||||
|
@ -22,7 +22,7 @@ local tostring = tostring
|
|||||||
local buffers = {}
|
local buffers = {}
|
||||||
local ngx = ngx
|
local ngx = ngx
|
||||||
local tcp = ngx.socket.tcp
|
local tcp = ngx.socket.tcp
|
||||||
local ipairs = ipairs
|
local pairs = pairs
|
||||||
local stale_timer_running = false
|
local stale_timer_running = false
|
||||||
local timer_at = ngx.timer.at
|
local timer_at = ngx.timer.at
|
||||||
|
|
||||||
@ -106,7 +106,7 @@ local function remove_stale_objects(premature)
|
|||||||
return
|
return
|
||||||
end
|
end
|
||||||
|
|
||||||
for key, batch in ipairs(buffers) do
|
for key, batch in pairs(buffers) do
|
||||||
if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
|
if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
|
||||||
core.log.warn("removing batch processor stale object, conf: ",
|
core.log.warn("removing batch processor stale object, conf: ",
|
||||||
core.json.delay_encode(key))
|
core.json.delay_encode(key))
|
||||||
|
@ -22,7 +22,7 @@ local tostring = tostring
|
|||||||
local buffers = {}
|
local buffers = {}
|
||||||
local ngx = ngx
|
local ngx = ngx
|
||||||
local udp = ngx.socket.udp
|
local udp = ngx.socket.udp
|
||||||
local ipairs = ipairs
|
local pairs = pairs
|
||||||
local stale_timer_running = false;
|
local stale_timer_running = false;
|
||||||
local timer_at = ngx.timer.at
|
local timer_at = ngx.timer.at
|
||||||
|
|
||||||
@ -90,7 +90,7 @@ local function remove_stale_objects(premature)
|
|||||||
return
|
return
|
||||||
end
|
end
|
||||||
|
|
||||||
for key, batch in ipairs(buffers) do
|
for key, batch in pairs(buffers) do
|
||||||
if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
|
if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
|
||||||
core.log.warn("removing batch processor stale object, conf: ",
|
core.log.warn("removing batch processor stale object, conf: ",
|
||||||
core.json.delay_encode(key))
|
core.json.delay_encode(key))
|
||||||
|
Loading…
Reference in New Issue
Block a user