Kaynağa Gözat

定时任务时间修改

gao.qiang 10 saat önce
ebeveyn
işleme
567646674a

+ 70 - 18
hazard-admin/src/main/java/com/ozs/web/controller/websocket/WebSocketServer.java

@@ -7,6 +7,7 @@ import org.springframework.stereotype.Component;
 import javax.websocket.*;
 import javax.websocket.server.PathParam;
 import javax.websocket.server.ServerEndpoint;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -37,6 +38,12 @@ public class WebSocketServer {
      */
     private String userId;
 
+
+    // ========== 新增:心跳相关配置 ==========
+    private static final long HEARTBEAT_INTERVAL = 20000; // 心跳间隔:20秒(比中间件超时短)
+    private static final long PONG_TIMEOUT = 10000; // Pong响应超时:10秒(超过则断开)
+    private final Map<String, Long> lastPongTime = new ConcurrentHashMap<>(); // 记录每个客户端最后一次Pong时间
+
     /**
      * 获取用户是否在线
      */
@@ -60,9 +67,10 @@ public class WebSocketServer {
             this.userId = userId;
             webSockets.add(this);
             sessionPool.put(userId, session);
-                System.out.println("链接成功" + webSockets.size());
+            lastPongTime.put(userId, System.currentTimeMillis()); // 初始化:记录连接成功时间(作为初始Pong时间)
             log.info("【websocket消息】有新的连接,总数为:" + webSockets.size());
         } catch (Exception e) {
+            log.error("【websocket错误】连接建立失败,用户ID: {}", userId, e);
         }
     }
 
@@ -74,10 +82,10 @@ public class WebSocketServer {
         try {
             webSockets.remove(this);
             sessionPool.remove(this.userId);
-            System.out.println("链接失败" + webSockets.size());
-            log.info("【websocket消息】连接断开,总数为:" + webSockets.size());
+            lastPongTime.remove(this.userId); // 清理:移除该用户的Pong时间记录
+            log.info("【websocket消息】连接断开,用户ID: {}, 总数为:{}", this.userId, webSockets.size());
         } catch (Exception e) {
-            log.error("onClose移除连接失败", e);
+            log.error("【websocket错误】onClose移除连接失败,用户ID: {}", this.userId, e);
         }
     }
 
@@ -88,9 +96,14 @@ public class WebSocketServer {
      */
     @OnMessage
     public void onMessage(String message) {
-        log.info("【websocket消息】收到客户端消息:" + message);
-        log.info("【收到消息】userId: {}, Session是否打开: {}, 当前webSockets大小: {}",
-                this.userId, this.session.isOpen(), webSockets.size());
+        // 处理客户端主动发送的心跳消息(如果客户端是发送文本心跳,而非Pong包)
+        if ("heartbeat".equals(message.trim())) {
+            lastPongTime.put(this.userId, System.currentTimeMillis()); // 更新Pong时间
+            log.info("【websocket心跳】收到客户端文本心跳,用户ID: {}", this.userId);
+            return;
+        }
+        // 处理业务消息
+        log.info("【websocket消息】收到客户端业务消息,用户ID: {}, 消息:{}", this.userId, message);
     }
 
     /**
@@ -101,11 +114,11 @@ public class WebSocketServer {
      */
     @OnError
     public void onError(Session session, Throwable error) {
-        log.error("用户错误,原因:" + error.getMessage());
-        log.error("用户错误,原因:" + error.getLocalizedMessage());
-        log.error("用户错误,原因sessionsessionsession:" + session.getId());
-        log.error("用户错误,原因 error: {}", error);
-//        error.printStackTrace();
+        log.error("【websocket错误】用户ID: {}, SessionID: {}, 原因:{}", this.userId, session.getId(), error.getMessage(), error);
+        // 错误时主动清理连接
+        webSockets.remove(this);
+        sessionPool.remove(this.userId);
+        lastPongTime.remove(this.userId);
     }
 
     // 此为广播消息
@@ -117,7 +130,7 @@ public class WebSocketServer {
                     webSocket.session.getAsyncRemote().sendText(message);
                 }
             } catch (Exception e) {
-                e.printStackTrace();
+                log.error("【websocket错误】广播消息失败", e);
             }
         }
     }
@@ -127,10 +140,10 @@ public class WebSocketServer {
         Session session = sessionPool.get(userId);
         if (session != null && session.isOpen()) {
             try {
-                log.info("【websocket消息】 单点消息:" + message);
+                log.info("【websocket消息】单点消息,用户ID: {}, 消息:{}", userId, message);
                 session.getAsyncRemote().sendText(message);
             } catch (Exception e) {
-                e.printStackTrace();
+                log.error("【websocket错误】单点消息失败,用户ID: {}", userId, e);
             }
         }
     }
