suntianwu 3 gadi atpakaļ
vecāks
revīzija
6e554c1d0a

+ 16 - 0
pom.xml

@@ -246,6 +246,22 @@
             <artifactId>commons-beanutils</artifactId>
             <version>1.9.3</version>
         </dependency>
+        <!-- 引入MQTT -->
+        <dependency>
+            <groupId>org.eclipse.paho</groupId>
+            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
+            <version>1.2.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.bouncycastle</groupId>
+            <artifactId>bcprov-jdk16</artifactId>
+            <version>1.46</version>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+            <version>1.2.66</version>
+        </dependency>
     </dependencies>
 
     <build>

+ 6 - 0
src/main/java/com/care/CareSpringStart.java

@@ -1,5 +1,6 @@
 package com.care;
 
+import com.care.bms.mqtt.MqttInit;
 import org.mybatis.spring.annotation.MapperScan;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
@@ -35,5 +36,10 @@ public class CareSpringStart {
         factory.addConnectorCustomizers((TomcatConnectorCustomizer) connector -> connector.setProperty("relaxedQueryChars", "|{}[]"));
         return factory;
     }
+
+    @Bean
+    public MqttInit mqttInit() {
+        return new MqttInit();
+    }
 }
 

+ 228 - 0
src/main/java/com/care/bms/mqtt/MqttDataConnector.java

