Browse Source

定时任务时间修改

gao.qiang 3 days atrás
parent
commit
d26b4ec942

+ 1 - 32
hazard-admin/src/main/java/com/ozs/web/controller/websocket/WebSocketServer.java

@@ -164,38 +164,7 @@ public class WebSocketServer {
     }
 
 
-    /**
-     * 新增:定时发送Ping包(心跳核心)
-     * 每20秒给所有在线客户端发一次Ping,触发客户端回复Pong
-     */
-    @Scheduled(fixedRate = HEARTBEAT_INTERVAL)
-    public void sendPingToAllClients() {
-        long currentTime = System.currentTimeMillis();
-        for (WebSocketServer webSocket : webSockets) {
-            Session session = webSocket.session;
-            String userId = webSocket.userId;
-            try {
-                if (session.isOpen()) {
-                    // 发送Ping包(WebSocket标准操作,客户端会自动回复Pong)
-                    session.getBasicRemote().sendPing(ByteBuffer.wrap(new byte[0]));
-                    log.debug("【websocket心跳】给用户ID: {} 发送Ping包", userId);
-
-                    // 检查是否超过Pong超时(如果10秒内没收到Pong,说明客户端失联,主动断开)
-                    long lastPong = lastPongTime.getOrDefault(userId, 0L);
-                    if (currentTime - lastPong > PONG_TIMEOUT) {
-                        log.warn("【websocket心跳】用户ID: {} 超过Pong超时,主动断开连接", userId);
-                        session.close(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, "Pong timeout"));
-                    }
-                }
-            } catch (Exception e) {
-                log.error("【websocket心跳】给用户ID: {} 发送Ping失败,主动清理连接", userId, e);
-                // 发送失败时,清理无效连接
-                sessionPool.remove(userId);
-                webSockets.remove(webSocket);
-                lastPongTime.remove(userId);
-            }
-        }
-    }
+
 
     @Scheduled(fixedRate = 30000) // 每30秒执行一次
     public void cleanInvalidSessions() {

+ 56 - 15
hazard-admin/src/main/java/com/ozs/web/core/util/InsertUserStateUtil.java

@@ -4,9 +4,7 @@ package com.ozs.web.core.util;
 import com.alibaba.fastjson2.JSONObject;
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.ozs.service.entity.BaseUser;
-import com.ozs.service.entity.MsgWebPush;
 import com.ozs.service.entity.UserStateStatistics;
-import com.ozs.service.entity.vo.MsgAlarmResp;
 import com.ozs.service.mapper.BaseUserMapper;
 import com.ozs.service.service.UserStateStatisticsService;
 import com.ozs.web.controller.websocket.WebSocketServer;
@@ -60,21 +58,48 @@ public class InsertUserStateUtil {
 
 
             // 使用Set提高查找性能
-            Set<String> onlineSet = new HashSet<>(onlineUser);
             Set<String> userSet = new HashSet<>(userIdList);
             Set<String> userAll = new HashSet<>(userIdsAll);
-
-            // 计算差集和交集
-            Set<String> offlineUsers = userSet.stream()
-                    .filter(userId -> !onlineSet.contains(userId))
-                    .collect(Collectors.toSet());
-            Set<String> offlineUsersAll = userAll.stream()
-                    .filter(userId -> !onlineSet.contains(userId))
-                    .collect(Collectors.toSet());
-            Set<String> onlineUsers = onlineSet.stream()
-                    .filter(userSet::contains)
-                    .collect(Collectors.toSet());
-
+            Set<String> initialOnlineSet = new HashSet<>(onlineUser);
+
+            // 重试配置
+            int maxRetry = 3;
+            long retryDelay = 3000;
+            int currentRetry = 0;
+            Set<String> offlineUsers = calculateOfflineUsers(userSet, initialOnlineSet);
+            // 存储最终最新的在线用户集合(默认用初始值,重试后更新)
+            Set<String> latestOnlineSet = new HashSet<>(initialOnlineSet);
+
+            // 差集存在时,重试拉取在线用户并重新比对
+            while (!offlineUsers.isEmpty() && currentRetry < maxRetry) {
+                try {
+                    currentRetry++;
+                    log.info("第{}次检测到离线用户:{},等待{}秒后重新检查在线状态",
+                            currentRetry, offlineUsers, retryDelay / 1000);
+
+                    Thread.sleep(retryDelay);
+
+                    // 重新获取最新在线用户列表并更新到 latestOnlineSet
+                    List<String> latestOnlineUser = webSocketServer.getOnlineUser();
+                    latestOnlineSet = new HashSet<>(latestOnlineUser);
+
+                    // 重新计算差集(基于最新在线状态)
+                    offlineUsers = calculateOfflineUsers(userSet, latestOnlineSet);
+
+                } catch (InterruptedException e) {
+                    log.error("重试等待时线程被中断", e);
+                    Thread.currentThread().interrupt();
+                    break;
+                }
+            }
+            // 关键修改:用 latestOnlineSet(重试后最新状态)计算最终结果
+            // 1. 所有需推送用户中的离线用户(userAll - 最新在线)
+            Set<String> offlineUsersAll = calculateOfflineUsers(userAll, latestOnlineSet);
+             // 2. 工务段用户中的在线用户(工务段用户 ∩ 最新在线)
+            Set<String> onlineUsers = calculateIntersection(userSet, latestOnlineSet);
+
+             // 后续可添加结果日志打印
+            log.info("最终工务段在线用户:{},所有需推送用户中的离线用户:{}", onlineUsers, offlineUsersAll);
             // 批量查询用户信息,减少数据库查询
             Map<String, BaseUser> userMap = baseUserMapper.getUsersByIds(
                     Stream.concat(offlineUsers.stream(), onlineUsers.stream())
@@ -177,7 +202,23 @@ public class InsertUserStateUtil {
             service.saveBatch(toSave);
         }
     }
+    /**
+     * 计算差集:targetSet - onlineSet(目标集合中不在在线集合的元素)
+     */
+    private Set<String> calculateOfflineUsers(Set<String> targetSet, Set<String> onlineSet) {
+        return targetSet.stream()
+                .filter(userId -> !onlineSet.contains(userId))
+                .collect(Collectors.toSet());
+    }
 
+    /**
+     * 计算交集:targetSet ∩ onlineSet(同时存在于两个集合的元素)
+     */
+    private Set<String> calculateIntersection(Set<String> targetSet, Set<String> onlineSet) {
+        return targetSet.stream()
+                .filter(onlineSet::contains)
+                .collect(Collectors.toSet());
+    }
     private String objStr(Integer count) {
         JSONObject obj = new JSONObject();
         obj.put("data", count);