使用SparkSQL读取数据库数据并返回dataframe,感觉都要被各种示例写烂了,本文大体上是没有新意的,只不过加了些细节,对需要的人的而言还是比较重要的。
此外,示例方法均是使用Java编写,为什么不用Scala呢,实在是语法糖对于我这样的水平最多只到泛型为止的人而言,过于抽象了,过了一个月就不太记得之前写的是啥了,还是习惯明确对象。
介绍下几块细节吧。
驱动使用getDriver方法获取,注意其中的hbase指的是phoenix而不是原生HBase。
使用sql语句获取数据并进行处理。答案就在dbtable参数中传入sql 语句并且sql语句需要包裹一层,并且另起一个别名作为表名。
option("dbtable", "(select * from table) wangleai") 在使用oracle的时候,可以使用sessionInitStatement参数在会话创建之后读取数据之前,执行自定义sql语句,一般用于修改会话的相关配置。注意此参数只有在2.3版本以上才有。官网链接 至于我代码里写的sql的作用,那是用来改变日期类型和时间戳类型的默认时间格式,这样在sql语句里就可以直接这么写了(用于增量是巨好用的)。 option("oracle.jdbc.mapDateToTimestamp", "false").option("sessionInitStatement", initSql) select * from table where datefield > '2019-11-01 00:00:00' 这个是重头戏了,那就是分区字段!如果需要获取并处理超大规模的数据时,建议一定要有分区字段,用于partitionColumn, numPartitions,lowerBound, upperBound这四个选项。 partitionColumn字段目前仅支持数字、日期或者时间戳,用于Spark程序使用此字段内容将数据分成多个区块去执行。numPartitions决定了要分成多少个区域,lowerBound和upperBound分别使用partitionColumn的上届和下届,配合numPartitions决定每个区域使用哪些数据。 注意的是,partitionColumn, lowerBound, upperBound三者必须同时出现。而且使用分区字段时,查询的sql语句也需要加入分区字段。 (代码示例中使用的是一个自增长的数字类型ID作为分区字段) option("lowerBound", minNum).option("upperBound", maxNum).option("numPartitions", numPartitions + "").option("partitionColumn", partitionField) 然后就是代码了: public static void main(String[] args) { //自己设置Spark配置O! SparkConf conf = new SparkConf().setAppName("wangleai"); SparkSession spark = SparkSession.builder().config(conf).getOrCreate(); Map<String, String> jdbcConfig = new HashMap<>(8); //数据库类型 jdbcConfig.put("database", "mysql"); //连接字符串 jdbcConfig.put("url", "jdbc:mysql://localhost:3306/mydb"); //用户名 jdbcConfig.put("username", "username"); //密码 jdbcConfig.put("password", "password"); //查询sql,在Spark中本人比较习惯处理sql的查询结果 jdbcConfig.put("sql", "Select * from mytable"); //分区字段,这个比较重要,特别是数据量大的时候 jdbcConfig.put("partitionfield", ""); Dataset<Row> jdbcDf = getJdbcDf(jdbcConfig, spark); jdbcDf.show(); jdbcDf.foreachPartition(partition -> { while (partition.hasNext()) { Row row = partition.next(); //对每一行做任何你想做的处理 System.out.println(row); } }); } /** * 获取jdbc df * * @param jdbcConfig 配置 * @param spark 会话 * @return */ private static Dataset<Row> getJdbcDf(Map<String, String> jdbcConfig, SparkSession spark) { String dbType = jdbcConfig.get("database"); String url = jdbcConfig.get("url"); String userName = jdbcConfig.get("username"); String password = jdbcConfig.get("password"); //直接使用表名 //String exeSql = jdbcConfig.get("sql"); //使用sql查询语句 String exeSql = String.format("(%s) wangleai", jdbcConfig.get("sql")); String partitionField = jdbcConfig.get("partitionfield"); String driver = getDriver(dbType); DataFrameReader dataFrameReader = spark.read() .format("jdbc") .option("url", url) .option("user", userName) .option("password", password) .option("fetchsize", 200) .option("driver", driver); if ("oracle".equalsIgnoreCase(dbType)) { //修改oracle会话默认时间格式 String initSql = "BEGIN " + "EXECUTE IMMEDIATE 'ALTER SESSION SET NLS_DATE_FORMAT=\"YYYY-MM-DD HH24:MI:SS\"';" + "EXECUTE IMMEDIATE 'ALTER SESSION SET NLS_TIMESTAMP_FORMAT=\"YYYY-MM-DD HH24:MI:SS\"';" + "END;"; //此参数需要保证Spark版本大于2.3,从而在获取数据前修改会话的一些配置 dataFrameReader.option("oracle.jdbc.mapDateToTimestamp", "false").option("sessionInitStatement", initSql); } Dataset<Row> jdbcDf; if (!"".equals(partitionField)) { // 采用分区读取数据 exeSql = exeSql.replaceAll("(?i)from", "," + partitionField + " from"); // 每批数量 int minNum = 1; int maxNum = 100000000; int pageNum = 10000 * 10; long numPartitions = (maxNum - minNum) / pageNum + 1; jdbcDf = dataFrameReader.option("dbtable", exeSql) .option("lowerBound", minNum) .option("upperBound", maxNum) .option("numPartitions", numPartitions + "") .option("partitionColumn", partitionField).load(); } else { jdbcDf = dataFrameReader.option("dbtable", exeSql).load(); } return jdbcDf; } /** * 获取驱动类 * * @param dataBase 数据库类型 * @return */ private static String getDriver(String dataBase) { String driver = ""; switch (dataBase.toLowerCase()) { case "hive": driver = "org.apache.hive.jdbc.HiveDriver"; break; case "hbase": driver = "org.apache.phoenix.jdbc.PhoenixDriver"; break; case "postgresql": driver = "org.postgresql.Driver"; break; case "kylin": driver = "org.apache.kylin.jdbc.Driver"; break; case "mysql": driver = "com.mysql.jdbc.Driver"; break; case "oracle": driver = "oracle.jdbc.driver.OracleDriver"; break; case "sqlserver": driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"; break; default: System.out.println("暂不支持的数据库类型:" + dataBase); break; } return driver; }原本是准备1024写的,但是工作之后时间真的过得实在是太快了,哎,老了。