CommonLounge Archive

Hands-on Assignment: MapReduce

April 17, 2018

In this hands-on assignment, we’ll see map-reduce in action. In particular, we’ll implement the PageRank algorithm in the MapReduce framework. Our dataset is going to be Simple Wikipedia, which has a total of ~100,000 articles and (in this dataset) ~750,000 links. The main objective of the assignment is to get comfortable with how map-reduce works.

Overview

This assignment guides you through using map-reduce to calculate PageRank scores for wikipedia articles. Take the quiz once you complete the assignment, as the quiz asks you for the results from the assignment.

Note: In this assignment, MapReduce runs locally on your computer. However, the map and reduce functions you write could, in general, just as easily be executed on a distributed system.
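To see the pattern in miniature before diving in, here is a toy word-count sketch (not part of the assignment). The point to notice is that the mapper and reducer are pure functions of their arguments, which is exactly why each call could run on a different machine:

```python
from collections import defaultdict

def wordcount_mapper(word):
    # emit a (key, value) pair for each input word
    return (word, 1)

def wordcount_reducer(word, counts):
    # combine all the values emitted for the same key
    return (word, sum(counts))

words = ['to', 'be', 'or', 'not', 'to', 'be']

# map phase: group the mapper's outputs by key
grouped = defaultdict(list)
for word in words:
    key, value = wordcount_mapper(word)
    grouped[key].append(value)

# reduce phase: collapse each group to a single value
counts = dict(wordcount_reducer(k, v) for k, v in grouped.items())
print(counts)  # {'to': 2, 'be': 2, 'or': 1, 'not': 1}
```

The functions you write for the assignment will follow this same (key, value) shape.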

Dataset

You can download the dataset at the following link: simplewiki-links. (The dataset is derived from the following dataset: Wikipedia, simple en (dynamic).)

Note: The dataset only includes article ids, not the actual title of the article.

Implementing PageRank using MapReduce

Setup

After downloading the dataset, save the following code as main.py. This is the code you will run to get the final answers.

Note: You can skim the file if you are curious, but it is not required that you do so. I recommend taking a look at the execute_map_reduce function.

from collections import defaultdict
import pandas as pd
import mapreduce
###############################################################################
## load data
narticles = 100312
article_ids = [xx+1 for xx in range(narticles)]
links = pd.read_csv('./simplewiki-links', sep='\t', names=['source', 'target'], header=None)
###############################################################################
## execute map reduce
def execute_map_reduce(data, map_function, reduce_function):
    # map
    intermediate_values = defaultdict(list)
    for data_point in data:
        key, value = map_function(*data_point)
        intermediate_values[key].append(value)
    # reduce
    result = dict()
    for key, values in intermediate_values.items():
        key, value = reduce_function(key, values)
        result[key] = value
    return result
###############################################################################
## calculate outdegree
data = zip(links['source'], links['target'])
outdegrees = execute_map_reduce(data, mapreduce.outdegree_mapper, mapreduce.outdegree_reducer)
###############################################################################
## calculate pageranks
max_niterations = 250
tolerance = 0.001
default_pagerank = 1. - mapreduce.alpha
# init
pagerank = dict()
# iterate
for iteration in range(max_niterations):
    # calculate
    data = [(source, target, outdegrees[source], pagerank.get(source, default_pagerank)) for source, target in zip(links['source'], links['target'])]
    new_pagerank = execute_map_reduce(data, mapreduce.pagerank_mapper, mapreduce.pagerank_reducer)
    max_delta = max(
        abs(new_pagerank.get(xx, default_pagerank) - pagerank.get(xx, default_pagerank)) / pagerank.get(xx, default_pagerank)
        for xx in pagerank.keys()
    ) if pagerank else 100.0
    pagerank = new_pagerank # update
    print('Iteration number:', iteration, 'Max delta:', max_delta)
    print('\nFor checking:')
    for xx in article_ids[0:5]: print('Pagerank for article %d is %f' % (xx, pagerank[xx]))
    print('\nFor submitting:')
    for xx in article_ids[5:10]: print('Pagerank for article %d is %f' % (xx, pagerank[xx]))
    print '='*80
    if max_delta < tolerance: break # termination criteria
###############################################################################

MapReduce

main.py imports mapreduce.py. A template for mapreduce.py is included below.

###############################################################################
## outdegree
def outdegree_mapper(source, target):
    return ... 
def outdegree_reducer(source, values):
    return ... 
###############################################################################
## pagerank
alpha = 0.95
def pagerank_mapper(source, target, source_outdegree, source_pagerank):
    return ... 
def pagerank_reducer(target, values):
    return ... 
###############################################################################

Each of the above functions should return a (key, value) pair. You can assume that the map function is called once for each link, with the arguments described in the template code above. The results of all the map calls are then grouped by key, and each reducer receives the list of values associated with a single key.
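To make the grouping step concrete, here is a small sketch with a hypothetical mapper and reducer (made-up data, and deliberately not the assignment solution): the mapper emits (target, source) for each link, so the reducer for each target receives the list of sources pointing to it.

```python
from collections import defaultdict

links = [(1, 2), (1, 3), (2, 3)]  # (source, target) pairs, made-up data

def inlinks_mapper(source, target):
    # emit the target as the key, so values are grouped per target
    return (target, source)

def inlinks_reducer(target, sources):
    # the reducer sees every source that linked to this target
    return (target, len(sources))  # number of incoming links

# group the mapper's outputs by key (this is what execute_map_reduce does)
intermediate = defaultdict(list)
for source, target in links:
    key, value = inlinks_mapper(source, target)
    intermediate[key].append(value)

result = dict(inlinks_reducer(k, v) for k, v in intermediate.items())
print(result)  # {2: 1, 3: 2}
```

Your outdegree and pagerank functions follow the same call pattern, just with different keys and values.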

outdegree_mapper and outdegree_reducer should together implement the outdegree function, i.e. the number of outgoing links from each source article.

pagerank_mapper and pagerank_reducer should together implement one iteration of the PageRank algorithm. This is given by the following formula,

$$ \text{PageRank}(t) = (1 - \alpha) + \alpha \bigg( \sum_s \frac{\text{PageRank}(s)}{\text{outdegree}(s)} \bigg) $$
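As a quick sanity check on the formula, here is a worked example with illustrative numbers (not from the dataset). Suppose article t is linked to by two sources, s1 and s2:

```python
alpha = 0.95  # same damping factor as in mapreduce.py

# hypothetical values for the two sources linking to t
pagerank_s1, outdegree_s1 = 2.0, 4   # s1 contributes 2.0 / 4 = 0.5
pagerank_s2, outdegree_s2 = 3.0, 6   # s2 contributes 3.0 / 6 = 0.5

# PageRank(t) = (1 - alpha) + alpha * sum over sources of PageRank(s) / outdegree(s)
pagerank_t = (1 - alpha) + alpha * (pagerank_s1 / outdegree_s1 +
                                    pagerank_s2 / outdegree_s2)
print(pagerank_t)  # 0.05 + 0.95 * (0.5 + 0.5) = 1.0
```

Note that each source's contribution is its current PageRank divided by its outdegree, which is exactly why your pagerank_mapper is given source_outdegree and source_pagerank as arguments.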

Running the code

Once you run the code, you’ll see outputs similar to the following.

Iteration number: 104 Max delta: 0.0009919941206116103
For checking:
Pagerank for article 1 is 6.830842
Pagerank for article 2 is 13.294348
Pagerank for article 3 is 7.168526
Pagerank for article 4 is 7.227037
Pagerank for article 5 is 11.144195
For submitting:
Pagerank for article 6 is ...
Pagerank for article 7 is ...
Pagerank for article 8 is ...
Pagerank for article 9 is ...
Pagerank for article 10 is ...

You can use the above values to check the correctness of your code (make sure they match within 1%).

Expected return types: If you get errors when you run the code, make sure the return types of each function are correct. All functions should return (key, value) pairs. In addition, for the reduce functions, the value must be a number (int or float).


Once everything is working, you can submit the assignment by taking the quiz below.


© 2016-2022. All rights reserved.