- Back to Home »
- Showing high and low for a day across all stock symbols
Posted by : Sushanth
Thursday, 24 December 2015
Input:
NASDAQ,XYZ,2/8/10,8.99,8.99,8.40,8.66,78800,8.66
NASDAQ,XYZ,2/5/10,8.55,8.56,8.43,8.47,78800,8.47
NASDAQ,XYZ,2/4/10,8.46,8.49,8.22,8.35,78800,8.35
NASDAQ,XYZ,2/3/10,8.86,8.96,8.46,8.68,78800,8.68
NASDAQ,XYZ,2/2/10,8.21,8.55,8.19,8.39,78800,8.39
NASDAQ,XYZ,2/1/10,8.90,8.92,8.47,8.54,78800,8.54
NASDAQ,XYZ,1/29/10,8.22,8.23,8.10,8.15,78800,8.15
NYSE,XYZ,2/8/10,8.99,8.99,8.40,8.66,78800,8.66
NYSE,XYZ,2/5/10,8.55,8.56,8.43,8.47,78800,8.47
NYSE,XYZ,2/4/10,8.46,8.49,8.22,8.35,78800,8.35
NYSE,XYZ,2/3/10,8.86,8.96,8.46,8.68,78800,8.68
NYSE,XYZ,2/2/10,8.21,8.55,8.19,8.39,78800,8.39
NYSE,XYZ,2/1/10,8.90,8.92,8.47,8.54,78800,8.54
NYSE,XYZ,1/29/10,8.22,8.23,8.10,8.15,78800,8.15
AMEX,XYZ,2/8/10,8.99,8.99,8.40,8.66,78800,8.66
AMEX,XYZ,2/5/10,8.55,8.56,8.43,8.47,78800,8.47
AMEX,XYZ,2/4/10,8.46,8.49,8.22,8.35,78800,8.35
AMEX,XYZ,2/3/10,8.86,8.96,8.46,8.68,78800,8.68
AMEX,XYZ,2/2/10,8.21,8.55,8.19,8.39,78800,8.39
AMEX,XYZ,2/1/10,8.90,8.92,8.47,8.54,78800,8.54
Program:
Driver:
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* Hadoop MapReduce example showing high and low for a day across all stock symbols
*
*/
public class HighLowDayDriver extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
String input, output;
if (args.length == 2) {
input = args[0];
output = args[1];
} else {
System.err.println("Incorrect number of arguments. Expected: input output");
return -1;
}
Job job = new Job(getConf());
job.setJarByClass(HighLowDayDriver.class);
job.setJobName("High Low per Day");
FileInputFormat.setInputPaths(job, new Path(input));
FileOutputFormat.setOutputPath(job, new Path(output));
job.setMapperClass(HighLowDayMapper.class);
job.setReducerClass(HighLowDayReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(DoubleWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
boolean success = job.waitForCompletion(true);
return success ? 0 : 1;
}
public static void main(String[] args) throws Exception {
HighLowDayDriver driver = new HighLowDayDriver();
int exitCode = ToolRunner.run(driver, args);
System.exit(exitCode);
}
}
Mapper:
import java.io.IOException;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
* Hadoop MapReduce example showing high and low for a day across all stock symbols
*
*/
public class HighLowDayMapper extends Mapper<LongWritable, Text, Text, DoubleWritable> {
/**
* Expected input:<br>
*
* <pre>
* exchange,stock_symbol,date,stock_price_open,stock_price_high,stock_price_low,stock_price_close,stock_volume,stock_price_adj_close
* NASDAQ,XING,2010-02-08,1.73,1.76,1.71,1.73,147400,1.73
* </pre>
*/
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String inputLine = value.toString();
if (inputLine.startsWith("exchange,")) {
// Line is the header, ignore it
return;
}
String[] columns = inputLine.split(",");
if (columns.length != 9) {
// Line isn't the correct number of columns or formatted properly
return;
}
// You'd normally represent money as a BigDecimal, but we're using doubles
// to make things easier to understand
double close = Double.parseDouble(columns[6]);
context.write(new Text(columns[2]), new DoubleWritable(close));
}
}
Reducer:
import java.io.IOException;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
* Hadoop MapReduce example showing high and low for a day across all stock symbols
*
*/
public class HighLowDayReducer extends Reducer<Text, DoubleWritable, Text, Text> {
@Override
public void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException,
InterruptedException {
double high = 0;
double low = Double.MAX_VALUE;
// Go through all values to find the high and low
for (DoubleWritable value : values) {
if (value.get() > high) {
high = value.get();
}
if (value.get() < low) {
low = value.get();
}
}
Text value = new Text("High:" + high + " Low:" + low);
context.write(key, value);
}
}
Output :