当前位置: 首页 > news >正文

Kafka :存储、复制与可靠性

本章目标

本章从底层解释 Kafka 为什么吞吐高、为什么能容错,以及什么配置会影响丢消息和重复消息。

Kafka 日志存储模型

Kafka 的 partition 本质是追加日志。每个 partition 在磁盘上对应一个目录,目录中有多个日志段文件。

典型文件:

00000000000000000000.log 00000000000000000000.index 00000000000000000000.timeindex 00000000000001000000.log 00000000000001000000.index 00000000000001000000.timeindex
文件作用
.log保存真实消息内容
.indexoffset 到物理位置的稀疏索引
.timeindex时间戳到 offset 的索引

Kafka 高吞吐的核心原因:

  • 顺序追加写磁盘,避免随机写。
  • 充分利用 OS page cache。
  • 批量发送和批量落盘。
  • sendfile / zero-copy 降低内核态和用户态拷贝。
  • partition 并行分布在多个 broker。

Retention 与 Compaction

Kafka 删除数据不是因为消费者消费了,而是因为保留策略。

常见配置:

retention.ms=604800000 retention.bytes=107374182400 segment.bytes=1073741824 segment.ms=604800000

含义:

  • retention.ms:保留多久。
  • retention.bytes:最多保留多大。
  • segment.bytes:单个日志段大小。
  • segment.ms:日志段滚动时间。

Delete 策略

默认策略,超过时间或大小后删除旧日志段。

适合:

  • 行为日志。
  • 订单事件流水。
  • 操作日志。

Compact 策略

按 key 保留最新值,旧值会被压缩清理。

适合:

  • 用户最新状态。
  • 配置变更。
  • 数据库 CDC 的最新快照。

配置示例:

kafka-configs --bootstrap-server localhost:9092\--entity-type topics\--entity-name user-profile-snapshot\--alter\--add-configcleanup.policy=compact

副本复制

每个 partition 可以有多个 replica。一个 replica 是 leader,其余是 follower。

Producer 和 Consumer 默认只与 leader 交互,follower 从 leader 拉取数据。

关键概念:

概念说明
Leader当前处理读写请求的副本
Follower从 leader 复制数据的副本
ARAssigned Replicas,所有分配副本
ISRIn-Sync Replicas,与 leader 保持同步的副本
OSROut-of-Sync Replicas,落后太多的副本
LEOLog End Offset,日志末尾位置
HWHigh Watermark,消费者可见的最高已提交位置

ISR 与 HW

Kafka 不会把 leader 刚写入但尚未被足够副本确认的消息立即暴露为“稳定数据”。HW 表示已经被 ISR 副本确认的安全位置,消费者只能读到 HW 之前的消息。

这解决的问题:

  • leader 写入一条消息后立刻宕机。
  • follower 没来得及复制。
  • 新 leader 不包含那条消息。
  • 如果消费者之前已经读到那条消息,就会出现“读到的数据后来消失”。

Kafka 通过 HW 避免消费者读到未提交数据。

Producer 可靠性配置

可靠性从 producer 开始:

acks=all enable.idempotence=true retries=2147483647 max.in.flight.requests.per.connection=5 delivery.timeout.ms=120000 request.timeout.ms=30000

acks=0

Producer 发出去就认为成功,不等待 broker。吞吐高,但可能丢消息。

适合:可丢弃日志、埋点采样。

acks=1

Leader 写入成功就返回。leader 宕机且 follower 未同步时可能丢消息。

适合:一般日志,但不适合核心交易。

acks=all

Leader 等 ISR 中副本确认后返回。配合min.insync.replicas可以显著降低丢消息风险。

生产建议:

replication.factor=3 min.insync.replicas=2 acks=all unclean.leader.election.enable=false

含义:3 副本中至少 2 个同步副本确认,才认为写入成功;不同步副本不能被选为 leader。

幂等生产者

