MqttMsgService.java 11 KB


  1. package com.care.bms.service;
  2. import cn.hutool.core.date.DateUtil;
  3. import com.alibaba.fastjson.JSON;
  4. import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
  5. import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
  6. import com.care.bigscreen.service.BigScreenService;
  7. import com.care.common.entity.*;
  8. import com.care.common.enums.ChambOrderStatusEnum;
  9. import com.care.common.enums.MqttMsgStatusEnum;
  10. import com.care.common.enums.OrderStatusEnum;
  11. import com.care.common.enums.OrderTypeEnum;
  12. import com.care.common.service.*;
  13. import org.apache.commons.lang3.StringUtils;
  14. import org.slf4j.Logger;
  15. import org.slf4j.LoggerFactory;
  16. import org.springframework.stereotype.Service;
  17. import javax.annotation.Resource;
  18. import java.util.Date;
  19. import java.util.Map;
  20. /**
  21. * @Author: stw
  22. * @Date: 2021/8/14
  23. * @Desc:
  24. */
  25. @Service
  26. public class MqttMsgService {
  27. @Resource
  28. private CareHouseService careHouseService;
  29. @Resource
  30. private CareStationService careStationService;
  31. @Resource
  32. private CareDeviceService careDeviceService;
  33. @Resource
  34. private CareEventOrderService careEventOrderService;
  35. @Resource
  36. private CareOrganizationService careOrganizationService;
  37. @Resource
  38. private BigScreenService bigScreenService;
  39. @Resource
  40. private CareMqttMsgService careMqttMsgService;
  41. @Resource
  42. private CareEventOrderChambService careEventOrderChambService;
  43. @Resource
  44. private MqttThreadPoolInitService MqttThreadPoolInitService;
  45. private static Logger logger = LoggerFactory.getLogger(MqttMsgService.class);
  46. /**
  47. * 处理mqtt消息
  48. * @param topic
  49. * @param mqttMessageStr
  50. * @throws Exception
  51. */
  52. public void handleMessage(String topic, String mqttMessageStr) throws Exception {
  53. try {
  54. Map map = JSON.parseObject(mqttMessageStr);
  55. String type = (String) map.get("type");
  56. if(StringUtils.isEmpty(type)){
  57. logger.error("不是正确格式的消息");
  58. return;
  59. }
  60. QueryWrapper<CareDevice> queryWrapper = new QueryWrapper<>();
  61. queryWrapper.lambda().eq(CareDevice::getTopic,topic);
  62. CareDevice careDevice = careDeviceService.getOne(queryWrapper);
  63. if(careDevice == null) {
  64. logger.error("数据库中没找到对应设备");
  65. return;
  66. }
  67. Date date = DateUtil.date();
  68. CareMqttMsg careMqttMsg = new CareMqttMsg();
  69. careMqttMsg.setMqttMessage(mqttMessageStr);
  70. careMqttMsg.setTopic(topic);
  71. careMqttMsg.setDevId(careDevice.getId());
  72. careMqttMsg.setType(type);
  73. careMqttMsg.setCreateTime(date);
  74. if ("BreathHeartRate".equals(type)) {
  75. careMqttMsg.setStatus(MqttMsgStatusEnum.HANDLED.getValue());
  76. this.careMqttMsgService.save(careMqttMsg);
  77. Map msg = (Map) map.get("msg");
  78. Integer breath = (Integer) msg.get("breath");
  79. Integer heart = (Integer) msg.get("heart");
  80. careDevice.setBreathRate(breath);
  81. careDevice.setHeartRate(heart);
  82. careDeviceService.updateById(careDevice);
  83. return;
  84. }
  85. if( careDevice.getHouseId() == null ) {
  86. logger.error("设备没有配置房屋地址");
  87. return;
  88. }
  89. CareEventOrder order = getNewOrder(careDevice);
  90. order.setCreateTime(date);
  91. order.setModifyTime(date);
  92. Map msgMap = (Map)map.get("msg");
  93. Integer conf = (Integer)msgMap.get("conf");
  94. if(conf != 255) {
  95. order.setStatus(OrderStatusEnum.TODO.getValue());
  96. } else {//取消
  97. order.setStatus(OrderStatusEnum.CANCEL.getValue());
  98. }
  99. if ("FallDown".equals(type)) { //跌到
  100. order.setOrderType(OrderTypeEnum.DI_EDAO.getValue());
  101. } else if("StayTooLong".equals(type)){
  102. order.setOrderType(OrderTypeEnum.JIU_ZHI.getValue());
  103. }
  104. //判断房屋是否有未完成的告警事件,不区分类型
  105. QueryWrapper<CareEventOrder> queryWrapper2 = new QueryWrapper<>();
  106. queryWrapper2.lambda().eq(CareEventOrder::getHouseId,careDevice.getHouseId()).and(
  107. wrapper-> wrapper.eq(CareEventOrder::getStatus, OrderStatusEnum.TODO.getValue())
  108. .or().eq(CareEventOrder::getStatus,OrderStatusEnum.DOING.getValue()));
  109. CareEventOrder orderDb = careEventOrderService.getOne(queryWrapper2);
  110. if(orderDb != null) { //有未完成的告警事件,不再生成新的告警工单,只作为子事件插入到当前工单的历史记录中。
  111. if(conf != 255) {
  112. careMqttMsg.setStatus(MqttMsgStatusEnum.HANDLED.getValue());
  113. this.careMqttMsgService.save(careMqttMsg);
  114. careEventOrderService.saveHisOrder(order,orderDb);
  115. //通知页面
  116. this.bigScreenService.pushRtEventFlag(order.getStationId().toString(),order.getId().toString(),"orderUpdate");
  117. this.bigScreenService.pushRtEventFlag(order.getStationId().toString(),order.getId().toString(),"bigscreenRefresh");
  118. } else { //取消
  119. careMqttMsg.setStatus(MqttMsgStatusEnum.HANDLED.getValue());
  120. this.careMqttMsgService.save(careMqttMsg);
  121. if(orderDb.getOrderType().equals(order.getOrderType())){//看取消的类型,和当前一致,则 更新当前的工单状态为取消,记录一条取消his,管家工单标记为取消
  122. //更新当前的工单状态为取消
  123. orderDb.setStatus(OrderStatusEnum.CANCEL.getValue());
  124. this.careEventOrderService.updateById(orderDb);
  125. //管家工单标记为取消
  126. CareEventOrderChamb careEventOrderChamb = this.careEventOrderChambService.getChambOrderByOrderId(orderDb.getId());
  127. careEventOrderChamb.setStatus(ChambOrderStatusEnum.CANCEL.getValue());
  128. this.careEventOrderChambService.updateById(careEventOrderChamb);
  129. //记录一条取消his
  130. careEventOrderService.saveHisOrder(order,orderDb);
  131. //通知页面
  132. this.bigScreenService.pushRtEventFlag(order.getStationId().toString(),order.getId().toString(),"orderUpdate");
  133. this.bigScreenService.pushRtEventFlag(order.getStationId().toString(),order.getId().toString(),"bigscreenRefresh");
  134. } else { // 不一致,则只记录一条取消his
  135. //记录一条取消his
  136. careEventOrderService.saveHisOrder(order,orderDb);
  137. }
  138. }
  139. } else { //无未完成的告警事件
  140. Integer delayTime = getDelayTime(type,careDevice);
  141. if (delayTime == null || delayTime == 0){ //没有配置响应时间,立即生成新的告警事件工单
  142. if(conf != 255) {
  143. careMqttMsg.setStatus(MqttMsgStatusEnum.HANDLED.getValue());
  144. this.careMqttMsgService.save(careMqttMsg);
  145. //生成新的工单
  146. careEventOrderService.saveOrder(order);
  147. //通知页面
  148. this.bigScreenService.pushRtEventFlag(order.getStationId().toString(),order.getId().toString(),"orderAdd");
  149. this.bigScreenService.pushRtEventFlag(order.getStationId().toString(),order.getId().toString(),"bigscreenRefresh");
  150. } else { //取消 不处理工单
  151. careMqttMsg.setStatus(MqttMsgStatusEnum.HANDLED.getValue());
  152. this.careMqttMsgService.save(careMqttMsg);
  153. }
  154. } else { //有配置响应时间,不处理工单,进行调度处理
  155. if(conf != 255) { //不是取消
  156. careMqttMsg.setStatus(MqttMsgStatusEnum.NO_HANDLE.getValue());
  157. careMqttMsg.setDelayTime(delayTime);
  158. this.careMqttMsgService.save(careMqttMsg);
  159. //进行调度处理
  160. MqttThreadPoolInitService.addScheduled(careMqttMsg,careMqttMsg.getDelayTime() * 60 * 1000);
  161. } else { //取消: 更新mqtt消息为已取消,等待执行的调度到时会取消执行
  162. careMqttMsg.setStatus(MqttMsgStatusEnum.HANDLED.getValue());
  163. this.careMqttMsgService.save(careMqttMsg);
  164. //更新mqtt消息为已取消
  165. UpdateWrapper<CareMqttMsg> updateWrapper = new UpdateWrapper<>();
  166. updateWrapper.lambda().eq(CareMqttMsg::getDevId,careDevice.getId()).eq(CareMqttMsg::getType,type).eq(CareMqttMsg::getStatus,MqttMsgStatusEnum.NO_HANDLE.getValue())
  167. .set(CareMqttMsg::getStatus,MqttMsgStatusEnum.CANCEL.getValue());
  168. this.careMqttMsgService.update(updateWrapper);
  169. }
  170. }
  171. }
  172. } catch (Exception e){
  173. e.printStackTrace();
  174. }
  175. }
  176. private Integer getDelayTime(String type, CareDevice careDevice) {
  177. if ("FallDown".equals(type)) { //跌到
  178. return careDevice.getFallResponseTime();
  179. } else if ("StayTooLong".equals(type)) { //久滞
  180. return careDevice.getLonglagResponseTime();
  181. } else {
  182. return 0;
  183. }
  184. }
  185. public CareEventOrder getNewOrder(CareDevice careDevice) {
  186. CareHouse careHouse = careHouseService.getById(careDevice.getHouseId());
  187. if (careHouse == null){
  188. logger.error("设备没有配置房屋地址");
  189. return null;
  190. }
  191. CareEventOrder order = new CareEventOrder();
  192. order.setOrgId(careDevice.getOrgId());
  193. order.setOrgName(careOrganizationService.getById(careDevice.getOrgId()).getName());
  194. order.setStationId(careDevice.getStationId());
  195. order.setStationName(careStationService.getById(careDevice.getStationId()).getName());
  196. order.setHouseId(careHouse.getId());
  197. order.setDevId(careDevice.getId());
  198. order.setDevName(careDevice.getDevName());
  199. order.setHouseName(careHouse.getName());
  200. order.setMemberId(careDevice.getMemberId());
  201. order.setTitle(careHouse.getAddr());
  202. return order;
  203. }
  204. }