|
|
@@ -0,0 +1,180 @@
|
|
|
+package com.ozs.web.core.util;
|
|
|
+
|
|
|
+
|
|
|
+import com.alibaba.fastjson2.JSONObject;
|
|
|
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
|
|
+import com.ozs.service.entity.BaseUser;
|
|
|
+import com.ozs.service.entity.MsgWebPush;
|
|
|
+import com.ozs.service.entity.UserStateStatistics;
|
|
|
+import com.ozs.service.entity.vo.MsgAlarmResp;
|
|
|
+import com.ozs.service.mapper.BaseUserMapper;
|
|
|
+import com.ozs.service.service.UserStateStatisticsService;
|
|
|
+import com.ozs.web.controller.websocket.WebSocketServer;
|
|
|
+import org.slf4j.Logger;
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.context.annotation.Configuration;
|
|
|
+import org.springframework.scheduling.annotation.Async;
|
|
|
+import org.springframework.scheduling.annotation.EnableAsync;
|
|
|
+import org.springframework.scheduling.annotation.EnableScheduling;
|
|
|
+import org.springframework.scheduling.annotation.Scheduled;
|
|
|
+import org.springframework.transaction.annotation.Transactional;
|
|
|
+import org.springframework.util.ObjectUtils;
|
|
|
+
|
|
|
+import java.util.*;
|
|
|
+import java.util.function.Function;
|
|
|
+import java.util.stream.Collectors;
|
|
|
+import java.util.stream.Stream;
|
|
|
+
|
|
|
+
|
|
|
+@Configuration
|
|
|
+@EnableScheduling
|
|
|
+@EnableAsync
|
|
|
+public class InsertUserStateUtil {
|
|
|
+
|
|
|
+ private static final Logger log = LoggerFactory.getLogger(InsertUserStateUtil.class);
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private WebSocketServer webSocketServer;
|
|
|
+ @Autowired
|
|
|
+ private BaseUserMapper baseUserMapper;
|
|
|
+ @Autowired
|
|
|
+ private UserStateStatisticsService userStateStatisticsService;
|
|
|
+
|
|
|
+ @Async
|
|
|
+ @Scheduled(cron = "0 */5 * * * ?")
|
|
|
+ @Transactional
|
|
|
+ public void tableCreationAndDelete() {
|
|
|
+ // 获取在线用户和工务段用户
|
|
|
+ List<String> onlineUser = webSocketServer.getOnlineUser();
|
|
|
+ List<String> userIdList = baseUserMapper.getUserIdsLists();
|
|
|
+ //需要推送信息的用户
|
|
|
+ List<String> userIdsAll = baseUserMapper.getUserIdsAll();
|
|
|
+ //获取属于工段用户的用户ID
|
|
|
+ List<String> userLists =baseUserMapper.getUserLists();
|
|
|
+
|
|
|
+
|
|
|
+ // 使用Set提高查找性能
|
|
|
+ Set<String> onlineSet = new HashSet<>(onlineUser);
|
|
|
+ Set<String> userSet = new HashSet<>(userIdList);
|
|
|
+ Set<String> userAll = new HashSet<>(userIdsAll);
|
|
|
+
|
|
|
+ // 计算差集和交集
|
|
|
+ Set<String> offlineUsers = userSet.stream()
|
|
|
+ .filter(userId -> !onlineSet.contains(userId))
|
|
|
+ .collect(Collectors.toSet());
|
|
|
+ Set<String> offlineUsersAll = userAll.stream()
|
|
|
+ .filter(userId -> !onlineSet.contains(userId))
|
|
|
+ .collect(Collectors.toSet());
|
|
|
+ Set<String> onlineUsers = onlineSet.stream()
|
|
|
+ .filter(userSet::contains)
|
|
|
+ .collect(Collectors.toSet());
|
|
|
+
|
|
|
+ // 批量查询用户信息,减少数据库查询
|
|
|
+ Map<String, BaseUser> userMap = baseUserMapper.getUsersByIds(
|
|
|
+ Stream.concat(offlineUsers.stream(), onlineUsers.stream())
|
|
|
+ .collect(Collectors.toList())
|
|
|
+ ).stream().collect(Collectors.toMap(BaseUser::getUserId, user -> user));
|
|
|
+
|
|
|
+ // 批量查询最新用户状态
|
|
|
+ List<String> allUserIds = Stream.concat(offlineUsers.stream(), onlineUsers.stream())
|
|
|
+ .collect(Collectors.toList());
|
|
|
+ Map<String, UserStateStatistics> latestUserStateMap = getUserLatestStateMap(allUserIds);
|
|
|
+ log.info("用户状态 {}", latestUserStateMap);
|
|
|
+ // 处理离线用户
|
|
|
+ processUserStates(offlineUsers, userMap, latestUserStateMap, 2, userStateStatisticsService);
|
|
|
+
|
|
|
+ // 处理在线用户
|
|
|
+ processUserStates(onlineUsers, userMap, latestUserStateMap, 1, userStateStatisticsService);
|
|
|
+ // 推送信息
|
|
|
+ String data = objStr(offlineUsersAll.size());
|
|
|
+ log.info("准备推送数据: {}, 目标用户数: {}", data, userLists.size());
|
|
|
+ // 先批量检查在线状态,减少重复调用
|
|
|
+ List<String> onlineUserList = userLists.stream()
|
|
|
+ .filter(webSocketServer::userOnline)
|
|
|
+ .collect(Collectors.toList());
|
|
|
+
|
|
|
+ log.info("在线用户数: {}", onlineUserList.size());
|
|
|
+
|
|
|
+ // 批量推送
|
|
|
+ onlineUserList.parallelStream().forEach(userId -> {
|
|
|
+ try {
|
|
|
+ webSocketServer.sendOneMessage(userId, data);
|
|
|
+ log.info("推送成功 - 用户: {}", userId);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("推送消息给用户 {} 时发生异常: {}", userId, e.getMessage(), e);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ log.info("推送完成,成功推送数: {}", onlineUserList.size());
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 批量获取用户最新状态
|
|
|
+ */
|
|
|
+ private Map<String, UserStateStatistics> getUserLatestStateMap(List<String> userIds) {
|
|
|
+ if (ObjectUtils.isEmpty(userIds)) {
|
|
|
+ return new HashMap<>();
|
|
|
+ }
|
|
|
+
|
|
|
+ // 使用批量查询替代循环中的单个查询
|
|
|
+ List<UserStateStatistics> latestStates = userStateStatisticsService.list(
|
|
|
+ new LambdaQueryWrapper<UserStateStatistics>()
|
|
|
+ .in(UserStateStatistics::getUserId, userIds)
|
|
|
+ .orderByDesc(UserStateStatistics::getCreateTime)
|
|
|
+ );
|
|
|
+
|
|
|
+ // 按用户ID分组,取每个用户的最新状态
|
|
|
+ return latestStates.stream()
|
|
|
+ .collect(Collectors.toMap(
|
|
|
+ UserStateStatistics::getUserId,
|
|
|
+ Function.identity(),
|
|
|
+ (existing, replacement) -> existing.getCreateTime().after(replacement.getCreateTime()) ? existing : replacement
|
|
|
+ ));
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 处理用户状态更新
|
|
|
+ */
|
|
|
+ private void processUserStates(Set<String> userIds,
|
|
|
+ Map<String, BaseUser> userMap,
|
|
|
+ Map<String, UserStateStatistics> latestStateMap,
|
|
|
+ Integer targetState,
|
|
|
+ UserStateStatisticsService service) {
|
|
|
+ List<UserStateStatistics> toSave = new ArrayList<>();
|
|
|
+
|
|
|
+ for (String userId : userIds) {
|
|
|
+ UserStateStatistics latestState = latestStateMap.get(userId);
|
|
|
+ BaseUser user = userMap.get(userId);
|
|
|
+
|
|
|
+ if (user == null) continue;
|
|
|
+
|
|
|
+ // 判断是否需要新增记录
|
|
|
+ if (ObjectUtils.isEmpty(latestState) ||
|
|
|
+ !targetState.equals(latestState.getIsDisable())) {
|
|
|
+
|
|
|
+ UserStateStatistics newState = new UserStateStatistics();
|
|
|
+ newState.setUserId(userId);
|
|
|
+ newState.setUserName(user.getUserName());
|
|
|
+ newState.setNikeName(user.getNickName());
|
|
|
+ newState.setIsDisable(targetState);
|
|
|
+ newState.setCreateTime(new Date());
|
|
|
+
|
|
|
+ toSave.add(newState);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // 批量保存
|
|
|
+ if (!toSave.isEmpty()) {
|
|
|
+ service.saveBatch(toSave);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private String objStr(Integer count) {
|
|
|
+ JSONObject obj = new JSONObject();
|
|
|
+ obj.put("data", count);
|
|
|
+ obj.put("code", 120);
|
|
|
+ log.info("【websocket消息】 单点消息:" + obj.toJSONString());
|
|
|
+ return obj.toJSONString();
|
|
|
+ }
|
|
|
+}
|