Explorar el Código

第二版,安装bug

suntianwu hace 3 años
padre
commit
b4531d6494

+ 5 - 0
src/main/java/com/care/common/constant/Constants.java

@@ -20,4 +20,9 @@ public interface Constants {
      * Mqtt 延迟消息KEY
      */
     String MQTT_MSG_LAG_KEY = "mqttMsgLagKey";
+
+    /**
+     * 设备激活返回消息超时 KEY
+     */
+    String MQTT_DEV_JIHUO_TIMEOUT_KEY = "mqttDevJihuoTimeoutKey";
 }

+ 4 - 1
src/main/java/com/care/common/enums/InstallDevStatusEnum.java

@@ -14,7 +14,8 @@ public enum InstallDevStatusEnum {
      */
     DOING("0","处理中"),
     SUCCESS("1","成功"),
-    FAIL("2","失败");
+    FAIL("2","失败"),
+    TIMEOUT("3","超时");
     private String value;
     private String name;
     InstallDevStatusEnum(String value, String name) {
@@ -35,6 +36,8 @@ public enum InstallDevStatusEnum {
             return SUCCESS.name;
         }else if (FAIL.getValue().equals(code)){
             return FAIL.name;
+        }else if (TIMEOUT.getValue().equals(code)){
+            return TIMEOUT.name;
         }
 
         return null;

+ 34 - 17
src/main/java/com/care/installation/service/RoomService.java

@@ -5,6 +5,8 @@ import cn.hutool.core.bean.BeanUtil;
 import cn.hutool.core.collection.CollUtil;
 import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
 import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
+import com.care.common.cache.RedisUtil;
+import com.care.common.constant.Constants;
 import com.care.common.entity.*;
 import com.care.common.enums.DeviceActiveStatusEnum;
 import com.care.common.enums.DeviceStatusEnum;
@@ -40,6 +42,8 @@ public class RoomService {
     private MqttPublishService mqttPublishService;
     @Resource
     private MqttConnectorPoolService mqttConnectorPoolService;
+    @Resource
+    private RedisUtil redisUtil;
 
     public  Map<String,Boolean> getConfStatusInfo(String devCode) {
         Map<String,Boolean> map = new HashMap<>();
@@ -270,26 +274,26 @@ public class RoomService {
             updateWrapper.lambda().eq(CareRoom::getRoomId,devCode)
             .set(CareRoom::getAck,null);
             careRoomService.update(updateWrapper);
-            mqttPublishService.publish(careDevice, getRoomMqttData(roomVO));
+            mqttPublishService.publish(careDevice, getRoomMqttData(roomVO),devCode,"careRoom");
 
             UpdateWrapper<CareRoomSensor> updateWrapper2 = new UpdateWrapper<>();
             updateWrapper2.lambda().eq(CareRoomSensor::getRoomId,devCode)
                     .set(CareRoomSensor::getAck,null);
             careRoomSensorService.update(updateWrapper2);
-            mqttPublishService.publish(careDevice, getRoomSensorMqttData(roomSensorVO));
+            mqttPublishService.publish(careDevice, getRoomSensorMqttData(roomSensorVO),devCode,"careRoomSensor");
 
             UpdateWrapper<CareRoomGate> updateWrapper3 = new UpdateWrapper<>();
             updateWrapper3.lambda().eq(CareRoomGate::getRoomId,devCode)
                     .set(CareRoomGate::getAck,null);
             careRoomGateService.update(updateWrapper3);
-            mqttPublishService.publish(careDevice, getRoomGateVOMqttData(roomGateVOList));
+            mqttPublishService.publish(careDevice, getRoomGateVOMqttData(roomGateVOList), devCode,"careRoomGate");
 
             if (CollUtil.isNotEmpty(roomRegionVOList)){
                 UpdateWrapper<CareRoomRegion> updateWrapper4 = new UpdateWrapper<>();
                 updateWrapper4.lambda().eq(CareRoomRegion::getRoomId,devCode)
                         .set(CareRoomRegion::getAck,null);
                 careRoomRegionService.update(updateWrapper4);
-                mqttPublishService.publish(careDevice, getRoomRegionVOMqttData(roomRegionVOList));
+                mqttPublishService.publish(careDevice, getRoomRegionVOMqttData(roomRegionVOList),devCode,"careRoomRegion");
             }
 
             return true;
@@ -313,7 +317,9 @@ public class RoomService {
         CareRoom careRoom = careRoomService.getOne(queryWrapper2);
 
         int careRoomResult = 0;
-        if (careRoom == null || (StringUtils.isNotEmpty(careRoom.getAck()) && !"true".equals(careRoom.getAck()))) {
+        if (redisUtil.get(Constants.MQTT_DEV_JIHUO_TIMEOUT_KEY + ":" + devCode + ":" + "careRoom") == null) {
+            careRoomResult = 3;
+        } else if (careRoom == null || (StringUtils.isNotEmpty(careRoom.getAck()) && !"true".equals(careRoom.getAck()))) {
             careRoomResult = 2;
         } else if (StringUtils.isEmpty(careRoom.getAck())) {
             careRoomResult = 0;
@@ -326,7 +332,9 @@ public class RoomService {
         CareRoomSensor careRoomSensor = careRoomSensorService.getOne(queryWrapper3);
 
         int careRoomSensorResult = 0;
-        if (careRoomSensor == null || (StringUtils.isNotEmpty(careRoomSensor.getAck()) && !"true".equals(careRoomSensor.getAck()))) {
+        if (redisUtil.get(Constants.MQTT_DEV_JIHUO_TIMEOUT_KEY + ":" + devCode + ":" + "careRoomSensor") == null) {
+            careRoomSensorResult = 3;
+        } else if (careRoomSensor == null || (StringUtils.isNotEmpty(careRoomSensor.getAck()) && !"true".equals(careRoomSensor.getAck()))) {
             careRoomSensorResult = 2;
         }
         if (StringUtils.isEmpty(careRoomSensor.getAck())) {
@@ -339,9 +347,11 @@ public class RoomService {
         queryWrapper4.lambda().eq(CareRoomGate::getRoomId,devCode);
         List<CareRoomGate> careRoomGateList = careRoomGateService.list(queryWrapper4);
         int careRoomGateResult = 0;
-        if(CollUtil.isNotEmpty(careRoomGateList)){
+        if (redisUtil.get(Constants.MQTT_DEV_JIHUO_TIMEOUT_KEY + ":" + devCode + ":" + "careRoomGate") == null) {
+            careRoomGateResult = 3;
+        } else if(CollUtil.isNotEmpty(careRoomGateList)){
             CareRoomGate careRoomGate = careRoomGateList.get(0);
-            if(careRoomGate == null|| (StringUtils.isNotEmpty(careRoomGate.getAck()) && !"true".equals(careRoomGate.getAck()))) {
+            if(careRoomGate == null || (StringUtils.isNotEmpty(careRoomGate.getAck()) && !"true".equals(careRoomGate.getAck()))) {
                 careRoomGateResult = 2;
             } else if(StringUtils.isEmpty(careRoomGate.getAck())) {
                 careRoomGateResult = 0;
@@ -356,7 +366,9 @@ public class RoomService {
         queryWrapper5.lambda().eq(CareRoomRegion::getRoomId,devCode);
         List<CareRoomRegion> careRoomRegionList = careRoomRegionService.list(queryWrapper5);
         int careRoomRegionResult = 0;
-        if(CollUtil.isNotEmpty(careRoomRegionList)){
+        if (redisUtil.get(Constants.MQTT_DEV_JIHUO_TIMEOUT_KEY + ":" + devCode + ":" + "careRoomRegion") == null) {
+            careRoomRegionResult = 3;
+        } else if(CollUtil.isNotEmpty(careRoomRegionList)){
             CareRoomRegion careRoomRegion = careRoomRegionList.get(0);
             if (careRoomRegion == null|| (StringUtils.isNotEmpty(careRoomRegion.getAck()) && !"true".equals(careRoomRegion.getAck()))) {
                 careRoomRegionResult = 2;
@@ -370,19 +382,24 @@ public class RoomService {
         }
 
          if (careRoomResult == 2 || careRoomSensorResult == 2 || careRoomGateResult == 2 || careRoomRegionResult == 2) { //失败
-            //设备离线
-            careDevice.setStatus(DeviceStatusEnum.OFFLINE.getValue());
-            mqttConnectorPoolService.removeOne(careDevice);
-            //未激活
-            careDevice.setActiveStatus(DeviceActiveStatusEnum.NOACTIVE.getValue());
-            careDeviceService.updateById(careDevice);
+//            //设备离线
+//            careDevice.setStatus(DeviceStatusEnum.OFFLINE.getValue());
+//            mqttConnectorPoolService.removeOne(careDevice);
+//            //未激活
+//            careDevice.setActiveStatus(DeviceActiveStatusEnum.NOACTIVE.getValue());
+//            careDeviceService.updateById(careDevice);
 
             //去除publish的客户端监听
             careDevice.setClientId(careDevice.getClientId() +"_publish");
             mqttConnectorPoolService.removeOne(careDevice);
             return InstallDevStatusEnum.FAIL.getValue();
-        } else if (careRoomResult == 0 || careRoomSensorResult == 0 || careRoomGateResult == 0 || careRoomRegionResult == 0){ //处理中
-            return InstallDevStatusEnum.DOING.getValue();
+        }  else if (careRoomResult == 3 || careRoomSensorResult == 3 || careRoomGateResult == 3 || careRoomRegionResult == 3){ //超时
+             //去除publish的客户端监听
+             careDevice.setClientId(careDevice.getClientId() +"_publish");
+             mqttConnectorPoolService.removeOne(careDevice);
+             return InstallDevStatusEnum.TIMEOUT.getValue();
+         }  else if (careRoomResult == 0 || careRoomSensorResult == 0 || careRoomGateResult == 0 || careRoomRegionResult == 0){ //处理中
+             return InstallDevStatusEnum.DOING.getValue();
         }  else { //成功
             //激活
             careDevice.setActiveStatus(DeviceActiveStatusEnum.ACTIVE.getValue());

+ 11 - 2
src/main/java/com/care/mqtt/service/MqttConnectorPoolService.java

@@ -39,13 +39,22 @@ public class MqttConnectorPoolService {
      * 添加一个设备到服务
      */
     public MqttDataConnector addOne(CareDevice careDevice) {
+
         if (StringUtils.isEmpty(careDevice.getTopic()) || StringUtils.isEmpty(careDevice.getClientId())) {
             return null;
         }
-        MqttDataConnector xbMqttDataConnector = new MqttDataConnector();
+
+        String cliendId = careDevice.getClientId();
+
+        MqttDataConnector xbMqttDataConnector = MqttDataConnectorPool.getMqttDataConnectorMap().get(cliendId);
+        if ( xbMqttDataConnector != null) {
+            return xbMqttDataConnector;
+        }
+
+        xbMqttDataConnector = new MqttDataConnector();
         Map<String, String> configParams = new HashMap<>();
         String host = "ws://" + CommonConfUtil.getConf("mqtt.broker") + ":" + CommonConfUtil.getConf("mqtt.port");
-        String cliendId = careDevice.getClientId();
+
         String username = CommonConfUtil.getConf("mqtt.username");
         String password = CommonConfUtil.getConf("mqtt.password");
         String topic = careDevice.getTopic();

+ 13 - 8
src/main/java/com/care/mqtt/service/MqttPublishService.java

@@ -2,6 +2,9 @@ package com.care.mqtt.service;
 
 
 import cn.hutool.core.bean.BeanUtil;
+import com.care.common.cache.RedisUtil;
+import com.care.common.constant.Constants;
+import com.care.common.util.CommonConfUtil;
 import com.care.mqtt.tool.MqttDataConnector;
 
 import com.care.common.entity.CareDevice;
@@ -11,6 +14,7 @@ import org.springframework.util.StringUtils;
 
 import javax.annotation.Resource;
 
+
 /**
  * @Author: stw
  * @Date: 2021/8/14
@@ -20,12 +24,13 @@ import javax.annotation.Resource;
 public class MqttPublishService {
     @Resource
     private MqttConnectorPoolService mqttConnectorPoolService;
-
+    @Resource
+    private RedisUtil redisUtil;
 
     /**
      * 发布消息
      */
-    public boolean publish(CareDevice careDevice,String data) {
+    public boolean publish(CareDevice careDevice,String data,String devCode,String roomType) {
         try {
 
             CareDevice careDevicePublish = new CareDevice();
@@ -40,15 +45,15 @@ public class MqttPublishService {
             String publishTopic = topic.replace("event","control");
             String publishCliendId = cliendId + "_publish";
 
-            MqttDataConnector mqttDataConnector = MqttDataConnectorPool.getMqttDataConnectorMap().get(publishCliendId);
-            if (mqttDataConnector == null) {
-                careDevicePublish.setTopic(publishTopic);
-                careDevicePublish.setClientId(publishCliendId);
-                mqttDataConnector = mqttConnectorPoolService.addOne(careDevicePublish);
-            }
+            careDevicePublish.setTopic(publishTopic);
+            careDevicePublish.setClientId(publishCliendId);
+            MqttDataConnector mqttDataConnector = mqttConnectorPoolService.addOne(careDevicePublish);
 
             mqttDataConnector.publish(publishTopic, data);
 
+            //往redis里设置一个超时时钟,超时之后还没返回就认为超时了
+            Long timeout = Long.parseLong(CommonConfUtil.getConf("mqtt.dev.jihuo.timeout"));
+            redisUtil.set(Constants.MQTT_DEV_JIHUO_TIMEOUT_KEY + ":" + devCode + ":" + roomType,"", timeout);
             return true;
         } catch (Exception e) {
             e.printStackTrace();

+ 3 - 1
src/main/resources/common.properties

@@ -14,4 +14,6 @@ mqtt.port=8083
 mqtt.username=test
 mqtt.password=public
 #是否开启mqtt消息接受服务,开启:1,不开启:0
-mqtt.on=1
+mqtt.on=1
+#设备激活返回消息超时(秒)
+mqtt.dev.jihuo.timeout=5