Removed requirement ext-swoole from socketio-server. (#5844)

This commit is contained in:
李铭昕 2023-06-17 20:34:02 +08:00 committed by GitHub
parent 3f0c47871a
commit 10362573de
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 128 additions and 152 deletions

View File

@ -29,9 +29,6 @@
"phpunit/phpunit": "^9.4",
"swoole/ide-helper": "dev-master"
},
"suggest": {
"ext-swoole": ">=4.5"
},
"type": "library",
"extra": {
"branch-alias": {

View File

@ -23,9 +23,6 @@
"phpunit/phpunit": "^9.4",
"swoole/ide-helper": "dev-master"
},
"suggest": {
"ext-swoole": ">=4.5"
},
"time": "2021-07-11T10:22:13+00:00",
"type": "library",
"extra": {

View File

@ -29,9 +29,6 @@
"phpunit/phpunit": "^9.4",
"swoole/ide-helper": "dev-master"
},
"suggest": {
"ext-swoole": ">=4.5"
},
"type": "library",
"extra": {
"branch-alias": {

View File

@ -24,9 +24,6 @@
"phpunit/phpunit": "^9.4",
"swoole/ide-helper": "dev-master"
},
"suggest": {
"ext-swoole": ">=4.5"
},
"time": "2021-07-11T10:22:13+00:00",
"type": "library",
"extra": {

View File

@ -11,7 +11,6 @@
"php": ">=8.1",
"ext-json": "*",
"ext-redis": "*",
"ext-swoole": ">=5.0",
"hyperf/codec": "~3.1.0",
"hyperf/collection": "~3.1.0",
"hyperf/di": "~3.1.0",

View File

@ -0,0 +1,49 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\SocketIOServer;
use Hyperf\Server\Server;
use Hyperf\Server\ServerFactory;
use Psr\Container\ContainerInterface;
use Swoole\Atomic as SwooleAtomic;
class Atomic
{
protected ?SwooleAtomic $atomic = null;
protected int $id = 0;
public function __construct(?ContainerInterface $container = null)
{
if ($config = $container?->get(ServerFactory::class)->getConfig() and $config->getType() === Server::class) {
$this->atomic = new SwooleAtomic();
}
}
public function get(): int
{
if ($this->atomic) {
return $this->atomic->get();
}
return $this->id;
}
public function add(): int
{
if ($this->atomic) {
return $this->atomic->add();
}
return ++$this->id;
}
}

View File

@ -12,6 +12,7 @@ declare(strict_types=1);
namespace Hyperf\SocketIOServer\Emitter;
use Hyperf\Context\ApplicationContext;
use Hyperf\Engine\WebSocket\Frame;
use Hyperf\SocketIOServer\Parser\Encoder;
use Hyperf\SocketIOServer\Parser\Engine;
use Hyperf\SocketIOServer\Parser\Packet;
@ -29,8 +30,6 @@ use function Hyperf\Support\make;
*/
trait Emitter
{
use Flagger;
protected ?AdapterInterface $adapter = null;
/**
@ -139,8 +138,7 @@ trait Emitter
'event' => $event,
'data' => $data,
'encode' => fn ($i, $event, $data) => $this->encode($i, $event, $data),
'opcode' => SWOOLE_WEBSOCKET_OPCODE_TEXT,
'flag' => $this->guessFlags($this->compress),
'frame' => new Frame(),
]);
}

View File

@ -1,32 +0,0 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\SocketIOServer\Emitter;
trait Flagger
{
/**
* @return bool|int flags
*/
protected function guessFlags(bool $compress): bool|int
{
// older swoole version
if (! defined('SWOOLE_WEBSOCKET_FLAG_FIN')) {
return true;
}
if ($compress) {
return SWOOLE_WEBSOCKET_FLAG_FIN | SWOOLE_WEBSOCKET_FLAG_COMPRESS;
}
return SWOOLE_WEBSOCKET_FLAG_FIN;
}
}

View File

@ -12,6 +12,8 @@ declare(strict_types=1);
namespace Hyperf\SocketIOServer\Emitter;
use Hyperf\Engine\Channel;
use Hyperf\Engine\Contract\WebSocket\FrameInterface;
use Hyperf\Engine\WebSocket\Frame;
use Hyperf\SocketIOServer\SocketIO;
use Hyperf\WebSocketServer\Sender;
@ -34,7 +36,8 @@ class Future
private array $data,
callable $encode,
private int $opcode,
private int $flag
private int $flag,
private ?FrameInterface $frame = null
) {
$this->id = '';
$this->encode = $encode;
@ -69,6 +72,10 @@ class Future
}
$message = ($this->encode)($this->id, $this->event, $this->data);
$this->sent = true;
$this->sender->push($this->fd, $message, $this->opcode, $this->flag);
if ($this->frame) {
$this->sender->pushFrame($this->fd, $this->frame->setPayloadData($message));
} else {
$this->sender->pushFrame($this->fd, new Frame(opcode: $this->opcode, payloadData: $message));
}
}
}