@@ -0,0 +1,228 @@
+package com.care.bms.mqtt;
+
+import org.apache.commons.lang3.StringUtils;
+import org.eclipse.paho.client.mqttv3.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class MqttDataConnector {
+
+
+    private static Logger logger = LoggerFactory.getLogger(MqttDataConnector.class);
+
+    private MqttClient mqttClient;
+    private MqttConnectOptions mqttConnectOptions;
+
+
+
+    /**
+     * 创建连接
+     * @param configParams
+     * @throws Exception
+     */
+    public   void  createIotDataSource(Map<String, String> configParams) throws  Exception {
+        //访问ip:port  tcp://   or  ssl://
+        String host = configParams.get("MQTT_HOST");
+        //客户端id 保持唯一
+        String clientId = configParams.get("MQTT_CLIENTID");
+        //用户名
+        String userName = configParams.get("MQTT_USERNAME");
+        //密码
+        String password = configParams.get("MQTT_PASSWORD");
+        //ssl  验证时是双向验证还是单向验证
+        String sslType=configParams.get("MQTT_SSLTYPE");
+
+
+        //缓存两种模式 存在内存 文件  设置成null 缓存在内存中 最多缓存65535条信息
+        //ScheduledExecutorService 可以设置线程池大小 默认10;发布消息方法是异步的
+        this.mqttClient = new MqttClient(host,clientId,null);//new MqttDefaultFilePersistence()
+        mqttConnectOptions = new MqttConnectOptions();
+        // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,
+        // 这里设置为true表示每次连接到服务器都以新的身份连接
+        mqttConnectOptions.setCleanSession(false);
+        // mqttConnectOptions.sto
+        // 设置超时时间 s
+        mqttConnectOptions.setConnectionTimeout(20);
+        // 设置会话心跳时间
+        mqttConnectOptions.setKeepAliveInterval(10);
+        mqttConnectOptions.setAutomaticReconnect(true);//设置自动重连
+        //setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息
+        // mqttConnectOptions.setWill("sec", "close".getBytes(), 2, true);
+        if("two".equals(sslType)){
+            //服务端证书路径
+            String rootCrtPath=configParams.get("MQTT_SSLROOT_CRTPATH");
+            //客户端证书路径
+            String clientCrtPath=configParams.get("MQTT_SSLCLIENT_CRTPATH");
+            //客户端密匙路径
+            String clientKeyPath=configParams.get("MQTT_SSLCLIENT_KEYPATH");
+            //密匙加密密码
+            String clientPassword=configParams.get("MQTT_SSLPASSWORD");
+            //ssl 协议版本 一般看mqtt服务端broker设置  不设置默认为TLSv1.1
+            String sslProtocol=configParams.get("MQTT_SSLPROTOCOL");
+            logger.info("sslProtocol======{}",sslProtocol);
+            mqttConnectOptions.setSocketFactory(SslUtil.getSocketFactory(rootCrtPath, clientCrtPath, clientKeyPath, clientPassword,sslProtocol));
+
+        }else if("one".equals(sslType)){
+            String rootCrtPath=configParams.get("MQTT_SSLROOT_CRTPATH");
+            String sslProtocol=configParams.get("MQTT_SSLPROTOCOL");
+
+            mqttConnectOptions.setSocketFactory(SslUtil.getSocketFactorySingle(rootCrtPath,sslProtocol));
+        }
+        if(StringUtils.isNotBlank(userName)){
+            mqttConnectOptions.setUserName(userName);
+        }
+        if (StringUtils.isNotBlank(password)){
+            mqttConnectOptions.setPassword(password.toCharArray());
+        }
+        //mqttConnectOptions.setWill();   //可以设置断线发送、接收消息
+        mqttClient.connect(mqttConnectOptions);
+        logger.info("mqtt 连接成功!!!");
+
+
+    }
+
+
+
+    /**
+     * 完成连接时 主要用于断开连接时 重新订阅
+     * 网上说 将 mqttConnectOptions.setCleanSession(false); 就可以继续接收,试了下没有效果,用的是开源版
+     * 的emq测试的,不知道企业版是否有效
+     *
+     * testTopic/#   #多层通配符    +单层通配符
+     * topicFilter
+     * @param topicFilter
+     * @param iotMqttMessageListener
+     * @throws Exception
+     */
+    public void setReSubscribe(String topicFilter, MqttMessageListener iotMqttMessageListener) throws Exception {
+        //先订阅一次
+        mqttClient.subscribe(topicFilter,iotMqttMessageListener);
+        //断线重连
+        mqttClient.setCallback(new MqttCallbackExtended() {
+            @Override
+            public void connectComplete(boolean reconnect, String serverURI) {
+                if(reconnect){
+                    try {
+                        mqttClient.subscribe(topicFilter,iotMqttMessageListener);
+                        logger.info("mqtt重新建立连接后,topic={} 重新订阅!",topicFilter);
+                    } catch (MqttException e) {
+                        e.printStackTrace();
+                    }
+                }
+            }
+
+            @Override
+            public void connectionLost(Throwable cause) {
+
+            }
+
+            @Override
+            public void messageArrived(String topic, MqttMessage message) throws Exception {
+
+            }
+
+            @Override
+            public void deliveryComplete(IMqttDeliveryToken token) {
+
+            }
+        });
+    }
+
+    /**
+     * 取消订阅
+     *
+     * @param topics
+     * @throws Exception
+     */
+    public void unsubscribe(String[] topics)  throws Exception{
+        mqttClient.unsubscribe(topics);
+
+
+    }
+    /**
+     * 是否处于连接状态
+     * @return
+     */
+    public boolean isConnected() {
+        return mqttClient.isConnected();
+    }
+
+
+    /**
+     * 发布数据
+     * @param
+     * @param data
+     * @throws Exception
+     */
+    public void publish( String publicLiveTopic, String data)   throws Exception{
+        MqttMessage mqttMessage = new MqttMessage();
+        mqttMessage.setPayload(data.getBytes("UTF-8"));
+        //QoS:发布消息的服务质量,即:保证消息传递的次数(消费者收到的次数)
+        //0:最多一次,即:<=1;每个消息只发一次,也不会缓存下来。
+        //1:至少一次,即:>=1;一直发送确保消费者至少收到一次,发送失败会缓存下来。
+        //2:一次,即:=1       一直发送确保消费者只能收到一次;发送失败会缓存下来 。
+        mqttMessage.setQos(1);
+        //消费者断开连接后是否接受离线消息
+        mqttMessage.setRetained(true);
+        mqttClient.publish(publicLiveTopic,mqttMessage);
+        logger.info("topic:{} send  dataSize {}kb ",publicLiveTopic,data.length()/1024.0);
+
+    }
+
+    /**
+     * 断开连接
+     */
+    public void destroy() {
+        try {
+            this.mqttClient.disconnect();
+            logger.info("mqtt 手动断开连接!!!");
+        } catch (MqttException e) {
+            //e.printStackTrace();
+            logger.error("手动断开连接报错error={}",e.getMessage());
+        }
+    }
+
+
+
+    /**
+     * 设置连接
+     * @param args
+     */
+    public static void main(String[] args) {
+        MqttDataConnector xbMqttDataConnector =new MqttDataConnector();
+        Map<String,String> configParams =new HashMap<>();
+        String host="ws://www.rfcare.cn:8083";
+        //String host="ssl://10.251.80.151:1883";
+        configParams.put("MQTT_HOST",host);
+        configParams.put("MQTT_CLIENTID","pinan05");
+        configParams.put("MQTT_USERNAME","test");
+        configParams.put("MQTT_PASSWORD","public");
+//        configParams.put("MQTT_SSLTYPE","two");
+//
+//        configParams.put("MQTT_SSLROOT_CRTPATH","./root.crt");
+//        configParams.put("MQTT_SSLCLIENT_CRTPATH","./client.crt");
+//        configParams.put("MQTT_SSLCLIENT_KEYPATH","./client.key");
+//        configParams.put("MQTT_SSLPASSWORD","123456");
+//        configParams.put("MQTT_SSLPROTOCOL","TLSv1.1");
+
+
+        try {
+
+            xbMqttDataConnector.createIotDataSource(configParams);
+            MqttMessageListener mqttMessageListener=new MqttMessageListener();
+            xbMqttDataConnector.setReSubscribe("5JPD/monitor/a1/event", new MqttMessageListener());
+
+            while (true){
+                xbMqttDataConnector.publish("5JPD/monitor/a1/event","hello mqtt!999");
+                Thread.sleep(5000);
+            }
+
+        }catch ( Exception e) {
+            e.printStackTrace();
+        }
+
+    }
+}

