Spark算子总结

mac2026-06-04  5

参考: https://blog.csdn.net/fortuna_i/article/details/81170565 https://www.cnblogs.com/alexzhang92/p/10750740.html

Transformation

1.map

map的输入变换函数应用于RDD中所有元素,而mapPartitions应用于所有分区。区别于mapPartitions主要在于调用粒度不同。如parallelize(1 to 10, 3),map函数执行10次,而mapPartitions函数执行3次。

val data = sc.parallelize(1 to 10,3) data.map(x=>x+1)

2.mapPartitions(function)

区于foreachPartition(属于Action,且无返回值),而mapPartitions可获取返回值。与map的区别前面已经提到过了,但由于单独运行于RDD的每个分区上(block),所以在一个类型为T的RDD上运行时,(function)必须是Iterator => Iterator类型的方法(入参)。

val data = sc.parallelize(1 to 10,3) def function(it:Iterator[Int]):Iterator[Int]={ for(e <- it) yield e * 2 } val res = data.mapPartitions(function) res.collect 结果:Array(2,4,6...)

3.filter(function)

过滤操作,满足filter内function函数为true的RDD内所有元素组成一个新的数据集。如:filter(a == 1)。

val res = data.filter(_%2==0) res.collect

filterByRange

val rdd1 = sc.parallelize(List(("e", 5), ("c", 3), ("d", 4), ("c", 2), ("a", 1),("b",6))) val rdd2 = rdd1.filterByRange("b","d") rdd2.collect Array[(String, Int)] = Array((c,3), (d,4), (c,2), (b,6))

4.flatMap(function)

flatMap与map区别在于map为“映射”,而flatMap“先映射,后扁平化”,map对每一次(func)都产生一个元素,返回一个对象,而flatMap多一步就是将所有对象合并为一个对象。

data.flatMap(_ to 10).collect

flatMapValues

对values进行处理,类似flatMap,会将key和每一个分出来的value组成映射

val rdd3 = sc.parallelize(List(("a", "1 2"), ("b", "3 4"))) val rdd4 = rdd3.faltMapValues(_.split(" ")).collect rdd4.collect 结果:Array[(String, String)] = Array((a,1), (a,2), (b,3), (b,4))

5.mapPartitionsWithIndex(function)

与mapPartitions类似,但需要提供一个表示分区索引值的整型值作为参数,因此function必须是(int, Iterator)=>Iterator类型的。

def func(index:Int,it:Iterator[Int]):Iterator[String]{ it.toList.map(x=>"[partId:"+index+",value:"+x+"]").iterator } data.mapPartitionsWithIndex(func)

6.sample(withReplacement, fraction, seed)

采样操作,用于从样本中取出部分数据。withReplacement是否放回,fraction采样比例,seed用于指定的随机数生成器的种子。(是否放回抽样分true和false,fraction取样比例为(0, 1]。seed种子为整型实数。)

val res = data.sample(false,0.5,1) res.collect 结果:Array(1,5,8,9,10)

7.union(otherDataSet)

对于源数据集和其他数据集求并集,不去重。

data.union(Array(1,2,3,4,5))

8.intersection(otherDataSet)

对于源数据集和其他数据集求交集,并去重,且无序返回。

val res = data.intersection(Array(1,2,3,4,5)) res.collect 结果是无序的

9.distinct([numTasks])

返回一个在源数据集去重之后的新数据集,即去重,并局部无序而整体有序返回。

https://blog.csdn.net/Fortuna_i/article/details/81506936

data有三个分区 data.distinct.collect 结果:Array(6,3,9,4,1,7,10,8,5,2)

10.groupByKey([numTasks])

在一个PairRDD或(k,v)RDD上调用,返回一个(k,Iterable)。主要作用是将相同的所有的键值对分组到一个集合序列当中,其顺序是不确定的。groupByKey是把所有的键值对集合都加载到内存中存储计算,若一个键对应值太多,则易导致内存溢出。

在此,用之前求并集的union方法,将pair1,pair2变为有相同键值的pair3,而后进行groupByKey

val pair = sc.parallelize(Array((1,2),(1,3),(2,3),(2,2),(3,1))) pari.groupByKey.collect 结果:Array[(Int,Iterator[Int])]=Array((1,CompactBuffer(2,3)), (2,CompactBuffer(3,2)),(3,CompactBuffer(1)))

11.reduceByKey(function,[numTasks])

