问小白 wenxiaobai
资讯
历史
科技
环境与自然
成长
游戏
财经
文学与艺术
美食
健康
家居
文化
情感
汽车
三农
军事
旅行
运动
教育
生活
星座命理

Hadoop - MapReduce的核心原理、执行流程、运行架构详解

创作时间:
作者:
@小白创作中心

Hadoop - MapReduce的核心原理、执行流程、运行架构详解

引用
1
来源
1.
https://www.hangge.com/blog/cache/detail_3448.html

MapReduce是大数据处理领域的重要技术,它通过分布式计算框架实现了对海量数据的高效处理。本文将从MapReduce的基本概念出发,深入解析其核心原理、执行流程以及在YARN上的架构设计,帮助读者全面理解这一关键技术。

一、基本介绍

1. 什么是 MapReduce?

  • MapReduce是一种分布式计算框架,算是大数据行业的第一代离线数据计算引擎,可以稳定、可靠地并行处理TB、PB级别的海量数据,主要用于搜索领域。
  • MapReduce计算引擎的核心思想是,将计算逻辑抽象成Map和Reduce两个阶段进行处理。

2. MapReduce 的前世今生

(1)MapReduce源于Google在2004年发表的论文Simplified Data Processing on Large Clusters。

(2)MapReduce属于Hadoop项目的核心组件,主要负责海量数据的分布式计算。

  • 在Hadoop 1.x版本中,MapReduce需要负责分布式数据计算和集群资源管理,这导致MapReduce比较臃肿,并且此时在Hadoop集群中只能运行MapReduce任务,无法运行其他类型的任务。
  • 从Hadoop 2.x版本开始,官方将MapReduce的功能进行了拆分,并引入了YARN。此时MapReduce只需要负责分布式数据计算,YARN负责集群资源管理和分配。这样拆分之后,YARN就成了一个公共的集群资源管理平台,在它上面不仅可以运行MapReduce任务,还可以运行其他类型的任务。

提示:由于Hadoop起步比较早,属于大数据的开拓者,引入YARN之后,它变成了一个平台提供者,这样可以更好地发展基于Hadoop的生态圈。后来兴起的Spark和Flink这些计算引擎都可以在YARN上执行,这就更加巩固了Hadoop在大数据生态圈中的地位。

二、MapReduce 的核心思想

1. 分而治之

(1)MapReduce是分布式运行的,由Map和Reduce两个阶段组成。

  • Map阶段是一个独立的程序,可以在多个节点同时运行,每个节点处理一部分数据。
  • Reduce阶段也是一个独立的程序,可以在一个或多个节点同时运行,每个节点处理一部分数据

注意:如果是全局聚合需求,则Reduce阶段只会在一个节点上运行。

(2)通俗来说,Map阶段就是对海量数据进行并行局部汇总,Reduce阶段就是对局部汇总的数据进行全局汇总。

以计算一摞扑克牌中黑桃的个数为例:

  • 第一步:把这摞扑克牌分配给在座的所有玩家。
  • 第二步:让每个玩家检查自己手中的扑克牌有多少张黑桃,然后把这个数字汇报给你。
  • 第三步:你把所有玩家告诉你的数字加起来,得到最终的结果。

2. 移动计算

(1)传统的计算方式是,把需要计算的数据通过网络传输到计算程序所在的节点上。如果需要计算的数据量比较大,则这种方式效率就比较低了,因为需要通过网络传输大量的数据,会受制于磁盘I/O和网络I/O(网络I/O是最消耗时间的)。这种计算方式可以被称为移动数据。

(2)如果把计算程序移动到数据所在的节点,即计算程序和数据在同一个节点上,则可以节省网络I/O。这种方式可以被称为移动计算。

(3)MapReduce在计算海量数据时,使用的就是移动计算思想。假设需要对HDFS上PB级别的数据进行汇总计算,这份数据肯定会有多个Block,多个Block会存储在多个节点中,这时可以把计算程序复制到数据所在的多个节点上并行执行,这样就可以利用数据本地化的特性,节省网络I/O,提高计算效率。

