suntianwu 3 years ago
parent
commit
698956992c
1 changed files with 62 additions and 30 deletions
  1. 62 30
      src/main/java/com/care/bms/mqtt/MqttTool.java

+ 62 - 30
src/main/java/com/care/bms/mqtt/MqttTool.java

@@ -20,40 +20,12 @@ public class MqttTool {
                             CareStationService careStationService) {
         List<CareDevice> listCareDevice = careDeviceService.list();
         for (CareDevice careDevice : listCareDevice) {
-            if (StringUtils.isEmpty(careDevice.getTopic()) || StringUtils.isEmpty(careDevice.getClientId())) {
-                continue;
-            }
-            MqttDataConnector xbMqttDataConnector = new MqttDataConnector();
-            Map<String, String> configParams = new HashMap<>();
-            String host = "ws://" + CommonConfUtil.getConf("mqtt.broker") + ":" + CommonConfUtil.getConf("mqtt.port");
-            String cliendId = careDevice.getClientId();
-            String username = CommonConfUtil.getConf("mqtt.username");
-            String password = CommonConfUtil.getConf("mqtt.password");
-            String topic = careDevice.getTopic();
-            configParams.put("MQTT_HOST", host);
-            configParams.put("MQTT_CLIENTID", cliendId);
-            configParams.put("MQTT_USERNAME", username);
-            configParams.put("MQTT_PASSWORD", password);
-            try {
-                xbMqttDataConnector.createIotDataSource(configParams);
-                MqttMessageListener mqttMessageListener = new MqttMessageListener();
-                mqttMessageListener.setCareDeviceService(careDeviceService);
-                mqttMessageListener.setBmsEventOrderService(bmsEventOrderService);
-                mqttMessageListener.setCareEventOrderService(careEventOrderService);
-                mqttMessageListener.setCareHouseService(careHouseService);
-                mqttMessageListener.setCareOrganizationService(careOrganizationService);
-                mqttMessageListener.setCareStationService(careStationService);
-
-                xbMqttDataConnector.setReSubscribe(topic, mqttMessageListener);
-                mqttDataConnectorMap.put(cliendId, xbMqttDataConnector);
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
+            addOne(careDevice,careDeviceService,bmsEventOrderService,careEventOrderService,careHouseService,careOrganizationService,careStationService);
         }
     }
 
     /**
-     * 当设备表变更时需要重新初始化
+     * 重新初始化
      * @param careDeviceService
      * @param bmsEventOrderService
      * @param careEventOrderService
@@ -74,4 +46,64 @@ public class MqttTool {
         init(careDeviceService,bmsEventOrderService,careEventOrderService,careHouseService,careOrganizationService,careStationService);
     }
 
+    /**
+     * 添加一个设备到服务
+     * @param careDeviceService
+     * @param bmsEventOrderService
+     * @param careEventOrderService
+     * @param careHouseService
+     * @param careOrganizationService
+     * @param careStationService
+     */
+    public static void addOne(CareDevice careDevice,
+                              CareDeviceService careDeviceService,
+                              BmsEventOrderService bmsEventOrderService,
+                              CareEventOrderService careEventOrderService,
+                              CareHouseService careHouseService,
+                              CareOrganizationService careOrganizationService,
+                              CareStationService careStationService) {
+        if (StringUtils.isEmpty(careDevice.getTopic()) || StringUtils.isEmpty(careDevice.getClientId())) {
+            return;
+        }
+        MqttDataConnector xbMqttDataConnector = new MqttDataConnector();
+        Map<String, String> configParams = new HashMap<>();
+        String host = "ws://" + CommonConfUtil.getConf("mqtt.broker") + ":" + CommonConfUtil.getConf("mqtt.port");
+        String cliendId = careDevice.getClientId();
+        String username = CommonConfUtil.getConf("mqtt.username");
+        String password = CommonConfUtil.getConf("mqtt.password");
+        String topic = careDevice.getTopic();
+        configParams.put("MQTT_HOST", host);
+        configParams.put("MQTT_CLIENTID", cliendId);
+        configParams.put("MQTT_USERNAME", username);
+        configParams.put("MQTT_PASSWORD", password);
+        try {
+            xbMqttDataConnector.createIotDataSource(configParams);
+            MqttMessageListener mqttMessageListener = new MqttMessageListener();
+            mqttMessageListener.setCareDeviceService(careDeviceService);
+            mqttMessageListener.setBmsEventOrderService(bmsEventOrderService);
+            mqttMessageListener.setCareEventOrderService(careEventOrderService);
+            mqttMessageListener.setCareHouseService(careHouseService);
+            mqttMessageListener.setCareOrganizationService(careOrganizationService);
+            mqttMessageListener.setCareStationService(careStationService);
+
+            xbMqttDataConnector.setReSubscribe(topic, mqttMessageListener);
+            mqttDataConnectorMap.put(cliendId, xbMqttDataConnector);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * 从服务中去除一个设备
+     */
+    public static void removeOne(CareDevice careDevice) {
+        String clientId = careDevice.getClientId();
+        if (StringUtils.isEmpty(clientId)) {
+            return;
+        }
+        if (mqttDataConnectorMap.get(clientId) != null) {
+            mqttDataConnectorMap.get(clientId).destroy();
+            mqttDataConnectorMap.remove(clientId);
+        }
+    }
 }