Spark Notes (Part 2): Operators in Detail


Operators

Before learning Spark, it is strongly recommended to learn Scala first. If you already know Java, Scala is not hard to pick up; two or three days with a tutorial is enough for the basics, and it will make learning Spark much easier. Keep in mind that Scala and Java still differ in quite a few ways syntactically.

map: applies a computation or a format conversion to every element, one record at a time.

// Perform a computation
def mapTest = {
  val conf = new SparkConf().setMaster("local[*]").setAppName("hello")
  val sc = new SparkContext(conf)
  val mapRdd: RDD[Int] = sc.makeRDD(1 to 5).map(_ * 2)
  mapRdd.collect().foreach(println)
  sc.stop
}

Result:
2
4
6
8
10

Changing the format

def mapTest2 = {
  val conf = new SparkConf().setMaster("local[*]").setAppName("hello")
  val sc = new SparkContext(conf)
  val mapRdd: RDD[(Int, String)] = sc.makeRDD(1 to 5).map((_, "heool"))
  mapRdd.collect().foreach(println)
  val mapRdd2 = mapRdd.map(_._2)
  mapRdd2.collect().foreach(println)
  sc.stop
}

mapRdd result:
(1,heool)
(2,heool)
(3,heool)
(4,heool)
(5,heool)

mapRdd2 result:
heool
heool
heool
heool
heool

 

mapPartitions: similar to map, but runs independently on each partition of the RDD.

mapPartitions can outperform map because the function is sent to and invoked on the executors once per partition instead of once per element, but processing a whole partition at a time may cause out-of-memory errors.

def mapPartitionsTest = {
  val conf = new SparkConf().setMaster("local[*]").setAppName("hello")
  val sc = new SparkContext(conf)
  val mapRdd = sc.makeRDD(1 to 5).mapPartitions(datas => datas.map((_, "heool")))
  mapRdd.collect().foreach(println)
  val mapRdd2 = mapRdd.mapPartitions(datas => datas.map(_._2))
  mapRdd2.collect().foreach(println)
  sc.stop
}
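A common reason to prefer mapPartitions is per-partition setup cost. A minimal sketch of that idea (the StringBuilder here is just a stand-in for an expensive resource such as a database connection; the method name is made up for illustration):

def mapPartitionsSetupTest = {
  val conf = new SparkConf().setMaster("local[*]").setAppName("hello")
  val sc = new SparkContext(conf)
  val rdd = sc.makeRDD(1 to 100, 4)
  // With map, the "resource" below would be created once per element;
  // with mapPartitions it is created only once per partition.
  val result = rdd.mapPartitions { iter =>
    val resource = new StringBuilder("prefix-") // stand-in for e.g. a database connection
    iter.map(x => resource.toString + x)
  }
  result.collect().foreach(println)
  sc.stop
}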

 

mapPartitionsWithIndex: like mapPartitions, but also provides the partition index.

def mapPartitionsWithIndexTest = {
  val conf = new SparkConf().setMaster("local[*]").setAppName("hello")
  val sc = new SparkContext(conf)
  val mapRdd = sc.makeRDD(1 to 5).mapPartitionsWithIndex {
    case (num, datas) => datas.map((_, "partition: " + num))
  }
  mapRdd.collect().foreach(println)
  sc.stop
}

Result:
(1,partition: 0)
(2,partition: 1)
(3,partition: 2)
(4,partition: 3)
(5,partition: 3)

 

Driver & Executor

Driver: the program that creates the SparkContext.

Executor: receives tasks and runs them; all operator computation is executed on the executors. If a class is referenced inside the computation, it must be serializable, because objects captured by the closure are shipped over the network.
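A minimal sketch of the serialization point (the class name Search and its method are made up for illustration, assuming the same imports as the other examples):

// Hypothetical example: without "extends Serializable" the filter below fails at runtime
// with a "Task not serializable" exception, because the closure captures `this.query`
// and the whole object has to be shipped to the executors.
class Search(query: String) extends Serializable {
  def matchingLines(rdd: RDD[String]): RDD[String] =
    rdd.filter(line => line.contains(query))
}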

 

flatMap: applies a map and then flattens the result by one level (for example, a two-dimensional array becomes a one-dimensional one).

def flatMapTest = {
  val conf = new SparkConf().setMaster("local[*]").setAppName("hello")
  val sc = new SparkContext(conf)
  val mapRdd = sc.makeRDD(Array("hello world", "Mary", "Jack", "Kitty")).flatMap(_.split(" "))
  mapRdd.collect().foreach(println)
  sc.stop
}

Result:
hello
world
Mary
Jack
Kitty

 

glom: turns each partition into an array, producing a new RDD of type RDD[Array[T]].

def glomTest = {
  val conf = new SparkConf().setMaster("local[*]").setAppName("hello")
  val sc = new SparkContext(conf)
  val mapRdd: RDD[Array[Int]] = sc.makeRDD(1 to 18, 4).glom()
  mapRdd.collect().foreach(datas => println(datas.mkString(",")))
  sc.stop
}

Result:
1,2,3,4
5,6,7,8,9
10,11,12,13
14,15,16,17,18

 

groupBy: groups the elements according to the supplied function.

def groupByTest = {
  val conf = new SparkConf().setMaster("local[*]").setAppName("hello")
  val sc = new SparkContext(conf)
  val mapRdd: RDD[(Int, Iterable[Int])] = sc.makeRDD(1 to 18).groupBy(data => data % 4)
  mapRdd.collect().foreach(datas => println(" ======== " + datas._1 + " " + datas._2.mkString(",")))
  sc.stop
}

Result:
======== 0 4,8,12,16
======== 1 1,5,9,13,17
======== 2 2,6,10,14,18
======== 3 3,7,11,15

 

filter: filters the elements with the supplied predicate and returns a new RDD.

def filterTest = {
  val conf = new SparkConf().setMaster("local[*]").setAppName("hello")
  val sc = new SparkContext(conf)
  val mapRdd = sc.makeRDD(1 to 18).filter(data => data % 4 != 0)
  mapRdd.collect().foreach(data => println(" ======== " + data))
  sc.stop
}

Result (one value printed per line): 1, 2, 3, 5, 6, 7, 9, 10, 11, 13, 14, 15, 17, 18

 

sample(withReplacement, fraction, seed): randomly samples the data using the given random seed; fraction is the expected fraction of elements to sample; withReplacement controls whether sampling is done with replacement (true) or without (false); seed is the seed for the random-number generator. Useful for sampling when the dataset is large.

def sampleTest = {
  val conf = new SparkConf().setMaster("local[*]").setAppName("hello")
  val sc = new SparkContext(conf)
  val mapRdd: RDD[Int] = sc.makeRDD(1 to 10).sample(false, 0.4, 1)
  // Prefer a varying seed: with a fixed seed (as in Java) the "random" sample is always the same
  // val mapRdd: RDD[Int] = sc.makeRDD(1 to 10).sample(false, 0.4, System.currentTimeMillis())
  println(" ============ " + mapRdd.collect().mkString(","))
  sc.stop
}

Result:
============ 3,5,6,8

 

distinct: removes duplicates. The data is shuffled and redistributed across partitions. Because the result has to wait for every partition's data to be read, operators with a shuffle are slower; without a shuffle they are faster.

def distinctTest = {
  val conf = new SparkConf().setMaster("local[*]").setAppName("hello")
  val sc = new SparkContext(conf)
  val mapRdd: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 5, 3, 1, 6)).distinct()
  // val mapRdd: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 5, 3, 1, 6)).distinct(2)  // store into 2 partitions
  println(" ============ " + mapRdd.collect().mkString(","))
  sc.stop
}

Result:
============ 1,5,6,2,3

 

coalesce: reduces the number of partitions; useful after filtering a large dataset to improve efficiency on the smaller result. The partitions being removed are merged into the remaining ones, so no shuffle is required.

def coalesceTest = {
  val conf = new SparkConf().setMaster("local[*]").setAppName("hello")
  val sc = new SparkContext(conf)
  val mapRdd: RDD[Int] = sc.makeRDD(1 to 16, 4).coalesce(3)
  println(" ============ " + mapRdd.partitions.length)
  sc.stop
}

Result:
============ 3

 

repartition:

def repartitionTest = {
  val conf = new SparkConf().setMaster("local[*]").setAppName("hello")
  val sc = new SparkContext(conf)
  val mapRdd: RDD[Int] = sc.makeRDD(1 to 16, 4)
  val mapRdd2 = mapRdd.repartition(3)
  println(" ============ " + mapRdd.partitions.length)
  println(" ============ " + mapRdd2.partitions.length)
  sc.stop
}

Result (saving the RDDs with saveAsTextFile makes the effect even clearer):
============ 4
============ 3

 

repartition: performs a shuffle and redistributes the records (simply merging the data of two partitions into one, as coalesce does, does not count as a shuffle).

Comparison of coalesce and repartition:

coalesce: repartitions the data, with an optional shuffle controlled by the shuffle: Boolean parameter (true/false).

repartition: internally calls coalesce with shuffle enabled by default.

def repartitionTest2 = {
  val conf = new SparkConf().setMaster("local[*]").setAppName("hello")
  val sc = new SparkContext(conf)
  val mapRdd: RDD[Int] = sc.makeRDD(1 to 16, 4)
  val mapRdd2 = mapRdd.repartition(3)
  mapRdd.glom().collect().foreach(data => println(data.mkString(",")))
  mapRdd2.glom().collect().foreach(data => println(data.mkString(",")))
  sc.stop
}

Result:
mapRdd partitions:
1,2,3,4
5,6,7,8
9,10,11,12
13,14,15,16
mapRdd2 partitions:
3,7,10,13,16
1,4,5,8,11,14
2,6,9,12,15
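To make the relationship explicit, a small sketch using coalesce's shuffle flag (the method name is made up; same setup as the other examples):

def coalesceVsRepartitionTest = {
  val conf = new SparkConf().setMaster("local[*]").setAppName("hello")
  val sc = new SparkContext(conf)
  val rdd = sc.makeRDD(1 to 16, 4)
  val narrowed = rdd.coalesce(3)                  // no shuffle: whole partitions are merged
  val shuffled = rdd.coalesce(3, shuffle = true)  // with shuffle: equivalent to rdd.repartition(3)
  println(narrowed.partitions.length + " / " + shuffled.partitions.length)  // 3 / 3
  sc.stop
}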

 

sortBy: sorts the RDD, ascending by default; pass false for descending order.

def sortByTest = {
  val conf = new SparkConf().setMaster("local[*]").setAppName("hello")
  val sc = new SparkContext(conf)
  // sc.makeRDD(Array(3, 2, 1, 6, 7, 5)).sortBy(t => t, false).collect().mkString(",")  // descending
  println(" ===== " + sc.makeRDD(Array(3, 2, 1, 6, 7, 5)).sortBy(t => t).collect().mkString(","))
}

 

union: returns the union of the source RDD and the argument RDD as a new RDD (duplicates are kept).

def unionTest = {
  val conf = new SparkConf().setMaster("local[*]").setAppName("hello")
  val sc = new SparkContext(conf)
  val mapRdd1 = sc.makeRDD(1 to 5)
  val mapRdd2 = sc.makeRDD(3 to 7)
  println(" ===== " + mapRdd1.union(mapRdd2).collect().mkString(","))
}

Result:
mapRdd1: 1,2,3,4,5
mapRdd2: 3,4,5,6,7
===== 1,2,3,4,5,3,4,5,6,7

 

subtract: computes the difference between two RDDs (the elements of the source RDD that do not appear in the argument RDD).

def subtractTest = {
  val conf = new SparkConf().setMaster("local[*]").setAppName("hello")
  val sc = new SparkContext(conf)
  val mapRdd1 = sc.makeRDD(1 to 5)
  val mapRdd2 = sc.makeRDD(3 to 7)
  println(" ===== " + mapRdd1.subtract(mapRdd2).collect().mkString(","))
}

Result:
mapRdd1: 1,2,3,4,5
mapRdd2: 3,4,5,6,7
===== 1,2

 

intersection: returns the intersection of the two RDDs.

def intersectionTest = {
  val conf = new SparkConf().setMaster("local[*]").setAppName("hello")
  val sc = new SparkContext(conf)
  val mapRdd1 = sc.makeRDD(1 to 5)
  val mapRdd2 = sc.makeRDD(3 to 7)
  println(" ===== " + mapRdd1.intersection(mapRdd2).collect().mkString(","))
}

Result:
mapRdd1: 1,2,3,4,5
mapRdd2: 3,4,5,6,7
===== 4,5,3

 

cartesian: Cartesian product (avoid it where possible; the result size is the product of the two input sizes).

def cartesianTest = {
  val conf = new SparkConf().setMaster("local[*]").setAppName("hello")
  val sc = new SparkContext(conf)
  val mapRdd1 = sc.makeRDD(1 to 5)
  val mapRdd2 = sc.makeRDD(3 to 7)
  println(" ===== " + mapRdd1.cartesian(mapRdd2).collect().mkString(","))
}

Result:
mapRdd1: 1,2,3,4,5
mapRdd2: 3,4,5,6,7
===== (1,3),(1,4),(1,5),(1,6),(1,7),(2,3),(2,4),(2,5),(2,6),(2,7),(3,3),(3,4),(3,5),(3,6),(3,7),(4,3),(5,3),(4,4),(5,4),(4,5),(5,5),(4,6),(4,7),(5,6),(5,7)

 

zip: combines two RDDs into an RDD of key/value pairs; both RDDs must have the same number of partitions and the same number of elements per partition, otherwise an exception is thrown.

def zipTest = {
  val conf = new SparkConf().setMaster("local[*]").setAppName("hello")
  val sc = new SparkContext(conf)
  val mapRdd1 = sc.makeRDD(1 to 5)
  val mapRdd2 = sc.makeRDD(5 to 9)
  println(" ===== " + mapRdd1.zip(mapRdd2).collect().mkString(","))
  sc.stop
}

Result:
mapRdd1: 1,2,3,4,5
mapRdd2: 5,6,7,8,9
(1,5),(2,6),(3,7),(4,8),(5,9)

 

partitionBy: repartitions a pair RDD. If the RDD's existing partitioner is the same as the requested one, nothing happens; otherwise a ShuffledRDD is produced, i.e. a shuffle takes place.

def partitionByTest = {
  val conf = new SparkConf().setMaster("local[*]").setAppName("hello")
  val sc = new SparkContext(conf)
  val mapRdd1 = sc.makeRDD(Array((1, "a"), (2, "b"), (3, "c")))
  mapRdd1.partitionBy(new HashPartitioner(2)).glom().collect()
    .foreach(data => println(" ===== " + data.mkString(",")))
}

Result:
===== (2,b)
===== (1,a),(3,c)
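Besides HashPartitioner, partitionBy accepts any custom Partitioner. A minimal sketch (the class MyPartitioner and the method name are made up for illustration):

import org.apache.spark.Partitioner

// Hypothetical partitioner: key 1 goes to partition 0, every other key to partition 1
class MyPartitioner extends Partitioner {
  override def numPartitions: Int = 2
  override def getPartition(key: Any): Int = if (key == 1) 0 else 1
}

def customPartitionerTest = {
  val conf = new SparkConf().setMaster("local[*]").setAppName("hello")
  val sc = new SparkContext(conf)
  val pairRdd = sc.makeRDD(Array((1, "a"), (2, "b"), (3, "c")))
  pairRdd.partitionBy(new MyPartitioner).glom().collect().foreach(a => println(a.mkString(",")))
  sc.stop
}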

 

groupByKey: groups the values of each key.

def groupByKeyTest = {
  val conf = new SparkConf().setMaster("local[*]").setAppName("hello")
  val sc = new SparkContext(conf)
  val mapRdd1 = sc.makeRDD(Array((1, "a"), (2, "b"), (3, "c"), (1, "d")))
  mapRdd1.groupByKey().collect()
    .foreach(data => println(" ===== key: " + data._1 + " value: " + data._2.mkString(",")))
}

Result:
===== key: 1 value: a,d
===== key: 2 value: b
===== key: 3 value: c

 

reduceByKey: aggregates the values of each key.

Difference between reduceByKey and groupByKey:

reduceByKey: aggregates by key and performs a combine (pre-aggregation) inside each partition before the shuffle; the result is an RDD[(K, V)].

groupByKey: groups by key and shuffles directly, without pre-aggregation.

def reduceByKeyTest = {
  val conf = new SparkConf().setMaster("local[*]").setAppName("hello")
  val sc = new SparkContext(conf)
  val mapRdd1 = sc.makeRDD(Array(('a', 1), ('b', 1), ('c', 1), ('a', 1)))
  mapRdd1.reduceByKey(_ + _).collect().foreach(println)
}

Result:
(a,2)
(b,1)
(c,1)
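The same count can also be written with groupByKey plus a sum, which illustrates the difference above: reduceByKey pre-aggregates inside each partition before the shuffle, while the groupByKey version ships every record and only sums afterwards. A minimal sketch (the method name is made up):

def reduceVsGroupTest = {
  val conf = new SparkConf().setMaster("local[*]").setAppName("hello")
  val sc = new SparkContext(conf)
  val pairs = sc.makeRDD(Array(('a', 1), ('b', 1), ('c', 1), ('a', 1)))
  val viaReduce = pairs.reduceByKey(_ + _)            // combines within each partition before the shuffle
  val viaGroup  = pairs.groupByKey().mapValues(_.sum) // shuffles every record, then sums
  println(viaReduce.collect().mkString(","))          // e.g. (a,2),(b,1),(c,1)
  println(viaGroup.collect().mkString(","))           // same values, possibly in a different order
  sc.stop
}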

 

aggregateByKey: aggregates by key, with separate functions for within-partition and between-partition aggregation.

def aggregateByKeyTest = {
  val conf = new SparkConf().setMaster("local[*]").setAppName("hello")
  val sc = new SparkContext(conf)
  val mapRdd1 = sc.makeRDD(List(('a', 3), ('a', 2), ('c', 4), ('b', 3), ('c', 6), ('c', 8)), 2)
  mapRdd1.glom().collect().foreach(d => println("initial partition: " + d.mkString(",")))
  // Take the maximum value of each key within every partition, then add the per-partition maxima
  val aggRDD1 = mapRdd1.aggregateByKey(0)(Math.max(_, _), _ + _)
  val aggRDD2 = mapRdd1.aggregateByKey(0)((k, v) => { if (k > v) k else v }, _ + _)
  println("=====aggRDD1 " + aggRDD1.collect().mkString(","))
  println("=====aggRDD2 " + aggRDD2.collect().mkString(","))
}

Result:
initial partition: (a,3),(a,2),(c,4)
initial partition: (b,3),(c,6),(c,8)
=====aggRDD1 (b,3),(a,3),(c,12)
=====aggRDD2 (b,3),(a,3),(c,12)

 

foldByKey: a simplified form of aggregateByKey where the within-partition and between-partition functions are the same.

def foldByKeyTest = {
  val conf = new SparkConf().setMaster("local[*]").setAppName("hello")
  val sc = new SparkContext(conf)
  val mapRdd1 = sc.makeRDD(List(('a', 3), ('a', 2), ('c', 4), ('b', 3), ('c', 6), ('c', 8)), 2)
  mapRdd1.glom().collect().foreach(d => println("initial partition: " + d.mkString(",")))
  val aggRDD1 = mapRdd1.foldByKey(0)(_ + _)
  println("=====aggRDD1 " + aggRDD1.collect().mkString(","))
}

Result:
=====aggRDD1 (b,3),(a,5),(c,18)

 

combineByKey: for each key, merges the values into a combined structure.

Example: computing the average per key. First convert each value into a (sum, count) pair (changing the initial value), then aggregate, and finally divide.

def combineByKeyTest = {
  val conf = new SparkConf().setMaster("local[*]").setAppName("hello")
  val sc = new SparkContext(conf)
  val mapRdd1 = sc.makeRDD(List(('a', 3), ('a', 2), ('a', 4), ('c', 8)))
  val conRDD = mapRdd1.combineByKey(
    (_, 1),                                                                       // createCombiner: value -> (value, 1)
    (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),                             // merge a value within a partition
    (acc: (Int, Int), acc2: (Int, Int)) => (acc._1 + acc2._1, acc._2 + acc2._2))  // merge combiners across partitions
  println(" ===== conRDD : " + conRDD.collect().mkString(","))
  // If the work is awkward to do within a partition, you can return the value unchanged there
  // and do the computation across partitions instead
  val conRdd3: RDD[(Char, Int)] = conRDD.map { case (k: Char, v: (Int, Int)) => (k, v._1 / v._2) }
  conRdd3.foreach(d => println(" ===== conRDD3 : " + d.toString()))
  sc.stop
}

Result:
===== conRDD : (a,(9,3)),(c,(8,1))
===== conRDD3 : (c,8)
===== conRDD3 : (a,3)

 

sortByKey: sorts by key.

def sortByKeyTest = {
  val conf = new SparkConf().setMaster("local[*]").setAppName("hello")
  val sc = new SparkContext(conf)
  val mapRdd1 = sc.makeRDD(List(('a', 3), ('d', 2), ('c', 4), ('b', 3), ('d', 6), ('c', 8)), 2)
  mapRdd1.sortByKey().collect().foreach(d => println(" 1 ===== " + d))
  mapRdd1.sortByKey(false).collect().foreach(d => println(" 2 ===== " + d))
}

Result:
Ascending:  (a,3) (b,3) (c,4) (c,8) (d,2) (d,6)
Descending: (d,2) (d,6) (c,4) (c,8) (b,3) (a,3)

 

sortWith: a Scala collection method, used here on the List of grouped values; the supplied function compares two elements and thus defines the ordering rule.

def sortWithTest = {
  val conf = new SparkConf().setMaster("local[*]").setAppName("hello")
  val sc = new SparkContext(conf)
  val mapRdd1 = sc.makeRDD(List(('a', 3), ('d', 2), ('c', 4), ('b', 3), ('d', 6), ('c', 8)), 2)
  val mapRdd2 = mapRdd1.groupByKey().map(d => {
    (d._1, d._2.toList.sortWith(_ > _))
  })
  mapRdd2.collect().foreach(d => println(" ===== " + d))
}

Result:
===== (d,List(6, 2))
===== (b,List(3))
===== (a,List(3))
===== (c,List(8, 4))

 

sortBy: sorts by a chosen field of each record, e.g. the key or the value.

def sortByTest1 = {
  val conf = new SparkConf().setMaster("local[*]").setAppName("hello")
  val sc = new SparkContext(conf)
  val mapRdd1 = sc.makeRDD(Array(('a', 3), ('d', 2), ('c', 4), ('b', 3), ('d', 6), ('c', 8)), 2)
  val mapRdd2 = mapRdd1.sortBy(data => data._1, false)
  val mapRdd3 = mapRdd1.sortBy(data => data._2, false)
  mapRdd2.collect().foreach(d => println(" ===== " + d))
  mapRdd3.collect().foreach(d => println(" ===== " + d))
}

Result:
Sorted by key, descending:   (d,2) (d,6) (c,4) (c,8) (b,3) (a,3)
Sorted by value, descending: (c,8) (d,6) (c,4) (a,3) (b,3) (d,2)

 

mapValues: keeps the key unchanged and transforms only the value.

def mapValuesTest = {
  val conf = new SparkConf().setMaster("local[*]").setAppName("hello")
  val sc = new SparkContext(conf)
  val mapRdd1 = sc.makeRDD(List(('a', 3), ('d', 2), ('c', 4), ('b', 3), ('d', 6), ('c', 8)), 2)
  mapRdd1.mapValues(d => (d, 1)).collect().foreach(d => println(" ===== " + d))
}

Result:
===== (a,(3,1))
===== (d,(2,1))
===== (c,(4,1))
===== (b,(3,1))
===== (d,(6,1))
===== (c,(8,1))

 

join: called on RDDs of type (K, V) and (K, W); returns an RDD of (K, (V, W)) pairing up all elements that share a key.

def joinTest = {
  val conf = new SparkConf().setMaster("local[*]").setAppName("hello")
  val sc = new SparkContext(conf)
  val mapRdd1 = sc.makeRDD(List(('a', 3), ('d', 2), ('c', 4), ('b', 3), ('d', 6), ('c', 8)), 2)
  // The following would not fail either, but records with key 'g' simply do not appear in the result:
  // val mapRdd1 = sc.makeRDD(List(('a',3),('d',2),('c',4),('b',3),('d',6),('c',8),('g',8)), 2)
  val mapRdd2 = sc.makeRDD(List(('a', 'a'), ('d', 'd'), ('c', 'c'), ('b', 'b'), ('d', 'd'), ('c', 'c')), 2)
  mapRdd1.join(mapRdd2).collect().foreach(d => println("==== " + d))
}

Result:
==== (d,(2,d))
==== (d,(2,d))
==== (d,(6,d))
==== (d,(6,d))
==== (b,(3,b))
==== (a,(3,a))
==== (c,(4,c))
==== (c,(4,c))
==== (c,(8,c))
==== (c,(8,c))

 

cogroup: called on RDDs of type (K, V) and (K, W); returns an RDD of type (K, (Iterable[V], Iterable[W])).

def cogroupTest = {
  val conf = new SparkConf().setMaster("local[*]").setAppName("hello")
  val sc = new SparkContext(conf)
  val mapRdd1 = sc.makeRDD(List(('a', 3), ('d', 2), ('c', 4), ('b', 3), ('d', 6), ('c', 8)), 2)
  val mapRdd2 = sc.makeRDD(List(('a', 'a'), ('d', 'd'), ('c', 'c'), ('b', 'b'), ('d', 'd'), ('c', 'c')), 2)
  mapRdd1.cogroup(mapRdd2).collect().foreach(d => println("==== " + d))
}

Result:
==== (d,(CompactBuffer(2, 6),CompactBuffer(d, d)))
==== (b,(CompactBuffer(3),CompactBuffer(b)))
==== (a,(CompactBuffer(3),CompactBuffer(a)))
==== (c,(CompactBuffer(4, 8),CompactBuffer(c, c)))

Taking the first few elements:

take: returns the elements with indices 0 to num-1, without sorting.

 

top: returns the first num elements according to the default (descending) ordering or a specified ordering.

 

takeOrdered: like top, but returns the elements in the opposite (ascending) order; a combined sketch follows below.
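A minimal combined sketch of the three (the method name is made up; same setup as the earlier examples):

def takeTopTakeOrderedTest = {
  val conf = new SparkConf().setMaster("local[*]").setAppName("hello")
  val sc = new SparkContext(conf)
  val rdd = sc.makeRDD(Array(3, 2, 1, 6, 7, 5))
  println(rdd.take(3).mkString(","))         // 3,2,1  -> first three in partition order, no sorting
  println(rdd.top(3).mkString(","))          // 7,6,5  -> largest three, descending
  println(rdd.takeOrdered(3).mkString(","))  // 1,2,3  -> smallest three, ascending
  sc.stop
}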

 

Exercise

// Count ad clicks per province, sort the counts in descending order,
// then show the top three ads for each province
def test1 = {
  val conf = new SparkConf().setMaster("local[*]").setAppName("hello")
  val sc = new SparkContext(conf)
  val input = sc.makeRDD(Array("河南 洗发水", "河南 沐浴露", "河南 沐浴露", "河南 豆汁", "河南 西湖醋鱼",
    "河南 豆汁", "河南 西湖醋鱼", "河南 西湖醋鱼", "河南 豆汁", "北京 豆汁"))
  val rdd1: RDD[(String, Int)] = input.map(d => (d.split(" ")(0) + "-" + d.split(" ")(1), 1)).reduceByKey(_ + _)
  val rdd2 = rdd1.map(d => (d._1.split("-")(0), (d._1.split("-")(1), d._2))).groupByKey()
  val rdd3 = rdd2.map(data => {
    val sortRdd: Seq[(String, Int)] = data._2.toList.sortWith((k, v) => k._2 > v._2).take(3)
    (data._1, sortRdd)
  })
  rdd3.collect().foreach(d => println(" ===== " + d))
}

Result:
===== (河南,List((豆汁,3), (西湖醋鱼,3), (沐浴露,2)))
===== (北京,List((豆汁,1)))

 

===================================================================

Action operators (a combined sketch follows the list below)

reduce(): aggregates all elements into a single result.

    e.g. reduce(_ + _)

collect(): returns all elements of the dataset to the driver program as an array.

count(): returns the number of elements.

first(): returns the first element of the RDD.

take(n): returns an array of the first n elements of the RDD.

takeOrdered(n): returns an array of the first n elements of the RDD after sorting.

aggregate(zeroValue)(seqOp, combOp): aggregates the elements within each partition using seqOp and the zero value, then combines the per-partition results (again together with the zero value) using combOp; the final result type does not have to match the RDD's element type.

fold(num)(func): a fold operation; a simplified form of aggregate where seqOp and combOp are the same function.

saveAsTextFile: saves the result as text files.

saveAsSequenceFile: saves the result as a Hadoop SequenceFile.

saveAsObjectFile: serializes the RDD's elements into objects and saves them to a file.

countByKey: for an RDD of (key, value) pairs, returns a Map of (key, count) giving the number of elements for each key.

foreach: runs a function on each element (executed on the executors).
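A combined sketch of the action operators above (the method name and the output path are made up for illustration; same SparkContext setup as the earlier examples):

def actionTest = {
  val conf = new SparkConf().setMaster("local[*]").setAppName("hello")
  val sc = new SparkContext(conf)
  val rdd = sc.makeRDD(1 to 5, 2)
  println(rdd.reduce(_ + _))                // 15
  println(rdd.collect().mkString(","))      // 1,2,3,4,5
  println(rdd.count())                      // 5
  println(rdd.first())                      // 1
  println(rdd.take(3).mkString(","))        // 1,2,3
  println(rdd.aggregate(0)(_ + _, _ + _))   // 15: seqOp inside partitions, combOp across partitions
  println(rdd.fold(0)(_ + _))               // 15
  rdd.saveAsTextFile("output/actionTest")   // hypothetical output path, one part file per partition
  val pairRdd = sc.makeRDD(Array(('a', 1), ('a', 2), ('b', 3)))
  println(pairRdd.countByKey())             // Map(a -> 2, b -> 1)
  pairRdd.foreach(println)                  // runs on the executors, so the output order varies
  sc.stop
}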

 
