当前位置: 首页 > news >正文

kafka源码-@KafkaListener消费端的poll调用逻辑

单独展开项目中 @KafkaListener 消费端从启动注册到 poll 拉取再到 listen() 被调用的完整源码调用链。版本对齐:Spring Boot 2.7.18 → spring-kafka 2.8.11 → kafka-clients 3.1.x。

一、与项目的对应关系

消费代码:

// 监听test_topic主题 @KafkaListener(topics = "test_topic") public void listen(String msg) { System.out.println("SpringBoot消费者收到消息:" + msg); }

关键配置(application.yml):

  • group-id: test-boot-group
  • enable-auto-commit: true
  • auto-offset-reset: earliest
  • StringDeserializer二、完整调用链

二、完整调用链

三、阶段 1:启动时注册监听器(还没到 poll)

3.1 启用@KafkaListener处理

KafkaAutoConfiguration通过内部类@EnableKafka激活注解驱动:

// KafkaAnnotationDrivenConfiguration.EnableKafkaConfiguration @EnableKafka // 导入 KafkaListenerAnnotationBeanPostProcessor 等 Bean

同时创建默认工厂:

@Bean ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(...) { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); configurer.configure(factory, kafkaConsumerFactory); // 注入 ConsumerFactory、pollTimeout 等 return factory; }

3.2 扫描KafkaMsgConsumer.listen

KafkaListenerAnnotationBeanPostProcessor在 Bean 初始化后扫描@KafkaListener

// processKafkaListener MethodKafkaListenerEndpoint endpoint = new MethodKafkaListenerEndpoint<>(); endpoint.setMethod(method); // listen(String msg) endpoint.setBean(bean); // KafkaMsgConsumer 实例 endpoint.setTopics(["test_topic"]); endpoint.setGroupId(...); // 未指定则用默认 KafkaListenerContainerFactory factory = resolveContainerFactory(...); // kafkaListenerContainerFactory this.registrar.registerEndpoint(endpoint, factory);

3.3 注册并启动容器

所有单例 Bean 就绪后:

// KafkaListenerAnnotationBeanPostProcessor.afterSingletonsInstantiated() this.registrar.afterPropertiesSet(); // → registerAllEndpoints() // KafkaListenerEndpointRegistrar.registerAllEndpoints() this.endpointRegistry.registerListenerContainer(descriptor.endpoint, factory, true); // autoStartup=true → 立刻 start()

四、阶段 2:创建容器与消费者(poll 前的准备)

4.1 工厂创建ConcurrentMessageListenerContainer

// ConcurrentKafkaListenerContainerFactory.createContainerInstance() ContainerProperties properties = new ContainerProperties("test_topic"); return new ConcurrentMessageListenerContainer<>(consumerFactory, properties);

默认concurrency = 1,所以只创建 1 个子容器KafkaMessageListenerContainer

4.2 绑定 MessageListener 适配器

// MethodKafkaListenerEndpoint.createMessageListener() RecordMessagingMessageListenerAdapter adapter = new RecordMessagingMessageListenerAdapter(bean, method, errorHandler); adapter.setHandlerMethod(new HandlerAdapter(invocableHandlerMethod)); // invocableHandlerMethod = 对 KafkaMsgConsumer.listen(String) 的反射封装

listen(String msg)参数只有 payload,监听器类型为ListenerType.SIMPLE

4.3 启动容器,提交消费线程

// KafkaMessageListenerContainer.doStart() this.listenerConsumer = new ListenerConsumer(listener, listenerType); consumerExecutor.submitListenable(this.listenerConsumer); // 异步线程,非 HTTP 线程

线程名类似:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1

4.4ListenerConsumer构造函数:创建KafkaConsumer并订阅

// ListenerConsumer 构造 this.autoCommit = determineAutoCommit(consumerProperties); // true(你的配置) this.consumer = consumerFactory.createConsumer( this.consumerGroupId, // "test-boot-group" clientId, clientIdSuffix, consumerProperties); subscribeOrAssignTopics(this.consumer); // → consumer.subscribe(["test_topic"], rebalanceListener)