​ 与groupByKey类似,却有不同。如(a,1), (a,2), (b,1), (b,2)。

​ groupByKey产生中间结果为( (a,1), (a,2) ), ( (b,1), (b,2) )。而reduceByKey为(a,3), (b,3)。

​ reduceByKey主要作用是聚合,groupByKey主要作用是分组。(function对于key值来进行聚合)

foldByKey

​ 和reduceByKey类似,只不过赋一个初始值rdd1.foldByKey("")(+)

data.reduceByKey((v1,v2)=>v1+v2) data.reduceByKey(_+_) 结果形式:Array((1,3),(2,6))

combineByKey

和reduceByKey的效果相同,reduceByKey底层就是调用combineByKey

rdd6 = Array((1,dog), (1,cat), (2,gnu), (2,salmon), (2,rabbit), (1,turkey), (2,wolf), (2,bear), (2,bee)) val rdd7 = rdd6.combineByKey(List(_),(x:List[String],y:String)=>x :+ y,(m:List[String],n:List[String])=>m++n) List(_)也可以这么写,_::List();_::Nil res7: Array[(Int, List[String])] = Array((1,List(dog, cat, turkey)), (2,List(wolf, bear, bee, salmon, rabbit, gnu)))

12.aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])

类似reduceByKey,对pairRDD中想用的key值进行聚合操作,使用初始值(seqOp中使用,而combOpenCL中未使用)对应返回值为pairRDD,而区于aggregate(返回值为非RDD)

第一个参数是对每个分区 每个key产生一个新的(k,v),相同分区下,相同key只会产生一次 val pair = sc.parallelize(Array((1,2),(1,3),(2,3),(2,2),(3,1),(3,2)),3) val pairRdd = pair.aggregateByKey(0)(math.max(_,_),_+_) 分区情况 ArrayBuffer(0:(1,2), 0:(1,3), 1:(2,3), 1:(2,2), 2:(3,1), 2:(3,2)) 结果: (3,2) (1,3) (2,3) ---------------------------------------- val pairRdd = pair.aggregateByKey(3)(math.max(_,_),_+_) 结果: (3,3) (1,3) (2,3) ------------------------------------------ val pair = sc.parallelize(Array((1,2),(1,3),(2,3),(2,2),(3,1),(3,2)),2) ArrayBuffer(0:(1,2), 0:(1,3), 0:(2,3), 1:(2,2), 1:(3,1), 1:(3,2)) val pairRdd = pair.aggregateByKey(3)(math.max(_,_),_+_) 结果: (2,6) (1,3) (3,3) --------------------------------------------- val pair = sc.parallelize(Array((1,2),(2,3),(1,3),(2,2),(3,1),(3,2)),3) ArrayBuffer(0:(1,2), 0:(2,3), 1:(1,3), 1:(2,2), 2:(3,1), 2:(3,2)) val pairRdd = pair.aggregateByKey(3)(math.max(_,_),_+_) (3,3) (1,6) (2,6)

aggregate

聚合函数,是一个action

val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9),2) //两个分区内数据分别进行相加,然后两个的分区相加后的结果再相加得出最后的结果 rdd1.aggregate(0)(_+_,_+_) 结果:45 //先对每个分区求最大值,然后每个分区求出的最大值再相加 rdd1.aggregate(0)(math.max(_,_),_+_) 结果:13 4+9 //这里需要注意,初始值是每次都要参与运算的,例如下面的代码:分区1是1,2,3,4;初始值为5,则他们比较最大值就是5,分区2是5,6,7,8,9;初始值为5,则他们比较结果最大值就是9;然后再相加,这里初始值也要参与运算,5+(5+9)=19 scala> rdd1.aggregate(5)(math.max(_,_),_+_) 结果: Int = 19 val rdd2 = sc.parallelize(List("a","b","c","d","e","f"),2) //这里需要注意,由于每个分区计算是并行计算,所以计算出的结果有先后顺序,所以结果会出现两种情况:如下 scala> rdd2.aggregate("")(_+_,_+_) res0: String = defabc scala> rdd2.aggregate("")(_+_,_+_) res2: String = abcdef //这里的例子更能说明上面提到的初始值参与计算的问题,我们可以看到初始值=号参与了三次计算 scala> rdd2.aggregate("=")(_+_,_+_) res0: String = ==def=abc scala> val rdd3 = sc.parallelize(List("12","23","345","4567"),2) scala> rdd3.aggregate("")((x,y)=>math.max(x.length,y.length).toString,_+_) res1: String = 42 scala> rdd3.aggregate("")((x,y)=>math.max(x.length,y.length).toString,_+_) res3: String = 24

