123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230 |
- package com.care.mqtt.tool;
- import org.apache.commons.lang3.StringUtils;
- import org.eclipse.paho.client.mqttv3.*;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import java.util.HashMap;
- import java.util.Map;
- public class MqttDataConnector {
- private static Logger logger = LoggerFactory.getLogger(MqttDataConnector.class);
- private MqttClient mqttClient;
- private MqttConnectOptions mqttConnectOptions;
- /**
- * 创建连接
- * @param configParams
- * @throws Exception
- */
- public void createIotDataSource(Map<String, String> configParams) throws Exception {
- //访问ip:port tcp:// or ssl://
- String host = configParams.get("MQTT_HOST");
- //客户端id 保持唯一
- String clientId = configParams.get("MQTT_CLIENTID");
- //用户名
- String userName = configParams.get("MQTT_USERNAME");
- //密码
- String password = configParams.get("MQTT_PASSWORD");
- //ssl 验证时是双向验证还是单向验证
- String sslType = configParams.get("MQTT_SSLTYPE");
- String cleanSession = configParams.get("CLEAN_SESSION");
- //缓存两种模式 存在内存 文件 设置成null 缓存在内存中 最多缓存65535条信息
- //ScheduledExecutorService 可以设置线程池大小 默认10;发布消息方法是异步的
- this.mqttClient = new MqttClient(host,clientId,null);//new MqttDefaultFilePersistence()
- mqttConnectOptions = new MqttConnectOptions();
- // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,
- // 这里设置为true表示每次连接到服务器都以新的身份连接
- mqttConnectOptions.setCleanSession(Boolean.parseBoolean(cleanSession));
- // mqttConnectOptions.sto
- // 设置超时时间 s
- mqttConnectOptions.setConnectionTimeout(30);
- // 设置会话心跳时间
- mqttConnectOptions.setKeepAliveInterval(60);
- mqttConnectOptions.setAutomaticReconnect(true);//设置自动重连
- //setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息
- // mqttConnectOptions.setWill("sec", "close".getBytes(), 2, true);
- if("two".equals(sslType)){
- //服务端证书路径
- String rootCrtPath = configParams.get("MQTT_SSLROOT_CRTPATH");
- //客户端证书路径
- String clientCrtPath = configParams.get("MQTT_SSLCLIENT_CRTPATH");
- //客户端密匙路径
- String clientKeyPath = configParams.get("MQTT_SSLCLIENT_KEYPATH");
- //密匙加密密码
- String clientPassword = configParams.get("MQTT_SSLPASSWORD");
- //ssl 协议版本 一般看mqtt服务端broker设置 不设置默认为TLSv1.1
- String sslProtocol = configParams.get("MQTT_SSLPROTOCOL");
- logger.info("sslProtocol======{}",sslProtocol);
- mqttConnectOptions.setSocketFactory(SslUtil.getSocketFactory(rootCrtPath, clientCrtPath, clientKeyPath, clientPassword,sslProtocol));
- }else if("one".equals(sslType)){
- String rootCrtPath = configParams.get("MQTT_SSLROOT_CRTPATH");
- String sslProtocol = configParams.get("MQTT_SSLPROTOCOL");
- mqttConnectOptions.setSocketFactory(SslUtil.getSocketFactorySingle(rootCrtPath,sslProtocol));
- }
- if(StringUtils.isNotBlank(userName)){
- mqttConnectOptions.setUserName(userName);
- }
- if (StringUtils.isNotBlank(password)){
- mqttConnectOptions.setPassword(password.toCharArray());
- }
- //mqttConnectOptions.setWill(); //可以设置断线发送、接收消息
- mqttClient.connect(mqttConnectOptions);
- logger.info("mqtt 连接成功!!!");
- }
- /**
- *
- * testTopic/# #多层通配符 +单层通配符
- * topicFilter
- * @param topicFilter
- * @param iotMqttMessageListener
- * @throws Exception
- */
- public void setReSubscribe(String topicFilter, MqttMessageListener iotMqttMessageListener) throws Exception {
- int qos = 2;//0:最多一次 、1:最少一次 、2:只有一次
- mqttClient.subscribe(topicFilter,qos, iotMqttMessageListener);
- mqttClient.setCallback(new MqttCallbackExtended() {
- @Override
- public void connectComplete(boolean reconnect, String serverURI) {
- logger.info("reconnect ---> " + reconnect + " serverURI--->" + serverURI);
- }
- @Override
- public void connectionLost(Throwable cause) {
- logger.error("connectionLost cause ---> " + cause);
- }
- @Override
- public void messageArrived(String topic, MqttMessage message) throws Exception {
- }
- @Override
- public void deliveryComplete(IMqttDeliveryToken token) {
- }
- });
- }
- /**
- * 取消订阅
- *
- * @param topics
- * @throws Exception
- */
- public void unsubscribe(String[] topics) throws Exception{
- mqttClient.unsubscribe(topics);
- }
- /**
- * 是否处于连接状态
- * @return
- */
- public boolean isConnected() {
- return mqttClient.isConnected();
- }
- /**
- * 发布数据
- * @param
- * @param data
- * @throws Exception
- */
- public void publish(String publicLiveTopic, String data) throws Exception{
- MqttMessage mqttMessage = new MqttMessage();
- mqttMessage.setPayload(data.getBytes("UTF-8"));
- //QoS:发布消息的服务质量,即:保证消息传递的次数(消费者收到的次数)
- //0:最多一次,即:<=1;每个消息只发一次,也不会缓存下来。
- //1:至少一次,即:>=1;一直发送确保消费者至少收到一次,发送失败会缓存下来。
- //2:一次,即:=1 一直发送确保消费者只能收到一次;发送失败会缓存下来 。
- mqttMessage.setQos(1);
- //消费者断开连接后是否接受离线消息
- mqttMessage.setRetained(true);
- mqttClient.publish(publicLiveTopic,mqttMessage);
- logger.info("topic:{} send content {} ",publicLiveTopic,data);
- logger.info("topic:{} send dataSize {}kb ",publicLiveTopic,data.length()/1024.0);
- }
- /**
- * 断开连接
- */
- public void destroy() {
- try {
- this.mqttClient.disconnect();
- logger.info("mqtt 手动断开连接!!!");
- } catch (MqttException e) {
- //e.printStackTrace();
- logger.error("手动断开连接报错error={}",e.getMessage());
- }
- }
- /**
- * 设置连接
- * @param args
- */
- public static void main(String[] args) {
- MqttDataConnector xbMqttDataConnector =new MqttDataConnector();
- Map<String,String> configParams =new HashMap<>();
- String host="ws://www.rfcare.cn:8083";
- //String host="ssl://10.251.80.151:1883";
- configParams.put("MQTT_HOST",host);
- configParams.put("MQTT_CLIENTID","pinan05");
- configParams.put("MQTT_USERNAME","test");
- configParams.put("MQTT_PASSWORD","public");
- configParams.put("CLEAN_SESSION", "false");
- // configParams.put("MQTT_SSLTYPE","two");
- //
- // configParams.put("MQTT_SSLROOT_CRTPATH","./root.crt");
- // configParams.put("MQTT_SSLCLIENT_CRTPATH","./client.crt");
- // configParams.put("MQTT_SSLCLIENT_KEYPATH","./client.key");
- // configParams.put("MQTT_SSLPASSWORD","123456");
- // configParams.put("MQTT_SSLPROTOCOL","TLSv1.1");
- try {
- xbMqttDataConnector.createIotDataSource(configParams);
- MqttMessageListener mqttMessageListener=new MqttMessageListener();
- xbMqttDataConnector.setReSubscribe("5JPD/monitor/a1/event", mqttMessageListener);
- while (true){
- xbMqttDataConnector.publish("5JPD/monitor/a1/event","{\n" +
- " \"type\": \"FallDown\",\n" +
- " \"msg\": {\n" +
- " \"ts\": 1,\n" +
- " \"tid\": 2,\n" +
- " \"conf\": 3,\n" +
- " \"x\": 1.2,\n" +
- " \"y\": 0.3,\n" +
- " \"z\": 3.4\n" +
- " }\n" +
- "}");
- Thread.sleep(5000);
- }
- }catch ( Exception e) {
- e.printStackTrace();
- }
- }
- }
|