MqttMsgService.java 14 KB


  1. package com.care.mqtt.service;
  2. import cn.hutool.core.date.DateUtil;
  3. import com.alibaba.fastjson.JSON;
  4. import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
  5. import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
  6. import com.care.bigscreen.service.BigScreenService;
  7. import com.care.common.entity.*;
  8. import com.care.common.enums.ChambOrderStatusEnum;
  9. import com.care.common.enums.MqttMsgStatusEnum;
  10. import com.care.common.enums.OrderStatusEnum;
  11. import com.care.common.enums.OrderTypeEnum;
  12. import com.care.common.service.*;
  13. import org.apache.commons.lang3.StringUtils;
  14. import org.slf4j.Logger;
  15. import org.slf4j.LoggerFactory;
  16. import org.springframework.stereotype.Service;
  17. import javax.annotation.Resource;
  18. import java.util.Date;
  19. import java.util.Map;
  20. /**
  21. * @Author: stw
  22. * @Date: 2021/8/14
  23. * @Desc:
  24. */
  25. @Service
  26. public class MqttMsgService {
  27. @Resource
  28. private CareHouseService careHouseService;
  29. @Resource
  30. private CareStationService careStationService;
  31. @Resource
  32. private CareDeviceService careDeviceService;
  33. @Resource
  34. private CareEventOrderService careEventOrderService;
  35. @Resource
  36. private CareOrganizationService careOrganizationService;
  37. @Resource
  38. private BigScreenService bigScreenService;
  39. @Resource
  40. private CareMqttMsgService careMqttMsgService;
  41. @Resource
  42. private CareEventOrderChambService careEventOrderChambService;
  43. @Resource
  44. private MqttMsgRedisService mqttMsgRedisService;
  45. @Resource
  46. private CareRoomSensorService careRoomSensorService;
  47. @Resource
  48. private CareRoomService careRoomService;
  49. @Resource
  50. private CareRoomGateService careRoomGateService;
  51. @Resource
  52. private CareRoomRegionService careRoomRegionService;
  53. private static Logger logger = LoggerFactory.getLogger(MqttMsgService.class);
  54. /**
  55. * 处理mqtt消息
  56. * @param topic
  57. * @param mqttMessageStr
  58. * @throws Exception
  59. */
  60. public void handleMessage(String topic, String mqttMessageStr) throws Exception {
  61. if(topic.contains("event")){ //工单的消息
  62. handleOrderEventMessage(topic,mqttMessageStr);
  63. } else { //房间信息配置的返回消息
  64. handleConfigureRoomInfoResultMessage(topic,mqttMessageStr);
  65. }
  66. }
  67. /**
  68. * 查询配置房间信息的结果消息 处理
  69. * @param topic
  70. * @param mqttMessageStr
  71. */
  72. private void handleConfigureRoomInfoResultMessage(String topic, String mqttMessageStr) {
  73. Map map = JSON.parseObject(mqttMessageStr);
  74. String type = (String) map.get("type");
  75. if(StringUtils.isEmpty(type)) {
  76. logger.error("不是正确格式的消息");
  77. return;
  78. }
  79. Map msg = (Map) map.get("msg");
  80. if(msg == null) {
  81. logger.error("不是正确格式的消息");
  82. return;
  83. }
  84. String ack = (String)msg.get("ack");
  85. if(StringUtils.isEmpty(ack)) {
  86. logger.error("不是激活返回消息");
  87. return;
  88. }
  89. logger.info("是激活返回消息: ack == " + ack);
  90. QueryWrapper<CareDevice> queryWrapper = new QueryWrapper<>();
  91. queryWrapper.lambda().eq(CareDevice::getTopic,topic.replaceAll("control","event"));
  92. CareDevice careDevice = careDeviceService.getOne(queryWrapper);
  93. if(careDevice == null) {
  94. logger.error("数据库中没找到对应设备");
  95. return;
  96. }
  97. String devCode = careDevice.getDevCode();
  98. if("CfgAreaAck".equals(type)) { //房间信息
  99. UpdateWrapper<CareRoom> updateWrapper = new UpdateWrapper<>();
  100. updateWrapper.lambda().eq(CareRoom::getRoomId,devCode).set(CareRoom::getAck,ack);
  101. careRoomService.update(updateWrapper);
  102. } else if("CfgDevicePositionAck".equals(type)) { //传感器
  103. UpdateWrapper<CareRoomSensor> updateWrapper = new UpdateWrapper<>();
  104. updateWrapper.lambda().eq(CareRoomSensor::getRoomId,devCode).set(CareRoomSensor::getAck,ack);
  105. careRoomSensorService.update(updateWrapper);
  106. } else if("CfgGatesAck".equals(type)) { //门
  107. UpdateWrapper<CareRoomGate> updateWrapper = new UpdateWrapper<>();
  108. updateWrapper.lambda().eq(CareRoomGate::getRoomId,devCode).set(CareRoomGate::getAck,ack);
  109. careRoomGateService.update(updateWrapper);
  110. } else if("CfgRegionsAck".equals(type)) { //区域
  111. UpdateWrapper<CareRoomRegion> updateWrapper = new UpdateWrapper<>();
  112. updateWrapper.lambda().eq(CareRoomRegion::getRoomId,devCode).set(CareRoomRegion::getAck,ack);
  113. careRoomRegionService.update(updateWrapper);
  114. }
  115. }
  116. /**
  117. * 工单事件的消息 处理
  118. * @param topic
  119. * @param mqttMessageStr
  120. */
  121. private void handleOrderEventMessage(String topic, String mqttMessageStr) {
  122. try {
  123. Map map = JSON.parseObject(mqttMessageStr);
  124. String type = (String) map.get("type");
  125. if(StringUtils.isEmpty(type)){
  126. logger.error("不是正确格式的消息");
  127. return;
  128. }
  129. QueryWrapper<CareDevice> queryWrapper = new QueryWrapper<>();
  130. queryWrapper.lambda().eq(CareDevice::getTopic,topic);
  131. CareDevice careDevice = careDeviceService.getOne(queryWrapper);
  132. if(careDevice == null) {
  133. logger.error("数据库中没找到对应设备");
  134. return;
  135. }
  136. Date date = DateUtil.date();
  137. CareMqttMsg careMqttMsg = new CareMqttMsg();
  138. careMqttMsg.setMqttMessage(mqttMessageStr);
  139. careMqttMsg.setTopic(topic);
  140. careMqttMsg.setDevId(careDevice.getId());
  141. careMqttMsg.setType(type);
  142. careMqttMsg.setCreateTime(date);
  143. if ("BreathHeartRate".equals(type)) {
  144. careMqttMsg.setStatus(MqttMsgStatusEnum.HANDLED.getValue());
  145. this.careMqttMsgService.save(careMqttMsg);
  146. Map msg = (Map) map.get("msg");
  147. Integer breath = (Integer) msg.get("breath");
  148. Integer heart = (Integer) msg.get("heart");
  149. careDevice.setBreathRate(breath);
  150. careDevice.setHeartRate(heart);
  151. careDeviceService.updateById(careDevice);
  152. return;
  153. }
  154. if( careDevice.getHouseId() == null ) {
  155. logger.error("设备没有配置房屋地址");
  156. return;
  157. }
  158. CareEventOrder order = getNewOrder(careDevice);
  159. order.setCreateTime(date);
  160. order.setModifyTime(date);
  161. Map msgMap = (Map)map.get("msg");
  162. Integer conf = (Integer)msgMap.get("conf");
  163. if(conf != 255) {
  164. order.setStatus(OrderStatusEnum.TODO.getValue());
  165. } else {//取消
  166. order.setStatus(OrderStatusEnum.CANCEL.getValue());
  167. }
  168. if ("FallDown".equals(type)) { //跌到
  169. order.setOrderType(OrderTypeEnum.DI_EDAO.getValue());
  170. } else if("StayTooLong".equals(type)){
  171. order.setOrderType(OrderTypeEnum.JIU_ZHI.getValue());
  172. }
  173. //判断房屋是否有未完成的告警事件,不区分类型
  174. QueryWrapper<CareEventOrder> queryWrapper2 = new QueryWrapper<>();
  175. queryWrapper2.lambda().eq(CareEventOrder::getHouseId,careDevice.getHouseId()).and(
  176. wrapper-> wrapper.eq(CareEventOrder::getStatus, OrderStatusEnum.TODO.getValue())
  177. .or().eq(CareEventOrder::getStatus,OrderStatusEnum.DOING.getValue()));
  178. CareEventOrder orderDb = careEventOrderService.getOne(queryWrapper2);
  179. if(orderDb != null) { //有未完成的告警事件,不再生成新的告警工单,只作为子事件插入到当前工单的历史记录中。
  180. if(conf != 255) {
  181. careMqttMsg.setStatus(MqttMsgStatusEnum.HANDLED.getValue());
  182. this.careMqttMsgService.save(careMqttMsg);
  183. careEventOrderService.saveHisOrder(order,orderDb);
  184. //通知页面
  185. this.bigScreenService.pushRtEventFlag(order.getStationId().toString(),order.getId().toString(),"orderUpdate");
  186. this.bigScreenService.pushRtEventFlag(order.getStationId().toString(),order.getId().toString(),"bigscreenRefresh");
  187. } else { //取消
  188. careMqttMsg.setStatus(MqttMsgStatusEnum.HANDLED.getValue());
  189. this.careMqttMsgService.save(careMqttMsg);
  190. if(orderDb.getOrderType().equals(order.getOrderType())){//看取消的类型,和当前一致,则 更新当前的工单状态为取消,记录一条取消his,管家工单标记为取消
  191. //更新当前的工单状态为取消
  192. orderDb.setStatus(OrderStatusEnum.CANCEL.getValue());
  193. this.careEventOrderService.updateById(orderDb);
  194. //管家工单标记为取消
  195. CareEventOrderChamb careEventOrderChamb = this.careEventOrderChambService.getChambOrderByOrderId(orderDb.getId());
  196. careEventOrderChamb.setStatus(ChambOrderStatusEnum.CANCEL.getValue());
  197. this.careEventOrderChambService.updateById(careEventOrderChamb);
  198. //记录一条取消his
  199. careEventOrderService.saveHisOrder(order,orderDb);
  200. //通知页面
  201. this.bigScreenService.pushRtEventFlag(order.getStationId().toString(),order.getId().toString(),"orderUpdate");
  202. this.bigScreenService.pushRtEventFlag(order.getStationId().toString(),order.getId().toString(),"bigscreenRefresh");
  203. } else { // 不一致,则只记录一条取消his
  204. //记录一条取消his
  205. careEventOrderService.saveHisOrder(order,orderDb);
  206. }
  207. }
  208. } else { //无未完成的告警事件
  209. Integer delayTime = getDelayTime(type,careDevice);
  210. if (delayTime == null || delayTime == 0){ //没有配置响应时间,立即生成新的告警事件工单
  211. if(conf != 255) {
  212. careMqttMsg.setStatus(MqttMsgStatusEnum.HANDLED.getValue());
  213. this.careMqttMsgService.save(careMqttMsg);
  214. //生成新的工单
  215. careEventOrderService.saveOrder(order);
  216. //通知页面
  217. this.bigScreenService.pushRtEventFlag(order.getStationId().toString(),order.getId().toString(),"orderAdd");
  218. this.bigScreenService.pushRtEventFlag(order.getStationId().toString(),order.getId().toString(),"bigscreenRefresh");
  219. } else { //取消 不处理工单
  220. careMqttMsg.setStatus(MqttMsgStatusEnum.HANDLED.getValue());
  221. this.careMqttMsgService.save(careMqttMsg);
  222. }
  223. } else { //有配置响应时间,不处理工单,进行调度处理
  224. if(conf != 255) { //不是取消
  225. careMqttMsg.setStatus(MqttMsgStatusEnum.NO_HANDLE.getValue());
  226. careMqttMsg.setDelayTime(delayTime);
  227. this.careMqttMsgService.save(careMqttMsg);
  228. //进行调度处理
  229. mqttMsgRedisService.addRedis(careMqttMsg,date.getTime() + careMqttMsg.getDelayTime() * 60 * 1000);
  230. } else { //取消: 更新mqtt消息为已取消,等待执行的调度到时会取消执行
  231. careMqttMsg.setStatus(MqttMsgStatusEnum.HANDLED.getValue());
  232. this.careMqttMsgService.save(careMqttMsg);
  233. //更新mqtt消息为已取消
  234. UpdateWrapper<CareMqttMsg> updateWrapper = new UpdateWrapper<>();
  235. updateWrapper.lambda().eq(CareMqttMsg::getDevId,careDevice.getId()).eq(CareMqttMsg::getType,type).eq(CareMqttMsg::getStatus,MqttMsgStatusEnum.NO_HANDLE.getValue())
  236. .set(CareMqttMsg::getStatus,MqttMsgStatusEnum.CANCEL.getValue());
  237. this.careMqttMsgService.update(updateWrapper);
  238. }
  239. }
  240. }
  241. } catch (Exception e){
  242. e.printStackTrace();
  243. }
  244. }
  245. private Integer getDelayTime(String type, CareDevice careDevice) {
  246. if ("FallDown".equals(type)) { //跌到
  247. return careDevice.getFallResponseTime();
  248. } else if ("StayTooLong".equals(type)) { //久滞
  249. return careDevice.getLonglagResponseTime();
  250. } else {
  251. return 0;
  252. }
  253. }
  254. public CareEventOrder getNewOrder(CareDevice careDevice) {
  255. CareHouse careHouse = careHouseService.getById(careDevice.getHouseId());
  256. if (careHouse == null){
  257. logger.error("设备没有配置房屋地址");
  258. return null;
  259. }
  260. CareEventOrder order = new CareEventOrder();
  261. order.setOrgId(careDevice.getOrgId());
  262. order.setOrgName(careOrganizationService.getById(careDevice.getOrgId()).getName());
  263. order.setStationId(careDevice.getStationId());
  264. order.setStationName(careStationService.getById(careDevice.getStationId()).getName());
  265. order.setHouseId(careHouse.getId());
  266. order.setDevId(careDevice.getId());
  267. order.setDevName(careDevice.getDevName());
  268. order.setHouseName(careHouse.getName());
  269. order.setMemberId(careDevice.getMemberId());
  270. order.setTitle(careHouse.getAddr());
  271. return order;
  272. }
  273. }