当前位置: 首页 > news >正文

【RabbitMQ】路由模式(使用案例)

文章目录

  • 1. Routing(路由模式)
  • 2、引入依赖
  • 3. 生产者代码编写
    • 3.1 创建交换机
    • 3.2 声明队列
    • 3.3 绑定交换机和队列
    • 3.4 发送消息
    • 3.5 完整代码
  • 4. 消费者代码编写
    • 4.1 消费者一
    • 4.1 消费者二
  • 5. 运行程序

1. Routing(路由模式)

队列和交换机的绑定,不能是任意的绑定了,而是要指定一个 BindingKey(RoutingKey 的一种)

消息的发送方在向 Exchange 发送消息时,也需要指定消息的 RoutingKey。

Exchange 也不再把消息交给每一个绑定的 key,而是根据消息的 RoutingKey 进行判断,只有队列绑定时的 BindingKey 和发送消息的 RoutingKey 完全一致,才会接收到消息。

接下来我们看看 Routing 模式的实现,步骤:

  • 1、引入依赖
  • 2、编写生产者代码
  • 3、编写消费者代码

2、引入依赖

先引入 rabbitmq 的依赖

<!-- Source: https://mvnrepository.com/artifact/com.rabbitmq/amqp-client --><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.20.0</version><scope>compile</scope></dependency>

3. 生产者代码编写

和发布订阅模式的区别是:交换机类型不同,绑定队列的 BindingKey 不同。

那么先去 Constants.java 里面定义交换机和队列。

// 路由模式publicstaticfinalStringDIRECT_EXCHANGE="direct.exchange";// 声明交换机publicstaticfinalStringDIRECT_QUEUE1="direct.queue1";// 声明队列publicstaticfinalStringDIRECT_QUEUE2="direct.queue2";// 声明队列

3.1 创建交换机

定义交换机类型为 BuiltinExchangeType.DIRECT

channel.exchangeDeclare(Constants.DIRECT_EXCHANGE,BuiltinExchangeType.DIRECT,true);

3.2 声明队列

代码如下所示:

channel.queueDeclare(Constants.DIRECT_QUEUE1,true,false,false,null);channel.queueDeclare(Constants.DIRECT_QUEUE2,true,false,false,null);

3.3 绑定交换机和队列

代码如下所示:

channel.queueBind(Constants.DIRECT_QUEUE1,Constants.DIRECT_EXCHANGE,"a");channel.queueBind(Constants.DIRECT_QUEUE2,Constants.DIRECT_EXCHANGE,"a");channel.queueBind(Constants.DIRECT_QUEUE2,Constants.DIRECT_EXCHANGE,"b");channel.queueBind(Constants.DIRECT_QUEUE2,Constants.DIRECT_EXCHANGE,"c");

我们就按照下面这个绑定关系来完成实验:

3.4 发送消息

还是老样子,发送消息和上面的图要对应起来。

代码如下所示:

// 6. 发送消息Stringmsg="Hello direct, my routing key is : a....";channel.basicPublish(Constants.DIRECT_EXCHANGE,"a",null,msg.getBytes());Stringmsg_b="Hello direct, my routing key is : b....";channel.basicPublish(Constants.DIRECT_EXCHANGE,"b",null,msg_b.getBytes());Stringmsg_c="Hello direct, my routing key is : c....";channel.basicPublish(Constants.DIRECT_EXCHANGE,"c",null,msg_c.getBytes());System.out.println("消息发送成功");

3.5 完整代码

代码如下所示:

packagedirect;importcom.rabbitmq.client.BuiltinExchangeType;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;importconstant.Constants;importjava.io.IOException;importjava.util.concurrent.TimeoutException;publicclassProducer{publicstaticvoidmain(String[]args)throwsIOException,TimeoutException{// 1. 建立连接ConnectionFactoryfactory=newConnectionFactory();factory.setHost(Constants.HOST);// MQ所在的服务器地址factory.setPort(Constants.PORT);// 端口号factory.setUsername(Constants.USERNAME);// 账号factory.setPassword(Constants.PASSWORD);// 密码factory.setVirtualHost(Constants.VIRTUAL_HOST);// 虚拟主机Connectionconnection=factory.newConnection();// 2. 开启 channel 通道Channelchannel=connection.createChannel();// 3. 声明交换机(使用内置的交换机即可)channel.exchangeDeclare(Constants.DIRECT_EXCHANGE,BuiltinExchangeType.DIRECT,true);// 4. 声明队列channel.queueDeclare(Constants.DIRECT_QUEUE1,true,false,false,null);channel.queueDeclare(Constants.DIRECT_QUEUE2,true,false,false,null);// 5. 绑定队列和交换机channel.queueBind(Constants.DIRECT_QUEUE1,Constants.DIRECT_EXCHANGE,"a");channel.queueBind(Constants.DIRECT_QUEUE2,Constants.DIRECT_EXCHANGE,"a");channel.queueBind(Constants.DIRECT_QUEUE2,Constants.DIRECT_EXCHANGE,"b");channel.queueBind(Constants.DIRECT_QUEUE2,Constants.DIRECT_EXCHANGE,"c");// 6. 发送消息Stringmsg="Hello direct, my routing key is : a....";channel.basicPublish(Constants.DIRECT_EXCHANGE,"a",null,msg.getBytes());Stringmsg_b="Hello direct, my routing key is : b....";channel.basicPublish(Constants.DIRECT_EXCHANGE,"b",null,msg_b.getBytes());Stringmsg_c="Hello direct, my routing key is : c....";channel.basicPublish(Constants.DIRECT_EXCHANGE,"c",null,msg_c.getBytes());System.out.println("消息发送成功");// 7. 资源释放channel.close();connection.close();}}

4. 消费者代码编写

Routing 模式的消费者代码 和 Publish / Subscribe代码 一样,同样复制出来两份,然后修改消费的队列名称就可以了。

4.1 消费者一

代码如下所示:

packagedirect;importcom.rabbitmq.client.*;importconstant.Constants;importjava.io.IOException;importjava.util.concurrent.TimeoutException;publicclassConsumer1{publicstaticvoidmain(String[]args)throwsIOException,TimeoutException{// 1. 建立连接ConnectionFactoryfactory=newConnectionFactory();factory.setHost(Constants.HOST);// MQ所在的服务器地址factory.setPort(Constants.PORT);// 端口号factory.setUsername(Constants.USERNAME);// 账号factory.setPassword(Constants.PASSWORD);// 密码factory.setVirtualHost(Constants.VIRTUAL_HOST);// 虚拟主机Connectionconnection=factory.newConnection();// 2. 开启 channel 通道Channelchannel=connection.createChannel();// 3. 声明队列channel.queueDeclare(Constants.DIRECT_QUEUE1,true,false,false,null);// 4. 接收消息并消费DefaultConsumerconsumer=newDefaultConsumer(channel){// 从队列中收到消息后, 就会执行的方法@OverridepublicvoidhandleDelivery(StringconsumerTag,Envelopeenvelope,AMQP.BasicPropertiesproperties,byte[]body)throwsIOException{// 收到消息以后就进行打印System.out.println("接收到消息: "+newString(body));}};channel.basicConsume(Constants.DIRECT_QUEUE1,true,consumer);// 5. 不需要释放资源}}

4.1 消费者二

代码如下所示:

packagedirect;importcom.rabbitmq.client.*;importconstant.Constants;importjava.io.IOException;importjava.util.concurrent.TimeoutException;publicclassConsumer2{publicstaticvoidmain(String[]args)throwsIOException,TimeoutException{// 1. 建立连接ConnectionFactoryfactory=newConnectionFactory();factory.setHost(Constants.HOST);// MQ所在的服务器地址factory.setPort(Constants.PORT);// 端口号factory.setUsername(Constants.USERNAME);// 账号factory.setPassword(Constants.PASSWORD);// 密码factory.setVirtualHost(Constants.VIRTUAL_HOST);// 虚拟主机Connectionconnection=factory.newConnection();// 2. 开启 channel 通道Channelchannel=connection.createChannel();// 3. 声明队列channel.queueDeclare(Constants.DIRECT_QUEUE2,true,false,false,null);// 4. 接收消息并消费DefaultConsumerconsumer=newDefaultConsumer(channel){// 从队列中收到消息后, 就会执行的方法@OverridepublicvoidhandleDelivery(StringconsumerTag,Envelopeenvelope,AMQP.BasicPropertiesproperties,byte[]body)throwsIOException{// 收到消息以后就进行打印System.out.println("接收到消息: "+newString(body));}};channel.basicConsume(Constants.DIRECT_QUEUE2,true,consumer);// 5. 不需要释放资源}}

5. 运行程序

先运行生产者代码

可以看到 direct.queue1 队列中,路由了一条消息。direct.queue2 队列中,路由了三条消息

对应图如下所示:

exchange 下队列和 Routing Key 的绑定关系,如下所示:

然后运行 Consumer1 消费者代码:

运行 Consumer2 消费者代码:

同时可以看到,队列中的消息已经被全部给消费完了

http://www.jsqmd.com/news/669537/

相关文章:

  • 第 32 课:任务卡片按状态分组与本地持久化
  • Windows Cleaner:终极免费开源工具,快速解决C盘爆红问题
  • 推荐系统常用指标NDCG含义及公式
  • 2026年本地工业通风降温/正负压通风降温/局部通风降温/通风降温管道优质供应商推荐 - 行业平台推荐
  • 力扣204
  • Hermes Agent 项目总览
  • Pixel Fashion Atelier部署教程:Mac M2/M3芯片通过MLX适配Stable Diffusion方案
  • 基于SpringBoot + Vue的社区互助系统
  • 2026年高精度浙江立式加工中心/立卧两用加工中心/加工中心/天车式加工中心厂家精选合集 - 品牌宣传支持者
  • 2026年口碑好的江苏减速机/江苏行星减速机优质厂家推荐榜 - 品牌宣传支持者
  • 2026年靠谱的连栋种植温室大棚/广东玻璃种植温室大棚推荐厂家精选 - 品牌宣传支持者
  • 图论——BFS搜索模板(python)
  • 2026年质量好的高压直流继电器/汽车继电器/小型继电器/信号继电器厂家选择推荐 - 行业平台推荐
  • win10、11系统磁盘空间不够,显示存储池占用,磁盘管理显示存储池分区,导致不能使用的解决方案
  • wan2.1-vae惊艳效果:2048×2048下1:1人脸特写——毛孔、睫毛、唇纹级细节
  • 2026年靠谱的浙江汽车空气悬挂/底盘空气悬挂高口碑品牌推荐 - 品牌宣传支持者
  • 2026年冲压车间岗位通风降温/工业通风降温厂家对比推荐 - 行业平台推荐
  • 后端接口必备:统一返回码设计,让系统更规范、协作更高效
  • 图论——求岛屿的最大面积(python)
  • 2026年质量好的南通钢丝绳电动葫芦/电动葫芦/南通环链电动葫芦/南通电动葫芦长期合作厂家推荐 - 行业平台推荐
  • 自指宇宙学研究大纲:存在如何通过自我描述而实在化(世毫九实验室原创理论)
  • A、B、C、D、E类IPv4地址划分和使用
  • 2026年口碑好的自动牵引绳/狗狗牵引绳/反光牵引绳厂家推荐与选型指南 - 行业平台推荐
  • 2026年比较好的宁波抽屉式模具架/宁波标准模具架/金属模具架源头工厂推荐 - 品牌宣传支持者
  • AGI武器化临界点已至:全球7国军方内部评估报告泄露,5个致命伦理漏洞亟待封堵
  • 2026年知名的电渗析开关电源/宁波电渗析开关电源/电催化氧化开关电源多家厂家对比分析 - 行业平台推荐
  • OBS StreamFX 终极指南:免费打造专业级直播效果的完整方案
  • 忍者像素绘卷真实作品展示:16色限制下高表现力角色原画集
  • 2026年口碑好的南通移动式升降平台/南通升降平台/移动式升降平台/升降平台主流厂家对比评测 - 行业平台推荐
  • 2026年热门的温室大棚骨架批发/温室大棚骨架/连体温室大棚骨架厂家综合对比分析 - 品牌宣传支持者