当前位置: 首页 > news >正文

从零开始学Flink:实时数仓与维表时态Join实战

在前一篇 《Flink 双流 JOIN 实战详解》 中,我们用「订单流 + 支付流」搞懂了事实双流之间的时间关联。

但在真实的实时数仓项目里,光有事实流还不够,业务同学更关心的是:

  • 下单用户是新客还是老客
  • 用户当前的等级、城市、渠道
  • 商品所属品类、类目层级

这些信息通常存放在 维度表(维表)中,例如 MySQL 的 dim_userdim_product 等。我们希望在实时计算时,能把「事实流」和「维表」在时间维度上正确地关联起来,构建一张带有完整业务属性的明细宽表

这就是 维表时态 Join(Temporal Table Join) 要解决的问题。

本文我们就以「订单事实流 + 用户维表」为例,完成一个从 Kafka 到 MySQL 的简易实时数仓 Demo,并重点理解 Flink SQL 中维表时态 Join 的语法和注意事项。

一、业务场景与数仓目标

设想一个简化的电商业务场景:

  • Kafka 中有实时写入的 orders 订单事实流
  • MySQL 中维护一张 dim_user 用户维表,包含用户等级、所属城市、注册渠道等信息

我们想要在 Flink 中构建一张「订单明细宽表」,字段大致包括:

  • 订单信息:订单号、下单用户、下单金额、下单时间
  • 用户属性:用户昵称、等级、城市、注册渠道

并且要求:

  • 当我们回看 10 分钟前的某条订单时,看到的是 当时 用户的等级和城市,而不是被后续变更“冲掉”的最新值

这正是 时态 Join 和「实时数仓」的关键:按事件发生时刻回放维度视图

二、环境前提与依赖准备

1. 基础组件

本篇默认你已经完成前几篇中的环境准备:

  • Flink 1.20.1(WSL2 Ubuntu 下部署)
  • Kafka 集群已启动,且能正常写入 / 读取 Topic
  • Flink SQL Client 可以正常连接集群

在此基础上,我们还需要:

  • 一套可访问的 MySQL(本地或远程均可)
  • Flink 的 JDBC Connector JAR 包

和 Kafka 一样,JDBC 连接器也需要以 JAR 包形式放到 Flink 的 lib 目录中。

以 Flink 1.20.x 对应的 flink-connector-jdbc 为例:

  1. 确认 Flink 安装目录(假设为 /opt/flink):

    export FLINK_HOME=/opt/flink
    

  1. 下载 JDBC Connector JAR 到 Flink 的 lib 目录:

    cd $FLINK_HOME/lib
    wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.3.0-1.20/flink-connector-jdbc-3.3.0-1.20.jar
    
  2. 如果你使用的是独立集群或远程集群,需要重启 Flink 集群,让新 JAR 在 JobManager/TaskManager 上生效:

    cd $FLINK_HOME
    bin/stop-cluster.sh
    bin/start-cluster.sh
    
  3. 重启 Flink SQL Client,使用新 Connector:

    cd $FLINK_HOME
    bin/sql-client.sh
    

如果你在 Windows + WSL2 上部署,只需在 WSL2 内执行上述命令即可;或者手动下载 JAR 后拷贝到 lib 目录,步骤完全一致。

三、准备 MySQL 用户维度表 dim_user

首先在 MySQL 中准备一张简单的用户维度表,用来存用户的基础属性。

在 MySQL 中执行:

CREATE DATABASE IF NOT EXISTS realtime_dwh;
USE realtime_dwh;CREATE TABLE dim_user (user_id      VARCHAR(32)  PRIMARY KEY,user_name    VARCHAR(64),user_level   VARCHAR(16),city         VARCHAR(64),register_time DATETIME
);INSERT INTO dim_user (user_id, user_name, user_level, city, register_time) VALUES
('u_1', '张三', 'VIP1', '北京', '2025-12-01 10:00:00'),
('u_2', '李四', 'VIP2', '上海', '2025-12-05 11:00:00'),
('u_3', '王五', 'VIP1', '广州', '2025-12-10 12:00:00');

为了演示「时态」效果,你可以在后续实验中手动更新某个用户的等级或城市,例如:

UPDATE dim_user
SET user_level = 'VIP3'
WHERE user_id = 'u_2';

这样我们在 Flink 里做时态 Join 时,就能观察“变更前后”的区别。

接下来回到 Flink SQL Client,把 Kafka 中的订单事实流和 MySQL 中的维表都注册成 Flink 表。

1. Kafka 订单事实表 orders

和上一篇双流 JOIN 类似,我们假设 Kafka 中有一个 orders Topic,写入订单事实数据。

在 Flink SQL Client 中执行:

CREATE TABLE orders (order_id     STRING,user_id      STRING,order_amount DECIMAL(10, 2),order_time   TIMESTAMP_LTZ(3),WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND,proc_time AS PROCTIME()
) WITH ('connector' = 'kafka','topic' = 'orders','properties.bootstrap.servers' = '127.0.0.1:9092','properties.group.id' = 'flink-orders-dim','scan.startup.mode' = 'earliest-offset','format' = 'json','json.timestamp-format.standard' = 'ISO-8601'
);

