Browse Source

Merge branch 'master' of http://124.70.58.209:3000/ytrd-project-management/GeoHazardMonitor

gao.qiang 6 months ago
parent
commit
72d1087d7f

+ 17 - 0
business-service/src/main/java/com/ozs/service/entity/vo/BaseCameraVO.java

@@ -6,6 +6,7 @@ import lombok.Data;
 import lombok.NoArgsConstructor;
 import lombok.NoArgsConstructor;
 
 
 import java.io.Serializable;
 import java.io.Serializable;
+import java.util.Date;
 
 
 /**
 /**
  * @author wyy
  * @author wyy
@@ -28,4 +29,20 @@ public class BaseCameraVO implements Serializable{
      * 通道编号
      * 通道编号
      */
      */
     private String channel;
     private String channel;
+
+    /**
+     * 下次执行时间
+     */
+    private String nextExecuteTime;
+
+    /**
+     * 是否为预警摄像头
+     */
+    private Boolean isAlarm;
+
+    /**
+     * 状态
+     */
+    private String status;
+
 }
 }

+ 43 - 0
business-service/src/main/java/com/ozs/service/service/RedisService.java

@@ -3,10 +3,14 @@ package com.ozs.service.service;
 import com.ozs.service.entity.vo.BaseCameraVO;
 import com.ozs.service.entity.vo.BaseCameraVO;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.data.redis.core.SetOperations;
 import org.springframework.stereotype.Service;
 import org.springframework.stereotype.Service;
