In the last MP, you were introduced to the concept of Map/Reduce and how we can execute Map/Reduce using Python with MRJob.
This week, we’ll give you access to a couple of modestly large datasets. As in the last MP, we’ll be using the MRJob library to write mappers and reducers.
As we’ll be running this week’s MP on Hadoop, our input data will be stored in the Hadoop Distributed File System (HDFS).
In order to view the results of our Hadoop MapReduce job, we must use HDFS DFS commands to examine the directory and files generated by our Python MapReduce programs. The following list of DFS commands might prove useful to view the results of this map/reduce job.
Your “home directory” is at the path /user/<netid> on the course cluster (in HDFS).
Now when we run our MRJob applications, we will specify an output directory in HDFS instead of piping our output to a file. Usually, job output is split into multiple output files, one per reducer. If you set your output directory to /user/<netid>/job_out, then your output files will be named /user/<netid>/job_out/part-XXXXX, starting with part-00000.
# List the /data directory
hdfs dfs -ls /data
# Summarize the first part of the output (directory count, file count, size)
hdfs dfs -count -h /user/<netid>/job_out/part-00000
# Tail an output file
hdfs dfs -tail /user/<netid>/job_out/part-00000
# View an entire output file (careful not to 'cat' large files!)
hdfs dfs -cat /user/<netid>/job_out/part-00000
We can also make directories, and delete files/directories as you’d expect:
# Create the /user/<netid>/job_out directory
hdfs dfs -mkdir /user/<netid>/job_out
# Delete the /user/<netid>/job_out/part-00000 file
hdfs dfs -rm /user/<netid>/job_out/part-00000
# Delete the /user/<netid>/job_out directory
hdfs dfs -rm -r /user/<netid>/job_out
Note that these Hadoop HDFS commands can be intermixed with Unix commands to perform additional text processing. The important point is that any direct file I/O against HDFS must go through hdfs dfs commands.
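For example, assuming your job writes tab-separated key/count lines (roughly what MRJob emits by default), quick sanity checks like these are possible; treat them as a sketch rather than required commands:
# Count the total number of output records across all reducers
hdfs dfs -cat /user/<netid>/job_out/part-* | wc -l
# Show the 20 keys with the largest counts (assumes "key<TAB>count" lines)
hdfs dfs -cat /user/<netid>/job_out/part-* | sort -k2,2 -rn | head -20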
This data comes from Stanford’s SNAP group, and is an (unfiltered) collection of ~467 million tweets from 2009.
The dataset is located in /data/snapTwitterData
in HDFS. You’ll find these files:
/data/snapTwitterData/tweets2009-06.tsv
/data/snapTwitterData/tweets2009-07.tsv
/data/snapTwitterData/tweets2009-08.tsv
/data/snapTwitterData/tweets2009-09.tsv
/data/snapTwitterData/tweets2009-10.tsv
/data/snapTwitterData/tweets2009-11.tsv
/data/snapTwitterData/tweets2009-12.tsv
Each file is a tsv (tab-separated values) file with one tweet per line, from the designated year/month (i.e. tweets2009-06.tsv contains tweets from June 2009). The schema of the file is as follows:
POST_DATETIME <tab> TWITTER_USER_URL <tab> TWEET_TEXT
Example:
2009-10-31 23:59:58 http://twitter.com/sometwitteruser Lorem ipsum dolor sit amet, consectetur adipiscing elit
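If it helps, here is a minimal, purely illustrative sketch of splitting one such line into its three tab-separated fields; the function and variable names are placeholders, not part of the assignment:
# Minimal sketch: split one tweet line into its three tab-separated fields.
def parse_tweet_line(line):
    parts = line.rstrip('\n').split('\t')
    if len(parts) != 3:
        return None  # skip malformed lines
    post_datetime, user_url, tweet_text = parts
    return post_datetime, user_url, tweet_text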
This data comes from Kaggle, and contains over a decade of checkout records from the Seattle Public Library.
The dataset is located in /data/seattleLibrary
in HDFS. You’ll find these files:
/data/seattleLibrary/Checkouts_By_Title_Data_Lens_2005.csv
/data/seattleLibrary/Checkouts_By_Title_Data_Lens_2006.csv
/data/seattleLibrary/Checkouts_By_Title_Data_Lens_2007.csv
/data/seattleLibrary/Checkouts_By_Title_Data_Lens_2008.csv
/data/seattleLibrary/Checkouts_By_Title_Data_Lens_2009.csv
/data/seattleLibrary/Checkouts_By_Title_Data_Lens_2010.csv
/data/seattleLibrary/Checkouts_By_Title_Data_Lens_2011.csv
/data/seattleLibrary/Checkouts_By_Title_Data_Lens_2012.csv
/data/seattleLibrary/Checkouts_By_Title_Data_Lens_2013.csv
/data/seattleLibrary/Checkouts_By_Title_Data_Lens_2014.csv
/data/seattleLibrary/Checkouts_By_Title_Data_Lens_2015.csv
/data/seattleLibrary/Checkouts_By_Title_Data_Lens_2016.csv
/data/seattleLibrary/Checkouts_By_Title_Data_Lens_2017.csv
/data/seattleLibrary/Library_Collection_Inventory.csv
This dataset has two distinct schemas. The files named Checkouts_By_Title_Data_Lens_*.csv are records of library checkout data. The file named Library_Collection_Inventory.csv has information about each of the library’s inventory items (books, DVDs, CDs, etc.). More on this in Problem 3…
Write a map/reduce program to determine the number of @-mentions each user received. Do this by analyzing the text of each Tweet in the dataset and extracting the @-mentions.
Details:
- An @-mention may be preceded by punctuation such as a . in front of the username (.@jack should be counted as an @-mention; (@jack) is also valid, for example).
- A single tweet can mention several users: @jack hello world. what's up @jill? contains an @-mention to @jack and an @-mention to @jill.
- Repeated mentions within one tweet are not double-counted: @jack @jack @jack should only “count” as 1 @-mention to @jack.
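One possible (not required) way to pull mentions out of the tweet text is a regular expression; exactly which characters you allow in a handle is up to you, so treat the pattern below as an assumption:
import re

# Assumed pattern: '@' followed by one or more word characters (letters, digits, underscore).
MENTION_RE = re.compile(r'@(\w+)')

def unique_mentions(tweet_text):
    # Returning a set de-duplicates repeated mentions within a single tweet.
    return set(MENTION_RE.findall(tweet_text))

# unique_mentions("@jack hello .@jill! @jack") -> {'jack', 'jill'}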
Output your results in the following format:
Example:
Input: See ./data/twitter_sample.tsv
Output: See ./data/at_mentions_solution.tsv
Running Your Code
# Run locally against the small sample file
python twitter_at_mentions.py data/twitter_sample.tsv > at_mentions.txt
# Run on Hadoop against the June 2009 data in HDFS
python twitter_at_mentions.py -r hadoop --no-output --output-dir /user/<netid>/at_mentions hdfs:///data/snapTwitterData/tweets2009-06.tsv
Write a map/reduce program to determine the user that created the most Tweets for every given day in the dataset. (If there’s a tie, break the tie by sorting alphabetically on users’ handles, and picking the user with the “lowest” username lexicographically)
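The tie-break can be expressed as a single key function. Here is a minimal sketch, assuming you have built a per-day dict mapping username to tweet count (the name counts below is hypothetical):
# Highest count wins; ties go to the lexicographically smallest handle.
def most_active(counts):
    return min(counts, key=lambda user: (-counts[user], user))

# most_active({'bob': 3, 'alice': 3, 'carol': 2}) -> 'alice'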
Output your results in the following format, with the date given in YYYY-MM-DD format.
Example:
Input: See ./data/twitter_sample.tsv
Output: See ./data/active_users_solution.tsv
Running Your Code
# Run locally against the small sample file
python twitter_active_users.py data/twitter_sample.tsv > active_users.txt
# Run on Hadoop against the June 2009 data in HDFS
python twitter_active_users.py -r hadoop --no-output --output-dir /user/<netid>/active_users hdfs:///data/snapTwitterData/tweets2009-06.tsv
Write a map/reduce program to determine, for the Seattle Library dataset, how many times each library item (book, DVD, etc.), keyed by title, was checked out per year. To do this, you will have to execute a join in MapReduce. Note that we have two distinct input formats:
Checkouts_By_Title_Data_Lens_<YEAR>.csv is formatted as:
BibNumber, ItemBarcode, ItemType, Collection, CallNumber, CheckoutDateTime
Library_Collection_Inventory.csv is formatted as:
BibNum, Title, Author, ISBN, PublicationYear, Publisher
You should join these two types of data on the BibNum/BibNumber attribute to determine the number of times each inventory item has been checked out. Aggregate your results by title; if multiple inventory items share the same title, their checkout counts should be summed. If, for some reason, you cannot find a corresponding title for a given BibNumber, do not emit any output for that item.
Note: You may find it useful/necessary to use 2 MapReduce steps for this problem.
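One common shape for this is a reduce-side join: the mapper tags each record with its source and keys it by bib number, so a title and its checkouts arrive at the same reducer. The sketch below only illustrates that shape (and the use of Python's csv module, since titles can contain commas); the helper name, the date format, and the way the source file is detected are assumptions to verify against the real data:
import csv
from io import StringIO

# Hypothetical helper for the mapper of a reduce-side join: emit (bib number, tagged value).
def tag_record(line, input_filename):
    fields = next(csv.reader(StringIO(line)))                # csv handles quoted titles with commas
    if 'Library_Collection_Inventory' in input_filename:
        return fields[0], ('TITLE', fields[1])               # BibNum -> title
    checkout_year = fields[5].split(' ')[0].split('/')[-1]   # assumes MM/DD/YYYY HH:MM:SS-style dates
    return fields[0], ('CHECKOUT', checkout_year)            # BibNumber -> year of checkout
On Hadoop, mrjob can usually report which file a record came from via jobconf_from_env('mapreduce.map.input.file') from mrjob.compat (you may also need to skip CSV header lines). The reducer for each bib number then remembers the one TITLE value, counts checkouts per year, and a second MRStep can re-key by "TITLE|YEAR" and sum the counts.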
Output your results in the following format:
"BOOK_TITLE|YEAR"
(e.g. "The Old Man and the Sea|2017")
Example:
Input: See ./data/library_inventory.csv and ./data/library_checkouts.csv
Output: See ./data/library_solution.tsv
Running Your Code
# Run locally against the small sample files
python library_checkout_count.py data/library_inventory.csv data/library_checkouts.csv > library.txt
# Run on Hadoop against the full inventory plus the 2005 checkout data in HDFS
python library_checkout_count.py -r hadoop --no-output --output-dir /user/<netid>/library hdfs:///data/seattleLibrary/Library_Collection_Inventory.csv hdfs:///data/seattleLibrary/Checkouts_By_Title_Data_Lens_2005.csv
For this part, we want you to become more acquainted with the performance characteristics of Hadoop MapReduce, and the options that you have as a Hadoop user to change the parameters of your job’s execution.
Notice that at the end of your Hadoop job, you’ll see a report that looks something like this:
...
File Input Format Counters
Bytes Read=1253590
File Output Format Counters
Bytes Written=409896
File System Counters
FILE: Number of bytes read=592839
FILE: Number of bytes written=1973404
FILE: Number of large read operations=0
FILE: Number of read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=1254545
HDFS: Number of bytes written=409896
HDFS: Number of large read operations=0
HDFS: Number of read operations=18
HDFS: Number of write operations=2
Job Counters
Data-local map tasks=5
Killed map tasks=1
Launched map tasks=5
Launched reduce tasks=1
Total megabyte-milliseconds taken by all map tasks=70342656
Total megabyte-milliseconds taken by all reduce tasks=16027648
Total time spent by all map tasks (ms)=68694
Total time spent by all maps in occupied slots (ms)=2198208
Total time spent by all reduce tasks (ms)=7826
Total time spent by all reduces in occupied slots (ms)=500864
Total vcore-milliseconds taken by all map tasks=68694
Total vcore-milliseconds taken by all reduce tasks=7826
Map-Reduce Framework
CPU time spent (ms)=16490
Combine input records=0
Combine output records=0
Failed Shuffles=0
GC time elapsed (ms)=687
Input split bytes=955
Map input records=100000
Map output bytes=1126862
Map output materialized bytes=585596
Map output records=100000
Merged Map outputs=5
Physical memory (bytes) snapshot=1881874432
Reduce input groups=100000
Reduce input records=100000
Reduce output records=43102
Reduce shuffle bytes=585596
Shuffled Maps =5
Spilled Records=200000
Total committed heap usage (bytes)=1585971200
Virtual memory (bytes) snapshot=18156883968
...
These values are useful as a gauge for how Hadoop has executed your job. Of particular note are the following metrics:
File System Counters
- HDFS: * - How the MapReduce job interacted with the HDFS file system
Job Counters
- Total megabyte-milliseconds taken by all map tasks - aggregated Map phase “cost” in terms of megabytes of memory allocated times total runtime of all mappers (in milliseconds)
- Total megabyte-milliseconds taken by all reduce tasks - aggregated Reduce phase “cost” in terms of megabytes of memory allocated times total runtime of all reducers (in milliseconds)
- Total vcore-milliseconds taken by all map tasks - aggregated Map phase “cost” in terms of the number of virtual CPU cores in use times total runtime of all mappers (in milliseconds)
- Total vcore-milliseconds taken by all reduce tasks - aggregated Reduce phase “cost” in terms of the number of virtual CPU cores in use times total runtime of all reducers (in milliseconds)
Map-Reduce Framework
- Per-record counters for the map, combine, shuffle, and reduce stages (e.g. Map input records, Reduce output records, Spilled Records)
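As a concrete reading of the report above: the map tasks ran for a combined 68694 ms and cost 70342656 megabyte-milliseconds, which works out to 70342656 / 68694 = 1024 MB of memory allocated per map task; similarly, the reduce task’s 16027648 megabyte-milliseconds over 7826 ms corresponds to 2048 MB.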
Using mrjob, we can tell Hadoop how many mappers and how many reducers to use. The mapreduce.job.maps and mapreduce.job.reduces configuration options control the number of map and reduce tasks, respectively.
For example, this invocation would have Hadoop use 10 mappers, and 5 reducers:
python my_hadoop_job.py -r hadoop --jobconf mapreduce.job.maps=10 --jobconf mapreduce.job.reduces=5
Unfortunately, the number of map tasks is not reliably configurable due to the way we have Hadoop/YARN configured. However, you should be able to reliably change the number of reducers using the mapreduce.job.reduces flag.
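Besides passing --jobconf on the command line, mrjob also lets you set these options in the job class itself via the JOBCONF dictionary. A minimal sketch (the class name and the trivial mapper/reducer are placeholders):
from mrjob.job import MRJob

class MyJob(MRJob):
    # Ask Hadoop for 5 reduce tasks; the map-task setting is shown for completeness,
    # but as noted above it is not reliably honored on this cluster.
    JOBCONF = {
        'mapreduce.job.maps': '10',
        'mapreduce.job.reduces': '5',
    }

    def mapper(self, _, line):
        yield 'lines', 1

    def reducer(self, key, values):
        yield key, sum(values)

if __name__ == '__main__':
    MyJob.run()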
Choose one of the MapReduce jobs you wrote in Problems 1, 2, or 3 and experiment with 3 configurations of the number of reducers (e.g. 1 reducer vs. 5 vs. 10). In approximately 50-100 words, characterize your observations for these 3 configurations in report.txt. Reference the “cost” of the job (in terms of any of the metrics given to you in the Hadoop job results), and mention the tradeoffs between job wall-clock performance and resource usage.
Hadoop jobs can take a very long time to complete. If you don’t take precautions, you’ll lose all your progress if something happens to your SSH session.
To mitigate this, we have installed tmux on the cluster. Tmux is a tool that lets us persist shell sessions even if we lose our SSH connection.
To use it:
- Run tmux to enter into a tmux session.
- Start a long-running command (e.g. ping google.com).
- Close your SSH connection.
- SSH back into the cluster and run tmux attach-session, and you should find your session undisturbed.
A few other notes on running your jobs:
- Test your programs locally on the small sample files first (i.e. without -r hadoop).
- Once your code works locally, run it against the full datasets on the cluster (with -r hadoop).
- You can pass a job multiple input files, or use a glob:
python my_job.py hdfs:///example/file1.csv hdfs:///example/file2.csv hdfs:///example/file3.csv
python my_job.py hdfs:///example/file*.csv
- Before running a job over many files, run yarn application -list to see if others are waiting. Otherwise, limit your input to a minimal number of files.
Hadoop has a really useful web interface to view how jobs are executed. Accessing this web interface is possible by using an SSH tunnel when you create your SSH session to the cluster.
UNIX users can access this UI by appending this sequence to their ssh command:
-L 20888:ip-172-31-5-92.ec2.internal:20888 -L 19888:ip-172-31-5-92.ec2.internal:19888
Windows users can use PuTTY in a similar fashion.
For example, your “full” ssh command may look like this:
ssh -i /path/to/ssh_key <netid>@<cluster_address> -L 20888:ip-172-31-5-92.ec2.internal:20888 -L 19888:ip-172-31-5-92.ec2.internal:19888
To see a list of running/completed jobs, visit localhost:19888 in your browser.
When you start a Hadoop job in the cluster, you’ll see a log message like this:
The url to track the job: http://ip-172-31-5-92.ec2.internal:20888/proxy/application_1517153180921_0008/
Copy the path suffix of this URL and append it to localhost:20888 to visit the in-progress job UI. For example, with this job, the location you’d visit in your browser would be:
localhost:20888/proxy/application_1517153180921_0008
MP 2 is due on Tuesday, February 6th, 2018 at 11:59PM.
You can find starter code and the Git repository where you should submit your code here:
If you have issues accessing your repository, make sure that you’ve signed into Gitlab. If you still do not have access, please make a private post on the course Piazza.
Please write your solutions as indicated in the following files:
twitter_at_mentions.py
twitter_active_users.py
library_checkout_count.py
report.txt
… and commit your code to Gitlab like this:
git add .
git commit -m "Completed MP" # Your commit message doesn't need to match this
git push
WARNING: Our autograder runs on Python3. If you write code for this assignment that only works on Python2, we will be unable to grade your code. Code that contains syntax errors (i.e. unable to be interpreted by the Python interpreter) will receive no credit.