当前位置: 首页 > news >正文

【Kafka源码解读和使用指南】第16篇:RecordAccumulator源码深度解析——Kafka生产者的“消息缓冲区“秘密

上一篇【第15篇】Kafka集群元数据源码解析——生产者如何"认识"整个集群
下一篇【第17篇】MemoryRecords与RecordBatch源码解析——消息是怎么被"打包"的


摘要

KafkaProducer的send()方法是异步的——调用后消息并没有立即发送,而是被暂存到一个叫RecordAccumulator的缓冲区中。这个缓冲区是Kafka高吞吐量的核心秘密之一:它把多条零散的消息攒成一个Big Batch,然后由Sender线程一次性批量发送。这个设计像极了快递公司的"集包发货"——单个包裹单独寄很贵,但攒满一车再发就便宜了。本文将深入剖析RecordAccumulator的整体设计、ProducerBatch批次构建逻辑、linger.ms和batch.size这两个参数如何配合、消息追加(append)的完整流程,以及drain(排水)机制的饥饿预防策略。


一、RecordAccumulator的整体设计

1.1 一句话定位

RecordAccumulator =主线程和Sender线程之间的消息缓冲区,负责攒消息、形成批次、等待发送。

1.2 数据结构全景

【RecordAccumulator 核心数据结构】 RecordAccumulator │ ├── batches: ConcurrentMap<TopicPartition, Deque<RecordBatch>> │ │ │ │ "orders-0" ──► [Batch#1] ──► [Batch#2] ──► [Batch#3] │ │ ▲ │ │ └── 正在接收消息的"活跃"Batch(队尾) │ │ │ │ "orders-1" ──► [Batch#4] ──► [Batch#5] │ │ │ │ "orders-2" ──► [Batch#6] │ │ │ │ 外层ConcurrentMap(线程安全)+ 内层ArrayDeque(非线程安全,需synchronized保护) │ │ │ ├── batchSize: 每个RecordBatch底层ByteBuffer的默认大小(默认16KB) │ ├── compression: 压缩类型(none/gzip/snappy/lz4) │ ├── incomplete: Set<RecordBatch> → 未发送完成的批次集合 │ ├── free: BufferPool → 内存池,复用ByteBuffer │ ├── drainIndex: int → 防止饥饿的发送索引起点 │ └── appendsInProgress: AtomicInteger → 正在执行append操作的线程数

1.3 核心依赖关系

┌──────────────────────────────────────────────────────────────┐ │ RecordAccumulator │ │ │ │ ┌─────────────┐ ┌──────────────┐ ┌──────────────────┐ │ │ │ BufferPool │ │ RecordBatch │ │ MemoryRecords │ │ │ │ (内存池) │ │ (批次容器) │ │ (消息数据存储) │ │ │ │ │ │ │ │ │ │ │ │ - 复用ByteBuf │ │ - 批次管理 │ │ - ByteBuffer封装 │ │ │ │ - 避免频繁GC │ │ - 回调管理 │ │ - 压缩支持 │ │ │ └──────┬───────┘ └──────┬───────┘ └────────┬─────────┘ │ │ │ │ │ │ │ └─────────────────┴───────────────────┘ │ │ 三者协同工作 │ └──────────────────────────────────────────────────────────────┘

二、消息追加(append)完整流程

2.1 流程图

【RecordAccumulator.append() 完整流程】 KafkaProducer.send() 调用 │ ▼ ┌─────────────────────────────────────────────┐ │ append(tp, timestamp, key, value, callback) │ └──────────────────┬──────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────┐ │ 步骤①: getOrCreateDeque(tp) │ │ 在 batches 中查找 TopicPartition 对应的队列 │ │ 不存在则创建 ArrayDeque<RecordBatch> │ └──────────────────┬──────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────┐ │ 步骤②: synchronized(dq) { tryAppend() } │ │ 尝试向队尾最后一个RecordBatch追加消息 │ └──────────────────┬──────────────────────────┘ │ ┌────────┴────────┐ │ 追加成功? │ └────────┬────────┘ 是 │ │ 否 ▼ ▼ ┌──────────┐ ┌──────────────────────────┐ │ 直接返回 │ │ 步骤③: BufferPool.allocate() │ │ FutureRec │ │ 申请新的ByteBuffer │ └──────────┘ └──────────┬─────────────────┘ │ ▼ ┌──────────────────────────┐ │ 步骤④: synchronized(dq) │ │ 二次加锁 + 重试 tryAppend() │ └──────────┬───────────────┘ │ ┌────────┴────────┐ │ 追加成功? │ └────────┬────────┘ 是 │ │ 否 ▼ ▼ ┌──────────┐ ┌──────────────────────┐ │释放刚申请 │ │ 步骤⑤: 创建新的 │ │的Buffer │ │ RecordBatch + │ │返回结果 │ │ MemoryRecords │ └──────────┘ └──────────┬───────────┘ │ ▼ ┌──────────────────────┐ │ 步骤⑥: 将消息追加到 │ │ 新RecordBatch中 │ │ 将新Batch加入Deque队尾 │ │ 将新Batch加入incomplete │ └──────────┬───────────┘ │ ▼ ┌──────────────────────┐ │ 步骤⑦: 返回 │ │ RecordAppendResult │ │ (唤醒Sender的条件) │ └──────────────────────┘

2.2 为什么要"二次加锁+重试"?

这是RecordAccumulator最精妙的设计之一。关键在于:申请新Buffer可能会阻塞

// append() 方法的核心逻辑synchronized(dq){// 第一次尝试:直接往最后一个Batch里塞RecordAppendResultresult=tryAppend(timestamp,key,value,callback,dq);if(result!=null)returnresult;// 成功,直接返回}// ← synchronized块结束了,锁已释放!// 需要新Buffer,去BufferPool申请(这里可能阻塞!)ByteBufferbuffer=free.allocate(size,maxTimeToBlock);synchronized(dq){// 第二次加锁后重试一次——因为在你等Buffer的时候,// 其他线程可能已经往同一个Deque里加了新BatchRecordAppendResultresult=tryAppend(timestamp,key,value,callback,dq);if(result!=null){free.deallocate(buffer);// 用不上了,释放刚申请的Bufferreturnresult;}// 还是不行,那就用刚申请的Buffer创建全新RecordBatchMemoryRecordsrecords=MemoryRecords.emptyRecords(buffer,compression,batchSize);RecordBatchbatch=newRecordBatch(tp,records,time.milliseconds());FutureRecordMetadatafuture=batch.tryAppend(timestamp,key,value,callback,now);dq.addLast(batch);incomplete.add(batch);returnnewRecordAppendResult(future,dq.size()>1||batch.records.isFull(),true);}

如果整个流程都在一个synchronized块里,线程在等待BufferPool时持有Deque锁,其他线程就无法往同一个Deque追加消息了——这会造成不必要的阻塞。分成两个synchronized块,只在必要时持有锁,这是典型的"减小锁粒度"优化。

2.3 二次加锁重试还解决了一个问题:内部碎片

【内部碎片问题】 线程A(需要新Batch) 线程B(需要新Batch) │ │ ├─ 获取Deque锁 │ ├─ tryAppend() 失败 │ ├─ 释放Deque锁 │ ├─ BufferPool.allocate() ├─ 获取Deque锁 │ (等待中...) ├─ tryAppend() 失败 │ ├─ 释放Deque锁 │ ├─ BufferPool.allocate() ├─ 获取Deque锁 │ (等待中...) ├─ 二次tryAppend() 失败 │ ├─ 创建Batch(4)加入队尾 │ ├─ 释放Deque锁 │ │ ├─ 获取Deque锁 │ ├─ 二次tryAppend() 成功!← 赶上了线程A创建的Batch │ ├─ 释放Deque锁 │ │ │ 如果没有二次重试 → 线程B也会创建Batch(5) │ → Batch(4)收到一条消息后不再使用 → 内部碎片

三、linger.ms与batch.size——批量发送的"油门和刹车"

3.1 参数含义

参数默认值含义效果
batch.size16384 (16KB)每个Batch的ByteBuffer大小控制单批次最大数据量
linger.ms0Batch等待更多消息的最长时间人为制造延迟换取更大批量
buffer.memory32MBRecordAccumulator可用总内存防止内存溢出

3.2 两者如何配合决定发送时机

【linger.ms 和 batch.size 配合关系】 linger.ms = 0 (默认): ┌───── Batch满了(batch.size) ──► 立即发送 │ └───── Batch没满 ──► 也立即发送(不等待) linger.ms = 10 (设置等待): ┌───── Batch满了 ──► 立即发送 │ └───── Batch没满 ──► 等待最多10ms ├── 10ms内来新消息 → 追加 → 继续等待 └── 10ms超时 → 发送(即使没满) 效果对比: linger.ms=0: 延迟低 (接近0ms) 但可能发送很多小Batch linger.ms=10: 延迟略高 (~10ms) 但Batch更大 → 吞吐量更高

3.3 ready()方法的五个发送条件

publicReadyCheckResultready(Clustercluster,longnowMs){Set<Node>readyNodes=newHashSet<>();booleanunknownLeadersExist=false;booleanexhausted=this.free.queued()>0;// 条件3for(Map.Entry<TopicPartition,Deque<RecordBatch>>entry:batches.entrySet()){Deque<RecordBatch>deque=entry.getValue();synchronized(deque){RecordBatchbatch=deque.peekFirst();if(batch!=null){// 条件1: Batch满了 或 队列中有多个Batchbooleanfull=deque.size()>1||batch.records.isFull();// 条件2: 超过linger.ms设置的等待时间booleanexpired=waitedTimeMs>=timeToWaitMs;// 综合判断:5个条件满足其一即可booleansendable=full// 条件1:批次满了||expired// 条件2:等待超时||exhausted// 条件3:BufferPool空间耗尽(有线程在等)||flushInProgress()// 条件4:flush()被调用||closed;// 条件5:Producer正在关闭if(sendable&&!backingOff)readyNodes.add(leader);}}}returnnewReadyCheckResult(readyNodes,nextReadyCheckDelayMs,unknownLeadersExist);}

四、drain(排水)机制——如何批量取出消息

4.1 drain的作用

drain方法将RecordAccumulator中的数据"排"出来,供Sender线程构造ProduceRequest:

【drain转换映射】 输入: TopicPartition → RecordBatch 输出: NodeId → List<RecordBatch> batches: drain结果: "orders-0" → [B1, B2] Node#1 → [B1(from P0), B4(from P1)] "orders-1" → [B4] ──drain()──► "orders-2" → [B6, B7] Node#2 → [B6(from P2)] 转换逻辑: 通过Cluster元数据找到每个TopicPartition的Leader所在Node, 将Batch按照Node重新分组

4.2 drainIndex——防止饥饿的轮转索引起点

publicMap<Integer,List<RecordBatch>>drain(Clustercluster,Set<Node>nodes,intmaxSize,longnow){Map<Integer,List<RecordBatch>>batches=newHashMap<>();for(Nodenode:nodes){intsize=0;List<PartitionInfo>parts=cluster.partitionsForNode(node.id());List<RecordBatch>ready=newArrayList<>();// 关键:drainIndex 不是从0开始,而是从上一次结束的位置开始intstart=drainIndex=drainIndex%parts.size();do{PartitionInfopart=parts.get(drainIndex);Deque<RecordBatch>deque=getDeque(newTopicPartition(part.topic(),part.partition()));if(deque!=null){synchronized(deque){RecordBatchfirst=deque.peekFirst();if(first!=null){if(size+first.records.sizeInBytes()>maxSize&&!ready.isEmpty()){break;// 超出单次请求大小,本轮停止}RecordBatchbatch=deque.pollFirst();// 取出来batch.records.close();// 关闭写入,设为只读size+=batch.records.sizeInBytes();ready.add(batch);}}}// 索引递增,下次从下一位开始this.drainIndex=(this.drainIndex+1)%parts.size();}while(start!=drainIndex);// 遍历一圈后停止batches.put(node.id(),ready);}returnbatches;}

drainIndex的精妙之处:

【不用drainIndex(无轮转)vs 用drainIndex(有轮转)】 无轮转(始终从索引0开始): Node#1上有分区: P0, P1, P2, P3 每次drain: P0→P1→P2→P3(P0总是最先被处理) → P3可能永远抢不过P0 → P3所在业务"饥饿" 有轮转(drainIndex记录上次位置): 第1次drain: P0→P1→P2→P3 (drainIndex停在P0) 第2次drain: P1→P2→P3→P0 (drainIndex停在P1) 第3次drain: P2→P3→P0→P1 → 所有分区轮转公平 ✅

每次drain只从每个分区取一个RecordBatch:

  • 防止某个分区的大量消息独占带宽
  • 保证每个分区都有发送机会
  • 这是一种"小批量、快周转"的设计哲学

本篇小结

RecordAccumulator是KafkaProducer消息发送的"蓄水池",它的核心设计哲学可以总结为:

  • 攒小为大:通过批量发送减少网络往返次数,让吞吐量飞跃。batch.size控制单批大小,linger.ms控制等待时间
  • 减小锁粒度:append()方法的两个synchronized块分离,避免等待BufferPool时持有锁,这是高并发优化的经典技巧
  • 防止饥饿:drainIndex的轮转机制保证每个分区公平获得发送机会
  • 二次重试:申请新Buffer后再试一次append,减少内部碎片

消息进入RecordAccumulator后,最终存储在MemoryRecords中。下一篇,我们就扒开MemoryRecords和RecordBatch的内核,看看消息是怎么被"打包"存储的。


上一篇【第15篇】Kafka集群元数据源码解析——生产者如何"认识"整个集群
下一篇【第17篇】MemoryRecords与RecordBatch源码解析——消息是怎么被"打包"的


http://www.jsqmd.com/news/974010/

相关文章:

  • 从HAL库回看标准库:STM32F103的TIM1高级定时器,用标准库配置PWM互补输出更清晰吗?
  • 大模型系统提示词设计原理与安全实践指南
  • 高级应用:使用nli-distilroberta-base-v2进行文本聚类与相似度计算
  • 京东e卡回收怎么避坑,教你妥善处置闲置京东e卡 - 京顺回收
  • 生物信息学入门:让湿实验老手快速掌握RNA-seq分析
  • 如何用GetQzonehistory永久保存QQ空间记忆:免费开源备份工具完整指南
  • 2026深圳市权威认证贵金属回收 TOP5+黄金回收白银回收铂金回收门店地址电话推荐
  • 承重沙发脚生产厂商选哪家好 - 品牌推广大师
  • 从台湾到泰州:4000平米厂房背后的坚守,钰腾如何用笨功夫死磕品质?
  • 入行网安多年薪资不见涨?先看全等级薪资参考,再学高效逆袭策略
  • 2026甘肃国际旅行社排名:专业靠谱推荐榜前三名 - 资讯快报
  • 告别盲猜!手把手教你用CANoe和ISO15031标准,精准读取车辆VIN码和校准ID($09服务实战)
  • WinForms窗体缩放时控件自动等比适配的轻量封装类(含可运行示例)
  • 第七史诗自动化脚本终极指南:5分钟实现24小时游戏资源获取
  • 2026年6月劳力士全国官方售后网点最新名录|完整地址与服务热线权威指南 - 劳力士中国服务中心
  • 嵌入式开发必看:Ping-Pong、差分、压缩…实战中如何为你的MCU选择最‘香’的OTA升级方案?
  • Tadi 实验室:Splash 颜色格式助力颜色挑选,简单实现与多样应用
  • M1 Mac内存效率解析:8GB为何够用?统一内存架构与软硬件协同是关键
  • 广州增城祖传老黄金回收攻略|无钢印、无票据变现估价避坑指南 - 行行星
  • 避坑指南:Logisim运算器(Arithmetic)级联时,那些容易搞错的进位/借位连接
  • 百度网盘直链解析:5分钟突破限速的终极解决方案
  • 2026年国内中高端求职猎头服务专业度排行 实测维度对比 - 速递信息
  • 别再乱抛RuntimeException了!手把手教你设计一个实用的Java业务异常类(附完整代码)
  • 短信营销系统哪个靠谱?热门群发短信厂商推荐对比评测 - Qqinqin
  • 传统面膜敷越久补水越好,编写程序根据肤质,敷膜时长,计算皮肤水合度,预警过度敷膜损伤。
  • 3分钟快速上手:免费音乐歌词批量下载器完整指南
  • 如何用FlauBERT_small_cased快速实现法语文本特征提取?完整教程
  • 如何让老款Mac焕发新生:OpenCore Legacy Patcher完整使用指南
  • 数据即货币:个人与企业数据资产防护实战指南
  • Win10下用PHPStudy快速搭建PHP5.6.40环境,告别手动配置Apache的烦恼