mirror of
https://gitee.com/hyperf/hyperf.git
synced 2024-12-04 20:58:13 +08:00
Merge pull request #1835 from Reasno/socketio
feat: add the ability to dismiss a room
This commit is contained in:
commit
ebae64ac96
@ -13,8 +13,12 @@ namespace Hyperf\SocketIOServer;
|
||||
|
||||
use Hyperf\SocketIOServer\Collector\SocketIORouter;
|
||||
use Hyperf\SocketIOServer\Emitter\Emitter;
|
||||
use Hyperf\SocketIOServer\Parser\Encoder;
|
||||
use Hyperf\SocketIOServer\Parser\Engine;
|
||||
use Hyperf\SocketIOServer\Parser\Packet;
|
||||
use Hyperf\SocketIOServer\Room\AdapterInterface;
|
||||
use Hyperf\SocketIOServer\SidProvider\SidProviderInterface;
|
||||
use Hyperf\Utils\ApplicationContext;
|
||||
use Hyperf\WebSocketServer\Sender;
|
||||
|
||||
class BaseNamespace implements NamespaceInterface
|
||||
@ -63,6 +67,22 @@ class BaseNamespace implements NamespaceInterface
|
||||
*/
|
||||
public function getNamespace(): string
|
||||
{
|
||||
return SocketIORouter::getNamespace(static::class);
|
||||
return (string) SocketIORouter::getNamespace(static::class);
|
||||
}
|
||||
|
||||
/**
|
||||
* Kick off a client from room, possibly remotely.
|
||||
*/
|
||||
public function dismiss(string $roomId)
|
||||
{
|
||||
$closePacket = Packet::create([
|
||||
'type' => Packet::CLOSE,
|
||||
'nsp' => $this->getNamespace(),
|
||||
]);
|
||||
$encoder = ApplicationContext::getContainer()->get(Encoder::class);
|
||||
$this->adapter->broadcast(
|
||||
Engine::MESSAGE . $encoder->encode($closePacket),
|
||||
['room' => $roomId, 'flag' => ['close' => true]]
|
||||
);
|
||||
}
|
||||
}
|
||||
|
@ -25,6 +25,7 @@ use Hyperf\Utils\Coroutine;
|
||||
use Hyperf\WebSocketServer\Sender;
|
||||
use Mix\Redis\Subscribe\Subscriber;
|
||||
use Redis;
|
||||
use Swoole\Server;
|
||||
|
||||
class RedisAdapter implements AdapterInterface
|
||||
{
|
||||
@ -185,47 +186,18 @@ class RedisAdapter implements AdapterInterface
|
||||
protected function doBroadcast($packet, $opts)
|
||||
{
|
||||
$rooms = data_get($opts, 'rooms', []);
|
||||
$except = data_get($opts, 'except', []);
|
||||
$volatile = data_get($opts, 'flag.volatile', false);
|
||||
$compress = data_get($opts, 'flag.compress', false);
|
||||
$wsFlag = $this->guessFlags((bool) $compress);
|
||||
|
||||
$pushed = [];
|
||||
if (! empty($rooms)) {
|
||||
foreach ($rooms as $room) {
|
||||
$sids = $this->redis->sMembers($this->getRoomKey($room));
|
||||
foreach ($sids as $sid) {
|
||||
$fd = $this->getFd($sid);
|
||||
if (in_array($sid, $except)) {
|
||||
continue;
|
||||
}
|
||||
if ($this->isLocal($sid)) {
|
||||
$this->sender->push(
|
||||
$fd,
|
||||
$packet,
|
||||
SWOOLE_WEBSOCKET_OPCODE_TEXT,
|
||||
$wsFlag
|
||||
);
|
||||
$pushed[$fd] = true;
|
||||
}
|
||||
$this->tryPush($sid, $packet, $pushed, $opts);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
$sids = $this->redis->sMembers($this->getStatKey());
|
||||
|
||||
foreach ($sids as $sid) {
|
||||
$fd = $this->getFd($sid);
|
||||
if (in_array($sid, $except)) {
|
||||
continue;
|
||||
}
|
||||
if ($this->isLocal($sid)) {
|
||||
$this->sender->push(
|
||||
$fd,
|
||||
$packet,
|
||||
SWOOLE_WEBSOCKET_OPCODE_TEXT,
|
||||
$wsFlag
|
||||
);
|
||||
}
|
||||
$this->tryPush($sid, $packet, $pushed, $opts);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -278,6 +250,27 @@ class RedisAdapter implements AdapterInterface
|
||||
return $this->sidProvider->getFd($sid);
|
||||
}
|
||||
|
||||
private function tryPush(string $sid, string $packet, array &$pushed, array $opts): void
|
||||
{
|
||||
$compress = data_get($opts, 'flag.compress', false);
|
||||
$wsFlag = $this->guessFlags((bool) $compress);
|
||||
$except = data_get($opts, 'except', []);
|
||||
$fd = $this->getFd($sid);
|
||||
if (in_array($sid, $except)) {
|
||||
return;
|
||||
}
|
||||
if ($this->isLocal($sid) && ! isset($pushed[$fd])) {
|
||||
$this->sender->push(
|
||||
$fd,
|
||||
$packet,
|
||||
SWOOLE_WEBSOCKET_OPCODE_TEXT,
|
||||
$wsFlag
|
||||
);
|
||||
$pushed[$fd] = true;
|
||||
$this->shouldClose($opts) && $this->close($fd);
|
||||
}
|
||||
}
|
||||
|
||||
private function formatThrowable(\Throwable $throwable): string
|
||||
{
|
||||
sprintf(
|
||||
@ -313,7 +306,7 @@ class RedisAdapter implements AdapterInterface
|
||||
return;
|
||||
}
|
||||
Coroutine::create(function () use ($sub) {
|
||||
CoordinatorManager::get(Constants::ON_WORKER_EXIT)->yield();
|
||||
CoordinatorManager::until(Constants::WORKER_EXIT)->yield();
|
||||
$sub->close();
|
||||
});
|
||||
while (true) {
|
||||
@ -331,4 +324,15 @@ class RedisAdapter implements AdapterInterface
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private function shouldClose(array $opts)
|
||||
{
|
||||
return data_get($opts, 'flag.close', false);
|
||||
}
|
||||
|
||||
private function close(int $fd)
|
||||
{
|
||||
// Sender should be able to disconnect fd in the future. For now we have to use server.
|
||||
ApplicationContext::getContainer()->get(Server::class)->disconnect($fd);
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user