/*
 * Copyright (c) 2013 DataTorrent, Inc. ALL Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.datatorrent.demos.mroperator;
import java.io.IOException;
import java.util.Calendar;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
* <p>LogCountsPerHour class.</p>
* @since 0.9.0
public class LogCountsPerHour extends Configured implements Tool {
public static class LogMapClass extends MapReduceBase
implements Mapper<LongWritable, Text, DateWritable, IntWritable>
private DateWritable date = new DateWritable();
private final static IntWritable one = new IntWritable( 1 );
public void map( LongWritable key, // Offset into the file
Text value,
OutputCollector<DateWritable, IntWritable> output,
Reporter reporter) throws IOException
// Get the value as a String; it is of the format:
// - - [16/Dec/2012:05:32:50 -0500] "GET / HTTP/1.1" 200 14791 "-" "Mozilla/5.0 (compatible; Baiduspider/2.0; +"
String text = value.toString();
// Get the date and time
int openBracket = text.indexOf( '[' );
int closeBracket = text.indexOf( ']' );
if( openBracket != -1 && closeBracket != -1 )
// Read the date
String dateString = text.substring( text.indexOf( '[' ) + 1, text.indexOf( ']' ) );
// Build a date object from a string of the form: 16/Dec/2012:05:32:50 -0500
int index = 0;
int nextIndex = dateString.indexOf( '/' );
int day = Integer.parseInt( dateString.substring(index, nextIndex) );
index = nextIndex;
nextIndex = dateString.indexOf( '/', index+1 );
String month = dateString.substring( index+1, nextIndex );
index = nextIndex;
nextIndex = dateString.indexOf( ':', index );
int year = Integer.parseInt(dateString.substring(index + 1, nextIndex));
index = nextIndex;
nextIndex = dateString.indexOf( ':', index+1 );
int hour = Integer.parseInt(dateString.substring(index + 1, nextIndex));
// Build a calendar object for this date
Calendar calendar = Calendar.getInstance();
calendar.set( Calendar.DATE, day );
calendar.set( Calendar.YEAR, year );
calendar.set( Calendar.HOUR, hour );
calendar.set( Calendar.MINUTE, 0 );
calendar.set( Calendar.SECOND, 0 );
calendar.set( Calendar.MILLISECOND, 0 );
if( month.equalsIgnoreCase( "dec" ) )
calendar.set( Calendar.MONTH, Calendar.DECEMBER );
else if( month.equalsIgnoreCase( "nov" ) )
calendar.set( Calendar.MONTH, Calendar.NOVEMBER );
else if( month.equalsIgnoreCase( "oct" ) )
calendar.set( Calendar.MONTH, Calendar.OCTOBER );
else if( month.equalsIgnoreCase( "sep" ) )
calendar.set( Calendar.MONTH, Calendar.SEPTEMBER );
else if( month.equalsIgnoreCase( "aug" ) )
calendar.set( Calendar.MONTH, Calendar.AUGUST );
else if( month.equalsIgnoreCase( "jul" ) )
calendar.set( Calendar.MONTH, Calendar.JULY );
else if( month.equalsIgnoreCase( "jun" ) )
calendar.set( Calendar.MONTH, Calendar.JUNE );
else if( month.equalsIgnoreCase( "may" ) )
calendar.set( Calendar.MONTH, Calendar.MAY );
else if( month.equalsIgnoreCase( "apr" ) )
calendar.set( Calendar.MONTH, Calendar.APRIL );
else if( month.equalsIgnoreCase( "mar" ) )
calendar.set( Calendar.MONTH, Calendar.MARCH );
else if( month.equalsIgnoreCase( "feb" ) )
calendar.set( Calendar.MONTH, Calendar.FEBRUARY );
else if( month.equalsIgnoreCase( "jan" ) )
calendar.set( Calendar.MONTH, Calendar.JANUARY );
// Output the date as the key and 1 as the value
date.setDate( calendar.getTime() );
output.collect(date, one);
public static class LogReduce extends MapReduceBase
implements Reducer<DateWritable, IntWritable, DateWritable, IntWritable>
public void reduce( DateWritable key, Iterator<IntWritable> values,
OutputCollector<DateWritable, IntWritable> output,
Reporter reporter) throws IOException
// Iterate over all of the values (counts of occurrences of this word)
int count = 0;
while( values.hasNext() )
// Add the value to our count
count +=;
// Output the word with its count (wrapped in an IntWritable)
output.collect( key, new IntWritable( count ) );
public int run(String[] args) throws Exception
// Create a configuration
Configuration conf = getConf();
// Create a job from the default configuration that will use the WordCount class
JobConf job = new JobConf( conf, LogCountsPerHour.class );
// Define our input path as the first command line argument and our output path as the second
Path in = new Path( args[0] );
Path out = new Path( args[1] );
// Create File Input/Output formats for these paths (in the job)
FileInputFormat.setInputPaths( job, in );
FileOutputFormat.setOutputPath( job, out );
// Configure the job: name, mapper, reducer, and combiner
job.setJobName( "LogAveragePerHour" );
job.setMapperClass( LogMapClass.class );
job.setReducerClass( LogReduce.class );
job.setCombinerClass( LogReduce.class );
// Configure the output
job.setOutputFormat( TextOutputFormat.class );
job.setOutputKeyClass( DateWritable.class );
job.setOutputValueClass( IntWritable.class );
// Run the job
return 0;
public static void main(String[] args) throws Exception
// Start the LogCountsPerHour MapReduce application
int res = new Configuration(),
new LogCountsPerHour(),
args );
System.exit( res );