一文读懂Hadoop技术详细介绍
一、背景随着数据量的增大,存储技术出现以下问题,①硬件问题:复制数据解决(RAID)②分析需要从不同的硬盘读取数据:MapReduce而Hadoop提供了以下功能,①可靠的共享存储(分布式存储)②抽象的分析接口(分布式分析)首先简单解释一下大数据的概念,大数据实际上是不能使用一台机器进行处理的数据,大数据的核心是样本=总体。处理大数据所需要的关键技术主要包括以下要求,①数据分布在多台机器,每个数据
随着数据量的增大,存储技术出现一些问题。
————————
个人技术公众号:解决方案工程师
欢迎同领域的朋友关注、相互交流。
————————
一、背景
随着数据量的增大,存储技术出现以下问题,
①硬件问题:复制数据解决(RAID)
②分析需要从不同的硬盘读取数据:MapReduce
而Hadoop提供了以下功能,
①可靠的共享存储(分布式存储)
②抽象的分析接口(分布式分析)
首先简单解释一下大数据的概念,大数据实际上是不能使用一台机器进行处理的数据,大数据的核心是样本=总体。处理大数据所需要的关键技术主要包括以下要求,
①数据分布在多台机器,每个数据块都复制到多个节点,多个节点同时处理数据。
②计算随数据走,网络IO速度 << 本地磁盘IO速度,大数据系统会尽量地将任务分配到离数据最近的机器上运行(程序运行时,将程序及其依赖包都复制到数据所在的机器运行)。代码向数据迁移,避免大规模数据时,造成大量数据迁移的情况,尽量让一段数据的计算发生在同一台机器上。
③串行IO取代随机IO,传输时间 << 寻道时间,一般数据写入后不再修改。
二、Hadoop简介
Hadoop可运行于一般的商用服务器上,具有高容错、高可靠性、高扩展性等特点,特别适合写一次,读多次的场景。
Hadoop的架构如图所示,

其中,HDFS是分布式文件存储,YARN是分布式资源管理,MapReduce是分布式计算,Others是利用YARN的资源管理功能实现其他的数据处理方式。
三、Hadoop各组成部分
1:Hadoop HDFS
Hadoop HDFS的全称为:Hadoop Distributed File System,分布式文件系统。HDFS的架构如图所示,

其中,Block数据块是,
①基本存储单位,一般大小为64M,配置大的块主要是因为:1)减少搜寻时间,一般硬盘传输速率比寻道时间要快,大的块可以减少寻道时间;2)减少管理块的数据开销,每个块都需要在NameNode上有对应的记录;3)对数据块进行读写,减少建立网络的连接成本。
②一个大文件会被拆分成一个个的块,然后存储于不同的机器。如果一个文件少于Block大小,那么实际占用的空间为其文件的大小。
③基本的读写单位,类似于磁盘的页,每次都是读写一个块。
④每个块都会被复制到多台机器,默认复制3份。
NameNode是,
①存储文件的metadata,运行时所有数据都保存到内存,整个HDFS可存储的文件数受限于NameNode的内存大小。
②一个Block在NameNode中对应一条记录(一般一个block占用150字节),如果是大量的小文件,会消耗大量内存。同时map task的数量是由splits来决定的,所以用MapReduce处理大量的小文件时,就会产生过多的map task,线程管理开销将会增加作业时间。处理大量小文件的速度远远小于处理同等大小的大文件的速度。因此Hadoop建议存储大文件。
③数据会定时保存到本地磁盘,但不保存block的位置信息,而是由DataNode注册时上报和运行时维护(NameNode中与DataNode相关的信息并不保存到NameNode的文件系统中,而是NameNode每次重启后,动态重建)。
④NameNode失效则整个HDFS都失效了,所以要保证NameNode的可用性。
Secondary NameNode是,定时与NameNode进行同步(定期合并文件系统镜像和编辑日志,然后把合并后的传给NameNode,替换其镜像,并清空编辑日志,类似于CheckPoint机制),但NameNode失效后仍需要手工将其设置成主机。
DataNode是,
①保存具体的block数据。
②负责数据的读写操作和复制操作。
③DataNode启动时会向NameNode报告当前存储的数据块信息,后续也会定时报告修改信息。
④DataNode之间会进行通信,复制数据块,保证数据的冗余性。
1.1:Hadoop 写文件
HDFS 写文件的方式如图所示:

