feature: Batch processor implementation to aggregate logs in batch (#1121)

Nirojan Selvanathan 2020-02-20 08:25:20 +01:00 committed by GitHub
parent fbb9bd039c
commit d50727bcf0
3 changed files with 661 additions and 0 deletions

72 doc/batch-processor.md Normal file

@@ -0,0 +1,72 @@
<!--
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
-->

# Batch Processor

The batch processor can be used to aggregate entries (logs or any other data) and process them in batches.
When `batch_max_size` is set to 1, the processor executes each entry immediately. Setting the batch max size to a value
greater than 1 makes the processor aggregate entries until the batch reaches the max size or the timeout expires.

## Configurations

The only mandatory parameter for creating a batch processor is a function. The function is executed when the batch reaches the max size
or when the buffer duration is exceeded.

|Name            |Requirement |Description|
|----------------|------------|-----------|
|name            |optional    |A unique identifier to identify the batch processor, default is `log buffer`|
|batch_max_size  |optional    |Maximum size of each batch, default is 1000|
|inactive_timeout|optional    |Maximum age in seconds after which the buffer is flushed when no new entries arrive, default is 5|
|buffer_duration |optional    |Maximum age in seconds of the oldest entry in a batch before the batch must be processed, default is 60|
|max_retry_count |optional    |Maximum number of retries before the batch is removed from the processing pipeline, default is 0|
|retry_delay     |optional    |Number of seconds to delay processing when execution fails, default is 1|
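
All of these fields are optional; only the processing function is mandatory. As a minimal sketch (illustrative only, not part of the full example below), a caller could pass just the function and a name and rely on the schema defaults listed above:

```lua
local core = require("apisix.core")
local bp = require("apisix.plugins.batch-processor")

-- illustrative only: every omitted field falls back to the schema defaults
-- (batch_max_size 1000, inactive_timeout 5, buffer_duration 60,
-- max_retry_count 0, retry_delay 1)
local processor, err = bp:new(function(entries)
    -- handle the aggregated entries here
    return true
end, {name = "defaults-example"})

if not processor then
    core.log.error("failed to create batch processor: ", err)
end
```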

The following code shows an example of how to use a batch processor. The batch processor takes the function to be executed as its first
argument and the batch configuration as its second argument.

```lua
local bp = require("apisix.plugins.batch-processor")

local func_to_execute = function(entries)
    -- serialize the entries to a JSON array: core.json.encode(entries)
    -- process/send the data
    return true
end

local config = {
    max_retry_count = 2,
    buffer_duration = 60,
    inactive_timeout = 5,
    batch_max_size = 1,
    retry_delay = 0
}

local batch_processor, err = bp:new(func_to_execute, config)
if batch_processor then
    batch_processor:push({hello='world'})
end
```

Note: Make sure the batch max size (entry count) is within the limits of what the function can process in a single execution.
The timer that flushes the batch runs based on the `inactive_timeout` configuration, so for optimal usage
keep `inactive_timeout` smaller than `buffer_duration`.
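
As a further illustration (a hedged sketch, not part of this commit), a logging plugin would typically keep one batch processor per route and push one log entry per request. The `buffers` table, `conf.route_id`, and `push_entry` below are hypothetical names used only for this sketch:

```lua
local core = require("apisix.core")
local batch_processor = require("apisix.plugins.batch-processor")

-- hypothetical per-route cache of processors, keyed by a route identifier
local buffers = {}

local function push_entry(conf, entry)
    local processor = buffers[conf.route_id]
    if not processor then
        local err
        processor, err = batch_processor:new(function(entries)
            -- e.g. core.json.encode(entries) and ship the batch to a collector
            return true
        end, {
            name = "example-logger",
            batch_max_size = conf.batch_max_size,
            inactive_timeout = conf.inactive_timeout,
            buffer_duration = conf.buffer_duration,
            max_retry_count = conf.max_retry_count,
            retry_delay = conf.retry_delay,
        })
        if not processor then
            core.log.error("failed to create batch processor: ", err)
            return
        end
        buffers[conf.route_id] = processor
    end

    processor:push(entry)
end
```

Caching the processor per route is what lets entries from many requests accumulate into a single batch; creating a new processor on every request would defeat the buffering.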


@@ -0,0 +1,180 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local core = require("apisix.core")
local setmetatable = setmetatable
local timer_at = ngx.timer.at
local fmt = string.format
local ipairs = ipairs
local table = table
local now = ngx.now
local type = type

local Batch_Processor = {}
local Batch_Processor_mt = {
    __index = Batch_Processor
}
local execute_func
local create_buffer_timer

local schema = {
    type = "object",
    properties = {
        name = {type = "string", default = "log buffer"},
        max_retry_count = {type = "integer", minimum = 0, default = 0},
        retry_delay = {type = "integer", minimum = 0, default = 1},
        buffer_duration = {type = "integer", minimum = 1, default = 60},
        inactive_timeout = {type = "integer", minimum = 1, default = 5},
        batch_max_size = {type = "integer", minimum = 1, default = 1000},
    }
}

local function schedule_func_exec(batch_processor, delay, batch)
    local hdl, err = timer_at(delay, execute_func, batch_processor, batch)
    if not hdl then
        core.log.error("failed to create process timer: ", err)
        return
    end
end

function execute_func(premature, batch_processor, batch)
    if premature then
        return
    end

    local ok, err = batch_processor.func(batch.entries)
    if not ok then
        batch.retry_count = batch.retry_count + 1
        if batch.retry_count < batch_processor.max_retry_count then
            core.log.warn(fmt("Batch Processor[%s] failed to process entries: ",
                              batch_processor.name), err)
            schedule_func_exec(batch_processor, batch_processor.retry_delay, batch)
        else
            core.log.error(fmt(("Batch Processor[%s] exceeded the max_retry_count[%d] "
                                .. "dropping the entries"),
                               batch_processor.name, batch.retry_count))
        end
        return
    end

    core.log.debug(fmt("Batch Processor[%s] successfully processed the entries",
                       batch_processor.name))
end

local function flush_buffer(premature, batch_processor)
    if premature then
        return
    end

    if now() - batch_processor.last_entry_t >= batch_processor.inactive_timeout or
            now() - batch_processor.first_entry_t >= batch_processor.buffer_duration then
        core.log.debug(fmt("BatchProcessor[%s] buffer duration exceeded, activating buffer flush",
                           batch_processor.name))
        batch_processor:process_buffer()
        batch_processor.is_timer_running = false
        return
    end

    -- the buffer duration has not been exceeded and the buffer is still active,
    -- so extend the timer
    core.log.debug(fmt("BatchProcessor[%s] extending buffer timer", batch_processor.name))
    create_buffer_timer(batch_processor)
end

function create_buffer_timer(batch_processor)
    local hdl, err = timer_at(batch_processor.inactive_timeout, flush_buffer, batch_processor)
    if not hdl then
        core.log.error("failed to create buffer timer: ", err)
        return
    end
    batch_processor.is_timer_running = true
end

function Batch_Processor:new(func, config)
    local ok, err = core.schema.check(schema, config)
    if not ok then
        return nil, err
    end

    if type(func) ~= "function" then
        return nil, "Invalid argument, arg #1 must be a function"
    end

    local batch_processor = {
        func = func,
        buffer_duration = config.buffer_duration,
        inactive_timeout = config.inactive_timeout,
        max_retry_count = config.max_retry_count,
        batch_max_size = config.batch_max_size,
        retry_delay = config.retry_delay,
        name = config.name,
        batch_to_process = {},
        entry_buffer = {entries = {}, retry_count = 0},
        is_timer_running = false,
        first_entry_t = 0,
        last_entry_t = 0
    }

    return setmetatable(batch_processor, Batch_Processor_mt)
end

function Batch_Processor:push(entry)
    -- if the batch max size is one, send the entry for processing immediately
    if self.batch_max_size == 1 then
        local batch = {entries = {entry}, retry_count = 0}
        schedule_func_exec(self, 0, batch)
        return
    end

    local entries = self.entry_buffer.entries
    table.insert(entries, entry)

    if #entries == 1 then
        self.first_entry_t = now()
    end
    self.last_entry_t = now()

    if self.batch_max_size <= #entries then
        core.log.debug(fmt("batch processor[%s] batch max size has exceeded", self.name))
        self:process_buffer()
    end

    if not self.is_timer_running then
        create_buffer_timer(self)
    end
end

function Batch_Processor:process_buffer()
    -- if entries are present in the buffer, move them to the processing pipeline
    if #self.entry_buffer.entries > 0 then
        core.log.debug(fmt("transferring buffer entries to the processing pipeline, buffercount[%d]",
                           #self.entry_buffer.entries))
        self.batch_to_process[#self.batch_to_process + 1] = self.entry_buffer
        self.entry_buffer = {entries = {}, retry_count = 0}
    end

    for _, batch in ipairs(self.batch_to_process) do
        schedule_func_exec(self, 0, batch)
    end

    self.batch_to_process = {}
end


return Batch_Processor

409 t/plugin/batch-processor.t Normal file

@@ -0,0 +1,409 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
use t::APISIX 'no_plan';

log_level('debug');
repeat_each(1);
no_long_string();
no_root_location();

run_tests;

__DATA__

=== TEST 1: send invalid arguments for constructor
--- config
    location /t {
        content_by_lua_block {
            local Batch = require("apisix.plugins.batch-processor")
            local config = {
                max_retry_count = 2,
                batch_max_size = 1,
                process_delay = 0,
                retry_delay = 0,
            }

            local func_to_send = function(elements)
                return true
            end

            local log_buffer, err = Batch:new("", config)

            if log_buffer then
                log_buffer:push({hello='world'})
                ngx.say("done")
            end

            if not log_buffer then
                ngx.say("failed")
            end
        }
    }
--- request
GET /t
--- response_body
failed
--- wait: 0.5


=== TEST 2: sanity
--- config
    location /t {
        content_by_lua_block {
            local Batch = require("apisix.plugins.batch-processor")
            local func_to_send = function(elements)
                return true
            end

            local config = {
                max_retry_count = 2,
                batch_max_size = 1,
                process_delay = 0,
                retry_delay = 0,
            }

            local log_buffer, err = Batch:new(func_to_send, config)

            if not log_buffer then
                ngx.say(err)
            end

            log_buffer:push({hello='world'})
            ngx.say("done")
        }
    }
--- request
GET /t
--- response_body
done
--- error_log
Batch Processor[log buffer] successfully processed the entries
--- wait: 0.5


=== TEST 3: batch processor timeout exceeded
--- config
    location /t {
        content_by_lua_block {
            local Batch = require("apisix.plugins.batch-processor")
            local config = {
                max_retry_count = 2,
                batch_max_size = 2,
                process_delay = 0,
                retry_delay = 0,
                inactive_timeout = 1
            }

            local func_to_send = function(elements)
                return true
            end

            local log_buffer, err = Batch:new(func_to_send, config)

            if not log_buffer then
                ngx.say(err)
            end

            log_buffer:push({hello='world'})
            ngx.say("done")
        }
    }
--- request
GET /t
--- response_body
done
--- error_log
BatchProcessor[log buffer] buffer duration exceeded, activating buffer flush
Batch Processor[log buffer] successfully processed the entries
--- wait: 3


=== TEST 4: batch processor batch max size exceeded
--- config
    location /t {
        content_by_lua_block {
            local Batch = require("apisix.plugins.batch-processor")
            local config = {
                max_retry_count = 2,
                batch_max_size = 2,
                process_delay = 0,
                retry_delay = 0,
            }

            local func_to_send = function(elements)
                return true
            end

            local log_buffer, err = Batch:new(func_to_send, config)

            if not log_buffer then
                ngx.say(err)
            end

            log_buffer:push({hello='world'})
            log_buffer:push({hello='world'})
            ngx.say("done")
        }
    }
--- request
GET /t
--- response_body
done
--- no_error_log
BatchProcessor[log buffer] activating flush due to no activity
--- error_log
batch processor[log buffer] batch max size has exceeded
Batch Processor[log buffer] successfully processed the entries
--- wait: 0.5


=== TEST 5: first failed to process and second try success
--- config
    location /t {
        content_by_lua_block {
            local Batch = require("apisix.plugins.batch-processor")
            local core = require("apisix.core")
            local retry = false
            local config = {
                max_retry_count = 2,
                batch_max_size = 2,
                process_delay = 0,
                retry_delay = 0,
            }

            local func_to_send = function(elements)
                if not retry then
                    retry = true
                    return false
                end
                return true
            end

            local log_buffer, err = Batch:new(func_to_send, config)

            if not log_buffer then
                ngx.say(err)
            end

            log_buffer:push({hello='world'})
            log_buffer:push({hello='world'})
            ngx.say("done")
        }
    }
--- request
GET /t
--- response_body
done
--- error_log
Batch Processor[log buffer] failed to process entries
Batch Processor[log buffer] successfully processed the entries
--- wait: 0.5


=== TEST 6: Exceeding max retry count
--- config
    location /t {
        content_by_lua_block {
            local Batch = require("apisix.plugins.batch-processor")
            local config = {
                max_retry_count = 2,
                batch_max_size = 2,
                process_delay = 0,
                retry_delay = 0,
            }

            local func_to_send = function(elements)
                return false
            end

            local log_buffer, err = Batch:new(func_to_send, config)

            if not log_buffer then
                ngx.say(err)
            end

            log_buffer:push({hello='world'})
            log_buffer:push({hello='world'})
            ngx.say("done")
        }
    }
--- request
GET /t
--- response_body
done
--- no_error_log
BatchProcessor[log buffer] buffer duration exceeded, activating buffer flush
--- error_log
Batch Processor[log buffer] failed to process entries
Batch Processor[log buffer] exceeded the max_retry_count
--- wait: 0.5


=== TEST 7: two batches
--- config
    location /t {
        content_by_lua_block {
            local Batch = require("apisix.plugins.batch-processor")
            local core = require("apisix.core")
            local count = 0
            local config = {
                max_retry_count = 2,
                batch_max_size = 2,
                process_delay = 0,
                retry_delay = 0,
            }

            local func_to_send = function(elements)
                count = count + 1
                core.log.info("batch[", count , "] sent")
                return true
            end

            local log_buffer, err = Batch:new(func_to_send, config)

            if not log_buffer then
                ngx.say(err)
            end

            log_buffer:push({hello='world'})
            log_buffer:push({hello='world'})
            log_buffer:push({hello='world'})
            log_buffer:push({hello='world'})
            ngx.say("done")
        }
    }
--- request
GET /t
--- response_body
done
--- no_error_log
BatchProcessor[log buffer] activating flush due to no activity
--- error_log
batch[1] sent
batch[2] sent
--- wait: 0.5


=== TEST 8: batch processor retry count 0 and fail processing
--- config
    location /t {
        content_by_lua_block {
            local Batch = require("apisix.plugins.batch-processor")
            local config = {
                max_retry_count = 0,
                batch_max_size = 2,
                process_delay = 0,
                retry_delay = 0,
            }

            local func_to_send = function(elements)
                return false
            end

            local log_buffer, err = Batch:new(func_to_send, config)

            if not log_buffer then
                ngx.say(err)
            end

            log_buffer:push({hello='world'})
            log_buffer:push({hello='world'})
            ngx.say("done")
        }
    }
--- request
GET /t
--- response_body
done
--- no_error_log
BatchProcessor[log buffer] activating flush due to no activity
Batch Processor[log buffer] failed to process entries
--- error_log
Batch Processor[log buffer] exceeded the max_retry_count
--- wait: 0.5


=== TEST 9: batch processor timeout exceeded
--- config
    location /t {
        content_by_lua_block {
            local Batch = require("apisix.plugins.batch-processor")
            local config = {
                max_retry_count = 2,
                batch_max_size = 2,
                process_delay = 0,
                retry_delay = 0,
                buffer_duration = 60,
                inactive_timeout = 1,
            }

            local func_to_send = function(elements)
                return true
            end

            local log_buffer, err = Batch:new(func_to_send, config)

            if not log_buffer then
                ngx.say(err)
            end

            log_buffer:push({hello='world'})
            ngx.say("done")
        }
    }
--- request
GET /t
--- response_body
done
--- error_log
BatchProcessor[log buffer] buffer duration exceeded, activating buffer flush
Batch Processor[log buffer] successfully processed the entries
--- wait: 3


=== TEST 10: json encode and log elements
--- config
    location /t {
        content_by_lua_block {
            local Batch = require("apisix.plugins.batch-processor")
            local core = require("apisix.core")
            local config = {
                max_retry_count = 2,
                batch_max_size = 2,
                process_delay = 0,
                retry_delay = 0,
            }

            local func_to_send = function(elements)
                core.log.info(core.json.encode(elements))
                return true
            end

            local log_buffer, err = Batch:new(func_to_send, config)

            if not log_buffer then
                ngx.say(err)
            end

            log_buffer:push({msg='1'})
            log_buffer:push({msg='2'})
            log_buffer:push({msg='3'})
            log_buffer:push({msg='4'})
            ngx.say("done")
        }
    }
--- request
GET /t
--- response_body
done
--- no_error_log
BatchProcessor[log buffer] activating flush due to no activity
--- error_log
[{"msg":"1"},{"msg":"2"}]
[{"msg":"3"},{"msg":"4"}]
--- wait: 0.5