Przeglądaj źródła

第二版,后台bug

suntianwu 3 lat temu
rodzic
commit
53dbea9862

+ 3 - 3
src/main/java/com/care/CareSpringStart.java

@@ -1,6 +1,6 @@
 package com.care;
 
-import com.care.bms.mqtt.MqttInit;
+import com.care.common.init.InitRunner;
 import org.mybatis.spring.annotation.MapperScan;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
@@ -38,8 +38,8 @@ public class CareSpringStart {
     }
 
     @Bean
-    public MqttInit mqttInit() {
-        return new MqttInit();
+    public InitRunner mqttInit() {
+        return new InitRunner();
     }
 }
 

+ 7 - 6
src/main/java/com/care/bms/mqtt/MqttMessageListener.java

@@ -4,6 +4,8 @@ package com.care.bms.mqtt;
 import com.care.bms.service.MqttMsgService;
 import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
 import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 
@@ -12,16 +14,14 @@ import java.util.Map;
  * 订阅信息监听类
  */
 public class MqttMessageListener implements IMqttMessageListener {
+    private static Logger logger = LoggerFactory.getLogger(MqttMsgService.class);
+
     private MqttMsgService mqttMsgService;
 
-    private Map<String,Thread> threadMap;
     public void setMqttMsgService(MqttMsgService mqttMsgService) {
         this.mqttMsgService = mqttMsgService;
     }
 
-    public void setThreadMap(Map<String, Thread> threadMap) {
-        this.threadMap = threadMap;
-    }
 
     @Override
     public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
@@ -31,11 +31,12 @@ public class MqttMessageListener implements IMqttMessageListener {
 //        System.out.println(mqttMessage.isRetained());
 //        System.out.println(mqttMessage.isDuplicate());
         String mqttMessageStr = mqttMessage.toString();
-        System.out.println("mqttMessageStr == " + mqttMessageStr);
+        logger.info("mqttMessageStr == " + mqttMessageStr);
         if(mqttMessageStr == null || !mqttMessageStr.startsWith("{")){
+            logger.error("不是正确格式的消息");
             return;
         }
-        mqttMsgService.handleMessage(threadMap,topic,mqttMessageStr);
+        mqttMsgService.handleMessage(topic,mqttMessageStr);
     }
 
 

+ 1 - 1
src/main/java/com/care/bms/service/BmsEventOrderService.java

@@ -590,7 +590,7 @@ public class BmsEventOrderService {
         order.setStatus("0");
         order.setCreateTime(DateUtil.date());
         order.setModifyTime(DateUtil.date());
-        this.careEventOrderService.saveOrder(order,null);
+        this.careEventOrderService.saveOrder(order);
     }
 
 

+ 5 - 5
src/main/java/com/care/bms/service/DeviceService.java

@@ -52,14 +52,14 @@ public class DeviceService {
     @Resource
     private CareSysUserService careSysUserService;
     @Resource
-    private MqttInitService mqttInitService;
+    private MqttConnectorPoolInitService mqttConnectorPoolInitService;
     /**
      * 删除设备
      * @param id
      * @return
      */
     public boolean deleteById(Long id){
-        mqttInitService.removeOne(this.careDeviceService.getById(id));
+        mqttConnectorPoolInitService.removeOne(this.careDeviceService.getById(id));
         return this.careDeviceService.removeById(id);
     }
 
@@ -151,7 +151,7 @@ public class DeviceService {
         device.setClientId(HexUtil.toHex(time));
         this.careDeviceService.save(device);
         //保存完后,启动通道
-        mqttInitService.addOne(device);
+        mqttConnectorPoolInitService.addOne(device);
 
     }
 
@@ -167,7 +167,7 @@ public class DeviceService {
         CareDevice device = this.careDeviceService.getById(vo.getId());
 
         if (DeviceActiveStatusEnum.ACTIVE.getValue().equals(device.getActiveStatus())) {
-            mqttInitService.removeOne(device);
+            mqttConnectorPoolInitService.removeOne(device);
         }
 
         MyBeanUtils.copyProperties(vo, device);
@@ -311,7 +311,7 @@ public class DeviceService {
 
         if(DeviceActiveStatusEnum.ACTIVE.getValue().equals(device.getActiveStatus())) {
             //保存完后,启动通道
-            mqttInitService.addOne(device);
+            mqttConnectorPoolInitService.addOne(device);
         }
     }
     /**

+ 8 - 3
src/main/java/com/care/bms/service/MqttInitService.java

@@ -20,7 +20,7 @@ import java.util.*;
  * @Desc:
  */
 @Service
-public class MqttInitService {
+public class MqttConnectorPoolInitService {
     @Resource
     private CareDeviceService careDeviceService;
     @Resource
@@ -57,11 +57,13 @@ public class MqttInitService {
             xbMqttDataConnector.createIotDataSource(configParams);
             MqttMessageListener mqttMessageListener = new MqttMessageListener();
             mqttMessageListener.setMqttMsgService(mqttMsgService);
-            Map<String,Thread> threadMap = new HashMap<>();
-            mqttMessageListener.setThreadMap(threadMap);
             xbMqttDataConnector.setReSubscribe(topic, mqttMessageListener);
 
             MqttDataConnectorPool.getMqttDataConnectorMap().put(cliendId, xbMqttDataConnector);
+
+            //设备上线
+            careDevice.setStatus(DeviceStatusEnum.ONLINE.getValue());
+
         } catch (Exception e) {
             e.printStackTrace();
         }
@@ -79,6 +81,9 @@ public class MqttInitService {
             MqttDataConnectorPool.getMqttDataConnectorMap().get(clientId).destroy();
             MqttDataConnectorPool.getMqttDataConnectorMap().remove(clientId);
         }
+
+        //设备下线
+        careDevice.setStatus(DeviceStatusEnum.OFFLINE.getValue());
     }
 
 }

+ 170 - 137
src/main/java/com/care/bms/service/MqttMsgService.java

@@ -1,21 +1,27 @@
 package com.care.bms.service;
 
+
 import cn.hutool.core.date.DateUtil;
 import com.alibaba.fastjson.JSON;
 import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
+import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
 import com.care.bigscreen.service.BigScreenService;
-import com.care.common.entity.CareDevice;
-import com.care.common.entity.CareEventOrder;
-import com.care.common.entity.CareHouse;
-import com.care.common.entity.CareMqttMsg;
+
+import com.care.common.entity.*;
+
+import com.care.common.enums.ChambOrderStatusEnum;
+import com.care.common.enums.MqttMsgStatusEnum;
 import com.care.common.enums.OrderStatusEnum;
 import com.care.common.enums.OrderTypeEnum;
 import com.care.common.service.*;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Service;
 
 import javax.annotation.Resource;
 import java.util.Date;
-import java.util.List;
+
 import java.util.Map;
 
 /**
@@ -39,167 +45,194 @@ public class MqttMsgService {
     private BigScreenService bigScreenService;
     @Resource
     private CareMqttMsgService careMqttMsgService;
+    @Resource
+    private CareEventOrderChambService careEventOrderChambService;
+    @Resource
+    private MqttThreadPoolInitService MqttThreadPoolInitService;
 
+    private static Logger logger = LoggerFactory.getLogger(MqttMsgService.class);
 
-    public void handleMessage(Map<String,Thread> threadMap,String topic, String mqttMessageStr) throws Exception {
+    /**
+     * 处理mqtt消息
+     * @param topic
+     * @param mqttMessageStr
+     * @throws Exception
+     */
+    public void handleMessage(String topic, String mqttMessageStr) throws Exception {
         try {
+
+            Map map = JSON.parseObject(mqttMessageStr);
+            String type = (String) map.get("type");
+
+            if(StringUtils.isEmpty(type)){
+                logger.error("不是正确格式的消息");
+                return;
+            }
             QueryWrapper<CareDevice> queryWrapper = new QueryWrapper<>();
             queryWrapper.lambda().eq(CareDevice::getTopic,topic);
             CareDevice careDevice = careDeviceService.getOne(queryWrapper);
-            if(careDevice != null) {
-                CareMqttMsg careMqttMsg = new CareMqttMsg();
-                careMqttMsg.setMqttMessage(mqttMessageStr);
-                careMqttMsg.setTopic(topic);
-                careMqttMsg.setDevId(careDevice.getId());
-                careMqttMsg.setCreateTime(new Date());
-                this.careMqttMsgService.save(careMqttMsg);
-            } else {
-                System.out.println("数据库中没找到对应设备");
+            if(careDevice == null) {
+                logger.error("数据库中没找到对应设备");
                 return;
             }
+            Date date = DateUtil.date();
+
+            CareMqttMsg careMqttMsg = new CareMqttMsg();
+            careMqttMsg.setMqttMessage(mqttMessageStr);
+            careMqttMsg.setTopic(topic);
+            careMqttMsg.setDevId(careDevice.getId());
+            careMqttMsg.setType(type);
+            careMqttMsg.setCreateTime(date);
 
-            Map map = JSON.parseObject(mqttMessageStr);
-            String type = (String) map.get("type");
-            Map msgMap = (Map)map.get("msg");
             if ("BreathHeartRate".equals(type)) {
+                careMqttMsg.setStatus(MqttMsgStatusEnum.HANDLED.getValue());
+                this.careMqttMsgService.save(careMqttMsg);
+
                 Map msg = (Map) map.get("msg");
                 Integer breath = (Integer) msg.get("breath");
                 Integer heart = (Integer) msg.get("heart");
                 careDevice.setBreathRate(breath);
                 careDevice.setHeartRate(heart);
                 careDeviceService.updateById(careDevice);
-            } else if ("FallDown".equals(type)) { //跌到
-                Integer conf = (Integer)msgMap.get("conf");
-                if(conf == 255){ //取消
-                    threadMap.forEach((key,value) -> {
-                        if(key.startsWith("FallDown")) {
-                            if(!value.isInterrupted()){
-                                value.interrupt();//停止线程
-                                threadMap.remove(key);
-                                value = null;
-                            }
-                        }
-                    });
-                } else {
-                    Integer fallResponseTime = careDevice.getFallResponseTime();
-                    MyThread myThread = new MyThread();
-                    myThread.setCareDevice(careDevice);
-                    myThread.setOrderType(OrderTypeEnum.DI_EDAO.getValue());
-                    myThread.setSleepTime(fallResponseTime);
-                    myThread.setDate(DateUtil.date());
-                    threadMap.put("FallDown-" + System.currentTimeMillis(),myThread);
-                    myThread.start();
-                }
-
-
-            } else if ("StayTooLong".equals(type)) { //久滞
-                Integer conf = (Integer)msgMap.get("conf");
-                if(conf == 255){ //取消
-                    threadMap.forEach((key,value) -> {
-                        if(key.startsWith("StayTooLong")) {
-                            if(!value.isInterrupted()){
-                                value.interrupt();//停止线程
-                                threadMap.remove(key);
-                                value = null;
-                            }
-                        }
-                    });
-                } else {
-                    Integer longlagResponseTime = careDevice.getLonglagResponseTime();
-                    MyThread myThread = new MyThread();
-                    myThread.setCareDevice(careDevice);
-                    myThread.setOrderType(OrderTypeEnum.JIU_ZHI.getValue());
-                    myThread.setSleepTime(longlagResponseTime);
-                    myThread.setDate(DateUtil.date());
-                    threadMap.put("StayTooLong-" + System.currentTimeMillis(),myThread);
-                    myThread.start();
-                }
-
+                return;
             }
-        } catch (Exception e){
-            e.printStackTrace();
-        }
-        System.out.println(topic);
-    }
-
-    private void saveOrder(CareDevice careDevice,String orderType,Date date){
-        CareHouse careHouse = careHouseService.getById(careDevice.getHouseId());
-
-        CareEventOrder order = new CareEventOrder();
-        order.setOrgId(careHouse.getOrgId());
-        order.setOrgName(careOrganizationService.getById(careHouse.getOrgId()).getName());
-        order.setStationId(careHouse.getStationId());
-        order.setStationName(careStationService.getById(careHouse.getStationId()).getName());
-        order.setHouseId(careHouse.getId());
-        order.setDevId(careDevice.getId());
-        order.setDevName(careDevice.getDevName());
-        order.setHouseName(careHouse.getName());
-        order.setMemberId(careDevice.getMemberId());
-        order.setTitle(careHouse.getAddr());
-        order.setOrderType(orderType);
-        order.setStatus(OrderStatusEnum.TODO.getValue());
-        order.setCreateTime(date);
-        order.setModifyTime(date);
-
-        //判断房屋是否有未完成的告警事件
-        QueryWrapper<CareEventOrder> queryWrapper = new QueryWrapper<>();
-        queryWrapper.lambda().eq(CareEventOrder::getHouseId,order.getHouseId()).and(
-                wrapper-> wrapper.eq(CareEventOrder::getStatus, OrderStatusEnum.TODO.getValue())
-                .or().eq(CareEventOrder::getStatus,OrderStatusEnum.DOING.getValue()));
-
-        CareEventOrder orderDb = careEventOrderService.getOne(queryWrapper);
-        careEventOrderService.saveOrder(order,orderDb);
-        if(orderDb != null){//有,不再生成新的告警工单,只作为子事件插入到当前工单的历史记录中。
-            //通知页面
-            this.bigScreenService.pushRtEventFlag(order.getStationId().toString(),order.getId().toString(),"orderUpdate");
-        } else {  //新增,生成新的告警事件工单
-            //通知页面
-            this.bigScreenService.pushRtEventFlag(order.getStationId().toString(),order.getId().toString(),"orderAdd");
-        }
-
-        this.bigScreenService.pushRtEventFlag(order.getStationId().toString(),order.getId().toString(),"bigscreenRefresh");
-    }
 
-    class MyThread extends Thread {
-        private CareDevice careDevice;
-        private String orderType;
-        private Integer sleepTime; //分钟
-        private Date date;
-
-        public Date getDate() {
-            return date;
-        }
+            if( careDevice.getHouseId() == null ) {
+                logger.error("设备没有配置房屋地址");
+                return;
+            }
 
-        public void setDate(Date date) {
-            this.date = date;
-        }
+            CareEventOrder order = getNewOrder(careDevice);
+            order.setCreateTime(date);
+            order.setModifyTime(date);
 
-        public void setCareDevice(CareDevice careDevice) {
-            this.careDevice = careDevice;
-        }
+            Map msgMap = (Map)map.get("msg");
+            Integer conf = (Integer)msgMap.get("conf");
+            if(conf != 255) {
+                order.setStatus(OrderStatusEnum.TODO.getValue());
+            } else {//取消
+                order.setStatus(OrderStatusEnum.CANCEL.getValue());
+            }
 
-        public void setOrderType(String orderType) {
-            this.orderType = orderType;
-        }
+            if ("FallDown".equals(type)) { //跌到
+                order.setOrderType(OrderTypeEnum.DI_EDAO.getValue());
+            } else if("StayTooLong".equals(type)){
+                order.setOrderType(OrderTypeEnum.JIU_ZHI.getValue());
+            }
 
-        public void setSleepTime(Integer sleepTime) {
-            this.sleepTime = sleepTime;
-        }
+            //判断房屋是否有未完成的告警事件,不区分类型
+            QueryWrapper<CareEventOrder> queryWrapper2 = new QueryWrapper<>();
+            queryWrapper2.lambda().eq(CareEventOrder::getHouseId,careDevice.getHouseId()).and(
+                    wrapper-> wrapper.eq(CareEventOrder::getStatus, OrderStatusEnum.TODO.getValue())
+                            .or().eq(CareEventOrder::getStatus,OrderStatusEnum.DOING.getValue()));
+            CareEventOrder orderDb = careEventOrderService.getOne(queryWrapper2);
+
+            if(orderDb != null) { //有未完成的告警事件,不再生成新的告警工单,只作为子事件插入到当前工单的历史记录中。
+                if(conf != 255) {
+                    careMqttMsg.setStatus(MqttMsgStatusEnum.HANDLED.getValue());
+                    this.careMqttMsgService.save(careMqttMsg);
+
+                    careEventOrderService.saveHisOrder(order,orderDb);
+                    //通知页面
+                    this.bigScreenService.pushRtEventFlag(order.getStationId().toString(),order.getId().toString(),"orderUpdate");
+                    this.bigScreenService.pushRtEventFlag(order.getStationId().toString(),order.getId().toString(),"bigscreenRefresh");
+                } else { //取消
+                    careMqttMsg.setStatus(MqttMsgStatusEnum.HANDLED.getValue());
+                    this.careMqttMsgService.save(careMqttMsg);
+
+                    if(orderDb.getOrderType().equals(order.getOrderType())){//看取消的类型,和当前一致,则 更新当前的工单状态为取消,记录一条取消his,管家工单标记为取消
+                        //更新当前的工单状态为取消
+                        orderDb.setStatus(OrderStatusEnum.CANCEL.getValue());
+                        this.careEventOrderService.updateById(orderDb);
+
+                         //管家工单标记为取消
+                        CareEventOrderChamb careEventOrderChamb = this.careEventOrderChambService.getChambOrderByOrderId(orderDb.getId());
+                        careEventOrderChamb.setStatus(ChambOrderStatusEnum.CANCEL.getValue());
+                        this.careEventOrderChambService.updateById(careEventOrderChamb);
+                        //记录一条取消his
+                        careEventOrderService.saveHisOrder(order,orderDb);
+                        //通知页面
+                        this.bigScreenService.pushRtEventFlag(order.getStationId().toString(),order.getId().toString(),"orderUpdate");
+                        this.bigScreenService.pushRtEventFlag(order.getStationId().toString(),order.getId().toString(),"bigscreenRefresh");
+                    } else { // 不一致,则只记录一条取消his
+                        //记录一条取消his
+                        careEventOrderService.saveHisOrder(order,orderDb);
+                    }
 
+                }
 
-        @Override
-        public void run(){
-            try {
-                if(sleepTime != null && sleepTime > 0){
-                    Thread.sleep(sleepTime * 60 * 1000);
+            } else { //无未完成的告警事件
+                Integer delayTime = getDelayTime(type,careDevice);
+                if (delayTime == null || delayTime == 0){ //没有配置响应时间,立即生成新的告警事件工单
+                    if(conf != 255) {
+                        careMqttMsg.setStatus(MqttMsgStatusEnum.HANDLED.getValue());
+                        this.careMqttMsgService.save(careMqttMsg);
+
+                        //生成新的工单
+                        careEventOrderService.saveOrder(order);
+                        //通知页面
+                        this.bigScreenService.pushRtEventFlag(order.getStationId().toString(),order.getId().toString(),"orderAdd");
+                        this.bigScreenService.pushRtEventFlag(order.getStationId().toString(),order.getId().toString(),"bigscreenRefresh");
+                    } else { //取消 不处理工单
+                        careMqttMsg.setStatus(MqttMsgStatusEnum.HANDLED.getValue());
+                        this.careMqttMsgService.save(careMqttMsg);
+                    }
+
+                } else { //有配置响应时间,不处理工单,进行调度处理
+                    if(conf != 255) { //不是取消
+                        careMqttMsg.setStatus(MqttMsgStatusEnum.NO_HANDLE.getValue());
+                        careMqttMsg.setDelayTime(delayTime);
+                        this.careMqttMsgService.save(careMqttMsg);
+
+                        //进行调度处理
+                        MqttThreadPoolInitService.addScheduled(careMqttMsg,careMqttMsg.getDelayTime() * 60 * 1000);
+                    } else { //取消: 更新mqtt消息为已取消,等待执行的调度到时会取消执行
+                        careMqttMsg.setStatus(MqttMsgStatusEnum.HANDLED.getValue());
+                        this.careMqttMsgService.save(careMqttMsg);
+
+                        //更新mqtt消息为已取消
+                        UpdateWrapper<CareMqttMsg> updateWrapper = new UpdateWrapper<>();
+                        updateWrapper.lambda().eq(CareMqttMsg::getDevId,careDevice.getId()).eq(CareMqttMsg::getType,type).eq(CareMqttMsg::getStatus,MqttMsgStatusEnum.NO_HANDLE.getValue())
+                                .set(CareMqttMsg::getStatus,MqttMsgStatusEnum.CANCEL.getValue());
+                        this.careMqttMsgService.update(updateWrapper);
+                    }
                 }
-                saveOrder(careDevice,orderType,date);
-            } catch (Exception e){
-                e.printStackTrace();
             }
 
+        } catch (Exception e){
+            e.printStackTrace();
         }
+    }
 
+
+    private Integer getDelayTime(String type, CareDevice careDevice) {
+        if ("FallDown".equals(type)) { //跌到
+            return careDevice.getFallResponseTime();
+        } else if ("StayTooLong".equals(type)) { //久滞
+            return careDevice.getLonglagResponseTime();
+        } else {
+            return 0;
+        }
     }
 
+  public CareEventOrder getNewOrder(CareDevice careDevice) {
+      CareHouse careHouse = careHouseService.getById(careDevice.getHouseId());
+      if (careHouse == null){
+          logger.error("设备没有配置房屋地址");
+          return null;
+      }
+      CareEventOrder order = new CareEventOrder();
+      order.setOrgId(careDevice.getOrgId());
+      order.setOrgName(careOrganizationService.getById(careDevice.getOrgId()).getName());
+      order.setStationId(careDevice.getStationId());
+      order.setStationName(careStationService.getById(careDevice.getStationId()).getName());
+      order.setHouseId(careHouse.getId());
+      order.setDevId(careDevice.getId());
+      order.setDevName(careDevice.getDevName());
+      order.setHouseName(careHouse.getName());
+      order.setMemberId(careDevice.getMemberId());
+      order.setTitle(careHouse.getAddr());
+      return order;
+  }
+
 }

+ 142 - 0
src/main/java/com/care/bms/service/MqttThreadPoolInitService.java

@@ -0,0 +1,142 @@
+package com.care.bms.service;
+
+
+import cn.hutool.core.collection.CollUtil;
+import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
+import com.care.bigscreen.service.BigScreenService;
+import com.care.common.entity.CareDevice;
+import com.care.common.entity.CareEventOrder;
+import com.care.common.entity.CareMqttMsg;
+import com.care.common.enums.*;
+import com.care.common.service.CareDeviceService;
+
+import com.care.common.service.CareEventOrderService;
+import com.care.common.service.CareMqttMsgService;
+import org.springframework.stereotype.Service;
+
+
+import javax.annotation.Resource;
+import java.util.Date;
+import java.util.List;
+
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * @Author: stw
+ * @Date: 2021/8/14
+ * @Desc:
+ */
+@Service
+public class MqttThreadPoolInitService {
+    @Resource
+    private CareDeviceService careDeviceService;
+    @Resource
+    private CareMqttMsgService careMqttMsgService;
+    @Resource
+    private CareEventOrderService careEventOrderService;
+    @Resource
+    private MqttMsgService mqttMsgService;
+    @Resource
+    private BigScreenService bigScreenService;
+    @Resource
+    private ScheduledThreadPoolExecutor scheduledThreadPool;
+
+    public ScheduledThreadPoolExecutor getScheduledThreadPool() {
+        return scheduledThreadPool;
+    }
+
+    /**
+     * 读取数据库mqtt msg表,把有延迟标记、未处理的消息生成调度任务放入线程池
+     */
+    public void init() {
+        QueryWrapper<CareDevice> queryWrapper = new QueryWrapper();
+        queryWrapper.lambda().eq(CareDevice::getActiveStatus, DeviceActiveStatusEnum.ACTIVE.getValue());
+        int devCount = careDeviceService.count(queryWrapper);
+
+        if (devCount < 5) {
+            devCount = 5;
+        }
+        //创建一个定长线程池,支持定时任务执行——延迟执行
+         scheduledThreadPool = new ScheduledThreadPoolExecutor(devCount);
+
+        //查询mqtt msg 表有延迟标记的未处理消息,放入线程池,等待延时时间结束执行创建工单
+        QueryWrapper<CareMqttMsg> queryWrapper2 = new QueryWrapper<>();
+        queryWrapper2.lambda().eq(CareMqttMsg::getStatus, MqttMsgStatusEnum.NO_HANDLE.getValue()).gt(CareMqttMsg::getDelayTime,0)
+        .orderByAsc(CareMqttMsg::getCreateTime);
+
+        List<CareMqttMsg> mqttMsgList = careMqttMsgService.list(queryWrapper2);
+        if (CollUtil.isNotEmpty(mqttMsgList)) {
+            for(CareMqttMsg careMqttMsg : mqttMsgList){
+
+                Long now = System.currentTimeMillis();
+                Long createTime = careMqttMsg.getCreateTime().getTime();
+                Long delayTime = careMqttMsg.getDelayTime() * 60 * 1000L;
+                Long diff = delayTime - (now - createTime);
+                if(diff <= 0) { //立即执行
+                    exe(careMqttMsg);
+                    continue;
+                }
+                addScheduled(careMqttMsg,diff.intValue());
+            }
+
+        }
+    }
+
+    public void addScheduled(CareMqttMsg careMqttMsg,int delay){
+
+       //executor.setCorePoolSize(newPoolSize);//todo
+
+        //延迟 diff 毫秒执行
+        scheduledThreadPool.schedule(new Runnable() {
+            public void run() {
+                exe(careMqttMsg);
+            }
+        }, delay, TimeUnit.MILLISECONDS);
+
+    }
+
+    private void exe(CareMqttMsg careMqttMsg){
+        String status = this.careMqttMsgService.getById(careMqttMsg).getStatus();
+        if(!MqttMsgStatusEnum.NO_HANDLE.getValue().equals(status)){ //不是未处理,不执行。
+            return;
+        }
+
+        CareDevice careDevice = careDeviceService.getById(careMqttMsg.getDevId());
+        CareEventOrder order = mqttMsgService.getNewOrder(careDevice);
+        order.setCreateTime(new Date());
+        order.setModifyTime(new Date());
+        order.setStatus(OrderStatusEnum.TODO.getValue());
+        if ("FallDown".equals(careMqttMsg.getType())) { //跌到
+            order.setOrderType(OrderTypeEnum.DI_EDAO.getValue());
+        } else if("StayTooLong".equals(careMqttMsg.getType())){ //久滞
+            order.setOrderType(OrderTypeEnum.JIU_ZHI.getValue());
+        }
+
+        //判断房屋是否有未完成的告警事件,不区分类型
+        QueryWrapper<CareEventOrder> queryWrapper2 = new QueryWrapper<>();
+        queryWrapper2.lambda().eq(CareEventOrder::getHouseId,careDevice.getHouseId()).and(
+                wrapper-> wrapper.eq(CareEventOrder::getStatus, OrderStatusEnum.TODO.getValue())
+                        .or().eq(CareEventOrder::getStatus,OrderStatusEnum.DOING.getValue()));
+
+        CareEventOrder orderDb = careEventOrderService.getOne(queryWrapper2);
+        if(orderDb != null) { //有未完成的告警事件,不再生成新的告警工单,只作为子事件插入到当前工单的历史记录中。
+            careEventOrderService.saveHisOrder(order,orderDb);
+            //通知页面
+            this.bigScreenService.pushRtEventFlag(order.getStationId().toString(),order.getId().toString(),"orderUpdate");
+            this.bigScreenService.pushRtEventFlag(order.getStationId().toString(),order.getId().toString(),"bigscreenRefresh");
+        }  else { //无未完成的告警事件
+            //生成新的工单
+            careEventOrderService.saveOrder(order);
+            //通知页面
+            this.bigScreenService.pushRtEventFlag(order.getStationId().toString(),order.getId().toString(),"orderAdd");
+            this.bigScreenService.pushRtEventFlag(order.getStationId().toString(),order.getId().toString(),"bigscreenRefresh");
+        }
+
+        careMqttMsg.setStatus(MqttMsgStatusEnum.HANDLED.getValue());
+        careMqttMsg.setModifyTime(new Date());
+        this.careMqttMsgService.updateById(careMqttMsg);
+    }
+
+}

+ 15 - 0
src/main/java/com/care/common/entity/CareMqttMsg.java

@@ -6,6 +6,7 @@ import com.baomidou.mybatisplus.annotation.TableId;
 import com.baomidou.mybatisplus.annotation.TableName;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
+import io.swagger.models.auth.In;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.experimental.Accessors;
@@ -41,14 +42,28 @@ public class CareMqttMsg implements Serializable {
     @TableField("TOPIC")
     private String topic;
 
+    @ApiModelProperty("消息类型:呼吸心率信息 BreathHeartRate,跌到 FallDown,久滞 StayTooLong")
+    @TableField("TYPE")
+    private String type;
+
 
     @ApiModelProperty("设备ID")
     @TableField("DEV_ID")
     private Long devId;
 
+    @ApiModelProperty("状态:0,未处理;1,已取消;2,已处理")
+    @TableField("STATUS")
+    private String status;
+
+    @ApiModelProperty("延迟时间(单位:分钟)")
+    @TableField("DELAY_TIME")
+    private Integer delayTime;
 
     @ApiModelProperty("createTime")
     @TableField("CREATE_TIME")
     private Date createTime;
 
+    @ApiModelProperty("modifyTime")
+    @TableField("MODIFY_TIME")
+    private Date modifyTime;
 }

+ 43 - 0
src/main/java/com/care/common/enums/MqttMsgStatusEnum.java

@@ -0,0 +1,43 @@
+package com.care.common.enums;
+
+
+import cn.hutool.core.util.StrUtil;
+
+/**
+ * @Author:java
+ * @Date: 2021/05/27
+ */
+public enum MqttMsgStatusEnum {
+    /**
+     *
+     */
+    NO_HANDLE("0","未处理"),
+    CANCEL("1","已取消"),
+    HANDLED("2","已处理");
+    private String value;
+    private String name;
+    MqttMsgStatusEnum(String value, String name) {
+        this.value = value;
+        this.name = name;
+    }
+    public String getValue() {
+        return value;
+    }
+    public String getName() {
+        return name;
+    }
+    public static String getCodeToName(String code){
+        if (StrUtil.isEmpty(code)){
+            return null;
+        }
+        if (NO_HANDLE.getValue().equals(code)){
+            return  NO_HANDLE.name;
+        }else if (CANCEL.getValue().equals(code)){
+            return  CANCEL.name;
+        }else if (HANDLED.getValue().equals(code)){
+            return  HANDLED.name;
+        }
+
+        return null;
+    }
+}

+ 10 - 6
src/main/java/com/care/bms/mqtt/MqttInit.java

@@ -1,7 +1,8 @@
-package com.care.bms.mqtt;
+package com.care.common.init;
 
 
-import com.care.bms.service.MqttInitService;
+import com.care.bms.service.MqttConnectorPoolInitService;
+import com.care.bms.service.MqttThreadPoolInitService;
 import com.care.common.util.CommonConfUtil;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -12,19 +13,22 @@ import org.springframework.core.annotation.Order;
 
 
 @Order(1)
-public class MqttInit implements CommandLineRunner {
+public class InitRunner implements CommandLineRunner {
 
-    private static final Logger logger = LogManager.getLogger(MqttInit.class);
+    private static final Logger logger = LogManager.getLogger(InitRunner.class);
 
     @Autowired
-    private MqttInitService mqttInitService;
+    private MqttConnectorPoolInitService mqttConnectorPoolInitService;
+    @Autowired
+    private MqttThreadPoolInitService mqttThreadPoolInitService;
 
     @Override
     public void run(String... args) throws Exception {
         String mqttOn = CommonConfUtil.getConf("mqtt.on");
         if ("1".equals(mqttOn)){
             logger.info("初始化MQTTstart ...........................");
-            mqttInitService.init();
+            mqttConnectorPoolInitService.init();
+            mqttThreadPoolInitService.init();
             logger.info("初始化MQTTend ...........................");
         }
     }

+ 3 - 1
src/main/java/com/care/common/service/CareEventOrderService.java

@@ -19,5 +19,7 @@ public interface CareEventOrderService extends IService<CareEventOrder> {
      */
     EventStaVO statOrder(Long orgId, Long stationId);
 
-    boolean saveOrder(CareEventOrder order,CareEventOrder orderDb);
+    boolean saveHisOrder(CareEventOrder order,CareEventOrder orderDb);
+
+    boolean saveOrder(CareEventOrder order);
 }

+ 90 - 82
src/main/java/com/care/common/service/impl/CareEventOrderServiceImpl.java

@@ -5,7 +5,6 @@ import cn.hutool.core.collection.CollUtil;
 import cn.hutool.core.date.DateUtil;
 import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
-import com.care.bigscreen.service.BigScreenService;
 import com.care.common.cache.RedisKeyConstant;
 import com.care.common.cache.RedisUtil;
 import com.care.common.entity.*;
@@ -53,8 +52,6 @@ public class CareEventOrderServiceImpl extends ServiceImpl<CareEventOrderMapper,
     @Resource
     private SmsSendService smsSendService;
     @Resource
-    private BigScreenService bigScreenService;
-    @Resource
     private CareHouseOlderRelService careHouseOlderRelService;
 
     /**
@@ -69,95 +66,106 @@ public class CareEventOrderServiceImpl extends ServiceImpl<CareEventOrderMapper,
         return this.baseMapper.statOrder(orgId,stationId);
     }
 
+    /**
+     * 更新,不再生成新的告警工单,只作为子事件插入到当前工单的历史记录中
+     * @param order
+     * @param orderDb
+     * @return
+     */
     @Override
     @Transactional(rollbackFor = Exception.class)
-    public boolean saveOrder(CareEventOrder order,CareEventOrder orderDb){
-
-        if(orderDb != null){//更新,不再生成新的告警工单,只作为子事件插入到当前工单的历史记录中
-
-            orderDb.setOrderType(order.getOrderType());
-            this.updateById(orderDb);//更新
-
-            order.setId(orderDb.getId());
-
-            CareEventOrderHandleHis his = new CareEventOrderHandleHis();
-            his.setOrgId(order.getOrgId());
-            his.setStationId(order.getStationId());
-            his.setOrderId(order.getId());
-            his.setHouseId(order.getHouseId());
-            his.setOrderType(order.getOrderType());
-            if (OrderTypeEnum.ZHU_DONG_HU_JIAO.getValue().equals(order.getOrderType()) || OrderTypeEnum.HU_WAI_HU_JIAO.getValue().equals(order.getOrderType())){
-                his.setLogType(LogTypeEnum.OLDER_EVENT.getValue());
-                his.setLogObjectId(order.getOlderId());
-                his.setLogObjectName(order.getOlderName());
-                his.setLogResult(OrderTypeEnum.getCodeToName(order.getOrderType()));
-                his.setRelationTypeDesc(RelationTypeEnum.OLDER.getName());
-            } else {
-                his.setLogType(LogTypeEnum.DEV_EVENT.getValue());
-                his.setLogObjectId(order.getDevId());
-                his.setLogObjectName(order.getDevName());
-                his.setLogResult("发生"+OrderTypeEnum.getCodeToName(order.getOrderType()));
-                his.setRelationTypeDesc(RelationTypeEnum.HOUSE.getName());
-            }
-            his.setCreateTime(order.getCreateTime());
-            this.careEventOrderHandleHisService.save(his);
+    public boolean saveHisOrder(CareEventOrder order,CareEventOrder orderDb){
+        orderDb.setOrderType(order.getOrderType());
+        this.updateById(orderDb);//更新
+        order.setId(orderDb.getId());
+
+        CareEventOrderHandleHis his = new CareEventOrderHandleHis();
+        his.setOrgId(order.getOrgId());
+        his.setStationId(order.getStationId());
+        his.setOrderId(order.getId());
+        his.setHouseId(order.getHouseId());
+        his.setOrderType(order.getOrderType());
+        if (OrderTypeEnum.ZHU_DONG_HU_JIAO.getValue().equals(order.getOrderType()) || OrderTypeEnum.HU_WAI_HU_JIAO.getValue().equals(order.getOrderType())){
+            his.setLogType(LogTypeEnum.OLDER_EVENT.getValue());
+            his.setLogObjectId(order.getOlderId());
+            his.setLogObjectName(order.getOlderName());
+            his.setLogResult(OrderTypeEnum.getCodeToName(order.getOrderType()));
+            his.setRelationTypeDesc(RelationTypeEnum.OLDER.getName());
+        } else {
+            his.setLogType(LogTypeEnum.DEV_EVENT.getValue());
+            his.setLogObjectId(order.getDevId());
+            his.setLogObjectName(order.getDevName());
+            his.setLogResult("发生"+OrderTypeEnum.getCodeToName(order.getOrderType()));
+            his.setRelationTypeDesc(RelationTypeEnum.HOUSE.getName());
+        }
+        his.setCreateTime(order.getCreateTime());
+        this.careEventOrderHandleHisService.save(his);
 
-        } else { //新增,生成新的告警事件工单
-            this.baseMapper.insert(order);
-            CareEventOrderHandleHis his = new CareEventOrderHandleHis();
-            his.setOrgId(order.getOrgId());
-            his.setStationId(order.getStationId());
-            his.setOrderId(order.getId());
-            his.setHouseId(order.getHouseId());
-            his.setOrderType(order.getOrderType());
-            if (OrderTypeEnum.ZHU_DONG_HU_JIAO.getValue().equals(order.getOrderType()) || OrderTypeEnum.HU_WAI_HU_JIAO.getValue().equals(order.getOrderType())){
-                his.setLogType(LogTypeEnum.OLDER_EVENT.getValue());
-                his.setLogObjectId(order.getOlderId());
-                his.setLogObjectName(order.getOlderName());
-                his.setLogResult(OrderTypeEnum.getCodeToName(order.getOrderType()));
-                his.setRelationTypeDesc(RelationTypeEnum.OLDER.getName());
-            }else{
-                his.setLogType(LogTypeEnum.DEV_EVENT.getValue());
-                his.setLogObjectId(order.getDevId());
-                his.setLogObjectName(order.getDevName());
-                his.setLogResult("发生"+OrderTypeEnum.getCodeToName(order.getOrderType()));
-                his.setRelationTypeDesc(RelationTypeEnum.HOUSE.getName());
-            }
-            his.setCreateTime(order.getCreateTime());
+        return true;
+    }
 
-            addOlderToOrderOlder(order);
-            addChamberlainToOrder(order);
-            List<CareEventOrderContactStatus> contacts = addContactToOrderContact(order);
-            Object switchSms = redisUtil.get(RedisKeyConstant.SWITCH_SMS);
-            if (switchSms != null){
-                //给联系人发送短信
-                if (CollUtil.isNotEmpty(contacts)){
-                    CareHouse house = this.careHouseService.getById(order.getHouseId());
-                    contacts.forEach(item ->{
-                        boolean smsResult = smsSendService.sendSmsToLianxiren(order.getOrderType(),item.getContactPhone(),house.getName());
-                        if (smsResult){
-                            CareEventOrderHandleHis his2 = new CareEventOrderHandleHis();
-                            his2.setOrgId(item.getOrgId());
-                            his2.setStationId(item.getStationId());
-                            his2.setOrderId(item.getOrderId());
-                            his2.setLogType(LogTypeEnum.SMS.getValue());
-                            his2.setLogObjectId(item.getContactId());
-                            his2.setLogObjectName(item.getContactName());
-                            his2.setLogResult("短信通知");
-                            his2.setOpUserRole(UserRoleEnum.SEAT.getValue());
-                            his2.setCreateTime(DateUtil.date());
-                            his2.setRelationTypeDesc(item.getRelationTypeDesc());
-                            this.careEventOrderHandleHisService.save(his2);
-                        }
-                    });
-                }
+    /**
+     * 新增,生成新的告警事件工单
+     * @param order
+     * @return
+     */
+    @Override
+    @Transactional(rollbackFor = Exception.class)
+    public boolean saveOrder(CareEventOrder order){
+        this.baseMapper.insert(order);
+        CareEventOrderHandleHis his = new CareEventOrderHandleHis();
+        his.setOrgId(order.getOrgId());
+        his.setStationId(order.getStationId());
+        his.setOrderId(order.getId());
+        his.setHouseId(order.getHouseId());
+        his.setOrderType(order.getOrderType());
+        if (OrderTypeEnum.ZHU_DONG_HU_JIAO.getValue().equals(order.getOrderType()) || OrderTypeEnum.HU_WAI_HU_JIAO.getValue().equals(order.getOrderType())){
+            his.setLogType(LogTypeEnum.OLDER_EVENT.getValue());
+            his.setLogObjectId(order.getOlderId());
+            his.setLogObjectName(order.getOlderName());
+            his.setLogResult(OrderTypeEnum.getCodeToName(order.getOrderType()));
+            his.setRelationTypeDesc(RelationTypeEnum.OLDER.getName());
+        }else{
+            his.setLogType(LogTypeEnum.DEV_EVENT.getValue());
+            his.setLogObjectId(order.getDevId());
+            his.setLogObjectName(order.getDevName());
+            his.setLogResult("发生"+OrderTypeEnum.getCodeToName(order.getOrderType()));
+            his.setRelationTypeDesc(RelationTypeEnum.HOUSE.getName());
+        }
+        his.setCreateTime(order.getCreateTime());
+
+        addOlderToOrderOlder(order);
+        addChamberlainToOrder(order);
+        List<CareEventOrderContactStatus> contacts = addContactToOrderContact(order);
+        Object switchSms = redisUtil.get(RedisKeyConstant.SWITCH_SMS);
+        if (switchSms != null){
+            //给联系人发送短信
+            if (CollUtil.isNotEmpty(contacts)){
+                CareHouse house = this.careHouseService.getById(order.getHouseId());
+                contacts.forEach(item ->{
+                    boolean smsResult = smsSendService.sendSmsToLianxiren(order.getOrderType(),item.getContactPhone(),house.getName());
+                    if (smsResult){
+                        CareEventOrderHandleHis his2 = new CareEventOrderHandleHis();
+                        his2.setOrgId(item.getOrgId());
+                        his2.setStationId(item.getStationId());
+                        his2.setOrderId(item.getOrderId());
+                        his2.setLogType(LogTypeEnum.SMS.getValue());
+                        his2.setLogObjectId(item.getContactId());
+                        his2.setLogObjectName(item.getContactName());
+                        his2.setLogResult("短信通知");
+                        his2.setOpUserRole(UserRoleEnum.SEAT.getValue());
+                        his2.setCreateTime(DateUtil.date());
+                        his2.setRelationTypeDesc(item.getRelationTypeDesc());
+                        this.careEventOrderHandleHisService.save(his2);
+                    }
+                });
             }
             this.careEventOrderHandleHisService.save(his);
-
         }
         return true;
     }
 
+
     /**
      * 将老人加入到 工单中
      * @param order