This short tutorial quickly guides you through the most important aspects of Dumbo.
- Prerequisites
- Simple example: Counting IPs
- Mapper and reducer classes
- Jobs and runners
- Programs and starters
- Input formats
- Eggs and jars
- Further reading
Prerequisites
In the remainder of this tutorial, it is assumed that you successfully completed all installation steps described in
Building and installing for a recent Dumbo version. This tutorial also requires you to already be (a bit) familiar with
MapReduce, since the goal is to explain Dumbo, not MapReduce.
Simple example: Counting IPs
Suppose that you want to generate a top five of the IPs that occur most frequently in a given
Apache access log file access.log. In
UNIX, this can be done as follows:
$ cut -d ' ' -f 1 access.log | sort | uniq -c | sort -nr | head -n 5
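Each line of an Apache access log starts with the client IP, followed by the remaining fields separated by spaces, which is why cut -d ' ' -f 1 extracts the IPs. A made-up example line, purely for illustration:
66.249.65.107 - - [10/Dec/2008:13:55:36 -0800] "GET /index.html HTTP/1.1" 200 2326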
The following Dumbo program
ipcount.py provides an alternative solution:
def mapper(key, value):
    yield value.split(" ")[0], 1

def reducer(key, values):
    yield key, sum(values)

if __name__ == "__main__":
    import dumbo
    dumbo.run(mapper, reducer, combiner=reducer)
When you run this program by executing the commands
$ dumbo start ipcount.py -input access.log -output ipcounts
$ dumbo cat ipcounts | sort -k2,2nr | head -n 5
you actually do something very similar to the
UNIX command given above. The mapper and reducer run as separate
UNIX processes, and the output from the mapper process is piped through
sort before it goes to the reducer process. However, by adding the
-hadoop option, this Dumbo program can be run on a whole
Hadoop cluster instead of a single
UNIX
machine, which allows you to generate the top 5 for gigabytes or even
terabytes of weblogs. For instance, if Hadoop is installed in
/usr/local/hadoop on the machine from which you run your jobs, and your weblogs are put in directories of the form
weblogs/<year>/<month>/<day>/ on the
HDFS, then you can generate the top 5 for December 2008 as follows:
$ dumbo start ipcount.py -hadoop /usr/local/hadoop -input weblogs/2008/12/* -output ipcounts
$ dumbo cat ipcounts/part* -hadoop /usr/local/hadoop | sort -k2,2nr | head -n 5
When your Hadoop cluster is large enough, this will still work if
your website gets millions of page views every day, whereas the
corresponding
UNIX command would probably not be able to get the job done when the log files are this big.
Note that the path given via the
-hadoop option is where Dumbo will look for both the
hadoop command (in the
bin/
subdirectory) and the Hadoop Streaming jar. Since these might not
always be in the same directory, you can also specify additional Hadoop
jar search paths via the
-hadooplib option. In case of
CDH4, for instance, you’ll typically want to use the
hadoop command from
/usr/bin/ and the Hadoop Streaming jar from
/usr/lib/hadoop-0.20-mapreduce/, which can easily be achieved by specifying
-hadoop /usr and
-hadooplib /usr/lib/hadoop-0.20-mapreduce.
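Combined with the earlier example, such a CDH4 run would look roughly like this:
$ dumbo start ipcount.py -hadoop /usr -hadooplib /usr/lib/hadoop-0.20-mapreduce \
  -input weblogs/2008/12/* -output ipcounts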
Mapper and reducer classes
When you generate the list of the 5 most frequently occurring IPs,
you might want to exclude certain IPs. Hence, it might be useful to
extend the program above such that it reads a file
excludes.txt consisting of IPs that need to be ignored:
$ head -n 3 excludes.txt
127.0.0.1
72.14.247.99
209.85.171.99
The following Dumbo program is such an extension of the previous program:
class Mapper:
    def __init__(self):
        file = open("excludes.txt", "r")
        self.excludes = set(line.strip() for line in file)
        file.close()
    def __call__(self, key, value):
        ip = value.split(" ")[0]
        if not ip in self.excludes:
            yield ip, 1

def reducer(key, values):
    yield key, sum(values)

if __name__ == "__main__":
    import dumbo
    dumbo.run(Mapper, reducer, combiner=reducer)
The main difference between this program and the previous one is that
the mapper is a class in this case. Since an instance of this class is
created just before the mapping is started, you can use the constructor
for initializations such as reading a file. When running this
program, you need to make sure that the file
excludes.txt is put in the working directory on each cluster node. This can be done with the
-file option:
$ dumbo start ipcount.py -hadoop /usr/local/hadoop -input weblogs/2008/12/* \
-output ipcounts -file excludes.txt
When the excludes file is very big, it would probably be better to use the
-cacheFile option instead, but for small enough files
-file is more convenient. You can find more info about
-cacheFile (and several other useful options) in
Running programs.
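A sketch of what that might look like, assuming the excludes file lives on HDFS and that -cacheFile takes the usual Hadoop Streaming <HDFS path>#<link name> form (see Running programs for the exact syntax):
$ dumbo start ipcount.py -hadoop /usr/local/hadoop -input weblogs/2008/12/* \
  -output ipcounts -cacheFile shared/excludes.txt#excludes.txt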
As you probably expected already, reducers and combiners can be
classes as well. There is no example that illustrates this in this
tutorial though, since the mechanism is completely analogous (and people
tend to dislike reading lengthy tutorials).
Jobs and runners
Now, suppose that you want to find out for which IPs the daily counts
are most often higher than a given number. The following Dumbo program
dailycount.py can be used to generate this information:
class DailyMapper:
    def __init__(self):
        file = open("excludes.txt", "r")
        self.excludes = set(line.strip() for line in file)
        file.close()
    def __call__(self, key, value):
        parts = value.split(" ")
        ip, date = parts[0], parts[3][1:].split(":")[0]
        if not ip in self.excludes:
            yield (ip, date), 1

class FilterMapper:
    def __init__(self):
        self.mincount = int(self.params["mincount"])
    def __call__(self, key, value):
        ip, date = key
        if value >= self.mincount:
            yield ip, 1

def reducer(key, values):
    yield key, sum(values)

if __name__ == "__main__":
    import dumbo
    job = dumbo.Job()
    job.additer(DailyMapper, reducer, combiner=reducer)
    job.additer(FilterMapper, reducer, combiner=reducer)
    job.run()
Running this program can be done as follows:
$ dumbo start dailycount.py -hadoop /usr/local/hadoop -input weblogs/2008/12/* \
-output ipcounts -file excludes.txt -param mincount=100
This example program illustrates how parameters can be passed to Dumbo programs by means of the
-param option, but even more interesting is that it consists of two MapReduce iterations. As shown by this program, a
Job object can be used to register multiple iterations.
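To make the two iterations concrete, here is a sketch of the record flow with entirely made-up counts: the first iteration produces one count per (IP, date) pair, and the second one counts, per IP, how many of those daily counts reach mincount.
# after the first iteration (hypothetical counts):
#   ("66.249.65.107", "10/Dec/2008") -> 513
#   ("66.249.65.107", "11/Dec/2008") -> 87
#   ("72.30.142.85",  "10/Dec/2008") -> 241
# after the second iteration, with mincount=100:
#   "66.249.65.107" -> 1
#   "72.30.142.85"  -> 1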
Note that you can also write
def runner(job):
    job.additer(DailyMapper, reducer, combiner=reducer)
    job.additer(FilterMapper, reducer, combiner=reducer)

if __name__ == "__main__":
    import dumbo
    dumbo.main(runner)
instead of
if __name__ == "__main__":
    import dumbo
    job = dumbo.Job()
    job.additer(DailyMapper, reducer, combiner=reducer)
    job.additer(FilterMapper, reducer, combiner=reducer)
    job.run()
On its own this might not make much of a difference, but using a runner has an
important advantage: it also allows you to use a starter.
Programs and starters
Starters are similar to runners, but instead of running a job they
start a program. Less abstractly, a starter can simplify the start
commands quite a lot. For instance, by adding the code
def starter(program):
    year = program.delopt("year")
    if not year:
        # an alternative (and probably better) way to
        # bail out is to raise dumbo.Error(msg)
        return "'year' not specified"
    month = program.delopt("month")
    if not month: return "'month' not specified"
    if len(month) == 1:
        month = "0" + month
    mincount = program.delopt("mincount")
    if not mincount:
        mincount = "100"
    program.addopt("input", "weblogs/%s/%s/*" % (year, month))
    program.addopt("param", "mincount=" + mincount)
    program.addopt("file", "excludes.txt")

if __name__ == "__main__":
    import dumbo
    dumbo.main(runner, starter)
to the example above, it can be started as follows:
$ dumbo start dailycount.py -hadoop /usr/local/hadoop -year 2008 -month 12 -output ipcounts
The program can then still be started without executing the starter by adding the option
-starter no:
$ dumbo start dailycount.py -hadoop /usr/local/hadoop -input weblogs/2008/12/* \
-output ipcounts -file excludes.txt -param mincount=100 -starter no
Input formats
So far we have always taken text files as input, but Dumbo can deal with other file formats too. The
-inputformat option allows you to specify the format of the input files. Possible values are:
- text: text files
- sequencefile: sequence files
- auto: decide between text and sequencefile based on the file header (this is the default value)
- <name_of_java_class>: use a custom InputFormat (this is rarely needed)
Since
auto is the default value, it usually is not necessary to use the
-inputformat option, but it is safer to do so anyway because
auto
could be misled when the first bytes of a text file happen to
correspond to the sequence file header (which is very unlikely but
possible nevertheless).
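For instance, explicitly telling Dumbo that the web logs are plain text files would look like this:
$ dumbo start ipcount.py -hadoop /usr/local/hadoop -input weblogs/2008/12/* \
  -output ipcounts -inputformat text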
In case of text files, each input value is a string that corresponds
to one line of the file and the keys are the offsets of the lines in the
files (as Python integers). For sequence files, however, the type of
the keys and values can differ from file to file. Most common
writables are converted to suitable Python types, and the remaining writables are converted to a string by means of their
toString() method.
Hadoop records are converted to lists consisting of the values of their attributes.
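As a small illustration (not from the original example), a mapper for text input could use both the offset key and the line value:
def mapper(key, value):
    # key: byte offset of the line within the input file (a Python int)
    # value: the line itself (a string)
    if key == 0:
        return  # e.g. skip a header line at the start of each file
    yield value.split(" ")[0], 1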
Another possible value for the
-inputformat option is
code.
This value was added for dealing with the files that Dumbo outputs, but
since the output files on Hadoop really are sequence files containing
Typed bytes objects,
sequencefile and hence also
auto work for these files as well. For local runs, however, it is necessary to use the option
-inputformat code
when you want to take output files as input, since Dumbo programs that
run locally cannot deal with sequence files (and therefore their output
is stored in special text files instead of sequence files). To print a
file from
HDFS as a “code” file for local processing, you can use
dumbo cat with the option
-ascode yes.
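Putting this together, preparing previously generated Hadoop output for a local run might look roughly like this (postprocess.py being some hypothetical follow-up program):
$ dumbo cat ipcounts/part* -hadoop /usr/local/hadoop -ascode yes > ipcounts.code
$ dumbo start postprocess.py -input ipcounts.code -inputformat code -output results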
Eggs and jars
Python modules contained in
eggs can be used in Dumbo programs by adding
-libegg <path_to_egg> options. Similarly, jars can be used by adding
-libjar <path_to_jar>
options. A common use case for this is when the input consists of
sequence files that contain custom Hadoop records. In order to be able
to read such input, the classes for the records need to be put on the
classpath, and adding a
-libjar for the jar(s) that contain(s) these classes is a possible way of doing this.
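For example, a program that needs a Python module packaged as an egg as well as custom record classes from a jar could be started roughly as follows (the egg and jar paths are hypothetical):
$ dumbo start myprogram.py -hadoop /usr/local/hadoop -input records.seq -output counts \
  -libegg lib/mymodule-1.0.egg -libjar lib/customrecords.jar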
Further reading