Flink 实时数仓开发实战:Catalog 快照,让 DDL 只写一次
前两篇文章我们解决了两件事:怎么跑(像 Hive 那样用 Flink SQL),怎么管(像后端那样 CI/CD)。
传统的 Flink SQL DDL 散落在每个 SQL 脚本里。一个实时数仓有几十张表,每张表的CREATE TABLE出现在引用它的每个脚本中。改一个字段类型所有脚本全要改。这也是为什么需要一个Flink SQL 元数据中心。
本文介绍的Catalog 快照解决的就是这个问题:把 DDL 从 SQL 脚本中剥离,序列化为一个自包含的 JSON 文件,作业启动时自动恢复到内存 Catalog。实现DDL 写一次,所有作业共用。
本文将深入 Catalog 快照的设计思路和底层原理 ——为什么必须是"快照?"、如何在 Flink 现有 API 基础上实现 DDL 语义的完全兼容?。本文基于 flink-sql-bootstrap 搭建,如果你不想直接用该项目,文中也介绍了内部原理能够指引你如何自建。
为什么是"快照"?
先看快照长什么样。这是一个最简版本——只定义两张表,两个 UDF:
{ |
"version": 1, |
"snapshotId": "20240622-155500-a1b2", |
"catalogName": "platform", |
"databaseName": "default", |
"tables": [ |
{ |
"database": "default", |
"name": "ods_words", |
"columns": [ |
{ "name": "sentence", "type": "STRING", "nullable": true } |
], |
"options": { |
"connector": "datagen", |
"rows-per-second": "1" |
} |
}, |
{ |
"database": "default", |
"name": "dws_word_count", |
"columns": [ |
{ "name": "word", "type": "STRING", "nullable": false }, |
{ "name": "cnt", "type": "BIGINT", "nullable": false } |
], |
"options": { |
"connector": "print" |
} |
} |
], |
"views": [], |
"udfs": [ |
{ |
"name": "my_reverse", |
"className": "examples.udf.MyReverseFunction", |
"functionLanguage": "JAVA", |
"jarRef": "example-udf-reverse.jar" |
}, |
{ |
"name": "my_substring", |
"className": "examples.udf.MySubstringFunction", |
"functionLanguage": "JAVA", |
"jarRef": "example-udf-substring.jar" |
} |
] |
} |
配合这份快照,SQL 脚本里不再需要任何 DDL:
INSERT INTO dws_word_count |
SELECT my_reverse(my_substring(word, 0, 2)) AS word, COUNT(*) AS cnt |
FROM ods_words |
CROSS JOIN UNNEST(SPLIT(sentence, ' ')) AS t(word) |
GROUP BY my_reverse(my_substring(word, 0, 2)); |
可以看到,表和 UDF 全部来自 Catalog 快照,SQL 脚本里没有任何 DDL,只剩业务逻辑。
那为什么不直接用 Flink 自带的 Catalog?Flink 提供了一些内置的 Catalog,但是它们都有一定的劣势:
GenericInMemoryCatalog是纯内存版本,进程重启就消失;HiveCatalog可以持久化到 Hive Metastore,但需要额外部署 HMS 服务;JDBCCatalog同理。这些 Catalog 启动时都要连到某个"活的"元数据中心,拿到的永远是最新的DDL。
Catalog 快照设计的初衷是启停幂等性。想象一种场景:你的 Flink 作业已经跑了三周,因为集群维护需要重启。但就在这三周里,上游业务改了 ODS 表的字段类型。如果你重启时去一个"活的"元数据中心拉取最新 DDL,拿到的是新 Schema——和 Checkpoint 里保存的状态对不上,轻则启动失败,重则静默恢复后产出错误数据。
Catalog 快照解决的就是这个问题:把部署时刻的 DDL 冻结为一个不可变的 JSON 文件。部署时什么样,重启后还是什么样。这也是为什么这份 JSON 应该和 SQL 脚本一起进 Git——部署时刻的完整元数据状态被永远锁定。
flink-sql-bootstrap 项目中快照的"不变性"是约定,不是代码强制的,仅在
Catalog定义中定义了snapshotId。flink-sql-bootstrap 负责把 DDL 从 SQL 脚本中剥离,你负责让这份 DDL 在部署周期内不变——URL 锁死版本号、和脚本一起进 Git 即可。比如说你搭建了一个 REST 服务,那么资源定义则为https://catalog-server/snapshot/{snapshot-id}。
快速开始
接下来我们将基于 flink-sql-bootstrap 项目及内置的示例带你体验一下 Catalog 快照的乐趣。
你需要从 GitHub Releases 下载 JAR,确保${FLINK_HOME}/lib下有flink-sql-gateway-*.jar(从${FLINK_HOME}/opt拷贝即可)。
一个完整的带 Catalog 快照的部署命令:
$FLINK_HOME/bin/flink run \ |
--target local \ |
flink-sql-bootstrap-${version}.jar \ |
--script-file classpath:example-word-count-advanced.sql \ |
--catalog-file classpath:example-catalog.json \ |
--dependency classpath:example-udf-reverse.jar \ |
--dependency classpath:example-udf-substring.jar |
其中example-catalog.json就是上文展示的快照,example-word-count-advanced.sql的内容就是上文那个只含 DML 的脚本:
INSERT INTO dws_word_count |
SELECT my_reverse(my_substring(word, 0, 2)) AS word, COUNT(*) AS cnt |
FROM ods_words |
CROSS JOIN UNNEST(SPLIT(sentence, ' ')) AS t(word) |
GROUP BY my_reverse(my_substring(word, 0, 2)); |
JAR 内已内置这些示例文件,无需额外准备。
--catalog-file和--script-file一样支持五种协议:classpath:、file://、http(s)://、hdfs://、s3://。--dependency用于加载 UDF 的 JAR 包。
执行后输出:
+I[6a, 1] |
+I[00, 1] |
+I[a3, 1] |
+I[8f, 1] |
UDFmy_reverse和my_substring的组合效果是:取每个单词前两个字符再反转。整个过程中 SQL 脚本没有写任何CREATE TABLE或CREATE FUNCTION,所有 DDL 来自 Catalog 快照。
Flink SQL Bootstrap 是如何做到的?
在讲 flink-sql-bootstrap 怎么做之前,先简单说下 Flink 的 Catalog 是什么。
Flink SQL 解析SELECT * FROM orders时orders这张表从哪来?答案是Catalog,Flink 的元数据注册中心。它存储了当前 Session 中所有可用的表结构、视图定义和 UDF。你在 SQL Client 里CREATE TABLE,本质上就是往当前 Catalog 里注册了一条元数据。
Catalog 的核心 API 就几个,比如:createDatabase()、createTable()、createFunction()。flink-sql-bootstrap 做的事也很直接:把 JSON 快照翻译成这些元信息然后调用createXXX()API 构建 Catalog。整体架构如下图所示。
