实际开发中,我们经常需要保证消息由生产者发送到消费者接收并消费的整个过程中,消息不丢失,这就需要在RabbitMQ中实现消息的可靠投递。主要包含以下两部分:
发送端可靠性
:指生产者发送消息到RabbitMQ Broker这个过程中保证消息不丢失接收端可靠性
:指RabbitMQ Broker将消息发送给消费者并且消费者正确处理消息的这个过程中保证消息不丢失
发送端可靠性
发送端可靠性主要包含3个方面:
- 消息抵达交换机的确认机制
- 消息抵达队列的确认机制
- 消息发送的重试机制
1.消息抵达交换机确认
消息抵达交换机的确认机制作用是让生产者知晓消息是否正常抵达交换机,实现过程如下:
首先在yaml配置中开启发送端确认
然后在自定义配置类中给RabbitTemplate设置发送端确认回调函数
@Configuration
public class MyRabbitConfig {
@Autowired
private RabbitTemplate rabbitTemplate;
// 添加json格式序列化器
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
@PostConstruct
public void initRabbitTemplate(){
// 设置发送端确认回调
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
* @param correlationData 当前消息的唯一关联数据(唯一id)
* @param ack rabbitmq服务端是否成功收到消息
* @param cause 失败的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println(correlationData);
System.out.println(ack);
System.out.println(cause);
}
});
}
}
测试正确发送消息
// 测试可靠投递
@Test
public void testReliableDelivery(){
// 创建消息关联的唯一id
CorrelationData correlationData = new CorrelationData();
correlationData.setId(UUID.randomUUID().toString());
// 发送消息,带上消息的唯一id
rabbitTemplate.convertAndSend("amq.direct", "baobao",
new Person("文亮", 18), correlationData);
}
再来测试错误的发送消息,随便传一个不存在的交换机
注意:
发送端确认只是确认消息有没有正确到达交换机,而不是队列
。所以如果目标交换机收到了,但是由于路由键传的不对,没有匹配到队列,那么也算发送成功的- 无论消息是否正确投递到目标交换机,发送端确认的回调方法总是会执行的。但是如果
由于rabbitmq服务器宕机,导致连接不上而发送消息失败的情况,是不会触发回调方法的
- 在SpringBoot2.2以上版本中,
publisher-comfirms
已经被废弃,需要用以下配置代替spring: rabbitmq: publisher-confirm-type: correlated
2.消息抵达队列确认
消息抵达队列确认机制有2种实现方式:
消息回退机制
:如果消息无法正确投递到队列,以回调形式通知生产者备份交换机
:如果消息无法正确投递到队列,则交由备份交换机处理,备份交换机会将消息投递到其绑定的备份队列
2.1 消息回退机制
首先在yaml中配置return机制
在自定义配置类中设置回调
// 设置消息抵达队列的确认回调
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
/**
* 只要消息没有正确投递给指定的队列,就会触发这个失败回调
* @param message 投递失败的消息的详细信息
* @param replyCode 回复的状态码
* @param replyText 回复的文本内容
* @param exchange 这个消息发送给哪个交换机
* @param routingKey 发送这个消息使用的路由键
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("message:" + message);
System.out.println("replyCode:" + replyCode);
System.out.println("replyText:" + replyText);
System.out.println("exchange:" + exchange);
System.out.println("routingKey:" + routingKey);
}
});
当消息正确发送或者指定的交换机不存在时,这个确认抵达队列的回调不会触发
。只有抵达了交换机,因为路由键匹配不到队列等原因导致发送失败才会触发
2.2 备份交换机
除了用回退机制来处理无法路由到队列的消息,还可以使用备份交换机来处理。备份交换机可以理解为RabbitMQ中交换机的“备胎”,当我们为某一个交换机声明一个对应的备份交换机时,就是为它创建一个备胎,当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由备份交换机来进行转发和处理。然后我们就可以监听备份交换机绑定的队列,来实现对这些未路由成功消息的处理
注意:备份交换机的类型需要是
fanout
我们构建下面的架构
其中的关键点是正常接收消息的交换机需要设置alternate-exchange
参数指定备份交换机
@Configuration
@Slf4j
public class RabbitConfig {
// 添加json格式序列化器
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
// 创建普通交换机
@Bean
public Exchange confirmExchange(){
return ExchangeBuilder.directExchange("confirm.exchange").durable(true).alternate("backup.exchange").build();
}
// 创建备份交换机
@Bean
public FanoutExchange backupExchange(){
return ExchangeBuilder.fanoutExchange("backup.exchange").durable(true).build();
}
// 创建普通队列
@Bean
public Queue confirmQueue(){
return QueueBuilder.durable("confirm.queue").build();
}
// 创建备份队列
@Bean
public Queue backupQueue(){
return QueueBuilder.durable("backup.queue").build();
}
// 创建普通交换机和普通队列的绑定关系
@Bean
public Binding confirmBinding(@Qualifier("confirmExchange") Exchange exchange, @Qualifier("confirmQueue") Queue queue){
return BindingBuilder.bind(queue).to(exchange).with("confirm").noargs();
}
// 创建备份交换机和备份队列的绑定关系
@Bean
public Binding backupBinding(@Qualifier("backupExchange") FanoutExchange exchange, @Qualifier("backupQueue") Queue queue){
return BindingBuilder.bind(queue).to(exchange);
}
}
然后测试发送消息给普通交换机,故意把路由键写错
@Test
public void testSendMessage() {
for (int i = 0; i < 5; i++) {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("confirm.exchange", "confirm111",
new User("baobao" + i, 18, new Date()), correlationData);
}
}
此时消息就会全部由备份交换机路由到备份队列中
观察普通交换机的详细信息,发现其绑定备份交换机的参数就是alternate-exchange
注意:在同时启用消息回退机制和备份交换机的情况下,
备份交换机的优先级高于消息回退机制
。也就是说,无法路由到队列的消息将直接交给备份交换机,而不会回退给生产者的回调方法
3.发送端重试机制
一般情况下,只要我们编写代码时,交换机名称和路由键名称没有写错,那么消息就能正确投递到队列,也不会触发消息投递失败的回调。所以我们更应该关注的是rabbitmq所在物理服务器宕机的情况,这种情况即便交换机名称和路由键名称都正确也无法投递成功,这个时候就需要发送端重试机制。默认情况下消息只会发送1次,超时后如果没有发送成功就不会再重发,我们需要开启发送端重试,其相关的配置如下:
spring:
rabbitmq:
template:
retry: # 发送端重试机制配置
enabled: true # 是否开启发送端重试,默认关闭
max-attempts: 3 # 消息发送失败时最大重试次数,默认为3
initial-interval: 1000ms # 发送失败时重试的时间间隔,默认1s
multiplier: 1 # 发送失败时重试时间间隔乘数因子,默认为1
max-interval: 10000ms # 发送失败时重试时间间隔最大值,默认10s
这里有关时间间隔的几个参数解释如下,假设几个参数设置如下:
initial-interval
:3000msmultiplier
:3max-interval
:10000ms
那么第一次重试间隔是3秒,如果仍然发送失败,那么第二次重试与第一次重试的时间间隔是3 * 3 = 9秒。以此类推,第三次重试与第二次重试的时间间隔是9 * 3 = 27秒,但又因为max-interval
为10秒,27秒超过了10秒,所以第三次重试与第二次重试的时间间隔会取10秒
如果想要真正保证发送成功,可以用以下思路来实现:
- 在发送消息前,先把消息持久化到MySQL
- 发送消息到交换机成功后,在确认回调中删除之前持久化的消息
- 消息由交换机路由到队列失败时,在返回回调中重新将删除的消息持久化到MySQL;或者利用备份交换机,在备份队列消费者中重新持久化消息到MySQL
- MySQL中存在的消息都是投递失败的消息。可以创建一个定时任务,周期性查询MySQL中保存的发送失败的消息,进行重发
接收端可靠性
1.消费端确认机制基本配置
消费端确认机制的目的是保证每个消息在接收到并正确处理完成后,RabbitMQ才可以从queue中删除消息
然而默认的消费端是自动确认的。也就是说当消息消费者应用启动后,MQ会将积压的所有消息一次性发送给消费者,而消费者接收到消息后会调用监听方法依次处理每个消息,只有前1个消息处理完以后才会处理下一个消息
。监听方法的执行有如下情况:
- 只要消息监听方法正常执行完成,就会自动回复
ack
给RabbitMQ,RabbitMQ会认为消息消费成功,自动将消息从queue中删除 - 如果消息监听方法执行时抛出异常,此时认为消息没有被正确处理,自动回复
nack
给RabbitMQ,RabbitMQ接收到nack
后会认为消息没有被正常消费,然后尝试将消息重新发送给其他消费者,如果此时消费执行过程中还出异常,那么又将回复nack
,RabbitMQ继续尝试重新发送,默认情况下重试发送的次数是无限制的,也就是说如果一直消费不成功,RabbitMQ将一直重发消息。这就导致一个问题,比如我们在扣除商品库存以后出了异常,那么RabbitMQ会一只重复发送消息这个unack
的消息,可能导致库存被重复扣减
解决方案是设置手动消息确认
。首先在yaml中配置
只要设置了手动确认模式,那么只要我们没有手动确认过,即使监听方法处理完毕消息了,由于MQ没有收到该消息的ack
,所以也不会将消息从queue中删除。即使在处理过程中消费端宕机,消息也不会丢失,而是由unack重新变成ready,消费端恢复正常重新连接进来后,会重新将消息发给它处理
如何手动对消息进行ack确认?需要在消息监听方法中利用channel
对象
@RabbitListener(queues = {"baobao"})
public class PersonService {
/**
* 监听队列baobao中的消息,有消息会自动取出并回调该方法
* @param message 原生消息的详细信息,包括消息头+消息体
* @param person 从消息体中解码出的javabean
* @param channel 当前传输的数据通道
*/
@RabbitHandler
public void listen(Message message, Person person, Channel channel){
// 消息投递的标签号,在同一个channel内按顺序递增
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.println("deliveryTag:" + deliveryTag + ">>>>>>" + person);
try {
// 手动决定是否确认消息
if (deliveryTag % 2 == 0){
// 确认消息
// 参数1:消息标签号 参数2:是否批量确认。一般为false,处理1个确认1个;否则会一次性确认掉当前消息标签号之前的全部消息,可能造成丢失
channel.basicAck(deliveryTag, false);
System.out.println("确认了消息:" + deliveryTag);
}else {
// 拒绝消息
// 参数1:消息标签号 参数2:是否批量拒绝,一般为false 参数3:拒绝后是否将消息重新入队
channel.basicNack(deliveryTag, false, false);
System.out.println("拒绝了消息:" + deliveryTag);
}
} catch (IOException e) {
e.printStackTrace();
}
}
我们一次性发送5个消息来测试
// 测试可靠投递
@Test
public void testReliableDelivery(){
for (int i = 0;i < 5;i++) {
// 创建消息关联的唯一id
CorrelationData correlationData = new CorrelationData();
correlationData.setId(UUID.randomUUID().toString());
// 发送消息,带上消息的唯一id
rabbitTemplate.convertAndSend("amq.direct", "baobao",
new Person("文亮" + i, 18), correlationData);
}
}
debug时模拟消费端宕机,在手动确认模式下,消息不会自动确认并删除,而是重新变成ready状态
正常执行完方法
总结:
默认情况下,rabbitmq在分发消息给消费者时,处理方式是将所有消息按照消费者的数量平均分配,一次性发送给所有消费者,然后等待消费者的响应:
- 如果消费者响应
ack
,代表消费成功,rabbitmq会从队列中删除该条消息。响应ack
分为两种情况:
- 自动响应:这是默认方式。当消费者处理消息的方法正常执行完成时自动回复
ack
给rabbitmq- 手动确认:需要在配置文件中开启。在代码中手动控制回复
ack
的时机- 如果消费者响应
nack
,则代表消费失败,rabbitmq不会删除该消息,并且会尝试重新发送消息(默认重发的次数无限制)。响应nack
同样分为两种情况:
- 自动响应:这是默认方式。当消费者处理消息的方法执行抛出异常时自动回复
nack
给rabbitmq- 手动确认:需要在配置文件中开启。在代码中手动控制回复
nack
的时机,并且可以控制回复nack
的同时是否要求该消息重新入队,如果不要求重新入队,那么rabbitmq会直接删除该消息而不是尝试重发
2.限制重试次数
在消费端自动确认时出异常返回nack
,或者手动确认时返回nack
并将消息重新入队时,rabbitmq都会尝试将消息重新发送给消费者,如果重发后仍然消费失败返回nack
,那么将继续重新发送,如此循环。默认的重发次数是无限的
,但在实际开发中,一旦消费出异常,很可能是由于我们代码的健壮性不足导致的异常,而代码又不会自动修复,rabbitmq收到消费者的nack
后,会将该消息放到消息队列的队首,并立即尝试将队首的消息再次发送给消费端,而此时消费端收到消息处理时大概率还是会抛出跟之前一样的异常,然后再次回复nack
,如此无限循环会占满CPU。所以我们需要限制消费失败时的重试次数
与消费失败重试次数相关的配置如下:
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 是否开启限制消费失败重试次数,默认关闭
initial-interval: 1000ms # 重试间隔,默认1s
max-attempts: 3 # 最大重试次数,默认3次
multiplier: 1 # 重试时间间隔乘数因子,默认为1
max-interval: 10000ms # 重试时间间隔最大值,默认10s
这些参数的含义与发送端重试机制的配置相同,这里不再赘述
下面我们测试一下配置后的效果。首先发送5条消息给mq,初始状态如下:
然后开启消费者处理消息,消费者代码如下:
@Component
@Slf4j
public class UserConsumer {
@RabbitListener(queues = "test.queue")
public void handleUserMessage(User user) throws InterruptedException {
log.info("开始处理消息");
log.info(user.toString());
// 休眠23秒,模拟处理时间很长
TimeUnit.SECONDS.sleep(23);
// 模拟异常,返回nack给mq
int i = 10 / 0;
log.info("消息处理结束");
}
}
刚启动消费者时,rabbitmq默认会一次性把5个消息都发送给消费者:
当消费第一个消息时,处理23秒以后会触发异常,返回nack
给mq,此时mq会在收到nack
1秒(重试间隔)后重新将消息发送给消费者,消费者继续消费失败返回nack
,如此循环。总结一下就是每24秒会重试一次,直到达到最大重试次数
并且注意到,当达到最大重试次数后,会抛出org.springframework.amqp.rabbit.support.ListenerExecutionFailedException
异常,并打印日志Retries exhausted for message ...
。此时该消息彻底消费失败,不再重试,rabbitmq会将该消息从队列中删除
后续的4个消息的流程也与第一个消息一样,直到所有消息全部都达到最大重试次数消费失败,并从队列中移除
实际开发中,什么情况需要消费端重试,什么情况不需要消费端重试呢?主要有以下情况:
如果是消费者收到消息后,调用第三方接口去完成业务,那么此时需要消费端重试。因为由于网络问题导致的第三方接口调用失败,多次重试是有可能成功的。此时只要合理配置好限制重试次数相关参数即可
如果是消费者消息处理的代码健壮性问题导致抛出异常,此时不需要重试。因为代码不会自动修复,就算重试多次也还是会消费失败,需要修复代码发布新版本解决。此时的具体处理分为2种情况:
- 如果是自动消息确认,那么需要抓取异常后,将该消息保存到数据库或者发送到死信队列中,后期人工进行补偿。由于抓取了异常,并没有抛出,所以会自动回复
ack
给rabbitmq,mq会认为消息消费成功并从队列中将其删除@RabbitHandler public void listen(Channel channel){ try{ // 处理消息逻辑... }catch(Exception e){ // 出异常,将该消息保存到数据库或者发送到死信队列中,后期人工进行补偿 } }
- 如果开启了手动确认消息,在抓取异常后,回复
nack
的同时不再将消息重新入队,同时将该消息保存到数据库或者发送到死信队列中,后期人工进行补偿@RabbitHandler public void listen(Channel channel){ try{ // 处理消息逻辑... // 正常处理完消息,回复成功ack channel.basicAck() }catch(Exception e){ // 出异常,拒绝消息nack,并且不让消息重新入队 channel.basicNack/Reject(requeue:false) // 将该消息保存到数据库或者发送到死信队列中,后期人工进行补偿 } }
3.保证消息幂等性
由于有消费端重试机制的存在,我们在消费端处理消息时,很有可能因为处理重发消息导致重复消费。比如我们处理消息时具体要进行的业务是往数据库插入一条订单数据或者更新订单数据,如果重复消费将导致插入多条相同的订单数据,无法实现幂等性
我们要明确在实际环境中,
由于网络抖动的存在,消息重复是无法避免的
,比如下列场景:
- 消息生产者发送消息到MQ时,MQ已经接收到消息,但是返回确认给生产者时由于网络原因丢失,此时生产者会发送重复的消息给MQ
- MQ将消息发送给消费者时,消费者已经接收到消息并且正确消费,返回ack给MQ时由于网络原因丢失,导致MQ认为消息没有正常消费,重复发送消息给消费者
解决方案是:根据业务特性,选取业务中唯一的某个属性,比如订单号作为区分消息是否重复的属性。在进行插入订单之前,先从数据库查询一下该订单号的数据是否存在,如果存在说明是重复消费,如果不存在则插入。伪代码如下:
@RabbitHandler
public void listen(OrderDTO orderDTO, Channel channel){
try{
// 查询订单是否已存在
OrderPO orderPO = orderMapper.getOrder(orderDTO.getOrderSn());
// 已存在,直接返回
if(orderPO != null) {
log.info("订单已存在,不能重复消费");
return;
}
// 插入订单
orderMapper.insertOrder(orderDTO);
}catch(Exception e){
// 将该消息保存到数据库或者发送到死信队列中,后期人工进行补偿
}
}
但是以上方式还不能100%保证幂等性。因为在订单还没插入时,可能重试消息就来了。重试消息与正常消息会分别处于不同线程同时处理消息,此时仍然可能将订单插入多次。最保险的做法是利用数据库的特性:
- 针对插入数据的情况,给数据库表中订单号字段设置唯一键,这样就从数据库层面避免了插入相同订单号的数据。如果数据库表没有一个业务字段可以作为唯一键,那么可以额外增加一张消费消息表,使用消息的id作为唯一索引,每次处理完消息后往表中插入已消费成功的消息(注意要将业务处理和插入已消费成功的消息的操作放到同一个事务中),然后再每次处理消息前,从表中查询该消息是否已经存在,已存在说明已经被消费过,直接返回避免重复消费
- 针对非幂等的更新数据的情况,可以利用加锁的方式保证幂等性
评论区