+ 45 - 0
src/main/java/com/care/bms/mqtt/MqttInit.java

@@ -0,0 +1,45 @@
+package com.care.bms.mqtt;
+
+
+
+
+import com.care.bms.service.BmsEventOrderService;
+
+import com.care.common.service.*;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.core.annotation.Order;
+
+
+
+@Order(1)
+public class MqttInit implements CommandLineRunner {
+
+    private static final Logger logger = LogManager.getLogger(MqttInit.class);
+
+    @Autowired
+    private CareDeviceService careDeviceService;
+    @Autowired
+    private BmsEventOrderService bmsEventOrderService;
+    @Autowired
+    private CareEventOrderService careEventOrderService;
+    @Autowired
+    private CareHouseDeviceRelService careHouseDeviceRelService;
+    @Autowired
+    private CareHouseService careHouseService;
+    @Autowired
+    private CareOrganizationService careOrganizationService;
+    @Autowired
+    private CareStationService careStationService;
+
+    @Override
+    public void run(String... args) throws Exception {
+        logger.info("初始化MQTTstart ...........................");
+        MqttTool.init(careDeviceService,bmsEventOrderService,careEventOrderService,careHouseDeviceRelService,careHouseService,careOrganizationService,careStationService);
+        logger.info("初始化MQTTend ...........................");
+    }
+
+}
+

+ 165 - 0
src/main/java/com/care/bms/mqtt/MqttMessageListener.java

