CommonLounge Archive

Apache Pig Tutorial

January 18, 2018

“Apache Pig is a platform for analyzing large data sets that consists of a high-level language for expressing data analysis programs, coupled with infrastructure for evaluating these programs.” (Apache Pig documentation)

Pig uses a language called Pig Latin to create scripts that manipulate data. The Pig scripts are submitted to the Pig Engine, which turns the Pig Latin into MapReduce jobs.

This is important because Pig offers an abstraction over MapReduce, meaning you don’t need all of the boilerplate that comes with writing a Java MapReduce program. Programmers who aren’t very familiar with Java but know SQL usually do very well with Pig, since Pig Latin is very similar to SQL. Pig also allows a multi-query approach, which reduces the amount of code a developer needs to write for Big Data processing; Pig is often cited as cutting development time by up to 16x.
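To give a taste of what that multi-query approach looks like, here is a minimal Pig Latin sketch that loads a hypothetical log file once and produces two different outputs in a single script (the file paths and field names are made up purely for illustration):

-- Hypothetical sketch: one script, one LOAD, two results
logs = LOAD '/data/access_logs.txt' USING PigStorage(',') AS (user:chararray, url:chararray, bytes:long);
by_user = GROUP logs BY user;
bytes_per_user = FOREACH by_user GENERATE group AS user, SUM(logs.bytes) AS total_bytes;
big_requests = FILTER logs BY bytes > 1000000;
STORE bytes_per_user INTO '/out/bytes_per_user';
STORE big_requests INTO '/out/big_requests';

In SQL you would typically write two separate queries; here Pig’s multi-query execution can share the single LOAD between both STORE statements.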

What is Apache Pig Used for?

Pig was named for its ability to take any type of data in and spit out a result. This means that Pig can extend to many types of data!

Pig also offers users a ton of operators such as joins, sorts, filters, and other operations that are similar to SQL and more. Along with the ease of programming that comes with Pig (remember, no Java), Pig also automatically optimizes the execution of your scripts, so you don’t have to worry about the boring stuff and can focus on just writing the scripts you want.

Pig isn’t only more convenient for the user, but also enhances the functionality of MapReduce.

  • It is a data flow language, which means the data flows through a sequence of execution steps.
  • It offers you the ability to use bags and tuples, which are ways to group data that MapReduce doesn’t offer. We will cover them in more detail below.
  • It also gives you the ability to create User-Defined Functions (UDFs) written in many different languages like Java, Groovy, or Python. These UDFs can then be called from Pig Latin inside your scripts, giving you the extensibility to do whatever you need to do inside of your Pig scripts; a short sketch of this follows below.
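As a rough sketch of how a UDF is wired in, a Python (Jython) UDF file can be registered and called from Pig Latin like this. The file my_udfs.py, the function to_upper, and the input file /names.txt are hypothetical names used only for illustration; the Python file would have to define to_upper itself.

-- Hypothetical sketch: register a Python (Jython) UDF file and call a function from it
REGISTER 'my_udfs.py' USING jython AS my_udfs;
names = LOAD '/names.txt' AS (name:chararray);
upper_names = FOREACH names GENERATE my_udfs.to_upper(name);
DUMP upper_names;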

Pig Components

There are two ways to write Pig scripts. One is by creating a .pig file and submitting it to the Pig server. The other is to work interactively via the Grunt Shell.

First, the Pig Latin is submitted to the Parser component, which checks the syntax and conducts other checks of the script. Then, the Parser outputs a Directed Acyclic Graph (DAG) which represents the flow of data throughout the script. DAGs are graphs that represent the operations as nodes and the data flow as edges. Below is an example of what a DAG looks like:

A DAG

Once the DAG is created by the Parser, it is passed to the Optimizer, which optimizes the data flow (for example, by pushing filters earlier in the flow). The Compiler then compiles the optimized DAG into a series of MapReduce jobs, and those jobs are submitted to the cluster via the execution engine.

All that the developer needs to worry about is writing the Pig Latin script correctly; the Pig framework takes care of the rest of these steps. Note how much more efficient this can make us as Big Data developers!

Pig Latin Basics

Atom

An Atom is any single value in Pig Latin, no matter what kind of data it is. Atoms are stored as strings and can be used as a string or as a number. The atomic types will be familiar from other programming languages: int, long, float, double, chararray, and bytearray. In Pig terms, a field is any piece of data or atomic value, for example ‘Pizza Nut’ or ‘12.32’.

Tuple

A tuple is an ordered set of fields. A tuple can be thought of as a row in a table in the relational database world. For example, (Pizza Nut, 12.32) is a tuple.

Bag

A bag is a collection of tuples that aren’t ordered in any specific way. Bags are a very powerful tool: tuples inside a bag can have different numbers of fields, and bags can even be nested inside tuples. A regular (outer) bag looks like this: {(Pizza Nut, 12.32), (Baco Tell, 1.02)}. An inner bag appears inside a tuple, for example (Pizza Nut, {(Large Pizza, 9.99)}).

Map

A map is pretty similar to maps in other programming languages: it is just a set of key-value pairs. The only requirements are that the key must be unique and of type chararray. The value doesn’t have to be unique and can be any type. For example, [id#1, name#Pizza Nut].

Relation

A relation is the cornerstone of Pig. It is simply a bag of tuples (also known as an outer bag) — think of this as the entire table in a relational database.

From the Apache Pig Docs: “Unlike a relational table, however, Pig relations don’t require that every tuple contain the same number of fields or that the fields in the same position (column) have the same type.”

They are also unordered, because there is no guarantee about when tuples get processed or in what order.
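To see how these types fit together, here is a hedged sketch (with a hypothetical orders file) of how GROUP produces a relation whose tuples each contain a bag; the DESCRIBE output in the last comment is roughly what Pig prints for such a relation.

-- Hypothetical sketch: GROUP collects matching tuples into a bag per group
orders = LOAD '/orders.txt' USING PigStorage(',') AS (store:chararray, item:chararray, price:double);
by_store = GROUP orders BY store;
-- each tuple of by_store looks like: (Pizza Nut, {(Pizza Nut,Large Pizza,9.99),(Pizza Nut,Breadsticks,3.49)})
DESCRIBE by_store;
-- by_store: {group: chararray,orders: {(store: chararray,item: chararray,price: double)}}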

Statements

Statements are the way that we construct Pig scripts. Note that every statement ends with a semicolon (just like in Java).

Relations are the cornerstone of Pig Latin precisely because statements take in a relation and output a new relation.
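As a quick hedged sketch of that idea (the file name and fields are hypothetical), notice how each statement below names a new relation built from an existing one, and how every statement ends with a semicolon:

-- Hypothetical sketch: relation in, relation out
raw = LOAD '/some/input.txt' USING PigStorage(',') AS (name:chararray, amount:int);
big = FILTER raw BY amount > 100;
sorted_big = ORDER big BY amount DESC;
DUMP sorted_big;

LOAD and DUMP are the slight exceptions: LOAD reads a file to produce a relation, and DUMP consumes a relation to print it to the screen.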

Apache Pig Example

Let’s dive into some examples of statements and common functions that are built in Pig.

First things first. Let’s get the Cloudera Quickstart Docker container up and running. If you need a review, refer back to the Introduction to Hadoop wiki.

Set up the data

Let’s create a simple file with some data inside of it, using a comma as the delimiter. Use the following command to create a simple text file that we are going to load and run some common Pig functions on:

vi pigData.txt

Remember to press i to get into insert mode and then type out the following (no spaces after the commas, since the comma alone is our delimiter):

1,Pizza Nut,123 Main Street,5551234567,8932
2,Baco Tell,852 2nd Street,5559092382,9321
3,Burger Queen,902 Maple Drive,5552318242,281
4,Shake N' Steak,723 Applesauce Street,5552801938,5929

When finished, press ESC and then type :wq.

Let’s make another set of data that we will use later.

First create the file:

vi joinData.txt

Then type in the following:

1,23000000
2,8723
3,92312322
4,900

Load the data

We now need to put both files in HDFS with the following commands:

hdfs dfs -put pigData.txt /
hdfs dfs -put joinData.txt /

Now, you should be inside the quickstart container with a bash command line. Let’s jump into the Grunt Shell. Run the following command:

pig
grunt> 
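By default, pig starts in MapReduce mode and runs against the cluster and HDFS. If you ever just want to experiment against the local filesystem instead, Pig also has a local mode, which you can start like this:

pig -x local
grunt> 

For the rest of this tutorial, stay in the default MapReduce mode so the HDFS paths we just created work.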

Let’s have some fun now. With the next command we are going to load the test data that we just created.

stores = LOAD '/pigData.txt' USING PigStorage(',') as (id:int, name:chararray, address:chararray, phone:chararray, numberOfStores:int);

This type of command is known as a LOAD statement. We load the text file using PigStorage, which takes in a delimiter. PigStorage is a function that loads and stores data in structured text files. It takes in a parameter denoting how the data is delimited. By default, PigStorage assumes it is \t (a tab character). In our case, the delimiter is a comma (‘,’). After the as keyword is the schema of the data. In our example, we have an id as an int, a name as a chararray, an address as a chararray, a phone number as a chararray, and numberOfStores as an int.
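For comparison, here is roughly what the same LOAD looks like with the default tab delimiter, and with no schema at all. Both are hypothetical variations you don’t need for this tutorial (the .tsv file doesn’t exist here):

-- Default delimiter is tab; this only works if the file really is tab-delimited
storesTab = LOAD '/pigData.tsv' USING PigStorage() as (id:int, name:chararray, address:chararray, phone:chararray, numberOfStores:int);
-- No schema: fields must then be referenced by position, e.g. $0, $1, $2
storesNoSchema = LOAD '/pigData.txt' USING PigStorage(',');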

Print the relation and schema

The next command will print out the contents of a relation. This command is called DUMP.

DUMP stores;
/* Output: */
(1,Pizza Nut,123 Main Street,5551234567,8932)
(2,Baco Tell,852 2nd Street,5559092382,9321)
(3,Burger Queen,902 Maple Drive,5552318242,281)
(4,Shake N' Steak,723 Applesauce Street,5552801938,5929)

The output will show up after the MapReduce job executes; you will also see the job logs scroll by.

Let’s create the relation for joinData.txt.

sales = LOAD '/joinData.txt' USING PigStorage(',') as (id:int, sales:long);
DUMP sales;
/* Output: */
(1,23000000)
(2,8723)
(3,92312322)
(4,900)

The next command will tell us the schema of the relation.

DESCRIBE stores;
/* Output: */
stores: {id: int,name: chararray,address: chararray,phone: chararray,numberOfStores: int}
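DESCRIBE has two useful companions you can also run from the Grunt Shell: EXPLAIN, which prints the plans (logical, physical, and MapReduce) that Pig builds for a relation, and ILLUSTRATE, which pushes a small sample of data through each step so you can sanity-check your script. For example:

EXPLAIN stores;
ILLUSTRATE stores;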

Basic Pig Queries

Now that we have seen some ways to view the data, let’s filter it to extract some information.

filteredStores = FILTER stores BY numberOfStores > 1000;
DUMP filteredStores;
/* Output: */
(1,Pizza Nut,123 Main Street,5551234567,8932)
(2,Baco Tell,852 2nd Street,5559092382,9321)
(4,Shake N' Steak,723 Applesauce Street,5552801938,5929)

Now let’s do an order by:

orderedStores = ORDER stores BY numberOfStores DESC;
DUMP orderedStores;
/* Output: */
(2,Baco Tell,852 2nd Street,5559092382,9321)
(1,Pizza Nut,123 Main Street,5551234567,8932)
(4,Shake N' Steak,723 Applesauce Street,5552801938,5929)
(3,Burger Queen,902 Maple Drive,5552318242,281)

Even though we only have 4 rows in this example, let’s limit the output to only two returned rows.

limit_data = LIMIT orderedStores 2;
DUMP limit_data;
/* Output: */
(2,Baco Tell,852 2nd Street,5559092382,9321)
(1,Pizza Nut,123 Main Street,5551234567,8932)
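Another very common statement, which we will also use in the script later, is FOREACH ... GENERATE. It projects (and can transform) columns from a relation, similar to a SELECT list in SQL. Here is a quick sketch against our stores relation, with roughly the output you should expect:

storeNames = FOREACH stores GENERATE name, numberOfStores;
DUMP storeNames;
/* Output: */
(Pizza Nut,8932)
(Baco Tell,9321)
(Burger Queen,281)
(Shake N' Steak,5929)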

Something you should be noticing is that there are a lot of logs when you run DUMP, but not when you run other statements like LIMIT or ORDER. Why is that? Pig uses lazy evaluation, meaning nothing actually gets processed until you issue a command that triggers an action. For Pig, DUMP is an action that makes the script execute. Other commands like LIMIT and ORDER are “queued” up until DUMP is called.
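DUMP isn’t the only action: STORE also triggers execution, except it writes the result to HDFS instead of printing it to the screen. As a hedged example (the output path below is just an illustration, and it must not already exist):

STORE filteredStores INTO '/pig/filteredStores' USING PigStorage(',');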

Joins

Let’s try some joins. A join combines datasets that have a relationship with each other so we can gain insights from the data. We currently have a list of 4 chains. We are going to join in the relation we named sales to understand how much in sales those same companies do.

company_sales = JOIN stores BY id, sales BY id;

If you are familiar with SQL, this looks very similar to an inner join in the world of SQL. Let’s do a DUMP and a DESCRIBE to see the new relation we have created between the two sets of data.

DUMP company_sales;
/* Output: */
(1,Pizza Nut,123 Main Street,5551234567,8932,1,23000000)
(2,Baco Tell,852 2nd Street,5559092382,9321,2,8723)
(3,Burger Queen,902 Maple Drive,5552318242,281,3,92312322)
(4,Shake N' Steak,723 Applesauce Street,5552801938,5929,4,900)
DESCRIBE company_sales;
/* Output: */
company_sales: {stores::id: int,stores::name: chararray,stores::address: chararray,stores::phone: chararray,stores::numberOfStores: int,sales::id: int,sales::sales: long}

Notice in the DESCRIBE output that the schema now tells you which relation each field came from.
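The join above is an inner join. Pig also supports outer joins with a similar syntax. As a hedged sketch, a left outer join would keep every store even if it had no matching row in sales (with our data every id matches, so the result here would look the same):

company_sales_left = JOIN stores BY id LEFT OUTER, sales BY id;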

Writing a Pig Script

You now have the hang of the Grunt Shell. Let’s actually write a Pig script and submit it to the Pig server.

There is a problem in HDFS known as the small file problem. HDFS is designed for large files, and the NameNode keeps metadata for every file in memory, so lots of very small files put a lot of pressure on the NameNode and can slow down performance significantly. However, some processes output exactly that: very small files. So we are going to create a Pig script that takes in an HDFS directory of small files, combines them, and outputs the result into a new directory. Let’s dive in.

First things first, we need to make an HDFS directory and then put a lot of small files in it. Let’s begin.

hdfs dfs -mkdir -p /pig/smallfiles/

You should still have your pigData.txt, so let’s push that and a couple of copies of it to HDFS. The following commands should do the trick.

Note that we’re just copying pigData.txt again and again and putting it inside the /pig/smallfiles directory.

hdfs dfs -put pigData.txt /pig/smallfiles/
cp pigData.txt pigData2.txt
hdfs dfs -put pigData2.txt /pig/smallfiles/
cp pigData.txt pigData3.txt
hdfs dfs -put pigData3.txt /pig/smallfiles/
cp pigData.txt pigData4.txt
hdfs dfs -put pigData4.txt /pig/smallfiles/
hdfs dfs -ls /pig/smallfiles/
/* Output: */
Found 4 items
-rw-r--r--   1 root supergroup        188 2018-01-17 02:21 /pig/smallfiles/pigData.txt
-rw-r--r--   1 root supergroup        188 2018-01-17 02:21 /pig/smallfiles/pigData2.txt
-rw-r--r--   1 root supergroup        188 2018-01-17 02:22 /pig/smallfiles/pigData3.txt
-rw-r--r--   1 root supergroup        188 2018-01-17 02:22 /pig/smallfiles/pigData4.txt
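If you want to quantify the problem, hdfs dfs -count reports the number of directories, files, and total bytes under a path; with the four copies above, the file count column should read 4:

hdfs dfs -count /pig/smallfiles/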

Let’s create the Pig script.

vi compact.pig

Don’t forget to press i to insert text.

in_dir = LOAD '$IN_DIR' USING PigStorage();
STORE in_dir INTO '$OUT_DIR' USING PigStorage();
out_data = LOAD '$OUT_DIR/part-m-00000' USING PigStorage(',') as (id:int, name:chararray, address:chararray, phone:chararray, numberOfStores:int);
group_data = GROUP out_data BY name;
count = FOREACH group_data GENERATE COUNT(out_data.name);
DUMP count;

Don’t forget to do ESC, followed by :wq to save and quit the script.

Let’s walk through the script above:

  1. We are creating a relation named in_dir and loading the directory.
  2. Then we simply STORE the relation into the output directory ($OUT_DIR).
  3. Next, we take that data and apply the schema.
  4. Next, we group the data by the name of the companies.
  5. Finally, we do a count of the number of times each company name appears in our combined file.

Let’s put it into action:

pig -f compact.pig -param IN_DIR=/pig/smallfiles -param OUT_DIR=/pig/bigfile

Let’s talk about the above command. Earlier we entered the Grunt Shell by just typing pig. Here we add the -f flag, which tells Pig that we don’t want the interactive shell; instead, we want to execute the commands inside a file. We follow the -f flag with the name of the file we want to execute. Inside the script we use some variables; you can tell which ones they are by the ‘$’. To pass a value to those variables, we use the -param flag with the variable assignment we want to pass, so we do that twice: -param for IN_DIR and -param for OUT_DIR. It’s important to note that the output directory for any Pig script cannot exist before the execution of the script, or the script will fail.
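Because the output directory cannot already exist, if you want to rerun the script you first have to delete it. You can also keep the parameters in a file and pass them with the -param_file flag instead of repeating -param on the command line; params.txt here is a hypothetical file containing the lines IN_DIR=/pig/smallfiles and OUT_DIR=/pig/bigfile:

hdfs dfs -rm -r /pig/bigfile
pig -param_file params.txt -f compact.pig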

Some of the output you should see is the following:

(4)
(4)
(4)
(4)

But what does this mean? Remember, we copied the same data 4 times, so each of the 4 companies is listed 4 times in the combined file; that’s where the 4s come from. This is a direct result of steps 4 and 5 in the Pig script above.

After the job finishes, we can check the output file itself.

hdfs dfs -cat /pig/bigfile/part-m-00000
/* Output: */
1,Pizza Nut,123 Main Street,5551234567,8932
2,Baco Tell,852 2nd Street,5559092382,9321
3,Burger Queen,902 Maple Drive,5552318242,281
4,Shake N' Steak,723 Applesauce Street,5552801938,5929
1,Pizza Nut,123 Main Street,5551234567,8932
2,Baco Tell,852 2nd Street,5559092382,9321
3,Burger Queen,902 Maple Drive,5552318242,281
4,Shake N' Steak,723 Applesauce Street,5552801938,5929
1,Pizza Nut,123 Main Street,5551234567,8932
2,Baco Tell,852 2nd Street,5559092382,9321
3,Burger Queen,902 Maple Drive,5552318242,281
4,Shake N' Steak,723 Applesauce Street,5552801938,5929
1,Pizza Nut,123 Main Street,5551234567,8932
2,Baco Tell,852 2nd Street,5559092382,9321
3,Burger Queen,902 Maple Drive,5552318242,281
4,Shake N' Steak,723 Applesauce Street,5552801938,5929

Ta-da! Great job. You just solved one of the biggest issues that HDFS has: the small file problem. One thing to mention is that you might have noticed that Pig runs pretty slowly in the Docker container. That is expected, because Pig is meant to run on a cluster of machines; since we only have one Docker machine, there are no other servers to distribute the work across. We are just practicing anyway.

Conclusion

Congrats! You’ve learned the basics of Pig. It is a great tool for ETL and other data flow workloads, and it is an essential, early application in the Hadoop ecosystem. Don’t forget to turn off your Docker machine if you are on DigitalOcean!


© 2016-2022. All rights reserved.