By using a custom Partitioner, we can send specific <key, value> pairs to specific reducers. The partitioning phase executes after the map phase and before the reduce phase.
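By default, Hadoop uses HashPartitioner, which picks a reducer by hashing the key. The logic can be sketched in plain Java (the class name `HashPartitionDemo` and the sample keys are illustrative, not part of the job below):

```java
public class HashPartitionDemo {

    // Mirrors the idea behind HashPartitioner.getPartition: mask off the
    // sign bit, then mod by the reducer count, so the result is always
    // in the range [0, numPartitions).
    static int getPartition(String key, int numPartitions) {
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    public static void main(String[] args) {
        String[] keys = {"Manager", "SeniorSoftwareEngineer", "SoftwareEngineer"};
        for (String key : keys) {
            // With hash partitioning we have no control over which reducer
            // a given category lands on; a custom Partitioner gives us that control.
            System.out.println(key + " -> reducer " + getPartition(key, 3));
        }
    }
}
```

With hashing, two categories may land on the same reducer purely by chance; a custom Partitioner replaces the hash with an explicit routing decision.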
For example, suppose I have the following data. The task is to find the highest salary among Managers, SeniorSoftwareEngineers, and SoftwareEngineers.
employee.txt
Vadiraj,Manager,150000
Shreyas,Manager,200000
Anand,Manager,100000
Mukund,Manager,500000
Kiran,SeniorSoftwareEngineer,70000
Gopi,SeniorSoftwareEngineer,55000
Sudheer,SeniorSoftwareEngineer,90000
Krishna,SeniorSoftwareEngineer,80000
Joel,SeniorSoftwareEngineer,65000
Hari,SoftwareEngineer,35000
PTR,SoftwareEngineer,79000
Shanmugham,SoftwareEngineer,65000
Pavan,SoftwareEngineer,45000
As shown in the above figure, by using a custom Partitioner we can send all Manager records to reducer1, all SeniorSoftwareEngineer records to reducer2, and all SoftwareEngineer records to reducer3.
Step 1: The following application returns the maximum salary for each category of employee.
EmployeeSalary.java
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class EmployeeSalary {

    public static class EmployeeMapper extends Mapper<Object, Text, Text, Text> {
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Input line format: name,category,salary
            StringTokenizer tokens = new StringTokenizer(value.toString(), ",");
            String name = tokens.nextToken();
            String category = tokens.nextToken();
            String salary = tokens.nextToken();
            // Emit the category as the key so the partitioner can route by it
            context.write(new Text(category), new Text(name + "," + salary));
        }
    }

    public static class EmployeeReducer extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            int maxSalary = 0;
            String empName = "";
            // Scan all employees of this category, keeping the highest salary
            for (Text value : values) {
                StringTokenizer tokens = new StringTokenizer(value.toString(), ",");
                String name = tokens.nextToken();
                int salary = Integer.parseInt(tokens.nextToken());
                if (salary > maxSalary) {
                    maxSalary = salary;
                    empName = name;
                }
            }
            context.write(key, new Text(empName + "," + maxSalary));
        }
    }

    public static class CategoryPartitioner extends Partitioner<Text, Text> {
        @Override
        public int getPartition(Text key, Text value, int numPartitions) {
            /*
             * Send all Managers data to reducer1, all SeniorSoftwareEngineers
             * data to reducer2, and all SoftwareEngineers data to reducer3.
             */
            if (key.toString().equalsIgnoreCase("Manager")) {
                return 0 % numPartitions;
            } else if (key.toString().equalsIgnoreCase("SeniorSoftwareEngineer")) {
                return 1 % numPartitions;
            } else {
                return 2 % numPartitions;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Maximum salary finder");
        job.setJarByClass(EmployeeSalary.class);
        job.setMapperClass(EmployeeMapper.class);
        job.setReducerClass(EmployeeReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setPartitionerClass(CategoryPartitioner.class);
        job.setNumReduceTasks(3);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
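The routing decision inside getPartition can be exercised on its own, without a Hadoop cluster, by extracting it into a plain static method. The class `CategoryRoutingDemo` and its `route` helper below are hypothetical, written only to illustrate the logic:

```java
public class CategoryRoutingDemo {

    // Same decision logic as the custom partitioner above, but written
    // against plain Strings so it compiles and runs without Hadoop jars.
    static int route(String category, int numPartitions) {
        if (category.equalsIgnoreCase("Manager")) {
            return 0 % numPartitions;
        } else if (category.equalsIgnoreCase("SeniorSoftwareEngineer")) {
            return 1 % numPartitions;
        } else {
            return 2 % numPartitions;
        }
    }

    public static void main(String[] args) {
        System.out.println("Manager -> reducer " + route("Manager", 3));                              // 0
        System.out.println("SeniorSoftwareEngineer -> reducer " + route("SeniorSoftwareEngineer", 3)); // 1
        System.out.println("SoftwareEngineer -> reducer " + route("SoftwareEngineer", 3));             // 2
    }
}
```

Note the % numPartitions guard: if the job were run with fewer than 3 reducers, the partition numbers fold back into the valid range instead of causing an out-of-range partition error (for example, with 2 reducers, SoftwareEngineer records would share reducer 0 with Managers).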
Step 2: Compile the above Java file.
$ hadoop com.sun.tools.javac.Main EmployeeSalary.java
Step 3: Create a jar file.
$ jar cf emp.jar EmployeeSalary*.class
Step 4: Run the jar file.
$ hadoop jar emp.jar EmployeeSalary /user/harikrishna_gurram/employee.txt /user/harikrishna_gurram/salaries
Since there are 3 reducers, there are 3 output files in /user/harikrishna_gurram/salaries (along with an empty _SUCCESS marker file).
$ hadoop fs -ls /user/harikrishna_gurram/salaries
Found 4 items
-rw-r--r--   3 harikrishna_gurram supergroup   0 2015-06-23 13:38 /user/harikrishna_gurram/salaries/_SUCCESS
-rw-r--r--   3 harikrishna_gurram supergroup  22 2015-06-23 13:38 /user/harikrishna_gurram/salaries/part-r-00000
-rw-r--r--   3 harikrishna_gurram supergroup  37 2015-06-23 13:38 /user/harikrishna_gurram/salaries/part-r-00001
-rw-r--r--   3 harikrishna_gurram supergroup  27 2015-06-23 13:38 /user/harikrishna_gurram/salaries/part-r-00002

$ hadoop fs -cat /user/harikrishna_gurram/salaries/part*
Manager	Mukund,500000
SeniorSoftwareEngineer	Sudheer,90000
SoftwareEngineer	PTR,79000
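The reducer's max-salary scan can also be checked in plain Java against the sample data, without running the job. The class `MaxSalaryDemo` and its `maxOf` helper are hypothetical, written only to mirror EmployeeReducer's loop:

```java
import java.util.Arrays;
import java.util.List;
import java.util.StringTokenizer;

public class MaxSalaryDemo {

    // Mirrors EmployeeReducer: scan "name,salary" values, keep the highest.
    static String maxOf(Iterable<String> values) {
        int maxSalary = 0;
        String empName = "";
        for (String value : values) {
            StringTokenizer tokens = new StringTokenizer(value, ",");
            String name = tokens.nextToken();
            int salary = Integer.parseInt(tokens.nextToken());
            if (salary > maxSalary) {
                maxSalary = salary;
                empName = name;
            }
        }
        return empName + "," + maxSalary;
    }

    public static void main(String[] args) {
        // The values reducer1 receives for the "Manager" key,
        // taken from employee.txt above.
        List<String> managers = Arrays.asList(
                "Vadiraj,150000", "Shreyas,200000", "Anand,100000", "Mukund,500000");
        System.out.println("Manager\t" + maxOf(managers)); // Manager	Mukund,500000
    }
}
```

This matches the part-r-00000 output above, confirming the reducer picks Mukund's 500000 as the highest Manager salary.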