当前位置: 首页 > news >正文

Flink SQL联结与集合操作详解

一、前言

在上一篇文章中,我们学习了Flink SQL的聚合查询、窗口TVF和TopN等核心功能。本文将继续深入Flink SQL的查询能力,探讨联结(Join)操作、集合操作以及查询优化等高级特性。

联结查询是SQL中最常用也最复杂的操作之一,在流处理场景中更是如此。Flink SQL针对流处理的特点,提供了多种联结方式,包括常规联结(Regular Join)、间隔联结(Interval Join)和维表联结(Lookup Join)等。理解这些联结方式的原理和适用场景,是构建复杂流处理应用的关键。


二、联结(Join)查询

2.1 Flink SQL中的Join类型概览

Flink SQL中的联结查询大体上可以分为两类:SQL原生的联结查询方式,和流处理中特有的联结查询。

上图对比了Flink SQL中四种Join类型的特点:

Join类型联结条件多版本构建消费更新流重要考虑因素
Regular Join无限制性能受probe side状态影响
Lookup Join等值条件可能非常慢,依赖外部系统可用性
Temporal Join等值条件+完整主键支持事件时间必须正确定义
Interval Join等值条件+时间范围需要合理定义时间范围

2.2 常规联结(Regular Join)

常规联结(Regular Join)是SQL中原生定义的Join方式,是最通用的一类联结操作。它的具体语法与标准SQL的联结完全相同,通过关键字JOIN来联结两个表,后面用关键字ON来指明联结条件。

上图展示了Regular Join与Temporal Join在不同输入类型(append/upsert)下的输出行为差异。

Regular Join包含以下几种

Join类型说明输出行为
Inner Join等值内联结只有两条流Join到才输出 +[L, R]
Left Join左外联结左流数据到达后,无论有没有Join到右流都输出
Right Join右外联结与Left Join逻辑相反
Full Join全外联结左流或右流数据到达后,无论有没有Join到都输出

等值内联结(INNER Equi-JOIN)

SELECT*FROMwsINNERJOINws1ONws.id=ws1.id;

目前仅支持等值联结条件。内联结会返回两表中符合联结条件的所有行的组合,也就是所谓的笛卡尔积(Cartesian product)中满足条件的部分。

等值外联结(OUTER Equi-JOIN)

-- 左外联结SELECT*FROMwsLEFTJOINws1ONws.id=ws1.id;-- 右外联结SELECT*FROMwsRIGHTJOINws1ONws.id=ws1.id;-- 全外联结SELECT*FROMwsFULLOUTERJOINws1ONws.id=ws.id;

Regular Join的注意事项

  • 实时Regular Join可以不是等值Join。等值Join和非等值Join区别在于,等值Join数据shuffle策略是Hash,会按照Join on中的等值条件作为id发往对应的下游;非等值Join数据shuffle策略是Global,所有数据发往一个并发
  • 流的上游是无限的数据,Flink会将两条流的所有数据都存储在State中,所以Flink任务的State会无限增大,因此需要为State配置合适的TTL,以防止State过大

2.3 间隔联结(Interval Join)

间隔联结(Interval Join)返回符合约束条件的两条流中数据的笛卡尔积。与常规联结不同,间隔联结多了一个时间间隔的限制。

上图展示了Interval Join的时间窗口机制:对于Orders流中的每个事件,在Shipments流中查找落在时间窗口内的匹配事件。

语法要点

  • 两表的联结不需要用JOIN关键字,直接在FROM后将要联结的两表列出来,用逗号分隔
  • 联结条件用WHERE子句来定义,用一个等值表达式描述
  • 时间间隔限制在WHERE子句中通过AND追加

时间间隔定义方式

-- 方式一:精确匹配ltime=rtime-- 方式二:范围匹配ltime>=rtimeANDltime<rtime+INTERVAL'10'MINUTE-- 方式三:BETWEEN语法ltimeBETWEENrtime-INTERVAL'10'SECONDANDrtime+INTERVAL'5'SECOND

案例

SELECT*FROMws,ws1WHEREws.id=ws1.idANDws.etBETWEENws1.et-INTERVAL'2'SECONDANDws1.et+INTERVAL'2'SECOND;

2.4 维表联结(Lookup Join)

Lookup Join其实就是维表Join,实时获取外部缓存的Join。Lookup的意思就是实时查找。


