当前位置: 首页 > news >正文

Flink源码阅读:双流操作

Window Join

我们先回顾一下 window join 的使用方法。

DataStream<Tuple2<String, Double>> result = source1.join(source2) .where(record -> record.f0) .equalTo(record -> record.f0) .window(TumblingEventTimeWindows.of(Time.seconds(2L))) .apply(new JoinFunction<Tuple2<String, Double>, Tuple2<String, Double>, Tuple2<String, Double>>() { @Override public Tuple2<String, Double> join(Tuple2<String, Double> record1, Tuple2<String, Double> record2) throws Exception { return Tuple2.of(record1.f0, record1.f1); } });

上述调用链路类的流转如下:

在 WithWindow 的 apply 方法中,是构建了一个 coGroupedWindowedStream,然后调用它的 apply 方法。

public <T> SingleOutputStreamOperator<T> apply( JoinFunction<T1, T2, T> function, TypeInformation<T> resultType) { // clean the closure function = input1.getExecutionEnvironment().clean(function); coGroupedWindowedStream = input1.coGroup(input2) .where(keySelector1) .equalTo(keySelector2) .window(windowAssigner) .trigger(trigger) .evictor(evictor) .allowedLateness(allowedLateness); return coGroupedWindowedStream.apply(new JoinCoGroupFunction<>(function), resultType); }

这里可以看出,Window Join 的底层是转换成 coGroup 进行处理的。

在 JoinCoGroupFunction 中,coGroup 方法就是对两个流进行两层遍历,然后将其应用到我们自定义的 JoinFunction 上。

private static class JoinCoGroupFunction<T1, T2, T> extends WrappingFunction<JoinFunction<T1, T2, T>> implements CoGroupFunction<T1, T2, T> { private static final long serialVersionUID = 1L; public JoinCoGroupFunction(JoinFunction<T1, T2, T> wrappedFunction) { super(wrappedFunction); } @Override public void coGroup(Iterable<T1> first, Iterable<T2> second, Collector<T> out) throws Exception { for (T1 val1 : first) { for (T2 val2 : second) { out.collect(wrappedFunction.join(val1, val2)); } } } }

CoGroup

CoGroup 的整体用法和流程与 Join 都类似,我们就不逐个介绍了。我们直接来看 apply 方法。

public <T> SingleOutputStreamOperator<T> apply( CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) { // clean the closure function = input1.getExecutionEnvironment().clean(function); UnionTypeInfo<T1, T2> unionType = new UnionTypeInfo<>(input1.getType(), input2.getType()); UnionKeySelector<T1, T2, KEY> unionKeySelector = new UnionKeySelector<>(keySelector1, keySelector2); SingleOutputStreamOperator<TaggedUnion<T1, T2>> taggedInput1 = input1.map(new Input1Tagger<T1, T2>()); taggedInput1.getTransformation().setParallelism(input1.getParallelism(), false); taggedInput1.returns(unionType); SingleOutputStreamOperator<TaggedUnion<T1, T2>> taggedInput2 = input2.map(new Input2Tagger<T1, T2>()); taggedInput2.getTransformation().setParallelism(input2.getParallelism(), false); taggedInput2.returns(unionType); DataStream<TaggedUnion<T1, T2>> unionStream = taggedInput1.union(taggedInput2); // we explicitly create the keyed stream to manually pass the key type information in windowedStream = new KeyedStream<TaggedUnion<T1, T2>, KEY>( unionStream, unionKeySelector, keyType) .window(windowAssigner); if (trigger != null) { windowedStream.trigger(trigger); } if (evictor != null) { windowedStream.evictor(evictor); } if (allowedLateness != null) { windowedStream.allowedLateness(allowedLateness); } return windowedStream.apply( new CoGroupWindowFunction<T1, T2, T, KEY, W>(function), resultType); }

在 apply 方法中,先把两个流进行合并,然后创建了 windowedStream,并把窗口相关的属性设置好,最后是调用 windowedStream 的 apply 方法。

在调用windowedStream.apply方法时,又将 function 包装成了 CoGroupWindowFunction。

private static class CoGroupWindowFunction<T1, T2, T, KEY, W extends Window> extends WrappingFunction<CoGroupFunction<T1, T2, T>> implements WindowFunction<TaggedUnion<T1, T2>, T, KEY, W> { private static final long serialVersionUID = 1L; public CoGroupWindowFunction(CoGroupFunction<T1, T2, T> userFunction) { super(userFunction); } @Override public void apply(KEY key, W window, Iterable<TaggedUnion<T1, T2>> values, Collector<T> out) throws Exception { List<T1> oneValues = new ArrayList<>(); List<T2> twoValues = new ArrayList<>(); for (TaggedUnion<T1, T2> val : values) { if (val.isOne()) { oneValues.add(val.getOne()); } else { twoValues.add(val.getTwo()); } } wrappedFunction.coGroup(oneValues, twoValues, out); } }

在 CoGroupWindowFunction 的 apply 方法中是将主键为 key 的流分开两个流,再去调用 JoinCoGroupFunction 的 coGroup 方法。这里的 values 都是相同的 key,原因是在 window 中维护的 windowState,它内部是一个 stateTable,窗口的 namespace 和 key 共同维护一个 state,当窗口触发时,就会对相同 key 的数据调用 apply 方法。

Interval Join

梳理完了 Window Join 和 CoGroup 之后,我们再接着看 Interval Join。还是先来回顾一下用法。

DataStream<Tuple2<String, Double>> intervalJoinResult = source1.keyBy(record -> record.f0) .intervalJoin(source2.keyBy(record -> record.f0)) .between(Time.seconds(-2), Time.seconds(2)) .process(new ProcessJoinFunction<Tuple2<String, Double>, Tuple2<String, Double>, Tuple2<String, Double>>() { @Override public void processElement(Tuple2<String, Double> record1, Tuple2<String, Double> record2, ProcessJoinFunction<Tuple2<String, Double>, Tuple2<String, Double>, Tuple2<String, Double>>.Context context, Collector<Tuple2<String, Double>> out) throws Exception { out.collect(Tuple2.of(record1.f0, record1.f1 + record2.f1)); } });

通过用法可以看出,interval join 传入的对象是两个 KeyedStream,接着使用 between 方法定义 interval join 的上下边界,最后调用 process 方法执行计算逻辑。

在调用过程中,类型的转换如下图。

我们主要关注 process 的逻辑。

public <OUT> SingleOutputStreamOperator<OUT> process( ProcessJoinFunction<IN1, IN2, OUT> processJoinFunction, TypeInformation<OUT> outputType) { Preconditions.checkNotNull(processJoinFunction); Preconditions.checkNotNull(outputType); final ProcessJoinFunction<IN1, IN2, OUT> cleanedUdf = left.getExecutionEnvironment().clean(processJoinFunction); if (isEnableAsyncState) { final AsyncIntervalJoinOperator<KEY, IN1, IN2, OUT> operator = new AsyncIntervalJoinOperator<>( lowerBound, upperBound, lowerBoundInclusive, upperBoundInclusive, leftLateDataOutputTag, rightLateDataOutputTag, left.getType() .createSerializer( left.getExecutionConfig().getSerializerConfig()), right.getType() .createSerializer( right.getExecutionConfig().getSerializerConfig()), cleanedUdf); return left.connect(right) .keyBy(keySelector1, keySelector2) .transform("Interval Join [Async]", outputType, operator); } else { final IntervalJoinOperator<KEY, IN1, IN2, OUT> operator = new IntervalJoinOperator<>( lowerBound, upperBound, lowerBoundInclusive, upperBoundInclusive, leftLateDataOutputTag, rightLateDataOutputTag, left.getType() .createSerializer( left.getExecutionConfig().getSerializerConfig()), right.getType() .createSerializer( right.getExecutionConfig().getSerializerConfig()), cleanedUdf); return left.connect(right) .keyBy(keySelector1, keySelector2) .transform("Interval Join", outputType, operator); } }

Interval join 是基于 ConnectedStream 实现的,ConnectedStream 提供了更加通用的双流操作,它将两个流组合成一个 TwoInputTransformation,然后加入执行图中。

具体的 Operator 是 IntervalJoinOperator 或 AsyncIntervalJoinOperator,它们都是 TwoInputStreamOperator 的实现类,提供processElement1processElement2两个方法分别处理两个输入源的数据,最终都调用的是 processElement。

private <THIS, OTHER> void processElement( final StreamRecord<THIS> record, final MapState<Long, List<IntervalJoinOperator.BufferEntry<THIS>>> ourBuffer, final MapState<Long, List<IntervalJoinOperator.BufferEntry<OTHER>>> otherBuffer, final long relativeLowerBound, final long relativeUpperBound, final boolean isLeft) throws Exception { final THIS ourValue = record.getValue(); final long ourTimestamp = record.getTimestamp(); if (ourTimestamp == Long.MIN_VALUE) { throw new FlinkException( "Long.MIN_VALUE timestamp: Elements used in " + "interval stream joins need to have timestamps meaningful timestamps."); } if (isLate(ourTimestamp)) { sideOutput(ourValue, ourTimestamp, isLeft); return; } addToBuffer(ourBuffer, ourValue, ourTimestamp); for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket : otherBuffer.entries()) { final long timestamp = bucket.getKey(); if (timestamp < ourTimestamp + relativeLowerBound || timestamp > ourTimestamp + relativeUpperBound) { continue; } for (BufferEntry<OTHER> entry : bucket.getValue()) { if (isLeft) { collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp); } else { collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp); } } } long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp; if (isLeft) { internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime); } else { internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime); } }

