Kafka自动提交把消息吃了:一次“已提交未处理”+重平衡导致丢数和爆堆积
今天下午正准备摸鱼,监控把我从工位上拽了回来:order-consumer 的 LAG 从 0 直接飙到 30w+,下游还在吼“有订单没落库”。我当场一激灵,这不是堆积就是丢消息,两个一起来真把人整破防了。
事故现场
- 现象:
- Kafka 消费堆积(某分区 LAG 10w+);
- 业务对账发现少量“漏单”;
- 应用日志里全是外部接口超时和消费者重平衡的提示。
日志随手一截:
2026-03-08 14:00:03 INFO o.a.k.c.c.i.ConsumerCoordinator - [groupId=order-consumer] Committing offsets: {order-2=OffsetAndMetadata{offset=421341}} 2026-03-08 14:00:15 ERROR c.xxx.order.OrderListener - process order 1008611 failed java.net.SocketTimeoutException: Read timed out 2026-03-08 14:01:46 WARN o.a.k.c.c.i.ConsumerCoordinator - Offset commit failed: CommitFailedException: Commit cannot be completed since the group has already rebalanced 2026-03-08 14:01:46 INFO o.a.k.c.c.i.AbstractCoordinator - [groupId=order-consumer] (Re-)joining group命令行看一眼堆积:
$ kafka-consumer-groups.sh --bootstrap-server kafka:9092 --group order-consumer --describe TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG order 2 421341 451782 30441 order 3 523001 823001 300000排查脑回路(踩坑实录)
- 先看 Kafka 集群:Broker 一切健康,生产端速率平稳,没有明显抖动。
- 盯消费端线程栈(jstack):一堆线程卡在对外 HTTP 调用(某三方风控),
read timeout=5s,遇到抖动就排队排成蛇形。 - 看消费者配置,直接裂开:
enable.auto.commit=true,而且处理逻辑里还有重 IO。也就是说:- 自动提交在后台按间隔提交“上次 poll 到的最大位移”,和处理是否成功没半毛钱关系;
- 一旦我们处理失败或进程挂了,就可能出现“已提交未处理”,消息就这么没了(对账看到的漏单就是这么来的)。
- 再一个坑:处理时间波动大,某些批次超过了
max.poll.interval.ms,消费者被踢出组触发 rebalance,CommitFailedException和重复消费一起出现,堆积更离谱。
问题基本锁定:
- 丢消息:自动提交导致。
- 堆积:长耗时处理 + poll 不及时 -> 重平衡 -> 反复拉取 -> 处理更慢,雪上加霜。
问题代码(反例)
@KafkaListener(topics = "order", groupId = "order-consumer") public void listen(String msg) { // enable.auto.commit = true(默认),每5s自动提交 OrderEvent evt = JSON.parseObject(msg, OrderEvent.class); // 重IO:外部HTTP + DB事务 riskClient.check(evt); // 偶发5s+超时 orderService.persist(evt); // 事务落库 // 这里异常了也没人处理,offset 可能已经提交 }application.yml(反例):
spring: kafka: consumer: enable-auto-commit: true max-poll-interval-ms: 300000 # 默认5分钟 max-poll-records: 500 # 一次拉太多修复方案(目标:至少一次 + 可控重试 + 有底线的失败兜底)
核心思路:
- 关闭自动提交,处理成功后手动提交;
- 控制单次处理量,避免超过
max.poll.interval.ms; - 配置重试与死信队列(DLQ);
- 做幂等,重复消费也不怕;
- 外部IO限时+隔离,别把消费者线程拖死。
1) 消费端改成手动提交 + 错误处理
@Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory( ConsumerFactory<String, String> cf, KafkaTemplate<String, String> kt) { ConcurrentKafkaListenerContainerFactory<String, String> f = new ConcurrentKafkaListenerContainerFactory<>(); f.setConsumerFactory(cf); f.setConcurrency(6); // 视CPU和partition数 f.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); // 失败重试 + 死信 DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(kt); DefaultErrorHandler errorHandler = new DefaultErrorHandler(recoverer, new FixedBackOff(1000L, 3L)); errorHandler.addNotRetryableExceptions(IllegalArgumentException.class); // 非重试类错误 f.setCommonErrorHandler(errorHandler); f.getContainerProperties().setPollTimeout(2000); return f; }@KafkaListener(topics = "order", groupId = "order-consumer") public void onMessage(ConsumerRecord<String, String> record, Acknowledgment ack) { OrderEvent evt = JSON.parseObject(record.value(), OrderEvent.class); try { // 关键:外部IO加限时与熔断 riskClient.checkWithTimeout(evt, Duration.ofSeconds(2)); orderService.processWithIdempotency(evt); // 幂等处理 ack.acknowledge(); // 成功才提交 } catch (TransientNetworkException e) { // 交给DefaultErrorHandler重试,失败进DLQ throw e; } catch (Exception e) { // 不可恢复的就直接进DLQ(或记录+告警) throw new IllegalArgumentException("non-retryable", e); } }application.yml(关键参数):
spring: kafka: consumer: enable-auto-commit: false max-poll-interval-ms: 600000 # 拉长上限,避免长批处理被踢 max-poll-records: 200 # 降低单批处理量 isolation-level: read_committed2) 幂等处理(去重兜底)
数据库加唯一键,按业务ID去重:
ALTER TABLE t_order ADD UNIQUE KEY uk_biz (biz_id);@Transactional public void processWithIdempotency(OrderEvent evt) { int n = orderMapper.insertIgnore(evt.getBizId(), evt.getPayload()); if (n == 0) { // 重复消息,直接返回 return; } // 实际业务处理... }或用 Redis 版本:
boolean first = redisTemplate.opsForValue() .setIfAbsent("dedup:" + evt.getBizId(), "1", 1, TimeUnit.DAYS); if (!first) return;3) 生产端也补上“至少一次”的保险
props.put(ProducerConfig.ACKS_CONFIG, "all"); props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); props.put(ProducerConfig.RETRIES_CONFIG, 5); props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);并发不大时可设为1,追求最稳。
4) 外部IO隔离
- 给风控/三方调用加超时、限流、舱壁隔离(独立线程池);
- 兜不住就降级或旁路,别堵消费者主线程。
验证
- 发布后 LAG 从 30w 逐步清零,消费速率拉满且稳定;
- 再次压测制造超时,消费线程不再被踢出组,无
CommitFailedException; - 对账补跑后无新增漏单,重复消息被幂等逻辑吞掉。
踩坑总结
- 自动提交配长流程=定时炸弹,别用;成功再手动提交才是正道。
max.poll.interval.ms要和单批处理时间匹配,max.poll.records别贪大。- 重试+DLQ+幂等是三件套,少一件迟早挨打。
