Parcourir la source

MQ代码开发

gao.qiang il y a 2 ans
Parent
commit
11ddc4fed1

+ 15 - 22
business-service/src/main/java/com/ozs/service/utils/PublishCallback.java

@@ -1,13 +1,15 @@
 package com.ozs.service.utils;
 package com.ozs.service.utils;
 
 
 import com.alibaba.fastjson2.JSON;
 import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONArray;
 import com.alibaba.fastjson2.JSONObject;
 import com.alibaba.fastjson2.JSONObject;
 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
 import org.eclipse.paho.client.mqttv3.MqttCallback;
 import org.eclipse.paho.client.mqttv3.MqttCallback;
 import org.eclipse.paho.client.mqttv3.MqttMessage;
 import org.eclipse.paho.client.mqttv3.MqttMessage;
 
 
 /**
 /**
- *  callback回调:
+ * callback回调:
+ *
  * @author Administrator
  * @author Administrator
  */
  */
 public class PublishCallback implements MqttCallback {
 public class PublishCallback implements MqttCallback {
@@ -28,30 +30,21 @@ public class PublishCallback implements MqttCallback {
     //接收已经预订的发布
     //接收已经预订的发布
     @Override
     @Override
     public void messageArrived(String topic, MqttMessage message) throws Exception {
     public void messageArrived(String topic, MqttMessage message) throws Exception {
+        String sign = null;
         // subscribe后得到的消息会执行到这里面
         // subscribe后得到的消息会执行到这里面
         System.out.println("接收消息主题 : " + topic);
         System.out.println("接收消息主题 : " + topic);
         System.out.println("接收消息Qos : " + message.getQos());
         System.out.println("接收消息Qos : " + message.getQos());
+        if ("heart".equals(topic)) {
+            String s = new String(message.getPayload());
+            s = "[" + s + "]";
+            JSONArray jsonArray = JSONArray.parseArray(s);
+            for (int i = 0; i < jsonArray.size(); i++) {
+                JSONObject object = jsonArray.getJSONObject(i);
+                sign = (String) object.get("sign");
+            }
+            PublishClient.pull(sign);
+        }
         System.out.println("接收消息内容 : " + new String(message.getPayload()));
         System.out.println("接收消息内容 : " + new String(message.getPayload()));
-    }
-
-    public static void main(String[] args) {
-        String s=new String("{\n" +
-                "    \"Name\": \"HeartRequest\",\n" +
-                "    \"device_sn\": \"202211010001\" \n" +
-                "    \"rssi\": 1,          \n" +
-                "    \"battery_level\":99,       \n" +
-                "    \"timestamp\": 1610959147, \n" +
-                "    \"language\": 0, \n" +
-                "    \"sw_ver\": \"LuatOS-Air_V3205_RDA8910_RBLFTVT_0xb0000__TW06-530_1.3.1\", \n" +
-                "    \"hw_ver\": \"vLuatOS-Air_V3205_RDA8910_RBLFTVT_0xb0000__TW06-530_1.3.1\", \n" +
-                "    \"stream\": 0, \n" +
-                "    \"model\":[\"Model_Helmet2_V213_P_c69cbfa9d7de4234f76d6f3811986e3a.bovai\",\"model_2\"], \n" +
-                "    \"type\": 0,\n" +
-                "    \"sign\": \"b867d0e545d9abbf74c2d11b62798250\"\n" +
-                "}");
-
-        JSONObject jsonObject =  JSON.parseObject(s);
-        Object sign = jsonObject.get("sign");
-        System.out.println(sign.toString());
+        
     }
     }
 }
 }

+ 102 - 48
business-service/src/main/java/com/ozs/service/utils/PublishClient.java

@@ -11,80 +11,134 @@ import org.eclipse.paho.client.mqttv3.MqttMessage;
 import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
 import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
 import org.eclipse.paho.client.mqttv3.MqttTopic;
 import org.eclipse.paho.client.mqttv3.MqttTopic;
 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.springframework.beans.factory.annotation.Value;
 
 
 
 
 /**
 /**
  * 发布客户端
  * 发布客户端
+ *
  * @author Administrator
  * @author Administrator
  */
  */
 public class PublishClient {
 public class PublishClient {
-    //mqtt服务器地址
-    public static final String HOST = "tcp://124.70.58.209:1883";
-    //主题
-    public static final String TOPIC = "heart_DEVICESN";
-    //mqtt 客户机ID
-    private static final String clientid = "server";
-    private MqttClient client;//客户端实例
-    private MqttTopic topic11;//主题实例
-    private String userName = "guest";  //非必须
-    private String passWord = "guest";  //非必须
+    /**
+     * mqtt服务器地址
+     */
+    public static  final String HOST="tcp://124.70.58.209:1883";
+    /**
+     * 主题
+     */
+    private final String topic;
+    /**
+     * 客户端实例
+     */
+    private final MqttClient client;
+    /**
+     * 主题实例
+     */
+    private MqttTopic topic11;
+    private static final String USER_NAME = "guest";
+    private static final String PASS_WORD = "guest";
     private MqttMessage message;
     private MqttMessage message;
-    //初始化客户端实例
-    public PublishClient() throws MqttException {
-        //MemoryPersistence设置clientid的保存形式,默认为以内存保存
-        client = new MqttClient(HOST, clientid, new MemoryPersistence());
+
+    /**
+     * 初始化客户端实例
+     * @param topic
+     * @param clientId
+     * @throws MqttException
+     */
+    public PublishClient(String topic,String clientId) throws MqttException {
+        /**
+         * mqtt 客户机ID
+         * heart_DEVICESN
+         */
+        this.topic=topic;
+        /**
+         * MemoryPersistence设置clientId的保存形式,默认为以内存保存
+         */
+        client = new MqttClient(HOST, clientId, new MemoryPersistence());
         connect();
         connect();
     }
     }
-    //连接服务器
+
+    /**
+     * 连接服务器
+     */
     private void connect() {
     private void connect() {
-        //连接配置
+        /**
+         * 连接配置
+         */
         MqttConnectOptions options = new MqttConnectOptions();
         MqttConnectOptions options = new MqttConnectOptions();
-        options.setCleanSession(true);//不保存,每次重启新client
-        options.setUserName(userName);
-        options.setPassword(passWord.toCharArray());
-        // 设置超时时间
+        /**
+         * 不保存,每次重启新client
+         */
+        options.setCleanSession(true);
+        options.setUserName(USER_NAME);
+        options.setPassword(PASS_WORD.toCharArray());
+        /**
+         * 设置超时时间
+         */
         options.setConnectionTimeout(10);
         options.setConnectionTimeout(10);
-        // 设置会话心跳时间
+        /**
+         * 设置会话心跳时间
+         */
         options.setKeepAliveInterval(20);
         options.setKeepAliveInterval(20);
         try {
         try {
-            //设置发布回调
+            /**
+             * 设置发布回调
+             */
             client.setCallback(new PublishCallback());
             client.setCallback(new PublishCallback());
             client.connect(options);
             client.connect(options);
-            topic11 = client.getTopic(TOPIC);
+            topic11 = client.getTopic(topic);
         } catch (Exception e) {
         } catch (Exception e) {
             e.printStackTrace();
             e.printStackTrace();
         }
         }
     }
     }
-    //发布
-    public void publish(MqttTopic topic , MqttMessage message) throws MqttPersistenceException,
+
+    /**
+     * 发布
+     * @param topic
+     * @param message
+     * @throws MqttPersistenceException
+     * @throws MqttException
+     */
+    public void publish(MqttTopic topic, MqttMessage message) throws MqttPersistenceException,
             MqttException {
             MqttException {
         MqttDeliveryToken token = topic.publish(message);
         MqttDeliveryToken token = topic.publish(message);
         token.waitForCompletion();
         token.waitForCompletion();
-        System.out.println("message is published completely! "+ token.isComplete());
+        System.out.println("message is published completely! " + token.isComplete());
     }
     }
-    //测试类
-    public static void main(String[] args) throws MqttException, InterruptedException {
-        //发布客户端
-        PublishClient server = new PublishClient();
-        //每隔10s发一条
-        for (;;){
-            Heartbeat test = new Heartbeat();
-            test.setName("HeartResponse");
-            Data data = new Data();
-            data.setAlarm(0);
-            data.setStream(0);
-            test.setData(data);
-            test.setCode(200);
-            test.setSign("b867d0e545d9abbf74c2d11b62798250");
-            String s = JSON.toJSONString(test);
+    
+    public static void pull(String sign) throws MqttException, InterruptedException {
+        /**
+         * 发布客户端
+         */
+        PublishClient server = new PublishClient("heart_DEVICESN","heart_DEVICESN");
+        Heartbeat test = new Heartbeat();
+        test.setName("HeartResponse");
+        Data data = new Data();
+        data.setAlarm(0);
+        data.setStream(0);
+        test.setData(data);
+        test.setCode(200);
+        test.setSign(sign);
+        String s = JSON.toJSONString(test);
 
 
 
 
-            server.message = new MqttMessage();
-            server.message.setQos(1);//保证消息能到达一次
-            server.message.setRetained(true);//消息保留
-            server.message.setPayload(s.getBytes());//消息内容
-            server.publish(server.topic11 , server.message);//发布
-            Thread.sleep(10000);
-        }
+        server.message = new MqttMessage();
+        /**
+         * 保证消息能到达一次
+         */
+        server.message.setQos(1);
+        /**
+         * 消息保留
+         */
+        server.message.setRetained(true);
+        /**
+         * 消息内容
+         */
+        server.message.setPayload(s.getBytes());
+        /**
+         * 发布
+         */
+        server.publish(server.topic11, server.message);
     }
     }
 }
 }

+ 1 - 0
hazard-admin/src/main/java/com/ozs/web/controller/accountmanagment/BaseCameraManagementController.java

@@ -370,6 +370,7 @@ public class BaseCameraManagementController extends BaseController {
         BaseCameraManagement management = baseCameraManagementService.getById(baseCameraManagement.getId());
         BaseCameraManagement management = baseCameraManagementService.getById(baseCameraManagement.getId());
         baseCameraManagement.setUpdateBy(getUserId());
         baseCameraManagement.setUpdateBy(getUserId());
         baseCameraManagement.setCreateBy(management.getCreateBy());
         baseCameraManagement.setCreateBy(management.getCreateBy());
+        baseCameraManagement.setUpdateTime(new Date());
         LambdaQueryWrapper<BaseDeviceDynamicManagement> queryWrapper = new LambdaQueryWrapper<BaseDeviceDynamicManagement>();
         LambdaQueryWrapper<BaseDeviceDynamicManagement> queryWrapper = new LambdaQueryWrapper<BaseDeviceDynamicManagement>();
         if (!ObjectUtils.isEmpty(management.getCameraCode())) {
         if (!ObjectUtils.isEmpty(management.getCameraCode())) {
             queryWrapper.eq(BaseDeviceDynamicManagement::getCameraCode, management.getCameraCode());
             queryWrapper.eq(BaseDeviceDynamicManagement::getCameraCode, management.getCameraCode());

+ 9 - 1
hazard-admin/src/main/java/com/ozs/web/controller/mqtt/MqttController.java

@@ -25,7 +25,15 @@ public class MqttController {
     @GetMapping("/deviceHeartbeat")
     @GetMapping("/deviceHeartbeat")
     @Log(title = "MQTT", businessType = BusinessType.SELECT)
     @Log(title = "MQTT", businessType = BusinessType.SELECT)
     public AjaxResult selectCameraAll() {
     public AjaxResult selectCameraAll() {
-        new SubscribeClient("client_1").start();
+        new SubscribeClient("heart","heart").start();
+        return AjaxResult.success();
+    }
+
+    @ApiOperation(value = "更新固件和算法")
+    @GetMapping("/updateDeviceSn")
+    @Log(title = "MQTT", businessType = BusinessType.UPDATE)
+    public AjaxResult updateDeviceSn() {
+        new SubscribeClient("update_DEVICESN","update_DEVICESN").start();
         return AjaxResult.success();
         return AjaxResult.success();
     }
     }
 }
 }

+ 5 - 0
hazard-admin/src/main/resources/application.yml

@@ -162,3 +162,8 @@ file:
   avatarUrl: /data/service/avatar/
   avatarUrl: /data/service/avatar/
   #文件访问前缀
   #文件访问前缀
   filreUrl: http://124.71.171.71:18878
   filreUrl: http://124.71.171.71:18878
+  
+mqtt:
+  host: tcp://124.70.58.209:1883
+  userName: guest
+  passWord: guest