|
- package com.care.mqtt.service;
- import cn.hutool.core.date.DateUtil;
- import com.alibaba.fastjson.JSON;
- import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
- import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
- import com.care.bigscreen.service.BigScreenService;
- import com.care.common.entity.*;
- import com.care.common.enums.ChambOrderStatusEnum;
- import com.care.common.enums.MqttMsgStatusEnum;
- import com.care.common.enums.OrderStatusEnum;
- import com.care.common.enums.OrderTypeEnum;
- import com.care.common.service.*;
- import org.apache.commons.lang3.StringUtils;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.stereotype.Service;
- import javax.annotation.Resource;
- import java.util.Date;
- import java.util.Map;
- /**
- * @Author: stw
- * @Date: 2021/8/14
- * @Desc:
- */
- @Service
- public class MqttMsgService {
- @Resource
- private CareHouseService careHouseService;
- @Resource
- private CareStationService careStationService;
- @Resource
- private CareDeviceService careDeviceService;
- @Resource
- private CareEventOrderService careEventOrderService;
- @Resource
- private CareOrganizationService careOrganizationService;
- @Resource
- private BigScreenService bigScreenService;
- @Resource
- private CareMqttMsgService careMqttMsgService;
- @Resource
- private CareEventOrderChambService careEventOrderChambService;
- @Resource
- private MqttMsgRedisService mqttMsgRedisService;
- @Resource
- private CareRoomSensorService careRoomSensorService;
- @Resource
- private CareRoomService careRoomService;
- @Resource
- private CareRoomGateService careRoomGateService;
- @Resource
- private CareRoomRegionService careRoomRegionService;
- private static Logger logger = LoggerFactory.getLogger(MqttMsgService.class);
- /**
- * 处理mqtt消息
- * @param topic
- * @param mqttMessageStr
- * @throws Exception
- */
- public void handleMessage(String topic, String mqttMessageStr) throws Exception {
- if(topic.contains("event")){ //工单的消息
- handleOrderEventMessage(topic,mqttMessageStr);
- } else { //房间信息配置的返回消息
- handleConfigureRoomInfoResultMessage(topic,mqttMessageStr);
- }
- }
- /**
- * 查询配置房间信息的结果消息 处理
- * @param topic
- * @param mqttMessageStr
- */
- private void handleConfigureRoomInfoResultMessage(String topic, String mqttMessageStr) {
- Map map = JSON.parseObject(mqttMessageStr);
- String type = (String) map.get("type");
- if(StringUtils.isEmpty(type)) {
- logger.error("不是正确格式的消息");
- return;
- }
- Map msg = (Map) map.get("msg");
- if(msg == null) {
- logger.error("不是正确格式的消息");
- return;
- }
- String ack = (String)msg.get("ack");
- if(StringUtils.isEmpty(ack)) {
- logger.error("不是激活返回消息");
- return;
- }
- logger.info("是激活返回消息: ack == " + ack);
- QueryWrapper<CareDevice> queryWrapper = new QueryWrapper<>();
- queryWrapper.lambda().eq(CareDevice::getTopic,topic.replaceAll("control","event"));
- CareDevice careDevice = careDeviceService.getOne(queryWrapper);
- if(careDevice == null) {
- logger.error("数据库中没找到对应设备");
- return;
- }
- String devCode = careDevice.getDevCode();
- if("CfgAreaAck".equals(type)) { //房间信息
- UpdateWrapper<CareRoom> updateWrapper = new UpdateWrapper<>();
- updateWrapper.lambda().eq(CareRoom::getRoomId,devCode).set(CareRoom::getAck,ack);
- careRoomService.update(updateWrapper);
- } else if("CfgDevicePositionAck".equals(type)) { //传感器
- UpdateWrapper<CareRoomSensor> updateWrapper = new UpdateWrapper<>();
- updateWrapper.lambda().eq(CareRoomSensor::getRoomId,devCode).set(CareRoomSensor::getAck,ack);
- careRoomSensorService.update(updateWrapper);
- } else if("CfgGatesAck".equals(type)) { //门
- UpdateWrapper<CareRoomGate> updateWrapper = new UpdateWrapper<>();
- updateWrapper.lambda().eq(CareRoomGate::getRoomId,devCode).set(CareRoomGate::getAck,ack);
- careRoomGateService.update(updateWrapper);
- } else if("CfgRegionsAck".equals(type)) { //区域
- UpdateWrapper<CareRoomRegion> updateWrapper = new UpdateWrapper<>();
- updateWrapper.lambda().eq(CareRoomRegion::getRoomId,devCode).set(CareRoomRegion::getAck,ack);
- careRoomRegionService.update(updateWrapper);
- }
- }
- /**
- * 工单事件的消息 处理
- * @param topic
- * @param mqttMessageStr
- */
- private void handleOrderEventMessage(String topic, String mqttMessageStr) {
- try {
- Map map = JSON.parseObject(mqttMessageStr);
- String type = (String) map.get("type");
- if(StringUtils.isEmpty(type)){
- logger.error("不是正确格式的消息");
- return;
- }
- QueryWrapper<CareDevice> queryWrapper = new QueryWrapper<>();
- queryWrapper.lambda().eq(CareDevice::getTopic,topic);
- CareDevice careDevice = careDeviceService.getOne(queryWrapper);
- if(careDevice == null) {
- logger.error("数据库中没找到对应设备");
- return;
- }
- Date date = DateUtil.date();
- CareMqttMsg careMqttMsg = new CareMqttMsg();
- careMqttMsg.setMqttMessage(mqttMessageStr);
- careMqttMsg.setTopic(topic);
- careMqttMsg.setDevId(careDevice.getId());
- careMqttMsg.setType(type);
- careMqttMsg.setCreateTime(date);
- if ("BreathHeartRate".equals(type)) {
- careMqttMsg.setStatus(MqttMsgStatusEnum.HANDLED.getValue());
- this.careMqttMsgService.save(careMqttMsg);
- Map msg = (Map) map.get("msg");
- Integer breath = (Integer) msg.get("breath");
- Integer heart = (Integer) msg.get("heart");
- careDevice.setBreathRate(breath);
- careDevice.setHeartRate(heart);
- careDeviceService.updateById(careDevice);
- return;
- }
- if( careDevice.getHouseId() == null ) {
- logger.error("设备没有配置房屋地址");
- return;
- }
- CareEventOrder order = getNewOrder(careDevice);
- order.setCreateTime(date);
- order.setModifyTime(date);
- Map msgMap = (Map)map.get("msg");
- Integer conf = (Integer)msgMap.get("conf");
- if(conf != 255) {
- order.setStatus(OrderStatusEnum.TODO.getValue());
- } else {//取消
- order.setStatus(OrderStatusEnum.CANCEL.getValue());
- }
- if ("FallDown".equals(type)) { //跌到
- order.setOrderType(OrderTypeEnum.DI_EDAO.getValue());
- } else if("StayTooLong".equals(type)){
- order.setOrderType(OrderTypeEnum.JIU_ZHI.getValue());
- }
- //判断房屋是否有未完成的告警事件,不区分类型
- QueryWrapper<CareEventOrder> queryWrapper2 = new QueryWrapper<>();
- queryWrapper2.lambda().eq(CareEventOrder::getHouseId,careDevice.getHouseId()).and(
- wrapper-> wrapper.eq(CareEventOrder::getStatus, OrderStatusEnum.TODO.getValue())
- .or().eq(CareEventOrder::getStatus,OrderStatusEnum.DOING.getValue()));
- CareEventOrder orderDb = careEventOrderService.getOne(queryWrapper2);
- if(orderDb != null) { //有未完成的告警事件,不再生成新的告警工单,只作为子事件插入到当前工单的历史记录中。
- if(conf != 255) {
- careMqttMsg.setStatus(MqttMsgStatusEnum.HANDLED.getValue());
- this.careMqttMsgService.save(careMqttMsg);
- careEventOrderService.saveHisOrder(order,orderDb);
- //通知页面
- this.bigScreenService.pushRtEventFlag(order.getStationId().toString(),order.getId().toString(),"orderUpdate");
- this.bigScreenService.pushRtEventFlag(order.getStationId().toString(),order.getId().toString(),"bigscreenRefresh");
- } else { //取消
- careMqttMsg.setStatus(MqttMsgStatusEnum.HANDLED.getValue());
- this.careMqttMsgService.save(careMqttMsg);
- if(orderDb.getOrderType().equals(order.getOrderType())){//看取消的类型,和当前一致,则 更新当前的工单状态为取消,记录一条取消his,管家工单标记为取消
- //更新当前的工单状态为取消
- orderDb.setStatus(OrderStatusEnum.CANCEL.getValue());
- this.careEventOrderService.updateById(orderDb);
- //管家工单标记为取消
- CareEventOrderChamb careEventOrderChamb = this.careEventOrderChambService.getChambOrderByOrderId(orderDb.getId());
- careEventOrderChamb.setStatus(ChambOrderStatusEnum.CANCEL.getValue());
- this.careEventOrderChambService.updateById(careEventOrderChamb);
- //记录一条取消his
- careEventOrderService.saveHisOrder(order,orderDb);
- //通知页面
- this.bigScreenService.pushRtEventFlag(order.getStationId().toString(),order.getId().toString(),"orderUpdate");
- this.bigScreenService.pushRtEventFlag(order.getStationId().toString(),order.getId().toString(),"bigscreenRefresh");
- } else { // 不一致,则只记录一条取消his
- //记录一条取消his
- careEventOrderService.saveHisOrder(order,orderDb);
- }
- }
- } else { //无未完成的告警事件
- Integer delayTime = getDelayTime(type,careDevice);
- if (delayTime == null || delayTime == 0){ //没有配置响应时间,立即生成新的告警事件工单
- if(conf != 255) {
- careMqttMsg.setStatus(MqttMsgStatusEnum.HANDLED.getValue());
- this.careMqttMsgService.save(careMqttMsg);
- //生成新的工单
- careEventOrderService.saveOrder(order);
- //通知页面
- this.bigScreenService.pushRtEventFlag(order.getStationId().toString(),order.getId().toString(),"orderAdd");
- this.bigScreenService.pushRtEventFlag(order.getStationId().toString(),order.getId().toString(),"bigscreenRefresh");
- } else { //取消 不处理工单
- careMqttMsg.setStatus(MqttMsgStatusEnum.HANDLED.getValue());
- this.careMqttMsgService.save(careMqttMsg);
- }
- } else { //有配置响应时间,不处理工单,进行调度处理
- if(conf != 255) { //不是取消
- careMqttMsg.setStatus(MqttMsgStatusEnum.NO_HANDLE.getValue());
- careMqttMsg.setDelayTime(delayTime);
- this.careMqttMsgService.save(careMqttMsg);
- //进行调度处理
- mqttMsgRedisService.addRedis(careMqttMsg,date.getTime() + careMqttMsg.getDelayTime() * 60 * 1000);
- } else { //取消: 更新mqtt消息为已取消,等待执行的调度到时会取消执行
- careMqttMsg.setStatus(MqttMsgStatusEnum.HANDLED.getValue());
- this.careMqttMsgService.save(careMqttMsg);
- //更新mqtt消息为已取消
- UpdateWrapper<CareMqttMsg> updateWrapper = new UpdateWrapper<>();
- updateWrapper.lambda().eq(CareMqttMsg::getDevId,careDevice.getId()).eq(CareMqttMsg::getType,type).eq(CareMqttMsg::getStatus,MqttMsgStatusEnum.NO_HANDLE.getValue())
- .set(CareMqttMsg::getStatus,MqttMsgStatusEnum.CANCEL.getValue());
- this.careMqttMsgService.update(updateWrapper);
- }
- }
- }
- } catch (Exception e){
- e.printStackTrace();
- }
- }
- private Integer getDelayTime(String type, CareDevice careDevice) {
- if ("FallDown".equals(type)) { //跌到
- return careDevice.getFallResponseTime();
- } else if ("StayTooLong".equals(type)) { //久滞
- return careDevice.getLonglagResponseTime();
- } else {
- return 0;
- }
- }
- public CareEventOrder getNewOrder(CareDevice careDevice) {
- CareHouse careHouse = careHouseService.getById(careDevice.getHouseId());
- if (careHouse == null){
- logger.error("设备没有配置房屋地址");
- return null;
- }
- CareEventOrder order = new CareEventOrder();
- order.setOrgId(careDevice.getOrgId());
- order.setOrgName(careOrganizationService.getById(careDevice.getOrgId()).getName());
- order.setStationId(careDevice.getStationId());
- order.setStationName(careStationService.getById(careDevice.getStationId()).getName());
- order.setHouseId(careHouse.getId());
- order.setDevId(careDevice.getId());
- order.setDevName(careDevice.getDevName());
- order.setHouseName(careHouse.getName());
- order.setMemberId(careDevice.getMemberId());
- order.setTitle(careHouse.getAddr());
- return order;
- }
- }
|