Browse Source

优化在线离线

lilt085163@126.com 3 năm trước cách đây
mục cha
commit
74556b3649

+ 30 - 18
src/main/java/com/care/mqtt/service/MqttHearbeatService.java

@@ -2,12 +2,17 @@ package com.care.mqtt.service;
 
 
 import cn.hutool.core.collection.CollUtil;
+import cn.hutool.core.date.DateUtil;
+import cn.hutool.core.util.NumberUtil;
+import cn.hutool.core.util.StrUtil;
 import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
+import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
 import com.care.common.cache.RedisKeyConstant;
 import com.care.common.cache.RedisUtil;
 import com.care.common.entity.CareDevice;
 import com.care.common.enums.*;
 import com.care.common.service.CareDeviceService;
+import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Service;
 
 import javax.annotation.Resource;
@@ -22,6 +27,7 @@ import java.util.List;
  * @Desc:
  */
 @Service
+@Slf4j
 public class MqttHearbeatService {
     @Resource
     private CareDeviceService careDeviceService;
@@ -33,35 +39,41 @@ public class MqttHearbeatService {
      * 读取数据库设备表,激活的,轮训看redis中是否有心跳消息,有则置为在线,没有置为下线
      */
     public void rotationDev() {
-
+        log.info("设备在线离线轮训:开始");
         QueryWrapper<CareDevice> queryWrapper = new QueryWrapper<>();
         queryWrapper.lambda().eq(CareDevice::getActiveStatus, DeviceActiveStatusEnum.ACTIVE.getValue());
 
         List<CareDevice> careDeviceList = careDeviceService.list(queryWrapper);
+        Long nowLong = DateUtil.current();
         if (CollUtil.isNotEmpty(careDeviceList)) {
-            List<CareDevice> careDeviceListNew = new ArrayList<>();
+            log.info("设备在线离线轮训:激活设备数量={}",careDeviceList.size());
             for (CareDevice careDevice : careDeviceList) {
-                CareDevice careDeviceNew = new CareDevice();
-                careDeviceNew.setId(careDevice.getId());
 
-                if (redisUtil.hget(RedisKeyConstant.MQTT_DEV_HEARBEAT_INFO,careDevice.getTopic()) != null) { //在线
-                    if(!DeviceStatusEnum.ONLINE.getValue().equals(careDevice.getStatus())){
-                        careDeviceNew.setStatus(DeviceStatusEnum.ONLINE.getValue());
-                        careDeviceListNew.add(careDeviceNew);
-                    }
-                } else { //离线
-                    if(!DeviceStatusEnum.OFFLINE.getValue().equals(careDevice.getStatus())){
-                        careDeviceNew.setStatus(DeviceStatusEnum.OFFLINE.getValue());
-                        careDeviceListNew.add(careDeviceNew);
+                try{
+                    Object value = redisUtil.hget(RedisKeyConstant.MQTT_DEV_HEARBEAT_INFO,careDevice.getTopic());
+                    Long _time =  value != null ? (Long) value:0;
+                    log.info("设备在线离线轮训:code={},开始进行检测,redis中的时间{},当前时间{},间隔{}",careDevice.getDevCode(),_time,nowLong,nowLong-_time);
+                    if ((nowLong-_time) <= 6*60*1000) { //在线
+                        if(!DeviceStatusEnum.ONLINE.getValue().equals(careDevice.getStatus())){
+                            log.info("设备在线离线轮训:id={},code={},设置为在线",careDevice.getId(),careDevice.getDevCode());
+                            UpdateWrapper<CareDevice> deviceUpdateWrapper = new UpdateWrapper<>();
+                            deviceUpdateWrapper.lambda().set(CareDevice::getStatus,DeviceStatusEnum.ONLINE.getValue()).eq(CareDevice::getId,careDevice.getId());
+                            this.careDeviceService.update(deviceUpdateWrapper);
+                        }
+                    } else { //离线
+                        if(!DeviceStatusEnum.OFFLINE.getValue().equals(careDevice.getStatus())){
+                            log.info("设备在线离线轮训:id={},code={},设置为离线",careDevice.getId(),careDevice.getDevCode());
+                            UpdateWrapper<CareDevice> deviceUpdateWrapper = new UpdateWrapper<>();
+                            deviceUpdateWrapper.lambda().set(CareDevice::getStatus,DeviceStatusEnum.OFFLINE.getValue()).eq(CareDevice::getId,careDevice.getId());
+                            this.careDeviceService.update(deviceUpdateWrapper);
+                        }
                     }
+                }catch (Exception e){
+                    log.error("在线状态更新失败:{}",careDevice,e);
                 }
             }
-
-            if (CollUtil.isNotEmpty(careDeviceListNew)){
-                careDeviceService.updateBatchById(careDeviceListNew);
-            }
-
         }
+        log.info("设备在线离线轮训:结束");
     }
 
 }

+ 1 - 1
src/main/java/com/care/mqtt/service/MqttMsgService.java

@@ -85,7 +85,7 @@ public class MqttMsgService {
             //处理心跳,放入redis
             if ("HeartBeat".equals(type)) {
                 logger.info("{} 接收到MQTT  HeartBeat消息,{}",topic,mqttMessageStr);
-                redisUtil.hset(RedisKeyConstant.MQTT_DEV_HEARBEAT_INFO, topic, "1", RedisKeyConstant.MQTT_DEV_HEARBEAT_INFO_TIME);
+                redisUtil.hset(RedisKeyConstant.MQTT_DEV_HEARBEAT_INFO, topic, DateUtil.current(), RedisKeyConstant.MQTT_DEV_HEARBEAT_INFO_TIME);
                 return;
             }