mirror of
https://gitee.com/hyperf/hyperf.git
synced 2024-11-30 02:37:58 +08:00
fix: throw exceptions when disconnect
This commit is contained in:
parent
72ec6d243c
commit
1df8f30585
@ -21,7 +21,7 @@ class Request extends BaseRequest
|
||||
private const DEFAULT_CONTENT_TYPE = 'application/grpc+proto';
|
||||
|
||||
/**
|
||||
* @var bool $usePipelineRead
|
||||
* @var bool
|
||||
*/
|
||||
public $usePipelineRead;
|
||||
|
||||
|
@ -19,7 +19,7 @@ use Hyperf\GrpcClient\Exception\GrpcClientException;
|
||||
*/
|
||||
class ServerStreamingCall extends StreamingCall
|
||||
{
|
||||
public function push($message): bool
|
||||
public function push($message): void
|
||||
{
|
||||
throw new GrpcClientException('ServerStreamingCall can not push data by client');
|
||||
}
|
||||
|
@ -12,6 +12,7 @@ declare(strict_types=1);
|
||||
namespace Hyperf\GrpcClient;
|
||||
|
||||
use Hyperf\Grpc\Parser;
|
||||
use Hyperf\GrpcClient\Exception\GrpcClientException;
|
||||
use RuntimeException;
|
||||
|
||||
class StreamingCall
|
||||
@ -65,7 +66,7 @@ class StreamingCall
|
||||
return $this;
|
||||
}
|
||||
|
||||
public function send($message = null): bool
|
||||
public function send($message = null): void
|
||||
{
|
||||
if (! $this->getStreamId()) {
|
||||
$this->setStreamId($this->client->openStream(
|
||||
@ -74,17 +75,22 @@ class StreamingCall
|
||||
'',
|
||||
true
|
||||
));
|
||||
return $this->getStreamId() > 0;
|
||||
if ($this->getStreamId() <= 0) {
|
||||
throw $this->newException();
|
||||
}
|
||||
}
|
||||
throw new RuntimeException('You can only send once by a streaming call except connection closed and you retry.');
|
||||
}
|
||||
|
||||
public function push($message): bool
|
||||
public function push($message): void
|
||||
{
|
||||
if (! $this->getStreamId()) {
|
||||
$this->setStreamId($this->client->openStream($this->method, null, '', true));
|
||||
}
|
||||
return $this->client->write($this->getStreamId(), Parser::serializeMessage($message), false);
|
||||
$success = $this->client->write($this->getStreamId(), Parser::serializeMessage($message), false);
|
||||
if (! $success) {
|
||||
throw $this->newException();
|
||||
}
|
||||
}
|
||||
|
||||
public function recv(float $timeout = -1.0)
|
||||
@ -98,10 +104,9 @@ class StreamingCall
|
||||
$this->streamId = 0;
|
||||
}
|
||||
}
|
||||
// disconnected
|
||||
// disconnected or timed out
|
||||
if ($recv === false) {
|
||||
$this->streamId = 0;
|
||||
return[null, 14, $recv];
|
||||
throw $this->newException();
|
||||
}
|
||||
// server ended the stream
|
||||
if ($recv->pipeline === false) {
|
||||
@ -112,12 +117,20 @@ class StreamingCall
|
||||
return Parser::parseResponse($recv, $this->deserialize);
|
||||
}
|
||||
|
||||
public function end(): bool
|
||||
public function end(): void
|
||||
{
|
||||
if (! $this->getStreamId()) {
|
||||
return false;
|
||||
throw $this->newException();
|
||||
}
|
||||
// we cannot reset the streamId here, otherwise the client streaming will break.
|
||||
return $this->client->write($this->getStreamId(), null, true);
|
||||
$success = $this->client->write($this->getStreamId(), null, true);
|
||||
if (! $success) {
|
||||
throw $this->newException();
|
||||
}
|
||||
}
|
||||
|
||||
private function newException(): GrpcClientException
|
||||
{
|
||||
return new GrpcClientException('the remote server may have been disconnected or timed out');
|
||||
}
|
||||
}
|
||||
|
@ -44,7 +44,7 @@ class Feature extends \Google\Protobuf\Internal\Message
|
||||
* Optional. Data for populating the Message object.
|
||||
*
|
||||
* @var string $name
|
||||
* The name of the feature.
|
||||
* The name of the feature
|
||||
* @var \Routeguide\Point $location
|
||||
* The point where the feature is detected.
|
||||
* }
|
||||
|
@ -44,7 +44,7 @@ class Rectangle extends \Google\Protobuf\Internal\Message
|
||||
* Optional. Data for populating the Message object.
|
||||
*
|
||||
* @var \Routeguide\Point $lo
|
||||
* One corner of the rectangle.
|
||||
* One corner of the rectangle
|
||||
* @var \Routeguide\Point $hi
|
||||
* The other corner of the rectangle.
|
||||
* }
|
||||
|
@ -43,7 +43,7 @@ class RouteNote extends \Google\Protobuf\Internal\Message
|
||||
* Optional. Data for populating the Message object.
|
||||
*
|
||||
* @var \Routeguide\Point $location
|
||||
* The location from which the message is sent.
|
||||
* The location from which the message is sent
|
||||
* @var string $message
|
||||
* The message to be sent.
|
||||
* }
|
||||
|
@ -60,11 +60,11 @@ class RouteSummary extends \Google\Protobuf\Internal\Message
|
||||
* Optional. Data for populating the Message object.
|
||||
*
|
||||
* @var int $point_count
|
||||
* The number of points received.
|
||||
* The number of points received
|
||||
* @var int $feature_count
|
||||
* The number of known features passed while traversing the route.
|
||||
* The number of known features passed while traversing the route
|
||||
* @var int $distance
|
||||
* The distance covered in metres.
|
||||
* The distance covered in metres
|
||||
* @var int $elapsed_time
|
||||
* The duration of the traversal in seconds.
|
||||
* }
|
||||
|
Loading…
Reference in New Issue
Block a user