Spark 中的 Kryo 磁盘序列化

mac2025-04-21  2

在 Apache Spark 中,对于大数据应用程序,建议使用 Kryo 序列化而不是 java 序列化。与 java 序列化相比,当您移动和缓存大量数据时,与 java 序列化相比,Kryo 占用的内存更少。

虽然 kryo 支持 RDD 缓存 和 shuffle,但它本身并不支持序列化到磁盘。RDD 上的 saveAsObjectFile 方法和 SparkContext 上的 objectFile 方法都只支持 java 序列化。

随着自定义数据类型数量的增加,支持多个序列化变得非常繁琐。因此,如果我们可以在任何地方使用 kryo 序列化,那就太好了。在这篇文章中,我们将讨论如何使用 kryo 序列化来保存和读取磁盘。

Write

通常我们使用 rdd.saveAsObjectFile api将序列化的对象保存到磁盘中。下面的代码展示了如何编写自己的 saveAsObjectFile 方法,该方法将对象保存为 kryo 序列化格式。

def saveAsObjectFile[T: ClassTag](rdd: RDD[T], path: String)

我们将写入的 rdd 和输出路径作为输入参数。

val kryoSerializer = new KryoSerializer(rdd.context.getConf)

KryoSerializer 是 spark 提供的一个帮助类,用于处理 kryo。我们创建一个 KryoSerializer 的实例,它配置所需缓冲区大小。

rdd.mapPartitions(iter => iter.grouped(10) .map(_.toArray)) .map(splitArray => {}

每个 objectFile 都保存为 HDFS 序列文件。因此,我们循环遍历每个 rdd 分割,然后将这些分割转换为字节数组。

val kryo = kryoSerializer.newKryo()

对于每个 splitArray,首先创建一个 kryo 实例。kryo 实例不是线程安全的。这就是为什么我们为每个 map 操作创建一个。当我们调用 kryoSerializer.newKryo() 时,它会创建一个新的 kryo 实例,并调用我们的自定义注册器(如果有的话)。

//create output stream and plug it to the kryo output val bao = new ByteArrayOutputStream() val output = kryoSerializer.newKryoOutput() output.setOutputStream(bao) kryo.writeClassAndObject(output, splitArray) output.close()

一旦我们有了 kryo 实例,我们就创建 kryo 输出。然后我们写入类信息并对输出进行处理。

val byteWritable = new BytesWritable(bao.toByteArray) (NullWritable.get(), byteWritable) }).saveAsSequenceFile(path)

一旦我们有了 kryo 的字节表示,我们就把 bytearray 封装到 BytesWritable 中并保存为序列文件。

因此,只需几行代码,现在就可以将kryo对象保存到磁盘中。

Read

如果你不仅仅把数据写入磁盘。您还应该能够从这些数据创建 RDD。通常我们在 sparkContext 上使用 objectFile api 从磁盘读取数据。在这里,我们将编写自己的 objectFile api 来读取 kryo 对象文件。

def objectFile[T](sc: SparkContext, path: String, minPartitions: Int = 1)(implicit ct: ClassTag[T]) = { val kryoSerializer = new KryoSerializer(sc.getConf) sc.sequenceFile(path, classOf[NullWritable], classOf[BytesWritable], minPartitions) .flatMap(x => { val kryo = kryoSerializer.newKryo() val input = new Input() input.setBuffer(x._2.getBytes) val data = kryo.readClassAndObject(input) val dataObject = data.asInstanceOf[Array[T]] dataObject }) }

大多数步骤与写入相同,唯一的区别是我们使用输入而不是使用输出。我们从 BytesWritable 读取字节并使用 readClassAndObject api 进行反序列化。

Example

下面的示例使用上述两个方法来序列化和反序列化一个名为 Person 的自定义对象。

// user defined class that need to serialized class Person(val name: String) def main(args: Array[String]) { if (args.length < 1) { println("Please provide output path") return } val outputPath = args(0) val conf = new SparkConf().setMaster("local").setAppName("kryoexample") conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") val sc = new SparkContext(conf) //create some dummy data val personList = 1 to 10000 map (value => new Person(value + "")) val personRDD = sc.makeRDD(personList) saveAsObjectFile(personRDD, outputPath) val rdd = objectFile[Person](sc, outputPath) println(rdd.map(person => person.name).collect().toList) }

因此,如果您在您的项目中使用kryo序列化,现在您也可以将相同的序列化保存到磁盘中。

附录代码

Write

package com.madhu.spark.kryo import java.io.ByteArrayOutputStream import com.esotericsoftware.kryo.io.Input import org.apache.hadoop.io.{BytesWritable, NullWritable} import org.apache.spark.SparkContext._ import org.apache.spark.rdd.RDD import org.apache.spark.serializer.KryoSerializer import org.apache.spark.{SparkConf, SparkContext} import scala.reflect.ClassTag /** * Example code showing how to kryo serialization for disk */ object KryoExample { def main(args: Array[String]) { if (args.length < 1) { println("Please provide output path") return } val outputPath = args(0) val conf = new SparkConf().setMaster("local").setAppName("kryoexample") conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") val sc = new SparkContext(conf) //create some dummy data val personList = 1 to 10000 map (value => new Person(value + "")) val personRDD = sc.makeRDD(personList) saveAsObjectFile(personRDD, outputPath) val rdd = objectFile[Person](sc, outputPath) println(rdd.map(person => person.name).collect().toList) } /* * Used to write as Object file using kryo serialization */ def saveAsObjectFile[T: ClassTag](rdd: RDD[T], path: String) { val kryoSerializer = new KryoSerializer(rdd.context.getConf) rdd.mapPartitions(iter => iter.grouped(10) .map(_.toArray)) .map(splitArray => { //initializes kyro and calls your registrator class val kryo = kryoSerializer.newKryo() //convert data to bytes val bao = new ByteArrayOutputStream() val output = kryoSerializer.newKryoOutput() output.setOutputStream(bao) kryo.writeClassAndObject(output, splitArray) output.close() // We are ignoring key field of sequence file val byteWritable = new BytesWritable(bao.toByteArray) (NullWritable.get(), byteWritable) }).saveAsSequenceFile(path) } /* * Method to read from object file which is saved kryo format. */ def objectFile[T](sc: SparkContext, path: String, minPartitions: Int = 1)(implicit ct: ClassTag[T]) = { val kryoSerializer = new KryoSerializer(sc.getConf) sc.sequenceFile(path, classOf[NullWritable], classOf[BytesWritable], minPartitions) .flatMap(x => { val kryo = kryoSerializer.newKryo() val input = new Input() input.setBuffer(x._2.getBytes) val data = kryo.readClassAndObject(input) val dataObject = data.asInstanceOf[Array[T]] dataObject }) } // user defined class that need to serialized class Person(val name: String) }

 

最新回复(0)