开发过程中,如果需要在算子函数中使用外部变量的场景(尤其是100m以上的数据),那么此时应该使用广播变量来提升性能。
算子函数中,默认情况下,Spark会将该变量复制多个副本,通过网络传输到task中,此时每个task都有一个变量副本,如果变量本身比较大,那么大量的变量副本在网络中传输的性能开销以及各个节点的Executor中占用过多内存导致的频繁GC,都会极大影响性能。
如果使用的外部变量比较大,建议使用Spark的广播功能,对该变量进行广播。广播后变量会保证每个Executor的内存中,只驻留一份变量副本,而Executor中task执行时共享该Executor的那份变量副本。这样可以大大减少变量副本的数量,从而减少网络传输的性能开销,并减少对Executor内存的占用开销,降低GC的频率。
package com.kevin.scala import org.apache.spark.{SparkConf, SparkContext} /** * 广播变量 * 将大变量广播出去,广播后变量会保证每个Executor的内存中,只驻留一份变量副本,而Executor中 * task执行时共享该Executor的那份变量副本。这样可以大大减少变量副本的数量,从而减少网络传 * 输的性能开销,并减少对Executor内存的占用开销,降低GC的频率。 */ object BroadCastDemo { def main(args: Array[String]): Unit = { val file = "DTSparkCore\\src\\main\\resources\\records.txt" // 1.创建SparkConf val conf = new SparkConf().setAppName("BroadCastDemo").setMaster("local") // 2.创建SparkContext val sc = new SparkContext(conf) // 3.广播变量将list广播出去 val broadcast = sc.broadcast(List("hello java")) // 4.textFile读取文件数据,filter过滤掉非广播数据,foreach遍历 sc.textFile(file).filter(value => (broadcast.value.contains(value))).foreach(println(_)) // 5.关闭sc sc.stop() } }