Merge remote-tracking branch 'remotes/upstream/master'

This commit is contained in:
You Ming 2019-11-10 11:07:40 +08:00
commit 90a7aecea4
121 changed files with 5544 additions and 206 deletions

View File

@ -5,11 +5,11 @@ sudo: required
matrix:
include:
- php: 7.2
env: SW_VERSION="4.4.8"
env: SW_VERSION="4.4.12"
- php: 7.3
env: SW_VERSION="4.4.8"
env: SW_VERSION="4.4.12"
- php: master
env: SW_VERSION="4.4.8"
env: SW_VERSION="4.4.12"
allow_failures:
- php: master
@ -38,6 +38,6 @@ before_script:
- composer config -g process-timeout 900 && composer update
script:
- composer analyse src/di src/json-rpc src/tracer src/metric
- composer analyse src/di src/json-rpc src/tracer src/metric src/redis src/nats src/db
- composer test -- --exclude-group NonCoroutine
- vendor/bin/phpunit --group NonCoroutine

View File

@ -1,20 +1,42 @@
# v1.1.5 - TBD
# v1.1.6 - TBD
## Added
- [#827](https://github.com/hyperf/hyperf/pull/827) Added a simple db component.
## Fixed
- [#897](https://github.com/hyperf/hyperf/pull/897) Fixed connection pool of `Hyperf\Nats\Annotation\Consumer` does not works as expected.
- [#903](https://github.com/hyperf/hyperf/pull/903) Fixed execute `init-proxy` command can not stop when `hyperf/rpc-client` component exists.
- [#904](https://github.com/hyperf/hyperf/pull/904) Fixed the hooked I/O request does not works in the listener that listening `Hyperf\Framework\Event\BeforeMainServerStart` event.
# v1.1.5 - 2019-11-07
## Added
- [#812](https://github.com/hyperf/hyperf/pull/812) Added singleton crontab task support.
- [#820](https://github.com/hyperf/hyperf/pull/820) Added nats component.
- [#832](https://github.com/hyperf/hyperf/pull/832) Added `Hyperf\Utils\Codec\Json`.
- [#833](https://github.com/hyperf/hyperf/pull/833) Added `Hyperf\Utils\Backoff`.
- [#852](https://github.com/hyperf/hyperf/pull/852) Added a `clear()` method for `Hyperf\Utils\Parallel` to clear added callbacks.
- [#854](https://github.com/hyperf/hyperf/pull/854) Added `GraphQLMiddleware`.
- [#859](https://github.com/hyperf/hyperf/pull/859) Added Consul cluster mode support, now available to fetch the service information from Consul cluster.
- [#873](https://github.com/hyperf/hyperf/pull/873) Added redis cluster.
## Fixed
- [#831](https://github.com/hyperf/hyperf/pull/831) Fixed Redis client can not reconnect the server after the Redis server restarted.
- [#835](https://github.com/hyperf/hyperf/pull/835) Fixed `Request::inputs` default value does not works.
- [#841](https://github.com/hyperf/hyperf/pull/841) Fixed migration does not take effect under multiple data sources.
- [#844](https://github.com/hyperf/hyperf/pull/844) Fixed the composer.json reader support root namespace.
- [#844](https://github.com/hyperf/hyperf/pull/844) Fixed the reader of `composer.json` does not support the root namespace.
- [#846](https://github.com/hyperf/hyperf/pull/846) Fixed `scan` `hScan` `zScan` and `sScan` don't works for Redis.
- [#850](https://github.com/hyperf/hyperf/pull/850) Fixed logger group does not works when the name is same.
## Optimized
- [#832](https://github.com/hyperf/hyperf/pull/832) Optimized that response will throw a exception when json format failed.
- [#840](https://github.com/hyperf/hyperf/pull/840) Use `\Swoole\Timer::*` to instead of `swoole_timer_*` functions.
- [#859](https://github.com/hyperf/hyperf/pull/859) Optimized the logical of fetch health nodes infomation from consul.
# v1.1.4 - 2019-10-31

View File

@ -8,23 +8,23 @@ English | [中文](./README-CN.md)
# Introduction
Hyperf is a high-performance, highly flexible PHP CLI framework based on `Swoole 4.4+`. It has a built-in coroutine server with a large number of commonly used components. It provides ultra-high and better performance than the traditional PHP-FPM-based framework and also maintains extremely flexible scalability at the same time. Standard components are implemented in the latest PSR standards, and a powerful dependency injection design ensures that most components or classes within the framework are replaceable.
Hyperf is an extremely performant and flexible PHP CLI framework based on `Swoole 4.4+`, powered by the state-of-the-art coroutine server and a large number of battle-tested components. Aside from the decisive benchmark outmatching against PHP-FPM frameworks, Hyperf also distinct itself by its focus on flexibility and composability. Hyperf ships with an AOP-enabling dependency injector to ensure components and classes are pluggable and meta programmable. All of its core components strictly follow the PSR standards and thus can be used in other frameworks.
In addition to providing common coroutine clients such as `MySQL coroutine client` and `Redis coroutine client`, the Hyperf component libraries are also prepared for the coroutine version of `Eloquent ORM`, `WebSocket server and client`, `JSON RPC server and client`, `gRPC server and client`, `Zipkin/Jaeger (OpenTracing) client`, `Guzzle HTTP client`, `Elasticsearch client`, `Consul client`, `ETCD client`, `AMQP component`, `Apollo configuration center`, `Aliyun ACM`, `ETCD configuration center`, `Token bucket algorithm-based limiter`, `Universal connection pool`, `Circuit breaker`, `Swagger`, `Swoole Tracker`, `View engine (Blake/Smarty)`, `Snowflake` etc. Therefore, the trouble of implementing the corresponding coroutine version client by yourself can be avoided. Hyperf also provides convenient functions such as `Dependency injection`, `Annotation`, `AOP (aspect-oriented programming)`, `Middleware`, `Custom Processes`, `Event Manager`, `Simply Redis MQ`, ` RabbitMQ`, `Automatic model cache`, `Simple Cache`, `Seconds level crontab`, `Translation`, `Validation` to meet a wide range of technical and business scenarios.
Hyperf's architecture is built upon the combination of `Coroutine`, `Dependency injection`, `Events`, `Annotation`, `AOP (aspect-oriented programming)`. Core components provided by Hyperf can be used out of the box in coroutine context. The set includes but not limited to: `MySQL coroutine client`, `Redis coroutine client`, `WebSocket server and client`, `JSON RPC server and client`, `gRPC server and client`, `Zipkin/Jaeger (OpenTracing) client`, `Guzzle HTTP client`, `Elasticsearch client`, `Consul client`, `ETCD client`, `AMQP component`, `Apollo configuration center`, `Aliyun ACM`, `ETCD configuration center`, `Token bucket algorithm-based limiter`, `Universal connection pool`, `Circuit breaker`, `Swagger`, `Swoole Tracker`, `Snowflake`, `Simply Redis MQ`, ` RabbitMQ`, `Seconds level crontab`, `Custom Processes`, etc. Be assured Hyperf is still a PHP framework. You will also find familiar packages such as `Middleware`, `Event Manager`, `Coroutine optimized Eloquent ORM` (And Model Cache!), `Translation`, `Validation`, `View engine (Blake/Smarty)` and more at your command.
# Original intention
# Origin
Although many new PHP frameworks have appeared, we still haven't seen a comprehensive framework, which introduces an elegant design and ultra-high performance, suitable for PHP microservices and as an evangelist of PHP microservices. For the original intention of Hyperf and its team members, we will continue to invest in it, and you are welcome to join us to participate in open source development.
Many new PHP frameworks had emerged over the years, yet we were still waiting for one that unites the ultra-performance and elegant design, as well as paving the way for PHP microservice. Hence Hyperf was born to be the pioneer. With this vision in mind, we will continue to invest in it, and you are welcome to join us to participate in open source development.
# Design concept
# Design Goals
`Hyperspeed + Flexibility = Hyperf`, from the framework name we have been used `hyperfspeed (ultra-high performance)` and `flexibility` as the gene of Hyperf.
`Hyperspeed + Flexibility = Hyperf`. The equation hidden in the name exhibits Hyperf's genetic ambition.
For ultra-high performance, Hyperf based on the Swoole coroutine, it providered an amazing performance, Hyperf team also makes a lots of code optimizations on the framework design to ensure ultra-high performance.
Hyperspeed: Leveraging Swoole coroutine, Hyperf is capable of handling massive traffic. Hyperf team made many optimizations throughout the framework to eliminate every obstacle between the end-user and the roaring engine.
For flexibility, based on the powerful dependency injection component of Hyperf, all components are based on [PSR](https://www.php-fig.org/psr) and the contracts that defined by Hyperf, so that most of the components or classes within the framework are replaceable and re-useable.
Flexibility: Hyperf believes its Dependency Injection component is best in class. With the help of Hyperf DI, components and class are all pluggable and meta programmable. On the other hand, All Hyperf components are mean to be shared interchangeably with the world, as they all follow [PSR](https://www.php-fig.org/psr) or open contracts.
Based on the above characteristics, Hyperf has a lots of possibilities, such as implementing Web servers, gateway servers, distributed middleware software, microservices architecture, game servers, and Internet of Things (IoT).
Thanks to these metrits, Hyperf has enabled untapped potential in many areas, such as implementing Web servers, gateway servers, distributed middleware software, microservices architecture, game servers, and Internet of Things (IoT).
# Documentation

View File

@ -25,7 +25,7 @@ foreach ($files as $file) {
$composerJson = json_decode(file_get_contents($file), true);
foreach ($composerJson['autoload']['files'] ?? [] as $file) {
$autoloadFiles[] = "src/{$component}/". preg_replace('#^./#', '', $file);
$autoloadFiles[] = "src/{$component}/" . preg_replace('#^./#', '', $file);
}
foreach ($composerJson['autoload']['psr-4'] ?? [] as $ns => $dir) {
$autoload[$ns] = "src/{$component}/" . trim($dir, '/') . '/';
@ -51,5 +51,5 @@ $json->extra->hyperf->config = $configProviders;
file_put_contents(
__DIR__ . '/../composer.json',
json_encode($json, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES | JSON_PRETTY_PRINT)
json_encode($json, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES | JSON_PRETTY_PRINT) . PHP_EOL
);

View File

@ -18,10 +18,12 @@
"doctrine/instantiator": "^1.0",
"egulias/email-validator": "^2.1",
"elasticsearch/elasticsearch": "^6.1",
"endclothing/prometheus_client_php": "^0.9.1",
"fig/http-message-util": "^1.1.2",
"google/protobuf": "^3.6.1",
"grpc/grpc": "^1.15",
"guzzlehttp/guzzle": "^6.3",
"ircmaxell/random-lib": "^1.2",
"jcchavezs/zipkin-opentracing": "^0.1.2",
"jean85/pretty-package-versions": "^1.2",
"monolog/monolog": "^1.24",
@ -40,8 +42,7 @@
"start-point/etcd-php": "^1.1",
"symfony/console": "^4.2",
"symfony/finder": "^4.1",
"vlucas/phpdotenv": "^3.1",
"endclothing/prometheus_client_php": "^0.9.1"
"vlucas/phpdotenv": "^3.1"
},
"require-dev": {
"doctrine/common": "@stable",
@ -114,6 +115,7 @@
"files": [
"src/config/src/Functions.php",
"src/di/src/Functions.php",
"src/nats/src/Functions.php",
"src/translation/src/Functions.php",
"src/utils/src/Functions.php"
],
@ -131,6 +133,7 @@
"Hyperf\\Consul\\": "src/consul/src/",
"Hyperf\\Contract\\": "src/contract/src/",
"Hyperf\\Crontab\\": "src/crontab/src/",
"Hyperf\\DB\\": "src/db/src/",
"Hyperf\\Database\\": "src/database/src/",
"Hyperf\\DbConnection\\": "src/db-connection/src/",
"Hyperf\\Devtool\\": "src/devtool/src/",
@ -155,6 +158,7 @@
"Hyperf\\Metric\\": "src/metric/src/",
"Hyperf\\ModelCache\\": "src/model-cache/src/",
"Hyperf\\ModelListener\\": "src/model-listener/src/",
"Hyperf\\Nats\\": "src/nats/src/",
"Hyperf\\Paginator\\": "src/paginator/src/",
"Hyperf\\Pool\\": "src/pool/src/",
"Hyperf\\Process\\": "src/process/src/",
@ -197,6 +201,7 @@
"HyperfTest\\Constants\\": "src/constants/tests/",
"HyperfTest\\Consul\\": "src/consul/tests/",
"HyperfTest\\Crontab\\": "src/crontab/tests/",
"HyperfTest\\DB\\": "src/db/tests/",
"HyperfTest\\Database\\": "src/database/tests/",
"HyperfTest\\DbConnection\\": "src/db-connection/tests/",
"HyperfTest\\Di\\": "src/di/tests/",
@ -216,6 +221,7 @@
"HyperfTest\\Metric\\": "src/metric/tests/",
"HyperfTest\\ModelCache\\": "src/model-cache/tests/",
"HyperfTest\\ModelListener\\": "src/model-listener/tests/",
"HyperfTest\\Nats\\": "src/nats/tests/",
"HyperfTest\\Paginator\\": "src/paginator/tests/",
"HyperfTest\\Pool\\": "src/pool/tests/",
"HyperfTest\\Process\\": "src/process/tests/",
@ -252,6 +258,7 @@
"Hyperf\\Constants\\ConfigProvider",
"Hyperf\\Consul\\ConfigProvider",
"Hyperf\\Crontab\\ConfigProvider",
"Hyperf\\DB\\ConfigProvider",
"Hyperf\\DbConnection\\ConfigProvider",
"Hyperf\\Devtool\\ConfigProvider",
"Hyperf\\Di\\ConfigProvider",
@ -271,6 +278,7 @@
"Hyperf\\Metric\\ConfigProvider",
"Hyperf\\ModelCache\\ConfigProvider",
"Hyperf\\ModelListener\\ConfigProvider",
"Hyperf\\Nats\\ConfigProvider",
"Hyperf\\Paginator\\ConfigProvider",
"Hyperf\\Pool\\ConfigProvider",
"Hyperf\\Process\\ConfigProvider",
@ -308,4 +316,4 @@
},
"minimum-stability": "dev",
"prefer-stable": true
}
}

View File

@ -38,6 +38,7 @@
## 依赖注入容器
- [hyperf/di](https://github.com/hyperf/di) Hyperf 官方提供的支持注解及 AOP 的依赖注入容器
- [reasno/lazy-loader](https://github.com/Reasno/LazyLoader) 为Hyperf DI补充基于类型提示的懒加载注入。
## 服务
@ -109,13 +110,16 @@
## Swoole
- - [hyperf/swoole-tracker](https://github.com/hyperf/swoole-tracker) Hyperf 官方提供的对接 Swoole Tracker 的组件,提供阻塞分析、性能分析、内存泄漏分析、运行状态及调用统计等功能
- - [hyperf/task](https://github.com/hyperf/task) Hyperf 官方提供的 Task 组件,对 Swoole 的 Task 机制进行了封装及抽象,提供便捷的注解用法
- [hyperf/swoole-tracker](https://github.com/hyperf/swoole-tracker) Hyperf 官方提供的对接 Swoole Tracker 的组件,提供阻塞分析、性能分析、内存泄漏分析、运行状态及调用统计等功能
- [hyperf/task](https://github.com/hyperf/task) Hyperf 官方提供的 Task 组件,对 Swoole 的 Task 机制进行了封装及抽象,提供便捷的注解用法
## 开发调试
- [firstphp/wsdebug](https://github.com/lamplife/wsdebug) 通过 `WebSocket` 实时观测异常错误的开发调试组件
## 第三方 SDK
- [yurunsoft/pay-sdk](https://github.com/Yurunsoft/PaySDK) 支持 Swoole 协程的支付宝/微信支付SDK
- [yurunsoft/pay-sdk](https://github.com/Yurunsoft/PaySDK) 支持 Swoole 协程的支付宝/微信支付 SDK
- [yurunsoft/yurun-oauth-login](https://github.com/Yurunsoft/YurunOAuthLogin) 支持 Swoole 协程的第三方登录授权 SDKQQ、微信、微博、Github、Gitee等
- [overtrue/wechat](zh/sdks/wechat) EasyWeChat一个流行的微信 SDK非微信官方 SDK
- [Yurunsoft/PHPMailer-Swoole](https://github.com/Yurunsoft/PHPMailer-Swoole) Swoole环境下的 PHPMailer
- [overtrue/wechat](zh/sdks/wechat) EasyWeChat一个流行的非官方微信 SDK
- [Yurunsoft/PHPMailer-Swoole](https://github.com/Yurunsoft/PHPMailer-Swoole) Swoole 协程环境下的可用的 PHPMailer

View File

@ -1,5 +1,60 @@
# 版本更新记录
# v1.1.5 - 2019-11-07
## 新增
- [#812](https://github.com/hyperf/hyperf/pull/812) 新增计划任务在集群下仅执行一次的支持;
- [#820](https://github.com/hyperf/hyperf/pull/820) 新增 hyperf/nats 组件;
- [#832](https://github.com/hyperf/hyperf/pull/832) 新增 `Hyperf\Utils\Codec\Json`
- [#833](https://github.com/hyperf/hyperf/pull/833) 新增 `Hyperf\Utils\Backoff`
- [#852](https://github.com/hyperf/hyperf/pull/852) 为 `Hyperf\Utils\Parallel` 新增 `clear()` 方法来清理所有已添加的回调;
- [#854](https://github.com/hyperf/hyperf/pull/854) 新增 `Hyperf\GraphQL\GraphQLMiddleware` 用于解析 GraphQL 请求;
- [#859](https://github.com/hyperf/hyperf/pull/859) 新增 Consul 集群的支持,现在可以从 Consul 集群中拉取服务提供者的节点信息;
- [#873](https://github.com/hyperf/hyperf/pull/873) 新增 Redis 集群的客户端支持;
## 修复
- [#831](https://github.com/hyperf/hyperf/pull/831) 修复 Redis 客户端连接在 Redis Server 重启后不会自动重连的问题;
- [#835](https://github.com/hyperf/hyperf/pull/835) 修复 `Request::inputs` 方法的默认值参数与预期效果不一致的问题;
- [#841](https://github.com/hyperf/hyperf/pull/841) 修复数据库迁移在多数据库的情况下连接无效的问题;
- [#844](https://github.com/hyperf/hyperf/pull/844) 修复 Composer 阅读器不支持根命名空间的用法的问题;
- [#846](https://github.com/hyperf/hyperf/pull/846) 修复 Redis 客户端的 `scan`, `hScan`, `zScan`, `sScan` 无法使用的问题;
- [#850](https://github.com/hyperf/hyperf/pull/850) 修复 Logger group 在 name 一样时不生效的问题;
## 优化
- [#832](https://github.com/hyperf/hyperf/pull/832) 优化了 Response 对象在转 JSON 格式时的异常处理逻辑;
- [#840](https://github.com/hyperf/hyperf/pull/840) 使用 `\Swoole\Timer::*` 来替代 `swoole_timer_*` 函数;
- [#859](https://github.com/hyperf/hyperf/pull/859) 优化了 RPC 客户端去 Consul 获取健康的节点信息的逻辑;
# v1.1.4 - 2019-10-31
## 新增
- [#778](https://github.com/hyperf/hyperf/pull/778) `Hyperf\Testing\Client` 新增 `PUT``DELETE`方法。
- [#784](https://github.com/hyperf/hyperf/pull/784) 新增服务监控组件。
- [#795](https://github.com/hyperf/hyperf/pull/795) `AbstractProcess` 增加 `restartInterval` 参数,允许子进程异常或正常退出后,延迟重启。
- [#804](https://github.com/hyperf/hyperf/pull/804) `Command` 增加事件 `BeforeHandle` `AfterHandle``FailToHandle`
## 变更
- [#793](https://github.com/hyperf/hyperf/pull/793) `Pool::getConnectionsInChannel` 方法由 `protected` 改为 `public`.
- [#811](https://github.com/hyperf/hyperf/pull/811) 命令 `di:init-proxy` 不再主动清理代理缓存,如果想清理缓存请使用命令 `vendor/bin/init-proxy.sh`
## 修复
- [#779](https://github.com/hyperf/hyperf/pull/779) 修复 `JPG` 文件验证不通过的问题。
- [#787](https://github.com/hyperf/hyperf/pull/787) 修复 `db:seed` 参数 `--class` 多余,导致报错的问题。
- [#795](https://github.com/hyperf/hyperf/pull/795) 修复自定义进程在异常抛出后无法正常重启的BUG。
- [#796](https://github.com/hyperf/hyperf/pull/796) 修复 `etcd` 配置中心 `enable` 即时设为 `false`在项目启动时依然会拉取配置的BUG。
## 优化
- [#781](https://github.com/hyperf/hyperf/pull/781) 可以根据国际化组件配置发布验证器语言包到规定位置。
- [#796](https://github.com/hyperf/hyperf/pull/796) 优化 `ETCD` 客户端,不会多次创建 `HandlerStack`
- [#797](https://github.com/hyperf/hyperf/pull/797) 优化子进程重启
# v1.1.3 - 2019-10-24
## 新增

View File

@ -36,12 +36,35 @@ class ConfigProvider
'commands' => [],
// 与 commands 类似
'listeners' => [],
// 亦可继续定义其它配置,最终都会何必到与 ConfigInterface 对应的配置储存器中
// 组件默认配置文件,即执行命令后会把 source 的对应的文件复制为 destination 对应的的文件
'publish' => [
[
'id' => 'config',
'description' => 'description of this config file.', // 描述
// 建议默认配置放在 publish 文件夹中,文件命名和组件名称相同
'source' => __DIR__ . '/../publish/file.php', // 对应的配置文件路径
'destination' => BASE_PATH . '/config/autoload/file.php', // 复制为这个路径下的该文件
],
],
// 亦可继续定义其它配置,最终都会合并到与 ConfigInterface 对应的配置储存器中
];
}
}
```
## 默认配置文件说明
`ConfigProvider` 中定义好 `publish` 后,可以使用如下命令快速生成配置文件
```bash
php bin/hyperf.php vendor:publish 包名称
```
如包名称为 `hyperf/amqp`,可执行命令来生成 `amqp` 默认的配置文件
```bash
php bin/hyperf.php vendor:publish hyperf/amqp
```
只创建一个类并不会被 Hyperf 自动的加载,您仍需在组件的 `composer.json` 添加一些定义,告诉 Hyperf 这是一个 ConfigProvider 类需要被加载,您需要在组件内的 `composer.json` 文件内增加 `extra.hyperf.config` 配置,并指定对应的 `ConfigProvider` 类的命名空间,如下所示:
```json
@ -75,7 +98,7 @@ class ConfigProvider
- 所有类的设计都必须允许通过标准 `OOP` 的使用方式来使用,所有 Hyperf 专有的功能必须作为增强功能并以单独的类来提供,也就意味着在非 Hyperf 框架下仍能通过标准的手段来实现组件的使用;
- 组件的依赖设计如果可满足 [PSR 标准](https://www.php-fig.org/psr) 则优先满足且依赖对应的接口而不是实现类;如 [PSR 标准](https://www.php-fig.org/psr) 没有包含的功能,则可满足由 Hyperf 定义的契约库 [Hyperf/contract](https://github.com/hyperf/contract) 内的接口时优先满足且依赖对应的接口而不是实现类;
- 对于实现 Hyperf 专有功能所增加的增强功能类,通常来说也会对 Hyperf 的一些组件有依赖,那么这些组件的依赖不应该写在 `composer.json``require` 项,而是写在 `suggust` 项作为建议项存在;
- 对于实现 Hyperf 专有功能所增加的增强功能类,通常来说也会对 Hyperf 的一些组件有依赖,那么这些组件的依赖不应该写在 `composer.json``require` 项,而是写在 `suggest` 项作为建议项存在;
- 组件设计时不应该通过注解进行任何的依赖注入,注入方式应只使用 `构造函数注入` 的方式,这样同时也能满足在 `OOP` 下的使用;
- 组件设计时不应该通过注解进行任何的功能定义,功能定义应只通过 `ConfigProvider` 来定义;
- 类的设计时应尽可能的不储存状态数据,因为这会导致这个类不能作为长生命周期的对象来提供,也无法很方便的使用依赖注入功能,这样会在一定程度下降低性能,状态数据应都通过 `Hyperf\Utils\Context` 协程上下文来储存;

View File

@ -41,7 +41,7 @@ $db->connect($config, function ($db, $r) {
});
```
> 注意 `MySQL` 等异步模块已在[4.3.0](https://wiki.swoole.com/wiki/page/p-4.3.0.html)中移除,并转移到了[swoolw_async](https://github.com/swoole/ext-async)。
> 注意 `MySQL` 等异步模块已在[4.3.0](https://wiki.swoole.com/wiki/page/p-4.3.0.html)中移除,并转移到了[swoole_async](https://github.com/swoole/ext-async)。
从上面的代码片段可以看出,每一个操作几乎就需要一个回调函数,在复杂的业务场景中回调的层次感和代码结构绝对会让你崩溃,其实不难看出这样的写法有点类似 `JavaScript` 上的异步方法的写法,而 `JavaScript` 也为此提供了不少的解决方案(当然方案是源于其它编程语言),如 `Promise``yield + generator`, `async/await``Promise` 则是对回调的一种封装方式,而 `yield + generator``async/await` 则需要在代码上显性的增加一些代码语法标记,这些相对比回调函数来说,不妨都是一些非常不错的解决方案,但是你需要另花时间来理解它的实现机制和语法。
Swoole 协程也是对异步回调的一种解决方案,在 `PHP` 语言下,`Swoole` 协程与 `yield + generator` 都属于协程的解决方案,协程的解决方案可以使代码以近乎于同步代码的书写方式来书写异步代码,显性的区别则是 `yield + generator` 的协程机制下,每一处 `I/O` 操作的调用代码都需要在前面加上 `yield` 语法实现协程切换,每一层调用都需要加上,否则会出现意料之外的错误,而 `Swoole` 协程的解决方案对比于此就高明多了,在遇到 `I/O` 时底层自动的进行隐式协程切换,无需添加任何的额外语法,无需在代码前加上 `yield`,协程切换的过程无声无息,极大的减轻了维护异步系统的心智负担。

View File

@ -62,6 +62,7 @@ namespace App\Task;
use Hyperf\Contract\StdoutLoggerInterface;
use Hyperf\Crontab\Annotation\Crontab;
use Hyperf\Di\Annotation\Inject;
/**
* @Crontab(name="Foo", rule="* * * * *", callback="execute", memo="这是一个示例的定时任务")

69
doc/zh/db/db.md Normal file
View File

@ -0,0 +1,69 @@
# 极简的DB组件
[hyperf/database](https://github.com/hyperf/database) 功能十分强大,但也不可否认效率上确实些许不足。这里提供一个极简的 `DB` 组件,支持 `PDO``SwooleMysql`
> 压测对比 database 1800qpsdb 6800qps。
## 组件配置
默认配置 `autoload/db.php` 如下,数据库支持多库配置,默认为 `default`
| 配置项 | 类型 | 默认值 | 备注 |
|:--------------------:|:------:|:------------------:|:--------------------------------:|
| driver | string | 无 | 数据库引擎 支持 `pdo``mysql` |
| host | string | `localhost` | 数据库地址 |
| port | int | 3306 | 数据库地址 |
| database | string | 无 | 数据库默认DB |
| username | string | 无 | 数据库用户名 |
| password | string | null | 数据库密码 |
| charset | string | utf8 | 数据库编码 |
| collation | string | utf8_unicode_ci | 数据库编码 |
| fetch_mode | int | `PDO::FETCH_ASSOC` | PDO查询结果集类型 |
| pool.min_connections | int | 1 | 连接池内最少连接数 |
| pool.max_connections | int | 10 | 连接池内最大连接数 |
| pool.connect_timeout | float | 10.0 | 连接等待超时时间 |
| pool.wait_timeout | float | 3.0 | 超时时间 |
| pool.heartbeat | int | -1 | 心跳 |
| pool.max_idle_time | float | 60.0 | 最大闲置时间 |
| options | array | | PDO 配置 |
## 组件支持的方法
具体接口可以查看 `Hyperf\DB\ConnectionInterface`
| 方法名 | 返回值类型 | 备注 |
|:----------------:|:----------:|:--------------------------------------:|
| beginTransaction | void | 开启事务 支持事务嵌套 |
| commit | void | 提交事务 支持事务嵌套 |
| rollBack | void | 回滚事务 支持事务嵌套 |
| insert | int | 插入数据返回主键ID非自称主键返回 0 |
| execute | int | 执行SQL返回受影响的行数 |
| query | array | 查询SQL |
| fetch | array | object|查询SQL返回首行数据 |
## 使用
### 使用DB实例
```php
<?php
use Hyperf\Utils\ApplicationContext;
use Hyperf\DB\DB;
$db = ApplicationContext::getContainer()->get(DB::class);
$res = $db->query('SELECT * FROM `user` WHERE gender = ?;',[1]);
```
### 使用静态方法
```php
<?php
use Hyperf\DB\DB;
$res = DB::query('SELECT * FROM `user` WHERE gender = ?;',[1]);
```

View File

@ -641,7 +641,7 @@ Db::table('users')->where('id', 1)->update(['votes' => 1]);
### 更新或者新增
有时您可能希望更新数据库中的现有记录,或者如果不存在匹配记录则创建它。 在这种情况下,可以使用 `updateOrInsert` 方法。 `updateOrInsert` 方法接受两个参数:一个用于查找记录的条件数组,以及一个包含要更记录的键值对数组。
有时您可能希望更新数据库中的现有记录,或者如果不存在匹配记录则创建它。 在这种情况下,可以使用 `updateOrInsert` 方法。 `updateOrInsert` 方法接受两个参数:一个用于查找记录的条件数组,以及一个包含要更记录的键值对数组。
`updateOrInsert` 方法将首先尝试使用第一个参数的键和值对来查找匹配的数据库记录。 如果记录存在,则使用第二个参数中的值去更新记录。 如果找不到记录,将插入一个新记录,更新的数据是两个数组的集合:

View File

@ -27,4 +27,4 @@ php bin/hyperf.php
gen:process Create a new process class
vendor
vendor:publish Publish any publishable configs from vendor packages.
```x
```

View File

@ -256,3 +256,66 @@ return [
```
如果您希望再进行更细粒度的日志切割,也可通过继承 `Monolog\Handler\RotatingFileHandler` 类并重新实现 `rotate()` 方法实现。
### 配置多个 `Handler`
用户可以修改 `handlers` 让对应日志组支持多个 `handler`。比如以下配置,当用户投递一个 `INFO` 级别以上的日志时,只会在 `hyperf.log` 中写入日志。
当用户投递一个 `DEBUG` 级别以上日志时,会在 `hyperf.log``hyperf-debug.log` 写入日志。
```php
<?php
declare(strict_types=1);
use Monolog\Handler;
use Monolog\Formatter;
use Monolog\Logger;
return [
'default' => [
'handlers' => [
[
'class' => Handler\StreamHandler::class,
'constructor' => [
'stream' => BASE_PATH . '/runtime/logs/hyperf.log',
'level' => Logger::INFO,
],
'formatter' => [
'class' => Formatter\LineFormatter::class,
'constructor' => [
'format' => null,
'dateFormat' => null,
'allowInlineLineBreaks' => true,
],
],
],
[
'class' => Handler\StreamHandler::class,
'constructor' => [
'stream' => BASE_PATH . '/runtime/logs/hyperf-debug.log',
'level' => Logger::DEBUG,
],
'formatter' => [
'class' => Formatter\JsonFormatter::class,
'constructor' => [
'batchMode' => Formatter\JsonFormatter::BATCH_MODE_JSON,
'appendNewline' => true,
],
],
],
],
],
];
```
结果如下
```
==> runtime/logs/hyperf.log <==
[2019-11-08 11:11:35] hyperf.INFO: 5dc4dce791690 [] []
==> runtime/logs/hyperf-debug.log <==
{"message":"5dc4dce791690","context":[],"level":200,"level_name":"INFO","channel":"hyperf","datetime":{"date":"2019-11-08 11:11:35.597153","timezone_type":3,"timezone":"Asia/Shanghai"},"extra":[]}
{"message":"xxxx","context":[],"level":100,"level_name":"DEBUG","channel":"hyperf","datetime":{"date":"2019-11-08 11:11:35.597635","timezone_type":3,"timezone":"Asia/Shanghai"},"extra":[]}
```

151
doc/zh/nats.md Normal file
View File

@ -0,0 +1,151 @@
# NATS
NATS是一个开源、轻量级、高性能的分布式消息中间件实现了高可伸缩性和优雅的 `Publish` / `Subscribe` 模型,使用 `Golang` 语言开发。NATS的开发哲学认为高质量的QoS应该在客户端构建故只建立了 `Request-Reply`,不提供 1.持久化 2.事务处理 3.增强的交付模式 4.企业级队列。
## 使用
### 创建消费者
```
$ php bin/hyperf.php gen:nats-consumer DemoConsumer
```
如果设置了 `queue`,则相同的 `subject` 只会被一个 `queue` 消费。若不设置 `queue`,则每个消费者都会受到消息。
```php
<?php
declare(strict_types=1);
namespace App\Nats\Consumer;
use Hyperf\Nats\AbstractConsumer;
use Hyperf\Nats\Annotation\Consumer;
use Hyperf\Nats\Message;
/**
* @Consumer(subject="hyperf.demo", queue="hyperf.demo", name="DemoConsumer", nums=1)
*/
class DemoConsumer extends AbstractConsumer
{
public function consume(Message $payload)
{
// Do something...
}
}
```
### 投递消息
使用 publish 投递消息。
```php
<?php
declare(strict_types=1);
namespace App\Controller;
use Hyperf\Di\Annotation\Inject;
use Hyperf\HttpServer\Annotation\AutoController;
use Hyperf\Nats\Driver\DriverInterface;
/**
* @AutoController(prefix="nats")
*/
class NatsController extends Controller
{
/**
* @Inject
* @var DriverInterface
*/
protected $nats;
public function publish()
{
$res = $this->nats->publish('hyperf.demo', [
'id' => 'Hyperf',
]);
return $this->response->success($res);
}
}
```
使用 request 投递消息。
```php
<?php
declare(strict_types=1);
namespace App\Controller;
use Hyperf\Di\Annotation\Inject;
use Hyperf\HttpServer\Annotation\AutoController;
use Hyperf\Nats\Driver\DriverInterface;
use Hyperf\Nats\Message;
/**
* @AutoController(prefix="nats")
*/
class NatsController extends Controller
{
/**
* @Inject
* @var DriverInterface
*/
protected $nats;
public function request()
{
$res = $this->nats->request('hyperf.reply', [
'id' => 'limx',
], function (Message $payload) {
var_dump($payload->getBody());
});
return $this->response->success($res);
}
}
```
使用 requestSync 投递消息。
```php
<?php
declare(strict_types=1);
namespace App\Controller;
use Hyperf\Di\Annotation\Inject;
use Hyperf\HttpServer\Annotation\AutoController;
use Hyperf\Nats\Driver\DriverInterface;
use Hyperf\Nats\Message;
/**
* @AutoController(prefix="nats")
*/
class NatsController extends Controller
{
/**
* @Inject
* @var DriverInterface
*/
protected $nats;
public function sync()
{
/** @var Message $message */
$message = $this->nats->requestSync('hyperf.reply', [
'id' => 'limx',
]);
return $this->response->success($message->getBody());
}
}
```

View File

@ -8,12 +8,17 @@ composer require hyperf/redis
## 配置
| 配置项 | 类型 | 默认值 | 备注 |
|:------:|:-------:|:-----------:|:---------:|
| host | string | 'localhost' | Redis地址 |
| auth | string | 无 | 密码 |
| port | integer | 6379 | 端口 |
| db | integer | 0 | DB |
| 配置项 | 类型 | 默认值 | 备注 |
|:--------------:|:-------:|:-----------:|:------------------------------:|
| host | string | 'localhost' | Redis地址 |
| auth | string | 无 | 密码 |
| port | integer | 6379 | 端口 |
| db | integer | 0 | DB |
| cluster.enable | boolean | false | 是否集群模式 |
| cluster.name | string | null | 集群名 |
| cluster.seeds | array | [] | 集群连接地址数组 ['host:port'] |
| pool | object | {} | 连接池配置 |
| options | object | {} | Redis 配置选项 |
```php
<?php
@ -23,6 +28,11 @@ return [
'auth' => env('REDIS_AUTH', ''),
'port' => (int) env('REDIS_PORT', 6379),
'db' => (int) env('REDIS_DB', 0),
'cluster' => [
'enable' => (bool) env('REDIS_CLUSTER_ENABLE', false),
'name' => null,
'seeds' => [],
],
'pool' => [
'min_connections' => 1,
'max_connections' => 10,
@ -63,6 +73,11 @@ return [
'auth' => env('REDIS_AUTH', ''),
'port' => (int) env('REDIS_PORT', 6379),
'db' => (int) env('REDIS_DB', 0),
'cluster' => [
'enable' => (bool) env('REDIS_CLUSTER_ENABLE', false),
'name' => null,
'seeds' => [],
],
'pool' => [
'min_connections' => 1,
'max_connections' => 10,
@ -128,3 +143,121 @@ $redis = $container->get(RedisFactory::class)->get('foo');
$result = $redis->keys('*');
```
## 集群模式
### 使用 `name`
配置 `cluster`,修改修改 `redis.ini`,也可以修改 `Dockerfile` 如下
```
# - config PHP
&& { \
echo "upload_max_filesize=100M"; \
echo "post_max_size=108M"; \
echo "memory_limit=1024M"; \
echo "date.timezone=${TIMEZONE}"; \
echo "redis.clusters.seeds = \"mycluster[]=localhost:7000&mycluster[]=localhost:7001\""; \
echo "redis.clusters.timeout = \"mycluster=5\""; \
echo "redis.clusters.read_timeout = \"mycluster=10\""; \
echo "redis.clusters.auth = \"mycluster=password\"";
} | tee conf.d/99-overrides.ini \
```
对应 PHP 配置如下
```php
<?php
// 省略其他配置
return [
'default' => [
'cluster' => [
'enable' => true,
'name' => 'mycluster',
'seeds' => [],
],
],
];
```
### 使用 seeds
当然不配置 name 直接使用 seeds 也是可以的。如下
```php
<?php
// 省略其他配置
return [
'default' => [
'cluster' => [
'enable' => true,
'name' => null,
'seeds' => [
'192.168.1.110:6379',
'192.168.1.111:6379',
],
],
],
];
```
## Options
用户可以修改 `options`,来设置 `Redis` 配置选项。
例如修改 `Redis` 序列化为 `PHP` 序列化。
```php
<?php
declare(strict_types=1);
return [
'default' => [
'host' => env('REDIS_HOST', 'localhost'),
'auth' => env('REDIS_AUTH', null),
'port' => (int) env('REDIS_PORT', 6379),
'db' => (int) env('REDIS_DB', 0),
'pool' => [
'min_connections' => 1,
'max_connections' => 10,
'connect_timeout' => 10.0,
'wait_timeout' => 3.0,
'heartbeat' => -1,
'max_idle_time' => (float) env('REDIS_MAX_IDLE_TIME', 60),
],
'options' => [
Redis::OPT_SERIALIZER => Redis::SERIALIZER_PHP,
],
],
];
```
比如设置 `Redis` 永不超时
```php
<?php
declare(strict_types=1);
return [
'default' => [
'host' => env('REDIS_HOST', 'localhost'),
'auth' => env('REDIS_AUTH', null),
'port' => (int) env('REDIS_PORT', 6379),
'db' => (int) env('REDIS_DB', 0),
'pool' => [
'min_connections' => 1,
'max_connections' => 10,
'connect_timeout' => 10.0,
'wait_timeout' => 3.0,
'heartbeat' => -1,
'max_idle_time' => (float) env('REDIS_MAX_IDLE_TIME', 60),
],
'options' => [
Redis::OPT_READ_TIMEOUT => -1,
],
],
];
```
> 有的 `phpredis` 扩展版本,`option` 的 `value` 必须是 `string` 类型。

View File

@ -68,7 +68,7 @@ Router::addRoute(['GET', 'POST','PUT','DELETE'], $uri, $callback);
#### 路由组的定义方式
实际路由为 `gourp/route`, 即 `/user/index`, `/user/store`, `/user/update`, `/user/delete`
实际路由为 `group/route`, 即 `/user/index`, `/user/store`, `/user/update`, `/user/delete`
```php
Router::addGroup('/user/',function (){

View File

@ -25,7 +25,7 @@ use Hyperf\RpcServer\Annotation\RpcService;
class CalculatorService implements CalculatorServiceInterface
{
// 实现一个加法方法,这里简单的认为参数都是 int 类型
public function caculate(int $a, int $b): int
public function calculate(int $a, int $b): int
{
// 这里是服务方法的具体实现
return $a + $b;

View File

@ -49,6 +49,7 @@
* [模型事件](zh/db/event.md)
* [模型缓存](zh/db/model-cache.md)
* [数据库迁移](zh/db/migration.md)
* [极简的DB组件](zh/db/db.md)
* 微服务
@ -83,6 +84,7 @@
* [Task 机制](zh/task.md)
* [枚举类](zh/constants.md)
* [Snowflake](zh/snowflake.md)
* [Nats](zh/nats.md)
* 应用部署

View File

@ -1,19 +1,32 @@
# Swoole Tracker
[Swoole Tracker](https://www.swoole-cloud.com/tracker.html) 作为 `Swoole` 官方出品的一整套企业级 `PHP``Swoole`分析调试工具更专一、更专业。曾命名Swoole Enterprise
[Swoole Tracker](https://www.swoole-cloud.com/tracker.html)是 Swoole 官方推出的一整套企业级包括 PHP 和 Swoole 分析调试工具以及应用性能管理APM平台针对常规的 FPM 和 Swoole 常驻进程的业务提供全面的性能监控、分析和调试的解决方案。曾命名Swoole Enterprise
Swoole Tracker 能够帮助企业自动分析并汇总统计关键系统调用并智能准确的定位到具体的 PHP 业务代码,实现业务应用性能最优化、强大的调试工具链为企业业务保驾护航、提高 IT 生产效率。
- 时刻掌握应用架构模型
> 自动发现应用依赖拓扑结构和展示,时刻掌握应用的架构模型
- 分布式跨应用链路追踪
> 支持无侵入的分布式跨应用链路追踪,让每个请求一目了然,全面支持协程/非协程环境,数据实时可视化
- 全面分析报告服务状况
> 各种维度统计服务上报的调用信息, 比如总流量、平均耗时、超时率等,并全面分析报告服务状况
- 拥有强大的调试工具链
> 本系统支持远程调试,可在系统后台远程开启检测内存泄漏、阻塞检测和代码性能分析
> 本系统支持远程调试,可在系统后台远程开启检测内存泄漏、阻塞检测、代码性能分析和查看调用栈;也支持手动埋点进行调试,后台统一查看结果
- 同时支持FPM和Swoole
> 完美支持PHP-FPM环境不仅限于在Swoole中使用
- 完善的系统监控
> 支持完善的系统监控零成本部署监控机器的CPU、内存、网络、磁盘等资源可以很方便的集成到现有报警系统
- 零成本接入系统
> 本系统的客户端提供脚本可一键部署服务端可在Docker环境中运行简单快捷
- 一键安装和零成本接入
> 规避与减小整体投资风险本系统的客户端提供脚本可一键部署服务端可在Docker环境中运行简单快捷
- 提高各部门生产效率
> 在复杂系统中追踪服务及代码层级性能瓶颈帮助IT、开发等部门提升工作效率将重点聚焦在核心工作中
## 安装
@ -46,7 +59,7 @@ extension=/opt/swoole_tracker.so
apm.enable=1 #打开总开关
apm.sampling_rate=100 #采样率 例如100%
# 支持远程调试;需要手动埋点时再添加
# 开启内存泄漏检测时需要添加
apm.enable_memcheck=1 #开启内存泄漏检测 默认0 关闭状态
```

View File

@ -201,3 +201,202 @@ class UserTest extends HttpTestCase
```
composer test -- --filter=testUserDaoFirst
```
## 测试替身
Gerard Meszaros 在 Meszaros2007 中介绍了测试替身的概念:
有时候对被测系统(SUT)进行测试是很困难的,因为它依赖于其他无法在测试环境中使用的组件。这有可能是因为这些组件不可用,它们不会返回测试所需要的结果,或者执行它们会有不良副作用。在其他情况下,我们的测试策略要求对被测系统的内部行为有更多控制或更多可见性。
如果在编写测试时无法使用(或选择不使用)实际的依赖组件(DOC),可以用测试替身来代替。测试替身不需要和真正的依赖组件有完全一样的的行为方式;他只需要提供和真正的组件同样的 API 即可,这样被测系统就会以为它是真正的组件!
下面展示分别通过构造函数注入依赖、通过inject注释注入依赖的测试替身
### 构造函数注入依赖的测试替身
```
<?php
namespace App\Logic;
use App\Api\DemoApi;
class DemoLogic
{
/**
* @var DemoApi $demoApi
*/
private $demoApi;
public function __construct(DemoApi $demoApi)
{
$this->demoApi = $demoApi;
}
public function test()
{
$result = $this->demoApi->test();
return $result;
}
}
```
```
<?php
namespace App\Api;
class DemoApi
{
public function test()
{
return [
'status' => 1
];
}
}
```
```
<?php
namespace HyperfTest\Cases;
use App\Api\DemoApi;
use App\Logic\DemoLogic;
use Hyperf\Di\Container;
use HyperfTest\HttpTestCase;
use Mockery;
class DemoLogicTest extends HttpTestCase
{
public function tearDown()
{
Mockery::close();
}
public function testIndex()
{
$res = $this->getContainer()->get(DemoLogic::class)->test();
$this->assertEquals(1, $res['status']);
}
/**
* @return Container
*/
protected function getContainer()
{
$container = Mockery::mock(Container::class);
$apiStub = $this->createMock(DemoApi::class);
$apiStub->method('test')->willReturn([
'status' => 1,
]);
$container->shouldReceive('get')->with(DemoLogic::class)->andReturn(new DemoLogic($apiStub));
return $container;
}
}
```
### 通过inject注释注入依赖的测试替身
```
<?php
namespace App\Logic;
use App\Api\DemoApi;
use Hyperf\Di\Annotation\Inject;
class DemoLogic
{
/**
* @var DemoApi $demoApi
* @Inject()
*/
private $demoApi;
public function test()
{
$result = $this->demoApi->test();
return $result;
}
}
```
```
<?php
namespace App\Api;
class DemoApi
{
public function test()
{
return [
'status' => 1
];
}
}
```
```
<?php
namespace HyperfTest\Cases;
use App\Api\DemoApi;
use App\Logic\DemoLogic;
use Hyperf\Di\Container;
use Hyperf\Utils\ApplicationContext;
use HyperfTest\HttpTestCase;
use Mockery;
class DemoLogicTest extends HttpTestCase
{
public function tearDown()
{
Mockery::close();
}
public function testIndex()
{
$this->getContainer();
$res = $this->getContainer()->get(DemoLogic::class)->test();
$this->assertEquals(11, $res['status']);
}
/**
* @return Container
*/
protected function getContainer()
{
$container = ApplicationContext::getContainer();
$apiStub = $this->createMock(DemoApi::class);
$apiStub->method('test')->willReturn([
'status' => 11
]);
$container->getDefinitionSource()->addDefinition(DemoApi::class, function () use ($apiStub) {
return $apiStub;
});
return $container;
}
}
```

View File

@ -67,15 +67,15 @@ return [
'tracer' => [
// Zipkin 配置
'staging_zipkin' => [
'driver' => Hyperf\Tracer\Adapter\ZipkinTracerFactory::class,
'driver' => \Hyperf\Tracer\Adapter\ZipkinTracerFactory::class,
],
// 另一套 Zipkin 配置
'producton_zipkin' => [
'driver' => Hyperf\Tracer\Adapter\ZipkinTracerFactory::class,
'driver' => \Hyperf\Tracer\Adapter\ZipkinTracerFactory::class,
],
// Jaeger 配置
'jaeger' => [
'driver' => Hyperf\Tracer\Adapter\JaegerTracerFactory::class,
'driver' => \Hyperf\Tracer\Adapter\JaegerTracerFactory::class,
],
]
];
@ -109,6 +109,7 @@ return [
'ipv6' => null,
'port' => 9501,
],
'driver' => \Hyperf\Tracer\Adapter\ZipkinTracerFactory::class,
'options' => [
// Zipkin 服务的 endpoint 地址
'endpoint_url' => env('ZIPKIN_ENDPOINT_URL', 'http://localhost:9411/api/v2/spans'),

View File

@ -17,6 +17,7 @@
<directory suffix="Test.php">./src/constants/tests</directory>
<directory suffix="Test.php">./src/consul/tests</directory>
<directory suffix="Test.php">./src/database/tests</directory>
<directory suffix="Test.php">./src/db/tests</directory>
<directory suffix="Test.php">./src/db-connection/tests</directory>
<directory suffix="Test.php">./src/di/tests</directory>
<directory suffix="Test.php">./src/dispatcher/tests</directory>

View File

@ -23,6 +23,7 @@ use Hyperf\Crontab\Mutex\ServerMutex;
use Hyperf\Crontab\Mutex\TaskMutex;
use Hyperf\Utils\Coroutine;
use Psr\Container\ContainerInterface;
use Swoole\Timer;
class Executor
{
@ -111,7 +112,7 @@ class Executor
};
break;
}
$callback && swoole_timer_after($diff > 0 ? $diff * 1000 : 1, $callback);
$callback && Timer::after($diff > 0 ? $diff * 1000 : 1, $callback);
}
protected function runInSingleton(Crontab $crontab, Closure $runnable): Closure

View File

@ -14,6 +14,7 @@ return [
'default' => [
'driver' => env('DB_DRIVER', 'mysql'),
'host' => env('DB_HOST', 'localhost'),
'port' => env('DB_PORT', '3306'),
'database' => env('DB_DATABASE', 'hyperf'),
'username' => env('DB_USERNAME', 'root'),
'password' => env('DB_PASSWORD', ''),

View File

@ -64,6 +64,7 @@ class ConnectionResolver implements ConnectionResolverInterface
if (! $connection instanceof ConnectionInterface) {
$pool = $this->factory->getPool($name);
// When Mysql connect failed, it will be catched by `Hyperf\Database\Connectors\ConnectionFactory`.
$connection = $pool->get()->getConnection();
Context::set($id, $connection);
if (Coroutine::inCoroutine()) {

1
src/db/.gitattributes vendored Normal file
View File

@ -0,0 +1 @@
/tests export-ignore

21
src/db/LICENSE Normal file
View File

@ -0,0 +1,21 @@
The MIT License (MIT)
Copyright (c) Hyperf
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

49
src/db/composer.json Normal file
View File

@ -0,0 +1,49 @@
{
"name": "hyperf/db",
"type": "library",
"license": "MIT",
"keywords": [
"php",
"hyperf"
],
"description": "",
"autoload": {
"psr-4": {
"Hyperf\\DB\\": "src/"
}
},
"autoload-dev": {
"psr-4": {
"HyperfTest\\DB\\": "tests/"
}
},
"require": {
"php": ">=7.2",
"ext-swoole": ">=4.4",
"hyperf/config": "~1.1.0",
"hyperf/contract": "~1.1.0",
"hyperf/pool": "~1.1.0",
"hyperf/utils": "~1.1.0",
"psr/container": "^1.0"
},
"require-dev": {
"friendsofphp/php-cs-fixer": "^2.14",
"phpstan/phpstan": "^0.10.5",
"hyperf/testing": "1.1.*",
"mockery/mockery": "^1.0",
"swoft/swoole-ide-helper": "dev-master"
},
"config": {
"sort-packages": true
},
"scripts": {
"test": "co-phpunit -c phpunit.xml --colors=always",
"analyze": "phpstan analyse --memory-limit 300M -l 0 ./src",
"cs-fix": "php-cs-fixer fix $1"
},
"extra": {
"hyperf": {
"config": "Hyperf\\DB\\ConfigProvider"
}
}
}

40
src/db/publish/db.php Normal file
View File

@ -0,0 +1,40 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
return [
'default' => [
'driver' => 'pdo',
'host' => env('DB_HOST', 'localhost'),
'port' => env('DB_PORT', 3306),
'database' => env('DB_DATABASE', 'hyperf'),
'username' => env('DB_USERNAME', 'root'),
'password' => env('DB_PASSWORD', ''),
'charset' => env('DB_CHARSET', 'utf8mb4'),
'collation' => env('DB_COLLATION', 'utf8mb4_unicode_ci'),
'fetch_mode' => PDO::FETCH_ASSOC,
'pool' => [
'min_connections' => 1,
'max_connections' => 10,
'connect_timeout' => 10.0,
'wait_timeout' => 3.0,
'heartbeat' => -1,
'max_idle_time' => (float) env('DB_MAX_IDLE_TIME', 60),
],
'options' => [
PDO::ATTR_CASE => PDO::CASE_NATURAL,
PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
PDO::ATTR_ORACLE_NULLS => PDO::NULL_NATURAL,
PDO::ATTR_STRINGIFY_FETCHES => false,
PDO::ATTR_EMULATE_PREPARES => false,
],
],
];

View File

@ -0,0 +1,75 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\DB;
use Hyperf\Contract\StdoutLoggerInterface;
use Hyperf\Pool\Connection;
use Hyperf\Pool\Exception\ConnectionException;
abstract class AbstractConnection extends Connection implements ConnectionInterface
{
use DetectsLostConnections;
use ManagesTransactions;
/**
* @var array
*/
protected $config = [];
public function getConfig(): array
{
return $this->config;
}
public function release(): void
{
if ($this->transactionLevel() > 0) {
$this->rollBack(0);
if ($this->container->has(StdoutLoggerInterface::class)) {
$logger = $this->container->get(StdoutLoggerInterface::class);
$logger->error('Maybe you\'ve forgotten to commit or rollback the MySQL transaction.');
}
}
$this->pool->release($this);
}
public function getActiveConnection()
{
if ($this->check()) {
return $this;
}
if (! $this->reconnect()) {
throw new ConnectionException('Connection reconnect failed.');
}
return $this;
}
public function retry(\Throwable $throwable, $name, $arguments)
{
if ($this->causedByLostConnection($throwable)) {
try {
$this->reconnect();
return $this->{$name}(...$arguments);
} catch (\Throwable $throwable) {
if ($this->container->has(StdoutLoggerInterface::class)) {
$logger = $this->container->get(StdoutLoggerInterface::class);
$logger->error('Connection execute retry failed. message = ' . $throwable->getMessage());
}
}
}
throw $throwable;
}
}

View File

@ -0,0 +1,41 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\DB;
class ConfigProvider
{
public function __invoke(): array
{
return [
'dependencies' => [
],
'commands' => [
],
'annotations' => [
'scan' => [
'paths' => [
__DIR__,
],
],
],
'publish' => [
[
'id' => 'db',
'description' => 'The config for db.',
'source' => __DIR__ . '/../publish/db.php',
'destination' => BASE_PATH . '/config/autoload/db.php',
],
],
];
}
}

View File

@ -0,0 +1,64 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\DB;
interface ConnectionInterface
{
/**
* Start a new database transaction.
*/
public function beginTransaction(): void;
/**
* Commit the active database transaction.
*/
public function commit(): void;
/**
* Rollback the active database transaction.
*/
public function rollBack(?int $toLevel = null): void;
/**
* Run an insert statement against the database.
*
* @return int last insert id
*/
public function insert(string $query, array $bindings = []): int;
/**
* Run an execute statement against the database.
*
* @return int affected rows
*/
public function execute(string $query, array $bindings = []): int;
/**
* Execute an SQL statement and return the number of affected rows.
*
* @return int affected rows
*/
public function exec(string $sql): int;
/**
* Run a select statement against the database.
*/
public function query(string $query, array $bindings = []): array;
/**
* Run a select statement and return a single result.
*/
public function fetch(string $query, array $bindings = []);
public function call(string $method, array $argument = []);
}

118
src/db/src/DB.php Normal file
View File

@ -0,0 +1,118 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\DB;
use Hyperf\DB\Pool\PoolFactory;
use Hyperf\Utils\ApplicationContext;
use Hyperf\Utils\Context;
use Throwable;
/**
* @method beginTransaction()
* @method commit()
* @method rollback()
* @method insert(string $query, array $bindings = [])
* @method execute(string $query, array $bindings = [])
* @method query(string $query, array $bindings = [])
* @method fetch(string $query, array $bindings = [])
*/
class DB
{
/**
* @var PoolFactory
*/
protected $factory;
/**
* @var string
*/
protected $poolName;
public function __construct(PoolFactory $factory, string $poolName = 'default')
{
$this->factory = $factory;
$this->poolName = $poolName;
}
public function __call($name, $arguments)
{
$hasContextConnection = Context::has($this->getContextKey());
$connection = $this->getConnection($hasContextConnection);
try {
$connection = $connection->getConnection();
$result = $connection->{$name}(...$arguments);
} catch (Throwable $exception) {
$result = $connection->retry($exception, $name, $arguments);
} finally {
if (! $hasContextConnection) {
if ($this->shouldUseSameConnection($name)) {
// Should storage the connection to coroutine context, then use defer() to release the connection.
Context::set($this->getContextKey(), $connection);
defer(function () use ($connection) {
$connection->release();
});
} else {
// Release the connection after command executed.
$connection->release();
}
}
}
return $result;
}
public static function __callStatic($name, $arguments)
{
$container = ApplicationContext::getContainer();
$db = $container->get(static::class);
return $db->{$name}(...$arguments);
}
/**
* Define the commands that needs same connection to execute.
* When these commands executed, the connection will storage to coroutine context.
*/
protected function shouldUseSameConnection(string $methodName): bool
{
return in_array($methodName, [
'beginTransaction',
'commit',
'rollBack',
]);
}
/**
* Get a connection from coroutine context, or from mysql connectio pool.
*/
protected function getConnection(bool $hasContextConnection): AbstractConnection
{
$connection = null;
if ($hasContextConnection) {
$connection = Context::get($this->getContextKey());
}
if (! $connection instanceof AbstractConnection) {
$pool = $this->factory->getPool($this->poolName);
$connection = $pool->get();
}
return $connection;
}
/**
* The key to identify the connection object in coroutine context.
*/
private function getContextKey(): string
{
return sprintf('db.connection.%s', $this->poolName);
}
}

View File

@ -0,0 +1,49 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\DB;
use Hyperf\Utils\Str;
use Throwable;
trait DetectsLostConnections
{
/**
* Determine if the given exception was caused by a lost connection.
*/
protected function causedByLostConnection(Throwable $e): bool
{
$message = $e->getMessage();
return Str::contains($message, [
'server has gone away',
'no connection to the server',
'Lost connection',
'is dead or not enabled',
'Error while sending',
'decryption failed or bad record mac',
'server closed the connection unexpectedly',
'SSL connection has been closed unexpectedly',
'Error writing data to the connection',
'Resource deadlock avoided',
'Transaction() on null',
'child connection forced to terminate due to client_idle_limit',
'query_wait_timeout',
'reset by peer',
'Physical connection is not usable',
'TCP Provider: Error code 0x68',
'Name or service not known',
'ORA-03114',
'Packets out of order. Expected',
]);
}
}

View File

@ -0,0 +1,17 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\DB\Exception;
class DriverNotFoundException extends RuntimeException
{
}

View File

@ -0,0 +1,17 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\DB\Exception;
class QueryException extends \PDOException
{
}

View File

@ -0,0 +1,17 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\DB\Exception;
class RuntimeException extends \RuntimeException
{
}

19
src/db/src/Frequency.php Normal file
View File

@ -0,0 +1,19 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\DB;
use Hyperf\Pool\Frequency as DefaultFrequency;
class Frequency extends DefaultFrequency
{
}

View File

@ -0,0 +1,172 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\DB;
use Throwable;
trait ManagesTransactions
{
/**
* The number of active transactions.
*
* @var int
*/
protected $transactions = 0;
/**
* Start a new database transaction.
* @throws Throwable
*/
public function beginTransaction(): void
{
$this->createTransaction();
++$this->transactions;
}
/**
* Commit the active database transaction.
*/
public function commit(): void
{
if ($this->transactions == 1) {
$this->call('commit');
}
$this->transactions = max(0, $this->transactions - 1);
}
/**
* Rollback the active database transaction.
*
* @throws Throwable
*/
public function rollBack(?int $toLevel = null): void
{
// We allow developers to rollback to a certain transaction level. We will verify
// that this given transaction level is valid before attempting to rollback to
// that level. If it's not we will just return out and not attempt anything.
$toLevel = is_null($toLevel)
? $this->transactions - 1
: $toLevel;
if ($toLevel < 0 || $toLevel >= $this->transactions) {
return;
}
// Next, we will actually perform this rollback within this database and fire the
// rollback event. We will also set the current transaction level to the given
// level that was passed into this method so it will be right from here out.
try {
$this->performRollBack($toLevel);
} catch (Throwable $e) {
$this->handleRollBackException($e);
}
$this->transactions = $toLevel;
}
/**
* Get the number of active transactions.
*/
public function transactionLevel(): int
{
return $this->transactions;
}
/**
* Create a transaction within the database.
*/
protected function createTransaction(): void
{
if ($this->transactions == 0) {
try {
$this->call('beginTransaction');
} catch (Throwable $e) {
$this->handleBeginTransactionException($e);
}
} elseif ($this->transactions >= 1) {
$this->createSavepoint();
}
}
/**
* Create a save point within the database.
*/
protected function createSavepoint()
{
$this->exec(
$this->compileSavepoint('trans' . ($this->transactions + 1))
);
}
/**
* Handle an exception from a transaction beginning.
*
* @throws Throwable
*/
protected function handleBeginTransactionException(Throwable $e)
{
if ($this->causedByLostConnection($e)) {
$this->reconnect();
$this->call('beginTransaction');
} else {
throw $e;
}
}
/**
* Perform a rollback within the database.
*/
protected function performRollBack(int $toLevel)
{
if ($toLevel == 0) {
$this->call('rollBack');
} else {
$this->exec(
$this->compileSavepointRollBack('trans' . ($toLevel + 1))
);
}
}
/**
* Handle an exception from a rollback.
*
* @throws Throwable
*/
protected function handleRollBackException(Throwable $e)
{
if ($this->causedByLostConnection($e)) {
$this->transactions = 0;
}
throw $e;
}
/**
* Compile the SQL statement to define a savepoint.
*/
protected function compileSavepoint(string $name): string
{
return 'SAVEPOINT ' . $name;
}
/**
* Compile the SQL statement to execute a savepoint rollback.
*/
protected function compileSavepointRollBack(string $name): string
{
return 'ROLLBACK TO SAVEPOINT ' . $name;
}
}

View File

@ -0,0 +1,167 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\DB;
use Hyperf\DB\Exception\RuntimeException;
use Hyperf\Pool\Pool;
use Psr\Container\ContainerInterface;
use Swoole\Coroutine\MySQL;
use Swoole\Coroutine\MySQL\Statement;
class MySQLConnection extends AbstractConnection
{
/**
* @var MySQL
*/
protected $connection;
/**
* @var array
*/
protected $config = [
'driver' => 'pdo',
'host' => 'localhost',
'port' => 3306,
'database' => 'hyperf',
'username' => 'root',
'password' => '',
'charset' => 'utf8mb4',
'collation' => 'utf8mb4_unicode_ci',
'pool' => [
'min_connections' => 1,
'max_connections' => 10,
'connect_timeout' => 10.0,
'wait_timeout' => 3.0,
'heartbeat' => -1,
'max_idle_time' => 60.0,
],
];
public function __construct(ContainerInterface $container, Pool $pool, array $config)
{
parent::__construct($container, $pool);
$this->config = array_replace_recursive($this->config, $config);
$this->reconnect();
}
public function __call($name, $arguments)
{
return $this->connection->{$name}(...$arguments);
}
/**
* Reconnect the connection.
*/
public function reconnect(): bool
{
$connection = new MySQL();
$connection->connect([
'host' => $this->config['host'],
'port' => $this->config['port'],
'user' => $this->config['username'],
'password' => $this->config['password'],
'database' => $this->config['database'],
'timeout' => $this->config['pool']['connect_timeout'],
'charset' => $this->config['charset'],
'fetch_mode' => true,
]);
$this->connection = $connection;
$this->lastUseTime = microtime(true);
return true;
}
/**
* Close the connection.
*/
public function close(): bool
{
unset($this->connection);
return true;
}
public function insert(string $query, array $bindings = []): int
{
$statement = $this->prepare($query);
$statement->execute($bindings);
return $statement->insert_id;
}
public function execute(string $query, array $bindings = []): int
{
$statement = $this->prepare($query);
$statement->execute($bindings);
return $statement->affected_rows;
}
public function exec(string $sql): int
{
$res = $this->connection->query($sql);
if ($res === false) {
throw new RuntimeException($this->connection->error);
}
return $this->connection->affected_rows;
}
public function query(string $query, array $bindings = []): array
{
// For select statements, we'll simply execute the query and return an array
// of the database result set. Each element in the array will be a single
// row from the database table, and will either be an array or objects.
$statement = $this->prepare($query);
$statement->execute($bindings);
return $statement->fetchAll();
}
public function fetch(string $query, array $bindings = [])
{
$records = $this->query($query, $bindings);
return array_shift($records);
}
public function call(string $method, array $argument = [])
{
$timeout = $this->config['pool']['wait_timeout'];
switch ($method) {
case 'beginTransaction':
return $this->connection->begin($timeout);
case 'rollBack':
return $this->connection->rollback($timeout);
case 'commit':
return $this->connection->commit($timeout);
}
return $this->connection->{$method}(...$argument);
}
protected function prepare(string $query): Statement
{
$statement = $this->connection->prepare($query);
if ($statement === false) {
throw new RuntimeException($this->connection->error);
}
return $statement;
}
}

View File

@ -0,0 +1,176 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\DB;
use Hyperf\Pool\Exception\ConnectionException;
use Hyperf\Pool\Pool;
use PDO;
use PDOStatement;
use Psr\Container\ContainerInterface;
class PDOConnection extends AbstractConnection
{
/**
* @var PDO
*/
protected $connection;
/**
* @var array
*/
protected $config = [
'driver' => 'pdo',
'host' => 'localhost',
'port' => 3306,
'database' => 'hyperf',
'username' => 'root',
'password' => '',
'charset' => 'utf8mb4',
'collation' => 'utf8mb4_unicode_ci',
'fetch_mode' => PDO::FETCH_ASSOC,
'pool' => [
'min_connections' => 1,
'max_connections' => 10,
'connect_timeout' => 10.0,
'wait_timeout' => 3.0,
'heartbeat' => -1,
'max_idle_time' => 60.0,
],
'options' => [
PDO::ATTR_CASE => PDO::CASE_NATURAL,
PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
PDO::ATTR_ORACLE_NULLS => PDO::NULL_NATURAL,
PDO::ATTR_STRINGIFY_FETCHES => false,
PDO::ATTR_EMULATE_PREPARES => false,
],
];
/**
* Current mysql database.
* @var null|int
*/
protected $database;
public function __construct(ContainerInterface $container, Pool $pool, array $config)
{
parent::__construct($container, $pool);
$this->config = array_replace_recursive($this->config, $config);
$this->reconnect();
}
public function __call($name, $arguments)
{
return $this->connection->{$name}(...$arguments);
}
/**
* Reconnect the connection.
*/
public function reconnect(): bool
{
$host = $this->config['host'];
$dbName = $this->config['database'];
$username = $this->config['username'];
$password = $this->config['password'];
$dsn = "mysql:host={$host};dbname={$dbName}";
try {
$pdo = new \PDO($dsn, $username, $password, $this->config['options']);
} catch (\Throwable $e) {
throw new ConnectionException('Connection reconnect failed.:' . $e->getMessage());
}
$this->connection = $pdo;
$this->lastUseTime = microtime(true);
return true;
}
/**
* Close the connection.
*/
public function close(): bool
{
unset($this->connection);
return true;
}
public function query(string $query, array $bindings = []): array
{
// For select statements, we'll simply execute the query and return an array
// of the database result set. Each element in the array will be a single
// row from the database table, and will either be an array or objects.
$statement = $this->connection->prepare($query);
$this->bindValues($statement, $bindings);
$statement->execute();
$fetchModel = $this->config['fetch_mode'];
return $statement->fetchAll($fetchModel);
}
public function fetch(string $query, array $bindings = [])
{
$records = $this->query($query, $bindings);
return array_shift($records);
}
public function execute(string $query, array $bindings = []): int
{
$statement = $this->connection->prepare($query);
$this->bindValues($statement, $bindings);
$statement->execute();
return $statement->rowCount();
}
public function exec(string $sql): int
{
return $this->connection->exec($sql);
}
public function insert(string $query, array $bindings = []): int
{
$statement = $this->connection->prepare($query);
$this->bindValues($statement, $bindings);
$statement->execute();
return (int) $this->connection->lastInsertId();
}
public function call(string $method, array $argument = [])
{
return $this->connection->{$method}(...$argument);
}
/**
* Bind values to their parameters in the given statement.
*/
protected function bindValues(PDOStatement $statement, array $bindings): void
{
foreach ($bindings as $key => $value) {
$statement->bindValue(
is_string($key) ? $key : $key + 1,
$value,
is_int($value) ? PDO::PARAM_INT : PDO::PARAM_STR
);
}
}
}

View File

@ -0,0 +1,24 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\DB\Pool;
use Hyperf\Contract\ConnectionInterface;
use Hyperf\DB\MySQLConnection;
class MySQLPool extends Pool
{
protected function createConnection(): ConnectionInterface
{
return new MySQLConnection($this->container, $this, $this->config);
}
}

View File

@ -0,0 +1,24 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\DB\Pool;
use Hyperf\Contract\ConnectionInterface;
use Hyperf\DB\PDOConnection;
class PDOPool extends Pool
{
protected function createConnection(): ConnectionInterface
{
return new PDOConnection($this->container, $this, $this->config);
}
}

58
src/db/src/Pool/Pool.php Normal file
View File

@ -0,0 +1,58 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\DB\Pool;
use Hyperf\Contract\ConfigInterface;
use Hyperf\DB\Frequency;
use Hyperf\Pool\Pool as HyperfPool;
use Hyperf\Utils\Arr;
use Psr\Container\ContainerInterface;
abstract class Pool extends HyperfPool
{
/**
* @var string
*/
protected $name;
/**
* @var array
*/
protected $config;
public function __construct(ContainerInterface $container, string $name)
{
$config = $container->get(ConfigInterface::class);
$key = sprintf('db.%s', $name);
if (! $config->has($key)) {
throw new \InvalidArgumentException(sprintf('config[%s] is not exist!', $key));
}
$this->name = $name;
$this->config = $config->get($key);
$options = Arr::get($this->config, 'pool', []);
$this->frequency = make(Frequency::class);
parent::__construct($container, $options);
}
public function getName(): string
{
return $this->name;
}
public function getConfig(): array
{
return $this->config;
}
}

View File

@ -0,0 +1,60 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\DB\Pool;
use Hyperf\Contract\ConfigInterface;
use Hyperf\DB\Exception\DriverNotFoundException;
use Psr\Container\ContainerInterface;
class PoolFactory
{
/**
* @var Pool[]
*/
protected $pools = [];
/**
* @var ContainerInterface
*/
protected $container;
public function __construct(ContainerInterface $container)
{
$this->container = $container;
}
public function getPool(string $name)
{
if (isset($this->pools[$name])) {
return $this->pools[$name];
}
$config = $this->container->get(ConfigInterface::class);
$driver = $config->get(sprintf('db.%s.driver', $name), 'pdo');
$class = $this->getPoolName($driver);
return $this->pools[$name] = make($class, [$this->container, $name]);
}
protected function getPoolName(string $driver)
{
switch (strtolower($driver)) {
case 'mysql':
return MySQLPool::class;
case 'pdo':
return PDOPool::class;
}
throw new DriverNotFoundException(sprintf('Driver %s is not found.', $driver));
}
}

View File

@ -0,0 +1,79 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace HyperfTest\DB\Cases;
use Hyperf\Config\Config;
use Hyperf\Contract\ConfigInterface;
use Hyperf\Contract\StdoutLoggerInterface;
use Hyperf\DB\DB;
use Hyperf\DB\Frequency;
use Hyperf\DB\Pool\MySQLPool;
use Hyperf\DB\Pool\PDOPool;
use Hyperf\DB\Pool\PoolFactory;
use Hyperf\Di\Container;
use Hyperf\Pool\Channel;
use Hyperf\Pool\PoolOption;
use Hyperf\Utils\ApplicationContext;
use Hyperf\Utils\Context;
use Mockery;
use PHPUnit\Framework\TestCase;
/**
* Class AbstractTestCase.
*/
abstract class AbstractTestCase extends TestCase
{
protected $driver = 'pdo';
protected function tearDown()
{
Mockery::close();
Context::set('db.connection.default', null);
}
protected function getContainer($options = [])
{
$container = Mockery::mock(Container::class);
$container->shouldReceive('get')->with(ConfigInterface::class)->andReturn(new Config([
'db' => [
'default' => [
'driver' => $this->driver,
'password' => '',
'database' => 'hyperf',
'pool' => [
'max_connections' => 20,
],
'options' => $options,
],
],
]));
$container->shouldReceive('make')->with(PDOPool::class, Mockery::any())->andReturnUsing(function ($_, $args) {
return new PDOPool(...array_values($args));
});
$container->shouldReceive('make')->with(MySQLPool::class, Mockery::any())->andReturnUsing(function ($_, $args) {
return new MySQLPool(...array_values($args));
});
$container->shouldReceive('make')->with(Frequency::class, Mockery::any())->andReturn(new Frequency());
$container->shouldReceive('make')->with(PoolOption::class, Mockery::any())->andReturnUsing(function ($_, $args) {
return new PoolOption(...array_values($args));
});
$container->shouldReceive('make')->with(Channel::class, Mockery::any())->andReturnUsing(function ($_, $args) {
return new Channel(...array_values($args));
});
$container->shouldReceive('get')->with(PoolFactory::class)->andReturn($factory = new PoolFactory($container));
$container->shouldReceive('get')->with(DB::class)->andReturn(new DB($factory, 'default'));
$container->shouldReceive('has')->with(StdoutLoggerInterface::class)->andReturn(false);
ApplicationContext::setContainer($container);
return $container;
}
}

View File

@ -0,0 +1,52 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace HyperfTest\DB\Cases;
/**
* @internal
* @coversNothing
*/
class MySQLDriverTest extends PDODriverTest
{
protected $driver = 'mysql';
public function testFetch()
{
parent::testFetch();
}
public function testQuery()
{
parent::testQuery();
}
public function testInsertAndExecute()
{
parent::testInsertAndExecute();
}
public function testTransaction()
{
parent::testTransaction();
}
public function testConfig()
{
parent::testConfig();
}
public function testMultiTransaction()
{
parent::testMultiTransaction();
}
}

View File

@ -0,0 +1,118 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace HyperfTest\DB\Cases;
use Hyperf\DB\DB;
use Hyperf\DB\Pool\PoolFactory;
/**
* @internal
* @coversNothing
*/
class PDODriverTest extends AbstractTestCase
{
public function testFetch()
{
$db = $this->getContainer()->get(DB::class);
$res = $db->fetch('SELECT * FROM `user` WHERE id = ?;', [2]);
$this->assertSame('Hyperflex', $res['name']);
}
public function testQuery()
{
$db = $this->getContainer()->get(DB::class);
$res = $db->query('SELECT * FROM `user` WHERE id = ?;', [2]);
$this->assertSame('Hyperflex', $res[0]['name']);
}
public function testInsertAndExecute()
{
$db = $this->getContainer()->get(DB::class);
$id = $db->insert('INSERT INTO `user` (`name`, `gender`) VALUES (?,?);', [$name = uniqid(), $gender = rand(0, 2)]);
$this->assertTrue($id > 0);
$res = $db->fetch('SELECT * FROM `user` WHERE id = ?;', [$id]);
$this->assertSame($name, $res['name']);
$this->assertSame($gender, $res['gender']);
$res = $db->execute('UPDATE `user` SET `name` = ? WHERE id = ?', [$name = uniqid(), $id]);
$this->assertTrue($res > 0);
$res = $db->fetch('SELECT * FROM `user` WHERE id = ?;', [$id]);
$this->assertSame($name, $res['name']);
}
public function testTransaction()
{
$db = $this->getContainer()->get(DB::class);
$db->beginTransaction();
$id = $db->insert('INSERT INTO `user` (`name`, `gender`) VALUES (?,?);', [$name = uniqid(), $gender = rand(0, 2)]);
$this->assertTrue($id > 0);
$db->commit();
$res = $db->fetch('SELECT * FROM `user` WHERE id = ?;', [$id]);
$this->assertSame($name, $res['name']);
$this->assertSame($gender, $res['gender']);
$db->beginTransaction();
$id = $db->insert('INSERT INTO `user` (`name`, `gender`) VALUES (?,?);', [$name = uniqid(), $gender = rand(0, 2)]);
$this->assertTrue($id > 0);
$db->rollBack();
$res = $db->fetch('SELECT * FROM `user` WHERE id = ?;', [$id]);
$this->assertNull($res);
}
public function testConfig()
{
$factory = $this->getContainer()->get(PoolFactory::class);
$pool = $factory->getPool('default');
$this->assertSame('hyperf', $pool->getConfig()['database']);
$this->assertSame([], $pool->getConfig()['options']);
$connection = $pool->get();
$this->assertSame(6, count($connection->getConfig()['pool']));
$this->assertSame(20, $connection->getConfig()['pool']['max_connections']);
}
public function testMultiTransaction()
{
$db = $this->getContainer()->get(DB::class);
$db->beginTransaction();
$id = $db->insert('INSERT INTO `user` (`name`, `gender`) VALUES (?,?);', [$name = 'trans' . uniqid(), $gender = rand(0, 2)]);
$this->assertTrue($id > 0);
$db->beginTransaction();
$id2 = $db->insert('INSERT INTO `user` (`name`, `gender`) VALUES (?,?);', ['rollback' . uniqid(), rand(0, 2)]);
$this->assertTrue($id2 > 0);
$db->rollBack();
$db->commit();
$res = $db->fetch('SELECT * FROM `user` WHERE id = ?;', [$id2]);
$this->assertNull($res);
$res = $db->fetch('SELECT * FROM `user` WHERE id = ?;', [$id]);
$this->assertNotNull($res);
}
public function testStaticCall()
{
$this->getContainer();
$res = DB::fetch('SELECT * FROM `user` WHERE id = ?;', [1]);
$this->assertSame('Hyperf', $res['name']);
}
}

View File

@ -0,0 +1,13 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
require_once dirname(dirname(__FILE__)) . '/vendor/autoload.php';

View File

@ -32,6 +32,6 @@ class AmqpConsumerCommand extends GeneratorCommand
protected function getDefaultNamespace(): string
{
return $this->getConfig()['namespace'] ?? 'App\\Amqp\\Consumers';
return $this->getConfig()['namespace'] ?? 'App\\Amqp\\Consumer';
}
}

View File

@ -32,6 +32,6 @@ class AmqpProducerCommand extends GeneratorCommand
protected function getDefaultNamespace(): string
{
return $this->getConfig()['namespace'] ?? 'App\\Amqp\\Producers';
return $this->getConfig()['namespace'] ?? 'App\\Amqp\\Producer';
}
}

View File

@ -0,0 +1,37 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Devtool\Generator;
use Hyperf\Command\Annotation\Command;
/**
* @Command
*/
class NatsConsumerCommand extends GeneratorCommand
{
public function __construct()
{
parent::__construct('gen:nats-consumer');
$this->setDescription('Create a new nats consumer class');
}
protected function getStub(): string
{
return $this->getConfig()['stub'] ?? __DIR__ . '/stubs/nats-consumer.stub';
}
protected function getDefaultNamespace(): string
{
return $this->getConfig()['namespace'] ?? 'App\\Nats\\Consumer';
}
}

View File

@ -0,0 +1,20 @@
<?php
declare(strict_types=1);
namespace %NAMESPACE%;
use Hyperf\Nats\AbstractConsumer;
use Hyperf\Nats\Annotation\Consumer;
use Hyperf\Nats\Message;
/**
* @Consumer(subject="hyperf", queue="hyperf", name ="%CLASS%", nums=1)
*/
class %CLASS% extends AbstractConsumer
{
public function consume(Message $payload)
{
var_dump($payload->getBody());
}
}

View File

@ -17,6 +17,7 @@ use Hyperf\Config\ProviderConfig;
use Hyperf\Di\Annotation\Scanner;
use Hyperf\Di\Container;
use Psr\Container\ContainerInterface;
use Swoole\Timer;
use Symfony\Component\Console\Exception\LogicException;
class InitProxyCommand extends Command
@ -51,6 +52,8 @@ class InitProxyCommand extends Command
$this->createAopProxies();
Timer::clearAll();
$this->output->writeln('<info>Proxy class create success.</info>');
}

View File

@ -54,7 +54,6 @@ class Container implements ContainerInterface
/**
* Container constructor.
* @param Definition\DefinitionSourceInterface $definitionSource
*/
public function __construct(Definition\DefinitionSourceInterface $definitionSource)
{

View File

@ -22,7 +22,6 @@ interface MethodDefinitionCollectorInterface
/**
* Retrieve the metadata for the return value of the method.
* @return ReflectionType
*/
public function getReturnType(string $class, string $method): ReflectionType;
}

View File

@ -18,6 +18,10 @@
"friendsofphp/php-cs-fixer": "^2.15",
"phpunit/phpunit": "^7.0.0"
},
"suggest": {
"hyperf/http-server": "Required to use GraphQLMiddleware.",
"hyperf/http-message": "Required to use GraphQLMiddleware."
},
"autoload": {
"psr-4": {
"Hyperf\\GraphQL\\": "src/"

View File

@ -63,9 +63,7 @@ class AnnotationReader
/**
* AnnotationReader constructor.
* @param Reader $reader
* @param string $mode One of self::LAX_MODE or self::STRICT_MODE
* @param array $strictNamespaces
*/
public function __construct(Reader $reader, string $mode = self::STRICT_MODE, array $strictNamespaces = [])
{

View File

@ -223,7 +223,6 @@ class FieldsBuilder
/**
* @param object $controller
* @param string $annotationName
* @param bool $injectSource whether to inject the source object or not as the first argument
* @throws CannotMapTypeExceptionInterface
* @throws \ReflectionException
@ -464,9 +463,6 @@ class FieldsBuilder
/**
* Checks the Logged and Right annotations.
*
* @param \ReflectionMethod $reflectionMethod
* @return bool
*/
private function isAuthorized(ReflectionMethod $reflectionMethod): bool
{
@ -686,9 +682,6 @@ class FieldsBuilder
* Casts a Type to a GraphQL type.
* Does not deal with nullable.
*
* @param Type $type
* @param null|GraphQLType $subType
* @param bool $mapToInputType
* @throws CannotMapTypeExceptionInterface
* @return GraphQLType (InputType&GraphQLType)|(OutputType&GraphQLType)
*/
@ -734,9 +727,6 @@ class FieldsBuilder
/**
* Removes "null" from the type (if it is compound). Return an array of types (not a Compound type).
*
* @param Type $docBlockTypeHint
* @return array
*/
private function typesWithoutNullable(Type $docBlockTypeHint): array
{
@ -752,9 +742,6 @@ class FieldsBuilder
/**
* Drops "Nullable" types and return the core type.
*
* @param Type $typeHint
* @return Type
*/
private function dropNullableType(Type $typeHint): Type
{
@ -766,9 +753,6 @@ class FieldsBuilder
/**
* Resolves a list type.
*
* @param Type $typeHint
* @return null|Type
*/
private function getTypeInArray(Type $typeHint): ?Type
{
@ -781,10 +765,6 @@ class FieldsBuilder
return $this->dropNullableType($typeHint->getValueType());
}
/**
* @param Type $docBlockTypeHint
* @return bool
*/
private function isNullable(Type $docBlockTypeHint): bool
{
if ($docBlockTypeHint instanceof Null_) {
@ -803,9 +783,6 @@ class FieldsBuilder
/**
* Resolves "self" types into the class type.
*
* @param Type $type
* @return Type
*/
private function resolveSelf(Type $type, ReflectionClass $reflectionClass): Type
{

View File

@ -0,0 +1,60 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\GraphQL;
use GraphQL\GraphQL;
use GraphQL\Type\Schema;
use Hyperf\HttpMessage\Stream\SwooleStream;
use Hyperf\Utils\Codec\Json;
use Hyperf\Utils\Context;
use Psr\Http\Message\ResponseInterface;
use Psr\Http\Message\ServerRequestInterface;
use Psr\Http\Server\MiddlewareInterface;
use Psr\Http\Server\RequestHandlerInterface;
class GraphQLMiddleware implements MiddlewareInterface
{
/**
* @var Schema
*/
protected $schema;
public function __construct(Schema $schema)
{
$this->schema = $schema;
}
public function process(ServerRequestInterface $request, RequestHandlerInterface $handler): ResponseInterface
{
if (! $this->isGraphQL($request)) {
return $handler->handle($request);
}
$input = $request->getParsedBody();
$query = $input['query'];
$variableValues = isset($input['variables']) ? $input['variables'] : null;
$result = GraphQL::executeQuery($this->schema, $query, null, null, $variableValues);
return $this->getResponse()->withBody(new SwooleStream(Json::encode($result)));
}
protected function getResponse(): ResponseInterface
{
return Context::get(ResponseInterface::class);
}
protected function isGraphQL(ServerRequestInterface $request): bool
{
return $request->getUri()->getPath() === '/graphql';
}
}

View File

@ -53,12 +53,6 @@ class InputTypeGenerator
$this->argumentResolver = $argumentResolver;
}
/**
* @param string $factory
* @param string $methodName
* @param RecursiveTypeMapperInterface $recursiveTypeMapper
* @return InputObjectType
*/
public function mapFactoryMethod(string $factory, string $methodName, RecursiveTypeMapperInterface $recursiveTypeMapper, ContainerInterface $container): InputObjectType
{
$method = new ReflectionMethod($factory, $methodName);

View File

@ -44,7 +44,6 @@ class InputTypeUtils
/**
* Returns an array with 2 elements: [ $inputName, $className ].
*
* @param ReflectionMethod $method
* @return string[]
*/
public function getInputTypeNameAndClassName(ReflectionMethod $method): array
@ -83,9 +82,6 @@ class InputTypeUtils
/**
* Resolves "self" types into the class type.
*
* @param Type $type
* @return Type
*/
private function resolveSelf(Type $type, ReflectionClass $reflectionClass): Type
{

View File

@ -37,14 +37,7 @@ class ResolvableInputObjectType extends InputObjectType implements ResolvableInp
/**
* QueryField constructor.
* @param string $name
* @param FieldsBuilderFactory $controllerQueryProviderFactory
* @param RecursiveTypeMapperInterface $recursiveTypeMapper
* @param object|string $factory
* @param string $methodName
* @param ArgumentResolver $argumentResolver
* @param null|string $comment
* @param array $additionalConfig
*/
public function __construct(string $name, FieldsBuilderFactory $controllerQueryProviderFactory, RecursiveTypeMapperInterface $recursiveTypeMapper, $factory, string $methodName, ArgumentResolver $argumentResolver, ?string $comment, array $additionalConfig = [])
{
@ -70,7 +63,6 @@ class ResolvableInputObjectType extends InputObjectType implements ResolvableInp
}
/**
* @param array $args
* @return object
*/
public function resolve(array $args)

View File

@ -66,9 +66,7 @@ class TypeGenerator
/**
* @param string $annotatedObjectClassName the FQCN of an object with a Type annotation
* @param RecursiveTypeMapperInterface $recursiveTypeMapper
* @throws \ReflectionException
* @return MutableObjectType
*/
public function mapAnnotatedObject(string $annotatedObjectClassName, RecursiveTypeMapperInterface $recursiveTypeMapper): MutableObjectType
{
@ -97,8 +95,6 @@ class TypeGenerator
/**
* @param object $annotatedObject an object with a ExtendType annotation
* @param MutableObjectType $type
* @param RecursiveTypeMapperInterface $recursiveTypeMapper
*/
public function extendAnnotatedObject($annotatedObject, MutableObjectType $type, RecursiveTypeMapperInterface $recursiveTypeMapper)
{

View File

@ -176,9 +176,6 @@ class TypeMapper implements TypeMapperInterface
/**
* Returns true if this type mapper can map the $className FQCN to a GraphQL type.
*
* @param string $className
* @return bool
*/
public function canMapClassToType(string $className): bool
{
@ -198,9 +195,7 @@ class TypeMapper implements TypeMapperInterface
*
* @param string $className the exact class name to look for (this function does not look into parent classes)
* @param null|OutputType $subType an optional sub-type if the main class is an iterator that needs to be typed
* @param RecursiveTypeMapperInterface $recursiveTypeMapper
* @throws CannotMapTypeExceptionInterface
* @return MutableObjectType
*/
public function mapClassToType(string $className, ?OutputType $subType, RecursiveTypeMapperInterface $recursiveTypeMapper): MutableObjectType
{
@ -229,9 +224,6 @@ class TypeMapper implements TypeMapperInterface
/**
* Returns true if this type mapper can map the $className FQCN to a GraphQL input type.
*
* @param string $className
* @return bool
*/
public function canMapClassToInputType(string $className): bool
{
@ -247,10 +239,7 @@ class TypeMapper implements TypeMapperInterface
/**
* Maps a PHP fully qualified class name to a GraphQL input type.
*
* @param string $className
* @param RecursiveTypeMapperInterface $recursiveTypeMapper
* @throws CannotMapTypeExceptionInterface
* @return InputObjectType
*/
public function mapClassToInputType(string $className, RecursiveTypeMapperInterface $recursiveTypeMapper): InputObjectType
{
@ -271,7 +260,6 @@ class TypeMapper implements TypeMapperInterface
* Returns a GraphQL type by name (can be either an input or output type).
*
* @param string $typeName The name of the GraphQL type
* @param RecursiveTypeMapperInterface $recursiveTypeMapper
* @throws CannotMapTypeExceptionInterface
* @throws \ReflectionException
* @return \GraphQL\Type\Definition\Type&(InputType|OutputType)
@ -308,7 +296,6 @@ class TypeMapper implements TypeMapperInterface
* Returns true if this type mapper can map the $typeName GraphQL name to a GraphQL type.
*
* @param string $typeName The name of the GraphQL type
* @return bool
*/
public function canMapNameToType(string $typeName): bool
{
@ -330,10 +317,6 @@ class TypeMapper implements TypeMapperInterface
/**
* Returns true if this type mapper can extend an existing type for the $className FQCN.
*
* @param string $className
* @param MutableObjectType $type
* @return bool
*/
public function canExtendTypeForClass(string $className, MutableObjectType $type, RecursiveTypeMapperInterface $recursiveTypeMapper): bool
{
@ -349,9 +332,6 @@ class TypeMapper implements TypeMapperInterface
/**
* Extends the existing GraphQL type that is mapped to $className.
*
* @param string $className
* @param MutableObjectType $type
* @param RecursiveTypeMapperInterface $recursiveTypeMapper
* @throws CannotMapTypeExceptionInterface
*/
public function extendTypeForClass(string $className, MutableObjectType $type, RecursiveTypeMapperInterface $recursiveTypeMapper): void
@ -373,10 +353,6 @@ class TypeMapper implements TypeMapperInterface
/**
* Returns true if this type mapper can extend an existing type for the $typeName GraphQL type.
*
* @param string $typeName
* @param MutableObjectType $type
* @return bool
*/
public function canExtendTypeForName(string $typeName, MutableObjectType $type, RecursiveTypeMapperInterface $recursiveTypeMapper): bool
{
@ -399,9 +375,6 @@ class TypeMapper implements TypeMapperInterface
/**
* Extends the existing GraphQL type that is mapped to the $typeName GraphQL type.
*
* @param string $typeName
* @param MutableObjectType $type
* @param RecursiveTypeMapperInterface $recursiveTypeMapper
* @throws CannotMapTypeExceptionInterface
*/
public function extendTypeForName(string $typeName, MutableObjectType $type, RecursiveTypeMapperInterface $recursiveTypeMapper): void
@ -805,7 +778,6 @@ class TypeMapper implements TypeMapperInterface
}
/**
* @param string $className
* @return null|array<string,string> An array of classes with the ExtendType annotation (key and value = FQCN)
*/
private function getExtendTypesFromCacheByObjectClass(string $className): ?array
@ -834,7 +806,6 @@ class TypeMapper implements TypeMapperInterface
}
/**
* @param string $graphqlTypeName
* @return null|array<string,string> An array of classes with the ExtendType annotation (key and value = FQCN)
*/
private function getExtendTypesFromCacheByGraphQLTypeName(string $graphqlTypeName): ?array

View File

@ -12,6 +12,8 @@ declare(strict_types=1);
namespace Hyperf\LoadBalancer;
use Swoole\Timer;
abstract class AbstractLoadBalancer implements LoadBalancerInterface
{
/**
@ -53,7 +55,7 @@ abstract class AbstractLoadBalancer implements LoadBalancerInterface
public function refresh(callable $callback, int $tickMs = 5000)
{
swoole_timer_tick($tickMs, function () use ($callback) {
Timer::tick($tickMs, function () use ($callback) {
$nodes = call($callback);
is_array($nodes) && $this->setNodes($nodes);
});

View File

@ -63,11 +63,11 @@ class LoggerFactory
public function get($name = 'hyperf', $group = 'default'): LoggerInterface
{
if (isset($this->loggers[$name]) && $this->loggers[$name] instanceof Logger) {
return $this->loggers[$name];
if (isset($this->loggers[$group][$name]) && $this->loggers[$group][$name] instanceof Logger) {
return $this->loggers[$group][$name];
}
return $this->loggers[$name] = $this->make($name, $group);
return $this->loggers[$group][$name] = $this->make($name, $group);
}
protected function getDefaultFormatterConfig($config)

View File

@ -79,6 +79,30 @@ class LoggerFactoryTest extends TestCase
$this->assertInstanceOf(TestHandler::class, $handlers[1]);
}
public function testHandlerGroupNotWorks()
{
$container = $this->mockContainer();
$factory = $container->get(LoggerFactory::class);
$logger = $factory->get('hyperf');
$this->assertInstanceOf(\Hyperf\Logger\Logger::class, $logger);
$reflectionClass = new ReflectionClass($logger);
$handlersProperty = $reflectionClass->getProperty('handlers');
$handlersProperty->setAccessible(true);
$handlers = $handlersProperty->getValue($logger);
$this->assertCount(1, $handlers);
$this->assertInstanceOf(StreamHandler::class, $handlers[0]);
$logger = $factory->get('hyperf', 'default-handlers');
$this->assertInstanceOf(\Hyperf\Logger\Logger::class, $logger);
$reflectionClass = new ReflectionClass($logger);
$handlersProperty = $reflectionClass->getProperty('handlers');
$handlersProperty->setAccessible(true);
$handlers = $handlersProperty->getValue($logger);
$this->assertCount(2, $handlers);
$this->assertInstanceOf(StreamHandler::class, $handlers[0]);
$this->assertInstanceOf(TestHandler::class, $handlers[1]);
}
private function mockContainer(): ContainerInterface
{
$container = Mockery::mock(ContainerInterface::class);

1
src/nats/.gitattributes vendored Normal file
View File

@ -0,0 +1 @@
/tests export-ignore

21
src/nats/LICENSE Normal file
View File

@ -0,0 +1,21 @@
The MIT License (MIT)
Copyright (c) Hyperf
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

59
src/nats/composer.json Normal file
View File

@ -0,0 +1,59 @@
{
"name": "hyperf/nats",
"description": "A nats library for Hyperf.",
"license": "MIT",
"keywords": [
"php",
"swoole",
"hyperf",
"nats"
],
"support": {
},
"require": {
"php": ">=7.2",
"hyperf/contract": "~1.1.0",
"hyperf/pool": "~1.1.0",
"hyperf/utils": "~1.1.0",
"ircmaxell/random-lib": "^1.2",
"psr/container": "^1.0"
},
"require-dev": {
"malukenho/docheader": "^0.1.6",
"mockery/mockery": "^1.0",
"phpunit/phpunit": "^7.0.0",
"friendsofphp/php-cs-fixer": "^2.9"
},
"suggest": {
},
"autoload": {
"psr-4": {
"Hyperf\\Nats\\": "src/"
},
"files": [
"src/Functions.php"
]
},
"autoload-dev": {
"psr-4": {
"HyperfTest\\Nats\\": "tests/"
}
},
"config": {
"sort-packages": true
},
"extra": {
"branch-alias": {
"dev-master": "1.1-dev"
},
"hyperf": {
"config": "Hyperf\\Nats\\ConfigProvider"
}
},
"bin": [
],
"scripts": {
"cs-fix": "php-cs-fixer fix $1",
"test": "phpunit --colors=always"
}
}

34
src/nats/publish/nats.php Normal file
View File

@ -0,0 +1,34 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
return [
'default' => [
'driver' => Hyperf\Nats\Driver\NatsDriver::class,
'encoder' => Hyperf\Nats\Encoders\JSONEncoder::class,
'timeout' => 10.0,
'options' => [
'host' => '127.0.0.1',
'port' => 4222,
'user' => 'nats',
'pass' => 'nats',
'lang' => 'php',
],
'pool' => [
'min_connections' => 1,
'max_connections' => 10,
'connect_timeout' => 10.0,
'wait_timeout' => 3.0,
'heartbeat' => -1,
'max_idle_time' => 60,
],
],
];

View File

@ -0,0 +1,110 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Nats;
use Psr\Container\ContainerInterface;
abstract class AbstractConsumer
{
/**
* @var string
*/
public $pool = 'default';
/**
* @var string
*/
protected $subject = '';
/**
* @var string
*/
protected $queue = '';
/**
* @var string
*/
protected $name = 'NatsConsumer';
/**
* @var int
*/
protected $nums = 1;
/**
* @var ContainerInterface
*/
protected $container;
public function __construct(ContainerInterface $container)
{
$this->container = $container;
}
abstract public function consume(Message $payload);
public function getSubject(): string
{
return $this->subject;
}
public function setSubject(string $subject): self
{
$this->subject = $subject;
return $this;
}
public function getQueue(): string
{
return $this->queue;
}
public function setQueue(string $queue): self
{
$this->queue = $queue;
return $this;
}
public function getName(): string
{
return $this->name;
}
public function setName(string $name): self
{
$this->name = $name;
return $this;
}
public function getNums(): int
{
return $this->nums;
}
public function setNums(int $nums): self
{
$this->nums = $nums;
return $this;
}
public function getPool(): string
{
return $this->pool;
}
public function setPool(string $pool): self
{
$this->pool = $pool;
return $this;
}
}

View File

@ -0,0 +1,47 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Nats\Annotation;
use Hyperf\Di\Annotation\AbstractAnnotation;
/**
* @Annotation
* @Target({"CLASS"})
*/
class Consumer extends AbstractAnnotation
{
/**
* @var string
*/
public $subject = '';
/**
* @var string
*/
public $queue = '';
/**
* @var string
*/
public $name = '';
/**
* @var int
*/
public $nums = 1;
/**
* @var string
*/
public $pool = '';
}

View File

@ -0,0 +1,47 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Nats;
use Hyperf\Nats\Driver\DriverFactory;
use Hyperf\Nats\Driver\DriverInterface;
use Psr\Container\ContainerInterface;
class ConfigProvider
{
public function __invoke(): array
{
return [
'dependencies' => [
DriverInterface::class => function (ContainerInterface $container) {
$factory = $container->get(DriverFactory::class);
return $factory->get('default');
},
],
'annotations' => [
'scan' => [
'paths' => [
__DIR__,
],
],
],
'publish' => [
[
'id' => 'config',
'description' => 'The config for amqp.',
'source' => __DIR__ . '/../publish/nats.php',
'destination' => BASE_PATH . '/config/autoload/nats.php',
],
],
];
}
}

612
src/nats/src/Connection.php Normal file
View File

@ -0,0 +1,612 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Nats;
use RandomLib\Factory;
use RandomLib\Generator;
/**
* Connection Class.
*
* Handles the connection to a NATS server or cluster of servers.
*/
class Connection
{
/**
* Show DEBUG info?
*
* @var bool if debug is enabled
*/
private $debug = false;
/**
* Number of PINGs.
*
* @var int number of pings
*/
private $pings = 0;
/**
* Chunk size in bytes to use when reading an stream of data.
*
* @var int size of chunk
*/
private $chunkSize = 1500;
/**
* Number of messages published.
*
* @var int number of messages
*/
private $pubs = 0;
/**
* Number of reconnects to the server.
*
* @var int Number of reconnects
*/
private $reconnects = 0;
/**
* List of available subscriptions.
*
* @var array list of subscriptions
*/
private $subscriptions = [];
/**
* Connection options object.
*
* @var null|ConnectionOptions
*/
private $options;
/**
* Connection timeout.
*
* @var float
*/
private $timeout;
/**
* Stream File Pointer.
*
* @var mixed Socket file pointer
*/
private $streamSocket;
/**
* Generator object.
*
* @var Generator|Php71RandomGenerator
*/
private $randomGenerator;
/**
* Server information.
*
* @var mixed
*/
private $serverInfo;
/**
* Constructor.
*
* @param ConnectionOptions $options connection options object
*/
public function __construct(ConnectionOptions $options = null)
{
$this->pings = 0;
$this->pubs = 0;
$this->subscriptions = [];
$this->options = $options;
if (version_compare(phpversion(), '7.0', '>') === true) {
$this->randomGenerator = new Php71RandomGenerator();
} else {
$randomFactory = new Factory();
$this->randomGenerator = $randomFactory->getLowStrengthGenerator();
}
if ($options === null) {
$this->options = new ConnectionOptions();
}
}
/**
* Enable or disable debug mode.
*
* @param bool $debug if debug is enabled
*/
public function setDebug($debug)
{
$this->debug = $debug;
}
/**
* Return the number of pings.
*
* @return int Number of pings
*/
public function pingsCount()
{
return $this->pings;
}
/**
* Return the number of messages published.
*
* @return int number of messages published
*/
public function pubsCount()
{
return $this->pubs;
}
/**
* Return the number of reconnects to the server.
*
* @return int number of reconnects
*/
public function reconnectsCount()
{
return $this->reconnects;
}
/**
* Return the number of subscriptions available.
*
* @return int number of subscription
*/
public function subscriptionsCount()
{
return count($this->subscriptions);
}
/**
* Return subscriptions list.
*
* @return array list of subscription ids
*/
public function getSubscriptions()
{
return array_keys($this->subscriptions);
}
/**
* Sets the chunck size in bytes to be processed when reading.
*
* @param int $chunkSize set byte chunk len to read when reading from wire
*/
public function setChunkSize($chunkSize)
{
$this->chunkSize = $chunkSize;
}
/**
* Set Stream Timeout.
*
* @param float $seconds before timeout on stream
*
* @return bool
*/
public function setStreamTimeout($seconds)
{
if ($this->isConnected() === true) {
if (is_numeric($seconds) === true) {
try {
$timeout = (float) number_format($seconds, 3);
$seconds = floor($timeout);
$microseconds = (($timeout - $seconds) * 1000);
return stream_set_timeout($this->streamSocket, $seconds, $microseconds);
} catch (\Exception $e) {
return false;
}
}
}
return false;
}
/**
* Returns an stream socket for this connection.
*
* @return resource
*/
public function getStreamSocket()
{
return $this->streamSocket;
}
/**
* Checks if the client is connected to a server.
*
* @return bool
*/
public function isConnected()
{
return isset($this->streamSocket);
}
/**
* Returns current connected server ID.
*
* @return string server ID
*/
public function connectedServerID()
{
return $this->serverInfo->getServerID();
}
/**
* Connect to server.
*
* @param float $timeout number of seconds until the connect() system call should timeout
*
* @throws \Throwable exception raised if connection fails
*/
public function connect($timeout = null)
{
if ($timeout === null) {
$timeout = intval(ini_get('default_socket_timeout'));
}
$this->timeout = $timeout;
$this->streamSocket = $this->getStream($this->options->getAddress(), $timeout);
$this->setStreamTimeout($timeout);
$msg = 'CONNECT ' . $this->options;
$this->send($msg);
$connectResponse = $this->receive();
if ($this->isErrorResponse($connectResponse) === true) {
throw Exception::forFailedConnection($connectResponse);
}
$this->processServerInfo($connectResponse);
$this->ping();
$pingResponse = $this->receive();
if ($this->isErrorResponse($pingResponse) === true) {
throw Exception::forFailedPing($pingResponse);
}
}
/**
* Sends PING message.
*/
public function ping()
{
$msg = 'PING';
$this->send($msg);
++$this->pings;
}
/**
* Request does a request and executes a callback with the response.
*
* @param string $subject message topic
* @param string $payload message data
* @param \Closure $callback closure to be executed as callback
*/
public function request($subject, $payload, \Closure $callback)
{
$inbox = uniqid('_INBOX.');
$sid = $this->subscribe(
$inbox,
$callback
);
$this->unsubscribe($sid, 1);
$this->publish($subject, $payload, $inbox);
$this->wait(1);
}
/**
* Subscribes to an specific event given a subject.
*
* @param string $subject message topic
* @param \Closure $callback closure to be executed as callback
*
* @return string
*/
public function subscribe($subject, \Closure $callback)
{
$sid = $this->randomGenerator->generateString(16);
$msg = 'SUB ' . $subject . ' ' . $sid;
$this->send($msg);
$this->subscriptions[$sid] = $callback;
return $sid;
}
/**
* Subscribes to an specific event given a subject and a queue.
*
* @param string $subject message topic
* @param string $queue queue name
* @param \Closure $callback closure to be executed as callback
*
* @return string
*/
public function queueSubscribe($subject, $queue, \Closure $callback)
{
$sid = $this->randomGenerator->generateString(16);
$msg = 'SUB ' . $subject . ' ' . $queue . ' ' . $sid;
$this->send($msg);
$this->subscriptions[$sid] = $callback;
return $sid;
}
/**
* Unsubscribe from a event given a subject.
*
* @param string $sid subscription ID
* @param int $quantity quantity of messages
*/
public function unsubscribe($sid, $quantity = null)
{
$msg = 'UNSUB ' . $sid;
if ($quantity !== null) {
$msg = $msg . ' ' . $quantity;
}
$this->send($msg);
if ($quantity === null) {
unset($this->subscriptions[$sid]);
}
}
/**
* Publish publishes the data argument to the given subject.
*
* @param string $subject message topic
* @param string $payload message data
* @param string $inbox message inbox
*
* @throws Exception if subscription not found
*/
public function publish($subject, $payload = null, $inbox = null)
{
$msg = 'PUB ' . $subject;
if ($inbox !== null) {
$msg = $msg . ' ' . $inbox;
}
$msg = $msg . ' ' . strlen($payload);
$this->send($msg . "\r\n" . $payload);
++$this->pubs;
}
/**
* Waits for messages.
*
* @param int $quantity number of messages to wait for
*
* @return null|Connection $connection Connection object
*/
public function wait($quantity = 0)
{
$count = 0;
$info = stream_get_meta_data($this->streamSocket);
while (is_resource($this->streamSocket) === true && feof($this->streamSocket) === false && empty($info['timed_out']) === true) {
$line = $this->receive();
if ($line === false) {
return null;
}
if (strpos($line, 'PING') === 0) {
$this->handlePING();
}
if (strpos($line, 'MSG') === 0) {
++$count;
$this->handleMSG($line);
if (($quantity !== 0) && ($count >= $quantity)) {
return $this;
}
}
$info = stream_get_meta_data($this->streamSocket);
}
$this->close();
return $this;
}
/**
* Reconnects to the server.
*/
public function reconnect()
{
++$this->reconnects;
$this->close();
$this->connect($this->timeout);
}
/**
* Close will close the connection to the server.
*/
public function close()
{
if ($this->streamSocket === null) {
return;
}
fclose($this->streamSocket);
$this->streamSocket = null;
}
/**
* Indicates whether $response is an error response.
*
* @param string $response the Nats Server response
*
* @return bool
*/
private function isErrorResponse($response)
{
return substr($response, 0, 4) === '-ERR';
}
/**
* Returns an stream socket to the desired server.
*
* @param string $address server url string
* @param float $timeout number of seconds until the connect() system call should timeout
*
* @throws \Exception exception raised if connection fails
* @return resource
*/
private function getStream($address, $timeout)
{
$errno = null;
$errstr = null;
$fp = stream_socket_client($address, $errno, $errstr, $timeout, STREAM_CLIENT_CONNECT);
if ($fp === false) {
throw Exception::forStreamSocketClientError($errstr, $errno);
}
$timeout = (float) number_format($timeout, 3);
$seconds = floor($timeout);
$microseconds = (($timeout - $seconds) * 1000);
stream_set_timeout($fp, $seconds, $microseconds);
return $fp;
}
/**
* Process information returned by the server after connection.
*
* @param string $connectionResponse INFO message
*/
private function processServerInfo($connectionResponse)
{
$this->serverInfo = new ServerInfo($connectionResponse);
}
/**
* Sends data thought the stream.
*
* @param string $payload message data
*
* @throws \Exception raises if fails sending data
*/
private function send($payload)
{
$msg = $payload . "\r\n";
$len = strlen($msg);
while (true) {
$written = @fwrite($this->streamSocket, $msg);
if ($written === false) {
throw new \Exception('Error sending data');
}
if ($written === 0) {
throw new \Exception('Broken pipe or closed connection');
}
$len = ($len - $written);
if ($len > 0) {
$msg = substr($msg, (0 - $len));
} else {
break;
}
}
if ($this->debug === true) {
printf('>>>> %s', $msg);
}
}
/**
* Receives a message thought the stream.
*
* @param int $len number of bytes to receive
*
* @return string
*/
private function receive(int $len = 0)
{
if ($len > 0) {
$chunkSize = $this->chunkSize;
$line = null;
$receivedBytes = 0;
while ($receivedBytes < $len) {
$bytesLeft = ($len - $receivedBytes);
if ($bytesLeft < $this->chunkSize) {
$chunkSize = $bytesLeft;
}
$readChunk = fread($this->streamSocket, $chunkSize);
$receivedBytes += strlen($readChunk);
$line .= $readChunk;
}
} else {
$line = fgets($this->streamSocket);
}
if ($this->debug === true) {
printf('<<<< %s\r\n', $line);
}
return $line;
}
/**
* Handles PING command.
*/
private function handlePING()
{
$this->send('PONG');
}
/**
* Handles MSG command.
*
* @param string $line message command from Nats
*
* @throws Exception if subscription not found
* @codeCoverageIgnore
*/
private function handleMSG($line)
{
$parts = explode(' ', $line);
$subject = null;
$length = trim($parts[3]);
$sid = $parts[2];
if (count($parts) === 5) {
$length = trim($parts[4]);
$subject = $parts[3];
} elseif (count($parts) === 4) {
$length = trim($parts[3]);
$subject = $parts[1];
}
$payload = $this->receive((int) $length);
$msg = new Message($subject, $payload, $sid, $this);
if (isset($this->subscriptions[$sid]) === false) {
throw Exception::forSubscriptionNotFound($sid);
}
$func = $this->subscriptions[$sid];
if (is_callable($func) === true) {
$func($msg);
} else {
throw Exception::forSubscriptionCallbackInvalid($sid);
}
}
}

View File

@ -0,0 +1,448 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Nats;
use Traversable;
/**
* ConnectionOptions Class.
*/
class ConnectionOptions
{
/**
* Hostname or IP to connect.
*
* @var string
*/
private $host = 'localhost';
/**
* Port number to connect.
*
* @var int
*/
private $port = 4222;
/**
* Username to connect.
*
* @var string
*/
private $user;
/**
* Password to connect.
*
* @var string
*/
private $pass;
/**
* Token to connect.
*
* @var string
*/
private $token;
/**
* Language of this client.
*
* @var string
*/
private $lang = 'php';
/**
* Version of this client.
*
* @var string
*/
private $version = '0.8.2';
/**
* If verbose mode is enabled.
*
* @var bool
*/
private $verbose = false;
/**
* If pedantic mode is enabled.
*
* @var bool
*/
private $pedantic = false;
/**
* If reconnect mode is enabled.
*
* @var bool
*/
private $reconnect = true;
/**
* Allows to define parameters which can be set by passing them to the class constructor.
*
* @var array
*/
private $configurable = [
'host',
'port',
'user',
'pass',
'token',
'lang',
'version',
'verbose',
'pedantic',
'reconnect',
];
/**
* ConnectionOptions constructor.
*
* <code>
* use Nats\ConnectionOptions;
*
* $options = new ConnectionOptions([
* 'host' => '127.0.0.1',
* 'port' => 4222,
* 'user' => 'nats',
* 'pass' => 'nats',
* 'lang' => 'php',
* // ...
* ]);
* </code>
*
* @param array|Traversable $options the connection options
*/
public function __construct($options = null)
{
if (empty($options) === false) {
$this->initialize($options);
}
}
/**
* Get the options JSON string.
*
* @return string
*/
public function __toString()
{
$a = [
'lang' => $this->lang,
'version' => $this->version,
'verbose' => $this->verbose,
'pedantic' => $this->pedantic,
];
if (empty($this->user) === false) {
$a['user'] = $this->user;
}
if (empty($this->pass) === false) {
$a['pass'] = $this->pass;
}
if (empty($this->token) === false) {
$a['auth_token'] = $this->token;
}
return json_encode($a);
}
/**
* Get the URI for a server.
*
* @return string
*/
public function getAddress()
{
return 'tcp://' . $this->host . ':' . $this->port;
}
/**
* Get host.
*
* @return string
*/
public function getHost()
{
return $this->host;
}
/**
* Set host.
*
* @param string $host host
*
* @return $this
*/
public function setHost($host)
{
$this->host = $host;
return $this;
}
/**
* Get port.
*
* @return int
*/
public function getPort()
{
return $this->port;
}
/**
* Set port.
*
* @param int $port port
*
* @return $this
*/
public function setPort($port)
{
$this->port = $port;
return $this;
}
/**
* Get user.
*
* @return string
*/
public function getUser()
{
return $this->user;
}
/**
* Set user.
*
* @param string $user user
*
* @return $this
*/
public function setUser($user)
{
$this->user = $user;
return $this;
}
/**
* Get password.
*
* @return string
*/
public function getPass()
{
return $this->pass;
}
/**
* Set password.
*
* @param string $pass password
*
* @return $this
*/
public function setPass($pass)
{
$this->pass = $pass;
return $this;
}
/**
* Get token.
*
* @return string
*/
public function getToken()
{
return $this->token;
}
/**
* Set token.
*
* @param string $token token
*
* @return $this
*/
public function setToken($token)
{
$this->token = $token;
return $this;
}
/**
* Get language.
*
* @return string
*/
public function getLang()
{
return $this->lang;
}
/**
* Set language.
*
* @param string $lang language
*
* @return $this
*/
public function setLang($lang)
{
$this->lang = $lang;
return $this;
}
/**
* Get version.
*
* @return string
*/
public function getVersion()
{
return $this->version;
}
/**
* Set version.
*
* @param string $version version number
*
* @return $this
*/
public function setVersion($version)
{
$this->version = $version;
return $this;
}
/**
* Get verbose.
*
* @return bool
*/
public function isVerbose()
{
return $this->verbose;
}
/**
* Set verbose.
*
* @param bool $verbose verbose flag
*
* @return $this
*/
public function setVerbose($verbose)
{
$this->verbose = $verbose;
return $this;
}
/**
* Get pedantic.
*
* @return bool
*/
public function isPedantic()
{
return $this->pedantic;
}
/**
* Set pedantic.
*
* @param bool $pedantic pedantic flag
*
* @return $this
*/
public function setPedantic($pedantic)
{
$this->pedantic = $pedantic;
return $this;
}
/**
* Get reconnect.
*
* @return bool
*/
public function isReconnect()
{
return $this->reconnect;
}
/**
* Set reconnect.
*
* @param bool $reconnect reconnect flag
*
* @return $this
*/
public function setReconnect($reconnect)
{
$this->reconnect = $reconnect;
return $this;
}
/**
* Set the connection options.
*
* @param array|Traversable $options the connection options
*/
public function setConnectionOptions($options)
{
$this->initialize($options);
}
/**
* Initialize the parameters.
*
* @param array|Traversable $options the connection options
*
* @throws Exception when $options are an invalid type
*/
protected function initialize($options)
{
if (is_array($options) === false && ($options instanceof Traversable) === false) {
throw new Exception('The $options argument must be either an array or Traversable');
}
foreach ($options as $key => $value) {
if (in_array($key, $this->configurable, true) === false) {
continue;
}
$method = 'set' . ucfirst($key);
if (method_exists($this, $method) === true) {
$this->{$method}($value);
}
}
}
}

View File

@ -0,0 +1,93 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Nats;
use Hyperf\Di\Annotation\AnnotationCollector;
use Hyperf\Nats\Annotation\Consumer as ConsumerAnnotation;
use Hyperf\Nats\Driver\DriverFactory;
use Hyperf\Process\AbstractProcess;
use Hyperf\Process\ProcessManager;
use Psr\Container\ContainerInterface;
class ConsumerManager
{
/**
* @var ContainerInterface
*/
private $container;
public function __construct(ContainerInterface $container)
{
$this->container = $container;
}
public function run()
{
$classes = AnnotationCollector::getClassByAnnotation(ConsumerAnnotation::class);
/**
* @var string
* @var ConsumerAnnotation $annotation
*/
foreach ($classes as $class => $annotation) {
$instance = make($class);
if (! $instance instanceof AbstractConsumer) {
continue;
}
$annotation->subject && $instance->setSubject($annotation->subject);
$annotation->queue && $instance->setQueue($annotation->queue);
$annotation->name && $instance->setName($annotation->name);
$annotation->pool && $instance->setPool($annotation->pool);
$nums = $annotation->nums;
$process = $this->createProcess($instance);
$process->nums = (int) $nums;
$process->name = $instance->getName() . '-' . $instance->getSubject();
ProcessManager::register($process);
}
}
private function createProcess(AbstractConsumer $consumer): AbstractProcess
{
return new class($this->container, $consumer) extends AbstractProcess {
/**
* @var AbstractConsumer
*/
private $consumer;
/**
* @var Driver\DriverInterface
*/
private $subscriber;
public function __construct(ContainerInterface $container, AbstractConsumer $consumer)
{
parent::__construct($container);
$this->consumer = $consumer;
$pool = $this->consumer->getPool();
$this->subscriber = $this->container->get(DriverFactory::class)->get($pool);
}
public function handle(): void
{
$this->subscriber->subscribe(
$this->consumer->getSubject(),
$this->consumer->getQueue(),
function ($data) {
$this->consumer->consume($data);
}
);
}
};
}
}

View File

@ -0,0 +1,18 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Nats\Contract;
interface PublishInterface
{
public function publish(string $subject, $payload = null, $inbox = null);
}

View File

@ -0,0 +1,23 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Nats\Contract;
use Closure;
use Hyperf\Nats\Message;
interface RequestInterface
{
public function request(string $subject, $payload, Closure $callback);
public function requestSync(string $subject, $payload): Message;
}

View File

@ -0,0 +1,20 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Nats\Contract;
use Closure;
interface SubscribeInterface
{
public function subscribe(string $subject, string $queue, Closure $callback): void;
}

View File

@ -0,0 +1,48 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Nats\Driver;
use Hyperf\Contract\StdoutLoggerInterface;
use Psr\Container\ContainerInterface;
abstract class AbstractDriver implements DriverInterface
{
/**
* @var ContainerInterface
*/
protected $container;
/**
* @var string
*/
protected $name;
/**
* @var array
*/
protected $config;
/**
* @var StdoutLoggerInterface
*/
protected $logger;
public function __construct(ContainerInterface $container, string $name, array $config)
{
$this->container = $container;
$this->name = $name;
$this->config = $config;
$this->logger = $container->get(StdoutLoggerInterface::class);
}
}

View File

@ -0,0 +1,58 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Nats\Driver;
use Hyperf\Contract\ConfigInterface;
use Hyperf\Nats\Exception\ConfigNotFoundException;
use Psr\Container\ContainerInterface;
class DriverFactory
{
/**
* @var ContainerInterface
*/
protected $container;
/**
* @var array
*/
protected $config;
/**
* @var DriverInterface[]
*/
protected $drivers = [];
public function __construct(ContainerInterface $container)
{
$this->container = $container;
$this->config = $container->get(ConfigInterface::class)->get('nats', []);
}
public function get($pool = 'default'): DriverInterface
{
if (isset($this->drivers[$pool]) && $this->drivers[$pool] instanceof DriverInterface) {
return $this->drivers[$pool];
}
$config = $this->config[$pool] ?? null;
if (empty($config)) {
throw new ConfigNotFoundException(sprintf('The config of %s is not found.', $pool));
}
return $this->drivers[$pool] = make($config['driver'], [
'name' => $pool,
'config' => $config,
]);
}
}

View File

@ -0,0 +1,21 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Nats\Driver;
use Hyperf\Nats\Contract\PublishInterface;
use Hyperf\Nats\Contract\RequestInterface;
use Hyperf\Nats\Contract\SubscribeInterface;
interface DriverInterface extends PublishInterface, RequestInterface, SubscribeInterface
{
}

View File

@ -0,0 +1,117 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Nats\Driver;
use Closure;
use Hyperf\Nats\Connection as NatsConnection;
use Hyperf\Nats\ConnectionOptions;
use Hyperf\Nats\EncodedConnection;
use Hyperf\Nats\Encoders\JSONEncoder;
use Hyperf\Nats\Exception\TimeoutException;
use Hyperf\Nats\Message;
use Hyperf\Pool\SimplePool\Connection;
use Hyperf\Pool\SimplePool\PoolFactory;
use Psr\Container\ContainerInterface;
use Swoole\Coroutine\Channel;
class NatsDriver extends AbstractDriver
{
/**
* @var \Hyperf\Pool\SimplePool\Pool
*/
protected $pool;
public function __construct(ContainerInterface $container, string $name, array $config)
{
parent::__construct($container, $name, $config);
$factory = $this->container->get(PoolFactory::class);
$poolConfig = $config['pool'] ?? [];
$this->pool = $factory->get('nats' . $this->name, function () use ($config) {
$option = new ConnectionOptions($config['options'] ?? []);
$encoder = make($config['encoder'] ?? JSONEncoder::class);
$timeout = $config['timeout'] ?? null;
$conn = make(EncodedConnection::class, [$option, $encoder]);
$conn->connect($timeout);
return $conn;
}, $poolConfig);
}
public function publish(string $subject, $payload = null, $inbox = null)
{
try {
/** @var Connection $connection */
$connection = $this->pool->get();
/** @var NatsConnection $client */
$client = $connection->getConnection();
$client->publish($subject, $payload, $inbox);
} finally {
$connection && $connection->release();
}
}
public function request(string $subject, $payload, Closure $callback)
{
try {
/** @var Connection $connection */
$connection = $this->pool->get();
/** @var NatsConnection $client */
$client = $connection->getConnection();
$client->request($subject, $payload, $callback);
} finally {
$connection && $connection->release();
}
}
public function requestSync(string $subject, $payload): Message
{
try {
/** @var Connection $connection */
$connection = $this->pool->get();
/** @var NatsConnection $client */
$client = $connection->getConnection();
$channel = new Channel(1);
$timeout = floatval($this->config['timeout'] ?? 1.0);
$client->request($subject, $payload, function (Message $message) use ($channel) {
$channel->push($message);
});
$message = $channel->pop($timeout);
if (! $message instanceof Message) {
throw new TimeoutException('Request timeout.');
}
return $message;
} finally {
$connection && $connection->release();
}
}
public function subscribe(string $subject, string $queue, Closure $callback): void
{
try {
/** @var Connection $connection */
$connection = $this->pool->get();
/** @var NatsConnection $client */
$client = $connection->getConnection();
if (empty($queue)) {
$client->subscribe($subject, $callback);
} else {
$client->queueSubscribe($subject, $queue, $callback);
}
$client->wait();
} finally {
$connection && $connection->release();
}
}
}

View File

@ -0,0 +1,86 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Nats;
use Hyperf\Nats\Encoders\Encoder;
/**
* Class EncodedConnection.
*/
class EncodedConnection extends Connection
{
/**
* Encoder for this connection.
*
* @var null|Encoder
*/
private $encoder;
/**
* EncodedConnection constructor.
*
* @param ConnectionOptions $options connection options object
* @param null|Encoder $encoder encoder to use with the payload
*/
public function __construct(ConnectionOptions $options = null, Encoder $encoder = null)
{
$this->encoder = $encoder;
parent::__construct($options);
}
/**
* Publish publishes the data argument to the given subject.
*
* @param string $subject message topic
* @param string $payload message data
* @param string $inbox message inbox
*/
public function publish($subject, $payload = null, $inbox = null)
{
$payload = $this->encoder->encode($payload);
parent::publish($subject, $payload, $inbox);
}
/**
* Subscribes to an specific event given a subject.
*
* @param string $subject message topic
* @param \Closure $callback closure to be executed as callback
*
* @return string
*/
public function subscribe($subject, \Closure $callback)
{
$c = function ($message) use ($callback) {
$message->setBody($this->encoder->decode($message->getBody()));
$callback($message);
};
return parent::subscribe($subject, $c);
}
/**
* Subscribes to an specific event given a subject and a queue.
*
* @param string $subject message topic
* @param string $queue queue name
* @param \Closure $callback closure to be executed as callback
*/
public function queueSubscribe($subject, $queue, \Closure $callback)
{
$c = function ($message) use ($callback) {
$message->setBody($this->encoder->decode($message->getBody()));
$callback($message);
};
parent::queueSubscribe($subject, $queue, $c);
}
}

View File

@ -0,0 +1,37 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Nats\Encoders;
/**
* Interface Encoder.
*/
interface Encoder
{
/**
* Encodes a message.
*
* @param string $payload message to decode
*
* @return mixed
*/
public function encode($payload);
/**
* Decodes a message.
*
* @param string $payload message to decode
*
* @return mixed
*/
public function decode($payload);
}

View File

@ -0,0 +1,45 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Nats\Encoders;
/**
* Class JSONEncoder.
*
* Encodes and decodes messages in JSON format.
*/
class JSONEncoder implements Encoder
{
/**
* Encodes a message to JSON.
*
* @param string $payload message to decode
*
* @return mixed
*/
public function encode($payload)
{
return json_encode($payload);
}
/**
* Decodes a message from JSON.
*
* @param string $payload message to decode
*
* @return mixed
*/
public function decode($payload)
{
return json_decode($payload, true);
}
}

View File

@ -0,0 +1,45 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Nats\Encoders;
/**
* Class PHPEncoder.
*
* Encodes and decodes messages in PHP format.
*/
class PHPEncoder implements Encoder
{
/**
* Encodes a message to PHP.
*
* @param string $payload message to decode
*
* @return mixed
*/
public function encode($payload)
{
return serialize($payload);
}
/**
* Decodes a message from PHP.
*
* @param string $payload message to decode
*
* @return mixed
*/
public function decode($payload)
{
return unserialize($payload);
}
}

View File

@ -0,0 +1,45 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Nats\Encoders;
/**
* Class YAMLEncoder.
*
* Encodes and decodes messages in YAML format.
*/
class YAMLEncoder implements Encoder
{
/**
* Encodes a message to YAML.
*
* @param string $payload message to decode
*
* @return mixed
*/
public function encode($payload)
{
return yaml_emit($payload);
}
/**
* Decodes a message from YAML.
*
* @param string $payload message to decode
*
* @return mixed
*/
public function decode($payload)
{
return yaml_parse($payload);
}
}

View File

@ -0,0 +1,70 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Nats;
/**
* Class Exception.
*/
class Exception extends \Exception
{
/**
* Creates an Exception for a failed connection.
*
* @param string $response the failed error response
*/
public static function forFailedConnection($response)
{
return new static(sprintf('Failed to connect: %s', $response));
}
/**
* Creates an Exception for a failed PING response.
*
* @param string $response the failed PING response
*/
public static function forFailedPing($response)
{
return new static(sprintf('Failed to ping: %s', $response));
}
/**
* Creates an Exception for an invalid Subscription Identifier (sid).
*
* @param string $subscription the Subscription Identifier (sid)
*/
public static function forSubscriptionNotFound($subscription)
{
return new static(sprintf('Subscription not found: %s', $subscription));
}
/**
* Creates an Exception for an invalid Subscription Identifier (sid) callback.
*
* @param string $subscription the Subscription Identifier (sid)
*/
public static function forSubscriptionCallbackInvalid($subscription)
{
return new static(sprintf('Subscription callback is invalid: %s', $subscription));
}
/**
* Creates an Exception for the failed creation of a Stream Socket Client.
*
* @param string $message the system level error message
* @param int $code the system level error code
*/
public static function forStreamSocketClientError($message, $code)
{
return new static(sprintf('A Stream Socket Client could not be created: (%d) %s', $code, $message), $code);
}
}

View File

@ -0,0 +1,19 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Nats\Exception;
use RuntimeException;
class ConfigNotFoundException extends RuntimeException
{
}

View File

@ -0,0 +1,19 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Nats\Exception;
use RuntimeException;
class DriverException extends RuntimeException
{
}

View File

@ -0,0 +1,19 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Nats\Exception;
use RuntimeException;
class TimeoutException extends RuntimeException
{
}

View File

@ -0,0 +1,18 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Nats;
function stream_set_timeout($fp, $seconds, $microseconds)
{
return \stream_set_timeout($fp, (int) $seconds, (int) $microseconds);
}

View File

@ -0,0 +1,57 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Nats\Listener;
use Hyperf\Event\Annotation\Listener;
use Hyperf\Event\Contract\ListenerInterface;
use Hyperf\Framework\Event\BeforeMainServerStart;
use Hyperf\Nats\ConsumerManager;
use Psr\Container\ContainerInterface;
/**
* Must handle the event before `Hyperf\Process\Listener\BootProcessListener`.
* @Listener(priority=99)
*/
class BeforeMainServerStartListener implements ListenerInterface
{
/**
* @var ContainerInterface
*/
private $container;
public function __construct(ContainerInterface $container)
{
$this->container = $container;
}
/**
* @return string[] returns the events that you want to listen
*/
public function listen(): array
{
return [
BeforeMainServerStart::class,
];
}
/**
* Handle the Event when the event is triggered, all listeners will
* complete before the event is returned to the EventDispatcher.
*/
public function process(object $event)
{
// Init the consumer process.
$consumerManager = $this->container->get(ConsumerManager::class);
$consumerManager->run();
}
}

179
src/nats/src/Message.php Normal file
View File

@ -0,0 +1,179 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Nats;
/**
* Message Class.
*/
class Message
{
/**
* Message Body.
*
* @var string
*/
public $body;
/**
* Message Subject.
*
* @var string
*/
private $subject;
/**
* Message Ssid.
*
* @var string
*/
private $sid;
/**
* Message related connection.
*
* @var Connection
*/
private $conn;
/**
* Message constructor.
*
* @param string $subject message subject
* @param string $body message body
* @param string $sid message Sid
* @param Connection $conn message Connection
*/
public function __construct($subject, $body, $sid, Connection $conn)
{
$this->setSubject($subject);
$this->setBody($body);
$this->setSid($sid);
$this->setConn($conn);
}
/**
* String representation of a message.
*
* @return string
*/
public function __toString()
{
return $this->getBody();
}
/**
* Set subject.
*
* @param string $subject subject
*
* @return $this
*/
public function setSubject($subject)
{
$this->subject = $subject;
return $this;
}
/**
* Get subject.
*
* @return string
*/
public function getSubject()
{
return $this->subject;
}
/**
* Set body.
*
* @param string $body body
*
* @return $this
*/
public function setBody($body)
{
$this->body = $body;
return $this;
}
/**
* Get body.
*
* @return string
*/
public function getBody()
{
return $this->body;
}
/**
* Set Ssid.
*
* @param string $sid ssid
*
* @return $this
*/
public function setSid($sid)
{
$this->sid = $sid;
return $this;
}
/**
* Get Ssid.
*
* @return string
*/
public function getSid()
{
return $this->sid;
}
/**
* Set Conn.
*
* @param Connection $conn connection
*
* @return $this
*/
public function setConn(Connection $conn)
{
$this->conn = $conn;
return $this;
}
/**
* Get Conn.
*
* @return Connection
*/
public function getConn()
{
return $this->conn;
}
/**
* Allows you reply the message with a specific body.
*
* @param string $body body to be set
*/
public function reply($body)
{
$this->conn->publish(
$this->subject,
$body
);
}
}

View File

@ -0,0 +1,31 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Nats;
/**
* Class Php71RandomGenerator.
*/
class Php71RandomGenerator
{
/**
* A simple wrapper on random_bytes.
*
* @param int $len length of the string
*
* @return string random string
*/
public function generateString($len)
{
return bin2hex(random_bytes($len));
}
}

341
src/nats/src/ServerInfo.php Normal file
View File

@ -0,0 +1,341 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Nats;
/**
* Class ServerInfo.
*/
class ServerInfo
{
/**
* Server unique ID.
*
* @var string
*/
private $serverID;
/**
* Server hostname.
*
* @var string
*/
private $host;
/**
* Server port.
*
* @var int
*/
private $port;
/**
* Server version number.
*
* @var string
*/
private $version;
/**
* Server Golang version.
*
* @var string
*/
private $goVersion;
/**
* Is authorization required?
*
* @var bool
*/
private $authRequired;
/**
* Is TLS required?
*
* @var bool
*/
private $TLSRequired;
/**
* Should TLS be verified?
*
* @var bool
*/
private $TLSVerify;
/**
* Is SSL required?
*
* @var bool
*/
private $SSLRequired;
/**
* Max payload size.
*
* @var int
*/
private $maxPayload;
/**
* Connection URL list.
*
* @var array
*/
private $connectURLs;
/**
* ServerInfo constructor.
*
* @param string $connectionResponse connection response Message
*/
public function __construct($connectionResponse)
{
$parts = explode(' ', $connectionResponse);
$data = json_decode($parts[1], true);
$this->setServerID($data['server_id']);
$this->setHost($data['host']);
$this->setPort($data['port']);
$this->setVersion($data['version']);
$this->setGoVersion($data['go']);
$this->setAuthRequired($data['auth_required'] ?? false);
$this->setTLSRequired($data['tls_required'] ?? false);
$this->setTLSVerify($data['tls_verify'] ?? false);
$this->setMaxPayload($data['max_payload']);
if (version_compare($data['version'], '1.1.0') === -1) {
$this->setSSLRequired($data['ssl_required']);
}
}
/**
* Get the server ID.
*
* @return string server ID
*/
public function getServerID()
{
return $this->serverID;
}
/**
* Set the server ID.
*
* @param string $serverID server ID
*/
public function setServerID($serverID)
{
$this->serverID = $serverID;
}
/**
* Get the server host name or ip.
*
* @return string server host
*/
public function getHost()
{
return $this->host;
}
/**
* Set the server host name or ip.
*
* @param string $host server host
*/
public function setHost($host)
{
$this->host = $host;
}
/**
* Get server port number.
*
* @return int server port number
*/
public function getPort()
{
return $this->port;
}
/**
* Set server port number.
*
* @param int $port server port number
*/
public function setPort($port)
{
$this->port = $port;
}
/**
* Get server version number.
*
* @return string server version number
*/
public function getVersion()
{
return $this->version;
}
/**
* Set server version number.
*
* @param string $version server version number
*/
public function setVersion($version)
{
$this->version = $version;
}
/**
* Get the golang version number.
*
* @return string go version number
*/
public function getGoVersion()
{
return $this->goVersion;
}
/**
* Set the golang version number.
*
* @param string $goVersion go version number
*/
public function setGoVersion($goVersion)
{
$this->goVersion = $goVersion;
}
/**
* Check if server requires authorization.
*
* @return bool if auth is required
*/
public function isAuthRequired()
{
return $this->authRequired;
}
/**
* Set if the server requires authorization.
*
* @param bool $authRequired if auth is required
*/
public function setAuthRequired($authRequired)
{
$this->authRequired = $authRequired;
}
/**
* Check if server requires TLS.
*
* @return bool if TLS is required
*/
public function isTLSRequired()
{
return $this->TLSRequired;
}
/**
* Set if server requires TLS.
*
* @param bool $TLSRequired if TLS is required
*/
public function setTLSRequired($TLSRequired)
{
$this->TLSRequired = $TLSRequired;
}
/**
* Check if TLS certificate is verified.
*
* @return bool if TLS certificate is verified
*/
public function isTLSVerify()
{
return $this->TLSVerify;
}
/**
* Set if server verifies TLS certificate.
*
* @param bool $TLSVerify if TLS certificate is verified
*/
public function setTLSVerify($TLSVerify)
{
$this->TLSVerify = $TLSVerify;
}
/**
* Check if SSL is required.
*
* @return bool if SSL is required
*/
public function isSSLRequired()
{
return $this->SSLRequired;
}
/**
* Set if SSL is required.
*
* @param bool $SSLRequired if SSL is required
*/
public function setSSLRequired($SSLRequired)
{
$this->SSLRequired = $SSLRequired;
}
/**
* Get the max size of the payload.
*
* @return int size in bytes
*/
public function getMaxPayload()
{
return $this->maxPayload;
}
/**
* Set the max size of the payload.
*
* @param int $maxPayload size in bytes
*/
public function setMaxPayload($maxPayload)
{
$this->maxPayload = $maxPayload;
}
/**
* Get the server connection URLs.
*
* @return array list of server connection urls
*/
public function getConnectURLs()
{
return $this->connectURLs;
}
/**
* Set the server connection URLs.
*
* @param array $connectURLs list of server connection urls
*/
public function setConnectURLs(array $connectURLs)
{
$this->connectURLs = $connectURLs;
}
}

Some files were not shown because too many files have changed in this diff Show More