Fixed bug that rpc client recv failed. (#4993)

This commit is contained in:
李铭昕 2022-08-06 20:40:02 +08:00 committed by GitHub
parent 7887c13578
commit dcfe5eacb2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 55 additions and 43 deletions

View File

@ -99,7 +99,7 @@ jobs:
PHP_VERSION: ${{ matrix.php-version }} PHP_VERSION: ${{ matrix.php-version }}
strategy: strategy:
matrix: matrix:
no-dev: [ '', '--no-dev' ] no-dev: [ '--no-dev', '' ]
php-version: [ '8.0', '8.1' ] php-version: [ '8.0', '8.1' ]
max-parallel: 2 max-parallel: 2
fail-fast: false fail-fast: false
@ -182,9 +182,12 @@ jobs:
PHP_VERSION: ${{ matrix.php-version }} PHP_VERSION: ${{ matrix.php-version }}
strategy: strategy:
matrix: matrix:
php-version: [ '8.0' ] php-version: [ '8.0', '8.1' ]
log-version: [ '^1.0', '^2.0', '^3.0' ] log-version: [ '^1.0', '^2.0', '^3.0' ]
max-parallel: 3 exclude:
- php-version: '8.1'
log-version: '^1.0'
max-parallel: 5
fail-fast: false fail-fast: false
steps: steps:
- name: Checkout - name: Checkout

View File

@ -60,3 +60,4 @@ parameters:
- '#Class RedisSentinel constructor invoked with .*#' - '#Class RedisSentinel constructor invoked with .*#'
- '#Hyperf\\Engine\\Http\\Server#' - '#Hyperf\\Engine\\Http\\Server#'
- '#Swow\\Socket#' - '#Swow\\Socket#'
- '#Class Hyperf\\Logger\\Logger extends @final class Monolog\\Logger#'

View File

@ -22,8 +22,6 @@ use Hyperf\Pool\Pool;
use Psr\Container\ContainerInterface; use Psr\Container\ContainerInterface;
/** /**
* @method bool|int send($data)
* @method bool|string recv(float $timeout)
* @property int $errCode * @property int $errCode
* @property string $errMsg * @property string $errMsg
*/ */
@ -48,16 +46,26 @@ class RpcConnection extends BaseConnection implements ConnectionInterface
$this->reconnect(); $this->reconnect();
} }
public function __call($name, $arguments)
{
return $this->connection->{$name}(...$arguments);
}
public function __get($name) public function __get($name)
{ {
return $this->connection->{$name}; return $this->connection->{$name};
} }
public function send(string $data): int|false
{
return $this->connection->sendAll($data);
}
public function recv(float $timeout = 0): string|false
{
return $this->recvPacket($timeout);
}
public function recvPacket(float $timeout = 0): string|false
{
return $this->connection->recvPacket($timeout);
}
/** /**
* @throws ConnectionException * @throws ConnectionException
* @return $this * @return $this

View File

@ -22,16 +22,16 @@ trait RecvTrait
*/ */
public function recvAndCheck(mixed $client, float $timeout) public function recvAndCheck(mixed $client, float $timeout)
{ {
$data = $client->recv($timeout); $data = $client->recvPacket($timeout);
if ($data === '') { if ($data === '') {
// RpcConnection: When the next time the connection is taken out of the connection pool, it will reconnecting to the target service. // RpcConnection: When the next time the connection is taken out of the connection pool, it will reconnect to the target service.
// Client: It will reconnect to the target service in the next request. // Client: It will reconnect to the target service in the next request.
$client->close(); $client->close();
throw new RecvException('Connection is closed. ' . $client->errMsg, $client->errCode); throw new RecvException('Connection is closed. ' . $client->errMsg, $client->errCode);
} }
if ($data === false) { if ($data === false) {
$client->close(); $client->close();
throw new RecvException('Error receiving data, errno=' . $client->errCode . ' errmsg=' . swoole_strerror($client->errCode), $client->errCode); throw new RecvException('Error receiving data, errno=' . $client->errCode . ' errmsg=' . $client->errMsg, $client->errCode);
} }
return $data; return $data;

View File

@ -22,26 +22,23 @@ class RpcConnectionStub extends RpcConnection
*/ */
public $reconnectCallback; public $reconnectCallback;
public function __call($name, $arguments)
{
if ($name == 'send') {
$this->lastData = $arguments[0];
return strlen($arguments[0]);
}
if ($name == 'recv') {
return $this->lastData;
}
return false;
}
public function __get($name) public function __get($name)
{ {
return false; return false;
// return $this->connection->{$name}; // return $this->connection->{$name};
} }
public function send(string $data): int|false
{
$this->lastData = $data;
return strlen($data);
}
public function recvPacket(float $timeout = 0): string|false
{
return $this->lastData;
}
public function reconnect(): bool public function reconnect(): bool
{ {
if ($this->reconnectCallback) { if ($this->reconnectCallback) {

View File

@ -22,6 +22,7 @@ use HyperfTest\Logger\Stub\FooProcessor;
use Mockery; use Mockery;
use Monolog\Handler\StreamHandler; use Monolog\Handler\StreamHandler;
use Monolog\Handler\TestHandler; use Monolog\Handler\TestHandler;
use Monolog\LogRecord;
use PHPUnit\Framework\TestCase; use PHPUnit\Framework\TestCase;
use Psr\Container\ContainerInterface; use Psr\Container\ContainerInterface;
use ReflectionClass; use ReflectionClass;
@ -135,10 +136,10 @@ class LoggerFactoryTest extends TestCase
$this->assertSame( $this->assertSame(
'Hello world.Hello world.', 'Hello world.Hello world.',
Context::get('test.logger.foo_handler.record')['message'] Context::get('test.logger.foo_handler.record')['extra']['message']
); );
$this->assertTrue(Context::get('test.logger.foo_handler.record')['bar']); $this->assertTrue(Context::get('test.logger.foo_handler.record')['extra']['bar']);
$this->assertTrue(Context::get('test.logger.foo_handler.record')['callback']); $this->assertTrue(Context::get('test.logger.foo_handler.record')['extra']['callback']);
} }
public function testDefaultProcessor() public function testDefaultProcessor()
@ -157,7 +158,7 @@ class LoggerFactoryTest extends TestCase
$this->assertSame( $this->assertSame(
'Hello world.Hello world.', 'Hello world.Hello world.',
Context::get('test.logger.foo_handler.record')['message'] Context::get('test.logger.foo_handler.record')['extra']['message']
); );
} }
@ -230,8 +231,8 @@ class LoggerFactoryTest extends TestCase
[ [
'class' => BarProcessor::class, 'class' => BarProcessor::class,
], ],
function (array $records) { function (array|LogRecord $records) {
$records['callback'] = true; $records['extra']['callback'] = true;
return $records; return $records;
}, },
], ],