提示:计算程序是很小的,一般也就几十KB或几百KB,通过网络复制计算程序不会消耗多少时间,几乎可以忽略不计。

(4)但是计算程序只能计算当前节点上的数据,无法获取全局的结果,所以还需要有一个汇总程序,这样每个数据节点上计算的临时结果就可以通过汇总程序得到最终的结果。

三、MapReduce 的执行流程

1. 执行原理

(1)下图是MapReduce详细的执行原理,其中左下角是一个File(文件),这个文件表示输入数据源。文件下面是多个Block,说明这个文件被切分成了多个Block。文件上面是一些Split,Split表示File的切片,这里的切片是逻辑切分,不会对Block数据进行真正的切分,默认情况下Split的大小等于Block的大小。

注意:特殊情况下Split的大小会大于Block的大小。默认会先按照Block的大小将文件切分为Split,当(文件的剩余大小/128MB) ≤ 1.1时,会将剩余的内容划分到一个Split中,这主要是为了提高计算效率。

(2)MapReduce任务在执行时,针对每个Split都会产生一个Map Task。下图中一共产生了5个Map Task。

(1)一个1G的文件,会产生多少个map任务?

  • Block块默认是128M,所以1G的文件会产生8个Block块
  • 默认情况下InputSplit的大小和Block块的大小一致,每一个InputSplit会产生一个map任务
  • 所以:1024/128=8个map任务

(2)1000个文件,每个文件100KB,会产生多少个map任务?

  • 一个文件,不管再小,都会占用一个block,所以这1000个小文件会产生1000个Block
  • 那最终会产生1000个InputSplit,也就对应着会产生1000个 map任务

(3)一个140M的文件,会产生多少个map任务?

  • 根据前面的分析,140M的文件会产生2个Block,那是不是对应的就会产生2个InputSplit了?其实不是的。
  • 注意:这个有点特殊,140M/128M=1.09375<1.1
  • 所以,这个文件只会产生一个InputSplit,也最终也就只会产生1个map任务。

(3)Map Task计算的中间结果会通过Shuffle远程复制到Reduce Task中进行汇总计算。

(4)图中一共有3个Reduce Task,每个Reduce Task负责处理一部分数据。3个Reduce Task最终会在结果目录下产生3个文件:part-r-00000、part-r-00001和part-r-00002。

2. Map 阶段详解

(1)以统计hello.txt文件中每个单词出现的总次数为例。假设文件中有两行内容,单词之间使用空格分隔,文件内容如下:

hello you
hello me

(2)第一步:MapReduce框架会把输入文件划分为很多Split。默认情况下,每个Block对应一个Split。通过RecordReader类,把每个Split对应的数据解析成一个个的<k1,v1>。默认情况下,每一行数据都会被解析成一个<k1,v1>。k1表示每一行的起始偏移量,v1表示那一行内容。

提示:<k1,v1>表示键值对类型的数据。后面还会出现<k2,v2>和<k3,v3>,分别代表数据的不同阶段。

  • 所以,hello.txt文件中的数据经过第一步处理之后的结果如下:
<0,hello you>
<10,hello me>

提示:第1次执行此步骤会产生<0,hello you>,第2次执行此步骤会产生<10,hello me>。并不是执行一次就获取这两行结果,因为框架每次只会读取一行数据,这里只是把两次执行的最终结果一起列出来了。

(3)第二步:MapReduce框架调用Mapper类中的map()函数。map()函数的输入是<k1,v1>,输出是<k2,v2>。一个Split对应一个Map Task,程序员需要自己覆盖Mapper类中的map()函数,实现具体的业务逻辑。

  • 因为需要统计文件中每个单词出现的总次数,所以需要先把每一行内容中的单词切开,然后记录每个单词出现次数为1,这个逻辑需要在map()函数中实现。
  • 对于<0,hello you>,执行map()函数中的逻辑之后结果为:
<hello,1>
<you,1>
  • 对于<10,hello me>,执行map()函数中的逻辑之后结果为:
<hello,1>
<me,1>

(4)第三步:MapReduce框架对map()函数输出的<k2,v2>数据进行分区,不同分区中的<k2,v2>由不同的Reduce Task处理。默认只有1个分区,所以所有的数据都会被分到1个分区中,最后只产生一个Reduce Task。

  • 经过这个步骤之后,数据没什么变化。如果有多个分区,则需要将这些数据根据指定的分区规则分开。
<hello,1>
<you,1>
<hello,1>
<me,1>

(5)第四步:MapReduce框架将每个分区中的数据都按照k2进行排序和分组。分组表示把相同k2的v2分到一个组。

  • 按照k2进行排序:
<hello,1>
<hello,1>
<me,1>
<you,1>
  • 按照k2进行分组:
<hello,{1,1}>
<me,{1}>
<you,{1}>

(6)第五步:在Map阶段,MapReduce框架选择执行Combiner过程。Combiner可以被翻译为“规约”。

  • 在这个例子中,最终是要在Reduce阶段汇总每个单词出现的总次数,所以可以在Map阶段提前执行Reduce阶段的计算逻辑,即在Map阶段对单词出现的次数进行局部汇总,这样就可以减少Map阶段到Reduce阶段的数据传输量,这就是规约的好处。

提示:并不是所有场景都可以使用规约。对于求平均值之类的操作就不能使用规约了,否则最终计算的结果就不准确了。

(7)第六步:MapReduce框架会把Map Task输出的<k2,v2>写入Linux系统的本地磁盘文件中。至此,整个Map阶段执行结束。

  • 写入Linux系统本地磁盘文件的内容大致如下:
<hello,{1,1}>
<me,{1}>
<you,{1}>

3. Reduce 阶段详解

提示:MapReduce程序是由Map和Reduce这两个阶段组成的,但是Reduce阶段并不是必需的。如果某个需求不需要最终的汇总聚合操作,则只需要对数据进行清洗处理,即数据经过Map阶段处理完就结束了,Map阶段可以直接将结果数据输出到HDFS中。

(1)第一步:MapReduce框架对多个Map Task的输出,按照不同的分区,通过网络复制到不同的ReduceTask中。这个过程被称为Shuffle。

  • 当前需求只涉及1个分区,所以把数据复制到Reduce Task之后不会发生变化。
<hello,{1,1}>
<me,{1}>
<you,{1}>

(2)第二步:MapReduce框架对Reduce Task接收到的相同分区的<k2,v2>数据进行合并、排序和分组。Reduce Task接收到的是多个Map Task的输出,所以需要对多个Map Task中相同分区的数据进行合并、排序和分组。

  • 当前需求需求只涉及1个Map Task、1个分区,所以执行合并、排序和分组之后数据是不变的。
<hello,{1,1}>
<me,{1}>
<you,{1}>

(3)第 三 步 :MapReduce框架调用Reducer类中的reduce()函数 。reduce()函数的输入是<k2,{v2...}>,输出是<k3,v3>。每个<k2,{v2...}>会调用一次reduce()函数,程序员需要覆盖reduce()函数实现具体的业务逻辑。

  • 这里需要先在reduce()函数中实现最终的聚合计算逻辑,将相同k2的{v2...}累加求和,然后转换为<k3,v3>写出去。此需求中会调用3次reduce()函数,最终的结果如下所示:
<hello,2>
<me,1>
<you,1>

(4)第四步:MapReduce框架把Reduce Task的输出结果保存到HDFS中。至此,整个Reduce阶段执行结束。

  • 结果文件内容如下:
hello 2
me 1
you 1

4. Shuffle 过程详解

(1)shuffer是一个网络拷贝的过程,是把map端产生的数据通过网络拷贝到reduce阶段进行统一聚合计算。

