Saturday 2 January 2016

Hadoop: custom partitioner

By using custom partitioner, we can send specific <key, value> pairs to specific reducers. Partitioning phase execute after mapping phase and before reduce phase.

For example, I had following data. My task is to find out highest salary of manager, SeniorSoftwareEngineer, SoftwareEngineer.

employee.txt
Vadiraj,Manager,150000
Shreyas,Manager,200000
Anand,Manager,100000
Mukund,Manager,500000
Kiran,SeniorSoftwareEngineer,70000
Gopi,SeniorSoftwareEngineer,55000
Sudheer,SeniorSoftwareEngineer,90000
Krishna,SeniorSoftwareEngineer,80000
Joel,SeniorSoftwareEngineer,65000
Hari,SoftwareEngineer,35000
PTR,SoftwareEngineer,79000
Shanmugham,SoftwareEngineer,65000
Pavan,SoftwareEngineer,45000
As shown in the above figure, by using custom Partitioner we can send all Managers data to reducer1, SeniorSoftwareEngineers data to reducer2 and all  SoftwareEngineers data to reducer3.

Step1: Following application returns maximum salary for each category of employee.


EmployeeSalary.java
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class EmployeeSalary {
 public static class EmployeeMapper extends Mapper<Object, Text, Text, Text> {

  public void map(Object key, Text value, Context context)
    throws IOException, InterruptedException {

   StringTokenizer tokens = new StringTokenizer(value.toString(), ",");

   System.out.println("****" + value.toString());
   String name = tokens.nextToken();
   System.out.println("****" + name);
   String category = tokens.nextToken();
   System.out.println("****" + category);
   String salary = tokens.nextToken();
   System.out.println("****" + salary);

   context.write(new Text(category), new Text(name + "," + salary));
  }
 }

 public static class EmployeeReducer extends Reducer<Text, Text, Text, Text> {

  public void reduce(Text key, Iterable<Text> values, Context context)
    throws IOException, InterruptedException {

   int maxSalary = 0;
   String empName = "";

   for (Text value : values) {
    StringTokenizer tokens = new StringTokenizer(value.toString(),
      ",");
    String name = tokens.nextToken();
    int salary = Integer.parseInt(tokens.nextToken());

    if (salary > maxSalary) {
     maxSalary = salary;
     empName = name;
    }
   }

   context.write(key, new Text(empName + "," + maxSalary));
  }
 }

 public static class CategoryPartitoner extends Partitioner<Text, Text> {

  @Override
  public int getPartition(Text key, Text value, int numPartitions) {
   /*
    * Send all managers data to reducer1, all SeniorSoftwareEngineers
    * data to reducer2, all SoftwareEngineers data to reducer3
    */
   if (key.toString().equalsIgnoreCase("Manager")) {
    return 0 % numPartitions;
   } else if (key.toString()
     .equalsIgnoreCase("SeniorSoftwareEngineer")) {
    return 1 % numPartitions;
   } else {
    return 2 % numPartitions;
   }
  }

 }

 public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();
  Job job = Job.getInstance(conf, "Maximum salary finder");
  job.setJarByClass(EmployeeSalary.class);
  job.setMapperClass(EmployeeMapper.class);
  job.setReducerClass(EmployeeReducer.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);
  job.setPartitionerClass(CategoryPartitoner.class);
  job.setNumReduceTasks(3);
  FileInputFormat.addInputPath(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  System.exit(job.waitForCompletion(true) ? 0 : 1);
 }
}

Step 2: Compile above java file.
$ hadoop com.sun.tools.javac.Main EmployeeSalary.java

Step 3: Create jar file.
$ jar cf emp.jar EmployeeSalary*class

Step 4: Run jar file.
$ hadoop jar emp.jar EmployeeSalary /user/harikrishna_gurram/employee.txt /user/harikrishna_gurram/salaries


Since there are 3 reducers, there are 3 output files located in /user/harikrishna_gurram/salaries.

$ hadoop fs -ls /user/harikrishna_gurram/salaries
Found 4 items
-rw-r--r--   3 harikrishna_gurram supergroup          0 2015-06-23 13:38 /user/harikrishna_gurram/salaries/_SUCCESS
-rw-r--r--   3 harikrishna_gurram supergroup         22 2015-06-23 13:38 /user/harikrishna_gurram/salaries/part-r-00000
-rw-r--r--   3 harikrishna_gurram supergroup         37 2015-06-23 13:38 /user/harikrishna_gurram/salaries/part-r-00001
-rw-r--r--   3 harikrishna_gurram supergroup         27 2015-06-23 13:38 /user/harikrishna_gurram/salaries/part-r-00002
$ 
$ hadoop fs -cat /user/harikrishna_gurram/salaries/part*
Manager Mukund,500000
SeniorSoftwareEngineer Sudheer,90000
SoftwareEngineer PTR,79000





Previous                                                 Next                                                 Home

No comments:

Post a Comment