Saturday 2 January 2016

Hadoop: MapReduce: word count application

MapReduce is a programming model for processing large amounts of data in parallel on large clusters. A cluster can contain thousands of nodes (think of each node as a computer).

MapReduce Terminology
Job: A complete program, that is, the execution of the mapper and reducer across the whole dataset.

Task: The execution of a mapper or a reducer on one piece of the data.

For example, take a file with 20 blocks.
Running word count on the file is one job.
The 20 blocks are processed by 20 map tasks, plus one or more reduce tasks.

The MapReduce model mainly comprises two phases.
a.   Map Phase
b.   Reduce phase

Map Phase
The map phase takes a set of data and converts it into another set of data. For example, it can take as input a file that contains the temperature of a location for every minute of the past 50 years, like the following.

1965-06-15-01-01       28.9
1965-06-15-01-02       28.9
1965-06-15-01-03       28.9
1965-06-15-01-04       26.6
....

Each row in the file gives the temperature at a particular minute of a day.

Format details
1965-06-15-01-01       28.9

1965: Year
06: Month
15: Day
01: Hour
01: minute
28.9: Temperature in Celsius
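
To make the format concrete, here is a minimal sketch that splits one record into its fields. The class and method names (TemperatureRecord, parse) are made up for this illustration, and it assumes the timestamp and the temperature are separated by whitespace, as in the sample rows.

```java
// Hypothetical helper for illustration only; it is not part of Hadoop.
class TemperatureRecord {
    final int year, month, day, hour, minute;
    final double celsius;

    TemperatureRecord(int year, int month, int day, int hour, int minute, double celsius) {
        this.year = year;
        this.month = month;
        this.day = day;
        this.hour = hour;
        this.minute = minute;
        this.celsius = celsius;
    }

    // Expects a line of the form "yyyy-MM-dd-HH-mm <whitespace> temperature".
    static TemperatureRecord parse(String line) {
        String[] parts = line.trim().split("\\s+");
        String[] t = parts[0].split("-");
        return new TemperatureRecord(
                Integer.parseInt(t[0]), Integer.parseInt(t[1]), Integer.parseInt(t[2]),
                Integer.parseInt(t[3]), Integer.parseInt(t[4]),
                Double.parseDouble(parts[1]));
    }

    public static void main(String[] args) {
        TemperatureRecord r = parse("1965-06-15-01-01       28.9");
        System.out.println(r.year + " " + r.month + " " + r.day + " -> " + r.celsius);
    }
}
```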

The map phase can process this data and output the maximum temperature for each day (ignoring hours and minutes).

1965-06-15: 28.9
1965-06-16: 29.6
1965-06-17: 31
….
….
….

Reduce Phase
The reduce phase takes the output of the map phase as input and processes it.

1965-06-15: 28.9
1965-06-16: 29.6
1965-06-17: 31
….
….
….

For example, it takes the above output of the map phase and converts it into the maximum temperature for each year (ignoring month and day).

1965: 31
1966: 34
1967: 32
….
….
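
The two phases described above can be sketched in plain Java, without any Hadoop classes. This is only an illustration of the idea: the class and method names (TemperatureMax, maxPerDay, maxPerYear) are my own, and the third sample reading is invented so that the example has two days.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration of the two phases; names are hypothetical.
class TemperatureMax {

    // "Map phase" of the example: per-minute rows -> maximum per day.
    static Map<String, Double> maxPerDay(List<String> lines) {
        Map<String, Double> max = new TreeMap<>();
        for (String line : lines) {
            String[] parts = line.trim().split("\\s+");
            String day = parts[0].substring(0, 10); // keep "yyyy-MM-dd"
            max.merge(day, Double.parseDouble(parts[1]), Double::max);
        }
        return max;
    }

    // "Reduce phase" of the example: maximum per day -> maximum per year.
    static Map<String, Double> maxPerYear(Map<String, Double> dailyMax) {
        Map<String, Double> max = new TreeMap<>();
        for (Map.Entry<String, Double> e : dailyMax.entrySet()) {
            String year = e.getKey().substring(0, 4); // keep "yyyy"
            max.merge(year, e.getValue(), Double::max);
        }
        return max;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "1965-06-15-01-01       28.9",
                "1965-06-15-01-04       26.6",
                "1965-06-16-00-00       29.6"); // invented sample row
        Map<String, Double> daily = maxPerDay(lines);
        System.out.println(daily);             // prints {1965-06-15=28.9, 1965-06-16=29.6}
        System.out.println(maxPerYear(daily)); // prints {1965=29.6}
    }
}
```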

MapReduce Job
A MapReduce job usually splits the input data set into independent chunks, which are processed by the map tasks in a completely parallel manner. The Hadoop framework collects the output of the map tasks and sends it as input to the reduce tasks.

The best way to learn any language or framework is to start writing applications. Let’s start writing applications in Hadoop.

Example: WordCount application. Take a file as input and count the number of occurrences of every word.

First, I am going to explain the complete theory of how the mapper and reducer work.

Let’s say my input file has the following data.

input.txt

Hello How are you
are you going to canada
Hello are you there
ya i am doing work
Hi ptr How are you
Hi buddy good day
good day too
very very good morning
good night

Assume the above file is 200MB in size and that the HDFS block size is 64MB (the default in Hadoop 1.x; Hadoop 2.x defaults to 128MB). input.txt is then divided into 4 blocks.

Block1 (64MB)
Hello How are you
are you going to canada

Block2 (64MB)
Hello are you there
ya i am doing work

Block3 (64MB)
Hi ptr How are you
Hi buddy good day
good day too

Block4 (8MB)
very very good morning
good night
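
The block arithmetic above (three full 64MB blocks plus an 8MB remainder) can be checked with a few lines of Java. This is just a sketch; BlockSplit and blockSizes are names I made up, and real HDFS works in bytes rather than whole megabytes.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: computes the block sizes for a file of a given size.
class BlockSplit {

    // Returns the size in MB of each block, in order.
    static List<Long> blockSizes(long fileSizeMb, long blockSizeMb) {
        List<Long> sizes = new ArrayList<>();
        for (long remaining = fileSizeMb; remaining > 0; remaining -= blockSizeMb) {
            // Every block is full except possibly the last one.
            sizes.add(Math.min(remaining, blockSizeMb));
        }
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(blockSizes(200, 64)); // prints [64, 64, 64, 8]
    }
}
```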

Assume each block is stored on a different DataNode.

Mapper Phase
The mapper transforms the input into an intermediate form. The output of the mapper goes to the reducer. Users can control which keys go to which Reducer by implementing a custom Partitioner (I am going to explain it later).
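
Before getting to custom partitioners, it helps to see how a key can be mapped to a reducer at all. The sketch below mirrors the formula used by Hadoop's default HashPartitioner, (hashCode & Integer.MAX_VALUE) % numReduceTasks, in plain Java. It is an approximation: it uses String.hashCode() rather than the hash of a Text key, so the actual partition numbers on a cluster will differ, and the class and method names are made up for this example.

```java
// Plain-Java illustration of hash partitioning; not a Hadoop Partitioner.
class PartitionSketch {

    // Returns the index of the reducer that would receive this key.
    static int partitionFor(String key, int numReduceTasks) {
        // Masking with Integer.MAX_VALUE clears the sign bit,
        // so the result of the modulo is never negative.
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        for (String word : new String[] {"Hello", "are", "you"}) {
            System.out.println(word + " -> reducer " + partitionFor(word, 3));
        }
    }
}
```

The important property is that the same key always goes to the same reducer, which is what lets one reducer see every count for a given word.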

Users can optionally specify a combiner, via Job.setCombinerClass(Class), to perform local aggregation of the intermediate outputs, which helps to cut down the amount of data transferred from the Mapper to the Reducer.

By convention, every line in the file is treated as a record. The mapper function reads every line from the file and converts it into key-value pairs like the following.

Block1
(Hello, 1)
(How, 1)
(are, 1)
(you, 1)
(are, 1)
(you, 1)
(going, 1)
(to, 1)
(canada, 1)

Block2
(Hello, 1)
(are, 1)
(you, 1)
(there, 1)
(ya, 1)
(i, 1)
(am, 1)
(doing, 1)
(work, 1)

Block3
(Hi, 1)
(ptr, 1)
(How, 1)
(are, 1)
(you, 1)
(Hi, 1)
(buddy, 1)
(good, 1)
(day, 1)
(good, 1)
(day, 1)
(too, 1)

Block4
(very, 1)
(very, 1)
(good, 1)
(morning, 1)
(good, 1)
(night, 1)
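
The same transformation can be tried locally in plain Java, without a cluster. This sketch is my own stand-in for the mapper (MapSketch is a made-up name): it tokenizes each line on whitespace and emits a (word, 1) pair per token.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;

// Plain-Java stand-in for the map step; no Hadoop classes involved.
class MapSketch {

    // Emits one (word, 1) pair for every whitespace-separated token.
    static List<Map.Entry<String, Integer>> map(List<String> lines) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String line : lines) {
            StringTokenizer itr = new StringTokenizer(line);
            while (itr.hasMoreTokens()) {
                out.add(new AbstractMap.SimpleEntry<>(itr.nextToken(), 1));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // The two lines of Block1.
        List<String> block1 = Arrays.asList("Hello How are you", "are you going to canada");
        for (Map.Entry<String, Integer> pair : map(block1)) {
            System.out.println("(" + pair.getKey() + ", " + pair.getValue() + ")");
        }
    }
}
```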

As you can see in the intermediate results of Block1, “are” and “you” each appear twice. If the file is very large, there can be many more duplicated keys like these. We can combine these intermediate results (before sending them to the reducers) by using a combiner.

After combining the duplicated entries, the intermediate results look like the following.

Block1
(Hello, 1)
(How, 1)
(are, 2)
(you, 2)
(going, 1)
(to, 1)
(canada, 1)

Block2
(Hello, 1)
(are, 1)
(you, 1)
(there, 1)
(ya, 1)
(i, 1)
(am, 1)
(doing, 1)
(work, 1)

Block3
(Hi, 2)
(ptr, 1)
(How, 1)
(are, 1)
(you, 1)
(buddy, 1)
(good, 2)
(day, 2)
(too, 1)

Block4
(very, 2)
(good, 2)
(morning, 1)
(night, 1)
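
Local aggregation is easy to imitate in plain Java. The following sketch (CombinerSketch is a made-up name, and this is not how Hadoop implements it internally) sums the counts of duplicate keys within a single mapper's output.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java imitation of a combiner working on one mapper's output.
class CombinerSketch {

    // Sums the values of equal keys, keeping keys in first-seen order.
    static Map<String, Integer> combine(List<Map.Entry<String, Integer>> pairs) {
        Map<String, Integer> combined = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> pair : pairs) {
            combined.merge(pair.getKey(), pair.getValue(), Integer::sum);
        }
        return combined;
    }

    public static void main(String[] args) {
        // The (word, 1) pairs emitted for Block1.
        List<Map.Entry<String, Integer>> block1 = new ArrayList<>();
        for (String w : "Hello How are you are you going to canada".split(" ")) {
            block1.add(new AbstractMap.SimpleEntry<>(w, 1));
        }
        System.out.println(combine(block1));
        // prints {Hello=1, How=1, are=2, you=2, going=1, to=1, canada=1}
    }
}
```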

This data is sorted by key internally before it is sent to the reducer. Text keys are compared byte by byte, so uppercase letters sort before lowercase ones.

Block1
(Hello, 1)
(How, 1)
(are, 2)
(canada, 1)
(going, 1)
(to, 1)
(you, 2)

Block2
(Hello, 1)
(am, 1)
(are, 1)
(doing, 1)
(i, 1)
(there, 1)
(work, 1)
(ya, 1)
(you, 1)

Block3
(Hi, 2)
(How, 1)
(are, 1)
(buddy, 1)
(day, 2)
(good, 2)
(ptr, 1)
(too, 1)
(you, 1)

Block4
(good, 2)
(morning, 1)
(night, 1)
(very, 2)
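
The ordering can be reproduced with Java's natural String order, which for plain ASCII words like these should match the byte-wise comparison Hadoop applies to Text keys (that equivalence is my assumption for this example; SortSketch is a made-up name). Capitalized words come first because 'H' has a smaller code than any lowercase letter.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

// Illustration of the key order using Java's natural String ordering.
class SortSketch {

    // Returns the keys in ascending order; 'H' (72) sorts before 'a' (97).
    static List<String> sortedKeys(Collection<String> keys) {
        List<String> sorted = new ArrayList<>(keys);
        Collections.sort(sorted);
        return sorted;
    }

    public static void main(String[] args) {
        // The keys of Block2 after combining.
        System.out.println(sortedKeys(Arrays.asList(
                "Hello", "are", "you", "there", "ya", "i", "am", "doing", "work")));
        // prints [Hello, am, are, doing, i, there, work, ya, you]
    }
}
```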

Reducer Phase
The reducer can run on one or more machines. It takes the output of the mapper phase and gives you the final result.

The Reducer has three primary phases.
a. Shuffle
Copies the sorted output from each Mapper using HTTP across the network.

b. Sort
Merge-sorts the Reducer inputs by key. After this step, the data looks like the following.
(Hello, [1, 1])
(Hi, [2])
(How, [1, 1])
(am, [1])
(are, [2, 1, 1])
(buddy, [1])
(canada, [1])
(day, [2])
(doing, [1])
(going, [1])
(good, [2, 2])
(i, [1])
(morning, [1])
(night, [1])
(ptr, [1])
(there, [1])
(to, [1])
(too, [1])
(very, [2])
(work, [1])
(ya, [1])
(you, [2, 1, 1])

c. Reduce
The reduce(Object, Iterable, org.apache.hadoop.mapreduce.Reducer.Context) method is called for each <key, (collection of values)> in the sorted inputs.

After this step, the final output looks like the following.
Hello    2
Hi       2
How      2
am       1
are      4
buddy    1
canada   1
day      2
doing    1
going    1
good     4
i        1
morning  1
night    1
ptr      1
there    1
to       1
too      1
very     2
work     1
ya       1
you      4
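
The reducer side can also be imitated in plain Java. In this sketch (ReducerPhaseSketch, group and reduce are names I made up), a TreeMap stands in for the shuffle and merge sort, which is a simplification of what Hadoop really does, and the reduce step sums each list of counts. Only three keys from the first two mappers are used to keep it short.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Plain-Java imitation of shuffle, sort and reduce; no Hadoop classes involved.
class ReducerPhaseSketch {

    // Shuffle + sort: collect every mapper's pairs and group the counts by key, in key order.
    static SortedMap<String, List<Integer>> group(List<Map<String, Integer>> mapperOutputs) {
        SortedMap<String, List<Integer>> grouped = new TreeMap<>();
        for (Map<String, Integer> output : mapperOutputs) {
            for (Map.Entry<String, Integer> e : output.entrySet()) {
                grouped.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
            }
        }
        return grouped;
    }

    // Reduce: sum the grouped counts of each key.
    static SortedMap<String, Integer> reduce(SortedMap<String, List<Integer>> grouped) {
        SortedMap<String, Integer> totals = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
            int sum = 0;
            for (int count : e.getValue()) {
                sum += count;
            }
            totals.put(e.getKey(), sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        Map<String, Integer> map1 = new TreeMap<>();
        map1.put("Hello", 1);
        map1.put("are", 2);
        map1.put("you", 2);
        Map<String, Integer> map2 = new TreeMap<>();
        map2.put("Hello", 1);
        map2.put("are", 1);
        map2.put("you", 1);

        SortedMap<String, List<Integer>> grouped = group(Arrays.asList(map1, map2));
        System.out.println(grouped);         // prints {Hello=[1, 1], are=[2, 1], you=[2, 1]}
        System.out.println(reduce(grouped)); // prints {Hello=2, are=3, you=3}
    }
}
```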


I haven’t written any new application; I am just going to explain the sample application given on the Hadoop site.


WordCount.java
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

    // Mapper: emits (word, 1) for every token of every input line.
    public static class TokenizerMapper extends
            Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // value holds one line of the input; split it on whitespace.
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    // Reducer (also used as the combiner): sums the counts of each word.
    public static class IntSumReducer extends
            Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // args[0] is the input path, args[1] is the output directory.
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}


Follow these steps to run the above MapReduce application.

Step 1: Set JAVA_HOME (if it is not set already)

Step 2: Set HADOOP_CLASSPATH like the following
export HADOOP_CLASSPATH=${JAVA_HOME}/lib/tools.jar

Step 3: Run the following command to compile WordCount.java
hadoop com.sun.tools.javac.Main WordCount.java

Step 4: Create the jar file
jar cf wc.jar WordCount*.class

Step 5: Run the following commands
5.a. Create a home directory on HDFS for your current user (if not created already)
hadoop fs -mkdir -p /user/[current login user]

For my system, I used the following command.
hadoop fs -mkdir -p /user/harikrishna_gurram

5.b. Create the "example1" directory
hadoop fs -mkdir /user/harikrishna_gurram/example1

5.c. Copy input.txt to "/user/harikrishna_gurram"
hadoop fs -put /Users/harikrishna_gurram/input.txt /user/harikrishna_gurram/input.txt

Step 6: Now run the jar file that we created in step 4
hadoop jar wc.jar WordCount /user/harikrishna_gurram/input.txt  /Users/harikrishna_gurram/results

Step 7: Once you run the above command, the output is stored in “/Users/harikrishna_gurram/results”.

Use the ls command to view the contents of the output directory.

$ hadoop fs -ls /Users/harikrishna_gurram/results
-rw-r--r--   3 harikrishna_gurram supergroup          0 2015-06-17 14:41 /Users/harikrishna_gurram/results/_SUCCESS
-rw-r--r--   3 harikrishna_gurram supergroup        495 2015-06-17 14:41 /Users/harikrishna_gurram/results/part-r-00000

Use the cat command to view the contents of the file.

$ hadoop fs -cat /Users/harikrishna_gurram/results/part-r-00000
Hello    2
Hi       2
How      2
am       1
are      4
buddy    1
canada   1
day      2
doing    1
going    1
good     4
i        1
morning  1
night    1
ptr      1
there    1
to       1
too      1
very     2
work     1
ya       1
you      4

Detailed explanation of the program
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, one);
        }
    }

The Mapper implementation, via the map method, processes one line at a time. StringTokenizer splits the line into tokens on whitespace, and the mapper emits a key-value pair of <word, 1> for each token. For the given sample, the four map tasks emit the following.

map1 emits
(Hello, 1)
(How, 1)
(are, 1)
(you, 1)
(are, 1)
(you, 1)
(going, 1)
(to, 1)
(canada, 1)

map2 emits
(Hello, 1)
(are, 1)
(you, 1)
(there, 1)
(ya, 1)
(i, 1)
(am, 1)
(doing, 1)
(work, 1)

map3 emits
(Hi, 1)
(ptr, 1)
(How, 1)
(are, 1)
(you, 1)
(Hi, 1)
(buddy, 1)
(good, 1)
(day, 1)
(good, 1)
(day, 1)
(too, 1)

map4 emits
(very, 1)
(very, 1)
(good, 1)
(morning, 1)
(good, 1)
(night, 1)

job.setCombinerClass(IntSumReducer.class);
The output of each mapper is passed through the local combiner (the same class as the Reducer in this job) for local aggregation. After being sorted on keys, the output looks like the following.

Output of map1
(Hello, 1)
(How, 1)
(are, 2)
(canada, 1)
(going, 1)
(to, 1)
(you, 2)

Output of map2
(Hello, 1)
(am, 1)
(are, 1)
(doing, 1)
(i, 1)
(there, 1)
(work, 1)
(ya, 1)
(you, 1)


Output of map3
(Hi, 2)
(How, 1)
(are, 1)
(buddy, 1)
(day, 2)
(good, 2)
(ptr, 1)
(too, 1)
(you, 1)


Output of map4
(good, 2)
(morning, 1)
(night, 1)
(very, 2)

    public void reduce(Text key, Iterable<IntWritable> values,
            Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }

The Reducer implementation, via the reduce method, just sums up the values, which are the occurrence counts for each key. The number of reduce tasks for the job is set by the user via Job.setNumReduceTasks(int). With one reducer, the output is written to one output file. With n reducers, the job produces n output files.
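
To picture the "n reducers, n output files" rule, the sketch below lists the file names I would expect in the output directory. It assumes the names follow the part-r-00000 pattern seen in the listing above, with one zero-padded index per reducer; the class and method names are made up, and this does not call Hadoop at all.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: expected output file names for a given number of reducers.
class OutputFilesSketch {

    // One file per reduce task, numbered from 0 and padded to five digits.
    static List<String> outputFiles(int numReduceTasks) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < numReduceTasks; i++) {
            names.add(String.format("part-r-%05d", i));
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(outputFiles(3));
        // prints [part-r-00000, part-r-00001, part-r-00002]
    }
}
```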

The final output of the job is as follows.

Hello    2
Hi       2
How      2
am       1
are      4
buddy    1
canada   1
day      2
doing    1
going    1
good     4
i        1
morning  1
night    1
ptr      1
there    1
to       1
too      1
very     2
work     1
ya       1
you      4

To make you comfortable with Hadoop MapReduce, the next few posts show some MapReduce programs. After those, I am going to explain each concept in detail.





