feat(loggly): support http/s bulk sending for batch logs (#6212)

This commit is contained in:
Bisakh 2022-01-28 07:23:47 +05:30 committed by GitHub
parent 427e9269ef
commit 46834b2b69
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 167 additions and 30 deletions

View File

@ -18,6 +18,8 @@ local core = require("apisix.core")
local plugin = require("apisix.plugin")
local bp_manager_mod = require("apisix.utils.batch-processor-manager")
local log_util = require("apisix.utils.log-util")
local path = require("pl.path")
local http = require("resty.http")
local ngx = ngx
local tostring = tostring
local pairs = pairs
@ -76,6 +78,12 @@ local schema = {
-- we prevent of having `tag=` prefix
pattern = "^(?!tag=)[ -~]*",
},
default = {"apisix"}
},
ssl_verify = {
-- applicable for https protocol
type = "boolean",
default = true
},
},
required = {"customer_token"}
@ -104,8 +112,8 @@ local metadata_schema = {
protocol = {
type = "string",
default = defaults.protocol,
-- more methods coming soon
enum = {"syslog"}
-- in case of http and https, we use bulk endpoints
enum = {"syslog", "http", "https"}
},
timeout = {
type = "integer",
@ -158,13 +166,17 @@ local function generate_log_message(conf, ctx)
entry = log_util.get_full_log(ngx, conf)
end
-- generate rfc5424 compliant syslog event
local json_str, err = core.json.encode(entry)
if not json_str then
core.log.error('error occurred while encoding the data: ', err)
return nil
end
if metadata.value.protocol ~= "syslog" then
return json_str
end
-- generate rfc5424 compliant syslog event
local timestamp = log_util.get_rfc3339_zulu_timestamp()
local taglist = {}
if conf.tags then
@ -188,24 +200,13 @@ local function generate_log_message(conf, ctx)
end
local function send_data_over_udp(message)
local metadata = plugin.plugin_metadata(plugin_name)
core.log.info("metadata: ", core.json.delay_encode(metadata))
if not metadata then
core.log.info("received nil metadata: using metadata defaults: ",
core.json.delay_encode(defaults, true))
metadata = {}
metadata.value = defaults
end
local function send_data_over_udp(message, metadata)
local err_msg
local res = true
local sock = udp()
local host, port = metadata.value.host, metadata.value.port
sock:settimeout(metadata.value.timeout)
core.log.info("sending a batch logs to ", host, ":", port)
local ok, err = sock:setpeername(host, port)
if not ok then
@ -232,13 +233,72 @@ local function send_data_over_udp(message)
end
local function handle_log(entries)
for i = 1, #entries do
local ok, err = send_data_over_udp(entries[i])
if not ok then
return false, err
local function send_bulk_over_http(message, metadata, conf)
local endpoint = path.join(metadata.value.host, "bulk", conf.customer_token, "tag", "bulk")
local has_prefix = core.string.has_prefix(metadata.value.host, "http")
if not has_prefix then
if metadata.value.protocol == "http" then
endpoint = "http://" .. endpoint
else
endpoint = "https://" .. endpoint
end
end
local httpc = http.new()
httpc:set_timeout(metadata.value.timeout)
local res, err = httpc:request_uri(endpoint, {
ssl_verify = conf.ssl_verify,
method = "POST",
body = message,
headers = {
["Content-Type"] = "application/json",
["X-LOGGLY-TAG"] = conf.tags
},
})
if not res then
return false, "failed to write log to loggly, " .. err
end
if res.status ~= 200 then
local body = core.json.decode(res.body)
if not body then
return false, "failed to send log to loggly, http status code: " .. res.status
else
return false, "failed to send log to loggly, http status code: " .. res.status
.. " response body: ".. res.body
end
end
return true
end
local handle_http_payload
local function handle_log(entries)
local metadata = plugin.plugin_metadata(plugin_name)
core.log.info("metadata: ", core.json.delay_encode(metadata))
if not metadata then
core.log.info("received nil metadata: using metadata defaults: ",
core.json.delay_encode(defaults, true))
metadata = {}
metadata.value = defaults
end
core.log.info("sending a batch logs to ", metadata.value.host)
if metadata.value.protocol == "syslog" then
for i = 1, #entries do
local ok, err = send_data_over_udp(entries[i], metadata)
if not ok then
return false, err
end
end
else
return handle_http_payload(entries, metadata)
end
return true
end
@ -249,6 +309,12 @@ function _M.log(conf, ctx)
return
end
handle_http_payload = function (entries, metadata)
-- loggly bulk endpoint expects entries concatenated in newline("\n")
local message = tab_concat(entries, "\n")
return send_bulk_over_http(message, metadata, conf)
end
if batch_processor_manager:add_entry(conf, log_data) then
return
end

View File

@ -64,9 +64,9 @@ To generate a Customer Token, head over to `<your assigned subdomain>/loggly.com
| Name | Type | Requirement | Default | Valid | Description |
| ----------- | ------ | ----------- | ------- | ----- | ---------------------------------------------------------------------- |
| host | string | optional | "logs-01.loggly.com" | | The host address endpoint where logs are being sent. |
| port | integer | optional | 514 | | Loggly host port to make a connection request. |
| port | integer | optional | 514 | | Loggly port (for "syslog" protocol only) to make a connection request. |
| timeout | integer | optional | 5000 | | Loggly send data request timeout in milliseconds. |
| protocol | string | optional | "syslog" | | Protocol through which the logs are sent to Loggly from APISIX (currently supported protocol : "syslog") |
| protocol | string | optional | "syslog" | [ "syslog" , "http", "https" ] | Protocol through which the logs are sent to Loggly from APISIX (currently supported protocol : "syslog", "http", "https") |
| log_format | object | optional | nil | | Log format declared as key value pair in JSON format. Only string is supported in the `value` part. If the value starts with `$`, it means to get [`APISIX` variables](../apisix-variable.md) or [Nginx variable](http://nginx.org/en/docs/varindex.html). If it is nil or empty object, APISIX generates full log info. |
## How To Enable
@ -100,6 +100,15 @@ curl http://127.0.0.1:9080/apisix/admin/routes/1 -H 'X-API-KEY: edd1c9f034335f13
}'
```
We support [Syslog](https://documentation.solarwinds.com/en/success_center/loggly/content/admin/streaming-syslog-without-using-files.htm), [HTTP/S](https://documentation.solarwinds.com/en/success_center/loggly/content/admin/http-bulk-endpoint.htm) (bulk endpoint) protocols to send log events to Loggly. By default, in APISIX side, the protocol is set to "syslog". It lets you send RFC5424 compliant syslog events with some fine-grained control (log severity mapping based on upstream HTTP response code). But HTTP/S bulk endpoint is great to send larger batches of log events with faster transmission speed. If you wish to update it, just update the metadata
```shell
curl http://127.0.0.1:9080/apisix/admin/plugin_metadata/loggly -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
{
"protocol": "http"
}'
```
### Minimal configuration
```shell

78
t/plugin/loggly.t vendored
View File

@ -39,6 +39,26 @@ _EOC_
$block->set_value("extra_stream_config", $stream_config);
}
my $http_config = $block->http_config // <<_EOC_;
server {
listen 10420;
location /loggly/bulk/tok/tag/bulk {
content_by_lua_block {
ngx.req.read_body()
local data = ngx.req.get_body_data()
local headers = ngx.req.get_headers()
ngx.log(ngx.ERR, "loggly body: ", data)
ngx.log(ngx.ERR, "loggly tags: " .. require("toolkit.json").encode(headers["X-LOGGLY-TAG"]))
ngx.say("ok")
}
}
}
_EOC_
$block->set_value("http_config", $http_config);
if ((!defined $block->error_log) && (!defined $block->no_error_log)) {
$block->set_value("no_error_log", "[error]");
}
@ -214,8 +234,8 @@ opentracing
--- grep_error_log eval
qr/message received: .+?(?= \{)/
--- grep_error_log_out eval
qr/message received: <14>1 [\d\-T:.]+Z [\d.]+ apisix [\d]+ - \[test-token\@41058 ]
message received: <14>1 [\d\-T:.]+Z [\d.]+ apisix [\d]+ - \[test-token\@41058 ]/
qr/message received: <14>1 [\d\-T:.]+Z [\d.]+ apisix [\d]+ - \[test-token\@41058 tag="apisix"]
message received: <14>1 [\d\-T:.]+Z [\d.]+ apisix [\d]+ - \[test-token\@41058 tag="apisix"]/
@ -318,7 +338,7 @@ opentracing
--- grep_error_log eval
qr/message received: .+?(?= \{)/
--- grep_error_log_out eval
qr/message received: <10>1 [\d\-T:.]+Z [\d.]+ apisix [\d]+ - \[token-1\@41058 ]/
qr/message received: <10>1 [\d\-T:.]+Z [\d.]+ apisix [\d]+ - \[token-1\@41058 tag="apisix"]/
@ -341,7 +361,7 @@ opentracing
--- grep_error_log eval
qr/message received: [ -~]+/
--- grep_error_log_out eval
qr/message received: <10>1 [\d\-T:.]+Z [\d.]+ apisix [\d]+ - \[token-1\@41058 ] \{"apisix_latency":[\d.]*,"client_ip":"127\.0\.0\.1","latency":[\d.]*,"request":\{"headers":\{"content-type":"application\/x-www-form-urlencoded","host":"127\.0\.0\.1:1984","user-agent":"lua-resty-http\/[\d.]* \(Lua\) ngx_lua\/[\d]*"\},"method":"GET","querystring":\{\},"size":[\d]+,"uri":"\/opentracing","url":"http:\/\/127\.0\.0\.1:1984\/opentracing"\},"response":\{"headers":\{"connection":"close","content-type":"text\/plain","server":"APISIX\/[\d.]+","transfer-encoding":"chunked"\},"size":[\d]*,"status":200\},"route_id":"1","server":\{"hostname":"[ -~]*","version":"[\d.]+"\},"service_id":"","start_time":[\d]*,"upstream":"127\.0\.0\.1:1982","upstream_latency":[\d]*\}/
qr/message received: <10>1 [\d\-T:.]+Z [\d.]+ apisix [\d]+ - \[token-1\@41058 tag="apisix"] \{"apisix_latency":[\d.]*,"client_ip":"127\.0\.0\.1","latency":[\d.]*,"request":\{"headers":\{"content-type":"application\/x-www-form-urlencoded","host":"127\.0\.0\.1:1984","user-agent":"lua-resty-http\/[\d.]* \(Lua\) ngx_lua\/[\d]*"\},"method":"GET","querystring":\{\},"size":[\d]+,"uri":"\/opentracing","url":"http:\/\/127\.0\.0\.1:1984\/opentracing"\},"response":\{"headers":\{"connection":"close","content-type":"text\/plain","server":"APISIX\/[\d.]+","transfer-encoding":"chunked"\},"size":[\d]*,"status":200\},"route_id":"1","server":\{"hostname":"[ -~]*","version":"[\d.]+"\},"service_id":"","start_time":[\d]*,"upstream":"127\.0\.0\.1:1982","upstream_latency":[\d]*\}/
@ -389,7 +409,7 @@ opentracing
--- grep_error_log eval
qr/message received: [ -~]+/
--- grep_error_log_out eval
qr/message received: <14>1 [\d\-T:.]+Z [\d.]+ apisix [\d]+ - \[tok\@41058 ] \{"apisix_latency":[\d.]*,"client_ip":"127\.0\.0\.1","latency":[\d.]*,"request":\{"headers":\{"content-type":"application\/x-www-form-urlencoded","host":"127\.0\.0\.1:1984","user-agent":"lua-resty-http\/[\d.]* \(Lua\) ngx_lua\/[\d]*"\},"method":"GET","querystring":\{\},"size":[\d]+,"uri":"\/opentracing","url":"http:\/\/127\.0\.0\.1:1984\/opentracing"\},"response":\{"body":"opentracing\\n","headers":\{"connection":"close","content-type":"text\/plain","server":"APISIX\/[\d.]+","transfer-encoding":"chunked"\},"size":[\d]*,"status":200\},"route_id":"1","server":\{"hostname":"[ -~]*","version":"[\d.]+"\},"service_id":"","start_time":[\d]*,"upstream":"127\.0\.0\.1:1982","upstream_latency":[\d]*\}/
qr/message received: <14>1 [\d\-T:.]+Z [\d.]+ apisix [\d]+ - \[tok\@41058 tag="apisix"] \{"apisix_latency":[\d.]*,"client_ip":"127\.0\.0\.1","latency":[\d.]*,"request":\{"headers":\{"content-type":"application\/x-www-form-urlencoded","host":"127\.0\.0\.1:1984","user-agent":"lua-resty-http\/[\d.]* \(Lua\) ngx_lua\/[\d]*"\},"method":"GET","querystring":\{\},"size":[\d]+,"uri":"\/opentracing","url":"http:\/\/127\.0\.0\.1:1984\/opentracing"\},"response":\{"body":"opentracing\\n","headers":\{"connection":"close","content-type":"text\/plain","server":"APISIX\/[\d.]+","transfer-encoding":"chunked"\},"size":[\d]*,"status":200\},"route_id":"1","server":\{"hostname":"[ -~]*","version":"[\d.]+"\},"service_id":"","start_time":[\d]*,"upstream":"127\.0\.0\.1:1982","upstream_latency":[\d]*\}/
@ -442,7 +462,7 @@ opentracing
--- grep_error_log eval
qr/message received: [ -~]+/
--- grep_error_log_out eval
qr/message received: <14>1 [\d\-T:.]+Z [\d.]+ apisix [\d]+ - \[tok\@41058 ] \{"apisix_latency":[\d.]*,"client_ip":"127\.0\.0\.1","latency":[\d.]*,"request":\{"headers":\{"content-type":"application\/x-www-form-urlencoded","host":"127\.0\.0\.1:1984","user-agent":"lua-resty-http\/[\d.]* \(Lua\) ngx_lua\/[\d]*"\},"method":"GET","querystring":\{"bar":"bar"\},"size":[\d]+,"uri":"\/opentracing\?bar=bar","url":"http:\/\/127\.0\.0\.1:1984\/opentracing\?bar=bar"\},"response":\{"body":"opentracing\\n","headers":\{"connection":"close","content-type":"text\/plain","server":"APISIX\/[\d.]+","transfer-encoding":"chunked"\},"size":[\d]*,"status":200\},"route_id":"1","server":\{"hostname":"[ -~]*","version":"[\d.]+"\},"service_id":"","start_time":[\d]*,"upstream":"127\.0\.0\.1:1982","upstream_latency":[\d]*\}/
qr/message received: <14>1 [\d\-T:.]+Z [\d.]+ apisix [\d]+ - \[tok\@41058 tag="apisix"] \{"apisix_latency":[\d.]*,"client_ip":"127\.0\.0\.1","latency":[\d.]*,"request":\{"headers":\{"content-type":"application\/x-www-form-urlencoded","host":"127\.0\.0\.1:1984","user-agent":"lua-resty-http\/[\d.]* \(Lua\) ngx_lua\/[\d]*"\},"method":"GET","querystring":\{"bar":"bar"\},"size":[\d]+,"uri":"\/opentracing\?bar=bar","url":"http:\/\/127\.0\.0\.1:1984\/opentracing\?bar=bar"\},"response":\{"body":"opentracing\\n","headers":\{"connection":"close","content-type":"text\/plain","server":"APISIX\/[\d.]+","transfer-encoding":"chunked"\},"size":[\d]*,"status":200\},"route_id":"1","server":\{"hostname":"[ -~]*","version":"[\d.]+"\},"service_id":"","start_time":[\d]*,"upstream":"127\.0\.0\.1:1982","upstream_latency":[\d]*\}/
@ -466,7 +486,7 @@ opentracing
--- grep_error_log eval
qr/message received: [ -~]+/
--- grep_error_log_out eval
qr/message received: <14>1 [\d\-T:.]+Z [\d.]+ apisix [\d]+ - \[tok\@41058 ] \{"apisix_latency":[\d.]*,"client_ip":"127\.0\.0\.1","latency":[\d.]*,"request":\{"headers":\{"content-type":"application\/x-www-form-urlencoded","host":"127\.0\.0\.1:1984","user-agent":"lua-resty-http\/[\d.]* \(Lua\) ngx_lua\/[\d]*"\},"method":"GET","querystring":\{"foo":"bar"\},"size":[\d]+,"uri":"\/opentracing\?foo=bar","url":"http:\/\/127\.0\.0\.1:1984\/opentracing\?foo=bar"\},"response":\{"headers":\{"connection":"close","content-type":"text\/plain","server":"APISIX\/[\d.]+","transfer-encoding":"chunked"\},"size":[\d]*,"status":200\},"route_id":"1","server":\{"hostname":"[ -~]*","version":"[\d.]+"\},"service_id":"","start_time":[\d]*,"upstream":"127\.0\.0\.1:1982","upstream_latency":[\d]*\}/
qr/message received: <14>1 [\d\-T:.]+Z [\d.]+ apisix [\d]+ - \[tok\@41058 tag="apisix"] \{"apisix_latency":[\d.]*,"client_ip":"127\.0\.0\.1","latency":[\d.]*,"request":\{"headers":\{"content-type":"application\/x-www-form-urlencoded","host":"127\.0\.0\.1:1984","user-agent":"lua-resty-http\/[\d.]* \(Lua\) ngx_lua\/[\d]*"\},"method":"GET","querystring":\{"foo":"bar"\},"size":[\d]+,"uri":"\/opentracing\?foo=bar","url":"http:\/\/127\.0\.0\.1:1984\/opentracing\?foo=bar"\},"response":\{"headers":\{"connection":"close","content-type":"text\/plain","server":"APISIX\/[\d.]+","transfer-encoding":"chunked"\},"size":[\d]*,"status":200\},"route_id":"1","server":\{"hostname":"[ -~]*","version":"[\d.]+"\},"service_id":"","start_time":[\d]*,"upstream":"127\.0\.0\.1:1982","upstream_latency":[\d]*\}/
@ -509,4 +529,46 @@ opentracing
--- grep_error_log eval
qr/message received: [ -~]+/
--- grep_error_log_out eval
qr/message received: <14>1 [\d\-T:.]+Z [\d.]+ apisix [\d]+ - \[tok\@41058 ] \{"client":"[\d.]+","host":"[\d.]+","route_id":"1"\}/
qr/message received: <14>1 [\d\-T:.]+Z [\d.]+ apisix [\d]+ - \[tok\@41058 tag="apisix"] \{"client":"[\d.]+","host":"[\d.]+","route_id":"1"\}/
=== TEST 12: loggly http protocol
--- config
location /t {
content_by_lua_block {
local t = require("lib.test_admin").test
local code, body = t('/apisix/admin/plugin_metadata/loggly',
ngx.HTTP_PUT,
{
host = ngx.var.server_addr .. ":10420/loggly",
protocol = "http",
log_format = {
["route_id"] = "$route_id",
}
}
)
if code >= 300 then
ngx.status = code
ngx.say("fail")
return
end
ngx.say(body)
local code, _, body = t("/opentracing", "GET")
if code >= 300 then
ngx.status = code
ngx.say("fail")
return
end
ngx.print(body)
}
}
--- wait: 2
--- response_body
passed
opentracing
--- error_log
loggly body: {"route_id":"1"}
loggly tags: "apisix"