Explorar el Código

Merge remote-tracking branch 'origin/master'

gao.qiang hace 2 años
padre
commit
4313ef8ce5

+ 19 - 1
hazard-admin/pom.xml

@@ -24,15 +24,33 @@
             <artifactId>spring-boot-starter-thymeleaf</artifactId>
             <artifactId>spring-boot-starter-thymeleaf</artifactId>
         </dependency>
         </dependency>
 
 
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-web</artifactId>
+        </dependency>
+
         <dependency>
         <dependency>
             <groupId>org.springframework.boot</groupId>
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-websocket</artifactId>
             <artifactId>spring-boot-starter-websocket</artifactId>
         </dependency>
         </dependency>
 
 
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-test</artifactId>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.junit.vintage</groupId>
+                    <artifactId>junit-vintage-engine</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <!--lombok -->
         <dependency>
         <dependency>
             <groupId>org.projectlombok</groupId>
             <groupId>org.projectlombok</groupId>
             <artifactId>lombok</artifactId>
             <artifactId>lombok</artifactId>
-            <scope>provided</scope>
+            <version>1.16.18</version>
         </dependency>
         </dependency>
 
 
         <!-- spring-boot-devtools -->
         <!-- spring-boot-devtools -->

+ 3 - 6
hazard-admin/src/main/java/com/ozs/web/controller/websocket/WebSocketConfigOne.java

@@ -9,13 +9,10 @@ import org.springframework.web.socket.server.standard.ServerEndpointExporter;
  * @create 2023/3/7 16:17
  * @create 2023/3/7 16:17
  */
  */
 @Configuration
 @Configuration