底层:

// DefaultKafkaConsumerFactory.createRawConsumer() return new KafkaConsumer<>(configProps, keyDeserializerSupplier.get(), // StringDeserializer valueDeserializerSupplier.get()); // StringDeserializer

此时消费者加入test-boot-group,触发 ConsumerCoordinator 做 rebalance,分配test_topic的分区。


五、阶段 3:消费主循环 —run()pollAndInvoke()

5.1 无限循环

// ListenerConsumer.run() while (isRunning()) { try { pollAndInvoke(); // 每一轮 = poll + 处理 } catch (Exception e) { handleConsumerException(e); } }

5.2 单轮 poll 流程

// ListenerConsumer.pollAndInvoke() idleBetweenPollIfNecessary(); // 空闲时可能 sleep pauseConsumerIfNecessary(); // 容器 pause 时跳过 this.lastPoll = System.currentTimeMillis(); ConsumerRecords<K, V> records = doPoll(); // 核心 invokeIfHaveRecords(records); // 有记录则调用监听器 // doPoll() → pollConsumer() private ConsumerRecords<K, V> pollConsumer() { beforePoll(); try { return this.consumer.poll(this.pollTimeout); // 默认 5000ms } catch (WakeupException ex) { return ConsumerRecords.empty(); // 停止容器时唤醒 } }

5.3 有消息时进入监听调用

// invokeIfHaveRecords() if (records != null && records.count() > 0) { invokeListener(records); // 非 batch → invokeRecordListener } // invokeRecordListener() → doInvokeWithRecords() Iterator<ConsumerRecord<K, V>> iterator = records.iterator(); while (iterator.hasNext()) { ConsumerRecord<K, V> record = checkEarlyIntercept(iterator.next()); doInvokeRecordListener(record, iterator); }

六、阶段 4:从ConsumerRecord到你的listen(String msg)

6.1 单条记录处理

// doInvokeRecordListener() invokeOnMessage(record); // invokeOnMessage() doInvokeOnMessage(record); if (!isManualImmediateAck) { ackCurrent(record); // autoCommit=true 时此处基本不手动提交 }