在 IntervalJoinOperator 中维护了两个 MapState,每个消息进来的时候,都会加入到 MapState 中,key 是 timestamp,value 是一个元素的列表。然后遍历另一个 MapState,得到符合条件的数据。最后是为每条数据注册一个定时器,当时间超过有效范围后,会从 MapState 中清除这个时间戳的数据。

总结

本文我们梳理了 Flink 的三种双流操作的源码,我们了解到 Window Join 底层是通过 CoGroup 实现的。CoGroup 本身是将两个流合并成 WindowedStream 并依赖于 WindowState 进行数据 join。最后 Interval Join 是通过 ConnectedStreams 实现的,内部的 IntervalJoinOperator 会维护两个 MapState,通过 MapState 进行数据关联。

http://www.jsqmd.com/news/572975/

相关文章:

  • 深入理解 SQL 中的 DATEDIFF 函数
  • SDXL-Turbo参数详解:1步推理设置、CFG scale调优与英文提示词规范
  • OpenAirInterface项目解析 04 SSB实现
  • Step3-VL-10B-Base模型Python安装与环境变量配置详解
  • 用噪音打破听觉恐怖谷:RTE 开发者社区发布 RealNoise™ TTS:全球首个原生合成动态声场的语音大模型
  • 突破限制的完整方案:开源工具免费解锁Cursor Pro功能实战指南
  • 别再乱选ASCII/HEX了!野火串口调试助手发送接收区配置详解(附实战案例)
  • 实战演练:基于快马平台快速构建开yun架构的物联网监控系统
  • PlugY:暗黑破坏神2单机玩家的开源功能扩展工具
  • STM32智能门锁进阶:RC522 RFID模块SPI通讯与卡号鉴权实战
  • 如何在macOS和Linux上快速解除iOS 15-16设备的iCloud激活锁
  • 3步实现跨平台日历同步:从需求到落地
  • AI辅助技能提升:用快马生成智能代码审查工具,让AI成为你的编程导师
  • 支持400米深井测量与短信报警:地下水位监测站技术解析
  • S2-Pro模型推理服务高可用部署:基于Docker与Kubernetes的架构
  • 文章标题:基于三菱PLC的门禁系统设计与实施
  • 声纹识别的概念
  • OpenTelemetry Java Agent实战:5分钟为Spring Boot应用添加监控埋点
  • VS Code + Git + 阿里云效Codeup:三件套搞定团队协作,从配置到避坑一条龙
  • 提升NLP开发效率:基于快马平台快速生成定制化transformer文本分类项目
  • 千问3.5-2B部署实操手册:supervisor服务管理命令+端口监听+日志定位全解析
  • EcoVadis评估辅导选购指南:5大标准选对可持续发展伙伴 - 奋飞咨询ecovadis
  • LLD 自动发现场景 → 对应使用哪种探测方式(SNMP/HTTP/Agent)最优
  • AFSim仿真系统中的7大坐标系统详解:从世界坐标到天线坐标的完整指南
  • N_m3u8DL-CLI-SimpleG:M3U8视频下载终极指南,三步搞定在线视频
  • 探秘2026食品厂无尘车间:高效生产与卫生保障并存,净化车间/洁净车间/净化工程/无尘车间,无尘车间实力厂家怎么选购 - 品牌推荐师
  • 实战进阶:基于快马生成的代码,打造个人专属的Markdown笔记应用
  • 在Windows上解锁B站新体验:BiliBili-UWP客户端3分钟快速上手指南
  • 激光熔覆仿真:Ansys Workbench下的单层单道熔覆温度场仿真及误差率控制
  • MPV_PlayKit深度评测:老旧硬件的4K播放奇迹与跨平台解码方案