Posted by : Sushanth Thursday 24 December 2015


Input Data:
"0195153448";"Classical Mythology";"Mark P. O. Morford";"2002";"Oxford University Press";"http://images.amazon.com/images/P/0195153448.01.THUMBZZZ.jpg";"http://images.amazon.com/images/P/0195153448.01.MZZZZZZZ.jpg";"http://images.amazon.com/images/P/0195153448.01.LZZZZZZZ.jpg"
"0002005018";"Clara Callan";"Richard Bruce Wright";"2001";"HarperFlamingo Canada";"http://images.amazon.com/images/P/0002005018.01.THUMBZZZ.jpg";"http://images.amazon.com/images/P/0002005018.01.MZZZZZZZ.jpg";http://images.amazon.com/images/P/0002005018.01.LZZZZZZZ.jpg

Program:

package Books;

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
//import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class BooksFrequency
{
       public static class BooksXMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable>
       {
              @Override
              public void map(LongWritable key, Text value,
                           OutputCollector<Text, IntWritable> output, Reporter reporter)
                           throws IOException {
                     String temp = value.toString();
                     String[] SingleBookdata = temp.split("\";\"");
                     output.collect(new Text(SingleBookdata[3]),new IntWritable(1));
              }
       }
      
       public static class BooksXReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable>
       {
              @Override
              public void reduce(Text key, Iterator<IntWritable> values,
                           OutputCollector<Text, IntWritable> output, Reporter reporter)
                           throws IOException {
                     int bookfreq = 0;
                    
                     while(values.hasNext())
                     {
                           IntWritable value = (IntWritable)values.next();
                           bookfreq = bookfreq + value.get();
                     }
                     output.collect(key, new IntWritable(bookfreq));
              }
       }

       public static void main(String[] args) throws Exception
       {
              JobConf newconf = new JobConf();
              newconf.setJarByClass(BooksFrequency.class);
              newconf.setJobName("bookfrequency");
             
              newconf.setOutputKeyClass(Text.class);
              newconf.setOutputValueClass(IntWritable.class);
             
              newconf.setMapperClass(BooksXMapper.class);
              newconf.setReducerClass(BooksXReducer.class);
              //newconf.setCombinerClass(BooksXReducer.class);
             
              newconf.setInputFormat(TextInputFormat.class);
              newconf.setOutputFormat(TextOutputFormat.class);
             
              FileInputFormat.setInputPaths(newconf, new Path(args[0]));
              FileOutputFormat.setOutputPath(newconf, new Path(args[1]));

              JobClient.runJob(newconf);
       }
}

Output:

2002 – 1
2001 – 1


Leave a Reply

Subscribe to Posts | Subscribe to Comments

- Copyright © Technical Articles - Skyblue - Powered by Blogger - Designed by Johanes Djogan -