// blob: 868e38a64f524162a4f9074842920ea801407c99 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.crunch.examples;
import java.io.Serializable;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.crunch.CombineFn;
import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.MapFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.types.writable.Writables;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@SuppressWarnings("serial")
public class AverageBytesByIP extends Configured implements Tool, Serializable {
static enum COUNTERS {
NO_MATCH,
CORRUPT_SIZE
}
static final String logRegex = "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) (\\d+) \"([^\"]+)\" \"([^\"]+)\"";
public int run(String[] args) throws Exception {
if (args.length != 2) {
System.err.println();
System.err.println("Two and only two arguments are accepted.");
System.err.println("Usage: " + this.getClass().getName() + " [generic options] input output");
System.err.println();
GenericOptionsParser.printGenericCommandUsage(System.err);
return 1;
}
// Create an object to coordinate pipeline creation and execution.
Pipeline pipeline = new MRPipeline(AverageBytesByIP.class, getConf());
// Reference a given text file as a collection of Strings.
PCollection<String> lines = pipeline.readTextFile(args[0]);
// Combiner used for summing up response size and count
CombineFn<String, Pair<Long, Long>> stringPairOfLongsSumCombiner = CombineFn.pairAggregator(CombineFn.SUM_LONGS,
CombineFn.SUM_LONGS);
// Table of (ip, sum(response size), count)
PTable<String, Pair<Long, Long>> remoteAddrResponseSize = lines
.parallelDo(extractResponseSize,
Writables.tableOf(Writables.strings(), Writables.pairs(Writables.longs(), Writables.longs()))).groupByKey()
.combineValues(stringPairOfLongsSumCombiner);
// Calculate average response size by ip address
PTable<String, Double> avgs = remoteAddrResponseSize.parallelDo(calulateAverage,
Writables.tableOf(Writables.strings(), Writables.doubles()));
// write the result to a text file
pipeline.writeTextFile(avgs, args[1]);
// Execute the pipeline as a MapReduce.
pipeline.done();
return 0;
}
// Function to calculate the average response size for a given ip address
//
// Input: (ip, sum(response size), count)
// Output: (ip, average response size)
MapFn<Pair<String, Pair<Long, Long>>, Pair<String, Double>> calulateAverage = new MapFn<Pair<String, Pair<Long, Long>>, Pair<String, Double>>() {
@Override
public Pair<String, Double> map(Pair<String, Pair<Long, Long>> arg) {
Pair<Long, Long> sumCount = arg.second();
double avg = 0;
if (sumCount.second() > 0) {
avg = (double) sumCount.first() / (double) sumCount.second();
}
return Pair.of(arg.first(), avg);
}
};
// Function to parse apache log records
// Given a standard apache log line, extract the ip address and
// response size. Outputs ip and the response size and a count (1) so that
// a combiner can be used.
//
// Input: 55.1.3.2 ...... 200 512 ....
// Output: (55.1.3.2, (512, 1))
DoFn<String, Pair<String, Pair<Long, Long>>> extractResponseSize = new DoFn<String, Pair<String, Pair<Long, Long>>>() {
transient Pattern pattern;
public void initialize() {
pattern = Pattern.compile(logRegex);
}
public void process(String line, Emitter<Pair<String, Pair<Long, Long>>> emitter) {
Matcher matcher = pattern.matcher(line);
if (matcher.matches()) {
try {
Long responseSize = Long.parseLong(matcher.group(7));
Pair<Long, Long> sumCount = Pair.of(responseSize, 1L);
String remoteAddr = matcher.group(1);
emitter.emit(Pair.of(remoteAddr, sumCount));
} catch (NumberFormatException e) {
this.getCounter(COUNTERS.CORRUPT_SIZE).increment(1);
}
} else {
this.getCounter(COUNTERS.NO_MATCH).increment(1);
}
}
};
public static void main(String[] args) throws Exception {
ToolRunner.run(new Configuration(), new AverageBytesByIP(), args);
}
}