Merge pull request #1208 from limingxinleo/1.1-rpc

Fixed bug that exception cannot be resolved successfully in TcpServer.
This commit is contained in:
李铭昕 2020-01-10 09:51:58 +08:00 committed by GitHub
commit 830419d4bb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
30 changed files with 605 additions and 93 deletions

View File

@ -46,7 +46,7 @@ class CoreMiddleware extends \Hyperf\RpcServer\CoreMiddleware
$parameters = $this->parseParameters($controller, $action, $request->getParsedBody());
try {
$response = $controllerInstance->{$action}(...$parameters);
} catch (\Exception $exception) {
} catch (\Throwable $exception) {
$response = $this->responseBuilder->buildErrorResponse($request, ResponseBuilder::SERVER_ERROR, $exception);
$this->responseBuilder->persistToContext($response);

View File

@ -41,9 +41,10 @@ class DataFormatter implements DataFormatterInterface
{
[$id, $code, $message, $data] = $data;
if (isset($data) && $data instanceof \Exception) {
if (isset($data) && $data instanceof \Throwable) {
$data = [
'class' => get_class($data),
'code' => $data->getCode(),
'message' => $data->getMessage(),
];
}

View File

@ -12,41 +12,6 @@ declare(strict_types=1);
namespace Hyperf\JsonRpc\Exception\Handler;
use Hyperf\Contract\StdoutLoggerInterface;
use Hyperf\ExceptionHandler\ExceptionHandler;
use Hyperf\ExceptionHandler\Formatter\FormatterInterface;
use Psr\Http\Message\ResponseInterface;
use Throwable;
class HttpExceptionHandler extends ExceptionHandler
class HttpExceptionHandler extends TcpExceptionHandler
{
/**
* @var StdoutLoggerInterface
*/
protected $logger;
/**
* @var FormatterInterface
*/
protected $formatter;
public function __construct(StdoutLoggerInterface $logger, FormatterInterface $formatter)
{
$this->logger = $logger;
$this->formatter = $formatter;
}
public function handle(Throwable $throwable, ResponseInterface $response)
{
$this->logger->warning($this->formatter->format($throwable));
$this->stopPropagation();
return $response;
}
public function isValid(Throwable $throwable): bool
{
return true;
}
}

View File

@ -0,0 +1,52 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\JsonRpc\Exception\Handler;
use Hyperf\Contract\StdoutLoggerInterface;
use Hyperf\ExceptionHandler\ExceptionHandler;
use Hyperf\ExceptionHandler\Formatter\FormatterInterface;
use Psr\Http\Message\ResponseInterface;
use Throwable;
class TcpExceptionHandler extends ExceptionHandler
{
/**
* @var StdoutLoggerInterface
*/
protected $logger;
/**
* @var FormatterInterface
*/
protected $formatter;
public function __construct(StdoutLoggerInterface $logger, FormatterInterface $formatter)
{
$this->logger = $logger;
$this->formatter = $formatter;
}
public function handle(Throwable $throwable, ResponseInterface $response)
{
$this->logger->warning($this->formatter->format($throwable));
$this->stopPropagation();
return $response;
}
public function isValid(Throwable $throwable): bool
{
return true;
}
}

View File

@ -84,6 +84,11 @@ class JsonRpcHttpTransporter implements TransporterInterface
return '';
}
public function recv()
{
throw new \RuntimeException(__CLASS__ . ' does not support recv method.');
}
public function getClient(): Client
{
return $this->clientFactory->create([

View File

@ -12,16 +12,20 @@ declare(strict_types=1);
namespace Hyperf\JsonRpc;
use Hyperf\Contract\ConnectionInterface;
use Hyperf\JsonRpc\Pool\PoolFactory;
use Hyperf\JsonRpc\Pool\RpcConnection;
use Hyperf\LoadBalancer\LoadBalancerInterface;
use Hyperf\LoadBalancer\Node;
use Hyperf\Pool\Pool;
use Hyperf\Rpc\Contract\TransporterInterface;
use Hyperf\Utils\Context;
use RuntimeException;
class JsonRpcPoolTransporter implements TransporterInterface
{
use RecvTrait;
/**
* @var PoolFactory
*/
@ -76,11 +80,8 @@ class JsonRpcPoolTransporter implements TransporterInterface
public function send(string $data)
{
$client = retry(2, function () use ($data) {
$pool = $this->getPool();
$connection = $pool->get();
try {
/** @var RpcConnection $client */
$client = $connection->getConnection();
$client = $this->getConnection();
if ($client->send($data) === false) {
if ($client->errCode == 104) {
throw new RuntimeException('Connect to server failed.');
@ -88,20 +89,40 @@ class JsonRpcPoolTransporter implements TransporterInterface
}
return $client;
} catch (\Throwable $throwable) {
if ($connection instanceof RpcConnection) {
// Reconnect again next time.
$connection->resetLastUseTime();
if (isset($client) && $client instanceof ConnectionInterface) {
$client->close();
}
$connection->release();
throw $throwable;
}
});
try {
$data = $client->recv($this->recvTimeout);
} finally {
$client->release();
return $this->recvAndCheck($client, $this->recvTimeout);
}
public function recv()
{
$client = $this->getConnection();
return $this->recvAndCheck($client, $this->recvTimeout);
}
/**
* Get RpcConnection from Context.
*/
public function getConnection(): RpcConnection
{
$class = static::class . '.Connection';
if (Context::has($class)) {
return Context::get($class);
}
return $data;
$connection = $this->getPool()->get();
defer(function () use ($connection) {
$connection->release();
});
return Context::set($class, $connection->getConnection());
}
public function getPool(): Pool

View File

@ -15,11 +15,14 @@ namespace Hyperf\JsonRpc;
use Hyperf\LoadBalancer\LoadBalancerInterface;
use Hyperf\LoadBalancer\Node;
use Hyperf\Rpc\Contract\TransporterInterface;
use Hyperf\Utils\Context;
use RuntimeException;
use Swoole\Coroutine\Client as SwooleClient;
class JsonRpcTransporter implements TransporterInterface
{
use RecvTrait;
/**
* @var null|LoadBalancerInterface
*/
@ -67,15 +70,27 @@ class JsonRpcTransporter implements TransporterInterface
}
return $client;
});
return $client->recv($this->recvTimeout);
return $this->recvAndCheck($client, $this->recvTimeout);
}
public function recv()
{
$client = $this->getClient();
return $this->recvAndCheck($client, $this->recvTimeout);
}
public function getClient(): SwooleClient
{
$client = new SwooleClient(SWOOLE_SOCK_TCP);
$client->set($this->config['settings'] ?? []);
$class = static::class . '.Connection';
if (Context::has($class)) {
return Context::get($class);
}
return retry(2, function () use ($client) {
return Context::set($class, retry(2, function () {
$client = new SwooleClient(SWOOLE_SOCK_TCP);
$client->set($this->config['settings'] ?? []);
$node = $this->getNode();
$result = $client->connect($node->host, $node->port, $this->connectTimeout);
if ($result === false && ($client->errCode == 114 or $client->errCode == 115)) {
@ -84,7 +99,7 @@ class JsonRpcTransporter implements TransporterInterface
throw new RuntimeException('Connect to server failed.');
}
return $client;
});
}));
}
public function getLoadBalancer(): ?LoadBalancerInterface

View File

@ -44,7 +44,7 @@ class RegisterServiceListener implements ListenerInterface
public function process(object $event)
{
$annotation = $event->annotation;
if (! in_array($annotation->protocol, ['jsonrpc', 'jsonrpc-http'])) {
if (! in_array($annotation->protocol, ['jsonrpc', 'jsonrpc-http', 'jsonrpc-tcp-length-check'])) {
return;
}
$metadata = $event->toArray();

View File

@ -40,7 +40,7 @@ class NormalizeDataFormatter extends DataFormatter
public function formatErrorResponse($data)
{
if (isset($data[3]) && $data[3] instanceof \Exception) {
if (isset($data[3]) && $data[3] instanceof \Throwable) {
$data[3] = [
'class' => get_class($data[3]),
'attributes' => $this->normalizer->normalize($data[3]),

View File

@ -97,6 +97,7 @@ class RpcConnection extends BaseConnection implements ConnectionInterface
public function close(): bool
{
$this->lastUseTime = 0.0;
$this->connection->close();
return true;
}

View File

@ -0,0 +1,40 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\JsonRpc;
use Hyperf\JsonRpc\Pool\RpcConnection;
use Hyperf\Rpc\Exception\RecvException;
use Swoole\Coroutine\Client;
trait RecvTrait
{
/**
* @param Client|RpcConnection $client
* @param float $timeout
*/
public function recvAndCheck($client, $timeout)
{
$data = $client->recv((float) $timeout);
if ($data === '') {
// RpcConnection: When the next time the connection is taken out of the connection pool, it will reconnecting to the target service.
// Client: It will reconnecting to the target service in the next request.
$client->close();
throw new RecvException('Connection is closed.');
}
if ($data === false) {
throw new RecvException('Error receiving data, errno=' . $client->errCode);
}
return $data;
}
}

View File

@ -49,7 +49,7 @@ class ResponseBuilder
$this->packer = $packer;
}
public function buildErrorResponse(ServerRequestInterface $request, int $code, \Exception $error = null): ResponseInterface
public function buildErrorResponse(ServerRequestInterface $request, int $code, \Throwable $error = null): ResponseInterface
{
$body = new SwooleStream($this->formatErrorResponse($request, $code, $error));
return $this->response()->withAddedHeader('content-type', 'application/json')->withBody($body);
@ -74,7 +74,7 @@ class ResponseBuilder
return $this->packer->pack($response);
}
protected function formatErrorResponse(ServerRequestInterface $request, int $code, \Exception $error = null): string
protected function formatErrorResponse(ServerRequestInterface $request, int $code, \Throwable $error = null): string
{
[$code, $message] = $this->error($code, $error ? $error->getMessage() : null);
$response = $this->dataFormatter->formatErrorResponse([$request->getAttribute('request_id') ?? '', $code, $message, $error]);

View File

@ -20,6 +20,7 @@ use Hyperf\HttpMessage\Server\Request as Psr7Request;
use Hyperf\HttpMessage\Server\Response as Psr7Response;
use Hyperf\HttpMessage\Uri\Uri;
use Hyperf\HttpServer\Contract\CoreMiddlewareInterface;
use Hyperf\JsonRpc\Exception\Handler\TcpExceptionHandler;
use Hyperf\Rpc\Protocol;
use Hyperf\Rpc\ProtocolManager;
use Hyperf\RpcServer\RequestDispatcher;
@ -144,4 +145,11 @@ class TcpServer extends Server
}
return $request;
}
protected function getDefaultExceptionHandler(): array
{
return [
TcpExceptionHandler::class,
];
}
}

View File

@ -147,6 +147,43 @@ class AnyParamCoreMiddlewareTest extends TestCase
], $ret['error']['data']['attributes']);
}
public function testThrowable()
{
$container = $this->createContainer();
$router = $container->make(DispatcherFactory::class, [])->getRouter('jsonrpc');
$router->addRoute('/CalculatorService/error', [
CalculatorService::class, 'error',
]);
$protocol = new Protocol($container, $container->get(ProtocolManager::class), 'jsonrpc');
$builder = $container->make(ResponseBuilder::class, [
'dataFormatter' => $protocol->getDataFormatter(),
'packer' => $protocol->getPacker(),
]);
$middleware = new CoreMiddleware($container, $protocol, $builder, 'jsonrpc');
$handler = \Mockery::mock(RequestHandlerInterface::class);
$request = (new Request('POST', new Uri('/CalculatorService/error')))
->withParsedBody([]);
Context::set(ResponseInterface::class, new Response());
$request = $middleware->dispatch($request);
try {
$response = $middleware->process($request, $handler);
} catch (\Throwable $exception) {
$response = Context::get(ResponseInterface::class);
}
$this->assertEquals(200, $response->getStatusCode());
$ret = json_decode((string) $response->getBody(), true);
$this->assertArrayHasKey('error', $ret);
$this->assertArrayHasKey('data', $ret['error']);
$this->assertEquals(\Error::class, $ret['error']['data']['class']);
$this->assertArraySubset([
'message' => 'Not only a exception.',
'code' => 0,
], $ret['error']['data']['attributes']);
}
public function createContainer()
{
$eventDispatcher = \Mockery::mock(EventDispatcherInterface::class);

View File

@ -0,0 +1,90 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace HyperfTest\JsonRpc;
use Hyperf\JsonRpc\DataFormatter;
use Hyperf\JsonRpc\NormalizeDataFormatter;
use Hyperf\RpcClient\Exception\RequestException;
use Hyperf\Utils\Serializer\SerializerFactory;
use Hyperf\Utils\Serializer\SymfonyNormalizer;
use Mockery;
use PHPUnit\Framework\TestCase;
/**
* @internal
* @coversNothing
*/
class DataFormatterTest extends TestCase
{
protected function tearDown()
{
Mockery::close();
}
public function testFormatErrorResponse()
{
$formatter = new DataFormatter();
$data = $formatter->formatErrorResponse([$id = uniqid(), 500, 'Error', new \RuntimeException('test case', 1000)]);
$this->assertEquals([
'jsonrpc' => '2.0',
'id' => $id,
'error' => [
'code' => 500,
'message' => 'Error',
'data' => [
'class' => 'RuntimeException',
'code' => 1000,
'message' => 'test case',
],
],
], $data);
$exception = new RequestException('', 0, $data['error']['data']);
$this->assertSame(1000, $exception->getThrowableCode());
$this->assertSame('test case', $exception->getThrowableMessage());
}
public function testNormalizeFormatErrorResponse()
{
$normalizer = new SymfonyNormalizer((new SerializerFactory())());
$formatter = new NormalizeDataFormatter($normalizer);
$data = $formatter->formatErrorResponse([$id = uniqid(), 500, 'Error', new \RuntimeException('test case', 1000)]);
$this->assertArrayHasKey('line', $data['error']['data']['attributes']);
$this->assertArrayHasKey('file', $data['error']['data']['attributes']);
$exception = new RequestException('', 0, $data['error']['data']);
$this->assertSame(1000, $exception->getThrowableCode());
$this->assertSame('test case', $exception->getThrowableMessage());
unset($data['error']['data']['attributes']['line'], $data['error']['data']['attributes']['file']);
$this->assertEquals([
'jsonrpc' => '2.0',
'id' => $id,
'error' => [
'code' => 500,
'message' => 'Error',
'data' => [
'class' => 'RuntimeException',
'attributes' => [
'code' => 1000,
'message' => 'test case',
],
],
],
], $data);
}
}

View File

@ -26,6 +26,9 @@ use Hyperf\JsonRpc\JsonRpcTransporter;
use Hyperf\JsonRpc\NormalizeDataFormatter;
use Hyperf\JsonRpc\PathGenerator;
use Hyperf\Logger\Logger;
use Hyperf\Rpc\IdGenerator\IdGeneratorInterface;
use Hyperf\Rpc\IdGenerator\UniqidIdGenerator;
use Hyperf\RpcClient\Exception\RequestException;
use Hyperf\RpcClient\ProxyFactory;
use Hyperf\Utils\ApplicationContext;
use Hyperf\Utils\Packer\JsonPacker;
@ -59,9 +62,13 @@ class RpcServiceClientTest extends TestCase
$transporter->shouldReceive('setLoadBalancer')
->andReturnSelf();
$transporter->shouldReceive('send')
->andReturn(json_encode([
'result' => 3,
]));
->andReturnUsing(function ($data) {
$id = json_decode($data, true)['id'];
return json_encode([
'id' => $id,
'result' => 3,
]);
});
$service = new CalculatorProxyServiceClient($container, CalculatorServiceInterface::class, 'jsonrpc');
$ret = $service->add(1, 2);
$this->assertEquals(3, $ret);
@ -76,9 +83,13 @@ class RpcServiceClientTest extends TestCase
$transporter->shouldReceive('setLoadBalancer')
->andReturnSelf();
$transporter->shouldReceive('send')
->andReturn(json_encode([
'result' => ['params' => [1, 2], 'sum' => 3],
]));
->andReturnUsing(function ($data) {
$id = json_decode($data, true)['id'];
return json_encode([
'id' => $id,
'result' => ['params' => [1, 2], 'sum' => 3],
]);
});
$service = new CalculatorProxyServiceClient($container, CalculatorServiceInterface::class, 'jsonrpc');
$ret = $service->array(1, 2);
$this->assertEquals(['params' => [1, 2], 'sum' => 3], $ret);
@ -92,9 +103,13 @@ class RpcServiceClientTest extends TestCase
$transporter->shouldReceive('setLoadBalancer')
->andReturnSelf();
$transporter->shouldReceive('send')
->andReturn(json_encode([
'result' => 3,
]));
->andReturnUsing(function ($data) {
$id = json_decode($data, true)['id'];
return json_encode([
'id' => $id,
'result' => 3,
]);
});
$factory = new ProxyFactory();
$proxyClass = $factory->createProxy(CalculatorServiceInterface::class);
/** @var CalculatorServiceInterface $service */
@ -103,6 +118,37 @@ class RpcServiceClientTest extends TestCase
$this->assertEquals(3, $ret);
}
public function testProxyFactoryWithErrorId()
{
$container = $this->createContainer();
/** @var MockInterface $transporter */
$transporter = $container->get(JsonRpcTransporter::class);
$transporter->shouldReceive('setLoadBalancer')
->andReturnSelf();
$transporter->shouldReceive('send')
->andReturn(json_encode([
'id' => '1234',
'result' => 3,
]));
$once = true;
$transporter->shouldReceive('recv')->andReturnUsing(function () use (&$once) {
$this->assertTrue($once);
$once = false;
return json_encode([
'id' => '1234',
'result' => 3,
]);
});
$factory = new ProxyFactory();
$proxyClass = $factory->createProxy(CalculatorServiceInterface::class);
/** @var CalculatorServiceInterface $service */
$service = new $proxyClass($container, CalculatorServiceInterface::class, 'jsonrpc');
$this->expectException(RequestException::class);
$this->expectExceptionMessageRegExp('/^Invalid response\. Request id\[.*\] is not equal to response id\[1234\]\.$/');
$service->add(1, 2);
}
public function testProxyFactoryObjectParameter()
{
$container = $this->createContainer();
@ -111,9 +157,13 @@ class RpcServiceClientTest extends TestCase
$transporter->shouldReceive('setLoadBalancer')
->andReturnSelf();
$transporter->shouldReceive('send')
->andReturn(json_encode([
'result' => ['value' => 3],
]));
->andReturnUsing(function ($data) {
$id = json_decode($data, true)['id'];
return json_encode([
'id' => $id,
'result' => ['value' => 3],
]);
});
$factory = new ProxyFactory();
$proxyClass = $factory->createProxy(CalculatorServiceInterface::class);
/** @var CalculatorServiceInterface $service */
@ -159,6 +209,7 @@ class RpcServiceClientTest extends TestCase
JsonRpcTransporter::class => function () use ($transporter) {
return $transporter;
},
IdGeneratorInterface::class => UniqidIdGenerator::class,
], new ScanConfig()));
ApplicationContext::setContainer($container);
return $container;

View File

@ -35,4 +35,9 @@ class CalculatorProxyServiceClient extends AbstractProxyService implements Calcu
{
return $this->client->__call(__FUNCTION__, func_get_args());
}
public function error()
{
return $this->client->__call(__FUNCTION__, func_get_args());
}
}

View File

@ -14,17 +14,11 @@ namespace HyperfTest\JsonRpc\Stub;
class CalculatorService implements CalculatorServiceInterface
{
/**
* {@inheritdoc}
*/
public function add(int $a, int $b)
{
return $a + $b;
}
/**
* {@inheritdoc}
*/
public function sum(IntegerValue $a, IntegerValue $b): IntegerValue
{
return IntegerValue::newInstance($a->getValue() + $b->getValue());
@ -42,4 +36,9 @@ class CalculatorService implements CalculatorServiceInterface
{
return ['params' => [$a, $b], 'sum' => $a + $b];
}
public function error()
{
throw new \Error('Not only a exception.');
}
}

View File

@ -21,4 +21,6 @@ interface CalculatorServiceInterface
public function divide($value, $divider);
public function array(int $a, int $b): array;
public function error();
}

View File

@ -0,0 +1,65 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace HyperfTest\JsonRpc;
use Hyperf\Config\Config;
use Hyperf\Contract\ContainerInterface;
use Hyperf\Contract\StdoutLoggerInterface;
use Hyperf\ExceptionHandler\ExceptionHandlerDispatcher;
use Hyperf\JsonRpc\Exception\Handler\TcpExceptionHandler;
use Hyperf\JsonRpc\TcpServer;
use Hyperf\Rpc\ProtocolManager;
use Hyperf\RpcServer\RequestDispatcher;
use Hyperf\Utils\ApplicationContext;
use Mockery;
use PHPUnit\Framework\TestCase;
/**
* @internal
* @coversNothing
*/
class TcpServerTest extends TestCase
{
public function testGetDefaultExceptionHandler()
{
$container = $this->getContainer();
$server = new TcpServer(
$container,
$container->get(RequestDispatcher::class),
$container->get(ExceptionHandlerDispatcher::class),
$container->get(ProtocolManager::class),
$container->get(StdoutLoggerInterface::class)
);
$ref = new \ReflectionClass($server);
$method = $ref->getMethod('getDefaultExceptionHandler');
$method->setAccessible(true);
$res = $method->invoke($server);
$this->assertSame([TcpExceptionHandler::class], $res);
}
protected function getContainer()
{
$container = Mockery::mock(ContainerInterface::class);
ApplicationContext::setContainer($container);
$container->shouldReceive('get')->with(RequestDispatcher::class)->andReturn(new RequestDispatcher($container));
$container->shouldReceive('get')->with(ExceptionHandlerDispatcher::class)->andReturn(new ExceptionHandlerDispatcher($container));
$config = new Config([]);
$container->shouldReceive('get')->with(ProtocolManager::class)->andReturn(new ProtocolManager($config));
$container->shouldReceive('get')->with(StdoutLoggerInterface::class)->andReturn(Mockery::mock(StdoutLoggerInterface::class));
return $container;
}
}

View File

@ -22,8 +22,10 @@ use Hyperf\LoadBalancer\LoadBalancerManager;
use Hyperf\LoadBalancer\Node;
use Hyperf\Rpc\Contract\DataFormatterInterface;
use Hyperf\Rpc\Contract\PathGeneratorInterface;
use Hyperf\Rpc\IdGenerator;
use Hyperf\Rpc\Protocol;
use Hyperf\Rpc\ProtocolManager;
use Hyperf\RpcClient\Exception\RequestException;
use InvalidArgumentException;
use Psr\Container\ContainerInterface;
use RuntimeException;
@ -93,20 +95,20 @@ abstract class AbstractServiceClient
$this->client = make(Client::class)
->setPacker($protocol->getPacker())
->setTransporter($transporter);
if ($container->has(IdGeneratorInterface::class)) {
$this->idGenerator = $container->get(IdGeneratorInterface::class);
}
$this->idGenerator = $this->getIdGenerator();
$this->pathGenerator = $protocol->getPathGenerator();
$this->dataFormatter = $protocol->getDataFormatter();
}
protected function __request(string $method, array $params, ?string $id = null)
{
if ($this->idGenerator instanceof IdGeneratorInterface && ! $id) {
if (! $id && $this->idGenerator instanceof IdGeneratorInterface) {
$id = $this->idGenerator->generate();
}
$response = $this->client->send($this->__generateData($method, $params, $id));
if (is_array($response)) {
$response = $this->checkRequestIdAndTryAgain($response, $id);
if (array_key_exists('result', $response)) {
return $response['result'];
}
@ -114,7 +116,7 @@ abstract class AbstractServiceClient
return $response['error'];
}
}
throw new RuntimeException('Invalid response.');
throw new RequestException('Invalid response.');
}
protected function __generateRpcPath(string $methodName): string
@ -130,6 +132,19 @@ abstract class AbstractServiceClient
return $this->dataFormatter->formatRequest([$this->__generateRpcPath($methodName), $params, $id]);
}
protected function getIdGenerator(): IdGeneratorInterface
{
if ($this->container->has(IdGenerator\IdGeneratorInterface::class)) {
return $this->container->get(IdGenerator\IdGeneratorInterface::class);
}
if ($this->container->has(IdGeneratorInterface::class)) {
return $this->container->get(IdGeneratorInterface::class);
}
return $this->container->get(IdGenerator\UniqidIdGenerator::class);
}
protected function createLoadBalancer(array $nodes, callable $refresh = null): LoadBalancerInterface
{
$loadBalancer = $this->loadBalancerManager->getInstance($this->serviceName, $this->loadBalancer)->setNodes($nodes);
@ -250,4 +265,24 @@ abstract class AbstractServiceClient
},
]);
}
protected function checkRequestIdAndTryAgain(array $response, $id, int $again = 1): array
{
if (isset($response['id']) && $response['id'] === $id) {
return $response;
}
if ($again <= 0) {
throw new RequestException(sprintf(
'Invalid response. Request id[%s] is not equal to response id[%s].',
$id,
$response['id'] ?? null
));
}
$response = $this->client->recv();
--$again;
return $this->checkRequestIdAndTryAgain($response, $id, $again);
}
}

