侧边栏壁纸
博主头像
包包博主等级

talk is cheap,show me the code

  • 累计撰写 25 篇文章
  • 累计创建 59 个标签
  • 累计收到 55 条评论

SpringBoot整合RabbitMQ

包包
2021-07-06 / 0 评论 / 3 点赞 / 1,805 阅读 / 6,219 字 / 正在检测是否收录...
温馨提示:
本文最后更新于 2022-04-19,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

基本整合

引入maven依赖

<!--amqp依赖-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

全局配置文件中配置RabbitMQ连接信息

spring:
  rabbitmq:
    host: 192.168.0.117 #rabbitmq服务器地址
    port: 5672         #端口号
    username: guest    #用户名
    password: guest    #密码
    #virtual-host:     #虚拟主机

RabbitMQ的自动配置类RabbitAutoConfiguration自动配置了连接工厂ConnectionFactoryConnectionFactory从配置RabbitProperties中获取连接信息完成连接到RabbitMQ服务器。程序中可以注入RabbitTemplate给RabbitMQ发送和接收消息

测试发送数据

@RunWith(SpringRunner.class)
@SpringBootTest
public class MQTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void testSendDirect(){
        // Message需要自己构造一个;定义消息体内容和消息头
        // rabbitTemplate.send(exchage,routeKey,message);
        HashMap<String, Object> map = new HashMap<>();
        map.put("name", "baobao");
        map.put("age", 18);
        map.put("list", Arrays.asList(1,2,3,4,5));
        // 将map对象序列化以后,以baobao为路由键发送到exchange.direct,exchange会根据路由键将消息路由到具体的队列
        rabbitTemplate.convertAndSend("exchange.direct", "baobao", map);
    }
}

可以发现默认发给RabbitMQ的数据以jdk的方式进行序列化,并且消息头中的content_type保存了消息体的类型

测试接收数据

@Test
public void testReceiveDirect(){
    // 从指定队列中接收数据
    Object data = rabbitTemplate.receiveAndConvert("baobao");
    System.out.println(data.getClass());
    System.out.println(data);
}

自定义json序列化

在SpringBoot中,默认可以发送给队列的数据有4种:

  • 字符串
  • 字节数组
  • 实现了Serializable接口的对象
  • Message对象实例,是标准的消息对象,包含消息头和消息体

默认发送的对象实例,是以JDK方式进行序列化的,我们也可以定制json的序列化

在自定义配置文件中添加一个自己的MessageConverter,类型是Jackson2JsonMessageConverter

@Configuration
public class MyRabbitConfig {
    // 添加json格式序列化器
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}

测试json序列化

// 测试广播和json序列化
@Test
public void testJsonSerilize(){
    // 广播可以不需要传路由键
    rabbitTemplate.convertAndSend("exchange.fanout", "", new Person("文亮", 18));
}

可以发现消息头中的content_type属性保存了消息体的类型为application/json__TypeId__属性保存了javabean的全类名,用于反序列化

// 测试接收json数据反序列化
@Test
public void testReceiveJson(){
    Object o = rabbitTemplate.receiveAndConvert("baobao.news");
    System.out.println(o.getClass());
    System.out.println(o);
}

监听消息

1.@RabbitListener

先在主程序上加@EnableRabbit,开启基于注解的RabbitMQ模式

@SpringBootApplication
@EnableRabbit
public class MainApplicationMQ {
    public static void main(String[] args) {
        SpringApplication.run(MainApplicationMQ.class, args);
    }
}

编写1个Service,声明一个监听方法,方法上标注@RabbitListener,传入需要监听的队列名。监听方法可以接收的参数如下(无需保证参数顺序):

  • Message对象:原生消息的详细信息,包括消息头+消息体
  • 自定义实体类对象:用消息体反序列化后得到的Javabean
  • Channel对象:当前传输的数据通道
@Service
public class PersonService {
    /**
     * 监听队列baobao中的消息,有消息会自动取出并回调该方法
     * @param message 原生消息的详细信息,包括消息头+消息体
     * @param person  从消息体中解码出的javabean
     * @param channel 当前传输的数据通道
     */
    @RabbitListener(queues = "baobao")
    public void listen(Message message, Person person, Channel channel){
        System.out.println(message);
        System.out.println(person);
        System.out.println(channel);
    }
}

启动该消费者,从队列中消费1条消息

控制台打印结果

