🌺The Begin🌺点点关注,收藏不迷路🌺

关键词:YARN、ResourceManager、NodeManager、ApplicationMaster、容器管理、资源调度、分布式计算

在Hadoop生态系统中,YARN(Yet Another Resource Negotiator)的出现,标志着Hadoop从单一的MapReduce计算框架,进化成为一个通用的分布式资源管理平台。MapReduce on YARN 更是将计算与资源管理彻底解耦,奠定了现代大数据计算的基础。

今天,我们将深入剖析MapReduce作业在YARN上的完整生命周期,从客户端提交到最终完成,每一步都配有流程图和源码级别的解析。


一、YARN架构总览

1.1 YARN的核心组件

YARN集群

应用运行

NodeManager集群

ResourceManager
资源管理器

NodeManager 1

NodeManager 2

NodeManager 3

ApplicationMaster
MRAppMaster

Container
Map Task

Container
Map Task

Container
Reduce Task

客户端

组件 职责 类比
ResourceManager (RM) 集群资源管理器,全局唯一 公司CEO
NodeManager (NM) 节点资源管理器,每节点一个 部门经理
ApplicationMaster (AM) 应用管理器,每个应用一个 项目经理
Container 资源容器(CPU/内存) 项目成员

1.2 MapReduce on YARN的演进

传统Hadoop 1.x:
MapReduce <--> HDFS
(计算与资源管理耦合)

Hadoop 2.x+:
MapReduce   Spark   Flink   ... (各种计算框架)
     \        |       /
      \       |      /
       \      |     /
        YARN (资源管理层)
              |
            HDFS (存储层)

YARN的核心价值:将资源管理作业控制分离,使得Hadoop成为一个多计算框架共存的生态系统。


二、MapReduce on YARN完整工作流程

2.1 宏观流程图

第5阶段: Shuffle与Reduce

第4阶段: 任务分配与执行

第2阶段: 资源申请与AM启动

第1阶段: 作业提交

1.1 提交Job

1.2 检查与初始化

1.3 上传资源
jar/配置/分片信息

2.1 请求Container

2.2 启动

2.3 注册

3.1 从HDFS获取

3.3 构造资源请求

4.1 心跳请求资源

4.2 分配Container

4.3 启动Container

4.4 执行

4.4 执行

4.5 汇报进度

4.5 汇报进度

5.1 Map输出

5.1 Map输出

5.2 启动Reduce

5.3 拉取数据

5.4 最终输出

6.1 完成任务

6.2 注销

6.3 清理临时文件

6.4 释放Container

第6阶段: 作业完成

第3阶段: 作业初始化

客户端

ResourceManager

共享存储 HDFS

NodeManager

ApplicationMaster
MRAppMaster

Map Task

Map Task

临时文件

Reduce Task

HDFS


三、各阶段深度解析

3.1 第1阶段:作业提交

暂存区 HDFS共享存储 ResourceManager 客户端 暂存区 HDFS共享存储 ResourceManager 客户端 job.jar (作业jar包) job.split (切片元数据) job.xml (配置信息) 1. 计算Split(切片信息) 2. 调用JobSubmissionProtocol.submitJob() 3. 生成Application ID 4. 返回Job ID和提交路径 5. 在暂存区创建目录 (/tmp/hadoop-yarn/staging/...) 6. 上传资源文件 7. 调用RMClientProtocol.submitApplication() 8. 将请求加入调度队列

关键细节

  • 切片计算:客户端根据输入文件计算Split信息,决定Map任务数量
  • 暂存区:默认路径/tmp/hadoop-yarn/staging/username/.staging/
  • 资源上传:所有作业相关的文件都上传到HDFS,供后续节点拉取
// 客户端提交的核心代码(简化版)
public void submitJobInternal() {
    // 1. 创建暂存目录
    Path jobStagingArea = FileInputFormat.getStagingDir();
    Path submitJobDir = new Path(jobStagingArea, jobId.toString());
    
    // 2. 写入切片信息
    int maps = writeSplits(submitJobDir);
    
    // 3. 上传配置文件
    writeConf(submitJobDir);
    
    // 4. 上传jar包
    copyJar(submitJobDir);
    
    // 5. 向RM提交
    clientRMProtocol.submitApplication(request);
}

