diff --git a/apisix/stream/xrpc/sdk.lua b/apisix/stream/xrpc/sdk.lua index cea2dd1b..9773cb36 100644 --- a/apisix/stream/xrpc/sdk.lua +++ b/apisix/stream/xrpc/sdk.lua @@ -43,12 +43,20 @@ local _M = {} function _M.connect_upstream(node, up_conf) local sk = xrpc_socket.upstream.socket() + local timeout = up_conf.timeout + if not timeout then + -- use the default timeout of Nginx proxy + sk:settimeouts(60 * 1000, 600 * 1000, 600 * 1000) + else + -- the timeout unit for balancer is second while the unit for cosocket is millisecond + sk:settimeouts(timeout.connect * 1000, timeout.send * 1000, timeout.read * 1000) + end + local ok, err = sk:connect(node.host, node.port) if not ok then core.log.error("failed to connect: ", err) return nil end - -- TODO: support timeout if up_conf.scheme == "tls" then -- TODO: support mTLS diff --git a/t/xrpc/apisix/stream/xrpc/protocols/pingpong/init.lua b/t/xrpc/apisix/stream/xrpc/protocols/pingpong/init.lua index 694e5e7a..1487044f 100644 --- a/t/xrpc/apisix/stream/xrpc/protocols/pingpong/init.lua +++ b/t/xrpc/apisix/stream/xrpc/protocols/pingpong/init.lua @@ -177,25 +177,14 @@ function _M.connect_upstream(session, ctx) return DECLINED end local node = nodes[math_random(#nodes)] - local sk = xrpc_socket.upstream.socket() - sk:settimeout(1000) -- the short timeout is just for test core.log.info("connect to ", node.host, ":", node.port) - local ok, err = sk:connect(node.host, node.port) - if not ok then - core.log.error("failed to connect: ", err) + local sk = sdk.connect_upstream(node, conf) + if not sk then return DECLINED end - if conf.scheme == "tls" then - local ok, err = sk:sslhandshake(nil, node.host) - if not ok then - core.log.error("failed to handshake: ", err) - return DECLINED - end - end - return OK, sk end diff --git a/t/xrpc/pingpong.t b/t/xrpc/pingpong.t index a264e2fc..97c192ed 100644 --- a/t/xrpc/pingpong.t +++ b/t/xrpc/pingpong.t @@ -212,7 +212,69 @@ failed to connect: connection refused -=== TEST 8: reset +=== TEST 8: use short timeout to check upstream's bad response +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/stream_routes/1', + ngx.HTTP_PUT, + { + protocol = { + name = "pingpong" + }, + upstream = { + nodes = { + ["127.0.0.1:1995"] = 1 + }, + timeout = { + connect = 0.01, + send = 0.009, + read = 0.008, + }, + type = "roundrobin" + } + } + ) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- request +GET /t +--- response_body +passed + + + +=== TEST 9: bad response +--- request eval +"POST /t +" . +"pp\x02\x00\x00\x00\x00\x00\x00\x03ABC" x 1 +--- stream_conf_enable +--- stream_upstream_code + local sock = ngx.req.socket(true) + sock:settimeout(10) + while true do + local data = sock:receiveany(4096) + if not data then + return + end + sock:send(data:sub(5)) + end +--- error_log +failed to read: timeout +stream lua tcp socket connect timeout: 10 +lua tcp socket send timeout: 9 +stream lua tcp socket read timeout: 8 +--- log_level: debug + + + +=== TEST 10: reset --- config location /t { content_by_lua_block { @@ -244,29 +306,7 @@ passed -=== TEST 9: bad response ---- request eval -"POST /t -" . -"pp\x02\x00\x00\x00\x00\x00\x00\x03ABC" x 1 ---- stream_conf_enable ---- stream_upstream_code - local sock = ngx.req.socket(true) - sock:settimeout(1100) - while true do - local data = sock:receiveany(4096) - if not data then - return - end - sock:send(data:sub(5)) - end ---- wait: 1.1 ---- error_log -failed to read: timeout - - - -=== TEST 10: client stream, N:N +=== TEST 11: client stream, N:N --- request eval "POST /t " . @@ -292,7 +332,7 @@ failed to read: timeout -=== TEST 11: client stream, bad response +=== TEST 12: client stream, bad response --- request eval "POST /t " . @@ -321,7 +361,7 @@ RPC is not finished -=== TEST 12: server stream, heartbeat +=== TEST 13: server stream, heartbeat --- request eval "POST /t " . @@ -347,7 +387,7 @@ RPC is not finished -=== TEST 13: server stream +=== TEST 14: server stream --- request eval "POST /t " . @@ -370,7 +410,7 @@ RPC is not finished -=== TEST 14: superior & subordinate +=== TEST 15: superior & subordinate --- config location /t { content_by_lua_block { @@ -495,7 +535,7 @@ passed -=== TEST 15: hit +=== TEST 16: hit --- request eval "POST /t " . @@ -515,7 +555,7 @@ connect to 127.0.0.2:1995 while prereading client data -=== TEST 16: hit (fallback to superior if not found) +=== TEST 17: hit (fallback to superior if not found) --- request eval "POST /t " . @@ -535,7 +575,7 @@ connect to 127.0.0.1:1995 while prereading client data -=== TEST 17: cache router by version +=== TEST 18: cache router by version --- config location /t { content_by_lua_block { @@ -608,7 +648,7 @@ connect to 127.0.0.4:1995 while prereading client data -=== TEST 18: use upstream_id +=== TEST 19: use upstream_id --- config location /t { content_by_lua_block { @@ -657,7 +697,7 @@ passed -=== TEST 19: hit +=== TEST 20: hit --- request eval "POST /t " . @@ -672,7 +712,7 @@ connect to 127.0.0.3:1995 while prereading client data -=== TEST 20: cache router by version, with upstream_id +=== TEST 21: cache router by version, with upstream_id --- config location /t { content_by_lua_block { diff --git a/t/xrpc/pingpong2.t b/t/xrpc/pingpong2.t new file mode 100644 index 00000000..3c7152c5 --- /dev/null +++ b/t/xrpc/pingpong2.t @@ -0,0 +1,141 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +use t::APISIX; + +my $nginx_binary = $ENV{'TEST_NGINX_BINARY'} || 'nginx'; +my $version = eval { `$nginx_binary -V 2>&1` }; + +if ($version !~ m/\/apisix-nginx-module/) { + plan(skip_all => "apisix-nginx-module not installed"); +} else { + plan('no_plan'); +} + +add_block_preprocessor(sub { + my ($block) = @_; + + if (!$block->extra_yaml_config) { + my $extra_yaml_config = <<_EOC_; +xrpc: + protocols: + - name: pingpong +_EOC_ + $block->set_value("extra_yaml_config", $extra_yaml_config); + } + + my $config = $block->config // <<_EOC_; + location /t { + content_by_lua_block { + ngx.req.read_body() + local sock = ngx.socket.tcp() + sock:settimeout(1000) + local ok, err = sock:connect("127.0.0.1", 1985) + if not ok then + ngx.log(ngx.ERR, "failed to connect: ", err) + return ngx.exit(503) + end + + local bytes, err = sock:send(ngx.req.get_body_data()) + if not bytes then + ngx.log(ngx.ERR, "send stream request error: ", err) + return ngx.exit(503) + end + while true do + local data, err = sock:receiveany(4096) + if not data then + sock:close() + break + end + ngx.print(data) + end + } + } +_EOC_ + + $block->set_value("config", $config); + + my $stream_upstream_code = $block->stream_upstream_code // <<_EOC_; + local sock = ngx.req.socket(true) + sock:settimeout(10) + while true do + local data = sock:receiveany(4096) + if not data then + return + end + sock:send(data) + end +_EOC_ + + $block->set_value("stream_upstream_code", $stream_upstream_code); + + if ((!defined $block->error_log) && (!defined $block->no_error_log)) { + $block->set_value("no_error_log", "[error]\nRPC is not finished"); + } + + $block; +}); + +run_tests; + +__DATA__ + +=== TEST 1: init +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local etcd = require("apisix.core.etcd") + local code, body = t('/apisix/admin/stream_routes/1', + ngx.HTTP_PUT, + { + protocol = { + name = "pingpong" + }, + upstream = { + nodes = { + ["127.0.0.1:1995"] = 1 + }, + type = "roundrobin" + } + } + ) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- request +GET /t +--- response_body +passed + + + +=== TEST 2: check the default timeout +--- request eval +"POST /t +" . +"pp\x02\x00\x00\x00\x00\x00\x00\x03ABC" +--- response_body eval +"pp\x02\x00\x00\x00\x00\x00\x00\x03ABC" +--- error_log +stream lua tcp socket connect timeout: 60000 +lua tcp socket send timeout: 60000 +stream lua tcp socket read timeout: 60000 +--- log_level: debug +--- stream_conf_enable