Sunday, June 9, 2013

Short tutorial on Dumbo with MapReduce


This short tutorial quickly guides you through the most important aspects of Dumbo.
  1. Prerequisites
  2. Simple example: Counting IPs
  3. Mapper and reducer classes
  4. Jobs and runners
  5. Programs and starters
  6. Input formats
  7. Eggs and jars
  8. Further reading

Prerequisites

In the remainder of this tutorial, it is assumed that you successfully completed all installation steps described in Building and installing for a recent Dumbo version. This tutorial also requires you to already be (a bit) familiar with MapReduce, since the goal is to explain Dumbo, not MapReduce.

Simple example: Counting IPs

Suppose that you want to generate a top five of the IPs that occur most frequently in a given Apache access log file access.log. In UNIX, this can be done as follows:
$ cut -d ' ' -f 1 access.log | sort | uniq -c | sort -nr | head -n 5
The following Dumbo program ipcount.py provides an alternatives solution:
def mapper(key, value):
    yield value.split(" ")[0], 1

def reducer(key, values):
    yield key, sum(values)

if __name__ == "__main__":
    import dumbo
    dumbo.run(mapper, reducer, combiner=reducer)

p. When you run this program by executing the commands
$ dumbo start ipcount.py -input access.log -output ipcounts
$ dumbo cat ipcounts | sort -k2,2nr | head -n 5 
you actually do something very similar to the UNIX command given above. The mapper and reducer run as separate UNIX processes, and the output from the mapper process is piped through sort before it goes to the reducer process. However, by adding the -hadoop option, this Dumbo program can be run on a whole Hadoop cluster instead of a single UNIX machine, which allows you to generate the top 5 for gigabytes or even terabytes of weblogs. For instance, if Hadoop is installed in /usr/local/hadoop on the machine from which you run your jobs, and your weblogs are put in directories of the form weblogs/<year>/<month>/<day>/ on the HDFS, then you can generate the top 5 for December 2008 as follows:
$ dumbo start ipcount.py -hadoop /usr/local/hadoop -input weblogs/2008/12/* -output ipcounts
$ dumbo cat ipcounts/part* -hadoop /usr/local/hadoop | sort -k2,2nr | head -n 5 
When your Hadoop cluster is large enough, this will still work if your website gets millions of page views every day, whereas the corresponding UNIX command would probably not be able to get the job done when the log files are this big.
Note that the path given via the -hadoop option is where Dumbo will look for both the hadoop command (in the bin/ subdirectory) and the Hadoop Streaming jar. Since these might not always be in the same directory, you can also specify additional Hadoop jar search paths via the -hadooplib option. In case of CDH4, for instance, you’ll typically want to use the hadoop command from /usr/bin/ and the Hadoop Streaming jar from /usr/lib/hadoop-0.20-mapreduce/, which can easily be achieved by specifying -hadoop /usr and -hadooplib /usr/lib/hadoop-0.20-mapreduce.

Mapper and reducer classes

When you generate the list of the 5 most frequently occurring IPs, you might want to exclude certain IPs. Hence, it might be useful to extend the program above such that it reads a file excludes.txt consisting of IPs that need to be ignored:
$ head -n 3 excludes.txt
127.0.0.1
72.14.247.99
209.85.171.99
The following Dumbo program is such an extension of the previous program:
class Mapper:
    def __init__(self):
        file = open("excludes.txt", "r")
        self.excludes = set(line.strip() for line in file)
        file.close()
    def __call__(self, key, value):
        ip = value.split(" ")[0]
        if not ip in self.excludes:
            yield ip, 1

def reducer(key, values):
    yield key, sum(values)

if __name__ == "__main__":
    import dumbo
    dumbo.run(Mapper, reducer, combiner=reducer)
The main difference between this program and the previous one is that the mapper is a class in this case. Since an instance of this class is created just before the mapping is started, you can use the constructor for initializations like, e.g., reading a file. When running this program, you need to make sure that the file excludes.txt is put in the working directory on each cluster node. This can be done with the -file option:
$ dumbo start ipcount.py -hadoop /usr/local/hadoop -input weblogs/2008/12/* \
-output ipcounts -file excludes.txt
When the excludes file is very big, it would probably be better to use the -cacheFile option instead, but for small enough files -file is more convenient. You can find more info about -cacheFile (and several other useful options) in Running programs.
As you probably expected already, reducers and combiners can be classes as well. There is no example that illustrates this in this tutorial though, since the mechanism is completely analogous (and people tend to dislike reading lengthy tutorials).

Jobs and runners

Now, suppose that you want to find out for which IPs the daily counts are most often higher than a given number. The following Dumbo program dailycount.py can be used to generate this information:
class DailyMapper:
    def __init__(self):
        file = open("excludes.txt","r")
        self.excludes = set(line.strip() for line in file)
        file.close()
    def __call__(self, key, value):
        parts = value.split(" ")
        ip, date = parts[0], parts[3][1:].split(":")[0]
        if not ip in self.excludes:
            yield (ip, date), 1

class FilterMapper:
    def __init__(self):
        import os
        self.mincount = int(self.params["mincount"])
    def __call__(self, key, value):
        ip, date = key
        if value >= self.mincount:
            yield ip, 1

def reducer(key, values):
    yield key, sum(values)

if __name__ == "__main__":
    import dumbo
    job = dumbo.Job()
    job.additer(DailyMapper, reducer, combiner=reducer)
    job.additer(FilterMapper, reducer, combiner=reducer)
    job.run()
Running this program can be done as follows:
$ dumbo start dailycount.py -hadoop /usr/local/hadoop -input weblogs/2008/12/* \
-output ipcounts -file excludes.txt -param mincount=100
This example program illustrates how parameters can be be passed to Dumbo programs by means of the -param option, but even more interesting is that it consists of two MapReduce iterations. As shown by this program, a Job object can be used to register multiple iterations.
Note that you can also write
def runner(job):
    job.additer(DailyMapper, reducer, combiner=reducer)
    job.additer(FilterMapper, reducer, combiner=reducer)

if __name__ == "__main__":
    import dumbo
    dumbo.main(runner)
instead of
if __name__ == "__main__":
    import dumbo
    job = dumbo.Job()
    job.additer(DailyMapper, reducer, combiner=reducer)
    job.additer(FilterMapper, reducer, combiner=reducer)
    job.run()
On its own this might not make much of a difference, but using a runner does have an important advantage, namely, that you can also use a starter then.

Programs and starters

Starters are similar to runners, but instead of running a job they start a program. Less abstractly, a starter can simplify the start commands quite a lot. For instance, by adding the code
def starter(program):
    year = program.delopt("year")
    if not year:
        # an alternative (and probably better) way to
        # bail out is to raise dumbo.Error(msg)
        return "'year' not specified"

    month = program.delopt("month")
    if not month: return "'month' not specified"
    if len(month) == 1:
        month = "0" + month

    mincount = program.delopt("mincount")
    if not mincount:
        mincount = "100"

    program.addopt("input", "weblogs/%s/%s/*" % (year, month))
    program.addopt("param", "mincount=" + mincount)
    program.addopt("file", "excludes.txt")

if __name__ == "__main__":
    import dumbo
    dumbo.main(runner, starter)
to the example above, it can be started as follows:
$ dumbo start dailycount.py -hadoop /usr/local/hadoop -year 2008 -month 12 -output ipcounts
It can then also still be started without executing the starter by adding the option -starter no:
$ dumbo start dailycount.py -hadoop /usr/local/hadoop -input weblogs/2008/12/* \
-output ipcounts -file excludes.txt -param mincount=100 -starter no

Input formats

So far we have always taken text files as input, but Dumbo can deal with other file formats too. The -inputformat option allows you to specify the format of the input files. Possible values are:
  • text: text files
  • sequencefile: sequence files
  • auto: decide between text and sequencefile based on the file header (this is the default value)
  • <name_of_java_class>: use a custom InputFormat (this is rarely needed)
Since auto is the default value, it usually is not necessary to use the -inputformat option, but it is more safe to do so anyway because auto could be mislead when the first bytes of a text file happen to correspond to the sequence file header (which is very unlikely but possible nevertheless).
In case of text files, each input value is a string that corresponds to one line of the file and the keys are the offsets of the lines in the files (as Python integers). For sequence files, however, the type of the keys and values can differ from file to file. Most common writables are converted to suitable Python types, and the remaining writables are converted to a string by means of their toString() method. Hadoop records are converted to lists consisting of the values of their attributes.
Another possible value for the -inputformat option is code. This value was added for dealing with the files outputted by Dumbo, but since the output files on Hadoop really are sequence files containing Typed bytes objects, sequencefile and hence also auto work for these files as well. For local runs, however, it is necessary to use the option -inputformat code when you want to take output files as input, since Dumbo programs that run locally cannot deal with sequence files (and therefore their output is stored in special text files instead of sequence files). To print a file from HDFS as a “code” file for local processing, you can use dumbo cat with the option -ascode yes.

Eggs and jars

Python modules contained in eggs can be used in Dumbo programs by adding -libegg <path_to_egg> options. Similarly, jars can used by adding -libjar <path_to_jar> options. A common use case for this is when the input consists of sequence files that contain custom Hadoop records. In order to be able to read such input, the classes for the records need to be put on the classpath, and adding a -libjar for the jar(s) that contain(s) these classes is a possible way of doing this.

Further reading

No comments:

Post a Comment