blob: 62558106d4f9f7bd188e012493ae2db1f3960c24 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.examples.mroperator;
import java.io.IOException;
import java.util.Calendar;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* <p>MapReduce job that counts web-server access-log entries per hour of their timestamp.</p>
*
* @since 0.9.0
*/
public class LogCountsPerHour extends Configured implements Tool
{
public static class LogMapClass extends MapReduceBase
implements Mapper<LongWritable, Text, DateWritable, IntWritable>
{
private DateWritable date = new DateWritable();
private static final IntWritable one = new IntWritable(1);
public void map(LongWritable key, Text value, OutputCollector<DateWritable, IntWritable> output, Reporter reporter) throws IOException
{
// Get the value as a String; it is of the format:
// 111.111.111.111 - - [16/Dec/2012:05:32:50 -0500] "GET / HTTP/1.1" 200 14791 "-" "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"
String text = value.toString();
// Get the date and time
int openBracket = text.indexOf('[');
int closeBracket = text.indexOf(']');
if (openBracket != -1 && closeBracket != -1) {
// Read the date
String dateString = text.substring(text.indexOf('[') + 1, text.indexOf(']'));
// Build a date object from a string of the form: 16/Dec/2012:05:32:50 -0500
int index = 0;
int nextIndex = dateString.indexOf('/');
int day = Integer.parseInt(dateString.substring(index, nextIndex));
index = nextIndex;
nextIndex = dateString.indexOf('/', index + 1);
String month = dateString.substring(index + 1, nextIndex);
index = nextIndex;
nextIndex = dateString.indexOf(':', index);
int year = Integer.parseInt(dateString.substring(index + 1, nextIndex));
index = nextIndex;
nextIndex = dateString.indexOf(':', index + 1);
int hour = Integer.parseInt(dateString.substring(index + 1, nextIndex));
// Build a calendar object for this date
Calendar calendar = Calendar.getInstance();
calendar.set(Calendar.DATE, day);
calendar.set(Calendar.YEAR, year);
calendar.set(Calendar.HOUR, hour);
calendar.set(Calendar.MINUTE, 0);
calendar.set(Calendar.SECOND, 0);
calendar.set(Calendar.MILLISECOND, 0);
if (month.equalsIgnoreCase("dec")) {
calendar.set(Calendar.MONTH, Calendar.DECEMBER);
} else if (month.equalsIgnoreCase("nov")) {
calendar.set(Calendar.MONTH, Calendar.NOVEMBER);
} else if (month.equalsIgnoreCase("oct")) {
calendar.set(Calendar.MONTH, Calendar.OCTOBER);
} else if (month.equalsIgnoreCase("sep")) {
calendar.set(Calendar.MONTH, Calendar.SEPTEMBER);
} else if (month.equalsIgnoreCase("aug")) {
calendar.set(Calendar.MONTH, Calendar.AUGUST);
} else if (month.equalsIgnoreCase("jul")) {
calendar.set(Calendar.MONTH, Calendar.JULY);
} else if (month.equalsIgnoreCase("jun")) {
calendar.set(Calendar.MONTH, Calendar.JUNE);
} else if (month.equalsIgnoreCase("may")) {
calendar.set(Calendar.MONTH, Calendar.MAY);
} else if (month.equalsIgnoreCase("apr")) {
calendar.set(Calendar.MONTH, Calendar.APRIL);
} else if (month.equalsIgnoreCase("mar")) {
calendar.set(Calendar.MONTH, Calendar.MARCH);
} else if (month.equalsIgnoreCase("feb")) {
calendar.set(Calendar.MONTH, Calendar.FEBRUARY);
} else if (month.equalsIgnoreCase("jan")) {
calendar.set(Calendar.MONTH, Calendar.JANUARY);
}
// Output the date as the key and 1 as the value
date.setDate(calendar.getTime());
output.collect(date, one);
}
}
}
public static class LogReduce extends MapReduceBase
implements Reducer<DateWritable, IntWritable, DateWritable, IntWritable>
{
public void reduce(DateWritable key, Iterator<IntWritable> values, OutputCollector<DateWritable, IntWritable> output, Reporter reporter) throws IOException
{
// Iterate over all of the values (counts of occurrences of this word)
int count = 0;
while (values.hasNext()) {
// Add the value to our count
count += values.next().get();
}
// Output the word with its count (wrapped in an IntWritable)
output.collect(key, new IntWritable(count));
}
}
public int run(String[] args) throws Exception
{
// Create a configuration
Configuration conf = getConf();
// Create a job from the default configuration that will use the WordCount class
JobConf job = new JobConf(conf, LogCountsPerHour.class);
// Define our input path as the first command line argument and our output path as the second
Path in = new Path(args[0]);
Path out = new Path(args[1]);
// Create File Input/Output formats for these paths (in the job)
FileInputFormat.setInputPaths(job, in);
FileOutputFormat.setOutputPath(job, out);
// Configure the job: name, mapper, reducer, and combiner
job.setJobName("LogAveragePerHour");
job.setMapperClass(LogMapClass.class);
job.setReducerClass(LogReduce.class);
job.setCombinerClass(LogReduce.class);
// Configure the output
job.setOutputFormat(TextOutputFormat.class);
job.setOutputKeyClass(DateWritable.class);
job.setOutputValueClass(IntWritable.class);
// Run the job
JobClient.runJob(job);
return 0;
}
public static void main(String[] args) throws Exception
{
// Start the LogCountsPerHour MapReduce application
int res = ToolRunner.run(new Configuration(), new LogCountsPerHour(), args);
System.exit(res);
}
}