From 3b7e3fde0559155c79060097a27c1d3010df5046 Mon Sep 17 00:00:00 2001 From: tzssangglass Date: Thu, 5 May 2022 10:02:36 +0800 Subject: [PATCH] feat(stream): port syslog plugin (#6953) --- Makefile | 3 + apisix/plugins/syslog.lua | 82 +------- apisix/plugins/syslog/init.lua | 103 ++++++++++ apisix/stream/plugins/syslog.lua | 82 ++++++++ apisix/utils/batch-processor.lua | 7 +- conf/config-default.yaml | 1 + t/stream-plugin/syslog.t | 343 +++++++++++++++++++++++++++++++ 7 files changed, 541 insertions(+), 80 deletions(-) create mode 100644 apisix/plugins/syslog/init.lua create mode 100644 apisix/stream/plugins/syslog.lua create mode 100644 t/stream-plugin/syslog.t diff --git a/Makefile b/Makefile index 39131e20..3bc97429 100644 --- a/Makefile +++ b/Makefile @@ -334,6 +334,9 @@ install: runtime $(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/plugins/slslog $(ENV_INSTALL) apisix/plugins/slslog/*.lua $(ENV_INST_LUADIR)/apisix/plugins/slslog/ + $(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/plugins/syslog + $(ENV_INSTALL) apisix/plugins/syslog/*.lua $(ENV_INST_LUADIR)/apisix/plugins/syslog/ + $(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/plugins/zipkin $(ENV_INSTALL) apisix/plugins/zipkin/*.lua $(ENV_INST_LUADIR)/apisix/plugins/zipkin/ diff --git a/apisix/plugins/syslog.lua b/apisix/plugins/syslog.lua index be73cac5..7eb4675c 100644 --- a/apisix/plugins/syslog.lua +++ b/apisix/plugins/syslog.lua @@ -18,12 +18,11 @@ local core = require("apisix.core") local log_util = require("apisix.utils.log-util") local bp_manager_mod = require("apisix.utils.batch-processor-manager") -local logger_socket = require("resty.logger.socket") +local syslog = require("apisix.plugins.syslog.init") local plugin_name = "syslog" local ngx = ngx - -local batch_processor_manager = bp_manager_mod.new("sys logger") +local batch_processor_manager = bp_manager_mod.new("http sys logger") local schema = { type = "object", properties = { @@ -43,11 +42,6 @@ local schema = { } -local lrucache = core.lrucache.new({ - ttl = 300, count = 512, serial_creating = true, -}) - - local schema = batch_processor_manager:wrap_schema(schema) local _M = { @@ -55,6 +49,7 @@ local _M = { priority = 401, name = plugin_name, schema = schema, + flush_syslog = syslog.flush_syslog, } @@ -70,78 +65,9 @@ function _M.check_schema(conf) end -function _M.flush_syslog(logger) - local ok, err = logger:flush(logger) - if not ok then - core.log.error("failed to flush message:", err) - end - - return ok -end - - -local function send_syslog_data(conf, log_message, api_ctx) - local err_msg - local res = true - - core.log.info("sending a batch logs to ", conf.host, ":", conf.port) - - -- fetch it from lrucache - local logger, err = core.lrucache.plugin_ctx( - lrucache, api_ctx, nil, logger_socket.new, logger_socket, { - host = conf.host, - port = conf.port, - flush_limit = conf.flush_limit, - drop_limit = conf.drop_limit, - timeout = conf.timeout, - sock_type = conf.sock_type, - pool_size = conf.pool_size, - tls = conf.tls, - } - ) - - if not logger then - res = false - err_msg = "failed when initiating the sys logger processor".. err - end - - -- reuse the logger object - local ok, err = logger:log(core.json.encode(log_message)) - if not ok then - res = false - err_msg = "failed to log message" .. err - end - - return res, err_msg -end - - --- log phase in APISIX function _M.log(conf, ctx) local entry = log_util.get_full_log(ngx, conf) - - if batch_processor_manager:add_entry(conf, entry) then - return - end - - -- Generate a function to be executed by the batch processor - local cp_ctx = core.table.clone(ctx) - local func = function(entries, batch_max_size) - local data, err - if batch_max_size == 1 then - data, err = core.json.encode(entries[1]) -- encode as single {} - else - data, err = core.json.encode(entries) -- encode as array [{}] - end - - if not data then - return false, 'error occurred while encoding the data: ' .. err - end - - return send_syslog_data(conf, data, cp_ctx) - end - - batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, func) + syslog.push_entry(conf, ctx, entry) end diff --git a/apisix/plugins/syslog/init.lua b/apisix/plugins/syslog/init.lua new file mode 100644 index 00000000..24f2f62a --- /dev/null +++ b/apisix/plugins/syslog/init.lua @@ -0,0 +1,103 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +local core = require("apisix.core") +local bp_manager_mod = require("apisix.utils.batch-processor-manager") +local logger_socket = require("resty.logger.socket") + +local batch_processor_manager = bp_manager_mod.new("sys logger") + +local lrucache = core.lrucache.new({ + ttl = 300, count = 512, serial_creating = true, +}) + +local _M = {} + +function _M.flush_syslog(logger) + local ok, err = logger:flush(logger) + if not ok then + core.log.error("failed to flush message:", err) + end + + return ok +end + + +local function send_syslog_data(conf, log_message, api_ctx) + local err_msg + local res = true + + core.log.info("sending a batch logs to ", conf.host, ":", conf.port) + + -- fetch it from lrucache + local logger, err = core.lrucache.plugin_ctx( + lrucache, api_ctx, nil, logger_socket.new, logger_socket, { + host = conf.host, + port = conf.port, + flush_limit = conf.flush_limit, + drop_limit = conf.drop_limit, + timeout = conf.timeout, + sock_type = conf.sock_type, + pool_size = conf.pool_size, + tls = conf.tls, + } + ) + + if not logger then + res = false + err_msg = "failed when initiating the sys logger processor".. err + end + + -- reuse the logger object + local ok, err = logger:log(core.json.encode(log_message)) + if not ok then + res = false + err_msg = "failed to log message" .. err + end + + return res, err_msg +end + + +-- called in log phase of APISIX +function _M.push_entry(conf, ctx, entry) + if batch_processor_manager:add_entry(conf, entry) then + return + end + + -- Generate a function to be executed by the batch processor + local cp_ctx = core.table.clone(ctx) + local func = function(entries, batch_max_size) + local data, err + if batch_max_size == 1 then + data, err = core.json.encode(entries[1]) -- encode as single {} + else + data, err = core.json.encode(entries) -- encode as array [{}] + end + + if not data then + return false, 'error occurred while encoding the data: ' .. err + end + + return send_syslog_data(conf, data, cp_ctx) + end + + batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, func) +end + + +return _M diff --git a/apisix/stream/plugins/syslog.lua b/apisix/stream/plugins/syslog.lua new file mode 100644 index 00000000..fcae8303 --- /dev/null +++ b/apisix/stream/plugins/syslog.lua @@ -0,0 +1,82 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +local core = require("apisix.core") +local log_util = require("apisix.utils.log-util") +local bp_manager_mod = require("apisix.utils.batch-processor-manager") +local syslog = require("apisix.plugins.syslog.init") +local plugin = require("apisix.plugin") +local plugin_name = "syslog" + +local batch_processor_manager = bp_manager_mod.new("stream sys logger") +local schema = { + type = "object", + properties = { + host = {type = "string"}, + port = {type = "integer"}, + flush_limit = {type = "integer", minimum = 1, default = 4096}, + drop_limit = {type = "integer", default = 1048576}, + timeout = {type = "integer", minimum = 1, default = 3000}, + sock_type = {type = "string", default = "tcp", enum = {"tcp", "udp"}}, + pool_size = {type = "integer", minimum = 5, default = 5}, + tls = {type = "boolean", default = false} + }, + required = {"host", "port"} +} + +local schema = batch_processor_manager:wrap_schema(schema) + +local metadata_schema = { + type = "object", + properties = { + log_format = log_util.metadata_schema_log_format, + }, +} + +local _M = { + version = 0.1, + priority = 401, + name = plugin_name, + schema = schema, + metadata_schema = metadata_schema, + flush_syslog = syslog.flush_syslog, +} + + +function _M.check_schema(conf, schema_type) + if schema_type == core.schema.TYPE_METADATA then + return core.schema.check(metadata_schema, conf) + end + return core.schema.check(schema, conf) +end + + +function _M.log(conf, ctx) + local metadata = plugin.plugin_metadata(plugin_name) + if not metadata or not metadata.value.log_format + or core.table.nkeys(metadata.value.log_format) <= 0 + then + core.log.error("syslog's log_format is not set") + return + end + + local entry = log_util.get_custom_format_log(ctx, metadata.value.log_format) + syslog.push_entry(conf, ctx, entry) +end + + +return _M diff --git a/apisix/utils/batch-processor.lua b/apisix/utils/batch-processor.lua index 6d3bf53f..5e324e8d 100644 --- a/apisix/utils/batch-processor.lua +++ b/apisix/utils/batch-processor.lua @@ -28,7 +28,10 @@ local batch_processor_mt = { local execute_func local create_buffer_timer local batch_metrics -local prometheus = require("apisix.plugins.prometheus.exporter") +local prometheus +if ngx.config.subsystem == "http" then + prometheus = require("apisix.plugins.prometheus.exporter") +end local schema = { @@ -181,7 +184,7 @@ function batch_processor:push(entry) return end - if not batch_metrics and prometheus.get_prometheus() and self.name + if prometheus and not batch_metrics and self.name and self.route_id and self.server_addr then batch_metrics = prometheus.get_prometheus():gauge("batch_process_entries", "batch process remaining entries", diff --git a/conf/config-default.yaml b/conf/config-default.yaml index 4522fdc1..0bda6354 100644 --- a/conf/config-default.yaml +++ b/conf/config-default.yaml @@ -402,6 +402,7 @@ stream_plugins: # sorted by priority - ip-restriction # priority: 3000 - limit-conn # priority: 1003 - mqtt-proxy # priority: 1000 + - syslog # priority: 401 # <- recommend to use priority (0, 100) for your custom plugins #wasm: diff --git a/t/stream-plugin/syslog.t b/t/stream-plugin/syslog.t new file mode 100644 index 00000000..c6d96c95 --- /dev/null +++ b/t/stream-plugin/syslog.t @@ -0,0 +1,343 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +use t::APISIX 'no_plan'; + +repeat_each(1); +no_long_string(); +no_shuffle(); +no_root_location(); + + +add_block_preprocessor(sub { + my ($block) = @_; + + if (!$block->error_log && !$block->no_error_log) { + $block->set_value("no_error_log", "[error]\n[alert]"); + } + + if (!defined $block->extra_stream_config) { + my $stream_config = <<_EOC_; + server { + listen 8125 udp; + content_by_lua_block { + require("lib.mock_layer4").dogstatsd() + } + } +_EOC_ + $block->set_value("extra_stream_config", $stream_config); + } + +}); + +run_tests; + +__DATA__ + +=== TEST 1: custom log format not set +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/upstreams/1', + ngx.HTTP_PUT, + [[{ + "nodes": { + "127.0.0.1:1995": 1 + }, + "type": "roundrobin" + }]] + ) + + if code >= 300 then + ngx.status = code + ngx.say(body) + return + end + + local code, body = t('/apisix/admin/stream_routes/1', + ngx.HTTP_PUT, + [[{ + "plugins": { + "syslog": { + "host" : "127.0.0.1", + "port" : 8125, + "sock_type": "udp", + "batch_max_size": 1, + "flush_limit":1 + } + }, + "upstream_id": "1" + }]] + ) + + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- request +GET /t +--- response_body +passed + + + +=== TEST 2: hit +--- stream_request eval +mmm +--- stream_response +hello world +--- error_log +syslog's log_format is not set + + + +=== TEST 3: set custom log format +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/plugin_metadata/syslog', + ngx.HTTP_PUT, + [[{ + "log_format": { + "client_ip": "$remote_addr" + } + }]] + ) + + if code >= 300 then + ngx.status = code + ngx.say(body) + return + end + + ngx.say(body) + } + } +--- request +GET /t +--- response_body +passed + + + +=== TEST 4: hit +--- stream_request eval +mmm +--- stream_response +hello world +--- wait: 0.5 +--- error_log eval +qr/message received:.*\"client_ip\\"\:\\"127.0.0.1\\"/ + + + +=== TEST 5: flush manually +--- config + location /t { + content_by_lua_block { + local plugin = require("apisix.stream.plugins.syslog") + local logger_socket = require("resty.logger.socket") + local logger, err = logger_socket:new({ + host = "127.0.0.1", + port = 5044, + flush_limit = 100, + }) + + local bytes, err = logger:log("abc") + if err then + ngx.log(ngx.ERR, err) + end + + local bytes, err = logger:log("efg") + if err then + ngx.log(ngx.ERR, err) + end + + local ok, err = plugin.flush_syslog(logger) + if not ok then + ngx.say("failed to flush syslog: ", err) + return + end + ngx.say("done") + } + } +--- request +GET /t +--- response_body +done + + + +=== TEST 6: small flush_limit, instant flush +--- stream_conf_enable +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/stream_routes/1', + ngx.HTTP_PUT, + [[{ + "plugins": { + "syslog": { + "host" : "127.0.0.1", + "port" : 5044, + "flush_limit" : 1, + "inactive_timeout": 1 + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1995": 1 + }, + "type": "roundrobin" + } + }]] + ) + + if code >= 300 then + ngx.status = code + end + ngx.say(body) + + -- wait etcd sync + ngx.sleep(0.5) + + local sock = ngx.socket.tcp() + local ok, err = sock:connect("127.0.0.1", 1985) + if not ok then + ngx.say("failed to connect: ", err) + return + end + + assert(sock:send("mmm")) + local data = assert(sock:receive("*a")) + ngx.print(data) + + -- wait flush log + ngx.sleep(2.5) + } + } +--- request +GET /t +--- response_body +passed +hello world +--- timeout: 5 +--- error_log +try to lock with key stream/route#1 +unlock with key stream/route#1 + + + +=== TEST 7: check plugin configuration updating +--- stream_conf_enable +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body1 = t('/apisix/admin/stream_routes/1', + ngx.HTTP_PUT, + [[{ + "plugins": { + "syslog": { + "host" : "127.0.0.1", + "port" : 5044, + "batch_max_size": 1 + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1995": 1 + }, + "type": "roundrobin" + } + }]] + ) + + if code >= 300 then + ngx.status = code + ngx.say("fail") + return + end + + local sock = ngx.socket.tcp() + local ok, err = sock:connect("127.0.0.1", 1985) + if not ok then + ngx.status = code + ngx.say("fail") + return + end + + assert(sock:send("mmm")) + local body2 = assert(sock:receive("*a")) + + local code, body3 = t('/apisix/admin/stream_routes/1', + ngx.HTTP_PUT, + [[{ + "plugins": { + "syslog": { + "host" : "127.0.0.1", + "port" : 5045, + "batch_max_size": 1 + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1995": 1 + }, + "type": "roundrobin" + } + }]] + ) + + if code >= 300 then + ngx.status = code + ngx.say("fail") + return + end + + local sock = ngx.socket.tcp() + local ok, err = sock:connect("127.0.0.1", 1985) + if not ok then + ngx.status = code + ngx.say("fail") + return + end + + assert(sock:send("mmm")) + local body4 = assert(sock:receive("*a")) + + ngx.print(body1) + ngx.print(body2) + ngx.print(body3) + ngx.print(body4) + } + } +--- request +GET /t +--- wait: 0.5 +--- response_body +passedhello world +passedhello world +--- grep_error_log eval +qr/sending a batch logs to 127.0.0.1:(\d+)/ +--- grep_error_log_out +sending a batch logs to 127.0.0.1:5044 +sending a batch logs to 127.0.0.1:5045