Spark系列---SparkSQL(三)读取Txt文件、Json文件、Hive、Mysql数据源(Java、Scala版本)

mac2024-07-02  63

1.读取txt文件

scala版本

package com.kevin.scala.dataframe import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.SQLContext /** * 读取txt文件转成DataFrame形式操作 */ object DataFrameTxt { def main(args: Array[String]): Unit = { // 1.创建sparkconf val conf = new SparkConf().setAppName("DataFrameTxt").setMaster("local") // 2.创建sc val sc = new SparkContext(conf) // 3.创建sqlcontext val sqlContext = new SQLContext(sc) val line = sc.textFile("DTSparkSql\\src\\main\\resources\\person.txt") import sqlContext.implicits._ // 4.读取文件用map切分,再用map将数据装成Person类型,toDF转成DataFrame line.map(_.split(",")).map(p => new Person(p(0),p(1),p(2).trim.toInt)).toDF.show() // 5.关闭sc sc.stop() } }

Java版本

package com.kevin.java.dataframe; import com.kevin.java.entity.Person; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.SQLContext; /** * @author kevin * @version 1.0 * @description 读取txt文件转成DataFrame形式操作 * @createDate 2019/1/6 */ public class DataFrameTxt { public static void main(String[] args) { // 1.创建SparkConf并设置作业名称和模式 SparkConf conf = new SparkConf().setAppName("DataFrameTxt").setMaster("local"); // 2.基于sparkConf创建SparkContext JavaSparkContext sc = new JavaSparkContext(conf); // 3.创建SQLContext对象对sql进行分析处理 SQLContext sqlContext = new SQLContext(sc); // 4.读取文件数据 String file = "DTSparkSql\\src\\main\\resources\\person.txt"; JavaRDD<String> lineRDD = sc.textFile(file); // 5.获取每一行txt数据转为person类型 JavaRDD<Person> map = lineRDD.map(new Function<String, Person>() { @Override public Person call(String line) throws Exception { String[] s = line.split(","); Person p = new Person(); p.setId(s[0]); p.setName(s[1]); p.setAge(Integer.valueOf(s[2])); return p; } }); // 6.将person类型的数据转为DataFrame表 DataFrame df = sqlContext.createDataFrame(map, Person.class); // 7.查看所有数据,默认前20行 df.show(); // 8.关闭sc sc.close(); } }

2.读取Json文件

scala版本

package com.kevin.scala.dataframe import org.apache.spark.sql.SQLContext import org.apache.spark.{SparkConf, SparkContext} /** * 读取json格式文件转为DataFrame表形式分析处理 * json文件中不能嵌套json格式的内容 * 1.读取json格式两种方式 * 2.df.show()默认显示前20行,使用df.show(行数)显示多行 * 3.df.javaRDD/(scala df.rdd)将DataFrame转换成RDD * 4.df.printSchema()显示DataFrame中Schema信息 * 5.dataFrame自带的API操作DataFrame,一般不使用 * 6.使用sql查询,先将DataFrame注册成临时表:df.registerTempTable("jtable"), * 再使用sql,怎么使用sql?sqlContext.sql("sql语句") * 7.不能加载嵌套的json文件 * 8.df加载过来之后将列安装ascii排序 */ object DataFrameJson { def main(args: Array[String]): Unit = { // 1.创建SparkConf val conf = new SparkConf().setAppName("DataFrameJson").setMaster("local") // 2.创建SparkContext val sc = new SparkContext(conf) // 3.创建SQLContext对象对sql进行分析处理 val sqlContext = new SQLContext(sc) val file = "DTSparkSql\\src\\main\\resources\\json" // 4.读取json文件并转成DataFrame表 val df = sqlContext.read.json(file) // 5.获取表中所有的数据 df.show() // 6.使用DataFrame自带的API操作DataFrame // select name from table df.select("name").show() // select name, age+10 as addage from table df.select(df.col("name"),df.col("age").plus(10).alias("addage")).show() // select name ,age from table where age>19 df.select(df.col("name"),df.col("age")).where(df.col("age").gt("19")).show() // select age,count(*) from table group by age df.groupBy(df.col("age")).count().show() // 7.将DataFrame注册成临时表使用sql语句操作 df.registerTempTable("tempTable") sqlContext.sql("select age,count(*) as gg from tempTable group by age").show() sqlContext.sql("select name,age from tempTable").show() // 8.关闭 sc.stop() } }

java版本

package com.kevin.java.dataframe; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.SQLContext; /** * @author kevin * @version 1.0 * @description 读取json格式文件转为DataFrame表形式分析处理 * * json文件中不能嵌套json格式的内容 * 1.读取json格式两种方式 * 2.df.show()默认显示前20行,使用df.show(行数)显示多行 * 3.df.javaRDD/(scala df.rdd)将DataFrame转换成RDD * 4.df.printSchema()显示DataFrame中Schema信息 * 5.dataFrame自带的API操作DataFrame,一般不使用 * 6.使用sql查询,先将DataFrame注册成临时表:df.registerTempTable("jtable"), * 再使用sql,怎么使用sql?sqlContext.sql("sql语句") * 7.不能加载嵌套的json文件 * 8.df加载过来之后将列安装ascii排序 * @createDate 2019/1/6 */ public class DataFrameJson { public static void main(String[] args) { // 1.创建SparkConf作业设置作业名称和模式 SparkConf conf = new SparkConf().setAppName("DataFrameJson").setMaster("local"); // 2.基于Sparkconf对象创建一个SparkContext上下文,它是通往集群的唯一通道,且在创建时会创建任务调度器 JavaSparkContext sc = new JavaSparkContext(conf); // 3.根据sc创建SQLContext对象对sql进行分析处理 SQLContext sqlContext = new SQLContext(sc); // 4.读取文件数据 String file = "DTSparkSql\\src\\main\\resources\\json"; // 5.读取json文件并转成DataFrame表 DataFrame df = sqlContext.read().json(file); // 6.获取表中所有数据 df.show(); // 7.使用DataFrame自带的API操作DataFrame // select name from table df.select("name").show(); // select name, age+10 as addage from table df.select(df.col("name"),df.col("age").plus(10).alias("addage")).show(); // select name ,age from table where age>19 df.select(df.col("name"),df.col("age")).where(df.col("age").gt(19)).show(); // select age,count(*) from table group by age df.groupBy(df.col("age")).count().show(); // 8.将DataFrame注册成临时表使用sql语句操作 df.registerTempTable("jtable"); DataFrame sql = sqlContext.sql("select age,count(*) as gg from jtable group by age"); sql.show(); DataFrame sql2 = sqlContext.sql("select name,age from jtable"); sql2.show(); // 关闭 sc.close(); } }

3.读取Hive

如果读取hive中数据,要使用HiveContext HiveContext.sql(sql)可以操作hive表,还可以操作虚拟表 在idea访问由SparkSQL访问hive需要将hive-site.xml复制到resources

scala版本

package com.kevin.scala.dataframe import org.apache.spark.sql.hive.HiveContext import org.apache.spark.{SparkConf, SparkContext} /** * 如果读取hive中数据,要使用HiveContext * HiveContext.sql(sql)可以操作hive表,还可以操作虚拟表 * 在idea访问由SparkSQL访问hive需要将hive-site.xml复制到resources */ object DataFrameHive { def main(args: Array[String]): Unit = { // 1.创建SparkConf val conf = new SparkConf().setAppName("DataFrameHive").setMaster("local") // 2.创建SparkContext val sc = new SparkContext(conf) // 3.创建HiveContext val hiveContext = new HiveContext(sc) // 4.查看所有数据库 hiveContext.sql("show tables").show() // 5.关闭sc sc.stop() } }

java版本

package com.kevin.java.dataframe; import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.Row; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.hive.HiveContext; import java.util.List; /** * @author kevin * @version 1.0 * @description 如果读取hive中数据,要使用HiveContext * HiveContext.sql(sql)可以操作hive表,还可以操作虚拟表 * @createDate 2019/1/6 */ public class DataFrameHive { public static void main(String[] args) { /* * 0.把hive里面的hive-site.xml放到spark/conf目录下 * 1.启动Mysql * 2.启动HDFS * 3.启动Hive ./hive * 4.初始化HiveContext * 5.打包运行 * * ./bin/spark-submit --master yarn-cluster --class com.kevin.java.dataframe.DataFrameHive /root/DTSparkSql.jar * ./bin/spark-submit --master yarn-client --class com.kevin.java.dataframe.DataFrameHive /root/DTSparkSql.jar */ // 如果不设置master,则无法在本地运行,需要打包在集群运行 SparkConf conf = new SparkConf().setAppName("DataFrameHive").setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); //SparkSession // 创建HiveContext,注意,这里,它接收的是SparkContext作为参数,不是JavaSparkContext, // 其实也可以使用JavaSparkContext,只不过内部也是做了sc.sc()的操作 HiveContext hiveContext = new HiveContext(sc); DataFrame sql = hiveContext.sql("show databases"); sql.show(); sc.close(); } }

4.读取Mysql数据源

scala版本

package com.kevin.scala.dataframe import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.SQLContext /** * 读取mysql数据源 */ object DataFrameMysqlRead { def main(args: Array[String]): Unit = { // 1.创建SparkConf val conf = new SparkConf().setAppName("DataFrameMysqlRead").setMaster("local") // 配置join或者聚合操作shuffle数据时分区的数量 conf.set("spark.sql.shuffle.partitions","1") // 2.创建SparkContext val sc = new SparkContext(conf) // 3.创建SQLContext对象对sql进行分析处理 val sqlContext = new SQLContext(sc) val options = Map("url" -> "jdbc:mysql://192.168.171.101:3306/test", "driver" -> "com.mysql.jdbc.Driver", "user" -> "root", "password" -> "Hadoop01!", "dbtable" -> "person") // 4.读取Mysql数据库表,加载为DataFrame val jdbcDF = sqlContext.read.format("jdbc").options(options).load() jdbcDF.show() // 5.注册为临时表 jdbcDF.registerTempTable("temp_person") // 6.查询name=kevin val result = sqlContext.sql("select id,name,age from temp_person where name = 'kevin' ") result.show() // 7.关闭sc sc.stop() } }

将数据写入Mysql

package com.kevin.scala.dataframe import java.util.Properties import org.apache.spark.sql.{Row, SQLContext, SaveMode} import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} import org.apache.spark.{SparkConf, SparkContext} /** * 将数据写入mysql */ object DataFrameMysqlWrite { def main(args: Array[String]): Unit = { // 1.创建SparkConf val conf = new SparkConf().setAppName("DataFrameMysqlWrite").setMaster("local") // 2.创建sc val sc = new SparkContext(conf) // 3.创建sqlContext val sqlContext = new SQLContext(sc) // 4.通过并行化创建RDD val person = sc.parallelize(Array("3 cao 23","4 zheng 20","5 mai 20")).map(_.split(" ")) // 5.通过StructType直接指定每个字段的schema val schema = StructType(List( StructField("id",IntegerType,true), StructField("name",StringType,true), StructField("age",IntegerType,true) )) // 6.将rdd映射到rowRdd val row = person.map(p => Row(p(0).toInt, p(1).trim, p(2).toInt)) // 7.创建DataFrame val df = sqlContext.createDataFrame(row,schema) // 8.数据库的账号和密码 val prop = new Properties() prop.put("user","root") prop.put("password","Hadoop01!") // 9.将数据插入表中,SaveMode.Overwrite覆盖表的数据 df.write.mode(SaveMode.Overwrite).jdbc("jdbc:mysql://192.168.171.101:3306/test","person",prop) // 10.关闭sc sc.stop() } }

读取和写入Mysql

java版本

package com.kevin.java.dataframe; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.DataFrameReader; import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.SaveMode; import java.util.HashMap; import java.util.Properties; /** * @author kevin * @version 1.0 * @description 读取mysql数据源 * @createDate 2019/1/8 */ public class DataFrameMysql { public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName("DataFrameMysql").setMaster("local"); // 配置join或者聚合操作shuffle数据时分区的数量 conf.set("spark.sql.shuffle.partitions","1"); JavaSparkContext sc = new JavaSparkContext(conf); SQLContext sqlContext = new SQLContext(sc); // 第一种方式,读取Mysql数据库表,加载为DataFrame HashMap<String, String> options = new HashMap<>(); options.put("url","jdbc:mysql://192.168.171.101:3306/sparkdb"); options.put("driver","com.mysql.jdbc.Driver"); options.put("user","root"); options.put("password","Hadoop01!"); options.put("dbtable","person"); DataFrame person = sqlContext.read().format("jdbc").options(options).load(); person.show(); // 注册为临时表 person.registerTempTable("person1"); // 第二种方式,读取Mysql数据库表,加载为DataFrame DataFrameReader reader = sqlContext.read().format("jdbc"); reader.option("url","jdbc:mysql://192.168.171.101:3306/sparkdb"); reader.option("driver","com.mysql.jdbc.Driver"); reader.option("user","root"); reader.option("password","Hadoop01!"); reader.option("dbtable","score"); DataFrame score = reader.load(); score.show(); score.registerTempTable("score1"); DataFrame result = sqlContext.sql("select person1.id,person1.name,person1.age,score1.score " + "from person1,score1 " + "where person1.name = score1.name"); result.show(); // 将DataFrame结果保存到Mysql中 Properties properties = new Properties(); properties.setProperty("user","root"); properties.setProperty("password","Hadoop01!"); result.write().mode(SaveMode.Overwrite) .jdbc("jdbc:mysql://192.168.171.101:3306/sparkdb","result",properties); System.out.println("-----Finish------"); sc.stop(); } }

5.读取json文件并保存成parquet文件和加载parquet文件

scala版本

package com.kevin.scala.dataframe import org.apache.spark.sql.{SQLContext, SaveMode} import org.apache.spark.{SparkConf, SparkContext} /** * 读取json文件并保存成parquet文件和加载parquet文件 */ object DataFrameParquet { def main(args: Array[String]): Unit = { // 1.创建sparkconf val conf = new SparkConf().setAppName("DataFrameParquet").setMaster("local") // 2.创建sc val sc = new SparkContext(conf) // 3.创建sqlContext val sqlContext = new SQLContext(sc) val json = sc.textFile("DTSparkSql\\src\\main\\resources\\json") // 4.读取json文件将文件转成rdd val df = sqlContext.read.json(json) // SaveMode指定存储文件时的保存模式:Overwrite:覆盖,Append:追加,ErrorIfExists:如果存在就报错,Ignore:若果存在就忽略 // 5.将DataFrame保存成parquet文件,保存成parquet文件只能使用Overwrite和Ignore两种方式 df.write.mode(SaveMode.Overwrite).format("parquet").save("./sparksql/parquet") // 6.加载parquet文件成df,加载parquet文件只能使用下面两种 val load = sqlContext.read.parquet("./sparksql/parquet") // val load = sqlContext.read.format("parquet").load("./sparksql/parquet") // 7.显示表中所有的数据 load.show() // 8.关闭sc sc.stop() } }

java版本

package com.kevin.java.dataframe; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.SaveMode; /** * @author kevin * @version 1.0 * @description 读取json文件并保存成parquet文件和加载parquet文件 * @createDate 2019/1/6 */ public class DataFrameParquet { public static void main(String[] args) { // 1.创建SparkConf作业设置作业名称和模式 SparkConf conf = new SparkConf().setAppName("DataFrameParquet").setMaster("local"); // 2.基于Sparkconf对象创建一个SparkContext上下文,它是通往集群的唯一通道,且在创建时会创建任务调度器 JavaSparkContext sc = new JavaSparkContext(conf); // 3.根据sc创建SQLContext用于sql的分析处理 SQLContext sqlContext = new SQLContext(sc); // 4.读取文件数据 String file = "DTSparkSql\\src\\main\\resources\\json"; JavaRDD<String> jsonRDD = sc.textFile(file); // 5.读取json形式的文件并转为DataFrame DataFrame df = sqlContext.read().json(jsonRDD); // DataFrame json = sqlContext.read().format("json").load("./spark/json"); // json.show(); // 6.将DataFrame保存成parquet文件 // SaveMode指定存储文件时的保存模式:Overwrite:覆盖,Append:追加,ErrorIfExists:如果存在就报错,Ignore:若果存在就忽略 // 保存成parquet文件有以下两种方式 df.write().mode(SaveMode.Overwrite).format("parquet").save("./sparksql/parquet"); // df.write().mode(SaveMode.Ignore).parquet("./sparksql/parquet"); // 7.加载parquet文件成DataFrame // 记载parquet文件有以下两种方式 DataFrame load = sqlContext.read().format("parquet").load("./sparksql/parquet"); // DataFrame load = sqlContext.read().parquet("./sparksql/parquet"); // 8.查看表中所有数据 load.show(); // 9.关闭 sc.close(); } }

 

最新回复(0)