3.2 第2阶段:ApplicationMaster启动

ApplicationMaster端

NodeManager端

ResourceManager端

收到提交请求

分配第一个Container

收到启动命令

加载环境

执行命令

启动

启动后

拉取资源

初始化

ResourceManager

调度器

NodeManager

Container

环境变量/依赖

容器启动脚本

MRAppMaster

向RM注册

从HDFS获取
job.split/job.xml/job.jar

初始化作业

AM启动的关键代码

// MRAppMaster启动流程(简化版)
public class MRAppMaster {
    public static void main(String[] args) {
        // 1. 初始化配置
        MRAppMaster appMaster = new MRAppMaster();
        
        // 2. 连接到ResourceManager
        appMaster.serviceStart();
        
        // 3. 从HDFS拉取作业信息
        JobInfo jobInfo = downloadJobInfo();
        
        // 4. 根据切片数计算需要的资源
        int numMaps = jobInfo.getMaps();
        int numReduces = jobInfo.getReduces();
        
        // 5. 向RM申请资源
        appMaster.requestContainers(numMaps, numReduces);
        
        // 6. 启动任务调度器
        appMaster.run();
    }
}

3.3 第3-4阶段:任务调度与执行

3.3.1 AM与RM的心跳交互
Map/Reduce Task NodeManager ResourceManager ApplicationMaster Map/Reduce Task NodeManager ResourceManager ApplicationMaster 包含: 已请求的Container 已完成的Container 待释放的资源 alt [有可用资源] [资源不足] loop [心跳周期(默认1秒)] 1. Heartbeat请求 2. 资源调度决策 3. 分配新Container 4. 启动Container 5. 执行任务 3. 等待 4. 更新任务进度
3.3.2 任务启动过程
// Container启动任务的过程(简化版)
public class ContainerLauncher {
    public void launchTask(Task task) {
        // 1. 构造启动命令
        List<String> commands = new ArrayList<>();
        commands.add(JavaChild.class.getName());
        commands.add("-Dtask.id=" + task.getTaskId());
        commands.add("-Djob.id=" + task.getJobId());
        
        // 2. 设置环境变量
        Map<String, String> environment = new HashMap<>();
        environment.put("CLASSPATH", "$HADOOP_HOME/*");
        
        // 3. 准备本地资源
        LocalResource jobJar = getJobJarFromHDFS();
        
        // 4. 提交Container启动请求
        ContainerLaunchContext ctx = 
            ContainerLaunchContext.newInstance(
                commands, environment, 
                Collections.singletonMap("job.jar", jobJar),
                null, null, null
            );
        
        nmClient.startContainer(container, ctx);
    }
}

3.4 第5阶段:Shuffle与Reduce

Reduce阶段

Shuffle阶段

Map阶段

输出

输出

输出

HTTP拉取

HTTP拉取

HTTP拉取

归并排序

分组

reduce函数

写入

Map Task 1

溢写文件

Map Task 2

溢写文件

Map Task 3

溢写文件

Reduce Task

归并后的数据

按Key分组

最终输出

HDFS

Shuffle过程中的关键点

// Reduce Task拉取数据的逻辑
public class ShuffleScheduler {
    public void run() {
        // 1. 获取所有Map任务的完成状态
        Set<TaskAttemptID> pendingMaps = getPendingMaps();
        
        // 2. 为每个Map输出创建拉取线程
        for (TaskAttemptID mapId : completedMaps) {
            FetcherThread fetcher = new FetcherThread(mapId);
            fetcher.start();
        }
        
        // 3. 拉取数据的线程池管理
        executor = Executors.newFixedThreadPool(
            conf.getInt("mapreduce.reduce.shuffle.parallelcopies", 5)
        );
        
        // 4. 拉取数据并写入内存/磁盘
        while (hasMoreData()) {
            Fetcher fetcher = new Fetcher();
            executor.submit(fetcher);
        }
    }
}