@@ -0,0 +1,165 @@
+package com.care.bms.mqtt;
+
+
+import cn.hutool.core.date.DateUtil;
+
+import com.alibaba.fastjson.JSON;
+
+import java.util.Map;
+
+import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
+import com.care.bms.service.BmsEventOrderService;
+import com.care.common.entity.CareDevice;
+import com.care.common.entity.CareEventOrder;
+import com.care.common.entity.CareHouse;
+import com.care.common.entity.CareHouseDeviceRel;
+import com.care.common.enums.OrderTypeEnum;
+import com.care.common.service.*;
+import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+
+
+/**
+ * 订阅信息监听类
+ */
+public class MqttMessageListener implements IMqttMessageListener {
+    private CareDeviceService careDeviceService;
+    private BmsEventOrderService bmsEventOrderService;
+    private CareEventOrderService careEventOrderService;
+    private CareHouseDeviceRelService careHouseDeviceRelService;
+    private CareHouseService careHouseService;
+    private CareOrganizationService careOrganizationService;
+    private CareStationService careStationService;
+
+
+    public CareOrganizationService getCareOrganizationService() {
+        return careOrganizationService;
+    }
+
+    public void setCareOrganizationService(CareOrganizationService careOrganizationService) {
+        this.careOrganizationService = careOrganizationService;
+    }
+
+    public CareStationService getCareStationService() {
+        return careStationService;
+    }
+
+    public void setCareStationService(CareStationService careStationService) {
+        this.careStationService = careStationService;
+    }
+
+    public CareHouseDeviceRelService getCareHouseDeviceRelService() {
+        return careHouseDeviceRelService;
+    }
+
+    public void setCareHouseDeviceRelService(CareHouseDeviceRelService careHouseDeviceRelService) {
+        this.careHouseDeviceRelService = careHouseDeviceRelService;
+    }
+
+    public CareHouseService getCareHouseService() {
+        return careHouseService;
+    }
+
+    public void setCareHouseService(CareHouseService careHouseService) {
+        this.careHouseService = careHouseService;
+    }
+
+
+    public CareEventOrderService getCareEventOrderService() {
+        return careEventOrderService;
+    }
+
+    public void setCareEventOrderService(CareEventOrderService careEventOrderService) {
+        this.careEventOrderService = careEventOrderService;
+    }
+
+
+    public BmsEventOrderService getBmsEventOrderService() {
+        return bmsEventOrderService;
+    }
+
+    public void setBmsEventOrderService(BmsEventOrderService bmsEventOrderService) {
+        this.bmsEventOrderService = bmsEventOrderService;
+    }
+
+    public CareDeviceService getCareDeviceService() {
+        return careDeviceService;
+    }
+
+    public void setCareDeviceService(CareDeviceService careDeviceService) {
+        this.careDeviceService = careDeviceService;
+    }
+
+
+    @Override
+    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
+
+//        System.out.println(mqttMessage.getId());
+//        System.out.println(mqttMessage.getPayload());
+//        System.out.println(mqttMessage.getQos());
+//        System.out.println(mqttMessage.isRetained());
+//        System.out.println(mqttMessage.isDuplicate());
+
+        String mqttMessageStr = mqttMessage.toString();
+        System.out.println("mqttMessageStr == " + mqttMessageStr);
+
+        if(mqttMessageStr == null || !mqttMessageStr.startsWith("{")){
+            return;
+        }
+        QueryWrapper<CareDevice> queryWrapper = new QueryWrapper<>();
+        queryWrapper.lambda().eq(CareDevice::getTopic,topic);
+        if(careDeviceService == null){
+            return;
+        }
+        CareDevice careDevice = careDeviceService.getOne(queryWrapper);
+        if(careDevice == null) {
+            System.out.println("数据库中没找到对应设备");
+            return;
+        }
+        try {
+            Map map = JSON.parseObject(mqttMessageStr);
+            String type = (String) map.get("type");
+            if ("BreathHeartRate".equals(type)) {
+                Map msg = (Map) map.get("msg");
+                Integer breath = (Integer) map.get("breath");
+                Integer heart = (Integer) map.get("heart");
+                careDevice.setBreathRate(breath);
+                careDevice.setHeartRate(heart);
+                careDeviceService.updateById(careDevice);
+            } else if ("FallDown".equals(type)) {
+                saveOrder(careDevice, OrderTypeEnum.DI_EDAO.getValue());
+            } else if ("StayTooLong".equals(type)) {
+                saveOrder(careDevice, OrderTypeEnum.JIU_ZHI.getValue());
+            }
+        } catch (Exception e){
+            e.printStackTrace();
+        }
+        System.out.println(topic);
+    }
+
+    private void saveOrder(CareDevice careDevice,String orderType ){
+        QueryWrapper<CareHouseDeviceRel> queryWrapperCareHouseDeviceRel = new QueryWrapper<>();
+        queryWrapperCareHouseDeviceRel.lambda().eq(CareHouseDeviceRel::getDevId,careDevice.getId());
+        CareHouseDeviceRel careHouseDeviceRel = careHouseDeviceRelService.getOne(queryWrapperCareHouseDeviceRel);
+
+        CareHouse careHouse = careHouseService.getById(careHouseDeviceRel.getHouseId());
+
+        CareEventOrder order = new CareEventOrder();
+        order.setOrgId(careHouse.getOrgId());
+        order.setOrgName(careOrganizationService.getById(careHouse.getOrgId()).getName());
+        order.setStationId(careHouse.getStationId());
+        order.setStationName(careStationService.getById(careHouse.getStationId()).getName());
+        order.setHouseId(careHouse.getId());
+        order.setHouseName(careHouse.getName());
+        order.setTitle(careHouse.getAddr());
+        order.setOrderType(orderType);
+        order.setStatus("0");
+        order.setCreateTime(DateUtil.date());
+        order.setModifyTime(DateUtil.date());
+        careEventOrderService.save(order);
+        bmsEventOrderService.addOlderToOrderOlder(order);
+        bmsEventOrderService.addChamberlainToOrder(order);
+        bmsEventOrderService.addContactToOrderContact(order);
+    }
+
+}

