This commit is contained in:
张城铭 2019-03-11 09:29:04 +08:00
commit 165b57c652
17 changed files with 174 additions and 238 deletions

View File

@ -13,7 +13,6 @@ declare(strict_types=1);
namespace Hyperf\Amqp\Annotation;
use Hyperf\Di\Annotation\AbstractAnnotation;
use Hyperf\Di\Annotation\AnnotationCollector;
/**
* @Annotation

View File

@ -15,7 +15,6 @@ namespace Hyperf\Amqp\Pool;
use Hyperf\Pool\Pool;
use Hyperf\Utils\Arr;
use Hyperf\Amqp\Connection;
use Hyperf\Pool\PoolOption;
use InvalidArgumentException;
use Hyperf\Contract\ConfigInterface;
use Psr\Container\ContainerInterface;
@ -37,8 +36,9 @@ class AmqpConnectionPool extends Pool
}
$this->config = $config->get($key);
$options = Arr::get($this->config, 'pool', []);
parent::__construct($container);
parent::__construct($container, $options);
}
public function getName(): string
@ -51,24 +51,13 @@ class AmqpConnectionPool extends Pool
parent::release($connection);
}
protected function initOption(): void
{
if ($poolOptions = Arr::get($this->config, 'pool')) {
$option = new PoolOption();
$option->setMinConnections($poolOptions['min_connections'] ?? 1);
$option->setMaxConnections($poolOptions['max_connections'] ?? 10);
$option->setConnectTimeout($poolOptions['connect_timeout'] ?? 10.0);
$option->setWaitTimeout($poolOptions['wait_timeout'] ?? 3.0);
$option->setHeartbeat($poolOptions['heartbeat'] ?? -1);
$this->option = $option;
} else {
parent::initOption();
}
}
protected function createConnection(): ConnectionInterface
{
return new Connection($this->container, $this, $this->config);
}
protected function getConnectionId(): string
{
return static::class . '.' . $this->getName();
}
}

View File

@ -47,11 +47,6 @@ class Connection extends BaseConnection implements ConnectionInterface, DbConnec
*/
protected $config;
/**
* @var Context
*/
protected $context;
/**
* @var float
*/
@ -64,7 +59,6 @@ class Connection extends BaseConnection implements ConnectionInterface, DbConnec
parent::__construct($container, $pool);
$this->factory = $container->get(ConnectionFactory::class);
$this->config = $config;
$this->context = $container->get(Context::class);
$this->reconnect();
}

View File

@ -54,19 +54,9 @@ class ConnectionResolver implements ConnectionResolverInterface
$name = $this->getDefaultConnection();
}
$context = $this->container->get(Context::class);
$connection = $context->connection($name);
if ($connection) {
return $connection->getConnection();
}
$pool = $this->factory->getPool($name);
$connection = $pool->get();
$context->set($name, $connection);
return $connection->getConnection();
return $pool->get()->getConnection();
}
/**

View File

@ -1,74 +0,0 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://hyperf.org
* @document https://wiki.hyperf.org
* @contact group@hyperf.org
* @license https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE
*/
namespace Hyperf\DbConnection;
use Psr\Container\ContainerInterface;
use Hyperf\Contract\ConnectionInterface;
use Hyperf\Contract\StdoutLoggerInterface;
use Hyperf\Utils\Context as RequestContext;
class Context
{
/**
* @var ContainerInterface
*/
protected $container;
/**
* @var StdoutLoggerInterface
*/
protected $logger;
public function __construct(ContainerInterface $container)
{
$this->container = $container;
$this->logger = $container->get(StdoutLoggerInterface::class);
}
/**
* Get a connection from request context.
*/
public function connection(string $name): ?ConnectionInterface
{
$connections = [];
if (RequestContext::has('databases')) {
$connections = RequestContext::get('databases');
}
if (isset($connections[$name]) && $connections[$name] instanceof ConnectionInterface) {
return $connections[$name];
}
return null;
}
/**
* @return ConnectionInterface[]
*/
public function connections(): array
{
$connections = [];
if (RequestContext::has('databases')) {
$connections = RequestContext::get('databases');
}
return $connections;
}
public function set($name, ConnectionInterface $connection): void
{
$connections = $this->connections();
$connections[$name] = $connection;
RequestContext::set('databases', $connections);
}
}

