Browse Source

MQTT接口开发

gao.qiang 1 year atrás
parent
commit
97705f13f1

+ 1 - 2
business-service/src/main/java/com/ozs/service/service/impl/BaseCameraVersionServiceImpl.java

@@ -116,8 +116,7 @@ public class BaseCameraVersionServiceImpl extends ServiceImpl<BaseCameraVersionM
             cameraUpdateLog.setCreateTime(new Date());
             int insert = cameraUpdateLogMapper.insert(cameraUpdateLog);
         }
-        // baseCameraVersionVo
-        PublishClient.updateDeviceSn("update_DEVICESN");
+        PublishClient.updateDeviceSn(baseCameraVersionVo);
         new SubscribeClient("update", "update").start();
         return AjaxResult.success();
     }

+ 34 - 0
business-service/src/main/java/com/ozs/service/utils/PublishCallback.java

@@ -3,16 +3,27 @@ package com.ozs.service.utils;
 import com.alibaba.fastjson2.JSON;
 import com.alibaba.fastjson2.JSONArray;
 import com.alibaba.fastjson2.JSONObject;
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.ozs.service.entity.CameraUpdateLog;
+import com.ozs.service.service.CameraUpdateLogService;
 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
 import org.eclipse.paho.client.mqttv3.MqttCallback;
 import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
 
 /**
  * callback回调:
  *
  * @author Administrator
  */
+@Component
 public class PublishCallback implements MqttCallback {
+    
+    @Autowired
+    CameraUpdateLogService cameraUpdateLogService;
 
     //在断开连接时调用
     @Override
@@ -32,6 +43,7 @@ public class PublishCallback implements MqttCallback {
     public void messageArrived(String topic, MqttMessage message) throws Exception {
         String sign = null;
         String deviceSn=null;
+        String code=null;
         // subscribe后得到的消息会执行到这里面
         System.out.println("接收消息主题 : " + topic);
         System.out.println("接收消息Qos : " + message.getQos());
@@ -46,6 +58,28 @@ public class PublishCallback implements MqttCallback {
             }
             PublishClient.pull(sign,deviceSn);
         }
+        if ("update".equals(topic)) {
+            String s = new String(message.getPayload());
+            s = "[" + s + "]";
+            JSONArray jsonArray = JSONArray.parseArray(s);
+            for (int i = 0; i < jsonArray.size(); i++) {
+                JSONObject object = jsonArray.getJSONObject(i);
+                code = (String) object.get("code");
+                deviceSn = (String) object.get("device_sn");
+            }
+            LambdaQueryWrapper<CameraUpdateLog> wrapper = new LambdaQueryWrapper<>();
+            wrapper.eq(CameraUpdateLog::getCameraCode, deviceSn);
+            wrapper.orderByDesc(CameraUpdateLog::getCreateTime);
+            List<CameraUpdateLog> list = cameraUpdateLogService.list();
+            CameraUpdateLog cameraUpdateLog = list.get(0);
+            if ("200".equals(code)){
+                cameraUpdateLog.setStatus(1);
+                cameraUpdateLogService.updateById(cameraUpdateLog);
+            }else if ("400".equals(code)){
+                cameraUpdateLog.setStatus(3);
+                cameraUpdateLogService.updateById(cameraUpdateLog);
+            }
+        }
         System.out.println("接收消息内容 : " + new String(message.getPayload()));
         
     }

+ 35 - 30
business-service/src/main/java/com/ozs/service/utils/PublishClient.java

@@ -2,6 +2,7 @@ package com.ozs.service.utils;
 
 import com.alibaba.fastjson2.JSON;
 import com.ozs.common.utils.sign.Md5Utils;
+import com.ozs.service.entity.vo.BaseCameraVersionVo;
 import com.ozs.service.entity.vo.Data;
 import com.ozs.service.entity.vo.Heartbeat;
 import com.ozs.service.entity.vo.UpdateDeviceSn;
@@ -15,6 +16,8 @@ import org.eclipse.paho.client.mqttv3.MqttTopic;
 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 import org.springframework.beans.factory.annotation.Value;
 
+import java.util.UUID;
+
 
 /**
  * 发布客户端
@@ -131,7 +134,7 @@ public class PublishClient {
         /**
          * 消息保留
          */
-        server.message.setRetained(true);
+//        server.message.setRetained(false);
         /**
          * 消息内容
          */
@@ -142,38 +145,40 @@ public class PublishClient {
         server.publish(server.topic11, server.message);
     }
 
-    public static void updateDeviceSn(String sign){
+    public static void updateDeviceSn(BaseCameraVersionVo baseCameraVersionVo){
         /**
          * 发布客户端
          */
-        try {
-            PublishClient server = new PublishClient(sign,sign);
-            UpdateDeviceSn updateDeviceSn = new UpdateDeviceSn();
-            updateDeviceSn.setName("UpdateRequest");
-            updateDeviceSn.setType(0);
-            updateDeviceSn.setMd5("b867d0e545d9abbf74c2d11b62798250");
-            updateDeviceSn.setSign("b867d0e545d9abbf74c2d11b62798250");
-            updateDeviceSn.setUrl("https://example.com/download/Model_Helmet2_V213_P_c69cbfa9d7de4234f76d6f3811986e3a.bovai");
-            String s = JSON.toJSONString(updateDeviceSn);
-            server.message = new MqttMessage();
-            /**
-             * 保证消息能到达一次
-             */
-            server.message.setQos(1);
-            /**
-             * 消息保留
-             */
-            server.message.setRetained(true);
-            /**
-             * 消息内容
-             */
-            server.message.setPayload(s.getBytes());
-            /**
-             * 发布
-             */
-            server.publish(server.topic11, server.message);
-        } catch (MqttException e) {
-            System.out.println(e.getMessage());
+        for (String code : baseCameraVersionVo.getCameraCodeList()) {
+            try {
+                PublishClient server = new PublishClient("update_" + code, "update_" + code);
+                UpdateDeviceSn updateDeviceSn = new UpdateDeviceSn();
+                updateDeviceSn.setName("UpdateRequest");
+                updateDeviceSn.setType(Integer.valueOf(baseCameraVersionVo.getUpgradeType()));
+                updateDeviceSn.setMd5(baseCameraVersionVo.getMd5());
+                updateDeviceSn.setSign(UUID.randomUUID().toString());
+                updateDeviceSn.setUrl(baseCameraVersionVo.getVersionAddress());
+                String s = JSON.toJSONString(updateDeviceSn);
+                server.message = new MqttMessage();
+                /**
+                 * 保证消息能到达一次
+                 */
+                server.message.setQos(1);
+                /**
+                 * 消息保留
+                 */
+//                server.message.setRetained(false);
+                /**
+                 * 消息内容
+                 */
+                server.message.setPayload(s.getBytes());
+                /**
+                 * 发布
+                 */
+                server.publish(server.topic11, server.message);
+            } catch (MqttException e) {
+                System.out.println(e.getMessage());
+            }
         }
     }
 }

+ 1 - 9
hazard-admin/src/main/java/com/ozs/web/controller/mqtt/MqttController.java

@@ -27,13 +27,5 @@ public class MqttController {
         new SubscribeClient("heart", "heart").start();
         return AjaxResult.success();
     }
-
-    @ApiOperation(value = "更新固件和算法")
-    @GetMapping("/updateDeviceSn")
-    @Log(title = "MQTT", businessType = BusinessType.UPDATE)
-    public AjaxResult updateDeviceSn() {
-        PublishClient.updateDeviceSn("update_DEVICESN");
-        new SubscribeClient("update", "update").start();
-        return AjaxResult.success();
-    }
+    
 }