Many of you will have used Spark SQL – a powerful component on top of Spark Core that implements the data abstraction called DataFrames, which supports a wide range of capabilities to query and manipulate structured and semi-structured data.
Using DataFrames, Spark SQL allows you to query structured data inside Spark programs, using either SQL or the DataFrame API.
Spark SQL can act as a distributed SQL query engine and includes a cost-based optimizer. It scales to multi-hour queries on the Spark engine while supporting mid-query fault tolerance. Spark SQL also supports a persistent, columnar storage format called Parquet, which we have found to significantly improve the performance of sparse-column queries.

[Apache Parquet](http://parquet.apache.org) is a columnar storage format that is widely used and supported within the Hadoop ecosystem. It is integrated with a variety of frameworks (e.g. Hive, Pig, Cascading, Crunch, Scalding, and Spark), which gives you a choice of query engines for building powerful, efficient data analysis pipelines. Columnar storage pivots the traditional row-based tabular model of relational data so that columns become the primary unit of serialization.

[![Columnar Data](/content/images/2015/10/Columnar-Data.jpg?resize=565%2C175)](/content/images/2015/10/Columnar-Data.jpg)

This makes columnar stores especially good for sparse-column queries, which involve only a small subset of the columns within a table. Aggregation operations such as SUM() and AVG() are a typical case: they process most or all of the values from a single column. Queries against a Parquet table can therefore retrieve and analyze the values from any one column quickly and with minimal I/O.

Moreover, the self-describing and persistent nature of Parquet files makes them very easy to work with and share, which opens up a wide range of options for implementing complex analytic pipelines. Spark SQL supports both reading and writing Parquet files, and writing one materializes the data while preserving the schema of the original data.
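    from pyspark.sql import SQLContext

    # `sc` is an existing SparkContext (for example, the one the PySpark shell provides)
    sqlContext = SQLContext(sc)

    # Read a JSON file into a DataFrame and show two of its columns
    df = sqlContext.read.json("examples/src/main/resources/people.json")
    df.select(df['name'], df['age']).show()

    # Load a Parquet file (Parquet is the default format for load/save) and write a two-column subset
    df = sqlContext.read.load("examples/src/main/resources/users.parquet")
    df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")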
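To make the row-versus-column distinction concrete, here is a minimal sketch in plain Python. It is not how Parquet is implemented: lists and dicts stand in for on-disk storage, and the records, field names, and helper functions are all hypothetical. It only illustrates why an aggregate over one column touches far fewer values in a columnar layout.

```python
# Illustrative only: in-memory structures stand in for on-disk storage.
# The records and field names are made up for this example.
rows = [
    {"name": "Ann", "age": 34, "favorite_color": "red"},
    {"name": "Bob", "age": 27, "favorite_color": None},
    {"name": "Cy", "age": 45, "favorite_color": "blue"},
]


def to_columns(records):
    """Pivot a list of row dicts into a dict of column lists.

    Assumes every record has the same fields.
    """
    columns = {}
    for record in records:
        for field, value in record.items():
            columns.setdefault(field, []).append(value)
    return columns


def avg_from_rows(records, field):
    """Row layout: each whole record is read to pull out one field."""
    values_touched = 0
    total = 0
    for record in records:
        values_touched += len(record)  # the full row is read
        total += record[field]
    return total / len(records), values_touched


def avg_from_columns(columns, field):
    """Column layout: only the requested column is read."""
    column = columns[field]
    return sum(column) / len(column), len(column)


columns = to_columns(rows)
row_avg, row_touched = avg_from_rows(rows, "age")
col_avg, col_touched = avg_from_columns(columns, "age")

# Same answer either way, but the columnar path reads fewer values.
print("row layout:   ", row_avg, "values read:", row_touched)
print("column layout:", col_avg, "values read:", col_touched)
```

The gap between the two counts grows with the width of the table, which is why the benefit is most visible on wide tables where a query needs only a few columns.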
All of these benefits are being leveraged in the IBM project with the SETI Institute, as described below.
Analyzing Radio Signal Data from the Allen Telescope Array
The IBM Emerging Technology jStart team is engaged in a project with the SETI Institute to analyze several terabytes of radio telescope signal data. The SETI Institute’s mission is to explore, understand and explain the origin and nature of life in the universe. A central element of the Institute’s operations is the Allen Telescope Array (ATA) located in the Hat Creek Radio Observatory in California. This phased array observatory combines over 40 radio dishes to look for faint signals which may betray the presence of intelligent extraterrestrial life.
The ATA data archive comprises two data types: 15 million binary recordings of signals that are of potential interest (“Complex Amplitude” files), and a 200-million-row table that registers every signal event detected by the ATA in the last decade (“SignalDB”).
Mining the ATA data for patterns can help to filter out radio frequency interference (usually human transmissions, such as radar and satellites) and to identify anomalies that warrant further investigation. The large size of the SignalDB table, the relatively static nature of the ATA data archive, and the sparse-column nature of many analytic work streams make Parquet an ideal fit for applying the Spark engine in novel ways.
200 Million Signal Events in a Parquet
The jStart team is using Jupyter notebooks to define analytics pipelines in Python. In this context, Parquet files provide three fundamental benefits to the SETI project:
1) Improved query performance for queries that use only a small subset of the SignalDB columns. Some examples of the performance gains are detailed below.
2) Analytic output that can be used as a new “data baseline”, or launching point, for several other analytic pipelines. For example, a complex calculation on signal Doppler effects can be done once, saved as a Parquet file, and then used as the shared starting point for dozens of other notebooks.
3) A saved “checkpoint” that permits recovery from unexpected errors within a complex, multi-step analytic pipeline. For example, some SignalDB queries involve table joins as the most efficient way to consolidate parallel analytics pipelines. Parquet files can be written at key convergence points to create convenient “re-start” points in the overall notebook pipeline, which mitigates the impact of unexpected platform errors.
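The checkpoint idea in the third point can be sketched as a small "load it if it exists, otherwise compute and save it" helper. This is a hypothetical illustration, not the jStart team's code: `load_or_compute` and its parameters are names invented here, and the existence check uses `os.path.exists`, which only works for local paths (a path on HDFS or an object store would need a different check).

```python
import os


def load_or_compute(path, compute, save, load):
    """Return the checkpoint stored at `path`, creating it first if needed.

    path    -- where the checkpoint lives (local filesystem assumed)
    compute -- zero-argument callable that produces the expensive result
    save    -- callable (result, path) that persists the result
    load    -- callable (path) that reads a saved result back
    """
    if os.path.exists(path):
        # A previous run already reached this point: skip the expensive step.
        return load(path)
    result = compute()
    save(result, path)
    return result


# Hypothetical PySpark wiring. `sqlContext`, `expensive_join`, and the path
# are placeholders, not names from the SETI notebooks:
#
#   joined = load_or_compute(
#       "checkpoints/joined.parquet",
#       compute=expensive_join,
#       save=lambda df, p: df.write.parquet(p),
#       load=lambda p: sqlContext.read.parquet(p),
#   )
```

Passing the load and save steps in as callables keeps the helper independent of Spark, so the same pattern works at every convergence point in a notebook. A more defensive version might also check that the previous write finished completely before trusting the checkpoint.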
With respect to performance gains, the following simple example shows how specific query times can be cut roughly in half. The overall time savings in a lengthy pipeline can be considerably higher, since previously saved and shared Parquet files can eliminate the need for costly “preparation” steps in the analytic process.
In this example, some aggregate values are being computed for the Doppler Drift measures of 200 million signal events, grouped by the Target ID (which uniquely identifies a celestial target of interest, such as a star with known planets from the Kepler telescope repository). Only two columns are being used to compute these summaries (TgtId and DriftHzs), and the results could form the basis for the statistical analysis that follows.
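A query of the kind described above might look like the following sketch. The column names `TgtId` and `DriftHzs` come from the text, but the choice of aggregates, the table name `signaldb`, the output column names, and both helper functions are assumptions made for illustration; the original queries are not reproduced here.

```python
def drift_summary_sql(table="signaldb"):
    """Build a Spark SQL query that summarizes Doppler drift per target.

    The aggregates (count, mean, min, max) are illustrative choices.
    """
    return (
        "SELECT TgtId, "
        "COUNT(*) AS n_events, "
        "AVG(DriftHzs) AS mean_drift, "
        "MIN(DriftHzs) AS min_drift, "
        "MAX(DriftHzs) AS max_drift "
        "FROM {table} "
        "GROUP BY TgtId"
    ).format(table=table)


def drift_summary(sql_engine, table="signaldb"):
    """Run the summary on any object with a .sql(query) method.

    In PySpark that would be a SQLContext or SparkSession.
    """
    return sql_engine.sql(drift_summary_sql(table))


# Hypothetical usage with a Parquet-backed DataFrame (the path is a placeholder):
#
#   signals = sqlContext.read.parquet("signaldb.parquet")
#   signals.registerTempTable("signaldb")   # createOrReplaceTempView in Spark 2.0+
#   drift_summary(sqlContext).show()

print(drift_summary_sql())
```

Because the query names only two columns, a Parquet-backed table lets Spark read just those two columns from storage, which is where the difference between the two runs comes from.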
The first query is being run against a DataFrame that is mapped to the original source data, clocking in at 436 ms of total CPU time.
The second query is identical except that it uses a DataFrame backed by a Parquet file as its source. In this case, the total CPU time drops by almost half (about 48%), from 436 ms to 228 ms.
In this particular instance, wall clock time was cut by more than half. From here, it might make sense to save these results as another Parquet file, which can be used over days and weeks as the numeric basis for variance calculations, table joins with other analyses, and so on.
In summary, Parquet hits a sweet spot for many common types of structured analysis. Parquet files are self-describing, which makes them easy to save, share and load. Their columnar format yields significant performance gains, especially on wide tables where only a subset of the columns is queried, and the maturity of the format means broad support across a wide range of analytic platforms.
*This post was originally published in KDnuggets. – Ed.*