过滤脏数据并统计

mac2024-05-10  7

统计日志数据中的脏数据 现在我们手头上有一个日志文件,里面只有3个字段分别是url,时间,流量。在日志文件里面这些都是字符串,所以无论是时间还是流量都可能是一些不可用的格式,现在我们要统计出有多少脏数据并保留。 先贴代码吧,这里很多东西我都是写死了,大家就当个简单例子来看看吧: package sparkHomWork import java.io.{File, PrintWriter} import java.text.SimpleDateFormat import org.apache.spark.{SparkConf, SparkContext} object LogStat {   def main(args: Array[String]): Unit = {   //正常生产上就别这样写啦     val conf = new SparkConf().setMaster("local[2]").setAppName("test")     val sc = new SparkContext(conf)     //读取一个本地日志文件     val file = sc.textFile("logs")     //对每一行的数据进行切分     val lines = file.map(_.split("\t"))     //这里我缓存了一下这个数据,其实我就是手痒写写的,实际上没啥事的     lines.persist()     val num = lines.count()     //我们要统计脏数据的数量,所以我在这里定义了一个计数器     val accum = sc.longAccumulator("Error Accum")     //使用过滤器来将非脏数据过滤掉,并同时累计脏数据条数     val list = lines.filter(x => {       try{         val fm = new SimpleDateFormat("yy-MM-dd HH:mm:ss")         val tm = x(1)         val dt = fm.parse(tm)         val value = x(2).toInt         false       }catch {         case e:Exception => {accum.add(1L);true}       }     })     //这里其实可以将RDD里面的数据处理一下,使用saveAsTextFile这个算子,我就是手痒才     list.map(x => (x(0) + "\t" + x(1) + "\t" + x(2)).saveAsTextFile("errorLogs")     val end = list.collect()     val writer = new PrintWriter(new File("errorNum"))     writer.printer(accum.value)     writer.flush()     writer.close()     println("the number of the error :" + accum.value)     val writer = new PrintWriter(new File("corNum"))     writer.printer(num)     writer.flush()     writer.close()     sc.stop()   } } 里面的实现过程就是,当我将log数据转成RDD后,先对数据进行切分成3个部分:url,时间,流量。但这个时候都是字符串,我们要将其转数据类型。我把这个过程放到了一个try-catch块里面了,如果在转换的过程中抛出了异常,那么这个数据可能就是脏数据,这个时候catch捕获到这个异常对累加器添加一个位数,同时返回一个true给filter函数,在RDD中保留下来。经过上面这个过程后,我们list这个rdd里面就剩下脏数据了,同时我们也有了脏数据的条数了(从累加器里面拿)。 其实,上面的代码还是可以再优化一点,比如不需要在每个元素的过滤时创建一个new SimpleDateFormat(“yy-MM-dd HH:mm:ss”)对象,我们可以在main方法里面new出来,再用广播变量将这个对象广播到各个exeecutor上面,既然要广播一个对象就要用到序列化,我们也是可以使用kryo序列器来序列化这个对象,但是要记得提前注册好。  

最新回复(0)