当前位置: 首页 > news >正文

springboot中的消息队列和用法

消息队列简介

消息队列是一种异步通信机制,允许应用程序通过发送和接收消息进行解耦。在Spring Boot中,常用的消息队列实现包括RabbitMQ、Apache Kafka和ActiveMQ。

集成RabbitMQ

RabbitMQ是一个开源消息代理,支持多种消息协议。在Spring Boot中集成RabbitMQ需要添加依赖:

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>

配置RabbitMQ连接信息:

spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest

创建消息生产者:

@Service public class MessageProducer { @Autowired private RabbitTemplate rabbitTemplate; public void sendMessage(String exchange, String routingKey, String message) { rabbitTemplate.convertAndSend(exchange, routingKey, message); } }

创建消息消费者:

@Component public class MessageConsumer { @RabbitListener(queues = "queue.name") public void receiveMessage(String message) { System.out.println("Received message: " + message); } }

集成Apache Kafka

Kafka是一个分布式流处理平台,适合处理高吞吐量数据。添加Kafka依赖:

<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>

配置Kafka连接信息:

spring.kafka.bootstrap-servers=localhost:9092 spring.kafka.consumer.group-id=group_id spring.kafka.consumer.auto-offset-reset=earliest

创建Kafka生产者:

@Service public class KafkaProducer { @Autowired private KafkaTemplate<String, String> kafkaTemplate; public void sendMessage(String topic, String message) { kafkaTemplate.send(topic, message); } }

创建Kafka消费者:

@Component public class KafkaConsumer { @KafkaListener(topics = "topic.name", groupId = "group_id") public void listen(String message) { System.out.println("Received message: " + message); } }

消息队列使用场景

消息队列适用于异步处理、应用解耦、流量削峰等场景。例如在电商系统中,订单创建后通过消息队列通知库存系统减库存,而不是直接调用库存服务。

消息确认机制

为了保证消息可靠传递,Spring Boot支持消息确认机制。在RabbitMQ中配置生产者确认:

spring.rabbitmq.publisher-confirms=true spring.rabbitmq.publisher-returns=true

在Kafka中配置手动提交偏移量:

spring.kafka.consumer.enable-auto-commit=false

消息序列化

默认情况下,Spring Boot使用SimpleMessageConverter进行消息序列化。可以自定义消息转换器:

@Bean public MessageConverter jsonMessageConverter() { return new Jackson2JsonMessageConverter(); }

错误处理

实现ErrorHandler接口可以处理消息消费过程中的异常:

@Bean public ErrorHandler errorHandler() { return (e, record) -> { // 自定义错误处理逻辑 }; }

性能优化

对于高吞吐量场景,可以调整消费者并发数量:

spring.kafka.listener.concurrency=3

RabbitMQ中可以预取消息数量:

spring.rabbitmq.listener.simple.prefetch=10

消息队列实现票务系统

// 配置RabbitMQ连接 @Configuration public class RabbitMQConfig { @Value("${spring.rabbitmq.host}") private String host; @Value("${spring.rabbitmq.port}") private int port; @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setHost(host); connectionFactory.setPort(port); return connectionFactory; } @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { return new RabbitTemplate(connectionFactory); } }
// 定义票务消息DTO @Data @AllArgsConstructor @NoArgsConstructor public class TicketMessage { private String ticketId; private String eventName; private String userId; private LocalDateTime purchaseTime; private int quantity; }
// 创建消息生产者服务 @Service public class TicketProducer { private static final String TICKET_QUEUE = "ticket.queue"; @Autowired private RabbitTemplate rabbitTemplate; public void sendTicketPurchase(TicketMessage ticketMessage) { rabbitTemplate.convertAndSend(TICKET_QUEUE, ticketMessage); System.out.println("Sent ticket purchase message: " + ticketMessage); } }
// 创建消息消费者服务 @Service public class TicketConsumer { private static final String TICKET_QUEUE = "ticket.queue"; @RabbitListener(queues = TICKET_QUEUE) public void receiveTicketPurchase(TicketMessage ticketMessage) { System.out.println("Received ticket purchase message: " + ticketMessage); // 处理票务逻辑:更新库存 processTicket(ticketMessage); } private void processTicket(TicketMessage ticket) { // 实现票务处理逻辑 减库存,插数据 } }
// 创建队列配置 @Configuration public class TicketQueueConfig { @Bean public Queue ticketQueue() { return new Queue("ticket.queue", true); } }

异常处理

// 添加异常处理器 @Component public class RabbitMQErrorHandler implements RabbitListenerErrorHandler { @Override public Object handleError(Message amqpMessage, org.springframework.messaging.Message<?> message, ListenerExecutionFailedException exception) { System.err.println("Error processing message: " + exception.getMessage()); // 可添加重试或死信队列逻辑 return null; } }

测试代码

// 测试消息发送 @SpringBootTest public class TicketSystemTest { @Autowired private TicketProducer ticketProducer; @Test public void testSendTicketMessage() { TicketMessage message = new TicketMessage( "TICKET-12345", "Concert", "USER-1001", LocalDateTime.now(), 2 ); ticketProducer.sendTicketPurchase(message); } }
http://www.jsqmd.com/news/581891/

相关文章:

  • 2026届最火的AI辅助论文网站横评
  • Warcraft Font Merger:解决游戏多语言显示问题的字体优化方案
  • 三步掌握数字记忆:WeChatMsg全面数据管理指南
  • PX4飞控系统全面解析:从底层架构到实战应用的深度指南
  • C++ 并发核心模型总结—— 从阻塞 IO 到 Reactor + 协程的完整理解(附 mini epoll + Reactor demo)
  • 3个关键步骤构建企业级本地语音合成系统:tts-vue深度解析
  • C++的std--ranges选择管理
  • 心理学知识分享(2026.4.3)
  • 大模型面试宝典(2026版)发布!收藏这份程序员进阶指南,高薪Offer等你拿!
  • 视频获取工具新纪元:N_m3u8DL-CLI-SimpleG全方位解析
  • Stanford CoreNLP:自然语言处理工具包的技术解析与实战指南
  • 牛客网Java面试题总结(金三银四最新版)
  • 开源项目实战部署指南:从环境搭建到应用优化
  • SiameseAOE模型对比实验:与传统规则和词典方法的性能评估
  • 万象视界灵坛惊艳效果:像素风勋章系统动态升级——从‘青铜神谕者’到‘万象先知’的成长路径可视化
  • Go Routine 调度模型详解
  • Go Context 取消机制原理
  • 解锁Zotero插件管理新范式:让学术效率提升300%的实战指南
  • 二次封装ElementUI日期范围组件:打造带限制规则的Vue2 v-model响应式通用组件
  • JX3Toy终极指南:如何用自动化脚本提升剑网3游戏效率300%
  • 3大核心功能打造完美暗黑2角色:d2s-editor存档编辑工具全解析
  • 3分钟解锁音频自由:开源音频处理工具全方位解决方案
  • C++的std--span动态范围与静态范围在API设计中的灵活性选择
  • 番茄小说下载器:5分钟掌握离线阅读终极解决方案
  • 终极游戏自动化指南:如何用JX3Toy实现剑网3全门派DPS优化
  • 量化交易开发实战指南:从入门到部署
  • 凤铝新家装门窗工厂怎么样,从详细介绍看上海选购指南 - 工业品网
  • 从手速焦虑到技术制胜:DamaiHelper如何重塑抢票游戏规则
  • AI 模型微调与再训练实践
  • 《Spring Boot微服务架构下的电商秒杀系统设计与实现》