Thursday, 24 December 2015

Custom Hadoop Writable Data Type

 Implementing a Custom Hadoop Writable Data Type

There can be use cases where none of the built-in data types matches the requirements, or where a custom data type optimized for a particular use case performs better than a Hadoop built-in data type. In such scenarios you can write a custom Writable data type by implementing
the org.apache.hadoop.io.Writable interface to define the serialization format of
your data type.

The Writable interface-based types can be used as value types in Hadoop MapReduce computations. (Key types must additionally implement WritableComparable, since keys are sorted and compared during the shuffle.)

In this recipe, we implement a sample Hadoop Writable data type for HTTP server log
entries. For the purpose of this sample, we consider that a log entry consists of five
fields: request host, timestamp, request URL, response size, and the HTTP status code.

The following is a sample log entry (from the NASA-HTTP web server log dataset):

199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245


import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class CustomDataType {

       public static class LogProcessMap extends MapReduceBase implements
                     Mapper<LongWritable, Text, Text, LogWritable> {

              public void map(LongWritable key, Text value,
                           OutputCollector<Text, LogWritable> output, Reporter reporter)
                           throws IOException {


       public static class LogProcessReduce extends MapReduceBase implements
                     Reducer<Text, LogWritable, Text, IntWritable> {

              public void reduce(Text key, Iterator<LogWritable> values,
                           OutputCollector<Text, IntWritable> output, Reporter reporter)
                           throws IOException {


       public static void main(String[] args) throws Exception {
              JobConf newconf = new JobConf(CustomDataType.class);
              newconf.setJobName("Custom Data Type");




              FileInputFormat.setInputPaths(newconf, new Path(args[0]));
              FileOutputFormat.setOutputPath(newconf, new Path(args[1]));



import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;

public class LogWritable implements Writable{
       private Text userIPtimestamprequest;
       private IntWritable responseSizestatus;
       public LogWritable()
              this.userIP = new org.apache.hadoop.io.Text();
              this.userIP = new Text();
              this.timestampnew Text();
              this.request = new Text();
              this.responseSize = new IntWritable();
              this.status = new IntWritable();
       public LogWritable(Text userIP,Text timestamp,Text request,IntWritable responseSize,IntWritable status)
              this.userIP = userIP;
              this.timestamp = timestamp;
              this.request = request;
              this.responseSize = responseSize;
              this.status = status;

       public void readFields(DataInput in) throws IOException {

       public void write(DataOutput out) throws IOException {



Post a comment