• 售前

  • 售后

热门帖子
入门百科

【RabbitMQ】SpringBoot集成RabbitMQ项目实战:简单队列模式、三种互换机模

[复制链接]
yfyffuuy 显示全部楼层 发表于 2022-1-8 17:44:26 |阅读模式 打印 上一主题 下一主题
文章目录





    • 1 本文背景
    • 2 集成 RabbitMQ 简单消息队列模式


      • 2.1 步调
      • 2.2 测试效果
         
    • 3 集成 RabbitMQ Fanout 交换机模式


      • 3.1 步调
      • 3.2 测试效果
         
    • 4 集成 RabbitMQ Direct 交换机模式


      • 4.1 步调
      • 4.2 测试效果
         
    • 5 集成 RabbitMQ Topic 交换机模式


      • 5.1 步调
      • 5.2 测试效果
        


1 本文背景

近期在做一个电商秒杀项目,在服务优化的阶段用到了 RabbitMQ 这个消息中心件,让秒杀请求不再瞬时冲击秒杀接口,而是利用消息中心件来让请求如队列般列队而来。
2 集成 RabbitMQ 简单消息队列模式


2.1 步调

下面起首先容最底子的一种 SpringBoot 集成 RabbitMQ 简单消息队列模式的方式:添加 pom 依赖、 yml 配置、配置类、消息发送者、消息接收者
1、添加 pom 依赖,这里需要添加 amqp,amqp 是一个提供同一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中心件设计,基于此协议的客户端与消息中心件可传递消息,并不受客户端/中心件差别产品,差别的开发语言等条件的限制。
  1. <dependency>
  2.    <groupId>org.springframework.boot</groupId>
  3.    <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>
复制代码
2、yaml 配置文件,各参数阐明已给出注释
  1. spring:
  2.         rabbitmq:
  3.         host: 192.168.10.132
  4.         username: guest
  5.         password: guest
  6.         # 虚拟主机
  7.         virtual-host: /
  8.         port: 5672
  9.         listener:
  10.           simple:
  11.             # 消费者最小数量
  12.             concurrency: 10
  13.             # 消费者最大数量
  14.             max-concurrency: 10
  15.             # 限制消费者每次只处理一条消息,处理完再继续下一条消息
  16.             prefetch: 1
  17.             # 启动时是否默认启动容器,默认true
  18.             auto-startup: true
  19.             # 被拒绝时重新进入队列
  20.             default-requeue-rejected: true
  21.         template:
  22.           retry:
  23.             # 发布重试,默认false
  24.             enabled: true
  25.             # 重试时间 默认1000ms
  26.             initial-interval: 1000
  27.             # 重试最大次数,默认3次
  28.             max-attempts: 3
  29.             # 重试最大间隔时间,默认10000ms
  30.             max-interval: 1000
  31.             # 重试间隔的乘数。比如配2.0 第一次等10s,第二次等20s,第三次等40s
  32.             multiplier: 1.0
复制代码
3、配置类
  1. @Configuration
  2. public class RabbitMQConfig {
  3.     @Bean
  4.     public Queue queue() {
  5.         return new Queue("queue", true);
  6.     }
  7. }
复制代码
4、消息生产者(发送者)
  1. @Service
  2. @Slf4j
  3. public class MQSender {
  4.     @Autowired
  5.     private RabbitTemplate rabbitTemplate;
  6.     public void send(Object msg) {
  7.         log.info("发送消息:" + msg);
  8.         rabbitTemplate.convertAndSend("queue", msg);
  9.     }
  10. }
复制代码
5、消息消费者(接收者)
  1. @Service
  2. @Slf4j
  3. public class MQReceiver {
  4.     @RabbitListener(queues = "queue")
  5.     public void receive(Object msg) {
  6.         log.info("接收消息:" + msg);
  7.     }
  8. }
复制代码
2.2 测试效果

这里写一个接口调用消息发送者举行消息的发送
  1. /**
  2. * 测试 rabbitmq
  3. */
  4. @RequestMapping("/mq")
  5. @ResponseBody
  6. public void mq() {
  7.     mqSender.send("Hello");
  8. }
复制代码
访问此请求

日记打印情况

rabbitmq_management UI 管控页面的情况

可见,根据上述测试情况,简单模式下,消息生产者与消息消费者已对接乐成。
3 集成 RabbitMQ Fanout 交换机模式

Fanout 交换机模式对应 RabbitMQ 消息队列的发布/订阅模式,此模式不处置惩罚路由键,只需要简单的将队里绑定到交换机上发送到交换机的消息都会被转发到与该交换机绑定的全部队列上Fanout交换机转发消息是最快的。

3.1 步调

