Continuing with Spark, this week I read Spark SQL: Relational Data Processing in Spark, which describes how Spark SQL integrates relational processing with Spark itself. It builds on the experience of previous efforts like Shark and introduces two major additions to bridge the gap between relational and procedural processing: the DataFrame API and the Catalyst optimizer.
Spark SQL uses a nested data model based on Hive and supports all major SQL data types along with complex (e.g. array, map, etc.) and user-defined data types. It ships with a schema inference algorithm for JSON and other semistructured data. This algorithm is also used for inferring the schemas of RDDs (Resilient Distributed Datasets) of Python objects. The algorithm attempts to infer a static tree structure of STRUCT types (which in turn may contain basic types, arrays, etc.) in one pass over the data. It starts by finding the most specific Spark SQL type for each record and then merges these types using an associative "most specific supertype" function that generalizes the types of each field.
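The merge step can be sketched in plain Python. This is only an illustration of the idea, not Spark SQL's actual code: the type names, the `infer_type`, `merge_types` and `infer_schema` helpers, and the rule that conflicting types fall back to STRING are all assumptions made for the example.

```python
from functools import reduce


def infer_type(value):
    """Return the most specific (hypothetical) type for one JSON-like value."""
    if value is None:
        return "NULL"
    if isinstance(value, bool):  # check bool first: bool is a subclass of int
        return "BOOLEAN"
    if isinstance(value, int):
        return "INT"
    if isinstance(value, float):
        return "FLOAT"
    if isinstance(value, str):
        return "STRING"
    if isinstance(value, list):
        element = reduce(merge_types, (infer_type(v) for v in value), "NULL")
        return ("ARRAY", element)
    if isinstance(value, dict):  # a STRUCT is modelled as field -> type
        return {key: infer_type(v) for key, v in value.items()}
    raise TypeError(f"unsupported value: {value!r}")


def merge_types(a, b):
    """Most specific supertype of two types (associative and commutative)."""
    if a == b:
        return a
    if a == "NULL":
        return b
    if b == "NULL":
        return a
    if isinstance(a, dict) and isinstance(b, dict):
        # Merge STRUCTs field by field; a missing field behaves like NULL.
        return {
            key: merge_types(a.get(key, "NULL"), b.get(key, "NULL"))
            for key in sorted(a.keys() | b.keys())
        }
    if isinstance(a, tuple) and isinstance(b, tuple):
        return ("ARRAY", merge_types(a[1], b[1]))
    if isinstance(a, str) and isinstance(b, str) and {a, b} == {"INT", "FLOAT"}:
        return "FLOAT"
    return "STRING"  # assumed fallback: the most general type in this sketch


def infer_schema(records):
    """One pass over the data: infer a type per record, then merge them."""
    return reduce(merge_types, (infer_type(r) for r in records), "NULL")


records = [
    {"loc": {"lat": 45, "long": 9.2}, "tags": ["a"]},
    {"loc": {"lat": 45.5}, "text": "hi"},
]
schema = infer_schema(records)
```

Because `merge_types` is associative, the per-record types can be combined in any grouping, which is what allows the reduction to run in a single distributed pass.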
A DataFrame is a distributed collection of rows with the same schema, equivalent to a table in an RDBMS. DataFrames are similar to Spark’s native RDDs in that they are evaluated lazily, but unlike RDDs, they have a schema. A DataFrame represents a logical plan, and a physical plan is built only when an output operation like save is called. Deferring execution in this way leaves more room for optimization. Moreover, DataFrames are analyzed eagerly to check that the column names and data types they use are valid.
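To make the combination of lazy execution and eager analysis concrete, here is a toy model in plain Python. It is not the DataFrame API: the `LazyFrame` class, its methods and `AnalysisError` are hypothetical names invented for this sketch, and the "plan" is just a tuple of pending steps.

```python
class AnalysisError(Exception):
    """Raised eagerly when a plan refers to an unknown column."""


class LazyFrame:
    """Hypothetical stand-in for a DataFrame: a schema plus a logical plan."""

    def __init__(self, rows, schema, plan=()):
        self._rows = rows      # source data: a list of dicts
        self.schema = schema   # tuple of column names
        self.plan = plan       # logical plan: tuple of pending steps

    def _check(self, columns):
        missing = [c for c in columns if c not in self.schema]
        if missing:
            raise AnalysisError(f"unknown column(s): {missing}")

    def where(self, column, predicate):
        self._check([column])  # eager analysis: no data is touched here
        step = ("filter", column, predicate)
        return LazyFrame(self._rows, self.schema, self.plan + (step,))

    def select(self, *columns):
        self._check(columns)
        step = ("project", columns)
        return LazyFrame(self._rows, tuple(columns), self.plan + (step,))

    def collect(self):
        """Output operation: only now is the accumulated plan executed."""
        rows = self._rows
        for step in self.plan:
            if step[0] == "filter":
                _, column, predicate = step
                rows = [r for r in rows if predicate(r[column])]
            else:
                _, columns = step
                rows = [{c: r[c] for c in columns} for r in rows]
        return rows


users = LazyFrame(
    [{"name": "Ann", "age": 19}, {"name": "Bob", "age": 30}],
    ("name", "age"),
)
young = users.where("age", lambda age: age < 21).select("name")
result = young.collect()  # the two pending steps run here
```

Calling `users.select("salary")` raises `AnalysisError` immediately, even though no rows have been processed, while `young` stays a two-step plan until `collect()` is called.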
DataFrames support queries through both SQL and a DSL that includes all common relational operators like select, where, join and groupBy. These operators build up an abstract syntax tree (AST) of the expression (think of an expression as a column in a table), which is then optimized by Catalyst. Spark SQL can cache data in memory using columnar storage, which is more efficient than Spark’s native cache, which simply stores data as JVM objects. The DataFrame API also supports user-defined functions (UDFs), which can use the full Spark API internally and can be registered easily.
To query native datasets, Spark SQL creates a logical data scan operator (pointing to the RDD) which is compiled into a physical operator that accesses fields of the native objects in-place, extracting only the fields needed for a query. This is better than traditional object-relational mapping (ORM), which translates an entire object into a different format.
Spark MLlib implemented a new API based on the pipeline concept (think of a pipeline as a graph of transformations on the data) and chose DataFrames as the format for exchanging data between pipeline stages. This makes it much easier to expose MLlib’s algorithms in Spark SQL.
Catalyst is an extensible optimizer based on Scala’s standard features. It supports both rule-based and cost-based optimizations and makes it easy to add new optimization techniques and data sources to Spark SQL. At its core, Catalyst is powered by a general library that represents and manipulates trees by applying rules to them. The tree is the main data type in Catalyst and is composed of node objects, where a node has a type and zero or more children. Rules are functions that transform one tree into another. Trees offer a transform method that applies a pattern matching function recursively on all the nodes of the tree, transforming only the matching nodes. Rules can match multiple patterns in the same transform call and can contain arbitrary Scala code, so rule authors are not restricted to a domain-specific language (DSL). Catalyst groups rules into batches and executes each batch until it reaches a fixed point (i.e. the tree stops changing). This means that each rule can be simple and self-contained while still producing a global effect on the tree. Since both nodes and trees are immutable, optimizations can be easily performed in parallel as well.
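The tree, rule and fixed-point ideas fit in a few lines of Python. This is a sketch of the technique rather than Catalyst itself (which is written in Scala and uses Scala pattern matching): the node classes mirror the paper's small expression example, while `transform`, `fold_constants`, `drop_zero` and `run_batch` are hypothetical helpers, and the pre-order traversal is an assumption of this sketch.

```python
from dataclasses import dataclass


@dataclass(frozen=True)  # frozen: nodes are immutable, like Catalyst trees
class Literal:
    value: int


@dataclass(frozen=True)
class Attribute:
    name: str


@dataclass(frozen=True)
class Add:
    left: object
    right: object


def transform(node, rule):
    """Apply `rule` to `node`, then recurse into its children (pre-order).

    A rule returns a replacement node, or None when it does not match.
    """
    replaced = rule(node)
    node = node if replaced is None else replaced
    if isinstance(node, Add):
        return Add(transform(node.left, rule), transform(node.right, rule))
    return node


def fold_constants(node):
    # Add(Literal(a), Literal(b)) => Literal(a + b)
    if (isinstance(node, Add) and isinstance(node.left, Literal)
            and isinstance(node.right, Literal)):
        return Literal(node.left.value + node.right.value)
    return None


def drop_zero(node):
    # Add(x, Literal(0)) => x   and   Add(Literal(0), x) => x
    if isinstance(node, Add):
        if node.right == Literal(0):
            return node.left
        if node.left == Literal(0):
            return node.right
    return None


def run_batch(tree, rules, max_iterations=100):
    """Run every rule in the batch repeatedly until the tree stops changing."""
    for _ in range(max_iterations):
        new_tree = tree
        for rule in rules:
            new_tree = transform(new_tree, rule)
        if new_tree == tree:  # fixed point reached
            return tree
        tree = new_tree
    return tree


# x + (1 + 2)  becomes  x + 3
simple = run_batch(Add(Attribute("x"), Add(Literal(1), Literal(2))),
                   [fold_constants, drop_zero])

# x + ((1 + 2) + -3) needs several passes before it collapses to just x
nested = run_batch(
    Add(Attribute("x"), Add(Add(Literal(1), Literal(2)), Literal(-3))),
    [fold_constants, drop_zero],
)
```

Neither rule knows about the other, yet running the batch to a fixed point lets constant folding expose a zero that `drop_zero` can then remove.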
Spark SQL uses Catalyst in four phases:
Logical Plan Analysis, which resolves unresolved attribute references (those whose type is not yet known or that have not been matched to an input table). It uses a Catalog object that tracks the tables in all data sources to resolve these references.
Logical Optimization, which applies standard rule-based optimizations to the logical plan, including constant folding, predicate pushdown, projection pruning, null propagation and Boolean expression simplification.
Physical Planning, in which Spark SQL generates multiple physical plans for a single logical plan and then selects one of them using a cost model. It also performs some rule-based physical optimizations, as in the previous phase.
Code Generation, in which Catalyst uses quasiquotes (provided by Scala) to construct an AST that is fed to the Scala compiler, generating bytecode at runtime.
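The code generation phase has a rough analogue in Python, which I find helpful for intuition. The sketch below does not use Scala quasiquotes; it swaps in Python's standard `ast` module to build a syntax tree from an expression tree and compiles it to bytecode at runtime. The node classes and the `to_ast` and `compile_expression` helpers are hypothetical, and treating each attribute as a variable looked up in the row is a simplification.

```python
import ast
from dataclasses import dataclass


@dataclass(frozen=True)
class Literal:
    value: int


@dataclass(frozen=True)
class Attribute:
    name: str


@dataclass(frozen=True)
class Add:
    left: object
    right: object


def to_ast(node):
    """Translate an expression tree into a Python AST, node by node."""
    if isinstance(node, Literal):
        return ast.Constant(value=node.value)
    if isinstance(node, Attribute):
        # Assumption: attribute names are valid Python identifiers.
        return ast.Name(id=node.name, ctx=ast.Load())
    if isinstance(node, Add):
        return ast.BinOp(left=to_ast(node.left), op=ast.Add(),
                         right=to_ast(node.right))
    raise TypeError(f"unsupported node: {node!r}")


def compile_expression(node):
    """Compile the tree once; the result evaluates it against a row dict."""
    tree = ast.Expression(body=to_ast(node))
    ast.fix_missing_locations(tree)  # fill in line/column info for compile()
    code = compile(tree, filename="<generated>", mode="eval")
    # Each call runs the precompiled bytecode with the row as its namespace.
    return lambda row: eval(code, dict(row))


evaluate = compile_expression(
    Add(Attribute("x"), Add(Literal(1), Literal(2)))
)
value = evaluate({"x": 10})
```

The point of compiling once is that evaluating a row no longer walks the expression tree and dispatches on node types for every row.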
Extensions can be added even without understanding how Catalyst works. For example, to add a data source, one needs to implement a createRelation function that takes a set of key-value parameters and returns a BaseRelation object if the data is successfully loaded. This BaseRelation can then implement interfaces that allow Spark SQL to access the data. Similarly, user-defined types (UDTs) are added by mapping them to Catalyst’s built-in types: one provides a mapping from an object of the UDT to a Catalyst row of built-in types and an inverse mapping back.
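The UDT mapping is easiest to see with a two-dimensional point, the kind of example the paper uses. The following is a plain-Python sketch under stated assumptions: `Point`, `PointUDT`, its `data_type` attribute and the use of a tuple as the "row" are hypothetical stand-ins, not Spark's classes.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Point:
    x: float
    y: float


class PointUDT:
    """Hypothetical UDT descriptor: maps Point to and from built-in types."""

    # The built-in representation: a struct of two DOUBLE fields.
    data_type = (("x", "DOUBLE"), ("y", "DOUBLE"))

    def serialize(self, point):
        """Point -> row of built-in values (a tuple stands in for a row)."""
        return (float(point.x), float(point.y))

    def deserialize(self, row):
        """Row of built-in values -> Point (the inverse mapping)."""
        x, y = row
        return Point(x, y)


udt = PointUDT()
row = udt.serialize(Point(1.0, 2.5))
restored = udt.deserialize(row)
```

Once a type is expressed as a struct of built-in types, the engine can store and process it like any other columns, and the user-facing object is rebuilt only when it is handed back.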
Some recent work has also shown that it is quite easy to add special planning rules to optimize for specific use cases. For example, researchers in the ADAM Project added a new rule to use interval trees instead of normal joins for a computational genomics problem. Similarly, other work has used Catalyst to improve the generality of online aggregation to support nested aggregate queries. These examples show that Spark and Spark SQL are quite easy to adapt to new use cases as well. As I mentioned previously, I am experimenting with Spark SQL and it does look promising. I have implemented some operators and it is indeed quite easy to extend. I am now looking forward to developing something more concrete on top of it.