+import org.springframework.util.CollectionUtils;
 
 
+import java.util.List;
 import java.util.Set;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 
 /**
 /**
  * @author wyy
  * @author wyy
@@ -114,11 +118,50 @@ public class RedisService {
         redisTemplate.opsForSet().add(key, object);
         redisTemplate.opsForSet().add(key, object);
     }
     }
 
 
+    // 将对象添加到hash集合
+    public void addToHash(String key, String hashKey, BaseCameraVO vo) {
+        redisTemplate.opsForHash().put(key, hashKey, vo);
+    }
+
     // 从set集合中删除对象
     // 从set集合中删除对象
     public long removeFromSet(String key, BaseCameraVO object) {
     public long removeFromSet(String key, BaseCameraVO object) {
         return redisTemplate.opsForSet().remove(key, object);
         return redisTemplate.opsForSet().remove(key, object);
     }
     }
 
 
+    // 删除set集合
+    public boolean delete(String key) {
+        return redisTemplate.delete(key);
+    }
+
+    // 根据KEY清空set集合
+    public boolean removeBySetItemKey(String key, String itemKey) {
+        SetOperations<String, BaseCameraVO> setOperations = redisTemplate.opsForSet();
+        List<BaseCameraVO> _filters = setOperations.members(key).stream()
+                .filter(f -> f.getCameraCode().equalsIgnoreCase(itemKey)).collect(Collectors.toList());
+        if (!CollectionUtils.isEmpty(_filters)) {
+            for (BaseCameraVO ele : _filters) {
+                setOperations.remove(key, ele);
+            }
+        }
+        return true;
+    }
+
+    // 根据KEY清空set集合
+    public boolean clearFromSet(String key) {
+        SetOperations<String, BaseCameraVO> setOperations = redisTemplate.opsForSet();
+        Set<BaseCameraVO> sets = setOperations.members(key);
+        for (BaseCameraVO ele : sets) {
+            setOperations.remove(key, ele);
+        }
+        return true;
+    }
+
+    // 根据项KEY判断是否存在set集合中
+    public boolean existFromSet(String key, String itemKey) {
+        return !CollectionUtils.isEmpty(redisTemplate.opsForSet().members(key).stream()
+                .filter(f -> f.getCameraCode().equalsIgnoreCase(itemKey)).collect(Collectors.toList()));
+    }
+
     // 获取set集合中的所有对象
     // 获取set集合中的所有对象
     public Set<BaseCameraVO> getSetMembers(String key) {
     public Set<BaseCameraVO> getSetMembers(String key) {
         return redisTemplate.opsForSet().members(key);
         return redisTemplate.opsForSet().members(key);

+ 47 - 21
business-service/src/main/java/com/ozs/service/service/impl/CameraCaptureService.java

@@ -11,6 +11,7 @@ import org.springframework.stereotype.Service;
 import javax.annotation.Resource;
 import javax.annotation.Resource;
 import java.io.*;
 import java.io.*;
 import java.net.HttpURLConnection;
 import java.net.HttpURLConnection;
+import java.net.SocketTimeoutException;
 import java.net.URL;
 import java.net.URL;
 import java.util.Date;
 import java.util.Date;
 
 
@@ -25,11 +26,20 @@ public class CameraCaptureService {
     @Resource
     @Resource
     MinioUtils minioUtils;
     MinioUtils minioUtils;
 
 
-    public void getCapture(URL url, String fileName) throws IOException {
+
+    private final static Integer CONST_TIME_OUT = 60 * 1000;
+
+    public boolean getCapture(URL url, String fileName) throws IOException {
+        return getCapture(url, "", fileName);
+    }
+
+    public boolean getCapture(URL url, String cameraCode, String fileName) throws IOException {
         HttpURLConnection connection = null;
         HttpURLConnection connection = null;
-        BufferedInputStream bis = null;
-        BufferedOutputStream bos = null;
+//        BufferedInputStream bis = null;
+//        BufferedOutputStream bos = null;
         ByteArrayOutputStream byteArrayOutputStream = null;
         ByteArrayOutputStream byteArrayOutputStream = null;
+        ByteArrayInputStream byteArrayInputStream = null;
+        FileOutputStream fileOutputStream = null;
         try {
         try {
             connection = (HttpURLConnection) url.openConnection();
             connection = (HttpURLConnection) url.openConnection();
             // http正文内,因此需要设为true
             // http正文内,因此需要设为true
@@ -49,47 +59,62 @@ public class CameraCaptureService {
             //connection.setRequestProperty("Content-Type", "application/x-www-form-urlencoded; charset=UTF-8");
             //connection.setRequestProperty("Content-Type", "application/x-www-form-urlencoded; charset=UTF-8");
             connection.setRequestProperty("Charsert", "UTF-8");
             connection.setRequestProperty("Charsert", "UTF-8");
             //设置读取时间为30秒
             //设置读取时间为30秒
-            connection.setConnectTimeout(30 * 1000);
-            connection.setReadTimeout(180 * 1000);
+            connection.setConnectTimeout(CONST_TIME_OUT);
+            connection.setReadTimeout(CONST_TIME_OUT);
             connection.connect();
             connection.connect();
 
 
             // 返回流
             // 返回流
-            System.out.println("responseCode=" + connection.getResponseCode()+"url"+url.toString());
-            log.info("responseCode="+connection.getResponseCode());
+//            System.out.println("responseCode: " + connection.getResponseCode()+", url: "+url.toString());
+//            log.info("responseCode: " + connection.getResponseCode()+", url: " + url);
             if (connection.getResponseCode() == HttpURLConnection.HTTP_OK) {
             if (connection.getResponseCode() == HttpURLConnection.HTTP_OK) {
                 InputStream input = connection.getInputStream();
                 InputStream input = connection.getInputStream();
-                bis = new BufferedInputStream(input);
+//                bis = new BufferedInputStream(input);
                 byte [] bytes = new byte[102400];
                 byte [] bytes = new byte[102400];
                 int index = 0;
                 int index = 0;
                 byteArrayOutputStream = new ByteArrayOutputStream();
                 byteArrayOutputStream = new ByteArrayOutputStream();
                 while (-1 != (index = input.read(bytes,0,bytes.length))){
                 while (-1 != (index = input.read(bytes,0,bytes.length))){
                     byteArrayOutputStream.write(bytes,0,index);
                     byteArrayOutputStream.write(bytes,0,index);
                 }
                 }
-                ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(byteArrayOutputStream.toByteArray());
-//                String uploadUrl = "/data/test_picture/"+ IdUtils.fastSimpleUUID()+".jpeg";
+                byteArrayInputStream = new ByteArrayInputStream(byteArrayOutputStream.toByteArray());
+                //String uploadUrl = "/data/test_picture/"+ IdUtils.fastSimpleUUID()+".jpeg";
+//                String uploadUrl = "d://tmp/c/"+ (cameraCode + "-" + (System.currentTimeMillis() / 1000)) +".jpeg";
 //                if (log.isDebugEnabled()) {
 //                if (log.isDebugEnabled()) {
-//                    log.debug("生成至服务器本地开始,路径为======" + uploadUrl);
+//                    log.debug("生成至服务器本地开始,路径为: " + uploadUrl);
 //                }
 //                }
-//                IOUtils.copy(byteArrayInputStream, new FileOutputStream(uploadUrl));
+                //fileOutputStream = new FileOutputStream(uploadUrl);
+                //IOUtils.copy(byteArrayInputStream, fileOutputStream);
 //                if (log.isDebugEnabled()) {
 //                if (log.isDebugEnabled()) {
-//                    log.debug("生成至服务器本地结束,路径为======");
+//                    log.debug("生成至服务器本地结束,路径为: " + uploadUrl);
+//                }
+               //byteArrayInputStream.reset();
+//                if (log.isDebugEnabled()) {
+//                    log.debug("getCapture======" + fileName);
 //                }
 //                }
-//                byteArrayInputStream.reset();
-                if (log.isDebugEnabled()) {
-                    log.debug("getCapture======" + fileName);
-                }
                 minioUtils.minIoClientUpload(byteArrayInputStream, fileName);
                 minioUtils.minIoClientUpload(byteArrayInputStream, fileName);
+//                if (log.isDebugEnabled()) {
+//                    log.debug("fileName: " + fileName + ", 图片结束截图时间: " + new Date());
+//                }
+                return true;
+            } else {
                 if (log.isDebugEnabled()) {
                 if (log.isDebugEnabled()) {
-                    log.debug("fileName:"+fileName+"图片结束截图时间:"+new Date());
+                    log.debug("请求摄像头截图接口出错, url: " + url + ", 反馈错误状态码: " + connection.getResponseCode() +
+                            ", 错误内容: " + connection.getResponseMessage());
                 }
                 }
             }
             }
         } catch (Exception e) {
         } catch (Exception e) {
             //先关闭外层的缓冲流,再关闭内层的流,但是在关闭外层流的同时,
             //先关闭外层的缓冲流,再关闭内层的流,但是在关闭外层流的同时,
             //内层流也会自动的进行关闭,关于内层流的关闭,可以省略
             //内层流也会自动的进行关闭,关于内层流的关闭,可以省略
-            e.printStackTrace();
+//            e.printStackTrace();
+            log.error("图片截图出现异常情形: " + e.getMessage());
         } finally {
         } finally {
-            if (bis != null) {
-                bis.close();
+//            if (bis != null) {
+//                bis.close();
+//            }
+            if (fileOutputStream != null) {
+                fileOutputStream.close();
+            }
+            if (byteArrayInputStream != null) {
+                byteArrayInputStream.close();
             }
             }
             if (connection != null) {
             if (connection != null) {
                 connection.disconnect();
                 connection.disconnect();
@@ -99,5 +124,6 @@ public class CameraCaptureService {
                 byteArrayOutputStream.close();
                 byteArrayOutputStream.close();
             }
             }
         }
         }
+        return false;
     }
     }
 }
 }

+ 6 - 0
hazard-admin/pom.xml

@@ -108,6 +108,12 @@
             </exclusions>
             </exclusions>
         </dependency>
         </dependency>
 
 
+        <dependency>
+            <groupId>cn.hutool</groupId>
+            <artifactId>hutool-all</artifactId>
+            <version>5.8.26</version>
+        </dependency>
+
     </dependencies>
     </dependencies>
 
 
     <build>
     <build>

+ 2 - 0
hazard-admin/src/main/java/com/ozs/HazardApplication.java

@@ -3,6 +3,7 @@ package com.ozs;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
 import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
+import org.springframework.scheduling.annotation.EnableAsync;
 
 
 /**
 /**
  * 启动程序
  * 启动程序
@@ -10,6 +11,7 @@ import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
  * @author ruoyi
  * @author ruoyi
  */
  */
 @SpringBootApplication(exclude = { DataSourceAutoConfiguration.class })
 @SpringBootApplication(exclude = { DataSourceAutoConfiguration.class })
