CommonLounge Archive

Streaming Tools Tutorial — Spark Streaming, Apache Flink, and Storm

February 28, 2018

Streaming Data

Nowadays, companies need an arsenal of tools to combat data problems. Traditionally, batch jobs have given companies the insights they need to perform at the right level. However, with the emergence of web-based applications, where data is created at high velocity and customers want to see results in near real time, new technology is necessary.

This is where Streaming Tools come into play. They are data processing tools that can carry out various forms of processing in near real time in the hopes of providing customers with accurate information instantaneously.

Streaming Tools can also be used to process data into accurate, up-to-date metrics of how the business is performing, or to determine in real time whether a credit card transaction was fraudulent. There are a ton of use cases for streaming tools, but the basis is the same: the ability to get information from data in real time.

Spark Streaming

In a previous article, we introduced Spark and briefly touched on Spark Streaming. To recap, it is an add-on to Spark’s Core Engine that uses Spark’s scheduling efficiency to create mini-batches of data that get processed in near real time. The premise of Spark Streaming is the same as Spark’s: it creates Resilient Distributed Datasets (RDDs) that can be processed across a cluster.
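To make the mini-batch idea concrete, here is a minimal sketch (not part of the tutorial’s main example) that receives lines from a local socket and treats every 2-second batch as part of a DStream, which under the hood is a sequence of RDDs. The host and port are placeholders for illustration.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object MiniBatchSketch {
  def main(args: Array[String]): Unit = {
    // Run locally with two threads: one to receive data, one to process it
    val conf = new SparkConf().setAppName("MiniBatchSketch").setMaster("local[2]")
    // Every 2 seconds, Spark Streaming packages the data received so far into an RDD
    val ssc = new StreamingContext(conf, Seconds(2))
    // A DStream: a continuous sequence of RDDs, one per 2-second mini-batch
    val lines = ssc.socketTextStream("localhost", 9999)
    // Print how many lines arrived in each mini-batch
    lines.count().print()
    ssc.start()
    ssc.awaitTermination()
  }
}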

Spark Refresher

If you need a refresher on the basics of Spark, head to the Apache Spark article. Below is the code we wrote for the Spark Word Count example (copied here again).

package com.spark.example

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.log4j._

/** Count up how many of each word appears in a text file as simply as possible. */
object WordCount {

  /** Our main function where the action happens */
  def main(args: Array[String]): Unit = {
    // Set the log level to only print errors
    Logger.getLogger("org").setLevel(Level.ERROR)
    // Create a SparkContext with an application name of WordCount
    // (the master is supplied by spark-submit)
    val conf = new SparkConf().setAppName("WordCount")
    val sc = new SparkContext(conf)
    // Read each line of the HDFS (or local) file passed as the first argument
    val input = sc.textFile(args(0))
    // Split into words separated by a space character
    val words = input.flatMap(x => x.split(" "))
    // Count up the occurrences of each word
    val wordCounts = words.countByValue()
    // Print the results
    wordCounts.foreach(println)
  }
}

Now we are going to create a Spark Streaming Word Count that pulls from a Kafka topic in near real time. Below is a review of how to get the ScalaIDE set up, in case you need a refresher.

Installing the ScalaIDE

Before we can do anything, we are going to have to install the ScalaIDE made by Eclipse. Head over to http://scala-ide.org/, download the most recent version of the ScalaIDE, and install it by following the instructions.

Create a Scala Project

Once it is installed, go ahead and open it up. We are going to create our very first Scala project by going to File → New → Other. Select Scala Project in the window that pops up and click Next. It will ask for a project name; make it whatever you wish. We suggest SparkStreamingWordCount.

Creating SparkStreamingWordCount

Once you have the project, right click on it and select New → Package. Name your package something like com.spark.example and click Ok. Once the package is created, right click on the package and select New → Other. Go down to the Scala Wizard and select Scala Object. Name your object SparkStreamingWordCount.

package com.spark.example

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object SparkStreamingWordCount {
  def main(args: Array[String]): Unit = {
    // Set up the Spark configuration and a Streaming Context with 2-second mini-batches
    val sparkConf = new SparkConf().setAppName("SparkStreamingWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // General information about our Kafka brokers and how to deserialize the records
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "spark-streaming-wordcount"
    )
    val topics = Array("test")

    // Make the actual connection between our Spark Streaming job and the Kafka topic
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    // Apply the same logic that we used in the Spark example (see above)
    val lines = stream.map(record => record.value)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.print()

    // Start the Spark Streaming context and wait for it to be terminated
    ssc.start()
    ssc.awaitTermination()
  }
}

We have to add a jar file so the imports will work: the spark-streaming-kafka-0-10 connector. Go to its page on the Maven repository site, click where it says jar (next to Files), and download it.

Now that we have the jar, we are going to add it to the actual project. Right click on the project name → Build Path → Configure Build Path → Add External Jars. Navigate to where you saved the downloaded jar, select it, and click Open.
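If you would rather have a build tool pull the dependency instead of adding the jar by hand, a minimal build.sbt along these lines should work. The version numbers are illustrative; match them to the Spark and Scala versions on your cluster.

name := "SparkStreamingWordCount"

scalaVersion := "2.11.12"

// spark-core and spark-streaming are marked "provided" because the cluster already has them;
// the Kafka connector is the piece we actually need to package alongside our code.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.2.0" % "provided",
  "org.apache.spark" %% "spark-streaming" % "2.2.0" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.2.0"
)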

By now, most of your errors should be gone. Next, right click on the project and scroll down to Export → Jar File. Just press Next and then Finish.

Start Docker and Kafka

Let’s get the jar file over to Digital Ocean so we can run it against a Kafka topic. If you aren’t using Digital Ocean, you can skip down to the docker run command below. The following command copies the file (using scp) from your local machine onto the Docker host.

docker-machine scp /location/of/the/jar root@docker-sandbox:/tmp

Go ahead and SSH into the box and spin up your Docker image using the following command to make sure the jar makes it into the container:

docker run --hostname=quickstart.cloudera --privileged=true -t -i -p 8888:8888 -v /tmp:/tmp/mount cloudera/quickstart:latest /usr/bin/docker-quickstart bash

Note: if your jar is located anywhere other than /tmp, you need to change the following part of the command to reflect that new location -v /jar/location:/tmp/mount.

We need to make sure Kafka is started up, so refer back to the Kafka article, which shows how to start a server and spin up a kafka-console-producer.

Start the Spark Streaming Job

After you have those two up and running, go ahead and open up another terminal and use the same technique to get into the same Docker container. Once you are there, let’s kick off our Spark Streaming job with the following command:

spark-submit --class com.spark.example.SparkStreamingWordCount --master yarn --deploy-mode cluster --name wordcount /tmp/mount/SparkStreamingWordCount.jar

spark-submit is the main command that we are running. Let’s break down the options:

  • --class com.spark.example.SparkStreamingWordCount specifies where the main class can be found.
  • --master yarn specifies that YARN will be coordinating the job.
  • --deploy-mode cluster specifies that we are deploying the job in cluster mode.
  • --name wordcount specifies the name of the application as wordcount for tracking purposes.
  • Finally, the last argument tells Spark where to find the jar that holds the code.

Now if you start typing some input into the Kafka producer and pressing enter, you should see your Spark Streaming application print the word counts from the last 2 seconds. That might not be many words, but if you copy and paste a news article into the kafka-console-producer, you can really test the power of your application.

Apache Flink

Apache Flink is an open source framework for distributed stream processing.

  • It provides accurate results even if data arrives out of order or late.
  • It is stateful and fault tolerant and can recover from failures while maintaining exactly-once state guarantees. Stateful means that the application has the ability to recall previous events.
  • It has been proven to run on thousands of nodes with low latency and high throughput.

Bounded and Unbounded Datasets

There are two main types of datasets when dealing with streaming applications: bounded and unbounded datasets.

Bounded datasets are finite in size and don’t change. They are the datasets that you would traditionally think of as being processed using a batch methodology.

Unbounded datasets are infinite in size, and data is appended to them continuously. They are streaming sources, such as machine log data, where data keeps arriving. It is important to know the difference because Flink can process both types of datasets, using a different execution model for each.
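To see what processing a bounded dataset looks like, here is a minimal batch word-count sketch using Flink’s Scala DataSet API. The hard-coded input elements are purely for illustration; reading a file would work the same way.

import org.apache.flink.api.scala._

object FlinkBatchWordCount {
  def main(args: Array[String]): Unit = {
    // The batch environment processes a bounded dataset and then finishes
    val env = ExecutionEnvironment.getExecutionEnvironment
    // A bounded dataset: a fixed collection of lines (env.readTextFile(path) would also work)
    val lines = env.fromElements("to be or not to be", "that is the question")
    val counts = lines
      .flatMap(_.toLowerCase.split(" "))
      .map(word => (word, 1))
      .groupBy(0)   // group by the word
      .sum(1)       // sum the counts
    // For a batch job, print() triggers execution and resources are released when it finishes
    counts.print()
  }
}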

Streaming and Batch Processing Models

Flink has a streaming execution model that processes data continuously as it is produced in the stream, holding on to resources for as long as the stream exists.

Flink also has a batch processing model that completes in a predictable, finite amount of time and releases resources once the job is complete. Together, these two execution models make Flink a good fit for both batch and streaming workloads.
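Unlike a batch job, a streaming job keeps running until it is cancelled. Here is a minimal word-count sketch using Flink’s Scala DataStream API; it assumes text arriving on a local socket on port 9999 (a placeholder source) and counts words over 2-second windows.

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object FlinkStreamingWordCount {
  def main(args: Array[String]): Unit = {
    // The streaming environment is Flink's entry point for DataStream jobs
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // An unbounded stream of lines arriving on a local socket (placeholder source)
    val lines = env.socketTextStream("localhost", 9999)
    val counts = lines
      .flatMap(_.toLowerCase.split(" "))
      .filter(_.nonEmpty)
      .map(word => (word, 1))
      .keyBy(_._1)                   // partition the stream by word
      .timeWindow(Time.seconds(2))   // count within 2-second windows
      .sum(1)
    counts.print()
    // Nothing runs until execute() is called; the job then runs until it is cancelled
    env.execute("FlinkStreamingWordCount")
  }
}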

Apache Flink vs Apache Spark Streaming

Spark is well known in the industry for bringing lightning-fast speed to batch processing compared to MapReduce. But how does it match up to Flink? As we stated above, Flink can handle both batch flows and streaming flows, but it uses a different technique than Spark does.

When Spark turns into Spark Streaming, it uses mini-batches to mimic a pure stream. Flink works the other way around: everything is a stream, and batch processing is handled as the special case of a bounded stream. This makes Flink a little nicer to work with for pure streaming use cases. It has also been shown to provide lower latency and higher throughput than Spark for some use cases. Moreover, it has powerful window operators which support more use cases than Spark’s.

At the end of the day, Spark is a little more mature and has far more users than Flink. However, Flink is gaining on Spark and becoming more popular for streaming use cases. It is difficult to say which one is better at this point; however, over the next couple of years, Flink should continue gaining traction and compete with Spark on a more level playing field.

Apache Storm

Apache Storm is another real time big data processing system, designed to process large amounts of data in a distributed and fault tolerant way. Storm is stateless, meaning that it doesn’t keep track of state itself; however, Zookeeper helps manage the environment and cluster state.

Benefits of Apache Storm

Here are some of the main benefits of Apache Storm:

  • Storm has a lot of processing power, which makes it fast and able to handle large amounts of data with ease.
  • Storm has the ability to scale linearly, growing the resource pool to allow more processing power.
  • Storm processes in real time, enabling low latency delivery of data.
  • Storm provides guaranteed processing even if nodes die or messages are lost.
  • Storm is open source so it is available for anyone to use.

Apache Storm Terminology

There are four main terms of Storm that are important to know:

  • Tuple — A tuple is an ordered list of elements and is Storm’s main data structure.
  • Stream — A stream is an unordered sequence of tuples.
  • Spout — A spout is quite simply the source of a stream. This could be a web service, Kafka, or almost anything else, thanks to the ability to write custom Spouts.
  • Bolts — Bolts are how we process data inside of Storm. Spouts pass Streams to Bolts for processing.

Picture a Spout picking up data from a source and feeding it into a Bolt. That Bolt does some type of transformation and sends the results to other Bolts. Finally, the Bolts come together and the final Bolt writes the data to some type of data storage solution.
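As a rough sketch of how that wiring looks in code, here is a minimal Scala example built on Storm’s TopologyBuilder. SentenceSpout, SplitBolt, and CountBolt are hypothetical classes standing in for a custom Spout and Bolts you would write yourself (extending BaseRichSpout and BaseBasicBolt); only the wiring between them is the point here.

import org.apache.storm.{Config, LocalCluster}
import org.apache.storm.topology.TopologyBuilder
import org.apache.storm.tuple.Fields

object WordCountTopology {
  def main(args: Array[String]): Unit = {
    val builder = new TopologyBuilder
    // The Spout is the source of the stream (hypothetical class extending BaseRichSpout)
    builder.setSpout("sentences", new SentenceSpout)
    // Bolts process tuples; this one splits each sentence into words (hypothetical BaseBasicBolt)
    builder.setBolt("split", new SplitBolt).shuffleGrouping("sentences")
    // This Bolt keeps a running count; fieldsGrouping sends the same word to the same task
    builder.setBolt("count", new CountBolt).fieldsGrouping("split", new Fields("word"))

    // Run the topology locally; on a real cluster you would use StormSubmitter instead
    new LocalCluster().submitTopology("word-count", new Config, builder.createTopology())
  }
}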

Storm is a pure streaming architecture; it has no way of doing batch jobs natively like Flink can. Storm also differs from both Spark Streaming and Flink because it is stateless, so it has no knowledge of previous events throughout the flow of the data.

Storm was originally created by Nathan Marz. Nathan Marz is a legend in the world of Big Data. He not only created Storm, but he is also the father of the Lambda Architecture which revolutionized the way Big Data was processed and returned to the customer.

Conclusion

Congratulations! You’ve gotten some hands-on experience with Spark Streaming and had a glimpse into some of the up-and-coming streaming frameworks that are making their way into the industry.

