Flink 实时数仓开发实战:像后端那样 CI/CD
概览
第一篇我们解决了"怎么写"——一条flink run跑起完整的 Multi-Statement SQL 脚本。这一篇解决"怎么管":让 Flink SQL 作业的研发流程具备和 Java 后端同样的工程能力——可检测、可追溯、可回滚、自动化。
本文将深入Flink SQL Validate的底层原理(Calcite 解析、验证),这也是各个大厂内部实时研发流程中最基础、最重要的一环,让你不仅知道怎么用,更理解为什么它能在不连 Flink 集群的情况下精确校验语法。
像上一篇 Flink 实时数仓开发实战:像 Hive 那样用 Flink SQL 一样,本文也提供了一个示例项目 Flink SQL Bootstrap Examples - CI/CD 帮助大家快速搭建本地环境,并一步步演示 CI/CD 流水线。
为什么需要 CI/CD
Flink SQL 及 CI/CD 的引入可以从以下四个方面大大提升研发效率:
- 代码量
- 技术栈门槛
- 维护成本
- 迭代效率
这也是为什么大厂普遍使用 Flink SQL 作为实时研发的核心原因。并且 CI/CD 它还保障了四个核心能力:
| 能力 | 说明 |
|---|---|
| 可检测 | 编译不过不能合、单测不过不能上线——机器过滤低级错误,人专注逻辑 |
| 可追溯 | 谁改的、什么时候改的、为什么改——git log + pipeline 记录一条链路 |
| 可回滚 | 出了事不至于手忙脚乱,重新部署上一个版本,几分钟恢复 |
| 自动化 | 从提交到上线,不需要人做机器能做的事 |
这些能力虽然在服务端已经是家常便饭,但是对于数据开发这些能力只有大厂内部高度集成的研发平台才能提供。但这些能力往往是最基础、最核心的研发流程,它的缺失就像一个雷:不知道什么时候、在哪里就会炸一下。
接下来我们将以flink-sql-bootstrap + Gitlab为样板提供一种轻量、可靠的方案:基于公司现有的服务端 CI/CD 流程(可能是 Gitlab Runner 或 Jenkins)搭建大数据自己的实时研发 CI/CD 流程。
快速开始
示例中将非常多繁琐的工作都帮读者做好了,一个命令就能够完成所有环境的安装、配置,甚至 Gitlab ssh 的创建和配置:
cd example-cicd/docker |
bash setup.sh |
这个脚本自动完成 8 件事:
| 步骤 | 说明 |
|---|---|
| 1. 检查 Docker + Docker Compose | 前置依赖校验 |
2. 构建flink-ci-runner:1.20.4 | CI Runner 镜像(Flink 1.20.4 + Python3 + git) |
| 3. 启动 GitLab CE + Runner | 两个容器,桥接网络 |
| 4. 预拉取 Runner Helper 镜像 | 防止 CI job 因网络问题拉取超时 |
| 5. 生成/上传 SSH 公钥到 GitLab | 免密推送代码 |
| 6. 在 GitLab 上创建项目仓库 | API 自动创建,不需要手动操作 |
| 7. 创建并注册 Project Runner | POST /api/v4/user/runners注册 |
8. 配置本地 git remotegitlab | ssh://git@localhost:2224,一键推送 |
环境就绪后,打开http://localhost:8929(用户名root,密码flink1234)就能看到 GitLab 面板。随便改个 SQL 文件,推送代码即可触发流水线:
git push gitlab main |
流水线设计
架构总览
示例中搭建的 CI/CD 流水线分为了以下几个阶段:
- 规范检查:我们的数仓有一些必要的规范,命名规范、SQL 语法规范等等,这一步是对 SQL 代码是否满足数仓规范的检查
- SQL校验:基于 Flink 内置的 Calcite 解析、验证能力对提交的 SQL 代码进行语法检查、语义检查
- 语法检查:检查 SQL 语法是否符合 Flink SQL 语法规范
- 语义检查:解析 SQL 查询的 Catalog,解析表名、字段名、函数名,这些 Catalog 中都有吗?引用的对吗?类型对吗?
- 动态编排:SQL Script 的发布往往是需要编排的,比如:上游新增了一个字段需要先发 DWD 再发 ADS,搞反了会导致发布失败,因此需要有一定的编排策略(当然可以根据自己的实际情况看下是否保留这一步)
- 权限审批(未实现):真实的发布是需要走审批流程的,示例中为了简单没有实现这一环,读者可以根据自己的实际情况接入公司内部的审批系统
- 部署线上:审批通过后,将 SQL Script 部署到线上(当然可能涉及到重启的方式,示例中为了简单没有实现这一环)
示例中,我们将 PR 作为触发 CI 的时机:用户提交了 PR 且涉及到了.sql文件的变更则触发 CI 流程。我们将合并作为触发 CD 的流程:用户合并了 PR 且合并到了main分支则触发 CD 流程。
当然,CD 流程只是为了演示整体的链路。实际 CD 流程可能涉及到权限、审批流、部署顺序、部署时间、部署方式等问题,读者可以根据自己的实际情况进行调整。
整条流水线由 4 个脚本和 1 个.gitlab-ci.yml驱动。
流水线脚本
| 脚本 | 做了什么 | 方式 |
|---|---|---|
check-warehouse-naming.sh | 检查{层级}_{业务}_{后缀}.sql三段式命名合规性 | 全量扫描warehouse/ |
validate-sql.sh | Flink SQL 语法 + 语义校验(表是否存在、字段是否在、类型是否匹配) | 增量(git diff),CI 下自动获取变更文件 |
generate-deploy-pipeline.sh | 检测两次 commit 间的 SQL 变更,调用 Python 编排脚本 | 增量 |
build-deploy-order.py | 从文件名解析层级,按dwd → dws → ads排序,输出child-pipeline.yml | 被 shell 脚本调用 |
核心校验命令只有一条,不连 Flink 集群,2 秒出结果:
$FLINK_HOME/bin/flink run --target local $BOOTSTRAP_JAR \ |
--script-file file://<sql文件> --validate |
部署则是两步串联:generate-deploy-pipeline.sh检测变更 →build-deploy-order.py按层级排序、生成子流水线。子流水线通过 GitLab 的trigger+artifact机制按 stage 顺序执行,--catalog-file注入生产环境表结构。
流水线配置
stages: |
- validate |
- rule-check |
- deploy |
warehouse-naming-check: # 全量命名规范检查 |
stage: rule-check |
script: bash scripts/check-warehouse-naming.sh |
rules: |
- if: $CI_PIPELINE_SOURCE == "merge_request_event" # MR 触发 |
changes: [example-cicd/warehouse/**/*.sql] |
- if: $CI_COMMIT_BRANCH == "main" # main 触发 |
changes: [example-cicd/warehouse/**/*.sql] |
flink-sql-validate: # 增量语法校验 |
stage: validate |
script: bash scripts/validate-sql.sh |
rules: # 同上,MR 和 main 都触发 |
generate-deploy-pipeline: # 生成部署子流水线(仅 main) |
stage: deploy |
script: bash scripts/generate-deploy-pipeline.sh |
artifacts: [example-cicd/child-pipeline.yml] |
rules: |
- if: $CI_COMMIT_BRANCH == "main" |
changes: [example-cicd/warehouse/**/*.sql] |
deploy-jobs: # 触发子流水线(仅 main) |
stage: deploy |
needs: [generate-deploy-pipeline] |
trigger: |
include: |
- artifact: example-cicd/child-pipeline.yml |
job: generate-deploy-pipeline |
strategy: depend |
rules: # 同 generate-deploy-pipeline |
几个关键设计:
changes:确保只有 SQL 变更才触发,改脚本、文档不跑流水线- MR 只跑 CI(规范检查 + 语法校验),CD 仅 main 分支触发
strategy: depend保证子流水线挂了,父流水线也标红
本地调试
所有脚本都脱离 CI 环境变量独立可跑:
# 校验单个文件 |
bash scripts/validate-sql.sh --file warehouse/orders/dwd_orders_di.sql |
# 模拟增量校验 |
CI_COMMIT_BEFORE_SHA=HEAD~2 CI_COMMIT_SHA=HEAD bash scripts/validate-sql.sh |
# 手动生成部署子流水线 |
bash scripts/generate-deploy-pipeline.sh --from HEAD~2 --to HEAD |
底层原理拆解
SQL 的多语句切分机制已在 上一篇(阶段一:智能切分)中详细讨论——六种状