幂等生产者解决“发送成功但响应丢失,producer 重试导致重复写入”的问题。

开启:

enable.idempotence=true

Kafka 为 producer 分配 PID,并为每个 partition 维护 sequence number。broker 发现同一 PID、同一 partition 上重复 sequence,会去重。

边界:

  • 幂等只保证单 producer session 内、单 partition 上的写入不重复。
  • producer 重启后 PID 变化,业务层仍建议有eventId做幂等。

Kafka 事务

Kafka 事务解决“多条消息要么一起对消费者可见,要么一起不可见”的问题。

配置:

transactional.id=order-tx-producer-1 enable.idempotence=true

事务流程:

beginTransaction send topic A send topic B sendOffsetsToTransaction commitTransaction

消费者如果只想读已提交事务数据:

isolation.level=read_committed

适用场景:

  • 从一个 topic 消费,处理后写入另一个 topic,同时提交消费 offset。
  • Kafka Streams exactly-once 处理。

不适用场景:

  • 直接保证数据库和 Kafka 的强一致事务。数据库不参与 Kafka 事务。
  • 跨外部 HTTP 服务的全局事务。

数据库 + Kafka 更常用的是 Outbox 模式:

业务事务写订单表 + outbox_event 表 -> 后台任务/CDC 发送 Kafka -> 标记已发送

Consumer 可靠性

Consumer 可靠性重点不是 Kafka 能否保存消息,而是 offset 提交时机。

错误做法:

poll -> commit offset -> 业务处理

处理失败后消息不会再被消费。

推荐做法:

poll -> 业务处理成功 -> commit offset

如果业务处理成功但提交 offset 失败,消息可能重复消费。因此消费者业务必须支持幂等。

端到端语义

语义条件说明
At most once先提交 offset 后处理可能丢,不重复
At least once处理成功后提交 offset不易丢,可能重复
Exactly onceKafka 事务 + 幂等 + read_committed只覆盖 Kafka 内链路

在业务系统中,最常见、最务实的是:

Kafka 至少一次投递 + 消费端业务幂等

实操:可靠性配置检查

查看 topic 配置:

dockercomposeexeckafka kafka-configs\--bootstrap-server localhost:9092\--entity-type topics\--entity-name order-events\--describe

创建 3 副本 topic 的生产命令在单 broker demo 中不可用,但生产环境应类似:

kafka-topics --bootstrap-server kafka-1:9092\--create\--topicorder-events\--partitions12\--replication-factor3\--configmin.insync.replicas=2

检查 ISR:

kafka-topics --bootstrap-server kafka-1:9092\--describe\--topicorder-events

重点看:

Leader: 1 Replicas: 1,2,3 Isr: 1,2,3

如果 ISR 长期少于副本数,说明 follower 落后或 broker 异常,需要排查网络、磁盘、GC、负载。

04 性能调优、压测与容量规划

本章目标

Kafka 调优不是记一堆参数,而是围绕目标吞吐、延迟、可靠性和成本做取舍。本章给出可落地的调优路线:

  • Producer 批量、压缩、并发。
  • Broker 磁盘、网络、线程、页缓存。
  • Consumer 拉取、批处理、并发和背压。
  • Topic 分区和容量规划。
  • 压测方法与指标解释。

性能问题先分类

表现可能原因优先排查
Producer 发送慢批次太小、网络慢、broker 写入慢producer metrics、request latency
Consumer 堆积消费逻辑慢、分区太少、下游慢lag、处理耗时、线程池
Broker CPU 高压缩消耗、请求太多、TLS/SASLCPU、请求队列、网络线程
Broker 磁盘忙顺序写压力大、page cache 不足iostat、log flush、磁盘延迟
Rebalance 频繁消费者心跳超时、实例波动consumer group logs
某分区热点key 分布不均partition bytes in/out

Producer 调优

批量发送

Producer 不是每条消息都立刻发送一个网络请求,而是先进入本地缓冲区,按 topic-partition 聚合成批次。

