Browse Source

病害属性修改

gao.qiang 1 year ago
parent
commit
86cf1dd06f

+ 15 - 0
business-service/pom.xml

@@ -35,6 +35,21 @@
             <groupId>mysql</groupId>
             <artifactId>mysql-connector-java</artifactId>
         </dependency>
+        <dependency>
+            <groupId>com.rabbitmq</groupId>
+            <artifactId>amqp-client</artifactId>
+            <version>5.7.2</version>
+        </dependency>
+        <dependency>
+            <groupId>org.eclipse.paho</groupId>
+            <artifactId>mqtt-client</artifactId>
+            <version>0.4.0</version>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+            <version>1.2.54</version>
+        </dependency>
 
     </dependencies>
 

+ 31 - 0
business-service/src/main/java/com/ozs/service/entity/vo/Data.java

@@ -0,0 +1,31 @@
+package com.ozs.service.entity.vo;
+
+/**
+ * @author Administrator
+ */
+public class Data {
+    /**
+     * 推流状态 0 关闭, 1 开启
+     */
+    private Integer stream;
+    /**
+     * 告警状态 0 关闭, 1 开启
+     */
+    private Integer alarm;
+
+    public Integer getStream() {
+        return stream;
+    }
+
+    public void setStream(Integer stream) {
+        this.stream = stream;
+    }
+
+    public Integer getAlarm() {
+        return alarm;
+    }
+
+    public void setAlarm(Integer alarm) {
+        this.alarm = alarm;
+    }
+}

+ 56 - 0
business-service/src/main/java/com/ozs/service/entity/vo/Heartbeat.java

@@ -0,0 +1,56 @@
+package com.ozs.service.entity.vo;
+
+
+/**
+ * @author Administrator
+ */
+public class Heartbeat {
+    /**
+     * 名称
+     */
+    private String Name;
+    /**
+     *
+     */
+    private Data data;
+    /**
+     * 状态码
+     */
+    private Integer code;
+    /**
+     * 数据上传的签名
+     */
+    private String sign;
+
+    public String getName() {
+        return Name;
+    }
+
+    public void setName(String name) {
+        Name = name;
+    }
+
+    public Data getData() {
+        return data;
+    }
+
+    public void setData(Data data) {
+        this.data = data;
+    }
+
+    public Integer getCode() {
+        return code;
+    }
+
+    public void setCode(Integer code) {
+        this.code = code;
+    }
+
+    public String getSign() {
+        return sign;
+    }
+
+    public void setSign(String sign) {
+        this.sign = sign;
+    }
+}

+ 57 - 0
business-service/src/main/java/com/ozs/service/utils/PublishCallback.java

@@ -0,0 +1,57 @@
+package com.ozs.service.utils;
+
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONObject;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+
+/**
+ *  callback回调:
+ * @author Administrator
+ */
+public class PublishCallback implements MqttCallback {
+
+    //在断开连接时调用
+    @Override
+    public void connectionLost(Throwable cause) {
+        // 连接丢失后,一般在这里面进行重连
+        System.out.println("连接断开,可以做重连");
+    }
+
+    //接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用
+    @Override
+    public void deliveryComplete(IMqttDeliveryToken token) {
+        System.out.println("deliveryComplete---------" + token.isComplete());
+    }
+
+    //接收已经预订的发布
+    @Override
+    public void messageArrived(String topic, MqttMessage message) throws Exception {
+        // subscribe后得到的消息会执行到这里面
+        System.out.println("接收消息主题 : " + topic);
+        System.out.println("接收消息Qos : " + message.getQos());
+        System.out.println("接收消息内容 : " + new String(message.getPayload()));
+    }
+
+    public static void main(String[] args) {
+        String s=new String("{\n" +
+                "    \"Name\": \"HeartRequest\",\n" +
+                "    \"device_sn\": \"202211010001\" \n" +
+                "    \"rssi\": 1,          \n" +
+                "    \"battery_level\":99,       \n" +
+                "    \"timestamp\": 1610959147, \n" +
+                "    \"language\": 0, \n" +
+                "    \"sw_ver\": \"LuatOS-Air_V3205_RDA8910_RBLFTVT_0xb0000__TW06-530_1.3.1\", \n" +
+                "    \"hw_ver\": \"vLuatOS-Air_V3205_RDA8910_RBLFTVT_0xb0000__TW06-530_1.3.1\", \n" +
+                "    \"stream\": 0, \n" +
+                "    \"model\":[\"Model_Helmet2_V213_P_c69cbfa9d7de4234f76d6f3811986e3a.bovai\",\"model_2\"], \n" +
+                "    \"type\": 0,\n" +
+                "    \"sign\": \"b867d0e545d9abbf74c2d11b62798250\"\n" +
+                "}");
+
+        JSONObject jsonObject =  JSON.parseObject(s);
+        Object sign = jsonObject.get("sign");
+        System.out.println(sign.toString());
+    }
+}

+ 90 - 0
business-service/src/main/java/com/ozs/service/utils/PublishClient.java

