Apache Spark Tutorial
January 10, 2018
What is Apache Spark?
Apache Spark is a fast and general engine for large-scale data processing. It is an in-memory data processing tool that can take large amounts of data and perform distributed operations across the data in many kinds of ways.
The most popular use case for Spark is to perform batch data processing. Batch processing is where a job or application runs based on time or a set amount of data. For example, we might have a Spark job to process bank transactions at 1am every morning. This is a batch process.
Spark also gives us the option of processing data in a streaming fashion. Stream processing is processing data when it is available in near real time. For example, we might have a Spark streaming job process bank transactions to determine if the transaction is fraud or not. This is a streaming process.
Spark was created by Matei Zaharia in the University of California Berkeley’s AMPLab in 2009. It was then donated to the Apache Software Foundation under the Apache License 2.0. The first official release was on May 30, 2014. Databricks was founded by the team that created Spark in 2013. Their main goal is to make Spark easier to use and run and all of their work is donated back to the Apache Spark project.
Why Apache Spark?
Spark has really come along way since 2009. Spark is now the largest Apache open source project out there. Many companies such as Yahoo, Netflix, and Ebay have adopted Spark as their Big Data processing engine. Some of these companies have to been able to scale Spark across thousands of nodes to process petabytes of data. But why are so many individuals and companies jumping on the Spark band wagon?
There are three main benefits of Spark: speed, ease of use, and a single unified engine.
Speed
Spark was born out of the need to use a MapReduce like framework across a bunch of nodes. But one of the main goal was to increase the performance of the current MapReduce framework. Spark was engineered for speed and performance. Spark can be 100x faster than MapReduce for in memory computing. Even if Spark application uses disk for computing (like MapReduce does), Spark is still 10x faster than MapReduce. Spark actually holds the world record for large-scale on-disk sorting.
Ease of Use
The creators of Spark also wanted to solve all of the overhead and complexities that came with writing MapReduce code. Spark itself is written in a JVM (Java Virtual Machine) language known as Scala. Scala is a hybrid language mixing concepts of object oriented languages and functional languages. So Spark APIs were created in the hopes of creating a better user experience. The Spark APIs include over 100 operators for transforming data.
On top of that, the Spark Dataframe APIs allow users to manipulate semi-structured data with ease by adding a schema and allowing querying of data via SQL. Another awesome thing that was included with Spark is the development of the framework in other languages. There are Spark APIs written in Scala, Java, R, and Python. There are even more DSL (Domain Specific Languages) that help with bridge even more gaps between different languages (such as Clojure) and the Spark APIs.
Unified Engine
Spark can do a ton of processing for you. It can run SQL queries, stream processing, machine learning processing, graph processing, and a lot more. In old systems, you would have to use different frameworks and applications to achieve of each of those processing goals. With Spark, you achieve it all with the same unified engine (Spark Core) which leads to increase productivity because they can be used seamlessly to create complex workflows.
Spark Components
Spark has a core set of libraries called Spark Core. On top of Spark Core, users can run Spark SQL, Spark Streaming, MLlib, and GraphX. Let’s dive into a quick look at each of the components.
Spark Core
Spark Core is the meat and potatoes of Spark. It is the general execution engine for Spark — every single component in Spark uses it. It provides the abstraction to run in-memory jobs as well as the ability to reference datasets that are many different kind of systems such as HDFS, Cassandra, and HBase.
Spark Core provides the abstraction of data in the form of the Resilient Distributed Dataset (RDD). A RDD is the fundamental data structure of Spark. It is an immutable, partitioned collection of elements that can be operated on in parallel. The RDD concept is the reason why Spark is faster than MapReduce.
Since a RDD is immutable meaning that once it is created, that RDD cannot change. Doing a transformation on a RDD will result in a new immutable RDD and is stored in memory for fast recollection for processing. With MapReduce, the only way to reuse data between transformation or computations is to write to an external storage system perhaps HDFS. In MapReduce, data sharing is slow due to serialization, replication, and disk IO. About 90% of MapReduce application are spent doing read-write operations with external systems. Because of this single reason, Spark is faster than MapReduce, making the RDD the most important concept in Spark.
Spark Core also enables the use of a Spark Context as the entry into a spark application. The Spark Context is the way to parallelize data, create RDDs, and do any and all processing on the data. Spark Contexts are also the way that you create a SQLContext/HiveContext. A SQLContext is the gateway into creating a dataframe internal to spark. A HiveContext is the ability to interact with Hive through Spark. Only one spark context can be active per JVM. Make sure to stop the Spark Context when done with it.
Spark SQL
SparkSQL is the ability to support structured and semi structured data with the ability to apply a schema to a set of data. SparkSQL uses an API called Dataframes to allow the user to load some data into a dataframe and run SQL queries on that data. This is done by taking applying a schema to the data. It is super important because the schema has to be defined or the Dataframe is useless.
Spark Streaming
Spark Streaming is a near real time processing framework that allows the user to take in data in mini batches and perform operations on it. Because Spark Streaming uses mini batches, it’s not like a pure streaming framework such as Flink. However, Spark Streaming use Spark Core’s fast scheduling capability to complete these mini batches in a way that the application acts like a pure streaming application.
MLlib
MLlib is the Spark library that allows machine learning in a distributed framework. There are many libraries that Spark provides that allow users to hit the ground running with machine learning. These machine learning algorithms can also be loaded in Spark as a batch process or into Spark Streaming as a streaming process. Users can also interact with MLlib to create their own machine learning algorithms.
GraphX
GraphX is a distributed framework that allows graph-processing. The GraphX library runs on top of Spark Core and provides an API for creating graph computations. GraphX uses Pregel abstraction API to create user-defined graphs. GraphX is also optimized for runtime making it very efficient.
Spark 2.0
Spark 2.0 recently came out with some improved functionality. In the examples, we will be using Spark 1.6, but there are a few Spark 2.0 concepts that we need to mention.
Datasets is a new concept which applies the ability to have type safe data inside dataframes.
Spark Sessions is the single point entry into a SQLContext. So far the Spark Session is an entry point into the SQLContext and HiveContext. It will be extended the StreamingSession which is the Spark Streaming addition which is another addition in Spark 2.0.
All of these updates are an attempt to create a better unified Spark always with the developers in mind.
Spark WordCount Example
We already wrote a MapReduce wordcount example. One of the benefits of Spark is that it is easier for developers to code in. Let’s put that to the test. Review the MapReduce Word Count wiki because we are about to rewrite for Spark using Scala.
Before we can do anything we are going to have to install the ScalaIDE made by Eclipse. So head over to http://scala-ide.org/ and download the most recent version of ScalaIDE and install by following the instructions.
Once that is installed go ahead and open it up. We are going to create our very first Scala project by going to File → New → Other
. I window should pop up and Scala Project should be an option. Go ahead and select and click Next
. It will ask for a project name. Go ahead and make it whatever you wish. I suggest SparkWordCount
.
Once you have the project, right click on the project and select New → Package
. Name your package something like com.spark.example and click Ok. Once the package is created, right click on the package and select New → Other
. A window should pop up — go down to the Scala Wizard
and select Scala Object
. Name your object, WordCount
.
Now we can code a little bit!! Feel free to copy the following code:
package com.spark.example
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.log4j._
/** Count up how many of each word appears in a text file as simply as possible. */
object WordCount {
/** Our main function where the action happens */
def main(args: Array[String]) {
// Set the log level to only print errors
Logger.getLogger("org").setLevel(Level.ERROR)
// Create a SparkContext and an application name of WordCount
val sc = new SparkContext("WordCount")
// Read each line of the hdfs file
val input = sc.textFile(input)
// Split into words separated by a space character
val words = input.flatMap(x => x.split(" "))
// Count up the occurrences of each word
val wordCounts = words.countByValue()
// Print the results.
wordCounts.foreach(println)
}
}
Now that we have the code, we have to add the Spark library to the Scala project. Head over to https://spark.apache.org/downloads.html. Where it says “Choose a Spark Release:” Pick the drop down and select Spark 2.0.0 and then click the link in step 3 and the download should start.
Now that we have the jars, we are going to add the jars to the actual project. Right click on the (project name) → Build Path → Configure Build Path → Add External Jars
. Navigate to where you saved the downloaded jars and click into the folder and select all of the jars and then click Open.
By now most of your errors should be gone. Now we are going to right click on the project and scroll down to Export → Jar File
. Just press next and then finish.
Now you should have your jar. Before we continue, if you are using Digital Ocean for the examples, please go ahead and spin up the docker-machine and resize it appropriately. Please see this post if you need a refresh.
Digital Ocean users will need to run the following command:
docker-machine scp /location/of/the/jar root@docker-sandbox:/tmp
This will get your jar file up to the machine that we are about to spin up the Cloudera Docker container with.
Non Digital Ocean users can just run:
cp /location/of/the/jar /tmp
Everyone now will run this command:
docker run --hostname=quickstart.cloudera --privileged=true -t -i -p 8888:8888 -v /tmp:/tmp/mount cloudera/quickstart:latest /usr/bin/docker-quickstart bash
Once this command is done, you will be inside your Cloudera Docker Container ready to rock!
Just like in the MapReduce discussion, we need to create some data and put it into HDFS. Here are the commands.
echo "Hello World! This is a test to ensure that my hello world application works" > wordcountdata
hdfs dfs -mkdir -p /wordcount/input
hdfs dfs -copyFromLocal /home/course/WordCountData /wordcount/input
Now we have our data in HDFS. Let’s run the WordCount jar that will be in /tmp/mount. Let’s run our spark-submit command.
spark-submit --class com.spark.example.WordCount --master yarn --deploy-mode cluster --name wordcount /tmp/mount/SparkWordCount.jar hdfs://quickstart.cloudera:8020/wordcount/input
And after all the mumbo jumbo, you should see the results of your words counted.
Spark Dataframes Example
Inside your docker container, we are going to create some very quick JSON data and apply a Dataframe to it and allow you to run queries on it. JSON is a type of data format that inherently has a schema with it, but we want to put it into a dataframe so it helps to simplify things that it is in JSON so run these commands with me.
vi sales.json
This will open a new document titled sales.json. Once inside you need to press i
to be in interactive mode so you can type. Please make sure that you are typing and not copy / pasting because it might not work when we create the dataframe. Type the following:
{
{"salesId" : "1", "description" : "Wally World", "amount" : "129.23", "numberOfItems" : "15"}
{"salesId" : "2", "description" : "Toca Bell", "amount" : "5.65", "numberOfItems" : "3"}
{"salesId" : "3", "description" : "Whatasalad", "amount" : "32.12", "numberOfItems" : "6"}
{"salesId" : "4", "description" : "Pizza Nut", "amount" : "23.43", "numberOfItems" : "2"}
{"salesId" : "5", "description" : "Bullseye", "amount" : "2398.40", "numberOfItems" : "29"}
{"salesId" : "6", "description" : "Wally World", "amount" : "1.83", "numberOfItems" : "1"}
}
To get out of this interactive mode, hit ESC
and then type :wq
.
Spark-shell is a command that you can run to get an interactive shell (REPL) to run spark commands or just Scala if you wish. While still in your docker container, run spark-shell
Now you should have a prompt that says Scala. Run the following command to load the data into a dataframe.
val dataframe = sqlContext.read.json("file:///tmp/mount/sales.json")
In the parenthesis, put the location that your sales.json
files is located. Once this command has ran we now have a data frame inside the spark shell. Let’s do some cool things with it.
dataframe.show()
Output:
+---------------+-------+-----------+-------------+-------+
|_corrupt_record| amount|description|numberOfItems|salesId|
+---------------+-------+-----------+-------------+-------+
| { | null| null| null| null|
| null| 129.23|Wally World| 15| 1|
| null| 5.65| Toca Bell| 3| 2|
| null| 32.12| Whatasalad| 6| 3|
| null| 23.43| Pizza Nut| 2| 4|
| null|2398.40| Bullseye| 29| 5|
| null| 1.83|Wally World| 1| 6|
| }| null| null| null| null|
+---------------+-------+-----------+-------------+-------+
If you see corrupt record inside of your dataframe, don’t worry too much about it. When you use the internal libraries like read.json, it is very sensitive when it infers the schema. When you apply your own schema, you can control it a little bit more.
This command should show you your data in a nice format.
dataframe.printSchema()
Output:
|-- _corrupt_record: string (nullable = true)
|-- amount: string (nullable = true)
|-- description: string (nullable = true)
|-- numberOfItems: string (nullable = true)
|-- salesId: string (nullable = true)
This command will show you the schema of your data.
dataframe.select("description").show()
This command will show you all of the descriptions in our data frame.
dataframe.filter(dataframe("amount" > 100)).show()
This command will give you the amounts that are over $100 and shows you a glimpse of the SQL like language that you can use to create queries of the data inside a dataframe.
Conclusion
We have looked at what Spark is and have compared Spark to MapReduce. We even took MapReduce code written in Java and condensed it down into a WordCount using Scala and Spark showing just how simple Spark makes data processing compared to MapReduce. After that, we created your very first dataframe and ran some queries on it to get the feel. We encourage you to play around with the dataframe and see what other queries you can create. Don’t forget to shut down your docker-machine if you are using Digital Ocean. Refer back to this post for a refresher.