package com.ozs.service.utils; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.ozs.service.entity.CameraUpdateLog; import com.ozs.service.service.CameraUpdateLogService; import com.ozs.service.service.impl.CameraUpdateLogServiceImpl; import net.sf.jsqlparser.Model; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.List; /** * callback回调: * * @author Administrator */ public class PublishCallback implements MqttCallback { public static final Logger logger = LoggerFactory.getLogger(PublishCallback.class); @Resource private PublishClient publishClient; //在断开连接时调用 @Override public void connectionLost(Throwable cause) { // 连接丢失后,一般在这里面进行重连 logger.error("连接断开,原因:" + cause); try { logger.error("------connectionLost-------"); PublishClient.reconnect(); } catch (Exception e) { logger.error("重连失败:" + e); } } //接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用 @Override public void deliveryComplete(IMqttDeliveryToken token) { logger.error("deliveryComplete---------" + token.isComplete()); } //接收已经预订的发布 @Override public void messageArrived(String topic, MqttMessage message) throws Exception { String sign = null; String deviceSn = null; Integer code = null; String model = null; // subscribe后得到的消息会执行到这里面 logger.info("接收消息主题 : {}", topic); logger.info("接收消息内容 : {}", new String(message.getPayload())); if ("heart".equals(topic)) { try { String s = new String(message.getPayload()); s = "[" + s + "]"; JSONArray jsonArray = JSONArray.parseArray(s); for (int i = 0; i < jsonArray.size(); i++) { JSONObject object = jsonArray.getJSONObject(i); sign = (String) object.get("sign"); deviceSn = (String) object.get("device_sn"); JSONArray model1 = object.getJSONArray("model"); // 检查数组是否为空 if (model1 != null && !model1.isEmpty()) { // 取出数组的第一个值 model = model1.getString(0); } } // CallbackUtil.heart(deviceSn); CallbackUtil.updateVersionNum(model,deviceSn); PublishClient.pull(sign, deviceSn); } catch (Exception e) { logger.error(e.getMessage()); } } if ("update".equals(topic)) { try { String s = new String(message.getPayload()); s = "[" + s + "]"; JSONArray jsonArray = JSONArray.parseArray(s); for (int i = 0; i < jsonArray.size(); i++) { JSONObject object = jsonArray.getJSONObject(i); code = (Integer) object.get("code"); deviceSn = (String) object.get("device_sn"); } if (code == 200) { CallbackUtil.callback(deviceSn, code, ""); } else { CallbackUtil.callback(deviceSn, code, "固件升级失败"); } } catch (Exception e) { logger.error(e.getMessage()); } } if ("config".equals(topic)) { try { String s = new String(message.getPayload()); s = "[" + s + "]"; JSONArray jsonArray = JSONArray.parseArray(s); for (int i = 0; i < jsonArray.size(); i++) { JSONObject object = jsonArray.getJSONObject(i); code = (Integer) object.get("code"); logger.info("code-------->" + code); } CallbackUtil.callback(deviceSn, code, ""); } catch (Exception e) { logger.error(e.getMessage()); } } logger.info("接收消息内容 : " + new String(message.getPayload())); } }