|
大家好,我是 @明人只说暗话,本文为大家介绍RabbitMQ的【发布确认】相关的概念和用法,水平一般,能力有限,若有错误之处,欢迎指正。
本文参考RabbitMQ官网文档:
用过消息中间件的朋友,即使没遇到过,也应该听说过消失丢失的问题。但凡使用消息中间件,如何解决消息丢失问题便是一个绕不过去的坎。
消失丢失大概分为三种情况:
一、生产者问题。因为应用程序故障、网络抖动等各种原因,生产者没有成功向broker发送消息。
二、消息中间件自身问题。broker没有将消息保存好,也许在消息持久化之前broker宕机了,导致消息丢失。
三、消费者问题。消费者在消费消息时,因为没有合适的处理措施,导致broker将消费失败的消息从队列中删除了。
为了解决消失丢失的问题,各个消息中间件都有自己的解决方案,本文主要讲述RabbitMQ是如何解决上述第一种情况的(第二种情况可以采用持久化机制;第三种情况可以采用消息应答)。
其实,还有一种情况,因为路由或者交换机等配置的不对,导致消息发送失败。

相关参数说明
spring.rabbitmq.publisher-confirm-type
要开启发布确认功能,需要配置spring.rabbitmq.publisher-confirm-type参数,可选值有三个:SIMPLE、CORRELATED和NONE。
spring.rabbitmq.template.mandatory
是否启用强制消息。
当服务器无法将消息路由到队列时,如果将spring.rabbitmq.template.mandatory设置为true,它将使用return方法返回不可路由的消息。
代码示例
配置信息

生产者代码
@Slf4j
@Component
public class PublisherConfirmsProducer {
@Resource
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
/*
* 回调函数
* @param correlationData 消息内容
* @param ack 是否成功
* @param cause 失败的原因
*/
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
log.info("已经收到消息,Id为 : {}", correlationData.getId());
} else {
log.info("未接收到消息:{},原因:{}", correlationData.getId(), cause);
}
});
/*
* 消息退回,指消息未达到目的地被回退
*/
rabbitTemplate.setReturnsCallback(returnCallback -> log.info("【消息被退回】交换机:{},消息:{},退回原因:{}",
returnCallback.getExchange(), new String(returnCallback.getMessage().getBody()), returnCallback.getReplyText()));
}
public void send1() {
init();
CorrelationData correlationData = new CorrelationData("1");
rabbitTemplate.convertAndSend("directExchange", "rk001", "这是一条可以消费的消息", correlationData);
}
public void send2() {
init();
CorrelationData correlationData = new CorrelationData("2");
rabbitTemplate.convertAndSend("directExchange", "rk002", "这是一条无法到达的消息", correlationData);
}
}如上面代码所示。
我们首先通过rabbitTemplate.setConfirmCallback方法,设置了发送消息的回调函数(需要将参数spring.rabbitmq.publisher-confirm-type设置为SIMPLE或者CORRELATED)。
correlationData参数表示我们发送消息时附带的额外参数,可以是业务ID等,方便执行其他业务逻辑。
ack参数表示消息是否发送成功,true表示消息发送成功,false表示发送失败。
cause参数表示没有发送到消息的错误信息。
其次,通过rabbitTemplate.setReturnsCallback方法,设置了消息没有正常到达broker(如routing key 不匹配等)时的处理逻辑(需要将参数spring.rabbitmq.template.mandatory设置为true)。
最后,我们发送了两条消息,第一条消息可以正常被消费,第二条消息因为routing key 不匹配,所以无法到达队列中。
消费者代码
@Component
@Slf4j
public class PublisherConfirmsConsumer {
@RabbitListener(bindings = {@QueueBinding(value = @Queue("queue.pc.001"),
exchange = @Exchange(value = "directExchange"), key = {"rk.001"})})
public void process1(Message message, Channel channel) {
log.info(">>>>>>>>>> Receive: {}", message.getPayload());
}
@RabbitListener(bindings = {@QueueBinding(value = @Queue("queue.pc.002"),
exchange = @Exchange("directExchange"), key = {"rk.001"})})
public void process2(Message message, Channel channel) {
log.info(">>>>>>>>>> Receive: {}", message.getPayload());
}
}通过routing key绑定交换机和队列。
单元测试代码
@SpringBootTest(classes = RabbitmqDemoApplication.class)
@RunWith(SpringJUnit4ClassRunner.class)
class PublisherConfirmsProducerTest {
@Resource
private PublisherConfirmsProducer publisherConfirmsProducer;
@Test
void send() {
publisherConfirmsProducer.send1();
publisherConfirmsProducer.send2();
}
}测试结果

如上图所示,第一条消息被正常消费了,触发了rabbitTemplate.setConfirmCallback方法;
第二条消息因为routing key不匹配,因此没有路由到队列中被消费(NO_ROUTE),触发了rabbitTemplate.setReturnsCallback方法。
<hr/>我是 @明人只说暗话,以上就是本文内容,欢迎点赞、评论、关注。 |
|