The Road to Java Big Data -- MapReduce (2): Serialization and Partitioning


MapReduce (distributed computing model): Serialization and Partitioning

1. Serialization

In MapReduce, the data passed between stages must be serializable. By default, MapReduce's serialization mechanism is based on AVRO; MapReduce wraps AVRO's serialization and exposes a simpler form of it: implementing the Writable interface.

Example 1: Create a Flow class and make it serializable

package cn.zyj.flow;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class Flow implements Writable {

    private String phone;
    private String name;
    private String addr;
    private int flow;

    public String getPhone() { return phone; }
    public void setPhone(String phone) { this.phone = phone; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public String getAddr() { return addr; }
    public void setAddr(String addr) { this.addr = addr; }
    public int getFlow() { return flow; }
    public void setFlow(int flow) { this.flow = flow; }

    // Deserialization: read the fields back in exactly the order they were written.
    public void readFields(DataInput in) throws IOException {
        this.phone = in.readUTF();
        this.addr = in.readUTF();
        this.name = in.readUTF();
        this.flow = in.readInt();
    }

    // Serialization: just write out each field that needs to be preserved, in a fixed order.
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phone);
        out.writeUTF(addr);
        out.writeUTF(name);
        out.writeInt(flow);
    }
}
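The one rule to respect is that readFields must consume fields in exactly the order write produced them. A minimal round-trip check, using a hypothetical test class and made-up field values (not part of the original post), makes that visible:

package cn.zyj.flow;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class FlowRoundTrip {
    public static void main(String[] args) throws IOException {
        // Hypothetical sample values, for illustration only.
        Flow original = new Flow();
        original.setPhone("13800000000");
        original.setAddr("bj");
        original.setName("zs");
        original.setFlow(1024);

        // Serialize into an in-memory buffer.
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        original.write(new DataOutputStream(buf));

        // Deserialize from the same bytes; this only works because
        // readFields() reads in the same order write() wrote.
        Flow copy = new Flow();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));

        System.out.println(copy.getName() + " " + copy.getFlow()); // prints: zs 1024
    }
}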

2. Partitioning

Partitioning is an important step in the shuffle phase. Its job is to distribute the map output to different reducers according to some rule, so that the job produces one output per partition. Partitioner is the base class for partitioners; a custom partitioner must extend it. HashPartitioner is MapReduce's default partitioner, and it computes:

which reducer = (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks

By default, the number of reduce tasks is 1. The built-in partitioning rule often cannot satisfy the business requirements, so to achieve a particular effect you may need to define your own partitioning rule. If you define N partitions, you must also configure a matching number of reduce tasks.
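For reference, the default partitioner is little more than that formula wrapped in a class; the following sketch is equivalent to Hadoop's built-in HashPartitioner (renamed here to avoid any claim of being the actual source):

import org.apache.hadoop.mapreduce.Partitioner;

// Mask off the sign bit so the hash is non-negative, then take it
// modulo the number of reduce tasks.
public class HashPartitionerSketch<K, V> extends Partitioner<K, V> {
    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}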

Example 1: Compute the traffic used by each person in each city

Mapper:

package cn.tedu.partflow;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class PartFlowMapper extends Mapper<LongWritable, Text, Text, Flow> {

    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Each input line holds four space-separated fields: phone addr name flow.
        String[] arr = value.toString().split(" ");
        Flow f = new Flow();
        f.setPhone(arr[0]);
        f.setAddr(arr[1]);
        f.setName(arr[2]);
        f.setFlow(Integer.parseInt(arr[3]));
        // Key by name so each person's records meet in a single reduce call.
        context.write(new Text(f.getName()), f);
    }
}
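The mapper assumes every line of flow.txt carries four space-separated fields in the order phone, addr, name, flow. A few hypothetical sample lines (invented for illustration; the original post does not show the data set):

13800000000 bj zs 1024
13900000000 sh ls 512
13700000000 xa ww 256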

Partitioner:

package cn.tedu.partflow;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class AddrPartitioner extends Partitioner<Text, Flow> {

    // Define the partitioning rule: route records by region.
    @Override
    public int getPartition(Text key, Flow value, int numReduceTasks) {
        // Grab the region carried in the value and classify on it.
        String addr = value.getAddr();
        if (addr.equals("bj"))
            return 0;
        else if (addr.equals("sh"))
            return 1;
        else
            return 2;
    }
}
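The hard-coded if/else chain is fine for three known regions. A table-driven variant scales a little better as regions are added; this is a sketch of my own (not from the original post), with unknown regions falling back to the last partition:

package cn.tedu.partflow;

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Hypothetical table-driven variant of AddrPartitioner.
public class MapAddrPartitioner extends Partitioner<Text, Flow> {

    private static final Map<String, Integer> REGIONS = new HashMap<>();
    static {
        REGIONS.put("bj", 0);
        REGIONS.put("sh", 1);
    }

    @Override
    public int getPartition(Text key, Flow value, int numReduceTasks) {
        // Unmapped regions go to partition 2.
        return REGIONS.getOrDefault(value.getAddr(), 2);
    }
}

Whatever the rule, every index getPartition returns must be less than numReduceTasks (3 in this job), or the job fails at runtime with an illegal-partition error.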

Reducer:

package cn.tedu.partflow;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class PartFlowReducer extends Reducer<Text, Flow, Text, IntWritable> {

    public void reduce(Text key, Iterable<Flow> values, Context context)
            throws IOException, InterruptedException {
        // Sum the traffic across all of this person's records.
        int sum = 0;
        for (Flow val : values) {
            sum += val.getFlow();
        }
        context.write(key, new IntWritable(sum));
    }
}
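One general Hadoop behavior worth knowing (not mentioned in the original post): the framework reuses a single Flow instance while iterating over values, overwriting its fields on each step. Summing a primitive field, as above, is safe; keeping the Flow objects themselves past an iteration would require defensive copies.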

Driver:

package cn.tedu.partflow;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class PartFlowDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "JobName");
        job.setJarByClass(PartFlowDriver.class);

        job.setMapperClass(PartFlowMapper.class);
        job.setReducerClass(PartFlowReducer.class);

        // Set the custom partitioner.
        job.setPartitionerClass(AddrPartitioner.class);
        // The number of ReduceTasks must match the number of partitions.
        job.setNumReduceTasks(3);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Flow.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.setInputPaths(job, new Path("hdfs://10.42.3.8:9000/txt/flow.txt"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://10.42.3.8:9000/result/partflow"));

        if (!job.waitForCompletion(true))
            return;
    }
}
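With numReduceTasks set to 3 and the partitioner above, the output directory will contain three files: part-r-00000 with the bj totals, part-r-00001 with sh, and part-r-00002 with everything else. Note that the Mapper, Partitioner, and Reducer reference Flow without an import, so the Flow class from section 1 is assumed to live in (or be copied into) the cn.tedu.partflow package.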

 
