YARN架构深度解析:MapReduce On YARN工作原理(史诗级详解)
客户端提交:切片计算、资源上传、向RM申请AM启动:RM分配第一个Container启动AM资源申请:AM根据任务数向RM申请Container任务执行:AM在分配的Container中启动Map/Reduce Task进度监控:Task通过AM向RM汇报进度作业完成:所有Task完成后,AM清理资源并退出YARN的设计哲学高扩展性:RM只负责资源调度,不关心具体作业多框架支持:MapReduce
YARN架构深度解析:MapReduce On YARN工作原理(史诗级详解)
|
🌺The Begin🌺点点关注,收藏不迷路🌺
|
关键词:YARN、ResourceManager、NodeManager、ApplicationMaster、容器管理、资源调度、分布式计算
在Hadoop生态系统中,YARN(Yet Another Resource Negotiator)的出现,标志着Hadoop从单一的MapReduce计算框架,进化成为一个通用的分布式资源管理平台。MapReduce on YARN 更是将计算与资源管理彻底解耦,奠定了现代大数据计算的基础。
今天,我们将深入剖析MapReduce作业在YARN上的完整生命周期,从客户端提交到最终完成,每一步都配有流程图和源码级别的解析。
一、YARN架构总览
1.1 YARN的核心组件
| 组件 | 职责 | 类比 |
|---|---|---|
| ResourceManager (RM) | 集群资源管理器,全局唯一 | 公司CEO |
| NodeManager (NM) | 节点资源管理器,每节点一个 | 部门经理 |
| ApplicationMaster (AM) | 应用管理器,每个应用一个 | 项目经理 |
| Container | 资源容器(CPU/内存) | 项目成员 |
1.2 MapReduce on YARN的演进
传统Hadoop 1.x:
MapReduce <--> HDFS
(计算与资源管理耦合)
Hadoop 2.x+:
MapReduce Spark Flink ... (各种计算框架)
\ | /
\ | /
\ | /
YARN (资源管理层)
|
HDFS (存储层)
YARN的核心价值:将资源管理和作业控制分离,使得Hadoop成为一个多计算框架共存的生态系统。
二、MapReduce on YARN完整工作流程
2.1 宏观流程图
三、各阶段深度解析
3.1 第1阶段:作业提交
关键细节:
- 切片计算:客户端根据输入文件计算Split信息,决定Map任务数量
- 暂存区:默认路径
/tmp/hadoop-yarn/staging/username/.staging/ - 资源上传:所有作业相关的文件都上传到HDFS,供后续节点拉取
// 客户端提交的核心代码(简化版)
public void submitJobInternal() {
// 1. 创建暂存目录
Path jobStagingArea = FileInputFormat.getStagingDir();
Path submitJobDir = new Path(jobStagingArea, jobId.toString());
// 2. 写入切片信息
int maps = writeSplits(submitJobDir);
// 3. 上传配置文件
writeConf(submitJobDir);
// 4. 上传jar包
copyJar(submitJobDir);
// 5. 向RM提交
clientRMProtocol.submitApplication(request);
}
3.2 第2阶段:ApplicationMaster启动
AM启动的关键代码:
// MRAppMaster启动流程(简化版)
public class MRAppMaster {
public static void main(String[] args) {
// 1. 初始化配置
MRAppMaster appMaster = new MRAppMaster();
// 2. 连接到ResourceManager
appMaster.serviceStart();
// 3. 从HDFS拉取作业信息
JobInfo jobInfo = downloadJobInfo();
// 4. 根据切片数计算需要的资源
int numMaps = jobInfo.getMaps();
int numReduces = jobInfo.getReduces();
// 5. 向RM申请资源
appMaster.requestContainers(numMaps, numReduces);
// 6. 启动任务调度器
appMaster.run();
}
}
3.3 第3-4阶段:任务调度与执行
3.3.1 AM与RM的心跳交互
3.3.2 任务启动过程
// Container启动任务的过程(简化版)
public class ContainerLauncher {
public void launchTask(Task task) {
// 1. 构造启动命令
List<String> commands = new ArrayList<>();
commands.add(JavaChild.class.getName());
commands.add("-Dtask.id=" + task.getTaskId());
commands.add("-Djob.id=" + task.getJobId());
// 2. 设置环境变量
Map<String, String> environment = new HashMap<>();
environment.put("CLASSPATH", "$HADOOP_HOME/*");
// 3. 准备本地资源
LocalResource jobJar = getJobJarFromHDFS();
// 4. 提交Container启动请求
ContainerLaunchContext ctx =
ContainerLaunchContext.newInstance(
commands, environment,
Collections.singletonMap("job.jar", jobJar),
null, null, null
);
nmClient.startContainer(container, ctx);
}
}
3.4 第5阶段:Shuffle与Reduce
Shuffle过程中的关键点:
// Reduce Task拉取数据的逻辑
public class ShuffleScheduler {
public void run() {
// 1. 获取所有Map任务的完成状态
Set<TaskAttemptID> pendingMaps = getPendingMaps();
// 2. 为每个Map输出创建拉取线程
for (TaskAttemptID mapId : completedMaps) {
FetcherThread fetcher = new FetcherThread(mapId);
fetcher.start();
}
// 3. 拉取数据的线程池管理
executor = Executors.newFixedThreadPool(
conf.getInt("mapreduce.reduce.shuffle.parallelcopies", 5)
);
// 4. 拉取数据并写入内存/磁盘
while (hasMoreData()) {
Fetcher fetcher = new Fetcher();
executor.submit(fetcher);
}
}
}
3.5 第6阶段:作业完成与清理
作业完成的日志示例:
2024-01-15 10:30:15,234 INFO mapreduce.Job: Job job_1705293000001_0001 completed successfully
2024-01-15 10:30:15,235 INFO mapreduce.Job: Counters: 50
File System Counters
FILE: Number of bytes read=1,234,567,890
FILE: Number of bytes written=2,345,678,901
HDFS: Number of bytes read=987,654,321
Map-Reduce Framework
Map input records=10,000,000
Map output records=9,500,000
Reduce input records=9,500,000
Reduce output records=950,000
四、容错机制与高可用
4.1 各组件故障处理
| 故障组件 | 影响 | 处理机制 |
|---|---|---|
| Map Task失败 | 部分数据丢失 | AM重新申请Container,重跑该Map(默认4次) |
| Reduce Task失败 | Shuffle数据可能丢失 | AM重新申请Container,重跑该Reduce |
| NodeManager故障 | 该节点所有Container失效 | RM标记NM为dead,AM重新调度任务 |
| ApplicationMaster故障 | 整个作业失败 | RM重新启动新的AM(默认2次) |
| ResourceManager故障 | 整个集群不可用 | 依赖RM HA(ZooKeeper + 主备切换) |
4.2 任务重试机制
// 任务失败重试的配置
public class TaskAttemptListener {
// 最大重试次数
public static final String MAP_MAX_ATTEMPTS =
"mapreduce.map.maxattempts"; // 默认4
public static final String REDUCE_MAX_ATTEMPTS =
"mapreduce.reduce.maxattempts"; // 默认4
// 任务推测执行(加快作业完成)
public static final String MAP_SPECULATIVE =
"mapreduce.map.speculative"; // 默认true
public static final String REDUCE_SPECULATIVE =
"mapreduce.reduce.speculative"; // 默认true
}
五、YARN调度器详解
YARN支持多种调度策略,满足不同场景需求:
5.1 三种调度器对比
5.2 调度器配置示例
<!-- capacity-scheduler.xml -->
<configuration>
<!-- 定义队列 -->
<property>
<name>yarn.scheduler.capacity.root.queues</name>
<value>production,development</value>
</property>
<!-- 生产队列占70%资源 -->
<property>
<name>yarn.scheduler.capacity.root.production.capacity</name>
<value>70</value>
</property>
<!-- 开发队列占30%资源 -->
<property>
<name>yarn.scheduler.capacity.root.development.capacity</name>
<value>30</value>
</property>
<!-- 每个用户最多占用队列50%资源 -->
<property>
<name>yarn.scheduler.capacity.root.production.user-limit-factor</name>
<value>0.5</value>
</property>
</configuration>
六、性能优化与调优参数
6.1 关键配置参数
| 阶段 | 参数 | 默认值 | 说明 |
|---|---|---|---|
| Container | yarn.nodemanager.resource.memory-mb |
8192 | 节点可用内存 |
yarn.nodemanager.resource.cpu-vcores |
8 | 节点可用CPU核数 | |
| AM | yarn.app.mapreduce.am.resource.mb |
1536 | AM内存 |
yarn.app.mapreduce.am.command-opts |
-Xmx1024m | AM JVM参数 | |
| Map | mapreduce.map.memory.mb |
1024 | Map Container内存 |
mapreduce.map.cpu.vcores |
1 | Map Container CPU | |
| Reduce | mapreduce.reduce.memory.mb |
1024 | Reduce Container内存 |
mapreduce.reduce.cpu.vcores |
1 | Reduce Container CPU | |
| Shuffle | mapreduce.reduce.shuffle.parallelcopies |
5 | 并行拉取线程数 |
mapreduce.shuffle.max.connections |
0 | HTTP连接数(0表示不限) |
6.2 调优实践
# 内存优化示例:大作业配置
export HADOOP_OPTS="$HADOOP_OPTS -Dmapreduce.map.memory.mb=4096"
export HADOOP_OPTS="$HADOOP_OPTS -Dmapreduce.reduce.memory.mb=8192"
export HADOOP_OPTS="$HADOOP_OPTS -Dmapreduce.map.java.opts=-Xmx3072m"
export HADOOP_OPTS="$HADOOP_OPTS -Dmapreduce.reduce.java.opts=-Xmx6144m"
export HADOOP_OPTS="$HADOOP_OPTS -Dyarn.app.mapreduce.am.resource.mb=2048"
# 提高并行度
export HADOOP_OPTS="$HADOOP_OPTS -Dmapreduce.reduce.shuffle.parallelcopies=20"
export HADOOP_OPTS="$HADOOP_OPTS -Dmapreduce.tasktracker.map.tasks.maximum=8"
export HADOOP_OPTS="$HADOOP_OPTS -Dmapreduce.tasktracker.reduce.tasks.maximum=4"
七、面试高频问题
Q1:YARN相比Hadoop 1.x的优势是什么?
答:
- 资源管理与计算框架解耦:支持多种计算框架
- 可扩展性更好:RM和NM职责分离
- 集群利用率更高:动态资源分配
- 容错性更强:AM可以恢复,任务可以重试
Q2:ApplicationMaster的作用是什么?
答:
- 与RM协商资源(申请Container)
- 与NM协同启动任务
- 监控任务执行和进度
- 处理任务失败(重新申请)
- 作业完成后清理资源
Q3:Container和Task的关系是什么?
答:
- Container:资源的抽象(CPU+内存),是物理执行环境
- Task:逻辑计算单元(Map/Reduce)
- 一个Container运行一个Task(或者可以运行多个Task的JVM)
Q4:YARN如何实现资源隔离?
答:
- 内存隔离:使用cgroups或纯进程监控
- CPU隔离:cgroups的CPU限制
- 磁盘隔离:每个Container有独立的临时目录
- 网络隔离:未做严格隔离,依赖操作系统
Q5:YARN调度器的选择原则?
答:
- FIFO:测试环境,单用户场景
- Capacity:多团队,需要资源保证的生产环境
- Fair:多用户,需要快速响应小作业的场景
八、总结
MapReduce on YARN的工作流程可以概括为:
- 客户端提交:切片计算、资源上传、向RM申请
- AM启动:RM分配第一个Container启动AM
- 资源申请:AM根据任务数向RM申请Container
- 任务执行:AM在分配的Container中启动Map/Reduce Task
- 进度监控:Task通过AM向RM汇报进度
- 作业完成:所有Task完成后,AM清理资源并退出
YARN的设计哲学:将资源管理和作业控制分离,用ApplicationMaster作为"作业代理",实现了:
- 高扩展性:RM只负责资源调度,不关心具体作业
- 多框架支持:MapReduce、Spark、Flink都可以运行
- 高容错性:各组件都有完善的故障恢复机制
理解YARN的工作原理,不仅是应对面试的关键,更是进行大数据平台性能调优的基础!
思考题:如果让你设计下一代资源调度系统,YARN的哪些设计值得保留?哪些需要改进?欢迎在评论区分享你的观点!

|
🌺The End🌺点点关注,收藏不迷路🌺
|
更多推荐



所有评论(0)