12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394 |
- package com.care.mqtt.service;
- import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
- import com.care.mqtt.tool.MqttDataConnector;
- import com.care.mqtt.tool.MqttMessageListener;
- import com.care.mqtt.tool.MqttDataConnectorPool;
- import com.care.common.entity.*;
- import com.care.common.enums.*;
- import com.care.common.service.*;
- import com.care.common.util.CommonConfUtil;
- import org.springframework.stereotype.Service;
- import org.springframework.util.StringUtils;
- import javax.annotation.Resource;
- import java.util.*;
- /**
- * @Author: stw
- * @Date: 2021/8/14
- * @Desc:
- */
- @Service
- public class MqttConnectorPoolService {
- @Resource
- private CareDeviceService careDeviceService;
- @Resource
- private MqttMsgService mqttMsgService;
- public void init() {
- QueryWrapper<CareDevice> queryWrapper = new QueryWrapper();
- queryWrapper.lambda().eq(CareDevice::getActiveStatus, DeviceActiveStatusEnum.ACTIVE.getValue());
- List<CareDevice> listCareDevice = careDeviceService.list(queryWrapper);
- for (CareDevice careDevice : listCareDevice) {
- addOne(careDevice);
- }
- }
- /**
- * 添加一个设备到服务
- */
- public MqttDataConnector addOne(CareDevice careDevice) {
- if (StringUtils.isEmpty(careDevice.getTopic()) || StringUtils.isEmpty(careDevice.getClientId())) {
- return null;
- }
- String cliendId = careDevice.getClientId();
- MqttDataConnector xbMqttDataConnector = MqttDataConnectorPool.getMqttDataConnectorMap().get(cliendId);
- if ( xbMqttDataConnector != null) {
- return xbMqttDataConnector;
- }
- xbMqttDataConnector = new MqttDataConnector();
- Map<String, String> configParams = new HashMap<>();
- String host = "ws://" + CommonConfUtil.getConf("mqtt.broker") + ":" + CommonConfUtil.getConf("mqtt.port");
- String username = CommonConfUtil.getConf("mqtt.username");
- String password = CommonConfUtil.getConf("mqtt.password");
- String topic = careDevice.getTopic();
- configParams.put("MQTT_HOST", host);
- configParams.put("MQTT_CLIENTID", cliendId);
- configParams.put("MQTT_USERNAME", username);
- configParams.put("MQTT_PASSWORD", password);
- try {
- xbMqttDataConnector.createIotDataSource(configParams);
- MqttMessageListener mqttMessageListener = new MqttMessageListener();
- mqttMessageListener.setMqttMsgService(mqttMsgService);
- xbMqttDataConnector.setReSubscribe(topic, mqttMessageListener);
- MqttDataConnectorPool.getMqttDataConnectorMap().put(cliendId, xbMqttDataConnector);
- return xbMqttDataConnector;
- } catch (Exception e) {
- e.printStackTrace();
- }
- return null;
- }
- /**
- * 从服务中去除一个设备
- */
- public void removeOne(CareDevice careDevice) {
- String clientId = careDevice.getClientId();
- if (StringUtils.isEmpty(clientId)) {
- return;
- }
- if (MqttDataConnectorPool.getMqttDataConnectorMap().get(clientId) != null) {
- MqttDataConnectorPool.getMqttDataConnectorMap().get(clientId).destroy();
- MqttDataConnectorPool.getMqttDataConnectorMap().remove(clientId);
- }
- }
- }
|