当前位置: 首页 > news >正文

Flink知识点(一)|Flink中的双流关联

在FLink中,经常会遇到双流关联的的场景,比如订单流和支付流的关联,用户行为流和用户信息流等。那么,在Flink中都提供了哪些双流的关联呢?下面主要从DataStream和Flink SQL角度入手说明。


DataStream

Window Join(窗口Join)

两条流在同一个时间窗口内进行Join,窗口结束时出触发计算。

// 设置10秒的窗口期stream1.join(stream2).where(r->r.id).equalTo(r->r.id).window(TumblingEventTimeWindows.of(Time.seconds(10))).apply((e1,e2)->e1+" "+e2);

Window Join的特点

  • 只能做 inner join
  • 两条流的数据必须落在同一个窗口内才能匹配
  • 支持 Tumbling / Sliding / Session Window

Window Join只能做 inner join,join不上的数据会直接丢弃,没有机制输出到侧输出流,所以谨慎使用。


Interval Join(区间Join)

基于事件时间,允许一条流的数据在另一条流数据的时间范围内进行匹配。

// stream1流的一个记录可以在stream2前后5秒的这个区间匹配stream1.keyBy(r->r.id).intervalJoin(stream2.keyBy(r->r.id)).between(Time.seconds(-5),Time.seconds(5)).process(newProcessJoinFunction<>(){@OverridepublicvoidprocessElement(Leftl,Rightr,Contextctx,Collector<String>out){out.collect(l+" "+r);}});

Interval Join的特点:

  • 只支持 inner join
  • 只支持 event time
  • 比 Window Join 更灵活,适合时间偏差场景

同样,Interval Join只能做 inner join,join不上的数据会直接丢弃,没有机制输出到侧输出流,所以谨慎使用。


CoProcessFunction(底层 API)

CoprocessFunction是最灵活的方式,可以实现inner / left / right / full outer join,以及各种自定义逻辑。

