Posted by : Sushanth Thursday, 24 December 2015


Input:

Program:

Driver:
package dictionary;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DictionaryDriver extends Configured implements Tool {
       public int run(String[] args) throws Exception {
              // getting configuration object and setting job name
              Configuration conf = getConf();
             
              //set the delimiter to seperate input and output
              //conf.set(
                           //"mapreduce.input.keyvaluelinerecordreader.key.value.separator",
                           //",");
              conf.set("key.value.separator.in.input.line"", ");
              Job job = new Job(conf, "Word Count hadoop-0.20");

              // setting the class names
              job.setJarByClass(DictionaryDriver.class);
              job.setMapperClass(DictionaryMapper.class);
              job.setReducerClass(DictionaryReducer.class);

              // setting the output data type classes
              job.setOutputKeyClass(Text.class);
              job.setOutputValueClass(Text.class);
              job.setInputFormatClass(KeyValueTextInputFormat.class);
              // to accept the hdfs input and outpur dir at run time
              FileInputFormat.addInputPath(job, new Path(args[0]));
              FileOutputFormat.setOutputPath(job, new Path(args[1]));

              return job.waitForCompletion(true) ? 0 : 1;
       }

       public static void main(String[] args) throws Exception {
              int res = ToolRunner.run(new Configuration(), new DictionaryDriver(),
                           args);
              System.exit(res);
       }
}

Mapper:
package dictionary;

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DictionaryMapper extends Mapper<Text, Text, Text, Text> {
       private Text word = new Text();

       public void map(Text key, Text value, Context context) throws IOException,
                     InterruptedException {
              StringTokenizer itr = new StringTokenizer(value.toString(), ",");
              while (itr.hasMoreTokens()) {
                     word.set(itr.nextToken());
                     context.write(key, word);
              }
       }
}

Reducer:

package com.dictionary;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public  class DictionaryReducer
extends Reducer<Text,Text,Text,Text>
{
    private Text result = new Text();
    public void reduce(Text key, Iterable<Text> values,
    Context context
    throws IOException, InterruptedException
    {
        String translations = "";
        for (Text val : values)
        {
            translations += "|"+val.toString();
        }
        result.set(translations);
        context.write(key, result);
    }
}


In this example,keyvalueinputformat is set using below code
job.setInputFormatClass(KeyValueTextInputFormat.class);

The delimiter to separate the input data is
conf.set("key.value.separator.in.input.line"", ");


Output:

Leave a Reply

Subscribe to Posts | Subscribe to Comments

- Copyright © Technical Articles - Skyblue - Powered by Blogger - Designed by Johanes Djogan -