MapReduce is a programming model for processing large amounts of data in parallel on large clusters. A cluster can contain thousands of nodes (think of each node as a computer).
MapReduce Terminology
Job: The complete program, i.e. the execution of the mapper and reducer across the whole dataset.
Task: The execution of a mapper (or reducer) on one piece of the data.
For example, consider a file with 20 blocks. Running word count on that file is one job; the 20 blocks map to 20 map tasks plus one or more reduce tasks.
The MapReduce model mainly comprises two phases:
a. Map phase
b. Reduce phase
Map Phase
It takes a set of data and converts it into another set of data. For example, the map phase can take as input a file that contains the temperature of a location for every minute over the past 50 years, like the following.
1965-06-15-01-01 28.9
1965-06-15-01-02 28.9
1965-06-15-01-03 28.9
1965-06-15-01-04 26.6
....
Each row in the file represents the temperature recorded at a particular minute of a particular day.
Format details
1965-06-15-01-01 28.9
1965: Year
06: Month
15: Day
01: Hour
01: Minute
28.9: Temperature in Celsius
The map phase can process the given data and output the maximum temperature for each day (dropping hours and minutes); a sketch of such a mapper follows the sample output below.
1965-06-15: 28.9
1965-06-16: 29.6
1965-06-17: 31
….
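As a rough sketch of what the map side could look like in Hadoop (the class name MaxTempMapper and the exact field handling are my own illustration, not code from this post), a mapper could emit one (day, temperature) pair per input line; the per-day maximum then falls out of the combine/reduce step.

import java.io.IOException;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical mapper: for every line "YYYY-MM-DD-HH-MM <temperature>" it emits
// the pair (YYYY-MM-DD, temperature); the maximum per day is computed later.
public class MaxTempMapper extends Mapper<LongWritable, Text, Text, FloatWritable> {

    private final Text day = new Text();
    private final FloatWritable temperature = new FloatWritable();

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] parts = value.toString().trim().split("\\s+");
        if (parts.length != 2 || parts[0].length() < 10) {
            return;                                   // skip malformed lines
        }
        day.set(parts[0].substring(0, 10));           // "1965-06-15-01-01" -> "1965-06-15"
        temperature.set(Float.parseFloat(parts[1]));  // the reading for that minute
        context.write(day, temperature);
    }
}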
Reduce Phase
The reduce phase takes the output of the map phase as input and processes it.
1965-06-15: 28.9
1965-06-16: 29.6
1965-06-17: 31
….
For example, it takes the above output from the map phase and converts it into the maximum temperature for each year (dropping month and day); a sketch of such a reducer follows the sample output below.
1965: 31
1966: 34
1967: 32
….
….
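A matching reducer is easy to sketch (again, MaxTempReducer is a hypothetical name, not code from this post): for each key it keeps only the maximum of the values it receives. Keyed by day it produces the per-day maxima shown earlier; applied again with the year as the key, the same logic produces the per-year maxima above.

import java.io.IOException;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical reducer: for each key (a day or a year) it receives all emitted
// temperatures and writes out only the largest one.
public class MaxTempReducer extends Reducer<Text, FloatWritable, Text, FloatWritable> {

    private final FloatWritable max = new FloatWritable();

    @Override
    public void reduce(Text key, Iterable<FloatWritable> values, Context context)
            throws IOException, InterruptedException {
        float best = Float.NEGATIVE_INFINITY;
        for (FloatWritable value : values) {
            best = Math.max(best, value.get());
        }
        max.set(best);
        context.write(key, max);   // e.g. "1965-06-15  28.9" or "1965  31.0"
    }
}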
MapReduce Job
A MapReduce job usually splits the input dataset into independent chunks, which are processed by the map tasks in a completely parallel manner. The Hadoop framework collects the output of the map tasks and sends it as input to the reduce tasks.
The best way to learn any language or framework is to start writing applications, so let's start writing applications in Hadoop.
Example: WordCount application: take a file as input and count the number of occurrences of every word.
First I am going to explain the complete theory of how the mapper and reducer work.
Let’s say my input file has the following data.
input.txt
Hello How are you are you going to canada Hello are you there ya i am doing work Hi ptr How are you Hi buddy good day good day too very very good morning good night
Just assume the above file is 200 MB in size. With a block size of 64 MB (the default in older Hadoop versions), input.txt is divided into 4 blocks.
Block1 (64MB)
Hello How
are you
are you
going to canada
Block2 (64MB)
Hello are
you there
ya i am
doing work
Block3 (64MB)
Hi ptr How
are you
Hi buddy
good day
good day too
Block4 (8MB)
very very
good morning
good night
Assume each block is stored on a different datanode.
Mapper Phase
The mapper transforms the input into an intermediate form. The output of the mapper goes to the reducer. Users can control which keys go to which reducer by implementing a custom Partitioner (I am going to explain it in detail later; a small sketch is shown below).
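As a minimal, purely illustrative sketch of a custom Partitioner (the class name and the routing rule are invented for this example, and are not part of the WordCount code later in the post), the following sends words starting with an upper-case letter to one partition and spreads the rest by hash. It would be registered in the driver with job.setPartitionerClass(FirstLetterPartitioner.class).

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Hypothetical partitioner: words starting with an upper-case letter go to
// partition 0, all other words are spread over the remaining partitions by hash.
public class FirstLetterPartitioner extends Partitioner<Text, IntWritable> {

    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        if (numPartitions == 1) {
            return 0;                      // only one reducer, nothing to route
        }
        String word = key.toString();
        if (!word.isEmpty() && Character.isUpperCase(word.charAt(0))) {
            return 0;
        }
        return 1 + (word.hashCode() & Integer.MAX_VALUE) % (numPartitions - 1);
    }
}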
Users can optionally
specify a combiner, via Job.setCombinerClass(Class), to perform local
aggregation of the intermediate outputs, which helps to cut down the amount of
data transferred from the Mapper to the Reducer.
By convention, every line in the file is treated as a record. The mapper function reads every line from the file and converts it into key-value pairs like the following.
Block1
(Hello, 1)
(How, 1)
(are, 1)
(you, 1)
(are, 1)
(you, 1)
(going, 1)
(to, 1)
(canada, 1)
Block2
(Hello, 1)
(are, 1)
(you, 1)
(there, 1)
(ya, 1)
(i, 1)
(am, 1)
(doing, 1)
(work, 1)
Block3
(Hi, 1)
(ptr, 1)
(How, 1)
(are, 1)
(you, 1)
(Hi, 1)
(buddy, 1)
(good, 1)
(day, 1)
(good, 1)
(day, 1)
(too, 1)
Block4
(very, 1)
(very, 1)
(good, 1)
(morning, 1)
(good, 1)
(night, 1)
As you can observe in the intermediate results of block1, “are” and “you” are repeated twice. If the file is very large, we can have many more duplicated keys like these. We can combine these intermediate results (before sending them to the reducers) by using a combiner.
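In the WordCount example later in this post, the combiner is simply the reducer class itself, registered with one line in the driver:

// Reuse the reducer as the combiner; this is safe for word count because
// summing partial counts is associative and commutative.
job.setCombinerClass(IntSumReducer.class);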
After combining these duplicated entries, the intermediate results are transformed like the following.
Block1
(Hello, 1)
(How, 1)
(are, 2)
(you, 2)
(going, 1)
(to, 1)
(canada, 1)
Block2
(Hello, 1)
(are, 1)
(you, 1)
(there, 1)
(ya, 1)
(i, 1)
(am, 1)
(doing, 1)
(work, 1)
Block3
(Hi, 2)
(ptr, 1)
(How, 1)
(are, 1)
(you, 1)
(buddy, 1)
(good, 2)
(day, 2)
(too, 1)
Block4
(very, 2)
(good, 2)
(morning, 1)
(night, 1)
This data is sorted by key internally before being sent to the reducer. (Text keys compare by their bytes, which is why capitalized words such as “Hello” sort before lowercase words such as “are”.)
Block1
(Hello, 1)
(How, 1)
(are, 2)
(canada, 1)
(going, 1)
(to, 1)
(you, 2)
Block2
(am, 1)
(are, 1)
(doing, 1)
(Hello, 1)
(i, 1)
(there, 1)
(work, 1)
(ya, 1)
(you, 1)
Block3
(Hi, 2)
(How, 1)
(are, 1)
(buddy, 1)
(day, 2)
(good, 2)
(ptr, 1)
(too, 1)
(you, 1)
Block4
(good, 2)
(morning, 1)
(night, 1)
(very, 2)
Reducer Phase
Reducers can run on one or more machines. The reducer takes the output of the mapper phase and gives you the final result. The reducer has three primary phases.
a. Shuffle
The framework copies the sorted output from each mapper across the network using HTTP.
b. Sort
The framework merge-sorts the reducer inputs by key. After this step, the data looks like the following.
(Hello, [1, 1])
(Hi, [2])
(How, [1, 1])
(am, [1])
(are, [2, 1, 1])
(buddy, [1])
(canada, [1])
(day, [2])
(doing, [1])
(going, [1])
(good, [2, 2])
(i, [1])
(morning, [1])
(night, [1])
(ptr, [1])
(there, [1])
(to, [1])
(too, [1])
(very, [2])
(work, [1])
(ya, [1])
(you, [2, 1, 1])
c. Reduce
The reduce(Object, Iterable, org.apache.hadoop.mapreduce.Reducer.Context) method is called for each <key, (collection of values)> pair in the sorted inputs.
After this step, the final output looks like the following.
Hello 2
Hi 2
How 2
am 1
are 4
buddy 1
canada 1
day 2
doing 1
going 1
good 4
i 1
morning 1
night 1
ptr 1
there 1
to 1
too 1
very 2
work 1
ya 1
you 4
I haven’t written a new application; I am just going to explain the sample application given on the Hadoop site.
WordCount.java
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Follow these steps to run the above MapReduce application.
Step 1: Set JAVA_HOME (If it is not set already)
Step 2: Set HADOOP_CLASSPATH as follows
export HADOOP_CLASSPATH=${JAVA_HOME}/lib/tools.jar
Step 3: Run the following command to compile WordCount.java
hadoop com.sun.tools.javac.Main WordCount.java
Step 4: Create jar file
jar cf wc.jar WordCount*.class
Step 5: Run the following commands
5.a. Create a home directory on HDFS for your current user (if not created already)
hadoop fs -mkdir -p /user/[current login user]
For my system, I used the following command.
hadoop fs -mkdir -p /user/harikrishna_gurram
5.b. Create the "example1" directory
hadoop fs -mkdir /user/harikrishna_gurram/example1
5.c. Copy input.txt to "/user/harikrishna_gurram"
hadoop fs -put /Users/harikrishna_gurram/input.txt /user/harikrishna_gurram/input.txt
Step 6: Now run the jar file that we created in step 4
hadoop jar wc.jar WordCount /user/harikrishna_gurram/input.txt /Users/harikrishna_gurram/results
Step 7: Once you run the above command, the output is stored in “/Users/harikrishna_gurram/results”.
Use the ls command to view all the contents of the output directory.
$ hadoop fs -ls /Users/harikrishna_gurram/results
-rw-r--r--   3 harikrishna_gurram supergroup          0 2015-06-17 14:41 /Users/harikrishna_gurram/results/_SUCCESS
-rw-r--r--   3 harikrishna_gurram supergroup        495 2015-06-17 14:41 /Users/harikrishna_gurram/results/part-r-00000
Use the cat command to view the contents of the file.
$ hadoop fs -cat /Users/harikrishna_gurram/results/part-r-00000
Hello 2
Hi 2
How 2
am 1
are 4
buddy 1
canada 1
day 2
doing 1
going 1
good 4
i 1
morning 1
night 1
ptr 1
there 1
to 1
too 1
very 2
work 1
ya 1
you 4
Detailed explanation about the program
public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
    StringTokenizer itr = new StringTokenizer(value.toString());
    while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
    }
}
The Mapper implementation, via the map method, processes one line at a time. StringTokenizer tokenizes the line, and the mapper emits a key-value pair of (word, 1) for each token. For the given sample, the mappers emit the following.
map1 emits
(Hello, 1)
(How, 1)
(are, 1)
(you, 1)
(are, 1)
(you, 1)
(going, 1)
(to, 1)
(canada, 1)
map2 emits
(Hello, 1)
(are, 1)
(you, 1)
(there, 1)
(ya, 1)
(i, 1)
(am, 1)
(doing, 1)
(work, 1)
map3 emits
(Hi, 1)
(ptr, 1)
(How, 1)
(are, 1)
(you, 1)
(Hi, 1)
(buddy, 1)
(good, 1)
(day, 1)
(good, 1)
(day, 1)
(too, 1)
map4 emits
(very, 1)
(very, 1)
(good, 1)
(morning, 1)
(good, 1)
(night, 1)
job.setCombinerClass(IntSumReducer.class);
The output of each mapper is passed through the local combiner for local aggregation. After being sorted on keys, the output looks like the following.
Output of map1
(Hello, 1)
(How, 1)
(are, 2)
(canada, 1)
(going, 1)
(to, 1)
(you, 2)
Output of map2
(am, 1)
(are, 1)
(doing, 1)
(Hello, 1)
(i, 1)
(there, 1)
(work, 1)
(ya, 1)
(you, 1)
Output of map3
(Hi, 2)
(How, 1)
(are, 1)
(buddy, 1)
(day, 2)
(good, 2)
(ptr, 1)
(too, 1)
(you, 1)
Output of map4
(good, 2)
(morning, 1)
(night, 1)
(very, 2)
public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable val : values) {
        sum += val.get();
    }
    result.set(sum);
    context.write(key, result);
}
The Reducer implementation, via the reduce method, just sums up the values, which are the occurrence counts for each key. The number of reduce tasks for the job is set by the user via Job.setNumReduceTasks(int). If you have one reducer, the output is written to one output file; if you have n reducers, the job produces n output files.
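For example, a single extra line in the driver (not present in the WordCount code above, shown only to illustrate the setting) would request two reducers:

// Request two reduce tasks; the output directory will then contain
// part-r-00000 and part-r-00001.
job.setNumReduceTasks(2);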
The final output of the job is:
Hello 2
Hi 2
How 2
am 1
are 4
buddy 1
canada 1
day 2
doing 1
going 1
good 4
i 1
morning 1
night 1
ptr 1
there 1
to 1
too 1
very 2
work 1
ya 1
you 4
To make you comfortable with Hadoop MapReduce, the next few posts show some MapReduce programs. After those, I am going to explain each concept in detail.