Merge pull request #1708 from Reasno/socketio

fix a variety of issues found in socketio-server
This commit is contained in:
谷溪 2020-05-12 16:58:56 +08:00 committed by GitHub
commit 811f3b015e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 101 additions and 35 deletions

View File

@ -3,6 +3,7 @@
## Fixed ## Fixed
- [#1696](https://github.com/hyperf/hyperf/pull/1696) Fixed `Context::copy` does not works when use keys. - [#1696](https://github.com/hyperf/hyperf/pull/1696) Fixed `Context::copy` does not works when use keys.
- [#1708](https://github.com/hyperf/hyperf/pull/1708) [#1718](https://github.com/hyperf/hyperf/pull/1718) Fixed a series of issues for `hyperf/socketio-server`.
## Optimized ## Optimized

View File

@ -27,6 +27,8 @@ use Hyperf\WebSocketServer\Sender;
*/ */
trait Emitter trait Emitter
{ {
use Flagger;
/** /**
* @var AdapterInterface * @var AdapterInterface
*/ */
@ -137,18 +139,14 @@ trait Emitter
'except' => [$this->sidProvider->getSid($this->fd)], 'except' => [$this->sidProvider->getSid($this->fd)],
'rooms' => $this->to, 'rooms' => $this->to,
'flag' => [ 'flag' => [
'compress' => $this->realGet('compress'), 'compress' => $this->compress,
'volatile' => $this->realGet('volatile'), 'volatile' => $this->volatile,
'local' => $this->realGet('local'), 'local' => $this->local,
], ],
] ]
); );
} }
if ($this->realGet('compress')) {
$wsFlag = SWOOLE_WEBSOCKET_FLAG_FIN | SWOOLE_WEBSOCKET_FLAG_COMPRESS;
} else {
$wsFlag = SWOOLE_WEBSOCKET_FLAG_FIN;
}
return make(Future::class, [ return make(Future::class, [
'fd' => $this->fd, 'fd' => $this->fd,
'event' => $event, 'event' => $event,
@ -157,7 +155,7 @@ trait Emitter
return $this->encode($i, $event, $data); return $this->encode($i, $event, $data);
}, },
'opcode' => SWOOLE_WEBSOCKET_OPCODE_TEXT, 'opcode' => SWOOLE_WEBSOCKET_OPCODE_TEXT,
'flag' => $wsFlag, 'flag' => $this->guessFlags($this->compress),
]); ]);
} }
@ -177,9 +175,4 @@ trait Emitter
]); ]);
return Engine::MESSAGE . $encoder->encode($packet); return Engine::MESSAGE . $encoder->encode($packet);
} }
private function realGet($flag)
{
return $this->{$flag};
}
} }

View File

@ -0,0 +1,32 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\SocketIOServer\Emitter;
trait Flagger
{
/**
* @return int | bool flags
*/
protected function guessFlags(bool $compress)
{
// older swoole version
if (! defined('SWOOLE_WEBSOCKET_FLAG_FIN')) {
return true;
}
if ($compress) {
return SWOOLE_WEBSOCKET_FLAG_FIN | SWOOLE_WEBSOCKET_FLAG_COMPRESS;
}
return SWOOLE_WEBSOCKET_FLAG_FIN;
}
}

View File

@ -33,7 +33,7 @@ class Future
private $data; private $data;
/** /**
* @var int * @var bool|int
*/ */
private $flag; private $flag;

View File

