RabbitMQ
安装
docker run \
-e RABBITMQ_DEFAULT_USER=用户名 \
-e RABBITMQ_DEFAULT_PASS=密码 \
-v mq-plugins:/plugins \
--name mq \
--hostname mq \
-p 15672:15672 \
-p 5672:5672 \
--network 网络名\
-d \
rabbitmq:3.8-management
15672
:控制台端口
5672
:收发消息的端口
概念:
- publisher:生产者,也就是发送消息的一方
- consumer:消费者,也就是消费消息的一方
- queue:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理
- exchange:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。
- virtual host:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue
JAVA客户端-SpringAMQP
-
引入依赖
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency> -
配置
application.yaml
文件spring:
rabbitmq:
host: 192.168.150.101 # 你的虚拟机IP
port: 5672 # 端口
virtual-host: /hmall # 虚拟主机
username: hmall # 用户名
password: 123 # 密码 -
然后在类中注入容器
发送消息:
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSimpleQueue() {
// 队列名称
String queueName = "simple.queue";
// 消息
String message = "hello, spring amqp!";
// 发送消息
rabbitTemplate.convertAndSend(queueName, message);
}
}接收消息:
@Component
public class SpringRabbitListener {
// 利用RabbitListener来声明要监听的队列信息
// 将来一旦监听的队列中有了消息,就会推送给当前服务,调用当前方法,处理消息。
// 可以看到方法体中接收的就是消息体的内容
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
System.out.println("spring 消费者接收到消息:【" + msg + "】");
}
}
Work Queue
任务模型,让多个消费者绑定到一个队列,共同消费队列中的消息。同一条消息只能被一个消费者处理。
但是默认情况下,每个消费者分配到的消息数都是平均的,不会因消费者的处理速度差异而改变,这会导致效率降低
在spring中有一个简单的配置,可以解决这个问题:
spring:
rabbitmq:
listener:
simple:
prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
交换机
接受发送者发送的消息,并将消息路由到与其绑定的队列。
Fanout
交换机 - 广播
会将接收到的消息路由到每一个跟其绑定的队列,也叫广播模式
- 1) 可以有多个队列
- 2) 每个队列都要绑定到
Exchange
(交换机) - 3) 生产者发送的消息,只能发送到交换机
- 4) 交换机把消息发送给绑定过的所有队列
- 5) 订阅队列的消费者都能拿到消息
发送消息:
@Test
public void testFanoutExchange() {
// 交换机名称
String exchangeName = "hmall.fanout";
// 消息
String message = "hello, everyone!";
// 三个值,交换机名,交换机密码,信息
rabbitTemplate.convertAndSend(exchangeName, "", message);
}
接收消息:
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}
Direct
交换机 - 定向
会将接收到的消息根据规则路由到指定的队列,称为定向路由
在Direct
模型下:
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个
RoutingKey
(路由key) - 消息的发送方在 向
Exchange
发送消息时,也必须指定消息的RoutingKey
。 Exchange
不再把消息交给每一个绑定的队列,而是根据消息的Routing Key
进行判断,只有队列的Routingkey
与消息的Routing key
完全一致,才会接收到消息
发送消息
@Test
public void testSendDirectExchange() {
// 交换机名称
String exchangeName = "hmall.direct";
// 消息
String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "red", message);
}
接收消息
@RabbitListener(queues = "direct.queue1")
public void listenDirectQueue1(String msg) {
System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}
@RabbitListener(queues = "direct.queue2")
public void listenDirectQueue2(String msg) {
System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}
Topic
交换机 - 通配符订阅
基于RoutingKey
做消息路由,但是routingKey
通常是多个单词的组合,并且以.
分割
队列与交换机指定BindingKey
时可以使用通配符:
#
:代指0个或多个单词*
:代指一个单词
在交换机绑定队列时,指定对应的BindingKey
例子:
topic.queue1
:绑定的是china.#
,凡是以china.
开头的routing key
都会被匹配到,包括:china.news
china.weather
发送消息
/**
* topicExchange
*/
@Test
public void testSendTopicExchange() {
// 交换机名称
String exchangeName = "hmall.topic";
// 消息
String message = "喜报!孙悟空大战哥斯拉,胜!";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}
接收消息
@RabbitListener(queues = "topic.queue1")
public void listenTopicQueue1(String msg){
System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");
}
@RabbitListener(queues = "topic.queue2")
public void listenTopicQueue2(String msg){
System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");
}
声明队列和交换机
SpringAMQP
提供了类来声明队列、交换机及其绑定关系:
Queue
:用于声明队列,可以用工厂类QueueBuilder
构建Exchange
:用于声明交换机,可以用工厂类ExchangeBuilder
构建Binding
:用于声明队列和交换机的绑定关系,可以用工厂类BindingBuilder
构建