侧边栏壁纸
博主头像
包包博主等级

talk is cheap,show me the code

  • 累计撰写 25 篇文章
  • 累计创建 59 个标签
  • 累计收到 55 条评论

RabbitMQ死信队列和延时队列

包包
2021-07-07 / 0 评论 / 15 点赞 / 2,635 阅读 / 8,605 字 / 正在检测是否收录...
温馨提示:
本文最后更新于 2022-04-19,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

死信队列

1.死信概念

死信,顾名思义就是无法被消费的消息。一般来说,producer将消息投递到broker或者直接到queue里了,consumer从queue取出消息进行消费,但某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,接收死信消息的队列就叫死信队列

2.成为死信的条件

一个消息如果满足下列条件之一,会进入到死信路由(注意是路由,不是队列,一个路由可以对应多个队列):

  • 消息的TTL到了,消息过期了仍然没有被消费
  • 消息被consumer拒收了(手动确认中调用basicReject或者basicNack),并且拒收方法的requeue参数是false,也就是说不会重新入队被其他消费者消费
  • 队列的长度限制满了,排在前面的消息会被丢弃或者进入死信路由

一旦某个队列中有消息满足了成为死信的条件,如果该队列设置了死信交换机(Dead Letter Exchange)和死信路由键,那么满足死信条件的消息就会交由死信交换机,死信交换机会根据死信路由键将死信消息投递到对应的死信队列

注意:死信交换机本质上就是一个普通的交换机,只是因为队列设置了参数指定了死信交换机,这个普通的交换机才成为了死信的接收者

2.1 消息的TTL过期

TTL指消息的存活时间,如果消息从进入队列开始,直到达到TTL仍然没有被任何消费者消费,那么这个消息将成为死信

RabbitMQ可以对队列和消息分别设置TTL。对队列设置TTL对队列中的所有消息都生效。如果队列和消息同时设置了TTL,那么会取TTL小的。可以通过设置消息的expiration字段或者队列的x-message-ttl属性来设置TTL

我们按照如下架构在rabbitmq中创建队列交换机和队列

其中的要点是:

  • 要给普通队列normal.queue设置以下参数:
    • x-message-ttl:指定消息过期TTL
    • x-dead-letter-exchange:指定队列关联的死信交换机
    • x-dead-letter-routing-key:指定队列死信交换机绑定的路由键
  • 死信交换机dead.letter.exchange要通过指定的死信路由键x-dead-letter-routing-key绑定到死信队列dead.letter.queue
@Configuration
@Slf4j
public class RabbitConfig {
    // 添加json格式序列化器
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }

    // 创建普通交换机
    @Bean
    public Exchange normalExchange(){
        return ExchangeBuilder.directExchange("normal.exchange").durable(true).build();
    }

    // 创建普通队列,设置ttl为5秒,绑定死信交换机
    @Bean
    public Queue normalQueue(){
        return QueueBuilder.durable("normal.queue").ttl(5000)
                .deadLetterExchange("dead.letter.exchange").deadLetterRoutingKey("dead").build();
    }

    // 创建普通交换机和普通队列的绑定关系
    @Bean
    public Binding normalBinding(@Qualifier("normalExchange") Exchange exchange, @Qualifier("normalQueue") Queue queue){
        return BindingBuilder.bind(queue).to(exchange).with("normal").noargs();
    }

    // 创建死信交换机
    @Bean
    public Exchange deadLetterExchange(){
        return ExchangeBuilder.directExchange("dead.letter.exchange").durable(true).build();
    }

    // 创建死信队列
    @Bean
    public Queue deadLetterQueue(){
        return QueueBuilder.durable("dead.letter.queue").build();
    }

    // 创建普通交换机和普通队列的绑定关系
    @Bean
    public Binding deadLetterBinding(@Qualifier("deadLetterExchange") Exchange exchange, @Qualifier("deadLetterQueue") Queue queue){
        return BindingBuilder.bind(queue).to(exchange).with("dead").noargs();
    }
}

测试发送5条消息给普通交换机normal.exchange

@Test
public void testSendMessage() {
    for (int i = 0; i < 5; i++) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend("normal.exchange", "normal",
                                      new User("baobao" + i, 18, new Date()), correlationData);
    }
}

发现一开始消息出现在普通队列

5秒之后TTL到期,消息会全部转移到死信队列中

2.2 消息被consumer拒收

沿用上一小节的架构,只是在创建普通队列时去掉TTL设置

注意:由于修改了普通队列的设置,所以在后续启动程序之前要先在控制台删掉原来的普通队列由程序重新创建,否则会报错

