Posted by : Sushanth Thursday 24 December 2015



 Implementing a Custom Hadoop Writable Data Type

There can be use cases where none of the built-in data types matches the requirements, or where a custom data type optimized for a particular use case performs better than a Hadoop built-in data type. In such scenarios, a custom Writable data type can be written by implementing
the org.apache.hadoop.io.Writable interface to define the serialization format of
your data type.

The Writable interface-based types can be used as value types in Hadoop MapReduce computations.

In this recipe, we implement a sample Hadoop Writable data type for HTTP server log
entries. For the purpose of this sample, we consider that a log entry consists of five
fields: request host, timestamp, request URL, response size, and the HTTP status code.

The following is a sample log entry:
192.168.0.2 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/
HTTP/1.0" 200 6245


Program:

import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class CustomDataType {

       public static class LogProcessMap extends MapReduceBase implements
                     Mapper<LongWritable, Text, Text, LogWritable> {

              @Override
              public void map(LongWritable key, Text value,
                           OutputCollector<Text, LogWritable> output, Reporter reporter)
                           throws IOException {

              }
       }

       public static class LogProcessReduce extends MapReduceBase implements
                     Reducer<Text, LogWritable, Text, IntWritable> {

              @Override
              public void reduce(Text key, Iterator<LogWritable> values,
                           OutputCollector<Text, IntWritable> output, Reporter reporter)
                           throws IOException {

              }
       }

       public static void main(String[] args) throws Exception {
              JobConf newconf = new JobConf(CustomDataType.class);
              newconf.setJobName("Custom Data Type");

              newconf.setOutputKeyClass(Text.class);
              newconf.setOutputValueClass(IntWritable.class);

              newconf.setMapperClass(LogProcessMap.class);
              newconf.setReducerClass(LogProcessReduce.class);
             
              newconf.setMapOutputKeyClass(Text.class);
              newconf.setMapOutputValueClass(LogWritable.class);

              newconf.setInputFormat(TextInputFormat.class);
              newconf.setOutputFormat(TextOutputFormat.class);

              FileInputFormat.setInputPaths(newconf, new Path(args[0]));
              FileOutputFormat.setOutputPath(newconf, new Path(args[1]));

              JobClient.runJob(newconf);
       }

}

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;


public class LogWritable implements Writable{
      
       private Text userIPtimestamprequest;
       private IntWritable responseSizestatus;
      
       public LogWritable()
       {
              this.userIP = new org.apache.hadoop.io.Text();
              this.userIP = new Text();
              this.timestampnew Text();
              this.request = new Text();
              this.responseSize = new IntWritable();
              this.status = new IntWritable();
       }
      
       public LogWritable(Text userIP,Text timestamp,Text request,IntWritable responseSize,IntWritable status)
       {
              this.userIP = userIP;
              this.timestamp = timestamp;
              this.request = request;
              this.responseSize = responseSize;
              this.status = status;
       }

       @Override
       public void readFields(DataInput in) throws IOException {
              userIP.readFields(in);
              timestamp.readFields(in);
              request.readFields(in);
              responseSize.readFields(in);
              status.readFields(in);
       }

       @Override
       public void write(DataOutput out) throws IOException {
              userIP.write(out);
              timestamp.write(out);
              request.write(out);
              responseSize.write(out);
              status.write(out);
       }

}

Leave a Reply

Subscribe to Posts | Subscribe to Comments

- Copyright © Technical Articles - Skyblue - Powered by Blogger - Designed by Johanes Djogan -