kafka源码-@KafkaListener消费端的poll调用逻辑
单独展开项目中 @KafkaListener 消费端从启动注册到 poll 拉取再到 listen() 被调用的完整源码调用链。版本对齐:Spring Boot 2.7.18 → spring-kafka 2.8.11 → kafka-clients 3.1.x。
一、与项目的对应关系
消费代码:
// 监听test_topic主题 @KafkaListener(topics = "test_topic") public void listen(String msg) { System.out.println("SpringBoot消费者收到消息:" + msg); }关键配置(application.yml):
group-id: test-boot-groupenable-auto-commit: trueauto-offset-reset: earliestStringDeserializer二、完整调用链
二、完整调用链
三、阶段 1:启动时注册监听器(还没到 poll)
3.1 启用@KafkaListener处理
KafkaAutoConfiguration通过内部类@EnableKafka激活注解驱动:
// KafkaAnnotationDrivenConfiguration.EnableKafkaConfiguration @EnableKafka // 导入 KafkaListenerAnnotationBeanPostProcessor 等 Bean同时创建默认工厂:
@Bean ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(...) { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); configurer.configure(factory, kafkaConsumerFactory); // 注入 ConsumerFactory、pollTimeout 等 return factory; }3.2 扫描KafkaMsgConsumer.listen
KafkaListenerAnnotationBeanPostProcessor在 Bean 初始化后扫描@KafkaListener:
// processKafkaListener MethodKafkaListenerEndpoint endpoint = new MethodKafkaListenerEndpoint<>(); endpoint.setMethod(method); // listen(String msg) endpoint.setBean(bean); // KafkaMsgConsumer 实例 endpoint.setTopics(["test_topic"]); endpoint.setGroupId(...); // 未指定则用默认 KafkaListenerContainerFactory factory = resolveContainerFactory(...); // kafkaListenerContainerFactory this.registrar.registerEndpoint(endpoint, factory);3.3 注册并启动容器
所有单例 Bean 就绪后:
// KafkaListenerAnnotationBeanPostProcessor.afterSingletonsInstantiated() this.registrar.afterPropertiesSet(); // → registerAllEndpoints() // KafkaListenerEndpointRegistrar.registerAllEndpoints() this.endpointRegistry.registerListenerContainer(descriptor.endpoint, factory, true); // autoStartup=true → 立刻 start()四、阶段 2:创建容器与消费者(poll 前的准备)
4.1 工厂创建ConcurrentMessageListenerContainer
// ConcurrentKafkaListenerContainerFactory.createContainerInstance() ContainerProperties properties = new ContainerProperties("test_topic"); return new ConcurrentMessageListenerContainer<>(consumerFactory, properties);默认concurrency = 1,所以只创建 1 个子容器KafkaMessageListenerContainer。
4.2 绑定 MessageListener 适配器
// MethodKafkaListenerEndpoint.createMessageListener() RecordMessagingMessageListenerAdapter adapter = new RecordMessagingMessageListenerAdapter(bean, method, errorHandler); adapter.setHandlerMethod(new HandlerAdapter(invocableHandlerMethod)); // invocableHandlerMethod = 对 KafkaMsgConsumer.listen(String) 的反射封装listen(String msg)参数只有 payload,监听器类型为ListenerType.SIMPLE。
4.3 启动容器,提交消费线程
// KafkaMessageListenerContainer.doStart() this.listenerConsumer = new ListenerConsumer(listener, listenerType); consumerExecutor.submitListenable(this.listenerConsumer); // 异步线程,非 HTTP 线程线程名类似:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1。
4.4ListenerConsumer构造函数:创建KafkaConsumer并订阅
// ListenerConsumer 构造 this.autoCommit = determineAutoCommit(consumerProperties); // true(你的配置) this.consumer = consumerFactory.createConsumer( this.consumerGroupId, // "test-boot-group" clientId, clientIdSuffix, consumerProperties); subscribeOrAssignTopics(this.consumer); // → consumer.subscribe(["test_topic"], rebalanceListener)底层:
// DefaultKafkaConsumerFactory.createRawConsumer() return new KafkaConsumer<>(configProps, keyDeserializerSupplier.get(), // StringDeserializer valueDeserializerSupplier.get()); // StringDeserializer此时消费者加入test-boot-group,触发 ConsumerCoordinator 做 rebalance,分配test_topic的分区。
五、阶段 3:消费主循环 —run()→pollAndInvoke()
5.1 无限循环
// ListenerConsumer.run() while (isRunning()) { try { pollAndInvoke(); // 每一轮 = poll + 处理 } catch (Exception e) { handleConsumerException(e); } }5.2 单轮 poll 流程
// ListenerConsumer.pollAndInvoke() idleBetweenPollIfNecessary(); // 空闲时可能 sleep pauseConsumerIfNecessary(); // 容器 pause 时跳过 this.lastPoll = System.currentTimeMillis(); ConsumerRecords<K, V> records = doPoll(); // 核心 invokeIfHaveRecords(records); // 有记录则调用监听器 // doPoll() → pollConsumer() private ConsumerRecords<K, V> pollConsumer() { beforePoll(); try { return this.consumer.poll(this.pollTimeout); // 默认 5000ms } catch (WakeupException ex) { return ConsumerRecords.empty(); // 停止容器时唤醒 } }5.3 有消息时进入监听调用
// invokeIfHaveRecords() if (records != null && records.count() > 0) { invokeListener(records); // 非 batch → invokeRecordListener } // invokeRecordListener() → doInvokeWithRecords() Iterator<ConsumerRecord<K, V>> iterator = records.iterator(); while (iterator.hasNext()) { ConsumerRecord<K, V> record = checkEarlyIntercept(iterator.next()); doInvokeRecordListener(record, iterator); }六、阶段 4:从ConsumerRecord到你的listen(String msg)
6.1 单条记录处理
// doInvokeRecordListener() invokeOnMessage(record); // invokeOnMessage() doInvokeOnMessage(record); if (!isManualImmediateAck) { ackCurrent(record); // autoCommit=true 时此处基本不手动提交 }6.2 适配器分发(ListenerType.SIMPLE)
// doInvokeOnMessage() switch (this.listenerType) { case SIMPLE: this.listener.onMessage(record); // RecordMessagingMessageListenerAdapter break; ... }6.3RecordMessagingMessageListenerAdapter→ 反射调用
// RecordMessagingMessageListenerAdapter.onMessage(record, ack, consumer) Object result = invokeHandler(record, acknowledgment, message, consumer); // MessagingMessageListenerAdapter.invokeHandler() return this.handlerMethod.invoke(message, data, acknowledgment, consumer); // data = ConsumerRecord,InvocableHandlerMethod 从 record.value() 提取 String // HandlerAdapter.invoke() return invocableHandlerMethod.invoke(message, providedArgs); // 最终调用 KafkaMsgConsumer.listen("hello")项目的listen(String msg)收到的msg,就是ConsumerRecord.value()经StringDeserializer反序列化后的结果。
七、阶段 5:KafkaConsumer.poll()内部(kafka-clients)
Spring 调用的是:
// KafkaConsumer.poll(Duration timeout) private ConsumerRecords<K, V> poll(Timer timer, boolean includeMetadataInTimeout) { do { // ① 协调器:心跳、rebalance、更新分区分配 updateAssignmentMetadataIfNeeded(timer, false); // ② 拉取数据 Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer); if (!records.isEmpty()) { // ③ 预取下一批(pipeline) if (fetcher.sendFetches() > 0) { client.transmitSends(); } // ④ 拦截器 + 返回 return interceptors.onConsume(new ConsumerRecords<>(records)); } } while (timer.notExpired()); return ConsumerRecords.empty(); // 超时无数据 }pollForFetches内部:
// 本地缓冲区有数据 → 直接返回(已反序列化) Map<...> records = fetcher.fetchedRecords(); if (!records.isEmpty()) return records; // 否则发 FetchRequest 到 Broker,NetworkClient.poll() 等待响应 fetcher.sendFetches(); client.poll(pollTimeout, ...); return fetcher.fetchedRecords();数据流:
Broker 磁盘
→ FetchResponse(字节)
→ Fetcher 解析
→ StringDeserializer.deserialize() // value 变 "hello"
→ ConsumerRecord("test_topic", partition, offset, "hello")
→ ConsumerRecords
→ Spring ListenerConsumer
→ KafkaMsgConsumer.listen("hello")
八、offset 提交(你的enable-auto-commit: true)
项目的配置下 不由 Spring 手动 commit,而是由KafkaConsumer内部协调器按auto.commit.interval.ms(默认 5s)自动提交。
// ackCurrent() 在 autoCommit=true 时 else if (... && !this.autoCommit) { this.acks.add(record); // 不满足,跳过 } // → 不进入 Spring 手动提交逻辑含义:
listen()正常返回后,offset 不会立刻提交- 通常在接下来几秒内由消费者客户端后台提交
- 若
listen()抛异常,默认错误处理下该条消息可能被重复消费(取决于提交时机)
九、精简版调用栈(对照源码用)
[启动]
SpringApplication.run()
└─ @EnableKafka → KafkaListenerAnnotationBeanPostProcessor
└─ processKafkaListener(KafkaMsgConsumer.listen)
└─ registrar.registerEndpoint(MethodKafkaListenerEndpoint)
└─ KafkaListenerEndpointRegistry.registerListenerContainer(..., autoStartup=true)
[容器启动]
ConcurrentKafkaListenerContainerFactory.createListenerContainer()
└─ ConcurrentMessageListenerContainer.doStart()
└─ KafkaMessageListenerContainer.doStart()
└─ SimpleAsyncTaskExecutor.submit(ListenerConsumer)
[消费线程初始化]
ListenerConsumer.<init>()
├─ DefaultKafkaConsumerFactory.createConsumer("test-boot-group", ...)
│ └─ new KafkaConsumer<>(configs, StringDeserializer, StringDeserializer)
└─ consumer.subscribe(["test_topic"], rebalanceListener)
[消费循环 - 独立线程]
ListenerConsumer.run()
└─ while (isRunning) pollAndInvoke()
├─ consumer.poll(Duration.ofMillis(5000)) // KafkaConsumer
│ ├─ ConsumerCoordinator.poll() // 心跳/rebalance
│ ├─ Fetcher.sendFetches() → NetworkClient // 向 Broker 发 Fetch
│ └─ Fetcher.fetchedRecords() // 反序列化 → ConsumerRecords
└─ invokeListener(records)
└─ doInvokeWithRecords()
└─ doInvokeRecordListener(record)
└─ invokeOnMessage(record)
└─ listener.onMessage(record) // RecordMessagingMessageListenerAdapter
└─ HandlerAdapter.invoke()
└─ InvocableHandlerMethod.invoke()
└─ KafkaMsgConsumer.listen("hello") // 你的业务方法
十、与生产侧的对比(同一进程)
| 维度 | 生产侧 (KafkaTemplate.send) | 消费侧 (@KafkaListener) |
|---|---|---|
触发线程 | Tomcat HTTP 线程 |
|
是否阻塞等待 |
|
|
核心 API |
|
|
你的业务入口 |
|
|
与 Broker 交互 | ProducerRequest | FetchRequest + 心跳 |
Producer 把消息写入 Broker 后,消费线程在下一轮(或当前轮)poll中拉到,listen()才会执行——两者完全异步、不同线程。
十一、几个容易忽略的细节
1. 首次poll可能为空
Rebalance 完成前,poll可能返回空集合;auto-offset-reset: earliest决定从最早还是最新 offset 开始读。
2. 同一分区内顺序消费
concurrency=1时,同一 partition 的消息在doInvokeWithRecords的 while 循环中串行调用listen()。
3.poll超时不是错误
5s 内无新消息 → 返回ConsumerRecords.empty()→ 进入 idle 检测 → 继续下一轮 poll。
4. 停止应用时
doStop()→listenerConsumer.wakeup()→poll抛WakeupException→ 返回空记录 → 循环退出。
5. 与KafkaTemplate.send的衔接点
唯一的交汇点是 Kafka Broker 上的test_topic;Spring 生产链路和消费链路在代码层没有直接调用关系。
源码对照:
- KafkaMessageListenerContainer(
ListenerConsumer.run/pollAndInvoke) - RecordMessagingMessageListenerAdapter
- KafkaConsumer.poll
