This week I read Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing. The paper describes the architecture of RDDs (Resilient Distributed Datasets), the problems they can be used to solve, how they perform on different benchmarks, and how they differ from existing solutions.
Many general-purpose cluster computing frameworks, like MapReduce and Dryad, fall short in two areas:
- Iterative algorithms where intermediate results are used across multiple computations.
- Interactive data analysis where users run ad-hoc queries on the data.
One way around these problems is to use specialized frameworks like Pregel, but this leads to a loss of generality. This is the problem RDDs intend to solve: providing a general-purpose, fault-tolerant, distributed memory abstraction.
RDDs are immutable partitioned collections created through deterministic operations on data in stable storage or on other RDDs. They keep enough information about how they were derived from other datasets (this information is called lineage). Lineage ensures that RDDs can be easily reconstructed after failures without explicit checkpointing. In fact, a program cannot reference an RDD that it cannot reconstruct after a failure. RDDs are lazy and ephemeral: they are constructed on demand and discarded after use, which allows many operations to be pipelined. For example:
rawData = spark.textFile(filepath) // read data from file
dataAfterApplyingFirstFilter = rawData.filter(condition1)
dataAfterApplyingSecondFilter = dataAfterApplyingFirstFilter.filter(condition2)
dataAfterApplyingSecondFilter.count() // an action that triggers execution
Execution takes place only when an action (such as count) is invoked, and the two filter conditions can be merged into a single condition to avoid multiple passes over the data.
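This lazy, pipelined evaluation can be illustrated with a minimal sketch in plain Python (the `LazyDataset` class is a hypothetical stand-in, not Spark's actual implementation):

```python
# Hypothetical sketch of lazy evaluation with pipelined filters.
class LazyDataset:
    def __init__(self, source, predicates=()):
        self.source = source          # stands in for data in stable storage
        self.predicates = predicates  # logged transformations (the lineage)

    def filter(self, pred):
        # No computation happens here; we only extend the lineage.
        return LazyDataset(self.source, self.predicates + (pred,))

    def collect(self):
        # An "action": all filters are applied in a single pass.
        return [x for x in self.source
                if all(p(x) for p in self.predicates)]

raw = LazyDataset(range(10))
result = raw.filter(lambda x: x % 2 == 0).filter(lambda x: x > 4)
print(result.collect())  # [6, 8]
```

Note how the two predicates are evaluated together in one pass over the source inside `collect`, rather than materializing an intermediate dataset per filter.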
RDDs provide an interface based on fine-grained reads and coarse-grained updates: transformations (functions) are applied to all data items. These transformations can be logged to build the lineage graph and thus provide fault tolerance. This coarse-grained update model makes RDDs unsuitable for applications, such as an incremental web crawler, that need asynchronous fine-grained updates to shared state. In such cases, DSM (Distributed Shared Memory), which provides fine-grained reads and writes, is a better choice. RDDs, however, offer several advantages over DSM. First, unlike DSM, RDDs do not need to incur checkpointing overhead. Second, RDDs, being immutable, can mitigate stragglers (slow nodes) by running backup tasks, just like MapReduce. Third, since only bulk writes are supported, the runtime can schedule tasks based on data locality to enhance performance. Lastly, even if RDDs choose to take checkpoints (when the lineage graph grows very large), consistency is not a concern because of their immutable nature.
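To make the lineage idea concrete, here is a toy sketch (not Spark internals) of rebuilding a lost partition by replaying a logged coarse-grained transformation instead of restoring a checkpoint:

```python
# Hypothetical example: partitions and lineage are plain Python values.
parent_partitions = {0: [1, 2, 3], 1: [4, 5, 6]}
lineage = [("map", lambda x: x * 10)]  # the logged bulk transformation

def compute(partition_id):
    # Replay the lineage deterministically over the parent data.
    data = parent_partitions[partition_id]
    for _name, fn in lineage:
        data = [fn(x) for x in data]
    return data

child = {i: compute(i) for i in parent_partitions}
del child[1]           # simulate losing partition 1 when a node fails
child[1] = compute(1)  # reconstruct it from lineage; no checkpoint needed
print(child[1])        # [40, 50, 60]
```

Because the transformation is deterministic and the parent data is immutable, the recomputed partition is guaranteed to be identical to the lost one.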
RDDs have been implemented in Spark to provide a language integrated API. Details about this implementation have been discussed here separately.
The paper proposes a graph-based representation of RDDs where an RDD is expressed through a common interface that exposes five functions:
- partitions — the atomic pieces of the dataset.
- dependencies — the list of dependencies an RDD has on its parent RDDs or data sources.
- iterator — a function that computes the elements of a partition based on its parents.
- partitioner — metadata on whether the data is range/hash partitioned.
- preferredLocations — nodes where a partition can be accessed faster due to data locality.
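As a rough sketch, this common interface could be modeled in Python as below; the method names loosely mirror the paper, while the concrete SourceRDD and MappedRDD classes are hypothetical illustrations, not Spark's code:

```python
from abc import ABC, abstractmethod

class RDD(ABC):
    @abstractmethod
    def partitions(self): ...                     # atomic pieces of the dataset
    @abstractmethod
    def dependencies(self): ...                   # parent RDDs or data sources
    @abstractmethod
    def iterator(self, split, parent_iters): ...  # compute a partition
    def partitioner(self):                        # hash/range partitioning, if any
        return None
    def preferred_locations(self, split):         # data-locality hints
        return []

class SourceRDD(RDD):
    def __init__(self, parts):
        self._parts = parts
    def partitions(self):
        return list(range(len(self._parts)))
    def dependencies(self):
        return []                                 # reads from stable storage
    def iterator(self, split, parent_iters):
        return iter(self._parts[split])

class MappedRDD(RDD):
    def __init__(self, parent, fn):
        self.parent, self.fn = parent, fn
    def partitions(self):
        return self.parent.partitions()           # same partitioning as parent
    def dependencies(self):
        return [self.parent]                      # a single narrow dependency
    def iterator(self, split, parent_iters):
        return (self.fn(x) for x in parent_iters[0])

source = SourceRDD([[1, 2], [3, 4]])
mapped = MappedRDD(source, lambda x: x * 2)
print(list(mapped.iterator(0, [source.iterator(0, [])])))  # [2, 4]
```

The point of the interface is that the scheduler never needs to know what a transformation does, only how its partitions depend on its parents' partitions.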
The most interesting aspect of this representation is how dependencies are expressed. Dependencies fall into one of two classes:
- Narrow Dependencies — where each partition of the parent RDD is used by at most one partition of the child RDD. For example, map and filter operations.
- Wide Dependencies — where multiple child partitions may depend on a single parent partition.
Narrow dependencies support pipelined execution on one cluster node, while wide dependencies require data from all parent partitions to be available and shuffled across nodes. Recovery is also easier with narrow dependencies; with wide dependencies, the loss of a single partition may require a complete re-execution. The figure shows some examples of narrow and wide dependencies. Note that the join operation defines a narrow dependency when the parents are hash-partitioned and a wide dependency otherwise.
Figure 1: Example of narrow and wide dependencies.
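The distinction can be sketched in plain Python; the partition layout and the groupByKey-style shuffle below are illustrative assumptions, not Spark's implementation:

```python
parent = [[("a", 1), ("b", 2)], [("a", 3), ("c", 4)]]  # 2 parent partitions
NUM_CHILD = 2

# Narrow dependency (e.g. filter): child partition i reads only parent i.
narrow_child = [[kv for kv in part if kv[1] > 1] for part in parent]
print(narrow_child)  # [[('b', 2)], [('a', 3), ('c', 4)]]

# Wide dependency (e.g. groupByKey): records are hash-partitioned by key,
# so every child partition may pull records from every parent partition.
wide_child = [dict() for _ in range(NUM_CHILD)]
for part in parent:
    for key, val in part:
        wide_child[hash(key) % NUM_CHILD].setdefault(key, []).append(val)
```

Because a key such as "a" can appear in any parent partition, the wide case needs a shuffle across nodes before any child partition is complete, while the narrow case can run entirely locally.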
Whenever an “action” is executed, the scheduler builds a DAG (Directed Acyclic Graph) of stages based on the lineage graph. Each stage contains as many pipelined transformations with narrow dependencies as possible. The boundaries between stages are the shuffle operations required by wide dependencies. Some stages may already be computed (because the results of previous computations were persisted). For the remaining tasks, the scheduler uses delay scheduling to assign tasks to machines based on data locality. For wide dependencies, intermediate records are materialized on the nodes holding the parent partitions.
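The stage-cutting logic can be sketched as follows; the lineage representation and the build_stages function are hypothetical, not the Spark scheduler's API:

```python
# Each lineage entry is (operation name, dependency kind).
lineage = [("map", "narrow"), ("filter", "narrow"),
           ("groupByKey", "wide"), ("map", "narrow")]

def build_stages(lineage):
    # Cut a stage boundary at every wide (shuffle) dependency;
    # narrow transformations are pipelined into the same stage.
    stages, current = [], []
    for op, dep in lineage:
        if dep == "wide" and current:
            stages.append(current)  # shuffle boundary ends the stage
            current = []
        current.append(op)
    stages.append(current)
    return stages

print(build_stages(lineage))
# [['map', 'filter'], ['groupByKey', 'map']]
```

The map and filter run back-to-back over each partition in stage one; the shuffle required by groupByKey starts a new stage.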
Spark outperforms Hadoop and HadoopBinMem for the following reasons:
- Overhead of the Hadoop stack: Hadoop incurs a minimum of around 25 seconds of overhead just to complete job setup, start tasks, and clean up.
- Overhead of HDFS while serving data: HDFS performs multiple memory copies and a checksum to serve each block.
- Deserialization cost of converting binary data to in-memory Java objects.
Note that HadoopBinMem converts the input data to a low-overhead binary format and stores it in an in-memory HDFS instance.
Case studies also show that Spark performs well for interactive data analysis and other user applications. One limitation of the experiments is that in all comparisons of the three systems, the cluster had enough RAM to hold all the data in memory. It would have been interesting to compare their performance when the cluster cannot hold the entire dataset in main memory.
RDDs and Spark learn from and improve on existing systems in many ways.
- Data flow models like MapReduce share data through stable storage but have to incur the cost of data replication, I/O and serialization.
- DryadLINQ and FlumeJava provide language-integrated APIs and pipeline data across operators in the same query, but unlike Spark they cannot share data across multiple queries.
- Piccolo and DSM do not provide a high-level programming interface like RDDs. Moreover, they use checkpointing and rollback, which are more expensive than the lineage-based approach.
- Nectar, Ciel and FlumeJava do not provide in-memory caching.
- MapReduce and Dryad use lineage-based recovery within a computation, but this information is lost after a job ends. In contrast, RDDs persist lineage information across computations.
RDDs can be used to express many existing models like MapReduce, DryadLINQ, Pregel, Batched Stream Processing, etc. This seems surprising given that RDDs offer only a limited interface due to their immutable nature and coarse-grained transformations. But these limitations have a negligible impact on many parallel applications. For example, many parallel programs prefer to apply the same operation to many records to keep the program simple. Similarly, multiple RDDs can be created to represent different versions of the same data.
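For instance, a single Pregel superstep can be expressed with only coarse-grained operations: group the messages by target vertex, then map the same vertex program over every vertex. A toy sketch, where the data layout and the vertex program are hypothetical:

```python
vertices = {"a": 10, "b": 20}              # vertex id -> state
messages = [("a", 5), ("a", 1), ("b", 2)]  # (target vertex, message)

def superstep(vertices, messages, program):
    # A groupByKey-style grouping of messages by target vertex.
    inbox = {}
    for vid, msg in messages:
        inbox.setdefault(vid, []).append(msg)
    # A map-style bulk transformation: the same program runs on every vertex.
    return {vid: program(state, inbox.get(vid, []))
            for vid, state in vertices.items()}

# Example vertex program: add the incoming messages to the state.
new_vertices = superstep(vertices, messages, lambda s, ms: s + sum(ms))
print(new_vertices)  # {'a': 16, 'b': 22}
```

Each superstep produces a new immutable version of the vertex dataset, which is exactly the "multiple RDDs for different versions of the same data" pattern mentioned above.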
The paper also offers an interesting insight on the question of why previous frameworks could not offer the same level of generality. It says previous frameworks did not observe that “the common cause of these problems was a lack of data sharing abstractions”.