【RabbitMQ】路由模式(使用案例)
文章目录
- 1. Routing(路由模式)
- 2、引入依赖
- 3. 生产者代码编写
- 3.1 创建交换机
- 3.2 声明队列
- 3.3 绑定交换机和队列
- 3.4 发送消息
- 3.5 完整代码
- 4. 消费者代码编写
- 4.1 消费者一
- 4.1 消费者二
- 5. 运行程序
1. Routing(路由模式)
队列和交换机的绑定,不能是任意的绑定了,而是要指定一个 BindingKey(RoutingKey 的一种)
消息的发送方在向 Exchange 发送消息时,也需要指定消息的 RoutingKey。
Exchange 也不再把消息交给每一个绑定的 key,而是根据消息的 RoutingKey 进行判断,只有队列绑定时的 BindingKey 和发送消息的 RoutingKey 完全一致,才会接收到消息。
接下来我们看看 Routing 模式的实现,步骤:
- 1、引入依赖
- 2、编写生产者代码
- 3、编写消费者代码
2、引入依赖
先引入 rabbitmq 的依赖
<!-- Source: https://mvnrepository.com/artifact/com.rabbitmq/amqp-client --><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.20.0</version><scope>compile</scope></dependency>3. 生产者代码编写
和发布订阅模式的区别是:交换机类型不同,绑定队列的 BindingKey 不同。
那么先去 Constants.java 里面定义交换机和队列。
// 路由模式publicstaticfinalStringDIRECT_EXCHANGE="direct.exchange";// 声明交换机publicstaticfinalStringDIRECT_QUEUE1="direct.queue1";// 声明队列publicstaticfinalStringDIRECT_QUEUE2="direct.queue2";// 声明队列3.1 创建交换机
定义交换机类型为 BuiltinExchangeType.DIRECT
channel.exchangeDeclare(Constants.DIRECT_EXCHANGE,BuiltinExchangeType.DIRECT,true);3.2 声明队列
代码如下所示:
channel.queueDeclare(Constants.DIRECT_QUEUE1,true,false,false,null);channel.queueDeclare(Constants.DIRECT_QUEUE2,true,false,false,null);3.3 绑定交换机和队列
代码如下所示:
channel.queueBind(Constants.DIRECT_QUEUE1,Constants.DIRECT_EXCHANGE,"a");channel.queueBind(Constants.DIRECT_QUEUE2,Constants.DIRECT_EXCHANGE,"a");channel.queueBind(Constants.DIRECT_QUEUE2,Constants.DIRECT_EXCHANGE,"b");channel.queueBind(Constants.DIRECT_QUEUE2,Constants.DIRECT_EXCHANGE,"c");我们就按照下面这个绑定关系来完成实验:
3.4 发送消息
还是老样子,发送消息和上面的图要对应起来。
代码如下所示:
// 6. 发送消息Stringmsg="Hello direct, my routing key is : a....";channel.basicPublish(Constants.DIRECT_EXCHANGE,"a",null,msg.getBytes());Stringmsg_b="Hello direct, my routing key is : b....";channel.basicPublish(Constants.DIRECT_EXCHANGE,"b",null,msg_b.getBytes());Stringmsg_c="Hello direct, my routing key is : c....";channel.basicPublish(Constants.DIRECT_EXCHANGE,"c",null,msg_c.getBytes());System.out.println("消息发送成功");3.5 完整代码
代码如下所示:
packagedirect;importcom.rabbitmq.client.BuiltinExchangeType;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;importconstant.Constants;importjava.io.IOException;importjava.util.concurrent.TimeoutException;publicclassProducer{publicstaticvoidmain(String[]args)throwsIOException,TimeoutException{// 1. 建立连接ConnectionFactoryfactory=newConnectionFactory();factory.setHost(Constants.HOST);// MQ所在的服务器地址factory.setPort(Constants.PORT);// 端口号factory.setUsername(Constants.USERNAME);// 账号factory.setPassword(Constants.PASSWORD);// 密码factory.setVirtualHost(Constants.VIRTUAL_HOST);// 虚拟主机Connectionconnection=factory.newConnection();// 2. 开启 channel 通道Channelchannel=connection.createChannel();// 3. 声明交换机(使用内置的交换机即可)channel.exchangeDeclare(Constants.DIRECT_EXCHANGE,BuiltinExchangeType.DIRECT,true);// 4. 声明队列channel.queueDeclare(Constants.DIRECT_QUEUE1,true,false,false,null);channel.queueDeclare(Constants.DIRECT_QUEUE2,true,false,false,null);// 5. 绑定队列和交换机channel.queueBind(Constants.DIRECT_QUEUE1,Constants.DIRECT_EXCHANGE,"a");channel.queueBind(Constants.DIRECT_QUEUE2,Constants.DIRECT_EXCHANGE,"a");channel.queueBind(Constants.DIRECT_QUEUE2,Constants.DIRECT_EXCHANGE,"b");channel.queueBind(Constants.DIRECT_QUEUE2,Constants.DIRECT_EXCHANGE,"c");// 6. 发送消息Stringmsg="Hello direct, my routing key is : a....";channel.basicPublish(Constants.DIRECT_EXCHANGE,"a",null,msg.getBytes());Stringmsg_b="Hello direct, my routing key is : b....";channel.basicPublish(Constants.DIRECT_EXCHANGE,"b",null,msg_b.getBytes());Stringmsg_c="Hello direct, my routing key is : c....";channel.basicPublish(Constants.DIRECT_EXCHANGE,"c",null,msg_c.getBytes());System.out.println("消息发送成功");// 7. 资源释放channel.close();connection.close();}}4. 消费者代码编写
Routing 模式的消费者代码 和 Publish / Subscribe代码 一样,同样复制出来两份,然后修改消费的队列名称就可以了。
4.1 消费者一
代码如下所示:
packagedirect;importcom.rabbitmq.client.*;importconstant.Constants;importjava.io.IOException;importjava.util.concurrent.TimeoutException;publicclassConsumer1{publicstaticvoidmain(String[]args)throwsIOException,TimeoutException{// 1. 建立连接ConnectionFactoryfactory=newConnectionFactory();factory.setHost(Constants.HOST);// MQ所在的服务器地址factory.setPort(Constants.PORT);// 端口号factory.setUsername(Constants.USERNAME);// 账号factory.setPassword(Constants.PASSWORD);// 密码factory.setVirtualHost(Constants.VIRTUAL_HOST);// 虚拟主机Connectionconnection=factory.newConnection();// 2. 开启 channel 通道Channelchannel=connection.createChannel();// 3. 声明队列channel.queueDeclare(Constants.DIRECT_QUEUE1,true,false,false,null);// 4. 接收消息并消费DefaultConsumerconsumer=newDefaultConsumer(channel){// 从队列中收到消息后, 就会执行的方法@OverridepublicvoidhandleDelivery(StringconsumerTag,Envelopeenvelope,AMQP.BasicPropertiesproperties,byte[]body)throwsIOException{// 收到消息以后就进行打印System.out.println("接收到消息: "+newString(body));}};channel.basicConsume(Constants.DIRECT_QUEUE1,true,consumer);// 5. 不需要释放资源}}4.1 消费者二
代码如下所示:
packagedirect;importcom.rabbitmq.client.*;importconstant.Constants;importjava.io.IOException;importjava.util.concurrent.TimeoutException;publicclassConsumer2{publicstaticvoidmain(String[]args)throwsIOException,TimeoutException{// 1. 建立连接ConnectionFactoryfactory=newConnectionFactory();factory.setHost(Constants.HOST);// MQ所在的服务器地址factory.setPort(Constants.PORT);// 端口号factory.setUsername(Constants.USERNAME);// 账号factory.setPassword(Constants.PASSWORD);// 密码factory.setVirtualHost(Constants.VIRTUAL_HOST);// 虚拟主机Connectionconnection=factory.newConnection();// 2. 开启 channel 通道Channelchannel=connection.createChannel();// 3. 声明队列channel.queueDeclare(Constants.DIRECT_QUEUE2,true,false,false,null);// 4. 接收消息并消费DefaultConsumerconsumer=newDefaultConsumer(channel){// 从队列中收到消息后, 就会执行的方法@OverridepublicvoidhandleDelivery(StringconsumerTag,Envelopeenvelope,AMQP.BasicPropertiesproperties,byte[]body)throwsIOException{// 收到消息以后就进行打印System.out.println("接收到消息: "+newString(body));}};channel.basicConsume(Constants.DIRECT_QUEUE2,true,consumer);// 5. 不需要释放资源}}5. 运行程序
先运行生产者代码
可以看到 direct.queue1 队列中,路由了一条消息。direct.queue2 队列中,路由了三条消息
对应图如下所示:
exchange 下队列和 Routing Key 的绑定关系,如下所示:
然后运行 Consumer1 消费者代码:
运行 Consumer2 消费者代码:
同时可以看到,队列中的消息已经被全部给消费完了
