【hadoop】Yarn任务提交启动流程
本文概述了YARN(Yet Another Resource Negotiator)的任务提交与启动流程。首先介绍了YARN中的核心组件:ResourceManager(RM)负责全局资源调度,NodeManager(NM)管理单节点资源,ApplicationMaster(AM)协调具体应用运行,以及Container作为资源封装单位。然后详细描述了任务从客户端提交到最终运行的8个主要步骤:客户

1.概述
学习 yarn,就不得不涉及 yarn 的任务运行流程,虽然网上有很多文章进行相关的总结,但总觉得是别人,因此还是按照自己的理解,进行总结分享,也算是对过程的输出交付。
2.【名词概念】
首先来说明下 yarn 中的一些概念,后续流程中都会涉及到。
ResourceManager(RM)
负责整个集群的资源管理和分配,处理客户端和 AM 的请求,为 containr 分配资源(将任务调度到不同的 NM 上运行),同时通过 NM 的心跳获取所有 container 的运行状态,并进行必要的失败重试。
NodeManager(NM)
集群资源(CPU 和内存,还包括 GPU 等)的实际拥有者,接收 RM 和 AM 的请求,启动具体的 container,并通过心跳向 RM 汇报 container 的运行情况。
Application
对应 1.X 版本中的 job,它可以是一个 MapReduce 应用,也可以是一个 spark 应用,或者 flink 应用等。
ApplicationMaster(AM)
每个 Application 都有一个 ApplicationMaster,负责管理具体的某个应用,包括向 RM 申请具体任务所需的资源,向 NM 请求启动具体的任务,同时监控所有任务的运行状况,并进行必要的容错处理。spark 的 driver,flink 的 jobManager 都属于 AM。
Container
Container 是 YARN 中的一个抽象概念,它是任务运行所需资源,环境变量,启动参数等的一个封装和抽象。
一个 Application 中可以分为两类 Container,一类是前面提到的 AM,一类是具体任务的 container,常见的任务 container 有 MR 中的 map 任务、reduce 任务、spark 中的 executor、flink 的 taskManager。
3.【整体流程】
首先通过一张图来看下客户端提交任务到最终运行的整体流程。

-
客户端向 RM 提交应用,本质上是向 RM 请求启动 AM
-
RM 选择合适的 NM,并向 NM 发送请求,要求启动 AM
-
NM 收到启动 AM 的请求后,根据所携带的参数,下载 AM 所依赖的资源到本地
-
完成依赖资源的本地化后,NM 启动 AM 进程
-
AM 启动后向 RM 进行注册,并向 RM 申请启动任务 containr 所需的资源
-
RM 根据 NM 的资源汇报情况,向 AM 回复资源(container)的分配情况,即给请求的任务 container 分配具体的 NM。
-
AM 根据任务 container 分配的 NM,向对应的 NM 发送请求,要求启动任务 container
-
NM 收到启动任务 container 的请求后,同样根据请求参数,先完成依赖资源的本地化,然后启动任务 container 进程。
整体流程中有几点需要注意:
-
RM 中为 container 分配 container,是等待 NM 进行心跳汇报后,被动触发进行的。
-
任务 container 的运行状态,是 NM 通过心跳向 RM 汇报,RM 再通过 AM 的心跳响应告知对应的 AM。
4.【RM 中的流程】
前面概念中提到了 application、container 以及 AM。在 RM 中,分别用 Application,Container,AppAttempt 类来对应这三个概念。整个任务提交运行流程也就围绕这三个类实例的创建,以及各自的状态机变化完成。
当然,还有一块内容未涉及,那就是调度器模块,这里暂不深入,后续再单独整理说明。
来看看任务提交运行在 RM 中的流程:

