当前位置: 首页 > news >正文

Flink 实时数仓开发实战:Catalog 快照,让 DDL 只写一次

前两篇文章我们解决了两件事:怎么跑(像 Hive 那样用 Flink SQL),怎么管(像后端那样 CI/CD)。

传统的 Flink SQL DDL 散落在每个 SQL 脚本里。一个实时数仓有几十张表,每张表的CREATE TABLE出现在引用它的每个脚本中。改一个字段类型所有脚本全要改。这也是为什么需要一个Flink SQL 元数据中心

本文介绍的Catalog 快照解决的就是这个问题:把 DDL 从 SQL 脚本中剥离,序列化为一个自包含的 JSON 文件,作业启动时自动恢复到内存 Catalog。实现DDL 写一次,所有作业共用

本文将深入 Catalog 快照的设计思路和底层原理 ——为什么必须是"快照?"、如何在 Flink 现有 API 基础上实现 DDL 语义的完全兼容?。本文基于 flink-sql-bootstrap 搭建,如果你不想直接用该项目,文中也介绍了内部原理能够指引你如何自建。

为什么是"快照"?

先看快照长什么样。这是一个最简版本——只定义两张表,两个 UDF:

{
"version": 1,
"snapshotId": "20240622-155500-a1b2",
"catalogName": "platform",
"databaseName": "default",
"tables": [
{
"database": "default",
"name": "ods_words",
"columns": [
{ "name": "sentence", "type": "STRING", "nullable": true }
],
"options": {
"connector": "datagen",
"rows-per-second": "1"
}
},
{
"database": "default",
"name": "dws_word_count",
"columns": [
{ "name": "word", "type": "STRING", "nullable": false },
{ "name": "cnt", "type": "BIGINT", "nullable": false }
],
"options": {
"connector": "print"
}
}
],
"views": [],
"udfs": [
{
"name": "my_reverse",
"className": "examples.udf.MyReverseFunction",
"functionLanguage": "JAVA",
"jarRef": "example-udf-reverse.jar"
},
{
"name": "my_substring",
"className": "examples.udf.MySubstringFunction",
"functionLanguage": "JAVA",
"jarRef": "example-udf-substring.jar"
}
]
}

配合这份快照,SQL 脚本里不再需要任何 DDL

INSERT INTO dws_word_count
SELECT my_reverse(my_substring(word, 0, 2)) AS word, COUNT(*) AS cnt
FROM ods_words
CROSS JOIN UNNEST(SPLIT(sentence, ' ')) AS t(word)
GROUP BY my_reverse(my_substring(word, 0, 2));

可以看到,表和 UDF 全部来自 Catalog 快照,SQL 脚本里没有任何 DDL,只剩业务逻辑。

那为什么不直接用 Flink 自带的 Catalog?Flink 提供了一些内置的 Catalog,但是它们都有一定的劣势:

  • GenericInMemoryCatalog是纯内存版本,进程重启就消失;
  • HiveCatalog可以持久化到 Hive Metastore,但需要额外部署 HMS 服务;
  • JDBCCatalog同理。这些 Catalog 启动时都要连到某个"活的"元数据中心,拿到的永远是最新的DDL。

Catalog 快照设计的初衷是启停幂等性。想象一种场景:你的 Flink 作业已经跑了三周,因为集群维护需要重启。但就在这三周里,上游业务改了 ODS 表的字段类型。如果你重启时去一个"活的"元数据中心拉取最新 DDL,拿到的是新 Schema——和 Checkpoint 里保存的状态对不上,轻则启动失败,重则静默恢复后产出错误数据。

Catalog 快照解决的就是这个问题:把部署时刻的 DDL 冻结为一个不可变的 JSON 文件。部署时什么样,重启后还是什么样。这也是为什么这份 JSON 应该和 SQL 脚本一起进 Git——部署时刻的完整元数据状态被永远锁定。

flink-sql-bootstrap 项目中快照的"不变性"是约定,不是代码强制的,仅在Catalog定义中定义了snapshotId。flink-sql-bootstrap 负责把 DDL 从 SQL 脚本中剥离,你负责让这份 DDL 在部署周期内不变——URL 锁死版本号、和脚本一起进 Git 即可。比如说你搭建了一个 REST 服务,那么资源定义则为https://catalog-server/snapshot/{snapshot-id}

快速开始

接下来我们将基于 flink-sql-bootstrap 项目及内置的示例带你体验一下 Catalog 快照的乐趣。

你需要从 GitHub Releases 下载 JAR,确保${FLINK_HOME}/lib下有flink-sql-gateway-*.jar(从${FLINK_HOME}/opt拷贝即可)。

