|
@@ -1,17 +1,16 @@
|
|
|
package com.care.mqtt.tool;
|
|
|
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
import org.apache.commons.lang3.StringUtils;
|
|
|
import org.eclipse.paho.client.mqttv3.*;
|
|
|
-import org.slf4j.Logger;
|
|
|
-import org.slf4j.LoggerFactory;
|
|
|
|
|
|
import java.util.HashMap;
|
|
|
import java.util.Map;
|
|
|
|
|
|
+@Slf4j
|
|
|
public class MqttDataConnector {
|
|
|
|
|
|
|
|
|
- private static Logger logger = LoggerFactory.getLogger(MqttDataConnector.class);
|
|
|
|
|
|
private MqttClient mqttClient;
|
|
|
private MqttConnectOptions mqttConnectOptions;
|
|
@@ -63,7 +62,6 @@ public class MqttDataConnector {
|
|
|
String clientPassword = configParams.get("MQTT_SSLPASSWORD");
|
|
|
//ssl 协议版本 一般看mqtt服务端broker设置 不设置默认为TLSv1.1
|
|
|
String sslProtocol = configParams.get("MQTT_SSLPROTOCOL");
|
|
|
- logger.info("sslProtocol======{}",sslProtocol);
|
|
|
mqttConnectOptions.setSocketFactory(SslUtil.getSocketFactory(rootCrtPath, clientCrtPath, clientKeyPath, clientPassword,sslProtocol));
|
|
|
|
|
|
}else if("one".equals(sslType)){
|
|
@@ -80,7 +78,7 @@ public class MqttDataConnector {
|
|
|
}
|
|
|
//mqttConnectOptions.setWill(); //可以设置断线发送、接收消息
|
|
|
mqttClient.connect(mqttConnectOptions);
|
|
|
- logger.info("mqtt 连接成功!!!");
|
|
|
+ log.info("mqtt 连接成功!!!");
|
|
|
|
|
|
|
|
|
}
|
|
@@ -101,23 +99,23 @@ public class MqttDataConnector {
|
|
|
mqttClient.setCallback(new MqttCallbackExtended() {
|
|
|
@Override
|
|
|
public void connectComplete(boolean reconnect, String serverURI) {
|
|
|
- logger.info("topic reconnect ---> " + reconnect + " serverURI--->" + serverURI);
|
|
|
+ log.info("topic reconnect ---> " + reconnect + " serverURI--->" + serverURI);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void connectionLost(Throwable cause) {
|
|
|
- logger.error("{}订阅丢失 connectionLost cause ---> {}" ,topicFilter,cause.getMessage());
|
|
|
+ log.error("{}订阅丢失 connectionLost cause ---> {}" ,topicFilter,cause.getMessage());
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void messageArrived(String topic, MqttMessage message) throws Exception {
|
|
|
- logger.error("{}接收到新消息 ---> {}" ,topic,message.toString());
|
|
|
+ log.debug("{}接收到新消息 ---> {}" ,topic,message.toString());
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void deliveryComplete(IMqttDeliveryToken token) {
|
|
|
try {
|
|
|
- logger.error("{}消息已发送 ---> {}" ,token.getMessage().toString());
|
|
|
+ log.error("{}消息已发送 ---> {}" ,token.getMessage().toString());
|
|
|
} catch (MqttException e) {
|
|
|
e.printStackTrace();
|
|
|
}
|
|
@@ -161,9 +159,9 @@ public class MqttDataConnector {
|
|
|
mqttMessage.setQos(0);
|
|
|
//消费者断开连接后是否接受离线消息
|
|
|
mqttMessage.setRetained(false);
|
|
|
- logger.info("topic:{} send content {} ",publicLiveTopic,data);
|
|
|
+ log.info("topic:{} send content {} ",publicLiveTopic,data);
|
|
|
mqttClient.publish(publicLiveTopic,mqttMessage);
|
|
|
- logger.info("topic:{} send dataSize {}kb ",publicLiveTopic,data.length()/1024.0);
|
|
|
+ log.info("topic:{} send dataSize {}kb ",publicLiveTopic,data.length()/1024.0);
|
|
|
|
|
|
}
|
|
|
|
|
@@ -173,10 +171,10 @@ public class MqttDataConnector {
|
|
|
public void destroy() {
|
|
|
try {
|
|
|
this.mqttClient.disconnect();
|
|
|
- logger.info("mqtt 手动断开连接!!!");
|
|
|
+ log.info("mqtt 手动断开连接!!!");
|
|
|
} catch (MqttException e) {
|
|
|
//e.printStackTrace();
|
|
|
- logger.error("手动断开连接报错error={}",e.getMessage());
|
|
|
+ log.error("手动断开连接报错error={}",e.getMessage());
|
|
|
}
|
|
|
}
|
|
|
|