当前位置 博文首页 > 文章内容

    Native RabbitMQ Topic Exchange

    作者: 栏目:未分类 时间:2020-09-05 15:00:54

    本站于2023年9月4日。收到“大连君*****咨询有限公司”通知
    说我们IIS7站长博客,有一篇博文用了他们的图片。
    要求我们给他们一张图片6000元。要不然法院告我们

    为避免不必要的麻烦,IIS7站长博客,全站内容图片下架、并积极应诉
    博文内容全部不再显示,请需要相关资讯的站长朋友到必应搜索。谢谢!

    另祝:版权碰瓷诈骗团伙,早日弃暗投明。

    相关新闻:借版权之名、行诈骗之实,周某因犯诈骗罪被判处有期徒刑十一年六个月

    叹!百花齐放的时代,渐行渐远!



    RabbitMQ原生编程,Topic交换器。适用于平台给下游服务下发消息的业务场景,配合每个下游服务都有自己的vhost,实现消息隔离发送。

    生产者:

    /**
     * create by zhangjianbing
     * time 2020年9月1日
     */
    public class TopicProducer {
    
        public final static String EXCHANGE_NAME = "topic_exchange";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            // 创建连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("10.231.63.137");
            connectionFactory.setPort(5672);
            connectionFactory.setUsername("wallet-sit");
            connectionFactory.setPassword("d6VrayFvcJyfY2Th");
            connectionFactory.setVirtualHost("wallet-sit");
            // 创建连接
            Connection connection = connectionFactory.newConnection();
            // 创建信道
            Channel channel = connection.createChannel();
            // 在信道中设置交换器
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true);
    
            String queueName1 = "LONGBALL.CALLBACK.QUEUE";
            String queueName2 = "WALLET.CALLBACK.QUEUE";
    
            String routeKey1 = "LONGBALL.ROUTEKEY";
            String routeKey2 = "WALLET.ROUTEKEY";
    
            channel.queueDeclare(queueName1, true, false, false, null);
            channel.queueDeclare(queueName2, true, false, false, null);
    
            channel.queueBind(queueName1, EXCHANGE_NAME, routeKey1);
            channel.queueBind(queueName2, EXCHANGE_NAME, routeKey2);
    
            // 消息
            String message1 = "hello rabbit mq a";
            // 发送消息
            channel.basicPublish(EXCHANGE_NAME, routeKey1, null, message1.getBytes());
    
            // 消息
            String message2 = "hello rabbit mq b";
            // 发送消息
            channel.basicPublish(EXCHANGE_NAME, routeKey2, null, message2.getBytes());
    
            // 关闭信道和连接
            channel.close();
            connection.close();
    
        }
    
    }
    

    消费者:

    /**
     * create by zhangjianbing
     * time 2020年9月1日
     */
    @SuppressWarnings("Duplicates")
    public class TopicConsumer {
    
        public static void main(String[] args) throws IOException, TimeoutException {
            // 创建连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("10.231.63.137");
            connectionFactory.setPort(5672);
            connectionFactory.setUsername("wallet-sit");
            connectionFactory.setPassword("d6VrayFvcJyfY2Th");
            connectionFactory.setVirtualHost("wallet-sit");
            // 创建连接
            Connection connection = connectionFactory.newConnection();
            // 创建信道
            Channel channel = connection.createChannel();
            System.out.println("正在等待消息。。。。。。");
            // 声明一个消费者
            final Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
                    System.out.println("【路由键】:" + envelope.getRoutingKey() + "【消息内容】:" + new String(body, StandardCharsets.UTF_8));
                }
            };
    
            // 队列名称
            String queueName1 = "LONGBALL.CALLBACK.QUEUE";
            // 消费者正式开始在指定队列上消费[队列名称、自动提交、消费者]
            channel.basicConsume(queueName1, true, consumer);
    
    //        // 队列名称
    //        String queueName2 = "WALLET.CALLBACK.QUEUE";
    //        // 消费者正式开始在指定队列上消费[队列名称、自动提交、消费者]
    //        channel.basicConsume(queueName2, true, consumer);
    
    
        }