This short tutorial quickly guides you through the most important aspects of Dumbo.
- Prerequisites
- Simple example: Counting IPs
- Mapper and reducer classes
- Jobs and runners
- Programs and starters
- Input formats
- Eggs and jars
- Further reading
Prerequisites
In the remainder of this tutorial, it is assumed that you successfully completed all installation steps described in Building and installing for a recent Dumbo version. This tutorial also requires you to already be (a bit) familiar with MapReduce, since the goal is to explain Dumbo, not MapReduce.

Simple example: Counting IPs
Suppose that you want to generate a top five of the IPs that occur most frequently in a given Apache access log file access.log. In UNIX, this can be done as follows:

$ cut -d ' ' -f 1 access.log | sort | uniq -c | sort -nr | head -n 5
The following Dumbo program ipcount.py provides an alternative solution:

def mapper(key, value):
    yield value.split(" ")[0], 1

def reducer(key, values):
    yield key, sum(values)

if __name__ == "__main__":
    import dumbo
    dumbo.run(mapper, reducer, combiner=reducer)
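Since mapper and reducer are just ordinary Python generator functions, you can sanity-check them in an interactive session before submitting anything to a cluster. A minimal sketch (the log line below is made up for illustration):

>>> from ipcount import mapper, reducer
>>> line = '127.0.0.1 - - [10/Dec/2008:13:55:36 +0100] "GET / HTTP/1.1" 200 2326'
>>> list(mapper(0, line))
[('127.0.0.1', 1)]
>>> list(reducer('127.0.0.1', iter([1, 1, 1])))
[('127.0.0.1', 3)]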
When you run this program by executing the commands
$ dumbo start ipcount.py -input access.log -output ipcounts
$ dumbo cat ipcounts | sort -k2,2nr | head -n 5
you actually do something very similar to the UNIX command given above. The mapper and reducer run as separate UNIX processes, and the output from the mapper process is piped through sort before it goes to the reducer process. However, by adding the -hadoop option, this Dumbo program can be run on a whole Hadoop cluster instead of a single UNIX
machine, which allows you to generate the top 5 for gigabytes or even
terabytes of weblogs. For instance, if Hadoop is installed in /usr/local/hadoop on the machine from which you run your jobs, and your weblogs are put in directories of the form weblogs/<year>/<month>/<day>/ on the HDFS, then you can generate the top 5 for December 2008 as follows:

$ dumbo start ipcount.py -hadoop /usr/local/hadoop -input weblogs/2008/12/* -output ipcounts
$ dumbo cat ipcounts/part* -hadoop /usr/local/hadoop | sort -k2,2nr | head -n 5
When your Hadoop cluster is large enough, this will still work if
your website gets millions of page views every day, whereas the
corresponding UNIX command would probably not be able to get the job done when the log files are this big.

Note that the path given via the -hadoop option is where Dumbo will look for both the hadoop command (in the bin/ subdirectory) and the Hadoop Streaming jar. Since these might not always be in the same directory, you can also specify additional Hadoop jar search paths via the -hadooplib option. In case of CDH4, for instance, you’ll typically want to use the hadoop command from /usr/bin/ and the Hadoop Streaming jar from /usr/lib/hadoop-0.20-mapreduce/, which can easily be achieved by specifying -hadoop /usr and -hadooplib /usr/lib/hadoop-0.20-mapreduce.
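Putting that together, the IP counting job from above could be started on a CDH4 cluster roughly as follows (a sketch based on the paths just mentioned; adjust them to your installation):

$ dumbo start ipcount.py -hadoop /usr -hadooplib /usr/lib/hadoop-0.20-mapreduce \
  -input weblogs/2008/12/* -output ipcounts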
Mapper and reducer classes
When you generate the list of the 5 most frequently occurring IPs, you might want to exclude certain IPs. Hence, it might be useful to extend the program above such that it reads a file excludes.txt consisting of IPs that need to be ignored:

$ head -n 3 excludes.txt
127.0.0.1
72.14.247.99
209.85.171.99
The following Dumbo program is such an extension of the previous program:

class Mapper:
    def __init__(self):
        file = open("excludes.txt", "r")
        self.excludes = set(line.strip() for line in file)
        file.close()
    def __call__(self, key, value):
        ip = value.split(" ")[0]
        if ip not in self.excludes:
            yield ip, 1

def reducer(key, values):
    yield key, sum(values)

if __name__ == "__main__":
    import dumbo
    dumbo.run(Mapper, reducer, combiner=reducer)
The main difference between this program and the previous one is that the mapper is a class in this case. Since an instance of this class is created just before the mapping is started, you can use the constructor for initializations such as reading a file. When running this program, you need to make sure that the file excludes.txt is put in the working directory on each cluster node. This can be done with the -file option:

$ dumbo start ipcount.py -hadoop /usr/local/hadoop -input weblogs/2008/12/* \
  -output ipcounts -file excludes.txt
When the excludes file is very big, it would probably be better to use the -cacheFile option instead, but for small enough files -file is more convenient. You can find more info about -cacheFile (and several other useful options) in Running programs.

As you probably expected already, reducers and combiners can be classes as well. There is no example that illustrates this in this tutorial though, since the mechanism is completely analogous (and people tend to dislike reading lengthy tutorials).
Jobs and runners
Now, suppose that you want to find out for which IPs the daily counts are most often higher than a given number. The following Dumbo program dailycount.py can be used to generate this information:

class DailyMapper:
    def __init__(self):
        file = open("excludes.txt", "r")
        self.excludes = set(line.strip() for line in file)
        file.close()
    def __call__(self, key, value):
        parts = value.split(" ")
        ip, date = parts[0], parts[3][1:].split(":")[0]
        if ip not in self.excludes:
            yield (ip, date), 1

class FilterMapper:
    def __init__(self):
        self.mincount = int(self.params["mincount"])
    def __call__(self, key, value):
        ip, date = key
        if value >= self.mincount:
            yield ip, 1

def reducer(key, values):
    yield key, sum(values)

if __name__ == "__main__":
    import dumbo
    job = dumbo.Job()
    job.additer(DailyMapper, reducer, combiner=reducer)
    job.additer(FilterMapper, reducer, combiner=reducer)
    job.run()
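The slicing in DailyMapper deserves a word of explanation: the fourth space-separated field of an Apache log line is the timestamp (e.g. [10/Dec/2008:13:55:36), so stripping the leading bracket and taking everything before the first colon yields the date. A quick interactive check (the log line is made up):

>>> value = '66.249.66.1 - - [10/Dec/2008:13:55:36 +0100] "GET / HTTP/1.1" 200 2326'
>>> parts = value.split(" ")
>>> parts[0], parts[3][1:].split(":")[0]
('66.249.66.1', '10/Dec/2008')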
Running this program can be done as follows:

$ dumbo start dailycount.py -hadoop /usr/local/hadoop -input weblogs/2008/12/* \
-output ipcounts -file excludes.txt -param mincount=100
This example program illustrates how parameters can be passed to Dumbo programs by means of the -param option, but even more interesting is that it consists of two MapReduce iterations. As shown by this program, a Job object can be used to register multiple iterations.

Note that you can also write
def runner(job):
    job.additer(DailyMapper, reducer, combiner=reducer)
    job.additer(FilterMapper, reducer, combiner=reducer)

if __name__ == "__main__":
    import dumbo
    dumbo.main(runner)
instead of

if __name__ == "__main__":
    import dumbo
    job = dumbo.Job()
    job.additer(DailyMapper, reducer, combiner=reducer)
    job.additer(FilterMapper, reducer, combiner=reducer)
    job.run()

On its own this might not make much of a difference, but using a runner has an important advantage: it also allows you to use a starter.
Programs and starters
Starters are similar to runners, but instead of running a job they start a program. Less abstractly, a starter can simplify the start commands quite a lot. For instance, by adding the code

def starter(program):
    year = program.delopt("year")
    if not year:
        # an alternative (and probably better) way to
        # bail out is to raise dumbo.Error(msg)
        return "'year' not specified"
    month = program.delopt("month")
    if not month:
        return "'month' not specified"
    if len(month) == 1:
        month = "0" + month
    mincount = program.delopt("mincount")
    if not mincount:
        mincount = "100"
    program.addopt("input", "weblogs/%s/%s/*" % (year, month))
    program.addopt("param", "mincount=" + mincount)
    program.addopt("file", "excludes.txt")

if __name__ == "__main__":
    import dumbo
    dumbo.main(runner, starter)

to the example above, it can be started as follows:
$ dumbo start dailycount.py -hadoop /usr/local/hadoop -year 2008 -month 12 -output ipcounts
The program can then still be started without executing the starter by adding the option -starter no:

$ dumbo start dailycount.py -hadoop /usr/local/hadoop -input weblogs/2008/12/* \
-output ipcounts -file excludes.txt -param mincount=100 -starter no
Input formats
So far we have always taken text files as input, but Dumbo can deal with other file formats too. The -inputformat option allows you to specify the format of the input files. Possible values are:

- text: text files
- sequencefile: sequence files
- auto: decide between text and sequencefile based on the file header (this is the default value)
- <name_of_java_class>: use a custom InputFormat (this is rarely needed)
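For instance, a hypothetical program wordcount.py could be run on a sequence file logs.seq by forcing the sequence file input format explicitly (the program and file names are made up for illustration):

$ dumbo start wordcount.py -hadoop /usr/local/hadoop -input logs.seq \
  -inputformat sequencefile -output counts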
In case of text files, each input value is a string that corresponds to one line of the file and the keys are the offsets of the lines in the files (as Python integers). For sequence files, however, the type of the keys and values can differ from file to file. Most common writables are converted to suitable Python types, and the remaining writables are converted to a string by means of their toString() method. Hadoop records are converted to lists consisting of the values of their attributes.
Another possible value for the -inputformat option is code. This value was added for dealing with the files output by Dumbo, but since the output files on Hadoop really are sequence files containing typed bytes objects, sequencefile and hence also auto work for these files as well. For local runs, however, it is necessary to use the option -inputformat code when you want to take output files as input, since Dumbo programs that run locally cannot deal with sequence files (and therefore their output is stored in special text files instead of sequence files). To print a file from HDFS as a “code” file for local processing, you can use dumbo cat with the option -ascode yes.
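For instance, pulling the ipcounts output down from HDFS and feeding it to a local follow-up run could look roughly like this (followup.py is a hypothetical program used only for illustration):

$ dumbo cat ipcounts/part* -hadoop /usr/local/hadoop -ascode yes > ipcounts.code
$ dumbo start followup.py -input ipcounts.code -inputformat code -output results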