PublishCallback.java 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. package com.ozs.service.utils;
  2. import com.alibaba.fastjson2.JSON;
  3. import com.alibaba.fastjson2.JSONArray;
  4. import com.alibaba.fastjson2.JSONObject;
  5. import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
  6. import com.ozs.service.entity.CameraUpdateLog;
  7. import com.ozs.service.service.CameraUpdateLogService;
  8. import com.ozs.service.service.impl.CameraUpdateLogServiceImpl;
  9. import net.sf.jsqlparser.Model;
  10. import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
  11. import org.eclipse.paho.client.mqttv3.MqttCallback;
  12. import org.eclipse.paho.client.mqttv3.MqttMessage;
  13. import org.slf4j.Logger;
  14. import org.slf4j.LoggerFactory;
  15. import org.springframework.beans.factory.annotation.Autowired;
  16. import org.springframework.stereotype.Component;
  17. import javax.annotation.Resource;
  18. import java.util.List;
  19. /**
  20. * callback回调:
  21. *
  22. * @author Administrator
  23. */
  24. public class PublishCallback implements MqttCallback {
  25. public static final Logger logger = LoggerFactory.getLogger(PublishCallback.class);
  26. @Resource
  27. private PublishClient publishClient;
  28. //在断开连接时调用
  29. @Override
  30. public void connectionLost(Throwable cause) {
  31. // 连接丢失后,一般在这里面进行重连
  32. logger.error("连接断开,原因:" + cause);
  33. try {
  34. logger.error("------connectionLost-------");
  35. PublishClient.reconnect();
  36. } catch (Exception e) {
  37. logger.error("重连失败:" + e);
  38. }
  39. }
  40. //接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用
  41. @Override
  42. public void deliveryComplete(IMqttDeliveryToken token) {
  43. logger.error("deliveryComplete---------" + token.isComplete());
  44. }
  45. //接收已经预订的发布
  46. @Override
  47. public void messageArrived(String topic, MqttMessage message) throws Exception {
  48. String sign = null;
  49. String deviceSn = null;
  50. Integer code = null;
  51. String model = null;
  52. // subscribe后得到的消息会执行到这里面
  53. logger.info("接收消息主题 : {}", topic);
  54. logger.info("接收消息内容 : {}", new String(message.getPayload()));
  55. if ("heart".equals(topic)) {
  56. try {
  57. String s = new String(message.getPayload());
  58. s = "[" + s + "]";
  59. JSONArray jsonArray = JSONArray.parseArray(s);
  60. for (int i = 0; i < jsonArray.size(); i++) {
  61. JSONObject object = jsonArray.getJSONObject(i);
  62. sign = (String) object.get("sign");
  63. deviceSn = (String) object.get("device_sn");
  64. JSONArray model1 = object.getJSONArray("model");
  65. // 检查数组是否为空
  66. if (model1 != null && !model1.isEmpty()) {
  67. // 取出数组的第一个值
  68. model = model1.getString(0);
  69. }
  70. }
  71. // CallbackUtil.heart(deviceSn);
  72. CallbackUtil.updateVersionNum(model,deviceSn);
  73. PublishClient.pull(sign, deviceSn);
  74. } catch (Exception e) {
  75. logger.error(e.getMessage());
  76. }
  77. }
  78. if ("update".equals(topic)) {
  79. try {
  80. String s = new String(message.getPayload());
  81. s = "[" + s + "]";
  82. JSONArray jsonArray = JSONArray.parseArray(s);
  83. for (int i = 0; i < jsonArray.size(); i++) {
  84. JSONObject object = jsonArray.getJSONObject(i);
  85. code = (Integer) object.get("code");
  86. deviceSn = (String) object.get("device_sn");
  87. }
  88. if (code == 200) {
  89. CallbackUtil.callback(deviceSn, code, "");
  90. } else {
  91. CallbackUtil.callback(deviceSn, code, "固件升级失败");
  92. }
  93. } catch (Exception e) {
  94. logger.error(e.getMessage());
  95. }
  96. }
  97. if ("config".equals(topic)) {
  98. try {
  99. String s = new String(message.getPayload());
  100. s = "[" + s + "]";
  101. JSONArray jsonArray = JSONArray.parseArray(s);
  102. for (int i = 0; i < jsonArray.size(); i++) {
  103. JSONObject object = jsonArray.getJSONObject(i);
  104. code = (Integer) object.get("code");
  105. logger.info("code-------->" + code);
  106. }
  107. CallbackUtil.callback(deviceSn, code, "");
  108. } catch (Exception e) {
  109. logger.error(e.getMessage());
  110. }
  111. }
  112. logger.info("接收消息内容 : " + new String(message.getPayload()));
  113. }
  114. }