
Improvements to the SizeEstimator class in Apache™ Spark

by Vijay Sundaresan, Adam Roberts, and Andrew Craik

What's the SizeEstimator class and why should I care?

org.apache.spark.util.SizeEstimator is a core class used by Apache™ Spark that walks the object graph rooted at a given object and uses knowledge of the JVM object model to estimate the amount of Java heap transitively held live by that object. Size estimation follows field references recursively down the object graph, so it must also keep track of objects it has already visited in order to remain accurate in the presence of cycles or shared references (DAGs) in the graph. The resulting size estimate is used by the SizeTracking variants of many common internal Spark data structures to enforce the fraction of the heap that Spark reserves for RDD data.
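
For readers unfamiliar with the API, here is a quick illustration of how the estimator is typically invoked; the sample map below is arbitrary and only meant to show the call:

import org.apache.spark.util.SizeEstimator

// Estimate the transitive heap footprint of the object graph rooted at `sample`.
val sample = Map("a" -> Array.fill(1000)(0L), "b" -> "some string")
val estimatedBytes: Long = SizeEstimator.estimate(sample)
println(s"Estimated size: $estimatedBytes bytes")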

Inaccuracies in size estimation can lead to inefficiencies in executing the Spark workload:

  • Overestimation leads to excessive spilling to disk during computation
  • Underestimation leads to insufficient memory to execute the application

How important is a good size estimator for performance?

As a result of these changes, we measured a 5% improvement in the execution time of the PageRank benchmark (using the large input size included in the HiBench suite) running on Apache Spark 1.6.0 on a 56-core Haswell Linux setup, as well as a 15% reduction in the estimated RDD size. Without our changes, PageRank took around 85 seconds to run in this configuration; with our changes it ran in close to 81 seconds.

Great, so what are the changes?

We started looking into the SizeEstimator class because of its prominence in several profiles we were analyzing while improving the performance of Spark benchmarks, and it quickly became clear that the code could be improved in multiple areas.

We describe both the general changes to this class as well as changes that make it more accurate when run with the IBM Java SDK. We're planning to contribute these changes to the Apache Spark community in the near future.

Specifically, we looked at these JIRAs:

General performance improvements

The relevant portion of the original implementation of the SizeEstimator class is shown below, with a comment next to each line of code that our analysis identified as a performance bottleneck.

private def getClassInfo(cls: Class[_]): ClassInfo = {
  …
  val parent = getClassInfo(cls.getSuperclass)      // Expensive synchronization on a global hash table
  var shellSize = parent.shellSize
  var pointerFields = parent.pointerFields
  val sizeCount = Array.fill(fieldSizes.max + 1)(0)
  // iterate through class fields and gather info.
  for (field <- cls.getDeclaredFields) {            // Java reflection is expensive, and the for loop creates an iterator to walk over the returned array
    if (!Modifier.isStatic(field.getModifiers)) {
      val fieldClass = field.getType
      if (fieldClass.isPrimitive) {
        sizeCount(primitiveSize(fieldClass)) += 1
      } else {
        field.setAccessible(true)                   // Makes the JIT compiler conservative around final fields
        sizeCount(pointerSize) += 1
        pointerFields = field :: pointerFields      // Creates a new array and copies the contents of the current pointerFields list every time
      }
    }
  }
  …
}

Here is a brief description of each issue we identified and the code change that fixes it; a combined code sketch of the revised logic follows the list.

  • getClassInfo uses a global hash table as a cache, with concurrency safety provided through atomic add operations. Since the number of entries is expected to be small (one entry per estimated Java class), we changed this to a thread-local hash table, accepting a slight increase in overall footprint in exchange for a more significant increase in execution speed by avoiding the overhead of providing thread safety.

  • Java reflection is known to be expensive because it often involves calls into native JVM code; in addition, the array of fields returned by getDeclaredFields was being wrapped in a newly allocated iterator every time, adding instruction path length and Garbage Collection (GC) overhead. We made two changes to improve this part of the SizeEstimator logic: 1) we changed the for loop into a while loop that iterates directly over the indices of the field array, and 2) we used the Unsafe get APIs instead of calling setAccessible, thereby conveying to the JIT compiler that final fields are no more likely to change than if the size estimation logic had never run.

  • The pointerFields variable was originally declared as an ArrayBuffer (a Scala class that creates a new backing array and copies the contents of the old array every time a new element is added); we changed it to the Java collections class ArrayList, which is better from both a footprint and an instruction path length perspective.

  • Array size estimation uses a sampling scheme when estimating the transitive size of large arrays: a random subset of the elements is sampled, their sizes are averaged, and the average is multiplied by the number of elements in the array. The random sampling is what makes the estimate representative. The original implementation obtained this randomness from java.util.Random, which uses synchronization to offer randomness guarantees across multiple threads. Size estimation is currently single-threaded, so we do not need the overhead of that synchronization; we switched to java.util.concurrent.ThreadLocalRandom, which provides sufficient randomness for the purposes of size estimation.
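
To make the list above concrete, here is a minimal, self-contained sketch of the revised logic under the assumptions just described. The object and method names (SizeEstimatorSketch, scanFields, readPointerField, sampleIndices) and the simplified pointerSize and primitiveSize values are ours for illustration; this is not the actual patch.

import java.lang.reflect.{Field, Modifier}
import java.util.{ArrayList => JArrayList}
import java.util.concurrent.ThreadLocalRandom

object SizeEstimatorSketch {
  // Simplified constant; the real code derives pointer size from the JVM configuration.
  private val pointerSize = 8

  private def primitiveSize(cls: Class[_]): Int =
    if (cls == java.lang.Byte.TYPE || cls == java.lang.Boolean.TYPE) 1
    else if (cls == java.lang.Character.TYPE || cls == java.lang.Short.TYPE) 2
    else if (cls == java.lang.Integer.TYPE || cls == java.lang.Float.TYPE) 4
    else 8 // long and double

  // Index-based while loop over getDeclaredFields plus a java.util.ArrayList for the
  // pointer fields: no iterator allocation and no per-append copying.
  def scanFields(cls: Class[_]): (Array[Int], JArrayList[Field]) = {
    val sizeCount = new Array[Int](pointerSize + 1) // buckets indexed by field size in bytes
    val pointerFields = new JArrayList[Field]()
    val fields = cls.getDeclaredFields              // fetch the field array once
    var i = 0
    while (i < fields.length) {
      val field = fields(i)
      if (!Modifier.isStatic(field.getModifiers)) {
        val fieldClass = field.getType
        if (fieldClass.isPrimitive) {
          sizeCount(primitiveSize(fieldClass)) += 1
        } else {
          // No field.setAccessible(true) here; see readPointerField below.
          sizeCount(pointerSize) += 1
          pointerFields.add(field)
        }
      }
      i += 1
    }
    (sizeCount, pointerFields)
  }

  // Reading a reference field through sun.misc.Unsafe instead of setAccessible(true),
  // so that the JIT compiler is not forced to be conservative about final fields.
  private val unsafe: sun.misc.Unsafe = {
    val f = classOf[sun.misc.Unsafe].getDeclaredField("theUnsafe")
    f.setAccessible(true)
    f.get(null).asInstanceOf[sun.misc.Unsafe]
  }

  def readPointerField(obj: AnyRef, field: Field): AnyRef =
    unsafe.getObject(obj, unsafe.objectFieldOffset(field))

  // Sampling large arrays: ThreadLocalRandom.current() involves no synchronization,
  // unlike sharing a single java.util.Random instance.
  def sampleIndices(arrayLength: Int, samples: Int): Array[Int] =
    Array.fill(samples)(ThreadLocalRandom.current().nextInt(arrayLength))
}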

Changes that would improve performance on the IBM Java SDK

The IBM Java SDK uses the J9 virtual machine as its core Java runtime technology, whereas OpenJDK uses the Hotspot virtual machine. Different JVM implementations can have different performance characteristics for any number of reasons: different optimizing JIT compilers, different garbage collection technologies, different lock contention schemes, and so on, since these are all JVM internals that are not covered by the Java Virtual Machine Specification.

JVM implementations can also differ in the object model used to represent Java objects, which means the same object in a given Java program can be represented differently, e.g. with a different size and/or shape, in two different JVM implementations. The object model determines the following four aspects of how a Java object is represented:

  • Field and array element sizes: the number of bytes it takes to represent a field or array element of a given datatype. For example, if the datatype is a byte, the JVM is free to represent it either as an 8-bit value or as a 32-bit value.

  • Object and field alignment: virtually all JVM implementations allocate every Java object at some minimum guaranteed alignment boundary. Furthermore, a JVM usually has to align a field of size N bytes to an address that is a multiple of N, but again, how much padding is inserted for alignment can vary by implementation.

  • 64-bit compressed references: each JVM implementation supports 64-bit reference compression (representing Java object references as 32-bit values instead of 64-bit values) up to a different maximum Java heap threshold.

  • Object header: the number of header fields, and the meaning of each field, in the header of each Java object and array. These usually vary depending on whether one is running a 32-bit or 64-bit JVM (and whether compressed references are in use).

The SizeEstimator originally assumed the OpenJDK (Hotspot) object model regardless of which JVM implementation was in use.

We changed the code in the class to be aware of the JVM implementation in use and implemented the correct object model description logic for all four aspects listed above for the IBM Java SDK. Note: we did not change the size estimates for Hotspot, since we assumed those were accurate to begin with.
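
As a rough illustration of what JVM-aware parameterization of the object model can look like, here is a hypothetical sketch. The detection logic and every numeric value in the IBM branch are illustrative assumptions rather than the values used by the actual change; the Hotspot branch uses commonly documented sizes.

// A hypothetical sketch of JVM-aware object-model parameters, not the actual patch.
object ObjectModelSketch {
  private val isIbmJvm =
    System.getProperty("java.vendor", "").contains("IBM") ||
      System.getProperty("java.vm.name", "").contains("IBM J9")

  final case class ObjectModel(
      objectHeaderBytes: Int,    // per-object header
      arrayHeaderBytes: Int,     // per-array header, including the length field
      referenceBytes: Int,       // size of an object reference field
      objectAlignmentBytes: Int) // minimum object alignment

  // Whether compressed references are in effect depends on the JVM and on the maximum
  // heap size; it is passed in here to keep the sketch simple.
  def current(compressedRefs: Boolean): ObjectModel =
    if (isIbmJvm)
      ObjectModel(
        objectHeaderBytes = if (compressedRefs) 8 else 16,   // placeholder J9 values
        arrayHeaderBytes = if (compressedRefs) 12 else 24,   // placeholder J9 values
        referenceBytes = if (compressedRefs) 4 else 8,
        objectAlignmentBytes = 8)
    else
      ObjectModel(
        objectHeaderBytes = if (compressedRefs) 12 else 16,  // typical Hotspot layout
        arrayHeaderBytes = if (compressedRefs) 16 else 24,
        referenceBytes = if (compressedRefs) 4 else 8,
        objectAlignmentBytes = 8)
}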

After we made our changes, we observed empirically that the size estimates were more accurate with the IBM Java SDK and in some cases showed a smaller RDD footprint than OpenJDK, primarily because the IBM Java SDK has a smaller object header and supports compressed references up to a higher maximum heap threshold (almost 60GB) by default. These design choices in the IBM Java SDK can lead to substantial performance advantages for Spark workloads in certain situations because of reduced spilling and improved cache locality.

Conclusion and next steps

Our plan is to contribute these beneficial SizeEstimator changes to Apache Spark upstream soon so that the entire community can benefit from our work.

We are also prototyping changes in other areas of the Apache Spark codebase that are showing promising results. We plan to share more information about these changes in future blog posts.

Our intent is to continue to search for new opportunities and make further improvements both to Apache Spark itself as well as the IBM Java SDK in an effort to improve the performance of key benchmarks in this space.

These performance efforts are just one part of IBM's multi-pronged commitment to working with the Apache Spark community to advance the technology and invest in its continued adoption and success.