3.5 第6阶段:作业完成与清理

清理流程

作业完成

1. 向RM报告完成
2. 清理临时文件
3. 释放所有Container
4. 停止自身
5. 移除作业记录
6. 回收资源

所有任务完成

ApplicationMaster

ResourceManager

HDFS
临时目录

NodeManager

AM进程退出

RM内存清理

NM资源释放

作业完成的日志示例

2024-01-15 10:30:15,234 INFO mapreduce.Job: Job job_1705293000001_0001 completed successfully
2024-01-15 10:30:15,235 INFO mapreduce.Job: Counters: 50
    File System Counters
        FILE: Number of bytes read=1,234,567,890
        FILE: Number of bytes written=2,345,678,901
        HDFS: Number of bytes read=987,654,321
    Map-Reduce Framework
        Map input records=10,000,000
        Map output records=9,500,000
        Reduce input records=9,500,000
        Reduce output records=950,000

四、容错机制与高可用

4.1 各组件故障处理

故障组件 影响 处理机制
Map Task失败 部分数据丢失 AM重新申请Container,重跑该Map(默认4次)
Reduce Task失败 Shuffle数据可能丢失 AM重新申请Container,重跑该Reduce
NodeManager故障 该节点所有Container失效 RM标记NM为dead,AM重新调度任务
ApplicationMaster故障 整个作业失败 RM重新启动新的AM(默认2次)
ResourceManager故障 整个集群不可用 依赖RM HA(ZooKeeper + 主备切换)

4.2 任务重试机制

// 任务失败重试的配置
public class TaskAttemptListener {
    // 最大重试次数
    public static final String MAP_MAX_ATTEMPTS = 
        "mapreduce.map.maxattempts";  // 默认4
    
    public static final String REDUCE_MAX_ATTEMPTS = 
        "mapreduce.reduce.maxattempts";  // 默认4
    
    // 任务推测执行(加快作业完成)
    public static final String MAP_SPECULATIVE = 
        "mapreduce.map.speculative";  // 默认true
    
    public static final String REDUCE_SPECULATIVE = 
        "mapreduce.reduce.speculative";  // 默认true
}

五、YARN调度器详解

YARN支持多种调度策略,满足不同场景需求:

5.1 三种调度器对比

Fair Scheduler

时间点1

时间点2

时间点3

Capacity Scheduler

队列C 20%

JobC1

JobC2

队列B 30%

JobB1

队列A 50%

JobA1

JobA2

FIFO Scheduler

队列

Job1 大作业

Job2 小作业

Job3 中等作业

Fair: 公平调度
动态平衡,小作业快速响应

5.2 调度器配置示例

<!-- capacity-scheduler.xml -->
<configuration>
    <!-- 定义队列 -->
    <property>
        <name>yarn.scheduler.capacity.root.queues</name>
        <value>production,development</value>
    </property>
    
    <!-- 生产队列占70%资源 -->
    <property>
        <name>yarn.scheduler.capacity.root.production.capacity</name>
        <value>70</value>
    </property>
    
    <!-- 开发队列占30%资源 -->
    <property>
        <name>yarn.scheduler.capacity.root.development.capacity</name>
        <value>30</value>
    </property>
    
    <!-- 每个用户最多占用队列50%资源 -->
    <property>
        <name>yarn.scheduler.capacity.root.production.user-limit-factor</name>
        <value>0.5</value>
    </property>
</configuration>

六、性能优化与调优参数

6.1 关键配置参数