一个完整的带 Catalog 快照的部署命令:

$FLINK_HOME/bin/flink run \
--target local \
flink-sql-bootstrap-${version}.jar \
--script-file classpath:example-word-count-advanced.sql \
--catalog-file classpath:example-catalog.json \
--dependency classpath:example-udf-reverse.jar \
--dependency classpath:example-udf-substring.jar

其中example-catalog.json就是上文展示的快照,example-word-count-advanced.sql的内容就是上文那个只含 DML 的脚本:

INSERT INTO dws_word_count
SELECT my_reverse(my_substring(word, 0, 2)) AS word, COUNT(*) AS cnt
FROM ods_words
CROSS JOIN UNNEST(SPLIT(sentence, ' ')) AS t(word)
GROUP BY my_reverse(my_substring(word, 0, 2));

JAR 内已内置这些示例文件,无需额外准备。

--catalog-file--script-file一样支持五种协议:classpath:file://http(s)://hdfs://s3://--dependency用于加载 UDF 的 JAR 包。

执行后输出:

+I[6a, 1]
+I[00, 1]
+I[a3, 1]
+I[8f, 1]

UDFmy_reversemy_substring的组合效果是:取每个单词前两个字符再反转。整个过程中 SQL 脚本没有写任何CREATE TABLECREATE FUNCTION,所有 DDL 来自 Catalog 快照。

Flink SQL Bootstrap 是如何做到的?

在讲 flink-sql-bootstrap 怎么做之前,先简单说下 Flink 的 Catalog 是什么。

Flink SQL 解析SELECT * FROM ordersorders这张表从哪来?答案是Catalog,Flink 的元数据注册中心。它存储了当前 Session 中所有可用的表结构、视图定义和 UDF。你在 SQL Client 里CREATE TABLE,本质上就是往当前 Catalog 里注册了一条元数据。

Catalog 的核心 API 就几个,比如:createDatabase()createTable()createFunction()。flink-sql-bootstrap 做的事也很直接:把 JSON 快照翻译成这些元信息然后调用createXXX()API 构建 Catalog。整体架构如下图所示。

http://www.jsqmd.com/news/1091237/

相关文章:

  • MSPM0定时器实战:QEI编码器解码与PWM电机控制全解析
  • 吸氢机流量会虚标吗?3个家用检测方法,轻松识破行业猫腻
  • OpenCode 个人习惯设置大全
  • OBS-ASIO插件终极指南:实现专业音频设备的低延迟录制与直播
  • 宏与函数的本质区别(理解场景的前提)
  • 深入解析EASY-HWID-SPOOFER:内核级硬件信息修改技术实现
  • CompressO:免费开源跨平台媒体压缩工具终极指南
  • GD32F303串口驱动开发:从寄存器到中断与环形缓冲区的实战解析
  • 如何3分钟快速安装TrollStore:TrollInstallerX全面指南
  • 创维E900V22C电视盒子刷机指南:三步变身专业4K媒体播放器
  • 客户细分化技术中的聚类分析分类模型与细分策略
  • 3分钟快速上手:用Barrier实现一套键鼠控制多台电脑的终极方案
  • 2026博尔塔拉黄金回收白银回收铂金回收旧料回收怎么选?五家高实价铂金白银线下门店测评清单 + 联系方式
  • Redis 内存分配器调优方案
  • PySpark实战:从数据清洗到模型部署的泰坦尼克号幸存者预测完整流程
  • 江协的51单片机的学习
  • STK与MATLAB联动实战:Walker星座建模与参数解析
  • SQLModel零基础教程(二)- 字段高级配置 数据校验,复用Pydantic能力
  • Vivado HLS高层次综合的设计理念
  • 重磅官宣!射击冠军张梦影签约爱依克品牌形象大使。
  • 配方灵活调配需求选天伟生物或单品类发酵企业分析
  • OpenMontage:一站式AI视频生成全链路开源工具部署与应用指南
  • C++ 命名空间(namespace)全方位实战教学(零基础入门到工程高阶)
  • OpCore-Simplify:黑苹果配置的终极简化指南,3步完成专业级EFI构建
  • 【深度学习】OpenCV 实战:从图片中精确提取扇子区域
  • 告别快餐式传奇!冰雪传奇点卡版以经典公平机制留住玩家
  • [深圳] SHEIN 内推:算法/大模型/后端/数据/安全/测试/iOS,20-80k
  • 告别路径迷宫:一站式配置VSCode智能路径解析与跳转
  • 从零构建WordPress渗透测试靶场:实战演练与安全加固
  • LeetCode 热题 100——3.字母异位词分组