Browse Source

MQ队列开发

gao.qiang 1 năm trước cách đây
mục cha
commit
24e53de65e

+ 11 - 0
business-service/src/main/java/com/ozs/service/entity/vo/Codec.java

@@ -0,0 +1,11 @@
+package com.ozs.service.entity.vo;
+
+import lombok.Data;
+
+/**
+ * @author Administrator
+ */
+@Data
+public class Codec {
+    private Venc0 venc0;
+}

+ 11 - 0
business-service/src/main/java/com/ozs/service/entity/vo/Venc0.java

@@ -0,0 +1,11 @@
+package com.ozs.service.entity.vo;
+
+import lombok.Data;
+
+/**
+ * @author Administrator
+ */
+@Data
+public class Venc0 {
+    private Double fps;
+}

+ 12 - 0
business-service/src/main/java/com/ozs/service/service/impl/BaseCameraVersionServiceImpl.java

@@ -139,6 +139,18 @@ public class BaseCameraVersionServiceImpl extends ServiceImpl<BaseCameraVersionM
             cameraLogDetail.setUpdateTime(new Date());
             int insert = cameraLogDetailMapper.insert(cameraLogDetail);
         }
+        if ("3".equals(baseCameraVersionVo.getUpgradeType())){
+            List<BaseVersionCameraParam> baseVersionCameraParamList = baseCameraVersionVo.getBaseVersionCameraParamList();
+            for (BaseVersionCameraParam baseVersionCameraParam : baseVersionCameraParamList) {
+                if ("threshold".equals(baseVersionCameraParam.getKey())){
+                    PublishClient.confidenceCoefficient(baseCameraVersionVo,baseVersionCameraParam.getValue());
+                    new SubscribeClient("config", "config").start();
+                }else if("fps".equals(baseVersionCameraParam.getKey())){
+                    PublishClient.configFrameRate(baseCameraVersionVo,baseVersionCameraParam.getValue());
+                    new SubscribeClient("config", "config").start();
+                }
+            }
+        }
         PublishClient.updateDeviceSn(baseCameraVersionVo);
         new SubscribeClient("update", "update").start();
         return AjaxResult.success();

+ 10 - 0
business-service/src/main/java/com/ozs/service/utils/PublishCallback.java

@@ -66,6 +66,16 @@ public class PublishCallback implements MqttCallback {
                 deviceSn = (String) object.get("device_sn");
             }
             CallbackUtil.callback(deviceSn,code);
+        }if ("config".equals(topic)){
+            String s = new String(message.getPayload());
+            s = "[" + s + "]";
+            JSONArray jsonArray = JSONArray.parseArray(s);
+            for (int i = 0; i < jsonArray.size(); i++) {
+                JSONObject object = jsonArray.getJSONObject(i);
+                code = (Integer) object.get("code");
+                deviceSn = (String) object.get("device_sn");
+            }
+            CallbackUtil.callback(deviceSn,code);
         }
         System.out.println("接收消息内容 : " + new String(message.getPayload()));
         

+ 78 - 32
business-service/src/main/java/com/ozs/service/utils/PublishClient.java

@@ -5,11 +5,13 @@ import com.alibaba.fastjson2.JSONObject;
 import com.ozs.common.utils.sign.Md5Utils;
 import com.ozs.common.utils.stateSecrets.SM4Utils;
 import com.ozs.service.entity.vo.BaseCameraVersionVo;
+import com.ozs.service.entity.vo.Codec;
 import com.ozs.service.entity.vo.Data;
 import com.ozs.service.entity.vo.Heartbeat;
 import com.ozs.service.entity.vo.Param;
 import com.ozs.service.entity.vo.Svp;
 import com.ozs.service.entity.vo.UpdateDeviceSn;
+import com.ozs.service.entity.vo.Venc0;
 import org.eclipse.paho.client.mqttv3.MqttClient;
 import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
 import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
@@ -191,43 +193,87 @@ public class PublishClient {
         }
     }
 
