RabbitMQ(七大模式+微服务+自用)
一、前置准备
- 安装并启动 RabbitMQ(默认端口 5672)
- JDK 8+、Maven、IDEA
- 所有项目通用工具类 + 通用 pom,直接复制
二、全局统一配置(所有项目必用)
1. 公共连接工具类 ConnectionUtil.java
java
运行
package com.mq.util; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * RabbitMQ 公共连接工具类(所有项目通用) */ public class ConnectionUtil { // 获取RabbitMQ连接 public static Connection getConnection() throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setUsername("guest"); factory.setPassword("guest"); return factory.newConnection(); } // 关闭连接和通道 public static void close(Connection conn, Channel channel) { try { if (channel != null) channel.close(); if (conn != null) conn.close(); } catch (Exception e) { e.printStackTrace(); } } }2. 通用 pom.xml(原生 Java 项目)
xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.mq</groupId> <!-- 替换为项目名:simple-demo / work-demo 等 --> <artifactId>项目名</artifactId> <version>1.0</version> <dependencies> <!-- RabbitMQ客户端依赖 --> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.18.0</version> </dependency> </dependencies> </project>三、项目 1:simple-demo 简单模式
项目结构
plaintext
simple-demo ├── src/main/java/com/mq │ ├── util/ConnectionUtil.java │ ├── Producer.java │ └── Consumer.java └── pom.xml完整代码
生产者 Producer.java
java
运行
package com.mq; import com.mq.util.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Producer { private static final String QUEUE = "simple_queue"; public static void main(String[] args) throws Exception { Connection conn = ConnectionUtil.getConnection(); Channel channel = conn.createChannel(); channel.queueDeclare(QUEUE, false, false, false, null); String msg = "简单模式消息"; channel.basicPublish("", QUEUE, null, msg.getBytes()); System.out.println("发送:" + msg); ConnectionUtil.close(conn, channel); } }消费者 Consumer.java
java
运行
package com.mq; import com.mq.util.ConnectionUtil; import com.rabbitmq.client.*; public class Consumer { private static final String QUEUE = "simple_queue"; public static void main(String[] args) throws Exception { Connection conn = ConnectionUtil.getConnection(); Channel channel = conn.createChannel(); channel.queueDeclare(QUEUE, false, false, false, null); DeliverCallback callback = (tag, msg) -> { System.out.println("接收:" + new String(msg.getBody())); }; channel.basicConsume(QUEUE, true, callback, tag -> {}); System.out.println("等待消息..."); } }✅ 运行顺序(必须先开消费者!)
- 运行Consumer(保持运行)
- 运行Producer
- 消费者控制台打印消息
四、项目 2:work-demo 工作队列模式
项目结构
plaintext
work-demo ├── src/main/java/com/mq │ ├── util/ConnectionUtil.java │ ├── Producer.java │ ├── Consumer1.java │ └── Consumer2.java └── pom.xml完整代码
生产者 Producer.java
java
运行
package com.mq; import com.mq.util.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Producer { private static final String QUEUE = "work_queue"; public static void main(String[] args) throws Exception { Connection conn = ConnectionUtil.getConnection(); Channel channel = conn.createChannel(); channel.queueDeclare(QUEUE, false, false, false, null); // 发送10条消息 for (int i = 1; i <= 10; i++) { String msg = "任务" + i; channel.basicPublish("", QUEUE, null, msg.getBytes()); System.out.println("发送:" + msg); } ConnectionUtil.close(conn, channel); } }消费者 1 Consumer1.java
java
运行
package com.mq; import com.mq.util.ConnectionUtil; import com.rabbitmq.client.*; public class Consumer1 { private static final String QUEUE = "work_queue"; public static void main(String[] args) throws Exception { Connection conn = ConnectionUtil.getConnection(); Channel channel = conn.createChannel(); channel.queueDeclare(QUEUE, false, false, false, null); DeliverCallback callback = (tag, msg) -> { System.out.println("消费者1接收:" + new String(msg.getBody())); }; channel.basicConsume(QUEUE, true, callback, tag -> {}); } }消费者 2 Consumer2.java
java
运行
package com.mq; import com.mq.util.ConnectionUtil; import com.rabbitmq.client.*; public class Consumer2 { private static final String QUEUE = "work_queue"; public static void main(String[] args) throws Exception { Connection conn = ConnectionUtil.getConnection(); Channel channel = conn.createChannel(); channel.queueDeclare(QUEUE, false, false, false, null); DeliverCallback callback = (tag, msg) -> { System.out.println("消费者2接收:" + new String(msg.getBody())); }; channel.basicConsume(QUEUE, true, callback, tag -> {}); } }✅ 运行顺序
- 运行Consumer1
- 运行Consumer2
- 运行Producer
- 两个消费者轮流接收消息(轮询分发)
五、项目 3:fanout-demo 发布订阅模式
项目结构
plaintext
fanout-demo ├── src/main/java/com/mq │ ├── util/ConnectionUtil.java │ ├── Producer.java │ ├── Consumer1.java │ └── Consumer2.java └── pom.xml完整代码
生产者 Producer.java
java
运行
package com.mq; import com.mq.util.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Producer { private static final String EXCHANGE = "fanout_exchange"; public static void main(String[] args) throws Exception { Connection conn = ConnectionUtil.getConnection(); Channel channel = conn.createChannel(); channel.exchangeDeclare(EXCHANGE, "fanout"); String msg = "广播消息"; channel.basicPublish(EXCHANGE, "", null, msg.getBytes()); System.out.println("发送:" + msg); ConnectionUtil.close(conn, channel); } }消费者 1 Consumer1.java
java
运行
package com.mq; import com.mq.util.ConnectionUtil; import com.rabbitmq.client.*; public class Consumer1 { private static final String EXCHANGE = "fanout_exchange"; public static void main(String[] args) throws Exception { Connection conn = ConnectionUtil.getConnection(); Channel channel = conn.createChannel(); channel.exchangeDeclare(EXCHANGE, "fanout"); String queue = channel.queueDeclare().getQueue(); channel.queueBind(queue, EXCHANGE, ""); DeliverCallback callback = (tag, msg) -> { System.out.println("消费者1接收:" + new String(msg.getBody())); }; channel.basicConsume(queue, true, callback, tag -> {}); } }消费者 2 Consumer2.java
java
运行
package com.mq; import com.mq.util.ConnectionUtil; import com.rabbitmq.client.*; public class Consumer2 { private static final String EXCHANGE = "fanout_exchange"; public static void main(String[] args) throws Exception { Connection conn = ConnectionUtil.getConnection(); Channel channel = conn.createChannel(); channel.exchangeDeclare(EXCHANGE, "fanout"); String queue = channel.queueDeclare().getQueue(); channel.queueBind(queue, EXCHANGE, ""); DeliverCallback callback = (tag, msg) -> { System.out.println("消费者2接收:" + new String(msg.getBody())); }; channel.basicConsume(queue, true, callback, tag -> {}); } }✅ 运行顺序
- 运行Consumer1
- 运行Consumer2
- 运行Producer
- 两个消费者都收到同一条消息(广播)
六、项目 4:direct-demo 路由模式
项目结构
plaintext
direct-demo ├── src/main/java/com/mq │ ├── util/ConnectionUtil.java │ ├── Producer.java │ ├── ConsumerInfo.java │ └── ConsumerError.java └── pom.xml完整代码
生产者 Producer.java
java
运行
package com.mq; import com.mq.util.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Producer { private static final String EXCHANGE = "direct_exchange"; public static void main(String[] args) throws Exception { Connection conn = ConnectionUtil.getConnection(); Channel channel = conn.createChannel(); channel.exchangeDeclare(EXCHANGE, "direct"); // 发送不同路由键消息 channel.basicPublish(EXCHANGE, "info", null, "info日志".getBytes()); channel.basicPublish(EXCHANGE, "error", null, "error日志".getBytes()); ConnectionUtil.close(conn, channel); } }消费者 ConsumerInfo.java
java
运行
package com.mq; import com.mq.util.ConnectionUtil; import com.rabbitmq.client.*; public class ConsumerInfo { public static void main(String[] args) throws Exception { Connection conn = ConnectionUtil.getConnection(); Channel channel = conn.createChannel(); channel.exchangeDeclare("direct_exchange", "direct"); String queue = channel.queueDeclare().getQueue(); channel.queueBind(queue, "direct_exchange", "info"); channel.basicConsume(queue, true, (t, m) -> { System.out.println("info接收:" + new String(m.getBody())); }, t -> {}); } }消费者 ConsumerError.java
java
运行
package com.mq; import com.mq.util.ConnectionUtil; import com.rabbitmq.client.*; public class ConsumerError { public static void main(String[] args) throws Exception { Connection conn = ConnectionUtil.getConnection(); Channel channel = conn.createChannel(); channel.exchangeDeclare("direct_exchange", "direct"); String queue = channel.queueDeclare().getQueue(); channel.queueBind(queue, "direct_exchange", "error"); channel.basicConsume(queue, true, (t, m) -> { System.out.println("error接收:" + new String(m.getBody())); }, t -> {}); } }✅ 运行顺序
- 运行ConsumerInfo
- 运行ConsumerError
- 运行Producer
- 各自只收到对应路由键的消息
七、项目 5:topic-demo 主题模式(完整代码)
项目结构
plaintext
topic-demo ├── src/main/java/com/mq │ ├── util/ConnectionUtil.java │ ├── Producer.java │ ├── Consumer1.java │ └── Consumer2.java └── pom.xmlProducer.java(生产者)
java
运行
package com.mq; import com.mq.util.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Producer { private static final String EXCHANGE = "topic_exchange"; public static void main(String[] args) throws Exception { Connection conn = ConnectionUtil.getConnection(); Channel channel = conn.createChannel(); channel.exchangeDeclare(EXCHANGE, "topic"); // 发送两条带路由键的消息 channel.basicPublish(EXCHANGE, "user.save", null, "用户新增".getBytes()); channel.basicPublish(EXCHANGE, "order.pay", null, "订单支付".getBytes()); System.out.println("主题模式消息发送完成"); ConnectionUtil.close(conn, channel); } }Consumer1.java(匹配 user.#)
java
运行
package com.mq; import com.mq.util.ConnectionUtil; import com.rabbitmq.client.*; public class Consumer1 { private static final String EXCHANGE = "topic_exchange"; public static void main(String[] args) throws Exception { Connection conn = ConnectionUtil.getConnection(); Channel channel = conn.createChannel(); channel.exchangeDeclare(EXCHANGE, "topic"); String queue = channel.queueDeclare().getQueue(); // 绑定通配符:匹配所有 user 开头的路由 channel.queueBind(queue, EXCHANGE, "user.#"); channel.basicConsume(queue, true, (tag, msg) -> { System.out.println("消费者1(user.#) 接收:" + new String(msg.getBody())); }, tag -> {}); } }Consumer2.java(匹配 *.pay)
java
运行
package com.mq; import com.mq.util.ConnectionUtil; import com.rabbitmq.client.*; public class Consumer2 { private static final String EXCHANGE = "topic_exchange"; public static void main(String[] args) throws Exception { Connection conn = ConnectionUtil.getConnection(); Channel channel = conn.createChannel(); channel.exchangeDeclare(EXCHANGE, "topic"); String queue = channel.queueDeclare().getQueue(); // 绑定通配符:匹配所有以 .pay 结尾的路由 channel.queueBind(queue, EXCHANGE, "*.pay"); channel.basicConsume(queue, true, (tag, msg) -> { System.out.println("消费者2(*.pay) 接收:" + new String(msg.getBody())); }, tag -> {}); } }✅ 运行顺序
- 运行Consumer1
- 运行Consumer2
- 运行Producer
- 结果:
- 消费者 1 收到:用户新增
- 消费者 2 收到:订单支付
八、项目 6:headers-demo 首部匹配模式(完整代码)
项目结构
plaintext
headers-demo ├── src/main/java/com/mq │ ├── util/ConnectionUtil.java │ ├── Producer.java │ └── Consumer.java └── pom.xmlProducer.java
java
运行
package com.mq; import com.mq.util.ConnectionUtil; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.util.Map; public class Producer { private static final String EXCHANGE = "headers_exchange"; public static void main(String[] args) throws Exception { Connection conn = ConnectionUtil.getConnection(); Channel channel = conn.createChannel(); channel.exchangeDeclare(EXCHANGE, "headers"); // 设置消息头 AMQP.BasicProperties props = new AMQP.BasicProperties.Builder() .headers(Map.of("type", "sms")).build(); channel.basicPublish(EXCHANGE, "", props, "短信消息".getBytes()); System.out.println("首部模式消息发送完成"); ConnectionUtil.close(conn, channel); } }Consumer.java
java
运行
package com.mq; import com.mq.util.ConnectionUtil; import com.rabbitmq.client.*; import java.util.Map; public class Consumer { private static final String EXCHANGE = "headers_exchange"; public static void main(String[] args) throws Exception { Connection conn = ConnectionUtil.getConnection(); Channel channel = conn.createChannel(); channel.exchangeDeclare(EXCHANGE, "headers"); String queue = channel.queueDeclare().getQueue(); // 匹配headers:任意一个满足即可 Map<String, Object> args = Map.of("x-match", "any", "type", "sms"); channel.queueBind(queue, EXCHANGE, "", args); channel.basicConsume(queue, true, (tag, msg) -> { System.out.println("首部匹配消费者接收:" + new String(msg.getBody())); }, tag -> {}); } }✅ 运行顺序
- 运行Consumer
- 运行Producer
- 消费者收到:短信消息
九、项目 7:delay-demo 延迟队列(完整代码)
项目结构
plaintext
delay-demo ├── src/main/java/com/mq │ ├── util/ConnectionUtil.java │ ├── Producer.java │ └── Consumer.java └── pom.xmlProducer.java(生产者 —— 发送到过期队列)
java
运行
package com.mq; import com.mq.util.ConnectionUtil; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.util.HashMap; import java.util.Map; public class Producer { // 过期队列(消息在这里等待5秒) private static final String WAIT_QUEUE = "wait_queue"; public static void main(String[] args) throws Exception { Connection conn = ConnectionUtil.getConnection(); Channel channel = conn.createChannel(); // ========== 核心:过期队列配置(死信转发) ========== Map<String, Object> params = new HashMap<>(); // 消息过期时间 5秒 params.put("x-message-ttl", 5000); // 过期后转发到死信交换机 params.put("x-dead-letter-exchange", ""); // 过期后路由到真正队列 params.put("x-dead-letter-routing-key", "real_delay_queue"); // 声明过期等待队列 channel.queueDeclare(WAIT_QUEUE, true, false, false, params); // 发送消息(不设置expiration,队列统一过期) String msg = "我是延迟5秒的消息!"; channel.basicPublish("", WAIT_QUEUE, null, msg.getBytes()); System.out.println("已发送延迟消息,等待5秒后到达消费者..."); ConnectionUtil.close(conn, channel); } }Consumer.java(消费者 —— 监听真正队列)
java
运行
package com.mq; import com.mq.util.ConnectionUtil; import com.rabbitmq.client.*; public class Consumer { // 真正消费的队列(5秒后消息才会来) private static final String REAL_QUEUE = "real_delay_queue"; public static void main(String[] args) throws Exception { Connection conn = ConnectionUtil.getConnection(); Channel channel = conn.createChannel(); // 声明真正的消费队列 channel.queueDeclare(REAL_QUEUE, true, false, false, null); System.out.println("延迟消费者已启动,等待5秒后收到消息..."); // 监听消费 channel.basicConsume(REAL_QUEUE, true, (tag, msg) -> { System.out.println("✅ 收到延迟消息:" + new String(msg.getBody())); }, tag -> {}); } }十、项目 8:springboot-rabbitmq-demo(完整代码)
pom.xml
xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.7.15</version> </parent> <modelVersion>4.0.0</modelVersion> <groupId>com.mq</groupId> <artifactId>springboot-rabbitmq-demo</artifactId> <version>1.0</version> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> </dependencies> </project>application.yml
yaml
spring: rabbitmq: host: localhost port: 5672 username: guest password: guestRabbitApplication.java(启动类)
java
运行
package com.mq; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class RabbitApplication { public static void main(String[] args) { SpringApplication.run(RabbitApplication.class, args); } }RabbitConfig.java
java
运行
package com.mq.config; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitConfig { @Bean public Queue queue(){ return new Queue("boot_queue"); } }ProducerController.java
java
运行
package com.mq.controller; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class ProducerController { @Autowired private RabbitTemplate template; @GetMapping("/send") public String send(){ template.convertAndSend("boot_queue", "SpringBoot集成RabbitMQ消息"); return "消息发送成功"; } }Consumer.java
java
运行
package com.mq.consumer; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class Consumer { @RabbitListener(queues = "boot_queue") public void receive(String msg){ System.out.println("SpringBoot消费者接收:"+msg); } }✅ 运行顺序
- 启动RabbitApplication
- 浏览器访问:
http://localhost:8080/send - 控制台打印消息
十一、项目 9:rabbitmq-microservice-demo 微服务异步通信(完整代码)
启动类 MicroApplication.java
java
运行
package com.mq.micro; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class MicroApplication { public static void main(String[] args) { SpringApplication.run(MicroApplication.class,args); } }配置类 RabbitMicroConfig.java
java
运行
package com.mq.micro.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitMicroConfig { public static final String ORDER_EXCHANGE = "order_business_exchange"; public static final String STOCK_QUEUE = "stock_reduce_queue"; public static final String SMS_QUEUE = "sms_send_queue"; public static final String ROUTING_STOCK = "order.stock"; public static final String ROUTING_SMS = "order.sms"; @Bean public DirectExchange orderExchange(){ return new DirectExchange(ORDER_EXCHANGE,true,false); } @Bean public Queue stockQueue(){ return new Queue(STOCK_QUEUE,true); } @Bean public Queue smsQueue(){ return new Queue(SMS_QUEUE,true); } @Bean public Binding stockBinding(){ return BindingBuilder.bind(stockQueue()).to(orderExchange()).with(ROUTING_STOCK); } @Bean public Binding smsBinding(){ return BindingBuilder.bind(smsQueue()).to(orderExchange()).with(ROUTING_SMS); } }订单生产者 OrderProducer.java
java
运行
package com.mq.micro.order; import com.mq.micro.config.RabbitMicroConfig; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; @RestController public class OrderProducer { @Resource private RabbitTemplate rabbitTemplate; @GetMapping("/create/order/{orderNo}") public String createOrder(@PathVariable String orderNo){ String orderMsg = "订单:"+orderNo+" 支付完成"; rabbitTemplate.convertAndSend(RabbitMicroConfig.ORDER_EXCHANGE, RabbitMicroConfig.ROUTING_STOCK,orderMsg); rabbitTemplate.convertAndSend(RabbitMicroConfig.ORDER_EXCHANGE, RabbitMicroConfig.ROUTING_SMS,orderMsg); return "订单创建成功,消息已推送"; } }库存消费者 StockConsumer.java
java
运行
package com.mq.micro.stock; import com.mq.micro.config.RabbitMicroConfig; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class StockConsumer { @RabbitListener(queues = RabbitMicroConfig.STOCK_QUEUE) public void reduceStock(String msg){ System.out.println("【库存微服务】收到:"+msg); System.out.println("【库存微服务】执行商品库存扣减..."); } }短信消费者 SmsConsumer.java
java
运行
package com.mq.micro.sms; import com.mq.micro.config.RabbitMicroConfig; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class SmsConsumer { @RabbitListener(queues = RabbitMicroConfig.SMS_QUEUE) public void sendSms(String msg){ System.out.println("【短信微服务】收到:"+msg); System.out.println("【短信微服务】执行发送下单成功短信..."); } }✅ 运行顺序
- 启动RabbitMQ
- 启动MicroApplication
- 浏览器访问:
plaintext
http://localhost:8080/create/order/ORDER20260520- 控制台输出:
plaintext
【库存微服务】收到:订单:ORDER20260520 支付完成 【库存微服务】执行商品库存扣减... 【短信微服务】收到:订单:ORDER20260520 支付完成 【短信微服务】执行发送下单成功短信...十二、所有项目通用运行口诀(零基础必背)
- 原生 Java 项目:先运行消费者,再运行生产者
- SpringBoot 项目:直接启动,访问接口即可
- 多个消费者:全部先启动,再发消息
- 交换机模式:消费者必须先绑定队列