View File

@ -14,7 +14,7 @@ namespace Hyperf\DbConnection\Pool;
use Hyperf\Pool\Pool;
use Hyperf\Utils\Arr;
use Hyperf\Pool\PoolOption;
use Hyperf\Pool\Context;
use Hyperf\DbConnection\Connection;
use Hyperf\Contract\ConfigInterface;
use Psr\Container\ContainerInterface;
@ -36,8 +36,11 @@ class DbPool extends Pool
}
$this->config = $config->get($key);
$options = Arr::get($this->config, 'pool', []);
parent::__construct($container);
parent::__construct($container, $options);
$this->context = make(Context::class, ['name' => $this->name]);
}
public function getName(): string
@ -45,24 +48,13 @@ class DbPool extends Pool
return $this->name;
}
protected function initOption(): void
{
if ($poolOptions = Arr::get($this->config, 'pool')) {
$option = new PoolOption();
$option->setMinConnections($poolOptions['min_connections'] ?? 1);
$option->setMaxConnections($poolOptions['max_connections'] ?? 10);
$option->setConnectTimeout($poolOptions['connect_timeout'] ?? 10.0);
$option->setWaitTimeout($poolOptions['wait_timeout'] ?? 3.0);
$option->setHeartbeat($poolOptions['heartbeat'] ?? -1);
$this->option = $option;
} else {
parent::initOption();
}
}
protected function createConnection(): ConnectionInterface
{
return new Connection($this->container, $this, $this->config);
}
protected function getConnectionId(): string
{
return static::class . '.' . $this->getName();
}
}

View File

@ -34,17 +34,12 @@ class Inject extends AbstractAnnotation
public function __construct($value = null)
{
parent::__construct($value);
$this->docReader = new PhpDocReader();
$this->docReader = make(PhpDocReader::class);
}
/**
* {@inheritdoc}
*/
public function collectProperty(string $className, ?string $target): void
{
if ($this->value !== null) {
$this->value = $this->docReader->getPropertyClass(ReflectionManager::reflectClass($className)->getProperty($target));
AnnotationCollector::collectProperty($className, $target, static::class, $this);
}
$this->value = $this->docReader->getPropertyClass(ReflectionManager::reflectClass($className)->getProperty($target));
AnnotationCollector::collectProperty($className, $target, static::class, $this);
}
}

View File

