Apache Spark™ 2.0: Extend Structured Streaming for Spark ML

Early methods to integrate machine learning using Naive Bayes and custom sinks.

To learn more about Structured Streaming and Machine Learning, check out Holden Karau’s and Seth Hendrickson’s session Spark Structured Streaming for machine learning at Strata + Hadoop World New York from 2:05pm to 2:45pm, Thursday September 29th.

Slides from Holden and Seth's presentation are available here.

This post was originally published by O'Reilly.

Spark’s new ALPHA Structured Streaming API has caused a lot of excitement because it brings the Dataset/DataFrame/SQL APIs into a streaming context. In this initial version of Structured Streaming, the machine learning APIs have not yet been integrated. However, this doesn’t stop us from having fun exploring how to get machine learning to work with Structured Streaming. (Simply keep in mind this is exploratory, and things will change in future versions.)

For our Spark Structured Streaming for machine learning talk at Strata + Hadoop World New York 2016, we’ve started early proof-of-concept work to integrate Structured Streaming and machine learning, which is available in the spark-structured-streaming-ml repo. If you are interested in following along with the progress toward Spark's ML pipelines supporting Structured Streaming, I encourage you to follow SPARK-16424 and give us your feedback on our early draft design document.

One of the simplest streaming machine learning algorithms you can implement on top of structured streaming is Naive Bayes, since much of the computation can be simplified to grouping and aggregating. The challenge is how to collect the aggregate data in such a way that you can use it to make predictions. The approach taken in the current streaming Naive Bayes won’t directly work, as the ForeachSink available in Spark Structured Streaming executes the actions on the workers, so you can’t update a local data structure with the latest counts.

Instead, Spark's Structured Streaming has an in-memory table output format you can use to store the aggregate counts.

    import org.apache.spark.sql.functions.count
    import org.apache.spark.sql.streaming.OutputMode
    import ds.sparkSession.implicits._

    // Compute the counts using a Dataset transformation
    val counts = ds.flatMap{
      case LabeledPoint(label, vec) =>
        // Pair each feature value with its 1-based position to form a token
        vec.toArray.zip(Stream from 1).map(value => LabeledToken(label, value))
    }.groupBy($"label", $"value").agg(count($"value").alias("count"))
    // Create a table name to store the output in
    val tblName = "qbsnb" + java.util.UUID.randomUUID.toString.filter(_ != '-').toString
    // Write out the aggregate result in complete form to the in memory table
    val query = counts.writeStream.outputMode(OutputMode.Complete())
      .format("memory").queryName(tblName).start()
    val tbl = ds.sparkSession.table(tblName).as[LabeledTokenCounts]
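To make the "use it to make predictions" step concrete, here is a minimal sketch in plain Scala (no Spark) of how a table of per-label token counts could be turned into a Naive Bayes prediction. The `TokenCount` case class and the `CountsNaiveBayes.predict` helper are hypothetical names invented for this illustration, not part of the spark-structured-streaming-ml repo. The sketch also assumes string tokens, add-one smoothing, and a class prior estimated from token totals rather than document counts.

```scala
// Hypothetical stand-in for one row of the aggregated counts table.
final case class TokenCount(label: Double, token: String, count: Long)

object CountsNaiveBayes {
  // Score each label as log P(label) + sum of log P(token | label),
  // with add-one (Laplace) smoothing, and return the best-scoring label.
  // Assumes each (label, token) pair appears at most once in `counts`.
  def predict(counts: Seq[TokenCount], tokens: Seq[String]): Double = {
    require(counts.nonEmpty, "need at least one count row")
    val vocabSize = counts.map(_.token).distinct.size
    val grandTotal = counts.map(_.count).sum.toDouble
    val scores = counts.groupBy(_.label).map { case (label, rows) =>
      val labelTotal = rows.map(_.count).sum.toDouble
      val tokenCounts = rows.map(r => r.token -> r.count).toMap
      // Simplification: the prior comes from token totals per label.
      val logPrior = math.log(labelTotal / grandTotal)
      val logLikelihood = tokens.map { t =>
        math.log((tokenCounts.getOrElse(t, 0L) + 1.0) / (labelTotal + vocabSize))
      }.sum
      label -> (logPrior + logLikelihood)
    }
    scores.maxBy(_._2)._1
  }
}
```

In a real job, the rows would presumably come from collecting the in-memory table on the driver. That is only practical while the number of distinct (label, token) pairs stays small.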

The initial approach taken with Naive Bayes is not easily generalizable to other algorithms, which cannot as easily be represented by aggregate operations on a Dataset. Looking back at how the early DStream-based Spark Streaming API implemented machine learning can provide some hints on one possible solution. Provided you can come up with an update mechanism on how to merge new data into your existing model, the DStream foreachRDD solution allows you to access the underlying micro-batch view of the data. Sadly, foreachRDD doesn't have a direct equivalent in Structured Streaming, but by using a custom sink, you can get similar behavior in Structured Streaming.
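The "update mechanism" idea can be sketched without any Spark dependency. The `RunningCounts` class below is a hypothetical illustration, not an API from Spark or the repo: it holds a running model and merges each micro-batch into it, which is the role a foreachRDD callback (or a custom sink) would play. It assumes each batch is small enough to be handled as a local collection.

```scala
// Hypothetical running model; nothing here is a Spark API.
final class RunningCounts {
  private var counts = Map.empty[(Double, String), Long]

  // Merge one micro-batch of (label, token) observations into the model.
  def update(batch: Seq[(Double, String)]): Unit = synchronized {
    // Count occurrences within the batch first...
    val batchCounts = batch.groupBy(identity).map { case (k, v) => k -> v.size.toLong }
    // ...then add them to the totals accumulated from earlier batches.
    counts = batchCounts.foldLeft(counts) { case (acc, (k, n)) =>
      acc.updated(k, acc.getOrElse(k, 0L) + n)
    }
  }

  // Immutable view of the current totals, safe to read between batches.
  def snapshot: Map[(Double, String), Long] = synchronized(counts)
}
```

Because the merge only adds counts, the order in which batches arrive does not affect the final totals. That property is what makes count-based models such as Naive Bayes easy to update incrementally.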

The sink API is defined by StreamSinkProvider, which is used to create an instance of the Sink given a SQLContext and settings about the sink, and Sink trait, which is used to process the actual data on a batch basis.

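import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources.StreamSinkProvider
import org.apache.spark.sql.streaming.OutputMode

abstract class ForeachDatasetSinkProvider extends StreamSinkProvider {
  // Called once per micro-batch with that batch's data
  def func(df: DataFrame): Unit

  def createSink(
      sqlContext: SQLContext,
      parameters: Map[String, String],
      partitionColumns: Seq[String],
      outputMode: OutputMode): ForeachDatasetSink = {
    new ForeachDatasetSink(func)
  }
}

case class ForeachDatasetSink(func: DataFrame => Unit)
    extends Sink {
  // Hand each batch to the user-supplied function
  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    func(data)
  }
}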

As with writing DataFrames to custom formats, you use a third-party sink by specifying its full class name. Because Spark constructs the sink from that class name, you never get access to the sink object it creates, so the model has to live outside the sink, where any instance of the SinkProvider can update it.

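object SimpleStreamingNaiveBayes {
  val model = new StreamingNaiveBayes()
}

class StreamingNaiveBayesSinkProvider extends ForeachDatasetSinkProvider {
  override def func(df: DataFrame): Unit = {
    val spark = df.sparkSession
    // Update the shared model with this batch (the `update` method name is assumed)
    SimpleStreamingNaiveBayes.model.update(df)
  }
}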

You can use the custom sink shown above to integrate machine learning into Structured Streaming while you wait for Spark ML to add Structured Streaming support.

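  // Train using the model inside the SimpleStreamingNaiveBayes object
  // - if called on multiple streams, all streams will update the same model :(
  //   (or they would, if the hard-coded query name did not prevent more than
  //   one such query from running at a time)
  def train(ds: Dataset[_]) = {
    ds.writeStream.format(
      "com.highperformancespark.examples.structuredstreaming." +
        "StreamingNaiveBayesSinkProvider")
      .queryName("trainingnaiveBayes") // hard-coded; this exact name is an assumption
      .start()
  }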

If you are willing to throw caution to the wind, you can access some Spark internals to construct a sink that behaves more like the original foreachRDD. If you are interested in custom sink support, you can follow SPARK-16407 or this PR.

The cool part is, regardless of whether you want to access the internal Spark APIs, you can now handle batch updates in the same way Spark’s earlier streaming machine learning is implemented.

While this certainly isn't ready for production usage, you can see that the Structured Streaming API offers a number of different ways it can be extended to support machine learning.

You can learn more in High Performance Spark: Best Practices for Scaling and Optimizing Apache Spark.

Follow Holden on Twitter: @holdenkarau

