当前位置 博文首页 > 文章内容

    Java-Mqtt-ActiveMq(2)

    作者: 栏目:未分类 时间:2020-09-25 11:02:06

    本站于2023年9月4日。收到“大连君*****咨询有限公司”通知
    说我们IIS7站长博客,有一篇博文用了他们的图片。
    要求我们给他们一张图片6000元。要不然法院告我们

    为避免不必要的麻烦,IIS7站长博客,全站内容图片下架、并积极应诉
    博文内容全部不再显示,请需要相关资讯的站长朋友到必应搜索。谢谢!

    另祝:版权碰瓷诈骗团伙,早日弃暗投明。

    相关新闻:借版权之名、行诈骗之实,周某因犯诈骗罪被判处有期徒刑十一年六个月

    叹!百花齐放的时代,渐行渐远!



    Java-Mqtt-ActiveMq(2)

    不使用回调函数获取订阅信息

    1、maven

    <!-- https://mvnrepository.com/artifact/org.fusesource.hawtbuf/hawtbuf -->
            <dependency>
                <groupId>org.fusesource.hawtbuf</groupId>
                <artifactId>hawtbuf</artifactId>
                <version>1.9</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.fusesource.hawtdispatch/hawtdispatch -->
            <dependency>
                <groupId>org.fusesource.hawtdispatch</groupId>
                <artifactId>hawtdispatch</artifactId>
                <version>1.22</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.fusesource.hawtdispatch/hawtdispatch-transport -->
            <dependency>
                <groupId>org.fusesource.hawtdispatch</groupId>
                <artifactId>hawtdispatch-transport</artifactId>
                <version>1.22</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.fusesource.mqtt-client/mqtt-client -->
            <dependency>
                <groupId>org.fusesource.mqtt-client</groupId>
                <artifactId>mqtt-client</artifactId>
                <version>1.15</version>
            </dependency>

    2、code

    /**
     * Mqtt服务消费者(订阅)
     *
     * @author: zy
     * @date: 2020-09-23 10:17
     */
    @Component
    @Slf4j
    public class MqttConsumer {
    
        /**
         * ActiveMq服务地址
         */
        private static String MqUrl;
    
        @Value("${config.MQ-URL}")
        public void setMqUrl(String mqUrl) {
            MqUrl = mqUrl;
        }
    
        /**
         * ActiveMq客户端id
         */
        private static String MqClinetId;
    
        @Value("${config.MQ-SUB-CLIENTID}")
        public void setMqClinetId(String mqClinetId) {
            MqClinetId = mqClinetId;
        }
    
        /**
         * ActiveMq用户名
         */
        private static String MqUserName;
    
        @Value("${config.MQ-USERNAME}")
        public void setMqUserName(String mqUserName) {
            MqUserName = mqUserName;
        }
    
        /**
         * ActiveMq用户密码
         */
        private static String MqPassword;
    
        @Value("${config.MQ-PASSWORD}")
        public void setMqPassword(String mqPassword) {
            MqPassword = mqPassword;
        }
    
        /**
         * 连接前是否清空回话信息
         */
        private final static boolean CLEAN_START = true;
        /**
         * 低耗网络,但是又需要及时获取数据,心跳30s
         */
        private final static short KEEP_ALIVE = 30;
        /**
         * 设置重新连接次数
         */
        public final static long RECONNECTION_ATTEMPT_MAX = 6;
        /**
         * 设置重连时间间隔
         */
        public final static long RECONNECTION_DELAY = 2000;
        /**
         * 发送最大缓冲为2M
         */
        public final static int SEND_BUFFER_SIZE = 2 * 1024 * 1024;
    
    
        /**
         * Mqtt-订阅
         *
         * @param topicList 主题Topic列表
         */
        public static void doSub(List<String> topicList) {
    
            Topic[] topics;
            String jsonStr;
            List<Topic> topicArrayList = new ArrayList<>();
            for (String topic : topicList) {
                topicArrayList.add(new Topic(topic, QoS.AT_LEAST_ONCE));
            }
            topics = topicArrayList.toArray(new Topic[topicArrayList.size()]);
            MQTT mqtt = new MQTT();
    
            //1、创建MQTT对象
            try {
                // 设置mqtt broker的ip和端口
                mqtt.setHost(MqUrl);
                // 连接前清空会话信息
                mqtt.setCleanSession(CLEAN_START);
                // 设置重新连接的次数
                mqtt.setReconnectAttemptsMax(RECONNECTION_ATTEMPT_MAX);
                // 设置重连的间隔时间
                mqtt.setReconnectDelay(RECONNECTION_DELAY);
                // 设置心跳时间
                mqtt.setKeepAlive(KEEP_ALIVE);
                // 设置缓冲的大小
                mqtt.setSendBufferSize(SEND_BUFFER_SIZE);
                //设置客户端id
                mqtt.setClientId(MqClinetId);
                mqtt.setUserName(MqUserName);
                mqtt.setPassword(MqPassword);
                final FutureConnection connection = mqtt.futureConnection();
                connection.connect();
                connection.subscribe(topics);
                while (true) {
                    Future<Message> futrueMessage = connection.receive();
                    Message message = futrueMessage.await();
                    log.info("----------" + "message-topic:" + message.getTopic() + "----------");
                    log.info("----------" + "message-payloadBuffer:" + message.getPayloadBuffer() + "----------");
                    String msg = String.valueOf(message.getPayloadBuffer());
    
                    if (msg.substring(0, 3).equals("hex")) {
                        msg = msg.substring(5);
                        byte[] bytes = Hex.decodeHex(msg.toCharArray());
                        jsonStr = new String(bytes, "GBK");
                        log.info("----------" + "message-jsonStr:" + jsonStr + "----------");
                    } else {
                        jsonStr = msg.substring(7);
                        log.info("----------" + "message-jsonStr:" + jsonStr + "----------");
                    }
                    // TODO 根据topic不同调用不同的服务、接口
    
    
                }
            } catch (URISyntaxException e) {
                e.printStackTrace();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
            }
        }
    
    
    }