فهرست منبع

修改 创建工单并发的 bug

suntianwu 3 سال پیش
والد
کامیت
ba37dcdd4c

+ 0 - 3
src/main/java/com/care/client/service/PinanbaoService.java

@@ -2,12 +2,10 @@ package com.care.client.service;
 
 import cn.hutool.core.bean.BeanUtil;
 import cn.hutool.core.collection.CollUtil;
-import cn.hutool.core.date.DateUtil;
 import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
 import com.care.client.vo.*;
 import com.care.common.entity.*;
 import com.care.common.enums.*;
-import com.care.common.exception.BDException;
 import com.care.common.service.*;
 import com.care.common.vo.device.DeviceVO;
 import com.care.common.vo.order.*;
@@ -18,7 +16,6 @@ import org.springframework.web.multipart.MultipartFile;
 
 import javax.annotation.Resource;
 import java.util.ArrayList;
-import java.util.Date;
 import java.util.List;
 import java.util.Map;
 

+ 15 - 9
src/main/java/com/care/common/vo/device/MqttMsgVO.java

@@ -1,15 +1,9 @@
 package com.care.common.vo.device;
 
-import com.baomidou.mybatisplus.annotation.IdType;
-import com.baomidou.mybatisplus.annotation.TableField;
-import com.baomidou.mybatisplus.annotation.TableId;
-import com.baomidou.mybatisplus.annotation.TableName;
-import com.fasterxml.jackson.annotation.JsonFormat;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
-import lombok.EqualsAndHashCode;
-import lombok.experimental.Accessors;
+
 
 import java.io.Serializable;
 import java.util.Date;
@@ -26,7 +20,6 @@ import java.util.Date;
 public class MqttMsgVO implements Serializable {
     private static final long serialVersionUID = -60036036914347085L;
 
-    @TableId(value = "ID", type = IdType.AUTO)
     private Long id;
 
 
@@ -37,12 +30,25 @@ public class MqttMsgVO implements Serializable {
     @ApiModelProperty("topic")
     private String topic;
 
+    @ApiModelProperty("消息类型:呼吸心率信息 BreathHeartRate,跌到 FallDown,久滞 StayTooLong")
+    private String type;
+
 
     @ApiModelProperty("设备ID")
     private Long devId;
 
+    @ApiModelProperty("状态:0,未处理;1,已取消;2,已处理")
+    private String status;
+
+    @ApiModelProperty("延迟时间(单位:分钟)")
+    private Integer delayTime;
+
+    @ApiModelProperty("执行时间(单位:毫秒)")
+    private Long exeTime;
+
     @ApiModelProperty("createTime")
-    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone = "GMT+8")
     private Date createTime;
 
+    @ApiModelProperty("modifyTime")
+    private Date modifyTime;
 }

+ 98 - 80
src/main/java/com/care/mqtt/service/MqttMsgRedisService.java

@@ -1,10 +1,10 @@
 package com.care.mqtt.service;
 
 
+import cn.hutool.core.bean.BeanUtil;
 import cn.hutool.core.collection.CollUtil;
 import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
 import com.care.bigscreen.service.BigScreenService;
-import com.care.common.cache.RedisKeyConstant;
 import com.care.common.cache.RedisUtil;
 import com.care.common.constant.Constants;
 import com.care.common.entity.CareDevice;
@@ -16,13 +16,15 @@ import com.care.common.enums.OrderTypeEnum;
 import com.care.common.service.CareDeviceService;
 import com.care.common.service.CareEventOrderService;
 import com.care.common.service.CareMqttMsgService;
+import com.care.common.vo.device.MqttMsgVO;
+import org.apache.commons.compress.utils.Lists;
 import org.springframework.stereotype.Service;
 
 import javax.annotation.Resource;
 import java.util.Date;
 import java.util.List;
 import java.util.Set;
