Browse Source

webSocket 消息推送

sunhh 2 years ago
parent
commit
1ad31f7877

+ 5 - 0
hazard-admin/pom.xml

@@ -18,6 +18,11 @@
     </description>
 
     <dependencies>
+        <!-- webSocket web端 消息推送 -->
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-websocket</artifactId>
+        </dependency>
 
         <!-- spring-boot-devtools -->
         <dependency>

+ 23 - 2
hazard-admin/src/main/java/com/ozs/web/controller/accountmanagment/MsgAlarmController.java

@@ -5,6 +5,7 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
 import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+import com.ozs.HazardApplication;
 import com.ozs.common.core.controller.BaseController;
 import com.ozs.common.core.domain.AjaxResult;
 import com.ozs.common.utils.StringUtils;
@@ -14,8 +15,10 @@ import com.ozs.service.entity.vo.HistoricalAlarmVo;
 import com.ozs.service.entity.vo.MsgAlarmVo;
 import com.ozs.service.service.MsgAlarmFrequencyService;
 import com.ozs.service.service.MsgAlarmService;
+import com.ozs.web.core.config.WebSocketService;
 import io.swagger.annotations.ApiOperation;
 import org.springframework.beans.BeanUtils;
+import org.springframework.boot.SpringApplication;
 import org.springframework.util.ObjectUtils;
 import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.PathVariable;
