Spark性能调优系列:(八)开发调优(使用Kryo优化序列化性能)

mac2024-06-04  64

使用Kryo优化序列化性能

Spark主要有三个地方涉及序列化:

1.算子函数中使用到外部变量时,该变量会被序列化后进行网络传输。 2.将自定义类型作为RDD的泛型类型时(比如Student是自定义类型),所有自定义类型对象,都会进行序列化。因此该情况下,也要求自定义的类必须实现Serializable接口。 3.使用可序列化的持久策略时(比如MEMORY_ONLY_SER),Spark会将RDD中的每个partition都序列化成一个大的字节数组。

这几个地方,都可以使用Kryo序列化类库,来优化序列化和反序列化的性能。Spark默认使用的是Java的序列化机制, 也就是ObjectOutputStream/ObjectInputStream API来进行序列化和反序列化。但Spark同时支持使用Kryo序列化库,Kryo序列化机制比Java序列化机制性能高10倍左右。

Spark没有默认使用Kryo作为序列化类库,因为Kryo要求最好注册所有需要进行序列化的自定义类型。案例:

package com.kevin.scala.tuning import org.apache.spark.{SparkConf, SparkContext} /** * 使用Kryo优化序列化性能 */ object KryoTuning { def main(args: Array[String]): Unit = { // 创建SparkConf val conf = new SparkConf().setAppName("KryoTuning").setMaster("local") // 设置Spark序列化方式为Kryo conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer") // 注册要序列化的自定义类型 conf.registerKryoClasses(Array(classOf[ClassDemo1], classOf[ClassDemo2])) // 创建SparkContext val sc = new SparkContext(conf) // 如果注册的要序列化自定义的类型本身非常大,比如属性有上百个,那么就会导致序列化的对象过大。 // 此时需要对Kryo本身进行优化,因为Kryo内部的缓存可能不够存放那么大的class对象,此时就需要调用SparkConf.set() // 设置spark.kryoSerializer.buffer.mb参数的值,将其调大。默认为2,就是最大能缓存2M的对象,然后序列化,我们可以加大缓存上限 sc.stop() } } class ClassDemo1{ val field1 = "" val field2 = "" val field3 = "" val field4 = "" val field5 = "" // ... } class ClassDemo2{ }

 

最新回复(0)