stream1.keyBy(r->r.id).connect(stream2.keyBy(r->r.id)).process(newCoProcessFunction<Left,Right,String>(){// 用 State 缓存两侧数据,自行实现匹配逻辑MapState<String,Left>leftState;MapState<String,Right>rightState;@OverridepublicvoidprocessElement1(Leftl,Contextctx,Collector<String>out){// 查 rightState,匹配则输出,否则存入 leftState}@OverridepublicvoidprocessElement2(Rightr,Contextctx,Collector<String>out){// 查 leftState,匹配则输出,否则存入 rightState}});

CoProcessFunction的特点:

  • 完全自定义,支持所有 join 类型
  • 需要手动管理 State 和 TTL,防止状态无限增长
  • 适合复杂业务逻辑

使用CoProcessFunction实现Left Join

下面使用CoProcessFunction实现Left Join。

publicclassLeftJoinExample{// 左流数据@Data@AllArgsConstructorpublicstaticclassOrder{publicStringorderId;publicStringuserId;publiclongtimestamp;}// 右流数据@Data@AllArgsConstructorpublicstaticclassPayment{publicStringorderId;publicdoubleamount;publiclongtimestamp;}// 输出结果@Data@AllArgsConstructorpublicstaticclassJoinResult{publicStringorderId;publicStringuserId;publicDoubleamount;// 右流未匹配时为 null}publicstaticclassLeftJoinFunctionextendsCoProcessFunction<Order,Payment,JoinResult>{// 未匹配的左流数据privateMapState<String,Order>orderState;// 右流数据缓存privateMapState<String,Payment>paymentState;// 等待匹配的超时时间privatestaticfinallongJOIN_TIMEOUT=10_000L;@Overridepublicvoidopen(Configurationparameters){orderState=getRuntimeContext().getMapState(newMapStateDescriptor<>("order-state",String.class,Order.class));paymentState=getRuntimeContext().getMapState(newMapStateDescriptor<>("payment-state",String.class,Payment.class));}@OverridepublicvoidprocessElement1(Orderorder,Contextctx,Collector<JoinResult>out)throwsException{Paymentpayment=paymentState.get(order.orderId);if(payment!=null){// 右流已有数据,直接 join 输出out.collect(newJoinResult(order.orderId,order.userId,payment.amount));paymentState.remove(order.orderId);}else{// 右流暂无数据,缓存左流,注册超时定时器orderState.put(order.orderId,order);ctx.timerService().registerEventTimeTimer(order.timestamp+JOIN_TIMEOUT);}}@OverridepublicvoidprocessElement2(Paymentpayment,Contextctx,Collector<JoinResult>out)throwsException{Orderorder=orderState.get(payment.orderId);if(order!=null){// 左流已有数据,直接 join 输出out.collect(newJoinResult(order.orderId,order.userId,payment.amount));orderState.remove(payment.orderId);}else{// 左流暂无数据,缓存右流等待paymentState.put(payment.orderId,payment);}}@OverridepublicvoidonTimer(longtimestamp,OnTimerContextctx,Collector<JoinResult>out)throwsException{// 超时触发,左流数据仍未匹配,输出 null 右侧(left join 语义)for(Map.Entry<String,Order>entry:orderState.entries()){Orderorder=entry.getValue();if(order.timestamp+JOIN_TIMEOUT<=timestamp){out.collect(newJoinResult(order.orderId,order.userId,null));orderState.remove(entry.getKey());}}}}publicstaticvoidmain(String[]args)throwsException{StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();DataStream<Order>orderStream=env.fromElements(newOrder("o1","u1",1000L),newOrder("o2","u2",2000L)).assignTimestampsAndWatermarks(WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner((e,t)->e.timestamp));DataStream<Payment>paymentStream=env.fromElements(newPayment("o1",99.9,3000L)// o2 没有对应支付,left join 应输出 null).assignTimestampsAndWatermarks(WatermarkStrategy.<Payment>forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner((e,t)->e.timestamp));DataStream<JoinResult>result=orderStream.keyBy(o->o.orderId).connect(paymentStream.keyBy(p->p.orderId)).process(newLeftJoinFunction());result.print();env.execute();}}

上面是实现Left Join的简单Demo,在生产环境中建议给状态设置TTL,并使用RocksDB state backend。


选型

场景推荐方式
简单等值 join,时间对齐Window Join
有时间偏差的 inner joinInterval Join
需要 outer join 或复杂逻辑CoProcessFunction

Flink SQL

如果使用Table API或者SQL,支持更丰富的语义:

类型说明
Regular Join全量 join,状态持续保留
Interval Join基于时间区间
Temporal Join流与版本表/维表 join
Lookup Join流关联外部维表(如 MySQL、HBase)

Regular Join

全量 join,两侧数据都会永久保留在 state 中,适合数据量不大的场景。

-- inner joinSELECTo.order_id,p.amountFROMorders oJOINpayments pONo.order_id=p.order_id-- left joinSELECTo.order_id,p.amountFROMorders oLEFTJOINpayments pONo.order_id=p.order_id-- full outer joinSELECTo.order_id,p.amountFROMorders oFULLOUTERJOINpayments pONo.order_id=p.order_id

Regular Join的特点:

  • state 无限增长,生产环境需要配置 state TTL:
table.exec.state.ttl:86400000# 1天,单位毫秒

Interval Join

基于事件时间区间的 join,state 可以自动清理,比 Regular Join 更适合生产。

SELECTo.order_id,p.amountFROMorders o,payments pWHEREo.order_id=p.order_idANDp.pay_timeBETWEENo.order_time-INTERVAL'5'MINUTEANDo.order_time+INTERVAL'10'MINUTE

Interval Join的特点:

  • 只支持 inner join
  • 两张表都必须有事件时间属性
  • 区间范围要合理,太大会导致 state 积压

Temporal Join(时态表Join)

流关联一张有版本概念的表,获取事件发生时刻对应的快照数据,典型场景是汇率、价格表。

-- 定义版本表(有主键 + 事件时间)CREATETABLEexchange_rates(currency STRING,rateDOUBLE,update_timeTIMESTAMP(3),WATERMARKFORupdate_timeASupdate_time-INTERVAL'5'SECOND,PRIMARYKEY(currency)NOTENFORCED)WITH(...);-- 流关联版本表,取事件发生时的汇率SELECTo.order_id,o.amount*r.rateASamount_usdFROMorders oLEFTJOINexchange_ratesFORSYSTEM_TIMEASOFo.order_timeASrONo.currency=r.currency

Temporal Join的特点:

  • 获取的是事件时间对应的历史快照,不是最新值
  • state 可以自动清理

Lookup Join

流关联外部维表(MySQL、HBase、Redis 等),实时查询外部系统补充字段。

-- 定义外部维表CREATETABLEuser_info(user_id STRING,user_name STRING,ageINT,PRIMARYKEY(user_id)NOTENFORCED)WITH('connector'='jdbc','url'='jdbc:mysql://localhost:3306/db','table-name'='user_info');-- 使用处理时间做 lookup joinSELECTo.order_id,u.user_name,o.amountFROMorders oLEFTJOINuser_infoFORSYSTEM_TIMEASOFo.proc_timeASuONo.user_id=u.user_id

Lookup Join的特点:

  • 只支持处理时间(Processing Time)
  • 每条左流数据触发一次外部查询

如果不想每条左流数据触发一次外部查询,可以使用缓存机制,但对于变动频繁的维表不合适。

CREATETABLEuser_info(user_id STRING,user_name STRING)WITH('connector'='jdbc','url'='jdbc:mysql://localhost:3306/db','table-name'='user_info',-- 开启缓存'lookup.cache'='PARTIAL',-- 最大缓存行数'lookup.partial-cache.max-rows'='10000',-- 写入后过期时间'lookup.partial-cache.expire-after-write'='60s',-- 读取后过期时间'lookup.partial-cache.expire-after-access'='30s',-- 是否缓存空值'lookup.partial-cache.cache-missing-key'='true');

Window Join

基于窗口的 join,窗口内数据匹配,窗口结束后 state 自动清理。

SELECTo.order_id,p.amountFROMTABLE(TUMBLE(TABLEorders,DESCRIPTOR(order_time),INTERVAL'10'MINUTES))oJOINTABLE(TUMBLE(TABLEpayments,DESCRIPTOR(pay_time),INTERVAL'10'MINUTES))pONo.order_id=p.order_idANDo.window_start=p.window_startANDo.window_end=p.window_end

Window Join的特点:

  • 支持 Tumbling / Hopping / Cumulate 三种窗口。
  • 只支持 inner join。

选型

类型outer joinstate 自动清理适用场景
Regular Join数据量小,逻辑简单
Interval Join有时间关联的双流
Temporal Join关联历史版本快照
Lookup Join关联外部维表
Window Join窗口内聚合后关联

适用场景

1. 订单 + 支付流
订单产生后,等待支付事件到来进行关联,判断是否支付成功。时间有偏差,适合 Interval Join 或 CoProcessFunction。

2. 行为流 + 用户信息流
用户点击/浏览行为流,关联用户画像流做实时enrichment。用户信息变化不频繁,适合 Temporal Join 或 Lookup Join。

3. 广告曝光 + 点击流
曝光事件和点击事件分属两条流,需要关联计算点击率。时间窗口内匹配,适合 Window Join 或 Interval Join。

4. 日志流 + 告警规则流
实时日志流关联动态告警规则流,规则会实时更新。适合 CoProcessFunction,规则侧用 broadcast state。

5. 交易流 + 风控事件流
交易发生后,关联风控系统产生的风险事件,判断是否拦截。对延迟敏感,适合 Interval Join。

6. 多源数据合并
同一业务实体的数据来自不同系统(如 MySQL binlog + Kafka 业务流),需要合并成完整记录。适合 CoProcessFunction 自定义 state 管理。

7. 实时对账
银行/支付场景,两条流分别来自不同系统的流水记录,需要实时核对是否一致。适合 CoProcessFunction 实现 full outer join。

http://www.jsqmd.com/news/494833/

相关文章:

  • TCL发布会解析:Q9M Pro领衔,T7M系列双星登场,163吋Micro LED双曜压轴
  • 森林防火系统早期烟雾识别的误报率控制:面向测试工程师的实战指南
  • 【LLM基础】6. LLM 推理时的温度值、top_p、top_k等采样算法原理
  • 『NAS』将NAS变成单词收割机-QwertyLearner
  • 基于深度学习的表格识别技术:通过多模态预处理、神经网络分析和高精度OCR识别,实现复杂银行流水的自动化解析
  • 【第10篇】Mamba 100篇合集 · 从入门到天花板
  • 少走弯路:10个降AI率网站开源免费测评与推荐
  • java面试题总结2
  • LeetCode 1727.重新排列后的最大子矩阵:枚举矩形底边是哪一行 + 排序
  • 2026年塑料瓶粉碎机厂家实力榜TOP3,谁是行业领头羊?
  • 2026年主流论文降AI率工具实测:亲测有效的神器全在这
  • Windows系统漏洞MS17-010全解析
  • 一次签名毁掉数亿美元,深度拆解DeFi历史级漏洞
  • geocode.com.cn:经纬度查询省市县乡街道的地理编码服务
  • 花2千块法人号码核验百万条号码,结果一半是空号”:B端拓客的核验陷阱,该到头了,终于找到了个便宜的法人号码核验就是氪迹科技
  • 7-2 然后是几点
  • 2026年AI编程实战:如何用Gemini 3.1 Pro与国内镜像站提升开发效率
  • 2026年知网AIGC检测4.0升级了什么?这样降AI才有效
  • 做立辉物性表学到的word技巧
  • RabbitMQ在大数据领域的应用场景全解析
  • Linux命令 date详解
  • 推荐一款免费数据库监控诊断工具!AI智能诊断优化,20+数据库一站式支持
  • Spring Boot 3.5正式普及!Java虚拟线程+GraalVM原生镜像,启动仅0.3秒
  • 软件运营管理化的日常活动执行
  • 即兴喜剧AI测试:机器学习“现挂”的意外笑点
  • 读写锁基本概念
  • 【MinerU】技术深度解析:开源PDF文档智能提取的利器
  • Go JSON 序列化性能对比与优化
  • 救命神器! 全场景通用降AI率平台 千笔·降AIGC助手 VS PaperRed
  • 百度文库免vip下载文档_百度文库vip兑换码