|
@@ -5,56 +5,86 @@ import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
|
|
import org.eclipse.paho.client.mqttv3.MqttException;
|
|
import org.eclipse.paho.client.mqttv3.MqttException;
|
|
import org.eclipse.paho.client.mqttv3.MqttTopic;
|
|
import org.eclipse.paho.client.mqttv3.MqttTopic;
|
|
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
|
|
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
|
|
|
|
+import org.springframework.beans.factory.annotation.Value;
|
|
|
|
+import org.springframework.context.annotation.Bean;
|
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
|
|
|
import java.util.concurrent.ScheduledExecutorService;
|
|
import java.util.concurrent.ScheduledExecutorService;
|
|
|
|
|
|
/**
|
|
/**
|
|
* 订阅客户端
|
|
* 订阅客户端
|
|
|
|
+ *
|
|
* @author Administrator
|
|
* @author Administrator
|
|
*/
|
|
*/
|
|
public class SubscribeClient {
|
|
public class SubscribeClient {
|
|
- //mqtt服务器ip
|
|
|
|
- public static final String HOST = "tcp://124.70.58.209:1883";
|
|
|
|
- //主题
|
|
|
|
- public static final String TOPIC1 = "heart";
|
|
|
|
-// public static final String TOPIC1 = "heart_DEVICESN";
|
|
|
|
- //mqtt 客户机ID
|
|
|
|
- private String clientid = "client";
|
|
|
|
- private MqttClient client;
|
|
|
|
- private MqttConnectOptions options;
|
|
|
|
- private String userName = "guest";
|
|
|
|
- private String passWord = "guest";
|
|
|
|
|
|
+ /**
|
|
|
|
+ * mqtt服务器ip
|
|
|
|
+ */
|
|
|
|
+ public static final String HOST="tcp://124.70.58.209:1883";
|
|
|
|
+ /**
|
|
|
|
+ * 主题
|
|
|
|
+ */
|
|
|
|
+ public String topic;
|
|
|
|
+ /**
|
|
|
|
+ * mqtt 客户机ID
|
|
|
|
+ */
|
|
|
|
+ private final String clientId;
|
|
|
|
+ private static final String USER_NAME = "guest";
|
|
|
|
+ private static final String PASS_WORD = "guest";
|
|
@SuppressWarnings("unused")
|
|
@SuppressWarnings("unused")
|
|
private ScheduledExecutorService scheduler;
|
|
private ScheduledExecutorService scheduler;
|
|
- public SubscribeClient(String clientid){
|
|
|
|
- this.clientid = clientid;
|
|
|
|
|
|
+
|
|
|
|
+ public SubscribeClient(String clientId, String topic) {
|
|
|
|
+ this.clientId = clientId;
|
|
|
|
+ this.topic = topic;
|
|
}
|
|
}
|
|
|
|
+
|
|
public void start() {
|
|
public void start() {
|
|
try {
|
|
try {
|
|
- // host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
|
|
|
|
- client = new MqttClient(HOST, clientid, new MemoryPersistence());
|
|
|
|
- // MQTT的连接设置
|
|
|
|
- options = new MqttConnectOptions();
|
|
|
|
- // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,设置为true表示每次连接到服务器都以新的身份连接
|
|
|
|
|
|
+ /**
|
|
|
|
+ * host为主机名,clientId即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientId的保存形式,默认为以内存保存
|
|
|
|
+ */
|
|
|
|
+ MqttClient client = new MqttClient(HOST, this.clientId, new MemoryPersistence());
|
|
|
|
+ /**
|
|
|
|
+ * MQTT的连接设置
|
|
|
|
+ */
|
|
|
|
+ MqttConnectOptions options = new MqttConnectOptions();
|
|
|
|
+ /**
|
|
|
|
+ * 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,设置为true表示每次连接到服务器都以新的身份连接
|
|
|
|
+ */
|
|
options.setCleanSession(true);
|
|
options.setCleanSession(true);
|
|
- // 设置连接的用户名
|
|
|
|
- options.setUserName(userName);
|
|
|
|
- // 设置连接的密码
|
|
|
|
- options.setPassword(passWord.toCharArray());
|
|
|
|
- // 设置超时时间 单位为秒
|
|
|
|
|
|
+ /**
|
|
|
|
+ * 设置连接的用户名
|
|
|
|
+ */
|
|
|
|
+ options.setUserName(USER_NAME);
|
|
|
|
+ /**
|
|
|
|
+ * 设置连接的密码
|
|
|
|
+ */
|
|
|
|
+ options.setPassword(PASS_WORD.toCharArray());
|
|
|
|
+ /**
|
|
|
|
+ * 设置超时时间 单位为秒
|
|
|
|
+ */
|
|
options.setConnectionTimeout(10);
|
|
options.setConnectionTimeout(10);
|
|
- // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
|
|
|
|
|
|
+ /**
|
|
|
|
+ * 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
|
|
|
|
+ */
|
|
options.setKeepAliveInterval(20);
|
|
options.setKeepAliveInterval(20);
|
|
- // 设置回调
|
|
|
|
|
|
+ /**
|
|
|
|
+ * 设置回调
|
|
|
|
+ */
|
|
client.setCallback(new PublishCallback());
|
|
client.setCallback(new PublishCallback());
|
|
- MqttTopic topic = client.getTopic(TOPIC1);
|
|
|
|
- //setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息
|
|
|
|
- // 遗嘱
|
|
|
|
|
|
+ MqttTopic topic = client.getTopic(this.topic);
|
|
|
|
+ /**
|
|
|
|
+ * setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息
|
|
|
|
+ * 遗嘱
|
|
|
|
+ */
|
|
options.setWill(topic, "close".getBytes(), 2, true);
|
|
options.setWill(topic, "close".getBytes(), 2, true);
|
|
client.connect(options);
|
|
client.connect(options);
|
|
- //订阅消息
|
|
|
|
- int[] Qos = {1};
|
|
|
|
- String[] topic1 = {TOPIC1};
|
|
|
|
|
|
+ /**
|
|
|
|
+ * 订阅消息
|
|
|
|
+ */
|
|
|
|
+ int[] Qos = {1};
|
|
|
|
+ String[] topic1 = {this.topic};
|
|
client.subscribe(topic1, Qos);
|
|
client.subscribe(topic1, Qos);
|
|
|
|
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|