View File

@ -14,6 +14,7 @@ namespace HyperfTest\Logger;
use Hyperf\Contract\StdoutLoggerInterface; use Hyperf\Contract\StdoutLoggerInterface;
use Hyperf\Logger\Logger; use Hyperf\Logger\Logger;
use Monolog\Handler\TestHandler; use Monolog\Handler\TestHandler;
use Monolog\LogRecord;
use PHPUnit\Framework\TestCase; use PHPUnit\Framework\TestCase;
use Psr\Log\LoggerInterface; use Psr\Log\LoggerInterface;
@ -53,7 +54,7 @@ class LoggerTest extends TestCase
{ {
$logger = new Logger('test', [ $logger = new Logger('test', [
$handler = new class() extends TestHandler { $handler = new class() extends TestHandler {
protected function write(array $record): void protected function write(array|LogRecord $record): void
{ {
usleep(1); usleep(1);
parent::write($record); parent::write($record);

View File

@ -11,11 +11,13 @@ declare(strict_types=1);
*/ */
namespace HyperfTest\Logger\Stub; namespace HyperfTest\Logger\Stub;
use Monolog\LogRecord;
class BarProcessor class BarProcessor
{ {
public function __invoke(array $records) public function __invoke(array|LogRecord $records)
{ {
$records['bar'] = true; $records['extra']['bar'] = true;
return $records; return $records;
} }
} }

View File

@ -13,10 +13,11 @@ namespace HyperfTest\Logger\Stub;
use Hyperf\Context\Context; use Hyperf\Context\Context;
use Monolog\Handler\StreamHandler; use Monolog\Handler\StreamHandler;
use Monolog\LogRecord;
class FooHandler extends StreamHandler class FooHandler extends StreamHandler
{ {
public function write(array $record): void public function write(array|LogRecord $record): void
{ {
Context::set('test.logger.foo_handler.record', $record); Context::set('test.logger.foo_handler.record', $record);
} }

View File

@ -11,20 +11,18 @@ declare(strict_types=1);
*/ */
namespace HyperfTest\Logger\Stub; namespace HyperfTest\Logger\Stub;
use Monolog\LogRecord;
use Monolog\Processor\ProcessorInterface; use Monolog\Processor\ProcessorInterface;
class FooProcessor implements ProcessorInterface class FooProcessor implements ProcessorInterface
{ {
protected $repeat; public function __construct(protected int $repeat)
public function __construct(int $repeat)
{ {
$this->repeat = 2;
} }
public function __invoke(array $records) public function __invoke(array|LogRecord $records)
{ {
$records['message'] = str_repeat($records['message'], $this->repeat); $records['extra']['message'] = str_repeat($records['message'], $this->repeat);
return $records; return $records;
} }
} }