当前位置: 首页 > news >正文

Kafka自动提交把消息吃了:一次“已提交未处理”+重平衡导致丢数和爆堆积

今天下午正准备摸鱼,监控把我从工位上拽了回来:order-consumer 的 LAG 从 0 直接飙到 30w+,下游还在吼“有订单没落库”。我当场一激灵,这不是堆积就是丢消息,两个一起来真把人整破防了。


事故现场

  • 现象:
    • Kafka 消费堆积(某分区 LAG 10w+);
    • 业务对账发现少量“漏单”;
    • 应用日志里全是外部接口超时和消费者重平衡的提示。

日志随手一截:

2026-03-08 14:00:03 INFO o.a.k.c.c.i.ConsumerCoordinator - [groupId=order-consumer] Committing offsets: {order-2=OffsetAndMetadata{offset=421341}} 2026-03-08 14:00:15 ERROR c.xxx.order.OrderListener - process order 1008611 failed java.net.SocketTimeoutException: Read timed out 2026-03-08 14:01:46 WARN o.a.k.c.c.i.ConsumerCoordinator - Offset commit failed: CommitFailedException: Commit cannot be completed since the group has already rebalanced 2026-03-08 14:01:46 INFO o.a.k.c.c.i.AbstractCoordinator - [groupId=order-consumer] (Re-)joining group

命令行看一眼堆积:

$ kafka-consumer-groups.sh --bootstrap-server kafka:9092 --group order-consumer --describe TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG order 2 421341 451782 30441 order 3 523001 823001 300000

排查脑回路(踩坑实录)

  1. 先看 Kafka 集群:Broker 一切健康,生产端速率平稳,没有明显抖动。
  2. 盯消费端线程栈(jstack):一堆线程卡在对外 HTTP 调用(某三方风控),read timeout=5s,遇到抖动就排队排成蛇形。
  3. 看消费者配置,直接裂开:enable.auto.commit=true,而且处理逻辑里还有重 IO。也就是说:
    • 自动提交在后台按间隔提交“上次 poll 到的最大位移”,和处理是否成功没半毛钱关系;
    • 一旦我们处理失败或进程挂了,就可能出现“已提交未处理”,消息就这么没了(对账看到的漏单就是这么来的)。
  4. 再一个坑:处理时间波动大,某些批次超过了max.poll.interval.ms,消费者被踢出组触发 rebalance,CommitFailedException和重复消费一起出现,堆积更离谱。

问题基本锁定:

  • 丢消息:自动提交导致。
  • 堆积:长耗时处理 + poll 不及时 -> 重平衡 -> 反复拉取 -> 处理更慢,雪上加霜。

问题代码(反例)

@KafkaListener(topics = "order", groupId = "order-consumer") public void listen(String msg) { // enable.auto.commit = true(默认),每5s自动提交 OrderEvent evt = JSON.parseObject(msg, OrderEvent.class); // 重IO:外部HTTP + DB事务 riskClient.check(evt); // 偶发5s+超时 orderService.persist(evt); // 事务落库 // 这里异常了也没人处理,offset 可能已经提交 }

application.yml(反例):

spring: kafka: consumer: enable-auto-commit: true max-poll-interval-ms: 300000 # 默认5分钟 max-poll-records: 500 # 一次拉太多

修复方案(目标:至少一次 + 可控重试 + 有底线的失败兜底)

核心思路:

  • 关闭自动提交,处理成功后手动提交;
  • 控制单次处理量,避免超过max.poll.interval.ms
  • 配置重试与死信队列(DLQ);
  • 做幂等,重复消费也不怕;
  • 外部IO限时+隔离,别把消费者线程拖死。

1) 消费端改成手动提交 + 错误处理

@Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory( ConsumerFactory<String, String> cf, KafkaTemplate<String, String> kt) { ConcurrentKafkaListenerContainerFactory<String, String> f = new ConcurrentKafkaListenerContainerFactory<>(); f.setConsumerFactory(cf); f.setConcurrency(6); // 视CPU和partition数 f.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); // 失败重试 + 死信 DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(kt); DefaultErrorHandler errorHandler = new DefaultErrorHandler(recoverer, new FixedBackOff(1000L, 3L)); errorHandler.addNotRetryableExceptions(IllegalArgumentException.class); // 非重试类错误 f.setCommonErrorHandler(errorHandler); f.getContainerProperties().setPollTimeout(2000); return f; }
@KafkaListener(topics = "order", groupId = "order-consumer") public void onMessage(ConsumerRecord<String, String> record, Acknowledgment ack) { OrderEvent evt = JSON.parseObject(record.value(), OrderEvent.class); try { // 关键:外部IO加限时与熔断 riskClient.checkWithTimeout(evt, Duration.ofSeconds(2)); orderService.processWithIdempotency(evt); // 幂等处理 ack.acknowledge(); // 成功才提交 } catch (TransientNetworkException e) { // 交给DefaultErrorHandler重试,失败进DLQ throw e; } catch (Exception e) { // 不可恢复的就直接进DLQ(或记录+告警) throw new IllegalArgumentException("non-retryable", e); } }

application.yml(关键参数):

spring: kafka: consumer: enable-auto-commit: false max-poll-interval-ms: 600000 # 拉长上限,避免长批处理被踢 max-poll-records: 200 # 降低单批处理量 isolation-level: read_committed

2) 幂等处理(去重兜底)

数据库加唯一键,按业务ID去重:

ALTER TABLE t_order ADD UNIQUE KEY uk_biz (biz_id);
@Transactional public void processWithIdempotency(OrderEvent evt) { int n = orderMapper.insertIgnore(evt.getBizId(), evt.getPayload()); if (n == 0) { // 重复消息,直接返回 return; } // 实际业务处理... }

或用 Redis 版本:

boolean first = redisTemplate.opsForValue() .setIfAbsent("dedup:" + evt.getBizId(), "1", 1, TimeUnit.DAYS); if (!first) return;

3) 生产端也补上“至少一次”的保险

props.put(ProducerConfig.ACKS_CONFIG, "all"); props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); props.put(ProducerConfig.RETRIES_CONFIG, 5); props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);

并发不大时可设为1,追求最稳。

4) 外部IO隔离

  • 给风控/三方调用加超时、限流、舱壁隔离(独立线程池);
  • 兜不住就降级或旁路,别堵消费者主线程。