-public class WebSocketConfigOne {
-
+public class WebSocketConfig {
     /**
     /**
-     * 这个bean会自动注册使用了@ServerEndpoint注解声明的对象
-     * 没有的话会报404
-     *
-     * @return
+     * 	注入ServerEndpointExporter,
+     * 	这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint
      */
      */
     @Bean
     @Bean
     public ServerEndpointExporter serverEndpointExporter() {
     public ServerEndpointExporter serverEndpointExporter() {

+ 0 - 31
hazard-admin/src/main/java/com/ozs/web/controller/websocket/WebSocketController.java

@@ -1,31 +0,0 @@
-package com.ozs.web.controller.websocket;
-
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Controller;
-import org.springframework.web.bind.annotation.GetMapping;
-import org.springframework.web.bind.annotation.RequestMapping;
-import org.springframework.web.bind.annotation.ResponseBody;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * @Author : sunhh
- * @create 2023/3/7 16:18
- */
-@Controller
-@RequestMapping("/pushWebSocket")
-public class WebSocketController {
-
-    @Autowired
-    private WebSocketServer webSocketServer;
-
-    @GetMapping("/publish")
-    @ResponseBody
-    public Map publish(String userId, String message) {
-        webSocketServer.sendOneMessage(userId, message);
-        HashMap<String, Object> map = new HashMap<>();
-        map.put("code", 200);
-        return map;
-    }
-}

+ 80 - 13
hazard-admin/src/main/java/com/ozs/web/controller/websocket/WebSocketServer.java

@@ -3,60 +3,127 @@ package com.ozs.web.controller.websocket;
 import lombok.extern.slf4j.Slf4j;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Component;
 import org.springframework.stereotype.Component;
 
 
-import javax.websocket.OnMessage;
-import javax.websocket.OnOpen;
-import javax.websocket.Session;
+import javax.websocket.*;
 import javax.websocket.server.PathParam;
 import javax.websocket.server.PathParam;
 import javax.websocket.server.ServerEndpoint;
 import javax.websocket.server.ServerEndpoint;
 import java.util.HashMap;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.CopyOnWriteArraySet;
 
 
 /**
 /**
  * @Author : sunhh
  * @Author : sunhh
  * @create 2023/3/7 16:15
  * @create 2023/3/7 16:15
  */
  */
-@Component
 @Slf4j
 @Slf4j
-@ServerEndpoint("/webSocket/{userId}")
+@Component
+@ServerEndpoint("/websocket/{userId}")  // 接口路径 ws://localhost:8087/webSocket/userId;
 public class WebSocketServer {
 public class WebSocketServer {
+
     //与某个客户端的连接会话,需要通过它来给客户端发送数据
     //与某个客户端的连接会话,需要通过它来给客户端发送数据
     private Session session;
     private Session session;
-    private static final CopyOnWriteArraySet<WebSocketServer> webSockets = new CopyOnWriteArraySet<>();
+    /**
+     * 用户ID
+     */
+    private String userId;
 
 
-    // 用来存在线连接数
-    private static final Map<String, Session> sessionPool = new HashMap<String, Session>();
+    //concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
+    //虽然@Component默认是单例模式的,但springboot还是会为每个websocket连接初始化一个bean,所以可以用一个静态set保存起来。
+    //  注:底下WebSocket是当前类名
+    private static CopyOnWriteArraySet<WebSocketServer> webSockets =new CopyOnWriteArraySet<>();
+    // 用来存在线连接用户信息
+    private static ConcurrentHashMap<String,Session> sessionPool = new ConcurrentHashMap<String,Session>();
 
 
     /**
     /**
-     * 连接成功调用的方法
+     * 接成功调用的方法
      */
      */
     @OnOpen
     @OnOpen
-    public void onOpen(Session session, @PathParam(value = "userId") String userId) {
+    public void onOpen(Session session, @PathParam(value="userId")String userId) {
         try {
         try {
             this.session = session;
             this.session = session;
+            this.userId = userId;
             webSockets.add(this);
             webSockets.add(this);
             sessionPool.put(userId, session);
             sessionPool.put(userId, session);
+            log.info("【websocket消息】有新的连接,总数为:"+webSockets.size());
         } catch (Exception e) {
         } catch (Exception e) {
         }
         }
     }
     }
 
 
+    /**
+     * 链接关闭调用的方法
+     */
+    @OnClose
+    public void onClose() {
+        try {
+            webSockets.remove(this);
+            sessionPool.remove(this.userId);
+            log.info("【websocket消息】连接断开,总数为:"+webSockets.size());
+        } catch (Exception e) {
+        }
+    }
     /**
     /**
      * 收到客户端消息后调用的方法
      * 收到客户端消息后调用的方法
+     *
+     * @param message
      */
      */
     @OnMessage
     @OnMessage
     public void onMessage(String message) {
     public void onMessage(String message) {
-        log.info("websocket消息: 收到客户端消息:" + message);
+        log.info("【websocket消息】收到客户端消息:"+message);
+    }
+
+    /** 发送错误时的处理
+     * @param session
+     * @param error
+     */
+    @OnError
+    public void onError(Session session, Throwable error) {
+
+        log.error("用户错误,原因:"+error.getMessage());
+        error.printStackTrace();
+    }
+
+
+    // 此为广播消息
+    public void sendAllMessage(String message) {
+        log.info("【websocket消息】广播消息:"+message);
+        for(WebSocketServer webSocket : webSockets) {
+            try {
+                if(webSocket.session.isOpen()) {
+                    webSocket.session.getAsyncRemote().sendText(message);
+                }
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
     }
     }
 
 
+    // 此为单点消息
     public void sendOneMessage(String userId, String message) {
     public void sendOneMessage(String userId, String message) {
         Session session = sessionPool.get(userId);
         Session session = sessionPool.get(userId);
-        if (session != null && session.isOpen()) {
+        if (session != null&&session.isOpen()) {
             try {
             try {
-                log.info("服务端推送消息:" + message);
+                log.info("【websocket消息】 单点消息:"+message);
                 session.getAsyncRemote().sendText(message);
                 session.getAsyncRemote().sendText(message);
             } catch (Exception e) {
             } catch (Exception e) {
                 e.printStackTrace();
                 e.printStackTrace();
             }
             }
         }
         }
     }
     }
+
+    // 此为单点消息(多人)
+    public void sendMoreMessage(String[] userIds, String message) {
+        for(String userId:userIds) {
+            Session session = sessionPool.get(userId);
+            if (session != null&&session.isOpen()) {
+                try {
+                    log.info("【websocket消息】 单点消息:"+message);
+                    session.getAsyncRemote().sendText(message);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+
+    }
 }
 }