Flink任务提交与架构模型(五)
TaskSlot任务槽
提交到集群中的Flink程序最终会转换成一个个的Subtask,Subtask是Flink任务调度的基本单元,这些task最终被发送到不同的TaskManager节点上分布式执行,假设现在我们有一个TaskManager,一个Flink 任务有多个Subtask,这些Subtask能否正常在该TaskManager上启动?到底一个TaskManager上能同时执行多少个SubtasK?要了解这些内容就必须知道Flink中TaskSlot以及SlotSharingGroup(Slot共享组)相关内容。
TaskSlot任务槽
Flink集群中每个TaskManager是一个JVM进程,可以在TaskManagr中执行一个或者多个subtask,为了能控制一个TaskManager中接收多少个Task,TaskManager节点上可以提供taskslot(任务槽),一个TaskManager上可以划分多个taskslot,taskslot是Flink系统中资源调度的最小单元,可以对TaskManager上的资源进行明确划分,每个taskslot可以运行一个或者多个subtask,每个JobManager上至少有一个taskSlot。
每个taskSlot都有固定的资源,假设一个TaskManager有三个TaskSlots,那么每个TaskSlot会将TaskMananger中的内存均分,即每个任务槽的内存是总内存的1/3,分配资源意味着subtask不会与其他作业的subtask竞争内存,taskslot的作用就是分离任务的托管内存,不会发生cpu隔离。
通过调整taskSlot的数据量,用户可以指定每个TaskManager有多少task slot,TaskManager可以配置成单Slot模式,这样这个JobManager上运行的任务就独占了整个JVM进程,更多的taskSlot意味着更多的subtask可以共享同一个JVM,同一个JVM中的task共享TCP连接和心跳信息,共享数据集和数据结构,从而减少TaskManager中的task开销。
在Flink Standalone集群中我们可以通过配置FLINK_HOME/conf/flink-conf.yaml文件中的"taskmanager.numberOfTaskSlots"参数来指定每个JobManager启动后拥有几个taskslot,如果是基于其他模式提交任务,可以配置客户端中的$FLINK_HOME/conf/flink-conf.yaml配置文件。
#flink-conf.yamml文件中配置每个taskmanager拥有的taskslot个数
taskmanager.numberOfTaskSlots: 3
我们可以通过配置每个TaskManager上taskslot的数量来决定每个TaskManager上可以执行多少subtask,由于taskslot只会对内存进行隔离不会对CPU进行隔离,一台TaskManager taskslot越多意味着越多的taskslot争夺CPU资源,所以 taskslot的值设置建议和该 TaskManager 节点 CPU core 的数量保持一致。
TaskSlot共享&SlotSharingGroup共享组
默认情况下,Flink 允许 subtask 共享 taskSlot,即便它们是不同的 subtask,只要是来自于同一Flink作业即可(Flink不允许属于不同作业的task共享同一个slot),结果就是一个 slot 可以持有整个作业管道。
Flink中一个taskslot中可以运行多个subtask有什么好处呢?假设一个taskslot中只能运行一个subtask,上图中一共有13个subtask,对应的就需要13个slot资源,我们在提交Flink应用程序时需要关注我们程序中到底有多少subtask,然后再衡量Flink集群中slot个数是否足够,在一定程序上需要的slot资源较多。另外一个方面是在Flink中运行的task对CPU资源的占用不同,有CUP密集型task操作和CPU非密集型task操作情况,例如在Flink集群中source和map的操作只是读取数据进行转换,对应task运行占用的cpu资源极短,但是Window这种窗口聚合操作涉及大量数据计算,往往占用CPU资源时间长,这就会导致在运行任务时source/map、sink操作时间非常快,Window操作时间非常长,source/map对应的subtask会等待window对应的subtask执行,同样sink的对应的subtask也会等待window对应的subtask执行,站在集群slot角度上来看就出现了一些taskslot非常"繁忙",一些taskslot非常"轻松",集群的资源综合利用不高。
taskslot共享就可以很好地解决以上问题,Flink任务所有的subtask均衡的分散到不同的taskslot上执行,一个taskslot贯穿执行整个流程的subtask,这样每个taskslot、每个TaskManager上的资源使用情况非常均衡。所以允许 slot 共享有两个主要优点:
Flink 集群所需的 taskSlot 和作业中使用的最大并行度恰好一样,不需要关注Flink程序总共包含多少个 subtask。
容易获得更好的资源利用。如果没有 slot 共享,非密集 subtask(source/map())将阻塞和密集型 subtask(window())一样多的资源。通过 slot 共享,确保繁重的 subtask 在 TaskManager 之间公平分配。
在Flink中实现taskslot共享是通过SlotSharingGroup(Slot共享组,简称SSG)实现的,默认在Flink中有名称为"default"的默认SSG,所有算子操作都在当前这个SSG中,所以我们在执行Flink代码时会自动进行slot组共享。我们也可以在代码中手动指定某些算子操作的SSG组做到某些操作独占一个slot,指定方式如下:
TaskSlot与并行度关系
了解taskslot之后,我们很容易和之前的学习的并行度(Parallelism)混淆,两者关系如下: taskslot是静态概念,指的是 Flink TaskManager 能够并发执行的 task 数。并行度是动态概念,指的是每个应用程序实际的并发能力。
如果Flink集群中所有slot个数大于等于Flink 任务的并行度(Flink中所有算子最大并行度),那么Flink程序可以正常运行,否则Flink程序不能正常启动。我们结合下图来理解TaskSlot和并行度的关系:
首先我们的Flink集群有3个TaskManager,每个TaskManager根据配置划分3个slot,所以Flink整个集群Slot总数为9,代表了当前集群能够支持并发task的最高能力。
如图:example1中当我们向集群中提交Flink任务(WordCount)只有1个并行度时,这个任务只会占用集群中的1个taskslot。当我们向集群中提交的Flink任务有2个并行度时,这个任务占用集群2个taskslot,如图example2。
如上图example3中当我们提交的Flink任务有9个并行度时,任务在Flink集群中占用了所有的slot资源,当前集群不能再提交新的任务,因为当前集群中没有更多资源支撑新的Flink任务运行。
如果提交的Flink 任务所有算子并行度为9,就算其中有一些操作并行度为1(如example4中sink操作)同样占用Flink集群9个taskslot。
SSG测试
下面编写Flink Java代码来测试Flink 中SlotSharingGroup(SSG)分组情况,在代码中我们设置整体并行度为3,代码如下:
均匀分配TaskSlot
测试SlotSharingGroup的代码基于Standalone集群提交时我们发现当使用集群6个slot时,Standalone集群中在各个TaskManager节点划分taskslot时存在分配Task不均匀的问题,在Standalone集群中如果在客户端提交多个Flink 作业时这种分配taskslot不均匀问题极有可能造成某台TaskManager 分配的taskslot非常多负载高,一些TaskManager分配的taskslot非常少负载低的问题。
Flink在1.11版本后引入了" cluster.evenly-spread-out-slots"参数解决Standalone中taskslot分配不均匀问题。该参数默认值为false代表task在集群中不匀衡的分配到各个TaskManager上,该参数只针对standalone集群有效。
我们可以在Flink Standalone集群各个节点的$FLINK_HOME/conf/flink-conf.yaml文件中配置该参数为true均衡的在各个TaskManager节点上调度各个task:
#所有Flink Standalone 集群节点都配置 flink-conf.yaml
cluster.evenly-spread-out-slots: true
在所有Flink Standalone集群中配置完成以上参数后,重新启动Flink集群,在客户端提交上一小节中的代码,重新观察WebUI中slot的分配情况:
通过以上验证我们发现在Standalone中配置" cluster.evenly-spread-out-slots"参数为true后,task会均匀的在各个TaskManager上进行调度。
当基于Yarn提交Flink应用程序时Yarn会动态的在各个NodeManager节点上启动TaskManager进行划分Slot分配Task,测试如下:将以上代码并行度提高到6同时设置两个SSG(defalut和my-ssg-group),修改如下:
以上代码设置全局并行度为6,后续设置了"my-ssg-group"SSG,所以整个程序提交到集群中使用的Slot为12个,将以上代码打包使用Yarn-Application模式提交到Yarn集群中,客户端设置每个TaskManager有3个taskslot,所以会启动4个TaskManager,提交任务后观察对应的WebUI,提交命令如下:进入到FlinkWebUI查看任务使用Slot情况:
