Added Sender to fix fd does not found when server::push.

This commit is contained in:
李铭昕 2019-08-17 10:07:13 +08:00
parent 7557336fcb
commit 5f677361fb
5 changed files with 302 additions and 0 deletions

View File

@ -0,0 +1,19 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE
*/
namespace Hyperf\WebSocketServer\Exception;
use Hyperf\Server\Exception\ServerException;
class MethodInvalidException extends ServerException
{
}

View File

@ -0,0 +1,53 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE
*/
namespace Hyperf\WebSocketServer\Listener;
use Hyperf\Event\Annotation\Listener;
use Hyperf\Event\Contract\ListenerInterface;
use Hyperf\Framework\Event\AfterWorkerStart;
use Hyperf\WebSocketServer\Sender;
use Psr\Container\ContainerInterface;
/**
* @Listener
*/
class InitSenderListener implements ListenerInterface
{
/**
* @var ContainerInterface
*/
private $container;
public function __construct(ContainerInterface $container)
{
$this->container = $container;
}
/**
* @return string[] returns the events that you want to listen
*/
public function listen(): array
{
return [
AfterWorkerStart::class,
];
}
public function process(object $event)
{
if ($event instanceof AfterWorkerStart) {
$sender = $this->container->get(Sender::class);
$sender->setWorkerId($event->workerId);
}
}
}

View File

@ -0,0 +1,88 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE
*/
namespace Hyperf\WebSocketServer\Listener;
use Hyperf\Contract\ConfigInterface;
use Hyperf\Contract\StdoutLoggerInterface;
use Hyperf\Event\Annotation\Listener;
use Hyperf\Event\Contract\ListenerInterface;
use Hyperf\ExceptionHandler\Formatter\FormatterInterface;
use Hyperf\Framework\Event\OnPipeMessage;
use Hyperf\Server\ServerFactory;
use Hyperf\WebSocketServer\Sender;
use Hyperf\WebSocketServer\SenderPipeMessage;
use Psr\Container\ContainerInterface;
/**
* @Listener
*/
class OnPipeMessageListener implements ListenerInterface
{
/**
* @var ContainerInterface
*/
private $container;
/**
* @var ConfigInterface
*/
private $config;
/**
* @var StdoutLoggerInterface
*/
private $logger;
public function __construct(ContainerInterface $container, ConfigInterface $config, StdoutLoggerInterface $logger)
{
$this->container = $container;
$this->config = $config;
$this->logger = $logger;
}
/**
* @return string[] returns the events that you want to listen
*/
public function listen(): array
{
return [
OnPipeMessage::class,
];
}
/**
* Handle the Event when the event is triggered, all listeners will
* complete before the event is returned to the EventDispatcher.
*/
public function process(object $event)
{
if ($event instanceof OnPipeMessage && $event->data instanceof SenderPipeMessage) {
/** @var SenderPipeMessage $message */
$message = $event->data;
try {
$sender = $this->container->get(Sender::class);
$sender->proxy($message->name, $message->arguments);
} catch (\Throwable $exception) {
$formatter = $this->container->get(FormatterInterface::class);
$this->logger->warning($formatter->format($exception));
}
}
}
protected function getServer()
{
return $this->container->get(ServerFactory::class)->getServer()->getServer();
}
}

View File

@ -0,0 +1,110 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE
*/
namespace Hyperf\WebSocketServer;
use Hyperf\Contract\StdoutLoggerInterface;
use Hyperf\Server\ServerFactory;
use Hyperf\WebSocketServer\Exception\MethodInvalidException;
use Psr\Container\ContainerInterface;
/**
* @method push(int $fd, $data, int $opcode = null, $finish = null)
*/
class Sender
{
protected $container;
protected $logger;
protected $workerId;
public function __construct(ContainerInterface $container)
{
$this->container = $container;
$this->logger = $container->get(StdoutLoggerInterface::class);
}
public function __call($name, $arguments)
{
if (! $this->proxy($name, $arguments)) {
$this->sendPipeMessage($name, $arguments);
}
}
public function proxy($name, $arguments): bool
{
$fd = $this->getFdFromProxyMethod($name, $arguments);
if ($fd === false) {
throw new MethodInvalidException($arguments);
}
$ret = $this->check($fd);
if ($ret) {
$this->getServer()->push(...$arguments);
$this->logger->debug("[WebSocket] Worker.{$this->workerId} send to #{$fd}");
}
return $ret;
}
/**
* @param int $workerId
*/
public function setWorkerId(int $workerId): void
{
$this->workerId = $workerId;
}
public function check($fd): bool
{
$server = $this->getServer();
$info = $server->connection_info($fd);
if ($info && $info['websocket_status'] == WEBSOCKET_STATUS_ACTIVE) {
return true;
}
return false;
}
/**
* @param $method
* @param mixed $arguments
* @return null|bool|int
*/
protected function getFdFromProxyMethod($method, $arguments)
{
if (in_array($method, ['push', 'send', 'sendto'])) {
return $arguments[0];
}
return false;
}
protected function getServer()
{
return $this->container->get(ServerFactory::class)->getServer()->getServer();
}
protected function sendPipeMessage($name, $arguments)
{
$server = $this->getServer();
$workerCount = $server->setting['worker_num'] - 1;
for ($workerId = 0; $workerId <= $workerCount; ++$workerId) {
if ($workerId !== $this->workerId) {
$server->sendMessage(new SenderPipeMessage($name, $arguments), $workerId);
$this->logger->debug("[WebSocket] Let Worker.{$workerId} try to {$name}.");
}
}
}
}

View File

@ -0,0 +1,32 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE
*/
namespace Hyperf\WebSocketServer;
class SenderPipeMessage
{
/**
* @var string
*/
public $name;
/**
* @var array
*/
public $arguments;
public function __construct($name, $arguments)
{
$this->name = $name;
$this->arguments = $arguments;
}
}