当前位置: 首页 > news >正文

RabbitMQ工作模式实践

前期准备

pom.xml:

<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>4.0.6</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.example</groupId> <artifactId>review-workmodern-demo</artifactId> <version>0.0.1-SNAPSHOT</version> <name>review-demo</name> <description>review-demo</description> <url/> <properties> <java.version>17</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-webmvc</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-webmvc-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.20.0</version> </dependency> <!-- 日志框架依赖:SLF4J + Simple实现(用于控制台输出) --> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.36</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.8.1</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build> </project>

一、简单模式

  • 一个生产者发送消息到队列,一个消费者从队列接收消息
  • 无交换机参与(使用默认空交换机)
  • 一对一通信,消息仅被消费一次

1.创建生产者

package com.example.reviewdemo.simple; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; public class Producer { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory=new ConnectionFactory(); connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); Connection connection=connectionFactory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("simple2_queue",true,false,false,null); String message="简单队列"; channel.basicPublish("","simple2_queue",null,message.getBytes() ); System.out.println("已发送:"+message); channel.close(); connection.close(); } }

2.创建消费者

package com.example.reviewdemo.simple; import com.rabbitmq.client.*; import java.io.IOException; public class Consumer { public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("simple2_queue", true, false, false, null); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("consumerTag: " + consumerTag); //消费者标识 System.out.println("Exchange: " + envelope.getExchange()); //交换机名称 System.out.println("RoutingKey: " + envelope.getRoutingKey()); //路由键 System.out.println("properties: " + properties); //消息属性 System.out.println("body: " + new String(body)); //消息内容 } }; channel.basicConsume("simple2_queue", true, consumer); } }

3.验证

  • 运行生产者,控制台输出"消息已发送: Hello Simple Mode!"
  • 运行消费者,控制台输出"收到消息: Hello Simple Mode!"
  • 在RabbitMQ管理界面查看队列状态,确认消息被消费

二、工作队列模式

  • 一个生产者发送消息到队列,多个消费者共同监听队列
  • RabbitMQ默认采用轮询(Round-Robin)分发机制
  • 消息不会重复,每个消费者接收不同消息

1.创建生产者

package com.example.reviewdemo.work; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Producer { public static final String QUEUE_NAME="work2_queue"; public static void main(String[] args) throws Exception { ConnectionFactory connnectionFactory=new ConnectionFactory(); connnectionFactory.setHost("localhost"); connnectionFactory.setPort(5672); connnectionFactory.setUsername("guest"); connnectionFactory.setPassword("guest"); connnectionFactory.setVirtualHost("/"); try (Connection connection = connnectionFactory.newConnection(); Channel channel = connection.createChannel()) { // 3. 声明队列 channel.queueDeclare(QUEUE_NAME, true, false, false, null); // 4. 模拟发送10个耗时任务 for (int i = 1; i <= 10; i++) { // 创建任务消息,包含不同耗时:1-3秒 Task 1 = 耗时1秒 String message = "Task " + i + " (耗时" + (i % 3 + 1) + "秒)"; // 5. 发布消息到队列 channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println("[生产者] 发送任务: " + message); // 模拟任务产生间隔 Thread.sleep(500); } System.out.println("[生产者] 所有任务已发送完毕!"); } // channel.close(); // connection.close(); } }

2.创建消费者

package com.example.reviewdemo.work; import com.rabbitmq.client.*; import java.io.IOException; public class Consumer1 { // 队列名称,必须与生产者一致 private static final String QUEUE_NAME = "work2_queue"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, true, false, false, null); System.out.println("等待接收工作队列消息..."); // 4. 设置服务质量(QoS),实现公平分发 channel.basicQos(1); // 限制预取数量为1,实现公平分发 DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("[消费者] 处理任务: " + message); try { Thread.sleep(1000 * (Integer.parseInt(message.split(" ")[1] .replace("(", "").replace(")", "")))); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } System.out.println("[消费者] 任务完成: " + message); channel.basicAck(envelope.getDeliveryTag(), false); } }; channel.basicConsume(QUEUE_NAME, false, consumer); } }

3.验证

  • 启动两个消费者实例(在不同终端运行Consumer)
  • 运行生产者,发送10条任务消息
  • 观察两个消费者交替接收任务,实现轮询分发
  • 查看消费者处理时间,确认工作队列模式下的负载均衡效果

三、发布/订阅模式

  • 使用Fanout类型的交换机
  • 生产者将消息发送到交换机,交换机将消息广播到所有绑定的队列
  • 每个消费者接收相同的消息
  • 适用于需要"一对多"消息分发的场景

1.创建生产者

package com.example.reviewdemo.publish_subscribe; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Producer { public static final String EXCHANGE_NAME = "logs2"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setUsername("guest"); factory.setPassword("guest"); factory.setVirtualHost("/"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); System.out.println("准备向交换机 " + EXCHANGE_NAME + " 发送广播消息..."); // 4. 发送5条广播消息 for (int i = 1; i <= 5; i++) { String message = "Message " + i; // 5. 发布消息到Fanout交换机 // exchange: 交换机名称;routingKey: "" - 空字符串(Fanout交换机忽略路由键) // props: null - 使用默认消息属性;body: 消息内容 channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println("[生产者] 发送消息: " + message); // 6. 模拟消息产生间隔 Thread.sleep(1000); } System.out.println("[生产者] 所有广播消息已发送完毕!"); } }

2.创建消费者

package com.example.reviewdemo.publish_subscribe; import com.rabbitmq.client.*; import java.io.UnsupportedEncodingException; public class Consumer1 { public static final String EXCHANGE_NAME = "logs2"; public static void main(String[] args) throws Exception{ ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setUsername("guest"); factory.setPassword("guest"); factory.setVirtualHost("/"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String queueName = channel.queueDeclare("", true, false, true, null).getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println("队列 " + queueName + " 已绑定到交换机 " + EXCHANGE_NAME); System.out.println("等待接收发布订阅消息..."); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws UnsupportedEncodingException { // 7. 将字节数组转换为字符串 String message = new String(body, "UTF-8"); // 8. 处理消息 System.out.println("[消费者] 收到消息: " + message); } }; // 9. 开始消费消息 channel.basicConsume(queueName, true, consumer); } }

3.验证

  • 启动多个消费者实例(在不同终端运行Consumer)
  • 运行生产者,发送5条消息
  • 观察所有消费者实例都接收到相同的消息
  • 查看RabbitMQ管理界面,确认队列与交换机的绑定关系

四、路由模式

  • 使用Direct类型的交换机
  • 生产者发送消息时指定Routing Key
  • 交换机根据Routing Key匹配绑定的队列
  • 适用于精确消息路由的场景

1.创建生产者

package com.example.reviewdemo.routing; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Producer { private static final String EXCHANGE_NAME = "direct_logs2"; public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String[] routingKeys = {"info", "warning", "error"}; // 5. 发送三种不同级别的日志消息 for (String routingKey : routingKeys) { // 构造消息内容 String message = "Log message with routing key: " + routingKey; // 6. 发布消息到Direct交换机 channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes()); System.out.println("[生产者] 发送消息: " + message); } System.out.println("[生产者] 所有路由消息已发送完毕!"); } }

2.创建消费者

package com.example.reviewdemo.routing; import com.rabbitmq.client.*; import java.io.IOException; import java.io.UnsupportedEncodingException; public class Consumer { private static final String EXCHANGE_NAME = "direct_logs2"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String queueName = channel.queueDeclare().getQueue(); // 5. 将队列绑定到交换机,并指定路由键 // 根据不同的routingKey绑定队列 String[] routingKeys = {"info", "warning", "error" , "error2"}; for (String routingKey : routingKeys) { channel.queueBind(queueName, EXCHANGE_NAME, routingKey); System.out.println("绑定队列: " + queueName + " 到路由键: " + routingKey); } System.out.println("等待接收路由消息..."); // 6. 创建消费者回调 DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws UnsupportedEncodingException { // 7. 获取路由键和消息内容 String routingKey = envelope.getRoutingKey(); // 获取消息的路由键 String message = new String(body, "UTF-8"); // 8. 处理消息 System.out.println("[消费者] 收到消息: " + message); } }; // 9. 开始消费消息 channel.basicConsume(queueName, true, consumer); } }

3.验证

  • 运行生产者,发送不同routingKey的消息
  • 运行消费者,观察消费者接收的消息是否符合预期
  • 尝试修改消费者绑定的routingKey,验证消息路由效果

五、通配符模式

  • 使用Topic类型的交换机
  • 生产者发送消息时指定Routing Key(包含通配符)
  • 交换机根据通配符匹配规则将消息路由到队列
  • 适用于复杂路由规则的场景

1.创建生产者

package com.example.reviewdemo.topics; import com.rabbitmq.client.*; import java.io.UnsupportedEncodingException; import java.util.Random; public class Producer { private static final String EXCHANGE_NAME = "topic_logs2"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setUsername("guest"); factory.setPassword("guest"); factory.setVirtualHost("/"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); // 4. 定义不同格式的路由键 // 发送不同主题的消息 String[] routingKeys = {"stock.usd.eur", "stock.eur.usd", "stock.usd", "stock.eur"}; // 5. 发送不同路由键的消息 for (String routingKey : routingKeys) { // 构造消息内容 String message = "Message with routing key: " + routingKey; // 6. 发布消息到Topic交换机 // exchange: Topic交换机名称;routingKey: 带层级的路由键,可以使用通配符模式 // props: null - 默认消息属性;body: 消息内容 channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes()); System.out.println("[生产者] 发送消息: " + message); } System.out.println("[生产者] 所有主题消息已发送完毕!"); } }

2.创建消费者

package com.example.reviewdemo.topics; import com.rabbitmq.client.*; import java.io.UnsupportedEncodingException; public class Consumer { private static final String EXCHANGE_NAME = "topic_logs2"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setUsername("guest"); factory.setPassword("guest"); factory.setVirtualHost("/"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String queueName = channel.queueDeclare().getQueue(); // 5. 将队列绑定到交换机,使用通配符绑定键 // 绑定不同的通配符模式 * (星号):匹配一个单词 # (井号):匹配零个或多个单词 String[] bindingKeys = {"stock.*", "stock.#", "stock.usd.*", "stock.*.eur"}; for (String bindingKey : bindingKeys) { // 关键操作:队列使用通配符绑定到Topic交换机 channel.queueBind(queueName, EXCHANGE_NAME, bindingKey); System.out.println("绑定队列: " + queueName + " 到绑定键: " + bindingKey); } System.out.println("等待接收通配符消息..."); // 6. 创建消费者回调 DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws UnsupportedEncodingException { // 7. 获取路由键和消息内容 String message = new String(body, "UTF-8"); // 8. 处理消息 System.out.println("[消费者] 收到消息: " + message); } }; // 9. 开始消费消息 channel.basicConsume(queueName, true, consumer); } }

3.验证

  • 运行生产者,发送不同主题的消息
  • 运行消费者,观察消费者接收的消息是否符合通配符规则
  • 修改消费者绑定的通配符,验证消息路由效果
http://www.jsqmd.com/news/855179/

相关文章:

  • BGA底部填充胶:嵌入式主控板可靠性设计与工艺全解析
  • C++哈希介绍
  • C#学习笔记-入门篇
  • Perplexity写作辅助响应延迟骤增?紧急修复指南:5步定位模型层瓶颈(含实时诊断脚本)
  • 深入解析中断与异常:从概念到x86/ARM/RISC架构实践
  • 非 CTP 柜台连接天勤:众期融航易达等网关差异备忘
  • 超实用!PS 修改截图文字最简单方法,自然无破绽
  • 香橙派Lite全解析:从硬件到应用,玩转ARM开发板与物联网项目
  • 保姆级教程:用Python+OpenCV实现无人机吊舱图像与卫星地图的自动匹配(附代码)
  • uni-app项目上架前必做:手把手教你用Android Studio生成正式签名APK(从证书到发布)
  • 在ai应用开发中利用taotoken实现多模型聚合与成本优化
  • CAN总线接口电路设计实战:从差分信号原理到PCB布局避坑指南
  • 视频融合平台:服务正常运行但没有画面
  • 硬件研发必看:钡特电源 DF2-15S03XT 与金升阳 F1503XT-2WR3 属工业标准模块电源封装与性能
  • AI提速中国品牌全球化:供应链、组织、营销、本地化全面升级!
  • S32K3 FlexCAN驱动避坑指南:从波特率计算到邮箱锁定的实战心得
  • VCSA 8.0部署卡在初始化VCS服务、认证失败?NTP+DNS一招解决
  • COLMAP重建翻车实录:当你的相机内外参已知,却卡在database.db和images.txt对不上?
  • Java Snowy框架CI/CD云效自动化部署流程
  • Skeyevss 视频调阅使用说明
  • 16位微控制器:电池供电与物联网节点的性能功耗平衡之道
  • 3.1 vss-performance 多协议监听与SIP发送流水线异步化
  • Perplexity音乐搜索效率提升300%:实测5种专业级查询语法与避坑清单(附2024最新API响应数据)
  • CPU、MPU、MCU与SoC:从核心概念到实战选型全解析
  • 告别Navicat!用VSCode的Database Client插件搞定MySQL、Redis连接与可视化操作
  • 从开发者视角分享Taotoken文档与示例代码的上手便捷度
  • 【大模型12步学习路线 · 第10步 · ①原理篇】LLM 微调全景:Full FT / LoRA / QLoRA / DoRA / DPO,从 PEFT 到偏好对齐
  • Perplexity数学知识查询失效真相(2024最新算法限制深度拆解):为什么你的微积分提问总得不到严谨推导?
  • Linux符号链接原理与实战:从快捷方式到系统管理核心技能
  • DDFS信号发生器的低成本实现:告别专用芯片,用STC89C52和LM324就能搞定