当前位置: 首页 > news >正文

Spring Kafka 动态消费实现案例

动态消费的使用场景

首先,什么是动态消费? 简单来讲,就是Spring Kafka提供了安全地在运行时调整消费状态的实现,可以随时调整消费者的消费状态。 比如暂时停止正在消费消息的消费者的消费,等到合适的时机再重新从中断的地方开始消费。 再比如关闭消费者线程不再消费,在执行某些操作(如修改消费者参数)之后,再重新创建消费者并开始消费。

下面详细说说。

  • 开关降级

当一个新功能发布之后,还没有正式使用,消费线程就没有创建的必要,此时@KafkaListener注解参数autoStartup = "false"就可以实现。 如果某个功能需要降级,也可以销毁消费线程,不再注册这个消费者。

  • 延迟消费

开始消费前需要进行自检、数据准备等操作。 有时候消费消息,需要提前检查某些配置或者数据是否已经存在,或者当服务启动之后,需要异步完成一些资源的初始化,才能正常消费。此时可以等待完成准备之后,再执行start()方法创建消费线程。

  • 限流背压

在高峰期由消费者端控制,防止系统崩溃,使用pause() 暂停拉取消息、resume()恢复拉取消息。

  • 参数调整

不停机修改消费配置(如 groupId、topic),使用stop() / start() ,重新创建消费者会触发消费者重平衡,会重新建立与 Kafka 的连接。

SpringKafka消费的实现机制

在实现Spring Kafka动态调整消费之前,需要先了解Spring Kafka的消费者是如何创建的

@KafkaListener 用于声明一个方法为Kafka消息监听器。 Spring Kafka 会在应用启动时自动完成以下工作:

  1. 扫描并解析 @KafkaListener 注解
  2. 创建对应的监听端点(KafkaListenerEndpoint)
  3. 将端点注册到 KafkaListenerEndpointRegistry
  4. 使用 KafkaListenerContainerFactory 创建监听容器
  5. 启动监听容器,开始消费消息

详细描述如下:

Java @KafkaListener ↓ 【后处理器PostProcessor】 Spring在启动时会加载一个后处理器:[KafkaListenerAnnotationBeanPostProcessor] 负责扫描所有Spring Bean,查找带有 @KafkaListener 的方法。 ↓ 【监听端点ListenerEndpoint】 创建监听端点:[MethodKafkaListenerEndpoint] 负责定义要监听的主题;消费组;监听方法;消息反序列化类型;容器工厂名称等。 ↓ 【全局注册表EndpointRegistry】 Processor将Endpoint注册到全局注册表:[KafkaListenerEndpointRegistry] 保存所有监听端点;管理监听容器的生命周期(启动、停止、暂停、恢复);提供运行时访问接口。 ↓ 【容器工厂ContainerFactory】 创建监听容器工厂:[KafkaListenerContainerFactory] 注册表使用工厂创建实际的监听容器 ↓ 【监听容器ListenerContainer】 拉取消息并调用监听方法:[ConcurrentMessageListenerContainer] 根据配置(autoStartup=true)自动启动所有监听容器 ↓ KafkaConsumer

通过上面的描述我们知道,KafkaListenerEndpointRegistry注册表管理者所有容器,MessageListenerContainer 接口的实现类保存着所有消费者容器,我们可以从消费者容器中获取到我们的消费者。

那么MessageListenerContainer提供了哪些方法呢? 主要是下面4个:

  • pause()

暂停拉取消息,不释放资源;

  • resume()

恢复拉取消息,不释放资源;

  • stop()

停止容器(可重新启动),不释放资源;

  • start()

启动容器,不释放资源;

这4个方法可以分为两组,分别是

  • pause() / resume()
  • stop() / start()

如何进行选择?下面详细讲解一下它们的差异。

pause() / resume():这两个方法是对消费者的轻量级控制,暂停或恢复消息拉取,不影响容器生命周期。适用场景是临时暂停消费(例如下游系统压力过大);或实现“背压”机制;或动态限流或批量处理控制。

pause():暂停拉取消息,但保持与 Kafka Broker 的连接,不会提交偏移量,也不会关闭线程;resume():恢复拉取消息,继续消费。

pause()之后,容器仍在运行状态(isRunning() == true,但isContainerPaused() == true);消费线程仍存在;不会触发消费者重平衡;恢复后从上次暂停的偏移继续消费。

stop() / start():重级控制,停止或启动整个监听容器,会释放或重建资源。适合场景:应用启动时延迟启动消费;动态启停某个监听器;部署或维护期间临时关闭消费;切换消费配置(如 groupId、topic)。

stop():停止容器运行;关闭所有 KafkaConsumer;释放线程池和资源;容器状态变为 isRunning() == false。

start():重新创建 KafkaConsumer;重新订阅主题;启动消费线程;容器状态变为 isRunning() == true。

