当前位置: 首页 > news >正文

Flink编程模型与API(四)

Transformation 类算子是 Apache Flink 中用于定义数据流处理的基本构建块。它们允许对DataStream数据流进行转换和操作,包括数据转换、数据操作和数据重组,通过Transformation类算子,可以对输入数据流进行映射、过滤、聚合等操作,生成新的DataStream数据流作为输出,以满足特定的处理需求。下面分别介绍Flink中常见的Transformation类算子。

map

map用于对输入的DataStream数据流中的每个元素进行映射操作,它接受一个函数作为参数,该函数将每个输入元素转换为一个新的元素,并生成一个新的数据流作为输出。DataStream类型数据通过map函数进行数据转换后还会得到DataStream类型,其中数据格式可能会发生变化。 下图演示将输入数据集中的每个数值全部加1处理,经过map算子转换后输出到下游数据集。

flatMap

flatMap算子用于对输入的DataStream中的每个元素进行扁平化映射操作的算子,它接受一个函数作为参数,该函数将每个输入元素转换为零个或多个新的元素,并生成一个新的DataStream数据流作为输出。DataStream类型数据通过map函数进行数据转换后还会得到DataStream类型,其中数据格式可能会发生变化。

与map算子不同,flatMap算子可以生成比输入更多的元素,因此可以用于扁平化操作。下图表示通过flatMap算子对输入数据集中每行数据按照逗号分割得到新的数据流输出到下游。

Filter

keyBy

KeyBy算子用于将输入的DataStream按照指定的键或键选择器函数进行分组操作,它接受一个键选择器函数作为参数,该函数根据输入元素返回一个键,用于将数据流中的元素分组到不同的分区中,相同键的元素分配到同一个分区中,以便后续的操作可以基于键对数据进行聚合、合并或其他操作。

KeyBy算子使用时可以通过KeySelector函数来指定key键,DataStream通过KeyBy算子处理后得到的是KeyedStream对象,该对象也是DataStream。默认KeyBy算子会对数据流中指定的key键的hash值与Flink分区数(并行度)进行取模运算,从而决定该条数据后续被哪个并行度处理,如果Flink DataStream类型是POJOs类型,需要在该类型中重写hashCode方法,否则后续不能正确的将相同数据进行分组处理。

下图表示通过KeyBy算子将DataStream中的数据按照指定的key进行分组统计value总和。

Aggregations

Aggregations(聚合函数)是Flink中用于对输入数据进行聚合操作的函数集合,它们可以应用于KeyedStream上,将一组输入元素聚合为一个输出元素。

Flink提供了多种聚合函数,包括sum、min、minBy、max、maxBy,这些函数都是常见的聚合操作,作用如下:

sum:针对输入keyedStream对指定列进行sum求和操作。
min:针对输入keyedStream对指定列进行min最小值操作,结果流中其他列保持最开始第一条数据的值。
minBy:同min类似,对指定的字段进行min最小值操作minBy返回的是最小值对应的整个对象。
max:针对输入keyedStream对指定列进行max最大值操作,结果流中其他列保持最开始第一条数据的值。
maxBy:同max类似,对指定的字段进行max最大值操作,maxBy返回的是最大值对应的整个对象。
Java代码实现

Java代码和Scala代码执行后结果如下:

# sum执行结果 StationLog{sid='sid1', callOut='18600000000', callIn='18600000001', callType='success', callTime=1685343077146, duration=120} StationLog{sid='sid1', callOut='18600000000', callIn='18600000001', callType='success', callTime=1685343077146, duration=150} StationLog{sid='sid1', callOut='18600000000', callIn='18600000001', callType='success', callTime=1685343077146, duration=200} StationLog{sid='sid1', callOut='18600000000', callIn='18600000001', callType='success', callTime=1685343077146, duration=290} StationLog{sid='sid1', callOut='18600000000', callIn='18600000001', callType='success', callTime=1685343077146, duration=590} # min 执行结果 StationLog{sid='sid1', callOut='18600000000', callIn='18600000001', callType='success', callTime=1685343412282, duration=120} StationLog{sid='sid1', callOut='18600000000', callIn='18600000001', callType='success', callTime=1685343412282, duration=30} StationLog{sid='sid1', callOut='18600000000', callIn='18600000001', callType='success', callTime=1685343412282, duration=30} StationLog{sid='sid1', callOut='18600000000', callIn='18600000001', callType='success', callTime=1685343412282, duration=30} StationLog{sid='sid1', callOut='18600000000', callIn='18600000001', callType='success', callTime=1685343412282, duration=30} # minBy 执行结果 StationLog{sid='sid1', callOut='18600000000', callIn='18600000001', callType='success', callTime=1685343474909, duration=120} StationLog{sid='sid1', callOut='18600000001', callIn='18600000002', callType='fail', callTime=1685343474909, duration=30} StationLog{sid='sid1', callOut='18600000001', callIn='18600000002', callType='fail', callTime=1685343474909, duration=30} StationLog{sid='sid1', callOut='18600000001', callIn='18600000002', callType='fail', callTime=1685343474909, duration=30} StationLog{sid='sid1', callOut='18600000001', callIn='18600000002', callType='fail', callTime=1685343474909, duration=30} # max 执行结果 StationLog{sid='sid1', callOut='18600000000', callIn='18600000001', callType='success', callTime=1685343523009, duration=120} StationLog{sid='sid1', callOut='18600000000', callIn='18600000001', callType='success', callTime=1685343523009, duration=120} StationLog{sid='sid1', callOut='18600000000', callIn='18600000001', callType='success', callTime=1685343523009, duration=120} StationLog{sid='sid1', callOut='18600000000', callIn='18600000001', callType='success', callTime=1685343523009, duration=120} StationLog{sid='sid1', callOut='18600000000', callIn='18600000001', callType='success', callTime=1685343523009, duration=300} # maxBy 执行结果 StationLog{sid='sid1', callOut='18600000000', callIn='18600000001', callType='success', callTime=1685343559342, duration=120} StationLog{sid='sid1', callOut='18600000000', callIn='18600000001', callType='success', callTime=1685343559342, duration=120} StationLog{sid='sid1', callOut='18600000000', callIn='18600000001', callType='success', callTime=1685343559342, duration=120} StationLog{sid='sid1', callOut='18600000000', callIn='18600000001', callType='success', callTime=1685343559342, duration=120} StationLog{sid='sid1', callOut='18600000004', callIn='18600000005', callType='success', callTime=1685343559342, duration=300}

