- Back to Home »
- Aggregate all the synonyms of a word (using KeyValueTextInputFormat)
Posted by : Sushanth
Thursday, 24 December 2015
Input:
Program:
Driver:
package dictionary;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class DictionaryDriver extends Configured implements Tool {
public int run(String[] args) throws Exception {
// getting configuration object and setting job name
Configuration conf = getConf();
//set the delimiter to seperate input and output
//conf.set(
//"mapreduce.input.keyvaluelinerecordreader.key.value.separator",
//",");
conf.set("key.value.separator.in.input.line", ", ");
Job job = new Job(conf, "Word Count hadoop-0.20");
// setting the class names
job.setJarByClass(DictionaryDriver.class);
job.setMapperClass(DictionaryMapper.class);
job.setReducerClass(DictionaryReducer.class);
// setting the output data type classes
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setInputFormatClass(KeyValueTextInputFormat.class);
// to accept the hdfs input and outpur dir at run time
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new DictionaryDriver(),
args);
System.exit(res);
}
}
Mapper:
package dictionary;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class DictionaryMapper extends Mapper<Text, Text, Text, Text> {
private Text word = new Text();
public void map(Text key, Text value, Context context) throws IOException,
InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString(), ",");
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(key, word);
}
}
}
Reducer:
// NOTE: package must be "dictionary", matching the driver and mapper — the
// original "com.dictionary" would not compile, since DictionaryDriver refers
// to DictionaryReducer by its simple name without an import.
package dictionary;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Reducer: concatenates all synonyms of a word into one "|"-separated
 * string, e.g. (big, [large, huge]) becomes (big, "|large|huge").
 */
public class DictionaryReducer extends Reducer<Text, Text, Text, Text> {

    // Reused output value, per the standard Hadoop object-reuse pattern.
    private final Text result = new Text();

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // StringBuilder avoids the O(n^2) cost of String += in a loop.
        StringBuilder translations = new StringBuilder();
        for (Text val : values) {
            translations.append('|').append(val.toString());
        }
        result.set(translations.toString());
        context.write(key, result);
    }
}
In this example, KeyValueTextInputFormat is selected with the line below:
job.setInputFormatClass(KeyValueTextInputFormat.class);
The delimiter that separates the key from the value in each input line is configured with:
conf.set("key.value.separator.in.input.line", ", ");
Output: