package com.ozs.service.utils;

import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.ozs.service.entity.CameraUpdateLog;
import com.ozs.service.service.CameraUpdateLogService;
import com.ozs.service.service.impl.CameraUpdateLogServiceImpl;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;
import java.util.List;

/**
 * MQTT callback that dispatches incoming messages by topic.
 *
 * <p>Three topics are handled: {@code heart}, {@code update} and {@code config}. Each payload is
 * parsed as JSON and the extracted fields are forwarded to {@link CallbackUtil} (and, for
 * heartbeats, to {@link PublishClient}). Messages on any other topic are only logged.
 *
 * @author Administrator
 */
public class PublishCallback implements MqttCallback {

    public static final Logger logger = LoggerFactory.getLogger(PublishCallback.class);

    private static final String TOPIC_HEART = "heart";
    private static final String TOPIC_UPDATE = "update";
    private static final String TOPIC_CONFIG = "config";

    /** Payload field names. */
    private static final String FIELD_SIGN = "sign";
    private static final String FIELD_DEVICE_SN = "device_sn";
    private static final String FIELD_CODE = "code";

    /** Result code that the {@code update} topic treats as success. */
    private static final int CODE_SUCCESS = 200;

    /**
     * Called when the connection to the broker is lost.
     *
     * <p>Only logs the event; no reconnect is attempted here.
     *
     * @param cause the reason the connection was lost
     */
    @Override
    public void connectionLost(Throwable cause) {
        logger.warn("连接断开,可以做重连", cause);
    }

    /**
     * Called when delivery of a published message has completed.
     *
     * @param token the delivery token of the published message
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        logger.info("deliveryComplete---------{}", token.isComplete());
    }

    /**
     * Called when a message arrives on a subscribed topic.
     *
     * <p>The payload is decoded as UTF-8 and routed to the handler for its topic. Any exception
     * raised while handling is logged and swallowed rather than propagated, matching the original
     * best-effort behaviour. NOTE(review): the Paho documentation states that an exception thrown
     * from this method shuts the client down, which is presumably why failures are contained here.
     *
     * @param topic   the topic the message was published to
     * @param message the received message
     * @throws Exception declared by the interface; not thrown for handling failures
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        // Decode once with an explicit charset instead of relying on the platform default.
        final String payload = new String(message.getPayload(), StandardCharsets.UTF_8);

        logger.info("接收消息主题 : {}", topic);
        logger.info("接收消息Qos : {}", message.getQos());
        logger.info("接收消息内容 : {}", payload);

        try {
            if (TOPIC_HEART.equals(topic)) {
                handleHeart(payload);
            } else if (TOPIC_UPDATE.equals(topic)) {
                handleUpdate(payload);
            } else if (TOPIC_CONFIG.equals(topic)) {
                handleConfig(payload);
            }
        } catch (Exception e) {
            // Keep the full stack trace; e.getMessage() alone is null for many exceptions.
            logger.error("Failed to handle MQTT message, topic={}, payload={}", topic, payload, e);
        }
    }

    /** Handles a {@code heart} message: forwards the device serial number, then the sign. */
    private static void handleHeart(String payload) {
        final JSONObject body = lastJsonObject(payload);
        final String sign = body.getString(FIELD_SIGN);
        final String deviceSn = body.getString(FIELD_DEVICE_SN);

        CallbackUtil.heart(deviceSn);
        PublishClient.pull(sign, deviceSn);
    }

    /** Handles an {@code update} message: reports the result code, with a failure text unless it is 200. */
    private static void handleUpdate(String payload) {
        final JSONObject body = lastJsonObject(payload);
        final Integer code = body.getInteger(FIELD_CODE);
        final String deviceSn = body.getString(FIELD_DEVICE_SN);

        if (code == null) {
            // Previously this surfaced as a NullPointerException on unboxing; no callback was made.
            logger.warn("Ignoring update message without '{}', device_sn={}", FIELD_CODE, deviceSn);
            return;
        }

        final String failureText = (code == CODE_SUCCESS) ? "" : "固件升级失败";
        CallbackUtil.callback(deviceSn, code, failureText);
    }

    /** Handles a {@code config} message: reports the result code for the device named in the payload. */
    private static void handleConfig(String payload) {
        final JSONObject body = lastJsonObject(payload);
        final Integer code = body.getInteger(FIELD_CODE);
        // The device serial number was never read on this topic before, so the callback always
        // received null. It stays null if the payload does not carry the field.
        final String deviceSn = body.getString(FIELD_DEVICE_SN);

        logger.info("code-------->{}", code);
        CallbackUtil.callback(deviceSn, code, "");
    }

    /**
     * Parses the payload and returns the last JSON object it contains.
     *
     * <p>The payload is wrapped in brackets before parsing, so it may hold one object or several
     * comma-separated objects; when there are several, the last one wins, as in the original loop.
     *
     * @param payload the raw message text
     * @return the last object, or an empty object when the payload holds none
     */
    private static JSONObject lastJsonObject(String payload) {
        final JSONArray objects = JSONArray.parseArray("[" + payload + "]");
        final JSONObject last = objects.isEmpty() ? null : objects.getJSONObject(objects.size() - 1);
        return last != null ? last : new JSONObject();
    }
}