Fixed bug for mysql connection work not expect in not-coroutine.

This commit is contained in:
李铭昕 2019-01-19 11:01:58 +08:00
parent 5618db39bd
commit 209b28cd43
8 changed files with 193 additions and 37 deletions

View File

@ -13,13 +13,29 @@ namespace Hyperf\Contract;
interface ConnectionInterface
{
/**
* Get the real connection from pool.
* @return mixed
*/
public function getConnection();
/**
* Reconnect the connection.
*/
public function reconnect(): bool;
/**
* Check the connection is valid.
*/
public function check(): bool;
/**
* Close the connection.
*/
public function close(): bool;
/**
* Release the connection to pool.
*/
public function release(): void;
}

View File

@ -14,16 +14,21 @@ namespace Hyperf\DbConnection;
use Hyperf\Contract\ConnectionInterface;
use Hyperf\Database\ConnectionInterface as DbConnectionInterface;
use Hyperf\Database\Connectors\ConnectionFactory;
use Hyperf\DbConnection\Pool\DbPool;
use Hyperf\DbConnection\Traits\DbConnection;
use Hyperf\Pool\Connection as BaseConnection;
use Hyperf\Pool\Exception\ConnectionException;
use Hyperf\Pool\Pool;
use Psr\Container\ContainerInterface;
class Connection extends BaseConnection implements ConnectionInterface, DbConnectionInterface
{
use DbConnection;
/**
* @var DbPool
*/
protected $pool;
/**
* @var DbConnectionInterface
*/
@ -39,16 +44,24 @@ class Connection extends BaseConnection implements ConnectionInterface, DbConnec
*/
protected $config;
/**
* @var Context
*/
protected $context;
/**
* @var float
*/
protected $lastUseTime = 0.0;
public function __construct(ContainerInterface $container, Pool $pool, array $config)
protected $transaction = false;
public function __construct(ContainerInterface $container, DbPool $pool, array $config)
{
parent::__construct($container, $pool);
$this->factory = $container->get(ConnectionFactory::class);
$this->config = $config;
$this->context = $container->get(Context::class);
$this->reconnect();
}
@ -99,4 +112,14 @@ class Connection extends BaseConnection implements ConnectionInterface, DbConnec
{
parent::release();
}
public function setTransaction(bool $transaction): void
{
$this->transaction = $transaction;
}
public function isTransaction(): bool
{
return $this->transaction;
}
}

View File

@ -14,7 +14,6 @@ namespace Hyperf\DbConnection;
use Hyperf\Database\ConnectionInterface;
use Hyperf\Database\ConnectionResolverInterface;
use Hyperf\DbConnection\Pool\PoolFactory;
use Hyperf\Utils\Context;
use Psr\Container\ContainerInterface;
class ConnectionResolver implements ConnectionResolverInterface
@ -54,19 +53,17 @@ class ConnectionResolver implements ConnectionResolverInterface
$name = $this->getDefaultConnection();
}
$connections = [];
if (Context::has('databases')) {
$connections = Context::get('databases');
}
if (isset($connections[$name])) {
return $connections[$name];
$context = $this->container->get(Context::class);
$connection = $context->connection($name);
if ($connection) {
return $connection;
}
$pool = $this->factory->getDbPool($name);
$connection = $pool->get()->getConnection();
$connections[$name] = $connection;
Context::set('databases', $connections);
$context->set($name, $connection);
return $connection;
}

View File

@ -0,0 +1,84 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://hyperf.org
* @document https://wiki.hyperf.org
* @contact group@hyperf.org
* @license https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE
*/
namespace Hyperf\DbConnection;
use Hyperf\Database\ConnectionInterface;
use Hyperf\Framework\Contract\StdoutLoggerInterface;
use Hyperf\Utils\Context as RequestContext;
use Hyperf\Contract\ConnectionInterface as PoolConnectionInterface;
use Psr\Container\ContainerInterface;
class Context
{
/**
* @var ContainerInterface
*/
protected $container;
/**
* @var StdoutLoggerInterface
*/
protected $logger;
public function __construct(ContainerInterface $container)
{
$this->container = $container;
$this->logger = $container->get(StdoutLoggerInterface::class);
}
/**
* Get a connection from request context.
*/
public function connection(string $name): ?ConnectionInterface
{
$connections = [];
if (RequestContext::has('databases')) {
$connections = RequestContext::get('databases');
}
if (isset($connections[$name]) && $connections[$name] instanceof ConnectionInterface) {
$connection = $connections[$name];
if (! $connection instanceof PoolConnectionInterface) {
$this->logger->warning(sprintf(
'Connection[] is not instanceof %s',
get_class($connection),
PoolConnectionInterface::class
));
return $connection;
}
return $connection->getConnection();
}
return null;
}
/**
* @return ConnectionInterface[]
*/
public function connections(): array
{
$connections = [];
if (RequestContext::has('databases')) {
$connections = RequestContext::get('databases');
}
return $connections;
}
public function set($name, ConnectionInterface $connection): void
{
$connections = $this->connections();
$connections[$name] = $connection;
RequestContext::set('databases', $connections);
}
}

