(Help request) I wrote a simple MapReduce program and ran into the problem below. I really can't figure it out — hoping someone here can help.


The error is as follows:

Exception in thread "main" java.io.IOException: The ownership on the staging directory /tmp/hadoop-yarn/staging/root/.staging is not as expected. It is owned by xch. The directory must be owned by the submitter root or by root
    at org.apache.hadoop.mapreduce.JobSubmissionFiles.getStagingDir(JobSubmissionFiles.java:120)
    at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:144)
    at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1290)
    at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1287)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
    at org.apache.hadoop.mapreduce.Job.submit(Job.java:1287)
    at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1308)
    at com.xch.flownumber.Drivers.main(Drivers.java:42)
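From what I can tell, the exception means the job is being submitted as root, but a staging directory left over from an earlier run under a different account (xch) fails Hadoop's ownership check. Below is a minimal sketch of the two workarounds I'm considering; the user name and staging path are copied from the stack trace above and not yet verified on my cluster:

package com.xch.flownumber;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class StagingDirFix {
    public static void main(String[] args) throws Exception {
        // Option 1: submit as the user that owns the staging directory.
        // UserGroupInformation reads the HADOOP_USER_NAME environment
        // variable (newer Hadoop versions also check the system property
        // of the same name) when no Kerberos login is configured; it must
        // be set before any Hadoop class initializes UGI.
        System.setProperty("HADOOP_USER_NAME", "xch");

        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hdp-1:9000");

        // Option 2: remove the stale staging directory so it gets
        // recreated with the right owner on the next submission.
        // Path copied from the exception message; verify it on your
        // cluster before deleting anything.
        FileSystem fs = FileSystem.get(conf);
        Path staging = new Path("/tmp/hadoop-yarn/staging/root/.staging");
        if (fs.exists(staging)) {
            fs.delete(staging, true); // recursive delete
        }
        fs.close();
    }
}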

Mapper:

package com.xch.flownumber;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FlowMapper extends Mapper<LongWritable, Text, Text, Flowbean> {

    // Reused across map() calls to avoid allocating per record
    Flowbean v = new Flowbean();
    Text k = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // 1. Read one line of input
        String line = value.toString();
        // 2. Split it on tabs
        String[] files = line.split("\t");
        // 3. Build the output key/value
        // phone number
        String phonenumber = files[1];
        // upstream and downstream traffic (third- and second-from-last fields)
        long upFlow = Long.parseLong(files[files.length - 3]);
        long downFlow = Long.parseLong(files[files.length - 2]);
        v.set(upFlow, downFlow);
        k.set(phonenumber);
        // 4. Emit
        context.write(k, v);
    }
}
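For clarity, this is the record layout the mapper assumes, demonstrated on a made-up sample line. The exact column set is my assumption; only field 1 (phone number) and the two counters before the last field matter to the indexing above:

public class ParseDemo {
    public static void main(String[] args) {
        // Hypothetical tab-separated record matching the mapper's indexing:
        // id, phone, ip, host, upFlow, downFlow, status
        String line = "1\t13726230503\t120.196.100.82\ti02.c.aliimg.com\t2481\t24681\t200";
        String[] files = line.split("\t");
        String phonenumber = files[1];                           // "13726230503"
        long upFlow = Long.parseLong(files[files.length - 3]);   // 2481
        long downFlow = Long.parseLong(files[files.length - 2]); // 24681
        System.out.println(phonenumber + " up=" + upFlow + " down=" + downFlow);
    }
}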

Reducer:

package com.xch.flownumber;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowReducer extends Reducer<Text, Flowbean, Text, Flowbean> {

    @Override
    protected void reduce(Text key, Iterable<Flowbean> v, Context context)
            throws IOException, InterruptedException {
        // e.g. 1372623050  2481  24681  -> sum
        long sum_upFlow = 0;
        long sum_downFlow = 0;
        // Accumulate the traffic of all records for this phone number
        for (Flowbean flowbean : v) {
            sum_upFlow += flowbean.getUpFlow();
            sum_downFlow += flowbean.getDownFlow();
        }
        Flowbean flowbean = new Flowbean(sum_upFlow, sum_downFlow);
        // Emit the aggregated result
        context.write(key, flowbean);
    }
}

Drivers:

package com.xch.flownumber;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Drivers {

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        // 1. Get the Job object
        Configuration conf = new Configuration();
        // Default file system the job accesses at runtime
        // (the property name is case-sensitive: "fs.defaultFS", not "fs.defaultFs")
        conf.set("fs.defaultFS", "hdfs://hdp-1:9000");
        // Where the job is submitted to run
        conf.set("mapreduce.framework.name", "yarn");
        conf.set("yarn.resourcemanager.hostname", "hdp-1");
        // Needed when submitting this job from a Windows client (cross-platform submission)
        conf.set("mapreduce.app-submission.cross-platform", "true");
        Job job = Job.getInstance(conf);
        // 2. Locate the jar
        job.setJarByClass(Drivers.class);
        // 3. Wire up the mapper and reducer classes
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);
        // Mapper output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Flowbean.class);
        // Final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Flowbean.class);
        // Input and output paths
        FileInputFormat.setInputPaths(job, new Path("/wordcount/input"));
        FileOutputFormat.setOutputPath(job, new Path("/wordcount/output"));
        // Submit and wait for completion
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
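Side note: FileOutputFormat refuses to run if /wordcount/output already exists from a previous run, so I clear it between runs. A small self-contained helper along those lines (the class and method names are mine, not from any Hadoop API):

package com.xch.flownumber;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class OutputCleaner {
    // Call before job.waitForCompletion(); dir would be "/wordcount/output"
    // for the driver above.
    public static void deleteIfExists(Configuration conf, String dir) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path p = new Path(dir);
        if (fs.exists(p)) {
            fs.delete(p, true); // recursive
        }
    }
}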

javabean:

package com.xch.flownumber;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class Flowbean implements Writable {

    private long upFlow;   // upstream traffic
    private long downFlow; // downstream traffic
    private long sumFlow;  // total traffic

    // No-arg constructor, required so Hadoop can instantiate the bean by reflection
    public Flowbean() {
    }

    public Flowbean(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    public void set(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    // Serialization
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    // Deserialization
    // NOTE: fields must be read in exactly the order they were written
    @Override
    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
        this.sumFlow = in.readLong();
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    public long getUpFlow() { return upFlow; }
    public void setUpFlow(long upFlow) { this.upFlow = upFlow; }
    public long getDownFlow() { return downFlow; }
    public void setDownFlow(long downFlow) { this.downFlow = downFlow; }
    public long getSumFlow() { return sumFlow; }
    public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; }
}
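To rule out a write/readFields ordering bug, here is a quick round-trip check of Flowbean using plain java.io streams (test scaffolding I added myself; it should print two identical lines, 2481, 24681, 27162, tab-separated):

package com.xch.flownumber;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public class FlowbeanRoundTrip {
    public static void main(String[] args) throws Exception {
        Flowbean original = new Flowbean(2481, 24681);

        // Serialize with write()
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // Deserialize with readFields() in the same field order
        Flowbean copy = new Flowbean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        // Both lines should match exactly
        System.out.println(original);
        System.out.println(copy);
    }
}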

