博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
springboot rabbitmq操作
阅读量:7098 次
发布时间:2019-06-28

本文共 6004 字,大约阅读时间需要 20 分钟。

hot3.png

 

一、基本配置

spring.rabbitmq.host=127.0.0.1spring.rabbitmq.port=5672spring.rabbitmq.username=guestspring.rabbitmq.password=guest

二、DirectExchange模式

@Configurationpublic class DirectExchangeConfig {    public static final String QUEUE_NAME = "email_queue";    public static final String EXCHANGE_NAME = "email_exchange";    @Bean(name = "email_queue")    public Queue directQuery() {        return new Queue(QUEUE_NAME);    }    @Bean(name = "email_exchange")    public DirectExchange directExchange() {        return new DirectExchange(EXCHANGE_NAME);    }    @Bean    public Binding directBinding(@Qualifier("email_queue") Queue query, @Qualifier("email_exchange") DirectExchange directExchange) {        return BindingBuilder.bind(query).to(directExchange).with(QUEUE_NAME);    }}@Componentpublic class HelloSender {    @Autowired    private AmqpTemplate template;    public void send(String value) {        template.convertAndSend("queue", value);    }    public void sendDirect(String value) {        template.convertAndSend(DirectExchangeConfig.EXCHANGE_NAME, DirectExchangeConfig.QUEUE_NAME, value);    }    public void sendDelayMessage(String value) {        for (int i = 0; i < 100; i++) {            template.convertAndSend(TTLDirectExchangeConfig.QUEUE_TTL_NAME, (Object) value, new DelayMessagePostProcessor(i * 1000));        }    }}@Componentpublic class HelloReceive {    @RabbitListener(queues = "queue")    //监听器监听指定的Queue    public void processC(String str) {        System.out.println("Receive:" + str);    }    @RabbitListener(queues = DirectExchangeConfig.QUEUE_NAME)    //监听器监听指定的Queue    public void processCD(String str) {        System.out.println("Receive CD:" + str);    }    @RabbitListener(queues = TTLDirectExchangeConfig.QUEUE_NAME)    //监听器监听指定的Queue    public void processCDelay(String str) {        System.out.println("Receive processCDelay:" + str);    }}

延迟队列配置

@Configurationpublic class TTLDirectExchangeConfig {    public static final String QUEUE_NAME = "sms_queue";    public static final String EXCHANGE_NAME = "sms_exchange";    public static final String QUEUE_TTL_NAME = "sms_queue_ttl";    @Bean(name = "sms_queue")    public Queue directQuery() {        return new Queue(QUEUE_NAME);    }    @Bean    public Queue smsQueueDelayPerMessageTTL() {        return QueueBuilder.durable(QUEUE_TTL_NAME)                .withArgument("x-dead-letter-exchange", EXCHANGE_NAME)                .withArgument("x-dead-letter-routing-key", QUEUE_NAME)                .build();    }    @Bean(name = "sms_exchange")    public DirectExchange directExchange() {        return new DirectExchange(EXCHANGE_NAME);    }    @Bean    public Binding directBinding(@Qualifier("sms_queue") Queue query, @Qualifier("sms_exchange") DirectExchange directExchange) {        return BindingBuilder.bind(query).to(directExchange).with(QUEUE_NAME);    }}public class DelayMessagePostProcessor implements MessagePostProcessor {    private long ttl = 0L;    public DelayMessagePostProcessor(long ttl) {        this.ttl = ttl;    }    @Override    public Message postProcessMessage(Message message) throws AmqpException {        message.getMessageProperties().setExpiration(Long.toString(ttl));        return message;    }}@Componentpublic class HelloSender {    @Autowired    private AmqpTemplate template;    public void send(String value) {        template.convertAndSend("queue", value);    }    public void sendDirect(String value) {        template.convertAndSend(DirectExchangeConfig.EXCHANGE_NAME, DirectExchangeConfig.QUEUE_NAME, value);    }    public void sendDelayMessage(String value) {        for (int i = 0; i < 100; i++) {            template.convertAndSend(TTLDirectExchangeConfig.QUEUE_TTL_NAME, (Object) value, new DelayMessagePostProcessor(i * 1000));        }    }}@Componentpublic class HelloReceive {    @RabbitListener(queues = "queue")    //监听器监听指定的Queue    public void processC(String str) {        System.out.println("Receive:" + str);    }    @RabbitListener(queues = DirectExchangeConfig.QUEUE_NAME)    //监听器监听指定的Queue    public void processCD(String str) {        System.out.println("Receive CD:" + str);    }    @RabbitListener(queues = TTLDirectExchangeConfig.QUEUE_NAME)    //监听器监听指定的Queue    public void processCDelay(String str) {        System.out.println("Receive processCDelay:" + str);    }}

基本介绍

在上上篇博客中已经简单说了下Exchange是干什么的,在message到达Exchange后,Exchange会根据route规则进入对应的Queue中,message可能进入一个Queue也可能进入对应多个Queue,至于进入哪个Queue或者是说哪个Queue都不进入,这要依据ExChange的ExchangeType和Exchange所绑定的路由规则,实现AMQP0.9.1协议的RabbitMQ Broker提供了四种ExChangeType。