@ -12,7 +12,9 @@ declare(strict_types=1);
namespace Hyperf\ModelCache;
use Hyperf\Database\Model\Model;
use Hyperf\Utils\ApplicationContext;
use Hyperf\Database\Model\Collection;
trait Cacheable
{
@ -21,7 +23,7 @@ trait Cacheable
* @param mixed $id
* @return null|self
*/
public static function findFromCache($id)
public static function findFromCache($id): ?Model
{
$container = ApplicationContext::getContainer();
$manager = $container->get(Manager::class);
@ -34,7 +36,7 @@ trait Cacheable
* @param mixed $ids
* @return \Hyperf\Database\Model\Collection
*/
public static function findManyFromCache($ids)
public static function findManyFromCache($ids): Collection
{
$container = ApplicationContext::getContainer();
$manager = $container->get(Manager::class);
@ -46,7 +48,7 @@ trait Cacheable
* Delete model from cache.
* @return bool
*/
public function deleteCache()
public function deleteCache(): bool
{
$manager = $this->getContainer()->get(Manager::class);

View File

@ -0,0 +1,25 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://hyperf.org
* @document https://wiki.hyperf.org
* @contact group@hyperf.org
* @license https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE
*/
namespace Hyperf\ModelCache;
use Hyperf\Database\Model\Model;
use Hyperf\Database\Model\Collection;
interface CacheableInterface
{
public static function findFromCache($id): ?Model;
public static function findManyFromCache($ids): Collection;
public function deleteCache(): bool;
}

View File

@ -18,6 +18,7 @@ use Hyperf\Database\Model\Events\Saved;
use Hyperf\Database\Model\Events\Created;
use Hyperf\Database\Model\Events\Deleted;
use Hyperf\Database\Model\Events\Updated;
use Hyperf\ModelCache\CacheableInterface;
use Hyperf\Event\Contract\ListenerInterface;
/**
@ -39,7 +40,7 @@ class DeleteCacheListener implements ListenerInterface
{
if ($event instanceof Event) {
$model = $event->getModel();
if (method_exists($model, 'deleteCache')) {
if ($model instanceof CacheableInterface) {
$model->deleteCache();
}
}

View File

@ -41,8 +41,8 @@ abstract class Connection implements ConnectionInterface
public function release(): void
{
if (! $this->release) {
$this->pool->release($this);
$this->release = true;
$this->pool->release($this);
}
}

View File

@ -1,66 +0,0 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://hyperf.org
* @document https://wiki.hyperf.org
* @contact group@hyperf.org
* @license https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Pool;
use Swoole\Coroutine\Channel;
use Hyperf\Pool\Exception\InvalidArgumentException;
abstract class ConnectionPool
{
/**
* @var array
*/
protected static $container = [];
/**
* @var array
*/
protected static $options = [];
abstract protected static function createConnection(): ConnectionInterface;
protected static function getConnection(string $key, int $timeout)
{
return static::getPool($key)->pop($timeout);
}
protected static function releaseConnection(string $key, $connection)
{
return static::getPool($key)->push($connection);
}
protected static function initPool(string $key, PoolOption $option)
{
if (! $option->getMaxConnections() > 0) {
throw new InvalidArgumentException('Missing max connections of option.');
}
$channel = new Channel($option->getMaxConnections());
if ($option->getMinConnections() > 0) {
for ($i = 0; $i < $option->getMinConnections(); ++$i) {
$channel->push(static::createConnection());
}
}
static::$container[$key] = $channel;
static::$options[$key] = $option;
}
protected static function getPool(string $key): Channel
{
return static::$container[$key];
}
protected static function removePool(string $key)
{
unset(static::$container[$key]);
}
}

60
src/pool/src/Context.php Normal file
View File

@ -0,0 +1,60 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://hyperf.org
* @document https://wiki.hyperf.org
* @contact group@hyperf.org
* @license https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Pool;
use Psr\Container\ContainerInterface;
use Hyperf\Contract\ConnectionInterface;
use Hyperf\Contract\StdoutLoggerInterface;
use Hyperf\Utils\Context as RequestContext;
class Context
{
/**
* @var ContainerInterface
*/
protected $container;
/**
* @var StdoutLoggerInterface
*/
protected $logger;
/**
* @var string
*/
protected $name;
public function __construct(ContainerInterface $container, string $name)
{
$this->container = $container;
$this->logger = $container->get(StdoutLoggerInterface::class);
$this->name = $name;
}
/**
* Get a connection from request context.
*/
public function connection(): ?ConnectionInterface
{
if (RequestContext::has($this->name)) {
return RequestContext::get($this->name);
}
return null;
}
public function set(ConnectionInterface $connection): void
{
RequestContext::set($this->name, $connection);
}
}

View File

@ -32,33 +32,44 @@ abstract class Pool implements PoolInterface
*/
protected $container;
/**
* @var string
*/
protected $optionName = PoolOption::class;
/**
* @var PoolOptionInterface
*/
protected $option;
/**
* @var Context
*/
protected $context;
/**
* @var int
*/
protected $currentConnections = 0;
public function __construct(ContainerInterface $container)
public function __construct(ContainerInterface $container, array $config = [])
{
$this->container = $container;
$this->initOption();
$this->initOption($config);
$this->channel = new Channel($this->option->getMaxConnections());
$this->channel = make(Channel::class, ['size' => $this->option->getMaxConnections()]);
}
public function get(): ConnectionInterface
{
if ($this->context instanceof Context) {
$connection = $this->context->connection();
if ($connection) {
return $connection;
}
}
$connection = $this->getConnection();
if ($this->context instanceof Context) {
$this->context->set($connection);
}
if (Coroutine::inCoroutine()) {
// Release the connecion before the current coroutine end.
defer(function () use ($connection) {
@ -95,9 +106,15 @@ abstract class Pool implements PoolInterface
return $this->channel->length();
}
protected function initOption(): void
protected function initOption(array $options = []): void
{
$this->option = $this->container->get($this->optionName);
$this->option = make(PoolOption::class, [
'minConnections' => $options['min_connections'] ?? 1,
'maxConnections' => $options['max_connections'] ?? 10,
'connectTimeout' => $options['connect_timeout'] ?? 10.0,
'waitTimeout' => $options['wait_timeout'] ?? 3.0,
'heartbeat' => $options['heartbeat'] ?? -1,
]);
}
abstract protected function createConnection(): ConnectionInterface;

View File

@ -58,6 +58,15 @@ class PoolOption implements PoolOptionInterface
*/
private $heartbeat = -1;
public function __construct(int $minConnections, int $maxConnections, float $connectTimeout, float $waitTimeout, float $heartbeat)
{
$this->minConnections = $minConnections;
$this->maxConnections = $maxConnections;
$this->connectTimeout = $connectTimeout;
$this->waitTimeout = $waitTimeout;
$this->heartbeat = $heartbeat;
}
public function getMaxConnections(): int
{
return $this->maxConnections;

View File

@ -14,7 +14,7 @@ namespace Hyperf\Redis\Pool;
use Hyperf\Pool\Pool;
use Hyperf\Utils\Arr;
use Hyperf\Pool\PoolOption;
use Hyperf\Pool\Context;
use Hyperf\Redis\RedisConnection;
use Hyperf\Contract\ConfigInterface;
use Psr\Container\ContainerInterface;
@ -36,27 +36,28 @@ class RedisPool extends Pool
}
$this->config = $config->get($key);
parent::__construct($container);
$options = Arr::get($this->config, 'pool', []);
parent::__construct($container, $options);
$this->context = make(Context::class, ['name' => $this->getConnectionId()]);
}
protected function initOption(): void
/**
* @return string
*/
public function getName(): string
{
if ($poolOptions = Arr::get($this->config, 'pool')) {
$option = new PoolOption();
$option->setMinConnections($poolOptions['min_connections'] ?? 1);
$option->setMaxConnections($poolOptions['max_connections'] ?? 10);
$option->setConnectTimeout($poolOptions['connect_timeout'] ?? 10.0);
$option->setWaitTimeout($poolOptions['wait_timeout'] ?? 3.0);
$option->setHeartbeat($poolOptions['heartbeat'] ?? -1);
$this->option = $option;
} else {
parent::initOption();
}
return $this->name;
}
protected function createConnection(): ConnectionInterface
{
return new RedisConnection($this->container, $this, $this->config);
}
protected function getConnectionId(): string
{
return static::class . '.' . $this->getName();
}
}

View File

@ -22,23 +22,25 @@ class Redis
*/
protected $container;
/**
* @var PoolFactory
*/
protected $factory;
protected $name = 'default';
public function __construct(ContainerInterface $container)
{
$this->container = $container;
$this->factory = $container->get(PoolFactory::class);
}
public function __call($name, $arguments)
{
$factory = $this->container->get(PoolFactory::class);
$pool = $factory->getPool($this->name);
$pool = $this->factory->getPool($this->name);
$connection = $pool->get()->getConnection();
// TODO: Handle multi ...
$res = $connection->{$name}(...$arguments);
$connection->release();
return $res;
return $connection->{$name}(...$arguments);
}
}