- Back to Home »
- Custom Partitioner
Posted by : Sushanth
Thursday, 24 December 2015
Input (one record per line; tab-separated fields: country, continent, gender, census):
France Europe female 45000
France Europe male 55000
Spain Europe female 65000
India Asia female 155000
India Asia male 165000
Program:
package com.countrypart;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * Driver for the "CountryPartition" MapReduce job.
 *
 * <p>Wires {@code CountryMapper}, {@code CountryPartitioner} and
 * {@code CountryReducer} into a single job, reads text input from
 * {@code args[0]} and writes the result to {@code args[1]}.
 *
 * <p>The class is a {@link Tool}, so generic Hadoop options passed on the
 * command line (for example {@code -D key=value}) reach the job through
 * {@link #getConf()}.
 */
public class CountryDriver extends Configured implements Tool {

    /**
     * Number of reduce tasks used by the job. The partitioner in this project
     * produces bucket numbers modulo the reducer count, so with 2 reducers its
     * third bucket (2 % 2) wraps around to reducer 0.
     */
    private static final int NUM_REDUCE_TASKS = 2;

    /**
     * Configures and runs the job.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the
     *             output path; an existing output path is deleted first
     * @return 0 when the job succeeds, 1 when it fails, 2 on bad usage
     * @throws Exception if job setup or submission fails
     */
    public int run(String[] args) throws Exception {
        if (args == null || args.length < 2) {
            System.err.println("Usage: CountryDriver <input path> <output path>");
            return 2;
        }

        Job partitionJob = getJobConfPartition();
        Path inputPath = new Path(args[0]);
        Path outputPath = new Path(args[1]);

        // Resolve the file system from the output path itself so that the
        // clean-up hits the same file system the job will write to (HDFS,
        // local, ...) instead of always the local one.
        FileSystem fs = outputPath.getFileSystem(partitionJob.getConfiguration());
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }

        FileInputFormat.setInputPaths(partitionJob, inputPath);
        FileOutputFormat.setOutputPath(partitionJob, outputPath);

        // Propagate the job outcome to the caller / process exit code.
        return partitionJob.waitForCompletion(true) ? 0 : 1;
    }

    /**
     * Describes the classes a job is assembled from. A {@code null} combiner
     * means "no combiner".
     */
    protected abstract class JobInfo {
        public abstract Class<?> getJarByClass();

        public abstract Class<? extends Mapper> getMapperClass();

        public abstract Class<? extends Reducer> getCombinerClass();

        public abstract Class<? extends Reducer> getReducerClass();

        public abstract Class<?> getOutputKeyClass();

        public abstract Class<?> getOutputValueClass();
    }

    /**
     * Builds the job for the custom-partitioning program: text input,
     * {@code Text}/{@code Text} output, and {@code CountryPartitioner} as
     * the partitioner.
     */
    private Job getJobConfPartition() throws Exception {
        JobInfo jobInfo = new JobInfo() {
            @Override
            public Class<?> getJarByClass() {
                return CountryDriver.class;
            }

            @Override
            public Class<? extends Mapper> getMapperClass() {
                return CountryMapper.class;
            }

            @Override
            public Class<? extends Reducer> getCombinerClass() {
                return null; // no combiner for this job
            }

            @Override
            public Class<? extends Reducer> getReducerClass() {
                return CountryReducer.class;
            }

            @Override
            public Class<?> getOutputKeyClass() {
                return Text.class;
            }

            @Override
            public Class<?> getOutputValueClass() {
                return Text.class;
            }
        };

        Job job = setupJob("CountryPartition", jobInfo);
        job.setPartitionerClass(CountryPartitioner.class);
        job.setInputFormatClass(TextInputFormat.class);
        return job;
    }

    /**
     * Creates a job named {@code jobName} and applies the classes supplied by
     * {@code jobInfo}.
     *
     * @param jobName human-readable job name
     * @param jobInfo source of the jar, mapper, optional combiner, reducer and
     *                output key/value classes
     * @return the configured (not yet submitted) job
     * @throws Exception if the job cannot be created
     */
    protected Job setupJob(String jobName, JobInfo jobInfo) throws Exception {
        // Use the configuration injected by ToolRunner so command-line options
        // are honoured; fall back to a fresh one if none has been set.
        Configuration conf = getConf() != null ? getConf() : new Configuration();
        Job job = Job.getInstance(conf, jobName);

        job.setJarByClass(jobInfo.getJarByClass());
        job.setMapperClass(jobInfo.getMapperClass());

        // The combiner is optional, so set it only when one is provided.
        Class<? extends Reducer> combinerClass = jobInfo.getCombinerClass();
        if (combinerClass != null) {
            job.setCombinerClass(combinerClass);
        }

        job.setReducerClass(jobInfo.getReducerClass());
        job.setNumReduceTasks(NUM_REDUCE_TASKS);

        // Output key/value types shared by the map and reduce functions.
        job.setOutputKeyClass(jobInfo.getOutputKeyClass());
        job.setOutputValueClass(jobInfo.getOutputValueClass());
        return job;
    }

    /** Command-line entry point; exits with the status returned by {@link #run}. */
    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new CountryDriver(), args);
        System.exit(res);
    }
}
package com.countrypart;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
/**
 * Routes records to reducers by country. The country is the first
 * tab-separated field of the value; the key is not consulted.
 *
 * <p>"France" maps to bucket 0, "Spain" to bucket 1 and every other country
 * to bucket 2; the bucket is then reduced modulo the number of reduce tasks.
 */
