Merge pull request #133 from nutzam/add_mqtt_client

add: starter-mqtt-client 及其demo
This commit is contained in:
Wendal Chen 2018-05-02 17:30:25 -07:00 committed by GitHub
commit e4338b43ee
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 422 additions and 38 deletions

View File

@ -1,5 +1,11 @@
# NB进化史
# dev 迭代中
* 变更:
* add: jetty支持任意状态码和任意异常类型的错误页面设置 by 蛋蛋 and wendal
* add: 添加starter-mqtt-client by wendal
# 2.1.6 "A Million Dreams"
增强对新版dubbo的集成,以协助nutzwk的nutzboot-dubbo分支的线上部署.

View File

@ -127,6 +127,7 @@ public class MainLauncher {
* [NutzBoot目录约定](doc/struct.md)
* [NB与Nutz.Mvc对比](doc/diff_nb_mvc.md)
* [转换为NB项目](doc/convert2nb.md)
* [Maven Plugin](https://github.com/nutzam/nutzboot-maven-plugin)
## 开发进度
@ -134,7 +135,7 @@ public class MainLauncher {
- 基础框架
- [x] 基础框架的文档
- [x] starter-core 核心框架的实现
- [x] nutzboot-core 核心框架的实现
- 嵌入式web容器
- [x] starter-[jetty](https://www.eclipse.org/jetty/)
- [x] starter-[undertow](http://undertow.io/) by [@qinerg](https://github.com/qinerg)
@ -210,6 +211,8 @@ public class MainLauncher {
- 微信公众号开放平台
- [x] starter-[nutzwx](https://github.com/nutzam/nutzwx) Weixin Api By Nutz
- [ ] [weixin-java-tools](https://gitee.com/binary/weixin-java-tools)
- 物联网(IoT)
- [x] starter-[mqtt-client](https://github.com/eclipse/paho.mqtt.java) 消息队列遥测传输, IoT 通信的标准
- 云平台
- [ ] [阿里云](https://aliyun.com)
- [ ] [腾讯云](https://qcloud.com)

View File

@ -197,43 +197,11 @@
<additionalparam>-Xdoclint:none</additionalparam>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>3.0.2</version>
<configuration>
<archive>
<manifest>
<addDefaultImplementationEntries>true</addDefaultImplementationEntries>
<mainClass>[[${params.packageName}]].MainLauncher</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/nutz/org.nutz.boot.starter.NbStarter</resource>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>[[${params.packageName}]].MainLauncher</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.nutz.boot</groupId>
<artifactId>nutzboot-maven-plugin</artifactId>
<version>${nutzboot.version}</version>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,22 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>nutzboot-demo-simple</artifactId>
<groupId>org.nutz</groupId>
<version>2.2-SNAPSHOT</version>
</parent>
<artifactId>nutzboot-demo-simple-mqtt-client</artifactId>
<dependencies>
<dependency>
<groupId>org.nutz</groupId>
<artifactId>nutzboot-starter-mqtt-client</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,62 @@
package io.nutz.demo.simple;
import java.util.concurrent.TimeUnit;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.nutz.boot.NbApp;
import org.nutz.ioc.loader.annotation.Inject;
import org.nutz.ioc.loader.annotation.IocBean;
import org.nutz.json.Json;
import org.nutz.lang.Tasks;
import org.nutz.lang.util.NutMap;
import org.nutz.log.Log;
import org.nutz.log.Logs;
@IocBean(create="init")
public class MainLauncher {
private static final Log log = Logs.get();
@Inject
MqttClient mqttClient;
@Inject
MqttAsyncClient mqttAsyncClient;
public void init() throws MqttException {
// 这里为了方便演示,订阅发布都是同一个topic
String topic = "nutzboot-mqtt-demo";
// 启动一个订阅者
Tasks.getTaskScheduler().schedule(()->{
try {
mqttClient.subscribe(topic, (_topic, msg)->{
log.debugf("revc topic=%s msg=%s", _topic, msg.toString());
});
}
catch (MqttException e) {
log.debug("FUCK", e);
}
}, 1, TimeUnit.MILLISECONDS);
// 定时发布一个消息
Tasks.getTaskScheduler().scheduleAtFixedRate(()->{
try {
NutMap msg = new NutMap();
msg.setv("sender", mqttClient.getClientId());
msg.setv("time", System.currentTimeMillis());
mqttClient.publish(topic, new MqttMessage(Json.toJson(msg).getBytes()));
}
catch (MqttException e) {
log.debug("FUCK", e);
}
}, 2, 5, TimeUnit.SECONDS);
}
// 请启动2次!!
public static void main(String[] args) throws Exception {
new NbApp().setPrintProcDoc(true).run();
}
}

View File

@ -0,0 +1,33 @@
package io.nutz.demo.simple.mqtt;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.nutz.ioc.loader.annotation.IocBean;
import org.nutz.lang.Strings;
import org.nutz.log.Log;
import org.nutz.log.Logs;
@IocBean(name="mqttCallback")
public class MqttCallbackImpl implements MqttCallback {
private static final Log log = Logs.get();
@Override
public void connectionLost(Throwable cause) {
log.info("!!!!FUCK!!!!", cause);
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
log.debugf("topic=%s, msg id=%s, qos=%s, len=%s", topic, message.getId(), message.getQos(), message.getPayload().length);
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
if (!log.isDebugEnabled())
return;
log.debugf("token topics=[%s]", Strings.join(",", token.getTopics()));
}
}

View File

@ -0,0 +1,4 @@
server.port=8081
server.host=0.0.0.0
mqtt.client.url=tcp://iot.eclipse.org:1883

View File

@ -0,0 +1,7 @@
log4j.rootLogger=debug,Console
log4j.logger.org.eclipse.jetty=info
log4j.appender.Console=org.apache.log4j.ConsoleAppender
log4j.appender.Console.layout=org.apache.log4j.PatternLayout
log4j.appender.Console.layout.ConversionPattern=[%-5p] %d{HH:mm:ss.SSS} %l - %m%n

View File

@ -0,0 +1,10 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>404</title>
</head>
<body>
this is 404 page!
</body>
</html>

View File

@ -0,0 +1,10 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>500</title>
</head>
<body>
this is 500 page!
</body>
</html>

View File

@ -0,0 +1,10 @@
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>Hello, So NB!</title>
</head>
<body>
Hello, So NB!(Jetty)
</body>
</html>

View File

@ -46,6 +46,7 @@
<module>nutzboot-demo-simple-freemarker</module>
<module>nutzboot-demo-simple-tio-mvc</module>
<module>nutzboot-demo-simple-web3j</module>
<module>nutzboot-demo-simple-mqtt-client</module>
</modules>
<dependencies>

View File

@ -0,0 +1,60 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.nutz</groupId>
<artifactId>nutzboot-starter</artifactId>
<version>2.2-SNAPSHOT</version>
</parent>
<artifactId>nutzboot-starter-mqtt-client</artifactId>
<packaging>jar</packaging>
<name>nutzboot-starter-mqtt-client</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<description>NutzBoot, micoservice base on Nutz</description>
<url>http://nutzam.com</url>
<issueManagement>
<system>Github Issue</system>
<url>http://github.com/nutzam/nutzboot/issues</url>
</issueManagement>
<licenses>
<license>
<name>The Apache Software License, Version 2.0</name>
<url>http://apache.org/licenses/LICENSE-2.0.txt</url>
</license>
</licenses>
<developers>
<developer>
<id>wendal</id>
<name>Wendal Chen</name>
<email>wendal1985@gmail.com</email>
<url>http://wendal.net/</url>
</developer>
</developers>
<scm>
<connection>scm:git:git://github.com/nutzam/nutzboot.git</connection>
<developerConnection>scm:git:git://github.com/nutzam/nutzboot.git</developerConnection>
<url>git://github.com/nutzam/nutzboot.git</url>
</scm>
<distributionManagement>
<snapshotRepository>
<id>nutzcn-snapshots</id>
<name>NutzCN snapshot repository</name>
<url>https://jfrog.nutz.cn/artifactory/snapshots</url>
</snapshotRepository>
<repository>
<id>sonatype-release-staging</id>
<name>Sonatype Nexus release repository</name>
<url>https://oss.sonatype.org/service/local/staging/deploy/maven2</url>
</repository>
</distributionManagement>
<dependencies>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,175 @@
package org.nutz.boot.starter.mqtt.client;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
import org.nutz.boot.annotation.PropDoc;
import org.nutz.ioc.Ioc;
import org.nutz.ioc.impl.PropertiesProxy;
import org.nutz.ioc.loader.annotation.Inject;
import org.nutz.ioc.loader.annotation.IocBean;
import org.nutz.lang.Strings;
import org.nutz.log.Log;
import org.nutz.log.Logs;
/**
* 封装Eclipse Paho项目的Java客户端, 它实现了Mqtt 3.1.1/3.1协议, 兼容实现了该协议的服务器端,包括paho和emqtt
* @author wendal(wendal1985@gmail.com)
*
*/
@IocBean
public class MqttClientStarter {
private static final Log log = Logs.get();
/**
* 这是客户端,暂定前缀
*/
protected static final String PRE = "mqtt.client.";
@PropDoc(value = "服务器地址", defaultValue = "tcp://127.0.0.1:1883")
public static final String PROP_URL = PRE + "url";
@PropDoc(value = "客户端id", defaultValue = "MqttClient.generateClientId()")
public static final String PROP_CLIENT_ID = PRE + "clientId";
@PropDoc(value = "同步客户端的最大等待时间", defaultValue = "-1")
public static final String PROP_TIME_TO_WAIT = PRE + "timeToWait";
@PropDoc(value = "启动时自动连接", defaultValue = "true")
public static final String PROP_CONNECT_ON_START = PRE + "connectOnStart";
@PropDoc(value = "自动重连", defaultValue = "true")
public static final String PROP_OPTIONS_AUTOMATIC_RECONNECT = PRE + "options.automaticReconnect";
@PropDoc(value = "心跳频率,单位秒", defaultValue = "60")
public static final String PROP_OPTIONS_KEEP_ALIVE_INTERVAL = PRE + "options.keepAliveInterval";
@PropDoc(value = "Will消息的topic")
public static final String PROP_OPTIONS_WILL_TOPIC = PRE + "options.will.topic";
@PropDoc(value = "Will消息的内容")
public static final String PROP_OPTIONS_WILL_PAYLOAD = PRE + "options.will.payload";
@PropDoc(value = "Will消息的QOS", defaultValue = "2")
public static final String PROP_OPTIONS_WILL_QOS = PRE + "options.will.qos";
@PropDoc(value = "Will消息是否retained", defaultValue = "true")
public static final String PROP_OPTIONS_WILL_RETAINED = PRE + "options.will.retained";
@PropDoc(value = "用户名")
public static final String PROP_OPTIONS_USERNAME = PRE + "options.username";
@PropDoc(value = "密码")
public static final String PROP_OPTIONS_PASSWORD = PRE + "options.password";
@PropDoc(value = "清除session", defaultValue = "true")
public static final String PROP_OPTIONS_CLEAN_SESSION = PRE + "options.cleanSession";
@PropDoc(value = "连接超时设置", defaultValue = "30")
public static final String PROP_OPTIONS_CONNECTION_TIMEOUT = PRE + "options.connectionTimeout";
@PropDoc(value = "多服务器地址设置")
public static final String PROP_OPTIONS_URLS = PRE + "options.urls";
// TODO SSL相关的配置
@PropDoc(value = "持久化方式", defaultValue = "memory", possible = {"memory", "file"})
public static final String PROP_PERSISTENCE_TYPE = PRE + "persistence.type";
@PropDoc(value = "文件持久化时的目录", defaultValue = "用户主目录")
public static final String PROP_PERSISTENCE_PATH = PRE + "persistence.path";
@Inject
protected PropertiesProxy conf;
@Inject("refer:$ioc")
protected Ioc ioc;
/**
* MqttClient的主要配置来至于MqttConnectOptions
*/
@IocBean(name = "mqttConnectOptions")
public MqttConnectOptions createMqttConnectOptions() {
MqttConnectOptions options = new MqttConnectOptions();
options.setAutomaticReconnect(conf.getBoolean(PROP_OPTIONS_AUTOMATIC_RECONNECT, true));
options.setKeepAliveInterval(conf.getInt(PROP_OPTIONS_KEEP_ALIVE_INTERVAL, 60));
if (!Strings.isBlank(conf.get(PROP_OPTIONS_WILL_TOPIC))) {
options.setWill(conf.get(PROP_OPTIONS_WILL_TOPIC),
conf.get(PROP_OPTIONS_WILL_PAYLOAD).getBytes(),
conf.getInt(PROP_OPTIONS_WILL_QOS, 2),
conf.getBoolean(PROP_OPTIONS_WILL_RETAINED, true));
}
// 用户信息不一定存在,也不一定需要,所以需要判断一下是否真的要设置
if (!Strings.isBlank(conf.get(PROP_OPTIONS_USERNAME))) {
options.setUserName(conf.get(PROP_OPTIONS_USERNAME));
}
if (!Strings.isBlank(conf.get(PROP_OPTIONS_PASSWORD))) {
options.setPassword(conf.get(PROP_OPTIONS_PASSWORD).toCharArray());
}
options.setCleanSession(conf.getBoolean(PROP_OPTIONS_CLEAN_SESSION, MqttConnectOptions.CLEAN_SESSION_DEFAULT));
options.setConnectionTimeout(conf.getInt(PROP_OPTIONS_CONNECTION_TIMEOUT, MqttConnectOptions.CONNECTION_TIMEOUT_DEFAULT));
// 事实上, urls的优先级是高于url的
if (!Strings.isBlank(conf.get(PROP_OPTIONS_URLS))) {
options.setServerURIs(Strings.splitIgnoreBlank(conf.get(PROP_OPTIONS_URLS), " "));
}
// TODO 完成SSL相关的配置
return options;
}
/**
* mqtt持久化策略, 默认行为是存到内存
*/
@IocBean(name = "mqttClientPersistence")
public MqttClientPersistence createMqttClientPersistence() {
switch (conf.get(PROP_PERSISTENCE_TYPE, "memory")) {
case "file":
return new MqttDefaultFilePersistence(conf.get(PROP_PERSISTENCE_PATH, System.getProperty("user.dir")));
default:
return new MemoryPersistence();
}
}
/**
* 同步阻塞客户端, 然后它不支持通过MqttAsyncClient来构建,真蛋疼
*/
@IocBean(name = "mqttClient", depose = "close")
public MqttClient createMqttClient(@Inject MqttConnectOptions mqttConnectOptions, @Inject MqttClientPersistence mqttClientPersistence) throws MqttException {
String clientId = conf.get(PROP_CLIENT_ID);
if (Strings.isBlank(clientId)) {
clientId = MqttClient.generateClientId();
}
log.info("Client Id = " + clientId);
MqttClient client = new MqttClient(conf.get(PROP_URL, "tcp://127.0.0.1:1883"), clientId, mqttClientPersistence);
if (ioc.has("mqttCallback")) {
client.setCallback(ioc.get(MqttCallback.class, "mqttCallback"));
}
client.setTimeToWait(conf.getLong(PROP_TIME_TO_WAIT, -1));
if (conf.getBoolean(PROP_CONNECT_ON_START, true)) {
IMqttToken token = client.connectWithResult(mqttConnectOptions);
if (token.getException() != null)
throw token.getException();
}
return client;
}
/**
* 异步客户端
*/
@IocBean(name = "mqttAsyncClient", depose = "close")
public MqttAsyncClient createMqttAsyncClient(@Inject MqttConnectOptions mqttConnectOptions, @Inject MqttClientPersistence mqttClientPersistence) throws MqttException {
String clientId = conf.get(PROP_CLIENT_ID);
if (Strings.isBlank(clientId)) {
clientId = MqttClient.generateClientId();
}
log.info("Client Id = " + clientId);
MqttAsyncClient client = new MqttAsyncClient(conf.get(PROP_URL, "tcp://127.0.0.1:1883"), clientId, mqttClientPersistence);
if (ioc.has("mqttCallback")) {
client.setCallback(ioc.get(MqttCallback.class, "mqttCallback"));
}
if (conf.getBoolean(PROP_CONNECT_ON_START, true)) {
IMqttToken token = client.connect(mqttConnectOptions, null, null);
token.waitForCompletion(conf.getLong(PROP_TIME_TO_WAIT, -1));
if (token.getException() != null)
throw token.getException();
}
return client;
}
}

View File

@ -0,0 +1 @@
org.nutz.boot.starter.mqtt.client.MqttClientStarter

View File

@ -60,6 +60,7 @@
<module>nutzboot-starter-tio-mvc</module>
<module>nutzboot-starter-web3j</module>
<module>nutzboot-starter-test-junit4</module>
<module>nutzboot-starter-mqtt-client</module>
</modules>
<dependencies>
<dependency>

11
pom.xml
View File

@ -28,6 +28,7 @@
<feign.version>9.5.1</feign.version>
<hystrix.vesion>1.5.12</hystrix.vesion>
<web3j.vesion>3.2.0</web3j.vesion>
<mqtt.version>1.2.0</mqtt.version>
</properties>
<description>NutzBoot, micoservice base on Nutz</description>
@ -482,6 +483,11 @@
<artifactId>nutzboot-starter-nutz-mvc</artifactId>
<version>${nutzboot.version}</version>
</dependency>
<dependency>
<groupId>org.nutz</groupId>
<artifactId>nutzboot-starter-mqtt-client</artifactId>
<version>${nutzboot.version}</version>
</dependency>
<dependency>
<groupId>org.nutz</groupId>
<artifactId>nutzboot-starter-tio</artifactId>
@ -826,6 +832,11 @@
<artifactId>nutzboot-starter-freemarker</artifactId>
<version>${nutzboot.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>${mqtt.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
<repositories>