统计日志数据中的脏数据 现在我们手头上有一个日志文件,里面只有3个字段分别是url,时间,流量。在日志文件里面这些都是字符串,所以无论是时间还是流量都可能是一些不可用的格式,现在我们要统计出有多少脏数据并保留。 先贴代码吧,这里很多东西我都是写死了,大家就当个简单例子来看看吧: package sparkHomWork import java.io.{File, PrintWriter} import java.text.SimpleDateFormat import org.apache.spark.{SparkConf, SparkContext} object LogStat { def main(args: Array[String]): Unit = { //正常生产上就别这样写啦 val conf = new SparkConf().setMaster("local[2]").setAppName("test") val sc = new SparkContext(conf) //读取一个本地日志文件 val file = sc.textFile("logs") //对每一行的数据进行切分 val lines = file.map(_.split("\t")) //这里我缓存了一下这个数据,其实我就是手痒写写的,实际上没啥事的 lines.persist() val num = lines.count() //我们要统计脏数据的数量,所以我在这里定义了一个计数器 val accum = sc.longAccumulator("Error Accum") //使用过滤器来将非脏数据过滤掉,并同时累计脏数据条数 val list = lines.filter(x => { try{ val fm = new SimpleDateFormat("yy-MM-dd HH:mm:ss") val tm = x(1) val dt = fm.parse(tm) val value = x(2).toInt false }catch { case e:Exception => {accum.add(1L);true} } }) //这里其实可以将RDD里面的数据处理一下,使用saveAsTextFile这个算子,我就是手痒才 list.map(x => (x(0) + "\t" + x(1) + "\t" + x(2)).saveAsTextFile("errorLogs") val end = list.collect() val writer = new PrintWriter(new File("errorNum")) writer.printer(accum.value) writer.flush() writer.close() println("the number of the error :" + accum.value) val writer = new PrintWriter(new File("corNum")) writer.printer(num) writer.flush() writer.close() sc.stop() } } 里面的实现过程就是,当我将log数据转成RDD后,先对数据进行切分成3个部分:url,时间,流量。但这个时候都是字符串,我们要将其转数据类型。我把这个过程放到了一个try-catch块里面了,如果在转换的过程中抛出了异常,那么这个数据可能就是脏数据,这个时候catch捕获到这个异常对累加器添加一个位数,同时返回一个true给filter函数,在RDD中保留下来。经过上面这个过程后,我们list这个rdd里面就剩下脏数据了,同时我们也有了脏数据的条数了(从累加器里面拿)。 其实,上面的代码还是可以再优化一点,比如不需要在每个元素的过滤时创建一个new SimpleDateFormat(“yy-MM-dd HH:mm:ss”)对象,我们可以在main方法里面new出来,再用广播变量将这个对象广播到各个exeecutor上面,既然要广播一个对象就要用到序列化,我们也是可以使用kryo序列器来序列化这个对象,但是要记得提前注册好。