• 售前

  • 售后

热门帖子
入门百科

RabbitMQ高级

[复制链接]
醉于山水 显示全部楼层 发表于 2022-1-16 20:10:49 |阅读模式 打印 上一主题 下一主题
RabbitMQ高级部门



RabbitMQ高级

消息丢失

消息应答机制

自动应答

  1. channel.basicConsume(QUEUE_NAME,true,consumer);
复制代码
手动应答


第二个参数: Multiple代表是否批量应答。
  1. channel.basicAck(envelope.getDeliveryTag(),false); # 肯定确认
  2. channel.basicNack(envelope.getDeliveryTag(),false); # 否定确认
  3. channel.basicReject(envelope.getDeliveryTag()); # 否定确认
复制代码
消息自动重新入队

消息在手动应答时,斲丧者宕机,消息不会丢失,会放到队列中重新斲丧。
消息恒久化

队列恒久化

  1. channel.queueDeclare(QUEUE_NAME,true,false,false,null);
复制代码
消息恒久化

  1. channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
复制代码
将消息标志为恒久化并不能完全包管不会丢失消息,假如须要更强有力的恒久化计谋,请参考发布确认章节。
不公中分发

意思就是假如这个使命我还没有处理惩罚完大概我还没有应答你,你先别分配给我。通过设置预取计数值为1,阐明我现在只能处理惩罚一个使命,然后 rabbitmq 就会把该使命分配给没有那么忙的谁人空闲斲丧者。
  1. int prefechCount = 1
  2. channel.basicQos(prefetchCount);
复制代码

预取值

prefechCount:该值界说通道上答应的未确认消息的最大数量

斲丧者1,斲丧消息快,预期值设置为5,斲丧5个消息之后,开始不公中分发。

斲丧者2,斲丧消息慢,预期值设置为5,斲丧5个消息之后,开始不公中分发。

消息发布确认

开开导布确认

  1. channel.confirmSelect();
复制代码
单个确认发布

长处: 同步等待确认,简单。
缺点:发布消息速率慢,吞吐量有限。
单个确认发布:发布消息总数:1000,耗时为:1427ms

  1.         // 开启消息确认模式
  2.         channel.confirmSelect();
  3.         // 记录开始发布时间
  4.         long begin = System.currentTimeMillis();
  5.         // 要发送的消息
  6.         for (int i = 1; i <= MSG_COUNT; i++) {
  7.             String message = i + "Hello,我是你的宝贝";
  8.             // 消息持久化
  9.             channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
  10.             // 等待消息确认
  11.             channel.waitForConfirms();
  12.         }
  13.         // 记录结束发布时间
  14.         long end = System.currentTimeMillis();
  15.         System.out.println("单个确认发布:发布消息总数:" + MSG_COUNT + ",耗时为:" + (end - begin) + "ms");
复制代码
斲丧者02

  1.         // 记录开始发布时间
  2.         long begin = System.currentTimeMillis();
  3.         int outstandingCount = 0;
  4.         int batchCount = 100;
  5.         // 要发送的消息
  6.         for (int i = 1; i <= MSG_COUNT; i++) {
  7.             String message = i + "Hello,我是你的宝贝";
  8.             // 消息持久化
  9.             channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
  10.             // 等待消息确认,批量确认,100次确认一次
  11.             if (outstandingCount % batchCount == 0) {
  12.                 channel.waitForConfirms();
  13.                 outstandingCount = 0;
  14.             }
  15.             outstandingCount++;
  16.         }
  17.         // 记录结束发布时间
  18.         long end = System.currentTimeMillis();
  19.         System.out.println("批量确认发布:发布消息总数:" + MSG_COUNT + ",耗时为:" + (end - begin) + "ms");
复制代码

