feat(pubsub): support kafka (#7032)

Zeping Bai 2022-05-16 17:12:56 +08:00 committed by GitHub
parent 3c54cf5e65
commit db26b6c38f
12 changed files with 608 additions and 13 deletions

View File

@ -295,6 +295,9 @@ install: runtime
$(ENV_INSTALL) apisix/discovery/kubernetes/*.lua $(ENV_INST_LUADIR)/apisix/discovery/kubernetes
$(ENV_INSTALL) apisix/discovery/tars/*.lua $(ENV_INST_LUADIR)/apisix/discovery/tars
$(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/pubsub
$(ENV_INSTALL) apisix/pubsub/*.lua $(ENV_INST_LUADIR)/apisix/pubsub/
$(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/http
$(ENV_INSTALL) apisix/http/*.lua $(ENV_INST_LUADIR)/apisix/http/

View File

@ -37,6 +37,24 @@ message CmdPing {
*/
message CmdEmpty {}
/**
* Get the offset of the specified topic partition from Apache Kafka.
*/
message CmdKafkaListOffset {
string topic = 1;
int32 partition = 2;
int64 timestamp = 3;
}
/**
* Fetch messages of the specified topic partition from Apache Kafka.
*/
message CmdKafkaFetch {
string topic = 1;
int32 partition = 2;
int64 offset = 3;
}
/**
* Client request definition for pubsub scenarios
*
@ -55,8 +73,10 @@ message CmdEmpty {}
message PubSubReq {
int64 sequence = 1;
oneof req {
CmdEmpty cmd_empty = 31;
CmdPing cmd_ping = 32;
CmdEmpty cmd_empty = 31;
CmdPing cmd_ping = 32;
CmdKafkaFetch cmd_kafka_fetch = 33;
CmdKafkaListOffset cmd_kafka_list_offset = 34;
};
}
@ -64,7 +84,7 @@ message PubSubReq {
* The response body of the service when an error occurs,
* containing the error code and the error message.
*/
message ErrorResp {
message ErrorResp {
int32 code = 1;
string message = 2;
}
@ -77,6 +97,31 @@ message PongResp {
bytes state = 1;
}
/**
* The definition of a message in Kafka with the current message
* offset, production timestamp, Key, and message content.
*/
message KafkaMessage {
int64 offset = 1;
int64 timestamp = 2;
bytes key = 3;
bytes value = 4;
}
/**
* The response of Fetch messages from Apache Kafka.
*/
message KafkaFetchResp {
repeated KafkaMessage messages = 1;
}
/**
* The response of list offset from Apache Kafka.
*/
message KafkaListOffsetResp {
int64 offset = 1;
}
/**
* Server response definition for pubsub scenarios
*
@ -90,7 +135,9 @@ message PongResp {
message PubSubResp {
int64 sequence = 1;
oneof resp {
ErrorResp error_resp = 31;
PongResp pong_resp = 32;
ErrorResp error_resp = 31;
PongResp pong_resp = 32;
KafkaFetchResp kafka_fetch_resp = 33;
KafkaListOffsetResp kafka_list_offset_resp = 34;
};
}

View File

@ -42,6 +42,7 @@ local xrpc = require("apisix.stream.xrpc")
local ctxdump = require("resty.ctxdump")
local ngx_balancer = require("ngx.balancer")
local debug = require("apisix.debug")
local pubsub_kafka = require("apisix.pubsub.kafka")
local ngx = ngx
local get_method = ngx.req.get_method
local ngx_exit = ngx.exit
@ -504,6 +505,13 @@ function _M.http_access_phase()
api_ctx.upstream_scheme = "grpc"
end
-- load balancer is not required by kafka upstream, so the upstream
-- node selection process is intercepted and left to kafka to
-- handle on its own
if api_ctx.matched_upstream and api_ctx.matched_upstream.scheme == "kafka" then
return pubsub_kafka.access(api_ctx)
end
local code, err = set_upstream(route, api_ctx)
if code then
core.log.error("failed to set upstream: ", err)

137
apisix/pubsub/kafka.lua Normal file
View File

@ -0,0 +1,137 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local core = require("apisix.core")
local bconsumer = require("resty.kafka.basic-consumer")
local ffi = require("ffi")
local C = ffi.C
local tostring = tostring
local type = type
local ipairs = ipairs
local str_sub = string.sub
ffi.cdef[[
int64_t atoll(const char *num);
]]
local _M = {}
-- Handles the conversion of 64-bit integers in the lua-protobuf.
--
-- Because of the limitations of luajit, we cannot use native 64-bit
-- numbers, so pb decode converts int64 to a string in #xxx format
-- to avoid loss of precision, by this function, we convert this
-- string to int64 cdata numbers.
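-- e.g. the string "#1652265969000" is converted to the int64_t cdata 1652265969000LL.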
local function pb_convert_to_int64(src)
if type(src) == "string" then
-- the format is #1234, so there is a small minimum length of 2
if #src < 2 then
return 0
end
return C.atoll(ffi.cast("char *", src) + 1)
else
return src
end
end
-- Takes over requests of type kafka upstream in the http_access phase.
function _M.access(api_ctx)
local pubsub, err = core.pubsub.new()
if not pubsub then
core.log.error("failed to initialize pubsub module, err: ", err)
core.response.exit(400)
return
end
local up_nodes = api_ctx.matched_upstream.nodes
-- kafka client broker-related configuration
local broker_list = {}
for i, node in ipairs(up_nodes) do
broker_list[i] = {
host = node.host,
port = node.port,
}
end
local client_config = {refresh_interval = 30 * 60 * 1000}
-- load and create the consumer instance when it is determined
-- that the websocket connection was created successfully
local consumer = bconsumer:new(broker_list, client_config)
pubsub:on("cmd_kafka_list_offset", function (params)
-- The timestamp parameter uses a 64-bit integer, which is difficult
-- for luajit to handle well, so the int64_as_string option in
-- lua-protobuf is used here. Smaller numbers will be decoded as
-- lua number, while overly larger numbers will be decoded as strings
-- in the format #number, where the # symbol at the beginning of the
-- string will be removed and converted to int64_t with the atoll function.
local timestamp = pb_convert_to_int64(params.timestamp)
local offset, err = consumer:list_offset(params.topic, params.partition, timestamp)
if not offset then
return nil, "failed to list offset, topic: " .. params.topic ..
", partition: " .. params.partition .. ", err: " .. err
end
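-- tostring() on the int64_t cdata yields a trailing "LL" suffix (e.g. "42LL"); strip it so only the numeric part is returned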
offset = tostring(offset)
return {
kafka_list_offset_resp = {
offset = str_sub(offset, 1, #offset - 2)
}
}
end)
pubsub:on("cmd_kafka_fetch", function (params)
local offset = pb_convert_to_int64(params.offset)
local ret, err = consumer:fetch(params.topic, params.partition, offset)
if not ret then
return nil, "failed to fetch message, topic: " .. params.topic ..
", partition: " .. params.partition .. ", err: " .. err
end
-- split into multiple messages when the amount of data in
-- a single batch is too large
local messages = ret.records
-- special handling of int64 for luajit compatibility
for _, message in ipairs(messages) do
local timestamp = tostring(message.timestamp)
message.timestamp = str_sub(timestamp, 1, #timestamp - 2)
local offset = tostring(message.offset)
message.offset = str_sub(offset, 1, #offset - 2)
end
return {
kafka_fetch_resp = {
messages = messages,
},
}
end)
-- start processing client commands
pubsub:wait()
end
return _M

View File

@ -451,10 +451,12 @@ local upstream_schema = {
},
scheme = {
default = "http",
enum = {"grpc", "grpcs", "http", "https", "tcp", "tls", "udp"},
enum = {"grpc", "grpcs", "http", "https", "tcp", "tls", "udp",
"kafka"},
description = "The scheme of the upstream." ..
" For L7 proxy, it can be one of grpc/grpcs/http/https." ..
" For L4 proxy, it can be one of tcp/tls/udp."
" For L4 proxy, it can be one of tcp/tls/udp." ..
" For specific protocols, it can be kafka."
},
labels = labels_def,
discovery_type = {

View File

@ -19,6 +19,15 @@
docker exec -i apache-apisix_kafka-server1_1 /opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper-server1:2181 --replication-factor 1 --partitions 1 --topic test2
docker exec -i apache-apisix_kafka-server1_1 /opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper-server1:2181 --replication-factor 1 --partitions 3 --topic test3
docker exec -i apache-apisix_kafka-server2_1 /opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper-server2:2181 --replication-factor 1 --partitions 1 --topic test4
docker exec -i apache-apisix_kafka-server1_1 /opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper-server1:2181 --replication-factor 1 --partitions 1 --topic test-consumer
# create messages for test-consumer
for i in `seq 30`
do
docker exec -i apache-apisix_kafka-server1_1 bash -c "echo "testmsg$i" | /opt/bitnami/kafka/bin/kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic test-consumer"
echo "Produces messages to the test-consumer topic, msg: testmsg$i"
done
echo "Kafka service initialization completed"
# prepare openwhisk env
docker pull openwhisk/action-nodejs-v14:nightly

View File

@ -216,7 +216,8 @@
"type": "category",
"label": "PubSub",
"items": [
"pubsub"
"pubsub",
"pubsub/kafka"
]
},
{

View File

@ -2,7 +2,7 @@
title: PubSub
keywords:
- APISIX
- Pub-Sub
- PubSub
description: This document contains information about the Apache APISIX pubsub framework.
---
@ -25,7 +25,7 @@ description: This document contains information about the Apache APISIX pubsub f
#
-->
## What is Pub-Sub
## What is PubSub
Publish-subscribe is a messaging paradigm:
@ -38,9 +38,13 @@ In Apache APISIX, the most common scenario is handling north-south traffic from
## Architecture
![pub-sub architecture](../../assets/images/pubsub-architecture.svg)
![pubsub architecture](../../assets/images/pubsub-architecture.svg)
Currently, Apache APISIX supports WebSocket communication with the client, which can be any application that supports WebSocket, with Protocol Buffer as the serialization mechanism, see the [protocol definition](../../../apisix/pubsub.proto).
Currently, Apache APISIX supports WebSocket communication with the client, which can be any application that supports WebSocket, with Protocol Buffer as the serialization mechanism, see the [protocol definition](https://github.com/apache/apisix/blob/master/apisix/include/apisix/model/pubsub.proto).
## Supported messaging systems
- [Apache Kafka](pubsub/kafka.md)
## How to support other messaging systems

View File

@ -0,0 +1,94 @@
---
title: Apache Kafka
keywords:
- APISIX
- PubSub
- Kafka
description: This document contains information about the Apache APISIX Kafka pubsub scenario.
---
<!--
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
-->
## Connect to Apache Kafka
Connecting to Apache Kafka in Apache APISIX is very simple.
Currently, we provide a simple way to integrate by combining two Kafka APIs, ListOffsets and Fetch, to quickly implement the ability to pull Kafka messages. It does not yet support Apache Kafka's consumer group feature, so offsets cannot be managed by Apache Kafka.
### Limitations
- Offsets need to be managed manually
Offsets can be stored by a custom backend service, or obtained with the `list_offset` command before fetching messages; the command can take a timestamp to get the starting offset, or special values to get the initial and end offsets of the partition.
- Batch data acquisition is not supported
A single command can only fetch the data of one topic partition; fetching from multiple partitions with a single command is not supported.
### Prepare
First, compile the [communication protocol](https://github.com/apache/apisix/blob/master/apisix/include/apisix/model/pubsub.proto) into a language-specific SDK using `protoc`; it provides the command and response definitions needed to connect to Kafka via APISIX over WebSocket.
The `sequence` field in the protocol associates each request with its response; they correspond one to one. The client can manage it however it wants; APISIX does not modify it and only passes it back to the client in the response body.
The following commands are currently supported for connecting to Apache Kafka:
- CmdKafkaFetch
- CmdKafkaListOffset
> The `timestamp` field in the `CmdKafkaListOffset` command supports the following values:
>
> - `unix timestamp`: Offset of the first message after the specified timestamp
> - `-1`: Offset of the last message of the current partition
> - `-2`: Offset of the first message of the current partition
>
> For more information, see [Apache Kafka Protocol Documentation](https://kafka.apache.org/protocol.html#The_Messages_ListOffsets)
Possible response bodies: when an error occurs, `ErrorResp` is returned with the error message; otherwise, the response specific to the executed command is returned.
- ErrorResp
- KafkaFetchResp
- KafkaListOffsetResp
### How to use
#### Create route
Create a route, set the upstream `scheme` field to `kafka`, and configure `nodes` with the addresses of the Kafka brokers.
```shell
curl -X PUT 'http://127.0.0.1:9080/apisix/admin/routes/kafka' \
-H 'X-API-KEY: <api-key>' \
-H 'Content-Type: application/json' \
-d '{
"uri": "/kafka",
"upstream": {
"nodes": {
"kafka-server1:9092": 1,
"kafka-server2:9092": 1,
"kafka-server3:9092": 1
},
"type": "none",
"scheme": "kafka"
}
}'
```
After configuring the route, you can use this feature.
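Once the route is created, any WebSocket client that can encode the Protocol Buffer messages defined above can consume from Kafka through APISIX. Below is a minimal Python sketch (not part of this commit); it assumes `pubsub.proto` has been compiled with `protoc --python_out=.` (producing a `pubsub_pb2` module), the third-party `websocket-client` package is installed, APISIX is listening on `127.0.0.1:9080`, and a `test-consumer` topic exists on the brokers.

```python
# Hypothetical client sketch; module and host names are assumptions, see above.
import websocket   # from the websocket-client package

import pubsub_pb2  # generated by: protoc --python_out=. pubsub.proto


def send_recv(ws, req):
    # every pubsub frame is a binary-encoded Protocol Buffer message
    ws.send_binary(req.SerializeToString())
    resp = pubsub_pb2.PubSubResp()
    resp.ParseFromString(ws.recv())
    if resp.WhichOneof("resp") == "error_resp":
        raise RuntimeError(resp.error_resp.message)
    return resp


ws = websocket.create_connection("ws://127.0.0.1:9080/kafka")

# query the offset of the first message in partition 0 (timestamp -2)
req = pubsub_pb2.PubSubReq(sequence=1)
req.cmd_kafka_list_offset.topic = "test-consumer"
req.cmd_kafka_list_offset.partition = 0
req.cmd_kafka_list_offset.timestamp = -2
offset = send_recv(ws, req).kafka_list_offset_resp.offset

# fetch messages starting from that offset
req = pubsub_pb2.PubSubReq(sequence=2)
req.cmd_kafka_fetch.topic = "test-consumer"
req.cmd_kafka_fetch.partition = 0
req.cmd_kafka_fetch.offset = offset
resp = send_recv(ws, req)
for msg in resp.kafka_fetch_resp.messages:
    print(msg.offset, msg.value.decode())

ws.close()
```

The same flow applies to any language: serialize a `PubSubReq`, send it as a binary WebSocket frame, decode the reply as a `PubSubResp`, and match requests to responses via the `sequence` field.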

View File

@ -53,7 +53,7 @@ dependencies = {
"nginx-lua-prometheus = 0.20220127",
"jsonschema = 0.9.8",
"lua-resty-ipmatcher = 0.6.1",
"lua-resty-kafka = 0.07",
"lua-resty-kafka = 0.20-0",
"lua-resty-logger-socket = 2.0.1-0",
"skywalking-nginx-lua = 0.6.0",
"base64 = 1.5-2",

61
t/admin/upstream5.t vendored Normal file
View File

@ -0,0 +1,61 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
use t::APISIX 'no_plan';
repeat_each(1);
no_long_string();
no_root_location();
no_shuffle();
log_level("info");
add_block_preprocessor(sub {
my ($block) = @_;
if (!$block->request) {
$block->set_value("request", "GET /t");
}
if (!$block->no_error_log && !$block->error_log) {
$block->set_value("no_error_log", "[error]\n[alert]");
}
});
run_tests;
__DATA__
=== TEST 1: set upstream(kafka scheme)
--- config
location /t {
content_by_lua_block {
local t = require("lib.test_admin")
local code, body = t.test("/apisix/admin/upstreams/kafka", ngx.HTTP_PUT, [[{
"nodes": {
"127.0.0.1:9092": 1
},
"type": "none",
"scheme": "kafka"
}]])
if code >= 300 then
ngx.status = code
end
ngx.say(body)
}
}
--- response_body
passed

229
t/pubsub/kafka.t vendored Normal file
View File

@ -0,0 +1,229 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
use t::APISIX 'no_plan';
repeat_each(1);
no_long_string();
no_root_location();
add_block_preprocessor(sub {
my ($block) = @_;
if ((!defined $block->error_log) && (!defined $block->no_error_log)) {
$block->set_value("no_error_log", "[error]");
}
if (!defined $block->request) {
$block->set_value("request", "GET /t");
}
});
run_tests();
__DATA__
=== TEST 1: setup all-in-one test
--- config
location /t {
content_by_lua_block {
local data = {
{
url = "/apisix/admin/routes/kafka",
data = [[{
"upstream": {
"nodes": {
"127.0.0.1:9092": 1
},
"type": "none",
"scheme": "kafka"
},
"uri": "/kafka"
}]],
},
{
url = "/apisix/admin/routes/kafka-invalid",
data = [[{
"upstream": {
"nodes": {
"127.0.0.1:59092": 1
},
"type": "none",
"scheme": "kafka"
},
"uri": "/kafka-invalid"
}]],
},
}
local t = require("lib.test_admin").test
for _, data in ipairs(data) do
local code, body = t(data.url, ngx.HTTP_PUT, data.data)
ngx.say(body)
end
}
}
--- response_body eval
"passed\n"x2
=== TEST 2: hit route (with HTTP request)
--- request
GET /kafka
--- error_code: 400
--- error_log
failed to initialize pubsub module, err: bad "upgrade" request header: nil
=== TEST 3: hit route (Kafka)
--- config
# The messages used in this test are produced in the linux-ci-init-service.sh
# script that prepares the CI environment
location /t {
content_by_lua_block {
local lib_pubsub = require("lib.pubsub")
local test_pubsub = lib_pubsub.new_ws("ws://127.0.0.1:1984/kafka")
local data = {
{
sequence = 0,
cmd_kafka_list_offset = {
topic = "not-exist",
partition = 0,
timestamp = -1,
},
},
{
sequence = 1,
cmd_kafka_fetch = {
topic = "not-exist",
partition = 0,
offset = 0,
},
},
{
-- Query first message offset
sequence = 2,
cmd_kafka_list_offset = {
topic = "test-consumer",
partition = 0,
timestamp = -2,
},
},
{
-- Query last message offset
sequence = 3,
cmd_kafka_list_offset = {
topic = "test-consumer",
partition = 0,
timestamp = -1,
},
},
{
-- Query by timestamp, 9999999999999 later than the
-- production time of any message
sequence = 4,
cmd_kafka_list_offset = {
topic = "test-consumer",
partition = 0,
timestamp = "9999999999999",
},
},
{
-- Query by timestamp, 1500000000000 ms earlier than the
-- production time of any message
sequence = 5,
cmd_kafka_list_offset = {
topic = "test-consumer",
partition = 0,
timestamp = "1500000000000",
},
},
{
sequence = 6,
cmd_kafka_fetch = {
topic = "test-consumer",
partition = 0,
offset = 14,
},
},
{
sequence = 7,
cmd_kafka_fetch = {
topic = "test-consumer",
partition = 0,
offset = 999,
},
},
}
for i = 1, #data do
local data = test_pubsub:send_recv_ws_binary(data[i])
if data.error_resp then
ngx.say(data.sequence..data.error_resp.message)
end
if data.kafka_list_offset_resp then
ngx.say(data.sequence.."offset: "..data.kafka_list_offset_resp.offset)
end
if data.kafka_fetch_resp then
ngx.say(data.sequence.."offset: "..data.kafka_fetch_resp.messages[1].offset..
" msg: "..data.kafka_fetch_resp.messages[1].value)
end
end
test_pubsub:close_ws()
}
}
--- response_body
0failed to list offset, topic: not-exist, partition: 0, err: not found topic
1failed to fetch message, topic: not-exist, partition: 0, err: not found topic
2offset: 0
3offset: 30
4offset: -1
5offset: 0
6offset: 14 msg: testmsg15
7failed to fetch message, topic: test-consumer, partition: 0, err: OFFSET_OUT_OF_RANGE
=== TEST 4: hit route (Kafka with invalid node ip)
--- config
# The messages used in this test are produced in the linux-ci-init-service.sh
# script that prepares the CI environment
location /t {
content_by_lua_block {
local lib_pubsub = require("lib.pubsub")
local test_pubsub = lib_pubsub.new_ws("ws://127.0.0.1:1984/kafka-invalid")
local data = test_pubsub:send_recv_ws_binary({
sequence = 0,
cmd_kafka_list_offset = {
topic = "test-consumer",
partition = 0,
timestamp = -2,
},
})
if data.error_resp then
ngx.say(data.sequence..data.error_resp.message)
end
test_pubsub:close_ws()
}
}
--- response_body
0failed to list offset, topic: test-consumer, partition: 0, err: not found topic
--- error_log
all brokers failed in fetch topic metadata