package com.care.mqtt.service;

import cn.hutool.core.bean.BeanUtil;
import com.care.common.cache.RedisUtil;
import com.care.common.constant.Constants;
import com.care.common.entity.CareDevice;
import com.care.common.util.CommonConfUtil;
import com.care.mqtt.tool.MqttDataConnector;
import com.care.mqtt.tool.MqttDataConnectorPool;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;

import javax.annotation.Resource;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * @Author: stw
 * @Date: 2021/8/14
 * @Desc: Publishes a payload to a device's control topic and records a
 *        timeout marker in Redis for the request.
 */
@Service
public class MqttPublishService {

    private static final Logger LOGGER = Logger.getLogger(MqttPublishService.class.getName());

    /** Configuration key holding the timeout passed to {@code redisUtil.set}. */
    private static final String TIMEOUT_CONF_KEY = "mqtt.dev.jihuo.timeout";

    /** Suffix appended to the device's client id to form the publishing client id. */
    private static final String PUBLISH_CLIENT_SUFFIX = "_publish";

    @Resource
    private MqttConnectorPoolService mqttConnectorPoolService;

    @Resource
    private RedisUtil redisUtil;

    /**
     * Publishes {@code data} to the control topic derived from the device's topic.
     *
     * <p>The device is copied first, so the caller's {@code careDevice} is never
     * modified. On the copy, every {@code "event"} occurrence in the topic is
     * replaced with {@code "control"} and {@code "_publish"} is appended to the
     * client id; a connector for that copy is obtained from the pool and used to
     * publish. Afterwards an empty value is stored in Redis under
     * {@code MQTT_DEV_JIHUO_TIMEOUT_KEY:devCode:roomType} with the configured
     * timeout, so that a missing reply can be detected once the marker expires.
     *
     * @param careDevice device whose topic and client id are the basis for publishing
     * @param data       payload to publish
     * @param devCode    device code, used as part of the Redis timeout key
     * @param roomType   room type, used as part of the Redis timeout key
     * @return {@code true} if the message was published and the timeout marker was
     *         written; {@code false} if the device, its topic or its client id is
     *         missing, no connector is available, or any step throws
     */
    public boolean publish(CareDevice careDevice, String data, String devCode, String roomType) {
        if (careDevice == null) {
            return false;
        }
        try {
            // Work on a copy so the caller's device keeps its original topic/client id.
            CareDevice publishDevice = new CareDevice();
            BeanUtil.copyProperties(careDevice, publishDevice);

            String topic = publishDevice.getTopic();
            String clientId = publishDevice.getClientId();
            if (!StringUtils.hasLength(topic) || !StringUtils.hasLength(clientId)) {
                return false;
            }

            // Resolve the timeout BEFORE publishing: a missing or malformed config
            // value must not leave a message sent but reported as failed.
            // NOTE(review): the unit of this value is defined by RedisUtil.set - confirm.
            Long timeout = Long.parseLong(CommonConfUtil.getConf(TIMEOUT_CONF_KEY));

            String publishTopic = topic.replace("event", "control");
            publishDevice.setTopic(publishTopic);
            publishDevice.setClientId(clientId + PUBLISH_CLIENT_SUFFIX);

            MqttDataConnector connector = mqttConnectorPoolService.addOne(publishDevice);
            if (connector == null) {
                LOGGER.warning("No MQTT connector available for devCode=" + devCode
                        + ", roomType=" + roomType);
                return false;
            }
            connector.publish(publishTopic, data);

            // Set a timeout marker in Redis; if no reply has arrived by the time
            // it expires, the request is considered timed out.
            redisUtil.set(Constants.MQTT_DEV_JIHUO_TIMEOUT_KEY + ":" + devCode + ":" + roomType,
                    "", timeout);
            return true;
        } catch (Exception e) {
            // Callers rely on the boolean result, so failures are reported rather
            // than propagated - but with enough context to diagnose them.
            LOGGER.log(Level.SEVERE, "Failed to publish MQTT message for devCode=" + devCode
                    + ", roomType=" + roomType, e);
            return false;
        }
    }
}