Java大数据之路--MapReduce(1)

mac2025-01-25  44

MapReduce(分布式计算模型)

目录

MapReduce(分布式计算模型)

一、概述

二、入门案例

案例一、统计文件中的每一个单词出现的次数(文件:words.txt)

案例二、找出最大值

案例三、输出每一个单词出现的文件(目录:invert)


一、概述

MapReduce是一种分布式计算模型由谷歌提出,基于GFS进行设计,主要用于搜索领域中解决海量数据的计算问题Doug Cutting根据《MapReduce: Simplified Data Processing on Large Clusters》设计实现了Hadoop中基于HDFSMapReduceMapReduce是由两个阶段组成:Map和Reduce,用户只需要实现map以及reduce两个函数,即可实现分布式计算,这样做的目的是简化分布式程序的开发和调试周期,Map(映射)阶段和Reduce(规约)阶段.MapReduce中的键值对默认以制表符隔开,_开头的文件在MapReduce中会认为是隐藏文件,默认不读

二、入门案例

案例一、统计文件中的每一个单词出现的次数(文件:words.txt)

Mapper类:

public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {     // key -- 当前行的开始位置在整个文件中的偏移量     // value -- 当前行的内容     // context -- 环境对象     public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {         // 获取一行数据         String line = value.toString();         // 以空格为单位进行切分,获得单词对应的数组         String[] arr = line.split(" ");         // 遍历数组,输出这个词对应的频率         for (String str : arr) {             context.write(new Text(str), new LongWritable(1));         }     } }

 

Reducer类:

public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {     // key 键     // it 集合的迭代器     // context 环境对象     public void reduce(Text key, Iterable<LongWritable> values, Context context)         throws IOException, InterruptedException {         // 定义变量记录次数         long sum = 0;         // 循环 遍历集合,进行累加的操作,得到当前单词出现的总次数         for (LongWritable val : values) {             // 记录总次数             sum += val.get();         }         // 输出数据,key是单词,value是在map阶段这个单词出现的总的次数         context.write(key, new LongWritable(sum));     } }

 

Driver类:

public class WordCountDriver {     public static void main(String[] args) throws Exception {         // 获取当前的默认配置         Configuration conf = new Configuration();         // 获取代表当前mapreduce作业的JOB对象          Job job = Job.getInstance(conf);         // 指定当前程序的入口类         job.setJarByClass(cn.zyj.wc.WordCountDriver.class);         // 设置要执行的Mapper类         job.setMapperClass(WordCountMapper.class);         // 设置要执行的Reducerr类         job.setReducerClass(WordCountReducer.class);         // 设置Mapper的结果类型         job.setMapOutputKeyClass(Text.class);         job.setMapOutputValueClass(LongWritable.class);         // 设置Reducer的结果类型         job.setOutputKeyClass(Text.class);         job.setOutputValueClass(LongWritable.class);         // 设置输入路径         //  如果输入的是文件,那么读取的是指定的文件         // 如果输入的是目录,则读取当前目录下的所有的文件         FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.32.138:9000/mr/words.txt"));         // 设置输出路径         FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.32.138:9000/wcresult"));         // 执行job         if (!job.waitForCompletion(true))             return;     } } words.txt hello tom hello bob hello joy hello rose hello joy hello jerry hello tom hello rose hello joy

案例二、找出最大值

Mapper:

import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class MaxMapper extends Mapper<LongWritable, Text, Text, IntWritable> { public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] sp = value.toString().split(" "); context.write(new Text(sp[0]), new IntWritable(Integer.parseInt(sp[1]))); } }

Reducer:

package cn.zyj.maxDemo; import java.io.IOException; import java.util.HashMap; import java.util.Map; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; /** * 泛型表示输入输出两个k-v * @author Administrator * */ public class MaxReduce extends Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { // process values //第一种方法采用基本类型 // int max = 0; // for (IntWritable val : values) { // if (Integer.parseInt(val.toString())>max) { // max = Integer.parseInt(val.toString()); // } // } // context.write(key, new IntWritable(max)); //第二种方法迭代器,需要注意的问题,采用地址复用 // 在MapReduce中,为了减少对象的创建和销毁,采用了地址复用机制 // 在迭代过程中,被迭代的对象只创建一次 IntWritable max = new IntWritable(0); // key = Bob // values = 684 512 340 312 // IntWritable val = new IntWritable(); // val.set(684); // val.get() > max.get() -> 684 > 0 -> true // max = val; - 将val赋值给max,给的是地址,所以max和val的指向地址一致 // val.set(512); // val.get() > max.get() -> 512 > 512 -> false // 最后max的值是最后一个被迭代的值 for (IntWritable val : values) { if (val.get()>max.get()) { //max=val; max.set(val.get()); } } context.write(key, max); } }

Driver:

package cn.zyj.maxDemo; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class MaxDriver { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(); job.setJarByClass(cn.zyj.maxDemo.MaxDriver.class); // TODO: specify a mapper job.setMapperClass(MaxMapper.class); // TODO: specify a reducer job.setReducerClass(MaxReduce.class); // TODO: specify output types job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // TODO: specify input and output DIRECTORIES (not files) FileInputFormat.setInputPaths(job, new Path("hdfs://10.42.3.8:9000/txt/score2.txt")); FileOutputFormat.setOutputPath(job, new Path("hdfs://10.42.3.8:9000/result/max1")); if (!job.waitForCompletion(true)) return; } } score2.txt Bob 684 Alex 265 Grace 543 Henry 341 Adair 345 Chad 664 Colin 464 Eden 154 Grover 630 Bob 340 Alex 367 Grace 567 Henry 367 Adair 664 Chad 543 Colin 574 Eden 663 Grover 614 Bob 312 Alex 513 Grace 641 Henry 467 Adair 613 Chad 697 Colin 271 Eden 463 Grover 452 Bob 548 Alex 285 Grace 554 Henry 596 Adair 681 Chad 584 Colin 699 Eden 708 Grover 345

案例三、输出每一个单词出现的文件(目录:invert)

Mapper:

package cn.zyj.file; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileSplit; public class fileMapper extends Mapper<LongWritable, Text, Text, Text> { public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] val = value.toString().split(" "); for (String string : val) { FileSplit fs = (FileSplit) context.getInputSplit(); String name = fs.getPath().getName(); context.write(new Text(string), new Text(name)); } } }

Reducer:

package cn.zyj.file; import java.io.IOException; import java.util.HashSet; import java.util.Set; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class fileReduce extends Reducer<Text, Text, Text, Text> { public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { // process values Set<String> set = new HashSet<String>(); for (Text val : values) { set.add(val.toString()); } context.write(key, new Text(set.toString())); } }

Driver:

package cn.zyj.file; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class fileDriver { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "JobName"); job.setJarByClass(cn.zyj.file.fileDriver.class); // TODO: specify a mapper job.setMapperClass(fileMapper.class); // TODO: specify a reducer job.setReducerClass(fileReduce.class); // TODO: specify output types job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); // TODO: specify input and output DIRECTORIES (not files) FileInputFormat.setInputPaths(job, new Path("hdfs://10.42.3.8:9000/txt/invert")); FileOutputFormat.setOutputPath(job, new Path("hdfs://10.42.3.8:9000/result/invert1")); if (!job.waitForCompletion(true)) return; } } //a.txt hello nio hi concurrent hello zookeeper hello thrift hi avro //b.txt hi hadoop hello hdfs hi mapreduce hi yarn //c.txt hadoop hdfs netty nio serial avro hadoop mapreduce serial thrift //d.txt nio mina proto serial avro serial observer zookeeper ozone hadoop

 

最新回复(0)