package com.ozs.service.utils; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.filter.Filter; import com.alibaba.fastjson2.filter.SimplePropertyPreFilter; import com.ozs.common.utils.StringUtils; import com.ozs.common.utils.sign.Md5Utils; import com.ozs.common.utils.stateSecrets.SM4Utils; import com.ozs.common.utils.uuid.IdUtils; import com.ozs.service.entity.BaseCameraManagement; import com.ozs.service.entity.vo.*; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttPersistenceException; import org.eclipse.paho.client.mqttv3.MqttTopic; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; import javax.annotation.PostConstruct; import java.util.ArrayList; import java.util.HashMap; import java.util.Map; import java.util.UUID; /** * 发布客户端 * * @author Administrator */ @Slf4j @Component public class PublishClient { /** * mqtt服务器地址 */ public static String HOST; public static String USER_NAME; public static String PASS_WORD; private static String clientId; /** * 客户端实例 */ private static MqttClient client; @Autowired private RabbitMqConfig rabbitMqConfig; /** * 获取 MqttTopic */ public static MqttTopic getMqttTopic(String topic) { return client.getTopic(topic); } /** * 发布 * * @param topic * @param message * @throws MqttPersistenceException * @throws MqttException */ public static void publish(MqttTopic topic, MqttMessage message) throws MqttPersistenceException, MqttException { MqttDeliveryToken token = topic.publish(message); token.waitForCompletion(); log.info("message is published completely! " + token.isComplete()); } public static void pull(String sign, String deviceSn) throws MqttException { /** * 发布客户端 */ Heartbeat test = new Heartbeat(); test.setName("HeartResponse"); Data data = new Data(); data.setAlarm(0); data.setStream(0); test.setData(data); test.setCode(200); test.setSign(sign); String s = JSON.toJSONString(test); MqttMessage message = new MqttMessage(); /** * 保证消息能到达一次 */ message.setQos(1); /** * 消息保留 */ // server.message.setRetained(false); /** * 消息内容 */ message.setPayload(s.getBytes()); /** * 发布 */ publish(getMqttTopic("heart_" + deviceSn), message); } public static void main(String[] args) { JSONObject res = new JSONObject(); res.put("Name", "ConfigRequest"); Codec codec = new Codec(); Venc venc = new Venc(); venc.setFps(Double.valueOf(11)); ArrayList vencList = new ArrayList<>(); vencList.add(venc); codec.setVenc(vencList); Map map = new HashMap<>(); res.put("device_sn", "11"); res.put("sign", "rate" + IdUtils.fastSimpleUUID()); map.put("codec",codec); res.put("data", map); String s = JSONObject.toJSONString(res); log.info(s); } public static void updateDeviceSn(BaseCameraVersionVo baseCameraVersionVo) { /** * 发布客户端 */ log.info("updateDeviceSn---start"); for (String code : baseCameraVersionVo.getCameraCodeList()) { try { log.info("update_" + code); UpdateDeviceSn updateDeviceSn = new UpdateDeviceSn(); updateDeviceSn.setName("UpdateRequest"); updateDeviceSn.setType(Integer.valueOf(baseCameraVersionVo.getUpgradeType())); updateDeviceSn.setMd5(baseCameraVersionVo.getMd5()); updateDeviceSn.setSign(IdUtils.fastSimpleUUID()); updateDeviceSn.setUrl(baseCameraVersionVo.getVersionAddress()); String s = JSON.toJSONString(updateDeviceSn); MqttMessage message = new MqttMessage(); /** * 保证消息能到达一次 */ message.setQos(1); /** * 消息保留 */ // server.message.setRetained(false); /** * 消息内容 */ message.setPayload(s.getBytes()); /** * 发布 */ publish(getMqttTopic("update_" + code), message); log.info("updateDeviceSn---end"); } catch (MqttException e) { log.error("updateDeviceSn-------" + e.getMessage()); } } } public static void alarmPush(ReqMsgAlarmVo reqMsgAlarmVo) { /** * 发布客户端 */ log.info("alarmPush---start"); try { if (StringUtils.isEmpty(reqMsgAlarmVo.getAlarmMile())){ reqMsgAlarmVo.setAlarmMile(""); } if (ObjectUtils.isEmpty(reqMsgAlarmVo.getLineDir())){ reqMsgAlarmVo.setLineDirStr(""); } String s = JSON.toJSONString(reqMsgAlarmVo); MqttMessage message = new MqttMessage(); /** * 保证消息能到达一次 */ message.setQos(1); /** * 消息保留 */ // server.message.setRetained(false); /** * 消息内容 */ message.setPayload(s.getBytes()); /** * 发布 */ publish(getMqttTopic("alarmPush"), message); log.info("alarmPush---end"); } catch (MqttException e) { log.error("alarmPush-------" + e.getMessage()); } } public static void confidenceCoefficient(BaseCameraManagement baseCameraManagement, String value) { /** * 发布客户端 */ try { JSONObject res = new JSONObject(); res.put("Name", "HeartRequest"); Codec codec = new Codec(); Venc venc = new Venc(); venc.setFps(Double.valueOf(value)); ArrayList vencList = new ArrayList<>(); vencList.add(venc); codec.setVenc(vencList); Map map = new HashMap<>(); res.put("device_sn", baseCameraManagement.getCameraSn()); res.put("sign", "rate" + IdUtils.fastSimpleUUID()); map.put("codec",codec); res.put("data", map); String s = JSONObject.toJSONString(res); MqttMessage message = new MqttMessage(); /** * 保证消息能到达一次 */ message.setQos(1); /** * 消息保留 */ // message.setRetained(false); /** * 消息内容 */ message.setPayload(s.getBytes()); /** * 发布 */ publish(getMqttTopic("config_" + baseCameraManagement.getCameraCode()), message); } catch (MqttException e) { log.error(e.getMessage()); } } public static void configFrameRate(BaseCameraManagement baseCameraManagement,Integer mode) { /** * 发布客户端 */ try { JSONObject res = new JSONObject(); res.put("Name", "HeartRequest"); Codec codec = new Codec(); Day2night day2night = new Day2night(); day2night.setMode(mode); codec.setDay2night(day2night); Map map = new HashMap<>(); res.put("device_sn", baseCameraManagement.getCameraSn()); res.put("sign", "rate" + IdUtils.fastSimpleUUID()); map.put("codec",codec); res.put("data", map); String s = JSONObject.toJSONString(res); MqttMessage message = new MqttMessage(); /** * 保证消息能到达一次 */ message.setQos(1); /** * 消息保留 */ // message.setRetained(false); /** * 消息内容 */ message.setPayload(s.getBytes()); /** * 发布 */ publish(getMqttTopic("config_" + baseCameraManagement.getCameraCode()), message); } catch (MqttException e) { log.error(e.getMessage()); } } @PostConstruct public void init() { HOST = rabbitMqConfig.getHost(); USER_NAME = rabbitMqConfig.getUserName(); PASS_WORD = rabbitMqConfig.getPassword(); clientId = rabbitMqConfig.getClientId(); /** * 连接配置 */ MqttConnectOptions options = new MqttConnectOptions(); /** * 不保存,每次重启新client */ options.setCleanSession(true); options.setUserName(USER_NAME); options.setPassword(PASS_WORD.toCharArray()); /** * 设置超时时间 */ options.setConnectionTimeout(60); /** * 设置会话心跳时间 */ options.setKeepAliveInterval(40); try { /** * 设置发布回调 */ client = new MqttClient(HOST, clientId, new MemoryPersistence()); client.setCallback(new PublishCallback()); client.connect(options); int[] Qos = {1}; String[] topic1 = {"config", "update", "heart", "test"}; client.subscribe(topic1); } catch (Exception e) { log.error(e.getMessage()); e.printStackTrace(); } } public static void reconnect() throws MqttException { log.error("尝试重连..."); if (client != null) { try { client.close(); } catch (MqttException e) { log.error("关闭现有连接时出错:" + e); } } MqttConnectOptions options = new MqttConnectOptions(); options.setCleanSession(true); options.setUserName("camera-update"); options.setPassword("05J5+mtYzx.Ry".toCharArray()); options.setConnectionTimeout(60); options.setKeepAliveInterval(40); try { /** * 设置发布回调 */ client = new MqttClient("tcp://10.161.12.60:1883", "HAZARD-CAMERA-CLIENTID-123", new MemoryPersistence()); client.setCallback(new PublishCallback()); client.connect(options); String[] topic1 = {"config", "update", "heart", "test"}; client.subscribe(topic1); } catch (Exception e) { log.error(e.getMessage()); e.printStackTrace(); } log.error("重连成功!"); } }