CommonLounge Archive

MapReduce Tutorial

March 09, 2018

MapReduce is a Java-based framework used for processing large amounts of data. The MapReduce framework is made up of two major components: Map and Reduce.

The Map part of the algorithm takes data and transforms it into key/value pairs (<k1,v1>) or tuples.

The Reduce part of the algorithm takes the output of a map and combines or reduces the input into a smaller group of tuples.

The advantage of this simple framework is the ability to scale across multiple nodes based on a configuration change.

We’ll go through a hands-on example at the bottom of this article.

MapReduce Architecture

MapReduce is the main processing algorithm and processing framework in Hadoop. Whether you write your own MapReduce code or you use applications such as Pig that convert into MapReduce, MapReduce is still used fairly often in the Hadoop ecosystem. One of the main reasons is because of the idea of sending the processing to where the data lives. Since the MapReduce processing happens on the data nodes, the processing happens where the data lives in HDFS. This helps to cut down on network costs and improve performance. There are three main stages of a MapReduce application: Map, Shuffle, and Reduce.

The Map Stage is responsible for processing the input data. The input is supplied to the map job line by line from generally a file in HDFS. Once the map stage is complete it gets passed to the shuffle stage which combined with the reduce stage processes the data from the mapper still in <key,value> fashion usually taking the multiple key values pairs and reducing them by the key. Once the values are consolidated by keys, all of the newly reduced <key,value>’s are stored into some type of output that is usually stored in a file in HDFS. A lot of the nitty gritty operations like task creation, movement of data between data nodes, and verification of completion of tasks is handled by the MapReduce framework.

MapReduce Terminology

  • Job - A MapReduce application
  • Task - The actual implementation of individual mappers and reducers on a piece of data
  • Job Tracker - The applications that schedules jobs and reports the jobs to Task Tracker
  • Task Tracker - Tracks the tasks and reports the statuses back to the Job Tracker

MapReduce Example in Java — Word Count

Now that you have an idea of what MapReduce is, let’s dive into the Word Count example which is widely known as the Hello World of MapReduce. The goal is to take in x amount of random words and be able to count each word and print out the word plus it’s count <word, value>. Let’s dive into it. We are going to be doing this the old fashion way just in case you don’t have your computer setup for Java development, but first, let’s look at a diagram to see a little more detail of what we will be doing.

We will have an input file that will get split into chunks to allow parallel processing. From there our mappers will make a <key, value> pair based on <word, 1>. After that there will be some shuffling to get similar <key, value> pairs in the same reducer. Once the reducer adds the values together for each word, they will write to an output file. Let’s jump into it.

First thing we need to do is create a file with a bunch of words. It’s a good idea to know a couple of counts of words so you can check your output to ensure that it is correct.

echo "Hello World! This is a test to ensure that my hello world application works" > wordcountdata

Now you should have a file named wordcountdata that has some words inside. Now let’s jump into the code. Now we need to actually write the MapReduce application. This code is based off of the Cloudera WordCount Version 1 code to ensure that it runs correctly with the Cloudera Quickstart that we are using. Please make sure to copy exactly what is below to ensure no compilation errors. Also there are some comments in the code that explains the code.

package WordCount;
//just some imports that are needed. We will add these dependencies at compile time
import java.io.IOException;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
public class WordCount extends Configured implements Tool {
   //main method to kick of the run method
   public static void main(String[] args) throws Exception{
       int exitCode = ToolRunner.run(new WordCount(), args);
       System.exit(exitCode);
   }
   //housekeeping to ensure that the MapReduce job knows what is going to happen
   public int run(String[] args) throws Exception{
       Job job = Job.getInstance(getConf(), "wordcount");
       job.setJarByClass(this.getClass());
       FileInputFormat.addInputPath(job, new Path(args[0]));
       FileOutputFormat.setOutputPath(job, new Path(args[1]));
       job.setMapperClass(Map.class);
       job.setReducerClass(Reduce.class);
       job.setOutputKeyClass(Text.class);
       job.setOutputValueClass(IntWritable.class);
       return job.waitForCompletion(true) ? 0 : 1;
   }
   public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
       private final static IntWritable one = new IntWritable(1);
       private Text word = new Text();
       //simple regex to ensure that words get seperated
       private static final Pattern WORD_CRITERIA = Pattern.compile("\\s*\\b\\s*");
       public void map(LongWritable offset, Text lineText, Context context) throws IOException, InterruptedException{
           String line = lineText.toString();
           Text currentWord = new Text();
           //maps every word to a key value of <word, 1> and passes it to the context to be ready for //the reduce phase
           for(String word: WORD_CRITERIA.split(line.toLowerCase())){
               if (word.isEmpty()) {
                   continue;
               }
               currentWord = new Text(word);
               context.write(currentWord,one);
           }
       }
   }
   public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable>{
       @Override
       public void reduce(Text word, Iterable<IntWritable> counts, Context context) throws IOException, InterruptedException{
         //sets the sum of each word to 0 and then counts each word and puts the value into a new key //value pair. E.g <word, 3>  
         int sum = 0;
           for (IntWritable count : counts) {
               sum += count.get();
           }
           context.write(word, new IntWritable(sum));
       }
   }
}

You can copy and paste this code into a file named WordCount.java. Once that is complete let’s copy it over to our docker-machine that we have running.

docker-machine scp ./WordCountData root@docker-sandbox:/home/course
docker-machine scp ./WordCount.java root@docker-sandbox:/home/course

Now let’s compile this MapReduce job. Make sure that you are inside the docker container when running the following commands.

mkdir /home/course/build
cd /home/course
javac -cp /usr/lib/hadoop/*:/usr/lib/hadoop-mapreduce/* WordCount.java -d build -Xlint
jar -cvf wordcount.jar -C build/ .

Before we execute the actual MapReduce job, we need to put the data into the correct spot.

hdfs dfs -mkdir -p /wordcount/input
hdfs dfs -copyFromLocal /home/course/WordCountData /wordcount/input

Now that our data is inside HDFS, we are ready to go. This next command will kick off your MapReduce job. It is important to put a directory that hasn’t been created yet as the output or the application will fail.

hadoop jar wordcount.jar WordCount.WordCount /wordcount/input /wordcount/output

Once the magic has completed, we need to go check the output in HDFS. You should see two files when you list the directory that you put in the output argument. You’ll want to do a -cat on the part file as done below:

hdfs dfs -ls /wordcount/output
Found 2 items
-rw-r--r--   1 root supergroup          0 2017-12-18 00:42 /wordcount/output/_SUCCESS
-rw-r--r--   1 root supergroup        172 2017-12-18 00:42 /wordcount/output/part-r-00000
hdfs dfs -cat /wordcount/output/part-r-00000
! 1
. 2
a 1
are 1
be 1
count 1
counted 1
example 1
hello 2
how 1
in 1
is 1
many 1
please 1
see 1
should 1
test 1
there 1
this 2
three 1
to 1
watch 1
word 1
words 1
world 3

Congratulations you have now ran your first MapReduce program!

Next we will look into using some of the Hadoop ecosystem to create MapReduce jobs without writing MapReduce.


© 2016-2022. All rights reserved.