Skip to main content

RabbitMQ

RabbitMQ官网

安装

docker run \
-e RABBITMQ_DEFAULT_USER=用户名 \
-e RABBITMQ_DEFAULT_PASS=密码 \
-v mq-plugins:/plugins \
--name mq \
--hostname mq \
-p 15672:15672 \
-p 5672:5672 \
--network 网络名\
-d \
rabbitmq:3.8-management

15672:控制台端口 5672:收发消息的端口

概念:

  • publisher:生产者,也就是发送消息的一方
  • consumer:消费者,也就是消费消息的一方
  • queue:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理
  • exchange:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。
  • virtual host:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue

JAVA客户端-SpringAMQP

  1. 引入依赖

        <!--AMQP依赖,包含RabbitMQ-->
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
  2. 配置application.yaml文件

    spring:
    rabbitmq:
    host: 192.168.150.101 # 你的虚拟机IP
    port: 5672 # 端口
    virtual-host: /hmall # 虚拟主机
    username: hmall # 用户名
    password: 123 # 密码
  3. 然后在类中注入容器

    发送消息:

    @SpringBootTest
    public class SpringAmqpTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSimpleQueue() {
    // 队列名称
    String queueName = "simple.queue";
    // 消息
    String message = "hello, spring amqp!";
    // 发送消息
    rabbitTemplate.convertAndSend(queueName, message);
    }
    }

    接收消息:

    @Component
    public class SpringRabbitListener {
    // 利用RabbitListener来声明要监听的队列信息
    // 将来一旦监听的队列中有了消息,就会推送给当前服务,调用当前方法,处理消息。
    // 可以看到方法体中接收的就是消息体的内容
    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueueMessage(String msg) throws InterruptedException {
    System.out.println("spring 消费者接收到消息:【" + msg + "】");
    }
    }

Work Queue

任务模型,让多个消费者绑定到一个队列,共同消费队列中的消息。同一条消息只能被一个消费者处理。

但是默认情况下,每个消费者分配到的消息数都是平均的,不会因消费者的处理速度差异而改变,这会导致效率降低

在spring中有一个简单的配置,可以解决这个问题:

spring:
rabbitmq:
listener:
simple:
prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

交换机

接受发送者发送的消息,并将消息路由到与其绑定的队列。

Fanout交换机 - 广播

会将接收到的消息路由到每一个跟其绑定的队列,也叫广播模式

  • 1) 可以有多个队列
  • 2) 每个队列都要绑定到Exchange(交换机)
  • 3) 生产者发送的消息,只能发送到交换机
  • 4) 交换机把消息发送给绑定过的所有队列
  • 5) 订阅队列的消费者都能拿到消息

发送消息:

@Test
public void testFanoutExchange() {
// 交换机名称
String exchangeName = "hmall.fanout";
// 消息
String message = "hello, everyone!";
// 三个值,交换机名,交换机密码,信息
rabbitTemplate.convertAndSend(exchangeName, "", message);
}

接收消息:

@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}

@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}

Direct交换机 - 定向

会将接收到的消息根据规则路由到指定的队列,称为定向路由

Direct模型下:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
  • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

发送消息

@Test
public void testSendDirectExchange() {
// 交换机名称
String exchangeName = "hmall.direct";
// 消息
String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "red", message);
}

接收消息

@RabbitListener(queues = "direct.queue1")
public void listenDirectQueue1(String msg) {
System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}

@RabbitListener(queues = "direct.queue2")
public void listenDirectQueue2(String msg) {
System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}

Topic交换机 - 通配符订阅

基于RoutingKey做消息路由,但是routingKey通常是多个单词的组合,并且以.分割

队列与交换机指定BindingKey时可以使用通配符:

  • #:代指0个或多个单词
  • *:代指一个单词

在交换机绑定队列时,指定对应的BindingKey

例子:

  • topic.queue1:绑定的是china.# ,凡是以 china.开头的routing key 都会被匹配到,包括:
    • china.news
    • china.weather

发送消息

/**
* topicExchange
*/
@Test
public void testSendTopicExchange() {
// 交换机名称
String exchangeName = "hmall.topic";
// 消息
String message = "喜报!孙悟空大战哥斯拉,胜!";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}

接收消息

@RabbitListener(queues = "topic.queue1")
public void listenTopicQueue1(String msg){
System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");
}

@RabbitListener(queues = "topic.queue2")
public void listenTopicQueue2(String msg){
System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");
}

声明队列和交换机

SpringAMQP提供了类来声明队列、交换机及其绑定关系:

  • Queue:用于声明队列,可以用工厂类QueueBuilder构建
  • Exchange:用于声明交换机,可以用工厂类ExchangeBuilder构建
  • Binding:用于声明队列和交换机的绑定关系,可以用工厂类BindingBuilder构建

基于Bean声明

两种创建方式:

示例,声明Fanout交换机

@Configuration
public class FanoutConfiguration {
@Bean
public FanoutExchange fanoutExchange() {
// return new FanoutExchange("hmall.fanout");
return ExchangeBuilder.fanoutExchange("hmall.fanout").build();
}

@Bean
public Queue fanoutQueue1() {
// return new Queue("fanout.queue1");
return QueueBuilder.durable("fanout.queue1").build();
}

@Bean
public Binding fanoutQueue1Binding(Queue fanoutQueue, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanoutQueue).to(fanoutExchange);
}
}

基于注解声明

@Component
public class SpringRabbitListener {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1", durable = "true"),
exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
key = {"red, blue"}
))
public void listenDirectQueue1(String msg) {
System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}
}

消息转换器

默认情况下Spring采用的序列化方式是JDK序列化。JDK序列化存在下列问题:

  • 数据体积过大
  • 有安全漏洞
  • 可读性差

配置JSON转换器

  1. 在接收和发送消息的服务中加入依赖:

    <dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
    <version>2.9.10</version>
    </dependency>

注意:如果项目中引入了spring-boot-starter-web依赖,则无需再次引入Jackson依赖。

  1. 配置消息转换器,在接收和发送消息的服务的启动类中加入Bean

    @Bean
    public MessageConverter messageConverter(){
    // 1.定义消息转换器
    Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
    // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
    jackson2JsonMessageConverter.setCreateMessageIds(true);
    return jackson2JsonMessageConverter;
    }

消费者接收Object

publisher是用Map发送,那么消费者也一定要用Map接收,格式如下:

@RabbitListener(queues = "object.queue")
public void listenSimpleQueueMessage(Map<String, Object> msg) throws InterruptedException {
System.out.println("消费者接收到object.queue消息:【" + msg + "】");
}