For Apache Spark 1.6, I’ve been working on adding a Pearson correlation aggregation function to Spark SQL. An aggregation function is a kind of expression in Spark SQL. It can be used with the GROUP BY clause in SQL queries, or with the DSL syntax of the DataFrame/Dataset APIs. Common aggregation functions include sum, count, and so on.
At first glance, Pearson correlation might not seem connected to Spark SQL’s aggregation functions. However, since we want to compute a single value from a group of data, we’ll see that it fits the nature of an aggregation function naturally.
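To make that concrete, here is a minimal, self-contained sketch (plain Scala, no Spark involved) showing how Pearson correlation collapses a whole group of (x, y) pairs into a single scalar, just as sum or count would:

```scala
object PearsonDemo {
  // Pearson correlation over an in-memory group of (x, y) pairs:
  // many input rows in, exactly one value out -- the shape of an
  // aggregation function.
  def pearson(pairs: Seq[(Double, Double)]): Double = {
    val n = pairs.size.toDouble
    val (xs, ys) = pairs.unzip
    val xMean = xs.sum / n
    val yMean = ys.sum / n
    val cov  = pairs.map { case (x, y) => (x - xMean) * (y - yMean) }.sum
    val xVar = xs.map(x => (x - xMean) * (x - xMean)).sum
    val yVar = ys.map(y => (y - yMean) * (y - yMean)).sum
    cov / math.sqrt(xVar * yVar)
  }
}
```

This naive two-pass formula is only for intuition; as we’ll see, the real implementation has to accumulate state incrementally across partitions.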
All aggregation function expressions live in the file sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/functions.scala. The definition is:
case class Corr(
    left: Expression,
    right: Expression,
    mutableAggBufferOffset: Int = 0,
    inputAggBufferOffset: Int = 0)
In the snippet, left and right are the expressions (typically two columns of your DataFrame) between which the Pearson correlation is computed. The other two parameters, mutableAggBufferOffset and inputAggBufferOffset, are required by the Spark SQL aggregation framework: they record where this expression’s fields sit in the aggregation buffer object during execution. Typically, you can leave them at their default values.
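To illustrate what those offsets mean, here is a conceptual plain-Scala sketch, not Spark’s actual internals: when a query runs several aggregate functions at once, their state is packed into one shared buffer, and each function addresses its own slots through its offset. The slot counts and layout below are assumptions chosen purely for illustration.

```scala
object BufferOffsetDemo {
  // Imagine a query computing both AVG(x) and CORR(x, y). Suppose AVG
  // needs 2 buffer slots (sum, count) and CORR needs 6; their state is
  // laid out back to back in one shared buffer, so CORR's
  // mutableAggBufferOffset would be 2 here. (Illustrative numbers only.)
  val avgOffset  = 0
  val corrOffset = 2
  val buffer: Array[Double] = new Array[Double](2 + 6)

  // AVG touches only the slots starting at its own offset.
  def updateAvg(x: Double): Unit = {
    buffer(avgOffset)     += x   // running sum
    buffer(avgOffset + 1) += 1   // running count
  }

  // CORR would likewise touch only buffer(corrOffset) onward.
  def corrSlot(i: Int): Double = buffer(corrOffset + i)
}
```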
Spark SQL has two kinds of aggregation functions: ImperativeAggregate and DeclarativeAggregate. ImperativeAggregate provides a fixed interface of initialize(), update(), and merge() functions that operate on aggregation buffers. DeclarativeAggregate is instead implemented in terms of Catalyst expressions. Regardless of which kind you choose to implement, your aggregation function must define aggBufferSchema and aggBufferAttributes, which describe the schema and attributes of the aggregation buffer it uses.
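As a hedged sketch of the ImperativeAggregate contract, the class below mimics initialize(), update(), and merge() in plain Scala for Pearson correlation. The buffer layout here (count plus raw sums) is a simplification of my own for illustration; Spark’s actual Corr buffer tracks running means and co-moments, which is more numerically stable.

```scala
class CorrSketch {
  // initialize(): the aggregation buffer starts zeroed.
  // Slots: n, sum(x), sum(y), sum(x*y), sum(x*x), sum(y*y)
  val buf: Array[Double] = Array.fill(6)(0.0)

  // update(): fold one input row into this buffer.
  def update(x: Double, y: Double): Unit = {
    buf(0) += 1; buf(1) += x; buf(2) += y
    buf(3) += x * y; buf(4) += x * x; buf(5) += y * y
  }

  // merge(): combine a partial buffer computed on another partition.
  def merge(other: CorrSketch): Unit =
    for (i <- buf.indices) buf(i) += other.buf(i)

  // Final evaluation of the correlation from the buffer contents.
  def eval: Double = {
    val n = buf(0); val sx = buf(1); val sy = buf(2)
    val sxy = buf(3); val sxx = buf(4); val syy = buf(5)
    (n * sxy - sx * sy) / math.sqrt((n * sxx - sx * sx) * (n * syy - sy * sy))
  }
}
```

The key property is that merge() lets partial buffers from different partitions be combined, which is what makes the computation distributable.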
For the Corr aggregation function, the schema and attributes are:
def aggBufferSchema: StructType = StructType.fromAttributes(aggBufferAttributes)
val aggBufferAttributes: Seq[AttributeReference] = Seq(