@@ -47,6 +50,24 @@ public class MsgAlarmController extends BaseController {
     @Resource
     MsgAlarmFrequencyService msgAlarmFrequencyService;
 
+    /**
+     * 测试消息推送
+     */
+    @PostMapping("/push")
+    @ApiOperation(value = "预报警信息分页")
+    public AjaxResult insertDate(String[] args) {
+        // 模拟需要推送的用户群
+        ArrayList<String> ids = new ArrayList<>();
+        ids.add("001");
+        ids.add("002");
+        ids.add("003");
+        ids.add("004");
+        ids.add("005");
+
+        WebSocketService webSocketService = new WebSocketService();
+        webSocketService.sendMsgToUsers(ids);
+        return AjaxResult.success("推送成功");
+    }
 
     /**
      * 报警信息分页
@@ -111,7 +132,7 @@ public class MsgAlarmController extends BaseController {
     @ApiOperation("报警消息详情历史报警次数信息")
     public AjaxResult alarmHistoricalAlarm(@PathVariable Long alarmId) {
         QueryWrapper<MsgAlarmFrequency> wrapper = new QueryWrapper<>();
-        wrapper.eq("alarm_id",alarmId);
+        wrapper.eq("alarm_id", alarmId);
         wrapper.orderByDesc("alarm_time");
         List<MsgAlarmFrequency> list = msgAlarmFrequencyService.list(wrapper);
         return AjaxResult.success(list);
@@ -121,7 +142,7 @@ public class MsgAlarmController extends BaseController {
     @ApiOperation("视频服务历史报警信息")
     public AjaxResult videoHistoricalAlarm(@PathVariable Long alarmCamera) {
         QueryWrapper<MsgAlarm> wrapper = new QueryWrapper<>();
-        wrapper.eq("alarm_camera",alarmCamera);
+        wrapper.eq("alarm_camera", alarmCamera);
         wrapper.orderByDesc("alarm_time");
         List<MsgAlarm> list = msgAlarmService.list(wrapper);
         return AjaxResult.success(list);

+ 113 - 0
hazard-admin/src/main/java/com/ozs/web/core/config/Server4WebSocket.java

@@ -0,0 +1,113 @@
+package com.ozs.web.core.config;
+
+import com.alibaba.fastjson2.JSONObject;
+import org.springframework.stereotype.Component;
+
+import javax.websocket.*;
+import javax.websocket.server.PathParam;
+import javax.websocket.server.ServerEndpoint;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * @Author : sunhh
+ * @create 2023/3/2 14:57
+ */
+@Component
+@ServerEndpoint(value = "/serverForWebSocket/{userId}")
+public class Server4WebSocket {
+    // 存储登录用户的 sid 以及 session
+    private static ConcurrentHashMap<String, Session> connections = new ConcurrentHashMap<>(32);
+    // 是WebSocket的Session
+    private Session session;
+    // 统计在线人数
+    private static AtomicInteger onlineCount = new AtomicInteger();
+
+    @OnOpen //事件 --登录的人.//当你登录之后建立连接,此方法便会执行
+    public void onopen(@PathParam("userId") String userId, Session session) {
+        this.session = session;
+        System.out.println("seesionId为" + session.getId());
+        if (connections.containsKey(userId)) {
+            connections.remove(userId);
+            connections.put(userId, session);
+        } else {
+            onlineCount.incrementAndGet();
+            connections.put(userId, session);
+            System.out.println("用户:" + userId + "-" + session.getId() + "上线了-" + session);
+            System.out.println("在线人数:" + onlineCount);
+        }
+        String content = new JSONObject(connections).toString();
+        System.out.println("在线用户信息:" + content);
+    }
+
+    @OnClose
+    public void onClose(Session session) {
+        for (String userId : connections.keySet()) {
+            if (connections.get(userId).equals(session)) {
+                System.out.println("用户:" + userId + "-关闭-" + session);
+                connections.remove(session);
+                onlineCount.decrementAndGet(); // 在线数减1
+            }
+        }
+    }
+
+    @OnMessage
+    public void onMessage(String msg, Session session) {
+        System.out.println("服务端收到客户端" + session.getId() + "的消息:" + msg);
+        // 客户端向服务端发送消息,然后再推送给其他的用户,可以在这里进行设置
+        try {
+            sendMessage(msg, session);
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    @OnError
+    public void onError(Session session, Throwable error) {
+        System.out.println("发生错误");
+        error.printStackTrace();
+    }
+
+    //推送
+    public void sendMessage(String msg, Session session) throws IOException {
+//        for (Session session1 : webSocketSet) {
+//            if (session != session1)
+//                session1.getBasicRemote().sendText(msg);
+//        }
+        System.out.println("推送:" + msg);
+        session.getBasicRemote().sendText(msg);
+    }
+
+    // 推送给指定的用户群
+    public void sendMsgToUsers(List<String> ids) {
+        ids.stream().forEach(s -> {
+            System.out.println("用户:" + s + "是否能够推送:" + connections.containsKey(s));
+            if (connections.containsKey(s)) {
+                if (connections.get(s).isOpen()) {
+                    try {
+                        System.out.println("开始推送");
+                        sendMessage("hello:" + s, connections.get(s));
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            }
+        });
+    }
+
+    // 推送给全部在线用户
+    public void sendMsgToAll() {
+        connections.keySet().stream().forEach(s -> {
+            if (connections.get(s).isOpen()) {
+                try {
+                    System.out.println("开始推送");
+                    sendMessage("hello:" + s, connections.get(s));
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        });
+    }
+}

+ 21 - 0
hazard-admin/src/main/java/com/ozs/web/core/config/WebSocketService.java

@@ -0,0 +1,21 @@
+package com.ozs.web.core.config;
+
+import java.util.List;
+
+/**
+ * @Author : sunhh
+ * @create 2023/3/2 15:01
+ */
+public class WebSocketService {
+    // 推送给指定的在线的用户
+    public void sendMsgToUsers(List<String> ids) {
+        Server4WebSocket server4WebSocket = new Server4WebSocket();
+        server4WebSocket.sendMsgToUsers(ids);
+    }
+
+    // 推送给所有用户
+    public void sendMsgToAll() {
+        Server4WebSocket server4WebSocket = new Server4WebSocket();
+        server4WebSocket.sendMsgToAll();
+    }
+}

+ 18 - 0
hazard-admin/src/main/java/com/ozs/web/core/config/WebsocketConfig.java

@@ -0,0 +1,18 @@
+package com.ozs.web.core.config;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.web.socket.server.standard.ServerEndpointExporter;
+
+/**
+ * @Author : sunhh
+ * @create 2023/3/2 10:41
+ */
+
+@Configuration
+public class WebsocketConfig {
+    @Bean
+    public ServerEndpointExporter serverEndpointExporter() {
+        return new ServerEndpointExporter();
+    }
+}