Posted by : Sushanth Thursday, 24 December 2015


Input:
NASDAQ,XYZ,2/8/10,8.99,8.99,8.40,8.66,78800,8.66
NASDAQ,XYZ,2/5/10,8.55,8.56,8.43,8.47,78800,8.47
NASDAQ,XYZ,2/4/10,8.46,8.49,8.22,8.35,78800,8.35
NASDAQ,XYZ,2/3/10,8.86,8.96,8.46,8.68,78800,8.68
NASDAQ,XYZ,2/2/10,8.21,8.55,8.19,8.39,78800,8.39
NASDAQ,XYZ,2/1/10,8.90,8.92,8.47,8.54,78800,8.54
NASDAQ,XYZ,1/29/10,8.22,8.23,8.10,8.15,78800,8.15
NYSE,XYZ,2/8/10,8.99,8.99,8.40,8.66,78800,8.66
NYSE,XYZ,2/5/10,8.55,8.56,8.43,8.47,78800,8.47
NYSE,XYZ,2/4/10,8.46,8.49,8.22,8.35,78800,8.35
NYSE,XYZ,2/3/10,8.86,8.96,8.46,8.68,78800,8.68
NYSE,XYZ,2/2/10,8.21,8.55,8.19,8.39,78800,8.39
NYSE,XYZ,2/1/10,8.90,8.92,8.47,8.54,78800,8.54
NYSE,XYZ,1/29/10,8.22,8.23,8.10,8.15,78800,8.15
AMEX,XYZ,2/8/10,8.99,8.99,8.40,8.66,78800,8.66
AMEX,XYZ,2/5/10,8.55,8.56,8.43,8.47,78800,8.47
AMEX,XYZ,2/4/10,8.46,8.49,8.22,8.35,78800,8.35
AMEX,XYZ,2/3/10,8.86,8.96,8.46,8.68,78800,8.68
AMEX,XYZ,2/2/10,8.21,8.55,8.19,8.39,78800,8.39
AMEX,XYZ,2/1/10,8.90,8.92,8.47,8.54,78800,8.54
AMEX,XYZ,1/29/10,8.22,8.23,8.10,8.15,78800,8.15

Program:

Driver:
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 Hadoop MapReduce example showing high and low for a day across all stock symbols
 *
 */
public class HighLowDayDriver extends Configured implements Tool {

       @Override
       public int run(String[] args) throws Exception {
              String input, output;
              if (args.length == 2) {
                     input = args[0];
                     output = args[1];
              else {
                     System.err.println("Incorrect number of arguments.  Expected: input output");
                     return -1;
              }

              Job job = new Job(getConf());
              job.setJarByClass(HighLowDayDriver.class);
              job.setJobName("High Low per Day");

              FileInputFormat.setInputPaths(job, new Path(input));
              FileOutputFormat.setOutputPath(job, new Path(output));

              job.setMapperClass(HighLowDayMapper.class);
              job.setReducerClass(HighLowDayReducer.class);

              job.setMapOutputKeyClass(Text.class);
              job.setMapOutputValueClass(DoubleWritable.class);

              job.setOutputKeyClass(Text.class);
              job.setOutputValueClass(Text.class);

              boolean success = job.waitForCompletion(true);
              return success ? 0 : 1;
       }

       public static void main(String[] args) throws Exception {
              HighLowDayDriver driver = new HighLowDayDriver();
              int exitCode = ToolRunner.run(driver, args);
              System.exit(exitCode);
       }
}

Mapper:

import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Hadoop MapReduce example showing high and low for a day across all stock symbols
 *
 */
public class HighLowDayMapper extends Mapper<LongWritable, Text, Text, DoubleWritable> {
                /**
                 * Expected input:<br>
                 *
                 * <pre>
                 * exchange,stock_symbol,date,stock_price_open,stock_price_high,stock_price_low,stock_price_close,stock_volume,stock_price_adj_close
                 * NASDAQ,XING,2010-02-08,1.73,1.76,1.71,1.73,147400,1.73
                 * </pre>
                 */
                @Override
                public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                                String inputLine = value.toString();
                               
                                if (inputLine.startsWith("exchange,")) {
                                                // Line is the header, ignore it
                                                return;
                                }
                               
                                String[] columns = inputLine.split(",");
                               
                                if (columns.length != 9) {
                                                // Line isn't the correct number of columns or formatted properly
                                                return;
                                }

                                // You'd normally represent money as a BigDecimal, but we're using doubles
                                // to make things easier to understand
                                double close = Double.parseDouble(columns[6]);
                                context.write(new Text(columns[2]), new DoubleWritable(close));
                }
}

Reducer:

import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Hadoop MapReduce example showing high and low for a day across all stock symbols
 *
 */
public class HighLowDayReducer extends Reducer<Text, DoubleWritable, Text, Text> {

                @Override
                public void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException,
                                                InterruptedException {
                                double high = 0;
                                double low = Double.MAX_VALUE;

                                // Go through all values to find the high and low
                                for (DoubleWritable value : values) {
                                                if (value.get() > high) {
                                                                high = value.get();
                                                }
                                               
                                                if (value.get() < low) {
                                                                low = value.get();
                                                }
                                }

                                Text value = new Text("High:" + high + " Low:" + low);
                               
                                context.write(key, value);
                }
}

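Output (expected for the sample input above; key and value are tab-separated. The high and low are taken over the closing prices, and because all three exchanges carry identical sample data, they coincide for every date):

1/29/10	High:8.15 Low:8.15
2/1/10	High:8.54 Low:8.54
2/2/10	High:8.39 Low:8.39
2/3/10	High:8.68 Low:8.68
2/4/10	High:8.35 Low:8.35
2/5/10	High:8.47 Low:8.47
2/8/10	High:8.66 Low:8.66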

Leave a Reply

Subscribe to Posts | Subscribe to Comments

- Copyright © Technical Articles - Skyblue - Powered by Blogger - Designed by Johanes Djogan -