数据
zhan.txt:其实就是预处理二所得的数据。因为不想覆盖原来的数据,所以将其另存并重命名为 zhan.txt。
"uid":"131192622122401792" "platform":"Android" "app_version":"1007030202" "pid":"5616" "cityid":"626"
"uid":"131192622122401792" "platform":"Android" "app_version":"1007030202" "pid":"5616" "cityid":"626"
"uid":"131192622122401792" "platform":"Android" "app_version":"1007030202" "pid":"5616" "cityid":"626"
"uid":"142873087346606080" "platform":"Android" "app_version":"1007090002" "pid":"5057" "cityid":"86"
"uid":"142873087346606080" "platform":"Android" "app_version":"1007090002" "pid":"5057" "cityid":"86"
"uid":"142873087346606080" "platform":"Android" "app_version":"1007090002" "pid":"5057" "cityid":"86"
"uid":"142873087346606080" "platform":"Android" "app_version":"1007090002" "pid":"5057" "cityid":"86"
"uid":"142873087346606080" "platform":"Android" "app_version":"1007090002" "pid":"5057" "cityid":"86"
"uid":"142873087346606080" "platform":"Android" "app_version":"1007090002" "pid":"5057" "cityid":"86"
"uid":"142873087346606080" "platform":"Android" "app_version":"1007090002" "pid":"5057" "cityid":"86"
"uid":"142873087346606080" "platform":"Android" "app_version":"1007090002" "pid":"5057" "cityid":"86"
"uid":"161350486564405248" "platform":"Android" "app_version":"1007060402" "pid":"8888" "cityid":"1750"
"uid":"161350486564405248" "platform":"Android" "app_version":"1007060402" "pid":"8888" "cityid":"1750"
cityid.txt文件的数据如下:
1701|桐城市|桐城市|安徽|中国|安庆市|华东地区|四线城市|31.05228|116.93861
1702|宿松县|宿松县|安徽|中国|安庆市|华东地区|四线城市|30.151213|116.1142
1703|枞阳县|枞阳县|安徽|中国|安庆市|华东地区|四线城市|30.69371|117.21059
1704|太湖县|太湖县|安徽|中国|安庆市|华东地区|四线城市|30.420059|116.26508
1705|怀宁县|怀宁县|安徽|中国|安庆市|华东地区|四线城市|30.409006|116.64709
1706|岳西县|岳西县|安徽|中国|安庆市|华东地区|四线城市|30.857161|116.35818
1707|望江县|望江县|安徽|中国|安庆市|华东地区|四线城市|30.123537|116.67433
1708|潜山县|潜山县|安徽|中国|安庆市|华东地区|四线城市|30.630346|116.5672
5317|迎江区|迎江区|安徽|中国|安庆市|华东地区|四线城市|30.511548|117.09115
5318|大观区|大观区|安徽|中国|安庆市|华东地区|四线城市|30.553957|117.02167
1691|怀远县|怀远县|安徽|中国|蚌埠市|华东地区|四线城市|32.95665|117.19356
1692|固镇县|固镇县|安徽|中国|蚌埠市|华东地区|四线城市|33.314575|117.31171
1693|五河县|五河县|安徽|中国|蚌埠市|华东地区|四线城市|33.139736|117.88253
题目要求
这是数据处理中常用的方式:对两个数据集进行关联。将 log.log 预处理后所得文件(即 zhan.txt)中 cityid 字段的编号,与 cityid.txt 中的数据进行关联匹配,把城市编码替换为城市名称后输出。
代码
package com
.mr2
;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Vector;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class preThree {
public static class MyMapper extends Mapper<LongWritable,Text,Text,Text>
{
private FileSplit inputsplit
;
protected void map(LongWritable key
,Text value
,Context context
) throws IOException
,InterruptedException
{
inputsplit
= (FileSplit
)context
.getInputSplit();
String filename
= inputsplit
.getPath().getName();
if(filename
.contains("zhan"))
{
String s
= value
.toString();
String
[] split
= s
.split(" ");
String
[] m
= split
[4].split(":");
if(m
[1].length()>0)
{
String joinkey
= m
[1].substring(1,m
[1].length()-1);
String joinvalue
= "zhan"+split
[0]+" "+split
[1]+" "+split
[2]+" "+split
[3];
context
.write(new Text(joinkey
),new Text(joinvalue
));
}
}
if(filename
.contains("city"))
{
String s
= value
.toString();
String
[] split
= s
.split("\\|");
context
.write(new Text(split
[0]),new Text("cityid"+split
[5]));
}
}
}
public static class MyReduce extends Reducer<Text,Text,Text,Text>
{
protected void reduce(Text k2
,Iterable
<Text>v2
,Context context
) throws IOException
,InterruptedException
{
Vector
<String> vecA
= new Vector<String>();
Vector
<String> vecB
= new Vector<String>();
for(Text value
: v2
)
{
String line
= value
.toString();
if(line
.startsWith("zhan"))
{
vecA
.add(line
.substring("zhan".length()));
}
if(line
.startsWith("cityid"))
{
vecB
.add(line
.substring("cityid".length()));
}
}
for(String s1
: vecA
)
{
for(String s2
: vecB
)
{
context
.write(new Text(s1
),new Text("\"cityid\""+":"+"\""+s2
+"\""));
}
}
}
}
public static void main(String
[] args
) throws IOException
, ClassNotFoundException
, InterruptedException
{
Configuration conf
= new Configuration();
Job job
= Job
.getInstance(conf
,preThree
.class.getSimpleName());
job
.setJarByClass(preThree
.class);
job
.setMapperClass(MyMapper
.class);
job
.setReducerClass(MyReduce
.class);
job
.setMapOutputKeyClass(Text
.class);
job
.setMapOutputValueClass(Text
.class);
job
.setOutputKeyClass(Text
.class);
job
.setOutputValueClass(Text
.class);
FileInputFormat
.addInputPath(job
,new Path(args
[0]));
FileOutputFormat
.setOutputPath(job
,new Path(args
[1]));
job
.waitForCompletion(true);
}
}
注意:从两个文件中取 joinkey 时,一定要确保两边的格式完全一致,否则经过 reduce() 函数之后,输出文件将没有任何内容。我就被这个问题困扰了很久:取 zhan.txt 的 joinkey 时带了双引号,如"9541",而取 cityid.txt 的 joinkey 时却没有带双引号,如9541。
结果