队列已满


  1.         /**
  2.          *  线程安全的哈希表,适合高并发场景
  3.          */
  4.         ConcurrentSkipListMap<Long, String> map = new ConcurrentSkipListMap<>();
  5.         /**
  6.          * @deliveryTag: 消息序列号
  7.          * @multiple:
  8.          *    true:确认小于等于当前序列号的消息
  9.          *    false: 确认当前序列号的消息
  10.          */
  11.         ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
  12.             if (multiple == true) {
  13.                 // 如果是批量确认,删除已经应答的,剩下的就是未应答的
  14.                 // 返回小于等于当前序列号的确认消息
  15.                 ConcurrentNavigableMap<Long, String> longStringConcurrentNavigableMap = map.headMap(deliveryTag, true);
  16.                 longStringConcurrentNavigableMap.clear();
  17.             } else {
  18.                 // 如果是单个确认,删除已经应答的,剩下的就是未应答的
  19.                 map.remove(deliveryTag);
  20.             }
  21.             System.out.println("消息应答成功,序列号:"+deliveryTag);
  22.         };
  23.         ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
  24.             String s = map.get(deliveryTag);
  25.             System.out.println("消息应答失败:" + s + ",序列号:" + deliveryTag);
  26.         };
  27.         // 添加一个异步监听器
  28.         channel.addConfirmListener(ackCallback, nackCallback);
  29.         // 记录开始发布时间
  30.         long begin = System.currentTimeMillis();
  31.         // 要发送的消息
  32.         for (int i = 1; i <= MSG_COUNT; i++) {
  33.             String message = i + "Hello,我是你的宝贝";
  34.             // 消息持久化
  35.             channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
  36.             // 消息确认
  37.             map.put(channel.getNextPublishSeqNo(),message);
  38.         }
  39.         // 记录结束发布时间
  40.         long end = System.currentTimeMillis();
  41.         System.out.println("异步确认发布:发布消息总数:" + MSG_COUNT + ",耗时为:" + (end - begin) + "ms");
复制代码
消息被拒绝

  1.         // 创建普通交换机
  2.         channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT, true, false, false, null);
  3.         //设置过期时间10s
  4.         AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().expiration("10000").build();
  5.         for (int i = 1; i < 11; i++) {
  6.             String msg = "Hello,你好:" + i;
  7.             channel.basicPublish(NORMAL_EXCHANGE, "normal", basicProperties, msg.getBytes());
  8.         }
  9.         connection.close();
  10.         channel.close();
复制代码



耽误队列

耽误队列就是用来存放须要在指定时间被处理惩罚的元素的队列。
架构图


使用TTL及死信队列实现延时队列

设置类

  1.         // 创建普通交换机
  2.         channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT, true, false, false, null);
  3.         //创建死信交换机
  4.         channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT, true, false, false, null);
  5.         //正常队列绑定死信队列
  6.         HashMap<String,Object> params = new HashMap<>();
  7.         params.put("x-dead-letter-exchange",DEAD_EXCHANGE);
  8.         params.put("x-dead-letter-routing-key","dead");
  9.         // 创建普通队列
  10.         channel.queueDeclare(NORMAL_QUEUE, true, false, false, params);
  11.         // 创建死信队列
  12.         channel.queueDeclare(DEAD_QUEUE, true, false, false, null);
  13.         // 绑定交换机跟队列
  14.         channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"normal");
  15.         channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"dead");
  16.         // 监听消息
  17.         DeliverCallback deliverCallback = (s, delivery) -> {
  18.             String message = new String(delivery.getBody(), "utf-8");
  19.             System.out.println("消费者01消费消息:" + message);
  20.         };
  21.         channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, s -> {});
复制代码
生产者

  1.         // 创建死信交换机
  2.         channel.exchangeDeclare(Consumer1.DEAD_EXCHANGE, BuiltinExchangeType.DIRECT, true, false, false, null);
  3.         // 创建死信队列
  4.         channel.queueDeclare(Consumer1.DEAD_QUEUE, true, false, false, null);
  5.         // 绑定交换机跟队列
  6.         channel.queueBind(Consumer1.DEAD_QUEUE,Consumer1.DEAD_EXCHANGE,"dead");
  7.         // 监听消息
  8.         DeliverCallback deliverCallback = (s, delivery) -> {
  9.             String message = new String(delivery.getBody(), "utf-8");
  10.             System.out.println("消费者01消费消息:" + message);
  11.         };
  12.         channel.basicConsume(Consumer1.DEAD_QUEUE, true, deliverCallback, s -> {});
