从零开始:Java原生连接RabbitMQ完整流程(个人学习笔记001)
@TOC)
该文章仅用于个人复习与记录,如有错误,烦请指出,非常感谢
RabbitMQ 是一款开源的消息中间件(也称为消息队列),其核心作用是让不同的系统、服务或组件之间能够异步地传递数据。使用该中间件可以很好地实现业务异步处理、削峰填谷、服务解耦以及数据同步。
本文主要讲解如何使用原生 Java 集成 RabbitMQ。由于主播目前仍处于学习阶段,本文不涉及深层原理,仅说明基本的使用方法。
在进行学习和测试之前,需要先通过 Maven 导入 RabbitMQ 的依赖,并使用 Docker 拉取 RabbitMQ 的镜像,以确保 Java 程序能够与 RabbitMQ 正常连接。
Maven 依赖
xml
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.30.0</version> </dependency>Docker 命令
bash
# 拉取镜像 docker pull rabbitmq:management # 创建并运行容器 docker run -d --name rabbitmq -p 5672:5672 rabbitmq:management接下来,就可以愉快地学习 RabbitMQ 了。
一、基本消息队列(点对点模式)
RabbitMQ 中的组件并不多,在本模式中只讲解以下几种:生产者、消费者、队列。
- 生产者:消息的源头,负责发送消息。
- 消费者:消息的终点,负责接收并消费(处理)消息。
- 队列:消息实际存储的容器,等待消费者将其取走。
下面先给出一段最简单的生产者和消费者代码,然后分段解释其作用。
生产者代码
java
public class Send { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection()) { Channel channel = connection.createChannel(); // 指定队列类型 Map<String, Object> map = Map.of("x-queue-type", "quorum"); channel.queueDeclare(QUEUE_NAME, true, false, false, map); String message = "Hello,World!"; // 发送消息 channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println("发送消息成功:'" + message + "'"); } } }代码分段解释
java
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost");ConnectionFactory是用于创建 Java 程序与 RabbitMQ 之间连接的工厂类。setHost()方法用于指定 RabbitMQ 服务器的地址,使 Java 程序能够连接到 RabbitMQ。此外还可以设置用户名、密码等其他属性。
java
try (Connection connection = factory.newConnection()) { Channel channel = connection.createChannel(); // 指定队列类型 Map<String, Object> map = Map.of("x-queue-type", "quorum"); channel.queueDeclare(QUEUE_NAME, true, false, false, map); String message = "Hello,World!"; // 发送消息 channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println("发送消息成功:'" + message + "'"); }- 这里使用的是
try-with-resources语法,而非传统的try-catch。当退出try代码块时(无论以何种方式),Java 程序与 RabbitMQ 的连接会自动关闭。 Connection代表 Java 程序与 RabbitMQ 之间的连接,由factory直接创建。Channel是在Connection中开辟的通道,一个连接中可以创建多个通道,每个通道中也可以创建多个队列。Map.of()是一个静态工厂方法,用于创建包含指定键值对的Map。这里用于指定队列的类型。队列类型分为classic和quorum,RabbitMQ 官方推荐使用quorum。如果创建队列时不指定类型,则默认创建classic队列。queueDeclare()方法用于在通道中声明一个队列,其参数含义如下:
| 参数顺序 | 含义 | 说明 |
|---|---|---|
| 1 | 队列名称 | 队列的唯一标识 |
| 2 | 是否持久化 | 开启后,RabbitMQ 重启后队列不会消失 |
| 3 | 是否独占 | 开启后,队列只能在当前连接中使用 |
| 4 | 是否自动删除 | 开启后,当队列中的最后一个消费者取消订阅时,队列会被自动删除 |
| 5 | 扩展参数 | 用于配置队列的高级特性 |
queueDeclare()的幂等性:
- 如果队列已存在且参数一致,则什么也不做。
- 如果队列已存在但参数不一致,则会抛出异常。
channel.basicPublish()方法用于将消息发布到指定的队列中,其参数含义如下:
| 参数顺序 | 含义 | 说明 |
|---|---|---|
| 1 | 交换机名称 | 空字符串代表使用默认的无名交换机 |
| 2 | 队列名称或路由键 | 消息要传递的目的地,此处传入队列名称 |
| 3 | 消息属性 | 例如持久化、优先级、过期时间等 |
| 4 | 消息体 | 消息内容,通常以字节数组形式传递 |
生产者端的流程总结:
创建连接工厂 → 建立连接 → 创建通道 → 声明队列 → 发布消息
消费者代码
java
public class Recv { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 指定队列类型 Map<String, Object> map = Map.of("x-queue-type", "quorum"); channel.queueDeclare(QUEUE_NAME, true, false, false, map); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("[x] Received '" + message + "'"); }; boolean autoAck = true; channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {}); } }消费者代码解释
第 14 行之前的代码与生产者部分相同。
DeliverCallback是一个回调函数,当消费者收到消息时会自动调用该函数,执行 Lambda 表达式中的逻辑:consumerTag:RabbitMQ 为当前消费者生成的唯一标识符。delivery:封装了消息所有信息的对象,包含消息体,但它本身不是消息体。getBody():从delivery对象中取出消息体(字节数组)。
basicConsume()方法用于消费队列中的消息,其参数含义如下:
| 参数顺序 | 含义 | 说明 |
|---|---|---|
| 1 | 队列名称 | 从哪个队列消费消息 |
| 2 | 是否自动确认 | autoAck为true时,RabbitMQ 在发送消息后立即将其标记为已确认,无论消费者是否处理成功。生产环境不建议开启,否则消息可能丢失 |
| 3 | 消息回调 | 收到消息后执行的回调函数 |
| 4 | 取消回调 | 消费者被取消时执行的回调 |
消费者端的流程总结:
等待消息 → 从队列中取出消息 → 自动确认(如果启用)→ 消费消息
至此,最简单的点对点(生产者-队列-消费者)模式就介绍完了。
二、工作消息队列(点对多模式)
问题:在生产环境中,点对点的单生产者-单消费者模式性能过低,需要改为单生产者对应多个消费者以提升性能。
解决方案:代码与点对点模式相同,只需同时启动多个消费者即可。RabbitMQ 会以轮询的方式平均向各个消费者发送消息,从而提高消息的消费效率。
消息持久化
要实现消息持久化,需要同时开启队列持久化和消息持久化。由于涉及磁盘 I/O 操作,持久化模式的性能相对较低。
队列持久化:
java
boolean durable = true; Map<String, Object> args = Map.of("x-queue-type", "quorum"); channel.queueDeclare("hello", durable, false, false, args);消息持久化:
java
channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, // 持久化标志 message.getBytes());公平分发
问题:轮询分发不够公平,正在忙碌的消费者仍然会收到新消息。
解决方案:使用basicQos方法,并将prefetchCount设置为 1。
java
int prefetchCount = 1; channel.basicQos(prefetchCount);这行代码的作用是:告知 RabbitMQ 每次只向一个消费者发送不超过一条消息。在消费者处理并确认前一条消息之前,RabbitMQ 不会向其发送新消息,而是将新消息派发给下一个空闲的消费者。
简单理解:“我一次只处理 1 条消息,没处理完别给我发新的。”
公平分发模式相比轮询模式效率更高。
三、交换机
在实际生产环境中,简单的点对点和点对多模式无法满足多样化的业务需求。交换机可以根据不同的类型或匹配规则,将消息发送到对应的队列,再由队列转发给消费者,从而适应复杂的业务逻辑。
交换机有以下几种可用类型:direct、topic、headers、fanout,以及默认的无名交换机。
无名交换机(默认交换机)
java
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());在前面的示例中,我们并没有显式使用交换机,但仍然能够向队列发送消息,这正是无名交换机发力了。basicPublish()方法的第一个参数(空字符串)就代表无名交换机。当使用无名交换机时,消息会被转发到与第二个参数名称匹配的队列。
fanout 交换机
首先创建一个fanout类型的交换机,命名为ex。
由于生产者不关心消息具体发往哪个队列,因此在生产者代码中只需要声明交换机即可,无需声明队列。
java
channel.exchangeDeclare("ex", "fanout");fanout交换机的功能非常简单:它会将接收到的消息广播给所有与之绑定的队列。这种模式常用于搭建日志系统。
有了交换机之后,如何让交换机把消息转发到指定的队列呢?这就需要用到绑定(Binding)。
java
channel.queueBind(queueName, "ex", "");通过绑定,ex交换机会将消息转发到queueName队列中。第三个参数(空字符串)是路由键,将在其他类型的交换机中详细讲解。
使用交换机发送消息
java
channel.basicPublish("ex", "", null, message.getBytes());参数说明:
- 交换机名称:
ex - 路由规则:空字符串(
fanout类型忽略路由键) - 消息属性:
null - 消息体:消息内容
可以看到,与仅使用队列时相比,发送消息的代码有以下变化:
- 参数 1 从无名交换机(空字符串)变成了我们自己声明的
fanout交换机。 - 参数 2 从队列名变成了路由规则。
使用交换机接收并消费消息
java
// 创建交换机(fanout类型) channel.exchangeDeclare("ex", "fanout"); // 让 RabbitMQ 自动生成一个随机的队列,并获取这个队列的名字 String queueName = channel.queueDeclare().getQueue(); // 绑定交换机与队列(fanout交换机,routingKey为空字符串) channel.queueBind(queueName, "ex", ""); int prefetchCount = 1; channel.basicQos(prefetchCount); DeliverCallback deliverCallback = (String consumerTag, Delivery delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("[x] Received '" + message + "'"); }; boolean autoAck = true; channel.basicConsume(queueName, autoAck, deliverCallback, consumerTag -> {});消费者代码的变动主要体现在:
- 创建交换机
- 绑定交换机与队列
需要注意的是,消费者仍然需要声明队列(这里使用的是 RabbitMQ 自动生成的随机队列),因为消费者只能从队列中取出消息进行消费。
Direct 交换机
在实际生产环境中,可能需要根据消息的重要程度进行不同的转发处理,这时就需要用到direct交换机。
direct交换机会将消息转发给路由键完全匹配的队列,然后由对应的消费者取出消息进行消费。
java
channel.queueBind(queueName, "ex", "user"); channel.basicPublish("ex", "user", null, message.getBytes());在这段代码中:
- 队列以
"user"作为路由键绑定到交换机。 - 生产者发布消息时指定的路由键也是
"user"。 - 交换机将消息的路由键与队列的路由键进行匹配,匹配成功后将消息转发给对应的队列。
- 如果多个队列绑定了同一个路由键,交换机会将消息转发给所有这些队列。
Topic 交换机
问题:在生产环境中,有时希望一个队列能够同时监听不同路由的消息。
解决方案:使用topic交换机。topic交换机支持通配符,可以实现模糊匹配的路由规则。
通配符说明:
*(星号):可恰好替换一个单词。#(井号):可替代零个或多个单词。
示例场景:假设单词按顺序表示"速度.颜色.动物物种",则上图中的绑定关系可以总结为:
- Q1:监听所有橙色动物的消息。
- Q2:监听所有兔子的消息,以及所有懒动物的消息。
通过topic交换机,一个队列可以监听多个不同路由的消息,从而应对复杂的业务场景。
headers 交换机
headers交换机依靠哈希表和字典匹配的方式进行路由,性能比direct和topic交换机低很多,因此本文不作展开。
四、其他功能(待补充)
发布者确认
问题:生产者发送消息后,无法确认消息是否成功到达 RabbitMQ。
解决方案:使用发布者确认机制。当消息到达 RabbitMQ 时,RabbitMQ 会向生产者返回一条确认消息;如果未到达,则抛出异常。
单条消息确认:
java
// 发送消息 String message = "hello"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); // 等待确认(5秒超时) // 如果没有抛出异常,说明消息已到达 RabbitMQ channel.waitForConfirmsOrDie(5_000);批量消息确认:
java
// 开启确认模式 channel.confirmSelect(); // 批量确认参数 int batchSize = 100; int count = 0; // 发送 1000 条消息,每 100 条确认一次 for (int i = 0; i < 1000; i++) { String message = "msg" + i; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); count++; if (count == batchSize) { // 等待这一批消息确认(5秒超时) channel.waitForConfirmsOrDie(5_000); System.out.println("Confirmed " + batchSize + " messages (up to msg" + i + ")"); count = 0; } } // 确认最后一批不足 batchSize 的消息 if (count > 0) { channel.waitForConfirmsOrDie(5_000); System.out.println("Confirmed last " + count + " messages"); }waitForConfirmsOrDie()方法会等待从上一次确认点到当前确认点之间的所有消息被确认,通过这种方式实现批量消息确认。
手动确认机制
前面介绍basicConsume()方法时提到过autoAck(自动确认)参数。在生产环境中不建议开启自动确认,而是使用手动确认机制。
手动确认只需一行代码:
java
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);这行代码的作用是告诉 RabbitMQ 服务器:“这条消息我已经处理完了,你可以将它从队列中删除了。”
使用手动确认机制后,如果消费者在处理消息时发生错误,队列中的消息依然存在,消费者可以重新尝试消费,从而避免消息丢失。
感谢浏览!