其中,
①客户端将文件写入本地磁盘的HDFS Client文件中。
②当临时文件大小达到一个block大小时,HDFS client通知NameNode,申请写入文件。
③NameNode在HDFS的文件系统中创建一个文件,并把该block id和要写入的DataNode的列表返回给客户端。
④客户端收到这些信息后,将临时文件写入DataNodes。
·4.1 客户端将文件内容写入第一个DataNode(一般以4kb为单位进行传输)。
·4.2 第一个DataNode接收后,将数据写入本地磁盘,同时也传输给第二个DataNode。
·4.3 依此类推到最后一个DataNode,数据在DataNode之间是通过pipeline的方式进行复制的。
·4.4 后面的DataNode接收完数据后,都会发送一个确认给前一个DataNode,最终第一个DataNode返回确认给客户端。
·4.5 当客户端接收到整个block的确认后,会向NameNode发送一个最终的确认信息。
·4.6 如果写入某个DataNode失败,数据会继续写入其他的DataNode。然后NameNode会找另外一个好的DataNode继续复制,以保证冗余性。
·4.7 每个block都会有一个校验码,并存放到独立的文件中,以便读的时候来验证其完整性。
⑤文件写完后(客户端关闭),NameNode提交文件(这时文件才可见,如果提交前,NameNode垮掉,那文件也就丢失了。fsync:只保证数据的信息写到NameNode上,但并不保证数据已经被写到DataNode中)。
1.2:Hadoop 读文件
HDFS 读文件的方式如图所示:

①客户端向NameNode发送读取请求。
②NameNode返回文件的所有block和这些block所在的DataNodes(包括复制节点)。
③客户端直接从DataNode中读取数据,如果该DataNode读取失败(DataNode失效或校验码不对),则从复制节点中读取(如果读取的数据就在本机,则直接读取,否则通过网络读取)。
1.3:Hadoop 可靠性
HDFS 的可靠性主要有以下几点:
①冗余副本策略,可以在 hdfs-site.xml 中设置复制因子指定副本数量,所有数据块都可副本,DataNode 启动时,遍历本地文件系统,产生一份 HDFS 数据块和本地文件的对应关系列表 (blockreport) 汇报给 Namenode。
②机架策略,HDFS 的“机架感知",通过节点之间发送一个数据包,来感应它们是否在同一个机架,一般在本机架放一个副本,在其他机架再存放一个副本,这样可以防止机架失效时丢失数据,也可以提高带宽利用率。
③心跳机制,NameNode 周期性从 DataNode 接受心跳信息和块报告,NameNode 根据块报告验证元数据,没有按时发送心跳的 DataNode 会被标记为宕机,不会再给他任何 I/O 请求,如果 DataNode 失效造成副本数量下降,并且低于预先设定的值,NameNode 会检测出这些数据库,并在合适的时机重新复制。引发重新复制的原因还包括数据副本本身损坏、磁盘错误、复制因子被增大等。
④安全模式,NameNode 启动时会先经过一个 "安全模式" 阶段,安全模式阶段不会产生数据写,在此阶段 NameNode 收集各个DataNode 的报告,当数据块达到最小副本数以上时,会被认为是"安全"的,在一定比例可设置)的数据块被确定为"安全" 后,再过若干时间,安全模式结束。当检测到副本数不足的数据块是,该块会被复制,直到达到最小副本数。
⑤效验和,在文件创立时,每个数据块都产生效验和,效验和会作为单独一个隐藏文件保存在命名空间下,客户端获取数据时可以检查效验和是否相同,从而发现数据块是否损坏,如果正在读取的数据块损坏,则可以继续读取其他副本。
⑥回收站,删除文件时,其实是放入回收站 /trash,回收站里的文件是可以快速恢复的,可以设置一个时间值,当回收站里文件的存放时间超过了这个值,就被彻底删除,并且释放占用的数据块。
⑦元数据保护,映像文件和事物日志是 NameNode 的核心数据,可以配置为拥有多个副本,副本会降低 NameNode 的处理速度,但增加安全性。NameNode 依然是单点,如果发生故障要手工切换。
1.4:Hadoop 命令工具
常用HDFS命令,
fsck:检查文件的完整性。
start-balancer.sh:重新平衡HDFS。
hdfs dfs -copyFromLocal:从本地磁盘复制文件到HDFS。
2:Hadoop YARN
首先说一下旧的MapReduce架构,如图所示,

