MapReduce的工作机制(三)

MapReduce之shuffle和排序

MapReduce确保每个reducer的输入都是按键排序的。系统执行排序、将map输出作为输入传给reducer的过程称为shuffle。

map端

map函数开始产生输出时,利用缓冲的方式写到内存并出于效率的考虑进行预排序。

  • 每个map任务都有一个环形内存缓冲区用于存储任务输出。默认缓冲区大小为100MB(可以通过mapreduce.task.io.sort.mb属性调整)。一旦缓冲内容达到阈值(mapreduce.map.sort.spill.percent属性,默认为0.8),一个后台线程便开始把内容溢出到磁盘。
  • 在溢出写到磁盘过程中,map输出继续写到缓冲区。但如果在此期间缓冲区被填满,map会被阻塞直到写磁盘过程完成。溢出内容写到指定目录中(mapreduce.cluster.local.dir属性指定)。
  • 在写磁盘之前,线程首先根据reducer把数据划分成相应的分区。在每个分区中,后台线程按键进行内存中排序,如果有一个combiner函数,它就在排序后的输出上运行。运行combiner函数使得map输出结果更紧凑,因此减少写到磁盘的数据和传递给reducer的数据。
  • map任务写完最后一个输出记录后,会有几个溢出文件。在任务完成之前,溢出文件被合并成一个已分区且已排序的输出文件。
  • 如果至少存在3个溢出文件(mapreduce.map.combine.minspills属性设置)时,则combiner就会在输出文件写到磁盘之前再次运行。如果只有1或2个溢出文件,则不会为该map输出运行combiner(combiner的运行不会影响最终结果)。
  • 将map输出写到磁盘的过程中对其压缩会加快写磁盘的速度,节约磁盘空间,并且减少传给reducer的数据量。默认不进行压缩,将mapreduce.map.output.compress设为True启用压缩,压缩格式由mapreduce.map.output.compress.codec指定。
  • reducer通过HTTP得到输出文件的分区。

reduce端

reduce任务需要集群上若干个map任务的输出作为其分区文件。每个map任务完成时间可能不同,因此在每个任务完成时,reduce任务就开始复制其输出。这是reduce任务的复制阶段。reduce任务有少量复制线程,因此能够并行取得map输出。默认是5个线程。

reduce如何知道要从哪台机器取得map输出呢?
map任务完成后,会使用心跳机制通知ApplicationMaster。因此,对于指定作业,ApplicationMaster知道map输出和主机位置之间的映射关系。reduce中的一个线程定期询问master以便获取map输出主机的位置,知道获得所有输出位置。

  • 如果map输出相当小,会被复制到reduce的JVM中,否则被复制到磁盘中。随着磁盘上副本增多,后台线程会将他们合并为更大的、排好序的文件。(压缩的map输出都必须在内存中被解压缩
  • 复制完所有map输出后,reduce任务进入排序阶段(实际上是合并,排序是在map端进行的),这个阶段将合并map输出,维持其顺序排序。假如有50个map输出,合并因子是10,则会5次合并,最后生成5个中间文件。
  • 最后,reduce阶段直接把数据输入给reduce函数,并没有将这5个文件合并成一个已排序的文件作为一趟。
  • 在reduce阶段,对已排序输出的每个键调用reduce函数。此阶段的输出直接写到输出文件系统。