@@ -0,0 +1,90 @@
+package com.ozs.service.utils;
+
+import com.alibaba.fastjson2.JSON;
+import com.ozs.service.entity.vo.Data;
+import com.ozs.service.entity.vo.Heartbeat;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
+import org.eclipse.paho.client.mqttv3.MqttTopic;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+
+
+/**
+ * 发布客户端
+ * @author Administrator
+ */
+public class PublishClient {
+    //mqtt服务器地址
+    public static final String HOST = "tcp://124.70.58.209:1883";
+    //主题
+    public static final String TOPIC = "heart_DEVICESN";
+    //mqtt 客户机ID
+    private static final String clientid = "server";
+    private MqttClient client;//客户端实例
+    private MqttTopic topic11;//主题实例
+    private String userName = "guest";  //非必须
+    private String passWord = "guest";  //非必须
+    private MqttMessage message;
+    //初始化客户端实例
+    public PublishClient() throws MqttException {
+        //MemoryPersistence设置clientid的保存形式,默认为以内存保存
+        client = new MqttClient(HOST, clientid, new MemoryPersistence());
+        connect();
+    }
+    //连接服务器
+    private void connect() {
+        //连接配置
+        MqttConnectOptions options = new MqttConnectOptions();
+        options.setCleanSession(true);//不保存,每次重启新client
+        options.setUserName(userName);
+        options.setPassword(passWord.toCharArray());
+        // 设置超时时间
+        options.setConnectionTimeout(10);
+        // 设置会话心跳时间
+        options.setKeepAliveInterval(20);
+        try {
+            //设置发布回调
+            client.setCallback(new PublishCallback());
+            client.connect(options);
+            topic11 = client.getTopic(TOPIC);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+    //发布
+    public void publish(MqttTopic topic , MqttMessage message) throws MqttPersistenceException,
+            MqttException {
+        MqttDeliveryToken token = topic.publish(message);
+        token.waitForCompletion();
+        System.out.println("message is published completely! "+ token.isComplete());
+    }
+    //测试类
+    public static void main(String[] args) throws MqttException, InterruptedException {
+        //发布客户端
+        PublishClient server = new PublishClient();
+        //每隔10s发一条
+        for (;;){
+            Heartbeat test = new Heartbeat();
+            test.setName("HeartResponse");
+            Data data = new Data();
+            data.setAlarm(0);
+            data.setStream(0);
+            test.setData(data);
+            test.setCode(200);
+            test.setSign("b867d0e545d9abbf74c2d11b62798250");
+            String s = JSON.toJSONString(test);
+
+
+            server.message = new MqttMessage();
+            server.message.setQos(1);//保证消息能到达一次
+            server.message.setRetained(true);//消息保留
+            server.message.setPayload(s.getBytes());//消息内容
+            server.publish(server.topic11 , server.message);//发布
+            Thread.sleep(10000);
+        }
+    }
+}

+ 64 - 0
business-service/src/main/java/com/ozs/service/utils/SubscribeClient.java

@@ -0,0 +1,64 @@
+package com.ozs.service.utils;
+
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttTopic;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+/**
+ * 订阅客户端
+ * @author Administrator
+ */
+public class SubscribeClient {
+    //mqtt服务器ip
+    public static final String HOST = "tcp://124.70.58.209:1883";
+    //主题
+    public static final String TOPIC1 = "heart";
+//    public static final String TOPIC1 = "heart_DEVICESN";
+    //mqtt 客户机ID
+    private String clientid = "client";
+    private MqttClient client;
+    private MqttConnectOptions options;
+    private String userName = "guest";
+    private String passWord = "guest";
+    @SuppressWarnings("unused")
+    private ScheduledExecutorService scheduler;
+    public SubscribeClient(String clientid){
+        this.clientid = clientid;
+    }
+    public void start() {
+        try {
+            // host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
+            client = new MqttClient(HOST, clientid, new MemoryPersistence());
+            // MQTT的连接设置
+            options = new MqttConnectOptions();
+            // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,设置为true表示每次连接到服务器都以新的身份连接
+            options.setCleanSession(true);
+            // 设置连接的用户名
+            options.setUserName(userName);
+            // 设置连接的密码
+            options.setPassword(passWord.toCharArray());
+            // 设置超时时间 单位为秒
+            options.setConnectionTimeout(10);
+            // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
+            options.setKeepAliveInterval(20);
+            // 设置回调
+            client.setCallback(new PublishCallback());
+            MqttTopic topic = client.getTopic(TOPIC1);
+            //setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息
+            // 遗嘱
+            options.setWill(topic, "close".getBytes(), 2, true);
+            client.connect(options);
+            //订阅消息
+            int[] Qos  = {1};
+            String[] topic1 = {TOPIC1};
+            client.subscribe(topic1, Qos);
+
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+}

+ 31 - 0
hazard-admin/src/main/java/com/ozs/web/controller/mqtt/MqttController.java

@@ -0,0 +1,31 @@
+package com.ozs.web.controller.mqtt;
+
+
+import com.ozs.common.annotation.Log;
+import com.ozs.common.core.domain.AjaxResult;
+import com.ozs.common.enums.BusinessType;
+import com.ozs.service.utils.SubscribeClient;
+import io.swagger.annotations.ApiOperation;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+
+/**
+ * @author Administrator
+ */
+@RestController
+@RequestMapping("/service/mqtt")
+@Slf4j
+public class MqttController {
+
+
+    @ApiOperation(value = "设备心跳")
+    @GetMapping("/deviceHeartbeat")
+    @Log(title = "MQTT", businessType = BusinessType.SELECT)
+    public AjaxResult selectCameraAll() {
+        new SubscribeClient("client_1").start();
+        return AjaxResult.success();
+    }
+}