Browse Source

webSocket

sunhh 2 years ago
parent
commit
9b51a6b8fd

+ 21 - 21
hazard-admin/src/main/java/com/ozs/web/controller/websocket/WebSocketConfig.java

@@ -1,21 +1,21 @@
-package com.ozs.web.controller.websocket;
-
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.web.socket.server.standard.ServerEndpointExporter;
-
-/**
- * @Author : sunhh
- * @create 2023/3/7 16:17
- */
-@Configuration
-public class WebSocketConfig {
-    /**
-     * 	注入ServerEndpointExporter,
-     * 	这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint
-     */
-    @Bean
-    public ServerEndpointExporter serverEndpointExporter() {
-        return new ServerEndpointExporter();
-    }
-}
+//package com.ozs.web.controller.websocket;
+//
+//import org.springframework.context.annotation.Bean;
+//import org.springframework.context.annotation.Configuration;
+//import org.springframework.web.socket.server.standard.ServerEndpointExporter;
+//
+///**
+// * @Author : sunhh
+// * @create 2023/3/7 16:17
+// */
+//@Configuration
+//public class WebSocketConfig {
+//    /**
+//     * 	注入ServerEndpointExporter,
+//     * 	这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint
+//     */
+//    @Bean
+//    public ServerEndpointExporter serverEndpointExporter() {
+//        return new ServerEndpointExporter();
+//    }
+//}

+ 129 - 129
hazard-admin/src/main/java/com/ozs/web/controller/websocket/WebSocketServer.java