-    public static void confidenceCoefficient(String code) {
+    public static void confidenceCoefficient(BaseCameraVersionVo baseCameraVersionVo,String value) {
         /**
          * 发布客户端
          */
-        try {
-            PublishClient server = new PublishClient("config_" + code, "config_" + code);
-            JSONObject res = new JSONObject();
-            res.put("Name", "ConfigRequest");
-            Svp svp=new Svp();
-            Param param = new Param();
-            param.setThreshold(0.700000);
-            svp.setParam(param);
-            Map<String,Object> map=new HashMap<>();
-            map.put("svp",svp);
-            res.put("data",map);
-            res.put("sign",UUID.randomUUID().toString());
-            String s = JSONObject.toJSONString(res);
+        for (String code : baseCameraVersionVo.getCameraCodeList()) {
+            try {
+                PublishClient server = new PublishClient("config_" + code, "config_" + code);
+                JSONObject res = new JSONObject();
+                res.put("Name", "ConfigRequest");
+                Svp svp = new Svp();
+                Param param = new Param();
+                param.setThreshold(Double.valueOf(value));
+                svp.setParam(param);
+                Map<String, Object> map = new HashMap<>();
+                map.put("svp", svp);
+                res.put("data", map);
+                res.put("sign", UUID.randomUUID().toString());
+                String s = JSONObject.toJSONString(res);
 
-            server.message = new MqttMessage();
-            /**
-             * 保证消息能到达一次
-             */
-            server.message.setQos(1);
-            /**
-             * 消息保留
-             */
+                server.message = new MqttMessage();
+                /**
+                 * 保证消息能到达一次
+                 */
+                server.message.setQos(1);
+                /**
+                 * 消息保留
+                 */
 //                server.message.setRetained(false);
-            /**
-             * 消息内容
-             */
-            server.message.setPayload(s.getBytes());
-            /**
-             * 发布
-             */
-            server.publish(server.topic11, server.message);
-        } catch (MqttException e) {
-            System.out.println(e.getMessage());
+                /**
+                 * 消息内容
+                 */
+                server.message.setPayload(s.getBytes());
+                /**
+                 * 发布
+                 */
+                server.publish(server.topic11, server.message);
+            } catch (MqttException e) {
+                System.out.println(e.getMessage());
+            }
+        }
+    }
+
+    public static void configFrameRate(BaseCameraVersionVo baseCameraVersionVo,String value) {
+        /**
+         * 发布客户端
+         */
+        for (String code : baseCameraVersionVo.getCameraCodeList()) {
+            try {
+                PublishClient server = new PublishClient("config_" + code, "config_" + code);
+                JSONObject res = new JSONObject();
+                res.put("Name", "ConfigRequest");
+                Codec codec = new Codec();
+                Venc0 venc0 = new Venc0();
+                venc0.setFps(Double.valueOf(value));
+                codec.setVenc0(venc0);
+                Map<String, Object> map = new HashMap<>();
+                map.put("codec", codec);
+                res.put("data", map);
+                res.put("sign", UUID.randomUUID().toString());
+                String s = JSONObject.toJSONString(res);
+
+                server.message = new MqttMessage();
+                /**
+                 * 保证消息能到达一次
+                 */
+                server.message.setQos(1);
+                /**
+                 * 消息保留
+                 */
+//                server.message.setRetained(false);
+                /**
+                 * 消息内容
+                 */
+                server.message.setPayload(s.getBytes());
+                /**
+                 * 发布
+                 */
+                server.publish(server.topic11, server.message);
+            } catch (MqttException e) {
+                System.out.println(e.getMessage());
+            }
         }
     }
 }

+ 0 - 9
hazard-admin/src/main/java/com/ozs/web/controller/mqtt/MqttController.java

@@ -23,15 +23,6 @@ import org.springframework.web.bind.annotation.RestController;
 public class MqttController implements ApplicationRunner {
     
 
-    @ApiOperation(value = "置信度修改")
-    @GetMapping("/confidenceCoefficient")
-    @Log(title = "MQTT", businessType = BusinessType.SELECT)
-    public AjaxResult confidenceCoefficient() {
-        PublishClient.confidenceCoefficient("42010001541320000024");
-        new SubscribeClient("config", "config").start();
-        return AjaxResult.success();
-    }
-
     @Override
     public void run(ApplicationArguments args) throws Exception {
         new SubscribeClient("heart", "heart").start();