Pearson correlation aggregation on Apache Spark SQL

  For Apache Spark 1.6, I’ve been working to add Pearson correlation aggregation functionality to Spark SQL. An aggregation function is one kind of expression in Spark SQL: it can be used with the GROUP BY clause in SQL queries or with the DSL syntax of the DataFrame/Dataset APIs, and common examples are sum, count, and so on. At first glance, Pearson correlation might not seem connected to Spark SQL aggregation functions. However, since we want to compute a single value from a group of data, it fits the nature of an aggregation function naturally. All aggregation functions implemented as expressions are located in the file sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/functions.scala. The definition is:
case class Corr(
    left: Expression,
    right: Expression,
    mutableAggBufferOffset: Int = 0,
    inputAggBufferOffset: Int = 0)
  extends ImperativeAggregate
In the snippet, left and right are the expressions (typically two columns of your DataFrame) whose Pearson correlation we want to compute. The parameters mutableAggBufferOffset and inputAggBufferOffset are required by the Spark SQL aggregation framework; they specify the positions of this expression's fields in the aggregation buffer during execution, and you can usually leave them at their default values.

Spark SQL has two kinds of aggregation functions: ImperativeAggregate and DeclarativeAggregate. An ImperativeAggregate provides a fixed interface consisting of initialize(), update(), and merge() functions that operate on aggregation buffers, while a DeclarativeAggregate is implemented with Catalyst expressions. Whichever kind you choose, your aggregation function must define the schema (aggBufferSchema) and attributes (aggBufferAttributes) of the aggregation buffer it uses. For the Corr aggregation function, they are:
def aggBufferSchema: StructType = StructType.fromAttributes(aggBufferAttributes)

val aggBufferAttributes: Seq[AttributeReference] = Seq( 
   AttributeReference("xAvg", DoubleType)(), 
   AttributeReference("yAvg", DoubleType)(),
   AttributeReference("Ck", DoubleType)(), 
   AttributeReference("MkX", DoubleType)(), 
   AttributeReference("MkY", DoubleType)(), 
   AttributeReference("count", LongType)())
The code specifies six columns in the aggregation buffer and defines their attribute names and data types. Because Corr implements the ImperativeAggregate interface, it needs to implement the initialize(), update(), and merge() functions. The initialize() function has the signature:

def initialize(buffer: MutableRow): Unit

Its goal is to initialize the aggregation buffer. As you can see, the aggregation buffer is actually a row; we put the initial values into it. The signature for the update() function is:
def update(buffer: MutableRow, input: InternalRow): Unit
It’s responsible for updating the content of the aggregation buffer with an input row. Corr’s update() function follows the Pearson correlation algorithm to update the buffer; in other words, it accumulates the covariance statistics over all rows in the same partition. Then the merge() function merges two aggregation buffers coming from two partitions:
def merge(buffer1: MutableRow, buffer2: InternalRow): Unit
The final evaluation calls the eval() function, which computes the final result of the aggregation from the content of the aggregation buffer:
def eval(buffer: InternalRow): Any
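To make the roles of these functions concrete, here is a minimal plain-Scala sketch of the running statistics that the buffer fields xAvg, yAvg, Ck, MkX, MkY, and count stand for. This is my own illustration of the standard one-pass Pearson correlation algorithm, not the actual Spark source; CorrBuffer is a hypothetical stand-in for the MutableRow aggregation buffer:

case class CorrBuffer(            // stand-in for the six-column aggregation buffer
    var xAvg: Double = 0.0,       // running mean of x
    var yAvg: Double = 0.0,       // running mean of y
    var Ck: Double = 0.0,         // running co-moment of x and y
    var MkX: Double = 0.0,        // running sum of squared deviations of x
    var MkY: Double = 0.0,        // running sum of squared deviations of y
    var count: Long = 0L)         // number of rows seen so far

// update(): fold one input row (x, y) into the buffer.
def update(b: CorrBuffer, x: Double, y: Double): Unit = {
  b.count += 1
  val deltaX = x - b.xAvg
  val deltaY = y - b.yAvg
  b.xAvg += deltaX / b.count
  b.yAvg += deltaY / b.count
  b.Ck  += deltaX * (y - b.yAvg)  // uses the updated yAvg
  b.MkX += deltaX * (x - b.xAvg)  // uses the updated xAvg
  b.MkY += deltaY * (y - b.yAvg)
}

// merge(): combine the partial statistics of two partitions into b1.
def merge(b1: CorrBuffer, b2: CorrBuffer): Unit = if (b2.count > 0) {
  val n = b1.count + b2.count
  val deltaX = b2.xAvg - b1.xAvg
  val deltaY = b2.yAvg - b1.yAvg
  val w = b1.count.toDouble * b2.count / n
  b1.Ck  += b2.Ck  + deltaX * deltaY * w
  b1.MkX += b2.MkX + deltaX * deltaX * w
  b1.MkY += b2.MkY + deltaY * deltaY * w
  b1.xAvg += deltaX * b2.count / n
  b1.yAvg += deltaY * b2.count / n
  b1.count = n
}

// eval(): Pearson correlation coefficient from the accumulated statistics.
def eval(b: CorrBuffer): Double = b.Ck / math.sqrt(b.MkX * b.MkY)

In the actual expression these values live in the MutableRow buffer at the offsets described earlier, initialize() writes the starting values (presumably the zeros used as defaults above), and eval() produces Ck / sqrt(MkX * MkY), the Pearson correlation coefficient.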
With the Pearson correlation aggregation function implemented, we can now compute this measure between two columns of a DataFrame:
import sqlContext.implicits._               // for toDF on a local Seq
import org.apache.spark.sql.functions.corr
val df = Seq.tabulate(10)(i => (1.0 * i, 2.0 * i)).toDF("a", "b")
val result = df.groupBy().agg(corr("a", "b")).collect()
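Because the expression is also exposed as a SQL function, the same computation can be written as a SQL query, including with a GROUP BY clause as mentioned at the beginning. The following is a hedged sketch assuming Spark 1.6's SQLContext; the table name points and the grouping column g are hypothetical and would need to exist in your own data:

// Hypothetical example: expose the DataFrame to SQL under the name "points".
df.registerTempTable("points")
// Correlation over the whole table...
val overall = sqlContext.sql("SELECT corr(a, b) FROM points").collect()
// ...or one correlation per group, assuming a grouping column "g" exists.
val perGroup = sqlContext.sql("SELECT g, corr(a, b) FROM points GROUP BY g").collect()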
