diff --git a/apps/dgiot_ffmpeg/src/dgiot_ffmpeg_channel.erl b/apps/dgiot_ffmpeg/src/dgiot_ffmpeg_channel.erl index 48ce5c0a..379483ef 100644 --- a/apps/dgiot_ffmpeg/src/dgiot_ffmpeg_channel.erl +++ b/apps/dgiot_ffmpeg/src/dgiot_ffmpeg_channel.erl @@ -196,8 +196,8 @@ handle_message({deliver, _Topic, Msg}, <<"instruct">> => Instruct, <<"devaddr">> => DevAddr, <<"ips">> => Ips }, -%% ?LOG(info,"AppData ~p", [AppData]), -%% dgiot_ffmpeg:start_live(NewEnv), + ?LOG(info,"AppData ~p", [AppData]), + dgiot_ffmpeg:start_live(NewEnv), dgiot_ffmpeg:start_video(NewEnv), {ok, State}; _ -> diff --git a/apps/dgiot_grpc/priv/example/java/exproto-svr-java/README-CN.md b/apps/dgiot_grpc/priv/example/java/exproto-svr-java/README-CN.md new file mode 100644 index 00000000..a7bb0382 --- /dev/null +++ b/apps/dgiot_grpc/priv/example/java/exproto-svr-java/README-CN.md @@ -0,0 +1,478 @@ +# Exproto-svr-java快速指南 + +[English](README.md) | 简体中文 + +## 前提 + +1. 仅支持开源版 4.3+。 +2. JDK 1.8 + +3. 目前官方的4.3.0-4.3.5的版本尚存在bug,需要补丁。以下是加入补丁之后的Demo。 +4. 如果你使用的时候,官方还没有修复bug,可将[补丁](https://github.com/emqx/emqx-extension-examples/tree/master/exproto-svr-python/patch/grpc_client.beam)放到你Emqx的lib/grpc-0.6.2/ebin下。 + +## 新建项目 + +首先新建一个普通 maven 项目 + +## 依赖配置 + +pom.xml 中依赖配置如下: + +```xml + + + 4.0.0 + + org.example + exproto-svr-java + 1.0-SNAPSHOT + + + io.grpc + grpc-netty-shaded + 1.39.0 + + + io.grpc + grpc-protobuf + 1.39.0 + + + io.grpc + grpc-stub + 1.39.0 + + + org.apache.tomcat + annotations-api + 6.0.53 + provided + + + com.google.protobuf + protobuf-java-util + 3.17.2 + + + + + + kr.motd.maven + os-maven-plugin + 1.6.2 + + + + + org.xolstice.maven.plugins + protobuf-maven-plugin + 0.6.1 + + com.google.protobuf:protoc:3.17.2:exe:${os.detected.classifier} + grpc-java + io.grpc:protoc-gen-grpc-java:1.39.0:exe:${os.detected.classifier} + + + + + compile + compile-custom + + + + + + + +``` + +## 代码生成 + +然后需要通过GRPC的协议规范文件来生成代码, 在此处获取: [exproto.proto](https://github.com/emqx/emqx/tree/master/apps/emqx_exproto/priv/protos) 最新版协议规范描述文件, 把 proto 文件复制到项目目录下和 java 目录平行的 proto 文件夹内, 然后运行命令: + +```bash +mvn compile +``` + +或者直接在IDEA右侧选择MAVEN点击compile,同样的效果。![image-20210716100715536](static/image-20210716100715536.png) + +然后会在target目录下生成一系列目录,其中我们生成的代码在generated-sources + +``` +generated-sources +├── annotations +└── protobuf + ├── grpc-java + │   └──emqx + │ └──exproto + │ └──v1 + │ ├──ConnectionAdapterGrpc.java + │   └── ConnectionHandlerGrpc.java + └── java + └──emqx + └──exproto + └──v1 + └── Exproto.java + +``` + +到此为止我们的项目结构搭建成功,然后开始编写测试 + +## Demo + +在src/main/java下新建包emqx.exproto.v1与上面生成的代码的包名保持一致。此参数是根据proto 文件中的 package 参数生成的。 + +ConnectionHandler.java + +这里的HOST更换成你自己的EMQX服务器的IP。 + +```java +package emqx.exproto.v1; + +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.stub.StreamObserver; + +import java.util.List; + +/** + * @author WangScaler + * @date 2021/7/15 18:20 + */ + +public class ConnectionHandler extends ConnectionHandlerGrpc.ConnectionHandlerImplBase { + private static final String HOST = "192.168.1.19:9100"; + static ManagedChannel channel; + + static { + System.out.println("[LOG] Build singleton channel"); + channel = ManagedChannelBuilder + .forTarget(HOST) + .usePlaintext() + .build(); + } + + @Override + public StreamObserver onSocketCreated(final StreamObserver responseObserver) { + return new StreamObserver() { + public void onNext(Exproto.SocketCreatedRequest request) { + ConnectionAdapterGrpc.ConnectionAdapterBlockingStub blockingStub = ConnectionAdapterGrpc.newBlockingStub(channel); + System.out.println("[LOG] client socket connection:" + request.getConninfo()); + Exproto.ClientInfo clientInfo = Exproto.ClientInfo.newBuilder() + .setClientid("test") + .setUsername("test") + .build(); + Exproto.AuthenticateRequest authenticateRequest = Exproto.AuthenticateRequest.newBuilder() + .setClientinfo(clientInfo) + .setConn(request.getConn()) + .setPassword("test") + .build(); + Exproto.CodeResponse response = blockingStub.authenticate(authenticateRequest); + System.out.println("[LOG] authenticate" + response.getMessageBytes()); + Exproto.TimerRequest timerRequest = Exproto.TimerRequest.newBuilder() + .setConn(request.getConn()) + .setInterval(20) + .setType(Exproto.TimerType.KEEPALIVE) + .build(); + Exproto.CodeResponse timerResponse = blockingStub.startTimer(timerRequest); + System.out.println("[LOG] startTimer" + timerResponse.getMessageBytes()); + Exproto.SubscribeRequest subscribeRequest = Exproto.SubscribeRequest.newBuilder() + .setConn(request.getConn()) + .setTopic("/test") + .setQos(0) + .build(); + Exproto.CodeResponse subscribeResponse = blockingStub.subscribe(subscribeRequest); + System.out.println("[LOG] subscribe" + subscribeResponse.getMessageBytes()); + + } + + public void onError(Throwable throwable) { + System.out.println(" onSocketCreated error cause" + throwable.getCause()); + System.out.println(" onSocketCreated error message" + throwable.getMessage()); + } + + public void onCompleted() { + responseObserver.onNext(Exproto.EmptySuccess.getDefaultInstance()); + responseObserver.onCompleted(); + } + }; + } + + @Override + public StreamObserver onReceivedBytes(final StreamObserver responseObserver) { + return new StreamObserver() { + public void onNext(Exproto.ReceivedBytesRequest request) { + ConnectionAdapterGrpc.ConnectionAdapterBlockingStub blockingStub = ConnectionAdapterGrpc.newBlockingStub(channel); + System.out.println("[LOG] ReceivedBytesRequest:" + request.getConn()); + Exproto.PublishRequest publishRequest = Exproto.PublishRequest.newBuilder() + .setConn(request.getConn()) + .setTopic("/test1") + .setQos(0) + .setPayload(request.getBytes()).build(); + Exproto.CodeResponse response = blockingStub.publish(publishRequest); + System.out.println("[LOG] publish" + response.getMessage()); + } + + public void onError(Throwable throwable) { + System.out.println(" onReceivedBytes error cause" + throwable.getCause()); + System.out.println(" onReceivedBytes error message" + throwable.getMessage()); + } + + public void onCompleted() { + responseObserver.onNext(Exproto.EmptySuccess.getDefaultInstance()); + responseObserver.onCompleted(); + } + }; + } + + @Override + public StreamObserver onReceivedMessages(final StreamObserver responseObserver) { + return new StreamObserver() { + public void onNext(Exproto.ReceivedMessagesRequest receivedMessagesRequest) { + ConnectionAdapterGrpc.ConnectionAdapterBlockingStub blockingStub = ConnectionAdapterGrpc.newBlockingStub(channel); + System.out.println("[LOG] onReceivedMessages:" + receivedMessagesRequest.getConn()); + List messagesList = receivedMessagesRequest.getMessagesList(); + for (Exproto.Message message : messagesList) { + System.out.println("Message:" + message.getPayload()); + Exproto.SendBytesRequest sendBytesRequest = Exproto.SendBytesRequest.newBuilder() + .setConn(receivedMessagesRequest.getConn()) + .setBytes(message.getPayload()) + .build(); + Exproto.CodeResponse sendBytesResponse = blockingStub.send(sendBytesRequest); + System.out.println("[LOG] sendBytes" + sendBytesResponse.getMessage()); + + } + + } + + public void onError(Throwable throwable) { + System.out.println(" onReceivedMessages error cause" + throwable.getCause()); + System.out.println(" onReceivedMessages error message" + throwable.getMessage()); + } + + public void onCompleted() { + responseObserver.onNext(Exproto.EmptySuccess.getDefaultInstance()); + responseObserver.onCompleted(); + } + }; + } + + @Override + public StreamObserver onTimerTimeout(final StreamObserver responseObserver) { + return new StreamObserver() { + public void onNext(Exproto.TimerTimeoutRequest timerTimeoutRequest) { + ConnectionAdapterGrpc.ConnectionAdapterBlockingStub blockingStub = ConnectionAdapterGrpc.newBlockingStub(channel); + System.out.println("[LOG] onTimerTimeout"); + Exproto.CloseSocketRequest closeSocketRequest = Exproto.CloseSocketRequest.newBuilder() + .setConn(timerTimeoutRequest.getConn()) + .build(); + Exproto.CodeResponse closeResponse = blockingStub.close(closeSocketRequest); + System.out.println("[LOG] close" + closeResponse.getMessage()); + } + + public void onError(Throwable throwable) { + System.out.println(" onTimerTimeout error cause" + throwable.getCause()); + System.out.println(" onTimerTimeout error message" + throwable.getMessage()); + } + + public void onCompleted() { + responseObserver.onNext(Exproto.EmptySuccess.getDefaultInstance()); + responseObserver.onCompleted(); + } + }; + } + + @Override + public StreamObserver onSocketClosed(final StreamObserver responseObserver) { + return new StreamObserver() { + public void onNext(Exproto.SocketClosedRequest socketClosedRequest) { + System.out.println("[LOG] onSocketClosed:" + socketClosedRequest.toString()); + } + + public void onError(Throwable throwable) { + System.out.println(" onSocketClosed error cause" + throwable.getCause()); + System.out.println(" onSocketClosed error message" + throwable.getMessage()); + } + + public void onCompleted() { + responseObserver.onNext(Exproto.EmptySuccess.getDefaultInstance()); + responseObserver.onCompleted(); + } + }; + } +} +``` + +TestGrpcServer.java + +这里使用`NettyServerBuilder.permitKeepAliveTime(2, TimeUnit.SECONDS)`的原因是如果不设置的话,设备发送三分钟会出现断开现象,通过抓包发现`too_many_pings`,并且通过[A8-client-side-keepalive](https://github.com/grpc/proposal/blob/master/A8-client-side-keepalive.md#server-enforcement)发现是GRPC的一种机制。可参考[tcp keepalive](https://askemq.com/t/topic/684) + +```java +package emqx.exproto.v1; + +import io.grpc.Server; +import io.grpc.ServerBuilder; +import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder; + + +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import java.util.logging.Logger; + +public class TestGRpcServer { + + private static final Logger logger = Logger.getLogger(TestGRpcServer.class.getName()); + + public static void main(String[] args) throws IOException, InterruptedException { + int port = 9001; + Server server = NettyServerBuilder + .forPort(9001) + .permitKeepAliveTime(2, TimeUnit.SECONDS) + .permitKeepAliveWithoutCalls(true) + .addService(new ConnectionHandler()) + .build() + .start(); + logger.info("Server started, listening on " + port); + server.awaitTermination(); + } +} +``` + +最终的目录结构 + +``` +├── pom.xml +├── src +│ ├── main +│ │ ├── java +│ │ │ └── emqx +│ │ │ └── exproto +│ │ │ └── v1 +│ │ │ ├── ConnectionHandler.java +│ │ │ └── TestGRpcServer.java +│ │ ├── proto +│ │ │ └── exproto.proto +│ │ └── resources +│ └── test +│ └── java +├── target + └──generated-sources + ├── annotations + └── protobuf + ├── grpc-java + │   └──emqx + │ └──exproto + │ └──v1 + │ ├──ConnectionAdapterGrpc.java + │   └── ConnectionHandlerGrpc.java + └── java + └──emqx + └──exproto + └──v1 + └── Exproto.java + +``` + +## 测试过程 + +我这里使用的网络调试助手进行的测试,可以选择你自己的方式去连接,EMQX服务器的IP:192.168.1.19 + +### 连接 + +![image-20210716105416212](static/image-20210716105416212.png) + +控制台输出 + +``` +[LOG] client socket connection:peername { + host: "192.168.1.77" + port: 54985 +} +sockname { + host: "192.168.1.19" + port: 7993 +} +``` + +### 认证 + +控制台输出 + +``` +[LOG] authenticate code: 0 +``` + +此时观察Dashboard + +![image-20210716105448535](static/image-20210716105448535.png) + +证明认证成功 + +### 订阅 + +控制台输出 + +``` +LOG] subscribe code: 0 +``` + +此时在Dashboard也可观察到![image-20210716105710015](static/image-20210716105710015.png) + +证明订阅成功,当我们向该主题发送消息的时候,控制台输出 + +``` +[LOG] onReceivedMessages:g1hkAA5lbXF4QDEyNy4wLjAuMQAAE/8AAAABAAAAAQ== +Message: +[LOG] sendBytes code: 0 + +``` + +同时我们的网络助手也接收到了消息 + +![image-20210716110738053](static/image-20210716110738053.png) + +### 开启定时器 + +控制台输出 + +``` +[LOG] startTimer code: 0 +``` + +当超过我们设置的时间Interval(20)* 3=60s时将触发回调OnTimerTimeout。 + +``` +[LOG] onTimerTimeout +[LOG] close code: 0 +``` + +### 发布 + +当我们使用网络调试助手发送消息(test)时,控制台输出 + +``` +[LOG] ReceivedBytesRequest:g1hkAA5lbXF4QDEyNy4wLjAuMQAAFDwAAAABAAAAAQ== +[LOG] publish code: 0 +``` + +而我们的订阅者,也可以订阅到消息 + +![image-20210716111229969](static/image-20210716111229969.png) + +### 关闭 + +控制台打印 + +``` +[LOG] onSocketClosed:conn: "g1hkAA5lbXF4QDEyNy4wLjAuMQAAFDwAAAABAAAAAQ==" +reason: "{shutdown,{sock_closed,normal}}" +``` + +## 参考资料 + +- [1] [exproto-svr-java-for-enterpise-e4.2](https://github.com/emqx/emqx-extension-examples/tree/master/exproto-svr-java-for-enterpise-e4.2) +- [2] [gRpc.io](https://www.grpc.io/docs/languages/python/quickstart/) +- [3] [A8-client-side-keepalive](https://github.com/grpc/proposal/blob/master/A8-client-side-keepalive.md#server-enforcement) + +## 注意:这里的StartTimer的意义是如果监测到我们的tcp连接interval*3s没有发送数据,就会回调OnTimerTimeout函数,我们可以在这里处理一些业务,我这里将tcp连接直接踢掉。 \ No newline at end of file diff --git a/apps/dgiot_grpc/priv/example/java/exproto-svr-java/README.md b/apps/dgiot_grpc/priv/example/java/exproto-svr-java/README.md new file mode 100644 index 00000000..3240a6de --- /dev/null +++ b/apps/dgiot_grpc/priv/example/java/exproto-svr-java/README.md @@ -0,0 +1,4 @@ +# Exproto-svr-java Quick guide + +English | [简体中文](README-CN.md) + diff --git a/apps/dgiot_grpc/priv/example/java/exproto-svr-java/pom.xml b/apps/dgiot_grpc/priv/example/java/exproto-svr-java/pom.xml new file mode 100644 index 00000000..4c7ff998 --- /dev/null +++ b/apps/dgiot_grpc/priv/example/java/exproto-svr-java/pom.xml @@ -0,0 +1,68 @@ + + + 4.0.0 + + org.example + exproto-svr-java + 1.0-SNAPSHOT + + + io.grpc + grpc-netty-shaded + 1.39.0 + + + io.grpc + grpc-protobuf + 1.39.0 + + + io.grpc + grpc-stub + 1.39.0 + + + org.apache.tomcat + annotations-api + 6.0.53 + provided + + + com.google.protobuf + protobuf-java-util + 3.17.2 + + + + + + kr.motd.maven + os-maven-plugin + 1.6.2 + + + + + org.xolstice.maven.plugins + protobuf-maven-plugin + 0.6.1 + + com.google.protobuf:protoc:3.17.2:exe:${os.detected.classifier} + grpc-java + io.grpc:protoc-gen-grpc-java:1.39.0:exe:${os.detected.classifier} + + + + + compile + compile-custom + + + + + + + + diff --git a/apps/dgiot_grpc/priv/example/java/exproto-svr-java/src/main/java/emqx/exproto/v1/ConnectionHandler.java b/apps/dgiot_grpc/priv/example/java/exproto-svr-java/src/main/java/emqx/exproto/v1/ConnectionHandler.java new file mode 100644 index 00000000..f3108969 --- /dev/null +++ b/apps/dgiot_grpc/priv/example/java/exproto-svr-java/src/main/java/emqx/exproto/v1/ConnectionHandler.java @@ -0,0 +1,174 @@ +package emqx.exproto.v1; + +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.stub.StreamObserver; + +import java.util.List; + +/** + * @author WangScaler + * @date 2021/7/15 18:20 + */ + +public class ConnectionHandler extends ConnectionHandlerGrpc.ConnectionHandlerImplBase { + private static final String HOST = "192.168.1.19:9100"; + static ManagedChannel channel; + + static { + System.out.println("[LOG] Build singleton channel"); + channel = ManagedChannelBuilder + .forTarget(HOST) + .usePlaintext() + .build(); + } + + @Override + public StreamObserver onSocketCreated(final StreamObserver responseObserver) { + return new StreamObserver() { + public void onNext(Exproto.SocketCreatedRequest request) { + ConnectionAdapterGrpc.ConnectionAdapterBlockingStub blockingStub = ConnectionAdapterGrpc.newBlockingStub(channel); + System.out.println("[LOG] client socket connection:" + request.getConninfo()); + Exproto.ClientInfo clientInfo = Exproto.ClientInfo.newBuilder() + .setClientid("test") + .setUsername("test") + .build(); + Exproto.AuthenticateRequest authenticateRequest = Exproto.AuthenticateRequest.newBuilder() + .setClientinfo(clientInfo) + .setConn(request.getConn()) + .setPassword("test") + .build(); + Exproto.CodeResponse response = blockingStub.authenticate(authenticateRequest); + System.out.println("[LOG] authenticate code: " + response.getCodeValue()); + Exproto.TimerRequest timerRequest = Exproto.TimerRequest.newBuilder() + .setConn(request.getConn()) + .setInterval(20) + .setType(Exproto.TimerType.KEEPALIVE) + .build(); + Exproto.CodeResponse timerResponse = blockingStub.startTimer(timerRequest); + System.out.println("[LOG] startTimer code: " + timerResponse.getCodeValue()); + Exproto.SubscribeRequest subscribeRequest = Exproto.SubscribeRequest.newBuilder() + .setConn(request.getConn()) + .setTopic("/test") + .setQos(0) + .build(); + Exproto.CodeResponse subscribeResponse = blockingStub.subscribe(subscribeRequest); + System.out.println("[LOG] subscribe code: " + subscribeResponse.getCodeValue()); + + } + + public void onError(Throwable throwable) { + System.out.println(" onSocketCreated error cause" + throwable.getCause()); + System.out.println(" onSocketCreated error message" + throwable.getMessage()); + } + + public void onCompleted() { + responseObserver.onNext(Exproto.EmptySuccess.getDefaultInstance()); + responseObserver.onCompleted(); + } + }; + } + + @Override + public StreamObserver onReceivedBytes(final StreamObserver responseObserver) { + return new StreamObserver() { + public void onNext(Exproto.ReceivedBytesRequest request) { + ConnectionAdapterGrpc.ConnectionAdapterBlockingStub blockingStub = ConnectionAdapterGrpc.newBlockingStub(channel); + System.out.println("[LOG] ReceivedBytesRequest:" + request.getConn()); + Exproto.PublishRequest publishRequest = Exproto.PublishRequest.newBuilder() + .setConn(request.getConn()) + .setTopic("/test1") + .setQos(0) + .setPayload(request.getBytes()).build(); + Exproto.CodeResponse response = blockingStub.publish(publishRequest); + System.out.println("[LOG] publish code: " + response.getCodeValue()); + } + + public void onError(Throwable throwable) { + System.out.println(" onReceivedBytes error cause" + throwable.getCause()); + System.out.println(" onReceivedBytes error message" + throwable.getMessage()); + } + + public void onCompleted() { + responseObserver.onNext(Exproto.EmptySuccess.getDefaultInstance()); + responseObserver.onCompleted(); + } + }; + } + + @Override + public StreamObserver onReceivedMessages(final StreamObserver responseObserver) { + return new StreamObserver() { + public void onNext(Exproto.ReceivedMessagesRequest receivedMessagesRequest) { + ConnectionAdapterGrpc.ConnectionAdapterBlockingStub blockingStub = ConnectionAdapterGrpc.newBlockingStub(channel); + System.out.println("[LOG] onReceivedMessages:" + receivedMessagesRequest.getConn()); + List messagesList = receivedMessagesRequest.getMessagesList(); + for (Exproto.Message message : messagesList) { + System.out.println("Message:" + message.getPayload()); + Exproto.SendBytesRequest sendBytesRequest = Exproto.SendBytesRequest.newBuilder() + .setConn(receivedMessagesRequest.getConn()) + .setBytes(message.getPayload()) + .build(); + Exproto.CodeResponse sendBytesResponse = blockingStub.send(sendBytesRequest); + System.out.println("[LOG] sendBytes code: " + sendBytesResponse.getCodeValue()); + + } + + } + + public void onError(Throwable throwable) { + System.out.println(" onReceivedMessages error cause" + throwable.getCause()); + System.out.println(" onReceivedMessages error message" + throwable.getMessage()); + } + + public void onCompleted() { + responseObserver.onNext(Exproto.EmptySuccess.getDefaultInstance()); + responseObserver.onCompleted(); + } + }; + } + + @Override + public StreamObserver onTimerTimeout(final StreamObserver responseObserver) { + return new StreamObserver() { + public void onNext(Exproto.TimerTimeoutRequest timerTimeoutRequest) { + ConnectionAdapterGrpc.ConnectionAdapterBlockingStub blockingStub = ConnectionAdapterGrpc.newBlockingStub(channel); + System.out.println("[LOG] onTimerTimeout"); + Exproto.CloseSocketRequest closeSocketRequest = Exproto.CloseSocketRequest.newBuilder() + .setConn(timerTimeoutRequest.getConn()) + .build(); + Exproto.CodeResponse closeResponse = blockingStub.close(closeSocketRequest); + System.out.println("[LOG] close code: " + closeResponse.getCodeValue()); + } + + public void onError(Throwable throwable) { + System.out.println(" onTimerTimeout error cause" + throwable.getCause()); + System.out.println(" onTimerTimeout error message" + throwable.getMessage()); + } + + public void onCompleted() { + responseObserver.onNext(Exproto.EmptySuccess.getDefaultInstance()); + responseObserver.onCompleted(); + } + }; + } + + @Override + public StreamObserver onSocketClosed(final StreamObserver responseObserver) { + return new StreamObserver() { + public void onNext(Exproto.SocketClosedRequest socketClosedRequest) { + System.out.println("[LOG] onSocketClosed:" + socketClosedRequest.toString()); + } + + public void onError(Throwable throwable) { + System.out.println(" onSocketClosed error cause" + throwable.getCause()); + System.out.println(" onSocketClosed error message" + throwable.getMessage()); + } + + public void onCompleted() { + responseObserver.onNext(Exproto.EmptySuccess.getDefaultInstance()); + responseObserver.onCompleted(); + } + }; + } +} diff --git a/apps/dgiot_grpc/priv/example/java/exproto-svr-java/src/main/java/emqx/exproto/v1/TestGrpcServer.java b/apps/dgiot_grpc/priv/example/java/exproto-svr-java/src/main/java/emqx/exproto/v1/TestGrpcServer.java new file mode 100644 index 00000000..770bb82f --- /dev/null +++ b/apps/dgiot_grpc/priv/example/java/exproto-svr-java/src/main/java/emqx/exproto/v1/TestGrpcServer.java @@ -0,0 +1,30 @@ +package emqx.exproto.v1; + +import io.grpc.Server; +import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import java.util.logging.Logger; + +/** + * @author WangScaler + * @date 2021/7/15 18:22 + */ + +public class TestGrpcServer { + private static final Logger logger = Logger.getLogger(TestGrpcServer.class.getName()); + + public static void main(String[] args) throws IOException, InterruptedException { + int port = 9001; + Server server = NettyServerBuilder + .forPort(9001) + .permitKeepAliveTime(2, TimeUnit.SECONDS) + .permitKeepAliveWithoutCalls(true) + .addService(new ConnectionHandler()) + .build() + .start(); + logger.info("Server started, listening on " + port); + server.awaitTermination(); + } +} diff --git a/apps/dgiot_grpc/priv/example/java/exproto-svr-java/src/main/proto/exproto.proto b/apps/dgiot_grpc/priv/example/java/exproto-svr-java/src/main/proto/exproto.proto new file mode 100644 index 00000000..bbc10073 --- /dev/null +++ b/apps/dgiot_grpc/priv/example/java/exproto-svr-java/src/main/proto/exproto.proto @@ -0,0 +1,259 @@ +//------------------------------------------------------------------------------ +// Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +//------------------------------------------------------------------------------ + +syntax = "proto3"; + +package emqx.exproto.v1; + +// The Broker side serivce. It provides a set of APIs to +// handle a protcol access +service ConnectionAdapter { + + // -- socket layer + + rpc Send(SendBytesRequest) returns (CodeResponse) {}; + + rpc Close(CloseSocketRequest) returns (CodeResponse) {}; + + // -- protocol layer + + rpc Authenticate(AuthenticateRequest) returns (CodeResponse) {}; + + rpc StartTimer(TimerRequest) returns (CodeResponse) {}; + + // -- pub/sub layer + + rpc Publish(PublishRequest) returns (CodeResponse) {}; + + rpc Subscribe(SubscribeRequest) returns (CodeResponse) {}; + + rpc Unsubscribe(UnsubscribeRequest) returns (CodeResponse) {}; +} + +service ConnectionHandler { + + // -- socket layer + + rpc OnSocketCreated(stream SocketCreatedRequest) returns (EmptySuccess) {}; + + rpc OnSocketClosed(stream SocketClosedRequest) returns (EmptySuccess) {}; + + rpc OnReceivedBytes(stream ReceivedBytesRequest) returns (EmptySuccess) {}; + + // -- pub/sub layer + + rpc OnTimerTimeout(stream TimerTimeoutRequest) returns (EmptySuccess) {}; + + rpc OnReceivedMessages(stream ReceivedMessagesRequest) returns (EmptySuccess) {}; +} + +message EmptySuccess { } + +enum ResultCode { + + // Operation successfully + SUCCESS = 0; + + // Unknown Error + UNKNOWN = 1; + + // Connection process is not alive + CONN_PROCESS_NOT_ALIVE = 2; + + // Miss the required parameter + REQUIRED_PARAMS_MISSED = 3; + + // Params type or values incorrect + PARAMS_TYPE_ERROR = 4; + + // No permission or Pre-conditions not fulfilled + PERMISSION_DENY = 5; +} + +message CodeResponse { + + ResultCode code = 1; + + // The reason message if result is false + string message = 2; +} + +message SendBytesRequest { + + string conn = 1; + + bytes bytes = 2; +} + +message CloseSocketRequest { + + string conn = 1; +} + +message AuthenticateRequest { + + string conn = 1; + + ClientInfo clientinfo = 2; + + string password = 3; +} + +message TimerRequest { + + string conn = 1; + + TimerType type = 2; + + uint32 interval = 3; +} + +enum TimerType { + + KEEPALIVE = 0; +} + +message PublishRequest { + + string conn = 1; + + string topic = 2; + + uint32 qos = 3; + + bytes payload = 4; +} + +message SubscribeRequest { + + string conn = 1; + + string topic = 2; + + uint32 qos = 3; +} + +message UnsubscribeRequest { + + string conn = 1; + + string topic = 2; +} + +message SocketCreatedRequest { + + string conn = 1; + + ConnInfo conninfo = 2; +} + +message ReceivedBytesRequest { + + string conn = 1; + + bytes bytes = 2; +} + +message TimerTimeoutRequest { + + string conn = 1; + + TimerType type = 2; +} + +message SocketClosedRequest { + + string conn = 1; + + string reason = 2; +} + +message ReceivedMessagesRequest { + + string conn = 1; + + repeated Message messages = 2; +} + +//-------------------------------------------------------------------- +// Basic data types +//-------------------------------------------------------------------- + +message ConnInfo { + + SocketType socktype = 1; + + Address peername = 2; + + Address sockname = 3; + + CertificateInfo peercert = 4; +} + +enum SocketType { + + TCP = 0; + + SSL = 1; + + UDP = 2; + + DTLS = 3; +} + +message Address { + + string host = 1; + + uint32 port = 2; +} + +message CertificateInfo { + + string cn = 1; + + string dn = 2; +} + +message ClientInfo { + + string proto_name = 1; + + string proto_ver = 2; + + string clientid = 3; + + string username = 4; + + string mountpoint = 5; +} + +message Message { + + string node = 1; + + string id = 2; + + uint32 qos = 3; + + string from = 4; + + string topic = 5; + + bytes payload = 6; + + uint64 timestamp = 7; +} diff --git a/apps/dgiot_grpc/priv/example/java/exproto-svr-java/static/image-20210716100715536.png b/apps/dgiot_grpc/priv/example/java/exproto-svr-java/static/image-20210716100715536.png new file mode 100644 index 00000000..3d8c0330 Binary files /dev/null and b/apps/dgiot_grpc/priv/example/java/exproto-svr-java/static/image-20210716100715536.png differ diff --git a/apps/dgiot_grpc/priv/example/java/exproto-svr-java/static/image-20210716105416212.png b/apps/dgiot_grpc/priv/example/java/exproto-svr-java/static/image-20210716105416212.png new file mode 100644 index 00000000..0b386e20 Binary files /dev/null and b/apps/dgiot_grpc/priv/example/java/exproto-svr-java/static/image-20210716105416212.png differ diff --git a/apps/dgiot_grpc/priv/example/java/exproto-svr-java/static/image-20210716105448535.png b/apps/dgiot_grpc/priv/example/java/exproto-svr-java/static/image-20210716105448535.png new file mode 100644 index 00000000..e6537207 Binary files /dev/null and b/apps/dgiot_grpc/priv/example/java/exproto-svr-java/static/image-20210716105448535.png differ diff --git a/apps/dgiot_grpc/priv/example/java/exproto-svr-java/static/image-20210716105710015.png b/apps/dgiot_grpc/priv/example/java/exproto-svr-java/static/image-20210716105710015.png new file mode 100644 index 00000000..fb663555 Binary files /dev/null and b/apps/dgiot_grpc/priv/example/java/exproto-svr-java/static/image-20210716105710015.png differ diff --git a/apps/dgiot_grpc/priv/example/java/exproto-svr-java/static/image-20210716110738053.png b/apps/dgiot_grpc/priv/example/java/exproto-svr-java/static/image-20210716110738053.png new file mode 100644 index 00000000..7034249f Binary files /dev/null and b/apps/dgiot_grpc/priv/example/java/exproto-svr-java/static/image-20210716110738053.png differ diff --git a/apps/dgiot_grpc/priv/example/java/exproto-svr-java/static/image-20210716111229969.png b/apps/dgiot_grpc/priv/example/java/exproto-svr-java/static/image-20210716111229969.png new file mode 100644 index 00000000..7588fce0 Binary files /dev/null and b/apps/dgiot_grpc/priv/example/java/exproto-svr-java/static/image-20210716111229969.png differ diff --git a/apps/dgiot_grpc/priv/example/python3/README.md b/apps/dgiot_grpc/priv/example/python3/README.md new file mode 100644 index 00000000..b644d8e9 --- /dev/null +++ b/apps/dgiot_grpc/priv/example/python3/README.md @@ -0,0 +1,153 @@ +# emqx-exproto-python-sdk + +The Python SDK for emqx-exproto + +## Installation + +There are the following ways to install this library + +#### Install by Pypi + +```bash +pip3 install emqx-exproto-sdk +``` + +#### Install from source code + +```bash +git clone https://github.com/emqx/emqx-exproto-python-sdk.git + +cd emqx-exproto-python-sdk + +python3 setup.py install +``` + +## Get Started + +- First of all, follow the step Installation to install dependencies +- Create your Python project + +## Deploy + +After compiled all source codes, you should deploy the sdk and your source files into emqx: + +1. Ensure the emqx hosted machine has installed the `emqx-exproto-python-sdk`. +2. Copy your source code files. E.g: copy `example/demo.py` to `emqx/data/extension` directory. +3. Modify the `emqx/etc/plugins/emqx_exproto.conf` file. e.g: + + ```properties + exproto.listener.protoname = tcp://0.0.0.0:7993 + exproto.listener.protoname.driver = python + exproto.listener.protoname.driver_search_path = data/extension + exproto.listener.protoname.driver_callback_module = demo + ``` +4. Execute `bin/emqx console` to start emqx and load the `emqx_exproto` plugin. + +Use `telnet 127.0.0.1 7993` to establish a TCP connection and observe the console output. + +## Interface + +### Callbacks + +**Connection Layer callbacks** (The Connection object represents a TCP/UDP Socket entity): + +```python +// This function will be scheduled after a TCP connection established to EMQ X +// or receive a new UDP socket. +on_connect(connection: Connection, connection_info: ConnectionInfo) + +// This callback will be scheduled when a connection received bytes from TCP/UDP socket. +on_received(connection: Connection, data: bytes, state: any) + +// This function will be scheduled after a connection terminated. +// +// It indicates that the EMQ X process that maintains the TCP/UDP socket +// has been closed. E.g: a TCP connection is closed, or a UDP socket has +// exceeded maintenance hours. +on_terminated(connection: Connection, reason: str, state: any) +``` + +**Pub/Sub Layer callbacks:** + +```python +// This function will be scheduled when a connection received a Message from EMQ X +// +// When a connection is subscribed to a topic and a message arrives on that topic, +// EMQ X will deliver the message to that connection. At that time, this function +// is triggered. +on_deliver(connection: Connection, message_list: list) +``` + +### APIs + +Similarly, AbstractExprotoHandler also provides a set of APIs to facilitate the use of the `emqx-exproto` APIs. + +**Connection Layer APIs:** + +```python +// Send a stream of bytes to the connection. These bytes are delivered directly +// to the associated TCP/UDP socket. +send(connection: Connection, data: bytes) + +// Terminate the connection process and TCP/UDP socket. +terminate(connection: Connection) +``` + +**Pub/Sub Layer APIs:** + +```python +// Register the connection as a Client of EMQ X. This `clientInfo` contains the +// necessary field information to be an EMQ X client. +// +// This method should normally be invoked after confirming that a connection is +// allowed to access the EMQ X system. For example: after the connection packet +// has been parsed and authenticated successfully. +register(connection: Connection, client_info: ClientInfo) + +// The connection Publish a Message to EMQ X +publish(connection: Connection, message: Message) + +// The connection Subscribe a Topic to EMQ X +subscribe(connection: Connection, topic: str, qos: int) +``` + +## Example + +```python +from emqx.exproto.core import * +from emqx.exproto.abstract_handler import AbstractExProtoHandler +from emqx.exproto.connection import Connection, ConnectionInfo + +import emqx.exproto.driver as driver + +class SdkDemo(AbstractExProtoHandler): + # In this class, you need to implement abstract AbstractExProtoHandler + + def on_connect(self, connection: Connection, connection_info: ConnectionInfo): + print(connection) + print(connection_info) + + def on_received(self, connection: Connection, data: bytes, state: any): + print(connection) + print(data) + + def on_terminated(self, connection: Connection, reason: str, state: any): + print(connection) + print(reason) + + def on_deliver(self, connection: Connection, message_list: list): + print(connection) + for message in message_list: + print(message) + +# set exproto_driver +driver.exproto_driver = SdkDemo() +``` + +## License + +Apache License v2 + +## Author + +- [Adek06](https://github.com/Adek06) diff --git a/apps/dgiot_grpc/priv/example/python3/demo.py b/apps/dgiot_grpc/priv/example/python3/demo.py new file mode 100644 index 00000000..7bc200b8 --- /dev/null +++ b/apps/dgiot_grpc/priv/example/python3/demo.py @@ -0,0 +1,38 @@ +# Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from emqx.exproto.core import * +from emqx.exproto.abstract_handler import AbstractExProtoHandler +from emqx.exproto.connection import Connection, ConnectionInfo + +class SdkDemo(AbstractExProtoHandler): + def on_connect(self, connection: Connection, connection_info: ConnectionInfo): + print(connection) + print(connection_info) + self.subscribe(connection, b"t/dn", 0) + + def on_received(self, connection: Connection, data: bytes, state: any): + print(connection) + self.send(connection, data) + + def on_terminated(self, connection: Connection, reason: str, state: any): + print(connection) + print(reason) + + def on_deliver(self, connection: Connection, message_list: list): + print(connection) + for message in message_list: + print(message) + +driver.exproto_driver = SdkDemo() diff --git a/apps/emqx_exhook/rebar.config b/apps/emqx_exhook/rebar.config index 8e47d58e..9c0a9825 100644 --- a/apps/emqx_exhook/rebar.config +++ b/apps/emqx_exhook/rebar.config @@ -5,7 +5,7 @@ ]}. {deps, - [{grpc, {git, "https://gitee.com/fastdgiot/grpc-erl", {tag, "0.6.2"}}} + [{grpc, {git, "https://gitee.com/fastdgiot/grpc-erl", {tag, "0.6.3"}}} ]}. {grpc, diff --git a/apps/emqx_exproto/etc/emqx_exproto.conf b/apps/emqx_exproto/etc/emqx_exproto.conf index 71368573..6a7401cc 100644 --- a/apps/emqx_exproto/etc/emqx_exproto.conf +++ b/apps/emqx_exproto/etc/emqx_exproto.conf @@ -54,7 +54,7 @@ exproto.listener.protoname.active_n = 100 ## Idle timeout ## ## Value: Duration -exproto.listener.protoname.idle_timeout = 30s +exproto.listener.protoname.idle_timeout = 180s ## The access control rules for the MQTT/TCP listener. ## diff --git a/apps/emqx_exproto/rebar.config b/apps/emqx_exproto/rebar.config index 543d900a..dfdb6ee4 100644 --- a/apps/emqx_exproto/rebar.config +++ b/apps/emqx_exproto/rebar.config @@ -13,7 +13,7 @@ ]}. {deps, - [{grpc, {git, "https://gitee.com/fastdgiot/grpc-erl", {tag, "0.6.2"}}} + [{grpc, {git, "https://gitee.com/fastdgiot/grpc-erl", {tag, "0.6.3"}}} ]}. {grpc, diff --git a/rebar.config b/rebar.config index 0274ecb4..38b7f97a 100644 --- a/rebar.config +++ b/rebar.config @@ -67,7 +67,7 @@ , {observer_cli, "1.6.1"} % NOTE: depends on recon 2.5.1 , {ejdbc, {git, "https://gitee.com/fastdgiot/ejdbc", {tag, "1.0.1"}}} , {snabbkaffe, {git, "https://gitee.com/fastdgiot/snabbkaffe.git", {tag, "0.12.2"}}} - , {ibrowse, {git, "https://github.com.cnpmjs.org/fastdgiot/ibrowse.git", {tag, "v4.4.2"}}} + , {ibrowse, {git, "https://gitee.com/fastdgiot/ibrowse.git", {tag, "v4.4.2"}}} ]}. {xref_ignores,