diff --git a/CHANGELOG.md b/CHANGELOG.md index 9035c0844..b0d8574ae 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ## Fixed - [#1696](https://github.com/hyperf/hyperf/pull/1696) Fixed `Context::copy` does not works when use keys. +- [#1708](https://github.com/hyperf/hyperf/pull/1708) [#1718](https://github.com/hyperf/hyperf/pull/1718) Fixed a series of issues for `hyperf/socketio-server`. ## Optimized diff --git a/src/socketio-server/src/Emitter/Emitter.php b/src/socketio-server/src/Emitter/Emitter.php index ce62bd63d..039d79edd 100644 --- a/src/socketio-server/src/Emitter/Emitter.php +++ b/src/socketio-server/src/Emitter/Emitter.php @@ -27,6 +27,8 @@ use Hyperf\WebSocketServer\Sender; */ trait Emitter { + use Flagger; + /** * @var AdapterInterface */ @@ -137,18 +139,14 @@ trait Emitter 'except' => [$this->sidProvider->getSid($this->fd)], 'rooms' => $this->to, 'flag' => [ - 'compress' => $this->realGet('compress'), - 'volatile' => $this->realGet('volatile'), - 'local' => $this->realGet('local'), + 'compress' => $this->compress, + 'volatile' => $this->volatile, + 'local' => $this->local, ], ] ); } - if ($this->realGet('compress')) { - $wsFlag = SWOOLE_WEBSOCKET_FLAG_FIN | SWOOLE_WEBSOCKET_FLAG_COMPRESS; - } else { - $wsFlag = SWOOLE_WEBSOCKET_FLAG_FIN; - } + return make(Future::class, [ 'fd' => $this->fd, 'event' => $event, @@ -157,7 +155,7 @@ trait Emitter return $this->encode($i, $event, $data); }, 'opcode' => SWOOLE_WEBSOCKET_OPCODE_TEXT, - 'flag' => $wsFlag, + 'flag' => $this->guessFlags($this->compress), ]); } @@ -177,9 +175,4 @@ trait Emitter ]); return Engine::MESSAGE . $encoder->encode($packet); } - - private function realGet($flag) - { - return $this->{$flag}; - } } diff --git a/src/socketio-server/src/Emitter/Flagger.php b/src/socketio-server/src/Emitter/Flagger.php new file mode 100644 index 000000000..5ef901710 --- /dev/null +++ b/src/socketio-server/src/Emitter/Flagger.php @@ -0,0 +1,32 @@ +guessFlags((bool) $compress); $pushed = []; if (! empty($rooms)) { foreach ($rooms as $room) { diff --git a/src/socketio-server/src/Room/RedisAdapter.php b/src/socketio-server/src/Room/RedisAdapter.php index e4cf3c282..d01c71636 100644 --- a/src/socketio-server/src/Room/RedisAdapter.php +++ b/src/socketio-server/src/Room/RedisAdapter.php @@ -11,9 +11,11 @@ declare(strict_types=1); */ namespace Hyperf\SocketIOServer\Room; +use Hyperf\Contract\StdoutLoggerInterface; use Hyperf\Redis\RedisFactory; use Hyperf\Redis\RedisProxy; use Hyperf\Server\Exception\RuntimeException; +use Hyperf\SocketIOServer\Emitter\Flagger; use Hyperf\SocketIOServer\NamespaceInterface; use Hyperf\SocketIOServer\SidProvider\SidProviderInterface; use Hyperf\Utils\ApplicationContext; @@ -26,6 +28,8 @@ use Redis; class RedisAdapter implements AdapterInterface { + use Flagger; + protected $redisPrefix = 'ws'; protected $retryInterval = 1000; @@ -74,7 +78,11 @@ class RedisAdapter implements AdapterInterface public function del(string $sid, string ...$rooms) { if (count($rooms) === 0) { - $this->del($sid, ...$this->clientRooms($sid)); + $clientRooms = $this->clientRooms($sid); + if (empty($clientRooms)) { + return; + } + $this->del($sid, ...$clientRooms); $this->redis->multi(); $this->redis->del($this->getSidKey($sid)); $this->redis->sRem($this->getStatKey(), $sid); @@ -140,12 +148,21 @@ class RedisAdapter implements AdapterInterface Coroutine::create(function () { CoordinatorManager::get(Constants::ON_WORKER_START)->yield(); retry(PHP_INT_MAX, function () { - $sub = ApplicationContext::getContainer()->get(Subscriber::class); - if ($sub) { - $this->mixSubscribe($sub); - } else { - // Fallback to PhpRedis, which has a very bad blocking subscribe model. - $this->phpRedisSubscribe(); + $container = ApplicationContext::getContainer(); + try { + $sub = $container->get(Subscriber::class); + if ($sub) { + $this->mixSubscribe($sub); + } else { + // Fallback to PhpRedis, which has a very bad blocking subscribe model. + $this->phpRedisSubscribe(); + } + } catch (\Throwable $e) { + if ($container->has(StdoutLoggerInterface::class)) { + $logger = $container->get(StdoutLoggerInterface::class); + $logger->error($this->formatThrowable($e)); + } + throw $e; } }, $this->retryInterval); }); @@ -175,11 +192,8 @@ class RedisAdapter implements AdapterInterface $except = data_get($opts, 'except', []); $volatile = data_get($opts, 'flag.volatile', false); $compress = data_get($opts, 'flag.compress', false); - if ($compress) { - $wsFlag = SWOOLE_WEBSOCKET_FLAG_FIN | SWOOLE_WEBSOCKET_FLAG_COMPRESS; - } else { - $wsFlag = SWOOLE_WEBSOCKET_FLAG_FIN; - } + $wsFlag = $this->guessFlags((bool) $compress); + $pushed = []; if (! empty($rooms)) { foreach ($rooms as $room) { @@ -209,7 +223,12 @@ class RedisAdapter implements AdapterInterface continue; } if ($this->isLocal($sid)) { - $this->sender->push($fd, $packet, SWOOLE_WEBSOCKET_OPCODE_TEXT, $wsFlag); + $this->sender->push( + $fd, + $packet, + SWOOLE_WEBSOCKET_OPCODE_TEXT, + $wsFlag + ); } } } @@ -263,16 +282,31 @@ class RedisAdapter implements AdapterInterface return $this->sidProvider->getFd($sid); } + private function formatThrowable(\Throwable $throwable): string + { + sprintf( + "%s:%s(%s) in %s:%s\nStack trace:\n%s", + get_class($throwable), + $throwable->getMessage(), + $throwable->getCode(), + $throwable->getFile(), + $throwable->getLine(), + $throwable->getTraceAsString() + ); + } + private function phpRedisSubscribe() { $redis = $this->redis; + /** @var string $callback */ $callback = function ($redis, $chan, $msg) { Coroutine::create(function () use ($msg) { [$packet, $opts] = unserialize($msg); $this->doBroadcast($packet, $opts); }); }; - $redis->subscribe([$this->getChannelKey()], 'callback'); + // cast to string because PHPStan asked so. + $redis->subscribe([$this->getChannelKey()], $callback); } private function mixSubscribe(Subscriber $sub) diff --git a/src/socketio-server/tests/Cases/RoomAdapterTest.php b/src/socketio-server/tests/Cases/RoomAdapterTest.php index 47a51d188..41cb2f074 100644 --- a/src/socketio-server/tests/Cases/RoomAdapterTest.php +++ b/src/socketio-server/tests/Cases/RoomAdapterTest.php @@ -94,6 +94,14 @@ class RoomAdapterTest extends AbstractTestCase $room->broadcast('', ['rooms' => ['universe'], 'flag' => ['local' => true]]); $room->cleanUp(); $this->assertNotContains('42', $room->clientRooms('42')); + + // Test empty room + try { + $room->del('non-exist'); + } catch (\Throwable $t) { + $this->assertTrue(false); + } + } private function getRedis($options = [])