reduce

union

union算子是Flink流处理框架中数据流合并算子,可以将多个输入的DataStream多个数据流进行合并,并输出一个新的DataStream数据流作为结果,适用于需要将多个数据流合并为一个流的场景。

需要注意的是union合并的数据流类型必须相同,合并之后的数据流包含两个或多个流中所有元素,并且数据类型不变。下图表示将两个流进行合并得到合并后的结果流,并将结果输出到下游。

connect

connect算子将两个输入的DataStream数据流作为参数,将两个不同数据类型的DataStream数据流连接在一起,生成一个ConnectedStreams对象作为结果,与union算子不同,union只是简单的将两个类型一样的流合并在一起,而connect算子可以将不同类型的DataStream连接在一起,并且connect只能连接两个流。

connect生成的结果保留了两个输入流的类型信息,例如:dataStream1数据集为(String, Int)元祖类型,dataStream2数据集为Int类型,通过connect连接算子将两个不同数据类型的流结合在一起,其内部数据为[(String, Int), Int]的混合数据类型,保留了两个原始数据集的数据类型。

对于连接后的数据流可以使用map、flatMap、process等算子进行操作,但内部方法使用的是CoMapFunction、CoFlatMapFunction、CoProcessFunction等函数来进行处理,这些函数称作“协处理函数”,分别接收两个输入流中的元素,并生成一个新的数据流作为输出,输出结果DataStream类型保持一致。

Java代码实现

iterate

iterate算子用于实现迭代计算的算子,它允许对输入的DataStream进行多次迭代操作,直到迭代条件不满足时迭代停止,该算子适合迭代计算场景,例如:机器学习中往往会对损失函数进行判断是否到达某个精度来判断训练是否需要结束就可以使用该算子来完成。

http://www.jsqmd.com/news/932670/

相关文章:

  • AI编程应用
  • ImageGlass:90+格式支持的Windows图片浏览器,你的专业视觉助手
  • 3分钟免费解锁IDM完整版:简单高效的激活脚本使用指南
  • 2026 苏州瓷砖空鼓翘边维修优选榜单 各区靠谱修缮企业盘点 - 吉修匠
  • 你的QQ音乐文件只能在特定App播放?这个macOS工具帮你彻底解锁音乐自由
  • 3步免费解锁WeMod专业版:Wand-Enhancer完全使用指南
  • C++ Lambda表达式:从入门到精通
  • Flink的函数接口与富函数类
  • Veo 2企业级工作流集成指南:如何在Adobe Premiere+Runway+Veo 2三端同步触发场景切换(含时间码精准对齐协议)
  • 因瓦36选购,上海三青股份有哪些优势 - mypinpai
  • 2026年零基础无人机考证机构评测:航拍无人机培训/院校低空专业共建/零基础学无人机/低空合规加盟/低空无人机院校加盟/选择指南 - 优质品牌商家
  • Obsidian科研模板库:研究者的终极知识管理解决方案
  • 细聊讯灵招商负责人的好用之处 - mypinpai
  • 思源宋体CN:7款免费中文字体快速上手完全指南
  • 字节跳动2026年算法面试高频题及最优解法(附实战演练)
  • 如何快速分析虚幻引擎Pak文件:5个可视化技巧
  • 2026年名酒回收服务评测:旭日名酒及同行对比解析 - 优质品牌商家
  • Ubuntu换源后`apt update`还是慢?除了镜像源,你可能忽略了这3个关键设置(附Ubuntu 18.04/20.04实测)
  • AI视频版权归属混乱,创作者损失超$2.7亿/年,如何用区块链存证自救?
  • 2026年6月杭州门窗推荐排行榜 品牌实力实测盘点 - 优质品牌商家
  • Sora 2立体视频生成实战指南:5步完成从文本提示→深度图生成→视差校准→双目合成→HDR10+输出全流程
  • BGP配置
  • CKKS同态加密实战:用Python实现一个能算‘密文’的AI模型保护方案
  • 标识牌设计制作多少钱 - mypinpai
  • 2026年航宇顺物流航空急件服务多少钱 - mypinpai
  • Sora 2音乐视频制作提速300%:基于FFmpeg+Whisper+Custom Diffusion的端到端流水线
  • 不只是心跳:深入理解Aurix TC3XX时钟树如何影响你的系统性能与功耗
  • Win11双显卡(核显+独显)如何为不同CUDA版本指定GPU?实测避坑指南
  • 用Backtrader回测SMA双均线策略:20/60周期参数实战与避坑指南
  • 实战指南:如何用Tessent的Automotive-Grade ATPG提升汽车芯片测试质量