CommonLounge Archive

Kafka, Flume, and Flafka Tutorial

February 24, 2018

Kafka

Apache Kafka is a publish-subscribe messaging system that is distributed across many servers so that it can handle high volumes of data flowing through it. A messaging system is a type of application that helps transfer data from one place to another so that other applications can focus on doing other things rather than sharing data.

Kafka Architecture

Below is a diagram of a simple messaging system:

A producer is the sender that puts a message onto the message queue. The message queue then relays the message to the consumer. In this diagram, a message can only go to one consumer; this is a point-to-point messaging system.

The publish-subscribe messaging pattern refers to publishers (the senders of the messages) who publish (or send) a message without knowing which consumers will receive the message. These consumers or subscribers can subscribe to whichever messages they care about.

Kafka is a publish-subscribe (or pub-sub) messaging system. This is a little different than the above diagram because a producer publishes a message to the topic (message queue) and the message gets persisted to disk. Then multiple consumers can subscribe to the topic to get the message. The below diagram shows the differences:

Kafka Terminology

  • Broker — a server that has Kafka installed that houses topics.
  • Topic — a collection of data that is stored in the form of messages. A topic can be split up into many partitions.
  • Partitions — subsets of a topic. Having multiple partitions (which may live on different brokers) helps increase throughput, and partitions allow for parallel consumption and organization inside of a topic. A topic can simply be viewed as a collection of partitions.
  • Offset — a unique sequence id for a message inside of a partition.
  • Replicas — backups of topics to help prevent data loss.
  • Kafka Cluster — many brokers that work together to ensure throughput is achieved without downtime.
  • Producers — the application that sends data from the source to the broker. The broker then appends the message to the correct topic and partition. A producer can write to a specific topic or even to a specific partition of a topic.
  • Consumers — the application that gets the data out of a Kafka topic.

Let’s put all of those terms together. One or more Kafka brokers make up a Kafka cluster. A broker houses one or more Kafka topics, and Kafka topics can be broken down into partitions. When a message is committed to a topic, it is actually committed to a partition and given a specific offset. The other brokers keep replicas of that data so that there is no data loss. A producer puts data into a specific partition of a topic on a certain broker, and a consumer retrieves that data.
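To make these terms concrete, here is a hedged sketch (using the same kafka-topics.sh script we run later in this tutorial, from inside the Kafka installation directory) of how you might create a topic named events that is split into three partitions, with each partition replicated on two brokers. The topic name and numbers are only illustrative, and a replication factor of 2 requires a cluster with at least two brokers, which is more than we set up below.

# Illustrative only: 3 partitions, each with a backup copy on a second broker
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 3 --topic events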

Benefits of Kafka

  • Scalability — Kafka scales horizontally very well without any downtime.
  • Reliability — Since Kafka is distributed across many brokers and data is replicated, it is very reliable and fault tolerant.
  • Durability — Kafka writes messages to disk as soon as possible to ensure that if the broker were to go down, the data would still be intact.
  • Performance — Kafka achieves high data throughput thanks to its ability to scale, making it a high-throughput messaging service.

With all of these benefits, Kafka is very fast and can guarantee virtually zero downtime and zero data loss.

Kafka Example

Let’s actually see some messages flow through our very own Kafka cluster.

First things first: if you are using a Digital Ocean instance, go ahead and create it like we did in the very first lesson. Once you are inside the Cloudera quickstart Docker image, everyone will be starting from the same place.

The first thing we need to do once inside the docker image is issue the following commands:

yum install wget
wget https://archive.apache.org/dist/kafka/0.10.1.0/kafka_2.10-0.10.1.0.tgz
tar -xvf kafka_2.10-0.10.1.0.tgz
mv kafka_2.10-0.10.1.0 /opt/

These commands download Kafka, extract it, and move it into a certain directory (/opt/). Now let’s start our very own Kafka broker.

cd /opt/kafka_2.10-0.10.1.0/
bin/kafka-server-start.sh config/server.properties

Awesome. Now there should be a Kafka broker running in this window. Don’t touch anything. We are going to open a brand new command line and get back inside of your Docker container (if you’re on Digital Ocean, just run docker-machine ssh docker-sandbox again). Run the following command:

docker ps
CONTAINER ID        IMAGE                        COMMAND                  CREATED             STATUS              PORTS                    NAMES
a4a04f5da8be        cloudera/quickstart:latest   "/usr/bin/docker-qui..." 38 minutes ago      Up 38 minutes       0.0.0.0:8888->8888/tcp   cranky_ride

This command just lists the various containers that are running inside docker right now. This is an example of our output but yours will be different. Copy the number under CONTAINER ID and then run the following command:

docker exec -it a4a04f5da8be bash

This gives you bash access to your container. Now you are inside of the same container in a different session. The first thing we are going to do, while the Kafka broker is running in the other window, is run the following command to create a topic:

Note that localhost:2181 is the default address of the ZooKeeper instance that Kafka uses.

/opt/kafka_2.10-0.10.1.0/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
/opt/kafka_2.10-0.10.1.0/bin/kafka-topics.sh --list --zookeeper localhost:2181
#outputs the following:
test

The first command creates a topic named test with a replication factor of 1 and a single partition. The second command simply lists all of the topics that ZooKeeper is tracking.
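If you want to see how the topic is laid out, the same script also has a --describe option. Running it is optional, but it ties the terminology from earlier back to a real topic:

/opt/kafka_2.10-0.10.1.0/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

For our test topic it should report a partition count of 1 and a replication factor of 1, along with which broker is the leader for that single partition.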

Now we are going to spin up a Kafka console producer to actually write messages into the Kafka topic that we just created. After you run the first command below, it will just wait for your input.

Type Hello, press enter, type I’m a message, press enter, and then press control + C to exit.

/opt/kafka_2.10-0.10.1.0/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
Hello
I'm a message
^C

Now let’s spin up a kafka console consumer to pull the data out of the topic.

/opt/kafka_2.10-0.10.1.0/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
Hello
I'm a message

This command creates a consumer on the topic test and asks for all the offsets from the beginning. If you put more messages into the producer, you will see more messages in the consumer.
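As a quick way to see how offsets behave, you can also start a consumer without the --from-beginning flag; it skips the existing offsets and only prints messages produced after it connects:

/opt/kafka_2.10-0.10.1.0/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test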

Great! You have successfully published some messages to Kafka and retrieved them. Keep these two windows open because we are going to need them here soon.

Flume

Apache Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of data.

Let’s break that statement down: it’s distributed, which makes it reliable and available, and it collects, combines, and moves large amounts of data.

So in simple terms, Flume is a data ingestion tool that moves data from one place to another. Flume guarantees data delivery.

Flume Architecture

Let’s get a picture of what the Flume architecture looks like so we can talk about it. Reference the diagram below:

Flume Architecture Diagram

  • First, an event is a single unit of data that is transported by Flume.
  • A client is the entity that creates events that are ingested via Flume.
  • A source is how the event enters into Flume. There are two types of sources in Flume:
  • Passively Waiting is a type of source that waits for events to be sent to it via the Client.
  • Actively Polling is a type of source that repeatedly asks the Client for events. Sources send events to the Channel.
  • The channel is the bridge between the source and the sink. Channels act as buffers so that incoming events from the source do not overwhelm the sink.
  • Sinks are the mechanism that Flume uses to deliver the data to its destination. While in the channel, Flume batches events into transactions to make the transfer of data through the sink more efficient. A sink can write transactions to a file system such as HDFS (very common), or it can pass the data to another Flume agent.

Flume has a very simple architecture that lets it pass data through with few moving parts. It is great for taking streaming data in from a source and writing it to HDFS in batches. It can also write to HDFS in a near-real-time fashion if the use case calls for it, highlighting its flexibility. A Flume agent is defined by a configuration file.
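Before we build the real configuration in the next section, here is a minimal sketch of what such a file looks like, using a hypothetical agent named agent1 with Flume’s built-in netcat source, memory channel, and logger sink. The Kafka-to-HDFS example below follows this same source/channel/sink pattern.

# A minimal, illustrative agent: netcat source -> memory channel -> logger sink
agent1.sources = src1
agent1.channels = ch1
agent1.sinks = snk1
# The netcat source listens on a port and turns each line it receives into an event
agent1.sources.src1.type = netcat
agent1.sources.src1.bind = localhost
agent1.sources.src1.port = 44444
agent1.sources.src1.channels = ch1
# The memory channel buffers events between the source and the sink
agent1.channels.ch1.type = memory
# The logger sink simply writes each event to the agent's log
agent1.sinks.snk1.type = logger
agent1.sinks.snk1.channel = ch1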

Flume Example

Let’s write our own configuration file and take the data we wrote in Kafka (client) and put it into HDFS (destination). So in the window with the Kafka consumer from up above, let’s do the following:

First, let’s create the configuration file:

vi kafkaFlume.conf

Copy in the following. Go through the comments for each of the configuration lines.

# These three lines define the names of our sources, channels, and sinks 
# that are inside of the agent.
# Each command always starts with the name of the agent.
agent.sources = kafkaSource
agent.channels = channel
agent.sinks = sinkToHdfs
# We are going to now start configuring the source.
# Commands still start with the agent name and then 
# .<component> in this case "sources" then the name of the
# component you are configuring
# Here we define the type of source as a kafka source
agent.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource
# This tells the kafka source how to connect to zookeeper
# just like we did in the kafka consumer
agent.sources.kafkaSource.zookeeperConnect = localhost:2181
# Tells the source what topic to read from
agent.sources.kafkaSource.topic = test
# The consumer group id this source uses (Flume's default is "flume")
agent.sources.kafkaSource.groupId = Flume
# Tells the source what channel it will be sending the data to. 
# We defined channel as the channel name up above
agent.sources.kafkaSource.channels = channel
#We are going to start the channel configuration
agent.channels.channel.type = memory
agent.channels.channel.capacity = 10000
agent.channels.channel.transactionCapacity = 1000
# The above configurations created a channel with type=memory meaning 
# all the data is stored in memory rather than on disk. 
# Then we specified how big to make it (capacity) and 
# how big each transaction can be.
#Sink configurations start here
agent.sinks.sinkToHdfs.type = hdfs
# This defines where in hdfs we are going to be putting the data
agent.sinks.sinkToHdfs.hdfs.path = hdfs://quickstart:8020/flumedata
# Roll (close the current HDFS file and start a new one) every 5 seconds
agent.sinks.sinkToHdfs.hdfs.rollInterval = 5
# A roll size of 0 means we never roll the file based on its size
agent.sinks.sinkToHdfs.hdfs.rollSize = 0
# A roll count of 0 means we never roll the file based on the number of events
agent.sinks.sinkToHdfs.hdfs.rollCount = 0
agent.sinks.sinkToHdfs.hdfs.fileType = DataStream
agent.sinks.sinkToHdfs.channel = channel

We need to create the directory that we specified in our configuration file so run the following command:

hdfs dfs -mkdir /flumedata

Now we are going to start up the agent.

flume-ng agent --conf conf --conf-file ./kafkaFlume.conf --name agent -Dflume.root.logger=INFO,console

flume-ng agent is the name of the command. We point Flume at its base configuration directory with --conf conf and pass in the configuration file we just wrote with --conf-file ./kafkaFlume.conf. We then tell Flume the name of the agent we defined in our conf file with --name agent. Finally, -Dflume.root.logger=INFO,console sets the log level to INFO and sends the logs to the console so we don’t get overwhelmed.

Go ahead and kick that off. Once you have it running, open another command line and do the same thing we did above to get back into the container. It’s copied here again for ease of reading.

docker ps
CONTAINER ID        IMAGE                        COMMAND                  CREATED             STATUS              PORTS                    NAMES
a4a04f5da8be        cloudera/quickstart:latest   "/usr/bin/docker-qui..." 38 minutes ago      Up 38 minutes       0.0.0.0:8888->8888/tcp   cranky_ride

This is an example of my output, but yours will be different. Copy the value under CONTAINER ID and then run the following command:

docker exec -it a4a04f5da8be bash

Now start up a Kafka console producer and write some text, pressing enter after each line you want to send.
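For reference, the producer command is the same one we ran earlier:

/opt/kafka_2.10-0.10.1.0/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

As you type messages, you will see your Flume agent outputting logs like these to its console: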

18/02/24 03:22:42 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
18/02/24 03:22:42 INFO hdfs.BucketWriter: Creating hdfs://quickstart:8020/flumedata/FlumeData.1519442562281.tmp
18/02/24 03:23:15 INFO hdfs.BucketWriter: Closing hdfs://quickstart:8020/flumedata/FlumeData.1519442562281.tmp
18/02/24 03:23:15 INFO hdfs.BucketWriter: Renaming hdfs://quickstart:8020/flumedata/FlumeData.1519442562281.tmp to hdfs://quickstart:8020/flumedata/FlumeData.1519442562281

This shows that Flume is correctly writing into HDFS. In the output above, Flume created a file called FlumeData.1519442562281. Yours will be named differently, but copy that file name and run the following command to see if your output is in there:

hdfs dfs -cat /flumedata/FlumeData.1519442562281

That should show the words and phrases you wrote in your kafka-console-producer. Pretty awesome. To shut down the Flume agent and the kafka-console-producer, just press control + c in each window.
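If you lost track of the file name in the Flume logs, you can also just list everything the agent has written to the directory:

hdfs dfs -ls /flumedata

Files that Flume is still writing show up with a .tmp suffix; finished files are renamed, as you saw in the agent’s logs.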

Flafka

Flume is a great ingestion tool and Kafka is a great messaging system that is widely used in the Hadoop ecosystem. The creators of Flume have integrated Kafka into all aspects of Flume. They also created a Flume channel that is actually backed by a Kafka topic, which insulates the channel and makes it more reliable (remember, Kafka is super reliable). We already saw how we can grab data from Kafka using a Kafka source, and we can also send Flume data to Kafka using a Kafka sink. It’s a pretty cool project that is worth mentioning.
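As a rough sketch of what that looks like in a configuration file (property names as of the Flume 1.6 era that ships with the Cloudera quickstart, and the agent and component names here are hypothetical), a Kafka sink and a Kafka-backed channel are configured much like the components we used above:

# A Kafka sink: publishes events from a Flume channel into a Kafka topic
agent.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafkaSink.brokerList = localhost:9092
agent.sinks.kafkaSink.topic = test
agent.sinks.kafkaSink.channel = channel
# A Kafka channel: the channel's buffer is itself a Kafka topic
agent.channels.kafkaChannel.type = org.apache.flume.channel.kafka.KafkaChannel
agent.channels.kafkaChannel.brokerList = localhost:9092
agent.channels.kafkaChannel.zookeeperConnect = localhost:2181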

Conclusion

Congratulations! You have learned the basics of Kafka and Flume and actually set up a very common ingestion pattern used in Hadoop.


© 2016-2022. All rights reserved.