MapReduce WordCount in IDEA



pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.henu</groupId>
    <artifactId>henu</artifactId>
    <version>1.0-SNAPSHOT</version>

    <name>henu</name>
    <!-- FIXME change it to the project's website -->
    <url>http://www.example.com</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>RELEASE</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.2</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>utf-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
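One thing worth checking before building: the hadoop-common, hadoop-client and hadoop-hdfs versions declared here (2.7.2) should match the Hadoop version of the cluster the jar will later be submitted to; mismatched client and cluster versions are a common source of runtime failures.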

WordCount

package com.henu;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @author George
 * @description WordCount: counts the occurrences of each word in the input.
 */
public class WC {

    public static class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        Text k1 = new Text();
        IntWritable v1 = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Split each line on whitespace and emit (word, 1) for every token.
            String line = value.toString();
            String[] strings = line.split("\\s+");
            for (String s : strings) {
                k1.set(s);
                context.write(k1, v1);
            }
        }
    }

    public static class WCReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        int count;
        IntWritable v2 = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // Sum the counts for each word and emit (word, total).
            count = 0;
            for (IntWritable value : values) {
                count += value.get();
            }
            v2.set(count);
            context.write(key, v2);
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(WC.class);

        job.setMapperClass(WCMapper.class);
        job.setReducerClass(WCReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }

}
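To try the job inside IDEA first: with a plain new Configuration() and no cluster configuration files on the classpath, Hadoop falls back to the local job runner and the local filesystem, so two local paths can be passed as program arguments (the file names below are only an illustration). An input file such as

hello world
hello hadoop

should then produce a single part-r-00000 file in the output directory:

hadoop	1
hello	2
world	1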

Adding a partitioner:

package com.henu;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @author George
 * @description WordCount with a custom partitioner: "hello" goes to reducer 0, every other word to reducer 1.
 */
public class WC {

    public static class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        Text k1 = new Text();
        IntWritable v1 = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] strings = line.split("\\s+");
            for (String s : strings) {
                k1.set(s);
                context.write(k1, v1);
            }
        }
    }

    public static class WCReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        int count;
        IntWritable v2 = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            count = 0;
            for (IntWritable value : values) {
                count += value.get();
            }
            v2.set(count);
            context.write(key, v2);
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(WC.class);

        job.setMapperClass(WCMapper.class);
        job.setReducerClass(WCReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Set the partitioner for the map output and match the number of reducers to the number of partitions.
        job.setPartitionerClass(MyPartitoner.class);
        job.setNumReduceTasks(2);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }

    private static class MyPartitoner extends Partitioner<Text, IntWritable> {
        @Override
        public int getPartition(Text text, IntWritable intWritable, int i) {
            // Keys equal to "hello" (case-insensitive) go to partition 0; everything else to partition 1.
            String kStr = text.toString();
            return kStr.equalsIgnoreCase("hello") ? 0 : 1;
        }
    }
}
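Only two things change compared with the plain version: the MyPartitoner class and the two lines in main that register it and set the number of reduce tasks to 2. Each reduce task writes its own output file, so with the sample input used earlier the expected result (an illustration, not captured output) is:

part-r-00000
hello	2

part-r-00001
hadoop	1
world	1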

Ship the jar to Linux and run it:

yarn jar henu-1.0-SNAPSHOT.jar com.henu.WC /hello /abc 
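The surrounding steps, with file names that are only assumptions for illustration: build the jar with Maven, copy target/henu-1.0-SNAPSHOT.jar to a cluster node, and make sure the input path exists in HDFS while the output path does not (the job fails if /abc already exists). For example:

mvn clean package
hdfs dfs -put hello.txt /hello
yarn jar henu-1.0-SNAPSHOT.jar com.henu.WC /hello /abc
hdfs dfs -cat /abc/part-r-*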

 

 
