| 
 | 
 
大家好,我是 @明人只说暗话,本文为大家介绍RabbitMQ的【发布确认】相关的概念和用法,水平一般,能力有限,若有错误之处,欢迎指正。 
本文参考RabbitMQ官网文档: 
用过消息中间件的朋友,即使没遇到过,也应该听说过消失丢失的问题。但凡使用消息中间件,如何解决消息丢失问题便是一个绕不过去的坎。 
消失丢失大概分为三种情况: 
一、生产者问题。因为应用程序故障、网络抖动等各种原因,生产者没有成功向broker发送消息。 
二、消息中间件自身问题。broker没有将消息保存好,也许在消息持久化之前broker宕机了,导致消息丢失。 
三、消费者问题。消费者在消费消息时,因为没有合适的处理措施,导致broker将消费失败的消息从队列中删除了。 
为了解决消失丢失的问题,各个消息中间件都有自己的解决方案,本文主要讲述RabbitMQ是如何解决上述第一种情况的(第二种情况可以采用持久化机制;第三种情况可以采用消息应答)。 
其实,还有一种情况,因为路由或者交换机等配置的不对,导致消息发送失败。 
 
  
相关参数说明 
 
spring.rabbitmq.publisher-confirm-type 
 
要开启发布确认功能,需要配置spring.rabbitmq.publisher-confirm-type参数,可选值有三个:SIMPLE、CORRELATED和NONE。 
spring.rabbitmq.template.mandatory 
 
是否启用强制消息。 
当服务器无法将消息路由到队列时,如果将spring.rabbitmq.template.mandatory设置为true,它将使用return方法返回不可路由的消息。 
代码示例 
 
配置信息 
 
 
  
生产者代码 
 
@Slf4j 
@Component 
public class PublisherConfirmsProducer { 
 
    @Resource 
    private RabbitTemplate rabbitTemplate; 
 
    @PostConstruct 
    public void init() { 
        /* 
         * 回调函数 
         * @param correlationData 消息内容 
         * @param ack 是否成功 
         * @param cause 失败的原因 
         */ 
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { 
            if (ack) { 
                log.info("已经收到消息,Id为 : {}", correlationData.getId()); 
            } else { 
                log.info("未接收到消息:{},原因:{}", correlationData.getId(), cause); 
            } 
        }); 
 
        /* 
         * 消息退回,指消息未达到目的地被回退 
         */ 
        rabbitTemplate.setReturnsCallback(returnCallback -> log.info("【消息被退回】交换机:{},消息:{},退回原因:{}", 
                returnCallback.getExchange(), new String(returnCallback.getMessage().getBody()), returnCallback.getReplyText())); 
    } 
 
 
    public void send1() { 
        init(); 
        CorrelationData correlationData = new CorrelationData("1"); 
        rabbitTemplate.convertAndSend("directExchange", "rk001", "这是一条可以消费的消息", correlationData); 
    } 
 
    public void send2() { 
        init(); 
        CorrelationData correlationData = new CorrelationData("2"); 
        rabbitTemplate.convertAndSend("directExchange", "rk002", "这是一条无法到达的消息", correlationData); 
    } 
}如上面代码所示。 
我们首先通过rabbitTemplate.setConfirmCallback方法,设置了发送消息的回调函数(需要将参数spring.rabbitmq.publisher-confirm-type设置为SIMPLE或者CORRELATED)。 
correlationData参数表示我们发送消息时附带的额外参数,可以是业务ID等,方便执行其他业务逻辑。 
ack参数表示消息是否发送成功,true表示消息发送成功,false表示发送失败。 
cause参数表示没有发送到消息的错误信息。 
其次,通过rabbitTemplate.setReturnsCallback方法,设置了消息没有正常到达broker(如routing key 不匹配等)时的处理逻辑(需要将参数spring.rabbitmq.template.mandatory设置为true)。 
最后,我们发送了两条消息,第一条消息可以正常被消费,第二条消息因为routing key 不匹配,所以无法到达队列中。 
消费者代码 
 
@Component 
@Slf4j 
public class PublisherConfirmsConsumer { 
    @RabbitListener(bindings = {@QueueBinding(value = @Queue("queue.pc.001"), 
            exchange = @Exchange(value = "directExchange"), key = {"rk.001"})}) 
    public void process1(Message message, Channel channel) { 
        log.info(">>>>>>>>>> Receive: {}", message.getPayload()); 
    } 
 
    @RabbitListener(bindings = {@QueueBinding(value = @Queue("queue.pc.002"), 
            exchange = @Exchange("directExchange"), key = {"rk.001"})}) 
    public void process2(Message message, Channel channel) { 
        log.info(">>>>>>>>>> Receive: {}", message.getPayload()); 
    } 
}通过routing key绑定交换机和队列。 
单元测试代码 
 
@SpringBootTest(classes = RabbitmqDemoApplication.class) 
@RunWith(SpringJUnit4ClassRunner.class) 
class PublisherConfirmsProducerTest { 
 
    @Resource 
    private PublisherConfirmsProducer publisherConfirmsProducer; 
 
    @Test 
    void send() { 
        publisherConfirmsProducer.send1(); 
        publisherConfirmsProducer.send2(); 
    } 
}测试结果 
 
 
  
如上图所示,第一条消息被正常消费了,触发了rabbitTemplate.setConfirmCallback方法; 
第二条消息因为routing key不匹配,因此没有路由到队列中被消费(NO_ROUTE),触发了rabbitTemplate.setReturnsCallback方法。 
<hr/>我是 @明人只说暗话,以上就是本文内容,欢迎点赞、评论、关注。 |   
 
 
 
 |