0 to Life-Changing App: New Apache SystemML API on Spark Shell

SystemML on Spark Shell? Yes!

It's a very simple way to use SystemML for all of your machine learning and big data needs. This tutorial will get you set up and running SystemML on the Spark Shell like a star. But first, to refresh your memory, let me remind you that I am on a quest to create a life-changing app! I am new to the world of data science and am currently tackling the challenge of building an app using Apache SystemML and Apache Spark, one step at a time. If you haven't already, make sure to check out my previous tutorials, which start here.

So far we've daydreamed about delightful data, complained about how hard it is to find good data, found good data and learned how to write Scala, and NOW we will learn how to access SystemML from the Spark Shell.

Not familiar with the Spark shell? Here's a great tutorial. Not sure what SystemML is? Look here!

At a high level, SystemML handles the machine learning and mathematical part of your data science project. You can log into the Spark Shell, load SystemML on the shell, load your data and write your linear algebra, statistical equations, matrices, etc. in much shorter code than the Spark shell syntax would need. It helps not only with mathematical exploration and machine learning algorithms, but it also lets you work on Spark, where you can do all of the above with really big data that you couldn't use on your local computer. Focusing on this step of your project, let's walk through how to set your computer up for all of SystemML's assumptions, how to load the Spark Shell, load SystemML, load data and do a few examples in Scala. (I promise a PySpark tutorial will come in the future!)

Now let's get going on our learning. First step: assumptions for SystemML.

Have Java, Scala, wget and Spark installed on your computer. With Homebrew on a Mac, that looks like this:

brew tap caskroom/cask  
brew install Caskroom/cask/java  
brew install scala  
brew install wget  
brew install apache-spark  

Now let's set up SystemML!

Download SystemML.

wget https://sparktc.ibmcloud.com/repo/latest/SystemML.jar  

Now type the following code to access the Spark Shell with SystemML.

spark-shell --executor-memory 4G --driver-memory 4G --jars SystemML.jar  

Now, using the Spark Shell (Scala), import the MLContext for SystemML.

import org.apache.sysml.api.mlcontext._  
import org.apache.sysml.api.mlcontext.ScriptFactory._  
val ml = new MLContext(sc)  


In the future you will just need to do the last two steps to get this going.
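Before moving on, it can help to check that the MLContext is really wired up. Here is a minimal sketch of a "hello world" check, assuming you are inside a Spark Shell that was started with --jars SystemML.jar as above, so that sc is the shell's SparkContext. The name hello is just an illustration.

```scala
// Run inside spark-shell started with --jars SystemML.jar.
// `sc` is the SparkContext that the shell creates for you.
import org.apache.sysml.api.mlcontext._
import org.apache.sysml.api.mlcontext.ScriptFactory._

val ml = new MLContext(sc)

// A one-line DML script: it only prints a message.
val hello = dml("print('hello world')")

// Executing it should print the message in the shell.
ml.execute(hello)
```

If the message shows up, the jar is on the classpath and the MLContext can run DML.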

Let's figure out how to load a script and run it as well as load data and run some examples.

These examples and tons of documentation can also be found here.

Here's a quick example: Script from a URL.
Here s1 is created by reading Univar-Stats.dml from a URL address.

val uniUrl = "https://raw.githubusercontent.com/apache/incubator-systemml/master/scripts/algorithms/Univar-Stats.dml"  
val s1 = ScriptFactory.dmlFromUrl(uniUrl)  

More examples of how to load scripts can be found here.
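As a small sketch of two of those other options, here is how a script can also be built from a string or from a file. It assumes the same Spark Shell session with SystemML.jar loaded. The file name is a hypothetical local path, so swap in a real .dml file before running that line.

```scala
// Run inside spark-shell started with --jars SystemML.jar.
import org.apache.sysml.api.mlcontext.ScriptFactory._

// Script from a string; dml(...) is the short form used later in this tutorial.
val fromString = dmlFromString("print('loaded from a string')")

// Script from a file. "Univar-Stats.dml" is a hypothetical local path:
// point it at a .dml file that exists on your machine, or this call will fail.
val fromFile = dmlFromFile("Univar-Stats.dml")
```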

Our next step is to parallelize two small matrices into RDDs, pass them to a DML script, and get back the sum of the first, the sum of the second and a message.

scala> val data1 = sc.parallelize(Array("1.0,2.0", "3.0,4.0"))  
scala> val data2 = sc.parallelize(Array("5.0,6.0", "7.0,8.0"))  
scala> val s = """  
     | s1 = sum(m1);
     | s2 = sum(m2);
     | if (s1 > s2) {
     |  message = "s1 is greater"
     | } else if (s2 > s1) {
     |  message = "s2 is greater"
     | } else {
     |  message = "s1 and s2 are equal"
     | }
     | """

scala> val script = dml(s).in("m1", data1).in("m2", data2).out("s1", "s2", "message")  

You should get:

script: org.apache.sysml.api.mlcontext.Script =  
[1] (RDD) m1: ParallelCollectionRDD[0] at parallelize at <console>:33
[2] (RDD) m2: ParallelCollectionRDD[1] at parallelize at <console>:33

[1] s1
[2] s2
[3] message

Now print your script info. You should see:

scala> println(script.info)  
Script Type: DML

[1] (RDD) m1: ParallelCollectionRDD[0] at parallelize at <console>:33
[2] (RDD) m2: ParallelCollectionRDD[1] at parallelize at <console>:33

[1] s1
[2] s2
[3] message

Input Parameters:  

Input Variables:  
[1] m1
[2] m2

Output Variables:  
[1] s1
[2] s2
[3] message

Symbol Table:  
[1] (Matrix) m1: Matrix: null, [-1 x -1, nnz=-1, blocks (1 x 1)], csv, not-dirty
[2] (Matrix) m2: Matrix: null, [-1 x -1, nnz=-1, blocks (1 x 1)], csv, not-dirty

Script String:

s1 = sum(m1);  
s2 = sum(m2);  
if (s1 > s2) {  
 message = "s1 is greater"
} else if (s2 > s1) {
 message = "s2 is greater"
} else {
 message = "s1 and s2 are equal"
}

Script Execution String:  
m1 = read('');  
m2 = read('');

s1 = sum(m1);  
s2 = sum(m2);  
if (s1 > s2) {  
 message = "s1 is greater"
} else if (s2 > s1) {
 message = "s2 is greater"
} else {
 message = "s1 and s2 are equal"
}
write(s1, '');  
write(s2, '');  
write(message, '');  

Execute your script and get your results!

scala> val results = ml.execute(script)  
results: org.apache.sysml.api.mlcontext.MLResults =  
[1] (Double) s1: 10.0
[2] (Double) s2: 26.0
[3] (String) message: s2 is greater
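If you want to convince yourself of those numbers, here is a plain Scala sketch that redoes the same arithmetic without Spark or SystemML. The object SumCheck and its method names are made up for this illustration. It only mirrors what the DML script does and is not how SystemML computes it.

```scala
// Plain Scala cross-check of the DML script; no Spark or SystemML needed.
object SumCheck {
  // Parse CSV rows such as "1.0,2.0" into numbers and add them all up.
  def sumCsvRows(rows: Seq[String]): Double =
    rows.flatMap(_.split(",")).map(_.trim.toDouble).sum

  // The same three-way comparison as the if/else block in the DML script.
  def compare(s1: Double, s2: Double): String =
    if (s1 > s2) "s1 is greater"
    else if (s2 > s1) "s2 is greater"
    else "s1 and s2 are equal"
}

val check1 = SumCheck.sumCsvRows(Seq("1.0,2.0", "3.0,4.0")) // 1 + 2 + 3 + 4
val check2 = SumCheck.sumCsvRows(Seq("5.0,6.0", "7.0,8.0")) // 5 + 6 + 7 + 8
println(check1)                            // 10.0
println(check2)                            // 26.0
println(SumCheck.compare(check1, check2))  // s2 is greater
```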

Just as an example, you can pull a result out as a Double and assign it to a value such as x.
Not familiar with Scala? Check this tutorial out!

scala> val x = results.getDouble("s1")  
x: Double = 10.0

scala> val y = results.getDouble("s2")  
y: Double = 26.0

scala> x + y  
res1: Double = 36.0  

Here is another version. Because the API is very Scala friendly, you can pull out your results as a Scala tuple.

scala> val (firstSum, secondSum, sumMessage) = results.getTuple[Double, Double, String]("s1", "s2", "message")  
firstSum: Double = 10.0  
secondSum: Double = 26.0  
sumMessage: String = s2 is greater  

Here is the really handy part. As another example, you can load in your data, type a short bit of code and get a whole table of standard statistical measures for each feature!

Let's first get our data into Spark.
Because this step of our awesome, life-changing data science project/app focuses on mathematical exploration (very soon it will be about machine learning algorithms), we want to make sure our data is clean and ready to go. Let's load in some data and run a SystemML script.

scala> val habermanUrl = "http://archive.ics.uci.edu/ml/machine-learning-databases/haberman/haberman.data"

scala> val habermanList = scala.io.Source.fromURL(habermanUrl).mkString.split("\n")

scala> val habermanRDD = sc.parallelize(habermanList)

scala> val typesRDD = sc.parallelize(Array("1.0,1.0,1.0,2.0"))

scala> val scriptUrl = "https://raw.githubusercontent.com/apache/incubator-systemml/master/scripts/algorithms/Univar-Stats.dml"

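scala> val habermanMetadata = new MatrixMetadata(306, 4)  // the Haberman data has 306 rows and 4 columns

scala> val typesMetadata = new MatrixMetadata(1, 4)  // one row giving the type of each of the 4 features

scala> val script = dmlFromUrl(scriptUrl).in("A", habermanRDD, habermanMetadata).in("K", typesRDD, typesMetadata).in("$CONSOLE_OUTPUT", true)  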
scala> val results = ml.execute(script)

Feature [1]: Scale  
 (01) Minimum             | 30.0
 (02) Maximum             | 83.0
 (03) Range               | 53.0
 (04) Mean                | 52.45751633986928
 (05) Variance            | 116.71458266366658
 (06) Std deviation       | 10.803452349303281
 (07) Std err of mean     | 0.6175922641866753
 (08) Coeff of variation  | 0.20594669940735139
 (09) Skewness            | 0.1450718616532357
 (10) Kurtosis            | -0.6150152487211726
 (11) Std err of skewness | 0.13934809593495995
 (12) Std err of kurtosis | 0.277810485320835
 (13) Median              | 52.0
 (14) Interquartile mean  | 52.16013071895425
Feature [2]: Scale  
 (01) Minimum             | 58.0
 (02) Maximum             | 69.0
 (03) Range               | 11.0
 (04) Mean                | 62.85294117647059
 (05) Variance            | 10.558630665380907
 (06) Std deviation       | 3.2494046632238507
 (07) Std err of mean     | 0.18575610076612029
 (08) Coeff of variation  | 0.051698529971741194
 (09) Skewness            | 0.07798443581479181
 (10) Kurtosis            | -1.1324380182967442
 (11) Std err of skewness | 0.13934809593495995
 (12) Std err of kurtosis | 0.277810485320835
 (13) Median              | 63.0
 (14) Interquartile mean  | 62.80392156862745
Feature [3]: Scale  
 (01) Minimum             | 0.0
 (02) Maximum             | 52.0
 (03) Range               | 52.0
 (04) Mean                | 4.026143790849673
 (05) Variance            | 51.691117539912135
 (06) Std deviation       | 7.189653506248555
 (07) Std err of mean     | 0.41100513466216837
 (08) Coeff of variation  | 1.7857418611299172
 (09) Skewness            | 2.954633471088322
 (10) Kurtosis            | 11.425776549251449
 (11) Std err of skewness | 0.13934809593495995
 (12) Std err of kurtosis | 0.277810485320835
 (13) Median              | 1.0
 (14) Interquartile mean  | 1.2483660130718954
Feature [4]: Categorical (Nominal)  
 (15) Num of categories   | 2
 (16) Mode                | 1
 (17) Num of modes        | 1
results: org.apache.sysml.api.mlcontext.MLResults =  
[1] (Matrix) baseStats: Matrix: scratch_space/_p5250_9.31.116.229/parfor/2_resultmerge1, [17 x 4, nnz=44, blocks (1000 x 1000)], binaryblock, dirty
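To make a few rows of that table less mysterious, here is a plain Scala sketch of how some of the scale measures relate to each other. The object UnivarSketch and the sample data are made up for illustration. The variance here divides by n - 1 (the sample variance), which is an assumption about the script's convention and not something this tutorial confirms. Two of the relations can be checked against Feature [1] above: the std err of mean is the std deviation divided by the square root of the 306 rows, and the coeff of variation is the std deviation divided by the mean.

```scala
// Plain Scala, no Spark or SystemML required.
object UnivarSketch {
  def mean(xs: Seq[Double]): Double = xs.sum / xs.size

  def range(xs: Seq[Double]): Double = xs.max - xs.min

  // Sample variance (divides by n - 1); assumed convention, see the note above.
  def variance(xs: Seq[Double]): Double = {
    val m = mean(xs)
    xs.map(x => (x - m) * (x - m)).sum / (xs.size - 1)
  }

  def stdDev(xs: Seq[Double]): Double = math.sqrt(variance(xs))

  // Standard error of the mean: std deviation over the square root of n.
  def stdErrOfMean(xs: Seq[Double]): Double =
    stdDev(xs) / math.sqrt(xs.size.toDouble)

  // Coefficient of variation: std deviation relative to the mean.
  def coeffOfVariation(xs: Seq[Double]): Double = stdDev(xs) / mean(xs)
}

val sample = Seq(1.0, 2.0, 3.0, 4.0, 5.0) // made-up data, not the Haberman set
println(UnivarSketch.mean(sample))      // 3.0
println(UnivarSketch.range(sample))     // 4.0
println(UnivarSketch.variance(sample))  // 2.5
```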

You can also ask for the base stats.

scala> val baseStats = results.getMatrix("baseStats")  
baseStats: org.apache.sysml.api.mlcontext.Matrix = org.apache.sysml.api.mlcontext.Matrix@237cd4e5

scala> baseStats.  
asDataFrame          asDoubleMatrix       asInstanceOf         asJavaRDDStringCSV   asJavaRDDStringIJV   asMLMatrix           asMatrixObject       asRDDStringCSV  
asRDDStringIJV       isInstanceOf         toString             

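You can also get the base stats as an RDD of strings, in either CSV or IJV format. CSV gives one comma-separated string per matrix row, and IJV gives one "row column value" string per cell. Note: IJV leaves out non-values and CSV includes them. Here's an example of both: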

scala> baseStats.asRDDString  
asRDDStringCSV   asRDDStringIJV   

scala> baseStats.asRDDStringCSV.collect  
res4: Array[String] = Array(30.0,58.0,0.0,0.0, 83.0,69.0,52.0,0.0, 53.0,11.0,52....1.0)

scala> baseStats.asRDDStringIJV.collect  
res5: Array[String] = Array(1 1 30.0, 1 2 58.0, 1 3 0.0, 1 4 0.0, 2 1 83.0, 2 2 69.0, 2 3 52.0, 2 4 0.0, ... 1...  
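To see how the two layouts line up, here is a plain Scala sketch that writes the same tiny matrix both ways. The object MatrixFormats is made up for this illustration and is not part of SystemML. It simply emits every cell with 1-based row and column numbers, matching the shape of the output above.

```scala
// Plain Scala illustration of the two string layouts; not SystemML code.
object MatrixFormats {
  // CSV: one string per matrix row, values separated by commas.
  def toCsv(m: Seq[Seq[Double]]): Seq[String] =
    m.map(_.mkString(","))

  // IJV: one "row column value" string per cell, with 1-based indices.
  def toIjv(m: Seq[Seq[Double]]): Seq[String] =
    for {
      (row, i) <- m.zipWithIndex
      (v, j) <- row.zipWithIndex
    } yield s"${i + 1} ${j + 1} $v"
}

// The top-left corner of the baseStats matrix shown above.
val corner = Seq(Seq(30.0, 58.0), Seq(83.0, 69.0))
MatrixFormats.toCsv(corner).foreach(println)
// 30.0,58.0
// 83.0,69.0
MatrixFormats.toIjv(corner).foreach(println)
// 1 1 30.0
// 1 2 58.0
// 2 1 83.0
// 2 2 69.0
```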

I think that's a great start to using SystemML with Spark Shell! Once you're done, you can type :quit to exit.


You have successfully set up your computer for running SystemML and Spark, loaded the Spark shell, run scripts, loaded data and worked through some examples!! Congrats!!

Stay tuned for more tutorials and next steps on our life-changing app!

By Madison J. Myers