-import java.util.UUID;
+
 
 
 /**
@@ -53,24 +55,35 @@ public class MqttMsgRedisService {
     public void init() {
 
         //查询mqtt msg 表有延迟标记的未处理消息,放入线程池,等待延时时间结束执行创建工单
-        QueryWrapper<CareMqttMsg> queryWrapper2 = new QueryWrapper<>();
-        queryWrapper2.lambda().eq(CareMqttMsg::getStatus, MqttMsgStatusEnum.NO_HANDLE.getValue()).gt(CareMqttMsg::getDelayTime,0)
+        QueryWrapper<CareMqttMsg> queryWrapper = new QueryWrapper<>();
+        queryWrapper.lambda().eq(CareMqttMsg::getStatus, MqttMsgStatusEnum.NO_HANDLE.getValue()).gt(CareMqttMsg::getDelayTime,0)
         .orderByAsc(CareMqttMsg::getCreateTime);
 
-        List<CareMqttMsg> mqttMsgList = careMqttMsgService.list(queryWrapper2);
+        List<CareMqttMsg> mqttMsgList = careMqttMsgService.list(queryWrapper);
         if (CollUtil.isNotEmpty(mqttMsgList)) {
-            for(CareMqttMsg careMqttMsg : mqttMsgList){
+            List<MqttMsgVO> list = Lists.newArrayList();
+
+            for(CareMqttMsg careMqttMsg : mqttMsgList) {
+                MqttMsgVO vo = new MqttMsgVO();
+                BeanUtil.copyProperties(careMqttMsg, vo);
+
+                Long createTime = vo.getCreateTime().getTime();
+                Long delayTime = vo.getDelayTime() * 60 * 1000L;
+                Long exeTime = createTime + delayTime;
+                vo.setExeTime(exeTime);
 
+                list.add(vo);
+            }
+            //按执行时间排序
+            List<MqttMsgVO> listNew = CollUtil.sortByProperty(list,"exeTime");
+            for(MqttMsgVO mqttMsgVO : listNew) {
                 Long now = System.currentTimeMillis();
-                Long createTime = careMqttMsg.getCreateTime().getTime();
-                Long delayTime = careMqttMsg.getDelayTime() * 60 * 1000L;
-                Long execTime = createTime + delayTime;
 
-                if(execTime <= now) { //立即执行
-                    exe(careMqttMsg);
+                if(mqttMsgVO.getExeTime() <= now) { //立即执行
+                    exe(mqttMsgVO);
                     continue;
                 }
-                addRedis(careMqttMsg,execTime);
+                addRedis(mqttMsgVO, mqttMsgVO.getExeTime());
             }
 
         }
@@ -80,12 +93,12 @@ public class MqttMsgRedisService {
 
     /**
      * 把延迟消息放入redis
-     * @param careMqttMsg
-     * @param execTime
+     * @param mqttMsgVO
+     * @param exeTime
      */
