Why GPUs on Spark?
The computing industry has continued to evolve in its quest for more and more performance, first through ever-higher CPU frequencies and then through parallelism with multi-core and multi-thread processor systems.
However, ever-increasing workload demands require even more processing power, which cannot be satisfied by one large system. So the industry has shifted to scale-out solutions that use cluster computing software to divide, distribute and synchronize execution across many smaller nodes. This increases power consumption considerably, and network overheads continue to rise in proportion to the number of nodes in the system. An alternative solution is to leverage special-purpose hardware accelerators to provide additional computational power to individual cluster nodes. This requires fewer cluster nodes, which translates to less deployment complexity and less power consumption in aggregate.
For scaled-out, distributed computing, Apache Spark™ is the most popular software framework used to develop clustered applications for real-time big data analytics. On the special-purpose hardware acceleration side, NVIDIA’s CUDA-capable GPUs are the most popular hardware accelerators for HPC and machine learning.
In this article, we'll discuss how we're enabling these two technologies, Apache Spark and NVIDIA GPUs, to work together, while also making it easier for the programmer to exploit GPUs under Spark. This article assumes that readers are familiar with Spark and NVIDIA CUDA.
What's the Deal with Integrating GPUs to Spark:
Off-loading specific tasks to GPU involves multiple steps:
The first step in bringing GPUs into the Spark computational framework is getting the data into a format that the GPU can consume. The fundamental programming abstraction in Spark is the RDD (Resilient Distributed Dataset), which Spark uses to partition data for parallel execution. By design, Spark processes the data of an individual RDD partition row by row for every transformation. To offload the computation to the GPU and let thousands of GPU threads operate on the RDD partition data in parallel, all the data must be available to the GPU simultaneously instead of row by row. Hence, we must convert the row-formatted RDD data to a columnar format for the GPU, and convert the results back to row format for the CPU. The data must also be placed in off-heap memory, because the GPU cannot access Java objects directly. For example, a GPU kernel accepts array pointers as input and output parameters, and the GPU threads operate in parallel on different indexes of those arrays.
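To make the row-to-columnar step concrete, here is a minimal sketch in plain Scala with no Spark or GPU involved. It is not the GPUEnabler implementation: the row type Point and the helpers toOffHeapColumn and toRows are hypothetical names invented for this illustration, and a direct ByteBuffer stands in for whatever off-heap storage the package actually uses.

```scala
import java.nio.{ByteBuffer, ByteOrder}

// Hypothetical row type, used only for this illustration.
case class Point(x: Int, y: Int)

object ColumnarSketch {
  private val IntBytes = 4

  // Copy one Int field of every row into a direct (off-heap) buffer,
  // so the whole column sits in one contiguous block of native memory.
  def toOffHeapColumn(rows: Array[Point], field: Point => Int): ByteBuffer = {
    val buf = ByteBuffer.allocateDirect(rows.length * IntBytes)
      .order(ByteOrder.nativeOrder())
    var i = 0
    while (i < rows.length) {
      buf.putInt(i * IntBytes, field(rows(i)))
      i += 1
    }
    buf
  }

  // Rebuild row objects from the two columns (the "vice versa" direction).
  def toRows(xs: ByteBuffer, ys: ByteBuffer, n: Int): Array[Point] =
    Array.tabulate(n)(i => Point(xs.getInt(i * IntBytes), ys.getInt(i * IntBytes)))
}
```

Each column is a flat array of primitives, which is the layout a kernel can index by thread ID; the row objects themselves never leave the JVM heap.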
The GPU must first read the data it will compute on from system memory into its local memory, and then likewise write the result of its computation from its local memory back to system memory. (On POWER8 systems with NVLINK 1.0 support, these data movements can occur with extremely high bandwidth compared to PCI Express. On future POWER9 systems with NVLINK 2.0, the GPU will have full coherent access to all of system memory, further expanding what's possible with GPUs.) With Spark, there is a need to transfer converted columnar RDD data from the CPU to GPU and vice versa for each computation. In some cases, it would be good to cache the data in GPU memory itself similar to Spark’s block manager cache. This will help with iterative processing algorithms like those found in machine learning.
The Spark application also needs to work in a heterogeneous environment where Java virtual machines (worker nodes) with and without GPU support coexist. If the GPU is attached to a worker node, the executor should be capable of initializing the GPU, loading the native GPU kernel provided by the user and then copying only the relevant partitioned RDD data into the GPU memory. If it succeeds, the executor should perform the GPU computation and copy back the results from the GPU memory to the CPU memory. In the case the executor does not have a GPU, it should run the fallback lambda expression provided by the user and yield the result. We will discuss this in detail in the later sections.
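The dispatch logic described above can be sketched in a few lines of plain Scala. This is only an illustration of the idea, not GPUEnabler code: runPartition is a hypothetical helper, and the optional gpuKernel function is a stand-in for the whole "initialize the GPU, load the kernel, copy in, compute, copy back" sequence.

```scala
import scala.reflect.ClassTag

object FallbackSketch {
  // gpuKernel is Some(...) on an executor with a usable GPU and None otherwise.
  // fallback is the user-supplied lambda that is applied element by element on the CPU.
  def runPartition[T, U: ClassTag](
      data: Array[T],
      gpuKernel: Option[Array[T] => Array[U]],
      fallback: T => U): Array[U] =
    gpuKernel match {
      case Some(kernel) => kernel(data)       // whole partition handed over at once
      case None         => data.map(fallback) // row-by-row CPU path
    }
}
```

Both paths must compute the same function, so the fallback lambda also serves as a reference for checking the kernel's results.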
Normally, GPUs are attached to the machine in PCIe card slots, which have much lower bandwidth than CPU memory. The data transfer time must never outweigh the computational gain we get from GPU execution. If data is transferred back and forth between CPU memory and GPU memory too often, the transfer cost dominates and defeats the purpose of using the GPU accelerator. This is again where NVLINK connections to the host CPU memory on POWER systems will help expand the problem sets that can be tackled with GPUs.
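The trade-off can be written down as a simple back-of-the-envelope model: offloading pays off only when the GPU compute time plus the transfer time is smaller than the CPU compute time. The sketch below is an assumption-laden illustration, not a measurement. OffloadModel and its functions are hypothetical names, and any timings or bandwidth figures fed into it are placeholders the reader must supply.

```scala
object OffloadModel {
  // Bytes occupied by n rows of d values each.
  def datasetBytes(n: Long, d: Long, bytesPerValue: Long): Long =
    n * d * bytesPerValue

  // Seconds needed to move the input to the GPU and the results back.
  def transferSeconds(bytesIn: Long, bytesOut: Long, bytesPerSecond: Double): Double =
    (bytesIn + bytesOut) / bytesPerSecond

  // Offloading is worthwhile only if GPU compute plus transfer beats the CPU.
  def worthOffloading(cpuSeconds: Double, gpuSeconds: Double, transfer: Double): Boolean =
    gpuSeconds + transfer < cpuSeconds
}
```

For instance, one million rows of 400 features would occupy 3.2 GB if each value were an 8-byte double (the element type is an assumption here), and that whole volume crosses the bus every time the data is not cached on the GPU.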
To study the performance impact, we compared the CPU and GPU execution times on a POWER8 machine (Firestone) with an NVIDIA K80 card by running "Logistic Regression" with the following parameters:
- N – Number of Samples: 1 million records
- D – Feature count: 400
The performance gain in computation time is ~2.5X, including the data conversion time (the time spent converting the row format to columnar format and then moving the data to GPU memory). The gain in terms of CPU cores after offloading the task to the GPU is many times larger. In our case, we used 160 cores to get maximum application performance on the CPU, whereas just 4 cores were used when the task was offloaded to the GPU. So the saving in CPU cores is ~40X.
GPUEnabler package is up for the task
The GPUEnabler package (available here: https://github.com/IBMSparkGPU/GPUEnabler) provides these capabilities as a user plug-in to the Spark core. User applications can offload workloads to the GPU in a transparent way just by adding this package to their dependency list as follows:
Using SBT (the artifact name already carries the Scala version suffix, so a single % is used):
    libraryDependencies += "com.ibm" % "gpu-enabler_2.10" % "1.0.0"
Using Maven:
    <dependency>
      <groupId>com.ibm</groupId>
      <artifactId>gpu-enabler_2.10</artifactId>
      <version>1.0.0</version>
    </dependency>
To try this package, here are some prerequisites:
- NVIDIA GPU card with CUDA support of 7.0+.
- CUDA drivers & Runtime drivers for your platform need to be installed.
This package adds two "Transformer APIs" (mapExtFunc and CacheGpu) and one "Action API" (reduceExtFunc) to the RDD interface. Running tasks on the GPU involves writing native code and this package provides a way to map the native symbol/functions to Scala objects using CUDAFunction.
Let's see how to write a CUDA kernel for the GPUEnabler package. The input/output arguments of the GPU kernel functions should adhere to the following guidelines:
- The first argument should always hold the number of elements to process (for example,
- The second set of arguments are considered input arguments to the GPU kernel. This set can be for one or more arguments.
- The third set of arguments are to hold the return values from the GPU kernel. This set can be for one or more arguments.
- The fourth set of arguments holds input free variables passed into inputFreeVariables. This set can be empty or take arguments.
- The fifth set of arguments holds an array of dimensions and is used if the result is an array folded in a linear form. This set can be empty for a single dimension array or take one argument.
- The sixth set of arguments holds constant variables passed into constArgs. This set can be empty or take arguments.
- The seventh argument holds the current stage number specifically for the reduce operation which will be done in multiple stages. This set can be empty or take one argument.
- The eighth argument holds the total stage count specifically for the reduce operation which will be done in multiple stages. This set can be empty or take one argument.
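As an illustration of the first three guidelines only, the sketch below models the calling convention on the CPU in plain Scala. It is not CUDA and not part of GPUEnabler: multiplyBy2Thread and launch are hypothetical names, and the loop merely simulates what each GPU thread would do with its own index. The real kernel has to be written in CUDA and compiled with nvcc.

```scala
object KernelContractSketch {
  // One simulated GPU thread. Argument order follows the guidelines above:
  // element count first, then the input array, then the output array.
  // ix plays the role of the thread's global index.
  def multiplyBy2Thread(numElements: Int, in: Array[Int], out: Array[Int], ix: Int): Unit =
    if (ix < numElements) out(ix) = in(ix) * 2 // threads past the end do nothing

  // Simulated launch: a grid usually holds more threads than elements,
  // which is why every thread checks its index against numElements.
  def launch(numElements: Int, in: Array[Int], out: Array[Int], threads: Int): Unit = {
    var ix = 0
    while (ix < threads) {
      multiplyBy2Thread(numElements, in, out, ix)
      ix += 1
    }
  }
}
```

The bounds check against the first argument is the reason the element count is always passed: without it, surplus threads would write outside the output array.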
The CUDA program needs the nvcc compiler provided by NVIDIA to get the kernel ready. Once the kernel is ready, you can register the native symbols as follows:
    // Imports needed for the Spark GPU methods to be added
    import com.ibm.gpuenabler.CUDARDDImplicits._
    import com.ibm.gpuenabler.CUDAFunction

    // Load a kernel function from the GPU kernel binary
    val ptxURL = SparkGPULR.getClass.getResource("/GpuEnablerExamples.ptx")

    val mapFunction = new CUDAFunction(
      "multiplyBy2",    // Native GPU function to multiply a given number by 2 and return the result
      Array("this"),    // Input arguments
      Array("this"),    // Output arguments
      ptxURL)

    val reduceFunction = new CUDAFunction(
      "sum",            // Native GPU function to sum the input argument and return the result
      Array("this"),    // Input arguments
      Array("this"),    // Output arguments
      ptxURL)
Once we have the CUDAFunction objects, we can pass them as arguments to the "mapExtFunc" or "reduceExtFunc" APIs. The native function mapped to each CUDAFunction object is then executed on the GPU, using the data partitions of the RDD on which the API is called.
    // 1. Apply a transformation. (Multiply all the values of the RDD by 2.)
    //    (Note: Conversion of row based formatting to columnar format for consumption
    //    by the GPU is done internally.)
    // 2. Trigger a reduction action (sum up all the values and return the result)
    val output = sc.parallelize(1 to n, 1)
      .mapExtFunc((x: Int) => 2 * x, mapFunction)
      .reduceExtFunc((x: Int, y: Int) => x + y, reduceFunction)
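A quick way to sanity-check the GPU result is to apply the same two lambdas to a plain Scala range on the CPU. The sketch below does this without Spark; ExpectedResultSketch is a hypothetical helper name, and the closed form n * (n + 1) follows from summing 2 * x for x from 1 to n. Both functions use Int arithmetic, so they overflow for large n just as the example above would.

```scala
object ExpectedResultSketch {
  // Same lambdas as in the mapExtFunc / reduceExtFunc calls, run on the CPU.
  def cpuResult(n: Int): Int =
    (1 to n).map((x: Int) => 2 * x).reduce((x: Int, y: Int) => x + y)

  // Sum of 2 * x for x = 1..n is 2 * n * (n + 1) / 2 = n * (n + 1).
  def closedForm(n: Int): Int = n * (n + 1)
}
```

If the value returned by the GPU pipeline differs from cpuResult(n) for the same n, the kernel or the argument mapping is the first place to look.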
This library can also be added to Spark jobs launched through spark-submit by using the --packages command line option. For example, to include it when starting the Spark shell:
    $ bin/spark-shell --packages com.ibm:gpu-enabler_2.10:1.0.0
--packages ensures that this library and its dependencies will be added to the classpath. The --packages argument can also be used with
The complete program using GPUEnabler can be found under:
The CUDA program used by the above application can be found under:
Follow these steps to run this sample program:
The current support for the GPUEnabler package is as follows:
- Supports x86_64 and ppc64le architectures
- Supports OpenJDK and IBM JDK
- Supports NVIDIA GPU with CUDA (We confirmed with CUDA 7.0).
- Supports CUDA 7.0 and 7.5 (Should work with CUDA 6.0 and 6.5.)
- Supports scalar variables of primitive types and primitive arrays in RDDs
To know more about compiling this package from source and to try out the examples, please follow the README.md file that comes along with the package found under https://github.com/IBMSparkGPU/GPUEnabler.
How it all works will continue to be a mystery until you access the source code, which is also published at the GitHub URL above.
That's not all, Folks:
We are continuing to improve and enhance the GPUEnabler, and would welcome your feedback and contributions. We'd also like to thank Kazuaki Ishizaki and Randy Swanberg for their contribution to this project.
Here are some of the future enhancements we are working on:
- Enabling the APIs to also work with DataFrames and Datasets
- GPU accelerated versions of Spark MLlib/ML and Graph algorithms.
- Dynamic generation of CUDA code for any given Spark operators.