Pārlūkot izejas kodu

第二版,重构mqtt

suntianwu 3 gadi atpakaļ
vecāks
revīzija
8840d82009

+ 19 - 0
src/main/java/com/care/bms/mqtt/MqttDataConnectorPool.java

@@ -0,0 +1,19 @@
+package com.care.bms.mqtt;
+
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class MqttDataConnectorPool {
+    public static Map<String, MqttDataConnector> getMqttDataConnectorMap() {
+        return mqttDataConnectorMap;
+    }
+
+    public static void setMqttDataConnectorMap(Map<String, MqttDataConnector> mqttDataConnectorMap) {
+        MqttDataConnectorPool.mqttDataConnectorMap = mqttDataConnectorMap;
+    }
+
+    private static Map<String, MqttDataConnector> mqttDataConnectorMap = new HashMap<>();
+
+
+}

+ 3 - 19
src/main/java/com/care/bms/mqtt/MqttInit.java

@@ -1,11 +1,7 @@
 package com.care.bms.mqtt;
 
 
-
-
-import com.care.bms.service.BmsEventOrderService;
-
-import com.care.common.service.*;
+import com.care.bms.service.MqttInitService;
 import com.care.common.util.CommonConfUtil;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -21,26 +17,14 @@ public class MqttInit implements CommandLineRunner {
     private static final Logger logger = LogManager.getLogger(MqttInit.class);
 
     @Autowired
-    private CareDeviceService careDeviceService;
-    @Autowired
-    private BmsEventOrderService bmsEventOrderService;
-    @Autowired
-    private CareEventOrderService careEventOrderService;
-    @Autowired
-    private CareHouseService careHouseService;
-    @Autowired
-    private CareOrganizationService careOrganizationService;
-    @Autowired
-    private CareStationService careStationService;
-    @Autowired
-    private CareMqttMsgService careMqttMsgService;
+    private MqttInitService mqttInitService;
 
     @Override
     public void run(String... args) throws Exception {
         String mqttOn = CommonConfUtil.getConf("mqtt.on");
         if ("1".equals(mqttOn)){
             logger.info("初始化MQTTstart ...........................");
-            MqttTool.init(careDeviceService,bmsEventOrderService,careEventOrderService,careHouseService,careOrganizationService,careStationService,careMqttMsgService);
+            mqttInitService.init();
             logger.info("初始化MQTTend ...........................");
         }
     }

+ 12 - 245
src/main/java/com/care/bms/mqtt/MqttMessageListener.java

@@ -1,275 +1,42 @@
 package com.care.bms.mqtt;
 
 
-import cn.hutool.core.date.DateUtil;
-
-import com.alibaba.fastjson.JSON;
-
-import java.util.Date;
-import java.util.Map;
-
-import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
-import com.care.bms.service.BmsEventOrderService;
-import com.care.common.entity.CareDevice;
-import com.care.common.entity.CareEventOrder;
-import com.care.common.entity.CareHouse;
-import com.care.common.entity.CareMqttMsg;
-import com.care.common.enums.OrderStatusEnum;
-import com.care.common.enums.OrderTypeEnum;
-import com.care.common.service.*;
-import com.care.common.util.CommonConfUtil;
+import com.care.bms.service.MqttMsgService;
 import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
 import org.eclipse.paho.client.mqttv3.MqttMessage;
 
+import java.util.Map;
+
 
 /**
  * 订阅信息监听类
  */
 public class MqttMessageListener implements IMqttMessageListener {
-    private Map<String,Thread> threadMap;
-    private CareDeviceService careDeviceService;
-    private BmsEventOrderService bmsEventOrderService;
-    private CareEventOrderService careEventOrderService;
-    private CareHouseService careHouseService;
-    private CareOrganizationService careOrganizationService;
-    private CareStationService careStationService;
-    private CareMqttMsgService careMqttMsgService;
+    private MqttMsgService mqttMsgService;
 
-    public Map<String, Thread> getThreadMap() {
-        return threadMap;
+    private Map<String,Thread> threadMap;
+    public void setMqttMsgService(MqttMsgService mqttMsgService) {
+        this.mqttMsgService = mqttMsgService;
     }
 
     public void setThreadMap(Map<String, Thread> threadMap) {
         this.threadMap = threadMap;
     }
 
-    public CareMqttMsgService getCareMqttMsgService() {
-        return careMqttMsgService;
-    }
-
-    public void setCareMqttMsgService(CareMqttMsgService careMqttMsgService) {
-        this.careMqttMsgService = careMqttMsgService;
-    }
-
-
-    public CareOrganizationService getCareOrganizationService() {
-        return careOrganizationService;
-    }
-
-    public void setCareOrganizationService(CareOrganizationService careOrganizationService) {
-        this.careOrganizationService = careOrganizationService;
-    }
-
-    public CareStationService getCareStationService() {
-        return careStationService;
-    }
-
-    public void setCareStationService(CareStationService careStationService) {
-        this.careStationService = careStationService;
-    }
-
-    public CareHouseService getCareHouseService() {
-        return careHouseService;
-    }
-
-    public void setCareHouseService(CareHouseService careHouseService) {
-        this.careHouseService = careHouseService;
-    }
-
-
-    public CareEventOrderService getCareEventOrderService() {
-        return careEventOrderService;
-    }
-
-    public void setCareEventOrderService(CareEventOrderService careEventOrderService) {
-        this.careEventOrderService = careEventOrderService;
-    }
-
-
-    public BmsEventOrderService getBmsEventOrderService() {
-        return bmsEventOrderService;
-    }
-
-    public void setBmsEventOrderService(BmsEventOrderService bmsEventOrderService) {
-        this.bmsEventOrderService = bmsEventOrderService;
-    }
-
-    public CareDeviceService getCareDeviceService() {
-        return careDeviceService;
-    }
-
-    public void setCareDeviceService(CareDeviceService careDeviceService) {
-        this.careDeviceService = careDeviceService;
-    }
-
-
     @Override
     public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
-        try {
 //        System.out.println(mqttMessage.getId());
 //        System.out.println(mqttMessage.getPayload());
 //        System.out.println(mqttMessage.getQos());
 //        System.out.println(mqttMessage.isRetained());
 //        System.out.println(mqttMessage.isDuplicate());
-
-            String mqttMessageStr = mqttMessage.toString();
-            System.out.println("mqttMessageStr == " + mqttMessageStr);
-            if(mqttMessageStr == null || !mqttMessageStr.startsWith("{")){
-                return;
-            }
-
-            QueryWrapper<CareDevice> queryWrapper = new QueryWrapper<>();
-            queryWrapper.lambda().eq(CareDevice::getTopic,topic);
-            CareDevice careDevice = careDeviceService.getOne(queryWrapper);
-            if(careDevice != null) {
-                CareMqttMsg careMqttMsg = new CareMqttMsg();
-                careMqttMsg.setMqttMessage(mqttMessageStr);
-                careMqttMsg.setTopic(topic);
-                careMqttMsg.setDevId(careDevice.getId());
-                careMqttMsg.setCreateTime(new Date());
-                this.careMqttMsgService.save(careMqttMsg);
-            } else {
-                System.out.println("数据库中没找到对应设备");
-                return;
-            }
-
-            Map map = JSON.parseObject(mqttMessageStr);
-            String type = (String) map.get("type");
-            Map msgMap = (Map)map.get("msg");
-            if ("BreathHeartRate".equals(type)) {
-                Map msg = (Map) map.get("msg");
-                Integer breath = (Integer) msg.get("breath");
-                Integer heart = (Integer) msg.get("heart");
-                careDevice.setBreathRate(breath);
-                careDevice.setHeartRate(heart);
-                careDeviceService.updateById(careDevice);
-            } else if ("FallDown".equals(type)) { //跌到
-                Integer conf = (Integer)msgMap.get("conf");
-                if(conf == 255){ //取消
-                    threadMap.forEach((key,value) -> {
-                        if(key.startsWith("FallDown")) {
-                            if(!value.isInterrupted()){
-                                value.interrupt();//停止线程
-                                threadMap.remove(key);
-                                value = null;
-                            }
-                        }
-                    });
-                } else {
-                    Integer fallResponseTime = careDevice.getFallResponseTime();
-                    MyThread myThread = new MyThread();
-                    myThread.setCareDevice(careDevice);
-                    myThread.setOrderType(OrderTypeEnum.DI_EDAO.getValue());
-                    myThread.setSleepTime(fallResponseTime);
-                    myThread.setDate(DateUtil.date());
-                    threadMap.put("FallDown-" + System.currentTimeMillis(),myThread);
-                    myThread.start();
-                }
-
-
-            } else if ("StayTooLong".equals(type)) { //久滞
-                Integer conf = (Integer)msgMap.get("conf");
-                if(conf == 255){ //取消
-                    threadMap.forEach((key,value) -> {
-                        if(key.startsWith("StayTooLong")) {
-                            if(!value.isInterrupted()){
-                                value.interrupt();//停止线程
-                                threadMap.remove(key);
-                                value = null;
-                            }
-                        }
-                    });
-                } else {
-                    Integer longlagResponseTime = careDevice.getLonglagResponseTime();
-                    MyThread myThread = new MyThread();
-                    myThread.setCareDevice(careDevice);
-                    myThread.setOrderType(OrderTypeEnum.JIU_ZHI.getValue());
-                    myThread.setSleepTime(longlagResponseTime);
-                    myThread.setDate(DateUtil.date());
-                    threadMap.put("StayTooLong-" + System.currentTimeMillis(),myThread);
-                    myThread.start();
-                }
-
-            }
-        } catch (Exception e){
-            e.printStackTrace();
+        String mqttMessageStr = mqttMessage.toString();
+        System.out.println("mqttMessageStr == " + mqttMessageStr);
+        if(mqttMessageStr == null || !mqttMessageStr.startsWith("{")){
+            return;
         }
-        System.out.println(topic);
-    }
-
-    private void saveOrder(CareDevice careDevice,String orderType,Date date){
-        CareHouse careHouse = careHouseService.getById(careDevice.getHouseId());
-
-        CareEventOrder order = new CareEventOrder();
-        order.setOrgId(careHouse.getOrgId());
-        order.setOrgName(careOrganizationService.getById(careHouse.getOrgId()).getName());
-        order.setStationId(careHouse.getStationId());
-        order.setStationName(careStationService.getById(careHouse.getStationId()).getName());
-        order.setHouseId(careHouse.getId());
-        order.setDevId(careDevice.getId());
-        order.setDevName(careDevice.getDevName());
-        order.setHouseName(careHouse.getName());
-        order.setMemberId(careDevice.getMemberId());
-        order.setTitle(careHouse.getAddr());
-        order.setOrderType(orderType);
-        order.setStatus(OrderStatusEnum.TODO.getValue());
-        order.setCreateTime(date);
-        order.setModifyTime(date);
-        careEventOrderService.saveOrder(order);
-
+        mqttMsgService.handleMessage(threadMap,topic,mqttMessageStr);
     }
 
-    class MyThread extends Thread {
-        private CareDevice careDevice;
-        private String orderType;
-        private Integer sleepTime; //分钟
-        private Date date;
-
-        public Date getDate() {
-            return date;
-        }
-
-        public void setDate(Date date) {
-            this.date = date;
-        }
-
-        public CareDevice getCareDevice() {
-            return careDevice;
-        }
-
-        public void setCareDevice(CareDevice careDevice) {
-            this.careDevice = careDevice;
-        }
-
-        public String getOrderType() {
-            return orderType;
-        }
-
-        public void setOrderType(String orderType) {
-            this.orderType = orderType;
-        }
-
-        public Integer getSleepTime() {
-            return sleepTime;
-        }
-
-        public void setSleepTime(Integer sleepTime) {
-            this.sleepTime = sleepTime;
-        }
-
-
-        @Override
-        public void run(){
-            try {
-                if(sleepTime != null && sleepTime > 0){
-                    Thread.sleep(sleepTime * 60 * 1000);
-                }
-                saveOrder(careDevice,orderType,date);
-            } catch (Exception e){
-                e.printStackTrace();
-            }
-
-        }
-    }
 
 }

+ 0 - 118
src/main/java/com/care/bms/mqtt/MqttTool.java

@@ -1,118 +0,0 @@
-package com.care.bms.mqtt;
-
-import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
-import com.care.bms.service.BmsEventOrderService;
-import com.care.common.entity.CareDevice;
-import com.care.common.enums.DeviceActiveStatusEnum;
-import com.care.common.service.*;
-import com.care.common.util.CommonConfUtil;
-import org.springframework.util.StringUtils;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class MqttTool {
-    private static Map<String, MqttDataConnector> mqttDataConnectorMap = new HashMap<>();
-    public static void init(CareDeviceService careDeviceService,
-                            BmsEventOrderService bmsEventOrderService,
-                            CareEventOrderService careEventOrderService,
-                            CareHouseService careHouseService,
-                            CareOrganizationService careOrganizationService,
-                            CareStationService careStationService,
-                            CareMqttMsgService careMqttMsgService) {
-        QueryWrapper<CareDevice> queryWrapper = new QueryWrapper();
-        queryWrapper.lambda().eq(CareDevice::getActiveStatus, DeviceActiveStatusEnum.ACTIVE.getValue());
-        List<CareDevice> listCareDevice = careDeviceService.list(queryWrapper);
-        for (CareDevice careDevice : listCareDevice) {
-            addOne(careDevice,careDeviceService,bmsEventOrderService,careEventOrderService,careHouseService,careOrganizationService,careStationService,careMqttMsgService);
-        }
-    }
-
-    /**
-     * 重新初始化
-     * @param careDeviceService
-     * @param bmsEventOrderService
-     * @param careEventOrderService
-     * @param careHouseService
-     * @param careOrganizationService
-     * @param careStationService
-     */
-    public static void reInit(CareDeviceService careDeviceService,
-                              BmsEventOrderService bmsEventOrderService,
-                              CareEventOrderService careEventOrderService,
-                              CareHouseService careHouseService,
-                              CareOrganizationService careOrganizationService,
-                              CareStationService careStationService,
-                              CareMqttMsgService careMqttMsgService) {
-        mqttDataConnectorMap.forEach((key, value) -> {
-            value.destroy();
-            mqttDataConnectorMap.remove(key);
-        });
-        init(careDeviceService,bmsEventOrderService,careEventOrderService,careHouseService,careOrganizationService,careStationService,careMqttMsgService);
-    }
-
-    /**
-     * 添加一个设备到服务
-     * @param careDeviceService
-     * @param bmsEventOrderService
-     * @param careEventOrderService
-     * @param careHouseService
-     * @param careOrganizationService
-     * @param careStationService
-     */
-    public static void addOne(CareDevice careDevice,
-                              CareDeviceService careDeviceService,
-                              BmsEventOrderService bmsEventOrderService,
-                              CareEventOrderService careEventOrderService,
-                              CareHouseService careHouseService,
-                              CareOrganizationService careOrganizationService,
-                              CareStationService careStationService,
-                              CareMqttMsgService careMqttMsgService) {
-        if (StringUtils.isEmpty(careDevice.getTopic()) || StringUtils.isEmpty(careDevice.getClientId())) {
-            return;
-        }
-        MqttDataConnector xbMqttDataConnector = new MqttDataConnector();
-        Map<String, String> configParams = new HashMap<>();
-        String host = "ws://" + CommonConfUtil.getConf("mqtt.broker") + ":" + CommonConfUtil.getConf("mqtt.port");
-        String cliendId = careDevice.getClientId();
-        String username = CommonConfUtil.getConf("mqtt.username");
-        String password = CommonConfUtil.getConf("mqtt.password");
-        String topic = careDevice.getTopic();
-        configParams.put("MQTT_HOST", host);
-        configParams.put("MQTT_CLIENTID", cliendId);
-        configParams.put("MQTT_USERNAME", username);
-        configParams.put("MQTT_PASSWORD", password);
-        try {
-            xbMqttDataConnector.createIotDataSource(configParams);
-            MqttMessageListener mqttMessageListener = new MqttMessageListener();
-            mqttMessageListener.setCareDeviceService(careDeviceService);
-            mqttMessageListener.setBmsEventOrderService(bmsEventOrderService);
-            mqttMessageListener.setCareEventOrderService(careEventOrderService);
-            mqttMessageListener.setCareHouseService(careHouseService);
-            mqttMessageListener.setCareOrganizationService(careOrganizationService);
-            mqttMessageListener.setCareStationService(careStationService);
-            mqttMessageListener.setCareMqttMsgService(careMqttMsgService);
-            mqttMessageListener.setThreadMap(new HashMap<>());
-
-            xbMqttDataConnector.setReSubscribe(topic, mqttMessageListener);
-            mqttDataConnectorMap.put(cliendId, xbMqttDataConnector);
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-    }
-
-    /**
-     * 从服务中去除一个设备
-     */
-    public static void removeOne(CareDevice careDevice) {
-        String clientId = careDevice.getClientId();
-        if (StringUtils.isEmpty(clientId)) {
-            return;
-        }
-        if (mqttDataConnectorMap.get(clientId) != null) {
-            mqttDataConnectorMap.get(clientId).destroy();
-            mqttDataConnectorMap.remove(clientId);
-        }
-    }
-}

+ 5 - 14
src/main/java/com/care/bms/service/DeviceService.java

@@ -6,7 +6,6 @@ import cn.hutool.core.util.HexUtil;
 import cn.hutool.core.util.StrUtil;
 import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
 import com.baomidou.mybatisplus.core.metadata.IPage;
-import com.care.bms.mqtt.MqttSubscribeTool;
 import com.care.common.entity.*;
 import com.care.common.enums.*;
 import com.care.common.exception.BDException;
@@ -40,13 +39,6 @@ public class DeviceService {
     private CareStationService careStationService;
     @Resource
     private CareDeviceService careDeviceService;
-
-    @Resource
-    private BmsEventOrderService bmsEventOrderService;
-    @Resource
-    private CareEventOrderService careEventOrderService;
-    @Resource
-    private CareOrganizationService careOrganizationService;
     @Resource
     private CareOlderService careOlderService;
     @Resource
@@ -60,15 +52,14 @@ public class DeviceService {
     @Resource
     private CareSysUserService careSysUserService;
     @Resource
-    private CareMqttMsgService careMqttMsgService;
-
+    private MqttInitService mqttInitService;
     /**
      * 删除设备
      * @param id
      * @return
      */
     public boolean deleteById(Long id){
-        MqttSubscribeTool.removeOne(this.careDeviceService.getById(id));
+        mqttInitService.removeOne(this.careDeviceService.getById(id));
         return this.careDeviceService.removeById(id);
     }
 
@@ -160,7 +151,7 @@ public class DeviceService {
         device.setClientId(HexUtil.toHex(time));
         this.careDeviceService.save(device);
         //保存完后,启动通道
-        MqttSubscribeTool.addOne(device, careDeviceService, bmsEventOrderService, careEventOrderService, careHouseService, careOrganizationService, careStationService,careMqttMsgService);
+        mqttInitService.addOne(device);
 
     }
 
@@ -176,7 +167,7 @@ public class DeviceService {
         CareDevice device = this.careDeviceService.getById(vo.getId());
 
         if (DeviceActiveStatusEnum.ACTIVE.getValue().equals(device.getActiveStatus())) {
-            MqttSubscribeTool.removeOne(device);
+            mqttInitService.removeOne(device);
         }
 
         MyBeanUtils.copyProperties(vo, device);
@@ -320,7 +311,7 @@ public class DeviceService {
 
         if(DeviceActiveStatusEnum.ACTIVE.getValue().equals(device.getActiveStatus())) {
             //保存完后,启动通道
-            MqttSubscribeTool.addOne(device, careDeviceService, bmsEventOrderService, careEventOrderService, careHouseService, careOrganizationService, careStationService,careMqttMsgService);
+            mqttInitService.addOne(device);
         }
     }
     /**

+ 84 - 0
src/main/java/com/care/bms/service/MqttInitService.java

@@ -0,0 +1,84 @@
+package com.care.bms.service;
+
+import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
+import com.care.bms.mqtt.MqttDataConnector;
+import com.care.bms.mqtt.MqttMessageListener;
+import com.care.bms.mqtt.MqttDataConnectorPool;
+import com.care.common.entity.*;
+import com.care.common.enums.*;
+import com.care.common.service.*;
+import com.care.common.util.CommonConfUtil;
+import org.springframework.stereotype.Service;
+import org.springframework.util.StringUtils;
+
+import javax.annotation.Resource;
+import java.util.*;
+
+/**
+ * @Author: stw
+ * @Date: 2021/8/14
+ * @Desc:
+ */
+@Service
+public class MqttInitService {
+    @Resource
+    private CareDeviceService careDeviceService;
+    @Resource
+    private  MqttMsgService mqttMsgService;
+
+    public void init() {
+        QueryWrapper<CareDevice> queryWrapper = new QueryWrapper();
+        queryWrapper.lambda().eq(CareDevice::getActiveStatus, DeviceActiveStatusEnum.ACTIVE.getValue());
+        List<CareDevice> listCareDevice = careDeviceService.list(queryWrapper);
+        for (CareDevice careDevice : listCareDevice) {
+            addOne(careDevice);
+        }
+    }
+
+    /**
+     * 添加一个设备到服务
+     */
+    public void addOne(CareDevice careDevice) {
+        if (org.springframework.util.StringUtils.isEmpty(careDevice.getTopic()) || org.springframework.util.StringUtils.isEmpty(careDevice.getClientId())) {
+            return;
+        }
+        MqttDataConnector xbMqttDataConnector = new MqttDataConnector();
+        Map<String, String> configParams = new HashMap<>();
+        String host = "ws://" + CommonConfUtil.getConf("mqtt.broker") + ":" + CommonConfUtil.getConf("mqtt.port");
+        String cliendId = careDevice.getClientId();
+        String username = CommonConfUtil.getConf("mqtt.username");
+        String password = CommonConfUtil.getConf("mqtt.password");
+        String topic = careDevice.getTopic();
+        configParams.put("MQTT_HOST", host);
+        configParams.put("MQTT_CLIENTID", cliendId);
+        configParams.put("MQTT_USERNAME", username);
+        configParams.put("MQTT_PASSWORD", password);
+        try {
+            xbMqttDataConnector.createIotDataSource(configParams);
+            MqttMessageListener mqttMessageListener = new MqttMessageListener();
+            mqttMessageListener.setMqttMsgService(mqttMsgService);
+            Map<String,Thread> threadMap = new HashMap<>();
+            mqttMessageListener.setThreadMap(threadMap);
+            xbMqttDataConnector.setReSubscribe(topic, mqttMessageListener);
+
+            MqttDataConnectorPool.getMqttDataConnectorMap().put(cliendId, xbMqttDataConnector);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * 从服务中去除一个设备
+     */
+    public void removeOne(CareDevice careDevice) {
+        String clientId = careDevice.getClientId();
+        if (StringUtils.isEmpty(clientId)) {
+            return;
+        }
+        if (MqttDataConnectorPool.getMqttDataConnectorMap().get(clientId) != null) {
+            MqttDataConnectorPool.getMqttDataConnectorMap().get(clientId).destroy();
+            MqttDataConnectorPool.getMqttDataConnectorMap().remove(clientId);
+        }
+    }
+
+}

+ 184 - 0
src/main/java/com/care/bms/service/MqttMsgService.java

@@ -0,0 +1,184 @@
+package com.care.bms.service;
+
+import cn.hutool.core.date.DateUtil;
+import com.alibaba.fastjson.JSON;
+import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
+import com.care.common.entity.CareDevice;
+import com.care.common.entity.CareEventOrder;
+import com.care.common.entity.CareHouse;
+import com.care.common.entity.CareMqttMsg;
+import com.care.common.enums.OrderStatusEnum;
+import com.care.common.enums.OrderTypeEnum;
+import com.care.common.service.*;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.Resource;
+import java.util.Date;
+import java.util.Map;
+
+/**
+ * @Author: stw
+ * @Date: 2021/8/14
+ * @Desc:
+ */
+@Service
+public class MqttMsgService {
+    @Resource
+    private CareHouseService careHouseService;
+    @Resource
+    private CareStationService careStationService;
+    @Resource
+    private CareDeviceService careDeviceService;
+    @Resource
+    private CareEventOrderService careEventOrderService;
+    @Resource
+    private CareOrganizationService careOrganizationService;
+
+    @Resource
+    private CareMqttMsgService careMqttMsgService;
+
+    public void handleMessage(Map<String,Thread> threadMap,String topic, String mqttMessageStr) throws Exception {
+        try {
+            QueryWrapper<CareDevice> queryWrapper = new QueryWrapper<>();
+            queryWrapper.lambda().eq(CareDevice::getTopic,topic);
+            CareDevice careDevice = careDeviceService.getOne(queryWrapper);
+            if(careDevice != null) {
+                CareMqttMsg careMqttMsg = new CareMqttMsg();
+                careMqttMsg.setMqttMessage(mqttMessageStr);
+                careMqttMsg.setTopic(topic);
+                careMqttMsg.setDevId(careDevice.getId());
+                careMqttMsg.setCreateTime(new Date());
+                this.careMqttMsgService.save(careMqttMsg);
+            } else {
+                System.out.println("数据库中没找到对应设备");
+                return;
+            }
+
+            Map map = JSON.parseObject(mqttMessageStr);
+            String type = (String) map.get("type");
+            Map msgMap = (Map)map.get("msg");
+            if ("BreathHeartRate".equals(type)) {
+                Map msg = (Map) map.get("msg");
+                Integer breath = (Integer) msg.get("breath");
+                Integer heart = (Integer) msg.get("heart");
+                careDevice.setBreathRate(breath);
+                careDevice.setHeartRate(heart);
+                careDeviceService.updateById(careDevice);
+            } else if ("FallDown".equals(type)) { //跌到
+                Integer conf = (Integer)msgMap.get("conf");
+                if(conf == 255){ //取消
+                    threadMap.forEach((key,value) -> {
+                        if(key.startsWith("FallDown")) {
+                            if(!value.isInterrupted()){
+                                value.interrupt();//停止线程
+                                threadMap.remove(key);
+                                value = null;
+                            }
+                        }
+                    });
+                } else {
+                    Integer fallResponseTime = careDevice.getFallResponseTime();
+                    MyThread myThread = new MyThread();
+                    myThread.setCareDevice(careDevice);
+                    myThread.setOrderType(OrderTypeEnum.DI_EDAO.getValue());
+                    myThread.setSleepTime(fallResponseTime);
+                    myThread.setDate(DateUtil.date());
+                    threadMap.put("FallDown-" + System.currentTimeMillis(),myThread);
+                    myThread.start();
+                }
+
+
+            } else if ("StayTooLong".equals(type)) { //久滞
+                Integer conf = (Integer)msgMap.get("conf");
+                if(conf == 255){ //取消
+                    threadMap.forEach((key,value) -> {
+                        if(key.startsWith("StayTooLong")) {
+                            if(!value.isInterrupted()){
+                                value.interrupt();//停止线程
+                                threadMap.remove(key);
+                                value = null;
+                            }
+                        }
+                    });
+                } else {
+                    Integer longlagResponseTime = careDevice.getLonglagResponseTime();
+                    MyThread myThread = new MyThread();
+                    myThread.setCareDevice(careDevice);
+                    myThread.setOrderType(OrderTypeEnum.JIU_ZHI.getValue());
+                    myThread.setSleepTime(longlagResponseTime);
+                    myThread.setDate(DateUtil.date());
+                    threadMap.put("StayTooLong-" + System.currentTimeMillis(),myThread);
+                    myThread.start();
+                }
+
+            }
+        } catch (Exception e){
+            e.printStackTrace();
+        }
+        System.out.println(topic);
+    }
+
+    private void saveOrder(CareDevice careDevice,String orderType,Date date){
+        CareHouse careHouse = careHouseService.getById(careDevice.getHouseId());
+
+        CareEventOrder order = new CareEventOrder();
+        order.setOrgId(careHouse.getOrgId());
+        order.setOrgName(careOrganizationService.getById(careHouse.getOrgId()).getName());
+        order.setStationId(careHouse.getStationId());
+        order.setStationName(careStationService.getById(careHouse.getStationId()).getName());
+        order.setHouseId(careHouse.getId());
+        order.setDevId(careDevice.getId());
+        order.setDevName(careDevice.getDevName());
+        order.setHouseName(careHouse.getName());
+        order.setMemberId(careDevice.getMemberId());
+        order.setTitle(careHouse.getAddr());
+        order.setOrderType(orderType);
+        order.setStatus(OrderStatusEnum.TODO.getValue());
+        order.setCreateTime(date);
+        order.setModifyTime(date);
+        careEventOrderService.saveOrder(order);
+    }
+
+    class MyThread extends Thread {
+        private CareDevice careDevice;
+        private String orderType;
+        private Integer sleepTime; //分钟
+        private Date date;
+
+        public Date getDate() {
+            return date;
+        }
+
+        public void setDate(Date date) {
+            this.date = date;
+        }
+
+        public void setCareDevice(CareDevice careDevice) {
+            this.careDevice = careDevice;
+        }
+
+        public void setOrderType(String orderType) {
+            this.orderType = orderType;
+        }
+
+        public void setSleepTime(Integer sleepTime) {
+            this.sleepTime = sleepTime;
+        }
+
+
+        @Override
+        public void run(){
+            try {
+                if(sleepTime != null && sleepTime > 0){
+                    Thread.sleep(sleepTime * 60 * 1000);
+                }
+                saveOrder(careDevice,orderType,date);
+            } catch (Exception e){
+                e.printStackTrace();
+            }
+
+        }
+
+    }
+
+}