你可以沿用上一篇中 Kafka 造数的方式,用 kafka-console-producer.sh 发送 JSON 订单数据,只需要保证字段名一致。

2. MySQL 用户维表 dim_user(JDBC Lookup 表)

然后把刚才在 MySQL 中建好的 dim_user 注册为 Flink 的 JDBC 表:

CREATE TABLE dim_user (user_id       STRING,user_name     STRING,user_level    STRING,city          STRING,register_time TIMESTAMP(3),PRIMARY KEY (user_id) NOT ENFORCED
) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://127.0.0.1:3306/realtime_dwh','table-name' = 'dim_user','driver' = 'com.mysql.cj.jdbc.Driver','username' = 'root','password' = '1qaz@WSX'
);

注意几点:

  • PRIMARY KEY (user_id) NOT ENFORCED 告诉 Flink 这是一张以 user_id 为主键的表,是做时态 Join 的前提
  • 这里使用的是典型的 JDBC Lookup 模式,Flink 会在 Join 时按需去 MySQL 查维度信息

在生产环境中,你可以把 MySQL 作为维度存储,或者通过 CDC 把维表变更同步到 Kafka,构造成 changelog 流,这些都可以和 Temporal Join 结合使用。

五、维表时态 Join:把订单打上用户维度

有了订单事实表 orders 和维度表 dim_user,就可以通过时态 Join 来构建订单明细宽表。

1. 基础时态 Join 语法

Flink SQL 中的 Temporal Table Join 对于 JDBC 这类 外部维表,通常采用「处理时间(Processing Time)」语义来做 Lookup Join,典型写法如下:

SELECTo.order_id,o.user_id,d.user_name,d.user_level,d.city,o.order_amount,o.order_time
FROM orders AS o
LEFT JOIN dim_user FOR SYSTEM_TIME AS OF o.proc_time AS d
ON o.user_id = d.user_id;

FlinkJoin
这里有几个关键点:

  • proc_time AS PROCTIME() 是在 orders 上定义的处理时间字段
  • FOR SYSTEM_TIME AS OF o.proc_time 表示“以 Flink 处理这条订单记录的当前时间,去查维表的一个快照”,这是 JDBC Lookup 支持的典型用法
  • Join 条件依然是 user_id 等值关联
  • 使用 LEFT JOIN 可以保留找不到维度的订单,并用空值来表示“维度缺失”

在 SQL Client 中执行这段查询,会看到实时流式刷新的结果,每一行订单都带上了对应的用户属性。

2. 验证时态效果:修改维表再观察 Join

为了验证这是“时态 Join”而不是“始终查最新维度”,可以按下面步骤操作:

  1. 先往 Kafka 的 orders Topic 写入几条订单数据,例如用户 u_2 下单的记录

  2. 观察 Flink SQL 中 Join 后的结果,此时 u_2 的等级是 VIP2

  3. 回到 MySQL,执行:

    UPDATE dim_user
    SET user_level = 'VIP3'
    WHERE user_id = 'u_2';
    
  4. 再写入一批新的订单,仍然是用户 u_2

bin/kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic orders

在命令行中输入一条 JSON 数据(按回车发送一条):

{"order_id":"o_3","user_id":"u_2","order_amount":200.00,"order_time":"2026-02-19T14:42:00Z"}

FlinkJoin
这时你会看到:

  • 变更前的订单,维度字段仍然显示 VIP2
  • 变更后的订单,维度字段变成了 VIP3

这就说明 Flink 的时态 Join 确实是“按订单发生时刻去回放维度视图”的,而不是简单查当前最新值。

六、把结果写回 Kafka 或 MySQL,形成实时数仓明细层

在真实项目中,我们不会只在 SQL Client 里 SELECT 一下就结束,而是要把 Join 后的订单明细宽表,写回到下游存储,形成实时数仓的一个层级。

例如,可以把结果写回 Kafka,作为 DWD 层的订单宽表:

CREATE TABLE dwd_order_user_wide (order_id     STRING,user_id      STRING,user_name    STRING,user_level   STRING,city         STRING,order_amount DECIMAL(10, 2),order_time   TIMESTAMP_LTZ(3),WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH ('connector' = 'kafka','topic' = 'dwd_order_user_wide','properties.bootstrap.servers' = '127.0.0.1:9092','properties.group.id' = 'flink-dwd-order-wide','scan.startup.mode' = 'earliest-offset','format' = 'json','json.timestamp-format.standard' = 'ISO-8601'
);INSERT INTO dwd_order_user_wide
SELECTo.order_id,o.user_id,d.user_name,d.user_level,d.city,o.order_amount,o.order_time
FROM orders AS o
LEFT JOIN dim_user FOR SYSTEM_TIME AS OF o.proc_time AS d
ON o.user_id = d.user_id;

这样,下游的实时应用或 BI 查询就可以直接订阅 dwd_order_user_wide 这个 Topic,拿到已经打好用户标签的订单明细数据。

你也可以把结果同步到 MySQL、ClickHouse 等分析型数据库中,构建实时明细表,为报表和可视化提供数据。

七、小结与下一步建议

通过这篇文章,我们完成了这样一件事:

  • 在 Kafka 中维护订单事实流 orders
  • 在 MySQL 中维护用户维度表 dim_user
  • 使用 Flink SQL 的 JDBC Connector 把 MySQL 注册为维表
  • 利用 FOR SYSTEM_TIME AS OF 语法做维表时态 Join
  • 将 Join 结果写回 Kafka,形成实时数仓中的一张订单明细宽表

这背后有几个非常重要的实时数仓设计理念:

  • 事实流是不断追加的事件序列,维表是相对缓慢变更的业务视图
  • 时态 Join 让你能够“按事件发生的时间点”,回看当时的维度快照
  • 实时数仓的 DWD 层,往往就是「事实表 + 多个维表时态 Join」后形成的明细宽表

在后续的文章中,我们可以继续沿着这个方向深入:

  • 在一个任务里同时关联多张维表,构建更宽的明细表
  • 引入 CDC,把维表变更实时同步到 Kafka,再在 Flink 中构建 changelog 维表
  • 把实时数仓的明细层、汇总层(DWS)、指标主题层(ADS)串起来,做一个端到端的实时数仓小项目

如果你已经跑通了本文的 Demo,不妨试着自己设计一张商品维表 dim_product,再给订单打上商品品类维度,体验一下“事实 + 多维表时态 Join”在 Flink SQL 里的完整味道。


原文来自:http://blog.daimajiangxin.com.cn

http://www.jsqmd.com/news/394203/

相关文章:

  • 奥数-几何 - ace-
  • 基于小波神经网络WNN的短时负荷预测附Matlab代码
  • P2757 等差子序列 Sol
  • 晶抗生物2026年市场评测:用户选择背后的产品逻辑,小鼠的elisa试剂盒/酶联免疫试剂盒,晶抗生物公司推荐排行 - 品牌推荐师
  • 题解:洛谷 P7910 [CSP-J 2021] 插入排序
  • 基于完整集成经验模态分解(CEEMDAN)和近似熵(ApEn)CEENDAN-ApEn信号去噪附Matlab代码
  • 微信小程序Python知茶叶知识科普商城考试错题
  • 基于线性判别分析和三比值法的变压器故障识别附Matlab代码
  • 三菱FX5U+MCGS(昆仑通态)程序 1、完整的上下料接驳台项目分享; 2、三菱FX5U全S...
  • 揭秘V8引擎的类型混淆漏洞:安全开发的警示与启示
  • 电网“搭线“指南:用VSG预同步玩转三电平逆变器
  • 奥数-数论 - ace-
  • 告别 DNS 污染与封锁:手把手教你免费搭建独享 Cloudflare DoH 服务器,全球都可访问!
  • 题解:洛谷 P2671 [NOIP 2015 普及组] 求和
  • YOLO26涨点改进 | 全网独家创新,注意力改进篇| SCI一区Top | 引入AFCA自适应细粒度通道注意力,联合建模全局与局部通道依赖关系,适合目标检测、图像去雾、关键点检测、图像分类、图像分割
  • 【一文读懂】RAG的重要组成-向量数据库
  • 告别 DNS 污染与封锁:手把手教你免费搭建独享 Cloudflare DoH 服务器,全球都可访问!使用Cloudflare Zero Trust功能。
  • 实测对比后!千笔,口碑爆棚的降AIGC工具
  • RAG系统优化指南:Chunk分块策略详解,从入门到精通,收藏这一篇就够了!!
  • 题解:洛谷 P7072 [CSP-J 2020] 直播获奖
  • 2026最新!千笔ai写作,MBA论文写作利器
  • 奥数-代数 - ace-
  • 【STFT-CNN-BiGRU的故障诊断】基于短时傅里叶变换(STFT)结合卷积神经网络(CNN)与双向门控循环单元BiGRU的故障诊断研究附Matlab代码
  • 2026年35岁程序员的5条出路:AI赛道疯狂抢人,年薪百万不是梦
  • 【无人机部署】基于k - means、网格、随机算法改变UAV的数量来观察不同放置策略对总链路比特率的影响附matlab代码
  • 【图像加密】基于维纳滤波器和运动模糊的点扩散函数的图像加密算法研究附matlab代码
  • 【AI大模型】带你解析9种提速又提效的Transformer优化方案!
  • 一文总结!2026年大模型Agent RL训练多轮planning技术,收藏这篇就够了!
  • COMSOL激光超声仿真:激光超声-3维lamb波的数值模拟 版本为6.1,低于此版本打不开此模型
  • 实测对比后!千笔,普遍认可的降AIGC工具