Spark RDD

mac, 2024-03-06

Resilient Distributed Datasets (RDDs): creating RDDs inside an application and from external files

|-Definition
|   |--Spark revolves around the concept of a resilient distributed dataset (RDD),
|   |  which is a fault-tolerant collection of elements that can be operated on in parallel.
|
|-Two ways to create an RDD
|   |--1. Parallelizing an existing collection in your driver program.
|   |--2. Referencing a dataset in an external storage system,
|   |     such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.
|
|-Parallelized collections (parallelizing an existing collection)
|   |--Created by calling SparkContext’s parallelize method on an existing collection.
|   |  The elements of the collection are copied to form a distributed dataset that can be operated on in parallel.
|   |--For example:
|   |    val data = Array(1, 2, 3, 4, 5)
|   |    val distData = sc.parallelize(data)
|   |--Once created, the distributed dataset (distData) can be operated on in parallel:
|   |    distData.reduce((a, b) => a + b)
|   |--The number of partitions is an important parameter for parallel collections.
|   |   |---Normally, Spark tries to set the number of partitions automatically based on your cluster.
|   |   |---You can also set it manually by passing it as a second parameter to parallelize (e.g. sc.parallelize(data, 10)).
|
|-Text file RDDs (external datasets)
|   |--Can be created using SparkContext’s textFile method.
|   |  This method takes a URI for the file (either a local path on the machine, or a hdfs://, s3a://, etc. URI)
|   |  and reads it as a collection of lines.
|   |--For example:
|   |    val distFile = sc.textFile("data.txt")
|   |--Once created, we can add up the sizes of all the lines using the map and reduce operations as follows:
|   |    distFile.map(s => s.length).reduce((a, b) => a + b)
|   |--The textFile method also takes an optional second argument for controlling the number of partitions of the file.
|   |--By default, Spark creates one partition for each block of the file,
|   |  but you can also ask for a higher number of partitions by passing a larger value.
|   |  Note that you cannot have fewer partitions than blocks.
|   |--Notes
|   |   |---If using a path on the local filesystem, the file must also be accessible at the same path on worker nodes.
|   |   |---Either copy the file to all workers or use a network-mounted shared file system.
|
|-wholeTextFiles RDDs (external datasets): SparkContext.wholeTextFiles
|   |--Lets you read a directory containing multiple small text files, and returns each of them as (filename, content) pairs.
|   |  This is in contrast with textFile, which would return one record per line in each file.
|   |--Partitioning is determined by data locality which, in some cases, may result in too few partitions.
|   |  For those cases, wholeTextFiles provides an optional second argument for controlling the minimal number of partitions.
|
|-sequenceFile RDDs (external datasets): SparkContext’s sequenceFile[K, V] method
|   |--K and V are the types of the keys and values in the file.
|   |--These should be subclasses of Hadoop’s Writable interface, like IntWritable and Text.
|   |  In addition, Spark allows you to specify native types for a few common Writables;
|   |  for example, sequenceFile[Int, String] will automatically read IntWritables and Texts.
|
|-SparkContext.hadoopRDD (other Hadoop InputFormats)
|   |--For other Hadoop InputFormats, you can use the SparkContext.hadoopRDD method,
|   |  which takes an arbitrary JobConf and input format class, key class and value class.
|   |  Set these the same way you would for a Hadoop job with your input source.
|   |--You can also use SparkContext.newAPIHadoopRDD for InputFormats based on the “new” MapReduce API (org.apache.hadoop.mapreduce).
|
|-RDD.saveAsObjectFile and SparkContext.objectFile
|   |--Support saving an RDD in a simple format consisting of serialized Java objects.
|   |  While this is not as efficient as specialized formats like Avro, it offers an easy way to save any RDD.
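The creation paths described above can be tried on a single machine with a sketch like the following. It assumes Spark is on the classpath and runs in local mode; the object name RddCreationSketch, the app name, the local[2] master and the temporary file contents are all illustrative choices, not part of the original notes.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}

object RddCreationSketch {
  /** Returns (partition count, sum, total line length, number of whole files). */
  def run(sc: SparkContext): (Int, Int, Int, Int) = {
    // 1. Parallelized collection, explicitly split into 4 partitions.
    val distData = sc.parallelize(Seq(1, 2, 3, 4, 5), 4)
    val partitions = distData.getNumPartitions
    val sum = distData.reduce((a, b) => a + b)

    // 2. External dataset: write a throwaway file, then read it line by line.
    val dir = Files.createTempDirectory("rdd-sketch")
    val file = dir.resolve("data.txt")
    Files.write(file, "spark\nrdd\n".getBytes(StandardCharsets.UTF_8))
    val distFile = sc.textFile(file.toUri.toString)
    val totalLength = distFile.map(s => s.length).reduce((a, b) => a + b)

    // 3. wholeTextFiles yields one (path, content) pair per file in the directory.
    val wholeFiles = sc.wholeTextFiles(dir.toUri.toString).collect()

    (partitions, sum, totalLength, wholeFiles.length)
  }

  def main(args: Array[String]): Unit = {
    // Local master and app name are assumptions for experimenting on one machine.
    val conf = new SparkConf().setAppName("rdd-creation-sketch").setMaster("local[2]")
    val sc = SparkContext.getOrCreate(conf)
    try println(run(sc)) finally sc.stop()
  }
}
```

On a real cluster the temporary local file would have to exist at the same path on every worker, which is why the sketch is limited to local mode.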

RDD Operations(Basic)

|-Two types of operations (lazy transformations and actions that force computation)
|   |--Transformations, which create a new dataset from an existing one.
|   |    map is a transformation that passes each dataset element through a function
|   |    and returns a new RDD representing the results.
|   |--Actions, which return a value to the driver program after running a computation on the dataset.
|   |    reduce is an action that aggregates all the elements of the RDD using some function
|   |    and returns the final result to the driver program
|   |    (although there is also a parallel reduceByKey that returns a distributed dataset).
|
|-Why transformations are lazy
|   |--Transformations are not computed until an action requires a result. This design enables Spark to run more efficiently.
|   |    For example, Spark can recognize that a dataset created through map will be used in a reduce
|   |    and return only the result of the reduce to the driver,
|   |    rather than the larger mapped dataset.
|
|-Demo
|   |--val lines = sc.textFile("data.txt")
|   |  //The dataset is not loaded in memory or otherwise acted on: lines is merely a pointer to the file.
|   |
|   |  val lineLengths = lines.map(s => s.length)
|   |  //lineLengths is not immediately computed, due to laziness.
|   |
|   |  val totalLength = lineLengths.reduce((a, b) => a + b)
|   |  //At this point Spark breaks the computation into tasks to run on separate machines.
|   |  //Each machine runs both its part of the map and a local reduction,
|   |  //returning only its answer to the driver program.
|   |
|   |--lineLengths.persist()
|   |  //Add this before the reduce if lineLengths will be used again later:
|   |  //it causes lineLengths to be saved in memory after the first time it is computed.
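Laziness and persist() can be observed with a small sketch that counts how often the map function runs. It assumes local mode with no task retries (accumulator updates made inside a transformation may be applied more than once if a task is re-executed); the names LazyEvalSketch and calls are hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object LazyEvalSketch {
  /** Returns (map calls before any action, total length, calls after 1st action, calls after 2nd action). */
  def run(sc: SparkContext): (Long, Int, Long, Long) = {
    val calls = sc.longAccumulator("map calls")
    val lines = sc.parallelize(Seq("a", "bb", "ccc"))

    // Transformation only: nothing is computed yet.
    val lineLengths = lines.map { s => calls.add(1L); s.length }
    // Mark for caching before the first action so later actions can reuse the result.
    lineLengths.persist()
    val before = calls.value.longValue

    // First action: triggers the map and fills the cache.
    val totalLength = lineLengths.reduce((a, b) => a + b)
    val afterFirst = calls.value.longValue

    // Second action: served from the cached partitions, so map is not re-run.
    lineLengths.count()
    val afterSecond = calls.value.longValue

    (before, totalLength, afterFirst, afterSecond)
  }

  def main(args: Array[String]): Unit = {
    // Local master and app name are assumptions for experimenting on one machine.
    val conf = new SparkConf().setAppName("lazy-eval-sketch").setMaster("local[2]")
    val sc = SparkContext.getOrCreate(conf)
    try println(run(sc)) finally sc.stop()
  }
}
```

Removing the persist() call should make the second action re-run the map, so the last counter would grow instead of staying equal to the previous one.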

RDD Operations(Passing Functions to Spark)

|-Two ways to pass functions to Spark
|   |--1. Anonymous function syntax: suited to short pieces of code.
|   |--2.1 Static methods in a global singleton object.
|   |      For example, you can define object MyFunctions and then pass MyFunctions.func1, as follows:
|   |   |---object MyFunctions {
|   |   |     def func1(s: String): String = { ... }
|   |   |   }
|   |   |
|   |   |   myRdd.map(MyFunctions.func1)
|   |--2.2 It is also possible to pass a reference to a method in a class instance (as opposed to a singleton object);
|   |      this requires sending the object that contains that method along with it.
|   |   |---class MyClass {
|   |   |     def func1(s: String): String = { ... }
|   |   |     def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
|   |   |   }
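A runnable variant of the two styles is sketched below. The function bodies (upper-casing, prefixing) are hypothetical stand-ins for the elided `{ ... }` bodies, and the prefix field and the doStuffLocal method are additions made for illustration. Because calling an instance method inside map ships the whole instance to the executors, the class is assumed to be Serializable here.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Style 2.1: a method on a global singleton object; no instance is shipped.
object MyFunctions {
  def func1(s: String): String = s.toUpperCase // hypothetical body
}

// Style 2.2: a method on a class instance; the instance travels with the task,
// so it must be serializable.
class MyClass(val prefix: String) extends Serializable {
  def func1(s: String): String = prefix + s // hypothetical body
  def doStuff(rdd: RDD[String]): RDD[String] = rdd.map(func1)

  // Copying the field into a local val keeps `this` out of the closure.
  def doStuffLocal(rdd: RDD[String]): RDD[String] = {
    val p = prefix
    rdd.map(s => p + s)
  }
}

object PassingFunctionsSketch {
  /** Returns the results of the singleton, instance-method and local-copy variants. */
  def run(sc: SparkContext): (List[String], List[String], List[String]) = {
    val rdd = sc.parallelize(Seq("a", "b"))
    val mine = new MyClass("x-")
    (rdd.map(MyFunctions.func1).collect().toList,
     mine.doStuff(rdd).collect().toList,
     mine.doStuffLocal(rdd).collect().toList)
  }

  def main(args: Array[String]): Unit = {
    // Local master and app name are assumptions for experimenting on one machine.
    val conf = new SparkConf().setAppName("passing-functions-sketch").setMaster("local[2]")
    val sc = SparkContext.getOrCreate(conf)
    try println(run(sc)) finally sc.stop()
  }
}
```

The local-copy variant is usually the safer choice when the enclosing class holds large or non-serializable state.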

RDD Operations(Understanding closures)

|-The closure is those variables and methods
|  which must be visible for the executor to perform its computations on the RDD.
|
|-For example
|   |--var counter = 0
|   |  var rdd = sc.parallelize(data)
|   |
|   |  //May not work as intended.
|   |  //To ensure well-defined behavior in these sorts of scenarios one should use an Accumulator.
|   |  rdd.foreach(x => counter += x)
|   |  println("Counter value: " + counter)
|
|-The executors only see the copy from the serialized closure.
|  Thus, the final value of counter will still be zero since all operations
|  on counter were referencing the value within the serialized closure.
|
|-In local mode, in some circumstances, the foreach function will actually execute within the same JVM as the driver
|  and will reference the same original counter, and may actually update it.
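The accumulator-based alternative mentioned in the example can be sketched as follows, using SparkContext.longAccumulator. The object name ClosureSketch and the input data are illustrative assumptions; the captured counter is kept only for contrast, and its final value is deliberately not relied upon.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ClosureSketch {
  /** Returns the sum gathered through an accumulator. */
  def run(sc: SparkContext): Long = {
    val data = Seq(1, 2, 3, 4, 5)
    val rdd = sc.parallelize(data)

    // Unreliable: each task updates its own deserialized copy of counter,
    // so the driver-side value is not well defined.
    var counter = 0
    rdd.foreach(x => counter += x)
    println("Counter value (not well defined): " + counter)

    // Well defined: executors add to the accumulator, the driver reads the merged value.
    val sum = sc.longAccumulator("sum")
    rdd.foreach(x => sum.add(x))
    sum.value.longValue
  }

  def main(args: Array[String]): Unit = {
    // Local master and app name are assumptions for experimenting on one machine.
    val conf = new SparkConf().setAppName("closure-sketch").setMaster("local[2]")
    val sc = SparkContext.getOrCreate(conf)
    try println("Accumulator value: " + run(sc)) finally sc.stop()
  }
}
```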

Printing elements of an RDD

|-Analysis of rdd.foreach(println) and rdd.map(println)
|   |--On a single machine, this will generate the expected output and print all the RDD’s elements.
|   |--In cluster mode, the executors write to their own stdout, not the one on the driver,
|   |  so stdout on the driver won’t show these elements.
|   |    //To print all elements on the driver, first bring the RDD to the driver node with collect():
|   |    rdd.collect().foreach(println)
|   |--This can cause the driver to run out of memory, though,
|   |  because collect() fetches the entire RDD to a single machine.
|   |    //If you only need to print a few elements of the RDD, a safer approach is to use take():
|   |    rdd.take(100).foreach(println)
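As a minimal sketch of the take() approach, the following fetches only the first few elements to the driver before printing them. The object name PrintSketch, the range 1 to 1000 and the partition count are assumptions chosen for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object PrintSketch {
  /** Fetches only the first n elements to the driver instead of the whole RDD. */
  def firstElements(sc: SparkContext, n: Int): Array[Int] = {
    val rdd = sc.parallelize(1 to 1000, 8)
    rdd.take(n)
  }

  def main(args: Array[String]): Unit = {
    // Local master and app name are assumptions for experimenting on one machine.
    val conf = new SparkConf().setAppName("print-sketch").setMaster("local[2]")
    val sc = SparkContext.getOrCreate(conf)
    // Printing happens on the driver, so the output appears in the driver's stdout.
    try firstElements(sc, 5).foreach(println) finally sc.stop()
  }
}
```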

"shuffle"(Working with Key-Value Pairs)

|-A few special operations are only available on RDDs of key-value pairs.
|  The most common ones are distributed "shuffle" operations,
|  such as grouping or aggregating the elements by a key.
|
|-The key-value pair operations are available in the PairRDDFunctions class, which automatically wraps around an RDD of tuples.
|   |--val lines = sc.textFile("data.txt")
|   |  val pairs = lines.map(s => (s, 1))
|   |  //Count how many times each line of text occurs.
|   |  val counts = pairs.reduceByKey((a, b) => a + b)
|   |  //Sort the pairs alphabetically. sortByKey returns a new RDD, so keep the result.
|   |  val sorted = counts.sortByKey()
|   |  //Bring them back to the driver program as an array of objects.
|   |  sorted.collect()
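The counting pipeline can be tried end to end with a sketch like the one below. It swaps the data.txt file for an in-memory sequence of lines so that it runs without any external file; the object name PairSketch and the sample input are assumptions.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object PairSketch {
  /** Counts how often each line occurs and returns the pairs sorted by key. */
  def lineCounts(sc: SparkContext, lines: Seq[String]): Array[(String, Int)] =
    sc.parallelize(lines)
      .map(s => (s, 1))                // build (line, 1) pairs
      .reduceByKey((a, b) => a + b)    // shuffle: sum the counts per key
      .sortByKey()                     // shuffle: order the keys alphabetically
      .collect()                       // bring the result back to the driver

  def main(args: Array[String]): Unit = {
    // Local master and app name are assumptions for experimenting on one machine.
    val conf = new SparkConf().setAppName("pair-sketch").setMaster("local[2]")
    val sc = SparkContext.getOrCreate(conf)
    try lineCounts(sc, Seq("b", "a", "b")).foreach(println) finally sc.stop()
  }
}
```

Chaining sortByKey() before collect() matters because sortByKey, like every transformation, returns a new RDD rather than sorting the existing one in place.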