+@EnableAsync(proxyTargetClass = true)
 public class HazardApplication
 public class HazardApplication
 {
 {
     public static void main(String[] args)
     public static void main(String[] args)

+ 110 - 0
hazard-admin/src/main/java/com/ozs/web/controller/shotschedule/CaptureStreamListener.java

@@ -0,0 +1,110 @@
+package com.ozs.web.controller.shotschedule;
+
+import cn.hutool.core.date.DateField;
+import cn.hutool.core.date.DateTime;
+import cn.hutool.core.date.DateUnit;
+import cn.hutool.core.lang.Dict;
+import cn.hutool.json.JSONUtil;
+import com.ozs.common.utils.StringUtils;
+import com.ozs.service.entity.vo.BaseCameraVO;
+import com.ozs.service.service.RedisService;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.ObjectUtils;
+import org.springframework.data.redis.connection.stream.ObjectRecord;
+import org.springframework.data.redis.connection.stream.RecordId;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.data.redis.core.StringRedisTemplate;
+import org.springframework.data.redis.stream.StreamListener;
+import org.springframework.scheduling.annotation.Async;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Resource;
+
+@Component
+@Slf4j
+public class CaptureStreamListener implements StreamListener<String, ObjectRecord<String, String>> {
+
+    @Resource
+    private StringRedisTemplate stringRedisTemplate;
+
+    @Resource
+    public RedisTemplate<String, String> redisTemplate;
+
+    /**
+     * 生成 runable 任务服务提供类
+     */
+    @Resource
+    private GenPictureTaskService genPictureTaskService;
+
+    /**
+     * Redis 缓存服务提供类
+     */
+    @Resource
+    private RedisService redisService;
+
+    @Override
+    protected void finalize() throws Throwable {
+        super.finalize();
+    }
+
+    @Override
+    @Async
+    public void onMessage(ObjectRecord<String, String> message) {
+        ShotPictureTaskExecutors.RUNNING_ATOMIC_INT.incrementAndGet();
+        BaseCameraVO vo = null;
+        try {
+            // 消息ID
+            RecordId recordId = message.getId();
+            // 消息的key和value
+            String value = message.getValue();
+            if (StringUtils.isNotBlank(value) && value.indexOf("cameraCode") == -1) {
+                stringRedisTemplate.opsForStream().acknowledge(RedisPushService.STREAM_KEY, RedisPushService.STREAM_CONSUMER_GROUP_KEY, recordId.getValue());
+                stringRedisTemplate.opsForStream().delete(RedisPushService.STREAM_KEY, recordId.getValue());
+                ShotPictureTaskExecutors.RUNNING_ATOMIC_INT.decrementAndGet();
+                return;
+            }
+            Dict dataDict = JSONUtil.toBean(value, Dict.class);
+
+            String cameraCode = dataDict.getStr("cameraCode");
+
+            Boolean isAlarm = dataDict.getBool("isAlarm");
+            String camera_type_hash = isAlarm ? ShotPictureTaskExecutors.CAMERA_ALARM_HASH : ShotPictureTaskExecutors.CAMERA_NORMAL_HASH;
+
+            Object cacheObj = redisTemplate.opsForHash().get(camera_type_hash, cameraCode);
+            vo = JSONUtil.toBean(ObjectUtils.toString(cacheObj), BaseCameraVO.class);
+            vo.setStatus("running");
+            if (vo.getIsAlarm() == null) {
+                vo.setIsAlarm(isAlarm);
+            }
+            redisService.addToHash(camera_type_hash, vo.getCameraCode(), vo);
+
+            DateTime nextExecuteTime = DateTime.now();
+            if (StringUtils.isNotBlank(dataDict.getStr("nextExecuteTime"))) {
+                nextExecuteTime = DateTime.of(dataDict.getStr("nextExecuteTime"), "yyyy-MM-dd HH:mm:ss");
+            }
+            String ymd = DateTime.now().toString("yyyy-MM-dd");
+            DateTime now = DateTime.now();
+            // 超过时间则进行抓拍
+            if (now.isAfterOrEquals(nextExecuteTime) && nextExecuteTime.between(now, DateUnit.MINUTE) < 2) {
+                genPictureTaskService.getNormalPicture(cameraCode, dataDict.getStr("channel"), dataDict.getStr("address"), ymd);
+            }
+            // 通过RedisTemplate手动确认消息
+            stringRedisTemplate.opsForStream().acknowledge(RedisPushService.STREAM_KEY, RedisPushService.STREAM_CONSUMER_GROUP_KEY, recordId.getValue());
+            stringRedisTemplate.opsForStream().delete(RedisPushService.STREAM_KEY, recordId.getValue());
+        } catch (Exception e) {
+            log.error("摄像头抓图出现异常, ", e);
+        } finally {
+            Boolean isAlarm = vo.getIsAlarm();
+            if (isAlarm) {
+                vo.setNextExecuteTime(DateTime.now().offset(DateField.SECOND, 30).toString("yyyy-MM-dd HH:mm:ss"));
+            } else {
+                vo.setNextExecuteTime(DateTime.now().offset(DateField.MINUTE, 1).toString("yyyy-MM-dd HH:mm:ss"));
+            }
+            vo.setStatus("complete");
+            redisService.addToHash(isAlarm ? ShotPictureTaskExecutors.CAMERA_ALARM_HASH :
+                    ShotPictureTaskExecutors.CAMERA_NORMAL_HASH, vo.getCameraCode(), vo);
+            ShotPictureTaskExecutors.RUNNING_ATOMIC_INT.decrementAndGet();
+        }
+    }
+
+}

+ 58 - 0
hazard-admin/src/main/java/com/ozs/web/controller/shotschedule/GenPictureTaskService.java

@@ -0,0 +1,58 @@
+package com.ozs.web.controller.shotschedule;
+
+import com.ozs.common.utils.uuid.IdUtils;
+import com.ozs.service.entity.vo.BaseCameraVO;
+import com.ozs.service.service.impl.CameraCaptureService;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+import org.springframework.util.ObjectUtils;
+
+import javax.annotation.Resource;
+import java.net.URL;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * @author lyao
+ * @description 生成监控图片
+ * @creat 2024/3/14
+ */
+@Slf4j
+@Service
+public class GenPictureTaskService {
+
+    /**
+     * 摄像头截图服务接口
+     */
+    @Resource
+    CameraCaptureService cameraCaptureService;
+
+    @Value("${shot.urlAddress}")
+    private String urlAddress;
+
+    public boolean getNormalPicture(String cameraCode, String channel, String address, String ymd) {
+        String uuid = IdUtils.fastSimpleUUID();
+        long start = System.currentTimeMillis();
+        if (!ObjectUtils.isEmpty(cameraCode)) {
+            try {
+//                if (log.isDebugEnabled()) {
+//                    log.debug("请求url======" + urlAddress + cameraCode + "/" + channel);
+//                }
+                URL url = new URL(urlAddress + cameraCode + "/" + channel);
+                String fileName = address + "/" + "normal" + "/" + ymd + "/" + uuid + ".jpeg";
+//                if (log.isDebugEnabled()) {
+//                    log.debug("正常摄像头截图开始, url: " + url + ", fileName: " + fileName);
+//                }
+                boolean capture = cameraCaptureService.getCapture(url, cameraCode, fileName);
+//                if (log.isDebugEnabled()) {
+//                    log.debug("正常摄像头截图结束, url: " + url + (capture ? "" : ", 出现错误") + ", 共计耗时: " + ((System.currentTimeMillis() - start) / 1000) + "s");
+//                }
+                return capture;
+            } catch (Throwable e) {
+                log.error("正常摄像头截图异常", e);
+            }
+        }
+        return false;
+    }
+
+}

+ 30 - 0
hazard-admin/src/main/java/com/ozs/web/controller/shotschedule/RedisPushService.java

@@ -0,0 +1,30 @@
+package com.ozs.web.controller.shotschedule;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.data.redis.connection.stream.StreamRecords;
+import org.springframework.data.redis.connection.stream.StringRecord;
+import org.springframework.data.redis.core.StringRedisTemplate;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.Resource;
+import java.util.Collections;
+
+@Service
+@Slf4j
+public class RedisPushService {
+
+    @Resource
+    private StringRedisTemplate stringRedisTemplate;
+
+    public final static String STREAM_KEY = "capture";
+    public final static String STREAM_CONSUMER_GROUP_KEY = "capture_group_1";
+
+    public void pushMsg(String msg) {
+        // 创建消息记录, 以及指定stream
+        StringRecord stringRecord = StreamRecords.string(Collections.singletonMap("data", msg)).withStreamKey(STREAM_KEY);
+        // 将消息添加至消息队列中
+        this.stringRedisTemplate.opsForStream().add(stringRecord);
+//        log.info("{}已发送消息: {}", STREAM_KEY, msg);
+    }
+
+}

+ 102 - 0
hazard-admin/src/main/java/com/ozs/web/controller/shotschedule/RedisStreamConsumerConfig.java

@@ -0,0 +1,102 @@
+package com.ozs.web.controller.shotschedule;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.context.annotation.Bean;
+import org.springframework.data.redis.connection.RedisConnectionFactory;
+import org.springframework.data.redis.connection.stream.Consumer;
+import org.springframework.data.redis.connection.stream.ObjectRecord;
+import org.springframework.data.redis.connection.stream.ReadOffset;
+import org.springframework.data.redis.connection.stream.StreamOffset;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.data.redis.stream.StreamListener;
+import org.springframework.data.redis.stream.StreamMessageListenerContainer;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Resource;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.concurrent.Executors;
+
+@Component
+@Slf4j
+public class RedisStreamConsumerConfig {
+
+    //注入Redis模板
+    @Resource
+    private RedisTemplate<String, Object> redisTemplate;
+
+    @Bean
+    public StreamMessageListenerContainer<String, ObjectRecord<String, String>> captureConsumerListener(
+            RedisConnectionFactory connectionFactory, CaptureStreamListener streamListener) {
+        StreamMessageListenerContainer<String, ObjectRecord<String, String>> container =
+                streamContainer(RedisPushService.STREAM_KEY, connectionFactory, streamListener);
+        container.start();
+        return container;
+    }
+
+    /**
+     * @param myStream 从哪个流接收数据
+     * @param connectionFactory
+     * @param streamListener 绑定的监听类
+     * @return
+     */
+    private StreamMessageListenerContainer<String, ObjectRecord<String, String>> streamContainer(String myStream,
+        RedisConnectionFactory connectionFactory, StreamListener<String, ObjectRecord<String, String>> streamListener) {
+        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>> options =
+                StreamMessageListenerContainer.StreamMessageListenerContainerOptions
+                        .builder()
+                        .pollTimeout(Duration.ofSeconds(5)) // 拉取消息超时时间
+                        .batchSize(1) // 批量抓取消息
+                        .targetType(String.class) // 传递的数据类型
+//                        .executor(Executors.newFixedThreadPool(1))
+                        .build();
+        StreamMessageListenerContainer<String, ObjectRecord<String, String>> container = StreamMessageListenerContainer
+                .create(connectionFactory, options);
+        //指定消费最新的消息
+        StreamOffset<String> offset = StreamOffset.create(myStream, ReadOffset.lastConsumed());
+        //创建消费者
+        StreamMessageListenerContainer.StreamReadRequest<String> streamReadRequest = null;
+        try {
+            streamReadRequest = buildStreamReadRequest(offset, streamListener);
+        } catch (Exception e) {
+            log.error(e.getMessage());
+        }
+        //指定消费者对象
+        container.register(streamReadRequest, streamListener);
+        return container;
+    }
+
+    private StreamMessageListenerContainer.StreamReadRequest<String> buildStreamReadRequest(StreamOffset<String> offset,
+                            StreamListener<String, ObjectRecord<String, String>> streamListener) throws Exception {
+        Consumer consumer = null;
+        if (streamListener instanceof CaptureStreamListener) {
+            //如果 流不存在 创建 stream 流
+            if( !redisTemplate.hasKey(RedisPushService.STREAM_KEY)){
+                redisTemplate.opsForStream().add(RedisPushService.STREAM_KEY, Collections.singletonMap("status", "init"));
+                log.debug("初始化stream: " + RedisPushService.STREAM_KEY + "成功!");
+            }
+            //创建消费者组
+            try {
+                redisTemplate.opsForStream().createGroup(RedisPushService.STREAM_KEY, RedisPushService.STREAM_CONSUMER_GROUP_KEY);
+            } catch (Exception e) {
+                log.debug("消费者组: " + RedisPushService.STREAM_CONSUMER_GROUP_KEY + "已存在");
+            }
+            consumer = Consumer.from(RedisPushService.STREAM_CONSUMER_GROUP_KEY, RedisPushService.STREAM_KEY);
+        } else {
+            throw new Exception("无法识别的stream key");
+        }
+        StreamMessageListenerContainer.StreamReadRequest<String> streamReadRequest = StreamMessageListenerContainer.StreamReadRequest.builder(offset)
+                .errorHandler((error) -> {
+                    error.printStackTrace();
+                    log.error(error.getMessage());
+                })
+                .cancelOnError(e -> false)
+                .consumer(consumer)
+                //关闭自动ack确认
+                .autoAcknowledge(false)
+                .build();
+        return streamReadRequest;
+    }
+
+}

+ 66 - 98
hazard-admin/src/main/java/com/ozs/web/controller/shotschedule/ShotPictureExecutors.java

@@ -1,98 +1,66 @@
-package com.ozs.web.controller.shotschedule;
-
-import com.ozs.common.core.domain.entity.SysDictData;
-import com.ozs.common.core.redis.RedisCache;
-import com.ozs.common.utils.DictUtils;
-import com.ozs.system.service.ISysDictDataService;
-import com.ozs.system.service.ISysDictTypeService;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.scheduling.annotation.Async;
-import org.springframework.scheduling.annotation.EnableAsync;
-import org.springframework.scheduling.annotation.EnableScheduling;
-import org.springframework.scheduling.annotation.Scheduled;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
-import org.springframework.util.CollectionUtils;
-
-import javax.annotation.Resource;
-import java.util.List;
-
-/**
- * @author wyy
- * @subject
- * @creat 2024/3/1
- */
-@Configuration
-@EnableScheduling
-@EnableAsync
-@Slf4j
-public class ShotPictureExecutors {
-    @Autowired
-    private ISysDictDataService dictDataService;
-    @Autowired
-    private ISysDictTypeService iSysDictTypeService;
-    @Autowired
-    private TaskService taskService; //任务
-    @Value("${shot.storeAddress}")
-    private String storeAddress;
-    @Resource
-    private ThreadPoolTaskExecutor threadPoolTaskExecutor;
-    @Autowired
-    RedisCache redisCache;
-
-    /**
-     * 处理非预警摄像头截图
-     */
-    @Async
-    @Scheduled(fixedRate = 60000)
-    public void handleNormalPicture() {
-
-        try {
-            Integer status = redisCache.getCacheObject("shot_switch");
-            if(status == null){
-                List<SysDictData> dataList = iSysDictTypeService.selectDictDataByType("shot_switch");
-                if(CollectionUtils.isEmpty(dataList)){
-                    redisCache.setCacheObject("shot_switch",2);
-                    status = redisCache.getCacheObject("shot_switch");
-                }else{
-                    redisCache.setCacheObject("shot_switch",dataList.get(0).getStatus());
-                    status = redisCache.getCacheObject("shot_switch");
-                }
-            }
-            if(status == 1){
-                this.threadPoolTaskExecutor.execute(taskService::getNormalPicture);
-            }
-        } catch (Exception e) {
-            e.getMessage();
-            log.error("正常摄像机截图定时任务失败",e);
-        }
-    }
-
-    /**
-     * 处理预警摄像头截图
-     */
-    @Scheduled(fixedRate = 500)
-    public void handleAlarmPicture() {
-        try {
-            Integer status = redisCache.getCacheObject("shot_switch");
-            if(status == null){
-                List<SysDictData> dataList = iSysDictTypeService.selectDictDataByType("shot_switch");
-                if(CollectionUtils.isEmpty(dataList)){
-                    redisCache.setCacheObject("shot_switch",2);
-                    status = redisCache.getCacheObject("shot_switch");
-                }else{
-                    redisCache.setCacheObject("shot_switch",dataList.get(0).getStatus());
-                    status = redisCache.getCacheObject("shot_switch");
-                }
-            }
-            if(status == 1){
-                this.threadPoolTaskExecutor.execute(taskService::getAlarmPicture);
-            }
-        } catch (Exception e) {
-            e.getMessage();
-            log.error("报警摄像机截图定时任务失败",e);
-        }
-    }
-}
+//package com.ozs.web.controller.shotschedule;
+//
+//import com.ozs.system.service.ISysDictDataService;
+//import com.ozs.system.service.ISysDictTypeService;
+//import lombok.extern.slf4j.Slf4j;
+//import org.springframework.beans.factory.annotation.Autowired;
+//import org.springframework.beans.factory.annotation.Value;
+//import org.springframework.context.annotation.Configuration;
+//import org.springframework.scheduling.annotation.Async;
+//import org.springframework.scheduling.annotation.EnableAsync;
+//import org.springframework.scheduling.annotation.EnableScheduling;
+//import org.springframework.scheduling.annotation.Scheduled;
+//import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+//
+//import javax.annotation.Resource;
+//
+///**
+// * @author wyy
+// * @subject
+// * @creat 2024/3/1
+// */
+//@Configuration
+//@EnableScheduling
+//@EnableAsync
+//@Slf4j
+//public class ShotPictureExecutors {
+//    @Autowired
+//    private ISysDictDataService dictDataService;
+//    @Autowired
+//    private ISysDictTypeService iSysDictTypeService;
+//    @Autowired
+//    private TaskService taskService; //任务
+//    @Value("${shot.storeAddress}")
+//    private String storeAddress;
+//    @Resource
+//    private ThreadPoolTaskExecutor threadPoolTaskExecutor;
+//
+//    /**
+//     * 处理非预警摄像头截图
+//     */
+//    @Async
+//    @Scheduled(fixedRate = 60000)
+//    public synchronized void handleNormalPicture() {
+//
+//        try {
+//
+//            this.threadPoolTaskExecutor.execute(taskService::getNormalPicture);
+//        } catch (Exception e) {
+//            e.getMessage();
+//            log.error("正常摄像机截图定时任务失败",e);
+//        }
+//    }
+//
+//    /**
+//     * 处理预警摄像头截图
+//     */
+//    @Scheduled(fixedRate = 500)
+//    public void handleAlarmPicture() {
+//        try {
+//            this.threadPoolTaskExecutor.execute(taskService::getAlarmPicture);
+//        } catch (Exception e) {
+//            e.getMessage();
+//            log.error("报警摄像机截图定时任务失败",e);
+//        }
+//    }
+//}

+ 226 - 0
hazard-admin/src/main/java/com/ozs/web/controller/shotschedule/ShotPictureTaskExecutors.java

@@ -0,0 +1,226 @@
+package com.ozs.web.controller.shotschedule;
+
+import cn.hutool.core.date.DateTime;
+import cn.hutool.core.date.DateUnit;
+import cn.hutool.core.lang.Dict;
+import cn.hutool.json.JSONUtil;
+import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
+import com.ozs.common.core.domain.entity.SysDictData;
+import com.ozs.common.core.redis.RedisCache;
+import com.ozs.common.utils.DictUtils;
+import com.ozs.service.entity.BaseCameraManagement;
+import com.ozs.service.entity.MsgAlarm;
+import com.ozs.service.entity.vo.BaseCameraVO;
+import com.ozs.service.mapper.BaseCameraManagementMapper;
+import com.ozs.service.mapper.MsgAlarmMapper;
+import com.ozs.service.service.RedisService;
+import com.ozs.system.service.ISysDictTypeService;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.scheduling.annotation.Async;
+import org.springframework.scheduling.annotation.EnableAsync;
+import org.springframework.scheduling.annotation.EnableScheduling;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.util.CollectionUtils;
+
+import javax.annotation.Resource;
+import java.time.Duration;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+/**
+ * @author lyao
+ * @description 截图服务 - 调度任务
+ * @creat 2024/3/14
+ */
+@Configuration
+@EnableScheduling
+@EnableAsync
+@Slf4j
+public class ShotPictureTaskExecutors {
+
+    @Value("${shot.storeAddress}")
+    private String storeAddress;
+
+    @Value("${shot.isMaster}")
+    private Boolean isMaster;
+
+    /**
+     * Redis 缓存服务提供类
+     */
+    @Resource
+    private RedisService redisService;
+
+    @Resource
+    public RedisTemplate<String, String> redisTemplate;
+
+    @Resource
+    private ISysDictTypeService iSysDictTypeService;
+
+    @Resource
+    private RedisPushService redisPushService;
+
+    @Resource
+    private MsgAlarmMapper msgAlarmMapper;
+
+    @Resource
+    private BaseCameraManagementMapper baseCameraManagementMapper;
+    @Autowired
+    RedisCache redisCache;
+
+
+    /**
+     * REDIS 抓拍摄像头分布式锁
+     */
+    private final static String LOCK_CAMELA_NORMAL_KEY = "lock_camela_normal";
+
+    public static final String CAMERA_ALARM_HASH = "camera_alarm_hash";
+    public static final String CAMERA_NORMAL_HASH = "camera_normal_hash";
+
+    public final static AtomicInteger RUNNING_ATOMIC_INT = new AtomicInteger(0);
+
+    /**
+     * 处理非预警摄像头截图, 间隔: 60s
+     */
+    @Async
+    @Scheduled(fixedDelay = 60 * 1000, initialDelay = 3 * 1000)
+    public synchronized void initCameraConfig() {
+        if (!isMaster) return;
+        // 获取分布式锁
+        boolean lockSuccess = redisTemplate.opsForValue().setIfAbsent(LOCK_CAMELA_NORMAL_KEY, "execute", Duration.ofSeconds(60));
+        if (lockSuccess) {
+            // 查询未解除的报警摄像头(报警摄像头逻辑沿用wangyy代码)
+            List<String> lockedMsgAlarmCameraCodes = msgAlarmMapper.selectList(new QueryWrapper<MsgAlarm>().eq("is_lock", 2))
+                    .stream().map(f -> f.getCameraCode()).distinct().collect(Collectors.toList());
+            try {
+                QueryWrapper<BaseCameraManagement> queryWrapper = new QueryWrapper<>();
+                queryWrapper.select("distinct camera_code, channel");
+                // 获取数据库中最新摄像头相关内容
+                List<BaseCameraManagement> dbAlarms = baseCameraManagementMapper.selectList(queryWrapper).stream().collect(Collectors.toList());
+                if (!CollectionUtils.isEmpty(dbAlarms)) {
+                    Set<Object> hashAlarmKeys = redisService.redisTemplate.opsForHash().keys(CAMERA_ALARM_HASH);
+                    Set<Object> hashNormalKeys = redisService.redisTemplate.opsForHash().keys(CAMERA_NORMAL_HASH);
+                    // 数据库中存在的数据, 但是缓存中不存在, 则进行补全
+                    for (BaseCameraManagement dbAlarm : dbAlarms) {
+                        BaseCameraVO  vo = new BaseCameraVO();
+                        vo.setCameraCode(dbAlarm.getCameraCode());
+                        vo.setChannel(dbAlarm.getChannel());
+                        vo.setNextExecuteTime(DateTime.now().toString("yyyy-MM-dd HH:mm:ss"));
+                        if (lockedMsgAlarmCameraCodes.contains(vo.getCameraCode()) && !hashAlarmKeys.contains(vo.getCameraCode())) {
+                            vo.setIsAlarm(true);
+                            redisService.addToHash(CAMERA_ALARM_HASH, vo.getCameraCode(), vo);
+                        } else if (!lockedMsgAlarmCameraCodes.contains(vo.getCameraCode()) && !hashNormalKeys.contains(vo.getCameraCode())) {
+                            vo.setIsAlarm(false);
+                            redisService.addToHash(CAMERA_NORMAL_HASH, vo.getCameraCode(), vo);
+                        }
+                    }
+                    // 排除缓存中未删除, 但是系统中删除的情况
+                    for (Object hashAlarmKey : hashAlarmKeys) {
+                        // 如果在数据库中都不存在, 则进行删除删除
+                        if (!hashAlarmKeys.contains(hashAlarmKey) && !hashNormalKeys.contains(hashAlarmKey)) {
+                            redisService.redisTemplate.opsForHash().delete(CAMERA_ALARM_HASH, hashAlarmKey);
+                            redisService.redisTemplate.opsForHash().delete(CAMERA_NORMAL_HASH, hashAlarmKey);
+                        } else if (hashAlarmKeys.contains(hashAlarmKey) && hashNormalKeys.contains(hashAlarmKey)) {  // 如果存在于报警SET中, 则删除正常摄像头
+                            redisService.redisTemplate.opsForHash().delete(CAMERA_NORMAL_HASH, hashAlarmKey);
+                        }
+                    }
+                } else {
+                    // 如果未找到数据库数据, 则清空抓图摄像头集合
+                    redisService.redisTemplate.delete(CAMERA_ALARM_HASH);
+                    redisService.redisTemplate.delete(CAMERA_NORMAL_HASH);
+                }
+            } catch (Exception e) {
+                log.error("缓存 [摄像头抓图] 调度任务配置出现异常, ", e);
+            } finally {
+                redisTemplate.delete(LOCK_CAMELA_NORMAL_KEY);
+            }
+        }
+    }
+
+    /**
+     * 处理预警摄像头截图, 间隔: 5s
+     */
+    @Scheduled(fixedDelay = 5 * 1000, initialDelay = 10 * 1000)
+    public void batchProcess() {
+        if (!isMaster) return;
+        // 填充默认路径
+        String address = storeAddress;
+        //获取截图开关
+        Integer status = redisCache.getCacheObject("shot_switch");
+        if(status == null){
+            List<SysDictData> dataList = iSysDictTypeService.selectDictDataByType("shot_switch");
+            if(CollectionUtils.isEmpty(dataList)){
+                redisCache.setCacheObject("shot_switch",2);
+                status = redisCache.getCacheObject("shot_switch");
+            }else{
+                redisCache.setCacheObject("shot_switch",dataList.get(0).getStatus());
+                status = redisCache.getCacheObject("shot_switch");
+            }
+        }
+        // 判断开关是否执行
+        if (status != 1) {
+            return;
+        }
+        // 获取字典值: 截图存放路径
+        List<SysDictData> addressDataList = DictUtils.getDictCache("shot_address");
+        if(CollectionUtils.isEmpty(addressDataList) || Objects.isNull(addressDataList.get(0))){
+            DictUtils.setDictCache("shot_address", addressDataList = iSysDictTypeService.selectDictDataByType("shot_address"));
+        }
+        if(!CollectionUtils.isEmpty(addressDataList) && Objects.nonNull(addressDataList.get(0))){
+            address = addressDataList.get(0).getDictLabel();
+        }
+        // 设置路径地址为字典表最终值
+        Map<Object, Object> entries = redisService.redisTemplate.opsForHash().entries(CAMERA_NORMAL_HASH);
+        if (!CollectionUtils.isEmpty(entries)) {
+            final String finalAddress = address;
+            for (Map.Entry<Object, Object> entry : entries.entrySet()) {
+                BaseCameraVO item = (BaseCameraVO) entry.getValue();
+                DateTime nextExecuteTime = DateTime.of(item.getNextExecuteTime(), "yyyy-MM-dd HH:mm:ss");
+                boolean after = item.getNextExecuteTime() == null ||
+                        DateTime.now().isAfterOrEquals(nextExecuteTime) ||
+                        (nextExecuteTime.between(DateTime.now(), DateUnit.MINUTE) > 5 && "running".equalsIgnoreCase(item.getStatus()));
+                if ((nextExecuteTime.between(DateTime.now(), DateUnit.MINUTE) > 5 && "running".equalsIgnoreCase(item.getStatus()))) {
+                    item.setStatus("");
+                }
+                if (after && (StringUtils.isBlank(item.getStatus()) || "complete".equalsIgnoreCase(item.getStatus()))) {
+                    Dict dict = new Dict();
+                    dict.put("cameraCode", item.getCameraCode());
+                    dict.put("channel", item.getChannel());
+                    dict.put("address", finalAddress);
+                    dict.put("isAlarm", false);
+                    dict.put("nextExecuteTime", item.getNextExecuteTime());
+                    redisPushService.pushMsg(JSONUtil.toJsonStr(dict));
+                }
+            }
+        }
+        Map<Object, Object> alarmEntries = redisService.redisTemplate.opsForHash().entries(CAMERA_ALARM_HASH);
+        if (!CollectionUtils.isEmpty(alarmEntries)) {
+            final String finalAddress = address;
+            for (Map.Entry<Object, Object> entry : alarmEntries.entrySet()) {
+                BaseCameraVO item = (BaseCameraVO) entry.getValue();
+                DateTime nextExecuteTime = DateTime.of(item.getNextExecuteTime(), "yyyy-MM-dd HH:mm:ss");
+                boolean after = item.getNextExecuteTime() == null ||
+                        DateTime.now().isAfterOrEquals(nextExecuteTime) ||
+                        (nextExecuteTime.between(DateTime.now(), DateUnit.MINUTE) > 5 && "running".equalsIgnoreCase(item.getStatus()));
+                if ((nextExecuteTime.between(DateTime.now(), DateUnit.MINUTE) > 5 && "running".equalsIgnoreCase(item.getStatus()))) {
+                    item.setStatus("");
+                }
+                if (after && (StringUtils.isBlank(item.getStatus()) || "complete".equalsIgnoreCase(item.getStatus()))) {
+                    Dict dict = new Dict();
+                    dict.put("cameraCode", item.getCameraCode());
+                    dict.put("channel", item.getChannel());
+                    dict.put("address", finalAddress);
+                    dict.put("isAlarm", true);
+                    dict.put("nextExecuteTime", item.getNextExecuteTime());
+                    redisPushService.pushMsg(JSONUtil.toJsonStr(dict));
+                }
+            }
+        }
+    }
+
+}

+ 35 - 0
hazard-admin/src/main/java/com/ozs/web/core/init/InitatialConfig.java

@@ -0,0 +1,35 @@
+package com.ozs.web.core.init;
+
+import com.ozs.common.core.text.StrFormatter;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.boot.ApplicationArguments;
+import org.springframework.boot.ApplicationRunner;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Resource;
+import java.util.concurrent.ThreadPoolExecutor;
+
+@Component
+@Slf4j
+public class InitatialConfig implements ApplicationRunner {
+
+    @Resource
+    private ThreadPoolTaskExecutor threadPoolTaskExecutor;
+    @Override
+    public void run(ApplicationArguments args) throws Exception {
+        ThreadPoolExecutor exector = threadPoolTaskExecutor.getThreadPoolExecutor();
+        while (true) {
+            if (threadPoolTaskExecutor.getActiveCount() > 0) {
+                String threadMsg = StrFormatter.format("当前线程池核心线程数: {}, 最大线程数: {}, 当前线程池大小: {}, " +
+                        "共收到任务数: {}, 完成任务数: {}, 活动线程数: {}, 等待任务数: {}",
+                        new Object[] { exector.getCorePoolSize(), exector.getMaximumPoolSize(), exector.getPoolSize(), exector.getTaskCount(),
+                                exector.getCompletedTaskCount(), exector.getActiveCount(), exector.getQueue().size() });
+                log.debug(threadMsg);
+            }
+            // 延迟3秒打印一次
+            Thread.sleep(3 * 1000);
+        }
+    }
+
+}

+ 1 - 1
hazard-admin/src/main/java/com/ozs/web/core/util/CameraUtil.java

@@ -463,7 +463,7 @@ ffmpeg -i "concat:1.ts|2.ts" -c copy output.mp4
         }
         }
         
         
         StringBuilder cmd = new StringBuilder(ffmpegPath);
         StringBuilder cmd = new StringBuilder(ffmpegPath);