复制代码
斲丧者

  1.         // 设置普通队列长度
  2.         params.put("x-max-length",6);
复制代码
运行结果


延时队列优化

标题:第一条消息在 10S 后酿成了死信消息,然后被斲丧者斲丧掉,第二条消息在 40S 之后酿成了死信消息,然后被斲丧掉,如许一个延时队列就打造完成了。不外,假如如许使用的话,岂不是每增长一个新的时间需求,就要新增一个队列,这里只有 10S 和 40S两个时间选项,假如须要一个小时后处理惩罚,那么就须要增长 TTL 为一个小时的队列,假如是预定集会会议室然后提前关照如许的场景,岂不是要增长无数个队列才气满意需求?
架构图


设置类

  1.         // 监听消息
  2.         DeliverCallback deliverCallback = (s, delivery) -> {
  3.             String message = new String(delivery.getBody(), "UTF-8");
  4.             if(message.equals("Hello,你好:7")){
  5.                 // 第二个参数代表是否重复加入队列
  6.                 channel.basicReject(delivery.getEnvelope().getDeliveryTag(),false);
  7.                 System.out.println("消费者01拒绝消息:" + message);
  8.             }else{
  9.                 channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
  10.                 System.out.println("消费者01消费消息:" + message);
  11.             }
  12.         };
  13.         channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, s -> {});
复制代码
生产者

  1. @Configuration
  2. public class RabbitMQConfig {
  3.     // 普通队列
  4.     public static final String QUEUE_10 = "QA";
  5.     public static final String QUEUE_40 = "QB";
  6.     // 死信队列
  7.     public static final String DEAD_QUEUE = "QD";
  8.     // 普通交换机
  9.     public static final String DIRECT_EXCHANGE = "X";
  10.     // 死信交换机
  11.     public static final String DEAD_DIRECT_EXCHANGE = "Y";
  12.     // 普通交换机routingkey
  13.     public static final String A_ROUTING_KEY = "XA";
  14.     public static final String B_ROUTING_KEY = "XB";
  15.     // 死信交换机routingkey
  16.     public static final String Y_ROUTING_KEY = "YD";
  17.     @Bean("xExchange")
  18.     public Exchange xExchange(){
  19.         return ExchangeBuilder.directExchange(DIRECT_EXCHANGE).durable(true).build();
  20.     }
  21.     @Bean("yExchange")
  22.     public Exchange yExchange(){
  23.         return ExchangeBuilder.directExchange(DEAD_DIRECT_EXCHANGE).durable(true).build();
  24.     }
  25.     @Bean("queue_10")
  26.     public Queue queue_10(){
  27.         HashMap<String, Object> args = new HashMap<>();
  28.         // 绑定死信交换机
  29.         args.put("x-dead-letter-exchange",DEAD_DIRECT_EXCHANGE);
  30.         args.put("x-dead-letter-routing-key",Y_ROUTING_KEY);
  31.         // 设置队列有效期10s
  32.         args.put("x-message-ttl",10000);
  33.         return QueueBuilder.durable(QUEUE_10).withArguments(args).build();
  34.     }
  35.     @Bean("queue_40")
  36.     public Queue queue_40(){
  37.         HashMap<String, Object> args = new HashMap<>();
  38.         // 绑定死信交换机
  39.         args.put("x-dead-letter-exchange",DEAD_DIRECT_EXCHANGE);
  40.         args.put("x-dead-letter-routing-key",Y_ROUTING_KEY);
  41.         // 设置队列有效期10s
  42.         args.put("x-message-ttl",40000);
  43.         return QueueBuilder.durable(QUEUE_40).withArguments(args).build();
  44.     }
  45.     @Bean("queue_dead")
  46.     public Queue queue_dead(){
  47.         return QueueBuilder.durable(DEAD_QUEUE).build();
  48.     }
  49.     @Bean
  50.     public Binding queueABind(@Qualifier("queue_10") Queue queue,@Qualifier("xExchange") Exchange exchange){
  51.         return BindingBuilder.bind(queue).to(exchange).with(A_ROUTING_KEY).noargs();
  52.     }
  53.     @Bean
  54.     public Binding queueBBind(@Qualifier("queue_40") Queue queue,@Qualifier("xExchange") Exchange exchange){
  55.         return BindingBuilder.bind(queue).to(exchange).with(B_ROUTING_KEY).noargs();
  56.     }
  57.     @Bean
  58.     public Binding queueDBind(@Qualifier("queue_dead") Queue queue,@Qualifier("yExchange") Exchange exchange){
  59.         return BindingBuilder.bind(queue).to(exchange).with(Y_ROUTING_KEY).noargs();
  60.     }
  61. }
