|
|
@@ -4,9 +4,7 @@ package com.ozs.web.core.util;
|
|
|
import com.alibaba.fastjson2.JSONObject;
|
|
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
|
|
import com.ozs.service.entity.BaseUser;
|
|
|
-import com.ozs.service.entity.MsgWebPush;
|
|
|
import com.ozs.service.entity.UserStateStatistics;
|
|
|
-import com.ozs.service.entity.vo.MsgAlarmResp;
|
|
|
import com.ozs.service.mapper.BaseUserMapper;
|
|
|
import com.ozs.service.service.UserStateStatisticsService;
|
|
|
import com.ozs.web.controller.websocket.WebSocketServer;
|
|
|
@@ -60,21 +58,48 @@ public class InsertUserStateUtil {
|
|
|
|
|
|
|
|
|
// 使用Set提高查找性能
|
|
|
- Set<String> onlineSet = new HashSet<>(onlineUser);
|
|
|
Set<String> userSet = new HashSet<>(userIdList);
|
|
|
Set<String> userAll = new HashSet<>(userIdsAll);
|
|
|
-
|
|
|
- // 计算差集和交集
|
|
|
- Set<String> offlineUsers = userSet.stream()
|
|
|
- .filter(userId -> !onlineSet.contains(userId))
|
|
|
- .collect(Collectors.toSet());
|
|
|
- Set<String> offlineUsersAll = userAll.stream()
|
|
|
- .filter(userId -> !onlineSet.contains(userId))
|
|
|
- .collect(Collectors.toSet());
|
|
|
- Set<String> onlineUsers = onlineSet.stream()
|
|
|
- .filter(userSet::contains)
|
|
|
- .collect(Collectors.toSet());
|
|
|
-
|
|
|
+ Set<String> initialOnlineSet = new HashSet<>(onlineUser);
|
|
|
+
|
|
|
+ // 重试配置
|
|
|
+ int maxRetry = 3;
|
|
|
+ long retryDelay = 3000;
|
|
|
+ int currentRetry = 0;
|
|
|
+ Set<String> offlineUsers = calculateOfflineUsers(userSet, initialOnlineSet);
|
|
|
+ // 存储最终最新的在线用户集合(默认用初始值,重试后更新)
|
|
|
+ Set<String> latestOnlineSet = new HashSet<>(initialOnlineSet);
|
|
|
+
|
|
|
+ // 差集存在时,重试拉取在线用户并重新比对
|
|
|
+ while (!offlineUsers.isEmpty() && currentRetry < maxRetry) {
|
|
|
+ try {
|
|
|
+ currentRetry++;
|
|
|
+ log.info("第{}次检测到离线用户:{},等待{}秒后重新检查在线状态",
|
|
|
+ currentRetry, offlineUsers, retryDelay / 1000);
|
|
|
+
|
|
|
+ Thread.sleep(retryDelay);
|
|
|
+
|
|
|
+ // 重新获取最新在线用户列表并更新到 latestOnlineSet
|
|
|
+ List<String> latestOnlineUser = webSocketServer.getOnlineUser();
|
|
|
+ latestOnlineSet = new HashSet<>(latestOnlineUser);
|
|
|
+
|
|
|
+ // 重新计算差集(基于最新在线状态)
|
|
|
+ offlineUsers = calculateOfflineUsers(userSet, latestOnlineSet);
|
|
|
+
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ log.error("重试等待时线程被中断", e);
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // 关键修改:用 latestOnlineSet(重试后最新状态)计算最终结果
|
|
|
+ // 1. 所有需推送用户中的离线用户(userAll - 最新在线)
|
|
|
+ Set<String> offlineUsersAll = calculateOfflineUsers(userAll, latestOnlineSet);
|
|
|
+ // 2. 工务段用户中的在线用户(工务段用户 ∩ 最新在线)
|
|
|
+ Set<String> onlineUsers = calculateIntersection(userSet, latestOnlineSet);
|
|
|
+
|
|
|
+ // 后续可添加结果日志打印
|
|
|
+ log.info("最终工务段在线用户:{},所有需推送用户中的离线用户:{}", onlineUsers, offlineUsersAll);
|
|
|
// 批量查询用户信息,减少数据库查询
|
|
|
Map<String, BaseUser> userMap = baseUserMapper.getUsersByIds(
|
|
|
Stream.concat(offlineUsers.stream(), onlineUsers.stream())
|
|
|
@@ -177,7 +202,23 @@ public class InsertUserStateUtil {
|
|
|
service.saveBatch(toSave);
|
|
|
}
|
|
|
}
|
|
|
+ /**
|
|
|
+ * 计算差集:targetSet - onlineSet(目标集合中不在在线集合的元素)
|
|
|
+ */
|
|
|
+ private Set<String> calculateOfflineUsers(Set<String> targetSet, Set<String> onlineSet) {
|
|
|
+ return targetSet.stream()
|
|
|
+ .filter(userId -> !onlineSet.contains(userId))
|
|
|
+ .collect(Collectors.toSet());
|
|
|
+ }
|
|
|
|
|
|
+ /**
|
|
|
+ * 计算交集:targetSet ∩ onlineSet(同时存在于两个集合的元素)
|
|
|
+ */
|
|
|
+ private Set<String> calculateIntersection(Set<String> targetSet, Set<String> onlineSet) {
|
|
|
+ return targetSet.stream()
|
|
|
+ .filter(onlineSet::contains)
|
|
|
+ .collect(Collectors.toSet());
|
|
|
+ }
|
|
|
private String objStr(Integer count) {
|
|
|
JSONObject obj = new JSONObject();
|
|
|
obj.put("data", count);
|