mirror of
https://gitee.com/iresty/apisix.git
synced 2024-12-03 12:37:36 +08:00
feat(xRPC): support timeout (#6965)
Signed-off-by: spacewander <spacewanderlzx@gmail.com>
This commit is contained in:
parent
bd9f900023
commit
60e0ad6515
@ -43,12 +43,20 @@ local _M = {}
|
||||
function _M.connect_upstream(node, up_conf)
|
||||
local sk = xrpc_socket.upstream.socket()
|
||||
|
||||
local timeout = up_conf.timeout
|
||||
if not timeout then
|
||||
-- use the default timeout of Nginx proxy
|
||||
sk:settimeouts(60 * 1000, 600 * 1000, 600 * 1000)
|
||||
else
|
||||
-- the timeout unit for balancer is second while the unit for cosocket is millisecond
|
||||
sk:settimeouts(timeout.connect * 1000, timeout.send * 1000, timeout.read * 1000)
|
||||
end
|
||||
|
||||
local ok, err = sk:connect(node.host, node.port)
|
||||
if not ok then
|
||||
core.log.error("failed to connect: ", err)
|
||||
return nil
|
||||
end
|
||||
-- TODO: support timeout
|
||||
|
||||
if up_conf.scheme == "tls" then
|
||||
-- TODO: support mTLS
|
||||
|
@ -177,25 +177,14 @@ function _M.connect_upstream(session, ctx)
|
||||
return DECLINED
|
||||
end
|
||||
local node = nodes[math_random(#nodes)]
|
||||
local sk = xrpc_socket.upstream.socket()
|
||||
sk:settimeout(1000) -- the short timeout is just for test
|
||||
|
||||
core.log.info("connect to ", node.host, ":", node.port)
|
||||
|
||||
local ok, err = sk:connect(node.host, node.port)
|
||||
if not ok then
|
||||
core.log.error("failed to connect: ", err)
|
||||
local sk = sdk.connect_upstream(node, conf)
|
||||
if not sk then
|
||||
return DECLINED
|
||||
end
|
||||
|
||||
if conf.scheme == "tls" then
|
||||
local ok, err = sk:sslhandshake(nil, node.host)
|
||||
if not ok then
|
||||
core.log.error("failed to handshake: ", err)
|
||||
return DECLINED
|
||||
end
|
||||
end
|
||||
|
||||
return OK, sk
|
||||
end
|
||||
|
||||
|
108
t/xrpc/pingpong.t
vendored
108
t/xrpc/pingpong.t
vendored
@ -212,7 +212,69 @@ failed to connect: connection refused
|
||||
|
||||
|
||||
|
||||
=== TEST 8: reset
|
||||
=== TEST 8: use short timeout to check upstream's bad response
|
||||
--- config
|
||||
location /t {
|
||||
content_by_lua_block {
|
||||
local t = require("lib.test_admin").test
|
||||
local code, body = t('/apisix/admin/stream_routes/1',
|
||||
ngx.HTTP_PUT,
|
||||
{
|
||||
protocol = {
|
||||
name = "pingpong"
|
||||
},
|
||||
upstream = {
|
||||
nodes = {
|
||||
["127.0.0.1:1995"] = 1
|
||||
},
|
||||
timeout = {
|
||||
connect = 0.01,
|
||||
send = 0.009,
|
||||
read = 0.008,
|
||||
},
|
||||
type = "roundrobin"
|
||||
}
|
||||
}
|
||||
)
|
||||
if code >= 300 then
|
||||
ngx.status = code
|
||||
end
|
||||
ngx.say(body)
|
||||
}
|
||||
}
|
||||
--- request
|
||||
GET /t
|
||||
--- response_body
|
||||
passed
|
||||
|
||||
|
||||
|
||||
=== TEST 9: bad response
|
||||
--- request eval
|
||||
"POST /t
|
||||
" .
|
||||
"pp\x02\x00\x00\x00\x00\x00\x00\x03ABC" x 1
|
||||
--- stream_conf_enable
|
||||
--- stream_upstream_code
|
||||
local sock = ngx.req.socket(true)
|
||||
sock:settimeout(10)
|
||||
while true do
|
||||
local data = sock:receiveany(4096)
|
||||
if not data then
|
||||
return
|
||||
end
|
||||
sock:send(data:sub(5))
|
||||
end
|
||||
--- error_log
|
||||
failed to read: timeout
|
||||
stream lua tcp socket connect timeout: 10
|
||||
lua tcp socket send timeout: 9
|
||||
stream lua tcp socket read timeout: 8
|
||||
--- log_level: debug
|
||||
|
||||
|
||||
|
||||
=== TEST 10: reset
|
||||
--- config
|
||||
location /t {
|
||||
content_by_lua_block {
|
||||
@ -244,29 +306,7 @@ passed
|
||||
|
||||
|
||||
|
||||
=== TEST 9: bad response
|
||||
--- request eval
|
||||
"POST /t
|
||||
" .
|
||||
"pp\x02\x00\x00\x00\x00\x00\x00\x03ABC" x 1
|
||||
--- stream_conf_enable
|
||||
--- stream_upstream_code
|
||||
local sock = ngx.req.socket(true)
|
||||
sock:settimeout(1100)
|
||||
while true do
|
||||
local data = sock:receiveany(4096)
|
||||
if not data then
|
||||
return
|
||||
end
|
||||
sock:send(data:sub(5))
|
||||
end
|
||||
--- wait: 1.1
|
||||
--- error_log
|
||||
failed to read: timeout
|
||||
|
||||
|
||||
|
||||
=== TEST 10: client stream, N:N
|
||||
=== TEST 11: client stream, N:N
|
||||
--- request eval
|
||||
"POST /t
|
||||
" .
|
||||
@ -292,7 +332,7 @@ failed to read: timeout
|
||||
|
||||
|
||||
|
||||
=== TEST 11: client stream, bad response
|
||||
=== TEST 12: client stream, bad response
|
||||
--- request eval
|
||||
"POST /t
|
||||
" .
|
||||
@ -321,7 +361,7 @@ RPC is not finished
|
||||
|
||||
|
||||
|
||||
=== TEST 12: server stream, heartbeat
|
||||
=== TEST 13: server stream, heartbeat
|
||||
--- request eval
|
||||
"POST /t
|
||||
" .
|
||||
@ -347,7 +387,7 @@ RPC is not finished
|
||||
|
||||
|
||||
|
||||
=== TEST 13: server stream
|
||||
=== TEST 14: server stream
|
||||
--- request eval
|
||||
"POST /t
|
||||
" .
|
||||
@ -370,7 +410,7 @@ RPC is not finished
|
||||
|
||||
|
||||
|
||||
=== TEST 14: superior & subordinate
|
||||
=== TEST 15: superior & subordinate
|
||||
--- config
|
||||
location /t {
|
||||
content_by_lua_block {
|
||||
@ -495,7 +535,7 @@ passed
|
||||
|
||||
|
||||
|
||||
=== TEST 15: hit
|
||||
=== TEST 16: hit
|
||||
--- request eval
|
||||
"POST /t
|
||||
" .
|
||||
@ -515,7 +555,7 @@ connect to 127.0.0.2:1995 while prereading client data
|
||||
|
||||
|
||||
|
||||
=== TEST 16: hit (fallback to superior if not found)
|
||||
=== TEST 17: hit (fallback to superior if not found)
|
||||
--- request eval
|
||||
"POST /t
|
||||
" .
|
||||
@ -535,7 +575,7 @@ connect to 127.0.0.1:1995 while prereading client data
|
||||
|
||||
|
||||
|
||||
=== TEST 17: cache router by version
|
||||
=== TEST 18: cache router by version
|
||||
--- config
|
||||
location /t {
|
||||
content_by_lua_block {
|
||||
@ -608,7 +648,7 @@ connect to 127.0.0.4:1995 while prereading client data
|
||||
|
||||
|
||||
|
||||
=== TEST 18: use upstream_id
|
||||
=== TEST 19: use upstream_id
|
||||
--- config
|
||||
location /t {
|
||||
content_by_lua_block {
|
||||
@ -657,7 +697,7 @@ passed
|
||||
|
||||
|
||||
|
||||
=== TEST 19: hit
|
||||
=== TEST 20: hit
|
||||
--- request eval
|
||||
"POST /t
|
||||
" .
|
||||
@ -672,7 +712,7 @@ connect to 127.0.0.3:1995 while prereading client data
|
||||
|
||||
|
||||
|
||||
=== TEST 20: cache router by version, with upstream_id
|
||||
=== TEST 21: cache router by version, with upstream_id
|
||||
--- config
|
||||
location /t {
|
||||
content_by_lua_block {
|
||||
|
141
t/xrpc/pingpong2.t
vendored
Normal file
141
t/xrpc/pingpong2.t
vendored
Normal file
@ -0,0 +1,141 @@
|
||||
#
|
||||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
use t::APISIX;
|
||||
|
||||
my $nginx_binary = $ENV{'TEST_NGINX_BINARY'} || 'nginx';
|
||||
my $version = eval { `$nginx_binary -V 2>&1` };
|
||||
|
||||
if ($version !~ m/\/apisix-nginx-module/) {
|
||||
plan(skip_all => "apisix-nginx-module not installed");
|
||||
} else {
|
||||
plan('no_plan');
|
||||
}
|
||||
|
||||
add_block_preprocessor(sub {
|
||||
my ($block) = @_;
|
||||
|
||||
if (!$block->extra_yaml_config) {
|
||||
my $extra_yaml_config = <<_EOC_;
|
||||
xrpc:
|
||||
protocols:
|
||||
- name: pingpong
|
||||
_EOC_
|
||||
$block->set_value("extra_yaml_config", $extra_yaml_config);
|
||||
}
|
||||
|
||||
my $config = $block->config // <<_EOC_;
|
||||
location /t {
|
||||
content_by_lua_block {
|
||||
ngx.req.read_body()
|
||||
local sock = ngx.socket.tcp()
|
||||
sock:settimeout(1000)
|
||||
local ok, err = sock:connect("127.0.0.1", 1985)
|
||||
if not ok then
|
||||
ngx.log(ngx.ERR, "failed to connect: ", err)
|
||||
return ngx.exit(503)
|
||||
end
|
||||
|
||||
local bytes, err = sock:send(ngx.req.get_body_data())
|
||||
if not bytes then
|
||||
ngx.log(ngx.ERR, "send stream request error: ", err)
|
||||
return ngx.exit(503)
|
||||
end
|
||||
while true do
|
||||
local data, err = sock:receiveany(4096)
|
||||
if not data then
|
||||
sock:close()
|
||||
break
|
||||
end
|
||||
ngx.print(data)
|
||||
end
|
||||
}
|
||||
}
|
||||
_EOC_
|
||||
|
||||
$block->set_value("config", $config);
|
||||
|
||||
my $stream_upstream_code = $block->stream_upstream_code // <<_EOC_;
|
||||
local sock = ngx.req.socket(true)
|
||||
sock:settimeout(10)
|
||||
while true do
|
||||
local data = sock:receiveany(4096)
|
||||
if not data then
|
||||
return
|
||||
end
|
||||
sock:send(data)
|
||||
end
|
||||
_EOC_
|
||||
|
||||
$block->set_value("stream_upstream_code", $stream_upstream_code);
|
||||
|
||||
if ((!defined $block->error_log) && (!defined $block->no_error_log)) {
|
||||
$block->set_value("no_error_log", "[error]\nRPC is not finished");
|
||||
}
|
||||
|
||||
$block;
|
||||
});
|
||||
|
||||
run_tests;
|
||||
|
||||
__DATA__
|
||||
|
||||
=== TEST 1: init
|
||||
--- config
|
||||
location /t {
|
||||
content_by_lua_block {
|
||||
local t = require("lib.test_admin").test
|
||||
local etcd = require("apisix.core.etcd")
|
||||
local code, body = t('/apisix/admin/stream_routes/1',
|
||||
ngx.HTTP_PUT,
|
||||
{
|
||||
protocol = {
|
||||
name = "pingpong"
|
||||
},
|
||||
upstream = {
|
||||
nodes = {
|
||||
["127.0.0.1:1995"] = 1
|
||||
},
|
||||
type = "roundrobin"
|
||||
}
|
||||
}
|
||||
)
|
||||
if code >= 300 then
|
||||
ngx.status = code
|
||||
end
|
||||
ngx.say(body)
|
||||
}
|
||||
}
|
||||
--- request
|
||||
GET /t
|
||||
--- response_body
|
||||
passed
|
||||
|
||||
|
||||
|
||||
=== TEST 2: check the default timeout
|
||||
--- request eval
|
||||
"POST /t
|
||||
" .
|
||||
"pp\x02\x00\x00\x00\x00\x00\x00\x03ABC"
|
||||
--- response_body eval
|
||||
"pp\x02\x00\x00\x00\x00\x00\x00\x03ABC"
|
||||
--- error_log
|
||||
stream lua tcp socket connect timeout: 60000
|
||||
lua tcp socket send timeout: 60000
|
||||
stream lua tcp socket read timeout: 60000
|
||||
--- log_level: debug
|
||||
--- stream_conf_enable
|
Loading…
Reference in New Issue
Block a user