侧边栏壁纸
博主头像
包包博主等级

talk is cheap,show me the code

  • 累计撰写 25 篇文章
  • 累计创建 59 个标签
  • 累计收到 55 条评论

RabbitMQ实现消费端限流与非公平分配

包包
2021-07-10 / 0 评论 / 0 点赞 / 1,869 阅读 / 2,152 字 / 正在检测是否收录...
温馨提示:
本文最后更新于 2022-04-19,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

Qos机制概述

默认情况下,rabbitmq在分发消息给消费者时,处理方式是将所有消息按照消费者的数量平均分配,一次性发送给所有消费者,然后等待消费者的响应:

  • 如果消费者响应ack,代表消费成功,rabbitmq会从队列中删除该条消息。响应ack分为两种情况:
    • 自动响应:这是默认方式。当消费者处理消息的方法正常执行完成时自动回复ack给rabbitmq
    • 手动确认:需要在配置文件中开启。在代码中手动控制回复ack的时机
  • 如果消费者响应nack,则代表消费失败,rabbitmq不会删除该消息,并且会尝试重新发送消息(默认重发的次数无限制)。响应nack同样分为两种情况:
    • 自动响应:这是默认方式。当消费者处理消息的方法执行抛出异常时自动回复nack给rabbitmq
    • 手动确认:需要在配置文件中开启。在代码中手动控制回复nack的时机,并且可以控制回复nack的同时是否要求该消息重新入队,如果不要求重新入队,那么rabbitmq会直接删除该消息而不是尝试重发

上述rabbitmq分发消息的默认策略会存在2个问题:

  1. 如果rabbitmq中积压的消息非常多,那么一次性发送给消费者,可能导致消费者内存等资源被占满,无法正常处理消息
  2. 如果多个同时在线的消费者处理消息的能力差距很大,那么默认的平均分配消息的策略将会导致能力强的消费者很快处理完所有消息,能力差的消费者却仍然在处理,不利于消息的处理效率

在rabbitmq中可以用Qos机制解决以上问题。Qos机制的原理是当消费者有一定数量prefetchCount(可手动配置)的消息未被ack确认时,rabbitmq不会给消费者发送新的消息。这样就很好地解决了上述2个问题:

  1. 消费端限流:rabbitmq刚开始只会一次性发送prefetchCount数量的消息给消费者,而不是发送所有消息,此时未被确认的消息数量就是prefetchCount。消费者每处理完1条消息并回复ack时,rabbitmq在收到ack后,此时未被确认的消息数量为prefetchCount-1,这时rabbitmq才会再发送1条消息给消费者。如此直到mq发送完所有消息
  2. 非公平分配消息:能力强的消费者处理消息速度快,即回复ack的速度快,那么就会促使rabbitmq将剩余的消息更多地发给它,达到一种能者多劳的效果

整合SpringBoot测试

首先在yml中添加rabbitmq的配置,其中关键配置是prefetch,代表多少消息未被ack时,rabbitmq不会给消费者发送新的消息

spring:
  rabbitmq:
    host: 192.168.153.130
    port: 5672
    username: guest
    password: guest
    #virtual-host:
    listener:
      simple:
        prefetch: 2  # 代表多少消息未被ack时,rabbitmq不会给消费者发送新的消息

先解释一下rabbitmq控制台队列标签中,针对消息的几个状态的说明:

  • Ready:代表保存在rabbitmq本地,未发送给消费者的消息数量
  • Unack:代表已经发送给消费者,但是还未收到消费者ack或者nack响应的消息数量
  • Total:队列中消息总数量,Ready + Unack之和

然后我们先用下列代码给rabbitmq发送5条消息

@Test
public void testSendMessage() {
    for (int i = 0; i < 5; i++) {
        rabbitTemplate.convertAndSend("test.direct", "queue", new User("baobao" + i, 18, new Date()));
    }
}

此时控制台的初始状态如下

然后我们创建消费者

@Component
@Slf4j
public class UserConsumer {
    @RabbitListener(queues = "test.queue")
    public void handleUserMessage(User user) throws InterruptedException {
        log.info("开始处理消息");
        log.info(user.toString());
        // 休眠23秒,模拟消息处理的时间很长
        TimeUnit.SECONDS.sleep(23);
        log.info("消息处理结束");
    }
}

启动消费者后观察日志打印和控制台消息数量的变化:

  • 由于prefetch为2,所以rabbitmq刚开始会发送2条消息给消费者,消费者开始顺序处理消息

  • 当第1条消息处理正常处理完以后,自动回复了ack给rabbitmq,此时Unacked消息数量由2减少为1,小于了prefetch,rabbitmq就会再发送一条消息给消费者,发送后Ready消息数量减1,Unacked消息数量又增加为2,达到prefetch,此时rabbitmq又会停止继续发送消息给消费者

  • 当第2条消息也处理完以后,同理rabbitmq会继续发送1条消息给消费者

  • 当第3条消息也处理完之后,继续发送mq中剩余的最后一条消息给消费者

  • 最后当消费者处理完所有消息,mq的消息数量全部归0

0

评论区