MqttDataConnector.java 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230
  1. package com.care.mqtt.tool;
  2. import org.apache.commons.lang3.StringUtils;
  3. import org.eclipse.paho.client.mqttv3.*;
  4. import org.slf4j.Logger;
  5. import org.slf4j.LoggerFactory;
  6. import java.util.HashMap;
  7. import java.util.Map;
  8. public class MqttDataConnector {
  9. private static Logger logger = LoggerFactory.getLogger(MqttDataConnector.class);
  10. private MqttClient mqttClient;
  11. private MqttConnectOptions mqttConnectOptions;
  12. /**
  13. * 创建连接
  14. * @param configParams
  15. * @throws Exception
  16. */
  17. public void createIotDataSource(Map<String, String> configParams) throws Exception {
  18. //访问ip:port tcp:// or ssl://
  19. String host = configParams.get("MQTT_HOST");
  20. //客户端id 保持唯一
  21. String clientId = configParams.get("MQTT_CLIENTID");
  22. //用户名
  23. String userName = configParams.get("MQTT_USERNAME");
  24. //密码
  25. String password = configParams.get("MQTT_PASSWORD");
  26. //ssl 验证时是双向验证还是单向验证
  27. String sslType = configParams.get("MQTT_SSLTYPE");
  28. String cleanSession = configParams.get("CLEAN_SESSION");
  29. //缓存两种模式 存在内存 文件 设置成null 缓存在内存中 最多缓存65535条信息
  30. //ScheduledExecutorService 可以设置线程池大小 默认10;发布消息方法是异步的
  31. this.mqttClient = new MqttClient(host,clientId,null);//new MqttDefaultFilePersistence()
  32. mqttConnectOptions = new MqttConnectOptions();
  33. // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,
  34. // 这里设置为true表示每次连接到服务器都以新的身份连接
  35. mqttConnectOptions.setCleanSession(Boolean.parseBoolean(cleanSession));
  36. // mqttConnectOptions.sto
  37. // 设置超时时间 s
  38. mqttConnectOptions.setConnectionTimeout(30);
  39. // 设置会话心跳时间
  40. mqttConnectOptions.setKeepAliveInterval(60);
  41. mqttConnectOptions.setAutomaticReconnect(true);//设置自动重连
  42. //setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息
  43. // mqttConnectOptions.setWill("sec", "close".getBytes(), 2, true);
  44. if("two".equals(sslType)){
  45. //服务端证书路径
  46. String rootCrtPath = configParams.get("MQTT_SSLROOT_CRTPATH");
  47. //客户端证书路径
  48. String clientCrtPath = configParams.get("MQTT_SSLCLIENT_CRTPATH");
  49. //客户端密匙路径
  50. String clientKeyPath = configParams.get("MQTT_SSLCLIENT_KEYPATH");
  51. //密匙加密密码
  52. String clientPassword = configParams.get("MQTT_SSLPASSWORD");
  53. //ssl 协议版本 一般看mqtt服务端broker设置 不设置默认为TLSv1.1
  54. String sslProtocol = configParams.get("MQTT_SSLPROTOCOL");
  55. logger.info("sslProtocol======{}",sslProtocol);
  56. mqttConnectOptions.setSocketFactory(SslUtil.getSocketFactory(rootCrtPath, clientCrtPath, clientKeyPath, clientPassword,sslProtocol));
  57. }else if("one".equals(sslType)){
  58. String rootCrtPath = configParams.get("MQTT_SSLROOT_CRTPATH");
  59. String sslProtocol = configParams.get("MQTT_SSLPROTOCOL");
  60. mqttConnectOptions.setSocketFactory(SslUtil.getSocketFactorySingle(rootCrtPath,sslProtocol));
  61. }
  62. if(StringUtils.isNotBlank(userName)){
  63. mqttConnectOptions.setUserName(userName);
  64. }
  65. if (StringUtils.isNotBlank(password)){
  66. mqttConnectOptions.setPassword(password.toCharArray());
  67. }
  68. //mqttConnectOptions.setWill(); //可以设置断线发送、接收消息
  69. mqttClient.connect(mqttConnectOptions);
  70. logger.info("mqtt 连接成功!!!");
  71. }
  72. /**
  73. *
  74. * testTopic/# #多层通配符 +单层通配符
  75. * topicFilter
  76. * @param topicFilter
  77. * @param iotMqttMessageListener
  78. * @throws Exception
  79. */
  80. public void setReSubscribe(String topicFilter, MqttMessageListener iotMqttMessageListener) throws Exception {
  81. int qos = 2;//0:最多一次 、1:最少一次 、2:只有一次
  82. mqttClient.subscribe(topicFilter,qos, iotMqttMessageListener);
  83. mqttClient.setCallback(new MqttCallbackExtended() {
  84. @Override
  85. public void connectComplete(boolean reconnect, String serverURI) {
  86. logger.info("reconnect ---> " + reconnect + " serverURI--->" + serverURI);
  87. }
  88. @Override
  89. public void connectionLost(Throwable cause) {
  90. logger.error("connectionLost cause ---> " + cause);
  91. }
  92. @Override
  93. public void messageArrived(String topic, MqttMessage message) throws Exception {
  94. }
  95. @Override
  96. public void deliveryComplete(IMqttDeliveryToken token) {
  97. }
  98. });
  99. }
  100. /**
  101. * 取消订阅
  102. *
  103. * @param topics
  104. * @throws Exception
  105. */
  106. public void unsubscribe(String[] topics) throws Exception{
  107. mqttClient.unsubscribe(topics);
  108. }
  109. /**
  110. * 是否处于连接状态
  111. * @return
  112. */
  113. public boolean isConnected() {
  114. return mqttClient.isConnected();
  115. }
  116. /**
  117. * 发布数据
  118. * @param
  119. * @param data
  120. * @throws Exception
  121. */
  122. public void publish(String publicLiveTopic, String data) throws Exception{
  123. MqttMessage mqttMessage = new MqttMessage();
  124. mqttMessage.setPayload(data.getBytes("UTF-8"));
  125. //QoS:发布消息的服务质量,即:保证消息传递的次数(消费者收到的次数)
  126. //0:最多一次,即:<=1;每个消息只发一次,也不会缓存下来。
  127. //1:至少一次,即:>=1;一直发送确保消费者至少收到一次,发送失败会缓存下来。
  128. //2:一次,即:=1 一直发送确保消费者只能收到一次;发送失败会缓存下来 。
  129. mqttMessage.setQos(1);
  130. //消费者断开连接后是否接受离线消息
  131. mqttMessage.setRetained(true);
  132. mqttClient.publish(publicLiveTopic,mqttMessage);
  133. logger.info("topic:{} send content {} ",publicLiveTopic,data);
  134. logger.info("topic:{} send dataSize {}kb ",publicLiveTopic,data.length()/1024.0);
  135. }
  136. /**
  137. * 断开连接
  138. */
  139. public void destroy() {
  140. try {
  141. this.mqttClient.disconnect();
  142. logger.info("mqtt 手动断开连接!!!");
  143. } catch (MqttException e) {
  144. //e.printStackTrace();
  145. logger.error("手动断开连接报错error={}",e.getMessage());
  146. }
  147. }
  148. /**
  149. * 设置连接
  150. * @param args
  151. */
  152. public static void main(String[] args) {
  153. MqttDataConnector xbMqttDataConnector =new MqttDataConnector();
  154. Map<String,String> configParams =new HashMap<>();
  155. String host="ws://www.rfcare.cn:8083";
  156. //String host="ssl://10.251.80.151:1883";
  157. configParams.put("MQTT_HOST",host);
  158. configParams.put("MQTT_CLIENTID","pinan05");
  159. configParams.put("MQTT_USERNAME","test");
  160. configParams.put("MQTT_PASSWORD","public");
  161. configParams.put("CLEAN_SESSION", "false");
  162. // configParams.put("MQTT_SSLTYPE","two");
  163. //
  164. // configParams.put("MQTT_SSLROOT_CRTPATH","./root.crt");
  165. // configParams.put("MQTT_SSLCLIENT_CRTPATH","./client.crt");
  166. // configParams.put("MQTT_SSLCLIENT_KEYPATH","./client.key");
  167. // configParams.put("MQTT_SSLPASSWORD","123456");
  168. // configParams.put("MQTT_SSLPROTOCOL","TLSv1.1");
  169. try {
  170. xbMqttDataConnector.createIotDataSource(configParams);
  171. MqttMessageListener mqttMessageListener=new MqttMessageListener();
  172. xbMqttDataConnector.setReSubscribe("5JPD/monitor/a1/event", mqttMessageListener);
  173. while (true){
  174. xbMqttDataConnector.publish("5JPD/monitor/a1/event","{\n" +
  175. " \"type\": \"FallDown\",\n" +
  176. " \"msg\": {\n" +
  177. " \"ts\": 1,\n" +
  178. " \"tid\": 2,\n" +
  179. " \"conf\": 3,\n" +
  180. " \"x\": 1.2,\n" +
  181. " \"y\": 0.3,\n" +
  182. " \"z\": 3.4\n" +
  183. " }\n" +
  184. "}");
  185. Thread.sleep(5000);
  186. }
  187. }catch ( Exception e) {
  188. e.printStackTrace();
  189. }
  190. }
  191. }