package com.ozs.service.utils;

import com.alibaba.fastjson2.JSON;
import com.ozs.common.utils.sign.Md5Utils;
import com.ozs.service.entity.vo.BaseCameraVersionVo;
import com.ozs.service.entity.vo.Data;
import com.ozs.service.entity.vo.Heartbeat;
import com.ozs.service.entity.vo.UpdateDeviceSn;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;

import java.nio.charset.StandardCharsets;
import java.util.UUID;

/**
 * MQTT publishing client.
 *
 * <p>Each instance owns one {@link MqttClient} connected to {@link #HOST} and bound to a single
 * topic. The static helpers {@link #pull(String, String)} and
 * {@link #updateDeviceSn(BaseCameraVersionVo)} open a short-lived client, publish one JSON
 * message and release the connection again.
 *
 * @author Administrator
 */
public class PublishClient implements AutoCloseable {

    /**
     * MQTT broker address.
     */
    public static final String HOST = "tcp://124.70.58.209:1883";

    // NOTE(review): broker credentials are hard-coded; consider moving them to configuration.
    private static final String USER_NAME = "guest";
    private static final String PASS_WORD = "guest";

    /**
     * QoS level used for every message sent by the static helpers.
     */
    private static final int MESSAGE_QOS = 1;

    /**
     * Name of the topic this client publishes to.
     */
    private final String topic;

    /**
     * Underlying Paho client instance.
     */
    private final MqttClient client;

    /**
     * Topic handle obtained from the client after a successful connect.
     */
    private MqttTopic mqttTopic;

    /**
     * Creates the client and connects it to the broker.
     *
     * @param topic    name of the topic to publish to
     * @param clientId MQTT client id (the helpers in this class use e.g. {@code heart_DEVICESN})
     * @throws MqttException if the client cannot be created or the connection fails
     */
    public PublishClient(String topic, String clientId) throws MqttException {
        this.topic = topic;
        // MemoryPersistence keeps in-flight message state in memory only.
        client = new MqttClient(HOST, clientId, new MemoryPersistence());
        try {
            connect();
        } catch (MqttException | RuntimeException e) {
            // Do not leave a half-initialised client behind when the connect fails.
            releaseQuietly();
            throw e;
        }
    }

    /**
     * Connects to the broker and resolves the topic handle.
     *
     * @throws MqttException if the connection cannot be established; propagated so that callers
     *                       never end up publishing through a null topic handle
     */
    private void connect() throws MqttException {
        MqttConnectOptions options = new MqttConnectOptions();
        // Clean session: no state is kept, every start uses a fresh client session.
        options.setCleanSession(true);
        options.setUserName(USER_NAME);
        options.setPassword(PASS_WORD.toCharArray());
        // Connection timeout.
        options.setConnectionTimeout(10);
        // Keep-alive (session heartbeat) interval.
        options.setKeepAliveInterval(20);

        // Register the publish callback before connecting.
        client.setCallback(new PublishCallback());
        client.connect(options);
        mqttTopic = client.getTopic(topic);
    }

    /**
     * Publishes a message to the given topic and waits until delivery has completed.
     *
     * @param topic   topic handle to publish to
     * @param message message to publish
     * @throws MqttPersistenceException if the message cannot be persisted
     * @throws MqttException            if publishing fails
     */
    public void publish(MqttTopic topic, MqttMessage message) throws MqttPersistenceException, MqttException {
        MqttDeliveryToken token = topic.publish(message);
        token.waitForCompletion();
        System.out.println("message is published completely! " + token.isComplete());
    }

    /**
     * Disconnects from the broker and releases the underlying client. Failures are reported on
     * standard output and otherwise ignored, so this method is safe to use in try-with-resources.
     */
    @Override
    public void close() {
        releaseQuietly();
    }

    /**
     * Disconnects (when connected) and closes the underlying client, reporting but not
     * propagating any failure.
     */
    private void releaseQuietly() {
        try {
            if (client.isConnected()) {
                client.disconnect();
            }
        } catch (MqttException e) {
            System.out.println(e.getMessage());
        }
        try {
            client.close();
        } catch (MqttException e) {
            System.out.println(e.getMessage());
        }
    }

    /**
     * Serialises {@code payload} to JSON and publishes it once, using
     * {@code topicAndClientId} as both the topic name and the MQTT client id. The connection is
     * closed again whether or not publishing succeeds.
     *
     * @param topicAndClientId value used as topic name and client id
     * @param payload          object to serialise as the message body
     * @throws MqttException if connecting or publishing fails
     */
    private static void sendJson(String topicAndClientId, Object payload) throws MqttException {
        MqttMessage message = new MqttMessage();
        // QoS 1: the message is delivered at least once.
        message.setQos(MESSAGE_QOS);
        // Encode explicitly as UTF-8 instead of relying on the platform default charset.
        message.setPayload(JSON.toJSONString(payload).getBytes(StandardCharsets.UTF_8));

        try (PublishClient server = new PublishClient(topicAndClientId, topicAndClientId)) {
            server.publish(server.mqttTopic, message);
        }
    }

    /**
     * Publishes a {@code HeartResponse} message (code 200, alarm 0, stream 0) on the topic
     * {@code heart_<deviceSn>}.
     *
     * @param sign     signature value copied into the response
     * @param deviceSn device serial number used to build the topic name and client id
     * @throws MqttException        if connecting or publishing fails
     * @throws InterruptedException declared for backward compatibility with existing callers
     */
    public static void pull(String sign, String deviceSn) throws MqttException, InterruptedException {
        Data data = new Data();
        data.setAlarm(0);
        data.setStream(0);

        Heartbeat response = new Heartbeat();
        response.setName("HeartResponse");
        response.setData(data);
        response.setCode(200);
        response.setSign(sign);

        sendJson("heart_" + deviceSn, response);
    }

    /**
     * Publishes an {@code UpdateRequest} message on the topic {@code update_<code>} for every
     * camera code in the request. A failure for one camera is reported on standard output and
     * does not stop the remaining cameras from being processed.
     *
     * @param baseCameraVersionVo upgrade description (type, MD5, download address, camera codes);
     *                            nothing is sent when it or its camera code list is {@code null}
     */
    public static void updateDeviceSn(BaseCameraVersionVo baseCameraVersionVo) {
        if (baseCameraVersionVo == null || baseCameraVersionVo.getCameraCodeList() == null) {
            return;
        }
        for (String code : baseCameraVersionVo.getCameraCodeList()) {
            try {
                UpdateDeviceSn request = new UpdateDeviceSn();
                request.setName("UpdateRequest");
                request.setType(Integer.valueOf(baseCameraVersionVo.getUpgradeType()));
                request.setMd5(baseCameraVersionVo.getMd5());
                // Every message gets its own random sign value.
                request.setSign(UUID.randomUUID().toString());
                request.setUrl(baseCameraVersionVo.getVersionAddress());

                sendJson("update_" + code, request);
            } catch (MqttException e) {
                System.out.println(e.getMessage());
            }
        }
    }
}