MqttConnectorPoolService.java 3.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. package com.care.mqtt.service;
  2. import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
  3. import com.care.mqtt.tool.MqttDataConnector;
  4. import com.care.mqtt.tool.MqttMessageListener;
  5. import com.care.mqtt.tool.MqttDataConnectorCache;
  6. import com.care.common.entity.*;
  7. import com.care.common.enums.*;
  8. import com.care.common.service.*;
  9. import com.care.common.util.CommonConfUtil;
  10. import org.springframework.stereotype.Service;
  11. import org.springframework.util.StringUtils;
  12. import javax.annotation.Resource;
  13. import java.util.*;
  14. /**
  15. * @Author: stw
  16. * @Date: 2021/8/14
  17. * @Desc:
  18. */
  19. @Service
  20. public class MqttConnectorPoolService {
  21. @Resource
  22. private CareDeviceService careDeviceService;
  23. @Resource
  24. private MqttMsgService mqttMsgService;
  25. public void init() {
  26. QueryWrapper<CareDevice> queryWrapper = new QueryWrapper();
  27. queryWrapper.lambda().eq(CareDevice::getActiveStatus, DeviceActiveStatusEnum.ACTIVE.getValue());
  28. List<CareDevice> listCareDevice = careDeviceService.list(queryWrapper);
  29. for (CareDevice careDevice : listCareDevice) {
  30. addOne(careDevice);
  31. }
  32. }
  33. /**
  34. * 添加一个设备到服务
  35. */
  36. public MqttDataConnector addOne(CareDevice careDevice) {
  37. if (StringUtils.isEmpty(careDevice.getTopic()) || StringUtils.isEmpty(careDevice.getClientId())) {
  38. return null;
  39. }
  40. String cliendId = careDevice.getClientId();
  41. MqttDataConnector xbMqttDataConnector = MqttDataConnectorCache.getMqttDataConnectorMap().get(cliendId);
  42. if ( xbMqttDataConnector != null) {
  43. return xbMqttDataConnector;
  44. }
  45. xbMqttDataConnector = new MqttDataConnector();
  46. Map<String, String> configParams = new HashMap<>();
  47. String host = "ws://" + CommonConfUtil.getConf("mqtt.broker") + ":" + CommonConfUtil.getConf("mqtt.port");
  48. String username = CommonConfUtil.getConf("mqtt.username");
  49. String password = CommonConfUtil.getConf("mqtt.password");
  50. String topic = careDevice.getTopic();
  51. configParams.put("MQTT_HOST", host);
  52. configParams.put("MQTT_CLIENTID", cliendId);
  53. configParams.put("MQTT_USERNAME", username);
  54. configParams.put("MQTT_PASSWORD", password);
  55. configParams.put("CLEAN_SESSION", "false");
  56. try {
  57. xbMqttDataConnector.createIotDataSource(configParams);
  58. MqttMessageListener mqttMessageListener = new MqttMessageListener();
  59. mqttMessageListener.setMqttMsgService(mqttMsgService);
  60. xbMqttDataConnector.setReSubscribe(topic, mqttMessageListener);
  61. MqttDataConnectorCache.getMqttDataConnectorMap().put(cliendId, xbMqttDataConnector);
  62. return xbMqttDataConnector;
  63. } catch (Exception e) {
  64. e.printStackTrace();
  65. }
  66. return null;
  67. }
  68. /**
  69. * 从服务中去除一个设备
  70. */
  71. public void removeOne(CareDevice careDevice) {
  72. String clientId = careDevice.getClientId();
  73. if (StringUtils.isEmpty(clientId)) {
  74. return;
  75. }
  76. if (MqttDataConnectorCache.getMqttDataConnectorMap().get(clientId) != null) {
  77. MqttDataConnectorCache.getMqttDataConnectorMap().get(clientId).destroy();
  78. MqttDataConnectorCache.getMqttDataConnectorMap().remove(clientId);
  79. }
  80. }
  81. }