Support GRPC services which can easily to registry and discovery. (#5511)

Co-authored-by: lixinhan <lixinhan@yuanxinjituan.com>
Co-authored-by: 李铭昕 <715557344@qq.com>
This commit is contained in:
lixinhan 2023-03-18 16:32:36 +08:00 committed by GitHub
parent 85b0fa03b6
commit a3de56b4ae
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 437 additions and 15 deletions

View File

@ -5,6 +5,7 @@
- [#5533](https://github.com/hyperf/hyperf/pull/5533) [#5535](https://github.com/hyperf/hyperf/pull/5535) Added `client` & `socket` config for kafka.
- [#5536](https://github.com/hyperf/hyperf/pull/5536) Added `hyperf/http2-client`.
- [#5538](https://github.com/hyperf/hyperf/pull/5538) Support stream call for http2 client.
- [#5511](https://github.com/hyperf/hyperf/pull/5511) Support GRPC services which can easily to registry and discovery.
## Optimized

View File

@ -11,10 +11,16 @@ declare(strict_types=1);
*/
namespace Hyperf\GrpcClient;
use Hyperf\GrpcClient\Listener\RegisterProtocolListener;
class ConfigProvider
{
public function __invoke(): array
{
return [];
return [
'listeners' => [
RegisterProtocolListener::class,
],
];
}
}

View File

@ -0,0 +1,66 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\GrpcClient;
use Hyperf\Rpc\Context;
use Hyperf\Rpc\Contract\DataFormatterInterface;
use Hyperf\Rpc\ErrorResponse;
use Hyperf\Rpc\Request;
use Hyperf\Rpc\Response;
use Throwable;
class DataFormatter implements DataFormatterInterface
{
public function __construct(protected Context $context)
{
}
public function formatRequest(Request $request): array
{
return [
'method' => $request->getPath(),
'params' => $request->getParams(),
'id' => $request->getId(),
'context' => $this->context->getData(),
];
}
public function formatResponse(Response $response): array
{
return [
'id' => $response->getId(),
'result' => $response->getResult(),
'context' => $this->context->getData(),
];
}
public function formatErrorResponse(ErrorResponse $response): array
{
$exception = $response->getException();
if ($exception instanceof Throwable) {
$exception = [
'class' => get_class($exception),
'code' => $exception->getCode(),
'message' => $exception->getMessage(),
];
}
return [
'id' => $response->getId(),
'error' => [
'code' => $response->getCode(),
'message' => $response->getMessage(),
'data' => $exception,
],
'context' => $this->context->getData(),
];
}
}

View File

@ -0,0 +1,28 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\GrpcClient;
use Hyperf\Contract\NormalizerInterface;
use Hyperf\Grpc\Parser;
class GrpcNormalizer implements NormalizerInterface
{
public function normalize($object)
{
return Parser::serializeMessage($object);
}
public function denormalize($data, string $class)
{
return Parser::deserializeMessage([$class, 'decode'], $data);
}
}

View File

@ -0,0 +1,27 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\GrpcClient;
use Hyperf\Contract\PackerInterface;
class GrpcPacker implements PackerInterface
{
public function pack($data): string
{
return serialize($data);
}
public function unpack(string $data)
{
return unserialize($data);
}
}

View File

@ -0,0 +1,82 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\GrpcClient;
use Hyperf\Grpc\StatusCode;
use Hyperf\LoadBalancer\LoadBalancerInterface;
use Hyperf\LoadBalancer\Node;
use Hyperf\Rpc\Contract\TransporterInterface;
use RuntimeException;
class GrpcTransporter implements TransporterInterface
{
private ?LoadBalancerInterface $loadBalancer = null;
/**
* If $loadBalancer is null, will select a node in $nodes to request,
* otherwise, use the nodes in $loadBalancer.
*
* @var Node[]
*/
private array $nodes = [];
public function send(string $data)
{
$node = $this->getNode();
$unserializeData = unserialize($data);
$method = $unserializeData['method'] ?? '';
$id = $unserializeData['id'] ?? '';
$params = $unserializeData['params'][0] ?? [];
// TODO: Don't make new one when send messages.
$client = new BaseClient($node->host . ':' . $node->port, []);
$request = new Request($method, $params, []);
$streamId = $client->send($request);
$response = $client->recv($streamId);
$client->close();
if ($response->headers['grpc-status'] == StatusCode::OK) {
$responseData = ['id' => $id, 'result' => $response->data];
} else {
$responseData = [
'id' => $id,
'error' => [
'code' => intval($response->headers['grpc-status']),
'message' => $response->headers['grpc-message'],
],
];
}
return serialize($responseData);
}
public function recv()
{
throw new RuntimeException(__CLASS__ . ' does not support recv method.');
}
public function getLoadBalancer(): ?LoadBalancerInterface
{
return $this->loadBalancer;
}
public function setLoadBalancer(LoadBalancerInterface $loadBalancer): TransporterInterface
{
$this->loadBalancer = $loadBalancer;
return $this;
}
private function getNode(): Node
{
if ($this->loadBalancer instanceof LoadBalancerInterface) {
return $this->loadBalancer->select();
}
return $this->nodes[array_rand($this->nodes)];
}
}

View File

@ -0,0 +1,50 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\GrpcClient\Listener;
use Hyperf\Event\Contract\ListenerInterface;
use Hyperf\Framework\Event\BootApplication;
use Hyperf\Grpc\PathGenerator;
use Hyperf\GrpcClient\DataFormatter;
use Hyperf\GrpcClient\GrpcNormalizer;
use Hyperf\GrpcClient\GrpcPacker;
use Hyperf\GrpcClient\GrpcTransporter;
use Hyperf\Rpc\ProtocolManager;
class RegisterProtocolListener implements ListenerInterface
{
public function __construct(private ProtocolManager $protocolManager)
{
}
public function listen(): array
{
return [
BootApplication::class,
];
}
/**
* All official rpc protocols should register in here,
* and the others non-official protocols should register in their own component via listener.
*/
public function process(object $event): void
{
$this->protocolManager->register('grpc', [
'packer' => GrpcPacker::class,
'transporter' => GrpcTransporter::class,
'path-generator' => PathGenerator::class,
'data-formatter' => DataFormatter::class,
'normalizer' => GrpcNormalizer::class,
]);
}
}

View File

@ -11,11 +11,24 @@ declare(strict_types=1);
*/
namespace Hyperf\GrpcServer;
use Hyperf\GrpcServer\Listener\RegisterProtocolListener;
use Hyperf\GrpcServer\Listener\RegisterServiceListener;
use Hyperf\ServiceGovernance\ServiceManager;
class ConfigProvider
{
public function __invoke(): array
{
return [
'listeners' => [
RegisterProtocolListener::class,
value(function () {
if (class_exists(ServiceManager::class)) {
return RegisterServiceListener::class;
}
return null;
}),
],
];
}
}

View File

@ -15,7 +15,6 @@ use Closure;
use FastRoute\Dispatcher;
use Google\Protobuf\Internal\Message;
use Google\Protobuf\Internal\Message as ProtobufMessage;
use Google\Rpc\Status;
use Hyperf\Context\Context;
use Hyperf\Di\MethodDefinitionCollector;
use Hyperf\Di\ReflectionManager;
@ -24,6 +23,9 @@ use Hyperf\Grpc\StatusCode;
use Hyperf\HttpMessage\Stream\SwooleStream;
use Hyperf\HttpServer\CoreMiddleware as HttpCoreMiddleware;
use Hyperf\HttpServer\Router\Dispatched;
use Hyperf\Rpc\Protocol;
use Hyperf\Rpc\ProtocolManager;
use Hyperf\RpcServer\Router\DispatcherFactory;
use Hyperf\Server\Exception\ServerException;
use Psr\Http\Message\RequestInterface;
use Psr\Http\Message\ResponseInterface;
@ -33,6 +35,15 @@ use RuntimeException;
class CoreMiddleware extends HttpCoreMiddleware
{
protected Protocol $protocol;
public function __construct($container, string $serverName)
{
$this->protocol = new Protocol($container, $container->get(ProtocolManager::class), 'grpc');
parent::__construct($container, $serverName);
}
/**
* Process an incoming server request and return a response, optionally delegating
* response creation to a handler.
@ -78,6 +89,14 @@ class CoreMiddleware extends HttpCoreMiddleware
}
}
protected function createDispatcher(string $serverName): Dispatcher
{
$factory = make(DispatcherFactory::class, [
'pathGenerator' => $this->protocol->getPathGenerator(),
]);
return $factory->getDispatcher($serverName);
}
/**
* Transfer the non-standard response content to a standard response object.
*

View File

@ -0,0 +1,42 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\GrpcServer\Listener;
use Hyperf\Event\Contract\ListenerInterface;
use Hyperf\Framework\Event\BootApplication;
use Hyperf\Grpc\PathGenerator;
use Hyperf\Rpc\ProtocolManager;
class RegisterProtocolListener implements ListenerInterface
{
public function __construct(private ProtocolManager $protocolManager)
{
}
public function listen(): array
{
return [
BootApplication::class,
];
}
/**
* All official rpc protocols should register in here,
* and the others non-official protocols should register in their own component via listener.
*/
public function process(object $event): void
{
$this->protocolManager->registerOrAppend('grpc', [
'path-generator' => PathGenerator::class,
]);
}
}

View File

@ -0,0 +1,49 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\GrpcServer\Listener;
use Hyperf\Event\Contract\ListenerInterface;
use Hyperf\RpcServer\Event\AfterPathRegister;
use Hyperf\ServiceGovernance\ServiceManager;
class RegisterServiceListener implements ListenerInterface
{
public function __construct(private ServiceManager $serviceManager)
{
}
public function listen(): array
{
return [
AfterPathRegister::class,
];
}
/**
* All official rpc protocols should register in here,
* and the others non-official protocols should register in their own component via listener.
*
* @param AfterPathRegister $event
*/
public function process(object $event): void
{
$annotation = $event->annotation;
if (! in_array($annotation->protocol, ['grpc'])) {
return;
}
$metadata = $event->toArray();
$annotationArray = $metadata['annotation'];
unset($metadata['path'], $metadata['annotation'], $annotationArray['name']);
$this->serviceManager->register($annotation->name, $event->path, array_merge($metadata, $annotationArray));
}
}

View File

@ -12,21 +12,28 @@ declare(strict_types=1);
namespace HyperfTest\GrpcServer;
use Closure;
use Hyperf\Config\Config;
use Hyperf\Contract\ContainerInterface;
use Hyperf\Contract\NormalizerInterface;
use Hyperf\Di\ClosureDefinitionCollector;
use Hyperf\Di\ClosureDefinitionCollectorInterface;
use Hyperf\Di\MethodDefinitionCollector;
use Hyperf\Di\MethodDefinitionCollectorInterface;
use Hyperf\Grpc\PathGenerator;
use Hyperf\GrpcServer\CoreMiddleware;
use Hyperf\HttpMessage\Server\Request;
use Hyperf\HttpMessage\Uri\Uri;
use Hyperf\HttpServer\Router\Dispatched;
use Hyperf\HttpServer\Router\DispatcherFactory;
use Hyperf\HttpServer\Router\Handler;
use Hyperf\Rpc\ProtocolManager;
use Hyperf\RpcServer\Router\DispatcherFactory as RPCDispatcherFactory;
use Hyperf\RpcServer\Router\RouteCollector;
use Hyperf\Utils\ApplicationContext;
use Hyperf\Utils\Serializer\SimpleNormalizer;
use Mockery;
use PHPUnit\Framework\TestCase;
use Psr\Container\ContainerInterface;
use Psr\EventDispatcher\EventDispatcherInterface;
/**
* @internal
@ -34,16 +41,17 @@ use Psr\Container\ContainerInterface;
*/
class CoreMiddlewareTest extends TestCase
{
public function testDispatch()
public function testGRPCCoreMiddlewareDispatch()
{
$container = $this->getContainer();
$router = $container->get(DispatcherFactory::class)->getRouter('grpc');
$router->addRoute('GET', '/users', function () {});
/** @var RouteCollector $router */
$router = $container->get(RPCDispatcherFactory::class . '.unit')->getRouter('grpc');
$router->addRoute('/users', static function () {});
$middleware = new CoreMiddleware($container, 'grpc');
$request = new Request('GET', new Uri('/users'));
$request = new Request('POST', new Uri('/users'));
$request = $middleware->dispatch($request);
$dispatched = $request->getAttribute(Dispatched::class);
$this->assertInstanceOf(Request::class, $request);
@ -57,15 +65,21 @@ class CoreMiddlewareTest extends TestCase
protected function getContainer()
{
$container = Mockery::mock(ContainerInterface::class);
ApplicationContext::setContainer($container);
$container->shouldReceive('get')->with(DispatcherFactory::class)->andReturn(new DispatcherFactory());
$container->shouldReceive('get')->with(MethodDefinitionCollectorInterface::class)
->andReturn(new MethodDefinitionCollector());
$container->shouldReceive('has')->with(ClosureDefinitionCollectorInterface::class)
->andReturn(false);
$container->shouldReceive('get')->with(ClosureDefinitionCollectorInterface::class)
->andReturn(new ClosureDefinitionCollector());
$container->shouldReceive('get')->with(NormalizerInterface::class)
->andReturn(new SimpleNormalizer());
$container->shouldReceive('get')->with(MethodDefinitionCollectorInterface::class)->andReturn(new MethodDefinitionCollector());
$container->shouldReceive('has')->with(ClosureDefinitionCollectorInterface::class)->andReturn(false);
$container->shouldReceive('get')->with(ClosureDefinitionCollectorInterface::class)->andReturn(new ClosureDefinitionCollector());
$container->shouldReceive('get')->with(NormalizerInterface::class)->andReturn(new SimpleNormalizer());
$container->shouldReceive('get')->with(ProtocolManager::class)->andReturn($manager = new ProtocolManager(new Config([])));
$manager->registerOrAppend('grpc', [
'path-generator' => PathGenerator::class,
]);
$container->shouldReceive('has')->with(PathGenerator::class)->andReturnTrue();
$container->shouldReceive('get')->with(PathGenerator::class)->andReturn(new PathGenerator());
$container->shouldReceive('get')->with(EventDispatcherInterface::class)->andReturn(Mockery::mock(EventDispatcherInterface::class));
$container->shouldReceive('make')->with(RPCDispatcherFactory::class)->withAnyArgs()->andReturn($dispatcher = new RPCDispatcherFactory(Mockery::mock(EventDispatcherInterface::class), new PathGenerator()));
$container->shouldReceive('get')->with(RPCDispatcherFactory::class . '.unit')->andReturn($dispatcher);
return $container;
}
}

View File

@ -0,0 +1,25 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Grpc;
use Hyperf\Rpc\Contract\PathGeneratorInterface;
use Hyperf\Utils\Str;
class PathGenerator implements PathGeneratorInterface
{
public function generate(string $service, string $method): string
{
$handledNamespace = explode('\\', $service);
$handledNamespace = Str::replaceLast('Service', '', end($handledNamespace));
return '/grpc.' . $handledNamespace . '/' . $method;
}
}