| /* |
| * Copyright (c) 2013 DataTorrent, Inc. ALL Rights Reserved. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package com.datatorrent.demos.mroperator; |
| |
| import org.apache.hadoop.conf.Configured; |
| import org.apache.hadoop.util.Tool; |
| import org.apache.hadoop.util.ToolRunner; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.io.Text; |
| import org.apache.hadoop.io.IntWritable; |
| import org.apache.hadoop.io.LongWritable; |
| import org.apache.hadoop.mapred.Mapper; |
| import org.apache.hadoop.mapred.Reducer; |
| import org.apache.hadoop.mapred.JobConf; |
| import org.apache.hadoop.mapred.MapReduceBase; |
| import org.apache.hadoop.mapred.OutputCollector; |
| import org.apache.hadoop.mapred.Reporter; |
| import org.apache.hadoop.mapred.TextOutputFormat; |
| import org.apache.hadoop.mapred.JobClient; |
| import org.apache.hadoop.mapred.FileInputFormat; |
| import org.apache.hadoop.mapred.FileOutputFormat; |
| |
| import java.io.IOException; |
| import java.util.Calendar; |
| import java.util.Iterator; |
| |
| /** |
| * <p>LogCountsPerHour class.</p> |
| * |
| * @since 0.9.0 |
| */ |
| public class LogCountsPerHour extends Configured implements Tool { |
| |
| public static class LogMapClass extends MapReduceBase |
| implements Mapper<LongWritable, Text, DateWritable, IntWritable> |
| { |
| private DateWritable date = new DateWritable(); |
| private final static IntWritable one = new IntWritable( 1 ); |
| |
| public void map( LongWritable key, // Offset into the file |
| Text value, |
| OutputCollector<DateWritable, IntWritable> output, |
| Reporter reporter) throws IOException |
| { |
| // Get the value as a String; it is of the format: |
| // 111.111.111.111 - - [16/Dec/2012:05:32:50 -0500] "GET / HTTP/1.1" 200 14791 "-" "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)" |
| String text = value.toString(); |
| |
| // Get the date and time |
| int openBracket = text.indexOf( '[' ); |
| int closeBracket = text.indexOf( ']' ); |
| if( openBracket != -1 && closeBracket != -1 ) |
| { |
| // Read the date |
| String dateString = text.substring( text.indexOf( '[' ) + 1, text.indexOf( ']' ) ); |
| |
| // Build a date object from a string of the form: 16/Dec/2012:05:32:50 -0500 |
| int index = 0; |
| int nextIndex = dateString.indexOf( '/' ); |
| int day = Integer.parseInt( dateString.substring(index, nextIndex) ); |
| |
| index = nextIndex; |
| nextIndex = dateString.indexOf( '/', index+1 ); |
| String month = dateString.substring( index+1, nextIndex ); |
| |
| index = nextIndex; |
| nextIndex = dateString.indexOf( ':', index ); |
| int year = Integer.parseInt(dateString.substring(index + 1, nextIndex)); |
| |
| index = nextIndex; |
| nextIndex = dateString.indexOf( ':', index+1 ); |
| int hour = Integer.parseInt(dateString.substring(index + 1, nextIndex)); |
| |
| // Build a calendar object for this date |
| Calendar calendar = Calendar.getInstance(); |
| calendar.set( Calendar.DATE, day ); |
| calendar.set( Calendar.YEAR, year ); |
| calendar.set( Calendar.HOUR, hour ); |
| calendar.set( Calendar.MINUTE, 0 ); |
| calendar.set( Calendar.SECOND, 0 ); |
| calendar.set( Calendar.MILLISECOND, 0 ); |
| |
| if( month.equalsIgnoreCase( "dec" ) ) |
| { |
| calendar.set( Calendar.MONTH, Calendar.DECEMBER ); |
| } |
| else if( month.equalsIgnoreCase( "nov" ) ) |
| { |
| calendar.set( Calendar.MONTH, Calendar.NOVEMBER ); |
| } |
| else if( month.equalsIgnoreCase( "oct" ) ) |
| { |
| calendar.set( Calendar.MONTH, Calendar.OCTOBER ); |
| } |
| else if( month.equalsIgnoreCase( "sep" ) ) |
| { |
| calendar.set( Calendar.MONTH, Calendar.SEPTEMBER ); |
| } |
| else if( month.equalsIgnoreCase( "aug" ) ) |
| { |
| calendar.set( Calendar.MONTH, Calendar.AUGUST ); |
| } |
| else if( month.equalsIgnoreCase( "jul" ) ) |
| { |
| calendar.set( Calendar.MONTH, Calendar.JULY ); |
| } |
| else if( month.equalsIgnoreCase( "jun" ) ) |
| { |
| calendar.set( Calendar.MONTH, Calendar.JUNE ); |
| } |
| else if( month.equalsIgnoreCase( "may" ) ) |
| { |
| calendar.set( Calendar.MONTH, Calendar.MAY ); |
| } |
| else if( month.equalsIgnoreCase( "apr" ) ) |
| { |
| calendar.set( Calendar.MONTH, Calendar.APRIL ); |
| } |
| else if( month.equalsIgnoreCase( "mar" ) ) |
| { |
| calendar.set( Calendar.MONTH, Calendar.MARCH ); |
| } |
| else if( month.equalsIgnoreCase( "feb" ) ) |
| { |
| calendar.set( Calendar.MONTH, Calendar.FEBRUARY ); |
| } |
| else if( month.equalsIgnoreCase( "jan" ) ) |
| { |
| calendar.set( Calendar.MONTH, Calendar.JANUARY ); |
| } |
| |
| |
| // Output the date as the key and 1 as the value |
| date.setDate( calendar.getTime() ); |
| output.collect(date, one); |
| } |
| } |
| } |
| |
| public static class LogReduce extends MapReduceBase |
| implements Reducer<DateWritable, IntWritable, DateWritable, IntWritable> |
| { |
| public void reduce( DateWritable key, Iterator<IntWritable> values, |
| OutputCollector<DateWritable, IntWritable> output, |
| Reporter reporter) throws IOException |
| { |
| // Iterate over all of the values (counts of occurrences of this word) |
| int count = 0; |
| while( values.hasNext() ) |
| { |
| // Add the value to our count |
| count += values.next().get(); |
| } |
| |
| // Output the word with its count (wrapped in an IntWritable) |
| output.collect( key, new IntWritable( count ) ); |
| } |
| } |
| |
| |
| public int run(String[] args) throws Exception |
| { |
| // Create a configuration |
| Configuration conf = getConf(); |
| |
| // Create a job from the default configuration that will use the WordCount class |
| JobConf job = new JobConf( conf, LogCountsPerHour.class ); |
| |
| // Define our input path as the first command line argument and our output path as the second |
| Path in = new Path( args[0] ); |
| Path out = new Path( args[1] ); |
| |
| // Create File Input/Output formats for these paths (in the job) |
| FileInputFormat.setInputPaths( job, in ); |
| FileOutputFormat.setOutputPath( job, out ); |
| |
| // Configure the job: name, mapper, reducer, and combiner |
| job.setJobName( "LogAveragePerHour" ); |
| job.setMapperClass( LogMapClass.class ); |
| job.setReducerClass( LogReduce.class ); |
| job.setCombinerClass( LogReduce.class ); |
| |
| // Configure the output |
| job.setOutputFormat( TextOutputFormat.class ); |
| job.setOutputKeyClass( DateWritable.class ); |
| job.setOutputValueClass( IntWritable.class ); |
| |
| // Run the job |
| JobClient.runJob(job); |
| return 0; |
| } |
| |
| public static void main(String[] args) throws Exception |
| { |
| // Start the LogCountsPerHour MapReduce application |
| int res = ToolRunner.run( new Configuration(), |
| new LogCountsPerHour(), |
| args ); |
| System.exit( res ); |
| } |
| } |