分布式计算框架,是一种编程模型,思想:分而治之
离线大数据计算,主要解决海量离线数据的批处理。
四部分组成 a.Client
把用户编写的MapReduce程序提交到JobTracker端 用户通过client提供的一些接口查看作业运行状态b.JobTracker
负责资源监控和作业调度 监控所有的taskTracker和job的健康状态,一旦失败就将相应的任务转移到其他节点 跟踪任务的执行进度和资源使用量,并将这些信息告诉任务调度器c.taskTracker
会周期性通过心跳将本节点的资源使用情况和任务运行进度汇报给JobTracker 接收JobTracker发过来的命令并执行 用”sort“等量划分本节点的资源,将sort 分配给taskd.task
分为map task 和 reduce task 两种,均由taskTracker启动 slot分为MapSlot和ReduceSlot两种整体流程:Input Split --> Map --> Combiner(optional) --> Shuffle --> Reduce --> Output
1.切片
在FileInputFormat中,计算切片大小的逻辑:Math.max(minSize, Math.min(maxSize, blockSize)) minSize的默认值是1,而maxSize的默认值是long类型的最大值,即可得切片的默认大小是blockSize(128M) maxSize参数如果调得比blocksize小,则会让切片变小,而且就等于配置的这个参数的值 minSize参数调的比blockSize大,则可以让切片变得比blocksize还大 hadoop为每个分片构建一个map任务,可以并行处理多个分片上的数据,整个数据的处理过程将得到很好的负载均衡,因为一台性能较强的计算机能处理更多的数据分片. 分片也不能切得太小,否则多个map和reduce间数据的传输时间,管理分片,构建多个map任务的时间将决定整个作业的执行时间.(大部分时间都不在计算上) 如果文件大小小于128M,则该文件不会被切片,不管文件多小都会是一个单独的切片,交给一个maptask处理.如果有大量的小文件,将导致产生大量的maptask,大大降低集群性能.
大量小文件的优化策略:
(1) 在数据处理的前端就将小文件整合成大文件,再上传到hdfs上,即避免了hdfs不适合存储小文件的缺点,又避免了后期使用mapreduce处理大量小文件的问题。(最提倡的做法)
(2)小文件已经存在hdfs上了,可以使用另一种inputformat来做切片(CombineFileInputFormat),它的切片逻辑和FileInputFormat(默认)不同,它可以将多个小文件在逻辑上规划到一个切片上,交给一个maptask处理。
2.环形缓存区
经过map函数的逻辑处理后的数据输出之后,会通过OutPutCollector收集器将数据收集到环形缓存区保存。 环形缓存区的大小默认为100M,当保存的数据达到80%时,就将缓存区的数据溢出到磁盘上保存。 3.溢出
环形缓存区的数据达到其容量的80%时就会溢出到磁盘上进行保存,在此过程中,程序会对数据进行分区(默认HashPartition)和排序(默认根据key进行快排) 缓存区不断溢出的数据形成多个小文件 4.合并
溢出的多个小文件各个区合并在一起(0区和0区合并成一个0区),形成大文件 通过归并排序保证区内的数据有序 5.shuffle
从过程2到过程7之间,即map任务和reduce任务之间的数据流称为shuffle(混洗),而过程5最能体现出混洗这一概念。一般情况下,一个reduce任务的输入数据来自与多个map任务,多个reduce任务的情况下就会出现如过程5所示的,每个reduce任务从map的输出数据中获取属于自己的那个分区的数据。
6.合并
运行reducetask的节点通过过程5,将来自多个map任务的属于自己的分区数据下载到本地磁盘工作目录。这多个分区文件通过归并排序合并成大文件,并根据key值分好组(key值相同的,value值会以迭代器的形式组在一起)。
7.reducetask
reducetask从本地工作目录获取已经分好组并且排好序的数据,将数据进行reduce函数中的逻辑处理。
8.输出
每个reducetask输出一个结果文件。
分块 作用:为了方便文件的管理和备份,分块是数据存储的逻辑划分(hdfs存储) 块儿是hdfs存储系统中的最小单位,默认一个块儿是64兆,一个块儿存储在一个datanode
分片 作用:分片是数据处理的输入逻辑划分(MapReduce计算) 计算过程数据可能会很多,所以把数据分成小片,便于计算,一个分片对应一个map
默认分片大小与分块大小是相同
//分片的 默认参数:minSize=1字节,maxSize=1L(1长整型) //分片和分块都是默认的话:片儿的大小 = 块儿的大小 protected long computeSplitSize(long goalSize, long minSize, long blockSize) { return Math.max(minSize , Math.min(maxSize, 块的大小)); }为了获得高性能,所以提倡数据本地化,一个块儿存储在一个节点上,所以一个片也最好在是这个节点上的。 如果 片儿size > 块儿size,那么片儿的数据会包括两个块儿的数据(两个),也就是包括两个节点的数据,违背数据本地化
1、Input-Map-Reduce-Output
2、Input-Map-Output
3、Input-Multiple Maps-Reduce-Output
4、Input-Map-Combiner-Reduce-Output
源码:根据 block块儿大小、设定的片的minSize、maxSize大小三个值,计算出片儿的大小
//计算出 片的大小 return Math.max(minSize , Math.min(maxSize, 块的大小));再根据 片儿的大小、片儿的偏移量、块儿的列表,获取片的信息(路径、偏移量、块儿的副本列表信息、缓存信息)及片儿个数,就是map的个数
有差错或者需要补充的地方,还望大家评论指出,并详细论证,相互学习,共同进步哈!