Spark系列---SparkSQL(四)动态与反射方式创建DataFrame(Java、Scala版本)

mac2024-07-14  51

1.动态创建Scheme将非json格式RDD转换成DataFrame,推荐

scala版本

package com.kevin.scala.dataframe import org.apache.spark.sql.types._ import org.apache.spark.sql.{RowFactory, SQLContext} import org.apache.spark.{SparkConf, SparkContext} /** * 动态创建Scheme将非json格式RDD转换成DataFrame,推荐 */ object DataFrameRDDWithStruct { def main(args: Array[String]): Unit = { // 1.创建sparkconf val conf = new SparkConf().setAppName("DataFrameRDDWithStruct").setMaster("local") // 2.创建sc val sc = new SparkContext(conf) // 3.创建sqlcontext val sqlContext = new SQLContext(sc) val line = sc.textFile("DTSparkSql\\src\\main\\resources\\person.txt") // 4.读取文件数据,先使用map切分数据,再使用map将数据转成Row类型 val row = line.map(_.split(",")).map(s => RowFactory.create(s(0),s(1),Integer.valueOf(s(2)))) // 5.动态构建DataFrame中的元数据,一般来说这里的字段可以源于字符串,也可以源于外部数据库 val structFields = Array( StructField("id",StringType,true), StructField("name",StringType,true), StructField("age",IntegerType,true) ) // 6.将list转为DataFrame中的元数据 val structType = DataTypes.createStructType(structFields) // 7.将row类型的rdd数据和对应的字段名称类型转成DataFrame val df = sqlContext.createDataFrame(row,structType) // 8.查询数据 df.show() // 9.将df转成rdd然后遍历获取name的数据 df.rdd.foreach(r => println(r.getAs("name"))) // 10.关闭sc sc.stop() } }

java版本

package com.kevin.java.dataframe; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.VoidFunction; import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; import java.util.Arrays; import java.util.List; /** * @author kevin * @version 1.0 * @description 动态创建Scheme将非json格式RDD转换成DataFrame,推荐 * @createDate 2019/1/6 */ public class DataFrameRDDWithStruct { public static void main(String[] args) { // 1.创建SparkConf作业设置作业名称和模式 SparkConf conf = new SparkConf().setAppName("DataFrameRDDWithStruct").setMaster("local"); // 2.基于Sparkconf对象创建一个SparkContext上下文,它是通往集群的唯一通道,且在创建时会创建任务调度器 JavaSparkContext sc = new JavaSparkContext(conf); // 3.根据sc创建SQLContext对象对sql进行分析处理 SQLContext sqlContext = new SQLContext(sc); // 4.读取文件数据 String file = "DTSparkSql\\src\\main\\resources\\person.txt"; JavaRDD<String> lineRDD = sc.textFile(file); // 5.转成Row类型的RDD JavaRDD<Row> rowJavaRDD = lineRDD.map(new Function<String, Row>() { @Override public Row call(String line) throws Exception { String[] s = line.split(","); return RowFactory.create(s[0], s[1], Integer.valueOf(s[2])); } }); // 6.动态构建DataFrame中的元数据,一般来说这里的字段可以源于字符串,也可以源于外部数据库 List<StructField> structFields = Arrays.asList( DataTypes.createStructField("id", DataTypes.StringType, true), DataTypes.createStructField("name", DataTypes.StringType, true), DataTypes.createStructField("age", DataTypes.IntegerType, true)); // 7.将list转为DataFrame中的元数据 StructType structType = DataTypes.createStructType(structFields); // 8.将row类型的rdd数据和对应的字段名称类型转成DataFrame DataFrame df = sqlContext.createDataFrame(rowJavaRDD, structType); // 9.查询前20行数据 df.show(); // 10.将DataFrame转为RDD JavaRDD<Row> javaRDD = df.javaRDD(); // 11.遍历rdd数据 javaRDD.foreach(new VoidFunction<Row>() { @Override public void call(Row row) throws Exception { System.out.println((String)row.getAs("name")); } }); // 12.关闭 sc.close(); } }

2.通过反射的方式将非json格式的RDD转换成DataFrame,不推荐使用

scala版本

package com.kevin.scala.dataframe import org.apache.spark.sql.SQLContext import org.apache.spark.{SparkConf, SparkContext} /** * 通过反射的方式将非json格式的RDD转换成DataFrame,不推荐使用 */ object DataFrameRDDWithReflect { def main(args: Array[String]): Unit = { // 1.创建sparkconf val conf = new SparkConf().setAppName("DataFrameRDDWithReflect").setMaster("local") // 2.创建sc val sc = new SparkContext(conf) // 3.创建sqlcontext val sqlContext = new SQLContext(sc) // 4.导入隐饰操作,否则RDD无法调用toDF方法 import sqlContext.implicits._ val line = sc.textFile("DTSparkSql\\src\\main\\resources\\person.txt") // 5.读取文件用map切分,再用map将数据装成Person类型,toDF转成DataFrame val df = line.map(_.split(",")).map(p => new Person(p(0),p(1),p(2).trim.toInt)).toDF // 6.注册成临时表 df.registerTempTable("person") // 7.查询id=2的数据 sqlContext.sql("select * from person where id = 2").show() // 8.将df转成rdd,map将数据封装到person中 val map = df.rdd.map(r => new Person(r.getAs("id"),r.getAs("name"),r.getAs("age"))) // 9.遍历person的数据 map.foreach(println(_)) // 10.关闭sc sc.stop } }

java版本

package com.kevin.java.dataframe; import com.kevin.java.entity.Person; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.VoidFunction; import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.Row; import org.apache.spark.sql.SQLContext; /** * @author kevin * @version 1.0 * @description 通过反射的方式将非json格式的RDD转换成DataFrame,不推荐使用 * @createDate 2019/1/6 */ public class DataFrameRDDWithReflect { public static void main(String[] args) { // 1.创建SparkConf作业设置作业名称和模式 SparkConf conf = new SparkConf().setAppName("DataFrameRDDWithReflect").setMaster("local"); // 2.基于Sparkconf对象创建一个SparkContext上下文,它是通往集群的唯一通道,且在创建时会创建任务调度器 JavaSparkContext sc = new JavaSparkContext(conf); // 3.根据sc创建SQLContext对象对sql进行分析处理 SQLContext sqlContext = new SQLContext(sc); // 4.读取文件数据 String file = "DTSparkSql\\src\\main\\resources\\person.txt"; JavaRDD<String> lineRDD = sc.textFile(file); // 5.获取每一行txt数据转为person类型 JavaRDD<Person> personRDD = lineRDD.map(new Function<String, Person>() { @Override public Person call(String line) throws Exception { String[] s = line.split(","); Person p = new Person(); p.setId(s[0]); p.setName(s[1]); p.setAge(Integer.valueOf(s[2])); return p; } }); // 6.传入Person的时候,sqlContext是通过反射的方式创建DataFrame // 在底层通过反射的方式获得Person的所有field,结合RDD本,就生成了DataFrame DataFrame df = sqlContext.createDataFrame(personRDD, Person.class); // 7.将DataFrame注册成表 df.registerTempTable("person"); // 8.查询id=2的数据 DataFrame sql = sqlContext.sql("select id,name,age from person where id=2"); sql.show(); // 9.将DataFrame转成RDD并用row.getAs("列名")获取数据 JavaRDD<Row> javaRDD = df.javaRDD(); JavaRDD<Person> map = javaRDD.map(new Function<Row, Person>() { @Override public Person call(Row row) throws Exception { Person p = new Person(); p.setId((String) row.getAs("id")); p.setName((String) row.getAs("name")); p.setAge((Integer) row.getAs("age")); return p; } }); // 10.遍历数据 map.foreach(new VoidFunction<Person>() { @Override public void call(Person person) throws Exception { System.out.println(person); } }); // 11.关闭 sc.close(); } }

 

最新回复(0)