|
@@ -3,6 +3,7 @@ package com.ozs.service.utils;
|
|
|
import com.alibaba.fastjson2.JSON;
|
|
|
import com.ozs.service.entity.vo.Data;
|
|
|
import com.ozs.service.entity.vo.Heartbeat;
|
|
|
+import com.ozs.service.entity.vo.UpdateDeviceSn;
|
|
|
import org.eclipse.paho.client.mqttv3.MqttClient;
|
|
|
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
|
|
|
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
|
|
@@ -11,80 +12,167 @@ import org.eclipse.paho.client.mqttv3.MqttMessage;
|
|
|
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
|
|
|
import org.eclipse.paho.client.mqttv3.MqttTopic;
|
|
|
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
|
|
|
+import org.springframework.beans.factory.annotation.Value;
|
|
|
|
|
|
|
|
|
/**
|
|
|
* 发布客户端
|
|
|
+ *
|
|
|
* @author Administrator
|
|
|
*/
|
|
|
public class PublishClient {
|
|
|
- //mqtt服务器地址
|
|
|
- public static final String HOST = "tcp://124.70.58.209:1883";
|
|
|
- //主题
|
|
|
- public static final String TOPIC = "heart_DEVICESN";
|
|
|
- //mqtt 客户机ID
|
|
|
- private static final String clientid = "server";
|
|
|
- private MqttClient client;//客户端实例
|
|
|
- private MqttTopic topic11;//主题实例
|
|
|
- private String userName = "guest"; //非必须
|
|
|
- private String passWord = "guest"; //非必须
|
|
|
+ /**
|
|
|
+ * mqtt服务器地址
|
|
|
+ */
|
|
|
+ public static final String HOST="tcp://124.70.58.209:1883";
|
|
|
+ /**
|
|
|
+ * 主题
|
|
|
+ */
|
|
|
+ private final String topic;
|
|
|
+ /**
|
|
|
+ * 客户端实例
|
|
|
+ */
|
|
|
+ private final MqttClient client;
|
|
|
+ /**
|
|
|
+ * 主题实例
|
|
|
+ */
|
|
|
+ private MqttTopic topic11;
|
|
|
+ private static final String USER_NAME = "guest";
|
|
|
+ private static final String PASS_WORD = "guest";
|
|
|
private MqttMessage message;
|
|
|
- //初始化客户端实例
|
|
|
- public PublishClient() throws MqttException {
|
|
|
- //MemoryPersistence设置clientid的保存形式,默认为以内存保存
|
|
|
- client = new MqttClient(HOST, clientid, new MemoryPersistence());
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 初始化客户端实例
|
|
|
+ * @param topic
|
|
|
+ * @param clientId
|
|
|
+ * @throws MqttException
|
|
|
+ */
|
|
|
+ public PublishClient(String topic,String clientId) throws MqttException {
|
|
|
+ /**
|
|
|
+ * mqtt 客户机ID
|
|
|
+ * heart_DEVICESN
|
|
|
+ */
|
|
|
+ this.topic=topic;
|
|
|
+ /**
|
|
|
+ * MemoryPersistence设置clientId的保存形式,默认为以内存保存
|
|
|
+ */
|
|
|
+ client = new MqttClient(HOST, clientId, new MemoryPersistence());
|
|
|
connect();
|
|
|
}
|
|
|
- //连接服务器
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 连接服务器
|
|
|
+ */
|
|
|
private void connect() {
|
|
|
- //连接配置
|
|
|
+ /**
|
|
|
+ * 连接配置
|
|
|
+ */
|
|
|
MqttConnectOptions options = new MqttConnectOptions();
|
|
|
- options.setCleanSession(true);//不保存,每次重启新client
|
|
|
- options.setUserName(userName);
|
|
|
- options.setPassword(passWord.toCharArray());
|
|
|
- // 设置超时时间
|
|
|
+ /**
|
|
|
+ * 不保存,每次重启新client
|
|
|
+ */
|
|
|
+ options.setCleanSession(true);
|
|
|
+ options.setUserName(USER_NAME);
|
|
|
+ options.setPassword(PASS_WORD.toCharArray());
|
|
|
+ /**
|
|
|
+ * 设置超时时间
|
|
|
+ */
|
|
|
options.setConnectionTimeout(10);
|
|
|
- // 设置会话心跳时间
|
|
|
+ /**
|
|
|
+ * 设置会话心跳时间
|
|
|
+ */
|
|
|
options.setKeepAliveInterval(20);
|
|
|
try {
|
|
|
- //设置发布回调
|
|
|
+ /**
|
|
|
+ * 设置发布回调
|
|
|
+ */
|
|
|
client.setCallback(new PublishCallback());
|
|
|
client.connect(options);
|
|
|
- topic11 = client.getTopic(TOPIC);
|
|
|
+ topic11 = client.getTopic(topic);
|
|
|
} catch (Exception e) {
|
|
|
e.printStackTrace();
|
|
|
}
|
|
|
}
|
|
|
- //发布
|
|
|
- public void publish(MqttTopic topic , MqttMessage message) throws MqttPersistenceException,
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 发布
|
|
|
+ * @param topic
|
|
|
+ * @param message
|
|
|
+ * @throws MqttPersistenceException
|
|
|
+ * @throws MqttException
|
|
|
+ */
|
|
|
+ public void publish(MqttTopic topic, MqttMessage message) throws MqttPersistenceException,
|
|
|
MqttException {
|
|
|
MqttDeliveryToken token = topic.publish(message);
|
|
|
token.waitForCompletion();
|
|
|
- System.out.println("message is published completely! "+ token.isComplete());
|
|
|
+ System.out.println("message is published completely! " + token.isComplete());
|
|
|
+ }
|
|
|
+
|
|
|
+ public static void pull(String sign,String deviceSn) throws MqttException, InterruptedException {
|
|
|
+ /**
|
|
|
+ * 发布客户端
|
|
|
+ */
|
|
|
+ PublishClient server = new PublishClient("heart_"+deviceSn,"heart_"+deviceSn);
|
|
|
+ Heartbeat test = new Heartbeat();
|
|
|
+ test.setName("HeartResponse");
|
|
|
+ Data data = new Data();
|
|
|
+ data.setAlarm(0);
|
|
|
+ data.setStream(0);
|
|
|
+ test.setData(data);
|
|
|
+ test.setCode(200);
|
|
|
+ test.setSign(sign);
|
|
|
+ String s = JSON.toJSONString(test);
|
|
|
+ server.message = new MqttMessage();
|
|
|
+ /**
|
|
|
+ * 保证消息能到达一次
|
|
|
+ */
|
|
|
+ server.message.setQos(1);
|
|
|
+ /**
|
|
|
+ * 消息保留
|
|
|
+ */
|
|
|
+ server.message.setRetained(true);
|
|
|
+ /**
|
|
|
+ * 消息内容
|
|
|
+ */
|
|
|
+ server.message.setPayload(s.getBytes());
|
|
|
+ /**
|
|
|
+ * 发布
|
|
|
+ */
|
|
|
+ server.publish(server.topic11, server.message);
|
|
|
}
|
|
|
- //测试类
|
|
|
- public static void main(String[] args) throws MqttException, InterruptedException {
|
|
|
- //发布客户端
|
|
|
- PublishClient server = new PublishClient();
|
|
|
- //每隔10s发一条
|
|
|
- for (;;){
|
|
|
- Heartbeat test = new Heartbeat();
|
|
|
- test.setName("HeartResponse");
|
|
|
- Data data = new Data();
|
|
|
- data.setAlarm(0);
|
|
|
- data.setStream(0);
|
|
|
- test.setData(data);
|
|
|
- test.setCode(200);
|
|
|
- test.setSign("b867d0e545d9abbf74c2d11b62798250");
|
|
|
- String s = JSON.toJSONString(test);
|
|
|
-
|
|
|
|
|
|
+ public static void updateDeviceSn(String sign){
|
|
|
+ /**
|
|
|
+ * 发布客户端
|
|
|
+ */
|
|
|
+ try {
|
|
|
+ PublishClient server = new PublishClient(sign,sign);
|
|
|
+ UpdateDeviceSn updateDeviceSn = new UpdateDeviceSn();
|
|
|
+ updateDeviceSn.setName("UpdateRequest");
|
|
|
+ updateDeviceSn.setType(0);
|
|
|
+ updateDeviceSn.setMd5("b867d0e545d9abbf74c2d11b62798250");
|
|
|
+ updateDeviceSn.setSign("b867d0e545d9abbf74c2d11b62798250");
|
|
|
+ updateDeviceSn.setUrl("https://example.com/download/Model_Helmet2_V213_P_c69cbfa9d7de4234f76d6f3811986e3a.bovai");
|
|
|
+ String s = JSON.toJSONString(updateDeviceSn);
|
|
|
server.message = new MqttMessage();
|
|
|
- server.message.setQos(1);//保证消息能到达一次
|
|
|
- server.message.setRetained(true);//消息保留
|
|
|
- server.message.setPayload(s.getBytes());//消息内容
|
|
|
- server.publish(server.topic11 , server.message);//发布
|
|
|
- Thread.sleep(10000);
|
|
|
+ /**
|
|
|
+ * 保证消息能到达一次
|
|
|
+ */
|
|
|
+ server.message.setQos(1);
|
|
|
+ /**
|
|
|
+ * 消息保留
|
|
|
+ */
|
|
|
+ server.message.setRetained(true);
|
|
|
+ /**
|
|
|
+ * 消息内容
|
|
|
+ */
|
|
|
+ server.message.setPayload(s.getBytes());
|
|
|
+ /**
|
|
|
+ * 发布
|
|
|
+ */
|
|
|
+ server.publish(server.topic11, server.message);
|
|
|
+ } catch (MqttException e) {
|
|
|
+ System.out.println(e.getMessage());
|
|
|
}
|
|
|
}
|
|
|
}
|