欢迎光临
【RabbitMQ | 第五篇】RabbitMQ实现消息的可靠抵达
   

【RabbitMQ | 第五篇】RabbitMQ实现消息的可靠抵达

【RabbitMQ | 第五篇】RabbitMQ实现消息的可靠抵达,在这里插入图片描述,第1张

文章目录

  • 5.RabbitMQ实现消息的可靠抵达
    • 5.1引入背景
    • 5.2确认机制分类
      • 5.2.1ConfirmCallback (确认模式:消息生产者确认)
        • (1)开启确认配置
        • (2)实现ConfirmCallback回调接口
        • 5.2.2ReturnCallback(回退模式:交换机确认)
          • (1)开启回退配置
          • (2)实现ReturnCallback回调接口
          • 5.2.3ACK机制(确认模式:消费者确认)
            • (1)消息接收确认模式类型
            • (2)手动确认回复方法
            • (3)basicAck方法
            • (4)basicNack方法
            • (5)basicReject方法
            • (6)开启手动ack机制
            • (7)消费者消费消息并手动确认
            • 5.3总结

              5.RabbitMQ实现消息的可靠抵达

              5.1引入背景

              为保证消息不丢失,可靠抵达,可以使用事务消息,但是性能下降250倍,为此引入确认机制,来实现消息的可靠抵达

              5.2确认机制分类

              RabbitMQ 消息确认机制分为两大类:发送方确认、接收方确认

              其中发送方确认又分为:生产者到交换器到确认、交换器到队列的确认。如下图

              【RabbitMQ | 第五篇】RabbitMQ实现消息的可靠抵达,image-20240319095647841,第2张

              • confirmCallback 确认模式:确认消息是否到达交换机
              • returnCallback退回模式:若消息没有传递给指定队列,就触发这个失败回调
              • ack机制:消费者确认模式
                • CK机制是消费者从RabbitMQ收到消息并处理完成后,反馈给RabbitMQ,RabbitMQ收到反馈后才将此消息从队列中删除
                • 如果一个消费者在处理消息出现了网络不稳定、服务器异常等现象,那么就不会有ACK反馈,RabbitMQ会认为这个消息没有正常消费,会将消息重新放入队列中

                  5.2.1ConfirmCallback (确认模式:消息生产者确认)

                  (1)开启确认配置
                  #老版本
                  spring:
                  	rabbitmq:
                  		publisher-confirms: true
                  		
                  #新版本
                  spring:
                  	rabbitmq:
                  		publisher-confirms-type: correlated
                  
                  (2)实现ConfirmCallback回调接口
                  • ConfirmCallback 是一个回调接口,消息发送到 Broker 后触发回调,确认消息是否到达 Broker 服务器,也就是只确认是否正确到达 Exchange 中
                    @Configuration
                    public class MyRabbitConfig {
                        
                        @Autowired
                        RabbitTemplate rabbitTemplate;
                        @Bean
                        public MessageConverter messageConverter(){
                            return new Jackson2JsonMessageConverter();
                        }
                        
                        //消息只要被 broker 接收到就会执行 confirmCallback,如果是 cluster 模式,需要所有broker 接收到才会调用 confirmCallback。
                        //确认消息送到交换机(Exchange)回调
                        @PostConstruct		//创建MyBabbiConfig对象后,执行该方法
                        public void initRabbitTemplate(){
                            //确认消息送到队列(Queue)回调
                            rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback()
                            {
                                @Override
                                public void returnedMessage(ReturnedMessage returnedMessage)
                                {
                                    System.out.println("\n确认消息送到队列(Queue)结果:");
                                    System.out.println("发生消息:" + returnedMessage.getMessage());
                                    System.out.println("回应码:" + returnedMessage.getReplyCode());
                                    System.out.println("回应信息:" + returnedMessage.getReplyText());
                                    System.out.println("交换机:" + returnedMessage.getExchange());
                                    System.out.println("路由键:" + returnedMessage.getRoutingKey());
                                }
                            });
                    	}
                    }		
                    

                    被 broker 接收到只能表示 message 已经到达交换机,并不能保证消息一定会被投递到目标 queue 里。所以需要用到接下来的 returnCallback

                    5.2.2ReturnCallback(回退模式:交换机确认)

                    • 过实现 ReturnCallback 接口,启动消息失败返回,此接口是在交换器路由不到队列时触发回调
                    • 该方法可以不使用,因为交换器和队列是在代码里绑定的,如果消息成功投递到 Broker 后几乎不存在绑定队列失败,除非你代码写错了。
                      (1)开启回退配置
                      spring:
                      	rabbit:
                      		#开启发送端消息抵达Queue确认
                      		publisher-returns: true
                      		#只要消息不能抵达queue时,该消息不会被丢弃,而是会被返回给生产者:可以记录下详细到投递数据,定期的巡检或者自动纠错都需要这些数据
                      		template:
                      			mandatory: true
                      
                      (2)实现ReturnCallback回调接口
                      @Configuration
                      public class MyRabbitConfig {
                          
                          @Autowired
                          RabbitTemplate rabbitTemplate;
                          @Bean
                          public MessageConverter messageConverter(){
                              return new Jackson2JsonMessageConverter();
                          }
                          
                          @PostConstruct		//创建MyBabbiConfig对象后,执行该方法
                          public void initRabbitTemplate(){
                              
                              //确认消息送到队列(Queue)失败回调:注意是失败
                              rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback()
                              {
                                  @Override
                                  public void returnedMessage(ReturnedMessage returnedMessage)
                                  {
                                      System.out.println("\n确认消息送到队列(Queue)结果:");
                                      System.out.println("发生消息:" + returnedMessage.getMessage());
                                      System.out.println("回应码:" + returnedMessage.getReplyCode());
                                      System.out.println("回应信息:" + returnedMessage.getReplyText());
                                      System.out.println("交换机:" + returnedMessage.getExchange());
                                      System.out.println("路由键:" + returnedMessage.getRoutingKey());
                                  }
                              });
                              
                              //确认消息送到队列(Queue)回调
                              rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback()
                              {
                                  @Override
                                  public void returnedMessage(ReturnedMessage returnedMessage)
                                  {
                                      System.out.println("\n确认消息送到队列(Queue)结果:");
                                      System.out.println("发生消息:" + returnedMessage.getMessage());
                                      System.out.println("回应码:" + returnedMessage.getReplyCode());
                                      System.out.println("回应信息:" + returnedMessage.getReplyText());
                                      System.out.println("交换机:" + returnedMessage.getExchange());
                                      System.out.println("路由键:" + returnedMessage.getRoutingKey());
                                  }
                              });
                      	}
                      }
                      

                      5.2.3ACK机制(确认模式:消费者确认)

                      1. 消费者确认发生在监听队列的消费者处理业务失败,如:发生了异常,不符合要求的数据等,这些场景我们就需要手动处理消息,比如重新发送或者丢弃
                      2. RabbitMQ 消息确认机制 (ACK)默认是自动确认的,自动确认会在消息发送给消费者后立即确认,但存在丢失消息的可能,如果消费端消费逻辑抛出异常,假如你用回滚了也只是保证了数据的一致性,但是消息还是丢了,也就是消费端没有处理成功这条消息,那么就相当于丢失了消息。
                      (1)消息接收确认模式类型
                      1. AcknowledgeMode.NONE:自动确认

                        • 默认自动ack,消息被消费者收到(注意:只是收到),就会从broker的queue中移除
                        • 存在丢失消息的可能,如果消费端消费逻辑抛出异常,假如你用回滚了也只是保证了数据的一致性,但是消息还是丢了
                        • AcknowledgeMode.AUTO:根据情况确认。

                        • AcknowledgeMode.MANUAL:手动确认

                          确认过程:就算消费者已经拿到了消息,但是没有确认,队列中的消息仍然不能移除,只不过状态由ready变为unacked,消息处理分为以下三种情况:

                          1. 消息处理成功ack(),接受下一个消息,此消息broker就会移除
                          2. 消息处理失败,nack()/reject(),重新发送给其他人进行处理,或者容错处理后ack
                          3. 消息一直没有调用ack/nack方法,broker认为此消息正在被处理,不会投递给别人,此时客户端断开,消息不会被broker移除,会投递给别人

                          【RabbitMQ | 第五篇】RabbitMQ实现消息的可靠抵达,img,第3张

                      (2)手动确认回复方法
                      • 消费者获取到消息,成功处理,可以回复Ack给Broker
                        • basic.ack:用于肯定确认broker将移除此消息
                        • basic.nack:用于否定确认可以指定broker是否丢弃此消息,可以批量
                        • basic.reject:用于否定确认当前消息;同上,但不能批量
                          (3)basicAck方法

                          basicAck 方法用于确认当前消息,Channel 类中的 basicAck 方法定义如下:

                          void basicAck(long deliveryTag, boolean multiple) throws IOException;
                          

                          参数说明:

                          1. long deliveryTag唯一标识 ID,当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,RabbitMQ 会用 basic.deliver 方法向消费者推送消息,这个方法携带了一个 delivery tag, 它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 Channel
                          2. boolean multiple: 是否批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息。
                          (4)basicNack方法
                          1. basicNack 方法用于否定当前消息
                          2. basicReject 方法一次只能拒绝一条消息
                          3. 如果想批量拒绝消息,则可以使用 basicNack 方法。消费者客户端可以使用 channel.basicNack 方法来实现,方法定义如下:
                          void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
                          

                          参数说明:

                          1. long deliveryTag: 唯一标识 ID。
                          2. boolean multiple: 上面已经解释。
                          3. boolean requeue: 如果 requeue 参数设置为 true,则 RabbitMQ 会重新将这条消息存入队列,以便发送给下一个订阅的消费者; 如果 requeue 参数设置为 false,则 RabbitMQ 立即会还把消息从队列中移除,而不会把它发送给新的消费者。
                          (5)basicReject方法

                          basicReject 方法用于明确拒绝当前的消息而不是确认。

                          Channel 类中的basicReject 方法定义如下:

                          void basicReject(long deliveryTag, boolean requeue) throws IOException;
                          

                          参数说明:

                          1. long deliveryTag: 唯一标识 ID。
                          2. boolean requeue: 上面已经解释。

                          测试场景:

                          • 发送五个消息测试,
                          • 此时关闭服务服务,消息的状态由unacked变为ready,下次客户端服务启动又会接收到消息ready变为unacked
                          • 除非手动确认
                          (6)开启手动ack机制
                          spring:
                          	listener:
                          		simple:
                          			acknowledge-mode: manual
                          
                          (7)消费者消费消息并手动确认
                          package com.pjb.receiver;
                           
                          import com.rabbitmq.client.Channel;
                          import org.springframework.amqp.core.Message;
                          import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
                          import org.springframework.stereotype.Component;
                           
                          /**
                           * 接收者
                           * @author pan_junbiao
                           **/
                          @Component
                          public class Receiver implements ChannelAwareMessageListener
                          {
                              @Override
                              public void onMessage(Message message, Channel channel) throws Exception
                              {
                                  long deliveryTag = message.getMessageProperties().getDeliveryTag();
                                  try
                                  {
                                      if ("queue_name".equals(message.getMessageProperties().getConsumerQueue()))
                                      {
                                          System.out.println("消费的消息来自的队列名为:"+message.getMessageProperties().getConsumerQueue());
                                          System.out.println("接收消息: " + new String(message.getBody(), "UTF-8"));
                                          System.out.println("执行queue_name中的消息的业务处理流程......");
                                      }
                           
                                      if ("fanout.A".equals(message.getMessageProperties().getConsumerQueue()))
                                      {
                                          System.out.println("消费的消息来自的队列名为:" + message.getMessageProperties().getConsumerQueue());
                                          System.out.println("接收消息: " + new String(message.getBody(), "UTF-8"));
                                          System.out.println("执行fanout.A中的消息的业务处理流程......");
                                      }
                           
                                      /**
                                       * 确认消息,参数说明:
                                       * long deliveryTag:唯一标识 ID。
                                       * boolean multiple:是否批处理,当该参数为 true 时,
                                       * 则可以一次性确认 deliveryTag 小于等于传入值的所有消息。
                                       */
                                      channel.basicAck(deliveryTag, true);
                           
                                      /**
                                       * 否定消息,参数说明:
                                       * long deliveryTag:唯一标识 ID。
                                       * boolean multiple:是否批处理,当该参数为 true 时,
                                       * 则可以一次性确认 deliveryTag 小于等于传入值的所有消息。
                                       * boolean requeue:如果 requeue 参数设置为 true,
                                       * 则 RabbitMQ 会重新将这条消息存入队列,以便发送给下一个订阅的消费者;
                                       * 如果 requeue 参数设置为 false,则 RabbitMQ 立即会还把消息从队列中移除,
                                       * 而不会把它发送给新的消费者。
                                       */
                                      //channel.basicNack(deliveryTag, true, false);
                                  }
                                  catch (Exception e)
                                  {
                                      e.printStackTrace();
                           
                                      /**
                                       * 拒绝消息,参数说明:
                                       * long deliveryTag:唯一标识 ID。
                                       * boolean requeue:如果 requeue 参数设置为 true,
                                       * 则 RabbitMQ 会重新将这条消息存入队列,以便发送给下一个订阅的消费者;
                                       * 如果 requeue 参数设置为 false,则 RabbitMQ 立即会还把消息从队列中移除,
                                       * 而不会把它发送给新的消费者。
                                       */
                                      channel.basicReject(deliveryTag, true);
                                  }
                              }
                          }
                          

                          5.3总结

                          RabbitMQ系列第五篇介绍了实现消息的可靠抵达的两大模式:发送者确认、消费者确认;其中发送确认又可以分为消息生产者到交换机的确认(confirmcallback接口:消息到达交换机回调)、交换机到队列的确认(returncallback接口:消息到达不了队列回调);而消费者回调ACK机制可分为自动确认、手动确认、根据情况确认三种类型;自动确认可能会出现消息丢失问题(消息到达消费者后,队列立刻删除该消息,但是此时消费者次此时出现异常或者宕机),手动确认的三个方法(basicAck、basicNack、basicReject)

                          【RabbitMQ | 第五篇】RabbitMQ实现消息的可靠抵达,在这里插入图片描述,第4张

                           
打赏
版权声明:本文采用知识共享 署名4.0国际许可协议 [BY-NC-SA] 进行授权
文章名称:《【RabbitMQ | 第五篇】RabbitMQ实现消息的可靠抵达》
文章链接:https://goodmancom.com/wl/175969.html