当前位置: 首页 > news >正文

Flink Traces 用 Span 把“到底慢在哪”讲清楚

1. Flink Traces 的核心模型:Trace = 一棵 Span 树

  • Span表示一次发生在某段时间内的过程(有开始、结束、属性)。
  • Flink 当前支持的 trace 结构是:一棵树(tree of spans)
  • 重要限制:所有子 span 必须在一次addSpan调用里“一次性”上报,不能先报 parent,过一会儿再补 child,也不能单独上报 child/parent。

这意味着:如果你要记录一个过程的多级细分(父 span + 多个子 span),要在内存里构造好整棵树后再提交。

2. 在用户函数里上报 Trace:从 RuntimeContext 拿到 MetricGroup

Flink 把 tracing 的入口放在 MetricGroup 上:

  • 你需要在继承RichFunction的用户函数中,通过:
    getRuntimeContext().getMetricGroup()
    拿到MetricGroup,然后调用:
    MetricGroup#addSpan(SpanBuilder)

2.1 Java 示例:上报一个父 Span + 多个子 Span

下面的写法体现了两个关键点:
1)父 span 可以设置 start/end(可选)
2)子 span 必须在同一次 addSpan 中一起提交(包括多级 children)

publicclassMyClass{privateMetricGroupmetricGroup;voiddoSomething(){longstartTs=System.currentTimeMillis();// ... 业务逻辑longendTs=System.currentTimeMillis();metricGroup.addSpan(Span.builder(MyClass.class,"SomeAction").setStartTsMillis(startTs)// Optional.setEndTsMillis(endTs)// Optional.setAttribute("foo","bar")// Optional.addChild(Span.builder(MyClass.class,"ChildAction")// Optional).addChildren(List.of(Span.builder(MyClass.class,"AnotherChildAction"))));}}

你可以把它理解成:“一次 addSpan = 提交一棵 span 树”

2.2 Python(PyFlink)怎么写?

你提供的内容里 Python 部分还没展开具体 API。思路上是一样的:

  • 在 RichFunction 生命周期里拿到 runtime context
  • 从 metric group 构造 span builder
  • 一次性提交包含 children 的 span 树

如果你用的是 PyFlink,建议你优先确认你当前 Flink 版本对应的 PyFlink tracing API 是否已对齐(有些能力可能先在 Java 侧完善)。

3. TraceReporter:把 trace 发到外部系统

Span 上报之后,最终会交给TraceReporter输出到外部系统(比如你现有的可观测平台)。

你给的原文里提示:TraceReporter 的配置需要参考 “trace reporters documentation”。这里我不硬编配置项(不同版本/发行版可能不同),但给你一个落地建议:

  • 先把 TraceReporter 打到日志/本地(最容易验证是否“有数据”)
  • 再接入链路系统(如 OpenTelemetry 体系、Jaeger/Tempo/自研平台等)

验证路径很简单:先确认 Flink 侧确实在 addSpan 后产生输出,再做下一步接入。

4. Flink 内置 System Traces:Checkpoint 与 Job Initialization

除了你自己上报的 trace,Flink 也会自动上报一些系统级 traces。你提供的内容里重点是两类:

  • Checkpointing
  • Job Initialization(作业初始化/恢复)

并且它们的特点是:
Flink 会在事件到达终态(COMPLETED 或 FAILED)时,上报一个“单 span trace”,覆盖整个事件生命周期。

4.1 Checkpoint Trace(CheckpointStatsTracker)

Scope:org.apache.flink.runtime.checkpoint.CheckpointStatsTracker
Name:Checkpoint

包含的关键 Attributes(你在排障时非常有用):

  • startTs:checkpoint 开始时间戳
  • endTs:checkpoint 结束时间戳
  • checkpointId:checkpoint ID
  • checkpointedSize:本次实际 checkpoint 的状态大小(bytes),增量场景可能小于 fullSize
  • fullSize:引用的完整状态大小(bytes)
  • checkpointStatusFAILED/COMPLETED
  • checkpointType:例如"Checkpoint","Full Checkpoint","Terminate Savepoint"
  • isUnaligned:是否为 unaligned checkpoint

怎么用它做判断:

  • checkpointedSize很小但耗时很长:可能不是“上传体积”导致,而是对齐、反压、阻塞等
  • isUnaligned=true且后续恢复慢:要重点关注初始化 trace 里的 buffer restore 相关阶段

4.2 JobInitialization Trace

Name:JobInitialization

Attributes 里把恢复/初始化拆得很细,尤其适合定位“恢复为什么慢”:

基础字段:

  • startTs/endTs
  • checkpointId(可选):从哪个 checkpoint 恢复
  • fullSize:用于恢复的 referenced state 完整大小

聚合耗时字段(Flink 会对所有 subtasks 做 max/sum 聚合):

  • (Max/Sum)MailboxStartDurationMs:subtask 创建到类/对象初始化完成的时间
  • (Max/Sum)ReadOutputDataDurationMs:读取 unaligned checkpoint 输出 buffers 的时间
  • (Max/Sum)InitializeStateDurationMs:初始化 state backend 的时间(含下载 state 文件)
  • (Max/Sum)GateRestoreDurationMs:读取 unaligned checkpoint 输入 buffers 的时间

RocksDB Incremental(可选,当前只提到对 RocksDB 增量恢复支持):

  • (Max/Sum)DownloadStateDurationMs:从 DFS 下载 state 文件耗时
  • (Max/Sum)RestoreStateDurationMs:本地化完成后的 restore 耗时
  • (Max/Sum)RestoredStateSizeBytes.[location]:按位置统计恢复的状态大小(LOCAL_MEMORY/LOCAL_DISK/REMOTE/UNKNOWN)
  • (Max/Sum)RestoreAsyncCompactionDurationMs:增量恢复后的异步 compaction 耗时

怎么用它快速定位瓶颈:

  • DownloadStateDurationMs很高:网络/DFS 吞吐或并发连接数是重点
  • InitializeStateDurationMs很高但下载不高:更多是 state backend 初始化与加载开销(例如 RocksDB 打开、恢复元数据)
  • GateRestoreDurationMs / ReadOutputDataDurationMs很高:unaligned checkpoint 的 buffer restore 成本要重点评估(尤其高并发/高吞吐作业)

5. 设计你自己的业务 Trace:建议的落地套路

下面这套做法很适合在生产里逐步加 tracing,不容易把自己搞乱:

1)先选一个“关键路径”
比如:外部调用(HTTP/DB)、复杂解析、写出 sink、或你怀疑有抖动的算子

2)Span 命名遵循可检索的规则
例如:

  • OrderEnrich
  • CallUserProfileService
  • DeserializePayload
  • BuildIndexDocument

