MapReduce简介

对于大规模的数据,我们要对其进行查询、分析,单机效率是比较低的;在多机的情况下,我们则需要一种合适的编程的方式方法来处理大规模的数据。MapReduce作为一个早期的解决这类问题的模型,算是当前分布式系统的一个基础版本,当下的现代分布式计算引擎基本上都演化自MapReduce。

多机大规模调度带来的问题

1、进行分析查询的时候,文件可能非常大,内容被分布式存储在不同的磁盘上,怎么处理?(怎么查)

2、文件过大,怎么采用高效率的处理方法来进行处理?(怎么高效查询)

3、文件过大,一部分硬盘失效怎么处理?(怎么保证容错)

4、文件过大,时间很长,任务失败的话怎么处理?(如何失败处理)

可能存在的瓶颈

1、网络瓶颈:网络传输的速率可能不是很快,且不够稳定。如何更稳定/传输更小的流量是一个问题。

2、磁盘瓶颈:磁盘的I/O读写远远慢于内存,以往小文件可以在内存中加载查询,但是大型文件则需要合适的分布式加载方法来提升效率。

3、CPU、内存瓶颈:整个系统的CPU、内存是一定的,我们如果想让整个系统尽量快的完成任务,那么在总任务量一定的情况下就需要通过合适的调度手段来尽量吃满CPU、内存,从而更快的完成任务。

MapReduce流程

这里引用一张网上的图片,这张图算是比较详细的阐述了MapReduce的整个流程和解决方案。

image.png

然后是调度流程:

image.png

接下来我会比较详细的阐述每一步的流程,以及其具体打算解决的问题。

作业初始化

在MapReduce任务执行的开始,我们需要进行作业初始化。那么,JobClient会向JobTracker请求一个作业ID,然后完成初始化的各项准备工作和检查,对于输入文件进行划分,最后准备进入下一个阶段——作业提交。

任务分配

既然作业已经被创建并初始化,那么接下来的步骤就是给作业分配任务了。一个作业可以被分配一系列的任务去执行,这些任务被TaskTracker管理。

首先就是检测JobTracker心跳包,确保作业可以被申请。然后如果当前TaskTracker执行的作业数量没有达到设计的值,JobTracker就会向TaskTracker发送新任务。至于为何TaskRunner不直接由JobTracker来控制,主要原因可能是直接控制会带来更多的网络负载,这样对于I/O优化是不利的。这里的新任务具有一个优先级,优先级高的任务会优先被发配。这里也存在一些例如机架感知之类的优化,暂不赘述。

任务执行

任务执行的流程会产生一个TaskRunner;流程是产生一个新的JVM虚拟机,然后复制jar到合适的目录,最后再新建TaskRunner实例来负责执行任务。

Q:进度更新解决方案

对于进度的更新,则通过内置的计数器解决。同时会隔一段时间发送心跳包更新当前状态,可以通过WebUI进行查询。

任务失败

一般对于无响应的Task采取一个等待一段时间再重新发送心跳包的方式,否则会杀死进程;而对于发生错误的Task,我们会尝试重新调度,除非其失败次数超过我们设定的上限;TaskTracker在长时间无响应的时候,会被JobTracker移除;如果其失败次数过高,那么会被放在黑名单中。

作业调度

使用了Fair Scheduler;这个调度器保证了每个用户所能调度的Job所获得的总资源是尽量平均的。每个用户都拥有一个资源池,如果某个池在一定时间内获得的资源调度是不公平的,调度器就会终止掉池中获得较多资源的Job,从而把资源让给资源不足的池。这样可以保证不会某个Job吃掉大量资源导致原先运行相对快的Job运行缓慢;不过现在一般使用Yarn调度,这里暂时不叙述。

Map

Map所接收的数据来自内存,对于比较大的数据则会在之前的步骤被拆分,然后部分读取到内存中。然后Map则会按照我们的自定义函数来对于数据进行处理。处理完成的数据则会被写入到OutputCollector(缓冲区)中。对于后面没有Reduce的Map,我们会直接写入;否则我们会对数据进行序列化后再写入。当缓冲大小达到一定程度后,将会启动一个线程把数据spill到磁盘上。

Map所起到的作用主要是对大规模的K-V数据进行数据拆分。至于缓冲区的实际运作方式,在这篇文章中有写,这里不多说。

Combiner

这个部分是一个编程接口,用来缩减Map所产生的数据输出。一个更小的数据输出可以使得写入磁盘并被Reduce所接收的数据更小,从而减少了网络瓶颈。

Partitioner