在上述 2 中的简单队列模式中修改配置类,改为两个队列、一个交换机
  1. @Configuration
  2. public class RabbitMQConfig {
  3.     private static final String QUEUE01 = "queue_fanout01";
  4.     private static final String QUEUE02 = "queue_fanout02";
  5.     private static final String EXCHANGE = "fanoutExchange";
  6.     @Bean
  7.     public Queue queue01() {
  8.         return new Queue(QUEUE01);
  9.     }
  10.     @Bean
  11.     public Queue queue02() {
  12.         return new Queue(QUEUE02);
  13.     }
  14.     @Bean
  15.     public FanoutExchange fanoutExchange() {
  16.         return new FanoutExchange(EXCHANGE);
  17.     }
  18.     @Bean
  19.     public Binding binding01() {
  20.         return BindingBuilder.bind(queue01()).to(fanoutExchange());
  21.     }
  22.     @Bean
  23.     public Binding binding02() {
  24.         return BindingBuilder.bind(queue02()).to(fanoutExchange());
  25.     }
  26. }
复制代码
消息发送方中,设置交换机名
  1. @Service
  2. @Slf4j
  3. public class MQSender {
  4.     @Autowired
  5.     private RabbitTemplate rabbitTemplate;
  6.     public void send(Object msg) {
  7.         log.info("发送消息:" + msg);
  8.         rabbitTemplate.convertAndSend("fanoutExchange", "", msg);
  9.     }
  10. }
复制代码
消息接收方中配置两个监听的接收者
  1. @Service
  2. @Slf4j
  3. public class MQReceiver {
  4.     @RabbitListener(queues = "queue_fanout01")
  5.     public void receive01(Object msg) {
  6.         log.info("QUEUE01接受消息:" + msg);
  7.     }
  8.     @RabbitListener(queues = "queue_fanout02")
  9.     public void receive02(Object msg) {
  10.         log.info("QUEUE02接受消息:" + msg);
  11.     }
  12. }
复制代码
3.2 测试效果

编写测试接口
  1. /**
  2. *fanout模式:测试 rabbitmq
  3. */
  4. @RequestMapping("/mq/fanout")
  5. @ResponseBody
  6. public void mq01() {
  7.     mqSender.send("Hello");
  8. }
复制代码
再次运行项目后,在 UI 管控页面即可看到我们自己注册的交换机

点开此交换机后也可看到我们绑定的两个队列

访问此请求举行 Fanout 模式交换机的测试

日记打印情况

rabbitmq_management UI 管控页面的情况

4 集成 RabbitMQ Direct 交换机模式

Direct 交换机模式对应 RabbitMQ 消息队列的路由模式,全部发送到Direct Exchange的消息被转发到RouteKey中指定的Queue,留意,Direct模式可以利用RabbitMQ自带的Exchange(default Exchange),所以不需要将Exchange举行任何绑定(binding)操纵,消息传递时,RouteKey必须完全匹配才会被队列接收,否则该消息会被扬弃。重点:routing key与队列queues 的key保持同等,即可以路由到对应的queue中。

4.1 步调

配置类
  1. @Configuration
  2. public class RabbitMQConfigDirect {
  3.     private static final String QUEUE01 = "queue_direct01";
  4.     private static final String QUEUE02 = "queue_direct02";
  5.     private static final String EXCHANGE = "directExchange";
  6.     private static final String ROUTINGKEY01 = "queue.red";
  7.     private static final String ROUTINGKEY02 = "queue.green";
  8.     @Bean
  9.     public Queue queue01() {
  10.         return new Queue(QUEUE01);
  11.     }
  12.     @Bean
  13.     public Queue queue02() {
  14.         return new Queue(QUEUE02);
  15.     }
  16.     @Bean
  17.     public DirectExchange directExchange() {
  18.         return new DirectExchange(EXCHANGE);
  19.     }
  20.     @Bean
  21.     public Binding binding01() {
  22.         return BindingBuilder.bind(queue01()).to(directExchange()).with(ROUTINGKEY01);
  23.     }
  24.     @Bean
  25.     public Binding binding02() {
  26.         return BindingBuilder.bind(queue02()).to(directExchange()).with(ROUTINGKEY02);
  27.     }
  28. }
复制代码
消息发送方
  1. @Service
  2. @Slf4j
  3. public class MQSender {
  4.     @Autowired
  5.     private RabbitTemplate rabbitTemplate;
  6.     public void send01(Object msg) {
  7.         log.info("发送red消息:" + msg);
  8.         rabbitTemplate.convertAndSend("directExchange", "queue.red", msg);
  9.     }
  10.     public void send02(Object msg) {
  11.         log.info("发送green消息:" + msg);
  12.         rabbitTemplate.convertAndSend("directExchange", "queue.green", msg);
  13.     }
  14. }
