PublishClient.java 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. package com.ozs.service.utils;
  2. import com.alibaba.fastjson2.JSON;
  3. import com.ozs.common.utils.sign.Md5Utils;
  4. import com.ozs.service.entity.vo.BaseCameraVersionVo;
  5. import com.ozs.service.entity.vo.Data;
  6. import com.ozs.service.entity.vo.Heartbeat;
  7. import com.ozs.service.entity.vo.UpdateDeviceSn;
  8. import org.eclipse.paho.client.mqttv3.MqttClient;
  9. import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
  10. import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
  11. import org.eclipse.paho.client.mqttv3.MqttException;
  12. import org.eclipse.paho.client.mqttv3.MqttMessage;
  13. import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
  14. import org.eclipse.paho.client.mqttv3.MqttTopic;
  15. import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
  16. import org.springframework.beans.factory.annotation.Value;
  17. import java.util.UUID;
  18. /**
  19. * 发布客户端
  20. *
  21. * @author Administrator
  22. */
  23. public class PublishClient {
  24. /**
  25. * mqtt服务器地址
  26. */
  27. public static final String HOST="tcp://124.70.58.209:1883";
  28. /**
  29. * 主题
  30. */
  31. private final String topic;
  32. /**
  33. * 客户端实例
  34. */
  35. private final MqttClient client;
  36. /**
  37. * 主题实例
  38. */
  39. private MqttTopic topic11;
  40. private static final String USER_NAME = "guest";
  41. private static final String PASS_WORD = "guest";
  42. private MqttMessage message;
  43. /**
  44. * 初始化客户端实例
  45. * @param topic
  46. * @param clientId
  47. * @throws MqttException
  48. */
  49. public PublishClient(String topic,String clientId) throws MqttException {
  50. /**
  51. * mqtt 客户机ID
  52. * heart_DEVICESN
  53. */
  54. this.topic=topic;
  55. /**
  56. * MemoryPersistence设置clientId的保存形式,默认为以内存保存
  57. */
  58. client = new MqttClient(HOST, clientId, new MemoryPersistence());
  59. connect();
  60. }
  61. /**
  62. * 连接服务器
  63. */
  64. private void connect() {
  65. /**
  66. * 连接配置
  67. */
  68. MqttConnectOptions options = new MqttConnectOptions();
  69. /**
  70. * 不保存,每次重启新client
  71. */
  72. options.setCleanSession(true);
  73. options.setUserName(USER_NAME);
  74. options.setPassword(PASS_WORD.toCharArray());
  75. /**
  76. * 设置超时时间
  77. */
  78. options.setConnectionTimeout(10);
  79. /**
  80. * 设置会话心跳时间
  81. */
  82. options.setKeepAliveInterval(20);
  83. try {
  84. /**
  85. * 设置发布回调
  86. */
  87. client.setCallback(new PublishCallback());
  88. client.connect(options);
  89. topic11 = client.getTopic(topic);
  90. } catch (Exception e) {
  91. e.printStackTrace();
  92. }
  93. }
  94. /**
  95. * 发布
  96. * @param topic
  97. * @param message
  98. * @throws MqttPersistenceException
  99. * @throws MqttException
  100. */
  101. public void publish(MqttTopic topic, MqttMessage message) throws MqttPersistenceException,
  102. MqttException {
  103. MqttDeliveryToken token = topic.publish(message);
  104. token.waitForCompletion();
  105. System.out.println("message is published completely! " + token.isComplete());
  106. }
  107. public static void pull(String sign,String deviceSn) throws MqttException, InterruptedException {
  108. /**
  109. * 发布客户端
  110. */
  111. PublishClient server = new PublishClient("heart_"+deviceSn,"heart_"+deviceSn);
  112. Heartbeat test = new Heartbeat();
  113. test.setName("HeartResponse");
  114. Data data = new Data();
  115. data.setAlarm(0);
  116. data.setStream(0);
  117. test.setData(data);
  118. test.setCode(200);
  119. test.setSign(sign);
  120. String s = JSON.toJSONString(test);
  121. server.message = new MqttMessage();
  122. /**
  123. * 保证消息能到达一次
  124. */
  125. server.message.setQos(1);
  126. /**
  127. * 消息保留
  128. */
  129. // server.message.setRetained(false);
  130. /**
  131. * 消息内容
  132. */
  133. server.message.setPayload(s.getBytes());
  134. /**
  135. * 发布
  136. */
  137. server.publish(server.topic11, server.message);
  138. }
  139. public static void updateDeviceSn(BaseCameraVersionVo baseCameraVersionVo){
  140. /**
  141. * 发布客户端
  142. */
  143. for (String code : baseCameraVersionVo.getCameraCodeList()) {
  144. try {
  145. PublishClient server = new PublishClient("update_" + code, "update_" + code);
  146. UpdateDeviceSn updateDeviceSn = new UpdateDeviceSn();
  147. updateDeviceSn.setName("UpdateRequest");
  148. updateDeviceSn.setType(Integer.valueOf(baseCameraVersionVo.getUpgradeType()));
  149. updateDeviceSn.setMd5(baseCameraVersionVo.getMd5());
  150. updateDeviceSn.setSign(UUID.randomUUID().toString());
  151. updateDeviceSn.setUrl(baseCameraVersionVo.getVersionAddress());
  152. String s = JSON.toJSONString(updateDeviceSn);
  153. server.message = new MqttMessage();
  154. /**
  155. * 保证消息能到达一次
  156. */
  157. server.message.setQos(1);
  158. /**
  159. * 消息保留
  160. */
  161. // server.message.setRetained(false);
  162. /**
  163. * 消息内容
  164. */
  165. server.message.setPayload(s.getBytes());
  166. /**
  167. * 发布
  168. */
  169. server.publish(server.topic11, server.message);
  170. } catch (MqttException e) {
  171. System.out.println(e.getMessage());
  172. }
  173. }
  174. }
  175. }