View File

@ -12,16 +12,41 @@ declare(strict_types=1);
namespace Hyperf\DbConnection\Listeners;
use Hyperf\Contract\ConnectionInterface;
use Hyperf\DbConnection\Connection;
use Hyperf\DbConnection\Context;
use Hyperf\Event\Annotation\Listener;
use Hyperf\Event\Contract\ListenerInterface;
use Hyperf\Framework\Contract\StdoutLoggerInterface;
use Hyperf\HttpServer\Event\AfterResponse;
use Hyperf\Utils\Context;
use Psr\Container\ContainerInterface;
/**
* @Listener
*/
class AfterResponseListener implements ListenerInterface
{
/**
* @var ContainerInterface
*/
protected $container;
/**
* @var Context
*/
protected $context;
/**
* @var StdoutLoggerInterface
*/
protected $logger;
public function __construct(ContainerInterface $container)
{
$this->container = $container;
$this->context = $container->get(Context::class);
$this->logger = $container->get(StdoutLoggerInterface::class);
}
public function listen(): array
{
return [
@ -31,13 +56,15 @@ class AfterResponseListener implements ListenerInterface
public function process(object $event)
{
if (Context::has('databases')) {
$dbs = Context::get('databases');
foreach ($dbs as $conn) {
$connections = $this->context->connections();
foreach ($connections as $conn) {
if ($conn instanceof ConnectionInterface) {
if ($conn instanceof Connection && $conn->isTransaction()) {
$conn->rollBack();
$this->logger->error('The Mysql Connection forget commit or rollback.');
}
$conn->release();
}
}
}
}
}

View File

@ -36,6 +36,11 @@ class DbPool extends Pool
parent::__construct($container);
}
public function getName(): string
{
return $this->name;
}
protected function createConnection(): ConnectionInterface
{
return new Connection($this->container, $this, $this->config);

View File

@ -23,91 +23,94 @@ trait DbConnection
{
public function table($table): Builder
{
return $this->connection->table($table);
return $this->__call(__FUNCTION__, func_get_args());
}
public function raw($value): Expression
{
return $this->connection->table($value);
return $this->__call(__FUNCTION__, func_get_args());
}
public function selectOne($query, $bindings = [], $useReadPdo = true)
{
return $this->connection->selectOne($query, $bindings, $useReadPdo);
return $this->__call(__FUNCTION__, func_get_args());
}
public function select($query, $bindings = [], $useReadPdo = true): array
{
return $this->connection->select($query, $bindings, $useReadPdo);
return $this->__call(__FUNCTION__, func_get_args());
}
public function cursor($query, $bindings = [], $useReadPdo = true): Generator
{
return $this->connection->cursor($query, $bindings, $useReadPdo);
return $this->__call(__FUNCTION__, func_get_args());
}
public function insert($query, $bindings = []): bool
{
return $this->connection->insert($query, $bindings);
return $this->__call(__FUNCTION__, func_get_args());
}
public function update($query, $bindings = []): int
{
return $this->connection->update($query, $bindings);
return $this->__call(__FUNCTION__, func_get_args());
}
public function delete($query, $bindings = []): int
{
return $this->connection->delete($query, $bindings);
return $this->__call(__FUNCTION__, func_get_args());
}
public function statement($query, $bindings = []): bool
{
return $this->connection->statement($query, $bindings);
return $this->__call(__FUNCTION__, func_get_args());
}
public function affectingStatement($query, $bindings = []): int
{
return $this->connection->affectingStatement($query, $bindings);
return $this->__call(__FUNCTION__, func_get_args());
}
public function unprepared($query): bool
{
return $this->connection->unprepared($query);
return $this->__call(__FUNCTION__, func_get_args());
}
public function prepareBindings(array $bindings): array
{
return $this->connection->prepareBindings($bindings);
return $this->__call(__FUNCTION__, func_get_args());
}
public function transaction(Closure $callback, $attempts = 1)
{
return $this->connection->transaction($callback, $attempts);
return $this->__call(__FUNCTION__, func_get_args());
}
public function beginTransaction(): void
{
$this->connection->beginTransaction();
$this->setTransaction(true);
$this->__call(__FUNCTION__, func_get_args());
}
public function commit(): void
{
$this->connection->commit();
$this->setTransaction(false);
$this->__call(__FUNCTION__, func_get_args());
}
public function rollBack(): void
{
$this->connection->rollBack();
$this->setTransaction(false);
$this->__call(__FUNCTION__, func_get_args());
}
public function transactionLevel(): int
{
return $this->connection->transactionLevel();
return $this->__call(__FUNCTION__, func_get_args());
}
public function pretend(Closure $callback): array
{
return $this->connection->pretend($callback);
return $this->__call(__FUNCTION__, func_get_args());
}
}

View File

@ -11,7 +11,8 @@
"support": {
},
"require": {
"php": ">=7.2"
"php": ">=7.2",
"hyperf/utils": "dev-master"
},
"require-dev": {
"malukenho/docheader": "^0.1.6",