View File

@ -42,6 +42,13 @@ class Client
return $packer->unpack((string) $response);
}
public function recv()
{
$packer = $this->getPacker();
$response = $this->getTransporter()->recv();
return $packer->unpack((string) $response);
}
public function getPacker(): PackerInterface
{
return $this->packer;

View File

@ -14,4 +14,51 @@ namespace Hyperf\RpcClient\Exception;
class RequestException extends \RuntimeException
{
/**
* @var array
*/
protected $throwable;
/**
* @param $throwable
* [
* 'class' => 'RuntimeException', // The exception class name
* 'code' => 0, // The exception code
* 'message' => '', // The exception message
* 'attributes' => [
* 'message' => '', // The exception message
* 'code' => 0, // The exception code
* 'file' => '/opt/www/hyperf/app/JsonRpc/CalculatorService.php', // The file path which the exception occurred
* 'line' => 99, // The line of file which the exception occurred
* ],
* ]
* @param string $message
* @param int $code
*/
public function __construct($message = '', $code = 0, array $throwable = [])
{
parent::__construct($message, $code);
$this->throwable = $throwable;
}
public function getThrowable(): array
{
return $this->throwable;
}
public function getThrowableCode(): int
{
return intval($this->throwable['code'] ?? $this->throwable['attributes']['code'] ?? 0);
}
public function getThrowableMessage(): string
{
return strval($this->throwable['message'] ?? $this->throwable['attributes']['message'] ?? '');
}
public function getThrowableClassName(): string
{
return strval($this->throwable['class']);
}
}

View File

@ -56,6 +56,8 @@ class ServiceClient extends AbstractServiceClient
throw new RequestException('Invalid response.');
}
$response = $this->checkRequestIdAndTryAgain($response, $id);
if (isset($response['result'])) {
$type = $this->methodDefinitionCollector->getReturnType($this->serviceInterface, $method);
return $this->normalizer->denormalize($response['result'], $type->getName());
@ -67,13 +69,13 @@ class ServiceClient extends AbstractServiceClient
$class = Arr::get($error, 'data.class');
$attributes = Arr::get($error, 'data.attributes', []);
if (isset($class) && class_exists($class) && $e = $this->normalizer->denormalize($attributes, $class)) {
if ($e instanceof \Exception) {
if ($e instanceof \Throwable) {
throw $e;
}
}
// Throw RequestException when denormalize exception failed.
throw new RequestException($error['message'] ?? '', $error['code']);
throw new RequestException($error['message'] ?? '', $code, $error['data'] ?? []);
}
throw new RequestException('Invalid response.');

View File

@ -96,9 +96,7 @@ abstract class Server implements OnReceiveInterface, MiddlewareInitializerInterf
$config = $this->container->get(ConfigInterface::class);
$this->middlewares = $config->get('middlewares.' . $serverName, []);
$this->exceptionHandlers = $config->get('exceptions.handler.' . $serverName, [
HttpExceptionHandler::class,
]);
$this->exceptionHandlers = $config->get('exceptions.handler.' . $serverName, $this->getDefaultExceptionHandler());
}
public function onReceive(SwooleServer $server, int $fd, int $fromId, string $data): void
@ -137,6 +135,13 @@ abstract class Server implements OnReceiveInterface, MiddlewareInitializerInterf
$this->logger->debug(sprintf('Connect to %s:%d', $port->host, $port->port));
}
protected function getDefaultExceptionHandler(): array
{
return [
HttpExceptionHandler::class,
];
}
protected function send(SwooleServer $server, int $fd, ResponseInterface $response): void
{
$server->send($fd, (string) $response->getBody());

View File

@ -18,6 +18,8 @@ interface TransporterInterface
{
public function send(string $data);
public function recv();
public function getLoadBalancer(): ?LoadBalancerInterface;
public function setLoadBalancer(LoadBalancerInterface $loadBalancer): TransporterInterface;

View File

@ -0,0 +1,17 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Rpc\Exception;
class RecvException extends \RuntimeException
{
}

View File

@ -0,0 +1,19 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Rpc\IdGenerator;
use Hyperf\Contract;
interface IdGeneratorInterface extends Contract\IdGeneratorInterface
{
}

View File

@ -0,0 +1,21 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Rpc\IdGenerator;
class UniqidIdGenerator implements IdGeneratorInterface
{
public function generate()
{
return uniqid();
}
}

View File

@ -32,11 +32,11 @@ class ExceptionNormalizer implements NormalizerInterface, DenormalizerInterface,
{
if (is_string($data)) {
$ex = unserialize($data);
if ($ex instanceof \Exception) {
if ($ex instanceof \Throwable) {
return $ex;
}
// Retry handle it if the exception not instanceof \Exception.
// Retry handle it if the exception not instanceof \Throwable.
$data = $ex;
}
if (is_array($data) && isset($data['message'], $data['code'])) {
@ -73,7 +73,7 @@ class ExceptionNormalizer implements NormalizerInterface, DenormalizerInterface,
*/
public function supportsDenormalization($data, $type, $format = null)
{
return class_exists($type) && is_a($type, \Exception::class, true);
return class_exists($type) && is_a($type, \Throwable::class, true);
}
/**
@ -84,7 +84,7 @@ class ExceptionNormalizer implements NormalizerInterface, DenormalizerInterface,
if ($object instanceof \Serializable) {
return serialize($object);
}
/* @var \Exception $object */
/* @var \Throwable $object */
return [
'message' => $object->getMessage(),
'code' => $object->getCode(),
@ -98,7 +98,7 @@ class ExceptionNormalizer implements NormalizerInterface, DenormalizerInterface,
*/
public function supportsNormalization($data, $format = null)
{
return $data instanceof \Exception;
return $data instanceof \Throwable;
}
/**