Support Coroutine TCP Server

This commit is contained in:
huangzhhui 2020-06-18 23:18:17 +08:00
parent b645a5cce7
commit e66bfef473
5 changed files with 91 additions and 26 deletions

View File

@ -12,8 +12,12 @@ declare(strict_types=1);
namespace Hyperf\Contract;
use Swoole\Server as SwooleServer;
use Swoole\Coroutine\Server\Connection;
interface OnReceiveInterface
{
public function onReceive(SwooleServer $server, int $fd, int $fromId, string $data): void;
/**
* @param SwooleServer|Connection $server
*/
public function onReceive($server, int $fd, int $fromId, string $data): void;
}

View File

@ -111,7 +111,7 @@ class TcpServer extends Server
return new CoreMiddleware($this->container, $this->protocol, $this->responseBuilder, $this->serverName);
}
protected function buildResponse(int $fd, SwooleServer $server): ResponseInterface
protected function buildResponse(int $fd, $server): ResponseInterface
{
$response = new Psr7Response();
return $response->withAttribute('fd', $fd)->withAttribute('server', $server);

View File

@ -29,6 +29,7 @@ use Psr\Container\ContainerInterface;
use Psr\Http\Message\ResponseInterface;
use Psr\Http\Message\ServerRequestInterface;
use Psr\Log\LoggerInterface;
use Swoole\Coroutine\Server\Connection;
use Swoole\Server as SwooleServer;
use Throwable;
@ -101,7 +102,7 @@ abstract class Server implements OnReceiveInterface, MiddlewareInitializerInterf
$this->exceptionHandlers = $config->get('exceptions.handler.' . $serverName, $this->getDefaultExceptionHandler());
}
public function onReceive(SwooleServer $server, int $fd, int $fromId, string $data): void
public function onReceive($server, int $fd, int $fromId, string $data): void
{
$request = $response = null;
try {
@ -146,16 +147,23 @@ abstract class Server implements OnReceiveInterface, MiddlewareInitializerInterf
];
}
protected function send(SwooleServer $server, int $fd, ResponseInterface $response): void
/**
* @param SwooleServer|\Swoole\Coroutine\Server\Connection $server
*/
protected function send($server, int $fd, ResponseInterface $response): void
{
$server->send($fd, (string) $response->getBody());
if ($server instanceof SwooleServer) {
$server->send($fd, (string) $response->getBody());
} elseif ($server instanceof Connection) {
$server->send((string) $response->getBody());
}
}
abstract protected function createCoreMiddleware(): CoreMiddlewareInterface;
abstract protected function buildRequest(int $fd, int $fromId, string $data): ServerRequestInterface;
abstract protected function buildResponse(int $fd, SwooleServer $server): ResponseInterface;
abstract protected function buildResponse(int $fd, $server): ResponseInterface;
protected function transferToResponse($response): ?ResponseInterface
{

View File

@ -114,6 +114,9 @@ class CoroutineServer implements ServerInterface
{
$servers = $config->getServers();
foreach ($servers as $server) {
if (! $server instanceof \Hyperf\Server\Port) {
continue;
}
$name = $server->getName();
$type = $server->getType();
$host = $server->getHost();
@ -121,36 +124,85 @@ class CoroutineServer implements ServerInterface
$callbacks = array_replace($config->getCallbacks(), $server->getCallbacks());
$this->server = $this->makeServer($type, $host, $port);
$this->server->set(array_replace($config->getSettings(), $server->getSettings()));
$settings = array_replace($config->getSettings(), $server->getSettings());
$this->server->set($settings);
if (isset($callbacks[SwooleEvent::ON_REQUEST])) {
[$class, $method] = $callbacks[SwooleEvent::ON_REQUEST];
$handler = $this->container->get($class);
if ($handler instanceof MiddlewareInitializerInterface) {
$handler->initCoreMiddleware($name);
}
$this->server->handle('/', [$handler, $method]);
} elseif (isset($callbacks[SwooleEvent::ON_HAND_SHAKE])) {
[$class, $method] = $callbacks[SwooleEvent::ON_HAND_SHAKE];
$handler = $this->container->get($class);
if ($handler instanceof MiddlewareInitializerInterface) {
$handler->initCoreMiddleware($name);
}
$this->server->handle('/', [$handler, $method]);
}
$this->bindServerCallbacks($type, $name, $callbacks);
ServerManager::add($name, [$type, $this->server, $callbacks]);
}
}
protected function bindServerCallbacks(int $type, string $name, array $callbacks)
{
switch ($type) {
case ServerInterface::SERVER_HTTP:
if (isset($callbacks[SwooleEvent::ON_REQUEST])) {
[$handler, $method] = $this->getCallbackMethod(SwooleEvent::ON_REQUEST, $callbacks);
if ($handler instanceof MiddlewareInitializerInterface) {
$handler->initCoreMiddleware($name);
}
$this->server->handle('/', [$handler, $method]);
}
return;
case ServerInterface::SERVER_WEBSOCKET:
if (isset($callbacks[SwooleEvent::ON_HAND_SHAKE])) {
[$handler, $method] = $this->getCallbackMethod(SwooleEvent::ON_HAND_SHAKE, $callbacks);
if ($handler instanceof MiddlewareInitializerInterface) {
$handler->initCoreMiddleware($name);
}
$this->server->handle('/', [$handler, $method]);
}
return;
case ServerInterface::SERVER_BASE:
if (isset($callbacks[SwooleEvent::ON_RECEIVE])) {
[$connectHandler, $connectMethod] = $this->getCallbackMethod(SwooleEvent::ON_CONNECT, $callbacks);
[$receiveHandler, $receiveMethod] = $this->getCallbackMethod(SwooleEvent::ON_RECEIVE, $callbacks);
[$closeHandler, $closeMethod] = $this->getCallbackMethod(SwooleEvent::ON_CLOSE, $callbacks);
$this->server->handle(function (Coroutine\Server\Connection $connection) use ($name, $connectHandler, $connectMethod, $receiveHandler, $receiveMethod, $closeHandler, $closeMethod) {
if ($connectHandler && $connectMethod) {
$connectHandler->$connectMethod($connection, $connection->exportSocket()->fd);
}
if ($receiveHandler instanceof MiddlewareInitializerInterface) {
$receiveHandler->initCoreMiddleware($name);
}
while (true) {
$data = $connection->recv();
if (empty($data)) {
if ($closeHandler && $closeMethod) {
$closeHandler->$closeMethod($connection, $connection->exportSocket()->fd);
}
$connection->close();
break;
}
$receiveHandler->$receiveMethod($connection, $connection->exportSocket()->fd, 0, $data);
}
});
}
return;
}
throw new RuntimeException('Server type is invalid or the server callback does not exists.');
}
protected function getCallbackMethod(string $callack, array $callbacks): array
{
$handler = $method = null;
if (isset($callbacks[$callack])) {
[$class, $method] = $callbacks[$callack];
$handler = $this->container->get($class);
}
return [$handler, $method];
}
protected function makeServer($type, $host, $port)
{
switch ($type) {
case ServerInterface::SERVER_HTTP:
case ServerInterface::SERVER_WEBSOCKET:
return new Coroutine\Http\Server($host, $port);
return new Coroutine\Http\Server($host, $port, false, true);
case ServerInterface::SERVER_BASE:
return new Coroutine\Server($host, $port);
return new Coroutine\Server($host, $port, false, true);
}
throw new RuntimeException('Server type is invalid.');

View File

@ -58,10 +58,11 @@ class AfterWorkerStartListener implements ListenerInterface
switch ($type) {
case Server::SERVER_BASE:
$sockType = $server->type;
if (($sockType === SWOOLE_SOCK_TCP) || ($sockType === SWOOLE_SOCK_TCP6)) {
// type of Swoole\Coroutine\Server is equal to SWOOLE_SOCK_UDP
if ($server instanceof \Swoole\Coroutine\Server || in_array($sockType, [SWOOLE_SOCK_TCP, SWOOLE_SOCK_TCP6])) {
return 'TCP';
}
if (($sockType === SWOOLE_SOCK_UDP) || ($sockType === SWOOLE_SOCK_UDP6)) {
if (in_array($sockType, [SWOOLE_SOCK_UDP, SWOOLE_SOCK_UDP6])) {
return 'UDP';
}
return 'UNKNOWN';