分组的类型:
The simplest grouping is to just summarize a complete DataFrame by performing an aggregation in a select statement.
A “group by” allows you to specify one or more keys as well as one or more aggregation functions to transform the value columns.
A “window” gives you the ability to specify one or more keys as well as one or more aggregation functions to transform the value columns. However, the rows input to the function are somehow related to the current row.
A “grouping set,” which you can use to aggregate at multiple different levels. Grouping sets are available as a primitive in SQL and via rollups and cubes in DataFrames.
A “rollup” makes it possible for you to specify one or more keys as well as one or more aggregation functions to transform the value columns, which will be summarized hierarchically.
A “cube” allows you to specify one or more keys as well as one or more aggregation functions to transform the value columns, which will be summarized across all combinations of columns.
每次分组产生RelationalGroupedDataset。
在交互式查询与hot Analysis中,Spark提供了精度与速度的权衡。
count() 最简单的聚合,是一个action,不是transformation。
val dataPath = "data/retail-data/all/*.csv" val df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(dataPath).coalesce(5) df.cache() df.createOrReplaceTempView("dfTable") df.count()we can do one of two things: specify a specific column to count, or all the columns by using count(*) or count(1) to represent that we want to count every row as the literal one
import org.apache.spark.sql.functions.count df.select(count("StockCode")).show()当count(*),Spark会计算包含null的行,当单独在某些列上用count时,不计算null
countDistinct // in Scala import org.apache.spark.sql.functions.countDistinct df.select(countDistinct("StockCode")).show() // 4070 approx_count_distinct // in Scala import org.apache.spark.sql.functions.approx_count_distinct df.select(approx_count_distinct("StockCode", 0.1)).show() // 3364You will notice that approx_count_distinct took another parameter with which you can specify the maximum estimation error allowed.这样有很大性能提升。
first、lastThis will be based on the rows in the DataFrame, not on the values in the DataFrame
// in Scala import org.apache.spark.sql.functions.{first, last} df.select(first("StockCode"), last("StockCode")).show()min、max
sum
sumDistinct
avg、mean
Variance and Standard Deviation
By default, Spark performs the formula for the sample standard deviation or variance if you use the variance or stddev functions.
// in Scala import org.apache.spark.sql.functions.{var_pop, stddev_pop} import org.apache.spark.sql.functions.{var_samp, stddev_samp} df.select(var_pop("Quantity"), var_samp("Quantity"), stddev_pop("Quantity"), stddev_samp("Quantity")).show() skewness and kurtosisSkewness and kurtosis are both measurements of extreme points in your data. Skewness measures the asymmetry of the values in your data around the mean, whereas kurtosis is a measure of the tail of data.
import org.apache.spark.sql.functions.{skewness, kurtosis} df.select(skewness("Quantity"), kurtosis("Quantity")).show() Covariance and Correlationcov() 协方差
corr()相关性
import org.apache.spark.sql.functions.{corr, covar_pop, covar_samp} df.select(corr("InvoiceNo", "Quantity"), covar_samp("InvoiceNo", "Quantity"), covar_pop("InvoiceNo", "Quantity")).show() Aggregating to Complex Typescollect_set
collect_list
agg
import org.apache.spark.sql.functions.{collect_set, collect_list} df.agg(collect_set("Country"), collect_list("Country")).show()First we specify the column(s) on which we would like to group, and then we specify.the aggregation(s). The first step returns a RelationalGroupedDataset, and the second step returns a DataFrame.
Grouping with ExpressionsRather than passing that function as an expression into a select statement, we specify it as within agg. This makes it possible for you to pass-in arbitrary expressions that just need to have some aggregation specified.
import org.apache.spark.sql.functions.{count, expr} df.groupBy("InvoiceNo").agg(count("Quantity").alias("quan"),expr("count(Quantity)")).show() Grouping with MapsSometimes, it can be easier to specify your transformations as a series of Maps for which the key is the column, and the value is the aggregation function (as a string) that you would like to perform. You can reuse multiple column names if you specify them inline, as well.
// in Scala df.groupBy("InvoiceNo").agg("Quantity"->"avg", "Quantity"->"stddev_pop").show()Spark supports three kinds of window functions: ranking functions, analytic functions, and aggregate functions.
import org.apache.spark.sql.functions.{col, to_date} val dfWithDate = df.withColumn("date", to_date(col("InvoiceDate"), "MM/d/yyyy H:mm")) dfWithDate.createOrReplaceTempView("dfWithDate")The first step to a window function is to create a window specification. Note that the partition by is unrelated to the partitioning scheme concept that we have covered thus far. It’s just a similar concept that describes how we will be breaking up our group. The ordering determines the ordering within a given partition, and, finally, the frame specification (the rowsBetween statement) states which rows will be included in the frame based on its reference to the current input row.In the following example, we look at all previous rows up to the current row:
// in Scala import org.apache.spark.sql.expressions.Window import org.apache.spark.sql.functions.col val windowSpec = Window .partitionBy("CustomerId", "date") .orderBy(col("Quantity").desc) .rowsBetween(Window.unboundedPreceding, Window.currentRow)Now we want to use an aggregation function to learn more about each specific customer. An example might be establishing the maximum purchase quantity over all time. To answer this, we use the same aggregation functions that we saw earlier by passing a column name or expression. In addition, we indicate the window specification that defines to which frames of data this function will apply:
import org.apache.spark.sql.functions.max val maxPurchaseQuantity = max(col("Quantity")).over(windowSpec)You will notice that this returns a column (or expressions). We can now use this in a DataFrame select statement.Before doing so, though, we will create the purchase quantity rank. To do that we use the dense_rank function to determine which date had the maximum purchase quantity for every customer. We use dense_rank as opposed to rank to avoid gaps in the ranking sequence when there are tied values (or in our case, duplicate rows):
// in Scala import org.apache.spark.sql.functions.{dense_rank, rank} val purchaseDenseRank = dense_rank().over(windowSpec) val purchaseRank = rank().over(windowSpec)Now we can perform a select to view the calculated window values:
// in Scala import org.apache.spark.sql.functions.col dfWithDate.where("CustomerId IS NOT NULL").orderBy("CustomerId") .select( col("CustomerId"), col("date"), col("Quantity"), purchaseRank.alias("quantityRank"), purchaseDenseRank.alias("quantityDenseRank"), maxPurchaseQuantity.alias("maxPurchaseQuantity")).show(100)sometimes we want something a bit more complete—an aggregation across multiple groups. We achieve this by using grouping sets. Grouping sets are a low-level tool for combining sets of aggregations together. They give you the ability to create arbitrary aggregation in their group-by statements.
// in Scala val dfNoNull = dfWithDate.drop() dfNoNull.createOrReplaceTempView("dfNoNull") //传统方法 spark.sql("""SELECT CustomerId, stockCode, sum(Quantity) FROM dfNoNull GROUP BY customerId, stockCode ORDER BY CustomerId DESC, stockCode DESC""").show(100) //Grouping Sets spark.sql("""SELECT CustomerId, stockCode, sum(Quantity) FROM dfNoNull GROUP BY customerId, stockCode GROUPING SETS((customerId, stockCode)) ORDER BY CustomerId DESC, stockCode DESC""").show()警告:Grouping sets depend on null values for aggregation levels. If you do not filter-out null values, you will get incorrect results. This applies to cubes, rollups, and grouping sets.
if you also want to include the total number of items, regardless of customer or stock code? With a conventional group-by statement, this would be impossible. But,it’s simple with grouping sets: we simply specify that we would like to aggregate at that level, as well, in our grouping set. This is, effectively, the union of several different groupings together: ?:什么意思
```scala
spark.sql("""
SELECT CustomerId, stockCode, sum(Quantity) FROM dfNoNull
GROUP BY customerId, stockCode GROUPING SETS((customerId, stockCode),())
ORDER BY CustomerId DESC, stockCode DESC
""").show()
//?: 与传统方法的结果一样
The GROUPING SETS operator is only available in SQL. To perform the same in DataFrames, you use the rollup and cube operators—which allow us to get the same results.
When we set our grouping keys of multiple columns, Spark looks at those as well as the actual combinations that are visible in the dataset. A rollup is a multidimensional aggregation that performs a variety of group-by style calculations for us.
val rolledUpDF = dfNoNull.rollup("Date", "Country").agg(sum("Quantity")) .selectExpr("Date", "Country", "`sum(Quantity)` as total_quantity") .orderBy("Date") rolledUpDF.show()A null in both rollup columns specifies the grand total across both of those columns:
rolledUpDF.where("Country IS NULL").show() rolledUpDF.where("Date IS NULL").show()A cube takes the rollup to a level deeper. Rather than treating elements hierarchically, a cube does the same thing across all dimensions. This means that it won’t just go by date over the entire time period, but also the country.
// in Scala dfNoNull.cube("Date", "Country").agg(sum(col("Quantity"))) .select("Date", "Country", "sum(Quantity)").orderBy("Date").show()Sometimes when using cubes and rollups, you want to be able to query the aggregation levels so that you can easily filter them down accordingly. We can do this by using the grouping_id, which gives us a column specifying the level of aggregation that we have in our result set.
// in Scala import org.apache.spark.sql.functions.{grouping_id, sum, expr} dfNoNull.cube("customerId", "stockCode").agg(grouping_id(), sum("Quantity")) .orderBy(expr("grouping_id()").desc) .show()Pivots make it possible for you to convert a row into a column.
// in Scala val pivoted = dfWithDate.groupBy("date").pivot("Country").sum()This DataFrame will now have a column for every combination of country, numeric variable, and a column specifying the date.
pivoted.where("date > '2011-12-05'").select("date" ,"`USA_sum(Quantity)`").show()User-defined aggregation functions (UDAFs) are a way for users to define their own aggregation functions based on custom formulae or business rules. You can use UDAFs to compute custom calculations over groups of input data (as opposed to single rows). Spark maintains a single AggregationBuffer to store intermediate results for every group of input data.
To create a UDAF, you must inherit from the UserDefinedAggregateFunction base class and implement the following methods:
inputSchema represents input arguments as a StructType
bufferSchema represents intermediate UDAF results as a StructType
dataType represents the return DataType
deterministic is a Boolean value that specifies whether this UDAF will return the same result for a given input
initialize allows you to initialize values of an aggregation buffer
update describes how you should update the internal buffer based on a given row
merge describes how two aggregation buffers should be merged
evaluate will generate the final result of the aggregation
定义UADF
// in Scala import org.apache.spark.sql.expressions.MutableAggregationBuffer import org.apache.spark.sql.expressions.UserDefinedAggregateFunction import org.apache.spark.sql.Row import org.apache.spark.sql.types._ class BoolAnd extends UserDefinedAggregateFunction { def inputSchema: org.apache.spark.sql.types.StructType = StructType(StructField("value", BooleanType) :: Nil) def bufferSchema: StructType = StructType( StructField("result", BooleanType) :: Nil ) def dataType: DataType = BooleanType def deterministic: Boolean = true def initialize(buffer: MutableAggregationBuffer): Unit = { buffer(0) = true } def update(buffer: MutableAggregationBuffer, input: Row): Unit = { buffer(0) = buffer.getAs[Boolean](0) && input.getAs[Boolean](0) } def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { buffer1(0) = buffer1.getAs[Boolean](0) && buffer2.getAs[Boolean](0) } def evaluate(buffer: Row): Any = { buffer(0) } }注册及使用
// in Scala val ba = new BoolAnd spark.udf.register("booland", ba) import org.apache.spark.sql.functions._ spark.range(1) .selectExpr("explode(array(TRUE, TRUE, TRUE)) as t") .selectExpr("explode(array(TRUE, FALSE, TRUE)) as f", "t") .select(ba(col("t")), expr("booland(f)")) .show()in Spark 2.3, you will also be able to call Scala or Java UDFs and UDAFs by registering the function just as we showed in the UDF.
转载于:https://www.cnblogs.com/DataNerd/p/10399778.html
相关资源:JAVA上百实例源码以及开源项目