MqttConnectorPoolService.java 3.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. package com.care.mqtt.service;
  2. import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
  3. import com.care.mqtt.tool.MqttDataConnector;
  4. import com.care.mqtt.tool.MqttMessageListener;
  5. import com.care.mqtt.tool.MqttDataConnectorPool;
  6. import com.care.common.entity.*;
  7. import com.care.common.enums.*;
  8. import com.care.common.service.*;
  9. import com.care.common.util.CommonConfUtil;
  10. import org.springframework.stereotype.Service;
  11. import org.springframework.util.StringUtils;
  12. import javax.annotation.Resource;
  13. import java.util.*;
  14. /**
  15. * @Author: stw
  16. * @Date: 2021/8/14
  17. * @Desc:
  18. */
  19. @Service
  20. public class MqttConnectorPoolService {
  21. @Resource
  22. private CareDeviceService careDeviceService;
  23. @Resource
  24. private MqttMsgService mqttMsgService;
  25. public void init() {
  26. QueryWrapper<CareDevice> queryWrapper = new QueryWrapper();
  27. queryWrapper.lambda().eq(CareDevice::getActiveStatus, DeviceActiveStatusEnum.ACTIVE.getValue());
  28. List<CareDevice> listCareDevice = careDeviceService.list(queryWrapper);
  29. for (CareDevice careDevice : listCareDevice) {
  30. addOne(careDevice);
  31. }
  32. }
  33. /**
  34. * 添加一个设备到服务
  35. */
  36. public MqttDataConnector addOne(CareDevice careDevice) {
  37. if (StringUtils.isEmpty(careDevice.getTopic()) || StringUtils.isEmpty(careDevice.getClientId())) {
  38. return null;
  39. }
  40. String cliendId = careDevice.getClientId();
  41. MqttDataConnector xbMqttDataConnector = MqttDataConnectorPool.getMqttDataConnectorMap().get(cliendId);
  42. if ( xbMqttDataConnector != null) {
  43. return xbMqttDataConnector;
  44. }
  45. xbMqttDataConnector = new MqttDataConnector();
  46. Map<String, String> configParams = new HashMap<>();
  47. String host = "ws://" + CommonConfUtil.getConf("mqtt.broker") + ":" + CommonConfUtil.getConf("mqtt.port");
  48. String username = CommonConfUtil.getConf("mqtt.username");
  49. String password = CommonConfUtil.getConf("mqtt.password");
  50. String topic = careDevice.getTopic();
  51. configParams.put("MQTT_HOST", host);
  52. configParams.put("MQTT_CLIENTID", cliendId);
  53. configParams.put("MQTT_USERNAME", username);
  54. configParams.put("MQTT_PASSWORD", password);
  55. try {
  56. xbMqttDataConnector.createIotDataSource(configParams);
  57. MqttMessageListener mqttMessageListener = new MqttMessageListener();
  58. mqttMessageListener.setMqttMsgService(mqttMsgService);
  59. xbMqttDataConnector.setReSubscribe(topic, mqttMessageListener);
  60. MqttDataConnectorPool.getMqttDataConnectorMap().put(cliendId, xbMqttDataConnector);
  61. return xbMqttDataConnector;
  62. } catch (Exception e) {
  63. e.printStackTrace();
  64. }
  65. return null;
  66. }
  67. /**
  68. * 从服务中去除一个设备
  69. */
  70. public void removeOne(CareDevice careDevice) {
  71. String clientId = careDevice.getClientId();
  72. if (StringUtils.isEmpty(clientId)) {
  73. return;
  74. }
  75. if (MqttDataConnectorPool.getMqttDataConnectorMap().get(clientId) != null) {
  76. MqttDataConnectorPool.getMqttDataConnectorMap().get(clientId).destroy();
  77. MqttDataConnectorPool.getMqttDataConnectorMap().remove(clientId);
  78. }
  79. }
  80. }