上图展示了Lookup Join的工作原理:

  • 左流(Source)的数据到达时,去外部维表(Paimon Table/MySQL等)中查找匹配数据
  • 通过LRU缓存加速查询,减少对外部系统的访问压力
  • 将关联后的结果输出到Sink

语法

表AJOIN维度表名FORSYSTEM_TIMEASOF表A.proc_timeAS别名ONxx.字段=别名.字段

案例:MySQL维表Join

-- 创建维表CREATETABLECustomers(idINT,name STRING,country STRING,zip STRING)WITH('connector'='jdbc','url'='jdbc:mysql://hadoop102:3306/customerdb','table-name'='customers');-- 使用Lookup JoinSELECTo.order_id,o.total,c.country,c.zipFROMOrdersASoJOINCustomersFORSYSTEM_TIMEASOFo.proc_timeAScONo.customer_id=c.id;

Lookup Join的特点

  • 仅支持处理时间字段
  • 左流(事实表)每来一条数据,都会去外部维表查找维度数据
  • 如果维表数据发生变化,已经关联过的数据不会自动更新(基于处理时间快照)

三、Order By 和 Limit

3.1 Order By

Flink SQL支持Order By,但在实时任务中一般用的非常少。

实时任务中,Order By子句中必须要有时间属性字段,并且必须写在最前面且为升序

SELECT*FROMwsORDERBYet,idDESC;

3.2 Limit

SELECT*FROMwsLIMIT3;

四、SQL Hints

4.1 什么是SQL Hints

在执行查询时,可以在表名后面添加SQL Hints来临时修改表属性,对当前job生效。

上图展示了SQL Hints在查询优化中的作用:通过Hints可以影响查询执行计划,优化查询性能。

4.2 Hints语法

select*fromws1/*+ OPTIONS('rows-per-second'='10')*/;

常用Hints

Hint说明
OPTIONS('rows-per-second'='10')修改DataGen的生成速率
LOOKUP('table'='my_table2', 'async'='true')启用异步Lookup Join

异步Lookup Join案例

-- 同步Lookup(默认)SELECT/*+ LOOKUP('table'='my_table2', 'async'='false') */*FROMmy_table1ASt1JOINmy_table2FORSYSTEM_TIMEASOFt1.proctimeASt2ONt1.a=t2.c;-- 异步Lookup(提升吞吐量)SELECT/*+ LOOKUP('table'='my_table2', 'async'='true') */*FROMmy_table1ASt1JOINmy_table2FORSYSTEM_TIMEASOFt1.proctimeASt2ONt1.a=t2.c;

五、集合操作

5.1 集合操作概述

Flink SQL支持标准SQL中的集合操作,包括UNION、UNION ALL、INTERSECT、INTERSECT ALL、EXCEPT和EXCEPT ALL。

上图通过维恩图直观展示了四种集合操作的结果集关系:

  • UNION:合并两个集合并去重
  • UNION ALL:合并两个集合不去重
  • INTERSECT:取两个集合的交集
  • EXCEPT:取左集合中不在右集合中的元素

5.2 UNION 和 UNION ALL

-- UNION:合并并去重(SELECTidFROMws)UNION(SELECTidFROMws1);-- UNION ALL:合并不去重(SELECTidFROMws)UNIONALL(SELECTidFROMws1);

5.3 INTERSECT 和 INTERSECT ALL

-- INTERSECT:交集并去重(SELECTidFROMws)INTERSECT(SELECTidFROMws1);-- INTERSECT ALL:交集不去重(SELECTidFROMws)INTERSECTALL(SELECTidFROMws1);

5.4 EXCEPT 和 EXCEPT ALL

-- EXCEPT:差集并去重(SELECTidFROMws)EXCEPT(SELECTidFROMws1);-- EXCEPT ALL:差集不去重(SELECTidFROMws)EXCEPTALL(SELECTidFROMws1);

5.5 流式集合操作的特点

上述SQL在流式任务中,如果一条左流数据先来了,没有从右流集合数据中找到对应的数据时会直接输出,当右流对应数据后续来了之后,会下发回撤流将之前的数据给撤回。这也是一个回撤流。

5.6 IN 子查询

In子查询的结果集只能有一列:

SELECTid,vcFROMwsWHEREidIN(SELECTidFROMws1);