@ -11,11 +11,14 @@ declare(strict_types=1);
*/ */
namespace Hyperf\SocketIOServer\Room; namespace Hyperf\SocketIOServer\Room;
use Hyperf\SocketIOServer\Emitter\Flagger;
use Hyperf\SocketIOServer\SidProvider\SidProviderInterface; use Hyperf\SocketIOServer\SidProvider\SidProviderInterface;
use Hyperf\WebSocketServer\Sender; use Hyperf\WebSocketServer\Sender;
class MemoryAdapter implements AdapterInterface class MemoryAdapter implements AdapterInterface
{ {
use Flagger;
protected $rooms = []; protected $rooms = [];
protected $sids = []; protected $sids = [];
@ -72,12 +75,7 @@ class MemoryAdapter implements AdapterInterface
$except = data_get($opts, 'except', []); $except = data_get($opts, 'except', []);
$volatile = data_get($opts, 'flag.volatile', false); $volatile = data_get($opts, 'flag.volatile', false);
$compress = data_get($opts, 'flag.compress', false); $compress = data_get($opts, 'flag.compress', false);
if ($compress) { $wsFlag = $this->guessFlags((bool) $compress);
$wsFlag = SWOOLE_WEBSOCKET_FLAG_FIN | SWOOLE_WEBSOCKET_FLAG_COMPRESS;
} else {
$wsFlag = SWOOLE_WEBSOCKET_FLAG_FIN;
}
$pushed = []; $pushed = [];
if (! empty($rooms)) { if (! empty($rooms)) {
foreach ($rooms as $room) { foreach ($rooms as $room) {

View File

@ -11,9 +11,11 @@ declare(strict_types=1);
*/ */
namespace Hyperf\SocketIOServer\Room; namespace Hyperf\SocketIOServer\Room;
use Hyperf\Contract\StdoutLoggerInterface;
use Hyperf\Redis\RedisFactory; use Hyperf\Redis\RedisFactory;
use Hyperf\Redis\RedisProxy; use Hyperf\Redis\RedisProxy;
use Hyperf\Server\Exception\RuntimeException; use Hyperf\Server\Exception\RuntimeException;
use Hyperf\SocketIOServer\Emitter\Flagger;
use Hyperf\SocketIOServer\NamespaceInterface; use Hyperf\SocketIOServer\NamespaceInterface;
use Hyperf\SocketIOServer\SidProvider\SidProviderInterface; use Hyperf\SocketIOServer\SidProvider\SidProviderInterface;
use Hyperf\Utils\ApplicationContext; use Hyperf\Utils\ApplicationContext;
@ -26,6 +28,8 @@ use Redis;
class RedisAdapter implements AdapterInterface class RedisAdapter implements AdapterInterface
{ {
use Flagger;
protected $redisPrefix = 'ws'; protected $redisPrefix = 'ws';
protected $retryInterval = 1000; protected $retryInterval = 1000;
@ -74,7 +78,11 @@ class RedisAdapter implements AdapterInterface
public function del(string $sid, string ...$rooms) public function del(string $sid, string ...$rooms)
{ {
if (count($rooms) === 0) { if (count($rooms) === 0) {
$this->del($sid, ...$this->clientRooms($sid)); $clientRooms = $this->clientRooms($sid);
if (empty($clientRooms)) {
return;
}
$this->del($sid, ...$clientRooms);
$this->redis->multi(); $this->redis->multi();
$this->redis->del($this->getSidKey($sid)); $this->redis->del($this->getSidKey($sid));
$this->redis->sRem($this->getStatKey(), $sid); $this->redis->sRem($this->getStatKey(), $sid);
@ -140,12 +148,21 @@ class RedisAdapter implements AdapterInterface
Coroutine::create(function () { Coroutine::create(function () {
CoordinatorManager::get(Constants::ON_WORKER_START)->yield(); CoordinatorManager::get(Constants::ON_WORKER_START)->yield();
retry(PHP_INT_MAX, function () { retry(PHP_INT_MAX, function () {
$sub = ApplicationContext::getContainer()->get(Subscriber::class); $container = ApplicationContext::getContainer();
if ($sub) { try {
$this->mixSubscribe($sub); $sub = $container->get(Subscriber::class);
} else { if ($sub) {
// Fallback to PhpRedis, which has a very bad blocking subscribe model. $this->mixSubscribe($sub);
$this->phpRedisSubscribe(); } else {
// Fallback to PhpRedis, which has a very bad blocking subscribe model.
$this->phpRedisSubscribe();
}
} catch (\Throwable $e) {
if ($container->has(StdoutLoggerInterface::class)) {
$logger = $container->get(StdoutLoggerInterface::class);
$logger->error($this->formatThrowable($e));
}
throw $e;
} }
}, $this->retryInterval); }, $this->retryInterval);
}); });
@ -175,11 +192,8 @@ class RedisAdapter implements AdapterInterface
$except = data_get($opts, 'except', []); $except = data_get($opts, 'except', []);
$volatile = data_get($opts, 'flag.volatile', false); $volatile = data_get($opts, 'flag.volatile', false);
$compress = data_get($opts, 'flag.compress', false); $compress = data_get($opts, 'flag.compress', false);
if ($compress) { $wsFlag = $this->guessFlags((bool) $compress);
$wsFlag = SWOOLE_WEBSOCKET_FLAG_FIN | SWOOLE_WEBSOCKET_FLAG_COMPRESS;
} else {
$wsFlag = SWOOLE_WEBSOCKET_FLAG_FIN;
}
$pushed = []; $pushed = [];
if (! empty($rooms)) { if (! empty($rooms)) {
foreach ($rooms as $room) { foreach ($rooms as $room) {
@ -209,7 +223,12 @@ class RedisAdapter implements AdapterInterface
continue; continue;
} }
if ($this->isLocal($sid)) { if ($this->isLocal($sid)) {
$this->sender->push($fd, $packet, SWOOLE_WEBSOCKET_OPCODE_TEXT, $wsFlag); $this->sender->push(
$fd,
$packet,
SWOOLE_WEBSOCKET_OPCODE_TEXT,
$wsFlag
);
} }
} }
} }
@ -263,16 +282,31 @@ class RedisAdapter implements AdapterInterface
return $this->sidProvider->getFd($sid); return $this->sidProvider->getFd($sid);
} }
private function formatThrowable(\Throwable $throwable): string
{
sprintf(
"%s:%s(%s) in %s:%s\nStack trace:\n%s",
get_class($throwable),
$throwable->getMessage(),
$throwable->getCode(),
$throwable->getFile(),
$throwable->getLine(),
$throwable->getTraceAsString()
);
}
private function phpRedisSubscribe() private function phpRedisSubscribe()
{ {
$redis = $this->redis; $redis = $this->redis;
/** @var string $callback */
$callback = function ($redis, $chan, $msg) { $callback = function ($redis, $chan, $msg) {
Coroutine::create(function () use ($msg) { Coroutine::create(function () use ($msg) {
[$packet, $opts] = unserialize($msg); [$packet, $opts] = unserialize($msg);
$this->doBroadcast($packet, $opts); $this->doBroadcast($packet, $opts);
}); });
}; };
$redis->subscribe([$this->getChannelKey()], 'callback'); // cast to string because PHPStan asked so.
$redis->subscribe([$this->getChannelKey()], $callback);
} }
private function mixSubscribe(Subscriber $sub) private function mixSubscribe(Subscriber $sub)

View File

@ -94,6 +94,14 @@ class RoomAdapterTest extends AbstractTestCase
$room->broadcast('', ['rooms' => ['universe'], 'flag' => ['local' => true]]); $room->broadcast('', ['rooms' => ['universe'], 'flag' => ['local' => true]]);
$room->cleanUp(); $room->cleanUp();
$this->assertNotContains('42', $room->clientRooms('42')); $this->assertNotContains('42', $room->clientRooms('42'));
// Test empty room
try {
$room->del('non-exist');
} catch (\Throwable $t) {
$this->assertTrue(false);
}
} }
private function getRedis($options = []) private function getRedis($options = [])