From 18f6e5c6752f983a2f9e127e5204f9158fb8de0e Mon Sep 17 00:00:00 2001 From: Zeping Bai Date: Tue, 17 May 2022 16:34:20 +0800 Subject: [PATCH] feat(pubsub): support kafka tls and sasl/plain auth (#7046) --- .github/workflows/centos7-ci.yml | 2 + apisix/plugins/kafka-proxy.lua | 61 ++++++++++ apisix/pubsub/kafka.lua | 12 ++ apisix/schema_def.lua | 11 +- apisix/upstream.lua | 2 +- ci/linux_openresty_common_runner.sh | 3 + ci/pod/docker-compose.yml | 12 ++ ci/pod/kafka/kafka-server/env/common.env | 9 +- ci/pod/kafka/kafka-server/kafka_jaas.conf | 23 ++++ conf/config-default.yaml | 1 + docs/en/latest/config.json | 3 +- docs/en/latest/plugins/kafka-proxy.md | 80 +++++++++++++ docs/en/latest/pubsub/kafka.md | 33 ++++++ t/admin/plugins.t | 1 + t/admin/upstream5.t | 52 +++++++++ t/node/upstream-mtls.t | 2 +- t/plugin/kafka-proxy.t | 61 ++++++++++ t/pubsub/kafka.t | 133 +++++++++++++++++++++- 18 files changed, 494 insertions(+), 7 deletions(-) create mode 100644 apisix/plugins/kafka-proxy.lua create mode 100644 ci/pod/kafka/kafka-server/kafka_jaas.conf create mode 100644 docs/en/latest/plugins/kafka-proxy.md create mode 100644 t/plugin/kafka-proxy.t diff --git a/.github/workflows/centos7-ci.yml b/.github/workflows/centos7-ci.yml index a63622e7..ee56e03b 100644 --- a/.github/workflows/centos7-ci.yml +++ b/.github/workflows/centos7-ci.yml @@ -79,6 +79,8 @@ jobs: - name: Run other docker containers for test run: | + # generating SSL certificates for Kafka + keytool -genkeypair -keyalg RSA -dname "CN=127.0.0.1" -alias 127.0.0.1 -keystore ./ci/pod/kafka/kafka-server/selfsigned.jks -validity 365 -keysize 2048 -storepass changeit make ci-env-up ./ci/linux-ci-init-service.sh diff --git a/apisix/plugins/kafka-proxy.lua b/apisix/plugins/kafka-proxy.lua new file mode 100644 index 00000000..b6c666a5 --- /dev/null +++ b/apisix/plugins/kafka-proxy.lua @@ -0,0 +1,61 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- +local core = require("apisix.core") + + +local schema = { + type = "object", + properties = { + sasl = { + type = "object", + properties = { + username = { + type = "string", + }, + password = { + type = "string", + }, + }, + required = {"username", "password"}, + }, + }, +} + + +local _M = { + version = 0.1, + priority = 508, + name = "kafka-proxy", + schema = schema, +} + + +function _M.check_schema(conf) + return core.schema.check(schema, conf) +end + + +function _M.access(conf, ctx) + if conf.sasl then + ctx.kafka_consumer_enable_sasl = true + ctx.kafka_consumer_sasl_username = conf.sasl.username + ctx.kafka_consumer_sasl_password = conf.sasl.password + end +end + + +return _M diff --git a/apisix/pubsub/kafka.lua b/apisix/pubsub/kafka.lua index ff035ce4..2cce1a05 100644 --- a/apisix/pubsub/kafka.lua +++ b/apisix/pubsub/kafka.lua @@ -69,9 +69,21 @@ function _M.access(api_ctx) host = node.host, port = node.port, } + + if api_ctx.kafka_consumer_enable_sasl then + broker_list[i].sasl_config = { + mechanism = "PLAIN", + user = api_ctx.kafka_consumer_sasl_username, + password = api_ctx.kafka_consumer_sasl_password, + } + end end local client_config = {refresh_interval = 30 * 60 * 1000} + if api_ctx.matched_upstream.tls then + client_config.ssl = true + client_config.ssl_verify = api_ctx.matched_upstream.tls.verify + end -- load and create the consumer instance when it is determined -- that the websocket connection was created successfully diff --git a/apisix/schema_def.lua b/apisix/schema_def.lua index 5a7830e8..091785d3 100644 --- a/apisix/schema_def.lua +++ b/apisix/schema_def.lua @@ -406,8 +406,17 @@ local upstream_schema = { properties = { client_cert = certificate_scheme, client_key = private_key_schema, + verify = { + type = "boolean", + description = "Turn on server certificate verification, ".. + "currently only kafka upstream is supported", + default = false, + }, }, - required = {"client_cert", "client_key"}, + dependencies = { + client_cert = {"client_key"}, + client_key = {"client_cert"}, + } }, keepalive_pool = { type = "object", diff --git a/apisix/upstream.lua b/apisix/upstream.lua index 93c591c5..a0e963b4 100644 --- a/apisix/upstream.lua +++ b/apisix/upstream.lua @@ -435,7 +435,7 @@ local function check_upstream_conf(in_dp, conf) end end - if conf.tls then + if conf.tls and conf.tls.client_cert then local cert = conf.tls.client_cert local key = conf.tls.client_key local ok, err = apisix_ssl.validate(cert, key) diff --git a/ci/linux_openresty_common_runner.sh b/ci/linux_openresty_common_runner.sh index 3fae0797..0a1fd68e 100755 --- a/ci/linux_openresty_common_runner.sh +++ b/ci/linux_openresty_common_runner.sh @@ -21,6 +21,9 @@ before_install() { sudo cpanm --notest Test::Nginx >build.log 2>&1 || (cat build.log && exit 1) + # generating SSL certificates for Kafka + keytool -genkeypair -keyalg RSA -dname "CN=127.0.0.1" -alias 127.0.0.1 -keystore ./ci/pod/kafka/kafka-server/selfsigned.jks -validity 365 -keysize 2048 -storepass changeit + # launch deps env make ci-env-up ./ci/linux-ci-init-service.sh diff --git a/ci/pod/docker-compose.yml b/ci/pod/docker-compose.yml index 1804fbf2..68dab85c 100644 --- a/ci/pod/docker-compose.yml +++ b/ci/pod/docker-compose.yml @@ -73,11 +73,17 @@ services: restart: unless-stopped ports: - "9092:9092" + - "9093:9093" + - "9094:9094" depends_on: - zookeeper-server1 - zookeeper-server2 networks: kafka_net: + volumes: + - ./ci/pod/kafka/kafka-server/kafka_jaas.conf:/opt/bitnami/kafka/config/kafka_jaas.conf:ro + - ./ci/pod/kafka/kafka-server/selfsigned.jks:/opt/bitnami/kafka/config/certs/kafka.keystore.jks:ro + - ./ci/pod/kafka/kafka-server/selfsigned.jks:/opt/bitnami/kafka/config/certs/kafka.truststore.jks:ro kafka-server2: image: bitnami/kafka:2.8.1 @@ -88,11 +94,17 @@ services: restart: unless-stopped ports: - "19092:9092" + - "19093:9093" + - "19094:9094" depends_on: - zookeeper-server1 - zookeeper-server2 networks: kafka_net: + volumes: + - ./ci/pod/kafka/kafka-server/kafka_jaas.conf:/opt/bitnami/kafka/config/kafka_jaas.conf:ro + - ./ci/pod/kafka/kafka-server/selfsigned.jks:/opt/bitnami/kafka/config/certs/kafka.keystore.jks:ro + - ./ci/pod/kafka/kafka-server/selfsigned.jks:/opt/bitnami/kafka/config/certs/kafka.truststore.jks:ro ## Eureka diff --git a/ci/pod/kafka/kafka-server/env/common.env b/ci/pod/kafka/kafka-server/env/common.env index 06200b9b..adc9d7ca 100644 --- a/ci/pod/kafka/kafka-server/env/common.env +++ b/ci/pod/kafka/kafka-server/env/common.env @@ -1,3 +1,8 @@ ALLOW_PLAINTEXT_LISTENER=yes -KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true -KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 +KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false +KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092,SSL://0.0.0.0:9093,SASL_PLAINTEXT://0.0.0.0:9094 +KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092,SSL://127.0.0.1:9093,SASL_PLAINTEXT://127.0.0.1:9094 +KAFKA_CFG_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM= +KAFKA_CFG_SSL_KEYSTORE_LOCATION=/opt/bitnami/kafka/config/certs/kafka.keystore.jks +KAFKA_CFG_SSL_KEYSTORE_PASSWORD=changeit +KAFKA_CFG_SSL_KEY_PASSWORD=changeit diff --git a/ci/pod/kafka/kafka-server/kafka_jaas.conf b/ci/pod/kafka/kafka-server/kafka_jaas.conf new file mode 100644 index 00000000..4bc19386 --- /dev/null +++ b/ci/pod/kafka/kafka-server/kafka_jaas.conf @@ -0,0 +1,23 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +KafkaServer { + org.apache.kafka.common.security.plain.PlainLoginModule required + username="admin" + password="admin-secret" + user_admin="admin-secret"; +}; diff --git a/conf/config-default.yaml b/conf/config-default.yaml index 052c0fc2..bdf44554 100644 --- a/conf/config-default.yaml +++ b/conf/config-default.yaml @@ -374,6 +374,7 @@ plugins: # plugin list (sorted by priority) - traffic-split # priority: 966 - redirect # priority: 900 - response-rewrite # priority: 899 + - kafka-proxy # priority: 508 #- dubbo-proxy # priority: 507 - grpc-transcode # priority: 506 - grpc-web # priority: 505 diff --git a/docs/en/latest/config.json b/docs/en/latest/config.json index 5e18f58a..35e64986 100644 --- a/docs/en/latest/config.json +++ b/docs/en/latest/config.json @@ -172,7 +172,8 @@ "label": "Other Protocols", "items": [ "plugins/dubbo-proxy", - "plugins/mqtt-proxy" + "plugins/mqtt-proxy", + "plugins/kafka-proxy" ] } ] diff --git a/docs/en/latest/plugins/kafka-proxy.md b/docs/en/latest/plugins/kafka-proxy.md new file mode 100644 index 00000000..8fd08f0c --- /dev/null +++ b/docs/en/latest/plugins/kafka-proxy.md @@ -0,0 +1,80 @@ +--- +title: kafka-proxy +keywords: + - APISIX + - Plugin + - Kafka proxy +description: This document contains information about the Apache APISIX kafka-proxy Plugin. +--- + + + +## Description + +The `kafka-proxy` plugin can be used to configure advanced parameters for the kafka upstream of Apache APISIX, such as SASL authentication. + +## Attributes + +| Name | Type | Required | Default | Valid values | Description | +|-------------------|---------|----------|---------|---------------|------------------------------------| +| sasl | object | optional | | {"username": "user", "password" :"pwd"} | SASL/PLAIN authentication configuration, when this configuration exists, turn on SASL authentication; this object will contain two parameters username and password, they must be configured. | +| sasl.username | string | required | | | SASL/PLAIN authentication username | +| sasl.password | string | required | | | SASL/PLAIN authentication password | + +:::note +If SASL authentication is enabled, the `sasl.username` and `sasl.password` must be set. +The current SASL authentication only supports PLAIN mode, which is the username password login method. +::: + +## Example usage + +When we use scheme as the upstream of kafka, we can add kafka authentication configuration to it through this plugin. + +```shell +curl -X PUT 'http://127.0.0.1:9080/apisix/admin/routes/r1' \ + -H 'X-API-KEY: ' \ + -H 'Content-Type: application/json' \ + -d '{ + "uri": "/kafka", + "plugins": { + "kafka-proxy": { + "sasl": { + "username": "user", + "password": "pwd" + } + } + }, + "upstream": { + "nodes": { + "kafka-server1:9092": 1, + "kafka-server2:9092": 1, + "kafka-server3:9092": 1 + }, + "type": "none", + "scheme": "kafka" + } +}' +``` + +Now, we can test it by connecting to the `/kafka` endpoint via websocket. + +## Disable Plugin + +To disable the `kafka-proxy` Plugin, you can delete the corresponding JSON configuration from the Plugin configuration. APISIX will automatically reload and you do not have to restart for this to take effect. diff --git a/docs/en/latest/pubsub/kafka.md b/docs/en/latest/pubsub/kafka.md index 1eb8ff2f..b2593664 100644 --- a/docs/en/latest/pubsub/kafka.md +++ b/docs/en/latest/pubsub/kafka.md @@ -92,3 +92,36 @@ curl -X PUT 'http://127.0.0.1:9080/apisix/admin/routes/kafka' \ ``` After configuring the route, you can use this feature. + +#### Enabling TLS and SASL/PLAIN authentication + +Simply turn on the `kafka-proxy` plugin on the created route and enable the Kafka TLS handshake and SASL authentication through the configuration, which can be found in the [plugin documentation](../../../en/latest/plugins/kafka-proxy.md). + +```shell +curl -X PUT 'http://127.0.0.1:9080/apisix/admin/routes/kafka' \ + -H 'X-API-KEY: ' \ + -H 'Content-Type: application/json' \ + -d '{ + "uri": "/kafka", + "plugins": { + "kafka-proxy": { + "sasl": { + "username": "user", + "password": "pwd" + } + } + }, + "upstream": { + "nodes": { + "kafka-server1:9092": 1, + "kafka-server2:9092": 1, + "kafka-server3:9092": 1 + }, + "type": "none", + "scheme": "kafka", + "tls": { + "verify": true + } + } +}' +``` diff --git a/t/admin/plugins.t b/t/admin/plugins.t index a639d3af..9581c538 100644 --- a/t/admin/plugins.t +++ b/t/admin/plugins.t @@ -102,6 +102,7 @@ server-info traffic-split redirect response-rewrite +kafka-proxy grpc-transcode grpc-web public-api diff --git a/t/admin/upstream5.t b/t/admin/upstream5.t index e0638417..9fd59bfe 100644 --- a/t/admin/upstream5.t +++ b/t/admin/upstream5.t @@ -59,3 +59,55 @@ __DATA__ } --- response_body passed + + + +=== TEST 2: set upstream(empty tls) +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin") + local code, body = t.test("/apisix/admin/upstreams/kafka", ngx.HTTP_PUT, [[{ + "nodes": { + "127.0.0.1:9092": 1 + }, + "type": "none", + "scheme": "kafka", + "tls": {} + }]]) + + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- response_body +passed + + + +=== TEST 3: set upstream(tls without verify) +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin") + local code, body = t.test("/apisix/admin/upstreams/kafka", ngx.HTTP_PUT, [[{ + "nodes": { + "127.0.0.1:9092": 1 + }, + "type": "none", + "scheme": "kafka", + "tls": { + "verify": false + } + }]]) + + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- response_body +passed diff --git a/t/node/upstream-mtls.t b/t/node/upstream-mtls.t index 9c0a49d9..7af6d2e6 100644 --- a/t/node/upstream-mtls.t +++ b/t/node/upstream-mtls.t @@ -77,7 +77,7 @@ __DATA__ GET /t --- error_code: 400 --- response_body -{"error_msg":"invalid configuration: property \"upstream\" validation failed: property \"tls\" validation failed: property \"client_key\" is required"} +{"error_msg":"invalid configuration: property \"upstream\" validation failed: property \"tls\" validation failed: property \"client_key\" is required when \"client_cert\" is set"} diff --git a/t/plugin/kafka-proxy.t b/t/plugin/kafka-proxy.t new file mode 100644 index 00000000..bf073e29 --- /dev/null +++ b/t/plugin/kafka-proxy.t @@ -0,0 +1,61 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +use t::APISIX 'no_plan'; + +repeat_each(1); +no_long_string(); +no_root_location(); + +add_block_preprocessor(sub { + my ($block) = @_; + + if ((!defined $block->error_log) && (!defined $block->no_error_log)) { + $block->set_value("no_error_log", "[error]"); + } + + if (!defined $block->request) { + $block->set_value("request", "GET /t"); + } +}); + +run_tests(); + +__DATA__ + +=== TEST 1: sanity +--- config + location /t { + content_by_lua_block { + local test_cases = { + {}, + {sasl = {username = "user", password = "pwd"}}, + {sasl = {username = "user"}}, + {sasl = {username = "user", password = 1234}}, + } + local plugin = require("apisix.plugins.kafka-proxy") + + for _, case in ipairs(test_cases) do + local ok, err = plugin.check_schema(case) + ngx.say(ok and "done" or err) + end + } + } +--- response_body +done +done +property "sasl" validation failed: property "password" is required +property "sasl" validation failed: property "password" validation failed: wrong type: expected string, got number diff --git a/t/pubsub/kafka.t b/t/pubsub/kafka.t index 8303f5e9..ef4bf5fd 100644 --- a/t/pubsub/kafka.t +++ b/t/pubsub/kafka.t @@ -67,6 +67,59 @@ __DATA__ "uri": "/kafka-invalid" }]], }, + { + url = "/apisix/admin/routes/kafka-tlsv", + data = [[{ + "upstream": { + "nodes": { + "127.0.0.1:9093": 1 + }, + "type": "none", + "scheme": "kafka", + "tls": { + "verify": true + } + }, + "uri": "/kafka-tlsv" + }]], + }, + { + url = "/apisix/admin/routes/kafka-tls", + data = [[{ + "upstream": { + "nodes": { + "127.0.0.1:9093": 1 + }, + "type": "none", + "scheme": "kafka", + "tls": { + "verify": false + } + }, + "uri": "/kafka-tls" + }]], + }, + { + url = "/apisix/admin/routes/kafka-sasl", + data = [[{ + "upstream": { + "nodes": { + "127.0.0.1:9094": 1 + }, + "type": "none", + "scheme": "kafka" + }, + "uri": "/kafka-sasl", + "plugins": { + "kafka-proxy": { + "sasl": { + "username": "admin", + "password": "admin-secret" + } + } + } + }]], + }, } local t = require("lib.test_admin").test @@ -78,7 +131,7 @@ __DATA__ } } --- response_body eval -"passed\n"x2 +"passed\n"x5 @@ -227,3 +280,81 @@ failed to initialize pubsub module, err: bad "upgrade" request header: nil 0failed to list offset, topic: test-consumer, partition: 0, err: not found topic --- error_log all brokers failed in fetch topic metadata + + + +=== TEST 5: hit route (Kafka with TLS) +--- config + location /t { + content_by_lua_block { + local lib_pubsub = require("lib.pubsub") + local test_pubsub = lib_pubsub.new_ws("ws://127.0.0.1:1984/kafka-tls") + + local data = test_pubsub:send_recv_ws_binary({ + sequence = 0, + cmd_kafka_list_offset = { + topic = "test-consumer", + partition = 0, + timestamp = -1, + }, + }) + if data.kafka_list_offset_resp then + ngx.say(data.sequence.."offset: "..data.kafka_list_offset_resp.offset) + end + test_pubsub:close_ws() + } + } +--- response_body +0offset: 30 + + + +=== TEST 6: hit route (Kafka with TLS + ssl verify) +--- config + location /t { + content_by_lua_block { + local lib_pubsub = require("lib.pubsub") + local test_pubsub = lib_pubsub.new_ws("ws://127.0.0.1:1984/kafka-tlsv") + + local data = test_pubsub:send_recv_ws_binary({ + sequence = 0, + cmd_kafka_list_offset = { + topic = "test-consumer", + partition = 0, + timestamp = -1, + }, + }) + if data.kafka_list_offset_resp then + ngx.say(data.sequence.."offset: "..data.kafka_list_offset_resp.offset) + end + test_pubsub:close_ws() + } + } +--- error_log +self signed certificate + + + +=== TEST 7: hit route (Kafka with SASL) +--- config + location /t { + content_by_lua_block { + local lib_pubsub = require("lib.pubsub") + local test_pubsub = lib_pubsub.new_ws("ws://127.0.0.1:1984/kafka-sasl") + + local data = test_pubsub:send_recv_ws_binary({ + sequence = 0, + cmd_kafka_list_offset = { + topic = "test-consumer", + partition = 0, + timestamp = -1, + }, + }) + if data.kafka_list_offset_resp then + ngx.say(data.sequence.."offset: "..data.kafka_list_offset_resp.offset) + end + test_pubsub:close_ws() + } + } +--- response_body +0offset: 30