Consumer动态消费实现案例

消费者Consumer注解:

ini@KafkaListener( topics = "${spring.kafka.multiple.consumer.mybiz.topics}", concurrency = "${spring.kafka.multiple.consumer.mybiz.concurrency}", containerFactory = "mybizConsumer", id = "mybizListenerId", // 容器id autoStartup = "false" // 是否自动注册,是"true",否"false",默认是 )

服务启动之后,由于 autoStartup = "false" ,不会注册消费者。

可以通过下面的管理器调整消费状态:

typescript @Service public class KafkaListenerAdjuster { @Autowired private KafkaListenerEndpointRegistry registry; // 暂停消费 public void pause(String listenerId) { MessageListenerContainer container = registry.getListenerContainer(listenerId); if (container != null && container.isRunning()) { container.pause(); } } // 恢复消费 public void resume(String listenerId) { MessageListenerContainer container = registry.getListenerContainer(listenerId); if (container != null) { container.resume(); } } // 停止消费 public void stop(String listenerId) { MessageListenerContainer container = registry.getListenerContainer(listenerId); if (container != null) { container.stop(); } } // 启动消费 public void start(String listenerId) { MessageListenerContainer container = registry.getListenerContainer(listenerId); if (container != null && !container.isRunning()) { container.start(); } } // 动态调整并发数 public void changeConcurrency(String listenerId, int concurrency) { ConcurrentMessageListenerContainer<?, ?> container = (ConcurrentMessageListenerContainer<?, ?>) registry.getListenerContainer(listenerId); if (container != null) { container.setConcurrency(concurrency); } } }

总结

以上的实现在实际项目开发和生产环境,可以和分布式配置管理框架或注册中心(Spring Cloud Config、Nacos、ZooKeeper、Apollo、Consul等)结合,通过事件如SpringEvent等进行动态调整。在分布式系统或容器中,需要确保每台机器的消费状态都得到了调整。

除了动态调整消费状态之外,还可以 Spring Kafka 源码的基础之上,实现其他能力的动态调整,如增加自定义过滤器,可以实现消息的过滤消费,或者在消息中增加灰度标识等。

http://www.jsqmd.com/news/74111/

相关文章:

  • 40亿参数掀起AI革命:Qwen3-4B-FP8如何重塑轻量级智能应用新格局
  • 算法题 二叉搜索树中的插入操作
  • 【Docker Scout AI漏洞扫描揭秘】:如何利用人工智能精准发现容器安全盲点
  • 第三届教育发展与社会科学国际学术会议 (EDSS 2026)
  • 主题:**“数据质量监控漏关键规则,后来补Great Expectations才稳住血检数据一致性”**
  • DeepSeek-VL2重磅发布:新一代混合专家视觉语言模型引领多模态理解革命
  • SCHNEIDER BSH0702P01F2A 模块
  • Bili2text终极指南:免费快速将B站视频转为可编辑文字
  • Wan2.2-T2V-A14B在核磁共振成像原理科普中的微观世界构建
  • 哔哩下载姬:3分钟学会B站视频下载的终极指南
  • 18、游戏音效与音乐的添加与优化
  • 时序数据库选型指南,从大数据视角看新一代列式存储引擎的核心优势
  • 突破自动驾驶感知瓶颈:HunyuanWorld-Mirror引领3D环境建模新范式
  • 消费级显卡也能玩转多模态交互:Qwen2.5-Omni-7B-AWQ模型深度解析
  • Qt Creator
  • 1位数码管模拟值实验萌新速成大法
  • 告别强制训练!眼调节训练灯让近视防控契合孩子学业节奏
  • 英雄联盟智能辅助工具:自动化游戏体验全面解析
  • 2、Cocos2D游戏开发入门指南
  • 揭秘Docker环境下Agent服务迁移难题:3步实现跨环境稳定部署
  • 木材种类识别与分类:基于Mask R-CNN的MDF、MFC、OSB、实橡木和实松木自动检测技术详解
  • Wan2.2-T2V-A14B模型轻量化部署方案探索与实践
  • DeepSeek-V2-Chat-0628横空出世:开源大模型性能天花板再突破,多维度评测登顶行业前列
  • 卧室图像生成新突破:解析google/ddpm-bedroom-256扩散模型的技术实力与应用价值
  • 仿生记忆技术突破:字节跳动AHN-GDN模型实现百万字文本处理效率跃升
  • 高速电路设计
  • OpenAI Whisper语音模型现已登陆亚马逊SageMaker JumpStart,开启智能音频处理新纪元
  • 小米14C刷国际版步骤
  • 智能营销AI平台建设:Serverless架构的探索与实践
  • 智谱AI开源90亿参数轻量模型GLM-Z1-9B-0414:小参数大能力的技术突破