package com.amoscloud.log.analyze

import java.text.SimpleDateFormat
import java.util.Date

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object LogAnalyze1 {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setMaster("local[2]").setAppName("LogAnalyze2")
    val sc = new SparkContext(conf)

    val data = sc.textFile("C:\\Users\\Administrator\\Desktop\\HTTP.txt")
    data.cache()
    // 1. (phone number, home location, device brand, device model, connection duration)
    // analyze1(data)
    // 2. (timestamp to the second, traffic volume)
    analyze2(data)
    // 3. (brand, Array[(String, Int)]((model1, count1), (model2, count2)))
    // analyze(data)
  }

  // 3. count rows per (brand, model), then regroup the counts under each brand
  private def analyze(data: RDD[String]) = {
    data.filter(_.split(",").length >= 72)
      .map(x => {
        val arr = x.split(",")
        val brand = arr(70)
        val model = arr(71)
        ((brand, model), 1)
      })
      .reduceByKey(_ + _)
      // ((brand, model), count) -> (brand, (model, count))
      .map(t => {
        val k = t._1
        (k._1, (k._2, t._2))
      })
      .groupByKey()
      .collect()
      .foreach(println)
  }
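  // Note: groupByKey() above yields (brand, Iterable[(model, count)]); to match the
  // Array shape described in main, a hypothetical extra step such as
  //   .mapValues(_.toArray)
  // could be added before collect().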

  // 2. total traffic per timestamp, truncated to the second
  private def analyze2(data: RDD[String]) = {
    data.map(x => {
      val arr = x.split(",")
      // drop the last four characters (the sub-second suffix) to key by the second
      val time = arr(16).take(arr(16).length - 4)
      val flow = arr(7).toLong
      (time, flow)
    })
      .reduceByKey(_ + _)
      // .map(x => (x._1, (x._2 / 1024.0).formatted("%.3f") + "KB"))
      .map(x => (x._1, x._2))
      .collect()
      .foreach(println)
  }
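  // A hypothetical helper for the commented-out KB formatting above, assuming the
  // flow column arr(7) holds a byte count; inside analyze2 it could be applied as
  //   .map { case (time, flow) => (time, toKb(flow)) }
  private def toKb(bytes: Long): String = f"${bytes / 1024.0}%.3f KB"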

  // 1. (phone number, home location, device brand, device model, connection duration)
  private def analyze1(data: RDD[String]) = {
    data
      .filter(_.split(",").length >= 72)
      .map(x => {
        val arr = x.split(",")
        val phoneNum = arr(3).takeRight(11)
        val local = arr(61) + arr(62) + arr(63)
        val brand = arr(70)
        val model = arr(71)
        val connectTime = timeDiff(arr(16), arr(17))
        (phoneNum + "|" + local + "|" + brand + "|" + model, connectTime)
      })
      .reduceByKey(_ + _)
      .map(t => (t._1, formatTime(t._2)))
      .collect()
      .foreach(println)
  }

  // millisecond difference between two timestamps written as
  // "yyyy-MM-dd HH:mm:ss" followed by a 4-character millisecond suffix
  def timeDiff(time1: String, time2: String): Long = {
    val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    // parse the second-precision prefix, then add the millisecond suffix back in
    val timeStamp2 = sdf.parse(time2.take(time2.length - 4)).getTime + time2.takeRight(3).toLong
    val timeStamp1 = sdf.parse(time1.take(time1.length - 4)).getTime + time1.takeRight(3).toLong
    timeStamp2 - timeStamp1
  }

  // format a millisecond duration as h:m:s
  def formatTime(time: Long): String = {
    val timeS = time / 1000
    val s = timeS % 60
    val m = timeS / 60 % 60
    val h = timeS / 60 / 60 % 24
    h + ":" + m + ":" + s
  }

}
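As a quick sanity check of the two helpers, a minimal usage sketch, assuming the log timestamps are written as "yyyy-MM-dd HH:mm:ss.SSS" (the exact millisecond separator is an assumption, and the sample values below are made up):

import com.amoscloud.log.analyze.LogAnalyze1.{formatTime, timeDiff}

val start = "2019-05-07 18:00:00.000"
val end   = "2019-05-07 19:30:05.250"
val diff  = timeDiff(start, end)  // 5405250 ms
println(formatTime(diff))         // prints 1:30:5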
2: Write a Spark program that counts, from the IIS website request log, the number of IPs that successfully accessed the site in each hour of each day.
package com.amoscloud.log.analyze

import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

import scala.collection.mutable

object LogAnalyze {

  def main(args: Array[String]): Unit = {
    // Count, for each hour of each day, how many IPs successfully accessed the IIS site
    // Create the SparkContext
    val conf = new SparkConf().setAppName("LogAnalyze").setMaster("local[2]")
    val sc = new SparkContext(conf)
    // Read the raw log
    val log: RDD[String] = sc.textFile("C:\\Users\\Administrator\\Desktop\\iis网站请求日志")
    // Keep only the date, time, client IP and status code of each request
    log
      .filter(_.split("\\s").length > 10)
      .map(line => {
        val strings = line.split("\\s+")
        // RDD[(String, String, String, String)]: (date, hour, ip, status)
        (strings(0), strings(1).split(":")(0), strings(8), strings(10))
      })
      // keep successful requests only
      .filter(_._4 == "200")
      // RDD[(date|hour, IP)]
      .map(t => (t._1 + "|" + t._2, t._3))
      // RDD[(date|hour, Iterable[IP])]
      .groupByKey()
      // (key, total requests, distinct IPs)
      .map(t => (t._1, t._2.toList.size, t._2.toList.distinct.size))
      .collect()
      .foreach(t => {
        val spl = t._1.split("\\|")
        printf("%s\t%s\t%d\t%d\n", spl(0), spl(1), t._2, t._3)
      })
    // Alternative (commented out): partition the data by date and hour so that all
    // records with the same key land in the same partition, then count per partition
    // .partitionBy(new HashPartitioner(48))
    // .mapPartitions((iter: Iterator[(String, String)]) => {
    //   val set = mutable.HashSet[String]()
    //   var count = 0
    //   var next = ("", "")
    //   while (iter.hasNext) {
    //     next = iter.next()
    //     count += 1
    //     set.add(next._2)
    //   }
    //   ((next._1, count, set.size) :: Nil).iterator
    // })
    // .filter(_._1.nonEmpty)
  }
}
Using Spark operators more flexibly means writing less code.
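As one illustration of that point, here is a minimal sketch (not from the original post) of the same per-hour statistic written with aggregateByKey instead of groupByKey, so the full list of IPs never has to be materialised per key. `keyed` stands for the (date|hour, ip) pair RDD built by the map step above, and `countHits` is just an illustrative name:

import org.apache.spark.rdd.RDD
import scala.collection.mutable

def countHits(keyed: RDD[(String, String)]): RDD[(String, (Long, Int))] =
  keyed
    .aggregateByKey((0L, mutable.HashSet.empty[String]))(
      (acc, ip) => (acc._1 + 1L, acc._2 += ip),  // per partition: count requests, collect distinct IPs
      (a, b) => (a._1 + b._1, a._2 ++= b._2))    // merge the partial results across partitions
    .mapValues { case (total, ips) => (total, ips.size) }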
Reposted from: https://www.cnblogs.com/Vowzhou/p/10827349.html