package com.care.mqtt.service; import cn.hutool.core.date.DateUtil; import com.alibaba.fastjson.JSON; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper; import com.care.bigscreen.service.BigScreenService; import com.care.common.entity.*; import com.care.common.enums.ChambOrderStatusEnum; import com.care.common.enums.MqttMsgStatusEnum; import com.care.common.enums.OrderStatusEnum; import com.care.common.enums.OrderTypeEnum; import com.care.common.service.*; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.util.Date; import java.util.Map; /** * @Author: stw * @Date: 2021/8/14 * @Desc: */ @Service public class MqttMsgService { @Resource private CareHouseService careHouseService; @Resource private CareStationService careStationService; @Resource private CareDeviceService careDeviceService; @Resource private CareEventOrderService careEventOrderService; @Resource private CareOrganizationService careOrganizationService; @Resource private BigScreenService bigScreenService; @Resource private CareMqttMsgService careMqttMsgService; @Resource private CareEventOrderChambService careEventOrderChambService; @Resource private MqttMsgRedisService mqttMsgRedisService; @Resource private CareRoomSensorService careRoomSensorService; @Resource private CareRoomService careRoomService; @Resource private CareRoomGateService careRoomGateService; @Resource private CareRoomRegionService careRoomRegionService; private static Logger logger = LoggerFactory.getLogger(MqttMsgService.class); /** * 处理mqtt消息 * @param topic * @param mqttMessageStr * @throws Exception */ public void handleMessage(String topic, String mqttMessageStr) throws Exception { if(topic.contains("event")){ //工单的消息 handleOrderEventMessage(topic,mqttMessageStr); } else { //房间信息配置的返回消息 handleConfigureRoomInfoResultMessage(topic,mqttMessageStr); } } /** * 查询配置房间信息的结果消息 处理 * @param topic * @param mqttMessageStr */ private void handleConfigureRoomInfoResultMessage(String topic, String mqttMessageStr) { Map map = JSON.parseObject(mqttMessageStr); String type = (String) map.get("type"); if(StringUtils.isEmpty(type)) { logger.error("不是正确格式的消息"); return; } Map msg = (Map) map.get("msg"); if(msg == null) { logger.error("不是正确格式的消息"); return; } String ack = (String)msg.get("ack"); if(StringUtils.isEmpty(ack)) { logger.error("不是激活返回消息"); return; } logger.info("是激活返回消息: ack == " + ack); QueryWrapper queryWrapper = new QueryWrapper<>(); queryWrapper.lambda().eq(CareDevice::getTopic,topic.replaceAll("control","event")); CareDevice careDevice = careDeviceService.getOne(queryWrapper); if(careDevice == null) { logger.error("数据库中没找到对应设备"); return; } String devCode = careDevice.getDevCode(); if("CfgAreaAck".equals(type)) { //房间信息 UpdateWrapper updateWrapper = new UpdateWrapper<>(); updateWrapper.lambda().eq(CareRoom::getRoomId,devCode).set(CareRoom::getAck,ack); careRoomService.update(updateWrapper); } else if("CfgDevicePositionAck".equals(type)) { //传感器 UpdateWrapper updateWrapper = new UpdateWrapper<>(); updateWrapper.lambda().eq(CareRoomSensor::getRoomId,devCode).set(CareRoomSensor::getAck,ack); careRoomSensorService.update(updateWrapper); } else if("CfgGatesAck".equals(type)) { //门 UpdateWrapper updateWrapper = new UpdateWrapper<>(); updateWrapper.lambda().eq(CareRoomGate::getRoomId,devCode).set(CareRoomGate::getAck,ack); careRoomGateService.update(updateWrapper); } else if("CfgRegionsAck".equals(type)) { //区域 UpdateWrapper updateWrapper = new UpdateWrapper<>(); updateWrapper.lambda().eq(CareRoomRegion::getRoomId,devCode).set(CareRoomRegion::getAck,ack); careRoomRegionService.update(updateWrapper); } } /** * 工单事件的消息 处理 * @param topic * @param mqttMessageStr */ private void handleOrderEventMessage(String topic, String mqttMessageStr) { try { Map map = JSON.parseObject(mqttMessageStr); String type = (String) map.get("type"); if(StringUtils.isEmpty(type)){ logger.error("不是正确格式的消息"); return; } QueryWrapper queryWrapper = new QueryWrapper<>(); queryWrapper.lambda().eq(CareDevice::getTopic,topic); CareDevice careDevice = careDeviceService.getOne(queryWrapper); if(careDevice == null) { logger.error("数据库中没找到对应设备"); return; } Date date = DateUtil.date(); CareMqttMsg careMqttMsg = new CareMqttMsg(); careMqttMsg.setMqttMessage(mqttMessageStr); careMqttMsg.setTopic(topic); careMqttMsg.setDevId(careDevice.getId()); careMqttMsg.setType(type); careMqttMsg.setCreateTime(date); if ("BreathHeartRate".equals(type)) { careMqttMsg.setStatus(MqttMsgStatusEnum.HANDLED.getValue()); this.careMqttMsgService.save(careMqttMsg); Map msg = (Map) map.get("msg"); Integer breath = (Integer) msg.get("breath"); Integer heart = (Integer) msg.get("heart"); careDevice.setBreathRate(breath); careDevice.setHeartRate(heart); careDeviceService.updateById(careDevice); return; } if( careDevice.getHouseId() == null ) { logger.error("设备没有配置房屋地址"); return; } CareEventOrder order = getNewOrder(careDevice); order.setCreateTime(date); order.setModifyTime(date); Map msgMap = (Map)map.get("msg"); Integer conf = (Integer)msgMap.get("conf"); if(conf != 255) { order.setStatus(OrderStatusEnum.TODO.getValue()); } else {//取消 order.setStatus(OrderStatusEnum.CANCEL.getValue()); } if ("FallDown".equals(type)) { //跌到 order.setOrderType(OrderTypeEnum.DI_EDAO.getValue()); } else if("StayTooLong".equals(type)){ order.setOrderType(OrderTypeEnum.JIU_ZHI.getValue()); } //判断房屋是否有未完成的告警事件,不区分类型 QueryWrapper queryWrapper2 = new QueryWrapper<>(); queryWrapper2.lambda().eq(CareEventOrder::getHouseId,careDevice.getHouseId()).and( wrapper-> wrapper.eq(CareEventOrder::getStatus, OrderStatusEnum.TODO.getValue()) .or().eq(CareEventOrder::getStatus,OrderStatusEnum.DOING.getValue())); CareEventOrder orderDb = careEventOrderService.getOne(queryWrapper2); if(orderDb != null) { //有未完成的告警事件,不再生成新的告警工单,只作为子事件插入到当前工单的历史记录中。 if(conf != 255) { careMqttMsg.setStatus(MqttMsgStatusEnum.HANDLED.getValue()); this.careMqttMsgService.save(careMqttMsg); careEventOrderService.saveHisOrder(order,orderDb); //通知页面 this.bigScreenService.pushRtEventFlag(order.getStationId().toString(),order.getId().toString(),"orderUpdate"); this.bigScreenService.pushRtEventFlag(order.getStationId().toString(),order.getId().toString(),"bigscreenRefresh"); } else { //取消 careMqttMsg.setStatus(MqttMsgStatusEnum.HANDLED.getValue()); this.careMqttMsgService.save(careMqttMsg); if(orderDb.getOrderType().equals(order.getOrderType())){//看取消的类型,和当前一致,则 更新当前的工单状态为取消,记录一条取消his,管家工单标记为取消 //更新当前的工单状态为取消 orderDb.setStatus(OrderStatusEnum.CANCEL.getValue()); this.careEventOrderService.updateById(orderDb); //管家工单标记为取消 CareEventOrderChamb careEventOrderChamb = this.careEventOrderChambService.getChambOrderByOrderId(orderDb.getId()); careEventOrderChamb.setStatus(ChambOrderStatusEnum.CANCEL.getValue()); this.careEventOrderChambService.updateById(careEventOrderChamb); //记录一条取消his careEventOrderService.saveHisOrder(order,orderDb); //通知页面 this.bigScreenService.pushRtEventFlag(order.getStationId().toString(),order.getId().toString(),"orderUpdate"); this.bigScreenService.pushRtEventFlag(order.getStationId().toString(),order.getId().toString(),"bigscreenRefresh"); } else { // 不一致,则只记录一条取消his //记录一条取消his careEventOrderService.saveHisOrder(order,orderDb); } } } else { //无未完成的告警事件 Integer delayTime = getDelayTime(type,careDevice); if (delayTime == null || delayTime == 0){ //没有配置响应时间,立即生成新的告警事件工单 if(conf != 255) { careMqttMsg.setStatus(MqttMsgStatusEnum.HANDLED.getValue()); this.careMqttMsgService.save(careMqttMsg); //生成新的工单 careEventOrderService.saveOrder(order); //通知页面 this.bigScreenService.pushRtEventFlag(order.getStationId().toString(),order.getId().toString(),"orderAdd"); this.bigScreenService.pushRtEventFlag(order.getStationId().toString(),order.getId().toString(),"bigscreenRefresh"); } else { //取消 不处理工单 careMqttMsg.setStatus(MqttMsgStatusEnum.HANDLED.getValue()); this.careMqttMsgService.save(careMqttMsg); } } else { //有配置响应时间,不处理工单,进行调度处理 if(conf != 255) { //不是取消 careMqttMsg.setStatus(MqttMsgStatusEnum.NO_HANDLE.getValue()); careMqttMsg.setDelayTime(delayTime); this.careMqttMsgService.save(careMqttMsg); //进行调度处理 mqttMsgRedisService.addRedis(careMqttMsg,date.getTime() + careMqttMsg.getDelayTime() * 60 * 1000); } else { //取消: 更新mqtt消息为已取消,等待执行的调度到时会取消执行 careMqttMsg.setStatus(MqttMsgStatusEnum.HANDLED.getValue()); this.careMqttMsgService.save(careMqttMsg); //更新mqtt消息为已取消 UpdateWrapper updateWrapper = new UpdateWrapper<>(); updateWrapper.lambda().eq(CareMqttMsg::getDevId,careDevice.getId()).eq(CareMqttMsg::getType,type).eq(CareMqttMsg::getStatus,MqttMsgStatusEnum.NO_HANDLE.getValue()) .set(CareMqttMsg::getStatus,MqttMsgStatusEnum.CANCEL.getValue()); this.careMqttMsgService.update(updateWrapper); } } } } catch (Exception e){ e.printStackTrace(); } } private Integer getDelayTime(String type, CareDevice careDevice) { if ("FallDown".equals(type)) { //跌到 return careDevice.getFallResponseTime(); } else if ("StayTooLong".equals(type)) { //久滞 return careDevice.getLonglagResponseTime(); } else { return 0; } } public CareEventOrder getNewOrder(CareDevice careDevice) { CareHouse careHouse = careHouseService.getById(careDevice.getHouseId()); if (careHouse == null){ logger.error("设备没有配置房屋地址"); return null; } CareEventOrder order = new CareEventOrder(); order.setOrgId(careDevice.getOrgId()); order.setOrgName(careOrganizationService.getById(careDevice.getOrgId()).getName()); order.setStationId(careDevice.getStationId()); order.setStationName(careStationService.getById(careDevice.getStationId()).getName()); order.setHouseId(careHouse.getId()); order.setDevId(careDevice.getId()); order.setDevName(careDevice.getDevName()); order.setHouseName(careHouse.getName()); order.setMemberId(careDevice.getMemberId()); order.setTitle(careHouse.getAddr()); return order; } }