package com.care.mqtt.tool; import org.apache.commons.lang3.StringUtils; import org.eclipse.paho.client.mqttv3.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.HashMap; import java.util.Map; public class MqttDataConnector { private static Logger logger = LoggerFactory.getLogger(MqttDataConnector.class); private MqttClient mqttClient; private MqttConnectOptions mqttConnectOptions; /** * 创建连接 * @param configParams * @throws Exception */ public void createIotDataSource(Map configParams) throws Exception { //访问ip:port tcp:// or ssl:// String host = configParams.get("MQTT_HOST"); //客户端id 保持唯一 String clientId = configParams.get("MQTT_CLIENTID"); //用户名 String userName = configParams.get("MQTT_USERNAME"); //密码 String password = configParams.get("MQTT_PASSWORD"); //ssl 验证时是双向验证还是单向验证 String sslType = configParams.get("MQTT_SSLTYPE"); String cleanSession = configParams.get("CLEAN_SESSION"); //缓存两种模式 存在内存 文件 设置成null 缓存在内存中 最多缓存65535条信息 //ScheduledExecutorService 可以设置线程池大小 默认10;发布消息方法是异步的 this.mqttClient = new MqttClient(host,clientId,null);//new MqttDefaultFilePersistence() mqttConnectOptions = new MqttConnectOptions(); // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录, // 这里设置为true表示每次连接到服务器都以新的身份连接 mqttConnectOptions.setCleanSession(Boolean.parseBoolean(cleanSession)); // mqttConnectOptions.sto // 设置超时时间 s mqttConnectOptions.setConnectionTimeout(30); // 设置会话心跳时间 mqttConnectOptions.setKeepAliveInterval(60); mqttConnectOptions.setAutomaticReconnect(true);//设置自动重连 //setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息 // mqttConnectOptions.setWill("sec", "close".getBytes(), 2, true); if("two".equals(sslType)){ //服务端证书路径 String rootCrtPath = configParams.get("MQTT_SSLROOT_CRTPATH"); //客户端证书路径 String clientCrtPath = configParams.get("MQTT_SSLCLIENT_CRTPATH"); //客户端密匙路径 String clientKeyPath = configParams.get("MQTT_SSLCLIENT_KEYPATH"); //密匙加密密码 String clientPassword = configParams.get("MQTT_SSLPASSWORD"); //ssl 协议版本 一般看mqtt服务端broker设置 不设置默认为TLSv1.1 String sslProtocol = configParams.get("MQTT_SSLPROTOCOL"); logger.info("sslProtocol======{}",sslProtocol); mqttConnectOptions.setSocketFactory(SslUtil.getSocketFactory(rootCrtPath, clientCrtPath, clientKeyPath, clientPassword,sslProtocol)); }else if("one".equals(sslType)){ String rootCrtPath = configParams.get("MQTT_SSLROOT_CRTPATH"); String sslProtocol = configParams.get("MQTT_SSLPROTOCOL"); mqttConnectOptions.setSocketFactory(SslUtil.getSocketFactorySingle(rootCrtPath,sslProtocol)); } if(StringUtils.isNotBlank(userName)){ mqttConnectOptions.setUserName(userName); } if (StringUtils.isNotBlank(password)){ mqttConnectOptions.setPassword(password.toCharArray()); } //mqttConnectOptions.setWill(); //可以设置断线发送、接收消息 mqttClient.connect(mqttConnectOptions); logger.info("mqtt 连接成功!!!"); } /** * * testTopic/# #多层通配符 +单层通配符 * topicFilter * @param topicFilter * @param iotMqttMessageListener * @throws Exception */ public void setReSubscribe(String topicFilter, MqttMessageListener iotMqttMessageListener) throws Exception { int qos = 2;//0:最多一次 、1:最少一次 、2:只有一次 mqttClient.subscribe(topicFilter,qos, iotMqttMessageListener); mqttClient.setCallback(new MqttCallbackExtended() { @Override public void connectComplete(boolean reconnect, String serverURI) { logger.info("reconnect ---> " + reconnect + " serverURI--->" + serverURI); } @Override public void connectionLost(Throwable cause) { logger.error("connectionLost cause ---> " + cause); } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { } @Override public void deliveryComplete(IMqttDeliveryToken token) { } }); } /** * 取消订阅 * * @param topics * @throws Exception */ public void unsubscribe(String[] topics) throws Exception{ mqttClient.unsubscribe(topics); } /** * 是否处于连接状态 * @return */ public boolean isConnected() { return mqttClient.isConnected(); } /** * 发布数据 * @param * @param data * @throws Exception */ public void publish(String publicLiveTopic, String data) throws Exception{ MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setPayload(data.getBytes("UTF-8")); //QoS:发布消息的服务质量,即:保证消息传递的次数(消费者收到的次数) //0:最多一次,即:<=1;每个消息只发一次,也不会缓存下来。 //1:至少一次,即:>=1;一直发送确保消费者至少收到一次,发送失败会缓存下来。 //2:一次,即:=1 一直发送确保消费者只能收到一次;发送失败会缓存下来 。 mqttMessage.setQos(1); //消费者断开连接后是否接受离线消息 mqttMessage.setRetained(true); mqttClient.publish(publicLiveTopic,mqttMessage); logger.info("topic:{} send content {} ",publicLiveTopic,data); logger.info("topic:{} send dataSize {}kb ",publicLiveTopic,data.length()/1024.0); } /** * 断开连接 */ public void destroy() { try { this.mqttClient.disconnect(); logger.info("mqtt 手动断开连接!!!"); } catch (MqttException e) { //e.printStackTrace(); logger.error("手动断开连接报错error={}",e.getMessage()); } } /** * 设置连接 * @param args */ public static void main(String[] args) { MqttDataConnector xbMqttDataConnector =new MqttDataConnector(); Map configParams =new HashMap<>(); String host="ws://www.rfcare.cn:8083"; //String host="ssl://10.251.80.151:1883"; configParams.put("MQTT_HOST",host); configParams.put("MQTT_CLIENTID","pinan05"); configParams.put("MQTT_USERNAME","test"); configParams.put("MQTT_PASSWORD","public"); configParams.put("CLEAN_SESSION", "false"); // configParams.put("MQTT_SSLTYPE","two"); // // configParams.put("MQTT_SSLROOT_CRTPATH","./root.crt"); // configParams.put("MQTT_SSLCLIENT_CRTPATH","./client.crt"); // configParams.put("MQTT_SSLCLIENT_KEYPATH","./client.key"); // configParams.put("MQTT_SSLPASSWORD","123456"); // configParams.put("MQTT_SSLPROTOCOL","TLSv1.1"); try { xbMqttDataConnector.createIotDataSource(configParams); MqttMessageListener mqttMessageListener=new MqttMessageListener(); xbMqttDataConnector.setReSubscribe("5JPD/monitor/a1/event", mqttMessageListener); while (true){ xbMqttDataConnector.publish("5JPD/monitor/a1/event","{\n" + " \"type\": \"FallDown\",\n" + " \"msg\": {\n" + " \"ts\": 1,\n" + " \"tid\": 2,\n" + " \"conf\": 3,\n" + " \"x\": 1.2,\n" + " \"y\": 0.3,\n" + " \"z\": 3.4\n" + " }\n" + "}"); Thread.sleep(5000); } }catch ( Exception e) { e.printStackTrace(); } } }