## Scala
```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * 1. Create a SparkContext
 * 2. Create an RDD
 * 3. Call Transformations on the RDD
 * 4. Call an Action
 * 5. Release resources
 */
object Demo {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("Demo")
    // Create the SparkContext; it is what creates RDDs
    val sc: SparkContext = new SparkContext(conf)
    // Writing a Spark program means programming against an abstract,
    // distributed collection (the RDD) through its highly encapsulated API.
    // Use the SparkContext to create an RDD
    val lines: RDD[String] = sc.textFile(args(0))
    // ---- Transformations start ----
    // Split each line and flatten the results
    val words: RDD[String] = lines.flatMap(_.split(" "))
    // Pair each word with the count 1
    val wordAndOne: RDD[(String, Int)] = words.map((_, 1))
    // Group and aggregate; reduceByKey aggregates locally (map side) before aggregating globally
    val reduced: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)
    // Sort by count, descending
    val sorted: RDD[(String, Int)] = reduced.sortBy(_._2, false)
    // ---- Transformations end ----
    // Call an Action to save the result to HDFS
    sorted.saveAsTextFile(args(1))
    // Release resources
    sc.stop()
  }
}
```
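Before packaging, it can be handy to sanity-check the job without a cluster. A minimal local-mode sketch (the `local[*]` master, the hard-coded input path, and the collect-and-print ending are illustrative assumptions, not part of the original job):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object DemoLocal {
  def main(args: Array[String]): Unit = {
    // local[*] runs Spark inside this JVM using all available cores
    val conf = new SparkConf().setAppName("DemoLocal").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val counts = sc.textFile("word.txt") // hypothetical local input file
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .sortBy(_._2, ascending = false)
    // For a small test, collect to the driver and print instead of writing to HDFS
    counts.collect().foreach(println)
    sc.stop()
  }
}
```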
Now upload the data to HDFS and run the job:

1. Package the Scala code and upload the jar to the virtual machine.
2. Upload the raw word data to HDFS.
3. Submit the job. Note that it must be submitted through `spark-submit` under Spark's `bin` directory, as shown below.
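The submit command from the example above, one flag per line for readability (the host `qiu01`, memory and core sizes, class name, and paths are the example's values):

```sh
# --master:               master URL of the standalone cluster
# --executor-memory:      memory allocated per executor
# --total-executor-cores: total cores for the job (a worker defaults to 2 cores)
# --class:                fully qualified main class (copy reference of the code class)
# trailing arguments:     application jar, input file (args(0)), output directory (args(1))
/spark-2.3.3-bin-hadoop2.7/bin/spark-submit \
  --master spark://qiu01:7077 \
  --executor-memory 1g \
  --total-executor-cores 4 \
  --class WorkCount.Demo \
  /spark11-1.0-SNAPSHOT.jar \
  hdfs://qiu01:9000/data/word.txt \
  hdfs://qiu01:9000/data/out
```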
## Java
```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

public class JavaWordCount {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount");
        // Create the JavaSparkContext
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        // Use the JavaSparkContext to create an RDD
        JavaRDD<String> lines = jsc.textFile(args[0]);
        // Call Transformations
        // Split each line and flatten the results
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" ")).iterator();
            }
        });
        // Pair each word with the count 1
        JavaPairRDD<String, Integer> wordAndOne = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return Tuple2.apply(word, 1);
            }
        });
        // Group and aggregate by key
        JavaPairRDD<String, Integer> reduced = wordAndOne.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        // To sort by count, first swap each (K, V) pair to (V, K)
        JavaPairRDD<Integer, String> swapped = reduced.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            @Override
            public Tuple2<Integer, String> call(Tuple2<String, Integer> tp) throws Exception {
                return tp.swap();
            }
        });
        // Sort by key (the count), descending
        JavaPairRDD<Integer, String> sorted = swapped.sortByKey(false);
        // Swap back to (word, count)
        JavaPairRDD<String, Integer> result = sorted.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tuple2<Integer, String> tp) throws Exception {
                return tp.swap();
            }
        });
        // Trigger the Action and save the data to HDFS
        result.saveAsTextFile(args[1]);
        // Release resources
        jsc.stop();
    }
}
```
***
## JavaLambda
***
```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

public class JavaLambdaWordCount {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("JavaLambdaWordCount");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        // Create the initial RDD from the input file
        JavaRDD<String> lines = jsc.textFile(args[0]);
        // Split each line and flatten the results
        JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
        // Pair each word with the count 1
        JavaPairRDD<String, Integer> wordAndOne = words.mapToPair(word -> Tuple2.apply(word, 1));
        // Group and aggregate by key
        JavaPairRDD<String, Integer> reduced = wordAndOne.reduceByKey((a, b) -> a + b);
        // Swap (word, count) to (count, word), sort by key descending, then swap back
        JavaPairRDD<Integer, String> swapped = reduced.mapToPair(tp -> tp.swap());
        JavaPairRDD<Integer, String> sorted = swapped.sortByKey(false);
        JavaPairRDD<String, Integer> result = sorted.mapToPair(tp -> tp.swap());
        // Trigger the Action and save the result to HDFS
        result.saveAsTextFile(args[1]);
        // Release resources
        jsc.stop();
    }
}
```
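All three versions compute the same thing. As a purely hypothetical illustration of the output format: `saveAsTextFile` writes each tuple's string form, one per line, sorted by count descending (ties in arbitrary order):

```
# hypothetical input word.txt
hello spark
hello hadoop
hello world

# resulting lines in the part files under the output directory
(hello,3)
(spark,1)
(hadoop,1)
(world,1)
```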