View File

@ -14,8 +14,8 @@ namespace Hyperf\SocketIOServer\Listener;
use Hyperf\Event\Contract\ListenerInterface;
use Hyperf\Framework\Event\BeforeMainServerStart;
use Hyperf\Server\Event\MainCoroutineServerStart;
use Hyperf\SocketIOServer\Atomic;
use Hyperf\SocketIOServer\SocketIO;
use Swoole\Atomic;
class ServerIdListener implements ListenerInterface
{

View File

@ -11,7 +11,7 @@ declare(strict_types=1);
*/
namespace Hyperf\SocketIOServer\Room;
use Hyperf\SocketIOServer\Emitter\Flagger;
use Hyperf\Engine\WebSocket\Frame;
use Hyperf\SocketIOServer\SidProvider\SidProviderInterface;
use Hyperf\WebSocketServer\Sender;
@ -20,8 +20,6 @@ use function Hyperf\Support\make;
class MemoryAdapter implements AdapterInterface
{
use Flagger;
protected array $rooms = [];
protected array $sids = [];
@ -68,9 +66,6 @@ class MemoryAdapter implements AdapterInterface
{
$rooms = data_get($opts, 'rooms', []);
$except = data_get($opts, 'except', []);
$volatile = data_get($opts, 'flag.volatile', false);
$compress = data_get($opts, 'flag.compress', false);
$wsFlag = $this->guessFlags((bool) $compress);
$pushed = [];
if (! empty($rooms)) {
foreach ($rooms as $room) {
@ -84,12 +79,7 @@ class MemoryAdapter implements AdapterInterface
continue;
}
$fd = $this->sidProvider->getFd($sid);
$this->sender->push(
$fd,
$packet,
SWOOLE_WEBSOCKET_OPCODE_TEXT,
$wsFlag
);
$this->sender->pushFrame($fd, new Frame(payloadData: $packet));
$pushed[$fd] = true;
}
}
@ -100,7 +90,7 @@ class MemoryAdapter implements AdapterInterface
continue;
}
$fd = $this->sidProvider->getFd($sid);
$this->sender->push($fd, $packet, SWOOLE_WEBSOCKET_OPCODE_TEXT, $wsFlag);
$this->sender->pushFrame($fd, new Frame(payloadData: $packet));
}
}
}

View File

