在这里插入图片描述

1.概述

转载:YARN—— 任务提交启动流程

学习 yarn,就不得不涉及 yarn 的任务运行流程,虽然网上有很多文章进行相关的总结,但总觉得是别人,因此还是按照自己的理解,进行总结分享,也算是对过程的输出交付。

2.【名词概念】

首先来说明下 yarn 中的一些概念,后续流程中都会涉及到。

ResourceManager(RM)

负责整个集群的资源管理和分配,处理客户端和 AM 的请求,为 containr 分配资源(将任务调度到不同的 NM 上运行),同时通过 NM 的心跳获取所有 container 的运行状态,并进行必要的失败重试。

NodeManager(NM)

集群资源(CPU 和内存,还包括 GPU 等)的实际拥有者,接收 RM 和 AM 的请求,启动具体的 container,并通过心跳向 RM 汇报 container 的运行情况。

Application

对应 1.X 版本中的 job,它可以是一个 MapReduce 应用,也可以是一个 spark 应用,或者 flink 应用等。

ApplicationMaster(AM)

每个 Application 都有一个 ApplicationMaster,负责管理具体的某个应用,包括向 RM 申请具体任务所需的资源,向 NM 请求启动具体的任务,同时监控所有任务的运行状况,并进行必要的容错处理。spark 的 driver,flink 的 jobManager 都属于 AM。

Container

Container 是 YARN 中的一个抽象概念,它是任务运行所需资源,环境变量,启动参数等的一个封装和抽象。

一个 Application 中可以分为两类 Container,一类是前面提到的 AM,一类是具体任务的 container,常见的任务 container 有 MR 中的 map 任务、reduce 任务、spark 中的 executor、flink 的 taskManager。

3.【整体流程】

首先通过一张图来看下客户端提交任务到最终运行的整体流程。

在这里插入图片描述

  1. 客户端向 RM 提交应用,本质上是向 RM 请求启动 AM

  2. RM 选择合适的 NM,并向 NM 发送请求,要求启动 AM

  3. NM 收到启动 AM 的请求后,根据所携带的参数,下载 AM 所依赖的资源到本地

  4. 完成依赖资源的本地化后,NM 启动 AM 进程

  5. AM 启动后向 RM 进行注册,并向 RM 申请启动任务 containr 所需的资源

  6. RM 根据 NM 的资源汇报情况,向 AM 回复资源(container)的分配情况,即给请求的任务 container 分配具体的 NM。

  7. AM 根据任务 container 分配的 NM,向对应的 NM 发送请求,要求启动任务 container

  8. NM 收到启动任务 container 的请求后,同样根据请求参数,先完成依赖资源的本地化,然后启动任务 container 进程。

整体流程中有几点需要注意:

  • RM 中为 container 分配 container,是等待 NM 进行心跳汇报后,被动触发进行的。

  • 任务 container 的运行状态,是 NM 通过心跳向 RM 汇报,RM 再通过 AM 的心跳响应告知对应的 AM。

4.【RM 中的流程】

前面概念中提到了 application、container 以及 AM。在 RM 中,分别用 Application,Container,AppAttempt 类来对应这三个概念。整个任务提交运行流程也就围绕这三个类实例的创建,以及各自的状态机变化完成。

当然,还有一块内容未涉及,那就是调度器模块,这里暂不深入,后续再单独整理说明。

来看看任务提交运行在 RM 中的流程:

在这里插入图片描述

  1. 客户端向 RM 申请 Application 的 ID

  2. RM 内部生成 application 的唯一 ID

  3. 通过 rpc 响应将 applicaiton ID 告知客户端

  4. 客户端携带 ID,以及 container 上下文,通过 RPC 向 RM 提交任务。

  5. RM 的 rpc 服务将请求转发给内部的 AppManager 模块。

  6. AppManager 创建一个 App 实例对象(RMAppImpl)。

  7. 随后向该实例对象发送 start 事件。

  8. RMAppImpl 收到事件后,向状态存储服务请求保存 App 状态,状态从 NEW 变为 NEW_SAVING。

  9. 状态存储服务完成 APP 信息的存储后,再以事件的形式告知 RMAppImpl。

  10. RMAppImpl 向调度器发送添加 APP 的事件,状态从 NEW_SAVING 变为 SUBMITTED。

  11. 调度器收到消息后,进行相应的处理动作,然后告知 RMAppImpl 应用被接受。

  12. RMAppImpl 创建 Attempt 实例对象(RMAppAttemptImpl),

  13. 然后,向其发送 start 事件,随后状态从 SUBMITTED 变为 ACCEPTED。

  14. Attempt 创建后,先向 ApplicationMasterService 进行注册,使其在内存中有对应的记录,方便后面真正的 AM 进程进行注册。

  15. 然后,向调度器发送添加 Attempt 事件。

  16. 调度器同样进行一系列的处理,包括权限判断,队列应用计数等,在内存中记录相关信息,最后通知 Attempt 成功添加。

  17. Attempt 调用调度器的接口,申请启动 AM 所需的资源,同时状态从 NEW 变为 SUBMITTED。

  18. 当有 NM 节点向 RM 发送心跳请求时,RM 内部最终会以事件的形式通知到调度器,调度器则选择合适的应用为其分配资源。

  19. 资源分配过程中,会新建 Container 对象(RMContainerImpl)。

  20. 然后向 Container 对象发送 start 事件。

  21. container 收到 start 事件后,告知 attempt,资源已经完成分配。自身状态从 NEW 切换为 ALLOCATED。

  22. attempt 收到事件后,通过接口向调度器获取已分配的资源,然后状态从 SUBMITTED 切换为 SCHEDULED。

  23. 调度器的接口处理过程中会向 container 发送 acquire 事件。Container 的状态从 ALLOCATED 切换为 ACQUIRED。

  24. 随后,attempt 向状态存储模块发送请求,要求存储 attempt 的信息。自身状态从 SCHEDULED 切换为 ALLOCATED_SAVING。

  25. 状态存储完成后,以事件的形式告诉 attempt。

  26. attmpt 向 AMLaunch 模块发送启动 AM 的请求。自身状态从 ALLOCATED_SAVING 切换为 ALLOCATED。

  27. AMLaunch 通过 RPC 协议向指定的 NM 发送 startContainer 的请求。

  28. AMLaunch 告知 Attempt,container 已启动,Attempt 的状态从 ALLOCATED 切换为 LAUNCHED。

  29. NM 收到请求后,启动 AM 进程

  30. AM 进程启动后向 RM 中的 ApplicationMasterService 进行注册。

  31. ApplicationMasterService 收到注册请求后,告知对应的 Attempt。Attempt 的状态从 LAUNCHED 切为 RUNNING。

  32. Attempt 收到 AM 进程并成功注册的消息后,进而告诉 RMAppImpl。App 的状态从 ACCEPTED 转换为 RUNNING。

