Transportation MapReduce

A MapReduce job that cleans an airline passenger dataset: the mapper drops records with missing or zero fares (SUM_YR_1 / SUM_YR_2), a zero average discount, or a non-positive total flight distance, and emits the fields needed for later analysis; the reducer deduplicates the surviving records.


map:

package com.traffic;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MyTrafficMapper extends Mapper<LongWritable, Text, Text, Text> {

    // Header row, captured from the first line of the input file.
    private String title = "";

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Byte offset 0 marks the header line; note this only holds when the
        // input is a single unsplit file.
        if (key.get() == 0) {
            title = value.toString();
            return;
        }
        String[] data = value.toString().split(",");
        String sumyr1 = data[findIndex("SUM_YR_1")];
        String sumyr2 = data[findIndex("SUM_YR_2")];
        String avg = data[findIndex("avg_discount")];
        String seg = data[findIndex("SEG_KM_SUM")];
        // Drop records where both fare fields are missing.
        if (sumyr1.isEmpty() && sumyr2.isEmpty()) {
            return;
        }
        // Drop records with a zero fare, a zero average discount,
        // or a non-positive total flight distance.
        if (sumyr1.equals("0") || sumyr2.equals("0")
                || Float.parseFloat(avg) == 0 || Float.parseFloat(seg) <= 0) {
            return;
        }
        context.write(new Text(data[findIndex("FFP_DATE")] + ","
                + data[findIndex("LOAD_TIME")] + ","
                + data[findIndex("FLIGHT_COUNT")] + ","
                + avg + "," + seg + ","
                + data[findIndex("LAST_TO_END")]), new Text(""));
    }

    // Locate a column by name in the header row.
    private int findIndex(String target) {
        String[] columns = title.split(",");
        for (int i = 0; i < columns.length; i++) { // was hard-coded to 44 columns
            if (columns[i].equals(target)) {
                return i;
            }
        }
        // The original returned 0 here, silently pointing at the first column;
        // failing fast makes a missing column visible.
        throw new IllegalArgumentException("column not found: " + target);
    }
}
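findIndex re-splits the header string on every lookup, several times per record. A minimal sketch of a hypothetical helper (the class and method names are illustrative, not part of the original code) that resolves every column position in a single pass:

package com.traffic;

import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: builds a name -> position map from the header line,
// so each column lookup is a single map hit instead of a linear scan.
public class ColumnIndex {

    private final Map<String, Integer> index = new HashMap<>();

    public ColumnIndex(String headerLine) {
        String[] columns = headerLine.split(",");
        for (int i = 0; i < columns.length; i++) {
            index.put(columns[i], i);
        }
    }

    public int of(String column) {
        Integer i = index.get(column);
        if (i == null) {
            throw new IllegalArgumentException("column not found: " + column);
        }
        return i;
    }
}

In the mapper, it would be constructed once when the header line is seen (new ColumnIndex(value.toString())), after which each findIndex("X") call becomes an O(1) lookup via of("X").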

reduce:

package com.traffic;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MyTrafficReduce extends Reducer<Text, Text, Text, Text> {

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // The mapper emits an empty value per record, so counting the values
        // counts how many times an identical record line occurred; the
        // accumulation was left commented out in the original.
        int count = 0;
        for (Text value : values) {
            count++;
        }
        context.write(key, new Text(String.valueOf(count)));
    }
}
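Because the mapper emits an empty Text value, the reducer can only count iterations. A sketch of the more conventional counting pattern, assuming the mapper were changed to emit new IntWritable(1) per record (the driver's map output value class would have to change to IntWritable as well; the class name is illustrative):

package com.traffic;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Word-count-style reducer: sums the 1s emitted per record, so the output
// value is the number of duplicates of each cleaned record line.
public class CountingReduce extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable one : values) {
            sum += one.get();
        }
        context.write(key, new IntWritable(sum));
    }
}

This variant also allows the same class to serve as a combiner, since summing partial counts is associative.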

main class:

package com.traffic;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MyTrafficDrive {

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf); // the original called getInstance() without conf
        job.setJarByClass(MyTrafficDrive.class);
        job.setMapperClass(MyTrafficMapper.class);
        // The original left the reducer commented out, producing map-only
        // output; wiring it in deduplicates identical record lines.
        job.setReducerClass(MyTrafficReduce.class);
        setJobs(job);
        Path path = setPaths(job);
        // Delete the output directory if it already exists, so reruns don't fail.
        path.getFileSystem(conf).delete(path, true);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    private static Path setPaths(Job job) throws IOException {
        FileInputFormat.addInputPath(job, new Path("E:\\data1"));
        Path path = new Path("E:\\data1\\out");
        FileOutputFormat.setOutputPath(job, path);
        return path;
    }

    private static void setJobs(Job job) {
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
    }
}
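The paths are hardcoded to a local Windows directory, which works for a local test run from the IDE. To run the same jar on a cluster, a common pattern is to read the paths from the command line instead; a minimal sketch, as a drop-in replacement for setPaths above (call it as setPaths(job, args) from main):

    // Drop-in replacement for setPaths: paths come from main(String[] args).
    private static Path setPaths(Job job, String[] args) throws IOException {
        FileInputFormat.addInputPath(job, new Path(args[0])); // input directory
        Path out = new Path(args[1]);                         // output directory
        FileOutputFormat.setOutputPath(job, out);
        return out;
    }

The job could then be launched with, e.g., hadoop jar traffic.jar com.traffic.MyTrafficDrive /input/dir /output/dir (the jar name here is an assumption).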