Przeglądaj źródła

增加日志打印

lilt085163@126.com 3 lat temu
rodzic
commit
3790f33612

+ 3 - 0
src/main/java/com/care/mqtt/service/MqttMsgService.java

@@ -184,7 +184,9 @@ public class MqttMsgService {
                         //计算当前和最近之间时间,若超过1秒,删除之前的存储的,重新计数存储当前的
                         String lastContent = (String)redisUtil.hget(RedisKeyConstant.PEOPLEEVENT_ACTIVE_INFO,lastIndex + "@" + careDevice.getDevCode() + "_" + tid );
                         long duration = (date.getTime() - Long.parseLong(lastContent.split("_")[0]));
+                        logger.debug("lastIndex:{},duration:{}",lastIndex,duration);
                         if (duration > 3000) { //超过1秒,删除之前的存储的,从头开始
+                            logger.debug("超过3000,lastIndex:{},duration:{}",lastIndex,duration);
                             delBeforeContent(lastIndex,careDevice.getDevCode(),tid);
                             redisUtil.hset(RedisKeyConstant.PEOPLEEVENT_ACTIVE_INFO, 1 + "@" + careDevice.getDevCode() + "_" + tid , System.currentTimeMillis() +  "_" + x + "_" + y, RedisKeyConstant.PEOPLEEVENT_ACTIVE_INFO_TIME);
                             redisUtil.hset(RedisKeyConstant.PEOPLEEVENT_ACTIVE_INFO_LAST_INDEX, careDevice.getDevCode() + "_" + tid , 1 , RedisKeyConstant.PEOPLEEVENT_ACTIVE_INFO_ACTIVE_LAST_INDEX_TIME);
@@ -196,6 +198,7 @@ public class MqttMsgService {
                                 int currIndex = lastIndex + 1;//当前是第10个或以上 开始计算
                                 //求前5个点平均坐标和后5个点的平均坐标,二者之间的距离即结果
                                 double distance = getDistance(currIndex,careDevice.getDevCode(),tid);
+                                logger.debug("计算结果:lastIndex:{},duration:{},distance{}",lastIndex,duration,distance);
                                 if(distance > 0.3){
                                     moveState = DeviceMoveStateEnum.MOVE.getValue();
                                     //存储行走距离明细表

+ 0 - 1
src/main/java/com/care/mqtt/tool/MqttMessageListener.java

@@ -1,7 +1,6 @@
 package com.care.mqtt.tool;
 
 
-import com.care.common.constant.Constants;
 import com.care.mqtt.service.MqttMsgInstallService;
 import com.care.mqtt.service.MqttMsgService;
 import org.eclipse.paho.client.mqttv3.IMqttMessageListener;

+ 233 - 0
src/test/java/com/mqtt/MqttDataConnectorTest.java

@@ -0,0 +1,233 @@
+package com.mqtt;
+
+import cn.hutool.core.util.IdUtil;
+import com.care.mqtt.tool.MqttMessageListener;
+import com.care.mqtt.tool.SslUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.eclipse.paho.client.mqttv3.*;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@Slf4j
+public class MqttDataConnectorTest {
+
+
+
+    private MqttClient mqttClient;
+    private MqttConnectOptions mqttConnectOptions;
+
+
+
+    /**
+     * 创建连接
+     * @param configParams
+     * @throws Exception
+     */
+    public void  createIotDataSource(Map<String, String> configParams) throws  Exception {
+        //访问ip:port  tcp://   or  ssl://
+        String host = configParams.get("MQTT_HOST");
+        //客户端id 保持唯一
+        String clientId = configParams.get("MQTT_CLIENTID");
+        //用户名
+        String userName = configParams.get("MQTT_USERNAME");
+        //密码
+        String password = configParams.get("MQTT_PASSWORD");
+        //ssl  验证时是双向验证还是单向验证
+        String sslType = configParams.get("MQTT_SSLTYPE");
+
+        String cleanSession = configParams.get("CLEAN_SESSION");
+
+        //缓存两种模式 存在内存 文件  设置成null 缓存在内存中 最多缓存65535条信息
+        //ScheduledExecutorService 可以设置线程池大小 默认10;发布消息方法是异步的
+        this.mqttClient = new MqttClient(host,clientId,null);//new MqttDefaultFilePersistence()
+        mqttConnectOptions = new MqttConnectOptions();
+        // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,
+        // 这里设置为true表示每次连接到服务器都以新的身份连接
+        mqttConnectOptions.setCleanSession(Boolean.parseBoolean(cleanSession));
+        // mqttConnectOptions.sto
+        // 设置超时时间 s
+        mqttConnectOptions.setConnectionTimeout(30);
+        // 设置会话心跳时间
+        mqttConnectOptions.setKeepAliveInterval(60);
+        mqttConnectOptions.setAutomaticReconnect(true);//设置自动重连
+        //setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息
+        // mqttConnectOptions.setWill("sec", "close".getBytes(), 2, true);
+        if("two".equals(sslType)){
+            //服务端证书路径
+            String rootCrtPath = configParams.get("MQTT_SSLROOT_CRTPATH");
+            //客户端证书路径
+            String clientCrtPath = configParams.get("MQTT_SSLCLIENT_CRTPATH");
+            //客户端密匙路径
+            String clientKeyPath = configParams.get("MQTT_SSLCLIENT_KEYPATH");
+            //密匙加密密码
+            String clientPassword = configParams.get("MQTT_SSLPASSWORD");
+            //ssl 协议版本 一般看mqtt服务端broker设置  不设置默认为TLSv1.1
+            String sslProtocol = configParams.get("MQTT_SSLPROTOCOL");
+            mqttConnectOptions.setSocketFactory(SslUtil.getSocketFactory(rootCrtPath, clientCrtPath, clientKeyPath, clientPassword,sslProtocol));
+
+        }else if("one".equals(sslType)){
+            String rootCrtPath = configParams.get("MQTT_SSLROOT_CRTPATH");
+            String sslProtocol = configParams.get("MQTT_SSLPROTOCOL");
+
+            mqttConnectOptions.setSocketFactory(SslUtil.getSocketFactorySingle(rootCrtPath,sslProtocol));
+        }
+        if(StringUtils.isNotBlank(userName)){
+            mqttConnectOptions.setUserName(userName);
+        }
+        if (StringUtils.isNotBlank(password)){
+            mqttConnectOptions.setPassword(password.toCharArray());
+        }
+        //mqttConnectOptions.setWill();   //可以设置断线发送、接收消息
+        mqttClient.connect(mqttConnectOptions);
+        log.info("mqtt 连接成功!!!");
+
+
+    }
+
+
+
+    /**
+     *
+     * testTopic/#   #多层通配符    +单层通配符
+     * topicFilter
+     * @param topicFilter
+     * @param iotMqttMessageListener
+     * @throws Exception
+     */
+    public void setReSubscribe(String topicFilter, MqttMessageListenerTest iotMqttMessageListener) throws Exception {
+        int qos = 2;//0:最多一次 、1:最少一次 、2:只有一次
+        mqttClient.subscribe(topicFilter,qos, iotMqttMessageListener);
+        mqttClient.setCallback(new MqttCallbackExtended() {
+            @Override
+            public void connectComplete(boolean reconnect, String serverURI) {
+                log.info("topic reconnect ---> " + reconnect + "       serverURI--->" + serverURI);
+            }
+
+            @Override
+            public void connectionLost(Throwable cause) {
+                log.error("{}订阅丢失 connectionLost cause ---> {}"  ,topicFilter,cause.getMessage());
+            }
+
+            @Override
+            public void messageArrived(String topic, MqttMessage message) throws Exception {
+                log.debug("{}接收到新消息 ---> {}"  ,topic,message.toString());
+            }
+
+            @Override
+            public void deliveryComplete(IMqttDeliveryToken token) {
+                try {
+                    log.error("消息已发送 ---> {}"  ,token.getMessage().toString());
+                } catch (MqttException e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+    }
+
+    /**
+     * 取消订阅
+     *
+     * @param topics
+     * @throws Exception
+     */
+    public void unsubscribe(String[] topics)  throws Exception{
+        mqttClient.unsubscribe(topics);
+
+
+    }
+    /**
+     * 是否处于连接状态
+     * @return
+     */
+    public boolean isConnected() {
+        return mqttClient.isConnected();
+    }
+
+
+    /**
+     * 发布数据
+     * @param
+     * @param data
+     * @throws Exception
+     */
+    public void publish(String publicLiveTopic, String data)   throws Exception{
+        MqttMessage mqttMessage = new MqttMessage();
+        mqttMessage.setPayload(data.getBytes("UTF-8"));
+        //QoS:发布消息的服务质量,即:保证消息传递的次数(消费者收到的次数)
+        //0:最多一次,即:<=1;每个消息只发一次,也不会缓存下来。
+        //1:至少一次,即:>=1;一直发送确保消费者至少收到一次,发送失败会缓存下来。
+        //2:一次,即:=1       一直发送确保消费者只能收到一次;发送失败会缓存下来 。
+        mqttMessage.setQos(0);
+        //消费者断开连接后是否接受离线消息
+        mqttMessage.setRetained(false);
+        log.info("topic:{} send  content {} ",publicLiveTopic,data);
+        mqttClient.publish(publicLiveTopic,mqttMessage);
+        log.info("topic:{} send  dataSize {}kb ",publicLiveTopic,data.length()/1024.0);
+
+    }
+
+    /**
+     * 断开连接
+     */
+    public void destroy() {
+        try {
+            this.mqttClient.disconnect();
+            log.info("mqtt 手动断开连接!!!");
+        } catch (MqttException e) {
+            //e.printStackTrace();
+            log.error("手动断开连接报错error={}",e.getMessage());
+        }
+    }
+
+
+
+    /**
+     * 设置连接
+     * @param args
+     */
+    public static void main(String[] args) {
+        MqttDataConnectorTest xbMqttDataConnector =new MqttDataConnectorTest();
+        Map<String,String> configParams =new HashMap<>();
+        String host="ws://49.233.41.108:8083";
+        String MQTT_CLIENTID = IdUtil.fastUUID();
+        configParams.put("MQTT_HOST",host);
+        configParams.put("MQTT_CLIENTID",MQTT_CLIENTID);
+        configParams.put("MQTT_USERNAME","shangxingkeji");
+        configParams.put("MQTT_PASSWORD","sxkj@1234");
+        configParams.put("CLEAN_SESSION", "false");
+        String[] msgArray = new String[]{
+                 "{\"type\":\"PeopleEvent\",\"mdid\":\"wd1\",\"msg\":{\"ts\":970408046,\"amount\":1,\"targets\":[{\"tid\":11721,\"cls\":1,\"posture\":1,\"x\":3,\"y\":1,\"z\":0.83149576,\"length\":1.7,\"width\":0.6,\"thick\":0.35}]}}",
+                 "{\"type\":\"PeopleEvent\",\"mdid\":\"wd1\",\"msg\":{\"ts\":970408046,\"amount\":1,\"targets\":[{\"tid\":11721,\"cls\":1,\"posture\":1,\"x\":4,\"y\":1,\"z\":0.83149576,\"length\":1.7,\"width\":0.6,\"thick\":0.35}]}}",
+                 "{\"type\":\"PeopleEvent\",\"mdid\":\"wd1\",\"msg\":{\"ts\":970408046,\"amount\":1,\"targets\":[{\"tid\":11721,\"cls\":1,\"posture\":1,\"x\":5,\"y\":1,\"z\":0.83149576,\"length\":1.7,\"width\":0.6,\"thick\":0.35}]}}",
+                "{\"type\":\"PeopleEvent\",\"mdid\":\"wd1\",\"msg\":{\"ts\":970408046,\"amount\":1,\"targets\":[{\"tid\":11721,\"cls\":1,\"posture\":1,\"x\":6,\"y\":1,\"z\":0.83149576,\"length\":1.7,\"width\":0.6,\"thick\":0.35}]}}",
+                "{\"type\":\"PeopleEvent\",\"mdid\":\"wd1\",\"msg\":{\"ts\":970408046,\"amount\":1,\"targets\":[{\"tid\":11721,\"cls\":1,\"posture\":1,\"x\":7,\"y\":1,\"z\":0.83149576,\"length\":1.7,\"width\":0.6,\"thick\":0.35}]}}",
+                "{\"type\":\"PeopleEvent\",\"mdid\":\"wd1\",\"msg\":{\"ts\":970408046,\"amount\":1,\"targets\":[{\"tid\":11721,\"cls\":1,\"posture\":1,\"x\":8,\"y\":1,\"z\":0.83149576,\"length\":1.7,\"width\":0.6,\"thick\":0.35}]}}",
+                "{\"type\":\"PeopleEvent\",\"mdid\":\"wd1\",\"msg\":{\"ts\":970408046,\"amount\":1,\"targets\":[{\"tid\":11721,\"cls\":1,\"posture\":1,\"x\":9,\"y\":1,\"z\":0.83149576,\"length\":1.7,\"width\":0.6,\"thick\":0.35}]}}",
+                "{\"type\":\"PeopleEvent\",\"mdid\":\"wd1\",\"msg\":{\"ts\":970408046,\"amount\":1,\"targets\":[{\"tid\":11721,\"cls\":1,\"posture\":1,\"x\":10,\"y\":1,\"z\":0.83149576,\"length\":1.7,\"width\":0.6,\"thick\":0.35}]}}",
+                "{\"type\":\"PeopleEvent\",\"mdid\":\"wd1\",\"msg\":{\"ts\":970408046,\"amount\":1,\"targets\":[{\"tid\":11721,\"cls\":1,\"posture\":1,\"x\":11,\"y\":1,\"z\":0.83149576,\"length\":1.7,\"width\":0.6,\"thick\":0.35}]}}",
+                "{\"type\":\"PeopleEvent\",\"mdid\":\"wd1\",\"msg\":{\"ts\":970408046,\"amount\":1,\"targets\":[{\"tid\":11721,\"cls\":1,\"posture\":1,\"x\":12,\"y\":1,\"z\":0.83149576,\"length\":1.7,\"width\":0.6,\"thick\":0.35}]}}",
+                "{\"type\":\"PeopleEvent\",\"mdid\":\"wd1\",\"msg\":{\"ts\":970408046,\"amount\":1,\"targets\":[{\"tid\":11721,\"cls\":1,\"posture\":1,\"x\":18,\"y\":1,\"z\":0.83149576,\"length\":1.7,\"width\":0.6,\"thick\":0.35}]}}",
+                "{\"type\":\"PeopleEvent\",\"mdid\":\"wd1\",\"msg\":{\"ts\":970408046,\"amount\":1,\"targets\":[{\"tid\":11721,\"cls\":1,\"posture\":1,\"x\":19,\"y\":1,\"z\":0.83149576,\"length\":1.7,\"width\":0.6,\"thick\":0.35}]}}",
+                "{\"type\":\"PeopleEvent\",\"mdid\":\"wd1\",\"msg\":{\"ts\":970408046,\"amount\":1,\"targets\":[{\"tid\":11721,\"cls\":1,\"posture\":1,\"x\":13,\"y\":1,\"z\":0.83149576,\"length\":1.7,\"width\":0.6,\"thick\":0.35}]}}",
+                "{\"type\":\"PeopleEvent\",\"mdid\":\"wd1\",\"msg\":{\"ts\":970408046,\"amount\":1,\"targets\":[{\"tid\":11721,\"cls\":1,\"posture\":1,\"x\":14,\"y\":1,\"z\":0.83149576,\"length\":1.7,\"width\":0.6,\"thick\":0.35}]}}",
+                "{\"type\":\"PeopleEvent\",\"mdid\":\"wd1\",\"msg\":{\"ts\":970408046,\"amount\":1,\"targets\":[{\"tid\":11721,\"cls\":1,\"posture\":1,\"x\":15,\"y\":1,\"z\":0.83149576,\"length\":1.7,\"width\":0.6,\"thick\":0.35}]}}",
+        };
+        try {
+
+            xbMqttDataConnector.createIotDataSource(configParams);
+            MqttMessageListenerTest mqttMessageListener=new MqttMessageListenerTest();
+            xbMqttDataConnector.setReSubscribe("/shangxingkeji/test2/event", mqttMessageListener);
+            for (int i=0;i<msgArray.length;i++){
+                xbMqttDataConnector.publish("/shangxingkeji/test2/event",msgArray[i]);
+                Thread.sleep(100);
+            }
+            Thread.sleep(10000);
+            xbMqttDataConnector.destroy();
+        }catch ( Exception e) {
+            e.printStackTrace();
+        }
+
+    }
+}

+ 32 - 0
src/test/java/com/mqtt/MqttMessageListenerTest.java

@@ -0,0 +1,32 @@
+package com.mqtt;
+
+
+import com.care.mqtt.service.MqttMsgInstallService;
+import com.care.mqtt.service.MqttMsgService;
+import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * 订阅信息监听类
+ */
+public class MqttMessageListenerTest implements IMqttMessageListener {
+    private static Logger logger = LoggerFactory.getLogger(MqttMsgService.class);
+
+    @Override
+    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
+//        System.out.println(mqttMessage.getId());
+//        System.out.println(mqttMessage.getPayload());
+//        System.out.println(mqttMessage.getQos());
+//        System.out.println(mqttMessage.isRetained());
+//        System.out.println(mqttMessage.isDuplicate());
+        String mqttMessageStr = mqttMessage.toString();
+        //logger.info("mqttMessageStr == " + mqttMessageStr);
+        logger.info("接收到的消息"+mqttMessageStr);
+
+    }
+
+
+}