复制代码
标题


消息属性上设置 TTL 的方式,消息大概并不会按时“殒命“,由于 RabbitMQ 只会查抄第一个消息是否逾期,假如逾期则丢到死信队列,假如第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到实行。
使用RabbitMQ插件实现延时队列

安装插件

插件名称:rabbitmq_delayed_message_exchange
下载地点:https://www.rabbitmq.com/community-plugins.html
插件上传到 /usr/lib/rabbitmq/lib/rabbitmq_server-3.7.17/plugins 目次
  1. @Slf4j
  2. @RestController
  3. @RequestMapping("ttl")
  4. public class SendMsgController {
  5.      @Autowired
  6.      private RabbitTemplate rabbitTemplate;
  7.      @GetMapping("sendMsg/{message}")
  8.      public void sendMsg(@PathVariable String message){
  9.           log.info("当前时间:{},给两个ttl队列发送了一条消息:{}",new Date(),message);
  10.           rabbitTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE,RabbitMQConfig.A_ROUTING_KEY,"消息来自ttl为10s的队列" + message);
  11.           rabbitTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE,RabbitMQConfig.B_ROUTING_KEY,"消息来自ttl为40s的队列" + message);
  12.      }
  13. }
复制代码
架构图


在我们自界说的互换机中,该范例消息支持耽误投递机制 消息转达后并不会立即投递到目标队列中,而是存储在 mnesia(一个分布式数据体系)表中,当到达投递时间时,才投递到目标队列中。
设置类

  1. @Slf4j
  2. @Component
  3. public class DeadLetterConsumer {
  4.     @RabbitListener(queues = RabbitMQConfig.DEAD_QUEUE)
  5.     public void getMsg(String message) {
  6.         log.info("当前时间:{},收到一条消息:{}", new Date().toString(), message);
  7.     }
  8. }
复制代码
生产者

  1.     // 不绑定过期时间的队列
  2.     public static final String QUEUE_C = "QC";
  3.     public static final String C_ROUTING_KEY = "XC";
  4.    
  5.     @Bean("queue_c")
  6.     public Queue queue_c(){
  7.         HashMap<String, Object> args = new HashMap<>();
  8.         // 绑定死信交换机
  9.         args.put("x-dead-letter-exchange",DEAD_DIRECT_EXCHANGE);
  10.         args.put("x-dead-letter-routing-key",Y_ROUTING_KEY);
  11.         return QueueBuilder.durable(QUEUE_C).withArguments(args).build();
  12.     }
  13.    
  14.     @Bean
  15.     public Binding queueCBind(@Qualifier("queue_c") Queue queue,@Qualifier("xExchange") Exchange exchange){
  16.         return BindingBuilder.bind(queue).to(exchange).with(C_ROUTING_KEY).noargs();
  17.     }
