MapReduce-shuffle 流程详解

mac2024-10-25  16

hadoop-MapReduce-shuffle

一. MapReduce 整体流程

mapreduce编程套路: 数据加载读取: FileInputFormat TextInputFormat RecordReader LineRecordReader 返回:LongWritable getCurrentKey() 返回:Text getCurrentValue() Mapper<> map(LongWritable key,Text value,Context context){ //获取 拆分 封装 发送 context.write(k,v); } shuffle: 1.分区:决定reducetask的并行度,减轻reduce压力;继承Partitioner 2.排序:默认按照map输出的key字典升序;实现WritableComparator 3.分组:默认按照map输出的key分组,相同key分到一组 4.combiner:默认没有,优化,局部聚合组件(针对每个maptask的输出结果做聚合) 贯穿整个shuffle过程中,并非某特定步骤,过程中都有用到. Reducer<> reduce(key,values,context){ context.write(k,v); } 输出(文件写出): FileOutputFormat TextOutputFormat RecordWriter LineRecordWriter 输出 ---> HDFS

二. Shuffle 过程详解

( 一 ) shuffle 整体

图来自于大佬博客: https://blog.csdn.net/zhongqi2513/article/details/78321664

整个过程拆分成四小部分进行操作:

读取数据

MRAppMaster 解析程序找到map端文件输入路径与reduce端文件输出路径

经过 InputFormat.getSplit( ), 进行逻辑切片, 确定mapTask的个数

map端真正读取文件的代码: InputFormat.getRecordReader.nextKeyValue()

读取文件数据, 执行map端逻辑后输出, 至此开始进入map端shuffle

map 端shuffle

reduce 端shuffle

输出到 HDFS

reduceTask 的个数与map端 输出时, 分区个数相同

​ 即: int numPartition = Partition.getPartition(outKey)

有几个reduce, 就输出到 HDFS 几个文件

( 二 ) shuffle 详细

map 端 shuffle

map 端执行代码逻辑后, 输出数据首先经过分区: Partition.getPartition(outKey)

分区后进入内存中的环形缓冲区

环形缓冲区在mapTask这个jvm进行启动时, 就初始化好了

​ 初始化一个 kvBuffer 字节数组, 临时存储数据

​ 初始化一个 spillThread 线程, 执行任务

​ 环形缓冲区, 默认大小100M (生产环境中会根据处理数据量进行改变)

当数据写入达到 环形缓冲区大小80% 时, 首先进行排序(根据分区编号以及key进行排序)然后将其刷写到磁盘中, 形成一个个spill文件

将一个个磁盘文件进行 Merge 合并为一个磁盘文件

为其创建索引文件, 记录每个分区的起始与结束偏移量

至此 map端shuffle 结束, 进入 网络传输阶段

reduce 端 shuffle

在有一个 maptask 执行完成后, reduceTask 就会进行拉取数据, 一边拉取一边合并

根据建立的索引文件, 拉取对应分区的数据首先到内存中

若拉取数据量小, 则直接保存在内存中, reduce端直接在内存中进行读取 若拉取数据量大, 则超过内存中量时, 刷写到磁盘中, 形成一个个磁盘文件, 再合并为一个, reduce 端读取该磁盘文件的数据

至此 reduce端shuffle 结束

最新回复(0)