RDD的三种创建方式

mac2025-09-26  18

1.从集合中创建

Spark主要提供了两种函数:parallelize和makeRDD

/** Distribute a local Scala collection to form an RDD. * * @note Parallelize acts lazily. If `seq` is a mutable collection and is altered after the call * to parallelize and before the first action on the RDD, the resultant RDD will reflect the * modified collection. Pass a copy of the argument to avoid this. * @note avoid using `parallelize(Seq())` to create an empty `RDD`. Consider `emptyRDD` for an * RDD with no partitions, or `parallelize(Seq[T]())` for an RDD of `T` with empty partitions. */ def parallelize[T: ClassTag]( seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = withScope { assertNotStopped() new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]()) } /** Distribute a local Scala collection to form an RDD. * * This method is identical to `parallelize`. */ def makeRDD[T: ClassTag]( seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = withScope { parallelize(seq, numSlices) }

 测试:

scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5)) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

2.从外部存储创建

外部存储系统的数据集创建,包括本地的文件系统,还有所有Hadoop支持的数据集,比如HDFS、Cassandra、HBase等

测试:从HDFS创建RDD

scala> val rdd2 = sc.textFile("hdfs://master-node:9000/wordcount.txt") rdd2: org.apache.spark.rdd.RDD[String] = hdfs://master-node:9000/wordcount.txt MapPartitionsRDD[2] at textFile at <console>:24

3.从其他RDD创建

scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5)) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24 scala> val rdd3 = rdd1.map(_*2) rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[7] at map at <console>:26 scala> rdd3.collect res5: Array[Int] = Array(2, 4, 6, 8, 10)

 

最新回复(0)