MapReduce从入门到精通

mac2025-06-21  7

目录

 

一、MapReduce概念

二、理论篇

1、序列化

2、FileInputFormat

3、CombineTextInputFormat切片机制

4、shuffle机制

shuffle机制流程图解

5、排序

6、GroupingComparator分组

7、Combiner合并

8、数据倾斜

9、ReduceTask工作机制

10、自定义OutputFormat

三、Yarn

1、Yarn相关概念

2、工作机制流程

四、案例

1、统计一堆文件中单词出现的个数(WordCount案例)

2、把单词按照ASCII码奇偶分区(Partitioner)

3、对每一个maptask的输出局部汇总(Combiner)

4、大量小文件的切片优化(CombineTextInputFormat)

5、流量汇总程序案例

6、求每个订单中最贵的商品(GroupingComparator)

7、MapReduce中多表合并案例


一、MapReduce概念

Mapreduce是一个分布式运算程序的编程框架,是用户开发“基于hadoop的数据分析应用”的核心框架;

Mapreduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个hadoop集群上。

MapReduce进程:

MrAppMaster:负责整个程序的过程调度及状态协调

MapTask:负责map阶段的整个数据处理流程

ReduceTask:负责reduce阶段的整个数据处理流程

用户的业务逻辑非常复杂,那就只能多个mapreduce程序,串行运行

为什么学习Mapreduce?

开发人员可以将绝大部分工作集中在业务逻辑的开发上,而将分布式计算中的复杂性交由框架来处理。

二、理论篇

 

1、序列化

Java类型

Hadoop Writable类型

boolean

BooleanWritable

byte

ByteWritable

int

IntWritable

float

FloatWritable

long

LongWritable

double

DoubleWritable

string

Text

map

MapWritable

array

ArrayWritable

注意:

1)必须实现Writable接口               (2)反序列化时,需要反射调用空参构造函数,所以必须有空参构造

(3)反序列化的顺序和序列化的顺序完全一致

(4)要想把结果显示在文件中,需要重写toString(),且用”\t”分开,方便后续用

7)如自定义的bean放在key中传输,则还需要实现comparable接口,因为mapreduce框中的shuffle过程一定会对key进行排序

2、FileInputFormat

FileInputFormat中,计算切片大小的逻辑:Math.max(minSize, Math.min(maxSize, blockSize)); 

mapreduce.input.fileinputformat.split.minsize=1 默认值为1

mapreduce.input.fileinputformat.split.maxsize= Long.MAXValue 默认值Long.MAXValue

因此,默认情况下,切片大小=blocksize

maxsize(切片最大值):参数如果调得比blocksize小,则会让切片变小,而且就等于配置的这个参数的值。

minsize (切片最小值):参数调的比blockSize大,则可以让切片变得比blocksize还大。

3、CombineTextInputFormat切片机制

关于大量小文件的优化策略

1)默认TextInputformat是按文件规划切片,不管文件多小,都是单独的切片,都会交给一个maptask,这样如果有大量小文件,就会产生大量的maptask,处理效率极其低下。

2)优化策略

       最好的办法,在预处理/采集阶段,将小文件先合并成大文件,再上传到HDFS做后续分析。

       补救措施:使用InputFormat来做切片(CombineTextInputFormat),它的切片逻辑跟TextFileInputFormat不同:它可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个maptask。

       (3)优先满足最小切片大小,不超过最大切片大小

              CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m

              CombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2m

       举例:0.5m+1m+0.3m+5m=2m + 4.8m=2m + 4m + 0.8m

详细代码:案例4

3)具体实现步骤

// 9 如果不设置InputFormat,它默认用的是TextInputFormat.class job.setInputFormatClass(CombineTextInputFormat.class) CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m CombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2m

4、shuffle机制

shuffle机制流程图解

(1)partition分区

默认分区是根据key的hashCode对reduceTasks个数取模得到的。自定义partiton:

自定义类继承Partitioner,重新getPartition()方法

