Writing a Word Count Example in Scala, Java, and Java Lambda



Scala


```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * 1. Create a SparkContext
 * 2. Create an RDD
 * 3. Call Transformations on the RDD
 * 4. Call an Action
 * 5. Release resources
 */
object Demo {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("Demo")
    // Create the SparkContext, which is then used to create RDDs
    val sc: SparkContext = new SparkContext(conf)
    // Writing a Spark program means programming against the abstract distributed
    // collection (the RDD) through its highly encapsulated API.
    // Use the SparkContext to create an RDD
    val lines: RDD[String] = sc.textFile(args(0))
    // ---- Transformations start ----
    // Split and flatten
    val words: RDD[String] = lines.flatMap(_.split(" "))
    // Pair each word with a 1
    val wordAndOne: RDD[(String, Int)] = words.map((_, 1))
    // Group and aggregate; reduceByKey aggregates locally first, then globally
    val reduced: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)
    // Sort by count, descending
    val sorted: RDD[(String, Int)] = reduced.sortBy(_._2, false)
    // ---- Transformations end ----
    // Call an Action to save the result to HDFS
    sorted.saveAsTextFile(args(1))
    // Release resources
    sc.stop()
  }
}
```
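Before packaging for the cluster, the pipeline can be verified locally. The sketch below is not from the original post: it runs the same transformations on a small in-memory collection instead of an HDFS file. `setMaster("local[*]")` and `sc.parallelize` are standard Spark APIs; the sample lines are made up for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object DemoLocalCheck {
  def main(args: Array[String]): Unit = {
    // local[*] runs Spark inside this JVM, using all available cores
    val conf = new SparkConf().setAppName("DemoLocalCheck").setMaster("local[*]")
    val sc = new SparkContext(conf)
    // A tiny in-memory dataset standing in for the HDFS input
    val lines = sc.parallelize(Seq("hello spark", "hello scala", "hello spark"))
    val counts = lines
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .sortBy(_._2, ascending = false)
    // collect is safe here only because the test data is tiny
    counts.collect().foreach(println) // e.g. (hello,3), (spark,2), (scala,1)
    sc.stop()
  }
}
```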

Upload the data to HDFS and submit the job:

1. Package the Scala code and upload the jar to the virtual machine.
2. Upload the word source data to HDFS (a sample upload is sketched below).
3. Submit the job. Note: the job has to be submitted through spark-submit under the Spark bin directory (spark/bin/spark-submit).
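For step 2, the upload might look like the following. This is a sketch: `hdfs dfs` is the standard HDFS CLI, and the file name `word.txt` and the `/data` directory are taken from the submit command below.

```bash
# Create the target directory on HDFS (-p makes this idempotent)
hdfs dfs -mkdir -p /data
# Upload the word source file
hdfs dfs -put word.txt /data/
# Confirm the file arrived
hdfs dfs -ls /data
```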

The submit command (step 3):

```bash
# --master:               the Spark Master URL
# --executor-memory:      memory per executor
# --total-executor-cores: total cores for the job (4 here; a worker defaults to 2 cores)
# --class:                the fully qualified main class (via the IDE's Copy Reference)
# Then follow the uploaded jar, the input file, and the output directory.
/spark-2.3.3-bin-hadoop2.7/bin/spark-submit \
  --master spark://qiu01:7077 \
  --executor-memory 1g \
  --total-executor-cores 4 \
  --class WorkCount.Demo \
  /spark11-1.0-SNAPSHOT.jar \
  hdfs://qiu01:9000/data/word.txt \
  hdfs://qiu01:9000/data/out
```
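The Java versions below are submitted the same way; only `--class` changes to point at `JavaWordCount` or `JavaLambdaWordCount` (with whatever package they are placed in), and the jar must contain the compiled Java classes.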

Java


```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

public class JavaWordCount {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount");
        // Create the JavaSparkContext
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        // Use the JavaSparkContext to create an RDD
        JavaRDD<String> lines = jsc.textFile(args[0]);
        // Call Transformations
        // Split and flatten
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" ")).iterator();
            }
        });
        // Pair each word with a 1
        JavaPairRDD<String, Integer> wordAndOne = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return Tuple2.apply(word, 1);
            }
        });
        // Group and aggregate (locally first, then globally)
        JavaPairRDD<String, Integer> reduced = wordAndOne.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        // To sort by count, first swap each (K, V) pair to (V, K)...
        JavaPairRDD<Integer, String> swapped = reduced.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            @Override
            public Tuple2<Integer, String> call(Tuple2<String, Integer> tp) throws Exception {
                return tp.swap();
            }
        });
        // ...then sort by key, descending...
        JavaPairRDD<Integer, String> sorted = swapped.sortByKey(false);
        // ...and swap back to (word, count)
        JavaPairRDD<String, Integer> result = sorted.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tuple2<Integer, String> tp) throws Exception {
                return tp.swap();
            }
        });
        // Trigger the Action: save the result to HDFS
        result.saveAsTextFile(args[1]);
        // Release resources
        jsc.stop();
    }
}
```

JavaLambda

```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

public class JavaLambdaWordCount {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("JavaLambdaWordCount");
        // Create the JavaSparkContext
        JavaSparkContext jsc = new JavaSparkContext(conf);
        // Create an RDD
        JavaRDD<String> lines = jsc.textFile(args[0]);
        // Split and flatten
        JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
        // Pair each word with a 1
        JavaPairRDD<String, Integer> wordAndOne = words.mapToPair(word -> Tuple2.apply(word, 1));
        // Group and aggregate
        JavaPairRDD<String, Integer> reduced = wordAndOne.reduceByKey((a, b) -> a + b);
        // Swap (word, count) to (count, word)
        JavaPairRDD<Integer, String> swapped = reduced.mapToPair(tp -> tp.swap());
        // Sort by key (the count), descending
        JavaPairRDD<Integer, String> sorted = swapped.sortByKey(false);
        // Swap back to (word, count)
        JavaPairRDD<String, Integer> result = sorted.mapToPair(tp -> tp.swap());
        // Save the result to HDFS
        result.saveAsTextFile(args[1]);
        // Release resources
        jsc.stop();
    }
}
```
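Once a job finishes, the output path should hold a `_SUCCESS` marker plus one `part-*` file per result partition. A quick way to inspect the result, assuming the HDFS paths from the submit command above:

```bash
# List the output directory; expect _SUCCESS plus part-00000, part-00001, ...
hdfs dfs -ls /data/out
# Print the sorted (word, count) pairs
hdfs dfs -cat /data/out/part-*
```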