package com.care.mqtt.service; import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.date.DateUtil; import com.alibaba.fastjson.JSON; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper; import com.care.bigscreen.service.BigScreenService; import com.care.common.cache.RedisKeyConstant; import com.care.common.cache.RedisUtil; import com.care.common.entity.*; import com.care.common.enums.*; import com.care.common.service.*; import com.care.common.vo.device.MqttMsgVO; import io.swagger.models.auth.In; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.math.BigDecimal; import java.util.Date; import java.util.List; import java.util.Map; import java.util.UUID; /** * @Author: stw * @Date: 2021/8/14 * @Desc: */ @Service public class MqttMsgService { @Resource private CareHouseService careHouseService; @Resource private CareStationService careStationService; @Resource private CareDeviceService careDeviceService; @Resource private CareEventOrderService careEventOrderService; @Resource private CareOrganizationService careOrganizationService; @Resource private BigScreenService bigScreenService; @Resource private CareMqttMsgService careMqttMsgService; @Resource private MqttMsgRedisService mqttMsgRedisService; @Resource private RedisUtil redisUtil; @Resource private CareToiletInoutDetailService careToiletInoutDetailService; @Resource private CareBedroomInbedDetailService careBedroomInbedDetailService; @Resource private CareWalkingDistanceDetailService careWalkingDistanceDetailService; private static Logger logger = LoggerFactory.getLogger(MqttMsgService.class); /** * 处理mqtt消息 * @param topic * @param mqttMessageStr * @throws Exception */ public void handleMessage(String topic, String mqttMessageStr) throws Exception { handleOrderEventMessage(topic,mqttMessageStr); } /** * 工单事件的消息 处理 * @param topic * @param mqttMessageStr */ private void handleOrderEventMessage(String topic, String mqttMessageStr) { try { Map map = JSON.parseObject(mqttMessageStr); String type = (String) map.get("type"); if(StringUtils.isEmpty(type)){ logger.error("不是正确格式的消息"); return; } //处理心跳,放入redis if ("HeartBeat".equals(type)) { logger.info("{} 接收到MQTT HeartBeat消息,{}",topic,mqttMessageStr); redisUtil.hset(RedisKeyConstant.MQTT_DEV_HEARBEAT_INFO, topic, DateUtil.current(), RedisKeyConstant.MQTT_DEV_HEARBEAT_INFO_TIME); return; } if(!"BreathHeartRate".equals(type) && !"FallDown".equals(type) && !"StayTooLong".equals(type) && !"OptInOut".equals(type) && !"PeopleEvent".equals(type)){ // logger.error("不是要处理的消息"); return; } logger.info("{} 接收到MQTT 事件消息,{}",topic,mqttMessageStr); CareDevice careDevice = getCareDevice(topic); if(careDevice == null) { logger.error("数据库中没找到对应设备"); return; } Date date = DateUtil.date(); CareMqttMsg careMqttMsg = new CareMqttMsg(); careMqttMsg.setMqttMessage(mqttMessageStr); careMqttMsg.setTopic(topic); careMqttMsg.setDevId(careDevice.getId()); careMqttMsg.setType(type); careMqttMsg.setCreateTime(date); Map msg = (Map) map.get("msg"); Integer tid = (Integer) msg.get("tid"); careMqttMsg.setTid(tid); if ("BreathHeartRate".equals(type)) { careMqttMsg.setStatus(MqttMsgStatusEnum.HANDLED.getValue()); this.careMqttMsgService.save(careMqttMsg); Integer breath = (Integer) msg.get("breath"); Integer heart = (Integer) msg.get("heart"); if (breath !=careDevice.getBreathRate() || heart !=careDevice.getHeartRate() ){ careDevice.setBreathRate(breath); careDevice.setHeartRate(heart); careDeviceService.updateById(careDevice); } return; } if (careDevice.getHouseId() == null) { // logger.error("设备没有配置房屋地址"); return; } //活动信息 if ("PeopleEvent".equals(type)) { careMqttMsg.setStatus(MqttMsgStatusEnum.HANDLED.getValue()); this.careMqttMsgService.save(careMqttMsg); handlePeopleEvent(msg,careDevice,date); return; } //进出事件 if ("OptInOut".equals(type)) { careMqttMsg.setStatus(MqttMsgStatusEnum.HANDLED.getValue()); this.careMqttMsgService.save(careMqttMsg); handleOptInOut(msg,careDevice,date); return; } handleOrderEvent(map,type,careDevice,date,careMqttMsg); } catch (Exception e){ e.printStackTrace(); } } private void handlePeopleEvent(Map msg,CareDevice careDevice,Date date){ Integer amount = (Integer) msg.get("amount"); List targets = (List) msg.get("targets"); String moveState = DeviceMoveStateEnum.STATIC.getValue(); if(amount == 0){ //无人 moveState = DeviceMoveStateEnum.NONE.getValue(); } else { //有人,静止或活动 for(Map target : targets) { Integer tid = (Integer) target.get("tid"); Integer cls = (Integer) target.get("cls"); double x = Double.parseDouble(String.valueOf(target.get("x"))) ; double y = Double.parseDouble(String.valueOf(target.get("y"))) ; if (cls == 1) { //是人 //先查是否存在 Integer lastIndex = (Integer)redisUtil.hget(RedisKeyConstant.PEOPLEEVENT_ACTIVE_INFO_LAST_INDEX,careDevice.getDevCode() + "_" + tid); logger.warn("lastIndex:{}",lastIndex); if(lastIndex == null) {//之前没有存储redis,则存储第一个 redisUtil.hset(RedisKeyConstant.PEOPLEEVENT_ACTIVE_INFO, 1 + "@" + careDevice.getDevCode() + "_" + tid , System.currentTimeMillis() + "_" + x + "_" + y , RedisKeyConstant.PEOPLEEVENT_ACTIVE_INFO_TIME); redisUtil.hset(RedisKeyConstant.PEOPLEEVENT_ACTIVE_INFO_LAST_INDEX, careDevice.getDevCode() + "_" + tid , 1 , RedisKeyConstant.PEOPLEEVENT_ACTIVE_INFO_ACTIVE_LAST_INDEX_TIME); } else{ //计算当前和最近之间时间,若超过1秒,删除之前的存储的,重新计数存储当前的 String lastContent = (String)redisUtil.hget(RedisKeyConstant.PEOPLEEVENT_ACTIVE_INFO,lastIndex + "@" + careDevice.getDevCode() + "_" + tid ); logger.warn("lastIndex:{},lastContent:{}",lastIndex,lastContent); long duration = (date.getTime() - Long.parseLong(lastContent.split("_")[0])); logger.warn("lastIndex:{},duration:{}",lastIndex,duration); if (duration > 3000) { //超过1秒,删除之前的存储的,从头开始 logger.warn("超过3000,lastIndex:{},duration:{}",lastIndex,duration); delBeforeContent(lastIndex,careDevice.getDevCode(),tid); redisUtil.hset(RedisKeyConstant.PEOPLEEVENT_ACTIVE_INFO, 1 + "@" + careDevice.getDevCode() + "_" + tid , System.currentTimeMillis() + "_" + x + "_" + y, RedisKeyConstant.PEOPLEEVENT_ACTIVE_INFO_TIME); redisUtil.hset(RedisKeyConstant.PEOPLEEVENT_ACTIVE_INFO_LAST_INDEX, careDevice.getDevCode() + "_" + tid , 1 , RedisKeyConstant.PEOPLEEVENT_ACTIVE_INFO_ACTIVE_LAST_INDEX_TIME); } else { //存储以供后面计算 redisUtil.hset(RedisKeyConstant.PEOPLEEVENT_ACTIVE_INFO, (lastIndex + 1) + "@" + careDevice.getDevCode() + "_" + tid , System.currentTimeMillis() + "_" + x + "_" + y, RedisKeyConstant.PEOPLEEVENT_ACTIVE_INFO_TIME); redisUtil.hset(RedisKeyConstant.PEOPLEEVENT_ACTIVE_INFO_LAST_INDEX, careDevice.getDevCode() + "_" + tid , lastIndex + 1 , RedisKeyConstant.PEOPLEEVENT_ACTIVE_INFO_ACTIVE_LAST_INDEX_TIME); if(lastIndex >= 9) { int currIndex = lastIndex + 1;//当前是第10个或以上 开始计算 //求前5个点平均坐标和后5个点的平均坐标,二者之间的距离即结果 double distance = getDistance(currIndex,careDevice.getDevCode(),tid); logger.warn("计算结果:lastIndex:{},duration:{},distance{}",lastIndex,duration,distance); if(distance > 0.3){ moveState = DeviceMoveStateEnum.MOVE.getValue(); //存储行走距离明细表 CareWalkingDistanceDetail careWalkingDistanceDetail = new CareWalkingDistanceDetail(); careWalkingDistanceDetail.setDevId(careDevice.getId()); careWalkingDistanceDetail.setTid(tid); careWalkingDistanceDetail.setDevScene(careDevice.getDevScene()); careWalkingDistanceDetail.setWalkingDistance(distance); //careWalkingDistanceDetail.setWalkingDuration(duration); 时间计算暂时不要 careWalkingDistanceDetail.setCreateTime(date); careWalkingDistanceDetail.setModifyTime(date); careWalkingDistanceDetailService.save(careWalkingDistanceDetail); } } } } } } } if(!moveState.equals(careDevice.getMoveStatus())){ //更新数据库dev careDevice.setMoveStatus(moveState); careDeviceService.updateById(careDevice); } } private void delBeforeContent(int lastIndex,String devCode,Integer tid) { for(int i = lastIndex; i >= 1; i--) { redisUtil.hset(RedisKeyConstant.PEOPLEEVENT_ACTIVE_INFO, i + "@" + devCode + "_" + tid, null, RedisKeyConstant.PEOPLEEVENT_ACTIVE_INFO_TIME); } } private double getDistance(int currIndex, String devCode,Integer tid) { double totalX1 = 0; double totalY1 = 0; for(int i = currIndex - 9; i <= currIndex - 5; i++) { String content = (String)redisUtil.hget(RedisKeyConstant.PEOPLEEVENT_ACTIVE_INFO,i + "@" + devCode + "_" + tid ); double x = Double.parseDouble(String.valueOf(content.split("_")[1])); totalX1 += x; double y = Double.parseDouble(String.valueOf(content.split("_")[2])); totalY1 += y; } double avgX1 = totalX1 / 5; double avgY1 = totalY1 / 5; double totalX2 = 0; double totalY2 = 0; for(int i = currIndex - 4; i <= currIndex; i++) { String content = (String)redisUtil.hget(RedisKeyConstant.PEOPLEEVENT_ACTIVE_INFO,i + "@" + devCode + "_" + tid ); double x = Double.parseDouble(String.valueOf(content.split("_")[1])); totalX2 += x; double y = Double.parseDouble(String.valueOf(content.split("_")[2])); totalY2 += y; } double avgX2 = totalX2 / 5; double avgY2 = totalY2 / 5; double distance = getDistance(avgX1,avgY1,avgX2,avgY2); return distance; } private double getDistance(double x1, double y1,double x2, double y2) { double distance = Math.sqrt(Math.abs((x1 - x2)*(x1 - x2))+Math.abs((y1 - y2)*(y1 - y2))); return distance; } private void handleOptInOut(Map msg,CareDevice careDevice,Date date){ Integer event = (Integer) msg.get("event"); //1 进事件,2 出事件 Integer type = (Integer) msg.get("type"); Integer subType = (Integer) msg.get("subType"); Integer tid = (Integer) msg.get("tid"); if(DeviceSceneEnum.TOILET.getValue().equals(careDevice.getDevScene()) && type == 2 ){ //type字段的2代表进出事件,卫生间 if(subType == 1) { //subtype字段的1代表门 if(event == 1) { //进事件,先存起来 redisUtil.hset(RedisKeyConstant.OPTINOUT_TOILET_INFO, careDevice.getDevCode()+ "_" + tid, System.currentTimeMillis(), RedisKeyConstant.OPTINOUT_TOILET_INFO_TIME); } else {//出事件, //先查是否存在进事件 Long inTime = (Long)redisUtil.hget(RedisKeyConstant.OPTINOUT_TOILET_INFO,careDevice.getDevCode()+ "_" + tid); if(inTime != null) { //存在进事件 CareToiletInoutDetail careToiletInoutDetail = new CareToiletInoutDetail(); careToiletInoutDetail.setDevId(careDevice.getId()); careToiletInoutDetail.setTid(tid); careToiletInoutDetail.setInTime(new Date(inTime)); careToiletInoutDetail.setOutTime(date); careToiletInoutDetail.setDuration((date.getTime() - inTime) / 1000); careToiletInoutDetail.setCreateTime(date); careToiletInoutDetail.setModifyTime(date); careToiletInoutDetailService.save(careToiletInoutDetail); redisUtil.hset(RedisKeyConstant.OPTINOUT_TOILET_INFO, careDevice.getDevCode()+ "_" + tid, null, RedisKeyConstant.OPTINOUT_TOILET_INFO_TIME); } else { //不存在进事件 //do nothing } } } } else if(DeviceSceneEnum.BEDROOM.getValue().equals(careDevice.getDevScene()) && type == 1 ) { //type字段的1代表上下床事件,卧室 if(subType == 1) { //subtype字段的1代表床 if(event == 1) { //上床事件,先存起来 redisUtil.hset(RedisKeyConstant.OPTINOUT_BEDROOM_INFO, careDevice.getDevCode()+ "_" + tid, System.currentTimeMillis(), RedisKeyConstant.OPTINOUT_BEDROOM_INFO_TIME); if (!DeviceInbedStateEnum.INBED.getValue().equals(careDevice.getInbedStatus())){ //更新数据库dev careDevice.setInbedStatus(DeviceInbedStateEnum.INBED.getValue()); careDeviceService.updateById(careDevice); } } else {//下床事件, //先查是否存在上床事件 Long inTime = (Long)redisUtil.hget(RedisKeyConstant.OPTINOUT_BEDROOM_INFO,careDevice.getDevCode()+ "_" + tid); if(inTime != null) { //存在上床事件 CareBedroomInbedDetail careBedroomInbedDetail = new CareBedroomInbedDetail(); careBedroomInbedDetail.setDevId(careDevice.getId()); careBedroomInbedDetail.setTid(tid); careBedroomInbedDetail.setInTime(new Date(inTime)); careBedroomInbedDetail.setOutTime(date); careBedroomInbedDetail.setDuration((date.getTime() - inTime) / 1000); careBedroomInbedDetail.setCreateTime(date); careBedroomInbedDetail.setModifyTime(date); careBedroomInbedDetailService.save(careBedroomInbedDetail); redisUtil.hset(RedisKeyConstant.OPTINOUT_BEDROOM_INFO, careDevice.getDevCode()+ "_" + tid, null, RedisKeyConstant.OPTINOUT_BEDROOM_INFO_TIME); } else { //不存在上床事件 //do nothing } if (!DeviceInbedStateEnum.OUTBED.getValue().equals(careDevice.getInbedStatus())) { //更新数据库dev careDevice.setInbedStatus(DeviceInbedStateEnum.OUTBED.getValue()); careDeviceService.updateById(careDevice); } } } } } private void handleOrderEvent(Map map,String type,CareDevice careDevice,Date date, CareMqttMsg careMqttMsg){ String key = RedisKeyConstant.CREATE_ORDER + ":" + careDevice.getId(); String requestId = UUID.randomUUID().toString(); boolean result = redisUtil.tryLock(key,requestId,10); try { if (result) { CareEventOrder order = getNewOrder(careDevice); if (order == null) { //logger.error("设备没有配置房屋地址"); return; } order.setCreateTime(date); order.setModifyTime(date); Map msgMap = (Map) map.get("msg"); Integer conf = (Integer) msgMap.get("conf"); if (conf != 255) { order.setStatus(OrderStatusEnum.TODO.getValue()); } else {//取消 order.setStatus(OrderStatusEnum.CANCEL.getValue()); } if ("FallDown".equals(type)) { //跌到 order.setOrderType(OrderTypeEnum.DIE_DAO.getValue()); } else if ("StayTooLong".equals(type)) { order.setOrderType(OrderTypeEnum.JIU_ZHI.getValue()); } order.setTid(careMqttMsg.getTid()); //判断房屋是否有未完成的告警事件,区分类型和人 QueryWrapper queryWrapper2 = new QueryWrapper<>(); queryWrapper2.lambda().eq(CareEventOrder::getDevId, careDevice.getId()) .eq(CareEventOrder::getOrderType,order.getOrderType()) .eq(CareEventOrder::getOrgId,careDevice.getOrgId()) .eq(CareEventOrder::getStationId,careDevice.getStationId()) .eq(CareEventOrder::getTid,order.getTid()) .and( wrapper -> wrapper.eq(CareEventOrder::getStatus, OrderStatusEnum.TODO.getValue()) .or().eq(CareEventOrder::getStatus, OrderStatusEnum.DOING.getValue()) ); CareEventOrder orderDb = careEventOrderService.getOne(queryWrapper2); if (orderDb != null) { //有未完成的告警事件,不再生成新的告警工单,只作为子事件插入到当前工单的历史记录中。 if (conf != 255) { careMqttMsg.setStatus(MqttMsgStatusEnum.HANDLED.getValue()); this.careMqttMsgService.save(careMqttMsg); careEventOrderService.saveHisOrder(order, orderDb,false); //通知页面 this.bigScreenService.pushRtEventFlag(order.getOrgId(),order.getStationId(), order.getId().toString(), "orderUpdate"); this.bigScreenService.pushRtEventFlag(order.getOrgId(),order.getStationId(), order.getId().toString(), "bigscreenRefresh"); } else { //取消 careMqttMsg.setStatus(MqttMsgStatusEnum.HANDLED.getValue()); this.careMqttMsgService.save(careMqttMsg); //更新当前的工单状态为取消 order.setId(orderDb.getId()); this.careEventOrderService.autoCancelOrder(order); //通知页面 this.bigScreenService.pushRtEventFlag(order.getOrgId(),order.getStationId(), order.getId().toString(), "orderUpdate"); this.bigScreenService.pushRtEventFlag(order.getOrgId(),order.getStationId(), order.getId().toString(), "bigscreenRefresh"); } } else { //无未完成的告警事件 Integer delayTime = getDelayTime(type, careDevice); if (delayTime == null || delayTime == 0) { //没有配置响应时间,立即生成新的告警事件工单 if (conf != 255) { careMqttMsg.setStatus(MqttMsgStatusEnum.HANDLED.getValue()); this.careMqttMsgService.save(careMqttMsg); //生成新的工单 this.careEventOrderService.saveOrder(order); //通知页面 this.bigScreenService.pushRtEventFlag(order.getOrgId(),order.getStationId(), order.getId().toString(), "orderAdd"); this.bigScreenService.pushRtEventFlag(order.getOrgId(),order.getStationId(), order.getId().toString(), "bigscreenRefresh"); } else { //取消 不处理工单 careMqttMsg.setStatus(MqttMsgStatusEnum.CANCEL.getValue()); this.careMqttMsgService.save(careMqttMsg); } } else { //有配置响应时间,不处理工单,进行调度处理 if (conf != 255) { //不是取消 careMqttMsg.setStatus(MqttMsgStatusEnum.NO_HANDLE.getValue()); careMqttMsg.setDelayTime(delayTime); this.careMqttMsgService.save(careMqttMsg); //进行调度处理 MqttMsgVO mqttMsgVO = new MqttMsgVO(); BeanUtil.copyProperties(careMqttMsg, mqttMsgVO); mqttMsgRedisService.addRedis(mqttMsgVO, date.getTime() + careMqttMsg.getDelayTime() * 60 * 1000); } else { //取消: 更新mqtt消息为已取消,等待执行的调度到时会取消执行 careMqttMsg.setStatus(MqttMsgStatusEnum.CANCEL.getValue()); this.careMqttMsgService.save(careMqttMsg); //更新mqtt消息为已取消 UpdateWrapper updateWrapper = new UpdateWrapper<>(); updateWrapper.lambda().eq(CareMqttMsg::getDevId, careDevice.getId()).eq(CareMqttMsg::getType, type).eq(CareMqttMsg::getStatus, MqttMsgStatusEnum.NO_HANDLE.getValue()) .set(CareMqttMsg::getStatus, MqttMsgStatusEnum.CANCEL.getValue()); this.careMqttMsgService.update(updateWrapper); } } } } else { try { Thread.sleep(3000); handleOrderEvent(map,type,careDevice,date,careMqttMsg); } catch (Exception e){ e.printStackTrace(); } } } finally { redisUtil.releaseLock(key,requestId); } } private CareDevice getCareDevice(String topic){ QueryWrapper queryWrapper = new QueryWrapper<>(); queryWrapper.lambda().eq(CareDevice::getTopic,topic); CareDevice careDevice = careDeviceService.getOne(queryWrapper); return careDevice; } private Integer getDelayTime(String type, CareDevice careDevice) { if ("FallDown".equals(type)) { //跌到 return careDevice.getFallResponseTime(); } else if ("StayTooLong".equals(type)) { //久滞 return careDevice.getLonglagResponseTime(); } else { return 0; } } public CareEventOrder getNewOrder(CareDevice careDevice) { CareHouse careHouse = careHouseService.getById(careDevice.getHouseId()); if (careHouse == null){ // logger.error("设备没有配置房屋地址"); return null; } CareEventOrder order = new CareEventOrder(); order.setOrgId(careDevice.getOrgId()); order.setOrgName(careOrganizationService.getById(careDevice.getOrgId()).getName()); order.setStationId(careDevice.getStationId()); order.setStationName(careStationService.getById(careDevice.getStationId()).getName()); order.setHouseId(careHouse.getId()); order.setDevId(careDevice.getId()); order.setDevCode(careDevice.getDevCode()); order.setDevName(careDevice.getDevName()); order.setHouseName(careHouse.getName()); order.setMemberId(careDevice.getMemberId()); if(careHouse.getHouseNumber() != null ){ order.setTitle(careHouse.getAddr() + careHouse.getHouseNumber()); } else { order.setTitle(careHouse.getAddr()); } return order; } }