13.sortByKey([ascending], [numTasks])

同样是基于pairRDD的,根据key值来进行排序。ascending升序,默认为true,即升序

data.sortByKey(true,2)

14. join、leftOuterJoin、rightOuterJoin

val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 2), ("kitty", 3))) val rdd2 = sc.parallelize(List(("jerry", 9), ("tom", 8), ("shuke", 7))) ------------------------------------------------------------------ val rdd3 = rdd1.join(rdd2).collect rdd3: Array[(String, (Int, Int))] = Array((tom,(1,8)), (jerry,(2,9))) ------------------------------------------------------------------ val rdd3 = rdd1.leftOuterJoin(rdd2).collect rdd3: Array[(String, (Int, Option[Int]))] = Array((tom,(1,Some(8))), (jerry,(2,Some(9))), (kitty,(3,None))) ------------------------------------------------------------------ val rdd3 = rdd1.rightOuterJoin(rdd2).collect rdd3: Array[(String, (Option[Int], Int))] = Array((tom,(Some(1),8)), (jerry,(Some(2),9)), (shuke,(None,7)))

15.cogroup(otherDataSet,[numTasks])

合并两个RDD,生成一个新的RDD。实例中包含两个Iterable值,第一个表示RDD1中相同值,第二个表示RDD2中相同值(key值),这个操作需要通过partitioner进行重新分区,因此需要执行一次shuffle操作。(若两个RDD在此之前进行过shuffle,则不需要)

val pair = sc.parallelize(Array((3,1),(3,3),(2,2),(2,6),(1,1),(1,2))) val pair1 = sc.parallelize(Array((3,2),(3,3),(2,3),(2,3),(1,1),(1,2))) pair1.cogroup(pair2).collect 结果: (3,(CompactBuffer(1, 3),CompactBuffer(2, 3))) (1,(CompactBuffer(1, 2),CompactBuffer(1, 2))) (2,(CompactBuffer(2, 6),CompactBuffer(3, 3)))

16.cartesian(otherDataSet)

求笛卡尔乘积。该操作不会执行shuffle操作。

pair.cartesian(pair1).collect ((3,1),(3,2)) ((3,1),(3,3)) ((3,1),(2,3)) ((3,3),(3,2)) ((3,3),(3,3)) ((3,3),(2,3)) ((2,2),(3,2)) ((2,2),(3,3)) ((2,2),(2,3)) ((3,1),(2,3)) ((3,1),(1,1)) ((3,1),(1,2)) ((3,3),(2,3)) ((3,3),(1,1)) ((3,3),(1,2)) ((2,2),(2,3)) ((2,2),(1,1)) ((2,2),(1,2)) ((2,6),(3,2)) ((2,6),(3,3)) ((2,6),(2,3)) ((1,1),(3,2)) ((1,1),(3,3)) ((1,1),(2,3)) ((1,2),(3,2)) ((1,2),(3,3)) ((1,2),(2,3)) ((2,6),(2,3)) ((2,6),(1,1)) ((2,6),(1,2)) ((1,1),(2,3)) ((1,1),(1,1)) ((1,1),(1,2)) ((1,2),(2,3)) ((1,2),(1,1)) ((1,2),(1,2))

17.pipe(command,[envVars])

通过一个shell命令来对RDD各分区进行“管道化”。通过pipe变换将一些shell命令用于Spark中生成的新RDD

val rdd = sc.parallelize(0 to 7,3) rdd.glom.collect.foreach(x=>{println(x.toBuffer)}) rdd.glom.collect 结果: ArrayBuffer(0, 1) ArrayBuffer(2, 3, 4) ArrayBuffer(5, 6, 7) ------------------- !!!报错 rdd.pipe("head -n 1").collect 提取每一个分区中的第一个元素构成新的RDD

18.coalesce(numPartitions)

重新分区,减少RDD中分区的数量到numPartitions。

当适当缩小分区数时,如1000->100,spark会把之前的10个分区当作一个分区,并行度变为100,不会引起数据shuffle。

​ 当严重缩小分区数时,如1000->1,运算时的并行度会变成1。为了避免并行效率低下问题,可将shuffle设为true。shuffle之前的运算和之后的运算分为不同stage,它们的并行度分别为1000,1。