@@ -1,129 +1,129 @@
-package com.ozs.web.controller.websocket;
-
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.stereotype.Component;
-
-import javax.websocket.*;
-import javax.websocket.server.PathParam;
-import javax.websocket.server.ServerEndpoint;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArraySet;
-
-/**
- * @Author : sunhh
- * @create 2023/3/7 16:15
- */
-@Slf4j
-@Component
-@ServerEndpoint("/websocket/{userId}")  // 接口路径 ws://localhost:8087/webSocket/userId;
-public class WebSocketServer {
-
-    //与某个客户端的连接会话,需要通过它来给客户端发送数据
-    private Session session;
-    /**
-     * 用户ID
-     */
-    private String userId;
-
-    //concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
-    //虽然@Component默认是单例模式的,但springboot还是会为每个websocket连接初始化一个bean,所以可以用一个静态set保存起来。
-    //  注:底下WebSocket是当前类名
-    private static CopyOnWriteArraySet<WebSocketServer> webSockets =new CopyOnWriteArraySet<>();
-    // 用来存在线连接用户信息
-    private static ConcurrentHashMap<String,Session> sessionPool = new ConcurrentHashMap<String,Session>();
-
-    /**
-     * 链接成功调用的方法
-     */
-    @OnOpen
-    public void onOpen(Session session, @PathParam(value="userId")String userId) {
-        try {
-            this.session = session;
-            this.userId = userId;
-            webSockets.add(this);
-            sessionPool.put(userId, session);
-            log.info("【websocket消息】有新的连接,总数为:"+webSockets.size());
-        } catch (Exception e) {
-        }
-    }
-
-    /**
-     * 链接关闭调用的方法
-     */
-    @OnClose
-    public void onClose() {
-        try {
-            webSockets.remove(this);
-            sessionPool.remove(this.userId);
-            log.info("【websocket消息】连接断开,总数为:"+webSockets.size());
-        } catch (Exception e) {
-        }
-    }
-    /**
-     * 收到客户端消息后调用的方法
-     *
-     * @param message
-     */
-    @OnMessage
-    public void onMessage(String message) {
-        log.info("【websocket消息】收到客户端消息:"+message);
-    }
-
-    /** 发送错误时的处理
-     * @param session
-     * @param error
-     */
-    @OnError
-    public void onError(Session session, Throwable error) {
-
-        log.error("用户错误,原因:"+error.getMessage());
-        error.printStackTrace();
-    }
-
-
-    // 此为广播消息
-    public void sendAllMessage(String message) {
-        log.info("【websocket消息】广播消息:"+message);
-        for(WebSocketServer webSocket : webSockets) {
-            try {
-                if(webSocket.session.isOpen()) {
-                    webSocket.session.getAsyncRemote().sendText(message);
-                }
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-        }
-    }
-
-    // 此为单点消息
-    public void sendOneMessage(String userId, String message) {
-        Session session = sessionPool.get(userId);
-        if (session != null&&session.isOpen()) {
-            try {
-                log.info("【websocket消息】 单点消息:"+message);
-                session.getAsyncRemote().sendText(message);
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-        }
-    }
-
-    // 此为单点消息(多人)
-    public void sendMoreMessage(String[] userIds, String message) {
-        for(String userId:userIds) {
-            Session session = sessionPool.get(userId);
-            if (session != null&&session.isOpen()) {
-                try {
-                    log.info("【websocket消息】 单点消息:"+message);
-                    session.getAsyncRemote().sendText(message);
-                } catch (Exception e) {
-                    e.printStackTrace();
-                }
-            }
-        }
-
-    }
-}
+//package com.ozs.web.controller.websocket;
+//
+//import lombok.extern.slf4j.Slf4j;
+//import org.springframework.stereotype.Component;
+//
+//import javax.websocket.*;
+//import javax.websocket.server.PathParam;
+//import javax.websocket.server.ServerEndpoint;
+//import java.util.HashMap;
+//import java.util.List;
+//import java.util.Map;
+//import java.util.concurrent.ConcurrentHashMap;
+//import java.util.concurrent.CopyOnWriteArraySet;
+//
+///**
+// * @Author : sunhh
+// * @create 2023/3/7 16:15
+// */
+//@Slf4j
+//@Component
+//@ServerEndpoint("/websocket/{userId}")  // 接口路径 ws://localhost:8087/webSocket/userId;
+//public class WebSocketServer {
+//
+//    //与某个客户端的连接会话,需要通过它来给客户端发送数据
+//    private Session session;
+//    /**
+//     * 用户ID
+//     */
+//    private String userId;
+//
+//    //concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
+//    //虽然@Component默认是单例模式的,但springboot还是会为每个websocket连接初始化一个bean,所以可以用一个静态set保存起来。
+//    //  注:底下WebSocket是当前类名
+//    private static CopyOnWriteArraySet<WebSocketServer> webSockets =new CopyOnWriteArraySet<>();
+//    // 用来存在线连接用户信息
+//    private static ConcurrentHashMap<String,Session> sessionPool = new ConcurrentHashMap<String,Session>();
+//
+//    /**
+//     * 链接成功调用的方法
+//     */
+//    @OnOpen
+//    public void onOpen(Session session, @PathParam(value="userId")String userId) {
+//        try {
+//            this.session = session;
+//            this.userId = userId;
+//            webSockets.add(this);
+//            sessionPool.put(userId, session);
+//            log.info("【websocket消息】有新的连接,总数为:"+webSockets.size());
+//        } catch (Exception e) {
+//        }
+//    }
+//
+//    /**
+//     * 链接关闭调用的方法
+//     */
+//    @OnClose
+//    public void onClose() {
+//        try {
+//            webSockets.remove(this);
+//            sessionPool.remove(this.userId);
+//            log.info("【websocket消息】连接断开,总数为:"+webSockets.size());
+//        } catch (Exception e) {
+//        }
+//    }
+//    /**
+//     * 收到客户端消息后调用的方法
+//     *
+//     * @param message
+//     */
+//    @OnMessage
+//    public void onMessage(String message) {
+//        log.info("【websocket消息】收到客户端消息:"+message);
+//    }
+//
+//    /** 发送错误时的处理
+//     * @param session
+//     * @param error
+//     */
+//    @OnError
+//    public void onError(Session session, Throwable error) {
+//
+//        log.error("用户错误,原因:"+error.getMessage());
+//        error.printStackTrace();
+//    }
+//
+//
+//    // 此为广播消息
+//    public void sendAllMessage(String message) {
+//        log.info("【websocket消息】广播消息:"+message);
+//        for(WebSocketServer webSocket : webSockets) {
+//            try {
+//                if(webSocket.session.isOpen()) {
+//                    webSocket.session.getAsyncRemote().sendText(message);
+//                }
+//            } catch (Exception e) {
+//                e.printStackTrace();
+//            }
+//        }
+//    }
+//
+//    // 此为单点消息
+//    public void sendOneMessage(String userId, String message) {
+//        Session session = sessionPool.get(userId);
+//        if (session != null&&session.isOpen()) {
+//            try {
+//                log.info("【websocket消息】 单点消息:"+message);
+//                session.getAsyncRemote().sendText(message);
+//            } catch (Exception e) {
+//                e.printStackTrace();
+//            }
+//        }
+//    }
+//
+//    // 此为单点消息(多人)
+//    public void sendMoreMessage(String[] userIds, String message) {
+//        for(String userId:userIds) {
+//            Session session = sessionPool.get(userId);
+//            if (session != null&&session.isOpen()) {
+//                try {
+//                    log.info("【websocket消息】 单点消息:"+message);
+//                    session.getAsyncRemote().sendText(message);
+//                } catch (Exception e) {
+//                    e.printStackTrace();
+//                }
+//            }
+//        }
+//
+//    }
+//}

+ 17 - 0
hazard-sdk/pom.xml

@@ -14,6 +14,23 @@
     <artifactId>hazard-sdk</artifactId>
 
     <dependencies>
+        <!-- webSocket web端 消息推送 -->
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-thymeleaf</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-websocket</artifactId>
+        </dependency>
+
+        <!--lombok -->
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <version>1.16.18</version>
+        </dependency>
 
         <!-- SpringBoot Web容器 -->
         <dependency>

+ 19 - 2
hazard-sdk/src/main/java/com/ozs/controller/upload/GeoHazardMonitorTokenController.java

@@ -24,6 +24,7 @@ import com.ozs.vo.ReqDeviceVo;
 import com.ozs.vo.ReqMsgAlarmVo;
 import com.ozs.vo.RespGeoHazardMonitorVo;
 import com.ozs.vo.RespMsgAlarmVo;
+import com.ozs.websocket.WebSocketServer;
 import io.lettuce.core.ScriptOutputType;
 import io.swagger.models.auth.In;
 import lombok.extern.slf4j.Slf4j;
@@ -69,7 +70,8 @@ public class GeoHazardMonitorTokenController {
     BaseCameraManagementService baseCameraManagementService;
 //    @Autowired
 //    private ISysUserService userService;
-
+    @Autowired
+    private WebSocketServer webSocketServer;
 
     /**
      * 获取web访问令牌
@@ -131,6 +133,21 @@ public class GeoHazardMonitorTokenController {
         if (StringUtils.isNotEmpty(token)) {
             String s = SM4Utils.decryptData_ECB(parameter, "4370780c9a8c43e5");
             ReqMsgAlarmVo reqMsgAlarmVo = JSON.parseObject(s, ReqMsgAlarmVo.class);
+            // 推送消息reqMsgAlarmVo
+            // 初始化 -> 运行中 -> 成功或失败
+//            try {
+//                // 1、初始化
+//                webSocketServer.sendMoreMessage(ids,objStr(pipelineId,1));
+//                Thread.sleep(3 * 1000);
+//                // 2、运行中
+//                webSocketServer.sendMoreMessage(ids,objStr(pipelineId,2));
+//                Thread.sleep(5 * 1000);
+//                // 3、成功或失败
+//                webSocketServer.sendMoreMessage(ids,Math.random() * 10 > 5 ? objStr(pipelineId,6) : objStr(pipelineId,7));
+//            } catch (InterruptedException e) {
+//                e.printStackTrace();
+//                throw new RuntimeException("构建失败");
+//            }
             log.info("reqMsgAlarmVo:{}", reqMsgAlarmVo);
             RespMsgAlarmVo respMsgAlarmVo = new RespMsgAlarmVo();
             LambdaQueryWrapper<BaseCameraManagement> wrapper = new LambdaQueryWrapper<BaseCameraManagement>();
@@ -223,7 +240,7 @@ public class GeoHazardMonitorTokenController {
                 jsonObject.put("message", "信息编码为空!");
                 return SM4Utils.encryptData_ECB(JSONObject.toJSONString(jsonObject), "4370780c9a8c43e5");
             }
-//            SysUser sysUser = userService.selectUserById(1L);
+            //SysUser sysUser = userService.selectUserById(1L);
             lw.eq(BaseDeviceDynamicManagement::getCameraCode, reqDeviceVo.getCameraCode());
             BaseDeviceDynamicManagement baseDynamicManagement = baseDeviceDynamicManagementService.getOne(lw);
             baseDynamicManagement.setElectricity(reqDeviceVo.getElectricity());

+ 21 - 0
hazard-sdk/src/main/java/com/ozs/websocket/WebSocketConfig.java

@@ -0,0 +1,21 @@
+package com.ozs.websocket;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.web.socket.server.standard.ServerEndpointExporter;
+
+/**
+ * @Author : sunhh
+ * @create 2023/3/7 16:17
+ */
+@Configuration
+public class WebSocketConfig {
+    /**
+     * 	注入ServerEndpointExporter,
+     * 	这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint
+     */
+    @Bean
+    public ServerEndpointExporter serverEndpointExporter() {
+        return new ServerEndpointExporter();
+    }
+}

+ 129 - 0
hazard-sdk/src/main/java/com/ozs/websocket/WebSocketServer.java

@@ -0,0 +1,129 @@
+package com.ozs.websocket;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+import javax.websocket.*;
+import javax.websocket.server.PathParam;
+import javax.websocket.server.ServerEndpoint;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+/**
+ * @Author : sunhh
+ * @create 2023/3/7 16:15
+ */
+@Slf4j
+@Component
+@ServerEndpoint("/websocket/{userId}")  // 接口路径 ws://localhost:8087/webSocket/userId;
+public class WebSocketServer {
+
+    //与某个客户端的连接会话,需要通过它来给客户端发送数据
+    private Session session;
+    /**
+     * 用户ID
+     */
+    private String userId;
+
+    //concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
+    //虽然@Component默认是单例模式的,但springboot还是会为每个websocket连接初始化一个bean,所以可以用一个静态set保存起来。
+    //  注:底下WebSocket是当前类名
+    private static CopyOnWriteArraySet<WebSocketServer> webSockets =new CopyOnWriteArraySet<>();
+    // 用来存在线连接用户信息
+    private static ConcurrentHashMap<String,Session> sessionPool = new ConcurrentHashMap<String,Session>();
+
+    /**
+     * 链接成功调用的方法
+     */
+    @OnOpen
+    public void onOpen(Session session, @PathParam(value="userId")String userId) {
+        try {
+            this.session = session;
+            this.userId = userId;
+            webSockets.add(this);
+            sessionPool.put(userId, session);
+            log.info("【websocket消息】有新的连接,总数为:"+webSockets.size());
+        } catch (Exception e) {
+        }
+    }
+
+    /**
+     * 链接关闭调用的方法
+     */
+    @OnClose
+    public void onClose() {
+        try {
+            webSockets.remove(this);
+            sessionPool.remove(this.userId);
+            log.info("【websocket消息】连接断开,总数为:"+webSockets.size());
+        } catch (Exception e) {
+        }
+    }
+    /**
+     * 收到客户端消息后调用的方法
+     *
+     * @param message
+     */
+    @OnMessage
+    public void onMessage(String message) {
+        log.info("【websocket消息】收到客户端消息:"+message);
+    }
+
+    /** 发送错误时的处理
+     * @param session
+     * @param error
+     */
+    @OnError
+    public void onError(Session session, Throwable error) {
+
+        log.error("用户错误,原因:"+error.getMessage());
+        error.printStackTrace();
+    }
+
+
+    // 此为广播消息
+    public void sendAllMessage(String message) {
+        log.info("【websocket消息】广播消息:"+message);
+        for(WebSocketServer webSocket : webSockets) {
+            try {
+                if(webSocket.session.isOpen()) {
+                    webSocket.session.getAsyncRemote().sendText(message);
+                }
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    // 此为单点消息
+    public void sendOneMessage(String userId, String message) {
+        Session session = sessionPool.get(userId);
+        if (session != null&&session.isOpen()) {
+            try {
+                log.info("【websocket消息】 单点消息:"+message);
+                session.getAsyncRemote().sendText(message);
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    // 此为单点消息(多人)
+    public void sendMoreMessage(String[] userIds, String message) {
+        for(String userId:userIds) {
+            Session session = sessionPool.get(userId);
+            if (session != null&&session.isOpen()) {
+                try {
+                    log.info("【websocket消息】 单点消息:"+message);
+                    session.getAsyncRemote().sendText(message);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+
+    }
+}