当前位置: 首页 > news >正文

Flink JobStatusChangedListener把作业状态变化 + Lineage 发到 DataHub / OpenLineage

1、它是什么?何时触发?

  • 你实现的是:JobStatusChangedListener

  • 触发时机:应用(作业)状态每次变化都会触发

    • 比如 CREATED / RUNNING / FAILING / FAILED / CANCELLING / CANCELED / FINISHED 等(以 Flink 的 job status 为准)
  • 血缘信息(lineage)在哪里?

    • JobCreatedEvent中包含 source/sink lineage 信息
    • 你在onEvent(JobStatusChangedEvent event)里收到事件,遇到JobCreatedEvent时可以解析并上报

2、插件怎么做:3 步走(和 FailureEnricher 很像)

要实现一个自定义 JobStatusChangedListener 插件,你需要:

  1. 实现JobStatusChangedListener
  2. 实现JobStatusChangedListenerFactory
  3. 使用 Java SPI 注册工厂类:
    创建文件
    META-INF/services/org.apache.flink.core.execution.JobStatusChangedListenerFactory
    内容是你的 Factory 全类名

打成 jar 后放入 Flink 的 plugins 目录(独立 classloader 隔离):

$FLINK_HOME/plugins/ job-status-changed-listener/ flink-lineage-listener.jar

3、最小示例代码(可直接改成 DataHub/OpenLineage 上报)

3.1 Factory 示例

packagecom.yourcompany.flink.lineage;publicclassMyJobStatusChangedListenerFactoryimplementsJobStatusChangedListenerFactory{@OverridepublicJobStatusChangedListenercreateListener(Contextcontext){returnnewMyJobStatusChangedListener(context);}}

3.2 Listener 示例(接收事件并处理 JobCreatedEvent)

packagecom.yourcompany.flink.lineage;publicclassMyJobStatusChangedListenerimplementsJobStatusChangedListener{privatefinalJobStatusChangedListenerFactory.Contextcontext;publicMyJobStatusChangedListener(JobStatusChangedListenerFactory.Contextcontext){this.context=context;}@OverridepublicvoidonEvent(JobStatusChangedEventevent){// 1) 所有状态变化都会到这里// 2) lineage 在 JobCreatedEvent 里if(eventinstanceofJobCreatedEvent){JobCreatedEventcreated=(JobCreatedEvent)event;// 伪代码:从 created 中提取 source/sink lineage// LineageInfo lineage = created.getLineageInfo();// sendToDatahubOrOpenLineage(lineage);}else{// 你也可以上报运行/失败/取消等状态,用于血缘系统里的运行实例关联// sendJobStatus(event.getNewStatus(), event.getJobId(), ...)}}}

3.3 SPI 文件内容

文件路径:

META-INF/services/org.apache.flink.core.execution.JobStatusChangedListenerFactory

文件内容(一个工厂类名一行):

com.yourcompany.flink.lineage.MyJobStatusChangedListenerFactory

4、配置:不配就不启动(关键)

Flink 启动时加载 JobStatusChangedListener 插件,但是否启用由配置决定:

execution.job-status-changed-listeners = com.yourcompany.flink.lineage.MyJobStatusChangedListenerFactory

注意:

  • 这里配置的是Factory 的类名(你给的例子也是 factory)
  • 如果该配置为空:不会启动任何 listener

5)实践建议:做成“血缘上报”生产可用版本

  • 只在 JobCreatedEvent 做血缘解析:避免每次状态变化都做重逻辑
  • 异步上报 + 超时兜底:上报系统不可用时别拖垮 JM 线程
  • 幂等/去重:同一个 jobId + runId(或提交时间)做幂等 key,避免重复写血缘
  • 关联运行实例:把 RUNNING/FAILED/FINISHED 状态也上报,用于 lineage 系统展示“本次运行”
  • 多环境标识:在上报里附加 clusterId、namespace、env、tenant 等标签,方便血缘平台分组
http://www.jsqmd.com/news/384759/

相关文章:

  • 【信息科学与工程学】【产品体系】第十二篇 制造业生产加工05
  • 【RT-DETR涨点改进】全网独家首发、特征融合改进篇| AAAI 2026 | 引入MAFM多维注意力引导融合模块,自适应加权策略选择最优信息,同时关注局部细节与全局结构,助力RT-DETR有效涨点
  • 风电场现在可不是单纯发电的工具人了,电网掉链子的时候也得出来扛事。今天咱们聊聊双馈风机怎么玩转调频,直接上硬菜不整虚的
  • 4层电梯S7-1200PLC与TP700触摸屏博途V15.1程序77,带IO表接线图CAD及运...
  • 2026年2月宜兴手工紫砂壶定制厂家推荐,高端手作与全案制作能力 - 品牌鉴赏师
  • app可能用到的功能----20个家常菜做法---添加菜谱功能
  • 分段润色降AI实操教程:一段一段改,AI率从80%降到8%
  • django基于Django框架的襄阳四方汽车检测站管理系统的设计与实现(编号:44750673)
  • 2026年国内有实力的不锈钢板现货厂家怎么选择,430不锈钢板/2205不锈钢板/不锈钢冷拉扁钢,不锈钢板厂家怎么选择 - 品牌推荐师
  • app家常菜菜谱UI设计
  • 洛谷 P10575 [蓝桥杯 2024 国 A] 下一次相遇
  • 鸿蒙 App 架构重建后,为何再次失控
  • Kimi和豆包写的论文AI率太高?手把手教你用工具快速解决
  • Seed团队最新发布的Seed2.0系列大语言模型
  • 恶性疟原虫显微镜图像的目标检测数据集分享(适用于目标检测任务)
  • 硕士论文AI率要求15%以下,怎么安全达标?亲测有效的方法
  • 携程任我行礼品卡闲置了要怎么回收变现 - 抖抖收
  • 中国20个最常见的家常菜-----app添加的菜做法
  • 公文和自媒体内容降AI全攻略:不只是论文才需要降AI
  • 2014-2015 Winter Petrozavodsk Camp, Andrew Stankevich Contest 47 (ASC 47) 总结
  • 2025年市面上有实力的仓储货架供应厂家电话,仓储货架/横梁货架/轻型货架/中型货架/穿梭式货架,仓储货架供应商有哪些 - 品牌推荐师
  • 实时数据库与关系型数据库 - 详解
  • IDEA 中使用 claude code 插件 - 实践
  • 真的太省时间!千笔ai写作,继续教育论文神器
  • 公司发的百联ok卡在哪里回收变现靠谱 - 抖抖收
  • 只要一台服务器,就能拥有自己的专属网盘:Cloudreve 部署完整记录
  • 把自己的网盘搬进服务器:OpenList 部署完整指南
  • 万方文察AIGC检测85%怎么办?实测两个工具帮你降到安全线
  • 盘点2026年热门的门禁品牌都有哪些,漏气探测器/甲醛检测仪家用/家用报警主机/人体存在传感器,门禁厂家有哪些 - 品牌推荐师
  • 公众号文章降AI率,自媒体去AI味工具推荐