        这四种ExchangeType分别是Direct exchange,Fanout exchange,Topic exchange和Headers exchange。这四种累的exchange分别有以下一些属性,分别是:

        name:名称

        Durability:持久化标志,如果为true,则表明此exchange是持久化的。

        Auto-delete:删除标志,当所有队列在完成使用此exchange时,是否删除

        Arguments:这个暂时不清楚

       从上面Exchange的属性来看,一个Exchange可能是持久化的,也有可能不需要持久化,这还得看具体的使用情况,下面就来分别介绍下这四种ExchangeType。

 

Direct Exchange:

       实现方式如下:

                   

        DirectExchange是RabbitMQ Broker的默认Exchange,它有一个特别的属性对一些简单的应用来说是非常有用的,在使用这个类型的Exchange时,可以不必指定routing key的名字,在此类型下创建的Queue有一个默认的routing key,这个routing key一般同Queue同名。

        适用场景:

        这种类型的Exchange,通常是将同一个message以一种循环的方式分发到不同的Queue,即不同的消费者手中,使用这种方式,值得注意的是message在消费者之间做了一个均衡,而不是说message在Queues之间做了均衡。

 

Fanout Exchange:

         实现方式如下:

                  

         使用这种类型的Exchange,会忽略routing key的存在,直接将message广播到所有的Queue中。

         适用场景:

                第一:大型玩家在玩在线游戏的时候,可以用它来广播重大消息。这让我想到电影微微一笑很倾城中,有款游戏需要在世界上公布玩家重大消息,也许这个就是用的MQ实现的。这让我不禁佩服肖奈,人家在大学的时候就知道RabbitMQ的这种特性了。

                第二:体育新闻实时更新到手机客户端。

                第三:群聊功能,广播消息给当前群聊中的所有人。

 

Topic Exchange:

       实现方式如下:

         

       Topic Exchange是根据routing key和Exchange的类型将message发送到一个或者多个Queue中,我们经常拿他来实现各种publish/subscribe,即发布订阅,这也是我们经常使用到的ExchangeType。

        使用场景:

               新闻的分类更新

               同意任务多个工作者协调完成

               同一问题需要特定人员知晓

         Topic Exchange的使用场景很多,我们公司就在使用这种模式,将足球事件信息发布,需要使用这些事件消息的人只需要绑定对应的Exchange就可以获取最新消息。

 

Headers Exchange:

        实现方式如下:

            

         Headers Exchange不同于上面三种Exchange,它是根据Message的一些头部信息来分发过滤Message,忽略routing key的属性,如果Header信息和message消息的头信息相匹配,那么这条消息就匹配上了。

        关于Headers Exchange我知道的并不多,但是这篇博客,我会在我持续深入理解RabbitMQ的基础上不断调整和更新,若有什么地方理解偏差,还请大家一起讨论。越来越体会到看英文资料的重要性。

转载于:https://my.oschina.net/fellowtraveler/blog/1974680

你可能感兴趣的文章
差分信号(Differential Signal)
查看>>
Aix项目_shell_rsh_01
查看>>
HDU 5726 GCD 求给定序列中与查询段相等的GCD个数
查看>>
python实训第四天
查看>>
5-4-3原则
查看>>
html图像入门
查看>>
C# Mongo Client 2.4.2创建索引
查看>>
我的第四个网页制作:列表标签
查看>>
【python进阶】详解元类及其应用2
查看>>
简单实用的菜单栏
查看>>
AMap行政区查询服务
查看>>
SpringBoot2.0源码分析(一):SpringBoot简单分析
查看>>
Java,net上的几篇文章
查看>>
Chrome的Awesome Screenshot的插件离线下载
查看>>
改变self.navigationItem的显示标题
查看>>
Revit2014机电系统类型BUG
查看>>
函数指针
查看>>
数学图形之Boy surface
查看>>
Objective-C中把数组中字典中的数据转换成URL
查看>>
mysqld: unrecognized service
查看>>