// message
(Body:'{"name":"文亮","age":18}' MessageProperties [headers={__TypeId__=com.baobao.springbootdemo.mq.bean.Person}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=amq.fanout, receivedRoutingKey=, deliveryTag=1, consumerTag=amq.ctag-81sPttG477H4uOtS_e6tHA, consumerQueue=baobao])
// person
Person{name='文亮', age=18}
// channel
Cached Rabbit Channel: AMQChannel(amqp://guest@192.168.56.55:5672/,1), conn: Proxy@21a02097 Shared Rabbit Connection: SimpleConnection@2496b460 [delegate=amqp://guest@192.168.56.55:5672/, localPort= 6257]

注意:

  • 如果只有一个消费客户端,那么rabbitmq默认会将队列中的所有一次性发到消费者,但是消费者接收到消息后只能1个1个处理,只有处理完1个消息(即监听方法运行完毕,哪怕执行时间很长),才能继续处理下一个消息
  • 如果启动多个客户端,都对应同一个监听消息的方法,那么对于同一个消息,只有1个客户端可以接收到
  • 监听方法中的消息实例对象要与发送端对应,比如发送端发送字节数组那么接收端也要声明为字节数组参数;发送端发送Person对象那么接收端也要声明为Person类型参数

2.@RabbitHandler

我们还可以采用@RabbitListener配合@RabbitHandler的方式完成对消息的监听:

  • @RabbitListener:标注在类上,指定监听哪些队列
  • @RabbitHandler:标注在每个接收并处理不同消息的重载方法上,区分处理不同类型的消息
@Service
@RabbitListener(queues = {"baobao","baobao.news","baobao.map"})
public class PersonService {
    @RabbitHandler
    public void handlePersonMsg(Person person){
        System.out.println(person);
    }

    @RabbitHandler
    public void handleUserMsg(User user){
        System.out.println(user);
    }
}

创建交换机、队列、绑定关系

1.利用AmqpAdmin

给程序中注入AmqpAdmin可以实现对RabbitMQ的管理,它的declareXXX方法可以创建exchange、queue、binding等

public class MQTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private AmqpAdmin amqpAdmin;

    @Test
    public void testAmqpAdmin(){
        // 创建一个Direct类型的exchange
        amqpAdmin.declareExchange(new DirectExchange("exchange.amqpadmin"));
        // 创建一个queue
        amqpAdmin.declareQueue(new Queue("queue.amqpadmin"));
        // 添加exchange和queue之间的绑定
        amqpAdmin.declareBinding(new Binding("queue.amqpadmin", Binding.DestinationType.QUEUE,
                "exchange.amqpadmin","queue.amqpadmin",null));
    }

使用amqpAdmin创建交换机、队列、绑定关系时,会先检查rabbitmq有是否已经存在对应的交换机、队列、绑定关系,如果不存在才创建,已存在就什么都不做

2.直接在容器中放置对象

另外还有一种方法可以创建Queue、exchange和绑定关系,直接在容器中放置即可。当执行任何操作rabbitmq的方法时,如果rabbitmq发现还没有队列、交换机或绑定关系,就会自动创建

@Configuration
public class MyRabbitConfig {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    // 创建一个交换机:参数1 交换机名称,参数2 是否持久化,参数3 是否自动删除
    @Bean
    public Exchange exchange(){
        return new DirectExchange("test.direct", true, false);
    }

    // 创建一个队列:参数1 队列名称,参数2 是否持久化,参数3 是否排他,参数4 是否自动删除
    @Bean
    public Queue queue(){
        return new Queue("test.queue", true, false, false);
    }

    // 创建交换机和队列的绑定关系:参数1 绑定的目标,参数2 绑定的目标类型,参数3 交换机名称,参数4 路由键,参数5 绑定参数
    @Bean
    public Binding binding(){
        return new Binding("test.queue", Binding.DestinationType.QUEUE,"test.direct",
                "queue", null);
    }

注意:

  • 直接在容器中放置Bean相比于直接利用AmqpAdmin来创建交换机、队列、绑定关系的区别是,容器中放置Bean的方式是懒加载的,也就是说并不会在容器启动时就创建,而是等我们的应用第一次连接rabbitmq进行操作的时候才创建交换机、队列、绑定关系。其底层原理是:当连接第一次创建时,会回调连接创建的监听方法,从容器中查找所有ExchangeQueueBinding对象,然后利用AmqpAdmin将它们进行创建。也就是说SpringBoot应用刚启动时是不会创建这些对象的,只有程序首次连接rabbitmq获取connection时才会创建
  • 只有rabbitmq中不存在对应的交换机、队列、绑定关系时才会创建,已存在就什么都不做

另外也可以利用Builder模式链式创建

// 利用Builder模式链式创建
@Bean
public Exchange exchange2(){
    // 默认就是非自动删除
    return ExchangeBuilder.directExchange("direct.exchange").durable(true).build();
}

@Bean 
public Queue queue2(){
    // 默认就是非自动删除,不排他
    return QueueBuilder.durable("queue").build();
}

@Bean 
public Binding binding2(@Qualifier("queue2") Queue queue,@Qualifier("exchange2") Exchange exchange){
    return BindingBuilder.bind(queue).to(exchange).with("routingkey").noargs();
}

实际开发推荐使用Builder模式链式创建,链式方法的语义更加清晰

3

评论区