其中,JobTracker 负责资源管理,跟踪资源消耗和可用性,作业生命周期管理(调度作业任务,跟踪进度,为任务提供容错)。TaskTracker 加载或关闭任务,定时报告任务状态。
此架构会有以下问题,
①JobTracker 是 MapReduce 的集中处理点,存在单点故障。
②JobTracker 完成了太多的任务,造成了过多的资源消耗,当 MapReduce job 非常多的时候,会造成很大的内存开销。这也是业界普遍总结出老 Hadoop 的 MapReduce 只能支持 4000 节点主机的上限。
③在 TaskTracker 端,以 map/reduce task 的数目作为资源的表示过于简单,没有考虑到 cpu/ 内存的占用情况,如果两个大内存消耗的 task 被调度到了一块,很容易出现 OOM。
④在 TaskTracker 端,把资源强制划分为 map task slot 和 reduce task slot , 如果当系统中只有 map task 或者只有 reduce task 的时候,会造成资源的浪费,也就集群资源利用的问题。
总的来说就是单点问题和资源利用率问题难以满足大数据处理的需求。
YARN的构架如图所示,


YARN就是将 JobTracker 的职责进行拆分,将资源管理和任务调度监控拆分成独立 #x7ACB 的进程,即一个全局的资源管理和一个每个作业的管理(ApplicationMaster) ResourceManager 和 NodeManager 提供了计算资源的分配和管理,而 ApplicationMaster 则完成应用程序的运行。
①ResourceManager:全局资源管理和任务调度
②NodeManager:单个节点的资源管理和监控
③ApplicationMaster:单个作业的资源管理和任务监控
④Container:资源申请的单位和任务运行的容器
YARN的基本流程如图所示,


①Job submission
从ResourceManager 中获取一个Application ID 检查作业输出配置,计算输入分片拷贝作业资源(job jar、配置文件、分片信息)到 HDFS,以便后面任务的执行。
②Job initialization
ResourceManager 将作业递交给 Scheduler(有很多调度算法,一般是根据优先级)Scheduler 为作业分配一个 Container,ResourceManager 就加载一个 application master process 并交给 NodeManager。
管理 ApplicationMaster 主要是创建一系列的监控进程来跟踪作业的进度,同时获取输入分片,为每一个分片创建一个 Map task 和相应的 reduce task Application Master 还决定如何运行作业,如果作业很小(可配置),则直接在同一个JVM 下运行。
③Task assignment
ApplicationMaster 向 Resource Manager 申请资源(一个个的Container,指定任务分配的资源要求)一般是根据data locality来分配资源。
④Task execution
ApplicationMaster 根据 ResourceManager 的分配情况,在对应的 NodeManager 中启动 Container 从HDFSN#x4E2D; 读取任务所需资源(job jar,配置文件等),然后执行该任务。
⑤Progress and status update
定时将任务的进度和状态报告给 ApplicationMaster Client 定时向 ApplicationMaster 获取整个任务的进度和状态。
⑥Job completion
Client定时检查整个作业是否完成作业完成后,会清空临时文件、目录等。
2.1:Hadoop ResourceManager
YARN ResourceManager 负责全局的资源管理和任务调度,把整个集群当成计算资源池,只关注分配,不管应用,且不负责容错。
·资源管理
①以前资源是每个节点分成一个个的Map slot 和 Reduce slot,现在是一个个 Container,每个 Container 可以根据需要运行 ApplicationMaster、Map、Reduce 或者任意的程序。
②以前的资源分配是静态的,目前是动态的,资源利用率更高。
③Container 是资源申请的单位,一个资源申请格式:<resource-name, priority, resource-requirement, number-of-containers>, resource-name:主机名、机架名或*(代表任意机器), resource-requirement:目前只支持CPU和内存。
④用户提交作业到 ResourceManager,然后在某个 NodeManager 上分配一个 Container 来运行 ApplicationMaster,ApplicationMaster 再根据自身程序需要向 ResourceManager 申请资源。
⑤YARN 有一套 Container 的生命周期管理机制,而 ApplicationMaster 和其 Container 之间的管理是应用程序自己定义的。
·任务调度
①只关注资源的使用情况,根据需求合理分配资源。
②Scheluer 可以根据申请的需要,在特定的机器上申请特定的资源(ApplicationMaster 负责申请资源时的数据本地化的考虑,ResourceManager 将尽量满足其申请需求,在指定的机器上分配 Container,从而减少数据移动)。
·内部结构

