Writing efficient Spark jobs

This article covers a number of levers I have discovered so far for tuning Apache Spark jobs so that they use less memory and/or less running time. For some time now Apache Spark has been the shooting star among big data technologies, and rightfully so: among computing frameworks it does (in my opinion) the best job of bridging the gap between running production jobs on the one hand and providing analytical tools on the other. But while Spark provides an easy-to-use API which, correctly used, will give you great results without much effort, there is still a lot one can do to improve a job's efficiency. Such tuning is especially important for production jobs that run on a regular basis and therefore continuously eat up cluster resources. The techniques are ordered by expected impact - a subjective measure based on my experience. Note that I wrote this article with Spark 1.4 in mind. As Spark evolves rapidly, some of these findings may be outdated by the time you read this.

Before we dig deeper, let us understand what the performance bottlenecks of a Spark job are. In order of severity these are:

  • Network communication
  • Disk reads/writes
  • Computation

Network communication occurs in all operations where machines need to coordinate with each other. Depending on the amount of data that needs to be transferred, the hardware in your cluster, and the proximity of the machines in the data center, such tasks can be very expensive. In fact they usually take up the lion's share of a job's total processing time.

Operations involving the disk occur when the processed data is too large to fit into memory. The Spark log file will indicate such situations as "spills". As disk reads are slower than memory access, such situations are naturally more expensive than cases where all data can be held in the cluster's memory.

Often the least runtime is spent on the actual computations, as these are usually fast once the data is in the memory of the right machine. Premature optimization therefore makes the least sense here, and this guide will focus on how to deal with the other two performance bottlenecks.

Reducing shuffles

If your tasks only need to access data from a single partition (i.e., the part of the data that resides on a specific machine) you will never create any network traffic. But in most realistic situations this is insufficient and we need to bring together data from different partitions.

Shuffles are performed for any of the following operations:

  • .repartition
  • .cogroup
  • .join, .leftOuterJoin, .rightOuterJoin, .fullOuterJoin
  • Any of the ...ByKey operations
  • .distinct
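
If you are unsure whether a chain of transformations triggers a shuffle, a quick, low-tech check is to print the RDD's lineage, since the stage boundaries shown there correspond to shuffles. A minimal sketch with made-up data:

val rdd = sc.parallelize(Seq(1, 2, 2, 3))
// .distinct is implemented via a ByKey operation, so a ShuffledRDD
// shows up in the printed lineage.
println(rdd.distinct.toDebugString)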

While all these operations require network communication, the amount of data that needs to be transferred can differ significantly. A common example is .groupByKey, which requires almost all data to be transferred and is usually the most expensive of the "ByKey" operations. As an illustration, consider the task of summing up values by a given key, which can be done as

rdd.groupByKey.mapValues(_.sum)

Here each value of the RDD must be transferred to the machine that is responsible for its key before summing up. This shuffle can be made much more efficient by using a reduce operation:

rdd.reduceByKey(_ + _)

The reduce always operates on two elements of the RDD and reduces them to a single element. This can be done within a partition before the shuffle, so the data that needs to be shuffled can already be much smaller.
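
As a minimal, self-contained illustration (the sample data is made up), both variants produce the same sums, but the second one shuffles far less data:

val rdd = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3), ("b", 4)))

// Ships every single (key, value) pair across the network before summing.
val viaGroup  = rdd.groupByKey.mapValues(_.sum)

// Pre-aggregates within each partition, then shuffles only the partial sums.
val viaReduce = rdd.reduceByKey(_ + _)

// Both yield (a -> 4) and (b -> 6).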

When joining data one can sometimes exploit the structure of the data to reduce network operations. I will show you two common use cases:

Joining a large and a small RDD

If the small RDD is small enough to fit into the memory of each worker, we can turn it into a broadcast variable and turn the entire operation into a so-called map-side join for the larger RDD. In this way the larger RDD does not need to be shuffled at all. This is often the case when the smaller RDD is a dimension table.

val smallLookup = sc.broadcast(smallRDD.collect.toMap)
largeRDD.flatMap { case(key, value) =>
  smallLookup.value.get(key).map { otherValue =>
    (key, (value, otherValue))
  }
}
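
Putting the snippet above together with some made-up data (the contents of smallRDD and largeRDD here are purely illustrative):

val smallRDD = sc.parallelize(Seq((1, "foo"), (2, "bar")))
val largeRDD = sc.parallelize(Seq((1, 10.0), (2, 20.0), (3, 30.0)))

val smallLookup = sc.broadcast(smallRDD.collect.toMap)
val joined = largeRDD.flatMap { case (key, value) =>
  smallLookup.value.get(key).map(otherValue => (key, (value, otherValue)))
}
// joined contains (1,(10.0,foo)) and (2,(20.0,bar)); key 3 is dropped because
// it has no match in the lookup table, mirroring the semantics of an inner join.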

Joining a large and a medium size RDD

If the medium-size RDD does not fit fully into memory but its key set does, it is possible to exploit this. As a join will discard all elements of the larger RDD that do not have a matching partner in the medium-size RDD, we can use the medium RDD's key set to do this filtering before the shuffle. If a significant number of entries gets discarded this way, the resulting shuffle will need to transfer a lot less data.

val keys = sc.broadcast(mediumRDD.map(_._1).collect.toSet)
val reducedRDD = largeRDD.filter{ case(key, value) => keys.value.contains(key) }
reducedRDD.join(mediumRDD)

It is important to note that the efficiency gain here depends on the filter operation actually reducing the size of the larger RDD. If not a lot of entries are lost here (e.g., because the medium-size RDD is some kind of large dimension table), there is nothing to be gained with this strategy.
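
A simple way to check this (just a sketch, and only worth running while investigating, since every count triggers an extra job) is to compare the element counts before and after the filter:

val before = largeRDD.count()
val after  = reducedRDD.count()
println(s"kept ${100.0 * after / before}% of the large RDD")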

You can find more details on efficient shuffles in this Databricks presentation.

Serialization

All data that is sent over the network or written to disk needs to be serialized. Furthermore, RDDs that are persisted in memory may also be stored in serialized form to save memory. You can imagine that in all of these expensive operations serialization plays a pretty big role. The Java default serializer has very mediocre performance with respect to both runtime and the size of its output. Therefore the Spark team recommends using the Kryo serializer instead. You can turn on Kryo with

sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

When used in this way the serialized data structures need to store their fully qualified class names in order to be deserializable. This can be pretty inefficient, and Kryo provides a way to avoid it: you can register the classes that you will be serializing with Kryo beforehand. For registered classes Kryo can then save the space for the class name. In Spark you can register classes for Kryo either by calling

sparkConf.registerKryoClasses(Array( classOf[A], classOf[B], ...))

or by registering a KryoRegistrator with

sparkConf.set("spark.kryo.registrator", "MyKryoRegistrator")

KryoRegistrators have the following structure:

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[A])
    kryo.register(classOf[B])
    ...
  }
}

It can be difficult to figure out all the classes you need to register. Especially if you are using generic collections, you will need to register the exact classes (including the generic types) you use. To help with this, it is possible to tell Kryo to throw an exception whenever it encounters an unregistered class:

sparkConf.set("spark.kryo.registrationRequired", "true")

This way the job will fail on encountering an unregistered class - telling you which class was missing. You can then add this class to the registration and keep rerunning the job until it succeeds.
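
To give an idea of the kind of entries such runs tend to surface (the concrete class names below are only examples and depend on your code and Scala version), the registrator might then grow into something like:

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[Array[String]])                                    // arrays of the element types you use
    kryo.register(Class.forName("scala.collection.immutable.$colon$colon"))  // List cons cells
    kryo.register(Class.forName("scala.collection.immutable.Nil$"))          // the Nil singleton
  }
}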

Efficient data structures

Though Kryo can help a lot with reducing your memory footprint, it is not capable of performing magic. That means if you use memory-hungry data structures with lots of overhead, the serialized classes will still consume a fair amount of space. Therefore it is advisable to prefer simple data structures for classes that will need to be serialized often (especially the containers for data stored in RDDs). In practice this means:

  • Prefer primitive types (Int, String, etc.) over fancy ones (java.lang.Integer, etc.)
  • Prefer Arrays over other containers

When you need to employ more complex data structures, it can pay off to use non-generic, specialized variants of these data structures. The Spark team recommends the fastutil library. This library features many specialized data structures, like maps from Int to Double, which are more efficient and have a smaller memory footprint than their generic counterparts.
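
For example (assuming fastutil is on your classpath), a primitive-specialized map avoids boxing every key and value:

import it.unimi.dsi.fastutil.ints.Int2DoubleOpenHashMap

// Stores keys and values as unboxed primitives instead of
// java.lang.Integer / java.lang.Double objects.
val scores = new Int2DoubleOpenHashMap()
scores.put(1, 0.5)
scores.put(2, 1.5)
val s = scores.get(1)  // 0.5, without any boxing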

Using DataFrames

When mainly dealing with primitive data types, it is good advice to store them in DataFrames instead of custom structures. The DataFrame API is currently one of the most busily developed parts of the Spark project and will likely see many great advances in the near future. The great advantage of DataFrames comes from the many optimizations that Spark can automagically apply to them, like:

  • Query optimization (which means you do not have to worry about things like groupByKey vs reduceByKey)
  • Code generation (which results in highly specialized bytecode that will likely outperform anything you write yourself)
  • R-like operations (great news for everybody coming from the R world)

For example counting occurrences of a String can be done with

import sqlContext.implicits._  // needed for the .toDF conversion (sqlContext being your SQLContext)

val rdd = sc.parallelize(Seq("a", "a", "b", "a", "c"))
val df = rdd.toDF  // a single column named "_1" by default
df.groupBy("_1").count
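
If you prefer a descriptive column name over the generated "_1", you can (as a small variation on the example above) pass one to toDF:

val named = rdd.toDF("letter")
named.groupBy("letter").count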

Using broadcast variables

When running an operation on an RDD, Spark needs to serialize the task's closure and send it to the executors, once per partition. If (as should be the case) there are many more partitions than executors, each executor will get a similar closure sent to it multiple times. This is usually not too bad if the closure is not too large. But if it contains some kind of dimension table, this table will also be shipped multiple times and can itself become a burden on the network. This can be avoided using broadcast variables. These are shipped to each executor only once and are then accessible from the task closure without being serialized again.

An example task without broadcast variables

val dimensionTable = Map[Int, String] (...)
rdd.map(i => dimensionTable(i))

can be turned into a variant with broadcast variables as follows:

val dimensionTable = sc.broadcast(Map[Int, String] (...))
rdd.map(i => dimensionTable.value(i))

Do not turn everything into a broadcast variable though. There is nothing to be gained by broadcasting an integer that requires less memory than the container of the broadcast variable itself.

Memory allocation

You can envision Spark's memory as being divided into three parts:

  • Memory for caching: this is where RDDs get stored by default if you call .cache or .persist
  • Memory for shuffles: here your data gets buffered when transferring it to other machines
  • Memory for tasks: this is the heap space you can access for computations, for example inside a .map operation

These memory blocks are assigned before your job starts and are fixed for its duration. They can be adjusted with

sparkConf.set("spark.storage.memoryFraction", "0.6")
sparkConf.set("spark.shuffle.memoryFraction", "0.2")

The values given here are Spark's defaults. The memory not assigned to storage (i.e., caching) or shuffles is available within your tasks. Adjusting these settings can greatly benefit your job, especially if you do not have memory in abundance. The following are three use cases and respective memory settings that make sense for them:

Low cluster memory

While Spark is built on the assumption that you have arbitrary amounts of memory available, this is often not true. If memory is a scarce resource it makes sense to turn down both the caching and the shuffle memory so that you do not run into "out of memory" exceptions:

sparkConf.set("spark.storage.memoryFraction", "0")
sparkConf.set("spark.shuffle.memoryFraction", "0.1")

These settings will make your jobs behave more like classical MapReduce jobs.

Shuffle intensive jobs

If your job mainly joins large data sets but does not perform a lot of computation on them, one can turn up the shuffle memory (to avoid the creation of shuffle files) and reduce the space for caching (which makes sense if the resulting data will be persisted to some storage after shuffling):

sparkConf.set("spark.storage.memoryFraction", "0.02")
sparkConf.set("spark.shuffle.memoryFraction", "0.8")

Computation intensive jobs

If your data is already sorted and cleaned up and you want to run algorithms (e.g., a regression from MLlib) upon it, one can turn down the shuffle memory (most algorithms only shuffle very small, aggregated amounts of data - like gradients - between machines):

sparkConf.set("spark.storage.memoryFraction", "0.02")
sparkConf.set("spark.shuffle.memoryFraction", "0.6")

Note that one should not set spark.storage.memoryFraction larger than 0.6 without also adjusting the JVM settings (to learn more, have a look at the Spark configuration manual).

MapPartitions

If a .map operation requires an expensive setup (like loading data from disk), one can avoid performing this setup for each element of the RDD by using the .mapPartitions operation instead. This allows you to run an operation on an entire partition instead of on its single elements. Instead of

rdd.map{ element =>
  val dimensionTable = loadDimensionTableFromDisk()
  dimensionTable(element)
}

you can do

rdd.mapPartitions{ elementIter =>
  val dimensionTable = loadDimensionTableFromDisk()
  elementIter.map{ element =>
    dimensionTable(element)
  }
}

Note: in this case one could also have loaded the dimension table on the driver and distributed it as a broadcast variable to achieve the same effect. But in some cases this might not be possible, e.g., if some non-serializable resource is involved in the operation.
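
A typical example of such a non-serializable resource is a database connection; a rough sketch (createDatabaseConnection and lookupInDatabase are hypothetical placeholders, not real APIs):

rdd.mapPartitions { elementIter =>
  // One connection per partition instead of one per element; the connection
  // cannot be serialized, so it has to be created on the executor.
  val connection = createDatabaseConnection()
  // Materialize the partition so the connection can be closed afterwards.
  val results = elementIter.map(element => lookupInDatabase(connection, element)).toList
  connection.close()
  results.iterator
}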

Compression

By default Spark compresses all data before sending it over the network. This makes a lot of sense, as in most cases the time needed for compression is more than offset by the smaller I/O footprint. Therefore it usually does not make sense to turn this compression off. But the compression codec used can make a difference. Most codecs represent some trade-off between the memory they use, the time they need, and the size of the compressed data. So far I have had good experiences with "lz4". You can change the codec Spark uses with

sparkConf.set("spark.io.compression.codec", "lz4")

Garbage collection

When working with Java or Scala one usually does not think about garbage collection, as it mostly does its work magically in the background without bothering us much. And usually we do not really have a reason (and definitely do not want) to open this black box. But something that is important to know is that there exist multiple variants of garbage collectors that can be used by the JVM, and changing the garbage collector can reduce the time lost on collecting disposable objects. Therefore it can pay off to exchange the default garbage collector for something else. For example you can use G1GC by calling

sparkConf.set("spark.executor.extraJavaOptions", "-XX:+UseG1GC")

You can find more details about Spark and tuning the JVM garbage collection in this article.

Conclusion

Of course this list of possible tuning points is not exhaustive. For example, Spark has many more settings that can affect performance and which you might want to play around with. But as usual it is advisable not to optimize prematurely and to keep default settings as long as there is no reason to change them. Especially as you will need to perform experiments for many settings to see which configuration actually performs best, this can easily cost you more than the resources you save with the tuned parameters.