验证

  • 发布后 LAG 从 30w 逐步清零,消费速率拉满且稳定;
  • 再次压测制造超时,消费线程不再被踢出组,无CommitFailedException
  • 对账补跑后无新增漏单,重复消息被幂等逻辑吞掉。

踩坑总结

  • 自动提交配长流程=定时炸弹,别用;成功再手动提交才是正道。
  • max.poll.interval.ms要和单批处理时间匹配,max.poll.records别贪大。
  • 重试+DLQ+幂等是三件套,少一件迟早挨打。
http://www.jsqmd.com/news/453836/

相关文章:

  • 把 AI助手搬进飞书!OpenClaw接入完整指南
  • 2026广州GEO优化公司排名TOP5|本地实力派盘点,亚森SEO稳居榜首!
  • 周红伟:2026年OpenClaw最佳实践:一键部署+免费API配置+集成8大股票分析Skills及避坑指南
  • matlab麻雀搜索算法(SSA)优化BP神经网络,权值和阈值,一个压缩包共三个文件,包括有数...
  • 深度学习在财务报表舞弊识别中的应用:构建一个智能审计助手
  • Rokid UXR 的手势追踪虚拟中更真实的手实战开发【含 工程源码 和 最终完成APK】
  • 开发者的临时文件自动化工具:提升效率与系统整洁度的关键方案
  • 别只当它是管家,RT-Thread 会自己生长
  • 权威解读:企业合作政策如何让非科班生通过国内AI认证实现“弯道超车”?
  • 2026年房山及燕山地区装修套餐全解析:五大优质服务商深度推荐 - 品牌2026
  • openclaw gateway status报错且gate无法正常运行解决办法
  • 无数绘画测试!Nano Banana 2 vs GPT Image 1.5,谁才是最厉害的模型
  • LeetCode-35.搜索插入位置
  • 基于javaweb的作业智能推荐系统的设计与实现
  • 2026超纯水机厂家推荐:进口与国产品牌实力对比 - 品牌推荐大师
  • 光学神经网络:进展与挑战(Optical Neural Networks: Progress and Challenges)
  • 如何本地部署大模型(以PaddleOCR-VL-1.5为例)
  • 2026年房山环保家装公司怎么选?五家实力装企深度解析 - 品牌2026
  • Gemini 3.1 Flash Image Preview (Nano Banana 2) 深度技术评测与极速接入指南
  • 2026最新25万级SUV智驾领先双能源车型推荐!权威榜单发布 - 十大品牌榜
  • ESP-IDF Chip revision问题解决方案
  • 2026年 信捷电气厂家推荐排行榜:江苏/安徽/上海/苏州/常州/合肥/芜湖/南京/镇江/南通/徐州/无锡,专业自动化解决方案与技术创新实力深度解析 - 品牌企业推荐师(官方)
  • 留学申请服务费用多少,考虑性价比,澳洲、马来西亚有啥好选择 - mypinpai
  • ARM Cortex-R52 内核详解(三)——异常处理机制
  • 【hello-agent】Plan-and-Solve
  • 2026 电池放电仪、电池内阻仪厂家:技术与口碑的双重认证 - 深度智识库
  • 计算机毕业设计源码:基于Python的智能推荐电商平台 Django Vue3 Scrapy爬虫 协同过滤推荐算法 大模型 购物 多模态 deepseek agent(建议收藏)✅
  • 2026最新25万级SUV/25万级SUV智驾/25万级SUV轿跑/25万级SUV双能源车型推荐:智驾领跑,实力重塑出行标杆 - 十大品牌榜
  • 2026年广州响应式网站建设,谁才是真正的口碑之选?
  • 2026年全国ITSS咨询服务机构费用盘点,上海擎标价格合理 - mypinpai