@@ -141,11 +154,45 @@ public class WebSocketServer {
             Session session = sessionPool.get(userId);
             if (session != null && session.isOpen()) {
                 try {
-                    log.info("【websocket消息】 单点消息:" + message);
+                    log.info("【websocket消息】多人消息,用户ID: {}, 消息:{}", userId, message);
                     session.getAsyncRemote().sendText(message);
                 } catch (Exception e) {
-                    e.printStackTrace();
+                    log.error("【websocket错误】多人消息失败,用户ID: {}", userId, e);
+                }
+            }
+        }
+    }
+
+
+    /**
+     * 新增:定时发送Ping包(心跳核心)
+     * 每20秒给所有在线客户端发一次Ping,触发客户端回复Pong
+     */
+    @Scheduled(fixedRate = HEARTBEAT_INTERVAL)
+    public void sendPingToAllClients() {
+        long currentTime = System.currentTimeMillis();
+        for (WebSocketServer webSocket : webSockets) {
+            Session session = webSocket.session;
+            String userId = webSocket.userId;
+            try {
+                if (session.isOpen()) {
+                    // 发送Ping包(WebSocket标准操作,客户端会自动回复Pong)
+                    session.getBasicRemote().sendPing(ByteBuffer.wrap(new byte[0]));
+                    log.debug("【websocket心跳】给用户ID: {} 发送Ping包", userId);
+
+                    // 检查是否超过Pong超时(如果10秒内没收到Pong,说明客户端失联,主动断开)
+                    long lastPong = lastPongTime.getOrDefault(userId, 0L);
+                    if (currentTime - lastPong > PONG_TIMEOUT) {
+                        log.warn("【websocket心跳】用户ID: {} 超过Pong超时,主动断开连接", userId);
+                        session.close(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, "Pong timeout"));
+                    }
                 }
+            } catch (Exception e) {
+                log.error("【websocket心跳】给用户ID: {} 发送Ping失败,主动清理连接", userId, e);
+                // 发送失败时,清理无效连接
+                sessionPool.remove(userId);
+                webSockets.remove(webSocket);
+                lastPongTime.remove(userId);
             }
         }
     }
@@ -156,10 +203,15 @@ public class WebSocketServer {
                 .filter(entry -> !entry.getValue().isOpen())
                 .map(Map.Entry::getKey)
                 .collect(Collectors.toList());
+
         invalidUserIds.forEach(userId -> {
             sessionPool.remove(userId);
             webSockets.removeIf(ws -> ws.userId.equals(userId));
+            lastPongTime.remove(userId); // 同步清理Pong时间记录
         });
-        log.info("清理无效连接,剩余总数: {}", webSockets.size());
+
+        if (!invalidUserIds.isEmpty()) {
+            log.info("【websocket清理】清理无效连接数: {}, 剩余总数: {}", invalidUserIds.size(), webSockets.size());
+        }
     }
 }

+ 1 - 1
hazard-admin/src/main/resources/logback.xml

@@ -4,7 +4,7 @@
     <property name="log.path" value="/data/service/logs/hazard-admin"/>
 <!--      <property name="log.path" value="/Users/sunhuanhuan/Documents/project/106/project/logs"/>-->
     <!-- 日志输出格式 -->
-    <property name="log.pattern" value="%d{HH:mm:ss.SSS} [%thread] %-5level %logger{20} - [%method,%line] - %msg%n"/>
+    <property name="log.pattern" value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{20} - [%method,%line] - %msg%n"/>
 
     <!-- 控制台输出 -->
     <appender name="console" class="ch.qos.logback.core.ConsoleAppender">

+ 1 - 1
hazard-sdk/src/main/resources/logback.xml

@@ -4,7 +4,7 @@
     <!--	<property name="log.path" value="/home/base/logs" />-->
     <property name="log.path" value="/data/service/logs/hazard-sdk"/>
     <!-- 日志输出格式 -->
-    <property name="log.pattern" value="%d{HH:mm:ss.SSS} [%thread] %-5level %logger{20} - [%method,%line] - %msg%n"/>
+	<property name="log.pattern" value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{20} - [%method,%line] - %msg%n" />
 
     <!-- 控制台输出 -->
     <appender name="console" class="ch.qos.logback.core.ConsoleAppender">