Flink编程模型与API(四)
Transformation 类算子是 Apache Flink 中用于定义数据流处理的基本构建块。它们允许对DataStream数据流进行转换和操作,包括数据转换、数据操作和数据重组,通过Transformation类算子,可以对输入数据流进行映射、过滤、聚合等操作,生成新的DataStream数据流作为输出,以满足特定的处理需求。下面分别介绍Flink中常见的Transformation类算子。
map
map用于对输入的DataStream数据流中的每个元素进行映射操作,它接受一个函数作为参数,该函数将每个输入元素转换为一个新的元素,并生成一个新的数据流作为输出。DataStream类型数据通过map函数进行数据转换后还会得到DataStream类型,其中数据格式可能会发生变化。 下图演示将输入数据集中的每个数值全部加1处理,经过map算子转换后输出到下游数据集。
flatMap
flatMap算子用于对输入的DataStream中的每个元素进行扁平化映射操作的算子,它接受一个函数作为参数,该函数将每个输入元素转换为零个或多个新的元素,并生成一个新的DataStream数据流作为输出。DataStream类型数据通过map函数进行数据转换后还会得到DataStream类型,其中数据格式可能会发生变化。
与map算子不同,flatMap算子可以生成比输入更多的元素,因此可以用于扁平化操作。下图表示通过flatMap算子对输入数据集中每行数据按照逗号分割得到新的数据流输出到下游。
Filter
keyBy
KeyBy算子用于将输入的DataStream按照指定的键或键选择器函数进行分组操作,它接受一个键选择器函数作为参数,该函数根据输入元素返回一个键,用于将数据流中的元素分组到不同的分区中,相同键的元素分配到同一个分区中,以便后续的操作可以基于键对数据进行聚合、合并或其他操作。
KeyBy算子使用时可以通过KeySelector函数来指定key键,DataStream通过KeyBy算子处理后得到的是KeyedStream对象,该对象也是DataStream。默认KeyBy算子会对数据流中指定的key键的hash值与Flink分区数(并行度)进行取模运算,从而决定该条数据后续被哪个并行度处理,如果Flink DataStream类型是POJOs类型,需要在该类型中重写hashCode方法,否则后续不能正确的将相同数据进行分组处理。
下图表示通过KeyBy算子将DataStream中的数据按照指定的key进行分组统计value总和。
Aggregations
Aggregations(聚合函数)是Flink中用于对输入数据进行聚合操作的函数集合,它们可以应用于KeyedStream上,将一组输入元素聚合为一个输出元素。
Flink提供了多种聚合函数,包括sum、min、minBy、max、maxBy,这些函数都是常见的聚合操作,作用如下:
sum:针对输入keyedStream对指定列进行sum求和操作。
min:针对输入keyedStream对指定列进行min最小值操作,结果流中其他列保持最开始第一条数据的值。
minBy:同min类似,对指定的字段进行min最小值操作minBy返回的是最小值对应的整个对象。
max:针对输入keyedStream对指定列进行max最大值操作,结果流中其他列保持最开始第一条数据的值。
maxBy:同max类似,对指定的字段进行max最大值操作,maxBy返回的是最大值对应的整个对象。
Java代码实现
Java代码和Scala代码执行后结果如下:
# sum执行结果 StationLog{sid='sid1', callOut='18600000000', callIn='18600000001', callType='success', callTime=1685343077146, duration=120} StationLog{sid='sid1', callOut='18600000000', callIn='18600000001', callType='success', callTime=1685343077146, duration=150} StationLog{sid='sid1', callOut='18600000000', callIn='18600000001', callType='success', callTime=1685343077146, duration=200} StationLog{sid='sid1', callOut='18600000000', callIn='18600000001', callType='success', callTime=1685343077146, duration=290} StationLog{sid='sid1', callOut='18600000000', callIn='18600000001', callType='success', callTime=1685343077146, duration=590} # min 执行结果 StationLog{sid='sid1', callOut='18600000000', callIn='18600000001', callType='success', callTime=1685343412282, duration=120} StationLog{sid='sid1', callOut='18600000000', callIn='18600000001', callType='success', callTime=1685343412282, duration=30} StationLog{sid='sid1', callOut='18600000000', callIn='18600000001', callType='success', callTime=1685343412282, duration=30} StationLog{sid='sid1', callOut='18600000000', callIn='18600000001', callType='success', callTime=1685343412282, duration=30} StationLog{sid='sid1', callOut='18600000000', callIn='18600000001', callType='success', callTime=1685343412282, duration=30} # minBy 执行结果 StationLog{sid='sid1', callOut='18600000000', callIn='18600000001', callType='success', callTime=1685343474909, duration=120} StationLog{sid='sid1', callOut='18600000001', callIn='18600000002', callType='fail', callTime=1685343474909, duration=30} StationLog{sid='sid1', callOut='18600000001', callIn='18600000002', callType='fail', callTime=1685343474909, duration=30} StationLog{sid='sid1', callOut='18600000001', callIn='18600000002', callType='fail', callTime=1685343474909, duration=30} StationLog{sid='sid1', callOut='18600000001', callIn='18600000002', callType='fail', callTime=1685343474909, duration=30} # max 执行结果 StationLog{sid='sid1', callOut='18600000000', callIn='18600000001', callType='success', callTime=1685343523009, duration=120} StationLog{sid='sid1', callOut='18600000000', callIn='18600000001', callType='success', callTime=1685343523009, duration=120} StationLog{sid='sid1', callOut='18600000000', callIn='18600000001', callType='success', callTime=1685343523009, duration=120} StationLog{sid='sid1', callOut='18600000000', callIn='18600000001', callType='success', callTime=1685343523009, duration=120} StationLog{sid='sid1', callOut='18600000000', callIn='18600000001', callType='success', callTime=1685343523009, duration=300} # maxBy 执行结果 StationLog{sid='sid1', callOut='18600000000', callIn='18600000001', callType='success', callTime=1685343559342, duration=120} StationLog{sid='sid1', callOut='18600000000', callIn='18600000001', callType='success', callTime=1685343559342, duration=120} StationLog{sid='sid1', callOut='18600000000', callIn='18600000001', callType='success', callTime=1685343559342, duration=120} StationLog{sid='sid1', callOut='18600000000', callIn='18600000001', callType='success', callTime=1685343559342, duration=120} StationLog{sid='sid1', callOut='18600000004', callIn='18600000005', callType='success', callTime=1685343559342, duration=300}reduce
union
union算子是Flink流处理框架中数据流合并算子,可以将多个输入的DataStream多个数据流进行合并,并输出一个新的DataStream数据流作为结果,适用于需要将多个数据流合并为一个流的场景。
需要注意的是union合并的数据流类型必须相同,合并之后的数据流包含两个或多个流中所有元素,并且数据类型不变。下图表示将两个流进行合并得到合并后的结果流,并将结果输出到下游。
connect
connect算子将两个输入的DataStream数据流作为参数,将两个不同数据类型的DataStream数据流连接在一起,生成一个ConnectedStreams对象作为结果,与union算子不同,union只是简单的将两个类型一样的流合并在一起,而connect算子可以将不同类型的DataStream连接在一起,并且connect只能连接两个流。
connect生成的结果保留了两个输入流的类型信息,例如:dataStream1数据集为(String, Int)元祖类型,dataStream2数据集为Int类型,通过connect连接算子将两个不同数据类型的流结合在一起,其内部数据为[(String, Int), Int]的混合数据类型,保留了两个原始数据集的数据类型。
对于连接后的数据流可以使用map、flatMap、process等算子进行操作,但内部方法使用的是CoMapFunction、CoFlatMapFunction、CoProcessFunction等函数来进行处理,这些函数称作“协处理函数”,分别接收两个输入流中的元素,并生成一个新的数据流作为输出,输出结果DataStream类型保持一致。
Java代码实现
iterate
iterate算子用于实现迭代计算的算子,它允许对输入的DataStream进行多次迭代操作,直到迭代条件不满足时迭代停止,该算子适合迭代计算场景,例如:机器学习中往往会对损失函数进行判断是否到达某个精度来判断训练是否需要结束就可以使用该算子来完成。
