|
@@ -4,32 +4,36 @@ package com.care.mqtt.service;
|
|
|
import cn.hutool.core.collection.CollUtil;
|
|
|
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
|
|
import com.care.bigscreen.service.BigScreenService;
|
|
|
+import com.care.common.cache.RedisUtil;
|
|
|
+import com.care.common.constant.Constants;
|
|
|
import com.care.common.entity.CareDevice;
|
|
|
import com.care.common.entity.CareEventOrder;
|
|
|
import com.care.common.entity.CareMqttMsg;
|
|
|
-import com.care.common.enums.*;
|
|
|
+import com.care.common.enums.MqttMsgStatusEnum;
|
|
|
+import com.care.common.enums.OrderStatusEnum;
|
|
|
+import com.care.common.enums.OrderTypeEnum;
|
|
|
import com.care.common.service.CareDeviceService;
|
|
|
-
|
|
|
import com.care.common.service.CareEventOrderService;
|
|
|
import com.care.common.service.CareMqttMsgService;
|
|
|
+import org.apache.commons.collections.CollectionUtils;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
|
|
|
-
|
|
|
import javax.annotation.Resource;
|
|
|
import java.util.Date;
|
|
|
import java.util.List;
|
|
|
-
|
|
|
+import java.util.Set;
|
|
|
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
|
|
|
/**
|
|
|
+ * 使用redis轮训方案实现延迟触发
|
|
|
* @Author: stw
|
|
|
* @Date: 2021/8/14
|
|
|
* @Desc:
|
|
|
*/
|
|
|
@Service
|
|
|
-public class MqttThreadPoolService {
|
|
|
+public class MqttMsgRedisService {
|
|
|
@Resource
|
|
|
private CareDeviceService careDeviceService;
|
|
|
@Resource
|
|
@@ -41,25 +45,13 @@ public class MqttThreadPoolService {
|
|
|
@Resource
|
|
|
private BigScreenService bigScreenService;
|
|
|
@Resource
|
|
|
- private ScheduledThreadPoolExecutor scheduledThreadPool;
|
|
|
+ private RedisUtil redisUtil;
|
|
|
|
|
|
- public ScheduledThreadPoolExecutor getScheduledThreadPool() {
|
|
|
- return scheduledThreadPool;
|
|
|
- }
|
|
|
|
|
|
/**
|
|
|
- * 读取数据库mqtt msg表,把有延迟标记、未处理的消息生成调度任务放入线程池
|
|
|
+ * 读取数据库mqtt msg表,把有延迟标记、未处理的消息生成调redis结构放入redis
|
|
|
*/
|
|
|
public void init() {
|
|
|
- QueryWrapper<CareDevice> queryWrapper = new QueryWrapper();
|
|
|
- queryWrapper.lambda().eq(CareDevice::getActiveStatus, DeviceActiveStatusEnum.ACTIVE.getValue());
|
|
|
- int devCount = careDeviceService.count(queryWrapper);
|
|
|
-
|
|
|
- if (devCount < 5) {
|
|
|
- devCount = 5;
|
|
|
- }
|
|
|
- //创建一个定长线程池,支持定时任务执行——延迟执行
|
|
|
- scheduledThreadPool = new ScheduledThreadPoolExecutor(devCount);
|
|
|
|
|
|
//查询mqtt msg 表有延迟标记的未处理消息,放入线程池,等待延时时间结束执行创建工单
|
|
|
QueryWrapper<CareMqttMsg> queryWrapper2 = new QueryWrapper<>();
|
|
@@ -73,30 +65,58 @@ public class MqttThreadPoolService {
|
|
|
Long now = System.currentTimeMillis();
|
|
|
Long createTime = careMqttMsg.getCreateTime().getTime();
|
|
|
Long delayTime = careMqttMsg.getDelayTime() * 60 * 1000L;
|
|
|
- Long diff = delayTime - (now - createTime);
|
|
|
- if(diff <= 0) { //立即执行
|
|
|
+ Long execTime = createTime + delayTime;
|
|
|
+
|
|
|
+ if(execTime <= now) { //立即执行
|
|
|
exe(careMqttMsg);
|
|
|
continue;
|
|
|
}
|
|
|
- addScheduled(careMqttMsg,diff.intValue());
|
|
|
+ addRedis(careMqttMsg,execTime);
|
|
|
}
|
|
|
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public void addScheduled(CareMqttMsg careMqttMsg,int delay){
|
|
|
|
|
|
- //executor.setCorePoolSize(newPoolSize);//todo
|
|
|
|
|
|
- //延迟 diff 毫秒执行
|
|
|
- scheduledThreadPool.schedule(new Runnable() {
|
|
|
- public void run() {
|
|
|
+ /**
|
|
|
+ * 把延迟消息放入redis
|
|
|
+ * @param careMqttMsg
|
|
|
+ * @param execTime
|
|
|
+ */
|
|
|
+ public void addRedis(CareMqttMsg careMqttMsg, long execTime){
|
|
|
+
|
|
|
+ String msg = String.valueOf(careMqttMsg.getId());
|
|
|
+ redisUtil.zSetAdd(Constants.MQTT_DELAY_MSG_KEY, msg, execTime);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 轮训redis
|
|
|
+ */
|
|
|
+ public void rotationRedis(){
|
|
|
+ long startTime = 0L;
|
|
|
+ long endTime = System.currentTimeMillis();
|
|
|
+
|
|
|
+ //查询符合时间条件下的集合
|
|
|
+ Set set = redisUtil.zSetRangeByScore(Constants.MQTT_DELAY_MSG_KEY, startTime, endTime);
|
|
|
+
|
|
|
+ if (CollUtil.isNotEmpty(set)) {
|
|
|
+ //具体业务操作
|
|
|
+ set.forEach(item -> {
|
|
|
+ CareMqttMsg careMqttMsg = careMqttMsgService.getById((Long)item);
|
|
|
exe(careMqttMsg);
|
|
|
- }
|
|
|
- }, delay, TimeUnit.MILLISECONDS);
|
|
|
+ });
|
|
|
|
|
|
+ //移除集合
|
|
|
+ redisUtil.zSetRemoveRangeByScore(Constants.MQTT_DELAY_MSG_KEY, startTime, endTime);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * 执行创建工单动作
|
|
|
+ * @param careMqttMsg
|
|
|
+ */
|
|
|
+
|
|
|
private void exe(CareMqttMsg careMqttMsg){
|
|
|
String status = this.careMqttMsgService.getById(careMqttMsg).getStatus();
|
|
|
if(!MqttMsgStatusEnum.NO_HANDLE.getValue().equals(status)){ //不是未处理,不执行。
|
|
@@ -105,8 +125,8 @@ public class MqttThreadPoolService {
|
|
|
|
|
|
CareDevice careDevice = careDeviceService.getById(careMqttMsg.getDevId());
|
|
|
CareEventOrder order = mqttMsgService.getNewOrder(careDevice);
|
|
|
- order.setCreateTime(new Date());
|
|
|
- order.setModifyTime(new Date());
|
|
|
+ order.setCreateTime(careMqttMsg.getCreateTime());
|
|
|
+ order.setModifyTime(careMqttMsg.getCreateTime());
|
|
|
order.setStatus(OrderStatusEnum.TODO.getValue());
|
|
|
if ("FallDown".equals(careMqttMsg.getType())) { //跌到
|
|
|
order.setOrderType(OrderTypeEnum.DI_EDAO.getValue());
|