Mapper:
package cn.tedu.partflow;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class PartFlowMapper extends Mapper<LongWritable, Text, Text, Flow> {

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Each input line: phone addr name flow, separated by spaces
        String[] arr = value.toString().split(" ");
        Flow f = new Flow();
        f.setPhone(arr[0]);
        f.setAddr(arr[1]);
        f.setName(arr[2]);
        f.setFlow(Integer.parseInt(arr[3]));
        // Key by name; the whole Flow bean travels as the value
        context.write(new Text(f.getName()), f);
    }
}

Partitioner:
package cn.tedu.partflow;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class AddrPartitioner extends Partitioner<Text, Flow> {

    // Define the partitioning rule
    @Override
    public int getPartition(Text key, Flow value, int numReduceTasks) {
        // Partition by region: first read the address from the Flow bean
        String addr = value.getAddr();
        if (addr.equals("bj"))
            return 0;
        else if (addr.equals("sh"))
            return 1;
        else
            return 2;
    }
}

Reducer:
package cn.tedu.partflow;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class PartFlowReducer extends Reducer<Text, Flow, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterable<Flow> values, Context context)
            throws IOException, InterruptedException {
        // Sum the traffic for each name within this partition
        int sum = 0;
        for (Flow val : values) {
            sum += val.getFlow();
        }
        context.write(key, new IntWritable(sum));
    }
}

Driver:
package cn.tedu.partflow;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class PartFlowDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "JobName");

        job.setJarByClass(PartFlowDriver.class);
        job.setMapperClass(PartFlowMapper.class);
        job.setReducerClass(PartFlowReducer.class);

        // Set the partitioner class
        job.setPartitionerClass(AddrPartitioner.class);
        // Set the number of ReduceTasks to match the number of partitions
        job.setNumReduceTasks(3);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Flow.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.setInputPaths(job, new Path("hdfs://10.42.3.8:9000/txt/flow.txt"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://10.42.3.8:9000/result/partflow"));

        if (!job.waitForCompletion(true))
            return;
    }
}
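
The Flow bean used as the map output value is not shown above. Because it is shuffled between the MapTask and ReduceTask, it must implement Hadoop's Writable interface and keep a no-argument constructor. Below is a minimal sketch, assuming only the four fields used by the Mapper and Partitioner (phone, addr, name, flow); the field order in write()/readFields() is an assumption, not the original class.

Flow (sketch, not the original class):

package cn.tedu.partflow;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

// Hypothetical sketch of the Flow bean relied on by the classes above
public class Flow implements Writable {

    private String phone = "";
    private String addr = "";
    private String name = "";
    private int flow;

    public String getPhone() { return phone; }
    public void setPhone(String phone) { this.phone = phone; }

    public String getAddr() { return addr; }
    public void setAddr(String addr) { this.addr = addr; }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public int getFlow() { return flow; }
    public void setFlow(int flow) { this.flow = flow; }

    // Serialize the fields in a fixed order for the shuffle
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phone);
        out.writeUTF(addr);
        out.writeUTF(name);
        out.writeInt(flow);
    }

    // Deserialize in exactly the same order as write()
    @Override
    public void readFields(DataInput in) throws IOException {
        phone = in.readUTF();
        addr = in.readUTF();
        name = in.readUTF();
        flow = in.readInt();
    }
}

With setNumReduceTasks(3), the output directory will contain three files, part-r-00000 through part-r-00002, corresponding to partitions 0 (bj), 1 (sh), and 2 (all other regions).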