复制代码
斲丧者

  1.      @GetMapping("sendExpireMsg/{message}/{ttlTime}")
  2.      public void sendMsg(@PathVariable String message,@PathVariable String ttlTime){
  3.           log.info("当前时间:{},给队列发送了一条延时{}毫秒的消息:{}",new Date(),ttlTime,message);
  4.           rabbitTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE,RabbitMQConfig.C_ROUTING_KEY,message,correlationData -> {
  5.                correlationData.getMessageProperties().setExpiration(ttlTime);
  6.                return correlationData;
  7.           });
  8.      }
复制代码
运行结果


RabbitMQ别的知识点

消息幂等性

概念

斲丧者在斲丧 MQ 中的消息时,MQ 已把消息发送给斲丧者,斲丧者在给 MQ 返回 ack 时网络停止,
故 MQ 未收到确认信息,该条消息会重新发给其他的斲丧者,大概在网络重连后再次发送给该斲丧者,但实际上该斲丧者已乐成斲丧了该条消息,造成斲丧者斲丧了重复的消息。
包管消息幂等性

MQ 斲丧者的幂等性的办理一样寻常使用全局 ID 大概写个唯一标识比如时间戳 大概 UUID 大概可按自己的规则天生一个全局唯一 id,每次斲丧消息时用该 id 先判断该消息是否已斲丧过。
唯一 ID+指纹码机制

指纹码:我们的一些规则大概时间戳加别的服务给到的唯一信息码,它并不肯定是我们体系天生的,根本都是由我们的业务规则拼接而来,但是肯定要包管唯一性,然后就使用查询语句举行判断这个 id 是否存在数据库中,上风就是实现简单就一个拼接,然后查询判断是否重复;劣势就是在高并发时,假如是单个数
据库就会有写入性能瓶颈固然也可以接纳分库分表提拔性能,但也不是我们最保举的方式。
Redis原子性

使用 redis 实行 setnx 下令,天然具有幂等性。从而实现不重复斲丧
优先级队列

要让队列实现优先级须要做的变乱有如下变乱:队列须要设置为优先级队列,消息须要设置消息的优先
级,斲丧者须要等待消息已经发送到队列中才去斲丧,由于如许才有时机对消息举行排序。
队列设置优先级

  1. rabbitmq-plugins enable rabbitmq_delayed_message_exchange
  2. systemctl restart rabbitmq-server
复制代码
消息设置优先级

  1.     //延时队列(借助插件)
  2.     public static final String DELAYED_QUEUE_NAME = "delayed.queue";
  3.     public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
  4.     public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
  5.     // 延时交换机
  6.     @Bean("delayedExchange")
  7.     public CustomExchange delayedExchange(){
  8.         HashMap<String,Object> args = new HashMap<>();
  9.         args.put("x-delayed-type","direct");
  10.         return new CustomExchange(DELAYED_EXCHANGE_NAME,"x-delayed-message",true,false,args);
  11.     }
  12.     // 延时队列
  13.     @Bean("queue_delay")
  14.     public Queue queue_delay(){
  15.         return new Queue(DELAYED_QUEUE_NAME);
  16.     }
  17.     // 绑定
  18.     @Bean
  19.     public Binding queueDelayedBind(@Qualifier("queue_delay") Queue queue,@Qualifier("delayedExchange") Exchange exchange){
  20.         return BindingBuilder.bind(queue).to(exchange).with(DELAYED_ROUTING_KEY).noargs();
  21.     }
复制代码
RabbitMQ集群


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有帐号?立即注册

x

帖子地址: 

回复

使用道具 举报

分享
推广
火星云矿 | 预约S19Pro,享500抵1000!
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

草根技术分享(草根吧)是全球知名中文IT技术交流平台,创建于2021年,包含原创博客、精品问答、职业培训、技术社区、资源下载等产品服务,提供原创、优质、完整内容的专业IT技术开发社区。
  • 官方手机版

  • 微信公众号

  • 商务合作