办公问答网

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 146|回复: 0

rabbitmq:publisher confirms

[复制链接]

1

主题

4

帖子

4

积分

新手上路

Rank: 1

积分
4
发表于 2022-11-27 19:30:40 | 显示全部楼层 |阅读模式
0.背景

最近在我们的业务系统中遇到一个问题,
publisher行为:convertAndSend然后打日志。
consumer行为:@RabbitListener接到消息立刻打日志。
问题是,publisher打出了发送消息的日志,consumer没打出收到消息的日志。
基于这种情况,准备启用rabbitmq java client的ReturnCallback及ConfirmCallback机制,先确认消息是否成功发到了正确的queue里面。
之前没有用Callback,因为对于我们的场景,Rabbitmq还是非常稳定的,即使极少出现的异常情况,我们也有办法把丢掉的消息补发,因此没必要浪费Channel资源去让rabbitmq server给发送确认信息,也不想平白增加系统复杂性。
1.代码实现

一般我们使用rabbitmq可能会配置下面几个bean(不论通过何种方式,xml,@Configuretion,或者spring boot的autoconfigure),在此基础上,添加一些属性设置:
@Configuration
public class MqConfig {
   
    @Value("${rabbitmq.enableConfirm}")
    private boolean enableConfirm;

    @Value("${rabbitmq.enableReturn}")
    private boolean enableReturn;
   
    @Value("${rabbitmq.enableMessageCorrelation}")
    private boolean enableMessageCorrelation;
   
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        //省略其它属性设置...
        
        //根据配置决定是否开启 Confirm 机制
        connectionFactory.setPublisherConfirms(enableConfirm);
        //根据配置决定是否开启 Return 机制
        connectionFactory.setPublisherReturns(enableReturn);
        return connectionFactory;
    }
   
    @Bean
    public RabbitTemplate rabbitTemplate() throws Exception {
        //根据配置决定使用哪种 RabbitTemplate
        RabbitTemplate template = enableMessageCorrelation ?
                new CorrelationRabbitTemplate(connectionFactory()) :
                new RabbitTemplate(connectionFactory());
        //省略其它属性设置...
        
        //如果启用 Confirm 机制,设置 ConfirmCallback
        if (enableConfirm) {
            template.setConfirmCallback(confirmCallback());
        }
        //如果启用 Return 机制,设置 ReturnCallback,及打开 Mandatory
        if (enableReturn) {
            template.setReturnCallback(returnCallback());
            template.setMandatory(true);
        }
        return template;
    }
}对于Publisher而言,以上两个bean足以。
下面是 RabbitTemplate中需要的ConfirmCallback和ReturnCallback:
    @Bean
    @ConditionalOnMissingBean(value = RabbitTemplate.ConfirmCallback.class)
    public RabbitTemplate.ConfirmCallback confirmCallback() {
        return new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                // do something ...
        };
    }

    @Bean
    @ConditionalOnMissingBean(value = RabbitTemplate.ReturnCallback.class)
    public RabbitTemplate.ReturnCallback returnCallback() {
        return new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                // do something ...
            }
        };
    }ok,关于这两个Callback:

  • ConfirmCallback:每一条发到rabbitmq server的消息都会调一次confirm方法。

    • 如果消息成功到达exchange,则ack参数为true,反之为false;
    • cause参数是错误信息;
    • CorrelationData可以理解为context,在发送消息时传入的这个参数,此时会拿到。



  • ReturnCallback:成功到达exchange,但routing不到任何queue时会调用。

    • 可以看到这里能直接拿到message,exchange,routingKey信息。


这里会遇到一个问题,当ConfirmCallback被调用,且ack参数为false时,意味着这条消息可能发送失败了,那我可能想把这条消息在这里保存下来,比如打条日志,以免消息丢失,但对于ConfirmCallback,是不能像ReturnCallback一样直接拿到message的。
此时,我们可以利用CorrelationData这个参数,把message塞到这个参数里面。
我们扩展一下CorrelationData类:
public class MessageCorrelationData extends CorrelationData {

    // 其它属性 ...

    private Message message;

