hexiao 1 year ago
parent
commit
a480040e60

+ 0 - 7
business-service/src/main/java/com/ozs/service/service/impl/BaseCameraVersionServiceImpl.java

@@ -1,7 +1,6 @@
 package com.ozs.service.service.impl;
 
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
-import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 import com.github.pagehelper.PageHelper;
 import com.ozs.common.core.domain.AjaxResult;
@@ -14,18 +13,15 @@ import com.ozs.service.service.BaseCameraVersionService;
 import com.ozs.service.service.BaseDeviceDynamicManagementService;
 import com.ozs.service.service.CameraLogDetailService;
 import com.ozs.service.utils.PublishClient;
-import com.ozs.service.utils.SubscribeClient;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
-import org.springframework.util.ObjectUtils;
 
 import javax.annotation.Resource;
 import java.util.Date;
 import java.util.Iterator;
 import java.util.List;
-import java.util.stream.Collectors;
 
 /**
  * @Author : sunhh
@@ -168,15 +164,12 @@ public class BaseCameraVersionServiceImpl extends ServiceImpl<BaseCameraVersionM
             for (BaseVersionCameraParam baseVersionCameraParam : baseVersionCameraParamList) {
                 if ("threshold".equals(baseVersionCameraParam.getParamKey())) {
                     PublishClient.confidenceCoefficient(baseCameraVersionVo, baseVersionCameraParam.getParamValue());
-                    new SubscribeClient("config", "config").start();
                 } else if ("fps".equals(baseVersionCameraParam.getParamKey())) {
                     PublishClient.configFrameRate(baseCameraVersionVo, baseVersionCameraParam.getParamValue());
-                    new SubscribeClient("config", "config").start();
                 }
             }
         }
         PublishClient.updateDeviceSn(baseCameraVersionVo);
-        new SubscribeClient("update", "update").start();
         return AjaxResult.success("已经为您提交升级申请,请您到“日志管理>相机升级日志”页面查看升级结果。");
     }
 }

+ 59 - 41
business-service/src/main/java/com/ozs/service/utils/PublishCallback.java

@@ -10,6 +10,8 @@ import com.ozs.service.service.impl.CameraUpdateLogServiceImpl;
 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
 import org.eclipse.paho.client.mqttv3.MqttCallback;
 import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
@@ -21,13 +23,14 @@ import java.util.List;
  * @author Administrator
  */
 public class PublishCallback implements MqttCallback {
-    
+
+    public static final Logger logger = LoggerFactory.getLogger(PublishCallback.class);
 
     //在断开连接时调用
     @Override
     public void connectionLost(Throwable cause) {
         // 连接丢失后,一般在这里面进行重连
-        System.out.println("连接断开,可以做重连"+cause);
+        System.out.println("连接断开,可以做重连" + cause);
     }
 
     //接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用
@@ -40,56 +43,71 @@ public class PublishCallback implements MqttCallback {
     @Override
     public void messageArrived(String topic, MqttMessage message) throws Exception {
         String sign = null;
-        String deviceSn=null;
-        Integer code=null;
+        String deviceSn = null;
+        Integer code = null;
         // subscribe后得到的消息会执行到这里面
+        logger.info("接收消息主题 : {}", topic);
+        logger.info("接收消息内容 : {}", new String(message.getPayload()));
         System.out.println("接收消息主题 : " + topic);
         System.out.println("接收消息Qos : " + message.getQos());
         if ("heart".equals(topic)) {
-            String s = new String(message.getPayload());
-            s = "[" + s + "]";
-            JSONArray jsonArray = JSONArray.parseArray(s);
-            for (int i = 0; i < jsonArray.size(); i++) {
-                JSONObject object = jsonArray.getJSONObject(i);
-                sign = (String) object.get("sign");
-                deviceSn = (String) object.get("device_sn");
+            try {
+                String s = new String(message.getPayload());
+                s = "[" + s + "]";
+                JSONArray jsonArray = JSONArray.parseArray(s);
+                for (int i = 0; i < jsonArray.size(); i++) {
+                    JSONObject object = jsonArray.getJSONObject(i);
+                    sign = (String) object.get("sign");
+                    deviceSn = (String) object.get("device_sn");
+                }
+                CallbackUtil.heart(deviceSn);
+                PublishClient.pull(sign, deviceSn);
+            } catch (Exception e) {
+                logger.error(e.getMessage());
             }
-            CallbackUtil.heart(deviceSn);
-            PublishClient.pull(sign,deviceSn);
         }
         if ("update".equals(topic)) {
-            String s = new String(message.getPayload());
-            s = "[" + s + "]";
-            JSONArray jsonArray = JSONArray.parseArray(s);
-            for (int i = 0; i < jsonArray.size(); i++) {
-                JSONObject object = jsonArray.getJSONObject(i);
-                code = (Integer) object.get("code");
-                deviceSn = (String) object.get("device_sn");
+            try {
+                String s = new String(message.getPayload());
+                s = "[" + s + "]";
+                JSONArray jsonArray = JSONArray.parseArray(s);
+                for (int i = 0; i < jsonArray.size(); i++) {
+                    JSONObject object = jsonArray.getJSONObject(i);
+                    code = (Integer) object.get("code");
+                    deviceSn = (String) object.get("device_sn");
+                }
+                if (code == 200) {
+                    CallbackUtil.callback(deviceSn, code, "");
+                } else {
+                    CallbackUtil.callback(deviceSn, code, "固件升级失败");
+                }
+            } catch (Exception e) {
+                logger.error(e.getMessage());
             }
-            if (code==200) {
+        }
+        if ("config".equals(topic)) {
+            try {
+                String s = new String(message.getPayload());
+                s = "[" + s + "]";
+                JSONArray jsonArray = JSONArray.parseArray(s);
+                for (int i = 0; i < jsonArray.size(); i++) {
+                    JSONObject object = jsonArray.getJSONObject(i);
+                    code = (Integer) object.get("code");
+                    deviceSn = (String) object.get("device_sn");
+                    sign = (String) object.get("sign");
+                }
+                if (sign.startsWith("rate") && code == 400) {
+                    CallbackUtil.callback(deviceSn, code, "帧率升级失败");
+                }
+                if (sign.startsWith("cien") && code == 400) {
+                    CallbackUtil.callback(deviceSn, code, "置信度升级失败");
+                }
                 CallbackUtil.callback(deviceSn, code, "");
-            }else {
-                CallbackUtil.callback(deviceSn, code, "固件升级失败");
-            }
-        }if ("config".equals(topic)){
-            String s = new String(message.getPayload());
-            s = "[" + s + "]";
-            JSONArray jsonArray = JSONArray.parseArray(s);
-            for (int i = 0; i < jsonArray.size(); i++) {
-                JSONObject object = jsonArray.getJSONObject(i);
-                code = (Integer) object.get("code");
-                deviceSn = (String) object.get("device_sn");
-                sign =(String) object.get("sign");
+            } catch (Exception e) {
+                logger.error(e.getMessage());
             }
-            if (sign.startsWith("rate")&&code==400){
-                CallbackUtil.callback(deviceSn,code,"帧率升级失败");
-            }
-            if (sign.startsWith("cien")&&code==400){
-                CallbackUtil.callback(deviceSn,code,"置信度升级失败");
-            }
-            CallbackUtil.callback(deviceSn,code,"");
         }
         System.out.println("接收消息内容 : " + new String(message.getPayload()));
-        
+
     }
 }

+ 80 - 92
business-service/src/main/java/com/ozs/service/utils/PublishClient.java

@@ -22,8 +22,11 @@ import org.eclipse.paho.client.mqttv3.MqttMessage;
 import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
 import org.eclipse.paho.client.mqttv3.MqttTopic;
 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
 
+import javax.annotation.PostConstruct;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
@@ -36,79 +39,28 @@ import java.util.UUID;
  * @author Administrator
  */
 @Slf4j
+@Component
 public class PublishClient {
+
     /**
      * mqtt服务器地址
      */
-    public static final String HOST = "tcp://124.70.58.209:1883";
-    /**
-     * 主题
-     */
-    private final String topic;
+    public static String HOST;
+    public static String USER_NAME;
+    public static String PASS_WORD;
+    private static String clientId;
     /**
      * 客户端实例
      */
-    private final MqttClient client;
-    /**
-     * 主题实例
-     */
-    private MqttTopic topic11;
-    private static final String USER_NAME = "guest";
-    private static final String PASS_WORD = "guest";
-    private MqttMessage message;
+    private static MqttClient client;
+    @Autowired
+    private RabbitMqConfig rabbitMqConfig;
 
     /**
-     * 初始化客户端实例
-     *
-     * @param topic
-     * @param clientId
-     * @throws MqttException
+     * 获取 MqttTopic
      */
-    public PublishClient(String topic, String clientId) throws MqttException {
-        /**
-         * mqtt 客户机ID
-         * heart_DEVICESN
-         */
-        this.topic = topic;
-        /**
-         * MemoryPersistence设置clientId的保存形式,默认为以内存保存
-         */
-        client = new MqttClient(HOST, clientId, new MemoryPersistence());
-        connect();
-    }
-
-    /**
-     * 连接服务器
-     */
-    private void connect() {
-        /**
-         * 连接配置
-         */
-        MqttConnectOptions options = new MqttConnectOptions();
-        /**
-         * 不保存,每次重启新client
-         */
-        options.setCleanSession(true);
-        options.setUserName(USER_NAME);
-        options.setPassword(PASS_WORD.toCharArray());
-        /**
-         * 设置超时时间
-         */
-        options.setConnectionTimeout(10);
-        /**
-         * 设置会话心跳时间
-         */
-        options.setKeepAliveInterval(20);
-        try {
-            /**
-             * 设置发布回调
-             */
-            client.setCallback(new PublishCallback());
-            client.connect(options);
-            topic11 = client.getTopic(topic);
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
+    public static MqttTopic getMqttTopic(String topic) {
+        return client.getTopic(topic);
     }
 
     /**
@@ -119,18 +71,17 @@ public class PublishClient {
      * @throws MqttPersistenceException
      * @throws MqttException
      */
-    public void publish(MqttTopic topic, MqttMessage message) throws MqttPersistenceException,
+    public static void publish(MqttTopic topic, MqttMessage message) throws MqttPersistenceException,
             MqttException {
         MqttDeliveryToken token = topic.publish(message);
         token.waitForCompletion();
         System.out.println("message is published completely! " + token.isComplete());
     }
 
-    public static void pull(String sign, String deviceSn) throws MqttException, InterruptedException {
+    public static void pull(String sign, String deviceSn) throws MqttException {
         /**
          * 发布客户端
          */
-        PublishClient server = new PublishClient("heart_" + deviceSn, "heart_" + deviceSn);
         Heartbeat test = new Heartbeat();
         test.setName("HeartResponse");
         Data data = new Data();
@@ -140,11 +91,11 @@ public class PublishClient {
         test.setCode(200);
         test.setSign(sign);
         String s = JSON.toJSONString(test);
-        server.message = new MqttMessage();
+        MqttMessage message = new MqttMessage();
         /**
          * 保证消息能到达一次
          */
-        server.message.setQos(1);
+        message.setQos(1);
         /**
          * 消息保留
          */
@@ -152,11 +103,11 @@ public class PublishClient {
         /**
          * 消息内容
          */
-        server.message.setPayload(s.getBytes());
+        message.setPayload(s.getBytes());
         /**
          * 发布
          */
-        server.publish(server.topic11, server.message);
+        publish(getMqttTopic("heart_" + deviceSn), message);
     }
 
     public static void updateDeviceSn(BaseCameraVersionVo baseCameraVersionVo) {
@@ -166,7 +117,6 @@ public class PublishClient {
         log.info("updateDeviceSn---start");
         for (String code : baseCameraVersionVo.getCameraCodeList()) {
             try {
-                PublishClient server = new PublishClient("update_" + code, "update_" + code);
                 log.info("update_" + code);
                 UpdateDeviceSn updateDeviceSn = new UpdateDeviceSn();
                 updateDeviceSn.setName("UpdateRequest");
@@ -175,11 +125,11 @@ public class PublishClient {
                 updateDeviceSn.setSign(IdUtils.fastSimpleUUID());
                 updateDeviceSn.setUrl(baseCameraVersionVo.getVersionAddress());
                 String s = JSON.toJSONString(updateDeviceSn);
-                server.message = new MqttMessage();
+                MqttMessage message = new MqttMessage();
                 /**
                  * 保证消息能到达一次
                  */
-                server.message.setQos(1);
+                message.setQos(1);
                 /**
                  * 消息保留
                  */
@@ -187,25 +137,24 @@ public class PublishClient {
                 /**
                  * 消息内容
                  */
-                server.message.setPayload(s.getBytes());
+                message.setPayload(s.getBytes());
                 /**
                  * 发布
                  */
-                server.publish(server.topic11, server.message);
+                publish(getMqttTopic("update_" + code), message);
                 log.info("updateDeviceSn---end");
             } catch (MqttException e) {
-                System.out.println("updateDeviceSn-------"+e.getMessage());
+                System.out.println("updateDeviceSn-------" + e.getMessage());
             }
         }
     }
 
-    public static void confidenceCoefficient(BaseCameraVersionVo baseCameraVersionVo,String value) {
+    public static void confidenceCoefficient(BaseCameraVersionVo baseCameraVersionVo, String value) {
         /**
          * 发布客户端
          */
         for (String code : baseCameraVersionVo.getCameraCodeList()) {
             try {
-                PublishClient server = new PublishClient("config_" + code, "config_" + code);
                 JSONObject res = new JSONObject();
                 res.put("Name", "ConfigRequest");
                 Svp svp = new Svp();
@@ -215,39 +164,38 @@ public class PublishClient {
                 Map<String, Object> map = new HashMap<>();
                 map.put("svp", svp);
                 res.put("data", map);
-                res.put("sign", "cien"+IdUtils.fastSimpleUUID());
+                res.put("sign", "cien" + IdUtils.fastSimpleUUID());
                 String s = JSONObject.toJSONString(res);
 
-                server.message = new MqttMessage();
+                MqttMessage message = new MqttMessage();
                 /**
                  * 保证消息能到达一次
                  */
-                server.message.setQos(1);
+                message.setQos(1);
                 /**
                  * 消息保留
                  */
-//                server.message.setRetained(false);
+//               message.setRetained(false);
                 /**
                  * 消息内容
                  */
-                server.message.setPayload(s.getBytes());
+                message.setPayload(s.getBytes());
                 /**
                  * 发布
                  */
-                server.publish(server.topic11, server.message);
+                publish(getMqttTopic("config_" + code), message);
             } catch (MqttException e) {
                 System.out.println(e.getMessage());
             }
         }
     }
 
-    public static void configFrameRate(BaseCameraVersionVo baseCameraVersionVo,String value) {
+    public static void configFrameRate(BaseCameraVersionVo baseCameraVersionVo, String value) {
         /**
          * 发布客户端
          */
         for (String code : baseCameraVersionVo.getCameraCodeList()) {
             try {
-                PublishClient server = new PublishClient("config_" + code, "config_" + code);
                 JSONObject res = new JSONObject();
                 res.put("Name", "ConfigRequest");
                 Codec codec = new Codec();
@@ -257,29 +205,69 @@ public class PublishClient {
                 Map<String, Object> map = new HashMap<>();
                 map.put("codec", codec);
                 res.put("data", map);
-                res.put("sign", "rate"+IdUtils.fastSimpleUUID());
+                res.put("sign", "rate" + IdUtils.fastSimpleUUID());
                 String s = JSONObject.toJSONString(res);
 
-                server.message = new MqttMessage();
+                MqttMessage message = new MqttMessage();
                 /**
                  * 保证消息能到达一次
                  */
-                server.message.setQos(1);
+                message.setQos(1);
                 /**
                  * 消息保留
                  */
-//                server.message.setRetained(false);
+//              message.setRetained(false);
                 /**
                  * 消息内容
                  */
-                server.message.setPayload(s.getBytes());
+                message.setPayload(s.getBytes());
                 /**
                  * 发布
                  */
-                server.publish(server.topic11, server.message);
+                publish(getMqttTopic("config_" + code), message);
             } catch (MqttException e) {
                 System.out.println(e.getMessage());
             }
         }
     }
+
+    @PostConstruct
+    public void init() {
+        HOST = rabbitMqConfig.getHost();
+        USER_NAME = rabbitMqConfig.getUserName();
+        PASS_WORD = rabbitMqConfig.getPassword();
+        clientId = rabbitMqConfig.getClientId();
+        /**
+         * 连接配置
+         */
+        MqttConnectOptions options = new MqttConnectOptions();
+        /**
+         * 不保存,每次重启新client
+         */
+        options.setCleanSession(true);
+        options.setUserName(USER_NAME);
+        options.setPassword(PASS_WORD.toCharArray());
+        /**
+         * 设置超时时间
+         */
+        options.setConnectionTimeout(10);
+        /**
+         * 设置会话心跳时间
+         */
+        options.setKeepAliveInterval(20);
+        try {
+            /**
+             * 设置发布回调
+             */
+            client = new MqttClient(HOST, clientId, new MemoryPersistence());
+
+            client.setCallback(new PublishCallback());
+            client.connect(options);
+            int[] Qos = {1};
+            String[] topic1 = {"config", "update", "heart", "test"};
+            client.subscribe(topic1);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
 }

+ 23 - 0
business-service/src/main/java/com/ozs/service/utils/RabbitMqConfig.java

@@ -0,0 +1,23 @@
+package com.ozs.service.utils;
+
+import lombok.Data;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+@Data
+public class RabbitMqConfig {
+
+
+    @Value("${mqtt.host:tcp://124.70.58.209:1883}")
+    private String host;
+
+    @Value("${mqtt.userName:guest}")
+    private String userName;
+
+    @Value("${mqtt.passWord:guest}")
+    private String password;
+
+    @Value("${mqtt.clientId:HAZARD-CAMERA-CLIENTID-123}")
+    private String clientId;
+}

+ 0 - 94
business-service/src/main/java/com/ozs/service/utils/SubscribeClient.java

@@ -1,94 +0,0 @@
-package com.ozs.service.utils;
-
-import org.eclipse.paho.client.mqttv3.MqttClient;
-import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
-import org.eclipse.paho.client.mqttv3.MqttException;
-import org.eclipse.paho.client.mqttv3.MqttTopic;
-import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.context.annotation.Bean;
-import org.springframework.stereotype.Component;
-
-import java.util.concurrent.ScheduledExecutorService;
-
-/**
- * 订阅客户端
- *
- * @author Administrator
- */
-public class SubscribeClient {
-    /**
-     * mqtt服务器ip
-     */
-    public static  final String HOST="tcp://124.70.58.209:1883";
-    /**
-     * 主题
-     */
-    public String topic;
-    /**
-     * mqtt 客户机ID
-     */
-    private final String clientId;
-    private static final String USER_NAME = "guest";
-    private static final String PASS_WORD = "guest";
-    @SuppressWarnings("unused")
-    private ScheduledExecutorService scheduler;
-
-    public SubscribeClient(String clientId, String topic) {
-        this.clientId = clientId;
-        this.topic = topic;
-    }
-
-    public void start() {
-        try {
-            /**
-             *  host为主机名,clientId即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientId的保存形式,默认为以内存保存
-             */
-            MqttClient client = new MqttClient(HOST, this.clientId, new MemoryPersistence());
-            /**
-             * MQTT的连接设置
-             */
-            MqttConnectOptions options = new MqttConnectOptions();
-            /**
-             *  设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,设置为true表示每次连接到服务器都以新的身份连接
-             */
-            options.setCleanSession(true);
-            /**
-             * 设置连接的用户名
-             */
-            options.setUserName(USER_NAME);
-            /**
-             * 设置连接的密码
-             */
-            options.setPassword(PASS_WORD.toCharArray());
-            /**
-             * 设置超时时间 单位为秒
-             */
-            options.setConnectionTimeout(10);
-            /**
-             *  设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
-             */
-            options.setKeepAliveInterval(20);
-            /**
-             * 设置回调
-             */
-            client.setCallback(new PublishCallback());
-            MqttTopic topic = client.getTopic(this.topic);
-            /**
-             * setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息
-             *  遗嘱
-             */
-            options.setWill(topic, "close".getBytes(), 2, true);
-            client.connect(options);
-            /**
-             * 订阅消息
-             */
-            int[] Qos = {1};
-            String[] topic1 = {this.topic};
-            client.subscribe(topic1, Qos);
-
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-    }
-}

+ 3 - 10
hazard-admin/src/main/java/com/ozs/web/controller/mqtt/MqttController.java

@@ -1,14 +1,7 @@
 package com.ozs.web.controller.mqtt;
 
-import com.ozs.common.annotation.Log;
-import com.ozs.common.core.domain.AjaxResult;
-import com.ozs.common.enums.BusinessType;
-import com.ozs.service.utils.PublishClient;
-import com.ozs.service.utils.SubscribeClient;
 import com.ozs.web.core.util.CameraUtil;
-import io.swagger.annotations.ApiOperation;
 import lombok.extern.slf4j.Slf4j;
-import org.eclipse.paho.client.mqttv3.MqttException;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.ApplicationArguments;
 import org.springframework.boot.ApplicationRunner;
@@ -28,10 +21,10 @@ public class MqttController implements ApplicationRunner {
 
     @Override
     public void run(ApplicationArguments args) throws Exception {
-        new SubscribeClient("heart", "heart").start();
+//        SubscribeClient.start("heart");
     }
-    
-    
+
+
     @GetMapping("/cs")
     public void cs(){
         cameraUtil.cameraBlockUp();