这个部分是一个编程接口,用来对Map输出后的数据进行切分。这里支持用户自定义的好处就是规避掉默认情况下的一些小坑:hashcode大量雷同导致数据分布不均匀(用户自定义合适的hash函数),输出总体无序。

Q:输出无序的解决方案

自己定义一个Partitioner,用输入数据的最大值除以系统Reduce task数量的商作为分割边界,也就是说分割数据的边界为此商的1倍、2倍至numPartitions-1倍,这样就能保证执行Partition后的数据是整体有序的。通过只启动一个Reducer也可以达到类似的效果,不过这样就无法发挥集群优势了。对于数据不均匀倾斜的问题,可以参考这篇文章解决,现在来简单描述一下这篇文章。对于数据倾斜,我们可以采用的方法有:在combiner先进行一次聚合,减少发送到reducer的数据量;同时,还有一个策略是对于相同的key,在map层面增加一个随机的salt附加到key后面(比如wordcount的时候,出现单词aaa,map后产生的key有如aaa1,aaa3这种),增加一轮reduce来进行合并。
也可以修改参数hive.groupby.skewindata=true来抑制数据倾斜的危害。

Shuffle/Sort

这个部分可以参考这篇文章。sort的时候我们一般会使用hashcode作为关键字的方式来进行sort,在要求有序时我们才会用键值的方式排序。

这里稍微详细讲一下这个文章吧。

image.png

这里我们利用partition进行数据的分割处理后,会把数据送入collector里面,而这里面维护了一个kvbuffer的数据结构。这个数据结构是环形的;一边是存储的kv数据索引,一边是实际上的数据,两边从一个起点开始同时向两个方向增长。

image.png

当环形缓冲区占用达到80%的时候,我们就会对其进行进行spill操作,把数据写入到磁盘上。spill后,我们取当前kvbuffer剩余空间的中点作为新的起始坐标然后一路往上覆盖即可。这里这么处理的原因是可以在spill的同时map,不影响数据本身。当然缓冲区占满了的话还是会触发停顿。

image.png

Reduce

这个部分首先是对之前产出的文件进行合并排序,文件来源是JobTracker所安排发送的信息。对于每个不同的key,最后都会被执行一次reduce函数,直到全部完成。

别的细节

别的细节相对比较简单,具体可以直接查阅《Hadoop权威指南 第四版》的181-187页。

问题解决方案

这里针对前面所提出的一系列问题,给出我理解的MapReduce对其进行解决的方案。

1、文件读取的解决方案:这里使用了读取元信息,同时切分的方案。对输入文件进行切分,然后就可以分布式的读取到内存中,从而解决了大文件读取的问题。根据这篇文章,这里切分我们考虑的问题有切分的块大小要和hdfs的数据块大小对齐。

2、高效文件处理的解决方案:这里在前面的流程分析的时候写过了,不再赘述。

3、容错的解决方案:这里主要考虑的是上文对于磁盘处理的容错性;这里采用了HDFS的三副本的方法,机架感知来解决。由于不涉及到MapReduce所以略过;

4、任务失败处理:采取了网络通信心跳包的方式探测是否存活且网络连接可行,使用了推测执行的方式(某个任务执行过慢就再次启动一个新的)以及黑名单机制来保证对于部分运转缓慢机器的过滤。

瓶颈解决方案

这里针对前面所提出的一系列瓶颈,给出我理解的MapReduce对其进行解决的方案。

1、网络瓶颈:对于大规模的数据集,我们则需要尽量避免直接对于数据的复制,对于大数据采取复制可能相对较小的jar包(也就是打包好的程序)到对应机器上执行的策略,从而避免对于过大数据集的传输;小数据则可以直接序列化传输读取到内存中,在spark的操作中则是广播(Broadcast)来进行操作;对于不得不利用网络传输的数据,采用了序列化的数据压缩策略,同时设计了checksum来保证传输的正确性,从而尽量避免了网络瓶颈。

2、磁盘瓶颈:尽量保证更多的内容在内存中被处理,这里利用了环形缓冲区的技巧,不再赘述;同时在不得不进行spill操作的时候尽量利用合理的文件大小来进行对于磁盘性能的优化。同时利用很多的trick来进一步减少对于磁盘的读写。

3、CPU、内存瓶颈:对于负载均衡的问题来说,这里使用了合适的调度算法来解决这一个问题。在现在的Hadoop版本中,使用了Yarn作为资源调度框架,而以往的版本也使用了一些诸如优先级调度、Preempt等方法来保证尽量释放性能,避免长期负载不均衡从而导致性能问题。

It is my final heart.
最后更新于 2022-07-24