①Client Service: 应用提交、终止、输出信息(应用、队列、集群等的状态信息)。
②Adaminstration Service: 队列、节点、Client权限管理。
③ApplicationMasterService: 注册、终止ApplicationMaster, 获取ApplicationMaster的资源申请或取消的请求,并将其异步地传给Scheduler, 单线程处理。
④ApplicationMaster Liveliness Monitor: 接收ApplicationMaster的心跳消息,如果某个ApplicationMaster在一定时间内没有发送心跳,则被任务失效,其资源将会被回收,然后ResourceManager会重新分配一个ApplicationMaster运行该应用(默认尝试2次)。
⑤Resource Tracker Service: 注册节点, 接收各注册节点的心跳消息。
⑥NodeManagers Liveliness Monitor: 监控每个节点的心跳消息,如果长时间没有收到心跳消息,则认为该节点无效, 同时所有在该节点上的Container都标记成无效,也不会调度任务到该节点运行。
⑦ApplicationManager: 管理应用程序,记录和管理已完成的应用。
⑧ApplicationMaster Launcher: 一个应用提交后,负责与NodeManager交互,分配Container并加载ApplicationMaster,也负责终止或销毁。
⑨YarnScheduler: 资源调度分配, 有FIFO(with Priority),Fair,Capacity方式。
⑩ContainerAllocationExpirer: 管理已分配但没有启用的Container,超过一定时间则将其回收。
2.2:Hadoop NodeManager
YARN NodeManager
Node 节点下的 Container 管理,主要包括以下,
①启动时向 ResourceManager 注册并定时发送心跳消息,等待 ResourceManager 的指令。
②监控 Container 的运行,维护 Container 的生命周期,监控 Container 的资源使用情况。
③启动或停止 Container,管理任务运行时的依赖包(根据 ApplicationMaster 的需要,启动 Container 之前将需要的程序及其依赖包、配置文件等拷贝到本地)。
其内部结构如图所示,

