Sfoglia il codice sorgente

Merge branch 'master' of http://124.70.58.209:3000/ytrd-project-management/GeoHazardMonitor

wyyay 2 anni fa
parent
commit
20e79f6cbe

+ 5 - 4
hazard-admin/src/main/java/com/ozs/web/controller/accountmanagment/MsgAlarmController.java

@@ -103,7 +103,7 @@ public class MsgAlarmController extends BaseController {
     @ApiOperation(value = "报警信息分页")
     @PostMapping("/list")
     public AjaxResult selectMsgAlarm(@RequestBody MsgAlarmVo msgAlarmVo) {
-        BaseCameraManagement baseCameraManagement=new BaseCameraManagement();
+        List<BaseCameraManagement> baseCameraManagementList=new ArrayList<>();
         LambdaQueryWrapper<BaseCameraManagement> wrapper = new LambdaQueryWrapper<BaseCameraManagement>();
         if (!ObjectUtils.isEmpty(msgAlarmVo.getRailwayCode())) {
             wrapper.eq(BaseCameraManagement::getRailwayCode, msgAlarmVo.getRailwayCode());
@@ -115,14 +115,15 @@ public class MsgAlarmController extends BaseController {
             wrapper.le(BaseCameraManagement::getEndMile, msgAlarmVo.getEndMile());
         }
         if (!ObjectUtils.isEmpty(msgAlarmVo.getRailwayCode())||!ObjectUtils.isEmpty(msgAlarmVo.getBeginMile())||!ObjectUtils.isEmpty(msgAlarmVo.getEndMile())) {
-            baseCameraManagement = baseCameraManagementService.getOne(wrapper);
+            baseCameraManagementList = baseCameraManagementService.list(wrapper);
         }
         LambdaQueryWrapper<MsgAlarm> lw = new LambdaQueryWrapper<MsgAlarm>();
         if (!ObjectUtils.isEmpty(msgAlarmVo.getLineDir())) {
             lw.eq(MsgAlarm::getLineDir, msgAlarmVo.getLineDir());
         }
-        if (!ObjectUtils.isEmpty(baseCameraManagement.getCameraCode())) {
-            lw.eq(MsgAlarm::getCameraCode, baseCameraManagement.getCameraCode());
+        if (baseCameraManagementList.size()>0) {
+            List<String> collect = baseCameraManagementList.stream().map(BaseCameraManagement::getCameraCode).collect(Collectors.toList());
+            lw.in(MsgAlarm::getCameraCode, collect);
         }
         if (!ObjectUtils.isEmpty(msgAlarmVo.getAlarmType())) {
             lw.eq(MsgAlarm::getAlarmType, msgAlarmVo.getAlarmType());

+ 21 - 21
hazard-admin/src/main/java/com/ozs/web/controller/websocket/WebSocketConfig.java

@@ -1,21 +1,21 @@
-package com.ozs.web.controller.websocket;
-
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.web.socket.server.standard.ServerEndpointExporter;
-
-/**
- * @Author : sunhh
- * @create 2023/3/7 16:17
- */
-@Configuration
-public class WebSocketConfig {
-    /**
-     * 	注入ServerEndpointExporter,
-     * 	这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint
-     */
-    @Bean
-    public ServerEndpointExporter serverEndpointExporter() {
-        return new ServerEndpointExporter();
-    }
-}
+//package com.ozs.web.controller.websocket;
+//
+//import org.springframework.context.annotation.Bean;
+//import org.springframework.context.annotation.Configuration;
+//import org.springframework.web.socket.server.standard.ServerEndpointExporter;
+//
+///**
+// * @Author : sunhh
+// * @create 2023/3/7 16:17
+// */
+//@Configuration
+//public class WebSocketConfig {
+//    /**
+//     * 	注入ServerEndpointExporter,
+//     * 	这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint
+//     */
+//    @Bean
+//    public ServerEndpointExporter serverEndpointExporter() {
+//        return new ServerEndpointExporter();
+//    }
+//}

+ 129 - 129
hazard-admin/src/main/java/com/ozs/web/controller/websocket/WebSocketServer.java

@@ -1,129 +1,129 @@
-package com.ozs.web.controller.websocket;
-
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.stereotype.Component;
-
-import javax.websocket.*;
-import javax.websocket.server.PathParam;
-import javax.websocket.server.ServerEndpoint;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArraySet;
-
-/**
- * @Author : sunhh
- * @create 2023/3/7 16:15
- */
-@Slf4j
-@Component
-@ServerEndpoint("/websocket/{userId}")  // 接口路径 ws://localhost:8087/webSocket/userId;
-public class WebSocketServer {
-
-    //与某个客户端的连接会话,需要通过它来给客户端发送数据
-    private Session session;
-    /**
-     * 用户ID
-     */
-    private String userId;
-
-    //concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
-    //虽然@Component默认是单例模式的,但springboot还是会为每个websocket连接初始化一个bean,所以可以用一个静态set保存起来。
-    //  注:底下WebSocket是当前类名
-    private static CopyOnWriteArraySet<WebSocketServer> webSockets =new CopyOnWriteArraySet<>();
-    // 用来存在线连接用户信息
-    private static ConcurrentHashMap<String,Session> sessionPool = new ConcurrentHashMap<String,Session>();
-
-    /**
-     * 链接成功调用的方法
-     */
-    @OnOpen
-    public void onOpen(Session session, @PathParam(value="userId")String userId) {
-        try {
-            this.session = session;
-            this.userId = userId;
-            webSockets.add(this);
-            sessionPool.put(userId, session);
-            log.info("【websocket消息】有新的连接,总数为:"+webSockets.size());
-        } catch (Exception e) {
-        }
-    }
-
-    /**
-     * 链接关闭调用的方法
-     */
-    @OnClose
-    public void onClose() {
-        try {
-            webSockets.remove(this);
-            sessionPool.remove(this.userId);
-            log.info("【websocket消息】连接断开,总数为:"+webSockets.size());
-        } catch (Exception e) {
-        }
-    }
-    /**
-     * 收到客户端消息后调用的方法
-     *
-     * @param message
-     */
-    @OnMessage
-    public void onMessage(String message) {
-        log.info("【websocket消息】收到客户端消息:"+message);
-    }
-
-    /** 发送错误时的处理
-     * @param session
-     * @param error
-     */
-    @OnError
-    public void onError(Session session, Throwable error) {
-
-        log.error("用户错误,原因:"+error.getMessage());
-        error.printStackTrace();
-    }
-
-
-    // 此为广播消息
-    public void sendAllMessage(String message) {
-        log.info("【websocket消息】广播消息:"+message);
-        for(WebSocketServer webSocket : webSockets) {
-            try {
-                if(webSocket.session.isOpen()) {
-                    webSocket.session.getAsyncRemote().sendText(message);
-                }
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-        }
-    }
-
-    // 此为单点消息
-    public void sendOneMessage(String userId, String message) {
-        Session session = sessionPool.get(userId);
-        if (session != null&&session.isOpen()) {
-            try {
-                log.info("【websocket消息】 单点消息:"+message);
-                session.getAsyncRemote().sendText(message);
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-        }
-    }
-
-    // 此为单点消息(多人)
-    public void sendMoreMessage(String[] userIds, String message) {
-        for(String userId:userIds) {
-            Session session = sessionPool.get(userId);
-            if (session != null&&session.isOpen()) {
-                try {
-                    log.info("【websocket消息】 单点消息:"+message);
-                    session.getAsyncRemote().sendText(message);
-                } catch (Exception e) {
-                    e.printStackTrace();
-                }
-            }
-        }
-
-    }
-}
+//package com.ozs.web.controller.websocket;
+//
+//import lombok.extern.slf4j.Slf4j;
+//import org.springframework.stereotype.Component;
+//
+//import javax.websocket.*;
+//import javax.websocket.server.PathParam;
+//import javax.websocket.server.ServerEndpoint;
+//import java.util.HashMap;
+//import java.util.List;
+//import java.util.Map;
+//import java.util.concurrent.ConcurrentHashMap;
+//import java.util.concurrent.CopyOnWriteArraySet;
+//
+///**
+// * @Author : sunhh
+// * @create 2023/3/7 16:15
+// */
+//@Slf4j
+//@Component
+//@ServerEndpoint("/websocket/{userId}")  // 接口路径 ws://localhost:8087/webSocket/userId;
+//public class WebSocketServer {
+//
+//    //与某个客户端的连接会话,需要通过它来给客户端发送数据
+//    private Session session;
+//    /**
+//     * 用户ID
+//     */
+//    private String userId;
+//
+//    //concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
+//    //虽然@Component默认是单例模式的,但springboot还是会为每个websocket连接初始化一个bean,所以可以用一个静态set保存起来。
+//    //  注:底下WebSocket是当前类名
+//    private static CopyOnWriteArraySet<WebSocketServer> webSockets =new CopyOnWriteArraySet<>();
+//    // 用来存在线连接用户信息
+//    private static ConcurrentHashMap<String,Session> sessionPool = new ConcurrentHashMap<String,Session>();
+//
+//    /**
+//     * 链接成功调用的方法
+//     */
+//    @OnOpen
+//    public void onOpen(Session session, @PathParam(value="userId")String userId) {
+//        try {
+//            this.session = session;
+//            this.userId = userId;
+//            webSockets.add(this);
+//            sessionPool.put(userId, session);
+//            log.info("【websocket消息】有新的连接,总数为:"+webSockets.size());
+//        } catch (Exception e) {
+//        }
+//    }
+//
+//    /**
+//     * 链接关闭调用的方法
+//     */
+//    @OnClose
+//    public void onClose() {
+//        try {
+//            webSockets.remove(this);
+//            sessionPool.remove(this.userId);
+//            log.info("【websocket消息】连接断开,总数为:"+webSockets.size());
+//        } catch (Exception e) {
+//        }
+//    }
+//    /**
+//     * 收到客户端消息后调用的方法
+//     *
+//     * @param message
+//     */
+//    @OnMessage
+//    public void onMessage(String message) {
+//        log.info("【websocket消息】收到客户端消息:"+message);
+//    }
+//
+//    /** 发送错误时的处理
+//     * @param session
+//     * @param error
+//     */
+//    @OnError
+//    public void onError(Session session, Throwable error) {
+//
+//        log.error("用户错误,原因:"+error.getMessage());
+//        error.printStackTrace();
+//    }
+//
+//
+//    // 此为广播消息
+//    public void sendAllMessage(String message) {
+//        log.info("【websocket消息】广播消息:"+message);
+//        for(WebSocketServer webSocket : webSockets) {
+//            try {
+//                if(webSocket.session.isOpen()) {
+//                    webSocket.session.getAsyncRemote().sendText(message);
+//                }
+//            } catch (Exception e) {
+//                e.printStackTrace();
+//            }
+//        }
+//    }
+//
+//    // 此为单点消息
+//    public void sendOneMessage(String userId, String message) {
+//        Session session = sessionPool.get(userId);
+//        if (session != null&&session.isOpen()) {
+//            try {
+//                log.info("【websocket消息】 单点消息:"+message);
+//                session.getAsyncRemote().sendText(message);
+//            } catch (Exception e) {
+//                e.printStackTrace();
+//            }
+//        }
+//    }
+//
+//    // 此为单点消息(多人)
+//    public void sendMoreMessage(String[] userIds, String message) {
+//        for(String userId:userIds) {
+//            Session session = sessionPool.get(userId);
+//            if (session != null&&session.isOpen()) {
+//                try {
+//                    log.info("【websocket消息】 单点消息:"+message);
+//                    session.getAsyncRemote().sendText(message);
+//                } catch (Exception e) {
+//                    e.printStackTrace();
+//                }
+//            }
+//        }
+//
+//    }
+//}

+ 82 - 87
hazard-admin/src/main/java/com/ozs/web/core/util/CameraUtil.java

@@ -29,7 +29,7 @@ public class CameraUtil {
 
     private static String url;
     private static String historyUrl;
-    private static String ffmpegPath = "C:\\Users\\Administrator.DESKTOP-0NUUTMM\\Downloads\\ffmpeg-5.1.2-essentials_build\\ffmpeg-5.1.2-essentials_build\\bin\\ffmpeg.exe";
+    private static String ffmpegPath;
     private static String mappingUrl;
     private static String flvPath;
     private static String filePath;
@@ -168,99 +168,94 @@ public class CameraUtil {
      */
     public static void convetor(List<String> fromVideoFileList,
                                 String NewfilePath) throws IOException {
+        List<String> voidTS = new ArrayList<>();
+        Process process = null;
+        ProcessBuilder builder = null;
+        List<String> command = null;
 
-
-        try {
-            List<String> voidTS = new ArrayList<>();
-            Process process = null;
-            ProcessBuilder builder = null;
-            List<String> command = null;
-            for (int i = 0; i < fromVideoFileList.size(); i++) {
-                String fromVideoFile = fromVideoFileList.get(i);
-                command = new ArrayList<String>();
-                command.add(ffmpegPath);
-                command.add("-y");
-                command.add("-i");
-                command.add(fromVideoFile);
-                command.add("-vcodec");
-                command.add("copy");
-                command.add("-bsf:v");
-                command.add("h264_mp4toannexb");
-                command.add("-f");
-                command.add("mpegts");
-                command.add(fromVideoFile.substring(0,
-                        fromVideoFile.lastIndexOf(".")) + ".ts");
-                builder = new ProcessBuilder(command);
-                voidTS.add(fromVideoFile.substring(0,
-                        fromVideoFile.lastIndexOf("."))
-                        + ".ts");
-                try {
-                    process = builder.start();
-                    InputStream errorStream = process
-                            .getErrorStream();
-                    InputStreamReader inputStreamReader = new InputStreamReader(
-                            errorStream);
-                    BufferedReader br = new BufferedReader(
-                            inputStreamReader);
-                    String line = "";
-                    StringBuffer sb = new StringBuffer();
-                    while ((line = br.readLine()) != null) {
-                        sb.append(line);
-                    }
-                    String regexDuration = "Duration: (.*?), start: (.*?), bitrate: (\\d*) kb\\/s";
-                    Pattern pattern = Pattern
-                            .compile(regexDuration);
-                    Matcher m = pattern.matcher(sb.toString());
-                    System.out.println(sb.toString());
-                    br.close();
-                    inputStreamReader.close();
-                    errorStream.close();
-                } catch (IOException e) {
-                    e.printStackTrace();
-                }
-            }
-            List<String> dos = new ArrayList<>();
-            StringBuffer tsPath = new StringBuffer();
-            tsPath.append(ffmpegPath);
-            tsPath.append(" -i ");
-            tsPath.append("concat:");
-            for (int t = 0; t < voidTS.size(); t++) {
-                if (t != voidTS.size() - 1) {
-                    tsPath.append(voidTS.get(t) + "|");
-                } else {
-                    tsPath.append(voidTS.get(t));
-                }
-            }
-            tsPath.append(" -vcodec ");
-            tsPath.append(" copy ");
-            tsPath.append(" -bsf:a ");
-            tsPath.append(" aac_adtstoasc ");
-            tsPath.append(" -movflags ");
-            tsPath.append(" +faststart ");
-            tsPath.append(NewfilePath);
-            Process pr = Runtime.getRuntime().exec(
-                    tsPath.toString());
-            process.getInputStream();
-            pr.getOutputStream().close();
-            pr.getInputStream().close();
-            pr.getErrorStream().close();
+        for (int i = 0; i < fromVideoFileList.size(); i++) {
+            String fromVideoFile = fromVideoFileList.get(i);
+            command = new ArrayList<String>();
+            command.add(ffmpegPath);
+            command.add("-y");
+            command.add("-i");
+            command.add(fromVideoFile);
+            command.add("-vcodec");
+            command.add("copy");
+            command.add("-bsf:v");
+            command.add("h264_mp4toannexb");
+            command.add("-f");
+            command.add("mpegts");
+            fromVideoFile = fromVideoFile.substring(fromVideoFile.lastIndexOf("/") + 1, fromVideoFile.length());
+            String n = flvPath + fromVideoFile.substring(0,
+                    fromVideoFile.lastIndexOf("."));
+            command.add(n + ".ts");
+            builder = new ProcessBuilder(command);
+            voidTS.add(n + ".ts");
             try {
-                pr.waitFor();
-                Thread.sleep(1000);
-                pr.destroy();
-            } catch (InterruptedException e) {
-                // TODO Auto-generated catch block
+                process = builder.start();
+                InputStream errorStream = process
+                        .getErrorStream();
+                InputStreamReader inputStreamReader = new InputStreamReader(
+                        errorStream);
+                BufferedReader br = new BufferedReader(
+                        inputStreamReader);
+                String line = "";
+                StringBuffer sb = new StringBuffer();
+                while ((line = br.readLine()) != null) {
+                    sb.append(line);
+                }
+                String regexDuration = "Duration: (.*?), start: (.*?), bitrate: (\\d*) kb\\/s";
+                Pattern pattern = Pattern
+                        .compile(regexDuration);
+                Matcher m = pattern.matcher(sb.toString());
+                System.out.println(sb.toString());
+                br.close();
+                inputStreamReader.close();
+                errorStream.close();
+            } catch (IOException e) {
                 e.printStackTrace();
             }
-            //删除生成的ts文件
-            for (String filePath : voidTS) {
-                File file = new File(filePath);
-                file.delete();
-                pr.destroy();
+        }
+        List<String> dos = new ArrayList<>();
+        StringBuffer tsPath = new StringBuffer();
+        tsPath.append(ffmpegPath);
+        tsPath.append(" -i ");
+        tsPath.append("concat:");
+        for (int t = 0; t < voidTS.size(); t++) {
+            if (t != voidTS.size() - 1) {
+                tsPath.append(voidTS.get(t) + "|");
+            } else {
+                tsPath.append(voidTS.get(t));
             }
-        } catch (Exception e) {
+        }
+        tsPath.append(" -vcodec ");
+        tsPath.append(" copy ");
+        tsPath.append(" -bsf:a ");
+        tsPath.append(" aac_adtstoasc ");
+        tsPath.append(" -movflags ");
+        tsPath.append(" +faststart ");
+        tsPath.append(NewfilePath);
+        Process pr = Runtime.getRuntime().exec(
+                tsPath.toString());
+        process.getInputStream();
+        pr.getOutputStream().close();
+        pr.getInputStream().close();
+        pr.getErrorStream().close();
+        try {
+            pr.waitFor();
+            Thread.sleep(1000);
+            pr.destroy();
+        } catch (InterruptedException e) {
+            // TODO Auto-generated catch block
             e.printStackTrace();
         }
+        //删除生成的ts文件
+        for (String filePath : voidTS) {
+            File file = new File(filePath);
+            file.delete();
+            pr.destroy();
+        }
     }
 
     public static void main(String[] args) throws IOException {

+ 17 - 0
hazard-sdk/pom.xml

@@ -14,6 +14,23 @@
     <artifactId>hazard-sdk</artifactId>
 
     <dependencies>
+        <!-- webSocket web端 消息推送 -->
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-thymeleaf</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-websocket</artifactId>
+        </dependency>
+
+        <!--lombok -->
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <version>1.16.18</version>
+        </dependency>
 
         <!-- SpringBoot Web容器 -->
         <dependency>

+ 19 - 2
hazard-sdk/src/main/java/com/ozs/controller/upload/GeoHazardMonitorTokenController.java

@@ -24,6 +24,7 @@ import com.ozs.vo.ReqDeviceVo;
 import com.ozs.vo.ReqMsgAlarmVo;
 import com.ozs.vo.RespGeoHazardMonitorVo;
 import com.ozs.vo.RespMsgAlarmVo;
+import com.ozs.websocket.WebSocketServer;
 import io.lettuce.core.ScriptOutputType;
 import io.swagger.models.auth.In;
 import lombok.extern.slf4j.Slf4j;
@@ -69,7 +70,8 @@ public class GeoHazardMonitorTokenController {
     BaseCameraManagementService baseCameraManagementService;
 //    @Autowired
 //    private ISysUserService userService;
-
+    @Autowired
+    private WebSocketServer webSocketServer;
 
     /**
      * 获取web访问令牌
@@ -131,6 +133,21 @@ public class GeoHazardMonitorTokenController {
         if (StringUtils.isNotEmpty(token)) {
             String s = SM4Utils.decryptData_ECB(parameter, "4370780c9a8c43e5");
             ReqMsgAlarmVo reqMsgAlarmVo = JSON.parseObject(s, ReqMsgAlarmVo.class);
+            // 推送消息reqMsgAlarmVo
+            // 初始化 -> 运行中 -> 成功或失败
+//            try {
+//                // 1、初始化
+//                webSocketServer.sendMoreMessage(ids,objStr(pipelineId,1));
+//                Thread.sleep(3 * 1000);
+//                // 2、运行中
+//                webSocketServer.sendMoreMessage(ids,objStr(pipelineId,2));
+//                Thread.sleep(5 * 1000);
+//                // 3、成功或失败
+//                webSocketServer.sendMoreMessage(ids,Math.random() * 10 > 5 ? objStr(pipelineId,6) : objStr(pipelineId,7));
+//            } catch (InterruptedException e) {
+//                e.printStackTrace();
+//                throw new RuntimeException("构建失败");
+//            }
             log.info("reqMsgAlarmVo:{}", reqMsgAlarmVo);
             RespMsgAlarmVo respMsgAlarmVo = new RespMsgAlarmVo();
             LambdaQueryWrapper<BaseCameraManagement> wrapper = new LambdaQueryWrapper<BaseCameraManagement>();
@@ -223,7 +240,7 @@ public class GeoHazardMonitorTokenController {
                 jsonObject.put("message", "信息编码为空!");
                 return SM4Utils.encryptData_ECB(JSONObject.toJSONString(jsonObject), "4370780c9a8c43e5");
             }
-//            SysUser sysUser = userService.selectUserById(1L);
+            //SysUser sysUser = userService.selectUserById(1L);
             lw.eq(BaseDeviceDynamicManagement::getCameraCode, reqDeviceVo.getCameraCode());
             BaseDeviceDynamicManagement baseDynamicManagement = baseDeviceDynamicManagementService.getOne(lw);
             baseDynamicManagement.setElectricity(reqDeviceVo.getElectricity());

+ 21 - 0
hazard-sdk/src/main/java/com/ozs/websocket/WebSocketConfig.java

@@ -0,0 +1,21 @@
+package com.ozs.websocket;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.web.socket.server.standard.ServerEndpointExporter;
+
+/**
+ * @Author : sunhh
+ * @create 2023/3/7 16:17
+ */
+@Configuration
+public class WebSocketConfig {
+    /**
+     * 	注入ServerEndpointExporter,
+     * 	这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint
+     */
+    @Bean
+    public ServerEndpointExporter serverEndpointExporter() {
+        return new ServerEndpointExporter();
+    }
+}

+ 129 - 0
hazard-sdk/src/main/java/com/ozs/websocket/WebSocketServer.java

@@ -0,0 +1,129 @@
+package com.ozs.websocket;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+import javax.websocket.*;
+import javax.websocket.server.PathParam;
+import javax.websocket.server.ServerEndpoint;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+/**
+ * @Author : sunhh
+ * @create 2023/3/7 16:15
+ */
+@Slf4j
+@Component
+@ServerEndpoint("/websocket/{userId}")  // 接口路径 ws://localhost:8087/webSocket/userId;
+public class WebSocketServer {
+
+    //与某个客户端的连接会话,需要通过它来给客户端发送数据
+    private Session session;
+    /**
+     * 用户ID
+     */
+    private String userId;
+
+    //concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
+    //虽然@Component默认是单例模式的,但springboot还是会为每个websocket连接初始化一个bean,所以可以用一个静态set保存起来。
+    //  注:底下WebSocket是当前类名
+    private static CopyOnWriteArraySet<WebSocketServer> webSockets =new CopyOnWriteArraySet<>();
+    // 用来存在线连接用户信息
+    private static ConcurrentHashMap<String,Session> sessionPool = new ConcurrentHashMap<String,Session>();
+
+    /**
+     * 链接成功调用的方法
+     */
+    @OnOpen
+    public void onOpen(Session session, @PathParam(value="userId")String userId) {
+        try {
+            this.session = session;
+            this.userId = userId;
+            webSockets.add(this);
+            sessionPool.put(userId, session);
+            log.info("【websocket消息】有新的连接,总数为:"+webSockets.size());
+        } catch (Exception e) {
+        }
+    }
+
+    /**
+     * 链接关闭调用的方法
+     */
+    @OnClose
+    public void onClose() {
+        try {
+            webSockets.remove(this);
+            sessionPool.remove(this.userId);
+            log.info("【websocket消息】连接断开,总数为:"+webSockets.size());
+        } catch (Exception e) {
+        }
+    }
+    /**
+     * 收到客户端消息后调用的方法
+     *
+     * @param message
+     */
+    @OnMessage
+    public void onMessage(String message) {
+        log.info("【websocket消息】收到客户端消息:"+message);
+    }
+
+    /** 发送错误时的处理
+     * @param session
+     * @param error
+     */
+    @OnError
+    public void onError(Session session, Throwable error) {
+
+        log.error("用户错误,原因:"+error.getMessage());
+        error.printStackTrace();
+    }
+
+
+    // 此为广播消息
+    public void sendAllMessage(String message) {
+        log.info("【websocket消息】广播消息:"+message);
+        for(WebSocketServer webSocket : webSockets) {
+            try {
+                if(webSocket.session.isOpen()) {
+                    webSocket.session.getAsyncRemote().sendText(message);
+                }
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    // 此为单点消息
+    public void sendOneMessage(String userId, String message) {
+        Session session = sessionPool.get(userId);
+        if (session != null&&session.isOpen()) {
+            try {
+                log.info("【websocket消息】 单点消息:"+message);
+                session.getAsyncRemote().sendText(message);
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    // 此为单点消息(多人)
+    public void sendMoreMessage(String[] userIds, String message) {
+        for(String userId:userIds) {
+            Session session = sessionPool.get(userId);
+            if (session != null&&session.isOpen()) {
+                try {
+                    log.info("【websocket消息】 单点消息:"+message);
+                    session.getAsyncRemote().sendText(message);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+
+    }
+}