6.2 适配器分发(ListenerType.SIMPLE

// doInvokeOnMessage() switch (this.listenerType) { case SIMPLE: this.listener.onMessage(record); // RecordMessagingMessageListenerAdapter break; ... }

6.3RecordMessagingMessageListenerAdapter→ 反射调用

// RecordMessagingMessageListenerAdapter.onMessage(record, ack, consumer) Object result = invokeHandler(record, acknowledgment, message, consumer); // MessagingMessageListenerAdapter.invokeHandler() return this.handlerMethod.invoke(message, data, acknowledgment, consumer); // data = ConsumerRecord,InvocableHandlerMethod 从 record.value() 提取 String // HandlerAdapter.invoke() return invocableHandlerMethod.invoke(message, providedArgs); // 最终调用 KafkaMsgConsumer.listen("hello")

项目的listen(String msg)收到的msg,就是ConsumerRecord.value()StringDeserializer反序列化后的结果。

七、阶段 5:KafkaConsumer.poll()内部(kafka-clients)

Spring 调用的是:

// KafkaConsumer.poll(Duration timeout) private ConsumerRecords<K, V> poll(Timer timer, boolean includeMetadataInTimeout) { do { // ① 协调器:心跳、rebalance、更新分区分配 updateAssignmentMetadataIfNeeded(timer, false); // ② 拉取数据 Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer); if (!records.isEmpty()) { // ③ 预取下一批(pipeline) if (fetcher.sendFetches() > 0) { client.transmitSends(); } // ④ 拦截器 + 返回 return interceptors.onConsume(new ConsumerRecords<>(records)); } } while (timer.notExpired()); return ConsumerRecords.empty(); // 超时无数据 }

pollForFetches内部:

// 本地缓冲区有数据 → 直接返回(已反序列化) Map<...> records = fetcher.fetchedRecords(); if (!records.isEmpty()) return records; // 否则发 FetchRequest 到 Broker,NetworkClient.poll() 等待响应 fetcher.sendFetches(); client.poll(pollTimeout, ...); return fetcher.fetchedRecords();

数据流:

Broker 磁盘

→ FetchResponse(字节)

→ Fetcher 解析

→ StringDeserializer.deserialize() // value 变 "hello"

→ ConsumerRecord("test_topic", partition, offset, "hello")

→ ConsumerRecords

→ Spring ListenerConsumer

→ KafkaMsgConsumer.listen("hello")


八、offset 提交(你的enable-auto-commit: true

项目的配置下 不由 Spring 手动 commit,而是由KafkaConsumer内部协调器按auto.commit.interval.ms(默认 5s)自动提交。

// ackCurrent() 在 autoCommit=true 时 else if (... && !this.autoCommit) { this.acks.add(record); // 不满足,跳过 } // → 不进入 Spring 手动提交逻辑

含义:

  • listen()正常返回后,offset 不会立刻提交
  • 通常在接下来几秒内由消费者客户端后台提交
  • listen()抛异常,默认错误处理下该条消息可能被重复消费(取决于提交时机)

九、精简版调用栈(对照源码用)

[启动]

SpringApplication.run()

└─ @EnableKafka → KafkaListenerAnnotationBeanPostProcessor

└─ processKafkaListener(KafkaMsgConsumer.listen)

└─ registrar.registerEndpoint(MethodKafkaListenerEndpoint)

└─ KafkaListenerEndpointRegistry.registerListenerContainer(..., autoStartup=true)

[容器启动]

ConcurrentKafkaListenerContainerFactory.createListenerContainer()

└─ ConcurrentMessageListenerContainer.doStart()

└─ KafkaMessageListenerContainer.doStart()

└─ SimpleAsyncTaskExecutor.submit(ListenerConsumer)

[消费线程初始化]

ListenerConsumer.<init>()

├─ DefaultKafkaConsumerFactory.createConsumer("test-boot-group", ...)

│ └─ new KafkaConsumer<>(configs, StringDeserializer, StringDeserializer)

└─ consumer.subscribe(["test_topic"], rebalanceListener)

[消费循环 - 独立线程]

ListenerConsumer.run()

└─ while (isRunning) pollAndInvoke()

├─ consumer.poll(Duration.ofMillis(5000)) // KafkaConsumer

│ ├─ ConsumerCoordinator.poll() // 心跳/rebalance

│ ├─ Fetcher.sendFetches() → NetworkClient // 向 Broker 发 Fetch

│ └─ Fetcher.fetchedRecords() // 反序列化 → ConsumerRecords

└─ invokeListener(records)

└─ doInvokeWithRecords()

└─ doInvokeRecordListener(record)

└─ invokeOnMessage(record)

└─ listener.onMessage(record) // RecordMessagingMessageListenerAdapter

└─ HandlerAdapter.invoke()

└─ InvocableHandlerMethod.invoke()

└─ KafkaMsgConsumer.listen("hello") // 你的业务方法


十、与生产侧的对比(同一进程)

维度生产侧 (KafkaTemplate.send)消费侧 (@KafkaListener)

触发线程

Tomcat HTTP 线程

ListenerConsumer专用线程

是否阻塞等待

send异步立即返回

poll最多阻塞pollTimeout(默认 5s)

核心 API

KafkaProducer.send

KafkaConsumer.poll

你的业务入口

KafkaTestController.send

KafkaMsgConsumer.listen

与 Broker 交互

ProducerRequest

FetchRequest + 心跳

Producer 把消息写入 Broker 后,消费线程在下一轮(或当前轮)poll中拉到,listen()才会执行——两者完全异步、不同线程。


十一、几个容易忽略的细节

1. 首次poll可能为空

Rebalance 完成前,poll可能返回空集合;auto-offset-reset: earliest决定从最早还是最新 offset 开始读。

2. 同一分区内顺序消费

concurrency=1时,同一 partition 的消息在doInvokeWithRecords的 while 循环中串行调用listen()

3.poll超时不是错误

5s 内无新消息 → 返回ConsumerRecords.empty()→ 进入 idle 检测 → 继续下一轮 poll。

4. 停止应用时

doStop()listenerConsumer.wakeup()pollWakeupException→ 返回空记录 → 循环退出。

5. 与KafkaTemplate.send的衔接点

唯一的交汇点是 Kafka Broker 上的test_topic;Spring 生产链路和消费链路在代码层没有直接调用关系。


源码对照:

  • KafkaMessageListenerContainer(ListenerConsumer.run/pollAndInvoke
  • RecordMessagingMessageListenerAdapter
  • KafkaConsumer.poll
http://www.jsqmd.com/news/1040228/

相关文章:

  • 洛雪音乐音源终极指南:5分钟打造你的免费高品质音乐库
  • 2026年诚信的江阴不锈钢管/航空航天管/江苏316H电站锅炉管批量采购厂家推荐 - 行业平台推荐
  • 基于python农产品销售数据分析可视化系统销量数据分析1(设计源文件+万字报告+讲解)(支持资料、图片参考_降重降ai)
  • Qwen3.5原生多模态智能体架构解析与工程落地指南
  • 3分钟学会:Windows上最轻量的安卓APK安装工具完全指南
  • 常见问题解决 --- trae的mcp服务不可用
  • OA与CMS系统漏洞挖掘:从权限边界突破到实战提权
  • TC820双斜积分ADC:从原理到3位半数字电压表设计实战
  • GeekAI会话安全深度剖析:从令牌管理到端到端加密的实战加固方案
  • 豆包智能感从何而来:五层能力涌现机制解析
  • 2026年可靠的家用调味一烤竹盐/四川富硒一烤竹盐/四川高温煅烧一烤竹盐/益鼎天养一烤竹盐可靠供应商推荐 - 行业平台推荐
  • 网络安全信息收集实战:MSCAN+NMAP+NC+Python构建自动化侦察框架
  • 京东后端Agent开发面经整理
  • Kimi K2.5多模态训练范式深度解析:MoE架构与解耦式视觉编码
  • 2026年比较好的玄武岩玻璃纤维带/玻璃纤维绳源头工厂推荐 - 品牌宣传支持者
  • HIS医院信息系统实战指南:从单体到微服务的全景式部署与运维
  • TC1030低功耗运放:1.8V单电源与独立关断的物联网传感方案
  • Gemini 3.1 Pro科研提示词公式:四层指令激活学术推理
  • 还在为协作绘图工具选择困难?Drawnix开源白板的终极解决方案
  • 行测的思维判断电子版pdf|行测判断推理|粉笔行测思维
  • 2026年热门的浙江大型设备搬运吊装/宁波工厂设备搬运吊装/整厂设备搬运吊装定制加工厂家推荐 - 行业平台推荐
  • 2026年比较好的温润调养九烤竹盐/成都无添加天然九烤竹盐/九烤竹盐/九烤竹盐四川竹盐生产厂家推荐 - 品牌宣传支持者
  • AI驱动的自动化渗透测试:HexStrike-AI项目实战与家庭网络安全评估
  • Web安全实战:从XSS与CSRF攻防到代码审计与SDL实践
  • Windows热键冲突智能侦探:精准定位被占用快捷键的终极秘籍
  • SH9自指螺旋拓扑与圈量子引力的严格同构证明(世毫九实验室原创研究)
  • 2026年优秀的宁波工厂设备搬运吊装/浙江重型设备搬运吊装批量采购厂家推荐 - 品牌宣传支持者
  • Windows平台OpenClaw v2.7.9配置实操|从下载到运行的完整指引
  • 2026年评价高的南充房车汽车龙头企业/南充越野房车/四川越野房车/四驱房车厂家选择推荐 - 品牌宣传支持者
  • 从Nmap到自动化闭环:构建匹配现代漏洞发现速度的修复体系