1.概述
Shuffle就是对数据进行重组,由于分布式计算的特性和要求,在实现细节上更加繁琐和复杂。
在MR框架中,Shuffle是连接Mapper和Reducer之间的桥梁,Map阶段通过shuffle读取数据并输出到对应的Reduce, 而Reduce阶段负责从Map端拉取数据并行计算。在整个shuffle过程中,往往伴随着大量的磁盘IO和网络的数据传输。所以,shuffle性能的高低也直接决定了整个应用程序性能的高低。相比较于MR的shuffle, Spark框架也有自己的独特的shuffle实现过程。
2.Hadoop的Mapreduce Shuffle原理
- Shuffle过程中,提供数据的一端被称作Map端,Map端每个生成数据的任务称为Mapper
- 接收数据的一端被称作reduce端,reduce端负责拉取数据,拉取的数据被称为Reducer
- Shuffle过程的本质上是将Map端处理的数据使用分区器进行划分,并将这些数据落地磁盘,并由Reducer端负责拉取属于自己分区的数据的过程。
- Map端的shuffle过程
- 1.input, 根据input用split切分数据,运行map任务。
- 2.partition:每个map task都有内存缓冲区,存储着map的输出结果.
- 3.spill: 当缓冲区快满的时候需要将缓冲区的数据以临时文件的方式存放到磁盘。
- 4.merge: 合并操作,当整个map task结束后再对磁盘中这个map task产生的临时文件做合并,生成最终的正式输出文件,然后等待reduce task来拉取数据。
- Reduce端的shuffle过程
- 拉取数据;
- 合并从各个节点拉取来的磁盘小文件
- Reducer计算
- 最终输出计算的结果
3.Spark Shuffle的演变历史
- Spark 0.8及以前 Hash Based Shuffle
- Spark 0.8.1 为Hash Based Shuffle引入File Consolidation机制
- Spark 0.9 引入ExternalAppendOnlyMap
- Spark 1.1 引入Sort Based Shuffle,但默认仍为Hash Based Shuffle
- Spark 1.2 默认的Shuffle方式改为Sort Based Shuffle
- Spark 1.4 引入Tungsten-Sort Based Shuffle
- Spark 1.6 Tungsten-sort并入Sort Based Shuffle
- Spark 2.0 Hash Based Shuffle退出历史舞台
4.Spark Shuffle的演变过程与原理
4.1.Hash Shuffle普通机制
- shuffle write
- 将每个task处理的数据按key进行分区,即对相同的key应用hash算法,从而将相同的key写入同一个磁盘文件中,而每一个磁盘文件都只属于reduce端的stage的一个task
- shuffle read
- stage的每一个task就需要将上一个stage的计算结果中所有相同的key, 从各个节点上通过网络传输都拉取到自己所在的节点上,然后进行key的聚合或连接等操作,shuffle read的拉取过程是一边拉取一边进行聚合
- Hash Shuffle普通机制存在的问题
- shuffle前在磁盘上会产生海量的数据小文件(M*R, 即map task的数量*reduce task的数量),建立通信和拉取数据的次数就会变多,此时会产生大量耗时的IO操作,可能因为网络原因或者会产生OOM风险。
4.2.Hash Shuffle的合并机制
- 每一个Executor进程根据核数,决定Task的并发数量,比如executor核数是2,就是可以并发运行两个task,如果是一个则只能运行一个task。
- 假设executor核数是1,MapTask数量是M,那么它依然会根据ResultTask的数量R,创建R个buffer缓存,然后对key进行hash,数据进入不同的buffer中,每一个buffer对应着一个block file,用于刷新buffer缓存里的数据。
- 然后下一个task运行的时候,那么不会再创建新的buffer和block file,而是复用之前的task已经创建好的buffer和block file。即同一个Executor进程里所有Task都会把相同的key放入相同的buffer缓冲区中。
- 这样的话,生成文件的数量就是(本地worker的executor数量*executor的cores*ResultTask数量)如上图所示,即2 * 1* 3 = 6个文件,每一个Executor的shuffle MapTask数量100,ReduceTask数量为100,那么未优化的HashShuffle的文件数是2 *1* 100*100 =20000,优化之后的数量是2*1*100 = 200文件,相当于少了100倍。
- 缺点:如果 Reducer 端的并行任务或者是数据分片过多的话则 Core * Reducer Task 依旧过大,也会产生很多小文件。
4.3 Sort Shuffle普通机制
为了缓解Shuffle过程产生文件数过多和Writer缓存开销过大的问题,spark引入了类似于hadoop Map-Reduce的shuffle机制。该机制每一个Shuffle的MapTask不会为后续的任务创建单独的文件,而是会将所有的Task结果写入同一个文件,并且对应生成一个索引文件。以前的数据是放在内存缓存中,等到数据完了再刷到磁盘,现在为了减少内存的使用,在内存不够用的时候,可以将输出溢写到磁盘,结束的时候,再将这些不同的文件联合内存的数据一起进行归并,从而减少内存的使用量。一方面文件数量显著减少,另一方面减少Writer缓存所占用的内存大小,而且同时避免GC的风险和频率。
- 在该模式下,数据会先写入一个内存数据结构中(默认5M),此时根据不同的shuffle算子,可能选用不同的数据结构。如果是reduceByKey这种聚合类的shuffle算子,那么会选用Map数据结构,一边通过Map进行聚合,一边写入内存;如果是join这种普通的shuffle算子,那么会选用Array数据结构,直接写入内存。接着,每写一条数据进入内存数据结构之后,就会判断一下,是否达到了某个临界阈值。如果达到临界阈值的话,那么就会尝试将内存数据结构中的数据溢写到磁盘,然后清空内存数据结构。
- 排序
在溢写到磁盘文件之前,会先根据key对内存数据结构中已有的数据进行排序。 - 溢写
排序过后,会分批将数据写入磁盘文件。默认的batch数量是10000条,也就是说,排序好的数据,会以每批1万条数据的形式分批写入磁盘文件。写入磁盘文件是通过Java BufferedOutputStream实现的。首先会将数据缓冲在内存中,当内存缓冲满溢之后再一次写入磁盘文件中,这样可以减少磁盘IO次数,提升性能。 - 合并(merge):
一个task将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,也就会产生多个临时文件。最后会将之前所有的临时磁盘文件都进行合并,这就是merge过程,此时会将之前所有临时磁盘文件中的数据读取出来,然后依次写入最终的磁盘文件之中。此外,由于一个task就只对应一个磁盘文件,也就意味着该task为Reduce端的stage的task准备的数据都在这一个文件中,因此还会单独写一份索引文件,其中标识了下游各个task的数据在文件中的start offset与end offset。
4.4 Sort Shuffle的by pass机制
对于Bypass机制,使用这个模式需要考虑几下几点:
- 主要用于处理不需要排序和聚合的Shuffle操作,所以数据是直接写入文件,数据量较大的时候,网络I/O和内存负担较重
- 主要适合处理Reducer任务数量比较少的情况下
- 将每一个分区写入一个单独的文件,最后将这些文件合并,减少文件数量;但是这种方式需要并发打开多个文件,对内存消耗比较大。
- 因为bypass机制这种方式不需要排序,所以效率比Sort Shuffle的普通机制高,所以在reduce端数量不大,又不需要在map端做聚合和排序,并且shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值, 就用这种方式。