MP 2: MapReduce on Hadoop

Introduction

In the last MP, you were introduced to the concept of Map/Reduce and how we can execute Map/Reduce using Python with MRJob.

This week, we’ll give you access to a couple of modestly large datasets. As in the last MP, we’ll be using the MRJob library to write mappers and reducers.
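
As a quick refresher, here is the shape of a generic MRJob program (the classic word count, not a solution to any problem in this MP):

from mrjob.job import MRJob

class MRWordCount(MRJob):
    # Generic word-count refresher: the mapper emits (word, 1) pairs and the
    # reducer sums the counts for each word.

    def mapper(self, _, line):
        for word in line.split():
            yield word, 1

    def reducer(self, word, counts):
        yield word, sum(counts)

if __name__ == '__main__':
    MRWordCount.run()

Saved as, say, word_count.py, this can be run locally (python word_count.py input.txt) exactly as you did last week.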

HDFS Primer

As we’ll be running this week’s MP on Hadoop, our input data will be stored in the Hadoop Distributed File System (HDFS).

To view the results of our Hadoop MapReduce jobs, we must use hdfs dfs commands to examine the directories and files generated by our Python MapReduce programs. The following DFS commands may prove useful for inspecting that output.

Your “home directory” is at the path /user/<netid> on the course cluster (in HDFS).

When we run our MRJob applications on Hadoop, we will specify an output directory in HDFS instead of piping our output to a file. Job output is usually split into multiple output files, one per reducer. If you set your output directory to /user/<netid>/job_out, then your output files will be named /user/<netid>/job_out/part-XXXXX, starting with part-00000.

# List the /data directory
hdfs dfs -ls /data

# Do a line count on the first part of the output
hdfs dfs -cat /user/<netid>/job_out/part-00000 | wc -l

# Tail an output file
hdfs dfs -tail /user/<netid>/job_out/part-00000

# View an entire output file (careful not to 'cat' large files!)
hdfs dfs -cat /user/<netid>/job_out/part-00000

We can also make and delete files and directories as you’d expect:

# Create the /user/<netid>/job_out directory
hdfs dfs -mkdir /user/<netid>/job_out

# Delete the /user/<netid>/job_out/part-00000 file
hdfs dfs -rm /user/<netid>/job_out/part-00000

# Delete the /user/<netid>/job_out directory
hdfs dfs -rm -r /user/<netid>/job_out

Note that these HDFS commands can be intermixed with Unix commands to perform additional text processing, as in the line-count example above. The important point is that direct file I/O operations must use hdfs dfs commands to work with the HDFS file system.

The Datasets

Twitter

This data comes from Stanford’s SNAP group, and is an (unfiltered) collection of ~467 million tweets from 2009.

The dataset is located in /data/snapTwitterData in HDFS. You’ll find these files:

/data/snapTwitterData/tweets2009-06.tsv
/data/snapTwitterData/tweets2009-07.tsv
/data/snapTwitterData/tweets2009-08.tsv
/data/snapTwitterData/tweets2009-09.tsv
/data/snapTwitterData/tweets2009-10.tsv
/data/snapTwitterData/tweets2009-11.tsv
/data/snapTwitterData/tweets2009-12.tsv

Each file is a TSV (tab-separated values) file with one tweet per line, from the designated year and month (e.g., tweets2009-06.tsv contains tweets from June 2009). The schema of each file is as follows:

POST_DATETIME <tab> TWITTER_USER_URL <tab> TWEET_TEXT

Example:

2009-10-31 23:59:58 http://twitter.com/sometwitteruser  Lorem ipsum dolor sit amet, consectetur adipiscing elit
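
For reference, here is a hedged sketch (the helper name is ours) of splitting one such line into its three fields:

def parse_tweet_line(line):
    # Split on at most two tabs so any stray tabs end up inside the tweet text.
    fields = line.rstrip('\n').split('\t', 2)
    if len(fields) != 3:
        return None                      # skip malformed or truncated lines
    post_datetime, user_url, tweet_text = fields
    return post_datetime, user_url, tweet_text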

Seattle Library

This data comes from Kaggle, and contains over a decade of checkout records from the Seattle Public Library.

The dataset is located in /data/seattleLibrary in HDFS. You’ll find these files:

/data/seattleLibrary/Checkouts_By_Title_Data_Lens_2005.csv
/data/seattleLibrary/Checkouts_By_Title_Data_Lens_2006.csv
/data/seattleLibrary/Checkouts_By_Title_Data_Lens_2007.csv
/data/seattleLibrary/Checkouts_By_Title_Data_Lens_2008.csv
/data/seattleLibrary/Checkouts_By_Title_Data_Lens_2009.csv
/data/seattleLibrary/Checkouts_By_Title_Data_Lens_2010.csv
/data/seattleLibrary/Checkouts_By_Title_Data_Lens_2011.csv
/data/seattleLibrary/Checkouts_By_Title_Data_Lens_2012.csv
/data/seattleLibrary/Checkouts_By_Title_Data_Lens_2013.csv
/data/seattleLibrary/Checkouts_By_Title_Data_Lens_2014.csv
/data/seattleLibrary/Checkouts_By_Title_Data_Lens_2015.csv
/data/seattleLibrary/Checkouts_By_Title_Data_Lens_2016.csv
/data/seattleLibrary/Checkouts_By_Title_Data_Lens_2017.csv
/data/seattleLibrary/Library_Collection_Inventory.csv

This dataset has two distinct schemas. The files named Checkouts_By_Title_Data_Lens_*.csv are records of library checkout data. The file named Library_Collection_Inventory.csv contains information about each of the library’s inventory items (books, DVDs, CDs, etc.). More on this in Problem 3…

MP Activities

Problem 1: Twitter @-Mentions

Write a map/reduce program to determine the number of @-mentions each user received. Do this by analyzing the text of each tweet in the dataset and extracting the @-mentions it contains. (A small sketch of one way to extract mentions appears after the example below.)

Details:

Output your results in the following format:

Example:

Input: See ./data/twitter_sample.tsv

Output: See ./data/at_mentions_solution.tsv
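
The exact extraction rules are up to you; as one hedged starting point (the pattern and helper name below are our own, not a required approach), you might pull handles out of the tweet text like this:

import re

# Illustrative only: you may want to refine which characters a handle can
# contain and whether to normalize case.
MENTION_RE = re.compile(r'@(\w+)')

def extract_mentions(tweet_text):
    # Return the list of handles @-mentioned in a tweet's text.
    return MENTION_RE.findall(tweet_text)

From there, your mapper can emit each mentioned handle and your reducer can sum the counts, much like a word count.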

Running Your Code

python twitter_at_mentions.py data/twitter_sample.tsv > at_mentions.txt
python twitter_at_mentions.py -r hadoop --no-output --output-dir /user/<netid>/at_mentions hdfs:///data/snapTwitterData/tweets2009-06.tsv

Problem 2: Twitter Most Active Users

Write a map/reduce program to determine, for each day in the dataset, the user who created the most tweets. (If there’s a tie, break it by sorting users’ handles alphabetically and picking the lexicographically “lowest” handle; a small sketch of this tie-break appears after the example below.)

Output your results in the following format:

Example:

Input: See ./data/twitter_sample.tsv

Output: See ./data/active_users_solution.tsv
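
For the tie-break, one hedged approach (the helper and variable names are ours) is to rank candidates by descending count and then ascending handle inside your reducer:

def pick_most_active(handle_counts):
    # handle_counts: iterable of (handle, tweet_count) pairs for a single day.
    # Highest count wins; ties go to the lexicographically lowest handle.
    return min(handle_counts, key=lambda hc: (-hc[1], hc[0]))

# For example:
# pick_most_active([('zoe', 3), ('adam', 3), ('bea', 2)])  ->  ('adam', 3)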

Running Your Code

python twitter_active_users.py data/twitter_sample.tsv > active_users.txt
python twitter_active_users.py -r hadoop --no-output --output-dir /user/<netid>/active_users hdfs:///data/snapTwitterData/tweets2009-06.tsv

Problem 3: Library Book Checkout Count

Write a map/reduce program to determine how many times each library item (book, DVD, etc.) in the Seattle Library dataset, keyed by title, was checked out per year. To do this, you will have to execute a join in MapReduce. Note that there are two distinct input formats:

  1. Checkouts_By_Title_Data_Lens_<YEAR>.csv is formatted as:

     BibNumber, ItemBarcode, ItemType, Collection, CallNumber, CheckoutDateTime
    
  2. Library_Collection_Inventory.csv is formatted as:

     BibNum, Title, Author, ISBN, PublicationYear, Publisher
    

You should join these two types of data on the BibNum/BibNumber attribute to determine the number of times each inventory item has been checked out. Aggregate your results by title; if multiple inventory items share the same title, their checkout counts should be summed. If, for some reason, you cannot find a corresponding title for a given BibNumber, do not emit any output for that item.

Note: You may find it useful/necessary to use 2 MapReduce steps for this problem.
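
As a hedged sketch of the reduce-side join pattern (the 'T'/'C' tags and the function name are our own convention, and per-year grouping is left out), the idea is to key both kinds of records by BibNum, tag them by source, and match them in a reducer:

def join_title_with_checkouts(bib_num, tagged_values):
    # tagged_values: for one BibNum, a mix of ('T', title) pairs emitted from
    # inventory rows and ('C', 1) pairs emitted from checkout rows.
    title = None
    checkouts = 0
    for tag, value in tagged_values:
        if tag == 'T':
            title = value
        else:
            checkouts += value
    if title is None:
        return []              # no matching title: emit nothing for this item
    return [(title, checkouts)]

In an MRJob, this logic would live in the reducer of your first step; a later step can then sum counts that share the same title. Note that this fragment ignores the per-year requirement, so you will still need to carry the checkout year through the join.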

Output your results in the following format:

Example:

Input: See ./data/library_inventory.csv and ./data/library_checkouts.csv

Output: See ./data/library_solution.tsv

Running Your Code

python library_checkout_count.py data/library_inventory.csv data/library_checkouts.csv > library.txt
python library_checkout_count.py -r hadoop --no-output --output-dir /user/<netid>/library hdfs:///data/seattleLibrary/Library_Collection_Inventory.csv hdfs:///data/seattleLibrary/Checkouts_By_Title_Data_Lens_2005.csv

Problem 4: Hadoop Performance

For this part, we want you to become more acquainted with the performance characteristics of Hadoop MapReduce, and the options that you have as a Hadoop user to change the parameters of your job’s execution.

Notice that at the end of your Hadoop job, you’ll see a report that looks something like this:

...
File Input Format Counters
    Bytes Read=1253590
File Output Format Counters
    Bytes Written=409896
File System Counters
    FILE: Number of bytes read=592839
    FILE: Number of bytes written=1973404
    FILE: Number of large read operations=0
    FILE: Number of read operations=0
    FILE: Number of write operations=0
    HDFS: Number of bytes read=1254545
    HDFS: Number of bytes written=409896
    HDFS: Number of large read operations=0
    HDFS: Number of read operations=18
    HDFS: Number of write operations=2
Job Counters
    Data-local map tasks=5
    Killed map tasks=1
    Launched map tasks=5
    Launched reduce tasks=1
    Total megabyte-milliseconds taken by all map tasks=70342656
    Total megabyte-milliseconds taken by all reduce tasks=16027648
    Total time spent by all map tasks (ms)=68694
    Total time spent by all maps in occupied slots (ms)=2198208
    Total time spent by all reduce tasks (ms)=7826
    Total time spent by all reduces in occupied slots (ms)=500864
    Total vcore-milliseconds taken by all map tasks=68694
    Total vcore-milliseconds taken by all reduce tasks=7826
Map-Reduce Framework
    CPU time spent (ms)=16490
    Combine input records=0
    Combine output records=0
    Failed Shuffles=0
    GC time elapsed (ms)=687
    Input split bytes=955
    Map input records=100000
    Map output bytes=1126862
    Map output materialized bytes=585596
    Map output records=100000
    Merged Map outputs=5
    Physical memory (bytes) snapshot=1881874432
    Reduce input groups=100000
    Reduce input records=100000
    Reduce output records=43102
    Reduce shuffle bytes=585596
    Shuffled Maps =5
    Spilled Records=200000
    Total committed heap usage (bytes)=1585971200
    Virtual memory (bytes) snapshot=18156883968
...

These values are useful as a gauge of how Hadoop executed your job; metrics such as the number of launched map and reduce tasks and the total time spent by all map and reduce tasks are of particular note.

Using MRJob, we can tell Hadoop how many mappers and reducers to use. The mapreduce.job.maps and mapreduce.job.reduces configuration options control the number of map and reduce tasks, respectively.

For example, this invocation would have Hadoop use 10 mappers, and 5 reducers:

python my_hadoop_job.py -r hadoop --jobconf mapreduce.job.maps=10 --jobconf mapreduce.job.reduces=5

Unfortunately, the number of map tasks is not reliably configurable due to the way we have Hadoop/YARN configured. However, you should be able to reliably change the number of reducers using the mapreduce.job.reduces flag.
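
If you prefer to set these options inside the job itself rather than on the command line, MRJob also supports a class-level JOBCONF dictionary; the sketch below is one hedged way to do this (verify the effect on the course cluster):

from mrjob.job import MRJob

class MyHadoopJob(MRJob):
    # Request 10 map tasks and 5 reduce tasks for every run of this job.
    # As noted above, the map-task setting may not be honored by YARN.
    JOBCONF = {
        'mapreduce.job.maps': '10',
        'mapreduce.job.reduces': '5',
    }

    # ... your mapper/reducer definitions go here ...

if __name__ == '__main__':
    MyHadoopJob.run()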

Choose one of the MapReduce jobs you wrote in Problems 1, 2, or 3 and experiment with 3 configurations of the number of reducers (e.g., 1 reducer vs. 5 vs. 10). In approximately 50-100 words, characterize your observations for these 3 configurations in report.txt. Reference the “cost” of the job (in terms of any of the metrics given to you in the Hadoop job results), and mention the tradeoffs between job wall-clock performance and resource usage.

Don’t lose your progress!

Hadoop jobs can take a very long time to complete. If you don’t take precautions, you’ll lose all your progress if something happens to your SSH session.

To mitigate this, we have installed tmux on the cluster. tmux is a tool that persists shell sessions even if we lose our SSH connection.

  1. Run tmux to enter into a tmux session.
  2. Run some command that will run for a long time (e.g. ping google.com)
  3. Exit out of your SSH session.
  4. Log back into the server and run tmux attach-session and you should find your session undisturbed.

Suggested Workflow

  1. Write your MapReduce job and test it locally (using provided testing data and the same commands as last week)
  2. Test your MapReduce job on a subset of the entire data – just a single “large” HDFS file (using -r hadoop)
  3. Test your MapReduce job on the whole dataset stored in HDFS (using -r hadoop).
    • You can run your Hadoop jobs on multiple files by appending more filenames to your job invocation, or by using the “*” wildcard
      • python my_job.py hdfs:///example/file1.csv hdfs:///example/file2.csv hdfs:///example/file3.csv
      • python my_job.py hdfs:///example/file*.csv
    • NOTE Only run your jobs on the entire dataset if the cluster isn’t currently being used by others
      • You can run yarn application -list to see whether others’ jobs are running or waiting; if they are, limit your input to a minimal number of files.

Hadoop Web Interfaces

Hadoop has a really useful web interface for viewing how jobs are executed. You can access this web interface by creating an SSH tunnel when you open your SSH session to the cluster.

UNIX users can access this UI by appending this sequence to their ssh command:

-L 20888:ip-172-31-5-92.ec2.internal:20888 -L 19888:ip-172-31-5-92.ec2.internal:19888

Windows users can use PuTTY in a similar fashion.

For example, your “full” ssh command may look like this:

ssh -i /path/to/ssh_key <netid>@<cluster_address> -L 20888:ip-172-31-5-92.ec2.internal:20888 -L 19888:ip-172-31-5-92.ec2.internal:19888

To see a list of running / completed jobs, visit localhost:19888 in your browser.

When you start a Hadoop job in the cluster, you’ll see a log message like this:

The url to track the job: http://ip-172-31-5-92.ec2.internal:20888/proxy/application_1517153180921_0008/

Copy the path suffix of this URL and append it to localhost:20888 to visit the in-progress job UI. For example, for this job, you’d visit the following location in your browser:

localhost:20888/proxy/application_1517153180921_0008

Submission

MP 2 is due on Tuesday, February 6th, 2018 at 11:59PM.

You can find starter code and the Git repository where you should submit your code here:

If you have issues accessing your repository, make sure that you’ve signed into Gitlab. If you still do not have access, please make a private post on the course Piazza.

Please write your solutions as indicated in the following files:

… and commit your code to Gitlab like this:

git add .
git commit -m "Completed MP" # Your commit message doesn't need to match this
git push

WARNING: Our autograder runs on Python 3. If you write code for this assignment that only works on Python 2, we will be unable to grade your code. Code that contains syntax errors (i.e. code that the Python interpreter cannot parse) will receive no credit.