
Exploring the Apache Spark™ DataSource API

Apache Spark™ provides a pluggable mechanism to integrate with external data sources using the DataSource APIs. These APIs allow Spark to read data from external data sources and to write data analyzed in Spark back out to them. The DataSource APIs also support filter pushdown and column pruning, which can significantly improve the performance of queries. In this blog, we focus only on the read path: getting data from a remote data source into Spark for further analysis.

The DataSource APIs consist of a few traits and an abstract class, which are defined in:

spark/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala  

The following traits are useful for reading data from an external data source:

  • To register a short name for the datasource: DataSourceRegister

  • To create relations: RelationProvider, SchemaRelationProvider

  • To represent the data as a collection of tuples with a schema: BaseRelation

  • To manage scans: TableScan, PrunedScan, and PrunedFilteredScan

See below for a list and description of the traits. For more information, see the Scaladoc for these traits here: https://spark.apache.org/docs/2.0.0/api/scala/index.html

RelationProvider - A factory trait with a single method, createRelation, that returns a BaseRelation. This is the starting point.

@DeveloperApi
trait RelationProvider {
  /**
   * Returns a new base relation with the given parameters.
   * Note: the parameters' keywords are case insensitive and this
   * insensitivity is enforced by the Map that is passed to the function.
   */
  def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation
}
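
To make the contract concrete, here is a minimal sketch of a hypothetical provider that implements RelationProvider and mixes in DataSourceRegister. The class name ExampleSourceProvider, the short name "example", and the "path" option are assumptions for illustration and are not part of Spark.

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hypothetical provider: the entry point Spark calls when this data source is used.
class ExampleSourceProvider extends RelationProvider with DataSourceRegister {

  // Lets users write .format("example") instead of the full class name.
  override def shortName(): String = "example"

  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String]): BaseRelation = {
    // Everything from the OPTIONS clause or DataFrameReader.options() arrives here.
    // (A real implementation would hand `path` to its relation.)
    val path = parameters.getOrElse("path", sys.error("'path' option is required"))
    val ctx = sqlContext
    // A real relation would also mix in one of the scan traits described below.
    new BaseRelation {
      override def sqlContext: SQLContext = ctx
      override def schema: StructType =
        StructType(Seq(StructField("value", StringType)))
    }
  }
}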

SchemaRelationProvider – Creates relations with a given schema. Similar to the RelationProvider’s createRelation method, but it takes the schema as an additional parameter.

def createRelation(
  sqlContext: SQLContext,
  parameters: Map[String, String],
  schema: StructType): BaseRelation
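
For example, with a source that implements SchemaRelationProvider, the user can supply the schema through the DataFrameReader. The short name "example" below refers to a hypothetical data source, not one that ships with Spark.

import org.apache.spark.sql.types.{StringType, StructField, StructType}

// A user-defined schema expressed as a StructType.
val userSchema = StructType(Seq(
  StructField("empid", StringType),
  StructField("empname", StringType)))

// The schema is passed to the source's createRelation as the extra parameter.
val df = spark.read.format("example").schema(userSchema).load()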

BaseRelation - An abstract class that represents a collection of tuples with a given schema. It has 2 unimplemented methods:

def sqlContext: SQLContext
def schema: StructType

Implementations must provide the schema of the data using Spark's StructType.
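
For instance, the schema of the employees table used later in this blog could be described as follows (the column names come from the example query; the data types are assumptions):

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Schema that a hypothetical employees relation would return from BaseRelation.schema.
val employeeSchema: StructType = StructType(Seq(
  StructField("empid", IntegerType, nullable = false),
  StructField("empname", StringType, nullable = true),
  StructField("division", StringType, nullable = true)))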

Scans:

A concrete implementation of a BaseRelation needs to mix in and implement one of the following scan traits to return the tuples as an RDD of Row objects:

TableScan: def buildScan(): RDD[Row]
Performs a complete table scan and returns all the tuples as an RDD of Row objects. Implement this trait if the BaseRelation can perform a full table scan.
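
A minimal sketch of a TableScan relation over a small in-memory dataset; the class name and the data are illustrative assumptions:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

class EmployeeTableScanRelation(override val sqlContext: SQLContext)
  extends BaseRelation with TableScan {

  override def schema: StructType = StructType(Seq(
    StructField("empid", IntegerType),
    StructField("empname", StringType)))

  // Every tuple is returned; Spark applies any filters afterwards.
  override def buildScan(): RDD[Row] = {
    val data = Seq(Row(1, "Alice"), Row(2, "Bob"))
    sqlContext.sparkContext.parallelize(data)
  }
}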

PrunedScan: Implement this trait if a BaseRelation can perform column pruning and return only the requested columns back as an RDD of Row objects.

trait PrunedScan {
  def buildScan(requiredColumns: Array[String]): RDD[Row]
}

PrunedFilteredScan: Implement this trait if the BaseRelation can perform column pruning and filter pushdown and return the tuples as an RDD of Row objects. Be sure the implementation satisfies all the filter conditions.

Pushing the evaluation of the filters down to the remote data source can significantly improve performance, depending on the selectivity of the predicates: only tuples that match the filters are sent from the remote data source to Spark, which reduces the amount of data transferred and improves response time. With TableScan, by contrast, all rows are sent from the remote data source to Spark and the filters are applied inside Spark. If the data source supports filter pushdown, it is best to implement PrunedFilteredScan.

trait PrunedFilteredScan {
  def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]
}
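
A sketch of a PrunedFilteredScan over the same kind of in-memory data; it handles only EqualTo filters and ignores the rest, and the class name and data are assumptions for illustration:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, EqualTo, Filter, PrunedFilteredScan}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

class EmployeePrunedFilteredRelation(override val sqlContext: SQLContext)
  extends BaseRelation with PrunedFilteredScan {

  private val employees = Seq(
    Map("empid" -> 1, "empname" -> "Alice", "division" -> "7H"),
    Map("empid" -> 2, "empname" -> "Bob",   "division" -> "3B"))

  override def schema: StructType = StructType(Seq(
    StructField("empid", IntegerType),
    StructField("empname", StringType),
    StructField("division", StringType)))

  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
    // Apply the pushed-down filters; only EqualTo is handled in this sketch.
    val matching = employees.filter { emp =>
      filters.forall {
        case EqualTo(attribute, value) => emp(attribute) == value
        case _                         => true
      }
    }
    // Return only the requested columns, in the requested order.
    val rows = matching.map(emp => Row.fromSeq(requiredColumns.map(emp(_)).toSeq))
    sqlContext.sparkContext.parallelize(rows)
  }
}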

CatalystScan: An experimental API. Implement this trait if the BaseRelation can perform column pruning and filter pushdown and return the tuples as an RDD of Row objects. The filters are a sequence of Catalyst Expression objects, which are internal to Spark; this is why the API is experimental and might change across Spark releases.

trait CatalystScan {
  def buildScan(requiredColumns: Seq[Attribute], filters: Seq[Expression]): RDD[Row]
}

JDBC DataSource:

Spark ships with several default data source implementations, including JdbcRelationProvider, which provides connectivity to database systems that have a JDBC driver. Dialects for certain popular databases are included by default, including DB2, Derby, Oracle, MySQL, Microsoft SQL Server, and PostgreSQL. For the full list, see the JdbcDialect implementations in the Spark SQL source code.

To see an implementation of the data source, see the JdbcRelationProvider class in
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala in the Spark code.

Let’s look at the usage of this remote data source.

Usage

There are two methods for accessing a remote data source:

  • By creating a table using the JDBC data source
  • By using the DataFrameReader API (spark.read)

The example below shows how to access a remote DB2 table using the JDBC data source in Spark using both methods:

1) Creating a table using the JDBC DataSource

spark.sql("CREATE TABLE employees USING org.apache.spark.sql.jdbc OPTIONS (url 'jdbc:db2://mydb2server.ottawa.xyz.com:51051/GV1022DB', dbtable 'GV1022SL.EMP_REGION', user 'myuser1', password 'mypwd')")

OR

spark.sql("CREATE TABLE employees USING org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider OPTIONS (url 'jdbc:db2://mydb2server.ottawa.xyz.com:51051/GV1022DB', dbtable 'GV1022SL.EMP_REGION', user 'myuser1', password 'mypwd')")
  • The USING clause takes in the data source provider class name or the short name that is defined in the DataSourceRegister.

  • The OPTIONS clause takes in key-value pairs. All the key-value pairs are passed to the RelationProvider.createRelation method in the parameters argument. In the example above, url, dbtable, user, and password are parameters used by the JDBC data source. The JDBC data source also takes other parameters to identify the partition information.

Once the table is created you can perform SQL queries on the data. For example:

spark.sql("select empid, empname from employees where division = '7H'")

2) Using the API:
SparkSession.read.format(..).options(..).load to get a DataFrame.

val df = spark.read.format("org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider").options(Map("url" -> "jdbc:db2://mydb2server.ottawa.xyz.com:51051/GV1022DB",
  "user" -> "myuser1", "dbtable"-> "GV1022SL.EMP_REGION","password" -> "mypwd", "driver" -> "com.ibm.db2.jcc.DB2Driver")).load  

As an alternative, you can specify the short name in the format. The short name for the JDBC Data Source is jdbc.

val df = spark.read.format("jdbc").options(Map("url" -> "jdbc:db2://mydb2server.ottawa.xyz.com:51051/GV1022DB",
  "user" -> "myuser1", "dbtable"-> "GV1022SL.EMP_REGION","password" -> "mypwd", "driver" -> "com.ibm.db2.jcc.DB2Driver")).load  

Next, let’s look at some of the relevant classes of the JDBC DataSource implementation.

  • JdbcRelationProvider extends RelationProvider and mixes in DataSourceRegister
  • JDBCRelation extends BaseRelation and mixes in PrunedFilteredScan and InsertableRelation
  • JDBCRDD extends RDD
  • JDBCPartition extends Partition

JdbcRelationProvider implements the createRelation method and the shortName method from the DataSourceRegister trait. The parameters needed to connect to the remote data source are passed in via the parameters argument of createRelation. This map includes all the information that was passed in via the OPTIONS clause in the CREATE TABLE statement or via options() on the DataFrameReader API.

In this case, the parameters include dbtable, which holds the database table to connect to, the user, the password, and the url, which is the JDBC connection URL. The data source also accepts parameters for partitioning the read across tasks (see the example after the list below):

  • partitionColumn — Represents the column name that is used to partition. This column must be of integer type.
  • lowerBound — Represents the lower bound for the partition column.
  • upperBound — Sets the upper bound value for the partition column.
  • numPartitions — Sets the number of partitions.
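
For example, a partitioned read of the DB2 table from the earlier examples might look like this (EMPID is assumed to be an integer column; the bounds and partition count are illustrative):

val df = spark.read.format("jdbc")
  .option("url", "jdbc:db2://mydb2server.ottawa.xyz.com:51051/GV1022DB")
  .option("dbtable", "GV1022SL.EMP_REGION")
  .option("user", "myuser1")
  .option("password", "mypwd")
  .option("driver", "com.ibm.db2.jcc.DB2Driver")
  .option("partitionColumn", "EMPID")  // column used to split the read
  .option("lowerBound", "1")           // lowest expected value of the column
  .option("upperBound", "100000")      // highest expected value of the column
  .option("numPartitions", "4")        // number of parallel read tasks
  .load()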

In createRelation, if the parameters contain the partition column information, JDBCPartition objects are created. A JDBCPartition encapsulates the information required to generate the appropriate predicates for the 'where' clause of the SQL query for a given partition. createRelation then returns a new instance of JDBCRelation.
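
A simplified sketch of how per-partition where clauses can be derived from partitionColumn, lowerBound, upperBound, and numPartitions; the actual logic in Spark's JDBC data source handles more edge cases:

def partitionWhereClauses(column: String, lower: Long, upper: Long, numPartitions: Int): Seq[String] = {
  val stride = (upper - lower) / numPartitions
  (0 until numPartitions).map { i =>
    val start = lower + i * stride
    val end = start + stride
    if (i == 0)                      s"$column < $end OR $column IS NULL"
    else if (i == numPartitions - 1) s"$column >= $start"
    else                             s"$column >= $start AND $column < $end"
  }
}

// partitionWhereClauses("EMPID", 1, 100000, 4) yields one predicate per partition,
// starting with "EMPID < 25000 OR EMPID IS NULL" and ending with "EMPID >= 74998".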

JDBCRelation implements the PrunedFilteredScan’s buildScan method returning a JDBCRDD. The JDBCRDD extends the RDD of InternalRow objects and implements the compute method that returns an iterator. JDBCRDD overrides the getPartitions and the compute methods from the RDD class:

  • override def getPartitions: Array[Partition] - Returns the partitions, that is, the JDBCPartition instances calculated from the options passed to the JDBC data source's RelationProvider.

  • override def compute(thePart: Partition, context: TaskContext): Iterator[InternalRow] – Implements an iterator, overriding its hasNext and next methods. The SQL query is generated with the where clause that corresponds to the partition and to the SQL string representation of the filters. A JDBC connection is opened to the remote database and records are fetched. For each record fetched, the JDBC data types are converted to Spark's Catalyst data types; the record is converted to an InternalRow and returned via the next method. The implementation also registers a hook, via context.addTaskCompletionListener, that closes the ResultSet, the Statement, and the JDBC connection once the task completes (see the sketch after this list).
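
The following sketch illustrates the pattern compute follows: open a JDBC connection for the task, iterate over the ResultSet, and register a completion hook that closes the resources. The connection details and query reuse the blog's example; the code itself is a simplification, not the actual JDBCRDD implementation.

import java.sql.DriverManager
import org.apache.spark.TaskContext
import org.apache.spark.sql.Row

def readPartition(context: TaskContext, whereClause: String): Iterator[Row] = {
  val conn = DriverManager.getConnection(
    "jdbc:db2://mydb2server.ottawa.xyz.com:51051/GV1022DB", "myuser1", "mypwd")
  val stmt = conn.createStatement()
  val rs = stmt.executeQuery(
    s"SELECT EMPID, EMPNAME FROM GV1022SL.EMP_REGION WHERE $whereClause")

  // Close the ResultSet, Statement and Connection once the task completes.
  context.addTaskCompletionListener { _ =>
    rs.close(); stmt.close(); conn.close()
  }

  new Iterator[Row] {
    private var advanced = false
    private var more = false
    override def hasNext: Boolean = {
      if (!advanced) { more = rs.next(); advanced = true }
      more
    }
    override def next(): Row = {
      if (!hasNext) throw new NoSuchElementException("end of result set")
      advanced = false
      Row(rs.getInt(1), rs.getString(2)) // convert JDBC values to a Spark Row
    }
  }
}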

Column pruning happens when the JDBCRDD is created: only the columns required by the query are fetched from the data source. Each filter the data source supports is converted to a string representation of a SQL expression, and these strings are ANDed together. The partition clause and the filter SQL strings are then ANDed to form the where clause of the SQL query that is executed against the remote data source when the compute method is invoked on the RDD.
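
A simplified sketch of that translation: each supported Filter becomes a SQL predicate string, and the compiled predicates are ANDed into a where clause (the real JDBC data source supports more filter types and quotes values according to the dialect):

import org.apache.spark.sql.sources.{EqualTo, Filter, GreaterThan, IsNotNull, LessThan}

// Values are quoted as strings for simplicity; real code quotes per column type.
def compileFilter(f: Filter): Option[String] = f match {
  case EqualTo(attr, value)     => Some(s"$attr = '$value'")
  case GreaterThan(attr, value) => Some(s"$attr > '$value'")
  case LessThan(attr, value)    => Some(s"$attr < '$value'")
  case IsNotNull(attr)          => Some(s"$attr IS NOT NULL")
  case _                        => None // unsupported filters are not pushed down
}

// AND the compiled predicates together to form the where clause.
def whereClause(filters: Array[Filter]): String = {
  val predicates = filters.flatMap(compileFilter)
  if (predicates.isEmpty) "" else predicates.mkString("WHERE ", " AND ", "")
}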

The next section provides pointers to the class that instantiates the DataSource and also gives an overview of the flow.

When you execute a query, it goes through several phases: parsing, analysis, optimization, physical planning, and execution. The parser parses the query and generates a logical plan. The logical plan then passes through the analyzer, which resolves references to relations and attributes. For example:

select empid, empname from employees where division = '7H'

When the employees table is created, an entry is stored in Spark’s catalog about the table. This is referred to as a datasource table and it stores the information about the datasource class name (that is, the RelationProvider implementation class), and the options that were passed in the create table statement.

In the analysis phase, the analyzer checks whether there is a relation named 'employees' in the catalog. This happens as part of the ResolveRelations rule in the Analyzer. When Spark is set up with the InMemoryCatalog, a SimpleCatalogRelation object containing the table metadata from the catalog is returned. SimpleCatalogRelation is a logical plan node that wraps a CatalogTable, which is the object representation of table metadata stored in Spark's catalog.

The Analyzer has another rule, FindDataSourceTable, which detects that the table is a data source table and converts the SimpleCatalogRelation into a LogicalRelation. DataSource.resolveRelation calls createRelation on the RelationProvider implementation and returns the BaseRelation, which gets wrapped inside the LogicalRelation.

DataSource is the main class responsible for performing the lookup on the datasource RelationProvider implementation class, and instantiating a BaseRelation by calling the RelationProvider.createRelation.

The LogicalRelation holds the BaseRelation in the logical plan.

The DataSourceStrategy class is used to plan the scans over data sources, which happens during the physical planning phase. Based on the scan traits that we saw earlier, the appropriate buildScan is called. RowDataSourceScanExec is the physical plan node for scanning data from a relation.
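
You can observe this in practice by printing the physical plan of the earlier query; the data source scan node lists the pruned columns and the filters that were pushed to the source:

// Shows the physical plan, including the scan over the JDBC relation
// with its selected columns and pushed filters.
spark.sql("select empid, empname from employees where division = '7H'").explain()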


Conclusion:

The DataSource APIs in Apache Spark let you read data from external data sources into Spark for further analysis. Spark also provides APIs for writing data from Spark back out to remote data sources.