public class ProvincePartitioner extends Partitioner<Text, FlowBean> { @Override public int getPartition(Text key, FlowBean value, int numPartitions) { // 1 获取电话号码的前三位 String preNum = key.toString().substring(0, 3); int partition = 4; // 2 判断是哪个省 if ("136".equals(preNum)) { partition = 0; }else if ("137".equals(preNum)) { partition = 1; }else if ("138".equals(preNum)) { partition = 2; }else if ("139".equals(preNum)) { partition = 3; } return partition; } }

job驱动中,设置自定义partitioner

job.setPartitionerClass(CustomPartitioner.class)

自定义partition后,要根据自定义partitioner的逻辑设置相应数量的reduce task

job.setNumReduceTasks(5);

如果reduceTask的数量  > getPartition数,则会多产生几个空的输出文件part-r-000xx;

如果 1 < reduceTask的数量  <  getPartition数,则有一部分分区数据无处安放,会Exception;

如果reduceTask的数量=1,则不管mapTask端输出多少个分区文件,最终结果都交给这一个reduceTask,最终也就只会产生一个结果文件 part-r-00000;

5、排序

自定义排序WritableComparable:bean对象实现WritableComparable接口重写compareTo方法,就可以实现排序

@Override public int compareTo(FlowBean o) { // 倒序排列,从大到小 return this.sumFlow > o.getSumFlow() ? -1 : 1; }

6、GroupingComparator分组

      对reduce阶段的数据根据某一个或几个字段进行分组,从而生成不同的文件。

7、Combiner合并

combinerMR程序中MapperReducer之外的一种组件,它的父类就是Reducer,区别在于运行的位置:

Combiner是在每一个maptask所在的节点运行,Reducer是接收全局所有Mapper的输出结果。

combiner的意义就是对每一个maptask的输出进行局部汇总,以减小网络传输量

自定义一个combiner继承Reducer,重写reduce方法

public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable>{ @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int count = 0; for(IntWritable v :values){ count = v.get(); } context.write(key, new IntWritable(count)); } }

在job中的位置:

job.setCombinerClass(WordcountCombiner.class);

8、数据倾斜

产生原因:如果是多张表的操作都是在reduce阶段完成,reduce端的处理压力太大,map节点的运算负载则很低,资源利用率不高,且在reduce阶段极易产生数据倾斜。

解决方法:在map端缓存多张表,提前处理业务逻辑,这样增加map端业务,减少reduce端数据的压力,尽可能的减少数据倾斜。

具体方案:distributedcache

       (1)在mappersetup阶段,将文件读取到缓存集合中

       2)在驱动函数中加载缓存。

job.addCacheFile(new URI("file:/e:/mapjoincache/pd.txt"));// 缓存普通文件到task运行节点

9、ReduceTask工作机制

reducetask的并行度同样影响整个job的执行并发度和执行效率,maptask的并发数由切片个数决定,Reducetask数量默认是1、可以直接手动设置:

job.setNumReduceTasks(4); //具体多少个reducetask,需要根据集群性能而定

10、自定义OutputFormat

在mapreduce程序中根据数据的不同输出结果到不同目录,可以通过自定义outputformat来实现。自定义outputformat,改写recordwriter,具体改写输出数据的方法write()

输入数据接口

InputFormat--->FileInputFormat(文件类型数据读取的通用抽象类)  DBInputFormat (数据库数据读取的通用抽象类)。默认使用的实现类是:TextInputFormat,   job.setInputFormatClass(TextInputFormat.class)

TextInputFormat的功能逻辑是:一次读一行文本,然后将该行的起始偏移量作为key,数据作为value

输出数据接口:OutputFormat---> 有一系列子类FileOutputformat  DBoutputFormat  .....默认是TextOutputFormat,将每一个KV对向目标文本文件中输出为一行

三、Yarn

Yarn是一个资源调度平台,负责为运算程序提供服务器运算资源,相当于一个分布式的操作系统平台,而mapreduce等运算程序则相当于运行于操作系统之上的应用程序,同样Yarn也支持其他分布式计算模式。

1、Yarn相关概念

1Yarn并不清楚程序的运行机制,只提供运算资源的调度(用户程序向Yarn申请资源,Yarn就负责分配资源)

2)Yarn中的主管角色叫ResourceManager,具体运算资源的角色叫NodeManager

