Optimize kafka reporter (#6075)

This commit is contained in:
Deeka Wong 2023-08-23 01:03:59 -05:00 committed by GitHub
parent 71f987d586
commit 3b50eef78f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 141 additions and 78 deletions

View File

@ -3,6 +3,7 @@
## Added ## Added
- [#6070](https://github.com/hyperf/hyperf/pull/6070) Added `php_serialize` protocol for `hyperf/rpc-multilex`. - [#6070](https://github.com/hyperf/hyperf/pull/6070) Added `php_serialize` protocol for `hyperf/rpc-multilex`.
- [#6069](https://github.com/hyperf/hyperf/pull/6069) [#6075](https://github.com/hyperf/hyperf/pull/6075) Added kafka reporter for `hyperf/tracer`.
## Fixed ## Fixed

View File

@ -11,41 +11,9 @@ declare(strict_types=1);
*/ */
namespace Hyperf\Tracer\Adapter; namespace Hyperf\Tracer\Adapter;
use Hyperf\Guzzle\ClientFactory as GuzzleClientFactory; /**
use RuntimeException; * @deprecated v3.0, will remove in v3.1, use \Hyperf\Tracer\Adapter\Reporter\HttpClientFactory instead.
use Zipkin\Reporters\Http\ClientFactory; */
class HttpClientFactory extends Reporter\HttpClientFactory
class HttpClientFactory implements ClientFactory
{ {
public function __construct(private GuzzleClientFactory $guzzleClientFactory)
{
}
public function build(array $options): callable
{
return function (string $payload) use ($options): void {
$url = $options['endpoint_url'];
unset($options['endpoint_url']);
$client = $this->guzzleClientFactory->create($options);
$additionalHeaders = $options['headers'] ?? [];
$requiredHeaders = [
'Content-Type' => 'application/json',
'Content-Length' => strlen($payload),
'b3' => '0',
];
$headers = array_merge($additionalHeaders, $requiredHeaders);
$response = $client->post($url, [
'body' => $payload,
'headers' => $headers,
// If 'no_aspect' option is true, then the HttpClientAspect will not modify the client options.
'no_aspect' => true,
]);
$statusCode = $response->getStatusCode();
if ($statusCode !== 202) {
throw new RuntimeException(
sprintf('Reporting of spans failed, status code %d', $statusCode)
);
}
};
}
} }

View File

@ -0,0 +1,51 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Tracer\Adapter\Reporter;
use Hyperf\Guzzle\ClientFactory as GuzzleClientFactory;
use RuntimeException;
use Zipkin\Reporters\Http\ClientFactory;
class HttpClientFactory implements ClientFactory
{
public function __construct(private GuzzleClientFactory $guzzleClientFactory)
{
}
public function build(array $options): callable
{
return function (string $payload) use ($options): void {
$url = $options['endpoint_url'];
unset($options['endpoint_url']);
$client = $this->guzzleClientFactory->create($options);
$additionalHeaders = $options['headers'] ?? [];
$requiredHeaders = [
'Content-Type' => 'application/json',
'Content-Length' => strlen($payload),
'b3' => '0',
];
$headers = array_merge($additionalHeaders, $requiredHeaders);
$response = $client->post($url, [
'body' => $payload,
'headers' => $headers,
// If 'no_aspect' option is true, then the HttpClientAspect will not modify the client options.
'no_aspect' => true,
]);
$statusCode = $response->getStatusCode();
if ($statusCode !== 202) {
throw new RuntimeException(
sprintf('Reporting of spans failed, status code %d', $statusCode)
);
}
};
}
}

View File

@ -11,8 +11,6 @@ declare(strict_types=1);
*/ */
namespace Hyperf\Tracer\Adapter\Reporter; namespace Hyperf\Tracer\Adapter\Reporter;
use longlang\phpkafka\Producer\Producer;
use longlang\phpkafka\Producer\ProducerConfig;
use Psr\Log\LoggerInterface; use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger; use Psr\Log\NullLogger;
use Throwable; use Throwable;
@ -21,28 +19,24 @@ use Zipkin\Reporter;
use Zipkin\Reporters\JsonV2Serializer; use Zipkin\Reporters\JsonV2Serializer;
use Zipkin\Reporters\SpanSerializer; use Zipkin\Reporters\SpanSerializer;
use function count;
use function json_last_error;
use function sprintf; use function sprintf;
class Kafka implements Reporter class Kafka implements Reporter
{ {
private Producer $producer;
private string $topic;
private LoggerInterface $logger; private LoggerInterface $logger;
private SpanSerializer $serializer; private SpanSerializer $serializer;
public function __construct( public function __construct(
array $options = [], private array $options,
Producer $producer = null, private KafkaClientFactory $clientFactory,
LoggerInterface $logger = null, LoggerInterface $logger = null,
SpanSerializer $serializer = null SpanSerializer $serializer = null
) { ) {
$this->topic = $options['topic'] ?? 'zipkin';
$this->serializer = $serializer ?? new JsonV2Serializer(); $this->serializer = $serializer ?? new JsonV2Serializer();
$this->logger = $logger ?? new NullLogger(); $this->logger = $logger ?? new NullLogger();
$this->producer = $producer ?? $this->createProducer($options);
} }
/** /**
@ -50,43 +44,25 @@ class Kafka implements Reporter
*/ */
public function report(array $spans): void public function report(array $spans): void
{ {
if (empty($spans)) { if (count($spans) === 0) {
return; return;
} }
try { $payload = $this->serializer->serialize($spans);
$this->producer->send(
$this->topic, if (! $payload) {
$this->serializer->serialize($spans), $this->logger->error(
uniqid('', true) sprintf('failed to encode spans with code %d', json_last_error())
); );
return;
}
$client = $this->clientFactory->build($this->options);
try {
$client($payload);
} catch (Throwable $e) { } catch (Throwable $e) {
$this->logger->error(sprintf('failed to report spans: %s', $e->getMessage())); $this->logger->error(sprintf('failed to report spans: %s', $e->getMessage()));
} }
} }
private function createProducer(array $options): Producer
{
$options = array_replace([
'bootstrap_servers' => '127.0.0.1:9092',
'acks' => -1,
'connect_timeout' => 1,
'send_timeout' => 1,
], $options);
$config = new ProducerConfig();
$config->setBootstrapServer($options['bootstrap_servers']);
$config->setUpdateBrokers(true);
if (is_int($options['acks'])) {
$config->setAcks($options['acks']);
}
if (is_float($options['connect_timeout'])) {
$config->setConnectTimeout($options['connect_timeout']);
}
if (is_float($options['send_timeout'])) {
$config->setSendTimeout($options['send_timeout']);
}
return new Producer($config);
}
} }

View File

@ -0,0 +1,58 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Tracer\Adapter\Reporter;
use longlang\phpkafka\Producer\Producer;
use longlang\phpkafka\Producer\ProducerConfig;
class KafkaClientFactory
{
private ?Producer $producer = null;
public function build(array $options): callable
{
$this->producer ??= $this->createProducer($options);
return function (string $payload) use ($options): void {
$this->producer->send(
$options['topic'] ?? 'zipkin',
$payload,
uniqid('', true)
);
};
}
private function createProducer(array $options): Producer
{
$options = array_replace([
'bootstrap_servers' => '127.0.0.1:9092',
'acks' => -1,
'connect_timeout' => 1,
'send_timeout' => 1,
], $options);
$config = new ProducerConfig();
$config->setBootstrapServer($options['bootstrap_servers']);
$config->setUpdateBrokers(true);
if (is_int($options['acks'])) {
$config->setAcks($options['acks']);
}
if (is_float($options['connect_timeout'])) {
$config->setConnectTimeout($options['connect_timeout']);
}
if (is_float($options['send_timeout'])) {
$config->setSendTimeout($options['send_timeout']);
}
return new Producer($config);
}
}

View File

@ -18,11 +18,20 @@ use function Hyperf\Support\make;
class ReporterFactory class ReporterFactory
{ {
public function __construct(
private HttpClientFactory $httpClientFactory,
) {
}
public function make(array $option = []): Reporter public function make(array $option = []): Reporter
{ {
$class = $option['class'] ?? ''; $class = $option['class'] ?? '';
$constructor = $option['constructor'] ?? []; $constructor = $option['constructor'] ?? [];
if ($class === \Zipkin\Reporters\Http::class) {
$option['constructor']['requesterFactory'] = $this->httpClientFactory;
}
if (! class_exists($class)) { if (! class_exists($class)) {
throw new RuntimeException(sprintf('Class %s is not exists.', $class)); throw new RuntimeException(sprintf('Class %s is not exists.', $class));
} }

View File

@ -133,7 +133,7 @@ class TracerFactoryTest extends TestCase
protected function getContainer($config) protected function getContainer($config)
{ {
$container = Mockery::mock(Container::class); $container = Mockery::mock(Container::class);
$client = Mockery::mock(\Hyperf\Tracer\Adapter\HttpClientFactory::class); $client = Mockery::mock(\Hyperf\Tracer\Adapter\Reporter\HttpClientFactory::class);
$reporter = Mockery::mock(\Hyperf\Tracer\Adapter\Reporter\ReporterFactory::class); $reporter = Mockery::mock(\Hyperf\Tracer\Adapter\Reporter\ReporterFactory::class);
$reporter->shouldReceive('make') $reporter->shouldReceive('make')
->andReturn(new \Zipkin\Reporters\Http([], $client)); ->andReturn(new \Zipkin\Reporters\Http([], $client));