Spark SQL
发家史
熟悉spark sql的都知道,spark sql是从shark发展而来。Shark为了实现Hive兼容,在HQL方面重用了Hive中HQL的解析、逻辑执行计划翻译、执行计划优化等逻辑,可以近似认为仅将物理执行计划从MR作业替换成了Spark作业(辅以内存列式存储等各种和Hive关系不大的优化);
同时还依赖Hive Metastore和Hive SerDe(用于兼容现有的各种Hive存储格式)。Spark SQL在Hive兼容层面仅依赖HQL parser、Hive Metastore和Hive SerDe。也就是说,从HQL被解析成抽象语法树(AST)起,就全部由Spark SQL接管了。执行计划生成和优化都由Catalyst负责。借助Scala的模式匹配等函数式语言特性,利用Catalyst开发执行计划优化策略比Hive要简洁得多。
Spark SQL中的DataFrame类似于一张关系型数据表。在关系型数据库中对单表或进行的查询操作,在DataFrame中都可以通过调用其API接口来实现。
Spark-SQL可以以其他RDD对象、parquet文件、json文件、hive表,以及通过JDBC连接到其他关系型数据库作为数据源来生成DataFrame对象。
以MySQL数据库为数据源,生成DataFrame对象后进行相关的DataFame之上的操作。 文中生成DataFrame的代码如下:
1 object DataFrameOperations { 2 def main (args: Array[String ]) { 3 val sparkConf = new SparkConf().setAppName( "Spark SQL DataFrame Operations").setMaster( "local[2]" ) 4 val sparkContext = new SparkContext(sparkConf) 5 6 val sqlContext = new SQLContext(sparkContext) 7 val url = "jdbc:mysql://k131:3306/test" 8 9 val jdbcDF = sqlContext.read.format( "jdbc" ).options( 10 Map( "url" -> url, 11 "user" -> "root", 12 "password" -> "root", 13 "dbtable" -> "spark_sql_test" )).load() 14 15 val joinDF1 = sqlContext.read.format( "jdbc" ).options( 16 Map("url" -> url , 17 "user" -> "root", 18 "password" -> "root", 19 "dbtable" -> "spark_sql_join1" )).load() 20 21 val joinDF2 = sqlContext.read.format( "jdbc" ).options( 22 Map ( "url" -> url , 23 "user" -> "root", 24 "password" -> "root", 25 "dbtable" -> "spark_sql_join2" )).load() 26 27 ... ... 28 } 29 }以表格的形式在输出中展示jdbcDF中的数据,类似于select * from spark_sql_test的功能。 show方法有四种调用方式,分别为, (1)show 只显示前20条记录。 示例:
jdbcDF.show
(2)show(numRows: Int) 显示numRows条 示例:
jdbcDF.show(3)(3)show(truncate: Boolean) 是否最多只显示20个字符,默认为true。 示例:
jdbcDF.show(true) jdbcDF.show(false)(4)show(numRows: Int, truncate: Boolean) 综合前面的显示记录条数,以及对过长字符串的显示格式。 示例:
jdbcDF.show(3, false)不同于前面的show方法,这里的collect方法会将jdbcDF中的所有数据都获取到,并返回一个Array对象。
jdbcDF.collect()功能和collect类似,只不过将返回结构变成了List对象,使用方法如下
jdbcDF.collectAsList()这个方法可以动态的传入一个或多个String类型的字段名,结果仍然为DataFrame对象,用于统计数值类型字段的统计值,比如count, mean, stddev, min, max等。 使用方法如下,其中c1字段为字符类型,c2字段为整型,c4字段为浮点型
jdbcDF .describe("c1" , "c2", "c4" ).show()
这里列出的四个方法比较类似,其中 (1)first获取第一行记录 (2)head获取第一行记录,head(n: Int)获取前n行记录 (3)take(n: Int)获取前n行数据 (4)takeAsList(n: Int)获取前n行数据,并以List的形式展现 以Row或者Array[Row]的形式返回一行或多行数据。first和head功能相同。 take和takeAsList方法会将获得到的数据返回到Driver端,所以,使用这两个方法时需要注意数据量,以免Driver发生OutOfMemoryError
以下返回为DataFrame类型的方法,可以连续调用。
(1)where(conditionExpr: String):SQL语言中where关键字后的条件 传入筛选条件表达式,可以用and和or。得到DataFrame类型的返回结果, 示例:
jdbcDF .where("id = 1 or c1 = 'b'" ).show()
(2)filter:根据字段进行筛选 传入筛选条件表达式,得到DataFrame类型的返回结果。和where使用条件相同 示例:
jdbcDF .filter("id = 1 or c1 = 'b'" ).show()(1)select:获取指定字段值 根据传入的String类型字段名,获取指定字段的值,以DataFrame类型返回 示例:
jdbcDF.select( "id" , "c3" ).show( false)
转载于:https://www.cnblogs.com/Vowzhou/p/10867966.html