3)这样一来,Yarn其实就与运行的用户程序完全解耦,就意味着Yarn上可以运行各种类型的分布式运算程序(mapreduce只是其中的一种),比如mapreducestorm程序,spark程序……

2、工作机制流程

工作机制详解

       0Mr程序提交到客户端所在的节点

       1yarnrunnerResourcemanager申请一个application

       2mr将该应用程序的资源路径返回给yarnrunner

       3)该程序将运行所需资源提交到HDFS

       4)程序资源提交完毕后,申请运行mrAppMaster

       5RM将用户的请求初始化成一个task

       6)其中一个NodeManager领取到task任务。

       7)该NodeManager创建容器Container,并产生MRAppmaster

       8ContainerHDFS上拷贝资源到本地

       9MRAppmasterRM 申请运行maptask容器

       10RM将运行maptask任务分配给另外两个NodeManager,另两个NodeManager分别领取任务并创建容器。

       11MR向两个接收到任务的NodeManager发送程序启动脚本,这两个NodeManager分别启动maptaskmaptask对数据分区排序。

       12MRAppmasterRM申请2个容器,运行reduce task

       13reduce taskmaptask获取相应分区的数据。

       14)程序运行完毕后,MR会向RM注销自己。

四、案例

1、统计一堆文件中单词出现的个数(WordCount案例)

mapper类