注:NM 通过心跳告知 RM 节点上 container 的运行状态,RM 内部处理该消息最终通知对应 container,container 状态从 ACQUIRED 转为 RUNNING。

5.【NM 中的流程】

与 RM 不同,在 NM 中并不感知 container 是具体任务还是 AM,因此内部只有 application 和 container,任务运行流程也就围绕这两个类实例的创建,状态机的变化及周边配套模块完成。

在 NM 中,任务运行的流程如下图所示:

在这里插入图片描述

  1. NM 内部的 containerManagerImpl 处理启动 container 的请求,先新建一个 AppImpl(App 的具体实现,后面简称为 App)的实例对象,然后向该 APP 发送一个初始化事件,然后新建一个 ContainerImpl(Container 的具体实现,后面简称为 Container)对象。

  2. App 向日志聚合模块发送请求,告知 App 启动,要求进行相应的初始化动作,同时状态从 NEW 变为 INITING。

  3. 日志聚合模块完成 app 的初始化动作后,通过事件告知 App。

  4. APP 收到事件后,接着向资源本地化服务模块发送请求,要求完成 App 所依赖资源的下载。

  5. 资源本地化服务模块完成对应的资源下载后,通过事件告知 App。

  6. App 收到事件后,向 Container 发送初始化事件,同时状态从 INITING 变为 RUNNING。

  7. Container 同样向资源本地化服务模块发送请求,要求完成 Container 所依赖资源的下载,此时状态从 NEW 变为 LOCALIZING。

  8. 资源本地化服务模块每成功完成一个资源的下载,都会以事件的形式通知 Container。

  9. 当 Container 感知所有依赖资源都完成本地化后,通过事件告知资源本地化服务模块进行清理动作(这里的清理动作不是清理资源文件,而是结束相应的资源下载进程)。

  10. Container 继续向 Container 启动服务模块发送请求,要求启动具体的 Container 进程,随后状态从 LOCALIZING 变为 LOCALIZED。

  11. Container 启动服务模块根据 Container 上下文,设置环境变量、启动参数生成启动脚本,并创建 Container 的进程,然后通过事件告知 Container。

  12. Container 收到进程启动的事件后,状态从 LOCALIZED 变为 RUNNING。

  13. 当 Container 的进程运行结束后,其对应的创建线程获取其结束码,并通知 Container。(假设这里为运行成功并正常结束)

  14. Container 收到事件后,向资源本地化服务模块发送请求,要求清理资源文件,然后状态从 RUNNING 切换为 EXITED_WITH_SUCCESS。

  15. 资源本地化服务模块对 Container 的资源文件进行清理后,告知 Container。

  16. Container 通知日志聚合模块运行结束,让其准备进行日志聚合。

  17. 随后也通知 App,Container 运行结束,最后状态切换为 DONE。

  18. App 感知 Container 运行结束后,只是在内存中进行相关的记录,NM 通过心跳向 RM 上报所有 container 的运行状况。RM 会再通过心跳告知 AM,当 AM 得知所有任务均结束时,向 RM 进行注销,并自身退出。RM 得知 AM 结束后,进行相应的处理动作,最终告知该应用对应任务 containerd 的 NM,应用结束。NM 内部最终告知 App。

  19. App 收到消息后,通知资源本地化服务模块进行资源的清理。然后自身状态从 RUNNING 切换为 APPLICATION_RESOURCE_CLEANUP。

  20. 资源化本地服务模块完成资源清理后事件通知 App。

  21. App 通知日志聚合模块进行日志聚合,最后状态变为 FINISHED。

6.【总结】

本文简单总结了 yarn 任务提交运行的流程,可以看到整体流程还是比较长的,涉及的模块也非常多,那么任意一个环节出现异常,都可能导致任务的运行失败。yarn 为此也做了很多异常处理,也有部分处理逻辑交给 AM 来决定,下篇文章我们就来聊聊 yarn 任务运行过程中的异常处理。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