Showing posts with label Hadoop. Show all posts

Sunday, June 9, 2013

Short tutorial on Dumbo with MapReduce


This short tutorial quickly guides you through the most important aspects of Dumbo.
  1. Prerequisites
  2. Simple example: Counting IPs
  3. Mapper and reducer classes
  4. Jobs and runners
  5. Programs and starters
  6. Input formats
  7. Eggs and jars
  8. Further reading

Prerequisites

In the remainder of this tutorial, it is assumed that you successfully completed all installation steps described in Building and installing for a recent Dumbo version. This tutorial also requires you to already be (a bit) familiar with MapReduce, since the goal is to explain Dumbo, not MapReduce.

Simple example: Counting IPs

Suppose that you want to generate a top five of the IPs that occur most frequently in a given Apache access log file access.log. In UNIX, this can be done as follows:
$ cut -d ' ' -f 1 access.log | sort | uniq -c | sort -nr | head -n 5
The following Dumbo program ipcount.py provides an alternative solution:
def mapper(key, value):
    yield value.split(" ")[0], 1

def reducer(key, values):
    yield key, sum(values)

if __name__ == "__main__":
    import dumbo
    dumbo.run(mapper, reducer, combiner=reducer)

When you run this program by executing the commands
$ dumbo start ipcount.py -input access.log -output ipcounts
$ dumbo cat ipcounts | sort -k2,2nr | head -n 5 
you actually do something very similar to the UNIX command given above. The mapper and reducer run as separate UNIX processes, and the output from the mapper process is piped through sort before it goes to the reducer process. However, by adding the -hadoop option, this Dumbo program can be run on a whole Hadoop cluster instead of a single UNIX machine, which allows you to generate the top 5 for gigabytes or even terabytes of weblogs. For instance, if Hadoop is installed in /usr/local/hadoop on the machine from which you run your jobs, and your weblogs are put in directories of the form weblogs/<year>/<month>/<day>/ on the HDFS, then you can generate the top 5 for December 2008 as follows:
$ dumbo start ipcount.py -hadoop /usr/local/hadoop -input weblogs/2008/12/* -output ipcounts
$ dumbo cat ipcounts/part* -hadoop /usr/local/hadoop | sort -k2,2nr | head -n 5 
When your Hadoop cluster is large enough, this will still work if your website gets millions of page views every day, whereas the corresponding UNIX command would probably not be able to get the job done when the log files are this big.
Note that the path given via the -hadoop option is where Dumbo will look for both the hadoop command (in the bin/ subdirectory) and the Hadoop Streaming jar. Since these might not always be in the same directory, you can also specify additional Hadoop jar search paths via the -hadooplib option. In case of CDH4, for instance, you’ll typically want to use the hadoop command from /usr/bin/ and the Hadoop Streaming jar from /usr/lib/hadoop-0.20-mapreduce/, which can easily be achieved by specifying -hadoop /usr and -hadooplib /usr/lib/hadoop-0.20-mapreduce.
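To make the local data flow concrete, here is a minimal sketch in plain Python 3 that imitates what a local run of ipcount.py does: map every line, sort the intermediate pairs, group them by key and reduce each group. The helper run_local and the sample log lines are made up for illustration. This is not how Dumbo itself is implemented, only an approximation of the behaviour described above.

```python
from itertools import groupby
from operator import itemgetter

def mapper(key, value):
    # Same logic as ipcount.py: the IP is the first space-separated field.
    yield value.split(" ")[0], 1

def reducer(key, values):
    yield key, sum(values)

def run_local(lines, mapper, reducer):
    """Hypothetical helper: map, sort, group and reduce in one process."""
    pairs = []
    offset = 0
    for line in lines:
        # For text input the key is the line offset and the value is the line.
        pairs.extend(mapper(offset, line))
        offset += len(line) + 1  # approximate offset, assuming one newline per line
    pairs.sort(key=itemgetter(0))  # stands in for the sort between the two processes
    results = []
    for key, group in groupby(pairs, key=itemgetter(0)):
        results.extend(reducer(key, (v for _, v in group)))
    return results

# Made-up sample data in Apache access log style.
sample_log = [
    '10.0.0.1 - - [01/Dec/2008:10:00:00 +0100] "GET / HTTP/1.1" 200 512',
    '10.0.0.2 - - [01/Dec/2008:10:00:01 +0100] "GET /a HTTP/1.1" 200 128',
    '10.0.0.1 - - [01/Dec/2008:10:00:02 +0100] "GET /b HTTP/1.1" 404 0',
]

counts = run_local(sample_log, mapper, reducer)
top = sorted(counts, key=itemgetter(1), reverse=True)[:5]
print(top)
```

The final sorted(...)[:5] plays the role of the sort -k2,2nr | head -n 5 part of the shell pipeline.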

Mapper and reducer classes

When you generate the list of the 5 most frequently occurring IPs, you might want to exclude certain IPs. Hence, it might be useful to extend the program above such that it reads a file excludes.txt consisting of IPs that need to be ignored:
$ head -n 3 excludes.txt
127.0.0.1
72.14.247.99
209.85.171.99
The following Dumbo program is such an extension of the previous program:
class Mapper:
    def __init__(self):
        file = open("excludes.txt", "r")
        self.excludes = set(line.strip() for line in file)
        file.close()
    def __call__(self, key, value):
        ip = value.split(" ")[0]
        if not ip in self.excludes:
            yield ip, 1

def reducer(key, values):
    yield key, sum(values)

if __name__ == "__main__":
    import dumbo
    dumbo.run(Mapper, reducer, combiner=reducer)
The main difference between this program and the previous one is that the mapper is a class in this case. Since an instance of this class is created just before the mapping starts, you can use the constructor for initialization work such as reading a file. When running this program, you need to make sure that the file excludes.txt is put in the working directory on each cluster node. This can be done with the -file option:
$ dumbo start ipcount.py -hadoop /usr/local/hadoop -input weblogs/2008/12/* \
-output ipcounts -file excludes.txt
When the excludes file is very big, it would probably be better to use the -cacheFile option instead, but for small enough files -file is more convenient. You can find more info about -cacheFile (and several other useful options) in Running programs.
As you probably expected already, reducers and combiners can be classes as well. There is no example that illustrates this in this tutorial though, since the mechanism is completely analogous (and people tend to dislike reading lengthy tutorials).
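The initialise-once, call-per-record pattern of a mapper class can be tried without a cluster. The sketch below (plain Python 3) writes a small excludes file into a temporary directory, builds the mapper once and then feeds it a few records by hand. The ExcludingMapper name, its path argument and the sample records are illustrative assumptions. Under Dumbo the constructor takes no arguments and the framework creates the instance for you.

```python
import os
import tempfile

class ExcludingMapper:
    def __init__(self, path="excludes.txt"):
        # Runs once, before any record is mapped.
        with open(path) as f:
            self.excludes = set(line.strip() for line in f)

    def __call__(self, key, value):
        # Runs once per input record.
        ip = value.split(" ")[0]
        if ip not in self.excludes:
            yield ip, 1

# Create a throwaway excludes file; the set is loaded before the directory is removed.
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "excludes.txt")
    with open(path, "w") as f:
        f.write("127.0.0.1\n72.14.247.99\n")
    mapper = ExcludingMapper(path)

# Made-up records; only the first field (the IP) matters here.
records = ["127.0.0.1 - - x", "10.0.0.7 - - y", "72.14.247.99 - - z"]
emitted = [pair for i, rec in enumerate(records) for pair in mapper(i, rec)]
print(emitted)
```

Because the excludes set is built in the constructor, the file is read once per mapper instance rather than once per log line.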

Jobs and runners

Now, suppose that you want to find out for which IPs the daily counts are most often higher than a given number. The following Dumbo program dailycount.py can be used to generate this information:
class DailyMapper:
    def __init__(self):
        file = open("excludes.txt","r")
        self.excludes = set(line.strip() for line in file)
        file.close()
    def __call__(self, key, value):
        parts = value.split(" ")
        ip, date = parts[0], parts[3][1:].split(":")[0]
        if not ip in self.excludes:
            yield (ip, date), 1

class FilterMapper:
    def __init__(self):
        self.mincount = int(self.params["mincount"])
    def __call__(self, key, value):
        ip, date = key
        if value >= self.mincount:
            yield ip, 1

def reducer(key, values):
    yield key, sum(values)

if __name__ == "__main__":
    import dumbo
    job = dumbo.Job()
    job.additer(DailyMapper, reducer, combiner=reducer)
    job.additer(FilterMapper, reducer, combiner=reducer)
    job.run()
Running this program can be done as follows:
$ dumbo start dailycount.py -hadoop /usr/local/hadoop -input weblogs/2008/12/* \
-output ipcounts -file excludes.txt -param mincount=100
This example program illustrates how parameters can be passed to Dumbo programs by means of the -param option, but even more interesting is that it consists of two MapReduce iterations. As shown by this program, a Job object can be used to register multiple iterations.
Note that you can also write
def runner(job):
    job.additer(DailyMapper, reducer, combiner=reducer)
    job.additer(FilterMapper, reducer, combiner=reducer)

if __name__ == "__main__":
    import dumbo
    dumbo.main(runner)
instead of
if __name__ == "__main__":
    import dumbo
    job = dumbo.Job()
    job.additer(DailyMapper, reducer, combiner=reducer)
    job.additer(FilterMapper, reducer, combiner=reducer)
    job.run()
On its own this might not make much of a difference, but using a runner has an important advantage: it allows you to use a starter as well.
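The way the two iterations of dailycount.py feed into each other can be traced locally. The following plain Python 3 sketch chains two map/sort/reduce passes, where the output pairs of the first pass are the input pairs of the second. The iterate helper, the closure that replaces FilterMapper and its -param value, and the sample log lines are assumptions made for this illustration. The excludes handling is left out to keep it short.

```python
from itertools import groupby
from operator import itemgetter

def daily_mapper(key, value):
    parts = value.split(" ")
    ip, date = parts[0], parts[3][1:].split(":")[0]
    yield (ip, date), 1

def make_filter_mapper(mincount):
    # Closure standing in for FilterMapper and its mincount parameter.
    def filter_mapper(key, value):
        ip, date = key
        if value >= mincount:
            yield ip, 1
    return filter_mapper

def reducer(key, values):
    yield key, sum(values)

def iterate(pairs, mapper, reducer):
    """Hypothetical helper: one map/sort/reduce pass over (key, value) pairs."""
    mapped = [out for k, v in pairs for out in mapper(k, v)]
    mapped.sort(key=itemgetter(0))
    return [out
            for k, group in groupby(mapped, key=itemgetter(0))
            for out in reducer(k, [v for _, v in group])]

# Made-up sample data; field 4 carries the bracketed timestamp.
log = [
    "10.0.0.1 - - [01/Dec/2008:10:00:00 +0100] GET /",
    "10.0.0.1 - - [01/Dec/2008:11:00:00 +0100] GET /",
    "10.0.0.1 - - [02/Dec/2008:09:00:00 +0100] GET /",
    "10.0.0.2 - - [01/Dec/2008:10:30:00 +0100] GET /",
]

daily = iterate(enumerate(log), daily_mapper, reducer)      # first iteration
days_over = iterate(daily, make_filter_mapper(2), reducer)  # second iteration
print(daily)
print(days_over)
```

The first pass produces one count per (ip, date) pair. The second pass keeps only the days that reach the threshold and counts them per IP.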

Programs and starters

Starters are similar to runners, but instead of running a job they start a program. Less abstractly, a starter can simplify the start commands quite a lot. For instance, by adding the code
def starter(program):
    year = program.delopt("year")
    if not year:
        # an alternative (and probably better) way to
        # bail out is to raise dumbo.Error(msg)
        return "'year' not specified"

    month = program.delopt("month")
    if not month: return "'month' not specified"
    if len(month) == 1:
        month = "0" + month

    mincount = program.delopt("mincount")
    if not mincount:
        mincount = "100"

    program.addopt("input", "weblogs/%s/%s/*" % (year, month))
    program.addopt("param", "mincount=" + mincount)
    program.addopt("file", "excludes.txt")

if __name__ == "__main__":
    import dumbo
    dumbo.main(runner, starter)
to the example above, it can be started as follows:
$ dumbo start dailycount.py -hadoop /usr/local/hadoop -year 2008 -month 12 -output ipcounts
It can then also still be started without executing the starter by adding the option -starter no:
$ dumbo start dailycount.py -hadoop /usr/local/hadoop -input weblogs/2008/12/* \
-output ipcounts -file excludes.txt -param mincount=100 -starter no
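To see what the starter does to the option list, it can be exercised against a stand-in object. In the plain Python 3 sketch below, FakeProgram is a hypothetical class written only for this illustration and is not Dumbo's own program class. It assumes that delopt removes an option and returns its value as a string (or None when the option is absent), which is how the starter code above treats it.

```python
class FakeProgram:
    """Hypothetical stand-in for the object passed to a starter (not Dumbo's class)."""

    def __init__(self, opts):
        self.opts = list(opts)  # list of (key, value) pairs

    def delopt(self, key):
        # Assumption: remove the option and return its value, or None if absent.
        values = [v for k, v in self.opts if k == key]
        self.opts = [(k, v) for k, v in self.opts if k != key]
        return values[0] if values else None

    def addopt(self, key, value):
        self.opts.append((key, value))

def starter(program):
    year = program.delopt("year")
    if not year:
        return "'year' not specified"
    month = program.delopt("month")
    if not month:
        return "'month' not specified"
    if len(month) == 1:
        month = "0" + month  # zero-pad single-digit months
    mincount = program.delopt("mincount") or "100"  # default threshold
    program.addopt("input", "weblogs/%s/%s/*" % (year, month))
    program.addopt("param", "mincount=" + mincount)
    program.addopt("file", "excludes.txt")

program = FakeProgram([("year", "2008"), ("month", "3"), ("output", "ipcounts")])
error = starter(program)
print(error, program.opts)

missing = starter(FakeProgram([("output", "ipcounts")]))
print(missing)
```

The short -year and -month options are expanded into the longer -input, -param and -file options that the program would otherwise need on the command line.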

Input formats

So far we have always taken text files as input, but Dumbo can deal with other file formats too. The -inputformat option allows you to specify the format of the input files. Possible values are:
  • text: text files
  • sequencefile: sequence files
  • auto: decide between text and sequencefile based on the file header (this is the default value)
  • <name_of_java_class>: use a custom InputFormat (this is rarely needed)
Since auto is the default value, it usually is not necessary to use the -inputformat option, but it is safer to do so anyway because auto could be misled when the first bytes of a text file happen to correspond to the sequence file header (which is very unlikely but possible nevertheless).
In case of text files, each input value is a string that corresponds to one line of the file and the keys are the offsets of the lines in the files (as Python integers). For sequence files, however, the type of the keys and values can differ from file to file. Most common writables are converted to suitable Python types, and the remaining writables are converted to a string by means of their toString() method. Hadoop records are converted to lists consisting of the values of their attributes.
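As a rough illustration of the key/value shape for text input, the plain Python 3 sketch below yields (offset, line) pairs from an in-memory byte stream. The text_records function and the sample data are made up for this example. It assumes byte offsets and strips the trailing newline from each value, and it does not involve Dumbo or Hadoop at all.

```python
import io

def text_records(stream):
    """Yield (byte offset, line) pairs, the shape described for text input."""
    offset = 0
    for raw in stream:
        yield offset, raw.rstrip(b"\n").decode("utf-8")
        offset += len(raw)  # the next line starts right after this one

data = io.BytesIO(b"first line\nsecond\nthird line\n")
records = list(text_records(data))
print(records)
```

A mapper that only looks at the value, like the ones in this tutorial, can simply ignore the offset key.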
Another possible value for the -inputformat option is code. This value was added for dealing with the files written by Dumbo, but since the output files on Hadoop really are sequence files containing Typed bytes objects, sequencefile and hence also auto work for these files as well. For local runs, however, it is necessary to use the option -inputformat code when you want to take output files as input, since Dumbo programs that run locally cannot deal with sequence files (and therefore their output is stored in special text files instead of sequence files). To print a file from HDFS as a “code” file for local processing, you can use dumbo cat with the option -ascode yes.

Eggs and jars

Python modules contained in eggs can be used in Dumbo programs by adding -libegg <path_to_egg> options. Similarly, jars can be used by adding -libjar <path_to_jar> options. A common use case for this is when the input consists of sequence files that contain custom Hadoop records. In order to be able to read such input, the classes for the records need to be put on the classpath, and adding a -libjar for the jar(s) that contain(s) these classes is a possible way of doing this.

Further reading

Introduction to MapReduce with Hadoop on Linux

When your data and work grow, and you still want to produce results in a timely manner, you start to think big. Your one beefy server reaches its limits. You need a way to spread your work across many computers. You truly need to scale out.
In pioneer days they used oxen for heavy pulling, and when one ox couldn't budge a log, they didn't try to grow a larger ox. We shouldn't be trying for bigger computers, but for more systems of computers.—Grace Hopper
Clearly, cluster computing is old news. What's changed? Today:
  • We collect more data than ever before.
  • Even small-to-medium-size businesses can benefit from tools like Hadoop and MapReduce.
  • You don't have to have a PhD to create and use your own cluster.
  • Many decent free/libre open-source tools can help you easily cluster commodity hardware.
Let me start with some simple examples that will run on one machine and scale to meet larger demands. You can try them on your laptop and then transition to a larger cluster—like one you've built with commodity Linux machines, your company or university's Hadoop cluster or Amazon Elastic MapReduce.

Parallel Problems

Let's start with problems that can be divided into smaller independent units of work. These problems are roughly classified as "embarrassingly parallel" and are—as the term suggests—suitable for parallel processing. Examples:
  • Classify e-mail messages as spam.
  • Transcode video.
  • Render an Earth's worth of map tile images.
  • Count logged lines matching a pattern.
  • Figure out errors per day of week for a particular application.
Now the hard work begins. Parallel computing is complex. Race conditions, partial failure and synchronization impede our progress. Here's where MapReduce saves our proverbial bacon.

MapReduce by Example

MapReduce is a coding pattern that abstracts much of the tricky bits of scalable computations. We're free to focus on the problem at hand, but it takes practice. So let's practice!
Say you have 100 10GB log files from some custom application, roughly a terabyte of data. You do a quick test and estimate it will take your desktop days to grep every line (assuming you even could fit the data on your desktop). And, that's before you add in logic to group by host and calculate totals. Your tried-and-true shell utilities won't help, but MapReduce can handle this without breaking a sweat.
First let's look at the raw data. Log lines from the custom application look like this:

localhost: restarting
dsl5.example.com: invalid user 'bart'
dsl5.example.com: invalid user 'charlie'
dsl5.example.com: invalid user 'david'
dsl8.example.net: invalid password for user 'admin'
dsl8.example.net: user 'admin' logged in
The log format is hostname, colon, message. Your boss suspects someone evil is trying to brute-force attack the application. The same host trying many different user names may indicate an attack. He wants totals of "invalid user" messages grouped by hostname. Filtering the above log lines should yield:

dsl5.example.com        3
With gigabytes of log files, your trusty shell tools do just fine. For a terabyte, more power is needed. This is a job for Hadoop and MapReduce.
Before getting to Hadoop, let's summon some Python and test locally on a small dataset. I'm assuming you have a recent Python installed. I tested with Python 2.7.3 on Ubuntu 12.10.
The first program to write consumes log lines from our custom application. Let's call it map.py:

#!/usr/bin/python
import sys
for line in sys.stdin:
  if 'invalid user' in line:
    host = line.split(':')[0]
    print '%s\t%s' % (host, 1)
map.py prints the hostname, a tab character and the number 1 any time it sees a line containing the string "invalid user". Write the example log lines to log.txt, then test map.py:

chmod 755 map.py
./map.py < log.txt
The output is:

dsl5.example.com        1
dsl5.example.com        1
dsl5.example.com        1
Output of map.py will be piped into our next program, reduce.py:

#!/usr/bin/python
import sys
last_host = None
last_count = 0
host = None
for line in sys.stdin:
  host, count = line.split('\t')
  count = int(count)
  if last_host == host:
    last_count += count
  else:
    if last_host:
      print '%s\t%s' % (last_host, last_count)
    last_host = host
    last_count = count
if last_host:
  print '%s\t%s' % (last_host, last_count)
reduce.py totals up consecutive lines of a particular host. Let's assume lines are grouped by hostname. If we see the same hostname, we increment a total. If we encounter a different hostname, we print the total so far and reset the total and hostname. When we exhaust standard input, we print the total if necessary. This assumes lines with the same hostname always appear consecutively. They will, and I'll address why later. Test by piping it together with map.py like so:

chmod 755 reduce.py
./map.py < log.txt | sort | ./reduce.py
Later, I'll explain why I added sort to the pipeline. This prints:

dsl5.example.com        3
Exactly what we want. A successful test! Our test log lines contain three "invalid user" messages for the host dsl5.example.com. Later we'll get this local test running on a Hadoop cluster.
Let's dive a little deeper. What exactly does map.py do? It transforms unstructured log data into tab-separated key-value pairs. It emits a hostname for a key, a tab and the number 1 for a value (again, only for lines with "invalid user" messages). Note that any number of log lines could be fed to any number of instances of the map.py program—each line can be examined independently. Similarly, each output line of map.py can be examined independently.
Output from map.py becomes input for reduce.py. The output of reduce.py (hostname, tab, number) looks very similar to its input. This is by design. Key-value pairs may be reduced multiple times, so reduce.py must handle this gracefully. If we were to re-reduce our final answer, we would get the exact same result. This repeatable, predictable behavior of reduce.py is known as idempotence.
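The re-reduce property can be checked on a handful of pairs. The sketch below is written in Python 3 (unlike the Python 2 scripts above) and works on in-memory tuples instead of tab-separated lines. The reduce_pairs function and the sample pairs are made up for this illustration.

```python
from itertools import groupby
from operator import itemgetter

def reduce_pairs(pairs):
    """Sum counts per host; same idea as reduce.py, on in-memory pairs."""
    ordered = sorted(pairs, key=itemgetter(0))
    return [(host, sum(c for _, c in group))
            for host, group in groupby(ordered, key=itemgetter(0))]

# Made-up mapper output for two hosts.
mapped = [("dsl5.example.com", 1), ("dsl8.example.net", 1),
          ("dsl5.example.com", 1), ("dsl5.example.com", 1)]

once = reduce_pairs(mapped)
twice = reduce_pairs(once)  # re-reducing the final answer changes nothing

# Reducing two halves separately, then reducing the partial totals,
# gives the same answer as reducing everything in one go.
partial = reduce_pairs(mapped[:2]) + reduce_pairs(mapped[2:])
combined = reduce_pairs(partial)
print(once, twice, combined)
```

This works because the reducer's output has the same shape as its input and because addition does not care how the counts are grouped.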
We just tested with one instance of reduce.py, but you could imagine many instances of reduce.py handling many lines of output from map.py. Note that this works only if lines with the same hostname appear consecutively. In our test, we enforce this constraint by adding sort to the pipeline. This simulates how our code behaves within Hadoop MapReduce. Hadoop will group and sort input to reduce.py similarly.
We don't have to bother with how execution will proceed and how many instances of map.py and reduce.py will run. We just follow the MapReduce pattern and Hadoop does the rest.
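For the curious, here is a rough Python 3 sketch of the idea behind running several reducers at once: every pair is routed to a bucket chosen from its key, so all pairs for one hostname end up at the same reducer. The partition and reduce_bucket functions, the CRC-based bucket choice and the sample pairs are assumptions made for this illustration. Hadoop has its own partitioning and sorting machinery.

```python
import zlib
from itertools import groupby
from operator import itemgetter

def partition(pairs, num_reducers):
    """Send every pair to a bucket chosen from its key, so equal keys stay together."""
    buckets = [[] for _ in range(num_reducers)]
    for host, count in pairs:
        index = zlib.crc32(host.encode("utf-8")) % num_reducers
        buckets[index].append((host, count))
    return buckets

def reduce_bucket(bucket):
    # Each reducer sorts its own share, then totals consecutive equal hosts.
    bucket = sorted(bucket, key=itemgetter(0))
    return [(host, sum(c for _, c in group))
            for host, group in groupby(bucket, key=itemgetter(0))]

# Made-up mapper output for two hosts.
mapped = [("dsl5.example.com", 1), ("dsl9.example.org", 1),
          ("dsl5.example.com", 1), ("dsl9.example.org", 1),
          ("dsl5.example.com", 1)]

totals = {}
for bucket in partition(mapped, num_reducers=3):
    for host, total in reduce_bucket(bucket):
        assert host not in totals  # a host never shows up in two buckets
        totals[host] = total
print(totals)
```

Since a hostname always maps to the same bucket, each reducer can total its hosts without knowing anything about the other reducers.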