@ -16,10 +16,10 @@ use Hyperf\Contract\StdoutLoggerInterface;
use Hyperf\Coordinator\Constants;
use Hyperf\Coordinator\CoordinatorManager;
use Hyperf\Coroutine\Coroutine;
use Hyperf\Engine\WebSocket\Frame;
use Hyperf\Redis\RedisFactory;
use Hyperf\Redis\RedisProxy;
use Hyperf\Server\Exception\RuntimeException;
use Hyperf\SocketIOServer\Emitter\Flagger;
use Hyperf\SocketIOServer\NamespaceInterface;
use Hyperf\SocketIOServer\SidProvider\SidProviderInterface;
use Hyperf\WebSocketServer\Sender;
@ -33,8 +33,6 @@ use function Hyperf\Support\retry;
class RedisAdapter implements AdapterInterface, EphemeralInterface
{
use Flagger;
protected string $redisPrefix = 'ws';
protected int $retryInterval = 1000;
@ -296,20 +294,13 @@ class RedisAdapter implements AdapterInterface, EphemeralInterface
private function tryPush(string $sid, string $packet, array &$pushed, array $opts): void
{
$compress = data_get($opts, 'flag.compress', false);
$wsFlag = $this->guessFlags((bool) $compress);
$except = data_get($opts, 'except', []);
$fd = $this->getFd($sid);
if (in_array($sid, $except)) {
return;
}
if ($this->isLocal($sid) && ! isset($pushed[$fd])) {
$this->sender->push(
$fd,
$packet,
SWOOLE_WEBSOCKET_OPCODE_TEXT,
$wsFlag
);
$this->sender->pushFrame($fd, new Frame(payloadData: $packet));
$pushed[$fd] = true;
$this->shouldClose($opts) && $this->close($fd);
}

View File

@ -11,7 +11,6 @@ declare(strict_types=1);
*/
namespace Hyperf\SocketIOServer;
use Hyperf\Context\ApplicationContext;
use Hyperf\SocketIOServer\Emitter\Emitter;
use Hyperf\SocketIOServer\Exception\ConnectionClosedException;
use Hyperf\SocketIOServer\Parser\Encoder;
@ -22,7 +21,6 @@ use Hyperf\SocketIOServer\SidProvider\SidProviderInterface;
use Hyperf\WebSocketServer\Context;
use Hyperf\WebSocketServer\Sender;
use Psr\Http\Message\ServerRequestInterface;
use Swoole\Server;
class Socket
{
@ -77,9 +75,7 @@ class Socket
]);
// notice client is about to disconnect
$this->sender->push($this->fd, Engine::MESSAGE . $this->encoder->encode($closePacket));
/** @var \Swoole\WebSocket\Server $server */
$server = ApplicationContext::getContainer()->get(Server::class);
$server->disconnect($this->fd);
$this->sender->disconnect($this->fd);
}
public function getNamespace(): string

View File