3)Attributes 只放“有助于定位”的少量字段
例如:

  • table,topic,sink,api,status,errorType
  • 不建议把大文本、长 JSON 直接塞进去(既影响成本也不利于聚合)

4)子 span 划分要能对齐你的排障假设
例如一个 enrichment:

  • 父:EnrichUser

    • 子:ParseInput
    • 子:FetchCache
    • 子:CallRemote
    • 子:AssembleOutput

5)注意“一次性提交 span 树”的限制
如果你的子步骤是分散在不同函数/回调里,建议在内存里先收集时间片段,最后统一组装 span tree 再addSpan

6. 小结:Traces 在 Flink 里真正的价值

  • 系统 trace(checkpoint / initialization)直接给你“恢复/Checkpoint 慢在哪”的结构化证据
  • 业务 trace让你把“算子内部的关键耗时段”可视化、可聚合、可对比
  • 与 Metrics 配合:Metrics 告诉你“慢”,Traces 告诉你“慢在第几步、慢了多久、是哪类状态/哪类恢复阶段”
http://www.jsqmd.com/news/411562/

相关文章:

  • 2026年 塑胶模具厂家实力推荐榜:精密制造与创新设计引领行业新标杆 - 品牌企业推荐师(官方)
  • 2026绝缘靴手套耐压测试仪可靠品牌推荐:绝缘靴(手套)测试装置/pd局部放电测试仪/便携式局放检测仪/便携式局放测试仪/选择指南 - 优质品牌商家
  • 不踩雷! 9个降AIGC平台测评:专科生降AI率必看攻略
  • 京东e卡回收正规平台,从资质到流程的三大规范性指标 - 京回收小程序
  • 2026年知识库部署实力厂商推荐:服务商、方案商、全场景部署供应商全覆盖 - 品牌2025
  • 2026年值得关注的石材供应商,品质与服务并存,文化石/蘑菇石/冰裂纹/碎拼石/脚踏石/贴墙石,石材生产厂家找哪家 - 品牌推荐师
  • 2026年2月金属软管厂家推荐,精准检测与稳定性能深度解析 - 品牌鉴赏师
  • 计算机毕业设计springboot原创音乐分享网站 基于SpringBoot的独立音乐人作品发布与交流平台 SpringBoot框架下的原创音频内容分享与社区互动系统
  • 探寻2026电动骨组织手术设备市场:优质动力源头厂家推荐,运动医学/电动骨刨削动力,电动骨组织手术设备厂商选哪家 - 品牌推荐师
  • 格式总出错?8个AI论文写作软件测评:本科生毕业论文+科研写作必备工具推荐
  • 国内试剂盒厂家排名优选,省钱又省心的检测试剂盒采购指南 - 包罗万闻
  • 2026年十大高清视频素材网站推荐:UP主剪辑师必备素材库,涵盖延时视频等影视剪辑素材 - 品牌2026
  • AI需求正爆炸!a16z合伙人爆算力投资与应用落地的万亿财富密码
  • 计算机毕业设计springboot电脑行业进销存管理系统 基于SpringBoot的电脑配件销售与库存管控平台 电脑数码产品供应链及仓储信息化管理系统
  • 优思学院:精益六西格玛打破组织壁垒
  • 掼蛋扑克哪个品牌耐用?2026年厂家排名与推荐,解决频繁更换与体验不佳痛点 - 十大品牌推荐
  • 2026年西南防火卷帘门权威厂家推荐榜 - 优质品牌商家
  • 绕过现代EDR/HIDS:针对内存扫描与行为监控的规避技术实战
  • 2026年 热流道厂家推荐排行榜:针阀式/开放式/多腔热流道系统,高腔/精密/耐腐蚀热流道,医疗包装与电子产品专用热流道品牌深度解析 - 品牌企业推荐师(官方)
  • 信创环境下CKEDITOR如何实现微信公众号内容无缝粘贴上传?
  • 码垛自动化这事说简单也简单,说复杂能让人头秃。最近在威纶通平台上折腾出一套自动计算程序,直接把参数往里一扔就能生成码垛路径,今天就跟大伙唠唠实现思路
  • 2026年不锈钢天沟厂家盘点:这五家值得关注,不锈钢板/实心钢棒/201不锈钢冷热轧板材,不锈钢天沟源头厂家口碑推荐 - 品牌推荐师
  • 2026年正规的碳纤维注塑制品,碳纤维注塑制品,碳纤维注塑件厂家用户优选榜单 - 品牌鉴赏师
  • 机械行业CKEDITOR网页编辑器粘贴PPT图表时数据标签会丢失吗?
  • PP-Structure的在提取图片文字转markdown的局限;媒体流接口响应体(Response);会议内容实时转文字中说话人分离(Diarization);
  • 2026年2月四川焊机/空压机/发电机/二保焊机/激光焊机/租赁公司竞争格局深度分析:从设备提供商向综合服务商的范式转移 - 2026年企业推荐榜
  • RAG系统优化
  • 2026年无害化设备厂家联系电话推荐:精选推荐与使用指南 - 品牌推荐
  • 2026年无害化设备厂家联系电话推荐:专业服务直达 - 品牌推荐
  • 2026 西北建筑拆除加固领域实力甄选:五大专业服务商深度解析 - 深度智识库