package com.xin.wordcount; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; /** * KEYIN:默认情况下,是mr框架所读到的一行文本的起始偏移量,Long; * 在hadoop中有自己的更精简的序列化接口,所以不直接用Long,而是用LongWritable * VALUEIN:默认情况下,是mr框架所读到的一行文本内容,String;此处用Text * * KEYOUT:是用户自定义逻辑处理完成之后输出数据中的key,在此处是单词,String;此处用Text * VALUEOUT,是用户自定义逻辑处理完成之后输出数据中的value,在此处是单词次数,Integer,此处用IntWritable * @author Administrator */ public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{ /** * map阶段的业务逻辑就写在自定义的map()方法中 * maptask会对每一行输入数据调用一次我们自定义的map()方法 */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 1 将maptask传给我们的文本内容先转换成String String line = value.toString(); // 2 根据空格将这一行切分成单词 String[] words = line.split(" "); // 3 将单词输出为<单词,1> for(String word:words){ // 将单词作为key,将次数1作为value,以便于后续的数据分发,可以根据单词分发,以便于相同单词会到相同的reducetask中 context.write(new Text(word), new IntWritable(1)); } } }

reducer类

package com.xin.wordcount; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; /** * KEYIN , VALUEIN 对应mapper输出的KEYOUT, VALUEOUT类型 * * KEYOUT,VALUEOUT 对应自定义reduce逻辑处理结果的输出数据类型 KEYOUT是单词 VALUEOUT是总次数 * @author Administrator */ public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable> { /** * key,是一组相同单词kv对的key */ @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int count = 0; // 1 汇总各个key的个数 for(IntWritable value:values){ count +=value.get(); } // 2输出该key的总次数 context.write(key, new IntWritable(count)); } }

主类,描述job并提交

package com.xin.wordcount; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * 相当于一个yarn集群的客户端, * 需要在此封装我们的mr程序相关运行参数,指定jar包 * 最后提交给yarn * @author Administrator */ public class WordcountDriver { public static void main(String[] args) throws Exception { // 1 获取配置信息,或者job对象实例 Configuration configuration = new Configuration(); // 8 配置提交到yarn上运行,windows和Linux变量不一致 // configuration.set("mapreduce.framework.name", "yarn"); // configuration.set("yarn.resourcemanager.hostname", "hadoop103"); Job job = Job.getInstance(configuration); // 6 指定本程序的jar包所在的本地路径 // job.setJar("/home/wc.jar"); job.setJarByClass(WordcountDriver.class); // 2 指定本业务job要使用的mapper/Reducer业务类 job.setMapperClass(WordcountMapper.class); job.setReducerClass(WordcountReducer.class); // 3 指定mapper输出数据的kv类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // 4 指定最终输出的数据的kv类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 5 指定job的输入原始文件所在目录 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行 // job.submit(); boolean result = job.waitForCompletion(true); System.exit(result?0:1); } }

将程序打成jar包,然后拷贝到hadoop集群中。启动hadoop集群,执行wordcount程序

hadoop jar  wc.jar com.xin.wordcount.WordcountDriver /user/xin/input /user/xin/output1

2、把单词按照ASCII码奇偶分区(Partitioner)

自定义分区

package com.xin.mapreduce.wordcount; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner; public class WordCountPartitioner extends Partitioner<Text, IntWritable>{ @Override public int getPartition(Text key, IntWritable value, int numPartitions) { // 1 获取单词key String firWord = key.toString().substring(0, 1); char[] charArray = firWord.toCharArray(); int result = charArray[0]; // int result = key.toString().charAt(0); // 2 根据奇数偶数分区 if (result % 2 == 0) { return 0; }else { return 1; } } }

在驱动中配置加载分区,设置reducetask个数

job.setPartitionerClass(WordCountPartitioner.class); job.setNumReduceTasks(2);

3、对每一个maptask的输出局部汇总(Combiner)

统计过程中对每一个maptask的输出进行局部汇总,以减小网络传输量即采用Combiner功能。

方案一:增加一个WordcountCombiner类继承Reducer

package com.xin.mr.combiner; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable>{ @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int count = 0; for(IntWritable v :values){ count = v.get(); } context.write(key, new IntWritable(count)); } }

在WordcountDriver驱动类中指定combiner

// 9 指定需要使用combiner,以及用哪个类作为combiner的逻辑 job.setCombinerClass(WordcountCombiner.class);

方案二:直接将WordcountReducer作为combiner在WordcountDriver驱动类中指定

// 9 指定需要使用combiner,以及用哪个类作为combiner的逻辑 job.setCombinerClass(WordcountReducer.class);

4、大量小文件的切片优化(CombineTextInputFormat)

将输入的大量小文件合并成一个切片统一处理。在cordcount中指定:

// 9 如果不设置InputFormat,它默认用的是TextInputFormat.class job.setInputFormatClass(CombineTextInputFormat.class); CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m CombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2m

5、流量汇总程序案例

统计手机号耗费的总上行流量、下行流量、总流量(序列化),将结果按照手机归属地不同省份输出到不同文件中(Partitioner),并且统计结果按照总流量倒序排序(排序)

FlowBean:

import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.WritableComparable; public class FlowBean implements WritableComparable<FlowBean> { private long upFlow; private long downFlow; private long sumFlow; // 反序列化时,需要反射调用空参构造函数,所以必须有 public FlowBean() { super(); } public FlowBean(long upFlow, long downFlow) { super(); this.upFlow = upFlow; this.downFlow = downFlow; this.sumFlow = upFlow + downFlow; } public void set(long upFlow, long downFlow) { this.upFlow = upFlow; this.downFlow = downFlow; this.sumFlow = upFlow + downFlow; } public long getSumFlow() { return sumFlow; } public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; } public long getUpFlow() { return upFlow; } public void setUpFlow(long upFlow) { this.upFlow = upFlow; } public long getDownFlow() { return downFlow; } public void setDownFlow(long downFlow) { this.downFlow = downFlow; } /** * 序列化方法 * @param out * @throws IOException */ @Override public void write(DataOutput out) throws IOException { out.writeLong(upFlow); out.writeLong(downFlow); out.writeLong(sumFlow); } /** * 反序列化方法 注意反序列化的顺序和序列化的顺序完全一致 * @param in * @throws IOException */ @Override public void readFields(DataInput in) throws IOException { upFlow = in.readLong(); downFlow = in.readLong(); sumFlow = in.readLong(); } @Override public String toString() { return upFlow + "\t" + downFlow + "\t" + sumFlow; } @Override public int compareTo(FlowBean o) { // 倒序排列,从大到小 return this.sumFlow > o.getSumFlow() ? -1 : 1; } } import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class FlowCountSort { static class FlowCountSortMapper extends Mapper<LongWritable, Text, FlowBean, Text>{ FlowBean bean = new FlowBean(); Text v = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 1 拿到的是上一个统计程序输出的结果,已经是各手机号的总流量信息 String line = value.toString(); // 2 截取字符串并获取电话号、上行流量、下行流量 String[] fields = line.split("\t"); String phoneNbr = fields[0]; long upFlow = Long.parseLong(fields[1]); long downFlow = Long.parseLong(fields[2]); // 3 封装对象 bean.set(upFlow, downFlow); v.set(phoneNbr); // 4 输出 context.write(bean, v); } } static class FlowCountSortReducer extends Reducer<FlowBean, Text, Text, FlowBean>{ @Override protected void reduce(FlowBean bean, Iterable<Text> values, Context context) throws IOException, InterruptedException { context.write(values.iterator().next(), bean); } } public static void main(String[] args) throws Exception { // 1 获取配置信息,或者job对象实例 Configuration configuration = new Configuration(); Job job = Job.getInstance(configuration); // 6 指定本程序的jar包所在的本地路径 job.setJarByClass(FlowCountSort.class); // 2 指定本业务job要使用的mapper/Reducer业务类 job.setMapperClass(FlowCountSortMapper.class); job.setReducerClass(FlowCountSortReducer.class); // 3 指定mapper输出数据的kv类型 job.setMapOutputKeyClass(FlowBean.class); job.setMapOutputValueClass(Text.class); // 4 指定最终输出的数据的kv类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); // 5 指定job的输入原始文件所在目录 FileInputFormat.setInputPaths(job, new Path(args[0])); Path outPath = new Path(args[1]); // FileSystem fs = FileSystem.get(configuration); // if (fs.exists(outPath)) { // fs.delete(outPath, true); // } FileOutputFormat.setOutputPath(job, outPath); // 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行 boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); } }

6、求每个订单中最贵的商品(GroupingComparator)

定义订单信息OrderBean

import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.WritableComparable; public class OrderBean implements WritableComparable<OrderBean> { private String orderId; private double price; public OrderBean() { super(); } public OrderBean(String orderId, double price) { super(); this.orderId = orderId; this.price = price; } public String getOrderId() { return orderId; } public void setOrderId(String orderId) { this.orderId = orderId; } public double getPrice() { return price; } public void setPrice(double price) { this.price = price; } @Override public void readFields(DataInput in) throws IOException { this.orderId = in.readUTF(); this.price = in.readDouble(); } @Override public void write(DataOutput out) throws IOException { out.writeUTF(orderId); out.writeDouble(price); } @Override public int compareTo(OrderBean o) { // 1 先按订单id排序(从小到大) int result = this.orderId.compareTo(o.getOrderId()); if (result == 0) { // 2 再按金额排序(从大到小) result = price > o.getPrice() ? -1 : 1; } return result; } @Override public String toString() { return orderId + "\t" + price ; } }

编写OrderSortMapper处理流程

package com.xin.mapreduce.order; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class OrderSortMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable>{ OrderBean bean = new OrderBean(); @Override protected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException { // 1 获取一行数据 String line = value.toString(); // 2 截取字段 String[] fields = line.split("\t"); // 3 封装bean bean.setOrderId(fields[0]); bean.setPrice(Double.parseDouble(fields[2])); // 4 写出 context.write(bean, NullWritable.get()); } }

编写OrderSortReducer处理流程

package com.xin.mapreduce.order; import java.io.IOException; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Reducer; public class OrderSortReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable>{ @Override protected void reduce(OrderBean bean, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { // 直接写出 context.write(bean, NullWritable.get()); } }

driver类

package com.xin.mapreduce.order; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class OrderSortDriver { public static void main(String[] args) throws Exception { // 1 获取配置信息 Configuration conf = new Configuration(); Job job = Job.getInstance(conf); // 2 设置jar包加载路径 job.setJarByClass(OrderSortDriver.class); // 3 加载map/reduce类 job.setMapperClass(OrderSortMapper.class); job.setReducerClass(OrderSortReducer.class); // 4 设置map输出数据key和value类型 job.setMapOutputKeyClass(OrderBean.class); job.setMapOutputValueClass(NullWritable.class); // 5 设置最终输出数据的key和value类型 job.setOutputKeyClass(OrderBean.class); job.setOutputValueClass(NullWritable.class); // 6 设置输入数据和输出数据路径 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 10 设置reduce端的分组 job.setGroupingComparatorClass(OrderSortGroupingComparator.class); // 7 设置分区 job.setPartitionerClass(OrderSortPartitioner.class); // 8 设置reduce个数 job.setNumReduceTasks(3); // 9 提交 boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); } }

编写OrderSortPartitioner处理流程

package com.xin.mapreduce.order; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Partitioner; public class OrderSortPartitioner extends Partitioner<OrderBean, NullWritable>{ @Override public int getPartition(OrderBean key, NullWritable value, int numReduceTasks) { return (key.getOrderId().hashCode() & Integer.MAX_VALUE) % numReduceTasks; } } 编写OrderSortGroupingComparator处理流程 package com.xin.mapreduce.order; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; public class OrderSortGroupingComparator extends WritableComparator { protected OrderSortGroupingComparator() { super(OrderBean.class, true); } @Override public int compare(WritableComparable a, WritableComparable b) { OrderBean abean = (OrderBean) a; OrderBean bbean = (OrderBean) b; // 将orderId相同的bean都视为一组 return abean.getOrderId().compareTo(bbean.getOrderId()); } }

7、MapReduce中多表合并案例

reduce端表合并(数据倾斜)、创建商品和订合并后的bean

package com.xin.mapreduce.table; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.Writable; public class TableBean implements Writable { private String order_id; // 订单id private String p_id; // 产品id private int amount; // 产品数量 private String pname; // 产品名称 private String flag;// 表的标记 public TableBean() { super(); } public TableBean(String order_id, String p_id, int amount, String pname, String flag) { super(); this.order_id = order_id; this.p_id = p_id; this.amount = amount; this.pname = pname; this.flag = flag; } public String getFlag() { return flag; } public void setFlag(String flag) { this.flag = flag; } public String getOrder_id() { return order_id; } public void setOrder_id(String order_id) { this.order_id = order_id; } public String getP_id() { return p_id; } public void setP_id(String p_id) { this.p_id = p_id; } public int getAmount() { return amount; } public void setAmount(int amount) { this.amount = amount; } public String getPname() { return pname; } public void setPname(String pname) { this.pname = pname; } @Override public void write(DataOutput out) throws IOException { out.writeUTF(order_id); out.writeUTF(p_id); out.writeInt(amount); out.writeUTF(pname); out.writeUTF(flag); } @Override public void readFields(DataInput in) throws IOException { this.order_id = in.readUTF(); this.p_id = in.readUTF(); this.amount = in.readInt(); this.pname = in.readUTF(); this.flag = in.readUTF(); } @Override public String toString() { return order_id + "\t" + p_id + "\t" + amount + "\t" ; } }

编写TableMapper程序

package com.xin.mapreduce.table; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileSplit; public class TableMapper extends Mapper<LongWritable, Text, Text, TableBean>{ TableBean bean = new TableBean(); Text k = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 1 获取输入文件类型 FileSplit split = (FileSplit) context.getInputSplit(); String name = split.getPath().getName(); // 2 获取输入数据 String line = value.toString(); // 3 不同文件分别处理 if (name.startsWith("order")) {// 订单表处理 // 3.1 切割 String[] fields = line.split(","); // 3.2 封装bean对象 bean.setOrder_id(fields[0]); bean.setP_id(fields[1]); bean.setAmount(Integer.parseInt(fields[2])); bean.setPname(""); bean.setFlag("0"); k.set(fields[1]); }else {// 产品表处理 // 3.3 切割 String[] fields = line.split(","); // 3.4 封装bean对象 bean.setP_id(fields[0]); bean.setPname(fields[1]); bean.setFlag("1"); bean.setAmount(0); bean.setOrder_id(""); k.set(fields[0]); } // 4 写出 context.write(k, bean); } }

TableReducer

package com.xin.mapreduce.table; import java.io.IOException; import java.util.ArrayList; import org.apache.commons.beanutils.BeanUtils; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class TableReducer extends Reducer<Text, TableBean, TableBean, NullWritable> { @Override protected void reduce(Text key, Iterable<TableBean> values, Context context) throws IOException, InterruptedException { // 1准备存储订单的集合 ArrayList<TableBean> orderBeans = new ArrayList<>(); // 2 准备bean对象 TableBean pdBean = new TableBean(); for (TableBean bean : values) { if ("0".equals(bean.getFlag())) {// 订单表 // 拷贝传递过来的每条订单数据到集合中 TableBean orderBean = new TableBean(); try { BeanUtils.copyProperties(orderBean, bean); } catch (Exception e) { e.printStackTrace(); } orderBeans.add(orderBean); } else {// 产品表 try { // 拷贝传递过来的产品表到内存中 BeanUtils.copyProperties(pdBean, bean); } catch (Exception e) { e.printStackTrace(); } } } // 3 表的拼接 for(TableBean bean:orderBeans){ bean.setP_id(pdBean.getPname()); // 4 数据写出去 context.write(bean, NullWritable.get()); } } }

driver类

package com.xin.mapreduce.table; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class TableDriver { public static void main(String[] args) throws Exception { // 1 获取配置信息,或者job对象实例 Configuration configuration = new Configuration(); Job job = Job.getInstance(configuration); // 2 指定本程序的jar包所在的本地路径 job.setJarByClass(TableDriver.class); // 3 指定本业务job要使用的mapper/Reducer业务类 job.setMapperClass(TableMapper.class); job.setReducerClass(TableReducer.class); // 4 指定mapper输出数据的kv类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(TableBean.class); // 5 指定最终输出的数据的kv类型 job.setOutputKeyClass(TableBean.class); job.setOutputValueClass(NullWritable.class); // 6 指定job的输入原始文件所在目录 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行 boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); } }

 缺点:这种方式中,合并的操作是在reduce阶段完成,reduce端的处理压力太大,map节点的运算负载则很低,资源利用率不高,且在reduce阶段极易产生数据倾斜

解决方案: map端实现数据合并

需求2:map端表合并(Distributedcache)

package test; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class DistributedCacheDriver { public static void main(String[] args) throws Exception { // 1 获取job信息 Configuration configuration = new Configuration(); Job job = Job.getInstance(configuration); // 2 设置加载jar包路径 job.setJarByClass(DistributedCacheDriver.class); // 3 关联map job.setMapperClass(DistributedCacheMapper.class); // 4 设置最终输出数据类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); // 5 设置输入输出路径 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 6 加载缓存数据 job.addCacheFile(new URI("file:/e:/cache/pd.txt")); // 7 map端join的逻辑不需要reduce阶段,设置reducetask数量为0 job.setNumReduceTasks(0); // 8 提交 boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); } }

读缓存的文件

package test; import java.io.BufferedReader; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStreamReader; import java.util.HashMap; import java.util.Map; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class DistributedCacheMapper extends Mapper<LongWritable, Text, Text, NullWritable>{ Map<String, String> pdMap = new HashMap<>(); @Override protected void setup(Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException { // 1 获取缓存的文件 BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream("pd.txt"))); String line; while(StringUtils.isNotEmpty(line = reader.readLine())){ // 2 切割 String[] fields = line.split("\t"); // 3 缓存数据到集合 pdMap.put(fields[0], fields[1]); } // 4 关流 reader.close(); } Text k = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 1 获取一行 String line = value.toString(); // 2 截取 String[] fields = line.split("\t"); // 3 获取订单id String orderId = fields[1]; // 4 获取商品名称 String pdName = pdMap.get(orderId); // 5 拼接 k.set(line + "\t"+ pdName); // 6 写出 context.write(k, NullWritable.get()); } }

 

最新回复(0)