mirror of
https://gitee.com/hyperf/hyperf.git
synced 2024-11-29 18:27:44 +08:00
Added Hyperf\Utils\Channel\ChannelManager
which used to manage channels.
This commit is contained in:
parent
f067d611d4
commit
f1825a9f33
@ -18,3 +18,5 @@
|
||||
## Added
|
||||
|
||||
- [#3589](https://github.com/hyperf/hyperf/pull/3589) Added DAG component.
|
||||
- [#3606](https://github.com/hyperf/hyperf/pull/3606) Added RPN component.
|
||||
- [#3629](https://github.com/hyperf/hyperf/pull/3629) Added `Hyperf\Utils\Channel\ChannelManager` which used to manage channels.
|
||||
|
62
src/utils/src/Channel/ChannelManager.php
Normal file
62
src/utils/src/Channel/ChannelManager.php
Normal file
@ -0,0 +1,62 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
/**
|
||||
* This file is part of Hyperf.
|
||||
*
|
||||
* @link https://www.hyperf.io
|
||||
* @document https://hyperf.wiki
|
||||
* @contact group@hyperf.io
|
||||
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
|
||||
*/
|
||||
namespace Hyperf\Utils\Channel;
|
||||
|
||||
use Hyperf\Engine\Channel;
|
||||
|
||||
class ChannelManager
|
||||
{
|
||||
/**
|
||||
* @var Channel[]
|
||||
*/
|
||||
protected $channels = [];
|
||||
|
||||
public function get(int $id, bool $initialize = false): ?Channel
|
||||
{
|
||||
if (isset($this->channels[$id])) {
|
||||
return $this->channels[$id];
|
||||
}
|
||||
|
||||
if ($initialize) {
|
||||
return $this->channels[$id] = $this->make(1);
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
public function make(int $limit): Channel
|
||||
{
|
||||
return new Channel($limit);
|
||||
}
|
||||
|
||||
public function close(int $id): void
|
||||
{
|
||||
if ($channel = $this->channels[$id] ?? null) {
|
||||
$channel->close();
|
||||
}
|
||||
|
||||
unset($this->channels[$id]);
|
||||
}
|
||||
|
||||
public function getChannels(): array
|
||||
{
|
||||
return $this->channels;
|
||||
}
|
||||
|
||||
public function flush(): void
|
||||
{
|
||||
$channels = $this->getChannels();
|
||||
foreach ($channels as $id => $channel) {
|
||||
$this->close($id);
|
||||
}
|
||||
}
|
||||
}
|
54
src/utils/tests/Channel/ChannelManagerTest.php
Normal file
54
src/utils/tests/Channel/ChannelManagerTest.php
Normal file
@ -0,0 +1,54 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
/**
|
||||
* This file is part of Hyperf.
|
||||
*
|
||||
* @link https://www.hyperf.io
|
||||
* @document https://hyperf.wiki
|
||||
* @contact group@hyperf.io
|
||||
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
|
||||
*/
|
||||
namespace HyperfTest\Utils\Channel;
|
||||
|
||||
use Hyperf\Engine\Channel;
|
||||
use Hyperf\Utils\Channel\ChannelManager;
|
||||
use PHPUnit\Framework\TestCase;
|
||||
|
||||
/**
|
||||
* @internal
|
||||
* @coversNothing
|
||||
*/
|
||||
class ChannelManagerTest extends TestCase
|
||||
{
|
||||
public function testChannelManager()
|
||||
{
|
||||
$manager = new ChannelManager();
|
||||
$chan = $manager->get(1, true);
|
||||
$this->assertInstanceOf(Channel::class, $chan);
|
||||
$chan = $manager->get(1);
|
||||
$this->assertInstanceOf(Channel::class, $chan);
|
||||
go(function () use ($chan) {
|
||||
usleep(10 * 1000);
|
||||
$chan->push('Hello World.');
|
||||
});
|
||||
|
||||
$this->assertSame('Hello World.', $chan->pop());
|
||||
$manager->close(1);
|
||||
$this->assertTrue($chan->isClosing());
|
||||
$this->assertNull($manager->get(1));
|
||||
}
|
||||
|
||||
public function testChannelFlush()
|
||||
{
|
||||
$manager = new ChannelManager();
|
||||
$manager->get(1, true);
|
||||
$manager->get(2, true);
|
||||
$manager->get(4, true);
|
||||
$manager->get(5, true);
|
||||
|
||||
$this->assertSame(4, count($manager->getChannels()));
|
||||
$manager->flush();
|
||||
$this->assertSame(0, count($manager->getChannels()));
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user