@ -17,7 +17,10 @@ use Hyperf\Contract\OnCloseInterface;
use Hyperf\Contract\OnMessageInterface;
use Hyperf\Contract\OnOpenInterface;
use Hyperf\Contract\StdoutLoggerInterface;
use Hyperf\Coordinator\Timer;
use Hyperf\Engine\Channel;
use Hyperf\Engine\WebSocket\Frame;
use Hyperf\Engine\WebSocket\Response;
use Hyperf\SocketIOServer\Collector\EventAnnotationCollector;
use Hyperf\SocketIOServer\Collector\SocketIORouter;
use Hyperf\SocketIOServer\Exception\RouteNotFoundException;
@ -29,11 +32,6 @@ use Hyperf\SocketIOServer\Room\EphemeralInterface;
use Hyperf\SocketIOServer\SidProvider\SidProviderInterface;
use Hyperf\WebSocketServer\Constant\Opcode;
use Hyperf\WebSocketServer\Sender;
use Swoole\Atomic;
use Swoole\Http\Request;
use Swoole\Http\Response;
use Swoole\Timer;
use Swoole\WebSocket\Server;
use Throwable;
use function Hyperf\Support\make;
@ -87,6 +85,8 @@ class SocketIO implements OnMessageInterface, OnOpenInterface, OnCloseInterface
protected SocketIOConfig $config;
protected Timer $timer;
public function __construct(
protected StdoutLoggerInterface $stdoutLogger,
protected Sender $sender,
@ -96,6 +96,7 @@ class SocketIO implements OnMessageInterface, OnOpenInterface, OnCloseInterface
?SocketIOConfig $config = null
) {
$this->config = $config ?? ApplicationContext::getContainer()->get(SocketIOConfig::class);
$this->timer = new Timer();
}
public function __call($method, $args)
@ -103,36 +104,21 @@ class SocketIO implements OnMessageInterface, OnOpenInterface, OnCloseInterface
return $this->of('/')->{$method}(...$args);
}
/**
* @param Response|Server $server
* @param mixed $frame
*/
public function onMessage($server, $frame): void
{
$response = (new Response($server))->init($frame);
if ($frame->opcode == Opcode::PING) {
if ($server instanceof Response) {
$server->push('', Opcode::PONG);
} else {
$server->push($frame->fd, '', Opcode::PONG);
}
$response->push(new Frame(opcode: Opcode::PONG));
return;
}
if ($frame->data[0] === Engine::PING) {
$this->renewInAllNamespaces($frame->fd);
if ($server instanceof Response) {
$server->push(Engine::PONG); // sever pong
} else {
$server->push($frame->fd, Engine::PONG); // sever pong
}
$response->push(new Frame(payloadData: Engine::PONG));
return;
}
if ($frame->data[0] === Engine::CLOSE) {
if ($server instanceof Response) {
$server->close();
} else {
$server->disconnect($frame->fd);
}
$response->close();
return;
}
if ($frame->data[0] !== Engine::MESSAGE) {
@ -151,22 +137,14 @@ class SocketIO implements OnMessageInterface, OnOpenInterface, OnCloseInterface
'type' => Packet::OPEN,
'nsp' => $packet->nsp,
]);
if ($server instanceof Response) {
$server->push(Engine::MESSAGE . $this->encoder->encode($responsePacket)); // sever open
} else {
$server->push($frame->fd, Engine::MESSAGE . $this->encoder->encode($responsePacket)); // sever open
}
$response->push(new Frame(payloadData: Engine::MESSAGE . $this->encoder->encode($responsePacket)));
break;
case Packet::CLOSE: // client disconnect
if ($server instanceof Response) {
$server->close();
} else {
$server->disconnect($frame->fd);
}
$response->close();
break;
case Packet::EVENT: // client message with ack
if ($packet->id !== '') {
$packet->data[] = function ($data) use ($frame, $packet) {
$packet->data[] = function ($data) use ($packet, $response) {
$responsePacket = Packet::create([
'id' => $packet->id,
'nsp' => $packet->nsp,
@ -174,9 +152,7 @@ class SocketIO implements OnMessageInterface, OnOpenInterface, OnCloseInterface
'data' => $data,
]);
if ($this->sender->check($frame->fd)) {
$this->sender->push($frame->fd, Engine::MESSAGE . $this->encoder->encode($responsePacket));
}
$this->sender->pushFrame($response->getFd(), new Frame(payloadData: Engine::MESSAGE . $this->encoder->encode($responsePacket)));
};
}
$this->dispatch($frame->fd, $packet->nsp, ...$packet->data);
@ -192,7 +168,7 @@ class SocketIO implements OnMessageInterface, OnOpenInterface, OnCloseInterface
$this->clientCallbacks[$ackId]->push($packet->data);
}
unset($this->clientCallbacks[$ackId]);
Timer::clear($this->clientCallbackTimers[$ackId]);
$this->timer->clear($this->clientCallbackTimers[$ackId]);
}
break;
default:
@ -200,27 +176,21 @@ class SocketIO implements OnMessageInterface, OnOpenInterface, OnCloseInterface
}
}
/**
* @param Response|Server $server
* @param Request $request
*/
public function onOpen($server, $request): void
{
$response = (new Response($server))->init($request);
$data = [
'sid' => $this->sidProvider->getSid($request->fd),
'sid' => $this->sidProvider->getSid($response->getFd()),
'upgrades' => ['websocket'],
'pingInterval' => $this->config->getPingInterval(),
'pingTimeout' => $this->config->getPingTimeout(),
];
if ($server instanceof Response) {
$server->push(Engine::OPEN . json_encode($data)); // socket is open
$server->push(Engine::MESSAGE . Packet::OPEN); // server open
} else {
$server->push($request->fd, Engine::OPEN . json_encode($data)); // socket is open
$server->push($request->fd, Engine::MESSAGE . Packet::OPEN); // server open
}
$this->dispatchEventInAllNamespaces($request->fd, 'connect');
$response->push(new Frame(payloadData: Engine::OPEN . json_encode($data))); // socket is open
$response->push(new Frame(payloadData: Engine::MESSAGE . Packet::OPEN)); // server open
$this->dispatchEventInAllNamespaces($response->getFd(), 'connect');
}
public function onClose($server, int $fd, int $reactorId): void
@ -247,7 +217,7 @@ class SocketIO implements OnMessageInterface, OnOpenInterface, OnCloseInterface
{
$this->clientCallbacks[$ackId] = $channel;
// Clean up using timer to avoid memory leak.
$timerId = Timer::after($timeoutMs ?? $this->config->getClientCallbackTimeout(), function () use ($ackId) {
$timerId = $this->timer->after($timeoutMs ?? $this->config->getClientCallbackTimeout(), function () use ($ackId) {
if (! isset($this->clientCallbacks[$ackId])) {
return;
}
@ -330,7 +300,8 @@ class SocketIO implements OnMessageInterface, OnOpenInterface, OnCloseInterface
'nsp' => $nsp,
'addCallback' => function (string $ackId, Channel $channel, ?int $timeout = null) {
$this->addCallback($ackId, $channel, $timeout);
}, ]);
},
]);
} catch (Throwable $exception) {
$this->stdoutLogger->error('Socket.io ' . $exception->getMessage());
return null;

View File

@ -18,6 +18,7 @@ use Hyperf\Contract\StdoutLoggerInterface;
use Hyperf\Di\Container;
use Hyperf\Di\Definition\DefinitionSource;
use Hyperf\Framework\Logger\StdoutLogger;
use Hyperf\SocketIOServer\Atomic;
use Hyperf\SocketIOServer\Parser\Encoder;
use Hyperf\SocketIOServer\Room\AdapterInterface;
use Hyperf\SocketIOServer\Room\MemoryAdapter;
@ -28,7 +29,6 @@ use Hyperf\SocketIOServer\SidProvider\SidProviderInterface;
use Hyperf\SocketIOServer\SocketIO;
use Mockery;
use PHPUnit\Framework\TestCase;
use Swoole\Atomic;
use Swoole\Timer;
/**

View File

@ -39,7 +39,7 @@ class FutureTest extends AbstractTestCase
/** @var ContainerInterface $container */
$container = ApplicationContext::getContainer();
$mock = Mockery::mock(Sender::class);
$mock->shouldReceive('push')->with(1, Mockery::any(), Mockery::any(), Mockery::any())->once();
$mock->shouldReceive('pushFrame')->with(1, Mockery::any())->once();
$container->set(Sender::class, $mock);
$future = make(Future::class, ['fd' => 1,
'event' => 'event',
@ -58,7 +58,7 @@ class FutureTest extends AbstractTestCase
/** @var ContainerInterface $container */
$container = ApplicationContext::getContainer();
$mock = Mockery::mock(Sender::class);
$mock->shouldReceive('push')->with(1, Mockery::any(), Mockery::any(), Mockery::any())->once();
$mock->shouldReceive('pushFrame')->with(1, Mockery::any())->once();
$container->set(Sender::class, $mock);
/** @var Future $future */
$future = make(Future::class, ['fd' => 1,
@ -78,7 +78,7 @@ class FutureTest extends AbstractTestCase
/** @var ContainerInterface $container */
$container = ApplicationContext::getContainer();
$mock = Mockery::mock(Sender::class);
$mock->shouldReceive('push')->with(1, Mockery::any(), Mockery::any(), Mockery::any())->once();
$mock->shouldReceive('pushFrame')->with(1, Mockery::any())->once();
$container->set(Sender::class, $mock);
/** @var Future $future */
$future = make(Future::class, ['fd' => 1,

View File

@ -13,6 +13,7 @@ namespace HyperfTest\SocketIOServer\Cases;
use Hyperf\Context\ApplicationContext;
use Hyperf\Contract\StdoutLoggerInterface;
use Hyperf\SocketIOServer\Atomic;
use Hyperf\SocketIOServer\BaseNamespace;
use Hyperf\SocketIOServer\Collector\SocketIORouter;
use Hyperf\SocketIOServer\Parser\Decoder;
@ -25,17 +26,12 @@ use HyperfTest\SocketIOServer\Stub\EphemeralAdapter;
use Mockery;
use PHPUnit\Framework\Attributes\CoversNothing;
use ReflectionClass;
use Swoole\Atomic;
/**
* @internal
* @coversNothing
*/
#[CoversNothing]
/**
* @internal
* @coversNothing
*/
class IONamespaceTest extends AbstractTestCase
{
protected function setUp(): void
@ -52,7 +48,7 @@ class IONamespaceTest extends AbstractTestCase
$io->getAdapter()->add('1');
$io->getAdapter()->add('2');
$io->emit('hello', 'world');
$sender->shouldHaveReceived('push')->twice();
$sender->shouldHaveReceived('pushFrame')->twice();
$this->assertTrue(true);
}
@ -82,7 +78,7 @@ class IONamespaceTest extends AbstractTestCase
$io->getAdapter()->add('1');
$io->getAdapter()->add('2');
$io->emit('hello', 'world', true);
$sender->shouldHaveReceived('push')->twice();
$sender->shouldHaveReceived('pushFrame')->twice();
$this->assertTrue(true);
}
@ -117,7 +113,7 @@ class IONamespaceTest extends AbstractTestCase
$io->getAdapter()->add('1', 'room');
$io->getAdapter()->add('2', 'room');
$io->to('room')->emit('hello', 'world', false);
$sender->shouldHaveReceived('push')->withAnyArgs()->twice();
$sender->shouldHaveReceived('pushFrame')->withAnyArgs()->twice();
$this->assertTrue(true);
}

View File

@ -46,7 +46,7 @@ class RoomAdapterTest extends AbstractTestCase
{
$sidProvider = new LocalSidProvider();
$server = Mockery::Mock(Sender::class);
$server->shouldReceive('push')->twice();
$server->shouldReceive('pushFrame')->twice();
$room = new MemoryAdapter($server, $sidProvider);
$room->add('42', 'universe', '42');
$room->add('43', 'universe', '43');
@ -92,7 +92,7 @@ class RoomAdapterTest extends AbstractTestCase
$nsp->shouldReceive('getNamespace')->andReturn('test');
$redis = $this->getRedis();
$server = Mockery::Mock(Sender::class);
$server->shouldReceive('push')->twice();
$server->shouldReceive('pushFrame')->twice();
$sidProvider = new LocalSidProvider();
$room = new RedisAdapter($redis, $server, $nsp, $sidProvider);
$room->add('42', 'universe', '42');

View File

@ -85,9 +85,9 @@ class SocketTest extends AbstractTestCase
/** @var ContainerInterface $container */
$container = ApplicationContext::getContainer();
$mock = Mockery::mock(Sender::class);
$mock->shouldNotReceive('push')->with(1, Mockery::any(), Mockery::any(), Mockery::any());
$mock->shouldReceive('push')->with(2, Mockery::any(), Mockery::any(), Mockery::any())->once();
$mock->shouldReceive('push')->with(3, Mockery::any(), Mockery::any(), Mockery::any())->once();
$mock->shouldNotReceive('pushFrame')->with(1, Mockery::any());
$mock->shouldReceive('pushFrame')->with(2, Mockery::any())->once();
$mock->shouldReceive('pushFrame')->with(3, Mockery::any())->once();
$container->set(Sender::class, $mock);
/** @var Socket $socket1 */
$socket1 = make(Socket::class, [

View File

@ -13,6 +13,8 @@ namespace Hyperf\WebSocketServer;
use Hyperf\Contract\ConfigInterface;
use Hyperf\Contract\StdoutLoggerInterface;
use Hyperf\Engine\Contract\WebSocket\FrameInterface;
use Hyperf\Engine\WebSocket\Response as WsResponse;
use Hyperf\Server\CoroutineServer;
use Hyperf\WebSocketServer\Exception\InvalidMethodException;
use Psr\Container\ContainerInterface;
@ -21,6 +23,8 @@ use Swoole\Http\Response;
use Swoole\Server;
use Swow\Psr7\Server\ServerConnection;
use function Hyperf\Engine\swoole_get_flags_from_frame;
/**
* @method push(int $fd, $data, int $opcode = null, $finish = null)
* @method disconnect(int $fd, int $code = null, string $reason = null)
@ -56,6 +60,7 @@ class Sender
if ($method === 'disconnect') {
$method = 'close';
}
$this->responses[$fd]->{$method}(...$arguments);
$this->logger->debug("[WebSocket] Worker send to #{$fd}");
}
@ -67,6 +72,24 @@ class Sender
}
}
public function pushFrame(int $fd, FrameInterface $frame): bool
{
if ($this->isCoroutineServer) {
if (isset($this->responses[$fd])) {
return (new WsResponse($this->responses[$fd]))->init($fd)->push($frame);
}
return false;
}
if ($this->check($fd)) {
return (new WsResponse($this->getServer()))->init($fd)->push($frame);
}
$this->sendPipeMessage('push', [$fd, (string) $frame->getPayloadData(), $frame->getOpcode(), swoole_get_flags_from_frame($frame)]);
return false;
}
public function proxy(int $fd, string $method, array $arguments): bool
{
$result = $this->check($fd);