Posted by: Sushanth
Thursday, 24 December 2015
Implementing a Custom Hadoop Writable Data Type
There are use cases where none of the built-in data types matches the requirements, or where a custom data type optimized for a particular use case may perform better than a Hadoop built-in data type. In such scenarios you can write a custom Writable data type by implementing the org.apache.hadoop.io.Writable interface, which lets you define the serialization format of your data type. Writable-based types can be used as value types in Hadoop MapReduce computations.
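For reference, the Writable contract consists of just two methods; implementing them defines your type's binary wire format:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// org.apache.hadoop.io.Writable, shown here for reference
public interface Writable {
    // Serialize this object's fields to the output stream.
    void write(DataOutput out) throws IOException;
    // Restore this object's fields from the input stream.
    void readFields(DataInput in) throws IOException;
}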
In this recipe, we implement a sample Hadoop Writable data type for HTTP server log entries. For the purpose of this sample, we assume that a log entry consists of five fields: request host, timestamp, request URL, response size, and HTTP status code. The following is a sample log entry:
192.168.0.2 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245
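Tokenized on spaces, the five fields of this entry are: request host 192.168.0.2, timestamp 01/Jul/1995:00:00:01 -0400, request URL GET /history/apollo/ HTTP/1.0, status code 200, and response size 6245.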
Program (CustomDataType.java):
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
public class CustomDataType {
    public static class LogProcessMap extends MapReduceBase implements
            Mapper<LongWritable, Text, Text, LogWritable> {
        @Override
        public void map(LongWritable key, Text value,
                OutputCollector<Text, LogWritable> output, Reporter reporter)
                throws IOException {
            // Minimal sample parse of a space-separated log line matching the
            // entry shown above; a production job would use a proper regex and
            // handle a "-" response size.
            String[] fields = value.toString().split(" ");
            if (fields.length < 10) {
                return; // skip malformed lines
            }
            Text userIP = new Text(fields[0]);
            Text timestamp = new Text(fields[3] + " " + fields[4]);
            Text request = new Text(fields[5] + " " + fields[6] + " " + fields[7]);
            IntWritable status = new IntWritable(Integer.parseInt(fields[8]));
            IntWritable responseSize = new IntWritable(Integer.parseInt(fields[9]));
            // Emit the request host as the key and the parsed entry as the value.
            output.collect(userIP, new LogWritable(userIP, timestamp, request,
                    responseSize, status));
        }
    }
    public static class LogProcessReduce extends MapReduceBase implements
            Reducer<Text, LogWritable, Text, IntWritable> {
        @Override
        public void reduce(Text key, Iterator<LogWritable> values,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            // Sample aggregation: total response bytes served per request host.
            int total = 0;
            while (values.hasNext()) {
                total += values.next().getResponseSize().get();
            }
            output.collect(key, new IntWritable(total));
        }
    }
    public static void main(String[] args) throws Exception {
        JobConf newconf = new JobConf(CustomDataType.class);
        newconf.setJobName("Custom Data Type");
        // Final (reducer) output types
        newconf.setOutputKeyClass(Text.class);
        newconf.setOutputValueClass(IntWritable.class);
        newconf.setMapperClass(LogProcessMap.class);
        newconf.setReducerClass(LogProcessReduce.class);
        // Intermediate (mapper) output types; the value is our custom Writable
        newconf.setMapOutputKeyClass(Text.class);
        newconf.setMapOutputValueClass(LogWritable.class);
        newconf.setInputFormat(TextInputFormat.class);
        newconf.setOutputFormat(TextOutputFormat.class);
        FileInputFormat.setInputPaths(newconf, new Path(args[0]));
        FileOutputFormat.setOutputPath(newconf, new Path(args[1]));
        JobClient.runJob(newconf);
    }
}
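Compile the two classes, package them into a jar, and run the job with the input and output paths as arguments (the jar name here is just a placeholder for whatever you name yours):

hadoop jar custom-datatype.jar CustomDataType /input/logs /output/logs

The LogWritable class itself goes in a separate source file, LogWritable.java: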
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
public class LogWritable implements Writable {

    private Text userIP, timestamp, request;
    private IntWritable responseSize, status;

    public LogWritable() {
        // Hadoop needs a no-argument constructor so it can instantiate
        // the type reflectively before calling readFields().
        this.userIP = new Text();
        this.timestamp = new Text();
        this.request = new Text();
        this.responseSize = new IntWritable();
        this.status = new IntWritable();
    }
    public LogWritable(Text userIP, Text timestamp, Text request,
            IntWritable responseSize, IntWritable status) {
        this.userIP = userIP;
        this.timestamp = timestamp;
        this.request = request;
        this.responseSize = responseSize;
        this.status = status;
    }

    // Getters used by the reducer and by client code.
    public Text getUserIP() {
        return userIP;
    }

    public IntWritable getResponseSize() {
        return responseSize;
    }
    @Override
    public void readFields(DataInput in) throws IOException {
        // Read the fields back in exactly the order they were written.
        userIP.readFields(in);
        timestamp.readFields(in);
        request.readFields(in);
        responseSize.readFields(in);
        status.readFields(in);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        // The write order defines the serialization format.
        userIP.write(out);
        timestamp.write(out);
        request.write(out);
        responseSize.write(out);
        status.write(out);
    }
}
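To sanity-check the serialization format outside of a full MapReduce job, you can round-trip a LogWritable through a byte array. The following test class is our own sketch, not part of the recipe; the class name LogWritableTest is made up:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class LogWritableTest {
    public static void main(String[] args) throws Exception {
        LogWritable original = new LogWritable(
                new Text("192.168.0.2"),
                new Text("[01/Jul/1995:00:00:01 -0400]"),
                new Text("\"GET /history/apollo/ HTTP/1.0\""),
                new IntWritable(6245),
                new IntWritable(200));

        // Serialize to a byte array, just as Hadoop does between map and reduce.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // Deserialize into a fresh instance and confirm the fields survive.
        LogWritable copy = new LogWritable();
        copy.readFields(new DataInputStream(
                new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(copy.getUserIP() + " served "
                + copy.getResponseSize() + " bytes");
    }
}

Note that LogWritable is used only as a value type here, so implementing Writable is sufficient. To use it as a key, the type would also have to implement org.apache.hadoop.io.WritableComparable so that Hadoop can sort and group the keys during the shuffle.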