瀏覽代碼

相机树开发

gao.qiang 3 月之前
父節點
當前提交
627ef2a4c5

+ 11 - 5
business-service/src/main/java/com/ozs/service/utils/PublishCallback.java

@@ -15,6 +15,7 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
+import javax.annotation.Resource;
 import java.util.List;
 
 /**
@@ -26,17 +27,24 @@ public class PublishCallback implements MqttCallback {
 
     public static final Logger logger = LoggerFactory.getLogger(PublishCallback.class);
 
+
     //在断开连接时调用
     @Override
     public void connectionLost(Throwable cause) {
         // 连接丢失后,一般在这里面进行重连
-        System.out.println("连接断开,可以做重连" + cause);
+        logger.error("连接断开,原因:" + cause);
+        try {
+            logger.error("------connectionLost-------");
+            PublishClient.reconnect();
+        } catch (Exception e) {
+            logger.error("重连失败:" + e);
+        }
     }
 
     //接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用
     @Override
     public void deliveryComplete(IMqttDeliveryToken token) {
-        System.out.println("deliveryComplete---------" + token.isComplete());
+        logger.error("deliveryComplete---------" + token.isComplete());
     }
 
     //接收已经预订的发布
@@ -48,8 +56,6 @@ public class PublishCallback implements MqttCallback {
         // subscribe后得到的消息会执行到这里面
         logger.info("接收消息主题 : {}", topic);
         logger.info("接收消息内容 : {}", new String(message.getPayload()));
-        System.out.println("接收消息主题 : " + topic);
-        System.out.println("接收消息Qos : " + message.getQos());
         if ("heart".equals(topic)) {
             try {
                 String s = new String(message.getPayload());
@@ -100,7 +106,7 @@ public class PublishCallback implements MqttCallback {
                 logger.error(e.getMessage());
             }
         }
-        System.out.println("接收消息内容 : " + new String(message.getPayload()));
+        logger.info("接收消息内容 : " + new String(message.getPayload()));
 
     }
 }

+ 34 - 7
business-service/src/main/java/com/ozs/service/utils/PublishClient.java

@@ -71,7 +71,7 @@ public class PublishClient {
             MqttException {
         MqttDeliveryToken token = topic.publish(message);
         token.waitForCompletion();
-        System.out.println("message is published completely! " + token.isComplete());
+        log.info("message is published completely! " + token.isComplete());
     }
 
     public static void pull(String sign, String deviceSn) throws MqttException {
@@ -122,7 +122,7 @@ public class PublishClient {
         res.put("data", map);
         String s = JSONObject.toJSONString(res);
 
-        System.out.println(s);
+        log.info(s);
 
     }
 
@@ -160,7 +160,7 @@ public class PublishClient {
                 publish(getMqttTopic("update_" + code), message);
                 log.info("updateDeviceSn---end");
             } catch (MqttException e) {
-                System.out.println("updateDeviceSn-------" + e.getMessage());
+                log.error("updateDeviceSn-------" + e.getMessage());
             }
         }
     }
@@ -203,7 +203,7 @@ public class PublishClient {
                  */
                 publish(getMqttTopic("config_" + baseCameraManagement.getCameraCode()), message);
             } catch (MqttException e) {
-                System.out.println(e.getMessage());
+                log.error(e.getMessage());
             }
     }
 
@@ -242,7 +242,7 @@ public class PublishClient {
                  */
                 publish(getMqttTopic("config_" + baseCameraManagement.getCameraCode()), message);
             } catch (MqttException e) {
-                System.out.println(e.getMessage());
+                log.error(e.getMessage());
             }
     }
 
@@ -265,11 +265,11 @@ public class PublishClient {
         /**
          * 设置超时时间
          */
-        options.setConnectionTimeout(10);
+        options.setConnectionTimeout(60);
         /**
          * 设置会话心跳时间
          */
-        options.setKeepAliveInterval(20);
+        options.setKeepAliveInterval(40);
         try {
             /**
              * 设置发布回调
@@ -282,7 +282,34 @@ public class PublishClient {
             String[] topic1 = {"config", "update", "heart", "test"};
             client.subscribe(topic1);
         } catch (Exception e) {
+            log.error(e.getMessage());
             e.printStackTrace();
         }
     }
+
+    public static void reconnect() throws MqttException {
+        log.error("尝试重连...");
+        if (client != null) {
+            try {
+                client.close();
+            } catch (MqttException e) {
+                log.error("关闭现有连接时出错:" + e);
+            }
+        }
+
+        MqttConnectOptions options = new MqttConnectOptions();
+        options.setCleanSession(true);
+        options.setUserName(USER_NAME);
+        options.setPassword(PASS_WORD.toCharArray());
+        options.setConnectionTimeout(60);
+        options.setKeepAliveInterval(40);
+
+        client = new MqttClient(HOST, String.valueOf(clientId), new MemoryPersistence());
+        client.setCallback(new PublishCallback());
+        client.connect(options);
+        int[] Qos = {1};
+        String[] topics = {"config", "update", "heart", "test"};
+        client.subscribe(topics, Qos);
+        log.error("重连成功!");
+    }
 }