-
客户端向 RM 申请 Application 的 ID
-
RM 内部生成 application 的唯一 ID
-
通过 rpc 响应将 applicaiton ID 告知客户端
-
客户端携带 ID,以及 container 上下文,通过 RPC 向 RM 提交任务。
-
RM 的 rpc 服务将请求转发给内部的 AppManager 模块。
-
AppManager 创建一个 App 实例对象(RMAppImpl)。
-
随后向该实例对象发送 start 事件。
-
RMAppImpl 收到事件后,向状态存储服务请求保存 App 状态,状态从 NEW 变为 NEW_SAVING。
-
状态存储服务完成 APP 信息的存储后,再以事件的形式告知 RMAppImpl。
-
RMAppImpl 向调度器发送添加 APP 的事件,状态从 NEW_SAVING 变为 SUBMITTED。
-
调度器收到消息后,进行相应的处理动作,然后告知 RMAppImpl 应用被接受。
-
RMAppImpl 创建 Attempt 实例对象(RMAppAttemptImpl),
-
然后,向其发送 start 事件,随后状态从 SUBMITTED 变为 ACCEPTED。
-
Attempt 创建后,先向 ApplicationMasterService 进行注册,使其在内存中有对应的记录,方便后面真正的 AM 进程进行注册。
-
然后,向调度器发送添加 Attempt 事件。
-
调度器同样进行一系列的处理,包括权限判断,队列应用计数等,在内存中记录相关信息,最后通知 Attempt 成功添加。
-
Attempt 调用调度器的接口,申请启动 AM 所需的资源,同时状态从 NEW 变为 SUBMITTED。
-
当有 NM 节点向 RM 发送心跳请求时,RM 内部最终会以事件的形式通知到调度器,调度器则选择合适的应用为其分配资源。
-
资源分配过程中,会新建 Container 对象(RMContainerImpl)。
-
然后向 Container 对象发送 start 事件。
-
container 收到 start 事件后,告知 attempt,资源已经完成分配。自身状态从 NEW 切换为 ALLOCATED。
-
attempt 收到事件后,通过接口向调度器获取已分配的资源,然后状态从 SUBMITTED 切换为 SCHEDULED。
-
调度器的接口处理过程中会向 container 发送 acquire 事件。Container 的状态从 ALLOCATED 切换为 ACQUIRED。
-
随后,attempt 向状态存储模块发送请求,要求存储 attempt 的信息。自身状态从 SCHEDULED 切换为 ALLOCATED_SAVING。
-
状态存储完成后,以事件的形式告诉 attempt。
-
attmpt 向 AMLaunch 模块发送启动 AM 的请求。自身状态从 ALLOCATED_SAVING 切换为 ALLOCATED。
-
AMLaunch 通过 RPC 协议向指定的 NM 发送 startContainer 的请求。
-
AMLaunch 告知 Attempt,container 已启动,Attempt 的状态从 ALLOCATED 切换为 LAUNCHED。
-
NM 收到请求后,启动 AM 进程
-
AM 进程启动后向 RM 中的 ApplicationMasterService 进行注册。
-
ApplicationMasterService 收到注册请求后,告知对应的 Attempt。Attempt 的状态从 LAUNCHED 切为 RUNNING。
-
Attempt 收到 AM 进程并成功注册的消息后,进而告诉 RMAppImpl。App 的状态从 ACCEPTED 转换为 RUNNING。
注:NM 通过心跳告知 RM 节点上 container 的运行状态,RM 内部处理该消息最终通知对应 container,container 状态从 ACQUIRED 转为 RUNNING。
5.【NM 中的流程】
与 RM 不同,在 NM 中并不感知 container 是具体任务还是 AM,因此内部只有 application 和 container,任务运行流程也就围绕这两个类实例的创建,状态机的变化及周边配套模块完成。
在 NM 中,任务运行的流程如下图所示:

-
NM 内部的 containerManagerImpl 处理启动 container 的请求,先新建一个 AppImpl(App 的具体实现,后面简称为 App)的实例对象,然后向该 APP 发送一个初始化事件,然后新建一个 ContainerImpl(Container 的具体实现,后面简称为 Container)对象。
-
App 向日志聚合模块发送请求,告知 App 启动,要求进行相应的初始化动作,同时状态从 NEW 变为 INITING。
-
日志聚合模块完成 app 的初始化动作后,通过事件告知 App。
-
APP 收到事件后,接着向资源本地化服务模块发送请求,要求完成 App 所依赖资源的下载。
-
资源本地化服务模块完成对应的资源下载后,通过事件告知 App。
-
App 收到事件后,向 Container 发送初始化事件,同时状态从 INITING 变为 RUNNING。
-
Container 同样向资源本地化服务模块发送请求,要求完成 Container 所依赖资源的下载,此时状态从 NEW 变为 LOCALIZING。
-
资源本地化服务模块每成功完成一个资源的下载,都会以事件的形式通知 Container。
-
当 Container 感知所有依赖资源都完成本地化后,通过事件告知资源本地化服务模块进行清理动作(这里的清理动作不是清理资源文件,而是结束相应的资源下载进程)。
-
Container 继续向 Container 启动服务模块发送请求,要求启动具体的 Container 进程,随后状态从 LOCALIZING 变为 LOCALIZED。
-
Container 启动服务模块根据 Container 上下文,设置环境变量、启动参数生成启动脚本,并创建 Container 的进程,然后通过事件告知 Container。
-
Container 收到进程启动的事件后,状态从 LOCALIZED 变为 RUNNING。
-
当 Container 的进程运行结束后,其对应的创建线程获取其结束码,并通知 Container。(假设这里为运行成功并正常结束)
-
Container 收到事件后,向资源本地化服务模块发送请求,要求清理资源文件,然后状态从 RUNNING 切换为 EXITED_WITH_SUCCESS。
-
资源本地化服务模块对 Container 的资源文件进行清理后,告知 Container。
-
Container 通知日志聚合模块运行结束,让其准备进行日志聚合。
-
随后也通知 App,Container 运行结束,最后状态切换为 DONE。
-
App 感知 Container 运行结束后,只是在内存中进行相关的记录,NM 通过心跳向 RM 上报所有 container 的运行状况。RM 会再通过心跳告知 AM,当 AM 得知所有任务均结束时,向 RM 进行注销,并自身退出。RM 得知 AM 结束后,进行相应的处理动作,最终告知该应用对应任务 containerd 的 NM,应用结束。NM 内部最终告知 App。
-
App 收到消息后,通知资源本地化服务模块进行资源的清理。然后自身状态从 RUNNING 切换为 APPLICATION_RESOURCE_CLEANUP。
-
资源化本地服务模块完成资源清理后事件通知 App。
-
App 通知日志聚合模块进行日志聚合,最后状态变为 FINISHED。
6.【总结】
本文简单总结了 yarn 任务提交运行的流程,可以看到整体流程还是比较长的,涉及的模块也非常多,那么任意一个环节出现异常,都可能导致任务的运行失败。yarn 为此也做了很多异常处理,也有部分处理逻辑交给 AM 来决定,下篇文章我们就来聊聊 yarn 任务运行过程中的异常处理。
更多推荐

所有评论(0)