package com.care.mqtt.service; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.care.mqtt.tool.MqttDataConnector; import com.care.mqtt.tool.MqttMessageListener; import com.care.mqtt.tool.MqttDataConnectorPool; import com.care.common.entity.*; import com.care.common.enums.*; import com.care.common.service.*; import com.care.common.util.CommonConfUtil; import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; import javax.annotation.Resource; import java.util.*; /** * @Author: stw * @Date: 2021/8/14 * @Desc: */ @Service public class MqttConnectorPoolService { @Resource private CareDeviceService careDeviceService; @Resource private MqttMsgService mqttMsgService; public void init() { QueryWrapper queryWrapper = new QueryWrapper(); queryWrapper.lambda().eq(CareDevice::getActiveStatus, DeviceActiveStatusEnum.ACTIVE.getValue()); List listCareDevice = careDeviceService.list(queryWrapper); for (CareDevice careDevice : listCareDevice) { addOne(careDevice); } } /** * 添加一个设备到服务 */ public MqttDataConnector addOne(CareDevice careDevice) { if (StringUtils.isEmpty(careDevice.getTopic()) || StringUtils.isEmpty(careDevice.getClientId())) { return null; } String cliendId = careDevice.getClientId(); MqttDataConnector xbMqttDataConnector = MqttDataConnectorPool.getMqttDataConnectorMap().get(cliendId); if ( xbMqttDataConnector != null) { return xbMqttDataConnector; } xbMqttDataConnector = new MqttDataConnector(); Map configParams = new HashMap<>(); String host = "ws://" + CommonConfUtil.getConf("mqtt.broker") + ":" + CommonConfUtil.getConf("mqtt.port"); String username = CommonConfUtil.getConf("mqtt.username"); String password = CommonConfUtil.getConf("mqtt.password"); String topic = careDevice.getTopic(); configParams.put("MQTT_HOST", host); configParams.put("MQTT_CLIENTID", cliendId); configParams.put("MQTT_USERNAME", username); configParams.put("MQTT_PASSWORD", password); try { xbMqttDataConnector.createIotDataSource(configParams); MqttMessageListener mqttMessageListener = new MqttMessageListener(); mqttMessageListener.setMqttMsgService(mqttMsgService); xbMqttDataConnector.setReSubscribe(topic, mqttMessageListener); MqttDataConnectorPool.getMqttDataConnectorMap().put(cliendId, xbMqttDataConnector); return xbMqttDataConnector; } catch (Exception e) { e.printStackTrace(); } return null; } /** * 从服务中去除一个设备 */ public void removeOne(CareDevice careDevice) { String clientId = careDevice.getClientId(); if (StringUtils.isEmpty(clientId)) { return; } if (MqttDataConnectorPool.getMqttDataConnectorMap().get(clientId) != null) { MqttDataConnectorPool.getMqttDataConnectorMap().get(clientId).destroy(); MqttDataConnectorPool.getMqttDataConnectorMap().remove(clientId); } } }