Spark 的运行模式
Local(单节点模式):开发和学习
Standalone(集群模式):master - worker.
Spark on Yarn(运行在Yarn上):Spark客户端直接连接Yarn。不需要额外构建Spark集群。
在IDEA中新建工程运行在Local模式中,关于版本后续补充
<dependencies>
<!-- Spark-core -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.3.1</version>
</dependency>
<!-- Scala 包-->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.7</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
<version>2.11.7</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
<version>2.11.7</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.12</version>
</dependency>
<dependency>
<groupId>com.google.collections</groupId>
<artifactId>google-collections</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>2.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.2</version>
</dependency>
</dependencies>
需要另外导入scala的sdk,点击+ 选择安装好的Scala SDK即可代码编辑 PV(page view)即页面浏览量或点击量,是衡量一个网站或网页用户访问量。具体的说,PV值就是所有访问者在24小时(0点到24点)内看了某个网站多少个页面或某个网页多少次。PV是指页面刷新的次数,每一次页面刷新,就算做一次PV流量。 度量方法就是从浏览器发出一个对网络服务器的请求(Request),网络服务器接到这个请求后,会将该请求对应的一个网页(Page)发送给浏览器,从而产生了一个PV。那么在这里只要是这个请求发送给了浏览器,无论这个页面是否完全打开(下载完成),那么都是应当计为1个PV。 //1.读取文件的内容 val line:RDD[String] = sc.textFile("pvuvdata.txt") //2.将一行文本按照\t的方式分开 val data:RDD[(String,Int)] = line.map(s=>{ val l:Array[String] = s.split("\t") (l(5),1) }) //3.通过将相同的key聚合,value的值 进行相加 val count:RDD[(String,Int)] = data.reduceByKey(_+_) //4.想要进行排序 ,先进性数据交换 val swap:RDD[(Int,String)] = count.map(s=>{(s._2,s._1)}) //5.根据数据进行排序 val sort:RDD[(Int,String)] = swap.sortByKey(false) //6.将数据换回原本的值 val swapAfter:RDD[(String,Int)] = sort.map(s=>{(s._2,s._1)}) //输出前5名 swapAfter.take(5).foreach(println) 最后结果: (www.jd.com,6) (www.baidu.com,6) (www.dangdang.com,5) (www.suning.com,4) (www.taobao.com,2)