|
@@ -1,4 +1,4 @@
|
|
|
-package com.care.bms.service;
|
|
|
+package com.care.mqtt.service;
|
|
|
|
|
|
|
|
|
import cn.hutool.core.date.DateUtil;
|
|
@@ -48,7 +48,15 @@ public class MqttMsgService {
|
|
|
@Resource
|
|
|
private CareEventOrderChambService careEventOrderChambService;
|
|
|
@Resource
|
|
|
- private MqttThreadPoolInitService MqttThreadPoolInitService;
|
|
|
+ private MqttThreadPoolService MqttThreadPoolService;
|
|
|
+ @Resource
|
|
|
+ private CareRoomSensorService careRoomSensorService;
|
|
|
+ @Resource
|
|
|
+ private CareRoomService careRoomService;
|
|
|
+ @Resource
|
|
|
+ private CareRoomGateService careRoomGateService;
|
|
|
+ @Resource
|
|
|
+ private CareRoomRegionService careRoomRegionService;
|
|
|
|
|
|
private static Logger logger = LoggerFactory.getLogger(MqttMsgService.class);
|
|
|
|
|
@@ -59,6 +67,64 @@ public class MqttMsgService {
|
|
|
* @throws Exception
|
|
|
*/
|
|
|
public void handleMessage(String topic, String mqttMessageStr) throws Exception {
|
|
|
+ if(topic.contains("event")){ //工单的消息
|
|
|
+ handleOrderEventMessage(topic,mqttMessageStr);
|
|
|
+ } else { //房间信息配置的返回消息
|
|
|
+ handleConfigureRoomInfoResultMessage(topic,mqttMessageStr);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 查询配置房间信息的结果消息 处理
|
|
|
+ * @param topic
|
|
|
+ * @param mqttMessageStr
|
|
|
+ */
|
|
|
+ private void handleConfigureRoomInfoResultMessage(String topic, String mqttMessageStr) {
|
|
|
+ Map map = JSON.parseObject(mqttMessageStr);
|
|
|
+ String type = (String) map.get("type");
|
|
|
+
|
|
|
+ if(StringUtils.isEmpty(type)) {
|
|
|
+ logger.error("不是正确格式的消息");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ QueryWrapper<CareDevice> queryWrapper = new QueryWrapper<>();
|
|
|
+ queryWrapper.lambda().eq(CareDevice::getTopic,topic.replaceAll("event","control"));
|
|
|
+ CareDevice careDevice = careDeviceService.getOne(queryWrapper);
|
|
|
+ if(careDevice == null) {
|
|
|
+ logger.error("数据库中没找到对应设备");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ String devCode = careDevice.getDevCode();
|
|
|
+
|
|
|
+ Map msg = (Map) map.get("msg");
|
|
|
+ String ack = String.valueOf(msg.get("ack"));
|
|
|
+
|
|
|
+ if("CfgAreaAck".equals(type)) { //房间信息
|
|
|
+ UpdateWrapper<CareRoom> updateWrapper = new UpdateWrapper<>();
|
|
|
+ updateWrapper.lambda().eq(CareRoom::getRoomId,devCode).set(CareRoom::getAck,ack);
|
|
|
+ careRoomService.update(updateWrapper);
|
|
|
+ } else if("CfgDevicePositionAck".equals(type)) { //传感器
|
|
|
+ UpdateWrapper<CareRoomSensor> updateWrapper = new UpdateWrapper<>();
|
|
|
+ updateWrapper.lambda().eq(CareRoomSensor::getRoomId,devCode).set(CareRoomSensor::getAck,ack);
|
|
|
+ careRoomSensorService.update(updateWrapper);
|
|
|
+ } else if("CfgGatesAck".equals(type)) { //门
|
|
|
+ UpdateWrapper<CareRoomGate> updateWrapper = new UpdateWrapper<>();
|
|
|
+ updateWrapper.lambda().eq(CareRoomGate::getRoomId,devCode).set(CareRoomGate::getAck,ack);
|
|
|
+ careRoomGateService.update(updateWrapper);
|
|
|
+ } else if("CfgRegionsAck".equals(type)) { //区域
|
|
|
+ UpdateWrapper<CareRoomRegion> updateWrapper = new UpdateWrapper<>();
|
|
|
+ updateWrapper.lambda().eq(CareRoomRegion::getRoomId,devCode).set(CareRoomRegion::getAck,ack);
|
|
|
+ careRoomRegionService.update(updateWrapper);
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+ /**
|
|
|
+ * 工单事件的消息 处理
|
|
|
+ * @param topic
|
|
|
+ * @param mqttMessageStr
|
|
|
+ */
|
|
|
+ private void handleOrderEventMessage(String topic, String mqttMessageStr) {
|
|
|
+
|
|
|
try {
|
|
|
|
|
|
Map map = JSON.parseObject(mqttMessageStr);
|
|
@@ -75,6 +141,7 @@ public class MqttMsgService {
|
|
|
logger.error("数据库中没找到对应设备");
|
|
|
return;
|
|
|
}
|
|
|
+
|
|
|
Date date = DateUtil.date();
|
|
|
|
|
|
CareMqttMsg careMqttMsg = new CareMqttMsg();
|
|
@@ -145,7 +212,7 @@ public class MqttMsgService {
|
|
|
orderDb.setStatus(OrderStatusEnum.CANCEL.getValue());
|
|
|
this.careEventOrderService.updateById(orderDb);
|
|
|
|
|
|
- //管家工单标记为取消
|
|
|
+ //管家工单标记为取消
|
|
|
CareEventOrderChamb careEventOrderChamb = this.careEventOrderChambService.getChambOrderByOrderId(orderDb.getId());
|
|
|
careEventOrderChamb.setStatus(ChambOrderStatusEnum.CANCEL.getValue());
|
|
|
this.careEventOrderChambService.updateById(careEventOrderChamb);
|
|
@@ -185,7 +252,7 @@ public class MqttMsgService {
|
|
|
this.careMqttMsgService.save(careMqttMsg);
|
|
|
|
|
|
//进行调度处理
|
|
|
- MqttThreadPoolInitService.addScheduled(careMqttMsg,careMqttMsg.getDelayTime() * 60 * 1000);
|
|
|
+ MqttThreadPoolService.addScheduled(careMqttMsg,careMqttMsg.getDelayTime() * 60 * 1000);
|
|
|
} else { //取消: 更新mqtt消息为已取消,等待执行的调度到时会取消执行
|
|
|
careMqttMsg.setStatus(MqttMsgStatusEnum.HANDLED.getValue());
|
|
|
this.careMqttMsgService.save(careMqttMsg);
|
|
@@ -204,7 +271,6 @@ public class MqttMsgService {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-
|
|
|
private Integer getDelayTime(String type, CareDevice careDevice) {
|
|
|
if ("FallDown".equals(type)) { //跌到
|
|
|
return careDevice.getFallResponseTime();
|