关键配置:

linger.ms=10 batch.size=65536 buffer.memory=67108864 compression.type=lz4

调优思路:

  • 延迟敏感:linger.ms小一些,例如 0-5ms。
  • 吞吐优先:linger.ms适当增大,例如 10-50ms。
  • 消息较小:提高batch.size更容易合批。
  • 网络或磁盘压力大:开启lz4zstd压缩。

可靠性与吞吐取舍

配置吞吐可靠性说明
acks=0不等确认
acks=1中高leader 写入即成功
acks=all等 ISR 确认
compression.type=noneCPU 低不直接影响网络和磁盘压力高
compression.type=lz4常见较优不直接影响综合性能好

Broker 调优

磁盘

Kafka 强依赖磁盘顺序 IO。生产建议:

  • 使用 SSD 或高性能云盘。
  • 日志目录分散到多块盘。
  • 保留足够 page cache。
  • 不要把 broker 和重 IO 服务混部。
  • 监控磁盘使用率、IO wait、读写延迟。

关键配置:

log.dirs=/data/kafka-logs-1,/data/kafka-logs-2 log.segment.bytes=1073741824 log.retention.hours=168 num.recovery.threads.per.data.dir=2

网络线程和 IO 线程

num.network.threads=8 num.io.threads=16 queued.max.requests=500 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400

调优原则:

  • 请求队列积压说明 broker 处理不过来。
  • 网络线程不足时 request queue 会升高。
  • IO 线程不足时磁盘相关处理延迟升高。
  • 不要盲目调大线程,先看 CPU 是否还有余量。

Consumer 调优

批处理

max.poll.records=500 fetch.min.bytes=1048576 fetch.max.wait.ms=500 max.partition.fetch.bytes=1048576

如果消费逻辑支持批量写库,应该尽量批处理:

poll 500 条 -> 批量校验 -> 批量写入数据库 -> 提交 offset

比每条消息一次数据库写入更稳定。

背压

当下游数据库或 HTTP 服务变慢时,消费者继续高速拉取会导致内存和线程池堆积。

策略:

  • 降低max.poll.records
  • 暂停对应 partition:consumer.pause(partitions)
  • 下游恢复后再resume
  • 对非核心业务使用限流和降级。
  • 对核心业务保留堆积容量和告警阈值。

容量规划

输入指标

容量规划至少需要这些数字:

指标示例用途
峰值 TPS30,000 msg/s估算请求量
平均消息大小1 KB估算带宽和磁盘
保留时间7 天估算存储
副本数3存储乘数
压缩比0.5修正存储和网络
目标峰值利用率60%保留冗余

存储估算

公式:

每日原始数据量 = TPS * 消息大小 * 86400 实际存储 = 每日原始数据量 * 保留天数 * 副本数 * 压缩比 / 磁盘目标利用率

示例:

TPS = 30000 消息大小 = 1KB 保留 = 7天 副本 = 3 压缩比 = 0.5 磁盘目标利用率 = 0.7 每日原始数据 = 30000 * 1KB * 86400 = 2471 GB 实际存储 = 2471 * 7 * 3 * 0.5 / 0.7 = 37065 GB

大约需要 36 TB 可用磁盘容量。

Partition 估算

如果单 partition 写入能力按 10 MB/s,峰值写入约:

30000 msg/s * 1KB = 30 MB/s

写入角度至少 3 个 partition。但消费角度如果需要 24 个消费者并行处理,则 topic 至少要 24 个 partition。

建议:

partition = max(写入吞吐所需, 消费并行度所需) * 未来增长系数

压测工具

Kafka 自带压测脚本:

Producer 压测:

kafka-producer-perf-test\--topicperf-test\--num-records1000000\--record-size1024\--throughput-1\--producer-propsbootstrap.servers=localhost:9092acks=allcompression.type=lz4

Consumer 压测:

kafka-consumer-perf-test\--bootstrap-server localhost:9092\--topicperf-test\--messages1000000\--groupperf-test-group

看结果时重点关注:

  • records/sec
  • MB/sec
  • avg latency
  • p95/p99 latency
  • producer error rate
  • consumer lag

热点分区治理

热点分区常见原因:

  • key 分布不均,例如大量消息 key 都是system
  • 某个大客户、热门商品、热门直播间流量过高。
  • 分区数量不足。

治理方法:

方法示例代价
key 打散userId + randomBucket牺牲严格顺序
大客户单独 topicvip-order-eventstopic 增多
增加 partition12 -> 24key 映射变化
分业务拆 topic订单、支付、履约分离架构调整

如果必须保证单订单顺序,可以按orderId分区;如果只需要用户级聚合,可以按userId分区;如果更关注吞吐,可以使用更细粒度散列 key。

http://www.jsqmd.com/news/742903/

相关文章:

  • 不止是浮起来:用UE5 Water插件和蓝图,给你的小船加上真实物理驾驶与动态尾浪
  • ODesign:多模态分子设计与生成世界模型解析
  • AI开发环境一键部署:基于Docker的本地化AI工作空间解决方案
  • C#网络编程避坑指南:从Socket到TcpClient,我踩过的那些异步和资源释放的坑
  • Nemotron-Cascade:强化学习驱动的模型级联推理框架
  • 别再手动备份了!用StableBit DrivePool给Windows做个“云盘级”本地存储池(附详细配置)
  • Kafka Streams、Connect 与生态
  • Cocos Creator 3.x 项目上架前必做:一键生成五种尺寸图标并替换APP图标的懒人教程
  • 低轨卫星C语言星载软件功耗优化实战手册(NASA/JAXA/北斗在轨验证版)
  • 终极指南:使用TegraRcmGUI图形化工具实现Windows平台Switch破解注入
  • SD-PPP技术架构深度解析:Photoshop与AI工作流集成方案
  • 街头巷尾的绝味面饼大盘点,硬菜、软糯、酥香,满满都是情怀
  • ARM Fast Models跟踪组件在Cortex-M85调试中的应用
  • Vim插件sideways.vim:高效重构代码列表项的智能工具
  • 坑啊浪费我时间!!!!!基于真实工程对比的 AI 辅助三维建模能力边界与落地方案
  • Altech DO-1 Modbus监控器:工业物联网数据采集解决方案
  • 逆向实战:我是如何一步步解开美团外卖App的mtgsig3.0签名(附关键代码片段)
  • GD32H759I-EVAL开发板TLI驱动LCD避坑指南:从GPIO配置到图层叠加的实战经验
  • Performance-Fish:让RimWorld后期卡顿彻底消失的性能优化模组
  • 自动驾驶实时导航:BEV与Ego-Video双模态融合技术解析
  • Arm CI-700互联架构的时钟与电源管理机制解析
  • 非线性干涉仪色散效应与量子OCT补偿技术
  • 【农业物联网驱动代码安全红线】:IEEE 11073-20601合规性检查清单+6类未定义行为(UB)在土壤pH传感器驱动中的真实案例
  • 写接口,不写实现:LangChain4j 的 @AiService 到底有多优雅?
  • YOLO11性能暴增:主干网络升级 | 替换为PoolFormer主干,用最简单的池化操作替代自注意力,化繁为简的艺术
  • LMOps:构建大语言模型应用开发的工业化流水线
  • 如何用Boss直聘批量投递工具实现高效求职?日均50+投递的智能方案
  • 机器学习模型表格数据检索:方法与评估框架
  • 2026成都靠谱市场调查报告公司:专业的市场调查公司推荐/专业的市场调研公司推荐/专业的市场调研机构推荐/四川做市场调研的公司推荐/选择指南 - 优质品牌商家
  • AI代码生成质量守卫:eslint-plugin-ai-guard实战指南