①NodeStatusUpdater: 启动向ResourceManager注册,报告该节点的可用资源情况,通信的端口和后续状态的维护。
②ContainerManager: 接收RPC请求(启动、停止),资源本地化(下载应用需要的资源到本地,根据需要共享这些资源)。
③PUBLIC: /filecache。
④PRIVATE: /usercache//filecache。
⑤APPLICATION: /usercache//appcache//(在程序完成后会被删除)。
⑥ContainersLauncher: 加载或终止Container。
⑦ContainerMonitor: 监控Container的运行和资源使用情况。
⑧ContainerExecutor: 和底层操作系统交互,加载要运行的程序。
2.3:Hadoop ApplicationMaster
YARN ApplicationMaster,单个作业的资源管理和任务监控,具体功能描述如下,
①计算应用的资源需求,资源可以是静态或动态计算的,静态的一般是 Client 申请时就指定了,动态则需要 ApplicationMaster 根据应用的运行状态来决定。
②根据数据来申请对应位置的资源(Data Locality)。
③向 ResourceManager 申请资源,与 NodeManager 交互进行程序的运行和监控,监控申请的资源的使用情况,监控作业进度。
④跟踪任务状态和进度,定时向 ResourceManager 发送心跳消息,报告资源的使用情况和应用的进度信息。
⑤负责本作业内的任务的容错。
ApplicationMaster 可以是用任何语言编写的程序,它和 ResourceManager 和 NodeManager 之间是通过 ProtocolBuf 交互,以前是一个全局的 JobTracker 负责的,现在每个作业都一个,可伸缩性更强,至少不会因为作业太多,造成 JobTracker 瓶颈。同时将作业的逻辑放到一个独立的 ApplicationMaster 中,使得灵活性更加高,每个作业都可以有自己的处理方式,不用绑定到 MapReduce 的处理模式上。
ApplicationMaster计算资源需求的方式,一般的 MapReduce 是根据 block 数量来定 Map 和 Reduce 的计算数量,然后一般的 Map 或 Reduce 就占用一个 Container。
如何发现数据的本地化,数据本地化是通过 HDFS 的 block 分片信息获取的。
2.4:Hadoop Container
YARN Container 包含以下,
①基本的资源单位(CPU、内存等)。
②Container 可以加载任意程序,而且不限于 Java。
③一个 Node 可以包含多个 Container,也可以是一个大的 Container。
④ApplicationMaster 可以根据需要,动态申请和释放 Container。
2.5:Hadoop Failover
YARN Failover ,其失败类型主要包括程序问题、进程崩溃、硬件问题。
失败处理的方法主要如下,
①当任务失败时,
·运行时异常或者 JVM 退出都会报告给 ApplicationMaster。
·通过心跳来检查挂住的任务(timeout),会检查多次(可配置)才判断该任务是否失效。
·一个作业的任务失败率超 过配置,则认为该作业失败。
·失败的任务或作业都会有ApplicationMaster 重新运行。
②当 ApplicationMaster 失败时,
·ApplicationMaster 定时发送心跳信号到 ResourceManager,通常一旦 ApplicationMaster 失败,则认为失败,但也可以通过配置多次后才失败。
·一旦 ApplicationMaster 失败,ResourceManager 会启动一个新的 ApplicationMaster。
·新的 ApplicationMaster 负责恢复之前错误的 ApplicationMaster 的状态 (yarn.app.mapreduce.am.job.recovery.enable=true),这一步是通过将应用运行状态保存到共享的存储上来实现的,ResourceManager 不会负责任务状态的保存和恢复。
·Client 也会定时向 ApplicationMaster 查询进度和状态,一旦发现其失败,则向 ResouceManager 询问新的 ApplicationMaster。
③当 NodeManager 失败时,
·NodeManager 定时发送心跳到 ResourceManager,如果超过一段时间没有收到心跳消息,ResourceManager 就会将其移除。
·任何运行在该 NodeManager 上的任务和 ApplicationMaster 都会在其他 NodeManager 上进行恢复。
·如果某个 NodeManager 失败的次数太多,ApplicationMaster 会将其加入黑名单(ResourceManager 没有),任务调度时不在其上运行任务。
④当 ResourceManager 失败时,
·通过 checkpoint 机制,定时将其状态保存到磁盘,然后失败的时候,重新运行。
·通过 zookeeper 同步状态和实现透明的 HA。
可以看出,一般的错误处理都是由当前模块的父模块进行监控(心跳)和恢复。而最顶端的模块则通过定时保存、同步状态和 zookeeper 来ֹ实现 HA。
3:Hadoop MapReduce
Hadoop MapReduce 是一种分布式的计算方式指定一个Map(映#x5C04;)函数,用来把一组键值对映射成一组新的键值对,指定并发的Reduce(归约)函数,用来保证所有映射的键值对中的每一个共享相同的键组。
Hadoop MapReduce 的模式如图所示,

map: (K1, V1) → list(K2, V2) combine: (K2, list(V2)) → list(K2, V2) reduce: (K2, list(V2)) → list(K3, V3)。
Map输出格式和Reduce输入格式一定是相同的。
Hadoop MapReduce 的基本流程是 MapReduce 主要是先读取文件数据,然后进行Map处理,接着Reduce处理,最后把处理结果写到文件中,如图所示,

详细流程如图所示,

多节点下的流程如图所示,

主要过程如图所示,

Map Side:
①Record reader
记录阅读器会翻译由输入格式生成的记录,记录阅读器用于将数据解析给记录,并不分析记录自身。记录读取器的目的是将数据解析成记录,但不分析记录本身。它将数据以键值对的形式传输给 mapper。通常键是位置信息,值是构成记录的数据存储块.自定义记录不在本文讨论范围之内。
②Map
在映射器中用户提供的代码称为中间对。对于键值的具体定义是慎重的,因为定义对于分布式任务的完成具有重要意义.键决定了数据分类的依据,而值决定了处理器中的分析信息.本书的设计模式将会展示大量细节来解释特定键值如何选择。
③Shuffle and Sort
ruduce任务以随机和排序步骤开始。此步骤写入输出文件并下载到本地计算机。这些数据采用键进行排序以把等价密钥组合到一起。
④Reduce
reducer采用分组数据作为输入。该功能传递键和此键相关值的迭代器。可以采用多种方式来汇总、过滤或者合并数据。当reduce功能完成,就会发送0个或多个键值对。
⑤输出格式
输出格式会转换最终的键值对并写入文件。默认情况下键和值以tab分割,各记录以换行符分割。因此可以自定义更多输出格式,最终数据会写入HDFS。类似记录读取,自定义输出格式不在本书范围。
3.1:Hadoop 读取数据
MapReduce 读取数据是通过 InputFormat 决定读取的数据的类型,然后拆分成一个个 InputSplit,每个 InputSplit 对应一个 Map 处理,RecordReader 读取 InputSplit 的内容给 Map。
InputFormat 决定读取数据的格式,可以是文件或数据库等。其功能主要包括,
①验证作业输入的正确性,如格式等。
②将输入文件切割成逻辑分片 (InputSplit),一个 InputSplit 将会被分配给一个独立的 Map 任务。
③提供 RecordReader 实现,读取 InputSplit 中的"K-V 对"供 Mapper 使用。
方法主要包括,
①List getSplits(): 获取由输入文件计算出输入分片 (InputSplit),解决数据或文件分割成片问题。
②RecordReader createRecordReader(): 创建 RecordReader,从 InputSplit 中读取数据,解决读取分片中数据问题。
类结构如下图所示,

其中,
①TextInputFormat: 输入文件中的每一行就是一个记录,Key 是这一行的 byte offset,而 value 是这一行的内容。
②KeyValueTextInputFormat: 输入文件中每一行就是一个记录,第一个分隔符字符切分每行。在分隔符字符之前的内容为 Key,在之后的为 Value。分隔符变量通过 key.value.separator.in.input.line 变量设置,默认为(\t)字符。
③NLineInputFormat: 与 TextInputFormat 一样,但每个数据块必须保证有且只有N行,mapred.line.input.format.linespermap 属性,默认为1。
④SequenceFileInputFormat: 一个用来读取字符流数据的 InputFormat,<key,value> 为用户自定义的。字符流数据是 Hadoop 自定义的压缩的二进制数据格式。它用来优化从一个 MapReduce 任务的输出到另一个 MapReduce 任务的输入之间的数据传输过程。
InputSplit 代表一个个逻辑分片,并没有真正存储数据,只是提供了一个如何将数据分片的方法。Split 内有 Location 信息,利于数据局部化,一个 InputSplit 给一个单独的 Map 处理。

RecordReader 将 InputSplit 拆分成一个个 <key,value> 对给 Map 处理,也是实际的文件读取分隔对象 </key,value>。
问题:
①大量小文件如何处理?
CombineFileInputFormat 可以将若干个 Split 打包成一个,目的是避免过多的 Map 任务(因为 Split 的数目决定了 Map 的数目,大量的 Mapper Task 创建销毁开销将是巨大的)。
②怎么计算split的?
通常一个 split 就是一个 block(FileInputFormat 仅仅拆分比 block 大的文件),这样做的好处是使得 Map 可以在存储有当前数据的节点上运行本地的任务,而不需要通过网络进行跨节点的任务调度。
通过 mapred.min.split.size, mapred.max.split.size, block.size 来控制拆分的大小。
如果 mapred.min.split.size 大于 block size,则会将两个 block 合成到一个 split,这样有部分 block 数据需要通过网络读取。
如果 mapred.max.split.size 小于 block size,则会将一个 block 拆成多个 split,增加了 Map 任务数(Map 对 split 进行计算并且上报结果,关闭当前计算打开新的split均需要耗费资源)。
先获取文件在 HDFS 上的路径和 Block 信息,然后根据 splitSize 对文件进行切分( splitSize = computeSplitSize(blockSize, minSize, maxSize) ),默认 splitSize 就等于 blockSize 的默认值(64m)。


③分片间的数据如何处理?
split 是根据文件大小分割的,而一般处理是根据分隔符进行分割的,这样势必存在一条记录横跨两个 split。

解决办法是只要不是第一个split,都会远程读取一条记录。不是第一个split的都忽略到第一条记录。



3.2:Hadoop Mapper
MapReduce Mapper 主要是读取 InputSplit 的每一个 Key,Value 对并进行处理。



3.3:Hadoop Shuffle
MapReduce-Shuffle对Map的结果进行排序并传输到Reduce进行处理,Map的结果并不是直接存放到硬盘,而是利用缓存做一些预排序处理,Map会调用Combiner、压缩、按key进行分区、排序等,尽量减少结果的大小 每个Map完成后都会通知Task,然后Reduce就可以进行处理,如图所示。

Map端,当Map程序开始产生结果的时候,并不是直接写到文件的,而是利用缓存做一些排序方面的预处理操作。每个Map任务都有一个循环内存缓冲区(默认100MB),当缓存的内容达到80%时,后台线程开始将内容写到文件,此时Map任务可以继续输出结果,但如果缓冲区满了,Map任务则需要等待。写文件使用round-robin方式。在写入文件之前,先将数据按照Reduce进行分区。对于每一个分区,都会在内存中根据key进行排序,如果配置了Combiner,则排序后执行Combiner(Combine之后可以减少写入文件和传输的数据)
每次结果达到缓冲区的阀值时,都会创建一个文件,在Map结束时,可能会产生大量的文件。在Map完成前,会将这些文件进行合并和排序。如果文件的数量超过3个,则合并后会再次运行Combiner(1、2个文件就没有必要了)。
如果配置了压缩,则最终写入的文件会先进行压缩,这样可以减少写入和传输的数据。一旦Map完成,则通知任务管理器,此时Reduce就可以开始复制结果数据。
Reduce端,Map的结果文件都存放到运行Map任务的机器的本地硬盘中。如果Map的结果很少,则直接放到内存,否则写入文件中。同时后台线程将这些文件进行合并和排序到一个更大的文件中(如果文件是压缩的,则需要先解压)。当所有的Map结果都被复制和合并后,就会调用Reduce方法。Reduce结果会写入到HDFS中。
调优,一般的原则是给shuffle分配尽可能多的内存,但前提是要保证Map、Reduce任务有足够的内存。对于Map,主要就是避免把文件写入磁盘,例如使用Combiner,增大io.sort.mb的值。对于Reduce,主要是把Map的结果尽可能地保存到内存中,同样也是要避免把中间结果写入磁盘。默认情况下,所有的内存都是分配给Reduce方法的,如果Reduce方法不怎么消耗内存,可以mapred.inmem.merge.threshold设成0,mapred.job.reduce.input.buffer.percent设成1.0。在任务监控中可通过Spilled records counter来监控写入磁盘的数,但这个值是包括map和reduce的。对于IO方面,可以Map的结果可以使用压缩,同时增大buffer size(io.file.buffer.size,默认4kb)。
配置如下表所示。


3.3:Hadoop 编程
MapReduce 编程,处理如下步骤。
①select:直接分析输入数据,取出需要的字段数据即可。
②where: 也是对输入数据处理的过程中进行处理,判断是否需要该数据。
③aggregation:min, max, sum。
④group by: 通过Reducer实现。
⑤sort。
⑥join: map join, reduce join。
Third-Party Libraries 如下。

更多推荐



所有评论(0)