然后在普通队列消费者中拒收消息(注意拒收的前提是要开启消息的手动确认)

@Component
@Slf4j
public class UserConsumer {
    @RabbitListener(queues = "normal.queue")
    public void handleUserMessage(Message message, User user, Channel channel) throws IOException {
        log.info(user.toString());
        // 拒收消息,不重新入队,让消息成为死信
        channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
    }
}

此时发送消息之后再启动消费者,消息将会全部转移到死信队列中

2.3 队列达到最大长度

继续沿用之前的架构,只是要在创建普通队列时添加x-max-length参数,指定队列的最大长度

// 创建普通队列
@Bean
public Queue normalQueue(){
    return QueueBuilder.durable("normal.queue").maxLength(5) // 指定最大长度为5
        .deadLetterExchange("dead.letter.exchange").deadLetterRoutingKey("dead").build();
}

此时尝试发送8条消息给普通队列,最终会发现普通队列只有5条消息,另外3条消息被转移到了死信队列

延时队列具体实现

延时队列顾名思义就是队列中的消息希望在一定时间后再被处理,其使用场景举例如下:

  • 订单在10分钟之内未支付则自动取消
  • 拍卖成功后进入公示状态,公示10分钟后取消公示状态
  • 用户注册成功后,如果超过24小时没有登录则进行短信提醒

1.死信队列实现方式

我们既可以控制消息在一段时间后变成死信,又可以控制变成死信的消息被路由到某个指定的交换机。结合这2点就可以实现一个延时队列

我们可以先给一个普通的交换机发送我们希望具有延时功能的消息,该交换机将这些消息放到一个特殊的队列,这个队列做了如下设置:

  1. 指定了队列中消息的TTL
  2. 指定了消息超时成为死信之后,要将死信交给哪个交换机
  3. 指定了将死信交给指定交换机后,交换机用什么路由键将这些死信发送给指定的死信队列

这样最终我们只要从指定的死信队列中取出的消息就是延时消息了

考虑这样一个场景,一个订单模块生成订单后,要在1分钟后检查订单是否已经支付,未支付则取消订单,那么就可以用延时队列来实现:

  1. 在rabbitmq中创建一个交换机order.event.exchange,绑定两个队列:
    • 延时队列order.delay.queue,绑定路由键为order.create
    • 死信队列order.release.queue,绑定路由键为order.release
  2. 生成订单后,将消息发送给交换机order.event.exchange,指定路由键为order.create,然后交换机根据路由键将消息交给延时队列order.delay.queue
  3. 延时队列order.delay.queue设置了3个参数
    • x-message-ttl=60000,指定队列中消息的TTL为60秒
    • x-dead-letter-exchange=order.event.exchange,指定队列中消息成为死信后交给交换机order.event.exchange
    • x-dead-letter-routing-key=order.release,指定死信交给交换机order.event.exchange时用的路由键
  4. 延时队列order.delay.queue中的消息成为死信后,会再次交还给交换机order.event.exchange,然后根据路由键会到达死信队列order.release.queue。从死信队列中取出的消息就是延时了1分钟的消息,这样就可以检查订单是否已支付了

首先在容器中创建出交换机和队列,并添加绑定关系

@Configuration
public class MyRabbitConfig {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    // 创建订单事件交换机
    @Bean
    public Exchange orderEventExchange(){
        return ExchangeBuilder.directExchange("order.event.exchange").durable(true).build();
    }

    // 创建延时队列
    @Bean
    public Queue orderDelayQueue(){
        // 创建队列需要的参数
        Map<String, Object> args = new HashMap<>();
        // 设置TTL
        args.put("x-message-ttl", 60000);
        // 设置死信的目的交换机
        args.put("x-dead-letter-exchange", "order.event.exchange");
        // 设置死信交给目的交换机时的路由键
        args.put("x-dead-letter-routing-key", "order.release");
        return QueueBuilder.durable("order.delay.queue").withArguments(args).build();
    }

    // 创建死信队列
    @Bean
    public Queue orderReleaseQueue(){
        return QueueBuilder.durable("order.release.queue").build();
    }

    // 设置交换机和延时队列的绑定关系
    @Bean
    public Binding bindingDelayQueue(@Qualifier("orderEventExchange") Exchange exchange, @Qualifier("orderDelayQueue") Queue queue){
        return BindingBuilder.bind(queue).to(exchange).with("order.create").noargs();
    }