-    public void addRedis(CareMqttMsg careMqttMsg, long execTime){
-        String msg = String.valueOf(careMqttMsg.getId());
-        redisUtil.zSetAdd(Constants.MQTT_MSG_LAG_KEY, msg, execTime);
+    public void addRedis(MqttMsgVO mqttMsgVO, long exeTime){
+        String msg = String.valueOf(mqttMsgVO.getId());
+        redisUtil.zSetAdd(Constants.MQTT_MSG_LAG_KEY, msg, exeTime);
     }
 
     /**
@@ -98,13 +111,30 @@ public class MqttMsgRedisService {
         //查询符合时间条件下的集合
         Set set = redisUtil.zSetRangeByScore(Constants.MQTT_MSG_LAG_KEY, startTime, endTime);
 
+
         if (CollUtil.isNotEmpty(set)) {
             //具体业务操作
+            List<MqttMsgVO> list = Lists.newArrayList();
             set.forEach(item -> {
                 CareMqttMsg careMqttMsg = careMqttMsgService.getById(Long.parseLong(String.valueOf(item)));
-                exe(careMqttMsg);
+                MqttMsgVO vo = new MqttMsgVO();
+                BeanUtil.copyProperties(careMqttMsg, vo);
+
+                Long createTime = vo.getCreateTime().getTime();
+                Long delayTime = vo.getDelayTime() * 60 * 1000L;
+                Long exeTime = createTime + delayTime;
+                vo.setExeTime(exeTime);
+
+                list.add(vo);
             });
 
+            //按执行时间排序
+            List<MqttMsgVO> listNew = CollUtil.sortByProperty(list,"exeTime");
+
+            for(MqttMsgVO mqttMsgVO : listNew) {
+                exe(mqttMsgVO);
+            }
+
             //移除集合
             redisUtil.zSetRemoveRangeByScore(Constants.MQTT_MSG_LAG_KEY, startTime, endTime);
         }
@@ -112,71 +142,59 @@ public class MqttMsgRedisService {
 
     /**
      *  执行创建工单动作
-     * @param careMqttMsg
+     * @param mqttMsgVO
      */
 
-    private void exe(CareMqttMsg careMqttMsg){
-        String key = RedisKeyConstant.CREATE_ORDER + ":" + careMqttMsg.getDevId();
-        String requestId = UUID.randomUUID().toString();
-        boolean result = redisUtil.tryLock(key,requestId,10);
-        try {
-            if (result) {
-                String status = careMqttMsg.getStatus();
-                if(!MqttMsgStatusEnum.NO_HANDLE.getValue().equals(status)){ //不是未处理,不执行。
-                    return;
-                }
+    private void exe(MqttMsgVO mqttMsgVO){
+          CareMqttMsg careMqttMsg = new CareMqttMsg();
+          BeanUtil.copyProperties(mqttMsgVO, careMqttMsg);
+
+          String status = careMqttMsg.getStatus();
+          if(!MqttMsgStatusEnum.NO_HANDLE.getValue().equals(status)){ //不是未处理,不执行。
+              return;
+          }
+
+          CareDevice careDevice = careDeviceService.getById(careMqttMsg.getDevId());
+          CareEventOrder order = mqttMsgService.getNewOrder(careDevice);
+          if(order == null){
+              careMqttMsg.setStatus(MqttMsgStatusEnum.HANDLED.getValue());
+              careMqttMsg.setModifyTime(new Date());
+              this.careMqttMsgService.updateById(careMqttMsg);
+              return;
+          }
+          order.setCreateTime(careMqttMsg.getCreateTime());
+          order.setModifyTime(careMqttMsg.getCreateTime());
+          order.setStatus(OrderStatusEnum.TODO.getValue());
+          if ("FallDown".equals(careMqttMsg.getType())) { //跌到
+              order.setOrderType(OrderTypeEnum.DI_EDAO.getValue());
+          } else if("StayTooLong".equals(careMqttMsg.getType())){ //久滞
+              order.setOrderType(OrderTypeEnum.JIU_ZHI.getValue());
+          }
+
+          //判断房屋是否有未完成的告警事件,不区分类型
+          QueryWrapper<CareEventOrder> queryWrapper2 = new QueryWrapper<>();
+          queryWrapper2.lambda().eq(CareEventOrder::getDevId,careDevice.getId()).and(
+                  wrapper-> wrapper.eq(CareEventOrder::getStatus, OrderStatusEnum.TODO.getValue())
+                          .or().eq(CareEventOrder::getStatus,OrderStatusEnum.DOING.getValue()));
+
+          CareEventOrder orderDb = careEventOrderService.getOne(queryWrapper2);
+          if(orderDb != null) { //有未完成的告警事件,不再生成新的告警工单,只作为子事件插入到当前工单的历史记录中。
+              careEventOrderService.saveHisOrder(order,orderDb);
+              //通知页面
+              this.bigScreenService.pushRtEventFlag(order.getStationId().toString(),order.getId().toString(),"orderUpdate");
+              this.bigScreenService.pushRtEventFlag(order.getStationId().toString(),order.getId().toString(),"bigscreenRefresh");
+          }  else { //无未完成的告警事件
+              //生成新的工单
+              this.careEventOrderService.saveOrder(order);
+              //通知页面
+              this.bigScreenService.pushRtEventFlag(order.getStationId().toString(),order.getId().toString(),"orderAdd");
+              this.bigScreenService.pushRtEventFlag(order.getStationId().toString(),order.getId().toString(),"bigscreenRefresh");
+          }
+
+          careMqttMsg.setStatus(MqttMsgStatusEnum.HANDLED.getValue());
+          careMqttMsg.setModifyTime(new Date());
+          this.careMqttMsgService.updateById(careMqttMsg);
 
-                CareDevice careDevice = careDeviceService.getById(careMqttMsg.getDevId());
-                CareEventOrder order = mqttMsgService.getNewOrder(careDevice);
-                if(order == null){
-                    careMqttMsg.setStatus(MqttMsgStatusEnum.HANDLED.getValue());
-                    careMqttMsg.setModifyTime(new Date());
-                    this.careMqttMsgService.updateById(careMqttMsg);
-                    return;
-                }
-                order.setCreateTime(careMqttMsg.getCreateTime());
-                order.setModifyTime(careMqttMsg.getCreateTime());
-                order.setStatus(OrderStatusEnum.TODO.getValue());
-                if ("FallDown".equals(careMqttMsg.getType())) { //跌到
-                    order.setOrderType(OrderTypeEnum.DI_EDAO.getValue());
-                } else if("StayTooLong".equals(careMqttMsg.getType())){ //久滞
-                    order.setOrderType(OrderTypeEnum.JIU_ZHI.getValue());
-                }
-
-                //判断房屋是否有未完成的告警事件,不区分类型
-                QueryWrapper<CareEventOrder> queryWrapper2 = new QueryWrapper<>();
-                queryWrapper2.lambda().eq(CareEventOrder::getDevId,careDevice.getId()).and(
-                        wrapper-> wrapper.eq(CareEventOrder::getStatus, OrderStatusEnum.TODO.getValue())
-                                .or().eq(CareEventOrder::getStatus,OrderStatusEnum.DOING.getValue()));
-
-                CareEventOrder orderDb = careEventOrderService.getOne(queryWrapper2);
-                if(orderDb != null) { //有未完成的告警事件,不再生成新的告警工单,只作为子事件插入到当前工单的历史记录中。
-                    careEventOrderService.saveHisOrder(order,orderDb);
-                    //通知页面
-                    this.bigScreenService.pushRtEventFlag(order.getStationId().toString(),order.getId().toString(),"orderUpdate");
-                    this.bigScreenService.pushRtEventFlag(order.getStationId().toString(),order.getId().toString(),"bigscreenRefresh");
-                }  else { //无未完成的告警事件
-                    //生成新的工单
-                    this.careEventOrderService.saveOrder(order);
-                    //通知页面
-                    this.bigScreenService.pushRtEventFlag(order.getStationId().toString(),order.getId().toString(),"orderAdd");
-                    this.bigScreenService.pushRtEventFlag(order.getStationId().toString(),order.getId().toString(),"bigscreenRefresh");
-                }
-
-                careMqttMsg.setStatus(MqttMsgStatusEnum.HANDLED.getValue());
-                careMqttMsg.setModifyTime(new Date());
-                this.careMqttMsgService.updateById(careMqttMsg);
-            } else {
-                try {
-                    Thread.sleep(3000);
-                    exe(careMqttMsg);
-                } catch (Exception e){
-                    e.printStackTrace();
-                }
-            }
-        } finally {
-            redisUtil.releaseLock(key,requestId);
-        }
     }
 
 }

+ 5 - 1
src/main/java/com/care/mqtt/service/MqttMsgService.java

@@ -1,6 +1,7 @@
 package com.care.mqtt.service;
 
 
+import cn.hutool.core.bean.BeanUtil;
 import cn.hutool.core.date.DateUtil;
 import com.alibaba.fastjson.JSON;
 import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
@@ -17,6 +18,7 @@ import com.care.common.enums.MqttMsgStatusEnum;
 import com.care.common.enums.OrderStatusEnum;
 import com.care.common.enums.OrderTypeEnum;
 import com.care.common.service.*;
+import com.care.common.vo.device.MqttMsgVO;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -230,7 +232,9 @@ public class MqttMsgService {
                             this.careMqttMsgService.save(careMqttMsg);
 
                             //进行调度处理
-                            mqttMsgRedisService.addRedis(careMqttMsg, date.getTime() + careMqttMsg.getDelayTime() * 60 * 1000);
+                            MqttMsgVO mqttMsgVO = new MqttMsgVO();
+                            BeanUtil.copyProperties(careMqttMsg, mqttMsgVO);
+                            mqttMsgRedisService.addRedis(mqttMsgVO, date.getTime() + careMqttMsg.getDelayTime() * 60 * 1000);
                         } else { //取消: 更新mqtt消息为已取消,等待执行的调度到时会取消执行
                             careMqttMsg.setStatus(MqttMsgStatusEnum.CANCEL.getValue());
                             this.careMqttMsgService.save(careMqttMsg);