上述SQL的In子句和之前介绍到的Inner Join类似。并且In子查询也会涉及到大状态问题,要注意设置State的TTL。


六、总结

本文详细讲解了Flink SQL中的联结与集合操作:

  1. 常规联结(Regular Join):包括Inner/Left/Right/Full Join,语法与标准SQL一致,但需要注意流式场景下状态无限增长的问题

  2. 间隔联结(Interval Join):在等值联结的基础上增加时间间隔限制,适合有时间范围关联需求的场景

  3. 维表联结(Lookup Join):流与外部存储(MySQL/Redis/HBase等)的实时关联,仅支持处理时间

  4. Order By/Limit:实时任务中Order By必须包含时间属性字段且放在最前面

  5. SQL Hints:临时修改表属性,常用于优化Lookup Join(同步/异步)和DataGen参数

  6. 集合操作:UNION/UNION ALL、INTERSECT/INTERSECT ALL、EXCEPT/EXCEPT ALL,流式场景下会产生回撤流

  7. In子查询:结果集只能有一列,底层类似于Inner Join,需要注意大状态问题

理解这些联结和集合操作的原理与适用场景,是构建复杂流处理SQL应用的基础。下一篇文章我们将继续深入Flink SQL的Connector与Catalog实战。


如果本文对你有帮助,欢迎点赞 👍 + 收藏 ⭐ + 关注 🔖,你的支持是我持续创作的动力!

http://www.jsqmd.com/news/1076493/

相关文章:

  • 昇腾950适配DeepSeek V4-Pro推理实战:CUDA转CANN避坑指南
  • 深度解析Sunshine游戏串流服务器的架构设计与技术实践
  • Java实习面试必备:核心知识点全解析
  • 计算机毕业设计之基于微信小程序主持接单程序的设定
  • AI代理命令注入漏洞剖析:从WS MCP协议风险到企业级三层防护方案
  • 告别刮削难题:MetaShark让Jellyfin中文影视库焕然一新
  • Mythos推理增强中间件:可验证AI推理的工程化实践
  • 【限时公开】JetBrains内部文档节选:IDEA中文语言包加载优先级规则(含intl.properties手动注入技巧)
  • 便携手电电源优化:FP6291 升压 IC 适配 9V 灯珠设计方案,单节锂电升压驱动 9V 高压 LED 手电筒电路设计详解
  • ROS2 SHM 零拷贝 40~50μs 完整延迟拆解
  • 猫抓浏览器扩展:免费强大的资源嗅探工具使用完全指南
  • 大屏数字人智能交互新方案:语音通话问答 + 一键调取后台数据,重塑线下大屏数字化体验
  • 智能运维2.0:从范式跃迁到落地实操——理论框架与实施指南
  • 六种扩散模型控制技术实战指南:从提示词到潜空间操作
  • Ashby 一体化解决方案:助力不同规模企业招聘,多维度资源对比与支持服务全揭秘
  • 个人开发小程序与公司开发:哪种方式更适合你?
  • 客户问我:AI搜索来了,网站还能活多久?
  • 5分钟实战指南:使用zteOnu高效获取中兴光猫超级管理员权限
  • 【Springboot毕设全套源码+文档】基于SpringBoot和Vue的机票预定系统的设计与实现(丰富项目+远程调试+讲解+定制)
  • 推文情绪分析实战:用RoBERTa做机器学习情感识别
  • 专业的花箱护栏制造企业
  • 物联网与可穿戴设备在慢性病远程监护中的系统架构与工程实践
  • 如何灵活设置公式中各个部分的颜色?
  • AI幻觉的本质:不是Bug而是理性选择
  • 论文省心了!高效论文写作全流程一键生成论文工具推荐(2026 最新)
  • MitoHiFi:三步搞定PacBio HiFi数据的线粒体基因组组装
  • 【课程设计/毕业设计】基于 LSTM 学习评估的 Django 线上考试管理系统设计与实现 面向智能测评的 Django+LSTM 在线考试系统设计与实现【附源码、数据库、万字文档】
  • 和利时LK271 PROFINET 主站通信模块使用方法
  • 计算机Python毕设实战-基于 Echarts+Python 的图书进销存监测管理系统设计与实现 基于 Echarts+Python 的图书零【完整源码+LW+部署说明+演示视频,全bao一条龙等】
  • 如何轻松搭建自己的离线翻译服务器:LibreTranslate完全指南