Spark任务调度详解:Stage级与Task级调度机制
Spark任务调度详解:Stage级与Task级调度机制
Spark RDD通过转换(Transactions)算子,形成了血缘关系图DAG,最后通过行动(Action)算子,触发Job并调度执行。Spark在具体任务的调度中,总的分两路进行:Stage级别调度和Task级别调度。
- DAGScheduler负责Stage级的调度,主要是将DAG切分成若干Stages,并将每个Stage打包成TaskSet交给TaskScheduler调度。
- TaskScheduler负责Task级的调度,将DAGScheduler给过来的TaskSet按照指定的调度策略分发到Executor上执行。
具体来说,以SparkContext为程序运行的总入口,在SparkContext的初始化过程中,Spark会分别创建DAGScheduler作业调度和TaskScheduler任务调度两级调度模块。
- DAGScheduler是基于任务阶段的高层调度模块,它为每个Spark作业计算具有依赖关系的多个调度阶段(通常根据shuffle来划分),然后为每个阶段构建出一组具体的任务(通常会考虑数据的本地性等),然后以TaskSets(任务组) 的形式提交给任务调度模块来具体执行。
- TaskScheduler则负责具体启动任务、监控和汇报任务运行情况。
总体的调度流程为:
- 构建Spark Application的运行环境(启动SparkContext),SparkContext向资源管理器(可以是Standalone、Mesos或YARN)注册并申请运行Executor资源;
- 资源管理器分配Executor资源并启动StandaloneExecutorBackend(这是在standalone模式下),Executor运行情况将随着心跳发送到资源管理器上;
- SparkContext构建成DAG图,将DAG图分解成Stage,并把Taskset发送给Task Scheduler。Executor向SparkContext申请Task,Task Scheduler将Task发放给Executor运行同时SparkContext将应用程序代码发放给Executor。
- Task在Executor上运行,运行完毕释放所有资源。
1、Spark Stage级调度
Spark的任务调度是从DAG切割开始,主要是由DAGScheduler来完成。当遇到一个Action操作后就会触发一个Job的计算,并交给DAGScheduler来处理。
(1)切分stage
DAGScheduler会根据RDD的血缘关系构成的DAG进行切分,将一个Job划分为若干Stages,具体划分策略是:从后往前,由最终的RDD不断通过依赖回溯判断父依赖是否是宽依赖,遇到一个shuffle就划分一个Stage。无shuffle的称为窄依赖,窄依赖之间的RDD被划分到同一个Stage中。划分的Stages分两类,一类叫做ResultStage,为DAG最下游的Stage,由Action方法决定,另一类叫做ShuffleMapStage,为下游Stage准备数据。
stage任务调度本身是一个反向的深度遍历算法,以下图wordcount为例。此处只有saveAsTextFile为行动算子,该 Job 由 RDD-3 和 saveAsTextFile方法组成,根据依赖关系回溯,知道回溯至没有依赖的RDD-0。回溯过程中,RDD-2和RDD-3存在reduceByKey的shuffle,会划分stage,由于RDD-3在最后一个stage,即划为ResultStage,RDD-2,RDD-1,RDD-0,这些依赖之间的转换算子flatMap,map没有shuffle,因为他们之间是窄依赖,划分为ShuffleMapStage。
(2)打包Taskset提交Stage
一个Stage如果没有父Stage,那么从该Stage开始提交,父Stage执行完毕才能提交子Stage。Stage提交时会将Task信息(分区信息以及方法等)序列化并被打包成TaskSet交给TaskScheduler,一个Partition对应一个Task,另一方面TaskScheduler会监控Stage的运行状态,只有Executor丢失或者Task由于Fetch失败才需要重新提交失败的Stage以调度运行失败的任务,其他类型的Task失败会在TaskScheduler的调度过程中重试。
SchedulerBackend
不过,如果我们给集群中处于繁忙或者是饱和状态的 Executors 分发了任务,执行效果会大打折扣。因此,在分发任务之前,调度系统得先判断哪些节点的计算资源空闲,然后再把任务分发过去。那么,调度系统是怎么判断节点是否空闲的呢?
SchedulerBackend 就是用来干这个事的,它是对于资源调度器的封装与抽象,为了支持多样的资源调度模式如Standalone、YARN 和 Mesos,SchedulerBackend 提供了对应的实现类。在运行时,Spark 根据用户提供的 MasterURL,来决定实例化哪种实现类的对象。MasterURL就是你通过各种方式指定的资源管理器,如–masterspark://ip:host(Standalone模式)、–master yarn(YARN 模式)。
对于集群中可用的计算资源,SchedulerBackend 会用一个叫做 ExecutorDataMap 的数据结构,来记录每一个计算节点中 Executors 的资源状态。ExecutorDataMap 是一种HashMap,它的 Key 是标记 Executor 的字符串,Value 是一种叫做 ExecutorData 的数据结构,ExecutorData 用于封装 Executor 的资源状态,如 RPC 地址、主机地址、可用CPU 核数和满配 CPU 核数等等,它相当于是对 Executor 做的“资源画像”。
总的来说,对内,SchedulerBackend用ExecutorData 对 Executor 进行资源画像;对外,SchedulerBackend 以 WorkerOffer 为粒度提供计算资源,WorkerOffer 封装了机地址和 CPU 核数,用来表示一份可用于调度任务的空闲资源。显然,基于Executor 资源画像,SchedulerBackend 可以同时提供多个 WorkerOffer 用于分布式任务调度。WorkerOffer 这个名字起得蛮有意思,Offer 的字面意思是公司给你提供的工作机会,结合 Spark 调度系统的上下文,就变成了使用硬件资源的机会。
2、 Spark Task 级调度
SparkTask的调度是由TaskScheduler来完成,TaskScheduler将接收的TaskSet封装为TaskSetManager加入到调度队列中。同一时间可能存在多个TaskSetManager,一个TaskSetManager对应一个TaskSet,而一个TaskSet含有n多个task信息,这些task都是同一个stage的。
TaskScheduler初始化后会启动SchedulerBackend,它负责跟外界打交道,接收Executor的注册信息,并维护Executor的状态,SchedulerBackend会监控到有资源后,会询问TaskScheduler有没有任务要运行。
TaskScheduler会从调度队列中按照指定的调度策略选择TaskSetManager去调度运行。TaskSetManager按照一定的调度规则(fifo或者fair)一个个取出task给TaskScheduler,TaskScheduler再交给SchedulerBackend去发到Executor上执行。
Task被提交到Executor启动执行后,Executor会将执行状态上报给SchedulerBackend,SchedulerBackend则告诉TaskScheduler,TaskScheduler找到该Task对应的TaskSetManager,并通知到该TaskSetManager,这样TaskSetManager就知道Task的运行状态。