-        cmd.append(" -f concat -safe 0 -i ")
+        cmd.append(" -f concat -safe 0 -y -i ")
         		.append(argFile.getAbsolutePath())
         		.append(argFile.getAbsolutePath())
 	        	.append(" -c copy ")
 	        	.append(" -c copy ")
 	        	.append(newfilePath);
 	        	.append(newfilePath);

+ 5 - 1
hazard-admin/src/main/resources/application.yml

@@ -85,6 +85,8 @@ spring:
         core-size: 4
         core-size: 4
         #cpu内核数 + 1
         #cpu内核数 + 1
         max-size: 8
         max-size: 8
+        # 超时秒数(s), 需要根据实际情况调整来达到业务目的
+#        keep-alive: 10
   # redis 配置
   # redis 配置
   redis:
   redis:
     # 地址
     # 地址
@@ -96,7 +98,7 @@ spring:
     # 密码
     # 密码
     password: 106@qwe123
     password: 106@qwe123
     # 连接超时时间
     # 连接超时时间
-    timeout: 100s
+    timeout: 300s
     lettuce:
     lettuce:
       pool:
       pool:
         # 连接池中的最小空闲连接
         # 连接池中的最小空闲连接
@@ -174,3 +176,5 @@ mqtt:
 shot:
 shot:
   urlAddress: http://124.70.58.209:9080/snap/
   urlAddress: http://124.70.58.209:9080/snap/
   storeAddress: camera_picture
   storeAddress: camera_picture
+  # 是否为主节点, 一个主节点设置为TRUE, 其他都要设定为FALSE
+  isMaster: true