+ 81 - 0
src/main/java/com/care/bms/mqtt/MqttTool.java

@@ -0,0 +1,81 @@
+package com.care.bms.mqtt;
+
+import com.care.bms.service.BmsEventOrderService;
+import com.care.common.entity.CareDevice;
+import com.care.common.service.*;
+import com.care.common.util.CommonConfUtil;
+import org.springframework.util.StringUtils;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class MqttTool {
+    private static Map<String, MqttDataConnector> mqttDataConnectorMap = new HashMap<>();
+    public static void init(CareDeviceService careDeviceService,
+                            BmsEventOrderService bmsEventOrderService,
+                            CareEventOrderService careEventOrderService,
+                            CareHouseDeviceRelService careHouseDeviceRelService,
+                            CareHouseService careHouseService,
+                            CareOrganizationService careOrganizationService,
+                            CareStationService careStationService) {
+        List<CareDevice> listCareDevice = careDeviceService.list();
+        for (CareDevice careDevice : listCareDevice) {
+            if (StringUtils.isEmpty(careDevice.getTopic()) || StringUtils.isEmpty(careDevice.getClientId())) {
+                continue;
+            }
+            MqttDataConnector xbMqttDataConnector = new MqttDataConnector();
+            Map<String, String> configParams = new HashMap<>();
+            String host = "ws://" + CommonConfUtil.getConf("mqtt.broker") + ":" + CommonConfUtil.getConf("mqtt.port");
+            String cliendId = careDevice.getClientId();
+            String username = CommonConfUtil.getConf("mqtt.username");
+            String password = CommonConfUtil.getConf("mqtt.password");
+            String topic = careDevice.getTopic();
+            configParams.put("MQTT_HOST", host);
+            configParams.put("MQTT_CLIENTID", cliendId);
+            configParams.put("MQTT_USERNAME", username);
+            configParams.put("MQTT_PASSWORD", password);
+            try {
+                xbMqttDataConnector.createIotDataSource(configParams);
+                MqttMessageListener mqttMessageListener = new MqttMessageListener();
+                mqttMessageListener.setCareDeviceService(careDeviceService);
+                mqttMessageListener.setBmsEventOrderService(bmsEventOrderService);
+                mqttMessageListener.setCareEventOrderService(careEventOrderService);
+                mqttMessageListener.setCareHouseDeviceRelService(careHouseDeviceRelService);
+                mqttMessageListener.setCareHouseService(careHouseService);
+                mqttMessageListener.setCareOrganizationService(careOrganizationService);
+                mqttMessageListener.setCareStationService(careStationService);
+
+                xbMqttDataConnector.setReSubscribe(topic, mqttMessageListener);
+                mqttDataConnectorMap.put(cliendId, xbMqttDataConnector);
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    /**
+     * 当设备表变更时需要重新初始化
+     * @param careDeviceService
+     * @param bmsEventOrderService
+     * @param careEventOrderService
+     * @param careHouseDeviceRelService
+     * @param careHouseService
+     * @param careOrganizationService
+     * @param careStationService
+     */
+    public static void reInit(CareDeviceService careDeviceService,
+                              BmsEventOrderService bmsEventOrderService,
+                              CareEventOrderService careEventOrderService,
+                              CareHouseDeviceRelService careHouseDeviceRelService,
+                              CareHouseService careHouseService,
+                              CareOrganizationService careOrganizationService,
+                              CareStationService careStationService) {
+        mqttDataConnectorMap.forEach((key, value) -> {
+            value.destroy();
+            mqttDataConnectorMap.remove(key);
+        });
+        init(careDeviceService,bmsEventOrderService,careEventOrderService,careHouseDeviceRelService,careHouseService,careOrganizationService,careStationService);
+    }
+
+}

+ 97 - 0
src/main/java/com/care/bms/mqtt/SslUtil.java

@@ -0,0 +1,97 @@
+package com.care.bms.mqtt;
+
+import org.apache.commons.lang3.StringUtils;
+import org.bouncycastle.jce.provider.BouncyCastleProvider;
+import org.bouncycastle.openssl.PEMReader;
+import org.bouncycastle.openssl.PasswordFinder;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLSocketFactory;
+import javax.net.ssl.TrustManagerFactory;
+import java.io.ByteArrayInputStream;
+import java.io.InputStreamReader;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.security.KeyPair;
+import java.security.KeyStore;
+import java.security.SecureRandom;
+import java.security.Security;
+import java.security.cert.X509Certificate;
+
+public class SslUtil {
+    public static SSLSocketFactory getSocketFactory(final String caCrtFile, final String crtFile, final String keyFile,
+                                                    final String password, String protocal) throws Exception {
+        Security.addProvider(new BouncyCastleProvider());
+        // load CA certificate
+        PEMReader reader = new PEMReader(new InputStreamReader(new ByteArrayInputStream(Files.readAllBytes(Paths.get(caCrtFile)))));
+        X509Certificate caCert = (X509Certificate)reader.readObject();
+        reader.close();
+
+        // load client certificate
+        reader = new PEMReader(new InputStreamReader(new ByteArrayInputStream(Files.readAllBytes(Paths.get(crtFile)))));
+        X509Certificate cert = (X509Certificate)reader.readObject();
+        reader.close();
+
+        // load client private key
+        reader = new PEMReader(
+                new InputStreamReader(new ByteArrayInputStream(Files.readAllBytes(Paths.get(keyFile)))),
+                new PasswordFinder() {
+                    @Override
+                    public char[] getPassword() {
+                        return password.toCharArray();
+                    }
+                }
+        );
+        KeyPair key = (KeyPair)reader.readObject();
+        reader.close();
+
+        // CA certificate is used to authenticate server
+        KeyStore caKs = KeyStore.getInstance(KeyStore.getDefaultType());
+        caKs.load(null, null);
+        caKs.setCertificateEntry("ca-certificate", caCert);
+        TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+        tmf.init(caKs);
+
+        // client key and certificates are sent to server so it can authenticate us
+        KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
+        ks.load(null, null);
+        ks.setCertificateEntry("certificate", cert);
+        ks.setKeyEntry("private-key", key.getPrivate(), password.toCharArray(), new java.security.cert.Certificate[]{cert});
+        KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+        kmf.init(ks, password.toCharArray());
+
+        // 空时 默认
+        if(StringUtils.isBlank(protocal)){
+            protocal= "TLSv1.1";
+        }
+
+        SSLContext context = SSLContext.getInstance(protocal);//"TLSv1.1"
+        context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
+
+        return context.getSocketFactory();
+    }
+
+
+    public static SSLSocketFactory getSocketFactorySingle(final String caCrtFile, String protocol) throws Exception {
+        Security.addProvider(new BouncyCastleProvider());
+
+        // load CA certificate
+        PEMReader reader = new PEMReader(new InputStreamReader(new ByteArrayInputStream(Files.readAllBytes(Paths.get(caCrtFile)))));
+        X509Certificate caCert = (X509Certificate)reader.readObject();
+        reader.close();
+        // client key and certificates are sent to server so it can authenticate us
+        KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());//"JKS"
+        ks.load(null, null);
+        ks.setCertificateEntry("ca-certificate", caCert);
+        TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());//"PKIX"
+        tmf.init(ks);
+        // finally, create SSL socket factory
+        if(StringUtils.isBlank(protocol)){
+            protocol= "TLSv1.1";
+        }
+        SSLContext context = SSLContext.getInstance(protocol);//"TLSv1.1"
+        context.init(null, tmf.getTrustManagers(), new SecureRandom());
+        return context.getSocketFactory();
+    }
+
+}

+ 6 - 0
src/main/java/com/care/common/entity/CareDevice.java

@@ -91,5 +91,11 @@ public class CareDevice implements Serializable {
     @TableField("HEART_RATE")
     private Integer heartRate;
 
+    @ApiModelProperty("设备mqtt消息topic")
+    @TableField("topic")
+    private String topic;
 
+    @ApiModelProperty("设备mqtt消息客户端ID")
+    @TableField("client_id")
+    private String clientId;
 }

+ 7 - 1
src/main/resources/common.properties

@@ -6,4 +6,10 @@ auth.jwt.secret=hong1mu2zhi3ruan4jian5
 auth.jwt.ttl.ms=86400000
 
 ############# 考拉外呼录音文件地址 #####################
-koala.mp3.url.host=http://49.235.128.177:8080/home/sharedfs/records
+koala.mp3.url.host=http://49.235.128.177:8080/home/sharedfs/records
+
+############# 设备MQTT消息配置 #####################
+mqtt.broker=www.rfcare.cn
+mqtt.port=8083
+mqtt.username=test
+mqtt.password=public