    // 各种构造函数及getter,setter ...
}现在,我们需要在发送消息时,把message设置到MessageCorrelationData中随message一起发出去。
在上面RabbitTemplate bean的配置中,可以看到这行代码:
RabbitTemplate template = enableMessageCorrelation ?
                new CorrelationRabbitTemplate(connectionFactory()) :
                new RabbitTemplate(connectionFactory());这个是因为不想改老代码,所以对RabbitTemplate类做一下扩展:
public class CorrelationRabbitTemplate extends RabbitTemplate {
   
    //...

    @Override
    public void send(final String exchange, final String routingKey,
                     final Message message, final CorrelationData correlationData)
            throws AmqpException {
        super.send(exchange, routingKey, message, correlationData == null ? new MessageCorrelationData(message) : correlationData);
    }
}不管调用RabbitTemplate的哪个方法发送消息,最终都是调用某个send方法,所以,我们重写这个方法,把MessageCorrelationData给塞进去。
ok,这样老代码不需要做任何改动,改一下rabbitmq的配置文件,就能在ConfirmCallback中拿到发出去但可能发送失败的message,拿到了message,那么,为所欲为吧。
2.为什么ConfirmCallback中不能直接拿到message

为了能在ConfirmCallback中拿到message,拐了好大一个弯,为什么不直接给我呢?这是一个很自然的问题。
简单追一下源码,看ConfirmCallback和ReturnCallback分别是在哪里被调用的。
可以在com.rabbitmq.client.impl.AMQChannel类中找到这个方法:
public void handleCompleteInboundCommand(AMQCommand command) throws IOException {
        // First, offer the command to the asynchronous-command
        // handling mechanism, which gets to act as a filter on the
        // incoming command stream.  If processAsync() returns true,
        // the command has been dealt with by the filter and so should
        // not be processed further.  It will return true for
        // asynchronous commands (deliveries/returns/other events),
        // and false for commands that should be passed on to some
        // waiting RPC continuation.
        if (!processAsync(command)) {
            // The filter decided not to handle/consume the command,
            // so it must be some reply to an earlier RPC.
            RpcContinuation nextOutstandingRpc = nextOutstandingRpc();
            // the outstanding RPC can be null when calling Channel#asyncRpc
            if(nextOutstandingRpc != null) {
                nextOutstandingRpc.handleCommand(command);
                markRpcFinished();
            }
        }
    }大部分rabbitmq java client与rabbitmq server的交互都会走到这里来,除了heartbeat等一些特殊的交互。
追踪到processAsync(command)这个方法:
@Override public boolean processAsync(Command command) throws IOException {
        Method method = command.getMethod();
        if (method instanceof Channel.Close) {
            asyncShutdown(command);
            return true;
        }
        if (isOpen()) {
            if (method instanceof Basic.Deliver) {
                processDelivery(command, (Basic.Deliver) method);
                return true;
            } else if (method instanceof Basic.Return) {
                callReturnListeners(command, (Basic.Return) method);
                return true;
            } else if (method instanceof Channel.Flow) {
                // ...
                return true;
            } else if (method instanceof Basic.Ack) {
                Basic.Ack ack = (Basic.Ack) method;
                callConfirmListeners(command, ack);
                handleAckNack(ack.getDeliveryTag(), ack.getMultiple(), false);
                return true;
            } else if (method instanceof Basic.Nack) {
                Basic.Nack nack = (Basic.Nack) method;
                callConfirmListeners(command, nack);
                handleAckNack(nack.getDeliveryTag(), nack.getMultiple(), true);
                return true;
            } else if (method instanceof Basic.RecoverOk) {
                // ...
                return false;
            } else if (method instanceof Basic.Cancel) {
                // ...
                return true;
            } else {
                return false;
            }
        } else {
            if (method instanceof Channel.CloseOk) {
                return false;
            } else {
                return true;
            }
        }
    }command中包含了method信息,根据method的不同,做相应的处理。
可以看到:

  • ReturnCallback在收到Basic.Return时调用;
  • ConfirmCallback在收到Basic.Ack或Basic.Nack时调用,根据debug,Channel.Close也会调。
