add the ability to dismiss a room.

This commit is contained in:
Reasno 2020-05-29 16:01:29 +08:00
parent fd3cc3c5ae
commit 9ec5cb6302
2 changed files with 57 additions and 33 deletions

View File

@ -13,8 +13,12 @@ namespace Hyperf\SocketIOServer;
use Hyperf\SocketIOServer\Collector\SocketIORouter;
use Hyperf\SocketIOServer\Emitter\Emitter;
use Hyperf\SocketIOServer\Parser\Encoder;
use Hyperf\SocketIOServer\Parser\Engine;
use Hyperf\SocketIOServer\Parser\Packet;
use Hyperf\SocketIOServer\Room\AdapterInterface;
use Hyperf\SocketIOServer\SidProvider\SidProviderInterface;
use Hyperf\Utils\ApplicationContext;
use Hyperf\WebSocketServer\Sender;
class BaseNamespace implements NamespaceInterface
@ -63,6 +67,22 @@ class BaseNamespace implements NamespaceInterface
*/
public function getNamespace(): string
{
return SocketIORouter::getNamespace(static::class);
return (string) SocketIORouter::getNamespace(static::class);
}
/**
* Kick off a client from room, possibly remotely.
*/
public function dismiss(string $roomId)
{
$closePacket = Packet::create([
'type' => Packet::CLOSE,
'nsp' => $this->getNamespace(),
]);
$encoder = ApplicationContext::getContainer()->get(Encoder::class);
$this->adapter->broadcast(
Engine::MESSAGE . $encoder->encode($closePacket),
['room' => $roomId, 'flag' => ['close' => true]]
);
}
}

View File

@ -25,6 +25,7 @@ use Hyperf\Utils\Coroutine;
use Hyperf\WebSocketServer\Sender;
use Mix\Redis\Subscribe\Subscriber;
use Redis;
use Swoole\Server;
class RedisAdapter implements AdapterInterface
{
@ -185,47 +186,18 @@ class RedisAdapter implements AdapterInterface
protected function doBroadcast($packet, $opts)
{
$rooms = data_get($opts, 'rooms', []);
$except = data_get($opts, 'except', []);
$volatile = data_get($opts, 'flag.volatile', false);
$compress = data_get($opts, 'flag.compress', false);
$wsFlag = $this->guessFlags((bool) $compress);
$pushed = [];
if (! empty($rooms)) {
foreach ($rooms as $room) {
$sids = $this->redis->sMembers($this->getRoomKey($room));
foreach ($sids as $sid) {
$fd = $this->getFd($sid);
if (in_array($sid, $except)) {
continue;
}
if ($this->isLocal($sid)) {
$this->sender->push(
$fd,
$packet,
SWOOLE_WEBSOCKET_OPCODE_TEXT,
$wsFlag
);
$pushed[$fd] = true;
}
$this->tryPush($sid, $packet, $pushed, $opts);
}
}
} else {
$sids = $this->redis->sMembers($this->getStatKey());
foreach ($sids as $sid) {
$fd = $this->getFd($sid);
if (in_array($sid, $except)) {
continue;
}
if ($this->isLocal($sid)) {
$this->sender->push(
$fd,
$packet,
SWOOLE_WEBSOCKET_OPCODE_TEXT,
$wsFlag
);
}
$this->tryPush($sid, $packet, $pushed, $opts);
}
}
}
@ -278,6 +250,27 @@ class RedisAdapter implements AdapterInterface
return $this->sidProvider->getFd($sid);
}
private function tryPush(string $sid, string $packet, array &$pushed, array $opts): void
{
$compress = data_get($opts, 'flag.compress', false);
$wsFlag = $this->guessFlags((bool) $compress);
$except = data_get($opts, 'except', []);
$fd = $this->getFd($sid);
if (in_array($sid, $except)) {
return;
}
if ($this->isLocal($sid) && ! isset($pushed[$fd])) {
$this->sender->push(
$fd,
$packet,
SWOOLE_WEBSOCKET_OPCODE_TEXT,
$wsFlag
);
$pushed[$fd] = true;
$this->shouldClose($opts) && $this->close($fd);
}
}
private function formatThrowable(\Throwable $throwable): string
{
sprintf(
@ -313,7 +306,7 @@ class RedisAdapter implements AdapterInterface
return;
}
Coroutine::create(function () use ($sub) {
CoordinatorManager::get(Constants::ON_WORKER_EXIT)->yield();
CoordinatorManager::until(Constants::WORKER_EXIT)->yield();
$sub->close();
});
while (true) {
@ -331,4 +324,15 @@ class RedisAdapter implements AdapterInterface
});
}
}
private function shouldClose(array $opts)
{
return data_get($opts, 'flag.close', false);
}
private function close(int $fd)
{
// Sender should be able to disconnect fd in the future. For now we have to use server.
ApplicationContext::getContainer()->get(Server::class)->disconnect($fd);
}
}