package com.ozs.service.utils; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; import com.ozs.common.utils.sign.Md5Utils; import com.ozs.common.utils.stateSecrets.SM4Utils; import com.ozs.service.entity.vo.BaseCameraVersionVo; import com.ozs.service.entity.vo.Codec; import com.ozs.service.entity.vo.Data; import com.ozs.service.entity.vo.Heartbeat; import com.ozs.service.entity.vo.Param; import com.ozs.service.entity.vo.Svp; import com.ozs.service.entity.vo.UpdateDeviceSn; import com.ozs.service.entity.vo.Venc0; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttPersistenceException; import org.eclipse.paho.client.mqttv3.MqttTopic; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.springframework.beans.factory.annotation.Value; import java.util.ArrayList; import java.util.HashMap; import java.util.Map; import java.util.UUID; /** * 发布客户端 * * @author Administrator */ public class PublishClient { /** * mqtt服务器地址 */ public static final String HOST = "tcp://124.70.58.209:1883"; /** * 主题 */ private final String topic; /** * 客户端实例 */ private final MqttClient client; /** * 主题实例 */ private MqttTopic topic11; private static final String USER_NAME = "guest"; private static final String PASS_WORD = "guest"; private MqttMessage message; /** * 初始化客户端实例 * * @param topic * @param clientId * @throws MqttException */ public PublishClient(String topic, String clientId) throws MqttException { /** * mqtt 客户机ID * heart_DEVICESN */ this.topic = topic; /** * MemoryPersistence设置clientId的保存形式,默认为以内存保存 */ client = new MqttClient(HOST, clientId, new MemoryPersistence()); connect(); } /** * 连接服务器 */ private void connect() { /** * 连接配置 */ MqttConnectOptions options = new MqttConnectOptions(); /** * 不保存,每次重启新client */ options.setCleanSession(true); options.setUserName(USER_NAME); options.setPassword(PASS_WORD.toCharArray()); /** * 设置超时时间 */ options.setConnectionTimeout(10); /** * 设置会话心跳时间 */ options.setKeepAliveInterval(20); try { /** * 设置发布回调 */ client.setCallback(new PublishCallback()); client.connect(options); topic11 = client.getTopic(topic); } catch (Exception e) { e.printStackTrace(); } } /** * 发布 * * @param topic * @param message * @throws MqttPersistenceException * @throws MqttException */ public void publish(MqttTopic topic, MqttMessage message) throws MqttPersistenceException, MqttException { MqttDeliveryToken token = topic.publish(message); token.waitForCompletion(); System.out.println("message is published completely! " + token.isComplete()); } public static void pull(String sign, String deviceSn) throws MqttException, InterruptedException { /** * 发布客户端 */ PublishClient server = new PublishClient("heart_" + deviceSn, "heart_" + deviceSn); Heartbeat test = new Heartbeat(); test.setName("HeartResponse"); Data data = new Data(); data.setAlarm(0); data.setStream(0); test.setData(data); test.setCode(200); test.setSign(sign); String s = JSON.toJSONString(test); server.message = new MqttMessage(); /** * 保证消息能到达一次 */ server.message.setQos(1); /** * 消息保留 */ // server.message.setRetained(false); /** * 消息内容 */ server.message.setPayload(s.getBytes()); /** * 发布 */ server.publish(server.topic11, server.message); } public static void updateDeviceSn(BaseCameraVersionVo baseCameraVersionVo) { /** * 发布客户端 */ for (String code : baseCameraVersionVo.getCameraCodeList()) { try { PublishClient server = new PublishClient("update_" + code, "update_" + code); UpdateDeviceSn updateDeviceSn = new UpdateDeviceSn(); updateDeviceSn.setName("UpdateRequest"); updateDeviceSn.setType(Integer.valueOf(baseCameraVersionVo.getUpgradeType())); updateDeviceSn.setMd5(baseCameraVersionVo.getMd5()); updateDeviceSn.setSign(UUID.randomUUID().toString()); updateDeviceSn.setUrl(baseCameraVersionVo.getVersionAddress()); String s = JSON.toJSONString(updateDeviceSn); server.message = new MqttMessage(); /** * 保证消息能到达一次 */ server.message.setQos(1); /** * 消息保留 */ // server.message.setRetained(false); /** * 消息内容 */ server.message.setPayload(s.getBytes()); /** * 发布 */ server.publish(server.topic11, server.message); } catch (MqttException e) { System.out.println(e.getMessage()); } } } public static void confidenceCoefficient(BaseCameraVersionVo baseCameraVersionVo,String value) { /** * 发布客户端 */ for (String code : baseCameraVersionVo.getCameraCodeList()) { try { PublishClient server = new PublishClient("config_" + code, "config_" + code); JSONObject res = new JSONObject(); res.put("Name", "ConfigRequest"); Svp svp = new Svp(); Param param = new Param(); param.setThreshold(Double.valueOf(value)); svp.setParam(param); Map map = new HashMap<>(); map.put("svp", svp); res.put("data", map); res.put("sign", UUID.randomUUID().toString()); String s = JSONObject.toJSONString(res); server.message = new MqttMessage(); /** * 保证消息能到达一次 */ server.message.setQos(1); /** * 消息保留 */ // server.message.setRetained(false); /** * 消息内容 */ server.message.setPayload(s.getBytes()); /** * 发布 */ server.publish(server.topic11, server.message); } catch (MqttException e) { System.out.println(e.getMessage()); } } } public static void configFrameRate(BaseCameraVersionVo baseCameraVersionVo,String value) { /** * 发布客户端 */ for (String code : baseCameraVersionVo.getCameraCodeList()) { try { PublishClient server = new PublishClient("config_" + code, "config_" + code); JSONObject res = new JSONObject(); res.put("Name", "ConfigRequest"); Codec codec = new Codec(); Venc0 venc0 = new Venc0(); venc0.setFps(Double.valueOf(value)); codec.setVenc0(venc0); Map map = new HashMap<>(); map.put("codec", codec); res.put("data", map); res.put("sign", UUID.randomUUID().toString()); String s = JSONObject.toJSONString(res); server.message = new MqttMessage(); /** * 保证消息能到达一次 */ server.message.setQos(1); /** * 消息保留 */ // server.message.setRetained(false); /** * 消息内容 */ server.message.setPayload(s.getBytes()); /** * 发布 */ server.publish(server.topic11, server.message); } catch (MqttException e) { System.out.println(e.getMessage()); } } } }