    // 设置交换机和死信队列的绑定关系
    @Bean
    public Binding bindingReleaseQueue(@Qualifier("orderEventExchange") Exchange exchange, @Qualifier("orderReleaseQueue") Queue queue){
        return BindingBuilder.bind(queue).to(exchange).with("order.release").noargs();
    }

然后监听死信队列

@Service
@RabbitListener(queues = "order.release.queue")
public class OrderReleaseQueueService {
    @RabbitHandler
    public void getReleaseOrderMessage(Message message,String data,Channel channel) throws IOException {
        System.out.println("收到延时消息时间:" + LocalTime.now());
        System.out.println(data);
        System.out.println(message);
        // 确认消息
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

往延时队列中发送一个测试消息,1分钟后可以收到死信队列的消息

// 测试延时消息
@Test
public void testDelayQueue() throws InterruptedException {
    System.out.println("发送延时消息的时间:" + LocalTime.now());
    rabbitTemplate.convertAndSend("order.event.exchange", "order.create",
                                  "test delay queue");
    TimeUnit.MINUTES.sleep(3);
}

2.插件实现方式

利用死信队列来实现延时队列有以下缺陷:

  • 如果采用在消息属性上设置TTL而非使用队列TTL的方式,消息可能并不会按时死亡,因为RabbitMQ只会检查第1个消息是否过期,如果过期则丢到死信队列,如果第1个消息的延时时长很长,而第2个消息的延时时长很短,第2个消息并不会优先得到执行
  • 需要创建1个普通队列加1个对应的死信队列,创建的队列过多

插件方式实现延时队列解决了以上问题。插件方式直接将延时的操作交给交换机时间,交换机接收到消息后自己延时一段时间再投递给指定的队列

要想使用插件,首先在官网下载对应版本的rabbitmq_delayed_message_exchange插件

下载后将其上传到服务器,然后拷贝到rabbitmq容器的/plugins目录中

docker cp rabbitmq_delayed_message_exchange-3.8.0.ez rabbitmq:/plugins

然后进入rabbitmq容器,启用插件

docker exec -it rabbitmq bash
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

插件启用成功后重启rabbitmq,可以发现控制台创建交换机时多了一种延迟交换机的类型

我们按照如下架构在程序中创建交换机和队列

@Configuration
@Slf4j
public class RabbitConfig {
    // 添加json格式序列化器
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }

    // 创建延时交换机
    @Bean
    public Exchange delayExchange(){
        return ExchangeBuilder.directExchange("delay.exchange").delayed().durable(true).build();
    }

    // 创建队列
    @Bean
    public Queue delayQueue(){
        return QueueBuilder.durable("delay.queue").build();
    }

    // 创建延时交换机和队列的绑定关系
    @Bean
    public Binding delayBinding(@Qualifier("delayExchange") Exchange exchange, @Qualifier("delayQueue") Queue queue){
        return BindingBuilder.bind(queue).to(exchange).with("delay").noargs();
    }
}

然后测试发送不同TTL的消息给延时交换机

@Test
public void testSendMessage() {
    for (int i = 0; i < 5; i++) {

        rabbitTemplate.convertAndSend("delay.exchange", "delay",
                                      new User("baobao" + i, 18, new Date()),
                                      message -> {
                                          message.getMessageProperties().setDelay(15000);
                                          return message;
                                      });
        System.out.println("发送完毕");
    }
}

这里有几个点需要注意:

  • 发送消息的时候用到了convertAndSend的第4个参数MessagePostProcessor,它是一个函数式接口,作用是在消息发送之前对消息进行一些设置,我们在这里对消息进行了延时设置

    @FunctionalInterface
    public interface MessagePostProcessor {
    
    	/**
    	 * Change (or replace) the message.
    	 * @param message the message.
    	 * @return the message.
    	 * @throws AmqpException an exception.
    	 */
    	Message postProcessMessage(Message message) throws AmqpException;
    
    	/**
    	 * Change (or replace) the message and/or change its correlation data.
    	 * @param message the message.
    	 * @param correlation the correlation data.
    	 * @return the message.
    	 * @since 1.6.7
    	 */
    	default Message postProcessMessage(Message message, Correlation correlation) {
    		return postProcessMessage(message);
    	}
    
    }
    
  • message.getMessageProperties().setDelay(15000)设置消息延时15秒,本质上是给消息的header增加x-delay字段

运行程序向延迟交换机发送延时消息,可以看出队列接收到消息的时间比发送时间延迟了约15秒,达到了延时效果

同时观察创建的延迟交换机,其本质是x-delayed-message类型的交换机,并且具有参数x-delayed-type=direct,表示是direct类型的延迟队列

总结:插件方式实现延时消息比死信队列方式更加简单,推荐使用

15

评论区