fix grpc disconnect

This commit is contained in:
lsxredrain 2021-08-20 16:05:42 +08:00
parent 92159bfb74
commit bebf3ebac6
19 changed files with 1210 additions and 6 deletions

View File

@ -196,8 +196,8 @@ handle_message({deliver, _Topic, Msg},
<<"instruct">> => Instruct, <<"devaddr">> => DevAddr,
<<"ips">> => Ips
},
%% ?LOG(info,"AppData ~p", [AppData]),
%% dgiot_ffmpeg:start_live(NewEnv),
?LOG(info,"AppData ~p", [AppData]),
dgiot_ffmpeg:start_live(NewEnv),
dgiot_ffmpeg:start_video(NewEnv),
{ok, State};
_ ->

View File

@ -0,0 +1,478 @@
# Exproto-svr-java快速指南
[English](README.md) | 简体中文
## 前提
1. 仅支持开源版 4.3+。
2. JDK 1.8 +
3. 目前官方的4.3.0-4.3.5的版本尚存在bug需要补丁。以下是加入补丁之后的Demo。
4. 如果你使用的时候官方还没有修复bug可将[补丁](https://github.com/emqx/emqx-extension-examples/tree/master/exproto-svr-python/patch/grpc_client.beam)放到你Emqx的lib/grpc-0.6.2/ebin下。
## 新建项目
首先新建一个普通 maven 项目
## 依赖配置
pom.xml 中依赖配置如下:
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>exproto-svr-java</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
<version>1.39.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>1.39.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>1.39.0</version>
</dependency>
<dependency> <!-- necessary for Java 9+ -->
<groupId>org.apache.tomcat</groupId>
<artifactId>annotations-api</artifactId>
<version>6.0.53</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
<version>3.17.2</version>
</dependency>
</dependencies>
<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.6.2</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:3.17.2:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:1.39.0:exe:${os.detected.classifier}</pluginArtifact>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
```
## 代码生成
然后需要通过GRPC的协议规范文件来生成代码, 在此处获取: [exproto.proto](https://github.com/emqx/emqx/tree/master/apps/emqx_exproto/priv/protos) 最新版协议规范描述文件, 把 proto 文件复制到项目目录下和 java 目录平行的 proto 文件夹内, 然后运行命令:
```bash
mvn compile
```
或者直接在IDEA右侧选择MAVEN点击compile,同样的效果。![image-20210716100715536](static/image-20210716100715536.png)
然后会在target目录下生成一系列目录其中我们生成的代码在generated-sources
```
generated-sources
├── annotations
└── protobuf
├── grpc-java
│   └──emqx
│ └──exproto
│ └──v1
│ ├──ConnectionAdapterGrpc.java
│   └── ConnectionHandlerGrpc.java
└── java
└──emqx
└──exproto
└──v1
└── Exproto.java
```
到此为止我们的项目结构搭建成功,然后开始编写测试
## Demo
在src/main/java下新建包emqx.exproto.v1与上面生成的代码的包名保持一致。此参数是根据proto 文件中的 package 参数生成的。
ConnectionHandler.java
这里的HOST更换成你自己的EMQX服务器的IP。
```java
package emqx.exproto.v1;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import java.util.List;
/**
* @author WangScaler
* @date 2021/7/15 18:20
*/
public class ConnectionHandler extends ConnectionHandlerGrpc.ConnectionHandlerImplBase {
private static final String HOST = "192.168.1.19:9100";
static ManagedChannel channel;
static {
System.out.println("[LOG] Build singleton channel");
channel = ManagedChannelBuilder
.forTarget(HOST)
.usePlaintext()
.build();
}
@Override
public StreamObserver<Exproto.SocketCreatedRequest> onSocketCreated(final StreamObserver<Exproto.EmptySuccess> responseObserver) {
return new StreamObserver<Exproto.SocketCreatedRequest>() {
public void onNext(Exproto.SocketCreatedRequest request) {
ConnectionAdapterGrpc.ConnectionAdapterBlockingStub blockingStub = ConnectionAdapterGrpc.newBlockingStub(channel);
System.out.println("[LOG] client socket connection:" + request.getConninfo());
Exproto.ClientInfo clientInfo = Exproto.ClientInfo.newBuilder()
.setClientid("test")
.setUsername("test")
.build();
Exproto.AuthenticateRequest authenticateRequest = Exproto.AuthenticateRequest.newBuilder()
.setClientinfo(clientInfo)
.setConn(request.getConn())
.setPassword("test")
.build();
Exproto.CodeResponse response = blockingStub.authenticate(authenticateRequest);
System.out.println("[LOG] authenticate" + response.getMessageBytes());
Exproto.TimerRequest timerRequest = Exproto.TimerRequest.newBuilder()
.setConn(request.getConn())
.setInterval(20)
.setType(Exproto.TimerType.KEEPALIVE)
.build();
Exproto.CodeResponse timerResponse = blockingStub.startTimer(timerRequest);
System.out.println("[LOG] startTimer" + timerResponse.getMessageBytes());
Exproto.SubscribeRequest subscribeRequest = Exproto.SubscribeRequest.newBuilder()
.setConn(request.getConn())
.setTopic("/test")
.setQos(0)
.build();
Exproto.CodeResponse subscribeResponse = blockingStub.subscribe(subscribeRequest);
System.out.println("[LOG] subscribe" + subscribeResponse.getMessageBytes());
}
public void onError(Throwable throwable) {
System.out.println(" onSocketCreated error cause" + throwable.getCause());
System.out.println(" onSocketCreated error message" + throwable.getMessage());
}
public void onCompleted() {
responseObserver.onNext(Exproto.EmptySuccess.getDefaultInstance());
responseObserver.onCompleted();
}
};
}
@Override
public StreamObserver<Exproto.ReceivedBytesRequest> onReceivedBytes(final StreamObserver<Exproto.EmptySuccess> responseObserver) {
return new StreamObserver<Exproto.ReceivedBytesRequest>() {
public void onNext(Exproto.ReceivedBytesRequest request) {
ConnectionAdapterGrpc.ConnectionAdapterBlockingStub blockingStub = ConnectionAdapterGrpc.newBlockingStub(channel);
System.out.println("[LOG] ReceivedBytesRequest" + request.getConn());
Exproto.PublishRequest publishRequest = Exproto.PublishRequest.newBuilder()
.setConn(request.getConn())
.setTopic("/test1")
.setQos(0)
.setPayload(request.getBytes()).build();
Exproto.CodeResponse response = blockingStub.publish(publishRequest);
System.out.println("[LOG] publish" + response.getMessage());
}
public void onError(Throwable throwable) {
System.out.println(" onReceivedBytes error cause" + throwable.getCause());
System.out.println(" onReceivedBytes error message" + throwable.getMessage());
}
public void onCompleted() {
responseObserver.onNext(Exproto.EmptySuccess.getDefaultInstance());
responseObserver.onCompleted();
}
};
}
@Override
public StreamObserver<Exproto.ReceivedMessagesRequest> onReceivedMessages(final StreamObserver<Exproto.EmptySuccess> responseObserver) {
return new StreamObserver<Exproto.ReceivedMessagesRequest>() {
public void onNext(Exproto.ReceivedMessagesRequest receivedMessagesRequest) {
ConnectionAdapterGrpc.ConnectionAdapterBlockingStub blockingStub = ConnectionAdapterGrpc.newBlockingStub(channel);
System.out.println("[LOG] onReceivedMessages" + receivedMessagesRequest.getConn());
List<Exproto.Message> messagesList = receivedMessagesRequest.getMessagesList();
for (Exproto.Message message : messagesList) {
System.out.println("Message:" + message.getPayload());
Exproto.SendBytesRequest sendBytesRequest = Exproto.SendBytesRequest.newBuilder()
.setConn(receivedMessagesRequest.getConn())
.setBytes(message.getPayload())
.build();
Exproto.CodeResponse sendBytesResponse = blockingStub.send(sendBytesRequest);
System.out.println("[LOG] sendBytes" + sendBytesResponse.getMessage());
}
}
public void onError(Throwable throwable) {
System.out.println(" onReceivedMessages error cause" + throwable.getCause());
System.out.println(" onReceivedMessages error message" + throwable.getMessage());
}
public void onCompleted() {
responseObserver.onNext(Exproto.EmptySuccess.getDefaultInstance());
responseObserver.onCompleted();
}
};
}
@Override
public StreamObserver<Exproto.TimerTimeoutRequest> onTimerTimeout(final StreamObserver<Exproto.EmptySuccess> responseObserver) {
return new StreamObserver<Exproto.TimerTimeoutRequest>() {
public void onNext(Exproto.TimerTimeoutRequest timerTimeoutRequest) {
ConnectionAdapterGrpc.ConnectionAdapterBlockingStub blockingStub = ConnectionAdapterGrpc.newBlockingStub(channel);
System.out.println("[LOG] onTimerTimeout");
Exproto.CloseSocketRequest closeSocketRequest = Exproto.CloseSocketRequest.newBuilder()
.setConn(timerTimeoutRequest.getConn())
.build();
Exproto.CodeResponse closeResponse = blockingStub.close(closeSocketRequest);
System.out.println("[LOG] close" + closeResponse.getMessage());
}
public void onError(Throwable throwable) {
System.out.println(" onTimerTimeout error cause" + throwable.getCause());
System.out.println(" onTimerTimeout error message" + throwable.getMessage());
}
public void onCompleted() {
responseObserver.onNext(Exproto.EmptySuccess.getDefaultInstance());
responseObserver.onCompleted();
}
};
}
@Override
public StreamObserver<Exproto.SocketClosedRequest> onSocketClosed(final StreamObserver<Exproto.EmptySuccess> responseObserver) {
return new StreamObserver<Exproto.SocketClosedRequest>() {
public void onNext(Exproto.SocketClosedRequest socketClosedRequest) {
System.out.println("[LOG] onSocketClosed:" + socketClosedRequest.toString());
}
public void onError(Throwable throwable) {
System.out.println(" onSocketClosed error cause" + throwable.getCause());
System.out.println(" onSocketClosed error message" + throwable.getMessage());
}
public void onCompleted() {
responseObserver.onNext(Exproto.EmptySuccess.getDefaultInstance());
responseObserver.onCompleted();
}
};
}
}
```
TestGrpcServer.java
这里使用`NettyServerBuilder.permitKeepAliveTime(2, TimeUnit.SECONDS)`的原因是如果不设置的话,设备发送三分钟会出现断开现象,通过抓包发现`too_many_pings`,并且通过[A8-client-side-keepalive](https://github.com/grpc/proposal/blob/master/A8-client-side-keepalive.md#server-enforcement)发现是GRPC的一种机制。可参考[tcp keepalive](https://askemq.com/t/topic/684)
```java
package emqx.exproto.v1;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
public class TestGRpcServer {
private static final Logger logger = Logger.getLogger(TestGRpcServer.class.getName());
public static void main(String[] args) throws IOException, InterruptedException {
int port = 9001;
Server server = NettyServerBuilder
.forPort(9001)
.permitKeepAliveTime(2, TimeUnit.SECONDS)
.permitKeepAliveWithoutCalls(true)
.addService(new ConnectionHandler())
.build()
.start();
logger.info("Server started, listening on " + port);
server.awaitTermination();
}
}
```
最终的目录结构
```
├── pom.xml
├── src
│ ├── main
│ │ ├── java
│ │ │ └── emqx
│ │ │ └── exproto
│ │ │ └── v1
│ │ │ ├── ConnectionHandler.java
│ │ │ └── TestGRpcServer.java
│ │ ├── proto
│ │ │ └── exproto.proto
│ │ └── resources
│ └── test
│ └── java
├── target
└──generated-sources
├── annotations
└── protobuf
├── grpc-java
│   └──emqx
│ └──exproto
│ └──v1
│ ├──ConnectionAdapterGrpc.java
│   └── ConnectionHandlerGrpc.java
└── java
└──emqx
└──exproto
└──v1
└── Exproto.java
```
## 测试过程
我这里使用的网络调试助手进行的测试可以选择你自己的方式去连接EMQX服务器的IP:192.168.1.19
### 连接
![image-20210716105416212](static/image-20210716105416212.png)
控制台输出
```
[LOG] client socket connection:peername {
host: "192.168.1.77"
port: 54985
}
sockname {
host: "192.168.1.19"
port: 7993
}
```
### 认证
控制台输出
```
[LOG] authenticate code: 0
```
此时观察Dashboard
![image-20210716105448535](static/image-20210716105448535.png)
证明认证成功
### 订阅
控制台输出
```
LOG] subscribe code: 0
```
此时在Dashboard也可观察到![image-20210716105710015](static/image-20210716105710015.png)
证明订阅成功,当我们向该主题发送消息的时候,控制台输出
```
[LOG] onReceivedMessagesg1hkAA5lbXF4QDEyNy4wLjAuMQAAE/8AAAABAAAAAQ==
Message:<ByteString@36abb5d4 size=20 contents="{\n \"msg\": \"hello\"\n}">
[LOG] sendBytes code: 0
```
同时我们的网络助手也接收到了消息
![image-20210716110738053](static/image-20210716110738053.png)
### 开启定时器
控制台输出
```
[LOG] startTimer code: 0
```
当超过我们设置的时间Interval(20* 3=60s时将触发回调OnTimerTimeout。
```
[LOG] onTimerTimeout
[LOG] close code: 0
```
### 发布
当我们使用网络调试助手发送消息(test)时,控制台输出
```
[LOG] ReceivedBytesRequestg1hkAA5lbXF4QDEyNy4wLjAuMQAAFDwAAAABAAAAAQ==
[LOG] publish code: 0
```
而我们的订阅者,也可以订阅到消息
![image-20210716111229969](static/image-20210716111229969.png)
### 关闭
控制台打印
```
[LOG] onSocketClosed:conn: "g1hkAA5lbXF4QDEyNy4wLjAuMQAAFDwAAAABAAAAAQ=="
reason: "{shutdown,{sock_closed,normal}}"
```
## 参考资料
- [1] [exproto-svr-java-for-enterpise-e4.2](https://github.com/emqx/emqx-extension-examples/tree/master/exproto-svr-java-for-enterpise-e4.2)
- [2] [gRpc.io](https://www.grpc.io/docs/languages/python/quickstart/)
- [3] [A8-client-side-keepalive](https://github.com/grpc/proposal/blob/master/A8-client-side-keepalive.md#server-enforcement)
## 注意这里的StartTimer的意义是如果监测到我们的tcp连接interval*3s没有发送数据就会回调OnTimerTimeout函数我们可以在这里处理一些业务我这里将tcp连接直接踢掉。

View File

@ -0,0 +1,4 @@
# Exproto-svr-java Quick guide
English | [简体中文](README-CN.md)

View File

@ -0,0 +1,68 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>exproto-svr-java</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
<version>1.39.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>1.39.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>1.39.0</version>
</dependency>
<dependency> <!-- necessary for Java 9+ -->
<groupId>org.apache.tomcat</groupId>
<artifactId>annotations-api</artifactId>
<version>6.0.53</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
<version>3.17.2</version>
</dependency>
</dependencies>
<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.6.2</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:3.17.2:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:1.39.0:exe:${os.detected.classifier}</pluginArtifact>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,174 @@
package emqx.exproto.v1;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import java.util.List;
/**
* @author WangScaler
* @date 2021/7/15 18:20
*/
public class ConnectionHandler extends ConnectionHandlerGrpc.ConnectionHandlerImplBase {
private static final String HOST = "192.168.1.19:9100";
static ManagedChannel channel;
static {
System.out.println("[LOG] Build singleton channel");
channel = ManagedChannelBuilder
.forTarget(HOST)
.usePlaintext()
.build();
}
@Override
public StreamObserver<Exproto.SocketCreatedRequest> onSocketCreated(final StreamObserver<Exproto.EmptySuccess> responseObserver) {
return new StreamObserver<Exproto.SocketCreatedRequest>() {
public void onNext(Exproto.SocketCreatedRequest request) {
ConnectionAdapterGrpc.ConnectionAdapterBlockingStub blockingStub = ConnectionAdapterGrpc.newBlockingStub(channel);
System.out.println("[LOG] client socket connection:" + request.getConninfo());
Exproto.ClientInfo clientInfo = Exproto.ClientInfo.newBuilder()
.setClientid("test")
.setUsername("test")
.build();
Exproto.AuthenticateRequest authenticateRequest = Exproto.AuthenticateRequest.newBuilder()
.setClientinfo(clientInfo)
.setConn(request.getConn())
.setPassword("test")
.build();
Exproto.CodeResponse response = blockingStub.authenticate(authenticateRequest);
System.out.println("[LOG] authenticate code: " + response.getCodeValue());
Exproto.TimerRequest timerRequest = Exproto.TimerRequest.newBuilder()
.setConn(request.getConn())
.setInterval(20)
.setType(Exproto.TimerType.KEEPALIVE)
.build();
Exproto.CodeResponse timerResponse = blockingStub.startTimer(timerRequest);
System.out.println("[LOG] startTimer code: " + timerResponse.getCodeValue());
Exproto.SubscribeRequest subscribeRequest = Exproto.SubscribeRequest.newBuilder()
.setConn(request.getConn())
.setTopic("/test")
.setQos(0)
.build();
Exproto.CodeResponse subscribeResponse = blockingStub.subscribe(subscribeRequest);
System.out.println("[LOG] subscribe code: " + subscribeResponse.getCodeValue());
}
public void onError(Throwable throwable) {
System.out.println(" onSocketCreated error cause" + throwable.getCause());
System.out.println(" onSocketCreated error message" + throwable.getMessage());
}
public void onCompleted() {
responseObserver.onNext(Exproto.EmptySuccess.getDefaultInstance());
responseObserver.onCompleted();
}
};
}
@Override
public StreamObserver<Exproto.ReceivedBytesRequest> onReceivedBytes(final StreamObserver<Exproto.EmptySuccess> responseObserver) {
return new StreamObserver<Exproto.ReceivedBytesRequest>() {
public void onNext(Exproto.ReceivedBytesRequest request) {
ConnectionAdapterGrpc.ConnectionAdapterBlockingStub blockingStub = ConnectionAdapterGrpc.newBlockingStub(channel);
System.out.println("[LOG] ReceivedBytesRequest" + request.getConn());
Exproto.PublishRequest publishRequest = Exproto.PublishRequest.newBuilder()
.setConn(request.getConn())
.setTopic("/test1")
.setQos(0)
.setPayload(request.getBytes()).build();
Exproto.CodeResponse response = blockingStub.publish(publishRequest);
System.out.println("[LOG] publish code: " + response.getCodeValue());
}
public void onError(Throwable throwable) {
System.out.println(" onReceivedBytes error cause" + throwable.getCause());
System.out.println(" onReceivedBytes error message" + throwable.getMessage());
}
public void onCompleted() {
responseObserver.onNext(Exproto.EmptySuccess.getDefaultInstance());
responseObserver.onCompleted();
}
};
}
@Override
public StreamObserver<Exproto.ReceivedMessagesRequest> onReceivedMessages(final StreamObserver<Exproto.EmptySuccess> responseObserver) {
return new StreamObserver<Exproto.ReceivedMessagesRequest>() {
public void onNext(Exproto.ReceivedMessagesRequest receivedMessagesRequest) {
ConnectionAdapterGrpc.ConnectionAdapterBlockingStub blockingStub = ConnectionAdapterGrpc.newBlockingStub(channel);
System.out.println("[LOG] onReceivedMessages" + receivedMessagesRequest.getConn());
List<Exproto.Message> messagesList = receivedMessagesRequest.getMessagesList();
for (Exproto.Message message : messagesList) {
System.out.println("Message:" + message.getPayload());
Exproto.SendBytesRequest sendBytesRequest = Exproto.SendBytesRequest.newBuilder()
.setConn(receivedMessagesRequest.getConn())
.setBytes(message.getPayload())
.build();
Exproto.CodeResponse sendBytesResponse = blockingStub.send(sendBytesRequest);
System.out.println("[LOG] sendBytes code: " + sendBytesResponse.getCodeValue());
}
}
public void onError(Throwable throwable) {
System.out.println(" onReceivedMessages error cause" + throwable.getCause());
System.out.println(" onReceivedMessages error message" + throwable.getMessage());
}
public void onCompleted() {
responseObserver.onNext(Exproto.EmptySuccess.getDefaultInstance());
responseObserver.onCompleted();
}
};
}
@Override
public StreamObserver<Exproto.TimerTimeoutRequest> onTimerTimeout(final StreamObserver<Exproto.EmptySuccess> responseObserver) {
return new StreamObserver<Exproto.TimerTimeoutRequest>() {
public void onNext(Exproto.TimerTimeoutRequest timerTimeoutRequest) {
ConnectionAdapterGrpc.ConnectionAdapterBlockingStub blockingStub = ConnectionAdapterGrpc.newBlockingStub(channel);
System.out.println("[LOG] onTimerTimeout");
Exproto.CloseSocketRequest closeSocketRequest = Exproto.CloseSocketRequest.newBuilder()
.setConn(timerTimeoutRequest.getConn())
.build();
Exproto.CodeResponse closeResponse = blockingStub.close(closeSocketRequest);
System.out.println("[LOG] close code: " + closeResponse.getCodeValue());
}
public void onError(Throwable throwable) {
System.out.println(" onTimerTimeout error cause" + throwable.getCause());
System.out.println(" onTimerTimeout error message" + throwable.getMessage());
}
public void onCompleted() {
responseObserver.onNext(Exproto.EmptySuccess.getDefaultInstance());
responseObserver.onCompleted();
}
};
}
@Override
public StreamObserver<Exproto.SocketClosedRequest> onSocketClosed(final StreamObserver<Exproto.EmptySuccess> responseObserver) {
return new StreamObserver<Exproto.SocketClosedRequest>() {
public void onNext(Exproto.SocketClosedRequest socketClosedRequest) {
System.out.println("[LOG] onSocketClosed:" + socketClosedRequest.toString());
}
public void onError(Throwable throwable) {
System.out.println(" onSocketClosed error cause" + throwable.getCause());
System.out.println(" onSocketClosed error message" + throwable.getMessage());
}
public void onCompleted() {
responseObserver.onNext(Exproto.EmptySuccess.getDefaultInstance());
responseObserver.onCompleted();
}
};
}
}

View File

@ -0,0 +1,30 @@
package emqx.exproto.v1;
import io.grpc.Server;
import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
/**
* @author WangScaler
* @date 2021/7/15 18:22
*/
public class TestGrpcServer {
private static final Logger logger = Logger.getLogger(TestGrpcServer.class.getName());
public static void main(String[] args) throws IOException, InterruptedException {
int port = 9001;
Server server = NettyServerBuilder
.forPort(9001)
.permitKeepAliveTime(2, TimeUnit.SECONDS)
.permitKeepAliveWithoutCalls(true)
.addService(new ConnectionHandler())
.build()
.start();
logger.info("Server started, listening on " + port);
server.awaitTermination();
}
}

View File

@ -0,0 +1,259 @@
//------------------------------------------------------------------------------
// Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//------------------------------------------------------------------------------
syntax = "proto3";
package emqx.exproto.v1;
// The Broker side serivce. It provides a set of APIs to
// handle a protcol access
service ConnectionAdapter {
// -- socket layer
rpc Send(SendBytesRequest) returns (CodeResponse) {};
rpc Close(CloseSocketRequest) returns (CodeResponse) {};
// -- protocol layer
rpc Authenticate(AuthenticateRequest) returns (CodeResponse) {};
rpc StartTimer(TimerRequest) returns (CodeResponse) {};
// -- pub/sub layer
rpc Publish(PublishRequest) returns (CodeResponse) {};
rpc Subscribe(SubscribeRequest) returns (CodeResponse) {};
rpc Unsubscribe(UnsubscribeRequest) returns (CodeResponse) {};
}
service ConnectionHandler {
// -- socket layer
rpc OnSocketCreated(stream SocketCreatedRequest) returns (EmptySuccess) {};
rpc OnSocketClosed(stream SocketClosedRequest) returns (EmptySuccess) {};
rpc OnReceivedBytes(stream ReceivedBytesRequest) returns (EmptySuccess) {};
// -- pub/sub layer
rpc OnTimerTimeout(stream TimerTimeoutRequest) returns (EmptySuccess) {};
rpc OnReceivedMessages(stream ReceivedMessagesRequest) returns (EmptySuccess) {};
}
message EmptySuccess { }
enum ResultCode {
// Operation successfully
SUCCESS = 0;
// Unknown Error
UNKNOWN = 1;
// Connection process is not alive
CONN_PROCESS_NOT_ALIVE = 2;
// Miss the required parameter
REQUIRED_PARAMS_MISSED = 3;
// Params type or values incorrect
PARAMS_TYPE_ERROR = 4;
// No permission or Pre-conditions not fulfilled
PERMISSION_DENY = 5;
}
message CodeResponse {
ResultCode code = 1;
// The reason message if result is false
string message = 2;
}
message SendBytesRequest {
string conn = 1;
bytes bytes = 2;
}
message CloseSocketRequest {
string conn = 1;
}
message AuthenticateRequest {
string conn = 1;
ClientInfo clientinfo = 2;
string password = 3;
}
message TimerRequest {
string conn = 1;
TimerType type = 2;
uint32 interval = 3;
}
enum TimerType {
KEEPALIVE = 0;
}
message PublishRequest {
string conn = 1;
string topic = 2;
uint32 qos = 3;
bytes payload = 4;
}
message SubscribeRequest {
string conn = 1;
string topic = 2;
uint32 qos = 3;
}
message UnsubscribeRequest {
string conn = 1;
string topic = 2;
}
message SocketCreatedRequest {
string conn = 1;
ConnInfo conninfo = 2;
}
message ReceivedBytesRequest {
string conn = 1;
bytes bytes = 2;
}
message TimerTimeoutRequest {
string conn = 1;
TimerType type = 2;
}
message SocketClosedRequest {
string conn = 1;
string reason = 2;
}
message ReceivedMessagesRequest {
string conn = 1;
repeated Message messages = 2;
}
//--------------------------------------------------------------------
// Basic data types
//--------------------------------------------------------------------
message ConnInfo {
SocketType socktype = 1;
Address peername = 2;
Address sockname = 3;
CertificateInfo peercert = 4;
}
enum SocketType {
TCP = 0;
SSL = 1;
UDP = 2;
DTLS = 3;
}
message Address {
string host = 1;
uint32 port = 2;
}
message CertificateInfo {
string cn = 1;
string dn = 2;
}
message ClientInfo {
string proto_name = 1;
string proto_ver = 2;
string clientid = 3;
string username = 4;
string mountpoint = 5;
}
message Message {
string node = 1;
string id = 2;
uint32 qos = 3;
string from = 4;
string topic = 5;
bytes payload = 6;
uint64 timestamp = 7;
}

Binary file not shown.

After

Width:  |  Height:  |  Size: 484 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 21 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 53 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 42 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 34 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 33 KiB

View File

@ -0,0 +1,153 @@
# emqx-exproto-python-sdk
The Python SDK for emqx-exproto
## Installation
There are the following ways to install this library
#### Install by Pypi
```bash
pip3 install emqx-exproto-sdk
```
#### Install from source code
```bash
git clone https://github.com/emqx/emqx-exproto-python-sdk.git
cd emqx-exproto-python-sdk
python3 setup.py install
```
## Get Started
- First of all, follow the step Installation to install dependencies
- Create your Python project
## Deploy
After compiled all source codes, you should deploy the sdk and your source files into emqx:
1. Ensure the emqx hosted machine has installed the `emqx-exproto-python-sdk`.
2. Copy your source code files. E.g: copy `example/demo.py` to `emqx/data/extension` directory.
3. Modify the `emqx/etc/plugins/emqx_exproto.conf` file. e.g:
```properties
exproto.listener.protoname = tcp://0.0.0.0:7993
exproto.listener.protoname.driver = python
exproto.listener.protoname.driver_search_path = data/extension
exproto.listener.protoname.driver_callback_module = demo
```
4. Execute `bin/emqx console` to start emqx and load the `emqx_exproto` plugin.
Use `telnet 127.0.0.1 7993` to establish a TCP connection and observe the console output.
## Interface
### Callbacks
**Connection Layer callbacks** (The Connection object represents a TCP/UDP Socket entity):
```python
// This function will be scheduled after a TCP connection established to EMQ X
// or receive a new UDP socket.
on_connect(connection: Connection, connection_info: ConnectionInfo)
// This callback will be scheduled when a connection received bytes from TCP/UDP socket.
on_received(connection: Connection, data: bytes, state: any)
// This function will be scheduled after a connection terminated.
//
// It indicates that the EMQ X process that maintains the TCP/UDP socket
// has been closed. E.g: a TCP connection is closed, or a UDP socket has
// exceeded maintenance hours.
on_terminated(connection: Connection, reason: str, state: any)
```
**Pub/Sub Layer callbacks:**
```python
// This function will be scheduled when a connection received a Message from EMQ X
//
// When a connection is subscribed to a topic and a message arrives on that topic,
// EMQ X will deliver the message to that connection. At that time, this function
// is triggered.
on_deliver(connection: Connection, message_list: list)
```
### APIs
Similarly, AbstractExprotoHandler also provides a set of APIs to facilitate the use of the `emqx-exproto` APIs.
**Connection Layer APIs:**
```python
// Send a stream of bytes to the connection. These bytes are delivered directly
// to the associated TCP/UDP socket.
send(connection: Connection, data: bytes)
// Terminate the connection process and TCP/UDP socket.
terminate(connection: Connection)
```
**Pub/Sub Layer APIs:**
```python
// Register the connection as a Client of EMQ X. This `clientInfo` contains the
// necessary field information to be an EMQ X client.
//
// This method should normally be invoked after confirming that a connection is
// allowed to access the EMQ X system. For example: after the connection packet
// has been parsed and authenticated successfully.
register(connection: Connection, client_info: ClientInfo)
// The connection Publish a Message to EMQ X
publish(connection: Connection, message: Message)
// The connection Subscribe a Topic to EMQ X
subscribe(connection: Connection, topic: str, qos: int)
```
## Example
```python
from emqx.exproto.core import *
from emqx.exproto.abstract_handler import AbstractExProtoHandler
from emqx.exproto.connection import Connection, ConnectionInfo
import emqx.exproto.driver as driver
class SdkDemo(AbstractExProtoHandler):
# In this class, you need to implement abstract AbstractExProtoHandler
def on_connect(self, connection: Connection, connection_info: ConnectionInfo):
print(connection)
print(connection_info)
def on_received(self, connection: Connection, data: bytes, state: any):
print(connection)
print(data)
def on_terminated(self, connection: Connection, reason: str, state: any):
print(connection)
print(reason)
def on_deliver(self, connection: Connection, message_list: list):
print(connection)
for message in message_list:
print(message)
# set exproto_driver
driver.exproto_driver = SdkDemo()
```
## License
Apache License v2
## Author
- [Adek06](https://github.com/Adek06)

View File

@ -0,0 +1,38 @@
# Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from emqx.exproto.core import *
from emqx.exproto.abstract_handler import AbstractExProtoHandler
from emqx.exproto.connection import Connection, ConnectionInfo
class SdkDemo(AbstractExProtoHandler):
def on_connect(self, connection: Connection, connection_info: ConnectionInfo):
print(connection)
print(connection_info)
self.subscribe(connection, b"t/dn", 0)
def on_received(self, connection: Connection, data: bytes, state: any):
print(connection)
self.send(connection, data)
def on_terminated(self, connection: Connection, reason: str, state: any):
print(connection)
print(reason)
def on_deliver(self, connection: Connection, message_list: list):
print(connection)
for message in message_list:
print(message)
driver.exproto_driver = SdkDemo()

View File

@ -5,7 +5,7 @@
]}.
{deps,
[{grpc, {git, "https://gitee.com/fastdgiot/grpc-erl", {tag, "0.6.2"}}}
[{grpc, {git, "https://gitee.com/fastdgiot/grpc-erl", {tag, "0.6.3"}}}
]}.
{grpc,

View File

@ -54,7 +54,7 @@ exproto.listener.protoname.active_n = 100
## Idle timeout
##
## Value: Duration
exproto.listener.protoname.idle_timeout = 30s
exproto.listener.protoname.idle_timeout = 180s
## The access control rules for the MQTT/TCP listener.
##

View File

@ -13,7 +13,7 @@
]}.
{deps,
[{grpc, {git, "https://gitee.com/fastdgiot/grpc-erl", {tag, "0.6.2"}}}
[{grpc, {git, "https://gitee.com/fastdgiot/grpc-erl", {tag, "0.6.3"}}}
]}.
{grpc,

View File

@ -67,7 +67,7 @@
, {observer_cli, "1.6.1"} % NOTE: depends on recon 2.5.1
, {ejdbc, {git, "https://gitee.com/fastdgiot/ejdbc", {tag, "1.0.1"}}}
, {snabbkaffe, {git, "https://gitee.com/fastdgiot/snabbkaffe.git", {tag, "0.12.2"}}}
, {ibrowse, {git, "https://github.com.cnpmjs.org/fastdgiot/ibrowse.git", {tag, "v4.4.2"}}}
, {ibrowse, {git, "https://gitee.com/fastdgiot/ibrowse.git", {tag, "v4.4.2"}}}
]}.
{xref_ignores,