阶段 参数 默认值 说明
Container yarn.nodemanager.resource.memory-mb 8192 节点可用内存
yarn.nodemanager.resource.cpu-vcores 8 节点可用CPU核数
AM yarn.app.mapreduce.am.resource.mb 1536 AM内存
yarn.app.mapreduce.am.command-opts -Xmx1024m AM JVM参数
Map mapreduce.map.memory.mb 1024 Map Container内存
mapreduce.map.cpu.vcores 1 Map Container CPU
Reduce mapreduce.reduce.memory.mb 1024 Reduce Container内存
mapreduce.reduce.cpu.vcores 1 Reduce Container CPU
Shuffle mapreduce.reduce.shuffle.parallelcopies 5 并行拉取线程数
mapreduce.shuffle.max.connections 0 HTTP连接数(0表示不限)

6.2 调优实践

# 内存优化示例:大作业配置
export HADOOP_OPTS="$HADOOP_OPTS -Dmapreduce.map.memory.mb=4096"
export HADOOP_OPTS="$HADOOP_OPTS -Dmapreduce.reduce.memory.mb=8192"
export HADOOP_OPTS="$HADOOP_OPTS -Dmapreduce.map.java.opts=-Xmx3072m"
export HADOOP_OPTS="$HADOOP_OPTS -Dmapreduce.reduce.java.opts=-Xmx6144m"
export HADOOP_OPTS="$HADOOP_OPTS -Dyarn.app.mapreduce.am.resource.mb=2048"

# 提高并行度
export HADOOP_OPTS="$HADOOP_OPTS -Dmapreduce.reduce.shuffle.parallelcopies=20"
export HADOOP_OPTS="$HADOOP_OPTS -Dmapreduce.tasktracker.map.tasks.maximum=8"
export HADOOP_OPTS="$HADOOP_OPTS -Dmapreduce.tasktracker.reduce.tasks.maximum=4"

七、面试高频问题

Q1:YARN相比Hadoop 1.x的优势是什么?

  1. 资源管理与计算框架解耦:支持多种计算框架
  2. 可扩展性更好:RM和NM职责分离
  3. 集群利用率更高:动态资源分配
  4. 容错性更强:AM可以恢复,任务可以重试

Q2:ApplicationMaster的作用是什么?

  1. 与RM协商资源(申请Container)
  2. 与NM协同启动任务
  3. 监控任务执行和进度
  4. 处理任务失败(重新申请)
  5. 作业完成后清理资源

Q3:Container和Task的关系是什么?

  • Container:资源的抽象(CPU+内存),是物理执行环境
  • Task:逻辑计算单元(Map/Reduce)
  • 一个Container运行一个Task(或者可以运行多个Task的JVM)

Q4:YARN如何实现资源隔离?

  1. 内存隔离:使用cgroups或纯进程监控
  2. CPU隔离:cgroups的CPU限制
  3. 磁盘隔离:每个Container有独立的临时目录
  4. 网络隔离:未做严格隔离,依赖操作系统

Q5:YARN调度器的选择原则?

  • FIFO:测试环境,单用户场景
  • Capacity:多团队,需要资源保证的生产环境
  • Fair:多用户,需要快速响应小作业的场景

八、总结

MapReduce on YARN的工作流程可以概括为:

  1. 客户端提交:切片计算、资源上传、向RM申请
  2. AM启动:RM分配第一个Container启动AM
  3. 资源申请:AM根据任务数向RM申请Container
  4. 任务执行:AM在分配的Container中启动Map/Reduce Task
  5. 进度监控:Task通过AM向RM汇报进度
  6. 作业完成:所有Task完成后,AM清理资源并退出

YARN的设计哲学:将资源管理和作业控制分离,用ApplicationMaster作为"作业代理",实现了:

  • 高扩展性:RM只负责资源调度,不关心具体作业
  • 多框架支持:MapReduce、Spark、Flink都可以运行
  • 高容错性:各组件都有完善的故障恢复机制

理解YARN的工作原理,不仅是应对面试的关键,更是进行大数据平台性能调优的基础!


思考题:如果让你设计下一代资源调度系统,YARN的哪些设计值得保留?哪些需要改进?欢迎在评论区分享你的观点!

在这里插入图片描述


🌺The End🌺点点关注,收藏不迷路🌺
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