Procházet zdrojové kódy

二期 频安宝bug

suntianwu před 3 roky
rodič
revize
77383abb2b

+ 21 - 6
src/main/java/com/care/bms/service/DeviceService.java

@@ -10,6 +10,7 @@ import com.care.common.entity.*;
 import com.care.common.enums.*;
 import com.care.common.exception.BDException;
 import com.care.common.service.*;
+import com.care.common.util.CommonConfUtil;
 import com.care.common.util.MyBeanUtils;
 import com.care.common.vo.PageReqVO;
 import com.care.common.vo.UserLogindConvertVO;
@@ -26,6 +27,7 @@ import javax.annotation.Resource;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
+import java.util.UUID;
 
 /**
  * @Author: lilt
@@ -148,9 +150,14 @@ public class DeviceService {
         device.setCreateTime(new Date());
         device.setOrgId(loginUser.getOrgId());
 
-        device.setTopic("5JPD/monitor/"+device.getDevCode()+"/event");
-        Long time = System.currentTimeMillis();
-        device.setClientId(HexUtil.toHex(time));
+        String topic = CommonConfUtil.getConf("mqtt.topic.work.template").replace("XXXXX",device.getDevCode());
+        device.setTopic(topic);
+
+        String topicInstall = CommonConfUtil.getConf("mqtt.topic.install.template").replace("XXXXX",device.getDevCode());
+        device.setTopicInstall(topicInstall);
+
+        device.setClientId(UUID.randomUUID().toString());
+        device.setClientIdInstall(UUID.randomUUID().toString());
 
         this.careDeviceService.save(device);
         //保存完后,启动通道
@@ -214,12 +221,20 @@ public class DeviceService {
 
         //mqtt
         if(StringUtils.isEmpty(device.getTopic())){
-            device.setTopic("5JPD/monitor/"+device.getDevCode()+"/event");
+            String topic = CommonConfUtil.getConf("mqtt.topic.work.template").replace("XXXXX",device.getDevCode());
+            device.setTopic(topic);
+        }
+        if(StringUtils.isEmpty(device.getTopicInstall())){
+            String topicInstall = CommonConfUtil.getConf("mqtt.topic.install.template").replace("XXXXX",device.getDevCode());
+            device.setTopicInstall(topicInstall);
         }
         if(StringUtils.isEmpty(device.getClientId())){
-            Long time = System.currentTimeMillis();
-            device.setClientId(HexUtil.toHex(time));
+            device.setClientId(UUID.randomUUID().toString());
         }
+        if(StringUtils.isEmpty(device.getClientIdInstall())){
+            device.setClientIdInstall(UUID.randomUUID().toString());
+        }
+
         device.setModifyTime(new Date());
         this.careDeviceService.updateById(device);
 

+ 9 - 2
src/main/java/com/care/common/entity/CareDevice.java

@@ -130,15 +130,22 @@ public class CareDevice implements Serializable {
     private Integer heartRate;
 
 
-    @ApiModelProperty("设备mqtt消息topic")
+    @ApiModelProperty("设备工作时接受mqtt消息topic")
     @TableField("TOPIC")
     private String topic;
 
+    @ApiModelProperty("设备安装时发布房间信息的topic")
+    @TableField("TOPIC_INSTALL")
+    private String topicInstall;
 
-    @ApiModelProperty("设备mqtt消息客户端ID")
+    @ApiModelProperty("设备工作时的mqtt的客户端ID")
     @TableField("CLIENT_ID")
     private String clientId;
 
+    @ApiModelProperty("设备安装时的mqtt的客户端ID")
+    @TableField("CLIENT_ID_INSTALL")
+    private String clientIdInstall;
+
     @ApiModelProperty("备注")
     @TableField("REMARK")
     private String remark;

+ 7 - 2
src/main/java/com/care/common/vo/device/DeviceVO.java

@@ -105,13 +105,18 @@ public class DeviceVO {
     private Integer heartRate;
 
 
-    @ApiModelProperty("设备mqtt消息topic")
+    @ApiModelProperty("设备工作时接受mqtt消息topic")
     private String topic;
 
+    @ApiModelProperty("设备安装时发布房间信息的topic")
+    private String topicInstall;
 
-    @ApiModelProperty("设备mqtt消息客户端ID")
+    @ApiModelProperty("设备工作时的mqtt的客户端ID")
     private String clientId;
 
+    @ApiModelProperty("设备安装时的mqtt的客户端ID")
+    private String clientIdInstall;
+
     @ApiModelProperty("备注")
     private String remark;
 

+ 17 - 0
src/main/java/com/care/installation/service/DevInstallService.java

@@ -7,6 +7,7 @@ import com.care.common.entity.*;
 
 import com.care.common.service.*;
 
+import com.care.common.util.CommonConfUtil;
 import com.care.installation.vo.*;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
@@ -14,6 +15,7 @@ import org.springframework.stereotype.Service;
 
 import javax.annotation.Resource;
 import java.util.Date;
+import java.util.UUID;
 
 @Slf4j
 @Service
@@ -116,6 +118,21 @@ public class DevInstallService {
                 careDevice.setHouseId(careHouse.getId());
             }
         }
+        //mqtt
+        if(StringUtils.isEmpty(careDevice.getTopic())){
+            String topic = CommonConfUtil.getConf("mqtt.topic.work.template").replace("XXXXX",careDevice.getDevCode());
+            careDevice.setTopic(topic);
+        }
+        if(StringUtils.isEmpty(careDevice.getTopicInstall())){
+            String topicInstall = CommonConfUtil.getConf("mqtt.topic.install.template").replace("XXXXX",careDevice.getDevCode());
+            careDevice.setTopicInstall(topicInstall);
+        }
+        if(StringUtils.isEmpty(careDevice.getClientId())){
+            careDevice.setClientId(UUID.randomUUID().toString());
+        }
+        if(StringUtils.isEmpty(careDevice.getClientIdInstall())){
+            careDevice.setClientIdInstall(UUID.randomUUID().toString());
+        }
 
         return careDeviceService.updateById(careDevice);
     }

+ 6 - 8
src/main/java/com/care/installation/service/RoomService.java

@@ -14,6 +14,7 @@ import com.care.common.enums.InstallDevStatusEnum;
 import com.care.common.service.*;
 import com.care.common.util.JsonUtil;
 import com.care.installation.vo.*;
+import com.care.mqtt.service.MqttConnectorInstallPoolService;
 import com.care.mqtt.service.MqttConnectorPoolService;
 import com.care.mqtt.service.MqttPublishService;
 import lombok.extern.slf4j.Slf4j;
@@ -41,7 +42,7 @@ public class RoomService {
     @Resource
     private MqttPublishService mqttPublishService;
     @Resource
-    private MqttConnectorPoolService mqttConnectorPoolService;
+    private MqttConnectorInstallPoolService mqttConnectorInstallPoolService;
     @Resource
     private RedisUtil redisUtil;
 
@@ -390,13 +391,11 @@ public class RoomService {
 //            careDeviceService.updateById(careDevice);
 
             //去除publish的客户端监听
-            careDevice.setClientId(careDevice.getClientId() +"_publish");
-            mqttConnectorPoolService.removeOne(careDevice);
+             mqttConnectorInstallPoolService.removeInstallOne(careDevice);
             return InstallDevStatusEnum.FAIL.getValue();
         }  else if (careRoomResult == 3 || careRoomSensorResult == 3 || careRoomGateResult == 3 || careRoomRegionResult == 3){ //超时
              //去除publish的客户端监听
-             careDevice.setClientId(careDevice.getClientId() +"_publish");
-             mqttConnectorPoolService.removeOne(careDevice);
+             mqttConnectorInstallPoolService.removeInstallOne(careDevice);
              return InstallDevStatusEnum.TIMEOUT.getValue();
          }  else if (careRoomResult == 0 || careRoomSensorResult == 0 || careRoomGateResult == 0 || careRoomRegionResult == 0){ //处理中
              return InstallDevStatusEnum.DOING.getValue();
@@ -404,14 +403,13 @@ public class RoomService {
             //激活
             careDevice.setActiveStatus(DeviceActiveStatusEnum.ACTIVE.getValue());
             //启动通道
-            mqttConnectorPoolService.addOne(careDevice);
+             mqttConnectorInstallPoolService.addInsallOne(careDevice);
             //上线
             careDevice.setStatus(DeviceStatusEnum.ONLINE.getValue());
             careDeviceService.updateById(careDevice);
 
             //去除publish的客户端监听
-            careDevice.setClientId(careDevice.getClientId() +"_publish");
-            mqttConnectorPoolService.removeOne(careDevice);
+             mqttConnectorInstallPoolService.removeInstallOne(careDevice);
 
             return InstallDevStatusEnum.SUCCESS.getValue();
 

+ 83 - 0
src/main/java/com/care/mqtt/service/MqttConnectorInstallPoolService.java

@@ -0,0 +1,83 @@
+package com.care.mqtt.service;
+
+
+import com.care.common.entity.CareDevice;
+import com.care.common.util.CommonConfUtil;
+import com.care.mqtt.tool.MqttDataConnector;
+import com.care.mqtt.tool.MqttDataConnectorCache;
+import com.care.mqtt.tool.MqttMessageListener;
+import org.springframework.stereotype.Service;
+import org.springframework.util.StringUtils;
+
+import javax.annotation.Resource;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @Author: stw
+ * @Date: 2021/8/14
+ * @Desc:
+ */
+@Service
+public class MqttConnectorInstallPoolService {
+    @Resource
+    private MqttMsgInstallService mqttMsgInstallService;
+
+
+    /**
+     * 添加一个设备到服务
+     */
+    public MqttDataConnector addInsallOne(CareDevice careDevice) {
+
+        if (StringUtils.isEmpty(careDevice.getTopicInstall()) || StringUtils.isEmpty(careDevice.getClientIdInstall())) {
+            return null;
+        }
+
+        String cliendIdInstall = careDevice.getClientIdInstall();
+
+        MqttDataConnector xbMqttDataConnector = MqttDataConnectorCache.getMqttDataConnectorMap().get(cliendIdInstall);
+        if ( xbMqttDataConnector != null) {
+            return xbMqttDataConnector;
+        }
+
+        xbMqttDataConnector = new MqttDataConnector();
+        Map<String, String> configParams = new HashMap<>();
+        String host = "ws://" + CommonConfUtil.getConf("mqtt.broker") + ":" + CommonConfUtil.getConf("mqtt.port");
+
+        String username = CommonConfUtil.getConf("mqtt.username");
+        String password = CommonConfUtil.getConf("mqtt.password");
+        String topicInstall = careDevice.getTopicInstall();
+        configParams.put("MQTT_HOST", host);
+        configParams.put("MQTT_CLIENTID", cliendIdInstall);
+        configParams.put("MQTT_USERNAME", username);
+        configParams.put("MQTT_PASSWORD", password);
+        try {
+            xbMqttDataConnector.createIotDataSource(configParams);
+            MqttMessageListener mqttMessageListener = new MqttMessageListener();
+            mqttMessageListener.setMqttMsgInstallService(mqttMsgInstallService);
+            xbMqttDataConnector.setReSubscribe(topicInstall, mqttMessageListener);
+
+            MqttDataConnectorCache.getMqttDataConnectorMap().put(cliendIdInstall, xbMqttDataConnector);
+
+            return xbMqttDataConnector;
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        return null;
+    }
+
+    /**
+     * 从服务中去除一个设备
+     */
+    public void removeInstallOne(CareDevice careDevice) {
+        String clientIdInstall = careDevice.getClientIdInstall();
+        if (StringUtils.isEmpty(clientIdInstall)) {
+            return;
+        }
+        if (MqttDataConnectorCache.getMqttDataConnectorMap().get(clientIdInstall) != null) {
+            MqttDataConnectorCache.getMqttDataConnectorMap().get(clientIdInstall).destroy();
+            MqttDataConnectorCache.getMqttDataConnectorMap().remove(clientIdInstall);
+        }
+    }
+
+}

+ 6 - 6
src/main/java/com/care/mqtt/service/MqttConnectorPoolService.java

@@ -3,7 +3,7 @@ package com.care.mqtt.service;
 import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
 import com.care.mqtt.tool.MqttDataConnector;
 import com.care.mqtt.tool.MqttMessageListener;
-import com.care.mqtt.tool.MqttDataConnectorPool;
+import com.care.mqtt.tool.MqttDataConnectorCache;
 import com.care.common.entity.*;
 import com.care.common.enums.*;
 import com.care.common.service.*;
@@ -46,7 +46,7 @@ public class MqttConnectorPoolService {
 
         String cliendId = careDevice.getClientId();
 
-        MqttDataConnector xbMqttDataConnector = MqttDataConnectorPool.getMqttDataConnectorMap().get(cliendId);
+        MqttDataConnector xbMqttDataConnector = MqttDataConnectorCache.getMqttDataConnectorMap().get(cliendId);
         if ( xbMqttDataConnector != null) {
             return xbMqttDataConnector;
         }
@@ -68,7 +68,7 @@ public class MqttConnectorPoolService {
             mqttMessageListener.setMqttMsgService(mqttMsgService);
             xbMqttDataConnector.setReSubscribe(topic, mqttMessageListener);
 
-            MqttDataConnectorPool.getMqttDataConnectorMap().put(cliendId, xbMqttDataConnector);
+            MqttDataConnectorCache.getMqttDataConnectorMap().put(cliendId, xbMqttDataConnector);
 
             return xbMqttDataConnector;
         } catch (Exception e) {
@@ -85,9 +85,9 @@ public class MqttConnectorPoolService {
         if (StringUtils.isEmpty(clientId)) {
             return;
         }
-        if (MqttDataConnectorPool.getMqttDataConnectorMap().get(clientId) != null) {
-            MqttDataConnectorPool.getMqttDataConnectorMap().get(clientId).destroy();
-            MqttDataConnectorPool.getMqttDataConnectorMap().remove(clientId);
+        if (MqttDataConnectorCache.getMqttDataConnectorMap().get(clientId) != null) {
+            MqttDataConnectorCache.getMqttDataConnectorMap().get(clientId).destroy();
+            MqttDataConnectorCache.getMqttDataConnectorMap().remove(clientId);
         }
     }
 

+ 106 - 0
src/main/java/com/care/mqtt/service/MqttMsgInstallService.java

@@ -0,0 +1,106 @@
+package com.care.mqtt.service;
+
+
+
+import com.alibaba.fastjson.JSON;
+import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
+import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
+import com.care.common.entity.*;
+
+import com.care.common.service.*;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.Resource;
+import java.util.Map;
+
+/**
+ * @Author: stw
+ * @Date: 2021/8/14
+ * @Desc:
+ */
+@Service
+public class MqttMsgInstallService {
+
+    @Resource
+    private CareDeviceService careDeviceService;
+    @Resource
+    private CareRoomSensorService careRoomSensorService;
+    @Resource
+    private CareRoomService careRoomService;
+    @Resource
+    private CareRoomGateService careRoomGateService;
+    @Resource
+    private CareRoomRegionService careRoomRegionService;
+
+    private static Logger logger = LoggerFactory.getLogger(MqttMsgInstallService.class);
+
+    /**
+     * 处理mqtt消息
+     * @param topicInstall
+     * @param mqttMessageStr
+     * @throws Exception
+     */
+    public void handleMessage(String topicInstall, String mqttMessageStr) throws Exception {
+         //房间信息配置的返回消息
+        handleConfigureRoomInfoResultMessage(topicInstall,mqttMessageStr);
+    }
+
+    /**
+     * 查询配置房间信息的结果消息 处理
+     * @param topicInstall
+     * @param mqttMessageStr
+     */
+    private void handleConfigureRoomInfoResultMessage(String topicInstall, String mqttMessageStr) {
+        Map map = JSON.parseObject(mqttMessageStr);
+        String type = (String) map.get("type");
+        if(StringUtils.isEmpty(type)) {
+            logger.error("不是正确格式的消息");
+            return;
+        }
+        Map msg = (Map) map.get("msg");
+        if(msg == null) {
+            logger.error("不是正确格式的消息");
+            return;
+        }
+        String ack = (String)msg.get("ack");
+        if(StringUtils.isEmpty(ack)) {
+            logger.error("不是激活返回消息");
+            return;
+        }
+
+        logger.info("是激活返回消息: ack == " + ack);
+
+        QueryWrapper<CareDevice> queryWrapper = new QueryWrapper<>();
+        queryWrapper.lambda().eq(CareDevice::getTopicInstall,topicInstall);
+        CareDevice careDevice = careDeviceService.getOne(queryWrapper);
+        if(careDevice == null) {
+            logger.error("数据库中没找到对应设备");
+            return;
+        }
+        String devCode = careDevice.getDevCode();
+
+        if("CfgAreaAck".equals(type)) { //房间信息
+            UpdateWrapper<CareRoom> updateWrapper = new UpdateWrapper<>();
+            updateWrapper.lambda().eq(CareRoom::getRoomId,devCode).set(CareRoom::getAck,ack);
+            careRoomService.update(updateWrapper);
+        } else if("CfgDevicePositionAck".equals(type)) { //传感器
+            UpdateWrapper<CareRoomSensor> updateWrapper = new UpdateWrapper<>();
+            updateWrapper.lambda().eq(CareRoomSensor::getRoomId,devCode).set(CareRoomSensor::getAck,ack);
+            careRoomSensorService.update(updateWrapper);
+        } else if("CfgGatesAck".equals(type)) { //门
+            UpdateWrapper<CareRoomGate> updateWrapper = new UpdateWrapper<>();
+            updateWrapper.lambda().eq(CareRoomGate::getRoomId,devCode).set(CareRoomGate::getAck,ack);
+            careRoomGateService.update(updateWrapper);
+        } else if("CfgRegionsAck".equals(type)) { //区域
+            UpdateWrapper<CareRoomRegion> updateWrapper = new UpdateWrapper<>();
+            updateWrapper.lambda().eq(CareRoomRegion::getRoomId,devCode).set(CareRoomRegion::getAck,ack);
+            careRoomRegionService.update(updateWrapper);
+        }
+    }
+
+
+
+}

+ 3 - 66
src/main/java/com/care/mqtt/service/MqttMsgService.java

@@ -49,14 +49,8 @@ public class MqttMsgService {
     private CareEventOrderChambService careEventOrderChambService;
     @Resource
     private MqttMsgRedisService mqttMsgRedisService;
-    @Resource
-    private CareRoomSensorService careRoomSensorService;
-    @Resource
-    private CareRoomService careRoomService;
-    @Resource
-    private CareRoomGateService careRoomGateService;
-    @Resource
-    private CareRoomRegionService careRoomRegionService;
+
+
 
     private static Logger logger = LoggerFactory.getLogger(MqttMsgService.class);
 
@@ -67,66 +61,9 @@ public class MqttMsgService {
      * @throws Exception
      */
     public void handleMessage(String topic, String mqttMessageStr) throws Exception {
-        if(topic.contains("event")){ //工单的消息
-            handleOrderEventMessage(topic,mqttMessageStr);
-        } else { //房间信息配置的返回消息
-            handleConfigureRoomInfoResultMessage(topic,mqttMessageStr);
-        }
+        handleOrderEventMessage(topic,mqttMessageStr);
     }
 
-    /**
-     * 查询配置房间信息的结果消息 处理
-     * @param topic
-     * @param mqttMessageStr
-     */
-    private void handleConfigureRoomInfoResultMessage(String topic, String mqttMessageStr) {
-        Map map = JSON.parseObject(mqttMessageStr);
-        String type = (String) map.get("type");
-        if(StringUtils.isEmpty(type)) {
-            logger.error("不是正确格式的消息");
-            return;
-        }
-        Map msg = (Map) map.get("msg");
-        if(msg == null) {
-            logger.error("不是正确格式的消息");
-            return;
-        }
-        String ack = (String)msg.get("ack");
-        if(StringUtils.isEmpty(ack)) {
-            logger.error("不是激活返回消息");
-            return;
-        }
-
-        logger.info("是激活返回消息: ack == " + ack);
-
-        QueryWrapper<CareDevice> queryWrapper = new QueryWrapper<>();
-        queryWrapper.lambda().eq(CareDevice::getTopic,topic.replaceAll("control","event"));
-        CareDevice careDevice = careDeviceService.getOne(queryWrapper);
-        if(careDevice == null) {
-            logger.error("数据库中没找到对应设备");
-            return;
-        }
-        String devCode = careDevice.getDevCode();
-
-        if("CfgAreaAck".equals(type)) { //房间信息
-            UpdateWrapper<CareRoom> updateWrapper = new UpdateWrapper<>();
-            updateWrapper.lambda().eq(CareRoom::getRoomId,devCode).set(CareRoom::getAck,ack);
-            careRoomService.update(updateWrapper);
-        } else if("CfgDevicePositionAck".equals(type)) { //传感器
-            UpdateWrapper<CareRoomSensor> updateWrapper = new UpdateWrapper<>();
-            updateWrapper.lambda().eq(CareRoomSensor::getRoomId,devCode).set(CareRoomSensor::getAck,ack);
-            careRoomSensorService.update(updateWrapper);
-        } else if("CfgGatesAck".equals(type)) { //门
-            UpdateWrapper<CareRoomGate> updateWrapper = new UpdateWrapper<>();
-            updateWrapper.lambda().eq(CareRoomGate::getRoomId,devCode).set(CareRoomGate::getAck,ack);
-            careRoomGateService.update(updateWrapper);
-        } else if("CfgRegionsAck".equals(type)) { //区域
-            UpdateWrapper<CareRoomRegion> updateWrapper = new UpdateWrapper<>();
-            updateWrapper.lambda().eq(CareRoomRegion::getRoomId,devCode).set(CareRoomRegion::getAck,ack);
-            careRoomRegionService.update(updateWrapper);
-        }
-
-    }
         /**
          * 工单事件的消息 处理
          * @param topic

+ 6 - 12
src/main/java/com/care/mqtt/service/MqttPublishService.java

@@ -8,7 +8,6 @@ import com.care.common.util.CommonConfUtil;
 import com.care.mqtt.tool.MqttDataConnector;
 
 import com.care.common.entity.CareDevice;
-import com.care.mqtt.tool.MqttDataConnectorPool;
 import org.springframework.stereotype.Service;
 import org.springframework.util.StringUtils;
 
@@ -23,7 +22,7 @@ import javax.annotation.Resource;
 @Service
 public class MqttPublishService {
     @Resource
-    private MqttConnectorPoolService mqttConnectorPoolService;
+    private MqttConnectorInstallPoolService mqttConnectorInstallPoolService;
     @Resource
     private RedisUtil redisUtil;
 
@@ -36,20 +35,15 @@ public class MqttPublishService {
             CareDevice careDevicePublish = new CareDevice();
             BeanUtil.copyProperties(careDevice,careDevicePublish);
 
-            String topic = careDevicePublish.getTopic();
-            String cliendId = careDevicePublish.getClientId();
-            if (StringUtils.isEmpty(topic) || StringUtils.isEmpty(cliendId)) {
+            String topicInstall = careDevicePublish.getTopicInstall();
+            String cliendIdInstall = careDevicePublish.getClientIdInstall();
+            if (StringUtils.isEmpty(topicInstall) || StringUtils.isEmpty(cliendIdInstall)) {
                 return false;
             }
 
-            String publishTopic = topic.replace("event","control");
-            String publishCliendId = cliendId + "_publish";
+            MqttDataConnector mqttDataConnector = mqttConnectorInstallPoolService.addInsallOne(careDevicePublish);
 
-            careDevicePublish.setTopic(publishTopic);
-            careDevicePublish.setClientId(publishCliendId);
-            MqttDataConnector mqttDataConnector = mqttConnectorPoolService.addOne(careDevicePublish);
-
-            mqttDataConnector.publish(publishTopic, data);
+            mqttDataConnector.publish(topicInstall, data);
 
             //往redis里设置一个超时时钟,超时之后还没返回就认为超时了
             Long timeout = Long.parseLong(CommonConfUtil.getConf("mqtt.dev.jihuo.timeout"));

+ 1 - 1
src/main/java/com/care/mqtt/tool/MqttDataConnector.java

@@ -23,7 +23,7 @@ public class MqttDataConnector {
      * @param configParams
      * @throws Exception
      */
-    public   void  createIotDataSource(Map<String, String> configParams) throws  Exception {
+    public void  createIotDataSource(Map<String, String> configParams) throws  Exception {
         //访问ip:port  tcp://   or  ssl://
         String host = configParams.get("MQTT_HOST");
         //客户端id 保持唯一

+ 1 - 1
src/main/java/com/care/mqtt/tool/MqttDataConnectorPool.java

@@ -4,7 +4,7 @@ package com.care.mqtt.tool;
 import java.util.HashMap;
 import java.util.Map;
 
-public class MqttDataConnectorPool {
+public class MqttDataConnectorCache {
     private static Map<String, MqttDataConnector> mqttDataConnectorMap = new HashMap<>();
     public static Map<String, MqttDataConnector> getMqttDataConnectorMap() {
         return mqttDataConnectorMap;

+ 11 - 1
src/main/java/com/care/mqtt/tool/MqttMessageListener.java

@@ -1,6 +1,7 @@
 package com.care.mqtt.tool;
 
 
+import com.care.mqtt.service.MqttMsgInstallService;
 import com.care.mqtt.service.MqttMsgService;
 import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
 import org.eclipse.paho.client.mqttv3.MqttMessage;
@@ -15,11 +16,15 @@ public class MqttMessageListener implements IMqttMessageListener {
     private static Logger logger = LoggerFactory.getLogger(MqttMsgService.class);
 
     private MqttMsgService mqttMsgService;
+    private MqttMsgInstallService mqttMsgInstallService;
 
     public void setMqttMsgService(MqttMsgService mqttMsgService) {
         this.mqttMsgService = mqttMsgService;
     }
 
+    public void setMqttMsgInstallService(MqttMsgInstallService mqttMsgInstallService) {
+        this.mqttMsgInstallService = mqttMsgInstallService;
+    }
 
     @Override
     public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
@@ -34,7 +39,12 @@ public class MqttMessageListener implements IMqttMessageListener {
             logger.error("不是正确格式的消息");
             return;
         }
-        mqttMsgService.handleMessage(topic,mqttMessageStr);
+        if (mqttMsgService != null){
+            mqttMsgService.handleMessage(topic,mqttMessageStr);
+        } else if(mqttMsgInstallService != null){
+            mqttMsgInstallService.handleMessage(topic,mqttMessageStr);
+        }
+
     }
 
 

+ 7 - 1
src/main/resources/common.properties

@@ -16,4 +16,10 @@ mqtt.password=public
 #是否开启mqtt消息接受服务,开启:1,不开启:0
 mqtt.on=1
 #设备激活返回消息超时(秒)
-mqtt.dev.jihuo.timeout=15
+mqtt.dev.jihuo.timeout=15
+
+#设备工作时接受消息的mqtt topic
+mqtt.topic.work.template=5JPD/monitor/XXXXX/event
+
+#设备安装时发送消息的mqtt topic
+mqtt.topic.install.template=5JPD/monintor/XXXXX/control