当把分区数增大时,必会存在shuffle,shuffle须设为true。

data.coalesce(2,true)

19.repartition(numPartitions)

repartition是coalesce接口中shuffle为true的简易实现,即Reshuffle RDD并随机分区,使各分区数据量尽可能平衡。若分区之后分区数远大于原分区数,则需要shuffle。

data.repartition(5)

partitionBy(partitioner: Partitioner): RDD[(K, V)] 按照传入的参数进行分区,传入的参数为分区的实例对象,可以传入之定义分区的实例或者默认的HashPartitioner;源码如下:

20.repartitionAndSortWithinPartitions(partitioner)

该方法根据partitioner对RDD进行分区,并且在每个结果分区中按key进行排序。

21.keys、values

取出rdd的key或者value,生成新的RDD

val rdd1 = sc.parallelize(List(("e", 5), ("c", 3), ("d", 4), ("c", 2), ("a", 1))) rdd1.keys.collect Array[String] = Array(e, c, d, c, a) scala> rdd1.values.collect Array[Int] = Array(5, 3, 4, 2, 1)

22.keyBy

以传入的参数作为key,生成新的RDD

val rdd1 = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3) val rdd2 = rdd1.keyBy(_.length) rdd2.collect 结果: Array((3,dog), (6,salmon), (6,salmon), (3,rat), (8,elephant))

Action

1.reduce(function)

reduce将RDD中元素两两传递给输入函数,同时产生一个新值,新值与RDD中下一个元素再被传递给输入函数,直到最后只有一个值为止。

val data = sc.parallelize(1 to 10,3) data.reduce(_+_) data.reduce((x,y)=>x+y)

2.collect()

将一个RDD以一个Array数组形式返回其中的所有元素。

rdd.collect()

collectAsMap

将RDD转换成Map(注意RDD的数据应为对偶元组)

val rdd1 = sc.parallelize(List(("a", 1), ("b", 2),("c", 2),("d", 4),("e", 1))) rdd1.collectAsMap scala.collection.Map[String,Int] = Map(e -> 1, b -> 2, d -> 4, a -> 1, c -> 2)

3.count()

返回数据集中元素个数,默认Long类型。

countByKey、countByValue:

按照key或者value计算出现的次数

val rdd1 = sc.parallelize(List(("a", 1), ("b", 2), ("b", 2), ("c", 2), ("c", 1))) rdd1.countByKey 结果:Map[String,Int]=Map(a->1,b->2,c->2) rdd1.countByValue 结果:Map[(String, Int),Long] = Map((c,2) -> 1, (a,1) -> 1, (b,2) -> 2, (c,1) -> 1)

4.first()

返回数据集的第一个元素(类似于take(1))

5.takeSample(withReplacement, num, [seed])

对于一个数据集进行随机抽样,返回一个包含num个随机抽样元素的数组,withReplacement表示是否有放回抽样,参数seed指定生成随机数的种子。

该方法仅在预期结果数组很小的情况下使用,因为所有数据都被加载到driver端的内存中。

data.takeSample(true,5,1) Array[Int] = Array(8,5,7,3,6)

6.take(n)

返回一个包含数据集前n个元素的数组(从0下标到n-1下标的元素),不排序。

7.takeOrdered(n,[ordering])

返回RDD中前n个元素,并按默认顺序排序(升序)或者按自定义比较器顺序排序。

data.takeOrdered(3) //从小到大排序,取三个

8.saveAsTextFile(path)

将dataSet中元素以文本文件的形式写入本地文件系统或者HDFS等。Spark将对每个元素调用toString方法,将数据元素转换为文本文件中的一行记录。

若将文件保存到本地文件系统,那么只会保存在executor所在机器的本地目录。

9.saveAsSequenceFile(path)(Java and Scala)

将dataSet中元素以Hadoop SequenceFile的形式写入本地文件系统或者HDFS等。(对pairRDD操作)

10.saveAsObjectFile(path)(Java and Scala)

将数据集中元素以ObjectFile形式写入本地文件系统或者HDFS等。

11.countByKey()

用于统计RDD[K,V]中每个K的数量,返回具有每个key的计数的(k,int)pairs的hashMap。

pair.countByKey Map[Int,Long]=Map(5->1,3->2)

12.foreach(function)

对数据集中每一个元素运行函数function。

最新回复(0)