复制代码
消息接收方
  1. @Service
  2. @Slf4j
  3. public class MQReceiver {
  4.     @RabbitListener(queues = "queue_direct01")
  5.     public void receive01(Object msg) {
  6.         log.info("QUEUE01接受消息:" + msg);
  7.     }
  8.     @RabbitListener(queues = "queue_direct02")
  9.     public void receive02(Object msg) {
  10.         log.info("QUEUE02接受消息:" + msg);
  11.     }
  12. }
复制代码
4.2 测试效果

编写测试接口
  1. /**
  2. * 测试 RabbitMQ
  3. */
  4. @RequestMapping("/mq/direct01")
  5. @ResponseBody
  6. public void mq01() {
  7.     mqSender.send01("Hello,Red");
  8. }
  9. /**
  10. * 测试 RabbitMQ
  11. */
  12. @RequestMapping("/mq/direct02")
  13. @ResponseBody
  14. public void mq02() {
  15.     mqSender.send02("Hello,Green");
  16. }
复制代码
访问测试请求


5 集成 RabbitMQ Topic 交换机模式

Topic 交换机模式对应 RabbitMQ 消息队列的主题模式,全部发送到Topic Exchange的消息被转发到全部管线RouteKey中指定Topic的Queue上Exchange将RouteKey和某Topic举行模糊匹配,此时队列需要绑定一个Topic,对于routing key匹配模式界说规则举比方下:

5.1 步调

配置类
  1. @Configuration
  2. public class RabbitMQConfigTopic {
  3.     private static final String QUEUE01 = "queue_topic01";
  4.     private static final String QUEUE02 = "queue_topic02";
  5.     private static final String EXCHANGE = "topicExchange";
  6.     private static final String ROUTINGKEY01 = "#.queue.#";
  7.     private static final String ROUTINGKEY02 = "*.queue.#";
  8.     @Bean
  9.     public Queue queue01() {
  10.         return new Queue(QUEUE01);
  11.     }
  12.     @Bean
  13.     public Queue queue02() {
  14.         return new Queue(QUEUE02);
  15.     }
  16.     @Bean
  17.     public TopicExchange topicExchange() {
  18.         return new TopicExchange(EXCHANGE);
  19.     }
  20.     @Bean
  21.     public Binding binding01() {
  22.         return BindingBuilder.bind(queue01()).to(topicExchange()).with(ROUTINGKEY01);
  23.     }
  24.     @Bean
  25.     public Binding binding02() {
  26.         return BindingBuilder.bind(queue02()).to(topicExchange()).with(ROUTINGKEY02);
  27.     }
  28. }
复制代码
消息发送方
  1. @Service
  2. @Slf4j
  3. public class MQSender {
  4.     @Autowired
  5.     private RabbitTemplate rabbitTemplate;
  6.     public void send01(Object msg) {
  7.         log.info("发送消息(被01队列接受):" + msg);
  8.         rabbitTemplate.convertAndSend("topicExchange", "queue.red.message", msg);
  9.     }
  10.     public void send02(Object msg) {
  11.         log.info("发送消息(被两个queue接受):" + msg);
  12.         rabbitTemplate.convertAndSend("topicExchange", "message.queue.green.abc", msg);
  13.     }
  14. }
复制代码
消息接收方
  1. @Service
  2. @Slf4j
  3. public class MQReceiver {
  4.     @RabbitListener(queues = "queue_topic01")
  5.     public void receive01(Object msg) {
  6.         log.info("QUEUE01接受消息:" + msg);
  7.     }
  8.     @RabbitListener(queues = "queue_topic02")
  9.     public void receive02(Object msg) {
  10.         log.info("QUEUE02接受消息:" + msg);
  11.     }
  12. }
复制代码
5.2 测试效果

编写测试接口
  1. /**
  2. * 测试 RabbitMQ
  3. */
  4. @RequestMapping("/mq/topic01")
  5. @ResponseBody
  6. public void mq01() {
  7.     mqSender.send01("Hello,Red");
  8. }
  9. /**
  10. * 测试 RabbitMQ
  11. */
  12. @RequestMapping("/mq/topic02")
  13. @ResponseBody
  14. public void mq02() {
  15.     mqSender.send02("Hello,Green");
  16. }
复制代码
访问测试请求


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有帐号?立即注册

x

帖子地址: 

回复

使用道具 举报

分享
推广
火星云矿 | 预约S19Pro,享500抵1000!
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

草根技术分享(草根吧)是全球知名中文IT技术交流平台,创建于2021年,包含原创博客、精品问答、职业培训、技术社区、资源下载等产品服务,提供原创、优质、完整内容的专业IT技术开发社区。
  • 官方手机版

  • 微信公众号

  • 商务合作