public class CountryPartitioner extends Partitioner<Text, Text> {
    public int getPartition(Text key, Text value, int numReduceTasks) {
        final String country = value.toString().split("\t")[0];

        // Diagnostic output, emitted for every record.
        System.out.println(numReduceTasks);
        System.out.println(country);

        // Guard against a modulo by zero when there are no reduce tasks.
        if (numReduceTasks == 0) {
            return 0;
        }

        final int bucket;
        if ("France".equals(country)) {
            bucket = 0;
        } else if ("Spain".equals(country)) {
            bucket = 1;
        } else {
            bucket = 2;
        }
        return bucket % numReduceTasks;
    }
}
package com.countrypart;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * Maps one tab-separated input line to a (gender, "country\tcensus") pair.
 *
 * <p>Field positions used: 0 = country, 2 = gender, 3 = census figure.
 * Field 1 is only echoed in the diagnostic output.
 */
public class CountryMapper extends Mapper<Object, Text, Text, Text> {

    /** Minimum number of tab-separated fields a usable record must have. */
    private static final int EXPECTED_FIELDS = 4;

    /**
     * Emits the gender as key and "country&lt;TAB&gt;census" as value.
     * Empty lines are ignored; lines with fewer than
     * {@value #EXPECTED_FIELDS} fields are counted and skipped instead of
     * failing the task with an {@code ArrayIndexOutOfBoundsException}.
     *
     * @param key     input key supplied by the input format (unused)
     * @param value   one line of input text
     * @param context used to emit the output pair and update counters
     */
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        if (line.isEmpty()) {
            return;
        }

        String[] tokens = line.split("\t");
        if (tokens.length < EXPECTED_FIELDS) {
            // Malformed record: make it visible in the job counters and move on.
            context.getCounter("CountryMapper", "MALFORMED_RECORDS").increment(1);
            return;
        }

        // Diagnostic output, emitted for every well-formed record.
        System.out.println(tokens[0] + " " + tokens[1]);
        System.out.println(line);

        String gender = tokens[2];
        String countrycensus = tokens[0] + "\t" + tokens[3];

        // Key: gender. Value: country and census figure, tab-separated.
        context.write(new Text(gender), new Text(countrycensus));
    }
}
package com.countrypart;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
// Records that fall into the same partition go to the same reducer. Within a partition, the values that share a key (the gender) are iterated and their census figures are accumulated.
// The reducer output therefore contains a census line per gender for each partition.
// the type parameters are the input keys type, the input values type, the
// output keys type, the output values type
public class CountryReducer extends Reducer<Text, Text, Text, Text> {
/**
 * Adds up the census figures of all values that share {@code key} and writes
 * a single "census- &lt;total&gt;" record for that key.
 *
 * <p>Each value is split on tab and its second field is parsed as the census
 * figure. Values without a second field are skipped. A non-numeric census
 * field raises {@link NumberFormatException}.
 *
 * @param key     the grouping key; written unchanged as the output key
 * @param values  the values grouped under {@code key}
 * @param context used to emit the output record
 */
@Override
public void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
    // Local accumulator; long so that large totals do not overflow an int.
    long census = 0L;

    for (Text val : values) {
        String[] valTokens = val.toString().split("\t");
        if (valTokens.length < 2) {
            continue; // no census field in this value
        }
        census += Long.parseLong(valTokens[1]);
    }

    // Emit once per key, after every value has been accumulated.
    context.write(new Text(key.toString()), new Text("census- " + census));
}