再去官网看一下basic.ack这些是什么东西。
根据官网描述,个人对callback过程中client与server的交互做如下总结,如有错误,欢迎指正:
client: confirm.select; // 如果收到消息并处理完毕请通知我
server: confirm.select-ok; // 好的,我已经给这个 channel 开通了 confirm 模式,你用这个 channel 发消息给我,我还用这个 channel 通知你
... // 可能还有一些其它对话
client: basic.publish; // 给你发了一条消息,注意查收
server: basic.nack; // 不好意思,我这边处理出了点问题,你重新发一次
client: basic.publish; // 好吧
server: basic.nack; // 你这个 exchange 不存在
... // 可能伴随着 channel shutdown 和 reset
client: basic.publish; // ok,我的错,我已纠正,应该没问题了
server: basic.return; // 抱歉,你这个 exchange 上没这个 routingKey
server: basic.ack; // 虽然 routingKey 没有,但这个 exchange 没问题
client: basic.publish; // ok,我改了
server: basic.ack; // 收到,所有的 queue 都已落盘,没问题官网好像没有介绍basic.ack等这些东西分别包含哪些信息在里面,可能在AMQP协议里面有。
那么回到代码,找一下,两个callback的那些参数都是在哪里设置进去的。
对于ReturnCallback,找到com.rabbitmq.client.impl.ChannelN类的这个方法:
private void callReturnListeners(Command command, Basic.Return basicReturn) {
        try {
            for (ReturnListener l : this.returnListeners) {
                l.handleReturn(basicReturn.getReplyCode(),
                    basicReturn.getReplyText(),
                    basicReturn.getExchange(),
                    basicReturn.getRoutingKey(),
                    (BasicProperties) command.getContentHeader(),
                    command.getContentBody());
            }
        } catch (Throwable ex) {
            getConnection().getExceptionHandler().handleReturnListenerException(this, ex);
        }
    }可以看到,消息体就是command.getContentBody()这里拿到的,所以是basic.return附带的。
对于ConfirmCallback,找到org.springframework.amqp.rabbit.support.PublisherCallbackChannelImpl类的processAck方法:
private synchronized void processAck(long seq, boolean ack, boolean multiple, boolean remove) {
        if (multiple) {
            // ...
        }
        else {
            Listener listener = this.listenerForSeq.remove(seq);
            if (listener != null) {
                SortedMap<Long, PendingConfirm> confirmsForListener = this.pendingConfirms.get(listener);
                PendingConfirm pendingConfirm;
                if (remove) {
                    pendingConfirm = confirmsForListener.remove(seq);
                }
                else {
                    pendingConfirm = confirmsForListener.get(seq);
                }
                if (pendingConfirm != null) {
                    doHandleConfirm(ack, listener, pendingConfirm);
                }
            }
            else {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug(this.delegate.toString() + " No listener for seq:" + seq);
                }
            }
        }
    }可以看到,basic.ack给了一个seq参数,这个参数是server给确认的时候在delivery-tag里面带过来的,所以,我们看到server并没有给CorrelationData数据,那么CorrelationData就应该是放在本地内存里的。
进一步查看源码可以知道,这个seq参数是对当前channel发送的消息的一个序列号,发送消息的时候,CorrelationData放在本地与seq关联,server给confirm的时候会给一个delivery-tag,在这种场景下用来指明这是这个channel中第几条消息的confirm,再从本地内存中取出相应的CorrelationData交给ConfirmCallback。
这样来看,ConfirmCallback是有能力直接提供message信息的,只是java client没有这么实现。
3.CorrelationData的id与MessageProperties的correlationId

从上面可以知道CorrelationData其实不依赖它的id属性来区分,这个id属性完全可以不设置,实际上我上面的示例中就没有设置,并不影响功能。
那么这个id是干嘛用的呢?看了下源码,只在org.springframework.amqp.rabbit.AsyncRabbitTemplate类里面用到了,具体没有深究。
MessageProperties类里面有一个CorrelationId属性,这个官网有一个做RPC的示例。
都是用来做reply确认的,除此以外,好像没啥关联?
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

Archiver|手机版|小黑屋|办公问答网

GMT+8, 2025-3-15 18:46 , Processed in 0.082282 second(s), 22 queries .

Powered by Discuz! X3.4

© 2001-2013 Comsenz Inc. Templated By 【未来科技 www.veikei.com】设计

快速回复 返回顶部 返回列表