图来自于大佬博客: https://blog.csdn.net/zhongqi2513/article/details/78321664
整个过程拆分成四小部分进行操作:
读取数据
MRAppMaster 解析程序找到map端文件输入路径与reduce端文件输出路径
经过 InputFormat.getSplit( ), 进行逻辑切片, 确定mapTask的个数
map端真正读取文件的代码: InputFormat.getRecordReader.nextKeyValue()
读取文件数据, 执行map端逻辑后输出, 至此开始进入map端shuffle
map 端shuffle
reduce 端shuffle
输出到 HDFS
reduceTask 的个数与map端 输出时, 分区个数相同
即: int numPartition = Partition.getPartition(outKey)
有几个reduce, 就输出到 HDFS 几个文件
map 端 shuffle
map 端执行代码逻辑后, 输出数据首先经过分区: Partition.getPartition(outKey)
分区后进入内存中的环形缓冲区
环形缓冲区在mapTask这个jvm进行启动时, 就初始化好了
初始化一个 kvBuffer 字节数组, 临时存储数据
初始化一个 spillThread 线程, 执行任务
环形缓冲区, 默认大小100M (生产环境中会根据处理数据量进行改变)
当数据写入达到 环形缓冲区大小80% 时, 首先进行排序(根据分区编号以及key进行排序)然后将其刷写到磁盘中, 形成一个个spill文件
将一个个磁盘文件进行 Merge 合并为一个磁盘文件
为其创建索引文件, 记录每个分区的起始与结束偏移量
至此 map端shuffle 结束, 进入 网络传输阶段
reduce 端 shuffle
在有一个 maptask 执行完成后, reduceTask 就会进行拉取数据, 一边拉取一边合并
根据建立的索引文件, 拉取对应分区的数据首先到内存中
若拉取数据量小, 则直接保存在内存中, reduce端直接在内存中进行读取 若拉取数据量大, 则超过内存中量时, 刷写到磁盘中, 形成一个个磁盘文件, 再合并为一个, reduce 端读取该磁盘文件的数据
至此 reduce端shuffle 结束