(2)通过上图可以看到shuffle的一些细节信息:

  • 首先看map阶段,map任务在执行的时候会把k1,v1转化为k2,v2,这些数据会先临时存储到一个内存缓冲区中,这个内存缓冲区的大小默认是100M(io.sort.mb属性),当达到内存缓冲区大小的80%(io.sort.spill.percent)也就是80M的时候,会把内存中的数据溢写到本地磁盘中(mapred.local.dir),一直到map把所有的数据都计算完,最后会把内存缓冲区中的数据一次性全部刷新到本地磁盘文件中。在上面这个图里面表示产生了3个临时文件,每个临时文件中有3个分区,这是由于map阶段中对数据做了分区,所以数据在存储的时候,在每个临时文件中也划分为了3块,最后需要对这些临时文件进行合并,合并为一个大文件,因为一个map任务最终只会产生一个文件,这个合并之后的文件也是有3个分区的。
  • 这3个分区的数据会被shuffle线程分别拷贝到三个不同的reduce节点,图里面只显示了一个reduce节点,下面还有两个没有显示。不同map任务中的相同分区的数据会在同一个reduce节点进行合并,合并以后会执行reduce的功能,最终产生结果数据。

5. 执行流程示例

(1)下面是通过单文件方式描述单词计数案例的执行流程:

(2)在单文件的执行流程中,有一些阶段数据的变化不是很清晰。下面通过多文件的方式进行分析。多文件肯定会有多个Block,这样就会产生多个Split,进而产生多个Map Task。

  • 下面示例使用自定义分区将数据分为两个分区,并且使用了可选的Combiner(规约)在Map端提前对数据进行了局部聚合,这样可以减少Shuffle过程传输的数据量,提高任务的执行效率。

四、MapReduce 在 YARN 上的架构分析

1. 基本介绍

在Hadoop 3.x中,MapReduce是在YARN中执行的。MapReduce在YARN上的运行架构,大致可以分为两个阶段:

  • 第 1 个阶段:ResourceManager(实际是ResourceManager中的ApplicationManager)启动MR AppMaster进程。MR AppMaster是MapReduce ApplicationMaster的简写,主要负责管理MapReduce任务的生命周期。对于每一个MapReduce任务都会启动一个AppMaster进程。
  • 第 2 个阶段:MR AppMaster创建应用程序,申请资源,并且监控应用程序的运行过程。

2. 详细执行流程

(1)用户通过客户端节点向集群提交任务,该任务首先会找到ResourceManager中的ApplicationManager。

(2)ApplicationManager在接收到任务后,会在集群中找一个NodeManager,并在该NodeManager所在的节点上分配一个Container(Container是YARN动态分配的资源容器,包括一定的内存和CPU),在这个Container中启动此任务对应的MR AppMaster进程,该进程用于进行任务的划分和任务的监控。

(3)MR AppMaster在启动之后,会向ResourceManager中的ApplicationManager注册其信息,目的是与之通信。这样用户就可以通过ResourceManager查询作业的运行状态了。

(4)MR AppMaster向ResourceManager中的ResourceScheduler申请计算任务所需要的资源。

(5)MR AppMaster在申请到资源之后,会与对应的NodeManager通信,要求它们启动应用程序所需的任务(Map Task和Reduce Task)。

(6)各个NodeManager启动对应的Container来执行Map Task和Reduce Task。

(7)各个任务(Map Task和Reduce Task)会向MR AppMaster汇报自己的执行进度和执行状况,以便让MR AppMaster随时掌握各个任务的运行状态,在某个任务出了问题之后重启执行该任务。在任务运行期间,用户可以通过MR AppMaster查询任务当前的运行状态。

(8)在任务执行完成之后,MR AppMaster向ApplicationManager汇报,让ApplicationManager注销并关闭自己,释放并回收资源。

© 2023 北京元石科技有限公司 ◎ 京公网安备 11010802042949号